use std::collections::{hash_map::DefaultHasher, HashMap};
use std::hash::{Hash, Hasher};
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use lru::LruCache;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use crate::core::bm25::Bm25Index;
use crate::core::chunker::{ChunkType, RawChunk};
use crate::core::embed::Embedder;
use crate::core::entity::RawEntity;
use crate::core::store::VectorStore;
use crate::core::symbol_graph::SymbolGraph;
pub(crate) mod archive;
pub(crate) mod docs_penalty;
mod files;
pub mod graph_score;
mod ingest;
mod persist;
mod search;
#[cfg(test)]
mod tests;
const QUERY_CACHE_CAPACITY: usize = 256;
pub(crate) const HNSW_OVERSAMPLE: usize = 4;
const DEFAULT_EMBEDDING_CACHE_CAP: usize = 1_000;

/// Capacity of the per-chunk embedding LRU cache.
///
/// Read from `TRUSTY_EMBEDDING_CACHE`. The default is used when the variable is
/// unset, not valid Unicode, not a `usize`, or zero, so the result is always
/// non-zero.
fn embedding_cache_cap() -> usize {
    match std::env::var("TRUSTY_EMBEDDING_CACHE").map(|raw| raw.parse::<usize>()) {
        Ok(Ok(requested)) if requested > 0 => requested,
        _ => DEFAULT_EMBEDDING_CACHE_CAP,
    }
}
const DEFAULT_CHUNKS_IDLE_EVICT_SECS: u64 = 300;

/// Idle period, in seconds, read from `TRUSTY_CHUNKS_IDLE_EVICT_SECS`.
///
/// - Unset, empty, or non-Unicode: returns the default (300) without logging.
/// - Present but not a valid `u64`: logs a warning and returns the default.
/// - `0`: returned unchanged. `CodeIndexer::evict_chunks_if_idle` does nothing
///   for a zero threshold.
pub(crate) fn idle_evict_secs() -> u64 {
    let v = match std::env::var("TRUSTY_CHUNKS_IDLE_EVICT_SECS") {
        Ok(v) if !v.is_empty() => v,
        _ => return DEFAULT_CHUNKS_IDLE_EVICT_SECS,
    };
    v.parse::<u64>().unwrap_or_else(|_| {
        tracing::warn!(
            "indexer: TRUSTY_CHUNKS_IDLE_EVICT_SECS={v:?} is not a valid u64; \
             using default ({DEFAULT_CHUNKS_IDLE_EVICT_SECS}s)"
        );
        DEFAULT_CHUNKS_IDLE_EVICT_SECS
    })
}
const DEFAULT_MAX_CHUNKS_PER_INDEX: usize = 200_000;

/// Per-index chunk ceiling, read from `TRUSTY_MAX_CHUNKS`.
///
/// Falls back to 200 000 when the variable is unset, unparsable, or zero.
pub(crate) fn max_chunks_per_index() -> usize {
    let parsed = std::env::var("TRUSTY_MAX_CHUNKS")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok());
    match parsed {
        Some(limit) if limit > 0 => limit,
        _ => DEFAULT_MAX_CHUNKS_PER_INDEX,
    }
}
const DEFAULT_EMBED_BATCH_SIZE: usize = 64;
const EMBED_BATCH_MIN: usize = 32;
const EMBED_BATCH_MAX: usize = 512;

