use std::collections::HashMap;
use std::sync::Arc;
/// Registry of optional cross-service delivery hooks.
///
/// Every field is `None` after [`DeliveryBus::new`] and is filled in by the matching
/// `with_*` builder method. When a hook is absent, the bus methods that return `()`
/// do nothing, the methods that return `Result` return an `Err`, and `invoke_lambda`
/// returns `None`.
pub struct DeliveryBus {
/// SQS hook; set by `with_sqs`, used by the `send_to_sqs*` methods.
sqs_sender: Option<Arc<dyn SqsDelivery>>,
/// SNS hook; set by `with_sns`, used by `publish_to_sns`.
sns_sender: Option<Arc<dyn SnsDelivery>>,
/// EventBridge hook; set by `with_eventbridge`, used by the `put_event_to_eventbridge*` methods.
eventbridge_sender: Option<Arc<dyn EventBridgeDelivery>>,
/// Lambda hook; set by `with_lambda`, used by `invoke_lambda`.
lambda_invoker: Option<Arc<dyn LambdaDelivery>>,
/// Kinesis hook; set by `with_kinesis`, used by `send_to_kinesis` and `put_record_to_kinesis`.
kinesis_sender: Option<Arc<dyn KinesisDelivery>>,
/// Step Functions hook; set by `with_stepfunctions`, used by `start_stepfunctions_execution`.
stepfunctions_starter: Option<Arc<dyn StepFunctionsDelivery>>,
/// S3 hook; set by `with_s3`. Despite the name it serves both `put_object_to_s3`
/// and `get_object_from_s3`.
s3_writer: Option<Arc<dyn S3Delivery>>,
/// Firehose hook; set by `with_firehose`, used by `put_record_to_firehose`.
firehose_sender: Option<Arc<dyn FirehoseDelivery>>,
/// SES hook; set by `with_ses_dispatcher`, used by `send_ses_email`.
ses_dispatcher: Option<Arc<dyn SesSendEmailDispatcher>>,
/// ECS hook; set by `with_ecs_task_runner`, used by `run_ecs_task`.
ecs_task_runner: Option<Arc<dyn EcsTaskRunner>>,
/// ELBv2 hook; set by `with_elbv2_target_registration`, used by
/// `register_elbv2_targets` and `deregister_elbv2_targets`.
elbv2_target_registration: Option<Arc<dyn Elbv2TargetRegistration>>,
/// CloudWatch metrics hook; set by `with_cloudwatch_metrics`, used by `put_cloudwatch_metric`.
cloudwatch_metrics: Option<Arc<dyn CloudwatchDelivery>>,
/// CloudWatch Logs hook; set by `with_cloudwatch_logs`, used by `put_log_events`.
cloudwatch_logs: Option<Arc<dyn CloudwatchLogsDelivery>>,
/// Cognito JWT hook; set by `with_cognito_jwt_verifier`, used by `verify_cognito_jwt`.
cognito_jwt_verifier: Option<Arc<dyn CognitoJwtVerifier>>,
/// KMS hook; set by `with_kms_hook`, used by `kms_encrypt` and `kms_decrypt`.
kms_hook: Option<Arc<dyn KmsHook>>,
}
/// A typed message attribute passed to [`SqsDelivery::deliver_to_queue_with_attrs`].
///
/// Derives `PartialEq`/`Eq` (matching `SqsDeliveryError`) so attribute values can be
/// compared directly, e.g. in assertions against what a mock sender received.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SqsMessageAttribute {
    /// Data type label of the attribute; the tests in this file use `"String"`.
    pub data_type: String,
    /// Value for string-typed attributes, if any.
    pub string_value: Option<String>,
    /// Value for binary attributes, carried as a `String`.
    /// NOTE(review): presumably base64-encoded — confirm against the SQS sender.
    pub binary_value: Option<String>,
}
/// Failure reported by the fallible SQS delivery path
/// (`SqsDelivery::try_deliver_to_queue_with_attrs` and
/// `DeliveryBus::try_send_to_sqs_with_attrs`).
///
/// Each variant carries the string that is interpolated into its `Display` message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SqsDeliveryError {
/// Displayed as `queue not found: <value>`. `DeliveryBus::try_send_to_sqs_with_attrs`
/// also returns this variant, carrying the requested ARN, when no SQS sender is configured.
QueueNotFound(String),
/// Displayed as `invalid queue ARN: <value>`.
InvalidArn(String),
/// Displayed as `invalid parameter: <value>`.
InvalidParameter(String),
}
impl std::fmt::Display for SqsDeliveryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::QueueNotFound(arn) => write!(f, "queue not found: {arn}"),
Self::InvalidArn(arn) => write!(f, "invalid queue ARN: {arn}"),
Self::InvalidParameter(msg) => write!(f, "invalid parameter: {msg}"),
}
}
}
// Empty impl: the `Debug` derive and the `Display` impl satisfy the trait's requirements,
// and no method is overridden, so the trait's default methods are used as-is.
impl std::error::Error for SqsDeliveryError {}
/// Hook for delivering a message to an SQS queue identified by its ARN.
///
/// Only [`deliver_to_queue`](SqsDelivery::deliver_to_queue) is required. The two richer
/// entry points have default bodies that fall back to it, so an implementor that needs
/// message attributes, FIFO identifiers or error reporting must override them.
pub trait SqsDelivery: Send + Sync {
    /// Delivers `message_body` to the queue at `queue_arn` with plain string attributes.
    fn deliver_to_queue(
        &self,
        queue_arn: &str,
        message_body: &str,
        attributes: &HashMap<String, String>,
    );

