livetrace 0.2.3

CLI tool for live tailing of OTLP traces and logs in the Serverless OTLP Forwarder architecture.
Documentation
//! Provides functionality for polling AWS CloudWatch Logs using the `FilterLogEvents` API.
//!
//! This module is responsible for:
//! - Spawning an asynchronous task that periodically polls a set of log group ARNs.
//! - Managing timestamps for each log group to fetch only new events since the last poll.
//! - Handling pagination for `FilterLogEvents` responses.
//! - Processing log event messages from the polled data using functions from the
//!   `processing` module.
//! - Sending the resulting `TelemetryData` (or errors) over an MPSC channel to the main
//!   application logic.

use anyhow::Result;
use aws_sdk_cloudwatchlogs::Client as CwlClient;
use chrono::Utc;
use std::collections::HashMap;
use std::time::Duration;
use tokio::pin;
use tokio::sync::mpsc;
use tokio::time::interval;
use tokio::time::sleep;

use crate::processing::{process_log_event_message, TelemetryData};

/// Spawns a background task that polls `FilterLogEvents` for multiple log groups and sends
/// the results over a channel.
///
/// The task is detached (its `JoinHandle` is dropped) and runs until either the session
/// timeout elapses or the receiving half of `sender` is closed. Because it uses
/// `tokio::spawn`, this function must be called from within a Tokio runtime.
///
/// # Arguments
///
/// * `cwl_client` - CloudWatch Logs client used for every request.
/// * `arns` - Log group identifiers to poll; they are polled sequentially on each tick.
/// * `interval_millis` - Delay between poll cycles. A value of `0` is treated as `1`,
///   because a Tokio interval with a zero period panics.
/// * `sender` - Channel on which telemetry (or request errors) is delivered.
/// * `backtrace_ms` - If set, the first poll starts this many milliseconds in the past
///   instead of at the current time.
/// * `session_timeout_millis` - Total lifetime of the polling task.
pub fn start_polling_task(
    cwl_client: CwlClient,
    arns: Vec<String>,
    interval_millis: u64,
    sender: mpsc::Sender<Result<TelemetryData>>,
    backtrace_ms: Option<u64>,
    session_timeout_millis: u64,
) {
    tokio::spawn(async move {
        // Last seen event timestamp per log group; absent until a poll returns events.
        let mut last_timestamps: HashMap<String, i64> = HashMap::with_capacity(arns.len());

        // `tokio::time::interval` panics on a zero period, which would silently kill this
        // detached task, so clamp to at least one millisecond.
        let poll_duration = Duration::from_millis(interval_millis.max(1));
        let mut ticker = interval(poll_duration);
        // If a poll cycle overruns the interval, do not fire a burst of catch-up ticks:
        // that would translate into back-to-back API calls against a rate-limited service.
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        let session_duration = Duration::from_millis(session_timeout_millis);
        let session_timer = sleep(session_duration);
        pin!(session_timer);
        tracing::debug!(
            timeout_ms = session_timeout_millis,
            "Polling Adapter: Session timeout configured."
        );

        let now_ms = Utc::now().timestamp_millis();
        let initial_start_time_ms = match backtrace_ms {
            Some(backtrace_ms_val) => {
                // Convert without wrapping and never go below the epoch: a negative start
                // time is not a valid `FilterLogEvents` argument.
                let offset_ms = i64::try_from(backtrace_ms_val).unwrap_or(i64::MAX);
                let start_ms = now_ms.saturating_sub(offset_ms).max(0);
                tracing::info!(
                    backtrace_duration_ms = backtrace_ms_val,
                    calculated_start_time_ms = start_ms,
                    "Polling will start from current time minus backtrace duration."
                );
                start_ms
            }
            None => {
                tracing::debug!(
                    start_time = now_ms,
                    "Polling will start from current time."
                );
                now_ms
            }
        };

        tracing::debug!(
            interval_ms = interval_millis,
            num_groups = arns.len(),
            "Polling Adapter: Starting polling loop."
        );

        loop {
            tokio::select! {
                _ = ticker.tick() => {
                    tracing::trace!("Polling Adapter: Tick");

                    for arn in &arns {
                        // Nobody is listening any more; skip the remaining groups.
                        if sender.is_closed() {
                            break;
                        }

                        let start_time = last_timestamps
                            .get(arn)
                            .copied()
                            .unwrap_or(initial_start_time_ms);

                        tracing::debug!(log_group_arn = %arn, %start_time, "Polling Adapter: Fetching events for group.");

                        match filter_log_events_for_group(
                            &cwl_client,
                            arn.clone(),
                            start_time,
                            sender.clone(),
                        )
                        .await
                        {
                            Ok(Some(new_timestamp)) => {
                                tracing::trace!(log_group_arn=%arn, %new_timestamp, "Polling Adapter: Updating timestamp.");
                                last_timestamps.insert(arn.clone(), new_timestamp);
                            }
                            Ok(None) => {
                                tracing::trace!(log_group_arn=%arn, "Polling Adapter: No new events found.");
                            }
                            Err(e) => {
                                // Keep going: a failure for one group must not starve the others.
                                tracing::error!(log_group_arn = %arn, error = %e, "Polling Adapter: Error polling log group.");
                            }
                        }
                    }

                    // Without a receiver every further poll is wasted API traffic, so stop
                    // instead of spinning until the session timeout.
                    if sender.is_closed() {
                        tracing::info!("Polling Adapter: Receiver closed. Stopping polling task.");
                        break;
                    }
                }
                _ = &mut session_timer => {
                    tracing::info!(timeout_ms = session_timeout_millis, "Polling Adapter: Session timeout reached. Stopping polling task.");
                    break;
                }
            }
        }
        tracing::debug!("Polling Adapter: Task finished.");
    });
}

