use anyhow::Result;
use rusqlite::{Connection, OptionalExtension};
/// Row cap for `projection_cache_entries`; `upsert_entry` passes it to `prune_entries`
/// after every write.
const MAX_CACHE_ENTRIES: usize = 64;
/// Row cap for `projection_cache_events`; `record_event` passes it to `prune_events`
/// after every insert.
const MAX_CACHE_EVENTS: usize = 256;
/// In-memory form of one row of the `projection_cache_entries` table.
///
/// `read_entry` builds it from a selected row and `upsert_entry` writes it back.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ProjectionCacheEntryRecord {
/// Value of the `id` column when the record was read from the database.
/// `upsert_entry` never writes this field, so `None` is fine for new records.
pub(crate) id: Option<i64>,
/// Observation timestamp (epoch seconds, going by the field name). Pruning deletes
/// the rows with the smallest values first.
pub(crate) observed_at_epoch_s: u64,
/// First component of the four-part lookup / conflict key.
pub(crate) target: String,
/// Second component of the lookup / conflict key.
pub(crate) format: String,
/// Third component of the lookup / conflict key.
pub(crate) source_fingerprint: String,
/// Fourth component of the lookup / conflict key.
pub(crate) tool_surface_fingerprint: String,
/// Cached payload, stored and returned as an opaque string; this module never parses it
/// (presumably JSON, per the name).
pub(crate) payload_json: String,
}
/// In-memory form of one row of the `projection_cache_events` table.
///
/// `record_event` inserts it and `list_recent_events_for_target` reads it back.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ProjectionCacheEventRecord {
/// Value of the `id` column when the record was read from the database.
/// `record_event` never writes this field, so `None` is fine for new records.
pub(crate) id: Option<i64>,
/// Observation timestamp (epoch seconds, going by the field name). Used as the primary
/// sort key when listing events and when pruning the oldest ones.
pub(crate) observed_at_epoch_s: u64,
/// Optional session identifier stored alongside the event; `None` is written as SQL NULL.
pub(crate) session_id: Option<String>,
/// Target the event refers to; `list_recent_events_for_target` filters on this column.
pub(crate) target: String,
/// Stored verbatim in the `format` column.
pub(crate) format: String,
/// Stored verbatim in the `source_fingerprint` column.
pub(crate) source_fingerprint: String,
/// Stored verbatim in the `tool_surface_fingerprint` column.
pub(crate) tool_surface_fingerprint: String,
/// Free-form status string stored verbatim (the unit test in this file uses `"miss"`).
pub(crate) cache_status: String,
}
/// Produces the canonical owned form of an optional tool-surface fingerprint.
///
/// `None` becomes the empty string; a present value is returned with leading and
/// trailing whitespace removed.
pub(crate) fn normalize_tool_surface_fingerprint(value: Option<&str>) -> String {
    match value {
        Some(raw) => raw.trim().to_owned(),
        None => String::new(),
    }
}
/// Looks up the cache entry stored under the exact four-part key
/// (`target`, `format`, `source_fingerprint`, `tool_surface_fingerprint`).
///
/// Returns `Ok(Some(record))` for a matching row and `Ok(None)` when the query
/// yields no rows.
///
/// # Errors
/// Any other database or column-decoding failure is returned as an error.
pub(crate) fn read_entry(
    conn: &Connection,
    target: &str,
    format: &str,
    source_fingerprint: &str,
    tool_surface_fingerprint: &str,
) -> Result<Option<ProjectionCacheEntryRecord>> {
    let lookup_sql = "SELECT id, observed_at_epoch_s, target, format, source_fingerprint,
tool_surface_fingerprint, payload_json
FROM projection_cache_entries
WHERE target = ?1 AND format = ?2 AND source_fingerprint = ?3
AND tool_surface_fingerprint = ?4
LIMIT 1";

    let cached = conn
        .query_row(
            lookup_sql,
            rusqlite::params![target, format, source_fingerprint, tool_surface_fingerprint],
            |row| {
                // Column indices follow the SELECT list above.
                let entry = ProjectionCacheEntryRecord {
                    id: row.get(0)?,
                    observed_at_epoch_s: row.get(1)?,
                    target: row.get(2)?,
                    format: row.get(3)?,
                    source_fingerprint: row.get(4)?,
                    tool_surface_fingerprint: row.get(5)?,
                    payload_json: row.get(6)?,
                };
                Ok(entry)
            },
        )
        // Turns the "query returned no rows" error into `None`.
        .optional()?;
    Ok(cached)
}
pub(crate) fn upsert_entry(conn: &Connection, record: &ProjectionCacheEntryRecord) -> Result<()> {
conn.execute(
"INSERT INTO projection_cache_entries
(observed_at_epoch_s, target, format, source_fingerprint,
tool_surface_fingerprint, payload_json)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)
ON CONFLICT(target, format, source_fingerprint, tool_surface_fingerprint)
DO UPDATE SET
observed_at_epoch_s = excluded.observed_at_epoch_s,
payload_json = excluded.payload_json",
rusqlite::params![
record.observed_at_epoch_s,
&record.target,
&record.format,
&record.source_fingerprint,
&record.tool_surface_fingerprint,
&record.payload_json,
],
)?;
prune_entries(conn, MAX_CACHE_ENTRIES)?;
Ok(())
}
pub(crate) fn record_event(conn: &Connection, record: &ProjectionCacheEventRecord) -> Result<()> {
conn.execute(
"INSERT INTO projection_cache_events
(observed_at_epoch_s, session_id, target, format, source_fingerprint,
tool_surface_fingerprint, cache_status)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
rusqlite::params![
record.observed_at_epoch_s,
&record.session_id,
&record.target,
&record.format,
&record.source_fingerprint,
&record.tool_surface_fingerprint,
&record.cache_status,
],
)?;
prune_events(conn, MAX_CACHE_EVENTS)?;
Ok(())
}
/// Returns up to `limit` events recorded for `target`, newest first.
///
/// Rows are ordered by `observed_at_epoch_s` descending, with `id` descending as the
/// tie-breaker.
///
/// `limit` is converted to the signed 64-bit value SQLite binds using a checked
/// conversion: anything above `i64::MAX` is clamped to `i64::MAX` instead of wrapping to
/// a negative number (which SQLite would silently interpret as "no limit"). A `limit`
/// of `0` yields an empty vector.
///
/// # Errors
/// Returns any error from preparing or running the query, or from decoding a row.
pub(crate) fn list_recent_events_for_target(
    conn: &Connection,
    target: &str,
    limit: usize,
) -> Result<Vec<ProjectionCacheEventRecord>> {
    // Imported locally so the checked conversion also resolves on pre-2021 editions,
    // where `TryFrom` is not part of the prelude.
    use std::convert::TryFrom;

    let row_limit = i64::try_from(limit).unwrap_or(i64::MAX);

    let mut stmt = conn.prepare(
        "SELECT id, observed_at_epoch_s, session_id, target, format, source_fingerprint,
tool_surface_fingerprint, cache_status
FROM projection_cache_events
WHERE target = ?1
ORDER BY observed_at_epoch_s DESC, id DESC
LIMIT ?2",
    )?;
    let rows = stmt.query_map(rusqlite::params![target, row_limit], |row| {
        // Column indices follow the SELECT list above.
        Ok(ProjectionCacheEventRecord {
            id: row.get(0)?,
            observed_at_epoch_s: row.get(1)?,
            session_id: row.get(2)?,
            target: row.get(3)?,
            format: row.get(4)?,
            source_fingerprint: row.get(5)?,
            tool_surface_fingerprint: row.get(6)?,
            cache_status: row.get(7)?,
        })
    })?;
    // Stops at the first row that fails to decode.
    rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// Deletes the oldest rows of `projection_cache_entries` so that at most `max_entries`
/// remain.
///
/// "Oldest" means smallest `observed_at_epoch_s`, with smallest `id` breaking ties.
/// Nothing is deleted when the table already holds `max_entries` rows or fewer.
///
/// The row count and the cap are compared as `i64` (the type SQLite returns and binds),
/// using a checked conversion and saturating subtraction, so no lossy `as` casts are
/// involved on any pointer width.
///
/// # Errors
/// Returns any database error from the count query or the delete.
fn prune_entries(conn: &Connection, max_entries: usize) -> Result<()> {
    // Imported locally so the checked conversion also resolves on pre-2021 editions,
    // where `TryFrom` is not part of the prelude.
    use std::convert::TryFrom;

    let row_count: i64 =
        conn.query_row("SELECT COUNT(*) FROM projection_cache_entries", [], |row| {
            row.get(0)
        })?;
    // A cap above i64::MAX can never be exceeded by an i64 count; clamp instead of wrapping.
    let keep = i64::try_from(max_entries).unwrap_or(i64::MAX);
    let excess = row_count.saturating_sub(keep);
    if excess > 0 {
        conn.execute(
            "DELETE FROM projection_cache_entries WHERE id IN
(SELECT id FROM projection_cache_entries ORDER BY observed_at_epoch_s ASC, id ASC LIMIT ?1)",
            [excess],
        )?;
    }
    Ok(())
}
/// Deletes the oldest rows of `projection_cache_events` so that at most `max_events`
/// remain.
///
/// "Oldest" means smallest `observed_at_epoch_s`, with smallest `id` breaking ties.
/// Nothing is deleted when the table already holds `max_events` rows or fewer.
///
/// The row count and the cap are compared as `i64` (the type SQLite returns and binds),
/// using a checked conversion and saturating subtraction, so no lossy `as` casts are
/// involved on any pointer width.
///
/// # Errors
/// Returns any database error from the count query or the delete.
fn prune_events(conn: &Connection, max_events: usize) -> Result<()> {
    // Imported locally so the checked conversion also resolves on pre-2021 editions,
    // where `TryFrom` is not part of the prelude.
    use std::convert::TryFrom;

    let row_count: i64 =
        conn.query_row("SELECT COUNT(*) FROM projection_cache_events", [], |row| {
            row.get(0)
        })?;
    // A cap above i64::MAX can never be exceeded by an i64 count; clamp instead of wrapping.
    let keep = i64::try_from(max_events).unwrap_or(i64::MAX);
    let excess = row_count.saturating_sub(keep);
    if excess > 0 {
        conn.execute(
            "DELETE FROM projection_cache_events WHERE id IN
(SELECT id FROM projection_cache_events ORDER BY observed_at_epoch_s ASC, id ASC LIMIT ?1)",
            [excess],
        )?;
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::schema;

    /// Opens a fresh in-memory database and runs the crate's schema initialization on it.
    fn test_conn() -> Connection {
        let db = Connection::open_in_memory().unwrap();
        schema::initialize(&db, None).unwrap();
        db
    }

    /// An entry written with `upsert_entry` can be read back by its full key.
    #[test]
    fn upsert_and_read_entry_roundtrip() {
        let db = test_conn();
        let stored = ProjectionCacheEntryRecord {
            id: None,
            observed_at_epoch_s: 42,
            target: String::from("session"),
            format: String::from("bundle"),
            source_fingerprint: String::from("fp_1"),
            tool_surface_fingerprint: String::new(),
            payload_json: String::from("{\"task\":\"demo\"}"),
        };
        upsert_entry(&db, &stored).unwrap();

        let fetched = read_entry(&db, "session", "bundle", "fp_1", "").unwrap();
        let loaded = fetched.expect("entry");

        assert_eq!(loaded.source_fingerprint, "fp_1");
        assert_eq!(loaded.payload_json, "{\"task\":\"demo\"}");
    }

    /// An event written with `record_event` shows up in the per-target listing.
    #[test]
    fn record_event_and_list_recent_roundtrip() {
        let db = test_conn();
        let event = ProjectionCacheEventRecord {
            id: None,
            observed_at_epoch_s: 100,
            session_id: Some(String::from("ses_1")),
            target: String::from("session"),
            format: String::from("narrative"),
            source_fingerprint: String::from("fp_1"),
            tool_surface_fingerprint: String::new(),
            cache_status: String::from("miss"),
        };
        record_event(&db, &event).unwrap();

        let listed = list_recent_events_for_target(&db, "session", 10).unwrap();

        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].cache_status, "miss");
    }
}