1use std::collections::HashMap;
2use std::sync::Arc;
3
4pub struct DeliveryBus {
10 sqs_sender: Option<Arc<dyn SqsDelivery>>,
12 sns_sender: Option<Arc<dyn SnsDelivery>>,
14 eventbridge_sender: Option<Arc<dyn EventBridgeDelivery>>,
16 lambda_invoker: Option<Arc<dyn LambdaDelivery>>,
18 kinesis_sender: Option<Arc<dyn KinesisDelivery>>,
20 stepfunctions_starter: Option<Arc<dyn StepFunctionsDelivery>>,
22}
23
/// A single typed message attribute attached to an SQS delivery.
#[derive(Debug, Clone)]
pub struct SqsMessageAttribute {
    /// Type label of the attribute (the tests in this file use `"String"`).
    pub data_type: String,
    /// Textual value, when the attribute carries one.
    pub string_value: Option<String>,
    /// Binary value held as a string, when the attribute carries one.
    // NOTE(review): the string encoding (presumably base64) is not visible here — confirm.
    pub binary_value: Option<String>,
}
31
/// Failure reasons reported by the fallible SQS delivery path.
///
/// Each variant carries the ARN string the failure refers to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SqsDeliveryError {
    /// No queue could be resolved for the carried ARN.
    QueueNotFound(String),
    /// The carried string was rejected as a queue ARN.
    InvalidArn(String),
}

impl std::fmt::Display for SqsDeliveryError {
    /// Renders a short human-readable message that ends with the offending ARN.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            SqsDeliveryError::QueueNotFound(ref arn) => {
                f.write_fmt(format_args!("queue not found: {arn}"))
            }
            SqsDeliveryError::InvalidArn(ref arn) => {
                f.write_fmt(format_args!("invalid queue ARN: {arn}"))
            }
        }
    }
}

