§apalis-libsql
Native libSQL storage backend for Apalis background job processing.
Problem: Apalis's existing SQLite support goes through sqlx, which doesn't work with Turso's embedded replica model or edge deployments.
Solution: This crate provides a native libSQL driver that enables:
- Local-first development with SQLite
- Embedded replicas that sync with Turso Cloud
- Edge deployment (Cloudflare Workers, Fly.io, etc.)
- Automatic cloud sync without managing connections
§Performance
Benchmarks run on bare metal (not containerized):
| Spec | Value |
|---|---|
| CPU | AMD Ryzen 9 5950X (16-core, 4.6GHz) |
| RAM | 128GB DDR4 |
| Storage | Samsung MZQL2 NVMe (enterprise SSD) |
| OS | Linux (bare metal) |
Results:
- Raw write IOPS: ~117K/sec (single INSERTs, worst case)
- Batched writes: ~578K/sec (transaction batching, best case)
- Read IOPS: ~272K/sec (primary key lookups)
- Transaction TPS: ~78K/sec (BEGIN / UPDATE x2 / COMMIT)

What this means:
- Your mileage will vary based on hardware
- NVMe storage is critical for write performance
- These are local SQLite numbers, not network-bound Turso Cloud
- Containerized/VM performance will be lower
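One concrete takeaway from the figures above: transaction batching buys roughly a 5x write speedup over single INSERTs, which is just arithmetic on the two numbers:

```rust
fn main() {
    // Figures from the benchmark results above (ops/sec).
    let single_inserts = 117_000.0_f64;
    let batched_writes = 578_000.0_f64;

    // Batching amortizes commit/fsync overhead across many rows.
    let speedup = batched_writes / single_inserts;
    assert!(speedup > 4.9 && speedup < 5.0);
    println!("batching speedup: ~{speedup:.1}x");
}
```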
Run your own benchmarks:

```sh
cargo test --test perf_test --release -- --nocapture
```
§Architecture
```text
+-----------------+      +------------------+      +-----------------+
|    Your App     |----->|  Local Replica   |<-----|   Turso Cloud   |
|                 |      |  (SQLite file)   |      |  (Distributed)  |
+-----------------+      +------------------+      +-----------------+
                                  |                        |
                                  v                        v
                         +------------------+     +------------------+
                         |  Apalis Workers  |     |  Other Replicas  |
                         |  Process Tasks   |     |  Edge Locations  |
                         +------------------+     +------------------+
```

How it works: Your app writes to a local SQLite file. libSQL automatically syncs changes with Turso Cloud, which distributes them to other replicas. The app works offline and syncs when reconnected.
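The sync flow above maps onto libSQL's embedded-replica mode. A minimal sketch, assuming the libsql crate's `Builder::new_remote_replica` constructor (the local path, URL, and token below are placeholders; check the libsql docs for the exact builder options in your version):

```rust
use libsql::Builder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Embedded replica: reads and writes hit a local SQLite file, which
    // libSQL keeps in sync with the remote primary on Turso Cloud.
    let db = Builder::new_remote_replica(
        "local-replica.db",                      // local SQLite file (placeholder)
        "libsql://your-db.turso.io".to_string(), // remote primary (placeholder)
        "your-auth-token".to_string(),           // placeholder
    )
    .build()
    .await?;

    // Pull the latest changes from the primary on demand.
    db.sync().await?;

    Ok(())
}
```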
§When to Use This vs apalis-sqlite
Use apalis-sqlite:
- Standard SQLite deployment
- Compile-time SQL query validation
- Single-machine applications
- Traditional server environments
Use apalis-libsql:
- Turso Cloud integration needed
- Embedded replica model (local-first)
- Edge deployment (Cloudflare Workers, Fly.io)
- Multi-region applications
- Offline-first requirements
§Quick Start
§Local Development
```rust
use apalis_libsql::LibsqlStorage;
use libsql::Builder;
use serde::{Serialize, Deserialize};
use apalis_core::backend::TaskSink;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Email {
    to: String,
    subject: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Local SQLite database
    let db = Builder::new_local("tasks.db").build().await?;
    let db: &'static _ = Box::leak(Box::new(db));

    let mut storage = LibsqlStorage::<Email, ()>::new(db);
    storage.setup().await?;

    // Push a task
    storage.push(Email {
        to: "user@example.com".to_string(),
        subject: "Hello!".to_string(),
    }).await?;

    Ok(())
}
```

§Turso Cloud (Embedded Replica)
```rust
use apalis_libsql::LibsqlStorage;
use libsql::Builder;
use serde::{Serialize, Deserialize};
use apalis_core::backend::TaskSink;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Email {
    to: String,
    subject: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Remote Turso Cloud database
    let db = Builder::new_remote(
        "libsql://your-db.turso.io".to_string(),
        "your-auth-token".to_string()
    ).build().await?;
    let db: &'static _ = Box::leak(Box::new(db));

    let mut storage = LibsqlStorage::<Email, ()>::new(db);
    storage.setup().await?;

    // Tasks are stored in Turso Cloud
    storage.push(Email {
        to: "user@example.com".to_string(),
        subject: "Hello from Turso!".to_string(),
    }).await?;

    Ok(())
}
```

§Task Processing
```rust
use apalis_libsql::LibsqlStorage;
use libsql::Builder;
use serde::{Serialize, Deserialize};
use apalis_core::backend::{TaskSink, Backend};
use apalis_core::worker::context::WorkerContext;
use futures::StreamExt;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Email {
    to: String,
    subject: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = Builder::new_local("tasks.db").build().await?;
    let db: &'static _ = Box::leak(Box::new(db));

    let mut storage = LibsqlStorage::<Email, ()>::new(db);
    storage.setup().await?;

    // Push a task
    storage.push(Email {
        to: "user@example.com".to_string(),
        subject: "Hello!".to_string(),
    }).await?;

    // Create a worker context for polling
    let worker = WorkerContext::new::<&str>("email-worker");

    // Poll for tasks and process them
    let mut stream = storage.poll(&worker);
    while let Some(task_result) = stream.next().await {
        match task_result {
            Ok(Some(task)) => {
                println!("Processing email to: {}", task.args.to);
                // Process the task here.
                // In production, you'd use proper worker infrastructure.
            }
            Ok(None) => {
                // No tasks available; wait before polling again
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            }
            Err(e) => {
                eprintln!("Error polling task: {}", e);
            }
        }
    }

    Ok(())
}
```

§Configuration
```rust
use std::time::Duration;
use apalis_libsql::{LibsqlStorage, Config};
use libsql::Builder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = Builder::new_local("tasks.db").build().await?;
    let db: &'static _ = Box::leak(Box::new(db));

    let config = Config::new("my_queue")
        .set_buffer_size(50)                          // Tasks per poll (default: 10)
        .set_poll_interval(Duration::from_millis(50)) // Poll frequency (default: 100ms)
        .set_keep_alive(Duration::from_secs(60))      // Worker heartbeat (default: 30s)
        .set_reenqueue_orphaned_after(Duration::from_secs(600)); // Retry dead tasks after 10 min

    let storage = LibsqlStorage::<(), ()>::new_with_config(db, config);
    Ok(())
}
```

Key settings:
- buffer_size: Tasks fetched per poll (affects memory usage)
- poll_interval: How often to check for new tasks (affects latency)
- keep_alive: Worker heartbeat interval (affects failure detection)
- reenqueue_orphaned_after: When to retry tasks from crashed workers
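A rough way to reason about these knobs (pure arithmetic, not the crate's API; the scheduler may behave differently in detail, so treat these as ballpark bounds):

```rust
fn main() {
    // Defaults documented above.
    let (buffer_size, poll_interval_ms): (u64, u64) = (10, 100);

    // A single worker dequeues at most `buffer_size` tasks per poll, so its
    // throughput ceiling is roughly buffer_size / poll_interval.
    let ceiling = buffer_size * 1000 / poll_interval_ms;
    assert_eq!(ceiling, 100); // ~100 tasks/sec with defaults

    // The tuned values from the example above (50 tasks every 50 ms):
    let ceiling_tuned = 50u64 * 1000 / 50;
    assert_eq!(ceiling_tuned, 1000); // ~1,000 tasks/sec ceiling

    // Worst-case pickup latency for a new task is about one poll interval.
    println!("default ceiling: {ceiling}/sec, tuned: {ceiling_tuned}/sec");
}
```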
§Turso Setup
- Create database:

```sh
# Install Turso CLI
curl -sSfL https://get.tur.so/install.sh | bash

# Login
turso auth login

# Create database
turso db create my-tasks-db

# Get database URL
turso db show my-tasks-db --url
```

- Get auth token:

```sh
turso db tokens create my-tasks-db
```

- Use in your app:
```rust
use apalis_libsql::LibsqlStorage;
use libsql::Builder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = Builder::new_remote(
        "libsql://my-tasks-db-your-org.turso.io".to_string(),
        "your-auth-token-here".to_string()
    ).build().await?;
    let db: &'static _ = Box::leak(Box::new(db));

    let storage = LibsqlStorage::<(), ()>::new(db);
    Ok(())
}
```

§Edge Deployment
Works on Cloudflare Workers, Fly.io, and other edge platforms:
```rust
// This is a conceptual example - actual Cloudflare Workers integration
// would require the worker crate and proper WASM setup
use apalis_libsql::LibsqlStorage;
use libsql::Builder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // In a real Cloudflare Worker, you'd get these from environment secrets
    let db = Builder::new_remote(
        "libsql://your-db.turso.io".to_string(),
        "your-auth-token".to_string()
    ).build().await?;
    let db: &'static _ = Box::leak(Box::new(db));

    let storage = LibsqlStorage::<(), ()>::new(db);
    // Process tasks at the edge
    Ok(())
}
```

§Database Schema
The storage creates these tables:
```sql
-- Workers table (worker registration and heartbeats)
CREATE TABLE Workers (
    id           TEXT PRIMARY KEY,
    worker_type  TEXT NOT NULL,
    storage_name TEXT NOT NULL,
    layers       TEXT,
    last_seen    INTEGER NOT NULL
);

-- Jobs table (task storage)
CREATE TABLE Jobs (
    job          BLOB NOT NULL,     -- serialized task data
    id           TEXT PRIMARY KEY,  -- task ID (ULID)
    job_type     TEXT NOT NULL,     -- queue name
    status       TEXT NOT NULL,     -- Pending, Running, Done, Failed
    attempts     INTEGER NOT NULL,
    max_attempts INTEGER NOT NULL,
    run_at       INTEGER NOT NULL,  -- scheduled execution time
    last_error   TEXT,              -- error message on failure
    lock_at      INTEGER,           -- when the task was locked
    lock_by      TEXT,              -- worker that locked the task
    done_at      INTEGER,           -- completion time
    priority     INTEGER NOT NULL,
    metadata     TEXT               -- additional JSON metadata
);
```

§Installation
```toml
[dependencies]
apalis-libsql = "0.1.0"
```

Feature flags:
- tokio-comp (default): Tokio runtime support
- async-std-comp: async-std runtime support
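To opt into the async-std runtime instead of Tokio, the default feature presumably has to be disabled first. A hedged Cargo.toml sketch using the feature names listed above:

```toml
[dependencies]
apalis-libsql = { version = "0.1.0", default-features = false, features = ["async-std-comp"] }
```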
§Testing
```sh
# Run all tests
cargo test --all-features

# Run performance benchmarks
cargo test --test perf_test --release -- --nocapture

# Test with Turso (requires env vars)
TURSO_AUTH_TOKEN=xxx TURSO_DATABASE_URL=xxx cargo test --test turso_cloud
```

§License
MIT
Re-exports§
pub use ack::LibsqlAck;
pub use ack::LockTaskLayer;
pub use ack::LockTaskService;
pub use config::Config;
pub use fetcher::LibsqlPollFetcher;
pub use sink::LibsqlSink;
Modules§
- ack
- Acknowledgment implementation for libSQL backend
- config
- Configuration for the libSQL storage backend (LibsqlStorage)
- fetcher
- Fetcher implementation for polling tasks from the libSQL database
- row
- Row mapping from libSQL database rows to TaskRow
- sink
- Sink implementation for pushing tasks to the libSQL database
Structs§
- LibsqlStorage
- LibsqlStorage is a storage backend for apalis using libSQL as the database.
- SqlContext
- The SQL context used for jobs stored in a SQL database
Enums§
- LibsqlError
- Error type for libSQL storage operations
Functions§
- enable_wal_mode
- Enable WAL mode for better concurrency
- reenqueue_orphaned
- Re-enqueue orphaned tasks from dead workers
Type Aliases§
- CompactType
- The type used for compact serialization in the libSQL backend
- LibsqlTask
- Type alias for a task stored in the libSQL backend