electric_sql_client/
lib.rs

1use reqwest::{header::HeaderMap, StatusCode, Url};
2
/// Path of the shape endpoint; joined onto the parsed host URL when a request
/// URL is built.
const ENDPOINT_SHAPE: &str = "/v1/shape";
/// Offset value used by `ShapeStream::new` when the caller passes no offset.
// NOTE(review): presumably "-1" asks the server for the shape from its very
// beginning — confirm against the Electric HTTP API documentation.
const NEGATIVE_ONE: &str = "-1";
5
/// Client-side state for reading one table's shape from an Electric
/// `/v1/shape` endpoint.
///
/// The fields hold what is needed to build the next request URL; most of
/// them are refreshed from the `electric-*` response headers after each
/// successful request.
#[derive(Debug, Clone)]
pub struct ShapeStream {
    /// Latest `electric-cursor` header value, if the last response had one.
    /// Only sent (as `cursor`) once the stream is up to date.
    cursor: Option<String>,
    /// Latest `electric-handle` header value, if the last response had one.
    /// Sent as the `handle` query parameter when present.
    handle: Option<String>,
    /// Base URL of the server, stored as given and parsed on every request.
    host: String,
    /// Whether the last successful response carried `electric-up-to-date`.
    /// When `true`, requests are made with `live=true`.
    is_up_to_date: bool,
    /// Offset sent with the next request; replaced by the `electric-offset`
    /// header of each successful response.
    offset: String,
    /// Table name sent as the `table` query parameter.
    table: String,
}
14
15impl ShapeStream {
16    pub fn new(host: &str, table: &str, offset: Option<&str>) -> Self {
17        let offset = offset.unwrap_or(NEGATIVE_ONE).to_string();
18
19        Self {
20            cursor: None,
21            handle: None,
22            host: host.to_string(),
23            is_up_to_date: false,
24            offset,
25            table: table.to_string(),
26        }
27    }
28}
29
30impl ShapeStream {
31    fn set_offset(&mut self, headers: &HeaderMap) {
32        self.offset = headers
33            .get("electric-offset")
34            .expect("electric-offset is missing in the header")
35            .to_str()
36            .expect("header electric-offset contains non-ASCII chars")
37            .to_string();
38    }
39
40    fn set_handle(&mut self, headers: &HeaderMap) {
41        self.handle = headers.get("electric-handle").map(|header| {
42            header
43                .to_str()
44                .expect("header electric-handle contains non-ASCII chars")
45                .to_string()
46        });
47    }
48
49    fn set_cursor(&mut self, headers: &HeaderMap) {
50        self.cursor = headers.get("electric-cursor").map(|header| {
51            header
52                .to_str()
53                .expect("header electric-cursor contains non-ASCII chars")
54                .to_string()
55        });
56    }
57
58    fn set_is_up_to_date(&mut self, headers: &HeaderMap) {
59        self.is_up_to_date = headers.get("electric-up-to-date").is_some();
60    }
61
62    fn get_url(&self) -> Result<Url, Box<dyn std::error::Error>> {
63        let base = Url::parse(&self.host)?;
64        let mut url = base.join(ENDPOINT_SHAPE)?;
65
66        url.query_pairs_mut()
67            .append_pair("table", &self.table)
68            .append_pair("offset", &self.offset);
69
70        if let Some(handle) = &self.handle {
71            url.query_pairs_mut().append_pair("handle", handle);
72        }
73
74        if self.is_up_to_date {
75            if let Some(cursor) = &self.cursor {
76                url.query_pairs_mut().append_pair("cursor", cursor);
77            }
78            url.query_pairs_mut().append_pair("live", "true");
79        }
80
81        Ok(url)
82    }
83
84    pub async fn fetch(&mut self) -> Result<Vec<String>, Box<dyn std::error::Error>> {
85        let mut messages = Vec::new();
86
87        loop {
88            let url = self.get_url()?;
89            let resp = reqwest::get(url).await?;
90            let status = resp.status();
91
92            if !status.is_success() {
93                eprintln!("request is unsuccessful: {}", status);
94                break;
95            }
96
97            let headers = resp.headers();
98
99            self.set_offset(headers);
100            self.set_handle(headers);
101            self.set_cursor(headers);
102            self.set_is_up_to_date(headers);
103
104            if status == StatusCode::NO_CONTENT {
105                continue;
106            }
107
108            let body = resp.text().await?;
109            messages.push(body);
110
111            if self.is_up_to_date {
112                break;
113            }
114        }
115
116        Ok(messages)
117    }
118}