// fakecloud_core/delivery.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4/// Cross-service message delivery.
5///
6/// Services use this to deliver messages to other services without
7/// direct dependencies between service crates. The server wires up
8/// the delivery functions at startup.
9pub struct DeliveryBus {
10    /// Deliver a message to an SQS queue by ARN.
11    sqs_sender: Option<Arc<dyn SqsDelivery>>,
12    /// Publish a message to an SNS topic by ARN.
13    sns_sender: Option<Arc<dyn SnsDelivery>>,
14    /// Put an event onto an EventBridge bus.
15    eventbridge_sender: Option<Arc<dyn EventBridgeDelivery>>,
16    /// Invoke a Lambda function by ARN.
17    lambda_invoker: Option<Arc<dyn LambdaDelivery>>,
18    /// Put records to a Kinesis Data Stream by ARN.
19    kinesis_sender: Option<Arc<dyn KinesisDelivery>>,
20    /// Start Step Functions executions.
21    stepfunctions_starter: Option<Arc<dyn StepFunctionsDelivery>>,
22    /// Write objects to S3 buckets.
23    s3_writer: Option<Arc<dyn S3Delivery>>,
24    /// Put records into Firehose delivery streams.
25    firehose_sender: Option<Arc<dyn FirehoseDelivery>>,
26    /// Send a high-level SES SendEmail call (cross-service universal target).
27    ses_dispatcher: Option<Arc<dyn SesSendEmailDispatcher>>,
28    /// Run an ECS task on a cluster (cross-service universal target).
29    ecs_task_runner: Option<Arc<dyn EcsTaskRunner>>,
30    /// Register/deregister ELBv2 targets from ECS runtime.
31    elbv2_target_registration: Option<Arc<dyn Elbv2TargetRegistration>>,
32    /// Publish CloudWatch metric data points (CloudWatch Logs metric
33    /// filters extract these on PutLogEvents).
34    cloudwatch_metrics: Option<Arc<dyn CloudwatchDelivery>>,
35    /// Put log events into CloudWatch Logs log groups. Used by Step
36    /// Functions Express execution logging and ECS awslogs driver.
37    cloudwatch_logs: Option<Arc<dyn CloudwatchLogsDelivery>>,
38    /// Verify a Cognito-issued JWT against the user pool that issued it.
39    /// Used by API Gateway v1's `COGNITO_USER_POOLS` authorizer to
40    /// validate signature/expiry/audience and extract claims.
41    cognito_jwt_verifier: Option<Arc<dyn CognitoJwtVerifier>>,
42    /// Encrypt/decrypt via KMS for cross-service SSE (S3 SSE-KMS, SES
43    /// S3Action KmsKeyArn, etc.).
44    kms_hook: Option<Arc<dyn KmsHook>>,
45}
46
/// A single message attribute carried along when SNS delivers into SQS.
///
/// Exactly which of the two value fields is populated for a given
/// `data_type` is decided by the producer; this type does not enforce it.
#[derive(Debug, Clone)]
pub struct SqsMessageAttribute {
    /// Declared data type of the attribute.
    pub data_type: String,
    /// Value when the attribute is carried as a string, if any.
    pub string_value: Option<String>,
    /// Value when the attribute is carried as binary, if any.
    pub binary_value: Option<String>,
}
54
/// Failure reported by the fallible SQS delivery path.
///
/// Scheduler's DLQ routing relies on this to tell "target queue
/// missing" apart from "delivered successfully" when deciding whether
/// to send to the `DeadLetterConfig.Arn`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SqsDeliveryError {
    /// No existing queue matches the target queue ARN.
    QueueNotFound(String),
    /// The ARN is not parseable as a valid SQS queue identifier.
    InvalidArn(String),
    /// The message broke a constraint imposed by the target queue
    /// (e.g. FIFO send missing MessageDeduplicationId without
    /// content-based dedup enabled). Reported as a non-retriable
    /// failure so the upstream service can route to its configured DLQ.
    InvalidParameter(String),
}

impl std::fmt::Display for SqsDeliveryError {
    /// Render as `<category>: <payload>`, where the payload is the
    /// string carried by the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (label, detail) = match self {
            Self::QueueNotFound(arn) => ("queue not found", arn),
            Self::InvalidArn(arn) => ("invalid queue ARN", arn),
            Self::InvalidParameter(msg) => ("invalid parameter", msg),
        };
        write!(f, "{}: {}", label, detail)
    }
}

