Skip to main content

fakecloud_core/
delivery.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4/// Cross-service message delivery.
5///
6/// Services use this to deliver messages to other services without
7/// direct dependencies between service crates. The server wires up
8/// the delivery functions at startup.
9pub struct DeliveryBus {
10    /// Deliver a message to an SQS queue by ARN.
11    sqs_sender: Option<Arc<dyn SqsDelivery>>,
12    /// Publish a message to an SNS topic by ARN.
13    sns_sender: Option<Arc<dyn SnsDelivery>>,
14    /// Put an event onto an EventBridge bus.
15    eventbridge_sender: Option<Arc<dyn EventBridgeDelivery>>,
16    /// Invoke a Lambda function by ARN.
17    lambda_invoker: Option<Arc<dyn LambdaDelivery>>,
18    /// Put records to a Kinesis Data Stream by ARN.
19    kinesis_sender: Option<Arc<dyn KinesisDelivery>>,
20    /// Start Step Functions executions.
21    stepfunctions_starter: Option<Arc<dyn StepFunctionsDelivery>>,
22}
23
/// Message attribute for SQS delivery from SNS.
///
/// Derives `PartialEq`/`Eq` (like [`SqsDeliveryError`]) so that callers
/// and tests can compare delivered attributes by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SqsMessageAttribute {
    /// Attribute type tag, e.g. `"String"`.
    pub data_type: String,
    /// Value for string-typed attributes, if any.
    pub string_value: Option<String>,
    /// Value for binary-typed attributes, if any.
    // NOTE(review): carried as a `String`; presumably base64 text — confirm
    // against the SQS implementation.
    pub binary_value: Option<String>,
}
31
/// Failure reported by the fallible SQS delivery path.
///
/// Scheduler's DLQ routing relies on this to tell "target queue
/// missing" apart from "delivered successfully" when deciding whether
/// the message should go to the `DeadLetterConfig.Arn` instead.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SqsDeliveryError {
    /// No existing queue matches the target ARN (the ARN is carried
    /// in the payload).
    QueueNotFound(String),
    /// The ARN could not be parsed into a valid SQS queue identifier
    /// (the offending ARN is carried in the payload).
    InvalidArn(String),
}
43
44impl std::fmt::Display for SqsDeliveryError {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        match self {
47            Self::QueueNotFound(arn) => write!(f, "queue not found: {arn}"),
48            Self::InvalidArn(arn) => write!(f, "invalid queue ARN: {arn}"),
49        }
50    }
51}
52
53impl std::error::Error for SqsDeliveryError {}
54
55/// Trait for delivering messages to SQS queues.
56pub trait SqsDelivery: Send + Sync {
57    fn deliver_to_queue(
58        &self,
59        queue_arn: &str,
60        message_body: &str,
61        attributes: &HashMap<String, String>,
62    );
63
64    /// Deliver with message attributes and FIFO fields
65    fn deliver_to_queue_with_attrs(
66        &self,
67        queue_arn: &str,
68        message_body: &str,
69        message_attributes: &HashMap<String, SqsMessageAttribute>,
70        message_group_id: Option<&str>,
71        message_dedup_id: Option<&str>,
72    ) {
73        // Default implementation: fall back to simple delivery
74        let _ = (message_attributes, message_group_id, message_dedup_id);
75        self.deliver_to_queue(queue_arn, message_body, &HashMap::new());
76    }
77
78    /// Fallible variant used by Scheduler's DLQ routing. Default
79    /// implementation assumes the queue exists (preserving the
80    /// fire-and-forget semantics of `deliver_to_queue`); the real SQS
81    /// impl overrides this to actually look up the queue and report
82    /// `QueueNotFound` so the caller can route to a DLQ.
83    fn try_deliver_to_queue_with_attrs(
84        &self,
85        queue_arn: &str,
86        message_body: &str,
87        message_attributes: &HashMap<String, SqsMessageAttribute>,
88        message_group_id: Option<&str>,
89        message_dedup_id: Option<&str>,
90    ) -> Result<(), SqsDeliveryError> {
91        self.deliver_to_queue_with_attrs(
92            queue_arn,
93            message_body,
94            message_attributes,
95            message_group_id,
96            message_dedup_id,
97        );
98        Ok(())
99    }
100}
101
/// Publishing of messages to SNS topics on behalf of other services.
pub trait SnsDelivery: Send + Sync {
    /// Publish `message` to the topic identified by `topic_arn`, with
    /// an optional `subject`. Nothing is returned to the caller.
    fn publish_to_topic(&self, topic_arn: &str, message: &str, subject: Option<&str>);
}
106
/// Entry point for cross-service integrations that emit EventBridge events.
pub trait EventBridgeDelivery: Send + Sync {
    /// Put an event, described by `source`, `detail_type` and `detail`,
    /// onto the bus named `event_bus_name`.
    ///
    /// Implementations are expected to take care of rule matching and
    /// delivery to targets.
    fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str);
}
113
/// Entry point for cross-service integrations that invoke Lambda functions.
pub trait LambdaDelivery: Send + Sync {
    /// Invoke the function identified by `function_arn` with `payload`.
    ///
    /// The call hands back a boxed, `Send` future. It resolves to the
    /// response bytes on success, or to a `String` describing the
    /// failure.
    fn invoke_lambda(
        &self,
        function_arn: &str,
        payload: &str,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send>>;
}
124
/// Delivery of records into Kinesis Data Streams on behalf of other services.
pub trait KinesisDelivery: Send + Sync {
    /// Put one record onto the stream identified by `stream_arn`.
    ///
    /// `data` is expected to be base64-encoded, and `partition_key` is
    /// what shard distribution is based on.
    fn put_record(&self, stream_arn: &str, data: &str, partition_key: &str);
}
131
/// Entry point for cross-service integrations that start Step Functions
/// executions.
pub trait StepFunctionsDelivery: Send + Sync {
    /// Start an execution of the state machine identified by
    /// `state_machine_arn`, passing `input` to it. Nothing is returned
    /// to the caller.
    fn start_execution(&self, state_machine_arn: &str, input: &str);
}
138
/// Outbound email hook.
///
/// Lets services that emulate AWS flows routed through SES (Cognito
/// verification, etc.) send mail without depending on the SES crate
/// directly.
pub trait EmailDispatcher: Send + Sync {
    /// Send one email from `from` to `to` on behalf of `account_id`.
    ///
    /// `body_text` is always supplied; `body_html` is optional. Nothing
    /// is returned to the caller.
    fn send_email(
        &self,
        account_id: &str,
        from: &str,
        to: &str,
        subject: &str,
        body_text: &str,
        body_html: Option<&str>,
    );
}
153
/// Outbound SMS hook.
///
/// Used by services that emulate AWS flows routed through SNS
/// phone-number publish (Cognito SMS MFA, etc.).
pub trait SmsDispatcher: Send + Sync {
    /// Send `message` to `phone_number` on behalf of `account_id`.
    /// Nothing is returned to the caller.
    fn send_sms(&self, account_id: &str, phone_number: &str, message: &str);
}
159
/// Cross-service KMS hook.
///
/// Services that accept a `KmsKeyId` (Secrets Manager, SSM
/// `SecureString`, S3 SSE-KMS, SQS / SNS / DynamoDB encrypted
/// resources) go through this hook so that real KMS calls happen, each
/// invocation is recorded for introspection, and the blob handed back
/// can be decrypted through the public KMS API.
///
/// The encryption context is the AWS-defined per-service map (for
/// Secrets Manager, e.g. `{aws:secretsmanager:secretArn: <arn>}`). It
/// is recorded together with the call so test code can assert on it.
pub trait KmsHook: Send + Sync {
    /// Encrypt `plaintext` under `key_id` for the given account and
    /// region, on behalf of `service_principal`.
    ///
    /// Returns the ciphertext as a `String` on success, or a `String`
    /// describing the failure.
    // NOTE(review): the ciphertext is presumably base64 text, given the
    // `ciphertext_b64` parameter of `decrypt` — confirm against the KMS impl.
    fn encrypt(
        &self,
        account_id: &str,
        region: &str,
        key_id: &str,
        plaintext: &[u8],
        service_principal: &str,
        encryption_context: HashMap<String, String>,
    ) -> Result<String, String>;

