Skip to main content

aft/db/compression_events.rs

1use rusqlite::{params, Connection};
2
/// Borrowed description of a single row for the `compression_events` table.
///
/// All text fields borrow from the caller for `'a`, so constructing a row does
/// not allocate. Because every field is itself `Copy`, the struct derives
/// `Copy` along with `Debug`/`PartialEq`/`Eq` so rows can be logged, compared
/// in tests, and passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CompressionEventRow<'a> {
    /// Value for the `harness` column; the aggregate queries filter on it.
    pub harness: &'a str,
    /// Value for the `session_id` column; `None` is stored as SQL `NULL`.
    pub session_id: Option<&'a str>,
    /// Value for the `project_key` column; the aggregate queries filter on it.
    pub project_key: &'a str,
    /// Value for the `tool` column.
    pub tool: &'a str,
    /// Value for the `task_id` column; `None` is stored as SQL `NULL`.
    pub task_id: Option<&'a str>,
    /// Value for the `command` column; `None` is stored as SQL `NULL`.
    pub command: Option<&'a str>,
    /// Value for the `compressor` column.
    pub compressor: &'a str,
    /// Size in bytes before compression.
    pub original_bytes: i64,
    /// Size in bytes after compression.
    pub compressed_bytes: i64,
    /// Token count before compression.
    pub original_tokens: u32,
    /// Token count after compression.
    pub compressed_tokens: u32,
    /// Value for the `created_at` column.
    // NOTE(review): unit/epoch of this timestamp is not fixed here — confirm with callers.
    pub created_at: i64,
}
17
18#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize)]
19pub struct CompressionAggregate {
20    pub events: u64,
21    pub original_tokens: u64,
22    pub compressed_tokens: u64,
23}
24
25impl CompressionAggregate {
26    pub fn savings_tokens(&self) -> u64 {
27        self.original_tokens.saturating_sub(self.compressed_tokens)
28    }
29}
30
31pub fn insert_compression_event(
32    conn: &Connection,
33    row: &CompressionEventRow<'_>,
34) -> rusqlite::Result<()> {
35    conn.execute(
36        r#"
37        INSERT OR IGNORE INTO compression_events (
38            harness, session_id, project_key, tool, task_id, command, compressor,
39            original_bytes, compressed_bytes, original_tokens, compressed_tokens, created_at
40        )
41        VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
42        "#,
43        params![
44            row.harness,
45            row.session_id,
46            row.project_key,
47            row.tool,
48            row.task_id,
49            row.command,
50            row.compressor,
51            row.original_bytes,
52            row.compressed_bytes,
53            row.original_tokens,
54            row.compressed_tokens,
55            row.created_at,
56        ],
57    )?;
58    Ok(())
59}
60
61pub fn aggregate_for_project(
62    conn: &Connection,
63    harness: &str,
64    project_key: &str,
65) -> rusqlite::Result<CompressionAggregate> {
66    conn.query_row(
67        r#"
68        SELECT
69            COUNT(*) AS events,
70            COALESCE(SUM(original_tokens), 0) AS original,
71            COALESCE(SUM(compressed_tokens), 0) AS compressed
72        FROM compression_events
73        WHERE harness = ?1 AND project_key = ?2
74        "#,
75        params![harness, project_key],
76        |row| {
77            Ok(CompressionAggregate {
78                events: row.get::<_, i64>(0)? as u64,
79                original_tokens: row.get::<_, i64>(1)? as u64,
80                compressed_tokens: row.get::<_, i64>(2)? as u64,
81            })
82        },
83    )
84}
85
86pub fn aggregate_for_session(
87    conn: &Connection,
88    harness: &str,
89    project_key: &str,
90    session_id: &str,
91) -> rusqlite::Result<CompressionAggregate> {
92    conn.query_row(
93        r#"
94        SELECT
95            COUNT(*) AS events,
96            COALESCE(SUM(original_tokens), 0) AS original,
97            COALESCE(SUM(compressed_tokens), 0) AS compressed
98        FROM compression_events
99        WHERE harness = ?1 AND project_key = ?2 AND session_id = ?3
100        "#,
101        params![harness, project_key, session_id],
102        |row| {
103            Ok(CompressionAggregate {
104                events: row.get::<_, i64>(0)? as u64,
105                original_tokens: row.get::<_, i64>(1)? as u64,
106                compressed_tokens: row.get::<_, i64>(2)? as u64,
107            })
108        },
109    )
110}
111
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a fixture event in which only the project, the token counts and
    /// the timestamp vary; every other column is fixed. Byte sizes are derived
    /// as four bytes per token.
    fn sample_row(
        project_key: &'static str,
        original_tokens: u32,
        compressed_tokens: u32,
        created_at: i64,
    ) -> CompressionEventRow<'static> {
        let original_bytes = i64::from(original_tokens) * 4;
        let compressed_bytes = i64::from(compressed_tokens) * 4;
        CompressionEventRow {
            harness: "opencode",
            session_id: Some("session-1"),
            project_key,
            tool: "bash",
            task_id: Some("task-1"),
            command: Some("echo ok"),
            compressor: "zstd",
            original_bytes,
            compressed_bytes,
            original_tokens,
            compressed_tokens,
            created_at,
        }
    }

    #[test]
    fn duplicate_identity_is_ignored_without_cross_project_suppression() {
        let dir = tempdir().expect("tempdir");
        let db_path = dir.path().join("aft.db");
        let conn = crate::db::open(&db_path).expect("open db");

        // The second insert repeats the first row's identifying columns with
        // different measurements, so it must be dropped; the third reuses the
        // same task id under another project and must be kept.
        let first = sample_row("project-a", 100, 40, 1);
        let duplicate = sample_row("project-a", 900, 10, 2);
        let other_project = sample_row("project-b", 200, 80, 3);

        insert_compression_event(&conn, &first).expect("insert first");
        insert_compression_event(&conn, &duplicate).expect("ignore duplicate");
        insert_compression_event(&conn, &other_project)
            .expect("insert same task id for other project");

        let project_a = aggregate_for_project(&conn, "opencode", "project-a").unwrap();
        assert_eq!(
            project_a,
            CompressionAggregate {
                events: 1,
                original_tokens: 100,
                compressed_tokens: 40,
            }
        );

        let project_b = aggregate_for_project(&conn, "opencode", "project-b").unwrap();
        assert_eq!(
            project_b,
            CompressionAggregate {
                events: 1,
                original_tokens: 200,
                compressed_tokens: 80,
            }
        );
    }
}