// datasynth_runtime/stream_pipeline.rs
1//! Phase-aware streaming pipeline for real-time data emission.
2//!
3//! [`StreamPipeline`] implements the [`PhaseSink`] trait, allowing generated
4//! data to be streamed to files or HTTP endpoints as it is produced, rather
5//! than buffering everything in memory.
6
7use std::path::PathBuf;
8use std::sync::{Arc, Mutex};
9
/// Trait for sinks that receive generated items phase-by-phase.
///
/// Implementors must be `Send + Sync` so one sink instance can be shared
/// across producer threads.
pub trait PhaseSink: Send + Sync {
    /// Emit a single generated item, tagged with its phase and type.
    ///
    /// # Errors
    ///
    /// Returns a [`StreamError`] if the item cannot be serialized or
    /// delivered to the underlying sink.
    fn emit(
        &self,
        phase: &str,
        item_type: &str,
        item: &serde_json::Value,
    ) -> Result<(), StreamError>;

    /// Signal that a generation phase has completed.
    ///
    /// # Errors
    ///
    /// Returns a [`StreamError`] if finalizing the phase (e.g. flushing)
    /// fails.
    fn phase_complete(&self, phase: &str) -> Result<(), StreamError>;

    /// Flush any buffered data to the underlying sink.
    ///
    /// # Errors
    ///
    /// Returns a [`StreamError`] if the underlying writer fails to flush.
    fn flush(&self) -> Result<(), StreamError>;

    /// Return current streaming statistics.
    fn stats(&self) -> StreamStats;
}
29
/// Accumulated statistics for a streaming pipeline.
///
/// All counters are cumulative over the lifetime of the sink.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Total items emitted across all phases.
    pub items_emitted: u64,
    /// Total bytes written/sent (payload plus per-line newline).
    pub bytes_sent: u64,
    /// Number of errors encountered.
    pub errors: u64,
    /// Number of phases that have completed.
    pub phases_completed: u64,
}
42
/// Errors that can occur during streaming.
#[derive(Debug, thiserror::Error)]
pub enum StreamError {
    /// An I/O error occurred while writing or flushing.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Serialization of an item envelope failed.
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// Connection to the remote endpoint failed or was lost.
    #[error("Connection error: {0}")]
    Connection(String),

    /// The internal buffer is full and the backpressure strategy is to reject.
    #[error("Backpressure: buffer full")]
    BackpressureFull,
}
62
/// Where the stream sends its data.
#[derive(Debug, Clone)]
pub enum StreamTarget {
    /// Send JSONL to an HTTP endpoint.
    Http {
        /// Target URL.
        url: String,
        /// Optional API key for authentication.
        api_key: Option<String>,
        /// Number of items to batch before sending.
        batch_size: usize,
    },
    /// Append JSONL to a local file.
    File {
        /// Path to the output file.
        path: PathBuf,
    },
    /// Discard all output (no-op sink).
    None,
}
83
/// Strategy for handling back-pressure when the sink cannot keep up.
///
/// NOTE(review): nothing in this module consults this enum — it appears to
/// be configuration for a producer defined elsewhere; confirm against the
/// caller before relying on a particular strategy taking effect.
#[derive(Debug, Clone, Default)]
pub enum BackpressureStrategy {
    /// Block the producer until the sink is ready.
    #[default]
    Block,
    /// Drop the oldest buffered items to make room.
    DropOldest,
    /// Buffer up to `max_items` before applying back-pressure.
    Buffer {
        /// Maximum number of items to buffer.
        max_items: usize,
    },
}
98
/// A streaming pipeline that writes generated data as JSONL envelopes.
pub struct StreamPipeline {
    /// Destination for emitted items.
    target: StreamTarget,
    /// Shared, lock-protected counters updated on emit/phase-complete.
    stats: Arc<Mutex<StreamStats>>,
    /// Local writer, present only for targets that write locally;
    /// `None` for `Http` and `None` targets.
    writer: Mutex<Option<Box<dyn std::io::Write + Send>>>,
}
105
106impl StreamPipeline {
107    /// Create a new pipeline for the given target.
108    pub fn new(target: StreamTarget) -> Result<Self, StreamError> {
109        let writer: Option<Box<dyn std::io::Write + Send>> = match &target {
110            StreamTarget::File { path } => {
111                let file = std::fs::File::create(path)?;
112                Some(Box::new(std::io::BufWriter::new(file)))
113            }
114            StreamTarget::Http { .. } => None,
115            StreamTarget::None => None,
116        };
117        Ok(Self {
118            target,
119            stats: Arc::new(Mutex::new(StreamStats::default())),
120            writer: Mutex::new(writer),
121        })
122    }
123
124    /// Create a no-op pipeline that discards all output.
125    pub fn none() -> Self {
126        Self {
127            target: StreamTarget::None,
128            stats: Arc::new(Mutex::new(StreamStats::default())),
129            writer: Mutex::new(None),
130        }
131    }
132
133    /// Returns `true` if this pipeline will actually emit data.
134    pub fn is_active(&self) -> bool {
135        !matches!(self.target, StreamTarget::None)
136    }
137}
138
139impl PhaseSink for StreamPipeline {
140    fn emit(
141        &self,
142        phase: &str,
143        item_type: &str,
144        item: &serde_json::Value,
145    ) -> Result<(), StreamError> {
146        if !self.is_active() {
147            return Ok(());
148        }
149
150        let envelope = serde_json::json!({
151            "phase": phase,
152            "item_type": item_type,
153            "data": item,
154        });
155        let json = serde_json::to_string(&envelope)
156            .map_err(|e| StreamError::Serialization(e.to_string()))?;
157        let bytes = json.len() as u64 + 1; // +1 for newline
158
159        if let Ok(mut writer_guard) = self.writer.lock() {
160            if let Some(writer) = writer_guard.as_mut() {
161                use std::io::Write;
162                writeln!(writer, "{}", json)?;
163            }
164        }
165
166        if let Ok(mut stats) = self.stats.lock() {
167            stats.items_emitted += 1;
168            stats.bytes_sent += bytes;
169        }
170        Ok(())
171    }
172
173    fn phase_complete(&self, _phase: &str) -> Result<(), StreamError> {
174        if let Ok(mut stats) = self.stats.lock() {
175            stats.phases_completed += 1;
176        }
177        self.flush()
178    }
179
180    fn flush(&self) -> Result<(), StreamError> {
181        if let Ok(mut writer_guard) = self.writer.lock() {
182            if let Some(writer) = writer_guard.as_mut() {
183                use std::io::Write;
184                writer.flush()?;
185            }
186        }
187        Ok(())
188    }
189
190    fn stats(&self) -> StreamStats {
191        self.stats.lock().map(|s| s.clone()).unwrap_or_default()
192    }
193}
194
#[cfg(test)]
mod tests {
    use super::*;

