1use std::path::PathBuf;
8use std::sync::{Arc, Mutex};
9
10pub trait PhaseSink: Send + Sync {
12 fn emit(
14 &self,
15 phase: &str,
16 item_type: &str,
17 item: &serde_json::Value,
18 ) -> Result<(), StreamError>;
19
20 fn phase_complete(&self, phase: &str) -> Result<(), StreamError>;
22
23 fn flush(&self) -> Result<(), StreamError>;
25
26 fn stats(&self) -> StreamStats;
28}
29
/// Running counters reported by a [`PhaseSink`].
///
/// `Default` yields all-zero counters, which is the starting state of a new sink.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Number of items the sink has counted as emitted.
    pub items_emitted: u64,
    /// Number of bytes the sink has counted as sent.
    pub bytes_sent: u64,
    /// Number of failures the sink has recorded.
    pub errors: u64,
    /// Number of completed phases the sink has recorded.
    pub phases_completed: u64,
}
42
43#[derive(Debug, thiserror::Error)]
45pub enum StreamError {
46 #[error("IO error: {0}")]
48 Io(#[from] std::io::Error),
49
50 #[error("Serialization error: {0}")]
52 Serialization(String),
53
54 #[error("Connection error: {0}")]
56 Connection(String),
57
58 #[error("Backpressure: buffer full")]
60 BackpressureFull,
61}
62
/// Where a [`StreamPipeline`] sends its output.
///
/// `Debug` is implemented by hand rather than derived so that `api_key` is
/// never printed. A derived impl would leak the credential into any log line
/// that formats a target with `{:?}`.
#[derive(Clone)]
pub enum StreamTarget {
    /// An HTTP endpoint.
    Http {
        /// Endpoint URL.
        url: String,
        /// Optional credential for the endpoint; redacted in `Debug` output.
        api_key: Option<String>,
        /// Batch size setting for this endpoint.
        /// NOTE(review): not consumed by the pipeline code in this module.
        batch_size: usize,
    },
    /// A local file, written as newline-delimited JSON.
    File {
        /// Path of the output file.
        path: PathBuf,
    },
    /// No output; the pipeline is inactive.
    None,
}

impl std::fmt::Debug for StreamTarget {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Http {
                url,
                api_key,
                batch_size,
            } => f
                .debug_struct("Http")
                .field("url", url)
                // Only reveal whether a key is present, never its value.
                .field("api_key", &api_key.as_ref().map(|_| "<redacted>"))
                .field("batch_size", batch_size)
                .finish(),
            Self::File { path } => f.debug_struct("File").field("path", path).finish(),
            Self::None => f.write_str("None"),
        }
    }
}
83
/// Policy for what a sink should do when it cannot keep up with emitted items.
///
/// NOTE(review): `StreamPipeline` in this module holds no strategy and does not
/// consult this type; the variant descriptions below reflect intent suggested by
/// the names and should be confirmed against the consumer.
#[derive(Debug, Clone, Default)]
pub enum BackpressureStrategy {
    /// Wait for capacity. This is the `Default`.
    #[default]
    Block,
    /// Discard the oldest pending items to make room.
    DropOldest,
    /// Buffer pending items up to a fixed limit.
    Buffer {
        /// Maximum number of items to hold.
        max_items: usize,
    },
}
98
99pub struct StreamPipeline {
101 target: StreamTarget,
102 stats: Arc<Mutex<StreamStats>>,
103 writer: Mutex<Option<Box<dyn std::io::Write + Send>>>,
104}
105
106impl StreamPipeline {
107 pub fn new(target: StreamTarget) -> Result<Self, StreamError> {
109 let writer: Option<Box<dyn std::io::Write + Send>> = match &target {
110 StreamTarget::File { path } => {
111 let file = std::fs::File::create(path)?;
112 Some(Box::new(std::io::BufWriter::new(file)))
113 }
114 StreamTarget::Http { .. } => None,
115 StreamTarget::None => None,
116 };
117 Ok(Self {
118 target,
119 stats: Arc::new(Mutex::new(StreamStats::default())),
120 writer: Mutex::new(writer),
121 })
122 }
123
124 pub fn none() -> Self {
126 Self {
127 target: StreamTarget::None,
128 stats: Arc::new(Mutex::new(StreamStats::default())),
129 writer: Mutex::new(None),
130 }
131 }
132
133 pub fn is_active(&self) -> bool {
135 !matches!(self.target, StreamTarget::None)
136 }
137}
138
139impl PhaseSink for StreamPipeline {
140 fn emit(
141 &self,
142 phase: &str,
143 item_type: &str,
144 item: &serde_json::Value,
145 ) -> Result<(), StreamError> {
146 if !self.is_active() {
147 return Ok(());
148 }
149
150 let envelope = serde_json::json!({
151 "phase": phase,
152 "item_type": item_type,
153 "data": item,
154 });
155 let json = serde_json::to_string(&envelope)
156 .map_err(|e| StreamError::Serialization(e.to_string()))?;
157 let bytes = json.len() as u64 + 1; if let Ok(mut writer_guard) = self.writer.lock() {
160 if let Some(writer) = writer_guard.as_mut() {
161 use std::io::Write;
162 writeln!(writer, "{}", json)?;
163 }
164 }
165
166 if let Ok(mut stats) = self.stats.lock() {
167 stats.items_emitted += 1;
168 stats.bytes_sent += bytes;
169 }
170 Ok(())
171 }
172
173 fn phase_complete(&self, _phase: &str) -> Result<(), StreamError> {
174 if let Ok(mut stats) = self.stats.lock() {
175 stats.phases_completed += 1;
176 }
177 self.flush()
178 }
179
180 fn flush(&self) -> Result<(), StreamError> {
181 if let Ok(mut writer_guard) = self.writer.lock() {
182 if let Some(writer) = writer_guard.as_mut() {
183 use std::io::Write;
184 writer.flush()?;
185 }
186 }
187 Ok(())
188 }
189
190 fn stats(&self) -> StreamStats {
191 self.stats.lock().map(|s| s.clone()).unwrap_or_default()
192 }
193}
194
#[cfg(test)]
mod tests {
    use super::*;

