//! adapter-aws 0.1.0
//!
//! AWS adapter for Rohas event-driven applications supporting SQS and EventBridge.
//!
//! Documentation
pub mod sqs;
pub mod eventbridge;
pub mod common;

pub use common::{AwsConfig, Message, Result};
pub use sqs::SqsAdapter;
pub use eventbridge::EventBridgeAdapter;

use serde_json::Value;
use std::sync::Arc;

/// Identifies which AWS messaging backend an [`AwsAdapter`] should use.
///
/// Passed to [`AwsAdapter::new`] to pick the single backend to construct, and
/// to [`AwsAdapter::new_with_both`] as the default backend.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AwsAdapterType {
    /// Selects the SQS-backed adapter ([`SqsAdapter`]).
    Sqs,
    /// Selects the EventBridge-backed adapter ([`EventBridgeAdapter`]).
    EventBridge,
}

/// Facade over the SQS and EventBridge adapters.
///
/// An instance wraps either one backend or both; the methods on this type
/// forward each call to the appropriate wrapped adapter.
pub enum AwsAdapter {
    /// Only an SQS adapter is available.
    Sqs(Arc<SqsAdapter>),
    /// Only an EventBridge adapter is available.
    EventBridge(Arc<EventBridgeAdapter>),
    /// Both adapters are available and can be chosen per call.
    Both {
        /// The SQS backend.
        sqs: Arc<SqsAdapter>,
        /// The EventBridge backend.
        eventbridge: Arc<EventBridgeAdapter>,
        /// Backend used by `publish_with_type` / `subscribe_with_type` when the
        /// caller supplies no adapter type or an unrecognized one.
        default_type: AwsAdapterType,
    },
}

impl AwsAdapter {
    pub async fn new(
        adapter_type: AwsAdapterType,
        config: AwsConfig,
    ) -> common::Result<Self> {
        match adapter_type {
            AwsAdapterType::Sqs => {
                let sqs_config = sqs::SqsConfig {
                    region: config.region.clone(),
                    queue_prefix: config.queue_prefix.clone(),
                    visibility_timeout_seconds: config.visibility_timeout_seconds,
                    message_retention_seconds: config.message_retention_seconds,
                    receive_wait_time_seconds: config.receive_wait_time_seconds,
                };
                Ok(AwsAdapter::Sqs(Arc::new(
                    SqsAdapter::new(sqs_config).await?
                )))
            }
            AwsAdapterType::EventBridge => {
                let eb_config = eventbridge::EventBridgeConfig {
                    region: config.region.clone(),
                    event_bus_name: config.event_bus_name.clone(),
                    source: config.source.clone(),
                };
                Ok(AwsAdapter::EventBridge(Arc::new(
                    EventBridgeAdapter::new(eb_config).await?
                )))
            }
        }
    }

    pub async fn new_with_both(
        default_type: AwsAdapterType,
        config: AwsConfig,
    ) -> common::Result<Self> {
        tracing::info!(
            "AwsAdapter::new_with_both: Initializing with default_type: {:?}, region: {}, queue_prefix: {:?}",
            default_type,
            config.region,
            config.queue_prefix
        );
        
        let sqs_config = sqs::SqsConfig {
            region: config.region.clone(),
            queue_prefix: config.queue_prefix.clone(),
            visibility_timeout_seconds: config.visibility_timeout_seconds,
            message_retention_seconds: config.message_retention_seconds,
            receive_wait_time_seconds: config.receive_wait_time_seconds,
        };
        tracing::info!("AwsAdapter::new_with_both: Creating SQS adapter...");
        let sqs_adapter = Arc::new(SqsAdapter::new(sqs_config).await.map_err(|e| {
            tracing::error!("AwsAdapter::new_with_both: Failed to create SQS adapter: {}", e);
            e
        })?);
        tracing::info!("AwsAdapter::new_with_both: SQS adapter created successfully");

        let eb_config = eventbridge::EventBridgeConfig {
            region: config.region.clone(),
            event_bus_name: config.event_bus_name.clone(),
            source: config.source.clone(),
        };
        tracing::info!("AwsAdapter::new_with_both: Creating EventBridge adapter...");
        let eb_adapter = Arc::new(EventBridgeAdapter::new(eb_config).await.map_err(|e| {
            tracing::error!("AwsAdapter::new_with_both: Failed to create EventBridge adapter: {}", e);
            e
        })?);
        tracing::info!("AwsAdapter::new_with_both: EventBridge adapter created successfully");

        tracing::info!(
            "AwsAdapter::new_with_both: Both adapters initialized successfully with default_type: {:?}",
            default_type
        );
        
        Ok(AwsAdapter::Both {
            sqs: sqs_adapter,
            eventbridge: eb_adapter,
            default_type,
        })
    }

