// ccd-cli 1.0.0-beta.2
//
// Bootstrap and validate Continuous Context Development repositories
use std::path::Path;

use anyhow::{Context, Result};
use rusqlite::{params, Connection, Error as SqliteError, OpenFlags, OptionalExtension};

/// Maximum number of rows kept for any single `(host, hook)` pair.
///
/// Older rows beyond this count are pruned after every insert. The value is typed as
/// `i64` so it can be bound directly as the SQL `LIMIT` parameter.
const MAX_HOST_LOOP_EVENTS_PER_HOST_HOOK: i64 = 256;

/// One row of the `host_loop_events` telemetry table.
///
/// Unsigned integer fields are written to SQLite as signed 64-bit integers; `insert`
/// rejects values that do not fit.
#[derive(Debug, Clone)]
pub(crate) struct HostLoopEventRecord {
    /// Row id read back from the `id` column. `insert` does not bind it, so records
    /// being written use `None`.
    pub(crate) _id: Option<i64>,
    /// Observation timestamp (seconds since the Unix epoch, per the name — confirm
    /// with callers). This is the primary sort key for "latest"/"recent" queries and
    /// for pruning.
    pub(crate) observed_at_epoch_s: u64,
    pub(crate) session_id: Option<String>,
    /// Host identifier. Together with `hook`, it forms the pruning/lookup key.
    pub(crate) host: String,
    /// Hook identifier within the host.
    pub(crate) hook: String,
    pub(crate) status: String,
    pub(crate) session_boundary_action: Option<String>,
    pub(crate) source_fingerprint: Option<String>,
    pub(crate) normalized_payload_hash: Option<String>,
    pub(crate) payload_chars: Option<u64>,
    /// Estimated token count of the payload. The estimation method is not visible
    /// here — TODO confirm.
    pub(crate) payload_estimated_tokens: Option<u64>,
    pub(crate) host_total_context_chars: Option<u64>,
    /// Stored as an SQLite REAL. Presumably payload size relative to host context
    /// size — verify against the producer.
    pub(crate) overhead_ratio: Option<f64>,
    pub(crate) session_started_at_epoch_s: Option<u64>,
    pub(crate) session_last_started_at_epoch_s: Option<u64>,
    /// Read back leniently: a stored value outside `u32` range becomes `None`.
    pub(crate) session_start_count: Option<u32>,
    /// Serialized section metrics. Presumably JSON text (tests store `"[]"`) — the
    /// schema is not validated here.
    pub(crate) section_metrics_json: Option<String>,
}

pub(crate) fn insert(conn: &Connection, record: &HostLoopEventRecord) -> Result<()> {
    conn.execute(
        "INSERT INTO host_loop_events (
            observed_at_epoch_s,
            session_id,
            host,
            hook,
            status,
            session_boundary_action,
            source_fingerprint,
            normalized_payload_hash,
            payload_chars,
            payload_estimated_tokens,
            host_total_context_chars,
            overhead_ratio,
            session_started_at_epoch_s,
            session_last_started_at_epoch_s,
            session_start_count,
            section_metrics_json
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
        params![
            i64::try_from(record.observed_at_epoch_s)
                .context("observed_at_epoch_s overflow while writing host loop event")?,
            record.session_id,
            record.host,
            record.hook,
            record.status,
            record.session_boundary_action,
            record.source_fingerprint,
            record.normalized_payload_hash,
            record
                .payload_chars
                .map(|value| {
                    i64::try_from(value).context("payload_chars overflow while writing host loop")
                })
                .transpose()?,
            record
                .payload_estimated_tokens
                .map(|value| {
                    i64::try_from(value)
                        .context("payload_estimated_tokens overflow while writing host loop")
                })
                .transpose()?,
            record
                .host_total_context_chars
                .map(|value| {
                    i64::try_from(value)
                        .context("host_total_context_chars overflow while writing host loop")
                })
                .transpose()?,
            record.overhead_ratio,
            record
                .session_started_at_epoch_s
                .map(|value| {
                    i64::try_from(value)
                        .context("session_started_at_epoch_s overflow while writing host loop")
                })
                .transpose()?,
            record
                .session_last_started_at_epoch_s
                .map(|value| {
                    i64::try_from(value)
                        .context("session_last_started_at_epoch_s overflow while writing host loop")
                })
                .transpose()?,
            record.session_start_count.map(i64::from),
            record.section_metrics_json,
        ],
    )?;
    prune_host_hook_events(conn, &record.host, &record.hook)?;
    Ok(())
}

