crtx-store 0.1.0

SQLite persistence: migrations, repositories, transactions.
Documentation
//! Event repository operations.

use chrono::{DateTime, Utc};
use cortex_core::{
    Event, EventId, EventSource, EventType, SourceAttestation, TraceId,
    SCHEMA_MIGRATION_V1_TO_V2_TARGET,
};
use rusqlite::{params, OptionalExtension, Row};

use crate::{Pool, StoreError, StoreResult};

/// Repository for immutable event rows.
///
/// The repository owns no database state of its own: it only borrows a
/// [`Pool`] for the lifetime `'a` and issues its statements through it.
#[derive(Debug)]
pub struct EventRepo<'a> {
    /// Borrowed database handle through which statements are executed.
    pool: &'a Pool,
}

impl<'a> EventRepo<'a> {
    /// Creates an event repository over an open SQLite connection.
    #[must_use]
    pub const fn new(pool: &'a Pool) -> Self {
        Self { pool }
    }

    /// Appends one event row.
    ///
    /// Uses `INSERT OR IGNORE` so re-ingesting the same event id is a no-op
    /// (idempotent dual-write from the session-close pipeline).
    pub fn append(&self, event: &Event) -> StoreResult<()> {
        let source_json = serde_json::to_string(&event.source)?;
        let domain_tags_json = serde_json::to_string(&event.domain_tags)?;
        let payload_json = serde_json::to_string(&event.payload)?;
        let trace_id = event.trace_id.map(|id| id.to_string());

        self.pool.execute(
            "INSERT OR IGNORE INTO events (
                id, schema_version, observed_at, recorded_at, source_json,
                event_type, trace_id, session_id, domain_tags_json, payload_json,
                payload_hash, prev_event_hash, event_hash
             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13);",
            params![
                event.id.to_string(),
                i64::from(event.schema_version),
                event.observed_at.to_rfc3339(),
                event.recorded_at.to_rfc3339(),
                source_json,
                event.event_type.wire_str(),
                trace_id,
                event.session_id,
                domain_tags_json,
                payload_json,
                event.payload_hash,
                event.prev_event_hash,
                event.event_hash,
            ],
        )?;

        Ok(())
    }

    /// Appends one opt-in schema-v2 event row with explicit source attestation.
    ///
    /// This is a Lane S2 cutover-readiness API. It is not called from the default
    /// v1 append path while `cortex_core::SCHEMA_VERSION` remains 1.
    pub fn append_schema_v2_with_source_attestation(
        &self,
        event: &Event,
        source_attestation: &SourceAttestation,
    ) -> StoreResult<()> {
        if event.schema_version != SCHEMA_MIGRATION_V1_TO_V2_TARGET {
            return Err(StoreError::Validation(format!(
                "schema-v2 event append requires schema_version {}; found {}",
                SCHEMA_MIGRATION_V1_TO_V2_TARGET, event.schema_version
            )));
        }
        if matches!(source_attestation, SourceAttestation::Missing) {
            return Err(StoreError::Validation(
                "schema-v2 event append requires explicit source attestation; missing is not persistable"
                    .into(),
            ));
        }

        let source_json = serde_json::to_string(&event.source)?;
        let source_attestation_json = serde_json::to_string(source_attestation)?;
        let domain_tags_json = serde_json::to_string(&event.domain_tags)?;
        let payload_json = serde_json::to_string(&event.payload)?;
        let trace_id = event.trace_id.map(|id| id.to_string());

        self.pool.execute(
            "INSERT INTO events (
                id, schema_version, observed_at, recorded_at, source_json,
                event_type, trace_id, session_id, domain_tags_json, payload_json,
                payload_hash, prev_event_hash, event_hash, source_attestation_json
             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14);",
            params![
                event.id.to_string(),
                i64::from(event.schema_version),
                event.observed_at.to_rfc3339(),
                event.recorded_at.to_rfc3339(),
                source_json,
                event.event_type.wire_str(),
                trace_id,
                event.session_id,
                domain_tags_json,
                payload_json,
                event.payload_hash,
                event.prev_event_hash,
                event.event_hash,
                source_attestation_json,
            ],
        )?;

        Ok(())
    }

    /// Fetches an event by id.
    pub fn get_by_id(&self, id: &EventId) -> StoreResult<Option<Event>> {
        let row = self
            .pool
            .query_row(
                "SELECT id, schema_version, observed_at, recorded_at, source_json,
                        event_type, trace_id, session_id, domain_tags_json, payload_json,
                        payload_hash, prev_event_hash, event_hash
                 FROM events
                 WHERE id = ?1;",
                params![id.to_string()],
                event_row,
            )
            .optional()?;

        row.map(TryInto::try_into).transpose()
    }

    /// Returns the newest event hash-chain head, if any.
    pub fn chain_head(&self) -> StoreResult<Option<Event>> {
        let row = self
            .pool
            .query_row(
                "SELECT e.id, e.schema_version, e.observed_at, e.recorded_at, e.source_json,
                        e.event_type, e.trace_id, e.session_id, e.domain_tags_json,
                        e.payload_json, e.payload_hash, e.prev_event_hash, e.event_hash
                 FROM events e
                 WHERE NOT EXISTS (
                     SELECT 1 FROM events child WHERE child.prev_event_hash = e.event_hash
                 )
                 ORDER BY e.recorded_at DESC, e.id DESC
                 LIMIT 1;",
                [],
                event_row,
            )
            .optional()?;

        row.map(TryInto::try_into).transpose()
    }
}

