ccd-cli 1.0.0-beta.3

Bootstrap and validate Continuous Context Development repositories
use anyhow::Result;
use rusqlite::{Connection, OptionalExtension};

const MAX_CACHE_ENTRIES: usize = 64;
const MAX_CACHE_EVENTS: usize = 256;

#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ProjectionCacheEntryRecord {
    pub(crate) id: Option<i64>,
    pub(crate) observed_at_epoch_s: u64,
    pub(crate) target: String,
    pub(crate) format: String,
    pub(crate) source_fingerprint: String,
    pub(crate) tool_surface_fingerprint: String,
    pub(crate) payload_json: String,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ProjectionCacheEventRecord {
    pub(crate) id: Option<i64>,
    pub(crate) observed_at_epoch_s: u64,
    pub(crate) session_id: Option<String>,
    pub(crate) target: String,
    pub(crate) format: String,
    pub(crate) source_fingerprint: String,
    pub(crate) tool_surface_fingerprint: String,
    pub(crate) cache_status: String,
}

pub(crate) fn normalize_tool_surface_fingerprint(value: Option<&str>) -> String {
    value.unwrap_or("").trim().to_owned()
}

pub(crate) fn read_entry(
    conn: &Connection,
    target: &str,
    format: &str,
    source_fingerprint: &str,
    tool_surface_fingerprint: &str,
) -> Result<Option<ProjectionCacheEntryRecord>> {
    conn.query_row(
        "SELECT id, observed_at_epoch_s, target, format, source_fingerprint,
                tool_surface_fingerprint, payload_json
         FROM projection_cache_entries
         WHERE target = ?1 AND format = ?2 AND source_fingerprint = ?3
           AND tool_surface_fingerprint = ?4
         LIMIT 1",
        rusqlite::params![target, format, source_fingerprint, tool_surface_fingerprint],
        |row| {
            Ok(ProjectionCacheEntryRecord {
                id: row.get(0)?,
                observed_at_epoch_s: row.get(1)?,
                target: row.get(2)?,
                format: row.get(3)?,
                source_fingerprint: row.get(4)?,
                tool_surface_fingerprint: row.get(5)?,
                payload_json: row.get(6)?,
            })
        },
    )
    .optional()
    .map_err(Into::into)
}

pub(crate) fn upsert_entry(conn: &Connection, record: &ProjectionCacheEntryRecord) -> Result<()> {
    conn.execute(
        "INSERT INTO projection_cache_entries
            (observed_at_epoch_s, target, format, source_fingerprint,
             tool_surface_fingerprint, payload_json)
         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
         ON CONFLICT(target, format, source_fingerprint, tool_surface_fingerprint)
         DO UPDATE SET
            observed_at_epoch_s = excluded.observed_at_epoch_s,
            payload_json = excluded.payload_json",
        rusqlite::params![
            record.observed_at_epoch_s,
            &record.target,
            &record.format,
            &record.source_fingerprint,
            &record.tool_surface_fingerprint,
            &record.payload_json,
        ],
    )?;
    prune_entries(conn, MAX_CACHE_ENTRIES)?;
    Ok(())
}

pub(crate) fn record_event(conn: &Connection, record: &ProjectionCacheEventRecord) -> Result<()> {
    conn.execute(
        "INSERT INTO projection_cache_events
            (observed_at_epoch_s, session_id, target, format, source_fingerprint,
             tool_surface_fingerprint, cache_status)
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
        rusqlite::params![
            record.observed_at_epoch_s,
            &record.session_id,
            &record.target,
            &record.format,
            &record.source_fingerprint,
            &record.tool_surface_fingerprint,
            &record.cache_status,
        ],
    )?;
    prune_events(conn, MAX_CACHE_EVENTS)?;
    Ok(())
}

pub(crate) fn list_recent_events_for_target(
    conn: &Connection,
    target: &str,
    limit: usize,
) -> Result<Vec<ProjectionCacheEventRecord>> {
    let mut stmt = conn.prepare(
        "SELECT id, observed_at_epoch_s, session_id, target, format, source_fingerprint,
                tool_surface_fingerprint, cache_status
         FROM projection_cache_events
         WHERE target = ?1
         ORDER BY observed_at_epoch_s DESC, id DESC
         LIMIT ?2",
    )?;

    let rows = stmt.query_map(rusqlite::params![target, limit as i64], |row| {
        Ok(ProjectionCacheEventRecord {
            id: row.get(0)?,
            observed_at_epoch_s: row.get(1)?,
            session_id: row.get(2)?,
            target: row.get(3)?,
            format: row.get(4)?,
            source_fingerprint: row.get(5)?,
            tool_surface_fingerprint: row.get(6)?,
            cache_status: row.get(7)?,
        })
    })?;

    rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}

fn prune_entries(conn: &Connection, max_entries: usize) -> Result<()> {
    let count: i64 =
        conn.query_row("SELECT COUNT(*) FROM projection_cache_entries", [], |row| {
            row.get(0)
        })?;
    if count as usize > max_entries {
        let excess = count as usize - max_entries;
        conn.execute(
            "DELETE FROM projection_cache_entries WHERE id IN
             (SELECT id FROM projection_cache_entries ORDER BY observed_at_epoch_s ASC, id ASC LIMIT ?1)",
            [excess as i64],
        )?;
    }
    Ok(())
}

fn prune_events(conn: &Connection, max_events: usize) -> Result<()> {
    let count: i64 = conn.query_row("SELECT COUNT(*) FROM projection_cache_events", [], |row| {
        row.get(0)
    })?;
    if count as usize > max_events {
        let excess = count as usize - max_events;
        conn.execute(
            "DELETE FROM projection_cache_events WHERE id IN
             (SELECT id FROM projection_cache_events ORDER BY observed_at_epoch_s ASC, id ASC LIMIT ?1)",
            [excess as i64],
        )?;
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::schema;

    fn test_conn() -> Connection {
        let conn = Connection::open_in_memory().unwrap();
        schema::initialize(&conn, None).unwrap();
        conn
    }

    #[test]
    fn upsert_and_read_entry_roundtrip() {
        let conn = test_conn();
        let record = ProjectionCacheEntryRecord {
            id: None,
            observed_at_epoch_s: 42,
            target: "session".to_owned(),
            format: "bundle".to_owned(),
            source_fingerprint: "fp_1".to_owned(),
            tool_surface_fingerprint: String::new(),
            payload_json: "{\"task\":\"demo\"}".to_owned(),
        };

        upsert_entry(&conn, &record).unwrap();

        let loaded = read_entry(&conn, "session", "bundle", "fp_1", "")
            .unwrap()
            .expect("entry");
        assert_eq!(loaded.source_fingerprint, "fp_1");
        assert_eq!(loaded.payload_json, "{\"task\":\"demo\"}");
    }

    #[test]
    fn record_event_and_list_recent_roundtrip() {
        let conn = test_conn();
        record_event(
            &conn,
            &ProjectionCacheEventRecord {
                id: None,
                observed_at_epoch_s: 100,
                session_id: Some("ses_1".to_owned()),
                target: "session".to_owned(),
                format: "narrative".to_owned(),
                source_fingerprint: "fp_1".to_owned(),
                tool_surface_fingerprint: String::new(),
                cache_status: "miss".to_owned(),
            },
        )
        .unwrap();

        let rows = list_recent_events_for_target(&conn, "session", 10).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].cache_status, "miss");
    }
}