// kotoba_workflow/store.rs

//! Workflow Store - pluggable workflow state management.
//!
//! Persists workflow execution state across several backends
//! (in-memory / RocksDB / SQLite).
use std::collections::HashMap;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use tokio::sync::RwLock;

use crate::ir::{WorkflowExecution, WorkflowExecutionId, ExecutionEvent, ExecutionEventType, ExecutionStatus};
use kotoba_core::types::GraphRef_ as GraphRef;
use kotoba_errors::WorkflowError;
13
/// Storage backend selector for workflow state persistence.
///
/// `RocksDB` and `SQLite` are only available when the corresponding
/// cargo feature is enabled.
#[derive(Debug, Clone)]
pub enum StorageBackend {
    /// In-memory store; non-persistent, data is lost on restart.
    Memory,
    /// RocksDB-backed persistent store; `path` is the database directory.
    #[cfg(feature = "rocksdb")]
    RocksDB { path: String },
    /// SQLite-backed persistent store; `path` is the database file.
    #[cfg(feature = "sqlite")]
    SQLite { path: String },
}
23
/// Factory constructing a [`WorkflowStore`] for a given [`StorageBackend`].
pub struct StorageFactory;

impl StorageFactory {
    /// Creates the store selected by `backend`.
    ///
    /// # Errors
    /// Returns [`WorkflowError::StorageError`] when the backing database
    /// cannot be opened, or when a persistent backend is requested but no
    /// storage feature was compiled in.
    pub async fn create(backend: StorageBackend) -> Result<Arc<dyn WorkflowStore>, WorkflowError> {
        match backend {
            StorageBackend::Memory => Ok(Arc::new(MemoryWorkflowStore::new())),
            #[cfg(feature = "rocksdb")]
            StorageBackend::RocksDB { path } => {
                RocksDBWorkflowStore::new(&path).await.map(|s| Arc::new(s) as Arc<dyn WorkflowStore>)
            }
            #[cfg(feature = "sqlite")]
            StorageBackend::SQLite { path } => {
                SQLiteWorkflowStore::new(&path).await.map(|s| Arc::new(s) as Arc<dyn WorkflowStore>)
            }
            // Only reachable in builds without any persistence feature; kept so
            // the match stays exhaustive there (unreachable otherwise).
            #[cfg(not(any(feature = "rocksdb", feature = "sqlite")))]
            _ => Err(WorkflowError::StorageError("No storage backend enabled. Enable 'rocksdb' or 'sqlite' feature".to_string())),
        }
    }
}
44
/// Persistence interface for workflow executions.
///
/// Implementations must be `Send + Sync` so one store instance can be
/// shared across async tasks behind an `Arc`.
#[async_trait::async_trait]
pub trait WorkflowStore: Send + Sync {
    /// Persists a workflow execution (insert or overwrite).
    async fn save_execution(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError>;

    /// Fetches an execution by id; `Ok(None)` when it does not exist.
    async fn get_execution(&self, id: &WorkflowExecutionId) -> Result<Option<WorkflowExecution>, WorkflowError>;

    /// Overwrites a previously saved execution.
    async fn update_execution(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError>;

    /// Appends an event to an execution's event log.
    async fn add_event(&self, execution_id: &WorkflowExecutionId, event: ExecutionEvent) -> Result<(), WorkflowError>;

    /// Returns the full event log for an execution (empty when none).
    async fn get_events(&self, execution_id: &WorkflowExecutionId) -> Result<Vec<ExecutionEvent>, WorkflowError>;

    /// Lists all executions currently in `ExecutionStatus::Running`.
    async fn get_running_executions(&self) -> Result<Vec<WorkflowExecution>, WorkflowError>;

    /// Arbitrary key/value storage: stores `value` under `key`.
    async fn put(&self, key: String, value: Vec<u8>) -> Result<(), WorkflowError>;
    /// Reads a value stored via [`WorkflowStore::put`]; `Ok(None)` when absent.
    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, WorkflowError>;
    /// Lists stored keys starting with `prefix`.
    async fn get_keys_with_prefix(&self, prefix: &str) -> Result<Vec<String>, WorkflowError>;
}
71
72/// メモリベースのワークフローストア実装
73pub struct MemoryWorkflowStore {
74    executions: RwLock<HashMap<String, WorkflowExecution>>,
75    events: RwLock<HashMap<String, Vec<ExecutionEvent>>>,
76    kv_store: RwLock<HashMap<String, Vec<u8>>>,
77}
78
79impl MemoryWorkflowStore {
80    pub fn new() -> Self {
81        Self {
82            executions: RwLock::new(HashMap::new()),
83            events: RwLock::new(HashMap::new()),
84            kv_store: RwLock::new(HashMap::new()),
85        }
86    }
87}
88
89#[async_trait::async_trait]
90impl WorkflowStore for MemoryWorkflowStore {
91    async fn save_execution(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError> {
92        let mut executions = self.executions.write().await;
93        executions.insert(execution.id.0.clone(), execution.clone());
94        Ok(())
95    }
96
97    async fn get_execution(&self, id: &WorkflowExecutionId) -> Result<Option<WorkflowExecution>, WorkflowError> {
98        let executions = self.executions.read().await;
99        Ok(executions.get(&id.0).cloned())
100    }
101
102    async fn update_execution(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError> {
103        let mut executions = self.executions.write().await;
104        executions.insert(execution.id.0.clone(), execution.clone());
105        Ok(())
106    }
107
108    async fn add_event(&self, execution_id: &WorkflowExecutionId, event: ExecutionEvent) -> Result<(), WorkflowError> {
109        let mut events = self.events.write().await;
110        events.entry(execution_id.0.clone())
111            .or_insert_with(Vec::new)
112            .push(event);
113        Ok(())
114    }
115
116    async fn get_events(&self, execution_id: &WorkflowExecutionId) -> Result<Vec<ExecutionEvent>, WorkflowError> {
117        let events = self.events.read().await;
118        Ok(events.get(&execution_id.0).cloned().unwrap_or_default())
119    }
120
121    async fn get_running_executions(&self) -> Result<Vec<WorkflowExecution>, WorkflowError> {
122        let executions = self.executions.read().await;
123        let running = executions.values()
124            .filter(|e| matches!(e.status, ExecutionStatus::Running))
125            .cloned()
126            .collect();
127        Ok(running)
128    }
129
130    async fn put(&self, key: String, value: Vec<u8>) -> Result<(), WorkflowError> {
131        let mut kv_store = self.kv_store.write().await;
132        kv_store.insert(key, value);
133        Ok(())
134    }
135
136    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, WorkflowError> {
137        let kv_store = self.kv_store.read().await;
138        Ok(kv_store.get(key).cloned())
139    }
140
141    async fn get_keys_with_prefix(&self, prefix: &str) -> Result<Vec<String>, WorkflowError> {
142        let kv_store = self.kv_store.read().await;
143        let keys = kv_store.keys()
144            .filter(|k| k.starts_with(prefix))
145            .cloned()
146            .collect();
147        Ok(keys)
148    }
149}
150
/// Event-sourcing manager: fully event-driven persistence on top of a
/// [`WorkflowStore`], with periodic snapshotting.
pub struct EventSourcingManager {
    /// Underlying store for events and snapshots.
    store: Arc<dyn WorkflowStore>,
    /// Snapshot interval, measured in events.
    snapshot_interval: usize,
    /// Maximum number of retained versions.
    /// NOTE(review): not read anywhere in this file yet — presumably for
    /// `cleanup_old_events`; confirm before relying on it.
    max_versions: usize,
}
159
160impl EventSourcingManager {
161    pub fn new(store: Arc<dyn WorkflowStore>) -> Self {
162        Self {
163            store,
164            snapshot_interval: 100, // デフォルト100イベントごとにスナップショット
165            max_versions: 10, // デフォルト10バージョン保持
166        }
167    }
168
169    pub fn with_snapshot_config(mut self, snapshot_interval: usize, max_versions: usize) -> Self {
170        self.snapshot_interval = snapshot_interval;
171        self.max_versions = max_versions;
172        self
173    }
174
175    /// ワークフロー実行をイベントから再構築
176    pub async fn rebuild_execution(&self, execution_id: &WorkflowExecutionId) -> Result<Option<WorkflowExecution>, WorkflowError> {
177        let events = self.store.get_events(execution_id).await?;
178
179        if events.is_empty() {
180            return Ok(None);
181        }
182
183        let mut execution = WorkflowExecution {
184            id: execution_id.clone(),
185            workflow_id: String::new(), // 最初のイベントから取得
186            status: ExecutionStatus::Running,
187            start_time: events[0].timestamp,
188            end_time: None,
189            inputs: HashMap::new(),
190            outputs: None,
191            current_graph: GraphRef("reconstructed".to_string()), // TODO: 適切な初期化
192            execution_history: events.clone(),
193            retry_count: 0,
194            timeout_at: None,
195        };
196
197        // イベントを順番に適用して状態を再構築
198        for event in events {
199            match event.event_type {
200                ExecutionEventType::Started => {
201                    if let Some(workflow_id) = event.payload.get("workflow_id").and_then(|v| v.as_str()) {
202                        execution.workflow_id = workflow_id.to_string();
203                    }
204                    if let Some(inputs) = event.payload.get("inputs").and_then(|v| v.as_object()) {
205                        execution.inputs = inputs.clone().into_iter().collect();
206                    }
207                }
208                ExecutionEventType::WorkflowCompleted => {
209                    execution.status = ExecutionStatus::Completed;
210                    execution.end_time = Some(event.timestamp);
211                    if let Some(outputs) = event.payload.get("outputs") {
212                        execution.outputs = serde_json::from_value(outputs.clone()).ok();
213                    }
214                }
215                ExecutionEventType::WorkflowFailed => {
216                    execution.status = ExecutionStatus::Failed;
217                    execution.end_time = Some(event.timestamp);
218                }
219                ExecutionEventType::WorkflowCancelled => {
220                    execution.status = ExecutionStatus::Cancelled;
221                    execution.end_time = Some(event.timestamp);
222                }
223                _ => {} // 他のイベントは状態に直接影響しない
224            }
225        }
226
227        Ok(Some(execution))
228    }
229
230    /// ワークフローイベントを記録(スナップショット最適化付き)
231    pub async fn record_event(&self, execution_id: &WorkflowExecutionId, event_type: ExecutionEventType, payload: HashMap<String, serde_json::Value>) -> Result<(), WorkflowError> {
232        let event = ExecutionEvent {
233            id: uuid::Uuid::new_v4().to_string(),
234            timestamp: Utc::now(),
235            event_type,
236            payload,
237        };
238
239        // イベントを保存
240        self.store.add_event(execution_id, event).await?;
241
242        // スナップショットが必要かチェック
243        if self.needs_snapshot(execution_id).await? {
244            self.create_snapshot(execution_id).await?;
245        }
246
247        Ok(())
248    }
249
250    /// スナップショットが必要かチェック
251    pub async fn needs_snapshot(&self, execution_id: &WorkflowExecutionId) -> Result<bool, WorkflowError> {
252        let events = self.store.get_events(execution_id).await?;
253        Ok(events.len() % self.snapshot_interval == 0)
254    }
255
256    /// スナップショットを作成(最新の実行状態を保存)
257    pub async fn create_snapshot(&self, execution_id: &WorkflowExecutionId) -> Result<(), WorkflowError> {
258        if let Some(execution) = self.store.get_execution(execution_id).await? {
259            // スナップショットとして実行状態を保存
260            self.store.save_execution(&execution).await?;
261
262            // 古いイベントをクリーンアップ(オプション)
263            self.cleanup_old_events(execution_id).await?;
264        }
265        Ok(())
266    }
267
268    /// 古いイベントをクリーンアップ
269    pub async fn cleanup_old_events(&self, execution_id: &WorkflowExecutionId) -> Result<(), WorkflowError> {
270        // メモリストアの場合は何もしない
271        // 永続ストアの場合は古いイベントをアーカイブ
272        Ok(())
273    }
274
275    /// ワークフロー実行の完全なイベント履歴を取得
276    pub async fn get_full_event_history(&self, execution_id: &WorkflowExecutionId) -> Result<Vec<ExecutionEvent>, WorkflowError> {
277        self.store.get_events(execution_id).await
278    }
279
280    /// イベントベースのワークフロー状態再構築
281    pub async fn rebuild_execution_from_events(&self, execution_id: &WorkflowExecutionId) -> Result<Option<WorkflowExecution>, WorkflowError> {
282        let events = self.get_full_event_history(execution_id).await?;
283
284        if events.is_empty() {
285            return Ok(None);
286        }
287
288        // 最新のスナップショットを取得(ある場合)
289        let mut execution = if let Some(snapshot) = self.store.get_execution(execution_id).await? {
290            snapshot
291        } else {
292            // スナップショットがない場合は最初から再構築
293            WorkflowExecution {
294                id: execution_id.clone(),
295                workflow_id: String::new(),
296                status: ExecutionStatus::Running,
297                start_time: events[0].timestamp,
298                end_time: None,
299                inputs: HashMap::new(),
300                outputs: None,
301                current_graph: GraphRef("reconstructed".to_string()),
302                execution_history: vec![],
303                retry_count: 0,
304                timeout_at: None,
305            }
306        };
307
308        // スナップショット以降のイベントを適用
309        for event in &events {
310            if event.timestamp > execution.start_time {
311                self.apply_event_to_execution(&mut execution, event);
312            }
313        }
314
315        Ok(Some(execution))
316    }
317
318    /// イベントを実行状態に適用
319    fn apply_event_to_execution(&self, execution: &mut WorkflowExecution, event: &ExecutionEvent) {
320        match &event.event_type {
321            ExecutionEventType::Started => {
322                if let Some(workflow_id) = event.payload.get("workflow_id").and_then(|v| v.as_str()) {
323                    execution.workflow_id = workflow_id.to_string();
324                }
325                if let Some(inputs) = event.payload.get("inputs").and_then(|v| v.as_object()) {
326                    execution.inputs = inputs.clone().into_iter().collect();
327                }
328            }
329            ExecutionEventType::ActivityScheduled => {
330                // Activityスケジュール状態を更新
331                execution.execution_history.push(event.clone());
332            }
333            ExecutionEventType::ActivityStarted => {
334                execution.execution_history.push(event.clone());
335            }
336            ExecutionEventType::ActivityCompleted => {
337                execution.execution_history.push(event.clone());
338            }
339            ExecutionEventType::ActivityFailed => {
340                execution.execution_history.push(event.clone());
341            }
342            ExecutionEventType::WorkflowCompleted => {
343                execution.status = ExecutionStatus::Completed;
344                execution.end_time = Some(event.timestamp);
345                if let Some(outputs) = event.payload.get("outputs") {
346                    execution.outputs = serde_json::from_value(outputs.clone()).ok();
347                }
348            }
349            ExecutionEventType::WorkflowFailed => {
350                execution.status = ExecutionStatus::Failed;
351                execution.end_time = Some(event.timestamp);
352            }
353            ExecutionEventType::WorkflowCancelled => {
354                execution.status = ExecutionStatus::Cancelled;
355                execution.end_time = Some(event.timestamp);
356            }
357            _ => {
358                // その他のイベントも履歴に追加
359                execution.execution_history.push(event.clone());
360            }
361        }
362    }
363}
364
/// Snapshot manager (performance optimization) - Phase 2.
///
/// Wraps a [`WorkflowStore`] plus an [`EventSourcingManager`] to create
/// snapshots at an interval and restore state quickly from them.
pub struct SnapshotManager {
    /// Underlying store for snapshots and events.
    store: Arc<dyn WorkflowStore>,
    /// Used to replay events on top of a restored snapshot.
    event_sourcing: Arc<EventSourcingManager>,
    /// Snapshot interval, measured in events.
    snapshot_interval: usize,
    /// Maximum snapshots kept per execution.
    /// NOTE(review): not read anywhere in this file yet — presumably for
    /// `cleanup_old_snapshots`; confirm before relying on it.
    max_snapshots_per_execution: usize,
}
372
373impl SnapshotManager {
374    pub fn new(store: Arc<dyn WorkflowStore>, event_sourcing: Arc<EventSourcingManager>) -> Self {
375        Self {
376            store,
377            event_sourcing,
378            snapshot_interval: 50, // デフォルト50イベントごとにスナップショット
379            max_snapshots_per_execution: 5, // 実行ごとに最大5スナップショット
380        }
381    }
382
383    pub fn with_config(mut self, snapshot_interval: usize, max_snapshots: usize) -> Self {
384        self.snapshot_interval = snapshot_interval;
385        self.max_snapshots_per_execution = max_snapshots;
386        self
387    }
388
389    /// スナップショットが必要かチェック
390    pub async fn needs_snapshot(&self, execution_id: &WorkflowExecutionId) -> Result<bool, WorkflowError> {
391        let events = self.store.get_events(execution_id).await?;
392        Ok(events.len() % self.snapshot_interval == 0)
393    }
394
395    /// 最適化されたスナップショットを作成
396    pub async fn create_snapshot(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError> {
397        // 現在の実行状態を効率的な形式で保存
398        self.store.save_execution(execution).await?;
399
400        // 古いスナップショットをクリーンアップ
401        self.cleanup_old_snapshots(&execution.id).await?;
402
403        Ok(())
404    }
405
406    /// 古いスナップショットをクリーンアップ
407    pub async fn cleanup_old_snapshots(&self, execution_id: &WorkflowExecutionId) -> Result<(), WorkflowError> {
408        // メモリストアの場合は何もしない
409        // 永続ストアの場合は古いスナップショットをアーカイブまたは削除
410        Ok(())
411    }
412
413    /// ワークフロー実行をスナップショットから高速復元
414    pub async fn restore_from_snapshot(&self, execution_id: &WorkflowExecutionId) -> Result<Option<WorkflowExecution>, WorkflowError> {
415        // 最新のスナップショットを取得
416        if let Some(execution) = self.store.get_execution(execution_id).await? {
417            // スナップショット以降のイベントを適用して最新状態に更新
418            let events = self.store.get_events(execution_id).await?;
419            let mut restored_execution = execution;
420
421            // スナップショット以降のイベントを適用
422            for event in events {
423                if event.timestamp > restored_execution.start_time {
424                    self.event_sourcing.apply_event_to_execution(&mut restored_execution, &event);
425                }
426            }
427
428            Ok(Some(restored_execution))
429        } else {
430            Ok(None)
431        }
432    }
433
434    /// パフォーマンス統計を取得
435    pub async fn get_performance_stats(&self) -> HashMap<String, usize> {
436        let mut stats = HashMap::new();
437        // TODO: 実際の実装ではストアから統計情報を収集
438        stats.insert("total_snapshots".to_string(), 0);
439        stats.insert("avg_events_per_snapshot".to_string(), self.snapshot_interval);
440        stats
441    }
442}
443
/// RocksDB-backed [`WorkflowStore`] implementation.
#[cfg(feature = "rocksdb")]
pub struct RocksDBWorkflowStore {
    db: rocksdb::DB,
}

#[cfg(feature = "rocksdb")]
impl RocksDBWorkflowStore {
    /// Opens (creating if missing) a RocksDB database at `path` with LZ4
    /// compression.
    ///
    /// # Errors
    /// Returns [`WorkflowError::StorageError`] when the database cannot be
    /// opened.
    pub async fn new(path: &str) -> Result<Self, WorkflowError> {
        let mut opts = rocksdb::Options::default();
        opts.create_if_missing(true);
        opts.set_compression_type(rocksdb::DBCompressionType::Lz4);

        let db = rocksdb::DB::open(&opts, path)
            .map_err(|e| WorkflowError::StorageError(format!("Failed to open RocksDB: {}", e)))?;

        Ok(Self { db })
    }

    /// Key under which an execution snapshot is stored ("execution:<id>").
    fn execution_key(execution_id: &WorkflowExecutionId) -> String {
        format!("execution:{}", execution_id.0)
    }

    /// Key under which an execution's event log is stored ("events:<id>").
    fn events_key(execution_id: &WorkflowExecutionId) -> String {
        format!("events:{}", execution_id.0)
    }
}
471
#[cfg(feature = "rocksdb")]
#[async_trait::async_trait]
impl WorkflowStore for RocksDBWorkflowStore {
    /// Serializes the execution to JSON and stores it under its key.
    async fn save_execution(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError> {
        let key = Self::execution_key(&execution.id);
        let value = serde_json::to_string(execution)
            .map_err(WorkflowError::SerializationError)?;

        self.db.put(key.as_bytes(), value.as_bytes())
            .map_err(|e| WorkflowError::StorageError(format!("RocksDB put error: {}", e)))?;

        Ok(())
    }

    /// Loads and deserializes an execution; `Ok(None)` when absent.
    async fn get_execution(&self, id: &WorkflowExecutionId) -> Result<Option<WorkflowExecution>, WorkflowError> {
        let key = Self::execution_key(id);

        match self.db.get(key.as_bytes())
            .map_err(|e| WorkflowError::StorageError(format!("RocksDB get error: {}", e)))? {
            Some(data) => {
                let execution: WorkflowExecution = serde_json::from_slice(&data)
                    .map_err(WorkflowError::SerializationError)?;
                Ok(Some(execution))
            }
            None => Ok(None),
        }
    }

    /// RocksDB `put` overwrites, so update is the same as save.
    async fn update_execution(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError> {
        self.save_execution(execution).await
    }

    /// Appends an event via read-modify-write of the serialized event list.
    ///
    /// NOTE(review): this is not atomic under concurrent writers — two
    /// concurrent `add_event` calls can lose one event. Confirm callers
    /// serialize writes per execution.
    async fn add_event(&self, execution_id: &WorkflowExecutionId, event: ExecutionEvent) -> Result<(), WorkflowError> {
        let key = Self::events_key(execution_id);

        // Load the existing event list (empty when none yet).
        let mut events = match self.db.get(key.as_bytes())
            .map_err(|e| WorkflowError::StorageError(format!("RocksDB get error: {}", e)))? {
            Some(data) => serde_json::from_slice::<Vec<ExecutionEvent>>(&data)
                .map_err(WorkflowError::SerializationError)?,
            None => Vec::new(),
        };

        events.push(event);

        let value = serde_json::to_string(&events)
            .map_err(WorkflowError::SerializationError)?;

        self.db.put(key.as_bytes(), value.as_bytes())
            .map_err(|e| WorkflowError::StorageError(format!("RocksDB put error: {}", e)))?;

        Ok(())
    }

    /// Returns the stored event list; empty when none exists.
    async fn get_events(&self, execution_id: &WorkflowExecutionId) -> Result<Vec<ExecutionEvent>, WorkflowError> {
        let key = Self::events_key(execution_id);

        match self.db.get(key.as_bytes())
            .map_err(|e| WorkflowError::StorageError(format!("RocksDB get error: {}", e)))? {
            Some(data) => {
                let events: Vec<ExecutionEvent> = serde_json::from_slice(&data)
                    .map_err(WorkflowError::SerializationError)?;
                Ok(events)
            }
            None => Ok(Vec::new()),
        }
    }

    /// Scans all stored executions and returns the running ones.
    async fn get_running_executions(&self) -> Result<Vec<WorkflowExecution>, WorkflowError> {
        // `prefix_iterator` only *seeks* to the prefix; without a configured
        // prefix extractor it keeps iterating past the last matching key, so
        // we must stop explicitly once keys leave the prefix.
        const PREFIX: &[u8] = b"execution:";
        let mut executions = Vec::new();

        for item in self.db.prefix_iterator(PREFIX) {
            let (key, value) = item.map_err(|e| WorkflowError::StorageError(format!("Iterator error: {}", e)))?;
            if !key.starts_with(PREFIX) {
                break;
            }
            let execution: WorkflowExecution = serde_json::from_slice(&value)
                .map_err(WorkflowError::SerializationError)?;

            if matches!(execution.status, ExecutionStatus::Running) {
                executions.push(execution);
            }
        }

        Ok(executions)
    }

    // The three KV methods below were missing, which made this impl fail to
    // compile against the `WorkflowStore` trait. Keys are namespaced under
    // "kv:" so they cannot collide with "execution:" / "events:" records.

    /// Stores an arbitrary blob under "kv:<key>".
    async fn put(&self, key: String, value: Vec<u8>) -> Result<(), WorkflowError> {
        let db_key = format!("kv:{}", key);
        self.db.put(db_key.as_bytes(), &value)
            .map_err(|e| WorkflowError::StorageError(format!("RocksDB put error: {}", e)))
    }

    /// Reads a blob previously stored with `put`; `Ok(None)` when absent.
    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, WorkflowError> {
        let db_key = format!("kv:{}", key);
        self.db.get(db_key.as_bytes())
            .map_err(|e| WorkflowError::StorageError(format!("RocksDB get error: {}", e)))
    }

    /// Lists caller-visible keys starting with `prefix` (the internal "kv:"
    /// namespace is stripped from the returned values).
    async fn get_keys_with_prefix(&self, prefix: &str) -> Result<Vec<String>, WorkflowError> {
        let db_prefix = format!("kv:{}", prefix);
        let mut keys = Vec::new();

        for item in self.db.prefix_iterator(db_prefix.as_bytes()) {
            let (key, _value) = item.map_err(|e| WorkflowError::StorageError(format!("Iterator error: {}", e)))?;
            if !key.starts_with(db_prefix.as_bytes()) {
                break;
            }
            if let Ok(full_key) = std::str::from_utf8(&key) {
                keys.push(full_key["kv:".len()..].to_string());
            }
        }

        Ok(keys)
    }
}
558
559#[cfg(feature = "sqlite")]
560use std::sync::Mutex;
561
/// SQLite-backed [`WorkflowStore`] implementation.
///
/// The connection is wrapped in a `std::sync::Mutex` because
/// `rusqlite::Connection` is not `Sync`; locks are held only for the
/// duration of each synchronous statement (never across an `.await`).
#[cfg(feature = "sqlite")]
pub struct SQLiteWorkflowStore {
    conn: Arc<Mutex<rusqlite::Connection>>,
}
567
#[cfg(feature = "sqlite")]
impl SQLiteWorkflowStore {
    /// Opens (creating if missing) the SQLite database at `path` and
    /// ensures the schema exists.
    ///
    /// NOTE(review): `Connection::open` blocks the async executor thread
    /// briefly at startup; acceptable for initialization, but confirm.
    ///
    /// # Errors
    /// Returns [`WorkflowError::StorageError`] when the file cannot be
    /// opened or a schema statement fails.
    pub async fn new(path: &str) -> Result<Self, WorkflowError> {
        let conn = rusqlite::Connection::open(path)
            .map_err(|e| WorkflowError::StorageError(format!("Failed to open SQLite: {}", e)))?;

        // Execution snapshots, one JSON blob per execution id.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS executions (
                id TEXT PRIMARY KEY,
                data TEXT NOT NULL
            )",
            [],
        ).map_err(|e| WorkflowError::StorageError(format!("Table creation error: {}", e)))?;

        // Append-only event log; `id` preserves insertion order.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY,
                execution_id TEXT NOT NULL,
                event_data TEXT NOT NULL,
                FOREIGN KEY(execution_id) REFERENCES executions(id)
            )",
            [],
        ).map_err(|e| WorkflowError::StorageError(format!("Table creation error: {}", e)))?;

        // Every get_events call filters on execution_id; without this index
        // each lookup is a full table scan.
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_events_execution_id ON events(execution_id)",
            [],
        ).map_err(|e| WorkflowError::StorageError(format!("Index creation error: {}", e)))?;

        // Backing table for the trait's arbitrary key/value API.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS kv (
                key TEXT PRIMARY KEY,
                value BLOB NOT NULL
            )",
            [],
        ).map_err(|e| WorkflowError::StorageError(format!("Table creation error: {}", e)))?;

        Ok(Self {
            conn: Arc::new(Mutex::new(conn)),
        })
    }
}
598
#[cfg(feature = "sqlite")]
#[async_trait::async_trait]
impl WorkflowStore for SQLiteWorkflowStore {
    /// Upserts the execution as a JSON blob keyed by its id.
    async fn save_execution(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError> {
        let data = serde_json::to_string(execution)
            .map_err(WorkflowError::SerializationError)?;

        let conn = self.conn.lock().map_err(|_| WorkflowError::StorageError("Mutex poison".to_string()))?;
        conn.execute(
            "INSERT OR REPLACE INTO executions (id, data) VALUES (?1, ?2)",
            [&execution.id.0, &data],
        ).map_err(|e| WorkflowError::StorageError(format!("SQLite insert error: {}", e)))?;

        Ok(())
    }

    /// Loads and deserializes an execution; `Ok(None)` when absent.
    async fn get_execution(&self, id: &WorkflowExecutionId) -> Result<Option<WorkflowExecution>, WorkflowError> {
        let conn = self.conn.lock().map_err(|_| WorkflowError::StorageError("Mutex poison".to_string()))?;
        let mut stmt = conn.prepare("SELECT data FROM executions WHERE id = ?1")
            .map_err(|e| WorkflowError::StorageError(format!("SQLite prepare error: {}", e)))?;

        let mut rows = stmt.query_map([&id.0], |row| row.get::<_, String>(0))
            .map_err(|e| WorkflowError::StorageError(format!("SQLite query error: {}", e)))?;

        if let Some(row) = rows.next() {
            let data: String = row.map_err(|e| WorkflowError::StorageError(format!("SQLite row error: {}", e)))?;
            let execution: WorkflowExecution = serde_json::from_str(&data)
                .map_err(WorkflowError::SerializationError)?;
            Ok(Some(execution))
        } else {
            Ok(None)
        }
    }

    /// `INSERT OR REPLACE` upserts, so update is the same as save.
    async fn update_execution(&self, execution: &WorkflowExecution) -> Result<(), WorkflowError> {
        self.save_execution(execution).await
    }

    /// Appends an event row; `events.id` autoincrement preserves order.
    async fn add_event(&self, execution_id: &WorkflowExecutionId, event: ExecutionEvent) -> Result<(), WorkflowError> {
        let event_data = serde_json::to_string(&event)
            .map_err(WorkflowError::SerializationError)?;

        let conn = self.conn.lock().map_err(|_| WorkflowError::StorageError("Mutex poison".to_string()))?;
        conn.execute(
            "INSERT INTO events (execution_id, event_data) VALUES (?1, ?2)",
            [&execution_id.0, &event_data],
        ).map_err(|e| WorkflowError::StorageError(format!("SQLite insert error: {}", e)))?;

        Ok(())
    }

    /// Returns all events for an execution in insertion order.
    async fn get_events(&self, execution_id: &WorkflowExecutionId) -> Result<Vec<ExecutionEvent>, WorkflowError> {
        let conn = self.conn.lock().map_err(|_| WorkflowError::StorageError("Mutex poison".to_string()))?;
        let mut stmt = conn.prepare("SELECT event_data FROM events WHERE execution_id = ?1 ORDER BY id")
            .map_err(|e| WorkflowError::StorageError(format!("SQLite prepare error: {}", e)))?;

        let rows = stmt.query_map([&execution_id.0], |row| row.get::<_, String>(0))
            .map_err(|e| WorkflowError::StorageError(format!("SQLite query error: {}", e)))?;

        let mut events = Vec::new();
        for row in rows {
            let data: String = row.map_err(|e| WorkflowError::StorageError(format!("SQLite row error: {}", e)))?;
            let event: ExecutionEvent = serde_json::from_str(&data)
                .map_err(WorkflowError::SerializationError)?;
            events.push(event);
        }

        Ok(events)
    }

    /// Scans all stored executions and returns the running ones.
    async fn get_running_executions(&self) -> Result<Vec<WorkflowExecution>, WorkflowError> {
        let conn = self.conn.lock().map_err(|_| WorkflowError::StorageError("Mutex poison".to_string()))?;
        let mut stmt = conn.prepare("SELECT data FROM executions")
            .map_err(|e| WorkflowError::StorageError(format!("SQLite prepare error: {}", e)))?;

        let rows = stmt.query_map([], |row| row.get::<_, String>(0))
            .map_err(|e| WorkflowError::StorageError(format!("SQLite query error: {}", e)))?;

        let mut executions = Vec::new();
        for row in rows {
            let data: String = row.map_err(|e| WorkflowError::StorageError(format!("SQLite row error: {}", e)))?;
            let execution: WorkflowExecution = serde_json::from_str(&data)
                .map_err(WorkflowError::SerializationError)?;

            if matches!(execution.status, ExecutionStatus::Running) {
                executions.push(execution);
            }
        }

        Ok(executions)
    }

    // The three KV methods below were missing, which made this impl fail to
    // compile against the `WorkflowStore` trait. Each one lazily creates the
    // `kv` table (cheap `IF NOT EXISTS`) so it also works against databases
    // created before the table was added to the schema.

    /// Stores an arbitrary blob under `key`.
    async fn put(&self, key: String, value: Vec<u8>) -> Result<(), WorkflowError> {
        let conn = self.conn.lock().map_err(|_| WorkflowError::StorageError("Mutex poison".to_string()))?;
        conn.execute(
            "CREATE TABLE IF NOT EXISTS kv (key TEXT PRIMARY KEY, value BLOB NOT NULL)",
            [],
        ).map_err(|e| WorkflowError::StorageError(format!("Table creation error: {}", e)))?;
        conn.execute(
            "INSERT OR REPLACE INTO kv (key, value) VALUES (?1, ?2)",
            rusqlite::params![key, value],
        ).map_err(|e| WorkflowError::StorageError(format!("SQLite insert error: {}", e)))?;
        Ok(())
    }

    /// Reads a blob previously stored with `put`; `Ok(None)` when absent.
    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, WorkflowError> {
        let conn = self.conn.lock().map_err(|_| WorkflowError::StorageError("Mutex poison".to_string()))?;
        conn.execute(
            "CREATE TABLE IF NOT EXISTS kv (key TEXT PRIMARY KEY, value BLOB NOT NULL)",
            [],
        ).map_err(|e| WorkflowError::StorageError(format!("Table creation error: {}", e)))?;
        let mut stmt = conn.prepare("SELECT value FROM kv WHERE key = ?1")
            .map_err(|e| WorkflowError::StorageError(format!("SQLite prepare error: {}", e)))?;

        let mut rows = stmt.query_map([key], |row| row.get::<_, Vec<u8>>(0))
            .map_err(|e| WorkflowError::StorageError(format!("SQLite query error: {}", e)))?;

        match rows.next() {
            Some(row) => {
                let value = row.map_err(|e| WorkflowError::StorageError(format!("SQLite row error: {}", e)))?;
                Ok(Some(value))
            }
            None => Ok(None),
        }
    }

    /// Lists stored keys starting with `prefix`. Filtering happens in Rust
    /// to avoid LIKE-pattern escaping issues with `%` / `_` in the prefix.
    async fn get_keys_with_prefix(&self, prefix: &str) -> Result<Vec<String>, WorkflowError> {
        let conn = self.conn.lock().map_err(|_| WorkflowError::StorageError("Mutex poison".to_string()))?;
        conn.execute(
            "CREATE TABLE IF NOT EXISTS kv (key TEXT PRIMARY KEY, value BLOB NOT NULL)",
            [],
        ).map_err(|e| WorkflowError::StorageError(format!("Table creation error: {}", e)))?;
        let mut stmt = conn.prepare("SELECT key FROM kv")
            .map_err(|e| WorkflowError::StorageError(format!("SQLite prepare error: {}", e)))?;

        let rows = stmt.query_map([], |row| row.get::<_, String>(0))
            .map_err(|e| WorkflowError::StorageError(format!("SQLite query error: {}", e)))?;

        let mut keys = Vec::new();
        for row in rows {
            let key = row.map_err(|e| WorkflowError::StorageError(format!("SQLite row error: {}", e)))?;
            if key.starts_with(prefix) {
                keys.push(key);
            }
        }

        Ok(keys)
    }
}
691/*
692pub struct KotobaStorageBridge {
693    kotoba_backend: std::sync::Arc<dyn kotoba_storage::port::StoragePort>,
694}
695
696impl KotobaStorageBridge {
697    pub fn new(backend: std::sync::Arc<dyn kotoba_storage::port::StoragePort>) -> Self {
698        Self {
699            kotoba_backend: backend,
700        }
701    }
702
703    fn execution_key(execution_id: &crate::ir::WorkflowExecutionId) -> String {
704        format!("workflow:execution:{}", execution_id.0)
705    }
706
707    fn events_key(execution_id: &crate::ir::WorkflowExecutionId) -> String {
708        format!("workflow:events:{}", execution_id.0)
709    }
710}
711*/
712
713/*
714#[async_trait::async_trait]
715impl WorkflowStore for KotobaStorageBridge {
716    async fn save_execution(&self, execution: &crate::ir::WorkflowExecution) -> std::result::Result<(), crate::WorkflowError> {
717        let key = Self::execution_key(&execution.id);
718        let value = serde_json::to_vec(execution)
719            .map_err(|e| crate::WorkflowError::SerializationError(e))?;
720        self.kotoba_backend.put(key.as_bytes(), &value).await
721            .map_err(|e| crate::WorkflowError::StorageError(format!("Kotoba storage error: {}", e)))
722    }
723
724    async fn get_execution(&self, id: &crate::ir::WorkflowExecutionId) -> std::result::Result<std::option::Option<crate::ir::WorkflowExecution>, crate::WorkflowError> {
725        let key = Self::execution_key(id);
726        match self.kotoba_backend.get(key.as_bytes()).await {
727            Ok(Some(data)) => {
728                let execution: crate::ir::WorkflowExecution = serde_json::from_slice(&data)
729                    .map_err(|e| crate::WorkflowError::SerializationError(e))?;
730                Ok(Some(execution))
731            }
732            Ok(None) => Ok(None),
733            Err(e) => Err(crate::WorkflowError::StorageError(format!("Kotoba storage error: {}", e))),
734        }
735    }
736
737    async fn update_execution(&self, execution: &crate::ir::WorkflowExecution) -> std::result::Result<(), crate::WorkflowError> {
738        self.save_execution(execution).await
739    }
740
741    async fn add_event(&self, execution_id: &crate::ir::WorkflowExecutionId, event: crate::ir::ExecutionEvent) -> std::result::Result<(), crate::WorkflowError> {
742        let key = Self::events_key(execution_id);
743        let mut events = match self.kotoba_backend.get(key.as_bytes()).await {
744            Ok(Some(data)) => serde_json::from_slice::<Vec<crate::ir::ExecutionEvent>>(&data)
745                .map_err(|e| crate::WorkflowError::SerializationError(e))?,
746            Ok(None) => Vec::new(),
747            Err(e) => return Err(crate::WorkflowError::StorageError(format!("Kotoba storage error: {}", e))),
748        };
749        events.push(event);
750        let value = serde_json::to_vec(&events)
751            .map_err(|e| crate::WorkflowError::SerializationError(e))?;
752        self.kotoba_backend.put(key.as_bytes(), &value).await
753            .map_err(|e| crate::WorkflowError::StorageError(format!("Kotoba storage error: {}", e)))
754    }
755
756    async fn get_events(&self, execution_id: &crate::ir::WorkflowExecutionId) -> std::result::Result<Vec<crate::ir::ExecutionEvent>, crate::WorkflowError> {
757        let key = Self::events_key(execution_id);
758        match self.kotoba_backend.get(key.as_bytes()).await {
759            Ok(Some(data)) => {
760                let events: Vec<crate::ir::ExecutionEvent> = serde_json::from_slice(&data)
761                    .map_err(|e| crate::WorkflowError::SerializationError(e))?;
762                Ok(events)
763            }
764            Ok(None) => Ok(Vec::new()),
765            Err(e) => Err(crate::WorkflowError::StorageError(format!("Kotoba storage error: {}", e))),
766        }
767    }
768
769    async fn get_running_executions(&self) -> std::result::Result<Vec<crate::ir::WorkflowExecution>, crate::WorkflowError> {
770        let prefix = "workflow:execution:".to_string();
771        let keys = self.kotoba_backend.get_keys_with_prefix(prefix.as_bytes()).await
772            .map_err(|e| crate::WorkflowError::StorageError(format!("Kotoba storage error: {}", e)))?;
773        let mut executions = Vec::new();
774        for key in keys {
775            if let Ok(Some(data)) = self.kotoba_backend.get(key.as_bytes()).await {
776                if let Ok(execution) = serde_json::from_slice::<crate::ir::WorkflowExecution>(&data) {
777                    if matches!(execution.status, crate::ir::ExecutionStatus::Running) {
778                        executions.push(execution);
779                    }
780                }
781            }
782        }
783        Ok(executions)
784    }
785
786    // Adapter methods to bridge StoragePort to expected interface
787    async fn put(&self, key: String, value: Vec<u8>) -> std::result::Result<(), crate::WorkflowError> {
788        // For now, just store in memory as a bridge until StoragePort interface is clarified
789        // TODO: Implement proper StoragePort integration
790        let mut kv_store = self.kv_store.write().await;
791        kv_store.insert(key, value);
792        Ok(())
793    }
794
795    async fn get(&self, key: &str) -> std::result::Result<std::option::Option<Vec<u8>>, crate::WorkflowError> {
796        // For now, just get from memory storage
797        let kv_store = self.kv_store.read().await;
798        Ok(kv_store.get(key).cloned())
799    }
800
801    async fn get_keys_with_prefix(&self, prefix: &str) -> std::result::Result<Vec<String>, crate::WorkflowError> {
802        let kv_store = self.kv_store.read().await;
803        let keys = kv_store.keys()
804            .filter(|k| k.starts_with(prefix))
805            .cloned()
806            .collect();
807        Ok(keys)
808    }
809}
810*/