    /// Decrypt `ciphertext_b64` for the given account, on behalf of
    /// `service_principal`.
    ///
    /// Returns the plaintext bytes on success, or a `String`
    /// describing the failure.
    fn decrypt(
        &self,
        account_id: &str,
        ciphertext_b64: &str,
        service_principal: &str,
        encryption_context: HashMap<String, String>,
    ) -> Result<Vec<u8>, String>;
}
188
189impl DeliveryBus {
190    pub fn new() -> Self {
191        Self {
192            sqs_sender: None,
193            sns_sender: None,
194            eventbridge_sender: None,
195            lambda_invoker: None,
196            kinesis_sender: None,
197            stepfunctions_starter: None,
198        }
199    }
200
201    pub fn with_sqs(mut self, sender: Arc<dyn SqsDelivery>) -> Self {
202        self.sqs_sender = Some(sender);
203        self
204    }
205
206    pub fn with_sns(mut self, sender: Arc<dyn SnsDelivery>) -> Self {
207        self.sns_sender = Some(sender);
208        self
209    }
210
211    pub fn with_eventbridge(mut self, sender: Arc<dyn EventBridgeDelivery>) -> Self {
212        self.eventbridge_sender = Some(sender);
213        self
214    }
215
216    pub fn with_lambda(mut self, invoker: Arc<dyn LambdaDelivery>) -> Self {
217        self.lambda_invoker = Some(invoker);
218        self
219    }
220
221    pub fn with_kinesis(mut self, sender: Arc<dyn KinesisDelivery>) -> Self {
222        self.kinesis_sender = Some(sender);
223        self
224    }
225
226    pub fn with_stepfunctions(mut self, starter: Arc<dyn StepFunctionsDelivery>) -> Self {
227        self.stepfunctions_starter = Some(starter);
228        self
229    }
230
231    /// Send a message to an SQS queue identified by ARN.
232    pub fn send_to_sqs(
233        &self,
234        queue_arn: &str,
235        message_body: &str,
236        attributes: &HashMap<String, String>,
237    ) {
238        if let Some(ref sender) = self.sqs_sender {
239            sender.deliver_to_queue(queue_arn, message_body, attributes);
240        }
241    }
242
243    /// Send a message to an SQS queue with message attributes and FIFO fields.
244    pub fn send_to_sqs_with_attrs(
245        &self,
246        queue_arn: &str,
247        message_body: &str,
248        message_attributes: &HashMap<String, SqsMessageAttribute>,
249        message_group_id: Option<&str>,
250        message_dedup_id: Option<&str>,
251    ) {
252        if let Some(ref sender) = self.sqs_sender {
253            sender.deliver_to_queue_with_attrs(
254                queue_arn,
255                message_body,
256                message_attributes,
257                message_group_id,
258                message_dedup_id,
259            );
260        }
261    }
262
263    /// Fallible SQS send — returns `Err` when the target queue does not
264    /// exist, so callers (Scheduler) can route to a DLQ. Returns
265    /// `Err(QueueNotFound)` when no SQS sender is wired up at all,
266    /// matching the "target unreachable" semantics Scheduler relies on.
267    pub fn try_send_to_sqs_with_attrs(
268        &self,
269        queue_arn: &str,
270        message_body: &str,
271        message_attributes: &HashMap<String, SqsMessageAttribute>,
272        message_group_id: Option<&str>,
273        message_dedup_id: Option<&str>,
274    ) -> Result<(), SqsDeliveryError> {
275        match self.sqs_sender {
276            Some(ref sender) => sender.try_deliver_to_queue_with_attrs(
277                queue_arn,
278                message_body,
279                message_attributes,
280                message_group_id,
281                message_dedup_id,
282            ),
283            None => Err(SqsDeliveryError::QueueNotFound(queue_arn.to_string())),
284        }
285    }
286
287    /// Publish a message to an SNS topic identified by ARN.
288    pub fn publish_to_sns(&self, topic_arn: &str, message: &str, subject: Option<&str>) {
289        if let Some(ref sender) = self.sns_sender {
290            sender.publish_to_topic(topic_arn, message, subject);
291        }
292    }
293
294    /// Put an event onto an EventBridge bus.
295    pub fn put_event_to_eventbridge(
296        &self,
297        source: &str,
298        detail_type: &str,
299        detail: &str,
300        event_bus_name: &str,
301    ) {
302        if let Some(ref sender) = self.eventbridge_sender {
303            sender.put_event(source, detail_type, detail, event_bus_name);
304        }
305    }
306
307    /// Invoke a Lambda function identified by ARN.
308    pub async fn invoke_lambda(
309        &self,
310        function_arn: &str,
311        payload: &str,
312    ) -> Option<Result<Vec<u8>, String>> {
313        if let Some(ref invoker) = self.lambda_invoker {
314            Some(invoker.invoke_lambda(function_arn, payload).await)
315        } else {
316            None
317        }
318    }
319
320    /// Put a record to a Kinesis stream identified by ARN.
321    pub fn send_to_kinesis(&self, stream_arn: &str, data: &str, partition_key: &str) {
322        if let Some(ref sender) = self.kinesis_sender {
323            sender.put_record(stream_arn, data, partition_key);
324        }
325    }
326
327    /// Start a Step Functions execution identified by state machine ARN.
328    pub fn start_stepfunctions_execution(&self, state_machine_arn: &str, input: &str) {
329        if let Some(ref starter) = self.stepfunctions_starter {
330            starter.start_execution(state_machine_arn, input);
331        }
332    }
333}
334
335impl Default for DeliveryBus {
336    fn default() -> Self {
337        Self::new()
338    }
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // Mock implementations: each one only counts how often it was called.
    #[derive(Default)]
    struct MockSqs {
        call_count: AtomicUsize,
    }
    impl SqsDelivery for MockSqs {
        fn deliver_to_queue(
            &self,
            _queue_arn: &str,
            _message_body: &str,
            _attributes: &HashMap<String, String>,
        ) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[derive(Default)]
    struct MockSns {
        call_count: AtomicUsize,
    }
    impl SnsDelivery for MockSns {
        fn publish_to_topic(&self, _topic_arn: &str, _message: &str, _subject: Option<&str>) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[derive(Default)]
    struct MockEventBridge {
        call_count: AtomicUsize,
    }
    impl EventBridgeDelivery for MockEventBridge {
        fn put_event(
            &self,
            _source: &str,
            _detail_type: &str,
            _detail: &str,
            _event_bus_name: &str,
        ) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[derive(Default)]
    struct MockKinesis {
        call_count: AtomicUsize,
    }
    impl KinesisDelivery for MockKinesis {
        fn put_record(&self, _stream_arn: &str, _data: &str, _partition_key: &str) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[derive(Default)]
    struct MockStepFunctions {
        call_count: AtomicUsize,
    }
    impl StepFunctionsDelivery for MockStepFunctions {
        fn start_execution(&self, _state_machine_arn: &str, _input: &str) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Counts invocations and echoes the payload back as the response bytes.
    #[derive(Default)]
    struct MockLambda {
        call_count: AtomicUsize,
    }
    impl LambdaDelivery for MockLambda {
        fn invoke_lambda(
            &self,
            _function_arn: &str,
            payload: &str,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send>>
        {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            let reply: Result<Vec<u8>, String> = Ok(payload.as_bytes().to_vec());
            Box::pin(std::future::ready(reply))
        }
    }

    #[test]
    fn delivery_bus_new_has_no_senders() {
        let bus = DeliveryBus::new();
        // Calling methods without senders should be no-ops
        bus.send_to_sqs("arn:queue", "body", &HashMap::new());
        bus.publish_to_sns("arn:topic", "msg", None);
        bus.put_event_to_eventbridge("src", "type", "{}", "default");
        bus.send_to_kinesis("arn:stream", "data", "pk");
        bus.start_stepfunctions_execution("arn:sfn", "{}");
        // No panics = success
    }

    #[test]
    fn delivery_bus_default_is_same_as_new() {
        let bus = DeliveryBus::default();
        bus.send_to_sqs("arn:q", "b", &HashMap::new());
    }

    #[test]
    fn send_to_sqs_calls_sender() {
        let mock = Arc::new(MockSqs::default());
        let bus = DeliveryBus::new().with_sqs(mock.clone());

        bus.send_to_sqs("arn:queue", "msg", &HashMap::new());
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);

        bus.send_to_sqs("arn:queue2", "msg2", &HashMap::new());
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn send_to_sqs_with_attrs_calls_sender() {
        let mock = Arc::new(MockSqs::default());
        let bus = DeliveryBus::new().with_sqs(mock.clone());

        let mut attrs = HashMap::new();
        attrs.insert(
            "key".to_string(),
            SqsMessageAttribute {
                data_type: "String".to_string(),
                string_value: Some("val".to_string()),
                binary_value: None,
            },
        );
        bus.send_to_sqs_with_attrs("arn:q", "body", &attrs, Some("group"), Some("dedup"));
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn try_send_to_sqs_without_sender_reports_queue_not_found() {
        let bus = DeliveryBus::new();

        let result = bus.try_send_to_sqs_with_attrs("arn:q", "body", &HashMap::new(), None, None);
        assert_eq!(
            result,
            Err(SqsDeliveryError::QueueNotFound("arn:q".to_string()))
        );
    }

    #[test]
    fn try_send_to_sqs_default_impl_delivers_and_succeeds() {
        let mock = Arc::new(MockSqs::default());
        let bus = DeliveryBus::new().with_sqs(mock.clone());

        // MockSqs does not override the fallible method, so this exercises
        // the trait's default fallback chain down to `deliver_to_queue`.
        let result =
            bus.try_send_to_sqs_with_attrs("arn:q", "body", &HashMap::new(), Some("group"), None);
        assert_eq!(result, Ok(()));
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn sqs_delivery_error_display() {
        assert_eq!(
            SqsDeliveryError::QueueNotFound("arn:q".to_string()).to_string(),
            "queue not found: arn:q"
        );
        assert_eq!(
            SqsDeliveryError::InvalidArn("bogus".to_string()).to_string(),
            "invalid queue ARN: bogus"
        );
    }

    #[test]
    fn publish_to_sns_calls_sender() {
        let mock = Arc::new(MockSns::default());
        let bus = DeliveryBus::new().with_sns(mock.clone());

        bus.publish_to_sns("arn:topic", "message", Some("subject"));
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn put_event_to_eventbridge_calls_sender() {
        let mock = Arc::new(MockEventBridge::default());
        let bus = DeliveryBus::new().with_eventbridge(mock.clone());

        bus.put_event_to_eventbridge("aws.s3", "Object Created", "{}", "default");
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn send_to_kinesis_calls_sender() {
        let mock = Arc::new(MockKinesis::default());
        let bus = DeliveryBus::new().with_kinesis(mock.clone());

        bus.send_to_kinesis("arn:stream", "data", "partition-key");
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn start_stepfunctions_calls_sender() {
        let mock = Arc::new(MockStepFunctions::default());
        let bus = DeliveryBus::new().with_stepfunctions(mock.clone());

        bus.start_stepfunctions_execution("arn:sfn:machine", r#"{"key":"val"}"#);
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn builder_chaining_works() {
        let sqs = Arc::new(MockSqs::default());
        let sns = Arc::new(MockSns::default());
        let eb = Arc::new(MockEventBridge::default());
        let kin = Arc::new(MockKinesis::default());
        let sfn = Arc::new(MockStepFunctions::default());

        let bus = DeliveryBus::new()
            .with_sqs(sqs.clone())
            .with_sns(sns.clone())
            .with_eventbridge(eb.clone())
            .with_kinesis(kin.clone())
            .with_stepfunctions(sfn.clone());

        bus.send_to_sqs("q", "m", &HashMap::new());
        bus.publish_to_sns("t", "m", None);
        bus.put_event_to_eventbridge("s", "d", "{}", "b");
        bus.send_to_kinesis("s", "d", "k");
        bus.start_stepfunctions_execution("sm", "{}");

        assert_eq!(sqs.call_count.load(Ordering::SeqCst), 1);
        assert_eq!(sns.call_count.load(Ordering::SeqCst), 1);
        assert_eq!(eb.call_count.load(Ordering::SeqCst), 1);
        assert_eq!(kin.call_count.load(Ordering::SeqCst), 1);
        assert_eq!(sfn.call_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn invoke_lambda_returns_none_without_invoker() {
        let bus = DeliveryBus::new();
        let result = bus.invoke_lambda("arn:lambda", "{}").await;
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn invoke_lambda_calls_invoker() {
        let mock = Arc::new(MockLambda::default());
        let bus = DeliveryBus::new().with_lambda(mock.clone());

        let result = bus.invoke_lambda("arn:lambda", "{}").await;
        assert_eq!(result, Some(Ok(b"{}".to_vec())));
        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
    }
}
544}