    pub async fn publish(
        &self,
        topic: impl Into<String>,
        payload: Value,
    ) -> common::Result<()> {
        match self {
            AwsAdapter::Sqs(adapter) => adapter.publish(topic, payload).await,
            AwsAdapter::EventBridge(adapter) => adapter.publish(topic, payload).await,
            AwsAdapter::Both { sqs, eventbridge: _, default_type: _ } => {
                sqs.publish(topic, payload).await
            }
        }
    }

    /// Publishes `payload` to `topic`, optionally naming the backend to use.
    ///
    /// `adapter_type` is only consulted in [`AwsAdapter::Both`] mode. It is
    /// lowercased and compared against `"sqs"` and `"eventbridge"`; `None` or
    /// any other value selects the configured `default_type` (an unknown value
    /// additionally emits a warning). Single-backend adapters always use their
    /// one backend.
    ///
    /// # Errors
    ///
    /// Returns whatever error the selected backend's `publish` reports.
    pub async fn publish_with_type(
        &self,
        topic: impl Into<String>,
        payload: Value,
        adapter_type: Option<&str>,
    ) -> common::Result<()> {
        let topic_str: String = topic.into();

        // Single-backend variants have nothing to choose: forward and return.
        let (sqs, eventbridge, default_type) = match self {
            AwsAdapter::Sqs(adapter) => {
                tracing::info!("AwsAdapter::publish_with_type: Using SQS adapter for topic: {}", topic_str);
                return adapter.publish(topic_str, payload).await;
            }
            AwsAdapter::EventBridge(adapter) => {
                tracing::info!("AwsAdapter::publish_with_type: Using EventBridge adapter for topic: {}", topic_str);
                return adapter.publish(topic_str, payload).await;
            }
            AwsAdapter::Both { sqs, eventbridge, default_type } => (sqs, eventbridge, *default_type),
        };

        // Normalized backend name: the caller's request, or the default's name.
        let use_type = match adapter_type {
            Some(requested) => requested.to_lowercase(),
            None => match default_type {
                AwsAdapterType::Sqs => String::from("sqs"),
                AwsAdapterType::EventBridge => String::from("eventbridge"),
            },
        };

        tracing::info!(
            "AwsAdapter::publish_with_type: Both mode - requested: {:?}, using: {}, topic: {}",
            adapter_type,
            use_type,
            topic_str
        );

        // Translate the name into a concrete backend, logging the decision.
        let target = match use_type.as_str() {
            "sqs" => {
                tracing::info!("AwsAdapter::publish_with_type: Routing to SQS for topic: {}", topic_str);
                AwsAdapterType::Sqs
            }
            "eventbridge" => {
                tracing::info!("AwsAdapter::publish_with_type: Routing to EventBridge for topic: {}", topic_str);
                AwsAdapterType::EventBridge
            }
            _ => {
                tracing::warn!(
                    "AwsAdapter::publish_with_type: Unknown adapter type '{}', falling back to default for topic: {}",
                    use_type,
                    topic_str
                );
                default_type
            }
        };

        match target {
            AwsAdapterType::Sqs => sqs.publish(topic_str, payload).await,
            AwsAdapterType::EventBridge => eventbridge.publish(topic_str, payload).await,
        }
    }

