use super::outbox_core::{self, RetryDecision, now_unix_ms};
use serde::{Deserialize, Serialize};
use sqlx::{Row, SqlitePool};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicI64, AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, OnceLock};
const SESSION_MINED_LOCAL_REVIEW_STATUS_PATH: &str = "$.localReview.status";
const SESSION_MINED_LOCAL_REVIEW_APPROVED: &str = "approved";
use tokio::sync::Mutex;
pub const DEFAULT_STALE_SECONDS: u64 = 60;
pub const CIRCUIT_FAILURE_THRESHOLD: u32 = 3;
pub const CIRCUIT_OPEN_DURATION_MS: i64 = 60_000;
pub const MAX_RETRY_COUNT: i64 = outbox_core::MAX_RETRY_COUNT;
use outbox_core::MAX_OBSERVATION_BATCH as OBSERVATION_OUTBOX_BATCH_SIZE;
static DRAIN_SERIALIZATION_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
static SPILL_SEQ: AtomicU64 = AtomicU64::new(0);
const DRAIN_STATE_FILE_NAME: &str = "outbox-drain-state.json";
const HOOK_SPILL_DIR: &str = "hook-spill/observations";
const SPILL_RECORD_VERSION: u32 = 1;
fn drain_serialization_lock() -> &'static Mutex<()> {
DRAIN_SERIALIZATION_LOCK.get_or_init(|| Mutex::new(()))
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct OutboxDrainState {
pub version: u32,
pub updated_at_ms: i64,
pub last_drain_at_ms: i64,
pub attempted: usize,
pub confirmed: usize,
pub last_error: Option<String>,
}
pub fn drain_state_path() -> crate::Result<PathBuf> {
Ok(crate::infra::paths::data_home()?.join(DRAIN_STATE_FILE_NAME))
}
pub fn read_drain_state() -> crate::Result<Option<OutboxDrainState>> {
let path = drain_state_path()?;
if !path.exists() {
return Ok(None);
}
let raw = std::fs::read_to_string(&path)?;
Ok(Some(serde_json::from_str(&raw)?))
}
fn write_drain_state(report: &OutboxDrainReport, last_error: Option<String>) {
let now = now_unix_ms();
let state = OutboxDrainState {
version: 1,
updated_at_ms: now,
last_drain_at_ms: now,
attempted: report.attempted,
confirmed: report.confirmed,
last_error,
};
if let Ok(path) = drain_state_path()
&& let Ok(json) = serde_json::to_vec_pretty(&state)
{
let _ = crate::infra::files::write_atomic(&path, &json);
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct HookSpillRecord {
pub version: u32,
pub kind: String,
pub payload_json: String,
pub created_at_ms: i64,
pub last_error: Option<String>,
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct HookSpillStats {
pub count: usize,
pub bytes: u64,
pub oldest_created_at_ms: Option<i64>,
pub newest_created_at_ms: Option<i64>,
pub newest_error: Option<String>,
pub path: Option<PathBuf>,
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct HookSpillReplayReport {
pub attempted: usize,
pub replayed: usize,
pub failed: usize,
}
pub fn hook_spill_dir() -> crate::Result<PathBuf> {
Ok(crate::infra::paths::data_home()?.join(HOOK_SPILL_DIR))
}
pub fn spill_observation_payload(
payload_json: &str,
error: impl Into<String>,
) -> crate::Result<PathBuf> {
spill_payload(kind::OBSERVATION, payload_json, Some(error.into()))
}
fn spill_payload(
kind: &str,
payload_json: &str,
last_error: Option<String>,
) -> crate::Result<PathBuf> {
let dir = hook_spill_dir()?;
std::fs::create_dir_all(&dir)?;
let now = now_unix_ms();
let seq = SPILL_SEQ.fetch_add(1, Ordering::Relaxed);
let path = dir.join(format!("{}-{}-{}.json", now, std::process::id(), seq));
let record = HookSpillRecord {
version: SPILL_RECORD_VERSION,
kind: kind.to_owned(),
payload_json: payload_json.to_owned(),
created_at_ms: now,
last_error: last_error.map(|e| outbox_core::truncate(&e, 2048)),
};
let json = serde_json::to_vec_pretty(&record)?;
crate::infra::files::write_atomic(&path, &json)?;
Ok(path)
}
pub fn hook_spill_stats() -> crate::Result<HookSpillStats> {
let dir = hook_spill_dir()?;
let mut stats = HookSpillStats {
path: Some(dir.clone()),
..HookSpillStats::default()
};
if !dir.exists() {
return Ok(stats);
}
for entry in std::fs::read_dir(&dir)? {
let entry = entry?;
let path = entry.path();
if !is_spill_json(&path) {
continue;
}
let meta = entry.metadata()?;
stats.count += 1;
stats.bytes = stats.bytes.saturating_add(meta.len());
if let Ok(raw) = std::fs::read_to_string(&path)
&& let Ok(record) = serde_json::from_str::<HookSpillRecord>(&raw)
{
stats.oldest_created_at_ms = Some(match stats.oldest_created_at_ms {
Some(current) => current.min(record.created_at_ms),
None => record.created_at_ms,
});
let is_newest = stats
.newest_created_at_ms
.is_none_or(|current| record.created_at_ms >= current);
if is_newest {
stats.newest_created_at_ms = Some(record.created_at_ms);
stats.newest_error = record.last_error;
}
}
}
Ok(stats)
}
pub async fn replay_spilled_observations(
queue: &OutboxQueue,
max_items: usize,
) -> crate::Result<HookSpillReplayReport> {
let dir = hook_spill_dir()?;
if max_items == 0 || !dir.exists() {
return Ok(HookSpillReplayReport::default());
}
let mut files = Vec::new();
for entry in std::fs::read_dir(&dir)? {
let entry = entry?;
let path = entry.path();
if is_spill_json(&path) {
files.push(path);
}
}
files.sort();
let mut report = HookSpillReplayReport::default();
for path in files.into_iter().take(max_items) {
report.attempted += 1;
let Ok(raw) = std::fs::read_to_string(&path) else {
report.failed += 1;
continue;
};
let Ok(record) = serde_json::from_str::<HookSpillRecord>(&raw) else {
quarantine_bad_spill(&path);
report.failed += 1;
continue;
};
if record.kind != kind::OBSERVATION {
quarantine_bad_spill(&path);
report.failed += 1;
continue;
}
match queue
.enqueue_with_outcome(&record.kind, &record.payload_json)
.await
{
Ok(EnqueueOutcome::Inserted { .. }) => {
let _ = std::fs::remove_file(&path);
report.replayed += 1;
}
Ok(EnqueueOutcome::CaptureDisabled) | Err(_) => {
report.failed += 1;
break;
}
}
}
Ok(report)
}
fn is_spill_json(path: &Path) -> bool {
path.extension().and_then(|e| e.to_str()) == Some("json")
}
fn quarantine_bad_spill(path: &Path) {
let bad_path = path.with_extension("bad");
let _ = std::fs::rename(path, bad_path);
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboxItem {
pub id: i64,
pub kind: String,
pub payload_json: String,
pub retry_count: i64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EnqueueOutcome {
Inserted { row_id: i64 },
CaptureDisabled,
}
impl EnqueueOutcome {
const fn row_id_or_zero(self) -> i64 {
match self {
Self::Inserted { row_id } => row_id,
Self::CaptureDisabled => 0,
}
}
}
fn outbox_item_from_row(row: &sqlx::sqlite::SqliteRow) -> OutboxItem {
OutboxItem {
id: row.try_get("id").unwrap_or_default(),
kind: row.try_get("kind").unwrap_or_default(),
payload_json: row.try_get("payload_json").unwrap_or_default(),
retry_count: row.try_get("retry_count").unwrap_or_default(),
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
Closed,
Open { until_unix_ms: i64 },
}
#[derive(Debug, Clone)]
pub struct OutboxQueue {
pool: SqlitePool,
consecutive_failures: Arc<AtomicU32>,
circuit_open_until_ms: Arc<AtomicI64>,
}
impl OutboxQueue {
pub fn new(pool: SqlitePool) -> Self {
Self {
pool,
consecutive_failures: Arc::new(AtomicU32::new(0)),
circuit_open_until_ms: Arc::new(AtomicI64::new(0)),
}
}
pub async fn enqueue_with_outcome(
&self,
kind: &str,
payload_json: &str,
) -> crate::Result<EnqueueOutcome, sqlx::Error> {
if !crate::cloud::capture::capture_enabled() {
return Ok(EnqueueOutcome::CaptureDisabled);
}
let now = now_unix_ms();
let result = sqlx::query!(
"INSERT INTO cloud_outbox (kind, payload_json, status, retry_count, created_at) \
VALUES (?1, ?2, 'pending', 0, ?3)",
kind,
payload_json,
now
)
.execute(&self.pool)
.await?;
Ok(EnqueueOutcome::Inserted {
row_id: result.last_insert_rowid(),
})
}
pub async fn enqueue(&self, kind: &str, payload_json: &str) -> crate::Result<i64, sqlx::Error> {
Ok(self
.enqueue_with_outcome(kind, payload_json)
.await?
.row_id_or_zero())
}
pub fn circuit_state(&self) -> CircuitState {
let until = self.circuit_open_until_ms.load(Ordering::SeqCst);
if until == 0 {
return CircuitState::Closed;
}
if now_unix_ms() >= until {
self.circuit_open_until_ms.store(0, Ordering::SeqCst);
CircuitState::Closed
} else {
CircuitState::Open {
until_unix_ms: until,
}
}
}
pub async fn claim_next(&self) -> Result<Option<OutboxItem>, sqlx::Error> {
if matches!(self.circuit_state(), CircuitState::Open { .. }) {
return Ok(None);
}
let now = now_unix_ms();
let stale_cutoff = now - (DEFAULT_STALE_SECONDS as i64) * 1000;
let row = sqlx::query(
r"UPDATE cloud_outbox
SET status = 'processing', claimed_at = ?1
WHERE id = (
SELECT id FROM cloud_outbox
WHERE (status = 'pending'
OR (status = 'processing'
AND claimed_at IS NOT NULL
AND claimed_at < ?2))
AND (
kind != ?3
OR (
json_valid(payload_json)
AND LOWER(COALESCE(json_extract(payload_json, ?4), '')) = ?5
)
)
ORDER BY created_at ASC, id ASC
LIMIT 1
)
RETURNING id, kind, payload_json, retry_count",
)
.bind(now)
.bind(stale_cutoff)
.bind(kind::SESSION_MINED_CANDIDATE)
.bind(SESSION_MINED_LOCAL_REVIEW_STATUS_PATH)
.bind(SESSION_MINED_LOCAL_REVIEW_APPROVED)
.fetch_optional(&self.pool)
.await?;
Ok(row.as_ref().map(outbox_item_from_row))
}
pub async fn claim_next_kind(&self, kind: &str) -> Result<Option<OutboxItem>, sqlx::Error> {
if matches!(self.circuit_state(), CircuitState::Open { .. }) {
return Ok(None);
}
let now = now_unix_ms();
let stale_cutoff = now - (DEFAULT_STALE_SECONDS as i64) * 1000;
let row = sqlx::query(
r"UPDATE cloud_outbox
SET status = 'processing', claimed_at = ?1
WHERE id = (
SELECT id FROM cloud_outbox
WHERE kind = ?3
AND (status = 'pending'
OR (status = 'processing'
AND claimed_at IS NOT NULL
AND claimed_at < ?2))
AND (
?3 != ?4
OR (
json_valid(payload_json)
AND LOWER(COALESCE(json_extract(payload_json, ?5), '')) = ?6
)
)
ORDER BY created_at ASC, id ASC
LIMIT 1
)
RETURNING id, kind, payload_json, retry_count",
)
.bind(now)
.bind(stale_cutoff)
.bind(kind)
.bind(kind::SESSION_MINED_CANDIDATE)
.bind(SESSION_MINED_LOCAL_REVIEW_STATUS_PATH)
.bind(SESSION_MINED_LOCAL_REVIEW_APPROVED)
.fetch_optional(&self.pool)
.await?;
Ok(row.as_ref().map(outbox_item_from_row))
}
pub async fn confirm(&self, id: i64) -> Result<(), sqlx::Error> {
sqlx::query!("DELETE FROM cloud_outbox WHERE id = ?1", id)
.execute(&self.pool)
.await?;
self.consecutive_failures.store(0, Ordering::SeqCst);
Ok(())
}
pub async fn mark_failed(&self, id: i64, err: &str) -> Result<(), sqlx::Error> {
let err_trimmed: String = outbox_core::truncate(err, 2048);
let current = sqlx::query!(
"SELECT retry_count, status FROM cloud_outbox WHERE id = ?1",
id
)
.fetch_optional(&self.pool)
.await?;
let Some(row) = current else {
return Ok(());
};
let (new_status, new_count) = match outbox_core::decide_retry(row.retry_count) {
RetryDecision::Retry { next_count } => ("pending", next_count),
RetryDecision::Abandon { next_count } => ("abandoned", next_count),
};
sqlx::query!(
"UPDATE cloud_outbox \
SET status = ?1, retry_count = ?2, last_error = ?3, claimed_at = NULL \
WHERE id = ?4",
new_status,
new_count,
err_trimmed,
id
)
.execute(&self.pool)
.await?;
let prev = self.consecutive_failures.fetch_add(1, Ordering::SeqCst);
if prev + 1 >= CIRCUIT_FAILURE_THRESHOLD {
let until = now_unix_ms() + CIRCUIT_OPEN_DURATION_MS;
self.circuit_open_until_ms.store(until, Ordering::SeqCst);
}
Ok(())
}
pub async fn reset_stale(&self, threshold_secs: u64) -> Result<u64, sqlx::Error> {
let cutoff = now_unix_ms() - (threshold_secs as i64) * 1000;
let result = sqlx::query!(
"UPDATE cloud_outbox \
SET status = 'pending', claimed_at = NULL \
WHERE status = 'processing' AND claimed_at IS NOT NULL AND claimed_at < ?1",
cutoff
)
.execute(&self.pool)
.await?;
Ok(result.rows_affected())
}
pub async fn pending_counts_by_kind(&self) -> Result<Vec<(String, i64)>, sqlx::Error> {
let rows = sqlx::query(
"SELECT kind, COUNT(*) AS c \
FROM cloud_outbox \
WHERE status = 'pending' \
AND (
kind != ?1
OR (
json_valid(payload_json)
AND LOWER(COALESCE(json_extract(payload_json, ?2), '')) = ?3
)
) \
GROUP BY kind",
)
.bind(kind::SESSION_MINED_CANDIDATE)
.bind(SESSION_MINED_LOCAL_REVIEW_STATUS_PATH)
.bind(SESSION_MINED_LOCAL_REVIEW_APPROVED)
.fetch_all(&self.pool)
.await?;
let mut out: Vec<(String, i64)> = rows
.into_iter()
.map(|r| {
let kind: String = Row::try_get(&r, "kind").unwrap_or_default();
let count: i64 = Row::try_get(&r, "c").unwrap_or_default();
(kind, count)
})
.collect();
out.sort_by(|a, b| a.0.cmp(&b.0));
Ok(out)
}
pub async fn drain_abandoned_older_than(
&self,
cutoff_unix_ms: i64,
dry_run: bool,
) -> Result<DrainSummary, sqlx::Error> {
let mut tx = self.pool.begin().await?;
let rows = sqlx::query(
"SELECT kind, COUNT(*) AS c \
FROM cloud_outbox \
WHERE status = 'abandoned' \
AND COALESCE(claimed_at, created_at) < ?1 \
GROUP BY kind",
)
.bind(cutoff_unix_ms)
.fetch_all(&mut *tx)
.await?;
let mut summary = DrainSummary::default();
for row in rows {
let kind: String = Row::try_get(&row, "kind").unwrap_or_default();
let count: i64 = Row::try_get(&row, "c").unwrap_or_default();
summary.per_kind.push((kind, count));
summary.total += count;
}
summary.per_kind.sort_by(|a, b| a.0.cmp(&b.0));
if dry_run || summary.total == 0 {
tx.rollback().await?;
return Ok(summary);
}
let affected = sqlx::query(
"UPDATE cloud_outbox \
SET status = 'pending', \
retry_count = 0, \
last_error = NULL, \
claimed_at = NULL \
WHERE status = 'abandoned' \
AND COALESCE(claimed_at, created_at) < ?1",
)
.bind(cutoff_unix_ms)
.execute(&mut *tx)
.await?;
tx.commit().await?;
self.consecutive_failures.store(0, Ordering::SeqCst);
self.circuit_open_until_ms.store(0, Ordering::SeqCst);
let affected = i64::try_from(affected.rows_affected()).unwrap_or(summary.total);
summary.total = affected;
Ok(summary)
}
pub async fn counts(&self) -> Result<OutboxCounts, sqlx::Error> {
let rows = sqlx::query!(
r#"SELECT status, COUNT(*) as "c!: i64" FROM cloud_outbox GROUP BY status"#
)
.fetch_all(&self.pool)
.await?;
let mut out = OutboxCounts::default();
for r in rows {
let status: String = r.status;
let count: i64 = r.c;
match status.as_str() {
"pending" => out.pending = count,
"processing" => out.processing = count,
"failed" => out.failed = count,
"abandoned" => out.abandoned = count,
_ => {}
}
}
Ok(out)
}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct OutboxCounts {
pub pending: i64,
pub processing: i64,
pub failed: i64,
pub abandoned: i64,
}
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct DrainSummary {
pub total: i64,
pub per_kind: Vec<(String, i64)>,
}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct AcceptedEditAttributionSummary {
pub uploaded: usize,
pub launch_grade: usize,
pub missing_team_workspace: usize,
pub missing_rule_ids: usize,
pub unlinked_rule_observations: usize,
}
impl AcceptedEditAttributionSummary {
pub const fn warning_count(self) -> usize {
self.missing_team_workspace + self.missing_rule_ids + self.unlinked_rule_observations
}
pub const fn add(&mut self, other: Self) {
self.uploaded += other.uploaded;
self.launch_grade += other.launch_grade;
self.missing_team_workspace += other.missing_team_workspace;
self.missing_rule_ids += other.missing_rule_ids;
self.unlinked_rule_observations += other.unlinked_rule_observations;
}
}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct OutboxDrainReport {
pub attempted: usize,
pub confirmed: usize,
pub accepted_edit_attribution: AcceptedEditAttributionSummary,
}
#[derive(Debug)]
struct DispatchOutcome {
ok: bool,
accepted_edit_attribution: Option<AcceptedEditAttributionSummary>,
last_error: Option<String>,
}
impl DispatchOutcome {
const fn ok(ok: bool) -> Self {
Self {
ok,
accepted_edit_attribution: None,
last_error: None,
}
}
const fn failed_with(last_error: String) -> Self {
Self {
ok: false,
accepted_edit_attribution: None,
last_error: Some(last_error),
}
}
fn from_outbox_failure(failure: &super::client::OutboxFailure) -> Self {
Self::failed_with(failure.format_for_outbox_last_error())
}
}
fn accepted_edit_attribution_summary(
expected_rule_ids: usize,
response: &crate::contract::RecordAcceptedEditResponse,
) -> AcceptedEditAttributionSummary {
let mut summary = AcceptedEditAttributionSummary {
uploaded: usize::from(response.acceptance_recorded),
launch_grade: 0,
missing_team_workspace: 0,
missing_rule_ids: 0,
unlinked_rule_observations: 0,
};
if response.acceptance_recorded {
if expected_rule_ids == 0 {
summary.missing_rule_ids = 1;
}
if response.team_id.is_none() {
summary.missing_team_workspace = 1;
}
if expected_rule_ids > 0 && response.observations_inserted == 0 {
summary.unlinked_rule_observations = 1;
}
if summary.warning_count() == 0 {
summary.launch_grade = 1;
}
}
summary
}
pub mod kind {
pub const TRAJECTORY: &str = "trajectory";
pub const REVIEW_METRICS: &str = "review_metrics";
pub const ACCEPTED_EDIT: &str = "accepted_edit";
pub const LEGACY_FIX_ACCEPTANCE: &str = "fix_acceptance";
pub const MCP_QUERY: &str = "mcp_query";
pub const IMPORTED_REVIEWS: &str = "imported_reviews";
pub const OBSERVATION: &str = "observation";
pub const SESSION_MINED_CANDIDATE: &str = "session_mined_candidate";
}
pub async fn drain_outbox(
queue: &OutboxQueue,
client: &super::client::CloudClient,
max_items: usize,
) -> Result<(usize, usize), sqlx::Error> {
let report = drain_outbox_report(queue, client, max_items).await?;
Ok((report.attempted, report.confirmed))
}
pub async fn drain_outbox_report(
queue: &OutboxQueue,
client: &super::client::CloudClient,
max_items: usize,
) -> Result<OutboxDrainReport, sqlx::Error> {
if !client.is_logged_in() {
return Ok(OutboxDrainReport::default());
}
let _drain_guard = drain_serialization_lock().lock().await;
let report = drain_rows(queue, client, None, max_items).await?;
write_drain_state(&report, None);
Ok(report)
}
async fn drain_rows(
queue: &OutboxQueue,
client: &super::client::CloudClient,
kind: Option<&str>,
max_items: usize,
) -> Result<OutboxDrainReport, sqlx::Error> {
let mut attempted = 0usize;
let mut confirmed = 0usize;
let mut accepted_edit_attribution = AcceptedEditAttributionSummary::default();
for _ in 0..max_items {
if matches!(queue.circuit_state(), CircuitState::Open { .. }) {
break;
}
let claimed = match kind {
Some(kind) => queue.claim_next_kind(kind).await?,
None => queue.claim_next().await?,
};
let Some(item) = claimed else {
break;
};
attempted += 1;
let outcome = match dispatch(client, &item).await {
Ok(outcome) => outcome,
Err(err) => {
let _ = queue.mark_failed(item.id, &err.to_string()).await;
continue;
}
};
if outcome.ok {
queue.confirm(item.id).await?;
confirmed += 1;
if let Some(summary) = outcome.accepted_edit_attribution {
accepted_edit_attribution.add(summary);
}
} else {
let err_msg = outcome
.last_error
.as_deref()
.unwrap_or("upload returned non-2xx (no detail)");
let _ = queue.mark_failed(item.id, err_msg).await;
}
}
Ok(OutboxDrainReport {
attempted,
confirmed,
accepted_edit_attribution,
})
}
pub async fn drain_outbox_kind(
queue: &OutboxQueue,
client: &super::client::CloudClient,
kind: &str,
max_items: usize,
) -> Result<(usize, usize), sqlx::Error> {
let report = drain_outbox_kind_report(queue, client, kind, max_items).await?;
Ok((report.attempted, report.confirmed))
}
pub async fn drain_outbox_kind_report(
queue: &OutboxQueue,
client: &super::client::CloudClient,
kind: &str,
max_items: usize,
) -> Result<OutboxDrainReport, sqlx::Error> {
if !client.is_logged_in() {
return Ok(OutboxDrainReport::default());
}
let _drain_guard = drain_serialization_lock().lock().await;
if kind == kind::OBSERVATION {
let report = drain_observation_outbox_kind_report(queue, client, max_items).await?;
write_drain_state(&report, None);
return Ok(report);
}
let report = drain_rows(queue, client, Some(kind), max_items).await?;
write_drain_state(&report, None);
Ok(report)
}
async fn drain_observation_outbox_kind_report(
queue: &OutboxQueue,
client: &super::client::CloudClient,
max_items: usize,
) -> Result<OutboxDrainReport, sqlx::Error> {
let mut attempted = 0usize;
let mut confirmed = 0usize;
while attempted < max_items {
if matches!(queue.circuit_state(), CircuitState::Open { .. }) {
break;
}
let limit = (max_items - attempted).min(OBSERVATION_OUTBOX_BATCH_SIZE);
let mut ids = Vec::with_capacity(limit);
let mut observations = Vec::with_capacity(limit);
let mut claimed_any = false;
for _ in 0..limit {
let Some(item) = queue.claim_next_kind(kind::OBSERVATION).await? else {
break;
};
claimed_any = true;
attempted += 1;
match serde_json::from_str::<crate::contract::Observation>(&item.payload_json) {
Ok(obs) => {
ids.push(item.id);
observations.push(obs);
}
Err(e) => {
let _ = queue
.mark_failed(item.id, &format!("observation parse: {e}"))
.await;
}
}
}
if observations.is_empty() {
if !claimed_any {
break;
}
continue;
}
match client.post_observations_outcome(&observations).await {
Ok(()) => {
for id in ids {
queue.confirm(id).await?;
confirmed += 1;
}
}
Err(failure) => {
let err_msg = failure.format_for_outbox_last_error();
for id in ids {
let _ = queue.mark_failed(id, &err_msg).await;
}
break;
}
}
}
Ok(OutboxDrainReport {
attempted,
confirmed,
accepted_edit_attribution: AcceptedEditAttributionSummary::default(),
})
}
async fn dispatch(
client: &super::client::CloudClient,
item: &OutboxItem,
) -> crate::Result<DispatchOutcome> {
use crate::cloud::session_mined::SessionMinedCandidate;
use crate::contract::{
RecordAcceptedEditRequest, RecordReviewMetricsRequest, UploadImportedReviewsRequest,
};
use serde_json::Value;
match item.kind.as_str() {
kind::TRAJECTORY => {
let v: Value = serde_json::from_str(&item.payload_json)
.map_err(|e| crate::CoreError::internal(format!("trajectory parse: {e}")))?;
let pr_review_id = v
.get("pr_review_id")
.and_then(|x| x.as_str())
.ok_or_else(|| crate::CoreError::internal("trajectory missing pr_review_id"))?;
let steps = v.get("steps").cloned().unwrap_or(Value::Array(Vec::new()));
Ok(
match client.save_trajectory_outcome(pr_review_id, steps).await {
Ok(()) => DispatchOutcome::ok(true),
Err(failure) => DispatchOutcome::from_outbox_failure(&failure),
},
)
}
kind::REVIEW_METRICS => {
let v: Value = serde_json::from_str(&item.payload_json)
.map_err(|e| crate::CoreError::internal(format!("review_metrics parse: {e}")))?;
let review_id = v
.get("review_id")
.and_then(|x| x.as_str())
.ok_or_else(|| crate::CoreError::internal("review_metrics missing review_id"))?
.to_owned();
let req_val = v
.get("req")
.cloned()
.unwrap_or(Value::Object(serde_json::Map::default()));
let req: RecordReviewMetricsRequest = serde_json::from_value(req_val).map_err(|e| {
crate::CoreError::internal(format!("review_metrics decode req: {e}"))
})?;
Ok(
match client.record_review_metrics_outcome(&review_id, req).await {
Ok(()) => DispatchOutcome::ok(true),
Err(failure) => DispatchOutcome::from_outbox_failure(&failure),
},
)
}
kind::ACCEPTED_EDIT => {
let req: RecordAcceptedEditRequest = serde_json::from_str(&item.payload_json)
.map_err(|e| crate::CoreError::internal(format!("accepted_edit parse: {e}")))?;
let expected_rule_ids = req
.rule_ids
.iter()
.filter(|rule_id| !rule_id.trim().is_empty())
.count();
let response = client.record_accepted_edit_response(req).await?;
let summary = accepted_edit_attribution_summary(expected_rule_ids, &response);
let last_error = if response.acceptance_recorded {
None
} else {
Some(format!(
"accepted_edit rejected: {}",
response.error.as_deref().unwrap_or("no detail")
))
};
Ok(DispatchOutcome {
ok: response.acceptance_recorded,
accepted_edit_attribution: Some(summary),
last_error,
})
}
kind::LEGACY_FIX_ACCEPTANCE => {
Ok(DispatchOutcome::ok(true))
}
kind::MCP_QUERY => {
let v: Value = serde_json::from_str(&item.payload_json)
.map_err(|e| crate::CoreError::internal(format!("mcp_query parse: {e}")))?;
let file = v
.get("file")
.and_then(|x| x.as_str())
.unwrap_or("")
.to_owned();
let intent = v
.get("intent")
.and_then(|x| x.as_str())
.map(ToOwned::to_owned);
let rules_injected = v
.get("rules_injected")
.and_then(Value::as_u64)
.and_then(|n| usize::try_from(n).ok())
.unwrap_or(0);
let strict_match_count = v
.get("strict_match_count")
.and_then(Value::as_u64)
.and_then(|n| usize::try_from(n).ok())
.unwrap_or(0);
let rule_titles: Vec<String> = v
.get("rule_titles")
.and_then(|x| x.as_array())
.map(|arr| {
arr.iter()
.filter_map(|t| t.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let rule_ids: Vec<String> = v
.get("rule_ids")
.and_then(|x| x.as_array())
.map(|arr| {
arr.iter()
.filter_map(|t| t.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let client_label = v
.get("client_label")
.and_then(|x| x.as_str())
.map(ToOwned::to_owned);
let repo_full_name = v
.get("repo_full_name")
.and_then(|x| x.as_str())
.map(ToOwned::to_owned);
Ok(
match client
.track_mcp_query_outcome(
&file,
intent.as_deref(),
rules_injected,
strict_match_count,
rule_titles,
rule_ids,
client_label.as_deref(),
repo_full_name.as_deref(),
)
.await
{
Ok(()) => DispatchOutcome::ok(true),
Err(failure) => DispatchOutcome::from_outbox_failure(&failure),
},
)
}
kind::IMPORTED_REVIEWS => {
let req: UploadImportedReviewsRequest = serde_json::from_str(&item.payload_json)
.map_err(|e| crate::CoreError::internal(format!("imported_reviews parse: {e}")))?;
Ok(match client.upload_imported_reviews_outcome(&req).await {
Ok(()) => DispatchOutcome::ok(true),
Err(failure) => DispatchOutcome::from_outbox_failure(&failure),
})
}
kind::OBSERVATION => {
let obs: crate::contract::Observation = serde_json::from_str(&item.payload_json)
.map_err(|e| crate::CoreError::internal(format!("observation parse: {e}")))?;
Ok(
match client
.post_observations_outcome(std::slice::from_ref(&obs))
.await
{
Ok(()) => DispatchOutcome::ok(true),
Err(failure) => DispatchOutcome::from_outbox_failure(&failure),
},
)
}
kind::SESSION_MINED_CANDIDATE => {
let candidate: SessionMinedCandidate = serde_json::from_str(&item.payload_json)
.map_err(|e| {
crate::CoreError::internal(format!("session_mined_candidate parse: {e}"))
})?;
candidate.validate().map_err(|e| {
crate::CoreError::internal(format!("session_mined_candidate validate: {e}"))
})?;
Ok(
match client
.post_session_mined_candidate_outcome(&candidate)
.await
{
Ok(()) => DispatchOutcome::ok(true),
Err(failure) => DispatchOutcome::from_outbox_failure(&failure),
},
)
}
other => Err(crate::CoreError::internal(format!(
"unknown outbox kind '{other}'"
))),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::contract::RecordAcceptedEditResponse;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
async fn fresh_pool() -> SqlitePool {
let opts = SqliteConnectOptions::new()
.filename(":memory:")
.create_if_missing(true);
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect_with(opts)
.await
.expect("pool");
sqlx::migrate!("./migrations")
.run(&pool)
.await
.expect("apply migrations");
pool
}
async fn status_of(pool: &SqlitePool, id: i64) -> Option<String> {
sqlx::query_scalar!("SELECT status FROM cloud_outbox WHERE id = ?1", id)
.fetch_optional(pool)
.await
.unwrap()
}
fn accepted_edit_response(
acceptance_recorded: bool,
team_id: Option<&str>,
observations_inserted: u32,
) -> RecordAcceptedEditResponse {
RecordAcceptedEditResponse {
ok: acceptance_recorded,
acceptance_recorded,
acceptance_id: acceptance_recorded.then(|| "acceptance-1".to_owned()),
diff_signature: Some("diff-1".to_owned()),
team_id: team_id.map(str::to_owned),
attributed_rule_ids: Vec::new(),
observations_inserted,
memory_reinforcement_recorded: false,
memory_reinforcement_deduped: false,
error: None,
}
}
#[test]
fn accepted_edit_attribution_summary_counts_launch_grade_uploads() {
let response = accepted_edit_response(true, Some("team-1"), 2);
let summary = accepted_edit_attribution_summary(2, &response);
assert_eq!(summary.uploaded, 1);
assert_eq!(summary.launch_grade, 1);
assert_eq!(summary.warning_count(), 0);
}
#[test]
fn accepted_edit_attribution_summary_flags_raw_only_uploads() {
let missing_team =
accepted_edit_attribution_summary(2, &accepted_edit_response(true, None, 2));
assert_eq!(missing_team.uploaded, 1);
assert_eq!(missing_team.launch_grade, 0);
assert_eq!(missing_team.missing_team_workspace, 1);
let missing_rule_ids =
accepted_edit_attribution_summary(0, &accepted_edit_response(true, Some("team-1"), 0));
assert_eq!(missing_rule_ids.missing_rule_ids, 1);
assert_eq!(missing_rule_ids.launch_grade, 0);
let unlinked_observation =
accepted_edit_attribution_summary(2, &accepted_edit_response(true, Some("team-1"), 0));
assert_eq!(unlinked_observation.unlinked_rule_observations, 1);
assert_eq!(unlinked_observation.launch_grade, 0);
}
#[test]
fn accepted_edit_attribution_summary_ignores_failed_uploads() {
let response = accepted_edit_response(false, None, 0);
let summary = accepted_edit_attribution_summary(0, &response);
assert_eq!(summary.uploaded, 0);
assert_eq!(summary.launch_grade, 0);
assert_eq!(summary.warning_count(), 0);
}
#[test]
fn drain_state_round_trips_to_data_home() {
let _home = crate::infra::db::shared_test_home();
let path = drain_state_path().expect("drain state path");
let _ = std::fs::remove_file(&path);
let report = OutboxDrainReport {
attempted: 3,
confirmed: 2,
accepted_edit_attribution: AcceptedEditAttributionSummary::default(),
};
write_drain_state(&report, Some("transport: offline".to_owned()));
let state = read_drain_state()
.expect("read drain state")
.expect("state should exist");
assert_eq!(state.attempted, 3);
assert_eq!(state.confirmed, 2);
assert_eq!(state.last_error.as_deref(), Some("transport: offline"));
assert!(state.last_drain_at_ms > 0);
}
#[tokio::test]
async fn hook_spill_replays_into_cloud_outbox_and_removes_spill_file() {
let _home = crate::infra::db::shared_test_home();
let capture_enabled = crate::cloud::capture::capture_enabled();
let spill_dir = hook_spill_dir().expect("spill dir");
let _ = std::fs::remove_dir_all(&spill_dir);
let pool = fresh_pool().await;
let queue = OutboxQueue::new(pool.clone());
let path = spill_observation_payload(r#"{"session_id":"s1"}"#, "db locked")
.expect("spill payload");
assert!(path.exists(), "spill file should be durable");
let stats = hook_spill_stats().expect("spill stats");
assert_eq!(stats.count, 1);
assert_eq!(stats.newest_error.as_deref(), Some("db locked"));
let report = replay_spilled_observations(&queue, 8)
.await
.expect("replay spill");
assert_eq!(report.attempted, 1);
assert_eq!(report.replayed, 1);
assert_eq!(report.failed, 0);
assert!(!path.exists(), "successful replay should remove spill file");
let rows: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM cloud_outbox WHERE kind = 'observation'")
.fetch_one(&pool)
.await
.expect("count rows");
assert_eq!(rows, i64::from(capture_enabled));
assert_eq!(hook_spill_stats().expect("spill stats").count, 0);
}
#[tokio::test]
async fn legacy_fix_acceptance_dispatch_skips_current_accepted_edit_pipeline() {
let client = crate::cloud::client::CloudClient::new();
let item = OutboxItem {
id: 1,
kind: kind::LEGACY_FIX_ACCEPTANCE.to_owned(),
payload_json: "not accepted-edit json".to_owned(),
retry_count: 0,
};
let outcome = dispatch(&client, &item)
.await
.expect("legacy rows are explicitly acknowledged and skipped");
assert!(outcome.ok);
assert_eq!(outcome.accepted_edit_attribution, None);
}
#[tokio::test]
async fn session_mined_candidate_dispatch_is_a_known_outbox_kind() {
let client = crate::cloud::client::CloudClient::new();
let candidate = crate::cloud::session_mined::SessionMinedCandidate::try_new(
crate::cloud::session_mined::SessionMinedCandidateArgs {
session_id: "sess_test".to_owned(),
ts_ms: 1_719_000_000_000,
source_repo: crate::infra::git::RepoScope::github("acme/widgets")
.expect("valid repo scope"),
title: "Validate webhook signatures".to_owned(),
body: "Always verify webhook signatures before parsing payloads.".to_owned(),
file_patterns: vec!["src/webhooks/**/*.ts".to_owned()],
gate_model: "codex:local".to_owned(),
gate_verdict: "KEEP".to_owned(),
},
)
.expect("valid candidate");
let item = OutboxItem {
id: 1,
kind: kind::SESSION_MINED_CANDIDATE.to_owned(),
payload_json: serde_json::to_string(&candidate).expect("serialize candidate"),
retry_count: 0,
};
let outcome = dispatch(&client, &item)
.await
.expect("session-mined candidates must have a dispatch arm");
assert!(!outcome.ok);
assert!(
outcome
.last_error
.as_deref()
.is_some_and(|err| err.contains("not logged in")),
"logged-out dispatch should fail via CloudClient, not unknown kind: {outcome:?}"
);
}
#[tokio::test]
async fn enqueue_then_claim_moves_to_processing() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let id = q.enqueue("trajectory", "{}").await.unwrap();
assert_eq!(status_of(&pool, id).await.as_deref(), Some("pending"));
let item = q.claim_next().await.unwrap().expect("row claimed");
assert_eq!(item.id, id);
assert_eq!(status_of(&pool, id).await.as_deref(), Some("processing"));
}
#[tokio::test]
async fn claim_next_kind_prioritizes_matching_kind() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let old_fix = q.enqueue(kind::ACCEPTED_EDIT, "{}").await.unwrap();
let mcp = q.enqueue(kind::MCP_QUERY, "{}").await.unwrap();
let item = q
.claim_next_kind(kind::MCP_QUERY)
.await
.unwrap()
.expect("mcp row claimed");
assert_eq!(item.id, mcp);
assert_eq!(status_of(&pool, mcp).await.as_deref(), Some("processing"));
assert_eq!(status_of(&pool, old_fix).await.as_deref(), Some("pending"));
}
#[tokio::test]
async fn claim_next_kind_skips_unapproved_session_mined_candidates() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let unapproved = q
.enqueue(
kind::SESSION_MINED_CANDIDATE,
r#"{"content_hash":"unapproved"}"#,
)
.await
.unwrap();
let approved = q
.enqueue(
kind::SESSION_MINED_CANDIDATE,
r#"{"content_hash":"approved","localReview":{"status":"approved"}}"#,
)
.await
.unwrap();
let counts = q.pending_counts_by_kind().await.unwrap();
assert_eq!(counts, vec![(kind::SESSION_MINED_CANDIDATE.to_owned(), 1)]);
let item = q
.claim_next_kind(kind::SESSION_MINED_CANDIDATE)
.await
.unwrap()
.expect("approved session candidate claimed");
assert_eq!(item.id, approved);
assert_eq!(
status_of(&pool, unapproved).await.as_deref(),
Some("pending")
);
assert!(
q.claim_next_kind(kind::SESSION_MINED_CANDIDATE)
.await
.unwrap()
.is_none()
);
}
#[tokio::test]
async fn claim_next_skips_unapproved_session_mined_candidates() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let session = q
.enqueue(
kind::SESSION_MINED_CANDIDATE,
r#"{"content_hash":"unapproved"}"#,
)
.await
.unwrap();
let trajectory = q.enqueue(kind::TRAJECTORY, "{}").await.unwrap();
let item = q
.claim_next()
.await
.unwrap()
.expect("non-session row claimed");
assert_eq!(item.id, trajectory);
assert_eq!(status_of(&pool, session).await.as_deref(), Some("pending"));
}
#[tokio::test]
async fn drain_serialization_lock_is_process_wide() {
let guard = drain_serialization_lock()
.try_lock()
.expect("first drain lock");
assert!(
drain_serialization_lock().try_lock().is_err(),
"concurrent drainers must share the same in-process lock"
);
drop(guard);
assert!(drain_serialization_lock().try_lock().is_ok());
}
#[tokio::test]
async fn confirm_deletes_row() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let id = q.enqueue("trajectory", "{}").await.unwrap();
let item = q.claim_next().await.unwrap().unwrap();
q.confirm(item.id).await.unwrap();
assert!(status_of(&pool, id).await.is_none());
}
#[tokio::test]
async fn mark_failed_eight_times_abandons() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let id = q.enqueue("trajectory", "{}").await.unwrap();
for attempt in 1..=7 {
q.circuit_open_until_ms.store(0, Ordering::SeqCst);
q.consecutive_failures.store(0, Ordering::SeqCst);
let item = q.claim_next().await.unwrap().unwrap();
q.mark_failed(item.id, &format!("net {attempt}"))
.await
.unwrap();
assert_eq!(
status_of(&pool, id).await.as_deref(),
Some("pending"),
"attempt {attempt}: retry_count {attempt} (< 8) must stay pending"
);
}
q.circuit_open_until_ms.store(0, Ordering::SeqCst);
q.consecutive_failures.store(0, Ordering::SeqCst);
let item = q.claim_next().await.unwrap().unwrap();
q.mark_failed(item.id, "net 8").await.unwrap();
assert_eq!(status_of(&pool, id).await.as_deref(), Some("abandoned"));
q.circuit_open_until_ms.store(0, Ordering::SeqCst);
q.consecutive_failures.store(0, Ordering::SeqCst);
assert!(q.claim_next().await.unwrap().is_none());
}
#[tokio::test]
async fn claim_next_auto_recovers_stale_processing_rows() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let id = q.enqueue("trajectory", "{\"crashed\":true}").await.unwrap();
let first = q.claim_next().await.unwrap().expect("first claim");
assert_eq!(first.id, id);
assert_eq!(status_of(&pool, id).await.as_deref(), Some("processing"));
let stale = now_unix_ms() - (DEFAULT_STALE_SECONDS as i64 + 30) * 1000;
sqlx::query!(
"UPDATE cloud_outbox SET claimed_at = ?1 WHERE id = ?2",
stale,
id
)
.execute(&pool)
.await
.unwrap();
let recovered = q.claim_next().await.unwrap().expect("recovered claim");
assert_eq!(recovered.id, id, "stale row must be re-claimable");
assert_eq!(status_of(&pool, id).await.as_deref(), Some("processing"));
}
#[tokio::test]
async fn claim_next_ignores_fresh_processing_rows() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let _fresh = q.enqueue("trajectory", "{}").await.unwrap();
let item = q.claim_next().await.unwrap().expect("initial claim");
assert!(q.claim_next().await.unwrap().is_none());
q.confirm(item.id).await.unwrap();
}
#[tokio::test]
async fn reset_stale_promotes_processing_rows() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let id = q.enqueue("trajectory", "{}").await.unwrap();
let _ = q.claim_next().await.unwrap().unwrap();
assert_eq!(status_of(&pool, id).await.as_deref(), Some("processing"));
let backdated = now_unix_ms() - 120_000;
sqlx::query!(
"UPDATE cloud_outbox SET claimed_at = ?1 WHERE id = ?2",
backdated,
id
)
.execute(&pool)
.await
.unwrap();
let promoted = q.reset_stale(60).await.unwrap();
assert_eq!(promoted, 1);
assert_eq!(status_of(&pool, id).await.as_deref(), Some("pending"));
}
#[tokio::test]
async fn circuit_breaker_halts_claims_after_three_failures() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
for i in 0..4 {
q.enqueue("trajectory", &format!("{{\"i\":{i}}}"))
.await
.unwrap();
}
for _ in 0..3 {
let item = q.claim_next().await.unwrap().unwrap();
q.mark_failed(item.id, "x").await.unwrap();
}
assert!(matches!(q.circuit_state(), CircuitState::Open { .. }));
assert!(q.claim_next().await.unwrap().is_none());
}
#[tokio::test]
async fn confirm_resets_consecutive_failure_counter() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let _id1 = q.enqueue("trajectory", "{}").await.unwrap();
let _id2 = q.enqueue("trajectory", "{}").await.unwrap();
let item = q.claim_next().await.unwrap().unwrap();
q.mark_failed(item.id, "f1").await.unwrap();
let item = q.claim_next().await.unwrap().unwrap();
q.mark_failed(item.id, "f2").await.unwrap();
assert_eq!(q.consecutive_failures.load(Ordering::SeqCst), 2);
let item = q.claim_next().await.unwrap().unwrap();
q.confirm(item.id).await.unwrap();
assert_eq!(q.consecutive_failures.load(Ordering::SeqCst), 0);
}
#[tokio::test]
async fn claim_next_returns_observation_kind() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let obs_id = q
.enqueue(kind::OBSERVATION, r#"{"session_id":"s"}"#)
.await
.unwrap();
let traj_id = q.enqueue(kind::TRAJECTORY, "{}").await.unwrap();
let first = q.claim_next().await.unwrap().expect("claimed first");
let second = q.claim_next().await.unwrap().expect("claimed second");
assert_eq!(first.id, obs_id);
assert_eq!(first.kind, kind::OBSERVATION);
assert_eq!(second.id, traj_id);
}
#[tokio::test]
async fn claim_next_returns_oldest_first() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let a = q.enqueue("trajectory", r#"{"n":"a"}"#).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(2)).await;
let b = q.enqueue("trajectory", r#"{"n":"b"}"#).await.unwrap();
let first = q.claim_next().await.unwrap().unwrap();
let second = q.claim_next().await.unwrap().unwrap();
assert_eq!(first.id, a);
assert_eq!(second.id, b);
}
async fn insert_abandoned(
pool: &SqlitePool,
kind: &str,
created_at_ms: i64,
claimed_at_ms: Option<i64>,
) -> i64 {
sqlx::query(
"INSERT INTO cloud_outbox \
(kind, payload_json, status, retry_count, created_at, claimed_at, last_error) \
VALUES (?1, '{}', 'abandoned', ?2, ?3, ?4, 'upload returned non-2xx')",
)
.bind(kind)
.bind(MAX_RETRY_COUNT)
.bind(created_at_ms)
.bind(claimed_at_ms)
.execute(pool)
.await
.unwrap()
.last_insert_rowid()
}
#[tokio::test]
async fn drain_abandoned_dry_run_reports_per_kind_without_mutating() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let now = now_unix_ms();
let old = now - 31 * 86_400_000; let mcp_id = insert_abandoned(&pool, "mcp_query", old, Some(old)).await;
let obs_id = insert_abandoned(&pool, "observation", old, Some(old)).await;
let _other_mcp = insert_abandoned(&pool, "mcp_query", old, Some(old)).await;
let summary = q.drain_abandoned_older_than(now, true).await.unwrap();
assert_eq!(summary.total, 3);
assert_eq!(
summary.per_kind,
vec![("mcp_query".to_owned(), 2), ("observation".to_owned(), 1),]
);
assert_eq!(status_of(&pool, mcp_id).await.as_deref(), Some("abandoned"));
assert_eq!(status_of(&pool, obs_id).await.as_deref(), Some("abandoned"));
}
#[tokio::test]
async fn drain_abandoned_real_resets_eligible_rows_only() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let now = now_unix_ms();
let old = now - 31 * 86_400_000;
let fresh = now - 60_000; let cutoff = now - 7 * 86_400_000;
let old_row = insert_abandoned(&pool, "mcp_query", old, Some(old)).await;
let fresh_row = insert_abandoned(&pool, "mcp_query", fresh, Some(fresh)).await;
q.consecutive_failures
.store(CIRCUIT_FAILURE_THRESHOLD, Ordering::SeqCst);
q.circuit_open_until_ms
.store(now + 60_000, Ordering::SeqCst);
let summary = q.drain_abandoned_older_than(cutoff, false).await.unwrap();
assert_eq!(summary.total, 1);
assert_eq!(status_of(&pool, old_row).await.as_deref(), Some("pending"));
assert_eq!(
status_of(&pool, fresh_row).await.as_deref(),
Some("abandoned"),
"rows newer than cutoff must NOT be touched",
);
let retry_count: i64 =
sqlx::query_scalar("SELECT retry_count FROM cloud_outbox WHERE id = ?1")
.bind(old_row)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(retry_count, 0);
let last_error: Option<String> =
sqlx::query_scalar("SELECT last_error FROM cloud_outbox WHERE id = ?1")
.bind(old_row)
.fetch_one(&pool)
.await
.unwrap();
assert!(last_error.is_none());
assert_eq!(q.consecutive_failures.load(Ordering::SeqCst), 0);
assert_eq!(q.circuit_open_until_ms.load(Ordering::SeqCst), 0);
}
#[tokio::test]
async fn drain_abandoned_uses_created_at_when_no_claimed_at() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let now = now_unix_ms();
let old = now - 90 * 86_400_000;
let id = insert_abandoned(&pool, "observation", old, None).await;
let cutoff = now - 30 * 86_400_000;
let summary = q.drain_abandoned_older_than(cutoff, false).await.unwrap();
assert_eq!(summary.total, 1);
assert_eq!(status_of(&pool, id).await.as_deref(), Some("pending"));
}
#[tokio::test]
async fn drain_abandoned_empty_queue_is_a_noop() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let summary = q
.drain_abandoned_older_than(now_unix_ms(), false)
.await
.unwrap();
assert_eq!(summary.total, 0);
assert!(summary.per_kind.is_empty());
}
#[tokio::test]
async fn pending_counts_by_kind_buckets_pending_rows() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
q.enqueue("mcp_query", "{}").await.unwrap();
q.enqueue("mcp_query", "{}").await.unwrap();
q.enqueue("observation", "{}").await.unwrap();
let counts = q.pending_counts_by_kind().await.unwrap();
assert_eq!(
counts,
vec![("mcp_query".to_owned(), 2), ("observation".to_owned(), 1),]
);
}
use crate::cloud::client::{HttpFailure, OutboxFailure, normalize_body_snippet};
#[test]
fn normalize_body_snippet_collapses_whitespace_runs_to_single_spaces() {
let raw = " line one \n\nline two\t\twith\ttabs ";
let snippet = normalize_body_snippet(raw, 200);
assert_eq!(snippet, "line one line two with tabs");
assert!(!snippet.contains('\n'));
assert!(!snippet.contains('\t'));
}
#[test]
fn normalize_body_snippet_caps_to_max_chars_without_splitting_utf8() {
let raw = "😀😀😀😀😀ASCII tail";
let snippet = normalize_body_snippet(raw, 5);
assert_eq!(snippet.chars().count(), 5);
assert_eq!(snippet, "😀😀😀😀😀");
}
#[test]
fn outbox_failure_http_with_body_matches_spec_format() {
let failure = OutboxFailure::Http(HttpFailure {
status: 401,
reason_phrase: "Unauthorized".to_owned(),
body_snippet: r#"{"error":"session_revoked"}"#.to_owned(),
});
assert_eq!(
failure.format_for_outbox_last_error(),
r#"401 Unauthorized: {"error":"session_revoked"}"#
);
}
#[test]
fn outbox_failure_http_with_empty_body_omits_trailing_colon() {
let failure = OutboxFailure::Http(HttpFailure {
status: 500,
reason_phrase: "Internal Server Error".to_owned(),
body_snippet: String::new(),
});
assert_eq!(
failure.format_for_outbox_last_error(),
"500 Internal Server Error",
);
}
#[test]
fn outbox_failure_transport_uses_distinct_sentinel_not_non_2xx_literal() {
let failure = OutboxFailure::Transport("dns lookup failed: timed out".to_owned());
let formatted = failure.format_for_outbox_last_error();
assert!(formatted.starts_with("transport: "));
assert!(formatted.contains("dns lookup failed"));
assert!(
!formatted.contains("non-2xx"),
"transport failures must not collapse to the legacy 'non-2xx' bucket"
);
}
#[tokio::test]
async fn mark_failed_persists_dispatchoutcome_last_error_verbatim() {
let pool = fresh_pool().await;
let q = OutboxQueue::new(pool.clone());
let id = q.enqueue("trajectory", "{}").await.unwrap();
let _claimed = q.claim_next().await.unwrap().expect("row claimed");
let formatted = OutboxFailure::Http(HttpFailure {
status: 401,
reason_phrase: "Unauthorized".to_owned(),
body_snippet: r#"{"error":"session_revoked"}"#.to_owned(),
})
.format_for_outbox_last_error();
q.mark_failed(id, &formatted).await.unwrap();
let stored: Option<String> =
sqlx::query_scalar!("SELECT last_error FROM cloud_outbox WHERE id = ?1", id)
.fetch_one(&pool)
.await
.unwrap();
let stored = stored.expect("mark_failed must populate last_error");
assert!(stored.starts_with("401 Unauthorized:"));
assert!(stored.contains("session_revoked"));
assert_ne!(stored, "upload returned non-2xx");
}
#[test]
fn dispatch_outcome_from_outbox_failure_propagates_spec_format() {
let outcome = DispatchOutcome::from_outbox_failure(&OutboxFailure::Http(HttpFailure {
status: 401,
reason_phrase: "Unauthorized".to_owned(),
body_snippet: r#"{"error":"session_revoked"}"#.to_owned(),
}));
assert!(!outcome.ok);
let last = outcome
.last_error
.expect("failures must always carry a last_error");
assert!(last.starts_with("401 Unauthorized:"));
assert!(last.contains("session_revoked"));
}
}