use crate::client::CosmosDBClient;
use crate::errors;
use crate::models::*;
use crate::query;
use duroxide::providers::ProviderError;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
/// Test hook that makes a fixed number of upcoming checks report "fail".
///
/// The remaining budget lives behind an [`Arc`], so every clone of an injector
/// draws from the same counter.
#[derive(Debug, Clone)]
pub struct OutboxFaultInjector {
    /// How many more calls to [`OutboxFaultInjector::should_fail`] will return `true`.
    remaining_failures: Arc<AtomicU32>,
}

impl OutboxFaultInjector {
    /// Creates an injector whose next `n` calls to [`Self::should_fail`] return `true`.
    pub fn fail_next(n: u32) -> Self {
        let remaining_failures = Arc::new(AtomicU32::new(n));
        Self { remaining_failures }
    }

    /// Consumes one unit of the failure budget, if any is left.
    ///
    /// Returns `true` when the counter was positive (and has now been
    /// decremented), `false` once the budget is exhausted.
    pub fn should_fail(&self) -> bool {
        // `checked_sub` yields `None` at zero, which makes `fetch_update` leave
        // the counter untouched and report `Err`.
        let consumed = self.remaining_failures.fetch_update(
            Ordering::SeqCst,
            Ordering::SeqCst,
            |left| left.checked_sub(1),
        );
        consumed.is_ok()
    }
}
/// Delivers one outbox intent by creating its payload as a document under the
/// intent's `target_instance_id`.
///
/// A conflict status is treated the same as success, so delivering an intent
/// whose document already exists returns `Ok(())`.
///
/// # Errors
///
/// * A permanent [`ProviderError`] when `intent.payload` is not valid JSON.
/// * Whatever error `create_document` itself returns.
/// * The result of `errors::map_cosmosdb_error` for any other response status.
pub async fn deliver_intent(
    client: &CosmosDBClient,
    intent: &OutboxIntentDocument,
) -> Result<(), ProviderError> {
    // The payload is stored as a JSON string; it must parse before it can be written.
    let document = match serde_json::from_str::<serde_json::Value>(&intent.payload) {
        Ok(value) => value,
        Err(e) => {
            return Err(ProviderError::permanent(
                "deliver_intent",
                format!("Failed to parse outbox payload: {e}"),
            ));
        }
    };

    let response = client
        .create_document(&intent.target_instance_id, &document)
        .await?;

    // Anything that is neither a success nor a conflict is surfaced as an error.
    if !response.is_success() && !errors::is_conflict(response.status) {
        return Err(errors::map_cosmosdb_error(
            "deliver_intent",
            response.status,
            &response.body,
        ));
    }
    Ok(())
}
/// Deletes an outbox intent document, addressed by `intent.id` and
/// `intent.instance_id`.
///
/// A `204` response and a not-found response both count as success, so
/// deleting an intent that is already gone returns `Ok(())`.
///
/// # Errors
///
/// Propagates any error from `delete_document`, and maps every other response
/// status through `errors::map_cosmosdb_error`.
pub async fn delete_intent(
    client: &CosmosDBClient,
    intent: &OutboxIntentDocument,
) -> Result<(), ProviderError> {
    let response = client
        .delete_document(&intent.id, &intent.instance_id)
        .await?;

    let status = response.status;
    // Only statuses other than 204 / not-found are failures.
    if status != 204 && !errors::is_not_found(status) {
        return Err(errors::map_cosmosdb_error(
            "delete_intent",
            status,
            &response.body,
        ));
    }
    Ok(())
}
/// Attempts to deliver each intent in order and, on success, delete it.
///
/// This function never returns an error: every failure is logged and the loop
/// moves on to the next intent.
///
/// * `fault_injector` — when present and its `should_fail()` returns `true`,
///   the whole batch is skipped (after a warning) without touching any intent.
/// * Retryable delivery errors are logged at `debug`; the log message states
///   they are deferred to the reconciler.
/// * Non-retryable delivery errors and delete errors are logged at `warn`.
pub async fn deliver_intents_best_effort(
    client: &CosmosDBClient,
    intents: &[OutboxIntentDocument],
    fault_injector: Option<&OutboxFaultInjector>,
) {
    // Fault injection short-circuits the entire batch.
    match fault_injector {
        Some(injector) if injector.should_fail() => {
            tracing::warn!(
                target: "duroxide::providers::cosmosdb::outbox",
                count = intents.len(),
                "FAULT INJECTION: skipping best-effort delivery"
            );
            return;
        }
        _ => {}
    }

    for intent in intents.iter() {
        let failure = match deliver_intent(client, intent).await {
            Ok(()) => {
                // Delivered: remove the intent. A failed delete is only logged.
                if let Err(e) = delete_intent(client, intent).await {
                    tracing::warn!(
                        target: "duroxide::providers::cosmosdb::outbox",
                        intent_id = %intent.id,
                        error = %e,
                        "Failed to delete delivered outbox intent"
                    );
                }
                continue;
            }
            Err(e) => e,
        };

        if failure.is_retryable() {
            tracing::debug!(
                target: "duroxide::providers::cosmosdb::outbox",
                intent_id = %intent.id,
                error = %failure,
                "Outbox intent delivery deferred to reconciler"
            );
        } else {
            tracing::warn!(
                target: "duroxide::providers::cosmosdb::outbox",
                intent_id = %intent.id,
                error = %failure,
                "Permanent failure delivering outbox intent"
            );
        }
    }
}
pub fn start_reconciler(
client: CosmosDBClient,
interval: Duration,
age_threshold: Duration,
cancel: CancellationToken,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
loop {
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!(
target: "duroxide::providers::cosmosdb::outbox",
"Outbox reconciler shutting down"
);
break;
}
_ = ticker.tick() => {
if let Err(e) = reconcile_once(&client, age_threshold).await {
tracing::warn!(
target: "duroxide::providers::cosmosdb::outbox",
error = %e,
"Outbox reconciler cycle failed"
);
}
}
}
}
})
}
/// Runs one reconciliation cycle: queries the pending intents selected by
/// `age_threshold` and tries to deliver and delete each one.
///
/// Returns the number of intents delivered in this cycle. An intent counts as
/// delivered even if the follow-up delete fails; that failure is logged at
/// `warn` rather than discarded.
///
/// Per-intent delivery failures are logged (`debug` when retryable, `warn`
/// otherwise) and do not abort the cycle; the intent is not deleted here.
///
/// # Errors
///
/// Only a failure of `query::query_pending_intents` is returned to the caller.
async fn reconcile_once(
    client: &CosmosDBClient,
    age_threshold: Duration,
) -> Result<usize, ProviderError> {
    let now = now_ms();
    // `as_millis` returns `u128`. Saturate at `u64::MAX` so an enormous
    // threshold cannot wrap around into a small one; after the clamp the cast
    // is lossless.
    let age_ms = age_threshold.as_millis().min(u128::from(u64::MAX)) as u64;

    let pending = query::query_pending_intents(client, age_ms, now).await?;
    if pending.is_empty() {
        return Ok(0);
    }
    tracing::debug!(
        target: "duroxide::providers::cosmosdb::outbox",
        count = pending.len(),
        "Reconciler found pending intents"
    );

    let mut delivered = 0;
    for intent in &pending {
        match deliver_intent(client, intent).await {
            Ok(()) => {
                // Surface delete failures instead of dropping them silently,
                // matching what the best-effort delivery path does.
                if let Err(e) = delete_intent(client, intent).await {
                    tracing::warn!(
                        target: "duroxide::providers::cosmosdb::outbox",
                        intent_id = %intent.id,
                        error = %e,
                        "Reconciler: failed to delete delivered outbox intent"
                    );
                }
                delivered += 1;
            }
            Err(e) if e.is_retryable() => {
                tracing::debug!(
                    target: "duroxide::providers::cosmosdb::outbox",
                    intent_id = %intent.id,
                    error = %e,
                    "Reconciler: transient failure, retrying next cycle"
                );
            }
            Err(e) => {
                tracing::warn!(
                    target: "duroxide::providers::cosmosdb::outbox",
                    intent_id = %intent.id,
                    error = %e,
                    "Reconciler: permanent failure delivering intent"
                );
            }
        }
    }
    Ok(delivered)
}