kotoba_workflow/
lib.rs

1//! # Kotoba Workflow Engine - Serverless Workflow Specification Compliant
2//!
3//! Serverless Workflow (https://serverlessworkflow.io/) compliant workflow engine
4//! built on top of Kotoba's graph rewriting system.
5//!
6//! ## Features
7//!
8//! - **Serverless Workflow DSL**: JSON/YAML-based workflow definition compliant with SW specification
9//! - **Event-driven Execution**: Supports event-driven workflows with CRON and time-based triggers
10//! - **Multi-protocol Support**: HTTP, gRPC, OpenAPI, AsyncAPI, custom protocols
11//! - **Fault Tolerance**: Comprehensive error handling with try-catch, raise patterns
12//! - **Platform Agnostic**: Runs across diverse platforms and environments
13//! - **Extensible**: Custom functions, extensions, and integration capabilities
14//!
15//! ## Serverless Workflow Example
16//!
17//! ```rust
18//! use kotoba_workflow::prelude::*;
19//! use serde_json::json;
20//!
21//! #[tokio::main]
22//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
23//!     // Serverless Workflow definition
24//!     let workflow_def = json!({
25//!         "document": {
26//!             "dsl": "1.0.0",
27//!             "namespace": "default",
28//!             "name": "order-processing",
29//!             "version": "1.0.0"
30//!         },
31//!         "do": [
32//!             {
33//!                 "validateOrder": {
34//!                     "call": "http",
35//!                     "with": {
36//!                         "method": "post",
37//!                         "endpoint": "https://api.example.com/validate",
38//!                         "body": "${ .order }"
39//!                     }
40//!                 }
41//!             },
42//!             {
43//!                 "processPayment": {
44//!                     "call": "http",
45//!                     "with": {
46//!                         "method": "post",
47//!                         "endpoint": "https://payment.example.com/process"
48//!                     }
49//!                 }
50//!             }
51//!         ]
52//!     });
53//!
54//!     // Parse and execute
55//!     let workflow = WorkflowEngine::from_json(workflow_def)?;
56//!     let result = workflow.execute(json!({"order": {"id": "123"}})).await?;
57//!
58//!     println!("Workflow completed: {:?}", result);
59//!     Ok(())
60//! }
61//! ```
62
63use std::collections::HashMap;
64use std::sync::Arc;
65use crate::ir::{ExecutionEventType, ExecutionEvent};
66
67// Serverless Workflow specification module
68pub mod spec;
69pub use spec::*;
70use crate::distributed::{LoadBalancer, DistributedExecutionManager, DistributedWorkflowExecutor};
71use kotoba_core::prelude::TxId;
72use kotoba_errors::WorkflowError;
73// use kotoba_workflow_core::prelude::*;
74
75pub mod ir;
76pub mod executor;
77pub mod store;
78pub mod activity;
79pub mod parser;
80pub mod distributed;
81pub mod saga;
82pub mod monitoring;
83pub mod optimization;
84pub mod integrations;
85
86// Re-export main types - use core types where possible
87// pub use kotoba_workflow_core::{WorkflowIR, WorkflowExecution, WorkflowExecutionId, ExecutionStatus, WorkflowEngineInterface};
88pub use ir::{WorkflowIR, WorkflowExecution, WorkflowExecutionId, ExecutionStatus, ActivityIR, WorkflowStep, WorkflowStepType};
89pub use executor::{ActivityRegistry, Activity, WorkflowExecutor, WorkflowStateManager};
90pub use store::{WorkflowStore, StorageBackend, StorageFactory, EventSourcingManager, SnapshotManager};
91pub use parser::ServerlessWorkflowParser;
92pub use activity::prelude::*;
93pub use distributed::{
94    DistributedCoordinator, RoundRobinBalancer, LeastLoadedBalancer, NodeInfo, ClusterHealth
95};
96// Phase 3: Advanced Features
97pub use saga::{SagaManager, SagaExecutionEngine, AdvancedSagaPattern, SagaContext};
98pub use monitoring::{MonitoringManager, MonitoringConfig, WorkflowStats, ActivityStats, SystemHealth};
99pub use optimization::{WorkflowOptimizer, OptimizationStrategy, OptimizationResult, ParallelExecutionPlan};
100#[cfg(feature = "activities-http")]
101pub use integrations::HttpIntegration;
102#[cfg(feature = "activities-db")]
103pub use integrations::{DatabaseIntegration, MessageQueueIntegration};
104pub use integrations::{IntegrationManager, Integration};
105
106/// Workflow engine builder
107pub struct WorkflowEngineBuilder {
108    storage_backend: Option<StorageBackend>,
109}
110
111impl WorkflowEngineBuilder {
112    pub fn new() -> Self {
113        Self {
114            storage_backend: Some(StorageBackend::Memory), // Default to memory
115        }
116    }
117
118    /// Configure storage backend
119    pub fn with_storage_backend(mut self, backend: StorageBackend) -> Self {
120        self.storage_backend = Some(backend);
121        self
122    }
123
124    /// Use memory storage (default)
125    pub fn with_memory_storage(mut self) -> Self {
126        self.storage_backend = Some(StorageBackend::Memory);
127        self
128    }
129
130    /// Use RocksDB storage (requires 'rocksdb' feature)
131    #[cfg(feature = "rocksdb")]
132    pub fn with_rocksdb_storage(mut self, path: impl Into<String>) -> Self {
133        self.storage_backend = Some(StorageBackend::RocksDB { path: path.into() });
134        self
135    }
136
137    /// Use SQLite storage (requires 'sqlite' feature)
138    #[cfg(feature = "sqlite")]
139    pub fn with_sqlite_storage(mut self, path: impl Into<String>) -> Self {
140        self.storage_backend = Some(StorageBackend::SQLite { path: path.into() });
141        self
142    }
143
144    /// Use Kotoba storage backend for full integration
145    /*
146    pub fn with_kotoba_storage(mut self, backend: std::sync::Arc<dyn kotoba_storage::port::StoragePort>) -> Self {
147        self.kotoba_backend = Some(backend);
148        // When using Kotoba backend, disable internal storage
149        self.storage_backend = None;
150        self
151    }
152    */
153
154    pub async fn build(self) -> Result<ExtendedWorkflowEngine, WorkflowError> {
155        let storage = if let Some(backend) = self.storage_backend {
156            StorageFactory::create(backend).await?
157        /*
158        } else if let Some(kotoba_backend) = self.kotoba_backend {
159            // Create a bridge to Kotoba storage
160            std::sync::Arc::new(KotobaStorageBridge::new(kotoba_backend))
161        */
162        } else {
163            return Err(WorkflowError::StorageError("No storage backend configured".to_string()));
164        };
165
166        let activity_registry = std::sync::Arc::new(ActivityRegistry::new());
167        let state_manager = std::sync::Arc::new(WorkflowStateManager::new());
168
169        // Phase 2: Initialize event sourcing and snapshot management
170        let event_sourcing = std::sync::Arc::new(EventSourcingManager::new(std::sync::Arc::clone(&storage))
171            .with_snapshot_config(100, 10));
172        let snapshot_manager = std::sync::Arc::new(SnapshotManager::new(
173            std::sync::Arc::clone(&storage),
174            std::sync::Arc::clone(&event_sourcing)
175        ).with_config(50, 5));
176
177        // Create core engine first - placeholder for now
178        // let core_engine = kotoba_workflow_core::WorkflowEngine::new();
179        let core_engine = std::sync::Arc::new(()); // Placeholder
180
181        Ok(ExtendedWorkflowEngine {
182            core_engine,
183            storage,
184            activity_registry,
185            state_manager,
186            event_sourcing,
187            snapshot_manager,
188            executor: None,
189            distributed_executor: None,
190        })
191    }
192}
193
194/// Extended workflow engine - builds on core workflow functionality
195pub struct ExtendedWorkflowEngine {
196    core_engine: std::sync::Arc<()>, // Placeholder
197    storage: std::sync::Arc<dyn WorkflowStore>,
198    activity_registry: std::sync::Arc<ActivityRegistry>,
199    state_manager: std::sync::Arc<WorkflowStateManager>,
200    event_sourcing: std::sync::Arc<EventSourcingManager>,
201    snapshot_manager: std::sync::Arc<SnapshotManager>,
202    executor: Option<std::sync::Arc<WorkflowExecutor>>,
203    /// Phase 2: Distributed execution support
204    distributed_executor: Option<std::sync::Arc<DistributedWorkflowExecutor>>,
205}
206
207impl ExtendedWorkflowEngine {
208    pub fn builder() -> WorkflowEngineBuilder {
209        WorkflowEngineBuilder::new()
210    }
211
212    /// Get activity registry for registering activities
213    pub fn activity_registry(&self) -> &std::sync::Arc<ActivityRegistry> {
214        &self.activity_registry
215    }
216
217    /// Start workflow execution
218    pub async fn start_workflow(
219        &mut self,
220        workflow_ir: &WorkflowIR,
221        inputs: std::collections::HashMap<String, serde_json::Value>,
222    ) -> Result<WorkflowExecutionId, WorkflowError> {
223        let executor = self.executor.get_or_insert_with(|| {
224            std::sync::Arc::new(WorkflowExecutor::new(
225                std::sync::Arc::clone(&self.activity_registry),
226                std::sync::Arc::clone(&self.state_manager),
227            ))
228        });
229
230        executor.as_ref().start_workflow(workflow_ir, inputs).await
231    }
232
233    /// Wait for workflow completion
234    pub async fn wait_for_completion(
235        &self,
236        execution_id: WorkflowExecutionId,
237        timeout: Option<std::time::Duration>,
238    ) -> Result<WorkflowResult, WorkflowError> {
239        // TODO: Implement actual completion waiting logic with timeout
240        // For now, just poll the execution status
241        match self.storage.get_execution(&execution_id).await? {
242            Some(execution) => Ok(WorkflowResult {
243                execution_id,
244                status: execution.status,
245                outputs: execution.outputs,
246                execution_time: execution.start_time.signed_duration_since(chrono::Utc::now()).to_std()
247                    .unwrap_or(std::time::Duration::from_secs(0)),
248            }),
249            None => Err(WorkflowError::WorkflowNotFound(execution_id.0)),
250        }
251    }
252
253    /// Get workflow execution status
254    pub async fn get_execution_status(
255        &self,
256        execution_id: &WorkflowExecutionId,
257    ) -> Result<Option<ExecutionStatus>, WorkflowError> {
258        match self.storage.get_execution(execution_id).await? {
259            Some(execution) => Ok(Some(execution.status)),
260            None => Ok(None),
261        }
262    }
263
264    /// Get workflow execution details
265    pub async fn get_execution(
266        &self,
267        execution_id: &WorkflowExecutionId,
268    ) -> Result<Option<WorkflowExecution>, WorkflowError> {
269        self.storage.get_execution(execution_id).await
270    }
271
272    /// List running executions
273    pub async fn list_running_executions(&self) -> Result<Vec<WorkflowExecution>, WorkflowError> {
274        self.storage.get_running_executions().await
275    }
276
277    /// Cancel workflow execution
278    pub async fn cancel_execution(
279        &self,
280        execution_id: &WorkflowExecutionId,
281    ) -> Result<(), WorkflowError> {
282        // Phase 2: Use MVCC-based state management
283        if let Some(mut execution) = self.state_manager.get_execution(execution_id).await {
284            execution.status = ExecutionStatus::Cancelled;
285            execution.end_time = Some(chrono::Utc::now());
286            self.state_manager.update_execution(execution).await?;
287
288            // Record cancellation event
289            self.event_sourcing.record_event(
290                execution_id,
291                ExecutionEventType::WorkflowCancelled,
292                HashMap::new(),
293            ).await?;
294        }
295        Ok(())
296    }
297
298    /// Phase 2: Get workflow execution at specific transaction
299    pub async fn get_execution_at_tx(
300        &self,
301        execution_id: &WorkflowExecutionId,
302        tx_id: TxId,
303    ) -> Option<WorkflowExecution> {
304        self.state_manager.get_execution_at(execution_id, Some(tx_id)).await
305    }
306
307    /// Phase 2: Get execution history (all versions)
308    pub async fn get_execution_history(
309        &self,
310        execution_id: &WorkflowExecutionId,
311    ) -> Vec<(TxId, WorkflowExecution)> {
312        self.state_manager.get_execution_history(execution_id).await
313    }
314
315    /// Phase 2: Get full event history
316    pub async fn get_event_history(
317        &self,
318        execution_id: &WorkflowExecutionId,
319    ) -> Result<Vec<ExecutionEvent>, WorkflowError> {
320        self.event_sourcing.get_full_event_history(execution_id).await
321    }
322
323    /// Phase 2: Rebuild execution from events (for recovery)
324    pub async fn rebuild_execution_from_events(
325        &self,
326        execution_id: &WorkflowExecutionId,
327    ) -> Result<Option<WorkflowExecution>, WorkflowError> {
328        self.event_sourcing.rebuild_execution_from_events(execution_id).await
329    }
330
331    /// Phase 2: Create manual snapshot
332    pub async fn create_snapshot(
333        &self,
334        execution_id: &WorkflowExecutionId,
335    ) -> Result<(), WorkflowError> {
336        if let Some(execution) = self.state_manager.get_execution(execution_id).await {
337            self.snapshot_manager.create_snapshot(&execution).await?;
338        }
339        Ok(())
340    }
341
342    /// Phase 2: Restore from snapshot
343    pub async fn restore_from_snapshot(
344        &self,
345        execution_id: &WorkflowExecutionId,
346    ) -> Result<Option<WorkflowExecution>, WorkflowError> {
347        self.snapshot_manager.restore_from_snapshot(execution_id).await
348    }
349
350    /// Phase 2: Get performance statistics
351    pub async fn get_performance_stats(&self) -> HashMap<String, usize> {
352        self.snapshot_manager.get_performance_stats().await
353    }
354
355    /// Phase 2: Access event sourcing manager
356    pub fn event_sourcing(&self) -> &std::sync::Arc<EventSourcingManager> {
357        &self.event_sourcing
358    }
359
360    /// Phase 2: Access snapshot manager
361    pub fn snapshot_manager(&self) -> &std::sync::Arc<SnapshotManager> {
362        &self.snapshot_manager
363    }
364
365    /// Phase 2: Enable distributed execution
366    pub fn enable_distributed_execution(
367        &mut self,
368        local_node_id: String,
369        load_balancer: Arc<dyn LoadBalancer>,
370    ) {
371        let execution_manager = Arc::new(DistributedExecutionManager::new(
372            local_node_id,
373            load_balancer,
374        ));
375        self.distributed_executor = Some(Arc::new(DistributedWorkflowExecutor::new(
376            Arc::clone(&execution_manager)
377        )));
378    }
379
380    /// Phase 2: Submit workflow for distributed execution
381    pub async fn submit_distributed_workflow(
382        &self,
383        execution_id: WorkflowExecutionId,
384    ) -> Result<String, WorkflowError> {
385        if let Some(distributed) = &self.distributed_executor {
386            distributed.execution_manager.submit_execution(execution_id).await
387        } else {
388            Err(WorkflowError::InvalidStrategy("Distributed execution not enabled".to_string()))
389        }
390    }
391
392    /// Phase 2: Get cluster health
393    pub async fn get_cluster_health(&self) -> Result<ClusterHealth, WorkflowError> {
394        if let Some(distributed) = &self.distributed_executor {
395            Ok(distributed.cluster_health_check().await)
396        } else {
397            Err(WorkflowError::InvalidStrategy("Distributed execution not enabled".to_string()))
398        }
399    }
400
401    /// Phase 2: Get distributed execution manager
402    pub fn distributed_execution_manager(&self) -> Option<&std::sync::Arc<DistributedExecutionManager>> {
403        self.distributed_executor.as_ref()
404            .map(|d| &d.execution_manager)
405    }
406
407    /// Phase 2: Check if distributed execution is enabled
408    pub fn is_distributed_enabled(&self) -> bool {
409        self.distributed_executor.is_some()
410    }
411}
412
413/// Extended workflow execution result with additional timing information
414#[derive(Debug, Clone)]
415pub struct WorkflowResult {
416    pub execution_id: WorkflowExecutionId,
417    pub status: ExecutionStatus,
418    pub outputs: Option<std::collections::HashMap<String, serde_json::Value>>,
419    pub execution_time: std::time::Duration,
420}
421
// WorkflowError is imported from the `kotoba_errors` crate (see the `use` declarations above).
423
// Alias for backward compatibility: `WorkflowEngine` names the same type as
// `ExtendedWorkflowEngine`.
pub type WorkflowEngine = ExtendedWorkflowEngine;
426
427/// Prelude for convenient imports
428pub mod prelude {
429    pub use super::{
430        WorkflowEngine, ExtendedWorkflowEngine,
431        ActivityRegistry, Activity, WorkflowStore, ExecutionStatus, WorkflowExecutionId,
432        ServerlessWorkflowParser, EventSourcingManager, SnapshotManager,
433        // Phase 2 distributed types
434        DistributedCoordinator, RoundRobinBalancer, LeastLoadedBalancer,
435        // Phase 3 advanced features
436        SagaManager, SagaExecutionEngine, MonitoringManager, WorkflowOptimizer,
437        IntegrationManager,
438    };
439}
440
441#[cfg(test)]
442mod tests {
443    use super::*;
444    use std::collections::HashMap;
445    use std::sync::Arc;
446    use chrono::Utc;
447
448    // Mock WorkflowStore for testing
449    struct MockWorkflowStore;
450
451    #[async_trait::async_trait]
452    impl WorkflowStore for MockWorkflowStore {
453        async fn save_execution(&self, _execution: &WorkflowExecution) -> Result<(), WorkflowError> {
454            Ok(())
455        }
456
457        async fn get_execution(&self, _execution_id: &WorkflowExecutionId) -> Result<Option<WorkflowExecution>, WorkflowError> {
458            Ok(None)
459        }
460
461        async fn update_execution(&self, _execution: &WorkflowExecution) -> Result<(), WorkflowError> {
462            Ok(())
463        }
464
465        async fn get_running_executions(&self) -> Result<Vec<WorkflowExecution>, WorkflowError> {
466            Ok(vec![])
467        }
468
469        async fn delete_execution(&self, _execution_id: &WorkflowExecutionId) -> Result<(), WorkflowError> {
470            Ok(())
471        }
472    }
473
474    // Mock StoragePort for testing
475    struct MockStoragePort;
476
477    #[async_trait::async_trait]
478    impl kotoba_storage::port::StoragePort for MockStoragePort {
479        async fn get(&self, _key: &[u8]) -> Result<Option<Vec<u8>>, kotoba_storage::StorageError> {
480            Ok(None)
481        }
482
483        async fn put(&self, _key: &[u8], _value: &[u8]) -> Result<(), kotoba_storage::StorageError> {
484            Ok(())
485        }
486
487        async fn delete(&self, _key: &[u8]) -> Result<(), kotoba_storage::StorageError> {
488            Ok(())
489        }
490
491        async fn scan(&self, _prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>, kotoba_storage::StorageError> {
492            Ok(vec![])
493        }
494    }
495
496    #[test]
497    fn test_workflow_engine_builder_creation() {
498        let builder = WorkflowEngineBuilder::new();
499
500        // Builder should have default memory storage
501        assert!(builder.storage_backend.is_some());
502        assert!(builder.kotoba_backend.is_none());
503        assert!(matches!(builder.storage_backend.as_ref().unwrap(), StorageBackend::Memory));
504    }
505
506    #[test]
507    fn test_workflow_engine_builder_with_memory_storage() {
508        let builder = WorkflowEngineBuilder::new().with_memory_storage();
509
510        assert!(matches!(builder.storage_backend.as_ref().unwrap(), StorageBackend::Memory));
511    }
512
513    #[test]
514    fn test_workflow_engine_builder_with_storage_backend() {
515        let builder = WorkflowEngineBuilder::new().with_storage_backend(StorageBackend::Memory);
516
517        assert!(matches!(builder.storage_backend.as_ref().unwrap(), StorageBackend::Memory));
518    }
519
520    #[cfg(feature = "rocksdb")]
521    #[test]
522    fn test_workflow_engine_builder_with_rocksdb_storage() {
523        let path = "/tmp/test_db";
524        let builder = WorkflowEngineBuilder::new().with_rocksdb_storage(path);
525
526        match builder.storage_backend.as_ref().unwrap() {
527            StorageBackend::RocksDB { path: db_path } => assert_eq!(db_path, path),
528            _ => panic!("Expected RocksDB storage backend"),
529        }
530    }
531
532    #[cfg(feature = "sqlite")]
533    #[test]
534    fn test_workflow_engine_builder_with_sqlite_storage() {
535        let path = "/tmp/test.db";
536        let builder = WorkflowEngineBuilder::new().with_sqlite_storage(path);
537
538        match builder.storage_backend.as_ref().unwrap() {
539            StorageBackend::SQLite { path: db_path } => assert_eq!(db_path, path),
540            _ => panic!("Expected SQLite storage backend"),
541        }
542    }
543
    // Checks that selecting the Kotoba backend sets it and clears the internal
    // storage backend.
    // NOTE(review): `with_kotoba_storage` is commented out of
    // `WorkflowEngineBuilder` in this file and the struct has no
    // `kotoba_backend` field, so this test cannot compile until that API is
    // restored — confirm whether it should be gated or removed.
    #[test]
    fn test_workflow_engine_builder_with_kotoba_storage() {
        let mock_storage = Arc::new(MockStoragePort);
        let builder = WorkflowEngineBuilder::new().with_kotoba_storage(mock_storage);

        assert!(builder.kotoba_backend.is_some());
        assert!(builder.storage_backend.is_none()); // Should be disabled when using Kotoba backend
    }
552
553    #[tokio::test]
554    async fn test_workflow_engine_builder_build_with_memory() {
555        let builder = WorkflowEngineBuilder::new();
556        let result = builder.build().await;
557
558        assert!(result.is_ok());
559        let engine = result.unwrap();
560
561        // Verify engine has all components initialized
562        assert!(engine.activity_registry().as_ref().is_send());
563        assert!(engine.event_sourcing.as_ref().is_send());
564        assert!(engine.snapshot_manager.as_ref().is_send());
565    }
566
    // Checks that an engine can be built on top of the Kotoba storage backend.
    // NOTE(review): `with_kotoba_storage` is commented out of
    // `WorkflowEngineBuilder` in this file, so this test cannot compile until
    // that API is restored — confirm whether it should be gated or removed.
    #[tokio::test]
    async fn test_workflow_engine_builder_build_with_kotoba_storage() {
        let mock_storage = Arc::new(MockStoragePort);
        let builder = WorkflowEngineBuilder::new().with_kotoba_storage(mock_storage);

        let result = builder.build().await;
        assert!(result.is_ok());
    }
575
576    #[test]
577    fn test_extended_workflow_engine_builder_method() {
578        let builder = ExtendedWorkflowEngine::builder();
579
580        // Should return a WorkflowEngineBuilder
581        assert!(builder.storage_backend.is_some());
582    }
583
584    #[test]
585    fn test_workflow_result_creation() {
586        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4());
587        let status = ExecutionStatus::Completed;
588        let execution_time = std::time::Duration::from_secs(5);
589
590        let result = WorkflowResult {
591            execution_id,
592            status,
593            outputs: None,
594            execution_time,
595        };
596
597        assert_eq!(result.execution_id, execution_id);
598        assert_eq!(result.status, status);
599        assert_eq!(result.execution_time, execution_time);
600        assert!(result.outputs.is_none());
601    }
602
603    #[test]
604    fn test_workflow_result_with_outputs() {
605        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4());
606        let status = ExecutionStatus::Completed;
607        let execution_time = std::time::Duration::from_millis(1500);
608
609        // Create a mock workflow execution as output
610        let workflow_execution = WorkflowExecution {
611            execution_id,
612            workflow_ir: WorkflowIR::default(),
613            status,
614            inputs: HashMap::new(),
615            outputs: Some(serde_json::json!({"result": "success"})),
616            start_time: Utc::now(),
617            end_time: Some(Utc::now()),
618            error_message: None,
619        };
620
621        let result = WorkflowResult {
622            execution_id,
623            status,
624            outputs: Some(workflow_execution),
625            execution_time,
626        };
627
628        assert_eq!(result.execution_id, execution_id);
629        assert_eq!(result.execution_time, execution_time);
630        assert!(result.outputs.is_some());
631
632        let outputs = result.outputs.unwrap();
633        assert_eq!(outputs.execution_id, execution_id);
634        assert_eq!(outputs.status, status);
635    }
636
637    #[tokio::test]
638    async fn test_extended_workflow_engine_activity_registry() {
639        let builder = WorkflowEngineBuilder::new();
640        let engine = builder.build().await.unwrap();
641
642        let registry = engine.activity_registry();
643
644        // Should be able to get the registry
645        assert!(registry.as_ref().is_send());
646    }
647
648    #[tokio::test]
649    async fn test_extended_workflow_engine_get_execution_status() {
650        let builder = WorkflowEngineBuilder::new();
651        let engine = builder.build().await.unwrap();
652
653        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4());
654
655        // Should return None for non-existent execution
656        let result = engine.get_execution_status(&execution_id).await;
657        assert!(result.is_ok());
658        assert!(result.unwrap().is_none());
659    }
660
661    #[tokio::test]
662    async fn test_extended_workflow_engine_get_execution() {
663        let builder = WorkflowEngineBuilder::new();
664        let engine = builder.build().await.unwrap();
665
666        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4());
667
668        // Should return None for non-existent execution
669        let result = engine.get_execution(&execution_id).await;
670        assert!(result.is_ok());
671        assert!(result.unwrap().is_none());
672    }
673
674    #[tokio::test]
675    async fn test_extended_workflow_engine_list_running_executions() {
676        let builder = WorkflowEngineBuilder::new();
677        let engine = builder.build().await.unwrap();
678
679        // Should return empty list initially
680        let result = engine.list_running_executions().await;
681        assert!(result.is_ok());
682        assert!(result.unwrap().is_empty());
683    }
684
685    #[tokio::test]
686    async fn test_extended_workflow_engine_cancel_execution() {
687        let builder = WorkflowEngineBuilder::new();
688        let engine = builder.build().await.unwrap();
689
690        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4());
691
692        // Should succeed even for non-existent execution (graceful handling)
693        let result = engine.cancel_execution(&execution_id).await;
694        assert!(result.is_ok());
695    }
696
697    #[tokio::test]
698    async fn test_extended_workflow_engine_create_snapshot() {
699        let builder = WorkflowEngineBuilder::new();
700        let engine = builder.build().await.unwrap();
701
702        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4());
703
704        // Should succeed even for non-existent execution (graceful handling)
705        let result = engine.create_snapshot(&execution_id).await;
706        assert!(result.is_ok());
707    }
708
709    #[tokio::test]
710    async fn test_extended_workflow_engine_restore_from_snapshot() {
711        let builder = WorkflowEngineBuilder::new();
712        let engine = builder.build().await.unwrap();
713
714        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4());
715
716        // Should return None for non-existent snapshot
717        let result = engine.restore_from_snapshot(&execution_id).await;
718        assert!(result.is_ok());
719        assert!(result.unwrap().is_none());
720    }
721
722    #[tokio::test]
723    async fn test_extended_workflow_engine_get_performance_stats() {
724        let builder = WorkflowEngineBuilder::new();
725        let engine = builder.build().await.unwrap();
726
727        let stats = engine.get_performance_stats().await;
728
729        // Should return a HashMap (may be empty initially)
730        assert!(stats.is_empty() || !stats.is_empty()); // Accept either state
731    }
732
733    #[tokio::test]
734    async fn test_extended_workflow_engine_event_sourcing_access() {
735        let builder = WorkflowEngineBuilder::new();
736        let engine = builder.build().await.unwrap();
737
738        let event_sourcing = engine.event_sourcing();
739
740        // Should be able to access the event sourcing manager
741        assert!(event_sourcing.as_ref().is_send());
742    }
743
744    #[tokio::test]
745    async fn test_extended_workflow_engine_snapshot_manager_access() {
746        let builder = WorkflowEngineBuilder::new();
747        let engine = builder.build().await.unwrap();
748
749        let snapshot_manager = engine.snapshot_manager();
750
751        // Should be able to access the snapshot manager
752        assert!(snapshot_manager.as_ref().is_send());
753    }
754
755    #[test]
756    fn test_extended_workflow_engine_is_distributed_enabled_initially_false() {
757        let builder = WorkflowEngineBuilder::new();
758        let engine = builder.build().await.unwrap();
759
760        // Should be false initially
761        assert!(!engine.is_distributed_enabled());
762    }
763
764    #[test]
765    fn test_extended_workflow_engine_distributed_execution_manager_initially_none() {
766        let builder = WorkflowEngineBuilder::new();
767        let engine = builder.build().await.unwrap();
768
769        // Should be None initially
770        assert!(engine.distributed_execution_manager().is_none());
771    }
772
773    #[tokio::test]
774    async fn test_extended_workflow_engine_submit_distributed_workflow_without_setup() {
775        let builder = WorkflowEngineBuilder::new();
776        let mut engine = builder.build().await.unwrap();
777
778        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4());
779
780        // Should fail because distributed execution is not enabled
781        let result = engine.submit_distributed_workflow(execution_id).await;
782        assert!(result.is_err());
783
784        if let Err(WorkflowError::InvalidStrategy(msg)) = result {
785            assert!(msg.contains("Distributed execution not enabled"));
786        } else {
787            panic!("Expected InvalidStrategy error");
788        }
789    }
790
791    #[tokio::test]
792    async fn test_extended_workflow_engine_get_cluster_health_without_setup() {
793        let builder = WorkflowEngineBuilder::new();
794        let engine = builder.build().await.unwrap();
795
796        // Should fail because distributed execution is not enabled
797        let result = engine.get_cluster_health().await;
798        assert!(result.is_err());
799
800        if let Err(WorkflowError::InvalidStrategy(msg)) = result {
801            assert!(msg.contains("Distributed execution not enabled"));
802        } else {
803            panic!("Expected InvalidStrategy error");
804        }
805    }
806
807    #[tokio::test]
808    async fn test_extended_workflow_engine_start_workflow() {
809        let builder = WorkflowEngineBuilder::new();
810        let mut engine = builder.build().await.unwrap();
811
812        let workflow_ir = WorkflowIR::default();
813        let inputs = HashMap::new();
814
815        // This may fail due to unimplemented features, but we can test that the method exists
816        let result = engine.start_workflow(&workflow_ir, inputs).await;
817
818        // Accept both success and failure as the implementation may be incomplete
819        assert!(result.is_ok() || result.is_err());
820    }
821
822    #[tokio::test]
823    async fn test_extended_workflow_engine_wait_for_completion() {
824        let builder = WorkflowEngineBuilder::new();
825        let engine = builder.build().await.unwrap();
826
827        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4());
828        let timeout = Some(std::time::Duration::from_secs(5));
829
830        // Should return an error for non-existent execution
831        let result = engine.wait_for_completion(execution_id, timeout).await;
832
833        // Should fail because execution doesn't exist
834        assert!(result.is_err());
835    }
836
837    #[test]
838    fn test_workflow_result_debug_formatting() {
839        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4());
840        let result = WorkflowResult {
841            execution_id,
842            status: ExecutionStatus::Running,
843            outputs: None,
844            execution_time: std::time::Duration::from_secs(10),
845        };
846
847        let debug_str = format!("{:?}", result);
848        assert!(debug_str.contains("WorkflowResult"));
849        assert!(debug_str.contains("Running"));
850    }
851
852    #[test]
853    fn test_workflow_result_clone() {
854        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4());
855        let original = WorkflowResult {
856            execution_id,
857            status: ExecutionStatus::Failed,
858            outputs: None,
859            execution_time: std::time::Duration::from_millis(500),
860        };
861
862        let cloned = original.clone();
863
864        assert_eq!(original.execution_id, cloned.execution_id);
865        assert_eq!(original.status, cloned.status);
866        assert_eq!(original.outputs, cloned.outputs);
867        assert_eq!(original.execution_time, cloned.execution_time);
868    }
869
870    #[test]
871    fn test_workflow_engine_builder_chaining() {
872        let builder = WorkflowEngineBuilder::new()
873            .with_memory_storage()
874            .with_storage_backend(StorageBackend::Memory);
875
876        // Should still work after chaining
877        assert!(builder.storage_backend.is_some());
878    }
879
880    #[tokio::test]
881    async fn test_multiple_workflow_engines_independence() {
882        let builder1 = WorkflowEngineBuilder::new();
883        let builder2 = WorkflowEngineBuilder::new();
884
885        let engine1 = builder1.build().await.unwrap();
886        let engine2 = builder2.build().await.unwrap();
887
888        // Engines should be independent
889        assert!(!engine1.is_distributed_enabled());
890        assert!(!engine2.is_distributed_enabled());
891
892        // Different registry instances
893        assert!(!Arc::ptr_eq(engine1.activity_registry(), engine2.activity_registry()));
894    }
895
896    #[test]
897    fn test_storage_backend_enum_variants() {
898        // Test that all storage backend variants exist
899        let _memory = StorageBackend::Memory;
900
901        #[cfg(feature = "rocksdb")]
902        let _rocksdb = StorageBackend::RocksDB { path: "/tmp/test".to_string() };
903
904        #[cfg(feature = "sqlite")]
905        let _sqlite = StorageBackend::SQLite { path: "/tmp/test.db".to_string() };
906    }
907
908    #[test]
909    fn test_workflow_engine_type_alias() {
910        // Test that WorkflowEngine is an alias for ExtendedWorkflowEngine
911        let _engine: WorkflowEngine = ExtendedWorkflowEngine {
912            core_engine: kotoba_workflow_core::WorkflowEngine::new(),
913            storage: Arc::new(MockWorkflowStore),
914            activity_registry: Arc::new(ActivityRegistry::new()),
915            state_manager: Arc::new(WorkflowStateManager::new()),
916            event_sourcing: Arc::new(EventSourcingManager::new(Arc::new(MockWorkflowStore)).with_snapshot_config(100, 10)),
917            snapshot_manager: Arc::new(SnapshotManager::new(Arc::new(MockWorkflowStore), Arc::new(EventSourcingManager::new(Arc::new(MockWorkflowStore)).with_snapshot_config(100, 10))).with_config(50, 5)),
918            executor: None,
919            distributed_executor: None,
920        };
921    }
922
923    #[test]
924    fn test_prelude_exports() {
925        // Test that prelude exports work
926        let _builder = prelude::WorkflowEngine::builder();
927        let _workflow_ir: prelude::WorkflowIR = WorkflowIR::default();
928    }
929
930    #[tokio::test]
931    async fn test_extended_workflow_engine_get_execution_at_tx() {
932        let builder = WorkflowEngineBuilder::new();
933        let engine = builder.build().await.unwrap();
934
935        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4());
936        let tx_id = TxId::new(1);
937
938        // Should return None for non-existent execution
939        let result = engine.get_execution_at_tx(&execution_id, tx_id).await;
940        assert!(result.is_none());
941    }
942
943    #[tokio::test]
944    async fn test_extended_workflow_engine_get_execution_history() {
945        let builder = WorkflowEngineBuilder::new();
946        let engine = builder.build().await.unwrap();
947
948        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4());
949
950        // Should return empty history for non-existent execution
951        let history = engine.get_execution_history(&execution_id).await;
952        assert!(history.is_empty());
953    }
954
955    #[tokio::test]
956    async fn test_extended_workflow_engine_get_event_history() {
957        let builder = WorkflowEngineBuilder::new();
958        let engine = builder.build().await.unwrap();
959
960        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4());
961
962        // Should return empty event history for non-existent execution
963        let result = engine.get_event_history(&execution_id).await;
964        assert!(result.is_ok());
965        assert!(result.unwrap().is_empty());
966    }
967
968    #[tokio::test]
969    async fn test_extended_workflow_engine_rebuild_execution_from_events() {
970        let builder = WorkflowEngineBuilder::new();
971        let engine = builder.build().await.unwrap();
972
973        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4());
974
975        // Should return None for non-existent execution
976        let result = engine.rebuild_execution_from_events(&execution_id).await;
977        assert!(result.is_ok());
978        assert!(result.unwrap().is_none());
979    }
980
981    #[test]
982    fn test_workflow_result_serialization() {
983        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4());
984        let result = WorkflowResult {
985            execution_id,
986            status: ExecutionStatus::Completed,
987            outputs: None,
988            execution_time: std::time::Duration::from_secs(30),
989        };
990
991        // Test JSON serialization
992        let json_result = serde_json::to_string(&result);
993        assert!(json_result.is_ok());
994
995        let json_str = json_result.unwrap();
996        assert!(json_str.contains("Completed"));
997        assert!(json_str.contains("30"));
998
999        // Test JSON deserialization
1000        let deserialized_result: serde_json::Result<WorkflowResult> = serde_json::from_str(&json_str);
1001        assert!(deserialized_result.is_ok());
1002
1003        let deserialized = deserialized_result.unwrap();
1004        assert_eq!(deserialized.status, ExecutionStatus::Completed);
1005        assert_eq!(deserialized.execution_time, std::time::Duration::from_secs(30));
1006    }
1007
1008    #[test]
1009    fn test_workflow_engine_builder_debug() {
1010        let builder = WorkflowEngineBuilder::new();
1011        let debug_str = format!("{:?}", builder);
1012        assert!(debug_str.contains("WorkflowEngineBuilder"));
1013    }
1014
1015    #[test]
1016    fn test_extended_workflow_engine_debug() {
1017        let builder = WorkflowEngineBuilder::new();
1018        let engine = builder.build().await.unwrap();
1019        let debug_str = format!("{:?}", engine);
1020        assert!(debug_str.contains("ExtendedWorkflowEngine"));
1021    }
1022}