//! otlp2pipeline 0.4.0
//!
//! OTLP ingestion worker for Cloudflare Pipelines and AWS.
//! This module implements the AWS `query` command documentation and logic.
use anyhow::{bail, Result};
use std::process::Command;

use super::helpers::{load_config, resolve_env_name, resolve_region};
use crate::cli::commands::naming;
use crate::cli::QueryArgs;

/// Execute AWS query command - launches DuckDB connected to S3 Tables
/// Execute AWS query command - launches DuckDB connected to S3 Tables.
///
/// Resolves the environment and region from CLI args and the saved config,
/// derives the S3 Tables bucket ARN, writes a temporary DuckDB init script
/// that attaches the bucket, and launches an interactive `duckdb` session.
///
/// # Errors
/// Returns an error when the config lacks an AWS `account_id`, when the
/// `duckdb` binary is not installed, when the temp init file cannot be
/// written, or when DuckDB exits with a non-zero status.
pub fn execute_query(args: QueryArgs) -> Result<()> {
    let config = load_config()?;

    let env_name = resolve_env_name(args.env)?;
    let region = resolve_region(None, &config);

    // account_id is required to build the S3 Tables ARN; fail with an
    // actionable message if it was never recorded during `init`.
    let account_id = config
        .as_ref()
        .and_then(|c| c.account_id.clone())
        .ok_or_else(|| {
            anyhow::anyhow!(
                "AWS account_id not found in config.\n\n\
                To fix, either:\n  \
                1. Re-run init with AWS CLI configured:\n     \
                   otlp2pipeline init --provider aws --env {} --region {}\n  \
                2. Manually add to .otlp2pipeline.toml:\n     \
                   account_id = \"YOUR_12_DIGIT_ACCOUNT_ID\"",
                env_name,
                region
            )
        })?;

    let bucket_name = format!("otlp2pipeline-{}", naming::normalize(&env_name));
    let table_bucket_arn = format!(
        "arn:aws:s3tables:{}:{}:bucket/{}",
        region, account_id, bucket_name
    );

    println!("==> Starting DuckDB session for AWS S3 Tables");
    println!("    Environment: {}", env_name);
    println!("    Region: {}", region);
    println!("    Account ID: {}", account_id);
    println!("    Table Bucket ARN: {}", table_bucket_arn);
    println!();

    ensure_duckdb_installed()?;

    let init_sql = build_init_sql(&env_name, &table_bucket_arn);

    // Per-process file name so concurrent invocations don't clobber each other.
    let init_file = std::env::temp_dir().join(format!(
        "otlp2pipeline-duckdb-aws-init-{}.sql",
        std::process::id()
    ));
    std::fs::write(&init_file, &init_sql)?;

    println!("==> Launching DuckDB...");
    println!();

    // Capture the spawn result instead of `?`-ing it immediately: the
    // original early-returned here and leaked the temp init file whenever
    // the duckdb process failed to launch. Cleanup must run either way.
    let status = Command::new("duckdb")
        .arg("-init")
        .arg(&init_file)
        .status();

    // Best-effort cleanup; a leftover temp file only warrants a warning.
    if let Err(e) = std::fs::remove_file(&init_file) {
        eprintln!(
            "Warning: Could not clean up temp file {}: {}",
            init_file.display(),
            e
        );
    }

    // Now propagate any launch error, after cleanup has run.
    let status = status?;

    if !status.success() {
        bail!(
            "DuckDB exited with error (exit code: {}). Check the output above for details.",
            status
                .code()
                // Lazy form: only allocate "unknown" when there is no code
                // (process killed by signal on Unix).
                .map_or_else(|| "unknown".to_string(), |c| c.to_string())
        );
    }

    Ok(())
}

/// Verify the `duckdb` binary is reachable on PATH; error with install
/// instructions otherwise.
fn ensure_duckdb_installed() -> Result<()> {
    if Command::new("duckdb").arg("-version").output().is_err() {
        bail!(
            "duckdb not found\n\n\
            Install DuckDB (v1.2.1+ required for S3 Tables):\n  \
            brew install duckdb\n  \
            # or download from https://duckdb.org/docs/installation/"
        );
    }
    Ok(())
}

/// Render the DuckDB init script that installs the required extensions and
/// attaches the S3 Tables bucket identified by `table_bucket_arn` for the
/// environment `env_name`.
fn build_init_sql(env_name: &str, table_bucket_arn: &str) -> String {
    format!(
        r#"-- DuckDB init for otlp2pipeline AWS environment: {}
-- Auto-generated by otlp2pipeline CLI

-- Install and load required extensions
INSTALL aws;
INSTALL httpfs;
INSTALL iceberg;
LOAD aws;
LOAD httpfs;
LOAD iceberg;

-- Use AWS credential chain (auto-detect from ~/.aws, env vars, IAM role)
CREATE SECRET (
    TYPE s3,
    PROVIDER credential_chain
);

-- Attach S3 Tables bucket
ATTACH '{}' AS s3_tables (
    TYPE iceberg,
    ENDPOINT_TYPE s3_tables
);

-- Set default schema
USE s3_tables.default;

-- Show available tables
.print ''
.print '==> Connected to AWS S3 Tables'
.print '    Catalog: s3_tables'
.print '    Schema: default'
.print ''
.print 'Available tables:'
SHOW TABLES;

.print ''
.print 'Example queries:'
.print '  SELECT count(*) FROM logs;'
.print '  SELECT * FROM traces LIMIT 10;'
.print '  DESCRIBE logs;'
.print ''
"#,
        env_name, table_bucket_arn
    )
}