1use std::fmt;
7use std::time::Duration;
8
9use serde::{Deserialize, Serialize};
10
11use crate::error::SynthResult;
12
13#[derive(Debug, Clone)]
15pub enum StreamEvent<T> {
16 Data(T),
18 Progress(StreamProgress),
20 BatchComplete {
22 batch_id: u64,
24 count: usize,
26 },
27 Error(StreamError),
29 Complete(StreamSummary),
31}
32
33impl<T> StreamEvent<T> {
34 pub fn is_data(&self) -> bool {
36 matches!(self, StreamEvent::Data(_))
37 }
38
39 pub fn is_complete(&self) -> bool {
41 matches!(self, StreamEvent::Complete(_))
42 }
43
44 pub fn is_error(&self) -> bool {
46 matches!(self, StreamEvent::Error(_))
47 }
48
49 pub fn into_data(self) -> Option<T> {
51 match self {
52 StreamEvent::Data(data) => Some(data),
53 _ => None,
54 }
55 }
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct StreamProgress {
61 pub items_generated: u64,
63 pub items_per_second: f64,
65 pub elapsed_ms: u64,
67 pub phase: String,
69 #[serde(skip_serializing_if = "Option::is_none")]
71 pub memory_usage_mb: Option<u64>,
72 #[serde(skip_serializing_if = "Option::is_none")]
74 pub buffer_fill_ratio: Option<f64>,
75 #[serde(skip_serializing_if = "Option::is_none")]
77 pub items_remaining: Option<u64>,
78}
79
80impl StreamProgress {
81 pub fn new(phase: impl Into<String>) -> Self {
83 Self {
84 items_generated: 0,
85 items_per_second: 0.0,
86 elapsed_ms: 0,
87 phase: phase.into(),
88 memory_usage_mb: None,
89 buffer_fill_ratio: None,
90 items_remaining: None,
91 }
92 }
93
94 pub fn update(&mut self, items_generated: u64, elapsed_ms: u64) {
96 self.items_generated = items_generated;
97 self.elapsed_ms = elapsed_ms;
98 if elapsed_ms > 0 {
99 self.items_per_second = (items_generated as f64) / (elapsed_ms as f64 / 1000.0);
100 }
101 }
102
103 pub fn eta_ms(&self) -> Option<u64> {
105 self.items_remaining.map(|remaining| {
106 if self.items_per_second > 0.0 {
107 ((remaining as f64 / self.items_per_second) * 1000.0) as u64
108 } else {
109 0
110 }
111 })
112 }
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct StreamError {
118 pub message: String,
120 pub category: StreamErrorCategory,
122 pub recoverable: bool,
124 pub items_affected: Option<usize>,
126}
127
128impl StreamError {
129 pub fn new(message: impl Into<String>, category: StreamErrorCategory) -> Self {
131 Self {
132 message: message.into(),
133 category,
134 recoverable: true,
135 items_affected: None,
136 }
137 }
138
139 pub fn non_recoverable(mut self) -> Self {
141 self.recoverable = false;
142 self
143 }
144
145 pub fn with_affected_items(mut self, count: usize) -> Self {
147 self.items_affected = Some(count);
148 self
149 }
150}
151
152impl fmt::Display for StreamError {
153 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154 write!(f, "[{:?}] {}", self.category, self.message)
155 }
156}
157
158impl std::error::Error for StreamError {}
159
160#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
162#[serde(rename_all = "snake_case")]
163pub enum StreamErrorCategory {
164 Configuration,
166 Generation,
168 Output,
170 Resource,
172 Validation,
174 Network,
176 Internal,
178}
179
180#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct StreamSummary {
183 pub total_items: u64,
185 pub total_time_ms: u64,
187 pub avg_items_per_second: f64,
189 pub error_count: u64,
191 pub dropped_count: u64,
193 #[serde(skip_serializing_if = "Option::is_none")]
195 pub peak_memory_mb: Option<u64>,
196 pub phases_completed: Vec<String>,
198}
199
200impl StreamSummary {
201 pub fn new(total_items: u64, total_time_ms: u64) -> Self {
203 let avg_items_per_second = if total_time_ms > 0 {
204 (total_items as f64) / (total_time_ms as f64 / 1000.0)
205 } else {
206 0.0
207 };
208
209 Self {
210 total_items,
211 total_time_ms,
212 avg_items_per_second,
213 error_count: 0,
214 dropped_count: 0,
215 peak_memory_mb: None,
216 phases_completed: Vec::new(),
217 }
218 }
219}
220
221#[derive(Debug, Clone)]
223pub struct StreamConfig {
224 pub buffer_size: usize,
226 pub enable_progress: bool,
228 pub progress_interval: u64,
230 pub backpressure: BackpressureStrategy,
232 pub timeout: Option<Duration>,
234 pub batch_size: usize,
236}
237
238impl Default for StreamConfig {
239 fn default() -> Self {
240 Self {
241 buffer_size: 1000,
242 enable_progress: true,
243 progress_interval: 100,
244 backpressure: BackpressureStrategy::Block,
245 timeout: None,
246 batch_size: 100,
247 }
248 }
249}
250
251#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
253#[serde(rename_all = "snake_case")]
254pub enum BackpressureStrategy {
255 #[default]
257 Block,
258 DropOldest,
260 DropNewest,
262 Buffer {
264 max_overflow: usize,
266 },
267}
268
/// Pair of atomic flags used to signal cancellation and pausing of a stream.
///
/// All methods take `&self`, so one instance can be shared (for example
/// behind an `Arc`) and flipped from any holder. How a generator reacts to
/// the flags is up to its implementation.
#[derive(Debug)]
pub struct StreamControl {
    /// Set once `cancel` has been called; never cleared by this type.
    cancelled: std::sync::atomic::AtomicBool,
    /// Toggled by `pause` / `resume`.
    paused: std::sync::atomic::AtomicBool,
}

