1use std::collections::HashMap;
2use std::sync::Arc;
3
/// Registry of optional, shareable per-service hooks.
///
/// Every field is an `Option<Arc<dyn Trait>>` that starts out as `None` in
/// [`DeliveryBus::new`] and is filled in by the matching `with_*` builder.
/// The forwarding methods on this type consult the field and, when it is
/// unset, either do nothing, return `None`, or return an `Err`.
pub struct DeliveryBus {
    /// SQS hook; set by `with_sqs`.
    sqs_sender: Option<Arc<dyn SqsDelivery>>,
    /// SNS hook; set by `with_sns`.
    sns_sender: Option<Arc<dyn SnsDelivery>>,
    /// EventBridge hook; set by `with_eventbridge`.
    eventbridge_sender: Option<Arc<dyn EventBridgeDelivery>>,
    /// Lambda hook; set by `with_lambda`.
    lambda_invoker: Option<Arc<dyn LambdaDelivery>>,
    /// Kinesis hook; set by `with_kinesis`.
    kinesis_sender: Option<Arc<dyn KinesisDelivery>>,
    /// Step Functions hook; set by `with_stepfunctions`.
    stepfunctions_starter: Option<Arc<dyn StepFunctionsDelivery>>,
    /// S3 hook; set by `with_s3`. Used for both `put_object_to_s3` and
    /// `get_object_from_s3` despite the "writer" name.
    s3_writer: Option<Arc<dyn S3Delivery>>,
    /// Firehose hook; set by `with_firehose`.
    firehose_sender: Option<Arc<dyn FirehoseDelivery>>,
    /// SES hook; set by `with_ses_dispatcher`.
    ses_dispatcher: Option<Arc<dyn SesSendEmailDispatcher>>,
    /// ECS hook; set by `with_ecs_task_runner`.
    ecs_task_runner: Option<Arc<dyn EcsTaskRunner>>,
    /// ELBv2 hook; set by `with_elbv2_target_registration`.
    elbv2_target_registration: Option<Arc<dyn Elbv2TargetRegistration>>,
    /// CloudWatch metrics hook; set by `with_cloudwatch_metrics`.
    cloudwatch_metrics: Option<Arc<dyn CloudwatchDelivery>>,
    /// CloudWatch Logs hook; set by `with_cloudwatch_logs`.
    cloudwatch_logs: Option<Arc<dyn CloudwatchLogsDelivery>>,
    /// Cognito JWT verification hook; set by `with_cognito_jwt_verifier`.
    cognito_jwt_verifier: Option<Arc<dyn CognitoJwtVerifier>>,
    /// KMS encrypt/decrypt hook; set by `with_kms_hook`.
    kms_hook: Option<Arc<dyn KmsHook>>,
}
46
/// A typed message attribute passed to
/// `SqsDelivery::deliver_to_queue_with_attrs` and its `try_` variant.
#[derive(Debug, Clone)]
pub struct SqsMessageAttribute {
    /// Data-type label for the attribute (the tests in this file use
    /// `"String"`).
    pub data_type: String,
    /// String payload, if the attribute carries one.
    pub string_value: Option<String>,
    /// Binary payload, if the attribute carries one.
    /// NOTE(review): stored as a `String`, so presumably text-encoded
    /// (e.g. base64) — confirm against the implementor.
    pub binary_value: Option<String>,
}
54
/// Error type returned by the fallible SQS delivery path
/// (`try_deliver_to_queue_with_attrs` / `try_send_to_sqs_with_attrs`).
///
/// Each variant carries a single `String` that the `Display` impl appends
/// after a fixed prefix.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SqsDeliveryError {
    /// Rendered as `queue not found: <value>`. Also what
    /// `DeliveryBus::try_send_to_sqs_with_attrs` returns (carrying the
    /// requested queue ARN) when no SQS sender is configured.
    QueueNotFound(String),
    /// Rendered as `invalid queue ARN: <value>`.
    InvalidArn(String),
    /// Rendered as `invalid parameter: <value>`.
    InvalidParameter(String),
}
71
72impl std::fmt::Display for SqsDeliveryError {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 match self {
75 Self::QueueNotFound(arn) => write!(f, "queue not found: {arn}"),
76 Self::InvalidArn(arn) => write!(f, "invalid queue ARN: {arn}"),
77 Self::InvalidParameter(msg) => write!(f, "invalid parameter: {msg}"),
78 }
79 }
80}
81
// Empty impl: `Debug` is derived and `Display` is implemented in this file,
// which satisfies the supertraits; no `Error` methods are overridden.
impl std::error::Error for SqsDeliveryError {}
83
84pub trait SqsDelivery: Send + Sync {
86 fn deliver_to_queue(
87 &self,
88 queue_arn: &str,
89 message_body: &str,
90 attributes: &HashMap<String, String>,
91 );
92
93 fn deliver_to_queue_with_attrs(
95 &self,
96 queue_arn: &str,
97 message_body: &str,
98 message_attributes: &HashMap<String, SqsMessageAttribute>,
99 message_group_id: Option<&str>,
100 message_dedup_id: Option<&str>,
101 ) {
102 let _ = (message_attributes, message_group_id, message_dedup_id);
104 self.deliver_to_queue(queue_arn, message_body, &HashMap::new());
105 }
106
107 fn try_deliver_to_queue_with_attrs(
113 &self,
114 queue_arn: &str,
115 message_body: &str,
116 message_attributes: &HashMap<String, SqsMessageAttribute>,
117 message_group_id: Option<&str>,
118 message_dedup_id: Option<&str>,
119 ) -> Result<(), SqsDeliveryError> {
120 self.deliver_to_queue_with_attrs(
121 queue_arn,
122 message_body,
123 message_attributes,
124 message_group_id,
125 message_dedup_id,
126 );
127 Ok(())
128 }
129}
130
/// Hook through which messages are published to an SNS implementation.
pub trait SnsDelivery: Send + Sync {
    /// Publishes `message` (with an optional `subject`) to `topic_arn`.
    fn publish_to_topic(&self, topic_arn: &str, message: &str, subject: Option<&str>);