impl std::error::Error for SqsDeliveryError {}
83
84/// Trait for delivering messages to SQS queues.
85pub trait SqsDelivery: Send + Sync {
86    fn deliver_to_queue(
87        &self,
88        queue_arn: &str,
89        message_body: &str,
90        attributes: &HashMap<String, String>,
91    );
92
93    /// Deliver with message attributes and FIFO fields
94    fn deliver_to_queue_with_attrs(
95        &self,
96        queue_arn: &str,
97        message_body: &str,
98        message_attributes: &HashMap<String, SqsMessageAttribute>,
99        message_group_id: Option<&str>,
100        message_dedup_id: Option<&str>,
101    ) {
102        // Default implementation: fall back to simple delivery
103        let _ = (message_attributes, message_group_id, message_dedup_id);
104        self.deliver_to_queue(queue_arn, message_body, &HashMap::new());
105    }
106
107    /// Fallible variant used by Scheduler's DLQ routing. Default
108    /// implementation assumes the queue exists (preserving the
109    /// fire-and-forget semantics of `deliver_to_queue`); the real SQS
110    /// impl overrides this to actually look up the queue and report
111    /// `QueueNotFound` so the caller can route to a DLQ.
112    fn try_deliver_to_queue_with_attrs(
113        &self,
114        queue_arn: &str,
115        message_body: &str,
116        message_attributes: &HashMap<String, SqsMessageAttribute>,
117        message_group_id: Option<&str>,
118        message_dedup_id: Option<&str>,
119    ) -> Result<(), SqsDeliveryError> {
120        self.deliver_to_queue_with_attrs(
121            queue_arn,
122            message_body,
123            message_attributes,
124            message_group_id,
125            message_dedup_id,
126        );
127        Ok(())
128    }
129}
130
/// Publishing of messages to SNS topics.
pub trait SnsDelivery: Send + Sync {
    /// Publish `message`, with an optional `subject`, to the topic
    /// named by `topic_arn`.
    fn publish_to_topic(&self, topic_arn: &str, message: &str, subject: Option<&str>);

    /// Publish to a FIFO SNS topic, carrying the message group / dedup
    /// IDs that downstream SQS subscribers need for ordering.
    ///
    /// The provided body ignores both IDs and forwards to
    /// `publish_to_topic`, so implementors without FIFO support need
    /// not override it.
    fn publish_to_topic_fifo(
        &self,
        topic_arn: &str,
        message: &str,
        subject: Option<&str>,
        _message_group_id: Option<&str>,
        _message_dedup_id: Option<&str>,
    ) {
        self.publish_to_topic(topic_arn, message, subject);
    }
}
149
/// Entry point for cross-service integrations that put events onto an
/// EventBridge bus.
pub trait EventBridgeDelivery: Send + Sync {
    /// Put an event onto `event_bus_name` in the default account.
    /// Rule matching and target delivery are the implementation's job.
    fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str);

    /// Put an event onto `event_bus_name` owned by `target_account_id`.
    ///
    /// Meant for cross-account delivery, where the source service
    /// (e.g. Scheduler) holds a target ARN naming the destination
    /// account. For backwards compatibility the provided body ignores
    /// the account and forwards to the default-account `put_event`;
    /// real implementations should override it and route to the target
    /// account's state.
    fn put_event_to_account(
        &self,
        source: &str,
        detail_type: &str,
        detail: &str,
        event_bus_name: &str,
        _target_account_id: &str,
    ) {
        self.put_event(source, detail_type, detail, event_bus_name);
    }
}
173
/// Entry point for cross-service integrations that invoke Lambda
/// functions.
pub trait LambdaDelivery: Send + Sync {
    /// Invoke the function named by `function_arn` with `payload`.
    ///
    /// Returns a boxed, `Send` future that resolves to the response
    /// bytes on success or to an error string on failure.
    fn invoke_lambda(
        &self,
        function_arn: &str,
        payload: &str,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send>>;
}
184
/// Delivery of records into Kinesis Data Streams.
pub trait KinesisDelivery: Send + Sync {
    /// Put one record to the stream named by `stream_arn`.
    ///
    /// `data` is expected to be base64-encoded; `partition_key` drives
    /// shard distribution.
    fn put_record(&self, stream_arn: &str, data: &str, partition_key: &str);
}
191
/// Entry point for cross-service integrations that start Step
/// Functions executions.
pub trait StepFunctionsDelivery: Send + Sync {
    /// Start an execution of the state machine named by
    /// `state_machine_arn`, passing `input` to it.
    fn start_execution(&self, state_machine_arn: &str, input: &str);
}
198
/// Cross-service dispatch into Kinesis Data Firehose.
///
/// Lets services such as CloudWatch Logs subscription filters and
/// EventBridge targets route records into a delivery stream without
/// depending on the firehose crate directly. Stream ARNs take the form
/// `arn:aws:firehose:<region>:<account>:deliverystream/<name>`.
pub trait FirehoseDelivery: Send + Sync {
    /// Put the raw bytes in `data` as one record into the delivery
    /// stream named by `delivery_stream_arn`.
    fn put_record(&self, delivery_stream_arn: &str, data: &[u8]);
}
207
/// Cross-service S3 access for services that deliver content to S3
/// buckets without a direct dependency on the S3 crate (CloudWatch
/// Logs export tasks, Kinesis Firehose, ELB access logs).
pub trait S3Delivery: Send + Sync {
    /// Put an object into `bucket` under `key` for `account_id`.
    ///
    /// # Errors
    ///
    /// Returns `Ok(())` on success. On failure returns an error string
    /// that the caller can surface in tests / logs.
    fn put_object(
        &self,
        account_id: &str,
        bucket: &str,
        key: &str,
        body: Vec<u8>,
        content_type: Option<&str>,
    ) -> Result<(), String>;

