datasynth_runtime/
stream_client.rs1use std::io::{self, Write};
8use std::time::Duration;
9
10use reqwest::blocking::Client;
11use tracing::{debug, warn};
12
/// Settings for a [`StreamClient`]: where batches are sent, how many lines
/// make up a batch, and how long / how often delivery is attempted.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// URL that every batch is POSTed to.
    pub target_url: String,
    /// Number of complete lines that makes up a full batch.
    pub batch_size: usize,
    /// Timeout applied to each HTTP request, in seconds.
    pub timeout_secs: u64,
    /// Optional key sent in the `X-API-Key` request header.
    pub api_key: Option<String>,
    /// Maximum number of retries for a batch whose delivery failed.
    pub max_retries: u32,
}

/// Lines per batch used by [`StreamConfig::default`].
const DEFAULT_BATCH_SIZE: usize = 1000;
/// Request timeout (seconds) used by [`StreamConfig::default`].
const DEFAULT_TIMEOUT_SECS: u64 = 30;
/// Retry budget used by [`StreamConfig::default`].
const DEFAULT_MAX_RETRIES: u32 = 3;

impl Default for StreamConfig {
    /// An empty target URL, 1000-line batches, a 30 second timeout, no API key
    /// and at most 3 retries.
    fn default() -> Self {
        StreamConfig {
            target_url: String::default(),
            batch_size: DEFAULT_BATCH_SIZE,
            timeout_secs: DEFAULT_TIMEOUT_SECS,
            api_key: None,
            max_retries: DEFAULT_MAX_RETRIES,
        }
    }
}
39
40pub struct StreamClient {
45 config: StreamConfig,
46 client: Client,
47 lines: Vec<String>,
49 partial: String,
51 total_sent: usize,
53}
54
55impl StreamClient {
56 pub fn new(config: StreamConfig) -> io::Result<Self> {
58 let client = Client::builder()
59 .timeout(Duration::from_secs(config.timeout_secs))
60 .build()
61 .map_err(io::Error::other)?;
62
63 Ok(Self {
64 config,
65 client,
66 lines: Vec::new(),
67 partial: String::new(),
68 total_sent: 0,
69 })
70 }
71
72 pub fn total_sent(&self) -> usize {
74 self.total_sent
75 }
76
77 fn send_batch(&mut self) -> io::Result<()> {
79 if self.lines.is_empty() {
80 return Ok(());
81 }
82
83 let payload = self.lines.join("");
84 let batch_len = self.lines.len();
85
86 for attempt in 0..=self.config.max_retries {
87 let mut request = self
88 .client
89 .post(&self.config.target_url)
90 .header("Content-Type", "application/x-ndjson")
91 .body(payload.clone());
92
93 if let Some(ref key) = self.config.api_key {
94 request = request.header("X-API-Key", key);
95 }
96
97 match request.send() {
98 Ok(response) if response.status().is_success() => {
99 debug!(
100 "Streamed batch of {} lines (total: {})",
101 batch_len,
102 self.total_sent + batch_len
103 );
104 self.total_sent += batch_len;
105 self.lines.clear();
106 return Ok(());
107 }
108 Ok(response) => {
109 let status = response.status();
110 if attempt < self.config.max_retries {
111 warn!(
112 "Stream batch failed (HTTP {}), retry {}/{}",
113 status,
114 attempt + 1,
115 self.config.max_retries
116 );
117 std::thread::sleep(Duration::from_millis(500 * (attempt as u64 + 1)));
118 } else {
119 return Err(io::Error::other(format!(
120 "Stream batch failed after {} retries (HTTP {})",
121 self.config.max_retries, status
122 )));
123 }
124 }
125 Err(e) => {
126 if attempt < self.config.max_retries {
127 warn!(
128 "Stream batch error: {}, retry {}/{}",
129 e,
130 attempt + 1,
131 self.config.max_retries
132 );
133 std::thread::sleep(Duration::from_millis(500 * (attempt as u64 + 1)));
134 } else {
135 return Err(io::Error::other(format!(
136 "Stream batch failed after {} retries: {}",
137 self.config.max_retries, e
138 )));
139 }
140 }
141 }
142 }
143
144 Ok(())
145 }
146}
147
148impl Write for StreamClient {
149 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
150 let s =
151 std::str::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
152
153 self.partial.push_str(s);
154
155 while let Some(pos) = self.partial.find('\n') {
157 let line = self.partial[..=pos].to_string();
158 self.partial = self.partial[pos + 1..].to_string();
159 self.lines.push(line);
160
161 if self.lines.len() >= self.config.batch_size {
163 self.send_batch()?;
164 }
165 }
166
167 Ok(buf.len())
168 }
169
170 fn flush(&mut self) -> io::Result<()> {
171 if !self.partial.is_empty() {
173 let mut line = std::mem::take(&mut self.partial);
174 if !line.ends_with('\n') {
175 line.push('\n');
176 }
177 self.lines.push(line);
178 }
179
180 self.send_batch()
182 }
183}
184
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a client aimed at a local URL. The batch size is large enough
    /// that the handful of lines written here never triggers a request.
    fn buffering_client() -> StreamClient {
        let settings = StreamConfig {
            target_url: "http://localhost:9999/ingest".to_string(),
            batch_size: 10000,
            ..Default::default()
        };
        StreamClient::new(settings).unwrap()
    }

    #[test]
    fn test_stream_config_default() {
        let defaults = StreamConfig::default();
        assert_eq!(defaults.batch_size, 1000);
        assert_eq!(defaults.timeout_secs, 30);
        assert_eq!(defaults.max_retries, 3);
        assert!(defaults.api_key.is_none());
    }

    #[test]
    fn test_line_buffering() {
        let mut sink = buffering_client();
        let records: [&[u8]; 2] = [
            b"{\"_type\":\"node\",\"id\":\"1\"}\n",
            b"{\"_type\":\"node\",\"id\":\"2\"}\n",
        ];
        for record in records {
            sink.write_all(record).unwrap();
        }

        assert_eq!(sink.lines.len(), 2);
        assert!(sink.partial.is_empty());
        assert_eq!(sink.total_sent, 0);
    }

    #[test]
    fn test_partial_line_handling() {
        let mut sink = buffering_client();

        // First half of a record: nothing complete yet.
        sink.write_all(b"{\"_type\":\"node\"").unwrap();
        assert!(sink.lines.is_empty());
        assert_eq!(sink.partial, "{\"_type\":\"node\"");

        // Second half completes the line.
        sink.write_all(b",\"id\":\"1\"}\n").unwrap();
        assert_eq!(sink.lines.len(), 1);
        assert!(sink.partial.is_empty());
    }
}