use std::path::Path;
use std::sync::Arc;
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;
use parking_lot::{Mutex, MutexGuard};
use rusqlite::{Connection, params};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use thiserror::Error;
use honker_core::{SharedUpdateWatcher, WatcherConfig};
/// Errors returned by this module's public API.
#[derive(Debug, Error)]
pub enum Error {
/// A `rusqlite` call failed; produced automatically by `?` through `From`.
#[error("sqlite error: {0}")]
Sqlite(#[from] rusqlite::Error),
/// JSON serialization or deserialization through `serde_json` failed.
#[error("json error: {0}")]
Json(#[from] serde_json::Error),
/// A failure reported by `honker_core`, or detected by this wrapper, carried as a message.
#[error("core error: {0}")]
Core(String),
/// The update channel was disconnected while waiting for an update event.
#[error("update channel closed")]
UpdateClosed,
}
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Options applied when opening a [`Database`] with `Database::open_with_options`.
#[derive(Debug, Clone, Default)]
pub struct OpenOptions {
// Watcher settings; handed to `SharedUpdateWatcher::new_with_config` at open time.
watcher_config: WatcherConfig,
}
impl OpenOptions {
pub fn watcher_backend(mut self, backend: impl AsRef<str>) -> Result<Self> {
let backend =
honker_core::WatcherBackend::parse(Some(backend.as_ref())).map_err(Error::Core)?;
self.watcher_config.backend = backend;
Ok(self)
}
pub fn watcher_poll_interval(mut self, interval: Duration) -> Result<Self> {
self.watcher_config = self
.watcher_config
.with_poll_interval(interval)
.map_err(Error::Core)?;
Ok(self)
}
}
// State shared by every handle created from one `Database` (held behind an `Arc`).
struct Inner {
// The SQLite connection; all access in this file goes through this mutex.
conn: Mutex<Connection>,
// Update watcher whose `subscribe`/`unsubscribe` back the wake-up channels used in this file.
updates: SharedUpdateWatcher,
}
impl Inner {
    /// Locks the connection mutex, runs `f` against the connection, and returns
    /// whatever `f` returns. The lock is released when this call returns.
    fn with_conn<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&Connection) -> R,
    {
        let conn = self.conn.lock();
        f(&*conn)
    }
}
/// Handle to an opened database. Cloning is cheap: clones share the same
/// connection and update watcher through an `Arc`.
#[derive(Clone)]
pub struct Database {
// Shared connection + watcher state.
inner: Arc<Inner>,
}
impl Database {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
Self::open_with_options(path, OpenOptions::default())
}
pub fn open_with_options<P: AsRef<Path>>(path: P, options: OpenOptions) -> Result<Self> {
let path_ref = path.as_ref();
let path_str = path_ref.to_string_lossy().into_owned();
let conn =
honker_core::open_conn(&path_str, true).map_err(|e| Error::Core(e.to_string()))?;
honker_core::attach_honker_functions(&conn)?;
honker_core::bootstrap_honker_schema(&conn).map_err(|e| Error::Core(e.to_string()))?;
options
.watcher_config
.backend
.probe(path_ref)
.map_err(|e| Error::Core(format!("watcher_backend probe failed: {e}")))?;
let updates =
SharedUpdateWatcher::new_with_config(path_ref.to_path_buf(), options.watcher_config);
Ok(Self {
inner: Arc::new(Inner {
conn: Mutex::new(conn),
updates,
}),
})
}
pub fn queue(&self, name: &str, opts: QueueOpts) -> Queue {
Queue {
inner: self.inner.clone(),
name: name.to_string(),
opts,
}
}
pub fn outbox(&self, name: &str, opts: OutboxOpts) -> Outbox {
Outbox::new(self, name, opts)
}
pub fn stream(&self, name: &str) -> Stream {
Stream {
inner: self.inner.clone(),
name: name.to_string(),
}
}
pub fn scheduler(&self) -> Scheduler {
Scheduler {
inner: self.inner.clone(),
}
}
pub fn notify<P: Serialize>(&self, channel: &str, payload: &P) -> Result<i64> {
let json = serde_json::to_string(payload)?;
Ok(self.inner.with_conn(|c| {
c.query_row("SELECT notify(?1, ?2)", params![channel, json], |r| {
r.get::<_, i64>(0)
})
})?)
}
pub fn notify_tx<P: Serialize>(
&self,
tx: &Transaction<'_>,
channel: &str,
payload: &P,
) -> Result<i64> {
let json = serde_json::to_string(payload)?;
tx.query_row("SELECT notify(?1, ?2)", params![channel, json], |r| {
r.get::<_, i64>(0)
})
}
pub fn listen(&self, channel: &str) -> Result<Subscription> {
let last_id: i64 = self.inner.with_conn(|c| {
c.query_row(
"SELECT COALESCE(MAX(id), 0) FROM _honker_notifications",
[],
|r| r.get(0),
)
})?;
let (sub_id, rx) = self.inner.updates.subscribe();
Ok(Subscription {
inner: self.inner.clone(),
sub_id,
rx,
channel: channel.to_string(),
last_id,
pending: std::collections::VecDeque::new(),
closed: false,
})
}
pub fn update_events(&self) -> UpdateEvents {
let (sub_id, rx) = self.inner.updates.subscribe();
UpdateEvents {
inner: self.inner.clone(),
sub_id,
rx,
}
}
pub fn transaction(&self) -> Result<Transaction<'_>> {
let guard = self.inner.conn.lock();
guard.execute("BEGIN IMMEDIATE", [])?;
Ok(Transaction { guard, done: false })
}
pub fn try_lock(&self, name: &str, owner: &str, ttl_s: i64) -> Result<Option<Lock>> {
let acquired: i64 = self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_lock_acquire(?1, ?2, ?3)",
params![name, owner, ttl_s],
|r| r.get(0),
)
})?;
if acquired == 1 {
Ok(Some(Lock {
inner: self.inner.clone(),
name: name.to_string(),
owner: owner.to_string(),
released: false,
}))
} else {
Ok(None)
}
}
pub fn try_rate_limit(&self, name: &str, limit: i64, per: i64) -> Result<bool> {
let ok: i64 = self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_rate_limit_try(?1, ?2, ?3)",
params![name, limit, per],
|r| r.get(0),
)
})?;
Ok(ok == 1)
}
pub fn save_result(&self, job_id: i64, value: &str, ttl_s: i64) -> Result<()> {
self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_result_save(?1, ?2, ?3)",
params![job_id, value, ttl_s],
|_| Ok(()),
)
})?;
Ok(())
}
pub fn get_result(&self, job_id: i64) -> Result<Option<String>> {
Ok(self.inner.with_conn(|c| {
c.query_row("SELECT honker_result_get(?1)", params![job_id], |r| {
r.get::<_, Option<String>>(0)
})
})?)
}
pub fn sweep_results(&self) -> Result<i64> {
Ok(self.inner.with_conn(|c| {
c.query_row("SELECT honker_result_sweep()", [], |r| r.get::<_, i64>(0))
})?)
}
pub fn prune_notifications(&self, older_than_s: i64) -> Result<i64> {
let n = self.inner.with_conn(|c| {
c.execute(
"DELETE FROM _honker_notifications
WHERE created_at < unixepoch() - ?1",
params![older_than_s],
)
})?;
Ok(n as i64)
}
pub fn prune_notifications_keep_latest(&self, max_keep: i64) -> Result<i64> {
let n = self.inner.with_conn(|c| {
c.execute(
"DELETE FROM _honker_notifications
WHERE id < (
SELECT COALESCE(MIN(id), 0) FROM (
SELECT id FROM _honker_notifications
ORDER BY id DESC LIMIT ?1
)
)",
params![max_keep],
)
})?;
Ok(n as i64)
}
pub fn with_conn<F, R>(&self, f: F) -> R
where
F: FnOnce(&Connection) -> R,
{
self.inner.with_conn(f)
}
}
/// An open transaction created by `Database::transaction` (`BEGIN IMMEDIATE`).
/// It owns the connection lock, and is rolled back on drop unless `commit` or
/// `rollback` completed successfully.
pub struct Transaction<'db> {
// Lock guard over the connection; held for the transaction's whole lifetime.
guard: MutexGuard<'db, Connection>,
// True once COMMIT or ROLLBACK succeeded, which disables the rollback in `Drop`.
done: bool,
}
impl<'db> Transaction<'db> {
pub fn execute<P: rusqlite::Params>(&self, sql: &str, params: P) -> Result<usize> {
Ok(self.guard.execute(sql, params)?)
}
pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<T>
where
P: rusqlite::Params,
F: FnOnce(&rusqlite::Row<'_>) -> rusqlite::Result<T>,
{
Ok(self.guard.query_row(sql, params, f)?)
}
pub fn conn(&self) -> &Connection {
&self.guard
}
pub fn commit(mut self) -> Result<()> {
self.guard.execute("COMMIT", [])?;
self.done = true;
Ok(())
}
pub fn rollback(mut self) -> Result<()> {
self.guard.execute("ROLLBACK", [])?;
self.done = true;
Ok(())
}
}
impl<'db> Drop for Transaction<'db> {
    /// Rolls back a transaction that was neither committed nor rolled back.
    fn drop(&mut self) {
        if self.done {
            return;
        }
        // Best effort: drop cannot report errors, so a failed rollback is ignored.
        let _ = self.guard.execute("ROLLBACK", []);
    }
}
/// Per-queue settings supplied to `Database::queue`.
#[derive(Debug, Clone)]
pub struct QueueOpts {
/// Passed as the last argument of `honker_claim_batch` when claiming jobs
/// (presumably the claim lifetime in seconds — confirm against honker_core).
pub visibility_timeout_s: i64,
/// Passed to `honker_enqueue` for every job enqueued through the queue.
pub max_attempts: i64,
}
impl Default for QueueOpts {
fn default() -> Self {
Self {
visibility_timeout_s: 300,
max_attempts: 3,
}
}
}
/// Per-job options forwarded to the `honker_enqueue` SQL function.
/// Units and exact semantics are defined by honker_core and are not visible here.
#[derive(Debug, Clone, Default)]
pub struct EnqueueOpts {
/// Bound as the 4th argument of `honker_enqueue`; `None` binds NULL.
pub delay: Option<i64>,
/// Bound as the 3rd argument of `honker_enqueue`; `None` binds NULL.
pub run_at: Option<i64>,
/// Bound as the 5th argument of `honker_enqueue`; defaults to 0.
pub priority: i64,
/// Bound as the 7th argument of `honker_enqueue`; `None` binds NULL.
pub expires: Option<i64>,
}
/// Handle to a named job queue; created by `Database::queue`.
#[derive(Clone)]
pub struct Queue {
// Shared connection + watcher state.
inner: Arc<Inner>,
// Queue name passed to the honker SQL functions.
name: String,
// Settings applied on enqueue (`max_attempts`) and claim (`visibility_timeout_s`).
opts: QueueOpts,
}
/// Settings for an [`Outbox`].
#[derive(Debug, Clone)]
pub struct OutboxOpts {
/// Copied into the backing queue's `QueueOpts::visibility_timeout_s`.
pub visibility_timeout_s: i64,
/// Copied into the backing queue's `QueueOpts::max_attempts`.
pub max_attempts: i64,
/// Base of the retry delay, which doubles with each attempt;
/// a value `<= 0` makes every retry delay 0.
pub base_backoff_s: i64,
}
impl Default for OutboxOpts {
fn default() -> Self {
Self {
visibility_timeout_s: 60,
max_attempts: 5,
base_backoff_s: 5,
}
}
}
/// An outbox backed by the queue named `_outbox:{name}`, with exponential retry backoff.
#[derive(Clone)]
pub struct Outbox {
// Outbox name as given by the caller (without the `_outbox:` prefix).
name: String,
// Backing queue that stores the outbox entries.
queue: Queue,
// Base of the retry delay computed in `retry_delay`.
base_backoff_s: i64,
}
impl Outbox {
    /// Creates an outbox called `name` on `db`, backed by the queue `_outbox:{name}`.
    /// `visibility_timeout_s` and `max_attempts` are forwarded to that queue.
    pub fn new(db: &Database, name: &str, opts: OutboxOpts) -> Self {
        let queue = db.queue(
            &format!("_outbox:{name}"),
            QueueOpts {
                visibility_timeout_s: opts.visibility_timeout_s,
                max_attempts: opts.max_attempts,
            },
        );
        Self {
            name: name.to_string(),
            queue,
            base_backoff_s: opts.base_backoff_s,
        }
    }

