iridium-db 0.3.0

A high-performance vector-graph hybrid storage and indexing engine
use std::path::Path;

use iridium::features::ingest;
use iridium::features::ops;
use iridium::features::runtime;
use iridium::features::storage::api;

use crate::ir_cli::Command;

use super::common::open_store_with_data_root;
use super::ingest_file::run_ingest_file_command;

/// Executes a single storage-backed CLI `command` against the store opened from `data_root`.
///
/// The store is opened first via `open_store_with_data_root`; the command is then dispatched
/// to the matching `api`, `ingest`, or `runtime` call. Results are printed to stdout.
///
/// Every failure (opening the store, invalid payload, storage/ingest error) is reported on
/// stderr and terminates the process with exit status 1. The ingest commands additionally
/// call `ops::record_ingest(0, 1)` on failure, i.e. they pass 0 in the accepted position and
/// 1 in the rejected position used by the success path.
///
/// # Panics
///
/// * Panics via `unreachable!` when given a command that is expected to be handled in `main`
///   (`Parse`, `Help`, `Explain`, `Query`, the report/service commands, `Health`, `Metrics`,
///   `MetricsProm`, `Bench`).
/// * Panics if `ingest::IngestPipeline::new` rejects `ingest::IngestConfig::default()`.
pub(super) fn run_storage_command(command: Command, data_root: Option<&Path>) {
    let mut handle = match open_store_with_data_root(data_root) {
        Ok(handle) => handle,
        Err(err) => {
            eprintln!("failed to open store: {:?}", err);
            std::process::exit(1);
        }
    };

    match command {
        // --- Direct writes through the storage API -------------------------------------
        Command::PutEdge {
            node_id,
            version,
            payload,
        } => {
            let delta = api::encode_delta(node_id, version, &payload);
            if let Err(err) = api::put_edge_delta(&mut handle, &delta) {
                eprintln!("put-edge failed: {:?}", err);
                std::process::exit(1);
            }
        }
        Command::PutVector {
            node_id,
            version,
            payload,
        } => {
            // The CLI payload is comma-separated floats; re-encode it before building the delta.
            let payload = parse_cli_vector_payload(&payload).unwrap_or_else(|err| {
                eprintln!("put-vector invalid payload: {}", err);
                std::process::exit(1);
            });
            let delta = api::encode_delta(node_id, version, &payload);
            if let Err(err) = api::put_vector_delta(&mut handle, &delta) {
                eprintln!("put-vector failed: {:?}", err);
                std::process::exit(1);
            }
        }
        Command::PutFull {
            node_id,
            version,
            adjacency,
        } => {
            if let Err(err) = api::put_full_node(&mut handle, node_id, version, &adjacency) {
                eprintln!("put-full failed: {:?}", err);
                std::process::exit(1);
            }
        }

        // --- Reads ---------------------------------------------------------------------
        Command::Get { node_id } => match api::get_logical_node(&mut handle, node_id) {
            Ok(node) => {
                println!("node {}", node.node_id);
                println!("full: {}", node.full.is_some());
                println!("deltas: {}", node.deltas.len());
            }
            Err(err) => {
                eprintln!("get failed: {:?}", err);
                std::process::exit(1);
            }
        },
        Command::Traverse { node_id } => match api::get_logical_node(&mut handle, node_id) {
            Ok(node) => {
                // Print the node's adjacency as one comma-separated line.
                let adjacency = node.adjacency();
                println!(
                    "{}",
                    adjacency
                        .iter()
                        .map(|n| n.to_string())
                        .collect::<Vec<_>>()
                        .join(",")
                );
            }
            Err(err) => {
                eprintln!("traverse failed: {:?}", err);
                std::process::exit(1);
            }
        },

        // --- Maintenance and metrics ---------------------------------------------------
        Command::Recover => match api::recover_from_wal(&mut handle) {
            Ok(applied) => println!("recovered {} records", applied),
            Err(err) => {
                eprintln!("recover failed: {:?}", err);
                std::process::exit(1);
            }
        },
        Command::Stats => {
            let report = api::report_metrics(&handle);
            println!("write_amp: {:?}", report.write_amp);
            println!("read_amp: {:?}", report.read_amp);
            println!("space_amp: {:?}", report.space_amp);
        }

        // --- Ingest pipeline commands --------------------------------------------------
        // Each of these builds a fresh pipeline with the default config, submits the work,
        // flushes whatever is still queued, records the accepted/rejected counts, and prints
        // a one-line summary.
        Command::IngestEdge {
            node_id,
            version,
            payload,
        } => {
            let mut pipeline = ingest::IngestPipeline::new(ingest::IngestConfig::default())
                .expect("default ingest config should be valid");
            let ack = ingest::ingest_event(
                &mut pipeline,
                &mut handle,
                ingest::IngestEvent::AddEdgeDelta {
                    node_id,
                    version,
                    payload,
                },
            )
            .unwrap_or_else(|err| {
                ops::record_ingest(0, 1);
                eprintln!("ingest-edge failed: {:?}", err);
                std::process::exit(1);
            });
            let flushed = pipeline.flush_all(&mut handle).unwrap_or_else(|err| {
                ops::record_ingest(0, 1);
                eprintln!("ingest-edge flush failed: {:?}", err);
                std::process::exit(1);
            });
            ops::record_ingest(ack.accepted as u64, ack.rejected as u64);
            println!(
                "accepted={} rejected={} flushed={} queue_depth={}",
                ack.accepted,
                ack.rejected,
                ack.flushed + flushed,
                pipeline.queue_depth()
            );
        }
        Command::IngestFile { path, batch_size } => {
            run_ingest_file_command(&mut handle, &path, batch_size);
        }
        Command::IngestVector {
            node_id,
            version,
            payload,
        } => {
            // A malformed payload counts as a rejected ingest, like any other ingest failure.
            let payload = parse_cli_vector_payload(&payload).unwrap_or_else(|err| {
                ops::record_ingest(0, 1);
                eprintln!("ingest-vector invalid payload: {}", err);
                std::process::exit(1);
            });
            let mut pipeline = ingest::IngestPipeline::new(ingest::IngestConfig::default())
                .expect("default ingest config should be valid");
            let ack = ingest::ingest_event(
                &mut pipeline,
                &mut handle,
                ingest::IngestEvent::UpdateVectorDelta {
                    node_id,
                    version,
                    payload,
                },
            )
            .unwrap_or_else(|err| {
                ops::record_ingest(0, 1);
                eprintln!("ingest-vector failed: {:?}", err);
                std::process::exit(1);
            });
            let flushed = pipeline.flush_all(&mut handle).unwrap_or_else(|err| {
                ops::record_ingest(0, 1);
                eprintln!("ingest-vector flush failed: {:?}", err);
                std::process::exit(1);
            });
            ops::record_ingest(ack.accepted as u64, ack.rejected as u64);
            println!(
                "accepted={} rejected={} flushed={} queue_depth={}",
                ack.accepted,
                ack.rejected,
                ack.flushed + flushed,
                pipeline.queue_depth()
            );
        }
        Command::IngestNode {
            node_id,
            version,
            adjacency,
        } => {
            let mut pipeline = ingest::IngestPipeline::new(ingest::IngestConfig::default())
                .expect("default ingest config should be valid");
            let ack = ingest::ingest_event(
                &mut pipeline,
                &mut handle,
                ingest::IngestEvent::InsertNode {
                    node_id,
                    version,
                    adjacency,
                    // The CLI supplies no bitmap terms and never marks an embedding as pending.
                    bitmap_terms: Vec::new(),
                    embedding_pending: false,
                },
            )
            .unwrap_or_else(|err| {
                ops::record_ingest(0, 1);
                eprintln!("ingest-node failed: {:?}", err);
                std::process::exit(1);
            });
            let flushed = pipeline.flush_all(&mut handle).unwrap_or_else(|err| {
                ops::record_ingest(0, 1);
                eprintln!("ingest-node flush failed: {:?}", err);
                std::process::exit(1);
            });
            ops::record_ingest(ack.accepted as u64, ack.rejected as u64);
            println!(
                "accepted={} rejected={} flushed={} queue_depth={}",
                ack.accepted,
                ack.rejected,
                ack.flushed + flushed,
                pipeline.queue_depth()
            );
        }
        Command::IngestBatchEdge {
            start_node_id,
            count,
            version,
            payload_prefix,
        } => {
            let mut pipeline = ingest::IngestPipeline::new(ingest::IngestConfig::default())
                .expect("default ingest config should be valid");
            let ack = ingest::ingest_edge_delta_range(
                &mut pipeline,
                &mut handle,
                start_node_id,
                count,
                version,
                &payload_prefix,
            )
            .unwrap_or_else(|err| {
                ops::record_ingest(0, 1);
                eprintln!("ingest-batch-edge failed: {:?}", err);
                std::process::exit(1);
            });
            let flushed = pipeline.flush_all(&mut handle).unwrap_or_else(|err| {
                ops::record_ingest(0, 1);
                eprintln!("ingest-batch-edge flush failed: {:?}", err);
                std::process::exit(1);
            });
            ops::record_ingest(ack.accepted as u64, ack.rejected as u64);
            println!(
                "accepted={} rejected={} flushed={} queue_depth={}",
                ack.accepted,
                ack.rejected,
                ack.flushed + flushed,
                pipeline.queue_depth()
            );
        }
        Command::IngestBatchEdgeLoop {
            start_node_id,
            count,
            rounds,
            version,
            payload_prefix,
        } => {
            let mut pipeline = ingest::IngestPipeline::new(ingest::IngestConfig::default())
                .expect("default ingest config should be valid");
            // Totals accumulated across all rounds; one pipeline is reused for every round.
            let mut accepted = 0usize;
            let mut rejected = 0usize;
            let mut flushed = 0usize;
            for round in 0..rounds {
                // Round `r` starts at `start_node_id + r * count`. All three values come from
                // the command line, so use checked arithmetic: an oversized combination must
                // be reported as a failure instead of panicking (overflow checks enabled) or
                // silently wrapping to unrelated node ids (overflow checks disabled).
                let round_start = round
                    .checked_mul(count)
                    .and_then(|offset| start_node_id.checked_add(offset))
                    .unwrap_or_else(|| {
                        ops::record_ingest(0, 1);
                        eprintln!(
                            "ingest-batch-edge-loop failed: node id range overflows at round {}",
                            round
                        );
                        std::process::exit(1);
                    });
                let ack = ingest::ingest_edge_delta_range(
                    &mut pipeline,
                    &mut handle,
                    round_start,
                    count,
                    version,
                    &payload_prefix,
                )
                .unwrap_or_else(|err| {
                    ops::record_ingest(0, 1);
                    eprintln!("ingest-batch-edge-loop failed: {:?}", err);
                    std::process::exit(1);
                });
                accepted += ack.accepted;
                rejected += ack.rejected;
                flushed += ack.flushed;
            }
            // Flush whatever is still queued after the final round.
            let flushed_tail = pipeline.flush_all(&mut handle).unwrap_or_else(|err| {
                ops::record_ingest(0, 1);
                eprintln!("ingest-batch-edge-loop flush failed: {:?}", err);
                std::process::exit(1);
            });
            ops::record_ingest(accepted as u64, rejected as u64);
            println!(
                "accepted={} rejected={} flushed={} queue_depth={}",
                accepted,
                rejected,
                flushed + flushed_tail,
                pipeline.queue_depth()
            );
        }

        // --- Planner statistics --------------------------------------------------------
        Command::CollectStats => {
            let snapshot =
                runtime::collect_planner_stats(&handle, &runtime::ExecuteParams::default());
            // Install the snapshot first; the clone keeps a copy for printing below.
            if let Err(e) = runtime::set_planner_stats(snapshot.clone()) {
                eprintln!("failed to set planner stats: {:?}", e);
                std::process::exit(1);
            }
            println!("stats_version: {}", snapshot.stats_version);
            println!("node_scan_base_cost: {}", snapshot.node_scan_base_cost);
            println!("vector_scan_base_cost: {}", snapshot.vector_scan_base_cost);
            println!(
                "vector_selectivity_ppm: {}",
                snapshot.vector_selectivity_ppm
            );
            println!("graph_selectivity_ppm: {}", snapshot.graph_selectivity_ppm);
            println!("skew_penalty_cost: {}", snapshot.skew_penalty_cost);
            println!("ttl_millis: {}", snapshot.ttl_millis);
        }

        // These variants are listed explicitly (no `_` catch-all) so that adding a new
        // `Command` variant forces a compile error here until it is routed somewhere.
        Command::Parse { .. }
        | Command::Help
        | Command::Explain { .. }
        | Command::Query { .. }
        | Command::ContractReport { .. }
        | Command::EvidenceReport { .. }
        | Command::ServiceReport { .. }
        | Command::ServiceValidate { .. }
        | Command::ServiceServe { .. }
        | Command::Health
        | Command::Metrics
        | Command::MetricsProm
        | Command::Bench { .. } => {
            unreachable!("handled in main")
        }

        // --- Store introspection -------------------------------------------------------
        Command::Status => {
            println!("schema_version: {}", handle.manifest.schema_version);
            println!("high_water_mark: {}", handle.manifest.high_water_mark);
            println!("l0_runs: {}", handle.l0_runs.len());
            println!("memtable_empty: {}", handle.memtable.is_empty());
            println!("sstable_cache_entries: {}", handle.sstable_cache.len());
            // Largest per-node pending-delta count, or 0 when no node has an entry.
            println!(
                "pending_deltas_max: {}",
                handle
                    .pending_deltas_per_node
                    .values()
                    .copied()
                    .max()
                    .unwrap_or(0)
            );
            println!(
                "hnsw_total_vectors: {} hnsw_updated_vectors: {}",
                handle.hnsw_total_vectors, handle.hnsw_updated_vectors
            );
            println!(
                "last_hnsw_rebuild_reason: {}",
                handle.last_hnsw_rebuild_reason.as_deref().unwrap_or("none")
            );
        }
        Command::TriggerCompaction { level } => match api::compact(&mut handle, level) {
            Ok(()) => println!("compaction triggered for level {:?}", level),
            Err(err) => {
                eprintln!("trigger-compaction failed: {:?}", err);
                std::process::exit(1);
            }
        },
        // Recluster has no execution path yet; both branches only print a notice.
        Command::Recluster { force } => {
            if force {
                println!(
                    "recluster requested (--force), but execution path is not implemented yet"
                );
            } else {
                println!("recluster is a manual placeholder; use --force to acknowledge");
            }
        }
    }
}

