livetrace 0.2.3

CLI tool for live tailing of OTLP traces and logs in the Serverless OTLP Forwarder architecture.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
//! Handles AWS SDK setup, client creation, and discovery of CloudWatch Log Groups.
//!
//! This module is responsible for:
//! 1. Initializing AWS configuration (region, credentials).
//! 2. Creating AWS service clients (CloudWatch Logs, CloudFormation, STS).
//! 3. Discovering relevant log group names from user-provided patterns and/or a
//!    CloudFormation stack name (including the `/aws/lambda/...` log groups of
//!    Lambda functions defined in the stack).
//! 4. Validating the existence of these log groups, including handling common
//!    Lambda@Edge naming conventions.
//! 5. Constructing ARNs for the validated log groups.

use anyhow::{Context, Result};
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_cloudformation::Client as CfnClient;
use aws_sdk_cloudwatchlogs::Client as CwlClient;
use aws_sdk_sts::Client as StsClient;

// AWS Setup Public Function

/// Outcome of [`setup_aws_resources`]: the client and identifiers needed to
/// start a CloudWatch Logs live tail on the resolved log groups.
pub struct AwsSetupResult {
    /// CloudWatch Logs client built from the loaded AWS config; returned so the
    /// caller can start the tail with it.
    pub cwl_client: CwlClient,
    /// Account ID reported by STS `GetCallerIdentity`.
    pub account_id: String,
    /// Region resolved from the explicit region argument, the SDK default
    /// provider chain, or the `us-east-1` fallback.
    pub region_str: String,
    /// Partition used when building the log group ARNs.
    // NOTE(review): the `dead_code` allowance suggests no code in the crate
    // reads this field yet; confirm and drop the allow once it is used.
    #[allow(dead_code)]
    pub partition: String,
    /// ARNs of the validated log groups, formatted as
    /// `arn:<partition>:logs:<region>:<account>:log-group:<name>`.
    pub resolved_arns: Vec<String>,
}

