Skip to main content

fakecloud_core/
delivery.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4/// Cross-service message delivery.
5///
6/// Services use this to deliver messages to other services without
7/// direct dependencies between service crates. The server wires up
8/// the delivery functions at startup.
9pub struct DeliveryBus {
10    /// Deliver a message to an SQS queue by ARN.
11    sqs_sender: Option<Arc<dyn SqsDelivery>>,
12    /// Publish a message to an SNS topic by ARN.
13    sns_sender: Option<Arc<dyn SnsDelivery>>,
14    /// Put an event onto an EventBridge bus.
15    eventbridge_sender: Option<Arc<dyn EventBridgeDelivery>>,
16    /// Invoke a Lambda function by ARN.
17    lambda_invoker: Option<Arc<dyn LambdaDelivery>>,
18    /// Put records to a Kinesis Data Stream by ARN.
19    kinesis_sender: Option<Arc<dyn KinesisDelivery>>,
20    /// Start Step Functions executions.
21    stepfunctions_starter: Option<Arc<dyn StepFunctionsDelivery>>,
22}
23
/// Message attribute for SQS delivery from SNS.
///
/// Plain value type: it can be cloned and compared for equality, which
/// lets callers and tests check that attributes were forwarded intact.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SqsMessageAttribute {
    /// Declared type of the attribute (the tests in this file use
    /// `"String"`).
    pub data_type: String,
    /// Textual payload, if the attribute carries one.
    pub string_value: Option<String>,
    /// Binary payload carried as a string, if the attribute carries one.
    // NOTE(review): presumably base64 text — confirm against the SQS implementation.
    pub binary_value: Option<String>,
}
31
/// Failure modes of the fallible SQS delivery path.
///
/// Scheduler's DLQ routing relies on this type to tell "the target
/// queue is missing" apart from "the message was delivered", which is
/// how it decides whether to fall back to the `DeadLetterConfig.Arn`.
/// Each variant carries the offending ARN.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SqsDeliveryError {
    /// No existing queue matched the target queue ARN.
    QueueNotFound(String),
    /// The ARN could not be parsed as an SQS queue identifier.
    InvalidArn(String),
}
43
44impl std::fmt::Display for SqsDeliveryError {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        match self {
47            Self::QueueNotFound(arn) => write!(f, "queue not found: {arn}"),
48            Self::InvalidArn(arn) => write!(f, "invalid queue ARN: {arn}"),
49        }
50    }
51}
52
53impl std::error::Error for SqsDeliveryError {}
54
55/// Trait for delivering messages to SQS queues.
56pub trait SqsDelivery: Send + Sync {
57    fn deliver_to_queue(
58        &self,
59        queue_arn: &str,
60        message_body: &str,
61        attributes: &HashMap<String, String>,
62    );
63
64    /// Deliver with message attributes and FIFO fields
65    fn deliver_to_queue_with_attrs(
66        &self,
67        queue_arn: &str,
68        message_body: &str,
69        message_attributes: &HashMap<String, SqsMessageAttribute>,
70        message_group_id: Option<&str>,
71        message_dedup_id: Option<&str>,
72    ) {
73        // Default implementation: fall back to simple delivery
74        let _ = (message_attributes, message_group_id, message_dedup_id);
75        self.deliver_to_queue(queue_arn, message_body, &HashMap::new());
76    }
77
78    /// Fallible variant used by Scheduler's DLQ routing. Default
79    /// implementation assumes the queue exists (preserving the
80    /// fire-and-forget semantics of `deliver_to_queue`); the real SQS
81    /// impl overrides this to actually look up the queue and report
82    /// `QueueNotFound` so the caller can route to a DLQ.
83    fn try_deliver_to_queue_with_attrs(
84        &self,
85        queue_arn: &str,
86        message_body: &str,
87        message_attributes: &HashMap<String, SqsMessageAttribute>,
88        message_group_id: Option<&str>,
89        message_dedup_id: Option<&str>,
90    ) -> Result<(), SqsDeliveryError> {
91        self.deliver_to_queue_with_attrs(
92            queue_arn,
93            message_body,
94            message_attributes,
95            message_group_id,
96            message_dedup_id,
97        );
98        Ok(())
99    }
100}
101
/// Sink for messages headed to SNS topics.
pub trait SnsDelivery: Send + Sync {
    /// Publishes `message` to the topic named by `topic_arn`, with an
    /// optional `subject`. Fire-and-forget: nothing is reported back.
    fn publish_to_topic(&self, topic_arn: &str, message: &str, subject: Option<&str>);
}
106
/// Sink used by cross-service integrations to place events on an
/// EventBridge bus.
pub trait EventBridgeDelivery: Send + Sync {
    /// Puts one event, described by `source`, `detail_type` and
    /// `detail`, onto the bus named `event_bus_name`.
    ///
    /// Rule matching and delivery to targets are the implementation's
    /// responsibility.
    fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str);
}
113
/// Hook used by cross-service integrations to invoke Lambda functions.
pub trait LambdaDelivery: Send + Sync {
    /// Invokes the function named by `function_arn` with `payload`.
    ///
    /// The call hands back a boxed, `Send` future; awaiting it yields
    /// the response bytes on success or an error message on failure.
    fn invoke_lambda(
        &self,
        function_arn: &str,
        payload: &str,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send>>;
}
124
/// Sink for records headed to Kinesis Data Streams.
pub trait KinesisDelivery: Send + Sync {
    /// Puts one record onto the stream named by `stream_arn`.
    ///
    /// `data` should already be base64-encoded; `partition_key` is used
    /// for shard distribution.
    fn put_record(&self, stream_arn: &str, data: &str, partition_key: &str);
}
131
/// Hook used by cross-service integrations to start Step Functions
/// executions.
pub trait StepFunctionsDelivery: Send + Sync {
    /// Starts an execution of the state machine named by
    /// `state_machine_arn`, passing `input` to it.
    fn start_execution(&self, state_machine_arn: &str, input: &str);
}
138
139impl DeliveryBus {
140    pub fn new() -> Self {
141        Self {
142            sqs_sender: None,
143            sns_sender: None,
144            eventbridge_sender: None,
145            lambda_invoker: None,
146            kinesis_sender: None,
147            stepfunctions_starter: None,
148        }
149    }
150
151    pub fn with_sqs(mut self, sender: Arc<dyn SqsDelivery>) -> Self {
152        self.sqs_sender = Some(sender);
153        self
154    }
155
156    pub fn with_sns(mut self, sender: Arc<dyn SnsDelivery>) -> Self {
157        self.sns_sender = Some(sender);
158        self
159    }
160
161    pub fn with_eventbridge(mut self, sender: Arc<dyn EventBridgeDelivery>) -> Self {
162        self.eventbridge_sender = Some(sender);
163        self
164    }
165
166    pub fn with_lambda(mut self, invoker: Arc<dyn LambdaDelivery>) -> Self {
167        self.lambda_invoker = Some(invoker);
168        self
169    }
170
171    pub fn with_kinesis(mut self, sender: Arc<dyn KinesisDelivery>) -> Self {
172        self.kinesis_sender = Some(sender);
173        self
174    }
175
176    pub fn with_stepfunctions(mut self, starter: Arc<dyn StepFunctionsDelivery>) -> Self {
177        self.stepfunctions_starter = Some(starter);
178        self
179    }
180
181    /// Send a message to an SQS queue identified by ARN.
182    pub fn send_to_sqs(
183        &self,
184        queue_arn: &str,
185        message_body: &str,
186        attributes: &HashMap<String, String>,
187    ) {
188        if let Some(ref sender) = self.sqs_sender {
189            sender.deliver_to_queue(queue_arn, message_body, attributes);
190        }
191    }
192
193    /// Send a message to an SQS queue with message attributes and FIFO fields.
194    pub fn send_to_sqs_with_attrs(
195        &self,
196        queue_arn: &str,
197        message_body: &str,
198        message_attributes: &HashMap<String, SqsMessageAttribute>,
199        message_group_id: Option<&str>,
200        message_dedup_id: Option<&str>,
201    ) {
202        if let Some(ref sender) = self.sqs_sender {
203            sender.deliver_to_queue_with_attrs(
204                queue_arn,
205                message_body,
206                message_attributes,
207                message_group_id,
208                message_dedup_id,
209            );
210        }
211    }
212
213    /// Fallible SQS send — returns `Err` when the target queue does not
214    /// exist, so callers (Scheduler) can route to a DLQ. Returns
215    /// `Err(QueueNotFound)` when no SQS sender is wired up at all,
216    /// matching the "target unreachable" semantics Scheduler relies on.
217    pub fn try_send_to_sqs_with_attrs(
218        &self,
219        queue_arn: &str,
220        message_body: &str,
221        message_attributes: &HashMap<String, SqsMessageAttribute>,
222        message_group_id: Option<&str>,
223        message_dedup_id: Option<&str>,
224    ) -> Result<(), SqsDeliveryError> {
225        match self.sqs_sender {
226            Some(ref sender) => sender.try_deliver_to_queue_with_attrs(
227                queue_arn,
228                message_body,
229                message_attributes,
230                message_group_id,
231                message_dedup_id,
232            ),
233            None => Err(SqsDeliveryError::QueueNotFound(queue_arn.to_string())),
234        }
235    }
236
237    /// Publish a message to an SNS topic identified by ARN.
238    pub fn publish_to_sns(&self, topic_arn: &str, message: &str, subject: Option<&str>) {
239        if let Some(ref sender) = self.sns_sender {
240            sender.publish_to_topic(topic_arn, message, subject);
241        }
242    }
243
244    /// Put an event onto an EventBridge bus.
245    pub fn put_event_to_eventbridge(
246        &self,
247        source: &str,
248        detail_type: &str,
249        detail: &str,
250        event_bus_name: &str,
251    ) {
252        if let Some(ref sender) = self.eventbridge_sender {
253            sender.put_event(source, detail_type, detail, event_bus_name);
254        }
255    }
256
257    /// Invoke a Lambda function identified by ARN.
258    pub async fn invoke_lambda(
259        &self,
260        function_arn: &str,
261        payload: &str,
262    ) -> Option<Result<Vec<u8>, String>> {
263        if let Some(ref invoker) = self.lambda_invoker {
264            Some(invoker.invoke_lambda(function_arn, payload).await)
265        } else {
266            None
267        }
268    }
269
270    /// Put a record to a Kinesis stream identified by ARN.
271    pub fn send_to_kinesis(&self, stream_arn: &str, data: &str, partition_key: &str) {
272        if let Some(ref sender) = self.kinesis_sender {
273            sender.put_record(stream_arn, data, partition_key);
274        }
275    }
276
277    /// Start a Step Functions execution identified by state machine ARN.
278    pub fn start_stepfunctions_execution(&self, state_machine_arn: &str, input: &str) {
279        if let Some(ref starter) = self.stepfunctions_starter {
280            starter.start_execution(state_machine_arn, input);
281        }
282    }
283}
284
285impl Default for DeliveryBus {
286    fn default() -> Self {
287        Self::new()
288    }
289}
290
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Invocation counter shared by the mocks below.
    #[derive(Default)]
    struct CallCounter(AtomicUsize);

