// datasynth_runtime/stream_pipeline.rs
1//! Phase-aware streaming pipeline for real-time data emission.
2//!
3//! [`StreamPipeline`] implements the [`PhaseSink`] trait, allowing generated
4//! data to be streamed to files or HTTP endpoints as it is produced, rather
5//! than buffering everything in memory.
6
7use std::path::PathBuf;
8use std::sync::{Arc, Mutex};
9use std::time::Instant;
10
/// Trait for sinks that receive generated items phase-by-phase.
///
/// Implementors must be `Send + Sync` so a single sink can be driven from the
/// generation pipeline and shared across threads. See [`StreamPipeline`] for
/// the standard JSONL implementation and [`RateLimitedPipeline`] for a
/// rate-limited decorator.
pub trait PhaseSink: Send + Sync {
    /// Emit a single generated item.
    ///
    /// `phase` names the generation phase the item belongs to, `item_type`
    /// the logical record type, and `item` the JSON payload itself.
    fn emit(
        &self,
        phase: &str,
        item_type: &str,
        item: &serde_json::Value,
    ) -> Result<(), StreamError>;

    /// Signal that a generation phase has completed.
    fn phase_complete(&self, phase: &str) -> Result<(), StreamError>;

    /// Flush any buffered data to the underlying sink.
    fn flush(&self) -> Result<(), StreamError>;

    /// Return current streaming statistics.
    fn stats(&self) -> StreamStats;
}
30
/// Accumulated statistics for a streaming pipeline.
///
/// A plain snapshot type: cheap to clone, returned by value from
/// [`PhaseSink::stats`].
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Total items emitted across all phases.
    pub items_emitted: u64,
    /// Total bytes written/sent (includes the trailing newline per JSONL record).
    pub bytes_sent: u64,
    /// Number of errors encountered.
    pub errors: u64,
    /// Number of phases that have completed.
    pub phases_completed: u64,
}
43
/// Errors that can occur during streaming.
#[derive(Debug, thiserror::Error)]
pub enum StreamError {
    /// An I/O error occurred (file creation, write, or flush).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Serialization failed; carries the serializer's message as a string.
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// Connection to the remote endpoint failed or was lost.
    #[error("Connection error: {0}")]
    Connection(String),

