// datasynth_core/traits/streaming.rs
1//! Streaming traits for real-time data generation.
2//!
3//! This module provides traits for streaming generation with backpressure,
4//! progress reporting, and cancellation support.
5
6use std::fmt;
7use std::time::Duration;
8
9use serde::{Deserialize, Serialize};
10
11use crate::error::SynthResult;
12
/// Events emitted during streaming generation.
///
/// Consumers receive a sequence of these over the stream's channel:
/// `Data`, `Progress`, `BatchComplete`, and non-fatal `Error` events,
/// with `Complete` carrying the final [`StreamSummary`].
#[derive(Debug, Clone)]
pub enum StreamEvent<T> {
    /// A data item was generated.
    Data(T),
    /// Progress update.
    Progress(StreamProgress),
    /// A batch of items was completed.
    BatchComplete {
        /// Batch identifier.
        batch_id: u64,
        /// Number of items in the batch.
        count: usize,
    },
    /// An error occurred (non-fatal, generation continues).
    Error(StreamError),
    /// Generation is complete.
    Complete(StreamSummary),
}
32
33impl<T> StreamEvent<T> {
34    /// Returns true if this is a data event.
35    pub fn is_data(&self) -> bool {
36        matches!(self, StreamEvent::Data(_))
37    }
38
39    /// Returns true if this is a completion event.
40    pub fn is_complete(&self) -> bool {
41        matches!(self, StreamEvent::Complete(_))
42    }
43
44    /// Returns true if this is an error event.
45    pub fn is_error(&self) -> bool {
46        matches!(self, StreamEvent::Error(_))
47    }
48
49    /// Extracts data from a Data event.
50    pub fn into_data(self) -> Option<T> {
51        match self {
52            StreamEvent::Data(data) => Some(data),
53            _ => None,
54        }
55    }
56}
57
/// Progress information during streaming.
///
/// Snapshots of this struct are delivered via [`StreamEvent::Progress`]
/// and the `stream_with_progress` callback. Optional fields are omitted
/// from serialized output when unset.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamProgress {
    /// Total items generated so far.
    pub items_generated: u64,
    /// Generation rate (items per second).
    pub items_per_second: f64,
    /// Elapsed time in milliseconds.
    pub elapsed_ms: u64,
    /// Current phase/stage name.
    pub phase: String,
    /// Memory usage in MB (if available).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub memory_usage_mb: Option<u64>,
    /// Buffer fill level (0.0 to 1.0).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub buffer_fill_ratio: Option<f64>,
    /// Estimated items remaining.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub items_remaining: Option<u64>,
}
79
80impl StreamProgress {
81    /// Creates a new progress tracker.
82    pub fn new(phase: impl Into<String>) -> Self {
83        Self {
84            items_generated: 0,
85            items_per_second: 0.0,
86            elapsed_ms: 0,
87            phase: phase.into(),
88            memory_usage_mb: None,
89            buffer_fill_ratio: None,
90            items_remaining: None,
91        }
92    }
93
94    /// Updates the progress with new values.
95    pub fn update(&mut self, items_generated: u64, elapsed_ms: u64) {
96        self.items_generated = items_generated;
97        self.elapsed_ms = elapsed_ms;
98        if elapsed_ms > 0 {
99            self.items_per_second = (items_generated as f64) / (elapsed_ms as f64 / 1000.0);
100        }
101    }
102
103    /// Calculates estimated time remaining in milliseconds.
104    pub fn eta_ms(&self) -> Option<u64> {
105        self.items_remaining.map(|remaining| {
106            if self.items_per_second > 0.0 {
107                ((remaining as f64 / self.items_per_second) * 1000.0) as u64
108            } else {
109                0
110            }
111        })
112    }
113}
114
/// Error during streaming (non-fatal).
///
/// Carried by [`StreamEvent::Error`]; generation continues after
/// emitting one. Constructed via [`StreamError::new`], which defaults
/// `recoverable` to `true`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamError {
    /// Error message.
    pub message: String,
    /// Error category.
    pub category: StreamErrorCategory,
    /// Whether the error is recoverable.
    pub recoverable: bool,
    /// Number of items affected.
    pub items_affected: Option<usize>,
}
127
128impl StreamError {
129    /// Creates a new stream error.
130    pub fn new(message: impl Into<String>, category: StreamErrorCategory) -> Self {
131        Self {
132            message: message.into(),
133            category,
134            recoverable: true,
135            items_affected: None,
136        }
137    }
138
139    /// Marks this error as non-recoverable.
140    pub fn non_recoverable(mut self) -> Self {
141        self.recoverable = false;
142        self
143    }
144
145    /// Sets the number of affected items.
146    pub fn with_affected_items(mut self, count: usize) -> Self {
147        self.items_affected = Some(count);
148        self
149    }
150}
151
152impl fmt::Display for StreamError {
153    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154        write!(f, "[{:?}] {}", self.category, self.message)
155    }
156}
157
158impl std::error::Error for StreamError {}
159
/// Categories of streaming errors.
///
/// Serialized in `snake_case` (e.g. `"generation"`, `"network"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StreamErrorCategory {
    /// Configuration error.
    Configuration,
    /// Generation error.
    Generation,
    /// Output/sink error.
    Output,
    /// Resource exhaustion.
    Resource,
    /// Validation error.
    Validation,
    /// Network error (for streaming to remote).
    Network,
    /// Internal error.
    Internal,
}
179
/// Summary of a completed stream.
///
/// Carried by the terminal [`StreamEvent::Complete`] event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamSummary {
    /// Total items generated.
    pub total_items: u64,
    /// Total time taken in milliseconds.
    pub total_time_ms: u64,
    /// Average generation rate (items per second).
    pub avg_items_per_second: f64,
    /// Number of errors encountered.
    pub error_count: u64,
    /// Number of items dropped due to backpressure.
    pub dropped_count: u64,
    /// Peak memory usage in MB.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub peak_memory_mb: Option<u64>,
    /// Generation phases completed.
    pub phases_completed: Vec<String>,
}
199
200impl StreamSummary {
201    /// Creates a new stream summary.
202    pub fn new(total_items: u64, total_time_ms: u64) -> Self {
203        let avg_items_per_second = if total_time_ms > 0 {
204            (total_items as f64) / (total_time_ms as f64 / 1000.0)
205        } else {
206            0.0
207        };
208
209        Self {
210            total_items,
211            total_time_ms,
212            avg_items_per_second,
213            error_count: 0,
214            dropped_count: 0,
215            peak_memory_mb: None,
216            phases_completed: Vec::new(),
217        }
218    }
219}
220
/// Configuration for streaming generation.
///
/// See [`Default`] for the default values.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Buffer size for the output channel.
    pub buffer_size: usize,
    /// Enable progress reporting.
    pub enable_progress: bool,
    /// Interval for progress updates (in items).
    pub progress_interval: u64,
    /// Backpressure strategy applied when the buffer is full.
    pub backpressure: BackpressureStrategy,
    /// Timeout for blocking operations (`None` for no timeout).
    pub timeout: Option<Duration>,
    /// Batch size for generation.
    pub batch_size: usize,
}
237
238impl Default for StreamConfig {
239    fn default() -> Self {
240        Self {
241            buffer_size: 1000,
242            enable_progress: true,
243            progress_interval: 100,
244            backpressure: BackpressureStrategy::Block,
245            timeout: None,
246            batch_size: 100,
247        }
248    }
249}
250
/// Backpressure handling strategy.
///
/// Selects what happens when the output buffer is full. Serialized in
/// `snake_case`; the default is [`BackpressureStrategy::Block`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum BackpressureStrategy {
    /// Block until space is available in the buffer.
    #[default]
    Block,
    /// Drop the oldest items in the buffer to make room for new ones.
    DropOldest,
    /// Don't add new items to a full buffer.
    DropNewest,
    /// Buffer additional items before blocking.
    Buffer {
        /// Maximum overflow buffer size.
        max_overflow: usize,
    },
}
268
/// Handle for controlling an active stream.
///
/// Provides methods to pause, resume, and cancel streaming. All
/// operations work through `&self` via atomic flags, so a single
/// handle can be shared (e.g. behind an `Arc`) across threads.
#[derive(Debug)]
pub struct StreamControl {
    // Set once cancellation has been requested; never reset by this API.
    cancelled: std::sync::atomic::AtomicBool,
    // Set while the stream is paused; toggled by pause()/resume().
    paused: std::sync::atomic::AtomicBool,
}

