Skip to main content

cortex_store/
mirror.rs

1//! JSONL <-> SQLite event mirror consistency and recovery.
2//!
3//! The JSONL leg is the disaster-recovery source for immutable raw events.
4//! This module keeps the boundary small: verify event-set parity, replay
5//! acknowledged JSONL rows into SQLite, and provide a mirrored append path
6//! for callers that need the JSONL fsync to happen before the SQLite commit.
7
8use std::collections::BTreeMap;
9use std::path::Path;
10
11use chrono::{DateTime, Utc};
12use cortex_core::{
13    compose_policy_outcomes, Attestor, Event, EventId, EventSource, EventType, PolicyContribution,
14    PolicyDecision, PolicyOutcome, TraceId,
15};
16use cortex_ledger::{seal, JsonlLog};
17use rusqlite::{params, OptionalExtension, Row};
18
19use crate::{Pool, StoreError, StoreResult};
20
/// Required contributor rule id documenting that the JSONL <-> SQLite
/// parity invariant (BUILD_SPEC §7) was composed into the policy decision
/// for a mirrored append. The mirror refuses a final outcome of `Reject`
/// or `Quarantine` so a parity violation cannot enter the durable event
/// set.
pub const MIRROR_APPEND_PARITY_INVARIANT_RULE_ID: &str = "mirror.append.parity_invariant";
27
/// Difference for an event id present in both stores but with different content.
///
/// The two hashes are the sealed `event_hash` values read from each store;
/// by construction of this record they are not identical.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventMismatch {
    /// Event id whose content diverged.
    pub id: EventId,
    /// Event hash read from JSONL.
    pub jsonl_event_hash: String,
    /// Event hash read from SQLite.
    pub sqlite_event_hash: String,
}
38
/// Event-set parity report between JSONL and SQLite.
///
/// Produced by [`verify_event_set_parity`]; [`EventSetParity::is_consistent`]
/// is true exactly when the three difference vectors below are all empty.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventSetParity {
    /// Number of unique event ids read from JSONL.
    pub jsonl_event_count: usize,
    /// Number of event rows read from SQLite.
    pub sqlite_event_count: usize,
    /// Event ids present in JSONL but missing from SQLite.
    pub missing_in_sqlite: Vec<EventId>,
    /// Event ids present in SQLite but missing from JSONL.
    pub missing_in_jsonl: Vec<EventId>,
    /// Event ids present in both stores with non-identical event rows.
    pub mismatched: Vec<EventMismatch>,
}
53
54impl EventSetParity {
55    /// Returns true when both stores contain the same event ids and rows.
56    #[must_use]
57    pub fn is_consistent(&self) -> bool {
58        self.missing_in_sqlite.is_empty()
59            && self.missing_in_jsonl.is_empty()
60            && self.mismatched.is_empty()
61    }
62}
63
/// Replay outcome for JSONL recovery into SQLite.
///
/// Returned by [`replay_jsonl_into_sqlite`]; `parity` is recomputed from
/// both stores after the replay transaction commits.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplayReport {
    /// JSONL events inserted into SQLite.
    pub replayed: usize,
    /// JSONL events already present in SQLite with identical content.
    pub skipped_existing: usize,
    /// Final parity after replay.
    pub parity: EventSetParity,
}
74
75/// Appends one event to JSONL and then commits the same sealed event to SQLite.
76///
77/// JSONL is fsynced by [`JsonlLog::append`] before the SQLite transaction
78/// commits. If the process dies after JSONL append but before SQLite commit,
79/// [`replay_jsonl_into_sqlite`] reconstructs the missing SQLite row.
80///
81/// `ledger_policy` is the composed [`PolicyDecision`] inherited from the
82/// underlying unsigned ledger append (see [`JsonlLog::append`] for the
83/// required contributor set). `mirror_policy` is the composed
84/// [`PolicyDecision`] for the JSONL <-> SQLite parity invariant; the
85/// mirror refuses a final outcome of `Reject` or `Quarantine` so a parity
86/// violation cannot enter the durable event set.
87pub fn append_event(
88    log: &mut JsonlLog,
89    pool: &mut Pool,
90    event: Event,
91    ledger_policy: &PolicyDecision,
92    mirror_policy: &PolicyDecision,
93) -> StoreResult<Event> {
94    require_mirror_parity_contributor(mirror_policy)?;
95    require_mirror_final_outcome(mirror_policy, "mirror.append")?;
96
97    if let Some(existing) = select_event_by_id(pool, &event.id)? {
98        return Err(StoreError::Validation(format!(
99            "event id `{}` already exists in SQLite with hash `{}`",
100            existing.id, existing.event_hash
101        )));
102    }
103
104    let mut sealed = event.clone();
105    sealed.prev_event_hash = log.head().map(str::to_owned);
106    seal(&mut sealed);
107
108    let appended_head = log.append(event, ledger_policy).map_err(jsonl_error)?;
109    if appended_head != sealed.event_hash {
110        return Err(StoreError::Validation(format!(
111            "JSONL append head `{appended_head}` did not match sealed event `{}`",
112            sealed.event_hash
113        )));
114    }
115
116    let tx = pool.transaction()?;
117    insert_event(&tx, &sealed)?;
118    tx.commit()?;
119    Ok(sealed)
120}
121
122/// Appends one signed event to JSONL and then commits the same sealed event to SQLite.
123///
124/// This is the signed variant of [`append_event`]. The duplicate-id preflight,
125/// JSONL-head check, and exact sealed SQLite insert intentionally match the
126/// unsigned mirror path while using [`JsonlLog::append_signed`] for the JSONL row.
127///
128/// `ledger_policy` is the composed [`PolicyDecision`] inherited from the
129/// underlying signed ledger append (see [`JsonlLog::append_signed`] for
130/// the required contributor set including ADR 0023 current-use
131/// revalidation and the ADR 0019 trust-tier minimum). `mirror_policy` is
132/// the composed [`PolicyDecision`] for the JSONL <-> SQLite parity
133/// invariant; the mirror refuses a final outcome of `Reject` or
134/// `Quarantine` so a parity violation cannot enter the durable event set.
135pub fn append_signed_event(
136    log: &mut JsonlLog,
137    pool: &mut Pool,
138    event: Event,
139    attestor: &dyn Attestor,
140    ledger_policy: &PolicyDecision,
141    mirror_policy: &PolicyDecision,
142) -> StoreResult<Event> {
143    require_mirror_parity_contributor(mirror_policy)?;
144    require_mirror_final_outcome(mirror_policy, "mirror.append_signed")?;
145
146    if let Some(existing) = select_event_by_id(pool, &event.id)? {
147        return Err(StoreError::Validation(format!(
148            "event id `{}` already exists in SQLite with hash `{}`",
149            existing.id, existing.event_hash
150        )));
151    }
152
153    let mut sealed = event.clone();
154    sealed.prev_event_hash = log.head().map(str::to_owned);
155    seal(&mut sealed);
156
157    let appended_head = log
158        .append_signed(event, attestor, ledger_policy)
159        .map_err(jsonl_error)?;
160    if appended_head != sealed.event_hash {
161        return Err(StoreError::Validation(format!(
162            "signed JSONL append head `{appended_head}` did not match sealed event `{}`",
163            sealed.event_hash
164        )));
165    }
166
167    let tx = pool.transaction()?;
168    insert_event(&tx, &sealed)?;
169    tx.commit()?;
170    Ok(sealed)
171}
172
173/// Mirror exactly one already-sealed event into SQLite.
174///
175/// Used by the schema v2 atomic cutover (`cortex migrate v2`) to insert the
176/// boundary `schema_migration.v1_to_v2` row into SQLite after the JSONL
177/// append has fsynced. Idempotent: an identical row already present in
178/// SQLite is treated as a no-op; a same-id row with diverging content fails
179/// closed so a partial mirror cannot silently overwrite immutable raw rows.
180///
181/// This helper is intentionally narrow — full mirror replay belongs in
182/// [`replay_jsonl_into_sqlite`], which discovers missing rows by walking the
183/// JSONL log. The cutover path already has the boundary event in hand and
184/// must not implicitly back-fill unrelated rows.
185pub fn mirror_single_event_into_sqlite(pool: &mut Pool, event: &Event) -> StoreResult<()> {
186    let tx = pool.transaction()?;
187    match mirror_single_event_into_sqlite_in_tx(&tx, event)? {
188        MirrorSingleEventOutcome::Inserted | MirrorSingleEventOutcome::AlreadyPresent => {
189            tx.commit()?;
190            Ok(())
191        }
192    }
193}
194
/// In-transaction outcome reported by [`mirror_single_event_into_sqlite_in_tx`].
///
/// The variants exist so callers in larger transactions (notably the schema v2
/// atomic cutover) can decide whether to emit operator-facing evidence; the
/// transactional behaviour is the same in both cases.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MirrorSingleEventOutcome {
    /// The event row was inserted into SQLite by this call.
    Inserted,
    /// An identical event row was already present; no INSERT was issued.
    AlreadyPresent,
}
207
208/// In-transaction variant of [`mirror_single_event_into_sqlite`] for callers
209/// that compose the SQLite-side mirror into a larger atomic cutover
210/// transaction (B1: schema v2 cutover, ADR 0033 §1 partial-mutation refusal).
211///
212/// Caller owns the surrounding `pool.transaction()` so a failure in any later
213/// in-tx step rolls back the boundary mirror in the same atomic unit. The
214/// idempotency contract matches [`mirror_single_event_into_sqlite`]:
215/// identical row → no-op; same-id row with diverging hash → fail closed.
216pub fn mirror_single_event_into_sqlite_in_tx(
217    tx: &rusqlite::Transaction<'_>,
218    event: &Event,
219) -> StoreResult<MirrorSingleEventOutcome> {
220    match select_event_by_id(tx, &event.id)? {
221        Some(existing) if existing == *event => Ok(MirrorSingleEventOutcome::AlreadyPresent),
222        Some(existing) => Err(StoreError::Validation(format!(
223            "event id `{}` already mirrored with diverging hash `{}` (expected `{}`); refusing to overwrite",
224            event.id, existing.event_hash, event.event_hash
225        ))),
226        None => {
227            insert_event(tx, event)?;
228            Ok(MirrorSingleEventOutcome::Inserted)
229        }
230    }
231}
232
233fn require_mirror_parity_contributor(policy: &PolicyDecision) -> StoreResult<()> {
234    let contains_rule = policy
235        .contributing
236        .iter()
237        .chain(policy.discarded.iter())
238        .any(|contribution| {
239            contribution.rule_id.as_str() == MIRROR_APPEND_PARITY_INVARIANT_RULE_ID
240        });
241    if contains_rule {
242        Ok(())
243    } else {
244        Err(StoreError::Validation(format!(
245            "policy decision missing required contributor `{MIRROR_APPEND_PARITY_INVARIANT_RULE_ID}`; caller skipped ADR 0026 composition",
246        )))
247    }
248}
249
250fn require_mirror_final_outcome(policy: &PolicyDecision, surface: &str) -> StoreResult<()> {
251    match policy.final_outcome {
252        PolicyOutcome::Allow | PolicyOutcome::Warn | PolicyOutcome::BreakGlass => Ok(()),
253        PolicyOutcome::Quarantine | PolicyOutcome::Reject => Err(StoreError::Validation(format!(
254            "{surface} preflight: composed policy outcome {:?} blocks mirrored append",
255            policy.final_outcome,
256        ))),
257    }
258}
259
260/// Build a [`PolicyDecision`] that satisfies the mirror parity invariant
261/// gate for [`append_event`] / [`append_signed_event`]. Intended for tests
262/// and fixtures only; production callers MUST compose
263/// [`MIRROR_APPEND_PARITY_INVARIANT_RULE_ID`] from real parity evidence.
264#[must_use]
265pub fn mirror_policy_decision_test_allow() -> PolicyDecision {
266    compose_policy_outcomes(
267        vec![PolicyContribution::new(
268            MIRROR_APPEND_PARITY_INVARIANT_RULE_ID,
269            PolicyOutcome::Allow,
270            "test fixture: JSONL <-> SQLite parity invariant satisfied",
271        )
272        .expect("static test contribution is valid")],
273        None,
274    )
275}
276
277/// Replays JSONL events missing from SQLite.
278///
279/// Existing identical rows are skipped, which makes recovery idempotent.
280/// Existing same-id rows with different content fail closed instead of
281/// silently overwriting immutable raw events.
282pub fn replay_jsonl_into_sqlite(
283    pool: &mut Pool,
284    jsonl_path: impl AsRef<Path>,
285) -> StoreResult<ReplayReport> {
286    let jsonl_path = jsonl_path.as_ref();
287    let jsonl_events = read_jsonl_events(jsonl_path)?;
288    let mut replayed = 0;
289    let mut skipped_existing = 0;
290
291    {
292        let tx = pool.transaction()?;
293        for event in jsonl_events.values() {
294            match select_event_by_id(&tx, &event.id)? {
295                Some(existing) if existing == *event => skipped_existing += 1,
296                Some(existing) => {
297                    return Err(StoreError::Validation(format!(
298                        "event id `{}` differs between JSONL hash `{}` and SQLite hash `{}`",
299                        event.id, event.event_hash, existing.event_hash
300                    )));
301                }
302                None => {
303                    insert_event(&tx, event)?;
304                    replayed += 1;
305                }
306            }
307        }
308        tx.commit()?;
309    }
310
311    let parity = verify_event_set_parity(pool, jsonl_path)?;
312    Ok(ReplayReport {
313        replayed,
314        skipped_existing,
315        parity,
316    })
317}
318
319/// Verifies that JSONL and SQLite contain the same immutable event set.
320pub fn verify_event_set_parity(
321    pool: &Pool,
322    jsonl_path: impl AsRef<Path>,
323) -> StoreResult<EventSetParity> {
324    let jsonl_events = read_jsonl_events(jsonl_path.as_ref())?;
325    let sqlite_events = read_sqlite_events(pool)?;
326
327    let mut missing_in_sqlite = Vec::new();
328    let mut missing_in_jsonl = Vec::new();
329    let mut mismatched = Vec::new();
330
331    for (id, jsonl_event) in &jsonl_events {
332        match sqlite_events.get(id) {
333            Some(sqlite_event) if sqlite_event == jsonl_event => {}
334            Some(sqlite_event) => mismatched.push(EventMismatch {
335                id: jsonl_event.id,
336                jsonl_event_hash: jsonl_event.event_hash.clone(),
337                sqlite_event_hash: sqlite_event.event_hash.clone(),
338            }),
339            None => missing_in_sqlite.push(jsonl_event.id),
340        }
341    }
342
343    for (id, sqlite_event) in &sqlite_events {
344        if !jsonl_events.contains_key(id) {
345            missing_in_jsonl.push(sqlite_event.id);
346        }
347    }
348
349    Ok(EventSetParity {
350        jsonl_event_count: jsonl_events.len(),
351        sqlite_event_count: sqlite_events.len(),
352        missing_in_sqlite,
353        missing_in_jsonl,
354        mismatched,
355    })
356}
357
358fn read_jsonl_events(path: &Path) -> StoreResult<BTreeMap<String, Event>> {
359    if !path.is_file() {
360        return Err(StoreError::Validation(format!(
361            "JSONL mirror does not exist at `{}`",
362            path.display()
363        )));
364    }
365
366    let log = JsonlLog::open(path).map_err(jsonl_error)?;
367    log.verify_chain().map_err(jsonl_error)?;
368    let mut events = BTreeMap::new();
369    for item in log.iter().map_err(jsonl_error)? {
370        let event = item.map_err(jsonl_error)?;
371        let id = event.id.to_string();
372        if let Some(existing) = events.insert(id.clone(), event.clone()) {
373            return Err(StoreError::Validation(format!(
374                "duplicate event id `{id}` in JSONL mirror: hashes `{}` and `{}`",
375                existing.event_hash, event.event_hash
376            )));
377        }
378    }
379    Ok(events)
380}
381
382fn read_sqlite_events(pool: &Pool) -> StoreResult<BTreeMap<String, Event>> {
383    let mut stmt = pool.prepare("SELECT id FROM events ORDER BY id;")?;
384    let ids = stmt
385        .query_map([], |row| row.get::<_, String>(0))?
386        .collect::<Result<Vec<_>, _>>()?;
387
388    let mut events = BTreeMap::new();
389    for id in ids {
390        let event = select_event_by_id(pool, &id.parse::<EventId>()?)?.ok_or_else(|| {
391            StoreError::Validation(format!("event id `{id}` disappeared during parity read"))
392        })?;
393        events.insert(id, event);
394    }
395    Ok(events)
396}
397
398fn insert_event(conn: &rusqlite::Connection, event: &Event) -> StoreResult<()> {
399    let source_json = serde_json::to_string(&event.source)?;
400    let domain_tags_json = serde_json::to_string(&event.domain_tags)?;
401    let payload_json = serde_json::to_string(&event.payload)?;
402    let trace_id = event.trace_id.map(|id| id.to_string());
403
404    conn.execute(
405        "INSERT INTO events (
406            id, schema_version, observed_at, recorded_at, source_json,
407            event_type, trace_id, session_id, domain_tags_json, payload_json,
408            payload_hash, prev_event_hash, event_hash
409         ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13);",
410        params![
411            event.id.to_string(),
412            i64::from(event.schema_version),
413            event.observed_at.to_rfc3339(),
414            event.recorded_at.to_rfc3339(),
415            source_json,
416            event.event_type.wire_str(),
417            trace_id,
418            event.session_id,
419            domain_tags_json,
420            payload_json,
421            event.payload_hash,
422            event.prev_event_hash,
423            event.event_hash,
424        ],
425    )?;
426    Ok(())
427}
428
429fn select_event_by_id(conn: &rusqlite::Connection, id: &EventId) -> StoreResult<Option<Event>> {
430    conn.query_row(
431        "SELECT id, schema_version, observed_at, recorded_at, source_json,
432                event_type, trace_id, session_id, domain_tags_json, payload_json,
433                payload_hash, prev_event_hash, event_hash
434         FROM events
435         WHERE id = ?1;",
436        params![id.to_string()],
437        event_row,
438    )
439    .optional()?
440    .map(TryInto::try_into)
441    .transpose()
442}
443
/// Raw column values for one `events` row, held as owned strings before
/// being parsed into a domain [`Event`] via `TryFrom<EventRow>`.
#[derive(Debug)]
struct EventRow {
    id: String,
    schema_version: i64,
    // RFC 3339 timestamp strings; parsed by `parse_utc` during conversion.
    observed_at: String,
    recorded_at: String,
    // JSON-serialized columns; deserialized with serde during conversion.
    source_json: String,
    event_type: String,
    trace_id: Option<String>,
    session_id: Option<String>,
    domain_tags_json: String,
    payload_json: String,
    payload_hash: String,
    prev_event_hash: Option<String>,
    event_hash: String,
}
460
461fn event_row(row: &Row<'_>) -> rusqlite::Result<EventRow> {
462    Ok(EventRow {
463        id: row.get(0)?,
464        schema_version: row.get(1)?,
465        observed_at: row.get(2)?,
466        recorded_at: row.get(3)?,
467        source_json: row.get(4)?,
468        event_type: row.get(5)?,
469        trace_id: row.get(6)?,
470        session_id: row.get(7)?,
471        domain_tags_json: row.get(8)?,
472        payload_json: row.get(9)?,
473        payload_hash: row.get(10)?,
474        prev_event_hash: row.get(11)?,
475        event_hash: row.get(12)?,
476    })
477}
478
479impl TryFrom<EventRow> for Event {
480    type Error = StoreError;
481
482    fn try_from(row: EventRow) -> StoreResult<Self> {
483        let schema_version = u16::try_from(row.schema_version).map_err(|_| {
484            StoreError::Validation(format!(
485                "invalid event schema_version {}",
486                row.schema_version
487            ))
488        })?;
489
490        Ok(Self {
491            id: row.id.parse::<EventId>()?,
492            schema_version,
493            observed_at: parse_utc(&row.observed_at)?,
494            recorded_at: parse_utc(&row.recorded_at)?,
495            source: serde_json::from_str::<EventSource>(&row.source_json)?,
496            event_type: serde_json::from_value::<EventType>(serde_json::Value::String(
497                row.event_type,
498            ))?,
499            trace_id: row.trace_id.map(|id| id.parse::<TraceId>()).transpose()?,
500            session_id: row.session_id,
501            domain_tags: serde_json::from_str(&row.domain_tags_json)?,
502            payload: serde_json::from_str(&row.payload_json)?,
503            payload_hash: row.payload_hash,
504            prev_event_hash: row.prev_event_hash,
505            event_hash: row.event_hash,
506        })
507    }
508}
509
510fn parse_utc(value: &str) -> StoreResult<DateTime<Utc>> {
511    Ok(DateTime::parse_from_rfc3339(value)?.with_timezone(&Utc))
512}
513
514fn jsonl_error(err: cortex_ledger::JsonlError) -> StoreError {
515    StoreError::Validation(err.to_string())
516}