Skip to main content

codex_mobile_bridge/storage/
events.rs

1use rusqlite::{OptionalExtension, params};
2
3use super::Storage;
4use super::decode::decode_json_row;
5use crate::bridge_protocol::{PersistedEvent, now_millis};
6
7impl Storage {
8    pub fn append_event(
9        &self,
10        event_type: &str,
11        runtime_id: Option<&str>,
12        thread_id: Option<&str>,
13        payload: &serde_json::Value,
14    ) -> anyhow::Result<PersistedEvent> {
15        let now = now_millis();
16        let conn = self.connect()?;
17        conn.execute(
18            "INSERT INTO events (event_type, runtime_id, thread_id, payload, created_at_ms)
19             VALUES (?1, ?2, ?3, ?4, ?5)",
20            params![
21                event_type,
22                runtime_id,
23                thread_id,
24                serde_json::to_string(payload)?,
25                now
26            ],
27        )?;
28
29        let seq = conn.last_insert_rowid();
30        Ok(PersistedEvent {
31            seq,
32            event_type: event_type.to_string(),
33            runtime_id: runtime_id.map(ToOwned::to_owned),
34            thread_id: thread_id.map(ToOwned::to_owned),
35            payload: payload.clone(),
36            created_at_ms: now,
37        })
38    }
39
40    pub fn replay_events_after(&self, last_seq: i64) -> anyhow::Result<Vec<PersistedEvent>> {
41        let conn = self.connect()?;
42        let mut stmt = conn.prepare(
43            "SELECT seq, event_type, runtime_id, thread_id, payload, created_at_ms
44             FROM events
45             WHERE seq > ?1
46             ORDER BY seq ASC",
47        )?;
48
49        let rows = stmt.query_map(params![last_seq], |row| {
50            Ok(PersistedEvent {
51                seq: row.get(0)?,
52                event_type: row.get(1)?,
53                runtime_id: row.get(2)?,
54                thread_id: row.get(3)?,
55                payload: decode_json_row(row.get::<_, String>(4)?)?,
56                created_at_ms: row.get(5)?,
57            })
58        })?;
59
60        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
61    }
62
63    pub fn load_thread_events(&self, thread_id: &str) -> anyhow::Result<Vec<PersistedEvent>> {
64        let conn = self.connect()?;
65        let mut stmt = conn.prepare(
66            "SELECT seq, event_type, runtime_id, thread_id, payload, created_at_ms
67             FROM events
68             WHERE thread_id = ?1
69             ORDER BY seq ASC",
70        )?;
71
72        let rows = stmt.query_map(params![thread_id], |row| {
73            Ok(PersistedEvent {
74                seq: row.get(0)?,
75                event_type: row.get(1)?,
76                runtime_id: row.get(2)?,
77                thread_id: row.get(3)?,
78                payload: decode_json_row(row.get::<_, String>(4)?)?,
79                created_at_ms: row.get(5)?,
80            })
81        })?;
82
83        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
84    }
85
86    pub fn save_mobile_session_ack(
87        &self,
88        device_id: &str,
89        last_ack_seq: i64,
90    ) -> anyhow::Result<()> {
91        let conn = self.connect()?;
92        let now = now_millis();
93        conn.execute(
94            "INSERT INTO mobile_sessions (device_id, last_ack_seq, updated_at_ms)
95             VALUES (?1, ?2, ?3)
96             ON CONFLICT(device_id) DO UPDATE SET
97                 last_ack_seq = excluded.last_ack_seq,
98                 updated_at_ms = excluded.updated_at_ms",
99            params![device_id, last_ack_seq, now],
100        )?;
101        Ok(())
102    }
103
104    pub fn get_mobile_session_ack(&self, device_id: &str) -> anyhow::Result<Option<i64>> {
105        let conn = self.connect()?;
106        let value = conn
107            .query_row(
108                "SELECT last_ack_seq FROM mobile_sessions WHERE device_id = ?1",
109                params![device_id],
110                |row| row.get(0),
111            )
112            .optional()?;
113        Ok(value)
114    }
115
116    pub fn latest_event_seq(&self) -> anyhow::Result<i64> {
117        let conn = self.connect()?;
118        let value = conn
119            .query_row("SELECT COALESCE(MAX(seq), 0) FROM events", [], |row| {
120                row.get(0)
121            })
122            .optional()?
123            .unwrap_or(0);
124        Ok(value)
125    }
126}