Skip to main content

adapter_aws/
eventbridge.rs

1use crate::common::{AdapterError, Message, MessageHandler, Result};
2use aws_sdk_eventbridge::Client as EventBridgeClient;
3use aws_sdk_sqs::Client as SqsClient;
4use async_trait::async_trait;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tracing::{debug, error, info, warn};
9
10#[derive(Debug, Clone)]
11pub struct EventBridgeConfig {
12    pub region: String,
13    pub event_bus_name: Option<String>,
14    pub source: Option<String>,
15}
16
17impl Default for EventBridgeConfig {
18    fn default() -> Self {
19        Self {
20            region: "us-east-1".to_string(),
21            event_bus_name: None, // Use default event bus
22            source: Some("rohas".to_string()),
23        }
24    }
25}
26
27pub struct EventBridgeAdapter {
28    client: EventBridgeClient,
29    sqs_client: SqsClient,
30    #[allow(dead_code)]
31    config: EventBridgeConfig,
32    event_bus_name: String,
33    source: String,
34    published_topics: Arc<RwLock<HashMap<String, ()>>>,
35    queue_urls: Arc<RwLock<HashMap<String, String>>>, // topic -> queue_url
36    rule_names: Arc<RwLock<HashMap<String, String>>>, // topic -> rule_name
37}
38
39impl EventBridgeAdapter {
40
41    pub async fn new(config: EventBridgeConfig) -> Result<Self> {
42        let aws_config = aws_config::defaults(aws_config::BehaviorVersion::latest())
43            .region(aws_sdk_eventbridge::config::Region::new(config.region.clone()))
44            .load()
45            .await;
46
47        let client = EventBridgeClient::new(&aws_config);
48        let sqs_client = SqsClient::new(&aws_config);
49
50        let event_bus_name = config.event_bus_name.clone().unwrap_or_else(|| "default".to_string());
51        let source = config.source.clone().unwrap_or_else(|| "rohas".to_string());
52
53        info!(
54            "Initialized EventBridge adapter for region: {}, event_bus: {}, source: {}",
55            config.region, event_bus_name, source
56        );
57
58        Ok(Self {
59            client,
60            sqs_client,
61            config,
62            event_bus_name,
63            source,
64            published_topics: Arc::new(RwLock::new(HashMap::new())),
65            queue_urls: Arc::new(RwLock::new(HashMap::new())),
66            rule_names: Arc::new(RwLock::new(HashMap::new())),
67        })
68    }
69
70    pub async fn publish(
71        &self,
72        topic: impl Into<String>,
73        payload: serde_json::Value,
74    ) -> Result<()> {
75        let topic = topic.into();
76        let message = Message::new(topic.clone(), payload.clone());
77
78        {
79            let mut topics = self.published_topics.write().await;
80            topics.insert(topic.clone(), ());
81        }
82
83        let detail = serde_json::to_string(&message)
84            .map_err(|e| AdapterError::Serialization(e))?;
85
86        info!("Publishing EventBridge event - source: '{}', detail-type: '{}', detail length: {} bytes", 
87            self.source, topic, detail.len());
88        debug!("EventBridge event detail content: {}", detail);
89
90        let mut event_builder = aws_sdk_eventbridge::types::PutEventsRequestEntry::builder()
91            .source(&self.source)
92            .detail_type(&topic)
93            .detail(&detail);
94
95        if self.event_bus_name != "default" {
96            event_builder = event_builder.event_bus_name(&self.event_bus_name);
97        }
98
99        let event = event_builder.build();
100
101        let send_result = self
102            .client
103            .put_events()
104            .set_entries(Some(vec![event]))
105            .send()
106            .await;
107
108        match send_result {
109            Ok(response) => {
110                let entries = response.entries();
111                if !entries.is_empty() {
112                    if let Some(entry) = entries.first() {
113                        if let Some(error_code) = entry.error_code() {
114                            error!(
115                                "EventBridge publish failed for topic '{}': {} - {}",
116                                topic,
117                                error_code,
118                                entry.error_message().unwrap_or("Unknown error")
119                            );
120                            return Err(AdapterError::AwsEventBridge(format!(
121                                "Failed to publish event: {} - {}",
122                                error_code,
123                                entry.error_message().unwrap_or("Unknown error")
124                            )));
125                        }
126                    }
127                }
128                info!(
129                    "Published message to EventBridge topic: {} (event_bus: {}, source: {})",
130                    topic, self.event_bus_name, self.source
131                );
132                Ok(())
133            }
134            Err(e) => {
135                error!("Failed to send message to EventBridge '{}': {}", topic, e);
136                Err(AdapterError::AwsEventBridge(format!(
137                    "Failed to send event: {}",
138                    e
139                )))
140            }
141        }
142    }
143
144    async fn get_or_create_queue(&self, topic: &str) -> Result<String> {
145        {
146            let queue_urls = self.queue_urls.read().await;
147            if let Some(url) = queue_urls.get(topic) {
148                return Ok(url.clone());
149            }
150        }
151
152        let queue_name = format!("rohas-eb-{}", topic)
153            .chars()
154            .map(|c| {
155                if c.is_alphanumeric() || c == '-' || c == '_' {
156                    c
157                } else {
158                    '-'
159                }
160            })
161            .collect::<String>();
162
163        info!("Checking if SQS queue '{}' exists...", queue_name);
164        let get_queue_result = self
165            .sqs_client
166            .get_queue_url()
167            .queue_name(&queue_name)
168            .send()
169            .await;
170
171        let queue_url = match get_queue_result {
172            Ok(response) => {
173                if let Some(url) = response.queue_url() {
174                    info!("Found existing SQS queue for EventBridge topic '{}': {}", topic, url);
175                    url.to_string()
176                } else {
177                    error!("Queue URL not returned for '{}'", queue_name);
178                    return Err(AdapterError::AwsEventBridge(format!(
179                        "Queue URL not returned for '{}'",
180                        queue_name
181                    )));
182                }
183            }
184            Err(e) => {
185                warn!("SQS queue '{}' not found (error: {}), creating new queue...", queue_name, e);
186                info!("Creating SQS queue for EventBridge topic '{}': {}", topic, queue_name);
187                let create_result = self
188                    .sqs_client
189                    .create_queue()
190                    .queue_name(&queue_name)
191                    .send()
192                    .await
193                    .map_err(|e| {
194                        error!("Failed to create SQS queue '{}': {}", queue_name, e);
195                        AdapterError::AwsEventBridge(format!("Failed to create queue '{}': {}", queue_name, e))
196                    })?;
197
198                if let Some(url) = create_result.queue_url() {
199                    info!("Created SQS queue for EventBridge topic '{}': {}", topic, url);
200                    url.to_string()
201                } else {
202                    error!("Queue created but no URL returned for '{}'", queue_name);
203                    return Err(AdapterError::AwsEventBridge(format!(
204                        "Queue created but no URL returned for '{}'",
205                        queue_name
206                    )));
207                }
208            }
209        };
210
211        {
212            let mut queue_urls = self.queue_urls.write().await;
213            queue_urls.insert(topic.to_string(), queue_url.clone());
214        }
215
216        Ok(queue_url)
217    }
218
219    async fn get_or_create_rule(&self, topic: &str, queue_arn: &str) -> Result<String> {
220        {
221            let rule_names = self.rule_names.read().await;
222            if let Some(rule_name) = rule_names.get(topic) {
223                return Ok(rule_name.clone());
224            }
225        }
226
227        let rule_name = format!("rohas-rule-{}", topic)
228            .chars()
229            .map(|c| {
230                if c.is_alphanumeric() || c == '-' || c == '_' {
231                    c
232                } else {
233                    '-'
234                }
235            })
236            .collect::<String>();
237
238        let event_pattern = serde_json::json!({
239            "source": [self.source],
240            "detail-type": [topic]
241        });
242        
243        info!("EventBridge rule pattern for topic '{}': {}", topic, event_pattern.to_string());
244        info!("Expected event format: source='{}', detail-type='{}'", self.source, topic);
245
246        let get_rule_result = self
247            .client
248            .describe_rule()
249            .name(&rule_name)
250            .set_event_bus_name(if self.event_bus_name != "default" {
251                Some(self.event_bus_name.clone())
252            } else {
253                None
254            })
255            .send()
256            .await;
257
258        let target_id = format!("sqs-target-{}", topic);
259        let target = aws_sdk_eventbridge::types::Target::builder()
260            .id(&target_id)
261            .arn(queue_arn)
262            .build()
263            .map_err(|e| {
264                AdapterError::AwsEventBridge(format!("Failed to build target: {}", e))
265            })?;
266
267        match get_rule_result {
268            Ok(rule_desc) => {
269                info!("Found existing EventBridge rule for topic '{}': {}", topic, rule_name);
270                if let Some(state) = rule_desc.state() {
271                    match state {
272                        aws_sdk_eventbridge::types::RuleState::Enabled => {
273                            info!("EventBridge rule '{}' is ENABLED", rule_name);
274                        }
275                        aws_sdk_eventbridge::types::RuleState::Disabled => {
276                            warn!("EventBridge rule '{}' is DISABLED - enabling it now...", rule_name);
277                            match self
278                                .client
279                                .enable_rule()
280                                .name(&rule_name)
281                                .set_event_bus_name(if self.event_bus_name != "default" {
282                                    Some(self.event_bus_name.clone())
283                                } else {
284                                    None
285                                })
286                                .send()
287                                .await
288                            {
289                                Ok(_) => {
290                                    info!("EventBridge rule '{}' has been enabled", rule_name);
291                                }
292                                Err(e) => {
293                                    error!("Failed to enable EventBridge rule '{}': {}", rule_name, e);
294                                    return Err(AdapterError::AwsEventBridge(format!(
295                                        "Failed to enable rule '{}': {}",
296                                        rule_name, e
297                                    )));
298                                }
299                            }
300                        }
301                        _ => {
302                            warn!("EventBridge rule '{}' has unknown state: {:?}", rule_name, state);
303                        }
304                    }
305                }
306                info!("Ensuring SQS queue target is added to existing rule '{}'", rule_name);
307            }
308            Err(e) => {
309                warn!("EventBridge rule '{}' not found (error: {}), creating new rule...", rule_name, e);
310                info!("Creating EventBridge rule for topic '{}': {}", topic, rule_name);
311                
312                let put_rule_result = self
313                    .client
314                    .put_rule()
315                    .name(&rule_name)
316                    .event_pattern(event_pattern.to_string())
317                    .state(aws_sdk_eventbridge::types::RuleState::Enabled)
318                    .set_event_bus_name(if self.event_bus_name != "default" {
319                        Some(self.event_bus_name.clone())
320                    } else {
321                        None
322                    })
323                    .send()
324                    .await
325                    .map_err(|e| {
326                        error!("Failed to create EventBridge rule '{}': {}", rule_name, e);
327                        AdapterError::AwsEventBridge(format!("Failed to create rule '{}': {}", rule_name, e))
328                    })?;
329
330                info!("Created EventBridge rule '{}' (arn: {:?})", rule_name, put_rule_result.rule_arn());
331            }
332        }
333
334        info!("Adding SQS queue as target to EventBridge rule '{}'", rule_name);
335        info!("Target details: ID='{}', ARN='{}'", target_id, queue_arn);
336        let put_targets_result = self
337            .client
338            .put_targets()
339            .rule(&rule_name)
340            .set_targets(Some(vec![target]))
341            .set_event_bus_name(if self.event_bus_name != "default" {
342                Some(self.event_bus_name.clone())
343            } else {
344                None
345            })
346            .send()
347            .await
348            .map_err(|e| {
349                error!("Failed to add target to EventBridge rule '{}': {}", rule_name, e);
350                AdapterError::AwsEventBridge(format!("Failed to add target to rule '{}': {}", rule_name, e))
351            })?;
352
353        let failed_entries = put_targets_result.failed_entries();
354        if !failed_entries.is_empty() {
355            error!("Failed to add target to EventBridge rule '{}': {:?}", rule_name, failed_entries);
356            for entry in failed_entries {
357                error!("  - Error Code: {}, Error Message: {}", 
358                    entry.error_code().unwrap_or("unknown"),
359                    entry.error_message().unwrap_or("unknown"));
360            }
361            return Err(AdapterError::AwsEventBridge(format!(
362                "Failed to add target to rule '{}': {:?}",
363                rule_name, failed_entries
364            )));
365        }
366
367        info!("Successfully added SQS queue as target to EventBridge rule '{}'", rule_name);
368        info!("Target configuration: Queue ARN='{}', Target ID='{}'", queue_arn, target_id);
369
370        let list_targets_result = self
371            .client
372            .list_targets_by_rule()
373            .rule(&rule_name)
374            .set_event_bus_name(if self.event_bus_name != "default" {
375                Some(self.event_bus_name.clone())
376            } else {
377                None
378            })
379            .send()
380            .await;
381
382        if let Ok(targets_response) = list_targets_result {
383            let targets = targets_response.targets();
384            if targets.is_empty() {
385                error!("CRITICAL: EventBridge rule '{}' has NO TARGETS configured!", rule_name);
386                return Err(AdapterError::AwsEventBridge(format!(
387                    "Rule '{}' has no targets configured",
388                    rule_name
389                )));
390            } else {
391                info!("Verified: EventBridge rule '{}' has {} target(s) configured", rule_name, targets.len());
392                for target in targets {
393                    let target_arn = target.arn();
394                    if target_arn == queue_arn {
395                        info!("Target verified: SQS queue ARN '{}' is configured as target", queue_arn);
396                    } else {
397                        warn!("Found target with different ARN: {} (expected: {})", target_arn, queue_arn);
398                    }
399                }
400            }
401        } else {
402            warn!("Could not list targets for rule '{}'", rule_name);
403        }
404
405        let verify_result = self
406            .client
407            .describe_rule()
408            .name(&rule_name)
409            .set_event_bus_name(if self.event_bus_name != "default" {
410                Some(self.event_bus_name.clone())
411            } else {
412                None
413            })
414            .send()
415            .await;
416
417        if let Ok(rule_desc) = verify_result {
418            if let Some(state) = rule_desc.state() {
419                match state {
420                    aws_sdk_eventbridge::types::RuleState::Enabled => {
421                        info!("Verified: EventBridge rule '{}' is ENABLED and ready", rule_name);
422                    }
423                    aws_sdk_eventbridge::types::RuleState::Disabled => {
424                        error!("CRITICAL: EventBridge rule '{}' is still DISABLED after setup!", rule_name);
425                        return Err(AdapterError::AwsEventBridge(format!(
426                            "Rule '{}' is disabled and could not be enabled",
427                            rule_name
428                        )));
429                    }
430                    _ => {
431                        warn!("EventBridge rule '{}' has unknown state: {:?}", rule_name, state);
432                    }
433                }
434            }
435        } else {
436            warn!("Could not verify rule state after setup");
437        }
438
439        {
440            let mut rule_names = self.rule_names.write().await;
441            rule_names.insert(topic.to_string(), rule_name.clone());
442        }
443
444        Ok(rule_name)
445    }
446
447    async fn get_queue_arn(&self, queue_url: &str) -> Result<String> {
448        let queue_name = queue_url
449            .split('/')
450            .last()
451            .ok_or_else(|| {
452                error!("Invalid queue URL format: {}", queue_url);
453                AdapterError::AwsEventBridge(format!("Invalid queue URL: {}", queue_url))
454            })?;
455
456        info!("Retrieving ARN for SQS queue '{}'...", queue_name);
457        
458        let attributes_result = self
459            .sqs_client
460            .get_queue_attributes()
461            .queue_url(queue_url)
462            .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn)
463            .send()
464            .await
465            .map_err(|e| {
466                error!("Failed to get queue attributes for '{}': {}", queue_name, e);
467                AdapterError::AwsEventBridge(format!("Failed to get queue attributes for '{}': {}", queue_name, e))
468            })?;
469
470        if let Some(attributes) = attributes_result.attributes() {
471            if let Some(arn) = attributes.get(&aws_sdk_sqs::types::QueueAttributeName::QueueArn) {
472                info!("Retrieved ARN for queue '{}': {}", queue_name, arn);
473                return Ok(arn.clone());
474            }
475        }
476
477        error!("Queue ARN not found in attributes for queue '{}'", queue_name);
478        Err(AdapterError::AwsEventBridge(format!(
479            "Queue ARN not found for queue '{}'",
480            queue_name
481        )))
482    }
483
484    pub async fn subscribe_fn<F, Fut>(&self, topic: impl Into<String>, handler: F) -> Result<()>
485    where
486        F: Fn(Message) -> Fut + Send + Sync + 'static,
487        Fut: std::future::Future<Output = Result<()>> + Send + 'static,
488    {
489        let topic = topic.into();
490        info!("=== Setting up EventBridge subscription for topic: {} ===", topic);
491
492        info!("Step 1: Creating/getting SQS queue for topic '{}'...", topic);
493        let queue_url = match self.get_or_create_queue(&topic).await {
494            Ok(url) => {
495                info!("SQS queue ready for EventBridge topic '{}': {}", topic, url);
496                url
497            }
498            Err(e) => {
499                error!("Failed to create/get SQS queue for topic '{}': {}", topic, e);
500                return Err(e);
501            }
502        };
503
504        info!("Step 2: Getting SQS queue ARN for topic '{}'...", topic);
505        let queue_arn = match self.get_queue_arn(&queue_url).await {
506            Ok(arn) => {
507                info!("SQS queue ARN for topic '{}': {}", topic, arn);
508                arn
509            }
510            Err(e) => {
511                error!("Failed to get SQS queue ARN for topic '{}': {}", topic, e);
512                return Err(e);
513            }
514        };
515
516        info!("Step 3: Creating/getting EventBridge rule for topic '{}'...", topic);
517        let rule_name = match self.get_or_create_rule(&topic, &queue_arn).await {
518            Ok(name) => {
519                info!("EventBridge rule ready for topic '{}': {}", topic, name);
520                name
521            }
522            Err(e) => {
523                error!("Failed to create/get EventBridge rule for topic '{}': {}", topic, e);
524                return Err(e);
525            }
526        };
527
528        let account_id = queue_arn
529            .split(':')
530            .nth(4)
531            .unwrap_or("*");
532        
533        // EventBridge rule ARN format:
534        // - Default bus: arn:aws:events:region:account-id:rule/rule-name
535        // - Custom bus: arn:aws:events:region:account-id:rule/event-bus-name/rule-name
536        let rule_arn = if self.event_bus_name == "default" {
537            format!(
538                "arn:aws:events:{}:{}:rule/{}",
539                self.config.region,
540                account_id,
541                rule_name
542            )
543        } else {
544            format!(
545                "arn:aws:events:{}:{}:rule/{}/{}",
546                self.config.region,
547                account_id,
548                self.event_bus_name,
549                rule_name
550            )
551        };
552        
553        info!("Step 4: Setting up SQS queue policy for EventBridge access (rule ARN: {})...", rule_arn);
554        
555        let policy = serde_json::json!({
556            "Version": "2012-10-17",
557            "Statement": [{
558                "Effect": "Allow",
559                "Principal": {
560                    "Service": "events.amazonaws.com"
561                },
562                "Action": "sqs:SendMessage",
563                "Resource": queue_arn,
564                "Condition": {
565                    "ArnEquals": {
566                        "aws:SourceArn": rule_arn
567                    }
568                }
569            }]
570        });
571
572        match self
573            .sqs_client
574            .set_queue_attributes()
575            .queue_url(&queue_url)
576            .attributes(
577                aws_sdk_sqs::types::QueueAttributeName::Policy,
578                policy.to_string(),
579            )
580            .send()
581            .await
582        {
583            Ok(_) => {
584                info!("Successfully set SQS queue policy for EventBridge access");
585            }
586            Err(e) => {
587                warn!("Failed to set SQS queue policy (this may be okay if policy already exists): {}", e);
588            }
589        }
590
591        info!("Step 5: Starting SQS queue polling for topic '{}'...", topic);
592        let sqs_client = self.sqs_client.clone();
593        let topic_clone = topic.clone();
594        let queue_url_clone = queue_url.clone();
595        let queue_arn_clone = queue_arn.clone();
596        let rule_name_clone = rule_name.clone();
597
598        struct ClosureHandler<F, Fut>
599        where
600            F: Fn(Message) -> Fut + Send + Sync,
601            Fut: std::future::Future<Output = Result<()>> + Send,
602        {
603            func: F,
604        }
605
606        #[async_trait]
607        impl<F, Fut> MessageHandler for ClosureHandler<F, Fut>
608        where
609            F: Fn(Message) -> Fut + Send + Sync,
610            Fut: std::future::Future<Output = Result<()>> + Send,
611        {
612            async fn handle(&self, message: Message) -> Result<()> {
613                (self.func)(message).await
614            }
615        }
616
617        let handler = Arc::new(ClosureHandler { func: handler });
618
619        tokio::spawn(async move {
620            info!("EventBridge subscription polling loop started for topic '{}' (queue: {})", topic_clone, queue_url);
621            let mut poll_count = 0u64;
622            loop {
623                poll_count += 1;
624                if poll_count % 10 == 0 {
625                    info!("EventBridge polling loop still active for topic '{}' (poll #{}), queue: {}", topic_clone, poll_count, queue_url);
626                }
627                if poll_count == 1 || poll_count % 5 == 0 {
628                    info!("Polling SQS queue for EventBridge topic '{}' (poll #{})...", topic_clone, poll_count);
629                } else {
630                    debug!("Polling SQS queue for EventBridge topic '{}' (poll #{})...", topic_clone, poll_count);
631                }
632                let receive_result = sqs_client
633                    .receive_message()
634                    .queue_url(&queue_url)
635                    .max_number_of_messages(10)
636                    .wait_time_seconds(20)
637                    .send()
638                    .await;
639
640                match receive_result {
641                    Ok(response) => {
642                        let messages = response.messages();
643                        if !messages.is_empty() {
644                            info!("Received {} message(s) from EventBridge queue for topic '{}'", messages.len(), topic_clone);
645                            for sqs_message in messages {
646                                if let Some(body) = sqs_message.body() {
647                                    info!("Raw SQS message body for topic '{}': {}", topic_clone, body);
648                                    
649                                    debug!("Attempting to parse EventBridge message for topic '{}'", topic_clone);
650                                    let message_result = {
651                                        debug!("Trying to parse as array of events...");
652                                        if let Ok(events_array) = serde_json::from_str::<Vec<serde_json::Value>>(body) {
653                                            debug!("Successfully parsed as array with {} event(s)", events_array.len());
654                                            if let Some(event) = events_array.first() {
655                                                debug!("First event structure: {:?}", event);
656                                                if let Some(detail_str) = event.get("detail").and_then(|d| d.as_str()) {
657                                                    debug!("Found 'detail' field as string (length: {}): {}", detail_str.len(), detail_str);
658                                                    match serde_json::from_str::<Message>(detail_str) {
659                                                        Ok(msg) => {
660                                                            debug!("Successfully parsed Message from detail string");
661                                                            Some(msg)
662                                                        }
663                                                        Err(e) => {
664                                                            debug!("Failed to parse Message from detail string: {}", e);
665                                                            None
666                                                        }
667                                                    }
668                                                } else if let Some(detail_obj) = event.get("detail") {
669                                                    debug!("Found 'detail' field as object: {:?}", detail_obj);
670                                                    match serde_json::from_value::<Message>(detail_obj.clone()) {
671                                                        Ok(msg) => {
672                                                            debug!("Successfully parsed Message from detail object");
673                                                            Some(msg)
674                                                        }
675                                                        Err(e) => {
676                                                            debug!("Failed to parse Message from detail object: {}", e);
677                                                            None
678                                                        }
679                                                    }
680                                                } else {
681                                                    debug!("No 'detail' field found in event object");
682                                                    None
683                                                }
684                                            } else {
685                                                debug!("Array is empty");
686                                                None
687                                            }
688                                        } else {
689                                            debug!("Not an array, trying as single event object...");
690                                            None
691                                        }
692                                    }.or_else(|| {
693                                        debug!("Trying to parse as single event object...");
694                                        if let Ok(event_obj) = serde_json::from_str::<serde_json::Value>(body) {
695                                            debug!("Successfully parsed as event object");
696                                            if let Some(detail_str) = event_obj.get("detail").and_then(|d| d.as_str()) {
697                                                debug!("Found 'detail' field as string (length: {}): {}", detail_str.len(), detail_str);
698                                                match serde_json::from_str::<Message>(detail_str) {
699                                                    Ok(msg) => {
700                                                        debug!("Successfully parsed Message from detail string");
701                                                        Some(msg)
702                                                    }
703                                                    Err(e) => {
704                                                        debug!("Failed to parse Message from detail string: {}", e);
705                                                        None
706                                                    }
707                                                }
708                                            } else if let Some(detail_obj) = event_obj.get("detail") {
709                                                debug!("Found 'detail' field as object: {:?}", detail_obj);
710                                                match serde_json::from_value::<Message>(detail_obj.clone()) {
711                                                    Ok(msg) => {
712                                                        debug!("Successfully parsed Message from detail object");
713                                                        Some(msg)
714                                                    }
715                                                    Err(e) => {
716                                                        debug!("Failed to parse Message from detail object: {}", e);
717                                                        None
718                                                    }
719                                                }
720                                            } else {
721                                                debug!("No 'detail' field found in event object");
722                                                None
723                                            }
724                                        } else {
725                                            debug!("Not a valid JSON object, trying direct Message parse...");
726                                            None
727                                        }
728                                    }).or_else(|| {
729                                        debug!("Trying to parse body directly as Message...");
730                                        match serde_json::from_str::<Message>(body) {
731                                            Ok(msg) => {
732                                                debug!("Successfully parsed body directly as Message");
733                                                Some(msg)
734                                            }
735                                            Err(e) => {
736                                                debug!("Failed to parse body directly as Message: {}", e);
737                                                None
738                                            }
739                                        }
740                                    });
741                                    
742                                    let message_result = message_result.ok_or_else(|| {
743                                        let last_error = serde_json::from_str::<Message>(body)
744                                            .map_err(|e| e)
745                                            .unwrap_err();
746                                        debug!("All parsing attempts failed. Last error: {}", last_error);
747                                        last_error
748                                    });
749
750                                    match message_result {
751                                        Ok(message) => {
752                                            info!("Successfully parsed EventBridge message for topic '{}'", topic_clone);
753                                            info!("Message topic: {}, payload: {:?}", message.topic, message.payload);
754                                            info!("Calling handler for EventBridge message...");
755                                            if let Err(e) = handler.handle(message).await {
756                                                error!("Handler error for EventBridge topic '{}': {}", topic_clone, e);
757                                            } else {
758                                                info!("Handler completed successfully for EventBridge topic '{}'", topic_clone);
759                                            }
760
761                                            if let Some(receipt_handle) = sqs_message.receipt_handle() {
762                                                if let Err(e) = sqs_client
763                                                    .delete_message()
764                                                    .queue_url(&queue_url)
765                                                    .receipt_handle(receipt_handle)
766                                                    .send()
767                                                    .await
768                                                {
769                                                    warn!(
770                                                        "Failed to delete message from EventBridge queue '{}': {}",
771                                                        queue_url, e
772                                                    );
773                                                }
774                                            }
775                                        }
776                                        Err(e) => {
777                                            error!(
778                                                "Failed to deserialize EventBridge message for topic '{}': {}. Body: {}",
779                                                topic_clone, e, body
780                                            );
781                                            if let Some(receipt_handle) = sqs_message.receipt_handle() {
782                                                let _ = sqs_client
783                                                    .delete_message()
784                                                    .queue_url(&queue_url)
785                                                    .receipt_handle(receipt_handle)
786                                                    .send()
787                                                    .await;
788                                            }
789                                        }
790                                    }
791                                }
792                            }
793                        } else {
794                            debug!("No messages received from EventBridge queue for topic '{}' (this is normal, continuing to poll...)", topic_clone);
795                        }
796                    }
797                    Err(e) => {
798                        error!(
799                            "Error receiving messages from EventBridge queue '{}' for topic '{}': {}. Retrying in 5 seconds...",
800                            queue_url, topic_clone, e
801                        );
802                        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
803                    }
804                }
805            }
806        });
807
808        info!("=== EventBridge subscription set up successfully for topic: {} ===", topic);
809        info!("Summary:");
810        info!("  - SQS Queue URL: {}", queue_url_clone);
811        info!("  - SQS Queue ARN: {}", queue_arn_clone);
812        info!("  - EventBridge Rule: {}", rule_name_clone);
813        info!("  - Event Pattern: source='{}', detail-type='{}'", self.source, topic);
814        info!("  - Event Bus: {}", self.event_bus_name);
815        info!("  - Target: SQS queue '{}' (ARN: {})", queue_url_clone, queue_arn_clone);
816        info!("  - Polling: Active (long polling enabled, 20s wait time)");
817        info!("  - Next: Events matching the pattern will be routed to the SQS queue");
818        let rule_arn_final = if self.event_bus_name == "default" {
819            format!(
820                "arn:aws:events:{}:{}:rule/{}",
821                self.config.region,
822                queue_arn_clone.split(':').nth(4).unwrap_or("unknown"),
823                rule_name_clone
824            )
825        } else {
826            format!(
827                "arn:aws:events:{}:{}:rule/{}/{}",
828                self.config.region,
829                queue_arn_clone.split(':').nth(4).unwrap_or("unknown"),
830                self.event_bus_name,
831                rule_name_clone
832            )
833        };
834        info!("  - Rule ARN: {}", rule_arn_final);
835        info!("  - To verify: Check AWS EventBridge console for rule '{}' and ensure it has the SQS queue as a target", rule_name_clone);
836        Ok(())
837    }
838
839    pub async fn list_topics(&self) -> Vec<String> {
840        let topics = self.published_topics.read().await;
841        topics.keys().cloned().collect()
842    }
843}
844