pub(crate) fn latest_for_host_hook(
    path: &Path,
    host: &str,
    hook: &str,
) -> Result<Option<HostLoopEventRecord>> {
    if !path.is_file() {
        return Ok(None);
    }

    let conn = Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY)
        .with_context(|| format!("open host loop DB: {}", path.display()))?;
    match conn
        .query_row(
        "SELECT id, observed_at_epoch_s, session_id, host, hook, status, session_boundary_action,
                source_fingerprint, normalized_payload_hash, payload_chars,
                payload_estimated_tokens, host_total_context_chars, overhead_ratio,
                session_started_at_epoch_s, session_last_started_at_epoch_s,
                session_start_count, section_metrics_json
         FROM host_loop_events
         WHERE host = ?1 AND hook = ?2
         ORDER BY observed_at_epoch_s DESC, id DESC
         LIMIT 1",
        params![host, hook],
        row_to_record,
    )
    .optional()
    {
        Ok(record) => Ok(record),
        Err(error) if is_missing_host_loop_table(&error) => Ok(None),
        Err(error) => Err(error.into()),
    }
}

pub(crate) fn list_recent(path: &Path, limit: usize) -> Result<Vec<HostLoopEventRecord>> {
    if !path.is_file() {
        return Ok(Vec::new());
    }
    if limit == 0 {
        return Ok(Vec::new());
    }

    let conn = Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY)
        .with_context(|| format!("open host loop DB: {}", path.display()))?;
    let limit = i64::try_from(limit).context("limit overflow while listing host loop telemetry")?;
    let mut stmt = match conn.prepare(
        "SELECT id, observed_at_epoch_s, session_id, host, hook, status, session_boundary_action,
                source_fingerprint, normalized_payload_hash, payload_chars,
                payload_estimated_tokens, host_total_context_chars, overhead_ratio,
                session_started_at_epoch_s, session_last_started_at_epoch_s,
                session_start_count, section_metrics_json
         FROM host_loop_events
         ORDER BY observed_at_epoch_s DESC, id DESC
         LIMIT ?1",
    ) {
        Ok(stmt) => stmt,
        Err(error) if is_missing_host_loop_table(&error) => return Ok(Vec::new()),
        Err(error) => return Err(error.into()),
    };
    let rows = match stmt.query_map([limit], row_to_record) {
        Ok(rows) => rows,
        Err(error) if is_missing_host_loop_table(&error) => return Ok(Vec::new()),
        Err(error) => return Err(error.into()),
    };
    let mut records = Vec::new();
    for row in rows {
        records.push(row?);
    }
    Ok(records)
}

/// Reports whether `error` is SQLite's "no such table" failure for `host_loop_events`.
///
/// Readers use this to treat a database without the telemetry table as empty instead
/// of failing. Detection relies on SQLite's error message text.
fn is_missing_host_loop_table(error: &SqliteError) -> bool {
    const MISSING_TABLE_MESSAGE: &str = "no such table: host_loop_events";
    match error {
        SqliteError::SqliteFailure(_, Some(message)) => message.contains(MISSING_TABLE_MESSAGE),
        _ => false,
    }
}