    /// Registers `handler` for messages on `topic` without naming a backend.
    ///
    /// This is shorthand for `subscribe_with_type(topic, handler, None)`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `subscribe_with_type` reports.
    pub async fn subscribe_fn<F, Fut>(&self, topic: impl Into<String>, handler: F) -> common::Result<()>
    where
        F: Fn(common::Message) -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<Output = common::Result<()>> + Send + 'static,
    {
        // No explicit backend requested; let `subscribe_with_type` decide.
        let no_override: Option<&str> = None;
        self.subscribe_with_type(topic, handler, no_override).await
    }

    /// Registers `handler` for messages on `topic`, optionally naming the
    /// backend to subscribe through.
    ///
    /// `adapter_type` is only consulted in [`AwsAdapter::Both`] mode. It is
    /// lowercased and compared against `"sqs"` and `"eventbridge"`; `None` or
    /// any other value selects the configured `default_type` (an unknown value
    /// additionally emits a warning). Single-backend adapters always use their
    /// one backend.
    ///
    /// # Errors
    ///
    /// Returns whatever error the selected backend's `subscribe_fn` reports.
    pub async fn subscribe_with_type<F, Fut>(
        &self,
        topic: impl Into<String>,
        handler: F,
        adapter_type: Option<&str>,
    ) -> common::Result<()>
    where
        F: Fn(common::Message) -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<Output = common::Result<()>> + Send + 'static,
    {
        let topic_str: String = topic.into();

        // Single-backend variants have nothing to choose: forward and return.
        let (sqs, eventbridge, default_type) = match self {
            AwsAdapter::Sqs(adapter) => return adapter.subscribe_fn(topic_str, handler).await,
            AwsAdapter::EventBridge(adapter) => return adapter.subscribe_fn(topic_str, handler).await,
            AwsAdapter::Both { sqs, eventbridge, default_type } => (sqs, eventbridge, *default_type),
        };

        // Normalized backend name: the caller's request, or the default's name.
        let use_type = match adapter_type {
            Some(requested) => requested.to_lowercase(),
            None => match default_type {
                AwsAdapterType::Sqs => String::from("sqs"),
                AwsAdapterType::EventBridge => String::from("eventbridge"),
            },
        };

        tracing::info!(
            "AwsAdapter::subscribe_with_type: Both mode - requested: {:?}, using: {}, topic: {}",
            adapter_type,
            use_type,
            topic_str
        );

        // Translate the name into a concrete backend, logging the decision.
        let target = match use_type.as_str() {
            "sqs" => {
                tracing::info!("AwsAdapter::subscribe_with_type: Using SQS for subscription - topic: {}", topic_str);
                AwsAdapterType::Sqs
            }
            "eventbridge" => {
                tracing::info!("AwsAdapter::subscribe_with_type: Using EventBridge for subscription - topic: {}", topic_str);
                AwsAdapterType::EventBridge
            }
            _ => {
                tracing::warn!(
                    "AwsAdapter::subscribe_with_type: Unknown adapter type '{}', falling back to default for topic: {}",
                    use_type,
                    topic_str
                );
                default_type
            }
        };

        match target {
            AwsAdapterType::Sqs => sqs.subscribe_fn(topic_str, handler).await,
            AwsAdapterType::EventBridge => eventbridge.subscribe_fn(topic_str, handler).await,
        }
    }

    /// Returns the topics known to the wrapped backend(s).
    ///
    /// Single-backend adapters return their backend's list unchanged. In
    /// [`AwsAdapter::Both`] mode the SQS and EventBridge lists are merged,
    /// sorted, and de-duplicated.
    pub async fn list_topics(&self) -> Vec<String> {
        let (sqs, eventbridge) = match self {
            AwsAdapter::Sqs(adapter) => return adapter.list_topics().await,
            AwsAdapter::EventBridge(adapter) => return adapter.list_topics().await,
            AwsAdapter::Both { sqs, eventbridge, default_type: _ } => (sqs, eventbridge),
        };

        // Query SQS first, then EventBridge, and combine into one list.
        let mut merged = sqs.list_topics().await;
        let from_eventbridge = eventbridge.list_topics().await;
        merged.extend(from_eventbridge);

        // Equal strings are indistinguishable, so an unstable sort yields the
        // same ordering; sorting first lets `dedup` drop all duplicates.
        merged.sort_unstable();
        merged.dedup();
        merged
    }
}