use std::path::Path;
use iridium::features::ingest;
use iridium::features::ops;
use iridium::features::storage::api;
/// Ingests newline-delimited JSON (JSONL) events from `path` into `handle`.
///
/// Each non-blank line is parsed with [`parse_ingest_event_jsonl`]. Lines that
/// fail to parse are reported on stderr and counted in `parse_errors`, but do not
/// stop the run. Parsed events are buffered and handed to [`ingest::ingest_batch`]
/// whenever `batch_size` of them have accumulated; any remainder is submitted after
/// the last line, and the pipeline is then flushed with `flush_all`.
///
/// On completion the accepted count and the rejected count (with parse errors
/// added to it) are passed to [`ops::record_ingest`], and a one-line
/// `accepted=… rejected=… parse_errors=… flushed=… queue_depth=…` summary is
/// printed to stdout.
///
/// This is a command handler rather than a library function: if the file cannot
/// be read, the pipeline configuration is rejected, a batch fails, or the final
/// flush fails, it prints a diagnostic to stderr and terminates the process with
/// exit status 1.
pub(super) fn run_ingest_file_command(
    handle: &mut api::StorageHandle,
    path: &Path,
    batch_size: usize,
) {
    let contents = std::fs::read_to_string(path).unwrap_or_else(|err| {
        eprintln!("ingest --file read failed ({}): {}", path.display(), err);
        std::process::exit(1);
    });
    // `batch_size` is caller-supplied (presumably a CLI flag), so a configuration
    // the pipeline rejects is a user error: report it and exit like every other
    // failure in this command instead of panicking.
    let mut pipeline = ingest::IngestPipeline::new(ingest::IngestConfig {
        max_queue_depth: 16384,
        max_batch_size: batch_size,
    })
    .unwrap_or_else(|err| {
        eprintln!(
            "ingest --file invalid pipeline config (batch_size={}): {:?}",
            batch_size, err
        );
        std::process::exit(1);
    });
    let mut accepted = 0usize;
    let mut rejected = 0usize;
    let mut parse_errors = 0usize;
    let mut batch = Vec::with_capacity(batch_size);
    for (line_no, line) in contents.lines().enumerate() {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        let event = match parse_ingest_event_jsonl(trimmed) {
            Ok(event) => event,
            Err(err) => {
                // A malformed line is skipped, not fatal; it is still counted
                // and later reported to ops as a rejection.
                parse_errors += 1;
                eprintln!("ingest --file parse error at line {}: {}", line_no + 1, err);
                continue;
            }
        };
        batch.push(event);
        if batch.len() >= batch_size {
            match ingest::ingest_batch(&mut pipeline, handle, &batch) {
                Ok(ack) => {
                    accepted += ack.accepted;
                    rejected += ack.rejected;
                }
                Err(err) => {
                    // `line_no + 1` is the 1-based line of the last event in the batch.
                    eprintln!(
                        "ingest --file batch failed at line {}: {:?}",
                        line_no + 1,
                        err
                    );
                    std::process::exit(1);
                }
            }
            batch.clear();
        }
    }
    // Submit the trailing partial batch, if any.
    if !batch.is_empty() {
        match ingest::ingest_batch(&mut pipeline, handle, &batch) {
            Ok(ack) => {
                accepted += ack.accepted;
                rejected += ack.rejected;
            }
            Err(err) => {
                eprintln!("ingest --file final batch failed: {:?}", err);
                std::process::exit(1);
            }
        }
    }
    let flushed = pipeline.flush_all(handle).unwrap_or_else(|err| {
        eprintln!("ingest --file flush failed: {:?}", err);
        std::process::exit(1);
    });
    // Parse failures are folded into the rejected total reported to ops.
    ops::record_ingest(accepted as u64, rejected as u64 + parse_errors as u64);
    println!(
        "accepted={} rejected={} parse_errors={} flushed={} queue_depth={}",
        accepted,
        rejected,
        parse_errors,
        flushed,
        pipeline.queue_depth()
    );
}
/// Parses one JSONL line into an [`ingest::IngestEvent`].
///
/// Every line must carry a string `type` plus unsigned-integer `node_id` and
/// `version` fields. They are checked in that order, so the first absent or
/// malformed one is the error reported. `type` selects the variant; both the
/// `PascalCase` and `snake_case` spellings are accepted:
///
/// * `InsertNode` / `insert_node` requires `adjacency`, an array of unsigned
///   integers. `bitmap_terms` is always empty and `embedding_pending` is always
///   `false`; neither is read from the line.
/// * `AddEdgeDelta` / `add_edge_delta` requires a string `payload`, stored as
///   its UTF-8 bytes.
/// * `UpdateVectorDelta` / `update_vector_delta` requires a string `payload` of
///   comma-separated floats, encoded by [`parse_vector_payload_string`].
///
/// # Errors
/// Returns a human-readable message when a required field is missing or
/// malformed, when `type` is unsupported, or when a vector payload is invalid.
fn parse_ingest_event_jsonl(line: &str) -> std::result::Result<ingest::IngestEvent, String> {
    // The extractors return `None` both when a field is absent and when its value
    // has the wrong shape (e.g. a negative `node_id`), so the message covers both.
    let field_error = |field: &str| format!("missing/invalid field '{}'", field);

    let event_type = extract_json_string(line, "type").ok_or_else(|| field_error("type"))?;
    let node_id = extract_json_u64(line, "node_id").ok_or_else(|| field_error("node_id"))?;
    let version = extract_json_u64(line, "version").ok_or_else(|| field_error("version"))?;

    match event_type.as_str() {
        "InsertNode" | "insert_node" => {
            let adjacency = extract_json_u64_array(line, "adjacency")
                .ok_or_else(|| field_error("adjacency"))?;
            Ok(ingest::IngestEvent::InsertNode {
                node_id,
                version,
                adjacency,
                bitmap_terms: Vec::new(),
                embedding_pending: false,
            })
        }
        "AddEdgeDelta" | "add_edge_delta" => {
            let payload =
                extract_json_string(line, "payload").ok_or_else(|| field_error("payload"))?;
            Ok(ingest::IngestEvent::AddEdgeDelta {
                node_id,
                version,
                payload: payload.into_bytes(),
            })
        }
        "UpdateVectorDelta" | "update_vector_delta" => {
            let payload =
                extract_json_string(line, "payload").ok_or_else(|| field_error("payload"))?;
            Ok(ingest::IngestEvent::UpdateVectorDelta {
                node_id,
                version,
                payload: parse_vector_payload_string(&payload)?,
            })
        }
        other => Err(format!("unsupported type '{}'", other)),
    }
}
/// Parses a comma-separated list of `f32` components and encodes it as a vector payload.
///
/// Whitespace around each component is ignored, and empty components (e.g. from a
/// trailing comma) are skipped. Every remaining component must parse as a *finite*
/// `f32`. `str::parse::<f32>` would otherwise accept `NaN`, `inf` and `infinity`,
/// and such values cannot produce a meaningful cosine distance.
///
/// The values are encoded with [`api::encode_vector_payload_f32`] using the fixed
/// arguments `1`, [`api::VectorMetric::Cosine`] and `false`.
/// NOTE(review): presumably a format version and a "pre-normalized" flag; confirm
/// against that API.
///
/// # Errors
/// Returns an error naming the first component that is not a finite float, or an
/// error when no components remain after skipping empty ones.
fn parse_vector_payload_string(payload: &str) -> Result<Vec<u8>, String> {
    let values = payload
        .split(',')
        .map(str::trim)
        .filter(|part| !part.is_empty())
        .map(|part| match part.parse::<f32>() {
            Ok(value) if value.is_finite() => Ok(value),
            _ => Err(format!("invalid vector component '{}'", part)),
        })
        .collect::<Result<Vec<f32>, String>>()?;
    if values.is_empty() {
        return Err("vector payload must contain at least one float".to_string());
    }
    Ok(api::encode_vector_payload_f32(
        1,
        api::VectorMetric::Cosine,
        &values,
        false,
    ))
}
/// Returns the text following `"key":` in a single-line JSON object, with leading
/// whitespace removed, or `None` if the key does not occur.
///
/// Each occurrence of the quoted key is tried in order, and the first one followed
/// (after optional whitespace) by `:` wins. This stops a string *value* equal to the
/// key name, such as `{"type":"node_id", ...}` when looking up `node_id`, from being
/// mistaken for the key itself.
///
/// This is a lightweight scanner, not a JSON parser. It does not track nesting, so a
/// matching key inside a nested object is also accepted.
fn find_json_value<'a>(line: &'a str, key: &str) -> Option<&'a str> {
    let needle = format!("\"{}\"", key);
    let mut search_from = 0;
    while let Some(offset) = line[search_from..].find(&needle) {
        // `needle` ends in an ASCII quote, so `after_key` is always a char boundary.
        let after_key = search_from + offset + needle.len();
        if let Some(value) = line[after_key..].trim_start().strip_prefix(':') {
            return Some(value.trim_start());
        }
        search_from = after_key;
    }
    None
}