/// Converts a CLI vector payload (UTF-8 text of comma-separated floats) into the encoded
/// byte form produced by `api::encode_vector_payload_f32`.
///
/// Components are trimmed of surrounding whitespace, and components that are empty after
/// trimming are skipped. The parsed values are encoded with the fixed arguments `1`,
/// `api::VectorMetric::Cosine`, and `false`.
///
/// # Errors
///
/// Returns a human-readable message when the payload is not valid UTF-8, when a component
/// does not parse as `f32` (the first offending component is reported), or when no
/// component remains after skipping empty ones.
fn parse_cli_vector_payload(payload: &[u8]) -> Result<Vec<u8>, String> {
    let text = match std::str::from_utf8(payload) {
        Ok(text) => text,
        Err(_) => return Err("vector payload must be utf8 comma-separated floats".to_string()),
    };

    let mut components: Vec<f32> = Vec::new();
    for token in text.split(',') {
        let token = token.trim();
        if token.is_empty() {
            // Tolerate stray separators such as a trailing comma.
            continue;
        }
        match token.parse::<f32>() {
            Ok(value) => components.push(value),
            Err(_) => return Err(format!("invalid f32 component '{}'", token)),
        }
    }

    if components.is_empty() {
        return Err("vector payload must contain at least one float".to_string());
    }

    let encoded = api::encode_vector_payload_f32(1, api::VectorMetric::Cosine, &components, false);
    Ok(encoded)
}