use anyhow::{Context, Result};
use chrono::Utc;
use rusqlite::{params, Connection, OptionalExtension, Transaction};
use uuid::Uuid;
use crate::memory::chunks::with_connection;
use crate::memory::config::MemoryConfig;
use crate::memory::queue::types::{Job, JobKind, JobStatus, NewJob, RETIRED_JOB_KINDS};
/// Five minutes, in milliseconds.
///
/// NOTE(review): not referenced by the code visible in this file; presumably
/// the default `lock_duration_ms` callers pass to `claim_next` — confirm.
pub const DEFAULT_LOCK_DURATION_MS: i64 = 5 * 60 * 1_000;
/// One minute, in milliseconds; the starting delay that `backoff_ms` doubles.
pub(crate) const RETRY_BASE_MS: i64 = 60 * 1_000;
/// One hour, in milliseconds; the largest delay `backoff_ms` returns.
pub(crate) const RETRY_CAP_MS: i64 = 60 * 60 * 1_000;
/// `max_attempts` value `enqueue_conn` stores when the `NewJob` leaves it unset.
pub(crate) const DEFAULT_MAX_ATTEMPTS: u32 = 5;
/// Enqueues `job` using the connection that `with_connection` supplies for
/// `config`.
///
/// Returns `Ok(Some(id))` with the generated job id when a row was inserted,
/// or `Ok(None)` when the `INSERT OR IGNORE` in `enqueue_conn` inserted
/// nothing.
///
/// # Errors
///
/// Propagates any error from `with_connection` or `enqueue_conn`.
pub fn enqueue(config: &MemoryConfig, job: &NewJob) -> Result<Option<String>> {
    with_connection(config, |db| {
        // All insert logic lives in `enqueue_conn` so that the transactional
        // entry point (`enqueue_tx`) shares exactly the same statement.
        enqueue_conn(db, job)
    })
}
pub fn enqueue_tx(tx: &Transaction<'_>, job: &NewJob) -> Result<Option<String>> {
enqueue_conn(tx, job)
}
/// Inserts `job` into `mem_tree_jobs` on `conn` and reports whether a row was
/// actually written.
///
/// The new row gets a fresh `job:<uuid-v4>` id, status `'ready'`, zero
/// attempts and `created_at_ms` set to the current UTC time. When the job
/// does not specify them, `available_at_ms` defaults to the creation time and
/// `max_attempts` to `DEFAULT_MAX_ATTEMPTS`.
///
/// Returns `Ok(Some(id))` when one row was inserted and `Ok(None)` when
/// `INSERT OR IGNORE` skipped the row.
/// NOTE(review): the skip is presumably triggered by a uniqueness constraint
/// involving `dedupe_key`; the table schema is not visible here — confirm.
///
/// # Errors
///
/// Returns the underlying rusqlite error if the statement fails.
pub(crate) fn enqueue_conn(conn: &Connection, job: &NewJob) -> Result<Option<String>> {
    const INSERT_SQL: &str = "INSERT OR IGNORE INTO mem_tree_jobs (
id, kind, payload_json, dedupe_key, status, attempts, max_attempts,
available_at_ms, locked_until_ms, last_error,
created_at_ms, started_at_ms, completed_at_ms
) VALUES (?1, ?2, ?3, ?4, 'ready', 0, ?5, ?6, NULL, NULL, ?7, NULL, NULL)";

    let job_id = format!("job:{}", Uuid::new_v4());
    let created_at_ms = Utc::now().timestamp_millis();
    let available_at_ms = job.available_at_ms.unwrap_or(created_at_ms);
    // SQLite integers are signed 64-bit, so widen the attempt budget losslessly.
    let max_attempts = i64::from(job.max_attempts.unwrap_or(DEFAULT_MAX_ATTEMPTS));

    let rows_inserted = conn.execute(
        INSERT_SQL,
        params![
            job_id,
            job.kind.as_str(),
            job.payload_json,
            job.dedupe_key,
            max_attempts,
            available_at_ms,
            created_at_ms,
        ],
    )?;

    // Zero affected rows means the insert was ignored, so there is no id to
    // hand back.
    let outcome = if rows_inserted == 0 {
        None
    } else {
        Some(job_id)
    };
    Ok(outcome)
}
/// Claims the highest-priority runnable job, marks it running, and returns it.
///
/// A single `UPDATE … RETURNING` statement selects one row whose status is
/// `'ready'`, whose `available_at_ms` is not in the future, and whose kind is
/// neither `topic_route` nor `digest_daily`. Candidates are ordered by kind
/// (`seal`, then `flush_stale`, then `append_buffer`, then everything else)
/// and, within a kind rank, by ascending `available_at_ms`.
///
/// The claimed row has its status set to `'running'`, `attempts` incremented,
/// `started_at_ms` set to the current UTC time, `locked_until_ms` set to that
/// time plus `lock_duration_ms` (saturating), and `last_error` cleared.
///
/// Returns `Ok(None)` when no row qualifies.
///
/// NOTE(review): the excluded kinds are spelled out literally in the SQL;
/// presumably they must stay in sync with `RETIRED_JOB_KINDS` — confirm.
///
/// # Errors
///
/// Fails with the context "Failed to claim next mem_tree_jobs row" when the
/// statement or the row decoding fails.
pub fn claim_next(config: &MemoryConfig, lock_duration_ms: i64) -> Result<Option<Job>> {
    const CLAIM_SQL: &str = "UPDATE mem_tree_jobs
SET status = 'running',
attempts = attempts + 1,
started_at_ms = ?1,
locked_until_ms = ?2,
last_error = NULL
WHERE id = (
SELECT id FROM mem_tree_jobs
WHERE status = 'ready'
AND available_at_ms <= ?1
AND kind NOT IN ('topic_route', 'digest_daily')
ORDER BY
CASE kind
WHEN 'seal' THEN 1
WHEN 'flush_stale' THEN 2
WHEN 'append_buffer' THEN 3
ELSE 4
END ASC,
available_at_ms ASC
LIMIT 1
)
RETURNING id, kind, payload_json, dedupe_key, status, attempts,
max_attempts, available_at_ms, locked_until_ms, last_error,
created_at_ms, started_at_ms, completed_at_ms,
failure_reason, failure_class";

    with_connection(config, |conn| {
        let claimed_at_ms = Utc::now().timestamp_millis();
        // Saturate rather than overflow if the caller passes an extreme duration.
        let lock_expires_at_ms = claimed_at_ms.saturating_add(lock_duration_ms);

        // `optional()` turns "no row returned" into `None` instead of an error.
        let claimed = conn
            .query_row(
                CLAIM_SQL,
                params![claimed_at_ms, lock_expires_at_ms],
                row_to_job,
            )
            .optional()
            .context("Failed to claim next mem_tree_jobs row")?;
        Ok(claimed)
    })
}
/// Deletes every `mem_tree_jobs` row whose kind is `topic_route` or
/// `digest_daily`, regardless of status, and returns how many rows were
/// removed.
///
/// NOTE(review): the kind list is written literally in the SQL; presumably it
/// is meant to mirror `RETIRED_JOB_KINDS` — confirm the two stay in sync.
///
/// # Errors
///
/// Propagates any error from `with_connection` or from the `DELETE` itself.
pub fn purge_retired_jobs(config: &MemoryConfig) -> Result<usize> {
    const PURGE_SQL: &str =
        "DELETE FROM mem_tree_jobs WHERE kind IN ('topic_route', 'digest_daily')";

    with_connection(config, |conn| {
        let deleted_rows = conn.execute(PURGE_SQL, [])?;
        Ok(deleted_rows)
    })
}
/// Returns how many `mem_tree_jobs` rows currently have the given `status`.
///
/// # Errors
///
/// Propagates any error from `with_connection` or from the `COUNT(*)` query.
pub fn count_by_status(config: &MemoryConfig, status: JobStatus) -> Result<u64> {
    const COUNT_SQL: &str = "SELECT COUNT(*) FROM mem_tree_jobs WHERE status = ?1";

    with_connection(config, |conn| {
        let status_name = status.as_str();
        let raw = conn.query_row(COUNT_SQL, params![status_name], |row| {
            row.get::<_, i64>(0)
        })?;
        // SQLite hands the count back as a signed integer; treat a negative
        // value as zero before converting to the unsigned return type.
        let count = if raw < 0 { 0 } else { raw as u64 };
        Ok(count)
    })
}
/// Returns how many `mem_tree_jobs` rows have status `'failed'` together with
/// a `failure_class` of `'unrecoverable'`.
///
/// # Errors
///
/// Propagates any error from `with_connection` or from the `COUNT(*)` query.
pub fn count_failed_unrecoverable(config: &MemoryConfig) -> Result<u64> {
    const COUNT_SQL: &str = "SELECT COUNT(*) FROM mem_tree_jobs \
        WHERE status = 'failed' AND failure_class = 'unrecoverable'";

    with_connection(config, |conn| {
        let raw = conn.query_row(COUNT_SQL, [], |row| row.get::<_, i64>(0))?;
        // The count arrives as a signed integer; floor it at zero so the
        // cast to `u64` cannot wrap.
        let non_negative = std::cmp::max(raw, 0);
        Ok(non_negative as u64)
    })
}
/// Returns the total number of rows in `mem_tree_jobs`, across all statuses
/// and kinds.
///
/// # Errors
///
/// Propagates any error from `with_connection` or from the `COUNT(*)` query.
pub fn count_total(config: &MemoryConfig) -> Result<u64> {
    with_connection(config, |conn| {
        let total = conn.query_row("SELECT COUNT(*) FROM mem_tree_jobs", [], |row| {
            row.get::<_, i64>(0)
        })?;
        // Floor at zero before the signed-to-unsigned cast.
        match total {
            negative if negative < 0 => Ok(0),
            count => Ok(count as u64),
        }
    })
}
/// Looks up a single job by its `id`.
///
/// Returns `Ok(Some(job))` when a row with that id exists and `Ok(None)` when
/// the query yields no row.
///
/// # Errors
///
/// Propagates any error from `with_connection`, from the query, or from
/// `row_to_job` while decoding the row.
pub fn get_job(config: &MemoryConfig, id: &str) -> Result<Option<Job>> {
    // The column order must match what `row_to_job` reads positionally.
    const SELECT_SQL: &str = "SELECT id, kind, payload_json, dedupe_key, status, attempts, max_attempts,
available_at_ms, locked_until_ms, last_error,
created_at_ms, started_at_ms, completed_at_ms,
failure_reason, failure_class
FROM mem_tree_jobs WHERE id = ?1";

    with_connection(config, |conn| {
        let lookup = conn.query_row(SELECT_SQL, params![id], row_to_job);
        // A missing row is an ordinary outcome here, not an error.
        let found = lookup.optional()?;
        Ok(found)
    })
}
/// Decodes one `mem_tree_jobs` row into a [`Job`].
///
/// Columns are read by position, so the statement producing `row` must list
/// them in this exact order: `id, kind, payload_json, dedupe_key, status,
/// attempts, max_attempts, available_at_ms, locked_until_ms, last_error,
/// created_at_ms, started_at_ms, completed_at_ms, failure_reason,
/// failure_class`.
///
/// `attempts` and `max_attempts` are stored as signed 64-bit integers and are
/// clamped into the `u32` range: negative values become `0` and values above
/// `u32::MAX` become `u32::MAX` rather than silently wrapping.
///
/// # Errors
///
/// Returns the rusqlite error for any column that cannot be read as the
/// expected type. A `kind` or `status` string rejected by `JobKind::parse` /
/// `JobStatus::parse` is reported as
/// `rusqlite::Error::FromSqlConversionFailure` carrying that column's index
/// (1 and 4 respectively).
pub(crate) fn row_to_job(row: &rusqlite::Row<'_>) -> rusqlite::Result<Job> {
    /// Clamps a stored counter into `0..=u32::MAX` so the final cast is lossless.
    fn clamp_to_u32(value: i64) -> u32 {
        value.clamp(0, i64::from(u32::MAX)) as u32
    }

    let id: String = row.get(0)?;
    let kind_s: String = row.get(1)?;
    let payload_json: String = row.get(2)?;
    let dedupe_key: Option<String> = row.get(3)?;
    let status_s: String = row.get(4)?;
    let attempts: i64 = row.get(5)?;
    let max_attempts: i64 = row.get(6)?;
    let available_at_ms: i64 = row.get(7)?;
    let locked_until_ms: Option<i64> = row.get(8)?;
    let last_error: Option<String> = row.get(9)?;
    let created_at_ms: i64 = row.get(10)?;
    let started_at_ms: Option<i64> = row.get(11)?;
    let completed_at_ms: Option<i64> = row.get(12)?;
    let failure_reason: Option<String> = row.get(13)?;
    let failure_class: Option<String> = row.get(14)?;

    // Surface unknown enum strings as conversion failures on the column they
    // came from, so callers see an ordinary rusqlite error.
    let kind = JobKind::parse(&kind_s).map_err(|e| {
        rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, e.into())
    })?;
    let status = JobStatus::parse(&status_s).map_err(|e| {
        rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, e.into())
    })?;

    Ok(Job {
        id,
        kind,
        payload_json,
        dedupe_key,
        status,
        attempts: clamp_to_u32(attempts),
        max_attempts: clamp_to_u32(max_attempts),
        available_at_ms,
        locked_until_ms,
        last_error,
        failure_reason,
        failure_class,
        created_at_ms,
        started_at_ms,
        completed_at_ms,
    })
}
/// Computes the retry delay, in milliseconds, after `attempts_so_far` attempts.
///
/// The delay starts at `RETRY_BASE_MS` for zero or one attempt, doubles for
/// each further attempt, and never exceeds `RETRY_CAP_MS`.
pub(crate) fn backoff_ms(attempts_so_far: u32) -> i64 {
    // Bounding the number of doublings keeps the power of two well inside
    // `i64` no matter how large the attempt counter is.
    const MAX_DOUBLINGS: u32 = 20;

    let doublings = std::cmp::min(attempts_so_far.saturating_sub(1), MAX_DOUBLINGS);
    let uncapped = RETRY_BASE_MS.saturating_mul(2i64.pow(doublings));
    std::cmp::min(uncapped, RETRY_CAP_MS)
}
/// Returns `true` when `kind` exactly matches one of the entries in
/// `RETIRED_JOB_KINDS`.
pub fn is_retired_kind(kind: &str) -> bool {
    RETIRED_JOB_KINDS.iter().any(|retired| *retired == kind)
}
// Unit tests for this module are kept in the sibling file `store_tests.rs`
// and are compiled only for test builds.
#[cfg(test)]
#[path = "store_tests.rs"]
mod tests;