use anyhow::Result;
use rusqlite::Connection;
use rusqlite::OptionalExtension;
use crate::state::projection_metadata::ProjectionObservation;
const MAX_ENTRIES: usize = 32;
pub(crate) fn list(conn: &Connection) -> Result<Vec<ProjectionObservation>> {
let mut stmt = conn.prepare(
"SELECT observed_at_epoch_s, source_fingerprint, projection_digests,
tool_surface_fingerprint, session_id
FROM projection_metadata ORDER BY id ASC",
)?;
let observations = stmt
.query_map([], |row| {
let digests: Option<String> = row.get(2)?;
let tool_fp: Option<String> = row.get(3)?;
let session_id: Option<String> = row.get(4)?;
Ok(RawProjection {
observed_at_epoch_s: row.get(0)?,
source_fingerprint: row.get(1)?,
projection_digests_json: digests,
tool_surface_fingerprint: tool_fp,
session_id,
})
})?
.collect::<Result<Vec<_>, _>>()?;
observations
.into_iter()
.map(|raw| raw.into_observation())
.collect::<Result<Vec<_>>>()
}
pub(crate) fn find_baseline_for_session(
conn: &Connection,
session_id: &str,
) -> Result<Option<ProjectionObservation>> {
let raw = conn
.query_row(
"SELECT observed_at_epoch_s, source_fingerprint, projection_digests,
tool_surface_fingerprint, session_id
FROM projection_metadata WHERE session_id = ?1 ORDER BY id ASC LIMIT 1",
[session_id],
|row| {
let digests: Option<String> = row.get(2)?;
let tool_fp: Option<String> = row.get(3)?;
let sid: Option<String> = row.get(4)?;
Ok(RawProjection {
observed_at_epoch_s: row.get(0)?,
source_fingerprint: row.get(1)?,
projection_digests_json: digests,
tool_surface_fingerprint: tool_fp,
session_id: sid,
})
},
)
.optional()?;
raw.map(|r| r.into_observation()).transpose()
}
pub(crate) fn record(conn: &Connection, obs: &ProjectionObservation) -> Result<()> {
let latest_fp: Option<(String, Option<String>)> = conn
.query_row(
"SELECT source_fingerprint, tool_surface_fingerprint
FROM projection_metadata ORDER BY id DESC LIMIT 1",
[],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.ok();
if let Some((fp, tool_fp)) = &latest_fp {
if fp == &obs.source_fingerprint && *tool_fp == obs.tool_surface_fingerprint {
return Ok(());
}
}
let digests_json = obs
.projection_digests
.as_ref()
.map(serde_json::to_string)
.transpose()?;
conn.execute(
"INSERT INTO projection_metadata
(observed_at_epoch_s, source_fingerprint, projection_digests,
tool_surface_fingerprint, session_id)
VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![
obs.observed_at_epoch_s,
obs.source_fingerprint,
digests_json,
obs.tool_surface_fingerprint,
obs.session_id,
],
)?;
prune(conn, MAX_ENTRIES)?;
Ok(())
}
pub(crate) fn prune(conn: &Connection, max_entries: usize) -> Result<()> {
let count: i64 = conn.query_row("SELECT COUNT(*) FROM projection_metadata", [], |row| {
row.get(0)
})?;
if count as usize > max_entries {
let excess = count as usize - max_entries;
conn.execute(
"DELETE FROM projection_metadata WHERE id IN
(SELECT id FROM projection_metadata ORDER BY id ASC LIMIT ?1)",
[excess as i64],
)?;
}
Ok(())
}
struct RawProjection {
observed_at_epoch_s: u64,
source_fingerprint: String,
projection_digests_json: Option<String>,
tool_surface_fingerprint: Option<String>,
session_id: Option<String>,
}
impl RawProjection {
fn into_observation(self) -> Result<ProjectionObservation> {
let projection_digests = self
.projection_digests_json
.map(|json| serde_json::from_str(&json))
.transpose()?;
Ok(ProjectionObservation {
observed_at_epoch_s: self.observed_at_epoch_s,
source_fingerprint: self.source_fingerprint,
projection_digests,
tool_surface_fingerprint: self.tool_surface_fingerprint,
session_id: self.session_id,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::schema;
fn test_conn() -> Connection {
let conn = Connection::open_in_memory().unwrap();
schema::initialize(&conn).unwrap();
conn
}
fn obs(fp: &str, epoch: u64) -> ProjectionObservation {
ProjectionObservation {
observed_at_epoch_s: epoch,
source_fingerprint: fp.to_owned(),
projection_digests: None,
tool_surface_fingerprint: None,
session_id: None,
}
}
#[test]
fn list_returns_empty_initially() {
let conn = test_conn();
assert!(list(&conn).unwrap().is_empty());
}
#[test]
fn record_and_list_roundtrip() {
let conn = test_conn();
record(&conn, &obs("fp_1", 1000)).unwrap();
record(&conn, &obs("fp_2", 2000)).unwrap();
let entries = list(&conn).unwrap();
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].source_fingerprint, "fp_1");
assert_eq!(entries[1].source_fingerprint, "fp_2");
}
#[test]
fn record_deduplicates_same_fingerprint() {
let conn = test_conn();
record(&conn, &obs("fp_same", 1000)).unwrap();
record(&conn, &obs("fp_same", 2000)).unwrap();
let entries = list(&conn).unwrap();
assert_eq!(entries.len(), 1);
}
#[test]
fn prune_limits_entries() {
let conn = test_conn();
for i in 0..35 {
let o = obs(&format!("fp_{i}"), 1000 + i);
let digests_json: Option<String> = None;
conn.execute(
"INSERT INTO projection_metadata
(observed_at_epoch_s, source_fingerprint, projection_digests,
tool_surface_fingerprint, session_id)
VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![
o.observed_at_epoch_s,
o.source_fingerprint,
digests_json,
o.tool_surface_fingerprint,
o.session_id,
],
)
.unwrap();
}
prune(&conn, MAX_ENTRIES).unwrap();
let entries = list(&conn).unwrap();
assert_eq!(entries.len(), MAX_ENTRIES);
assert_eq!(entries[0].source_fingerprint, "fp_3");
}
#[test]
fn record_with_tool_surface_fingerprint() {
let conn = test_conn();
let o = ProjectionObservation {
observed_at_epoch_s: 1000,
source_fingerprint: "fp_tool".to_owned(),
projection_digests: None,
tool_surface_fingerprint: Some("tool_fp_123".to_owned()),
session_id: None,
};
record(&conn, &o).unwrap();
let entries = list(&conn).unwrap();
assert_eq!(
entries[0].tool_surface_fingerprint.as_deref(),
Some("tool_fp_123")
);
}
#[test]
fn dedup_considers_tool_surface_fingerprint() {
let conn = test_conn();
let o1 = ProjectionObservation {
observed_at_epoch_s: 1000,
source_fingerprint: "fp_same".to_owned(),
projection_digests: None,
tool_surface_fingerprint: Some("tool_a".to_owned()),
session_id: None,
};
let o2 = ProjectionObservation {
observed_at_epoch_s: 2000,
source_fingerprint: "fp_same".to_owned(),
projection_digests: None,
tool_surface_fingerprint: Some("tool_b".to_owned()),
session_id: None,
};
record(&conn, &o1).unwrap();
record(&conn, &o2).unwrap();
let entries = list(&conn).unwrap();
assert_eq!(entries.len(), 2);
}
#[test]
fn find_baseline_for_session_returns_first_matching_row() {
let conn = test_conn();
let o1 = ProjectionObservation {
observed_at_epoch_s: 1000,
source_fingerprint: "fp_1".to_owned(),
projection_digests: None,
tool_surface_fingerprint: None,
session_id: Some("ses_AAA".to_owned()),
};
let o2 = ProjectionObservation {
observed_at_epoch_s: 2000,
source_fingerprint: "fp_2".to_owned(),
projection_digests: None,
tool_surface_fingerprint: None,
session_id: Some("ses_AAA".to_owned()),
};
let o3 = ProjectionObservation {
observed_at_epoch_s: 3000,
source_fingerprint: "fp_3".to_owned(),
projection_digests: None,
tool_surface_fingerprint: None,
session_id: Some("ses_BBB".to_owned()),
};
record(&conn, &o1).unwrap();
record(&conn, &o2).unwrap();
record(&conn, &o3).unwrap();
let baseline = find_baseline_for_session(&conn, "ses_AAA")
.unwrap()
.expect("expected Some for ses_AAA");
assert_eq!(baseline.source_fingerprint, "fp_1");
assert_eq!(baseline.observed_at_epoch_s, 1000);
assert_eq!(baseline.session_id.as_deref(), Some("ses_AAA"));
let baseline = find_baseline_for_session(&conn, "ses_BBB")
.unwrap()
.expect("expected Some for ses_BBB");
assert_eq!(baseline.source_fingerprint, "fp_3");
assert_eq!(baseline.observed_at_epoch_s, 3000);
let baseline = find_baseline_for_session(&conn, "ses_ZZZ").unwrap();
assert!(baseline.is_none());
}
}