1pub use awaken_runtime_contract::contract::outbox::OutboxError;
5
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11
12use crate::contract::scope::{ScopeId, scoped_key, unscoped_key};
13
// Well-known lane and target identifiers used in the `lane` / `target` fields
// of outbox messages. Only the string values are defined here.
// NOTE(review): which components produce or consume each lane/target is not
// visible in this file — confirm against callers before relying on the names.

/// Lane identifier with the literal value `"canonical"`.
pub const OUTBOX_LANE_CANONICAL: &str = "canonical";
/// Lane identifier with the literal value `"protocol_replay"`.
pub const OUTBOX_LANE_PROTOCOL_REPLAY: &str = "protocol_replay";
/// Target identifier with the literal value `"protocol_projector"`.
pub const OUTBOX_TARGET_PROTOCOL_PROJECTOR: &str = "protocol_projector";
/// Target identifier with the literal value `"protocol_fanout"`.
pub const OUTBOX_TARGET_PROTOCOL_FANOUT: &str = "protocol_fanout";
/// Target identifier with the literal value `"a2a_webhook"`.
pub const OUTBOX_TARGET_A2A_WEBHOOK: &str = "a2a_webhook";
19
/// Delivery state recorded on an [`OutboxMessage`].
///
/// Serialized in `snake_case`, so `DeadLetter` is written as `"dead_letter"`.
/// State transitions are performed by store implementations, not in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OutboxStatus {
    /// Initial state assigned by [`OutboxMessage::from_enqueue`].
    Pending,
    /// NOTE(review): presumably set while a consumer holds a claim/lease on
    /// the message — confirm against the store implementations.
    Claimed,
    /// NOTE(review): presumably the terminal state after a successful ack — confirm.
    Delivered,
    /// NOTE(review): presumably the terminal state once the retry budget is
    /// exhausted (see `OutboxNackOutcome::DeadLettered`) — confirm.
    DeadLetter,
}
28
/// Caller-supplied description of a message to enqueue.
///
/// A draft carries only the fields the producer controls; delivery bookkeeping
/// is added by [`OutboxMessage::from_enqueue`]. Use
/// [`OutboxMessageDraft::validate`] to check a draft that was deserialized or
/// mutated after construction.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OutboxMessageDraft {
    /// Lane name; must not be blank (checked by `validate`).
    pub lane: String,
    /// Target name; must not be blank (checked by `validate`).
    pub target: String,
    /// Arbitrary JSON payload; not inspected by validation in this file.
    pub payload: Value,
    /// Optional key; when present it must not be blank. Omitted from the
    /// serialized form when `None` and defaults to `None` when absent.
    /// NOTE(review): presumably used by stores to de-duplicate enqueues — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub dedupe_key: Option<String>,
    /// Defaults to `0` when absent from the serialized form.
    /// NOTE(review): presumably the earliest time the message may be claimed,
    /// on the same clock as the `now` arguments — units are not shown here.
    #[serde(default)]
    pub available_at: u64,
    /// Must be greater than zero; defaults to `default_max_attempts()` when
    /// absent from the serialized form.
    #[serde(default = "default_max_attempts")]
    pub max_attempts: u32,
}
41
42impl OutboxMessageDraft {
43 pub fn new(
44 lane: impl Into<String>,
45 target: impl Into<String>,
46 payload: Value,
47 ) -> Result<Self, OutboxError> {
48 let draft = Self {
49 lane: lane.into(),
50 target: target.into(),
51 payload,
52 dedupe_key: None,
53 available_at: 0,
54 max_attempts: default_max_attempts(),
55 };
56 draft.validate()?;
57 Ok(draft)
58 }
59
60 pub fn validate(&self) -> Result<(), OutboxError> {
61 reject_blank("lane", &self.lane)?;
62 reject_blank("target", &self.target)?;
63 if let Some(dedupe_key) = &self.dedupe_key {
64 reject_blank("dedupe_key", dedupe_key)?;
65 }
66 if self.max_attempts == 0 {
67 return Err(OutboxError::Validation(
68 "max_attempts must be greater than zero".to_string(),
69 ));
70 }
71 Ok(())
72 }
73}
74
/// A stored outbox message: the producer's draft fields plus delivery
/// bookkeeping.
///
/// Optional fields are omitted from the serialized form when `None` and
/// default to `None` when absent.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OutboxMessage {
    /// Identifier of this message; `from_enqueue` rejects a blank value.
    pub outbox_id: String,
    /// Lane copied from the draft.
    pub lane: String,
    /// Target copied from the draft.
    pub target: String,
    /// JSON payload copied from the draft.
    pub payload: Value,
    /// Optional dedupe key copied from the draft.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub dedupe_key: Option<String>,
    /// Current delivery state; starts as `OutboxStatus::Pending`.
    pub status: OutboxStatus,
    /// Copied from the draft's `available_at`.
    pub available_at: u64,
    /// Starts at `0` in `from_enqueue`.
    /// NOTE(review): presumably incremented by stores on each claim or failed
    /// delivery — confirm.
    pub attempt_count: u32,
    /// Attempt budget copied from the draft.
    pub max_attempts: u32,
    /// `None` on enqueue. NOTE(review): presumably the `consumer_id` passed to
    /// `claim_outbox` — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub claimed_by: Option<String>,
    /// `None` on enqueue. NOTE(review): presumably the token that must be
    /// presented to `ack_outbox` / `nack_outbox` — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub claim_token: Option<String>,
    /// `None` on enqueue. NOTE(review): presumably derived from `now` and
    /// `lease_ms` at claim time — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub lease_expires_at: Option<u64>,
    /// `None` on enqueue. NOTE(review): presumably the `error` text passed to
    /// the most recent `nack_outbox` — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
    /// Set to the `now` argument of `from_enqueue`.
    pub created_at: u64,
    /// Set to the `now` argument of `from_enqueue`; later updates are up to
    /// the store implementation.
    pub updated_at: u64,
}
98
99impl OutboxMessage {
100 pub fn from_enqueue(
101 outbox_id: String,
102 draft: OutboxMessageDraft,
103 now: u64,
104 ) -> Result<Self, OutboxError> {
105 draft.validate()?;
106 reject_blank("outbox_id", &outbox_id)?;
107 Ok(Self {
108 outbox_id,
109 lane: draft.lane,
110 target: draft.target,
111 payload: draft.payload,
112 dedupe_key: draft.dedupe_key,
113 status: OutboxStatus::Pending,
114 available_at: draft.available_at,
115 attempt_count: 0,
116 max_attempts: draft.max_attempts,
117 claimed_by: None,
118 claim_token: None,
119 lease_expires_at: None,
120 last_error: None,
121 created_at: now,
122 updated_at: now,
123 })
124 }
125}
126
/// Value returned by `OutboxStore::enqueue_outbox`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OutboxEnqueueResult {
    /// The message as reported by the store after the enqueue.
    /// NOTE(review): whether this can be a pre-existing message matched via
    /// `dedupe_key` is not visible in this file — confirm.
    pub message: OutboxMessage,
}
131
/// Result of `OutboxStore::nack_outbox`.
///
/// Unlike `OutboxStatus`, this enum has no `rename_all` attribute, so the
/// variants serialize under their Rust names (`"Requeued"`, `"DeadLettered"`,
/// `"LostClaim"`). NOTE(review): confirm the difference is intentional before
/// changing it, since it affects the serialized form.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum OutboxNackOutcome {
    /// NOTE(review): presumably the message was made available for another
    /// attempt — confirm against store implementations.
    Requeued,
    /// NOTE(review): presumably the attempt budget was exhausted and the
    /// message was moved to the dead-letter state — confirm.
    DeadLettered,
    /// The caller does not hold the claim. `ScopedOutboxStore::nack_outbox`
    /// returns this when no claimed message with the given id exists in its
    /// scope.
    LostClaim,
}
138
/// Attempt budget used when a draft does not specify `max_attempts`.
///
/// Referenced by the serde `default` attribute on
/// `OutboxMessageDraft::max_attempts` and by `OutboxMessageDraft::new`.
fn default_max_attempts() -> u32 {
    const DEFAULT_MAX_ATTEMPTS: u32 = 5;
    DEFAULT_MAX_ATTEMPTS
}
142
143fn reject_blank(field: &str, value: &str) -> Result<(), OutboxError> {
144 if value.trim().is_empty() {
145 return Err(OutboxError::Validation(format!("{field} is required")));
146 }
147 Ok(())
148}
149
/// Storage contract for an outbox: messages are enqueued, claimed by a
/// consumer, and then acknowledged or negatively acknowledged.
///
/// NOTE(review): the method semantics below are inferred from the signatures
/// and from how `ScopedOutboxStore` uses them in this file; confirm against
/// the concrete store implementations.
#[async_trait]
pub trait OutboxStore: Send + Sync {
    /// Stores the message described by `draft` and returns the stored message.
    async fn enqueue_outbox(
        &self,
        draft: OutboxMessageDraft,
    ) -> Result<OutboxEnqueueResult, OutboxError>;

