Skip to main content

kaizen/store/sqlite/
sync.rs

1use super::*;
2
3impl Store {
4    pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
5        let rows = self.outbox()?.list_pending(limit)?;
6        if !rows.is_empty() {
7            return Ok(rows);
8        }
9        let mut stmt = self.conn.prepare(
10            "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
11        )?;
12        let rows = stmt.query_map(params![limit as i64], |row| {
13            Ok((
14                row.get::<_, i64>(0)?,
15                row.get::<_, String>(1)?,
16                row.get::<_, String>(2)?,
17            ))
18        })?;
19        let mut out = Vec::new();
20        for r in rows {
21            out.push(r?);
22        }
23        Ok(out)
24    }
25
26    pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
27        self.outbox()?.delete_ids(ids)?;
28        for id in ids {
29            self.conn
30                .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
31        }
32        Ok(())
33    }
34
35    pub fn replace_outbox_rows(
36        &self,
37        owner_id: &str,
38        kind: &str,
39        payloads: &[String],
40    ) -> Result<()> {
41        self.outbox()?.replace(owner_id, kind, payloads)?;
42        self.conn.execute(
43            "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
44            params![owner_id, kind],
45        )?;
46        for payload in payloads {
47            self.conn.execute(
48                "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
49                params![owner_id, kind, payload],
50            )?;
51        }
52        Ok(())
53    }
54
55    pub fn outbox_pending_count(&self) -> Result<u64> {
56        let redb = self.outbox()?.pending_count()?;
57        if redb > 0 {
58            return Ok(redb);
59        }
60        let c: i64 =
61            self.conn
62                .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
63                    r.get(0)
64                })?;
65        Ok(c as u64)
66    }
67
68    pub fn set_sync_state_ok(&self) -> Result<()> {
69        let now = now_ms().to_string();
70        self.conn.execute(
71            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
72             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
73            params![now],
74        )?;
75        self.conn.execute(
76            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
77             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
78            [],
79        )?;
80        self.conn
81            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
82        Ok(())
83    }
84
85    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
86        let prev: i64 = self
87            .conn
88            .query_row(
89                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
90                [],
91                |r| {
92                    let s: String = r.get(0)?;
93                    Ok(s.parse::<i64>().unwrap_or(0))
94                },
95            )
96            .optional()?
97            .unwrap_or(0);
98        let next = prev.saturating_add(1);
99        self.conn.execute(
100            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
101             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
102            params![msg],
103        )?;
104        self.conn.execute(
105            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
106             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
107            params![next.to_string()],
108        )?;
109        Ok(())
110    }
111
112    pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
113        let pending_outbox = self.outbox_pending_count()?;
114        let last_success_ms = self
115            .conn
116            .query_row(
117                "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
118                [],
119                |r| r.get::<_, String>(0),
120            )
121            .optional()?
122            .and_then(|s| s.parse().ok());
123        let last_error = self
124            .conn
125            .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
126                r.get::<_, String>(0)
127            })
128            .optional()?;
129        let consecutive_failures = self
130            .conn
131            .query_row(
132                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
133                [],
134                |r| r.get::<_, String>(0),
135            )
136            .optional()?
137            .and_then(|s| s.parse().ok())
138            .unwrap_or(0);
139        Ok(SyncStatusSnapshot {
140            pending_outbox,
141            last_success_ms,
142            last_error,
143            consecutive_failures,
144        })
145    }
146
147    pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
148        let row: Option<String> = self
149            .conn
150            .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
151                r.get::<_, String>(0)
152            })
153            .optional()?;
154        Ok(row.and_then(|s| s.parse().ok()))
155    }
156
157    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
158        self.conn.execute(
159            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
160             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
161            params![key, v.to_string()],
162        )?;
163        Ok(())
164    }
165}