Skip to main content

kaizen/store/sqlite/
sync.rs

1use super::*;
2
3impl Store {
4    pub(super) fn append_outbox_row(
5        &self,
6        owner_id: &str,
7        kind: &str,
8        payload: &str,
9    ) -> Result<()> {
10        self.conn.execute(
11            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
12             VALUES (?1, ?2, ?3, 0)",
13            params![owner_id, kind, payload],
14        )?;
15        Ok(())
16    }
17
18    pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
19        let mut stmt = self.conn.prepare(
20            "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
21        )?;
22        let rows = stmt.query_map(params![limit as i64], read_outbox_row)?;
23        rows.collect::<rusqlite::Result<_>>().map_err(Into::into)
24    }
25
26    pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
27        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Immediate)?;
28        ids.iter().try_for_each(|id| {
29            tx.execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", [id])
30                .map(|_| ())
31        })?;
32        tx.commit()?;
33        Ok(())
34    }
35
36    pub fn replace_outbox_rows(
37        &self,
38        owner_id: &str,
39        kind: &str,
40        payloads: &[String],
41    ) -> Result<()> {
42        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Immediate)?;
43        tx.execute(
44            "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
45            params![owner_id, kind],
46        )?;
47        insert_outbox_payloads(&tx, owner_id, kind, payloads)?;
48        tx.commit()?;
49        Ok(())
50    }
51
52    pub fn outbox_pending_count(&self) -> Result<u64> {
53        let c: i64 =
54            self.conn
55                .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
56                    r.get(0)
57                })?;
58        Ok(c as u64)
59    }
60
61    pub fn set_sync_state_ok(&self) -> Result<()> {
62        let now = now_ms().to_string();
63        self.conn.execute(
64            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
65             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
66            params![now],
67        )?;
68        self.conn.execute(
69            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
70             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
71            [],
72        )?;
73        self.conn
74            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
75        Ok(())
76    }
77
78    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
79        let prev: i64 = self
80            .conn
81            .query_row(
82                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
83                [],
84                |r| {
85                    let s: String = r.get(0)?;
86                    Ok(s.parse::<i64>().unwrap_or(0))
87                },
88            )
89            .optional()?
90            .unwrap_or(0);
91        let next = prev.saturating_add(1);
92        self.conn.execute(
93            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
94             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
95            params![msg],
96        )?;
97        self.conn.execute(
98            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
99             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
100            params![next.to_string()],
101        )?;
102        Ok(())
103    }
104
105    pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
106        let pending_outbox = self.outbox_pending_count()?;
107        let last_success_ms = self
108            .conn
109            .query_row(
110                "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
111                [],
112                |r| r.get::<_, String>(0),
113            )
114            .optional()?
115            .and_then(|s| s.parse().ok());
116        let last_error = self
117            .conn
118            .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
119                r.get::<_, String>(0)
120            })
121            .optional()?;
122        let consecutive_failures = self
123            .conn
124            .query_row(
125                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
126                [],
127                |r| r.get::<_, String>(0),
128            )
129            .optional()?
130            .and_then(|s| s.parse().ok())
131            .unwrap_or(0);
132        Ok(SyncStatusSnapshot {
133            pending_outbox,
134            last_success_ms,
135            last_error,
136            consecutive_failures,
137        })
138    }
139
140    pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
141        let row: Option<String> = self
142            .conn
143            .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
144                r.get::<_, String>(0)
145            })
146            .optional()?;
147        Ok(row.and_then(|s| s.parse().ok()))
148    }
149
150    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
151        self.conn.execute(
152            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
153             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
154            params![key, v.to_string()],
155        )?;
156        Ok(())
157    }
158}
159
160fn read_outbox_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<(i64, String, String)> {
161    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
162}
163
164fn insert_outbox_payloads(
165    tx: &rusqlite::Transaction<'_>,
166    owner_id: &str,
167    kind: &str,
168    payloads: &[String],
169) -> rusqlite::Result<()> {
170    payloads.iter().try_for_each(|payload| {
171        tx.execute(
172            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
173             VALUES (?1, ?2, ?3, 0)",
174            params![owner_id, kind, payload],
175        )
176        .map(|_| ())
177    })
178}