1use std::collections::HashMap;
2use std::sync::Arc;
3
4pub struct DeliveryBus {
10 sqs_sender: Option<Arc<dyn SqsDelivery>>,
12 sns_sender: Option<Arc<dyn SnsDelivery>>,
14 eventbridge_sender: Option<Arc<dyn EventBridgeDelivery>>,
16 lambda_invoker: Option<Arc<dyn LambdaDelivery>>,
18 kinesis_sender: Option<Arc<dyn KinesisDelivery>>,
20 stepfunctions_starter: Option<Arc<dyn StepFunctionsDelivery>>,
22}
23
/// A single typed SQS message attribute value.
///
/// This is the value type of the `message_attributes` maps accepted by the
/// `*_with_attrs` delivery methods in this file.
///
/// `PartialEq`/`Eq` are derived so attribute values (and maps of them) can be
/// compared directly, e.g. in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SqsMessageAttribute {
    /// Type label for the attribute (the tests in this file use `"String"`).
    pub data_type: String,
    /// Textual payload, when the attribute carries one.
    pub string_value: Option<String>,
    /// Binary payload carried as text, when the attribute carries one.
    /// NOTE(review): presumably an encoded form such as base64 — confirm with producers.
    pub binary_value: Option<String>,
}
31
/// Failure reported by the fallible (`try_*`) SQS delivery path.
///
/// Each variant carries one string; the `Display` implementation in this file
/// renders that string as the queue ARN the error refers to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SqsDeliveryError {
    /// No queue could be resolved for the carried ARN.
    QueueNotFound(String),

    /// The carried string was rejected as a queue ARN.
    InvalidArn(String),
}
43
44impl std::fmt::Display for SqsDeliveryError {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 Self::QueueNotFound(arn) => write!(f, "queue not found: {arn}"),
48 Self::InvalidArn(arn) => write!(f, "invalid queue ARN: {arn}"),
49 }
50 }
51}
52
53impl std::error::Error for SqsDeliveryError {}
54
55pub trait SqsDelivery: Send + Sync {
57 fn deliver_to_queue(
58 &self,
59 queue_arn: &str,
60 message_body: &str,
61 attributes: &HashMap<String, String>,
62 );
63
64 fn deliver_to_queue_with_attrs(
66 &self,
67 queue_arn: &str,
68 message_body: &str,
69 message_attributes: &HashMap<String, SqsMessageAttribute>,
70 message_group_id: Option<&str>,
71 message_dedup_id: Option<&str>,
72 ) {
73 let _ = (message_attributes, message_group_id, message_dedup_id);
75 self.deliver_to_queue(queue_arn, message_body, &HashMap::new());
76 }
77
78 fn try_deliver_to_queue_with_attrs(
84 &self,
85 queue_arn: &str,
86 message_body: &str,
87 message_attributes: &HashMap<String, SqsMessageAttribute>,
88 message_group_id: Option<&str>,
89 message_dedup_id: Option<&str>,
90 ) -> Result<(), SqsDeliveryError> {
91 self.deliver_to_queue_with_attrs(
92 queue_arn,
93 message_body,
94 message_attributes,
95 message_group_id,
96 message_dedup_id,
97 );
98 Ok(())
99 }
100}
101
/// Hook through which messages are published to SNS topics.
///
/// Implementations must be `Send + Sync`.
pub trait SnsDelivery: Send + Sync {
    /// Publishes `message` to the topic identified by `topic_arn`, with an
    /// optional `subject`.
    ///
    /// Returns nothing, so failures cannot be reported through this method.
    fn publish_to_topic(
        &self,
        topic_arn: &str,
        message: &str,
        subject: Option<&str>,
    );
}
106
/// Hook through which events are put onto EventBridge buses.
///
/// Implementations must be `Send + Sync`.
pub trait EventBridgeDelivery: Send + Sync {
    /// Puts one event, described by `source`, `detail_type` and `detail`,
    /// onto the bus named `event_bus_name`.
    ///
    /// Returns nothing, so failures cannot be reported through this method.
    fn put_event(
        &self,
        source: &str,
        detail_type: &str,
        detail: &str,
        event_bus_name: &str,
    );
}
113
/// Hook through which Lambda functions are invoked.
///
/// Implementations must be `Send + Sync`.
pub trait LambdaDelivery: Send + Sync {
    /// Starts an invocation of `function_arn` with `payload` and returns a
    /// boxed future for its outcome.
    ///
    /// The future resolves to the response bytes on success or to an error
    /// message on failure. It is `Send`, and because the boxed trait object
    /// carries no lifetime it cannot borrow from `self` or from the string
    /// arguments: implementations have to copy whatever they need into it.
    fn invoke_lambda(
        &self,
        function_arn: &str,
        payload: &str,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send>,
    >;
}
124
/// Hook through which records are put onto Kinesis streams.
///
/// Implementations must be `Send + Sync`.
pub trait KinesisDelivery: Send + Sync {
    /// Puts `data` onto the stream identified by `stream_arn` under
    /// `partition_key`.
    ///
    /// Returns nothing, so failures cannot be reported through this method.
    fn put_record(
        &self,
        stream_arn: &str,
        data: &str,
        partition_key: &str,
    );
}
131
/// Hook through which Step Functions executions are started.
///
/// Implementations must be `Send + Sync`.
pub trait StepFunctionsDelivery: Send + Sync {
    /// Starts an execution of the state machine identified by
    /// `state_machine_arn`, passing it `input`.
    ///
    /// Returns nothing, so failures cannot be reported through this method.
    fn start_execution(
        &self,
        state_machine_arn: &str,
        input: &str,
    );
}
138
/// Hook through which e-mails are dispatched.
///
/// Implementations must be `Send + Sync`.
pub trait EmailDispatcher: Send + Sync {
    /// Sends one e-mail on behalf of `account_id`.
    ///
    /// `from`, `to` and `subject` describe the envelope. `body_text` is always
    /// supplied, while `body_html` is optional. Returns nothing, so failures
    /// cannot be reported through this method.
    fn send_email(
        &self,
        account_id: &str,
        from: &str,
        to: &str,
        subject: &str,
        body_text: &str,
        body_html: Option<&str>,
    );
}
153
/// Hook through which SMS messages are dispatched.
///
/// Implementations must be `Send + Sync`.
pub trait SmsDispatcher: Send + Sync {
    /// Sends `message` to `phone_number` on behalf of `account_id`.
    ///
    /// Returns nothing, so failures cannot be reported through this method.
    fn send_sms(
        &self,
        account_id: &str,
        phone_number: &str,
        message: &str,
    );
}
159
/// Hook for encrypting and decrypting data with a key-management backend.
///
/// Implementations must be `Send + Sync`. Both methods report failure as an
/// error message string.
pub trait KmsHook: Send + Sync {
    /// Encrypts `plaintext` under `key_id` for the given account and region.
    ///
    /// `service_principal` names the caller and `encryption_context` is taken
    /// by value. On success the ciphertext is returned as a `String`.
    /// NOTE(review): presumably base64, given `decrypt`'s `ciphertext_b64`
    /// parameter — confirm with implementors.
    ///
    /// # Errors
    ///
    /// Returns `Err` with a message when the implementation cannot encrypt.
    fn encrypt(
        &self,
        account_id: &str,
        region: &str,
        key_id: &str,
        plaintext: &[u8],
        service_principal: &str,
        encryption_context: HashMap<String, String>,
    ) -> Result<String, String>;