/// Extracts the string value of `key` from a single-line JSON object.
///
/// JSON escape sequences are decoded: `\"`, `\\`, `\/`, `\b`, `\f`, `\n`, `\r`, `\t`
/// and `\uXXXX`, including UTF-16 surrogate pairs. Returns `None` in any of these
/// cases: the key is absent, its value is not a string, the string is unterminated,
/// or it contains an invalid escape.
fn extract_json_string(line: &str, key: &str) -> Option<String> {
    let mut chars = find_json_value(line, key)?.strip_prefix('"')?.chars();
    let mut out = String::new();
    loop {
        // Running out of input before the closing quote yields `None`.
        match chars.next()? {
            '"' => return Some(out),
            '\\' => out.push(decode_json_escape(&mut chars)?),
            c => out.push(c),
        }
    }
}

/// Decodes the JSON escape whose leading backslash has already been consumed.
///
/// Returns `None` for an unknown escape, malformed hex digits, or an unpaired
/// surrogate.
fn decode_json_escape(chars: &mut std::str::Chars<'_>) -> Option<char> {
    let decoded = match chars.next()? {
        '"' => '"',
        '\\' => '\\',
        '/' => '/',
        'b' => '\u{0008}',
        'f' => '\u{000C}',
        'n' => '\n',
        'r' => '\r',
        't' => '\t',
        'u' => {
            let high = read_json_hex4(chars)?;
            if (0xD800..0xDC00).contains(&high) {
                // A high surrogate must be followed by a `\uXXXX` low surrogate.
                if chars.next()? != '\\' || chars.next()? != 'u' {
                    return None;
                }
                let low = read_json_hex4(chars)?;
                if !(0xDC00..0xE000).contains(&low) {
                    return None;
                }
                char::from_u32(0x10000 + ((high - 0xD800) << 10) + (low - 0xDC00))?
            } else {
                // `from_u32` rejects a lone low surrogate.
                char::from_u32(high)?
            }
        }
        _ => return None,
    };
    Some(decoded)
}