/// Raw column values of one `events` row, before conversion into an [`Event`].
///
/// Each field holds the value exactly as read from the database; parsing and
/// validation happen in `TryFrom<EventRow> for Event`.
#[derive(Debug)]
struct EventRow {
    /// Event id in its textual form.
    id: String,
    /// Schema version as a SQLite integer; narrowed to `u16` on conversion.
    schema_version: i64,
    /// Observation timestamp as RFC 3339 text.
    observed_at: String,
    /// Recording timestamp as RFC 3339 text.
    recorded_at: String,
    /// JSON-serialized event source.
    source_json: String,
    /// Wire string of the event type.
    event_type: String,
    /// Trace id in its textual form, when present.
    trace_id: Option<String>,
    /// Session id, when present.
    session_id: Option<String>,
    /// JSON-serialized domain tags.
    domain_tags_json: String,
    /// JSON-serialized payload.
    payload_json: String,
    /// Stored payload hash, passed through unchanged.
    payload_hash: String,
    /// Stored hash of the preceding event, when present.
    prev_event_hash: Option<String>,
    /// Stored hash of this event, passed through unchanged.
    event_hash: String,
}

/// Reads the thirteen event columns from `row` by position into an [`EventRow`].
///
/// Column indices are positional, so callers must select the columns in the
/// order `id, schema_version, observed_at, recorded_at, source_json,
/// event_type, trace_id, session_id, domain_tags_json, payload_json,
/// payload_hash, prev_event_hash, event_hash`.
fn event_row(row: &Row<'_>) -> rusqlite::Result<EventRow> {
    let id: String = row.get(0)?;
    let schema_version: i64 = row.get(1)?;
    let observed_at: String = row.get(2)?;
    let recorded_at: String = row.get(3)?;
    let source_json: String = row.get(4)?;
    let event_type: String = row.get(5)?;
    let trace_id: Option<String> = row.get(6)?;
    let session_id: Option<String> = row.get(7)?;
    let domain_tags_json: String = row.get(8)?;
    let payload_json: String = row.get(9)?;
    let payload_hash: String = row.get(10)?;
    let prev_event_hash: Option<String> = row.get(11)?;
    let event_hash: String = row.get(12)?;

    Ok(EventRow {
        id,
        schema_version,
        observed_at,
        recorded_at,
        source_json,
        event_type,
        trace_id,
        session_id,
        domain_tags_json,
        payload_json,
        payload_hash,
        prev_event_hash,
        event_hash,
    })
}

impl TryFrom<EventRow> for Event {
    type Error = StoreError;

    fn try_from(row: EventRow) -> StoreResult<Self> {
        let schema_version = u16::try_from(row.schema_version).map_err(|_| {
            StoreError::Validation(format!(
                "invalid event schema_version {}",
                row.schema_version
            ))
        })?;

        Ok(Self {
            id: row.id.parse::<EventId>()?,
            schema_version,
            observed_at: parse_utc(&row.observed_at)?,
            recorded_at: parse_utc(&row.recorded_at)?,
            source: serde_json::from_str::<EventSource>(&row.source_json)?,
            event_type: serde_json::from_value::<EventType>(serde_json::Value::String(
                row.event_type,
            ))?,
            trace_id: row.trace_id.map(|id| id.parse::<TraceId>()).transpose()?,
            session_id: row.session_id,
            domain_tags: serde_json::from_str(&row.domain_tags_json)?,
            payload: serde_json::from_str(&row.payload_json)?,
            payload_hash: row.payload_hash,
            prev_event_hash: row.prev_event_hash,
            event_hash: row.event_hash,
        })
    }
}

/// Parses an RFC 3339 timestamp and converts it to UTC.
///
/// # Errors
///
/// Fails when `value` is not a valid RFC 3339 timestamp.
fn parse_utc(value: &str) -> StoreResult<DateTime<Utc>> {
    let with_offset = DateTime::parse_from_rfc3339(value)?;
    let utc = with_offset.with_timezone(&Utc);
    Ok(utc)
}