1use std::collections::HashMap;
64use std::sync::Arc;
65use crate::ir::{ExecutionEventType, ExecutionEvent};
66
67pub mod spec;
69pub use spec::*;
70use crate::distributed::{LoadBalancer, DistributedExecutionManager, DistributedWorkflowExecutor};
71use kotoba_core::prelude::TxId;
72use kotoba_errors::WorkflowError;
73pub mod ir;
76pub mod executor;
77pub mod store;
78pub mod activity;
79pub mod parser;
80pub mod distributed;
81pub mod saga;
82pub mod monitoring;
83pub mod optimization;
84pub mod integrations;
85
86pub use ir::{WorkflowIR, WorkflowExecution, WorkflowExecutionId, ExecutionStatus, ActivityIR, WorkflowStep, WorkflowStepType};
89pub use executor::{ActivityRegistry, Activity, WorkflowExecutor, WorkflowStateManager};
90pub use store::{WorkflowStore, StorageBackend, StorageFactory, EventSourcingManager, SnapshotManager};
91pub use parser::ServerlessWorkflowParser;
92pub use activity::prelude::*;
93pub use distributed::{
94 DistributedCoordinator, RoundRobinBalancer, LeastLoadedBalancer, NodeInfo, ClusterHealth
95};
96pub use saga::{SagaManager, SagaExecutionEngine, AdvancedSagaPattern, SagaContext};
98pub use monitoring::{MonitoringManager, MonitoringConfig, WorkflowStats, ActivityStats, SystemHealth};
99pub use optimization::{WorkflowOptimizer, OptimizationStrategy, OptimizationResult, ParallelExecutionPlan};
100#[cfg(feature = "activities-http")]
101pub use integrations::HttpIntegration;
102#[cfg(feature = "activities-db")]
103pub use integrations::{DatabaseIntegration, MessageQueueIntegration};
104pub use integrations::{IntegrationManager, Integration};
105
106pub struct WorkflowEngineBuilder {
108 storage_backend: Option<StorageBackend>,
109}
110
111impl WorkflowEngineBuilder {
112 pub fn new() -> Self {
113 Self {
114 storage_backend: Some(StorageBackend::Memory), }
116 }
117
118 pub fn with_storage_backend(mut self, backend: StorageBackend) -> Self {
120 self.storage_backend = Some(backend);
121 self
122 }
123
124 pub fn with_memory_storage(mut self) -> Self {
126 self.storage_backend = Some(StorageBackend::Memory);
127 self
128 }
129
130 #[cfg(feature = "rocksdb")]
132 pub fn with_rocksdb_storage(mut self, path: impl Into<String>) -> Self {
133 self.storage_backend = Some(StorageBackend::RocksDB { path: path.into() });
134 self
135 }
136
137 #[cfg(feature = "sqlite")]
139 pub fn with_sqlite_storage(mut self, path: impl Into<String>) -> Self {
140 self.storage_backend = Some(StorageBackend::SQLite { path: path.into() });
141 self
142 }
143
144 pub async fn build(self) -> Result<ExtendedWorkflowEngine, WorkflowError> {
155 let storage = if let Some(backend) = self.storage_backend {
156 StorageFactory::create(backend).await?
157 } else {
163 return Err(WorkflowError::StorageError("No storage backend configured".to_string()));
164 };
165
166 let activity_registry = std::sync::Arc::new(ActivityRegistry::new());
167 let state_manager = std::sync::Arc::new(WorkflowStateManager::new());
168
169 let event_sourcing = std::sync::Arc::new(EventSourcingManager::new(std::sync::Arc::clone(&storage))
171 .with_snapshot_config(100, 10));
172 let snapshot_manager = std::sync::Arc::new(SnapshotManager::new(
173 std::sync::Arc::clone(&storage),
174 std::sync::Arc::clone(&event_sourcing)
175 ).with_config(50, 5));
176
177 let core_engine = std::sync::Arc::new(()); Ok(ExtendedWorkflowEngine {
182 core_engine,
183 storage,
184 activity_registry,
185 state_manager,
186 event_sourcing,
187 snapshot_manager,
188 executor: None,
189 distributed_executor: None,
190 })
191 }
192}
193
194pub struct ExtendedWorkflowEngine {
196 core_engine: std::sync::Arc<()>, storage: std::sync::Arc<dyn WorkflowStore>,
198 activity_registry: std::sync::Arc<ActivityRegistry>,
199 state_manager: std::sync::Arc<WorkflowStateManager>,
200 event_sourcing: std::sync::Arc<EventSourcingManager>,
201 snapshot_manager: std::sync::Arc<SnapshotManager>,
202 executor: Option<std::sync::Arc<WorkflowExecutor>>,
203 distributed_executor: Option<std::sync::Arc<DistributedWorkflowExecutor>>,
205}
206
207impl ExtendedWorkflowEngine {
208 pub fn builder() -> WorkflowEngineBuilder {
209 WorkflowEngineBuilder::new()
210 }
211
212 pub fn activity_registry(&self) -> &std::sync::Arc<ActivityRegistry> {
214 &self.activity_registry
215 }
216
217 pub async fn start_workflow(
219 &mut self,
220 workflow_ir: &WorkflowIR,
221 inputs: std::collections::HashMap<String, serde_json::Value>,
222 ) -> Result<WorkflowExecutionId, WorkflowError> {
223 let executor = self.executor.get_or_insert_with(|| {
224 std::sync::Arc::new(WorkflowExecutor::new(
225 std::sync::Arc::clone(&self.activity_registry),
226 std::sync::Arc::clone(&self.state_manager),
227 ))
228 });
229
230 executor.as_ref().start_workflow(workflow_ir, inputs).await
231 }
232
233 pub async fn wait_for_completion(
235 &self,
236 execution_id: WorkflowExecutionId,
237 timeout: Option<std::time::Duration>,
238 ) -> Result<WorkflowResult, WorkflowError> {
239 match self.storage.get_execution(&execution_id).await? {
242 Some(execution) => Ok(WorkflowResult {
243 execution_id,
244 status: execution.status,
245 outputs: execution.outputs,
246 execution_time: execution.start_time.signed_duration_since(chrono::Utc::now()).to_std()
247 .unwrap_or(std::time::Duration::from_secs(0)),
248 }),
249 None => Err(WorkflowError::WorkflowNotFound(execution_id.0)),
250 }
251 }
252
253 pub async fn get_execution_status(
255 &self,
256 execution_id: &WorkflowExecutionId,
257 ) -> Result<Option<ExecutionStatus>, WorkflowError> {
258 match self.storage.get_execution(execution_id).await? {
259 Some(execution) => Ok(Some(execution.status)),
260 None => Ok(None),
261 }
262 }
263
264 pub async fn get_execution(
266 &self,
267 execution_id: &WorkflowExecutionId,
268 ) -> Result<Option<WorkflowExecution>, WorkflowError> {
269 self.storage.get_execution(execution_id).await
270 }
271
272 pub async fn list_running_executions(&self) -> Result<Vec<WorkflowExecution>, WorkflowError> {
274 self.storage.get_running_executions().await
275 }
276
277 pub async fn cancel_execution(
279 &self,
280 execution_id: &WorkflowExecutionId,
281 ) -> Result<(), WorkflowError> {
282 if let Some(mut execution) = self.state_manager.get_execution(execution_id).await {
284 execution.status = ExecutionStatus::Cancelled;
285 execution.end_time = Some(chrono::Utc::now());
286 self.state_manager.update_execution(execution).await?;
287
288 self.event_sourcing.record_event(
290 execution_id,
291 ExecutionEventType::WorkflowCancelled,
292 HashMap::new(),
293 ).await?;
294 }
295 Ok(())
296 }
297
298 pub async fn get_execution_at_tx(
300 &self,
301 execution_id: &WorkflowExecutionId,
302 tx_id: TxId,
303 ) -> Option<WorkflowExecution> {
304 self.state_manager.get_execution_at(execution_id, Some(tx_id)).await
305 }
306
307 pub async fn get_execution_history(
309 &self,
310 execution_id: &WorkflowExecutionId,
311 ) -> Vec<(TxId, WorkflowExecution)> {
312 self.state_manager.get_execution_history(execution_id).await
313 }
314
315 pub async fn get_event_history(
317 &self,
318 execution_id: &WorkflowExecutionId,
319 ) -> Result<Vec<ExecutionEvent>, WorkflowError> {
320 self.event_sourcing.get_full_event_history(execution_id).await
321 }
322
323 pub async fn rebuild_execution_from_events(
325 &self,
326 execution_id: &WorkflowExecutionId,
327 ) -> Result<Option<WorkflowExecution>, WorkflowError> {
328 self.event_sourcing.rebuild_execution_from_events(execution_id).await
329 }
330
331 pub async fn create_snapshot(
333 &self,
334 execution_id: &WorkflowExecutionId,
335 ) -> Result<(), WorkflowError> {
336 if let Some(execution) = self.state_manager.get_execution(execution_id).await {
337 self.snapshot_manager.create_snapshot(&execution).await?;
338 }
339 Ok(())
340 }
341
342 pub async fn restore_from_snapshot(
344 &self,
345 execution_id: &WorkflowExecutionId,
346 ) -> Result<Option<WorkflowExecution>, WorkflowError> {
347 self.snapshot_manager.restore_from_snapshot(execution_id).await
348 }
349
350 pub async fn get_performance_stats(&self) -> HashMap<String, usize> {
352 self.snapshot_manager.get_performance_stats().await
353 }
354
355 pub fn event_sourcing(&self) -> &std::sync::Arc<EventSourcingManager> {
357 &self.event_sourcing
358 }
359
360 pub fn snapshot_manager(&self) -> &std::sync::Arc<SnapshotManager> {
362 &self.snapshot_manager
363 }
364
365 pub fn enable_distributed_execution(
367 &mut self,
368 local_node_id: String,
369 load_balancer: Arc<dyn LoadBalancer>,
370 ) {
371 let execution_manager = Arc::new(DistributedExecutionManager::new(
372 local_node_id,
373 load_balancer,
374 ));
375 self.distributed_executor = Some(Arc::new(DistributedWorkflowExecutor::new(
376 Arc::clone(&execution_manager)
377 )));
378 }
379
380 pub async fn submit_distributed_workflow(
382 &self,
383 execution_id: WorkflowExecutionId,
384 ) -> Result<String, WorkflowError> {
385 if let Some(distributed) = &self.distributed_executor {
386 distributed.execution_manager.submit_execution(execution_id).await
387 } else {
388 Err(WorkflowError::InvalidStrategy("Distributed execution not enabled".to_string()))
389 }
390 }
391
392 pub async fn get_cluster_health(&self) -> Result<ClusterHealth, WorkflowError> {
394 if let Some(distributed) = &self.distributed_executor {
395 Ok(distributed.cluster_health_check().await)
396 } else {
397 Err(WorkflowError::InvalidStrategy("Distributed execution not enabled".to_string()))
398 }
399 }
400
401 pub fn distributed_execution_manager(&self) -> Option<&std::sync::Arc<DistributedExecutionManager>> {
403 self.distributed_executor.as_ref()
404 .map(|d| &d.execution_manager)
405 }
406
407 pub fn is_distributed_enabled(&self) -> bool {
409 self.distributed_executor.is_some()
410 }
411}
412
413#[derive(Debug, Clone)]
415pub struct WorkflowResult {
416 pub execution_id: WorkflowExecutionId,
417 pub status: ExecutionStatus,
418 pub outputs: Option<std::collections::HashMap<String, serde_json::Value>>,
419 pub execution_time: std::time::Duration,
420}
421
/// Alias for [`ExtendedWorkflowEngine`]; both names refer to the same type.
pub type WorkflowEngine = ExtendedWorkflowEngine;
426
427pub mod prelude {
429 pub use super::{
430 WorkflowEngine, ExtendedWorkflowEngine,
431 ActivityRegistry, Activity, WorkflowStore, ExecutionStatus, WorkflowExecutionId,
432 ServerlessWorkflowParser, EventSourcingManager, SnapshotManager,
433 DistributedCoordinator, RoundRobinBalancer, LeastLoadedBalancer,
435 SagaManager, SagaExecutionEngine, MonitoringManager, WorkflowOptimizer,
437 IntegrationManager,
438 };
439}
440
/// Unit tests for the builder, the engine facade and `WorkflowResult`.
///
/// Only API that exists on the types defined in this file is exercised.
/// There are no tests for a Kotoba storage backend or for serde round-trips
/// of `WorkflowResult`, because the builder has no such backend and
/// `WorkflowResult` derives only `Debug` and `Clone`.
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// `WorkflowStore` stub that stores nothing and finds nothing.
    struct MockWorkflowStore;

    #[async_trait::async_trait]
    impl WorkflowStore for MockWorkflowStore {
        async fn save_execution(&self, _execution: &WorkflowExecution) -> Result<(), WorkflowError> {
            Ok(())
        }

        async fn get_execution(
            &self,
            _execution_id: &WorkflowExecutionId,
        ) -> Result<Option<WorkflowExecution>, WorkflowError> {
            Ok(None)
        }

        async fn update_execution(&self, _execution: &WorkflowExecution) -> Result<(), WorkflowError> {
            Ok(())
        }

        async fn get_running_executions(&self) -> Result<Vec<WorkflowExecution>, WorkflowError> {
            Ok(vec![])
        }

        async fn delete_execution(&self, _execution_id: &WorkflowExecutionId) -> Result<(), WorkflowError> {
            Ok(())
        }
    }

    /// Creates a fresh execution id for a test.
    ///
    /// NOTE(review): the inner type of `WorkflowExecutionId` is declared in
    /// `ir`; confirm it is a UUID. All tests build ids through this helper so
    /// a mismatch only needs fixing here.
    fn new_execution_id() -> WorkflowExecutionId {
        WorkflowExecutionId(uuid::Uuid::new_v4())
    }

    /// Builds an engine with the builder's default (memory) storage.
    async fn memory_engine() -> ExtendedWorkflowEngine {
        WorkflowEngineBuilder::new().build().await.unwrap()
    }

    #[test]
    fn test_workflow_engine_builder_creation() {
        let builder = WorkflowEngineBuilder::new();

        assert!(builder.storage_backend.is_some());
        assert!(matches!(builder.storage_backend.as_ref().unwrap(), StorageBackend::Memory));
    }

    #[test]
    fn test_workflow_engine_builder_with_memory_storage() {
        let builder = WorkflowEngineBuilder::new().with_memory_storage();

        assert!(matches!(builder.storage_backend.as_ref().unwrap(), StorageBackend::Memory));
    }

    #[test]
    fn test_workflow_engine_builder_with_storage_backend() {
        let builder = WorkflowEngineBuilder::new().with_storage_backend(StorageBackend::Memory);

        assert!(matches!(builder.storage_backend.as_ref().unwrap(), StorageBackend::Memory));
    }

    #[cfg(feature = "rocksdb")]
    #[test]
    fn test_workflow_engine_builder_with_rocksdb_storage() {
        let path = "/tmp/test_db";
        let builder = WorkflowEngineBuilder::new().with_rocksdb_storage(path);

        match builder.storage_backend.as_ref().unwrap() {
            StorageBackend::RocksDB { path: db_path } => assert_eq!(db_path, path),
            _ => panic!("Expected RocksDB storage backend"),
        }
    }

    #[cfg(feature = "sqlite")]
    #[test]
    fn test_workflow_engine_builder_with_sqlite_storage() {
        let path = "/tmp/test.db";
        let builder = WorkflowEngineBuilder::new().with_sqlite_storage(path);

        match builder.storage_backend.as_ref().unwrap() {
            StorageBackend::SQLite { path: db_path } => assert_eq!(db_path, path),
            _ => panic!("Expected SQLite storage backend"),
        }
    }

    #[tokio::test]
    async fn test_workflow_engine_builder_build_with_memory() {
        let result = WorkflowEngineBuilder::new().build().await;

        assert!(result.is_ok());
        let engine = result.unwrap();

        // A freshly built engine has neither a local nor a distributed executor.
        assert!(engine.executor.is_none());
        assert!(engine.distributed_executor.is_none());
        assert!(!engine.is_distributed_enabled());
    }

    #[test]
    fn test_extended_workflow_engine_builder_method() {
        let builder = ExtendedWorkflowEngine::builder();

        assert!(builder.storage_backend.is_some());
    }

    #[test]
    fn test_workflow_result_creation() {
        let execution_id = new_execution_id();
        let execution_time = std::time::Duration::from_secs(5);

        let result = WorkflowResult {
            execution_id: execution_id.clone(),
            status: ExecutionStatus::Completed,
            outputs: None,
            execution_time,
        };

        assert_eq!(result.execution_id, execution_id);
        assert!(matches!(result.status, ExecutionStatus::Completed));
        assert_eq!(result.execution_time, execution_time);
        assert!(result.outputs.is_none());
    }

    #[test]
    fn test_workflow_result_with_outputs() {
        let execution_id = new_execution_id();
        let execution_time = std::time::Duration::from_millis(1500);

        let mut outputs = HashMap::new();
        outputs.insert("result".to_string(), serde_json::json!("success"));

        let result = WorkflowResult {
            execution_id: execution_id.clone(),
            status: ExecutionStatus::Completed,
            outputs: Some(outputs),
            execution_time,
        };

        assert_eq!(result.execution_id, execution_id);
        assert_eq!(result.execution_time, execution_time);
        assert!(matches!(result.status, ExecutionStatus::Completed));

        let outputs = result.outputs.expect("outputs were set above");
        assert_eq!(outputs.get("result"), Some(&serde_json::json!("success")));
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_activity_registry() {
        let engine = memory_engine().await;

        // The accessor hands out the engine's own registry handle.
        assert!(Arc::ptr_eq(engine.activity_registry(), &engine.activity_registry));
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_get_execution_status() {
        let engine = memory_engine().await;
        let execution_id = new_execution_id();

        let result = engine.get_execution_status(&execution_id).await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_get_execution() {
        let engine = memory_engine().await;
        let execution_id = new_execution_id();

        let result = engine.get_execution(&execution_id).await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_list_running_executions() {
        let engine = memory_engine().await;

        let result = engine.list_running_executions().await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_cancel_execution() {
        let engine = memory_engine().await;
        let execution_id = new_execution_id();

        // Cancelling an unknown execution is a no-op that still succeeds.
        let result = engine.cancel_execution(&execution_id).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_create_snapshot() {
        let engine = memory_engine().await;
        let execution_id = new_execution_id();

        // Snapshotting an unknown execution is a no-op that still succeeds.
        let result = engine.create_snapshot(&execution_id).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_restore_from_snapshot() {
        let engine = memory_engine().await;
        let execution_id = new_execution_id();

        let result = engine.restore_from_snapshot(&execution_id).await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_get_performance_stats() {
        let engine = memory_engine().await;

        // Only checks that the call completes and has the expected type.
        let _stats: HashMap<String, usize> = engine.get_performance_stats().await;
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_event_sourcing_access() {
        let engine = memory_engine().await;

        assert!(Arc::ptr_eq(engine.event_sourcing(), &engine.event_sourcing));
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_snapshot_manager_access() {
        let engine = memory_engine().await;

        assert!(Arc::ptr_eq(engine.snapshot_manager(), &engine.snapshot_manager));
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_is_distributed_enabled_initially_false() {
        let engine = memory_engine().await;

        assert!(!engine.is_distributed_enabled());
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_distributed_execution_manager_initially_none() {
        let engine = memory_engine().await;

        assert!(engine.distributed_execution_manager().is_none());
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_submit_distributed_workflow_without_setup() {
        let engine = memory_engine().await;
        let execution_id = new_execution_id();

        match engine.submit_distributed_workflow(execution_id).await {
            Err(WorkflowError::InvalidStrategy(msg)) => {
                assert!(msg.contains("Distributed execution not enabled"));
            }
            _ => panic!("Expected InvalidStrategy error"),
        }
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_get_cluster_health_without_setup() {
        let engine = memory_engine().await;

        match engine.get_cluster_health().await {
            Err(WorkflowError::InvalidStrategy(msg)) => {
                assert!(msg.contains("Distributed execution not enabled"));
            }
            _ => panic!("Expected InvalidStrategy error"),
        }
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_start_workflow() {
        let mut engine = memory_engine().await;

        let workflow_ir = WorkflowIR::default();
        let inputs = HashMap::new();

        // Either outcome is acceptable for a default workflow; the call must
        // complete and must have created the local executor.
        let _outcome = engine.start_workflow(&workflow_ir, inputs).await;
        assert!(engine.executor.is_some());
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_wait_for_completion() {
        let engine = memory_engine().await;

        let execution_id = new_execution_id();
        let timeout = Some(std::time::Duration::from_secs(5));

        // Unknown executions are reported as an error.
        let result = engine.wait_for_completion(execution_id, timeout).await;
        assert!(result.is_err());
    }

    #[test]
    fn test_workflow_result_debug_formatting() {
        let result = WorkflowResult {
            execution_id: new_execution_id(),
            status: ExecutionStatus::Running,
            outputs: None,
            execution_time: std::time::Duration::from_secs(10),
        };

        let debug_str = format!("{:?}", result);
        assert!(debug_str.contains("WorkflowResult"));
        assert!(debug_str.contains("Running"));
    }

    #[test]
    fn test_workflow_result_clone() {
        let original = WorkflowResult {
            execution_id: new_execution_id(),
            status: ExecutionStatus::Failed,
            outputs: None,
            execution_time: std::time::Duration::from_millis(500),
        };

        let cloned = original.clone();

        assert_eq!(original.execution_id, cloned.execution_id);
        assert_eq!(original.status, cloned.status);
        assert_eq!(original.outputs, cloned.outputs);
        assert_eq!(original.execution_time, cloned.execution_time);
    }

    #[test]
    fn test_workflow_engine_builder_chaining() {
        let builder = WorkflowEngineBuilder::new()
            .with_memory_storage()
            .with_storage_backend(StorageBackend::Memory);

        assert!(builder.storage_backend.is_some());
    }

    #[tokio::test]
    async fn test_multiple_workflow_engines_independence() {
        let engine1 = memory_engine().await;
        let engine2 = memory_engine().await;

        assert!(!engine1.is_distributed_enabled());
        assert!(!engine2.is_distributed_enabled());

        // Each build creates its own registry.
        assert!(!Arc::ptr_eq(engine1.activity_registry(), engine2.activity_registry()));
    }

    #[test]
    fn test_storage_backend_enum_variants() {
        let _memory = StorageBackend::Memory;

        #[cfg(feature = "rocksdb")]
        let _rocksdb = StorageBackend::RocksDB { path: "/tmp/test".to_string() };

        #[cfg(feature = "sqlite")]
        let _sqlite = StorageBackend::SQLite { path: "/tmp/test.db".to_string() };
    }

    #[test]
    fn test_workflow_engine_type_alias() {
        let store: Arc<dyn WorkflowStore> = Arc::new(MockWorkflowStore);
        let event_sourcing = Arc::new(
            EventSourcingManager::new(Arc::clone(&store)).with_snapshot_config(100, 10),
        );
        let snapshot_manager = Arc::new(
            SnapshotManager::new(Arc::clone(&store), Arc::clone(&event_sourcing))
                .with_config(50, 5),
        );

        // The alias must accept a value of the aliased type.
        let _engine: WorkflowEngine = ExtendedWorkflowEngine {
            core_engine: Arc::new(()),
            storage: store,
            activity_registry: Arc::new(ActivityRegistry::new()),
            state_manager: Arc::new(WorkflowStateManager::new()),
            event_sourcing,
            snapshot_manager,
            executor: None,
            distributed_executor: None,
        };
    }

    #[test]
    fn test_prelude_exports() {
        // Uses only names the prelude actually re-exports.
        let _builder = prelude::WorkflowEngine::builder();
        let _registry: prelude::ActivityRegistry = ActivityRegistry::new();
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_get_execution_at_tx() {
        let engine = memory_engine().await;

        let execution_id = new_execution_id();
        let tx_id = TxId::new(1);

        let result = engine.get_execution_at_tx(&execution_id, tx_id).await;
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_get_execution_history() {
        let engine = memory_engine().await;
        let execution_id = new_execution_id();

        let history = engine.get_execution_history(&execution_id).await;
        assert!(history.is_empty());
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_get_event_history() {
        let engine = memory_engine().await;
        let execution_id = new_execution_id();

        let result = engine.get_event_history(&execution_id).await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_rebuild_execution_from_events() {
        let engine = memory_engine().await;
        let execution_id = new_execution_id();

        let result = engine.rebuild_execution_from_events(&execution_id).await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[test]
    fn test_workflow_engine_builder_debug() {
        let builder = WorkflowEngineBuilder::new();
        let debug_str = format!("{:?}", builder);
        assert!(debug_str.contains("WorkflowEngineBuilder"));
    }

    #[tokio::test]
    async fn test_extended_workflow_engine_debug() {
        let engine = memory_engine().await;
        let debug_str = format!("{:?}", engine);
        assert!(debug_str.contains("ExtendedWorkflowEngine"));
    }
}