Skip to main content

fakecloud_core/
delivery.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4/// Cross-service message delivery.
5///
6/// Services use this to deliver messages to other services without
7/// direct dependencies between service crates. The server wires up
8/// the delivery functions at startup.
9pub struct DeliveryBus {
10    /// Deliver a message to an SQS queue by ARN.
11    sqs_sender: Option<Arc<dyn SqsDelivery>>,
12    /// Publish a message to an SNS topic by ARN.
13    sns_sender: Option<Arc<dyn SnsDelivery>>,
14    /// Put an event onto an EventBridge bus.
15    eventbridge_sender: Option<Arc<dyn EventBridgeDelivery>>,
16    /// Invoke a Lambda function by ARN.
17    lambda_invoker: Option<Arc<dyn LambdaDelivery>>,
18    /// Put records to a Kinesis Data Stream by ARN.
19    kinesis_sender: Option<Arc<dyn KinesisDelivery>>,
20    /// Start Step Functions executions.
21    stepfunctions_starter: Option<Arc<dyn StepFunctionsDelivery>>,
22}
23
24/// Message attribute for SQS delivery from SNS.
25#[derive(Debug, Clone)]
26pub struct SqsMessageAttribute {
27    pub data_type: String,
28    pub string_value: Option<String>,
29    pub binary_value: Option<String>,
30}
31
32/// Trait for delivering messages to SQS queues.
33pub trait SqsDelivery: Send + Sync {
34    fn deliver_to_queue(
35        &self,
36        queue_arn: &str,
37        message_body: &str,
38        attributes: &HashMap<String, String>,
39    );
40
41    /// Deliver with message attributes and FIFO fields
42    fn deliver_to_queue_with_attrs(
43        &self,
44        queue_arn: &str,
45        message_body: &str,
46        message_attributes: &HashMap<String, SqsMessageAttribute>,
47        message_group_id: Option<&str>,
48        message_dedup_id: Option<&str>,
49    ) {
50        // Default implementation: fall back to simple delivery
51        let _ = (message_attributes, message_group_id, message_dedup_id);
52        self.deliver_to_queue(queue_arn, message_body, &HashMap::new());
53    }
54}
55
56/// Trait for publishing messages to SNS topics.
57pub trait SnsDelivery: Send + Sync {
58    fn publish_to_topic(&self, topic_arn: &str, message: &str, subject: Option<&str>);
59}
60
61/// Trait for putting events onto an EventBridge bus from cross-service integrations.
62pub trait EventBridgeDelivery: Send + Sync {
63    /// Put an event onto the specified event bus.
64    /// The implementation should handle rule matching and target delivery.
65    fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str);
66}
67
68/// Trait for invoking Lambda functions from cross-service integrations.
69pub trait LambdaDelivery: Send + Sync {
70    /// Invoke a Lambda function with the given payload.
71    /// The function is identified by ARN. Returns the response bytes on success.
72    fn invoke_lambda(
73        &self,
74        function_arn: &str,
75        payload: &str,
76    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send>>;
77}
78
79/// Trait for putting records to Kinesis Data Streams.
80pub trait KinesisDelivery: Send + Sync {
81    /// Put a record to a Kinesis stream identified by ARN.
82    /// The data should be base64-encoded. partition_key is used for shard distribution.
83    fn put_record(&self, stream_arn: &str, data: &str, partition_key: &str);
84}
85
86/// Trait for starting Step Functions executions from cross-service integrations.
87pub trait StepFunctionsDelivery: Send + Sync {
88    /// Start a state machine execution with the given input.
89    /// The state machine is identified by ARN.
90    fn start_execution(&self, state_machine_arn: &str, input: &str);
91}
92
93impl DeliveryBus {
94    pub fn new() -> Self {
95        Self {
96            sqs_sender: None,
97            sns_sender: None,
98            eventbridge_sender: None,
99            lambda_invoker: None,
100            kinesis_sender: None,
101            stepfunctions_starter: None,
102        }
103    }
104
105    pub fn with_sqs(mut self, sender: Arc<dyn SqsDelivery>) -> Self {
106        self.sqs_sender = Some(sender);
107        self
108    }
109
110    pub fn with_sns(mut self, sender: Arc<dyn SnsDelivery>) -> Self {
111        self.sns_sender = Some(sender);
112        self
113    }
114
115    pub fn with_eventbridge(mut self, sender: Arc<dyn EventBridgeDelivery>) -> Self {
116        self.eventbridge_sender = Some(sender);
117        self
118    }
119
120    pub fn with_lambda(mut self, invoker: Arc<dyn LambdaDelivery>) -> Self {
121        self.lambda_invoker = Some(invoker);
122        self
123    }
124
125    pub fn with_kinesis(mut self, sender: Arc<dyn KinesisDelivery>) -> Self {
126        self.kinesis_sender = Some(sender);
127        self
128    }
129
130    pub fn with_stepfunctions(mut self, starter: Arc<dyn StepFunctionsDelivery>) -> Self {
131        self.stepfunctions_starter = Some(starter);
132        self
133    }
134
135    /// Send a message to an SQS queue identified by ARN.
136    pub fn send_to_sqs(
137        &self,
138        queue_arn: &str,
139        message_body: &str,
140        attributes: &HashMap<String, String>,
141    ) {
142        if let Some(ref sender) = self.sqs_sender {
143            sender.deliver_to_queue(queue_arn, message_body, attributes);
144        }
145    }
146
147    /// Send a message to an SQS queue with message attributes and FIFO fields.
148    pub fn send_to_sqs_with_attrs(
149        &self,
150        queue_arn: &str,
151        message_body: &str,
152        message_attributes: &HashMap<String, SqsMessageAttribute>,
153        message_group_id: Option<&str>,
154        message_dedup_id: Option<&str>,
155    ) {
156        if let Some(ref sender) = self.sqs_sender {
157            sender.deliver_to_queue_with_attrs(
158                queue_arn,
159                message_body,
160                message_attributes,
161                message_group_id,
162                message_dedup_id,
163            );
164        }
165    }
166
167    /// Publish a message to an SNS topic identified by ARN.
168    pub fn publish_to_sns(&self, topic_arn: &str, message: &str, subject: Option<&str>) {
169        if let Some(ref sender) = self.sns_sender {
170            sender.publish_to_topic(topic_arn, message, subject);
171        }
172    }
173
174    /// Put an event onto an EventBridge bus.
175    pub fn put_event_to_eventbridge(
176        &self,
177        source: &str,
178        detail_type: &str,
179        detail: &str,
180        event_bus_name: &str,
181    ) {
182        if let Some(ref sender) = self.eventbridge_sender {
183            sender.put_event(source, detail_type, detail, event_bus_name);
184        }
185    }
186
187    /// Invoke a Lambda function identified by ARN.
188    pub async fn invoke_lambda(
189        &self,
190        function_arn: &str,
191        payload: &str,
192    ) -> Option<Result<Vec<u8>, String>> {
193        if let Some(ref invoker) = self.lambda_invoker {
194            Some(invoker.invoke_lambda(function_arn, payload).await)
195        } else {
196            None
197        }
198    }
199
200    /// Put a record to a Kinesis stream identified by ARN.
201    pub fn send_to_kinesis(&self, stream_arn: &str, data: &str, partition_key: &str) {
202        if let Some(ref sender) = self.kinesis_sender {
203            sender.put_record(stream_arn, data, partition_key);
204        }
205    }
206
207    /// Start a Step Functions execution identified by state machine ARN.
208    pub fn start_stepfunctions_execution(&self, state_machine_arn: &str, input: &str) {
209        if let Some(ref starter) = self.stepfunctions_starter {
210            starter.start_execution(state_machine_arn, input);
211        }
212    }
213}
214
215impl Default for DeliveryBus {
216    fn default() -> Self {
217        Self::new()
218    }
219}