Skip to main content

codex_mobile_bridge/storage/
management_tasks.rs

1use rusqlite::{OptionalExtension, params};
2
3use super::Storage;
4use super::decode::decode_json_row;
5use crate::bridge_protocol::{BridgeManagementStatus, BridgeManagementTask};
6
7impl Storage {
8    pub fn upsert_bridge_management_task(&self, task: &BridgeManagementTask) -> anyhow::Result<()> {
9        let conn = self.connect()?;
10        conn.execute(
11            "INSERT INTO bridge_management_tasks (
12                 task_id, status, updated_at_ms, raw_json
13             )
14             VALUES (?1, ?2, ?3, ?4)
15             ON CONFLICT(task_id) DO UPDATE SET
16                 status = excluded.status,
17                 updated_at_ms = excluded.updated_at_ms,
18                 raw_json = excluded.raw_json",
19            params![
20                task.task_id,
21                serde_json::to_string(&task.status)?,
22                task.updated_at_ms,
23                serde_json::to_string(task)?,
24            ],
25        )?;
26        Ok(())
27    }
28
29    pub fn get_bridge_management_task(
30        &self,
31        task_id: &str,
32    ) -> anyhow::Result<Option<BridgeManagementTask>> {
33        let conn = self.connect()?;
34        let task = conn
35            .query_row(
36                "SELECT raw_json FROM bridge_management_tasks WHERE task_id = ?1",
37                params![task_id],
38                |row| {
39                    let raw: String = row.get(0)?;
40                    decode_json_row(raw)
41                },
42            )
43            .optional()?;
44        Ok(task)
45    }
46
47    pub fn latest_bridge_management_task(&self) -> anyhow::Result<Option<BridgeManagementTask>> {
48        let conn = self.connect()?;
49        let task = conn
50            .query_row(
51                "SELECT raw_json
52                 FROM bridge_management_tasks
53                 ORDER BY updated_at_ms DESC
54                 LIMIT 1",
55                [],
56                |row| {
57                    let raw: String = row.get(0)?;
58                    decode_json_row(raw)
59                },
60            )
61            .optional()?;
62        Ok(task)
63    }
64
65    pub fn active_bridge_management_task(&self) -> anyhow::Result<Option<BridgeManagementTask>> {
66        let conn = self.connect()?;
67        let task = conn
68            .query_row(
69                "SELECT raw_json
70                 FROM bridge_management_tasks
71                 WHERE status IN (?1, ?2, ?3)
72                 ORDER BY updated_at_ms DESC
73                 LIMIT 1",
74                params![
75                    serde_json::to_string(&BridgeManagementStatus::Queued)?,
76                    serde_json::to_string(&BridgeManagementStatus::Running)?,
77                    serde_json::to_string(&BridgeManagementStatus::Reconnecting)?,
78                ],
79                |row| {
80                    let raw: String = row.get(0)?;
81                    decode_json_row(raw)
82                },
83            )
84            .optional()?;
85        Ok(task)
86    }
87}