    /// Delivers a message together with typed attributes and optional group / dedup ids.
    ///
    /// The default body ignores `message_attributes`, `message_group_id` and
    /// `message_dedup_id`, and calls `deliver_to_queue` with an empty attribute map.
    fn deliver_to_queue_with_attrs(
        &self,
        queue_arn: &str,
        message_body: &str,
        message_attributes: &HashMap<String, SqsMessageAttribute>,
        message_group_id: Option<&str>,
        message_dedup_id: Option<&str>,
    ) {
        // The richer inputs are intentionally unused by this fallback.
        let _ = message_attributes;
        let _ = message_group_id;
        let _ = message_dedup_id;

        let no_attributes: HashMap<String, String> = HashMap::new();
        self.deliver_to_queue(queue_arn, message_body, &no_attributes)
    }

    /// Fallible variant of `deliver_to_queue_with_attrs`.
    ///
    /// The default body forwards to `deliver_to_queue_with_attrs` and always returns
    /// `Ok(())`, because that method has no way to report a failure.
    fn try_deliver_to_queue_with_attrs(
        &self,
        queue_arn: &str,
        message_body: &str,
        message_attributes: &HashMap<String, SqsMessageAttribute>,
        message_group_id: Option<&str>,
        message_dedup_id: Option<&str>,
    ) -> Result<(), SqsDeliveryError> {
        self.deliver_to_queue_with_attrs(
            queue_arn,
            message_body,
            message_attributes,
            message_group_id,
            message_dedup_id,
        );
        Ok(())
    }
}
/// Hook for publishing a message to an SNS topic identified by its ARN.
pub trait SnsDelivery: Send + Sync {
    /// Publishes `message`, with an optional `subject`, to the topic at `topic_arn`.
    fn publish_to_topic(&self, topic_arn: &str, message: &str, subject: Option<&str>);