impl StreamControl {
    /// Memory ordering used for every flag access.
    const ORDER: std::sync::atomic::Ordering = std::sync::atomic::Ordering::SeqCst;

    /// Builds a control whose flags start at the given values.
    fn from_flags(cancelled: bool, paused: bool) -> Self {
        Self {
            cancelled: std::sync::atomic::AtomicBool::new(cancelled),
            paused: std::sync::atomic::AtomicBool::new(paused),
        }
    }

    /// Creates a control that is neither cancelled nor paused.
    pub fn new() -> Self {
        Self::from_flags(false, false)
    }

    /// Raises the cancellation flag.
    pub fn cancel(&self) {
        self.cancelled.store(true, Self::ORDER);
    }

    /// Raises the pause flag.
    pub fn pause(&self) {
        self.paused.store(true, Self::ORDER);
    }

    /// Clears the pause flag.
    pub fn resume(&self) {
        self.paused.store(false, Self::ORDER);
    }

    /// Reports whether `cancel` has been called.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Self::ORDER)
    }

    /// Reports whether the pause flag is currently raised.
    pub fn is_paused(&self) -> bool {
        self.paused.load(Self::ORDER)
    }
}

impl Default for StreamControl {
    /// Same as [`StreamControl::new`].
    fn default() -> Self {
        StreamControl::new()
    }
}

