1use crate::common::{AdapterError, Message, MessageHandler, Result};
2use aws_sdk_eventbridge::Client as EventBridgeClient;
3use aws_sdk_sqs::Client as SqsClient;
4use async_trait::async_trait;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tracing::{debug, error, info, warn};
9
/// Connection settings for the EventBridge adapter.
///
/// Optional fields left as `None` are resolved to fallback values when the
/// adapter is constructed.
#[derive(Clone, Debug)]
pub struct EventBridgeConfig {
    /// AWS region the EventBridge and SQS clients are configured for.
    pub region: String,
    /// Name of the event bus to use; `None` selects the bus named `"default"`.
    pub event_bus_name: Option<String>,
    /// Value placed in the `source` field of published events; `None` falls
    /// back to `"rohas"`.
    pub source: Option<String>,
}
16
17impl Default for EventBridgeConfig {
18 fn default() -> Self {
19 Self {
20 region: "us-east-1".to_string(),
21 event_bus_name: None, source: Some("rohas".to_string()),
23 }
24 }
25}
26
27pub struct EventBridgeAdapter {
28 client: EventBridgeClient,
29 sqs_client: SqsClient,
30 #[allow(dead_code)]
31 config: EventBridgeConfig,
32 event_bus_name: String,
33 source: String,
34 published_topics: Arc<RwLock<HashMap<String, ()>>>,
35 queue_urls: Arc<RwLock<HashMap<String, String>>>, rule_names: Arc<RwLock<HashMap<String, String>>>, }
38
39impl EventBridgeAdapter {
40
41 pub async fn new(config: EventBridgeConfig) -> Result<Self> {
42 let aws_config = aws_config::defaults(aws_config::BehaviorVersion::latest())
43 .region(aws_sdk_eventbridge::config::Region::new(config.region.clone()))
44 .load()
45 .await;
46
47 let client = EventBridgeClient::new(&aws_config);
48 let sqs_client = SqsClient::new(&aws_config);
49
50 let event_bus_name = config.event_bus_name.clone().unwrap_or_else(|| "default".to_string());
51 let source = config.source.clone().unwrap_or_else(|| "rohas".to_string());
52
53 info!(
54 "Initialized EventBridge adapter for region: {}, event_bus: {}, source: {}",
55 config.region, event_bus_name, source
56 );
57
58 Ok(Self {
59 client,
60 sqs_client,
61 config,
62 event_bus_name,
63 source,
64 published_topics: Arc::new(RwLock::new(HashMap::new())),
65 queue_urls: Arc::new(RwLock::new(HashMap::new())),
66 rule_names: Arc::new(RwLock::new(HashMap::new())),
67 })
68 }
69
70 pub async fn publish(
71 &self,
72 topic: impl Into<String>,
73 payload: serde_json::Value,
74 ) -> Result<()> {
75 let topic = topic.into();
76 let message = Message::new(topic.clone(), payload.clone());
77
78 {
79 let mut topics = self.published_topics.write().await;
80 topics.insert(topic.clone(), ());
81 }
82
83 let detail = serde_json::to_string(&message)
84 .map_err(|e| AdapterError::Serialization(e))?;
85
86 info!("Publishing EventBridge event - source: '{}', detail-type: '{}', detail length: {} bytes",
87 self.source, topic, detail.len());
88 debug!("EventBridge event detail content: {}", detail);
89
90 let mut event_builder = aws_sdk_eventbridge::types::PutEventsRequestEntry::builder()
91 .source(&self.source)
92 .detail_type(&topic)
93 .detail(&detail);
94
95 if self.event_bus_name != "default" {
96 event_builder = event_builder.event_bus_name(&self.event_bus_name);
97 }
98
99 let event = event_builder.build();
100
101 let send_result = self
102 .client
103 .put_events()
104 .set_entries(Some(vec![event]))
105 .send()
106 .await;
107
108 match send_result {
109 Ok(response) => {
110 let entries = response.entries();
111 if !entries.is_empty() {
112 if let Some(entry) = entries.first() {
113 if let Some(error_code) = entry.error_code() {
114 error!(
115 "EventBridge publish failed for topic '{}': {} - {}",
116 topic,
117 error_code,
118 entry.error_message().unwrap_or("Unknown error")
119 );
120 return Err(AdapterError::AwsEventBridge(format!(
121 "Failed to publish event: {} - {}",
122 error_code,
123 entry.error_message().unwrap_or("Unknown error")
124 )));
125 }
126 }
127 }
128 info!(
129 "Published message to EventBridge topic: {} (event_bus: {}, source: {})",
130 topic, self.event_bus_name, self.source
131 );
132 Ok(())
133 }
134 Err(e) => {
135 error!("Failed to send message to EventBridge '{}': {}", topic, e);
136 Err(AdapterError::AwsEventBridge(format!(
137 "Failed to send event: {}",
138 e
139 )))
140 }
141 }
142 }
143
144 async fn get_or_create_queue(&self, topic: &str) -> Result<String> {
145 {
146 let queue_urls = self.queue_urls.read().await;
147 if let Some(url) = queue_urls.get(topic) {
148 return Ok(url.clone());
149 }
150 }
151
152 let queue_name = format!("rohas-eb-{}", topic)
153 .chars()
154 .map(|c| {
155 if c.is_alphanumeric() || c == '-' || c == '_' {
156 c
157 } else {
158 '-'
159 }
160 })
161 .collect::<String>();
162
163 info!("Checking if SQS queue '{}' exists...", queue_name);
164 let get_queue_result = self
165 .sqs_client
166 .get_queue_url()
167 .queue_name(&queue_name)
168 .send()
169 .await;
170
171 let queue_url = match get_queue_result {
172 Ok(response) => {
173 if let Some(url) = response.queue_url() {
174 info!("Found existing SQS queue for EventBridge topic '{}': {}", topic, url);
175 url.to_string()
176 } else {
177 error!("Queue URL not returned for '{}'", queue_name);
178 return Err(AdapterError::AwsEventBridge(format!(
179 "Queue URL not returned for '{}'",
180 queue_name
181 )));
182 }
183 }
184 Err(e) => {
185 warn!("SQS queue '{}' not found (error: {}), creating new queue...", queue_name, e);
186 info!("Creating SQS queue for EventBridge topic '{}': {}", topic, queue_name);
187 let create_result = self
188 .sqs_client
189 .create_queue()
190 .queue_name(&queue_name)
191 .send()
192 .await
193 .map_err(|e| {
194 error!("Failed to create SQS queue '{}': {}", queue_name, e);
195 AdapterError::AwsEventBridge(format!("Failed to create queue '{}': {}", queue_name, e))
196 })?;
197
198 if let Some(url) = create_result.queue_url() {
199 info!("Created SQS queue for EventBridge topic '{}': {}", topic, url);
200 url.to_string()
201 } else {
202 error!("Queue created but no URL returned for '{}'", queue_name);
203 return Err(AdapterError::AwsEventBridge(format!(
204 "Queue created but no URL returned for '{}'",
205 queue_name
206 )));
207 }
208 }
209 };
210
211 {
212 let mut queue_urls = self.queue_urls.write().await;
213 queue_urls.insert(topic.to_string(), queue_url.clone());
214 }
215
216 Ok(queue_url)
217 }
218
219 async fn get_or_create_rule(&self, topic: &str, queue_arn: &str) -> Result<String> {
220 {
221 let rule_names = self.rule_names.read().await;
222 if let Some(rule_name) = rule_names.get(topic) {
223 return Ok(rule_name.clone());
224 }
225 }
226
227 let rule_name = format!("rohas-rule-{}", topic)
228 .chars()
229 .map(|c| {
230 if c.is_alphanumeric() || c == '-' || c == '_' {
231 c
232 } else {
233 '-'
234 }
235 })
236 .collect::<String>();
237
238 let event_pattern = serde_json::json!({
239 "source": [self.source],
240 "detail-type": [topic]
241 });
242
243 info!("EventBridge rule pattern for topic '{}': {}", topic, event_pattern.to_string());
244 info!("Expected event format: source='{}', detail-type='{}'", self.source, topic);
245
246 let get_rule_result = self
247 .client
248 .describe_rule()
249 .name(&rule_name)
250 .set_event_bus_name(if self.event_bus_name != "default" {
251 Some(self.event_bus_name.clone())
252 } else {
253 None
254 })
255 .send()
256 .await;
257
258 let target_id = format!("sqs-target-{}", topic);
259 let target = aws_sdk_eventbridge::types::Target::builder()
260 .id(&target_id)
261 .arn(queue_arn)
262 .build()
263 .map_err(|e| {
264 AdapterError::AwsEventBridge(format!("Failed to build target: {}", e))
265 })?;
266
267 match get_rule_result {
268 Ok(rule_desc) => {
269 info!("Found existing EventBridge rule for topic '{}': {}", topic, rule_name);
270 if let Some(state) = rule_desc.state() {
271 match state {
272 aws_sdk_eventbridge::types::RuleState::Enabled => {
273 info!("EventBridge rule '{}' is ENABLED", rule_name);
274 }
275 aws_sdk_eventbridge::types::RuleState::Disabled => {
276 warn!("EventBridge rule '{}' is DISABLED - enabling it now...", rule_name);
277 match self
278 .client
279 .enable_rule()
280 .name(&rule_name)
281 .set_event_bus_name(if self.event_bus_name != "default" {
282 Some(self.event_bus_name.clone())
283 } else {
284 None
285 })
286 .send()
287 .await
288 {
289 Ok(_) => {
290 info!("EventBridge rule '{}' has been enabled", rule_name);
291 }
292 Err(e) => {
293 error!("Failed to enable EventBridge rule '{}': {}", rule_name, e);
294 return Err(AdapterError::AwsEventBridge(format!(
295 "Failed to enable rule '{}': {}",
296 rule_name, e
297 )));
298 }
299 }
300 }
301 _ => {
302 warn!("EventBridge rule '{}' has unknown state: {:?}", rule_name, state);
303 }
304 }
305 }
306 info!("Ensuring SQS queue target is added to existing rule '{}'", rule_name);
307 }
308 Err(e) => {
309 warn!("EventBridge rule '{}' not found (error: {}), creating new rule...", rule_name, e);
310 info!("Creating EventBridge rule for topic '{}': {}", topic, rule_name);
311
312 let put_rule_result = self
313 .client
314 .put_rule()
315 .name(&rule_name)
316 .event_pattern(event_pattern.to_string())
317 .state(aws_sdk_eventbridge::types::RuleState::Enabled)
318 .set_event_bus_name(if self.event_bus_name != "default" {
319 Some(self.event_bus_name.clone())
320 } else {
321 None
322 })
323 .send()
324 .await
325 .map_err(|e| {
326 error!("Failed to create EventBridge rule '{}': {}", rule_name, e);
327 AdapterError::AwsEventBridge(format!("Failed to create rule '{}': {}", rule_name, e))
328 })?;
329
330 info!("Created EventBridge rule '{}' (arn: {:?})", rule_name, put_rule_result.rule_arn());
331 }
332 }
333
334 info!("Adding SQS queue as target to EventBridge rule '{}'", rule_name);
335 info!("Target details: ID='{}', ARN='{}'", target_id, queue_arn);
336 let put_targets_result = self
337 .client
338 .put_targets()
339 .rule(&rule_name)
340 .set_targets(Some(vec![target]))
341 .set_event_bus_name(if self.event_bus_name != "default" {
342 Some(self.event_bus_name.clone())
343 } else {
344 None
345 })
346 .send()
347 .await
348 .map_err(|e| {
349 error!("Failed to add target to EventBridge rule '{}': {}", rule_name, e);
350 AdapterError::AwsEventBridge(format!("Failed to add target to rule '{}': {}", rule_name, e))
351 })?;
352
353 let failed_entries = put_targets_result.failed_entries();
354 if !failed_entries.is_empty() {
355 error!("Failed to add target to EventBridge rule '{}': {:?}", rule_name, failed_entries);
356 for entry in failed_entries {
357 error!(" - Error Code: {}, Error Message: {}",
358 entry.error_code().unwrap_or("unknown"),
359 entry.error_message().unwrap_or("unknown"));
360 }
361 return Err(AdapterError::AwsEventBridge(format!(
362 "Failed to add target to rule '{}': {:?}",
363 rule_name, failed_entries
364 )));
365 }
366
367 info!("Successfully added SQS queue as target to EventBridge rule '{}'", rule_name);
368 info!("Target configuration: Queue ARN='{}', Target ID='{}'", queue_arn, target_id);
369
370 let list_targets_result = self
371 .client
372 .list_targets_by_rule()
373 .rule(&rule_name)
374 .set_event_bus_name(if self.event_bus_name != "default" {
375 Some(self.event_bus_name.clone())
376 } else {
377 None
378 })
379 .send()
380 .await;
381
382 if let Ok(targets_response) = list_targets_result {
383 let targets = targets_response.targets();
384 if targets.is_empty() {
385 error!("CRITICAL: EventBridge rule '{}' has NO TARGETS configured!", rule_name);
386 return Err(AdapterError::AwsEventBridge(format!(
387 "Rule '{}' has no targets configured",
388 rule_name
389 )));
390 } else {
391 info!("Verified: EventBridge rule '{}' has {} target(s) configured", rule_name, targets.len());
392 for target in targets {
393 let target_arn = target.arn();
394 if target_arn == queue_arn {
395 info!("Target verified: SQS queue ARN '{}' is configured as target", queue_arn);
396 } else {
397 warn!("Found target with different ARN: {} (expected: {})", target_arn, queue_arn);
398 }
399 }
400 }
401 } else {
402 warn!("Could not list targets for rule '{}'", rule_name);
403 }
404
405 let verify_result = self
406 .client
407 .describe_rule()
408 .name(&rule_name)
409 .set_event_bus_name(if self.event_bus_name != "default" {
410 Some(self.event_bus_name.clone())
411 } else {
412 None
413 })
414 .send()
415 .await;
416
417 if let Ok(rule_desc) = verify_result {
418 if let Some(state) = rule_desc.state() {
419 match state {
420 aws_sdk_eventbridge::types::RuleState::Enabled => {
421 info!("Verified: EventBridge rule '{}' is ENABLED and ready", rule_name);
422 }
423 aws_sdk_eventbridge::types::RuleState::Disabled => {
424 error!("CRITICAL: EventBridge rule '{}' is still DISABLED after setup!", rule_name);
425 return Err(AdapterError::AwsEventBridge(format!(
426 "Rule '{}' is disabled and could not be enabled",
427 rule_name
428 )));
429 }
430 _ => {
431 warn!("EventBridge rule '{}' has unknown state: {:?}", rule_name, state);
432 }
433 }
434 }
435 } else {
436 warn!("Could not verify rule state after setup");
437 }
438
439 {
440 let mut rule_names = self.rule_names.write().await;
441 rule_names.insert(topic.to_string(), rule_name.clone());
442 }
443
444 Ok(rule_name)
445 }
446
447 async fn get_queue_arn(&self, queue_url: &str) -> Result<String> {
448 let queue_name = queue_url
449 .split('/')
450 .last()
451 .ok_or_else(|| {
452 error!("Invalid queue URL format: {}", queue_url);
453 AdapterError::AwsEventBridge(format!("Invalid queue URL: {}", queue_url))
454 })?;
455
456 info!("Retrieving ARN for SQS queue '{}'...", queue_name);
457
458 let attributes_result = self
459 .sqs_client
460 .get_queue_attributes()
461 .queue_url(queue_url)
462 .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn)
463 .send()
464 .await
465 .map_err(|e| {
466 error!("Failed to get queue attributes for '{}': {}", queue_name, e);
467 AdapterError::AwsEventBridge(format!("Failed to get queue attributes for '{}': {}", queue_name, e))
468 })?;
469
470 if let Some(attributes) = attributes_result.attributes() {
471 if let Some(arn) = attributes.get(&aws_sdk_sqs::types::QueueAttributeName::QueueArn) {
472 info!("Retrieved ARN for queue '{}': {}", queue_name, arn);
473 return Ok(arn.clone());
474 }
475 }
476
477 error!("Queue ARN not found in attributes for queue '{}'", queue_name);
478 Err(AdapterError::AwsEventBridge(format!(
479 "Queue ARN not found for queue '{}'",
480 queue_name
481 )))
482 }
483
484 pub async fn subscribe_fn<F, Fut>(&self, topic: impl Into<String>, handler: F) -> Result<()>
485 where
486 F: Fn(Message) -> Fut + Send + Sync + 'static,
487 Fut: std::future::Future<Output = Result<()>> + Send + 'static,
488 {
489 let topic = topic.into();
490 info!("=== Setting up EventBridge subscription for topic: {} ===", topic);
491
492 info!("Step 1: Creating/getting SQS queue for topic '{}'...", topic);
493 let queue_url = match self.get_or_create_queue(&topic).await {
494 Ok(url) => {
495 info!("SQS queue ready for EventBridge topic '{}': {}", topic, url);
496 url
497 }
498 Err(e) => {
499 error!("Failed to create/get SQS queue for topic '{}': {}", topic, e);
500 return Err(e);
501 }
502 };
503
504 info!("Step 2: Getting SQS queue ARN for topic '{}'...", topic);
505 let queue_arn = match self.get_queue_arn(&queue_url).await {
506 Ok(arn) => {
507 info!("SQS queue ARN for topic '{}': {}", topic, arn);
508 arn
509 }
510 Err(e) => {
511 error!("Failed to get SQS queue ARN for topic '{}': {}", topic, e);
512 return Err(e);
513 }
514 };
515
516 info!("Step 3: Creating/getting EventBridge rule for topic '{}'...", topic);
517 let rule_name = match self.get_or_create_rule(&topic, &queue_arn).await {
518 Ok(name) => {
519 info!("EventBridge rule ready for topic '{}': {}", topic, name);
520 name
521 }
522 Err(e) => {
523 error!("Failed to create/get EventBridge rule for topic '{}': {}", topic, e);
524 return Err(e);
525 }
526 };
527
528 let account_id = queue_arn
529 .split(':')
530 .nth(4)
531 .unwrap_or("*");
532
533 let rule_arn = if self.event_bus_name == "default" {
537 format!(
538 "arn:aws:events:{}:{}:rule/{}",
539 self.config.region,
540 account_id,
541 rule_name
542 )
543 } else {
544 format!(
545 "arn:aws:events:{}:{}:rule/{}/{}",
546 self.config.region,
547 account_id,
548 self.event_bus_name,
549 rule_name
550 )
551 };
552
553 info!("Step 4: Setting up SQS queue policy for EventBridge access (rule ARN: {})...", rule_arn);
554
555 let policy = serde_json::json!({
556 "Version": "2012-10-17",
557 "Statement": [{
558 "Effect": "Allow",
559 "Principal": {
560 "Service": "events.amazonaws.com"
561 },
562 "Action": "sqs:SendMessage",
563 "Resource": queue_arn,
564 "Condition": {
565 "ArnEquals": {
566 "aws:SourceArn": rule_arn
567 }
568 }
569 }]
570 });
571
572 match self
573 .sqs_client
574 .set_queue_attributes()
575 .queue_url(&queue_url)
576 .attributes(
577 aws_sdk_sqs::types::QueueAttributeName::Policy,
578 policy.to_string(),
579 )
580 .send()
581 .await
582 {
583 Ok(_) => {
584 info!("Successfully set SQS queue policy for EventBridge access");
585 }
586 Err(e) => {
587 warn!("Failed to set SQS queue policy (this may be okay if policy already exists): {}", e);
588 }
589 }
590
591 info!("Step 5: Starting SQS queue polling for topic '{}'...", topic);
592 let sqs_client = self.sqs_client.clone();
593 let topic_clone = topic.clone();
594 let queue_url_clone = queue_url.clone();
595 let queue_arn_clone = queue_arn.clone();
596 let rule_name_clone = rule_name.clone();
597
598 struct ClosureHandler<F, Fut>
599 where
600 F: Fn(Message) -> Fut + Send + Sync,
601 Fut: std::future::Future<Output = Result<()>> + Send,
602 {
603 func: F,
604 }
605
606 #[async_trait]
607 impl<F, Fut> MessageHandler for ClosureHandler<F, Fut>
608 where
609 F: Fn(Message) -> Fut + Send + Sync,
610 Fut: std::future::Future<Output = Result<()>> + Send,
611 {
612 async fn handle(&self, message: Message) -> Result<()> {
613 (self.func)(message).await
614 }
615 }
616
617 let handler = Arc::new(ClosureHandler { func: handler });
618
619 tokio::spawn(async move {
620 info!("EventBridge subscription polling loop started for topic '{}' (queue: {})", topic_clone, queue_url);
621 let mut poll_count = 0u64;
622 loop {
623 poll_count += 1;
624 if poll_count % 10 == 0 {
625 info!("EventBridge polling loop still active for topic '{}' (poll #{}), queue: {}", topic_clone, poll_count, queue_url);
626 }
627 if poll_count == 1 || poll_count % 5 == 0 {
628 info!("Polling SQS queue for EventBridge topic '{}' (poll #{})...", topic_clone, poll_count);
629 } else {
630 debug!("Polling SQS queue for EventBridge topic '{}' (poll #{})...", topic_clone, poll_count);
631 }
632 let receive_result = sqs_client
633 .receive_message()
634 .queue_url(&queue_url)
635 .max_number_of_messages(10)
636 .wait_time_seconds(20)
637 .send()
638 .await;
639
640 match receive_result {
641 Ok(response) => {
642 let messages = response.messages();
643 if !messages.is_empty() {
644 info!("Received {} message(s) from EventBridge queue for topic '{}'", messages.len(), topic_clone);
645 for sqs_message in messages {
646 if let Some(body) = sqs_message.body() {
647 info!("Raw SQS message body for topic '{}': {}", topic_clone, body);
648
649 debug!("Attempting to parse EventBridge message for topic '{}'", topic_clone);
650 let message_result = {
651 debug!("Trying to parse as array of events...");
652 if let Ok(events_array) = serde_json::from_str::<Vec<serde_json::Value>>(body) {
653 debug!("Successfully parsed as array with {} event(s)", events_array.len());
654 if let Some(event) = events_array.first() {
655 debug!("First event structure: {:?}", event);
656 if let Some(detail_str) = event.get("detail").and_then(|d| d.as_str()) {
657 debug!("Found 'detail' field as string (length: {}): {}", detail_str.len(), detail_str);
658 match serde_json::from_str::<Message>(detail_str) {
659 Ok(msg) => {
660 debug!("Successfully parsed Message from detail string");
661 Some(msg)
662 }
663 Err(e) => {
664 debug!("Failed to parse Message from detail string: {}", e);
665 None
666 }
667 }
668 } else if let Some(detail_obj) = event.get("detail") {
669 debug!("Found 'detail' field as object: {:?}", detail_obj);
670 match serde_json::from_value::<Message>(detail_obj.clone()) {
671 Ok(msg) => {
672 debug!("Successfully parsed Message from detail object");
673 Some(msg)
674 }
675 Err(e) => {
676 debug!("Failed to parse Message from detail object: {}", e);
677 None
678 }
679 }
680 } else {
681 debug!("No 'detail' field found in event object");
682 None
683 }
684 } else {
685 debug!("Array is empty");
686 None
687 }
688 } else {
689 debug!("Not an array, trying as single event object...");
690 None
691 }
692 }.or_else(|| {
693 debug!("Trying to parse as single event object...");
694 if let Ok(event_obj) = serde_json::from_str::<serde_json::Value>(body) {
695 debug!("Successfully parsed as event object");
696 if let Some(detail_str) = event_obj.get("detail").and_then(|d| d.as_str()) {
697 debug!("Found 'detail' field as string (length: {}): {}", detail_str.len(), detail_str);
698 match serde_json::from_str::<Message>(detail_str) {
699 Ok(msg) => {
700 debug!("Successfully parsed Message from detail string");
701 Some(msg)
702 }
703 Err(e) => {
704 debug!("Failed to parse Message from detail string: {}", e);
705 None
706 }
707 }
708 } else if let Some(detail_obj) = event_obj.get("detail") {
709 debug!("Found 'detail' field as object: {:?}", detail_obj);
710 match serde_json::from_value::<Message>(detail_obj.clone()) {
711 Ok(msg) => {
712 debug!("Successfully parsed Message from detail object");
713 Some(msg)
714 }
715 Err(e) => {
716 debug!("Failed to parse Message from detail object: {}", e);
717 None
718 }
719 }
720 } else {
721 debug!("No 'detail' field found in event object");
722 None
723 }
724 } else {
725 debug!("Not a valid JSON object, trying direct Message parse...");
726 None
727 }
728 }).or_else(|| {
729 debug!("Trying to parse body directly as Message...");
730 match serde_json::from_str::<Message>(body) {
731 Ok(msg) => {
732 debug!("Successfully parsed body directly as Message");
733 Some(msg)
734 }
735 Err(e) => {
736 debug!("Failed to parse body directly as Message: {}", e);
737 None
738 }
739 }
740 });
741
742 let message_result = message_result.ok_or_else(|| {
743 let last_error = serde_json::from_str::<Message>(body)
744 .map_err(|e| e)
745 .unwrap_err();
746 debug!("All parsing attempts failed. Last error: {}", last_error);
747 last_error
748 });
749
750 match message_result {
751 Ok(message) => {
752 info!("Successfully parsed EventBridge message for topic '{}'", topic_clone);
753 info!("Message topic: {}, payload: {:?}", message.topic, message.payload);
754 info!("Calling handler for EventBridge message...");
755 if let Err(e) = handler.handle(message).await {
756 error!("Handler error for EventBridge topic '{}': {}", topic_clone, e);
757 } else {
758 info!("Handler completed successfully for EventBridge topic '{}'", topic_clone);
759 }
760
761 if let Some(receipt_handle) = sqs_message.receipt_handle() {
762 if let Err(e) = sqs_client
763 .delete_message()
764 .queue_url(&queue_url)
765 .receipt_handle(receipt_handle)
766 .send()
767 .await
768 {
769 warn!(
770 "Failed to delete message from EventBridge queue '{}': {}",
771 queue_url, e
772 );
773 }
774 }
775 }
776 Err(e) => {
777 error!(
778 "Failed to deserialize EventBridge message for topic '{}': {}. Body: {}",
779 topic_clone, e, body
780 );
781 if let Some(receipt_handle) = sqs_message.receipt_handle() {
782 let _ = sqs_client
783 .delete_message()
784 .queue_url(&queue_url)
785 .receipt_handle(receipt_handle)
786 .send()
787 .await;
788 }
789 }
790 }
791 }
792 }
793 } else {
794 debug!("No messages received from EventBridge queue for topic '{}' (this is normal, continuing to poll...)", topic_clone);
795 }
796 }
797 Err(e) => {
798 error!(
799 "Error receiving messages from EventBridge queue '{}' for topic '{}': {}. Retrying in 5 seconds...",
800 queue_url, topic_clone, e
801 );
802 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
803 }
804 }
805 }
806 });
807
808 info!("=== EventBridge subscription set up successfully for topic: {} ===", topic);
809 info!("Summary:");
810 info!(" - SQS Queue URL: {}", queue_url_clone);
811 info!(" - SQS Queue ARN: {}", queue_arn_clone);
812 info!(" - EventBridge Rule: {}", rule_name_clone);
813 info!(" - Event Pattern: source='{}', detail-type='{}'", self.source, topic);
814 info!(" - Event Bus: {}", self.event_bus_name);
815 info!(" - Target: SQS queue '{}' (ARN: {})", queue_url_clone, queue_arn_clone);
816 info!(" - Polling: Active (long polling enabled, 20s wait time)");
817 info!(" - Next: Events matching the pattern will be routed to the SQS queue");
818 let rule_arn_final = if self.event_bus_name == "default" {
819 format!(
820 "arn:aws:events:{}:{}:rule/{}",
821 self.config.region,
822 queue_arn_clone.split(':').nth(4).unwrap_or("unknown"),
823 rule_name_clone
824 )
825 } else {
826 format!(
827 "arn:aws:events:{}:{}:rule/{}/{}",
828 self.config.region,
829 queue_arn_clone.split(':').nth(4).unwrap_or("unknown"),
830 self.event_bus_name,
831 rule_name_clone
832 )
833 };
834 info!(" - Rule ARN: {}", rule_arn_final);
835 info!(" - To verify: Check AWS EventBridge console for rule '{}' and ensure it has the SQS queue as a target", rule_name_clone);
836 Ok(())
837 }
838
839 pub async fn list_topics(&self) -> Vec<String> {
840 let topics = self.published_topics.read().await;
841 topics.keys().cloned().collect()
842 }
843}
844