    /// Publish variant that also receives a message group id and a deduplication id.
    ///
    /// The default body ignores both identifiers and forwards the remaining arguments
    /// to `publish_to_topic`; override it to make use of them.
    fn publish_to_topic_fifo(
        &self,
        topic_arn: &str,
        message: &str,
        subject: Option<&str>,
        _message_group_id: Option<&str>,
        _message_dedup_id: Option<&str>,
    ) {
        Self::publish_to_topic(self, topic_arn, message, subject)
    }
}
/// Hook for putting an event onto an EventBridge event bus.
pub trait EventBridgeDelivery: Send + Sync {
    /// Puts one event, described by `source`, `detail_type` and `detail`, onto the bus
    /// named `event_bus_name`.
    fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str);

    /// Variant of `put_event` that also receives a target account id.
    ///
    /// The default body ignores the account id and forwards the other arguments to
    /// `put_event`; override it to route by account.
    fn put_event_to_account(
        &self,
        source: &str,
        detail_type: &str,
        detail: &str,
        event_bus_name: &str,
        _target_account_id: &str,
    ) {
        Self::put_event(self, source, detail_type, detail, event_bus_name)
    }
}
/// Hook for invoking a Lambda function asynchronously; `DeliveryBus::invoke_lambda`
/// awaits the returned future.
pub trait LambdaDelivery: Send + Sync {
/// Starts an invocation of the function at `function_arn` with `payload`.
///
/// Returns a boxed, `Send` future resolving to `Ok(bytes)` or `Err(message)`.
/// The boxed future type carries no lifetime, so it cannot borrow `function_arn`
/// or `payload`; implementations must copy whatever they need into the future.
/// NOTE(review): the `Ok` bytes are presumably the function's response payload — confirm.
fn invoke_lambda(
&self,
function_arn: &str,
payload: &str,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send>>;
}
/// Hook for putting a record onto a Kinesis stream; used by
/// `DeliveryBus::send_to_kinesis` and `DeliveryBus::put_record_to_kinesis`.
pub trait KinesisDelivery: Send + Sync {
/// Puts `data` onto the stream at `stream_arn` under `partition_key`.
/// No result is returned, so the caller cannot observe a failure.
fn put_record(&self, stream_arn: &str, data: &str, partition_key: &str);
}
/// Hook for starting a Step Functions execution; used by
/// `DeliveryBus::start_stepfunctions_execution`.
pub trait StepFunctionsDelivery: Send + Sync {
/// Starts an execution of the state machine at `state_machine_arn` with `input`.
/// No result is returned, so the caller cannot observe a failure.
/// NOTE(review): `input` is presumably a JSON document (the tests pass `"{}"`) — confirm.
fn start_execution(&self, state_machine_arn: &str, input: &str);
}
/// Hook for putting a record onto a Firehose delivery stream; used by
/// `DeliveryBus::put_record_to_firehose`.
pub trait FirehoseDelivery: Send + Sync {
/// Puts the raw bytes `data` onto the delivery stream at `delivery_stream_arn`.
/// No result is returned, so the caller cannot observe a failure.
fn put_record(&self, delivery_stream_arn: &str, data: &[u8]);
}
/// Hook for writing and reading S3 objects; used by `DeliveryBus::put_object_to_s3`
/// and `DeliveryBus::get_object_from_s3`.
pub trait S3Delivery: Send + Sync {
/// Stores `body` under `key` in `bucket` for `account_id`, with an optional content type.
///
/// Returns `Err` with a message string on failure.
fn put_object(
&self,
account_id: &str,
bucket: &str,
key: &str,
body: Vec<u8>,
content_type: Option<&str>,
) -> Result<(), String>;
/// Fetches the bytes stored under `key` in `bucket` for `account_id`.
///
/// Returns `Err` with a message string on failure.
fn get_object(&self, account_id: &str, bucket: &str, key: &str) -> Result<Vec<u8>, String>;
}
/// Hook for sending an email through SES; used by `DeliveryBus::send_ses_email`.
pub trait SesSendEmailDispatcher: Send + Sync {
/// Sends one email from `from` to the `to` / `cc` / `bcc` recipient lists.
///
/// `subject`, `text_body` and `html_body` are each optional.
/// Returns `Err` with a message string on failure.
// The argument list mirrors the fields of an email, hence the lint allowance.
#[allow(clippy::too_many_arguments)]
fn send_email(
&self,
account_id: &str,
from: &str,
to: Vec<String>,
cc: Vec<String>,
bcc: Vec<String>,
subject: Option<&str>,
text_body: Option<&str>,
html_body: Option<&str>,
) -> Result<(), String>;
}
/// Hook for running ECS tasks; used by `DeliveryBus::run_ecs_task`.
pub trait EcsTaskRunner: Send + Sync {
/// Runs `task_definition` on `cluster` for `account_id`, with an optional launch type.
///
/// NOTE(review): `count` is presumably the number of task instances to start — confirm.
/// Returns `Err` with a message string on failure.
fn run_task(
&self,
account_id: &str,
cluster: &str,
task_definition: &str,
launch_type: Option<&str>,
count: usize,
) -> Result<(), String>;
}
/// Hook for registering and deregistering ELBv2 target-group targets; used by
/// `DeliveryBus::register_elbv2_targets` and `DeliveryBus::deregister_elbv2_targets`.
///
/// NOTE(review): each `targets` entry is presumably `(target id, optional port)` — confirm.
pub trait Elbv2TargetRegistration: Send + Sync {
/// Registers `targets` with the target group at `target_group_arn`.
/// No result is returned, so the caller cannot observe a failure.
fn register_targets(
&self,
account_id: &str,
target_group_arn: &str,
targets: Vec<(String, Option<i64>)>,
);
/// Deregisters `targets` from the target group at `target_group_arn`.
/// No result is returned, so the caller cannot observe a failure.
fn deregister_targets(
&self,
account_id: &str,
target_group_arn: &str,
targets: Vec<(String, Option<i64>)>,
);
}
/// Hook for recording a CloudWatch metric data point; used by
/// `DeliveryBus::put_cloudwatch_metric`.
pub trait CloudwatchDelivery: Send + Sync {
/// Records `value` for `metric_name` in `namespace`, scoped to `account_id` and `region`.
///
/// `unit` is optional. `dimensions` is an owned `BTreeMap`, so its entries are
/// ordered by key.
/// NOTE(review): `timestamp_ms` is presumably Unix epoch milliseconds — confirm.
/// No result is returned, so the caller cannot observe a failure.
#[allow(clippy::too_many_arguments)]
fn put_metric(
&self,
account_id: &str,
region: &str,
namespace: &str,
metric_name: &str,
value: f64,
unit: Option<&str>,
dimensions: std::collections::BTreeMap<String, String>,
timestamp_ms: i64,
);
}
/// Hook for writing events to a CloudWatch Logs stream; used by
/// `DeliveryBus::put_log_events`.
pub trait CloudwatchLogsDelivery: Send + Sync {
/// Writes `events` to `log_stream_name` within `log_group_name` for `account_id`.
///
/// NOTE(review): each event is presumably `(timestamp, message)`; the timestamp
/// unit is not visible here — confirm.
/// No result is returned, so the caller cannot observe a failure.
fn put_log_events(
&self,
account_id: &str,
log_group_name: &str,
log_stream_name: &str,
events: &[(i64, String)],
);
}
/// Hook for sending a single-recipient email.
///
/// NOTE(review): `DeliveryBus` holds no field of this type in the visible code;
/// presumably it is consumed elsewhere — confirm.
pub trait EmailDispatcher: Send + Sync {
/// Sends an email from `from` to the single address `to`.
///
/// `subject` and `body_text` are required; `body_html` is optional.
/// No result is returned, so the caller cannot observe a failure.
fn send_email(
&self,
account_id: &str,
from: &str,
to: &str,
subject: &str,
body_text: &str,
body_html: Option<&str>,
);
}
/// Hook for sending an SMS message.
///
/// NOTE(review): `DeliveryBus` holds no field of this type in the visible code;
/// presumably it is consumed elsewhere — confirm.
pub trait SmsDispatcher: Send + Sync {
/// Sends `message` to `phone_number` on behalf of `account_id`.
/// No result is returned, so the caller cannot observe a failure.
fn send_sms(&self, account_id: &str, phone_number: &str, message: &str);
}
/// Hook for KMS encryption and decryption; used by `DeliveryBus::kms_encrypt`
/// and `DeliveryBus::kms_decrypt`.
pub trait KmsHook: Send + Sync {
/// Encrypts `plaintext` with the key `key_id` in `account_id` / `region`.
///
/// `service_principal` and `encryption_context` are passed to the implementation as-is.
/// Returns the ciphertext as a `String`, or `Err` with a message string.
/// NOTE(review): the returned string is presumably base64, matching the
/// `ciphertext_b64` parameter of `decrypt` — confirm.
fn encrypt(
&self,
account_id: &str,
region: &str,
key_id: &str,
plaintext: &[u8],
service_principal: &str,
encryption_context: std::collections::HashMap<String, String>,
) -> Result<String, String>;
/// Decrypts `ciphertext_b64` on behalf of `account_id`.
///
/// Unlike `encrypt`, this takes neither a region nor a key id.
/// Returns the plaintext bytes, or `Err` with a message string.
fn decrypt(
&self,
account_id: &str,
ciphertext_b64: &str,
service_principal: &str,
encryption_context: std::collections::HashMap<String, String>,
) -> Result<Vec<u8>, String>;
}
/// Hook for verifying a JWT against a Cognito user pool; used by
/// `DeliveryBus::verify_cognito_jwt`.
pub trait CognitoJwtVerifier: Send + Sync {
/// Verifies `token` against the user pool at `user_pool_arn` for `account_id`.
///
/// Returns a JSON value on success, or `Err` with a message string.
/// NOTE(review): the JSON value presumably holds the token's claims — confirm.
fn verify_token(
&self,
account_id: &str,
user_pool_arn: &str,
token: &str,
) -> Result<serde_json::Value, String>;
}
/// Construction (`new` plus the `with_*` builders) and dispatch methods.
///
/// Dispatch methods returning `()` are silent no-ops when their hook is missing;
/// methods returning `Result` report the missing hook as an `Err`.
impl DeliveryBus {
    /// Creates a bus with no hooks configured.
    pub fn new() -> Self {
        Self {
            sqs_sender: None,
            sns_sender: None,
            eventbridge_sender: None,
            lambda_invoker: None,
            kinesis_sender: None,
            stepfunctions_starter: None,
            s3_writer: None,
            firehose_sender: None,
            ses_dispatcher: None,
            ecs_task_runner: None,
            elbv2_target_registration: None,
            cloudwatch_metrics: None,
            cloudwatch_logs: None,
            cognito_jwt_verifier: None,
            kms_hook: None,
        }
    }

