Skip to main content

datasynth_runtime/
stream_pipeline.rs

1//! Phase-aware streaming pipeline for real-time data emission.
2//!
3//! [`StreamPipeline`] implements the [`PhaseSink`] trait, allowing generated
4//! data to be streamed to files or HTTP endpoints as it is produced, rather
5//! than buffering everything in memory.
6
7use std::path::PathBuf;
8use std::sync::{Arc, Mutex};
9use std::time::Instant;
10
11/// Trait for sinks that receive generated items phase-by-phase.
12pub trait PhaseSink: Send + Sync {
13    /// Emit a single generated item.
14    fn emit(
15        &self,
16        phase: &str,
17        item_type: &str,
18        item: &serde_json::Value,
19    ) -> Result<(), StreamError>;
20
21    /// Signal that a generation phase has completed.
22    fn phase_complete(&self, phase: &str) -> Result<(), StreamError>;
23
24    /// Flush any buffered data to the underlying sink.
25    fn flush(&self) -> Result<(), StreamError>;
26
27    /// Return current streaming statistics.
28    fn stats(&self) -> StreamStats;
29}
30
/// Accumulated statistics for a streaming pipeline.
///
/// Derives `PartialEq`/`Eq` so that snapshots returned by
/// [`PhaseSink::stats`] can be compared directly (e.g. with `assert_eq!`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamStats {
    /// Total items emitted across all phases.
    pub items_emitted: u64,
    /// Total bytes written/sent.
    pub bytes_sent: u64,
    /// Number of errors encountered.
    pub errors: u64,
    /// Number of phases that have completed.
    pub phases_completed: u64,
}
43
44/// Errors that can occur during streaming.
45#[derive(Debug, thiserror::Error)]
46pub enum StreamError {
47    /// An I/O error occurred.
48    #[error("IO error: {0}")]
49    Io(#[from] std::io::Error),
50
51    /// Serialization failed.
52    #[error("Serialization error: {0}")]
53    Serialization(String),
54
55    /// Connection to the remote endpoint failed or was lost.
56    #[error("Connection error: {0}")]
57    Connection(String),
58
59    /// The internal buffer is full and the backpressure strategy is to reject.
60    #[error("Backpressure: buffer full")]
61    BackpressureFull,
62}
63
/// Where the stream sends its data.
///
/// `Debug` is implemented by hand (instead of derived) so that an HTTP API key
/// is never printed verbatim in logs or panic messages.
#[derive(Clone)]
pub enum StreamTarget {
    /// Send JSONL to an HTTP endpoint.
    Http {
        /// Target URL.
        url: String,
        /// Optional API key for authentication (redacted in `Debug` output).
        api_key: Option<String>,
        /// Number of items to batch before sending.
        batch_size: usize,
    },
    /// Write JSONL to a local file.
    ///
    /// `StreamPipeline::new` opens the path with `File::create`, so an
    /// existing file is truncated rather than appended to.
    File {
        /// Path to the output file.
        path: PathBuf,
    },
    /// Discard all output (no-op sink).
    None,
}

impl std::fmt::Debug for StreamTarget {
    /// Formats like the derived impl, except that a present `api_key` is
    /// shown as `Some("<redacted>")`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Http {
                url,
                api_key,
                batch_size,
            } => f
                .debug_struct("Http")
                .field("url", url)
                .field("api_key", &api_key.as_ref().map(|_| "<redacted>"))
                .field("batch_size", batch_size)
                .finish(),
            Self::File { path } => f.debug_struct("File").field("path", path).finish(),
            Self::None => f.write_str("None"),
        }
    }
}
84
/// How a sink should react when it cannot keep up with the producer.
///
/// NOTE(review): nothing in this module consults this setting —
/// `StreamPipeline` writes synchronously inside `emit`; confirm where (or
/// whether) it is consumed.
#[derive(Debug, Clone, Default)]
pub enum BackpressureStrategy {
    /// Make the producer wait until the sink has capacity (the default).
    #[default]
    Block,
    /// Discard the oldest buffered items to free space.
    DropOldest,
    /// Hold up to `max_items` items before back-pressure applies.
    Buffer {
        /// Upper bound on the number of buffered items.
        max_items: usize,
    },
}
99
100/// A streaming pipeline that writes generated data as JSONL envelopes.
101pub struct StreamPipeline {
102    target: StreamTarget,
103    stats: Arc<Mutex<StreamStats>>,
104    writer: Mutex<Option<Box<dyn std::io::Write + Send>>>,
105}
106
107impl StreamPipeline {
108    /// Create a new pipeline for the given target.
109    pub fn new(target: StreamTarget) -> Result<Self, StreamError> {
110        let writer: Option<Box<dyn std::io::Write + Send>> = match &target {
111            StreamTarget::File { path } => {
112                let file = std::fs::File::create(path)?;
113                Some(Box::new(std::io::BufWriter::new(file)))
114            }
115            StreamTarget::Http { .. } => None,
116            StreamTarget::None => None,
117        };
118        Ok(Self {
119            target,
120            stats: Arc::new(Mutex::new(StreamStats::default())),
121            writer: Mutex::new(writer),
122        })
123    }
124
125    /// Create a no-op pipeline that discards all output.
126    pub fn none() -> Self {
127        Self {
128            target: StreamTarget::None,
129            stats: Arc::new(Mutex::new(StreamStats::default())),
130            writer: Mutex::new(None),
131        }
132    }
133
134    /// Returns `true` if this pipeline will actually emit data.
135    pub fn is_active(&self) -> bool {
136        !matches!(self.target, StreamTarget::None)
137    }
138}
139
140impl PhaseSink for StreamPipeline {
141    fn emit(
142        &self,
143        phase: &str,
144        item_type: &str,
145        item: &serde_json::Value,
146    ) -> Result<(), StreamError> {
147        if !self.is_active() {
148            return Ok(());
149        }
150
151        let envelope = serde_json::json!({
152            "phase": phase,
153            "item_type": item_type,
154            "data": item,
155        });
156        let json = serde_json::to_string(&envelope)
157            .map_err(|e| StreamError::Serialization(e.to_string()))?;
158        let bytes = json.len() as u64 + 1; // +1 for newline
159
160        if let Ok(mut writer_guard) = self.writer.lock() {
161            if let Some(writer) = writer_guard.as_mut() {
162                use std::io::Write;
163                writeln!(writer, "{json}")?;
164            }
165        }
166
167        if let Ok(mut stats) = self.stats.lock() {
168            stats.items_emitted += 1;
169            stats.bytes_sent += bytes;
170        }
171        Ok(())
172    }
173
174    fn phase_complete(&self, _phase: &str) -> Result<(), StreamError> {
175        if let Ok(mut stats) = self.stats.lock() {
176            stats.phases_completed += 1;
177        }
178        self.flush()
179    }
180
181    fn flush(&self) -> Result<(), StreamError> {
182        if let Ok(mut writer_guard) = self.writer.lock() {
183            if let Some(writer) = writer_guard.as_mut() {
184                use std::io::Write;
185                writer.flush()?;
186            }
187        }
188        Ok(())
189    }
190
191    fn stats(&self) -> StreamStats {
192        self.stats.lock().map(|s| s.clone()).unwrap_or_default()
193    }
194}
195
196/// A rate-limited wrapper around any [`PhaseSink`].
197///
198/// Uses a token bucket `RateLimiter` to control emission rate. Each call to
199/// `emit()` acquires a token before forwarding to the inner sink, blocking if
200/// the rate is exceeded.
201///
202/// Also adds a monotonic sequence number and optional progress events to the
203/// output stream.
204///
205/// **Thread safety note:** The rate limiter may sleep while holding an internal
206/// mutex. This is correct for single-threaded generation (the current model)
207/// but would serialize all callers if `emit()` is called from multiple threads
208/// concurrently. If multi-threaded emission is needed in the future, the
209/// lock-then-sleep pattern must be restructured.
210pub struct RateLimitedPipeline {
211    inner: Box<dyn PhaseSink>,
212    limiter: Mutex<datasynth_core::rate_limit::RateLimiter>,
213    sequence: std::sync::atomic::AtomicU64,
214    progress_interval: u64,
215    start_time: Instant,
216}
217
218impl RateLimitedPipeline {
219    /// Wrap a `PhaseSink` with rate limiting.
220    ///
221    /// - `events_per_second`: target rate (0 = unlimited)
222    /// - `burst_size`: token bucket burst capacity
223    /// - `progress_interval`: emit a `_progress` event every N items (0 = disabled)
224    pub fn new(
225        inner: Box<dyn PhaseSink>,
226        events_per_second: f64,
227        burst_size: u32,
228        progress_interval: u64,
229    ) -> Self {
230        let config = if events_per_second > 0.0 {
231            datasynth_core::rate_limit::RateLimitConfig {
232                entities_per_second: events_per_second,
233                burst_size,
234                backpressure: datasynth_core::rate_limit::RateLimitBackpressure::Block,
235                enabled: true,
236            }
237        } else {
238            datasynth_core::rate_limit::RateLimitConfig {
239                enabled: false,
240                ..Default::default()
241            }
242        };
243
244        Self {
245            inner,
246            limiter: Mutex::new(datasynth_core::rate_limit::RateLimiter::new(config)),
247            sequence: std::sync::atomic::AtomicU64::new(0),
248            progress_interval,
249            start_time: Instant::now(),
250        }
251    }
252
253    /// Update the rate limit dynamically (e.g., from a REST endpoint).
254    pub fn set_rate(&self, events_per_second: f64) {
255        if let Ok(mut limiter) = self.limiter.lock() {
256            *limiter = datasynth_core::rate_limit::RateLimiter::new(
257                datasynth_core::rate_limit::RateLimitConfig {
258                    entities_per_second: events_per_second,
259                    burst_size: 100,
260                    backpressure: datasynth_core::rate_limit::RateLimitBackpressure::Block,
261                    enabled: events_per_second > 0.0,
262                },
263            );
264        }
265    }
266}
267
268impl PhaseSink for RateLimitedPipeline {
269    fn emit(
270        &self,
271        phase: &str,
272        item_type: &str,
273        item: &serde_json::Value,
274    ) -> Result<(), StreamError> {
275        // Acquire a token (blocks if rate exceeded)
276        if let Ok(mut limiter) = self.limiter.lock() {
277            limiter.acquire();
278        }
279
280        let seq = self
281            .sequence
282            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
283
284        // Write through to inner sink
285        self.inner.emit(phase, item_type, item)?;
286
287        // Periodic progress events
288        if self.progress_interval > 0 && seq > 0 && seq.is_multiple_of(self.progress_interval) {
289            let elapsed = self.start_time.elapsed();
290            let rate = if elapsed.as_secs_f64() > 0.0 {
291                seq as f64 / elapsed.as_secs_f64()
292            } else {
293                0.0
294            };
295            let progress = serde_json::json!({
296                "type": "_progress",
297                "items_emitted": seq,
298                "rate_actual": (rate * 100.0).round() / 100.0,
299                "elapsed_ms": elapsed.as_millis() as u64,
300            });
301            self.inner.emit("_progress", "StreamProgress", &progress)?;
302        }
303
304        Ok(())
305    }
306
307    fn phase_complete(&self, phase: &str) -> Result<(), StreamError> {
308        self.inner.phase_complete(phase)
309    }
310
311    fn flush(&self) -> Result<(), StreamError> {
312        self.inner.flush()
313    }
314
315    fn stats(&self) -> StreamStats {
316        let mut stats = self.inner.stats();
317        stats.items_emitted = self.sequence.load(std::sync::atomic::Ordering::Relaxed);
318        stats
319    }
320}
321
#[cfg(test)]
mod tests {
    use super::*;

