use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use anyhow::Result;
use crossbeam_channel::Receiver;
use indicatif::ProgressBar;
use cqs::{Chunk, Embedding, Store};
use super::types::EmbeddedBatch;
use crate::cli::check_interrupted;
/// Number of batches between periodic flushes of deferred data.
///
/// Read from the `CQS_DEFERRED_FLUSH_INTERVAL` environment variable. When the
/// variable is unset, not valid Unicode, or does not parse as a `usize`, the
/// built-in default of 50 is returned.
fn deferred_flush_interval() -> usize {
    const DEFAULT_INTERVAL: usize = 50;

    match std::env::var("CQS_DEFERRED_FLUSH_INTERVAL") {
        // A present-but-malformed value falls back to the default as well.
        Ok(raw) => raw.parse::<usize>().unwrap_or(DEFAULT_INTERVAL),
        Err(_) => DEFAULT_INTERVAL,
    }
}
fn flush_calls(
store: &Store,
calls: Vec<(String, cqs::parser::CallSite)>,
) -> Vec<(String, cqs::parser::CallSite)> {
if calls.is_empty() {
return Vec::new();
}
let unique_ids: HashSet<&str> = calls.iter().map(|(id, _)| id.as_str()).collect();
let existing = match store.existing_chunk_ids(&unique_ids) {
Ok(set) => set,
Err(e) => {
tracing::warn!(error = %e, "Failed to check existing chunk IDs, retaining all deferred calls");
return calls;
}
};
let (ready, retained): (Vec<_>, Vec<_>) = calls
.into_iter()
.partition(|(id, _)| existing.contains(id.as_str()));
if !ready.is_empty() {
tracing::info!(
flushed = ready.len(),
retained = retained.len(),
"Periodic flush: deferred chunk calls"
);
if let Err(e) = store.upsert_calls_batch(&ready) {
tracing::warn!(
count = ready.len(),
error = %e,
"Periodic flush of deferred calls failed, items lost"
);
}
}
retained
}
/// Best-effort write of the deferred type edges in `edges` to `store`.
///
/// Does nothing for an empty slice. A failed write is logged as a warning and
/// otherwise ignored; the function never returns an error.
fn flush_type_edges(store: &Store, edges: &[(PathBuf, Vec<cqs::parser::ChunkTypeRefs>)]) {
    let file_count = edges.len();
    if file_count == 0 {
        return;
    }

    tracing::info!(files = file_count, "Periodic flush: deferred type edges");

    let Err(err) = store.upsert_type_edges_for_files(edges) else {
        return;
    };
    tracing::warn!(
        files = file_count,
        error = %err,
        "Periodic flush of deferred type edges failed"
    );
}
/// Final pipeline stage: receives embedded batches from `embed_rx` and writes
/// them to `store`.
///
/// For each received batch the stage:
/// 1. queues the batch's chunk-level calls for a later flush,
/// 2. upserts the chunk/embedding pairs. When the batch carries mtimes for
///    more than one file, the pairs are grouped by file so each group is
///    written with that file's mtime,
/// 3. upserts the per-file function calls (failures are logged and skipped),
/// 4. queues the batch's type references for a later flush,
/// 5. refreshes `progress` from `parsed_count` and `embedded_count`.
///
/// Every `deferred_flush_interval()` batches the queued calls and type edges
/// are flushed; whatever is still queued when the loop ends is flushed once
/// more. If `check_interrupted()` reports an interrupt, the loop stops before
/// processing the batch it just received, and the final flush still runs.
///
/// # Returns
///
/// `(total_embedded, total_cached, total_type_edges, total_calls)` where
/// * `total_embedded` is the number of chunk/embedding pairs in all processed
///   batches,
/// * `total_cached` is the sum of every processed batch's `cached_count`,
/// * `total_type_edges` is the number of type references queued,
/// * `total_calls` is the number of function-call sites plus the number of
///   deferred chunk calls handed to the store. Calls are counted when they
///   are handed over, whether or not the write succeeded.
///
/// # Errors
///
/// Returns the error from `Store::upsert_chunks_and_calls` as soon as a chunk
/// write fails. All other store failures are logged and do not abort the
/// stage.
pub(super) fn store_stage(
    embed_rx: Receiver<EmbeddedBatch>,
    store: &Store,
    parsed_count: &AtomicUsize,
    embedded_count: &AtomicUsize,
    progress: &ProgressBar,
) -> Result<(usize, usize, usize, usize)> {
    let _span = tracing::info_span!("store_stage").entered();

    let mut total_embedded = 0;
    let mut total_cached = 0;
    let mut total_type_edges = 0;
    let mut total_calls = 0;

    let mut deferred_type_edges: Vec<(PathBuf, Vec<cqs::parser::ChunkTypeRefs>)> = Vec::new();
    let mut deferred_chunk_calls: Vec<(String, cqs::parser::CallSite)> = Vec::new();
    let mut batch_counter: usize = 0;
    let flush_interval = deferred_flush_interval();

    // Chunk calls are always written through the deferred path, so the chunk
    // upsert itself is given an empty call list.
    let no_calls: Vec<(String, cqs::parser::CallSite)> = Vec::new();

    for batch in embed_rx {
        if check_interrupted() {
            break;
        }

        deferred_chunk_calls.extend(batch.relationships.chunk_calls);

        // Taken before `chunk_embeddings` may be moved into the per-file map.
        let batch_count = batch.chunk_embeddings.len();

        if batch.file_mtimes.len() <= 1 {
            // Zero or one mtime: write the whole batch in a single call.
            let mtime = batch.file_mtimes.values().next().copied();
            store.upsert_chunks_and_calls(&batch.chunk_embeddings, mtime, &no_calls)?;
        } else {
            // Several files: group the pairs so each write carries its own
            // file's mtime.
            let mut by_file: HashMap<PathBuf, Vec<(Chunk, Embedding)>> = HashMap::new();
            for (chunk, embedding) in batch.chunk_embeddings {
                by_file
                    .entry(chunk.file.clone())
                    .or_default()
                    .push((chunk, embedding));
            }
            for (file, pairs) in &by_file {
                let mtime = batch.file_mtimes.get(file.as_path()).copied();
                store.upsert_chunks_and_calls(pairs, mtime, &no_calls)?;
            }
        }

        for (file, function_calls) in &batch.relationships.function_calls {
            for fc in function_calls {
                total_calls += fc.calls.len();
            }
            // Best effort: a failure for one file must not stop the stage.
            if let Err(e) = store.upsert_function_calls(file, function_calls) {
                tracing::warn!(
                    file = %file.display(),
                    error = %e,
                    "Failed to store function calls"
                );
            }
        }

        for (file, chunk_type_refs) in batch.relationships.type_refs {
            for ctr in &chunk_type_refs {
                total_type_edges += ctr.type_refs.len();
            }
            deferred_type_edges.push((file, chunk_type_refs));
        }

        total_embedded += batch_count;
        total_cached += batch.cached_count;

        let parsed = parsed_count.load(Ordering::Relaxed);
        let embedded = embedded_count.load(Ordering::Relaxed);
        progress.set_position(parsed as u64);
        progress.set_message(format!(
            "parsed:{} embedded:{} written:{}",
            parsed, embedded, total_embedded
        ));

        batch_counter += 1;
        // `is_multiple_of(0)` is false for any non-zero counter, so an
        // interval of 0 means "no periodic flush" rather than a panic.
        if batch_counter.is_multiple_of(flush_interval) {
            let queued_before = deferred_chunk_calls.len();
            deferred_chunk_calls = flush_calls(store, std::mem::take(&mut deferred_chunk_calls));
            // Count the calls that left the queue here. Without this, only the
            // calls remaining for the final flush were included in
            // `total_calls`, so the total depended on how often a periodic
            // flush happened.
            total_calls += queued_before.saturating_sub(deferred_chunk_calls.len());

            flush_type_edges(store, &deferred_type_edges);
            deferred_type_edges.clear();
        }
    }

    // Final flush of whatever is still queued (also reached after an
    // interrupt).
    if !deferred_chunk_calls.is_empty() {
        if let Err(e) = store.upsert_calls_batch(&deferred_chunk_calls) {
            tracing::warn!(
                count = deferred_chunk_calls.len(),
                error = %e,
                "Failed to store deferred chunk calls"
            );
        }
        total_calls += deferred_chunk_calls.len();
    }
    if !deferred_type_edges.is_empty() {
        if let Err(e) = store.upsert_type_edges_for_files(&deferred_type_edges) {
            tracing::warn!(
                files = deferred_type_edges.len(),
                error = %e,
                "Failed to store deferred type edges"
            );
        }
    }

    Ok((total_embedded, total_cached, total_type_edges, total_calls))
}