Skip to main content

Crate pgqrs

Crate pgqrs 

Source
Expand description

pgqrs is a postgres-native, library-only durable execution engine.

Built in Rust with Python bindings, pgqrs runs in-process and persists workflow state in your database. It supports PostgreSQL for production and SQLite/Turso for embedded or test setups.

§Key Properties

  • Postgres-native execution using SKIP LOCKED and ACID transactions
  • Library-only runtime that runs alongside your application
  • Multi-backend support: Postgres, SQLite, and Turso
  • Exactly-once step execution with durable workflow state

§Quick Start (Queue)

use pgqrs;
use pgqrs::store::Store;
use serde_json::json;

async fn app_workflow(
    run: pgqrs::Run,
    input: serde_json::Value,
) -> Result<serde_json::Value, pgqrs::Error> {
    let files = pgqrs::workflow_step(&run, "list_files", || async {
        Ok::<_, pgqrs::Error>(vec![input["path"].as_str().unwrap().to_string()])
    })
    .await?;

    let archive = pgqrs::workflow_step(&run, "create_archive", || async {
        Ok::<_, pgqrs::Error>(format!("{}.zip", files[0]))
    })
    .await?;

    Ok(json!({"archive": archive}))
}

let store = pgqrs::connect("postgresql://localhost/mydb").await?;
pgqrs::admin(&store).install().await?;

pgqrs::workflow()
    .name(archive_files)
    .create(&store)
    .await?;

let consumer = pgqrs::consumer("worker-1", 8080, archive_files.name())
    .create(&store)
    .await?;

let handler = pgqrs::workflow_handler(store.clone(), move |run, input| async move {
    app_workflow(run, input).await
});
let handler = { let handler = handler.clone(); move |msg| (handler)(msg) };

pgqrs::workflow()
    .name(archive_files)
    .trigger(&json!({"path": "/tmp/report.csv"}))?
    .execute(&store)
    .await?;

pgqrs::dequeue()
    .worker(&consumer)
    .handle(handler)
    .execute(&store)
    .await?;

Learn more in the documentation: https://pgqrs.vrajat.com

Re-exports§

pub use crate::store::Store;
pub use crate::tables::MessageTable;
pub use crate::tables::QueueTable;
pub use crate::tables::RunRecordTable;
pub use crate::tables::StepRecordTable;
pub use crate::tables::WorkerTable;
pub use crate::tables::WorkflowTable;
pub use crate::workers::Admin;
pub use crate::workers::Consumer;
pub use crate::workers::Producer;
pub use crate::workers::Run;
pub use crate::workers::Step;
pub use crate::workers::Worker;
pub use crate::workflow::pause_error;
pub use crate::workflow::workflow_handler;
pub use crate::workflow::workflow_step;
pub use crate::workflow::WorkflowDef;
pub use crate::workflow::WorkflowFuture;
pub use crate::config::Config;
pub use crate::error::Error;
pub use crate::error::Result;
pub use crate::error::TransientStepError;
pub use crate::policy::BackoffStrategy;
pub use crate::policy::StepRetryPolicy;
pub use crate::policy::WorkflowConfig;
pub use crate::stats::QueueMetrics;
pub use crate::stats::SystemStats;
pub use crate::stats::WorkerHealthStats;
pub use crate::stats::WorkerStats;
pub use crate::types::NewQueueMessage;
pub use crate::types::NewQueueRecord;
pub use crate::types::NewRunRecord;
pub use crate::types::NewStepRecord;
pub use crate::types::NewWorkerRecord;
pub use crate::types::NewWorkflowRecord;
pub use crate::types::QueueMessage;
pub use crate::types::QueueRecord;
pub use crate::types::RunRecord;
pub use crate::types::StepRecord;
pub use crate::types::WorkerRecord;
pub use crate::types::WorkerStatus;
pub use crate::types::WorkflowRecord;
pub use crate::types::WorkflowStatus;
pub use crate::validation::ValidationConfig;

Modules§

builders
High-level builder APIs for queues and workflows.
config
Configuration for pgqrs connections and defaults.
error
Error types and fallible results for pgqrs.
policy
Retry policies and workflow backoff strategies.
stats
Metrics and system statistics.
store
Core database abstraction for pgqrs.
tables
Repository traits for low-level storage operations.
types
Core queue, workflow, and worker data types.
validation
Input validation and rate limiting for payloads.
workers
Worker, producer, and consumer interfaces.
workflow
Workflow helpers and handler utilities.

Structs§

RateLimitStatus
Current token bucket status.

Functions§

admin
Create a managed admin builder.
connect
Connect to a database using a DSN string.
connect_with_config
Connect to a database using a custom configuration.
consumer
Create a managed consumer worker.
dequeue
Start a dequeue operation.
enqueue
Start an enqueue operation.
producer
Create a managed producer worker.
run
Entry point for creating a run handle from a message.
step
Entry point for acquiring a step.
tables
Start a tables builder.
workflow
Entry point for creating or getting a workflow.

Attribute Macros§

pgqrs_step
Attribute macro for defining a workflow step with automatic retry / resume semantics.
pgqrs_workflow
Attribute macro for defining a workflow entry point.