Skip to main content

statsai_sync/
lib.rs

1//! Sync sink interfaces for `statsai`.
2
3use anyhow::{bail, Context, Result};
4use statsai_core::{SyncAck, SyncBatch, SYNC_ACK_SCHEMA_VERSION};
5use std::io::Write;
6use std::path::PathBuf;
7use std::time::Duration;
8
9pub trait SyncSink {
10    fn name(&self) -> &'static str;
11    fn send(&self, batch: &SyncBatch) -> Result<()>;
12}
13
14pub struct StdoutSink;
15
16impl SyncSink for StdoutSink {
17    fn name(&self) -> &'static str {
18        "stdout"
19    }
20
21    fn send(&self, batch: &SyncBatch) -> Result<()> {
22        let stdout = std::io::stdout();
23        let mut lock = stdout.lock();
24        serde_json::to_writer_pretty(&mut lock, batch)?;
25        writeln!(lock)?;
26        Ok(())
27    }
28}
29
30#[derive(Debug, Clone)]
31pub struct HttpSink {
32    endpoint: String,
33    bearer_token: Option<String>,
34    timeout: Duration,
35}
36
37impl HttpSink {
38    /// Creates an HTTP sync sink.
39    ///
40    /// # Errors
41    ///
42    /// Returns an error when `endpoint` is not an `http://` or `https://` URL.
43    pub fn new(endpoint: impl AsRef<str>, bearer_token: Option<String>) -> Result<Self> {
44        let endpoint = endpoint.as_ref().trim();
45        if !(endpoint.starts_with("http://") || endpoint.starts_with("https://")) {
46            bail!("http sink supports http:// and https:// endpoints only");
47        }
48        Ok(Self {
49            endpoint: endpoint.to_string(),
50            bearer_token,
51            timeout: Duration::from_secs(30),
52        })
53    }
54
55    /// Sends a batch and returns the server acknowledgement.
56    ///
57    /// # Errors
58    ///
59    /// Returns an error if the endpoint rejects the batch, the connection fails,
60    /// or the response is not a supported sync acknowledgement.
61    pub fn send_with_ack(&self, batch: &SyncBatch) -> Result<SyncAck> {
62        let request = ureq::post(&self.endpoint)
63            .timeout(self.timeout)
64            .set(
65                "User-Agent",
66                &format!("statsai/{}", env!("CARGO_PKG_VERSION")),
67            )
68            .set("Content-Type", "application/json")
69            .set("Accept", "application/json");
70        let request = if let Some(token) = self
71            .bearer_token
72            .as_deref()
73            .filter(|token| !token.is_empty())
74        {
75            request.set("Authorization", &format!("Bearer {token}"))
76        } else {
77            request
78        };
79        let response = request.send_json(serde_json::to_value(batch)?);
80        let response = match response {
81            Ok(response) => response,
82            Err(ureq::Error::Status(code, response)) => {
83                let body = response.into_string().unwrap_or_default();
84                bail!(
85                    "sync endpoint returned HTTP {}: {}",
86                    code,
87                    body.trim().chars().take(200).collect::<String>()
88                );
89            }
90            Err(error) => bail!("sync endpoint request failed: {}", error),
91        };
92        let ack: SyncAck = response.into_json().context("parse sync ack")?;
93        if ack.schema_version != SYNC_ACK_SCHEMA_VERSION {
94            bail!("unsupported sync ack schema {}", ack.schema_version);
95        }
96        Ok(ack)
97    }
98}
99
100impl SyncSink for HttpSink {
101    fn name(&self) -> &'static str {
102        "http"
103    }
104
105    fn send(&self, batch: &SyncBatch) -> Result<()> {
106        self.send_with_ack(batch)?;
107        Ok(())
108    }
109}
110
/// Sink that targets a single file on disk.
#[derive(Debug, Clone)]
pub struct FileSink {
    /// Location of the file this sink writes to.
    path: PathBuf,
}

