otlp2pipeline 0.2.0

OTLP ingestion worker for Cloudflare Pipelines and AWS
Documentation
use anyhow::{bail, Result};
use std::env;
use std::io::{self, Write};
use std::process::Command;

use crate::cli::auth;
use crate::cli::config::Config;
use crate::cli::QueryArgs;
use crate::cloudflare::CloudflareClient;

/// Derives the R2 bucket name used for an environment.
///
/// The result is the fixed prefix `otlp2pipeline-` followed by `env_name`, with every
/// underscore in `env_name` replaced by a hyphen. No other characters are altered.
fn bucket_name(env_name: &str) -> String {
    let suffix: String = env_name
        .chars()
        .map(|ch| if ch == '_' { '-' } else { ch })
        .collect();

    let mut name = String::from("otlp2pipeline-");
    name.push_str(&suffix);
    name
}

pub async fn execute_query(args: QueryArgs) -> Result<()> {
    let env_name = args
        .env
        .clone()
        .or_else(|| Config::load().ok().map(|c| c.environment))
        .ok_or_else(|| {
            anyhow::anyhow!(
                "No environment specified. Either:\n  \
        1. Run `otlp2pipeline init --provider cf --env <name>` first\n  \
        2. Pass --env <name> explicitly"
            )
        })?;

    let bucket = bucket_name(&env_name);

    println!("==> Starting DuckDB session for environment: {}", env_name);
    println!("    Bucket: {}", bucket);
    println!();

    // Check for duckdb
    if Command::new("duckdb").arg("-version").output().is_err() {
        bail!(
            "duckdb not found\n\n\
            Install DuckDB (v1.4.0+ required for Iceberg REST Catalog):\n  \
            brew install duckdb\n  \
            # or download from https://duckdb.org/docs/installation/"
        );
    }

    // Resolve auth to get account ID
    let creds = auth::resolve_credentials()?;
    let client = CloudflareClient::new(creds.token, creds.account_id).await?;
    let account_id = client.account_id();

    // Get R2 API token
    let r2_token = match env::var("R2_API_TOKEN") {
        Ok(token) => {
            println!("    Using R2_API_TOKEN from environment");
            token
        }
        Err(_) => {
            println!("R2 API token required for Data Catalog access.");
            println!();
            println!("Create one at: https://dash.cloudflare.com/?to=/:account/r2/api-tokens");
            println!("  - Permissions: Admin Read & Write");
            println!("  - Specify bucket: {}", bucket);
            println!();
            println!("Tip: Set R2_API_TOKEN env var to skip this prompt");
            println!();
            eprint!("R2 API Token: ");
            io::stderr().flush()?;

            let mut token = String::new();
            io::stdin().read_line(&mut token)?;
            let token = token.trim().to_string();

            if token.is_empty() {
                bail!("No token provided");
            }
            token
        }
    };

    // Build catalog connection details
    let warehouse = format!("{}_{}", account_id, bucket);
    let catalog_uri = format!(
        "https://catalog.cloudflarestorage.com/{}/{}",
        account_id, bucket
    );

    println!();
    println!("    Warehouse: {}", warehouse);
    println!("    Catalog URI: {}", catalog_uri);
    println!();

    // Create init SQL file
    let init_sql = format!(
        r#"-- DuckDB init for otlp2pipeline environment: {}
-- Auto-generated by otlp2pipeline CLI

-- Install and load required extensions
INSTALL iceberg;
LOAD iceberg;
INSTALL httpfs;
LOAD httpfs;

-- Create secret for R2 Data Catalog
CREATE SECRET r2_catalog_secret (
    TYPE ICEBERG,
    TOKEN '{}'
);

-- Attach R2 Data Catalog
ATTACH '{}' AS r2 (
    TYPE ICEBERG,
    ENDPOINT '{}'
);

-- Set default schema
USE r2.default;

-- Show available tables
.print ''
.print '==> Connected to R2 Data Catalog'
.print '    Catalog: r2'
.print '    Schema: default'
.print ''
.print 'Available tables:'
SHOW TABLES;

.print ''
.print 'Example queries:'
.print '  SELECT count(*) FROM logs;'
.print '  SELECT * FROM traces LIMIT 10;'
.print '  DESCRIBE logs;'
.print ''
"#,
        env_name, r2_token, warehouse, catalog_uri
    );

    // Write to temp file
    let init_file = std::env::temp_dir().join(format!(
        "otlp2pipeline-duckdb-init-{}.sql",
        std::process::id()
    ));
    std::fs::write(&init_file, &init_sql)?;

    println!("==> Launching DuckDB...");
    println!();

    // Launch duckdb with init file
    let status = Command::new("duckdb")
        .arg("-init")
        .arg(&init_file)
        .status()?;

    // Cleanup
    let _ = std::fs::remove_file(&init_file);

    if !status.success() {
        bail!("DuckDB exited with error");
    }

    Ok(())
}