use std::path::Path;
use iridium::features::ingest;
use iridium::features::ops;
use iridium::features::runtime;
use iridium::features::storage::api;
use crate::ir_cli::Command;
use super::common::open_store_with_data_root;
use super::ingest_file::run_ingest_file_command;
pub(super) fn run_storage_command(command: Command, data_root: Option<&Path>) {
let mut handle = match open_store_with_data_root(data_root) {
Ok(handle) => handle,
Err(err) => {
eprintln!("failed to open store: {:?}", err);
std::process::exit(1);
}
};
match command {
Command::PutEdge {
node_id,
version,
payload,
} => {
let delta = api::encode_delta(node_id, version, &payload);
if let Err(err) = api::put_edge_delta(&mut handle, &delta) {
eprintln!("put-edge failed: {:?}", err);
std::process::exit(1);
}
}
Command::PutVector {
node_id,
version,
payload,
} => {
let payload = parse_cli_vector_payload(&payload).unwrap_or_else(|err| {
eprintln!("put-vector invalid payload: {}", err);
std::process::exit(1);
});
let delta = api::encode_delta(node_id, version, &payload);
if let Err(err) = api::put_vector_delta(&mut handle, &delta) {
eprintln!("put-vector failed: {:?}", err);
std::process::exit(1);
}
}
Command::PutFull {
node_id,
version,
adjacency,
} => {
if let Err(err) = api::put_full_node(&mut handle, node_id, version, &adjacency) {
eprintln!("put-full failed: {:?}", err);
std::process::exit(1);
}
}
Command::Get { node_id } => match api::get_logical_node(&mut handle, node_id) {
Ok(node) => {
println!("node {}", node.node_id);
println!("full: {}", node.full.is_some());
println!("deltas: {}", node.deltas.len());
}
Err(err) => {
eprintln!("get failed: {:?}", err);
std::process::exit(1);
}
},
Command::Traverse { node_id } => match api::get_logical_node(&mut handle, node_id) {
Ok(node) => {
let adjacency = node.adjacency();
println!(
"{}",
adjacency
.iter()
.map(|n| n.to_string())
.collect::<Vec<_>>()
.join(",")
);
}
Err(err) => {
eprintln!("traverse failed: {:?}", err);
std::process::exit(1);
}
},
Command::Recover => match api::recover_from_wal(&mut handle) {
Ok(applied) => println!("recovered {} records", applied),
Err(err) => {
eprintln!("recover failed: {:?}", err);
std::process::exit(1);
}
},
Command::Stats => {
let report = api::report_metrics(&handle);
println!("write_amp: {:?}", report.write_amp);
println!("read_amp: {:?}", report.read_amp);
println!("space_amp: {:?}", report.space_amp);
}
Command::IngestEdge {
node_id,
version,
payload,
} => {
let mut pipeline = ingest::IngestPipeline::new(ingest::IngestConfig::default())
.expect("default ingest config should be valid");
let ack = ingest::ingest_event(
&mut pipeline,
&mut handle,
ingest::IngestEvent::AddEdgeDelta {
node_id,
version,
payload,
},
)
.unwrap_or_else(|err| {
ops::record_ingest(0, 1);
eprintln!("ingest-edge failed: {:?}", err);
std::process::exit(1);
});
let flushed = pipeline.flush_all(&mut handle).unwrap_or_else(|err| {
ops::record_ingest(0, 1);
eprintln!("ingest-edge flush failed: {:?}", err);
std::process::exit(1);
});
ops::record_ingest(ack.accepted as u64, ack.rejected as u64);
println!(
"accepted={} rejected={} flushed={} queue_depth={}",
ack.accepted,
ack.rejected,
ack.flushed + flushed,
pipeline.queue_depth()
);
}
Command::IngestFile { path, batch_size } => {
run_ingest_file_command(&mut handle, &path, batch_size);
}
Command::IngestVector {
node_id,
version,
payload,
} => {
let payload = parse_cli_vector_payload(&payload).unwrap_or_else(|err| {
ops::record_ingest(0, 1);
eprintln!("ingest-vector invalid payload: {}", err);
std::process::exit(1);
});
let mut pipeline = ingest::IngestPipeline::new(ingest::IngestConfig::default())
.expect("default ingest config should be valid");
let ack = ingest::ingest_event(
&mut pipeline,
&mut handle,
ingest::IngestEvent::UpdateVectorDelta {
node_id,
version,
payload,
},
)
.unwrap_or_else(|err| {
ops::record_ingest(0, 1);
eprintln!("ingest-vector failed: {:?}", err);
std::process::exit(1);
});
let flushed = pipeline.flush_all(&mut handle).unwrap_or_else(|err| {
ops::record_ingest(0, 1);
eprintln!("ingest-vector flush failed: {:?}", err);
std::process::exit(1);
});
ops::record_ingest(ack.accepted as u64, ack.rejected as u64);
println!(
"accepted={} rejected={} flushed={} queue_depth={}",
ack.accepted,
ack.rejected,
ack.flushed + flushed,
pipeline.queue_depth()
);
}
Command::IngestNode {
node_id,
version,
adjacency,
} => {
let mut pipeline = ingest::IngestPipeline::new(ingest::IngestConfig::default())
.expect("default ingest config should be valid");
let ack = ingest::ingest_event(
&mut pipeline,
&mut handle,
ingest::IngestEvent::InsertNode {
node_id,
version,
adjacency,
bitmap_terms: Vec::new(),
embedding_pending: false,
},
)
.unwrap_or_else(|err| {
ops::record_ingest(0, 1);
eprintln!("ingest-node failed: {:?}", err);
std::process::exit(1);
});
let flushed = pipeline.flush_all(&mut handle).unwrap_or_else(|err| {
ops::record_ingest(0, 1);
eprintln!("ingest-node flush failed: {:?}", err);
std::process::exit(1);
});
ops::record_ingest(ack.accepted as u64, ack.rejected as u64);
println!(
"accepted={} rejected={} flushed={} queue_depth={}",
ack.accepted,
ack.rejected,
ack.flushed + flushed,
pipeline.queue_depth()
);
}
Command::IngestBatchEdge {
start_node_id,
count,
version,
payload_prefix,
} => {
let mut pipeline = ingest::IngestPipeline::new(ingest::IngestConfig::default())
.expect("default ingest config should be valid");
let ack = ingest::ingest_edge_delta_range(
&mut pipeline,
&mut handle,
start_node_id,
count,
version,
&payload_prefix,
)
.unwrap_or_else(|err| {
ops::record_ingest(0, 1);
eprintln!("ingest-batch-edge failed: {:?}", err);
std::process::exit(1);
});
let flushed = pipeline.flush_all(&mut handle).unwrap_or_else(|err| {
ops::record_ingest(0, 1);
eprintln!("ingest-batch-edge flush failed: {:?}", err);
std::process::exit(1);
});
ops::record_ingest(ack.accepted as u64, ack.rejected as u64);
println!(
"accepted={} rejected={} flushed={} queue_depth={}",
ack.accepted,
ack.rejected,
ack.flushed + flushed,
pipeline.queue_depth()
);
}
Command::IngestBatchEdgeLoop {
start_node_id,
count,
rounds,
version,
payload_prefix,
} => {
let mut pipeline = ingest::IngestPipeline::new(ingest::IngestConfig::default())
.expect("default ingest config should be valid");
let mut accepted = 0usize;
let mut rejected = 0usize;
let mut flushed = 0usize;
for round in 0..rounds {
let round_start = start_node_id + (round * count);
let ack = ingest::ingest_edge_delta_range(
&mut pipeline,
&mut handle,
round_start,
count,
version,
&payload_prefix,
)
.unwrap_or_else(|err| {
ops::record_ingest(0, 1);
eprintln!("ingest-batch-edge-loop failed: {:?}", err);
std::process::exit(1);
});
accepted += ack.accepted;
rejected += ack.rejected;
flushed += ack.flushed;
}
let flushed_tail = pipeline.flush_all(&mut handle).unwrap_or_else(|err| {
ops::record_ingest(0, 1);
eprintln!("ingest-batch-edge-loop flush failed: {:?}", err);
std::process::exit(1);
});
ops::record_ingest(accepted as u64, rejected as u64);
println!(
"accepted={} rejected={} flushed={} queue_depth={}",
accepted,
rejected,
flushed + flushed_tail,
pipeline.queue_depth()
);
}
Command::CollectStats => {
let snapshot =
runtime::collect_planner_stats(&handle, &runtime::ExecuteParams::default());
if let Err(e) = runtime::set_planner_stats(snapshot.clone()) {
eprintln!("failed to set planner stats: {:?}", e);
std::process::exit(1);
}
println!("stats_version: {}", snapshot.stats_version);
println!("node_scan_base_cost: {}", snapshot.node_scan_base_cost);
println!("vector_scan_base_cost: {}", snapshot.vector_scan_base_cost);
println!(
"vector_selectivity_ppm: {}",
snapshot.vector_selectivity_ppm
);
println!("graph_selectivity_ppm: {}", snapshot.graph_selectivity_ppm);
println!("skew_penalty_cost: {}", snapshot.skew_penalty_cost);
println!("ttl_millis: {}", snapshot.ttl_millis);
}
Command::Parse { .. }
| Command::Help
| Command::Explain { .. }
| Command::Query { .. }
| Command::ContractReport { .. }
| Command::EvidenceReport { .. }
| Command::ServiceReport { .. }
| Command::ServiceValidate { .. }
| Command::ServiceServe { .. }
| Command::Health
| Command::Metrics
| Command::MetricsProm
| Command::Bench { .. } => {
unreachable!("handled in main")
}
Command::Status => {
println!("schema_version: {}", handle.manifest.schema_version);
println!("high_water_mark: {}", handle.manifest.high_water_mark);
println!("l0_runs: {}", handle.l0_runs.len());
println!("memtable_empty: {}", handle.memtable.is_empty());
println!("sstable_cache_entries: {}", handle.sstable_cache.len());
println!(
"pending_deltas_max: {}",
handle
.pending_deltas_per_node
.values()
.copied()
.max()
.unwrap_or(0)
);
println!(
"hnsw_total_vectors: {} hnsw_updated_vectors: {}",
handle.hnsw_total_vectors, handle.hnsw_updated_vectors
);
println!(
"last_hnsw_rebuild_reason: {}",
handle.last_hnsw_rebuild_reason.as_deref().unwrap_or("none")
);
}
Command::TriggerCompaction { level } => match api::compact(&mut handle, level) {
Ok(()) => println!("compaction triggered for level {:?}", level),
Err(err) => {
eprintln!("trigger-compaction failed: {:?}", err);
std::process::exit(1);
}
},
Command::Recluster { force } => {
if force {
println!(
"recluster requested (--force), but execution path is not implemented yet"
);
} else {
println!("recluster is a manual placeholder; use --force to acknowledge");
}
}
}
}
/// Parses a CLI vector payload given as utf8 text of comma-separated f32
/// components and re-encodes it as a binary vector payload (version 1,
/// cosine metric, not a tombstone).
///
/// Blank components (empty segments between commas, or surrounding
/// whitespace) are skipped; at least one parseable float is required.
/// Returns a human-readable error message on malformed input.
fn parse_cli_vector_payload(payload: &[u8]) -> Result<Vec<u8>, String> {
    let text = match std::str::from_utf8(payload) {
        Ok(text) => text,
        Err(_) => return Err("vector payload must be utf8 comma-separated floats".to_string()),
    };
    let mut components: Vec<f32> = Vec::new();
    for piece in text.split(',') {
        let piece = piece.trim();
        if piece.is_empty() {
            // Tolerate stray commas / whitespace-only segments.
            continue;
        }
        match piece.parse::<f32>() {
            Ok(value) => components.push(value),
            // Stop at the first unparseable component, same as the
            // short-circuiting collect into Result.
            Err(_) => return Err(format!("invalid f32 component '{}'", piece)),
        }
    }
    if components.is_empty() {
        return Err("vector payload must contain at least one float".to_string());
    }
    Ok(api::encode_vector_payload_f32(
        1,
        api::VectorMetric::Cosine,
        &components,
        false,
    ))
}