impl FileSink {
    /// Creates a sink that targets `path`.
    ///
    /// Nothing is touched on disk here; the path is only stored.
    #[must_use]
    pub fn new(path: PathBuf) -> Self {
        FileSink { path }
    }
}
122
123impl SyncSink for FileSink {
124    fn name(&self) -> &'static str {
125        "file"
126    }
127
128    fn send(&self, batch: &SyncBatch) -> Result<()> {
129        if let Some(parent) = self.path.parent() {
130            std::fs::create_dir_all(parent)
131                .with_context(|| format!("create {}", parent.display()))?;
132        }
133        let file = std::fs::File::create(&self.path)
134            .with_context(|| format!("write {}", self.path.display()))?;
135        serde_json::to_writer_pretty(file, batch)?;
136        Ok(())
137    }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use statsai_core::SyncBatch;
    use std::sync::mpsc;
    use tiny_http::{Header, Method, Response, Server};

    /// Builds a batch with fixed identifiers and no payload records.
    fn empty_batch() -> SyncBatch {
        SyncBatch {
            schema_version: String::from("sync_batch.v1"),
            batch_id: String::from("batch_1"),
            device_id: String::from("device"),
            sources: vec![],
            accounts: vec![],
            source_account_assignments: vec![],
            subscriptions: vec![],
            events: vec![],
            summaries: vec![],
            created_at: Utc::now(),
        }
    }

    /// Starts a test server on an OS-assigned loopback port and returns it
    /// together with the sync endpoint URL that points at it.
    fn local_server() -> (Server, String) {
        let server = Server::http("127.0.0.1:0").expect("server");
        let endpoint = format!("http://{}/v1/sync/batches", server.server_addr());
        (server, endpoint)
    }

    /// Returns the value of the first request header whose field matches `name`.
    fn header_value(request: &tiny_http::Request, name: &'static str) -> Option<String> {
        request
            .headers()
            .iter()
            .find(|header| header.field.equiv(name))
            .map(|header| header.value.as_str().to_string())
    }

    /// Returns the JSON body of an acknowledgement for `batch_id` with all
    /// counters at zero and nothing rejected.
    fn test_ack_json(batch_id: &str) -> String {
        format!(
            r#"{{
              "schema_version":"sync_ack.v1",
              "batch_id":"{batch_id}",
              "accepted":{{"sources":0,"accounts":0,"source_account_assignments":0,"subscriptions":0,"events":0,"summaries":0}},
              "duplicates":{{"sources":0,"accounts":0,"source_account_assignments":0,"subscriptions":0,"events":0,"summaries":0}},
              "rejected":[]
            }}"#
        )
    }

    #[test]
    fn file_sink_writes_json() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let target = scratch.path().join("batch.json");
        FileSink::new(target.clone())
            .send(&empty_batch())
            .expect("write");

        let written = std::fs::read_to_string(&target).expect("read");
        assert!(written.contains("batch_1"));
        assert!(written.contains("device"));
    }

    #[test]
    fn http_sink_posts_sync_batch_with_bearer_token() {
        let (server, endpoint) = local_server();
        let (captured_tx, captured_rx) = mpsc::channel();
        // The server thread handles exactly one request and hands what it saw
        // back over the channel for the assertions below.
        let server_thread = std::thread::spawn(move || {
            let mut request = server.recv().expect("request");
            assert_eq!(request.method(), &Method::Post);
            assert_eq!(request.url(), "/v1/sync/batches");
            let auth = header_value(&request, "Authorization");
            let content_type = header_value(&request, "Content-Type");
            let mut body = String::new();
            request.as_reader().read_to_string(&mut body).expect("body");
            captured_tx
                .send((auth, content_type, body))
                .expect("send body");
            let ack = Response::from_string(test_ack_json("batch_1"))
                .with_header(Header::from_bytes("content-type", "application/json").unwrap());
            request.respond(ack).expect("respond");
        });

        let sink = HttpSink::new(endpoint, Some("token_123".to_string())).expect("sink");
        sink.send(&empty_batch()).expect("send");
        server_thread.join().expect("server thread");

        let (auth, content_type, body) = captured_rx.recv().expect("request body");
        assert_eq!(auth.as_deref(), Some("Bearer token_123"));
        assert_eq!(content_type.as_deref(), Some("application/json"));
        assert!(body.contains("\"schema_version\":\"sync_batch.v1\""));
        assert!(body.contains("\"batch_id\":\"batch_1\""));
    }

    #[test]
    fn http_sink_rejects_non_success_status() {
        let (server, endpoint) = local_server();
        let server_thread = std::thread::spawn(move || {
            let request = server.recv().expect("request");
            let failure = Response::from_string("nope").with_status_code(500);
            request.respond(failure).expect("respond");
        });

        let sink = HttpSink::new(endpoint, None).expect("sink");
        let error = sink.send(&empty_batch()).expect_err("500 should fail");
        server_thread.join().expect("server thread");
        assert!(error.to_string().contains("HTTP 500"));
    }

    #[test]
    fn http_sink_rejects_non_http_url() {
        let outcome = HttpSink::new("ftp://example.com/v1/sync/batches", None);
        let error = outcome.expect_err("bad scheme");
        assert!(error.to_string().contains("http://"));
    }
}
247}