    /// Claims messages for `consumer_id` from the given `lane` and `target`.
    ///
    /// NOTE(review): presumably returns at most `limit` messages that are
    /// available at `now`, each leased for `lease_ms` milliseconds — confirm.
    async fn claim_outbox(
        &self,
        lane: &str,
        target: &str,
        limit: usize,
        lease_ms: u64,
        consumer_id: &str,
        now: u64,
    ) -> Result<Vec<OutboxMessage>, OutboxError>;

    /// Acknowledges the message `outbox_id` using the caller's `claim_token`.
    ///
    /// NOTE(review): presumably returns `true` when the ack was applied and
    /// `false` when the claim no longer matches — confirm.
    async fn ack_outbox(
        &self,
        outbox_id: &str,
        claim_token: &str,
        now: u64,
    ) -> Result<bool, OutboxError>;

    /// Reports a failed delivery of `outbox_id`, recording `error`.
    ///
    /// NOTE(review): presumably `retry_at` is when the message becomes
    /// available again if it is requeued — confirm. The outcome says whether
    /// the message was requeued, dead-lettered, or the claim was lost.
    async fn nack_outbox(
        &self,
        outbox_id: &str,
        claim_token: &str,
        error: &str,
        retry_at: u64,
        now: u64,
    ) -> Result<OutboxNackOutcome, OutboxError>;

