difflore_core/cloud/observations/
sync.rs1use super::events::ObservationEvent;
2use super::storage::{MAX_FLUSH_BATCH, ObservationEmitter, now_unix_ms, truncate};
3use crate::cloud::outbox_core::{RetryDecision, backoff_delay_ms, decide_retry};
4use sqlx::Row;
5
6impl ObservationEmitter {
7 pub async fn retry_pending_uploads_now(&self) -> Result<u64, String> {
8 let now = now_unix_ms();
9 let result = sqlx::query(
10 "UPDATE observation_events \
11 SET next_attempt_at_ms = ?1 \
12 WHERE status = 'pending' AND next_attempt_at_ms > ?1",
13 )
14 .bind(now)
15 .execute(self.pool())
16 .await
17 .map_err(|e| format!("reset pending observation retry time: {e}"))?;
18 Ok(result.rows_affected())
19 }
20
21 pub async fn flush_to_cloud(
22 &self,
23 client: &crate::cloud::client::CloudClient,
24 ) -> Result<(usize, usize), String> {
25 if !client.is_logged_in() {
26 return Ok((0, 0));
27 }
28
29 let now = now_unix_ms();
30 let rows = sqlx::query(
31 "SELECT id, payload_json, retry_count FROM observation_events \
32 WHERE status = 'pending' AND next_attempt_at_ms <= ?1 \
33 ORDER BY created_at_ms ASC, id ASC LIMIT ?2",
34 )
35 .bind(now)
36 .bind(MAX_FLUSH_BATCH)
37 .fetch_all(self.pool())
38 .await
39 .map_err(|e| format!("select observation batch: {e}"))?;
40
41 if rows.is_empty() {
42 return Ok((0, 0));
43 }
44
45 let mut ids = Vec::with_capacity(rows.len());
46 let mut events = Vec::with_capacity(rows.len());
47 let mut retry_counts = Vec::with_capacity(rows.len());
48 for row in rows {
49 let id: i64 = row.try_get("id").unwrap_or_default();
50 let payload: String = row.try_get("payload_json").unwrap_or_default();
51 let retry_count: i64 = row.try_get("retry_count").unwrap_or_default();
52 match serde_json::from_str::<ObservationEvent>(&payload) {
53 Ok(event) => {
54 ids.push(id);
55 events.push(event);
56 retry_counts.push(retry_count);
57 }
58 Err(e) => {
59 self.abandon(id, &format!("decode observation event: {e}"))
60 .await?;
61 }
62 }
63 }
64
65 if events.is_empty() {
66 return Ok((0, 0));
67 }
68
69 let attempted = events.len();
70 if client.post_observation_events_result(&events).await.is_ok() {
71 let sent_at = now_unix_ms();
72 for id in &ids {
73 self.mark_sent(*id, sent_at).await?;
74 }
75 let _ = self.cap_queue().await;
76 return Ok((attempted, attempted));
77 }
78
79 let sent_at = now_unix_ms();
80 let mut sent = 0usize;
81 for ((id, event), retry_count) in ids.into_iter().zip(events.iter()).zip(retry_counts) {
82 match client
83 .post_observation_events_result(std::slice::from_ref(event))
84 .await
85 {
86 Ok(()) => {
87 self.mark_sent(id, sent_at).await?;
88 sent += 1;
89 }
90 Err(err) => {
91 self.mark_failed(id, retry_count, &err).await?;
92 }
93 }
94 }
95 let _ = self.cap_queue().await;
96 Ok((attempted, sent))
97 }
98
99 pub(super) async fn mark_failed(
100 &self,
101 id: i64,
102 retry_count: i64,
103 err: &str,
104 ) -> Result<(), String> {
105 let next_count = match decide_retry(retry_count) {
110 RetryDecision::Abandon { .. } => return self.abandon(id, err).await,
111 RetryDecision::Retry { next_count } => next_count,
112 };
113 let delay_ms = backoff_delay_ms(next_count);
117 let next_attempt = now_unix_ms().saturating_add(delay_ms);
118 sqlx::query(
119 "UPDATE observation_events \
120 SET retry_count = ?1, next_attempt_at_ms = ?2, last_error = ?3 \
121 WHERE id = ?4",
122 )
123 .bind(next_count)
124 .bind(next_attempt)
125 .bind(truncate(err, 2048))
126 .bind(id)
127 .execute(self.pool())
128 .await
129 .map_err(|e| format!("mark observation failed: {e}"))?;
130 Ok(())
131 }
132
133 pub(super) async fn mark_sent(&self, id: i64, sent_at_ms: i64) -> Result<(), String> {
134 sqlx::query("UPDATE observation_events SET status = 'sent', sent_at_ms = ?1 WHERE id = ?2")
135 .bind(sent_at_ms)
136 .bind(id)
137 .execute(self.pool())
138 .await
139 .map_err(|e| format!("mark observation sent: {e}"))?;
140 Ok(())
141 }
142
143 pub async fn drain_abandoned_older_than(
158 &self,
159 cutoff_unix_ms: i64,
160 dry_run: bool,
161 ) -> Result<crate::cloud::outbox::DrainSummary, String> {
162 let mut tx = self
163 .pool()
164 .begin()
165 .await
166 .map_err(|e| format!("begin drain tx: {e}"))?;
167
168 let rows = sqlx::query(
169 "SELECT event_type, COUNT(*) AS c \
170 FROM observation_events \
171 WHERE status = 'abandoned' AND created_at_ms < ?1 \
172 GROUP BY event_type",
173 )
174 .bind(cutoff_unix_ms)
175 .fetch_all(&mut *tx)
176 .await
177 .map_err(|e| format!("count abandoned observations: {e}"))?;
178
179 let mut summary = crate::cloud::outbox::DrainSummary::default();
180 for row in rows {
181 let kind: String = Row::try_get(&row, "event_type").unwrap_or_default();
182 let count: i64 = Row::try_get(&row, "c").unwrap_or_default();
183 summary.per_kind.push((kind, count));
184 summary.total += count;
185 }
186 summary.per_kind.sort_by(|a, b| a.0.cmp(&b.0));
187
188 if dry_run || summary.total == 0 {
189 tx.rollback()
190 .await
191 .map_err(|e| format!("rollback drain tx: {e}"))?;
192 return Ok(summary);
193 }
194
195 let now = now_unix_ms();
200 let result = sqlx::query(
201 "UPDATE observation_events \
202 SET status = 'pending', \
203 retry_count = 0, \
204 next_attempt_at_ms = ?1, \
205 last_error = NULL \
206 WHERE status = 'abandoned' AND created_at_ms < ?2",
207 )
208 .bind(now)
209 .bind(cutoff_unix_ms)
210 .execute(&mut *tx)
211 .await
212 .map_err(|e| format!("reset abandoned observations: {e}"))?;
213 tx.commit()
214 .await
215 .map_err(|e| format!("commit drain tx: {e}"))?;
216
217 let affected = i64::try_from(result.rows_affected()).unwrap_or(summary.total);
218 summary.total = affected;
219 Ok(summary)
220 }
221
222 pub(super) async fn abandon(&self, id: i64, err: &str) -> Result<(), String> {
223 sqlx::query(
224 "UPDATE observation_events \
225 SET status = 'abandoned', last_error = ?1 WHERE id = ?2",
226 )
227 .bind(truncate(err, 2048))
228 .bind(id)
229 .execute(self.pool())
230 .await
231 .map_err(|e| format!("abandon observation: {e}"))?;
232 Ok(())
233 }
234}