    impl CallCounter {
        /// Registers one invocation.
        fn record(&self) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }

        /// Number of invocations registered so far.
        fn calls(&self) -> usize {
            self.0.load(Ordering::SeqCst)
        }
    }

    // Mock implementations. Each one only counts how often it was called.

    /// Implements just the required method, so the trait's default
    /// bodies for the `_with_attrs` variants are what gets exercised.
    #[derive(Default)]
    struct MockSqs {
        counter: CallCounter,
    }
    impl SqsDelivery for MockSqs {
        fn deliver_to_queue(
            &self,
            _queue_arn: &str,
            _message_body: &str,
            _attributes: &HashMap<String, String>,
        ) {
            self.counter.record();
        }
    }

    /// SQS backend whose fallible path always rejects the ARN.
    struct RejectingSqs;
    impl SqsDelivery for RejectingSqs {
        fn deliver_to_queue(
            &self,
            _queue_arn: &str,
            _message_body: &str,
            _attributes: &HashMap<String, String>,
        ) {
        }

        fn try_deliver_to_queue_with_attrs(
            &self,
            queue_arn: &str,
            _message_body: &str,
            _message_attributes: &HashMap<String, SqsMessageAttribute>,
            _message_group_id: Option<&str>,
            _message_dedup_id: Option<&str>,
        ) -> Result<(), SqsDeliveryError> {
            Err(SqsDeliveryError::InvalidArn(queue_arn.to_string()))
        }
    }

    #[derive(Default)]
    struct MockSns {
        counter: CallCounter,
    }
    impl SnsDelivery for MockSns {
        fn publish_to_topic(&self, _topic_arn: &str, _message: &str, _subject: Option<&str>) {
            self.counter.record();
        }
    }

    #[derive(Default)]
    struct MockEventBridge {
        counter: CallCounter,
    }
    impl EventBridgeDelivery for MockEventBridge {
        fn put_event(
            &self,
            _source: &str,
            _detail_type: &str,
            _detail: &str,
            _event_bus_name: &str,
        ) {
            self.counter.record();
        }
    }

    /// Lambda invoker that echoes the payload back as the response bytes.
    #[derive(Default)]
    struct MockLambda {
        counter: CallCounter,
    }
    impl LambdaDelivery for MockLambda {
        fn invoke_lambda(
            &self,
            _function_arn: &str,
            payload: &str,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send>>
        {
            self.counter.record();
            // Copy the payload: the returned future must not borrow from
            // the arguments.
            let echoed = payload.as_bytes().to_vec();
            Box::pin(std::future::ready(Ok::<Vec<u8>, String>(echoed)))
        }
    }

    #[derive(Default)]
    struct MockKinesis {
        counter: CallCounter,
    }
    impl KinesisDelivery for MockKinesis {
        fn put_record(&self, _stream_arn: &str, _data: &str, _partition_key: &str) {
            self.counter.record();
        }
    }

    #[derive(Default)]
    struct MockStepFunctions {
        counter: CallCounter,
    }
    impl StepFunctionsDelivery for MockStepFunctions {
        fn start_execution(&self, _state_machine_arn: &str, _input: &str) {
            self.counter.record();
        }
    }

    /// Sample typed attribute map used by the `_with_attrs` tests.
    fn sample_attrs() -> HashMap<String, SqsMessageAttribute> {
        let mut attrs = HashMap::new();
        attrs.insert(
            "key".to_string(),
            SqsMessageAttribute {
                data_type: "String".to_string(),
                string_value: Some("val".to_string()),
                binary_value: None,
            },
        );
        attrs
    }

    #[test]
    fn delivery_bus_new_has_no_senders() {
        let bus = DeliveryBus::new();
        // Calling methods without senders should be no-ops
        bus.send_to_sqs("arn:queue", "body", &HashMap::new());
        bus.send_to_sqs_with_attrs("arn:queue", "body", &HashMap::new(), None, None);
        bus.publish_to_sns("arn:topic", "msg", None);
        bus.put_event_to_eventbridge("src", "type", "{}", "default");
        bus.send_to_kinesis("arn:stream", "data", "pk");
        bus.start_stepfunctions_execution("arn:sfn", "{}");
        // No panics = success
    }

    #[test]
    fn delivery_bus_default_is_same_as_new() {
        let bus = DeliveryBus::default();
        bus.send_to_sqs("arn:q", "b", &HashMap::new());
        // An unwired bus is observable through the fallible path.
        assert!(bus
            .try_send_to_sqs_with_attrs("arn:q", "b", &HashMap::new(), None, None)
            .is_err());
    }

    #[test]
    fn send_to_sqs_calls_sender() {
        let mock = Arc::new(MockSqs::default());
        let bus = DeliveryBus::new().with_sqs(mock.clone());

        bus.send_to_sqs("arn:queue", "msg", &HashMap::new());
        assert_eq!(mock.counter.calls(), 1);

        bus.send_to_sqs("arn:queue2", "msg2", &HashMap::new());
        assert_eq!(mock.counter.calls(), 2);
    }

    #[test]
    fn send_to_sqs_with_attrs_calls_sender() {
        let mock = Arc::new(MockSqs::default());
        let bus = DeliveryBus::new().with_sqs(mock.clone());

        bus.send_to_sqs_with_attrs(
            "arn:q",
            "body",
            &sample_attrs(),
            Some("group"),
            Some("dedup"),
        );
        assert_eq!(mock.counter.calls(), 1);
    }

    #[test]
    fn try_send_without_sender_reports_queue_not_found() {
        let bus = DeliveryBus::new();

        let outcome = bus.try_send_to_sqs_with_attrs("arn:q", "body", &HashMap::new(), None, None);
        assert_eq!(
            outcome,
            Err(SqsDeliveryError::QueueNotFound("arn:q".to_string()))
        );
    }

    #[test]
    fn try_send_default_impl_delivers_and_succeeds() {
        let mock = Arc::new(MockSqs::default());
        let bus = DeliveryBus::new().with_sqs(mock.clone());

        let outcome = bus.try_send_to_sqs_with_attrs(
            "arn:q",
            "body",
            &sample_attrs(),
            Some("group"),
            Some("dedup"),
        );
        assert_eq!(outcome, Ok(()));
        // The default trait bodies funnel into `deliver_to_queue`.
        assert_eq!(mock.counter.calls(), 1);
    }

    #[test]
    fn try_send_propagates_sender_error() {
        let bus = DeliveryBus::new().with_sqs(Arc::new(RejectingSqs));

        let outcome =
            bus.try_send_to_sqs_with_attrs("not-an-arn", "body", &HashMap::new(), None, None);
        assert_eq!(
            outcome,
            Err(SqsDeliveryError::InvalidArn("not-an-arn".to_string()))
        );
    }

    #[test]
    fn sqs_delivery_error_display() {
        assert_eq!(
            SqsDeliveryError::QueueNotFound("arn:q".to_string()).to_string(),
            "queue not found: arn:q"
        );
        assert_eq!(
            SqsDeliveryError::InvalidArn("bogus".to_string()).to_string(),
            "invalid queue ARN: bogus"
        );
    }

    #[test]
    fn publish_to_sns_calls_sender() {
        let mock = Arc::new(MockSns::default());
        let bus = DeliveryBus::new().with_sns(mock.clone());

        bus.publish_to_sns("arn:topic", "message", Some("subject"));
        assert_eq!(mock.counter.calls(), 1);
    }

    #[test]
    fn put_event_to_eventbridge_calls_sender() {
        let mock = Arc::new(MockEventBridge::default());
        let bus = DeliveryBus::new().with_eventbridge(mock.clone());

        bus.put_event_to_eventbridge("aws.s3", "Object Created", "{}", "default");
        assert_eq!(mock.counter.calls(), 1);
    }

    #[test]
    fn send_to_kinesis_calls_sender() {
        let mock = Arc::new(MockKinesis::default());
        let bus = DeliveryBus::new().with_kinesis(mock.clone());

        bus.send_to_kinesis("arn:stream", "data", "partition-key");
        assert_eq!(mock.counter.calls(), 1);
    }

    #[test]
    fn start_stepfunctions_calls_sender() {
        let mock = Arc::new(MockStepFunctions::default());
        let bus = DeliveryBus::new().with_stepfunctions(mock.clone());

        bus.start_stepfunctions_execution("arn:sfn:machine", r#"{"key":"val"}"#);
        assert_eq!(mock.counter.calls(), 1);
    }

    #[test]
    fn builder_chaining_works() {
        let sqs = Arc::new(MockSqs::default());
        let sns = Arc::new(MockSns::default());
        let eb = Arc::new(MockEventBridge::default());
        let kin = Arc::new(MockKinesis::default());
        let sfn = Arc::new(MockStepFunctions::default());

        let bus = DeliveryBus::new()
            .with_sqs(sqs.clone())
            .with_sns(sns.clone())
            .with_eventbridge(eb.clone())
            .with_kinesis(kin.clone())
            .with_stepfunctions(sfn.clone());

        bus.send_to_sqs("q", "m", &HashMap::new());
        bus.publish_to_sns("t", "m", None);
        bus.put_event_to_eventbridge("s", "d", "{}", "b");
        bus.send_to_kinesis("s", "d", "k");
        bus.start_stepfunctions_execution("sm", "{}");

        assert_eq!(sqs.counter.calls(), 1);
        assert_eq!(sns.counter.calls(), 1);
        assert_eq!(eb.counter.calls(), 1);
        assert_eq!(kin.counter.calls(), 1);
        assert_eq!(sfn.counter.calls(), 1);
    }

    #[tokio::test]
    async fn invoke_lambda_returns_none_without_invoker() {
        let bus = DeliveryBus::new();
        let result = bus.invoke_lambda("arn:lambda", "{}").await;
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn invoke_lambda_forwards_to_invoker() {
        let mock = Arc::new(MockLambda::default());
        let bus = DeliveryBus::new().with_lambda(mock.clone());

        let result = bus.invoke_lambda("arn:lambda", r#"{"k":1}"#).await;
        assert_eq!(result, Some(Ok(br#"{"k":1}"#.to_vec())));
        assert_eq!(mock.counter.calls(), 1);
    }
}