    /// Lists stored messages.
    ///
    /// NOTE(review): presumably `status` filters by delivery state when `Some`
    /// and `limit` caps the number of results — ordering is not specified
    /// here; confirm.
    async fn list_outbox(
        &self,
        status: Option<OutboxStatus>,
        limit: usize,
    ) -> Result<Vec<OutboxMessage>, OutboxError>;
}
212
/// An `OutboxStore` wrapper bound to one `ScopeId`.
///
/// Lanes and dedupe keys are rewritten with `scoped_key` on the way into the
/// inner store and restored with `unscoped_key` on the way out. Cloning copies
/// the `Arc` handle, so clones share the same inner store.
#[derive(Clone)]
pub struct ScopedOutboxStore {
    /// The wrapped store that actually persists messages.
    inner: Arc<dyn OutboxStore>,
    /// Scope applied to every lane and dedupe key passing through this wrapper.
    scope_id: ScopeId,
}
218
219impl ScopedOutboxStore {
220 pub fn new(inner: Arc<dyn OutboxStore>, scope_id: ScopeId) -> Self {
221 Self { inner, scope_id }
222 }
223
224 pub fn scope_id(&self) -> &ScopeId {
225 &self.scope_id
226 }
227
228 pub fn inner(&self) -> &dyn OutboxStore {
229 self.inner.as_ref()
230 }
231
232 fn scoped(&self, value: &str) -> String {
233 scoped_key(&self.scope_id, value)
234 }
235
236 fn unscoped<'a>(&self, value: &'a str) -> Option<&'a str> {
237 unscoped_key(&self.scope_id, value)
238 }
239
240 fn encode_draft(&self, mut draft: OutboxMessageDraft) -> OutboxMessageDraft {
241 draft.lane = self.scoped(&draft.lane);
242 draft.dedupe_key = draft.dedupe_key.as_deref().map(|key| self.scoped(key));
243 draft
244 }
245
246 fn decode_message(&self, mut message: OutboxMessage) -> Option<OutboxMessage> {
247 message.lane = self.unscoped(&message.lane)?.to_string();
248 message.dedupe_key = message
249 .dedupe_key
250 .as_deref()
251 .map(|key| self.unscoped(key).map(str::to_string))
252 .unwrap_or(None);
253 Some(message)
254 }
255}
256
257#[async_trait]
258impl OutboxStore for ScopedOutboxStore {
259 async fn enqueue_outbox(
260 &self,
261 draft: OutboxMessageDraft,
262 ) -> Result<OutboxEnqueueResult, OutboxError> {
263 let result = self.inner.enqueue_outbox(self.encode_draft(draft)).await?;
264 let message = self.decode_message(result.message).ok_or_else(|| {
265 OutboxError::Io("scoped outbox store returned a message outside its scope".into())
266 })?;
267 Ok(OutboxEnqueueResult { message })
268 }
269
270 async fn claim_outbox(
271 &self,
272 lane: &str,
273 target: &str,
274 limit: usize,
275 lease_ms: u64,
276 consumer_id: &str,
277 now: u64,
278 ) -> Result<Vec<OutboxMessage>, OutboxError> {
279 Ok(self
280 .inner
281 .claim_outbox(
282 &self.scoped(lane),
283 target,
284 limit,
285 lease_ms,
286 consumer_id,
287 now,
288 )
289 .await?
290 .into_iter()
291 .filter_map(|message| self.decode_message(message))
292 .collect())
293 }
294
295 async fn ack_outbox(
296 &self,
297 outbox_id: &str,
298 claim_token: &str,
299 now: u64,
300 ) -> Result<bool, OutboxError> {
301 let Some(message) = self
302 .list_outbox(Some(OutboxStatus::Claimed), usize::MAX)
303 .await?
304 .into_iter()
305 .find(|message| message.outbox_id == outbox_id)
306 else {
307 return Ok(false);
308 };
309 self.inner
310 .ack_outbox(&message.outbox_id, claim_token, now)
311 .await
312 }
313
314 async fn nack_outbox(
315 &self,
316 outbox_id: &str,
317 claim_token: &str,
318 error: &str,
319 retry_at: u64,
320 now: u64,
321 ) -> Result<OutboxNackOutcome, OutboxError> {
322 let Some(message) = self
323 .list_outbox(Some(OutboxStatus::Claimed), usize::MAX)
324 .await?
325 .into_iter()
326 .find(|message| message.outbox_id == outbox_id)
327 else {
328 return Ok(OutboxNackOutcome::LostClaim);
329 };
330 self.inner
331 .nack_outbox(&message.outbox_id, claim_token, error, retry_at, now)
332 .await
333 }
334
335 async fn list_outbox(
336 &self,
337 status: Option<OutboxStatus>,
338 limit: usize,
339 ) -> Result<Vec<OutboxMessage>, OutboxError> {
340 Ok(self
341 .inner
342 .list_outbox(status, usize::MAX)
343 .await?
344 .into_iter()
345 .filter_map(|message| self.decode_message(message))
346 .take(limit)
347 .collect())
348 }
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    /// A whitespace-only lane is rejected with a validation error naming the field.
    #[test]
    fn draft_rejects_blank_lane() {
        let outcome = OutboxMessageDraft::new(" ", "target", serde_json::json!({}));
        let names_lane = match outcome.unwrap_err() {
            OutboxError::Validation(message) => message.contains("lane"),
            _ => false,
        };
        assert!(names_lane);
    }

    /// A freshly enqueued message is pending, unattempted, unclaimed, and
    /// stamped with the enqueue time.
    #[test]
    fn message_from_enqueue_initializes_pending_delivery_state() {
        let enqueued_at = 42;
        let payload = serde_json::json!({});
        let draft = OutboxMessageDraft::new("canonical", "projector", payload).unwrap();

        let message =
            OutboxMessage::from_enqueue(String::from("out_1"), draft, enqueued_at).unwrap();

        assert_eq!(message.status, OutboxStatus::Pending);
        assert_eq!(message.attempt_count, 0);
        assert_eq!(message.created_at, enqueued_at);
        assert!(message.claim_token.is_none());
    }
}