Skip to main content

awaken_server_contract/contract/
outbox.rs

1// `OutboxError` is the one outbox type the runtime-contract write boundary
2// names; everything else (status, message/draft data, lane constants, the
3// store trait) is server/store-owned and defined here.
4pub use awaken_runtime_contract::contract::outbox::OutboxError;
5
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11
12use crate::contract::scope::{ScopeId, scoped_key, unscoped_key};
13
14pub const OUTBOX_LANE_CANONICAL: &str = "canonical";
15pub const OUTBOX_LANE_PROTOCOL_REPLAY: &str = "protocol_replay";
16pub const OUTBOX_TARGET_PROTOCOL_PROJECTOR: &str = "protocol_projector";
17pub const OUTBOX_TARGET_PROTOCOL_FANOUT: &str = "protocol_fanout";
18pub const OUTBOX_TARGET_A2A_WEBHOOK: &str = "a2a_webhook";
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
21#[serde(rename_all = "snake_case")]
22pub enum OutboxStatus {
23    Pending,
24    Claimed,
25    Delivered,
26    DeadLetter,
27}
28
29#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
30pub struct OutboxMessageDraft {
31    pub lane: String,
32    pub target: String,
33    pub payload: Value,
34    #[serde(default, skip_serializing_if = "Option::is_none")]
35    pub dedupe_key: Option<String>,
36    #[serde(default)]
37    pub available_at: u64,
38    #[serde(default = "default_max_attempts")]
39    pub max_attempts: u32,
40}
41
42impl OutboxMessageDraft {
43    pub fn new(
44        lane: impl Into<String>,
45        target: impl Into<String>,
46        payload: Value,
47    ) -> Result<Self, OutboxError> {
48        let draft = Self {
49            lane: lane.into(),
50            target: target.into(),
51            payload,
52            dedupe_key: None,
53            available_at: 0,
54            max_attempts: default_max_attempts(),
55        };
56        draft.validate()?;
57        Ok(draft)
58    }
59
60    pub fn validate(&self) -> Result<(), OutboxError> {
61        reject_blank("lane", &self.lane)?;
62        reject_blank("target", &self.target)?;
63        if let Some(dedupe_key) = &self.dedupe_key {
64            reject_blank("dedupe_key", dedupe_key)?;
65        }
66        if self.max_attempts == 0 {
67            return Err(OutboxError::Validation(
68                "max_attempts must be greater than zero".to_string(),
69            ));
70        }
71        Ok(())
72    }
73}
74
75#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
76pub struct OutboxMessage {
77    pub outbox_id: String,
78    pub lane: String,
79    pub target: String,
80    pub payload: Value,
81    #[serde(default, skip_serializing_if = "Option::is_none")]
82    pub dedupe_key: Option<String>,
83    pub status: OutboxStatus,
84    pub available_at: u64,
85    pub attempt_count: u32,
86    pub max_attempts: u32,
87    #[serde(default, skip_serializing_if = "Option::is_none")]
88    pub claimed_by: Option<String>,
89    #[serde(default, skip_serializing_if = "Option::is_none")]
90    pub claim_token: Option<String>,
91    #[serde(default, skip_serializing_if = "Option::is_none")]
92    pub lease_expires_at: Option<u64>,
93    #[serde(default, skip_serializing_if = "Option::is_none")]
94    pub last_error: Option<String>,
95    pub created_at: u64,
96    pub updated_at: u64,
97}
98
99impl OutboxMessage {
100    pub fn from_enqueue(
101        outbox_id: String,
102        draft: OutboxMessageDraft,
103        now: u64,
104    ) -> Result<Self, OutboxError> {
105        draft.validate()?;
106        reject_blank("outbox_id", &outbox_id)?;
107        Ok(Self {
108            outbox_id,
109            lane: draft.lane,
110            target: draft.target,
111            payload: draft.payload,
112            dedupe_key: draft.dedupe_key,
113            status: OutboxStatus::Pending,
114            available_at: draft.available_at,
115            attempt_count: 0,
116            max_attempts: draft.max_attempts,
117            claimed_by: None,
118            claim_token: None,
119            lease_expires_at: None,
120            last_error: None,
121            created_at: now,
122            updated_at: now,
123        })
124    }
125}
126
127#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
128pub struct OutboxEnqueueResult {
129    pub message: OutboxMessage,
130}
131
132#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
133pub enum OutboxNackOutcome {
134    Requeued,
135    DeadLettered,
136    LostClaim,
137}
138
139fn default_max_attempts() -> u32 {
140    5
141}
142
143fn reject_blank(field: &str, value: &str) -> Result<(), OutboxError> {
144    if value.trim().is_empty() {
145        return Err(OutboxError::Validation(format!("{field} is required")));
146    }
147    Ok(())
148}
149
150/// Outbox store contract (ADR-0034 D9/D10).
151///
152/// `enqueue_outbox` is the protocol-neutral entry point and does NOT share a
153/// transaction with the caller's domain writes. Backends with transactional
154/// guarantees expose an additional concrete-type entry point that accepts a
155/// backend-specific transaction handle:
156///
157/// - `PostgresStore` provides `enqueue_outbox_in_transaction` to attach an
158///   outbox insert to an externally-managed `sqlx::Transaction`. This is the
159///   path control-plane writers use to honor ADR-0034 D9's `BEGIN ... update
160///   domain row ... append canonical event ... insert outbox ... COMMIT`
161///   atomicity requirement.
162/// - The canonical EventStore append paths (Postgres) already enqueue the
163///   canonical outbox row within the same EventStore transaction, so callers
164///   that only need EventStore + outbox atomicity get it for free via
165///   `EventWriter::append`.
166/// - Implementations whose database is different from the canonical event
167///   backend can ONLY provide eventually-consistent canonical→outbox writes;
168///   the deployment must accept the eventual-consistency profile that
169///   ADR-0034 documents.
170#[async_trait]
171pub trait OutboxStore: Send + Sync {
172    /// Enqueue an outbox row in the backend's own transaction. Use the
173    /// concrete-type transactional method (see trait docs) when the caller
174    /// needs to share a transaction with other writes.
175    async fn enqueue_outbox(
176        &self,
177        draft: OutboxMessageDraft,
178    ) -> Result<OutboxEnqueueResult, OutboxError>;
179
180    async fn claim_outbox(
181        &self,
182        lane: &str,
183        target: &str,
184        limit: usize,
185        lease_ms: u64,
186        consumer_id: &str,
187        now: u64,
188    ) -> Result<Vec<OutboxMessage>, OutboxError>;
189
190    async fn ack_outbox(
191        &self,
192        outbox_id: &str,
193        claim_token: &str,
194        now: u64,
195    ) -> Result<bool, OutboxError>;
196
197    async fn nack_outbox(
198        &self,
199        outbox_id: &str,
200        claim_token: &str,
201        error: &str,
202        retry_at: u64,
203        now: u64,
204    ) -> Result<OutboxNackOutcome, OutboxError>;
205
206    async fn list_outbox(
207        &self,
208        status: Option<OutboxStatus>,
209        limit: usize,
210    ) -> Result<Vec<OutboxMessage>, OutboxError>;
211}
212
213#[derive(Clone)]
214pub struct ScopedOutboxStore {
215    inner: Arc<dyn OutboxStore>,
216    scope_id: ScopeId,
217}
218
219impl ScopedOutboxStore {
220    pub fn new(inner: Arc<dyn OutboxStore>, scope_id: ScopeId) -> Self {
221        Self { inner, scope_id }
222    }
223
224    pub fn scope_id(&self) -> &ScopeId {
225        &self.scope_id
226    }
227
228    pub fn inner(&self) -> &dyn OutboxStore {
229        self.inner.as_ref()
230    }
231
232    fn scoped(&self, value: &str) -> String {
233        scoped_key(&self.scope_id, value)
234    }
235
236    fn unscoped<'a>(&self, value: &'a str) -> Option<&'a str> {
237        unscoped_key(&self.scope_id, value)
238    }
239
240    fn encode_draft(&self, mut draft: OutboxMessageDraft) -> OutboxMessageDraft {
241        draft.lane = self.scoped(&draft.lane);
242        draft.dedupe_key = draft.dedupe_key.as_deref().map(|key| self.scoped(key));
243        draft
244    }
245
246    fn decode_message(&self, mut message: OutboxMessage) -> Option<OutboxMessage> {
247        message.lane = self.unscoped(&message.lane)?.to_string();
248        message.dedupe_key = message
249            .dedupe_key
250            .as_deref()
251            .map(|key| self.unscoped(key).map(str::to_string))
252            .unwrap_or(None);
253        Some(message)
254    }
255}
256
257#[async_trait]
258impl OutboxStore for ScopedOutboxStore {
259    async fn enqueue_outbox(
260        &self,
261        draft: OutboxMessageDraft,
262    ) -> Result<OutboxEnqueueResult, OutboxError> {
263        let result = self.inner.enqueue_outbox(self.encode_draft(draft)).await?;
264        let message = self.decode_message(result.message).ok_or_else(|| {
265            OutboxError::Io("scoped outbox store returned a message outside its scope".into())
266        })?;
267        Ok(OutboxEnqueueResult { message })
268    }
269
270    async fn claim_outbox(
271        &self,
272        lane: &str,
273        target: &str,
274        limit: usize,
275        lease_ms: u64,
276        consumer_id: &str,
277        now: u64,
278    ) -> Result<Vec<OutboxMessage>, OutboxError> {
279        Ok(self
280            .inner
281            .claim_outbox(
282                &self.scoped(lane),
283                target,
284                limit,
285                lease_ms,
286                consumer_id,
287                now,
288            )
289            .await?
290            .into_iter()
291            .filter_map(|message| self.decode_message(message))
292            .collect())
293    }
294
295    async fn ack_outbox(
296        &self,
297        outbox_id: &str,
298        claim_token: &str,
299        now: u64,
300    ) -> Result<bool, OutboxError> {
301        let Some(message) = self
302            .list_outbox(Some(OutboxStatus::Claimed), usize::MAX)
303            .await?
304            .into_iter()
305            .find(|message| message.outbox_id == outbox_id)
306        else {
307            return Ok(false);
308        };
309        self.inner
310            .ack_outbox(&message.outbox_id, claim_token, now)
311            .await
312    }
313
314    async fn nack_outbox(
315        &self,
316        outbox_id: &str,
317        claim_token: &str,
318        error: &str,
319        retry_at: u64,
320        now: u64,
321    ) -> Result<OutboxNackOutcome, OutboxError> {
322        let Some(message) = self
323            .list_outbox(Some(OutboxStatus::Claimed), usize::MAX)
324            .await?
325            .into_iter()
326            .find(|message| message.outbox_id == outbox_id)
327        else {
328            return Ok(OutboxNackOutcome::LostClaim);
329        };
330        self.inner
331            .nack_outbox(&message.outbox_id, claim_token, error, retry_at, now)
332            .await
333    }
334
335    async fn list_outbox(
336        &self,
337        status: Option<OutboxStatus>,
338        limit: usize,
339    ) -> Result<Vec<OutboxMessage>, OutboxError> {
340        Ok(self
341            .inner
342            .list_outbox(status, usize::MAX)
343            .await?
344            .into_iter()
345            .filter_map(|message| self.decode_message(message))
346            .take(limit)
347            .collect())
348    }
349}
350
351#[cfg(test)]
352mod tests {
353    use super::*;
354
355    #[test]
356    fn draft_rejects_blank_lane() {
357        let err = OutboxMessageDraft::new(" ", "target", serde_json::json!({})).unwrap_err();
358        assert!(matches!(err, OutboxError::Validation(message) if message.contains("lane")));
359    }
360
361    #[test]
362    fn message_from_enqueue_initializes_pending_delivery_state() {
363        let draft =
364            OutboxMessageDraft::new("canonical", "projector", serde_json::json!({})).unwrap();
365        let message = OutboxMessage::from_enqueue("out_1".into(), draft, 42).unwrap();
366        assert_eq!(message.status, OutboxStatus::Pending);
367        assert_eq!(message.attempt_count, 0);
368        assert_eq!(message.created_at, 42);
369        assert!(message.claim_token.is_none());
370    }
371}