    /// Returns the bus with the Cognito JWT verifier set, replacing any previous one.
    pub fn with_cognito_jwt_verifier(self, verifier: Arc<dyn CognitoJwtVerifier>) -> Self {
        Self {
            cognito_jwt_verifier: Some(verifier),
            ..self
        }
    }

    /// Returns the bus with the KMS hook set, replacing any previous one.
    pub fn with_kms_hook(self, hook: Arc<dyn KmsHook>) -> Self {
        Self {
            kms_hook: Some(hook),
            ..self
        }
    }

    /// Encrypts through the KMS hook.
    ///
    /// # Errors
    /// Returns `"KMS hook not configured"` when no hook is set; otherwise returns
    /// whatever the hook returns.
    pub fn kms_encrypt(
        &self,
        account_id: &str,
        region: &str,
        key_id: &str,
        plaintext: &[u8],
        service_principal: &str,
        encryption_context: std::collections::HashMap<String, String>,
    ) -> Result<String, String> {
        let hook = self
            .kms_hook
            .as_ref()
            .ok_or_else(|| "KMS hook not configured".to_string())?;
        hook.encrypt(
            account_id,
            region,
            key_id,
            plaintext,
            service_principal,
            encryption_context,
        )
    }

    /// Decrypts through the KMS hook.
    ///
    /// # Errors
    /// Returns `"KMS hook not configured"` when no hook is set; otherwise returns
    /// whatever the hook returns.
    pub fn kms_decrypt(
        &self,
        account_id: &str,
        ciphertext_b64: &str,
        service_principal: &str,
        encryption_context: std::collections::HashMap<String, String>,
    ) -> Result<Vec<u8>, String> {
        let hook = self
            .kms_hook
            .as_ref()
            .ok_or_else(|| "KMS hook not configured".to_string())?;
        hook.decrypt(
            account_id,
            ciphertext_b64,
            service_principal,
            encryption_context,
        )
    }

