Crate apalis_libsql

§apalis-libsql

Native libSQL storage backend for Apalis background job processing.

Problem: Apalis's existing SQLite backend uses sqlx, which doesn't work with Turso's embedded replica model or edge deployments.

Solution: This crate provides a native libSQL driver that enables:

  • Local-first development with SQLite
  • Embedded replicas that sync with Turso Cloud
  • Edge deployment (Cloudflare Workers, Fly.io, etc.)
  • Automatic cloud sync without managing connections

§Performance

Benchmarks run on bare metal (not containerized):

Spec        Value
CPU         AMD Ryzen 9 5950X (16-core, 4.6GHz)
RAM         128GB DDR4
Storage     Samsung MZQL2 NVMe (enterprise SSD)
OS          Linux (bare metal)

Results:

Raw write IOPS:     ~117K/sec  (single INSERTs, worst case)
Batched writes:     ~578K/sec  (transaction batching, best case)
Read IOPS:          ~272K/sec  (primary key lookups)
Transaction TPS:    ~78K/sec   (BEGIN/UPDATE x2/COMMIT)

What this means:

  • Your mileage will vary based on hardware
  • NVMe storage is critical for write performance
  • These are local SQLite numbers, not network-bound Turso Cloud
  • Containerized/VM performance will be lower

Run your own benchmarks: cargo test --test perf_test --release -- --nocapture
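
The gap between the single-INSERT and batched figures comes from amortizing the commit: many rows share one transaction instead of paying one fsync per row. A minimal sketch of that pattern against a plain libsql connection (the `bench.db` file and `events` table are hypothetical and unrelated to the crate's job tables):

use libsql::Builder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = Builder::new_local("bench.db").build().await?;
    let conn = db.connect()?;
    conn.execute(
        "CREATE TABLE IF NOT EXISTS events (id INTEGER PRIMARY KEY, payload TEXT)",
        (),
    )
    .await?;

    // Worst case: each INSERT runs in its own implicit transaction (one commit per row).
    for i in 0..1_000 {
        conn.execute("INSERT INTO events (payload) VALUES (?1)", libsql::params![i])
            .await?;
    }

    // Best case: the same INSERTs share a single explicit transaction and one commit.
    conn.execute("BEGIN", ()).await?;
    for i in 0..1_000 {
        conn.execute("INSERT INTO events (payload) VALUES (?1)", libsql::params![i])
            .await?;
    }
    conn.execute("COMMIT", ()).await?;

    Ok(())
}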

§Architecture

+-----------------+     +------------------+     +-----------------+
|   Your App      |---->|  Local Replica   |<----|  Turso Cloud    |
|                 |     |  (SQLite file)   |     |  (Distributed)  |
+-----------------+     +------------------+     +-----------------+
                               |                         |
                               v                         v
                        +------------------+     +------------------+
                        |  Apalis Workers  |     |  Other Replicas  |
                        |  Process Tasks   |     |  Edge Locations  |
                        +------------------+     +------------------+

How it works: Your app writes to a local SQLite file. libSQL automatically syncs changes with Turso Cloud, which distributes them to other replicas. It works offline and syncs when connected.

§When to Use This vs apalis-sqlite

Use apalis-sqlite:

  • Standard SQLite deployment
  • Compile-time SQL query validation
  • Single-machine applications
  • Traditional server environments

Use apalis-libsql:

  • Turso Cloud integration needed
  • Embedded replica model (local-first)
  • Edge deployment (Cloudflare Workers, Fly.io)
  • Multi-region applications
  • Offline-first requirements

§Quick Start

§Local Development

use apalis_libsql::LibsqlStorage;
use libsql::Builder;
use serde::{Serialize, Deserialize};
use apalis_core::backend::TaskSink;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Email {
    to: String,
    subject: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Local SQLite database
    let db = Builder::new_local("tasks.db").build().await?;
    let db: &'static _ = Box::leak(Box::new(db));
    
    let mut storage = LibsqlStorage::<Email, ()>::new(db);
    storage.setup().await?;

    // Push tasks
    storage.push(Email {
        to: "user@example.com".to_string(),
        subject: "Hello!".to_string(),
    }).await?;
    
    Ok(())
}

§Turso Cloud (Embedded Replica)

use apalis_libsql::LibsqlStorage;
use libsql::Builder;
use serde::{Serialize, Deserialize};
use apalis_core::backend::TaskSink;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Email {
    to: String,
    subject: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Embedded replica: a local SQLite file that syncs with Turso Cloud
    let db = Builder::new_remote_replica(
        "local_replica.db",
        "libsql://your-db.turso.io".to_string(),
        "your-auth-token".to_string()
    ).build().await?;
    db.sync().await?;

    let db: &'static _ = Box::leak(Box::new(db));
    let mut storage = LibsqlStorage::<Email, ()>::new(db);
    storage.setup().await?;

    // Tasks automatically sync with Turso Cloud
    storage.push(Email {
        to: "user@example.com".to_string(),
        subject: "Hello from Turso!".to_string(),
    }).await?;
    
    Ok(())
}

§Task Processing

use apalis_libsql::LibsqlStorage;
use libsql::Builder;
use serde::{Serialize, Deserialize};
use apalis_core::backend::{TaskSink, Backend};
use apalis_core::worker::context::WorkerContext;
use futures::StreamExt;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Email {
    to: String,
    subject: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = Builder::new_local("tasks.db").build().await?;
    let db: &'static _ = Box::leak(Box::new(db));
    
    let mut storage = LibsqlStorage::<Email, ()>::new(db);
    storage.setup().await?;

    // Push some tasks
    storage.push(Email {
        to: "user@example.com".to_string(),
        subject: "Hello!".to_string(),
    }).await?;
    
    // Create worker context for polling
    let worker = WorkerContext::new::<&str>("email-worker");
    
    // Poll for tasks and process them
    let mut stream = storage.poll(&worker);
    while let Some(task_result) = stream.next().await {
        match task_result {
            Ok(Some(task)) => {
                println!("Processing email to: {}", task.args.to);
                // Process the task here
                // In production, you'd use proper worker infrastructure
            }
            Ok(None) => {
                // No tasks available, continue polling
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            }
            Err(e) => {
                eprintln!("Error polling task: {}", e);
            }
        }
    }
    
    Ok(())
}

§Configuration

use std::time::Duration;
use apalis_libsql::{LibsqlStorage, Config};
use libsql::Builder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = Builder::new_local("tasks.db").build().await?;
    let db: &'static _ = Box::leak(Box::new(db));
    
    let config = Config::new("my_queue")
        .set_buffer_size(50)              // Tasks per poll (default: 10)
        .set_poll_interval(Duration::from_millis(50))  // Poll frequency (default: 100ms)
        .set_keep_alive(Duration::from_secs(60))       // Worker heartbeat (default: 30s)
        .set_reenqueue_orphaned_after(Duration::from_secs(600)); // Retry dead tasks after 10min
    
    let storage = LibsqlStorage::<(), ()>::new_with_config(db, config);
    
    Ok(())
}

Key settings (two illustrative tuning profiles follow the list):

  • buffer_size: Tasks fetched per poll (affects memory usage)
  • poll_interval: How often to check for new tasks (affects latency)
  • keep_alive: Worker heartbeat interval (affects failure detection)
  • reenqueue_orphaned_after: When to retry tasks from crashed workers
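
For illustration, two hedged tuning profiles built with the same setters shown above (the queue names and values are placeholders, not recommendations):

use std::time::Duration;
use apalis_libsql::Config;

fn main() {
    // Low-latency profile: small batches, frequent polling, fast orphan recovery.
    let _low_latency = Config::new("interactive_queue")
        .set_buffer_size(5)
        .set_poll_interval(Duration::from_millis(25))
        .set_keep_alive(Duration::from_secs(15))
        .set_reenqueue_orphaned_after(Duration::from_secs(120));

    // Throughput profile: bigger batches, relaxed polling, slower failure detection.
    let _high_throughput = Config::new("bulk_queue")
        .set_buffer_size(200)
        .set_poll_interval(Duration::from_millis(500))
        .set_keep_alive(Duration::from_secs(60))
        .set_reenqueue_orphaned_after(Duration::from_secs(1800));
}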

§Turso Setup

  1. Create database:
# Install Turso CLI
curl -sSfL https://get.tur.so/install.sh | bash

# Login
turso auth login

# Create database
turso db create my-tasks-db

# Get database URL
turso db show my-tasks-db --url
  2. Get auth token:
turso db tokens create my-tasks-db
  3. Use in your app:
use apalis_libsql::LibsqlStorage;
use libsql::Builder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = Builder::new_remote(
        "libsql://my-tasks-db-your-org.turso.io".to_string(),
        "your-auth-token-here".to_string()
    ).build().await?;
    
    let db: &'static _ = Box::leak(Box::new(db));
    let storage = LibsqlStorage::<(), ()>::new(db);
    
    Ok(())
}

§Edge Deployment

Works on Cloudflare Workers, Fly.io, and other edge platforms:

// This is a conceptual example - actual Cloudflare Workers integration
// would require the worker crate and proper WASM setup
use apalis_libsql::LibsqlStorage;
use libsql::Builder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // In a real Cloudflare Worker, you'd get these from environment secrets
    let db = Builder::new_remote(
        "libsql://your-db.turso.io".to_string(),
        "your-auth-token".to_string()
    ).build().await?;
    
    let db: &'static _ = Box::leak(Box::new(db));
    let storage = LibsqlStorage::<(), ()>::new(db);
    
    // Process tasks at the edge
    Ok(())
}

§Database Schema

The storage creates these tables:

-- Workers table (worker registration and heartbeats)
CREATE TABLE Workers (
    id TEXT PRIMARY KEY,
    worker_type TEXT NOT NULL,
    storage_name TEXT NOT NULL,
    layers TEXT,
    last_seen INTEGER NOT NULL
);

-- Jobs table (task storage)
CREATE TABLE Jobs (
    job BLOB NOT NULL,              -- serialized task data
    id TEXT PRIMARY KEY,            -- task ID (ULID)
    job_type TEXT NOT NULL,         -- queue name
    status TEXT NOT NULL,           -- Pending, Running, Done, Failed
    attempts INTEGER NOT NULL,
    max_attempts INTEGER NOT NULL,
    run_at INTEGER NOT NULL,        -- scheduled execution time
    last_error TEXT,                -- error message on failure
    lock_at INTEGER,                -- when task was locked
    lock_by TEXT,                   -- worker that locked the task
    done_at INTEGER,                -- completion time
    priority INTEGER NOT NULL,
    metadata TEXT                   -- additional JSON metadata
);
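
Because the schema is plain SQLite, it can be queried directly for monitoring. A minimal read-only sketch that counts pending tasks per queue (it assumes the `tasks.db` file from the examples above and the status values listed in the schema):

use libsql::Builder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = Builder::new_local("tasks.db").build().await?;
    let conn = db.connect()?;

    // Count pending tasks per queue using the Jobs table described above.
    let mut rows = conn
        .query(
            "SELECT job_type, COUNT(*) FROM Jobs WHERE status = 'Pending' GROUP BY job_type",
            (),
        )
        .await?;

    while let Some(row) = rows.next().await? {
        let queue: String = row.get(0)?;
        let pending: i64 = row.get(1)?;
        println!("{queue}: {pending} pending");
    }

    Ok(())
}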

§Installation

[dependencies]
apalis-libsql = "0.1.0"

Feature flags:

  • tokio-comp (default): Tokio runtime support
  • async-std-comp: async-std runtime support
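
To opt into the async-std runtime instead of the default Tokio support, the usual Cargo feature mechanics apply (a sketch; adjust the version to match your setup):

[dependencies]
# Default: Tokio runtime via the `tokio-comp` feature
apalis-libsql = "0.1.0"

# Alternative: async-std runtime
# apalis-libsql = { version = "0.1.0", default-features = false, features = ["async-std-comp"] }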

§Testing

# Run all tests
cargo test --all-features

# Run performance benchmarks
cargo test --test perf_test --release -- --nocapture

# Test with Turso (requires env vars)
TURSO_AUTH_TOKEN=xxx TURSO_DATABASE_URL=xxx cargo test --test turso_cloud

§License

MIT

Re-exports§

pub use ack::LibsqlAck;
pub use ack::LockTaskLayer;
pub use ack::LockTaskService;
pub use config::Config;
pub use fetcher::LibsqlPollFetcher;
pub use sink::LibsqlSink;

Modules§

ack
Acknowledgment implementation for libSQL backend
config
Configuration for the libSQL storage backend (LibsqlStorage)
fetcher
Fetcher implementation for polling tasks from the libSQL database
row
Row mapping from libSQL database rows to task structs (TaskRow)
sink
Sink implementation for pushing tasks to the libSQL database

Structs§

LibsqlStorage
LibsqlStorage is a storage backend for apalis using libsql as the database.
SqlContext
The SQL context used for jobs stored in a SQL database

Enums§

LibsqlError
Error type for libSQL storage operations

Functions§

enable_wal_mode
Enable WAL mode for better concurrency
reenqueue_orphaned
Re-enqueue orphaned tasks from dead workers

Type Aliases§

CompactType
CompactType is the type used for compact serialization in the libsql backend
LibsqlTask
Type alias for a task stored in the libsql backend