    /// Read an object's body.
    ///
    /// Used by RDS `RestoreDBInstanceFromS3` to ingest a backup blob
    /// without a direct dependency on the S3 crate.
    ///
    /// # Errors
    ///
    /// Returns `Err` when the bucket or key does not exist, or when
    /// the body cannot be read.
    fn get_object(&self, account_id: &str, bucket: &str, key: &str) -> Result<Vec<u8>, String>;
}
229
/// SES SendEmail dispatch for callers that already speak the AWS SES
/// SendEmail / SendEmailV2 shape: several to/cc/bcc recipients, an
/// optional subject, and text and/or HTML bodies.
///
/// Not to be confused with `EmailDispatcher`, the single-recipient
/// cross-service primitive used by Cognito.
pub trait SesSendEmailDispatcher: Send + Sync {
    /// Send one email from `from` to the given recipient lists on
    /// behalf of `account_id`. Failures are reported as an error
    /// string.
    #[allow(clippy::too_many_arguments)]
    fn send_email(
        &self,
        account_id: &str,
        from: &str,
        to: Vec<String>,
        cc: Vec<String>,
        bcc: Vec<String>,
        subject: Option<&str>,
        text_body: Option<&str>,
        html_body: Option<&str>,
    ) -> Result<(), String>;
}
248
/// Synthesizes an ECS RunTask call from outside the ECS crate.
///
/// EventBridge Scheduler and EventBridge Rules use this to start tasks
/// without depending directly on `fakecloud_ecs`.
pub trait EcsTaskRunner: Send + Sync {
    /// Run `task_definition` on `cluster` for `account_id`, with an
    /// optional `launch_type`. Failures are reported as an error
    /// string.
    ///
    /// NOTE(review): `count` is presumably the number of tasks to
    /// start; confirm against the ECS implementation.
    fn run_task(
        &self,
        account_id: &str,
        cluster: &str,
        task_definition: &str,
        launch_type: Option<&str>,
        count: usize,
    ) -> Result<(), String>;
}
262
/// Registers and deregisters targets with ELBv2 target groups from
/// outside the elbv2 crate.
///
/// The ECS runtime uses it when tasks with load balancers reach
/// RUNNING or STOPPED.
///
/// NOTE(review): each `targets` entry is presumably a target id paired
/// with an optional port; confirm against the elbv2 implementation.
pub trait Elbv2TargetRegistration: Send + Sync {
    /// Register `targets` with the target group named by
    /// `target_group_arn`.
    fn register_targets(
        &self,
        account_id: &str,
        target_group_arn: &str,
        targets: Vec<(String, Option<i64>)>,
    );
    /// Deregister `targets` from the target group named by
    /// `target_group_arn`.
    fn deregister_targets(
        &self,
        account_id: &str,
        target_group_arn: &str,
        targets: Vec<(String, Option<i64>)>,
    );
}
280
/// Publishes CloudWatch metric data points from outside the cloudwatch
/// crate.
///
/// CloudWatch Logs metric filters use it when an incoming log event
/// matches their pattern.
pub trait CloudwatchDelivery: Send + Sync {
    /// Record one data point `value` for `metric_name` in `namespace`,
    /// scoped to `account_id` / `region`, with an optional `unit`, a
    /// set of `dimensions`, and a timestamp in milliseconds.
    #[allow(clippy::too_many_arguments)]
    fn put_metric(
        &self,
        account_id: &str,
        region: &str,
        namespace: &str,
        metric_name: &str,
        value: f64,
        unit: Option<&str>,
        dimensions: std::collections::BTreeMap<String, String>,
        timestamp_ms: i64,
    );
}
298
/// Puts log events into CloudWatch Logs log groups from outside the
/// logs crate.
///
/// Step Functions Express execution logging and the ECS awslogs driver
/// deliver through it so they need not depend directly on
/// fakecloud_logs.
pub trait CloudwatchLogsDelivery: Send + Sync {
    /// Append `events` to `log_stream_name` within `log_group_name`
    /// for `account_id`.
    ///
    /// NOTE(review): each event is presumably a timestamp paired with
    /// its message; confirm against the logs implementation.
    fn put_log_events(
        &self,
        account_id: &str,
        log_group_name: &str,
        log_stream_name: &str,
        events: &[(i64, String)],
    );
}
312
/// Outbound email dispatch for services that emulate AWS flows routed
/// through SES (Cognito verification, etc.) without a direct
/// dependency on the SES crate.
pub trait EmailDispatcher: Send + Sync {
    /// Send a single-recipient email with a mandatory text body and an
    /// optional HTML body. Nothing is reported back to the caller.
    fn send_email(
        &self,
        account_id: &str,
        from: &str,
        to: &str,
        subject: &str,
        body_text: &str,
        body_html: Option<&str>,
    );
}
327
/// Outbound SMS dispatch for services that emulate AWS flows routed
/// through SNS phone-number publish (Cognito SMS MFA, etc.).
pub trait SmsDispatcher: Send + Sync {
    /// Send `message` to `phone_number` on behalf of `account_id`.
    fn send_sms(&self, account_id: &str, phone_number: &str, message: &str);
}
333
/// Cross-service KMS hook.
///
/// Services that accept a `KmsKeyId` (Secrets Manager, SSM
/// `SecureString`, S3 SSE-KMS, SQS / SNS / DynamoDB encrypted
/// resources) go through this hook so that real KMS calls happen, the
/// invocation is recorded for introspection, and the returned blob can
/// be decrypted through the public KMS API.
///
/// The encryption context is the AWS-defined per-service map (e.g.
/// `{aws:secretsmanager:secretArn: <arn>}` for Secrets Manager); it is
/// recorded with the call so test code can assert on it.
pub trait KmsHook: Send + Sync {
    /// Encrypt `plaintext` under `key_id` on behalf of
    /// `service_principal`. Returns the ciphertext as a string, or an
    /// error string on failure.
    fn encrypt(
        &self,
        account_id: &str,
        region: &str,
        key_id: &str,
        plaintext: &[u8],
        service_principal: &str,
        encryption_context: HashMap<String, String>,
    ) -> Result<String, String>;

