electric_sql_client/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use reqwest::{header::HeaderMap, StatusCode, Url};

/// Path of the shape endpoint; it is joined onto the configured host when a request URL is built.
const ENDPOINT_SHAPE: &str = "/v1/shape";
/// Offset that `ShapeStream::new` falls back to when the caller does not supply one.
const NEGATIVE_ONE: &str = "-1";

/// Client-side state for reading one table's shape from the `/v1/shape` endpoint.
///
/// The fields mirror the values exchanged with the server: they are sent as query parameters
/// on each request and refreshed from the `electric-*` response headers.
#[derive(Debug, Clone)]
pub struct ShapeStream {
    /// Value of the last `electric-cursor` response header, if any; sent as the `cursor`
    /// query parameter once the stream is up to date.
    cursor: Option<String>,
    /// Value of the last `electric-handle` response header, if any; sent as the `handle`
    /// query parameter.
    handle: Option<String>,
    /// Base URL of the server; parsed as a URL when a request is built.
    host: String,
    /// Whether the most recent response carried the `electric-up-to-date` header.
    is_up_to_date: bool,
    /// Offset sent as the `offset` query parameter; replaced by the `electric-offset`
    /// response header after each successful request.
    offset: String,
    /// Table name sent as the `table` query parameter.
    table: String,
}

impl ShapeStream {
    /// Creates a stream for `table` served by `host`.
    ///
    /// `offset` is the position to start reading from; when it is `None` the stream starts
    /// at `NEGATIVE_ONE`. The new stream has no handle or cursor yet and is not marked as
    /// up to date.
    pub fn new(host: &str, table: &str, offset: Option<&str>) -> Self {
        let initial_offset = match offset {
            Some(given) => String::from(given),
            None => String::from(NEGATIVE_ONE),
        };

        Self {
            host: String::from(host),
            table: String::from(table),
            offset: initial_offset,
            handle: None,
            cursor: None,
            is_up_to_date: false,
        }
    }
}

impl ShapeStream {
    fn set_offset(&mut self, headers: &HeaderMap) {
        self.offset = headers
            .get("electric-offset")
            .expect("electric-offset is missing in the header")
            .to_str()
            .expect("header electric-offset contains non-ASCII chars")
            .to_string();
    }

    fn set_handle(&mut self, headers: &HeaderMap) {
        self.handle = headers.get("electric-handle").map(|header| {
            header
                .to_str()
                .expect("header electric-handle contains non-ASCII chars")
                .to_string()
        });
    }

    fn set_cursor(&mut self, headers: &HeaderMap) {
        self.cursor = headers.get("electric-cursor").map(|header| {
            header
                .to_str()
                .expect("header electric-cursor contains non-ASCII chars")
                .to_string()
        });
    }

    fn set_is_up_to_date(&mut self, headers: &HeaderMap) {
        self.is_up_to_date = headers.get("electric-up-to-date").is_some();
    }

    fn get_url(&self) -> Result<Url, Box<dyn std::error::Error>> {
        let base = Url::parse(&self.host)?;
        let mut url = base.join(ENDPOINT_SHAPE)?;

        url.query_pairs_mut()
            .append_pair("table", &self.table)
            .append_pair("offset", &self.offset);

        if let Some(handle) = &self.handle {
            url.query_pairs_mut().append_pair("handle", handle);
        }

        if self.is_up_to_date {
            if let Some(cursor) = &self.cursor {
                url.query_pairs_mut().append_pair("cursor", cursor);
            }
            url.query_pairs_mut().append_pair("live", "true");
        }

        Ok(url)
    }

    pub async fn fetch(&mut self) -> Result<Vec<String>, Box<dyn std::error::Error>> {
        let mut messages = Vec::new();

        loop {
            let url = self.get_url()?;
            let resp = reqwest::get(url).await?;
            let status = resp.status();

            if !status.is_success() {
                eprintln!("request is unsuccessful: {}", status);
                break;
            }

            let headers = resp.headers();

            self.set_offset(headers);
            self.set_handle(headers);
            self.set_cursor(headers);
            self.set_is_up_to_date(headers);

            if status == StatusCode::NO_CONTENT {
                continue;
            }

            let body = resp.text().await?;
            messages.push(body);

            if self.is_up_to_date {
                break;
            }
        }

        Ok(messages)
    }
}