1use super::*;
2
3impl Store {
4 pub(super) fn append_outbox_row(
5 &self,
6 owner_id: &str,
7 kind: &str,
8 payload: &str,
9 ) -> Result<()> {
10 self.conn.execute(
11 "INSERT INTO sync_outbox (session_id, kind, payload, sent)
12 VALUES (?1, ?2, ?3, 0)",
13 params![owner_id, kind, payload],
14 )?;
15 Ok(())
16 }
17
18 pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
19 let mut stmt = self.conn.prepare(
20 "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
21 )?;
22 let rows = stmt.query_map(params![limit as i64], read_outbox_row)?;
23 rows.collect::<rusqlite::Result<_>>().map_err(Into::into)
24 }
25
26 pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
27 let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Immediate)?;
28 ids.iter().try_for_each(|id| {
29 tx.execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", [id])
30 .map(|_| ())
31 })?;
32 tx.commit()?;
33 Ok(())
34 }
35
36 pub fn replace_outbox_rows(
37 &self,
38 owner_id: &str,
39 kind: &str,
40 payloads: &[String],
41 ) -> Result<()> {
42 let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Immediate)?;
43 tx.execute(
44 "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
45 params![owner_id, kind],
46 )?;
47 insert_outbox_payloads(&tx, owner_id, kind, payloads)?;
48 tx.commit()?;
49 Ok(())
50 }
51
52 pub fn outbox_pending_count(&self) -> Result<u64> {
53 let c: i64 =
54 self.conn
55 .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
56 r.get(0)
57 })?;
58 Ok(c as u64)
59 }
60
61 pub fn set_sync_state_ok(&self) -> Result<()> {
62 let now = now_ms().to_string();
63 self.conn.execute(
64 "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
65 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
66 params![now],
67 )?;
68 self.conn.execute(
69 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
70 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
71 [],
72 )?;
73 self.conn
74 .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
75 Ok(())
76 }
77
78 pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
79 let prev: i64 = self
80 .conn
81 .query_row(
82 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
83 [],
84 |r| {
85 let s: String = r.get(0)?;
86 Ok(s.parse::<i64>().unwrap_or(0))
87 },
88 )
89 .optional()?
90 .unwrap_or(0);
91 let next = prev.saturating_add(1);
92 self.conn.execute(
93 "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
94 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
95 params![msg],
96 )?;
97 self.conn.execute(
98 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
99 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
100 params![next.to_string()],
101 )?;
102 Ok(())
103 }
104
105 pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
106 let pending_outbox = self.outbox_pending_count()?;
107 let last_success_ms = self
108 .conn
109 .query_row(
110 "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
111 [],
112 |r| r.get::<_, String>(0),
113 )
114 .optional()?
115 .and_then(|s| s.parse().ok());
116 let last_error = self
117 .conn
118 .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
119 r.get::<_, String>(0)
120 })
121 .optional()?;
122 let consecutive_failures = self
123 .conn
124 .query_row(
125 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
126 [],
127 |r| r.get::<_, String>(0),
128 )
129 .optional()?
130 .and_then(|s| s.parse().ok())
131 .unwrap_or(0);
132 Ok(SyncStatusSnapshot {
133 pending_outbox,
134 last_success_ms,
135 last_error,
136 consecutive_failures,
137 })
138 }
139
140 pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
141 let row: Option<String> = self
142 .conn
143 .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
144 r.get::<_, String>(0)
145 })
146 .optional()?;
147 Ok(row.and_then(|s| s.parse().ok()))
148 }
149
150 pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
151 self.conn.execute(
152 "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
153 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
154 params![key, v.to_string()],
155 )?;
156 Ok(())
157 }
158}
159
160fn read_outbox_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<(i64, String, String)> {
161 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
162}
163
164fn insert_outbox_payloads(
165 tx: &rusqlite::Transaction<'_>,
166 owner_id: &str,
167 kind: &str,
168 payloads: &[String],
169) -> rusqlite::Result<()> {
170 payloads.iter().try_for_each(|payload| {
171 tx.execute(
172 "INSERT INTO sync_outbox (session_id, kind, payload, sent)
173 VALUES (?1, ?2, ?3, 0)",
174 params![owner_id, kind, payload],
175 )
176 .map(|_| ())
177 })
178}