    /// Verifies a token through the Cognito JWT verifier.
    ///
    /// # Errors
    /// Returns `"Cognito JWT verifier not configured"` when no verifier is set;
    /// otherwise returns whatever the verifier returns.
    pub fn verify_cognito_jwt(
        &self,
        account_id: &str,
        user_pool_arn: &str,
        token: &str,
    ) -> Result<serde_json::Value, String> {
        let verifier = self
            .cognito_jwt_verifier
            .as_ref()
            .ok_or_else(|| "Cognito JWT verifier not configured".to_string())?;
        verifier.verify_token(account_id, user_pool_arn, token)
    }

    /// Returns the bus with the CloudWatch metrics hook set, replacing any previous one.
    pub fn with_cloudwatch_metrics(self, sender: Arc<dyn CloudwatchDelivery>) -> Self {
        Self {
            cloudwatch_metrics: Some(sender),
            ..self
        }
    }

    /// Forwards one metric data point to the CloudWatch metrics hook.
    /// Does nothing when no hook is configured.
    #[allow(clippy::too_many_arguments)]
    pub fn put_cloudwatch_metric(
        &self,
        account_id: &str,
        region: &str,
        namespace: &str,
        metric_name: &str,
        value: f64,
        unit: Option<&str>,
        dimensions: std::collections::BTreeMap<String, String>,
        timestamp_ms: i64,
    ) {
        if let Some(metrics) = &self.cloudwatch_metrics {
            metrics.put_metric(
                account_id,
                region,
                namespace,
                metric_name,
                value,
                unit,
                dimensions,
                timestamp_ms,
            );
        }
    }

    /// Returns the bus with the CloudWatch Logs hook set, replacing any previous one.
    pub fn with_cloudwatch_logs(self, sender: Arc<dyn CloudwatchLogsDelivery>) -> Self {
        Self {
            cloudwatch_logs: Some(sender),
            ..self
        }
    }

    /// Forwards log events to the CloudWatch Logs hook.
    /// Does nothing when no hook is configured.
    pub fn put_log_events(
        &self,
        account_id: &str,
        log_group_name: &str,
        log_stream_name: &str,
        events: &[(i64, String)],
    ) {
        if let Some(logs) = &self.cloudwatch_logs {
            logs.put_log_events(account_id, log_group_name, log_stream_name, events);
        }
    }

    /// Returns the bus with the SES dispatcher set, replacing any previous one.
    pub fn with_ses_dispatcher(self, dispatcher: Arc<dyn SesSendEmailDispatcher>) -> Self {
        Self {
            ses_dispatcher: Some(dispatcher),
            ..self
        }
    }

    /// Returns the bus with the ECS task runner set, replacing any previous one.
    pub fn with_ecs_task_runner(self, runner: Arc<dyn EcsTaskRunner>) -> Self {
        Self {
            ecs_task_runner: Some(runner),
            ..self
        }
    }

    /// Returns the bus with the ELBv2 target registration hook set, replacing any
    /// previous one.
    pub fn with_elbv2_target_registration(self, reg: Arc<dyn Elbv2TargetRegistration>) -> Self {
        Self {
            elbv2_target_registration: Some(reg),
            ..self
        }
    }

    /// Forwards a target registration to the ELBv2 hook.
    /// Does nothing when no hook is configured.
    pub fn register_elbv2_targets(
        &self,
        account_id: &str,
        target_group_arn: &str,
        targets: Vec<(String, Option<i64>)>,
    ) {
        if let Some(registrar) = &self.elbv2_target_registration {
            registrar.register_targets(account_id, target_group_arn, targets);
        }
    }

    /// Forwards a target deregistration to the ELBv2 hook.
    /// Does nothing when no hook is configured.
    pub fn deregister_elbv2_targets(
        &self,
        account_id: &str,
        target_group_arn: &str,
        targets: Vec<(String, Option<i64>)>,
    ) {
        if let Some(registrar) = &self.elbv2_target_registration {
            registrar.deregister_targets(account_id, target_group_arn, targets);
        }
    }

    /// Sends an email through the SES dispatcher.
    ///
    /// # Errors
    /// Returns `"SES dispatcher not configured"` when no dispatcher is set;
    /// otherwise returns whatever the dispatcher returns.
    #[allow(clippy::too_many_arguments)]
    pub fn send_ses_email(
        &self,
        account_id: &str,
        from: &str,
        to: Vec<String>,
        cc: Vec<String>,
        bcc: Vec<String>,
        subject: Option<&str>,
        text_body: Option<&str>,
        html_body: Option<&str>,
    ) -> Result<(), String> {
        let dispatcher = self
            .ses_dispatcher
            .as_ref()
            .ok_or_else(|| "SES dispatcher not configured".to_string())?;
        dispatcher.send_email(account_id, from, to, cc, bcc, subject, text_body, html_body)
    }

