outbox_pattern_processor/
outbox.rs

1use crate::http_destination::HttpDestination;
2use crate::outbox_destination::OutboxDestination;
3use crate::sns_destination::SnsDestination;
4use crate::sqs_destination::SqsDestination;
5use serde_json::Value;
6use sqlx::types::chrono::{DateTime, Utc};
7use sqlx::types::Json;
8use sqlx::FromRow;
9use std::collections::HashMap;
10use uuid::Uuid;
11
12#[derive(Debug, FromRow, Clone, PartialEq)]
13pub struct Outbox {
14    pub idempotent_key: Uuid,
15    pub partition_key: Uuid,
16    pub destinations: Json<Vec<OutboxDestination>>,
17    pub headers: Option<Json<HashMap<String, String>>>,
18    pub payload: String,
19    pub attempts: i32,
20    pub created_at: DateTime<Utc>,
21    pub process_after: Option<DateTime<Utc>>,
22    pub processed_at: Option<DateTime<Utc>>,
23}
24
25impl Outbox {
26    pub fn delay(
27        &self,
28        process_after: DateTime<Utc>,
29    ) -> Self {
30        Outbox {
31            idempotent_key: self.idempotent_key,
32            partition_key: self.partition_key,
33            destinations: self.destinations.clone(),
34            headers: self.headers.clone(),
35            payload: self.payload.clone(),
36            attempts: self.attempts,
37            created_at: self.created_at,
38            process_after: Some(process_after),
39            processed_at: self.processed_at,
40        }
41    }
42
43    pub fn http_post_json(
44        partition_key: Uuid,
45        url: &str,
46        headers: Option<HashMap<String, String>>,
47        payload: &Value,
48    ) -> Self {
49        Self::http(partition_key, url, headers, vec![], &payload.to_string(), None)
50    }
51
52    pub fn http_put_json(
53        partition_key: Uuid,
54        url: &str,
55        headers: Option<HashMap<String, String>>,
56        payload: &Value,
57    ) -> Self {
58        Self::http(partition_key, url, headers, vec![], &payload.to_string(), Some("put".to_string()))
59    }
60
61    pub fn http_patch_json(
62        partition_key: Uuid,
63        url: &str,
64        headers: Option<HashMap<String, String>>,
65        payload: &Value,
66    ) -> Self {
67        Self::http(partition_key, url, headers, vec![], &payload.to_string(), Some("patch".to_string()))
68    }
69
70    pub fn sqs(
71        partition_key: Uuid,
72        queue_url: &str,
73        headers: Option<HashMap<String, String>>,
74        payload: &str,
75    ) -> Self {
76        let destinations = vec![OutboxDestination::SqsDestination(SqsDestination { queue_url: queue_url.to_string() })];
77
78        Self::new(partition_key, destinations, headers, payload)
79    }
80
81    pub fn sns(
82        partition_key: Uuid,
83        topic_arn: &str,
84        headers: Option<HashMap<String, String>>,
85        payload: &str,
86    ) -> Self {
87        let destinations = vec![OutboxDestination::SnsDestination(SnsDestination { topic_arn: topic_arn.to_string() })];
88
89        Self::new(partition_key, destinations, headers, payload)
90    }
91
92    pub fn http_and_sns(
93        partition_key: Uuid,
94        url: &str,
95        topic_arn: &str,
96        headers: Option<HashMap<String, String>>,
97        payload: &Value,
98    ) -> Self {
99        let destinations = vec![OutboxDestination::SnsDestination(SnsDestination { topic_arn: topic_arn.to_string() })];
100        Self::http(partition_key, url, headers, destinations, &payload.to_string(), None)
101    }
102
103    pub fn http_and_sqs(
104        partition_key: Uuid,
105        url: &str,
106        queue_url: &str,
107        headers: Option<HashMap<String, String>>,
108        payload: &Value,
109    ) -> Self {
110        let destinations = vec![OutboxDestination::SqsDestination(SqsDestination { queue_url: queue_url.to_string() })];
111        Self::http(partition_key, url, headers, destinations, &payload.to_string(), None)
112    }
113
114    fn http(
115        partition_key: Uuid,
116        url: &str,
117        headers: Option<HashMap<String, String>>,
118        destinations: Vec<OutboxDestination>,
119        payload: &str,
120        method: Option<String>,
121    ) -> Self {
122        let mut extended_headers = headers.unwrap_or_default();
123        extended_headers.extend(HashMap::from([("Content-Type".to_string(), "application/json".to_string())]));
124
125        let mut all_destinations = vec![];
126        all_destinations.extend(destinations);
127        all_destinations.push(OutboxDestination::HttpDestination(HttpDestination {
128            url: url.to_string(),
129            headers: Some(extended_headers),
130            method,
131        }));
132
133        Self::new(partition_key, all_destinations, None, payload)
134    }
135
136    pub fn new(
137        partition_key: Uuid,
138        destinations: Vec<OutboxDestination>,
139        headers: Option<HashMap<String, String>>,
140        payload: &str,
141    ) -> Self {
142        Outbox {
143            idempotent_key: Uuid::now_v7(),
144            partition_key,
145            destinations: Json(destinations),
146            headers: headers.map(Json),
147            payload: payload.to_string(),
148            attempts: 0,
149            created_at: Utc::now(),
150            process_after: None,
151            processed_at: None,
152        }
153    }
154}