use rusqlite::{OptionalExtension, params};
use super::Storage;
use super::decode::decode_json_row;
use crate::bridge_protocol::{PersistedEvent, now_millis};
impl Storage {
/// Inserts one row into the `events` table and returns it as a [`PersistedEvent`].
///
/// The payload is stored as its JSON string form, and the row is stamped with the value
/// `now_millis()` returned at the start of the call. The returned event's `seq` is taken
/// from `last_insert_rowid()` on the same connection that performed the insert.
///
/// # Errors
///
/// Returns an error if `self.connect()` fails, if the payload cannot be serialized, or
/// if the `INSERT` statement fails.
pub fn append_event(
    &self,
    event_type: &str,
    runtime_id: Option<&str>,
    thread_id: Option<&str>,
    payload: &serde_json::Value,
) -> anyhow::Result<PersistedEvent> {
    let created_at_ms = now_millis();
    let db = self.connect()?;
    // Serialize only after the connection is available, so a connection failure is
    // reported before any serialization failure.
    let payload_json = serde_json::to_string(payload)?;
    db.execute(
        "INSERT INTO events (event_type, runtime_id, thread_id, payload, created_at_ms)
VALUES (?1, ?2, ?3, ?4, ?5)",
        params![event_type, runtime_id, thread_id, payload_json, created_at_ms],
    )?;

    let event = PersistedEvent {
        seq: db.last_insert_rowid(),
        event_type: event_type.to_owned(),
        runtime_id: runtime_id.map(str::to_owned),
        thread_id: thread_id.map(str::to_owned),
        payload: payload.clone(),
        created_at_ms,
    };
    Ok(event)
}
pub fn replay_events_after(&self, last_seq: i64) -> anyhow::Result<Vec<PersistedEvent>> {
let conn = self.connect()?;
let mut stmt = conn.prepare(
"SELECT seq, event_type, runtime_id, thread_id, payload, created_at_ms
FROM events
WHERE seq > ?1
ORDER BY seq ASC",
)?;
let rows = stmt.query_map(params![last_seq], |row| {
Ok(PersistedEvent {
seq: row.get(0)?,
event_type: row.get(1)?,
runtime_id: row.get(2)?,
thread_id: row.get(3)?,
payload: decode_json_row(row.get::<_, String>(4)?)?,
created_at_ms: row.get(5)?,
})
})?;
Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
}
pub fn load_thread_events(&self, thread_id: &str) -> anyhow::Result<Vec<PersistedEvent>> {
let conn = self.connect()?;
let mut stmt = conn.prepare(
"SELECT seq, event_type, runtime_id, thread_id, payload, created_at_ms
FROM events
WHERE thread_id = ?1
ORDER BY seq ASC",
)?;
let rows = stmt.query_map(params![thread_id], |row| {
Ok(PersistedEvent {
seq: row.get(0)?,
event_type: row.get(1)?,
runtime_id: row.get(2)?,
thread_id: row.get(3)?,
payload: decode_json_row(row.get::<_, String>(4)?)?,
created_at_ms: row.get(5)?,
})
})?;
Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
}
/// Records `last_ack_seq` for `device_id` in the `mobile_sessions` table.
///
/// A new row is inserted when none exists for the device; on a `device_id` conflict
/// the existing row's `last_ack_seq` and `updated_at_ms` are overwritten with the new
/// values. `updated_at_ms` is the value returned by `now_millis()`.
///
/// # Errors
///
/// Returns an error if `self.connect()` fails or if the upsert statement fails.
pub fn save_mobile_session_ack(
    &self,
    device_id: &str,
    last_ack_seq: i64,
) -> anyhow::Result<()> {
    let db = self.connect()?;
    let updated_at_ms = now_millis();
    let upsert_sql = "INSERT INTO mobile_sessions (device_id, last_ack_seq, updated_at_ms)
VALUES (?1, ?2, ?3)
ON CONFLICT(device_id) DO UPDATE SET
last_ack_seq = excluded.last_ack_seq,
updated_at_ms = excluded.updated_at_ms";
    // The affected-row count is not needed by callers; only failure is surfaced.
    db.execute(upsert_sql, params![device_id, last_ack_seq, updated_at_ms])?;
    Ok(())
}
/// Looks up the `last_ack_seq` stored in `mobile_sessions` for `device_id`.
///
/// Returns `Ok(None)` when the query yields no row for the device.
///
/// # Errors
///
/// Returns an error if `self.connect()` fails or if the query fails for any reason
/// other than returning no rows.
pub fn get_mobile_session_ack(&self, device_id: &str) -> anyhow::Result<Option<i64>> {
    let db = self.connect()?;
    let lookup = db.query_row(
        "SELECT last_ack_seq FROM mobile_sessions WHERE device_id = ?1",
        params![device_id],
        |row| row.get::<_, i64>(0),
    );
    // `optional()` turns the "no rows" error into `None`; other errors propagate.
    let last_ack_seq: Option<i64> = lookup.optional()?;
    Ok(last_ack_seq)
}
/// Returns the largest `seq` in the `events` table, or `0` when the table is empty.
///
/// # Errors
///
/// Returns an error if `self.connect()` fails or if the query fails.
pub fn latest_event_seq(&self) -> anyhow::Result<i64> {
    let conn = self.connect()?;
    // An aggregate query without GROUP BY always produces exactly one row, and
    // COALESCE already maps the empty-table NULL to 0, so no "no rows" fallback
    // is needed here.
    let latest: i64 = conn.query_row(
        "SELECT COALESCE(MAX(seq), 0) FROM events",
        [],
        |row| row.get(0),
    )?;
    Ok(latest)
}
}