use chrono::{DateTime, Utc};
use cortex_core::{DecayJobId, EpisodeId, MemoryId};
use rusqlite::{params, OptionalExtension, Row};
use serde_json::Value;
use crate::{Pool, StoreError, StoreResult};
/// Wire value of the `none` summary method: the only method accepted for
/// `expired_principle_review` jobs, and rejected for every other job kind.
pub const SUMMARY_METHOD_NONE_WIRE: &str = "none";
/// Typed form of one `decay_jobs` row.
///
/// The `*_wire` fields carry the exact strings stored in the database. The rules
/// quoted on the fields below are the ones enforced by this module's validators
/// when a record is inserted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecayJobRecord {
/// Identifier of the job; stored through its `to_string` form.
pub id: DecayJobId,
/// Job kind: `episode_compression`, `candidate_compression` or
/// `expired_principle_review`.
pub kind_wire: String,
/// Summary method: `deterministic_concatenate`, `llm_summary` or `none`.
/// `none` is required for `expired_principle_review` and rejected otherwise.
pub summary_method_wire: String,
/// Source ids of the job; must be a non-empty JSON array whose entries are all strings.
pub source_ids_json: Value,
/// Job state: `pending`, `in_progress`, `cancelled`, `completed` or `failed`.
pub state_wire: String,
/// Required (and non-blank) for `failed`; must be absent for every other state.
pub state_reason: Option<String>,
/// Only permitted for the `completed` state; must be absent for all other states.
pub result_memory_id: Option<MemoryId>,
/// Time compared against `now` by `DecayJobRepo::list_pending_ready`.
pub scheduled_for: DateTime<Utc>,
/// Creation timestamp; stored as RFC 3339 text.
pub created_at: DateTime<Utc>,
/// Creator of the job; must not be empty or whitespace-only.
pub created_by: String,
/// Last-update timestamp; rewritten by `DecayJobRepo::update_state`.
pub updated_at: DateTime<Utc>,
}
/// Repository over a borrowed [`Pool`] for the `decay_jobs`, `memory_supersessions`
/// and `episode_supersessions` tables.
#[derive(Debug)]
pub struct DecayJobRepo<'a> {
/// Pool through which every statement of this repository is executed.
pool: &'a Pool,
}
impl<'a> DecayJobRepo<'a> {
#[must_use]
pub const fn new(pool: &'a Pool) -> Self {
Self { pool }
}
pub fn insert(&self, record: &DecayJobRecord) -> StoreResult<()> {
validate_record(record)?;
self.pool.execute(
"INSERT INTO decay_jobs (
id, kind, summary_method, source_ids_json, state, state_reason,
result_memory_id, scheduled_for, created_at, created_by, updated_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11);",
params![
record.id.to_string(),
record.kind_wire,
record.summary_method_wire,
serde_json::to_string(&record.source_ids_json)?,
record.state_wire,
record.state_reason,
record.result_memory_id.as_ref().map(ToString::to_string),
record.scheduled_for.to_rfc3339(),
record.created_at.to_rfc3339(),
record.created_by,
record.updated_at.to_rfc3339(),
],
)?;
Ok(())
}
pub fn read(&self, id: &DecayJobId) -> StoreResult<Option<DecayJobRecord>> {
let row = self
.pool
.query_row(
DECAY_JOB_SELECT_SQL_BY_ID,
params![id.to_string()],
decay_job_row,
)
.optional()?;
row.map(TryInto::try_into).transpose()
}
pub fn list_pending_ready(&self, now: DateTime<Utc>) -> StoreResult<Vec<DecayJobRecord>> {
let mut stmt = self.pool.prepare(
"SELECT id, kind, summary_method, source_ids_json, state, state_reason,
result_memory_id, scheduled_for, created_at, created_by, updated_at
FROM decay_jobs
WHERE state = 'pending' AND scheduled_for <= ?1
ORDER BY scheduled_for, id;",
)?;
let rows = stmt.query_map(params![now.to_rfc3339()], decay_job_row)?;
collect_records(rows)
}
pub fn list_by_state(&self, state_wire: &str) -> StoreResult<Vec<DecayJobRecord>> {
let mut stmt = self.pool.prepare(
"SELECT id, kind, summary_method, source_ids_json, state, state_reason,
result_memory_id, scheduled_for, created_at, created_by, updated_at
FROM decay_jobs
WHERE state = ?1
ORDER BY scheduled_for, id;",
)?;
let rows = stmt.query_map(params![state_wire], decay_job_row)?;
collect_records(rows)
}
pub fn update_state(
&self,
id: &DecayJobId,
state_wire: &str,
state_reason: Option<&str>,
result_memory_id: Option<&MemoryId>,
updated_at: DateTime<Utc>,
) -> StoreResult<()> {
validate_state_payload(state_wire, state_reason, result_memory_id)?;
let changed = self.pool.execute(
"UPDATE decay_jobs
SET state = ?2, state_reason = ?3, result_memory_id = ?4, updated_at = ?5
WHERE id = ?1;",
params![
id.to_string(),
state_wire,
state_reason,
result_memory_id.map(ToString::to_string),
updated_at.to_rfc3339(),
],
)?;
if changed == 0 {
return Err(StoreError::Validation(format!(
"decay job {id} not found for update_state"
)));
}
Ok(())
}
pub fn record_memory_supersession(
&self,
source_memory_id: &MemoryId,
summary_memory_id: &MemoryId,
decay_job_id: Option<&DecayJobId>,
recorded_at: DateTime<Utc>,
) -> StoreResult<()> {
if source_memory_id == summary_memory_id {
return Err(StoreError::Validation(
"memory supersession requires distinct source and summary ids".into(),
));
}
self.pool.execute(
"INSERT OR IGNORE INTO memory_supersessions
(source_memory_id, summary_memory_id, decay_job_id, recorded_at)
VALUES (?1, ?2, ?3, ?4);",
params![
source_memory_id.to_string(),
summary_memory_id.to_string(),
decay_job_id.map(ToString::to_string),
recorded_at.to_rfc3339(),
],
)?;
Ok(())
}
pub fn record_episode_supersession(
&self,
source_episode_id: &EpisodeId,
summary_memory_id: &MemoryId,
decay_job_id: Option<&DecayJobId>,
recorded_at: DateTime<Utc>,
) -> StoreResult<()> {
self.pool.execute(
"INSERT OR IGNORE INTO episode_supersessions
(source_episode_id, summary_memory_id, decay_job_id, recorded_at)
VALUES (?1, ?2, ?3, ?4);",
params![
source_episode_id.to_string(),
summary_memory_id.to_string(),
decay_job_id.map(ToString::to_string),
recorded_at.to_rfc3339(),
],
)?;
Ok(())
}
pub fn list_memory_sources_for(
&self,
summary_memory_id: &MemoryId,
) -> StoreResult<Vec<MemoryId>> {
let mut stmt = self.pool.prepare(
"SELECT source_memory_id FROM memory_supersessions
WHERE summary_memory_id = ?1
ORDER BY source_memory_id;",
)?;
let rows = stmt.query_map(params![summary_memory_id.to_string()], |row| {
row.get::<_, String>(0)
})?;
let mut out = Vec::new();
for row in rows {
let id_text = row?;
out.push(id_text.parse::<MemoryId>().map_err(|err| {
StoreError::Validation(format!(
"memory_supersessions returned unparseable source_memory_id `{id_text}`: {err}"
))
})?);
}
Ok(out)
}
pub fn list_episode_sources_for(
&self,
summary_memory_id: &MemoryId,
) -> StoreResult<Vec<EpisodeId>> {
let mut stmt = self.pool.prepare(
"SELECT source_episode_id FROM episode_supersessions
WHERE summary_memory_id = ?1
ORDER BY source_episode_id;",
)?;
let rows = stmt.query_map(params![summary_memory_id.to_string()], |row| {
row.get::<_, String>(0)
})?;
let mut out = Vec::new();
for row in rows {
let id_text = row?;
out.push(id_text.parse::<EpisodeId>().map_err(|err| {
StoreError::Validation(format!(
"episode_supersessions returned unparseable source_episode_id `{id_text}`: {err}"
))
})?);
}
Ok(out)
}
}
/// Query selecting all eleven `decay_jobs` columns for the row whose `id` equals `?1`.
///
/// The column order matches the positional indices read by `decay_job_row`.
const DECAY_JOB_SELECT_SQL_BY_ID: &str =
"SELECT id, kind, summary_method, source_ids_json, state, state_reason,
result_memory_id, scheduled_for, created_at, created_by, updated_at
FROM decay_jobs
WHERE id = ?1;";
/// Untyped image of a `decay_jobs` row: every column exactly as the text read from
/// the database, before ids, JSON and timestamps are parsed by the `TryFrom`
/// conversion into [`DecayJobRecord`].
#[derive(Debug)]
struct DecayJobRowRaw {
// Field order mirrors the column order of the SELECT statements in this module.
id: String,
kind: String,
summary_method: String,
// Serialized JSON text; parsed into a `Value` during conversion.
source_ids_json: String,
state: String,
state_reason: Option<String>,
result_memory_id: Option<String>,
// The three timestamps are parsed as RFC 3339 during conversion.
scheduled_for: String,
created_at: String,
created_by: String,
updated_at: String,
}
/// Reads the eleven selected `decay_jobs` columns of `row`, by position, into a
/// [`DecayJobRowRaw`].
///
/// Columns are read in ascending index order and the first failing read is
/// returned as the error.
fn decay_job_row(row: &Row<'_>) -> rusqlite::Result<DecayJobRowRaw> {
    let id: String = row.get(0)?;
    let kind: String = row.get(1)?;
    let summary_method: String = row.get(2)?;
    let source_ids_json: String = row.get(3)?;
    let state: String = row.get(4)?;
    let state_reason: Option<String> = row.get(5)?;
    let result_memory_id: Option<String> = row.get(6)?;
    let scheduled_for: String = row.get(7)?;
    let created_at: String = row.get(8)?;
    let created_by: String = row.get(9)?;
    let updated_at: String = row.get(10)?;

    Ok(DecayJobRowRaw {
        id,
        kind,
        summary_method,
        source_ids_json,
        state,
        state_reason,
        result_memory_id,
        scheduled_for,
        created_at,
        created_by,
        updated_at,
    })
}
fn collect_records<F>(rows: rusqlite::MappedRows<'_, F>) -> StoreResult<Vec<DecayJobRecord>>
where
F: FnMut(&Row<'_>) -> rusqlite::Result<DecayJobRowRaw>,
{
let mut records = Vec::new();
for row in rows {
records.push(row?.try_into()?);
}
Ok(records)
}
impl TryFrom<DecayJobRowRaw> for DecayJobRecord {
type Error = StoreError;
fn try_from(raw: DecayJobRowRaw) -> StoreResult<Self> {
let source_ids_json: Value = serde_json::from_str(&raw.source_ids_json)?;
let result_memory_id = raw
.result_memory_id
.map(|raw| raw.parse::<MemoryId>().map_err(StoreError::from))
.transpose()?;
Ok(Self {
id: raw.id.parse()?,
kind_wire: raw.kind,
summary_method_wire: raw.summary_method,
source_ids_json,
state_wire: raw.state,
state_reason: raw.state_reason,
result_memory_id,
scheduled_for: DateTime::parse_from_rfc3339(&raw.scheduled_for)?.with_timezone(&Utc),
created_at: DateTime::parse_from_rfc3339(&raw.created_at)?.with_timezone(&Utc),
created_by: raw.created_by,
updated_at: DateTime::parse_from_rfc3339(&raw.updated_at)?.with_timezone(&Utc),
})
}
}
/// Checks that `record` satisfies every rule required before it is inserted.
///
/// Checks run in this order and the first violation is returned:
/// 1. `created_by` is not empty or whitespace-only;
/// 2. `kind_wire` is a known kind;
/// 3. `summary_method_wire` is known and compatible with the kind;
/// 4. the state, state reason and result memory id form a valid combination;
/// 5. `source_ids_json` is a non-empty JSON array whose entries are all strings.
///
/// # Errors
///
/// Returns `StoreError::Validation` describing the first rule that is violated.
fn validate_record(record: &DecayJobRecord) -> StoreResult<()> {
    if record.created_by.trim().is_empty() {
        return Err(StoreError::Validation(
            "decay job created_by must not be empty".into(),
        ));
    }
    validate_kind_wire(&record.kind_wire)?;
    validate_summary_method_wire(&record.kind_wire, &record.summary_method_wire)?;
    validate_state_payload(
        &record.state_wire,
        record.state_reason.as_deref(),
        record.result_memory_id.as_ref(),
    )?;

    // Borrow the array and reject non-arrays in one step, so no `unwrap` (and
    // therefore no panic path) is needed after a separate `is_array` check.
    let source_ids = record.source_ids_json.as_array().ok_or_else(|| {
        StoreError::Validation("decay job source_ids_json must be a JSON array".into())
    })?;
    if source_ids.is_empty() {
        return Err(StoreError::Validation(
            "decay job source_ids_json must contain at least one id".into(),
        ));
    }
    if !source_ids.iter().all(Value::is_string) {
        return Err(StoreError::Validation(
            "decay job source_ids_json entries must all be strings".into(),
        ));
    }
    Ok(())
}
/// Accepts exactly the three known decay job kinds and rejects anything else.
///
/// # Errors
///
/// Returns `StoreError::Validation` quoting `kind_wire` when it is not one of
/// `episode_compression`, `candidate_compression` or `expired_principle_review`.
fn validate_kind_wire(kind_wire: &str) -> StoreResult<()> {
    match kind_wire {
        "episode_compression" | "candidate_compression" | "expired_principle_review" => Ok(()),
        _ => Err(StoreError::Validation(format!(
            "invalid decay job kind wire `{kind_wire}`"
        ))),
    }
}
/// Checks that `method_wire` is a known summary method and that it fits `kind_wire`.
///
/// The method must be one of `deterministic_concatenate`, `llm_summary` or `none`.
/// `expired_principle_review` jobs must use `none`; every other kind must not.
/// `kind_wire` itself is not validated here.
///
/// # Errors
///
/// Returns `StoreError::Validation` for an unknown method (checked first) or for a
/// kind/method combination that is not allowed.
fn validate_summary_method_wire(kind_wire: &str, method_wire: &str) -> StoreResult<()> {
    let violation = match method_wire {
        "deterministic_concatenate" | "llm_summary" | "none" => {
            let is_review = kind_wire == "expired_principle_review";
            let is_none = method_wire == SUMMARY_METHOD_NONE_WIRE;
            match (is_review, is_none) {
                (true, false) => Some(format!(
                    "expired_principle_review decay job must carry summary_method='{SUMMARY_METHOD_NONE_WIRE}', got '{method_wire}'"
                )),
                (false, true) => Some(format!(
                    "decay job kind `{kind_wire}` must carry a summary method (not 'none')"
                )),
                _ => None,
            }
        }
        _ => Some(format!(
            "invalid decay job summary method wire `{method_wire}`"
        )),
    };

    match violation {
        Some(message) => Err(StoreError::Validation(message)),
        None => Ok(()),
    }
}
/// Checks that a state and its optional companions form an allowed combination.
///
/// * `pending`, `in_progress`, `cancelled`: neither a reason nor a result memory
///   id may be present (the reason is checked first).
/// * `completed`: no reason may be present; a result memory id is optional.
/// * `failed`: a reason that is not blank after trimming is required, and no
///   result memory id may be present (the reason is checked first).
/// * Any other state is rejected.
///
/// # Errors
///
/// Returns `StoreError::Validation` describing the first rule that is violated.
fn validate_state_payload(
    state_wire: &str,
    state_reason: Option<&str>,
    result_memory_id: Option<&MemoryId>,
) -> StoreResult<()> {
    let has_reason = state_reason.is_some();
    let has_result = result_memory_id.is_some();

    let violation: Option<String> = match state_wire {
        "pending" | "in_progress" | "cancelled" => {
            if has_reason {
                Some(format!(
                    "decay job state `{state_wire}` must not carry state_reason"
                ))
            } else if has_result {
                Some(format!(
                    "decay job state `{state_wire}` must not carry result_memory_id"
                ))
            } else {
                None
            }
        }
        "completed" => {
            if has_reason {
                Some("decay job completed state must not carry state_reason".into())
            } else {
                None
            }
        }
        "failed" => {
            // A missing reason and a whitespace-only reason are reported identically.
            let reason_is_blank = state_reason.map_or(true, |reason| reason.trim().is_empty());
            if reason_is_blank {
                Some("decay job failed state requires a non-empty state_reason".into())
            } else if has_result {
                Some("decay job failed state must not carry result_memory_id".into())
            } else {
                None
            }
        }
        other => Some(format!("invalid decay job state wire `{other}`")),
    };

    match violation {
        Some(message) => Err(StoreError::Validation(message)),
        None => Ok(()),
    }
}