extern crate dbq;
extern crate postgres;
extern crate serde_json;
use std::error::Error;
use std::result::Result;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::thread;
use std::time::Duration;
#[derive(Clone)]
struct HttpClient {}
#[derive(Clone)]
struct StatefulHandler {
run_count: Arc<AtomicUsize>,
http_client: HttpClient,
}
fn main() -> Result<(), Box<Error>> {
let db_conn_params = "postgres://postgres:password@localhost/dbq";
let conn = postgres::Connection::connect(db_conn_params, postgres::TlsMode::None)?;
let schema_config = dbq::SchemaConfig::default();
dbq::run_migrations(&schema_config, &conn, Some(646_271)).unwrap();
let queue = dbq::Queue::new(schema_config, "example_stateful".to_string());
queue.enqueue("job_a", serde_json::Value::Null, 3, &conn)?;
let http_client = HttpClient::new();
let handler = StatefulHandler::new(http_client);
let workers_config =
dbq::WorkerPoolConfig::new(queue, db_conn_params, handler.clone())?;
let workers = dbq::WorkerPool::start(workers_config);
thread::sleep(Duration::new(1, 0));
workers.join();
println!("handler ran {} jobs", handler.run_count());
Ok(())
}
impl StatefulHandler {
fn new(http_client: HttpClient) -> StatefulHandler {
StatefulHandler {
run_count: Arc::new(AtomicUsize::new(0)),
http_client,
}
}
fn increment_run_count(&self) {
self.run_count.fetch_add(1, Ordering::SeqCst);
}
fn run_count(&self) -> usize {
self.run_count.load(Ordering::SeqCst)
}
}
impl dbq::Handler for StatefulHandler {
type Error = std::io::Error;
fn handle(&self, _ctx: dbq::JobContext) -> Result<(), Self::Error> {
self.increment_run_count();
self.http_client.get("https://www.example.com")?;
Ok(())
}
}
impl HttpClient {
fn new() -> HttpClient {
HttpClient {}
}
fn get(&self, _url: &str) -> std::io::Result<&str> {
Ok("")
}
}