use crate::error::{CoreError, ErrorContext, ErrorLocation};
use crate::memory_efficient::chunked::{ChunkedArray, ChunkingStrategy};
use crate::memory_efficient::prefetch::PrefetchConfig;
use ::ndarray::{ArrayBase, Dimension, OwnedRepr, RemoveAxis};
use std::collections::{BTreeMap, VecDeque};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

type ProcessFn<T, U> = Arc<dyn Fn(Vec<T>) -> Result<Vec<U>, CoreError> + Send + Sync>;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamMode {
    Immediate,
    Buffered,
    Adaptive,
    SlidingWindow,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamSource {
    File,
    Network,
    Sensor,
    Generated,
    Stream,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    Initialized,
    Running,
    Paused,
    Completed,
    Error,
}

#[derive(Debug, Clone)]
pub struct StreamConfig {
    pub mode: StreamMode,
    pub buffersize: usize,
    pub max_batch_size: usize,
    pub min_batch_size: usize,
    pub chunk_size: usize,
    pub parallel: bool,
    pub workers: Option<usize>,
    pub rate_limit: usize,
    pub timeout_ms: u64,
    pub enable_prefetch: bool,
    pub prefetch_config: Option<PrefetchConfig>,
    pub enable_backpressure: bool,
    pub windowsize: usize,
    pub window_stride: usize,
}

impl Default for StreamConfig {
    fn default() -> Self {
        Self {
            mode: StreamMode::Buffered,
            buffersize: 1024 * 1024,
            max_batch_size: 65536,
            min_batch_size: 1024,
            chunk_size: 1024,
            parallel: true,
            workers: None,
            rate_limit: 0,
            timeout_ms: 1000,
            enable_prefetch: true,
            prefetch_config: None,
            enable_backpressure: true,
            windowsize: 1024,
            window_stride: 256,
        }
    }
}

#[derive(Debug, Clone, Default)]
pub struct StreamConfigBuilder {
    config: StreamConfig,
}

impl StreamConfigBuilder {
    pub fn new() -> Self {
        Self::default()
    }

    pub const fn mode(mut self, mode: StreamMode) -> Self {
        self.config.mode = mode;
        self
    }

    pub const fn buffersize(mut self, size: usize) -> Self {
        self.config.buffersize = size;
        self
    }

    pub const fn max_batch_size(mut self, size: usize) -> Self {
        self.config.max_batch_size = size;
        self
    }

    pub const fn min_batch_size(mut self, size: usize) -> Self {
        self.config.min_batch_size = size;
        self
    }

    pub const fn chunk_size(mut self, size: usize) -> Self {
        self.config.chunk_size = size;
        self
    }

    pub const fn parallel(mut self, enable: bool) -> Self {
        self.config.parallel = enable;
        self
    }

    pub const fn workers(mut self, workers: Option<usize>) -> Self {
        self.config.workers = workers;
        self
    }

    pub const fn rate_limit(mut self, limit: usize) -> Self {
        self.config.rate_limit = limit;
        self
    }

    pub const fn timeout_ms(mut self, timeout: u64) -> Self {
        self.config.timeout_ms = timeout;
        self
    }

    pub const fn enable_prefetch(mut self, enable: bool) -> Self {
        self.config.enable_prefetch = enable;
        self
    }

    pub const fn prefetch_config(mut self, config: Option<PrefetchConfig>) -> Self {
        self.config.prefetch_config = config;
        self
    }

    pub const fn enable_backpressure(mut self, enable: bool) -> Self {
        self.config.enable_backpressure = enable;
        self
    }

    pub const fn windowsize(mut self, size: usize) -> Self {
        self.config.windowsize = size;
        self
    }

    pub const fn window_stride(mut self, stride: usize) -> Self {
        self.config.window_stride = stride;
        self
    }

    pub fn build(self) -> StreamConfig {
        self.config
    }
}
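
// Usage sketch (added for illustration; `example_build_config` is not part of the
// original API surface): building a config for adaptive batching with the builder.
// All values below are arbitrary.
#[allow(dead_code)]
fn example_build_config() -> StreamConfig {
    StreamConfigBuilder::new()
        .mode(StreamMode::Adaptive)
        .buffersize(64 * 1024)
        .max_batch_size(8192)
        .min_batch_size(256)
        .timeout_ms(500)
        .build()
}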

#[derive(Debug, Clone)]
pub struct StreamStats {
    pub processed_items: usize,
    pub processed_batches: usize,
    pub avg_batch_size: f64,
    pub avg_batch_time_ms: f64,
    pub avg_throughput: f64,
    pub uptime_seconds: f64,
    pub backpressure_count: usize,
    pub buffer_high_water_mark: usize,
    pub error_count: usize,
    pub lasterror: Option<String>,
}

impl Default for StreamStats {
    fn default() -> Self {
        Self {
            processed_items: 0,
            processed_batches: 0,
            avg_batch_size: 0.0,
            avg_batch_time_ms: 0.0,
            avg_throughput: 0.0,
            uptime_seconds: 0.0,
            backpressure_count: 0,
            buffer_high_water_mark: 0,
            error_count: 0,
            lasterror: None,
        }
    }
}

#[derive(Debug)]
struct StreamBuffer<T: Clone + Send + 'static> {
    data: VecDeque<T>,
    maxsize: usize,
    mutex: Mutex<()>,
    condvar: Condvar,
    closed: bool,
}

impl<T: Clone + Send + 'static> StreamBuffer<T> {
    fn new(maxsize: usize) -> Self {
        Self {
            data: VecDeque::with_capacity(maxsize),
            maxsize,
            mutex: Mutex::new(()),
            condvar: Condvar::new(),
            closed: false,
        }
    }

    fn push(&mut self, item: T) -> Result<(), CoreError> {
        let mut guard = self.mutex.lock().expect("Failed to acquire lock");

        if self.closed {
            return Err(CoreError::StreamError(
                ErrorContext::new("Stream is closed".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        while self.data.len() >= self.maxsize {
            guard = self
                .condvar
                .wait(guard)
                .expect("Condition variable wait failed");

            if self.closed {
                return Err(CoreError::StreamError(
                    ErrorContext::new("Stream is closed".to_string())
                        .with_location(ErrorLocation::new(file!(), line!())),
                ));
            }
        }

        self.data.push_back(item);

        self.condvar.notify_one();

        Ok(())
    }

    fn push_batch(&mut self, items: Vec<T>) -> Result<(), CoreError> {
        let mut guard = self.mutex.lock().expect("Failed to acquire lock");

        if self.closed {
            return Err(CoreError::StreamError(
                ErrorContext::new("Stream is closed".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        while self.data.len() + items.len() > self.maxsize {
            guard = self
                .condvar
                .wait(guard)
                .expect("Condition variable wait failed");

            if self.closed {
                return Err(CoreError::StreamError(
                    ErrorContext::new("Stream is closed".to_string())
                        .with_location(ErrorLocation::new(file!(), line!())),
                ));
            }
        }

        self.data.extend(items);

        self.condvar.notify_one();

        Ok(())
    }

    fn pop_batch(&mut self, max_batch_size: usize, timeoutms: u64) -> Result<Vec<T>, CoreError> {
        let mut guard = self.mutex.lock().expect("Failed to acquire lock");

        if self.data.is_empty() && !self.closed {
            if timeoutms > 0 {
                let timeout = Duration::from_millis(timeoutms);
                let result = self.condvar.wait_timeout(guard, timeout);

                match result {
                    Ok((g, timeout_result)) => {
                        #[allow(unused_assignments)]
                        {
                            guard = g;
                        }

                        if timeout_result.timed_out() && self.data.is_empty() {
                            return Err(CoreError::TimeoutError(
                                ErrorContext::new("Timeout waiting for data".to_string())
                                    .with_location(ErrorLocation::new(file!(), line!())),
                            ));
                        }
                    }
                    Err(_) => {
                        return Err(CoreError::StreamError(
                            ErrorContext::new("Error waiting for data".to_string())
                                .with_location(ErrorLocation::new(file!(), line!())),
                        ));
                    }
                }
            } else {
                #[allow(unused_assignments)]
                {
                    guard = self
                        .condvar
                        .wait(guard)
                        .expect("Condition variable wait failed");
                }
            }
        }

        if self.data.is_empty() && self.closed {
            return Err(CoreError::EndOfStream(
                ErrorContext::new("End of stream".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        let batch_size = std::cmp::min(max_batch_size, self.data.len());
        let mut batch = Vec::with_capacity(batch_size);

        for _ in 0..batch_size {
            if let Some(item) = self.data.pop_front() {
                batch.push(item);
            } else {
                break;
            }
        }

        self.condvar.notify_one();

        Ok(batch)
    }

    fn len(&self) -> usize {
        let _guard = self.mutex.lock().expect("Failed to acquire lock");
        self.data.len()
    }

    fn is_empty(&self) -> bool {
        let _guard = self.mutex.lock().expect("Failed to acquire lock");
        self.data.is_empty()
    }

    fn close(&mut self) {
        let _guard = self.mutex.lock().expect("Failed to acquire lock");
        self.closed = true;
        self.condvar.notify_all();
    }

    #[allow(dead_code)]
    fn is_closed(&self) -> bool {
        let _guard = self.mutex.lock().expect("Failed to acquire lock");
        self.closed
    }

    fn clear(&mut self) {
        let _guard = self.mutex.lock().expect("Failed to acquire lock");
        self.data.clear();
        self.condvar.notify_all();
    }
}
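
// Internal sketch (illustrative only; `example_buffer_roundtrip` is not part of the
// original source): `StreamBuffer` blocks producers once `maxsize` items are queued and
// wakes waiters on `close()`. Here the buffer never fills, so nothing blocks.
#[allow(dead_code)]
fn example_buffer_roundtrip() -> Result<Vec<i32>, CoreError> {
    let mut buffer: StreamBuffer<i32> = StreamBuffer::new(4);
    buffer.push_batch(vec![1, 2, 3])?;
    // Pops at most two items, waiting up to 10 ms if the buffer were empty.
    buffer.pop_batch(2, 10)
}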

pub struct StreamProcessor<T: Clone + Send + 'static, U: Clone + Send + 'static> {
    config: StreamConfig,
    input_buffer: Arc<Mutex<StreamBuffer<T>>>,
    processfn: ProcessFn<T, U>,
    output_buffer: Arc<Mutex<StreamBuffer<U>>>,
    state: Arc<RwLock<StreamState>>,
    stats: Arc<RwLock<StreamStats>>,
    worker_thread: Option<JoinHandle<()>>,
    start_time: Arc<RwLock<Option<Instant>>>,
}

impl<T, U> std::fmt::Debug for StreamProcessor<T, U>
where
    T: Clone + Send + 'static,
    U: Clone + Send + 'static,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StreamProcessor")
            .field("config", &self.config)
            .field("state", &self.state)
            .field("stats", &self.stats)
            .field("worker_thread", &self.worker_thread.is_some())
            .field("start_time", &self.start_time)
            .finish_non_exhaustive()
    }
}

impl<T: Clone + Send + 'static, U: Clone + Send + 'static> StreamProcessor<T, U> {
    pub fn new<F>(config: StreamConfig, processfn: F) -> Self
    where
        F: Fn(Vec<T>) -> Result<Vec<U>, CoreError> + Send + Sync + 'static,
    {
        let input_buffer = Arc::new(Mutex::new(StreamBuffer::new(config.buffersize)));
        let output_buffer = Arc::new(Mutex::new(StreamBuffer::new(config.buffersize)));

        Self {
            config,
            input_buffer,
            processfn: Arc::new(processfn),
            output_buffer,
            state: Arc::new(RwLock::new(StreamState::Initialized)),
            stats: Arc::new(RwLock::new(StreamStats::default())),
            worker_thread: None,
            start_time: Arc::new(RwLock::new(None)),
        }
    }

    pub fn start(&mut self) -> Result<(), CoreError> {
        let mut state = self.state.write().expect("Failed to acquire write lock");

        if *state == StreamState::Running {
            return Err(CoreError::StreamError(
                ErrorContext::new("Stream already running".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        *state = StreamState::Running;

        let mut start_time = self
            .start_time
            .write()
            .expect("Failed to acquire write lock");
        *start_time = Some(Instant::now());

        let input_buffer = self.input_buffer.clone();
        let output_buffer = self.output_buffer.clone();
        let processfn = self.processfn.clone();
        let config = self.config.clone();
        let state = self.state.clone();
        let stats = self.stats.clone();
        let start_time_clone = self.start_time.clone();

        let worker = thread::spawn(move || {
            Self::worker_loop(
                input_buffer,
                output_buffer,
                processfn,
                config,
                state,
                stats,
                start_time_clone,
            );
        });

        self.worker_thread = Some(worker);

        Ok(())
    }

    fn worker_loop(
        input_buffer: Arc<Mutex<StreamBuffer<T>>>,
        output_buffer: Arc<Mutex<StreamBuffer<U>>>,
        processfn: ProcessFn<T, U>,
        config: StreamConfig,
        state: Arc<RwLock<StreamState>>,
        stats: Arc<RwLock<StreamStats>>,
        start_time: Arc<RwLock<Option<Instant>>>,
    ) {
        let rate_limit = config.rate_limit;
        let mut last_batch_time = Instant::now();
        let mut batch_window = VecDeque::new();

        loop {
            {
                let current_state = state.read().expect("Failed to acquire read lock");
                if *current_state != StreamState::Running {
                    break;
                }
            }

            if rate_limit > 0 {
                let min_time_per_batch =
                    Duration::from_secs_f64(config.min_batch_size as f64 / rate_limit as f64);

                let elapsed = last_batch_time.elapsed();
                if elapsed < min_time_per_batch {
                    thread::sleep(min_time_per_batch - elapsed);
                }
            }
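
            // Choose the batch size for this iteration based on the configured mode:
            // Immediate processes one item at a time, Buffered always requests the
            // maximum batch, Adaptive shrinks the batch as the average batch time grows
            // (10 ms and 50 ms thresholds), and SlidingWindow pulls one window's worth.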
            let batch_size = match config.mode {
                StreamMode::Immediate => 1,
                StreamMode::Buffered => config.max_batch_size,
                StreamMode::Adaptive => {
                    let stats_guard = stats.read().expect("Failed to acquire read lock");
                    let avg_time = stats_guard.avg_batch_time_ms;

                    if avg_time < 10.0 {
                        config.max_batch_size
                    } else if avg_time < 50.0 {
                        (config.max_batch_size + config.min_batch_size) / 2
                    } else {
                        config.min_batch_size
                    }
                }
                StreamMode::SlidingWindow => config.windowsize,
            };

            let input_batch = match input_buffer
                .lock()
                .expect("Failed to acquire lock")
                .pop_batch(batch_size, config.timeout_ms)
            {
                Ok(batch) => batch,
                Err(err) => {
                    match err {
                        CoreError::EndOfStream(_) => {
                            let mut current_state =
                                state.write().expect("Failed to acquire write lock");
                            *current_state = StreamState::Completed;
                            break;
                        }
                        CoreError::TimeoutError(_) => {
                            continue;
                        }
                        _ => {
                            let mut stats_guard =
                                stats.write().expect("Failed to acquire write lock");
                            stats_guard.error_count += 1;
                            stats_guard.lasterror = Some(err.to_string());
                            continue;
                        }
                    }
                }
            };

            if input_batch.is_empty() {
                continue;
            }
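
            // In sliding-window mode the worker first accumulates `windowsize` items,
            // then advances the window by `window_stride` items per batch so successive
            // windows overlap; in every other mode the batch is processed as-is.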
            let process_input = if config.mode == StreamMode::SlidingWindow {
                if batch_window.len() < config.windowsize {
                    batch_window.extend(input_batch);

                    if batch_window.len() < config.windowsize {
                        continue;
                    }

                    batch_window.make_contiguous().to_vec()
                } else {
                    let stride = std::cmp::min(config.window_stride, input_batch.len());

                    for _ in 0..stride {
                        batch_window.pop_front();
                    }

                    batch_window.extend(input_batch);

                    batch_window.make_contiguous().to_vec()
                }
            } else {
                input_batch
            };

            let process_result = {
                let batch_start_time = Instant::now();
                let result = processfn(process_input.clone());

                let mut stats_guard = stats.write().expect("Failed to acquire write lock");
                stats_guard.processed_batches += 1;
                stats_guard.processed_items += process_input.len();

                let total_items = stats_guard.processed_items;
                let total_batches = stats_guard.processed_batches;
                stats_guard.avg_batch_size = total_items as f64 / total_batches as f64;

                let batch_time = batch_start_time.elapsed().as_millis() as f64;
                stats_guard.avg_batch_time_ms =
                    (stats_guard.avg_batch_time_ms * (total_batches - 1) as f64 + batch_time)
                        / total_batches as f64;

                if let Some(start) = *start_time.read().expect("Failed to acquire read lock") {
                    let uptime_seconds = start.elapsed().as_secs_f64();
                    stats_guard.uptime_seconds = uptime_seconds;
                    stats_guard.avg_throughput = total_items as f64 / uptime_seconds;
                }

                let buffer_len = input_buffer.lock().expect("Failed to acquire lock").len();
                if buffer_len > stats_guard.buffer_high_water_mark {
                    stats_guard.buffer_high_water_mark = buffer_len;
                }

                result
            };

            match process_result {
                Ok(output_batch) => {
                    if !output_batch.is_empty() {
                        match output_buffer
                            .lock()
                            .expect("Failed to acquire lock")
                            .push_batch(output_batch)
                        {
                            Ok(_) => {}
                            Err(err) => {
                                let mut stats_guard =
                                    stats.write().expect("Failed to acquire write lock");
                                stats_guard.error_count += 1;
                                stats_guard.lasterror = Some(err.to_string());
                            }
                        }
                    }
                }
                Err(err) => {
                    let mut stats_guard = stats.write().expect("Failed to acquire write lock");
                    stats_guard.error_count += 1;
                    stats_guard.lasterror = Some(err.to_string());
                }
            }

            last_batch_time = Instant::now();
        }
    }

    pub fn stop(&mut self) -> Result<(), CoreError> {
        let mut state = self.state.write().expect("Failed to acquire write lock");

        if *state != StreamState::Running {
            return Err(CoreError::StreamError(
                ErrorContext::new("Stream not running".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        *state = StreamState::Paused;

        // Release the state lock before closing the buffer and joining, so the worker
        // loop can observe the state change and exit instead of blocking on the lock.
        drop(state);

        self.input_buffer
            .lock()
            .expect("Failed to acquire lock")
            .close();

        if let Some(worker) = self.worker_thread.take() {
            match worker.join() {
                Ok(_) => {}
                Err(_) => {
                    return Err(CoreError::StreamError(
                        ErrorContext::new("Error joining worker thread".to_string())
                            .with_location(ErrorLocation::new(file!(), line!())),
                    ));
                }
            }
        }

        Ok(())
    }

    pub fn push(&self, data: T) -> Result<(), CoreError> {
        let state = self.state.read().expect("Failed to acquire read lock");
        if *state != StreamState::Running {
            return Err(CoreError::StreamError(
                ErrorContext::new("Stream not running".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        self.input_buffer
            .lock()
            .expect("Failed to acquire lock")
            .push(data)
    }

    pub fn push_batch(&self, data: Vec<T>) -> Result<(), CoreError> {
        let state = self.state.read().expect("Failed to acquire read lock");
        if *state != StreamState::Running {
            return Err(CoreError::StreamError(
                ErrorContext::new("Stream not running".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        self.input_buffer
            .lock()
            .expect("Failed to acquire lock")
            .push_batch(data)
    }

    pub fn pop(&self) -> Result<U, CoreError> {
        let state = self.state.read().expect("Failed to acquire read lock");
        if *state != StreamState::Running && *state != StreamState::Completed {
            return Err(CoreError::StreamError(
                ErrorContext::new("Stream not running or completed".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        let result = self
            .output_buffer
            .lock()
            .expect("Failed to acquire lock")
            .pop_batch(1, self.config.timeout_ms)?;

        if result.is_empty() {
            Err(CoreError::TimeoutError(
                ErrorContext::new("Timeout waiting for data".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ))
        } else {
            Ok(result[0].clone())
        }
    }

    pub fn pop_batch(&self, batchsize: usize) -> Result<Vec<U>, CoreError> {
        let state = self.state.read().expect("Failed to acquire read lock");
        if *state != StreamState::Running && *state != StreamState::Completed {
            return Err(CoreError::StreamError(
                ErrorContext::new("Stream not running or completed".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        self.output_buffer
            .lock()
            .expect("Failed to acquire lock")
            .pop_batch(batchsize, self.config.timeout_ms)
    }

    pub fn state(&self) -> StreamState {
        *self.state.read().expect("Failed to acquire read lock")
    }

    pub fn stats(&self) -> StreamStats {
        self.stats
            .read()
            .expect("Failed to acquire read lock")
            .clone()
    }

    pub fn is_empty(&self) -> bool {
        self.input_buffer
            .lock()
            .expect("Failed to acquire lock")
            .is_empty()
            && self
                .output_buffer
                .lock()
                .expect("Failed to acquire lock")
                .is_empty()
    }

    pub fn clear(&self) -> Result<(), CoreError> {
        let state = self.state.read().expect("Failed to acquire read lock");
        if *state == StreamState::Running {
            return Err(CoreError::StreamError(
                ErrorContext::new("Cannot clear running stream".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        self.input_buffer
            .lock()
            .expect("Failed to acquire lock")
            .clear();
        self.output_buffer
            .lock()
            .expect("Failed to acquire lock")
            .clear();

        Ok(())
    }
}

impl<T: Clone + Send + 'static, U: Clone + Send + 'static> Drop for StreamProcessor<T, U> {
    fn drop(&mut self) {
        if *self.state.read().expect("Failed to acquire read lock") == StreamState::Running {
            let _ = self.stop();
        }
    }
}
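
// Usage sketch (added for illustration; `example_processor_roundtrip` is not part of
// the original API): a processor that doubles integers. Timing is not guaranteed, so a
// real caller should be prepared to retry `pop_batch` on `TimeoutError`.
#[allow(dead_code)]
fn example_processor_roundtrip() -> Result<(), CoreError> {
    let mut processor: StreamProcessor<i32, i32> =
        StreamProcessor::new(StreamConfig::default(), |batch: Vec<i32>| {
            Ok(batch.into_iter().map(|x| x * 2).collect())
        });

    processor.start()?;
    processor.push_batch((0..1024).collect())?;

    // The worker thread drains the input buffer, applies the closure, and fills the
    // output buffer; this call may time out if no output is ready yet.
    let _doubled = processor.pop_batch(1024);

    processor.stop()
}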

#[derive(Debug)]
pub struct PipelineStage<I: Clone + Send + 'static, O: Clone + Send + 'static> {
    pub name: String,
    processor: Arc<Mutex<StreamProcessor<I, O>>>,
    pub parallel: bool,
    pub parallelism: usize,
}

impl<I: Clone + Send + 'static, O: Clone + Send + 'static> PipelineStage<I, O> {
    pub fn new<F>(
        name: String,
        config: StreamConfig,
        processfn: F,
        parallel: bool,
        parallelism: usize,
    ) -> Self
    where
        F: Fn(Vec<I>) -> Result<Vec<O>, CoreError> + Send + Sync + Clone + 'static,
    {
        let processor = StreamProcessor::new(config, processfn);

        Self {
            name,
            processor: Arc::new(Mutex::new(processor)),
            parallel,
            parallelism,
        }
    }

    pub fn processor(&self) -> Arc<Mutex<StreamProcessor<I, O>>> {
        self.processor.clone()
    }

    pub fn start(&self) -> Result<(), CoreError> {
        self.processor
            .lock()
            .expect("Failed to acquire lock")
            .start()
    }

    pub fn stop(&self) -> Result<(), CoreError> {
        self.processor
            .lock()
            .expect("Failed to acquire lock")
            .stop()
    }

    pub fn state(&self) -> StreamState {
        self.processor
            .lock()
            .expect("Failed to acquire lock")
            .state()
    }

    pub fn stats(&self) -> StreamStats {
        self.processor
            .lock()
            .expect("Failed to acquire lock")
            .stats()
    }
}

pub struct Pipeline {
    pub name: String,
    stages: Vec<Box<dyn AnyStage>>,
    connections: Vec<(usize, usize)>,
    workers: Vec<JoinHandle<()>>,
    state: Arc<RwLock<StreamState>>,
    #[allow(dead_code)]
    stats: Arc<RwLock<PipelineStats>>,
    error_context: Arc<RwLock<Option<ErrorContext>>>,
}

#[derive(Debug, Clone)]
pub struct PipelineStats {
    pub stage_stats: BTreeMap<String, StreamStats>,
    pub total_items: usize,
    pub uptime_seconds: f64,
    pub overall_throughput: f64,
    pub bottleneck_stage: Option<String>,
    pub bottleneck_throughput: f64,
}

impl Default for PipelineStats {
    fn default() -> Self {
        Self {
            stage_stats: BTreeMap::new(),
            total_items: 0,
            uptime_seconds: 0.0,
            overall_throughput: 0.0,
            bottleneck_stage: None,
            bottleneck_throughput: f64::MAX,
        }
    }
}

pub trait AnyStage: Send + Sync {
    fn name(&self) -> &str;
    fn start(&self) -> Result<(), CoreError>;
    fn stop(&self) -> Result<(), CoreError>;
    fn state(&self) -> StreamState;
    fn stats(&self) -> StreamStats;
    fn is_empty(&self) -> bool;
    fn push_raw(&self, data: Box<dyn std::any::Any + Send>) -> Result<(), CoreError>;
    fn pop_raw(&self) -> Result<Box<dyn std::any::Any + Send>, CoreError>;
    fn clone_box_impl(&self) -> Box<dyn AnyStage>;
}

pub struct PipelineBuilder {
    name: String,
    stages: Vec<Box<dyn AnyStage>>,
    connections: Vec<(usize, usize)>,
}

impl PipelineBuilder {
    pub fn new(name: String) -> Self {
        Self {
            name,
            stages: Vec::new(),
            connections: Vec::new(),
        }
    }

    pub fn add_stage<I, O, F>(
        &mut self,
        name: String,
        config: StreamConfig,
        processfn: F,
        parallel: bool,
        parallelism: usize,
    ) -> usize
    where
        I: Clone + Send + 'static,
        O: Clone + Send + 'static,
        F: Fn(Vec<I>) -> Result<Vec<O>, CoreError> + Send + Sync + Clone + 'static,
    {
        let stage = PipelineStage::new(name, config, processfn, parallel, parallelism);
        let stage_index = self.stages.len();
        self.stages.push(Box::new(StageWrapper::new(stage)));
        stage_index
    }

    pub fn connect(&mut self, from_stage: usize, tostage: usize) -> &mut Self {
        if from_stage < self.stages.len() && tostage < self.stages.len() {
            self.connections.push((from_stage, tostage));
        }
        self
    }

    pub fn build(self) -> Pipeline {
        Pipeline {
            name: self.name,
            stages: self.stages,
            connections: self.connections,
            workers: Vec::new(),
            state: Arc::new(RwLock::new(StreamState::Initialized)),
            stats: Arc::new(RwLock::new(PipelineStats::default())),
            error_context: Arc::new(RwLock::new(None)),
        }
    }
}

impl Pipeline {
    pub fn start(&mut self) -> Result<(), CoreError> {
        let mut state = self.state.write().expect("Failed to acquire write lock");

        if *state == StreamState::Running {
            return Err(CoreError::StreamError(
                ErrorContext::new("Pipeline already running".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        for stage in &self.stages {
            stage.start()?;
        }

        for (from_stage, to_stage) in &self.connections {
            let from_stage = &self.stages[*from_stage];
            let to_stage = &self.stages[*to_stage];

            let from_stage_clone = from_stage.clone_box();
            let to_stage_clone = to_stage.clone_box();
            let state_clone = self.state.clone();
            let error_context_clone = self.error_context.clone();

            let worker = thread::spawn(move || {
                Self::connection_worker(
                    from_stage_clone,
                    to_stage_clone,
                    state_clone,
                    error_context_clone,
                );
            });

            self.workers.push(worker);
        }

        *state = StreamState::Running;

        Ok(())
    }

    fn connection_worker(
        from_stage: Box<dyn AnyStage>,
        to_stage: Box<dyn AnyStage>,
        state: Arc<RwLock<StreamState>>,
        error_context: Arc<RwLock<Option<ErrorContext>>>,
    ) {
        let mut consecutiveerrors = 0;
        let error_threshold = 10;

        loop {
            {
                let current_state = state.read().expect("Failed to acquire read lock");
                if *current_state != StreamState::Running {
                    break;
                }
            }

            match from_stage.pop_raw() {
                Ok(data) => {
                    consecutiveerrors = 0;

                    if let Err(err) = to_stage.push_raw(data) {
                        consecutiveerrors += 1;

                        let mut error_context_guard =
                            error_context.write().expect("Failed to acquire write lock");
                        *error_context_guard = Some(
                            ErrorContext::new(format!(
                                "Error pushing data from {} to {}: {}",
                                from_stage.name(),
                                to_stage.name(),
                                err
                            ))
                            .with_location(ErrorLocation::new(file!(), line!())),
                        );

                        if consecutiveerrors >= error_threshold {
                            let mut current_state =
                                state.write().expect("Failed to acquire write lock");
                            *current_state = StreamState::Error;
                            break;
                        }

                        thread::sleep(Duration::from_millis(100));
                    }
                }
                Err(err) => {
                    match err {
                        CoreError::EndOfStream(_) => {
                            break;
                        }
                        CoreError::TimeoutError(_) => {
                            continue;
                        }
                        _ => {
                            consecutiveerrors += 1;

                            let mut error_context_guard =
                                error_context.write().expect("Failed to acquire write lock");
                            *error_context_guard = Some(
                                ErrorContext::new(format!(
                                    "Error popping data from {}: {}",
                                    from_stage.name(),
                                    err
                                ))
                                .with_location(ErrorLocation::new(file!(), line!())),
                            );

                            if consecutiveerrors >= error_threshold {
                                let mut current_state =
                                    state.write().expect("Failed to acquire write lock");
                                *current_state = StreamState::Error;
                                break;
                            }

                            thread::sleep(Duration::from_millis(100));
                        }
                    }
                }
            }
        }
    }

    pub fn stop(&mut self) -> Result<(), CoreError> {
        let mut state = self.state.write().expect("Failed to acquire write lock");

        if *state != StreamState::Running {
            return Err(CoreError::StreamError(
                ErrorContext::new("Pipeline not running".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        *state = StreamState::Paused;

        // Release the state lock before stopping stages and joining, so the connection
        // workers can observe the state change and exit instead of blocking on the lock.
        drop(state);

        for stage in &self.stages {
            stage.stop()?;
        }

        for worker in self.workers.drain(..) {
            match worker.join() {
                Ok(_) => {}
                Err(_) => {
                    return Err(CoreError::StreamError(
                        ErrorContext::new("Error joining worker thread".to_string())
                            .with_location(ErrorLocation::new(file!(), line!())),
                    ));
                }
            }
        }

        Ok(())
    }

    pub fn state(&self) -> StreamState {
        *self.state.read().expect("Failed to acquire read lock")
    }

    pub fn stats(&self) -> PipelineStats {
        let mut stats = PipelineStats::default();

        for stage in &self.stages {
            let stage_stats = stage.stats();
            let stage_name = stage.name().to_string();

            stats
                .stage_stats
                .insert(stage_name.clone(), stage_stats.clone());

            let stage_throughput = stage_stats.avg_throughput;
            if stage_throughput > 0.0 && stage_throughput < stats.bottleneck_throughput {
                stats.bottleneck_throughput = stage_throughput;
                stats.bottleneck_stage = Some(stage_name);
            }

            if !self
                .connections
                .iter()
                .any(|(_, to)| *to == self.stages.len() - 1)
            {
                stats.total_items = stage_stats.processed_items;
            }
        }

        let mut max_uptime = 0.0;
        for stage_stats in stats.stage_stats.values() {
            if stage_stats.uptime_seconds > max_uptime {
                max_uptime = stage_stats.uptime_seconds;
            }
        }

        stats.uptime_seconds = max_uptime;

        if max_uptime > 0.0 {
            stats.overall_throughput = stats.total_items as f64 / max_uptime;
        }

        stats
    }

    pub fn lasterror(&self) -> Option<ErrorContext> {
        self.error_context
            .read()
            .expect("Failed to acquire read lock")
            .clone()
    }

    pub fn is_empty(&self) -> bool {
        self.stages.iter().all(|stage| stage.is_empty())
    }

    pub fn stage(&self, index: usize) -> Option<&dyn AnyStage> {
        self.stages.get(index).map(|s| s.as_ref())
    }

    pub fn num_stages(&self) -> usize {
        self.stages.len()
    }
}

impl Drop for Pipeline {
    fn drop(&mut self) {
        if *self.state.read().expect("Failed to acquire read lock") == StreamState::Running {
            let _ = self.stop();
        }
    }
}

struct StageWrapper<I: Clone + Send + 'static, O: Clone + Send + 'static> {
    stage: PipelineStage<I, O>,
}

impl<I: Clone + Send + 'static, O: Clone + Send + 'static> StageWrapper<I, O> {
    fn new(stage: PipelineStage<I, O>) -> Self {
        Self { stage }
    }
}

impl<I: Clone + Send + 'static, O: Clone + Send + 'static> AnyStage for StageWrapper<I, O> {
    fn name(&self) -> &str {
        &self.stage.name
    }

    fn start(&self) -> Result<(), CoreError> {
        self.stage.start()
    }

    fn stop(&self) -> Result<(), CoreError> {
        self.stage.stop()
    }

    fn state(&self) -> StreamState {
        self.stage.state()
    }

    fn stats(&self) -> StreamStats {
        self.stage.stats()
    }

    fn clone_box_impl(&self) -> Box<dyn AnyStage> {
        Box::new(self.clone())
    }

    fn is_empty(&self) -> bool {
        self.stage
            .processor
            .lock()
            .expect("Failed to acquire lock")
            .is_empty()
    }

    fn push_raw(&self, data: Box<dyn std::any::Any + Send>) -> Result<(), CoreError> {
        let input = match data.downcast::<Vec<I>>() {
            Ok(input) => *input,
            Err(data) => {
                match data.downcast::<I>() {
                    Ok(item) => vec![*item],
                    Err(_) => {
                        return Err(CoreError::StreamError(
                            ErrorContext::new(format!(
                                "Type mismatch when pushing data to stage {}",
                                self.name()
                            ))
                            .with_location(ErrorLocation::new(file!(), line!())),
                        ));
                    }
                }
            }
        };

        self.stage
            .processor
            .lock()
            .expect("Failed to acquire lock")
            .push_batch(input)
    }

    fn pop_raw(&self) -> Result<Box<dyn std::any::Any + Send>, CoreError> {
        let output = self
            .stage
            .processor
            .lock()
            .expect("Failed to acquire lock")
            .pop_batch(100)?;
        Ok(Box::new(output))
    }
}

impl dyn AnyStage {
    fn clone_box(&self) -> Box<dyn AnyStage> {
        self.clone_box_impl()
    }
}

impl<I: Clone + Send + 'static, O: Clone + Send + 'static> Clone for StageWrapper<I, O> {
    fn clone(&self) -> Self {
        let stage = PipelineStage {
            name: self.stage.name.clone(),
            processor: self.stage.processor(),
            parallel: self.stage.parallel,
            parallelism: self.stage.parallelism,
        };

        Self { stage }
    }
}

impl<A, D> StreamProcessor<ArrayBase<OwnedRepr<A>, D>, ArrayBase<OwnedRepr<A>, D>>
where
    A: Clone + Send + Default + 'static,
    D: Dimension + Clone + Send + 'static + RemoveAxis,
{
    pub fn newarray<F>(config: StreamConfig, processfn: F) -> Self
    where
        F: Fn(
                Vec<ArrayBase<OwnedRepr<A>, D>>,
            ) -> Result<Vec<ArrayBase<OwnedRepr<A>, D>>, CoreError>
            + Send
            + Sync
            + 'static,
    {
        Self::new(config, processfn)
    }

    pub fn chunk_wise<F>(config: StreamConfig, chunk_size: usize, processfn: F) -> Self
    where
        F: Fn(&ArrayBase<OwnedRepr<A>, D>) -> Result<ArrayBase<OwnedRepr<A>, D>, CoreError>
            + Send
            + Sync
            + Clone
            + 'static,
    {
        let chunking_strategy = ChunkingStrategy::Fixed(chunk_size);

        let processfn_clone = processfn.clone();
        let chunks_fn = move |arrays: Vec<ArrayBase<OwnedRepr<A>, D>>| -> Result<Vec<ArrayBase<OwnedRepr<A>, D>>, CoreError> {
            let mut results = Vec::with_capacity(arrays.len());

            for array in arrays {
                let chunked = ChunkedArray::new(array, chunking_strategy);

                let mut chunk_results = Vec::new();
                for chunk in chunked.get_chunks() {
                    let result = processfn_clone(&chunk)?;
                    chunk_results.push(result);
                }

                let combined = if !chunk_results.is_empty() {
                    if let Ok(combined_1d) = crate::ndarray::concatenate(
                        crate::ndarray::Axis(0),
                        &chunk_results
                            .iter()
                            .map(|arr| arr.view())
                            .collect::<Vec<_>>(),
                    ) {
                        if let Ok(reshaped) = combined_1d.into_dimensionality::<D>() {
                            reshaped
                        } else {
                            chunk_results
                                .into_iter()
                                .next()
                                .expect("Expected at least one result")
                        }
                    } else {
                        chunk_results
                            .into_iter()
                            .next()
                            .expect("Expected at least one result")
                    }
                } else {
                    return Err(CoreError::ValueError(ErrorContext::new(
                        "No chunks to process".to_string(),
                    )));
                };
                results.push(combined);
            }

            Ok(results)
        };

        Self::new(config, chunks_fn)
    }

    #[cfg(feature = "parallel")]
    pub fn parallel<F>(config: StreamConfig, processfn: F) -> Self
    where
        F: Fn(&ArrayBase<OwnedRepr<A>, D>) -> Result<ArrayBase<OwnedRepr<A>, D>, CoreError>
            + Send
            + Sync
            + Clone
            + 'static,
        A: Send + Sync,
    {
        let workers = config.workers.unwrap_or_else(num_cpus::get);

        let parallel_fn = move |arrays: Vec<ArrayBase<OwnedRepr<A>, D>>| -> Result<Vec<ArrayBase<OwnedRepr<A>, D>>, CoreError> {
            use crate::parallel_ops::*;

            let pool = ThreadPoolBuilder::new()
                .num_threads(workers)
                .build()
                .map_err(|e| {
                    CoreError::StreamError(
                        ErrorContext::new(format!("{e}"))
                            .with_location(ErrorLocation::new(file!(), line!())),
                    )
                })?;

            let processfn_clone = processfn.clone();
            pool.install(|| {
                let results: Result<Vec<_>, _> = arrays
                    .par_iter()
                    .map(|array| processfn_clone(array))
                    .collect();

                results
            })
        };

        Self::new(config, parallel_fn)
    }
}
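
// Illustrative sketch (the function below is not part of the original source): an
// element-wise array stream built via `newarray`. Assumes `ndarray`'s `Array1` alias
// for `ArrayBase<OwnedRepr<f64>, Ix1>`, consistent with the imports above.
#[allow(dead_code)]
fn example_array_stream() -> StreamProcessor<::ndarray::Array1<f64>, ::ndarray::Array1<f64>> {
    StreamProcessor::newarray(StreamConfig::default(), |arrays| {
        // Shift every element by 1.0 in each incoming array.
        Ok(arrays.into_iter().map(|a| a.mapv(|x| x + 1.0)).collect())
    })
}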

#[allow(dead_code)]
pub fn create_stream_processor<T, U, F>(processfn: F) -> StreamProcessor<T, U>
where
    T: Clone + Send + 'static,
    U: Clone + Send + 'static,
    F: Fn(Vec<T>) -> Result<Vec<U>, CoreError> + Send + Sync + 'static,
{
    StreamProcessor::new(StreamConfig::default(), processfn)
}

#[allow(dead_code)]
pub fn create_pipeline(name: &str) -> PipelineBuilder {
    PipelineBuilder::new(name.to_string())
}
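
// Pipeline construction sketch (added for illustration; the stage names, closures, and
// the `example_two_stage_pipeline` helper are hypothetical): parse strings into integers
// in one stage, square them in a second, and connect the two.
#[allow(dead_code)]
fn example_two_stage_pipeline() -> Pipeline {
    let mut builder = create_pipeline("example");

    let parse = builder.add_stage::<String, i64, _>(
        "parse".to_string(),
        StreamConfig::default(),
        |batch: Vec<String>| Ok(batch.iter().filter_map(|s| s.parse::<i64>().ok()).collect()),
        false,
        1,
    );
    let square = builder.add_stage::<i64, i64, _>(
        "square".to_string(),
        StreamConfig::default(),
        |batch: Vec<i64>| Ok(batch.into_iter().map(|x| x * x).collect()),
        false,
        1,
    );

    // Data flowing out of `parse` is forwarded into `square` by a connection worker
    // once the pipeline is started.
    builder.connect(parse, square);
    builder.build()
}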

pub trait StreamError {
    #[allow(dead_code)]
    fn to_streamerror(self, message: &str) -> CoreError;
}

impl<T> StreamError for std::result::Result<T, CoreError> {
    fn to_streamerror(self, message: &str) -> CoreError {
        match self {
            Ok(_) => CoreError::StreamError(
                ErrorContext::new(message.to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ),
            Err(e) => CoreError::StreamError(
                ErrorContext::new(format!("{message}, {e}"))
                    .with_location(ErrorLocation::new(file!(), line!())),
            ),
        }
    }
}
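
// Usage sketch (hypothetical helper, added for illustration): converting any result
// into a `CoreError::StreamError` that carries an extra message.
#[allow(dead_code)]
fn example_wrap_error(result: Result<(), CoreError>) -> CoreError {
    result.to_streamerror("stage failed")
}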

impl CoreError {
    pub fn message(message: &str) -> Self {
        CoreError::EndOfStream(
            ErrorContext::new(message.to_string())
                .with_location(ErrorLocation::new(file!(), line!())),
        )
    }

    pub fn message_2(message: &str) -> Self {
        CoreError::StreamError(
            ErrorContext::new(message.to_string())
                .with_location(ErrorLocation::new(file!(), line!())),
        )
    }

    pub fn message_3(message: &str) -> Self {
        CoreError::TimeoutError(
            ErrorContext::new(message.to_string())
                .with_location(ErrorLocation::new(file!(), line!())),
        )
    }
}