agentic_workflow/engine/
stream.rs1use std::collections::HashMap;
2
3use chrono::Utc;
4use uuid::Uuid;
5
6use crate::types::{
7 BackpressureConfig, BackpressureStrategy, ProcessingWindow, StreamCheckpoint,
8 StreamFork, StreamProcessor, StreamSource, StreamStatus,
9 WorkflowError, WorkflowResult,
10};
11
12pub struct StreamEngine {
14 processors: HashMap<String, StreamProcessor>,
15 checkpoints: HashMap<String, StreamCheckpoint>,
16 forks: HashMap<String, Vec<StreamFork>>,
17}
18
19impl StreamEngine {
20 pub fn new() -> Self {
21 Self {
22 processors: HashMap::new(),
23 checkpoints: HashMap::new(),
24 forks: HashMap::new(),
25 }
26 }
27
28 pub fn create_processor(
30 &mut self,
31 name: &str,
32 workflow_id: &str,
33 source: StreamSource,
34 window: Option<ProcessingWindow>,
35 max_queue_size: usize,
36 ) -> WorkflowResult<String> {
37 let id = Uuid::new_v4().to_string();
38 let processor = StreamProcessor {
39 id: id.clone(),
40 name: name.to_string(),
41 workflow_id: workflow_id.to_string(),
42 source,
43 window,
44 backpressure: BackpressureConfig {
45 max_queue_size,
46 strategy: BackpressureStrategy::SlowDown,
47 },
48 status: StreamStatus::Created,
49 created_at: Utc::now(),
50 };
51
52 self.processors.insert(id.clone(), processor);
53 Ok(id)
54 }
55
56 pub fn start(&mut self, stream_id: &str) -> WorkflowResult<()> {
58 let proc = self
59 .processors
60 .get_mut(stream_id)
61 .ok_or_else(|| WorkflowError::StreamError(format!("Not found: {}", stream_id)))?;
62
63 proc.status = StreamStatus::Running;
64 Ok(())
65 }
66
67 pub fn pause(&mut self, stream_id: &str) -> WorkflowResult<()> {
69 let proc = self
70 .processors
71 .get_mut(stream_id)
72 .ok_or_else(|| WorkflowError::StreamError(format!("Not found: {}", stream_id)))?;
73
74 proc.status = StreamStatus::Paused;
75 Ok(())
76 }
77
78 pub fn stop(&mut self, stream_id: &str) -> WorkflowResult<()> {
80 let proc = self
81 .processors
82 .get_mut(stream_id)
83 .ok_or_else(|| WorkflowError::StreamError(format!("Not found: {}", stream_id)))?;
84
85 proc.status = StreamStatus::Stopped;
86 Ok(())
87 }
88
89 pub fn checkpoint(&mut self, stream_id: &str, offset: u64, items_processed: u64) -> WorkflowResult<()> {
91 if !self.processors.contains_key(stream_id) {
92 return Err(WorkflowError::StreamError(format!("Not found: {}", stream_id)));
93 }
94
95 let cp = StreamCheckpoint {
96 stream_id: stream_id.to_string(),
97 offset,
98 items_processed,
99 checkpoint_at: Utc::now(),
100 };
101
102 self.checkpoints.insert(stream_id.to_string(), cp);
103 Ok(())
104 }
105
106 pub fn add_fork(
108 &mut self,
109 stream_id: &str,
110 name: &str,
111 condition: &str,
112 target_workflow_id: &str,
113 ) -> WorkflowResult<String> {
114 if !self.processors.contains_key(stream_id) {
115 return Err(WorkflowError::StreamError(format!("Not found: {}", stream_id)));
116 }
117
118 let fork_id = Uuid::new_v4().to_string();
119 let fork = StreamFork {
120 id: fork_id.clone(),
121 stream_id: stream_id.to_string(),
122 condition: condition.to_string(),
123 target_workflow_id: target_workflow_id.to_string(),
124 name: name.to_string(),
125 };
126
127 self.forks
128 .entry(stream_id.to_string())
129 .or_default()
130 .push(fork);
131
132 Ok(fork_id)
133 }
134
135 pub fn get_processor(&self, stream_id: &str) -> WorkflowResult<&StreamProcessor> {
137 self.processors
138 .get(stream_id)
139 .ok_or_else(|| WorkflowError::StreamError(format!("Not found: {}", stream_id)))
140 }
141
142 pub fn list_processors(&self) -> Vec<&StreamProcessor> {
144 self.processors.values().collect()
145 }
146}
147
148impl Default for StreamEngine {
149 fn default() -> Self {
150 Self::new()
151 }
152}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the status stored for `stream_id`, panicking if the id is unknown.
    fn status_of<'a>(engine: &'a StreamEngine, stream_id: &str) -> &'a StreamStatus {
        &engine.get_processor(stream_id).unwrap().status
    }

    /// Walks one processor through Created -> Running -> Paused -> Stopped.
    #[test]
    fn test_stream_lifecycle() {
        let source = StreamSource::FileWatch {
            path: "/tmp/data".to_string(),
            pattern: Some("*.csv".to_string()),
        };

        let mut engine = StreamEngine::new();
        let sid = engine
            .create_processor("file-watcher", "wf-1", source, None, 100)
            .unwrap();
        assert_eq!(status_of(&engine, &sid), &StreamStatus::Created);

        engine.start(&sid).unwrap();
        assert_eq!(status_of(&engine, &sid), &StreamStatus::Running);

        engine.pause(&sid).unwrap();
        assert_eq!(status_of(&engine, &sid), &StreamStatus::Paused);

        engine.stop(&sid).unwrap();
        assert_eq!(status_of(&engine, &sid), &StreamStatus::Stopped);
    }
}