    /// Decrypt `ciphertext_b64` on behalf of `service_principal`.
    /// Returns the plaintext bytes, or an error string on failure.
    fn decrypt(
        &self,
        account_id: &str,
        ciphertext_b64: &str,
        service_principal: &str,
        encryption_context: HashMap<String, String>,
    ) -> Result<Vec<u8>, String>;
}
362
363/// Cognito-issued JWT verification hook. Implementations are wired by
364/// fakecloud-server and back the `COGNITO_USER_POOLS` authorizer in
365/// API Gateway v1. The verifier validates RS256 signature, exp/nbf,
366/// `iss`, and `aud`/`client_id` against the user pool referenced by
367/// the authorizer's `providerArns`. On success returns the decoded
368/// claims as a JSON object; on failure returns an error string the
369/// caller surfaces as `401 Unauthorized`.
370pub trait CognitoJwtVerifier: Send + Sync {
371    fn verify_token(
372        &self,
373        account_id: &str,
374        user_pool_arn: &str,
375        token: &str,
376    ) -> Result<serde_json::Value, String>;
377}
378
379impl DeliveryBus {
380    pub fn new() -> Self {
381        Self {
382            sqs_sender: None,
383            sns_sender: None,
384            eventbridge_sender: None,
385            lambda_invoker: None,
386            kinesis_sender: None,
387            stepfunctions_starter: None,
388            s3_writer: None,
389            firehose_sender: None,
390            ses_dispatcher: None,
391            ecs_task_runner: None,
392            elbv2_target_registration: None,
393            cloudwatch_metrics: None,
394            cloudwatch_logs: None,
395            cognito_jwt_verifier: None,
396            kms_hook: None,
397        }
398    }
399
400    pub fn with_cognito_jwt_verifier(mut self, verifier: Arc<dyn CognitoJwtVerifier>) -> Self {
401        self.cognito_jwt_verifier = Some(verifier);
402        self
403    }
404
405    pub fn with_kms_hook(mut self, hook: Arc<dyn KmsHook>) -> Self {
406        self.kms_hook = Some(hook);
407        self
408    }
409
410    /// Encrypt plaintext with the supplied KMS key. Returns Err when no
411    /// KMS hook is wired or the encryption fails.
412    pub fn kms_encrypt(
413        &self,
414        account_id: &str,
415        region: &str,
416        key_id: &str,
417        plaintext: &[u8],
418        service_principal: &str,
419        encryption_context: std::collections::HashMap<String, String>,
420    ) -> Result<String, String> {
421        match self.kms_hook {
422            Some(ref h) => h.encrypt(
423                account_id,
424                region,
425                key_id,
426                plaintext,
427                service_principal,
428                encryption_context,
429            ),
430            None => Err("KMS hook not configured".to_string()),
431        }
432    }
433
434    /// Decrypt a KMS ciphertext blob. Returns Err when no KMS hook is
435    /// wired or the decryption fails.
436    pub fn kms_decrypt(
437        &self,
438        account_id: &str,
439        ciphertext_b64: &str,
440        service_principal: &str,
441        encryption_context: std::collections::HashMap<String, String>,
442    ) -> Result<Vec<u8>, String> {
443        match self.kms_hook {
444            Some(ref h) => h.decrypt(
445                account_id,
446                ciphertext_b64,
447                service_principal,
448                encryption_context,
449            ),
450            None => Err("KMS hook not configured".to_string()),
451        }
452    }
453
454    /// Verify a Cognito JWT against a user pool. Returns `Err` when no
455    /// verifier is wired or when the token fails validation.
456    pub fn verify_cognito_jwt(
457        &self,
458        account_id: &str,
459        user_pool_arn: &str,
460        token: &str,
461    ) -> Result<serde_json::Value, String> {
462        match self.cognito_jwt_verifier {
463            Some(ref v) => v.verify_token(account_id, user_pool_arn, token),
464            None => Err("Cognito JWT verifier not configured".to_string()),
465        }
466    }
467
468    pub fn with_cloudwatch_metrics(mut self, sender: Arc<dyn CloudwatchDelivery>) -> Self {
469        self.cloudwatch_metrics = Some(sender);
470        self
471    }
472
473    /// Publish a CloudWatch metric data point. Silently no-ops when no
474    /// CloudWatch sender is wired (in-process tests not exercising the
475    /// metrics path).
476    #[allow(clippy::too_many_arguments)]
477    pub fn put_cloudwatch_metric(
478        &self,
479        account_id: &str,
480        region: &str,
481        namespace: &str,
482        metric_name: &str,
483        value: f64,
484        unit: Option<&str>,
485        dimensions: std::collections::BTreeMap<String, String>,
486        timestamp_ms: i64,
487    ) {
488        if let Some(ref sender) = self.cloudwatch_metrics {
489            sender.put_metric(
490                account_id,
491                region,
492                namespace,
493                metric_name,
494                value,
495                unit,
496                dimensions,
497                timestamp_ms,
498            );
499        }
500    }
501
502    pub fn with_cloudwatch_logs(mut self, sender: Arc<dyn CloudwatchLogsDelivery>) -> Self {
503        self.cloudwatch_logs = Some(sender);
504        self
505    }
506
507    /// Put log events to a CloudWatch Logs log group / stream.
508    /// Silently no-ops when no CloudWatch Logs sender is wired.
509    pub fn put_log_events(
510        &self,
511        account_id: &str,
512        log_group_name: &str,
513        log_stream_name: &str,
514        events: &[(i64, String)],
515    ) {
516        if let Some(ref sender) = self.cloudwatch_logs {
517            sender.put_log_events(account_id, log_group_name, log_stream_name, events);
518        }
519    }
520
521    pub fn with_ses_dispatcher(mut self, dispatcher: Arc<dyn SesSendEmailDispatcher>) -> Self {
522        self.ses_dispatcher = Some(dispatcher);
523        self
524    }
525
526    pub fn with_ecs_task_runner(mut self, runner: Arc<dyn EcsTaskRunner>) -> Self {
527        self.ecs_task_runner = Some(runner);
528        self
529    }
530
531    pub fn with_elbv2_target_registration(mut self, reg: Arc<dyn Elbv2TargetRegistration>) -> Self {
532        self.elbv2_target_registration = Some(reg);
533        self
534    }
535
536    /// Register targets with an ELBv2 target group. Silently no-ops when
537    /// no ELBv2 target registration hook is wired.
538    pub fn register_elbv2_targets(
539        &self,
540        account_id: &str,
541        target_group_arn: &str,
542        targets: Vec<(String, Option<i64>)>,
543    ) {
544        if let Some(ref reg) = self.elbv2_target_registration {
545            reg.register_targets(account_id, target_group_arn, targets);
546        }
547    }
548
549    /// Deregister targets from an ELBv2 target group. Silently no-ops
550    /// when no ELBv2 target registration hook is wired.
551    pub fn deregister_elbv2_targets(
552        &self,
553        account_id: &str,
554        target_group_arn: &str,
555        targets: Vec<(String, Option<i64>)>,
556    ) {
557        if let Some(ref reg) = self.elbv2_target_registration {
558            reg.deregister_targets(account_id, target_group_arn, targets);
559        }
560    }
561
562    /// Send an email via SES. Returns `Err` when no SES dispatcher is
563    /// wired or the underlying impl rejects (bad source/dest).
564    #[allow(clippy::too_many_arguments)]
565    pub fn send_ses_email(
566        &self,
567        account_id: &str,
568        from: &str,
569        to: Vec<String>,
570        cc: Vec<String>,
571        bcc: Vec<String>,
572        subject: Option<&str>,
573        text_body: Option<&str>,
574        html_body: Option<&str>,
575    ) -> Result<(), String> {
576        match self.ses_dispatcher {
577            Some(ref d) => {
578                d.send_email(account_id, from, to, cc, bcc, subject, text_body, html_body)
579            }
580            None => Err("SES dispatcher not configured".to_string()),
581        }
582    }
583
584    /// Run an ECS task. Returns `Err` when no ECS runner is wired or
585    /// the impl rejects (unknown cluster / task definition).
586    pub fn run_ecs_task(
587        &self,
588        account_id: &str,
589        cluster: &str,
590        task_definition: &str,
591        launch_type: Option<&str>,
592        count: usize,
593    ) -> Result<(), String> {
594        match self.ecs_task_runner {
595            Some(ref r) => r.run_task(account_id, cluster, task_definition, launch_type, count),
596            None => Err("ECS task runner not configured".to_string()),
597        }
598    }
599
600    pub fn with_s3(mut self, sender: Arc<dyn S3Delivery>) -> Self {
601        self.s3_writer = Some(sender);
602        self
603    }
604
605    pub fn with_firehose(mut self, sender: Arc<dyn FirehoseDelivery>) -> Self {
606        self.firehose_sender = Some(sender);
607        self
608    }
609
610    /// Send a single record to a Firehose delivery stream by ARN.
611    /// Silently no-ops when no Firehose sender is wired (in-process
612    /// tests). Production wiring goes through fakecloud_firehose.
613    pub fn put_record_to_firehose(&self, delivery_stream_arn: &str, data: &[u8]) {
614        if let Some(ref sender) = self.firehose_sender {
615            sender.put_record(delivery_stream_arn, data);
616        }
617    }
618
619    /// Write content to S3. Returns Err when no S3 writer is wired or
620    /// when the underlying impl rejects the bucket / payload.
621    pub fn put_object_to_s3(
622        &self,
623        account_id: &str,
624        bucket: &str,
625        key: &str,
626        body: Vec<u8>,
627        content_type: Option<&str>,
628    ) -> Result<(), String> {
629        match self.s3_writer {
630            Some(ref sender) => sender.put_object(account_id, bucket, key, body, content_type),
631            None => Err("S3 writer not configured".to_string()),
632        }
633    }
634
635    /// Read content from S3. Returns Err when no S3 client is wired or
636    /// when the underlying impl cannot resolve the object.
637    pub fn get_object_from_s3(
638        &self,
639        account_id: &str,
640        bucket: &str,
641        key: &str,
642    ) -> Result<Vec<u8>, String> {
643        match self.s3_writer {
644            Some(ref sender) => sender.get_object(account_id, bucket, key),
645            None => Err("S3 client not configured".to_string()),
646        }
647    }
648
649    pub fn with_sqs(mut self, sender: Arc<dyn SqsDelivery>) -> Self {
650        self.sqs_sender = Some(sender);
651        self
652    }
653
654    pub fn with_sns(mut self, sender: Arc<dyn SnsDelivery>) -> Self {
655        self.sns_sender = Some(sender);
656        self
657    }
658
659    pub fn with_eventbridge(mut self, sender: Arc<dyn EventBridgeDelivery>) -> Self {
660        self.eventbridge_sender = Some(sender);
661        self
662    }
663
664    pub fn with_lambda(mut self, invoker: Arc<dyn LambdaDelivery>) -> Self {
665        self.lambda_invoker = Some(invoker);
666        self
667    }
668
669    pub fn with_kinesis(mut self, sender: Arc<dyn KinesisDelivery>) -> Self {
670        self.kinesis_sender = Some(sender);
671        self
672    }
673
674    /// Put a record to a Kinesis Data Stream identified by ARN.
675    /// Silently no-ops when no Kinesis sender is wired.
676    pub fn put_record_to_kinesis(&self, stream_arn: &str, data: &str, partition_key: &str) {
677        if let Some(ref sender) = self.kinesis_sender {
678            sender.put_record(stream_arn, data, partition_key);
679        }
680    }
681
682    pub fn with_stepfunctions(mut self, starter: Arc<dyn StepFunctionsDelivery>) -> Self {
683        self.stepfunctions_starter = Some(starter);
684        self
685    }
686
687    /// Send a message to an SQS queue identified by ARN.
688    pub fn send_to_sqs(
689        &self,
690        queue_arn: &str,
691        message_body: &str,
692        attributes: &HashMap<String, String>,
693    ) {
694        if let Some(ref sender) = self.sqs_sender {
695            sender.deliver_to_queue(queue_arn, message_body, attributes);
696        }
697    }
698
699    /// Send a message to an SQS queue with message attributes and FIFO fields.
700    pub fn send_to_sqs_with_attrs(
701        &self,
702        queue_arn: &str,
703        message_body: &str,
704        message_attributes: &HashMap<String, SqsMessageAttribute>,
705        message_group_id: Option<&str>,
706        message_dedup_id: Option<&str>,
707    ) {
708        if let Some(ref sender) = self.sqs_sender {
709            sender.deliver_to_queue_with_attrs(
710                queue_arn,
711                message_body,
712                message_attributes,
713                message_group_id,
714                message_dedup_id,
715            );
716        }
717    }
718
719    /// Fallible SQS send — returns `Err` when the target queue does not
720    /// exist, so callers (Scheduler) can route to a DLQ. Returns
721    /// `Err(QueueNotFound)` when no SQS sender is wired up at all,
722    /// matching the "target unreachable" semantics Scheduler relies on.
723    pub fn try_send_to_sqs_with_attrs(
724        &self,
725        queue_arn: &str,
726        message_body: &str,
727        message_attributes: &HashMap<String, SqsMessageAttribute>,
728        message_group_id: Option<&str>,
729        message_dedup_id: Option<&str>,
730    ) -> Result<(), SqsDeliveryError> {
731        match self.sqs_sender {
732            Some(ref sender) => sender.try_deliver_to_queue_with_attrs(
733                queue_arn,
734                message_body,
735                message_attributes,
736                message_group_id,
737                message_dedup_id,
738            ),
739            None => Err(SqsDeliveryError::QueueNotFound(queue_arn.to_string())),
740        }
741    }
742
743    /// Publish a message to an SNS topic identified by ARN.
744    pub fn publish_to_sns(&self, topic_arn: &str, message: &str, subject: Option<&str>) {
745        if let Some(ref sender) = self.sns_sender {
746            sender.publish_to_topic(topic_arn, message, subject);
747        }
748    }
749
750    /// Put an event onto an EventBridge bus in the default account.
751    pub fn put_event_to_eventbridge(
752        &self,
753        source: &str,
754        detail_type: &str,
755        detail: &str,
756        event_bus_name: &str,
757    ) {
758        if let Some(ref sender) = self.eventbridge_sender {
759            sender.put_event(source, detail_type, detail, event_bus_name);
760        }
761    }
762
763    /// Put an event onto an EventBridge bus in a specific account. Used by
764    /// Scheduler to deliver to cross-account event buses.
765    pub fn put_event_to_eventbridge_for_account(
766        &self,
767        source: &str,
768        detail_type: &str,
769        detail: &str,
770        event_bus_name: &str,
771        target_account_id: &str,
772    ) {
773        if let Some(ref sender) = self.eventbridge_sender {
774            sender.put_event_to_account(
775                source,
776                detail_type,
777                detail,
778                event_bus_name,
779                target_account_id,
780            );
781        }
782    }
783
784    /// Invoke a Lambda function identified by ARN.
785    pub async fn invoke_lambda(
786        &self,
787        function_arn: &str,
788        payload: &str,
789    ) -> Option<Result<Vec<u8>, String>> {
790        if let Some(ref invoker) = self.lambda_invoker {
791            Some(invoker.invoke_lambda(function_arn, payload).await)
792        } else {
793            None
794        }
795    }
796
797    /// Put a record to a Kinesis stream identified by ARN.
798    pub fn send_to_kinesis(&self, stream_arn: &str, data: &str, partition_key: &str) {
799        if let Some(ref sender) = self.kinesis_sender {
800            sender.put_record(stream_arn, data, partition_key);
801        }
802    }
803
804    /// Start a Step Functions execution identified by state machine ARN.
805    pub fn start_stepfunctions_execution(&self, state_machine_arn: &str, input: &str) {
806        if let Some(ref starter) = self.stepfunctions_starter {
807            starter.start_execution(state_machine_arn, input);
808        }
809    }
810}
811
812impl Default for DeliveryBus {
813    fn default() -> Self {
814        Self::new()
815    }
816}
817
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Record one more call on a mock's counter.
    fn record(counter: &AtomicUsize) {
        counter.fetch_add(1, Ordering::SeqCst);
    }