    /// Decrypts `ciphertext_b64` for the given account and returns the
    /// plaintext bytes.
    ///
    /// Unlike `encrypt`, no region or key id is passed in.
    ///
    /// # Errors
    ///
    /// Returns `Err` with a message when the implementation cannot decrypt.
    fn decrypt(
        &self,
        account_id: &str,
        ciphertext_b64: &str,
        service_principal: &str,
        encryption_context: HashMap<String, String>,
    ) -> Result<Vec<u8>, String>;
}
188
189impl DeliveryBus {
190 pub fn new() -> Self {
191 Self {
192 sqs_sender: None,
193 sns_sender: None,
194 eventbridge_sender: None,
195 lambda_invoker: None,
196 kinesis_sender: None,
197 stepfunctions_starter: None,
198 }
199 }
200
201 pub fn with_sqs(mut self, sender: Arc<dyn SqsDelivery>) -> Self {
202 self.sqs_sender = Some(sender);
203 self
204 }
205
206 pub fn with_sns(mut self, sender: Arc<dyn SnsDelivery>) -> Self {
207 self.sns_sender = Some(sender);
208 self
209 }
210
211 pub fn with_eventbridge(mut self, sender: Arc<dyn EventBridgeDelivery>) -> Self {
212 self.eventbridge_sender = Some(sender);
213 self
214 }
215
216 pub fn with_lambda(mut self, invoker: Arc<dyn LambdaDelivery>) -> Self {
217 self.lambda_invoker = Some(invoker);
218 self
219 }
220
221 pub fn with_kinesis(mut self, sender: Arc<dyn KinesisDelivery>) -> Self {
222 self.kinesis_sender = Some(sender);
223 self
224 }
225
226 pub fn with_stepfunctions(mut self, starter: Arc<dyn StepFunctionsDelivery>) -> Self {
227 self.stepfunctions_starter = Some(starter);
228 self
229 }
230
231 pub fn send_to_sqs(
233 &self,
234 queue_arn: &str,
235 message_body: &str,
236 attributes: &HashMap<String, String>,
237 ) {
238 if let Some(ref sender) = self.sqs_sender {
239 sender.deliver_to_queue(queue_arn, message_body, attributes);
240 }
241 }
242
243 pub fn send_to_sqs_with_attrs(
245 &self,
246 queue_arn: &str,
247 message_body: &str,
248 message_attributes: &HashMap<String, SqsMessageAttribute>,
249 message_group_id: Option<&str>,
250 message_dedup_id: Option<&str>,
251 ) {
252 if let Some(ref sender) = self.sqs_sender {
253 sender.deliver_to_queue_with_attrs(
254 queue_arn,
255 message_body,
256 message_attributes,
257 message_group_id,
258 message_dedup_id,
259 );
260 }
261 }
262
263 pub fn try_send_to_sqs_with_attrs(
268 &self,
269 queue_arn: &str,
270 message_body: &str,
271 message_attributes: &HashMap<String, SqsMessageAttribute>,
272 message_group_id: Option<&str>,
273 message_dedup_id: Option<&str>,
274 ) -> Result<(), SqsDeliveryError> {
275 match self.sqs_sender {
276 Some(ref sender) => sender.try_deliver_to_queue_with_attrs(
277 queue_arn,
278 message_body,
279 message_attributes,
280 message_group_id,
281 message_dedup_id,
282 ),
283 None => Err(SqsDeliveryError::QueueNotFound(queue_arn.to_string())),
284 }
285 }
286
287 pub fn publish_to_sns(&self, topic_arn: &str, message: &str, subject: Option<&str>) {
289 if let Some(ref sender) = self.sns_sender {
290 sender.publish_to_topic(topic_arn, message, subject);
291 }
292 }
293
294 pub fn put_event_to_eventbridge(
296 &self,
297 source: &str,
298 detail_type: &str,
299 detail: &str,
300 event_bus_name: &str,
301 ) {
302 if let Some(ref sender) = self.eventbridge_sender {
303 sender.put_event(source, detail_type, detail, event_bus_name);
304 }
305 }
306
307 pub async fn invoke_lambda(
309 &self,
310 function_arn: &str,
311 payload: &str,
312 ) -> Option<Result<Vec<u8>, String>> {
313 if let Some(ref invoker) = self.lambda_invoker {
314 Some(invoker.invoke_lambda(function_arn, payload).await)
315 } else {
316 None
317 }
318 }
319
320 pub fn send_to_kinesis(&self, stream_arn: &str, data: &str, partition_key: &str) {
322 if let Some(ref sender) = self.kinesis_sender {
323 sender.put_record(stream_arn, data, partition_key);
324 }
325 }
326
327 pub fn start_stepfunctions_execution(&self, state_machine_arn: &str, input: &str) {
329 if let Some(ref starter) = self.stepfunctions_starter {
330 starter.start_execution(state_machine_arn, input);
331 }
332 }
333}
334
335impl Default for DeliveryBus {
336 fn default() -> Self {
337 Self::new()
338 }
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Reads a mock's invocation counter.
    fn calls(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    /// Counts `deliver_to_queue` calls; the other `SqsDelivery` methods keep
    /// their default bodies, which funnel into it.
    #[derive(Default)]
    struct MockSqs {
        call_count: AtomicUsize,
    }

    impl SqsDelivery for MockSqs {
        fn deliver_to_queue(
            &self,
            _queue_arn: &str,
            _message_body: &str,
            _attributes: &HashMap<String, String>,
        ) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Counts `publish_to_topic` calls.
    #[derive(Default)]
    struct MockSns {
        call_count: AtomicUsize,
    }

    impl SnsDelivery for MockSns {
        fn publish_to_topic(&self, _topic_arn: &str, _message: &str, _subject: Option<&str>) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Counts `put_event` calls.
    #[derive(Default)]
    struct MockEventBridge {
        call_count: AtomicUsize,
    }

    impl EventBridgeDelivery for MockEventBridge {
        fn put_event(
            &self,
            _source: &str,
            _detail_type: &str,
            _detail: &str,
            _event_bus_name: &str,
        ) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Counts `put_record` calls.
    #[derive(Default)]
    struct MockKinesis {
        call_count: AtomicUsize,
    }

    impl KinesisDelivery for MockKinesis {
        fn put_record(&self, _stream_arn: &str, _data: &str, _partition_key: &str) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Counts `start_execution` calls.
    #[derive(Default)]
    struct MockStepFunctions {
        call_count: AtomicUsize,
    }

    impl StepFunctionsDelivery for MockStepFunctions {
        fn start_execution(&self, _state_machine_arn: &str, _input: &str) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    // With nothing registered, every synchronous send must be a silent no-op.
    #[test]
    fn delivery_bus_new_has_no_senders() {
        let bus = DeliveryBus::new();
        let no_attributes = HashMap::new();

        bus.send_to_sqs("arn:queue", "body", &no_attributes);
        bus.publish_to_sns("arn:topic", "msg", None);
        bus.put_event_to_eventbridge("src", "type", "{}", "default");
        bus.send_to_kinesis("arn:stream", "data", "pk");
        bus.start_stepfunctions_execution("arn:sfn", "{}");
    }

    #[test]
    fn delivery_bus_default_is_same_as_new() {
        let bus = DeliveryBus::default();
        let no_attributes = HashMap::new();

        bus.send_to_sqs("arn:q", "b", &no_attributes);
    }

    #[test]
    fn send_to_sqs_calls_sender() {
        let mock = Arc::new(MockSqs::default());
        let bus = DeliveryBus::new().with_sqs(mock.clone());
        let no_attributes = HashMap::new();

        bus.send_to_sqs("arn:queue", "msg", &no_attributes);
        assert_eq!(calls(&mock.call_count), 1);

        bus.send_to_sqs("arn:queue2", "msg2", &no_attributes);
        assert_eq!(calls(&mock.call_count), 2);
    }

    // The mock does not override the `_with_attrs` method, so this exercises
    // the trait's default fallback into `deliver_to_queue`.
    #[test]
    fn send_to_sqs_with_attrs_calls_sender() {
        let mock = Arc::new(MockSqs::default());
        let bus = DeliveryBus::new().with_sqs(mock.clone());

        let attribute = SqsMessageAttribute {
            data_type: "String".to_string(),
            string_value: Some("val".to_string()),
            binary_value: None,
        };
        let attrs: HashMap<String, SqsMessageAttribute> =
            std::iter::once(("key".to_string(), attribute)).collect();

        bus.send_to_sqs_with_attrs("arn:q", "body", &attrs, Some("group"), Some("dedup"));
        assert_eq!(calls(&mock.call_count), 1);
    }

    #[test]
    fn publish_to_sns_calls_sender() {
        let mock = Arc::new(MockSns::default());
        let bus = DeliveryBus::new().with_sns(mock.clone());

        bus.publish_to_sns("arn:topic", "message", Some("subject"));
        assert_eq!(calls(&mock.call_count), 1);
    }

    #[test]
    fn put_event_to_eventbridge_calls_sender() {
        let mock = Arc::new(MockEventBridge::default());
        let bus = DeliveryBus::new().with_eventbridge(mock.clone());

        bus.put_event_to_eventbridge("aws.s3", "Object Created", "{}", "default");
        assert_eq!(calls(&mock.call_count), 1);
    }

    #[test]
    fn send_to_kinesis_calls_sender() {
        let mock = Arc::new(MockKinesis::default());
        let bus = DeliveryBus::new().with_kinesis(mock.clone());

        bus.send_to_kinesis("arn:stream", "data", "partition-key");
        assert_eq!(calls(&mock.call_count), 1);
    }

    #[test]
    fn start_stepfunctions_calls_sender() {
        let mock = Arc::new(MockStepFunctions::default());
        let bus = DeliveryBus::new().with_stepfunctions(mock.clone());

        bus.start_stepfunctions_execution("arn:sfn:machine", r#"{"key":"val"}"#);
        assert_eq!(calls(&mock.call_count), 1);
    }

    // All builders can be chained, and each handler only sees its own traffic.
    #[test]
    fn builder_chaining_works() {
        let sqs = Arc::new(MockSqs::default());
        let sns = Arc::new(MockSns::default());
        let eb = Arc::new(MockEventBridge::default());
        let kin = Arc::new(MockKinesis::default());
        let sfn = Arc::new(MockStepFunctions::default());

        let bus = DeliveryBus::new()
            .with_sqs(sqs.clone())
            .with_sns(sns.clone())
            .with_eventbridge(eb.clone())
            .with_kinesis(kin.clone())
            .with_stepfunctions(sfn.clone());

        bus.send_to_sqs("q", "m", &HashMap::new());
        bus.publish_to_sns("t", "m", None);
        bus.put_event_to_eventbridge("s", "d", "{}", "b");
        bus.send_to_kinesis("s", "d", "k");
        bus.start_stepfunctions_execution("sm", "{}");

        assert_eq!(calls(&sqs.call_count), 1);
        assert_eq!(calls(&sns.call_count), 1);
        assert_eq!(calls(&eb.call_count), 1);
        assert_eq!(calls(&kin.call_count), 1);
        assert_eq!(calls(&sfn.call_count), 1);
    }

    #[tokio::test]
    async fn invoke_lambda_returns_none_without_invoker() {
        let bus = DeliveryBus::new();
        let outcome = bus.invoke_lambda("arn:lambda", "{}").await;
        assert!(outcome.is_none());
    }
}