iridium-db 0.3.0

A high-performance vector-graph hybrid storage and indexing engine
use std::path::Path;

use iridium::features::ingest;
use iridium::features::ops;
use iridium::features::storage::api;

/// Ingests a JSON-lines file of events into `handle` and prints a one-line summary.
///
/// Each non-blank line of `path` is parsed with `parse_ingest_event_jsonl`. Lines that fail
/// to parse are reported on stderr, counted as `parse_errors`, and skipped. Parsed events are
/// submitted to the ingest pipeline in batches of `batch_size`; the final partial batch is
/// submitted after the last line, and then the pipeline is flushed.
///
/// This is a CLI command handler, so failures are not returned. An unreadable file, a
/// pipeline configuration the pipeline rejects, or a batch or flush error is reported on
/// stderr, and the process exits with code 1.
pub(super) fn run_ingest_file_command(
    handle: &mut api::StorageHandle,
    path: &Path,
    batch_size: usize,
) {
    // Queue depth for the file-ingest pipeline. It also bounds the up-front batch
    // reservation, so an oversized `batch_size` cannot request a huge allocation.
    const MAX_QUEUE_DEPTH: usize = 16384;

    // The whole file is read before anything is ingested. An unreadable or non-UTF-8
    // file therefore fails without being partially ingested.
    let contents = std::fs::read_to_string(path).unwrap_or_else(|err| {
        eprintln!("ingest --file read failed ({}): {}", path.display(), err);
        std::process::exit(1);
    });
    // `batch_size` comes from the caller, so a rejected config is a usage error. It is
    // reported like every other failure in this command instead of panicking.
    let mut pipeline = ingest::IngestPipeline::new(ingest::IngestConfig {
        max_queue_depth: MAX_QUEUE_DEPTH,
        max_batch_size: batch_size,
    })
    .unwrap_or_else(|err| {
        eprintln!(
            "ingest --file invalid pipeline config (batch_size={}): {:?}",
            batch_size, err
        );
        std::process::exit(1);
    });

    let mut accepted = 0usize;
    let mut rejected = 0usize;
    let mut parse_errors = 0usize;
    let mut batch = Vec::with_capacity(batch_size.min(MAX_QUEUE_DEPTH));

    // Submits the pending events, adds the ack counts to the totals, and clears `events`.
    // `failed_at_line` is the 1-based line that filled the batch, or `None` for the
    // trailing partial batch; it only selects the error message.
    let mut submit_batch =
        |events: &mut Vec<ingest::IngestEvent>, failed_at_line: Option<usize>| {
            match ingest::ingest_batch(&mut pipeline, handle, &*events) {
                Ok(ack) => {
                    accepted += ack.accepted;
                    rejected += ack.rejected;
                }
                Err(err) => {
                    match failed_at_line {
                        Some(line) => eprintln!(
                            "ingest --file batch failed at line {}: {:?}",
                            line, err
                        ),
                        None => eprintln!("ingest --file final batch failed: {:?}", err),
                    }
                    std::process::exit(1);
                }
            }
            events.clear();
        };

    for (line_no, line) in contents.lines().enumerate() {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        match parse_ingest_event_jsonl(trimmed) {
            Ok(event) => batch.push(event),
            Err(err) => {
                parse_errors += 1;
                eprintln!("ingest --file parse error at line {}: {}", line_no + 1, err);
                continue;
            }
        }
        if batch.len() >= batch_size {
            submit_batch(&mut batch, Some(line_no + 1));
        }
    }

    if !batch.is_empty() {
        submit_batch(&mut batch, None);
    }

    let flushed = pipeline.flush_all(handle).unwrap_or_else(|err| {
        eprintln!("ingest --file flush failed: {:?}", err);
        std::process::exit(1);
    });
    // Lines that failed to parse never reached the pipeline. They are still reported to
    // the ops counters as rejections, alongside the pipeline's own rejections.
    ops::record_ingest(accepted as u64, rejected as u64 + parse_errors as u64);
    println!(
        "accepted={} rejected={} parse_errors={} flushed={} queue_depth={}",
        accepted,
        rejected,
        parse_errors,
        flushed,
        pipeline.queue_depth()
    );
}

