electric_sql_client/
lib.rs1use reqwest::{header::HeaderMap, StatusCode, Url};
2
3const ENDPOINT_SHAPE: &str = "/v1/shape";
4const NEGATIVE_ONE: &str = "-1";
5
6pub struct ShapeStream {
7 cursor: Option<String>,
8 handle: Option<String>,
9 host: String,
10 is_up_to_date: bool,
11 offset: String,
12 table: String,
13}
14
15impl ShapeStream {
16 pub fn new(host: &str, table: &str, offset: Option<&str>) -> Self {
17 let offset = offset.unwrap_or(NEGATIVE_ONE).to_string();
18
19 Self {
20 cursor: None,
21 handle: None,
22 host: host.to_string(),
23 is_up_to_date: false,
24 offset,
25 table: table.to_string(),
26 }
27 }
28}
29
30impl ShapeStream {
31 fn set_offset(&mut self, headers: &HeaderMap) {
32 self.offset = headers
33 .get("electric-offset")
34 .expect("electric-offset is missing in the header")
35 .to_str()
36 .expect("header electric-offset contains non-ASCII chars")
37 .to_string();
38 }
39
40 fn set_handle(&mut self, headers: &HeaderMap) {
41 self.handle = headers.get("electric-handle").map(|header| {
42 header
43 .to_str()
44 .expect("header electric-handle contains non-ASCII chars")
45 .to_string()
46 });
47 }
48
49 fn set_cursor(&mut self, headers: &HeaderMap) {
50 self.cursor = headers.get("electric-cursor").map(|header| {
51 header
52 .to_str()
53 .expect("header electric-cursor contains non-ASCII chars")
54 .to_string()
55 });
56 }
57
58 fn set_is_up_to_date(&mut self, headers: &HeaderMap) {
59 self.is_up_to_date = headers.get("electric-up-to-date").is_some();
60 }
61
62 fn get_url(&self) -> Result<Url, Box<dyn std::error::Error>> {
63 let base = Url::parse(&self.host)?;
64 let mut url = base.join(ENDPOINT_SHAPE)?;
65
66 url.query_pairs_mut()
67 .append_pair("table", &self.table)
68 .append_pair("offset", &self.offset);
69
70 if let Some(handle) = &self.handle {
71 url.query_pairs_mut().append_pair("handle", handle);
72 }
73
74 if self.is_up_to_date {
75 if let Some(cursor) = &self.cursor {
76 url.query_pairs_mut().append_pair("cursor", cursor);
77 }
78 url.query_pairs_mut().append_pair("live", "true");
79 }
80
81 Ok(url)
82 }
83
84 pub async fn fetch(&mut self) -> Result<Vec<String>, Box<dyn std::error::Error>> {
85 let mut messages = Vec::new();
86
87 loop {
88 let url = self.get_url()?;
89 let resp = reqwest::get(url).await?;
90 let status = resp.status();
91
92 if !status.is_success() {
93 eprintln!("request is unsuccessful: {}", status);
94 break;
95 }
96
97 let headers = resp.headers();
98
99 self.set_offset(headers);
100 self.set_handle(headers);
101 self.set_cursor(headers);
102 self.set_is_up_to_date(headers);
103
104 if status == StatusCode::NO_CONTENT {
105 continue;
106 }
107
108 let body = resp.text().await?;
109 messages.push(body);
110
111 if self.is_up_to_date {
112 break;
113 }
114 }
115
116 Ok(messages)
117 }
118}