impl StreamControl {
    /// Creates a control handle with neither cancellation nor pause set.
    pub fn new() -> Self {
        Self {
            cancelled: std::sync::atomic::AtomicBool::new(false),
            paused: std::sync::atomic::AtomicBool::new(false),
        }
    }

    /// Requests cancellation of the stream.
    pub fn cancel(&self) {
        use std::sync::atomic::Ordering;
        self.cancelled.store(true, Ordering::SeqCst);
    }

    /// Pauses the stream.
    pub fn pause(&self) {
        use std::sync::atomic::Ordering;
        self.paused.store(true, Ordering::SeqCst);
    }

    /// Resumes a paused stream.
    pub fn resume(&self) {
        use std::sync::atomic::Ordering;
        self.paused.store(false, Ordering::SeqCst);
    }

    /// Checks if cancellation has been requested.
    pub fn is_cancelled(&self) -> bool {
        use std::sync::atomic::Ordering;
        self.cancelled.load(Ordering::SeqCst)
    }

    /// Checks if the stream is paused.
    pub fn is_paused(&self) -> bool {
        use std::sync::atomic::Ordering;
        self.paused.load(Ordering::SeqCst)
    }
}

impl Default for StreamControl {
    fn default() -> Self {
        Self::new()
    }
}

impl Clone for StreamControl {
    /// Clones by snapshotting the current flag values.
    ///
    /// NOTE: the clone does NOT share state with the original; later
    /// `cancel`/`pause` calls on one handle are invisible to the other.
    /// Share a single handle via `Arc<StreamControl>` to control one
    /// stream from several places.
    fn clone(&self) -> Self {
        Self {
            cancelled: std::sync::atomic::AtomicBool::new(self.is_cancelled()),
            paused: std::sync::atomic::AtomicBool::new(self.is_paused()),
        }
    }
}
331
/// Trait for generators that support streaming output.
///
/// Extends the basic Generator trait with streaming capabilities,
/// including backpressure handling and progress reporting. Both
/// methods hand back an mpsc receiver for [`StreamEvent`]s together
/// with a shared [`StreamControl`] handle the consumer can use to
/// pause, resume, or cancel generation.
#[allow(clippy::type_complexity)]
pub trait StreamingGenerator {
    /// The type of items this generator produces.
    type Item: Clone + Send + 'static;

