Skip to main content

fakecloud_core/
delivery.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4/// Cross-service message delivery.
5///
6/// Services use this to deliver messages to other services without
7/// direct dependencies between service crates. The server wires up
8/// the delivery functions at startup.
9pub struct DeliveryBus {
10    /// Deliver a message to an SQS queue by ARN.
11    sqs_sender: Option<Arc<dyn SqsDelivery>>,
12    /// Publish a message to an SNS topic by ARN.
13    sns_sender: Option<Arc<dyn SnsDelivery>>,
14    /// Put an event onto an EventBridge bus.
15    eventbridge_sender: Option<Arc<dyn EventBridgeDelivery>>,
16    /// Invoke a Lambda function by ARN.
17    lambda_invoker: Option<Arc<dyn LambdaDelivery>>,
18    /// Put records to a Kinesis Data Stream by ARN.
19    kinesis_sender: Option<Arc<dyn KinesisDelivery>>,
20}
21
/// Message attribute for SQS delivery from SNS.
///
/// This is a plain value type, so besides `Debug` and `Clone` it also
/// implements `PartialEq`/`Eq`; attributes can therefore be compared directly
/// (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SqsMessageAttribute {
    /// Declared data type of the attribute.
    pub data_type: String,
    /// Attribute value when it is carried as a string, if any.
    pub string_value: Option<String>,
    /// Attribute value when it is carried as binary data, if any.
    /// NOTE(review): stored as a `String`, presumably an encoded (e.g. base64)
    /// form of the bytes - confirm against the SQS implementation.
    pub binary_value: Option<String>,
}
29
30/// Trait for delivering messages to SQS queues.
31pub trait SqsDelivery: Send + Sync {
32    fn deliver_to_queue(
33        &self,
34        queue_arn: &str,
35        message_body: &str,
36        attributes: &HashMap<String, String>,
37    );
38
39    /// Deliver with message attributes and FIFO fields
40    fn deliver_to_queue_with_attrs(
41        &self,
42        queue_arn: &str,
43        message_body: &str,
44        message_attributes: &HashMap<String, SqsMessageAttribute>,
45        message_group_id: Option<&str>,
46        message_dedup_id: Option<&str>,
47    ) {
48        // Default implementation: fall back to simple delivery
49        let _ = (message_attributes, message_group_id, message_dedup_id);
50        self.deliver_to_queue(queue_arn, message_body, &HashMap::new());
51    }
52}
53
/// Sink for messages published to SNS topics.
pub trait SnsDelivery: Send + Sync {
    /// Publishes `message` to the topic identified by `topic_arn`, with an
    /// optional `subject`.
    fn publish_to_topic(&self, topic_arn: &str, message: &str, subject: Option<&str>);
}
58
/// Sink for events that cross-service integrations put onto an EventBridge bus.
pub trait EventBridgeDelivery: Send + Sync {
    /// Puts an event, described by `source`, `detail_type` and `detail`, onto
    /// the bus named `event_bus_name`.
    ///
    /// Rule matching and delivery to targets are the implementation's
    /// responsibility.
    fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str);
}
65
/// Entry point cross-service integrations use to invoke Lambda functions.
pub trait LambdaDelivery: Send + Sync {
    /// Invokes the function identified by `function_arn` with `payload`.
    ///
    /// Returns a boxed, pinned, `Send` future. It resolves to the response
    /// bytes on success or to an error message on failure.
    fn invoke_lambda(
        &self,
        function_arn: &str,
        payload: &str,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send>>;
}
76
/// Sink for records written to Kinesis Data Streams.
pub trait KinesisDelivery: Send + Sync {
    /// Puts one record to the stream identified by `stream_arn`.
    ///
    /// `data` is expected to be base64-encoded; `partition_key` is used for
    /// shard distribution.
    fn put_record(&self, stream_arn: &str, data: &str, partition_key: &str);
}
83
84impl DeliveryBus {
85    pub fn new() -> Self {
86        Self {
87            sqs_sender: None,
88            sns_sender: None,
89            eventbridge_sender: None,
90            lambda_invoker: None,
91            kinesis_sender: None,
92        }
93    }
94
95    pub fn with_sqs(mut self, sender: Arc<dyn SqsDelivery>) -> Self {
96        self.sqs_sender = Some(sender);
97        self
98    }
99
100    pub fn with_sns(mut self, sender: Arc<dyn SnsDelivery>) -> Self {
101        self.sns_sender = Some(sender);
102        self
103    }
104
105    pub fn with_eventbridge(mut self, sender: Arc<dyn EventBridgeDelivery>) -> Self {
106        self.eventbridge_sender = Some(sender);
107        self
108    }
109
110    pub fn with_lambda(mut self, invoker: Arc<dyn LambdaDelivery>) -> Self {
111        self.lambda_invoker = Some(invoker);
112        self
113    }
114
115    pub fn with_kinesis(mut self, sender: Arc<dyn KinesisDelivery>) -> Self {
116        self.kinesis_sender = Some(sender);
117        self
118    }
119
120    /// Send a message to an SQS queue identified by ARN.
121    pub fn send_to_sqs(
122        &self,
123        queue_arn: &str,
124        message_body: &str,
125        attributes: &HashMap<String, String>,
126    ) {
127        if let Some(ref sender) = self.sqs_sender {
128            sender.deliver_to_queue(queue_arn, message_body, attributes);
129        }
130    }
131
132    /// Send a message to an SQS queue with message attributes and FIFO fields.
133    pub fn send_to_sqs_with_attrs(
134        &self,
135        queue_arn: &str,
136        message_body: &str,
137        message_attributes: &HashMap<String, SqsMessageAttribute>,
138        message_group_id: Option<&str>,
139        message_dedup_id: Option<&str>,
140    ) {
141        if let Some(ref sender) = self.sqs_sender {
142            sender.deliver_to_queue_with_attrs(
143                queue_arn,
144                message_body,
145                message_attributes,
146                message_group_id,
147                message_dedup_id,
148            );
149        }
150    }
151
152    /// Publish a message to an SNS topic identified by ARN.
153    pub fn publish_to_sns(&self, topic_arn: &str, message: &str, subject: Option<&str>) {
154        if let Some(ref sender) = self.sns_sender {
155            sender.publish_to_topic(topic_arn, message, subject);
156        }
157    }
158
159    /// Put an event onto an EventBridge bus.
160    pub fn put_event_to_eventbridge(
161        &self,
162        source: &str,
163        detail_type: &str,
164        detail: &str,
165        event_bus_name: &str,
166    ) {
167        if let Some(ref sender) = self.eventbridge_sender {
168            sender.put_event(source, detail_type, detail, event_bus_name);
169        }
170    }
171
172    /// Invoke a Lambda function identified by ARN.
173    pub async fn invoke_lambda(
174        &self,
175        function_arn: &str,
176        payload: &str,
177    ) -> Option<Result<Vec<u8>, String>> {
178        if let Some(ref invoker) = self.lambda_invoker {
179            Some(invoker.invoke_lambda(function_arn, payload).await)
180        } else {
181            None
182        }
183    }
184
185    /// Put a record to a Kinesis stream identified by ARN.
186    pub fn send_to_kinesis(&self, stream_arn: &str, data: &str, partition_key: &str) {
187        if let Some(ref sender) = self.kinesis_sender {
188            sender.put_record(stream_arn, data, partition_key);
189        }
190    }
191}
192
193impl Default for DeliveryBus {
194    fn default() -> Self {
195        Self::new()
196    }
197}