// No underlying source error is carried, so the provided trait methods suffice.
impl std::error::Error for SqsDeliveryError {}
54
55pub trait SqsDelivery: Send + Sync {
57 fn deliver_to_queue(
58 &self,
59 queue_arn: &str,
60 message_body: &str,
61 attributes: &HashMap<String, String>,
62 );
63
64 fn deliver_to_queue_with_attrs(
66 &self,
67 queue_arn: &str,
68 message_body: &str,
69 message_attributes: &HashMap<String, SqsMessageAttribute>,
70 message_group_id: Option<&str>,
71 message_dedup_id: Option<&str>,
72 ) {
73 let _ = (message_attributes, message_group_id, message_dedup_id);
75 self.deliver_to_queue(queue_arn, message_body, &HashMap::new());
76 }
77
78 fn try_deliver_to_queue_with_attrs(
84 &self,
85 queue_arn: &str,
86 message_body: &str,
87 message_attributes: &HashMap<String, SqsMessageAttribute>,
88 message_group_id: Option<&str>,
89 message_dedup_id: Option<&str>,
90 ) -> Result<(), SqsDeliveryError> {
91 self.deliver_to_queue_with_attrs(
92 queue_arn,
93 message_body,
94 message_attributes,
95 message_group_id,
96 message_dedup_id,
97 );
98 Ok(())
99 }
100}
101
/// Delivery hook for publishing a message to an SNS topic.
pub trait SnsDelivery: Send + Sync {
    /// Publishes `message` to the topic identified by `topic_arn`, with an
    /// optional `subject`.
    fn publish_to_topic(&self, topic_arn: &str, message: &str, subject: Option<&str>);
}
106
/// Delivery hook for putting events onto an EventBridge bus.
pub trait EventBridgeDelivery: Send + Sync {
    /// Puts one event, described by `source`, `detail_type` and `detail`, onto
    /// the bus named `event_bus_name`.
    fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str);

    /// Account-scoped variant of [`EventBridgeDelivery::put_event`].
    ///
    /// The default body ignores `_target_account_id` and forwards to
    /// `put_event` unchanged. Implementors that need the account id must
    /// override this method.
    fn put_event_to_account(
        &self,
        source: &str,
        detail_type: &str,
        detail: &str,
        event_bus_name: &str,
        _target_account_id: &str,
    ) {
        Self::put_event(self, source, detail_type, detail, event_bus_name);
    }
}
130
/// Delivery hook for invoking a Lambda function.
pub trait LambdaDelivery: Send + Sync {
    /// Starts an invocation of `function_arn` with `payload`.
    ///
    /// Returns a boxed, pinned, `Send` future that resolves to the response
    /// bytes on success or to an error message on failure.
    fn invoke_lambda(
        &self,
        function_arn: &str,
        payload: &str,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send>>;
}
141
/// Delivery hook for writing a record to a Kinesis stream.
pub trait KinesisDelivery: Send + Sync {
    /// Puts `data` onto the stream identified by `stream_arn`, using
    /// `partition_key` as the record's partition key.
    fn put_record(&self, stream_arn: &str, data: &str, partition_key: &str);
}
148
/// Delivery hook for starting a Step Functions execution.
pub trait StepFunctionsDelivery: Send + Sync {
    /// Starts an execution of the state machine identified by
    /// `state_machine_arn`, passing `input` to it.
    fn start_execution(&self, state_machine_arn: &str, input: &str);
}
155
/// Hook for sending an email.
pub trait EmailDispatcher: Send + Sync {
    /// Sends one email from `from` to `to` with the given `subject`, a
    /// plain-text body and an optional HTML body.
    ///
    /// `account_id` is passed through to the implementation; how it is used is
    /// not defined in this file.
    fn send_email(
        &self,
        account_id: &str,
        from: &str,
        to: &str,
        subject: &str,
        body_text: &str,
        body_html: Option<&str>,
    );
}
170
/// Hook for sending an SMS text message.
pub trait SmsDispatcher: Send + Sync {
    /// Sends `message` to `phone_number`.
    ///
    /// `account_id` is passed through to the implementation; how it is used is
    /// not defined in this file.
    fn send_sms(&self, account_id: &str, phone_number: &str, message: &str);
}
176
/// Hook through which callers request KMS-style encryption and decryption.
///
/// This file only declares the contract; how the arguments are interpreted is
/// up to the implementation.
pub trait KmsHook: Send + Sync {
    /// Encrypts `plaintext` under the key `key_id`.
    ///
    /// `account_id`, `region`, `service_principal` and `encryption_context`
    /// are passed to the implementation as-is.
    ///
    /// # Errors
    ///
    /// Returns an error message when the implementation cannot encrypt.
    // NOTE(review): the `Ok` string is presumably base64 ciphertext, mirroring
    // `decrypt`'s `ciphertext_b64` parameter — confirm against implementors.
    fn encrypt(
        &self,
        account_id: &str,
        region: &str,
        key_id: &str,
        plaintext: &[u8],
        service_principal: &str,
        encryption_context: HashMap<String, String>,
    ) -> Result<String, String>;