fn prune_host_hook_events(conn: &Connection, host: &str, hook: &str) -> Result<()> {
    conn.execute(
        "DELETE FROM host_loop_events
         WHERE host = ?1
           AND hook = ?2
           AND id NOT IN (
               SELECT id
               FROM host_loop_events
               WHERE host = ?1 AND hook = ?2
               ORDER BY observed_at_epoch_s DESC, id DESC
               LIMIT ?3
           )",
        params![host, hook, MAX_HOST_LOOP_EVENTS_PER_HOST_HOOK],
    )?;
    Ok(())
}

/// Decodes one `host_loop_events` row into a [`HostLoopEventRecord`].
///
/// The row must follow the column order selected by the reader queries, from `id`
/// (index 0) to `section_metrics_json` (index 16). Columns are decoded in index order,
/// and the first decode failure is returned.
///
/// `session_start_count` is deliberately lenient: a stored value outside `u32` range
/// becomes `None` instead of an error.
fn row_to_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<HostLoopEventRecord> {
    let id: Option<i64> = row.get(0)?;
    let observed_at_epoch_s: u64 = row.get(1)?;
    let session_id: Option<String> = row.get(2)?;
    let host: String = row.get(3)?;
    let hook: String = row.get(4)?;
    let status: String = row.get(5)?;
    let session_boundary_action: Option<String> = row.get(6)?;
    let source_fingerprint: Option<String> = row.get(7)?;
    let normalized_payload_hash: Option<String> = row.get(8)?;
    let payload_chars: Option<u64> = row.get(9)?;
    let payload_estimated_tokens: Option<u64> = row.get(10)?;
    let host_total_context_chars: Option<u64> = row.get(11)?;
    let overhead_ratio: Option<f64> = row.get(12)?;
    let session_started_at_epoch_s: Option<u64> = row.get(13)?;
    let session_last_started_at_epoch_s: Option<u64> = row.get(14)?;
    let raw_start_count: Option<i64> = row.get(15)?;
    let section_metrics_json: Option<String> = row.get(16)?;

    Ok(HostLoopEventRecord {
        _id: id,
        observed_at_epoch_s,
        session_id,
        host,
        hook,
        status,
        session_boundary_action,
        source_fingerprint,
        normalized_payload_hash,
        payload_chars,
        payload_estimated_tokens,
        host_total_context_chars,
        overhead_ratio,
        session_started_at_epoch_s,
        session_last_started_at_epoch_s,
        session_start_count: raw_start_count.and_then(|count| u32::try_from(count).ok()),
        section_metrics_json,
    })
}

