cortex-store 0.1.1

SQLite persistence: migrations, repositories, transactions.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
//! Phase 4.D decay job repository.
//!
//! Storage substrate for operator-fired scheduled compression /
//! re-promotion review jobs. The typed state machine lives in
//! `cortex_memory::decay`; this repo owns the persistence shape so the
//! cortex-memory crate stays free of `rusqlite`.
//!
//! Because cortex-memory already depends on cortex-store, this repo cannot
//! import the typed enums directly without creating a cycle. The repo
//! therefore exposes a *wire-shape* record [`DecayJobRecord`] (strings +
//! cortex-core IDs only) and the cortex-memory crate provides
//! `impl From<DecayJob> for DecayJobRecord` / `impl TryFrom<DecayJobRecord>
//! for DecayJob`. The wire alphabet matches the SQLite `CHECK (... IN
//! (...))` constraints in `migrations/007_decay_jobs.sql` exactly.
//!
//! Phase 4.D guardrail: raw events and the hash chain remain unchanged.
//! This repo NEVER writes to `events` or `trace_events`; it only manipulates
//! rows in `decay_jobs`. Provenance for any summary memory the job produces
//! is carried by the existing `memories.source_episodes_json` /
//! `memories.source_events_json` columns, not by anything in this table.

use chrono::{DateTime, Utc};
use cortex_core::{DecayJobId, EpisodeId, MemoryId};
use rusqlite::{params, OptionalExtension, Row};
use serde_json::Value;

use crate::{Pool, StoreError, StoreResult};

/// Wire token reserved for the `summary_method` column when the job kind
/// does not carry a summary method (currently only
/// `expired_principle_review`). Must stay in sync with
/// `cortex_memory::decay::SUMMARY_METHOD_NONE_WIRE` and the SQLite `CHECK`
/// constraint alphabet in `007_decay_jobs.sql`. Renaming this token is a
/// schema change: it requires a paired migration, not just a code edit.
pub const SUMMARY_METHOD_NONE_WIRE: &str = "none";

/// Wire-shape decay job record used by [`DecayJobRepo`].
///
/// Fields are persistence-shaped strings (with cortex-core newtype IDs
/// where stable). Higher-level callers should round-trip through
/// `cortex_memory::decay::DecayJob`.
///
/// All `DateTime<Utc>` fields are persisted as RFC 3339 TEXT (written via
/// `to_rfc3339()` on insert, parsed back with `parse_from_rfc3339`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecayJobRecord {
    /// Stable decay job identifier.
    pub id: DecayJobId,
    /// Wire discriminator: `episode_compression` | `candidate_compression`
    /// | `expired_principle_review`.
    pub kind_wire: String,
    /// Wire discriminator: `deterministic_concatenate` | `llm_summary` |
    /// [`SUMMARY_METHOD_NONE_WIRE`].
    pub summary_method_wire: String,
    /// JSON-encoded array of source ids; shape depends on `kind_wire`.
    /// Validated on insert to be a non-empty array of strings.
    pub source_ids_json: Value,
    /// Wire discriminator: `pending` | `in_progress` | `completed` |
    /// `failed` | `cancelled`.
    pub state_wire: String,
    /// Operator-visible failure reason. Present only for `state_wire =
    /// "failed"`.
    pub state_reason: Option<String>,
    /// Summary memory the job produced. Present only for `state_wire =
    /// "completed"` with a kind that lands a memory.
    pub result_memory_id: Option<MemoryId>,
    /// When the job becomes eligible to run.
    pub scheduled_for: DateTime<Utc>,
    /// When the job was created.
    pub created_at: DateTime<Utc>,
    /// Operator principal who created the job.
    pub created_by: String,
    /// When the row was last mutated.
    pub updated_at: DateTime<Utc>,
}

/// Decay job repository over an open SQLite connection.
///
/// The substrate does not enforce a state machine — that lives in the worker
/// (Phase 4.D D3-B). The repo only persists and reads typed rows.
///
/// Holds only a borrow of the pool, so it is cheap to construct per call
/// site rather than stored long-term.
#[derive(Debug)]
pub struct DecayJobRepo<'a> {
    pool: &'a Pool,
}

