sayiir-persistence 0.1.0-alpha.13

Pluggable persistence backends for Sayiir workflow snapshots

sayiir-persistence

Persistence layer for distributed workflow execution with checkpoint/restore capabilities.

Overview

This crate provides traits and implementations for persisting workflow execution state, enabling:

  • Distributed execution: Multiple worker nodes can execute tasks from the same workflow
  • Fault tolerance: Workflows can be resumed after crashes
  • Task claiming: Atomic task claiming prevents duplicate execution
  • Flexible backends: Implement custom storage (Redis, PostgreSQL, etc.)

Quick Start

use sayiir_persistence::{InMemoryBackend, PersistentBackend};
use sayiir_core::snapshot::WorkflowSnapshot;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a backend
    let backend = InMemoryBackend::new();
    
    // Save a workflow snapshot
    let snapshot = WorkflowSnapshot::new(
        "instance-123".to_string(),
        "workflow-hash".to_string()
    );
    backend.save_snapshot(snapshot).await?;
    
    // Load it back
    let loaded = backend.load_snapshot("instance-123").await?;
    println!("Loaded workflow: {}", loaded.instance_id);
    
    Ok(())
}

Implementing Custom Backends

To create a custom persistence backend (e.g., for Redis, PostgreSQL, or any other storage):

1. Add Dependencies

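[dependencies]
sayiir-persistence = { .. }
sayiir-core = { .. }
async-trait = { .. }
# Additionally used by the Redis example in step 2
redis = { .. }
serde_json = { .. }
chrono = { .. }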

2. Implement PersistentBackend

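use sayiir_persistence::{PersistentBackend, BackendError};
use sayiir_core::snapshot::WorkflowSnapshot;
use sayiir_core::task_claim::{TaskClaim, AvailableTask};
use async_trait::async_trait;
use chrono::Duration;
// Brings the async `get`/`set` commands into scope for connections
use redis::AsyncCommands;

pub struct RedisBackend {
    client: redis::Client,
}

#[async_trait]
impl PersistentBackend for RedisBackend {
    async fn save_snapshot(&self, snapshot: WorkflowSnapshot) -> Result<(), BackendError> {
        // Serialize the snapshot to JSON bytes
        let serialized = serde_json::to_vec(&snapshot)
            .map_err(|e| BackendError::Serialization(e.to_string()))?;

        // `redis::Client` only opens connections; commands run on a connection
        let mut conn = self.client
            .get_multiplexed_async_connection()
            .await
            .map_err(|e| BackendError::Backend(e.to_string()))?;

        let _: () = conn
            .set(format!("workflow:{}", snapshot.instance_id), serialized)
            .await
            .map_err(|e| BackendError::Backend(e.to_string()))?;

        Ok(())
    }

    async fn load_snapshot(&self, instance_id: &str) -> Result<WorkflowSnapshot, BackendError> {
        let mut conn = self.client
            .get_multiplexed_async_connection()
            .await
            .map_err(|e| BackendError::Backend(e.to_string()))?;

        // A missing key comes back as `None`; connection failures stay `Backend` errors
        let data: Option<Vec<u8>> = conn
            .get(format!("workflow:{}", instance_id))
            .await
            .map_err(|e| BackendError::Backend(e.to_string()))?;
        let data = data.ok_or_else(|| BackendError::NotFound(instance_id.to_string()))?;

        serde_json::from_slice(&data)
            .map_err(|e| BackendError::Serialization(e.to_string()))
    }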
    
    // Implement the other methods; `todo!()` marks bodies left to fill in
    async fn delete_snapshot(&self, instance_id: &str) -> Result<(), BackendError> {
        todo!()
    }

    async fn list_snapshots(&self) -> Result<Vec<String>, BackendError> {
        todo!()
    }
    
    async fn claim_task(
        &self,
        instance_id: &str,
        task_id: &str,
        worker_id: &str,
        ttl: Option<Duration>,
    ) -> Result<Option<TaskClaim>, BackendError> {
        // Claim atomically, e.g. with Redis `SET key value NX PX <ttl-ms>`,
        // which sets the key and its expiry in a single command
        todo!()
    }
    
    async fn release_task_claim(
        &self,
        instance_id: &str,
        task_id: &str,
        worker_id: &str,
    ) -> Result<(), BackendError> {
        todo!()
    }
    
    async fn extend_task_claim(
        &self,
        instance_id: &str,
        task_id: &str,
        worker_id: &str,
        additional_duration: Duration,
    ) -> Result<(), BackendError> {
        todo!()
    }
    
    async fn find_available_tasks(
        &self,
        worker_id: &str,
        limit: usize,
    ) -> Result<Vec<AvailableTask>, BackendError> {
        // Query for unclaimed, ready-to-execute tasks
        todo!()
    }
}

Key Concepts

Snapshots

Snapshots capture the complete execution state of a workflow:

  • Which tasks have completed
  • Task outputs
  • Current execution position
  • Initial workflow input
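
To make the checkpoint/restore idea concrete, here is a minimal sketch of resuming a linear workflow from a snapshot. The `Snapshot` struct, the `resume` function, and the two example tasks are hypothetical and use only the standard library; the real `WorkflowSnapshot` in sayiir-core may be shaped quite differently.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified snapshot (not the crate's `WorkflowSnapshot`).
#[derive(Debug, Clone, Default)]
pub struct Snapshot {
    pub instance_id: String,
    /// Initial workflow input.
    pub input: String,
    /// Outputs of completed tasks, keyed by task id.
    pub outputs: HashMap<String, String>,
    /// Index of the next task to run (current execution position).
    pub position: usize,
}

/// Example tasks: each takes the previous output and returns a new one.
pub fn trim_task(s: &str) -> String {
    s.trim().to_string()
}

pub fn upper_task(s: &str) -> String {
    s.to_uppercase()
}

/// Runs tasks in order starting at `snapshot.position`, calling `checkpoint`
/// after every completed task. Tasks before the position are skipped.
pub fn resume(
    tasks: &[(&str, fn(&str) -> String)],
    snapshot: &mut Snapshot,
    mut checkpoint: impl FnMut(&Snapshot),
) {
    while snapshot.position < tasks.len() {
        let (id, run) = tasks[snapshot.position];
        // The first task reads the workflow input; later tasks read the
        // recorded output of the task before them.
        let input = if snapshot.position == 0 {
            snapshot.input.clone()
        } else {
            snapshot.outputs[tasks[snapshot.position - 1].0].clone()
        };
        let output = run(&input);
        snapshot.outputs.insert(id.to_string(), output);
        snapshot.position += 1;
        checkpoint(&*snapshot);
    }
}
```

In this sketch, a worker that restarts after a crash loads the last checkpointed `Snapshot` and calls `resume` again; only tasks at or after `position` run, and their inputs come from the stored outputs.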

Task Claims

Task claims enable distributed execution:

  • Atomic claiming: Only one worker can claim a task
  • Expiration: Claims can carry a TTL, so a task held by a crashed worker becomes claimable again once the claim expires
  • Extension: Long-running tasks can extend their claims
  • Release: Workers release claims when done
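
The four behaviours above can be sketched with an in-process claim table. This is an illustration only: `Claim` and `ClaimTable` are hypothetical names, the table is synchronous, and it takes the current time as a parameter so expiry is easy to reason about. It is not how `InMemoryBackend` or the `PersistentBackend` trait is actually implemented.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Hypothetical claim record (not the crate's `TaskClaim`).
#[derive(Debug, Clone, PartialEq)]
pub struct Claim {
    pub worker_id: String,
    /// `None` means the claim never expires.
    pub expires_at: Option<Instant>,
}

/// Hypothetical claim table keyed by (instance_id, task_id).
#[derive(Default)]
pub struct ClaimTable {
    claims: Mutex<HashMap<(String, String), Claim>>,
}

impl ClaimTable {
    fn key(instance_id: &str, task_id: &str) -> (String, String) {
        (instance_id.to_string(), task_id.to_string())
    }

    /// Atomic claiming: the liveness check and the insert happen under one
    /// lock. Returns `None` while a live claim on the task exists.
    pub fn claim(
        &self,
        instance_id: &str,
        task_id: &str,
        worker_id: &str,
        ttl: Option<Duration>,
        now: Instant,
    ) -> Option<Claim> {
        let mut claims = self.claims.lock().unwrap();
        let key = Self::key(instance_id, task_id);
        if let Some(existing) = claims.get(&key) {
            let expired = existing.expires_at.map_or(false, |t| t <= now);
            if !expired {
                return None;
            }
        }
        let claim = Claim {
            worker_id: worker_id.to_string(),
            expires_at: ttl.map(|d| now + d),
        };
        claims.insert(key, claim.clone());
        Some(claim)
    }

    /// Extension: only the current owner can push the expiry further out.
    pub fn extend(
        &self,
        instance_id: &str,
        task_id: &str,
        worker_id: &str,
        additional: Duration,
    ) -> bool {
        let mut claims = self.claims.lock().unwrap();
        match claims.get_mut(&Self::key(instance_id, task_id)) {
            Some(claim) if claim.worker_id == worker_id => {
                if let Some(expiry) = claim.expires_at.as_mut() {
                    *expiry += additional;
                }
                true
            }
            _ => false,
        }
    }

    /// Release: removes the claim only if `worker_id` owns it.
    pub fn release(&self, instance_id: &str, task_id: &str, worker_id: &str) -> bool {
        let mut claims = self.claims.lock().unwrap();
        let key = Self::key(instance_id, task_id);
        let owned = claims.get(&key).map_or(false, |c| c.worker_id == worker_id);
        if owned {
            claims.remove(&key);
        }
        owned
    }
}
```

A shared backend has to provide the same check-and-insert guarantee across processes, which a process-local mutex cannot do; that is why the claim step needs an atomic primitive in the storage system itself.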

Backend Requirements

A production backend should provide:

  • Durability: Snapshots survive process crashes
  • Atomicity: Task claims must be atomic (use database transactions, Redis SET with the NX option, etc.)
  • Consistency: Multiple workers see consistent state
  • Performance: Efficient querying for available tasks

Built-in Backends

InMemoryBackend

Provided for testing and development:

  • No persistence across restarts
  • Good for unit tests and prototyping

Architecture

┌─────────────────────┐
│  Workflow Runtime   │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│ PersistentBackend   │ ◄─── Trait you implement
│      (trait)        │
└──────────┬──────────┘
           │
     ┌─────┴─────┬───────────┬──────────────┐
     ▼           ▼           ▼              ▼
┌─────────┐ ┌─────────┐ ┌─────────┐   ┌─────────┐
│ Memory  │ │  Redis  │ │Postgres │   │  Your   │
│ Backend │ │ Backend │ │ Backend │   │ Backend │
└─────────┘ └─────────┘ └─────────┘   └─────────┘
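
The layering in the diagram can be sketched as a runtime that holds a trait object and never names a concrete backend. Everything here is a simplified stand-in: `SnapshotStore`, `MemoryStore`, and `Runtime` are hypothetical, synchronous, and std-only, whereas the real `PersistentBackend` trait is async and has more methods.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Simplified, synchronous stand-in for the `PersistentBackend` trait.
pub trait SnapshotStore: Send + Sync {
    fn save(&self, instance_id: &str, bytes: Vec<u8>) -> Result<(), String>;
    fn load(&self, instance_id: &str) -> Result<Vec<u8>, String>;
}

/// One concrete backend; a Redis or Postgres store would implement the
/// same trait.
#[derive(Default)]
pub struct MemoryStore {
    data: Mutex<HashMap<String, Vec<u8>>>,
}

impl SnapshotStore for MemoryStore {
    fn save(&self, instance_id: &str, bytes: Vec<u8>) -> Result<(), String> {
        self.data.lock().unwrap().insert(instance_id.to_string(), bytes);
        Ok(())
    }

    fn load(&self, instance_id: &str) -> Result<Vec<u8>, String> {
        self.data
            .lock()
            .unwrap()
            .get(instance_id)
            .cloned()
            .ok_or_else(|| format!("snapshot not found: {instance_id}"))
    }
}

/// The runtime depends only on the trait, so backends are interchangeable.
pub struct Runtime {
    store: Arc<dyn SnapshotStore>,
}

impl Runtime {
    pub fn new(store: Arc<dyn SnapshotStore>) -> Self {
        Self { store }
    }

    /// Persist the given state under the workflow instance id.
    pub fn checkpoint(&self, instance_id: &str, state: &str) -> Result<(), String> {
        self.store.save(instance_id, state.as_bytes().to_vec())
    }

    /// Load the last persisted state for the workflow instance.
    pub fn restore(&self, instance_id: &str) -> Result<String, String> {
        let bytes = self.store.load(instance_id)?;
        String::from_utf8(bytes).map_err(|e| e.to_string())
    }
}
```

Swapping backends then means passing a different `Arc<dyn SnapshotStore>` to `Runtime::new`, for example `Runtime::new(Arc::new(MemoryStore::default()))` in tests.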