Skip to main content

difflore_core/cloud/
outbox.rs

1//! SQLite-backed outbox queue for fire-and-forget cloud uploads.
2//!
3//! Every fire-and-forget cloud POST (trajectory, `review_metrics`,
4//! `accepted_edit`, `mcp_query`, `imported_reviews`) is first appended as a
5//! `pending` row in the global `~/.difflore/data.db`. Drain is triggered
6//! synchronously from hook / CLI cold paths — there is deliberately no
7//! background daemon.
8//!
9//! Claim/confirm semantics:
10//!
11//! ```text
12//! enqueue()     -> INSERT status='pending'
13//! claim_next()  -> UPDATE status='processing' (atomic, oldest first)
14//! confirm(id)   -> DELETE
15//! mark_failed() -> UPDATE retry_count++; >=MAX_RETRY_COUNT -> status='abandoned'
16//! reset_stale() -> processing > threshold seconds -> pending
17//! ```
18//!
19//! Circuit breaker: three consecutive `mark_failed` calls trip the breaker
20//! for 60 s; while open, `claim_next` returns `None` so callers short-
21//! circuit without hammering an unreachable cloud. Any successful
22//! `confirm` resets the consecutive-failure counter.
23//!
24//! Idempotency contract: `claim_next` deliberately self-heals stale
25//! `processing` rows after `DEFAULT_STALE_SECONDS`. A very slow upload can
26//! therefore be retried by a later drain pass. Every cloud endpoint reached
27//! from this queue must treat duplicate payloads as idempotent, keyed by
28//! the event id / request signature carried in the payload. The queue
29//! chooses at-least-once delivery over permanent local data loss.
30
31use super::outbox_core::{self, RetryDecision, now_unix_ms};
32use serde::{Deserialize, Serialize};
33use sqlx::SqlitePool;
34use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
35use std::sync::{Arc, OnceLock};
36use tokio::sync::Mutex;
37
/// Age, in seconds, after which a `processing` row is treated as orphaned
/// (its claimer crashed, hung or was killed). Such rows become claimable
/// again and are what `reset_stale` moves back to `pending`.
pub const DEFAULT_STALE_SECONDS: u64 = 60;

/// Number of back-to-back `mark_failed` calls that opens the circuit breaker.
pub const CIRCUIT_FAILURE_THRESHOLD: u32 = 3;