    /// The outbox name, without the `_outbox:` prefix.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// The backing queue.
    pub fn queue(&self) -> &Queue {
        &self.queue
    }

    /// Enqueues `payload` on the backing queue; see [`Queue::enqueue`].
    pub fn enqueue<P: Serialize>(&self, payload: &P, opts: EnqueueOpts) -> Result<i64> {
        self.queue.enqueue(payload, opts)
    }

    /// Enqueues `payload` inside the open transaction `tx`; see [`Queue::enqueue_tx`].
    pub fn enqueue_tx<P: Serialize>(
        &self,
        tx: &Transaction<'_>,
        payload: &P,
        opts: EnqueueOpts,
    ) -> Result<i64> {
        self.queue.enqueue_tx(tx, payload, opts)
    }

    /// Claims at most one entry and hands its JSON payload to `delivery`.
    ///
    /// On delivery success the job is acked; on delivery failure it is scheduled
    /// for retry after [`Outbox::retry_delay`] seconds, with the error's display
    /// text recorded.
    ///
    /// Returns `Ok(false)` when nothing could be claimed and `Ok(true)` when an
    /// entry was processed (successfully or not).
    ///
    /// # Errors
    /// Returns [`Error::Core`] when the ack or retry call reports that it did not
    /// apply, [`Error::Json`] when the payload is not valid JSON, and
    /// [`Error::Sqlite`] for database failures.
    pub fn run_once<F, E>(&self, worker_id: &str, mut delivery: F) -> Result<bool>
    where
        F: FnMut(serde_json::Value) -> std::result::Result<(), E>,
        E: std::fmt::Display,
    {
        let Some(job) = self.queue.claim_one(worker_id)? else {
            return Ok(false);
        };
        let payload: serde_json::Value = serde_json::from_slice(&job.payload)?;
        match delivery(payload) {
            Ok(()) => {
                if !job.ack()? {
                    return Err(Error::Core(format!("outbox ack failed for job {}", job.id)));
                }
            }
            Err(err) => {
                if !job.retry(self.retry_delay(job.attempts), &err.to_string())? {
                    return Err(Error::Core(format!(
                        "outbox retry failed for job {}",
                        job.id
                    )));
                }
            }
        }
        Ok(true)
    }

