duroxide 0.1.27

Durable code execution framework for Rust
Documentation
# SQLite Provider Design for Duroxide

## Status: Implemented ✅

The SQLite provider has been successfully implemented with full transactional support.

## Executive Summary

SQLite is an excellent choice for a transactional Provider because:
- **ACID compliance** with full transactional support
- **Single-file deployment** with zero configuration
- **Excellent performance** for local/embedded scenarios
- **Battle-tested reliability** used in millions of applications
- **Built-in concurrency** with proper locking mechanisms

## Schema Design

### Core Tables

```sql
-- Instance metadata and current state
CREATE TABLE instances (
    instance_id TEXT PRIMARY KEY,
    orchestration_name TEXT NOT NULL,
    orchestration_version TEXT NOT NULL,
    current_execution_id INTEGER NOT NULL DEFAULT 1,
    status TEXT NOT NULL DEFAULT 'Running', -- Running, Completed, Failed, ContinuedAsNew
    parent_instance_id TEXT,  -- For sub-orchestrations: tracks parent for cascade deletion
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (parent_instance_id) REFERENCES instances(instance_id) ON DELETE CASCADE
);

CREATE INDEX idx_instances_parent ON instances(parent_instance_id);

-- Multi-execution support for ContinueAsNew
CREATE TABLE executions (
    instance_id TEXT NOT NULL,
    execution_id INTEGER NOT NULL,
    status TEXT NOT NULL DEFAULT 'Running',
    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP,
    PRIMARY KEY (instance_id, execution_id),
    FOREIGN KEY (instance_id) REFERENCES instances(instance_id)
);

-- Event history (append-only)
CREATE TABLE history (
    instance_id TEXT NOT NULL,
    execution_id INTEGER NOT NULL,
    sequence_num INTEGER NOT NULL,
    event_type TEXT NOT NULL,
    event_data TEXT NOT NULL, -- JSON serialized Event
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (instance_id, execution_id, sequence_num),
    FOREIGN KEY (instance_id, execution_id) REFERENCES executions(instance_id, execution_id)
);

-- Orchestrator queue with visibility support (also handles timers via visible_at)
CREATE TABLE orchestrator_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    instance_id TEXT NOT NULL,
    work_item TEXT NOT NULL, -- JSON serialized WorkItem
    visible_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    lock_token TEXT,
    locked_until TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_visible (visible_at, lock_token),
    INDEX idx_instance (instance_id)
);

-- Worker queue for activity execution
CREATE TABLE worker_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    work_item TEXT NOT NULL, -- JSON serialized WorkItem
    visible_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    lock_token TEXT,
    locked_until TIMESTAMP,
    attempt_count INTEGER NOT NULL DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    session_id TEXT,                  -- Session affinity routing key
    INDEX idx_available (visible_at, lock_token, id),
    INDEX idx_session (session_id)
);

-- Session affinity tracking (flow table)
CREATE TABLE sessions (
    session_id     TEXT PRIMARY KEY,  -- Flow key
    worker_id      TEXT NOT NULL,     -- Owning backend
    locked_until   INTEGER NOT NULL,  -- Heartbeat lease expiry
    last_activity_at INTEGER NOT NULL -- Last activity flow timestamp
);
```

### Key Design Decisions

1. **Separate queues** for orchestrator and worker (timers reuse the orchestrator queue via delayed visibility) to avoid contention
2. **Lock tokens with expiry** for crash recovery
3. **Visibility timestamps** for delayed message processing
4. **Normalized schema** for efficient queries and updates
5. **Session table as a flow table** — routes session-bound activities to owning workers (analogous to network flow affinity)
5. **JSON storage** for flexible event/work item serialization

## Implementation Plan

### Phase 1: Core Provider Structure

