use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use chrono::Utc;

use kotoba_core::types::GraphRef_ as GraphRef;
use crate::ir::{WorkflowExecution, WorkflowExecutionId, ExecutionEvent, ExecutionEventType, ExecutionStatus};
use kotoba_errors::WorkflowError;

/// Selects the persistence backend used for workflow state.
#[derive(Debug, Clone)]
pub enum StorageBackend {
    Memory,
    #[cfg(feature = "rocksdb")]
    RocksDB { path: String },
    #[cfg(feature = "sqlite")]
    SQLite { path: String },
}

/// Builds a concrete `WorkflowStore` for the requested backend.
pub struct StorageFactory;

impl StorageFactory {
    pub async fn create(backend: StorageBackend) -> Result<Arc<dyn WorkflowStore>, WorkflowError> {
        // Every enabled variant is handled below, so the match is exhaustive
        // for any feature combination; `Memory` is always available.
        match backend {
            StorageBackend::Memory => Ok(Arc::new(MemoryWorkflowStore::new())),
            #[cfg(feature = "rocksdb")]
            StorageBackend::RocksDB { path } => {
                RocksDBWorkflowStore::new(&path).await.map(|s| Arc::new(s) as Arc<dyn WorkflowStore>)
            }
            #[cfg(feature = "sqlite")]
            StorageBackend::SQLite { path } => {
                SQLiteWorkflowStore::new(&path).await.map(|s| Arc::new(s) as Arc<dyn WorkflowStore>)
            }
        }
    }
}
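
// Intended usage (sketch; hypothetical caller, assumes a Tokio runtime and
// that the relevant feature flags are enabled):
//
//     let store = StorageFactory::create(StorageBackend::Memory).await?;
//     store.put("wf:1".to_string(), b"state".to_vec()).await?;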

/// Persistence interface shared by all workflow storage backends.
#[async_trait::async_trait]
pub trait WorkflowStore: Send + Sync {
    async fn save_execution(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError>;

    async fn get_execution(&self, id: &WorkflowExecutionId) -> Result<Option<WorkflowExecution>, WorkflowError>;

    async fn update_execution(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError>;

    async fn add_event(&self, execution_id: &WorkflowExecutionId, event: ExecutionEvent) -> Result<(), WorkflowError>;

    async fn get_events(&self, execution_id: &WorkflowExecutionId) -> Result<Vec<ExecutionEvent>, WorkflowError>;

    async fn get_running_executions(&self) -> Result<Vec<WorkflowExecution>, WorkflowError>;

    // Raw key-value access for auxiliary state.
    async fn put(&self, key: String, value: Vec<u8>) -> Result<(), WorkflowError>;
    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, WorkflowError>;
    async fn get_keys_with_prefix(&self, prefix: &str) -> Result<Vec<String>, WorkflowError>;
}
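
// Because the trait is object-safe, engine code can stay backend-agnostic,
// e.g. (hypothetical helper):
//
//     async fn checkpoint(store: &dyn WorkflowStore, exec: &WorkflowExecution)
//         -> Result<(), WorkflowError> {
//         store.save_execution(exec).await
//     }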

/// In-memory implementation backed by `tokio::sync::RwLock`ed maps.
pub struct MemoryWorkflowStore {
    executions: RwLock<HashMap<String, WorkflowExecution>>,
    events: RwLock<HashMap<String, Vec<ExecutionEvent>>>,
    kv_store: RwLock<HashMap<String, Vec<u8>>>,
}

impl MemoryWorkflowStore {
    pub fn new() -> Self {
        Self {
            executions: RwLock::new(HashMap::new()),
            events: RwLock::new(HashMap::new()),
            kv_store: RwLock::new(HashMap::new()),
        }
    }
}

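// Convenience: `Default` mirrors `new()` so the store can be constructed
// wherever a default instance is expected (e.g. in tests).
impl Default for MemoryWorkflowStore {
    fn default() -> Self {
        Self::new()
    }
}
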
#[async_trait::async_trait]
impl WorkflowStore for MemoryWorkflowStore {
    async fn save_execution(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError> {
        let mut executions = self.executions.write().await;
        executions.insert(execution.id.0.clone(), execution.clone());
        Ok(())
    }

    async fn get_execution(&self, id: &WorkflowExecutionId) -> Result<Option<WorkflowExecution>, WorkflowError> {
        let executions = self.executions.read().await;
        Ok(executions.get(&id.0).cloned())
    }

    async fn update_execution(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError> {
        let mut executions = self.executions.write().await;
        executions.insert(execution.id.0.clone(), execution.clone());
        Ok(())
    }

    async fn add_event(&self, execution_id: &WorkflowExecutionId, event: ExecutionEvent) -> Result<(), WorkflowError> {
        let mut events = self.events.write().await;
        events.entry(execution_id.0.clone())
            .or_default()
            .push(event);
        Ok(())
    }

    async fn get_events(&self, execution_id: &WorkflowExecutionId) -> Result<Vec<ExecutionEvent>, WorkflowError> {
        let events = self.events.read().await;
        Ok(events.get(&execution_id.0).cloned().unwrap_or_default())
    }

    async fn get_running_executions(&self) -> Result<Vec<WorkflowExecution>, WorkflowError> {
        let executions = self.executions.read().await;
        let running = executions.values()
            .filter(|e| matches!(e.status, ExecutionStatus::Running))
            .cloned()
            .collect();
        Ok(running)
    }

    async fn put(&self, key: String, value: Vec<u8>) -> Result<(), WorkflowError> {
        let mut kv_store = self.kv_store.write().await;
        kv_store.insert(key, value);
        Ok(())
    }

    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, WorkflowError> {
        let kv_store = self.kv_store.read().await;
        Ok(kv_store.get(key).cloned())
    }

    async fn get_keys_with_prefix(&self, prefix: &str) -> Result<Vec<String>, WorkflowError> {
        let kv_store = self.kv_store.read().await;
        let keys = kv_store.keys()
            .filter(|k| k.starts_with(prefix))
            .cloned()
            .collect();
        Ok(keys)
    }
}

/// Rebuilds executions from their event log and manages snapshots.
pub struct EventSourcingManager {
    store: Arc<dyn WorkflowStore>,
    /// Take a snapshot every N events.
    snapshot_interval: usize,
    /// Maximum number of snapshot versions to retain.
    max_versions: usize,
}

impl EventSourcingManager {
    pub fn new(store: Arc<dyn WorkflowStore>) -> Self {
        Self {
            store,
            snapshot_interval: 100,
            max_versions: 10,
        }
    }

    pub fn with_snapshot_config(mut self, snapshot_interval: usize, max_versions: usize) -> Self {
        self.snapshot_interval = snapshot_interval;
        self.max_versions = max_versions;
        self
    }

    /// Rebuilds an execution purely from its recorded events.
    pub async fn rebuild_execution(&self, execution_id: &WorkflowExecutionId) -> Result<Option<WorkflowExecution>, WorkflowError> {
        let events = self.store.get_events(execution_id).await?;

        if events.is_empty() {
            return Ok(None);
        }

        // Start from a blank execution; the `Started` event fills in the
        // workflow id and inputs below.
        let mut execution = WorkflowExecution {
            id: execution_id.clone(),
            workflow_id: String::new(),
            status: ExecutionStatus::Running,
            start_time: events[0].timestamp,
            end_time: None,
            inputs: HashMap::new(),
            outputs: None,
            current_graph: GraphRef("reconstructed".to_string()),
            execution_history: events.clone(),
            retry_count: 0,
            timeout_at: None,
        };

        for event in events {
            match event.event_type {
                ExecutionEventType::Started => {
                    if let Some(workflow_id) = event.payload.get("workflow_id").and_then(|v| v.as_str()) {
                        execution.workflow_id = workflow_id.to_string();
                    }
                    if let Some(inputs) = event.payload.get("inputs").and_then(|v| v.as_object()) {
                        execution.inputs = inputs.clone().into_iter().collect();
                    }
                }
                ExecutionEventType::WorkflowCompleted => {
                    execution.status = ExecutionStatus::Completed;
                    execution.end_time = Some(event.timestamp);
                    if let Some(outputs) = event.payload.get("outputs") {
                        execution.outputs = serde_json::from_value(outputs.clone()).ok();
                    }
                }
                ExecutionEventType::WorkflowFailed => {
                    execution.status = ExecutionStatus::Failed;
                    execution.end_time = Some(event.timestamp);
                }
                ExecutionEventType::WorkflowCancelled => {
                    execution.status = ExecutionStatus::Cancelled;
                    execution.end_time = Some(event.timestamp);
                }
                // Activity-level events are already captured in `execution_history`.
                _ => {}
            }
        }

        Ok(Some(execution))
    }

    pub async fn record_event(&self, execution_id: &WorkflowExecutionId, event_type: ExecutionEventType, payload: HashMap<String, serde_json::Value>) -> Result<(), WorkflowError> {
        let event = ExecutionEvent {
            id: uuid::Uuid::new_v4().to_string(),
            timestamp: Utc::now(),
            event_type,
            payload,
        };

        self.store.add_event(execution_id, event).await?;

        if self.needs_snapshot(execution_id).await? {
            self.create_snapshot(execution_id).await?;
        }

        Ok(())
    }

    /// True when the event count has just reached a snapshot boundary.
    pub async fn needs_snapshot(&self, execution_id: &WorkflowExecutionId) -> Result<bool, WorkflowError> {
        let events = self.store.get_events(execution_id).await?;
        // Guard against an empty log (0 % n == 0 would snapshot immediately).
        Ok(!events.is_empty() && events.len() % self.snapshot_interval == 0)
    }

    /// Persists the current execution state as a snapshot.
    pub async fn create_snapshot(&self, execution_id: &WorkflowExecutionId) -> Result<(), WorkflowError> {
        if let Some(execution) = self.store.get_execution(execution_id).await? {
            self.store.save_execution(&execution).await?;

            self.cleanup_old_events(execution_id).await?;
        }
        Ok(())
    }

    /// Placeholder: event compaction after a snapshot is not implemented yet.
    pub async fn cleanup_old_events(&self, _execution_id: &WorkflowExecutionId) -> Result<(), WorkflowError> {
        Ok(())
    }

    pub async fn get_full_event_history(&self, execution_id: &WorkflowExecutionId) -> Result<Vec<ExecutionEvent>, WorkflowError> {
        self.store.get_events(execution_id).await
    }

    /// Rebuilds an execution, preferring a stored snapshot as the base.
    pub async fn rebuild_execution_from_events(&self, execution_id: &WorkflowExecutionId) -> Result<Option<WorkflowExecution>, WorkflowError> {
        let events = self.get_full_event_history(execution_id).await?;

        if events.is_empty() {
            return Ok(None);
        }

        let (mut execution, from_snapshot) = if let Some(snapshot) = self.store.get_execution(execution_id).await? {
            (snapshot, true)
        } else {
            (WorkflowExecution {
                id: execution_id.clone(),
                workflow_id: String::new(),
                status: ExecutionStatus::Running,
                start_time: events[0].timestamp,
                end_time: None,
                inputs: HashMap::new(),
                outputs: None,
                current_graph: GraphRef("reconstructed".to_string()),
                execution_history: vec![],
                retry_count: 0,
                timeout_at: None,
            }, false)
        };

        for event in &events {
            // With no snapshot, replay everything (the first event's timestamp
            // equals `start_time`, so a strict `>` filter would drop it). After
            // a snapshot, only re-apply events newer than the snapshot's start
            // time — a timestamp heuristic rather than a precise cursor.
            if !from_snapshot || event.timestamp > execution.start_time {
                self.apply_event_to_execution(&mut execution, event);
            }
        }

        Ok(Some(execution))
    }

    /// Applies a single event to an in-memory execution during replay.
    fn apply_event_to_execution(&self, execution: &mut WorkflowExecution, event: &ExecutionEvent) {
        match &event.event_type {
            ExecutionEventType::Started => {
                if let Some(workflow_id) = event.payload.get("workflow_id").and_then(|v| v.as_str()) {
                    execution.workflow_id = workflow_id.to_string();
                }
                if let Some(inputs) = event.payload.get("inputs").and_then(|v| v.as_object()) {
                    execution.inputs = inputs.clone().into_iter().collect();
                }
            }
            ExecutionEventType::WorkflowCompleted => {
                execution.status = ExecutionStatus::Completed;
                execution.end_time = Some(event.timestamp);
                if let Some(outputs) = event.payload.get("outputs") {
                    execution.outputs = serde_json::from_value(outputs.clone()).ok();
                }
            }
            ExecutionEventType::WorkflowFailed => {
                execution.status = ExecutionStatus::Failed;
                execution.end_time = Some(event.timestamp);
            }
            ExecutionEventType::WorkflowCancelled => {
                execution.status = ExecutionStatus::Cancelled;
                execution.end_time = Some(event.timestamp);
            }
            // Activity lifecycle events (scheduled/started/completed/failed)
            // and any other event types are appended to the history verbatim.
            _ => {
                execution.execution_history.push(event.clone());
            }
        }
    }
}
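
// Typical flow (sketch; `store`, `exec_id`, and `payload` are hypothetical):
//
//     let esm = EventSourcingManager::new(store.clone())
//         .with_snapshot_config(100, 10);
//     esm.record_event(&exec_id, ExecutionEventType::Started, payload).await?;
//     let rebuilt = esm.rebuild_execution(&exec_id).await?;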

/// Coordinates periodic snapshots so rebuilds need not replay the full log.
pub struct SnapshotManager {
    store: Arc<dyn WorkflowStore>,
    event_sourcing: Arc<EventSourcingManager>,
    /// Take a snapshot every N events.
    snapshot_interval: usize,
    /// Maximum snapshots retained per execution.
    max_snapshots_per_execution: usize,
}

impl SnapshotManager {
    pub fn new(store: Arc<dyn WorkflowStore>, event_sourcing: Arc<EventSourcingManager>) -> Self {
        Self {
            store,
            event_sourcing,
            snapshot_interval: 50,
            max_snapshots_per_execution: 5,
        }
    }

    pub fn with_config(mut self, snapshot_interval: usize, max_snapshots: usize) -> Self {
        self.snapshot_interval = snapshot_interval;
        self.max_snapshots_per_execution = max_snapshots;
        self
    }

    pub async fn needs_snapshot(&self, execution_id: &WorkflowExecutionId) -> Result<bool, WorkflowError> {
        let events = self.store.get_events(execution_id).await?;
        // Same empty-log guard as `EventSourcingManager::needs_snapshot`.
        Ok(!events.is_empty() && events.len() % self.snapshot_interval == 0)
    }

    pub async fn create_snapshot(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError> {
        self.store.save_execution(execution).await?;

        self.cleanup_old_snapshots(&execution.id).await?;

        Ok(())
    }

    /// Placeholder: snapshot retention/pruning is not implemented yet.
    pub async fn cleanup_old_snapshots(&self, _execution_id: &WorkflowExecutionId) -> Result<(), WorkflowError> {
        Ok(())
    }

    /// Restores from the latest snapshot, then replays newer events on top.
    pub async fn restore_from_snapshot(&self, execution_id: &WorkflowExecutionId) -> Result<Option<WorkflowExecution>, WorkflowError> {
        if let Some(execution) = self.store.get_execution(execution_id).await? {
            let events = self.store.get_events(execution_id).await?;
            let mut restored_execution = execution;

            for event in events {
                // Timestamp heuristic: only re-apply events newer than the
                // snapshot's recorded start time.
                if event.timestamp > restored_execution.start_time {
                    self.event_sourcing.apply_event_to_execution(&mut restored_execution, &event);
                }
            }

            Ok(Some(restored_execution))
        } else {
            Ok(None)
        }
    }

    /// Placeholder metrics; real counters are not wired up yet.
    pub async fn get_performance_stats(&self) -> HashMap<String, usize> {
        let mut stats = HashMap::new();
        stats.insert("total_snapshots".to_string(), 0);
        stats.insert("avg_events_per_snapshot".to_string(), self.snapshot_interval);
        stats
    }
}
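
// Sketch of snapshot-assisted restore (hypothetical driver code):
//
//     let sm = SnapshotManager::new(store.clone(), esm.clone()).with_config(50, 5);
//     if sm.needs_snapshot(&exec_id).await? {
//         sm.create_snapshot(&execution).await?;
//     }
//     let restored = sm.restore_from_snapshot(&exec_id).await?;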

#[cfg(feature = "rocksdb")]
pub struct RocksDBWorkflowStore {
    db: rocksdb::DB,
}

#[cfg(feature = "rocksdb")]
impl RocksDBWorkflowStore {
    pub async fn new(path: &str) -> Result<Self, WorkflowError> {
        let mut opts = rocksdb::Options::default();
        opts.create_if_missing(true);
        opts.set_compression_type(rocksdb::DBCompressionType::Lz4);

        let db = rocksdb::DB::open(&opts, path)
            .map_err(|e| WorkflowError::StorageError(format!("Failed to open RocksDB: {}", e)))?;

        Ok(Self { db })
    }

    fn execution_key(execution_id: &WorkflowExecutionId) -> String {
        format!("execution:{}", execution_id.0)
    }

    fn events_key(execution_id: &WorkflowExecutionId) -> String {
        format!("events:{}", execution_id.0)
    }
}

#[cfg(feature = "rocksdb")]
#[async_trait::async_trait]
impl WorkflowStore for RocksDBWorkflowStore {
    async fn save_execution(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError> {
        let key = Self::execution_key(&execution.id);
        let value = serde_json::to_string(execution)
            .map_err(WorkflowError::SerializationError)?;

        self.db.put(key.as_bytes(), value.as_bytes())
            .map_err(|e| WorkflowError::StorageError(format!("RocksDB put error: {}", e)))?;

        Ok(())
    }

    async fn get_execution(&self, id: &WorkflowExecutionId) -> Result<Option<WorkflowExecution>, WorkflowError> {
        let key = Self::execution_key(id);

        match self.db.get(key.as_bytes())
            .map_err(|e| WorkflowError::StorageError(format!("RocksDB get error: {}", e)))? {
            Some(data) => {
                let execution: WorkflowExecution = serde_json::from_slice(&data)
                    .map_err(WorkflowError::SerializationError)?;
                Ok(Some(execution))
            }
            None => Ok(None),
        }
    }

    async fn update_execution(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError> {
        self.save_execution(execution).await
    }

    async fn add_event(&self, execution_id: &WorkflowExecutionId, event: ExecutionEvent) -> Result<(), WorkflowError> {
        let key = Self::events_key(execution_id);

        // Read-modify-write of the whole event list; fine for modest event
        // counts, but not atomic across concurrent writers.
        let mut events = match self.db.get(key.as_bytes())
            .map_err(|e| WorkflowError::StorageError(format!("RocksDB get error: {}", e)))? {
            Some(data) => serde_json::from_slice::<Vec<ExecutionEvent>>(&data)
                .map_err(WorkflowError::SerializationError)?,
            None => Vec::new(),
        };

        events.push(event);

        let value = serde_json::to_string(&events)
            .map_err(WorkflowError::SerializationError)?;

        self.db.put(key.as_bytes(), value.as_bytes())
            .map_err(|e| WorkflowError::StorageError(format!("RocksDB put error: {}", e)))?;

        Ok(())
    }

    async fn get_events(&self, execution_id: &WorkflowExecutionId) -> Result<Vec<ExecutionEvent>, WorkflowError> {
        let key = Self::events_key(execution_id);

        match self.db.get(key.as_bytes())
            .map_err(|e| WorkflowError::StorageError(format!("RocksDB get error: {}", e)))? {
            Some(data) => {
                let events: Vec<ExecutionEvent> = serde_json::from_slice(&data)
                    .map_err(WorkflowError::SerializationError)?;
                Ok(events)
            }
            None => Ok(Vec::new()),
        }
    }

    async fn get_running_executions(&self) -> Result<Vec<WorkflowExecution>, WorkflowError> {
        let mut executions = Vec::new();
        let iter = self.db.prefix_iterator(b"execution:");

        for item in iter {
            let (key, value) = item.map_err(|e| WorkflowError::StorageError(format!("Iterator error: {}", e)))?;
            // Without a prefix extractor configured in the options,
            // `prefix_iterator` keeps iterating past the prefix (into the
            // "events:" keys), so stop at the first non-matching key.
            if !key.starts_with(b"execution:") {
                break;
            }
            let execution: WorkflowExecution = serde_json::from_slice(&value)
                .map_err(WorkflowError::SerializationError)?;

            if matches!(execution.status, ExecutionStatus::Running) {
                executions.push(execution);
            }
        }

        Ok(executions)
    }

    // Raw key-value methods required by the `WorkflowStore` trait, mapped
    // directly onto the RocksDB keyspace.
    async fn put(&self, key: String, value: Vec<u8>) -> Result<(), WorkflowError> {
        self.db.put(key.as_bytes(), &value)
            .map_err(|e| WorkflowError::StorageError(format!("RocksDB put error: {}", e)))
    }

    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, WorkflowError> {
        self.db.get(key.as_bytes())
            .map_err(|e| WorkflowError::StorageError(format!("RocksDB get error: {}", e)))
    }

    async fn get_keys_with_prefix(&self, prefix: &str) -> Result<Vec<String>, WorkflowError> {
        let mut keys = Vec::new();
        for item in self.db.prefix_iterator(prefix.as_bytes()) {
            let (key, _value) = item.map_err(|e| WorkflowError::StorageError(format!("Iterator error: {}", e)))?;
            if !key.starts_with(prefix.as_bytes()) {
                break;
            }
            keys.push(String::from_utf8_lossy(&key).into_owned());
        }
        Ok(keys)
    }
}

#[cfg(feature = "sqlite")]
use std::sync::Mutex;

/// SQLite-backed store; the connection is shared behind a blocking `Mutex`.
#[cfg(feature = "sqlite")]
pub struct SQLiteWorkflowStore {
    conn: Arc<Mutex<rusqlite::Connection>>,
}

#[cfg(feature = "sqlite")]
impl SQLiteWorkflowStore {
    pub async fn new(path: &str) -> Result<Self, WorkflowError> {
        let conn = rusqlite::Connection::open(path)
            .map_err(|e| WorkflowError::StorageError(format!("Failed to open SQLite: {}", e)))?;

        conn.execute(
            "CREATE TABLE IF NOT EXISTS executions (
                id TEXT PRIMARY KEY,
                data TEXT NOT NULL
            )",
            [],
        ).map_err(|e| WorkflowError::StorageError(format!("Table creation error: {}", e)))?;

        conn.execute(
            "CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY,
                execution_id TEXT NOT NULL,
                event_data TEXT NOT NULL,
                FOREIGN KEY(execution_id) REFERENCES executions(id)
            )",
            [],
        ).map_err(|e| WorkflowError::StorageError(format!("Table creation error: {}", e)))?;

        // Backing table for the raw key-value methods of `WorkflowStore`.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS kv_store (
                key TEXT PRIMARY KEY,
                value BLOB NOT NULL
            )",
            [],
        ).map_err(|e| WorkflowError::StorageError(format!("Table creation error: {}", e)))?;

        Ok(Self {
            conn: Arc::new(Mutex::new(conn)),
        })
    }
}

#[cfg(feature = "sqlite")]
#[async_trait::async_trait]
impl WorkflowStore for SQLiteWorkflowStore {
    async fn save_execution(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError> {
        let data = serde_json::to_string(execution)
            .map_err(WorkflowError::SerializationError)?;

        let conn = self.conn.lock().map_err(|_| WorkflowError::StorageError("Mutex poison".to_string()))?;
        conn.execute(
            "INSERT OR REPLACE INTO executions (id, data) VALUES (?1, ?2)",
            [&execution.id.0, &data],
        ).map_err(|e| WorkflowError::StorageError(format!("SQLite insert error: {}", e)))?;

        Ok(())
    }

    async fn get_execution(&self, id: &WorkflowExecutionId) -> Result<Option<WorkflowExecution>, WorkflowError> {
        let conn = self.conn.lock().map_err(|_| WorkflowError::StorageError("Mutex poison".to_string()))?;
        let mut stmt = conn.prepare("SELECT data FROM executions WHERE id = ?1")
            .map_err(|e| WorkflowError::StorageError(format!("SQLite prepare error: {}", e)))?;

        let mut rows = stmt.query_map([&id.0], |row| row.get::<_, String>(0))
            .map_err(|e| WorkflowError::StorageError(format!("SQLite query error: {}", e)))?;

        if let Some(row) = rows.next() {
            let data: String = row.map_err(|e| WorkflowError::StorageError(format!("SQLite row error: {}", e)))?;
            let execution: WorkflowExecution = serde_json::from_str(&data)
                .map_err(WorkflowError::SerializationError)?;
            Ok(Some(execution))
        } else {
            Ok(None)
        }
    }

    async fn update_execution(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError> {
        self.save_execution(execution).await
    }

    async fn add_event(&self, execution_id: &WorkflowExecutionId, event: ExecutionEvent) -> Result<(), WorkflowError> {
        let event_data = serde_json::to_string(&event)
            .map_err(WorkflowError::SerializationError)?;

        let conn = self.conn.lock().map_err(|_| WorkflowError::StorageError("Mutex poison".to_string()))?;
        conn.execute(
            "INSERT INTO events (execution_id, event_data) VALUES (?1, ?2)",
            [&execution_id.0, &event_data],
        ).map_err(|e| WorkflowError::StorageError(format!("SQLite insert error: {}", e)))?;

        Ok(())
    }

    async fn get_events(&self, execution_id: &WorkflowExecutionId) -> Result<Vec<ExecutionEvent>, WorkflowError> {
        let conn = self.conn.lock().map_err(|_| WorkflowError::StorageError("Mutex poison".to_string()))?;
        let mut stmt = conn.prepare("SELECT event_data FROM events WHERE execution_id = ?1 ORDER BY id")
            .map_err(|e| WorkflowError::StorageError(format!("SQLite prepare error: {}", e)))?;

        let rows = stmt.query_map([&execution_id.0], |row| row.get::<_, String>(0))
            .map_err(|e| WorkflowError::StorageError(format!("SQLite query error: {}", e)))?;

        let mut events = Vec::new();
        for row in rows {
            let data: String = row.map_err(|e| WorkflowError::StorageError(format!("SQLite row error: {}", e)))?;
            let event: ExecutionEvent = serde_json::from_str(&data)
                .map_err(WorkflowError::SerializationError)?;
            events.push(event);
        }

        Ok(events)
    }

    async fn get_running_executions(&self) -> Result<Vec<WorkflowExecution>, WorkflowError> {
        let conn = self.conn.lock().map_err(|_| WorkflowError::StorageError("Mutex poison".to_string()))?;
        let mut stmt = conn.prepare("SELECT data FROM executions")
            .map_err(|e| WorkflowError::StorageError(format!("SQLite prepare error: {}", e)))?;

        let rows = stmt.query_map([], |row| row.get::<_, String>(0))
            .map_err(|e| WorkflowError::StorageError(format!("SQLite query error: {}", e)))?;

        let mut executions = Vec::new();
        for row in rows {
            let data: String = row.map_err(|e| WorkflowError::StorageError(format!("SQLite row error: {}", e)))?;
            let execution: WorkflowExecution = serde_json::from_str(&data)
                .map_err(WorkflowError::SerializationError)?;

            if matches!(execution.status, ExecutionStatus::Running) {
                executions.push(execution);
            }
        }

        Ok(executions)
    }

    // Raw key-value methods required by the `WorkflowStore` trait, backed by
    // the `kv_store` table created in `new()`.
    async fn put(&self, key: String, value: Vec<u8>) -> Result<(), WorkflowError> {
        let conn = self.conn.lock().map_err(|_| WorkflowError::StorageError("Mutex poison".to_string()))?;
        conn.execute(
            "INSERT OR REPLACE INTO kv_store (key, value) VALUES (?1, ?2)",
            rusqlite::params![key, value],
        ).map_err(|e| WorkflowError::StorageError(format!("SQLite insert error: {}", e)))?;
        Ok(())
    }

    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, WorkflowError> {
        let conn = self.conn.lock().map_err(|_| WorkflowError::StorageError("Mutex poison".to_string()))?;
        let mut stmt = conn.prepare("SELECT value FROM kv_store WHERE key = ?1")
            .map_err(|e| WorkflowError::StorageError(format!("SQLite prepare error: {}", e)))?;

        let mut rows = stmt.query_map([key], |row| row.get::<_, Vec<u8>>(0))
            .map_err(|e| WorkflowError::StorageError(format!("SQLite query error: {}", e)))?;

        match rows.next() {
            Some(row) => Ok(Some(row.map_err(|e| WorkflowError::StorageError(format!("SQLite row error: {}", e)))?)),
            None => Ok(None),
        }
    }

    async fn get_keys_with_prefix(&self, prefix: &str) -> Result<Vec<String>, WorkflowError> {
        let conn = self.conn.lock().map_err(|_| WorkflowError::StorageError("Mutex poison".to_string()))?;
        // NOTE: `%` and `_` inside `prefix` are not escaped; acceptable for
        // the key schemes used in this module.
        let mut stmt = conn.prepare("SELECT key FROM kv_store WHERE key LIKE ?1 || '%'")
            .map_err(|e| WorkflowError::StorageError(format!("SQLite prepare error: {}", e)))?;

        let rows = stmt.query_map([prefix], |row| row.get::<_, String>(0))
            .map_err(|e| WorkflowError::StorageError(format!("SQLite query error: {}", e)))?;

        let mut keys = Vec::new();
        for row in rows {
            keys.push(row.map_err(|e| WorkflowError::StorageError(format!("SQLite row error: {}", e)))?);
        }
        Ok(keys)
    }
}

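#[cfg(test)]
mod tests {
    use super::*;

    // Minimal smoke test for the in-memory backend. The struct literal is a
    // sketch that assumes the `crate::ir` field layout used above; adjust it
    // if the real types differ. Requires tokio's `macros` and `rt` features.
    fn sample_execution(id: &str) -> WorkflowExecution {
        WorkflowExecution {
            id: WorkflowExecutionId(id.to_string()),
            workflow_id: "wf".to_string(),
            status: ExecutionStatus::Running,
            start_time: Utc::now(),
            end_time: None,
            inputs: HashMap::new(),
            outputs: None,
            current_graph: GraphRef("g".to_string()),
            execution_history: vec![],
            retry_count: 0,
            timeout_at: None,
        }
    }

    #[tokio::test]
    async fn memory_store_round_trip() {
        let store = MemoryWorkflowStore::new();
        let exec = sample_execution("exec-1");

        store.save_execution(&exec).await.unwrap();
        assert!(store.get_execution(&exec.id).await.unwrap().is_some());
        assert_eq!(store.get_running_executions().await.unwrap().len(), 1);

        // Key-value API and prefix scan.
        store.put("a:1".to_string(), vec![1]).await.unwrap();
        store.put("a:2".to_string(), vec![2]).await.unwrap();
        store.put("b:1".to_string(), vec![3]).await.unwrap();
        assert_eq!(store.get_keys_with_prefix("a:").await.unwrap().len(), 2);
    }
}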