    /// Runs a task through the ECS task runner.
    ///
    /// # Errors
    /// Returns `"ECS task runner not configured"` when no runner is set;
    /// otherwise returns whatever the runner returns.
    pub fn run_ecs_task(
        &self,
        account_id: &str,
        cluster: &str,
        task_definition: &str,
        launch_type: Option<&str>,
        count: usize,
    ) -> Result<(), String> {
        let runner = self
            .ecs_task_runner
            .as_ref()
            .ok_or_else(|| "ECS task runner not configured".to_string())?;
        runner.run_task(account_id, cluster, task_definition, launch_type, count)
    }

    /// Returns the bus with the S3 hook set, replacing any previous one.
    pub fn with_s3(self, sender: Arc<dyn S3Delivery>) -> Self {
        Self {
            s3_writer: Some(sender),
            ..self
        }
    }

    /// Returns the bus with the Firehose hook set, replacing any previous one.
    pub fn with_firehose(self, sender: Arc<dyn FirehoseDelivery>) -> Self {
        Self {
            firehose_sender: Some(sender),
            ..self
        }
    }

    /// Forwards a record to the Firehose hook.
    /// Does nothing when no hook is configured.
    pub fn put_record_to_firehose(&self, delivery_stream_arn: &str, data: &[u8]) {
        if let Some(firehose) = &self.firehose_sender {
            firehose.put_record(delivery_stream_arn, data);
        }
    }

    /// Stores an object through the S3 hook.
    ///
    /// # Errors
    /// Returns `"S3 writer not configured"` when no hook is set; otherwise returns
    /// whatever the hook returns.
    pub fn put_object_to_s3(
        &self,
        account_id: &str,
        bucket: &str,
        key: &str,
        body: Vec<u8>,
        content_type: Option<&str>,
    ) -> Result<(), String> {
        let s3 = self
            .s3_writer
            .as_ref()
            .ok_or_else(|| "S3 writer not configured".to_string())?;
        s3.put_object(account_id, bucket, key, body, content_type)
    }

    /// Fetches an object through the S3 hook (the same hook used for writes).
    ///
    /// # Errors
    /// Returns `"S3 client not configured"` when no hook is set; otherwise returns
    /// whatever the hook returns.
    pub fn get_object_from_s3(
        &self,
        account_id: &str,
        bucket: &str,
        key: &str,
    ) -> Result<Vec<u8>, String> {
        let s3 = self
            .s3_writer
            .as_ref()
            .ok_or_else(|| "S3 client not configured".to_string())?;
        s3.get_object(account_id, bucket, key)
    }

    /// Returns the bus with the SQS sender set, replacing any previous one.
    pub fn with_sqs(self, sender: Arc<dyn SqsDelivery>) -> Self {
        Self {
            sqs_sender: Some(sender),
            ..self
        }
    }

    /// Returns the bus with the SNS sender set, replacing any previous one.
    pub fn with_sns(self, sender: Arc<dyn SnsDelivery>) -> Self {
        Self {
            sns_sender: Some(sender),
            ..self
        }
    }

    /// Returns the bus with the EventBridge sender set, replacing any previous one.
    pub fn with_eventbridge(self, sender: Arc<dyn EventBridgeDelivery>) -> Self {
        Self {
            eventbridge_sender: Some(sender),
            ..self
        }
    }

    /// Returns the bus with the Lambda invoker set, replacing any previous one.
    pub fn with_lambda(self, invoker: Arc<dyn LambdaDelivery>) -> Self {
        Self {
            lambda_invoker: Some(invoker),
            ..self
        }
    }

    /// Returns the bus with the Kinesis sender set, replacing any previous one.
    pub fn with_kinesis(self, sender: Arc<dyn KinesisDelivery>) -> Self {
        Self {
            kinesis_sender: Some(sender),
            ..self
        }
    }

    /// Forwards a record to the Kinesis sender; behaves exactly like
    /// [`DeliveryBus::send_to_kinesis`]. Does nothing when no sender is configured.
    pub fn put_record_to_kinesis(&self, stream_arn: &str, data: &str, partition_key: &str) {
        self.send_to_kinesis(stream_arn, data, partition_key)
    }

    /// Returns the bus with the Step Functions starter set, replacing any previous one.
    pub fn with_stepfunctions(self, starter: Arc<dyn StepFunctionsDelivery>) -> Self {
        Self {
            stepfunctions_starter: Some(starter),
            ..self
        }
    }

    /// Forwards a message with plain string attributes to the SQS sender.
    /// Does nothing when no sender is configured.
    pub fn send_to_sqs(
        &self,
        queue_arn: &str,
        message_body: &str,
        attributes: &HashMap<String, String>,
    ) {
        if let Some(sqs) = &self.sqs_sender {
            sqs.deliver_to_queue(queue_arn, message_body, attributes);
        }
    }

    /// Forwards a message with typed attributes and optional group / dedup ids to the
    /// SQS sender. Does nothing when no sender is configured.
    pub fn send_to_sqs_with_attrs(
        &self,
        queue_arn: &str,
        message_body: &str,
        message_attributes: &HashMap<String, SqsMessageAttribute>,
        message_group_id: Option<&str>,
        message_dedup_id: Option<&str>,
    ) {
        if let Some(sqs) = &self.sqs_sender {
            sqs.deliver_to_queue_with_attrs(
                queue_arn,
                message_body,
                message_attributes,
                message_group_id,
                message_dedup_id,
            );
        }
    }