    /// Exponential backoff: `base_backoff_s * 2^(attempts - 1)`, saturating at
    /// `i64::MAX`. Returns 0 when the base is not positive. `attempts <= 1`
    /// yields the base delay itself.
    fn retry_delay(&self, attempts: i64) -> i64 {
        if self.base_backoff_s <= 0 {
            return 0;
        }
        // Clamp the exponent into u32's range before converting. A plain `as u32`
        // cast would wrap `attempts <= 0` to a huge exponent (saturating the delay
        // to i64::MAX) and silently truncate values above u32::MAX.
        let doublings = attempts.saturating_sub(1).max(0);
        let exponent = u32::try_from(doublings).unwrap_or(u32::MAX);
        self.base_backoff_s
            .saturating_mul(2_i64.saturating_pow(exponent))
    }
}
impl Queue {
pub fn name(&self) -> &str {
&self.name
}
pub fn enqueue<P: Serialize>(&self, payload: &P, opts: EnqueueOpts) -> Result<i64> {
let json = serde_json::to_string(payload)?;
Ok(self
.inner
.with_conn(|c| enqueue_on(c, &self.name, &json, self.opts.max_attempts, &opts))?)
}
pub fn enqueue_tx<P: Serialize>(
&self,
tx: &Transaction<'_>,
payload: &P,
opts: EnqueueOpts,
) -> Result<i64> {
let json = serde_json::to_string(payload)?;
Ok(enqueue_on(
tx.conn(),
&self.name,
&json,
self.opts.max_attempts,
&opts,
)?)
}
pub fn claim_batch(&self, worker_id: &str, n: i64) -> Result<Vec<Job>> {
let rows_json: String = self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_claim_batch(?1, ?2, ?3, ?4)",
params![self.name, worker_id, n, self.opts.visibility_timeout_s],
|r| r.get(0),
)
})?;
let raw: Vec<RawJob> = serde_json::from_str(&rows_json)?;
Ok(raw
.into_iter()
.map(|r| Job {
inner: self.inner.clone(),
id: r.id,
queue: r.queue,
payload: r.payload.into_bytes(),
worker_id: r.worker_id,
attempts: r.attempts,
})
.collect())
}
pub fn claim_one(&self, worker_id: &str) -> Result<Option<Job>> {
Ok(self.claim_batch(worker_id, 1)?.into_iter().next())
}
pub fn ack_batch(&self, ids: &[i64], worker_id: &str) -> Result<i64> {
let ids_json = serde_json::to_string(ids)?;
Ok(self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_ack_batch(?1, ?2)",
params![ids_json, worker_id],
|r| r.get::<_, i64>(0),
)
})?)
}
pub fn cancel(&self, job_id: i64) -> Result<bool> {
Ok(self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_cancel(?1)",
params![job_id],
|r| r.get::<_, i64>(0),
)
})? > 0)
}
pub fn get_job(&self, job_id: i64) -> Result<Option<JobRow>> {
let json: String = self.inner.with_conn(|c| {
c.query_row("SELECT honker_get_job(?1)", params![job_id], |r| r.get(0))
})?;
if json.is_empty() {
return Ok(None);
}
Ok(Some(serde_json::from_str(&json)?))
}
pub fn sweep_expired(&self) -> Result<i64> {
Ok(self.inner.with_conn(|c| {
c.query_row("SELECT honker_sweep_expired(?1)", params![self.name], |r| {
r.get::<_, i64>(0)
})
})?)
}
pub fn claim_waker(&self) -> ClaimWaker {
let (sub_id, rx) = self.inner.updates.subscribe();
ClaimWaker {
inner: self.inner.clone(),
name: self.name.clone(),
visibility_timeout_s: self.opts.visibility_timeout_s,
sub_id,
rx,
}
}
}
fn enqueue_on(
conn: &Connection,
queue: &str,
payload_json: &str,
max_attempts: i64,
opts: &EnqueueOpts,
) -> rusqlite::Result<i64> {
conn.query_row(
"SELECT honker_enqueue(?1, ?2, ?3, ?4, ?5, ?6, ?7)",
params![
queue,
payload_json,
opts.run_at,
opts.delay,
opts.priority,
max_attempts,
opts.expires
],
|r| r.get::<_, i64>(0),
)
}
// Wire shape of one element of the JSON array returned by `honker_claim_batch`.
#[derive(Deserialize)]
struct RawJob {
id: i64,
queue: String,
// Payload text; converted to bytes when building a `Job`.
payload: String,
worker_id: String,
attempts: i64,
// Required in the JSON but never read by this file, hence the dead_code allowance.
#[serde(rename = "claim_expires_at")]
#[allow(dead_code)]
claim_expires_at: i64,
}
/// A job as decoded from the JSON returned by `honker_get_job`.
/// Field meanings and units are defined by honker_core and are not visible here.
#[derive(Debug, Deserialize, Clone)]
pub struct JobRow {
pub id: i64,
pub queue: String,
/// Payload text exactly as returned by the SQL function.
pub payload: String,
pub state: String,
pub priority: i64,
pub run_at: i64,
/// May be absent/null in the JSON.
pub worker_id: Option<String>,
/// May be absent/null in the JSON.
pub claim_expires_at: Option<i64>,
pub attempts: i64,
pub max_attempts: i64,
pub created_at: i64,
/// May be absent/null in the JSON.
pub expires_at: Option<i64>,
}
/// A claimed job. `ack`, `retry`, `fail` and `heartbeat` act on it using its
/// `id` and `worker_id`.
pub struct Job {
// Shared connection + watcher state used by the methods above.
inner: Arc<Inner>,
pub id: i64,
pub queue: String,
/// Payload bytes (the UTF-8 bytes of the payload text returned by the claim).
pub payload: Vec<u8>,
/// Worker id as reported in the claim response.
pub worker_id: String,
/// Attempt counter as reported in the claim response.
pub attempts: i64,
}
impl Job {
pub fn payload_as<T: DeserializeOwned>(&self) -> Result<T> {
Ok(serde_json::from_slice(&self.payload)?)
}
pub fn ack(&self) -> Result<bool> {
let n: i64 = self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_ack(?1, ?2)",
params![self.id, self.worker_id],
|r| r.get(0),
)
})?;
Ok(n > 0)
}
pub fn retry(&self, delay_s: i64, error: &str) -> Result<bool> {
let n: i64 = self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_retry(?1, ?2, ?3, ?4)",
params![self.id, self.worker_id, delay_s, error],
|r| r.get(0),
)
})?;
Ok(n > 0)
}
pub fn fail(&self, error: &str) -> Result<bool> {
let n: i64 = self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_fail(?1, ?2, ?3)",
params![self.id, self.worker_id, error],
|r| r.get(0),
)
})?;
Ok(n > 0)
}
pub fn heartbeat(&self, extend_s: i64) -> Result<bool> {
let n: i64 = self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_heartbeat(?1, ?2, ?3)",
params![self.id, self.worker_id, extend_s],
|r| r.get(0),
)
})?;
Ok(n > 0)
}
}
/// Blocking claimer for one queue: claims a job when one is available and
/// otherwise waits on update events. Unsubscribes from the watcher on drop.
pub struct ClaimWaker {
// Shared connection + watcher state.
inner: Arc<Inner>,
// Queue name.
name: String,
// Forwarded as the last argument of `honker_claim_batch`.
visibility_timeout_s: i64,
// Subscription id, needed to unsubscribe on drop.
sub_id: u64,
// Receives one `()` per update event from the watcher.
rx: Receiver<()>,
}
impl ClaimWaker {
pub fn next(&self, worker_id: &str) -> Result<Option<Job>> {
loop {
let rows_json: String = self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_claim_batch(?1, ?2, ?3, ?4)",
params![self.name, worker_id, 1, self.visibility_timeout_s],
|r| r.get(0),
)
})?;
let raw: Vec<RawJob> = serde_json::from_str(&rows_json)?;
if let Some(r) = raw.into_iter().next() {
return Ok(Some(Job {
inner: self.inner.clone(),
id: r.id,
queue: r.queue,
payload: r.payload.into_bytes(),
worker_id: r.worker_id,
attempts: r.attempts,
}));
}
let next_at: i64 = self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_queue_next_claim_at(?1)",
params![self.name],
|r| r.get::<_, i64>(0),
)
})?;
if next_at > 0 && next_at <= chrono_like_now() {
continue;
}
match recv_until(&self.rx, next_at) {
Ok(true) => continue,
Ok(false) => continue,
Err(RecvTimeoutError::Disconnected) => return Ok(None),
Err(RecvTimeoutError::Timeout) => continue,
}
}
}
pub fn try_next(&self, worker_id: &str) -> Result<Option<Job>> {
let rows_json: String = self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_claim_batch(?1, ?2, ?3, ?4)",
params![self.name, worker_id, 1, self.visibility_timeout_s],
|r| r.get(0),
)
})?;
let raw: Vec<RawJob> = serde_json::from_str(&rows_json)?;
Ok(raw.into_iter().next().map(|r| Job {
inner: self.inner.clone(),
id: r.id,
queue: r.queue,
payload: r.payload.into_bytes(),
worker_id: r.worker_id,
attempts: r.attempts,
}))
}
}
impl Drop for ClaimWaker {
    /// Removes this waker's subscription from the shared update watcher.
    fn drop(&mut self) {
        let subscription = self.sub_id;
        self.inner.updates.unsubscribe(subscription);
    }
}
/// Handle to a named stream (topic); created by `Database::stream`.
pub struct Stream {
// Shared connection + watcher state.
inner: Arc<Inner>,
// Topic name passed to the `honker_stream_*` SQL functions.
name: String,
}
impl Stream {
    /// The topic name.
    pub fn topic(&self) -> &str {
        self.name.as_str()
    }

