otlp2records 0.4.0

Transform OTLP telemetry to flattened records
Documentation
//! Build script for otlp2records
//!
//! Parses @schema annotations from VRL files and generates:
//! - Arrow schema definitions
//! - VRL source constants
//!
//! Output is written to $OUT_DIR/compiled_vrl.rs

use std::{env, fs, path::Path};

/// Build-script entry point.
///
/// Always regenerates the embedded VRL sources and schemas; additionally
/// emits the C header when the crate is built with the `ffi` feature.
fn main() {
    compile_vrl_scripts();

    // Header generation is compiled in only for `ffi` builds.
    #[cfg(feature = "ffi")]
    {
        generate_c_header();
    }
}

/// Generate C FFI header using cbindgen
///
/// Writes `include/otlp2records_ffi.h` under the crate root
/// (`CARGO_MANIFEST_DIR`) and asks cargo to rerun the script whenever
/// `src/ffi.rs` changes. Failures to create the output directory or to
/// generate the bindings are reported as `cargo:warning` lines rather than
/// being silently dropped.
///
/// # Panics
///
/// Panics if `CARGO_MANIFEST_DIR` is not set.
#[cfg(feature = "ffi")]
fn generate_c_header() {
    let crate_dir = env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR not set");
    let crate_path = Path::new(&crate_dir);

    // Output to include/ directory in crate root
    let include_dir = crate_path.join("include");
    if let Err(e) = fs::create_dir_all(&include_dir) {
        // Surface the root cause instead of discarding it; generation is still
        // attempted below so the overall control flow is unchanged.
        println!(
            "cargo:warning=Failed to create {}: {e}",
            include_dir.display()
        );
    }

    let output_file = include_dir.join("otlp2records_ffi.h");

    // Rerun if ffi.rs changes
    println!("cargo:rerun-if-changed=src/ffi.rs");

    // C output with an include guard; cbindgen's default includes are turned
    // off and the two headers the bindings need are listed explicitly.
    // NOTE(review): `includes` is cbindgen's quoted-include list; `sys_includes`
    // would emit `<stdint.h>`-style includes instead - confirm which is intended.
    let config = cbindgen::Config {
        language: cbindgen::Language::C,
        cpp_compat: true,
        include_guard: Some("OTLP2RECORDS_FFI_H".to_string()),
        no_includes: true,
        includes: vec!["stdint.h".to_string(), "stddef.h".to_string()],
        sys_includes: vec![],
        // Opens an `extern "C"` block for C++ consumers ...
        after_includes: Some(
            r#"
#ifdef __cplusplus
extern "C" {
#endif
"#
            .to_string(),
        ),
        // ... which the trailer closes again.
        trailer: Some(
            r#"
#ifdef __cplusplus
}
#endif
"#
            .to_string(),
        ),
        documentation: true,
        documentation_style: cbindgen::DocumentationStyle::C,
        ..Default::default()
    };

    match cbindgen::Builder::new()
        .with_crate(&crate_dir)
        .with_config(config)
        .generate()
    {
        Ok(bindings) => {
            bindings.write_to_file(&output_file);
            println!("cargo:note=Generated C header at {}", output_file.display());
        }
        Err(e) => {
            // A failed header generation is reported but does not fail the build.
            println!("cargo:warning=Failed to generate C bindings: {e}");
        }
    }
}

/// Schema field parsed from VRL annotations
///
/// Built by `parse_field_line` from one annotation line of the form
/// `field_name: type, required?, "description"?`.
struct SchemaField {
    /// Field name: the text before the first `:` on the annotation line, trimmed.
    name: String,
    /// Schema type token as written in the annotation (e.g. `int64`, `string`);
    /// `map_to_arrow_type` translates it into an Arrow data type expression.
    field_type: String,
    /// `true` when the annotation carries the `required` flag; the generated
    /// Arrow field is then non-nullable.
    required: bool,
}

/// Schema parsed from VRL file header
///
/// Returned by `parse_schema_from_vrl` when a `# @schema <name>` line is found.
struct Schema {
    /// Text following `# @schema ` on the annotation line, trimmed.
    name: String,
    /// Fields in the order their annotation lines appear in the file.
    fields: Vec<SchemaField>,
}

