1use std::collections::HashMap;
2use std::sync::Arc;
3
4pub struct DeliveryBus {
10 sqs_sender: Option<Arc<dyn SqsDelivery>>,
12 sns_sender: Option<Arc<dyn SnsDelivery>>,
14 eventbridge_sender: Option<Arc<dyn EventBridgeDelivery>>,
16 lambda_invoker: Option<Arc<dyn LambdaDelivery>>,
18 kinesis_sender: Option<Arc<dyn KinesisDelivery>>,
20}
21
/// Typed message attribute accepted by [`SqsDelivery::deliver_to_queue_with_attrs`].
///
/// Nothing in this type validates that the populated value matches `data_type`.
#[derive(Debug, Clone)]
pub struct SqsMessageAttribute {
    /// Type label of the attribute. NOTE(review): presumably values such as `String`,
    /// `Number` or `Binary` — confirm against the callers that build this struct.
    pub data_type: String,
    /// Textual value, when the attribute carries one.
    pub string_value: Option<String>,
    /// Binary value carried as a `String`. The encoding is not visible in this file
    /// (presumably base64 — confirm).
    pub binary_value: Option<String>,
}
29
30pub trait SqsDelivery: Send + Sync {
32 fn deliver_to_queue(
33 &self,
34 queue_arn: &str,
35 message_body: &str,
36 attributes: &HashMap<String, String>,
37 );
38
39 fn deliver_to_queue_with_attrs(
41 &self,
42 queue_arn: &str,
43 message_body: &str,
44 message_attributes: &HashMap<String, SqsMessageAttribute>,
45 message_group_id: Option<&str>,
46 message_dedup_id: Option<&str>,
47 ) {
48 let _ = (message_attributes, message_group_id, message_dedup_id);
50 self.deliver_to_queue(queue_arn, message_body, &HashMap::new());
51 }
52}
53
/// Backend able to publish a message to an SNS topic.
///
/// Implementors must be `Send + Sync`; [`DeliveryBus`] stores them as `Arc<dyn SnsDelivery>`.
pub trait SnsDelivery: Send + Sync {
    /// Publishes `message` to the topic named by `topic_arn`, with an optional `subject`.
    fn publish_to_topic(&self, topic_arn: &str, message: &str, subject: Option<&str>);
}
58
/// Backend able to put an event onto an EventBridge bus.
///
/// Implementors must be `Send + Sync`; [`DeliveryBus`] stores them as
/// `Arc<dyn EventBridgeDelivery>`.
pub trait EventBridgeDelivery: Send + Sync {
    /// Puts one event, described by `source`, `detail_type` and `detail`, onto the bus named
    /// `event_bus_name`.
    ///
    /// `detail` is passed through as an opaque string (presumably JSON — this trait does not
    /// enforce it).
    fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str);
}
65
/// Backend able to invoke a Lambda function asynchronously.
///
/// Implementors must be `Send + Sync`; [`DeliveryBus`] stores them as
/// `Arc<dyn LambdaDelivery>`.
pub trait LambdaDelivery: Send + Sync {
    /// Starts an invocation of the function named by `function_arn` with `payload`.
    ///
    /// Returns a boxed, pinned, `Send` future. It resolves to the response bytes on success
    /// or to an error message on failure. The future type carries no lifetime tied to
    /// `&self` or the string arguments, so implementations cannot borrow from them inside it.
    fn invoke_lambda(
        &self,
        function_arn: &str,
        payload: &str,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send>>;
}
76
/// Backend able to put a record onto a Kinesis stream.
///
/// Implementors must be `Send + Sync`; [`DeliveryBus`] stores them as
/// `Arc<dyn KinesisDelivery>`.
pub trait KinesisDelivery: Send + Sync {
    /// Puts `data` onto the stream named by `stream_arn` under `partition_key`.
    fn put_record(&self, stream_arn: &str, data: &str, partition_key: &str);
}
83
84impl DeliveryBus {
85 pub fn new() -> Self {
86 Self {
87 sqs_sender: None,
88 sns_sender: None,
89 eventbridge_sender: None,
90 lambda_invoker: None,
91 kinesis_sender: None,
92 }
93 }
94
95 pub fn with_sqs(mut self, sender: Arc<dyn SqsDelivery>) -> Self {
96 self.sqs_sender = Some(sender);
97 self
98 }
99
100 pub fn with_sns(mut self, sender: Arc<dyn SnsDelivery>) -> Self {
101 self.sns_sender = Some(sender);
102 self
103 }
104
105 pub fn with_eventbridge(mut self, sender: Arc<dyn EventBridgeDelivery>) -> Self {
106 self.eventbridge_sender = Some(sender);
107 self
108 }
109
110 pub fn with_lambda(mut self, invoker: Arc<dyn LambdaDelivery>) -> Self {
111 self.lambda_invoker = Some(invoker);
112 self
113 }
114
115 pub fn with_kinesis(mut self, sender: Arc<dyn KinesisDelivery>) -> Self {
116 self.kinesis_sender = Some(sender);
117 self
118 }
119
120 pub fn send_to_sqs(
122 &self,
123 queue_arn: &str,
124 message_body: &str,
125 attributes: &HashMap<String, String>,
126 ) {
127 if let Some(ref sender) = self.sqs_sender {
128 sender.deliver_to_queue(queue_arn, message_body, attributes);
129 }
130 }
131
132 pub fn send_to_sqs_with_attrs(
134 &self,
135 queue_arn: &str,
136 message_body: &str,
137 message_attributes: &HashMap<String, SqsMessageAttribute>,
138 message_group_id: Option<&str>,
139 message_dedup_id: Option<&str>,
140 ) {
141 if let Some(ref sender) = self.sqs_sender {
142 sender.deliver_to_queue_with_attrs(
143 queue_arn,
144 message_body,
145 message_attributes,
146 message_group_id,
147 message_dedup_id,
148 );
149 }
150 }
151
152 pub fn publish_to_sns(&self, topic_arn: &str, message: &str, subject: Option<&str>) {
154 if let Some(ref sender) = self.sns_sender {
155 sender.publish_to_topic(topic_arn, message, subject);
156 }
157 }
158
159 pub fn put_event_to_eventbridge(
161 &self,
162 source: &str,
163 detail_type: &str,
164 detail: &str,
165 event_bus_name: &str,
166 ) {
167 if let Some(ref sender) = self.eventbridge_sender {
168 sender.put_event(source, detail_type, detail, event_bus_name);
169 }
170 }
171
172 pub async fn invoke_lambda(
174 &self,
175 function_arn: &str,
176 payload: &str,
177 ) -> Option<Result<Vec<u8>, String>> {
178 if let Some(ref invoker) = self.lambda_invoker {
179 Some(invoker.invoke_lambda(function_arn, payload).await)
180 } else {
181 None
182 }
183 }
184
185 pub fn send_to_kinesis(&self, stream_arn: &str, data: &str, partition_key: &str) {
187 if let Some(ref sender) = self.kinesis_sender {
188 sender.put_record(stream_arn, data, partition_key);
189 }
190 }
191}
192
193impl Default for DeliveryBus {
194 fn default() -> Self {
195 Self::new()
196 }
197}