    /// Publishes `payload` without a key.
    pub fn publish<P: Serialize>(&self, payload: &P) -> Result<i64> {
        self.publish_with_key_opt(None, payload)
    }

    /// Publishes `payload` with the given `key`.
    pub fn publish_with_key<P: Serialize>(&self, key: &str, payload: &P) -> Result<i64> {
        self.publish_with_key_opt(Some(key), payload)
    }

    /// Publishes `payload` without a key on the connection held by `tx`.
    pub fn publish_tx<P: Serialize>(&self, tx: &Transaction<'_>, payload: &P) -> Result<i64> {
        let body = serde_json::to_string(payload)?;
        tx.query_row(
            "SELECT honker_stream_publish(?1, NULL, ?2)",
            params![self.name, body],
            |row| row.get::<_, i64>(0),
        )
    }

    /// Shared publish path: serializes `payload` and calls
    /// `honker_stream_publish(topic, key, json)`, returning the integer it yields.
    fn publish_with_key_opt<P: Serialize>(&self, key: Option<&str>, payload: &P) -> Result<i64> {
        let body = serde_json::to_string(payload)?;
        let published = self.inner.with_conn(|conn| {
            conn.query_row(
                "SELECT honker_stream_publish(?1, ?2, ?3)",
                params![self.name, key, body],
                |row| row.get::<_, i64>(0),
            )
        })?;
        Ok(published)
    }

