1use anyhow::Result;
13use aws_sdk_cloudwatchlogs::Client as CwlClient;
14use chrono::Utc;
15use std::collections::HashMap;
16use std::time::Duration;
17use tokio::pin;
18use tokio::sync::mpsc;
19use tokio::time::interval;
20use tokio::time::sleep;
21
22use crate::processing::{process_log_event_message, TelemetryData};
23
24pub fn start_polling_task(
26 cwl_client: CwlClient,
27 arns: Vec<String>,
28 interval_millis: u64,
29 sender: mpsc::Sender<Result<TelemetryData>>,
30 backtrace_ms: Option<u64>,
31 session_timeout_millis: u64,
32) {
33 tokio::spawn(async move {
34 let mut last_timestamps: HashMap<String, i64> = HashMap::new();
35 let poll_duration = Duration::from_millis(interval_millis);
36 let mut ticker = interval(poll_duration);
37
38 let session_duration = Duration::from_millis(session_timeout_millis);
39 let session_timer = sleep(session_duration);
40 pin!(session_timer);
41 tracing::debug!(
42 timeout_ms = session_timeout_millis,
43 "Polling Adapter: Session timeout configured."
44 );
45
46 let mut initial_start_time_ms = Utc::now().timestamp_millis();
47 if let Some(backtrace_ms_val) = backtrace_ms {
48 initial_start_time_ms -= backtrace_ms_val as i64;
49 tracing::info!(
50 backtrace_duration_ms = backtrace_ms_val,
51 calculated_start_time_ms = initial_start_time_ms,
52 "Polling will start from current time minus backtrace duration."
53 );
54 } else {
55 tracing::debug!(
56 start_time = initial_start_time_ms,
57 "Polling will start from current time."
58 );
59 }
60
61 tracing::debug!(
62 interval_ms = interval_millis,
63 num_groups = arns.len(),
64 "Polling Adapter: Starting polling loop."
65 );
66
67 loop {
68 tokio::select! {
69 _ = ticker.tick() => {
70 tracing::trace!("Polling Adapter: Tick");
71
72 for arn in &arns {
73 let start_time = *last_timestamps.get(arn).unwrap_or(&initial_start_time_ms);
74 let arn_clone = arn.clone();
75 let client_clone = cwl_client.clone();
76 let sender_clone = sender.clone();
77
78 tracing::debug!(log_group_arn = %arn_clone, %start_time, "Polling Adapter: Fetching events for group.");
79
80 match filter_log_events_for_group(
81 &client_clone,
82 arn_clone.clone(),
83 start_time,
84 sender_clone.clone(),
85 )
86 .await
87 {
88 Ok(Some(new_timestamp)) => {
89 tracing::trace!(log_group_arn=%arn_clone, %new_timestamp, "Polling Adapter: Updating timestamp.");
90 last_timestamps.insert(arn_clone, new_timestamp);
91 }
92 Ok(None) => {
93 tracing::trace!(log_group_arn=%arn_clone, "Polling Adapter: No new events found.");
94 }
95 Err(e) => {
96 tracing::error!(log_group_arn = %arn_clone, error = %e, "Polling Adapter: Error polling log group.");
97 }
98 }
99 }
100 }
101 _ = &mut session_timer => {
102 tracing::info!(timeout_ms = session_timeout_millis, "Polling Adapter: Session timeout reached. Stopping polling task.");
103 break;
104 }
105 }
106 }
107 tracing::debug!("Polling Adapter: Task finished.");
108 });
109}
110
111async fn filter_log_events_for_group(
115 client: &CwlClient,
116 log_group_identifier: String,
117 start_time_ms: i64,
118 sender: mpsc::Sender<Result<TelemetryData>>,
119) -> Result<Option<i64>> {
120 let mut next_token: Option<String> = None;
121 let mut latest_event_timestamp = start_time_ms;
122 let mut events_found = false;
123
124 loop {
125 let mut request_builder = client
126 .filter_log_events()
127 .log_group_identifier(log_group_identifier.clone())
128 .start_time(start_time_ms + 1);
129
130 if let Some(token) = next_token {
131 request_builder = request_builder.next_token(token);
132 }
133
134 match request_builder.send().await {
135 Ok(output) => {
136 if let Some(events) = output.events {
137 if !events.is_empty() {
138 events_found = true;
139 }
140 for event in events {
141 if let Some(timestamp) = event.timestamp {
142 latest_event_timestamp = latest_event_timestamp.max(timestamp);
143 }
144
145 if let Some(msg) = event.message {
146 match process_log_event_message(&msg) {
147 Ok(Some(telemetry)) => {
149 if sender.send(Ok(telemetry)).await.is_err() {
150 tracing::warn!("Polling Adapter: MPSC channel closed by receiver while sending data.");
151 return Err(anyhow::anyhow!("MPSC receiver closed"));
152 }
153 }
154 Ok(None) => {}
155 Err(e) => {
156 tracing::warn!(message = ?msg, error = %e, "Polling Adapter: Failed to process polled log event");
157 }
158 }
159 }
160 }
161 }
162
163 if let Some(token) = output.next_token {
164 next_token = Some(token);
165 tracing::trace!(log_group=%log_group_identifier, "Polling Adapter: Got next token, continuing pagination.");
166 } else {
167 break;
168 }
169 }
170 Err(e) => {
171 let context_msg = format!(
172 "Polling Adapter: Failed to filter log events for {}",
173 log_group_identifier
174 );
175 tracing::error!(error = %e, %context_msg);
176 let _ = sender
177 .send(Err(anyhow::Error::new(e).context(context_msg)))
178 .await;
179 return Err(anyhow::anyhow!(
180 "Failed to filter log events for group {}",
181 log_group_identifier
182 ));
183 }
184 }
185 }
186
187 if events_found {
188 Ok(Some(latest_event_timestamp))
189 } else {
190 Ok(None)
191 }
192}