/// VRL scripts to compile and embed
///
/// Each entry is `(constant prefix, file name)`: the file is looked up in the
/// crate's `vrl/` directory and the prefix is used to name the generated
/// items (`<PREFIX>_SOURCE`, `<PREFIX>_SCHEMA`, ...).
const VRL_SCRIPTS: &[(&str, &str)] = &[
    ("OTLP_LOGS", "otlp_logs.vrl"),
    ("OTLP_TRACES", "otlp_traces.vrl"),
    ("OTLP_GAUGE", "otlp_gauge.vrl"),
    ("OTLP_SUM", "otlp_sum.vrl"),
    ("OTLP_HISTOGRAM", "otlp_histogram.vrl"),
    ("OTLP_EXP_HISTOGRAM", "otlp_exp_histogram.vrl"),
];

/// Embed the scripts listed in `VRL_SCRIPTS` into `$OUT_DIR/compiled_vrl.rs`.
///
/// For every script a `<NAME>_SOURCE` string constant and a `<NAME>_SCHEMA`
/// Arrow schema static are emitted; scripts that carry an `@schema` header
/// additionally get the items produced by `generate_schema_defs`. A missing
/// script or a missing annotation yields a placeholder plus a cargo warning
/// instead of failing the build. The generated file ends with the
/// `VRL_SCRIPT_NAMES` and `ALL_SCHEMA_DEFS` tables.
///
/// # Panics
///
/// Panics if `OUT_DIR` or `CARGO_MANIFEST_DIR` is unset, if an existing script
/// cannot be read, if a script contains `"##` (which would end the raw string
/// it is embedded in), or if the output file cannot be written.
fn compile_vrl_scripts() {
    /// Render a `<const_name>_SCHEMA` static holding an empty Arrow schema.
    /// `reason` appears in parentheses in the generated doc comment.
    fn empty_schema_item(const_name: &str, filename: &str, reason: &str) -> String {
        [
            format!("/// Arrow schema for {filename} ({reason})\n"),
            format!(
                "#[allow(dead_code)]\npub static {const_name}_SCHEMA: Lazy<arrow::datatypes::Schema> = Lazy::new(|| {{\n"
            ),
            "    arrow::datatypes::Schema::empty()\n".to_string(),
            "});\n\n".to_string(),
        ]
        .concat()
    }

    let out_dir = env::var("OUT_DIR").expect("OUT_DIR not set");
    let manifest_dir = env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR not set");
    let vrl_dir = Path::new(&manifest_dir).join("vrl");

    // Any change under vrl/ invalidates the generated file.
    println!("cargo:rerun-if-changed=vrl/");

    // Plain `//` comments: the file is pulled in with include!(), where inner
    // doc comments would not be accepted. Imports (e.g. `Lazy`) come from the
    // including module; arrow types are spelled with full paths.
    let mut generated = String::from(
        "// Auto-generated VRL scripts and Arrow schemas\n// DO NOT EDIT - Generated by build.rs\n\n",
    );

    // Constant prefixes of the scripts that produced a real schema definition.
    let mut with_schema: Vec<String> = Vec::new();

    for &(const_name, filename) in VRL_SCRIPTS {
        let script_path = vrl_dir.join(filename);

        // Track each script individually as well.
        println!("cargo:rerun-if-changed=vrl/{filename}");

        if !script_path.exists() {
            println!("cargo:warning=VRL file not found: vrl/{filename} - generating placeholder");

            // Stand-in source constant so the including module still compiles.
            generated.push_str(&format!(
                "/// VRL source for {filename} (placeholder - file not found)\n"
            ));
            generated.push_str(&format!(
                "pub const {const_name}_SOURCE: &str = \"// Placeholder: vrl/{filename} not found\\n\";\n\n"
            ));
            generated.push_str(&empty_schema_item(
                const_name,
                filename,
                "placeholder - file not found",
            ));
            continue;
        }

        let source = match fs::read_to_string(&script_path) {
            Ok(text) => text,
            Err(e) => panic!("Failed to read {}: {}", script_path.display(), e),
        };

        // Syntax-only check; function resolution is deliberately not attempted
        // here, so a parse problem is reported but does not stop the build.
        if let Err(e) = vrl::parser::parse(&source) {
            println!("cargo:warning=VRL compilation warning for {filename}: {e:?}");
        }

        // The script is embedded as r##"..."##, which tolerates `"#` but would
        // be cut short by `"##`.
        if source.contains("\"##") {
            panic!(
                "VRL source {filename} contains '\"##' which breaks raw string embedding. \
                 Please avoid this sequence in VRL files."
            );
        }
        generated.push_str(&format!("/// VRL source for {filename}\n"));
        generated.push_str("#[allow(dead_code)]\n");
        generated.push_str(&format!(
            "pub const {const_name}_SOURCE: &str = r##\"{source}\"##;\n\n"
        ));

        match parse_schema_from_vrl(&source) {
            Some(schema) => {
                with_schema.push(const_name.to_string());
                generated.push_str(&generate_arrow_schema(const_name, &schema));
                generated.push_str(&generate_schema_defs(const_name, &schema));
            }
            None => {
                println!(
                    "cargo:warning=No @schema annotation found in vrl/{filename} - generating empty schema"
                );
                generated.push_str(&empty_schema_item(
                    const_name,
                    filename,
                    "no @schema annotation found",
                ));
            }
        }
    }

    // Table of every script's constant prefix.
    generated.push_str("/// Names of all VRL scripts\n");
    generated.push_str("#[allow(dead_code)]\n");
    generated.push_str("pub const VRL_SCRIPT_NAMES: &[&str] = &[\n");
    generated.extend(
        VRL_SCRIPTS
            .iter()
            .map(|(const_name, _)| format!("    \"{const_name}\",\n")),
    );
    generated.push_str("];\n");

    // Table of the schema definitions that were actually generated.
    generated.push_str("\n/// All schema definitions parsed from VRL\n");
    generated.push_str("#[allow(dead_code)]\n");
    generated.push_str("pub static ALL_SCHEMA_DEFS: &[crate::schemas::SchemaDef] = &[\n");
    generated.extend(
        with_schema
            .iter()
            .map(|const_name| format!("    {const_name}_SCHEMA_DEF,\n")),
    );
    generated.push_str("];\n");

    let out_path = Path::new(&out_dir).join("compiled_vrl.rs");
    fs::write(&out_path, generated).expect("Failed to write compiled_vrl.rs");

    println!(
        "cargo:note=Generated {} with {} schemas",
        out_path.display(),
        with_schema.len()
    );
}

