Skip to main content

duroxide_cdb/
outbox.rs

1use crate::client::CosmosDBClient;
2use crate::errors;
3use crate::models::*;
4use crate::query;
5use duroxide::providers::ProviderError;
6
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio_util::sync::CancellationToken;
11
12// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
13// Fault injection for testing outbox delivery
14// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
15
/// Test hook that forces best-effort outbox delivery to fail on demand.
///
/// The injector holds a shared countdown. While the countdown is non-zero,
/// each call to [`OutboxFaultInjector::should_fail`] consumes one unit and
/// answers `true`, which `deliver_intents_best_effort` treats as "skip this
/// delivery" (simulating a network failure). When the countdown reaches 0,
/// delivery proceeds normally again.
///
/// The reconciler does not take a fault injector, so it always delivers.
/// This mirrors the real failure mode: the immediate delivery is lost and the
/// reconciler picks up the slack.
///
/// Clones share the same countdown because the counter lives behind an `Arc`.
#[derive(Debug, Clone)]
pub struct OutboxFaultInjector {
    remaining_failures: Arc<AtomicU32>,
}

impl OutboxFaultInjector {
    /// Build an injector whose next `n` best-effort delivery attempts are
    /// suppressed. With `n == 0` the injector never suppresses anything.
    pub fn fail_next(n: u32) -> Self {
        let remaining_failures = Arc::new(AtomicU32::new(n));
        Self { remaining_failures }
    }

    /// Atomically consume one pending failure.
    ///
    /// Returns `true` (and decrements the counter) while failures remain;
    /// returns `false` and leaves the counter at 0 once they are used up.
    pub fn should_fail(&self) -> bool {
        // `checked_sub` yields `None` at zero, which makes `fetch_update`
        // leave the value untouched and report `Err`.
        self.remaining_failures
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |left| {
                left.checked_sub(1)
            })
            .is_ok()
    }
}
53
54/// Deliver a single outbox intent to its target partition.
55/// Returns Ok(()) if delivered (or already exists).
56pub async fn deliver_intent(
57    client: &CosmosDBClient,
58    intent: &OutboxIntentDocument,
59) -> Result<(), ProviderError> {
60    // The payload is the complete document to create in the target partition
61    let payload: serde_json::Value = serde_json::from_str(&intent.payload).map_err(|e| {
62        ProviderError::permanent(
63            "deliver_intent",
64            format!("Failed to parse outbox payload: {e}"),
65        )
66    })?;
67
68    let resp = client
69        .create_document(&intent.target_instance_id, &payload)
70        .await?;
71
72    if resp.is_success() || errors::is_conflict(resp.status) {
73        // 201 = created, 409 = already delivered (idempotent)
74        Ok(())
75    } else {
76        Err(errors::map_cosmosdb_error(
77            "deliver_intent",
78            resp.status,
79            &resp.body,
80        ))
81    }
82}
83
84/// Delete an outbox intent after successful delivery.
85pub async fn delete_intent(
86    client: &CosmosDBClient,
87    intent: &OutboxIntentDocument,
88) -> Result<(), ProviderError> {
89    let resp = client
90        .delete_document(&intent.id, &intent.instance_id)
91        .await?;
92
93    if resp.status == 204 || errors::is_not_found(resp.status) {
94        Ok(())
95    } else {
96        Err(errors::map_cosmosdb_error(
97            "delete_intent",
98            resp.status,
99            &resp.body,
100        ))
101    }
102}
103
104/// Best-effort delivery of outbox intents. Used right after ack_orchestration_item.
105/// If a `fault_injector` is provided and has remaining failures, delivery is
106/// skipped (simulating a transient network error). The reconciler will pick up
107/// the undelivered intents.
108pub async fn deliver_intents_best_effort(
109    client: &CosmosDBClient,
110    intents: &[OutboxIntentDocument],
111    fault_injector: Option<&OutboxFaultInjector>,
112) {
113    // Fault injection: skip delivery to test reconciler recovery
114    if let Some(fi) = fault_injector {
115        if fi.should_fail() {
116            tracing::warn!(
117                target: "duroxide::providers::cosmosdb::outbox",
118                count = intents.len(),
119                "FAULT INJECTION: skipping best-effort delivery"
120            );
121            return;
122        }
123    }
124
125    for intent in intents {
126        match deliver_intent(client, intent).await {
127            Ok(()) => {
128                // Delivered successfully, delete the intent
129                if let Err(e) = delete_intent(client, intent).await {
130                    tracing::warn!(
131                        target: "duroxide::providers::cosmosdb::outbox",
132                        intent_id = %intent.id,
133                        error = %e,
134                        "Failed to delete delivered outbox intent"
135                    );
136                }
137            }
138            Err(e) if e.is_retryable() => {
139                tracing::debug!(
140                    target: "duroxide::providers::cosmosdb::outbox",
141                    intent_id = %intent.id,
142                    error = %e,
143                    "Outbox intent delivery deferred to reconciler"
144                );
145            }
146            Err(e) => {
147                tracing::warn!(
148                    target: "duroxide::providers::cosmosdb::outbox",
149                    intent_id = %intent.id,
150                    error = %e,
151                    "Permanent failure delivering outbox intent"
152                );
153            }
154        }
155    }
156}
157
158/// Background reconciler that periodically delivers pending outbox intents.
159pub fn start_reconciler(
160    client: CosmosDBClient,
161    interval: Duration,
162    age_threshold: Duration,
163    cancel: CancellationToken,
164) -> tokio::task::JoinHandle<()> {
165    tokio::spawn(async move {
166        let mut ticker = tokio::time::interval(interval);
167        loop {
168            tokio::select! {
169                _ = cancel.cancelled() => {
170                    tracing::info!(
171                        target: "duroxide::providers::cosmosdb::outbox",
172                        "Outbox reconciler shutting down"
173                    );
174                    break;
175                }
176                _ = ticker.tick() => {
177                    if let Err(e) = reconcile_once(&client, age_threshold).await {
178                        tracing::warn!(
179                            target: "duroxide::providers::cosmosdb::outbox",
180                            error = %e,
181                            "Outbox reconciler cycle failed"
182                        );
183                    }
184                }
185            }
186        }
187    })
188}
189
190/// Run one reconciliation cycle.
191async fn reconcile_once(
192    client: &CosmosDBClient,
193    age_threshold: Duration,
194) -> Result<usize, ProviderError> {
195    let now = now_ms();
196    let age_ms = age_threshold.as_millis() as u64;
197
198    let pending = query::query_pending_intents(client, age_ms, now).await?;
199
200    if pending.is_empty() {
201        return Ok(0);
202    }
203
204    tracing::debug!(
205        target: "duroxide::providers::cosmosdb::outbox",
206        count = pending.len(),
207        "Reconciler found pending intents"
208    );
209
210    let mut delivered = 0;
211    for intent in &pending {
212        match deliver_intent(client, intent).await {
213            Ok(()) => {
214                delete_intent(client, intent).await.ok();
215                delivered += 1;
216            }
217            Err(e) if e.is_retryable() => {
218                // Leave for next cycle
219                tracing::debug!(
220                    target: "duroxide::providers::cosmosdb::outbox",
221                    intent_id = %intent.id,
222                    error = %e,
223                    "Reconciler: transient failure, retrying next cycle"
224                );
225            }
226            Err(e) => {
227                tracing::warn!(
228                    target: "duroxide::providers::cosmosdb::outbox",
229                    intent_id = %intent.id,
230                    error = %e,
231                    "Reconciler: permanent failure delivering intent"
232                );
233            }
234        }
235    }
236
237    Ok(delivered)
238}