#![allow(dead_code)]
#![allow(missing_docs)]

use super::*;
use crate::error::Result;
use crossbeam_channel::Receiver;
#[cfg(feature = "async")]
use futures::stream::{self, StreamExt};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Instant;
#[cfg(feature = "async")]
use tokio::runtime::Runtime;

/// A strategy for running a `Pipeline` from input to output.
pub trait PipelineExecutor<I, O> {
    /// Run the pipeline on `input` and return its output.
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O>;

    /// Short, human-readable name of this execution strategy.
    fn name(&self) -> &str;
}

/// Runs the pipeline once, in order, on the current thread.
pub struct SequentialExecutor;

impl<I, O> PipelineExecutor<I, O> for SequentialExecutor
where
    I: 'static + Send + Sync,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
        pipeline.execute(input)
    }

    fn name(&self) -> &str {
        "sequential"
    }
}

/// Processes a `Vec` input in fixed-size chunks, feeding each chunk through the
/// pipeline and concatenating the results. `chunk_size` must be greater than zero.
pub struct StreamingExecutor {
    pub chunk_size: usize,
}

impl StreamingExecutor {
    pub fn new(chunk_size: usize) -> Self {
        Self { chunk_size }
    }
}

impl<I, O> PipelineExecutor<Vec<I>, Vec<O>> for StreamingExecutor
where
    I: 'static + Send + Sync + Clone,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<Vec<I>, Vec<O>>, input: Vec<I>) -> Result<Vec<O>> {
        let chunks: Vec<Vec<I>> = input
            .chunks(self.chunk_size)
            .map(|chunk| chunk.to_vec())
            .collect();

        let mut results = Vec::new();

        for chunk in chunks {
            let chunk_result = pipeline.execute(chunk)?;
            results.extend(chunk_result);
        }

        Ok(results)
    }

    fn name(&self) -> &str {
        "streaming"
    }
}

/// Runs the pipeline inside a dedicated Tokio runtime.
#[cfg(feature = "async")]
pub struct AsyncExecutor {
    runtime: Runtime,
}

#[cfg(feature = "async")]
impl AsyncExecutor {
    pub fn new() -> Self {
        Self {
            runtime: Runtime::new().expect("failed to create Tokio runtime"),
        }
    }
}

#[cfg(feature = "async")]
impl<I, O> PipelineExecutor<I, O> for AsyncExecutor
where
    I: 'static + Send + Sync,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
        self.runtime.block_on(async {
            // The pipeline itself is synchronous; it is simply driven from within
            // the async context here.
            pipeline.execute(input)
        })
    }

    fn name(&self) -> &str {
        "async"
    }
}

/// Caches pipeline results on disk, keyed by a hash of the input's `Debug`
/// representation. A cache hit skips execution entirely.
pub struct CachedExecutor {
    cache_dir: PathBuf,
}

impl CachedExecutor {
    pub fn new(cache_dir: impl AsRef<Path>) -> Self {
        Self {
            cache_dir: cache_dir.as_ref().to_path_buf(),
        }
    }

    fn cache_key<T>(&self, stagename: &str, input: &T) -> String
    where
        T: std::fmt::Debug,
    {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        format!("{:?}", input).hash(&mut hasher);
        format!("{}_{:x}", stagename, hasher.finish())
    }
}

impl<I, O> PipelineExecutor<I, O> for CachedExecutor
where
    I: 'static + Send + Sync + std::fmt::Debug,
    O: 'static + Send + Sync + serde::Serialize + serde::de::DeserializeOwned,
{
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
        let cache_key = self.cache_key("pipeline", &input);
        let cache_path = self.cache_dir.join(format!("{}.cache", cache_key));

        // Return the cached result if it exists and deserializes cleanly.
        if cache_path.exists() {
            if let Ok(cached_data) = std::fs::read(&cache_path) {
                if let Ok((result, _len)) = bincode::serde::decode_from_slice::<O, _>(
                    &cached_data,
                    bincode::config::standard(),
                ) {
                    return Ok(result);
                }
            }
        }

        // Cache miss: run the pipeline, then persist the result on a best-effort basis.
        let result = pipeline.execute(input)?;

        if let Ok(serialized) = bincode::serde::encode_to_vec(&result, bincode::config::standard())
        {
            let _ = std::fs::create_dir_all(&self.cache_dir);
            let _ = std::fs::write(&cache_path, serialized);
        }

        Ok(result)
    }

    fn name(&self) -> &str {
        "cached"
    }
}

/// Splits a `Vec` input across `num_workers` parallel workers and merges the
/// per-chunk outputs in order.
pub struct DistributedExecutor {
    num_workers: usize,
}

impl DistributedExecutor {
    pub fn new(num_workers: usize) -> Self {
        Self { num_workers }
    }
}

impl<I, O> PipelineExecutor<Vec<I>, Vec<O>> for DistributedExecutor
where
    I: 'static + Send + Sync + Clone,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<Vec<I>, Vec<O>>, input: Vec<I>) -> Result<Vec<O>> {
        use scirs2_core::parallel_ops::*;

        // Ceiling division so items are spread across the workers; clamp both the
        // worker count and the chunk size to at least 1 to avoid a division by zero
        // and a zero-sized `par_chunks` (e.g. for empty input).
        let workers = self.num_workers.max(1);
        let chunk_size = ((input.len() + workers - 1) / workers).max(1);

        let results: Result<Vec<Vec<O>>> = input
            .par_chunks(chunk_size)
            .map(|chunk| pipeline.execute(chunk.to_vec()))
            .collect();

        results.map(|chunks| chunks.into_iter().flatten().collect())
    }

    fn name(&self) -> &str {
        "distributed"
    }
}

/// Writes the final pipeline result to a checkpoint file after execution.
pub struct CheckpointedExecutor {
    checkpoint_dir: PathBuf,
    checkpoint_interval: usize,
}

impl CheckpointedExecutor {
    pub fn new(checkpoint_dir: impl AsRef<Path>, interval: usize) -> Self {
        Self {
            checkpoint_dir: checkpoint_dir.as_ref().to_path_buf(),
            checkpoint_interval: interval,
        }
    }
}

impl<I, O> PipelineExecutor<I, O> for CheckpointedExecutor
where
    I: 'static + Send + Sync + serde::Serialize + serde::de::DeserializeOwned,
    O: 'static + Send + Sync + serde::Serialize + serde::de::DeserializeOwned,
{
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
        std::fs::create_dir_all(&self.checkpoint_dir).map_err(IoError::Io)?;

        let result = pipeline.execute(input)?;

        // Currently only a final checkpoint is written; `checkpoint_interval` is
        // reserved for per-stage checkpointing.
        let checkpoint_path = self.checkpoint_dir.join("final.checkpoint");
        let serialized = bincode::serde::encode_to_vec(&result, bincode::config::standard())
            .map_err(|e| IoError::SerializationError(e.to_string()))?;
        std::fs::write(&checkpoint_path, serialized).map_err(IoError::Io)?;

        Ok(result)
    }

    fn name(&self) -> &str {
        "checkpointed"
    }
}

/// Convenience constructors for boxed executors over `Vec<i32>` pipelines.
pub struct ExecutorFactory;

impl ExecutorFactory {
    pub fn sequential() -> Box<dyn PipelineExecutor<Vec<i32>, Vec<i32>>> {
        Box::new(SequentialExecutor)
    }

    pub fn streaming(chunk_size: usize) -> Box<dyn PipelineExecutor<Vec<i32>, Vec<i32>>> {
        Box::new(StreamingExecutor::new(chunk_size))
    }

    #[cfg(feature = "async")]
    pub fn async_executor() -> Box<dyn PipelineExecutor<Vec<i32>, Vec<i32>>> {
        Box::new(AsyncExecutor::new())
    }

    pub fn cached(cache_dir: impl AsRef<Path>) -> Box<dyn PipelineExecutor<Vec<i32>, Vec<i32>>> {
        Box::new(CachedExecutor::new(cache_dir))
    }

    pub fn distributed(num_workers: usize) -> Box<dyn PipelineExecutor<Vec<i32>, Vec<i32>>> {
        Box::new(DistributedExecutor::new(num_workers))
    }

    pub fn checkpointed(
        checkpoint_dir: impl AsRef<Path>,
        interval: usize,
    ) -> Box<dyn PipelineExecutor<Vec<i32>, Vec<i32>>> {
        Box::new(CheckpointedExecutor::new(checkpoint_dir, interval))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sequential_executor() {
        let pipeline: Pipeline<i32, i32> =
            Pipeline::new().add_stage(function_stage("double", |x: i32| Ok(x * 2)));

        let executor = SequentialExecutor;
        let result = executor.execute(&pipeline, 21).unwrap();
        assert_eq!(result, 42);
    }

    #[test]
    fn test_streaming_executor() {
        let pipeline: Pipeline<Vec<i32>, Vec<i32>> = Pipeline::new()
            .add_stage(function_stage("double_all", |nums: Vec<i32>| {
                Ok(nums.into_iter().map(|x| x * 2).collect::<Vec<_>>())
            }));

        let executor = StreamingExecutor::new(2);
        let result = executor.execute(&pipeline, vec![1, 2, 3, 4]).unwrap();
        assert_eq!(result, vec![2, 4, 6, 8]);
    }
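
    // The tests below are added sketches exercising the wrapper executors with the
    // same toy pipelines used above; they assume only the `function_stage` and
    // `Pipeline` APIs already exercised by the existing tests.
    #[test]
    fn test_retry_executor_passes_through_success() {
        let pipeline: Pipeline<i32, i32> =
            Pipeline::new().add_stage(function_stage("double", |x: i32| Ok(x * 2)));

        // Wrap the sequential executor; a successful run should need no retries.
        let executor = RetryExecutor::new(SequentialExecutor, 3)
            .with_delay(std::time::Duration::from_millis(1));
        let result = executor.execute(&pipeline, 21).unwrap();
        assert_eq!(result, 42);
    }

    #[test]
    fn test_monitoring_executor_counts_success() {
        use std::sync::atomic::Ordering;

        let pipeline: Pipeline<i32, i32> =
            Pipeline::new().add_stage(function_stage("double", |x: i32| Ok(x * 2)));

        let executor = MonitoringExecutor::new(SequentialExecutor);
        let result = executor.execute(&pipeline, 21).unwrap();
        assert_eq!(result, 42);

        // One item in, one success, no failures.
        let metrics = executor.get_metrics();
        assert_eq!(metrics.total_items.load(Ordering::SeqCst), 1);
        assert_eq!(metrics.successful_items.load(Ordering::SeqCst), 1);
        assert_eq!(metrics.failed_items.load(Ordering::SeqCst), 0);
    }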
}

/// Streaming executor intended to bound the number of in-flight chunks.
pub struct BackpressureStreamingExecutor {
    chunk_size: usize,
    max_pending_chunks: usize,
    timeout: Duration,
}

impl BackpressureStreamingExecutor {
    pub fn new(chunk_size: usize, max_pending_chunks: usize) -> Self {
        Self {
            chunk_size,
            max_pending_chunks,
            timeout: Duration::from_secs(30),
        }
    }

    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }
}

impl<I, O> PipelineExecutor<Vec<I>, Vec<O>> for BackpressureStreamingExecutor
where
    I: 'static + Send + Sync + Clone,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<Vec<I>, Vec<O>>, input: Vec<I>) -> Result<Vec<O>> {
        // Chunks are currently processed sequentially; `max_pending_chunks` and
        // `timeout` are reserved for a buffered, backpressure-aware implementation.
        let mut all_results = Vec::new();

        for chunk in input.chunks(self.chunk_size) {
            let chunk_vec = chunk.to_vec();
            let result = pipeline.execute(chunk_vec)?;
            all_results.extend(result);
        }

        Ok(all_results)
    }

    fn name(&self) -> &str {
        "backpressure_streaming"
    }
}

pub struct MonitoringExecutor<E> {
    inner: E,
    metrics_collector: Arc<Mutex<PipelineMetrics>>,
}

#[derive(Debug)]
pub struct PipelineMetrics {
    pub total_items: AtomicUsize,
    pub successful_items: AtomicUsize,
    pub failed_items: AtomicUsize,
    pub stage_metrics: HashMap<String, StageMetrics>,
    pub start_time: Option<Instant>,
    pub end_time: Option<Instant>,
}

impl Default for PipelineMetrics {
    fn default() -> Self {
        Self {
            total_items: AtomicUsize::new(0),
            successful_items: AtomicUsize::new(0),
            failed_items: AtomicUsize::new(0),
            stage_metrics: HashMap::new(),
            start_time: None,
            end_time: None,
        }
    }
}

impl Clone for PipelineMetrics {
    fn clone(&self) -> Self {
        Self {
            total_items: AtomicUsize::new(self.total_items.load(Ordering::SeqCst)),
            successful_items: AtomicUsize::new(self.successful_items.load(Ordering::SeqCst)),
            failed_items: AtomicUsize::new(self.failed_items.load(Ordering::SeqCst)),
            stage_metrics: self.stage_metrics.clone(),
            start_time: self.start_time,
            end_time: self.end_time,
        }
    }
}

#[derive(Debug, Clone, Default)]
pub struct StageMetrics {
    pub execution_count: usize,
    pub total_duration: Duration,
    pub min_duration: Option<Duration>,
    pub max_duration: Option<Duration>,
    pub errors: Vec<String>,
}

impl<E> MonitoringExecutor<E> {
    pub fn new(inner: E) -> Self {
        Self {
            inner,
            metrics_collector: Arc::new(Mutex::new(PipelineMetrics::default())),
        }
    }

    pub fn get_metrics(&self) -> PipelineMetrics {
        self.metrics_collector.lock().unwrap().clone()
    }
}

impl<E, I, O> PipelineExecutor<I, O> for MonitoringExecutor<E>
where
    E: PipelineExecutor<I, O>,
    I: 'static + Send + Sync,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
        {
            let mut metrics = self.metrics_collector.lock().unwrap();
            metrics.start_time = Some(Instant::now());
            metrics.total_items.fetch_add(1, Ordering::SeqCst);
        }

        let result = self.inner.execute(pipeline, input);

        {
            let mut metrics = self.metrics_collector.lock().unwrap();
            metrics.end_time = Some(Instant::now());

            match &result {
                Ok(_) => {
                    metrics.successful_items.fetch_add(1, Ordering::SeqCst);
                }
                Err(_) => {
                    metrics.failed_items.fetch_add(1, Ordering::SeqCst);
                }
            }
        }

        result
    }

    fn name(&self) -> &str {
        "monitoring"
    }
}

/// Wraps another executor and retries failed executions with optional
/// exponential backoff.
pub struct RetryExecutor<E> {
    inner: E,
    max_retries: usize,
    retry_delay: Duration,
    exponential_backoff: bool,
}

impl<E> RetryExecutor<E> {
    pub fn new(inner: E, max_retries: usize) -> Self {
        Self {
            inner,
            max_retries,
            retry_delay: Duration::from_secs(1),
            exponential_backoff: true,
        }
    }

    pub fn with_delay(mut self, delay: Duration) -> Self {
        self.retry_delay = delay;
        self
    }

    pub fn with_exponential_backoff(mut self, enabled: bool) -> Self {
        self.exponential_backoff = enabled;
        self
    }
}

impl<E, I, O> PipelineExecutor<I, O> for RetryExecutor<E>
where
    E: PipelineExecutor<I, O>,
    I: 'static + Send + Sync + Clone,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
        let mut last_error = None;
        let mut delay = self.retry_delay;

        // Attempt 0 is the initial run; each retry sleeps first, doubling the
        // delay when exponential backoff is enabled.
        for attempt in 0..=self.max_retries {
            if attempt > 0 {
                thread::sleep(delay);
                if self.exponential_backoff {
                    delay *= 2;
                }
            }

            match self.inner.execute(pipeline, input.clone()) {
                Ok(result) => return Ok(result),
                Err(e) => {
                    last_error = Some(e);
                }
            }
        }

        Err(last_error.unwrap_or_else(|| IoError::Other("Retry failed".to_string())))
    }

    fn name(&self) -> &str {
        "retry"
    }
}

/// Blocks until an event arrives on its channel, then runs the pipeline.
pub struct EventDrivenExecutor {
    event_receiver: Receiver<Event>,
}

#[derive(Debug, Clone)]
pub enum Event {
    DataAvailable(String),
    ScheduledTime(Instant),
    ExternalTrigger(String),
    FileCreated(PathBuf),
}

impl EventDrivenExecutor {
    pub fn new(event_receiver: Receiver<Event>) -> Self {
        Self { event_receiver }
    }
}

impl<I, O> PipelineExecutor<I, O> for EventDrivenExecutor
where
    I: 'static + Send + Sync,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
        // Wait for the next event before running the pipeline.
        match self.event_receiver.recv() {
            Ok(event) => match event {
                Event::DataAvailable(_) | Event::ExternalTrigger(_) | Event::FileCreated(_) => {
                    pipeline.execute(input)
                }
                Event::ScheduledTime(scheduled) => {
                    // Sleep until the scheduled time if it is still in the future.
                    let now = Instant::now();
                    if scheduled > now {
                        thread::sleep(scheduled - now);
                    }
                    pipeline.execute(input)
                }
            },
            Err(_) => Err(IoError::Other("Event channel closed".to_string())),
        }
    }

    fn name(&self) -> &str {
        "event_driven"
    }
}

/// Intended to run independent pipeline stages in parallel; currently delegates
/// to the pipeline's own sequential execution.
pub struct ParallelStageExecutor {
    max_parallelism: usize,
}

impl ParallelStageExecutor {
    pub fn new(max_parallelism: usize) -> Self {
        Self { max_parallelism }
    }
}

impl<I, O> PipelineExecutor<I, O> for ParallelStageExecutor
where
    I: 'static + Send + Sync,
    O: 'static + Send + Sync,
{
    fn execute(&self, pipeline: &Pipeline<I, O>, input: I) -> Result<O> {
        // Stage-level parallelism is not implemented yet; fall back to plain
        // sequential execution.
        pipeline.execute(input)
    }

    fn name(&self) -> &str {
        "parallel_stage"
    }
}
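
#[cfg(test)]
mod wrapper_tests {
    // Added sketch tests for the executors defined after the main `tests` module.
    // They rely only on APIs visible in this file (`Pipeline`, `function_stage`,
    // `crossbeam_channel`); the stage names are arbitrary test labels.
    use super::*;

    #[test]
    fn test_backpressure_streaming_executor() {
        let pipeline: Pipeline<Vec<i32>, Vec<i32>> = Pipeline::new()
            .add_stage(function_stage("double_all", |nums: Vec<i32>| {
                Ok(nums.into_iter().map(|x| x * 2).collect::<Vec<_>>())
            }));

        let executor = BackpressureStreamingExecutor::new(2, 4);
        let result = executor.execute(&pipeline, vec![1, 2, 3, 4, 5]).unwrap();
        assert_eq!(result, vec![2, 4, 6, 8, 10]);
    }

    #[test]
    fn test_event_driven_executor_runs_on_trigger() {
        let pipeline: Pipeline<i32, i32> =
            Pipeline::new().add_stage(function_stage("double", |x: i32| Ok(x * 2)));

        // Queue an event before executing so `recv` returns immediately.
        let (sender, receiver) = crossbeam_channel::unbounded();
        sender
            .send(Event::ExternalTrigger("manual".to_string()))
            .unwrap();

        let executor = EventDrivenExecutor::new(receiver);
        let result = executor.execute(&pipeline, 21).unwrap();
        assert_eq!(result, 42);
    }
}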