1use crate::client::CosmosDBClient;
2use crate::errors;
3use crate::models::*;
4use crate::query;
5use duroxide::providers::ProviderError;
6
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio_util::sync::CancellationToken;
11
/// Fault-injection hook holding a budget of failures that callers can consume.
///
/// Clones share the same underlying counter, so the budget is global across
/// every handle derived from one injector.
#[derive(Debug, Clone)]
pub struct OutboxFaultInjector {
    /// Number of upcoming `should_fail` calls that will still report `true`.
    remaining_failures: Arc<AtomicU32>,
}

impl OutboxFaultInjector {
    /// Creates an injector whose next `n` calls to [`Self::should_fail`]
    /// return `true`.
    pub fn fail_next(n: u32) -> Self {
        let remaining_failures = Arc::new(AtomicU32::new(n));
        Self { remaining_failures }
    }

    /// Atomically consumes one unit of the failure budget.
    ///
    /// Returns `true` if a failure was still pending (and has now been used
    /// up), `false` once the budget is exhausted.
    pub fn should_fail(&self) -> bool {
        // `checked_sub` yields `None` at zero, which makes `fetch_update`
        // leave the counter untouched and report `Err`.
        let consumed = self.remaining_failures.fetch_update(
            Ordering::SeqCst,
            Ordering::SeqCst,
            |left| left.checked_sub(1),
        );
        consumed.is_ok()
    }
}
53
54pub async fn deliver_intent(
57 client: &CosmosDBClient,
58 intent: &OutboxIntentDocument,
59) -> Result<(), ProviderError> {
60 let payload: serde_json::Value = serde_json::from_str(&intent.payload).map_err(|e| {
62 ProviderError::permanent(
63 "deliver_intent",
64 format!("Failed to parse outbox payload: {e}"),
65 )
66 })?;
67
68 let resp = client
69 .create_document(&intent.target_instance_id, &payload)
70 .await?;
71
72 if resp.is_success() || errors::is_conflict(resp.status) {
73 Ok(())
75 } else {
76 Err(errors::map_cosmosdb_error(
77 "deliver_intent",
78 resp.status,
79 &resp.body,
80 ))
81 }
82}
83
84pub async fn delete_intent(
86 client: &CosmosDBClient,
87 intent: &OutboxIntentDocument,
88) -> Result<(), ProviderError> {
89 let resp = client
90 .delete_document(&intent.id, &intent.instance_id)
91 .await?;
92
93 if resp.status == 204 || errors::is_not_found(resp.status) {
94 Ok(())
95 } else {
96 Err(errors::map_cosmosdb_error(
97 "delete_intent",
98 resp.status,
99 &resp.body,
100 ))
101 }
102}
103
104pub async fn deliver_intents_best_effort(
109 client: &CosmosDBClient,
110 intents: &[OutboxIntentDocument],
111 fault_injector: Option<&OutboxFaultInjector>,
112) {
113 if let Some(fi) = fault_injector {
115 if fi.should_fail() {
116 tracing::warn!(
117 target: "duroxide::providers::cosmosdb::outbox",
118 count = intents.len(),
119 "FAULT INJECTION: skipping best-effort delivery"
120 );
121 return;
122 }
123 }
124
125 for intent in intents {
126 match deliver_intent(client, intent).await {
127 Ok(()) => {
128 if let Err(e) = delete_intent(client, intent).await {
130 tracing::warn!(
131 target: "duroxide::providers::cosmosdb::outbox",
132 intent_id = %intent.id,
133 error = %e,
134 "Failed to delete delivered outbox intent"
135 );
136 }
137 }
138 Err(e) if e.is_retryable() => {
139 tracing::debug!(
140 target: "duroxide::providers::cosmosdb::outbox",
141 intent_id = %intent.id,
142 error = %e,
143 "Outbox intent delivery deferred to reconciler"
144 );
145 }
146 Err(e) => {
147 tracing::warn!(
148 target: "duroxide::providers::cosmosdb::outbox",
149 intent_id = %intent.id,
150 error = %e,
151 "Permanent failure delivering outbox intent"
152 );
153 }
154 }
155 }
156}
157
158pub fn start_reconciler(
160 client: CosmosDBClient,
161 interval: Duration,
162 age_threshold: Duration,
163 cancel: CancellationToken,
164) -> tokio::task::JoinHandle<()> {
165 tokio::spawn(async move {
166 let mut ticker = tokio::time::interval(interval);
167 loop {
168 tokio::select! {
169 _ = cancel.cancelled() => {
170 tracing::info!(
171 target: "duroxide::providers::cosmosdb::outbox",
172 "Outbox reconciler shutting down"
173 );
174 break;
175 }
176 _ = ticker.tick() => {
177 if let Err(e) = reconcile_once(&client, age_threshold).await {
178 tracing::warn!(
179 target: "duroxide::providers::cosmosdb::outbox",
180 error = %e,
181 "Outbox reconciler cycle failed"
182 );
183 }
184 }
185 }
186 }
187 })
188}
189
190async fn reconcile_once(
192 client: &CosmosDBClient,
193 age_threshold: Duration,
194) -> Result<usize, ProviderError> {
195 let now = now_ms();
196 let age_ms = age_threshold.as_millis() as u64;
197
198 let pending = query::query_pending_intents(client, age_ms, now).await?;
199
200 if pending.is_empty() {
201 return Ok(0);
202 }
203
204 tracing::debug!(
205 target: "duroxide::providers::cosmosdb::outbox",
206 count = pending.len(),
207 "Reconciler found pending intents"
208 );
209
210 let mut delivered = 0;
211 for intent in &pending {
212 match deliver_intent(client, intent).await {
213 Ok(()) => {
214 delete_intent(client, intent).await.ok();
215 delivered += 1;
216 }
217 Err(e) if e.is_retryable() => {
218 tracing::debug!(
220 target: "duroxide::providers::cosmosdb::outbox",
221 intent_id = %intent.id,
222 error = %e,
223 "Reconciler: transient failure, retrying next cycle"
224 );
225 }
226 Err(e) => {
227 tracing::warn!(
228 target: "duroxide::providers::cosmosdb::outbox",
229 intent_id = %intent.id,
230 error = %e,
231 "Reconciler: permanent failure delivering intent"
232 );
233 }
234 }
235 }
236
237 Ok(delivered)
238}