Skip to main content

datasynth_runtime/
stream_client.rs

1//! Streaming HTTP client for pushing unified JSONL to a RustGraph ingest endpoint.
2//!
3//! Implements `std::io::Write` so it can be passed directly to
4//! `RustGraphUnifiedExporter::export_to_writer`. Buffers JSONL lines in memory
5//! and auto-flushes when batch size is reached.
6
7use std::io::{self, Write};
8use std::time::Duration;
9
10use reqwest::blocking::Client;
11use tracing::{debug, warn};
12
/// Settings that control how the streaming client batches and delivers JSONL.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// URL of the RustGraph ingest endpoint that receives every batch.
    pub target_url: String,
    /// How many JSONL lines are accumulated before one HTTP POST is issued.
    pub batch_size: usize,
    /// Per-request HTTP timeout, expressed in seconds.
    pub timeout_secs: u64,
    /// API key sent in the `X-API-Key` header; `None` omits the header.
    pub api_key: Option<String>,
    /// Upper bound on retries per batch (a batch is attempted at most
    /// `max_retries + 1` times).
    pub max_retries: u32,
}
27
28impl Default for StreamConfig {
29    fn default() -> Self {
30        Self {
31            target_url: String::new(),
32            batch_size: 1000,
33            timeout_secs: 30,
34            api_key: None,
35            max_retries: 3,
36        }
37    }
38}
39
40/// HTTP streaming client that implements `Write` for JSONL output.
41///
42/// Each complete line (terminated by `\n`) is buffered. When the buffer
43/// reaches `batch_size` lines, the batch is POSTed to the target URL.
44pub struct StreamClient {
45    config: StreamConfig,
46    client: Client,
47    /// Buffer of complete JSONL lines ready to send.
48    lines: Vec<String>,
49    /// Partial line buffer (data received without trailing newline).
50    partial: String,
51    /// Total lines sent.
52    total_sent: usize,
53}
54
55impl StreamClient {
56    /// Create a new streaming client with the given configuration.
57    pub fn new(config: StreamConfig) -> io::Result<Self> {
58        let client = Client::builder()
59            .timeout(Duration::from_secs(config.timeout_secs))
60            .build()
61            .map_err(io::Error::other)?;
62
63        Ok(Self {
64            config,
65            client,
66            lines: Vec::new(),
67            partial: String::new(),
68            total_sent: 0,
69        })
70    }
71
72    /// Returns the total number of lines sent so far.
73    pub fn total_sent(&self) -> usize {
74        self.total_sent
75    }
76
77    /// Send a batch of lines to the target endpoint.
78    fn send_batch(&mut self) -> io::Result<()> {
79        if self.lines.is_empty() {
80            return Ok(());
81        }
82
83        let payload = self.lines.join("");
84        let batch_len = self.lines.len();
85
86        for attempt in 0..=self.config.max_retries {
87            let mut request = self
88                .client
89                .post(&self.config.target_url)
90                .header("Content-Type", "application/x-ndjson")
91                .body(payload.clone());
92
93            if let Some(ref key) = self.config.api_key {
94                request = request.header("X-API-Key", key);
95            }
96
97            match request.send() {
98                Ok(response) if response.status().is_success() => {
99                    debug!(
100                        "Streamed batch of {} lines (total: {})",
101                        batch_len,
102                        self.total_sent + batch_len
103                    );
104                    self.total_sent += batch_len;
105                    self.lines.clear();
106                    return Ok(());
107                }
108                Ok(response) => {
109                    let status = response.status();
110                    if attempt < self.config.max_retries {
111                        warn!(
112                            "Stream batch failed (HTTP {}), retry {}/{}",
113                            status,
114                            attempt + 1,
115                            self.config.max_retries
116                        );
117                        std::thread::sleep(Duration::from_millis(500 * (attempt as u64 + 1)));
118                    } else {
119                        return Err(io::Error::other(format!(
120                            "Stream batch failed after {} retries (HTTP {})",
121                            self.config.max_retries, status
122                        )));
123                    }
124                }
125                Err(e) => {
126                    if attempt < self.config.max_retries {
127                        warn!(
128                            "Stream batch error: {}, retry {}/{}",
129                            e,
130                            attempt + 1,
131                            self.config.max_retries
132                        );
133                        std::thread::sleep(Duration::from_millis(500 * (attempt as u64 + 1)));
134                    } else {
135                        return Err(io::Error::other(format!(
136                            "Stream batch failed after {} retries: {}",
137                            self.config.max_retries, e
138                        )));
139                    }
140                }
141            }
142        }
143
144        Ok(())
145    }
146}
147
148impl Write for StreamClient {
149    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
150        let s =
151            std::str::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
152
153        self.partial.push_str(s);
154
155        // Split on newlines and buffer complete lines
156        while let Some(pos) = self.partial.find('\n') {
157            let line = self.partial[..=pos].to_string();
158            self.partial = self.partial[pos + 1..].to_string();
159            self.lines.push(line);
160
161            // Auto-flush when batch size reached
162            if self.lines.len() >= self.config.batch_size {
163                self.send_batch()?;
164            }
165        }
166
167        Ok(buf.len())
168    }
169
170    fn flush(&mut self) -> io::Result<()> {
171        // If there's a partial line remaining, treat it as a complete line
172        if !self.partial.is_empty() {
173            let mut line = std::mem::take(&mut self.partial);
174            if !line.ends_with('\n') {
175                line.push('\n');
176            }
177            self.lines.push(line);
178        }
179
180        // Send any remaining buffered lines
181        self.send_batch()
182    }
183}
184
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Builds a client whose batch size is far larger than anything these
    /// tests write, so no HTTP request is ever attempted.
    fn offline_client() -> StreamClient {
        let config = StreamConfig {
            target_url: "http://localhost:9999/ingest".to_string(),
            batch_size: 10000,
            ..Default::default()
        };
        StreamClient::new(config).unwrap()
    }

    #[test]
    fn test_stream_config_default() {
        let defaults = StreamConfig::default();

        assert!(defaults.api_key.is_none());
        assert_eq!(defaults.max_retries, 3);
        assert_eq!(defaults.timeout_secs, 30);
        assert_eq!(defaults.batch_size, 1000);
    }

    #[test]
    fn test_line_buffering() {
        // Posting needs a live server, so only the buffering logic is
        // exercised here; the oversized batch keeps everything in memory.
        let mut client = offline_client();

        let records: [&[u8]; 2] = [
            b"{\"_type\":\"node\",\"id\":\"1\"}\n",
            b"{\"_type\":\"node\",\"id\":\"2\"}\n",
        ];
        for record in records.iter() {
            client.write_all(record).unwrap();
        }

        assert_eq!(client.lines.len(), 2);
        assert!(client.partial.is_empty());
        assert_eq!(client.total_sent, 0);
    }

    #[test]
    fn test_partial_line_handling() {
        let mut client = offline_client();

        // An unterminated fragment must stay in the partial buffer.
        client.write_all(b"{\"_type\":\"node\"").unwrap();
        assert_eq!(client.lines.len(), 0);
        assert_eq!(client.partial, "{\"_type\":\"node\"");

        // Supplying the rest plus a newline promotes it to a full line.
        client.write_all(b",\"id\":\"1\"}\n").unwrap();
        assert_eq!(client.lines.len(), 1);
        assert!(client.partial.is_empty());
    }
}