/// Parses one JSON-lines record into an `ingest::IngestEvent`.
///
/// Every record must carry `type` (a string) and `node_id` and `version` (unsigned
/// integers). The remaining fields depend on `type`, which accepts both PascalCase and
/// snake_case spellings:
/// - `InsertNode` / `insert_node`: `adjacency`, an array of unsigned integers.
/// - `AddEdgeDelta` / `add_edge_delta`: `payload`, stored as the string's UTF-8 bytes.
/// - `UpdateVectorDelta` / `update_vector_delta`: `payload`, a comma-separated list of
///   floats encoded by `parse_vector_payload_string`.
///
/// # Errors
/// Returns a human-readable message for the first missing or invalid field, or for an
/// unsupported `type`. The common fields are checked before `type` is dispatched on.
fn parse_ingest_event_jsonl(line: &str) -> std::result::Result<ingest::IngestEvent, String> {
    let missing = |field: &str| format!("missing field '{}'", field);

    let event_type = extract_json_string(line, "type").ok_or_else(|| missing("type"))?;
    let node_id = extract_json_u64(line, "node_id").ok_or_else(|| missing("node_id"))?;
    let version = extract_json_u64(line, "version").ok_or_else(|| missing("version"))?;

    let event = match event_type.as_str() {
        "InsertNode" | "insert_node" => ingest::IngestEvent::InsertNode {
            node_id,
            version,
            adjacency: extract_json_u64_array(line, "adjacency")
                .ok_or_else(|| "missing/invalid field 'adjacency'".to_string())?,
            // This line format has no bitmap terms or embedding flag; use empty/false.
            bitmap_terms: Vec::new(),
            embedding_pending: false,
        },
        "AddEdgeDelta" | "add_edge_delta" => ingest::IngestEvent::AddEdgeDelta {
            node_id,
            version,
            payload: extract_json_string(line, "payload")
                .ok_or_else(|| missing("payload"))?
                .into_bytes(),
        },
        "UpdateVectorDelta" | "update_vector_delta" => {
            let raw_payload =
                extract_json_string(line, "payload").ok_or_else(|| missing("payload"))?;
            ingest::IngestEvent::UpdateVectorDelta {
                node_id,
                version,
                payload: parse_vector_payload_string(&raw_payload)?,
            }
        }
        unsupported => return Err(format!("unsupported type '{}'", unsupported)),
    };
    Ok(event)
}

/// Parses a comma-separated list of floats into an encoded vector payload.
///
/// Components are trimmed and empty components are skipped, so `"1, 2,,3"` yields
/// `[1.0, 2.0, 3.0]`. The values are encoded with `api::encode_vector_payload_f32` using
/// the cosine metric.
///
/// # Errors
/// Returns a message in any of these cases:
/// - a component is not a valid `f32`;
/// - a component is not finite, i.e. `NaN`, `inf`, or a literal too large for `f32`,
///   which `str::parse::<f32>` turns into infinity;
/// - no components remain after skipping empty ones.
fn parse_vector_payload_string(payload: &str) -> Result<Vec<u8>, String> {
    let values = payload
        .split(',')
        .map(str::trim)
        .filter(|part| !part.is_empty())
        .map(|part| -> Result<f32, String> {
            let value = part
                .parse::<f32>()
                .map_err(|_| format!("invalid vector component '{}'", part))?;
            // `f32` parsing accepts "NaN" and "inf" and maps overflow to infinity. Such
            // components would make cosine similarity meaningless, so they are rejected here.
            if value.is_finite() {
                Ok(value)
            } else {
                Err(format!("non-finite vector component '{}'", part))
            }
        })
        .collect::<Result<Vec<f32>, String>>()?;
    if values.is_empty() {
        return Err("vector payload must contain at least one float".to_string());
    }
    // NOTE(review): the meaning of the `1` and `false` arguments is defined by
    // `api::encode_vector_payload_f32` and is not visible here; confirm before changing.
    Ok(api::encode_vector_payload_f32(
        1,
        api::VectorMetric::Cosine,
        &values,
        false,
    ))
}

/// Locates the value for `key` in a single-line JSON object.
///
/// Returns the text immediately after `"key":`, with leading whitespace removed. An
/// occurrence of `"key"` that is not followed by a colon (after optional whitespace) is
/// skipped, and the search continues. This covers, for example, a string *value* that
/// happens to equal the key name.
///
/// This is a lightweight scanner, not a JSON parser. It does not track nesting or string
/// boundaries, so a matching key inside a nested object can still be picked up. The first
/// qualifying occurrence wins.
fn json_value_after_key<'a>(line: &'a str, key: &str) -> Option<&'a str> {
    let needle = format!("\"{}\"", key);
    let mut search_from = 0;
    while let Some(offset) = line[search_from..].find(&needle) {
        // `needle` ends in an ASCII quote, so `after_key` is always a char boundary.
        let after_key = search_from + offset + needle.len();
        if let Some(value) = line[after_key..].trim_start().strip_prefix(':') {
            return Some(value.trim_start());
        }
        search_from = after_key;
    }
    None
}

