scirs2_core/memory_efficient/
streaming.rs

1//! Streaming data processors for continuous data flows
2//!
3//! This module provides utilities for processing continuous data streams efficiently:
4//!
5//! - Stream processing with minimal memory overhead
6//! - Pipeline-based data processing for complex transformations
7//! - Backpressure handling for rate mismatches
8//! - Buffer management for smooth data flow
9//! - Fault tolerance with resume capabilities
10
11use crate::error::{CoreError, ErrorContext, ErrorLocation};
12use crate::memory_efficient::chunked::{ChunkedArray, ChunkingStrategy};
13use crate::memory_efficient::prefetch::PrefetchConfig;
14use ::ndarray::{ArrayBase, Dimension, OwnedRepr, RemoveAxis};
15use std::collections::{BTreeMap, VecDeque};
16use std::sync::{Arc, Condvar, Mutex, RwLock};
17use std::thread::{self, JoinHandle};
18use std::time::{Duration, Instant};
19
20/// Type alias for the processing function
21type ProcessFn<T, U> = Arc<dyn Fn(Vec<T>) -> Result<Vec<U>, CoreError> + Send + Sync>;
22
/// Strategy that decides how incoming items are grouped before they are
/// handed to the processing function
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamMode {
    /// Handle each item as soon as it arrives, without buffering
    Immediate,
    /// Accumulate items up to a configured size, then process them together
    Buffered,
    /// Choose the amount of work per step from system load and data rate
    Adaptive,
    /// Process a sliding window over the most recent data
    SlidingWindow,
}
35
/// Kind of producer that feeds a data stream
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamSource {
    /// Data read from a (memory mapped) file
    File,
    /// Data arriving over a network socket
    Network,
    /// Real-time readings from a sensor
    Sensor,
    /// Synthetic data, e.g. produced by a simulation
    Generated,
    /// Output of another stream processor
    Stream,
}
50
/// Lifecycle state of a stream processor
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    /// Created, but not started yet
    Initialized,
    /// Actively processing data
    Running,
    /// Temporarily halted; may be resumed
    Paused,
    /// Finished processing
    Completed,
    /// Stopped because of an error
    Error,
}
65
66/// Stream processor configuration
67#[derive(Debug, Clone)]
68pub struct StreamConfig {
69    /// Processing mode
70    pub mode: StreamMode,
71    /// Buffer size in elements
72    pub buffersize: usize,
73    /// Maximum batch size for processing
74    pub max_batch_size: usize,
75    /// Minimum batch size for processing
76    pub min_batch_size: usize,
77    /// Chunk size for chunked processing
78    pub chunk_size: usize,
79    /// Whether to use parallel processing
80    pub parallel: bool,
81    /// Number of worker threads for parallel processing
82    pub workers: Option<usize>,
83    /// Maximum processing rate (items per second, 0 for unlimited)
84    pub rate_limit: usize,
85    /// Timeout for waiting for data (milliseconds, 0 for none)
86    pub timeout_ms: u64,
87    /// Whether to enable prefetching
88    pub enable_prefetch: bool,
89    /// Prefetch configuration
90    pub prefetch_config: Option<PrefetchConfig>,
91    /// Whether to enable backpressure handling
92    pub enable_backpressure: bool,
93    /// Window size for sliding window mode
94    pub windowsize: usize,
95    /// Window stride for sliding window mode
96    pub window_stride: usize,
97}
98
99impl Default for StreamConfig {
100    fn default() -> Self {
101        Self {
102            mode: StreamMode::Buffered,
103            buffersize: 1024 * 1024, // 1M elements
104            max_batch_size: 65536,   // 64K elements
105            min_batch_size: 1024,    // 1K elements
106            chunk_size: 1024,        // 1K elements
107            parallel: true,
108            workers: None,
109            rate_limit: 0,
110            timeout_ms: 1000,
111            enable_prefetch: true,
112            prefetch_config: None,
113            enable_backpressure: true,
114            windowsize: 1024,
115            window_stride: 256,
116        }
117    }
118}
119
120/// Builder for stream processor configuration
121#[derive(Debug, Clone, Default)]
122pub struct StreamConfigBuilder {
123    config: StreamConfig,
124}
125
126impl StreamConfigBuilder {
127    /// Create a new stream configuration builder with default values
128    pub fn new() -> Self {
129        Self::default()
130    }
131
132    /// Set the processing mode
133    pub const fn mode(mut self, mode: StreamMode) -> Self {
134        self.config.mode = mode;
135        self
136    }
137
138    /// Set the buffer size
139    pub const fn buffersize(mut self, size: usize) -> Self {
140        self.config.buffersize = size;
141        self
142    }
143
144    /// Set the maximum batch size
145    pub const fn max_batch_size(mut self, size: usize) -> Self {
146        self.config.max_batch_size = size;
147        self
148    }
149
150    /// Set the minimum batch size
151    pub const fn min_batch_size(mut self, size: usize) -> Self {
152        self.config.min_batch_size = size;
153        self
154    }
155
156    /// Set the chunk size
157    pub const fn chunk_size(mut self, size: usize) -> Self {
158        self.config.chunk_size = size;
159        self
160    }
161
162    /// Enable or disable parallel processing
163    pub const fn parallel(mut self, enable: bool) -> Self {
164        self.config.parallel = enable;
165        self
166    }
167
168    /// Set the number of worker threads
169    pub const fn workers(mut self, workers: Option<usize>) -> Self {
170        self.config.workers = workers;
171        self
172    }
173
174    /// Set the rate limit
175    pub const fn rate_limit(mut self, limit: usize) -> Self {
176        self.config.rate_limit = limit;
177        self
178    }
179
180    /// Set the timeout
181    pub const fn timeout_ms(mut self, timeout: u64) -> Self {
182        self.config.timeout_ms = timeout;
183        self
184    }
185
186    /// Enable or disable prefetching
187    pub const fn enable_prefetch(mut self, enable: bool) -> Self {
188        self.config.enable_prefetch = enable;
189        self
190    }
191
192    /// Set the prefetch configuration
193    pub const fn prefetch_config(mut self, config: Option<PrefetchConfig>) -> Self {
194        self.config.prefetch_config = config;
195        self
196    }
197
198    /// Enable or disable backpressure handling
199    pub const fn enable_backpressure(mut self, enable: bool) -> Self {
200        self.config.enable_backpressure = enable;
201        self
202    }
203
204    /// Set the window size for sliding window mode
205    pub const fn windowsize(mut self, size: usize) -> Self {
206        self.config.windowsize = size;
207        self
208    }
209
210    /// Set the window stride for sliding window mode
211    pub const fn window_stride(mut self, stride: usize) -> Self {
212        self.config.window_stride = stride;
213        self
214    }
215
216    /// Build the configuration
217    pub fn build(self) -> StreamConfig {
218        self.config
219    }
220}
221
/// Stream processor statistics
///
/// `Default` yields the state of a processor that has not handled any data:
/// every counter and average is zero and no error has been recorded.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Number of items processed
    pub processed_items: usize,
    /// Number of batches processed
    pub processed_batches: usize,
    /// Average batch size
    pub avg_batch_size: f64,
    /// Average processing time per batch (milliseconds)
    pub avg_batch_time_ms: f64,
    /// Average throughput (items per second)
    pub avg_throughput: f64,
    /// Stream uptime in seconds
    pub uptime_seconds: f64,
    /// Number of times backpressure was applied
    pub backpressure_count: usize,
    /// Buffer high water mark (maximum fill level)
    pub buffer_high_water_mark: usize,
    /// Error count
    pub error_count: usize,
    /// Last error message
    pub lasterror: Option<String>,
}
263
264/// Stream input buffer for data queuing
265#[derive(Debug)]
266struct StreamBuffer<T: Clone + Send + 'static> {
267    /// Buffer data queue
268    data: VecDeque<T>,
269    /// Maximum buffer size
270    maxsize: usize,
271    /// Mutex for buffer access
272    mutex: Mutex<()>,
273    /// Condition variable for buffer synchronization
274    condvar: Condvar,
275    /// Whether the stream is closed
276    closed: bool,
277}
278
279impl<T: Clone + Send + 'static> StreamBuffer<T> {
280    /// Create a new stream buffer
281    fn new(maxsize: usize) -> Self {
282        Self {
283            data: VecDeque::with_capacity(maxsize),
284            maxsize,
285            mutex: Mutex::new(()),
286            condvar: Condvar::new(),
287            closed: false,
288        }
289    }
290
291    /// Add an item to the buffer
292    fn push(&mut self, item: T) -> Result<(), CoreError> {
293        let mut guard = self.mutex.lock().expect("Failed to acquire lock");
294
295        // Check if the buffer is closed
296        if self.closed {
297            return Err(CoreError::StreamError(
298                ErrorContext::new("Stream is closed".to_string())
299                    .with_location(ErrorLocation::new(file!(), line!())),
300            ));
301        }
302
303        // Wait until there's space in the buffer
304        while self.data.len() >= self.maxsize {
305            guard = self
306                .condvar
307                .wait(guard)
308                .expect("Condition variable wait failed");
309
310            // Check if the buffer was closed while waiting
311            if self.closed {
312                return Err(CoreError::StreamError(
313                    ErrorContext::new("Stream is closed".to_string())
314                        .with_location(ErrorLocation::new(file!(), line!())),
315                ));
316            }
317        }
318
319        // Add the item to the buffer
320        self.data.push_back(item);
321
322        // Notify any waiting consumers
323        self.condvar.notify_one();
324
325        Ok(())
326    }
327
328    /// Add multiple items to the buffer
329    fn push_batch(&mut self, items: Vec<T>) -> Result<(), CoreError> {
330        let mut guard = self.mutex.lock().expect("Failed to acquire lock");
331
332        // Check if the buffer is closed
333        if self.closed {
334            return Err(CoreError::StreamError(
335                ErrorContext::new("Stream is closed".to_string())
336                    .with_location(ErrorLocation::new(file!(), line!())),
337            ));
338        }
339
340        // Wait until there's space in the buffer
341        while self.data.len() + items.len() > self.maxsize {
342            guard = self
343                .condvar
344                .wait(guard)
345                .expect("Condition variable wait failed");
346
347            // Check if the buffer was closed while waiting
348            if self.closed {
349                return Err(CoreError::StreamError(
350                    ErrorContext::new("Stream is closed".to_string())
351                        .with_location(ErrorLocation::new(file!(), line!())),
352                ));
353            }
354        }
355
356        // Add the items to the buffer
357        self.data.extend(items);
358
359        // Notify any waiting consumers
360        self.condvar.notify_one();
361
362        Ok(())
363    }
364
365    /// Get a batch of items from the buffer
366    fn pop_batch(&mut self, max_batch_size: usize, timeoutms: u64) -> Result<Vec<T>, CoreError> {
367        let mut guard = self.mutex.lock().expect("Failed to acquire lock");
368
369        // Wait until there are items in the buffer
370        if self.data.is_empty() && !self.closed {
371            if timeoutms > 0 {
372                let timeout = Duration::from_millis(timeoutms);
373                let result = self.condvar.wait_timeout(guard, timeout);
374
375                match result {
376                    Ok((g, timeout_result)) => {
377                        #[allow(unused_assignments)]
378                        {
379                            guard = g;
380                        }
381
382                        // Check if the timeout occurred
383                        if timeout_result.timed_out() && self.data.is_empty() {
384                            return Err(CoreError::TimeoutError(
385                                ErrorContext::new("Timeout waiting for data".to_string())
386                                    .with_location(ErrorLocation::new(file!(), line!())),
387                            ));
388                        }
389                    }
390                    Err(_) => {
391                        return Err(CoreError::StreamError(
392                            ErrorContext::new("Error waiting for data".to_string())
393                                .with_location(ErrorLocation::new(file!(), line!())),
394                        ));
395                    }
396                }
397            } else {
398                // No timeout, wait indefinitely
399                #[allow(unused_assignments)]
400                {
401                    guard = self
402                        .condvar
403                        .wait(guard)
404                        .expect("Condition variable wait failed");
405                }
406            }
407        }
408
409        // Check if the buffer is closed and empty
410        if self.data.is_empty() && self.closed {
411            return Err(CoreError::EndOfStream(
412                ErrorContext::new("End of stream".to_string())
413                    .with_location(ErrorLocation::new(file!(), line!())),
414            ));
415        }
416
417        // Get the items (up to max_batch_size)
418        let batch_size = std::cmp::min(max_batch_size, self.data.len());
419        let mut batch = Vec::with_capacity(batch_size);
420
421        for _ in 0..batch_size {
422            if let Some(item) = self.data.pop_front() {
423                batch.push(item);
424            } else {
425                break;
426            }
427        }
428
429        // Notify any waiting producers
430        self.condvar.notify_one();
431
432        Ok(batch)
433    }
434
435    /// Get the number of items in the buffer
436    fn len(&self) -> usize {
437        let _guard = self.mutex.lock().expect("Failed to acquire lock");
438        self.data.len()
439    }
440
441    /// Check if the buffer is empty
442    fn is_empty(&self) -> bool {
443        let _guard = self.mutex.lock().expect("Failed to acquire lock");
444        self.data.is_empty()
445    }
446
447    /// Close the buffer
448    fn close(&mut self) {
449        let _guard = self.mutex.lock().expect("Failed to acquire lock");
450        self.closed = true;
451        self.condvar.notify_all();
452    }
453
454    /// Check if the buffer is closed
455    #[allow(dead_code)]
456    fn is_closed(&self) -> bool {
457        let _guard = self.mutex.lock().expect("Failed to acquire lock");
458        self.closed
459    }
460
461    /// Clear the buffer
462    fn clear(&mut self) {
463        let _guard = self.mutex.lock().expect("Failed to acquire lock");
464        self.data.clear();
465        self.condvar.notify_all();
466    }
467}
468
469/// Stream processor for continuous data flows
470pub struct StreamProcessor<T: Clone + Send + 'static, U: Clone + Send + 'static> {
471    /// Configuration for the stream processor
472    config: StreamConfig,
473    /// Input buffer
474    input_buffer: Arc<Mutex<StreamBuffer<T>>>,
475    /// Processing function
476    processfn: ProcessFn<T, U>,
477    /// Output buffer
478    output_buffer: Arc<Mutex<StreamBuffer<U>>>,
479    /// Current state of the stream processor
480    state: Arc<RwLock<StreamState>>,
481    /// Statistics for the stream processor
482    stats: Arc<RwLock<StreamStats>>,
483    /// Worker thread handle
484    worker_thread: Option<JoinHandle<()>>,
485    /// Start time of the stream processor
486    start_time: Arc<RwLock<Option<Instant>>>,
487}
488
489impl<T, U> std::fmt::Debug for StreamProcessor<T, U>
490where
491    T: Clone + Send + 'static,
492    U: Clone + Send + 'static,
493{
494    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
495        f.debug_struct("StreamProcessor")
496            .field("config", &self.config)
497            .field("state", &self.state)
498            .field("stats", &self.stats)
499            .field("worker_thread", &self.worker_thread.is_some())
500            .field("start_time", &self.start_time)
501            .finish_non_exhaustive()
502    }
503}
504
505impl<T: Clone + Send + 'static, U: Clone + Send + 'static> StreamProcessor<T, U> {
506    /// Create a new stream processor
507    pub fn new<F>(config: StreamConfig, processfn: F) -> Self
508    where
509        F: Fn(Vec<T>) -> Result<Vec<U>, CoreError> + Send + Sync + 'static,
510    {
511        let input_buffer = Arc::new(Mutex::new(StreamBuffer::new(config.buffersize)));
512        let output_buffer = Arc::new(Mutex::new(StreamBuffer::new(config.buffersize)));
513
514        Self {
515            config,
516            input_buffer,
517            processfn: Arc::new(processfn),
518            output_buffer,
519            state: Arc::new(RwLock::new(StreamState::Initialized)),
520            stats: Arc::new(RwLock::new(StreamStats::default())),
521            worker_thread: None,
522            start_time: Arc::new(RwLock::new(None)),
523        }
524    }
525
526    /// Start the stream processor
527    pub fn start(&mut self) -> Result<(), CoreError> {
528        let mut state = self.state.write().expect("Failed to acquire write lock");
529
530        // Check if the stream is already running
531        if *state == StreamState::Running {
532            return Err(CoreError::StreamError(
533                ErrorContext::new("Stream already running".to_string())
534                    .with_location(ErrorLocation::new(file!(), line!())),
535            ));
536        }
537
538        // Update state
539        *state = StreamState::Running;
540
541        // Set start time
542        let mut start_time = self
543            .start_time
544            .write()
545            .expect("Failed to acquire write lock");
546        *start_time = Some(Instant::now());
547
548        // Create worker thread
549        let input_buffer = self.input_buffer.clone();
550        let output_buffer = self.output_buffer.clone();
551        let processfn = self.processfn.clone();
552        let config = self.config.clone();
553        let state = self.state.clone();
554        let stats = self.stats.clone();
555        let start_time_clone = self.start_time.clone();
556
557        let worker = thread::spawn(move || {
558            Self::worker_loop(
559                input_buffer,
560                output_buffer,
561                processfn,
562                config,
563                state,
564                stats,
565                start_time_clone,
566            );
567        });
568
569        self.worker_thread = Some(worker);
570
571        Ok(())
572    }
573
574    /// Worker loop for processing data
575    fn worker_loop(
576        input_buffer: Arc<Mutex<StreamBuffer<T>>>,
577        output_buffer: Arc<Mutex<StreamBuffer<U>>>,
578        processfn: ProcessFn<T, U>,
579        config: StreamConfig,
580        state: Arc<RwLock<StreamState>>,
581        stats: Arc<RwLock<StreamStats>>,
582        start_time: Arc<RwLock<Option<Instant>>>,
583    ) {
584        // Setup rate limiting if needed
585        let rate_limit = config.rate_limit;
586        let mut last_batch_time = Instant::now();
587        let mut batch_window = VecDeque::new();
588
589        // Processing loop
590        loop {
591            // Check if we should continue
592            {
593                let current_state = state.read().expect("Failed to acquire read lock");
594                if *current_state != StreamState::Running {
595                    break;
596                }
597            }
598
599            // Rate limiting
600            if rate_limit > 0 {
601                // Calculate the minimum _time per batch
602                let min_time_per_batch =
603                    Duration::from_secs_f64(config.min_batch_size as f64 / rate_limit as f64);
604
605                // Wait if necessary
606                let elapsed = last_batch_time.elapsed();
607                if elapsed < min_time_per_batch {
608                    thread::sleep(min_time_per_batch - elapsed);
609                }
610            }
611
612            // Determine batch size based on the mode
613            let batch_size = match config.mode {
614                StreamMode::Immediate => 1,
615                StreamMode::Buffered => config.max_batch_size,
616                StreamMode::Adaptive => {
617                    // Simple adaptive batch sizing based on processing time
618                    let stats_guard = stats.read().expect("Failed to acquire read lock");
619                    let avg_time = stats_guard.avg_batch_time_ms;
620
621                    if avg_time < 10.0 {
622                        // Processing is fast, use larger batches
623                        config.max_batch_size
624                    } else if avg_time < 50.0 {
625                        // Medium processing time, use medium batches
626                        (config.max_batch_size + config.min_batch_size) / 2
627                    } else {
628                        // Slow processing, use smaller batches
629                        config.min_batch_size
630                    }
631                }
632                StreamMode::SlidingWindow => config.windowsize,
633            };
634
635            // Get a batch of data from the input buffer
636            let input_batch = match input_buffer
637                .lock()
638                .expect("Failed to acquire lock")
639                .pop_batch(batch_size, config.timeout_ms)
640            {
641                Ok(batch) => batch,
642                Err(err) => {
643                    match err {
644                        CoreError::EndOfStream(_) => {
645                            // End of stream, update state and exit
646                            let mut current_state =
647                                state.write().expect("Failed to acquire write lock");
648                            *current_state = StreamState::Completed;
649                            break;
650                        }
651                        CoreError::TimeoutError(_) => {
652                            // Timeout, continue
653                            continue;
654                        }
655                        _ => {
656                            // Other error, update stats and continue
657                            let mut stats_guard =
658                                stats.write().expect("Failed to acquire write lock");
659                            stats_guard.error_count += 1;
660                            stats_guard.lasterror = Some(err.to_string());
661                            continue;
662                        }
663                    }
664                }
665            };
666
667            // Check if the batch is empty
668            if input_batch.is_empty() {
669                continue;
670            }
671
672            // For sliding window mode, manage the window
673            let process_input = if config.mode == StreamMode::SlidingWindow {
674                if batch_window.len() < config.windowsize {
675                    // Still filling the initial window
676                    batch_window.extend(input_batch);
677
678                    if batch_window.len() < config.windowsize {
679                        // Not enough data for a full window yet
680                        continue;
681                    }
682
683                    // We now have a full window
684                    batch_window.make_contiguous().to_vec()
685                } else {
686                    // Slide the window
687                    let stride = std::cmp::min(config.window_stride, input_batch.len());
688
689                    // Remove old elements
690                    for _ in 0..stride {
691                        batch_window.pop_front();
692                    }
693
694                    // Add new elements
695                    batch_window.extend(input_batch);
696
697                    // Return the window for processing
698                    batch_window.make_contiguous().to_vec()
699                }
700            } else {
701                // For non-window modes, just use the batch directly
702                input_batch
703            };
704
705            // Process the batch
706            let process_result = {
707                let batch_start_time = Instant::now();
708                let result = processfn(process_input.clone());
709
710                // Update processing statistics
711                let mut stats_guard = stats.write().expect("Failed to acquire write lock");
712                stats_guard.processed_batches += 1;
713                stats_guard.processed_items += process_input.len();
714
715                // Update average batch size
716                let total_items = stats_guard.processed_items;
717                let total_batches = stats_guard.processed_batches;
718                stats_guard.avg_batch_size = total_items as f64 / total_batches as f64;
719
720                // Update processing time
721                let batch_time = batch_start_time.elapsed().as_millis() as f64;
722                stats_guard.avg_batch_time_ms =
723                    (stats_guard.avg_batch_time_ms * (total_batches - 1) as f64 + batch_time)
724                        / total_batches as f64;
725
726                // Update throughput
727                if let Some(start) = *start_time.read().expect("Failed to acquire read lock") {
728                    let uptime_seconds = start.elapsed().as_secs_f64();
729                    stats_guard.uptime_seconds = uptime_seconds;
730                    stats_guard.avg_throughput = total_items as f64 / uptime_seconds;
731                }
732
733                // Update buffer statistics
734                let buffer_len = input_buffer.lock().expect("Failed to acquire lock").len();
735                if buffer_len > stats_guard.buffer_high_water_mark {
736                    stats_guard.buffer_high_water_mark = buffer_len;
737                }
738
739                result
740            };
741
742            // Handle the processing result
743            match process_result {
744                Ok(output_batch) => {
745                    // Send the output to the output _buffer
746                    if !output_batch.is_empty() {
747                        match output_buffer
748                            .lock()
749                            .expect("Failed to acquire lock")
750                            .push_batch(output_batch)
751                        {
752                            Ok(_) => {}
753                            Err(err) => {
754                                // Error sending output, update stats
755                                let mut stats_guard =
756                                    stats.write().expect("Failed to acquire write lock");
757                                stats_guard.error_count += 1;
758                                stats_guard.lasterror = Some(err.to_string());
759                            }
760                        }
761                    }
762                }
763                Err(err) => {
764                    // Processing error, update stats
765                    let mut stats_guard = stats.write().expect("Failed to acquire write lock");
766                    stats_guard.error_count += 1;
767                    stats_guard.lasterror = Some(err.to_string());
768                }
769            }
770
771            // Update rate limiting info
772            last_batch_time = Instant::now();
773        }
774    }
775
776    /// Stop the stream processor
777    pub fn stop(&mut self) -> Result<(), CoreError> {
778        let mut state = self.state.write().expect("Failed to acquire write lock");
779
780        // Check if the stream is running
781        if *state != StreamState::Running {
782            return Err(CoreError::StreamError(
783                ErrorContext::new("Stream not running".to_string())
784                    .with_location(ErrorLocation::new(file!(), line!())),
785            ));
786        }
787
788        // Update state
789        *state = StreamState::Paused;
790
791        // Close the input buffer
792        self.input_buffer
793            .lock()
794            .expect("Failed to acquire lock")
795            .close();
796
797        // Wait for the worker thread to finish
798        if let Some(worker) = self.worker_thread.take() {
799            match worker.join() {
800                Ok(_) => {}
801                Err(_) => {
802                    return Err(CoreError::StreamError(
803                        ErrorContext::new("Error joining worker thread".to_string())
804                            .with_location(ErrorLocation::new(file!(), line!())),
805                    ));
806                }
807            }
808        }
809
810        Ok(())
811    }
812
813    /// Push data to the stream processor
814    pub fn push(&self, data: T) -> Result<(), CoreError> {
815        // Check if the stream is running
816        let state = self.state.read().expect("Failed to acquire read lock");
817        if *state != StreamState::Running {
818            return Err(CoreError::StreamError(
819                ErrorContext::new("Stream not running".to_string())
820                    .with_location(ErrorLocation::new(file!(), line!())),
821            ));
822        }
823
824        // Push data to the input buffer
825        self.input_buffer
826            .lock()
827            .expect("Failed to acquire lock")
828            .push(data)
829    }
830
831    /// Push a batch of data to the stream processor
832    pub fn push_batch(&self, data: Vec<T>) -> Result<(), CoreError> {
833        // Check if the stream is running
834        let state = self.state.read().expect("Failed to acquire read lock");
835        if *state != StreamState::Running {
836            return Err(CoreError::StreamError(
837                ErrorContext::new("Stream not running".to_string())
838                    .with_location(ErrorLocation::new(file!(), line!())),
839            ));
840        }
841
842        // Push data to the input buffer
843        self.input_buffer
844            .lock()
845            .expect("Failed to acquire lock")
846            .push_batch(data)
847    }
848
849    /// Pop processed data from the stream processor
850    pub fn pop(&self) -> Result<U, CoreError> {
851        // Check if the stream is running or completed
852        let state = self.state.read().expect("Failed to acquire read lock");
853        if *state != StreamState::Running && *state != StreamState::Completed {
854            return Err(CoreError::StreamError(
855                ErrorContext::new("Stream not running or completed".to_string())
856                    .with_location(ErrorLocation::new(file!(), line!())),
857            ));
858        }
859
860        // Pop data from the output buffer
861        let result = self
862            .output_buffer
863            .lock()
864            .expect("Failed to acquire lock")
865            .pop_batch(1, self.config.timeout_ms)?;
866
867        if result.is_empty() {
868            Err(CoreError::TimeoutError(
869                ErrorContext::new("Timeout waiting for data".to_string())
870                    .with_location(ErrorLocation::new(file!(), line!())),
871            ))
872        } else {
873            Ok(result[0].clone())
874        }
875    }
876
877    /// Pop a batch of processed data from the stream processor
878    pub fn pop_batch(&self, batchsize: usize) -> Result<Vec<U>, CoreError> {
879        // Check if the stream is running or completed
880        let state = self.state.read().expect("Failed to acquire read lock");
881        if *state != StreamState::Running && *state != StreamState::Completed {
882            return Err(CoreError::StreamError(
883                ErrorContext::new("Stream not running or completed".to_string())
884                    .with_location(ErrorLocation::new(file!(), line!())),
885            ));
886        }
887
888        // Pop data from the output buffer
889        self.output_buffer
890            .lock()
891            .expect("Failed to acquire lock")
892            .pop_batch(batchsize, self.config.timeout_ms)
893    }
894
895    /// Get the current state of the stream processor
896    pub fn state(&self) -> StreamState {
897        *self.state.read().expect("Failed to acquire read lock")
898    }
899
900    /// Get the statistics for the stream processor
901    pub fn stats(&self) -> StreamStats {
902        self.stats
903            .read()
904            .expect("Failed to acquire read lock")
905            .clone()
906    }
907
908    /// Check if the stream is empty
909    pub fn is_empty(&self) -> bool {
910        self.input_buffer
911            .lock()
912            .expect("Failed to acquire lock")
913            .is_empty()
914            && self
915                .output_buffer
916                .lock()
917                .expect("Failed to acquire lock")
918                .is_empty()
919    }
920
921    /// Clear the stream buffers
922    pub fn clear(&self) -> Result<(), CoreError> {
923        // Check if the stream is not running
924        let state = self.state.read().expect("Failed to acquire read lock");
925        if *state == StreamState::Running {
926            return Err(CoreError::StreamError(
927                ErrorContext::new("Cannot clear running stream".to_string())
928                    .with_location(ErrorLocation::new(file!(), line!())),
929            ));
930        }
931
932        // Clear the buffers
933        self.input_buffer
934            .lock()
935            .expect("Failed to acquire lock")
936            .clear();
937        self.output_buffer
938            .lock()
939            .expect("Failed to acquire lock")
940            .clear();
941
942        Ok(())
943    }
944}
945
946impl<T: Clone + Send + 'static, U: Clone + Send + 'static> Drop for StreamProcessor<T, U> {
947    fn drop(&mut self) {
948        // Stop the stream if it's running
949        if *self.state.read().expect("Failed to acquire read lock") == StreamState::Running {
950            let _ = self.stop();
951        }
952    }
953}
954
/// A stage in a stream processing pipeline
///
/// A stage pairs a name with one [`StreamProcessor`] that turns batches of `I`
/// into batches of `O`. The processor is kept behind `Arc<Mutex<..>>` so that
/// handles to the same processor can be shared (see `PipelineStage::processor`).
#[derive(Debug)]
pub struct PipelineStage<I: Clone + Send + 'static, O: Clone + Send + 'static> {
    /// Name of the stage
    pub name: String,
    /// Stream processor for this stage
    processor: Arc<Mutex<StreamProcessor<I, O>>>,
    /// Whether this stage is parallel
    // NOTE(review): stored and copied only; no code in this part of the file
    // reads it to change behavior — confirm where it is consumed.
    pub parallel: bool,
    /// Number of parallel instances
    // NOTE(review): same as `parallel` — stored only, as far as visible here.
    pub parallelism: usize,
}
967
968impl<I: Clone + Send + 'static, O: Clone + Send + 'static> PipelineStage<I, O> {
969    /// Create a new pipeline stage
970    pub fn new<F>(
971        name: String,
972        config: StreamConfig,
973        processfn: F,
974        parallel: bool,
975        parallelism: usize,
976    ) -> Self
977    where
978        F: Fn(Vec<I>) -> Result<Vec<O>, CoreError> + Send + Sync + Clone + 'static,
979    {
980        let processor = StreamProcessor::new(config, processfn);
981
982        Self {
983            name,
984            processor: Arc::new(Mutex::new(processor)),
985            parallel,
986            parallelism,
987        }
988    }
989
990    /// Get the processor for this stage
991    pub fn processor(&self) -> Arc<Mutex<StreamProcessor<I, O>>> {
992        self.processor.clone()
993    }
994
995    /// Start the stage
996    pub fn start(&self) -> Result<(), CoreError> {
997        self.processor
998            .lock()
999            .expect("Failed to acquire lock")
1000            .start()
1001    }
1002
1003    /// Stop the stage
1004    pub fn stop(&self) -> Result<(), CoreError> {
1005        self.processor
1006            .lock()
1007            .expect("Failed to acquire lock")
1008            .stop()
1009    }
1010
1011    /// Get the state of the stage
1012    pub fn state(&self) -> StreamState {
1013        self.processor
1014            .lock()
1015            .expect("Failed to acquire lock")
1016            .state()
1017    }
1018
1019    /// Get the statistics for the stage
1020    pub fn stats(&self) -> StreamStats {
1021        self.processor
1022            .lock()
1023            .expect("Failed to acquire lock")
1024            .stats()
1025    }
1026}
1027
/// Stream processing pipeline
///
/// Holds a list of type-erased stages plus the connections between them.
/// `Pipeline::start` spawns one worker thread per connection; each worker
/// moves data from the connection's source stage to its destination stage.
pub struct Pipeline {
    /// Name of the pipeline
    pub name: String,
    /// Stages in the pipeline
    stages: Vec<Box<dyn AnyStage>>,
    /// Connections between stages
    connections: Vec<(usize, usize)>, // (from_stage, to_stage)
    /// Worker threads for the pipeline
    // One handle per connection, pushed by `start` and drained by `stop`.
    workers: Vec<JoinHandle<()>>,
    /// Pipeline state
    // Shared with the connection workers, which poll it on every iteration.
    state: Arc<RwLock<StreamState>>,
    /// Pipeline statistics
    // Initialized to the default by the builder; `Pipeline::stats` computes a
    // fresh value on each call rather than reading this field.
    #[allow(dead_code)]
    stats: Arc<RwLock<PipelineStats>>,
    /// Error context for the pipeline
    // Written by connection workers on failure, read by `Pipeline::lasterror`.
    error_context: Arc<RwLock<Option<ErrorContext>>>,
}
1046
/// Pipeline statistics
///
/// An aggregated snapshot assembled by `Pipeline::stats` from the per-stage
/// [`StreamStats`].
#[derive(Debug, Clone)]
pub struct PipelineStats {
    /// Statistics for each stage
    // Keyed by stage name; stages sharing a name overwrite each other.
    pub stage_stats: BTreeMap<String, StreamStats>,
    /// Total items processed
    pub total_items: usize,
    /// Pipeline uptime in seconds
    // Filled with the largest per-stage `uptime_seconds`.
    pub uptime_seconds: f64,
    /// Overall throughput (items per second)
    // `total_items / uptime_seconds`; left at 0.0 when the uptime is not positive.
    pub overall_throughput: f64,
    /// Bottleneck stage (slowest stage)
    // `None` until some stage reports a positive average throughput.
    pub bottleneck_stage: Option<String>,
    /// Bottleneck throughput (items per second)
    // Defaults to `f64::MAX`, which therefore means "no bottleneck found".
    pub bottleneck_throughput: f64,
}
1063
1064impl Default for PipelineStats {
1065    fn default() -> Self {
1066        Self {
1067            stage_stats: BTreeMap::new(),
1068            total_items: 0,
1069            uptime_seconds: 0.0,
1070            overall_throughput: 0.0,
1071            bottleneck_stage: None,
1072            bottleneck_throughput: f64::MAX,
1073        }
1074    }
1075}
1076
/// Trait for pipeline stages of any type
///
/// Erases the input/output item types of a stage so that stages with
/// different types can be stored together as `Box<dyn AnyStage>`. Data crosses
/// this interface as `Box<dyn Any + Send>`.
pub trait AnyStage: Send + Sync {
    /// Get the name of the stage
    fn name(&self) -> &str;
    /// Start the stage
    fn start(&self) -> Result<(), CoreError>;
    /// Stop the stage
    fn stop(&self) -> Result<(), CoreError>;
    /// Get the state of the stage
    fn state(&self) -> StreamState;
    /// Get the statistics for the stage
    fn stats(&self) -> StreamStats;
    /// Check if the stage is empty
    fn is_empty(&self) -> bool;
    /// Push raw data to the stage
    ///
    /// The payload is type-erased; the `StageWrapper` implementation in this
    /// file accepts a boxed `Vec<I>` or a boxed single `I` and reports a
    /// `CoreError::StreamError` for any other payload type.
    fn push_raw(&self, data: Box<dyn std::any::Any + Send>) -> Result<(), CoreError>;
    /// Pop raw data from the stage
    ///
    /// The `StageWrapper` implementation in this file returns a boxed `Vec<O>`.
    fn pop_raw(&self) -> Result<Box<dyn std::any::Any + Send>, CoreError>;
    /// Clone the stage into a new Box
    ///
    /// This is the object-safe hook behind the inherent `clone_box` method on
    /// `dyn AnyStage`.
    fn clone_box_impl(&self) -> Box<dyn AnyStage>;
}
1098
/// Pipeline builder
///
/// Collects stages and the connections between them; `build` turns the
/// collected parts into a [`Pipeline`] in the `Initialized` state.
pub struct PipelineBuilder {
    /// Name of the pipeline
    name: String,
    /// Stages in the pipeline
    // A stage's position in this vector is the index returned by `add_stage`.
    stages: Vec<Box<dyn AnyStage>>,
    /// Connections between stages
    // Each entry holds indices into `stages`.
    connections: Vec<(usize, usize)>, // (from_stage, to_stage)
}
1108
1109impl PipelineBuilder {
1110    /// Create a new pipeline builder
1111    pub fn new(name: String) -> Self {
1112        Self {
1113            name,
1114            stages: Vec::new(),
1115            connections: Vec::new(),
1116        }
1117    }
1118
1119    /// Add a stage to the pipeline
1120    pub fn add_stage<I, O, F>(
1121        &mut self,
1122        name: String,
1123        config: StreamConfig,
1124        processfn: F,
1125        parallel: bool,
1126        parallelism: usize,
1127    ) -> usize
1128    where
1129        I: Clone + Send + 'static,
1130        O: Clone + Send + 'static,
1131        F: Fn(Vec<I>) -> Result<Vec<O>, CoreError> + Send + Sync + Clone + 'static,
1132    {
1133        let stage = PipelineStage::new(name, config, processfn, parallel, parallelism);
1134        let stage_index = self.stages.len();
1135        self.stages.push(Box::new(StageWrapper::new(stage)));
1136        stage_index
1137    }
1138
1139    /// Connect two stages in the pipeline
1140    pub fn connect(&mut self, from_stage: usize, tostage: usize) -> &mut Self {
1141        if from_stage < self.stages.len() && tostage < self.stages.len() {
1142            self.connections.push((from_stage, tostage));
1143        }
1144        self
1145    }
1146
1147    /// Build the pipeline
1148    pub fn build(self) -> Pipeline {
1149        Pipeline {
1150            name: self.name,
1151            stages: self.stages,
1152            connections: self.connections,
1153            workers: Vec::new(),
1154            state: Arc::new(RwLock::new(StreamState::Initialized)),
1155            stats: Arc::new(RwLock::new(PipelineStats::default())),
1156            error_context: Arc::new(RwLock::new(None)),
1157        }
1158    }
1159}
1160
1161impl Pipeline {
1162    /// Start the pipeline
1163    pub fn start(&mut self) -> Result<(), CoreError> {
1164        let mut state = self.state.write().expect("Failed to acquire write lock");
1165
1166        // Check if the pipeline is already running
1167        if *state == StreamState::Running {
1168            return Err(CoreError::StreamError(
1169                ErrorContext::new("Pipeline already running".to_string())
1170                    .with_location(ErrorLocation::new(file!(), line!())),
1171            ));
1172        }
1173
1174        // Start all stages
1175        for stage in &self.stages {
1176            stage.start()?;
1177        }
1178
1179        // Create worker threads for each connection
1180        for (from_stage, to_stage) in &self.connections {
1181            let from_stage = &self.stages[*from_stage];
1182            let to_stage = &self.stages[*to_stage];
1183
1184            let from_stage_clone = from_stage.clone_box();
1185            let to_stage_clone = to_stage.clone_box();
1186            let state_clone = self.state.clone();
1187            let error_context_clone = self.error_context.clone();
1188
1189            let worker = thread::spawn(move || {
1190                Self::connection_worker(
1191                    from_stage_clone,
1192                    to_stage_clone,
1193                    state_clone,
1194                    error_context_clone,
1195                );
1196            });
1197
1198            self.workers.push(worker);
1199        }
1200
1201        // Update state
1202        *state = StreamState::Running;
1203
1204        Ok(())
1205    }
1206
1207    /// Worker function for processing data between stages
1208    fn connection_worker(
1209        from_stage: Box<dyn AnyStage>,
1210        to_stage: Box<dyn AnyStage>,
1211        state: Arc<RwLock<StreamState>>,
1212        error_context: Arc<RwLock<Option<ErrorContext>>>,
1213    ) {
1214        let mut consecutiveerrors = 0;
1215        let error_threshold = 10; // Maximum number of consecutive errors before giving up
1216
1217        // Processing loop
1218        loop {
1219            // Check if we should continue
1220            {
1221                let current_state = state.read().expect("Failed to acquire read lock");
1222                if *current_state != StreamState::Running {
1223                    break;
1224                }
1225            }
1226
1227            // Try to get data from the source stage
1228            match from_stage.pop_raw() {
1229                Ok(data) => {
1230                    // Reset error counter
1231                    consecutiveerrors = 0;
1232
1233                    // Try to push data to the destination stage
1234                    if let Err(err) = to_stage.push_raw(data) {
1235                        // Handle error
1236                        consecutiveerrors += 1;
1237
1238                        // Update error context
1239                        let mut error_context_guard =
1240                            error_context.write().expect("Failed to acquire write lock");
1241                        *error_context_guard = Some(
1242                            ErrorContext::new(format!(
1243                                "Error pushing data from {} to {}: {}",
1244                                from_stage.name(),
1245                                to_stage.name(),
1246                                err
1247                            ))
1248                            .with_location(ErrorLocation::new(file!(), line!())),
1249                        );
1250
1251                        // Check if we should give up
1252                        if consecutiveerrors >= error_threshold {
1253                            let mut current_state =
1254                                state.write().expect("Failed to acquire write lock");
1255                            *current_state = StreamState::Error;
1256                            break;
1257                        }
1258
1259                        // Sleep before retrying
1260                        thread::sleep(Duration::from_millis(100));
1261                    }
1262                }
1263                Err(err) => {
1264                    match err {
1265                        CoreError::EndOfStream(_) => {
1266                            // End of stream, exit gracefully
1267                            break;
1268                        }
1269                        CoreError::TimeoutError(_) => {
1270                            // Timeout, continue
1271                            continue;
1272                        }
1273                        _ => {
1274                            // Other error, increment counter
1275                            consecutiveerrors += 1;
1276
1277                            // Update error context
1278                            let mut error_context_guard =
1279                                error_context.write().expect("Failed to acquire write lock");
1280                            *error_context_guard = Some(
1281                                ErrorContext::new(format!(
1282                                    "Error popping data from {}: {}",
1283                                    from_stage.name(),
1284                                    err
1285                                ))
1286                                .with_location(ErrorLocation::new(file!(), line!())),
1287                            );
1288
1289                            // Check if we should give up
1290                            if consecutiveerrors >= error_threshold {
1291                                let mut current_state =
1292                                    state.write().expect("Failed to acquire write lock");
1293                                *current_state = StreamState::Error;
1294                                break;
1295                            }
1296
1297                            // Sleep before retrying
1298                            thread::sleep(Duration::from_millis(100));
1299                        }
1300                    }
1301                }
1302            }
1303        }
1304    }
1305
1306    /// Stop the pipeline
1307    pub fn stop(&mut self) -> Result<(), CoreError> {
1308        let mut state = self.state.write().expect("Failed to acquire write lock");
1309
1310        // Check if the pipeline is running
1311        if *state != StreamState::Running {
1312            return Err(CoreError::StreamError(
1313                ErrorContext::new("Pipeline not running".to_string())
1314                    .with_location(ErrorLocation::new(file!(), line!())),
1315            ));
1316        }
1317
1318        // Update state
1319        *state = StreamState::Paused;
1320
1321        // Stop all stages
1322        for stage in &self.stages {
1323            stage.stop()?;
1324        }
1325
1326        // Wait for worker threads to finish
1327        for worker in self.workers.drain(..) {
1328            match worker.join() {
1329                Ok(_) => {}
1330                Err(_) => {
1331                    return Err(CoreError::StreamError(
1332                        ErrorContext::new("Error joining worker thread".to_string())
1333                            .with_location(ErrorLocation::new(file!(), line!())),
1334                    ));
1335                }
1336            }
1337        }
1338
1339        Ok(())
1340    }
1341
1342    /// Get the current state of the pipeline
1343    pub fn state(&self) -> StreamState {
1344        *self.state.read().expect("Failed to acquire read lock")
1345    }
1346
1347    /// Get the statistics for the pipeline
1348    pub fn stats(&self) -> PipelineStats {
1349        let mut stats = PipelineStats::default();
1350
1351        // Collect stats from all stages
1352        for stage in &self.stages {
1353            let stage_stats = stage.stats();
1354            let stage_name = stage.name().to_string();
1355
1356            stats
1357                .stage_stats
1358                .insert(stage_name.clone(), stage_stats.clone());
1359
1360            // Update bottleneck stats
1361            let stage_throughput = stage_stats.avg_throughput;
1362            if stage_throughput > 0.0 && stage_throughput < stats.bottleneck_throughput {
1363                stats.bottleneck_throughput = stage_throughput;
1364                stats.bottleneck_stage = Some(stage_name);
1365            }
1366
1367            // Update total items processed (use the final stage's count)
1368            if !self
1369                .connections
1370                .iter()
1371                .any(|(_, to)| *to == self.stages.len() - 1)
1372            {
1373                stats.total_items = stage_stats.processed_items;
1374            }
1375        }
1376
1377        // Calculate overall statistics
1378        let mut max_uptime = 0.0;
1379        for stage_stats in stats.stage_stats.values() {
1380            if stage_stats.uptime_seconds > max_uptime {
1381                max_uptime = stage_stats.uptime_seconds;
1382            }
1383        }
1384
1385        stats.uptime_seconds = max_uptime;
1386
1387        if max_uptime > 0.0 {
1388            stats.overall_throughput = stats.total_items as f64 / max_uptime;
1389        }
1390
1391        stats
1392    }
1393
1394    /// Get the last error from the pipeline
1395    pub fn lasterror(&self) -> Option<ErrorContext> {
1396        self.error_context
1397            .read()
1398            .expect("Failed to acquire read lock")
1399            .clone()
1400    }
1401
1402    /// Check if the pipeline is empty
1403    pub fn is_empty(&self) -> bool {
1404        self.stages.iter().all(|stage| stage.is_empty())
1405    }
1406
1407    /// Get a stage by index
1408    pub fn stage(&self, index: usize) -> Option<&dyn AnyStage> {
1409        self.stages.get(index).map(|s| s.as_ref())
1410    }
1411
1412    /// Get the number of stages in the pipeline
1413    pub fn num_stages(&self) -> usize {
1414        self.stages.len()
1415    }
1416}
1417
1418impl Drop for Pipeline {
1419    fn drop(&mut self) {
1420        // Stop the pipeline if it's running
1421        if *self.state.read().expect("Failed to acquire read lock") == StreamState::Running {
1422            let _ = self.stop();
1423        }
1424    }
1425}
1426
/// Wrapper for pipeline stages to implement AnyStage
///
/// Owns a typed [`PipelineStage`] and exposes it through the type-erased
/// [`AnyStage`] interface so it can be stored as `Box<dyn AnyStage>`.
struct StageWrapper<I: Clone + Send + 'static, O: Clone + Send + 'static> {
    /// The typed stage that all `AnyStage` calls are forwarded to.
    stage: PipelineStage<I, O>,
}
1431
1432impl<I: Clone + Send + 'static, O: Clone + Send + 'static> StageWrapper<I, O> {
1433    /// Create a new stage wrapper
1434    fn new(stage: PipelineStage<I, O>) -> Self {
1435        Self { stage }
1436    }
1437}
1438
1439impl<I: Clone + Send + 'static, O: Clone + Send + 'static> AnyStage for StageWrapper<I, O> {
1440    fn name(&self) -> &str {
1441        &self.stage.name
1442    }
1443
1444    fn start(&self) -> Result<(), CoreError> {
1445        self.stage.start()
1446    }
1447
1448    fn stop(&self) -> Result<(), CoreError> {
1449        self.stage.stop()
1450    }
1451
1452    fn state(&self) -> StreamState {
1453        self.stage.state()
1454    }
1455
1456    fn stats(&self) -> StreamStats {
1457        self.stage.stats()
1458    }
1459
1460    fn clone_box_impl(&self) -> Box<dyn AnyStage> {
1461        Box::new(self.clone())
1462    }
1463
1464    fn is_empty(&self) -> bool {
1465        self.stage
1466            .processor
1467            .lock()
1468            .expect("Failed to acquire lock")
1469            .is_empty()
1470    }
1471
1472    fn push_raw(&self, data: Box<dyn std::any::Any + Send>) -> Result<(), CoreError> {
1473        let input = match data.downcast::<Vec<I>>() {
1474            Ok(input) => *input,
1475            Err(data) => {
1476                // Try to downcast to a single item
1477                match data.downcast::<I>() {
1478                    Ok(item) => vec![*item],
1479                    Err(_) => {
1480                        return Err(CoreError::StreamError(
1481                            ErrorContext::new(format!(
1482                                "Type mismatch when pushing data to stage {}",
1483                                self.name()
1484                            ))
1485                            .with_location(ErrorLocation::new(file!(), line!())),
1486                        ));
1487                    }
1488                }
1489            }
1490        };
1491
1492        self.stage
1493            .processor
1494            .lock()
1495            .expect("Failed to acquire lock")
1496            .push_batch(input)
1497    }
1498
1499    fn pop_raw(&self) -> Result<Box<dyn std::any::Any + Send>, CoreError> {
1500        let output = self
1501            .stage
1502            .processor
1503            .lock()
1504            .expect("Failed to acquire lock")
1505            .pop_batch(100)?;
1506        Ok(Box::new(output))
1507    }
1508}
1509
1510impl dyn AnyStage {
1511    /// Clone the stage into a new Box
1512    fn clone_box(&self) -> Box<dyn AnyStage> {
1513        self.clone_box_impl()
1514    }
1515}
1516
1517impl<I: Clone + Send + 'static, O: Clone + Send + 'static> Clone for StageWrapper<I, O> {
1518    fn clone(&self) -> Self {
1519        let stage = PipelineStage {
1520            name: self.stage.name.clone(),
1521            processor: self.stage.processor(),
1522            parallel: self.stage.parallel,
1523            parallelism: self.stage.parallelism,
1524        };
1525
1526        Self { stage }
1527    }
1528}
1529
1530/// Extensions to the StreamProcessor to enable ndarray processing
1531impl<A, D> StreamProcessor<ArrayBase<OwnedRepr<A>, D>, ArrayBase<OwnedRepr<A>, D>>
1532where
1533    A: Clone + Send + Default + 'static,
1534    D: Dimension + Clone + Send + 'static + RemoveAxis,
1535{
1536    /// Create a new array stream processor
1537    pub fn newarray<F>(config: StreamConfig, processfn: F) -> Self
1538    where
1539        F: Fn(
1540                Vec<ArrayBase<OwnedRepr<A>, D>>,
1541            ) -> Result<Vec<ArrayBase<OwnedRepr<A>, D>>, CoreError>
1542            + Send
1543            + Sync
1544            + 'static,
1545    {
1546        Self::new(config, processfn)
1547    }
1548
1549    /// Process arrays chunk-wise
1550    pub fn chunk_wise<F>(config: StreamConfig, chunk_size: usize, processfn: F) -> Self
1551    where
1552        F: Fn(&ArrayBase<OwnedRepr<A>, D>) -> Result<ArrayBase<OwnedRepr<A>, D>, CoreError>
1553            + Send
1554            + Sync
1555            + Clone
1556            + 'static,
1557    {
1558        let chunking_strategy = ChunkingStrategy::Fixed(chunk_size);
1559
1560        let processfn_clone = processfn.clone();
1561        let chunks_fn = move |arrays: Vec<ArrayBase<OwnedRepr<A>, D>>| -> Result<Vec<ArrayBase<OwnedRepr<A>, D>>, CoreError> {
1562            let mut results = Vec::with_capacity(arrays.len());
1563
1564            for array in arrays {
1565                // Create chunked array
1566                let chunked = ChunkedArray::new(array, chunking_strategy);
1567
1568                // Process each chunk and combine results
1569                let mut chunk_results = Vec::new();
1570                for chunk in chunked.get_chunks() {
1571                    let result = processfn_clone(&chunk)?;
1572                    chunk_results.push(result);
1573                }
1574
1575                // Combine chunk results by concatenating along the flattened dimension
1576                let combined = if !chunk_results.is_empty() {
1577                    // For 1D arrays, concatenate directly
1578                    if let Ok(combined_1d) = crate::ndarray::concatenate(
1579                        crate::ndarray::Axis(0),
1580                        &chunk_results
1581                            .iter()
1582                            .map(|arr| arr.view())
1583                            .collect::<Vec<_>>()
1584                    ) {
1585                        // Try to reshape back to original dimensions
1586                        if let Ok(reshaped) = combined_1d.into_dimensionality::<D>() {
1587                            reshaped
1588                        } else {
1589                            // If reshaping fails, use the first chunk as fallback
1590                            chunk_results.into_iter().next().expect("Expected at least one result")
1591                        }
1592                    } else {
1593                        // For multi-dimensional arrays, more complex logic would be needed
1594                        // For now, concatenate and reshape approach
1595                        chunk_results.into_iter().next().expect("Expected at least one result")
1596                    }
1597                } else {
1598                    return Err(CoreError::ValueError(ErrorContext::new(
1599                        "No chunks to process".to_string(),
1600                    )));
1601                };
1602                results.push(combined);
1603            }
1604
1605            Ok(results)
1606        };
1607
1608        Self::new(config, chunks_fn)
1609    }
1610
1611    /// Process arrays in parallel
1612    #[cfg(feature = "parallel")]
1613    pub fn parallel<F>(config: StreamConfig, processfn: F) -> Self
1614    where
1615        F: Fn(&ArrayBase<OwnedRepr<A>, D>) -> Result<ArrayBase<OwnedRepr<A>, D>, CoreError>
1616            + Send
1617            + Sync
1618            + Clone
1619            + 'static,
1620        A: Send + Sync,
1621    {
1622        let workers = config.workers.unwrap_or_else(num_cpus::get);
1623
1624        let parallel_fn = move |arrays: Vec<ArrayBase<OwnedRepr<A>, D>>| -> Result<Vec<ArrayBase<OwnedRepr<A>, D>>, CoreError> {
1625            // Process arrays in parallel using rayon
1626            use crate::parallel_ops::*;
1627            let pool = ThreadPoolBuilder::new()
1628                .num_threads(workers)
1629                .build()
1630                .map_err(|e| CoreError::StreamError(
1631                    ErrorContext::new(format!("{e}"))
1632                        .with_location(ErrorLocation::new(file!(), line!()))
1633                ))?;
1634
1635            let processfn_clone = processfn.clone();
1636            pool.install(|| {
1637                let results: Result<Vec<_>, _> = arrays
1638                    .par_iter()
1639                    .map(|array| processfn_clone(array))
1640                    .collect();
1641
1642                results
1643            })
1644        };
1645
1646        Self::new(config, parallel_fn)
1647    }
1648}
1649
1650/// Create a new stream processor with default configuration
1651#[allow(dead_code)]
1652pub fn create_stream_processor<T, U, F>(processfn: F) -> StreamProcessor<T, U>
1653where
1654    T: Clone + Send + 'static,
1655    U: Clone + Send + 'static,
1656    F: Fn(Vec<T>) -> Result<Vec<U>, CoreError> + Send + Sync + 'static,
1657{
1658    StreamProcessor::new(StreamConfig::default(), processfn)
1659}
1660
1661/// Create a new pipeline
1662#[allow(dead_code)]
1663pub fn create_pipeline(name: &str) -> PipelineBuilder {
1664    PipelineBuilder::new(name.to_string())
1665}
1666
/// Extension trait for error handling in stream processing
///
/// Implemented in this file for `Result<T, CoreError>`.
pub trait StreamError {
    /// Convert to a stream error
    ///
    /// Consumes `self` and always produces a `CoreError`. The in-file
    /// implementation for `Result` returns a `CoreError::StreamError` carrying
    /// `message` — with the original error appended when `self` is `Err` —
    /// and does so even when `self` is `Ok`.
    #[allow(dead_code)]
    fn to_streamerror(self, message: &str) -> CoreError;
}
1673
1674impl<T> StreamError for std::result::Result<T, CoreError> {
1675    fn to_streamerror(self, message: &str) -> CoreError {
1676        match self {
1677            Ok(_) => CoreError::StreamError(
1678                ErrorContext::new(message.to_string())
1679                    .with_location(ErrorLocation::new(file!(), line!())),
1680            ),
1681            Err(e) => CoreError::StreamError(
1682                ErrorContext::new(format!("{message}, {e}"))
1683                    .with_location(ErrorLocation::new(file!(), line!())),
1684            ),
1685        }
1686    }
1687}
1688
1689/// Extension to CoreError for stream errors
1690impl CoreError {
1691    /// Create a new end of stream error
1692    pub fn message(message: &str) -> Self {
1693        CoreError::EndOfStream(
1694            ErrorContext::new(message.to_string())
1695                .with_location(ErrorLocation::new(file!(), line!())),
1696        )
1697    }
1698
1699    /// Create a new stream error
1700    pub fn message_2(message: &str) -> Self {
1701        CoreError::StreamError(
1702            ErrorContext::new(message.to_string())
1703                .with_location(ErrorLocation::new(file!(), line!())),
1704        )
1705    }
1706
1707    /// Create a new timeout error
1708    pub fn message_3(message: &str) -> Self {
1709        CoreError::TimeoutError(
1710            ErrorContext::new(message.to_string())
1711                .with_location(ErrorLocation::new(file!(), line!())),
1712        )
1713    }
1714}