duroxide-cdb 0.1.6

A CosmosDB-based provider implementation for Duroxide, a durable task orchestration framework
Documentation
use crate::client::CosmosDBClient;
use crate::errors;
use crate::models::*;
use crate::query;
use duroxide::providers::ProviderError;

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// Fault injection for testing outbox delivery
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

/// Test-only switch that makes best-effort outbox delivery pretend to fail.
///
/// While the shared counter is above zero, every call to
/// `deliver_intents_best_effort` that is handed this injector consumes one
/// unit and skips delivery entirely, as if the network had dropped it. When
/// the counter hits zero, delivery goes through normally again.
///
/// The background reconciler never consults an injector and always delivers.
/// That mirrors the real failure mode under test: the immediate delivery is
/// lost, and the reconciler recovers it.
///
/// Clones share the same counter, because it lives behind an `Arc`.
#[derive(Debug, Clone)]
pub struct OutboxFaultInjector {
    remaining_failures: Arc<AtomicU32>,
}

impl OutboxFaultInjector {
    /// Build an injector that suppresses the next `n` best-effort delivery
    /// attempts. Intents left undelivered are picked up by the background
    /// reconciler, which ignores this injector.
    pub fn fail_next(n: u32) -> Self {
        let counter = AtomicU32::new(n);
        Self {
            remaining_failures: Arc::new(counter),
        }
    }

    /// Atomically consume one pending simulated failure.
    ///
    /// Returns `true` and decrements the counter when a failure was pending.
    /// Returns `false` once the counter is zero; the counter never goes
    /// below zero.
    pub fn should_fail(&self) -> bool {
        // `checked_sub` yields `None` at zero, which makes `fetch_update`
        // leave the value untouched and report `Err`.
        let consume_one = |pending: u32| pending.checked_sub(1);
        let outcome =
            self.remaining_failures
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, consume_one);
        outcome.is_ok()
    }
}

/// Deliver one outbox intent by creating its payload document in the target
/// partition (`intent.target_instance_id`).
///
/// `intent.payload` must hold the complete JSON document to create. From the
/// caller's point of view, creation is idempotent: a 409 Conflict means the
/// document already exists (an earlier attempt got through). It is reported
/// as `Ok(())`, exactly like a fresh create.
///
/// # Errors
/// - A payload that is not valid JSON yields a permanent `ProviderError`.
/// - Errors returned by `create_document` itself are propagated unchanged.
/// - Any other non-success status is converted by `errors::map_cosmosdb_error`.
pub async fn deliver_intent(
    client: &CosmosDBClient,
    intent: &OutboxIntentDocument,
) -> Result<(), ProviderError> {
    let document = serde_json::from_str::<serde_json::Value>(&intent.payload).map_err(|e| {
        ProviderError::permanent(
            "deliver_intent",
            format!("Failed to parse outbox payload: {e}"),
        )
    })?;

    let response = client
        .create_document(&intent.target_instance_id, &document)
        .await?;

    // A success status (e.g. 201 Created) or 409 Conflict both mean the
    // document is now present in the target partition.
    let landed = response.is_success() || errors::is_conflict(response.status);
    if !landed {
        return Err(errors::map_cosmosdb_error(
            "deliver_intent",
            response.status,
            &response.body,
        ));
    }

    Ok(())
}

/// Delete an outbox intent after successful delivery.
///
/// The intent is addressed by its own `id` together with `intent.instance_id`
/// (presumably the partition key of the source partition; confirm against
/// `CosmosDBClient::delete_document`).
///
/// A 404 Not Found is treated as success: the intent is already gone, which
/// is the desired end state. This can happen when another path, such as the
/// best-effort deliverer or the reconciler, already deleted it.
///
/// # Errors
/// - Errors returned by `delete_document` itself are propagated unchanged.
/// - Any other non-success status is converted by `errors::map_cosmosdb_error`.
pub async fn delete_intent(
    client: &CosmosDBClient,
    intent: &OutboxIntentDocument,
) -> Result<(), ProviderError> {
    let resp = client
        .delete_document(&intent.id, &intent.instance_id)
        .await?;

    // Accept any success status rather than only 204 No Content, matching how
    // `deliver_intent` treats its response.
    if resp.is_success() || errors::is_not_found(resp.status) {
        Ok(())
    } else {
        Err(errors::map_cosmosdb_error(
            "delete_intent",
            resp.status,
            &resp.body,
        ))
    }
}