/// Extracts the string value of `key` and decodes JSON escape sequences.
///
/// Returns `None` in any of these cases:
/// - the key is absent or its value is not a string;
/// - the string has no closing quote;
/// - the string contains an invalid escape: an unknown escape character, a malformed
///   `\uXXXX`, or an unpaired UTF-16 surrogate.
fn extract_json_string(line: &str, key: &str) -> Option<String> {
    let body = json_value_after_key(line, key)?.strip_prefix('"')?;
    let mut decoded = String::new();
    let mut chars = body.chars();
    while let Some(c) = chars.next() {
        match c {
            '"' => return Some(decoded),
            '\\' => {
                let unescaped = match chars.next()? {
                    '"' => '"',
                    '\\' => '\\',
                    '/' => '/',
                    'b' => '\u{8}',
                    'f' => '\u{c}',
                    'n' => '\n',
                    'r' => '\r',
                    't' => '\t',
                    'u' => json_read_unicode_escape(&mut chars)?,
                    _ => return None,
                };
                decoded.push(unescaped);
            }
            other => decoded.push(other),
        }
    }
    // The line ended without a closing quote.
    None
}

/// Decodes the `XXXX` of a `\uXXXX` escape, assuming `\u` has already been consumed.
/// A high surrogate must be followed by a second `\uXXXX` holding the low surrogate.
fn json_read_unicode_escape(chars: &mut std::str::Chars) -> Option<char> {
    let first = json_read_hex4(chars)?;
    if (0xD800..0xDC00).contains(&first) {
        if chars.next()? != '\\' || chars.next()? != 'u' {
            return None;
        }
        let second = json_read_hex4(chars)?;
        if !(0xDC00..0xE000).contains(&second) {
            return None;
        }
        char::from_u32(0x10000 + ((first - 0xD800) << 10) + (second - 0xDC00))
    } else {
        // `from_u32` returns `None` for a lone low surrogate (0xDC00..=0xDFFF).
        char::from_u32(first)
    }
}

/// Reads exactly four hex digits as a UTF-16 code unit.
fn json_read_hex4(chars: &mut std::str::Chars) -> Option<u32> {
    let mut unit = 0u32;
    for _ in 0..4 {
        unit = unit * 16 + chars.next()?.to_digit(16)?;
    }
    Some(unit)
}

/// Returns true if `rest` (the text just after a scalar) begins at a JSON value boundary.
fn json_scalar_terminated(rest: &str) -> bool {
    match rest.chars().next() {
        None => true,
        Some(c) => c.is_whitespace() || matches!(c, ',' | '}' | ']'),
    }
}

/// Parses a non-empty run of ASCII digits. Signs and any other characters are rejected.
fn json_parse_u64_digits(token: &str) -> Option<u64> {
    if token.is_empty() || !token.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    token.parse().ok()
}

/// Extracts a non-negative integer value for `key`.
///
/// The value must be a run of ASCII digits that ends the JSON scalar: it must be followed
/// by the end of the line, whitespace, `,`, `}` or `]`. These yield `None` instead of a
/// truncated result:
/// - negative numbers;
/// - fractions and exponents such as `1.5` or `1e3`;
/// - quoted numbers;
/// - values above `u64::MAX`.
fn extract_json_u64(line: &str, key: &str) -> Option<u64> {
    let value = json_value_after_key(line, key)?;
    let digits_end = value
        .find(|c: char| !c.is_ascii_digit())
        .unwrap_or(value.len());
    let (digits, rest) = value.split_at(digits_end);
    if !json_scalar_terminated(rest) {
        return None;
    }
    json_parse_u64_digits(digits)
}

/// Extracts an array of non-negative integers for `key`.
///
/// The value itself must be an array, with `[` directly after the colon; `[]` yields an
/// empty vector. Each comma-separated element must be a plain digit run. A trailing comma,
/// a sign, a fraction, or a nested array therefore yields `None`.
fn extract_json_u64_array(line: &str, key: &str) -> Option<Vec<u64>> {
    let value = json_value_after_key(line, key)?.strip_prefix('[')?;
    let close = value.find(']')?;
    let body = value[..close].trim();
    if body.is_empty() {
        return Some(Vec::new());
    }
    body.split(',')
        .map(|token| json_parse_u64_digits(token.trim()))
        .collect()
}