    /// A temporary `.jsonl` path that is deleted when the guard is dropped.
    ///
    /// Files are therefore cleaned up even when an assertion panics mid-test.
    /// The process id in the file name keeps concurrent test processes from
    /// clobbering each other's files in the shared temp directory.
    struct TempJsonl(PathBuf);

    impl TempJsonl {
        fn new(stem: &str) -> Self {
            let name = format!("{stem}_{}.jsonl", std::process::id());
            Self(std::env::temp_dir().join(name))
        }

        fn path(&self) -> PathBuf {
            self.0.clone()
        }
    }

    impl Drop for TempJsonl {
        fn drop(&mut self) {
            // Best-effort cleanup: the file may never have been created.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    // In every file-backed test below, the `TempJsonl` guard is declared before
    // the pipeline. Locals drop in reverse order, so the pipeline (and its
    // buffered writer) is dropped before the file is removed.

    #[test]
    fn test_none_pipeline_is_inactive() {
        let pipeline = StreamPipeline::none();
        assert!(!pipeline.is_active());
    }

    #[test]
    fn test_none_pipeline_emit_is_noop() {
        let pipeline = StreamPipeline::none();
        let item = serde_json::json!({"id": "noop"});
        pipeline.emit("phase", "Type", &item).unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 0);
    }