/// Reads exactly four hex digits from `chars` as a `u32`.
fn read_json_hex4(chars: &mut std::str::Chars<'_>) -> Option<u32> {
    (0..4).try_fold(0u32, |acc, _| Some(acc * 16 + chars.next()?.to_digit(16)?))
}

/// Parses a bare JSON unsigned-integer token made only of ASCII digits (no sign,
/// fraction or exponent). Returns `None` for anything else, or on `u64` overflow.
fn parse_json_u64_token(token: &str) -> Option<u64> {
    if token.is_empty() || !token.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    token.parse().ok()
}

/// Extracts the unsigned-integer value of `key` from a single-line JSON object.
///
/// The number must be followed by whitespace, `,`, `}`, `]` or the end of the line.
/// Values such as `5.7`, `1e3` or `12abc` are therefore rejected instead of being
/// silently truncated to their leading digits. Negative numbers, quoted numbers and
/// values that overflow `u64` also yield `None`.
fn extract_json_u64(line: &str, key: &str) -> Option<u64> {
    let value = find_json_value(line, key)?;
    let digit_len = value.bytes().take_while(u8::is_ascii_digit).count();
    let (digits, tail) = value.split_at(digit_len);
    let terminated = match tail.chars().next() {
        None => true,
        Some(c) => c.is_whitespace() || matches!(c, ',' | '}' | ']'),
    };
    if terminated {
        parse_json_u64_token(digits)
    } else {
        None
    }
}

/// Extracts a flat array of unsigned integers stored under `key`, e.g.
/// `"adjacency": [1, 2, 3]`. An empty array yields an empty `Vec`.
///
/// Returns `None` in any of these cases: the key is absent, its value is not an
/// array, the array is unterminated, or any element is not a bare unsigned integer.
fn extract_json_u64_array(line: &str, key: &str) -> Option<Vec<u64>> {
    // The value itself must be the array; a `[` further along the line belongs to
    // some other field.
    let after_open = find_json_value(line, key)?.strip_prefix('[')?;
    let body = &after_open[..after_open.find(']')?];
    if body.trim().is_empty() {
        return Some(Vec::new());
    }
    body.split(',')
        .map(|token| parse_json_u64_token(token.trim()))
        .collect()
}