Skip to main content

datasynth_runtime/
stream_client.rs

1//! Streaming HTTP client for pushing unified JSONL to a RustGraph ingest endpoint.
2//!
3//! Implements `std::io::Write` so it can be passed directly to
4//! `RustGraphUnifiedExporter::export_to_writer`. Buffers JSONL lines in memory
5//! and auto-flushes when batch size is reached.
6
7use std::io::{self, Write};
8use std::time::Duration;
9
10use reqwest::blocking::Client;
11use tracing::{debug, warn};
12
13/// Configuration for the streaming client.
14#[derive(Debug, Clone)]
15pub struct StreamConfig {
16    /// Target URL for the RustGraph ingest endpoint.
17    pub target_url: String,
18    /// Number of JSONL lines per HTTP POST batch.
19    pub batch_size: usize,
20    /// HTTP request timeout in seconds.
21    pub timeout_secs: u64,
22    /// Optional API key for authentication.
23    pub api_key: Option<String>,
24    /// Maximum number of retries per batch.
25    pub max_retries: u32,
26}
27
28impl Default for StreamConfig {
29    fn default() -> Self {
30        Self {
31            target_url: String::new(),
32            batch_size: 1000,
33            timeout_secs: 30,
34            api_key: None,
35            max_retries: 3,
36        }
37    }
38}
39
40/// HTTP streaming client that implements `Write` for JSONL output.
41///
42/// Each complete line (terminated by `\n`) is buffered. When the buffer
43/// reaches `batch_size` lines, the batch is POSTed to the target URL.
44pub struct StreamClient {
45    config: StreamConfig,
46    client: Client,
47    /// Buffer of complete JSONL lines ready to send.
48    lines: Vec<String>,
49    /// Partial line buffer (data received without trailing newline).
50    partial: String,
51    /// Total lines sent.
52    total_sent: usize,
53}
54
55impl StreamClient {
56    /// Create a new streaming client with the given configuration.
57    pub fn new(config: StreamConfig) -> io::Result<Self> {
58        let client = Client::builder()
59            .timeout(Duration::from_secs(config.timeout_secs))
60            .build()
61            .map_err(io::Error::other)?;
62
63        Ok(Self {
64            config,
65            client,
66            lines: Vec::new(),
67            partial: String::new(),
68            total_sent: 0,
69        })
70    }
71
72    /// Returns the total number of lines sent so far.
73    pub fn total_sent(&self) -> usize {
74        self.total_sent
75    }
76
77    /// Send a batch of lines to the target endpoint.
78    fn send_batch(&mut self) -> io::Result<()> {
79        if self.lines.is_empty() {
80            return Ok(());
81        }
82
83        let payload = self.lines.join("");
84        let batch_len = self.lines.len();
85
86        for attempt in 0..=self.config.max_retries {
87            let mut request = self
88                .client
89                .post(&self.config.target_url)
90                .header("Content-Type", "application/x-ndjson")
91                .body(payload.clone());
92
93            if let Some(ref key) = self.config.api_key {
94                request = request.header("X-API-Key", key);
95            }
96
97            match request.send() {
98                Ok(response) if response.status().is_success() => {
99                    debug!(
100                        "Streamed batch of {} lines (total: {})",
101                        batch_len,
102                        self.total_sent + batch_len
103                    );
104                    self.total_sent += batch_len;
105                    self.lines.clear();
106                    return Ok(());
107                }
108                Ok(response) => {
109                    let status = response.status();
110                    if attempt < self.config.max_retries {
111                        warn!(
112                            "Stream batch failed (HTTP {}), retry {}/{}",
113                            status,
114                            attempt + 1,
115                            self.config.max_retries
116                        );
117                        std::thread::sleep(Duration::from_millis(500 * (attempt as u64 + 1)));
118                    } else {
119                        return Err(io::Error::other(format!(
120                            "Stream batch failed after {} retries (HTTP {})",
121                            self.config.max_retries, status
122                        )));
123                    }
124                }
125                Err(e) => {
126                    if attempt < self.config.max_retries {
127                        warn!(
128                            "Stream batch error: {}, retry {}/{}",
129                            e,
130                            attempt + 1,
131                            self.config.max_retries
132                        );
133                        std::thread::sleep(Duration::from_millis(500 * (attempt as u64 + 1)));
134                    } else {
135                        return Err(io::Error::other(format!(
136                            "Stream batch failed after {} retries: {}",
137                            self.config.max_retries, e
138                        )));
139                    }
140                }
141            }
142        }
143
144        Ok(())
145    }
146}
147
148impl Write for StreamClient {
149    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
150        let s =
151            std::str::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
152
153        self.partial.push_str(s);
154
155        // Split on newlines and buffer complete lines
156        while let Some(pos) = self.partial.find('\n') {
157            let line = self.partial[..=pos].to_string();
158            self.partial = self.partial[pos + 1..].to_string();
159            self.lines.push(line);
160
161            // Auto-flush when batch size reached
162            if self.lines.len() >= self.config.batch_size {
163                self.send_batch()?;
164            }
165        }
166
167        Ok(buf.len())
168    }
169
170    fn flush(&mut self) -> io::Result<()> {
171        // If there's a partial line remaining, treat it as a complete line
172        if !self.partial.is_empty() {
173            let mut line = std::mem::take(&mut self.partial);
174            if !line.ends_with('\n') {
175                line.push('\n');
176            }
177            self.lines.push(line);
178        }
179
180        // Send any remaining buffered lines
181        self.send_batch()
182    }
183}
184
185#[cfg(test)]
186mod tests {
187    use super::*;
188
189    #[test]
190    fn test_stream_config_default() {
191        let config = StreamConfig::default();
192        assert_eq!(config.batch_size, 1000);
193        assert_eq!(config.timeout_secs, 30);
194        assert_eq!(config.max_retries, 3);
195        assert!(config.api_key.is_none());
196    }
197
198    #[test]
199    fn test_line_buffering() {
200        // We can't easily test HTTP posting without a server, but we can
201        // test the line buffering logic by using a large batch_size so
202        // no actual sends happen.
203        let config = StreamConfig {
204            target_url: "http://localhost:9999/ingest".to_string(),
205            batch_size: 10000, // Large so no auto-flush
206            ..Default::default()
207        };
208        let mut client = StreamClient::new(config).unwrap();
209
210        // Write some JSONL
211        client
212            .write_all(b"{\"_type\":\"node\",\"id\":\"1\"}\n")
213            .unwrap();
214        client
215            .write_all(b"{\"_type\":\"node\",\"id\":\"2\"}\n")
216            .unwrap();
217
218        assert_eq!(client.lines.len(), 2);
219        assert!(client.partial.is_empty());
220        assert_eq!(client.total_sent, 0);
221    }
222
223    #[test]
224    fn test_partial_line_handling() {
225        let config = StreamConfig {
226            target_url: "http://localhost:9999/ingest".to_string(),
227            batch_size: 10000,
228            ..Default::default()
229        };
230        let mut client = StreamClient::new(config).unwrap();
231
232        // Write partial line
233        client.write_all(b"{\"_type\":\"node\"").unwrap();
234        assert_eq!(client.lines.len(), 0);
235        assert_eq!(client.partial, "{\"_type\":\"node\"");
236
237        // Complete the line
238        client.write_all(b",\"id\":\"1\"}\n").unwrap();
239        assert_eq!(client.lines.len(), 1);
240        assert!(client.partial.is_empty());
241    }
242}