scirs2_io/pipeline/
executors.rs

//! Pipeline execution strategies and executors

#![allow(dead_code)]
#![allow(missing_docs)]

use super::*;
use crate::error::Result;
use crossbeam_channel::Receiver;
#[cfg(feature = "async")]
use futures::stream::{self, StreamExt};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Instant;
#[cfg(feature = "async")]
use tokio::runtime::Runtime;

/// Trait for pipeline executors
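///
/// Implementors decide *how* a `Pipeline` is run (sequentially, in chunks,
/// with retries, and so on) while the pipeline itself defines *what* runs.
///
/// A minimal sketch of a custom executor that simply delegates to the
/// pipeline (illustrative only; imports and crate paths omitted):
///
/// ```ignore
/// struct PassthroughExecutor;
///
/// impl<I, O> PipelineExecutor<I, O> for PassthroughExecutor
/// where
///     I: 'static + Send + Sync,
///     O: 'static + Send + Sync,
/// {
///     fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
///         pipeline.execute(input)
///     }
///
///     fn name(&self) -> &str {
///         "passthrough"
///     }
/// }
/// ```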
pub trait PipelineExecutor<I, O> {
    /// Execute the pipeline with the given input
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O>;

    /// Get executor name
    fn name(&self) -> &str;
}

/// Sequential executor - executes stages one after another
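///
/// A minimal usage sketch (illustrative only; mirrors the unit test below and
/// assumes the `function_stage` helper from the parent pipeline module):
///
/// ```ignore
/// let pipeline: Pipeline<i32, i32> =
///     Pipeline::new().add_stage(function_stage("double", |x: i32| Ok(x * 2)));
/// let executor = SequentialExecutor;
/// assert_eq!(executor.execute(&pipeline, 21).unwrap(), 42);
/// ```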
pub struct SequentialExecutor;

impl<I, O> PipelineExecutor<I, O> for SequentialExecutor
where
    I: 'static + Send + Sync,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
        pipeline.execute(input)
    }

    fn name(&self) -> &str {
        "sequential"
    }
}

/// Streaming executor - processes data in chunks
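///
/// The input is split into chunks of `chunk_size` and the pipeline runs once
/// per chunk. A minimal sketch (illustrative only; mirrors the unit test
/// below):
///
/// ```ignore
/// let pipeline: Pipeline<Vec<i32>, Vec<i32>> = Pipeline::new()
///     .add_stage(function_stage("double_all", |nums: Vec<i32>| {
///         Ok(nums.into_iter().map(|x| x * 2).collect::<Vec<_>>())
///     }));
/// let executor = StreamingExecutor::new(2);
/// assert_eq!(executor.execute(&pipeline, vec![1, 2, 3, 4]).unwrap(), vec![2, 4, 6, 8]);
/// ```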
pub struct StreamingExecutor {
    pub chunk_size: usize,
}

impl StreamingExecutor {
    pub fn new(chunk_size: usize) -> Self {
        Self { chunk_size }
    }
}

impl<I, O> PipelineExecutor<Vec<I>, Vec<O>> for StreamingExecutor
where
    I: 'static + Send + Sync + Clone,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<Vec<I>, Vec<O>>, input: Vec<I>) -> Result<Vec<O>> {
        let chunks: Vec<Vec<I>> = input
            .chunks(self.chunk_size)
            .map(|chunk| chunk.to_vec())
            .collect();

        let mut results = Vec::new();

        for chunk in chunks {
            let chunk_result = pipeline.execute(chunk)?;
            results.extend(chunk_result);
        }

        Ok(results)
    }

    fn name(&self) -> &str {
        "streaming"
    }
}

/// Async executor - executes pipeline asynchronously
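///
/// Requires the `async` feature. The executor owns a Tokio runtime and blocks
/// on the pipeline from synchronous code. A minimal sketch (illustrative
/// only):
///
/// ```ignore
/// let pipeline: Pipeline<i32, i32> =
///     Pipeline::new().add_stage(function_stage("double", |x: i32| Ok(x * 2)));
/// let executor = AsyncExecutor::new();
/// assert_eq!(executor.execute(&pipeline, 21).unwrap(), 42);
/// ```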
#[cfg(feature = "async")]
pub struct AsyncExecutor {
    runtime: Runtime,
}

#[cfg(feature = "async")]
impl AsyncExecutor {
    pub fn new() -> Self {
        Self {
            runtime: Runtime::new().expect("failed to create Tokio runtime"),
        }
    }
}

#[cfg(feature = "async")]
impl<I, O> PipelineExecutor<I, O> for AsyncExecutor
where
    I: 'static + Send + Sync,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
        self.runtime.block_on(async {
            // Execute pipeline in async context
            // Note: We execute directly without spawn_blocking since Pipeline doesn't implement Send
            pipeline.execute(input)
        })
    }

    fn name(&self) -> &str {
        "async"
    }
}

/// Cached executor - caches intermediate results
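///
/// Results are keyed by a hash of the input's `Debug` representation and
/// written as bincode files under `cache_dir`, so repeated runs with the same
/// input are served from disk. A minimal sketch (illustrative only; the cache
/// directory path is hypothetical):
///
/// ```ignore
/// let pipeline: Pipeline<i32, i32> =
///     Pipeline::new().add_stage(function_stage("double", |x: i32| Ok(x * 2)));
/// let executor = CachedExecutor::new("/tmp/pipeline_cache");
/// let first = executor.execute(&pipeline, 21).unwrap();  // computed and cached
/// let second = executor.execute(&pipeline, 21).unwrap(); // served from the cache
/// assert_eq!(first, second);
/// ```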
pub struct CachedExecutor {
    cache_dir: PathBuf,
}

impl CachedExecutor {
    pub fn new(cache_dir: impl AsRef<Path>) -> Self {
        Self {
            cache_dir: cache_dir.as_ref().to_path_buf(),
        }
    }

    fn cache_key<T>(&self, stagename: &str, input: &T) -> String
    where
        T: std::fmt::Debug,
    {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        format!("{:?}", input).hash(&mut hasher);
        format!("{}_{:x}", stagename, hasher.finish())
    }
}

impl<I, O> PipelineExecutor<I, O> for CachedExecutor
where
    I: 'static + Send + Sync + std::fmt::Debug,
    O: 'static + Send + Sync + serde::Serialize + serde::de::DeserializeOwned,
{
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
        // Check cache first
        let cache_key = self.cache_key("pipeline", &input);
        let cache_path = self.cache_dir.join(format!("{}.cache", cache_key));

        if cache_path.exists() {
            // Try to load from cache
            if let Ok(cached_data) = std::fs::read(&cache_path) {
                if let Ok((result, _len)) = bincode::serde::decode_from_slice::<O, _>(
                    &cached_data,
                    bincode::config::standard(),
                ) {
                    return Ok(result);
                }
            }
        }

        // Execute pipeline
        let result = pipeline.execute(input)?;

        // Save to cache
        if let Ok(serialized) = bincode::serde::encode_to_vec(&result, bincode::config::standard())
        {
            let _ = std::fs::create_dir_all(&self.cache_dir);
            let _ = std::fs::write(&cache_path, serialized);
        }

        Ok(result)
    }

    fn name(&self) -> &str {
        "cached"
    }
}

/// Distributed executor - distributes work across multiple workers
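///
/// The input is split into roughly one chunk per worker and the chunks are
/// processed in parallel via `scirs2_core::parallel_ops`. A minimal sketch
/// (illustrative only; assumes chunk order is preserved by the parallel
/// collect):
///
/// ```ignore
/// let pipeline: Pipeline<Vec<i32>, Vec<i32>> = Pipeline::new()
///     .add_stage(function_stage("double_all", |nums: Vec<i32>| {
///         Ok(nums.into_iter().map(|x| x * 2).collect::<Vec<_>>())
///     }));
/// let executor = DistributedExecutor::new(4);
/// assert_eq!(
///     executor.execute(&pipeline, vec![1, 2, 3, 4]).unwrap(),
///     vec![2, 4, 6, 8]
/// );
/// ```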
pub struct DistributedExecutor {
    num_workers: usize,
}

impl DistributedExecutor {
    pub fn new(num_workers: usize) -> Self {
        Self { num_workers }
    }
}

impl<I, O> PipelineExecutor<Vec<I>, Vec<O>> for DistributedExecutor
where
    I: 'static + Send + Sync + Clone,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<Vec<I>, Vec<O>>, input: Vec<I>) -> Result<Vec<O>> {
        use scirs2_core::parallel_ops::*;

        // For now, we'll use a simpler approach that processes chunks in parallel
        // but executes the pipeline sequentially on each chunk.
        // Guard against a zero worker count and empty input, either of which
        // would otherwise yield a zero chunk size and panic in `par_chunks`.
        let workers = self.num_workers.max(1);
        let chunk_size = ((input.len() + workers - 1) / workers).max(1);

        // Process chunks in parallel using scirs2-core's parallel operations
        let results: Result<Vec<Vec<O>>> = input
            .par_chunks(chunk_size)
            .map(|chunk| {
                // Execute pipeline on this chunk
                pipeline.execute(chunk.to_vec())
            })
            .collect();

        // Flatten results
        results.map(|chunks| chunks.into_iter().flatten().collect())
    }

    fn name(&self) -> &str {
        "distributed"
    }
}

/// Checkpointed executor - saves progress at intervals
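///
/// A minimal sketch (illustrative only; the checkpoint directory path is
/// hypothetical). The final result is serialized to
/// `<checkpoint_dir>/final.checkpoint`:
///
/// ```ignore
/// let pipeline: Pipeline<i32, i32> =
///     Pipeline::new().add_stage(function_stage("double", |x: i32| Ok(x * 2)));
/// let executor = CheckpointedExecutor::new("/tmp/pipeline_checkpoints", 10);
/// assert_eq!(executor.execute(&pipeline, 21).unwrap(), 42);
/// ```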
pub struct CheckpointedExecutor {
    checkpoint_dir: PathBuf,
    checkpoint_interval: usize,
}

impl CheckpointedExecutor {
    pub fn new(checkpoint_dir: impl AsRef<Path>, interval: usize) -> Self {
        Self {
            checkpoint_dir: checkpoint_dir.as_ref().to_path_buf(),
            checkpoint_interval: interval,
        }
    }
}

impl<I, O> PipelineExecutor<I, O> for CheckpointedExecutor
where
    I: 'static + Send + Sync + serde::Serialize + serde::de::DeserializeOwned,
    O: 'static + Send + Sync + serde::Serialize + serde::de::DeserializeOwned,
{
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
        // Create checkpoint directory
        std::fs::create_dir_all(&self.checkpoint_dir).map_err(IoError::Io)?;

        // Execute with checkpointing logic
        // Note: This is simplified - real implementation would checkpoint at each stage
        let result = pipeline.execute(input)?;

        // Save final checkpoint
        let checkpoint_path = self.checkpoint_dir.join("final.checkpoint");
        let serialized = bincode::serde::encode_to_vec(&result, bincode::config::standard())
            .map_err(|e| IoError::SerializationError(e.to_string()))?;
        std::fs::write(&checkpoint_path, serialized).map_err(IoError::Io)?;

        Ok(result)
    }

    fn name(&self) -> &str {
        "checkpointed"
    }
}

/// Factory for creating boxed executors, typed here for `Vec<i32>` pipelines
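///
/// A minimal sketch (illustrative only):
///
/// ```ignore
/// let pipeline: Pipeline<Vec<i32>, Vec<i32>> = Pipeline::new()
///     .add_stage(function_stage("double_all", |nums: Vec<i32>| {
///         Ok(nums.into_iter().map(|x| x * 2).collect::<Vec<_>>())
///     }));
/// let executor = ExecutorFactory::streaming(2);
/// assert_eq!(executor.execute(&pipeline, vec![1, 2]).unwrap(), vec![2, 4]);
/// ```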
pub struct ExecutorFactory;

impl ExecutorFactory {
    /// Create a sequential executor
    pub fn sequential() -> Box<dyn PipelineExecutor<Vec<i32>, Vec<i32>>> {
        Box::new(SequentialExecutor)
    }

    /// Create a streaming executor
    pub fn streaming(chunk_size: usize) -> Box<dyn PipelineExecutor<Vec<i32>, Vec<i32>>> {
        Box::new(StreamingExecutor::new(chunk_size))
    }

    /// Create an async executor
    #[cfg(feature = "async")]
    pub fn async_executor() -> Box<dyn PipelineExecutor<Vec<i32>, Vec<i32>>> {
        Box::new(AsyncExecutor::new())
    }

    /// Create a cached executor
    pub fn cached(cache_dir: impl AsRef<Path>) -> Box<dyn PipelineExecutor<Vec<i32>, Vec<i32>>> {
        Box::new(CachedExecutor::new(cache_dir))
    }

    /// Create a distributed executor
    pub fn distributed(num_workers: usize) -> Box<dyn PipelineExecutor<Vec<i32>, Vec<i32>>> {
        Box::new(DistributedExecutor::new(num_workers))
    }

    /// Create a checkpointed executor
    pub fn checkpointed(
        checkpoint_dir: impl AsRef<Path>,
        interval: usize,
    ) -> Box<dyn PipelineExecutor<Vec<i32>, Vec<i32>>> {
        Box::new(CheckpointedExecutor::new(checkpoint_dir, interval))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sequential_executor() {
        let pipeline: Pipeline<i32, i32> =
            Pipeline::new().add_stage(function_stage("double", |x: i32| Ok(x * 2)));

        let executor = SequentialExecutor;
        let result = executor.execute(&pipeline, 21).unwrap();
        assert_eq!(result, 42);
    }

    #[test]
    fn test_streaming_executor() {
        let pipeline: Pipeline<Vec<i32>, Vec<i32>> = Pipeline::new()
            .add_stage(function_stage("double_all", |nums: Vec<i32>| {
                Ok(nums.into_iter().map(|x| x * 2).collect::<Vec<_>>())
            }));

        let executor = StreamingExecutor::new(2);
        let result = executor.execute(&pipeline, vec![1, 2, 3, 4]).unwrap();
        assert_eq!(result, vec![2, 4, 6, 8]);
    }
}

/// Streaming executor with backpressure configuration (chunk size, pending-chunk limit, timeout);
/// the current implementation processes chunks sequentially and holds the limit and timeout for future use
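///
/// A minimal sketch (illustrative only):
///
/// ```ignore
/// let pipeline: Pipeline<Vec<i32>, Vec<i32>> = Pipeline::new()
///     .add_stage(function_stage("double_all", |nums: Vec<i32>| {
///         Ok(nums.into_iter().map(|x| x * 2).collect::<Vec<_>>())
///     }));
/// let executor = BackpressureStreamingExecutor::new(2, 4)
///     .with_timeout(Duration::from_secs(10));
/// assert_eq!(executor.execute(&pipeline, vec![1, 2, 3, 4]).unwrap(), vec![2, 4, 6, 8]);
/// ```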
pub struct BackpressureStreamingExecutor {
    chunk_size: usize,
    max_pending_chunks: usize,
    timeout: Duration,
}

impl BackpressureStreamingExecutor {
    pub fn new(chunk_size: usize, max_pending_chunks: usize) -> Self {
        Self {
            chunk_size,
            max_pending_chunks,
            timeout: Duration::from_secs(30),
        }
    }

    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }
}

impl<I, O> PipelineExecutor<Vec<I>, Vec<O>> for BackpressureStreamingExecutor
where
    I: 'static + Send + Sync + Clone,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<Vec<I>, Vec<O>>, input: Vec<I>) -> Result<Vec<O>> {
        // Process chunks sequentially to avoid borrowing issues
        // In a production implementation, you might want to use Arc<Pipeline> for sharing
        let mut all_results = Vec::new();

        for chunk in input.chunks(self.chunk_size) {
            let chunk_vec = chunk.to_vec();
            let result = pipeline.execute(chunk_vec)?;
            all_results.extend(result);
        }

        Ok(all_results)
    }

    fn name(&self) -> &str {
        "backpressure_streaming"
    }
}

/// Monitoring executor that collects detailed metrics
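///
/// Wraps another executor and records item counts and start/end timestamps
/// around each `execute` call. A minimal sketch (illustrative only):
///
/// ```ignore
/// let pipeline: Pipeline<i32, i32> =
///     Pipeline::new().add_stage(function_stage("double", |x: i32| Ok(x * 2)));
/// let executor = MonitoringExecutor::new(SequentialExecutor);
/// executor.execute(&pipeline, 21).unwrap();
///
/// let metrics = executor.get_metrics();
/// assert_eq!(metrics.successful_items.load(Ordering::SeqCst), 1);
/// ```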
pub struct MonitoringExecutor<E> {
    inner: E,
    metrics_collector: Arc<Mutex<PipelineMetrics>>,
}

#[derive(Debug)]
pub struct PipelineMetrics {
    pub total_items: AtomicUsize,
    pub successful_items: AtomicUsize,
    pub failed_items: AtomicUsize,
    pub stage_metrics: HashMap<String, StageMetrics>,
    pub start_time: Option<Instant>,
    pub end_time: Option<Instant>,
}

impl Default for PipelineMetrics {
    fn default() -> Self {
        Self {
            total_items: AtomicUsize::new(0),
            successful_items: AtomicUsize::new(0),
            failed_items: AtomicUsize::new(0),
            stage_metrics: HashMap::new(),
            start_time: None,
            end_time: None,
        }
    }
}

impl Clone for PipelineMetrics {
    fn clone(&self) -> Self {
        Self {
            total_items: AtomicUsize::new(self.total_items.load(Ordering::SeqCst)),
            successful_items: AtomicUsize::new(self.successful_items.load(Ordering::SeqCst)),
            failed_items: AtomicUsize::new(self.failed_items.load(Ordering::SeqCst)),
            stage_metrics: self.stage_metrics.clone(),
            start_time: self.start_time,
            end_time: self.end_time,
        }
    }
}

#[derive(Debug, Clone, Default)]
pub struct StageMetrics {
    pub execution_count: usize,
    pub total_duration: Duration,
    pub min_duration: Option<Duration>,
    pub max_duration: Option<Duration>,
    pub errors: Vec<String>,
}

impl<E> MonitoringExecutor<E> {
    pub fn new(inner: E) -> Self {
        Self {
            inner,
            metrics_collector: Arc::new(Mutex::new(PipelineMetrics::default())),
        }
    }

    pub fn get_metrics(&self) -> PipelineMetrics {
        self.metrics_collector.lock().unwrap().clone()
    }
}

impl<E, I, O> PipelineExecutor<I, O> for MonitoringExecutor<E>
where
    E: PipelineExecutor<I, O>,
    I: 'static + Send + Sync,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
        {
            let mut metrics = self.metrics_collector.lock().unwrap();
            metrics.start_time = Some(Instant::now());
            metrics.total_items.fetch_add(1, Ordering::SeqCst);
        }

        let result = self.inner.execute(pipeline, input);

        {
            let mut metrics = self.metrics_collector.lock().unwrap();
            metrics.end_time = Some(Instant::now());

            match &result {
                Ok(_) => {
                    metrics.successful_items.fetch_add(1, Ordering::SeqCst);
                }
                Err(_) => {
                    metrics.failed_items.fetch_add(1, Ordering::SeqCst);
                }
            }
        }

        result
    }

    fn name(&self) -> &str {
        "monitoring"
    }
}

/// Retry executor for fault tolerance
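///
/// Wraps another executor and re-runs the pipeline on failure, optionally
/// with exponential backoff between attempts. A minimal sketch (illustrative
/// only):
///
/// ```ignore
/// let pipeline: Pipeline<i32, i32> =
///     Pipeline::new().add_stage(function_stage("double", |x: i32| Ok(x * 2)));
/// let executor = RetryExecutor::new(SequentialExecutor, 3)
///     .with_delay(Duration::from_millis(100))
///     .with_exponential_backoff(true);
/// assert_eq!(executor.execute(&pipeline, 21).unwrap(), 42);
/// ```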
pub struct RetryExecutor<E> {
    inner: E,
    max_retries: usize,
    retry_delay: Duration,
    exponential_backoff: bool,
}

impl<E> RetryExecutor<E> {
    pub fn new(inner: E, max_retries: usize) -> Self {
        Self {
            inner,
            max_retries,
            retry_delay: Duration::from_secs(1),
            exponential_backoff: true,
        }
    }

    pub fn with_delay(mut self, delay: Duration) -> Self {
        self.retry_delay = delay;
        self
    }

    pub fn with_exponential_backoff(mut self, enabled: bool) -> Self {
        self.exponential_backoff = enabled;
        self
    }
}

impl<E, I, O> PipelineExecutor<I, O> for RetryExecutor<E>
where
    E: PipelineExecutor<I, O>,
    I: 'static + Send + Sync + Clone,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
        let mut last_error = None;
        let mut delay = self.retry_delay;

        for attempt in 0..=self.max_retries {
            if attempt > 0 {
                thread::sleep(delay);
                if self.exponential_backoff {
                    delay *= 2;
                }
            }

            match self.inner.execute(pipeline, input.clone()) {
                Ok(result) => return Ok(result),
                Err(e) => {
                    last_error = Some(e);
                }
            }
        }

        Err(last_error.unwrap_or_else(|| IoError::Other("Retry failed".to_string())))
    }

    fn name(&self) -> &str {
        "retry"
    }
}

/// Event-driven executor that triggers on specific conditions
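///
/// Blocks until an `Event` arrives on the channel, then runs the pipeline
/// (waiting for the scheduled time in the case of `Event::ScheduledTime`).
/// A minimal sketch (illustrative only):
///
/// ```ignore
/// let (tx, rx) = crossbeam_channel::unbounded();
/// let executor = EventDrivenExecutor::new(rx);
///
/// // Another thread (or this one, before calling execute) signals readiness.
/// tx.send(Event::ExternalTrigger("start".to_string())).unwrap();
///
/// let pipeline: Pipeline<i32, i32> =
///     Pipeline::new().add_stage(function_stage("double", |x: i32| Ok(x * 2)));
/// assert_eq!(executor.execute(&pipeline, 21).unwrap(), 42);
/// ```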
pub struct EventDrivenExecutor {
    event_receiver: Receiver<Event>,
}

#[derive(Debug, Clone)]
pub enum Event {
    DataAvailable(String),
    ScheduledTime(Instant),
    ExternalTrigger(String),
    FileCreated(PathBuf),
}

impl EventDrivenExecutor {
    pub fn new(event_receiver: Receiver<Event>) -> Self {
        Self { event_receiver }
    }
}

impl<I, O> PipelineExecutor<I, O> for EventDrivenExecutor
where
    I: 'static + Send + Sync,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
        // Wait for event
        match self.event_receiver.recv() {
            Ok(event) => {
                match event {
                    Event::DataAvailable(_) | Event::ExternalTrigger(_) | Event::FileCreated(_) => {
                        // Execute pipeline when event is received
                        pipeline.execute(input)
                    }
                    Event::ScheduledTime(scheduled) => {
                        // Wait until scheduled time
                        let now = Instant::now();
                        if scheduled > now {
                            thread::sleep(scheduled - now);
                        }
                        pipeline.execute(input)
                    }
                }
            }
            Err(_) => Err(IoError::Other("Event channel closed".to_string())),
        }
    }

    fn name(&self) -> &str {
        "event_driven"
    }
}

/// Parallel stage executor for executing independent pipeline stages in
/// parallel (the current implementation delegates to sequential execution)
pub struct ParallelStageExecutor {
    max_parallelism: usize,
}

impl ParallelStageExecutor {
    pub fn new(max_parallelism: usize) -> Self {
        Self { max_parallelism }
    }
}

impl<I, O> PipelineExecutor<I, O> for ParallelStageExecutor
where
    I: 'static + Send + Sync,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
        // For now, delegate to regular execution
        // In a full implementation, this would analyze stage dependencies
        // and execute independent stages in parallel
        pipeline.execute(input)
    }

    fn name(&self) -> &str {
        "parallel_stage"
    }
}