1use anyhow::{bail, Context, Result};
4use statsai_core::{SyncAck, SyncBatch, SYNC_ACK_SCHEMA_VERSION};
5use std::io::Write;
6use std::path::PathBuf;
7use std::time::Duration;
8
9pub trait SyncSink {
10 fn name(&self) -> &'static str;
11 fn send(&self, batch: &SyncBatch) -> Result<()>;
12}
13
14pub struct StdoutSink;
15
16impl SyncSink for StdoutSink {
17 fn name(&self) -> &'static str {
18 "stdout"
19 }
20
21 fn send(&self, batch: &SyncBatch) -> Result<()> {
22 let stdout = std::io::stdout();
23 let mut lock = stdout.lock();
24 serde_json::to_writer_pretty(&mut lock, batch)?;
25 writeln!(lock)?;
26 Ok(())
27 }
28}
29
30#[derive(Debug, Clone)]
31pub struct HttpSink {
32 endpoint: String,
33 bearer_token: Option<String>,
34 timeout: Duration,
35}
36
37impl HttpSink {
38 pub fn new(endpoint: impl AsRef<str>, bearer_token: Option<String>) -> Result<Self> {
44 let endpoint = endpoint.as_ref().trim();
45 if !(endpoint.starts_with("http://") || endpoint.starts_with("https://")) {
46 bail!("http sink supports http:// and https:// endpoints only");
47 }
48 Ok(Self {
49 endpoint: endpoint.to_string(),
50 bearer_token,
51 timeout: Duration::from_secs(30),
52 })
53 }
54
55 pub fn send_with_ack(&self, batch: &SyncBatch) -> Result<SyncAck> {
62 let request = ureq::post(&self.endpoint)
63 .timeout(self.timeout)
64 .set(
65 "User-Agent",
66 &format!("statsai/{}", env!("CARGO_PKG_VERSION")),
67 )
68 .set("Content-Type", "application/json")
69 .set("Accept", "application/json");
70 let request = if let Some(token) = self
71 .bearer_token
72 .as_deref()
73 .filter(|token| !token.is_empty())
74 {
75 request.set("Authorization", &format!("Bearer {token}"))
76 } else {
77 request
78 };
79 let response = request.send_json(serde_json::to_value(batch)?);
80 let response = match response {
81 Ok(response) => response,
82 Err(ureq::Error::Status(code, response)) => {
83 let body = response.into_string().unwrap_or_default();
84 bail!(
85 "sync endpoint returned HTTP {}: {}",
86 code,
87 body.trim().chars().take(200).collect::<String>()
88 );
89 }
90 Err(error) => bail!("sync endpoint request failed: {}", error),
91 };
92 let ack: SyncAck = response.into_json().context("parse sync ack")?;
93 if ack.schema_version != SYNC_ACK_SCHEMA_VERSION {
94 bail!("unsupported sync ack schema {}", ack.schema_version);
95 }
96 Ok(ack)
97 }
98}
99
100impl SyncSink for HttpSink {
101 fn name(&self) -> &'static str {
102 "http"
103 }
104
105 fn send(&self, batch: &SyncBatch) -> Result<()> {
106 self.send_with_ack(batch)?;
107 Ok(())
108 }
109}
110
111#[derive(Debug, Clone)]
112pub struct FileSink {
113 path: PathBuf,
114}
115
116impl FileSink {
117 #[must_use]
118 pub fn new(path: PathBuf) -> Self {
119 Self { path }
120 }
121}
122
123impl SyncSink for FileSink {
124 fn name(&self) -> &'static str {
125 "file"
126 }
127
128 fn send(&self, batch: &SyncBatch) -> Result<()> {
129 if let Some(parent) = self.path.parent() {
130 std::fs::create_dir_all(parent)
131 .with_context(|| format!("create {}", parent.display()))?;
132 }
133 let file = std::fs::File::create(&self.path)
134 .with_context(|| format!("write {}", self.path.display()))?;
135 serde_json::to_writer_pretty(file, batch)?;
136 Ok(())
137 }
138}
139
140#[cfg(test)]
141mod tests {
142 use super::*;
143 use chrono::Utc;
144 use statsai_core::SyncBatch;
145 use std::sync::mpsc;
146 use tiny_http::{Header, Method, Response, Server};
147
148 fn empty_batch() -> SyncBatch {
149 SyncBatch {
150 schema_version: "sync_batch.v1".to_string(),
151 batch_id: "batch_1".to_string(),
152 device_id: "device".to_string(),
153 sources: Vec::new(),
154 accounts: Vec::new(),
155 source_account_assignments: Vec::new(),
156 subscriptions: Vec::new(),
157 events: Vec::new(),
158 summaries: Vec::new(),
159 created_at: Utc::now(),
160 }
161 }
162
163 #[test]
164 fn file_sink_writes_json() {
165 let dir = tempfile::tempdir().expect("tempdir");
166 let path = dir.path().join("batch.json");
167 let sink = FileSink::new(path.clone());
168 sink.send(&empty_batch()).expect("write");
169
170 let content = std::fs::read_to_string(&path).expect("read");
171 assert!(content.contains("batch_1"));
172 assert!(content.contains("device"));
173 }
174
175 #[test]
176 fn http_sink_posts_sync_batch_with_bearer_token() {
177 let server = Server::http("127.0.0.1:0").expect("server");
178 let endpoint = format!("http://{}/v1/sync/batches", server.server_addr());
179 let (tx, rx) = mpsc::channel();
180 let handle = std::thread::spawn(move || {
181 let mut request = server.recv().expect("request");
182 assert_eq!(request.method(), &Method::Post);
183 assert_eq!(request.url(), "/v1/sync/batches");
184 let auth = request
185 .headers()
186 .iter()
187 .find(|header| header.field.equiv("Authorization"))
188 .map(|header| header.value.as_str().to_string());
189 let content_type = request
190 .headers()
191 .iter()
192 .find(|header| header.field.equiv("Content-Type"))
193 .map(|header| header.value.as_str().to_string());
194 let mut body = String::new();
195 request.as_reader().read_to_string(&mut body).expect("body");
196 tx.send((auth, content_type, body)).expect("send body");
197 let response = Response::from_string(test_ack_json("batch_1"))
198 .with_header(Header::from_bytes("content-type", "application/json").unwrap());
199 request.respond(response).expect("respond");
200 });
201
202 let sink = HttpSink::new(endpoint, Some("token_123".to_string())).expect("sink");
203 sink.send(&empty_batch()).expect("send");
204 handle.join().expect("server thread");
205 let (auth, content_type, body) = rx.recv().expect("request body");
206 assert_eq!(auth.as_deref(), Some("Bearer token_123"));
207 assert_eq!(content_type.as_deref(), Some("application/json"));
208 assert!(body.contains("\"schema_version\":\"sync_batch.v1\""));
209 assert!(body.contains("\"batch_id\":\"batch_1\""));
210 }
211
212 #[test]
213 fn http_sink_rejects_non_success_status() {
214 let server = Server::http("127.0.0.1:0").expect("server");
215 let endpoint = format!("http://{}/v1/sync/batches", server.server_addr());
216 let handle = std::thread::spawn(move || {
217 let request = server.recv().expect("request");
218 request
219 .respond(Response::from_string("nope").with_status_code(500))
220 .expect("respond");
221 });
222
223 let sink = HttpSink::new(endpoint, None).expect("sink");
224 let error = sink.send(&empty_batch()).expect_err("500 should fail");
225 handle.join().expect("server thread");
226 assert!(error.to_string().contains("HTTP 500"));
227 }
228
229 #[test]
230 fn http_sink_rejects_non_http_url() {
231 let error =
232 HttpSink::new("ftp://example.com/v1/sync/batches", None).expect_err("bad scheme");
233 assert!(error.to_string().contains("http://"));
234 }
235
236 fn test_ack_json(batch_id: &str) -> String {
237 format!(
238 r#"{{
239 "schema_version":"sync_ack.v1",
240 "batch_id":"{batch_id}",
241 "accepted":{{"sources":0,"accounts":0,"source_account_assignments":0,"subscriptions":0,"events":0,"summaries":0}},
242 "duplicates":{{"sources":0,"accounts":0,"source_account_assignments":0,"subscriptions":0,"events":0,"summaries":0}},
243 "rejected":[]
244 }}"#
245 )
246 }
247}