    /// Temp-file path unique to this test process, so that concurrent test
    /// runs (e.g. several `cargo test` invocations) never share an output file.
    fn temp_jsonl(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!("{name}_{}.jsonl", std::process::id()))
    }

    #[test]
    fn test_none_pipeline_is_inactive() {
        let pipeline = StreamPipeline::none();
        assert!(!pipeline.is_active());
    }

    #[test]
    fn test_none_pipeline_emit_is_noop() {
        let pipeline = StreamPipeline::none();
        let item = serde_json::json!({"id": "noop"});
        pipeline.emit("phase", "Type", &item).unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 0);
    }

    #[test]
    fn test_file_pipeline_writes_jsonl() {
        let tmp = temp_jsonl("test_stream_pipeline_writes");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        assert!(pipeline.is_active());
        let item = serde_json::json!({"id": "test-001", "amount": 100.0});
        pipeline
            .emit("journal_entries", "JournalEntry", &item)
            .unwrap();
        pipeline.flush().unwrap();
        let content = std::fs::read_to_string(&tmp).unwrap();
        assert!(content.contains("test-001"));
        assert!(content.contains("journal_entries"));
        assert!(content.contains("JournalEntry"));
        let _ = std::fs::remove_file(&tmp);
    }

    #[test]
    fn test_stats_increment() {
        let tmp = temp_jsonl("test_stream_pipeline_stats");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item = serde_json::json!({"id": 1});
        pipeline.emit("phase1", "Item", &item).unwrap();
        pipeline.emit("phase1", "Item", &item).unwrap();
        pipeline.phase_complete("phase1").unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 1);
        assert!(stats.bytes_sent > 0);
        let _ = std::fs::remove_file(&tmp);
    }

    #[test]
    fn test_multiple_phases() {
        let tmp = temp_jsonl("test_stream_pipeline_phases");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item = serde_json::json!({"id": 1});
        pipeline.emit("phase1", "A", &item).unwrap();
        pipeline.phase_complete("phase1").unwrap();
        pipeline.emit("phase2", "B", &item).unwrap();
        pipeline.phase_complete("phase2").unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 2);
        let _ = std::fs::remove_file(&tmp);
    }

    #[test]
    fn test_file_output_is_valid_jsonl() {
        let tmp = temp_jsonl("test_stream_pipeline_valid_jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item1 = serde_json::json!({"id": "a"});
        let item2 = serde_json::json!({"id": "b"});
        pipeline.emit("p", "T", &item1).unwrap();
        pipeline.emit("p", "T", &item2).unwrap();
        pipeline.flush().unwrap();
        let content = std::fs::read_to_string(&tmp).unwrap();
        for line in content.lines() {
            let parsed: serde_json::Value =
                serde_json::from_str(line).expect("each line should be valid JSON");
            assert!(parsed.get("phase").is_some());
            assert!(parsed.get("item_type").is_some());
            assert!(parsed.get("data").is_some());
        }
        let _ = std::fs::remove_file(&tmp);
    }

    #[test]
    fn test_backpressure_strategy_default() {
        let strategy = BackpressureStrategy::default();
        assert!(matches!(strategy, BackpressureStrategy::Block));
    }

    /// A mock PhaseSink that records all emitted items for testing.
    pub struct MockPhaseSink {
        pub items: Mutex<Vec<(String, String, serde_json::Value)>>,
        pub completed_phases: Mutex<Vec<String>>,
        pub flushed: Mutex<bool>,
    }

    impl MockPhaseSink {
        pub fn new() -> Self {
            Self {
                items: Mutex::new(Vec::new()),
                completed_phases: Mutex::new(Vec::new()),
                flushed: Mutex::new(false),
            }
        }
    }

    impl PhaseSink for MockPhaseSink {
        fn emit(
            &self,
            phase: &str,
            item_type: &str,
            item: &serde_json::Value,
        ) -> Result<(), StreamError> {
            self.items.lock().unwrap().push((
                phase.to_string(),
                item_type.to_string(),
                item.clone(),
            ));
            Ok(())
        }

        fn phase_complete(&self, phase: &str) -> Result<(), StreamError> {
            self.completed_phases
                .lock()
                .unwrap()
                .push(phase.to_string());
            Ok(())
        }

        fn flush(&self) -> Result<(), StreamError> {
            *self.flushed.lock().unwrap() = true;
            Ok(())
        }

        fn stats(&self) -> StreamStats {
            let items = self.items.lock().unwrap();
            let phases = self.completed_phases.lock().unwrap();
            StreamStats {
                items_emitted: items.len() as u64,
                phases_completed: phases.len() as u64,
                bytes_sent: 0,
                errors: 0,
            }
        }
    }

    #[test]
    fn test_mock_phase_sink_records_emissions() {
        let mock = MockPhaseSink::new();
        let item1 = serde_json::json!({"id": "V001", "name": "Acme Corp"});
        let item2 = serde_json::json!({"id": "V002", "name": "Global Parts"});
        mock.emit("master_data", "Vendor", &item1).unwrap();
        mock.emit("master_data", "Vendor", &item2).unwrap();
        mock.phase_complete("master_data").unwrap();

        let items = mock.items.lock().unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].0, "master_data");
        assert_eq!(items[0].1, "Vendor");
        assert_eq!(items[1].2["name"], "Global Parts");

        let phases = mock.completed_phases.lock().unwrap();
        assert_eq!(phases.len(), 1);
        assert_eq!(phases[0], "master_data");
    }

    #[test]
    fn test_mock_phase_sink_multi_phase_emission() {
        let mock = MockPhaseSink::new();
        let je = serde_json::json!({"entry_id": "JE-001"});
        let anomaly = serde_json::json!({"label": "DuplicateEntry"});

        mock.emit("journal_entries", "JournalEntry", &je).unwrap();
        mock.phase_complete("journal_entries").unwrap();
        mock.emit("anomaly_injection", "LabeledAnomaly", &anomaly)
            .unwrap();
        mock.phase_complete("anomaly_injection").unwrap();
        mock.flush().unwrap();

        let stats = mock.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 2);
        assert!(*mock.flushed.lock().unwrap());

        let items = mock.items.lock().unwrap();
        // Verify items from different phases are properly tagged
        assert_eq!(items[0].0, "journal_entries");
        assert_eq!(items[1].0, "anomaly_injection");
    }

    #[test]
    fn test_rate_limited_pipeline_emits_and_tracks_sequence() {
        let mock = MockPhaseSink::new();
        let pipeline = RateLimitedPipeline::new(
            Box::new(mock),
            0.0, // unlimited
            100,
            0, // no progress events
        );
        let item = serde_json::json!({"id": "test"});
        pipeline.emit("phase", "Type", &item).unwrap();
        pipeline.emit("phase", "Type", &item).unwrap();
        pipeline.emit("phase", "Type", &item).unwrap();

        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 3);
    }

    #[test]
    fn test_rate_limited_pipeline_emits_progress() {
        let mock = MockPhaseSink::new();
        let pipeline = RateLimitedPipeline::new(
            Box::new(mock),
            0.0, // unlimited
            100,
            5, // progress every 5 items
        );
        let item = serde_json::json!({"id": "test"});
        for _ in 0..10 {
            pipeline.emit("phase", "Type", &item).unwrap();
        }

        let stats = pipeline.stats();
        // Outer sequence counter reports 10 (progress events forwarded to inner sink, not counted)
        assert_eq!(stats.items_emitted, 10);
    }

    #[test]
    fn test_rate_limited_pipeline_respects_rate() {
        let mock = MockPhaseSink::new();
        let pipeline = RateLimitedPipeline::new(
            Box::new(mock),
            100.0, // 100 events/sec
            10,
            0,
        );
        let item = serde_json::json!({"id": "test"});
        let start = Instant::now();
        // Emit 15 items at 100/sec with burst=10 — first 10 are instant, next 5 take ~50ms
        for _ in 0..15 {
            pipeline.emit("phase", "Type", &item).unwrap();
        }
        let elapsed = start.elapsed();
        // Ideally ~50ms (5 items beyond the burst at 10ms each); the 30ms
        // threshold leaves slack for timer granularity and scheduling jitter.
        assert!(
            elapsed.as_millis() >= 30,
            "expected rate limiting, got {:?}",
            elapsed
        );
    }

    #[test]
    fn test_rate_limited_pipeline_dynamic_rate_change() {
        let mock = MockPhaseSink::new();
        let pipeline = RateLimitedPipeline::new(
            Box::new(mock),
            0.0, // start unlimited
            100,
            0,
        );
        let item = serde_json::json!({"id": "test"});
        pipeline.emit("phase", "Type", &item).unwrap();

        // Change to limited
        pipeline.set_rate(50.0);

        // Still works
        pipeline.emit("phase", "Type", &item).unwrap();
        assert_eq!(pipeline.stats().items_emitted, 2);
    }
}