outbox_pattern_processor/
outbox.rs1use crate::http_destination::HttpDestination;
2use crate::outbox_destination::OutboxDestination;
3use crate::sns_destination::SnsDestination;
4use crate::sqs_destination::SqsDestination;
5use serde_json::Value;
6use sqlx::types::chrono::{DateTime, Utc};
7use sqlx::types::Json;
8use sqlx::FromRow;
9use std::collections::HashMap;
10use uuid::Uuid;
11
12#[derive(Debug, FromRow, Clone, PartialEq)]
13pub struct Outbox {
14 pub idempotent_key: Uuid,
15 pub partition_key: Uuid,
16 pub destinations: Json<Vec<OutboxDestination>>,
17 pub headers: Option<Json<HashMap<String, String>>>,
18 pub payload: String,
19 pub attempts: i32,
20 pub created_at: DateTime<Utc>,
21 pub process_after: Option<DateTime<Utc>>,
22 pub processed_at: Option<DateTime<Utc>>,
23}
24
25impl Outbox {
26 pub fn delay(
27 &self,
28 process_after: DateTime<Utc>,
29 ) -> Self {
30 Outbox {
31 idempotent_key: self.idempotent_key,
32 partition_key: self.partition_key,
33 destinations: self.destinations.clone(),
34 headers: self.headers.clone(),
35 payload: self.payload.clone(),
36 attempts: self.attempts,
37 created_at: self.created_at,
38 process_after: Some(process_after),
39 processed_at: self.processed_at,
40 }
41 }
42
43 pub fn http_post_json(
44 partition_key: Uuid,
45 url: &str,
46 headers: Option<HashMap<String, String>>,
47 payload: &Value,
48 ) -> Self {
49 Self::http(partition_key, url, headers, vec![], &payload.to_string(), None)
50 }
51
52 pub fn http_put_json(
53 partition_key: Uuid,
54 url: &str,
55 headers: Option<HashMap<String, String>>,
56 payload: &Value,
57 ) -> Self {
58 Self::http(partition_key, url, headers, vec![], &payload.to_string(), Some("put".to_string()))
59 }
60
61 pub fn http_patch_json(
62 partition_key: Uuid,
63 url: &str,
64 headers: Option<HashMap<String, String>>,
65 payload: &Value,
66 ) -> Self {
67 Self::http(partition_key, url, headers, vec![], &payload.to_string(), Some("patch".to_string()))
68 }
69
70 pub fn sqs(
71 partition_key: Uuid,
72 queue_url: &str,
73 headers: Option<HashMap<String, String>>,
74 payload: &str,
75 ) -> Self {
76 let destinations = vec![OutboxDestination::SqsDestination(SqsDestination { queue_url: queue_url.to_string() })];
77
78 Self::new(partition_key, destinations, headers, payload)
79 }
80
81 pub fn sns(
82 partition_key: Uuid,
83 topic_arn: &str,
84 headers: Option<HashMap<String, String>>,
85 payload: &str,
86 ) -> Self {
87 let destinations = vec![OutboxDestination::SnsDestination(SnsDestination { topic_arn: topic_arn.to_string() })];
88
89 Self::new(partition_key, destinations, headers, payload)
90 }
91
92 pub fn http_and_sns(
93 partition_key: Uuid,
94 url: &str,
95 topic_arn: &str,
96 headers: Option<HashMap<String, String>>,
97 payload: &Value,
98 ) -> Self {
99 let destinations = vec![OutboxDestination::SnsDestination(SnsDestination { topic_arn: topic_arn.to_string() })];
100 Self::http(partition_key, url, headers, destinations, &payload.to_string(), None)
101 }
102
103 pub fn http_and_sqs(
104 partition_key: Uuid,
105 url: &str,
106 queue_url: &str,
107 headers: Option<HashMap<String, String>>,
108 payload: &Value,
109 ) -> Self {
110 let destinations = vec![OutboxDestination::SqsDestination(SqsDestination { queue_url: queue_url.to_string() })];
111 Self::http(partition_key, url, headers, destinations, &payload.to_string(), None)
112 }
113
114 fn http(
115 partition_key: Uuid,
116 url: &str,
117 headers: Option<HashMap<String, String>>,
118 destinations: Vec<OutboxDestination>,
119 payload: &str,
120 method: Option<String>,
121 ) -> Self {
122 let mut extended_headers = headers.unwrap_or_default();
123 extended_headers.extend(HashMap::from([("Content-Type".to_string(), "application/json".to_string())]));
124
125 let mut all_destinations = vec![];
126 all_destinations.extend(destinations);
127 all_destinations.push(OutboxDestination::HttpDestination(HttpDestination {
128 url: url.to_string(),
129 headers: Some(extended_headers),
130 method,
131 }));
132
133 Self::new(partition_key, all_destinations, None, payload)
134 }
135
136 pub fn new(
137 partition_key: Uuid,
138 destinations: Vec<OutboxDestination>,
139 headers: Option<HashMap<String, String>>,
140 payload: &str,
141 ) -> Self {
142 Outbox {
143 idempotent_key: Uuid::now_v7(),
144 partition_key,
145 destinations: Json(destinations),
146 headers: headers.map(Json),
147 payload: payload.to_string(),
148 attempts: 0,
149 created_at: Utc::now(),
150 process_after: None,
151 processed_at: None,
152 }
153 }
154}