    /// Read how many calls a mock's counter has recorded so far.
    fn calls(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    // Call-counting mocks. Each one ignores its arguments and only bumps
    // `call_count`; `Default` starts the counter at zero.

    #[derive(Default)]
    struct MockSqs {
        call_count: AtomicUsize,
    }
    impl SqsDelivery for MockSqs {
        fn deliver_to_queue(
            &self,
            _queue_arn: &str,
            _message_body: &str,
            _attributes: &HashMap<String, String>,
        ) {
            record(&self.call_count);
        }
    }

    #[derive(Default)]
    struct MockSns {
        call_count: AtomicUsize,
    }
    impl SnsDelivery for MockSns {
        fn publish_to_topic(&self, _topic_arn: &str, _message: &str, _subject: Option<&str>) {
            record(&self.call_count);
        }
    }

    #[derive(Default)]
    struct MockEventBridge {
        call_count: AtomicUsize,
    }
    impl EventBridgeDelivery for MockEventBridge {
        fn put_event(
            &self,
            _source: &str,
            _detail_type: &str,
            _detail: &str,
            _event_bus_name: &str,
        ) {
            record(&self.call_count);
        }
    }

    #[derive(Default)]
    struct MockKinesis {
        call_count: AtomicUsize,
    }
    impl KinesisDelivery for MockKinesis {
        fn put_record(&self, _stream_arn: &str, _data: &str, _partition_key: &str) {
            record(&self.call_count);
        }
    }

    #[derive(Default)]
    struct MockStepFunctions {
        call_count: AtomicUsize,
    }
    impl StepFunctionsDelivery for MockStepFunctions {
        fn start_execution(&self, _state_machine_arn: &str, _input: &str) {
            record(&self.call_count);
        }
    }

    /// With nothing wired up, every fire-and-forget entry point must be a
    /// silent no-op; the test passes as long as none of them panics.
    #[test]
    fn delivery_bus_new_has_no_senders() {
        let bus = DeliveryBus::new();
        let no_attrs = HashMap::new();

        bus.send_to_sqs("arn:queue", "body", &no_attrs);
        bus.publish_to_sns("arn:topic", "msg", None);
        bus.put_event_to_eventbridge("src", "type", "{}", "default");
        bus.send_to_kinesis("arn:stream", "data", "pk");
        bus.start_stepfunctions_execution("arn:sfn", "{}");
    }

    /// A bus built through `Default` accepts calls without panicking,
    /// just like one built through `new`.
    #[test]
    fn delivery_bus_default_is_same_as_new() {
        let bus = DeliveryBus::default();
        let no_attrs = HashMap::new();
        bus.send_to_sqs("arn:q", "b", &no_attrs);
    }

    /// Each `send_to_sqs` call reaches the registered sender exactly once.
    #[test]
    fn send_to_sqs_calls_sender() {
        let mock = Arc::new(MockSqs::default());
        let bus = DeliveryBus::new().with_sqs(mock.clone());
        let no_attrs = HashMap::new();

        bus.send_to_sqs("arn:queue", "msg", &no_attrs);
        assert_eq!(calls(&mock.call_count), 1);

        bus.send_to_sqs("arn:queue2", "msg2", &no_attrs);
        assert_eq!(calls(&mock.call_count), 2);
    }

    /// The attribute-carrying variant also ends up at the SQS sender.
    #[test]
    fn send_to_sqs_with_attrs_calls_sender() {
        let mock = Arc::new(MockSqs::default());
        let bus = DeliveryBus::new().with_sqs(mock.clone());

        let attribute = SqsMessageAttribute {
            data_type: "String".to_string(),
            string_value: Some("val".to_string()),
            binary_value: None,
        };
        let mut attrs = HashMap::new();
        attrs.insert("key".to_string(), attribute);

        bus.send_to_sqs_with_attrs("arn:q", "body", &attrs, Some("group"), Some("dedup"));
        assert_eq!(calls(&mock.call_count), 1);
    }

    /// Publishing to SNS is forwarded to the registered sender.
    #[test]
    fn publish_to_sns_calls_sender() {
        let mock = Arc::new(MockSns::default());
        let bus = DeliveryBus::new().with_sns(mock.clone());

        bus.publish_to_sns("arn:topic", "message", Some("subject"));
        assert_eq!(calls(&mock.call_count), 1);
    }

    /// Putting an event is forwarded to the registered EventBridge sender.
    #[test]
    fn put_event_to_eventbridge_calls_sender() {
        let mock = Arc::new(MockEventBridge::default());
        let bus = DeliveryBus::new().with_eventbridge(mock.clone());

        bus.put_event_to_eventbridge("aws.s3", "Object Created", "{}", "default");
        assert_eq!(calls(&mock.call_count), 1);
    }

    /// Sending a record is forwarded to the registered Kinesis sender.
    #[test]
    fn send_to_kinesis_calls_sender() {
        let mock = Arc::new(MockKinesis::default());
        let bus = DeliveryBus::new().with_kinesis(mock.clone());

        bus.send_to_kinesis("arn:stream", "data", "partition-key");
        assert_eq!(calls(&mock.call_count), 1);
    }

    /// Starting an execution is forwarded to the registered starter.
    #[test]
    fn start_stepfunctions_calls_sender() {
        let mock = Arc::new(MockStepFunctions::default());
        let bus = DeliveryBus::new().with_stepfunctions(mock.clone());

        bus.start_stepfunctions_execution("arn:sfn:machine", r#"{"key":"val"}"#);
        assert_eq!(calls(&mock.call_count), 1);
    }

    /// Builder calls can be chained, and each sender only sees the traffic
    /// addressed to its own service.
    #[test]
    fn builder_chaining_works() {
        let sqs = Arc::new(MockSqs::default());
        let sns = Arc::new(MockSns::default());
        let eb = Arc::new(MockEventBridge::default());
        let kin = Arc::new(MockKinesis::default());
        let sfn = Arc::new(MockStepFunctions::default());

        let bus = DeliveryBus::new()
            .with_sqs(sqs.clone())
            .with_sns(sns.clone())
            .with_eventbridge(eb.clone())
            .with_kinesis(kin.clone())
            .with_stepfunctions(sfn.clone());

        let no_attrs = HashMap::new();
        bus.send_to_sqs("q", "m", &no_attrs);
        bus.publish_to_sns("t", "m", None);
        bus.put_event_to_eventbridge("s", "d", "{}", "b");
        bus.send_to_kinesis("s", "d", "k");
        bus.start_stepfunctions_execution("sm", "{}");

        assert_eq!(calls(&sqs.call_count), 1);
        assert_eq!(calls(&sns.call_count), 1);
        assert_eq!(calls(&eb.call_count), 1);
        assert_eq!(calls(&kin.call_count), 1);
        assert_eq!(calls(&sfn.call_count), 1);
    }

    /// Without a Lambda invoker the bus reports `None` rather than a result.
    #[tokio::test]
    async fn invoke_lambda_returns_none_without_invoker() {
        let bus = DeliveryBus::new();
        let outcome = bus.invoke_lambda("arn:lambda", "{}").await;
        assert!(outcome.is_none());
    }
}