    /// Fallible counterpart of [`DeliveryBus::send_to_sqs_with_attrs`].
    ///
    /// # Errors
    /// Returns `SqsDeliveryError::QueueNotFound` carrying `queue_arn` when no SQS
    /// sender is configured; otherwise returns whatever the sender returns.
    pub fn try_send_to_sqs_with_attrs(
        &self,
        queue_arn: &str,
        message_body: &str,
        message_attributes: &HashMap<String, SqsMessageAttribute>,
        message_group_id: Option<&str>,
        message_dedup_id: Option<&str>,
    ) -> Result<(), SqsDeliveryError> {
        let sqs = self
            .sqs_sender
            .as_ref()
            .ok_or_else(|| SqsDeliveryError::QueueNotFound(queue_arn.to_string()))?;
        sqs.try_deliver_to_queue_with_attrs(
            queue_arn,
            message_body,
            message_attributes,
            message_group_id,
            message_dedup_id,
        )
    }

    /// Forwards a publish request to the SNS sender.
    /// Does nothing when no sender is configured.
    pub fn publish_to_sns(&self, topic_arn: &str, message: &str, subject: Option<&str>) {
        if let Some(sns) = &self.sns_sender {
            sns.publish_to_topic(topic_arn, message, subject);
        }
    }

    /// Forwards an event to the EventBridge sender.
    /// Does nothing when no sender is configured.
    pub fn put_event_to_eventbridge(
        &self,
        source: &str,
        detail_type: &str,
        detail: &str,
        event_bus_name: &str,
    ) {
        if let Some(events) = &self.eventbridge_sender {
            events.put_event(source, detail_type, detail, event_bus_name);
        }
    }

    /// Forwards an event plus a target account id to the EventBridge sender.
    /// Does nothing when no sender is configured.
    pub fn put_event_to_eventbridge_for_account(
        &self,
        source: &str,
        detail_type: &str,
        detail: &str,
        event_bus_name: &str,
        target_account_id: &str,
    ) {
        if let Some(events) = &self.eventbridge_sender {
            events.put_event_to_account(
                source,
                detail_type,
                detail,
                event_bus_name,
                target_account_id,
            );
        }
    }

    /// Invokes a Lambda function through the configured invoker and awaits its result.
    ///
    /// Returns `None` when no invoker is configured, otherwise `Some` of the
    /// invoker's result.
    pub async fn invoke_lambda(
        &self,
        function_arn: &str,
        payload: &str,
    ) -> Option<Result<Vec<u8>, String>> {
        let invoker = self.lambda_invoker.as_ref()?;
        Some(invoker.invoke_lambda(function_arn, payload).await)
    }

    /// Forwards a record to the Kinesis sender.
    /// Does nothing when no sender is configured.
    pub fn send_to_kinesis(&self, stream_arn: &str, data: &str, partition_key: &str) {
        if let Some(kinesis) = &self.kinesis_sender {
            kinesis.put_record(stream_arn, data, partition_key);
        }
    }