impl Clone for StreamControl {
    /// Copies the *current* flag values into a brand-new control.
    ///
    /// The clone owns its own atomics: later changes to either instance are
    /// not visible through the other. Share one instance behind an `Arc`
    /// when both sides must observe the same flags.
    fn clone(&self) -> Self {
        Self::from_flags(self.is_cancelled(), self.is_paused())
    }
}
331
332#[allow(clippy::type_complexity)]
337pub trait StreamingGenerator {
338 type Item: Clone + Send + 'static;
340
341 fn stream(
345 &mut self,
346 config: StreamConfig,
347 ) -> SynthResult<(
348 std::sync::mpsc::Receiver<StreamEvent<Self::Item>>,
349 std::sync::Arc<StreamControl>,
350 )>;
351
352 fn stream_with_progress<F>(
354 &mut self,
355 config: StreamConfig,
356 on_progress: F,
357 ) -> SynthResult<(
358 std::sync::mpsc::Receiver<StreamEvent<Self::Item>>,
359 std::sync::Arc<StreamControl>,
360 )>
361 where
362 F: Fn(&StreamProgress) + Send + Sync + 'static;
363}
364
365pub trait StreamingSink<T>: Send {
367 fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()>;
369
370 fn flush(&mut self) -> SynthResult<()>;
372
373 fn close(self) -> SynthResult<()>;
375
376 fn items_processed(&self) -> u64;
378}
379
380pub struct CollectorSink<T> {
382 items: Vec<T>,
383 errors: Vec<StreamError>,
384 summary: Option<StreamSummary>,
385}
386
387impl<T> CollectorSink<T> {
388 pub fn new() -> Self {
390 Self {
391 items: Vec::new(),
392 errors: Vec::new(),
393 summary: None,
394 }
395 }
396
397 pub fn with_capacity(capacity: usize) -> Self {
399 Self {
400 items: Vec::with_capacity(capacity),
401 errors: Vec::new(),
402 summary: None,
403 }
404 }
405
406 pub fn into_items(self) -> Vec<T> {
408 self.items
409 }
410
411 pub fn items(&self) -> &[T] {
413 &self.items
414 }
415
416 pub fn errors(&self) -> &[StreamError] {
418 &self.errors
419 }
420
421 pub fn summary(&self) -> Option<&StreamSummary> {
423 self.summary.as_ref()
424 }
425}
426
427impl<T> Default for CollectorSink<T> {
428 fn default() -> Self {
429 Self::new()
430 }
431}
432
433impl<T: Send> StreamingSink<T> for CollectorSink<T> {
434 fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
435 match event {
436 StreamEvent::Data(item) => {
437 self.items.push(item);
438 }
439 StreamEvent::Error(error) => {
440 self.errors.push(error);
441 }
442 StreamEvent::Complete(summary) => {
443 self.summary = Some(summary);
444 }
445 _ => {}
446 }
447 Ok(())
448 }
449
450 fn flush(&mut self) -> SynthResult<()> {
451 Ok(())
452 }
453
454 fn close(self) -> SynthResult<()> {
455 Ok(())
456 }
457
458 fn items_processed(&self) -> u64 {
459 self.items.len() as u64
460 }
461}
462
#[cfg(test)]
mod tests {
    use super::*;

    /// `update` stores the counters and derives the per-second rate.
    #[test]
    fn test_stream_progress() {
        let mut progress = StreamProgress::new("test_phase");
        let (items, elapsed_ms) = (1000, 2000);
        progress.update(items, elapsed_ms);

        assert_eq!(progress.items_generated, 1000);
        // 1000 items over 2 s is exactly representable, so comparing floats with `==` is safe.
        assert_eq!(progress.items_per_second, 500.0);
    }

    /// The builder keeps the message, records the affected count and stays recoverable.
    #[test]
    fn test_stream_error() {
        let category = StreamErrorCategory::Generation;
        let error = StreamError::new("test error", category).with_affected_items(5);

        assert_eq!(error.message, "test error");
        assert_eq!(error.items_affected, Some(5));
        assert!(error.recoverable);
    }

    /// The summary derives the average rate from the totals.
    #[test]
    fn test_stream_summary() {
        let summary = StreamSummary::new(10000, 5000);

        assert_eq!(summary.total_items, 10000);
        assert_eq!(summary.avg_items_per_second, 2000.0);
    }

    /// Pause, resume and cancel flip the expected flags.
    #[test]
    fn test_stream_control() {
        let control = StreamControl::new();
        assert!(!control.is_cancelled());
        assert!(!control.is_paused());

        control.pause();
        assert!(control.is_paused());

        control.resume();
        assert!(!control.is_paused());

        control.cancel();
        assert!(control.is_cancelled());
    }

    /// Data events are collected in arrival order and counted.
    #[test]
    fn test_collector_sink() {
        let mut sink = CollectorSink::new();

        for value in [1, 2, 3] {
            sink.process(StreamEvent::Data(value)).unwrap();
        }

        assert_eq!(sink.items(), &[1, 2, 3]);
        assert_eq!(sink.items_processed(), 3);
    }

    /// `Block` is the default backpressure strategy.
    #[test]
    fn test_backpressure_strategy_default() {
        let strategy = BackpressureStrategy::default();
        assert_eq!(strategy, BackpressureStrategy::Block);
    }

    /// Spot-checks the default stream configuration.
    #[test]
    fn test_stream_config_default() {
        let config = StreamConfig::default();

        assert_eq!(config.buffer_size, 1000);
        assert!(config.enable_progress);
        assert_eq!(config.progress_interval, 100);
    }
}