use rusqlite::{Connection, params};
use serde::{Deserialize, Serialize};
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("sqlite error: {0}")]
Sqlite(#[from] rusqlite::Error),
#[error("json error: {0}")]
Json(#[from] serde_json::Error),
#[error("core error: {0}")]
Core(String),
}
pub type Result<T> = std::result::Result<T, Error>;
pub struct Database {
conn: Connection,
}
impl Database {
pub fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
let path_str = path.as_ref().to_string_lossy().into_owned();
let conn = honker_core::open_conn(&path_str, true)
.map_err(|e| Error::Core(e.to_string()))?;
honker_core::attach_honker_functions(&conn)?;
honker_core::bootstrap_honker_schema(&conn)
.map_err(|e| Error::Core(e.to_string()))?;
Ok(Self { conn })
}
pub fn queue(&self, name: &str, opts: QueueOpts) -> Queue<'_> {
Queue {
db: self,
name: name.to_string(),
opts,
}
}
pub fn notify<P: Serialize>(&self, channel: &str, payload: &P) -> Result<i64> {
let json = serde_json::to_string(payload)?;
let id = self.conn.query_row(
"SELECT notify(?1, ?2)",
params![channel, json],
|r| r.get(0),
)?;
Ok(id)
}
pub fn conn(&self) -> &Connection {
&self.conn
}
}
#[derive(Debug, Clone)]
pub struct QueueOpts {
pub visibility_timeout_s: i64,
pub max_attempts: i64,
}
impl Default for QueueOpts {
fn default() -> Self {
Self {
visibility_timeout_s: 300,
max_attempts: 3,
}
}
}
pub struct Queue<'a> {
db: &'a Database,
name: String,
opts: QueueOpts,
}
#[derive(Debug, Clone, Default)]
pub struct EnqueueOpts {
pub delay: Option<i64>,
pub run_at: Option<i64>,
pub priority: i64,
pub expires: Option<i64>,
}
impl<'a> Queue<'a> {
pub fn enqueue<P: Serialize>(&self, payload: &P, opts: EnqueueOpts) -> Result<i64> {
let json = serde_json::to_string(payload)?;
let id = self.db.conn.query_row(
"SELECT honker_enqueue(?1, ?2, ?3, ?4, ?5, ?6, ?7)",
params![
self.name,
json,
opts.run_at,
opts.delay,
opts.priority,
self.opts.max_attempts,
opts.expires
],
|r| r.get(0),
)?;
Ok(id)
}
pub fn claim_batch(&self, worker_id: &str, n: i64) -> Result<Vec<Job<'a>>> {
let rows_json: String = self.db.conn.query_row(
"SELECT honker_claim_batch(?1, ?2, ?3, ?4)",
params![self.name, worker_id, n, self.opts.visibility_timeout_s],
|r| r.get(0),
)?;
let raw: Vec<RawJob> = serde_json::from_str(&rows_json)?;
Ok(raw
.into_iter()
.map(|r| Job {
db: self.db,
id: r.id,
queue: r.queue,
payload: r.payload.into_bytes(),
worker_id: r.worker_id,
attempts: r.attempts,
})
.collect())
}
pub fn claim_one(&self, worker_id: &str) -> Result<Option<Job<'a>>> {
Ok(self.claim_batch(worker_id, 1)?.into_iter().next())
}
}
#[derive(Deserialize)]
struct RawJob {
id: i64,
queue: String,
payload: String,
worker_id: String,
attempts: i64,
#[serde(rename = "claim_expires_at")]
#[allow(dead_code)]
claim_expires_at: i64,
}
pub struct Job<'a> {
db: &'a Database,
pub id: i64,
pub queue: String,
pub payload: Vec<u8>,
pub worker_id: String,
pub attempts: i64,
}
impl<'a> Job<'a> {
pub fn payload_as<T: for<'de> serde::Deserialize<'de>>(&self) -> Result<T> {
Ok(serde_json::from_slice(&self.payload)?)
}
pub fn ack(&self) -> Result<bool> {
let n: i64 = self.db.conn.query_row(
"SELECT honker_ack(?1, ?2)",
params![self.id, self.worker_id],
|r| r.get(0),
)?;
Ok(n > 0)
}
pub fn retry(&self, delay_s: i64, error: &str) -> Result<bool> {
let n: i64 = self.db.conn.query_row(
"SELECT honker_retry(?1, ?2, ?3, ?4)",
params![self.id, self.worker_id, delay_s, error],
|r| r.get(0),
)?;
Ok(n > 0)
}
pub fn fail(&self, error: &str) -> Result<bool> {
let n: i64 = self.db.conn.query_row(
"SELECT honker_fail(?1, ?2, ?3)",
params![self.id, self.worker_id, error],
|r| r.get(0),
)?;
Ok(n > 0)
}
pub fn heartbeat(&self, extend_s: i64) -> Result<bool> {
let n: i64 = self.db.conn.query_row(
"SELECT honker_heartbeat(?1, ?2, ?3)",
params![self.id, self.worker_id, extend_s],
|r| r.get(0),
)?;
Ok(n > 0)
}
}