1use std::path::PathBuf;
8use std::sync::{Arc, Mutex};
9use std::time::Instant;
10
/// A sink that receives generated items phase by phase.
///
/// Implementations must be thread-safe (`Send + Sync`); all methods take
/// `&self` so a single sink can be shared across producer threads.
pub trait PhaseSink: Send + Sync {
    /// Emit a single item produced during `phase`.
    ///
    /// `item_type` names the kind of record (e.g. `"JournalEntry"`) and
    /// `item` carries the payload as JSON.
    fn emit(
        &self,
        phase: &str,
        item_type: &str,
        item: &serde_json::Value,
    ) -> Result<(), StreamError>;

    /// Signal that all items for `phase` have been emitted.
    fn phase_complete(&self, phase: &str) -> Result<(), StreamError>;

    /// Flush any buffered output to the underlying target.
    fn flush(&self) -> Result<(), StreamError>;

    /// Snapshot of cumulative streaming statistics.
    fn stats(&self) -> StreamStats;
}
30
/// Cumulative counters describing what a sink has streamed so far.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    // Number of items successfully emitted.
    pub items_emitted: u64,
    // Total bytes written (JSON payload plus one newline per item).
    pub bytes_sent: u64,
    // Number of errors encountered while streaming.
    // NOTE(review): no code visible in this file increments this field.
    pub errors: u64,
    // Number of phases reported complete via `phase_complete`.
    pub phases_completed: u64,
}
43
/// Errors that can occur while streaming items to a sink.
#[derive(Debug, thiserror::Error)]
pub enum StreamError {
    /// Underlying I/O failure (file creation, write, flush, ...).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// The item could not be serialized to JSON.
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// A connection-level failure.
    /// NOTE(review): not constructed anywhere in this file — presumably
    /// used by an HTTP transport implemented elsewhere; confirm.
    #[error("Connection error: {0}")]
    Connection(String),