    // A `None`-target pipeline must report itself inactive.
    #[test]
    fn test_none_pipeline_is_inactive() {
        let pipeline = StreamPipeline::none();
        assert!(!pipeline.is_active());
    }

    // Emitting through a `None` pipeline must not touch the counters.
    #[test]
    fn test_none_pipeline_emit_is_noop() {
        let pipeline = StreamPipeline::none();
        let item = serde_json::json!({"id": "noop"});
        pipeline.emit("phase", "Type", &item).unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 0);
    }

    // A file-target pipeline writes an envelope containing the phase,
    // item type, and item data.
    #[test]
    fn test_file_pipeline_writes_jsonl() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_writes.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        assert!(pipeline.is_active());
        let item = serde_json::json!({"id": "test-001", "amount": 100.0});
        pipeline
            .emit("journal_entries", "JournalEntry", &item)
            .unwrap();
        pipeline.flush().unwrap();
        let content = std::fs::read_to_string(&tmp).unwrap();
        assert!(content.contains("test-001"));
        assert!(content.contains("journal_entries"));
        assert!(content.contains("JournalEntry"));
        let _ = std::fs::remove_file(&tmp);
    }

    // Emits and phase completions should be reflected in stats().
    #[test]
    fn test_stats_increment() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_stats.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item = serde_json::json!({"id": 1});
        pipeline.emit("phase1", "Item", &item).unwrap();
        pipeline.emit("phase1", "Item", &item).unwrap();
        pipeline.phase_complete("phase1").unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 1);
        assert!(stats.bytes_sent > 0);
        let _ = std::fs::remove_file(&tmp);
    }

    // Each phase_complete() bumps phases_completed independently.
    #[test]
    fn test_multiple_phases() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_phases.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item = serde_json::json!({"id": 1});
        pipeline.emit("phase1", "A", &item).unwrap();
        pipeline.phase_complete("phase1").unwrap();
        pipeline.emit("phase2", "B", &item).unwrap();
        pipeline.phase_complete("phase2").unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 2);
        let _ = std::fs::remove_file(&tmp);
    }

    // Every output line must be independently parseable JSON with the
    // expected envelope keys.
    #[test]
    fn test_file_output_is_valid_jsonl() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_valid_jsonl.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item1 = serde_json::json!({"id": "a"});
        let item2 = serde_json::json!({"id": "b"});
        pipeline.emit("p", "T", &item1).unwrap();
        pipeline.emit("p", "T", &item2).unwrap();
        pipeline.flush().unwrap();
        let content = std::fs::read_to_string(&tmp).unwrap();
        for line in content.lines() {
            let parsed: serde_json::Value =
                serde_json::from_str(line).expect("each line should be valid JSON");
            assert!(parsed.get("phase").is_some());
            assert!(parsed.get("item_type").is_some());
            assert!(parsed.get("data").is_some());
        }
        let _ = std::fs::remove_file(&tmp);
    }

    // The default back-pressure strategy is Block (per #[default]).
    #[test]
    fn test_backpressure_strategy_default() {
        let strategy = BackpressureStrategy::default();
        assert!(matches!(strategy, BackpressureStrategy::Block));
    }

    /// A mock PhaseSink that records all emitted items for testing.
    pub struct MockPhaseSink {
        // (phase, item_type, item) triples in emission order.
        pub items: Mutex<Vec<(String, String, serde_json::Value)>>,
        // Phase names in completion order.
        pub completed_phases: Mutex<Vec<String>>,
        // Set to true once flush() has been called.
        pub flushed: Mutex<bool>,
    }

    impl MockPhaseSink {
        /// Create an empty mock with no recorded activity.
        pub fn new() -> Self {
            Self {
                items: Mutex::new(Vec::new()),
                completed_phases: Mutex::new(Vec::new()),
                flushed: Mutex::new(false),
            }
        }
    }

    impl PhaseSink for MockPhaseSink {
        // Record the emission; never fails.
        fn emit(
            &self,
            phase: &str,
            item_type: &str,
            item: &serde_json::Value,
        ) -> Result<(), StreamError> {
            self.items.lock().unwrap().push((
                phase.to_string(),
                item_type.to_string(),
                item.clone(),
            ));
            Ok(())
        }

        // Record the completed phase name; never fails.
        fn phase_complete(&self, phase: &str) -> Result<(), StreamError> {
            self.completed_phases
                .lock()
                .unwrap()
                .push(phase.to_string());
            Ok(())
        }

        // Mark that flush was requested; never fails.
        fn flush(&self) -> Result<(), StreamError> {
            *self.flushed.lock().unwrap() = true;
            Ok(())
        }

        // Derive stats from the recorded vectors; bytes/errors are not
        // tracked by the mock and stay zero.
        fn stats(&self) -> StreamStats {
            let items = self.items.lock().unwrap();
            let phases = self.completed_phases.lock().unwrap();
            StreamStats {
                items_emitted: items.len() as u64,
                phases_completed: phases.len() as u64,
                bytes_sent: 0,
                errors: 0,
            }
        }
    }

    // The mock preserves emission order and tags items correctly.
    #[test]
    fn test_mock_phase_sink_records_emissions() {
        let mock = MockPhaseSink::new();
        let item1 = serde_json::json!({"id": "V001", "name": "Acme Corp"});
        let item2 = serde_json::json!({"id": "V002", "name": "Global Parts"});
        mock.emit("master_data", "Vendor", &item1).unwrap();
        mock.emit("master_data", "Vendor", &item2).unwrap();
        mock.phase_complete("master_data").unwrap();

        let items = mock.items.lock().unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].0, "master_data");
        assert_eq!(items[0].1, "Vendor");
        assert_eq!(items[1].2["name"], "Global Parts");

        let phases = mock.completed_phases.lock().unwrap();
        assert_eq!(phases.len(), 1);
        assert_eq!(phases[0], "master_data");
    }

    // Items from different phases are tagged with their own phase name,
    // and flush() is observable through the mock.
    #[test]
    fn test_mock_phase_sink_multi_phase_emission() {
        let mock = MockPhaseSink::new();
        let je = serde_json::json!({"entry_id": "JE-001"});
        let anomaly = serde_json::json!({"label": "DuplicateEntry"});

        mock.emit("journal_entries", "JournalEntry", &je).unwrap();
        mock.phase_complete("journal_entries").unwrap();
        mock.emit("anomaly_injection", "LabeledAnomaly", &anomaly)
            .unwrap();
        mock.phase_complete("anomaly_injection").unwrap();
        mock.flush().unwrap();

        let stats = mock.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 2);
        assert!(*mock.flushed.lock().unwrap());

        let items = mock.items.lock().unwrap();
        // Verify items from different phases are properly tagged
        assert_eq!(items[0].0, "journal_entries");
        assert_eq!(items[1].0, "anomaly_injection");
    }
}