/// Best-effort delivery of outbox intents. Used right after ack_orchestration_item.
///
/// For each intent, deliver it to its target partition and, on success,
/// delete it. This function never fails the caller. Every error is logged,
/// and the intent is left in place for the background reconciler to retry.
///
/// An empty `intents` slice is a no-op and does NOT consume a simulated
/// failure. `OutboxFaultInjector::fail_next(n)` therefore applies to the next
/// `n` batches that actually contain intents.
///
/// If a `fault_injector` is provided and has remaining failures, the whole
/// batch is skipped (simulating a transient network error). The reconciler
/// will pick up the undelivered intents.
pub async fn deliver_intents_best_effort(
    client: &CosmosDBClient,
    intents: &[OutboxIntentDocument],
    fault_injector: Option<&OutboxFaultInjector>,
) {
    // Return before touching the fault injector, so that an empty batch does
    // not silently use up a simulated failure meant for a real delivery.
    if intents.is_empty() {
        return;
    }

    // Fault injection: skip delivery to test reconciler recovery
    if let Some(fi) = fault_injector {
        if fi.should_fail() {
            tracing::warn!(
                target: "duroxide::providers::cosmosdb::outbox",
                count = intents.len(),
                "FAULT INJECTION: skipping best-effort delivery"
            );
            return;
        }
    }

    for intent in intents {
        match deliver_intent(client, intent).await {
            Ok(()) => {
                // Delivered successfully, so delete the intent. If the delete
                // fails, the reconciler will redeliver (409 is treated as
                // success) and delete it later.
                if let Err(e) = delete_intent(client, intent).await {
                    tracing::warn!(
                        target: "duroxide::providers::cosmosdb::outbox",
                        intent_id = %intent.id,
                        error = %e,
                        "Failed to delete delivered outbox intent"
                    );
                }
            }
            Err(e) if e.is_retryable() => {
                tracing::debug!(
                    target: "duroxide::providers::cosmosdb::outbox",
                    intent_id = %intent.id,
                    error = %e,
                    "Outbox intent delivery deferred to reconciler"
                );
            }
            Err(e) => {
                tracing::warn!(
                    target: "duroxide::providers::cosmosdb::outbox",
                    intent_id = %intent.id,
                    error = %e,
                    "Permanent failure delivering outbox intent"
                );
            }
        }
    }
}

/// Background reconciler that periodically delivers pending outbox intents.
///
/// Spawns a task that runs one reconciliation cycle every `interval`, passing
/// `age_threshold` through to the pending-intents query. A failed cycle is
/// logged and the loop keeps going. The task exits when `cancel` is
/// triggered; the returned handle completes at that point.
///
/// Behavior notes:
/// - A zero `interval` is clamped to 1 ms, because `tokio::time::interval`
///   panics on a zero period.
/// - The first tick completes immediately, so a cycle runs right after spawn.
/// - If a cycle overruns `interval`, the next tick is delayed rather than
///   fired in a burst, which avoids back-to-back catch-up cycles.
/// - Cancellation is observed between cycles; an in-flight cycle runs to
///   completion before the task shuts down.
pub fn start_reconciler(
    client: CosmosDBClient,
    interval: Duration,
    age_threshold: Duration,
    cancel: CancellationToken,
) -> tokio::task::JoinHandle<()> {
    // `tokio::time::interval` panics if the period is zero.
    let period = interval.max(Duration::from_millis(1));

    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(period);
        // The default (Burst) would fire missed ticks back-to-back after a
        // slow cycle. Delay keeps cycles spaced at least `period` apart.
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Poll branches in order, so shutdown wins when both are ready.
                biased;
                _ = cancel.cancelled() => {
                    tracing::info!(
                        target: "duroxide::providers::cosmosdb::outbox",
                        "Outbox reconciler shutting down"
                    );
                    break;
                }
                _ = ticker.tick() => {
                    if let Err(e) = reconcile_once(&client, age_threshold).await {
                        tracing::warn!(
                            target: "duroxide::providers::cosmosdb::outbox",
                            error = %e,
                            "Outbox reconciler cycle failed"
                        );
                    }
                }
            }
        }
    })
}

/// Run one reconciliation cycle.
///
/// Fetches pending outbox intents via `query::query_pending_intents`, passing
/// `age_threshold` in milliseconds and the current `now_ms()`. This
/// presumably selects intents older than the threshold; see that query for
/// the exact filter. Each returned intent is delivered and, on success,
/// deleted.
///
/// - Transient delivery failures are logged at debug and left for a later cycle.
/// - Permanent delivery failures are logged at warn. The intent is not
///   removed, so a later cycle may pick it up again.
/// - A failed delete after a successful delivery is logged at warn. Redelivery
///   on a later cycle is harmless, because `deliver_intent` treats 409
///   Conflict as success.
///
/// # Returns
/// The number of intents delivered this cycle, including ones whose
/// follow-up delete failed.
///
/// # Errors
/// Only an error from the pending-intents query is propagated. Per-intent
/// failures are logged, not returned.
async fn reconcile_once(
    client: &CosmosDBClient,
    age_threshold: Duration,
) -> Result<usize, ProviderError> {
    let now = now_ms();
    // `as_millis` returns u128. Saturate instead of silently truncating with `as`.
    let age_ms = u64::try_from(age_threshold.as_millis()).unwrap_or(u64::MAX);

    let pending = query::query_pending_intents(client, age_ms, now).await?;

    if pending.is_empty() {
        return Ok(0);
    }

    tracing::debug!(
        target: "duroxide::providers::cosmosdb::outbox",
        count = pending.len(),
        "Reconciler found pending intents"
    );

    let mut delivered = 0;
    for intent in &pending {
        match deliver_intent(client, intent).await {
            Ok(()) => {
                // Don't swallow delete failures silently: the intent will be
                // redelivered next cycle, and the log explains why.
                if let Err(e) = delete_intent(client, intent).await {
                    tracing::warn!(
                        target: "duroxide::providers::cosmosdb::outbox",
                        intent_id = %intent.id,
                        error = %e,
                        "Reconciler: failed to delete delivered intent"
                    );
                }
                delivered += 1;
            }
            Err(e) if e.is_retryable() => {
                // Leave for next cycle
                tracing::debug!(
                    target: "duroxide::providers::cosmosdb::outbox",
                    intent_id = %intent.id,
                    error = %e,
                    "Reconciler: transient failure, retrying next cycle"
                );
            }
            Err(e) => {
                tracing::warn!(
                    target: "duroxide::providers::cosmosdb::outbox",
                    intent_id = %intent.id,
                    error = %e,
                    "Reconciler: permanent failure delivering intent"
                );
            }
        }
    }

    Ok(delivered)
}