    /// The sink's buffer is full and the backpressure strategy rejected
    /// the item. NOTE(review): also not constructed in this file.
    #[error("Backpressure: buffer full")]
    BackpressureFull,
}
63
/// Destination for streamed items.
#[derive(Debug, Clone)]
pub enum StreamTarget {
    /// Stream to an HTTP endpoint.
    /// NOTE(review): `StreamPipeline::new` creates no writer for this
    /// variant — the HTTP transport is presumably implemented elsewhere
    /// or not yet wired up; confirm before relying on it.
    Http {
        // Endpoint URL.
        url: String,
        // Optional API key for authentication.
        api_key: Option<String>,
        // Number of items to batch per request.
        batch_size: usize,
    },
    /// Write JSON Lines to a file at `path` (created/truncated on open).
    File {
        path: PathBuf,
    },
    /// Discard everything — streaming disabled.
    None,
}
84
/// How a sink reacts when it cannot keep up with producers.
/// NOTE(review): declared but not consumed by any code visible in this
/// file — verify callers elsewhere before changing.
#[derive(Debug, Clone, Default)]
pub enum BackpressureStrategy {
    /// Block the producer until capacity is available (default).
    #[default]
    Block,
    /// Drop the oldest buffered item to make room for the new one.
    DropOldest,
    /// Buffer up to `max_items` items before applying backpressure.
    Buffer {
        max_items: usize,
    },
}
99
/// A [`PhaseSink`] that wraps each item in a JSON envelope and writes it
/// as one JSONL line to the configured [`StreamTarget`].
pub struct StreamPipeline {
    // Where emitted items go; `StreamTarget::None` makes every call a no-op.
    target: StreamTarget,
    // Shared cumulative counters, updated on each emit/phase_complete.
    stats: Arc<Mutex<StreamStats>>,
    // Output writer; `None` for `Http` and `None` targets (see `new`).
    writer: Mutex<Option<Box<dyn std::io::Write + Send>>>,
}
106
107impl StreamPipeline {
108 pub fn new(target: StreamTarget) -> Result<Self, StreamError> {
110 let writer: Option<Box<dyn std::io::Write + Send>> = match &target {
111 StreamTarget::File { path } => {
112 let file = std::fs::File::create(path)?;
113 Some(Box::new(std::io::BufWriter::new(file)))
114 }
115 StreamTarget::Http { .. } => None,
116 StreamTarget::None => None,
117 };
118 Ok(Self {
119 target,
120 stats: Arc::new(Mutex::new(StreamStats::default())),
121 writer: Mutex::new(writer),
122 })
123 }
124
125 pub fn none() -> Self {
127 Self {
128 target: StreamTarget::None,
129 stats: Arc::new(Mutex::new(StreamStats::default())),
130 writer: Mutex::new(None),
131 }
132 }
133
134 pub fn is_active(&self) -> bool {
136 !matches!(self.target, StreamTarget::None)
137 }
138}
139
140impl PhaseSink for StreamPipeline {
141 fn emit(
142 &self,
143 phase: &str,
144 item_type: &str,
145 item: &serde_json::Value,
146 ) -> Result<(), StreamError> {
147 if !self.is_active() {
148 return Ok(());
149 }
150
151 let envelope = serde_json::json!({
152 "phase": phase,
153 "item_type": item_type,
154 "data": item,
155 });
156 let json = serde_json::to_string(&envelope)
157 .map_err(|e| StreamError::Serialization(e.to_string()))?;
158 let bytes = json.len() as u64 + 1; if let Ok(mut writer_guard) = self.writer.lock() {
161 if let Some(writer) = writer_guard.as_mut() {
162 use std::io::Write;
163 writeln!(writer, "{json}")?;
164 }
165 }
166
167 if let Ok(mut stats) = self.stats.lock() {
168 stats.items_emitted += 1;
169 stats.bytes_sent += bytes;
170 }
171 Ok(())
172 }
173
174 fn phase_complete(&self, _phase: &str) -> Result<(), StreamError> {
175 if let Ok(mut stats) = self.stats.lock() {
176 stats.phases_completed += 1;
177 }
178 self.flush()
179 }
180
181 fn flush(&self) -> Result<(), StreamError> {
182 if let Ok(mut writer_guard) = self.writer.lock() {
183 if let Some(writer) = writer_guard.as_mut() {
184 use std::io::Write;
185 writer.flush()?;
186 }
187 }
188 Ok(())
189 }
190
191 fn stats(&self) -> StreamStats {
192 self.stats.lock().map(|s| s.clone()).unwrap_or_default()
193 }
194}
195
/// Decorator around a [`PhaseSink`] that throttles emission rate and
/// periodically interleaves `_progress` events into the stream.
pub struct RateLimitedPipeline {
    // The wrapped sink that actually receives items.
    inner: Box<dyn PhaseSink>,
    // Rate limiter guarding `emit`; locked for the duration of `acquire`.
    limiter: Mutex<datasynth_core::rate_limit::RateLimiter>,
    // Monotonic count of items emitted through this wrapper
    // (excludes the synthetic `_progress` records).
    sequence: std::sync::atomic::AtomicU64,
    // Emit a progress event every N items; 0 disables progress events.
    progress_interval: u64,
    // Construction time, used to compute the actual emission rate.
    start_time: Instant,
}
217
218impl RateLimitedPipeline {
219 pub fn new(
225 inner: Box<dyn PhaseSink>,
226 events_per_second: f64,
227 burst_size: u32,
228 progress_interval: u64,
229 ) -> Self {
230 let config = if events_per_second > 0.0 {
231 datasynth_core::rate_limit::RateLimitConfig {
232 entities_per_second: events_per_second,
233 burst_size,
234 backpressure: datasynth_core::rate_limit::RateLimitBackpressure::Block,
235 enabled: true,
236 }
237 } else {
238 datasynth_core::rate_limit::RateLimitConfig {
239 enabled: false,
240 ..Default::default()
241 }
242 };
243
244 Self {
245 inner,
246 limiter: Mutex::new(datasynth_core::rate_limit::RateLimiter::new(config)),
247 sequence: std::sync::atomic::AtomicU64::new(0),
248 progress_interval,
249 start_time: Instant::now(),
250 }
251 }
252
253 pub fn set_rate(&self, events_per_second: f64) {
255 if let Ok(mut limiter) = self.limiter.lock() {
256 *limiter = datasynth_core::rate_limit::RateLimiter::new(
257 datasynth_core::rate_limit::RateLimitConfig {
258 entities_per_second: events_per_second,
259 burst_size: 100,
260 backpressure: datasynth_core::rate_limit::RateLimitBackpressure::Block,
261 enabled: events_per_second > 0.0,
262 },
263 );
264 }
265 }
266}
267
268impl PhaseSink for RateLimitedPipeline {
269 fn emit(
270 &self,
271 phase: &str,
272 item_type: &str,
273 item: &serde_json::Value,
274 ) -> Result<(), StreamError> {
275 if let Ok(mut limiter) = self.limiter.lock() {
277 limiter.acquire();
278 }
279
280 let seq = self
281 .sequence
282 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
283
284 self.inner.emit(phase, item_type, item)?;
286
287 if self.progress_interval > 0 && seq > 0 && seq.is_multiple_of(self.progress_interval) {
289 let elapsed = self.start_time.elapsed();
290 let rate = if elapsed.as_secs_f64() > 0.0 {
291 seq as f64 / elapsed.as_secs_f64()
292 } else {
293 0.0
294 };
295 let progress = serde_json::json!({
296 "type": "_progress",
297 "items_emitted": seq,
298 "rate_actual": (rate * 100.0).round() / 100.0,
299 "elapsed_ms": elapsed.as_millis() as u64,
300 });
301 self.inner.emit("_progress", "StreamProgress", &progress)?;
302 }
303
304 Ok(())
305 }
306
307 fn phase_complete(&self, phase: &str) -> Result<(), StreamError> {
308 self.inner.phase_complete(phase)
309 }
310
311 fn flush(&self) -> Result<(), StreamError> {
312 self.inner.flush()
313 }
314
315 fn stats(&self) -> StreamStats {
316 let mut stats = self.inner.stats();
317 stats.items_emitted = self.sequence.load(std::sync::atomic::Ordering::Relaxed);
318 stats
319 }
320}
321
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    // A `None` target reports the pipeline as inactive.
    #[test]
    fn test_none_pipeline_is_inactive() {
        let pipeline = StreamPipeline::none();
        assert!(!pipeline.is_active());
    }

    // Emitting through a disabled pipeline must not touch the stats.
    #[test]
    fn test_none_pipeline_emit_is_noop() {
        let pipeline = StreamPipeline::none();
        let item = serde_json::json!({"id": "noop"});
        pipeline.emit("phase", "Type", &item).unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 0);
    }

    // A file target writes a JSONL envelope containing phase, item type,
    // and payload.
    #[test]
    fn test_file_pipeline_writes_jsonl() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_writes.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        assert!(pipeline.is_active());
        let item = serde_json::json!({"id": "test-001", "amount": 100.0});
        pipeline
            .emit("journal_entries", "JournalEntry", &item)
            .unwrap();
        pipeline.flush().unwrap();
        let content = std::fs::read_to_string(&tmp).unwrap();
        assert!(content.contains("test-001"));
        assert!(content.contains("journal_entries"));
        assert!(content.contains("JournalEntry"));
        let _ = std::fs::remove_file(&tmp);
    }

    // Counters track emissions, completed phases, and bytes written.
    #[test]
    fn test_stats_increment() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_stats.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item = serde_json::json!({"id": 1});
        pipeline.emit("phase1", "Item", &item).unwrap();
        pipeline.emit("phase1", "Item", &item).unwrap();
        pipeline.phase_complete("phase1").unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 1);
        assert!(stats.bytes_sent > 0);
        let _ = std::fs::remove_file(&tmp);
    }

    // Phase completions accumulate across multiple phases.
    #[test]
    fn test_multiple_phases() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_phases.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item = serde_json::json!({"id": 1});
        pipeline.emit("phase1", "A", &item).unwrap();
        pipeline.phase_complete("phase1").unwrap();
        pipeline.emit("phase2", "B", &item).unwrap();
        pipeline.phase_complete("phase2").unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 2);
        let _ = std::fs::remove_file(&tmp);
    }

    // Every line of the output file parses as JSON and carries the
    // expected envelope keys.
    #[test]
    fn test_file_output_is_valid_jsonl() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_valid_jsonl.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item1 = serde_json::json!({"id": "a"});
        let item2 = serde_json::json!({"id": "b"});
        pipeline.emit("p", "T", &item1).unwrap();
        pipeline.emit("p", "T", &item2).unwrap();
        pipeline.flush().unwrap();
        let content = std::fs::read_to_string(&tmp).unwrap();
        for line in content.lines() {
            let parsed: serde_json::Value =
                serde_json::from_str(line).expect("each line should be valid JSON");
            assert!(parsed.get("phase").is_some());
            assert!(parsed.get("item_type").is_some());
            assert!(parsed.get("data").is_some());
        }
        let _ = std::fs::remove_file(&tmp);
    }

    // The default backpressure strategy is Block.
    #[test]
    fn test_backpressure_strategy_default() {
        let strategy = BackpressureStrategy::default();
        assert!(matches!(strategy, BackpressureStrategy::Block));
    }

    // In-memory sink that records every call, for testing decorators.
    pub struct MockPhaseSink {
        // (phase, item_type, payload) triples in emission order.
        pub items: Mutex<Vec<(String, String, serde_json::Value)>>,
        // Phases reported complete, in order.
        pub completed_phases: Mutex<Vec<String>>,
        // Set to true once flush() has been called.
        pub flushed: Mutex<bool>,
    }

    impl MockPhaseSink {
        // Fresh mock with empty recordings.
        pub fn new() -> Self {
            Self {
                items: Mutex::new(Vec::new()),
                completed_phases: Mutex::new(Vec::new()),
                flushed: Mutex::new(false),
            }
        }
    }

    impl PhaseSink for MockPhaseSink {
        // Record the call; never fails.
        fn emit(
            &self,
            phase: &str,
            item_type: &str,
            item: &serde_json::Value,
        ) -> Result<(), StreamError> {
            self.items.lock().unwrap().push((
                phase.to_string(),
                item_type.to_string(),
                item.clone(),
            ));
            Ok(())
        }

        fn phase_complete(&self, phase: &str) -> Result<(), StreamError> {
            self.completed_phases
                .lock()
                .unwrap()
                .push(phase.to_string());
            Ok(())
        }

        fn flush(&self) -> Result<(), StreamError> {
            *self.flushed.lock().unwrap() = true;
            Ok(())
        }

        // Stats derived from the recorded calls; bytes/errors not tracked.
        fn stats(&self) -> StreamStats {
            let items = self.items.lock().unwrap();
            let phases = self.completed_phases.lock().unwrap();
            StreamStats {
                items_emitted: items.len() as u64,
                phases_completed: phases.len() as u64,
                bytes_sent: 0,
                errors: 0,
            }
        }
    }

    // The mock faithfully records phase, type, and payload of each emit.
    #[test]
    fn test_mock_phase_sink_records_emissions() {
        let mock = MockPhaseSink::new();
        let item1 = serde_json::json!({"id": "V001", "name": "Acme Corp"});
        let item2 = serde_json::json!({"id": "V002", "name": "Global Parts"});
        mock.emit("master_data", "Vendor", &item1).unwrap();
        mock.emit("master_data", "Vendor", &item2).unwrap();
        mock.phase_complete("master_data").unwrap();

        let items = mock.items.lock().unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].0, "master_data");
        assert_eq!(items[0].1, "Vendor");
        assert_eq!(items[1].2["name"], "Global Parts");

        let phases = mock.completed_phases.lock().unwrap();
        assert_eq!(phases.len(), 1);
        assert_eq!(phases[0], "master_data");
    }

    // Emissions across several phases are recorded in order.
    #[test]
    fn test_mock_phase_sink_multi_phase_emission() {
        let mock = MockPhaseSink::new();
        let je = serde_json::json!({"entry_id": "JE-001"});
        let anomaly = serde_json::json!({"label": "DuplicateEntry"});

        mock.emit("journal_entries", "JournalEntry", &je).unwrap();
        mock.phase_complete("journal_entries").unwrap();
        mock.emit("anomaly_injection", "LabeledAnomaly", &anomaly)
            .unwrap();
        mock.phase_complete("anomaly_injection").unwrap();
        mock.flush().unwrap();

        let stats = mock.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 2);
        assert!(*mock.flushed.lock().unwrap());

        let items = mock.items.lock().unwrap();
        assert_eq!(items[0].0, "journal_entries");
        assert_eq!(items[1].0, "anomaly_injection");
    }

    // With throttling disabled, the wrapper still counts each emission.
    #[test]
    fn test_rate_limited_pipeline_emits_and_tracks_sequence() {
        let mock = MockPhaseSink::new();
        let pipeline = RateLimitedPipeline::new(
            Box::new(mock),
            0.0, // events_per_second: 0 disables throttling
            100, // burst_size
            0,   // progress_interval: 0 disables progress events
        );
        let item = serde_json::json!({"id": "test"});
        pipeline.emit("phase", "Type", &item).unwrap();
        pipeline.emit("phase", "Type", &item).unwrap();
        pipeline.emit("phase", "Type", &item).unwrap();

        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 3);
    }

    // Progress records do not inflate the wrapper's item count.
    #[test]
    fn test_rate_limited_pipeline_emits_progress() {
        let mock = MockPhaseSink::new();
        let pipeline = RateLimitedPipeline::new(
            Box::new(mock),
            0.0, // events_per_second: 0 disables throttling
            100, // burst_size
            5,   // progress_interval: progress event every 5 items
        );
        let item = serde_json::json!({"id": "test"});
        for _ in 0..10 {
            pipeline.emit("phase", "Type", &item).unwrap();
        }

        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 10);
    }

    // With a 100/s limit and burst 10, 15 items must take measurable time.
    #[test]
    fn test_rate_limited_pipeline_respects_rate() {
        let mock = MockPhaseSink::new();
        let pipeline = RateLimitedPipeline::new(
            Box::new(mock),
            100.0, // events_per_second
            10,    // burst_size
            0,
        );
        let item = serde_json::json!({"id": "test"});
        let start = Instant::now();
        for _ in 0..15 {
            pipeline.emit("phase", "Type", &item).unwrap();
        }
        let elapsed = start.elapsed();
        assert!(
            elapsed.as_millis() >= 30,
            "expected rate limiting, got {:?}",
            elapsed
        );
    }

    // set_rate can enable throttling after construction without
    // disturbing the sequence counter.
    #[test]
    fn test_rate_limited_pipeline_dynamic_rate_change() {
        let mock = MockPhaseSink::new();
        let pipeline = RateLimitedPipeline::new(
            Box::new(mock),
            0.0, // events_per_second: start unthrottled
            100,
            0,
        );
        let item = serde_json::json!({"id": "test"});
        pipeline.emit("phase", "Type", &item).unwrap();

        pipeline.set_rate(50.0);

        pipeline.emit("phase", "Type", &item).unwrap();
        assert_eq!(pipeline.stats().items_emitted, 2);
    }
}