codex_mobile_bridge/storage/
events.rs1use rusqlite::{OptionalExtension, params};
2
3use super::Storage;
4use super::decode::decode_json_row;
5use crate::bridge_protocol::{PersistedEvent, now_millis};
6
7impl Storage {
8 pub fn append_event(
9 &self,
10 event_type: &str,
11 runtime_id: Option<&str>,
12 thread_id: Option<&str>,
13 payload: &serde_json::Value,
14 ) -> anyhow::Result<PersistedEvent> {
15 let now = now_millis();
16 let conn = self.connect()?;
17 conn.execute(
18 "INSERT INTO events (event_type, runtime_id, thread_id, payload, created_at_ms)
19 VALUES (?1, ?2, ?3, ?4, ?5)",
20 params![
21 event_type,
22 runtime_id,
23 thread_id,
24 serde_json::to_string(payload)?,
25 now
26 ],
27 )?;
28
29 let seq = conn.last_insert_rowid();
30 Ok(PersistedEvent {
31 seq,
32 event_type: event_type.to_string(),
33 runtime_id: runtime_id.map(ToOwned::to_owned),
34 thread_id: thread_id.map(ToOwned::to_owned),
35 payload: payload.clone(),
36 created_at_ms: now,
37 })
38 }
39
40 pub fn replay_events_after(&self, last_seq: i64) -> anyhow::Result<Vec<PersistedEvent>> {
41 let conn = self.connect()?;
42 let mut stmt = conn.prepare(
43 "SELECT seq, event_type, runtime_id, thread_id, payload, created_at_ms
44 FROM events
45 WHERE seq > ?1
46 ORDER BY seq ASC",
47 )?;
48
49 let rows = stmt.query_map(params![last_seq], |row| {
50 Ok(PersistedEvent {
51 seq: row.get(0)?,
52 event_type: row.get(1)?,
53 runtime_id: row.get(2)?,
54 thread_id: row.get(3)?,
55 payload: decode_json_row(row.get::<_, String>(4)?)?,
56 created_at_ms: row.get(5)?,
57 })
58 })?;
59
60 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
61 }
62
63 pub fn load_thread_events(&self, thread_id: &str) -> anyhow::Result<Vec<PersistedEvent>> {
64 let conn = self.connect()?;
65 let mut stmt = conn.prepare(
66 "SELECT seq, event_type, runtime_id, thread_id, payload, created_at_ms
67 FROM events
68 WHERE thread_id = ?1
69 ORDER BY seq ASC",
70 )?;
71
72 let rows = stmt.query_map(params![thread_id], |row| {
73 Ok(PersistedEvent {
74 seq: row.get(0)?,
75 event_type: row.get(1)?,
76 runtime_id: row.get(2)?,
77 thread_id: row.get(3)?,
78 payload: decode_json_row(row.get::<_, String>(4)?)?,
79 created_at_ms: row.get(5)?,
80 })
81 })?;
82
83 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
84 }
85
86 pub fn save_mobile_session_ack(
87 &self,
88 device_id: &str,
89 last_ack_seq: i64,
90 ) -> anyhow::Result<()> {
91 let conn = self.connect()?;
92 let now = now_millis();
93 conn.execute(
94 "INSERT INTO mobile_sessions (device_id, last_ack_seq, updated_at_ms)
95 VALUES (?1, ?2, ?3)
96 ON CONFLICT(device_id) DO UPDATE SET
97 last_ack_seq = excluded.last_ack_seq,
98 updated_at_ms = excluded.updated_at_ms",
99 params![device_id, last_ack_seq, now],
100 )?;
101 Ok(())
102 }
103
104 pub fn get_mobile_session_ack(&self, device_id: &str) -> anyhow::Result<Option<i64>> {
105 let conn = self.connect()?;
106 let value = conn
107 .query_row(
108 "SELECT last_ack_seq FROM mobile_sessions WHERE device_id = ?1",
109 params![device_id],
110 |row| row.get(0),
111 )
112 .optional()?;
113 Ok(value)
114 }
115
116 pub fn latest_event_seq(&self) -> anyhow::Result<i64> {
117 let conn = self.connect()?;
118 let value = conn
119 .query_row("SELECT COALESCE(MAX(seq), 0) FROM events", [], |row| {
120 row.get(0)
121 })
122 .optional()?
123 .unwrap_or(0);
124 Ok(value)
125 }
126}