/// Loads AWS configuration, resolves the target log groups and builds their ARNs.
///
/// Steps:
/// 1. Load the AWS config. The region comes from `aws_region` if given, then the
///    SDK default provider chain, then `us-east-1`; `aws_profile` selects a
///    named profile when set.
/// 2. Create CloudWatch Logs, CloudFormation and STS clients.
/// 3. Call STS `GetCallerIdentity` to learn the account ID and the partition.
/// 4. Discover candidate log group names from `stack_name` and/or
///    `log_group_pattern`, then validate them (checking Lambda@Edge variants).
/// 5. Build `arn:<partition>:logs:<region>:<account>:log-group:<name>` ARNs.
///
/// # Errors
///
/// Fails if the region or account ID cannot be determined, if any AWS call
/// fails, or if validation leaves 0 log groups or more than the live-tail
/// maximum of 10.
pub async fn setup_aws_resources(
    log_group_pattern: &Option<Vec<String>>,
    stack_name: &Option<String>,
    aws_region: &Option<String>,
    aws_profile: &Option<String>,
) -> Result<AwsSetupResult> {
    // Maximum number of log groups accepted by a single live tail session.
    const MAX_LIVE_TAIL_LOG_GROUPS: usize = 10;

    // 1. Load AWS config
    let region_provider =
        RegionProviderChain::first_try(aws_region.clone().map(aws_config::Region::new))
            .or_default_provider()
            .or_else(aws_config::Region::new("us-east-1")); // Default fallback region

    let mut config_loader =
        aws_config::defaults(aws_config::BehaviorVersion::latest()).region(region_provider);
    if let Some(profile) = aws_profile.clone() {
        config_loader = config_loader.profile_name(profile);
    }

    let aws_config = config_loader.load().await;
    tracing::debug!(
        "Logged in AWS config with region: {:?}",
        aws_config.region()
    );

    // 2. Create AWS clients
    let cwl_client = CwlClient::new(&aws_config);
    tracing::debug!("CloudWatch Logs client created.");
    let cfn_client = CfnClient::new(&aws_config);
    tracing::debug!("CloudFormation client created.");
    let sts_client = StsClient::new(&aws_config);
    tracing::debug!("STS client created.");

    // 3. Region, account ID and partition for ARN construction
    let region_str = aws_config
        .region()
        .ok_or_else(|| anyhow::anyhow!("Could not determine AWS region from config"))?
        .to_string();
    let caller_identity = sts_client
        .get_caller_identity()
        .send()
        .await
        .context("Failed to get caller identity from STS")?;
    let account_id = caller_identity
        .account()
        .ok_or_else(|| {
            anyhow::anyhow!("Could not determine AWS Account ID from STS caller identity")
        })?
        .to_string();
    // The caller ARN (`arn:<partition>:sts::...`) carries the partition. Using
    // it instead of a hard-coded "aws" gives correct log group ARNs in the
    // China (`aws-cn`) and GovCloud (`aws-us-gov`) partitions. Fall back to the
    // standard partition if the ARN is missing or malformed.
    let partition = caller_identity
        .arn()
        .and_then(partition_from_arn)
        .unwrap_or("aws")
        .to_string();
    tracing::debug!(region = %region_str, account_id = %account_id, partition = %partition, "Determined region, account ID, and partition");

    // 4. Discover log groups from the pattern(s) and/or stack, then validate them
    let resolved_log_group_names =
        discover_log_group_names(&cfn_client, &cwl_client, log_group_pattern, stack_name).await?;

    tracing::debug!("Validating discovered log group names...");
    let validated_log_group_names =
        validate_log_groups(&cwl_client, resolved_log_group_names, &region_str).await?;
    tracing::debug!(
        "Validation complete. Valid names: {:?}",
        validated_log_group_names
    );

    // The limits apply to the *validated* names, not the raw discovery result.
    let group_count = validated_log_group_names.len();
    if group_count == 0 {
        // Discovery merges stack and pattern results, so mention both when both were given.
        let error_msg = match stack_name.as_deref() {
            Some(stack) if log_group_pattern.is_some() => format!(
                "Stack '{}' and Log Groups Patterns {:?} matched 0 valid log groups (checked Lambda@Edge variants).",
                stack,
                patterns_for_display(log_group_pattern)
            ),
            Some(stack) => format!(
                "Stack '{}' contained 0 discoverable and valid LogGroup resources (checked Lambda@Edge variants).",
                stack
            ),
            None => format!(
                "Log Groups Patterns {:?} matched 0 valid log groups (checked Lambda@Edge variants).",
                patterns_for_display(log_group_pattern)
            ),
        };
        return Err(anyhow::anyhow!(error_msg));
    }
    if group_count > MAX_LIVE_TAIL_LOG_GROUPS {
        let source = match stack_name.as_deref() {
            Some(stack) if log_group_pattern.is_some() => format!(
                "Stack {} and Log Groups Patterns {:?}",
                stack,
                patterns_for_display(log_group_pattern)
            ),
            Some(stack) => format!("Stack {}", stack),
            None => format!(
                "Log Groups Patterns {:?}",
                patterns_for_display(log_group_pattern)
            ),
        };
        return Err(anyhow::anyhow!(
            "{} resulted in {} valid log groups (max {} allowed for live tail). Found: {:?}",
            source,
            group_count,
            MAX_LIVE_TAIL_LOG_GROUPS,
            validated_log_group_names
        ));
    }
    tracing::debug!(
        "Proceeding with {} validated log group name(s): {:?}",
        group_count,
        validated_log_group_names
    );

    // 5. Construct ARNs from the validated names
    let resolved_log_group_arns: Vec<String> = validated_log_group_names
        .iter()
        .map(|name| {
            format!(
                "arn:{}:logs:{}:{}:log-group:{}",
                partition, region_str, account_id, name
            )
        })
        .collect();
    tracing::debug!("Constructed ARNs: {:?}", resolved_log_group_arns);

    Ok(AwsSetupResult {
        cwl_client, // Returned so the caller can start the tail with it
        account_id,
        region_str,
        partition,
        resolved_arns: resolved_log_group_arns,
    })
}

/// Returns the partition field of an ARN (`arn:<partition>:...`), or `None`
/// if `arn` does not have that shape.
fn partition_from_arn(arn: &str) -> Option<&str> {
    let mut fields = arn.split(':');
    match (fields.next(), fields.next()) {
        (Some("arn"), Some(partition)) if !partition.is_empty() => Some(partition),
        _ => None,
    }
}

/// Patterns to show in error messages, or `["N/A"]` when none were supplied.
fn patterns_for_display(log_group_pattern: &Option<Vec<String>>) -> Vec<String> {
    log_group_pattern
        .clone()
        .unwrap_or_else(|| vec!["N/A".to_string()])
}

// Private Helper Functions

