use anyhow::{bail, Result};
use std::env;
use std::io::{self, Write};
use std::process::Command;
use crate::cli::auth;
use crate::cli::config::Config;
use crate::cli::QueryArgs;
use crate::cloudflare::CloudflareClient;
/// Builds the bucket name for an environment.
///
/// The result is the fixed prefix `otlp2pipeline-` followed by `env_name`
/// with every underscore replaced by a hyphen.
fn bucket_name(env_name: &str) -> String {
    const PREFIX: &str = "otlp2pipeline-";

    let mut name = String::with_capacity(PREFIX.len() + env_name.len());
    name.push_str(PREFIX);
    // '_' and '-' are both single-byte, so the capacity estimate stays exact.
    name.extend(env_name.chars().map(|ch| if ch == '_' { '-' } else { ch }));
    name
}
/// Escapes a value for embedding inside a single-quoted SQL string literal
/// by doubling every single quote.
fn sql_quote_escape(value: &str) -> String {
    value.replace('\'', "''")
}

/// Writes `contents` to `path`, creating the file with owner-only
/// permissions (0600) on Unix because the contents include a secret.
fn write_private_file(path: &std::path::Path, contents: &str) -> io::Result<()> {
    let mut options = std::fs::OpenOptions::new();
    options.write(true).create(true).truncate(true);
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt;
        // The mode only applies when the file is newly created.
        options.mode(0o600);
    }
    let mut file = options.open(path)?;
    file.write_all(contents.as_bytes())
}

/// Starts an interactive DuckDB session attached to the R2 Data Catalog of
/// the selected environment.
///
/// The environment name is taken from `args.env`, falling back to the
/// environment stored in the loaded `Config`. The R2 API token is read from
/// the `R2_API_TOKEN` environment variable, or prompted for on the terminal
/// when the variable is not set. A temporary SQL init file is generated,
/// passed to `duckdb -init`, and removed once DuckDB has exited.
///
/// # Errors
///
/// Returns an error when:
/// - no environment is given and none can be loaded from the config,
/// - the `duckdb` executable cannot be run,
/// - credentials cannot be resolved or the Cloudflare client cannot be built,
/// - the prompted token is empty,
/// - the init file cannot be written,
/// - DuckDB cannot be launched or exits with a non-success status.
pub async fn execute_query(args: QueryArgs) -> Result<()> {
    let env_name = args
        .env
        .clone()
        .or_else(|| Config::load().ok().map(|c| c.environment))
        .ok_or_else(|| {
            anyhow::anyhow!(
                "No environment specified. Either:\n \
                 1. Run `otlp2pipeline init --provider cf --env <name>` first\n \
                 2. Pass --env <name> explicitly"
            )
        })?;
    let bucket = bucket_name(&env_name);

    println!("==> Starting DuckDB session for environment: {}", env_name);
    println!(" Bucket: {}", bucket);
    println!();

    // Fail early with install instructions if the duckdb binary cannot be run.
    if Command::new("duckdb").arg("-version").output().is_err() {
        bail!(
            "duckdb not found\n\n\
             Install DuckDB (v1.4.0+ required for Iceberg REST Catalog):\n \
             brew install duckdb\n \
             # or download from https://duckdb.org/docs/installation/"
        );
    }

    let creds = auth::resolve_credentials()?;
    let client = CloudflareClient::new(creds.token, creds.account_id).await?;
    let account_id = client.account_id();

    let r2_token = match env::var("R2_API_TOKEN") {
        Ok(token) => {
            println!(" Using R2_API_TOKEN from environment");
            token
        }
        Err(_) => {
            println!("R2 API token required for Data Catalog access.");
            println!();
            println!("Create one at: https://dash.cloudflare.com/?to=/:account/r2/api-tokens");
            println!(" - Permissions: Admin Read & Write");
            println!(" - Specify bucket: {}", bucket);
            println!();
            println!("Tip: Set R2_API_TOKEN env var to skip this prompt");
            println!();
            // The prompt goes to stderr and is flushed so it appears before the read.
            eprint!("R2 API Token: ");
            io::stderr().flush()?;
            let mut token = String::new();
            io::stdin().read_line(&mut token)?;
            let token = token.trim().to_string();
            if token.is_empty() {
                bail!("No token provided");
            }
            token
        }
    };

    let warehouse = format!("{}_{}", account_id, bucket);
    let catalog_uri = format!(
        "https://catalog.cloudflarestorage.com/{}/{}",
        account_id, bucket
    );
    println!();
    println!(" Warehouse: {}", warehouse);
    println!(" Catalog URI: {}", catalog_uri);
    println!();

    // The environment name lands in a `--` line comment; strip control
    // characters so a newline cannot terminate the comment and inject SQL.
    let env_comment: String = env_name.chars().filter(|c| !c.is_control()).collect();

    // The remaining values land inside single-quoted SQL literals, so any
    // embedded quote must be doubled to keep the literal intact.
    let init_sql = format!(
        r#"-- DuckDB init for otlp2pipeline environment: {}
-- Auto-generated by otlp2pipeline CLI
-- Install and load required extensions
INSTALL iceberg;
LOAD iceberg;
INSTALL httpfs;
LOAD httpfs;
-- Create secret for R2 Data Catalog
CREATE SECRET r2_catalog_secret (
TYPE ICEBERG,
TOKEN '{}'
);
-- Attach R2 Data Catalog
ATTACH '{}' AS r2 (
TYPE ICEBERG,
ENDPOINT '{}'
);
-- Set default schema
USE r2.default;
-- Show available tables
.print ''
.print '==> Connected to R2 Data Catalog'
.print ' Catalog: r2'
.print ' Schema: default'
.print ''
.print 'Available tables:'
SHOW TABLES;
.print ''
.print 'Example queries:'
.print ' SELECT count(*) FROM logs;'
.print ' SELECT * FROM traces LIMIT 10;'
.print ' DESCRIBE logs;'
.print ''
"#,
        env_comment,
        sql_quote_escape(&r2_token),
        sql_quote_escape(&warehouse),
        sql_quote_escape(&catalog_uri)
    );

    let init_file = std::env::temp_dir().join(format!(
        "otlp2pipeline-duckdb-init-{}.sql",
        std::process::id()
    ));
    if let Err(err) = write_private_file(&init_file, &init_sql) {
        // Do not leave a partially written file holding the token behind.
        let _ = std::fs::remove_file(&init_file);
        return Err(err.into());
    }

    println!("==> Launching DuckDB...");
    println!();

    let launch = Command::new("duckdb")
        .arg("-init")
        .arg(&init_file)
        .status();

    // Best-effort cleanup happens BEFORE the launch result is propagated, so
    // the token-bearing file is removed even when DuckDB fails to start.
    let _ = std::fs::remove_file(&init_file);

    let status = launch?;
    if !status.success() {
        bail!("DuckDB exited with error");
    }
    Ok(())
}