Skip to main content

adapter_aws/
lib.rs

1pub mod sqs;
2pub mod eventbridge;
3pub mod common;
4
5pub use common::{AwsConfig, Message, Result};
6pub use sqs::SqsAdapter;
7pub use eventbridge::EventBridgeAdapter;
8
9use serde_json::Value;
10use std::sync::Arc;
11
/// Selects which AWS messaging service an [`AwsAdapter`] talks to.
///
/// Also used as the default route when an adapter is built with both
/// services (see `AwsAdapter::new_with_both`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AwsAdapterType {
    /// Route through the SQS adapter.
    Sqs,
    /// Route through the EventBridge adapter.
    EventBridge,
}
17
18pub enum AwsAdapter {
19    Sqs(Arc<SqsAdapter>),
20    EventBridge(Arc<EventBridgeAdapter>),
21    Both {
22        sqs: Arc<SqsAdapter>,
23        eventbridge: Arc<EventBridgeAdapter>,
24        default_type: AwsAdapterType,
25    },
26}
27
28impl AwsAdapter {
29    pub async fn new(
30        adapter_type: AwsAdapterType,
31        config: AwsConfig,
32    ) -> common::Result<Self> {
33        match adapter_type {
34            AwsAdapterType::Sqs => {
35                let sqs_config = sqs::SqsConfig {
36                    region: config.region.clone(),
37                    queue_prefix: config.queue_prefix.clone(),
38                    visibility_timeout_seconds: config.visibility_timeout_seconds,
39                    message_retention_seconds: config.message_retention_seconds,
40                    receive_wait_time_seconds: config.receive_wait_time_seconds,
41                };
42                Ok(AwsAdapter::Sqs(Arc::new(
43                    SqsAdapter::new(sqs_config).await?
44                )))
45            }
46            AwsAdapterType::EventBridge => {
47                let eb_config = eventbridge::EventBridgeConfig {
48                    region: config.region.clone(),
49                    event_bus_name: config.event_bus_name.clone(),
50                    source: config.source.clone(),
51                };
52                Ok(AwsAdapter::EventBridge(Arc::new(
53                    EventBridgeAdapter::new(eb_config).await?
54                )))
55            }
56        }
57    }
58
59    pub async fn new_with_both(
60        default_type: AwsAdapterType,
61        config: AwsConfig,
62    ) -> common::Result<Self> {
63        tracing::info!(
64            "AwsAdapter::new_with_both: Initializing with default_type: {:?}, region: {}, queue_prefix: {:?}",
65            default_type,
66            config.region,
67            config.queue_prefix
68        );
69        
70        let sqs_config = sqs::SqsConfig {
71            region: config.region.clone(),
72            queue_prefix: config.queue_prefix.clone(),
73            visibility_timeout_seconds: config.visibility_timeout_seconds,
74            message_retention_seconds: config.message_retention_seconds,
75            receive_wait_time_seconds: config.receive_wait_time_seconds,
76        };
77        tracing::info!("AwsAdapter::new_with_both: Creating SQS adapter...");
78        let sqs_adapter = Arc::new(SqsAdapter::new(sqs_config).await.map_err(|e| {
79            tracing::error!("AwsAdapter::new_with_both: Failed to create SQS adapter: {}", e);
80            e
81        })?);
82        tracing::info!("AwsAdapter::new_with_both: SQS adapter created successfully");
83
84        let eb_config = eventbridge::EventBridgeConfig {
85            region: config.region.clone(),
86            event_bus_name: config.event_bus_name.clone(),
87            source: config.source.clone(),
88        };
89        tracing::info!("AwsAdapter::new_with_both: Creating EventBridge adapter...");
90        let eb_adapter = Arc::new(EventBridgeAdapter::new(eb_config).await.map_err(|e| {
91            tracing::error!("AwsAdapter::new_with_both: Failed to create EventBridge adapter: {}", e);
92            e
93        })?);
94        tracing::info!("AwsAdapter::new_with_both: EventBridge adapter created successfully");
95
96        tracing::info!(
97            "AwsAdapter::new_with_both: Both adapters initialized successfully with default_type: {:?}",
98            default_type
99        );
100        
101        Ok(AwsAdapter::Both {
102            sqs: sqs_adapter,
103            eventbridge: eb_adapter,
104            default_type,
105        })
106    }
107
108    pub async fn publish(
109        &self,
110        topic: impl Into<String>,
111        payload: Value,
112    ) -> common::Result<()> {
113        match self {
114            AwsAdapter::Sqs(adapter) => adapter.publish(topic, payload).await,
115            AwsAdapter::EventBridge(adapter) => adapter.publish(topic, payload).await,
116            AwsAdapter::Both { sqs, eventbridge: _, default_type: _ } => {
117                sqs.publish(topic, payload).await
118            }
119        }
120    }
121
122    pub async fn publish_with_type(
123        &self,
124        topic: impl Into<String>,
125        payload: Value,
126        adapter_type: Option<&str>,
127    ) -> common::Result<()> {
128        let topic_str = topic.into();
129        match self {
130            AwsAdapter::Sqs(adapter) => {
131                tracing::info!("AwsAdapter::publish_with_type: Using SQS adapter for topic: {}", topic_str);
132                adapter.publish(topic_str, payload).await
133            }
134            AwsAdapter::EventBridge(adapter) => {
135                tracing::info!("AwsAdapter::publish_with_type: Using EventBridge adapter for topic: {}", topic_str);
136                adapter.publish(topic_str, payload).await
137            }
138            AwsAdapter::Both { sqs, eventbridge, default_type } => {
139                let use_type = adapter_type
140                    .map(|s| s.to_lowercase())
141                    .unwrap_or_else(|| match default_type {
142                        AwsAdapterType::Sqs => "sqs".to_string(),
143                        AwsAdapterType::EventBridge => "eventbridge".to_string(),
144                    });
145                
146                tracing::info!(
147                    "AwsAdapter::publish_with_type: Both mode - requested: {:?}, using: {}, topic: {}",
148                    adapter_type,
149                    use_type,
150                    topic_str
151                );
152                
153                match use_type.as_str() {
154                    "sqs" => {
155                        tracing::info!("AwsAdapter::publish_with_type: Routing to SQS for topic: {}", topic_str);
156                        sqs.publish(topic_str, payload).await
157                    }
158                    "eventbridge" => {
159                        tracing::info!("AwsAdapter::publish_with_type: Routing to EventBridge for topic: {}", topic_str);
160                        eventbridge.publish(topic_str, payload).await
161                    }
162                    _ => {
163                        tracing::warn!(
164                            "AwsAdapter::publish_with_type: Unknown adapter type '{}', falling back to default for topic: {}",
165                            use_type,
166                            topic_str
167                        );
168                        match default_type {
169                            AwsAdapterType::Sqs => sqs.publish(topic_str, payload).await,
170                            AwsAdapterType::EventBridge => eventbridge.publish(topic_str, payload).await,
171                        }
172                    }
173                }
174            }
175        }
176    }
177
178    pub async fn subscribe_fn<F, Fut>(&self, topic: impl Into<String>, handler: F) -> common::Result<()>
179    where
180        F: Fn(common::Message) -> Fut + Send + Sync + 'static,
181        Fut: std::future::Future<Output = common::Result<()>> + Send + 'static,
182    {
183        self.subscribe_with_type(topic, handler, None).await
184    }
185
186    pub async fn subscribe_with_type<F, Fut>(
187        &self,
188        topic: impl Into<String>,
189        handler: F,
190        adapter_type: Option<&str>,
191    ) -> common::Result<()>
192    where
193        F: Fn(common::Message) -> Fut + Send + Sync + 'static,
194        Fut: std::future::Future<Output = common::Result<()>> + Send + 'static,
195    {
196        let topic_str = topic.into();
197        match self {
198            AwsAdapter::Sqs(adapter) => adapter.subscribe_fn(topic_str, handler).await,
199            AwsAdapter::EventBridge(adapter) => adapter.subscribe_fn(topic_str, handler).await,
200            AwsAdapter::Both { sqs, eventbridge, default_type } => {
201                let use_type = adapter_type
202                    .map(|s| s.to_lowercase())
203                    .unwrap_or_else(|| match default_type {
204                        AwsAdapterType::Sqs => "sqs".to_string(),
205                        AwsAdapterType::EventBridge => "eventbridge".to_string(),
206                    });
207
208                tracing::info!(
209                    "AwsAdapter::subscribe_with_type: Both mode - requested: {:?}, using: {}, topic: {}",
210                    adapter_type,
211                    use_type,
212                    topic_str
213                );
214
215                match use_type.as_str() {
216                    "sqs" => {
217                        tracing::info!("AwsAdapter::subscribe_with_type: Using SQS for subscription - topic: {}", topic_str);
218                        sqs.subscribe_fn(topic_str, handler).await
219                    }
220                    "eventbridge" => {
221                        tracing::info!("AwsAdapter::subscribe_with_type: Using EventBridge for subscription - topic: {}", topic_str);
222                        eventbridge.subscribe_fn(topic_str, handler).await
223                    }
224                    _ => {
225                        tracing::warn!(
226                            "AwsAdapter::subscribe_with_type: Unknown adapter type '{}', falling back to default for topic: {}",
227                            use_type,
228                            topic_str
229                        );
230                        match default_type {
231                            AwsAdapterType::Sqs => sqs.subscribe_fn(topic_str, handler).await,
232                            AwsAdapterType::EventBridge => eventbridge.subscribe_fn(topic_str, handler).await,
233                        }
234                    }
235                }
236            }
237        }
238    }
239
240    pub async fn list_topics(&self) -> Vec<String> {
241        match self {
242            AwsAdapter::Sqs(adapter) => adapter.list_topics().await,
243            AwsAdapter::EventBridge(adapter) => adapter.list_topics().await,
244            AwsAdapter::Both { sqs, eventbridge, default_type: _ } => {
245                let mut topics = sqs.list_topics().await;
246                let mut eb_topics = eventbridge.list_topics().await;
247                topics.append(&mut eb_topics);
248                topics.sort();
249                topics.dedup();
250                topics
251            }
252        }
253    }
254}
255