1pub mod sqs;
2pub mod eventbridge;
3pub mod common;
4
5pub use common::{AwsConfig, Message, Result};
6pub use sqs::SqsAdapter;
7pub use eventbridge::EventBridgeAdapter;
8
9use serde_json::Value;
10use std::sync::Arc;
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13pub enum AwsAdapterType {
14 Sqs,
15 EventBridge,
16}
17
18pub enum AwsAdapter {
19 Sqs(Arc<SqsAdapter>),
20 EventBridge(Arc<EventBridgeAdapter>),
21 Both {
22 sqs: Arc<SqsAdapter>,
23 eventbridge: Arc<EventBridgeAdapter>,
24 default_type: AwsAdapterType,
25 },
26}
27
28impl AwsAdapter {
29 pub async fn new(
30 adapter_type: AwsAdapterType,
31 config: AwsConfig,
32 ) -> common::Result<Self> {
33 match adapter_type {
34 AwsAdapterType::Sqs => {
35 let sqs_config = sqs::SqsConfig {
36 region: config.region.clone(),
37 queue_prefix: config.queue_prefix.clone(),
38 visibility_timeout_seconds: config.visibility_timeout_seconds,
39 message_retention_seconds: config.message_retention_seconds,
40 receive_wait_time_seconds: config.receive_wait_time_seconds,
41 };
42 Ok(AwsAdapter::Sqs(Arc::new(
43 SqsAdapter::new(sqs_config).await?
44 )))
45 }
46 AwsAdapterType::EventBridge => {
47 let eb_config = eventbridge::EventBridgeConfig {
48 region: config.region.clone(),
49 event_bus_name: config.event_bus_name.clone(),
50 source: config.source.clone(),
51 };
52 Ok(AwsAdapter::EventBridge(Arc::new(
53 EventBridgeAdapter::new(eb_config).await?
54 )))
55 }
56 }
57 }
58
59 pub async fn new_with_both(
60 default_type: AwsAdapterType,
61 config: AwsConfig,
62 ) -> common::Result<Self> {
63 tracing::info!(
64 "AwsAdapter::new_with_both: Initializing with default_type: {:?}, region: {}, queue_prefix: {:?}",
65 default_type,
66 config.region,
67 config.queue_prefix
68 );
69
70 let sqs_config = sqs::SqsConfig {
71 region: config.region.clone(),
72 queue_prefix: config.queue_prefix.clone(),
73 visibility_timeout_seconds: config.visibility_timeout_seconds,
74 message_retention_seconds: config.message_retention_seconds,
75 receive_wait_time_seconds: config.receive_wait_time_seconds,
76 };
77 tracing::info!("AwsAdapter::new_with_both: Creating SQS adapter...");
78 let sqs_adapter = Arc::new(SqsAdapter::new(sqs_config).await.map_err(|e| {
79 tracing::error!("AwsAdapter::new_with_both: Failed to create SQS adapter: {}", e);
80 e
81 })?);
82 tracing::info!("AwsAdapter::new_with_both: SQS adapter created successfully");
83
84 let eb_config = eventbridge::EventBridgeConfig {
85 region: config.region.clone(),
86 event_bus_name: config.event_bus_name.clone(),
87 source: config.source.clone(),
88 };
89 tracing::info!("AwsAdapter::new_with_both: Creating EventBridge adapter...");
90 let eb_adapter = Arc::new(EventBridgeAdapter::new(eb_config).await.map_err(|e| {
91 tracing::error!("AwsAdapter::new_with_both: Failed to create EventBridge adapter: {}", e);
92 e
93 })?);
94 tracing::info!("AwsAdapter::new_with_both: EventBridge adapter created successfully");
95
96 tracing::info!(
97 "AwsAdapter::new_with_both: Both adapters initialized successfully with default_type: {:?}",
98 default_type
99 );
100
101 Ok(AwsAdapter::Both {
102 sqs: sqs_adapter,
103 eventbridge: eb_adapter,
104 default_type,
105 })
106 }
107
108 pub async fn publish(
109 &self,
110 topic: impl Into<String>,
111 payload: Value,
112 ) -> common::Result<()> {
113 match self {
114 AwsAdapter::Sqs(adapter) => adapter.publish(topic, payload).await,
115 AwsAdapter::EventBridge(adapter) => adapter.publish(topic, payload).await,
116 AwsAdapter::Both { sqs, eventbridge: _, default_type: _ } => {
117 sqs.publish(topic, payload).await
118 }
119 }
120 }
121
122 pub async fn publish_with_type(
123 &self,
124 topic: impl Into<String>,
125 payload: Value,
126 adapter_type: Option<&str>,
127 ) -> common::Result<()> {
128 let topic_str = topic.into();
129 match self {
130 AwsAdapter::Sqs(adapter) => {
131 tracing::info!("AwsAdapter::publish_with_type: Using SQS adapter for topic: {}", topic_str);
132 adapter.publish(topic_str, payload).await
133 }
134 AwsAdapter::EventBridge(adapter) => {
135 tracing::info!("AwsAdapter::publish_with_type: Using EventBridge adapter for topic: {}", topic_str);
136 adapter.publish(topic_str, payload).await
137 }
138 AwsAdapter::Both { sqs, eventbridge, default_type } => {
139 let use_type = adapter_type
140 .map(|s| s.to_lowercase())
141 .unwrap_or_else(|| match default_type {
142 AwsAdapterType::Sqs => "sqs".to_string(),
143 AwsAdapterType::EventBridge => "eventbridge".to_string(),
144 });
145
146 tracing::info!(
147 "AwsAdapter::publish_with_type: Both mode - requested: {:?}, using: {}, topic: {}",
148 adapter_type,
149 use_type,
150 topic_str
151 );
152
153 match use_type.as_str() {
154 "sqs" => {
155 tracing::info!("AwsAdapter::publish_with_type: Routing to SQS for topic: {}", topic_str);
156 sqs.publish(topic_str, payload).await
157 }
158 "eventbridge" => {
159 tracing::info!("AwsAdapter::publish_with_type: Routing to EventBridge for topic: {}", topic_str);
160 eventbridge.publish(topic_str, payload).await
161 }
162 _ => {
163 tracing::warn!(
164 "AwsAdapter::publish_with_type: Unknown adapter type '{}', falling back to default for topic: {}",
165 use_type,
166 topic_str
167 );
168 match default_type {
169 AwsAdapterType::Sqs => sqs.publish(topic_str, payload).await,
170 AwsAdapterType::EventBridge => eventbridge.publish(topic_str, payload).await,
171 }
172 }
173 }
174 }
175 }
176 }
177
178 pub async fn subscribe_fn<F, Fut>(&self, topic: impl Into<String>, handler: F) -> common::Result<()>
179 where
180 F: Fn(common::Message) -> Fut + Send + Sync + 'static,
181 Fut: std::future::Future<Output = common::Result<()>> + Send + 'static,
182 {
183 self.subscribe_with_type(topic, handler, None).await
184 }
185
186 pub async fn subscribe_with_type<F, Fut>(
187 &self,
188 topic: impl Into<String>,
189 handler: F,
190 adapter_type: Option<&str>,
191 ) -> common::Result<()>
192 where
193 F: Fn(common::Message) -> Fut + Send + Sync + 'static,
194 Fut: std::future::Future<Output = common::Result<()>> + Send + 'static,
195 {
196 let topic_str = topic.into();
197 match self {
198 AwsAdapter::Sqs(adapter) => adapter.subscribe_fn(topic_str, handler).await,
199 AwsAdapter::EventBridge(adapter) => adapter.subscribe_fn(topic_str, handler).await,
200 AwsAdapter::Both { sqs, eventbridge, default_type } => {
201 let use_type = adapter_type
202 .map(|s| s.to_lowercase())
203 .unwrap_or_else(|| match default_type {
204 AwsAdapterType::Sqs => "sqs".to_string(),
205 AwsAdapterType::EventBridge => "eventbridge".to_string(),
206 });
207
208 tracing::info!(
209 "AwsAdapter::subscribe_with_type: Both mode - requested: {:?}, using: {}, topic: {}",
210 adapter_type,
211 use_type,
212 topic_str
213 );
214
215 match use_type.as_str() {
216 "sqs" => {
217 tracing::info!("AwsAdapter::subscribe_with_type: Using SQS for subscription - topic: {}", topic_str);
218 sqs.subscribe_fn(topic_str, handler).await
219 }
220 "eventbridge" => {
221 tracing::info!("AwsAdapter::subscribe_with_type: Using EventBridge for subscription - topic: {}", topic_str);
222 eventbridge.subscribe_fn(topic_str, handler).await
223 }
224 _ => {
225 tracing::warn!(
226 "AwsAdapter::subscribe_with_type: Unknown adapter type '{}', falling back to default for topic: {}",
227 use_type,
228 topic_str
229 );
230 match default_type {
231 AwsAdapterType::Sqs => sqs.subscribe_fn(topic_str, handler).await,
232 AwsAdapterType::EventBridge => eventbridge.subscribe_fn(topic_str, handler).await,
233 }
234 }
235 }
236 }
237 }
238 }
239
240 pub async fn list_topics(&self) -> Vec<String> {
241 match self {
242 AwsAdapter::Sqs(adapter) => adapter.list_topics().await,
243 AwsAdapter::EventBridge(adapter) => adapter.list_topics().await,
244 AwsAdapter::Both { sqs, eventbridge, default_type: _ } => {
245 let mut topics = sqs.list_topics().await;
246 let mut eb_topics = eventbridge.list_topics().await;
247 topics.append(&mut eb_topics);
248 topics.sort();
249 topics.dedup();
250 topics
251 }
252 }
253 }
254}
255