/// Embedding batch size, read from `TRUSTY_MAX_BATCH_SIZE`.
///
/// A positive value is clamped into `[EMBED_BATCH_MIN, EMBED_BATCH_MAX]`
/// (32..=512). An unset, unparsable, or zero value yields the default of 64.
pub(crate) fn embed_batch_size() -> usize {
    let Some(requested) = std::env::var("TRUSTY_MAX_BATCH_SIZE")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
        .filter(|&n| n != 0)
    else {
        return DEFAULT_EMBED_BATCH_SIZE;
    };
    requested.max(EMBED_BATCH_MIN).min(EMBED_BATCH_MAX)
}
/// Score factor for knowledge-graph expansion. Judging by the name, it scales the
/// score of KG-expanded hits (TODO confirm at the use site).
// `dead_code` is allowed here, which suggests no code path currently reads this
// constant. NOTE(review): confirm it is still wanted, or remove it together with the allow.
#[allow(dead_code)]
pub(crate) const KG_EXPAND_SCORE_FACTOR: f32 = 0.7;
/// Number of hops used for knowledge-graph expansion (name-based; confirm in `search`).
pub(crate) const KG_EXPAND_HOPS: usize = 1;
/// Presumably the number of ingest batches between HNSW snapshot writes
/// (compare `PersistState::batch_counter`, which is a `u32` too). Confirm in `persist`.
pub(crate) const HNSW_SNAPSHOT_BATCH_INTERVAL: u32 = 16;
/// A code chunk annotated with a relevance `score` and a `match_reason`.
///
/// Built from a `RawChunk` by `raw_to_code_chunk`. Fields marked
/// `#[serde(default)]` may be absent from serialized input and then take their
/// `Default` value. `Option` fields with `skip_serializing_if` are omitted from
/// serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CodeChunk {
    /// Chunk identifier (copied from `RawChunk::id`).
    pub id: String,
    /// File the chunk belongs to (copied from `RawChunk::file`).
    pub file: String,
    /// Optional language tag for the chunk.
    #[serde(default)]
    pub language: Option<String>,
    /// First line of the chunk's span.
    pub start_line: usize,
    /// Last line of the chunk's span (`populate_virtual_terms` treats the span as inclusive).
    pub end_line: usize,
    /// Full chunk text.
    pub content: String,
    /// Name of the function the chunk represents, when known.
    pub function_name: Option<String>,
    /// Relevance score assigned by the caller of `raw_to_code_chunk`.
    pub score: f32,
    /// Optional shortened view of `content`. `build_compact_snippet` produces one
    /// by keeping the first 7 lines.
    pub compact_snippet: Option<String>,
    /// How the chunk was matched, e.g. a label from `compute_match_reason`
    /// ("hybrid", "vector", "bm25", "hybrid+kg", "fallback:ripgrep").
    pub match_reason: String,
    #[serde(default)]
    pub chunk_type: ChunkType,
    /// Call targets recorded for the chunk (copied from `RawChunk::calls`).
    #[serde(default)]
    pub calls: Vec<String>,
    /// Parent types recorded for the chunk (copied from `RawChunk::inherits_from`).
    #[serde(default)]
    pub inherits_from: Vec<String>,
    /// Nesting depth, saturated to `u8::MAX` by `raw_to_code_chunk`.
    #[serde(default)]
    pub chunk_depth: u8,
    /// Index the chunk came from. `raw_to_code_chunk` leaves this `None`;
    /// presumably filled in when results span several indexes (confirm in `search`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub index_id: Option<String>,
    /// Whether the chunk's file is on the current branch. `raw_to_code_chunk` sets
    /// this to `false`; presumably related to `SearchQuery::branch_files`.
    #[serde(default)]
    pub on_branch: bool,
    /// Optional community identifier (unset by `raw_to_code_chunk`); presumably
    /// from graph community detection (see `graph_score`). Confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub community_id: Option<u64>,
    /// Optional reason the chunk is considered archived (unset by `raw_to_code_chunk`);
    /// presumably populated by the `archive` module. Confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub archive_reason: Option<String>,
}
/// Content category a search targets.
///
/// Serialized in lowercase (`"code"`, `"text"`, `"data"`, `"all"`). The default is
/// `Code`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum SearchMode {
    #[default]
    Code,
    Text,
    Data,
    All,
}
/// Parameters for a search request.
///
/// Only `text` is required when deserializing. Every other field has a serde
/// default, and `SearchQuery::default()` uses the same values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchQuery {
    /// The query text.
    pub text: String,
    /// Number of top results requested (serde default: 10).
    #[serde(default = "default_top_k")]
    pub top_k: usize,
    /// Whether to expand results through the symbol graph (serde default: `true`).
    /// Name-based reading; confirm in `search`.
    #[serde(default = "default_true")]
    pub expand_graph: bool,
    /// Whether to return compact output (serde default: `true`). Presumably selects
    /// `CodeChunk::compact_snippet`; confirm in `search`.
    #[serde(default = "default_true")]
    pub compact: bool,
    /// Optional list of files considered "on branch". Omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub branch_files: Option<Vec<String>>,
    /// Multiplier for branch files (serde default: 1.5). Presumably applied to
    /// hits in `branch_files`.
    #[serde(default = "SearchQuery::default_branch_boost")]
    pub branch_boost: f32,
    /// Optional branch name. Omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub branch: Option<String>,
    /// Content category to search (serde default: `SearchMode::Code`).
    #[serde(default)]
    pub mode: SearchMode,
    /// Whether archived chunks should be excluded (serde default: `false`).
    #[serde(default)]
    pub exclude_archived: bool,
}
impl SearchQuery {
    /// Default `branch_boost` multiplier (1.5). It is also the serde default for
    /// that field.
    pub fn default_branch_boost() -> f32 {
        const BRANCH_BOOST: f32 = 1.5;
        BRANCH_BOOST
    }
}
impl Default for SearchQuery {
fn default() -> Self {
Self {
text: String::new(),
top_k: default_top_k(),
expand_graph: true,
compact: true,
branch_files: None,
branch_boost: SearchQuery::default_branch_boost(),
branch: None,
mode: SearchMode::default(),
exclude_archived: false,
}
}
}
/// Serde default for `SearchQuery::top_k`.
fn default_top_k() -> usize {
    const TOP_K: usize = 10;
    TOP_K
}
/// Serde default for boolean flags that are enabled unless explicitly turned off.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
/// Hashes query text into a `u64` key using std's `DefaultHasher`.
///
/// The result is stable within one process. `DefaultHasher`'s algorithm is not
/// guaranteed across Rust releases, so the value should not be persisted.
pub(crate) fn hash_query(query: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    Hash::hash(query, &mut hasher);
    hasher.finish()
}
/// Shortens `content` to at most its first 7 lines.
///
/// Content with 7 or fewer lines is returned unchanged, including any trailing
/// newline or `\r\n` endings. Longer content is truncated to its first 7 lines,
/// re-joined with `\n`.
pub(crate) fn build_compact_snippet(content: &str) -> String {
    const MAX_LINES: usize = 7;
    let mut rest = content.lines();
    let head: Vec<&str> = rest.by_ref().take(MAX_LINES).collect();
    if rest.next().is_some() {
        head.join("\n")
    } else {
        content.to_owned()
    }
}
/// Converts a `RawChunk` into a `CodeChunk`.
///
/// `score`, `match_reason` and `compact_snippet` come from the caller. The
/// content fields are cloned from `raw`. `chunk_depth` is saturated to
/// `u8::MAX` instead of wrapping. `index_id`, `community_id` and `archive_reason`
/// start as `None`, and `on_branch` starts as `false`.
pub(crate) fn raw_to_code_chunk(
    raw: &RawChunk,
    score: f32,
    match_reason: &str,
    compact_snippet: Option<String>,
) -> CodeChunk {
    let depth_ceiling = u8::MAX as usize;
    let chunk_depth = if raw.chunk_depth > depth_ceiling {
        u8::MAX
    } else {
        raw.chunk_depth as u8
    };
    CodeChunk {
        id: raw.id.clone(),
        file: raw.file.clone(),
        language: raw.language.clone(),
        function_name: raw.function_name.clone(),
        chunk_type: raw.chunk_type.clone(),
        start_line: raw.start_line,
        end_line: raw.end_line,
        content: raw.content.clone(),
        calls: raw.calls.clone(),
        inherits_from: raw.inherits_from.clone(),
        chunk_depth,
        score,
        match_reason: match_reason.to_owned(),
        compact_snippet,
        index_id: None,
        on_branch: false,
        community_id: None,
        archive_reason: None,
    }
}
/// Fills each chunk's `virtual_terms` with the distinct entity texts located inside it.
///
/// An entity belongs to a chunk when its `line` lies in the inclusive range
/// `start_line..=end_line`. Terms keep the order in which they first appear in
/// `entities`. Any previous `virtual_terms` are overwritten. Cost is
/// O(chunks × entities).
pub(crate) fn populate_virtual_terms(chunks: &mut [RawChunk], entities: &[RawEntity]) {
    for chunk in chunks.iter_mut() {
        let span = chunk.start_line..=chunk.end_line;
        let mut already: std::collections::HashSet<&str> = std::collections::HashSet::new();
        chunk.virtual_terms = entities
            .iter()
            .filter(|ent| span.contains(&ent.line))
            .filter(|ent| already.insert(ent.text.as_str()))
            .map(|ent| ent.text.clone())
            .collect();
    }
}
/// Score multiplier based on file type.
///
/// Returns `0.5` for paths that end (ASCII case-insensitively) in `.md`, `.txt`,
/// `.toml`, `.yaml`, `.yml` or `.json`, and `1.0` for everything else.
pub(crate) fn file_type_score_multiplier(path: &str) -> f32 {
    let lowered = path.to_ascii_lowercase();
    let is_doc_like = [".md", ".txt", ".toml", ".yaml", ".yml", ".json"]
        .iter()
        .any(|suffix| lowered.ends_with(suffix));
    match is_doc_like {
        true => 0.5,
        false => 1.0,
    }
}
/// Labels how a result was found from its retrieval-source flags.
///
/// Vector and BM25 membership take precedence. The KG flag matters only when
/// neither matched, and a KG-only hit is labelled `"hybrid+kg"`. With no flags
/// set the label is `"fallback:ripgrep"`.
pub(crate) fn compute_match_reason(in_v: bool, in_b: bool, in_kg: bool) -> &'static str {
    if in_v {
        return if in_b { "hybrid" } else { "vector" };
    }
    if in_b {
        "bm25"
    } else if in_kg {
        "hybrid+kg"
    } else {
        "fallback:ripgrep"
    }
}
/// Serializable snapshot of chunks plus grouped entities, tagged with a format `version`.
///
/// NOTE(review): presumably written and read by `persist`, with `version` used to
/// reject incompatible snapshots. Confirm there.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ChunkSnapshot {
    pub(crate) version: u32,
    pub(crate) chunks: Vec<RawChunk>,
    // (key, entities) pairs. The key is presumably a file path, mirroring
    // `ParsedBatch::entities_by_file`. TODO confirm.
    pub(crate) entities: Vec<(String, Vec<RawEntity>)>,
}
/// Result of parsing (and optionally embedding) one batch of input, before it is
/// committed to the index. Interpretation is based on field names; confirm in `ingest`.
#[derive(Default)]
pub struct ParsedBatch {
    pub chunks: Vec<RawChunk>,
    // Presumably parallel to `chunks`: `None` where no embedding was produced. TODO confirm.
    pub embeddings: Vec<Option<Vec<f32>>>,
    pub entities_by_file: Vec<(String, Vec<RawEntity>)>,
    // Timings, presumably in milliseconds (per the `_ms` suffix).
    pub parse_ms: u64,
    pub embed_ms: u64,
    // Presumably the number of `Some` entries in `embeddings`. TODO confirm.
    pub vector_count: usize,
}
/// Per-stage statistics for committing a batch to the index.
#[derive(Debug, Default, Clone, Copy)]
pub struct CommitTimings {
    // Number of chunks in the committed batch (name-based; confirm in `ingest`).
    pub chunks: usize,
    // Stage durations, presumably in milliseconds (per the `_ms` suffix).
    pub bm25_ms: u64,
    pub vector_upsert_ms: u64,
    pub kg_ms: u64,
}
/// In-memory state for one code index: chunks, entities, BM25 and symbol graph,
/// caches, and optional embedding/vector/corpus backends.
///
/// Shared state sits behind `Arc` + locks/atomics, so methods take `&self`.
pub struct CodeIndexer {
    /// Identifier of this index (also used in log messages).
    pub index_id: String,
    /// Root directory of the indexed tree (name-based; confirm in `files`/`ingest`).
    pub root_path: std::path::PathBuf,
    /// Embedding backend, attached via `with_components`.
    pub(super) embedder: Option<Arc<dyn Embedder>>,
    /// Vector store, attached via `with_components`.
    pub(super) store: Option<Arc<dyn VectorStore>>,
    /// In-memory chunks keyed by chunk id. Cleared by `evict_chunks_if_idle` and
    /// refilled from `corpus` by `ensure_chunks_loaded`.
    pub(super) chunks: Arc<RwLock<HashMap<String, RawChunk>>>,
    /// Durable corpus store. Idle eviction is skipped without it, and it is the
    /// source for rehydration.
    pub(super) corpus: Option<Arc<crate::core::corpus::CorpusStore>>,
    /// Extracted entities. The key is presumably a file path (compare
    /// `ParsedBatch::entities_by_file`); TODO confirm.
    pub(super) entities: Arc<RwLock<HashMap<String, Vec<RawEntity>>>>,
    /// LRU cache of chunk embeddings, sized by `TRUSTY_EMBEDDING_CACHE`. Keys are
    /// presumably chunk ids.
    pub(super) chunk_embeddings: Arc<RwLock<LruCache<String, Vec<f32>>>>,
    pub(super) bm25: Arc<RwLock<Bm25Index>>,
    /// LRU cache with `QUERY_CACHE_CAPACITY` entries. Keys are presumably
    /// `hash_query` outputs, and values query embeddings.
    pub(super) query_cache: Arc<Mutex<LruCache<u64, Vec<f32>>>>,
    /// Current symbol graph. The inner `Arc` lets readers take a snapshot
    /// (`snapshot_symbol_graph`) without holding the lock.
    pub(super) symbol_graph: Arc<RwLock<Arc<SymbolGraph>>>,
    /// NER extractor obtained from `NerExtractor::try_load`.
    pub(super) ner: crate::core::ner::NerExtractor,
    pub(super) persist_state: Arc<PersistState>,
    /// Set through `with_domain_terms` / `set_domain_terms`.
    pub(super) domain_terms: Vec<String>,
    /// Reference instant for `last_activity_ms`.
    pub(super) created_at: Instant,
    /// Milliseconds since `created_at` of the last `touch_activity` (0 initially).
    pub(super) last_activity_ms: Arc<AtomicU64>,
    /// `true` after `evict_chunks_if_idle` cleared `chunks`, until
    /// `ensure_chunks_loaded` rehydrates them.
    pub(super) chunks_evicted: Arc<AtomicBool>,
}
/// Shared persistence bookkeeping. All fields start `false`/`0`.
///
/// NOTE(review): the semantics below are inferred from field names. Confirm in `persist`.
#[derive(Debug, Default)]
pub(crate) struct PersistState {
    // Presumably set while a persist operation is running.
    pub(crate) in_flight: AtomicBool,
    // Presumably set when in-memory state has changes not yet persisted.
    pub(crate) dirty: AtomicBool,
    // Presumably counts batches, compared against `HNSW_SNAPSHOT_BATCH_INTERVAL`.
    pub(crate) batch_counter: AtomicU32,
}
impl CodeIndexer {
    /// Creates an indexer for `index_id` rooted at `root_path` with empty in-memory state.
    ///
    /// No embedder, vector store or corpus store is attached yet (see
    /// [`CodeIndexer::with_components`] and [`CodeIndexer::set_corpus_store`]).
    /// The query cache holds `QUERY_CACHE_CAPACITY` entries. The chunk-embedding
    /// cache is sized by `embedding_cache_cap` (env `TRUSTY_EMBEDDING_CACHE`).
    ///
    /// # Panics
    ///
    /// Only if a cache capacity were zero. Both are non-zero by construction.
    pub fn new(index_id: impl Into<String>, root_path: impl Into<std::path::PathBuf>) -> Self {
        let cap =
            NonZeroUsize::new(QUERY_CACHE_CAPACITY).expect("QUERY_CACHE_CAPACITY must be non-zero");
        let emb_cap = NonZeroUsize::new(embedding_cache_cap())
            .expect("embedding_cache_cap must be non-zero (env var filtered)");
        Self {
            index_id: index_id.into(),
            root_path: root_path.into(),
            embedder: None,
            store: None,
            corpus: None,
            chunks: Arc::new(RwLock::new(HashMap::new())),
            entities: Arc::new(RwLock::new(HashMap::new())),
            chunk_embeddings: Arc::new(RwLock::new(LruCache::new(emb_cap))),
            bm25: Arc::new(RwLock::new(Bm25Index::new())),
            query_cache: Arc::new(Mutex::new(LruCache::new(cap))),
            symbol_graph: Arc::new(RwLock::new(Arc::new(SymbolGraph::new()))),
            ner: crate::core::ner::NerExtractor::try_load(),
            persist_state: Arc::new(PersistState::default()),
            domain_terms: Vec::new(),
            created_at: Instant::now(),
            last_activity_ms: Arc::new(AtomicU64::new(0)),
            chunks_evicted: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Milliseconds elapsed since construction, saturating at `u64::MAX`.
    fn elapsed_ms(&self) -> u64 {
        self.created_at.elapsed().as_millis().min(u64::MAX as u128) as u64
    }

    /// Records "now" as the latest activity, resetting the idle clock consulted by
    /// [`CodeIndexer::evict_chunks_if_idle`].
    pub(super) fn touch_activity(&self) {
        self.last_activity_ms.store(self.elapsed_ms(), Ordering::Relaxed);
    }

    /// Time since the last `touch_activity`, or since construction if never touched.
    fn idle_duration(&self) -> std::time::Duration {
        let now_ms = self.elapsed_ms();
        let last = self.last_activity_ms.load(Ordering::Relaxed);
        std::time::Duration::from_millis(now_ms.saturating_sub(last))
    }

    /// Number of chunks currently held in memory. This is 0 after an eviction until
    /// the chunks are rehydrated.
    pub async fn in_memory_chunk_count(&self) -> usize {
        self.chunks.read().await.len()
    }

    /// Drops all in-memory chunks when the index has been idle for at least
    /// `idle_threshold`, and returns how many were dropped.
    ///
    /// It does nothing (returns 0) when the threshold is zero, when no corpus store
    /// is attached (the chunks could not be rehydrated), when the index is not idle,
    /// or when there is nothing to evict. Idleness is checked again after the write
    /// lock is acquired. Activity that happened while waiting for the lock, such as
    /// a search holding the read lock, therefore prevents the eviction.
    pub async fn evict_chunks_if_idle(&self, idle_threshold: std::time::Duration) -> usize {
        if idle_threshold.is_zero() || self.corpus.is_none() {
            return 0;
        }
        // Cheap pre-check, to avoid contending for the write lock on a busy index.
        if self.idle_duration() < idle_threshold {
            return 0;
        }
        let mut chunks = self.chunks.write().await;
        if chunks.is_empty() || self.idle_duration() < idle_threshold {
            return 0;
        }
        let evicted = chunks.len();
        chunks.clear();
        chunks.shrink_to_fit();
        // Flip the flag while still holding the write lock. All transitions of
        // `chunks_evicted` then happen under this lock, so an eviction cannot
        // interleave with a rehydration that is in the middle of clearing the flag.
        self.chunks_evicted.store(true, Ordering::Relaxed);
        drop(chunks);
        tracing::info!(
            "index '{}': evicted {} in-memory chunks after {}s idle \
             (durable corpus retained; lazily rehydrates on next access)",
            self.index_id,
            evicted,
            idle_threshold.as_secs(),
        );
        evicted
    }

    /// Reloads chunks from the corpus store if they were evicted. Otherwise it is a
    /// cheap no-op.
    ///
    /// The load runs on a blocking thread. Chunks already present in memory (inserted
    /// after the eviction) are kept in preference to the loaded copies. On load
    /// failure the eviction flag stays set and the load is retried on the next call.
    pub(super) async fn ensure_chunks_loaded(&self) {
        if !self.chunks_evicted.load(Ordering::Relaxed) {
            return;
        }
        let Some(corpus) = self.corpus.clone() else {
            // Eviction requires a corpus, so it must have been detached afterwards
            // (e.g. `take_corpus_store`). The evicted chunks cannot be recovered here.
            tracing::warn!(
                "index '{}': in-memory chunks were evicted but no corpus store is attached; \
                 chunks cannot be rehydrated",
                self.index_id
            );
            self.chunks_evicted.store(false, Ordering::Relaxed);
            return;
        };
        let index_id = self.index_id.clone();
        let loaded = tokio::task::spawn_blocking(move || corpus.load_all_chunks()).await;
        match loaded {
            Ok(Ok(chunks)) => {
                let n = chunks.len();
                let mut map = self.chunks.write().await;
                for chunk in chunks {
                    // Anything already in the map was inserted after the eviction and
                    // is at least as fresh as this snapshot, so do not overwrite it.
                    map.entry(chunk.id.clone()).or_insert(chunk);
                }
                // Clear the flag under the write lock (see `evict_chunks_if_idle`).
                self.chunks_evicted.store(false, Ordering::Relaxed);
                drop(map);
                tracing::info!(
                    "index '{index_id}': rehydrated {n} chunks from redb after idle eviction"
                );
            }
            Ok(Err(e)) => tracing::warn!(
                "index '{index_id}': failed to rehydrate chunks from redb ({e}); \
                 will retry on next access"
            ),
            Err(e) => tracing::warn!(
                "index '{index_id}': chunk rehydration task panicked ({e}); \
                 will retry on next access"
            ),
        }
    }

    /// Builder-style setter that replaces `domain_terms`.
    pub fn with_domain_terms(mut self, terms: Vec<String>) -> Self {
        self.domain_terms = terms;
        self
    }

    /// Replaces `domain_terms` in place.
    pub fn set_domain_terms(&mut self, terms: Vec<String>) {
        self.domain_terms = terms;
    }

    /// Returns the current symbol graph as an `Arc` snapshot.
    ///
    /// The read lock is held only for the clone. The returned graph remains valid
    /// even if a new graph is later stored.
    pub async fn snapshot_symbol_graph(&self) -> Arc<SymbolGraph> {
        Arc::clone(&*self.symbol_graph.read().await)
    }

    /// Returns a handle to the attached corpus store, if any.
    pub fn corpus_store(&self) -> Option<Arc<crate::core::corpus::CorpusStore>> {
        self.corpus.as_ref().map(Arc::clone)
    }

    /// Builder-style setter that attaches the embedding backend and vector store.
    pub fn with_components(
        mut self,
        embedder: Arc<dyn Embedder>,
        store: Arc<dyn VectorStore>,
    ) -> Self {
        self.embedder = Some(embedder);
        self.store = Some(store);
        self
    }

    /// Attaches (or replaces) the corpus store, discarding any previous one.
    pub fn set_corpus_store(&mut self, corpus: Arc<crate::core::corpus::CorpusStore>) {
        self.corpus = Some(corpus);
    }

    /// Attaches `corpus` and returns the previously attached store, if any.
    pub fn swap_corpus_store(
        &mut self,
        corpus: Arc<crate::core::corpus::CorpusStore>,
    ) -> Option<Arc<crate::core::corpus::CorpusStore>> {
        self.corpus.replace(corpus)
    }

    /// Detaches and returns the corpus store.
    ///
    /// If chunks are currently evicted, they can no longer be rehydrated afterwards.
    pub fn take_corpus_store(&mut self) -> Option<Arc<crate::core::corpus::CorpusStore>> {
        self.corpus.take()
    }

    /// Whether a corpus store is attached. Idle eviction requires one.
    pub fn has_corpus_store(&self) -> bool {
        self.corpus.is_some()
    }
}