/// Fetches and processes events for a single log group using FilterLogEvents.
/// Handles pagination and sends TelemetryData or errors over the channel.
/// Returns Ok(Some(timestamp)) of the last processed event if successful, Ok(None) if no events, Err on failure.
async fn filter_log_events_for_group(
    client: &CwlClient,
    log_group_identifier: String,
    start_time_ms: i64,
    sender: mpsc::Sender<Result<TelemetryData>>,
) -> Result<Option<i64>> {
    let mut next_token: Option<String> = None;
    let mut latest_event_timestamp = start_time_ms;
    let mut events_found = false;

    loop {
        let mut request_builder = client
            .filter_log_events()
            .log_group_identifier(log_group_identifier.clone())
            .start_time(start_time_ms + 1);

        if let Some(token) = next_token {
            request_builder = request_builder.next_token(token);
        }

        match request_builder.send().await {
            Ok(output) => {
                if let Some(events) = output.events {
                    if !events.is_empty() {
                        events_found = true;
                    }
                    for event in events {
                        if let Some(timestamp) = event.timestamp {
                            latest_event_timestamp = latest_event_timestamp.max(timestamp);
                        }

                        if let Some(msg) = event.message {
                            match process_log_event_message(&msg) {
                                // Process the log event message and send telemetry data if available
                                Ok(Some(telemetry)) => {
                                    if sender.send(Ok(telemetry)).await.is_err() {
                                        tracing::warn!("Polling Adapter: MPSC channel closed by receiver while sending data.");
                                        return Err(anyhow::anyhow!("MPSC receiver closed"));
                                    }
                                }
                                Ok(None) => {}
                                Err(e) => {
                                    tracing::warn!(message = ?msg, error = %e, "Polling Adapter: Failed to process polled log event");
                                }
                            }
                        }
                    }
                }

                if let Some(token) = output.next_token {
                    next_token = Some(token);
                    tracing::trace!(log_group=%log_group_identifier, "Polling Adapter: Got next token, continuing pagination.");
                } else {
                    break;
                }
            }
            Err(e) => {
                let context_msg = format!(
                    "Polling Adapter: Failed to filter log events for {}",
                    log_group_identifier
                );
                tracing::error!(error = %e, %context_msg);
                let _ = sender
                    .send(Err(anyhow::Error::new(e).context(context_msg)))
                    .await;
                return Err(anyhow::anyhow!(
                    "Failed to filter log events for group {}",
                    log_group_identifier
                ));
            }
        }
    }

    if events_found {
        Ok(Some(latest_event_timestamp))
    } else {
        Ok(None)
    }
}