    /// Starts streaming generation.
    ///
    /// Returns a receiver for stream events and a control handle.
    ///
    /// # Errors
    ///
    /// Returns an error if streaming cannot be started (implementation
    /// specific, e.g. an invalid `config`).
    fn stream(
        &mut self,
        config: StreamConfig,
    ) -> SynthResult<(
        std::sync::mpsc::Receiver<StreamEvent<Self::Item>>,
        std::sync::Arc<StreamControl>,
    )>;

    /// Streams generation with a custom progress callback.
    ///
    /// `on_progress` receives progress snapshots; how often it fires is
    /// implementation defined (presumably governed by
    /// [`StreamConfig::progress_interval`] — confirm per implementation).
    fn stream_with_progress<F>(
        &mut self,
        config: StreamConfig,
        on_progress: F,
    ) -> SynthResult<(
        std::sync::mpsc::Receiver<StreamEvent<Self::Item>>,
        std::sync::Arc<StreamControl>,
    )>
    where
        F: Fn(&StreamProgress) + Send + Sync + 'static;
}
364
/// Trait for output sinks that support streaming input.
pub trait StreamingSink<T>: Send {
    /// Processes a stream event.
    ///
    /// Implementations choose which event variants they act on and may
    /// ignore the rest.
    fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()>;

    /// Flushes any buffered data.
    fn flush(&mut self) -> SynthResult<()>;

    /// Closes the sink and releases resources. Consumes the sink, so it
    /// cannot be used afterwards.
    fn close(self) -> SynthResult<()>;

