// agentic_workflow/engine/stream.rs

1use std::collections::HashMap;
2
3use chrono::Utc;
4use uuid::Uuid;
5
6use crate::types::{
7    BackpressureConfig, BackpressureStrategy, ProcessingWindow, StreamCheckpoint,
8    StreamFork, StreamProcessor, StreamSource, StreamStatus,
9    WorkflowError, WorkflowResult,
10};
11
12/// Unified stream processing engine.
13pub struct StreamEngine {
14    processors: HashMap<String, StreamProcessor>,
15    checkpoints: HashMap<String, StreamCheckpoint>,
16    forks: HashMap<String, Vec<StreamFork>>,
17}
18
19impl StreamEngine {
20    pub fn new() -> Self {
21        Self {
22            processors: HashMap::new(),
23            checkpoints: HashMap::new(),
24            forks: HashMap::new(),
25        }
26    }
27
28    /// Create a stream processor.
29    pub fn create_processor(
30        &mut self,
31        name: &str,
32        workflow_id: &str,
33        source: StreamSource,
34        window: Option<ProcessingWindow>,
35        max_queue_size: usize,
36    ) -> WorkflowResult<String> {
37        let id = Uuid::new_v4().to_string();
38        let processor = StreamProcessor {
39            id: id.clone(),
40            name: name.to_string(),
41            workflow_id: workflow_id.to_string(),
42            source,
43            window,
44            backpressure: BackpressureConfig {
45                max_queue_size,
46                strategy: BackpressureStrategy::SlowDown,
47            },
48            status: StreamStatus::Created,
49            created_at: Utc::now(),
50        };
51
52        self.processors.insert(id.clone(), processor);
53        Ok(id)
54    }
55
56    /// Start consuming from a stream.
57    pub fn start(&mut self, stream_id: &str) -> WorkflowResult<()> {
58        let proc = self
59            .processors
60            .get_mut(stream_id)
61            .ok_or_else(|| WorkflowError::StreamError(format!("Not found: {}", stream_id)))?;
62
63        proc.status = StreamStatus::Running;
64        Ok(())
65    }
66
67    /// Pause stream consumption.
68    pub fn pause(&mut self, stream_id: &str) -> WorkflowResult<()> {
69        let proc = self
70            .processors
71            .get_mut(stream_id)
72            .ok_or_else(|| WorkflowError::StreamError(format!("Not found: {}", stream_id)))?;
73
74        proc.status = StreamStatus::Paused;
75        Ok(())
76    }
77
78    /// Stop a stream processor.
79    pub fn stop(&mut self, stream_id: &str) -> WorkflowResult<()> {
80        let proc = self
81            .processors
82            .get_mut(stream_id)
83            .ok_or_else(|| WorkflowError::StreamError(format!("Not found: {}", stream_id)))?;
84
85        proc.status = StreamStatus::Stopped;
86        Ok(())
87    }
88
89    /// Force checkpoint at current position.
90    pub fn checkpoint(&mut self, stream_id: &str, offset: u64, items_processed: u64) -> WorkflowResult<()> {
91        if !self.processors.contains_key(stream_id) {
92            return Err(WorkflowError::StreamError(format!("Not found: {}", stream_id)));
93        }
94
95        let cp = StreamCheckpoint {
96            stream_id: stream_id.to_string(),
97            offset,
98            items_processed,
99            checkpoint_at: Utc::now(),
100        };
101
102        self.checkpoints.insert(stream_id.to_string(), cp);
103        Ok(())
104    }
105
106    /// Add a fork to split stream by condition.
107    pub fn add_fork(
108        &mut self,
109        stream_id: &str,
110        name: &str,
111        condition: &str,
112        target_workflow_id: &str,
113    ) -> WorkflowResult<String> {
114        if !self.processors.contains_key(stream_id) {
115            return Err(WorkflowError::StreamError(format!("Not found: {}", stream_id)));
116        }
117
118        let fork_id = Uuid::new_v4().to_string();
119        let fork = StreamFork {
120            id: fork_id.clone(),
121            stream_id: stream_id.to_string(),
122            condition: condition.to_string(),
123            target_workflow_id: target_workflow_id.to_string(),
124            name: name.to_string(),
125        };
126
127        self.forks
128            .entry(stream_id.to_string())
129            .or_default()
130            .push(fork);
131
132        Ok(fork_id)
133    }
134
135    /// Get processor status.
136    pub fn get_processor(&self, stream_id: &str) -> WorkflowResult<&StreamProcessor> {
137        self.processors
138            .get(stream_id)
139            .ok_or_else(|| WorkflowError::StreamError(format!("Not found: {}", stream_id)))
140    }
141
142    /// List all processors.
143    pub fn list_processors(&self) -> Vec<&StreamProcessor> {
144        self.processors.values().collect()
145    }
146}
147
148impl Default for StreamEngine {
149    fn default() -> Self {
150        Self::new()
151    }
152}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// Drives one processor through Created -> Running -> Paused -> Stopped and checks
    /// the status reported after every step.
    #[test]
    fn test_stream_lifecycle() {
        let watch_source = StreamSource::FileWatch {
            path: "/tmp/data".to_string(),
            pattern: Some("*.csv".to_string()),
        };

        let mut engine = StreamEngine::new();
        let sid = engine
            .create_processor("file-watcher", "wf-1", watch_source, None, 100)
            .unwrap();

        // A freshly registered processor has not started consuming yet.
        let registered = engine.get_processor(&sid).unwrap();
        assert_eq!(registered.status, StreamStatus::Created);

        engine.start(&sid).unwrap();
        let started = engine.get_processor(&sid).unwrap();
        assert_eq!(started.status, StreamStatus::Running);

        engine.pause(&sid).unwrap();
        let paused = engine.get_processor(&sid).unwrap();
        assert_eq!(paused.status, StreamStatus::Paused);

        engine.stop(&sid).unwrap();
        let stopped = engine.get_processor(&sid).unwrap();
        assert_eq!(stopped.status, StreamStatus::Stopped);
    }
}