    /// Publish variant that additionally receives a message group id and a
    /// deduplication id.
    ///
    /// The default body ignores both ids and forwards the remaining
    /// arguments to `publish_to_topic`.
    fn publish_to_topic_fifo(
        &self,
        topic_arn: &str,
        message: &str,
        subject: Option<&str>,
        _message_group_id: Option<&str>,
        _message_dedup_id: Option<&str>,
    ) {
        Self::publish_to_topic(self, topic_arn, message, subject)
    }
}
149
/// Hook through which events are put onto an EventBridge implementation.
pub trait EventBridgeDelivery: Send + Sync {
    /// Puts an event described by `source`, `detail_type` and `detail` onto
    /// the bus named `event_bus_name`.
    fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str);

    /// Put variant that additionally receives a target account id.
    ///
    /// The default body ignores the account id and forwards the remaining
    /// arguments to `put_event`.
    fn put_event_to_account(
        &self,
        source: &str,
        detail_type: &str,
        detail: &str,
        event_bus_name: &str,
        _target_account_id: &str,
    ) {
        Self::put_event(self, source, detail_type, detail, event_bus_name)
    }
}
173
/// Hook for invoking a Lambda function asynchronously.
pub trait LambdaDelivery: Send + Sync {
    /// Starts an invocation of `function_arn` with `payload`.
    ///
    /// Returns a boxed, pinned, `Send` future resolving to `Ok(bytes)` or
    /// `Err(message)`. The boxed trait object has no explicit lifetime, so it
    /// defaults to `'static` and cannot borrow from `function_arn` or
    /// `payload`.
    /// NOTE(review): the bytes are presumably the function's response
    /// payload — confirm against the implementor.
    fn invoke_lambda(
        &self,
        function_arn: &str,
        payload: &str,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send>>;
}
184
/// Hook for putting records onto a Kinesis stream.
pub trait KinesisDelivery: Send + Sync {
    /// Receives one record (`data`) for `stream_arn` with the given
    /// `partition_key`. Returns nothing, so failures cannot be reported to
    /// the caller through this signature.
    fn put_record(&self, stream_arn: &str, data: &str, partition_key: &str);
}
191
/// Hook for starting Step Functions executions.
pub trait StepFunctionsDelivery: Send + Sync {
    /// Receives a request to start `state_machine_arn` with `input`.
    /// Returns nothing, so failures cannot be reported to the caller through
    /// this signature.
    /// NOTE(review): `input` is presumably a JSON document (the tests in this
    /// file pass JSON text) — confirm.
    fn start_execution(&self, state_machine_arn: &str, input: &str);
}
198
/// Hook for putting records onto a Firehose delivery stream.
pub trait FirehoseDelivery: Send + Sync {
    /// Receives one record of raw bytes (`data`) for `delivery_stream_arn`.
    /// Returns nothing, so failures cannot be reported to the caller through
    /// this signature.
    fn put_record(&self, delivery_stream_arn: &str, data: &[u8]);
}
207
/// Hook for writing to and reading from S3 objects.
pub trait S3Delivery: Send + Sync {
    /// Stores `body` under `bucket`/`key` for `account_id`, with an optional
    /// `content_type`. Failures are reported as an `Err(String)`.
    fn put_object(
        &self,
        account_id: &str,
        bucket: &str,
        key: &str,
        body: Vec<u8>,
        content_type: Option<&str>,
    ) -> Result<(), String>;