    /// Decrypts `ciphertext_b64` and returns the recovered bytes.
    ///
    /// `account_id`, `service_principal` and `encryption_context` are passed
    /// to the implementation as-is.
    ///
    /// # Errors
    ///
    /// Returns an error message when the implementation cannot decrypt.
    fn decrypt(
        &self,
        account_id: &str,
        ciphertext_b64: &str,
        service_principal: &str,
        encryption_context: HashMap<String, String>,
    ) -> Result<Vec<u8>, String>;
}
205
206impl DeliveryBus {
207 pub fn new() -> Self {
208 Self {
209 sqs_sender: None,
210 sns_sender: None,
211 eventbridge_sender: None,
212 lambda_invoker: None,
213 kinesis_sender: None,
214 stepfunctions_starter: None,
215 }
216 }
217
218 pub fn with_sqs(mut self, sender: Arc<dyn SqsDelivery>) -> Self {
219 self.sqs_sender = Some(sender);
220 self
221 }
222
223 pub fn with_sns(mut self, sender: Arc<dyn SnsDelivery>) -> Self {
224 self.sns_sender = Some(sender);
225 self
226 }
227
228 pub fn with_eventbridge(mut self, sender: Arc<dyn EventBridgeDelivery>) -> Self {
229 self.eventbridge_sender = Some(sender);
230 self
231 }
232
233 pub fn with_lambda(mut self, invoker: Arc<dyn LambdaDelivery>) -> Self {
234 self.lambda_invoker = Some(invoker);
235 self
236 }
237
238 pub fn with_kinesis(mut self, sender: Arc<dyn KinesisDelivery>) -> Self {
239 self.kinesis_sender = Some(sender);
240 self
241 }
242
243 pub fn with_stepfunctions(mut self, starter: Arc<dyn StepFunctionsDelivery>) -> Self {
244 self.stepfunctions_starter = Some(starter);
245 self
246 }
247
248 pub fn send_to_sqs(
250 &self,
251 queue_arn: &str,
252 message_body: &str,
253 attributes: &HashMap<String, String>,
254 ) {
255 if let Some(ref sender) = self.sqs_sender {
256 sender.deliver_to_queue(queue_arn, message_body, attributes);
257 }
258 }
259
260 pub fn send_to_sqs_with_attrs(
262 &self,
263 queue_arn: &str,
264 message_body: &str,
265 message_attributes: &HashMap<String, SqsMessageAttribute>,
266 message_group_id: Option<&str>,
267 message_dedup_id: Option<&str>,
268 ) {
269 if let Some(ref sender) = self.sqs_sender {
270 sender.deliver_to_queue_with_attrs(
271 queue_arn,
272 message_body,
273 message_attributes,
274 message_group_id,
275 message_dedup_id,
276 );
277 }
278 }
279
280 pub fn try_send_to_sqs_with_attrs(
285 &self,
286 queue_arn: &str,
287 message_body: &str,
288 message_attributes: &HashMap<String, SqsMessageAttribute>,
289 message_group_id: Option<&str>,
290 message_dedup_id: Option<&str>,
291 ) -> Result<(), SqsDeliveryError> {
292 match self.sqs_sender {
293 Some(ref sender) => sender.try_deliver_to_queue_with_attrs(
294 queue_arn,
295 message_body,
296 message_attributes,
297 message_group_id,
298 message_dedup_id,
299 ),
300 None => Err(SqsDeliveryError::QueueNotFound(queue_arn.to_string())),
301 }
302 }
303
304 pub fn publish_to_sns(&self, topic_arn: &str, message: &str, subject: Option<&str>) {
306 if let Some(ref sender) = self.sns_sender {
307 sender.publish_to_topic(topic_arn, message, subject);
308 }
309 }
310
311 pub fn put_event_to_eventbridge(
313 &self,
314 source: &str,
315 detail_type: &str,
316 detail: &str,
317 event_bus_name: &str,
318 ) {
319 if let Some(ref sender) = self.eventbridge_sender {
320 sender.put_event(source, detail_type, detail, event_bus_name);
321 }
322 }
323
324 pub fn put_event_to_eventbridge_for_account(
327 &self,
328 source: &str,
329 detail_type: &str,
330 detail: &str,
331 event_bus_name: &str,
332 target_account_id: &str,
333 ) {
334 if let Some(ref sender) = self.eventbridge_sender {
335 sender.put_event_to_account(
336 source,
337 detail_type,
338 detail,
339 event_bus_name,
340 target_account_id,
341 );
342 }
343 }
344
345 pub async fn invoke_lambda(
347 &self,
348 function_arn: &str,
349 payload: &str,
350 ) -> Option<Result<Vec<u8>, String>> {
351 if let Some(ref invoker) = self.lambda_invoker {
352 Some(invoker.invoke_lambda(function_arn, payload).await)
353 } else {
354 None
355 }
356 }
357
358 pub fn send_to_kinesis(&self, stream_arn: &str, data: &str, partition_key: &str) {
360 if let Some(ref sender) = self.kinesis_sender {
361 sender.put_record(stream_arn, data, partition_key);
362 }
363 }
364
365 pub fn start_stepfunctions_execution(&self, state_machine_arn: &str, input: &str) {
367 if let Some(ref starter) = self.stepfunctions_starter {
368 starter.start_execution(state_machine_arn, input);
369 }
370 }
371}
372
373impl Default for DeliveryBus {
374 fn default() -> Self {
375 Self::new()
376 }
377}
378
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Counts calls to the one required `SqsDelivery` method. The `*_with_attrs`
    /// variants are left at their trait defaults so those defaults get exercised.
    struct MockSqs {
        call_count: AtomicUsize,
    }
    impl SqsDelivery for MockSqs {
        fn deliver_to_queue(
            &self,
            _queue_arn: &str,
            _message_body: &str,
            _attributes: &HashMap<String, String>,
        ) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Counts `publish_to_topic` calls.
    struct MockSns {
        call_count: AtomicUsize,
    }
    impl SnsDelivery for MockSns {
        fn publish_to_topic(&self, _topic_arn: &str, _message: &str, _subject: Option<&str>) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Counts `put_event` calls; `put_event_to_account` keeps its default body.
    struct MockEventBridge {
        call_count: AtomicUsize,
    }
    impl EventBridgeDelivery for MockEventBridge {
        fn put_event(
            &self,
            _source: &str,
            _detail_type: &str,
            _detail: &str,
            _event_bus_name: &str,
        ) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Counts `put_record` calls.
    struct MockKinesis {
        call_count: AtomicUsize,
    }
    impl KinesisDelivery for MockKinesis {
        fn put_record(&self, _stream_arn: &str, _data: &str, _partition_key: &str) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Counts `start_execution` calls.
    struct MockStepFunctions {
        call_count: AtomicUsize,
    }
    impl StepFunctionsDelivery for MockStepFunctions {
        fn start_execution(&self, _state_machine_arn: &str, _input: &str) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Counts invocations and echoes the payload bytes back as the response.
    struct MockLambda {
        call_count: AtomicUsize,
    }
    impl LambdaDelivery for MockLambda {
        fn invoke_lambda(
            &self,
            _function_arn: &str,
            payload: &str,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send>>
        {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            let outcome: Result<Vec<u8>, String> = Ok(payload.as_bytes().to_vec());
            Box::pin(async move { outcome })
        }
    }

