1use std::collections::HashMap;
2use std::sync::Arc;
3
4pub struct DeliveryBus {
10 sqs_sender: Option<Arc<dyn SqsDelivery>>,
12 sns_sender: Option<Arc<dyn SnsDelivery>>,
14 eventbridge_sender: Option<Arc<dyn EventBridgeDelivery>>,
16 lambda_invoker: Option<Arc<dyn LambdaDelivery>>,
18 kinesis_sender: Option<Arc<dyn KinesisDelivery>>,
20 stepfunctions_starter: Option<Arc<dyn StepFunctionsDelivery>>,
22}
23
24#[derive(Debug, Clone)]
26pub struct SqsMessageAttribute {
27 pub data_type: String,
28 pub string_value: Option<String>,
29 pub binary_value: Option<String>,
30}
31
32pub trait SqsDelivery: Send + Sync {
34 fn deliver_to_queue(
35 &self,
36 queue_arn: &str,
37 message_body: &str,
38 attributes: &HashMap<String, String>,
39 );
40
41 fn deliver_to_queue_with_attrs(
43 &self,
44 queue_arn: &str,
45 message_body: &str,
46 message_attributes: &HashMap<String, SqsMessageAttribute>,
47 message_group_id: Option<&str>,
48 message_dedup_id: Option<&str>,
49 ) {
50 let _ = (message_attributes, message_group_id, message_dedup_id);
52 self.deliver_to_queue(queue_arn, message_body, &HashMap::new());
53 }
54}
55
56pub trait SnsDelivery: Send + Sync {
58 fn publish_to_topic(&self, topic_arn: &str, message: &str, subject: Option<&str>);
59}
60
61pub trait EventBridgeDelivery: Send + Sync {
63 fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str);
66}
67
68pub trait LambdaDelivery: Send + Sync {
70 fn invoke_lambda(
73 &self,
74 function_arn: &str,
75 payload: &str,
76 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send>>;
77}
78
79pub trait KinesisDelivery: Send + Sync {
81 fn put_record(&self, stream_arn: &str, data: &str, partition_key: &str);
84}
85
86pub trait StepFunctionsDelivery: Send + Sync {
88 fn start_execution(&self, state_machine_arn: &str, input: &str);
91}
92
93impl DeliveryBus {
94 pub fn new() -> Self {
95 Self {
96 sqs_sender: None,
97 sns_sender: None,
98 eventbridge_sender: None,
99 lambda_invoker: None,
100 kinesis_sender: None,
101 stepfunctions_starter: None,
102 }
103 }
104
105 pub fn with_sqs(mut self, sender: Arc<dyn SqsDelivery>) -> Self {
106 self.sqs_sender = Some(sender);
107 self
108 }
109
110 pub fn with_sns(mut self, sender: Arc<dyn SnsDelivery>) -> Self {
111 self.sns_sender = Some(sender);
112 self
113 }
114
115 pub fn with_eventbridge(mut self, sender: Arc<dyn EventBridgeDelivery>) -> Self {
116 self.eventbridge_sender = Some(sender);
117 self
118 }
119
120 pub fn with_lambda(mut self, invoker: Arc<dyn LambdaDelivery>) -> Self {
121 self.lambda_invoker = Some(invoker);
122 self
123 }
124
125 pub fn with_kinesis(mut self, sender: Arc<dyn KinesisDelivery>) -> Self {
126 self.kinesis_sender = Some(sender);
127 self
128 }
129
130 pub fn with_stepfunctions(mut self, starter: Arc<dyn StepFunctionsDelivery>) -> Self {
131 self.stepfunctions_starter = Some(starter);
132 self
133 }
134
135 pub fn send_to_sqs(
137 &self,
138 queue_arn: &str,
139 message_body: &str,
140 attributes: &HashMap<String, String>,
141 ) {
142 if let Some(ref sender) = self.sqs_sender {
143 sender.deliver_to_queue(queue_arn, message_body, attributes);
144 }
145 }
146
147 pub fn send_to_sqs_with_attrs(
149 &self,
150 queue_arn: &str,
151 message_body: &str,
152 message_attributes: &HashMap<String, SqsMessageAttribute>,
153 message_group_id: Option<&str>,
154 message_dedup_id: Option<&str>,
155 ) {
156 if let Some(ref sender) = self.sqs_sender {
157 sender.deliver_to_queue_with_attrs(
158 queue_arn,
159 message_body,
160 message_attributes,
161 message_group_id,
162 message_dedup_id,
163 );
164 }
165 }
166
167 pub fn publish_to_sns(&self, topic_arn: &str, message: &str, subject: Option<&str>) {
169 if let Some(ref sender) = self.sns_sender {
170 sender.publish_to_topic(topic_arn, message, subject);
171 }
172 }
173
174 pub fn put_event_to_eventbridge(
176 &self,
177 source: &str,
178 detail_type: &str,
179 detail: &str,
180 event_bus_name: &str,
181 ) {
182 if let Some(ref sender) = self.eventbridge_sender {
183 sender.put_event(source, detail_type, detail, event_bus_name);
184 }
185 }
186
187 pub async fn invoke_lambda(
189 &self,
190 function_arn: &str,
191 payload: &str,
192 ) -> Option<Result<Vec<u8>, String>> {
193 if let Some(ref invoker) = self.lambda_invoker {
194 Some(invoker.invoke_lambda(function_arn, payload).await)
195 } else {
196 None
197 }
198 }
199
200 pub fn send_to_kinesis(&self, stream_arn: &str, data: &str, partition_key: &str) {
202 if let Some(ref sender) = self.kinesis_sender {
203 sender.put_record(stream_arn, data, partition_key);
204 }
205 }
206
207 pub fn start_stepfunctions_execution(&self, state_machine_arn: &str, input: &str) {
209 if let Some(ref starter) = self.stepfunctions_starter {
210 starter.start_execution(state_machine_arn, input);
211 }
212 }
213}
214
215impl Default for DeliveryBus {
216 fn default() -> Self {
217 Self::new()
218 }
219}