1use super::outbox_core::{self, RetryDecision, now_unix_ms};
32use serde::{Deserialize, Serialize};
33use sqlx::SqlitePool;
34use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
35use std::sync::{Arc, OnceLock};
36use tokio::sync::Mutex;
37
38pub const DEFAULT_STALE_SECONDS: u64 = 60;
42
43pub const CIRCUIT_FAILURE_THRESHOLD: u32 = 3;
45
46pub const CIRCUIT_OPEN_DURATION_MS: i64 = 60_000;
49
50pub const MAX_RETRY_COUNT: i64 = outbox_core::MAX_RETRY_COUNT;
53
54static DRAIN_SERIALIZATION_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
55
56fn drain_serialization_lock() -> &'static Mutex<()> {
57 DRAIN_SERIALIZATION_LOCK.get_or_init(|| Mutex::new(()))
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct OutboxItem {
63 pub id: i64,
64 pub kind: String,
65 pub payload_json: String,
66 pub retry_count: i64,
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub enum CircuitState {
73 Closed,
74 Open { until_unix_ms: i64 },
75}
76
77#[derive(Debug, Clone)]
81pub struct OutboxQueue {
82 pool: SqlitePool,
83 consecutive_failures: Arc<AtomicU32>,
86 circuit_open_until_ms: Arc<AtomicI64>,
88}
89
90impl OutboxQueue {
91 pub fn new(pool: SqlitePool) -> Self {
94 Self {
95 pool,
96 consecutive_failures: Arc::new(AtomicU32::new(0)),
97 circuit_open_until_ms: Arc::new(AtomicI64::new(0)),
98 }
99 }
100
101 pub async fn enqueue(&self, kind: &str, payload_json: &str) -> Result<i64, sqlx::Error> {
105 if !crate::cloud::capture::capture_enabled() {
106 return Ok(0);
107 }
108 let now = now_unix_ms();
109 let result = sqlx::query!(
110 "INSERT INTO cloud_outbox (kind, payload_json, status, retry_count, created_at) \
111 VALUES (?1, ?2, 'pending', 0, ?3)",
112 kind,
113 payload_json,
114 now
115 )
116 .execute(&self.pool)
117 .await?;
118 Ok(result.last_insert_rowid())
119 }
120
121 pub fn circuit_state(&self) -> CircuitState {
125 let until = self.circuit_open_until_ms.load(Ordering::SeqCst);
126 if until == 0 {
127 return CircuitState::Closed;
128 }
129 if now_unix_ms() >= until {
130 self.circuit_open_until_ms.store(0, Ordering::SeqCst);
135 CircuitState::Closed
136 } else {
137 CircuitState::Open {
138 until_unix_ms: until,
139 }
140 }
141 }
142
143 pub async fn claim_next(&self) -> Result<Option<OutboxItem>, sqlx::Error> {
152 if matches!(self.circuit_state(), CircuitState::Open { .. }) {
153 return Ok(None);
154 }
155
156 let now = now_unix_ms();
157 let stale_cutoff = now - (DEFAULT_STALE_SECONDS as i64) * 1000;
165 let row = sqlx::query!(
166 r#"UPDATE cloud_outbox
167 SET status = 'processing', claimed_at = ?1
168 WHERE id = (
169 SELECT id FROM cloud_outbox
170 WHERE status = 'pending'
171 OR (status = 'processing'
172 AND claimed_at IS NOT NULL
173 AND claimed_at < ?2)
174 ORDER BY created_at ASC, id ASC
175 LIMIT 1
176 )
177 RETURNING id as "id!: i64", kind, payload_json, retry_count"#,
178 now,
179 stale_cutoff
180 )
181 .fetch_optional(&self.pool)
182 .await?;
183
184 Ok(row.map(|r| OutboxItem {
185 id: r.id,
186 kind: r.kind,
187 payload_json: r.payload_json,
188 retry_count: r.retry_count,
189 }))
190 }
191
192 pub async fn claim_next_kind(&self, kind: &str) -> Result<Option<OutboxItem>, sqlx::Error> {
193 if matches!(self.circuit_state(), CircuitState::Open { .. }) {
194 return Ok(None);
195 }
196
197 let now = now_unix_ms();
198 let stale_cutoff = now - (DEFAULT_STALE_SECONDS as i64) * 1000;
199 let row = sqlx::query!(
200 r#"UPDATE cloud_outbox
201 SET status = 'processing', claimed_at = ?1
202 WHERE id = (
203 SELECT id FROM cloud_outbox
204 WHERE kind = ?3
205 AND (status = 'pending'
206 OR (status = 'processing'
207 AND claimed_at IS NOT NULL
208 AND claimed_at < ?2))
209 ORDER BY created_at ASC, id ASC
210 LIMIT 1
211 )
212 RETURNING id AS "id!: i64", kind AS "kind!: String", payload_json AS "payload_json!: String", retry_count AS "retry_count!: i64""#,
213 now,
214 stale_cutoff,
215 kind,
216 )
217 .fetch_optional(&self.pool)
218 .await?;
219
220 Ok(row.map(|r| OutboxItem {
221 id: r.id,
222 kind: r.kind,
223 payload_json: r.payload_json,
224 retry_count: r.retry_count,
225 }))
226 }
227
228 pub async fn confirm(&self, id: i64) -> Result<(), sqlx::Error> {
231 sqlx::query!("DELETE FROM cloud_outbox WHERE id = ?1", id)
232 .execute(&self.pool)
233 .await?;
234 self.consecutive_failures.store(0, Ordering::SeqCst);
235 Ok(())
239 }
240
241 pub async fn mark_failed(&self, id: i64, err: &str) -> Result<(), sqlx::Error> {
249 let err_trimmed: String = outbox_core::truncate(err, 2048);
251
252 let current = sqlx::query!(
254 "SELECT retry_count, status FROM cloud_outbox WHERE id = ?1",
255 id
256 )
257 .fetch_optional(&self.pool)
258 .await?;
259
260 let Some(row) = current else {
261 return Ok(());
265 };
266
267 let (new_status, new_count) = match outbox_core::decide_retry(row.retry_count) {
270 RetryDecision::Retry { next_count } => ("pending", next_count),
271 RetryDecision::Abandon { next_count } => ("abandoned", next_count),
272 };
273
274 sqlx::query!(
275 "UPDATE cloud_outbox \
276 SET status = ?1, retry_count = ?2, last_error = ?3, claimed_at = NULL \
277 WHERE id = ?4",
278 new_status,
279 new_count,
280 err_trimmed,
281 id
282 )
283 .execute(&self.pool)
284 .await?;
285
286 let prev = self.consecutive_failures.fetch_add(1, Ordering::SeqCst);
288 if prev + 1 >= CIRCUIT_FAILURE_THRESHOLD {
289 let until = now_unix_ms() + CIRCUIT_OPEN_DURATION_MS;
290 self.circuit_open_until_ms.store(until, Ordering::SeqCst);
291 }
292
293 Ok(())
294 }
295
296 pub async fn reset_stale(&self, threshold_secs: u64) -> Result<u64, sqlx::Error> {
300 let cutoff = now_unix_ms() - (threshold_secs as i64) * 1000;
301 let result = sqlx::query!(
302 "UPDATE cloud_outbox \
303 SET status = 'pending', claimed_at = NULL \
304 WHERE status = 'processing' AND claimed_at IS NOT NULL AND claimed_at < ?1",
305 cutoff
306 )
307 .execute(&self.pool)
308 .await?;
309 Ok(result.rows_affected())
310 }
311
312 pub async fn pending_counts_by_kind(&self) -> Result<Vec<(String, i64)>, sqlx::Error> {
316 let rows = sqlx::query(
317 "SELECT kind, COUNT(*) AS c \
318 FROM cloud_outbox WHERE status = 'pending' GROUP BY kind",
319 )
320 .fetch_all(&self.pool)
321 .await?;
322 let mut out: Vec<(String, i64)> = rows
323 .into_iter()
324 .map(|r| {
325 let kind: String = sqlx::Row::try_get(&r, "kind").unwrap_or_default();
326 let count: i64 = sqlx::Row::try_get(&r, "c").unwrap_or_default();
327 (kind, count)
328 })
329 .collect();
330 out.sort_by(|a, b| a.0.cmp(&b.0));
331 Ok(out)
332 }
333
334 pub async fn drain_abandoned_older_than(
352 &self,
353 cutoff_unix_ms: i64,
354 dry_run: bool,
355 ) -> Result<DrainSummary, sqlx::Error> {
356 let mut tx = self.pool.begin().await?;
357 let rows = sqlx::query(
358 "SELECT kind, COUNT(*) AS c \
359 FROM cloud_outbox \
360 WHERE status = 'abandoned' \
361 AND COALESCE(claimed_at, created_at) < ?1 \
362 GROUP BY kind",
363 )
364 .bind(cutoff_unix_ms)
365 .fetch_all(&mut *tx)
366 .await?;
367
368 let mut summary = DrainSummary::default();
369 for row in rows {
370 let kind: String = sqlx::Row::try_get(&row, "kind").unwrap_or_default();
371 let count: i64 = sqlx::Row::try_get(&row, "c").unwrap_or_default();
372 summary.per_kind.push((kind, count));
373 summary.total += count;
374 }
375 summary.per_kind.sort_by(|a, b| a.0.cmp(&b.0));
376
377 if dry_run || summary.total == 0 {
378 tx.rollback().await?;
381 return Ok(summary);
382 }
383
384 let affected = sqlx::query(
385 "UPDATE cloud_outbox \
386 SET status = 'pending', \
387 retry_count = 0, \
388 last_error = NULL, \
389 claimed_at = NULL \
390 WHERE status = 'abandoned' \
391 AND COALESCE(claimed_at, created_at) < ?1",
392 )
393 .bind(cutoff_unix_ms)
394 .execute(&mut *tx)
395 .await?;
396 tx.commit().await?;
397
398 self.consecutive_failures.store(0, Ordering::SeqCst);
403 self.circuit_open_until_ms.store(0, Ordering::SeqCst);
404
405 let affected = i64::try_from(affected.rows_affected()).unwrap_or(summary.total);
411 summary.total = affected;
412 Ok(summary)
413 }
414
415 pub async fn counts(&self) -> Result<OutboxCounts, sqlx::Error> {
419 let rows = sqlx::query!(
420 r#"SELECT status, COUNT(*) as "c!: i64" FROM cloud_outbox GROUP BY status"#
421 )
422 .fetch_all(&self.pool)
423 .await?;
424 let mut out = OutboxCounts::default();
425 for r in rows {
426 let status: String = r.status;
427 let count: i64 = r.c;
428 match status.as_str() {
429 "pending" => out.pending = count,
430 "processing" => out.processing = count,
431 "failed" => out.failed = count,
432 "abandoned" => out.abandoned = count,
433 _ => {}
434 }
435 }
436 Ok(out)
437 }
438}
439
440#[derive(Debug, Clone, Copy, Default)]
442pub struct OutboxCounts {
443 pub pending: i64,
444 pub processing: i64,
445 pub failed: i64,
446 pub abandoned: i64,
447}
448
449#[derive(Debug, Default, Clone, PartialEq, Eq)]
456pub struct DrainSummary {
457 pub total: i64,
458 pub per_kind: Vec<(String, i64)>,
459}
460
461#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
462pub struct AcceptedEditAttributionSummary {
463 pub uploaded: usize,
464 pub launch_grade: usize,
465 pub missing_team_workspace: usize,
466 pub missing_rule_ids: usize,
467 pub unlinked_rule_observations: usize,
468}
469
470impl AcceptedEditAttributionSummary {
471 pub const fn warning_count(self) -> usize {
472 self.missing_team_workspace + self.missing_rule_ids + self.unlinked_rule_observations
473 }
474
475 pub const fn add(&mut self, other: Self) {
476 self.uploaded += other.uploaded;
477 self.launch_grade += other.launch_grade;
478 self.missing_team_workspace += other.missing_team_workspace;
479 self.missing_rule_ids += other.missing_rule_ids;
480 self.unlinked_rule_observations += other.unlinked_rule_observations;
481 }
482}
483
484#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
485pub struct OutboxDrainReport {
486 pub attempted: usize,
487 pub confirmed: usize,
488 pub accepted_edit_attribution: AcceptedEditAttributionSummary,
489}
490
491struct DispatchOutcome {
494 ok: bool,
495 accepted_edit_attribution: Option<AcceptedEditAttributionSummary>,
496 last_error: Option<String>,
505}
506
507impl DispatchOutcome {
508 const fn ok(ok: bool) -> Self {
509 Self {
510 ok,
511 accepted_edit_attribution: None,
512 last_error: None,
513 }
514 }
515
516 const fn failed_with(last_error: String) -> Self {
521 Self {
522 ok: false,
523 accepted_edit_attribution: None,
524 last_error: Some(last_error),
525 }
526 }
527
528 fn from_outbox_failure(failure: &super::client::OutboxFailure) -> Self {
531 Self::failed_with(failure.format_for_outbox_last_error())
532 }
533}
534
535fn accepted_edit_attribution_summary(
536 expected_rule_ids: usize,
537 response: &super::api_types::RecordAcceptedEditResponse,
538) -> AcceptedEditAttributionSummary {
539 let mut summary = AcceptedEditAttributionSummary {
540 uploaded: usize::from(response.acceptance_recorded),
541 launch_grade: 0,
542 missing_team_workspace: 0,
543 missing_rule_ids: 0,
544 unlinked_rule_observations: 0,
545 };
546 if response.acceptance_recorded {
547 if expected_rule_ids == 0 {
548 summary.missing_rule_ids = 1;
549 }
550 if response.team_id.is_none() {
551 summary.missing_team_workspace = 1;
552 }
553 if expected_rule_ids > 0 && response.observations_inserted == 0 {
554 summary.unlinked_rule_observations = 1;
555 }
556 if summary.warning_count() == 0 {
557 summary.launch_grade = 1;
558 }
559 }
560 summary
561}
562
563pub mod kind {
568 pub const TRAJECTORY: &str = "trajectory";
569 pub const REVIEW_METRICS: &str = "review_metrics";
570 pub const ACCEPTED_EDIT: &str = "accepted_edit";
571 pub const LEGACY_FIX_ACCEPTANCE: &str = "fix_acceptance";
575 pub const MCP_QUERY: &str = "mcp_query";
576 pub const IMPORTED_REVIEWS: &str = "imported_reviews";
577 pub const OBSERVATION: &str = "observation";
580 pub const SESSION_MINED_CANDIDATE: &str = "session_mined_candidate";
588}
589
590pub async fn drain_outbox(
601 queue: &OutboxQueue,
602 client: &super::client::CloudClient,
603 max_items: usize,
604) -> Result<(usize, usize), sqlx::Error> {
605 let report = drain_outbox_report(queue, client, max_items).await?;
606 Ok((report.attempted, report.confirmed))
607}
608
609pub async fn drain_outbox_report(
610 queue: &OutboxQueue,
611 client: &super::client::CloudClient,
612 max_items: usize,
613) -> Result<OutboxDrainReport, sqlx::Error> {
614 if !client.is_logged_in() {
615 return Ok(OutboxDrainReport::default());
618 }
619 let _drain_guard = drain_serialization_lock().lock().await;
620
621 let mut attempted = 0usize;
622 let mut confirmed = 0usize;
623 let mut accepted_edit_attribution = AcceptedEditAttributionSummary::default();
624 for _ in 0..max_items {
625 if matches!(queue.circuit_state(), CircuitState::Open { .. }) {
626 break;
627 }
628 let Some(item) = queue.claim_next().await? else {
629 break;
630 };
631 attempted += 1;
632 let outcome = match dispatch(client, &item).await {
633 Ok(outcome) => outcome,
634 Err(err) => {
635 let _ = queue.mark_failed(item.id, &err).await;
636 continue;
637 }
638 };
639 if outcome.ok {
640 queue.confirm(item.id).await?;
641 confirmed += 1;
642 if let Some(summary) = outcome.accepted_edit_attribution {
643 accepted_edit_attribution.add(summary);
644 }
645 } else {
646 let err_msg = outcome
648 .last_error
649 .as_deref()
650 .unwrap_or("upload returned non-2xx (no detail)");
651 let _ = queue.mark_failed(item.id, err_msg).await;
652 }
653 }
654 Ok(OutboxDrainReport {
655 attempted,
656 confirmed,
657 accepted_edit_attribution,
658 })
659}
660
661pub async fn drain_outbox_kind(
662 queue: &OutboxQueue,
663 client: &super::client::CloudClient,
664 kind: &str,
665 max_items: usize,
666) -> Result<(usize, usize), sqlx::Error> {
667 let report = drain_outbox_kind_report(queue, client, kind, max_items).await?;
668 Ok((report.attempted, report.confirmed))
669}
670
671pub async fn drain_outbox_kind_report(
672 queue: &OutboxQueue,
673 client: &super::client::CloudClient,
674 kind: &str,
675 max_items: usize,
676) -> Result<OutboxDrainReport, sqlx::Error> {
677 if !client.is_logged_in() {
678 return Ok(OutboxDrainReport::default());
679 }
680 let _drain_guard = drain_serialization_lock().lock().await;
681
682 let mut attempted = 0usize;
683 let mut confirmed = 0usize;
684 let mut accepted_edit_attribution = AcceptedEditAttributionSummary::default();
685 for _ in 0..max_items {
686 if matches!(queue.circuit_state(), CircuitState::Open { .. }) {
687 break;
688 }
689 let Some(item) = queue.claim_next_kind(kind).await? else {
690 break;
691 };
692 attempted += 1;
693 let outcome = match dispatch(client, &item).await {
694 Ok(outcome) => outcome,
695 Err(err) => {
696 let _ = queue.mark_failed(item.id, &err).await;
697 continue;
698 }
699 };
700 if outcome.ok {
701 queue.confirm(item.id).await?;
702 confirmed += 1;
703 if let Some(summary) = outcome.accepted_edit_attribution {
704 accepted_edit_attribution.add(summary);
705 }
706 } else {
707 let err_msg = outcome
709 .last_error
710 .as_deref()
711 .unwrap_or("upload returned non-2xx (no detail)");
712 let _ = queue.mark_failed(item.id, err_msg).await;
713 }
714 }
715 Ok(OutboxDrainReport {
716 attempted,
717 confirmed,
718 accepted_edit_attribution,
719 })
720}
721
722async fn dispatch(
736 client: &super::client::CloudClient,
737 item: &OutboxItem,
738) -> Result<DispatchOutcome, String> {
739 use super::api_types::{
740 RecordAcceptedEditRequest, RecordReviewMetricsRequest, UploadImportedReviewsRequest,
741 };
742 use serde_json::Value;
743
744 match item.kind.as_str() {
745 kind::TRAJECTORY => {
746 let v: Value = serde_json::from_str(&item.payload_json)
747 .map_err(|e| format!("trajectory parse: {e}"))?;
748 let pr_review_id = v
749 .get("pr_review_id")
750 .and_then(|x| x.as_str())
751 .ok_or_else(|| "trajectory missing pr_review_id".to_owned())?;
752 let steps = v.get("steps").cloned().unwrap_or(Value::Array(Vec::new()));
753 Ok(
755 match client.save_trajectory_outcome(pr_review_id, steps).await {
756 Ok(()) => DispatchOutcome::ok(true),
757 Err(failure) => DispatchOutcome::from_outbox_failure(&failure),
758 },
759 )
760 }
761 kind::REVIEW_METRICS => {
762 let v: Value = serde_json::from_str(&item.payload_json)
763 .map_err(|e| format!("review_metrics parse: {e}"))?;
764 let review_id = v
765 .get("review_id")
766 .and_then(|x| x.as_str())
767 .ok_or_else(|| "review_metrics missing review_id".to_owned())?
768 .to_owned();
769 let req_val = v
770 .get("req")
771 .cloned()
772 .unwrap_or(Value::Object(serde_json::Map::default()));
773 let req: RecordReviewMetricsRequest = serde_json::from_value(req_val)
774 .map_err(|e| format!("review_metrics decode req: {e}"))?;
775 Ok(
776 match client.record_review_metrics_outcome(&review_id, req).await {
777 Ok(()) => DispatchOutcome::ok(true),
778 Err(failure) => DispatchOutcome::from_outbox_failure(&failure),
779 },
780 )
781 }
782 kind::ACCEPTED_EDIT => {
783 let req: RecordAcceptedEditRequest = serde_json::from_str(&item.payload_json)
784 .map_err(|e| format!("accepted_edit parse: {e}"))?;
785 let expected_rule_ids = req
786 .rule_ids
787 .iter()
788 .filter(|rule_id| !rule_id.trim().is_empty())
789 .count();
790 let response = client.record_accepted_edit_response(req).await?;
791 let summary = accepted_edit_attribution_summary(expected_rule_ids, &response);
792 let last_error = if response.acceptance_recorded {
798 None
799 } else {
800 Some(format!(
801 "accepted_edit rejected: {}",
802 response.error.as_deref().unwrap_or("no detail")
803 ))
804 };
805 Ok(DispatchOutcome {
806 ok: response.acceptance_recorded,
807 accepted_edit_attribution: Some(summary),
808 last_error,
809 })
810 }
811 kind::LEGACY_FIX_ACCEPTANCE => {
812 Ok(DispatchOutcome::ok(true))
817 }
818 kind::MCP_QUERY => {
819 let v: Value = serde_json::from_str(&item.payload_json)
820 .map_err(|e| format!("mcp_query parse: {e}"))?;
821 let file = v
822 .get("file")
823 .and_then(|x| x.as_str())
824 .unwrap_or("")
825 .to_owned();
826 let intent = v
827 .get("intent")
828 .and_then(|x| x.as_str())
829 .map(ToOwned::to_owned);
830 let rules_injected = v
831 .get("rules_injected")
832 .and_then(Value::as_u64)
833 .and_then(|n| usize::try_from(n).ok())
834 .unwrap_or(0);
835 let strict_match_count = v
836 .get("strict_match_count")
837 .and_then(Value::as_u64)
838 .and_then(|n| usize::try_from(n).ok())
839 .unwrap_or(0);
840 let rule_titles: Vec<String> = v
841 .get("rule_titles")
842 .and_then(|x| x.as_array())
843 .map(|arr| {
844 arr.iter()
845 .filter_map(|t| t.as_str().map(String::from))
846 .collect()
847 })
848 .unwrap_or_default();
849 let rule_ids: Vec<String> = v
850 .get("rule_ids")
851 .and_then(|x| x.as_array())
852 .map(|arr| {
853 arr.iter()
854 .filter_map(|t| t.as_str().map(String::from))
855 .collect()
856 })
857 .unwrap_or_default();
858 let client_label = v
859 .get("client_label")
860 .and_then(|x| x.as_str())
861 .map(ToOwned::to_owned);
862 let repo_full_name = v
863 .get("repo_full_name")
864 .and_then(|x| x.as_str())
865 .map(ToOwned::to_owned);
866 Ok(
867 match client
868 .track_mcp_query_outcome(
869 &file,
870 intent.as_deref(),
871 rules_injected,
872 strict_match_count,
873 rule_titles,
874 rule_ids,
875 client_label.as_deref(),
876 repo_full_name.as_deref(),
877 )
878 .await
879 {
880 Ok(()) => DispatchOutcome::ok(true),
881 Err(failure) => DispatchOutcome::from_outbox_failure(&failure),
882 },
883 )
884 }
885 kind::IMPORTED_REVIEWS => {
886 let req: UploadImportedReviewsRequest = serde_json::from_str(&item.payload_json)
887 .map_err(|e| format!("imported_reviews parse: {e}"))?;
888 Ok(match client.upload_imported_reviews_outcome(&req).await {
889 Ok(()) => DispatchOutcome::ok(true),
890 Err(failure) => DispatchOutcome::from_outbox_failure(&failure),
891 })
892 }
893 kind::OBSERVATION => {
894 let obs: super::api_types::Observation = serde_json::from_str(&item.payload_json)
895 .map_err(|e| format!("observation parse: {e}"))?;
896 Ok(
897 match client
898 .post_observations_outcome(std::slice::from_ref(&obs))
899 .await
900 {
901 Ok(()) => DispatchOutcome::ok(true),
902 Err(failure) => DispatchOutcome::from_outbox_failure(&failure),
903 },
904 )
905 }
906 other => Err(format!("unknown outbox kind '{other}'")),
907 }
908}
909
910#[cfg(test)]
911mod tests {
912 use super::*;
913 use crate::cloud::api_types::RecordAcceptedEditResponse;
914 use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
915
916 async fn fresh_pool() -> SqlitePool {
917 let opts = SqliteConnectOptions::new()
918 .filename(":memory:")
919 .create_if_missing(true);
920 let pool = SqlitePoolOptions::new()
921 .max_connections(1)
922 .connect_with(opts)
923 .await
924 .expect("pool");
925 sqlx::migrate!("./migrations")
926 .run(&pool)
927 .await
928 .expect("apply migrations");
929 pool
930 }
931
932 async fn status_of(pool: &SqlitePool, id: i64) -> Option<String> {
933 sqlx::query_scalar!("SELECT status FROM cloud_outbox WHERE id = ?1", id)
934 .fetch_optional(pool)
935 .await
936 .unwrap()
937 }
938
939 fn accepted_edit_response(
940 acceptance_recorded: bool,
941 team_id: Option<&str>,
942 observations_inserted: u32,
943 ) -> RecordAcceptedEditResponse {
944 RecordAcceptedEditResponse {
945 ok: acceptance_recorded,
946 acceptance_recorded,
947 acceptance_id: acceptance_recorded.then(|| "acceptance-1".to_owned()),
948 diff_signature: Some("diff-1".to_owned()),
949 team_id: team_id.map(str::to_owned),
950 attributed_rule_ids: Vec::new(),
951 observations_inserted,
952 memory_reinforcement_recorded: false,
953 memory_reinforcement_deduped: false,
954 error: None,
955 }
956 }
957
958 #[test]
959 fn accepted_edit_attribution_summary_counts_launch_grade_uploads() {
960 let response = accepted_edit_response(true, Some("team-1"), 2);
961 let summary = accepted_edit_attribution_summary(2, &response);
962
963 assert_eq!(summary.uploaded, 1);
964 assert_eq!(summary.launch_grade, 1);
965 assert_eq!(summary.warning_count(), 0);
966 }
967
968 #[test]
969 fn accepted_edit_attribution_summary_flags_raw_only_uploads() {
970 let missing_team =
971 accepted_edit_attribution_summary(2, &accepted_edit_response(true, None, 2));
972 assert_eq!(missing_team.uploaded, 1);
973 assert_eq!(missing_team.launch_grade, 0);
974 assert_eq!(missing_team.missing_team_workspace, 1);
975
976 let missing_rule_ids =
977 accepted_edit_attribution_summary(0, &accepted_edit_response(true, Some("team-1"), 0));
978 assert_eq!(missing_rule_ids.missing_rule_ids, 1);
979 assert_eq!(missing_rule_ids.launch_grade, 0);
980
981 let unlinked_observation =
982 accepted_edit_attribution_summary(2, &accepted_edit_response(true, Some("team-1"), 0));
983 assert_eq!(unlinked_observation.unlinked_rule_observations, 1);
984 assert_eq!(unlinked_observation.launch_grade, 0);
985 }
986
987 #[test]
988 fn accepted_edit_attribution_summary_ignores_failed_uploads() {
989 let response = accepted_edit_response(false, None, 0);
990 let summary = accepted_edit_attribution_summary(0, &response);
991
992 assert_eq!(summary.uploaded, 0);
993 assert_eq!(summary.launch_grade, 0);
994 assert_eq!(summary.warning_count(), 0);
995 }
996
997 #[tokio::test]
998 async fn legacy_fix_acceptance_dispatch_skips_current_accepted_edit_pipeline() {
999 let client = crate::cloud::client::CloudClient::new();
1000 let item = OutboxItem {
1001 id: 1,
1002 kind: kind::LEGACY_FIX_ACCEPTANCE.to_owned(),
1003 payload_json: "not accepted-edit json".to_owned(),
1004 retry_count: 0,
1005 };
1006
1007 let outcome = dispatch(&client, &item)
1008 .await
1009 .expect("legacy rows are explicitly acknowledged and skipped");
1010
1011 assert!(outcome.ok);
1012 assert_eq!(outcome.accepted_edit_attribution, None);
1013 }
1014
1015 #[tokio::test]
1016 async fn enqueue_then_claim_moves_to_processing() {
1017 let pool = fresh_pool().await;
1018 let q = OutboxQueue::new(pool.clone());
1019 let id = q.enqueue("trajectory", "{}").await.unwrap();
1020 assert_eq!(status_of(&pool, id).await.as_deref(), Some("pending"));
1021
1022 let item = q.claim_next().await.unwrap().expect("row claimed");
1023 assert_eq!(item.id, id);
1024 assert_eq!(status_of(&pool, id).await.as_deref(), Some("processing"));
1025 }
1026
1027 #[tokio::test]
1028 async fn claim_next_kind_prioritizes_matching_kind() {
1029 let pool = fresh_pool().await;
1030 let q = OutboxQueue::new(pool.clone());
1031 let old_fix = q.enqueue(kind::ACCEPTED_EDIT, "{}").await.unwrap();
1032 let mcp = q.enqueue(kind::MCP_QUERY, "{}").await.unwrap();
1033
1034 let item = q
1035 .claim_next_kind(kind::MCP_QUERY)
1036 .await
1037 .unwrap()
1038 .expect("mcp row claimed");
1039
1040 assert_eq!(item.id, mcp);
1041 assert_eq!(status_of(&pool, mcp).await.as_deref(), Some("processing"));
1042 assert_eq!(status_of(&pool, old_fix).await.as_deref(), Some("pending"));
1043 }
1044
1045 #[tokio::test]
1046 async fn drain_serialization_lock_is_process_wide() {
1047 let guard = drain_serialization_lock()
1048 .try_lock()
1049 .expect("first drain lock");
1050 assert!(
1051 drain_serialization_lock().try_lock().is_err(),
1052 "concurrent drainers must share the same in-process lock"
1053 );
1054 drop(guard);
1055 assert!(drain_serialization_lock().try_lock().is_ok());
1056 }
1057
1058 #[tokio::test]
1059 async fn confirm_deletes_row() {
1060 let pool = fresh_pool().await;
1061 let q = OutboxQueue::new(pool.clone());
1062 let id = q.enqueue("trajectory", "{}").await.unwrap();
1063 let item = q.claim_next().await.unwrap().unwrap();
1064 q.confirm(item.id).await.unwrap();
1065 assert!(status_of(&pool, id).await.is_none());
1066 }
1067
1068 #[tokio::test]
1069 async fn mark_failed_eight_times_abandons() {
1070 let pool = fresh_pool().await;
1076 let q = OutboxQueue::new(pool.clone());
1077 let id = q.enqueue("trajectory", "{}").await.unwrap();
1078
1079 for attempt in 1..=7 {
1086 q.circuit_open_until_ms.store(0, Ordering::SeqCst);
1087 q.consecutive_failures.store(0, Ordering::SeqCst);
1088 let item = q.claim_next().await.unwrap().unwrap();
1089 q.mark_failed(item.id, &format!("net {attempt}"))
1090 .await
1091 .unwrap();
1092 assert_eq!(
1093 status_of(&pool, id).await.as_deref(),
1094 Some("pending"),
1095 "attempt {attempt}: retry_count {attempt} (< 8) must stay pending"
1096 );
1097 }
1098
1099 q.circuit_open_until_ms.store(0, Ordering::SeqCst);
1102 q.consecutive_failures.store(0, Ordering::SeqCst);
1103 let item = q.claim_next().await.unwrap().unwrap();
1104 q.mark_failed(item.id, "net 8").await.unwrap();
1105 assert_eq!(status_of(&pool, id).await.as_deref(), Some("abandoned"));
1106
1107 q.circuit_open_until_ms.store(0, Ordering::SeqCst);
1110 q.consecutive_failures.store(0, Ordering::SeqCst);
1111 assert!(q.claim_next().await.unwrap().is_none());
1112 }
1113
1114 #[tokio::test]
1115 async fn claim_next_auto_recovers_stale_processing_rows() {
1116 let pool = fresh_pool().await;
1122 let q = OutboxQueue::new(pool.clone());
1123 let id = q.enqueue("trajectory", "{\"crashed\":true}").await.unwrap();
1124
1125 let first = q.claim_next().await.unwrap().expect("first claim");
1127 assert_eq!(first.id, id);
1128 assert_eq!(status_of(&pool, id).await.as_deref(), Some("processing"));
1129
1130 let stale = now_unix_ms() - (DEFAULT_STALE_SECONDS as i64 + 30) * 1000;
1133 sqlx::query!(
1134 "UPDATE cloud_outbox SET claimed_at = ?1 WHERE id = ?2",
1135 stale,
1136 id
1137 )
1138 .execute(&pool)
1139 .await
1140 .unwrap();
1141
1142 let recovered = q.claim_next().await.unwrap().expect("recovered claim");
1145 assert_eq!(recovered.id, id, "stale row must be re-claimable");
1146 assert_eq!(status_of(&pool, id).await.as_deref(), Some("processing"));
1147 }
1148
1149 #[tokio::test]
1150 async fn claim_next_ignores_fresh_processing_rows() {
1151 let pool = fresh_pool().await;
1155 let q = OutboxQueue::new(pool.clone());
1156 let _fresh = q.enqueue("trajectory", "{}").await.unwrap();
1157 let item = q.claim_next().await.unwrap().expect("initial claim");
1158
1159 assert!(q.claim_next().await.unwrap().is_none());
1162 q.confirm(item.id).await.unwrap();
1164 }
1165
1166 #[tokio::test]
1167 async fn reset_stale_promotes_processing_rows() {
1168 let pool = fresh_pool().await;
1169 let q = OutboxQueue::new(pool.clone());
1170 let id = q.enqueue("trajectory", "{}").await.unwrap();
1171 let _ = q.claim_next().await.unwrap().unwrap();
1172 assert_eq!(status_of(&pool, id).await.as_deref(), Some("processing"));
1173
1174 let backdated = now_unix_ms() - 120_000;
1176 sqlx::query!(
1177 "UPDATE cloud_outbox SET claimed_at = ?1 WHERE id = ?2",
1178 backdated,
1179 id
1180 )
1181 .execute(&pool)
1182 .await
1183 .unwrap();
1184
1185 let promoted = q.reset_stale(60).await.unwrap();
1186 assert_eq!(promoted, 1);
1187 assert_eq!(status_of(&pool, id).await.as_deref(), Some("pending"));
1188 }
1189
1190 #[tokio::test]
1191 async fn circuit_breaker_halts_claims_after_three_failures() {
1192 let pool = fresh_pool().await;
1193 let q = OutboxQueue::new(pool.clone());
1194
1195 for i in 0..4 {
1197 q.enqueue("trajectory", &format!("{{\"i\":{i}}}"))
1198 .await
1199 .unwrap();
1200 }
1201
1202 for _ in 0..3 {
1203 let item = q.claim_next().await.unwrap().unwrap();
1204 q.mark_failed(item.id, "x").await.unwrap();
1205 }
1206
1207 assert!(matches!(q.circuit_state(), CircuitState::Open { .. }));
1210 assert!(q.claim_next().await.unwrap().is_none());
1211 }
1212
1213 #[tokio::test]
1214 async fn confirm_resets_consecutive_failure_counter() {
1215 let pool = fresh_pool().await;
1216 let q = OutboxQueue::new(pool.clone());
1217
1218 let _id1 = q.enqueue("trajectory", "{}").await.unwrap();
1221 let _id2 = q.enqueue("trajectory", "{}").await.unwrap();
1222
1223 let item = q.claim_next().await.unwrap().unwrap();
1224 q.mark_failed(item.id, "f1").await.unwrap();
1225 let item = q.claim_next().await.unwrap().unwrap();
1226 q.mark_failed(item.id, "f2").await.unwrap();
1227 assert_eq!(q.consecutive_failures.load(Ordering::SeqCst), 2);
1228
1229 let item = q.claim_next().await.unwrap().unwrap();
1233 q.confirm(item.id).await.unwrap();
1234 assert_eq!(q.consecutive_failures.load(Ordering::SeqCst), 0);
1235 }
1236
1237 #[tokio::test]
1238 async fn claim_next_returns_observation_kind() {
1239 let pool = fresh_pool().await;
1240 let q = OutboxQueue::new(pool.clone());
1241 let obs_id = q
1242 .enqueue(kind::OBSERVATION, r#"{"session_id":"s"}"#)
1243 .await
1244 .unwrap();
1245 let traj_id = q.enqueue(kind::TRAJECTORY, "{}").await.unwrap();
1246
1247 let first = q.claim_next().await.unwrap().expect("claimed first");
1248 let second = q.claim_next().await.unwrap().expect("claimed second");
1249 assert_eq!(first.id, obs_id);
1250 assert_eq!(first.kind, kind::OBSERVATION);
1251 assert_eq!(second.id, traj_id);
1252 }
1253
1254 #[tokio::test]
1255 async fn claim_next_returns_oldest_first() {
1256 let pool = fresh_pool().await;
1257 let q = OutboxQueue::new(pool.clone());
1258 let a = q.enqueue("trajectory", r#"{"n":"a"}"#).await.unwrap();
1259 tokio::time::sleep(std::time::Duration::from_millis(2)).await;
1261 let b = q.enqueue("trajectory", r#"{"n":"b"}"#).await.unwrap();
1262
1263 let first = q.claim_next().await.unwrap().unwrap();
1264 let second = q.claim_next().await.unwrap().unwrap();
1265 assert_eq!(first.id, a);
1266 assert_eq!(second.id, b);
1267 }
1268
1269 async fn insert_abandoned(
1273 pool: &SqlitePool,
1274 kind: &str,
1275 created_at_ms: i64,
1276 claimed_at_ms: Option<i64>,
1277 ) -> i64 {
1278 sqlx::query(
1279 "INSERT INTO cloud_outbox \
1280 (kind, payload_json, status, retry_count, created_at, claimed_at, last_error) \
1281 VALUES (?1, '{}', 'abandoned', ?2, ?3, ?4, 'upload returned non-2xx')",
1282 )
1283 .bind(kind)
1284 .bind(MAX_RETRY_COUNT)
1285 .bind(created_at_ms)
1286 .bind(claimed_at_ms)
1287 .execute(pool)
1288 .await
1289 .unwrap()
1290 .last_insert_rowid()
1291 }
1292
1293 #[tokio::test]
1294 async fn drain_abandoned_dry_run_reports_per_kind_without_mutating() {
1295 let pool = fresh_pool().await;
1296 let q = OutboxQueue::new(pool.clone());
1297 let now = now_unix_ms();
1298 let old = now - 31 * 86_400_000; let mcp_id = insert_abandoned(&pool, "mcp_query", old, Some(old)).await;
1300 let obs_id = insert_abandoned(&pool, "observation", old, Some(old)).await;
1301 let _other_mcp = insert_abandoned(&pool, "mcp_query", old, Some(old)).await;
1302
1303 let summary = q.drain_abandoned_older_than(now, true).await.unwrap();
1304
1305 assert_eq!(summary.total, 3);
1306 assert_eq!(
1308 summary.per_kind,
1309 vec![("mcp_query".to_owned(), 2), ("observation".to_owned(), 1),]
1310 );
1311 assert_eq!(status_of(&pool, mcp_id).await.as_deref(), Some("abandoned"));
1313 assert_eq!(status_of(&pool, obs_id).await.as_deref(), Some("abandoned"));
1314 }
1315
1316 #[tokio::test]
1317 async fn drain_abandoned_real_resets_eligible_rows_only() {
1318 let pool = fresh_pool().await;
1319 let q = OutboxQueue::new(pool.clone());
1320 let now = now_unix_ms();
1321 let old = now - 31 * 86_400_000;
1322 let fresh = now - 60_000; let cutoff = now - 7 * 86_400_000; let old_row = insert_abandoned(&pool, "mcp_query", old, Some(old)).await;
1326 let fresh_row = insert_abandoned(&pool, "mcp_query", fresh, Some(fresh)).await;
1327
1328 q.consecutive_failures
1331 .store(CIRCUIT_FAILURE_THRESHOLD, Ordering::SeqCst);
1332 q.circuit_open_until_ms
1333 .store(now + 60_000, Ordering::SeqCst);
1334
1335 let summary = q.drain_abandoned_older_than(cutoff, false).await.unwrap();
1336 assert_eq!(summary.total, 1);
1337 assert_eq!(status_of(&pool, old_row).await.as_deref(), Some("pending"));
1338 assert_eq!(
1339 status_of(&pool, fresh_row).await.as_deref(),
1340 Some("abandoned"),
1341 "rows newer than cutoff must NOT be touched",
1342 );
1343
1344 let retry_count: i64 =
1346 sqlx::query_scalar("SELECT retry_count FROM cloud_outbox WHERE id = ?1")
1347 .bind(old_row)
1348 .fetch_one(&pool)
1349 .await
1350 .unwrap();
1351 assert_eq!(retry_count, 0);
1352 let last_error: Option<String> =
1353 sqlx::query_scalar("SELECT last_error FROM cloud_outbox WHERE id = ?1")
1354 .bind(old_row)
1355 .fetch_one(&pool)
1356 .await
1357 .unwrap();
1358 assert!(last_error.is_none());
1359
1360 assert_eq!(q.consecutive_failures.load(Ordering::SeqCst), 0);
1363 assert_eq!(q.circuit_open_until_ms.load(Ordering::SeqCst), 0);
1364 }
1365
1366 #[tokio::test]
1367 async fn drain_abandoned_uses_created_at_when_no_claimed_at() {
1368 let pool = fresh_pool().await;
1372 let q = OutboxQueue::new(pool.clone());
1373 let now = now_unix_ms();
1374 let old = now - 90 * 86_400_000;
1375 let id = insert_abandoned(&pool, "observation", old, None).await;
1376 let cutoff = now - 30 * 86_400_000;
1377
1378 let summary = q.drain_abandoned_older_than(cutoff, false).await.unwrap();
1379 assert_eq!(summary.total, 1);
1380 assert_eq!(status_of(&pool, id).await.as_deref(), Some("pending"));
1381 }
1382
1383 #[tokio::test]
1384 async fn drain_abandoned_empty_queue_is_a_noop() {
1385 let pool = fresh_pool().await;
1386 let q = OutboxQueue::new(pool.clone());
1387 let summary = q
1388 .drain_abandoned_older_than(now_unix_ms(), false)
1389 .await
1390 .unwrap();
1391 assert_eq!(summary.total, 0);
1392 assert!(summary.per_kind.is_empty());
1393 }
1394
1395 #[tokio::test]
1396 async fn pending_counts_by_kind_buckets_pending_rows() {
1397 let pool = fresh_pool().await;
1398 let q = OutboxQueue::new(pool.clone());
1399 q.enqueue("mcp_query", "{}").await.unwrap();
1400 q.enqueue("mcp_query", "{}").await.unwrap();
1401 q.enqueue("observation", "{}").await.unwrap();
1402
1403 let counts = q.pending_counts_by_kind().await.unwrap();
1404 assert_eq!(
1405 counts,
1406 vec![("mcp_query".to_owned(), 2), ("observation".to_owned(), 1),]
1407 );
1408 }
1409
1410 use crate::cloud::client::{HttpFailure, OutboxFailure, normalize_body_snippet};
1413
1414 #[test]
1415 fn normalize_body_snippet_collapses_whitespace_runs_to_single_spaces() {
1416 let raw = " line one \n\nline two\t\twith\ttabs ";
1417 let snippet = normalize_body_snippet(raw, 200);
1418 assert_eq!(snippet, "line one line two with tabs");
1419 assert!(!snippet.contains('\n'));
1420 assert!(!snippet.contains('\t'));
1421 }
1422
1423 #[test]
1424 fn normalize_body_snippet_caps_to_max_chars_without_splitting_utf8() {
1425 let raw = "😀😀😀😀😀ASCII tail";
1429 let snippet = normalize_body_snippet(raw, 5);
1430 assert_eq!(snippet.chars().count(), 5);
1431 assert_eq!(snippet, "😀😀😀😀😀");
1432 }
1433
1434 #[test]
1435 fn outbox_failure_http_with_body_matches_spec_format() {
1436 let failure = OutboxFailure::Http(HttpFailure {
1438 status: 401,
1439 reason_phrase: "Unauthorized".to_owned(),
1440 body_snippet: r#"{"error":"session_revoked"}"#.to_owned(),
1441 });
1442 assert_eq!(
1443 failure.format_for_outbox_last_error(),
1444 r#"401 Unauthorized: {"error":"session_revoked"}"#
1445 );
1446 }
1447
1448 #[test]
1449 fn outbox_failure_http_with_empty_body_omits_trailing_colon() {
1450 let failure = OutboxFailure::Http(HttpFailure {
1451 status: 500,
1452 reason_phrase: "Internal Server Error".to_owned(),
1453 body_snippet: String::new(),
1454 });
1455 assert_eq!(
1456 failure.format_for_outbox_last_error(),
1457 "500 Internal Server Error",
1458 );
1459 }
1460
1461 #[test]
1462 fn outbox_failure_transport_uses_distinct_sentinel_not_non_2xx_literal() {
1463 let failure = OutboxFailure::Transport("dns lookup failed: timed out".to_owned());
1464 let formatted = failure.format_for_outbox_last_error();
1465 assert!(formatted.starts_with("transport: "));
1466 assert!(formatted.contains("dns lookup failed"));
1467 assert!(
1469 !formatted.contains("non-2xx"),
1470 "transport failures must not collapse to the legacy 'non-2xx' bucket"
1471 );
1472 }
1473
1474 #[tokio::test]
1475 async fn mark_failed_persists_dispatchoutcome_last_error_verbatim() {
1476 let pool = fresh_pool().await;
1479 let q = OutboxQueue::new(pool.clone());
1480 let id = q.enqueue("trajectory", "{}").await.unwrap();
1481 let _claimed = q.claim_next().await.unwrap().expect("row claimed");
1482
1483 let formatted = OutboxFailure::Http(HttpFailure {
1484 status: 401,
1485 reason_phrase: "Unauthorized".to_owned(),
1486 body_snippet: r#"{"error":"session_revoked"}"#.to_owned(),
1487 })
1488 .format_for_outbox_last_error();
1489 q.mark_failed(id, &formatted).await.unwrap();
1490
1491 let stored: Option<String> =
1492 sqlx::query_scalar!("SELECT last_error FROM cloud_outbox WHERE id = ?1", id)
1493 .fetch_one(&pool)
1494 .await
1495 .unwrap();
1496 let stored = stored.expect("mark_failed must populate last_error");
1497 assert!(stored.starts_with("401 Unauthorized:"));
1498 assert!(stored.contains("session_revoked"));
1499 assert_ne!(stored, "upload returned non-2xx");
1501 }
1502
1503 #[test]
1504 fn dispatch_outcome_from_outbox_failure_propagates_spec_format() {
1505 let outcome = DispatchOutcome::from_outbox_failure(&OutboxFailure::Http(HttpFailure {
1508 status: 401,
1509 reason_phrase: "Unauthorized".to_owned(),
1510 body_snippet: r#"{"error":"session_revoked"}"#.to_owned(),
1511 }));
1512 assert!(!outcome.ok);
1513 let last = outcome
1514 .last_error
1515 .expect("failures must always carry a last_error");
1516 assert!(last.starts_with("401 Unauthorized:"));
1517 assert!(last.contains("session_revoked"));
1518 }
1519}