Skip to main content

difflore_core/cloud/observations/
sync.rs

1use super::events::ObservationEvent;
2use super::storage::{MAX_FLUSH_BATCH, ObservationEmitter, now_unix_ms, truncate};
3use crate::cloud::outbox_core::{RetryDecision, backoff_delay_ms, decide_retry};
4use sqlx::Row;
5
6impl ObservationEmitter {
7    pub async fn retry_pending_uploads_now(&self) -> Result<u64, String> {
8        let now = now_unix_ms();
9        let result = sqlx::query(
10            "UPDATE observation_events \
11             SET next_attempt_at_ms = ?1 \
12             WHERE status = 'pending' AND next_attempt_at_ms > ?1",
13        )
14        .bind(now)
15        .execute(self.pool())
16        .await
17        .map_err(|e| format!("reset pending observation retry time: {e}"))?;
18        Ok(result.rows_affected())
19    }
20
21    pub async fn flush_to_cloud(
22        &self,
23        client: &crate::cloud::client::CloudClient,
24    ) -> Result<(usize, usize), String> {
25        if !client.is_logged_in() {
26            return Ok((0, 0));
27        }
28
29        let now = now_unix_ms();
30        let rows = sqlx::query(
31            "SELECT id, payload_json, retry_count FROM observation_events \
32             WHERE status = 'pending' AND next_attempt_at_ms <= ?1 \
33             ORDER BY created_at_ms ASC, id ASC LIMIT ?2",
34        )
35        .bind(now)
36        .bind(MAX_FLUSH_BATCH)
37        .fetch_all(self.pool())
38        .await
39        .map_err(|e| format!("select observation batch: {e}"))?;
40
41        if rows.is_empty() {
42            return Ok((0, 0));
43        }
44
45        let mut ids = Vec::with_capacity(rows.len());
46        let mut events = Vec::with_capacity(rows.len());
47        let mut retry_counts = Vec::with_capacity(rows.len());
48        for row in rows {
49            let id: i64 = row.try_get("id").unwrap_or_default();
50            let payload: String = row.try_get("payload_json").unwrap_or_default();
51            let retry_count: i64 = row.try_get("retry_count").unwrap_or_default();
52            match serde_json::from_str::<ObservationEvent>(&payload) {
53                Ok(event) => {
54                    ids.push(id);
55                    events.push(event);
56                    retry_counts.push(retry_count);
57                }
58                Err(e) => {
59                    self.abandon(id, &format!("decode observation event: {e}"))
60                        .await?;
61                }
62            }
63        }
64
65        if events.is_empty() {
66            return Ok((0, 0));
67        }
68
69        let attempted = events.len();
70        if client.post_observation_events_result(&events).await.is_ok() {
71            let sent_at = now_unix_ms();
72            for id in &ids {
73                self.mark_sent(*id, sent_at).await?;
74            }
75            let _ = self.cap_queue().await;
76            return Ok((attempted, attempted));
77        }
78
79        let sent_at = now_unix_ms();
80        let mut sent = 0usize;
81        for ((id, event), retry_count) in ids.into_iter().zip(events.iter()).zip(retry_counts) {
82            match client
83                .post_observation_events_result(std::slice::from_ref(event))
84                .await
85            {
86                Ok(()) => {
87                    self.mark_sent(id, sent_at).await?;
88                    sent += 1;
89                }
90                Err(err) => {
91                    self.mark_failed(id, retry_count, &err).await?;
92                }
93            }
94        }
95        let _ = self.cap_queue().await;
96        Ok((attempted, sent))
97    }
98
99    pub(super) async fn mark_failed(
100        &self,
101        id: i64,
102        retry_count: i64,
103        err: &str,
104    ) -> Result<(), String> {
105        // Shared retry/abandon decision (unified `MAX_RETRY_COUNT`).
106        // Equivalent to the prior `next = retry_count + 1; next >=
107        // MAX_RETRY_COUNT ? abandon : backoff` — this queue keeps its
108        // exponential-backoff re-schedule for the retry case.
109        let next_count = match decide_retry(retry_count) {
110            RetryDecision::Abandon { .. } => return self.abandon(id, err).await,
111            RetryDecision::Retry { next_count } => next_count,
112        };
113        // `backoff_delay_ms` reproduces the previous inline
114        // `60_000 * (1 << clamp(next_count, 0, 5))` exactly, including
115        // the checked-shift / saturating-mul overflow guards.
116        let delay_ms = backoff_delay_ms(next_count);
117        let next_attempt = now_unix_ms().saturating_add(delay_ms);
118        sqlx::query(
119            "UPDATE observation_events \
120             SET retry_count = ?1, next_attempt_at_ms = ?2, last_error = ?3 \
121             WHERE id = ?4",
122        )
123        .bind(next_count)
124        .bind(next_attempt)
125        .bind(truncate(err, 2048))
126        .bind(id)
127        .execute(self.pool())
128        .await
129        .map_err(|e| format!("mark observation failed: {e}"))?;
130        Ok(())
131    }
132
133    pub(super) async fn mark_sent(&self, id: i64, sent_at_ms: i64) -> Result<(), String> {
134        sqlx::query("UPDATE observation_events SET status = 'sent', sent_at_ms = ?1 WHERE id = ?2")
135            .bind(sent_at_ms)
136            .bind(id)
137            .execute(self.pool())
138            .await
139            .map_err(|e| format!("mark observation sent: {e}"))?;
140        Ok(())
141    }
142
143    /// Resurrect `abandoned` observation_events rows older than
144    /// `cutoff_unix_ms` back to `pending`. Returns the number of rows
145    /// that were (or would be, in `dry_run` mode) reset, bucketed by
146    /// `event_type` (sorted ascending so doctor output is stable).
147    ///
148    /// Uses a single transaction so a partial drain cannot leave the
149    /// queue half-reset; `dry_run = true` rolls back instead of committing.
150    ///
151    /// Cutoff: a row is eligible iff its `created_at_ms` is older than
152    /// the provided cutoff. We deliberately don't use
153    /// `next_attempt_at_ms` because it isn't carried forward when a
154    /// row is abandoned (the prior `mark_failed` rewrote it for the
155    /// would-be retry that never happened), so `created_at_ms` is the
156    /// stable age signal.
157    pub async fn drain_abandoned_older_than(
158        &self,
159        cutoff_unix_ms: i64,
160        dry_run: bool,
161    ) -> Result<crate::cloud::outbox::DrainSummary, String> {
162        let mut tx = self
163            .pool()
164            .begin()
165            .await
166            .map_err(|e| format!("begin drain tx: {e}"))?;
167
168        let rows = sqlx::query(
169            "SELECT event_type, COUNT(*) AS c \
170             FROM observation_events \
171             WHERE status = 'abandoned' AND created_at_ms < ?1 \
172             GROUP BY event_type",
173        )
174        .bind(cutoff_unix_ms)
175        .fetch_all(&mut *tx)
176        .await
177        .map_err(|e| format!("count abandoned observations: {e}"))?;
178
179        let mut summary = crate::cloud::outbox::DrainSummary::default();
180        for row in rows {
181            let kind: String = Row::try_get(&row, "event_type").unwrap_or_default();
182            let count: i64 = Row::try_get(&row, "c").unwrap_or_default();
183            summary.per_kind.push((kind, count));
184            summary.total += count;
185        }
186        summary.per_kind.sort_by(|a, b| a.0.cmp(&b.0));
187
188        if dry_run || summary.total == 0 {
189            tx.rollback()
190                .await
191                .map_err(|e| format!("rollback drain tx: {e}"))?;
192            return Ok(summary);
193        }
194
195        // Resurrected rows must be due immediately (`next_attempt_at_ms`
196        // = now) and free of any prior error context. We deliberately do
197        // NOT touch `created_at_ms` so the cap-queue trimmer's age
198        // ordering is preserved.
199        let now = now_unix_ms();
200        let result = sqlx::query(
201            "UPDATE observation_events \
202             SET status = 'pending', \
203                 retry_count = 0, \
204                 next_attempt_at_ms = ?1, \
205                 last_error = NULL \
206             WHERE status = 'abandoned' AND created_at_ms < ?2",
207        )
208        .bind(now)
209        .bind(cutoff_unix_ms)
210        .execute(&mut *tx)
211        .await
212        .map_err(|e| format!("reset abandoned observations: {e}"))?;
213        tx.commit()
214            .await
215            .map_err(|e| format!("commit drain tx: {e}"))?;
216
217        let affected = i64::try_from(result.rows_affected()).unwrap_or(summary.total);
218        summary.total = affected;
219        Ok(summary)
220    }
221
222    pub(super) async fn abandon(&self, id: i64, err: &str) -> Result<(), String> {
223        sqlx::query(
224            "UPDATE observation_events \
225             SET status = 'abandoned', last_error = ?1 WHERE id = ?2",
226        )
227        .bind(truncate(err, 2048))
228        .bind(id)
229        .execute(self.pool())
230        .await
231        .map_err(|e| format!("abandon observation: {e}"))?;
232        Ok(())
233    }
234}