Skip to main content

fakecloud_core/
delivery.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4/// Cross-service message delivery.
5///
6/// Services use this to deliver messages to other services without
7/// direct dependencies between service crates. The server wires up
8/// the delivery functions at startup.
9pub struct DeliveryBus {
10    /// Deliver a message to an SQS queue by ARN.
11    sqs_sender: Option<Arc<dyn SqsDelivery>>,
12    /// Publish a message to an SNS topic by ARN.
13    sns_sender: Option<Arc<dyn SnsDelivery>>,
14    /// Put an event onto an EventBridge bus.
15    eventbridge_sender: Option<Arc<dyn EventBridgeDelivery>>,
16    /// Invoke a Lambda function by ARN.
17    lambda_invoker: Option<Arc<dyn LambdaDelivery>>,
18    /// Put records to a Kinesis Data Stream by ARN.
19    kinesis_sender: Option<Arc<dyn KinesisDelivery>>,
20    /// Start Step Functions executions.
21    stepfunctions_starter: Option<Arc<dyn StepFunctionsDelivery>>,
22}
23
24/// Message attribute for SQS delivery from SNS.
25#[derive(Debug, Clone)]
26pub struct SqsMessageAttribute {
27    pub data_type: String,
28    pub string_value: Option<String>,
29    pub binary_value: Option<String>,
30}
31
32/// Error returned by fallible SQS delivery. Used by Scheduler's DLQ
33/// routing, which must distinguish "target queue missing" from
34/// "delivered successfully" to decide whether to send to the
35/// `DeadLetterConfig.Arn`.
36#[derive(Debug, Clone, PartialEq, Eq)]
37pub enum SqsDeliveryError {
38    /// The target queue ARN did not resolve to any existing queue.
39    QueueNotFound(String),
40    /// The ARN could not be parsed into a valid SQS queue identifier.
41    InvalidArn(String),
42}
43
44impl std::fmt::Display for SqsDeliveryError {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        match self {
47            Self::QueueNotFound(arn) => write!(f, "queue not found: {arn}"),
48            Self::InvalidArn(arn) => write!(f, "invalid queue ARN: {arn}"),
49        }
50    }
51}
52
53impl std::error::Error for SqsDeliveryError {}
54
55/// Trait for delivering messages to SQS queues.
56pub trait SqsDelivery: Send + Sync {
57    fn deliver_to_queue(
58        &self,
59        queue_arn: &str,
60        message_body: &str,
61        attributes: &HashMap<String, String>,
62    );
63
64    /// Deliver with message attributes and FIFO fields
65    fn deliver_to_queue_with_attrs(
66        &self,
67        queue_arn: &str,
68        message_body: &str,
69        message_attributes: &HashMap<String, SqsMessageAttribute>,
70        message_group_id: Option<&str>,
71        message_dedup_id: Option<&str>,
72    ) {
73        // Default implementation: fall back to simple delivery
74        let _ = (message_attributes, message_group_id, message_dedup_id);
75        self.deliver_to_queue(queue_arn, message_body, &HashMap::new());
76    }
77
78    /// Fallible variant used by Scheduler's DLQ routing. Default
79    /// implementation assumes the queue exists (preserving the
80    /// fire-and-forget semantics of `deliver_to_queue`); the real SQS
81    /// impl overrides this to actually look up the queue and report
82    /// `QueueNotFound` so the caller can route to a DLQ.
83    fn try_deliver_to_queue_with_attrs(
84        &self,
85        queue_arn: &str,
86        message_body: &str,
87        message_attributes: &HashMap<String, SqsMessageAttribute>,
88        message_group_id: Option<&str>,
89        message_dedup_id: Option<&str>,
90    ) -> Result<(), SqsDeliveryError> {
91        self.deliver_to_queue_with_attrs(
92            queue_arn,
93            message_body,
94            message_attributes,
95            message_group_id,
96            message_dedup_id,
97        );
98        Ok(())
99    }
100}
101
102/// Trait for publishing messages to SNS topics.
103pub trait SnsDelivery: Send + Sync {
104    fn publish_to_topic(&self, topic_arn: &str, message: &str, subject: Option<&str>);
105}
106
107/// Trait for putting events onto an EventBridge bus from cross-service integrations.
108pub trait EventBridgeDelivery: Send + Sync {
109    /// Put an event onto the specified event bus in the default account.
110    /// The implementation should handle rule matching and target delivery.
111    fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str);
112
113    /// Put an event onto the specified event bus owned by `target_account_id`.
114    /// Used for cross-account delivery where the source service (e.g. Scheduler)
115    /// has a target ARN containing the destination account. The default impl
116    /// falls back to the default-account `put_event` for backwards compat —
117    /// real implementations should override and route to the target account's
118    /// state.
119    fn put_event_to_account(
120        &self,
121        source: &str,
122        detail_type: &str,
123        detail: &str,
124        event_bus_name: &str,
125        _target_account_id: &str,
126    ) {
127        self.put_event(source, detail_type, detail, event_bus_name);
128    }
129}
130
131/// Trait for invoking Lambda functions from cross-service integrations.
132pub trait LambdaDelivery: Send + Sync {
133    /// Invoke a Lambda function with the given payload.
134    /// The function is identified by ARN. Returns the response bytes on success.
135    fn invoke_lambda(
136        &self,
137        function_arn: &str,
138        payload: &str,
139    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send>>;
140}
141
142/// Trait for putting records to Kinesis Data Streams.
143pub trait KinesisDelivery: Send + Sync {
144    /// Put a record to a Kinesis stream identified by ARN.
145    /// The data should be base64-encoded. partition_key is used for shard distribution.
146    fn put_record(&self, stream_arn: &str, data: &str, partition_key: &str);
147}
148
149/// Trait for starting Step Functions executions from cross-service integrations.
150pub trait StepFunctionsDelivery: Send + Sync {
151    /// Start a state machine execution with the given input.
152    /// The state machine is identified by ARN.
153    fn start_execution(&self, state_machine_arn: &str, input: &str);
154}
155
156/// Outbound email dispatch used by services that emulate AWS flows that
157/// route through SES (Cognito verification, etc.) without taking a direct
158/// dependency on the SES crate.
159pub trait EmailDispatcher: Send + Sync {
160    fn send_email(
161        &self,
162        account_id: &str,
163        from: &str,
164        to: &str,
165        subject: &str,
166        body_text: &str,
167        body_html: Option<&str>,
168    );
169}
170
171/// Outbound SMS dispatch used by services that emulate AWS flows that route
172/// through SNS phone-number publish (Cognito SMS MFA, etc.).
173pub trait SmsDispatcher: Send + Sync {
174    fn send_sms(&self, account_id: &str, phone_number: &str, message: &str);
175}
176
177/// Cross-service KMS hook: services that accept a `KmsKeyId` (Secrets
178/// Manager, SSM `SecureString`, S3 SSE-KMS, SQS / SNS / DynamoDB
179/// encrypted resources) call this so that real KMS calls happen, the
180/// invocation is recorded for introspection, and the returned blob is
181/// decryptable by the public KMS API.
182///
183/// Encryption context is the AWS-defined per-service map (e.g.
184/// `{aws:secretsmanager:secretArn: <arn>}` for Secrets Manager) and is
185/// recorded with the call so test code can assert it.
186pub trait KmsHook: Send + Sync {
187    fn encrypt(
188        &self,
189        account_id: &str,
190        region: &str,
191        key_id: &str,
192        plaintext: &[u8],
193        service_principal: &str,
194        encryption_context: std::collections::HashMap<String, String>,
195    ) -> Result<String, String>;
196
197    fn decrypt(
198        &self,
199        account_id: &str,
200        ciphertext_b64: &str,
201        service_principal: &str,
202        encryption_context: std::collections::HashMap<String, String>,
203    ) -> Result<Vec<u8>, String>;
204}
205
206impl DeliveryBus {
207    pub fn new() -> Self {
208        Self {
209            sqs_sender: None,
210            sns_sender: None,
211            eventbridge_sender: None,
212            lambda_invoker: None,
213            kinesis_sender: None,
214            stepfunctions_starter: None,
215        }
216    }
217
218    pub fn with_sqs(mut self, sender: Arc<dyn SqsDelivery>) -> Self {
219        self.sqs_sender = Some(sender);
220        self
221    }
222
223    pub fn with_sns(mut self, sender: Arc<dyn SnsDelivery>) -> Self {
224        self.sns_sender = Some(sender);
225        self
226    }
227
228    pub fn with_eventbridge(mut self, sender: Arc<dyn EventBridgeDelivery>) -> Self {
229        self.eventbridge_sender = Some(sender);
230        self
231    }
232
233    pub fn with_lambda(mut self, invoker: Arc<dyn LambdaDelivery>) -> Self {
234        self.lambda_invoker = Some(invoker);
235        self
236    }
237
238    pub fn with_kinesis(mut self, sender: Arc<dyn KinesisDelivery>) -> Self {
239        self.kinesis_sender = Some(sender);
240        self
241    }
242
243    pub fn with_stepfunctions(mut self, starter: Arc<dyn StepFunctionsDelivery>) -> Self {
244        self.stepfunctions_starter = Some(starter);
245        self
246    }
247
248    /// Send a message to an SQS queue identified by ARN.
249    pub fn send_to_sqs(
250        &self,
251        queue_arn: &str,
252        message_body: &str,
253        attributes: &HashMap<String, String>,
254    ) {
255        if let Some(ref sender) = self.sqs_sender {
256            sender.deliver_to_queue(queue_arn, message_body, attributes);
257        }
258    }
259
260    /// Send a message to an SQS queue with message attributes and FIFO fields.
261    pub fn send_to_sqs_with_attrs(
262        &self,
263        queue_arn: &str,
264        message_body: &str,
265        message_attributes: &HashMap<String, SqsMessageAttribute>,
266        message_group_id: Option<&str>,
267        message_dedup_id: Option<&str>,
268    ) {
269        if let Some(ref sender) = self.sqs_sender {
270            sender.deliver_to_queue_with_attrs(
271                queue_arn,
272                message_body,
273                message_attributes,
274                message_group_id,
275                message_dedup_id,
276            );
277        }
278    }
279
280    /// Fallible SQS send — returns `Err` when the target queue does not
281    /// exist, so callers (Scheduler) can route to a DLQ. Returns
282    /// `Err(QueueNotFound)` when no SQS sender is wired up at all,
283    /// matching the "target unreachable" semantics Scheduler relies on.
284    pub fn try_send_to_sqs_with_attrs(
285        &self,
286        queue_arn: &str,
287        message_body: &str,
288        message_attributes: &HashMap<String, SqsMessageAttribute>,
289        message_group_id: Option<&str>,
290        message_dedup_id: Option<&str>,
291    ) -> Result<(), SqsDeliveryError> {
292        match self.sqs_sender {
293            Some(ref sender) => sender.try_deliver_to_queue_with_attrs(
294                queue_arn,
295                message_body,
296                message_attributes,
297                message_group_id,
298                message_dedup_id,
299            ),
300            None => Err(SqsDeliveryError::QueueNotFound(queue_arn.to_string())),
301        }
302    }
303
304    /// Publish a message to an SNS topic identified by ARN.
305    pub fn publish_to_sns(&self, topic_arn: &str, message: &str, subject: Option<&str>) {
306        if let Some(ref sender) = self.sns_sender {
307            sender.publish_to_topic(topic_arn, message, subject);
308        }
309    }
310
311    /// Put an event onto an EventBridge bus in the default account.
312    pub fn put_event_to_eventbridge(
313        &self,
314        source: &str,
315        detail_type: &str,
316        detail: &str,
317        event_bus_name: &str,
318    ) {
319        if let Some(ref sender) = self.eventbridge_sender {
320            sender.put_event(source, detail_type, detail, event_bus_name);
321        }
322    }
323
324    /// Put an event onto an EventBridge bus in a specific account. Used by
325    /// Scheduler to deliver to cross-account event buses.
326    pub fn put_event_to_eventbridge_for_account(
327        &self,
328        source: &str,
329        detail_type: &str,
330        detail: &str,
331        event_bus_name: &str,
332        target_account_id: &str,
333    ) {
334        if let Some(ref sender) = self.eventbridge_sender {
335            sender.put_event_to_account(
336                source,
337                detail_type,
338                detail,
339                event_bus_name,
340                target_account_id,
341            );
342        }
343    }
344
345    /// Invoke a Lambda function identified by ARN.
346    pub async fn invoke_lambda(
347        &self,
348        function_arn: &str,
349        payload: &str,
350    ) -> Option<Result<Vec<u8>, String>> {
351        if let Some(ref invoker) = self.lambda_invoker {
352            Some(invoker.invoke_lambda(function_arn, payload).await)
353        } else {
354            None
355        }
356    }
357
358    /// Put a record to a Kinesis stream identified by ARN.
359    pub fn send_to_kinesis(&self, stream_arn: &str, data: &str, partition_key: &str) {
360        if let Some(ref sender) = self.kinesis_sender {
361            sender.put_record(stream_arn, data, partition_key);
362        }
363    }
364
365    /// Start a Step Functions execution identified by state machine ARN.
366    pub fn start_stepfunctions_execution(&self, state_machine_arn: &str, input: &str) {
367        if let Some(ref starter) = self.stepfunctions_starter {
368            starter.start_execution(state_machine_arn, input);
369        }
370    }
371}
372
373impl Default for DeliveryBus {
374    fn default() -> Self {
375        Self::new()
376    }
377}
378
379#[cfg(test)]
380mod tests {
381    use super::*;
382    use std::sync::atomic::{AtomicUsize, Ordering};
383    use std::sync::Arc;
384
385    // Mock implementations
386    struct MockSqs {
387        call_count: AtomicUsize,
388    }
389    impl SqsDelivery for MockSqs {
390        fn deliver_to_queue(
391            &self,
392            _queue_arn: &str,
393            _message_body: &str,
394            _attributes: &HashMap<String, String>,
395        ) {
396            self.call_count.fetch_add(1, Ordering::SeqCst);
397        }
398    }
399
400    struct MockSns {
401        call_count: AtomicUsize,
402    }
403    impl SnsDelivery for MockSns {
404        fn publish_to_topic(&self, _topic_arn: &str, _message: &str, _subject: Option<&str>) {
405            self.call_count.fetch_add(1, Ordering::SeqCst);
406        }
407    }
408
409    struct MockEventBridge {
410        call_count: AtomicUsize,
411    }
412    impl EventBridgeDelivery for MockEventBridge {
413        fn put_event(
414            &self,
415            _source: &str,
416            _detail_type: &str,
417            _detail: &str,
418            _event_bus_name: &str,
419        ) {
420            self.call_count.fetch_add(1, Ordering::SeqCst);
421        }
422    }
423
424    struct MockKinesis {
425        call_count: AtomicUsize,
426    }
427    impl KinesisDelivery for MockKinesis {
428        fn put_record(&self, _stream_arn: &str, _data: &str, _partition_key: &str) {
429            self.call_count.fetch_add(1, Ordering::SeqCst);
430        }
431    }
432
433    struct MockStepFunctions {
434        call_count: AtomicUsize,
435    }
436    impl StepFunctionsDelivery for MockStepFunctions {
437        fn start_execution(&self, _state_machine_arn: &str, _input: &str) {
438            self.call_count.fetch_add(1, Ordering::SeqCst);
439        }
440    }
441
442    #[test]
443    fn delivery_bus_new_has_no_senders() {
444        let bus = DeliveryBus::new();
445        // Calling methods without senders should be no-ops
446        bus.send_to_sqs("arn:queue", "body", &HashMap::new());
447        bus.publish_to_sns("arn:topic", "msg", None);
448        bus.put_event_to_eventbridge("src", "type", "{}", "default");
449        bus.send_to_kinesis("arn:stream", "data", "pk");
450        bus.start_stepfunctions_execution("arn:sfn", "{}");
451        // No panics = success
452    }
453
454    #[test]
455    fn delivery_bus_default_is_same_as_new() {
456        let bus = DeliveryBus::default();
457        bus.send_to_sqs("arn:q", "b", &HashMap::new());
458    }
459
460    #[test]
461    fn send_to_sqs_calls_sender() {
462        let mock = Arc::new(MockSqs {
463            call_count: AtomicUsize::new(0),
464        });
465        let bus = DeliveryBus::new().with_sqs(mock.clone());
466
467        bus.send_to_sqs("arn:queue", "msg", &HashMap::new());
468        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
469
470        bus.send_to_sqs("arn:queue2", "msg2", &HashMap::new());
471        assert_eq!(mock.call_count.load(Ordering::SeqCst), 2);
472    }
473
474    #[test]
475    fn send_to_sqs_with_attrs_calls_sender() {
476        let mock = Arc::new(MockSqs {
477            call_count: AtomicUsize::new(0),
478        });
479        let bus = DeliveryBus::new().with_sqs(mock.clone());
480
481        let mut attrs = HashMap::new();
482        attrs.insert(
483            "key".to_string(),
484            SqsMessageAttribute {
485                data_type: "String".to_string(),
486                string_value: Some("val".to_string()),
487                binary_value: None,
488            },
489        );
490        bus.send_to_sqs_with_attrs("arn:q", "body", &attrs, Some("group"), Some("dedup"));
491        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
492    }
493
494    #[test]
495    fn publish_to_sns_calls_sender() {
496        let mock = Arc::new(MockSns {
497            call_count: AtomicUsize::new(0),
498        });
499        let bus = DeliveryBus::new().with_sns(mock.clone());
500
501        bus.publish_to_sns("arn:topic", "message", Some("subject"));
502        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
503    }
504
505    #[test]
506    fn put_event_to_eventbridge_calls_sender() {
507        let mock = Arc::new(MockEventBridge {
508            call_count: AtomicUsize::new(0),
509        });
510        let bus = DeliveryBus::new().with_eventbridge(mock.clone());
511
512        bus.put_event_to_eventbridge("aws.s3", "Object Created", "{}", "default");
513        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
514    }
515
516    #[test]
517    fn send_to_kinesis_calls_sender() {
518        let mock = Arc::new(MockKinesis {
519            call_count: AtomicUsize::new(0),
520        });
521        let bus = DeliveryBus::new().with_kinesis(mock.clone());
522
523        bus.send_to_kinesis("arn:stream", "data", "partition-key");
524        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
525    }
526
527    #[test]
528    fn start_stepfunctions_calls_sender() {
529        let mock = Arc::new(MockStepFunctions {
530            call_count: AtomicUsize::new(0),
531        });
532        let bus = DeliveryBus::new().with_stepfunctions(mock.clone());
533
534        bus.start_stepfunctions_execution("arn:sfn:machine", r#"{"key":"val"}"#);
535        assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
536    }
537
538    #[test]
539    fn builder_chaining_works() {
540        let sqs = Arc::new(MockSqs {
541            call_count: AtomicUsize::new(0),
542        });
543        let sns = Arc::new(MockSns {
544            call_count: AtomicUsize::new(0),
545        });
546        let eb = Arc::new(MockEventBridge {
547            call_count: AtomicUsize::new(0),
548        });
549        let kin = Arc::new(MockKinesis {
550            call_count: AtomicUsize::new(0),
551        });
552        let sfn = Arc::new(MockStepFunctions {
553            call_count: AtomicUsize::new(0),
554        });
555
556        let bus = DeliveryBus::new()
557            .with_sqs(sqs.clone())
558            .with_sns(sns.clone())
559            .with_eventbridge(eb.clone())
560            .with_kinesis(kin.clone())
561            .with_stepfunctions(sfn.clone());
562
563        bus.send_to_sqs("q", "m", &HashMap::new());
564        bus.publish_to_sns("t", "m", None);
565        bus.put_event_to_eventbridge("s", "d", "{}", "b");
566        bus.send_to_kinesis("s", "d", "k");
567        bus.start_stepfunctions_execution("sm", "{}");
568
569        assert_eq!(sqs.call_count.load(Ordering::SeqCst), 1);
570        assert_eq!(sns.call_count.load(Ordering::SeqCst), 1);
571        assert_eq!(eb.call_count.load(Ordering::SeqCst), 1);
572        assert_eq!(kin.call_count.load(Ordering::SeqCst), 1);
573        assert_eq!(sfn.call_count.load(Ordering::SeqCst), 1);
574    }
575
576    #[tokio::test]
577    async fn invoke_lambda_returns_none_without_invoker() {
578        let bus = DeliveryBus::new();
579        let result = bus.invoke_lambda("arn:lambda", "{}").await;
580        assert!(result.is_none());
581    }
582}