    #[test]
    fn test_file_pipeline_writes_jsonl() {
        let tmp = TempJsonl::new("test_stream_pipeline_writes");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.path() }).unwrap();
        assert!(pipeline.is_active());
        let item = serde_json::json!({"id": "test-001", "amount": 100.0});
        pipeline
            .emit("journal_entries", "JournalEntry", &item)
            .unwrap();
        pipeline.flush().unwrap();
        let content = std::fs::read_to_string(tmp.path()).unwrap();
        assert!(content.contains("test-001"));
        assert!(content.contains("journal_entries"));
        assert!(content.contains("JournalEntry"));
    }

    #[test]
    fn test_stats_increment() {
        let tmp = TempJsonl::new("test_stream_pipeline_stats");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.path() }).unwrap();
        let item = serde_json::json!({"id": 1});
        pipeline.emit("phase1", "Item", &item).unwrap();
        pipeline.emit("phase1", "Item", &item).unwrap();
        pipeline.phase_complete("phase1").unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 1);
        assert!(stats.bytes_sent > 0);
        // `phase_complete` flushed, so every counted byte must now be on disk.
        let on_disk = std::fs::metadata(tmp.path()).unwrap().len();
        assert_eq!(stats.bytes_sent, on_disk);
    }

    #[test]
    fn test_multiple_phases() {
        let tmp = TempJsonl::new("test_stream_pipeline_phases");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.path() }).unwrap();
        let item = serde_json::json!({"id": 1});
        pipeline.emit("phase1", "A", &item).unwrap();
        pipeline.phase_complete("phase1").unwrap();
        pipeline.emit("phase2", "B", &item).unwrap();
        pipeline.phase_complete("phase2").unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 2);
    }

    #[test]
    fn test_file_output_is_valid_jsonl() {
        let tmp = TempJsonl::new("test_stream_pipeline_valid_jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.path() }).unwrap();
        let item1 = serde_json::json!({"id": "a"});
        let item2 = serde_json::json!({"id": "b"});
        pipeline.emit("p", "T", &item1).unwrap();
        pipeline.emit("p", "T", &item2).unwrap();
        pipeline.flush().unwrap();
        let content = std::fs::read_to_string(tmp.path()).unwrap();
        let lines: Vec<&str> = content.lines().collect();
        // Guard against a vacuous pass: an empty file would skip the loop entirely.
        assert_eq!(lines.len(), 2);
        for (line, expected_id) in lines.iter().zip(["a", "b"]) {
            let parsed: serde_json::Value =
                serde_json::from_str(line).expect("each line should be valid JSON");
            assert!(parsed.get("phase").is_some());
            assert!(parsed.get("item_type").is_some());
            assert!(parsed.get("data").is_some());
            assert_eq!(parsed["data"]["id"], expected_id);
        }
    }

    #[test]
    fn test_backpressure_strategy_default() {
        let strategy = BackpressureStrategy::default();
        assert!(matches!(strategy, BackpressureStrategy::Block));
    }

    /// In-memory [`PhaseSink`] that records every call for later inspection.
    pub struct MockPhaseSink {
        pub items: Mutex<Vec<(String, String, serde_json::Value)>>,
        pub completed_phases: Mutex<Vec<String>>,
        pub flushed: Mutex<bool>,
    }

    impl MockPhaseSink {
        pub fn new() -> Self {
            Self {
                items: Mutex::new(Vec::new()),
                completed_phases: Mutex::new(Vec::new()),
                flushed: Mutex::new(false),
            }
        }
    }

    impl PhaseSink for MockPhaseSink {
        fn emit(
            &self,
            phase: &str,
            item_type: &str,
            item: &serde_json::Value,
        ) -> Result<(), StreamError> {
            self.items.lock().unwrap().push((
                phase.to_string(),
                item_type.to_string(),
                item.clone(),
            ));
            Ok(())
        }

        fn phase_complete(&self, phase: &str) -> Result<(), StreamError> {
            self.completed_phases
                .lock()
                .unwrap()
                .push(phase.to_string());
            Ok(())
        }

        fn flush(&self) -> Result<(), StreamError> {
            *self.flushed.lock().unwrap() = true;
            Ok(())
        }

        fn stats(&self) -> StreamStats {
            let items = self.items.lock().unwrap();
            let phases = self.completed_phases.lock().unwrap();
            StreamStats {
                items_emitted: items.len() as u64,
                phases_completed: phases.len() as u64,
                bytes_sent: 0,
                errors: 0,
            }
        }
    }

    #[test]
    fn test_mock_phase_sink_records_emissions() {
        let mock = MockPhaseSink::new();
        let item1 = serde_json::json!({"id": "V001", "name": "Acme Corp"});
        let item2 = serde_json::json!({"id": "V002", "name": "Global Parts"});
        mock.emit("master_data", "Vendor", &item1).unwrap();
        mock.emit("master_data", "Vendor", &item2).unwrap();
        mock.phase_complete("master_data").unwrap();

        let items = mock.items.lock().unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].0, "master_data");
        assert_eq!(items[0].1, "Vendor");
        assert_eq!(items[1].2["name"], "Global Parts");

        let phases = mock.completed_phases.lock().unwrap();
        assert_eq!(phases.len(), 1);
        assert_eq!(phases[0], "master_data");
    }

    #[test]
    fn test_mock_phase_sink_multi_phase_emission() {
        let mock = MockPhaseSink::new();
        let je = serde_json::json!({"entry_id": "JE-001"});
        let anomaly = serde_json::json!({"label": "DuplicateEntry"});

        mock.emit("journal_entries", "JournalEntry", &je).unwrap();
        mock.phase_complete("journal_entries").unwrap();
        mock.emit("anomaly_injection", "LabeledAnomaly", &anomaly)
            .unwrap();
        mock.phase_complete("anomaly_injection").unwrap();
        mock.flush().unwrap();

        let stats = mock.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 2);
        assert!(*mock.flushed.lock().unwrap());

        let items = mock.items.lock().unwrap();
        assert_eq!(items[0].0, "journal_entries");
        assert_eq!(items[1].0, "anomaly_injection");
    }
}