/// Time, in milliseconds, for which an open circuit keeps `claim_next`
/// from handing out rows.
pub const CIRCUIT_OPEN_DURATION_MS: i64 = 60 * 1_000;
49
50/// Maximum delivery attempts per outbox item. After this many failures,
51/// the item is marked `abandoned` and is no longer claimed.
52pub const MAX_RETRY_COUNT: i64 = outbox_core::MAX_RETRY_COUNT;
53
54static DRAIN_SERIALIZATION_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
55
56fn drain_serialization_lock() -> &'static Mutex<()> {
57    DRAIN_SERIALIZATION_LOCK.get_or_init(|| Mutex::new(()))
58}
59
60/// A row in `cloud_outbox` that has been claimed for processing.
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct OutboxItem {
63    pub id: i64,
64    pub kind: String,
65    pub payload_json: String,
66    pub retry_count: i64,
67}
68
/// Snapshot of the circuit breaker as reported by `OutboxQueue::circuit_state`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CircuitState {
    /// Uploads may proceed.
    Closed,
    /// Callers should short-circuit until the clock reaches the deadline.
    Open {
        /// Unix-ms timestamp after which the breaker counts as closed again.
        until_unix_ms: i64,
    },
}
76
77/// Queue handle. Cheap to clone; all state is either on disk or inside
78/// an `Arc<Atomic*>` so multiple callers inside the same process observe
79/// the same breaker state.
80#[derive(Debug, Clone)]
81pub struct OutboxQueue {
82    pool: SqlitePool,
83    /// Count of consecutive `mark_failed` calls since the last
84    /// successful `confirm`. Shared across clones.
85    consecutive_failures: Arc<AtomicU32>,
86    /// Unix-ms until which the circuit stays open. `0` when closed.
87    circuit_open_until_ms: Arc<AtomicI64>,
88}
89
90impl OutboxQueue {
91    /// Build a queue handle from an existing pool. The pool must have the
92    /// `cloud_outbox` migration applied (i.e. `init_db` was called on it).
93    pub fn new(pool: SqlitePool) -> Self {
94        Self {
95            pool,
96            consecutive_failures: Arc::new(AtomicU32::new(0)),
97            circuit_open_until_ms: Arc::new(AtomicI64::new(0)),
98        }
99    }
100
101    /// Insert a new fire-and-forget payload. Returns the row id so tests
102    /// (and the occasional curious caller) can trace a specific item.
103    /// Production callers usually ignore the returned id.
104    pub async fn enqueue(&self, kind: &str, payload_json: &str) -> Result<i64, sqlx::Error> {
105        if !crate::cloud::capture::capture_enabled() {
106            return Ok(0);
107        }
108        let now = now_unix_ms();
109        let result = sqlx::query!(
110            "INSERT INTO cloud_outbox (kind, payload_json, status, retry_count, created_at) \
111             VALUES (?1, ?2, 'pending', 0, ?3)",
112            kind,
113            payload_json,
114            now
115        )
116        .execute(&self.pool)
117        .await?;
118        Ok(result.last_insert_rowid())
119    }
120
121    /// Current breaker state. `None` rows + closed breaker ⇒ callers may
122    /// proceed. `Open` ⇒ short-circuit. Callers should check this *before*
123    /// building expensive payloads for bulk drains.
124    pub fn circuit_state(&self) -> CircuitState {
125        let until = self.circuit_open_until_ms.load(Ordering::SeqCst);
126        if until == 0 {
127            return CircuitState::Closed;
128        }
129        if now_unix_ms() >= until {
130            // Expired — allow the next claim to cycle the breaker closed.
131            // We intentionally don't reset the counter here; the first
132            // successful `confirm` does that, and the next `mark_failed`
133            // just re-opens the breaker.
134            self.circuit_open_until_ms.store(0, Ordering::SeqCst);
135            CircuitState::Closed
136        } else {
137            CircuitState::Open {
138                until_unix_ms: until,
139            }
140        }
141    }
142
143    /// Atomically pick the oldest `pending` row and flip it to
144    /// `processing`. Returns `None` when the queue is empty or when the
145    /// circuit breaker is open.
146    ///
147    /// The UPDATE uses `RETURNING` so claim-and-read happen in one
148    /// statement (`SQLite` serialises writes per connection, so this is
149    /// equivalent to `SELECT … FOR UPDATE` on a row-at-a-time queue).
150    ///
151    pub async fn claim_next(&self) -> Result<Option<OutboxItem>, sqlx::Error> {
152        if matches!(self.circuit_state(), CircuitState::Open { .. }) {
153            return Ok(None);
154        }
155
156        let now = now_unix_ms();
157        // Rows `status = 'processing'` whose `claimed_at` is older than
158        // `DEFAULT_STALE_SECONDS` are eligible to be re-claimed here:
159        // a previous claimer crashed / SIGKILL'd / froze. Folding the
160        // recovery into the same atomic UPDATE means no caller needs to
161        // remember to invoke `reset_stale` separately — the queue self-
162        // heals on every claim. Independent `reset_stale` is kept as a
163        // public entry point for startup + diagnostics.
164        let stale_cutoff = now - (DEFAULT_STALE_SECONDS as i64) * 1000;
165        let row = sqlx::query!(
166            r#"UPDATE cloud_outbox
167             SET status = 'processing', claimed_at = ?1
168             WHERE id = (
169                 SELECT id FROM cloud_outbox
170                 WHERE status = 'pending'
171                    OR (status = 'processing'
172                        AND claimed_at IS NOT NULL
173                        AND claimed_at < ?2)
174                 ORDER BY created_at ASC, id ASC
175                 LIMIT 1
176             )
177             RETURNING id as "id!: i64", kind, payload_json, retry_count"#,
178            now,
179            stale_cutoff
180        )
181        .fetch_optional(&self.pool)
182        .await?;
183
184        Ok(row.map(|r| OutboxItem {
185            id: r.id,
186            kind: r.kind,
187            payload_json: r.payload_json,
188            retry_count: r.retry_count,
189        }))
190    }
191
192    pub async fn claim_next_kind(&self, kind: &str) -> Result<Option<OutboxItem>, sqlx::Error> {
193        if matches!(self.circuit_state(), CircuitState::Open { .. }) {
194            return Ok(None);
195        }
196
197        let now = now_unix_ms();
198        let stale_cutoff = now - (DEFAULT_STALE_SECONDS as i64) * 1000;
199        let row = sqlx::query!(
200            r#"UPDATE cloud_outbox
201             SET status = 'processing', claimed_at = ?1
202             WHERE id = (
203                 SELECT id FROM cloud_outbox
204                 WHERE kind = ?3
205                   AND (status = 'pending'
206                        OR (status = 'processing'
207                            AND claimed_at IS NOT NULL
208                            AND claimed_at < ?2))
209                 ORDER BY created_at ASC, id ASC
210                 LIMIT 1
211             )
212             RETURNING id AS "id!: i64", kind AS "kind!: String", payload_json AS "payload_json!: String", retry_count AS "retry_count!: i64""#,
213            now,
214            stale_cutoff,
215            kind,
216        )
217        .fetch_optional(&self.pool)
218        .await?;
219
220        Ok(row.map(|r| OutboxItem {
221            id: r.id,
222            kind: r.kind,
223            payload_json: r.payload_json,
224            retry_count: r.retry_count,
225        }))
226    }
227
228    /// Upload succeeded — delete the row and reset the consecutive-
229    /// failure counter so the circuit can close again.
230    pub async fn confirm(&self, id: i64) -> Result<(), sqlx::Error> {
231        sqlx::query!("DELETE FROM cloud_outbox WHERE id = ?1", id)
232            .execute(&self.pool)
233            .await?;
234        self.consecutive_failures.store(0, Ordering::SeqCst);
235        // Do NOT reset the breaker here — we let the natural expiry or
236        // the next `claim_next` cycle close it. This avoids races where
237        // one confirm sneaks between two failures.
238        Ok(())
239    }
240
241    /// Upload failed. If the row has been tried fewer than
242    /// `MAX_RETRY_COUNT` times, bounce it back to `pending` so a later
243    /// drain can retry. Otherwise flip it to `abandoned` — we keep the
244    /// row for diagnostics but will never re-claim it.
245    ///
246    /// This also ticks the consecutive-failure counter and, on the
247    /// threshold, opens the circuit for `CIRCUIT_OPEN_DURATION_MS`.
248    pub async fn mark_failed(&self, id: i64, err: &str) -> Result<(), sqlx::Error> {
249        // Trim unbounded errors so cascade failures cannot bloat the DB.
250        let err_trimmed: String = outbox_core::truncate(err, 2048);
251
252        // Load current retry_count to decide the transition.
253        let current = sqlx::query!(
254            "SELECT retry_count, status FROM cloud_outbox WHERE id = ?1",
255            id
256        )
257        .fetch_optional(&self.pool)
258        .await?;
259
260        let Some(row) = current else {
261            // Row vanished between claim and mark_failed. Caller may have
262            // raced with a confirm from another drain pass; treat as a
263            // no-op and don't tick the failure counter.
264            return Ok(());
265        };
266
267        // Shared retry/abandon decision. This queue retries by bouncing
268        // rows to `pending`; it does not schedule backoff delays.
269        let (new_status, new_count) = match outbox_core::decide_retry(row.retry_count) {
270            RetryDecision::Retry { next_count } => ("pending", next_count),
271            RetryDecision::Abandon { next_count } => ("abandoned", next_count),
272        };
273
274        sqlx::query!(
275            "UPDATE cloud_outbox \
276             SET status = ?1, retry_count = ?2, last_error = ?3, claimed_at = NULL \
277             WHERE id = ?4",
278            new_status,
279            new_count,
280            err_trimmed,
281            id
282        )
283        .execute(&self.pool)
284        .await?;
285
286        // Trip the circuit breaker after N consecutive failures.
287        let prev = self.consecutive_failures.fetch_add(1, Ordering::SeqCst);
288        if prev + 1 >= CIRCUIT_FAILURE_THRESHOLD {
289            let until = now_unix_ms() + CIRCUIT_OPEN_DURATION_MS;
290            self.circuit_open_until_ms.store(until, Ordering::SeqCst);
291        }
292
293        Ok(())
294    }
295
296    /// Promote any `processing` rows older than `threshold_secs` back to
297    /// `pending`. Called at startup (see `startup.rs`) to recover from
298    /// crashed drains.
299    pub async fn reset_stale(&self, threshold_secs: u64) -> Result<u64, sqlx::Error> {
300        let cutoff = now_unix_ms() - (threshold_secs as i64) * 1000;
301        let result = sqlx::query!(
302            "UPDATE cloud_outbox \
303             SET status = 'pending', claimed_at = NULL \
304             WHERE status = 'processing' AND claimed_at IS NOT NULL AND claimed_at < ?1",
305            cutoff
306        )
307        .execute(&self.pool)
308        .await?;
309        Ok(result.rows_affected())
310    }
311
312    /// Per-kind breakdown of `status='pending'` rows for lag warnings.
313    /// Sorted by kind for deterministic rendering. Uses runtime SQL so
314    /// this diagnostic helper does not require `.sqlx` cache updates.
315    pub async fn pending_counts_by_kind(&self) -> Result<Vec<(String, i64)>, sqlx::Error> {
316        let rows = sqlx::query(
317            "SELECT kind, COUNT(*) AS c \
318             FROM cloud_outbox WHERE status = 'pending' GROUP BY kind",
319        )
320        .fetch_all(&self.pool)
321        .await?;
322        let mut out: Vec<(String, i64)> = rows
323            .into_iter()
324            .map(|r| {
325                let kind: String = sqlx::Row::try_get(&r, "kind").unwrap_or_default();
326                let count: i64 = sqlx::Row::try_get(&r, "c").unwrap_or_default();
327                (kind, count)
328            })
329            .collect();
330        out.sort_by(|a, b| a.0.cmp(&b.0));
331        Ok(out)
332    }
333
334    /// Drain stale `abandoned` rows older than `cutoff_unix_ms` back to
335    /// the live queue. Returns a per-kind breakdown of rows that were
336    /// (or would be, in `dry_run` mode) reset.
337    ///
338    /// Cutoff semantics: a row is eligible iff its most recent
339    /// `claimed_at` (last attempt) — or `created_at` if it was
340    /// abandoned before any attempt — is **older than**
341    /// `cutoff_unix_ms`. This deliberately leaves recently-abandoned
342    /// rows alone: those are almost certainly the symptom of a current
343    /// outage rather than a stale-auth backlog.
344    ///
345    /// Tx-safety: the whole operation runs inside a single
346    /// `BEGIN/COMMIT` so a partial drain cannot leave the queue in a
347    /// half-reset state. `dry_run = true` still runs the SELECT but
348    /// rolls back instead of committing, so it has the same
349    /// transactional snapshot guarantees as the real path while
350    /// remaining strictly read-only.
351    pub async fn drain_abandoned_older_than(
352        &self,
353        cutoff_unix_ms: i64,
354        dry_run: bool,
355    ) -> Result<DrainSummary, sqlx::Error> {
356        let mut tx = self.pool.begin().await?;
357        let rows = sqlx::query(
358            "SELECT kind, COUNT(*) AS c \
359             FROM cloud_outbox \
360             WHERE status = 'abandoned' \
361               AND COALESCE(claimed_at, created_at) < ?1 \
362             GROUP BY kind",
363        )
364        .bind(cutoff_unix_ms)
365        .fetch_all(&mut *tx)
366        .await?;
367
368        let mut summary = DrainSummary::default();
369        for row in rows {
370            let kind: String = sqlx::Row::try_get(&row, "kind").unwrap_or_default();
371            let count: i64 = sqlx::Row::try_get(&row, "c").unwrap_or_default();
372            summary.per_kind.push((kind, count));
373            summary.total += count;
374        }
375        summary.per_kind.sort_by(|a, b| a.0.cmp(&b.0));
376
377        if dry_run || summary.total == 0 {
378            // Nothing to mutate; roll the snapshot back so we never
379            // commit a no-op tx (cheaper, and keeps the audit clean).
380            tx.rollback().await?;
381            return Ok(summary);
382        }
383
384        let affected = sqlx::query(
385            "UPDATE cloud_outbox \
386             SET status = 'pending', \
387                 retry_count = 0, \
388                 last_error = NULL, \
389                 claimed_at = NULL \
390             WHERE status = 'abandoned' \
391               AND COALESCE(claimed_at, created_at) < ?1",
392        )
393        .bind(cutoff_unix_ms)
394        .execute(&mut *tx)
395        .await?;
396        tx.commit().await?;
397
398        // Reset in-process breaker state so the next drain pass after a
399        // resurrection isn't immediately short-circuited by the
400        // counter left over from the auth-revoke storm. The on-disk
401        // row state is what's authoritative; this is just hygiene.
402        self.consecutive_failures.store(0, Ordering::SeqCst);
403        self.circuit_open_until_ms.store(0, Ordering::SeqCst);
404
405        // Reconcile the per-kind summary's `total` with the actual
406        // affected row count. They can only diverge if a concurrent
407        // writer abandoned another eligible row between the SELECT and
408        // the UPDATE inside the same tx — vanishingly rare, but record
409        // the real number rather than the snapshot count.
410        let affected = i64::try_from(affected.rows_affected()).unwrap_or(summary.total);
411        summary.total = affected;
412        Ok(summary)
413    }
414
415    /// Number of rows in each status bucket. Diagnostics only — e.g.
416    /// `difflore doctor` surfaces this so users can see a backlog
417    /// building up.
418    pub async fn counts(&self) -> Result<OutboxCounts, sqlx::Error> {
419        let rows = sqlx::query!(
420            r#"SELECT status, COUNT(*) as "c!: i64" FROM cloud_outbox GROUP BY status"#
421        )
422        .fetch_all(&self.pool)
423        .await?;
424        let mut out = OutboxCounts::default();
425        for r in rows {
426            let status: String = r.status;
427            let count: i64 = r.c;
428            match status.as_str() {
429                "pending" => out.pending = count,
430                "processing" => out.processing = count,
431                "failed" => out.failed = count,
432                "abandoned" => out.abandoned = count,
433                _ => {}
434            }
435        }
436        Ok(out)
437    }
438}
439
/// Row count per `cloud_outbox.status` value, as produced by
/// `OutboxQueue::counts` for `difflore doctor` and other diagnostics.
/// A status with no rows stays at `0`.
#[derive(Clone, Copy, Debug, Default)]
pub struct OutboxCounts {
    /// Rows waiting to be claimed.
    pub pending: i64,
    /// Rows currently claimed by a drain pass.
    pub processing: i64,
    /// Rows whose status is `failed`.
    pub failed: i64,
    /// Rows that ran out of retries and are no longer claimed.
    pub abandoned: i64,
}
448
/// Outcome of `drain_abandoned_older_than`, for both dry runs and real runs.
///
/// `total` counts the rows that were (or, in a dry run, would be) moved back
/// to `pending`. `per_kind` splits those rows by their `kind` column (or by
/// `event_type` for the observation queue). It is sorted ascending so that
/// rendered output is deterministic.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct DrainSummary {
    /// Rows reset (or eligible to be reset).
    pub total: i64,
    /// `(kind, count)` pairs, sorted by kind.
    pub per_kind: Vec<(String, i64)>,
}
460
/// Per-drain tally of how well uploaded accepted edits were attributed.
/// Every field counts rows.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct AcceptedEditAttributionSummary {
    /// Accepted edits the cloud recorded.
    pub uploaded: usize,
    /// Recorded edits that produced no attribution warning at all.
    pub launch_grade: usize,
    /// Recorded edits whose response carried no team id.
    pub missing_team_workspace: usize,
    /// Recorded edits uploaded without any non-blank rule id.
    pub missing_rule_ids: usize,
    /// Recorded edits with rule ids but zero inserted rule observations.
    pub unlinked_rule_observations: usize,
}