    /// The internal buffer is full and the backpressure strategy is to reject.
    #[error("Backpressure: buffer full")]
    BackpressureFull,
}
63
/// Where the stream sends its data.
#[derive(Debug, Clone)]
pub enum StreamTarget {
    /// Send JSONL to an HTTP endpoint.
    ///
    /// NOTE(review): [`StreamPipeline::new`] does not open a writer for this
    /// variant, so in this module items sent to an `Http` target are counted
    /// in the stats but never actually transmitted — presumably the HTTP
    /// transport lives elsewhere or is not yet implemented; confirm.
    Http {
        /// Target URL.
        url: String,
        /// Optional API key for authentication.
        api_key: Option<String>,
        /// Number of items to batch before sending.
        batch_size: usize,
    },
    /// Append JSONL to a local file.
    File {
        /// Path to the output file.
        path: PathBuf,
    },
    /// Discard all output (no-op sink).
    None,
}
84
/// Strategy for handling back-pressure when the sink cannot keep up.
///
/// NOTE(review): no code in this module currently consumes this enum —
/// [`StreamPipeline`] and [`RateLimitedPipeline`] never reference it.
/// Presumably wired up by a caller elsewhere; verify before extending.
#[derive(Debug, Clone, Default)]
pub enum BackpressureStrategy {
    /// Block the producer until the sink is ready.
    #[default]
    Block,
    /// Drop the oldest buffered items to make room.
    DropOldest,
    /// Buffer up to `max_items` before applying back-pressure.
    Buffer {
        /// Maximum number of items to buffer.
        max_items: usize,
    },
}
99
/// A streaming pipeline that writes generated data as JSONL envelopes.
pub struct StreamPipeline {
    // Destination selector; `StreamTarget::None` makes every emit a no-op.
    target: StreamTarget,
    // Shared counters. Arc-wrapped, presumably so a stats handle can be
    // cloned out — no such clone is visible in this file; confirm.
    stats: Arc<Mutex<StreamStats>>,
    // Present only for `File` targets (a BufWriter over the created file);
    // `None` for `Http` and `None` targets.
    writer: Mutex<Option<Box<dyn std::io::Write + Send>>>,
}
106
107impl StreamPipeline {
108    /// Create a new pipeline for the given target.
109    pub fn new(target: StreamTarget) -> Result<Self, StreamError> {
110        let writer: Option<Box<dyn std::io::Write + Send>> = match &target {
111            StreamTarget::File { path } => {
112                let file = std::fs::File::create(path)?;
113                Some(Box::new(std::io::BufWriter::new(file)))
114            }
115            StreamTarget::Http { .. } => None,
116            StreamTarget::None => None,
117        };
118        Ok(Self {
119            target,
120            stats: Arc::new(Mutex::new(StreamStats::default())),
121            writer: Mutex::new(writer),
122        })
123    }
124
125    /// Create a no-op pipeline that discards all output.
126    pub fn none() -> Self {
127        Self {
128            target: StreamTarget::None,
129            stats: Arc::new(Mutex::new(StreamStats::default())),
130            writer: Mutex::new(None),
131        }
132    }
133
134    /// Returns `true` if this pipeline will actually emit data.
135    pub fn is_active(&self) -> bool {
136        !matches!(self.target, StreamTarget::None)
137    }
138}
139
140impl PhaseSink for StreamPipeline {
141    fn emit(
142        &self,
143        phase: &str,
144        item_type: &str,
145        item: &serde_json::Value,
146    ) -> Result<(), StreamError> {
147        if !self.is_active() {
148            return Ok(());
149        }
150
151        let envelope = serde_json::json!({
152            "phase": phase,
153            "item_type": item_type,
154            "data": item,
155        });
156        let json = serde_json::to_string(&envelope)
157            .map_err(|e| StreamError::Serialization(e.to_string()))?;
158        let bytes = json.len() as u64 + 1; // +1 for newline
159
160        if let Ok(mut writer_guard) = self.writer.lock() {
161            if let Some(writer) = writer_guard.as_mut() {
162                use std::io::Write;
163                writeln!(writer, "{json}")?;
164            }
165        }
166
167        if let Ok(mut stats) = self.stats.lock() {
168            stats.items_emitted += 1;
169            stats.bytes_sent += bytes;
170        }
171        Ok(())
172    }
173
174    fn phase_complete(&self, _phase: &str) -> Result<(), StreamError> {
175        if let Ok(mut stats) = self.stats.lock() {
176            stats.phases_completed += 1;
177        }
178        self.flush()
179    }
180
181    fn flush(&self) -> Result<(), StreamError> {
182        if let Ok(mut writer_guard) = self.writer.lock() {
183            if let Some(writer) = writer_guard.as_mut() {
184                use std::io::Write;
185                writer.flush()?;
186            }
187        }
188        Ok(())
189    }
190
191    fn stats(&self) -> StreamStats {
192        self.stats.lock().map(|s| s.clone()).unwrap_or_default()
193    }
194}
195
/// A rate-limited wrapper around any [`PhaseSink`].
///
/// Uses a token bucket [`RateLimiter`] to control emission rate. Each call to
/// `emit()` acquires a token before forwarding to the inner sink, blocking if
/// the rate is exceeded.
///
/// Also adds a monotonic sequence number and optional progress events to the
/// output stream.
///
/// **Thread safety note:** The rate limiter may sleep while holding an internal
/// mutex. This is correct for single-threaded generation (the current model)
/// but would serialize all callers if `emit()` is called from multiple threads
/// concurrently. If multi-threaded emission is needed in the future, the
/// lock-then-sleep pattern must be restructured.
pub struct RateLimitedPipeline {
    // The wrapped sink all emissions are forwarded to.
    inner: Box<dyn PhaseSink>,
    // Token-bucket limiter; see the thread-safety note above about the lock.
    limiter: Mutex<datasynth_core::rate_limit::RateLimiter>,
    // Monotonic count of emit() calls (progress events are not counted).
    sequence: std::sync::atomic::AtomicU64,
    // Emit a `_progress` event every N items; 0 disables progress events.
    progress_interval: u64,
    // Creation time, used to compute the actual emission rate.
    start_time: Instant,
}
217
218impl RateLimitedPipeline {
219    /// Wrap a `PhaseSink` with rate limiting.
220    ///
221    /// - `events_per_second`: target rate (0 = unlimited)
222    /// - `burst_size`: token bucket burst capacity
223    /// - `progress_interval`: emit a `_progress` event every N items (0 = disabled)
224    pub fn new(
225        inner: Box<dyn PhaseSink>,
226        events_per_second: f64,
227        burst_size: u32,
228        progress_interval: u64,
229    ) -> Self {
230        let config = if events_per_second > 0.0 {
231            datasynth_core::rate_limit::RateLimitConfig {
232                entities_per_second: events_per_second,
233                burst_size,
234                backpressure: datasynth_core::rate_limit::RateLimitBackpressure::Block,
235                enabled: true,
236            }
237        } else {
238            datasynth_core::rate_limit::RateLimitConfig {
239                enabled: false,
240                ..Default::default()
241            }
242        };
243
244        Self {
245            inner,
246            limiter: Mutex::new(datasynth_core::rate_limit::RateLimiter::new(config)),
247            sequence: std::sync::atomic::AtomicU64::new(0),
248            progress_interval,
249            start_time: Instant::now(),
250        }
251    }
252
253    /// Update the rate limit dynamically (e.g., from a REST endpoint).
254    pub fn set_rate(&self, events_per_second: f64) {
255        if let Ok(mut limiter) = self.limiter.lock() {
256            *limiter = datasynth_core::rate_limit::RateLimiter::new(
257                datasynth_core::rate_limit::RateLimitConfig {
258                    entities_per_second: events_per_second,
259                    burst_size: 100,
260                    backpressure: datasynth_core::rate_limit::RateLimitBackpressure::Block,
261                    enabled: events_per_second > 0.0,
262                },
263            );
264        }
265    }
266}
267
268impl PhaseSink for RateLimitedPipeline {
269    fn emit(
270        &self,
271        phase: &str,
272        item_type: &str,
273        item: &serde_json::Value,
274    ) -> Result<(), StreamError> {
275        // Acquire a token (blocks if rate exceeded)
276        if let Ok(mut limiter) = self.limiter.lock() {
277            limiter.acquire();
278        }
279
280        let seq = self
281            .sequence
282            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
283
284        // Write through to inner sink
285        self.inner.emit(phase, item_type, item)?;
286
287        // Periodic progress events
288        if self.progress_interval > 0 && seq > 0 && seq.is_multiple_of(self.progress_interval) {
289            let elapsed = self.start_time.elapsed();
290            let rate = if elapsed.as_secs_f64() > 0.0 {
291                seq as f64 / elapsed.as_secs_f64()
292            } else {
293                0.0
294            };
295            let progress = serde_json::json!({
296                "type": "_progress",
297                "items_emitted": seq,
298                "rate_actual": (rate * 100.0).round() / 100.0,
299                "elapsed_ms": elapsed.as_millis() as u64,
300            });
301            self.inner.emit("_progress", "StreamProgress", &progress)?;
302        }
303
304        Ok(())
305    }
306
307    fn phase_complete(&self, phase: &str) -> Result<(), StreamError> {
308        self.inner.phase_complete(phase)
309    }
310
311    fn flush(&self) -> Result<(), StreamError> {
312        self.inner.flush()
313    }
314
315    fn stats(&self) -> StreamStats {
316        let mut stats = self.inner.stats();
317        stats.items_emitted = self.sequence.load(std::sync::atomic::Ordering::Relaxed);
318        stats
319    }
320}
321
// Unit tests for the streaming pipeline and the rate-limited wrapper.
// NOTE(review): file-based tests use fixed names under the system temp dir,
// so concurrent runs of the same test binary could collide — confirm whether
// unique (e.g. per-process) names are needed.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn test_none_pipeline_is_inactive() {
        let pipeline = StreamPipeline::none();
        assert!(!pipeline.is_active());
    }

    #[test]
    fn test_none_pipeline_emit_is_noop() {
        let pipeline = StreamPipeline::none();
        let item = serde_json::json!({"id": "noop"});
        pipeline.emit("phase", "Type", &item).unwrap();
        let stats = pipeline.stats();
        // An inactive pipeline must not count discarded items.
        assert_eq!(stats.items_emitted, 0);
    }

    #[test]
    fn test_file_pipeline_writes_jsonl() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_writes.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        assert!(pipeline.is_active());
        let item = serde_json::json!({"id": "test-001", "amount": 100.0});
        pipeline
            .emit("journal_entries", "JournalEntry", &item)
            .unwrap();
        pipeline.flush().unwrap();
        let content = std::fs::read_to_string(&tmp).unwrap();
        // Envelope must carry the payload plus phase and item_type tags.
        assert!(content.contains("test-001"));
        assert!(content.contains("journal_entries"));
        assert!(content.contains("JournalEntry"));
        let _ = std::fs::remove_file(&tmp);
    }

    #[test]
    fn test_stats_increment() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_stats.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item = serde_json::json!({"id": 1});
        pipeline.emit("phase1", "Item", &item).unwrap();
        pipeline.emit("phase1", "Item", &item).unwrap();
        pipeline.phase_complete("phase1").unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 1);
        assert!(stats.bytes_sent > 0);
        let _ = std::fs::remove_file(&tmp);
    }

    #[test]
    fn test_multiple_phases() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_phases.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item = serde_json::json!({"id": 1});
        pipeline.emit("phase1", "A", &item).unwrap();
        pipeline.phase_complete("phase1").unwrap();
        pipeline.emit("phase2", "B", &item).unwrap();
        pipeline.phase_complete("phase2").unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 2);
        let _ = std::fs::remove_file(&tmp);
    }

    #[test]
    fn test_file_output_is_valid_jsonl() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_valid_jsonl.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item1 = serde_json::json!({"id": "a"});
        let item2 = serde_json::json!({"id": "b"});
        pipeline.emit("p", "T", &item1).unwrap();
        pipeline.emit("p", "T", &item2).unwrap();
        pipeline.flush().unwrap();
        let content = std::fs::read_to_string(&tmp).unwrap();
        // Every line must independently parse and carry the envelope keys.
        for line in content.lines() {
            let parsed: serde_json::Value =
                serde_json::from_str(line).expect("each line should be valid JSON");
            assert!(parsed.get("phase").is_some());
            assert!(parsed.get("item_type").is_some());
            assert!(parsed.get("data").is_some());
        }
        let _ = std::fs::remove_file(&tmp);
    }

    #[test]
    fn test_backpressure_strategy_default() {
        let strategy = BackpressureStrategy::default();
        assert!(matches!(strategy, BackpressureStrategy::Block));
    }

    /// A mock PhaseSink that records all emitted items for testing.
    pub struct MockPhaseSink {
        // (phase, item_type, payload) for every emit() call, in order.
        pub items: Mutex<Vec<(String, String, serde_json::Value)>>,
        // Phase names in the order phase_complete() was called.
        pub completed_phases: Mutex<Vec<String>>,
        // Set once flush() has been called at least once.
        pub flushed: Mutex<bool>,
    }

    impl MockPhaseSink {
        pub fn new() -> Self {
            Self {
                items: Mutex::new(Vec::new()),
                completed_phases: Mutex::new(Vec::new()),
                flushed: Mutex::new(false),
            }
        }
    }

    impl PhaseSink for MockPhaseSink {
        fn emit(
            &self,
            phase: &str,
            item_type: &str,
            item: &serde_json::Value,
        ) -> Result<(), StreamError> {
            self.items.lock().unwrap().push((
                phase.to_string(),
                item_type.to_string(),
                item.clone(),
            ));
            Ok(())
        }

        fn phase_complete(&self, phase: &str) -> Result<(), StreamError> {
            self.completed_phases
                .lock()
                .unwrap()
                .push(phase.to_string());
            Ok(())
        }

        fn flush(&self) -> Result<(), StreamError> {
            *self.flushed.lock().unwrap() = true;
            Ok(())
        }

        fn stats(&self) -> StreamStats {
            let items = self.items.lock().unwrap();
            let phases = self.completed_phases.lock().unwrap();
            StreamStats {
                items_emitted: items.len() as u64,
                phases_completed: phases.len() as u64,
                bytes_sent: 0,
                errors: 0,
            }
        }
    }

    #[test]
    fn test_mock_phase_sink_records_emissions() {
        let mock = MockPhaseSink::new();
        let item1 = serde_json::json!({"id": "V001", "name": "Acme Corp"});
        let item2 = serde_json::json!({"id": "V002", "name": "Global Parts"});
        mock.emit("master_data", "Vendor", &item1).unwrap();
        mock.emit("master_data", "Vendor", &item2).unwrap();
        mock.phase_complete("master_data").unwrap();

        let items = mock.items.lock().unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].0, "master_data");
        assert_eq!(items[0].1, "Vendor");
        assert_eq!(items[1].2["name"], "Global Parts");

        let phases = mock.completed_phases.lock().unwrap();
        assert_eq!(phases.len(), 1);
        assert_eq!(phases[0], "master_data");
    }

    #[test]
    fn test_mock_phase_sink_multi_phase_emission() {
        let mock = MockPhaseSink::new();
        let je = serde_json::json!({"entry_id": "JE-001"});
        let anomaly = serde_json::json!({"label": "DuplicateEntry"});

        mock.emit("journal_entries", "JournalEntry", &je).unwrap();
        mock.phase_complete("journal_entries").unwrap();
        mock.emit("anomaly_injection", "LabeledAnomaly", &anomaly)
            .unwrap();
        mock.phase_complete("anomaly_injection").unwrap();
        mock.flush().unwrap();

        let stats = mock.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 2);
        assert!(*mock.flushed.lock().unwrap());

        let items = mock.items.lock().unwrap();
        // Verify items from different phases are properly tagged
        assert_eq!(items[0].0, "journal_entries");
        assert_eq!(items[1].0, "anomaly_injection");
    }

    #[test]
    fn test_rate_limited_pipeline_emits_and_tracks_sequence() {
        let mock = MockPhaseSink::new();
        let pipeline = RateLimitedPipeline::new(
            Box::new(mock),
            0.0, // unlimited
            100,
            0, // no progress events
        );
        let item = serde_json::json!({"id": "test"});
        pipeline.emit("phase", "Type", &item).unwrap();
        pipeline.emit("phase", "Type", &item).unwrap();
        pipeline.emit("phase", "Type", &item).unwrap();

        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 3);
    }

    #[test]
    fn test_rate_limited_pipeline_emits_progress() {
        let mock = MockPhaseSink::new();
        let pipeline = RateLimitedPipeline::new(
            Box::new(mock),
            0.0, // unlimited
            100,
            5, // progress every 5 items
        );
        let item = serde_json::json!({"id": "test"});
        for _ in 0..10 {
            pipeline.emit("phase", "Type", &item).unwrap();
        }

        let stats = pipeline.stats();
        // Outer sequence counter reports 10 (progress events forwarded to inner sink, not counted)
        assert_eq!(stats.items_emitted, 10);
    }

    #[test]
    fn test_rate_limited_pipeline_respects_rate() {
        let mock = MockPhaseSink::new();
        let pipeline = RateLimitedPipeline::new(
            Box::new(mock),
            100.0, // 100 events/sec
            10,
            0,
        );
        let item = serde_json::json!({"id": "test"});
        let start = Instant::now();
        // Emit 15 items at 100/sec with burst=10 — first 10 are instant, next 5 take ~50ms
        for _ in 0..15 {
            pipeline.emit("phase", "Type", &item).unwrap();
        }
        let elapsed = start.elapsed();
        // Should take at least 40ms (5 items beyond burst at 10ms each)
        assert!(
            elapsed.as_millis() >= 30,
            "expected rate limiting, got {:?}",
            elapsed
        );
    }

    #[test]
    fn test_rate_limited_pipeline_dynamic_rate_change() {
        let mock = MockPhaseSink::new();
        let pipeline = RateLimitedPipeline::new(
            Box::new(mock),
            0.0, // start unlimited
            100,
            0,
        );
        let item = serde_json::json!({"id": "test"});
        pipeline.emit("phase", "Type", &item).unwrap();

        // Change to limited
        pipeline.set_rate(50.0);

        // Still works
        pipeline.emit("phase", "Type", &item).unwrap();
        assert_eq!(pipeline.stats().items_emitted, 2);
    }
}