/// Render the `crate::schemas` metadata items for one parsed schema.
///
/// Two statics are produced:
/// - `<const_name>_FIELDS`: a slice with one `crate::schemas::SchemaField`
///   literal per field, in declaration order;
/// - `<const_name>_SCHEMA_DEF`: a `crate::schemas::SchemaDef` carrying the
///   schema name and pointing at that slice.
///
/// Names and type tokens come straight from VRL comments, so they are written
/// with `{:?}`: a `"` or `\` in an annotation is escaped instead of breaking
/// the generated string literal. Ordinary identifiers render exactly as
/// `"name"`.
///
/// Returns the generated Rust source text.
fn generate_schema_defs(const_name: &str, schema: &Schema) -> String {
    let mut out = String::new();

    out.push_str(&format!(
        "/// Schema fields for {}\n#[allow(dead_code)]\npub static {}_FIELDS: &[crate::schemas::SchemaField] = &[\n",
        schema.name, const_name
    ));

    for field in &schema.fields {
        out.push_str(&format!(
            "    crate::schemas::SchemaField {{ name: {:?}, field_type: {:?}, required: {} }},\n",
            field.name, field.field_type, field.required
        ));
    }

    out.push_str("];\n\n");

    out.push_str(&format!(
        "/// Schema definition for {}\n#[allow(dead_code)]\npub static {}_SCHEMA_DEF: crate::schemas::SchemaDef = crate::schemas::SchemaDef {{ name: {:?}, fields: {}_FIELDS }};\n\n",
        schema.name, const_name, schema.name, const_name
    ));

    out
}

/// Extract the `@schema` annotation block from the comments of a VRL script.
///
/// Lines are examined after trimming surrounding whitespace:
/// - `# @schema <name>` opens the block and sets the schema name;
/// - `# @end` stops scanning, wherever it appears;
/// - inside the block, other `# @...` lines, a bare `#` and blank lines are
///   skipped;
/// - inside the block, a `# ` line containing `:` is handed to
///   `parse_field_line`; a line it rejects produces a cargo warning.
///
/// Returns `None` when no `# @schema ` line was seen before scanning stopped.
fn parse_schema_from_vrl(source: &str) -> Option<Schema> {
    const SCHEMA_TAG: &str = "# @schema ";
    const COMMENT_PREFIX: &str = "# ";

    // `Some` from the moment the opening tag has been seen.
    let mut name: Option<String> = None;
    let mut fields = Vec::new();

    for raw in source.lines() {
        let line = raw.trim();

        if line.starts_with(SCHEMA_TAG) {
            name = Some(line.trim_start_matches(SCHEMA_TAG).trim().to_string());
            continue;
        }

        if line == "# @end" {
            break;
        }

        if name.is_none() {
            // Comments ahead of the opening tag are not part of the schema.
            continue;
        }

        // Field lines look like `# field_name: type, required?, "description"?`.
        // Other directives (`# @...`), a bare `#` and blank lines never match.
        let looks_like_field = line.starts_with(COMMENT_PREFIX)
            && !line.starts_with("# @")
            && line.contains(':');
        if !looks_like_field {
            continue;
        }

        match parse_field_line(&line[COMMENT_PREFIX.len()..]) {
            Some(field) => fields.push(field),
            None => println!(
                "cargo:warning=Failed to parse schema field: {}",
                line.trim_start_matches(COMMENT_PREFIX)
            ),
        }
    }

    name.map(|name| Schema { name, fields })
}