    /// Forwards an execution start to the Step Functions starter.
    /// Does nothing when no starter is configured.
    pub fn start_stepfunctions_execution(&self, state_machine_arn: &str, input: &str) {
        if let Some(sfn) = &self.stepfunctions_starter {
            sfn.start_execution(state_machine_arn, input);
        }
    }
}
/// `Default` delegates to [`DeliveryBus::new`], so a default bus has no hooks configured.
impl Default for DeliveryBus {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Reads a mock's call counter.
    fn calls(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    /// SQS mock that only counts `deliver_to_queue` calls; the richer trait methods
    /// reach it through the trait's default bodies.
    #[derive(Default)]
    struct MockSqs {
        call_count: AtomicUsize,
    }

    impl SqsDelivery for MockSqs {
        fn deliver_to_queue(
            &self,
            _queue_arn: &str,
            _message_body: &str,
            _attributes: &HashMap<String, String>,
        ) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// SNS mock that counts `publish_to_topic` calls.
    #[derive(Default)]
    struct MockSns {
        call_count: AtomicUsize,
    }

    impl SnsDelivery for MockSns {
        fn publish_to_topic(&self, _topic_arn: &str, _message: &str, _subject: Option<&str>) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// EventBridge mock that counts `put_event` calls.
    #[derive(Default)]
    struct MockEventBridge {
        call_count: AtomicUsize,
    }

    impl EventBridgeDelivery for MockEventBridge {
        fn put_event(
            &self,
            _source: &str,
            _detail_type: &str,
            _detail: &str,
            _event_bus_name: &str,
        ) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Kinesis mock that counts `put_record` calls.
    #[derive(Default)]
    struct MockKinesis {
        call_count: AtomicUsize,
    }

    impl KinesisDelivery for MockKinesis {
        fn put_record(&self, _stream_arn: &str, _data: &str, _partition_key: &str) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Step Functions mock that counts `start_execution` calls.
    #[derive(Default)]
    struct MockStepFunctions {
        call_count: AtomicUsize,
    }

    impl StepFunctionsDelivery for MockStepFunctions {
        fn start_execution(&self, _state_machine_arn: &str, _input: &str) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    // With nothing configured, every fire-and-forget method must be a silent no-op.
    #[test]
    fn delivery_bus_new_has_no_senders() {
        let bus = DeliveryBus::new();
        let no_attributes = HashMap::new();
        bus.send_to_sqs("arn:queue", "body", &no_attributes);
        bus.publish_to_sns("arn:topic", "msg", None);
        bus.put_event_to_eventbridge("src", "type", "{}", "default");
        bus.send_to_kinesis("arn:stream", "data", "pk");
        bus.start_stepfunctions_execution("arn:sfn", "{}");
    }

    #[test]
    fn delivery_bus_default_is_same_as_new() {
        let bus = DeliveryBus::default();
        let no_attributes = HashMap::new();
        bus.send_to_sqs("arn:q", "b", &no_attributes);
    }

    #[test]
    fn send_to_sqs_calls_sender() {
        let mock = Arc::new(MockSqs::default());
        let bus = DeliveryBus::new().with_sqs(mock.clone());
        let no_attributes = HashMap::new();

        bus.send_to_sqs("arn:queue", "msg", &no_attributes);
        assert_eq!(calls(&mock.call_count), 1);

        bus.send_to_sqs("arn:queue2", "msg2", &no_attributes);
        assert_eq!(calls(&mock.call_count), 2);
    }

    #[test]
    fn send_to_sqs_with_attrs_calls_sender() {
        let mock = Arc::new(MockSqs::default());
        let bus = DeliveryBus::new().with_sqs(mock.clone());
        let attrs = HashMap::from([(
            "key".to_string(),
            SqsMessageAttribute {
                data_type: "String".to_string(),
                string_value: Some("val".to_string()),
                binary_value: None,
            },
        )]);

        bus.send_to_sqs_with_attrs("arn:q", "body", &attrs, Some("group"), Some("dedup"));
        assert_eq!(calls(&mock.call_count), 1);
    }

    #[test]
    fn publish_to_sns_calls_sender() {
        let mock = Arc::new(MockSns::default());
        let bus = DeliveryBus::new().with_sns(mock.clone());

        bus.publish_to_sns("arn:topic", "message", Some("subject"));
        assert_eq!(calls(&mock.call_count), 1);
    }

    #[test]
    fn put_event_to_eventbridge_calls_sender() {
        let mock = Arc::new(MockEventBridge::default());
        let bus = DeliveryBus::new().with_eventbridge(mock.clone());

        bus.put_event_to_eventbridge("aws.s3", "Object Created", "{}", "default");
        assert_eq!(calls(&mock.call_count), 1);
    }

    #[test]
    fn send_to_kinesis_calls_sender() {
        let mock = Arc::new(MockKinesis::default());
        let bus = DeliveryBus::new().with_kinesis(mock.clone());

        bus.send_to_kinesis("arn:stream", "data", "partition-key");
        assert_eq!(calls(&mock.call_count), 1);
    }

    #[test]
    fn start_stepfunctions_calls_sender() {
        let mock = Arc::new(MockStepFunctions::default());
        let bus = DeliveryBus::new().with_stepfunctions(mock.clone());

        bus.start_stepfunctions_execution("arn:sfn:machine", r#"{"key":"val"}"#);
        assert_eq!(calls(&mock.call_count), 1);
    }

    // All builders chained together: each hook must receive exactly its own call.
    #[test]
    fn builder_chaining_works() {
        let sqs = Arc::new(MockSqs::default());
        let sns = Arc::new(MockSns::default());
        let eb = Arc::new(MockEventBridge::default());
        let kin = Arc::new(MockKinesis::default());
        let sfn = Arc::new(MockStepFunctions::default());

        let bus = DeliveryBus::new()
            .with_sqs(sqs.clone())
            .with_sns(sns.clone())
            .with_eventbridge(eb.clone())
            .with_kinesis(kin.clone())
            .with_stepfunctions(sfn.clone());

        bus.send_to_sqs("q", "m", &HashMap::new());
        bus.publish_to_sns("t", "m", None);
        bus.put_event_to_eventbridge("s", "d", "{}", "b");
        bus.send_to_kinesis("s", "d", "k");
        bus.start_stepfunctions_execution("sm", "{}");

        assert_eq!(calls(&sqs.call_count), 1);
        assert_eq!(calls(&sns.call_count), 1);
        assert_eq!(calls(&eb.call_count), 1);
        assert_eq!(calls(&kin.call_count), 1);
        assert_eq!(calls(&sfn.call_count), 1);
    }

    #[tokio::test]
    async fn invoke_lambda_returns_none_without_invoker() {
        let bus = DeliveryBus::new();
        let outcome = bus.invoke_lambda("arn:lambda", "{}").await;
        assert!(outcome.is_none());
    }
}