impl<'a> DecayJobRepo<'a> {
    /// Creates a repository wrapper over an open pool.
    #[must_use]
    pub const fn new(pool: &'a Pool) -> Self {
        Self { pool }
    }

    /// Inserts a new decay job row.
    pub fn insert(&self, record: &DecayJobRecord) -> StoreResult<()> {
        validate_record(record)?;
        self.pool.execute(
            "INSERT INTO decay_jobs (
                id, kind, summary_method, source_ids_json, state, state_reason,
                result_memory_id, scheduled_for, created_at, created_by, updated_at
             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11);",
            params![
                record.id.to_string(),
                record.kind_wire,
                record.summary_method_wire,
                serde_json::to_string(&record.source_ids_json)?,
                record.state_wire,
                record.state_reason,
                record.result_memory_id.as_ref().map(ToString::to_string),
                record.scheduled_for.to_rfc3339(),
                record.created_at.to_rfc3339(),
                record.created_by,
                record.updated_at.to_rfc3339(),
            ],
        )?;
        Ok(())
    }

    /// Reads one decay job by id.
    pub fn read(&self, id: &DecayJobId) -> StoreResult<Option<DecayJobRecord>> {
        let row = self
            .pool
            .query_row(
                DECAY_JOB_SELECT_SQL_BY_ID,
                params![id.to_string()],
                decay_job_row,
            )
            .optional()?;
        row.map(TryInto::try_into).transpose()
    }

    /// Lists pending jobs whose `scheduled_for` is at or before `now`,
    /// ordered by `scheduled_for` then `id` for determinism. This is the
    /// worker pickup query and matches the
    /// `idx_decay_jobs_state_scheduled` composite index.
    pub fn list_pending_ready(&self, now: DateTime<Utc>) -> StoreResult<Vec<DecayJobRecord>> {
        let mut stmt = self.pool.prepare(
            "SELECT id, kind, summary_method, source_ids_json, state, state_reason,
                    result_memory_id, scheduled_for, created_at, created_by, updated_at
             FROM decay_jobs
             WHERE state = 'pending' AND scheduled_for <= ?1
             ORDER BY scheduled_for, id;",
        )?;
        let rows = stmt.query_map(params![now.to_rfc3339()], decay_job_row)?;
        collect_records(rows)
    }

    /// Lists jobs whose state matches the supplied wire token. The token
    /// MUST be one of the values in the `state` `CHECK` constraint.
    pub fn list_by_state(&self, state_wire: &str) -> StoreResult<Vec<DecayJobRecord>> {
        let mut stmt = self.pool.prepare(
            "SELECT id, kind, summary_method, source_ids_json, state, state_reason,
                    result_memory_id, scheduled_for, created_at, created_by, updated_at
             FROM decay_jobs
             WHERE state = ?1
             ORDER BY scheduled_for, id;",
        )?;
        let rows = stmt.query_map(params![state_wire], decay_job_row)?;
        collect_records(rows)
    }

    /// Updates the lifecycle state of one decay job.
    ///
    /// Returns an error when no row matched the id. The substrate refuses
    /// silent no-ops so a worker that targets a stale row surfaces the
    /// drift instead of silently dropping the transition.
    pub fn update_state(
        &self,
        id: &DecayJobId,
        state_wire: &str,
        state_reason: Option<&str>,
        result_memory_id: Option<&MemoryId>,
        updated_at: DateTime<Utc>,
    ) -> StoreResult<()> {
        validate_state_payload(state_wire, state_reason, result_memory_id)?;
        let changed = self.pool.execute(
            "UPDATE decay_jobs
             SET state = ?2, state_reason = ?3, result_memory_id = ?4, updated_at = ?5
             WHERE id = ?1;",
            params![
                id.to_string(),
                state_wire,
                state_reason,
                result_memory_id.map(ToString::to_string),
                updated_at.to_rfc3339(),
            ],
        )?;
        if changed == 0 {
            return Err(StoreError::Validation(format!(
                "decay job {id} not found for update_state"
            )));
        }
        Ok(())
    }

    /// Records that `summary_memory_id` was produced by compressing
    /// `source_memory_id`. Additive: the source memory row is NOT deleted
    /// and no existing memory row is mutated. Duplicate writes (same
    /// `(source, summary)` pair) are coalesced via the join table's
    /// composite primary key (`INSERT OR IGNORE`), so the call is
    /// idempotent.
    ///
    /// Refuses self-edges (source == summary) at the repo before SQL —
    /// the `CHECK` clause in migration 009 would catch this anyway, but
    /// surfacing it as a typed validation error keeps the caller's error
    /// message stable.
    ///
    /// `decay_job_id` is optional because the supersession edge is
    /// meaningful on its own (e.g. an operator-driven backfill could
    /// record the edge without an associated queued job). When present,
    /// the FK ties the edge to the job that wrote it for audit.
    pub fn record_memory_supersession(
        &self,
        source_memory_id: &MemoryId,
        summary_memory_id: &MemoryId,
        decay_job_id: Option<&DecayJobId>,
        recorded_at: DateTime<Utc>,
    ) -> StoreResult<()> {
        if source_memory_id == summary_memory_id {
            return Err(StoreError::Validation(
                "memory supersession requires distinct source and summary ids".into(),
            ));
        }
        self.pool.execute(
            "INSERT OR IGNORE INTO memory_supersessions
                (source_memory_id, summary_memory_id, decay_job_id, recorded_at)
             VALUES (?1, ?2, ?3, ?4);",
            params![
                source_memory_id.to_string(),
                summary_memory_id.to_string(),
                decay_job_id.map(ToString::to_string),
                recorded_at.to_rfc3339(),
            ],
        )?;
        Ok(())
    }

    /// Records that `summary_memory_id` was produced by compressing
    /// `source_episode_id`. Additive: the source episode row is NOT
    /// deleted and no existing episode row is mutated. Duplicate writes
    /// are coalesced via the join table's composite primary key
    /// (`INSERT OR IGNORE`), so the call is idempotent.
    pub fn record_episode_supersession(
        &self,
        source_episode_id: &EpisodeId,
        summary_memory_id: &MemoryId,
        decay_job_id: Option<&DecayJobId>,
        recorded_at: DateTime<Utc>,
    ) -> StoreResult<()> {
        self.pool.execute(
            "INSERT OR IGNORE INTO episode_supersessions
                (source_episode_id, summary_memory_id, decay_job_id, recorded_at)
             VALUES (?1, ?2, ?3, ?4);",
            params![
                source_episode_id.to_string(),
                summary_memory_id.to_string(),
                decay_job_id.map(ToString::to_string),
                recorded_at.to_rfc3339(),
            ],
        )?;
        Ok(())
    }

    /// Lists source memory ids that a given summary memory superseded,
    /// in deterministic id order. Read-side companion to
    /// [`Self::record_memory_supersession`] used by tests and the
    /// retrieval / CLI surfaces.
    pub fn list_memory_sources_for(
        &self,
        summary_memory_id: &MemoryId,
    ) -> StoreResult<Vec<MemoryId>> {
        let mut stmt = self.pool.prepare(
            "SELECT source_memory_id FROM memory_supersessions
             WHERE summary_memory_id = ?1
             ORDER BY source_memory_id;",
        )?;
        let rows = stmt.query_map(params![summary_memory_id.to_string()], |row| {
            row.get::<_, String>(0)
        })?;
        let mut out = Vec::new();
        for row in rows {
            let id_text = row?;
            out.push(id_text.parse::<MemoryId>().map_err(|err| {
                StoreError::Validation(format!(
                    "memory_supersessions returned unparseable source_memory_id `{id_text}`: {err}"
                ))
            })?);
        }
        Ok(out)
    }

    /// Lists source episode ids that a given summary memory superseded,
    /// in deterministic id order. Read-side companion to
    /// [`Self::record_episode_supersession`].
    pub fn list_episode_sources_for(
        &self,
        summary_memory_id: &MemoryId,
    ) -> StoreResult<Vec<EpisodeId>> {
        let mut stmt = self.pool.prepare(
            "SELECT source_episode_id FROM episode_supersessions
             WHERE summary_memory_id = ?1
             ORDER BY source_episode_id;",
        )?;
        let rows = stmt.query_map(params![summary_memory_id.to_string()], |row| {
            row.get::<_, String>(0)
        })?;
        let mut out = Vec::new();
        for row in rows {
            let id_text = row?;
            out.push(id_text.parse::<EpisodeId>().map_err(|err| {
                StoreError::Validation(format!(
                    "episode_supersessions returned unparseable source_episode_id `{id_text}`: {err}"
                ))
            })?);
        }
        Ok(out)
    }
}