    /// Returns the number of items processed.
    fn items_processed(&self) -> u64;
}
379
/// A simple collector sink that stores all items in memory.
///
/// Memory use grows with every item, so this is suited to tests and
/// bounded runs rather than unbounded streams.
pub struct CollectorSink<T> {
    // Items received via `StreamEvent::Data`, in arrival order.
    items: Vec<T>,
    // Non-fatal errors received via `StreamEvent::Error`.
    errors: Vec<StreamError>,
    // Final summary, set when a `StreamEvent::Complete` arrives.
    summary: Option<StreamSummary>,
}
386
387impl<T> CollectorSink<T> {
388    /// Creates a new collector sink.
389    pub fn new() -> Self {
390        Self {
391            items: Vec::new(),
392            errors: Vec::new(),
393            summary: None,
394        }
395    }
396
397    /// Creates a collector with pre-allocated capacity.
398    pub fn with_capacity(capacity: usize) -> Self {
399        Self {
400            items: Vec::with_capacity(capacity),
401            errors: Vec::new(),
402            summary: None,
403        }
404    }
405
406    /// Returns the collected items.
407    pub fn into_items(self) -> Vec<T> {
408        self.items
409    }
410
411    /// Returns references to collected items.
412    pub fn items(&self) -> &[T] {
413        &self.items
414    }
415
416    /// Returns collected errors.
417    pub fn errors(&self) -> &[StreamError] {
418        &self.errors
419    }
420
421    /// Returns the stream summary if generation completed.
422    pub fn summary(&self) -> Option<&StreamSummary> {
423        self.summary.as_ref()
424    }
425}
426
427impl<T> Default for CollectorSink<T> {
428    fn default() -> Self {
429        Self::new()
430    }
431}
432
433impl<T: Send> StreamingSink<T> for CollectorSink<T> {
434    fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
435        match event {
436            StreamEvent::Data(item) => {
437                self.items.push(item);
438            }
439            StreamEvent::Error(error) => {
440                self.errors.push(error);
441            }
442            StreamEvent::Complete(summary) => {
443                self.summary = Some(summary);
444            }
445            _ => {}
446        }
447        Ok(())
448    }
449
450    fn flush(&mut self) -> SynthResult<()> {
451        Ok(())
452    }
453
454    fn close(self) -> SynthResult<()> {
455        Ok(())
456    }
457
458    fn items_processed(&self) -> u64 {
459        self.items.len() as u64
460    }
461}
462
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_stream_progress() {
        let mut progress = StreamProgress::new("test_phase");
        progress.update(1000, 2000);

        assert_eq!(progress.items_generated, 1000);
        // 1000 items over 2 seconds.
        assert_eq!(progress.items_per_second, 500.0);
    }

    #[test]
    fn test_stream_error() {
        let error = StreamError::new("test error", StreamErrorCategory::Generation)
            .with_affected_items(5);

        assert_eq!(error.message, "test error");
        assert_eq!(error.items_affected, Some(5));
        assert!(error.recoverable, "errors are recoverable by default");
    }

    #[test]
    fn test_stream_summary() {
        let summary = StreamSummary::new(10000, 5000);

        assert_eq!(summary.total_items, 10000);
        // 10_000 items over 5 seconds.
        assert_eq!(summary.avg_items_per_second, 2000.0);
    }

    #[test]
    fn test_stream_control() {
        let control = StreamControl::new();
        assert!(!control.is_cancelled());
        assert!(!control.is_paused());

        control.pause();
        assert!(control.is_paused());
        control.resume();
        assert!(!control.is_paused());

        control.cancel();
        assert!(control.is_cancelled());
    }

    #[test]
    fn test_collector_sink() {
        let mut sink = CollectorSink::new();
        for n in 1..=3 {
            sink.process(StreamEvent::Data(n)).unwrap();
        }

        assert_eq!(sink.items(), &[1, 2, 3]);
        assert_eq!(sink.items_processed(), 3);
    }

    #[test]
    fn test_backpressure_strategy_default() {
        assert_eq!(BackpressureStrategy::default(), BackpressureStrategy::Block);
    }

    #[test]
    fn test_stream_config_default() {
        let config = StreamConfig::default();
        assert_eq!(config.buffer_size, 1000);
        assert!(config.enable_progress);
        assert_eq!(config.progress_interval, 100);
    }
}