/// Collects candidate log group names from `stack_name` and/or `log_group_pattern`.
///
/// Both sources may be given. Their results are merged, de-duplicated and
/// returned in sorted order, so downstream validation, logs, error messages and
/// the ARN list are the same on every run. An empty pattern list is treated
/// the same as no patterns.
///
/// # Errors
///
/// Propagates discovery failures. Also fails when nothing was found, with an
/// "Internal error" message if neither a stack nor any pattern was supplied.
async fn discover_log_group_names(
    cfn_client: &CfnClient,
    cwl_client: &CwlClient,
    log_group_pattern: &Option<Vec<String>>,
    stack_name: &Option<String>,
) -> Result<Vec<String>> {
    // BTreeSet de-duplicates and iterates in a stable (sorted) order, unlike
    // HashSet, whose order varies from run to run.
    let mut all_log_groups = std::collections::BTreeSet::new();

    if let Some(stack) = stack_name.as_deref() {
        all_log_groups.extend(discover_log_groups_from_stack(cfn_client, stack).await?);
    }

    // `Some(vec![])` carries no patterns; treat it like `None`.
    let patterns = log_group_pattern.as_deref().filter(|p| !p.is_empty());
    if let Some(patterns) = patterns {
        all_log_groups.extend(discover_log_groups_by_patterns(cwl_client, patterns).await?);
    }

    if all_log_groups.is_empty() {
        let message = if stack_name.is_none() && patterns.is_none() {
            "Internal error: No log group pattern or stack name provided."
        } else {
            "No log groups found with the provided pattern(s) and/or stack name."
        };
        return Err(anyhow::anyhow!(message));
    }

    Ok(all_log_groups.into_iter().collect())
}

/// Resolves each pattern in `patterns` to its matching log group names.
///
/// Patterns are queried one after another. A name matched by more than one
/// pattern appears only once in the result. The first failing query aborts
/// the whole lookup.
async fn discover_log_groups_by_patterns(
    cwl_client: &CwlClient,
    patterns: &[String],
) -> Result<Vec<String>> {
    tracing::debug!("Discovering log groups matching patterns: {:?}", patterns);

    let mut unique_names: std::collections::HashSet<String> = std::collections::HashSet::new();
    for single_pattern in patterns.iter() {
        let matches = discover_log_groups_by_pattern(cwl_client, single_pattern).await?;
        unique_names.extend(matches);
    }

    let merged: Vec<String> = unique_names.into_iter().collect();
    Ok(merged)
}

/// Lists the names of every log group matching `pattern`.
///
/// `pattern` is sent as the DescribeLogGroups `logGroupNamePattern` filter.
/// The response is paginated, so `nextToken` is followed until the last page;
/// otherwise matches beyond the first page would be silently dropped.
///
/// # Errors
///
/// Fails if any DescribeLogGroups page request fails.
async fn discover_log_groups_by_pattern(
    cwl_client: &CwlClient,
    pattern: &str,
) -> Result<Vec<String>> {
    tracing::debug!("Discovering log groups matching pattern: '{}'", pattern);
    let mut discovered_groups = Vec::new();
    let mut next_token: Option<String> = None;

    loop {
        let mut request = cwl_client
            .describe_log_groups()
            .log_group_name_pattern(pattern);
        if let Some(token) = next_token.take() {
            request = request.next_token(token);
        }

        let output = request.send().await.with_context(|| {
            format!("Failed to describe log groups matching pattern '{}'", pattern)
        })?;

        discovered_groups.extend(
            output
                .log_groups
                .unwrap_or_default()
                .into_iter()
                .filter_map(|lg| lg.log_group_name),
        );

        match output.next_token {
            Some(token) => next_token = Some(token),
            None => break,
        }
    }

    Ok(discovered_groups)
}

/// Collects log group names from the resources of a CloudFormation stack.
///
/// All pages of `ListStackResources` are read. For each resource summary:
/// - `AWS::Logs::LogGroup`: its physical ID is used as the log group name.
/// - `AWS::Lambda::Function`: `/aws/lambda/<physical ID>` is derived.
///
/// Resources of either type without a physical ID are logged and skipped.
/// Names are returned in discovery order and may contain duplicates.
///
/// # Errors
///
/// Fails if any `ListStackResources` page request fails.
async fn discover_log_groups_from_stack(
    cfn_client: &CfnClient,
    stack_name: &str,
) -> Result<Vec<String>> {
    tracing::debug!("Discovering log groups from stack: '{}'", stack_name);
    let mut found: Vec<String> = Vec::new();
    let mut page_token: Option<String> = None;

    loop {
        let request = match page_token.take() {
            Some(token) => cfn_client
                .list_stack_resources()
                .stack_name(stack_name)
                .next_token(token),
            None => cfn_client.list_stack_resources().stack_name(stack_name),
        };

        let page = request
            .send()
            .await
            .with_context(|| format!("Failed to list resources for stack '{}'", stack_name))?;

        for summary in page.stack_resource_summaries.unwrap_or_default() {
            match summary.resource_type.as_deref() {
                Some("AWS::Logs::LogGroup") => match summary.physical_resource_id {
                    Some(physical_id) => {
                        found.push(physical_id);
                    }
                    None => {
                        tracing::warn!(resource_summary = ?summary, "Found LogGroup resource without physical ID");
                    }
                },
                Some("AWS::Lambda::Function") => match summary.physical_resource_id {
                    Some(physical_id) => {
                        let lambda_log_group_name = format!("/aws/lambda/{}", physical_id);
                        tracing::debug!(lambda_function = %physical_id, derived_log_group = %lambda_log_group_name, "Adding derived log group for Lambda function");
                        found.push(lambda_log_group_name);
                    }
                    None => {
                        tracing::warn!(resource_summary = ?summary, "Found Lambda function resource without physical ID");
                    }
                },
                _ => {}
            }
        }

        page_token = page.next_token;
        if page_token.is_none() {
            break;
        }
    }

    Ok(found)
}