/// Single-row SELECT used by `DecayJobRepo::read`. The column order here
/// must stay in sync with the positional `row.get(n)` indices in
/// `decay_job_row`.
const DECAY_JOB_SELECT_SQL_BY_ID: &str =
    "SELECT id, kind, summary_method, source_ids_json, state, state_reason,
            result_memory_id, scheduled_for, created_at, created_by, updated_at
     FROM decay_jobs
     WHERE id = ?1;";

/// Raw TEXT-shaped row exactly as read from SQLite, before any parsing.
/// Converted into the typed [`DecayJobRecord`] via the `TryFrom` impl in
/// this module; keeping the two steps separate keeps the rusqlite row
/// mapper infallible-ish (only column-access errors) and concentrates all
/// parse failures in one place.
#[derive(Debug)]
struct DecayJobRowRaw {
    id: String,
    kind: String,
    summary_method: String,
    // JSON text; parsed to `serde_json::Value` during conversion.
    source_ids_json: String,
    state: String,
    state_reason: Option<String>,
    result_memory_id: Option<String>,
    // The three timestamps below are RFC 3339 text.
    scheduled_for: String,
    created_at: String,
    created_by: String,
    updated_at: String,
}

/// Maps one SQLite row into the raw TEXT shape. Column positions follow
/// the shared SELECT column list (id, kind, summary_method, …).
fn decay_job_row(row: &Row<'_>) -> rusqlite::Result<DecayJobRowRaw> {
    let id = row.get(0)?;
    let kind = row.get(1)?;
    let summary_method = row.get(2)?;
    let source_ids_json = row.get(3)?;
    let state = row.get(4)?;
    let state_reason = row.get(5)?;
    let result_memory_id = row.get(6)?;
    let scheduled_for = row.get(7)?;
    let created_at = row.get(8)?;
    let created_by = row.get(9)?;
    let updated_at = row.get(10)?;
    Ok(DecayJobRowRaw {
        id,
        kind,
        summary_method,
        source_ids_json,
        state,
        state_reason,
        result_memory_id,
        scheduled_for,
        created_at,
        created_by,
        updated_at,
    })
}

