kaizen/store/sqlite/
sync.rs1use super::*;
2
3impl Store {
4 pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
5 let rows = self.outbox()?.list_pending(limit)?;
6 if !rows.is_empty() {
7 return Ok(rows);
8 }
9 let mut stmt = self.conn.prepare(
10 "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
11 )?;
12 let rows = stmt.query_map(params![limit as i64], |row| {
13 Ok((
14 row.get::<_, i64>(0)?,
15 row.get::<_, String>(1)?,
16 row.get::<_, String>(2)?,
17 ))
18 })?;
19 let mut out = Vec::new();
20 for r in rows {
21 out.push(r?);
22 }
23 Ok(out)
24 }
25
26 pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
27 self.outbox()?.delete_ids(ids)?;
28 for id in ids {
29 self.conn
30 .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
31 }
32 Ok(())
33 }
34
35 pub fn replace_outbox_rows(
36 &self,
37 owner_id: &str,
38 kind: &str,
39 payloads: &[String],
40 ) -> Result<()> {
41 self.outbox()?.replace(owner_id, kind, payloads)?;
42 self.conn.execute(
43 "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
44 params![owner_id, kind],
45 )?;
46 for payload in payloads {
47 self.conn.execute(
48 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
49 params![owner_id, kind, payload],
50 )?;
51 }
52 Ok(())
53 }
54
55 pub fn outbox_pending_count(&self) -> Result<u64> {
56 let redb = self.outbox()?.pending_count()?;
57 if redb > 0 {
58 return Ok(redb);
59 }
60 let c: i64 =
61 self.conn
62 .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
63 r.get(0)
64 })?;
65 Ok(c as u64)
66 }
67
68 pub fn set_sync_state_ok(&self) -> Result<()> {
69 let now = now_ms().to_string();
70 self.conn.execute(
71 "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
72 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
73 params![now],
74 )?;
75 self.conn.execute(
76 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
77 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
78 [],
79 )?;
80 self.conn
81 .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
82 Ok(())
83 }
84
85 pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
86 let prev: i64 = self
87 .conn
88 .query_row(
89 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
90 [],
91 |r| {
92 let s: String = r.get(0)?;
93 Ok(s.parse::<i64>().unwrap_or(0))
94 },
95 )
96 .optional()?
97 .unwrap_or(0);
98 let next = prev.saturating_add(1);
99 self.conn.execute(
100 "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
101 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
102 params![msg],
103 )?;
104 self.conn.execute(
105 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
106 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
107 params![next.to_string()],
108 )?;
109 Ok(())
110 }
111
112 pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
113 let pending_outbox = self.outbox_pending_count()?;
114 let last_success_ms = self
115 .conn
116 .query_row(
117 "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
118 [],
119 |r| r.get::<_, String>(0),
120 )
121 .optional()?
122 .and_then(|s| s.parse().ok());
123 let last_error = self
124 .conn
125 .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
126 r.get::<_, String>(0)
127 })
128 .optional()?;
129 let consecutive_failures = self
130 .conn
131 .query_row(
132 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
133 [],
134 |r| r.get::<_, String>(0),
135 )
136 .optional()?
137 .and_then(|s| s.parse().ok())
138 .unwrap_or(0);
139 Ok(SyncStatusSnapshot {
140 pending_outbox,
141 last_success_ms,
142 last_error,
143 consecutive_failures,
144 })
145 }
146
147 pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
148 let row: Option<String> = self
149 .conn
150 .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
151 r.get::<_, String>(0)
152 })
153 .optional()?;
154 Ok(row.and_then(|s| s.parse().ok()))
155 }
156
157 pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
158 self.conn.execute(
159 "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
160 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
161 params![key, v.to_string()],
162 )?;
163 Ok(())
164 }
165}