/// Validates a list of potential log group names, prioritizing Lambda@Edge patterns.
pub async fn validate_log_groups(
    cwl_client: &CwlClient,
    initial_names: Vec<String>,
    region_str: &str,
) -> Result<Vec<String>> {
    const LAMBDA_PREFIX: &str = "/aws/lambda/";

    let checks = initial_names.into_iter().map(|name| {
        let client = cwl_client.clone();
        let region = region_str.to_string();
        async move {
            if name.starts_with(LAMBDA_PREFIX) {
                let base_name_part = name.strip_prefix(LAMBDA_PREFIX).unwrap_or(&name);

                let potential_edge_name = format!("{}{}.{}", LAMBDA_PREFIX, region, base_name_part);
                tracing::debug!(log_group=%name, potential_edge_name=%potential_edge_name, "Checking for Lambda@Edge variant first");

                match describe_exact_log_group(&client, &potential_edge_name).await {
                    Ok(Some(found_edge_name)) => {
                        tracing::debug!(log_group=%found_edge_name, "Validated exact Lambda@Edge log group name");
                        Ok(Some(found_edge_name))
                    }
                    Ok(None) => {
                        tracing::debug!(log_group=%name, potential_edge_name=%potential_edge_name, "Lambda@Edge variant not found, checking original name");
                         match describe_exact_log_group(&client, &name).await {
                            Ok(Some(found_original_name)) => {
                                tracing::debug!(log_group=%found_original_name, "Validated original Lambda log group name after checking edge variant");
                                Ok(Some(found_original_name))
                            }
                            Ok(None) => {
                                tracing::warn!(log_group=%name, potential_edge_name=%potential_edge_name, "Neither Lambda@Edge variant nor original name found. Skipping.");
                                Ok(None)
                            }
                            Err(e) => Err(e.context(format!("Error validating original Lambda log group name '{}' after checking edge variant", name))),
                         }
                    }
                     Err(e) => Err(e.context(format!("Error validating potential Lambda@Edge log group name '{}'", potential_edge_name))),
                }
            } else {
                tracing::debug!(log_group=%name, "Checking non-Lambda log group name directly");
                 match describe_exact_log_group(&client, &name).await {
                     Ok(Some(found_name)) => {
                         tracing::debug!(log_group = %found_name, "Validated non-Lambda log group");
                          Ok(Some(found_name))
                     }
                     Ok(None) => {
                          tracing::warn!(log_group=%name, "Non-Lambda log group name not found. Skipping.");
                          Ok(None)
                     }
                     Err(e) => Err(e.context(format!("Error validating non-Lambda log group '{}'", name))),
                }
            }
        }
    });

    let results = futures::future::join_all(checks).await;

    let mut validated_names = Vec::new();
    let mut errors = Vec::new(); // Collect errors to potentially report them all

    for result in results {
        match result {
            Ok(Some(name)) => validated_names.push(name),
            Ok(None) => {}            // Logged within the check, skip
            Err(e) => errors.push(e), // Collect error
        }
    }

    // If any errors occurred during validation, return the first one
    if let Some(first_error) = errors.into_iter().next() {
        return Err(first_error);
    }

    Ok(validated_names)
}

/// Helper to describe a single log group by exact name.
async fn describe_exact_log_group(client: &CwlClient, name: &str) -> Result<Option<String>> {
    match client
        .describe_log_groups()
        .log_group_name_prefix(name) // Use prefix for API
        .limit(1)
        .send()
        .await
    {
        Ok(output) => {
            // Check if the *exact* name was returned
            if output.log_groups.is_some_and(|lgs| {
                lgs.iter()
                    .any(|lg| lg.log_group_name.as_deref() == Some(name))
            }) {
                Ok(Some(name.to_string()))
            } else {
                Ok(None) // Prefix matched something else, or nothing
            }
        }
        Err(e) => {
            // Specifically handle ResourceNotFoundException as Ok(None)
            if let Some(service_error) = e.as_service_error() {
                // Compare the error code string directly
                if service_error.meta().code() == Some("ResourceNotFoundException") {
                    return Ok(None);
                }
            }
            // Otherwise, it's an actual error
            Err(anyhow::Error::new(e).context(format!("Failed to describe log group '{}'", name)))
        }
    }
}