    #[test]
    fn delivery_bus_new_has_no_senders() {
        // With no backends installed every fire-and-forget call must be a
        // silent no-op (i.e. must not panic).
        let bus = DeliveryBus::new();
        bus.send_to_sqs("arn:queue", "body", &HashMap::new());
        bus.send_to_sqs_with_attrs("arn:queue", "body", &HashMap::new(), None, None);
        bus.publish_to_sns("arn:topic", "msg", None);
        bus.put_event_to_eventbridge("src", "type", "{}", "default");
        bus.put_event_to_eventbridge_for_account("src", "type", "{}", "default", "123456789012");
        bus.send_to_kinesis("arn:stream", "data", "pk");
        bus.start_stepfunctions_execution("arn:sfn", "{}");
    }

    #[test]
    fn delivery_bus_default_is_same_as_new() {
        let bus = DeliveryBus::default();
        bus.send_to_sqs("arn:q", "b", &HashMap::new());
    }

    #[test]
    fn send_to_sqs_calls_sender() {
        let mock = Arc::new(MockSqs {
            call_count: AtomicUsize::new(0),
        });
        let bus = DeliveryBus::new().with_sqs(mock.clone());

        bus.send_to_sqs("arn:queue", "msg", &HashMap::new());
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);

        bus.send_to_sqs("arn:queue2", "msg2", &HashMap::new());
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn send_to_sqs_with_attrs_calls_sender() {
        let mock = Arc::new(MockSqs {
            call_count: AtomicUsize::new(0),
        });
        let bus = DeliveryBus::new().with_sqs(mock.clone());

        let mut attrs = HashMap::new();
        attrs.insert(
            "key".to_string(),
            SqsMessageAttribute {
                data_type: "String".to_string(),
                string_value: Some("val".to_string()),
                binary_value: None,
            },
        );
        bus.send_to_sqs_with_attrs("arn:q", "body", &attrs, Some("group"), Some("dedup"));
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn try_send_to_sqs_with_attrs_without_sender_is_queue_not_found() {
        let bus = DeliveryBus::new();
        let attrs: HashMap<String, SqsMessageAttribute> = HashMap::new();

        let result = bus.try_send_to_sqs_with_attrs("arn:q", "body", &attrs, None, None);
        assert_eq!(
            result,
            Err(SqsDeliveryError::QueueNotFound("arn:q".to_string()))
        );
    }

    #[test]
    fn try_send_to_sqs_with_attrs_calls_sender() {
        let mock = Arc::new(MockSqs {
            call_count: AtomicUsize::new(0),
        });
        let bus = DeliveryBus::new().with_sqs(mock.clone());
        let attrs: HashMap<String, SqsMessageAttribute> = HashMap::new();

        // The mock relies on the trait defaults, which funnel into
        // `deliver_to_queue` and report success.
        let result =
            bus.try_send_to_sqs_with_attrs("arn:q", "body", &attrs, Some("group"), Some("dedup"));
        assert_eq!(result, Ok(()));
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn publish_to_sns_calls_sender() {
        let mock = Arc::new(MockSns {
            call_count: AtomicUsize::new(0),
        });
        let bus = DeliveryBus::new().with_sns(mock.clone());

        bus.publish_to_sns("arn:topic", "message", Some("subject"));
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn put_event_to_eventbridge_calls_sender() {
        let mock = Arc::new(MockEventBridge {
            call_count: AtomicUsize::new(0),
        });
        let bus = DeliveryBus::new().with_eventbridge(mock.clone());

        bus.put_event_to_eventbridge("aws.s3", "Object Created", "{}", "default");
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn put_event_to_eventbridge_for_account_falls_back_to_put_event() {
        let mock = Arc::new(MockEventBridge {
            call_count: AtomicUsize::new(0),
        });
        let bus = DeliveryBus::new().with_eventbridge(mock.clone());

        // The mock does not override `put_event_to_account`, so the trait's
        // default body must route the call to `put_event`.
        bus.put_event_to_eventbridge_for_account(
            "aws.s3",
            "Object Created",
            "{}",
            "default",
            "123456789012",
        );
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn send_to_kinesis_calls_sender() {
        let mock = Arc::new(MockKinesis {
            call_count: AtomicUsize::new(0),
        });
        let bus = DeliveryBus::new().with_kinesis(mock.clone());

        bus.send_to_kinesis("arn:stream", "data", "partition-key");
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn start_stepfunctions_calls_sender() {
        let mock = Arc::new(MockStepFunctions {
            call_count: AtomicUsize::new(0),
        });
        let bus = DeliveryBus::new().with_stepfunctions(mock.clone());

        bus.start_stepfunctions_execution("arn:sfn:machine", r#"{"key":"val"}"#);
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn builder_chaining_works() {
        let sqs = Arc::new(MockSqs {
            call_count: AtomicUsize::new(0),
        });
        let sns = Arc::new(MockSns {
            call_count: AtomicUsize::new(0),
        });
        let eb = Arc::new(MockEventBridge {
            call_count: AtomicUsize::new(0),
        });
        let kin = Arc::new(MockKinesis {
            call_count: AtomicUsize::new(0),
        });
        let sfn = Arc::new(MockStepFunctions {
            call_count: AtomicUsize::new(0),
        });

        let bus = DeliveryBus::new()
            .with_sqs(sqs.clone())
            .with_sns(sns.clone())
            .with_eventbridge(eb.clone())
            .with_kinesis(kin.clone())
            .with_stepfunctions(sfn.clone());

        bus.send_to_sqs("q", "m", &HashMap::new());
        bus.publish_to_sns("t", "m", None);
        bus.put_event_to_eventbridge("s", "d", "{}", "b");
        bus.send_to_kinesis("s", "d", "k");
        bus.start_stepfunctions_execution("sm", "{}");

        assert_eq!(sqs.call_count.load(Ordering::SeqCst), 1);
        assert_eq!(sns.call_count.load(Ordering::SeqCst), 1);
        assert_eq!(eb.call_count.load(Ordering::SeqCst), 1);
        assert_eq!(kin.call_count.load(Ordering::SeqCst), 1);
        assert_eq!(sfn.call_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn invoke_lambda_returns_none_without_invoker() {
        let bus = DeliveryBus::new();
        let result = bus.invoke_lambda("arn:lambda", "{}").await;
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn invoke_lambda_returns_invoker_result() {
        let mock = Arc::new(MockLambda {
            call_count: AtomicUsize::new(0),
        });
        let bus = DeliveryBus::new().with_lambda(mock.clone());

        let result = bus.invoke_lambda("arn:lambda", r#"{"k":1}"#).await;
        assert_eq!(result, Some(Ok(br#"{"k":1}"#.to_vec())));
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
    }
}