    /// Reads events via `honker_stream_read_since(topic, offset, limit)` and
    /// decodes the JSON array it returns.
    pub fn read_since(&self, offset: i64, limit: i64) -> Result<Vec<StreamEvent>> {
        let rows_json: String = self.inner.with_conn(|conn| {
            conn.query_row(
                "SELECT honker_stream_read_since(?1, ?2, ?3)",
                params![self.name, offset, limit],
                |row| row.get(0),
            )
        })?;
        let raw_events: Vec<RawStreamEvent> = serde_json::from_str(&rows_json)?;
        let mut events = Vec::with_capacity(raw_events.len());
        for raw in raw_events {
            events.push(StreamEvent::from(raw));
        }
        Ok(events)
    }

    /// Reads up to `limit` events starting from `consumer`'s saved offset.
    pub fn read_from_consumer(&self, consumer: &str, limit: i64) -> Result<Vec<StreamEvent>> {
        let resume_from = self.get_offset(consumer)?;
        self.read_since(resume_from, limit)
    }

    /// Saves `offset` for `consumer` via `honker_stream_save_offset`;
    /// `true` when the function yields a positive value.
    pub fn save_offset(&self, consumer: &str, offset: i64) -> Result<bool> {
        let affected: i64 = self.inner.with_conn(|conn| {
            conn.query_row(
                "SELECT honker_stream_save_offset(?1, ?2, ?3)",
                params![consumer, self.name, offset],
                |row| row.get(0),
            )
        })?;
        Ok(affected > 0)
    }

    /// Same as [`Stream::save_offset`], but runs on the connection held by `tx`.
    pub fn save_offset_tx(
        &self,
        tx: &Transaction<'_>,
        consumer: &str,
        offset: i64,
    ) -> Result<bool> {
        let affected: i64 = tx.query_row(
            "SELECT honker_stream_save_offset(?1, ?2, ?3)",
            params![consumer, self.name, offset],
            |row| row.get(0),
        )?;
        Ok(affected > 0)
    }

    /// Returns `consumer`'s saved offset via `honker_stream_get_offset`.
    pub fn get_offset(&self, consumer: &str) -> Result<i64> {
        let saved = self.inner.with_conn(|conn| {
            conn.query_row(
                "SELECT honker_stream_get_offset(?1, ?2)",
                params![consumer, self.name],
                |row| row.get::<_, i64>(0),
            )
        })?;
        Ok(saved)
    }

    /// Creates a blocking subscription for `consumer`, starting at its saved
    /// offset. The subscription initially saves its offset every 1000 offsets.
    pub fn subscribe(&self, consumer: &str) -> Result<StreamSubscription> {
        let start = self.get_offset(consumer)?;
        let (sub_id, rx) = self.inner.updates.subscribe();
        let subscription = StreamSubscription {
            inner: Arc::clone(&self.inner),
            topic: self.name.clone(),
            consumer: consumer.to_owned(),
            sub_id,
            rx,
            last_offset: start,
            last_saved: start,
            save_every_n: 1000,
            pending: std::collections::VecDeque::new(),
        };
        Ok(subscription)
    }
}
// Wire shape of one element of the JSON array returned by `honker_stream_read_since`.
#[derive(Deserialize)]
struct RawStreamEvent {
offset: i64,
topic: String,
key: Option<String>,
// Payload text; converted to bytes when building a `StreamEvent`.
payload: String,
created_at: i64,
}
/// One event read from a stream.
pub struct StreamEvent {
/// Offset reported for this event; subscriptions resume after it.
pub offset: i64,
pub topic: String,
/// Optional key the event was published with.
pub key: Option<String>,
/// Payload bytes (the UTF-8 bytes of the payload text returned by the read).
pub payload: Vec<u8>,
/// Creation timestamp as reported by the SQL function (unit not visible here).
pub created_at: i64,
}
impl StreamEvent {
pub fn payload_as<T: DeserializeOwned>(&self) -> Result<T> {
Ok(serde_json::from_slice(&self.payload)?)
}
}
impl From<RawStreamEvent> for StreamEvent {
fn from(r: RawStreamEvent) -> Self {
Self {
offset: r.offset,
topic: r.topic,
key: r.key,
payload: r.payload.into_bytes(),
created_at: r.created_at,
}
}
}
/// Blocking iterator over a stream's events for one consumer. It saves the
/// consumer offset periodically and once more on drop.
pub struct StreamSubscription {
// Shared connection + watcher state.
inner: Arc<Inner>,
// Topic being read.
topic: String,
// Consumer name under which offsets are saved.
consumer: String,
// Subscription id, needed to unsubscribe on drop.
sub_id: u64,
// Receives one `()` per update event from the watcher.
rx: Receiver<()>,
// Offset of the most recently yielded event (or the starting offset).
last_offset: i64,
// Most recent offset successfully saved (or the starting offset).
last_saved: i64,
// Save once `last_offset - last_saved` reaches this value; `<= 0` disables periodic saves.
save_every_n: i64,
// Events fetched by `refill` but not yet yielded.
pending: std::collections::VecDeque<StreamEvent>,
}
impl StreamSubscription {
    /// Sets how far the offset may advance before it is saved automatically;
    /// `n <= 0` disables the automatic save during iteration.
    pub fn save_every(mut self, n: i64) -> Self {
        self.save_every_n = n;
        self
    }

    /// Offset of the most recently yielded event, or the starting offset.
    pub fn offset(&self) -> i64 {
        self.last_offset
    }

