dbq 0.1.0

Job queueing and processing library with queues stored in Postgres 9.5+
extern crate dbq;
extern crate postgres;
extern crate serde_json;

use std::error::Error;
use std::result::Result;
use std::sync::{
    atomic::{AtomicUsize, Ordering},
use std::thread;
use std::time::Duration;

// An "HTTP client"
struct HttpClient {}

// Handler that holds an HTTP client that can be used within the jobs that it
// runs and tracks the number of jobs that it runs
struct StatefulHandler {
    run_count: Arc<AtomicUsize>,
    http_client: HttpClient,

fn main() -> Result<(), Box<Error>> {
    let db_conn_params = "postgres://postgres:password@localhost/dbq";
    let conn = postgres::Connection::connect(db_conn_params, postgres::TlsMode::None)?;

    // Schema config allows for changing the database schema and table names
    // Defaults are no schema (default is used) and tables are prefixed with "dbq_"
    let schema_config = dbq::SchemaConfig::default();
    // Run the migrations on start. Migrations are idempotent and should be run
    // on startup
    dbq::run_migrations(&schema_config, &conn, Some(646_271)).unwrap();

    let queue = dbq::Queue::new(schema_config, "example_stateful".to_string());

    // Enqueue a job
    queue.enqueue("job_a", serde_json::Value::Null, 3, &conn)?;
    // Create a dependency and the job handler
    let http_client = HttpClient::new();
    let handler = StatefulHandler::new(http_client);

    // Start a worker pool
    let workers_config =
        dbq::WorkerPoolConfig::new(queue, db_conn_params, handler.clone())?;
    let workers = dbq::WorkerPool::start(workers_config);

    // Give a worker time to find and start the job
    thread::sleep(Duration::new(1, 0));
    // Shutdown the worker pool waiting for all currently executing jobs to finish

    // Print the job count
    println!("handler ran {} jobs", handler.run_count());

impl StatefulHandler {
    fn new(http_client: HttpClient) -> StatefulHandler {
        StatefulHandler {
            run_count: Arc::new(AtomicUsize::new(0)),

    fn increment_run_count(&self) {
        self.run_count.fetch_add(1, Ordering::SeqCst);

    fn run_count(&self) -> usize {

impl dbq::Handler for StatefulHandler {
    type Error = std::io::Error;

    fn handle(&self, _ctx: dbq::JobContext) -> Result<(), Self::Error> {
        // Increment the run count

        // Make an HTTP request


impl HttpClient {
    fn new() -> HttpClient {
        HttpClient {}

    fn get(&self, _url: &str) -> std::io::Result<&str> {