Skip to main content

livetrace/
poller.rs

1//! Provides functionality for polling AWS CloudWatch Logs using the `FilterLogEvents` API.
2//!
3//! This module is responsible for:
4//! - Spawning an asynchronous task that periodically polls a set of log group ARNs.
5//! - Managing timestamps for each log group to fetch only new events since the last poll.
6//! - Handling pagination for `FilterLogEvents` responses.
7//! - Processing log event messages from the polled data using functions from the
8//!   `processing` module.
9//! - Sending the resulting `TelemetryData` (or errors) over an MPSC channel to the main
10//!   application logic.
11
12use anyhow::Result;
13use aws_sdk_cloudwatchlogs::Client as CwlClient;
14use chrono::Utc;
15use std::collections::HashMap;
16use std::time::Duration;
17use tokio::pin;
18use tokio::sync::mpsc;
19use tokio::time::interval;
20use tokio::time::sleep;
21
22use crate::processing::{process_log_event_message, TelemetryData};
23
24/// Spawns a task that polls FilterLogEvents for multiple log groups and sends results over a channel.
25pub fn start_polling_task(
26    cwl_client: CwlClient,
27    arns: Vec<String>,
28    interval_millis: u64,
29    sender: mpsc::Sender<Result<TelemetryData>>,
30    backtrace_ms: Option<u64>,
31    session_timeout_millis: u64,
32) {
33    tokio::spawn(async move {
34        let mut last_timestamps: HashMap<String, i64> = HashMap::new();
35        let poll_duration = Duration::from_millis(interval_millis);
36        let mut ticker = interval(poll_duration);
37
38        let session_duration = Duration::from_millis(session_timeout_millis);
39        let session_timer = sleep(session_duration);
40        pin!(session_timer);
41        tracing::debug!(
42            timeout_ms = session_timeout_millis,
43            "Polling Adapter: Session timeout configured."
44        );
45
46        let mut initial_start_time_ms = Utc::now().timestamp_millis();
47        if let Some(backtrace_ms_val) = backtrace_ms {
48            initial_start_time_ms -= backtrace_ms_val as i64;
49            tracing::info!(
50                backtrace_duration_ms = backtrace_ms_val,
51                calculated_start_time_ms = initial_start_time_ms,
52                "Polling will start from current time minus backtrace duration."
53            );
54        } else {
55            tracing::debug!(
56                start_time = initial_start_time_ms,
57                "Polling will start from current time."
58            );
59        }
60
61        tracing::debug!(
62            interval_ms = interval_millis,
63            num_groups = arns.len(),
64            "Polling Adapter: Starting polling loop."
65        );
66
67        loop {
68            tokio::select! {
69                _ = ticker.tick() => {
70                    tracing::trace!("Polling Adapter: Tick");
71
72                    for arn in &arns {
73                        let start_time = *last_timestamps.get(arn).unwrap_or(&initial_start_time_ms);
74                        let arn_clone = arn.clone();
75                        let client_clone = cwl_client.clone();
76                        let sender_clone = sender.clone();
77
78                        tracing::debug!(log_group_arn = %arn_clone, %start_time, "Polling Adapter: Fetching events for group.");
79
80                        match filter_log_events_for_group(
81                            &client_clone,
82                            arn_clone.clone(),
83                            start_time,
84                            sender_clone.clone(),
85                        )
86                        .await
87                        {
88                            Ok(Some(new_timestamp)) => {
89                                tracing::trace!(log_group_arn=%arn_clone, %new_timestamp, "Polling Adapter: Updating timestamp.");
90                                last_timestamps.insert(arn_clone, new_timestamp);
91                            }
92                            Ok(None) => {
93                                tracing::trace!(log_group_arn=%arn_clone, "Polling Adapter: No new events found.");
94                            }
95                            Err(e) => {
96                                tracing::error!(log_group_arn = %arn_clone, error = %e, "Polling Adapter: Error polling log group.");
97                            }
98                        }
99                    }
100                }
101                _ = &mut session_timer => {
102                    tracing::info!(timeout_ms = session_timeout_millis, "Polling Adapter: Session timeout reached. Stopping polling task.");
103                    break;
104                }
105            }
106        }
107        tracing::debug!("Polling Adapter: Task finished.");
108    });
109}
110
111/// Fetches and processes events for a single log group using FilterLogEvents.
112/// Handles pagination and sends TelemetryData or errors over the channel.
113/// Returns Ok(Some(timestamp)) of the last processed event if successful, Ok(None) if no events, Err on failure.
114async fn filter_log_events_for_group(
115    client: &CwlClient,
116    log_group_identifier: String,
117    start_time_ms: i64,
118    sender: mpsc::Sender<Result<TelemetryData>>,
119) -> Result<Option<i64>> {
120    let mut next_token: Option<String> = None;
121    let mut latest_event_timestamp = start_time_ms;
122    let mut events_found = false;
123
124    loop {
125        let mut request_builder = client
126            .filter_log_events()
127            .log_group_identifier(log_group_identifier.clone())
128            .start_time(start_time_ms + 1);
129
130        if let Some(token) = next_token {
131            request_builder = request_builder.next_token(token);
132        }
133
134        match request_builder.send().await {
135            Ok(output) => {
136                if let Some(events) = output.events {
137                    if !events.is_empty() {
138                        events_found = true;
139                    }
140                    for event in events {
141                        if let Some(timestamp) = event.timestamp {
142                            latest_event_timestamp = latest_event_timestamp.max(timestamp);
143                        }
144
145                        if let Some(msg) = event.message {
146                            match process_log_event_message(&msg) {
147                                // Process the log event message and send telemetry data if available
148                                Ok(Some(telemetry)) => {
149                                    if sender.send(Ok(telemetry)).await.is_err() {
150                                        tracing::warn!("Polling Adapter: MPSC channel closed by receiver while sending data.");
151                                        return Err(anyhow::anyhow!("MPSC receiver closed"));
152                                    }
153                                }
154                                Ok(None) => {}
155                                Err(e) => {
156                                    tracing::warn!(message = ?msg, error = %e, "Polling Adapter: Failed to process polled log event");
157                                }
158                            }
159                        }
160                    }
161                }
162
163                if let Some(token) = output.next_token {
164                    next_token = Some(token);
165                    tracing::trace!(log_group=%log_group_identifier, "Polling Adapter: Got next token, continuing pagination.");
166                } else {
167                    break;
168                }
169            }
170            Err(e) => {
171                let context_msg = format!(
172                    "Polling Adapter: Failed to filter log events for {}",
173                    log_group_identifier
174                );
175                tracing::error!(error = %e, %context_msg);
176                let _ = sender
177                    .send(Err(anyhow::Error::new(e).context(context_msg)))
178                    .await;
179                return Err(anyhow::anyhow!(
180                    "Failed to filter log events for group {}",
181                    log_group_identifier
182                ));
183            }
184        }
185    }
186
187    if events_found {
188        Ok(Some(latest_event_timestamp))
189    } else {
190        Ok(None)
191    }
192}