Skip to main content

llm_wiki/
index_manager.rs

1use std::path::{Path, PathBuf};
2use std::sync::RwLock;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use anyhow::{Context, Result};
6use chrono::Utc;
7use git2::Delta;
8use serde::{Deserialize, Serialize};
9use tantivy::{
10    Index, IndexReader, IndexWriter, Searcher, Term, collector::TopDocs, directory::MmapDirectory,
11    query::AllQuery,
12};
13use walkdir::WalkDir;
14
15use crate::frontmatter;
16use crate::git;
17use crate::index_schema::IndexSchema;
18use crate::links;
19use crate::slug::Slug;
20use crate::type_registry::SpaceTypeRegistry;
21
22// ── Return types ──────────────────────────────────────────────────────────────
23
/// Summary of an index rebuild.
///
/// Returned by both `SpaceIndexManager::rebuild` (full rebuild) and
/// `SpaceIndexManager::rebuild_types` (per-type rebuild).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexReport {
    /// Name of the wiki that was indexed.
    pub wiki: String,
    /// Number of pages added to the index by this run.
    pub pages_indexed: usize,
    /// Number of files that were skipped due to read errors or invalid paths.
    pub skipped: usize,
    /// Wall-clock time taken for the rebuild in milliseconds.
    pub duration_ms: u64,
}
36
/// Result of an incremental index update (`SpaceIndexManager::update`).
///
/// The `Default` value (both counters zero) is what `update` returns when git
/// reports no changed files.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct UpdateReport {
    /// Number of changed pages whose file was read and re-added to the index.
    pub updated: usize,
    /// Number of changed paths whose git status was "deleted".
    pub deleted: usize,
}
45
/// Current health snapshot of a wiki's search index, as produced by
/// `SpaceIndexManager::status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexStatus {
    /// Wiki name.
    pub wiki: String,
    /// Path of the `search-index` directory (converted lossily to UTF-8).
    pub path: String,
    /// Timestamp of the last recorded build as stored in `state.toml`, or
    /// `None` if `state.toml` is missing or cannot be parsed.
    pub built: Option<String>,
    /// Page count recorded in `state.toml` (0 when no state is available).
    pub pages: usize,
    /// Section-page count recorded in `state.toml` (0 when no state is available).
    pub sections: usize,
    /// True if the recorded commit or schema hash differs from the current
    /// HEAD / on-disk schema hash, or if no state could be read at all.
    pub stale: bool,
    /// True if the index directory exists and can be opened by Tantivy.
    pub openable: bool,
    /// True if a reader could be created and a one-document match-all query
    /// succeeded against it.
    pub queryable: bool,
}
66
/// Classification of index staleness used to choose the cheapest rebuild strategy.
///
/// Produced by `SpaceIndexManager::staleness_kind`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StalenessKind {
    /// Index matches current HEAD and schema — no rebuild needed.
    Current,
    /// HEAD commit changed but schema is unchanged — incremental update sufficient.
    CommitChanged,
    /// The schema hash changed and these type names (sorted) were modified,
    /// added or removed — partial rebuild of those types sufficient.
    TypesChanged(Vec<String>),
    /// A full rebuild is required: either no usable `state.toml` exists, or the
    /// schema hash changed without any per-type hash difference being found.
    FullRebuildNeeded,
}
79
80// ── state.toml ────────────────────────────────────────────────────────────────
81
/// Persisted state written to `state.toml` alongside the Tantivy index.
///
/// Written after a rebuild and read back for staleness checks and status
/// reporting. `schema_hash` and `types` fall back to their default (empty)
/// values when absent from the file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexState {
    /// SHA-256 hash of the combined type registry, used for schema staleness detection.
    #[serde(default)]
    pub schema_hash: String,
    /// RFC 3339 timestamp (UTC) of when the index was last successfully built.
    pub built: String,
    /// Number of pages in the index at last build.
    pub pages: usize,
    /// Number of section pages in the index at last build.
    pub sections: usize,
    /// Git HEAD commit hash at the time of the last build; empty when HEAD
    /// could not be resolved.
    pub commit: String,
    /// Per-type content hashes at last build (type name → hash).
    #[serde(default)]
    pub types: std::collections::HashMap<String, String>,
}
100
101// ── SpaceIndexManager ─────────────────────────────────────────────────────────
102
/// Mutable state of a `SpaceIndexManager`, kept behind its `RwLock`.
struct IndexInner {
    /// The opened Tantivy index; `None` until `open()` has succeeded.
    tantivy_index: Option<Index>,
    /// Reader created from `tantivy_index`; `None` until `open()` has succeeded.
    index_reader: Option<IndexReader>,
    /// Counter bumped by every successful `reload_reader()` call. Atomic so it
    /// can be incremented while only a read lock is held.
    generation: AtomicU64,
}
108
/// Tantivy index lifecycle manager for a single wiki space.
///
/// Owns the on-disk location of the index and the (optionally) opened index
/// and reader handles.
pub struct SpaceIndexManager {
    /// Name of the wiki; used in reports, log fields and `wiki://` URIs.
    wiki_name: String,
    /// Directory that holds the `search-index/` directory and `state.toml`.
    index_path: PathBuf,
    /// Opened index/reader handles plus the reload generation counter.
    inner: RwLock<IndexInner>,
}
115
116impl SpaceIndexManager {
117    /// Create a new `SpaceIndexManager` for `wiki_name` with its index stored at `index_path`.
118    pub fn new(wiki_name: impl Into<String>, index_path: impl Into<PathBuf>) -> Self {
119        Self {
120            wiki_name: wiki_name.into(),
121            index_path: index_path.into(),
122            inner: RwLock::new(IndexInner {
123                tantivy_index: None,
124                index_reader: None,
125                generation: AtomicU64::new(0),
126            }),
127        }
128    }
129
130    /// Return the absolute path to the index directory.
131    pub fn index_path(&self) -> &Path {
132        &self.index_path
133    }
134
135    /// Return the wiki name this manager is associated with.
136    pub fn wiki_name(&self) -> &str {
137        &self.wiki_name
138    }
139
140    /// Return the current generation counter value.
141    /// Incremented on every successful `reload_reader()` call.
142    pub fn generation(&self) -> u64 {
143        self.inner
144            .read()
145            .unwrap()
146            .generation
147            .load(Ordering::Acquire)
148    }
149
150    /// Open the index from disk and hold the reader.
151    /// Call after rebuild/staleness check. Recovery: if open fails and
152    /// wiki_root/repo_root/registry are provided, rebuild and retry.
153    pub fn open(
154        &self,
155        is: &IndexSchema,
156        recovery: Option<(&Path, &Path, &SpaceTypeRegistry)>,
157    ) -> Result<()> {
158        let search_dir = self.index_path.join("search-index");
159
160        let try_open = || -> Result<Index> {
161            let dir = MmapDirectory::open(&search_dir)?;
162            Ok(Index::open(dir)?)
163        };
164
165        let index = match try_open() {
166            Ok(idx) => idx,
167            Err(e) => {
168                if let Some((wiki_root, repo_root, registry)) = recovery {
169                    tracing::warn!(
170                        wiki = %self.wiki_name,
171                        error = %e,
172                        "index corrupt, rebuilding",
173                    );
174                    if search_dir.exists() {
175                        let _ = std::fs::remove_dir_all(&search_dir);
176                    }
177                    self.rebuild(wiki_root, repo_root, is, registry)?;
178                    try_open().context("index still corrupt after rebuild")?
179                } else {
180                    return Err(e);
181                }
182            }
183        };
184
185        let reader = index
186            .reader_builder()
187            .reload_policy(tantivy::ReloadPolicy::Manual)
188            .try_into()?;
189        let mut inner = self
190            .inner
191            .write()
192            .map_err(|_| anyhow::anyhow!("index lock poisoned"))?;
193        inner.tantivy_index = Some(index);
194        inner.index_reader = Some(reader);
195        Ok(())
196    }
197
198    /// Get a searcher. Cheap — arc clone of current segment set.
199    pub fn searcher(&self) -> Result<Searcher> {
200        let inner = self
201            .inner
202            .read()
203            .map_err(|_| anyhow::anyhow!("index lock poisoned"))?;
204        inner
205            .index_reader
206            .as_ref()
207            .ok_or_else(|| anyhow::anyhow!("index not open"))
208            .map(|r| r.searcher())
209    }
210
211    /// Reload the held IndexReader so searchers see the latest commit.
212    /// No-op if the reader is not yet open. Safe to call after every write.
213    fn reload_reader(&self) -> Result<()> {
214        let inner = self
215            .inner
216            .read()
217            .map_err(|_| anyhow::anyhow!("index lock poisoned"))?;
218        if let Some(ref r) = inner.index_reader {
219            r.reload()?;
220        }
221        inner.generation.fetch_add(1, Ordering::AcqRel);
222        Ok(())
223    }
224
225    /// Get a writer from the held index, or open from disk if not held.
226    fn writer(&self) -> Result<IndexWriter> {
227        let inner = self
228            .inner
229            .read()
230            .map_err(|_| anyhow::anyhow!("index lock poisoned"))?;
231        if let Some(ref idx) = inner.tantivy_index {
232            Ok(idx.writer(50_000_000)?)
233        } else {
234            drop(inner);
235            let search_dir = self.index_path.join("search-index");
236            let dir = MmapDirectory::open(&search_dir)
237                .with_context(|| format!("failed to open index dir: {}", search_dir.display()))?;
238            let index = Index::open(dir).context("failed to open index")?;
239            Ok(index.writer(50_000_000)?)
240        }
241    }
242
243    /// Return the git commit hash recorded in `state.toml` at the last index build, if any.
244    pub fn last_commit(&self) -> Option<String> {
245        let state_path = self.index_path.join("state.toml");
246        let content = std::fs::read_to_string(&state_path).ok()?;
247        let state: IndexState = toml::from_str(&content).ok()?;
248        if state.commit.is_empty() {
249            None
250        } else {
251            Some(state.commit)
252        }
253    }
254
255    /// Rebuild the full index by walking all Markdown files under `wiki_root`.
256    pub fn rebuild(
257        &self,
258        wiki_root: &Path,
259        repo_root: &Path,
260        is: &IndexSchema,
261        registry: &SpaceTypeRegistry,
262    ) -> Result<IndexReport> {
263        let start = std::time::Instant::now();
264
265        let search_dir = self.index_path.join("search-index");
266        std::fs::create_dir_all(&search_dir)?;
267
268        // Always open_or_create for rebuild (schema may have changed)
269        let dir = MmapDirectory::open(&search_dir)
270            .with_context(|| format!("failed to open index dir: {}", search_dir.display()))?;
271        let index = Index::open_or_create(dir, is.schema.clone())?;
272        let mut writer: IndexWriter = index.writer(50_000_000)?;
273        writer.delete_all_documents()?;
274
275        let mut pages = 0usize;
276        let mut sections = 0usize;
277        let mut skipped = 0usize;
278
279        for entry in WalkDir::new(wiki_root).into_iter().filter_map(|e| e.ok()) {
280            let path = entry.path();
281            if !path.is_file() || path.extension().and_then(|e| e.to_str()) != Some("md") {
282                continue;
283            }
284
285            let content = match std::fs::read_to_string(path) {
286                Ok(c) => c,
287                Err(e) => {
288                    tracing::warn!(path = %path.display(), error = %e, "skipping unreadable file");
289                    skipped += 1;
290                    continue;
291                }
292            };
293
294            let slug = match Slug::from_path(path, wiki_root) {
295                Ok(s) => s,
296                Err(e) => {
297                    tracing::warn!(path = %path.display(), error = %e, "skipping invalid path");
298                    skipped += 1;
299                    continue;
300                }
301            };
302            let uri = format!("wiki://{}/{slug}", self.wiki_name);
303            let page = frontmatter::parse(&content);
304
305            writer.add_document(index_page(is, registry, slug.as_str(), &uri, &page))?;
306
307            if page.page_type() == Some("section") {
308                sections += 1;
309            }
310            pages += 1;
311        }
312
313        writer.commit()?;
314        self.reload_reader()?;
315
316        let commit = git::current_head(repo_root).unwrap_or_default();
317        let state = IndexState {
318            schema_hash: registry.schema_hash().to_string(),
319            built: Utc::now().to_rfc3339(),
320            pages,
321            sections,
322            commit,
323            types: registry.type_hashes().clone(),
324        };
325        std::fs::write(
326            self.index_path.join("state.toml"),
327            toml::to_string_pretty(&state)?,
328        )?;
329
330        Ok(IndexReport {
331            wiki: self.wiki_name.clone(),
332            pages_indexed: pages,
333            skipped,
334            duration_ms: start.elapsed().as_millis() as u64,
335        })
336    }
337
338    /// Incrementally update the index for files changed since `last_indexed_commit`.
339    pub fn update(
340        &self,
341        wiki_root: &Path,
342        repo_root: &Path,
343        last_indexed_commit: Option<&str>,
344        is: &IndexSchema,
345        registry: &SpaceTypeRegistry,
346    ) -> Result<UpdateReport> {
347        let changes = git::collect_changed_files(repo_root, wiki_root, last_indexed_commit)?;
348        if changes.is_empty() {
349            return Ok(UpdateReport::default());
350        }
351
352        let mut writer = self.writer()?;
353
354        let f_slug = is.field("slug");
355        let wiki_prefix = wiki_root
356            .strip_prefix(repo_root)
357            .unwrap_or(Path::new("wiki"));
358        let mut updated = 0;
359        let mut deleted = 0;
360
361        for (path, status) in &changes {
362            let slug = match Slug::from_path(path, wiki_prefix) {
363                Ok(s) => s,
364                Err(e) => {
365                    tracing::warn!(path = %path.display(), error = %e, "skipping invalid path in update");
366                    continue;
367                }
368            };
369
370            writer.delete_term(Term::from_field_text(f_slug, slug.as_str()));
371
372            if *status == Delta::Deleted {
373                deleted += 1;
374            } else {
375                let full_path = repo_root.join(path);
376                if let Ok(content) = std::fs::read_to_string(&full_path) {
377                    let page = frontmatter::parse(&content);
378                    let uri = format!("wiki://{}/{slug}", self.wiki_name);
379                    writer.add_document(index_page(is, registry, slug.as_str(), &uri, &page))?;
380                    updated += 1;
381                }
382            }
383        }
384
385        writer.commit()?;
386        self.reload_reader()?;
387        Ok(UpdateReport { updated, deleted })
388    }
389
390    /// Return the current index health status (staleness, page count, openability).
391    pub fn status(&self, repo_root: &Path) -> Result<IndexStatus> {
392        let state_path = self.index_path.join("state.toml");
393        let search_dir = self.index_path.join("search-index");
394
395        let (built, pages, sections, stale) = if state_path.exists() {
396            match std::fs::read_to_string(&state_path)
397                .ok()
398                .and_then(|c| toml::from_str::<IndexState>(&c).ok())
399            {
400                Some(state) => {
401                    let head = git::current_head(repo_root).unwrap_or_default();
402                    let (current_schema_hash, _) =
403                        crate::type_registry::compute_disk_hashes(repo_root).unwrap_or_default();
404                    let stale = state.commit != head || state.schema_hash != current_schema_hash;
405                    (Some(state.built), state.pages, state.sections, stale)
406                }
407                None => (None, 0, 0, true),
408            }
409        } else {
410            (None, 0, 0, true)
411        };
412
413        let (openable, queryable) = if search_dir.exists() {
414            let try_open = || -> std::result::Result<Index, Box<dyn std::error::Error>> {
415                let dir = MmapDirectory::open(&search_dir)?;
416                Ok(Index::open(dir)?)
417            };
418            match try_open() {
419                Ok(index) => {
420                    let queryable = index
421                        .reader_builder()
422                        .reload_policy(tantivy::ReloadPolicy::Manual)
423                        .try_into()
424                        .map(|r: IndexReader| {
425                            r.searcher()
426                                .search(&AllQuery, &TopDocs::with_limit(1).order_by_score())
427                                .is_ok()
428                        })
429                        .unwrap_or(false);
430                    (true, queryable)
431                }
432                Err(_) => (false, false),
433            }
434        } else {
435            (false, false)
436        };
437
438        Ok(IndexStatus {
439            wiki: self.wiki_name.clone(),
440            path: search_dir.to_string_lossy().into(),
441            built,
442            pages,
443            sections,
444            stale,
445            openable,
446            queryable,
447        })
448    }
449
450    /// Delete all index documents whose `type` field equals `type_name`.
451    pub fn delete_by_type(&self, is: &IndexSchema, type_name: &str) -> Result<()> {
452        let mut writer = self.writer()?;
453        let f_type = is.field("type");
454        writer.delete_term(Term::from_field_text(f_type, type_name));
455        writer.commit()?;
456        self.reload_reader()?;
457        Ok(())
458    }
459
460    /// Determine what kind of staleness exists.
461    pub fn staleness_kind(&self, repo_root: &Path) -> Result<StalenessKind> {
462        let state_path = self.index_path.join("state.toml");
463        let state = match std::fs::read_to_string(&state_path)
464            .ok()
465            .and_then(|c| toml::from_str::<IndexState>(&c).ok())
466        {
467            Some(s) => s,
468            None => return Ok(StalenessKind::FullRebuildNeeded),
469        };
470
471        let head = git::current_head(repo_root).unwrap_or_default();
472        let (current_schema_hash, current_types) =
473            crate::type_registry::compute_disk_hashes(repo_root).unwrap_or_default();
474
475        if state.commit == head && state.schema_hash == current_schema_hash {
476            return Ok(StalenessKind::Current);
477        }
478
479        if state.schema_hash == current_schema_hash {
480            return Ok(StalenessKind::CommitChanged);
481        }
482
483        // Schema hash differs — check per-type
484        let mut changed = Vec::new();
485        for (name, hash) in &state.types {
486            match current_types.get(name) {
487                Some(h) if h != hash => changed.push(name.clone()),
488                None => changed.push(name.clone()),
489                _ => {}
490            }
491        }
492        for name in current_types.keys() {
493            if !state.types.contains_key(name) {
494                changed.push(name.clone());
495            }
496        }
497
498        if changed.is_empty() {
499            Ok(StalenessKind::FullRebuildNeeded)
500        } else {
501            changed.sort();
502            Ok(StalenessKind::TypesChanged(changed))
503        }
504    }
505
506    /// Re-index only pages of the specified types.
507    pub fn rebuild_types(
508        &self,
509        types: &[String],
510        wiki_root: &Path,
511        repo_root: &Path,
512        is: &IndexSchema,
513        registry: &SpaceTypeRegistry,
514    ) -> Result<IndexReport> {
515        let start = std::time::Instant::now();
516        let mut writer = self.writer()?;
517        let f_type = is.field("type");
518
519        // Delete all documents of the changed types
520        for type_name in types {
521            writer.delete_term(Term::from_field_text(f_type, type_name));
522        }
523
524        // Re-index pages matching those types
525        let type_set: std::collections::HashSet<&str> = types.iter().map(|s| s.as_str()).collect();
526        let mut pages = 0usize;
527        let mut skipped = 0usize;
528
529        for entry in WalkDir::new(wiki_root).into_iter().filter_map(|e| e.ok()) {
530            let path = entry.path();
531            if !path.is_file() || path.extension().and_then(|e| e.to_str()) != Some("md") {
532                continue;
533            }
534            let content = match std::fs::read_to_string(path) {
535                Ok(c) => c,
536                Err(e) => {
537                    tracing::warn!(path = %path.display(), error = %e, "skipping unreadable file");
538                    skipped += 1;
539                    continue;
540                }
541            };
542            let page = frontmatter::parse(&content);
543            let page_type = page.page_type().unwrap_or("page");
544            if !type_set.contains(page_type) {
545                continue;
546            }
547            let slug = match Slug::from_path(path, wiki_root) {
548                Ok(s) => s,
549                Err(e) => {
550                    tracing::warn!(path = %path.display(), error = %e, "skipping invalid path");
551                    skipped += 1;
552                    continue;
553                }
554            };
555            let uri = format!("wiki://{}/{slug}", self.wiki_name);
556            writer.add_document(index_page(is, registry, slug.as_str(), &uri, &page))?;
557            pages += 1;
558        }
559
560        writer.commit()?;
561        self.reload_reader()?;
562
563        // Update state.toml
564        let commit = git::current_head(repo_root).unwrap_or_default();
565        let state = IndexState {
566            schema_hash: registry.schema_hash().to_string(),
567            built: Utc::now().to_rfc3339(),
568            pages: 0, // not accurate for partial, but state.toml is refreshed
569            sections: 0,
570            commit,
571            types: registry.type_hashes().clone(),
572        };
573        std::fs::write(
574            self.index_path.join("state.toml"),
575            toml::to_string_pretty(&state)?,
576        )?;
577
578        Ok(IndexReport {
579            wiki: self.wiki_name.clone(),
580            pages_indexed: pages,
581            skipped,
582            duration_ms: start.elapsed().as_millis() as u64,
583        })
584    }
585}
586
587// ── Document building (private) ───────────────────────────────────────────────
588
589fn index_page(
590    is: &IndexSchema,
591    registry: &SpaceTypeRegistry,
592    slug: &str,
593    uri: &str,
594    page: &frontmatter::ParsedPage,
595) -> tantivy::TantivyDocument {
596    let mut doc = tantivy::TantivyDocument::default();
597
598    doc.add_text(is.field("slug"), slug);
599    doc.add_text(is.field("uri"), uri);
600
601    // Write confidence as f64 FAST field using the dedicated getter
602    if let Some(conf_field) = is.try_field("confidence") {
603        let conf = frontmatter::confidence(&page.frontmatter) as f64;
604        doc.add_f64(conf_field, conf);
605    }
606
607    let resolved = resolve_fields(page, registry);
608    let mut extra_text = String::new();
609
610    for (canonical, value) in &resolved {
611        // confidence is already written above as a numeric field; skip text indexing
612        if canonical == "confidence" {
613            continue;
614        }
615        index_value(&mut doc, &mut extra_text, is, canonical, value);
616    }
617
618    if extra_text.is_empty() {
619        doc.add_text(is.field("body"), &page.body);
620    } else {
621        doc.add_text(
622            is.field("body"),
623            format!("{}\n{}", page.body, extra_text.trim()),
624        );
625    }
626
627    for link in links::extract_body_wikilinks(&page.body) {
628        doc.add_text(is.field("body_links"), &link);
629    }
630
631    doc
632}
633
634/// Resolve frontmatter fields through the type's alias map.
635///
636/// Two passes:
637/// 1. Index non-aliased fields under their own name
638/// 2. For aliased source fields, index under the canonical name
639///    only if the canonical wasn't already present
640fn resolve_fields<'a>(
641    page: &'a frontmatter::ParsedPage,
642    registry: &'a SpaceTypeRegistry,
643) -> Vec<(String, &'a serde_yaml::Value)> {
644    let page_type = page.page_type().unwrap_or("page");
645    let empty = std::collections::HashMap::new();
646    let aliases = registry.aliases(page_type).unwrap_or(&empty);
647
648    let mut result = Vec::new();
649    let mut indexed: std::collections::HashSet<String> = std::collections::HashSet::new();
650
651    // Pass 1: non-aliased fields
652    for (field_name, value) in &page.frontmatter {
653        if aliases.contains_key(field_name.as_str()) {
654            continue;
655        }
656        let canonical = field_name.to_string();
657        indexed.insert(canonical.clone());
658        result.push((canonical, value));
659    }
660
661    // Pass 2: aliased source fields whose canonical target was not present
662    for (source_field, canonical) in aliases {
663        if indexed.contains(canonical.as_str()) {
664            continue;
665        }
666        if let Some(value) = page.frontmatter.get(source_field.as_str()) {
667            indexed.insert(canonical.clone());
668            result.push((canonical.clone(), value));
669        }
670    }
671
672    result
673}
674
675fn index_value(
676    doc: &mut tantivy::TantivyDocument,
677    extra_text: &mut String,
678    is: &IndexSchema,
679    canonical: &str,
680    value: &serde_yaml::Value,
681) {
682    if let Some(field_handle) = is.try_field(canonical) {
683        if is.is_keyword(canonical) {
684            for s in yaml_to_strings(value) {
685                doc.add_text(field_handle, &s);
686            }
687        } else {
688            let text = yaml_to_text(value);
689            if !text.is_empty() {
690                doc.add_text(field_handle, &text);
691            }
692        }
693    } else {
694        let text = yaml_to_text(value);
695        if !text.is_empty() {
696            extra_text.push(' ');
697            extra_text.push_str(&text);
698        }
699    }
700}
701
702fn yaml_to_text(value: &serde_yaml::Value) -> String {
703    match value {
704        serde_yaml::Value::String(s) => s.clone(),
705        serde_yaml::Value::Bool(b) => b.to_string(),
706        serde_yaml::Value::Number(n) => n.to_string(),
707        serde_yaml::Value::Sequence(seq) => seq
708            .iter()
709            .filter_map(|v| match v {
710                serde_yaml::Value::String(s) => Some(s.as_str()),
711                _ => None,
712            })
713            .collect::<Vec<_>>()
714            .join(" "),
715        serde_yaml::Value::Mapping(_) => serde_json::to_string(value).unwrap_or_default(),
716        serde_yaml::Value::Null => String::new(),
717        _ => String::new(),
718    }
719}
720
721fn yaml_to_strings(value: &serde_yaml::Value) -> Vec<String> {
722    match value {
723        serde_yaml::Value::String(s) => vec![s.clone()],
724        serde_yaml::Value::Bool(b) => vec![b.to_string()],
725        serde_yaml::Value::Number(n) => vec![n.to_string()],
726        serde_yaml::Value::Sequence(seq) => seq
727            .iter()
728            .filter_map(|v| match v {
729                serde_yaml::Value::String(s) => Some(s.clone()),
730                _ => None,
731            })
732            .collect(),
733        serde_yaml::Value::Null => vec![],
734        _ => vec![yaml_to_text(value)],
735    }
736}