use anyhow::Result;
use chrono::Utc;
use rusqlite::params;
use crate::memory::chunks::with_connection;
use crate::memory::config::MemoryConfig;
use crate::memory::queue::store::backoff_ms;
use crate::memory::queue::types::{Job, JobFailure};
/// Settles `job` as successfully completed.
///
/// Sets `status = 'done'`, stamps `completed_at_ms` with the current UTC time in
/// milliseconds, and clears both `locked_until_ms` and `last_error`.
///
/// The update is guarded: it only touches the row whose `id`, `attempts` and
/// `started_at_ms` all equal the values carried by `job` (`IS` is used for
/// `started_at_ms` so the comparison also matches when the value is NULL). When no row
/// matches, nothing is written and the call still returns `Ok(())` because the
/// affected-row count is discarded.
///
/// # Errors
///
/// Propagates any error returned by [`with_connection`] or by executing the statement.
pub fn mark_done(config: &MemoryConfig, job: &Job) -> Result<()> {
    with_connection(config, |conn| {
        let completed_at_ms = Utc::now().timestamp_millis();
        // Guard values: presumably what the worker observed when it claimed the job.
        let attempts_guard = job.attempts as i64;
        let started_at_guard = job.started_at_ms;
        conn.execute(
            "UPDATE mem_tree_jobs
SET status = 'done',
completed_at_ms = ?1,
locked_until_ms = NULL,
last_error = NULL
WHERE id = ?2
AND attempts = ?3
AND started_at_ms IS ?4",
            params![completed_at_ms, &job.id, attempts_guard, started_at_guard],
        )?;
        Ok(())
    })
}
pub fn mark_failed(config: &MemoryConfig, job: &Job, error: &str) -> Result<()> {
mark_failed_typed(config, job, error, None)
}
pub fn mark_failed_typed(
config: &MemoryConfig,
job: &Job,
error: &str,
failure: Option<&JobFailure>,
) -> Result<()> {
let job_id = &job.id;
let attempts = job.attempts as i64;
let max_attempts = job.max_attempts as i64;
let claim_started_at = job.started_at_ms;
let unrecoverable = failure.map(|f| f.is_unrecoverable()).unwrap_or(false);
let failure_reason = failure.map(|f| f.code);
let failure_class = failure.map(|f| f.class);
with_connection(config, |conn| {
let now_ms = Utc::now().timestamp_millis();
if attempts >= max_attempts || unrecoverable {
conn.execute(
"UPDATE mem_tree_jobs
SET status = 'failed',
completed_at_ms = ?1,
locked_until_ms = NULL,
last_error = ?2,
failure_reason = ?6,
failure_class = ?7
WHERE id = ?3
AND attempts = ?4
AND started_at_ms IS ?5",
params![
now_ms,
error,
job_id,
attempts,
claim_started_at,
failure_reason,
failure_class,
],
)?;
} else {
let next_at = now_ms.saturating_add(backoff_ms(attempts as u32));
conn.execute(
"UPDATE mem_tree_jobs
SET status = 'ready',
available_at_ms = ?1,
locked_until_ms = NULL,
last_error = ?2
WHERE id = ?3
AND attempts = ?4
AND started_at_ms IS ?5",
params![next_at, error, job_id, attempts, claim_started_at],
)?;
}
Ok(())
})
}
/// Puts `job` back in the queue without counting the current attempt against it.
///
/// The row returns to `status = 'ready'` with:
/// * `attempts` restored to one less than the value carried by `job`, never below zero;
/// * `available_at_ms` set to `until_ms`;
/// * `locked_until_ms` and `started_at_ms` cleared;
/// * `last_error` set to `reason`.
///
/// The update is guarded on `id`, `attempts` and `started_at_ms` matching the values
/// carried by `job`; if no row matches, nothing is written and `Ok(())` is still
/// returned.
///
/// # Errors
///
/// Propagates any error returned by [`with_connection`] or by executing the statement.
pub fn mark_deferred(config: &MemoryConfig, job: &Job, until_ms: i64, reason: &str) -> Result<()> {
    let job_id = &job.id;
    let claim_attempts = job.attempts as i64;
    // `saturating_sub` on an `i64` only saturates at `i64::MIN`, so on its own it would
    // turn 0 into -1. Clamp explicitly so a negative attempt count is never persisted.
    let pre_claim_attempts = claim_attempts.saturating_sub(1).max(0);
    let claim_started_at = job.started_at_ms;
    with_connection(config, |conn| {
        conn.execute(
            "UPDATE mem_tree_jobs
SET status = 'ready',
attempts = ?1,
available_at_ms = ?2,
locked_until_ms = NULL,
started_at_ms = NULL,
last_error = ?3
WHERE id = ?4
AND attempts = ?5
AND started_at_ms IS ?6",
            params![
                pre_claim_attempts,
                until_ms,
                reason,
                job_id,
                claim_attempts,
                claim_started_at,
            ],
        )?;
        Ok(())
    })
}
/// Returns jobs whose lock has expired to the `'ready'` state.
///
/// A row qualifies when it is `status = 'running'` and its non-NULL `locked_until_ms`
/// is strictly earlier than the current UTC time in milliseconds. Qualifying rows are
/// set to `status = 'ready'`, have their expired lock cleared, and get `last_error`
/// filled with `'recovered_from_stale_lock'` unless an earlier error message is already
/// stored (`COALESCE` keeps the existing value).
///
/// Returns the number of rows that were updated.
///
/// # Errors
///
/// Propagates any error returned by [`with_connection`] or by executing the statement.
pub fn recover_stale_locks(config: &MemoryConfig) -> Result<usize> {
    with_connection(config, |conn| {
        let now_ms = Utc::now().timestamp_millis();
        // Clear `locked_until_ms` along with the status change, as every other
        // transition to 'ready' in this module does, so a recovered row does not keep
        // carrying an expired lock timestamp.
        let n = conn.execute(
            "UPDATE mem_tree_jobs
SET status = 'ready',
locked_until_ms = NULL,
last_error = COALESCE(last_error, 'recovered_from_stale_lock')
WHERE status = 'running'
AND locked_until_ms IS NOT NULL
AND locked_until_ms < ?1",
            params![now_ms],
        )?;
        Ok(n)
    })
}
/// Unconditionally returns every `'running'` job to the `'ready'` state.
///
/// Unlike [`recover_stale_locks`], no lock-expiry check is made: every row with
/// `status = 'running'` is set to `'ready'` and has `locked_until_ms` cleared. Other
/// columns (including `attempts` and `last_error`) are left as they are.
///
/// Returns the number of rows that were updated.
///
/// # Errors
///
/// Propagates any error returned by [`with_connection`] or by executing the statement.
pub fn release_running_locks(config: &MemoryConfig) -> Result<usize> {
    with_connection(config, |conn| {
        let released = conn.execute(
            "UPDATE mem_tree_jobs
SET status = 'ready',
locked_until_ms = NULL
WHERE status = 'running'",
            [],
        )?;
        Ok(released)
    })
}
/// Resets every job with `status = 'failed'` so it can run again.
///
/// Delegates to `requeue_failed_where` with a predicate that selects all failed rows,
/// regardless of their failure classification. Returns the number of rows reset.
///
/// # Errors
///
/// Propagates whatever `requeue_failed_where` returns.
pub fn requeue_failed(config: &MemoryConfig) -> Result<u64> {
    const ANY_FAILED: &str = "status = 'failed'";
    requeue_failed_where(config, ANY_FAILED)
}
/// Resets failed jobs that are not classified as unrecoverable.
///
/// Selects rows with `status = 'failed'` whose `failure_class` is either NULL or any
/// value other than `'unrecoverable'`, and delegates the reset to
/// `requeue_failed_where`. Returns the number of rows reset.
///
/// # Errors
///
/// Propagates whatever `requeue_failed_where` returns.
pub fn requeue_transient_failed(config: &MemoryConfig) -> Result<u64> {
    // The explicit NULL check is required: in SQL, `NULL != 'unrecoverable'` is not true.
    const FAILED_BUT_RETRYABLE: &str =
        "status = 'failed' AND (failure_class IS NULL OR failure_class != 'unrecoverable')";
    requeue_failed_where(config, FAILED_BUT_RETRYABLE)
}
/// Alias for [`requeue_failed`]: resets every failed job and returns how many were reset.
///
/// # Errors
///
/// Propagates whatever [`requeue_failed`] returns.
pub fn retry_all_failed(config: &MemoryConfig) -> Result<u64> {
    let requeued = requeue_failed(config)?;
    Ok(requeued)
}
/// Resets every row of `mem_tree_jobs` matching `predicate` to a fresh `'ready'` state.
///
/// For each matching row: `status` becomes `'ready'`, `attempts` is zeroed,
/// `available_at_ms` is set to the current UTC time in milliseconds, and
/// `locked_until_ms`, `started_at_ms`, `completed_at_ms`, `last_error`,
/// `failure_reason` and `failure_class` are all cleared.
///
/// `predicate` is spliced verbatim into the statement's `WHERE` clause, so it must be a
/// trusted SQL fragment (the callers in this file pass string literals) and must never
/// contain untrusted input.
///
/// Returns the number of rows that were updated.
///
/// # Errors
///
/// Propagates any error returned by [`with_connection`] or by executing the statement.
fn requeue_failed_where(config: &MemoryConfig, predicate: &str) -> Result<u64> {
    // The statement text does not depend on the connection, so build it up front.
    let statement = format!(
        "UPDATE mem_tree_jobs
SET status = 'ready',
attempts = 0,
available_at_ms = ?1,
locked_until_ms = NULL,
started_at_ms = NULL,
completed_at_ms = NULL,
last_error = NULL,
failure_reason = NULL,
failure_class = NULL
WHERE {predicate}"
    );
    with_connection(config, |conn| {
        let available_at_ms = Utc::now().timestamp_millis();
        let requeued = conn.execute(&statement, params![available_at_ms])?;
        Ok(requeued as u64)
    })
}
// Unit tests for this module. They are compiled only for test builds and are loaded
// from the file named by the `#[path]` attribute rather than the default `tests.rs`.
#[cfg(test)]
#[path = "store_settle_tests.rs"]
mod tests;