impl AcceptedEditAttributionSummary {
    /// Sum of the three warning categories (`missing_team_workspace`,
    /// `missing_rule_ids`, `unlinked_rule_observations`).
    pub const fn warning_count(self) -> usize {
        let Self {
            missing_team_workspace,
            missing_rule_ids,
            unlinked_rule_observations,
            ..
        } = self;
        missing_team_workspace + missing_rule_ids + unlinked_rule_observations
    }

    /// Accumulate `other` into `self`, field by field.
    pub const fn add(&mut self, other: Self) {
        let Self {
            uploaded,
            launch_grade,
            missing_team_workspace,
            missing_rule_ids,
            unlinked_rule_observations,
        } = other;
        self.uploaded += uploaded;
        self.launch_grade += launch_grade;
        self.missing_team_workspace += missing_team_workspace;
        self.missing_rule_ids += missing_rule_ids;
        self.unlinked_rule_observations += unlinked_rule_observations;
    }
}
483
484#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
485pub struct OutboxDrainReport {
486    pub attempted: usize,
487    pub confirmed: usize,
488    pub accepted_edit_attribution: AcceptedEditAttributionSummary,
489}
490
491/// Per-row dispatch result. `last_error` carries the diagnostic string
492/// to persist into `cloud_outbox.last_error` when `ok` is false.
493struct DispatchOutcome {
494    ok: bool,
495    accepted_edit_attribution: Option<AcceptedEditAttributionSummary>,
496    /// Pre-formatted, greppable string to persist into
497    /// `cloud_outbox.last_error` when `ok == false`. `None` on the
498    /// success path. For HTTP failures this is
499    /// `"{status} {reason}: {body_snippet}"`; for transport failures
500    /// it is `"transport: {message}"`; for semantic cloud-side
501    /// rejections (e.g. `record_accepted_edit` 2xx but
502    /// `acceptance_recorded == false`) it carries the `response.error`
503    /// the cloud handed back.
504    last_error: Option<String>,
505}
506
507impl DispatchOutcome {
508    const fn ok(ok: bool) -> Self {
509        Self {
510            ok,
511            accepted_edit_attribution: None,
512            last_error: None,
513        }
514    }
515
516    /// Constructor for the failure path: marks `ok = false` and
517    /// carries the rich error string the caller already formatted.
518    /// Centralises the field discipline so a future arm can't
519    /// accidentally lose `last_error`.
520    const fn failed_with(last_error: String) -> Self {
521        Self {
522            ok: false,
523            accepted_edit_attribution: None,
524            last_error: Some(last_error),
525        }
526    }
527
528    /// Translate a [`super::client::OutboxFailure`] into the persisted
529    /// `last_error` shape.
530    fn from_outbox_failure(failure: &super::client::OutboxFailure) -> Self {
531        Self::failed_with(failure.format_for_outbox_last_error())
532    }
533}
534
535fn accepted_edit_attribution_summary(
536    expected_rule_ids: usize,
537    response: &super::api_types::RecordAcceptedEditResponse,
538) -> AcceptedEditAttributionSummary {
539    let mut summary = AcceptedEditAttributionSummary {
540        uploaded: usize::from(response.acceptance_recorded),
541        launch_grade: 0,
542        missing_team_workspace: 0,
543        missing_rule_ids: 0,
544        unlinked_rule_observations: 0,
545    };
546    if response.acceptance_recorded {
547        if expected_rule_ids == 0 {
548            summary.missing_rule_ids = 1;
549        }
550        if response.team_id.is_none() {
551            summary.missing_team_workspace = 1;
552        }
553        if expected_rule_ids > 0 && response.observations_inserted == 0 {
554            summary.unlinked_rule_observations = 1;
555        }
556        if summary.warning_count() == 0 {
557            summary.launch_grade = 1;
558        }
559    }
560    summary
561}
562
pub mod kind {
    //! Wire names for the payload kinds stored in `cloud_outbox.kind`.
    //!
    //! The `drain_outbox` dispatcher matches on these strings to pick the
    //! upload route, so the values are a persisted contract. Renaming one
    //! abandons every queued row that still carries the old name at upgrade
    //! time.