    /// Saves the current offset if it is ahead of the last saved one.
    /// The saved marker only advances when `honker_stream_save_offset`
    /// yields a positive value.
    pub fn save_offset(&mut self) -> Result<()> {
        if self.last_offset <= self.last_saved {
            return Ok(());
        }
        let current = self.last_offset;
        let affected: i64 = self.inner.with_conn(|conn| {
            conn.query_row(
                "SELECT honker_stream_save_offset(?1, ?2, ?3)",
                params![self.consumer, self.topic, current],
                |row| row.get(0),
            )
        })?;
        if affected > 0 {
            self.last_saved = current;
        }
        Ok(())
    }

    /// Fetches up to 100 events after `last_offset` and appends them to the pending queue.
    fn refill(&mut self) -> Result<()> {
        let rows_json: String = self.inner.with_conn(|conn| {
            conn.query_row(
                "SELECT honker_stream_read_since(?1, ?2, ?3)",
                params![self.topic, self.last_offset, 100],
                |row| row.get(0),
            )
        })?;
        let fetched: Vec<RawStreamEvent> = serde_json::from_str(&rows_json)?;
        self.pending
            .extend(fetched.into_iter().map(StreamEvent::from));
        Ok(())
    }
}
impl Iterator for StreamSubscription {
    type Item = Result<StreamEvent>;

    /// Yields the next event, blocking on update events when none is pending.
    ///
    /// Returns `None` once the update channel is disconnected. A refill or
    /// offset-save failure is yielded as `Some(Err(_))`.
    ///
    /// When the periodic offset save fails, the event that triggered it is put
    /// back at the front of the queue and the offset is restored, so the event
    /// is yielded on a later call instead of being lost.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(ev) = self.pending.pop_front() {
                let previous_offset = self.last_offset;
                self.last_offset = ev.offset;
                if self.save_every_n > 0
                    && self.last_offset - self.last_saved >= self.save_every_n
                    && let Err(e) = self.save_offset()
                {
                    // Undo the advance: the consumer has not seen `ev`, so neither
                    // a later save nor the save on drop may skip past it.
                    self.last_offset = previous_offset;
                    self.pending.push_front(ev);
                    return Some(Err(e));
                }
                return Some(Ok(ev));
            }
            if let Err(e) = self.refill() {
                return Some(Err(e));
            }
            if !self.pending.is_empty() {
                continue;
            }
            match self.rx.recv() {
                Ok(()) => {
                    // Coalesce queued wake-ups; one refill covers them all.
                    while self.rx.try_recv().is_ok() {}
                    continue;
                }
                Err(_) => return None,
            }
        }
    }
}
impl Drop for StreamSubscription {
    /// Saves the final offset (best effort) and unsubscribes from the watcher.
    fn drop(&mut self) {
        // Drop cannot report errors, so a failed save is ignored.
        let _ = self.save_offset();
        let subscription = self.sub_id;
        self.inner.updates.unsubscribe(subscription);
    }
}
/// One row read from `_honker_notifications`.
pub struct Notification {
/// Row id; subscriptions resume after it.
pub id: i64,
/// Channel the row was stored under.
pub channel: String,
/// Payload text; `payload_as` decodes it as JSON.
pub payload: String,
}
impl Notification {
pub fn payload_as<T: DeserializeOwned>(&self) -> Result<T> {
Ok(serde_json::from_str(&self.payload)?)
}
}
/// Subscription to one notification channel; created by `Database::listen`.
/// Unsubscribes from the update watcher on drop.
pub struct Subscription {
// Shared connection + watcher state.
inner: Arc<Inner>,
// Subscription id, needed to unsubscribe on drop.
sub_id: u64,
// Receives one `()` per update event from the watcher.
rx: Receiver<()>,
// Channel whose rows are selected in `refill`.
channel: String,
// Id of the last notification handed out (or the high-water mark at subscribe time).
last_id: i64,
// Rows fetched by `refill` but not yet handed out.
pending: std::collections::VecDeque<Notification>,
// Set once the update channel was found disconnected; `recv` then returns `None`.
closed: bool,
}
impl Subscription {
    /// Pops the next buffered notification, advancing `last_id` to its id.
    fn take_pending(&mut self) -> Option<Notification> {
        let notification = self.pending.pop_front()?;
        self.last_id = notification.id;
        Some(notification)
    }

    /// Blocks until the next notification on the channel is available.
    ///
    /// Returns `None` once the update channel is disconnected (and on every
    /// later call); a query failure is returned as `Some(Err(_))`.
    pub fn recv(&mut self) -> Option<Result<Notification>> {
        if self.closed {
            return None;
        }
        loop {
            if let Some(notification) = self.take_pending() {
                return Some(Ok(notification));
            }
            if let Err(e) = self.refill() {
                return Some(Err(e));
            }
            if !self.pending.is_empty() {
                continue;
            }
            if self.rx.recv().is_err() {
                self.closed = true;
                return None;
            }
            // Coalesce queued wake-ups; one refill covers them all.
            while self.rx.try_recv().is_ok() {}
        }
    }

    /// Returns a notification if one is buffered or can be fetched right now,
    /// without waiting for update events.
    pub fn try_next(&mut self) -> Result<Option<Notification>> {
        if let Some(notification) = self.take_pending() {
            return Ok(Some(notification));
        }
        self.refill()?;
        Ok(self.take_pending())
    }

    /// Waits up to `timeout` for the next notification. Returns `Ok(None)`
    /// on timeout or when the update channel is disconnected.
    pub fn recv_timeout(&mut self, timeout: Duration) -> Result<Option<Notification>> {
        if let Some(notification) = self.take_pending() {
            return Ok(Some(notification));
        }
        let deadline = std::time::Instant::now() + timeout;
        loop {
            self.refill()?;
            if let Some(notification) = self.take_pending() {
                return Ok(Some(notification));
            }
            let now = std::time::Instant::now();
            if now >= deadline {
                return Ok(None);
            }
            match self.rx.recv_timeout(deadline - now) {
                Ok(()) => {
                    // Coalesce queued wake-ups before the next refill.
                    while self.rx.try_recv().is_ok() {}
                }
                Err(RecvTimeoutError::Timeout) => return Ok(None),
                Err(RecvTimeoutError::Disconnected) => {
                    self.closed = true;
                    return Ok(None);
                }
            }
        }
    }

