1use std::collections::HashMap;
2use std::sync::Arc;
3
4pub struct DeliveryBus {
10 sqs_sender: Option<Arc<dyn SqsDelivery>>,
12 sns_sender: Option<Arc<dyn SnsDelivery>>,
14 eventbridge_sender: Option<Arc<dyn EventBridgeDelivery>>,
16 lambda_invoker: Option<Arc<dyn LambdaDelivery>>,
18 kinesis_sender: Option<Arc<dyn KinesisDelivery>>,
20 stepfunctions_starter: Option<Arc<dyn StepFunctionsDelivery>>,
22}
23
24#[derive(Debug, Clone)]
26pub struct SqsMessageAttribute {
27 pub data_type: String,
28 pub string_value: Option<String>,
29 pub binary_value: Option<String>,
30}
31
32#[derive(Debug, Clone, PartialEq, Eq)]
37pub enum SqsDeliveryError {
38 QueueNotFound(String),
40 InvalidArn(String),
42}
43
44impl std::fmt::Display for SqsDeliveryError {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 Self::QueueNotFound(arn) => write!(f, "queue not found: {arn}"),
48 Self::InvalidArn(arn) => write!(f, "invalid queue ARN: {arn}"),
49 }
50 }
51}
52
53impl std::error::Error for SqsDeliveryError {}
54
55pub trait SqsDelivery: Send + Sync {
57 fn deliver_to_queue(
58 &self,
59 queue_arn: &str,
60 message_body: &str,
61 attributes: &HashMap<String, String>,
62 );
63
64 fn deliver_to_queue_with_attrs(
66 &self,
67 queue_arn: &str,
68 message_body: &str,
69 message_attributes: &HashMap<String, SqsMessageAttribute>,
70 message_group_id: Option<&str>,
71 message_dedup_id: Option<&str>,
72 ) {
73 let _ = (message_attributes, message_group_id, message_dedup_id);
75 self.deliver_to_queue(queue_arn, message_body, &HashMap::new());
76 }
77
78 fn try_deliver_to_queue_with_attrs(
84 &self,
85 queue_arn: &str,
86 message_body: &str,
87 message_attributes: &HashMap<String, SqsMessageAttribute>,
88 message_group_id: Option<&str>,
89 message_dedup_id: Option<&str>,
90 ) -> Result<(), SqsDeliveryError> {
91 self.deliver_to_queue_with_attrs(
92 queue_arn,
93 message_body,
94 message_attributes,
95 message_group_id,
96 message_dedup_id,
97 );
98 Ok(())
99 }
100}
101
102pub trait SnsDelivery: Send + Sync {
104 fn publish_to_topic(&self, topic_arn: &str, message: &str, subject: Option<&str>);
105}
106
107pub trait EventBridgeDelivery: Send + Sync {
109 fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str);
112}
113
114pub trait LambdaDelivery: Send + Sync {
116 fn invoke_lambda(
119 &self,
120 function_arn: &str,
121 payload: &str,
122 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send>>;
123}
124
125pub trait KinesisDelivery: Send + Sync {
127 fn put_record(&self, stream_arn: &str, data: &str, partition_key: &str);
130}
131
132pub trait StepFunctionsDelivery: Send + Sync {
134 fn start_execution(&self, state_machine_arn: &str, input: &str);
137}
138
139impl DeliveryBus {
140 pub fn new() -> Self {
141 Self {
142 sqs_sender: None,
143 sns_sender: None,
144 eventbridge_sender: None,
145 lambda_invoker: None,
146 kinesis_sender: None,
147 stepfunctions_starter: None,
148 }
149 }
150
151 pub fn with_sqs(mut self, sender: Arc<dyn SqsDelivery>) -> Self {
152 self.sqs_sender = Some(sender);
153 self
154 }
155
156 pub fn with_sns(mut self, sender: Arc<dyn SnsDelivery>) -> Self {
157 self.sns_sender = Some(sender);
158 self
159 }
160
161 pub fn with_eventbridge(mut self, sender: Arc<dyn EventBridgeDelivery>) -> Self {
162 self.eventbridge_sender = Some(sender);
163 self
164 }
165
166 pub fn with_lambda(mut self, invoker: Arc<dyn LambdaDelivery>) -> Self {
167 self.lambda_invoker = Some(invoker);
168 self
169 }
170
171 pub fn with_kinesis(mut self, sender: Arc<dyn KinesisDelivery>) -> Self {
172 self.kinesis_sender = Some(sender);
173 self
174 }
175
176 pub fn with_stepfunctions(mut self, starter: Arc<dyn StepFunctionsDelivery>) -> Self {
177 self.stepfunctions_starter = Some(starter);
178 self
179 }
180
181 pub fn send_to_sqs(
183 &self,
184 queue_arn: &str,
185 message_body: &str,
186 attributes: &HashMap<String, String>,
187 ) {
188 if let Some(ref sender) = self.sqs_sender {
189 sender.deliver_to_queue(queue_arn, message_body, attributes);
190 }
191 }
192
193 pub fn send_to_sqs_with_attrs(
195 &self,
196 queue_arn: &str,
197 message_body: &str,
198 message_attributes: &HashMap<String, SqsMessageAttribute>,
199 message_group_id: Option<&str>,
200 message_dedup_id: Option<&str>,
201 ) {
202 if let Some(ref sender) = self.sqs_sender {
203 sender.deliver_to_queue_with_attrs(
204 queue_arn,
205 message_body,
206 message_attributes,
207 message_group_id,
208 message_dedup_id,
209 );
210 }
211 }
212
213 pub fn try_send_to_sqs_with_attrs(
218 &self,
219 queue_arn: &str,
220 message_body: &str,
221 message_attributes: &HashMap<String, SqsMessageAttribute>,
222 message_group_id: Option<&str>,
223 message_dedup_id: Option<&str>,
224 ) -> Result<(), SqsDeliveryError> {
225 match self.sqs_sender {
226 Some(ref sender) => sender.try_deliver_to_queue_with_attrs(
227 queue_arn,
228 message_body,
229 message_attributes,
230 message_group_id,
231 message_dedup_id,
232 ),
233 None => Err(SqsDeliveryError::QueueNotFound(queue_arn.to_string())),
234 }
235 }
236
237 pub fn publish_to_sns(&self, topic_arn: &str, message: &str, subject: Option<&str>) {
239 if let Some(ref sender) = self.sns_sender {
240 sender.publish_to_topic(topic_arn, message, subject);
241 }
242 }
243
244 pub fn put_event_to_eventbridge(
246 &self,
247 source: &str,
248 detail_type: &str,
249 detail: &str,
250 event_bus_name: &str,
251 ) {
252 if let Some(ref sender) = self.eventbridge_sender {
253 sender.put_event(source, detail_type, detail, event_bus_name);
254 }
255 }
256
257 pub async fn invoke_lambda(
259 &self,
260 function_arn: &str,
261 payload: &str,
262 ) -> Option<Result<Vec<u8>, String>> {
263 if let Some(ref invoker) = self.lambda_invoker {
264 Some(invoker.invoke_lambda(function_arn, payload).await)
265 } else {
266 None
267 }
268 }
269
270 pub fn send_to_kinesis(&self, stream_arn: &str, data: &str, partition_key: &str) {
272 if let Some(ref sender) = self.kinesis_sender {
273 sender.put_record(stream_arn, data, partition_key);
274 }
275 }
276
277 pub fn start_stepfunctions_execution(&self, state_machine_arn: &str, input: &str) {
279 if let Some(ref starter) = self.stepfunctions_starter {
280 starter.start_execution(state_machine_arn, input);
281 }
282 }
283}
284
285impl Default for DeliveryBus {
286 fn default() -> Self {
287 Self::new()
288 }
289}
290
291#[cfg(test)]
292mod tests {
293 use super::*;
294 use std::sync::atomic::{AtomicUsize, Ordering};
295 use std::sync::Arc;
296
297 struct MockSqs {
299 call_count: AtomicUsize,
300 }
301 impl SqsDelivery for MockSqs {
302 fn deliver_to_queue(
303 &self,
304 _queue_arn: &str,
305 _message_body: &str,
306 _attributes: &HashMap<String, String>,
307 ) {
308 self.call_count.fetch_add(1, Ordering::SeqCst);
309 }
310 }
311
312 struct MockSns {
313 call_count: AtomicUsize,
314 }
315 impl SnsDelivery for MockSns {
316 fn publish_to_topic(&self, _topic_arn: &str, _message: &str, _subject: Option<&str>) {
317 self.call_count.fetch_add(1, Ordering::SeqCst);
318 }
319 }
320
321 struct MockEventBridge {
322 call_count: AtomicUsize,
323 }
324 impl EventBridgeDelivery for MockEventBridge {
325 fn put_event(
326 &self,
327 _source: &str,
328 _detail_type: &str,
329 _detail: &str,
330 _event_bus_name: &str,
331 ) {
332 self.call_count.fetch_add(1, Ordering::SeqCst);
333 }
334 }
335
336 struct MockKinesis {
337 call_count: AtomicUsize,
338 }
339 impl KinesisDelivery for MockKinesis {
340 fn put_record(&self, _stream_arn: &str, _data: &str, _partition_key: &str) {
341 self.call_count.fetch_add(1, Ordering::SeqCst);
342 }
343 }
344
345 struct MockStepFunctions {
346 call_count: AtomicUsize,
347 }
348 impl StepFunctionsDelivery for MockStepFunctions {
349 fn start_execution(&self, _state_machine_arn: &str, _input: &str) {
350 self.call_count.fetch_add(1, Ordering::SeqCst);
351 }
352 }
353
354 #[test]
355 fn delivery_bus_new_has_no_senders() {
356 let bus = DeliveryBus::new();
357 bus.send_to_sqs("arn:queue", "body", &HashMap::new());
359 bus.publish_to_sns("arn:topic", "msg", None);
360 bus.put_event_to_eventbridge("src", "type", "{}", "default");
361 bus.send_to_kinesis("arn:stream", "data", "pk");
362 bus.start_stepfunctions_execution("arn:sfn", "{}");
363 }
365
366 #[test]
367 fn delivery_bus_default_is_same_as_new() {
368 let bus = DeliveryBus::default();
369 bus.send_to_sqs("arn:q", "b", &HashMap::new());
370 }
371
372 #[test]
373 fn send_to_sqs_calls_sender() {
374 let mock = Arc::new(MockSqs {
375 call_count: AtomicUsize::new(0),
376 });
377 let bus = DeliveryBus::new().with_sqs(mock.clone());
378
379 bus.send_to_sqs("arn:queue", "msg", &HashMap::new());
380 assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
381
382 bus.send_to_sqs("arn:queue2", "msg2", &HashMap::new());
383 assert_eq!(mock.call_count.load(Ordering::SeqCst), 2);
384 }
385
386 #[test]
387 fn send_to_sqs_with_attrs_calls_sender() {
388 let mock = Arc::new(MockSqs {
389 call_count: AtomicUsize::new(0),
390 });
391 let bus = DeliveryBus::new().with_sqs(mock.clone());
392
393 let mut attrs = HashMap::new();
394 attrs.insert(
395 "key".to_string(),
396 SqsMessageAttribute {
397 data_type: "String".to_string(),
398 string_value: Some("val".to_string()),
399 binary_value: None,
400 },
401 );
402 bus.send_to_sqs_with_attrs("arn:q", "body", &attrs, Some("group"), Some("dedup"));
403 assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
404 }
405
406 #[test]
407 fn publish_to_sns_calls_sender() {
408 let mock = Arc::new(MockSns {
409 call_count: AtomicUsize::new(0),
410 });
411 let bus = DeliveryBus::new().with_sns(mock.clone());
412
413 bus.publish_to_sns("arn:topic", "message", Some("subject"));
414 assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
415 }
416
417 #[test]
418 fn put_event_to_eventbridge_calls_sender() {
419 let mock = Arc::new(MockEventBridge {
420 call_count: AtomicUsize::new(0),
421 });
422 let bus = DeliveryBus::new().with_eventbridge(mock.clone());
423
424 bus.put_event_to_eventbridge("aws.s3", "Object Created", "{}", "default");
425 assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
426 }
427
428 #[test]
429 fn send_to_kinesis_calls_sender() {
430 let mock = Arc::new(MockKinesis {
431 call_count: AtomicUsize::new(0),
432 });
433 let bus = DeliveryBus::new().with_kinesis(mock.clone());
434
435 bus.send_to_kinesis("arn:stream", "data", "partition-key");
436 assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
437 }
438
439 #[test]
440 fn start_stepfunctions_calls_sender() {
441 let mock = Arc::new(MockStepFunctions {
442 call_count: AtomicUsize::new(0),
443 });
444 let bus = DeliveryBus::new().with_stepfunctions(mock.clone());
445
446 bus.start_stepfunctions_execution("arn:sfn:machine", r#"{"key":"val"}"#);
447 assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
448 }
449
450 #[test]
451 fn builder_chaining_works() {
452 let sqs = Arc::new(MockSqs {
453 call_count: AtomicUsize::new(0),
454 });
455 let sns = Arc::new(MockSns {
456 call_count: AtomicUsize::new(0),
457 });
458 let eb = Arc::new(MockEventBridge {
459 call_count: AtomicUsize::new(0),
460 });
461 let kin = Arc::new(MockKinesis {
462 call_count: AtomicUsize::new(0),
463 });
464 let sfn = Arc::new(MockStepFunctions {
465 call_count: AtomicUsize::new(0),
466 });
467
468 let bus = DeliveryBus::new()
469 .with_sqs(sqs.clone())
470 .with_sns(sns.clone())
471 .with_eventbridge(eb.clone())
472 .with_kinesis(kin.clone())
473 .with_stepfunctions(sfn.clone());
474
475 bus.send_to_sqs("q", "m", &HashMap::new());
476 bus.publish_to_sns("t", "m", None);
477 bus.put_event_to_eventbridge("s", "d", "{}", "b");
478 bus.send_to_kinesis("s", "d", "k");
479 bus.start_stepfunctions_execution("sm", "{}");
480
481 assert_eq!(sqs.call_count.load(Ordering::SeqCst), 1);
482 assert_eq!(sns.call_count.load(Ordering::SeqCst), 1);
483 assert_eq!(eb.call_count.load(Ordering::SeqCst), 1);
484 assert_eq!(kin.call_count.load(Ordering::SeqCst), 1);
485 assert_eq!(sfn.call_count.load(Ordering::SeqCst), 1);
486 }
487
488 #[tokio::test]
489 async fn invoke_lambda_returns_none_without_invoker() {
490 let bus = DeliveryBus::new();
491 let result = bus.invoke_lambda("arn:lambda", "{}").await;
492 assert!(result.is_none());
493 }
494}