    /// Payload `{ "pr_review_id", "steps" }`.
    pub const TRAJECTORY: &str = "trajectory";

    /// Payload `{ "review_id", "req" }`.
    pub const REVIEW_METRICS: &str = "review_metrics";

    /// Payload is a `RecordAcceptedEditRequest`.
    pub const ACCEPTED_EDIT: &str = "accepted_edit";

    /// Rows left over from the pre-release OSS outbox. The name stays stable
    /// so drains can acknowledge and discard these rows. They must never feed
    /// the current accepted-edit value-loop evidence endpoint.
    pub const LEGACY_FIX_ACCEPTANCE: &str = "fix_acceptance";

    /// Payload with `file`, `intent`, `rules_injected`,
    /// `strict_match_count`, `rule_titles` and `client_label` fields.
    pub const MCP_QUERY: &str = "mcp_query";

    /// Payload is an `UploadImportedReviewsRequest`.
    pub const IMPORTED_REVIEWS: &str = "imported_reviews";

    /// `PostToolUse` observation. The payload shape is defined by
    /// `cloud::api_types::Observation` and `crate::observation`.
    pub const OBSERVATION: &str = "observation";

    /// Session-mined candidate rule
    /// ([`crate::cloud::session_mined::SessionMinedCandidate`]), bound for
    /// `POST /api/cloud/session-mined-candidates`. The cloud endpoint exists,
    /// but local drains still need an explicit dispatcher arm before these
    /// rows leave `pending`. The constant lives here rather than in
    /// `session_mine/worker.rs` so the enqueue side and the dispatcher share
    /// one definition of the wire string.
    pub const SESSION_MINED_CANDIDATE: &str = "session_mined_candidate";
}
589
590/// Drain at most `max_items` outbox rows. For each claimed row, invoke
591/// the appropriate `CloudClient` method, then `confirm` on success or
592/// `mark_failed` on HTTP / network failure. Returns `(attempted, confirmed)`.
593///
594/// Callers should treat this as best-effort: a non-Ok return value from
595/// a SQL operation is surfaced, but upload failures themselves are
596/// absorbed into the queue's retry counters.
597///
598/// Call site: hook cold-path exits (before `exit(0)`) and any CLI
599/// command that has some idle time after its main work completes.
600pub async fn drain_outbox(
601    queue: &OutboxQueue,
602    client: &super::client::CloudClient,
603    max_items: usize,
604) -> Result<(usize, usize), sqlx::Error> {
605    let report = drain_outbox_report(queue, client, max_items).await?;
606    Ok((report.attempted, report.confirmed))
607}
608
609pub async fn drain_outbox_report(
610    queue: &OutboxQueue,
611    client: &super::client::CloudClient,
612    max_items: usize,
613) -> Result<OutboxDrainReport, sqlx::Error> {
614    if !client.is_logged_in() {
615        // Logged out — leave rows in place; a future logged-in session
616        // will drain them. Treat this as "nothing to do".
617        return Ok(OutboxDrainReport::default());
618    }
619    let _drain_guard = drain_serialization_lock().lock().await;
620
621    let mut attempted = 0usize;
622    let mut confirmed = 0usize;
623    let mut accepted_edit_attribution = AcceptedEditAttributionSummary::default();
624    for _ in 0..max_items {
625        if matches!(queue.circuit_state(), CircuitState::Open { .. }) {
626            break;
627        }
628        let Some(item) = queue.claim_next().await? else {
629            break;
630        };
631        attempted += 1;
632        let outcome = match dispatch(client, &item).await {
633            Ok(outcome) => outcome,
634            Err(err) => {
635                let _ = queue.mark_failed(item.id, &err).await;
636                continue;
637            }
638        };
639        if outcome.ok {
640            queue.confirm(item.id).await?;
641            confirmed += 1;
642            if let Some(summary) = outcome.accepted_edit_attribution {
643                accepted_edit_attribution.add(summary);
644            }
645        } else {
646            // Persist the structured dispatch error when available.
647            let err_msg = outcome
648                .last_error
649                .as_deref()
650                .unwrap_or("upload returned non-2xx (no detail)");
651            let _ = queue.mark_failed(item.id, err_msg).await;
652        }
653    }
654    Ok(OutboxDrainReport {
655        attempted,
656        confirmed,
657        accepted_edit_attribution,
658    })
659}
660
661pub async fn drain_outbox_kind(
662    queue: &OutboxQueue,
663    client: &super::client::CloudClient,
664    kind: &str,
665    max_items: usize,
666) -> Result<(usize, usize), sqlx::Error> {
667    let report = drain_outbox_kind_report(queue, client, kind, max_items).await?;
668    Ok((report.attempted, report.confirmed))
669}
670
671pub async fn drain_outbox_kind_report(
672    queue: &OutboxQueue,
673    client: &super::client::CloudClient,
674    kind: &str,
675    max_items: usize,
676) -> Result<OutboxDrainReport, sqlx::Error> {
677    if !client.is_logged_in() {
678        return Ok(OutboxDrainReport::default());
679    }
680    let _drain_guard = drain_serialization_lock().lock().await;
681
682    let mut attempted = 0usize;
683    let mut confirmed = 0usize;
684    let mut accepted_edit_attribution = AcceptedEditAttributionSummary::default();
685    for _ in 0..max_items {
686        if matches!(queue.circuit_state(), CircuitState::Open { .. }) {
687            break;
688        }
689        let Some(item) = queue.claim_next_kind(kind).await? else {
690            break;
691        };
692        attempted += 1;
693        let outcome = match dispatch(client, &item).await {
694            Ok(outcome) => outcome,
695            Err(err) => {
696                let _ = queue.mark_failed(item.id, &err).await;
697                continue;
698            }
699        };
700        if outcome.ok {
701            queue.confirm(item.id).await?;
702            confirmed += 1;
703            if let Some(summary) = outcome.accepted_edit_attribution {
704                accepted_edit_attribution.add(summary);
705            }
706        } else {
707            // Persist the structured dispatch error when available.
708            let err_msg = outcome
709                .last_error
710                .as_deref()
711                .unwrap_or("upload returned non-2xx (no detail)");
712            let _ = queue.mark_failed(item.id, err_msg).await;
713        }
714    }
715    Ok(OutboxDrainReport {
716        attempted,
717        confirmed,
718        accepted_edit_attribution,
719    })
720}
721
722/// Route a single outbox row to the correct `CloudClient` method.
723///
724/// The payload JSON is a versionless wrapper so the enqueue / drain
725/// contracts stay in one place. Schemas:
726///
727/// * `trajectory`        — `{ "pr_review_id": String, "steps": Value }`
728/// * `review_metrics`    — `{ "review_id": String, "req": RecordReviewMetricsRequest }`
729/// * `accepted_edit`     — `RecordAcceptedEditRequest`
730/// * `fix_acceptance`    — legacy pre-release rows; explicitly skipped
731/// * `mcp_query`         — `{ "file", "intent", "rules_injected",
732///                             "strict_match_count", "rule_titles",
733///                             "client_label" }`
734/// * `imported_reviews`  — `UploadImportedReviewsRequest`
735async fn dispatch(
736    client: &super::client::CloudClient,
737    item: &OutboxItem,
738) -> Result<DispatchOutcome, String> {
739    use super::api_types::{
740        RecordAcceptedEditRequest, RecordReviewMetricsRequest, UploadImportedReviewsRequest,
741    };
742    use serde_json::Value;
743
744    match item.kind.as_str() {
745        kind::TRAJECTORY => {
746            let v: Value = serde_json::from_str(&item.payload_json)
747                .map_err(|e| format!("trajectory parse: {e}"))?;
748            let pr_review_id = v
749                .get("pr_review_id")
750                .and_then(|x| x.as_str())
751                .ok_or_else(|| "trajectory missing pr_review_id".to_owned())?;
752            let steps = v.get("steps").cloned().unwrap_or(Value::Array(Vec::new()));
753            // `_outcome` carries status + body detail for `last_error`.
754            Ok(
755                match client.save_trajectory_outcome(pr_review_id, steps).await {
756                    Ok(()) => DispatchOutcome::ok(true),
757                    Err(failure) => DispatchOutcome::from_outbox_failure(&failure),
758                },
759            )
760        }
761        kind::REVIEW_METRICS => {
762            let v: Value = serde_json::from_str(&item.payload_json)
763                .map_err(|e| format!("review_metrics parse: {e}"))?;
764            let review_id = v
765                .get("review_id")
766                .and_then(|x| x.as_str())
767                .ok_or_else(|| "review_metrics missing review_id".to_owned())?
768                .to_owned();
769            let req_val = v
770                .get("req")
771                .cloned()
772                .unwrap_or(Value::Object(serde_json::Map::default()));
773            let req: RecordReviewMetricsRequest = serde_json::from_value(req_val)
774                .map_err(|e| format!("review_metrics decode req: {e}"))?;
775            Ok(
776                match client.record_review_metrics_outcome(&review_id, req).await {
777                    Ok(()) => DispatchOutcome::ok(true),
778                    Err(failure) => DispatchOutcome::from_outbox_failure(&failure),
779                },
780            )
781        }
782        kind::ACCEPTED_EDIT => {
783            let req: RecordAcceptedEditRequest = serde_json::from_str(&item.payload_json)
784                .map_err(|e| format!("accepted_edit parse: {e}"))?;
785            let expected_rule_ids = req
786                .rule_ids
787                .iter()
788                .filter(|rule_id| !rule_id.trim().is_empty())
789                .count();
790            let response = client.record_accepted_edit_response(req).await?;
791            let summary = accepted_edit_attribution_summary(expected_rule_ids, &response);
792            // Semantic-only failure path: the cloud returned 2xx but
793            // refused to record the acceptance (e.g. dedup, payload
794            // rejection). Surface its `error` field — never the
795            // historic `non-2xx` literal, which is actively wrong
796            // here (the response was 2xx).
797            let last_error = if response.acceptance_recorded {
798                None
799            } else {
800                Some(format!(
801                    "accepted_edit rejected: {}",
802                    response.error.as_deref().unwrap_or("no detail")
803                ))
804            };
805            Ok(DispatchOutcome {
806                ok: response.acceptance_recorded,
807                accepted_edit_attribution: Some(summary),
808                last_error,
809            })
810        }
811        kind::LEGACY_FIX_ACCEPTANCE => {
812            // Historical `fix_acceptance` rows predate the current
813            // accepted-edit proof contract. Confirm them so old queues do
814            // not retry forever, but do not POST them to `/accepted-edits`
815            // or count them as current value-loop evidence.
816            Ok(DispatchOutcome::ok(true))
817        }
818        kind::MCP_QUERY => {
819            let v: Value = serde_json::from_str(&item.payload_json)
820                .map_err(|e| format!("mcp_query parse: {e}"))?;
821            let file = v
822                .get("file")
823                .and_then(|x| x.as_str())
824                .unwrap_or("")
825                .to_owned();
826            let intent = v
827                .get("intent")
828                .and_then(|x| x.as_str())
829                .map(ToOwned::to_owned);
830            let rules_injected = v
831                .get("rules_injected")
832                .and_then(Value::as_u64)
833                .and_then(|n| usize::try_from(n).ok())
834                .unwrap_or(0);
835            let strict_match_count = v
836                .get("strict_match_count")
837                .and_then(Value::as_u64)
838                .and_then(|n| usize::try_from(n).ok())
839                .unwrap_or(0);
840            let rule_titles: Vec<String> = v
841                .get("rule_titles")
842                .and_then(|x| x.as_array())
843                .map(|arr| {
844                    arr.iter()
845                        .filter_map(|t| t.as_str().map(String::from))
846                        .collect()
847                })
848                .unwrap_or_default();
849            let rule_ids: Vec<String> = v
850                .get("rule_ids")
851                .and_then(|x| x.as_array())
852                .map(|arr| {
853                    arr.iter()
854                        .filter_map(|t| t.as_str().map(String::from))
855                        .collect()
856                })
857                .unwrap_or_default();
858            let client_label = v
859                .get("client_label")
860                .and_then(|x| x.as_str())
861                .map(ToOwned::to_owned);
862            let repo_full_name = v
863                .get("repo_full_name")
864                .and_then(|x| x.as_str())
865                .map(ToOwned::to_owned);
866            Ok(
867                match client
868                    .track_mcp_query_outcome(
869                        &file,
870                        intent.as_deref(),
871                        rules_injected,
872                        strict_match_count,
873                        rule_titles,
874                        rule_ids,
875                        client_label.as_deref(),
876                        repo_full_name.as_deref(),
877                    )
878                    .await
879                {
880                    Ok(()) => DispatchOutcome::ok(true),
881                    Err(failure) => DispatchOutcome::from_outbox_failure(&failure),
882                },
883            )
884        }
885        kind::IMPORTED_REVIEWS => {
886            let req: UploadImportedReviewsRequest = serde_json::from_str(&item.payload_json)
887                .map_err(|e| format!("imported_reviews parse: {e}"))?;
888            Ok(match client.upload_imported_reviews_outcome(&req).await {
889                Ok(()) => DispatchOutcome::ok(true),
890                Err(failure) => DispatchOutcome::from_outbox_failure(&failure),
891            })
892        }
893        kind::OBSERVATION => {
894            let obs: super::api_types::Observation = serde_json::from_str(&item.payload_json)
895                .map_err(|e| format!("observation parse: {e}"))?;
896            Ok(
897                match client
898                    .post_observations_outcome(std::slice::from_ref(&obs))
899                    .await
900                {
901                    Ok(()) => DispatchOutcome::ok(true),
902                    Err(failure) => DispatchOutcome::from_outbox_failure(&failure),
903                },
904            )
905        }
906        other => Err(format!("unknown outbox kind '{other}'")),
907    }
908}
909
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cloud::api_types::RecordAcceptedEditResponse;
    use crate::cloud::client::{HttpFailure, OutboxFailure, normalize_body_snippet};
    use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};

    /// Fresh in-memory database with all migrations applied.
    ///
    /// `max_connections(1)` keeps every query on the same connection. Each
    /// new connection to `:memory:` would get its own empty database.
    async fn fresh_pool() -> SqlitePool {
        let opts = SqliteConnectOptions::new()
            .filename(":memory:")
            .create_if_missing(true);
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect_with(opts)
            .await
            .expect("pool");
        sqlx::migrate!("./migrations")
            .run(&pool)
            .await
            .expect("apply migrations");
        pool
    }

    /// Current `status` of row `id`, or `None` once the row has been deleted.
    async fn status_of(pool: &SqlitePool, id: i64) -> Option<String> {
        sqlx::query_scalar!("SELECT status FROM cloud_outbox WHERE id = ?1", id)
            .fetch_optional(pool)
            .await
            .unwrap()
    }

    fn accepted_edit_response(
        acceptance_recorded: bool,
        team_id: Option<&str>,
        observations_inserted: u32,
    ) -> RecordAcceptedEditResponse {
        RecordAcceptedEditResponse {
            ok: acceptance_recorded,
            acceptance_recorded,
            acceptance_id: acceptance_recorded.then(|| "acceptance-1".to_owned()),
            diff_signature: Some("diff-1".to_owned()),
            team_id: team_id.map(str::to_owned),
            attributed_rule_ids: Vec::new(),
            observations_inserted,
            memory_reinforcement_recorded: false,
            memory_reinforcement_deduped: false,
            error: None,
        }
    }

    #[test]
    fn accepted_edit_attribution_summary_counts_launch_grade_uploads() {
        let response = accepted_edit_response(true, Some("team-1"), 2);
        let summary = accepted_edit_attribution_summary(2, &response);

        assert_eq!(summary.uploaded, 1);
        assert_eq!(summary.launch_grade, 1);
        assert_eq!(summary.warning_count(), 0);
    }

    #[test]
    fn accepted_edit_attribution_summary_flags_raw_only_uploads() {
        let missing_team =
            accepted_edit_attribution_summary(2, &accepted_edit_response(true, None, 2));
        assert_eq!(missing_team.uploaded, 1);
        assert_eq!(missing_team.launch_grade, 0);
        assert_eq!(missing_team.missing_team_workspace, 1);

        let missing_rule_ids =
            accepted_edit_attribution_summary(0, &accepted_edit_response(true, Some("team-1"), 0));
        assert_eq!(missing_rule_ids.missing_rule_ids, 1);
        assert_eq!(missing_rule_ids.launch_grade, 0);

        let unlinked_observation =
            accepted_edit_attribution_summary(2, &accepted_edit_response(true, Some("team-1"), 0));
        assert_eq!(unlinked_observation.unlinked_rule_observations, 1);
        assert_eq!(unlinked_observation.launch_grade, 0);
    }

    #[test]
    fn accepted_edit_attribution_summary_ignores_failed_uploads() {
        let response = accepted_edit_response(false, None, 0);
        let summary = accepted_edit_attribution_summary(0, &response);

        assert_eq!(summary.uploaded, 0);
        assert_eq!(summary.launch_grade, 0);
        assert_eq!(summary.warning_count(), 0);
    }

    #[tokio::test]
    async fn legacy_fix_acceptance_dispatch_skips_current_accepted_edit_pipeline() {
        let client = crate::cloud::client::CloudClient::new();
        let item = OutboxItem {
            id: 1,
            kind: kind::LEGACY_FIX_ACCEPTANCE.to_owned(),
            payload_json: "not accepted-edit json".to_owned(),
            retry_count: 0,
        };

        let outcome = dispatch(&client, &item)
            .await
            .expect("legacy rows are explicitly acknowledged and skipped");

        assert!(outcome.ok);
        assert_eq!(outcome.accepted_edit_attribution, None);
    }

    #[tokio::test]
    async fn enqueue_then_claim_moves_to_processing() {
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());
        let id = q.enqueue("trajectory", "{}").await.unwrap();
        assert_eq!(status_of(&pool, id).await.as_deref(), Some("pending"));

        let item = q.claim_next().await.unwrap().expect("row claimed");
        assert_eq!(item.id, id);
        assert_eq!(status_of(&pool, id).await.as_deref(), Some("processing"));
    }

    #[tokio::test]
    async fn claim_next_kind_prioritizes_matching_kind() {
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());
        let accepted_edit = q.enqueue(kind::ACCEPTED_EDIT, "{}").await.unwrap();
        let mcp = q.enqueue(kind::MCP_QUERY, "{}").await.unwrap();

        let item = q
            .claim_next_kind(kind::MCP_QUERY)
            .await
            .unwrap()
            .expect("mcp row claimed");

        assert_eq!(item.id, mcp);
        assert_eq!(status_of(&pool, mcp).await.as_deref(), Some("processing"));
        assert_eq!(
            status_of(&pool, accepted_edit).await.as_deref(),
            Some("pending")
        );
    }

    #[tokio::test]
    async fn drain_serialization_lock_is_process_wide() {
        let guard = drain_serialization_lock()
            .try_lock()
            .expect("first drain lock");
        assert!(
            drain_serialization_lock().try_lock().is_err(),
            "concurrent drainers must share the same in-process lock"
        );
        drop(guard);
        assert!(drain_serialization_lock().try_lock().is_ok());
    }

    #[tokio::test]
    async fn confirm_deletes_row() {
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());
        let id = q.enqueue("trajectory", "{}").await.unwrap();
        let item = q.claim_next().await.unwrap().unwrap();
        q.confirm(item.id).await.unwrap();
        assert!(status_of(&pool, id).await.is_none());
    }

    #[tokio::test]
    async fn mark_failed_eight_times_abandons() {
        // Reconciliation note: the abandon bound was unified 3 -> 8
        // (`outbox_core::MAX_RETRY_COUNT`) so the `cloud_outbox` queue
        // no longer under-retries relative to the higher-volume
        // observation queue. A row therefore now survives 7 retried
        // failures and is abandoned on the 8th (`next_count == 8`).
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());
        let id = q.enqueue("trajectory", "{}").await.unwrap();

        // Attempts 1..=7 bounce the row back to `pending`. The circuit
        // breaker trips after 3 consecutive failures, which — now that
        // the bound is 8 — happens *before* the abandon transition, so
        // reset the in-process breaker state between attempts to keep
        // `claim_next` returning the row (the on-disk retry_count, the
        // thing under test, is unaffected by this reset).
        for attempt in 1..=7 {
            q.circuit_open_until_ms.store(0, Ordering::SeqCst);
            q.consecutive_failures.store(0, Ordering::SeqCst);
            let item = q.claim_next().await.unwrap().unwrap();
            q.mark_failed(item.id, &format!("net {attempt}"))
                .await
                .unwrap();
            assert_eq!(
                status_of(&pool, id).await.as_deref(),
                Some("pending"),
                "attempt {attempt}: retry_count {attempt} (< 8) must stay pending"
            );
        }

        // Attempt 8 — retry_count becomes 8, should transition to
        // abandoned.
        q.circuit_open_until_ms.store(0, Ordering::SeqCst);
        q.consecutive_failures.store(0, Ordering::SeqCst);
        let item = q.claim_next().await.unwrap().unwrap();
        q.mark_failed(item.id, "net 8").await.unwrap();
        assert_eq!(status_of(&pool, id).await.as_deref(), Some("abandoned"));

        // Abandoned rows are NOT re-claimable. Force-close the breaker
        // first so the assertion is about abandonment, not the breaker.
        q.circuit_open_until_ms.store(0, Ordering::SeqCst);
        q.consecutive_failures.store(0, Ordering::SeqCst);
        assert!(q.claim_next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn claim_next_auto_recovers_stale_processing_rows() {
        // Simulate a crashed drain: enqueue → claim → never confirm,
        // then backdate `claimed_at` past the stale threshold. A later
        // `claim_next` from a fresh caller must self-heal the row (no
        // explicit `reset_stale` call) so a crashed or hung drain pass
        // doesn't leave work stuck in `processing` forever.
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());
        let id = q.enqueue("trajectory", "{\"crashed\":true}").await.unwrap();

        // First claim — simulates the drain that subsequently dies.
        let first = q.claim_next().await.unwrap().expect("first claim");
        assert_eq!(first.id, id);
        assert_eq!(status_of(&pool, id).await.as_deref(), Some("processing"));

        // Backdate `claimed_at` to push the row past
        // `DEFAULT_STALE_SECONDS`.
        let stale = now_unix_ms() - (DEFAULT_STALE_SECONDS as i64 + 30) * 1000;
        sqlx::query!(
            "UPDATE cloud_outbox SET claimed_at = ?1 WHERE id = ?2",
            stale,
            id
        )
        .execute(&pool)
        .await
        .unwrap();

        // Second claim from the "recovered" caller — must pick up the
        // stale row without any intermediate `reset_stale` call.
        let recovered = q.claim_next().await.unwrap().expect("recovered claim");
        assert_eq!(recovered.id, id, "stale row must be re-claimable");
        assert_eq!(status_of(&pool, id).await.as_deref(), Some("processing"));
    }

    #[tokio::test]
    async fn claim_next_ignores_fresh_processing_rows() {
        // A still-fresh `processing` row (within the stale window) must
        // NOT be re-claimed — that would let two drainers race on the
        // same payload and duplicate the cloud upload.
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());
        let _fresh = q.enqueue("trajectory", "{}").await.unwrap();
        let item = q.claim_next().await.unwrap().expect("initial claim");

        // With no pending rows left AND the only processing row still
        // fresh, claim_next must return None.
        assert!(q.claim_next().await.unwrap().is_none());
        // Sanity: confirm cleans up.
        q.confirm(item.id).await.unwrap();
    }

    #[tokio::test]
    async fn reset_stale_promotes_processing_rows() {
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());
        let id = q.enqueue("trajectory", "{}").await.unwrap();
        let _ = q.claim_next().await.unwrap().unwrap();
        assert_eq!(status_of(&pool, id).await.as_deref(), Some("processing"));

        // Backdate claimed_at so the threshold fires.
        let backdated = now_unix_ms() - 120_000;
        sqlx::query!(
            "UPDATE cloud_outbox SET claimed_at = ?1 WHERE id = ?2",
            backdated,
            id
        )
        .execute(&pool)
        .await
        .unwrap();

        let promoted = q.reset_stale(60).await.unwrap();
        assert_eq!(promoted, 1);
        assert_eq!(status_of(&pool, id).await.as_deref(), Some("pending"));
    }

    #[tokio::test]
    async fn circuit_breaker_halts_claims_after_three_failures() {
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());

        // Enqueue four rows so we have at least one left after tripping.
        for i in 0..4 {
            q.enqueue("trajectory", &format!("{{\"i\":{i}}}"))
                .await
                .unwrap();
        }

        for _ in 0..3 {
            let item = q.claim_next().await.unwrap().unwrap();
            q.mark_failed(item.id, "x").await.unwrap();
        }

        // Breaker is open; claim_next must return None even though a
        // pending row still exists.
        assert!(matches!(q.circuit_state(), CircuitState::Open { .. }));
        assert!(q.claim_next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn confirm_resets_consecutive_failure_counter() {
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());

        // Two failures tick the counter to 2 (still below the threshold
        // of 3, so the breaker stays closed).
        let _id1 = q.enqueue("trajectory", "{}").await.unwrap();
        let _id2 = q.enqueue("trajectory", "{}").await.unwrap();

        let item = q.claim_next().await.unwrap().unwrap();
        q.mark_failed(item.id, "f1").await.unwrap();
        let item = q.claim_next().await.unwrap().unwrap();
        q.mark_failed(item.id, "f2").await.unwrap();
        assert_eq!(q.consecutive_failures.load(Ordering::SeqCst), 2);

        // A successful confirm in between must reset the counter. We
        // don't care which physical row got claimed — only that a
        // successful confirm clears the consecutive-failure state.
        let item = q.claim_next().await.unwrap().unwrap();
        q.confirm(item.id).await.unwrap();
        assert_eq!(q.consecutive_failures.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn claim_next_returns_observation_kind() {
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());
        let obs_id = q
            .enqueue(kind::OBSERVATION, r#"{"session_id":"s"}"#)
            .await
            .unwrap();
        // This test asserts claim order, so space the enqueues the same way
        // `claim_next_returns_oldest_first` does. Distinct `created_at`
        // values make the expected order deterministic instead of relying
        // on same-millisecond tie-breaking.
        tokio::time::sleep(std::time::Duration::from_millis(2)).await;
        let traj_id = q.enqueue(kind::TRAJECTORY, "{}").await.unwrap();

        let first = q.claim_next().await.unwrap().expect("claimed first");
        let second = q.claim_next().await.unwrap().expect("claimed second");
        assert_eq!(first.id, obs_id);
        assert_eq!(first.kind, kind::OBSERVATION);
        assert_eq!(second.id, traj_id);
    }

    #[tokio::test]
    async fn claim_next_returns_oldest_first() {
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());
        let a = q.enqueue("trajectory", r#"{"n":"a"}"#).await.unwrap();
        // Tiny sleep to guarantee distinct created_at timestamps.
        tokio::time::sleep(std::time::Duration::from_millis(2)).await;
        let b = q.enqueue("trajectory", r#"{"n":"b"}"#).await.unwrap();

        let first = q.claim_next().await.unwrap().unwrap();
        let second = q.claim_next().await.unwrap().unwrap();
        assert_eq!(first.id, a);
        assert_eq!(second.id, b);
    }

    /// Helper: insert a directly-abandoned row at a chosen `created_at`
    /// (in unix-ms). Bypasses the public `enqueue`/`mark_failed` path so
    /// the cutoff-window tests don't need to fake 8 round-trips per row.
    async fn insert_abandoned(
        pool: &SqlitePool,
        kind: &str,
        created_at_ms: i64,
        claimed_at_ms: Option<i64>,
    ) -> i64 {
        sqlx::query(
            "INSERT INTO cloud_outbox \
             (kind, payload_json, status, retry_count, created_at, claimed_at, last_error) \
             VALUES (?1, '{}', 'abandoned', ?2, ?3, ?4, 'upload returned non-2xx')",
        )
        .bind(kind)
        .bind(MAX_RETRY_COUNT)
        .bind(created_at_ms)
        .bind(claimed_at_ms)
        .execute(pool)
        .await
        .unwrap()
        .last_insert_rowid()
    }

    #[tokio::test]
    async fn drain_abandoned_dry_run_reports_per_kind_without_mutating() {
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());
        let now = now_unix_ms();
        let old = now - 31 * 86_400_000; // 31 days ago
        let mcp_id = insert_abandoned(&pool, "mcp_query", old, Some(old)).await;
        let obs_id = insert_abandoned(&pool, "observation", old, Some(old)).await;
        let _other_mcp = insert_abandoned(&pool, "mcp_query", old, Some(old)).await;

        let summary = q.drain_abandoned_older_than(now, true).await.unwrap();

        assert_eq!(summary.total, 3);
        // Sorted ascending by kind for deterministic doctor output.
        assert_eq!(
            summary.per_kind,
            vec![("mcp_query".to_owned(), 2), ("observation".to_owned(), 1)]
        );
        // Dry-run MUST NOT mutate any row.
        assert_eq!(status_of(&pool, mcp_id).await.as_deref(), Some("abandoned"));
        assert_eq!(status_of(&pool, obs_id).await.as_deref(), Some("abandoned"));
    }

    #[tokio::test]
    async fn drain_abandoned_real_resets_eligible_rows_only() {
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());
        let now = now_unix_ms();
        let old = now - 31 * 86_400_000;
        let fresh = now - 60_000; // 60s ago — must be left alone
        let cutoff = now - 7 * 86_400_000; // older-than-7d

        let old_row = insert_abandoned(&pool, "mcp_query", old, Some(old)).await;
        let fresh_row = insert_abandoned(&pool, "mcp_query", fresh, Some(fresh)).await;

        // Tick the in-process breaker into the "open" state so we can
        // assert the drain hygienically resets it on success.
        q.consecutive_failures
            .store(CIRCUIT_FAILURE_THRESHOLD, Ordering::SeqCst);
        q.circuit_open_until_ms.store(now + 60_000, Ordering::SeqCst);

        let summary = q.drain_abandoned_older_than(cutoff, false).await.unwrap();
        assert_eq!(summary.total, 1);
        assert_eq!(status_of(&pool, old_row).await.as_deref(), Some("pending"));
        assert_eq!(
            status_of(&pool, fresh_row).await.as_deref(),
            Some("abandoned"),
            "rows newer than cutoff must NOT be touched",
        );

        // Resurrected row must come back with retry_count cleared.
        let retry_count: i64 =
            sqlx::query_scalar("SELECT retry_count FROM cloud_outbox WHERE id = ?1")
                .bind(old_row)
                .fetch_one(&pool)
                .await
                .unwrap();
        assert_eq!(retry_count, 0);
        let last_error: Option<String> =
            sqlx::query_scalar("SELECT last_error FROM cloud_outbox WHERE id = ?1")
                .bind(old_row)
                .fetch_one(&pool)
                .await
                .unwrap();
        assert!(last_error.is_none());

        // Breaker state cleared so the next drain pass isn't short-circuited
        // by a stale in-process counter from the auth-revoke storm.
        assert_eq!(q.consecutive_failures.load(Ordering::SeqCst), 0);
        assert_eq!(q.circuit_open_until_ms.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn drain_abandoned_uses_created_at_when_no_claimed_at() {
        // Rows abandoned via decode failure / bookkeeping never get a
        // `claimed_at`. The cutoff must still apply via `created_at`
        // so they don't sit around forever.
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());
        let now = now_unix_ms();
        let old = now - 90 * 86_400_000;
        let id = insert_abandoned(&pool, "observation", old, None).await;
        let cutoff = now - 30 * 86_400_000;

        let summary = q.drain_abandoned_older_than(cutoff, false).await.unwrap();
        assert_eq!(summary.total, 1);
        assert_eq!(status_of(&pool, id).await.as_deref(), Some("pending"));
    }

    #[tokio::test]
    async fn drain_abandoned_empty_queue_is_a_noop() {
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());
        let summary = q
            .drain_abandoned_older_than(now_unix_ms(), false)
            .await
            .unwrap();
        assert_eq!(summary.total, 0);
        assert!(summary.per_kind.is_empty());
    }

    #[tokio::test]
    async fn pending_counts_by_kind_buckets_pending_rows() {
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());
        q.enqueue("mcp_query", "{}").await.unwrap();
        q.enqueue("mcp_query", "{}").await.unwrap();
        q.enqueue("observation", "{}").await.unwrap();

        let counts = q.pending_counts_by_kind().await.unwrap();
        assert_eq!(
            counts,
            vec![("mcp_query".to_owned(), 2), ("observation".to_owned(), 1)]
        );
    }

    // Structured `last_error` regression coverage.

    #[test]
    fn normalize_body_snippet_collapses_whitespace_runs_to_single_spaces() {
        let raw = "  line one  \n\nline two\t\twith\ttabs   ";
        let snippet = normalize_body_snippet(raw, 200);
        assert_eq!(snippet, "line one line two with tabs");
        assert!(!snippet.contains('\n'));
        assert!(!snippet.contains('\t'));
    }

    #[test]
    fn normalize_body_snippet_caps_to_max_chars_without_splitting_utf8() {
        // Each 😀 is a single codepoint encoded as four UTF-8 bytes. The cap
        // must count codepoints (5 emoji) rather than bytes. A byte-based
        // cut at 5 would land inside the second emoji, and slicing a `&str`
        // at a non-char boundary panics.
        let raw = "😀😀😀😀😀ASCII tail";
        let snippet = normalize_body_snippet(raw, 5);
        assert_eq!(snippet.chars().count(), 5);
        assert_eq!(snippet, "😀😀😀😀😀");
    }

    #[test]
    fn outbox_failure_http_with_body_matches_spec_format() {
        // HTTP failures persist `{status} {reason}: {body_snippet}`.
        let failure = OutboxFailure::Http(HttpFailure {
            status: 401,
            reason_phrase: "Unauthorized".to_owned(),
            body_snippet: r#"{"error":"session_revoked"}"#.to_owned(),
        });
        assert_eq!(
            failure.format_for_outbox_last_error(),
            r#"401 Unauthorized: {"error":"session_revoked"}"#
        );
    }

    #[test]
    fn outbox_failure_http_with_empty_body_omits_trailing_colon() {
        let failure = OutboxFailure::Http(HttpFailure {
            status: 500,
            reason_phrase: "Internal Server Error".to_owned(),
            body_snippet: String::new(),
        });
        assert_eq!(
            failure.format_for_outbox_last_error(),
            "500 Internal Server Error",
        );
    }

    #[test]
    fn outbox_failure_transport_uses_distinct_sentinel_not_non_2xx_literal() {
        let failure = OutboxFailure::Transport("dns lookup failed: timed out".to_owned());
        let formatted = failure.format_for_outbox_last_error();
        assert!(formatted.starts_with("transport: "));
        assert!(formatted.contains("dns lookup failed"));
        // Keep transport failures out of the generic non-2xx bucket.
        assert!(
            !formatted.contains("non-2xx"),
            "transport failures must not collapse to the legacy 'non-2xx' bucket"
        );
    }

    #[tokio::test]
    async fn mark_failed_persists_dispatchoutcome_last_error_verbatim() {
        // Rich dispatch errors must reach `cloud_outbox.last_error`
        // unchanged except for the 2 KB safety trim.
        let pool = fresh_pool().await;
        let q = OutboxQueue::new(pool.clone());
        let id = q.enqueue("trajectory", "{}").await.unwrap();
        let _claimed = q.claim_next().await.unwrap().expect("row claimed");

        let formatted = OutboxFailure::Http(HttpFailure {
            status: 401,
            reason_phrase: "Unauthorized".to_owned(),
            body_snippet: r#"{"error":"session_revoked"}"#.to_owned(),
        })
        .format_for_outbox_last_error();
        q.mark_failed(id, &formatted).await.unwrap();

        let stored: Option<String> =
            sqlx::query_scalar!("SELECT last_error FROM cloud_outbox WHERE id = ?1", id)
                .fetch_one(&pool)
                .await
                .unwrap();
        let stored = stored.expect("mark_failed must populate last_error");
        assert!(stored.starts_with("401 Unauthorized:"));
        assert!(stored.contains("session_revoked"));
        // Do not collapse status + body into the generic placeholder.
        assert_ne!(stored, "upload returned non-2xx");
    }

    #[test]
    fn dispatch_outcome_from_outbox_failure_propagates_spec_format() {
        // The dispatch failure builder must preserve the persisted
        // error format.
        let outcome = DispatchOutcome::from_outbox_failure(&OutboxFailure::Http(HttpFailure {
            status: 401,
            reason_phrase: "Unauthorized".to_owned(),
            body_snippet: r#"{"error":"session_revoked"}"#.to_owned(),
        }));
        assert!(!outcome.ok);
        let last = outcome
            .last_error
            .expect("failures must always carry a last_error");
        assert!(last.starts_with("401 Unauthorized:"));
        assert!(last.contains("session_revoked"));
    }
}