Expand description
pgqrs is a postgres-native, library-only durable execution engine.
Built in Rust with Python bindings, pgqrs runs in-process and persists workflow state in your database. It supports PostgreSQL for production and SQLite/Turso for embedded or test setups.
§Key Properties
- Postgres-native execution using SKIP LOCKED and ACID transactions
- Library-only runtime that runs alongside your application
- Multi-backend support: Postgres, SQLite, and Turso
- Exactly-once step execution with durable workflow state
§Quick Start (Queue)
use pgqrs;
use pgqrs::store::Store;
use serde_json::json;
async fn app_workflow(
run: pgqrs::Run,
input: serde_json::Value,
) -> Result<serde_json::Value, pgqrs::Error> {
let files = pgqrs::workflow_step(&run, "list_files", || async {
Ok::<_, pgqrs::Error>(vec![input["path"].as_str().unwrap().to_string()])
})
.await?;
let archive = pgqrs::workflow_step(&run, "create_archive", || async {
Ok::<_, pgqrs::Error>(format!("{}.zip", files[0]))
})
.await?;
Ok(json!({"archive": archive}))
}
let store = pgqrs::connect("postgresql://localhost/mydb").await?;
pgqrs::admin(&store).install().await?;
pgqrs::workflow()
.name(archive_files)
.create(&store)
.await?;
let consumer = pgqrs::consumer("worker-1", 8080, archive_files.name())
.create(&store)
.await?;
let handler = pgqrs::workflow_handler(store.clone(), move |run, input| async move {
app_workflow(run, input).await
});
let handler = { let handler = handler.clone(); move |msg| (handler)(msg) };
pgqrs::workflow()
.name(archive_files)
.trigger(&json!({"path": "/tmp/report.csv"}))?
.execute(&store)
.await?;
pgqrs::dequeue()
.worker(&consumer)
.handle(handler)
.execute(&store)
.await?;Learn more in the documentation: https://pgqrs.vrajat.com
Re-exports§
pub use crate::store::Store;pub use crate::tables::MessageTable;pub use crate::tables::QueueTable;pub use crate::tables::RunRecordTable;pub use crate::tables::StepRecordTable;pub use crate::tables::WorkerTable;pub use crate::tables::WorkflowTable;pub use crate::workers::Admin;pub use crate::workers::Consumer;pub use crate::workers::Producer;pub use crate::workers::Run;pub use crate::workers::Step;pub use crate::workers::Worker;pub use crate::workflow::pause_error;pub use crate::workflow::workflow_handler;pub use crate::workflow::workflow_step;pub use crate::workflow::WorkflowDef;pub use crate::workflow::WorkflowFuture;pub use crate::config::Config;pub use crate::error::Error;pub use crate::error::Result;pub use crate::error::TransientStepError;pub use crate::policy::BackoffStrategy;pub use crate::policy::StepRetryPolicy;pub use crate::policy::WorkflowConfig;pub use crate::stats::QueueMetrics;pub use crate::stats::SystemStats;pub use crate::stats::WorkerHealthStats;pub use crate::stats::WorkerStats;pub use crate::types::NewQueueMessage;pub use crate::types::NewQueueRecord;pub use crate::types::NewRunRecord;pub use crate::types::NewStepRecord;pub use crate::types::NewWorkerRecord;pub use crate::types::NewWorkflowRecord;pub use crate::types::QueueMessage;pub use crate::types::QueueRecord;pub use crate::types::RunRecord;pub use crate::types::StepRecord;pub use crate::types::WorkerRecord;pub use crate::types::WorkerStatus;pub use crate::types::WorkflowRecord;pub use crate::types::WorkflowStatus;pub use crate::validation::ValidationConfig;
Modules§
- builders
- High-level builder APIs for queues and workflows.
- config
- Configuration for pgqrs connections and defaults.
- error
- Error types and fallible results for pgqrs.
- policy
- Retry policies and workflow backoff strategies.
- stats
- Metrics and system statistics.
- store
- Core database abstraction for pgqrs.
- tables
- Repository traits for low-level storage operations.
- types
- Core queue, workflow, and worker data types.
- validation
- Input validation and rate limiting for payloads.
- workers
- Worker, producer, and consumer interfaces.
- workflow
- Workflow helpers and handler utilities.
Structs§
- Rate
Limit Status - Current token bucket status.
Functions§
- admin
- Create a managed admin builder.
- connect
- Connect to a database using a DSN string.
- connect_
with_ config - Connect to a database using a custom configuration.
- consumer
- Create a managed consumer worker.
- dequeue
- Start a dequeue operation.
- enqueue
- Start an enqueue operation.
- producer
- Create a managed producer worker.
- run
- Entry point for creating a run handle from a message.
- step
- Entry point for acquiring a step.
- tables
- Start a tables builder.
- workflow
- Entry point for creating or getting a workflow.
Attribute Macros§
- pgqrs_
step - Attribute macro for defining a workflow step with automatic retry / resume semantics.
- pgqrs_
workflow - Attribute macro for defining a workflow entry point.