Kotoba Workflow Engine (Itonami)
Temporal-inspired workflow engine built on top of Kotoba's graph rewriting system.
Overview
Itonami is a workflow execution engine that combines:
- Temporal Patterns: Sequence, Parallel, Decision, Wait, Saga, Activity, Sub-workflow
- Graph-based Execution: Declarative workflow definition using graph transformations
- MVCC Persistence: Workflow state management with Merkle DAG
- Activity System: Extensible activity execution framework
- Event Sourcing: Complete audit trail of workflow execution
Architecture
┌─────────────────────────────────────────────────────────────┐
│                    Workflow Engine Layer                    │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Workflow Definition (.kotoba)                           │ │
│ │ - WorkflowIR: declarative Temporal pattern definitions  │ │
│ │ - StrategyIR: extensions (Parallel, Wait, Compensation) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Execution Engine                                        │ │
│ │ - WorkflowExecutor: runs workflows                      │ │
│ │ - ActivityExecutor: runs activities                     │ │
│ │ - StateManager: MVCC-based state management             │ │
│ └─────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Persistence Layer                                       │ │
│ │ - WorkflowStore: workflow persistence                   │ │
│ │ - EventStore: event/message persistence                 │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼ (extends)
┌─────────────────────────────────────────────────────────────┐
│                     Kotoba Core Engine                      │
│ - Graph Store (MVCC + Merkle)                               │
│ - Rule Engine (DPO)                                         │
│ - Query Engine (GQL)                                        │
│ - Distributed Execution                                     │
└─────────────────────────────────────────────────────────────┘
Features
Workflow Patterns
- Sequence: Execute activities in order
- Parallel: Execute activities concurrently
- Decision: Conditional branching based on data
- Wait: Wait for events, timers, or signals
- Saga: Long-running transactions with compensation
- Activity: Execute external tasks (HTTP, DB, functions)
- Sub-workflow: Call other workflows
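To make the pattern list concrete, here is a minimal sketch of how such patterns could be modelled as a tree and walked. The `Strategy` enum, the `schedule` function, and the `ship_express` / `ship_standard` activity names are hypothetical illustrations, not Itonami's actual StrategyIR; Wait, Saga, and Sub-workflow are left out to keep it short.

```rust
// Hypothetical strategy tree; the variants mirror some of the pattern names
// above and are NOT Itonami's actual StrategyIR types.
#[derive(Debug, Clone)]
enum Strategy {
    Activity(String),
    Seq(Vec<Strategy>),
    Parallel(Vec<Strategy>),
    Decision {
        flag: String,
        then: Box<Strategy>,
        otherwise: Box<Strategy>,
    },
}

/// Collects activity names in the order a single-threaded walk reaches them.
/// Parallel branches are visited left to right here; a real engine would run
/// them concurrently.
fn schedule(strategy: &Strategy, flags: &[&str], out: &mut Vec<String>) {
    match strategy {
        Strategy::Activity(name) => out.push(name.clone()),
        Strategy::Seq(steps) | Strategy::Parallel(steps) => {
            for step in steps {
                schedule(step, flags, out);
            }
        }
        Strategy::Decision { flag, then, otherwise } => {
            // Take the `then` branch only when the flag is set.
            let branch = if flags.contains(&flag.as_str()) { then } else { otherwise };
            schedule(branch, flags, out);
        }
    }
}

fn activity(name: &str) -> Strategy {
    Strategy::Activity(name.to_string())
}

/// Example tree: validate, then payment and inventory in parallel, then a decision.
fn order_flow() -> Strategy {
    Strategy::Seq(vec![
        activity("validate_order"),
        Strategy::Parallel(vec![
            activity("process_payment"),
            activity("reserve_inventory"),
        ]),
        Strategy::Decision {
            flag: "express".to_string(),
            then: Box::new(activity("ship_express")),
            otherwise: Box::new(activity("ship_standard")),
        },
    ])
}
```

Calling `schedule(&order_flow(), &["express"], &mut out)` fills `out` with the four activities ending in `ship_express`; without the flag the walk ends in `ship_standard`.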
Persistence
- MVCC-based State: Immutable workflow state with versioning
- Merkle DAG: Content-addressable state snapshots
- Event Sourcing: Complete audit trail of execution history
- Snapshots: Performance optimization for long-running workflows
Activity System
- Extensible: Easy to add new activity types
- Timeout Support: Configurable timeouts per activity
- Retry Policies: Exponential backoff and custom retry logic
- Built-in Activities: HTTP, Database, Function calls
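The retry bullet can be illustrated with a small delay calculation. This is a sketch only: the `RetryPolicy` struct and its field names are assumptions chosen for illustration and are not taken from Itonami's API.

```rust
use std::time::Duration;

/// Hypothetical retry policy; field names are illustrative assumptions.
#[derive(Debug, Clone)]
struct RetryPolicy {
    initial_interval: Duration,
    backoff_coefficient: u32,
    max_interval: Duration,
    /// Total number of attempts allowed, including the first one.
    max_attempts: u32,
}

impl RetryPolicy {
    /// Delay to wait after `failed_attempts` failures (1-based),
    /// or `None` once the attempt budget is used up.
    fn delay_for(&self, failed_attempts: u32) -> Option<Duration> {
        if failed_attempts == 0 || failed_attempts >= self.max_attempts {
            return None;
        }
        // initial * coefficient^(failed_attempts - 1), saturating on overflow.
        let factor = self.backoff_coefficient.saturating_pow(failed_attempts - 1);
        let delay = self.initial_interval.saturating_mul(factor);
        // Never wait longer than the configured ceiling.
        Some(delay.min(self.max_interval))
    }
}
```

With a 1 s initial interval, a coefficient of 2, and a 5 s ceiling, the delays grow 1 s, 2 s, 4 s, and are then capped at 5 s until the attempts run out.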
Phase 2 Features
MVCC-based State Management
Workflow executions now use Multi-Version Concurrency Control (MVCC) for:
- Versioned State: Each state change creates a new version with TxId
- Point-in-Time Queries: Query workflow state at any transaction point
- Concurrent Access: Multiple readers can access different versions simultaneously
- Conflict Resolution: Optimistic concurrency control for state updates
// Get workflow state at a specific transaction
let execution_at_tx = engine.get_execution_at_tx(/* ... */).await;
// Get the complete version history
let history = engine.get_execution_history(/* ... */).await;
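The ideas of versioned state, point-in-time reads, and optimistic conflict detection can be sketched with an ordered map keyed by transaction id. The `VersionedState` type below and its methods are hypothetical; workflow state is reduced to a string, and nothing here reflects how Kotoba's graph store is actually implemented.

```rust
use std::collections::BTreeMap;

type TxId = u64;

/// Append-only version chain for one workflow execution (illustrative only).
#[derive(Default)]
struct VersionedState {
    versions: BTreeMap<TxId, String>,
}

impl VersionedState {
    fn latest_tx(&self) -> Option<TxId> {
        self.versions.keys().next_back().copied()
    }

    /// Optimistic write: succeeds only if the caller read the latest version
    /// and the new TxId is newer than it.
    fn commit(&mut self, read_tx: Option<TxId>, tx: TxId, state: &str) -> Result<(), String> {
        let latest = self.latest_tx();
        if latest != read_tx {
            return Err(format!("conflict: read {:?}, latest is {:?}", read_tx, latest));
        }
        if latest.map_or(false, |l| tx <= l) {
            return Err(format!("TxId {} is not newer than {:?}", tx, latest));
        }
        self.versions.insert(tx, state.to_string());
        Ok(())
    }

    /// Point-in-time read: the newest version with TxId <= `tx`.
    fn at(&self, tx: TxId) -> Option<&str> {
        self.versions.range(..=tx).next_back().map(|(_, s)| s.as_str())
    }

    /// Full version history, oldest first.
    fn history(&self) -> Vec<(TxId, &str)> {
        self.versions.iter().map(|(tx, s)| (*tx, s.as_str())).collect()
    }
}
```

Because old versions are never overwritten, a reader asking for the state at transaction 3 keeps getting the same answer even after later commits.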
Event Sourcing
Complete audit trail with event sourcing:
- Immutable Events: All state changes recorded as events
- Event Replay: Rebuild workflow state from event history
- Event Types: Started, ActivityScheduled, Completed, Failed, etc.
- Performance Optimization: Automatic snapshot creation
// Get the full event history
let events = engine.get_event_history(/* ... */).await?;
// Rebuild an execution from its events (for recovery)
let execution = engine.rebuild_execution_from_events(/* ... */).await?;
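Event replay amounts to folding the event history over an initial state. The sketch below assumes a simplified event set: `Started`, `ActivityScheduled`, `Completed`, and `Failed` come from the list above, while `ActivityCompleted`, the `Status` enum, and the `Execution` struct are illustrative assumptions.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Started,
    ActivityScheduled(String),
    ActivityCompleted(String), // assumed event type, not named in the list above
    Completed,
    Failed(String),
}

#[derive(Debug, Clone, PartialEq)]
enum Status {
    NotStarted,
    Running,
    Completed,
    Failed(String),
}

/// Hypothetical in-memory view of one execution.
#[derive(Debug, Clone, PartialEq)]
struct Execution {
    status: Status,
    pending: Vec<String>,
    finished: Vec<String>,
}

/// Pure transition function: previous state plus one event gives the next state.
fn apply(mut exec: Execution, event: &Event) -> Execution {
    match event {
        Event::Started => exec.status = Status::Running,
        Event::ActivityScheduled(name) => exec.pending.push(name.clone()),
        Event::ActivityCompleted(name) => {
            exec.pending.retain(|p| p != name);
            exec.finished.push(name.clone());
        }
        Event::Completed => exec.status = Status::Completed,
        Event::Failed(reason) => exec.status = Status::Failed(reason.clone()),
    }
    exec
}

/// Rebuilds the execution by replaying every event in order.
fn rebuild(events: &[Event]) -> Execution {
    let initial = Execution {
        status: Status::NotStarted,
        pending: Vec::new(),
        finished: Vec::new(),
    };
    events.iter().fold(initial, apply)
}
```

Since `apply` has no side effects, replaying the same history always yields the same `Execution`, which is what makes recovery from the event log possible.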
Distributed Execution
Cluster-wide workflow distribution:
- Load Balancing: Round-robin and least-loaded strategies
- Node Management: Automatic node discovery and health monitoring
- Failover: Automatic task reassignment on node failure
- Cluster Health: Real-time monitoring of cluster status
// Enable distributed execution
engine.enable_distributed_execution(/* ... */);
// Submit a workflow for distributed execution
let task_id = engine.submit_distributed_workflow(/* ... */).await?;
// Check cluster health
let health = engine.get_cluster_health(/* ... */).await?;
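The two load-balancing strategies named above can be sketched as follows. The `Node` struct, the `RoundRobin` picker, and `least_loaded` are hypothetical; skipping unhealthy nodes stands in for failover, and real node discovery and health checks are out of scope.

```rust
/// Hypothetical view of a cluster node as seen by the scheduler.
#[derive(Debug, Clone)]
struct Node {
    id: &'static str,
    active_tasks: usize,
    healthy: bool,
}

/// Round-robin picker that skips unhealthy nodes.
struct RoundRobin {
    next: usize,
}

impl RoundRobin {
    fn pick<'a>(&mut self, nodes: &'a [Node]) -> Option<&'a Node> {
        let n = nodes.len();
        // Try each node at most once, starting from the remembered position.
        for offset in 0..n {
            let idx = (self.next + offset) % n;
            if nodes[idx].healthy {
                self.next = (idx + 1) % n;
                return Some(&nodes[idx]);
            }
        }
        None
    }
}

/// Least-loaded strategy: the healthy node with the fewest active tasks.
fn least_loaded(nodes: &[Node]) -> Option<&Node> {
    nodes
        .iter()
        .filter(|node| node.healthy)
        .min_by_key(|node| node.active_tasks)
}
```

Both functions return `None` when no healthy node exists, which a caller could treat as a signal to queue the task until the cluster recovers.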
Snapshot Optimization
Performance optimization for long-running workflows:
- Automatic Snapshots: Periodic state snapshots to reduce replay time
- Configurable Intervals: Customize snapshot frequency
- Fast Recovery: Restore from snapshots + recent events
- Storage Efficiency: Automatic cleanup of old snapshots
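A rough sketch of how snapshots shorten recovery: take a snapshot every `interval` events, keep only the newest `keep` snapshots, and recover from the latest snapshot plus the events recorded after it. The `Journal` and `State` types are hypothetical, and state is reduced to a step counter for brevity.

```rust
#[derive(Debug, Clone, PartialEq, Default)]
struct State {
    steps_done: usize,
    last_step: Option<String>,
}

fn apply(mut state: State, event: &str) -> State {
    state.steps_done += 1;
    state.last_step = Some(event.to_string());
    state
}

/// Hypothetical event journal with periodic snapshots.
struct Journal {
    events: Vec<String>,
    /// (number of events covered, state at that point), oldest first.
    snapshots: Vec<(usize, State)>,
    interval: usize,
    keep: usize,
}

impl Journal {
    fn new(interval: usize, keep: usize) -> Self {
        assert!(interval > 0, "snapshot interval must be positive");
        Journal { events: Vec::new(), snapshots: Vec::new(), interval, keep }
    }

    fn append(&mut self, event: &str) {
        self.events.push(event.to_string());
        if self.events.len() % self.interval == 0 {
            let state = self.recover().0;
            self.snapshots.push((self.events.len(), state));
            // Cleanup: drop the oldest snapshots beyond the retention limit.
            let excess = self.snapshots.len().saturating_sub(self.keep);
            self.snapshots.drain(..excess);
        }
    }

    /// Returns the recovered state and how many events had to be replayed.
    fn recover(&self) -> (State, usize) {
        let (start, base) = self.snapshots.last().cloned().unwrap_or_default();
        let tail = &self.events[start..];
        (tail.iter().fold(base, |s, e| apply(s, e)), tail.len())
    }
}
```

With an interval of 3, recovering after 7 events replays a single event instead of all 7.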
Phase 3 Features
Advanced Saga Pattern Implementation
Enterprise-grade Saga pattern support with comprehensive compensation logic:
- Saga State Management: Complete lifecycle tracking of Saga transactions
- Compensation Orchestration: Automatic rollback with custom compensation strategies
- Dependency Resolution: Intelligent handling of Activity dependencies
- Saga Monitoring: Real-time tracking of Saga progress and failures
- Timeout Handling: Configurable timeouts for Saga transactions
// Create an advanced Saga pattern
let saga_pattern = AdvancedSagaPattern { /* ... */ };
// Execute the advanced Saga
let result = saga_engine.execute_advanced_saga(/* ... */).await?;
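The core of compensation orchestration can be sketched in a few lines: run steps in order and, when one fails, run the compensations of the steps that already succeeded. Undoing in reverse order is a common convention and an assumption here, not something the text above specifies; `SagaStep` and `run_saga` are hypothetical names.

```rust
/// Hypothetical saga step: a forward action plus the name of its compensation.
struct SagaStep {
    name: &'static str,
    action: fn() -> Result<(), String>,
    compensation: &'static str,
}

/// Runs steps in order. On the first failure, records compensations for the
/// completed steps in reverse order and stops.
/// Returns whether the saga succeeded, plus a log of what happened.
fn run_saga(steps: &[SagaStep]) -> (bool, Vec<String>) {
    let mut log = Vec::new();
    let mut done: Vec<&SagaStep> = Vec::new();
    for step in steps {
        match (step.action)() {
            Ok(()) => {
                log.push(format!("ok:{}", step.name));
                done.push(step);
            }
            Err(reason) => {
                log.push(format!("failed:{}:{}", step.name, reason));
                // Undo the most recent successful step first.
                for completed in done.iter().rev() {
                    log.push(format!("compensate:{}", completed.compensation));
                }
                return (false, log);
            }
        }
    }
    (true, log)
}
```

The failed step itself is not compensated, since it never completed; only the steps before it are undone.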
Comprehensive Monitoring & Observability
Production-ready monitoring with metrics, tracing, and logging:
- Metrics Collection: Counter, Gauge, Histogram, and Summary metrics
- Distributed Tracing: End-to-end request tracing with spans and events
- Structured Logging: Contextual logging with execution and activity tracking
- Health Checks: System health monitoring with component status
- Performance Analytics: Workflow and Activity performance statistics
// Configure monitoring
let monitoring_config = MonitoringConfig { /* ... */ };
// Track a workflow execution event
monitor.track_workflow_event(/* ... */).await?;
// Get performance stats
let stats = monitor.get_workflow_stats(/* ... */)?;
println!("{:?}", stats);
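As an illustration of what per-workflow performance statistics might involve, the sketch below aggregates run counts and durations in memory. The `Monitor` and `WorkflowStats` types are hypothetical stand-ins and do not reflect Itonami's monitoring API.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical aggregate for one workflow definition.
#[derive(Debug, Default)]
struct WorkflowStats {
    total: u32,
    succeeded: u32,
    total_duration: Duration,
}

impl WorkflowStats {
    /// Fraction of runs that succeeded, in the range 0.0..=1.0.
    fn success_rate(&self) -> f64 {
        if self.total == 0 {
            0.0
        } else {
            f64::from(self.succeeded) / f64::from(self.total)
        }
    }

    /// Mean run duration, or `None` when nothing has been recorded yet.
    fn mean_duration(&self) -> Option<Duration> {
        if self.total == 0 {
            None
        } else {
            Some(self.total_duration / self.total)
        }
    }
}

/// In-memory collector keyed by workflow id (illustrative only).
#[derive(Default)]
struct Monitor {
    per_workflow: HashMap<String, WorkflowStats>,
}

impl Monitor {
    fn track(&mut self, workflow_id: &str, succeeded: bool, took: Duration) {
        let stats = self.per_workflow.entry(workflow_id.to_string()).or_default();
        stats.total += 1;
        if succeeded {
            stats.succeeded += 1;
        }
        stats.total_duration += took;
    }

    fn stats(&self, workflow_id: &str) -> Option<&WorkflowStats> {
        self.per_workflow.get(workflow_id)
    }
}
```

A production collector would more likely export counters and histograms to a metrics backend; this version only shows the bookkeeping.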
Intelligent Workflow Optimization
Advanced optimization engine with multiple strategies:
- Cost-Based Optimization: Minimize execution costs based on resource pricing
- Performance Optimization: Maximize throughput and minimize latency
- Parallel Execution Planning: Automatic detection and optimization of parallelizable tasks
- Resource-Aware Scheduling: Consider resource constraints in optimization
- Historical Learning: Use execution history to improve future optimizations
// Create optimization engine
let optimizer = new;
// Add optimization rules
optimizer.add_rule(/* ... */);
optimizer.add_rule(/* ... */);
// Optimize the workflow
let result = optimizer.optimize_workflow(/* ... */).await?;
println!("{:?}", result);
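Parallel execution planning can be approximated by grouping activities into stages, where every activity in a stage depends only on earlier stages. The `plan_stages` function is a hypothetical sketch of that idea (a layered topological sort), not the optimizer's actual algorithm.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Groups tasks into stages that could run in parallel.
/// `deps` maps each task to the tasks it depends on.
/// Returns an error if there is a cycle or a dependency on an unknown task.
fn plan_stages<'a>(deps: &BTreeMap<&'a str, Vec<&'a str>>) -> Result<Vec<Vec<String>>, String> {
    let mut remaining: BTreeSet<&str> = deps.keys().copied().collect();
    let mut done: BTreeSet<&str> = BTreeSet::new();
    let mut stages = Vec::new();

    while !remaining.is_empty() {
        // A task is ready once all of its dependencies have finished.
        let ready: Vec<&str> = remaining
            .iter()
            .copied()
            .filter(|task| deps[task].iter().all(|dep| done.contains(dep)))
            .collect();
        if ready.is_empty() {
            return Err("dependency cycle or unknown dependency".to_string());
        }
        for task in &ready {
            remaining.remove(task);
            done.insert(*task);
        }
        stages.push(ready.iter().map(|task| task.to_string()).collect());
    }
    Ok(stages)
}
```

For the order-processing example, payment and inventory both depend only on validation, so they land in the same stage and can run concurrently.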
External System Integrations
Seamless integration with external systems:
- HTTP Integration: REST API calls with retry and timeout support
- Database Integration: SQL database operations with connection pooling
- Message Queue Integration: Publish/consume messages with AMQP support
- Cloud Storage Integration: AWS S3, GCP Cloud Storage, Azure Blob Storage
- Email Integration: SMTP-based email sending
- Webhook Integration: HTTP webhook notifications
// Setup integrations
let mut integration_manager = new;
// Add an HTTP integration
integration_manager.register_integration(/* ... */);
// Execute the integration
let result = integration_manager.execute_integration(/* ... */).await?;
println!("{:?}", result);
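One way to picture a registry of integrations is a map from names to trait objects. In the sketch below, the `Integration` trait, the `StubWebhook` type, and this `IntegrationManager` are all hypothetical and synchronous; the stub makes no network call and only formats the request it would send.

```rust
use std::collections::HashMap;

/// Hypothetical integration interface.
trait Integration {
    fn execute(&self, payload: &str) -> Result<String, String>;
}

/// Stand-in for a webhook integration; it performs no I/O.
struct StubWebhook {
    url: String,
}

impl Integration for StubWebhook {
    fn execute(&self, payload: &str) -> Result<String, String> {
        Ok(format!("POST {} <- {}", self.url, payload))
    }
}

/// Registry that dispatches by integration name (illustrative only).
#[derive(Default)]
struct IntegrationManager {
    integrations: HashMap<String, Box<dyn Integration>>,
}

impl IntegrationManager {
    fn register(&mut self, name: &str, integration: Box<dyn Integration>) {
        self.integrations.insert(name.to_string(), integration);
    }

    fn execute(&self, name: &str, payload: &str) -> Result<String, String> {
        self.integrations
            .get(name)
            .ok_or_else(|| format!("unknown integration: {}", name))?
            .execute(payload)
    }
}
```

Dispatching through a trait object keeps the manager unaware of concrete integration types, so a new kind of integration only needs to implement the trait.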
Advanced Features Summary
Feature | Description | Benefits |
---|---|---|
Saga Patterns | Distributed transaction management with compensation | Reliable complex workflows |
Monitoring | Metrics, tracing, logging, health checks | Production observability |
Optimization | Cost-based, performance, parallel execution | Efficient resource usage |
Integrations | HTTP, DB, MQ, Cloud, Email, Webhooks | Seamless system connectivity |
Health Checks | Component status monitoring | Proactive failure detection |
Performance Analytics | Historical performance analysis | Continuous improvement |
Usage
Basic Example
use ;
async
Workflow Definition (.kotoba)
{
  workflow: {
    id: "order_processing",
    name: "Order Processing Workflow",
    version: "1.0.0",
    inputs: [
      { name: "orderId", type: "string", required: true },
      { name: "customerId", type: "string", required: true },
      { name: "amount", type: "number", required: true },
    ],
    outputs: [
      { name: "processed", type: "boolean" },
      { name: "confirmationId", type: "string" },
    ],
    strategy: {
      op: "saga",
      main_flow: {
        op: "seq",
        strategies: [
          {
            op: "activity",
            activity_ref: "validate_order",
            input_mapping: {
              order_id: "$.inputs.orderId",
              customer_id: "$.inputs.customerId",
            },
          },
          {
            op: "parallel",
            branches: [
              {
                op: "activity",
                activity_ref: "process_payment",
                input_mapping: { amount: "$.inputs.amount" },
              },
              {
                op: "activity",
                activity_ref: "reserve_inventory",
                input_mapping: { order_id: "$.inputs.orderId" },
              },
            ],
          },
          {
            op: "activity",
            activity_ref: "send_confirmation",
          },
        ],
      },
      compensation: {
        op: "seq",
        strategies: [
          { op: "activity", activity_ref: "cancel_payment" },
          { op: "activity", activity_ref: "release_inventory" },
          { op: "activity", activity_ref: "send_failure_notification" },
        ],
      },
    },
    timeout: "PT30M",
  },
}
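The `input_mapping` entries in the definition above map activity parameters to paths such as `$.inputs.orderId`. A minimal sketch of resolving such paths follows; it assumes that the `$.inputs.` prefix simply selects a workflow input by name and treats all values as strings. How Itonami actually evaluates these paths is not specified here, and `resolve_inputs` is a hypothetical helper.

```rust
use std::collections::HashMap;

/// Resolves `"$.inputs.<name>"` paths against the workflow inputs.
/// Only this single path shape is handled; anything else is rejected.
fn resolve_inputs(
    mapping: &[(&str, &str)],
    inputs: &HashMap<&str, &str>,
) -> Result<HashMap<String, String>, String> {
    let mut resolved = HashMap::new();
    for (param, path) in mapping {
        // Strip the assumed prefix to get the workflow input name.
        let key = path
            .strip_prefix("$.inputs.")
            .ok_or_else(|| format!("unsupported path: {}", path))?;
        let value = inputs
            .get(key)
            .ok_or_else(|| format!("missing workflow input: {}", key))?;
        resolved.insert(param.to_string(), value.to_string());
    }
    Ok(resolved)
}
```

For `validate_order`, the mapping `order_id: "$.inputs.orderId"` would hand the activity an `order_id` parameter holding the workflow's `orderId` input.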
Activity Implementation
use *;
async
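As a rough idea of what an activity could look like, here is a synchronous, std-only sketch. The `Activity` trait, the `Values` alias, and the `ValidateOrder` struct are hypothetical; Itonami's real trait is presumably async and its signatures may differ.

```rust
use std::collections::HashMap;

/// Hypothetical key/value payload passed to and from activities.
type Values = HashMap<String, String>;

/// Hypothetical activity interface (synchronous for simplicity).
trait Activity {
    fn name(&self) -> &str;
    fn execute(&self, inputs: &Values) -> Result<Values, String>;
}

/// Example activity matching the `validate_order` reference used in the
/// workflow definition.
struct ValidateOrder;

impl Activity for ValidateOrder {
    fn name(&self) -> &str {
        "validate_order"
    }

    fn execute(&self, inputs: &Values) -> Result<Values, String> {
        // Fail fast when the required input is absent or empty.
        let order_id = inputs.get("order_id").ok_or("missing input: order_id")?;
        if order_id.is_empty() {
            return Err("order_id must not be empty".to_string());
        }
        let mut outputs = Values::new();
        outputs.insert("valid".to_string(), "true".to_string());
        outputs.insert("order_id".to_string(), order_id.clone());
        Ok(outputs)
    }
}
```

Returning `Result` lets the caller decide whether a failure should trigger a retry or a Saga compensation.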
Comparison with Temporal
Aspect | Temporal | Itonami |
---|---|---|
Execution Model | Strict workflow control | Graph-based declarative execution |
Persistence | Event sourcing + snapshots | MVCC + Merkle DAG |
Language | Go (server); SDKs for Go, Java, TypeScript, Python, and others | Rust (with .kotoba DSL) |
Activity Types | SDK-based | Extensible trait system |
Deployment | Dedicated server | Embedded in Kotoba |
Query Language | SQL-like | GQL integration |
State Management | Temporal server | Kotoba graph store |
Roadmap
Phase 1: Core Implementation ✅
- WorkflowIR definition
- StrategyIR extensions (Temporal patterns)
- Activity system
- Basic execution engine
Phase 2: Persistence & Distribution ✅
- MVCC-based state management
- Event sourcing implementation
- Distributed execution support
- Snapshot optimization
Phase 3: Advanced Features ✅
- Saga pattern full implementation
- Monitoring and observability
- Workflow optimization
- External system integrations
Phase 4: Ecosystem 🌟
- Workflow designer UI
- Pre-built activity libraries
- Kubernetes operator
- Cloud-native integrations
Contributing
Contributions are welcome! Please see the main Kotoba repository for contribution guidelines.
License
Licensed under MIT OR Apache-2.0