fn collect_records<F>(rows: rusqlite::MappedRows<'_, F>) -> StoreResult<Vec<DecayJobRecord>>
where
    F: FnMut(&Row<'_>) -> rusqlite::Result<DecayJobRowRaw>,
{
    let mut records = Vec::new();
    for row in rows {
        records.push(row?.try_into()?);
    }
    Ok(records)
}

impl TryFrom<DecayJobRowRaw> for DecayJobRecord {
    type Error = StoreError;

    fn try_from(raw: DecayJobRowRaw) -> StoreResult<Self> {
        let source_ids_json: Value = serde_json::from_str(&raw.source_ids_json)?;
        let result_memory_id = raw
            .result_memory_id
            .map(|raw| raw.parse::<MemoryId>().map_err(StoreError::from))
            .transpose()?;
        Ok(Self {
            id: raw.id.parse()?,
            kind_wire: raw.kind,
            summary_method_wire: raw.summary_method,
            source_ids_json,
            state_wire: raw.state,
            state_reason: raw.state_reason,
            result_memory_id,
            scheduled_for: DateTime::parse_from_rfc3339(&raw.scheduled_for)?.with_timezone(&Utc),
            created_at: DateTime::parse_from_rfc3339(&raw.created_at)?.with_timezone(&Utc),
            created_by: raw.created_by,
            updated_at: DateTime::parse_from_rfc3339(&raw.updated_at)?.with_timezone(&Utc),
        })
    }
}

/// Validates a full record before INSERT: non-empty `created_by`, wire
/// alphabet membership for kind / summary method / state, state-payload
/// consistency, and a non-empty all-string JSON array of source ids.
fn validate_record(record: &DecayJobRecord) -> StoreResult<()> {
    if record.created_by.trim().is_empty() {
        return Err(StoreError::Validation(
            "decay job created_by must not be empty".into(),
        ));
    }
    validate_kind_wire(&record.kind_wire)?;
    validate_summary_method_wire(&record.kind_wire, &record.summary_method_wire)?;
    validate_state_payload(
        &record.state_wire,
        record.state_reason.as_deref(),
        record.result_memory_id.as_ref(),
    )?;
    // `ok_or_else` folds the is-array check and the borrow into one step,
    // removing the check-then-unwrap panic path the two could drift into.
    let array = record.source_ids_json.as_array().ok_or_else(|| {
        StoreError::Validation("decay job source_ids_json must be a JSON array".into())
    })?;
    if array.is_empty() {
        return Err(StoreError::Validation(
            "decay job source_ids_json must contain at least one id".into(),
        ));
    }
    if !array.iter().all(Value::is_string) {
        return Err(StoreError::Validation(
            "decay job source_ids_json entries must all be strings".into(),
        ));
    }
    Ok(())
}

/// Checks that `kind_wire` is one of the three tokens permitted by the
/// `kind` `CHECK` constraint in `007_decay_jobs.sql`.
fn validate_kind_wire(kind_wire: &str) -> StoreResult<()> {
    match kind_wire {
        "episode_compression" | "candidate_compression" | "expired_principle_review" => Ok(()),
        other => Err(StoreError::Validation(format!(
            "invalid decay job kind wire `{other}`"
        ))),
    }
}

/// Checks that `method_wire` belongs to the summary-method alphabet and
/// that the `none` sentinel pairs exactly with `expired_principle_review`
/// (that kind must use it; every other kind must not).
fn validate_summary_method_wire(kind_wire: &str, method_wire: &str) -> StoreResult<()> {
    if !matches!(
        method_wire,
        "deterministic_concatenate" | "llm_summary" | "none"
    ) {
        return Err(StoreError::Validation(format!(
            "invalid decay job summary method wire `{method_wire}`"
        )));
    }
    let review_kind = kind_wire == "expired_principle_review";
    let none_method = method_wire == SUMMARY_METHOD_NONE_WIRE;
    match (review_kind, none_method) {
        (true, false) => Err(StoreError::Validation(format!(
            "expired_principle_review decay job must carry summary_method='{SUMMARY_METHOD_NONE_WIRE}', got '{method_wire}'"
        ))),
        (false, true) => Err(StoreError::Validation(format!(
            "decay job kind `{kind_wire}` must carry a summary method (not 'none')"
        ))),
        _ => Ok(()),
    }
}

/// Checks the state-dependent payload rules: non-terminal states and
/// `cancelled` carry neither extra field; `completed` forbids a reason
/// (result memory stays optional); `failed` requires a non-blank reason
/// and forbids a result memory; anything else is outside the alphabet.
fn validate_state_payload(
    state_wire: &str,
    state_reason: Option<&str>,
    result_memory_id: Option<&MemoryId>,
) -> StoreResult<()> {
    let fail = |message: String| Err(StoreError::Validation(message));
    match state_wire {
        "pending" | "in_progress" | "cancelled" => {
            if state_reason.is_some() {
                return fail(format!(
                    "decay job state `{state_wire}` must not carry state_reason"
                ));
            }
            if result_memory_id.is_some() {
                return fail(format!(
                    "decay job state `{state_wire}` must not carry result_memory_id"
                ));
            }
            Ok(())
        }
        "completed" => {
            // result_memory_id is intentionally optional here (None for
            // expired_principle_review jobs).
            if state_reason.is_some() {
                fail("decay job completed state must not carry state_reason".into())
            } else {
                Ok(())
            }
        }
        "failed" => match state_reason {
            Some(reason) if !reason.trim().is_empty() => {
                if result_memory_id.is_some() {
                    fail("decay job failed state must not carry result_memory_id".into())
                } else {
                    Ok(())
                }
            }
            // Absent OR blank reason: same operator-facing message.
            _ => fail("decay job failed state requires a non-empty state_reason".into()),
        },
        other => fail(format!("invalid decay job state wire `{other}`")),
    }
}