/// Parse one schema field annotation (the text after the leading `# `).
///
/// Expected shape: `field_name: type, required?, "description"?`.
/// The name is everything before the first `:`. Anything from the first `"`
/// onward is treated as the description and ignored. Of the remaining
/// comma-separated tokens, `required` sets the flag and the first other
/// non-empty token is taken as the type; further tokens are ignored.
///
/// Returns `None` when the line has no `:`, when the name is empty, or when no
/// type token is present.
fn parse_field_line(line: &str) -> Option<SchemaField> {
    let (name, spec) = line.split_once(':')?;

    let name = name.trim();
    if name.is_empty() {
        // A nameless field would otherwise become an unnamed column in the
        // generated schema; reject it so the caller can report the line.
        return None;
    }

    // Drop the quoted description, if any.
    let spec = spec.split('"').next().unwrap_or(spec);

    let mut field_type: Option<&str> = None;
    let mut required = false;
    for token in spec.split(',').map(str::trim) {
        match token {
            "" => {}
            "required" => required = true,
            other => {
                // Only the first type-like token counts.
                if field_type.is_none() {
                    field_type = Some(other);
                }
            }
        }
    }

    Some(SchemaField {
        name: name.to_string(),
        field_type: field_type?.to_string(),
        required,
    })
}

/// Translate a schema type token into the Rust expression for its Arrow type.
///
/// The result is source text (a fully qualified `arrow::datatypes::DataType`
/// expression) meant to be pasted into generated code. `timestamp` maps to a
/// microsecond timestamp without time zone; `json` is stored as `Utf8`.
/// Unrecognised tokens emit a cargo warning and fall back to `Utf8`.
fn map_to_arrow_type(field_type: &str) -> String {
    // Pick only the variant; the shared path prefix is added once below.
    let variant = match field_type {
        "timestamp" => "Timestamp(arrow::datatypes::TimeUnit::Microsecond, None)",
        "int64" => "Int64",
        "int32" => "Int32",
        "float64" => "Float64",
        "bool" => "Boolean",
        // JSON payloads are carried as strings.
        "string" | "json" => "Utf8",
        other => {
            println!("cargo:warning=Unknown schema type '{other}', defaulting to Utf8");
            "Utf8"
        }
    };

    format!("arrow::datatypes::DataType::{variant}")
}

/// Generate Arrow schema constant for a parsed schema
///
/// Produces the source text of a `pub static <const_name>_SCHEMA` whose lazy
/// initializer builds an `arrow::datatypes::Schema` with one `Field` per
/// schema field, in declaration order. The Arrow type comes from
/// `map_to_arrow_type`; a field is nullable unless it is marked `required`.
///
/// Field names originate from VRL comments, so they are written with `{:?}`:
/// a `"` or `\` in a name is escaped rather than breaking the generated string
/// literal. Ordinary identifiers render exactly as `"name"`.
fn generate_arrow_schema(const_name: &str, schema: &Schema) -> String {
    let mut output = String::new();

    output.push_str(&format!(
        "/// Arrow schema for {} (from @schema {})\n",
        const_name, schema.name
    ));
    output.push_str(&format!(
        "#[allow(dead_code)]\npub static {const_name}_SCHEMA: Lazy<arrow::datatypes::Schema> = Lazy::new(|| {{\n"
    ));
    output.push_str("    arrow::datatypes::Schema::new(vec![\n");

    for field in &schema.fields {
        let arrow_type = map_to_arrow_type(&field.field_type);
        // Arrow expresses "required" as the absence of nullability.
        let nullable = !field.required;
        output.push_str(&format!(
            "        arrow::datatypes::Field::new({:?}, {}, {}),\n",
            field.name, arrow_type, nullable
        ));
    }

    output.push_str("    ])\n");
    output.push_str("});\n\n");

    output
}