```rust
use sqlx::{SqlitePool, Transaction, Sqlite};
use serde_json;
use std::time::{SystemTime, Duration};
use crate::providers::{Provider, WorkItem, OrchestrationItem};
use crate::Event;

pub struct SqliteProvider {
    pool: SqlitePool,
    lock_timeout: Duration,
    history_cap: usize,
}

impl SqliteProvider {
    pub async fn new(database_url: &str) -> Result<Self, sqlx::Error> {
        let pool = SqlitePool::connect(database_url).await?;
        
        // Run migrations
        sqlx::migrate!("./migrations")
            .run(&pool)
            .await?;
        
        Ok(Self {
            pool,
            lock_timeout: Duration::from_secs(30),
            history_cap: 1024,
        })
    }
    
    async fn with_transaction<F, R>(&self, f: F) -> Result<R, String>
    where
        F: FnOnce(&mut Transaction<'_, Sqlite>) -> Result<R, sqlx::Error>,
    {
        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;
        match f(&mut tx) {
            Ok(result) => {
                tx.commit().await.map_err(|e| e.to_string())?;
                Ok(result)
            }
            Err(e) => {
                tx.rollback().await.ok();
                Err(e.to_string())
            }
        }
    }
}
```

### Phase 2: Core Atomic Methods

```rust
#[async_trait::async_trait]
impl Provider for SqliteProvider {
    async fn fetch_orchestration_item(&self) -> Option<OrchestrationItem> {
        let mut tx = self.pool.begin().await.ok()?;
        
        // Get next available instance with work
        let row = sqlx::query!(
            r#"
            SELECT DISTINCT instance_id
            FROM orchestrator_queue
            WHERE visible_at <= datetime('now')
              AND (lock_token IS NULL OR locked_until < datetime('now'))
            ORDER BY visible_at
            LIMIT 1
            "#
        )
        .fetch_optional(&mut tx)
        .await
        .ok()??;
        
        let instance_id = row.instance_id;
        let lock_token = generate_lock_token();
        let locked_until = SystemTime::now() + self.lock_timeout;
        
        // Lock all messages for this instance
        sqlx::query!(
            r#"
            UPDATE orchestrator_queue
            SET lock_token = ?1, locked_until = ?2
            WHERE instance_id = ?3
              AND visible_at <= datetime('now')
              AND (lock_token IS NULL OR locked_until < datetime('now'))
            "#,
            lock_token,
            locked_until,
            instance_id
        )
        .execute(&mut tx)
        .await
        .ok()?;
        
        // Fetch locked messages
        let messages = sqlx::query!(
            r#"
            SELECT id, work_item
            FROM orchestrator_queue
            WHERE lock_token = ?1
            ORDER BY id
            "#,
            lock_token
        )
        .fetch_all(&mut tx)
        .await
        .ok()?;
        
        // Deserialize work items
        let work_items: Vec<WorkItem> = messages
            .iter()
            .filter_map(|r| serde_json::from_str(&r.work_item).ok())
            .collect();
        
        // Load history
        let history = self.read_history_in_tx(&mut tx, &instance_id).await.ok()?;
        
        tx.commit().await.ok()?;
        
        Some(OrchestrationItem {
            instance: instance_id,
            messages: work_items,
            history,
            lock_token,
        })
    }
    
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
    ) -> Result<(), String> {
        self.with_transaction(|tx| async move {
            // 1. Delete acknowledged messages
            sqlx::query!(
                "DELETE FROM orchestrator_queue WHERE lock_token = ?",
                lock_token
            )
            .execute(tx)
            .await?;
            
            // 2. Get instance from the lock
            let instance = self.get_instance_from_lock(tx, lock_token).await?;
            
            // 3. Append history
            self.append_history_in_tx(tx, &instance, history_delta).await?;
            
            // 4. Enqueue worker items
            for item in worker_items {
                self.enqueue_worker_in_tx(tx, item).await?;
            }
            
            // 5. Enqueue orchestrator items (may include TimerFired with delayed visibility)
            for item in orchestrator_items {
                self.enqueue_orchestrator_in_tx(tx, item).await?;
            }
            
            Ok(())
        }).await
    }
}
```

## Advantages Over Filesystem Provider

### 1. **True ACID Transactions**
```sql
BEGIN TRANSACTION;
-- All operations succeed or fail together
DELETE FROM orchestrator_queue WHERE lock_token = ?;
INSERT INTO history ...;
INSERT INTO worker_queue ...;
-- TimerFired items reuse orchestrator_queue via delayed visibility
INSERT INTO orchestrator_queue ...;
COMMIT;
```