    /// Fetches up to 1000 rows for this channel with ids above `last_id`
    /// (ascending) and appends them to the pending queue.
    fn refill(&mut self) -> Result<()> {
        let fetched: Vec<Notification> = self.inner.with_conn(|conn| {
            let mut stmt = conn.prepare_cached(
                "SELECT id, channel, payload\n\
                 FROM _honker_notifications\n\
                 WHERE id > ?1 AND channel = ?2\n\
                 ORDER BY id ASC\n\
                 LIMIT 1000",
            )?;
            let mapped = stmt.query_map(params![self.last_id, self.channel], |row| {
                Ok(Notification {
                    id: row.get::<_, i64>(0)?,
                    channel: row.get::<_, String>(1)?,
                    payload: row.get::<_, String>(2)?,
                })
            })?;
            mapped.collect::<rusqlite::Result<Vec<_>>>()
        })?;
        self.pending.extend(fetched);
        Ok(())
    }
}
impl Iterator for Subscription {
    type Item = Result<Notification>;

    /// Blocking iteration; delegates to the inherent `recv`.
    fn next(&mut self) -> Option<Self::Item> {
        self.recv()
    }
}
impl Drop for Subscription {
    /// Removes this subscription from the shared update watcher.
    fn drop(&mut self) {
        let subscription = self.sub_id;
        self.inner.updates.unsubscribe(subscription);
    }
}
/// Raw update-event subscription; created by `Database::update_events`.
/// Unsubscribes from the update watcher on drop.
pub struct UpdateEvents {
// Shared connection + watcher state.
inner: Arc<Inner>,
// Subscription id, needed to unsubscribe on drop.
sub_id: u64,
// Receives one `()` per update event from the watcher.
rx: Receiver<()>,
}
impl UpdateEvents {
pub fn recv(&self) -> Result<()> {
self.rx.recv().map_err(|_| Error::UpdateClosed)
}
pub fn try_recv(&self) -> Option<()> {
self.rx.try_recv().ok()
}
pub fn recv_timeout(&self, timeout: Duration) -> Result<Option<()>> {
match self.rx.recv_timeout(timeout) {
Ok(()) => Ok(Some(())),
Err(RecvTimeoutError::Timeout) => Ok(None),
Err(RecvTimeoutError::Disconnected) => Err(Error::UpdateClosed),
}
}
}
impl Drop for UpdateEvents {
    /// Removes this subscription from the shared update watcher.
    fn drop(&mut self) {
        let subscription = self.sub_id;
        self.inner.updates.unsubscribe(subscription);
    }
}
/// A task to register with the scheduler through `Scheduler::add`.
/// Fields are bound, in declaration order, to `honker_scheduler_register`.
#[derive(Debug, Clone)]
pub struct ScheduledTask {
/// Task name; also used by `remove`, `pause`, `resume` and `update`.
pub name: String,
/// Target queue name.
pub queue: String,
/// Schedule expression (presumably a cron expression — see `ScheduleRow::cron_expr`; confirm).
pub schedule: String,
/// Payload, serialized to JSON text before registration.
pub payload: serde_json::Value,
pub priority: i64,
/// `None` binds NULL.
pub expires_s: Option<i64>,
}
/// One element of the JSON array returned by `honker_scheduler_tick`.
#[derive(Debug, Deserialize)]
pub struct ScheduledFire {
pub name: String,
pub queue: String,
/// Fire time as reported by the SQL function (unit not visible here).
pub fire_at: i64,
pub job_id: i64,
}
/// One element of the JSON array returned by `honker_scheduler_list`.
#[derive(Debug, Deserialize, Clone)]
pub struct ScheduleRow {
pub name: String,
pub queue: String,
pub cron_expr: String,
/// Payload text exactly as returned by the SQL function.
pub payload: String,
pub priority: i64,
/// May be absent/null in the JSON.
pub expires_s: Option<i64>,
pub next_fire_at: i64,
pub enabled: bool,
}
/// Partial update for a registered schedule; `None` fields are bound as NULL.
/// `Scheduler::update` returns `Ok(false)` without touching the database when
/// every field is `None`.
#[derive(Debug, Default, Clone)]
pub struct ScheduleUpdate {
pub cron_expr: Option<String>,
/// Serialized to JSON text before being bound.
pub payload: Option<serde_json::Value>,
pub priority: Option<i64>,
/// Outer `None`: the "touch expires" flag is bound as 0. `Some(inner)`: the
/// flag is bound as 1 and `inner` is bound as the value (NULL for `Some(None)`).
pub expires_s: Option<Option<i64>>,
}
/// Handle to the task scheduler; created by `Database::scheduler`.
pub struct Scheduler {
// Shared connection + watcher state.
inner: Arc<Inner>,
}
impl Scheduler {
pub fn add(&self, task: ScheduledTask) -> Result<()> {
let payload_json = serde_json::to_string(&task.payload)?;
self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_scheduler_register(?1, ?2, ?3, ?4, ?5, ?6)",
params![
task.name,
task.queue,
task.schedule,
payload_json,
task.priority,
task.expires_s,
],
|_| Ok(()),
)
})?;
Ok(())
}
pub fn remove(&self, name: &str) -> Result<i64> {
Ok(self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_scheduler_unregister(?1)",
params![name],
|r| r.get::<_, i64>(0),
)
})?)
}
pub fn tick(&self) -> Result<Vec<ScheduledFire>> {
let now = chrono_like_now();
let rows_json: String = self.inner.with_conn(|c| {
c.query_row("SELECT honker_scheduler_tick(?1)", params![now], |r| {
r.get(0)
})
})?;
Ok(serde_json::from_str(&rows_json)?)
}
pub fn soonest(&self) -> Result<i64> {
Ok(self.inner.with_conn(|c| {
c.query_row("SELECT honker_scheduler_soonest()", [], |r| {
r.get::<_, i64>(0)
})
})?)
}
pub fn pause(&self, name: &str) -> Result<bool> {
Ok(self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_scheduler_pause(?1)",
params![name],
|r| r.get::<_, i64>(0),
)
})? > 0)
}
pub fn resume(&self, name: &str) -> Result<bool> {
Ok(self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_scheduler_resume(?1)",
params![name],
|r| r.get::<_, i64>(0),
)
})? > 0)
}
pub fn list(&self) -> Result<Vec<ScheduleRow>> {
let json: String = self.inner.with_conn(|c| {
c.query_row("SELECT honker_scheduler_list()", [], |r| r.get(0))
})?;
Ok(serde_json::from_str(&json)?)
}
pub fn update(&self, name: &str, opts: ScheduleUpdate) -> Result<bool> {
if opts.cron_expr.is_none()
&& opts.payload.is_none()
&& opts.priority.is_none()
&& opts.expires_s.is_none()
{
return Ok(false);
}
let payload_json = match &opts.payload {
Some(v) => Some(serde_json::to_string(v)?),
None => None,
};
let touch_expires: i64 = if opts.expires_s.is_some() { 1 } else { 0 };
let expires_arg: Option<i64> = opts.expires_s.flatten();
Ok(self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_scheduler_update(?1, ?2, ?3, ?4, ?5, ?6)",
params![
name,
opts.cron_expr,
payload_json,
opts.priority,
expires_arg,
touch_expires,
],
|r| r.get::<_, i64>(0),
)
})? > 0)
}
pub fn run(&self, stop: Arc<std::sync::atomic::AtomicBool>, owner: &str) -> Result<()> {
const LOCK_TTL: i64 = 60;
const HEARTBEAT: Duration = Duration::from_secs(20);
let (sub_id, rx) = self.inner.updates.subscribe();
let result = (|| -> Result<()> {
while !stop.load(std::sync::atomic::Ordering::Acquire) {
let acquired = lock_try_acquire(&self.inner, "honker-scheduler", owner, LOCK_TTL)?;
if !acquired {
match rx.recv_timeout(Duration::from_secs(5)) {
Ok(()) => {
while rx.try_recv().is_ok() {}
continue;
}
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => return Ok(()),
}
}
let leader_result = self.leader_loop(&stop, owner, LOCK_TTL, HEARTBEAT, &rx);
let _ = lock_release(&self.inner, "honker-scheduler", owner);
leader_result?;
}
Ok(())
})();
self.inner.updates.unsubscribe(sub_id);
result
}
fn leader_loop(
&self,
stop: &Arc<std::sync::atomic::AtomicBool>,
owner: &str,
lock_ttl: i64,
heartbeat: Duration,
rx: &Receiver<()>,
) -> Result<()> {
let mut last_heartbeat = std::time::Instant::now();
while !stop.load(std::sync::atomic::Ordering::Acquire) {
self.tick()?;
if last_heartbeat.elapsed() >= heartbeat {
let still_ours =
lock_try_acquire(&self.inner, "honker-scheduler", owner, lock_ttl)?;
if !still_ours {
return Ok(());
}
last_heartbeat = std::time::Instant::now();
}
let mut wait_for = heartbeat.saturating_sub(last_heartbeat.elapsed());
let soonest = self.soonest()?;
if soonest > 0 {
let now = chrono_like_now();
let until_soonest = if soonest <= now {
Duration::ZERO
} else {
Duration::from_secs((soonest - now) as u64)
};
if until_soonest < wait_for {
wait_for = until_soonest;
}
}
match rx.recv_timeout(wait_for) {
Ok(()) => while rx.try_recv().is_ok() {},
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => return Ok(()),
}
}
Ok(())
}
}
/// Waits for an update event on `rx`.
///
/// With `unix_sec <= 0` the wait is unbounded; otherwise it lasts until the
/// Unix time `unix_sec` (seconds), or not at all if that time has passed.
/// After a wake-up, any further queued events are drained.
///
/// Returns `Ok(true)` after a wake-up, `Err(Timeout)` when the deadline passed
/// first, and `Err(Disconnected)` when the channel is closed.
fn recv_until(rx: &Receiver<()>, unix_sec: i64) -> std::result::Result<bool, RecvTimeoutError> {
    if unix_sec <= 0 {
        rx.recv().map_err(|_| RecvTimeoutError::Disconnected)?;
    } else {
        let now = chrono_like_now();
        let secs = if unix_sec > now {
            (unix_sec - now) as u64
        } else {
            0
        };
        rx.recv_timeout(Duration::from_secs(secs))?;
    }
    // Coalesce queued wake-ups into one.
    while rx.try_recv().is_ok() {}
    Ok(true)
}
fn lock_try_acquire(inner: &Inner, name: &str, owner: &str, ttl_s: i64) -> Result<bool> {
let n: i64 = inner.with_conn(|c| {
c.query_row(
"SELECT honker_lock_acquire(?1, ?2, ?3)",
params![name, owner, ttl_s],
|r| r.get(0),
)
})?;
Ok(n == 1)
}
fn lock_release(inner: &Inner, name: &str, owner: &str) -> Result<bool> {
let n: i64 = inner.with_conn(|c| {
c.query_row(
"SELECT honker_lock_release(?1, ?2)",
params![name, owner],
|r| r.get(0),
)
})?;
Ok(n > 0)
}
/// Current Unix time in whole seconds, or 0 if the system clock reads earlier
/// than the Unix epoch.
fn chrono_like_now() -> i64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
/// A named lock acquired through `Database::try_lock`. It is released on drop
/// unless `release` already succeeded.
pub struct Lock {
// Shared connection + watcher state.
inner: Arc<Inner>,
// Lock name passed to the `honker_lock_*` SQL functions.
name: String,
// Owner identity passed to the `honker_lock_*` SQL functions.
owner: String,
// True once `release` ran its query successfully, which disables the release in `Drop`.
released: bool,
}
impl Lock {
pub fn name(&self) -> &str {
&self.name
}
pub fn owner(&self) -> &str {
&self.owner
}
pub fn release(mut self) -> Result<bool> {
let n: i64 = self.inner.with_conn(|c| {
c.query_row(
"SELECT honker_lock_release(?1, ?2)",
params![self.name, self.owner],
|r| r.get(0),
)
})?;
self.released = true;
Ok(n > 0)
}
pub fn heartbeat(&self, ttl_s: i64) -> Result<bool> {
lock_try_acquire(&self.inner, &self.name, &self.owner, ttl_s)
}
}
impl Drop for Lock {
    /// Releases the lock if `release` has not already done so.
    fn drop(&mut self) {
        if self.released {
            return;
        }
        // Best effort: drop cannot report errors, so the outcome is ignored.
        let _ = self.inner.with_conn(|conn| {
            conn.query_row(
                "SELECT honker_lock_release(?1, ?2)",
                params![self.name, self.owner],
                |_| Ok(()),
            )
        });
    }
}