// Tests for the host loop telemetry store. Each test uses a fresh temporary SQLite file
// initialized through `schema::initialize`, except the overflow test, which only needs
// an existing file path.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::schema;

    // A record written with `insert` is returned by `latest_for_host_hook`, including
    // optional fields and the u32 `session_start_count` round-trip.
    #[test]
    fn insert_and_latest_round_trip() {
        let path = tempfile::NamedTempFile::new().unwrap();
        let conn = Connection::open(path.path()).unwrap();
        schema::initialize(&conn).unwrap();

        insert(
            &conn,
            &HostLoopEventRecord {
                _id: None,
                observed_at_epoch_s: 1_000,
                session_id: Some("ses_1".to_owned()),
                host: "codex".to_owned(),
                hook: "before_prompt_build".to_owned(),
                status: "bounded_context".to_owned(),
                session_boundary_action: Some("continue".to_owned()),
                source_fingerprint: Some("fp_1".to_owned()),
                normalized_payload_hash: Some("hash_1".to_owned()),
                payload_chars: Some(123),
                payload_estimated_tokens: Some(31),
                host_total_context_chars: Some(500),
                overhead_ratio: Some(0.246),
                session_started_at_epoch_s: Some(900),
                session_last_started_at_epoch_s: Some(950),
                session_start_count: Some(2),
                section_metrics_json: Some("[]".to_owned()),
            },
        )
        .unwrap();

        let latest = latest_for_host_hook(path.path(), "codex", "before_prompt_build")
            .unwrap()
            .unwrap();
        assert_eq!(latest.session_id.as_deref(), Some("ses_1"));
        assert_eq!(latest.normalized_payload_hash.as_deref(), Some("hash_1"));
        assert_eq!(latest.payload_chars, Some(123));
        assert_eq!(latest.session_start_count, Some(2));
    }

    // Pruning is scoped to one (host, hook) pair and keeps only the newest rows.
    #[test]
    fn insert_prunes_older_rows_for_same_host_and_hook() {
        let path = tempfile::NamedTempFile::new().unwrap();
        let conn = Connection::open(path.path()).unwrap();
        schema::initialize(&conn).unwrap();

        // Insert more rows than the per-pair cap, with strictly increasing timestamps.
        for index in 0..300u64 {
            insert(
                &conn,
                &HostLoopEventRecord {
                    _id: None,
                    observed_at_epoch_s: 1_000 + index,
                    session_id: None,
                    host: "codex".to_owned(),
                    hook: "before_prompt_build".to_owned(),
                    status: "bounded_context".to_owned(),
                    session_boundary_action: Some("continue".to_owned()),
                    source_fingerprint: None,
                    normalized_payload_hash: Some(format!("hash_{index}")),
                    payload_chars: Some(100 + index),
                    payload_estimated_tokens: Some(25),
                    host_total_context_chars: None,
                    overhead_ratio: None,
                    session_started_at_epoch_s: None,
                    session_last_started_at_epoch_s: None,
                    session_start_count: None,
                    section_metrics_json: None,
                },
            )
            .unwrap();
        }

        // A row for a different hook on the same host must not be affected by the
        // pruning above, nor prune the other hook's rows.
        insert(
            &conn,
            &HostLoopEventRecord {
                _id: None,
                observed_at_epoch_s: 9_999,
                session_id: None,
                host: "codex".to_owned(),
                hook: "on_session_start".to_owned(),
                status: "session_bootstrapped".to_owned(),
                session_boundary_action: Some("continue".to_owned()),
                source_fingerprint: None,
                normalized_payload_hash: Some("startup_hash".to_owned()),
                payload_chars: Some(77),
                payload_estimated_tokens: Some(20),
                host_total_context_chars: None,
                overhead_ratio: None,
                session_started_at_epoch_s: None,
                session_last_started_at_epoch_s: None,
                session_start_count: None,
                section_metrics_json: None,
            },
        )
        .unwrap();

        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM host_loop_events WHERE host = ?1 AND hook = ?2",
                params!["codex", "before_prompt_build"],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(count, MAX_HOST_LOOP_EVENTS_PER_HOST_HOOK);

        let other_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM host_loop_events WHERE host = ?1 AND hook = ?2",
                params!["codex", "on_session_start"],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(other_count, 1);

        // 300 rows inserted and 256 kept, so indices 0..=43 were pruned and index 44
        // is the oldest survivor.
        let oldest_retained: Option<String> = conn
            .query_row(
                "SELECT normalized_payload_hash
                 FROM host_loop_events
                 WHERE host = ?1 AND hook = ?2
                 ORDER BY observed_at_epoch_s ASC, id ASC
                 LIMIT 1",
                params!["codex", "before_prompt_build"],
                |row| row.get(0),
            )
            .optional()
            .unwrap();
        assert_eq!(oldest_retained.as_deref(), Some("hash_44"));
    }

    // A `limit` that does not fit in i64 is rejected with a descriptive error. The path
    // must exist, because `list_recent` returns an empty list early for missing files.
    #[test]
    fn list_recent_rejects_limit_overflow() {
        let path = tempfile::NamedTempFile::new().unwrap();
        let too_large = usize::try_from(i64::MAX)
            .ok()
            .and_then(|value| value.checked_add(1));

        // On targets where usize cannot exceed i64::MAX, overflow is impossible and
        // there is nothing to test.
        let Some(too_large) = too_large else {
            return;
        };

        let error = list_recent(path.path(), too_large).expect_err("overflow should fail");
        assert!(error
            .to_string()
            .contains("limit overflow while listing host loop telemetry"));
    }
}