use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use tantivy::collector::TopDocs;
use tantivy::query::QueryParser;
use tantivy::schema::*;
use tantivy::{doc, Index as TIndex, IndexReader, IndexWriter, ReloadPolicy, TantivyDocument, Term};
mod walker;
pub mod snapshot;
pub use walker::{walk_workspace, Candidate};
/// Bucket a document is filed under; stored in the index's `corpus` field and
/// usable as a search filter.
///
/// Serde (de)serializes variants in `snake_case`, e.g. `BenchHistory` ↔
/// `"bench_history"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Corpus {
    /// Index key `"docs"`.
    Docs,
    /// Index key `"code"`.
    Code,
    /// Index key `"bench_history"` (parse aliases: `bench-history`, `bench`).
    BenchHistory,
    /// Index key `"changelog"`.
    Changelog,
    /// Index key `"config"`.
    Config,
}
impl Corpus {
    /// Canonical key written to (and filtered on in) the index's `corpus` field.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Docs => "docs",
            Self::Code => "code",
            Self::BenchHistory => "bench_history",
            Self::Changelog => "changelog",
            Self::Config => "config",
        }
    }

    /// Parses a canonical key (see [`Corpus::as_str`]) or one of the aliases
    /// `bench-history` / `bench`. Matching is exact and case-sensitive;
    /// anything else yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        // Fold the aliases onto their canonical key, then look it up.
        let key = match s {
            "bench-history" | "bench" => "bench_history",
            other => other,
        };
        Self::all().iter().copied().find(|c| c.as_str() == key)
    }

    /// Every variant, in declaration order.
    pub fn all() -> &'static [Corpus] {
        static ALL: [Corpus; 5] = [
            Corpus::Docs,
            Corpus::Code,
            Corpus::BenchHistory,
            Corpus::Changelog,
            Corpus::Config,
        ];
        &ALL
    }
}
/// One search result, assembled from a document's stored fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Hit {
    /// Relevance score reported by the top-docs collector.
    pub score: f32,
    /// Corpus key of the document (see `Corpus::as_str`).
    pub corpus: String,
    /// Repo name recorded when the document was indexed.
    pub repo: String,
    /// Path as stored: relative to the workspace root when it lies under it.
    pub path: String,
    /// File name of the indexed file.
    pub title: String,
    /// Short flattened preview of the beginning of the body.
    pub snippet: String,
    /// Modification time in whole seconds since the Unix epoch (0 if unknown).
    pub mtime: u64,
}
/// Document counts for the currently loaded index snapshot.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Stats {
    /// Number of live documents visible to the searcher.
    pub total: u64,
    /// `(corpus key, document count)` pairs, one per `Corpus::all()` entry.
    pub by_corpus: Vec<(String, u64)>,
}
/// Tally of what one indexing pass did with each candidate file.
#[derive(Debug, Default, Clone, Serialize)]
pub struct BuildStats {
    /// Candidates that reached the upsert step.
    pub scanned: usize,
    /// Files with no prior document that were newly indexed.
    pub added: usize,
    /// Files whose prior document was replaced.
    pub updated: usize,
    /// Files whose stored mtime equalled the current one (left untouched).
    pub skipped_unchanged: usize,
    /// Files larger than `MAX_FILE_BYTES` (not indexed).
    pub skipped_too_large: usize,
    /// Files whose upsert failed; each failure is also logged to stderr.
    pub errors: usize,
}
/// Files larger than this (1 MiB) are reported as too large and not indexed.
const MAX_FILE_BYTES: u64 = 1024 * 1024;
/// Snippet length budget, measured in bytes of the document body.
const SNIPPET_BYTES: usize = 400;
/// Memory budget, in bytes, passed to tantivy when creating an index writer.
const WRITER_HEAP: usize = 50 * 1_000_000;
/// Handles to every field of the schema produced by `build_schema`.
struct Fields {
    /// Untokenized, stored: document identity used for lookups and deletes.
    path: Field,
    /// Untokenized, stored: corpus key.
    corpus: Field,
    /// Untokenized, stored: repo name.
    repo: Field,
    /// Tokenized, stored: file name.
    title: Field,
    /// Tokenized, not stored: full file contents.
    body: Field,
    /// Stored and indexed u64: modification time in seconds.
    mtime: Field,
    /// Stored only: preview text returned with hits.
    snippet: Field,
}
/// Builds the index schema and returns it along with handles to its fields.
///
/// Fields are added in a fixed order. The returned handles are reused
/// against indexes already on disk, so order and options should not change
/// casually.
fn build_schema() -> (Schema, Fields) {
    let mut builder = Schema::builder();

    // Exact-match keyword fields (raw, not tokenized), retrievable from hits.
    let path = builder.add_text_field("path", STRING | STORED);
    let corpus = builder.add_text_field("corpus", STRING | STORED);
    let repo = builder.add_text_field("repo", STRING | STORED);

    // Full-text fields; only the title is kept for display.
    let title = builder.add_text_field("title", TEXT | STORED);
    let body = builder.add_text_field("body", TEXT);

    // Change detection value, plus a display-only preview.
    let mtime = builder.add_u64_field("mtime", STORED | INDEXED);
    let snippet = builder.add_text_field("snippet", STORED);

    let fields = Fields {
        path,
        corpus,
        repo,
        title,
        body,
        mtime,
        snippet,
    };
    (builder.build(), fields)
}
/// A tantivy-backed search index over files in a workspace.
pub struct Index {
    /// Root used to make stored paths relative; also the walk root for `build`.
    workspace_root: PathBuf,
    /// Directory holding the on-disk index files.
    index_dir: PathBuf,
    /// Underlying tantivy index.
    tindex: TIndex,
    /// Reader that is reloaded after every commit made through this type.
    reader: IndexReader,
    /// Field handles matching `tindex`'s schema.
    fields: Fields,
    /// Repo names handed to `walk_workspace`; empty means no restriction.
    repo_scope: Vec<String>,
}
impl Index {
pub fn open(workspace_root: &Path) -> Result<Self> {
Self::open_at(workspace_root, &workspace_root.join(".nornir/cache/index"))
}
pub fn with_repo_scope(mut self, repos: Vec<String>) -> Self {
self.repo_scope = repos;
self
}
pub fn open_at(workspace_root: &Path, index_dir: &Path) -> Result<Self> {
std::fs::create_dir_all(index_dir)
.with_context(|| format!("create {}", index_dir.display()))?;
let (schema, fields) = build_schema();
let dir = tantivy::directory::MmapDirectory::open(index_dir)?;
let tindex = if TIndex::exists(&dir)? {
TIndex::open_in_dir(index_dir)?
} else {
TIndex::create_in_dir(index_dir, schema)?
};
let reader = tindex
.reader_builder()
.reload_policy(ReloadPolicy::OnCommitWithDelay)
.try_into()?;
Ok(Self {
workspace_root: workspace_root.to_path_buf(),
index_dir: index_dir.to_path_buf(),
tindex,
reader,
fields,
repo_scope: Vec::new(),
})
}
pub fn open_or_restore(
workspace_root: &Path,
wh: &crate::warehouse::iceberg::IcebergWarehouse,
repo: &str,
sha: Option<&str>,
) -> Result<(Self, bool)> {
let index_dir = workspace_root.join(".nornir/cache/index");
Self::open_or_restore_at(workspace_root, &index_dir, wh, repo, sha)
}
pub fn open_or_restore_at(
workspace_root: &Path,
index_dir: &Path,
wh: &crate::warehouse::iceberg::IcebergWarehouse,
repo: &str,
sha: Option<&str>,
) -> Result<(Self, bool)> {
let needs_restore = !index_dir.join("meta.json").exists();
let mut restored = false;
if needs_restore {
match crate::index::snapshot::restore_from_iceberg(wh, repo, sha, index_dir) {
Ok(snap) => {
eprintln!(
"⏃ urðr: restored {} ({} blob(s), sha={}) into {}",
snap.snapshot_id,
snap.blob_count,
&snap.git_sha[..snap.git_sha.len().min(12)],
index_dir.display(),
);
restored = true;
}
Err(e) => {
eprintln!(
"⏃ urðr: no iceberg snapshot for `{repo}` ({e}); starting empty"
);
}
}
}
let idx = Self::open_at(workspace_root, index_dir)?;
Ok((idx, restored))
}
pub fn index_dir(&self) -> &Path { &self.index_dir }
pub fn build(&self) -> Result<BuildStats> {
let mut writer: IndexWriter = self.tindex.writer(WRITER_HEAP)?;
let mut stats = BuildStats::default();
for cand in walk_workspace(&self.workspace_root, &self.repo_scope) {
stats.scanned += 1;
match self.upsert(&mut writer, &cand) {
Ok(UpsertOutcome::Added) => stats.added += 1,
Ok(UpsertOutcome::Updated) => stats.updated += 1,
Ok(UpsertOutcome::SkippedUnchanged) => stats.skipped_unchanged += 1,
Ok(UpsertOutcome::SkippedTooLarge) => stats.skipped_too_large += 1,
Err(e) => {
eprintln!("index: {} :: {}", cand.path.display(), e);
stats.errors += 1;
}
}
}
writer.commit()?;
self.reader.reload()?;
Ok(stats)
}
pub fn index_files(&self, repo: &str, corpus: Corpus, files: &[PathBuf]) -> Result<BuildStats> {
let mut writer: IndexWriter = self.tindex.writer(WRITER_HEAP)?;
let mut stats = BuildStats::default();
for f in files {
if !f.is_file() {
continue;
}
let cand = Candidate { path: f.clone(), corpus, repo: repo.to_string() };
stats.scanned += 1;
match self.upsert(&mut writer, &cand) {
Ok(UpsertOutcome::Added) => stats.added += 1,
Ok(UpsertOutcome::Updated) => stats.updated += 1,
Ok(UpsertOutcome::SkippedUnchanged) => stats.skipped_unchanged += 1,
Ok(UpsertOutcome::SkippedTooLarge) => stats.skipped_too_large += 1,
Err(e) => {
eprintln!("docs-index: {} :: {}", f.display(), e);
stats.errors += 1;
}
}
}
writer.commit()?;
self.reader.reload()?;
Ok(stats)
}
fn upsert(&self, writer: &mut IndexWriter, cand: &Candidate) -> Result<UpsertOutcome> {
let meta = std::fs::metadata(&cand.path)?;
if meta.len() > MAX_FILE_BYTES {
return Ok(UpsertOutcome::SkippedTooLarge);
}
let mtime = meta
.modified()
.ok()
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_secs())
.unwrap_or(0);
let path_str = cand
.path
.strip_prefix(&self.workspace_root)
.unwrap_or(&cand.path)
.to_string_lossy()
.into_owned();
let prev = self.lookup_mtime(&path_str)?;
if prev == Some(mtime) {
return Ok(UpsertOutcome::SkippedUnchanged);
}
writer.delete_term(Term::from_field_text(self.fields.path, &path_str));
let body = std::fs::read_to_string(&cand.path).unwrap_or_default();
let title = cand
.path
.file_name()
.map(|s| s.to_string_lossy().into_owned())
.unwrap_or_default();
let snippet = make_snippet(&body);
writer.add_document(doc!(
self.fields.path => path_str,
self.fields.corpus => cand.corpus.as_str(),
self.fields.repo => cand.repo.clone(),
self.fields.title => title,
self.fields.body => body,
self.fields.mtime => mtime,
self.fields.snippet => snippet,
))?;
Ok(if prev.is_some() { UpsertOutcome::Updated } else { UpsertOutcome::Added })
}
fn lookup_mtime(&self, path: &str) -> Result<Option<u64>> {
let searcher = self.reader.searcher();
let parser = QueryParser::for_index(&self.tindex, vec![self.fields.path]);
let q = parser.parse_query(&format!("path:\"{}\"", escape_query(path)))?;
let top = searcher.search(&q, &TopDocs::with_limit(1))?;
if let Some((_, addr)) = top.first() {
let d: TantivyDocument = searcher.doc(*addr)?;
for v in d.get_all(self.fields.mtime) {
if let Some(n) = v.as_u64() {
return Ok(Some(n));
}
}
}
Ok(None)
}
pub fn search(
&self,
query: &str,
corpus: Option<Corpus>,
repo: Option<&str>,
limit: usize,
) -> Result<Vec<Hit>> {
let searcher = self.reader.searcher();
let mut parser =
QueryParser::for_index(&self.tindex, vec![self.fields.title, self.fields.body]);
parser.set_conjunction_by_default();
let mut q = format!("({})", query);
if let Some(c) = corpus {
q = format!("{} AND corpus:{}", q, c.as_str());
}
if let Some(r) = repo {
q = format!("{} AND repo:\"{}\"", q, escape_query(r));
}
let parsed = parser
.parse_query(&q)
.with_context(|| format!("parse query: {q}"))?;
let top = searcher.search(&parsed, &TopDocs::with_limit(limit.max(1)))?;
let mut out = Vec::with_capacity(top.len());
for (score, addr) in top {
let d: TantivyDocument = searcher.doc(addr)?;
out.push(Hit {
score,
corpus: first_text(&d, self.fields.corpus),
repo: first_text(&d, self.fields.repo),
path: first_text(&d, self.fields.path),
title: first_text(&d, self.fields.title),
snippet: first_text(&d, self.fields.snippet),
mtime: first_u64(&d, self.fields.mtime),
});
}
Ok(out)
}
pub fn stats(&self) -> Result<Stats> {
let searcher = self.reader.searcher();
let total = searcher.num_docs();
let mut by_corpus = Vec::new();
for c in Corpus::all() {
let parser = QueryParser::for_index(&self.tindex, vec![self.fields.corpus]);
let q = parser.parse_query(&format!("corpus:{}", c.as_str()))?;
let n = searcher.search(&q, &tantivy::collector::Count)?;
by_corpus.push((c.as_str().to_string(), n as u64));
}
Ok(Stats { total, by_corpus })
}
}
/// Result of upserting a single file; tallied into `BuildStats`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpsertOutcome {
    /// No document for this path existed; a new one was written.
    Added,
    /// An existing document for this path was replaced.
    Updated,
    /// The stored mtime matched the file's, so nothing was written.
    SkippedUnchanged,
    /// The file exceeded `MAX_FILE_BYTES` and was not indexed.
    SkippedTooLarge,
}
/// Builds a one-line preview from the start of `body`.
///
/// Output never exceeds `SNIPPET_BYTES` bytes and only whole characters are
/// kept. Each line break (`\n`, `\r`, or a `\r\n` pair) becomes a single
/// space, so the preview renders on one line.
fn make_snippet(body: &str) -> String {
    let mut s = String::with_capacity(SNIPPET_BYTES.min(body.len()));
    let mut prev_cr = false;
    for ch in body.chars() {
        // The `\n` of a CRLF pair was already emitted as a space with the `\r`.
        if ch == '\n' && prev_cr {
            prev_cr = false;
            continue;
        }
        prev_cr = ch == '\r';
        let out = if ch == '\n' || ch == '\r' { ' ' } else { ch };
        // Check the whole encoded width so a multi-byte char can't overshoot.
        if s.len() + out.len_utf8() > SNIPPET_BYTES {
            break;
        }
        s.push(out);
    }
    s
}
/// Escapes `s` for use inside a double-quoted query literal such as
/// `field:"..."`: each backslash and each double quote gets a leading backslash.
fn escape_query(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if matches!(ch, '\\' | '"') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
/// First value of field `f` in `d` as an owned string; empty when the field is
/// absent or its first value is not text.
fn first_text(d: &TantivyDocument, f: Field) -> String {
    let Some(value) = d.get_first(f) else {
        return String::new();
    };
    value.as_str().map(str::to_owned).unwrap_or_default()
}
/// First value of field `f` in `d` as a `u64`; 0 when the field is absent or
/// its first value is not a u64.
fn first_u64(d: &TantivyDocument, f: Field) -> u64 {
    match d.get_first(f) {
        Some(value) => value.as_u64().unwrap_or(0),
        None => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;

    /// Writes `body` to `path`, creating parent directories as needed.
    fn put(path: &Path, body: &str) {
        if let Some(dir) = path.parent() {
            fs::create_dir_all(dir).unwrap();
        }
        fs::write(path, body).unwrap();
    }

    /// Writes each `(relative path, contents)` pair under `root`.
    fn put_all(root: &Path, files: &[(&str, &str)]) {
        for (rel, body) in files {
            put(&root.join(rel), body);
        }
    }

    /// One file per corpus; checks unfiltered, corpus-filtered and
    /// repo-filtered searches.
    #[test]
    fn build_and_search_basic() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        put_all(
            root,
            &[
                (
                    "holger/README.md",
                    "# Holger\n\nthe central artifact server with a special compression mode.",
                ),
                (
                    "znippy/src/lib.rs",
                    "//! znippy: fast parallel compression library.\nfn compress() {}",
                ),
                (
                    "znippy/bench_history.jsonl",
                    "{\"date\":\"2026-05-24\",\"version\":\"0.7.2\",\"machine\":\"x\",\"cores\":1,\"results\":[]}\n",
                ),
                (
                    "holger/CHANGELOG.md",
                    "# Changelog\n## 0.1.0\n- initial release of compression bits",
                ),
                ("holger/Cargo.toml", "[package]\nname=\"holger\"\n"),
            ],
        );

        let idx = Index::open(root).unwrap();
        let s = idx.build().unwrap();
        assert_eq!(s.added, 5, "added: {s:?}");
        assert_eq!(s.errors, 0);

        let hits = idx.search("compression", None, None, 10).unwrap();
        assert!(!hits.is_empty(), "expected hits for 'compression'");
        assert!(hits.iter().any(|h| h.path.ends_with("README.md")));

        let docs_only = idx.search("compression", Some(Corpus::Docs), None, 10).unwrap();
        assert!(docs_only.iter().all(|h| h.corpus == "docs"));

        let code_only = idx.search("compress", Some(Corpus::Code), None, 10).unwrap();
        assert!(code_only.iter().any(|h| h.path.ends_with("lib.rs")));

        let by_repo = idx.search("compression", None, Some("holger"), 10).unwrap();
        assert!(by_repo.iter().all(|h| h.repo == "holger"));
    }

    /// A second build with no file changes must skip everything.
    #[test]
    fn incremental_skips_unchanged() {
        let tmp = tempfile::tempdir().unwrap();
        put(&tmp.path().join("holger/README.md"), "# Holger\n\nbody");
        let idx = Index::open(tmp.path()).unwrap();

        assert_eq!(idx.build().unwrap().added, 1);

        let s2 = idx.build().unwrap();
        assert_eq!(s2.added, 0, "stats: {s2:?}");
        assert_eq!(s2.skipped_unchanged, 1);
    }

    /// Per-corpus counts line up with the files written.
    #[test]
    fn stats_per_corpus() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        put_all(
            root,
            &[
                ("holger/README.md", "# x"),
                ("holger/CHANGELOG.md", "# x"),
                ("holger/src/lib.rs", "fn x(){}"),
            ],
        );
        let idx = Index::open(root).unwrap();
        idx.build().unwrap();

        let st = idx.stats().unwrap();
        assert_eq!(st.total, 3);
        let per_corpus: std::collections::HashMap<String, u64> =
            st.by_corpus.into_iter().collect();
        assert_eq!(per_corpus["docs"], 1);
        assert_eq!(per_corpus["changelog"], 1);
        assert_eq!(per_corpus["code"], 1);
    }

    /// Out-of-scope repos and `.venv` trees are pruned; root-level files stay.
    #[test]
    fn repo_scope_and_ignores_prune_the_walk() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        put_all(
            root,
            &[
                ("holger/README.md", "# holger compression"),
                ("znippy/src/lib.rs", "fn compress() {}"),
                ("scratch/notes.md", "# random unrelated scratch"),
                ("WORKSPACE.md", "# workspace root doc"),
                ("holger/.venv/site-packages/pkg/mod.py", "x = 1"),
                ("holger/.venv/lib/thing.py", "y = 2"),
            ],
        );
        let scope: Vec<String> = ["holger", "znippy"].iter().map(|r| r.to_string()).collect();
        let idx = Index::open(root).unwrap().with_repo_scope(scope);
        idx.build().unwrap();

        let scratch = idx.search("scratch", None, None, 10).unwrap();
        assert!(scratch.is_empty(), "scratch dir should be out of scope: {scratch:?}");

        let venv = idx.search("site-packages", None, None, 10).unwrap();
        assert!(venv.is_empty(), ".venv must be ignored");

        let ws = idx.search("workspace", None, None, 10).unwrap();
        assert!(ws.iter().any(|h| h.path.ends_with("WORKSPACE.md")), "root file kept");

        assert_eq!(
            idx.stats().unwrap().total,
            3,
            "holger README + znippy lib + WORKSPACE.md"
        );
    }

    /// Without a repo scope every top-level directory is walked.
    #[test]
    fn empty_scope_walks_everything() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        put_all(root, &[("holger/README.md", "# a"), ("scratch/notes.md", "# b")]);
        let idx = Index::open(root).unwrap();
        idx.build().unwrap();
        assert_eq!(idx.stats().unwrap().total, 2);
    }
}