// datasynth_runtime/stream_pipeline.rs
1//! Phase-aware streaming pipeline for real-time data emission.
2//!
3//! [`StreamPipeline`] implements the [`PhaseSink`] trait, allowing generated
4//! data to be streamed to files or HTTP endpoints as it is produced, rather
5//! than buffering everything in memory.
6
7use std::path::PathBuf;
8use std::sync::{Arc, Mutex};
9
/// Trait for sinks that receive generated items phase-by-phase.
///
/// Implementors must be `Send + Sync` so a single sink can be shared across
/// producer threads behind an `Arc`.
pub trait PhaseSink: Send + Sync {
    /// Emit a single generated item.
    ///
    /// `phase` names the generation phase the item belongs to, `item_type`
    /// names the kind of record, and `item` is the record payload.
    fn emit(
        &self,
        phase: &str,
        item_type: &str,
        item: &serde_json::Value,
    ) -> Result<(), StreamError>;

    /// Signal that a generation phase has completed.
    fn phase_complete(&self, phase: &str) -> Result<(), StreamError>;

    /// Flush any buffered data to the underlying sink.
    fn flush(&self) -> Result<(), StreamError>;

    /// Return a snapshot of the current streaming statistics.
    fn stats(&self) -> StreamStats;
}
29
/// Accumulated statistics for a streaming pipeline.
///
/// Returned by value from [`PhaseSink::stats`]; `Default` yields all-zero
/// counters.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Total items emitted across all phases.
    pub items_emitted: u64,
    /// Total bytes written/sent (serialized envelope plus newline).
    pub bytes_sent: u64,
    /// Number of errors encountered.
    pub errors: u64,
    /// Number of phases that have completed.
    pub phases_completed: u64,
}
42
/// Errors that can occur during streaming.
#[derive(Debug, thiserror::Error)]
pub enum StreamError {
    /// An I/O error occurred (file creation, write, or flush).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Serialization of an item envelope to JSON failed.
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// Connection to the remote endpoint failed or was lost.
    // NOTE(review): no code path in this file constructs this variant yet —
    // presumably reserved for the HTTP target implementation.
    #[error("Connection error: {0}")]
    Connection(String),