    /// Fetches the bytes stored under `bucket`/`key` for `account_id`.
    /// Failures are reported as an `Err(String)`.
    fn get_object(&self, account_id: &str, bucket: &str, key: &str) -> Result<Vec<u8>, String>;
}
229
/// Hook for sending an email through an SES implementation.
pub trait SesSendEmailDispatcher: Send + Sync {
    /// Sends one email on behalf of `account_id`.
    ///
    /// Recipient lists (`to`, `cc`, `bcc`) are taken by value; `subject`,
    /// `text_body` and `html_body` are each optional. Failures are reported
    /// as an `Err(String)`.
    // The parameter list mirrors the email's fields one-to-one, hence the
    // lint exemption.
    #[allow(clippy::too_many_arguments)]
    fn send_email(
        &self,
        account_id: &str,
        from: &str,
        to: Vec<String>,
        cc: Vec<String>,
        bcc: Vec<String>,
        subject: Option<&str>,
        text_body: Option<&str>,
        html_body: Option<&str>,
    ) -> Result<(), String>;
}
248
/// Hook for launching ECS tasks.
pub trait EcsTaskRunner: Send + Sync {
    /// Requests `count` runs of `task_definition` on `cluster` for
    /// `account_id`, with an optional `launch_type`. Failures are reported as
    /// an `Err(String)`.
    /// NOTE(review): accepted `launch_type` values are not visible here —
    /// confirm against the implementor.
    fn run_task(
        &self,
        account_id: &str,
        cluster: &str,
        task_definition: &str,
        launch_type: Option<&str>,
        count: usize,
    ) -> Result<(), String>;
}
262
/// Hook for registering and deregistering ELBv2 target-group targets.
///
/// Both methods return nothing, so failures cannot be reported to the caller
/// through these signatures.
pub trait Elbv2TargetRegistration: Send + Sync {
    /// Registers `targets` with `target_group_arn` for `account_id`.
    /// NOTE(review): each tuple is presumably `(target id, optional port)` —
    /// confirm against the implementor.
    fn register_targets(
        &self,
        account_id: &str,
        target_group_arn: &str,
        targets: Vec<(String, Option<i64>)>,
    );
    /// Deregisters `targets` from `target_group_arn` for `account_id`.
    /// Takes the same tuple shape as `register_targets`.
    fn deregister_targets(
        &self,
        account_id: &str,
        target_group_arn: &str,
        targets: Vec<(String, Option<i64>)>,
    );
}
280
/// Hook for publishing a single CloudWatch metric datum.
pub trait CloudwatchDelivery: Send + Sync {
    /// Receives one metric `value` for `namespace`/`metric_name`, scoped to
    /// `account_id` and `region`, with an optional `unit` and an ordered map
    /// of `dimensions`. Returns nothing.
    /// NOTE(review): `timestamp_ms` is presumably Unix epoch milliseconds —
    /// confirm against the caller.
    // One parameter per datum field, hence the lint exemption.
    #[allow(clippy::too_many_arguments)]
    fn put_metric(
        &self,
        account_id: &str,
        region: &str,
        namespace: &str,
        metric_name: &str,
        value: f64,
        unit: Option<&str>,
        dimensions: std::collections::BTreeMap<String, String>,
        timestamp_ms: i64,
    );
}
298
/// Hook for appending events to a CloudWatch Logs stream.
pub trait CloudwatchLogsDelivery: Send + Sync {
    /// Receives a batch of `events` for `log_group_name`/`log_stream_name`
    /// under `account_id`. Returns nothing.
    /// NOTE(review): each tuple is presumably `(timestamp, message)` —
    /// confirm the timestamp unit against the caller.
    fn put_log_events(
        &self,
        account_id: &str,
        log_group_name: &str,
        log_stream_name: &str,
        events: &[(i64, String)],
    );
}
312
/// Hook for sending a single-recipient email.
///
/// `DeliveryBus` in this file has no field of this type; the trait is only
/// declared here.
pub trait EmailDispatcher: Send + Sync {
    /// Receives one email for `account_id`: required `from`, `to`, `subject`
    /// and `body_text`, plus an optional `body_html`. Returns nothing, so
    /// failures cannot be reported to the caller through this signature.
    fn send_email(
        &self,
        account_id: &str,
        from: &str,
        to: &str,
        subject: &str,
        body_text: &str,
        body_html: Option<&str>,
    );
}
327
/// Hook for sending an SMS message.
///
/// `DeliveryBus` in this file has no field of this type; the trait is only
/// declared here.
pub trait SmsDispatcher: Send + Sync {
    /// Receives `message` for `phone_number` on behalf of `account_id`.
    /// Returns nothing, so failures cannot be reported to the caller through
    /// this signature.
    fn send_sms(&self, account_id: &str, phone_number: &str, message: &str);
}
333
/// Hook for KMS encryption and decryption.
pub trait KmsHook: Send + Sync {
    /// Encrypts `plaintext` under `key_id` for `account_id`/`region`.
    ///
    /// `service_principal` and `encryption_context` are passed through to
    /// the implementor. Returns the ciphertext as a `String`, or an
    /// `Err(String)` on failure.
    /// NOTE(review): the returned string is presumably base64, matching
    /// `decrypt`'s `ciphertext_b64` parameter — confirm.
    fn encrypt(
        &self,
        account_id: &str,
        region: &str,
        key_id: &str,
        plaintext: &[u8],
        service_principal: &str,
        encryption_context: std::collections::HashMap<String, String>,
    ) -> Result<String, String>;

