use std::sync::LazyLock;
use anyhow::Result;
use crate::memory::config::MemoryConfig;
use crate::memory::queue::gate::{LlmGate, Permit};
use crate::memory::queue::handlers::{self, QueueDelegates};
use crate::memory::queue::store::{claim_next, purge_retired_jobs, DEFAULT_LOCK_DURATION_MS};
use crate::memory::queue::store_settle::{
mark_deferred, mark_done, mark_failed_typed, recover_stale_locks,
};
use crate::memory::queue::types::{Job, JobFailure, JobOutcome};
/// Module-wide gate shared by every caller in this process.
///
/// Built lazily with `LlmGate::default` on first access. `run_once` acquires a
/// permit from it before handling jobs whose kind reports `is_llm_bound()`.
static LLM_GATE: LazyLock<LlmGate> = LazyLock::new(LlmGate::default);
pub fn llm_gate() -> &'static LlmGate {
&LLM_GATE
}
/// Runs the queue's housekeeping steps: purge retired jobs, then recover stale locks.
///
/// # Returns
/// A `(purged, recovered)` pair holding the values returned by
/// `purge_retired_jobs` and `recover_stale_locks`, in that order
/// (presumably affected-row counts — confirm against the store module).
///
/// # Errors
/// Propagates the first failure. If purging fails, stale-lock recovery is not
/// attempted.
pub fn bootstrap(config: &MemoryConfig) -> Result<(usize, usize)> {
    // Tuple fields are evaluated left to right, so the purge always runs first.
    Ok((purge_retired_jobs(config)?, recover_stale_locks(config)?))
}
/// Claims at most one job, runs its handler, and records the outcome.
///
/// # Returns
/// * `Ok(false)` — `claim_next` had no job to hand out.
/// * `Ok(true)` — a job was claimed, handled and settled (whether the handler
///   succeeded, deferred or failed).
///
/// # Errors
/// Propagates errors from `claim_next` and from settling the job. An error
/// returned by the handler itself is *not* propagated; it is passed to
/// `settle_job`, which records it as a failure.
pub async fn run_once(config: &MemoryConfig, delegates: &dyn QueueDelegates) -> Result<bool> {
    let job = match claim_next(config, DEFAULT_LOCK_DURATION_MS)? {
        Some(claimed) => claimed,
        None => return Ok(false),
    };

    // Only LLM-bound job kinds take a permit from the shared gate.
    // NOTE(review): `acquire()` is called synchronously (no `.await`) after the
    // job has already been claimed; if it can block while the gate is
    // saturated, it would stall this executor thread — confirm.
    let llm_permit: Option<Permit> = job.kind.is_llm_bound().then(|| LLM_GATE.acquire());
    let handled = handlers::handle_job(config, &job, delegates).await;
    // Release the gate before touching the store so settling never holds a permit.
    drop(llm_permit);

    settle_job(config, &job, handled).map(|()| true)
}
/// Persists the result of a handled job.
///
/// * `Ok(JobOutcome::Done)` → `mark_done`
/// * `Ok(JobOutcome::Defer { .. })` → `mark_deferred` with the requested time and reason
/// * `Err(_)` → `mark_failed_typed`, given the full rendered error chain and,
///   when the error downcasts to [`JobFailure`], a reference to that typed failure
///
/// Returns whatever the selected store call returns.
fn settle_job(config: &MemoryConfig, job: &Job, result: Result<JobOutcome>) -> Result<()> {
    // Peel off the failure path first so the success outcomes can be matched flat.
    let outcome = match result {
        Ok(outcome) => outcome,
        Err(failure) => {
            // `{:#}` renders the whole anyhow context chain on one line.
            let rendered = format!("{failure:#}");
            let typed_failure = failure.downcast_ref::<JobFailure>();
            return mark_failed_typed(config, job, &rendered, typed_failure);
        }
    };

    match outcome {
        JobOutcome::Done => mark_done(config, job),
        JobOutcome::Defer { until_ms, reason } => mark_deferred(config, job, until_ms, &reason),
    }
}
pub fn is_sqlite_busy(err: &anyhow::Error) -> bool {
if let Some(rusqlite::Error::SqliteFailure(sqlite_err, _)) =
err.downcast_ref::<rusqlite::Error>()
{
return matches!(
sqlite_err.code,
rusqlite::ErrorCode::DatabaseBusy | rusqlite::ErrorCode::DatabaseLocked
);
}
let msg = format!("{err:#}").to_ascii_lowercase();
msg.contains("database is locked") || msg.contains("database table is locked")
}
pub fn is_sqlite_io_transient(err: &anyhow::Error) -> bool {
if let Some(rusqlite::Error::SqliteFailure(f, _)) = err.downcast_ref::<rusqlite::Error>() {
if matches!(f.extended_code, 14 | 1546 | 4618 | 4874 | 5386 | 8714) {
return true;
}
if f.code == rusqlite::ErrorCode::CannotOpen {
return true;
}
}
let msg = format!("{err:#}").to_ascii_lowercase();
msg.contains("circuit breaker open")
|| msg.contains("disk i/o error")
|| msg.contains("unable to open database file")
|| msg.contains("xshmmap")
|| msg.contains("truncate file")
}
pub fn is_sqlite_disk_full(err: &anyhow::Error) -> bool {
if let Some(rusqlite::Error::SqliteFailure(sqlite_err, _)) =
err.downcast_ref::<rusqlite::Error>()
{
if sqlite_err.code == rusqlite::ErrorCode::DiskFull {
return true;
}
}
let msg = format!("{err:#}").to_ascii_lowercase();
msg.contains("database or disk is full")
|| msg.contains("insertion failed because database is full")
}
pub fn is_sqlite_corrupt(err: &anyhow::Error) -> bool {
if let Some(rusqlite::Error::SqliteFailure(sqlite_err, _)) =
err.downcast_ref::<rusqlite::Error>()
{
if matches!(
sqlite_err.code,
rusqlite::ErrorCode::DatabaseCorrupt | rusqlite::ErrorCode::NotADatabase
) {
return true;
}
}
let msg = format!("{err:#}").to_ascii_lowercase();
msg.contains("database disk image is malformed") || msg.contains("file is not a database")
}
// Test-only module; its body is loaded from `worker_tests.rs` via `#[path]`
// rather than from the default `tests.rs` / `tests/mod.rs` location.
#[cfg(test)]
#[path = "worker_tests.rs"]
mod tests;