use std::path::Path;
use anyhow::{Context, Result};
use rusqlite::{params, Connection, Error as SqliteError, OpenFlags, OptionalExtension};
/// Maximum number of telemetry rows retained per (host, hook) pair; older
/// rows are pruned after every successful insert (see `prune_host_hook_events`).
const MAX_HOST_LOOP_EVENTS_PER_HOST_HOOK: i64 = 256;
/// One telemetry event row from the `host_loop_events` table.
///
/// Unsigned fields are stored as SQLite `INTEGER` (i64) and converted with
/// overflow checks on write (see `insert`).
#[derive(Debug, Clone)]
pub(crate) struct HostLoopEventRecord {
    // SQLite rowid; `None` for records that have not been inserted yet.
    pub(crate) _id: Option<i64>,
    // Unix timestamp (seconds) when the event was observed.
    pub(crate) observed_at_epoch_s: u64,
    // Host session identifier, when one was available at capture time.
    pub(crate) session_id: Option<String>,
    // Host name (e.g. "codex" in the tests below) — part of the pruning key.
    pub(crate) host: String,
    // Hook name (e.g. "before_prompt_build") — part of the pruning key.
    pub(crate) hook: String,
    // Event status label (e.g. "bounded_context").
    pub(crate) status: String,
    // Action taken at the session boundary, if any (e.g. "continue").
    pub(crate) session_boundary_action: Option<String>,
    // NOTE(review): presumably identifies the payload source — confirm semantics.
    pub(crate) source_fingerprint: Option<String>,
    // Hash of the normalized payload, used for change detection by callers.
    pub(crate) normalized_payload_hash: Option<String>,
    // Payload size in characters.
    pub(crate) payload_chars: Option<u64>,
    // Estimated token count for the payload.
    pub(crate) payload_estimated_tokens: Option<u64>,
    // Total context size reported by the host, in characters.
    pub(crate) host_total_context_chars: Option<u64>,
    // Ratio of overhead to total context — assumed fraction in [0, 1]; TODO confirm.
    pub(crate) overhead_ratio: Option<f64>,
    // Unix timestamp (seconds) when the session first started.
    pub(crate) session_started_at_epoch_s: Option<u64>,
    // Unix timestamp (seconds) of the most recent session (re)start.
    pub(crate) session_last_started_at_epoch_s: Option<u64>,
    // Number of times the session has started; silently dropped on read if it
    // does not fit in u32 (see `row_to_record`).
    pub(crate) session_start_count: Option<u32>,
    // Per-section metrics serialized as JSON (opaque to this module).
    pub(crate) section_metrics_json: Option<String>,
}
pub(crate) fn insert(conn: &Connection, record: &HostLoopEventRecord) -> Result<()> {
conn.execute(
"INSERT INTO host_loop_events (
observed_at_epoch_s,
session_id,
host,
hook,
status,
session_boundary_action,
source_fingerprint,
normalized_payload_hash,
payload_chars,
payload_estimated_tokens,
host_total_context_chars,
overhead_ratio,
session_started_at_epoch_s,
session_last_started_at_epoch_s,
session_start_count,
section_metrics_json
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
params![
i64::try_from(record.observed_at_epoch_s)
.context("observed_at_epoch_s overflow while writing host loop event")?,
record.session_id,
record.host,
record.hook,
record.status,
record.session_boundary_action,
record.source_fingerprint,
record.normalized_payload_hash,
record
.payload_chars
.map(|value| {
i64::try_from(value).context("payload_chars overflow while writing host loop")
})
.transpose()?,
record
.payload_estimated_tokens
.map(|value| {
i64::try_from(value)
.context("payload_estimated_tokens overflow while writing host loop")
})
.transpose()?,
record
.host_total_context_chars
.map(|value| {
i64::try_from(value)
.context("host_total_context_chars overflow while writing host loop")
})
.transpose()?,
record.overhead_ratio,
record
.session_started_at_epoch_s
.map(|value| {
i64::try_from(value)
.context("session_started_at_epoch_s overflow while writing host loop")
})
.transpose()?,
record
.session_last_started_at_epoch_s
.map(|value| {
i64::try_from(value)
.context("session_last_started_at_epoch_s overflow while writing host loop")
})
.transpose()?,
record.session_start_count.map(i64::from),
record.section_metrics_json,
],
)?;
prune_host_hook_events(conn, &record.host, &record.hook)?;
Ok(())
}
/// Fetches the most recent event recorded for the given (host, hook) pair.
///
/// Returns `Ok(None)` when the database file does not exist, when no matching
/// row exists, or when the `host_loop_events` table has not been created yet.
pub(crate) fn latest_for_host_hook(
    path: &Path,
    host: &str,
    hook: &str,
) -> Result<Option<HostLoopEventRecord>> {
    // A missing file is not an error: there is simply no telemetry yet.
    if !path.is_file() {
        return Ok(None);
    }
    // Read-only open: this query path must never create or mutate the DB.
    let db = Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY)
        .with_context(|| format!("open host loop DB: {}", path.display()))?;
    let outcome = db
        .query_row(
            "SELECT id, observed_at_epoch_s, session_id, host, hook, status, session_boundary_action,
source_fingerprint, normalized_payload_hash, payload_chars,
payload_estimated_tokens, host_total_context_chars, overhead_ratio,
session_started_at_epoch_s, session_last_started_at_epoch_s,
session_start_count, section_metrics_json
FROM host_loop_events
WHERE host = ?1 AND hook = ?2
ORDER BY observed_at_epoch_s DESC, id DESC
LIMIT 1",
            params![host, hook],
            row_to_record,
        )
        .optional();
    match outcome {
        // Either a record or a clean "no rows" result.
        Ok(record) => Ok(record),
        // Tolerate an older DB file that predates the table.
        Err(error) if is_missing_host_loop_table(&error) => Ok(None),
        Err(error) => Err(error.into()),
    }
}
pub(crate) fn list_recent(path: &Path, limit: usize) -> Result<Vec<HostLoopEventRecord>> {
if !path.is_file() {
return Ok(Vec::new());
}
if limit == 0 {
return Ok(Vec::new());
}
let conn = Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY)
.with_context(|| format!("open host loop DB: {}", path.display()))?;
let limit = i64::try_from(limit).context("limit overflow while listing host loop telemetry")?;
let mut stmt = match conn.prepare(
"SELECT id, observed_at_epoch_s, session_id, host, hook, status, session_boundary_action,
source_fingerprint, normalized_payload_hash, payload_chars,
payload_estimated_tokens, host_total_context_chars, overhead_ratio,
session_started_at_epoch_s, session_last_started_at_epoch_s,
session_start_count, section_metrics_json
FROM host_loop_events
ORDER BY observed_at_epoch_s DESC, id DESC
LIMIT ?1",
) {
Ok(stmt) => stmt,
Err(error) if is_missing_host_loop_table(&error) => return Ok(Vec::new()),
Err(error) => return Err(error.into()),
};
let rows = match stmt.query_map([limit], row_to_record) {
Ok(rows) => rows,
Err(error) if is_missing_host_loop_table(&error) => return Ok(Vec::new()),
Err(error) => return Err(error.into()),
};
let mut records = Vec::new();
for row in rows {
records.push(row?);
}
Ok(records)
}
fn is_missing_host_loop_table(error: &SqliteError) -> bool {
matches!(error, SqliteError::SqliteFailure(_, Some(message)) if message.contains("no such table: host_loop_events"))
}
/// Deletes all but the newest `MAX_HOST_LOOP_EVENTS_PER_HOST_HOOK` rows for
/// the given (host, hook) pair, keeping the table bounded per pair.
///
/// The subquery selects the ids of the rows to retain (newest first by
/// observed time, with rowid as a tiebreaker); everything else for the pair
/// is removed.
fn prune_host_hook_events(conn: &Connection, host: &str, hook: &str) -> Result<()> {
    conn.execute(
        "DELETE FROM host_loop_events
WHERE host = ?1
AND hook = ?2
AND id NOT IN (
SELECT id
FROM host_loop_events
WHERE host = ?1 AND hook = ?2
ORDER BY observed_at_epoch_s DESC, id DESC
LIMIT ?3
)",
        params![host, hook, MAX_HOST_LOOP_EVENTS_PER_HOST_HOOK],
    )?;
    Ok(())
}
/// Converts one SQLite row into a `HostLoopEventRecord`.
///
/// Column indices mirror the SELECT column order used by
/// `latest_for_host_hook` and `list_recent` — keep the three in sync.
fn row_to_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<HostLoopEventRecord> {
    Ok(HostLoopEventRecord {
        _id: row.get(0)?,
        observed_at_epoch_s: row.get(1)?,
        session_id: row.get(2)?,
        host: row.get(3)?,
        hook: row.get(4)?,
        status: row.get(5)?,
        session_boundary_action: row.get(6)?,
        source_fingerprint: row.get(7)?,
        normalized_payload_hash: row.get(8)?,
        payload_chars: row.get(9)?,
        payload_estimated_tokens: row.get(10)?,
        host_total_context_chars: row.get(11)?,
        overhead_ratio: row.get(12)?,
        session_started_at_epoch_s: row.get(13)?,
        session_last_started_at_epoch_s: row.get(14)?,
        // Values that do not fit in u32 are silently mapped to None —
        // NOTE(review): looks like a deliberate best-effort read; confirm
        // callers do not need to distinguish "absent" from "out of range".
        session_start_count: row
            .get::<_, Option<i64>>(15)?
            .and_then(|value| u32::try_from(value).ok()),
        section_metrics_json: row.get(16)?,
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::schema;

    // Round-trip: an inserted record is returned verbatim by
    // `latest_for_host_hook` for the matching (host, hook) pair.
    #[test]
    fn insert_and_latest_round_trip() {
        let path = tempfile::NamedTempFile::new().unwrap();
        let conn = Connection::open(path.path()).unwrap();
        schema::initialize(&conn).unwrap();
        insert(
            &conn,
            &HostLoopEventRecord {
                _id: None,
                observed_at_epoch_s: 1_000,
                session_id: Some("ses_1".to_owned()),
                host: "codex".to_owned(),
                hook: "before_prompt_build".to_owned(),
                status: "bounded_context".to_owned(),
                session_boundary_action: Some("continue".to_owned()),
                source_fingerprint: Some("fp_1".to_owned()),
                normalized_payload_hash: Some("hash_1".to_owned()),
                payload_chars: Some(123),
                payload_estimated_tokens: Some(31),
                host_total_context_chars: Some(500),
                overhead_ratio: Some(0.246),
                session_started_at_epoch_s: Some(900),
                session_last_started_at_epoch_s: Some(950),
                session_start_count: Some(2),
                section_metrics_json: Some("[]".to_owned()),
            },
        )
        .unwrap();
        let latest = latest_for_host_hook(path.path(), "codex", "before_prompt_build")
            .unwrap()
            .unwrap();
        assert_eq!(latest.session_id.as_deref(), Some("ses_1"));
        assert_eq!(latest.normalized_payload_hash.as_deref(), Some("hash_1"));
        assert_eq!(latest.payload_chars, Some(123));
        assert_eq!(latest.session_start_count, Some(2));
    }

    // Pruning: only the newest MAX_HOST_LOOP_EVENTS_PER_HOST_HOOK (256) rows
    // survive per (host, hook) pair, and rows for other hooks are untouched.
    #[test]
    fn insert_prunes_older_rows_for_same_host_and_hook() {
        let path = tempfile::NamedTempFile::new().unwrap();
        let conn = Connection::open(path.path()).unwrap();
        schema::initialize(&conn).unwrap();
        // 300 inserts for the same pair; pruning runs after each insert.
        for index in 0..300u64 {
            insert(
                &conn,
                &HostLoopEventRecord {
                    _id: None,
                    observed_at_epoch_s: 1_000 + index,
                    session_id: None,
                    host: "codex".to_owned(),
                    hook: "before_prompt_build".to_owned(),
                    status: "bounded_context".to_owned(),
                    session_boundary_action: Some("continue".to_owned()),
                    source_fingerprint: None,
                    normalized_payload_hash: Some(format!("hash_{index}")),
                    payload_chars: Some(100 + index),
                    payload_estimated_tokens: Some(25),
                    host_total_context_chars: None,
                    overhead_ratio: None,
                    session_started_at_epoch_s: None,
                    session_last_started_at_epoch_s: None,
                    session_start_count: None,
                    section_metrics_json: None,
                },
            )
            .unwrap();
        }
        // A different hook for the same host must not be pruned.
        insert(
            &conn,
            &HostLoopEventRecord {
                _id: None,
                observed_at_epoch_s: 9_999,
                session_id: None,
                host: "codex".to_owned(),
                hook: "on_session_start".to_owned(),
                status: "session_bootstrapped".to_owned(),
                session_boundary_action: Some("continue".to_owned()),
                source_fingerprint: None,
                normalized_payload_hash: Some("startup_hash".to_owned()),
                payload_chars: Some(77),
                payload_estimated_tokens: Some(20),
                host_total_context_chars: None,
                overhead_ratio: None,
                session_started_at_epoch_s: None,
                session_last_started_at_epoch_s: None,
                session_start_count: None,
                section_metrics_json: None,
            },
        )
        .unwrap();
        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM host_loop_events WHERE host = ?1 AND hook = ?2",
                params!["codex", "before_prompt_build"],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(count, MAX_HOST_LOOP_EVENTS_PER_HOST_HOOK);
        let other_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM host_loop_events WHERE host = ?1 AND hook = ?2",
                params!["codex", "on_session_start"],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(other_count, 1);
        let oldest_retained: Option<String> = conn
            .query_row(
                "SELECT normalized_payload_hash
FROM host_loop_events
WHERE host = ?1 AND hook = ?2
ORDER BY observed_at_epoch_s ASC, id ASC
LIMIT 1",
                params!["codex", "before_prompt_build"],
                |row| row.get(0),
            )
            .optional()
            .unwrap();
        // 300 inserts, 256 kept: indices 44..=299 survive, so the oldest
        // retained hash is hash_44 (300 - 256 = 44).
        assert_eq!(oldest_retained.as_deref(), Some("hash_44"));
    }

    // Overflow guard: a limit larger than i64::MAX must produce the
    // documented context error rather than a silent truncation.
    #[test]
    fn list_recent_rejects_limit_overflow() {
        let path = tempfile::NamedTempFile::new().unwrap();
        // On targets where usize cannot exceed i64::MAX (e.g. 32-bit),
        // no overflowing value exists and the test is a no-op.
        let too_large = usize::try_from(i64::MAX)
            .ok()
            .and_then(|value| value.checked_add(1));
        let Some(too_large) = too_large else {
            return;
        };
        let error = list_recent(path.path(), too_large).expect_err("overflow should fail");
        assert!(error
            .to_string()
            .contains("limit overflow while listing host loop telemetry"));
    }
}