    /// The internal buffer is full and the backpressure strategy is to reject.
    #[error("Backpressure: buffer full")]
    BackpressureFull,
}
62
/// Where the stream sends its data.
#[derive(Debug, Clone)]
pub enum StreamTarget {
    /// Send JSONL to an HTTP endpoint.
    // NOTE(review): `StreamPipeline::new` currently allocates no writer for
    // this variant, so HTTP emission is not yet implemented in this file.
    Http {
        /// Target URL.
        url: String,
        /// Optional API key for authentication.
        api_key: Option<String>,
        /// Number of items to batch before sending.
        batch_size: usize,
    },
    /// Write JSONL to a local file. Any existing file at this path is
    /// truncated when the pipeline is created (`File::create`), so output
    /// overwrites rather than appends.
    File {
        /// Path to the output file.
        path: PathBuf,
    },
    /// Discard all output (no-op sink).
    None,
}
83
/// Strategy for handling back-pressure when the sink cannot keep up.
// NOTE(review): not referenced by `StreamPipeline` anywhere in this file —
// confirm it is consumed elsewhere or wire it into the pipeline.
#[derive(Debug, Clone, Default)]
pub enum BackpressureStrategy {
    /// Block the producer until the sink is ready.
    #[default]
    Block,
    /// Drop the oldest buffered items to make room.
    DropOldest,
    /// Buffer up to `max_items` before applying back-pressure.
    Buffer {
        /// Maximum number of items to buffer.
        max_items: usize,
    },
}
98
/// A streaming pipeline that writes generated data as JSONL envelopes.
///
/// Each emitted item is wrapped as `{"phase": …, "item_type": …, "data": …}`
/// and written as one line of JSONL.
pub struct StreamPipeline {
    // Destination configuration; `StreamTarget::None` makes the pipeline a no-op.
    target: StreamTarget,
    // Shared counters, cloned out on `stats()`.
    stats: Arc<Mutex<StreamStats>>,
    // Buffered output writer; `None` for the HTTP and no-op targets.
    writer: Mutex<Option<Box<dyn std::io::Write + Send>>>,
}
105
106impl StreamPipeline {
107    /// Create a new pipeline for the given target.
108    pub fn new(target: StreamTarget) -> Result<Self, StreamError> {
109        let writer: Option<Box<dyn std::io::Write + Send>> = match &target {
110            StreamTarget::File { path } => {
111                let file = std::fs::File::create(path)?;
112                Some(Box::new(std::io::BufWriter::new(file)))
113            }
114            StreamTarget::Http { .. } => None,
115            StreamTarget::None => None,
116        };
117        Ok(Self {
118            target,
119            stats: Arc::new(Mutex::new(StreamStats::default())),
120            writer: Mutex::new(writer),
121        })
122    }
123
124    /// Create a no-op pipeline that discards all output.
125    pub fn none() -> Self {
126        Self {
127            target: StreamTarget::None,
128            stats: Arc::new(Mutex::new(StreamStats::default())),
129            writer: Mutex::new(None),
130        }
131    }
132
133    /// Returns `true` if this pipeline will actually emit data.
134    pub fn is_active(&self) -> bool {
135        !matches!(self.target, StreamTarget::None)
136    }
137}
138
139impl PhaseSink for StreamPipeline {
140    fn emit(
141        &self,
142        phase: &str,
143        item_type: &str,
144        item: &serde_json::Value,
145    ) -> Result<(), StreamError> {
146        if !self.is_active() {
147            return Ok(());
148        }
149
150        let envelope = serde_json::json!({
151            "phase": phase,
152            "item_type": item_type,
153            "data": item,
154        });
155        let json = serde_json::to_string(&envelope)
156            .map_err(|e| StreamError::Serialization(e.to_string()))?;
157        let bytes = json.len() as u64 + 1; // +1 for newline
158
159        if let Ok(mut writer_guard) = self.writer.lock() {
160            if let Some(writer) = writer_guard.as_mut() {
161                use std::io::Write;
162                writeln!(writer, "{json}")?;
163            }
164        }
165
166        if let Ok(mut stats) = self.stats.lock() {
167            stats.items_emitted += 1;
168            stats.bytes_sent += bytes;
169        }
170        Ok(())
171    }
172
173    fn phase_complete(&self, _phase: &str) -> Result<(), StreamError> {
174        if let Ok(mut stats) = self.stats.lock() {
175            stats.phases_completed += 1;
176        }
177        self.flush()
178    }
179
180    fn flush(&self) -> Result<(), StreamError> {
181        if let Ok(mut writer_guard) = self.writer.lock() {
182            if let Some(writer) = writer_guard.as_mut() {
183                use std::io::Write;
184                writer.flush()?;
185            }
186        }
187        Ok(())
188    }
189
190    fn stats(&self) -> StreamStats {
191        self.stats.lock().map(|s| s.clone()).unwrap_or_default()
192    }
193}
194
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn test_none_pipeline_is_inactive() {
        let pipeline = StreamPipeline::none();
        assert!(!pipeline.is_active());
    }

    #[test]
    fn test_none_pipeline_emit_is_noop() {
        let pipeline = StreamPipeline::none();
        let item = serde_json::json!({"id": "noop"});
        pipeline.emit("phase", "Type", &item).unwrap();
        let stats = pipeline.stats();
        // Inactive pipelines must not count items.
        assert_eq!(stats.items_emitted, 0);
    }

    #[test]
    fn test_file_pipeline_writes_jsonl() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_writes.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        assert!(pipeline.is_active());
        let item = serde_json::json!({"id": "test-001", "amount": 100.0});
        pipeline
            .emit("journal_entries", "JournalEntry", &item)
            .unwrap();
        pipeline.flush().unwrap();
        // The envelope must carry the payload, phase, and item type.
        let content = std::fs::read_to_string(&tmp).unwrap();
        assert!(content.contains("test-001"));
        assert!(content.contains("journal_entries"));
        assert!(content.contains("JournalEntry"));
        let _ = std::fs::remove_file(&tmp);
    }

    #[test]
    fn test_stats_increment() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_stats.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item = serde_json::json!({"id": 1});
        pipeline.emit("phase1", "Item", &item).unwrap();
        pipeline.emit("phase1", "Item", &item).unwrap();
        pipeline.phase_complete("phase1").unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 1);
        assert!(stats.bytes_sent > 0);
        let _ = std::fs::remove_file(&tmp);
    }

    #[test]
    fn test_multiple_phases() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_phases.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item = serde_json::json!({"id": 1});
        pipeline.emit("phase1", "A", &item).unwrap();
        pipeline.phase_complete("phase1").unwrap();
        pipeline.emit("phase2", "B", &item).unwrap();
        pipeline.phase_complete("phase2").unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 2);
        let _ = std::fs::remove_file(&tmp);
    }

    #[test]
    fn test_file_output_is_valid_jsonl() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_valid_jsonl.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item1 = serde_json::json!({"id": "a"});
        let item2 = serde_json::json!({"id": "b"});
        pipeline.emit("p", "T", &item1).unwrap();
        pipeline.emit("p", "T", &item2).unwrap();
        pipeline.flush().unwrap();
        let content = std::fs::read_to_string(&tmp).unwrap();
        // Every line must independently parse and carry the full envelope.
        for line in content.lines() {
            let parsed: serde_json::Value =
                serde_json::from_str(line).expect("each line should be valid JSON");
            assert!(parsed.get("phase").is_some());
            assert!(parsed.get("item_type").is_some());
            assert!(parsed.get("data").is_some());
        }
        let _ = std::fs::remove_file(&tmp);
    }

    #[test]
    fn test_backpressure_strategy_default() {
        let strategy = BackpressureStrategy::default();
        assert!(matches!(strategy, BackpressureStrategy::Block));
    }

    /// A mock PhaseSink that records all emitted items for testing.
    pub struct MockPhaseSink {
        // (phase, item_type, payload) triples in emission order.
        pub items: Mutex<Vec<(String, String, serde_json::Value)>>,
        // Phase names in completion order.
        pub completed_phases: Mutex<Vec<String>>,
        // Set once `flush` has been called.
        pub flushed: Mutex<bool>,
    }

    impl MockPhaseSink {
        pub fn new() -> Self {
            Self {
                items: Mutex::new(Vec::new()),
                completed_phases: Mutex::new(Vec::new()),
                flushed: Mutex::new(false),
            }
        }
    }

    impl PhaseSink for MockPhaseSink {
        fn emit(
            &self,
            phase: &str,
            item_type: &str,
            item: &serde_json::Value,
        ) -> Result<(), StreamError> {
            self.items.lock().unwrap().push((
                phase.to_string(),
                item_type.to_string(),
                item.clone(),
            ));
            Ok(())
        }

        fn phase_complete(&self, phase: &str) -> Result<(), StreamError> {
            self.completed_phases
                .lock()
                .unwrap()
                .push(phase.to_string());
            Ok(())
        }

        fn flush(&self) -> Result<(), StreamError> {
            *self.flushed.lock().unwrap() = true;
            Ok(())
        }

        fn stats(&self) -> StreamStats {
            let items = self.items.lock().unwrap();
            let phases = self.completed_phases.lock().unwrap();
            StreamStats {
                items_emitted: items.len() as u64,
                phases_completed: phases.len() as u64,
                bytes_sent: 0,
                errors: 0,
            }
        }
    }

    #[test]
    fn test_mock_phase_sink_records_emissions() {
        let mock = MockPhaseSink::new();
        let item1 = serde_json::json!({"id": "V001", "name": "Acme Corp"});
        let item2 = serde_json::json!({"id": "V002", "name": "Global Parts"});
        mock.emit("master_data", "Vendor", &item1).unwrap();
        mock.emit("master_data", "Vendor", &item2).unwrap();
        mock.phase_complete("master_data").unwrap();

        let items = mock.items.lock().unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].0, "master_data");
        assert_eq!(items[0].1, "Vendor");
        assert_eq!(items[1].2["name"], "Global Parts");

        let phases = mock.completed_phases.lock().unwrap();
        assert_eq!(phases.len(), 1);
        assert_eq!(phases[0], "master_data");
    }

    #[test]
    fn test_mock_phase_sink_multi_phase_emission() {
        let mock = MockPhaseSink::new();
        let je = serde_json::json!({"entry_id": "JE-001"});
        let anomaly = serde_json::json!({"label": "DuplicateEntry"});

        mock.emit("journal_entries", "JournalEntry", &je).unwrap();
        mock.phase_complete("journal_entries").unwrap();
        mock.emit("anomaly_injection", "LabeledAnomaly", &anomaly)
            .unwrap();
        mock.phase_complete("anomaly_injection").unwrap();
        mock.flush().unwrap();

        let stats = mock.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 2);
        assert!(*mock.flushed.lock().unwrap());

        let items = mock.items.lock().unwrap();
        // Verify items from different phases are properly tagged
        assert_eq!(items[0].0, "journal_entries");
        assert_eq!(items[1].0, "anomaly_injection");
    }
}