    /// Decrypts `ciphertext_b64` for `account_id`.
    ///
    /// Unlike `encrypt`, no region or key id is supplied. Returns the
    /// plaintext bytes, or an `Err(String)` on failure.
    fn decrypt(
        &self,
        account_id: &str,
        ciphertext_b64: &str,
        service_principal: &str,
        encryption_context: std::collections::HashMap<String, String>,
    ) -> Result<Vec<u8>, String>;
}
362
/// Hook for verifying a JWT against a Cognito user pool.
pub trait CognitoJwtVerifier: Send + Sync {
    /// Verifies `token` against `user_pool_arn` for `account_id`.
    ///
    /// Returns a JSON value on success or an `Err(String)` on failure.
    /// NOTE(review): the JSON value is presumably the token's decoded
    /// claims — confirm against the implementor.
    fn verify_token(
        &self,
        account_id: &str,
        user_pool_arn: &str,
        token: &str,
    ) -> Result<serde_json::Value, String>;
}
378
379impl DeliveryBus {
380 pub fn new() -> Self {
381 Self {
382 sqs_sender: None,
383 sns_sender: None,
384 eventbridge_sender: None,
385 lambda_invoker: None,
386 kinesis_sender: None,
387 stepfunctions_starter: None,
388 s3_writer: None,
389 firehose_sender: None,
390 ses_dispatcher: None,
391 ecs_task_runner: None,
392 elbv2_target_registration: None,
393 cloudwatch_metrics: None,
394 cloudwatch_logs: None,
395 cognito_jwt_verifier: None,
396 kms_hook: None,
397 }
398 }
399
400 pub fn with_cognito_jwt_verifier(mut self, verifier: Arc<dyn CognitoJwtVerifier>) -> Self {
401 self.cognito_jwt_verifier = Some(verifier);
402 self
403 }
404
405 pub fn with_kms_hook(mut self, hook: Arc<dyn KmsHook>) -> Self {
406 self.kms_hook = Some(hook);
407 self
408 }
409
410 pub fn kms_encrypt(
413 &self,
414 account_id: &str,
415 region: &str,
416 key_id: &str,
417 plaintext: &[u8],
418 service_principal: &str,
419 encryption_context: std::collections::HashMap<String, String>,
420 ) -> Result<String, String> {
421 match self.kms_hook {
422 Some(ref h) => h.encrypt(
423 account_id,
424 region,
425 key_id,
426 plaintext,
427 service_principal,
428 encryption_context,
429 ),
430 None => Err("KMS hook not configured".to_string()),
431 }
432 }
433
434 pub fn kms_decrypt(
437 &self,
438 account_id: &str,
439 ciphertext_b64: &str,
440 service_principal: &str,
441 encryption_context: std::collections::HashMap<String, String>,
442 ) -> Result<Vec<u8>, String> {
443 match self.kms_hook {
444 Some(ref h) => h.decrypt(
445 account_id,
446 ciphertext_b64,
447 service_principal,
448 encryption_context,
449 ),
450 None => Err("KMS hook not configured".to_string()),
451 }
452 }
453
454 pub fn verify_cognito_jwt(
457 &self,
458 account_id: &str,
459 user_pool_arn: &str,
460 token: &str,
461 ) -> Result<serde_json::Value, String> {
462 match self.cognito_jwt_verifier {
463 Some(ref v) => v.verify_token(account_id, user_pool_arn, token),
464 None => Err("Cognito JWT verifier not configured".to_string()),
465 }
466 }
467
468 pub fn with_cloudwatch_metrics(mut self, sender: Arc<dyn CloudwatchDelivery>) -> Self {
469 self.cloudwatch_metrics = Some(sender);
470 self
471 }
472
473 #[allow(clippy::too_many_arguments)]
477 pub fn put_cloudwatch_metric(
478 &self,
479 account_id: &str,
480 region: &str,
481 namespace: &str,
482 metric_name: &str,
483 value: f64,
484 unit: Option<&str>,
485 dimensions: std::collections::BTreeMap<String, String>,
486 timestamp_ms: i64,
487 ) {
488 if let Some(ref sender) = self.cloudwatch_metrics {
489 sender.put_metric(
490 account_id,
491 region,
492 namespace,
493 metric_name,
494 value,
495 unit,
496 dimensions,
497 timestamp_ms,
498 );
499 }
500 }
501
502 pub fn with_cloudwatch_logs(mut self, sender: Arc<dyn CloudwatchLogsDelivery>) -> Self {
503 self.cloudwatch_logs = Some(sender);
504 self
505 }
506
507 pub fn put_log_events(
510 &self,
511 account_id: &str,
512 log_group_name: &str,
513 log_stream_name: &str,
514 events: &[(i64, String)],
515 ) {
516 if let Some(ref sender) = self.cloudwatch_logs {
517 sender.put_log_events(account_id, log_group_name, log_stream_name, events);
518 }
519 }
520
521 pub fn with_ses_dispatcher(mut self, dispatcher: Arc<dyn SesSendEmailDispatcher>) -> Self {
522 self.ses_dispatcher = Some(dispatcher);
523 self
524 }
525
526 pub fn with_ecs_task_runner(mut self, runner: Arc<dyn EcsTaskRunner>) -> Self {
527 self.ecs_task_runner = Some(runner);
528 self
529 }
530
531 pub fn with_elbv2_target_registration(mut self, reg: Arc<dyn Elbv2TargetRegistration>) -> Self {
532 self.elbv2_target_registration = Some(reg);
533 self
534 }
535
536 pub fn register_elbv2_targets(
539 &self,
540 account_id: &str,
541 target_group_arn: &str,
542 targets: Vec<(String, Option<i64>)>,
543 ) {
544 if let Some(ref reg) = self.elbv2_target_registration {
545 reg.register_targets(account_id, target_group_arn, targets);
546 }
547 }
548
549 pub fn deregister_elbv2_targets(
552 &self,
553 account_id: &str,
554 target_group_arn: &str,
555 targets: Vec<(String, Option<i64>)>,
556 ) {
557 if let Some(ref reg) = self.elbv2_target_registration {
558 reg.deregister_targets(account_id, target_group_arn, targets);
559 }
560 }
561
562 #[allow(clippy::too_many_arguments)]
565 pub fn send_ses_email(
566 &self,
567 account_id: &str,
568 from: &str,
569 to: Vec<String>,
570 cc: Vec<String>,
571 bcc: Vec<String>,
572 subject: Option<&str>,
573 text_body: Option<&str>,
574 html_body: Option<&str>,
575 ) -> Result<(), String> {
576 match self.ses_dispatcher {
577 Some(ref d) => {
578 d.send_email(account_id, from, to, cc, bcc, subject, text_body, html_body)
579 }
580 None => Err("SES dispatcher not configured".to_string()),
581 }
582 }
583
584 pub fn run_ecs_task(
587 &self,
588 account_id: &str,
589 cluster: &str,
590 task_definition: &str,
591 launch_type: Option<&str>,
592 count: usize,
593 ) -> Result<(), String> {
594 match self.ecs_task_runner {
595 Some(ref r) => r.run_task(account_id, cluster, task_definition, launch_type, count),
596 None => Err("ECS task runner not configured".to_string()),
597 }
598 }
599
600 pub fn with_s3(mut self, sender: Arc<dyn S3Delivery>) -> Self {
601 self.s3_writer = Some(sender);
602 self
603 }
604
605 pub fn with_firehose(mut self, sender: Arc<dyn FirehoseDelivery>) -> Self {
606 self.firehose_sender = Some(sender);
607 self
608 }
609
610 pub fn put_record_to_firehose(&self, delivery_stream_arn: &str, data: &[u8]) {
614 if let Some(ref sender) = self.firehose_sender {
615 sender.put_record(delivery_stream_arn, data);
616 }
617 }
618
619 pub fn put_object_to_s3(
622 &self,
623 account_id: &str,
624 bucket: &str,
625 key: &str,
626 body: Vec<u8>,
627 content_type: Option<&str>,
628 ) -> Result<(), String> {
629 match self.s3_writer {
630 Some(ref sender) => sender.put_object(account_id, bucket, key, body, content_type),
631 None => Err("S3 writer not configured".to_string()),
632 }
633 }
634
635 pub fn get_object_from_s3(
638 &self,
639 account_id: &str,
640 bucket: &str,
641 key: &str,
642 ) -> Result<Vec<u8>, String> {
643 match self.s3_writer {
644 Some(ref sender) => sender.get_object(account_id, bucket, key),
645 None => Err("S3 client not configured".to_string()),
646 }
647 }
648
649 pub fn with_sqs(mut self, sender: Arc<dyn SqsDelivery>) -> Self {
650 self.sqs_sender = Some(sender);
651 self
652 }
653
654 pub fn with_sns(mut self, sender: Arc<dyn SnsDelivery>) -> Self {
655 self.sns_sender = Some(sender);
656 self
657 }
658
659 pub fn with_eventbridge(mut self, sender: Arc<dyn EventBridgeDelivery>) -> Self {
660 self.eventbridge_sender = Some(sender);
661 self
662 }
663
664 pub fn with_lambda(mut self, invoker: Arc<dyn LambdaDelivery>) -> Self {
665 self.lambda_invoker = Some(invoker);
666 self
667 }
668
669 pub fn with_kinesis(mut self, sender: Arc<dyn KinesisDelivery>) -> Self {
670 self.kinesis_sender = Some(sender);
671 self
672 }
673
674 pub fn put_record_to_kinesis(&self, stream_arn: &str, data: &str, partition_key: &str) {
677 if let Some(ref sender) = self.kinesis_sender {
678 sender.put_record(stream_arn, data, partition_key);
679 }
680 }
681
682 pub fn with_stepfunctions(mut self, starter: Arc<dyn StepFunctionsDelivery>) -> Self {
683 self.stepfunctions_starter = Some(starter);
684 self
685 }
686
687 pub fn send_to_sqs(
689 &self,
690 queue_arn: &str,
691 message_body: &str,
692 attributes: &HashMap<String, String>,
693 ) {
694 if let Some(ref sender) = self.sqs_sender {
695 sender.deliver_to_queue(queue_arn, message_body, attributes);
696 }
697 }
698
699 pub fn send_to_sqs_with_attrs(
701 &self,
702 queue_arn: &str,
703 message_body: &str,
704 message_attributes: &HashMap<String, SqsMessageAttribute>,
705 message_group_id: Option<&str>,
706 message_dedup_id: Option<&str>,
707 ) {
708 if let Some(ref sender) = self.sqs_sender {
709 sender.deliver_to_queue_with_attrs(
710 queue_arn,
711 message_body,
712 message_attributes,
713 message_group_id,
714 message_dedup_id,
715 );
716 }
717 }
718
719 pub fn try_send_to_sqs_with_attrs(
724 &self,
725 queue_arn: &str,
726 message_body: &str,
727 message_attributes: &HashMap<String, SqsMessageAttribute>,
728 message_group_id: Option<&str>,
729 message_dedup_id: Option<&str>,
730 ) -> Result<(), SqsDeliveryError> {
731 match self.sqs_sender {
732 Some(ref sender) => sender.try_deliver_to_queue_with_attrs(
733 queue_arn,
734 message_body,
735 message_attributes,
736 message_group_id,
737 message_dedup_id,
738 ),
739 None => Err(SqsDeliveryError::QueueNotFound(queue_arn.to_string())),
740 }
741 }
742
743 pub fn publish_to_sns(&self, topic_arn: &str, message: &str, subject: Option<&str>) {
745 if let Some(ref sender) = self.sns_sender {
746 sender.publish_to_topic(topic_arn, message, subject);
747 }
748 }
749
750 pub fn put_event_to_eventbridge(
752 &self,
753 source: &str,
754 detail_type: &str,
755 detail: &str,
756 event_bus_name: &str,
757 ) {
758 if let Some(ref sender) = self.eventbridge_sender {
759 sender.put_event(source, detail_type, detail, event_bus_name);
760 }
761 }
762
763 pub fn put_event_to_eventbridge_for_account(
766 &self,
767 source: &str,
768 detail_type: &str,
769 detail: &str,
770 event_bus_name: &str,
771 target_account_id: &str,
772 ) {
773 if let Some(ref sender) = self.eventbridge_sender {
774 sender.put_event_to_account(
775 source,
776 detail_type,
777 detail,
778 event_bus_name,
779 target_account_id,
780 );
781 }
782 }
783
784 pub async fn invoke_lambda(
786 &self,
787 function_arn: &str,
788 payload: &str,
789 ) -> Option<Result<Vec<u8>, String>> {
790 if let Some(ref invoker) = self.lambda_invoker {
791 Some(invoker.invoke_lambda(function_arn, payload).await)
792 } else {
793 None
794 }
795 }
796
797 pub fn send_to_kinesis(&self, stream_arn: &str, data: &str, partition_key: &str) {
799 if let Some(ref sender) = self.kinesis_sender {
800 sender.put_record(stream_arn, data, partition_key);
801 }
802 }
803
804 pub fn start_stepfunctions_execution(&self, state_machine_arn: &str, input: &str) {
806 if let Some(ref starter) = self.stepfunctions_starter {
807 starter.start_execution(state_machine_arn, input);
808 }
809 }
810}
811
812impl Default for DeliveryBus {
813 fn default() -> Self {
814 Self::new()
815 }
816}
817
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Reads a mock's call counter.
    fn calls(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    /// SQS mock that only counts `deliver_to_queue` calls; the trait's
    /// default methods route the richer variants through it.
    #[derive(Default)]
    struct MockSqs {
        call_count: AtomicUsize,
    }

    impl SqsDelivery for MockSqs {
        fn deliver_to_queue(
            &self,
            _queue_arn: &str,
            _message_body: &str,
            _attributes: &HashMap<String, String>,
        ) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// SNS mock that counts `publish_to_topic` calls.
    #[derive(Default)]
    struct MockSns {
        call_count: AtomicUsize,
    }

    impl SnsDelivery for MockSns {
        fn publish_to_topic(&self, _topic_arn: &str, _message: &str, _subject: Option<&str>) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// EventBridge mock that counts `put_event` calls.
    #[derive(Default)]
    struct MockEventBridge {
        call_count: AtomicUsize,
    }

    impl EventBridgeDelivery for MockEventBridge {
        fn put_event(
            &self,
            _source: &str,
            _detail_type: &str,
            _detail: &str,
            _event_bus_name: &str,
        ) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Kinesis mock that counts `put_record` calls.
    #[derive(Default)]
    struct MockKinesis {
        call_count: AtomicUsize,
    }

    impl KinesisDelivery for MockKinesis {
        fn put_record(&self, _stream_arn: &str, _data: &str, _partition_key: &str) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Step Functions mock that counts `start_execution` calls.
    #[derive(Default)]
    struct MockStepFunctions {
        call_count: AtomicUsize,
    }

    impl StepFunctionsDelivery for MockStepFunctions {
        fn start_execution(&self, _state_machine_arn: &str, _input: &str) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    // With nothing configured, every fire-and-forget method must be a
    // silent no-op; the test passes if none of them panics.
    #[test]
    fn delivery_bus_new_has_no_senders() {
        let bus = DeliveryBus::new();
        bus.send_to_sqs("arn:queue", "body", &HashMap::new());
        bus.publish_to_sns("arn:topic", "msg", None);
        bus.put_event_to_eventbridge("src", "type", "{}", "default");
        bus.send_to_kinesis("arn:stream", "data", "pk");
        bus.start_stepfunctions_execution("arn:sfn", "{}");
    }

    #[test]
    fn delivery_bus_default_is_same_as_new() {
        let bus = DeliveryBus::default();
        bus.send_to_sqs("arn:q", "b", &HashMap::new());
    }

    #[test]
    fn send_to_sqs_calls_sender() {
        let mock = Arc::new(MockSqs::default());
        let bus = DeliveryBus::new().with_sqs(mock.clone());

        bus.send_to_sqs("arn:queue", "msg", &HashMap::new());
        assert_eq!(calls(&mock.call_count), 1);

        bus.send_to_sqs("arn:queue2", "msg2", &HashMap::new());
        assert_eq!(calls(&mock.call_count), 2);
    }

    #[test]
    fn send_to_sqs_with_attrs_calls_sender() {
        let mock = Arc::new(MockSqs::default());
        let bus = DeliveryBus::new().with_sqs(mock.clone());

        let attribute = SqsMessageAttribute {
            data_type: "String".to_string(),
            string_value: Some("val".to_string()),
            binary_value: None,
        };
        let mut attrs = HashMap::new();
        attrs.insert("key".to_string(), attribute);

        bus.send_to_sqs_with_attrs("arn:q", "body", &attrs, Some("group"), Some("dedup"));
        assert_eq!(calls(&mock.call_count), 1);
    }

    #[test]
    fn publish_to_sns_calls_sender() {
        let mock = Arc::new(MockSns::default());
        let bus = DeliveryBus::new().with_sns(mock.clone());

        bus.publish_to_sns("arn:topic", "message", Some("subject"));
        assert_eq!(calls(&mock.call_count), 1);
    }

    #[test]
    fn put_event_to_eventbridge_calls_sender() {
        let mock = Arc::new(MockEventBridge::default());
        let bus = DeliveryBus::new().with_eventbridge(mock.clone());

        bus.put_event_to_eventbridge("aws.s3", "Object Created", "{}", "default");
        assert_eq!(calls(&mock.call_count), 1);
    }

    #[test]
    fn send_to_kinesis_calls_sender() {
        let mock = Arc::new(MockKinesis::default());
        let bus = DeliveryBus::new().with_kinesis(mock.clone());

        bus.send_to_kinesis("arn:stream", "data", "partition-key");
        assert_eq!(calls(&mock.call_count), 1);
    }

    #[test]
    fn start_stepfunctions_calls_sender() {
        let mock = Arc::new(MockStepFunctions::default());
        let bus = DeliveryBus::new().with_stepfunctions(mock.clone());

        bus.start_stepfunctions_execution("arn:sfn:machine", r#"{"key":"val"}"#);
        assert_eq!(calls(&mock.call_count), 1);
    }

    #[test]
    fn builder_chaining_works() {
        let sqs = Arc::new(MockSqs::default());
        let sns = Arc::new(MockSns::default());
        let eb = Arc::new(MockEventBridge::default());
        let kin = Arc::new(MockKinesis::default());
        let sfn = Arc::new(MockStepFunctions::default());

        let bus = DeliveryBus::new()
            .with_sqs(sqs.clone())
            .with_sns(sns.clone())
            .with_eventbridge(eb.clone())
            .with_kinesis(kin.clone())
            .with_stepfunctions(sfn.clone());

        bus.send_to_sqs("q", "m", &HashMap::new());
        bus.publish_to_sns("t", "m", None);
        bus.put_event_to_eventbridge("s", "d", "{}", "b");
        bus.send_to_kinesis("s", "d", "k");
        bus.start_stepfunctions_execution("sm", "{}");

        // Each hook must have been reached exactly once.
        assert_eq!(calls(&sqs.call_count), 1);
        assert_eq!(calls(&sns.call_count), 1);
        assert_eq!(calls(&eb.call_count), 1);
        assert_eq!(calls(&kin.call_count), 1);
        assert_eq!(calls(&sfn.call_count), 1);
    }

    #[tokio::test]
    async fn invoke_lambda_returns_none_without_invoker() {
        let bus = DeliveryBus::new();
        let result = bus.invoke_lambda("arn:lambda", "{}").await;
        assert!(result.is_none());
    }
}