1use std::path::{Path, PathBuf};
2use std::sync::RwLock;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use anyhow::{Context, Result};
6use chrono::Utc;
7use git2::Delta;
8use serde::{Deserialize, Serialize};
9use tantivy::{
10 Index, IndexReader, IndexWriter, Searcher, Term, collector::TopDocs, directory::MmapDirectory,
11 query::AllQuery,
12};
13use walkdir::WalkDir;
14
15use crate::frontmatter;
16use crate::git;
17use crate::index_schema::IndexSchema;
18use crate::links;
19use crate::slug::Slug;
20use crate::type_registry::SpaceTypeRegistry;
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct IndexReport {
27 pub wiki: String,
29 pub pages_indexed: usize,
31 pub skipped: usize,
33 pub duration_ms: u64,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, Default)]
39pub struct UpdateReport {
40 pub updated: usize,
42 pub deleted: usize,
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct IndexStatus {
49 pub wiki: String,
51 pub path: String,
53 pub built: Option<String>,
55 pub pages: usize,
57 pub sections: usize,
59 pub stale: bool,
61 pub openable: bool,
63 pub queryable: bool,
65}
66
/// Classification of how far the persisted index state lags behind the repository.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StalenessKind {
    /// Recorded commit and schema hash both match the repository.
    Current,
    /// Schema hash matches, but the recorded commit differs from HEAD.
    CommitChanged,
    /// Schema hash differs; carries the sorted names of the types whose
    /// hash changed, appeared, or disappeared.
    TypesChanged(Vec<String>),
    /// No usable state, or the schema changed without any per-type difference
    /// being found; everything has to be re-indexed.
    FullRebuildNeeded,
}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct IndexState {
85 #[serde(default)]
87 pub schema_hash: String,
88 pub built: String,
90 pub pages: usize,
92 pub sections: usize,
94 pub commit: String,
96 #[serde(default)]
98 pub types: std::collections::HashMap<String, String>,
99}
100
101struct IndexInner {
104 tantivy_index: Option<Index>,
105 index_reader: Option<IndexReader>,
106 generation: AtomicU64,
107}
108
109pub struct SpaceIndexManager {
111 wiki_name: String,
112 index_path: PathBuf,
113 inner: RwLock<IndexInner>,
114}
115
116impl SpaceIndexManager {
117 pub fn new(wiki_name: impl Into<String>, index_path: impl Into<PathBuf>) -> Self {
119 Self {
120 wiki_name: wiki_name.into(),
121 index_path: index_path.into(),
122 inner: RwLock::new(IndexInner {
123 tantivy_index: None,
124 index_reader: None,
125 generation: AtomicU64::new(0),
126 }),
127 }
128 }
129
130 pub fn index_path(&self) -> &Path {
132 &self.index_path
133 }
134
135 pub fn wiki_name(&self) -> &str {
137 &self.wiki_name
138 }
139
140 pub fn generation(&self) -> u64 {
143 self.inner
144 .read()
145 .unwrap()
146 .generation
147 .load(Ordering::Acquire)
148 }
149
150 pub fn open(
154 &self,
155 is: &IndexSchema,
156 recovery: Option<(&Path, &Path, &SpaceTypeRegistry)>,
157 ) -> Result<()> {
158 let search_dir = self.index_path.join("search-index");
159
160 let try_open = || -> Result<Index> {
161 let dir = MmapDirectory::open(&search_dir)?;
162 Ok(Index::open(dir)?)
163 };
164
165 let index = match try_open() {
166 Ok(idx) => idx,
167 Err(e) => {
168 if let Some((wiki_root, repo_root, registry)) = recovery {
169 tracing::warn!(
170 wiki = %self.wiki_name,
171 error = %e,
172 "index corrupt, rebuilding",
173 );
174 if search_dir.exists() {
175 let _ = std::fs::remove_dir_all(&search_dir);
176 }
177 self.rebuild(wiki_root, repo_root, is, registry)?;
178 try_open().context("index still corrupt after rebuild")?
179 } else {
180 return Err(e);
181 }
182 }
183 };
184
185 let reader = index
186 .reader_builder()
187 .reload_policy(tantivy::ReloadPolicy::Manual)
188 .try_into()?;
189 let mut inner = self
190 .inner
191 .write()
192 .map_err(|_| anyhow::anyhow!("index lock poisoned"))?;
193 inner.tantivy_index = Some(index);
194 inner.index_reader = Some(reader);
195 Ok(())
196 }
197
198 pub fn searcher(&self) -> Result<Searcher> {
200 let inner = self
201 .inner
202 .read()
203 .map_err(|_| anyhow::anyhow!("index lock poisoned"))?;
204 inner
205 .index_reader
206 .as_ref()
207 .ok_or_else(|| anyhow::anyhow!("index not open"))
208 .map(|r| r.searcher())
209 }
210
211 fn reload_reader(&self) -> Result<()> {
214 let inner = self
215 .inner
216 .read()
217 .map_err(|_| anyhow::anyhow!("index lock poisoned"))?;
218 if let Some(ref r) = inner.index_reader {
219 r.reload()?;
220 }
221 inner.generation.fetch_add(1, Ordering::AcqRel);
222 Ok(())
223 }
224
225 fn writer(&self) -> Result<IndexWriter> {
227 let inner = self
228 .inner
229 .read()
230 .map_err(|_| anyhow::anyhow!("index lock poisoned"))?;
231 if let Some(ref idx) = inner.tantivy_index {
232 Ok(idx.writer(50_000_000)?)
233 } else {
234 drop(inner);
235 let search_dir = self.index_path.join("search-index");
236 let dir = MmapDirectory::open(&search_dir)
237 .with_context(|| format!("failed to open index dir: {}", search_dir.display()))?;
238 let index = Index::open(dir).context("failed to open index")?;
239 Ok(index.writer(50_000_000)?)
240 }
241 }
242
243 pub fn last_commit(&self) -> Option<String> {
245 let state_path = self.index_path.join("state.toml");
246 let content = std::fs::read_to_string(&state_path).ok()?;
247 let state: IndexState = toml::from_str(&content).ok()?;
248 if state.commit.is_empty() {
249 None
250 } else {
251 Some(state.commit)
252 }
253 }
254
255 pub fn rebuild(
257 &self,
258 wiki_root: &Path,
259 repo_root: &Path,
260 is: &IndexSchema,
261 registry: &SpaceTypeRegistry,
262 ) -> Result<IndexReport> {
263 let start = std::time::Instant::now();
264
265 let search_dir = self.index_path.join("search-index");
266 std::fs::create_dir_all(&search_dir)?;
267
268 let dir = MmapDirectory::open(&search_dir)
270 .with_context(|| format!("failed to open index dir: {}", search_dir.display()))?;
271 let index = Index::open_or_create(dir, is.schema.clone())?;
272 let mut writer: IndexWriter = index.writer(50_000_000)?;
273 writer.delete_all_documents()?;
274
275 let mut pages = 0usize;
276 let mut sections = 0usize;
277 let mut skipped = 0usize;
278
279 for entry in WalkDir::new(wiki_root).into_iter().filter_map(|e| e.ok()) {
280 let path = entry.path();
281 if !path.is_file() || path.extension().and_then(|e| e.to_str()) != Some("md") {
282 continue;
283 }
284
285 let content = match std::fs::read_to_string(path) {
286 Ok(c) => c,
287 Err(e) => {
288 tracing::warn!(path = %path.display(), error = %e, "skipping unreadable file");
289 skipped += 1;
290 continue;
291 }
292 };
293
294 let slug = match Slug::from_path(path, wiki_root) {
295 Ok(s) => s,
296 Err(e) => {
297 tracing::warn!(path = %path.display(), error = %e, "skipping invalid path");
298 skipped += 1;
299 continue;
300 }
301 };
302 let uri = format!("wiki://{}/{slug}", self.wiki_name);
303 let page = frontmatter::parse(&content);
304
305 writer.add_document(index_page(is, registry, slug.as_str(), &uri, &page))?;
306
307 if page.page_type() == Some("section") {
308 sections += 1;
309 }
310 pages += 1;
311 }
312
313 writer.commit()?;
314 self.reload_reader()?;
315
316 let commit = git::current_head(repo_root).unwrap_or_default();
317 let state = IndexState {
318 schema_hash: registry.schema_hash().to_string(),
319 built: Utc::now().to_rfc3339(),
320 pages,
321 sections,
322 commit,
323 types: registry.type_hashes().clone(),
324 };
325 std::fs::write(
326 self.index_path.join("state.toml"),
327 toml::to_string_pretty(&state)?,
328 )?;
329
330 Ok(IndexReport {
331 wiki: self.wiki_name.clone(),
332 pages_indexed: pages,
333 skipped,
334 duration_ms: start.elapsed().as_millis() as u64,
335 })
336 }
337
338 pub fn update(
340 &self,
341 wiki_root: &Path,
342 repo_root: &Path,
343 last_indexed_commit: Option<&str>,
344 is: &IndexSchema,
345 registry: &SpaceTypeRegistry,
346 ) -> Result<UpdateReport> {
347 let changes = git::collect_changed_files(repo_root, wiki_root, last_indexed_commit)?;
348 if changes.is_empty() {
349 return Ok(UpdateReport::default());
350 }
351
352 let mut writer = self.writer()?;
353
354 let f_slug = is.field("slug");
355 let wiki_prefix = wiki_root
356 .strip_prefix(repo_root)
357 .unwrap_or(Path::new("wiki"));
358 let mut updated = 0;
359 let mut deleted = 0;
360
361 for (path, status) in &changes {
362 let slug = match Slug::from_path(path, wiki_prefix) {
363 Ok(s) => s,
364 Err(e) => {
365 tracing::warn!(path = %path.display(), error = %e, "skipping invalid path in update");
366 continue;
367 }
368 };
369
370 writer.delete_term(Term::from_field_text(f_slug, slug.as_str()));
371
372 if *status == Delta::Deleted {
373 deleted += 1;
374 } else {
375 let full_path = repo_root.join(path);
376 if let Ok(content) = std::fs::read_to_string(&full_path) {
377 let page = frontmatter::parse(&content);
378 let uri = format!("wiki://{}/{slug}", self.wiki_name);
379 writer.add_document(index_page(is, registry, slug.as_str(), &uri, &page))?;
380 updated += 1;
381 }
382 }
383 }
384
385 writer.commit()?;
386 self.reload_reader()?;
387 Ok(UpdateReport { updated, deleted })
388 }
389
390 pub fn status(&self, repo_root: &Path) -> Result<IndexStatus> {
392 let state_path = self.index_path.join("state.toml");
393 let search_dir = self.index_path.join("search-index");
394
395 let (built, pages, sections, stale) = if state_path.exists() {
396 match std::fs::read_to_string(&state_path)
397 .ok()
398 .and_then(|c| toml::from_str::<IndexState>(&c).ok())
399 {
400 Some(state) => {
401 let head = git::current_head(repo_root).unwrap_or_default();
402 let (current_schema_hash, _) =
403 crate::type_registry::compute_disk_hashes(repo_root).unwrap_or_default();
404 let stale = state.commit != head || state.schema_hash != current_schema_hash;
405 (Some(state.built), state.pages, state.sections, stale)
406 }
407 None => (None, 0, 0, true),
408 }
409 } else {
410 (None, 0, 0, true)
411 };
412
413 let (openable, queryable) = if search_dir.exists() {
414 let try_open = || -> std::result::Result<Index, Box<dyn std::error::Error>> {
415 let dir = MmapDirectory::open(&search_dir)?;
416 Ok(Index::open(dir)?)
417 };
418 match try_open() {
419 Ok(index) => {
420 let queryable = index
421 .reader_builder()
422 .reload_policy(tantivy::ReloadPolicy::Manual)
423 .try_into()
424 .map(|r: IndexReader| {
425 r.searcher()
426 .search(&AllQuery, &TopDocs::with_limit(1).order_by_score())
427 .is_ok()
428 })
429 .unwrap_or(false);
430 (true, queryable)
431 }
432 Err(_) => (false, false),
433 }
434 } else {
435 (false, false)
436 };
437
438 Ok(IndexStatus {
439 wiki: self.wiki_name.clone(),
440 path: search_dir.to_string_lossy().into(),
441 built,
442 pages,
443 sections,
444 stale,
445 openable,
446 queryable,
447 })
448 }
449
450 pub fn delete_by_type(&self, is: &IndexSchema, type_name: &str) -> Result<()> {
452 let mut writer = self.writer()?;
453 let f_type = is.field("type");
454 writer.delete_term(Term::from_field_text(f_type, type_name));
455 writer.commit()?;
456 self.reload_reader()?;
457 Ok(())
458 }
459
460 pub fn staleness_kind(&self, repo_root: &Path) -> Result<StalenessKind> {
462 let state_path = self.index_path.join("state.toml");
463 let state = match std::fs::read_to_string(&state_path)
464 .ok()
465 .and_then(|c| toml::from_str::<IndexState>(&c).ok())
466 {
467 Some(s) => s,
468 None => return Ok(StalenessKind::FullRebuildNeeded),
469 };
470
471 let head = git::current_head(repo_root).unwrap_or_default();
472 let (current_schema_hash, current_types) =
473 crate::type_registry::compute_disk_hashes(repo_root).unwrap_or_default();
474
475 if state.commit == head && state.schema_hash == current_schema_hash {
476 return Ok(StalenessKind::Current);
477 }
478
479 if state.schema_hash == current_schema_hash {
480 return Ok(StalenessKind::CommitChanged);
481 }
482
483 let mut changed = Vec::new();
485 for (name, hash) in &state.types {
486 match current_types.get(name) {
487 Some(h) if h != hash => changed.push(name.clone()),
488 None => changed.push(name.clone()),
489 _ => {}
490 }
491 }
492 for name in current_types.keys() {
493 if !state.types.contains_key(name) {
494 changed.push(name.clone());
495 }
496 }
497
498 if changed.is_empty() {
499 Ok(StalenessKind::FullRebuildNeeded)
500 } else {
501 changed.sort();
502 Ok(StalenessKind::TypesChanged(changed))
503 }
504 }
505
506 pub fn rebuild_types(
508 &self,
509 types: &[String],
510 wiki_root: &Path,
511 repo_root: &Path,
512 is: &IndexSchema,
513 registry: &SpaceTypeRegistry,
514 ) -> Result<IndexReport> {
515 let start = std::time::Instant::now();
516 let mut writer = self.writer()?;
517 let f_type = is.field("type");
518
519 for type_name in types {
521 writer.delete_term(Term::from_field_text(f_type, type_name));
522 }
523
524 let type_set: std::collections::HashSet<&str> = types.iter().map(|s| s.as_str()).collect();
526 let mut pages = 0usize;
527 let mut skipped = 0usize;
528
529 for entry in WalkDir::new(wiki_root).into_iter().filter_map(|e| e.ok()) {
530 let path = entry.path();
531 if !path.is_file() || path.extension().and_then(|e| e.to_str()) != Some("md") {
532 continue;
533 }
534 let content = match std::fs::read_to_string(path) {
535 Ok(c) => c,
536 Err(e) => {
537 tracing::warn!(path = %path.display(), error = %e, "skipping unreadable file");
538 skipped += 1;
539 continue;
540 }
541 };
542 let page = frontmatter::parse(&content);
543 let page_type = page.page_type().unwrap_or("page");
544 if !type_set.contains(page_type) {
545 continue;
546 }
547 let slug = match Slug::from_path(path, wiki_root) {
548 Ok(s) => s,
549 Err(e) => {
550 tracing::warn!(path = %path.display(), error = %e, "skipping invalid path");
551 skipped += 1;
552 continue;
553 }
554 };
555 let uri = format!("wiki://{}/{slug}", self.wiki_name);
556 writer.add_document(index_page(is, registry, slug.as_str(), &uri, &page))?;
557 pages += 1;
558 }
559
560 writer.commit()?;
561 self.reload_reader()?;
562
563 let commit = git::current_head(repo_root).unwrap_or_default();
565 let state = IndexState {
566 schema_hash: registry.schema_hash().to_string(),
567 built: Utc::now().to_rfc3339(),
568 pages: 0, sections: 0,
570 commit,
571 types: registry.type_hashes().clone(),
572 };
573 std::fs::write(
574 self.index_path.join("state.toml"),
575 toml::to_string_pretty(&state)?,
576 )?;
577
578 Ok(IndexReport {
579 wiki: self.wiki_name.clone(),
580 pages_indexed: pages,
581 skipped,
582 duration_ms: start.elapsed().as_millis() as u64,
583 })
584 }
585}
586
587fn index_page(
590 is: &IndexSchema,
591 registry: &SpaceTypeRegistry,
592 slug: &str,
593 uri: &str,
594 page: &frontmatter::ParsedPage,
595) -> tantivy::TantivyDocument {
596 let mut doc = tantivy::TantivyDocument::default();
597
598 doc.add_text(is.field("slug"), slug);
599 doc.add_text(is.field("uri"), uri);
600
601 if let Some(conf_field) = is.try_field("confidence") {
603 let conf = frontmatter::confidence(&page.frontmatter) as f64;
604 doc.add_f64(conf_field, conf);
605 }
606
607 let resolved = resolve_fields(page, registry);
608 let mut extra_text = String::new();
609
610 for (canonical, value) in &resolved {
611 if canonical == "confidence" {
613 continue;
614 }
615 index_value(&mut doc, &mut extra_text, is, canonical, value);
616 }
617
618 if extra_text.is_empty() {
619 doc.add_text(is.field("body"), &page.body);
620 } else {
621 doc.add_text(
622 is.field("body"),
623 format!("{}\n{}", page.body, extra_text.trim()),
624 );
625 }
626
627 for link in links::extract_body_wikilinks(&page.body) {
628 doc.add_text(is.field("body_links"), &link);
629 }
630
631 doc
632}
633
634fn resolve_fields<'a>(
641 page: &'a frontmatter::ParsedPage,
642 registry: &'a SpaceTypeRegistry,
643) -> Vec<(String, &'a serde_yaml::Value)> {
644 let page_type = page.page_type().unwrap_or("page");
645 let empty = std::collections::HashMap::new();
646 let aliases = registry.aliases(page_type).unwrap_or(&empty);
647
648 let mut result = Vec::new();
649 let mut indexed: std::collections::HashSet<String> = std::collections::HashSet::new();
650
651 for (field_name, value) in &page.frontmatter {
653 if aliases.contains_key(field_name.as_str()) {
654 continue;
655 }
656 let canonical = field_name.to_string();
657 indexed.insert(canonical.clone());
658 result.push((canonical, value));
659 }
660
661 for (source_field, canonical) in aliases {
663 if indexed.contains(canonical.as_str()) {
664 continue;
665 }
666 if let Some(value) = page.frontmatter.get(source_field.as_str()) {
667 indexed.insert(canonical.clone());
668 result.push((canonical.clone(), value));
669 }
670 }
671
672 result
673}
674
675fn index_value(
676 doc: &mut tantivy::TantivyDocument,
677 extra_text: &mut String,
678 is: &IndexSchema,
679 canonical: &str,
680 value: &serde_yaml::Value,
681) {
682 if let Some(field_handle) = is.try_field(canonical) {
683 if is.is_keyword(canonical) {
684 for s in yaml_to_strings(value) {
685 doc.add_text(field_handle, &s);
686 }
687 } else {
688 let text = yaml_to_text(value);
689 if !text.is_empty() {
690 doc.add_text(field_handle, &text);
691 }
692 }
693 } else {
694 let text = yaml_to_text(value);
695 if !text.is_empty() {
696 extra_text.push(' ');
697 extra_text.push_str(&text);
698 }
699 }
700}
701
702fn yaml_to_text(value: &serde_yaml::Value) -> String {
703 match value {
704 serde_yaml::Value::String(s) => s.clone(),
705 serde_yaml::Value::Bool(b) => b.to_string(),
706 serde_yaml::Value::Number(n) => n.to_string(),
707 serde_yaml::Value::Sequence(seq) => seq
708 .iter()
709 .filter_map(|v| match v {
710 serde_yaml::Value::String(s) => Some(s.as_str()),
711 _ => None,
712 })
713 .collect::<Vec<_>>()
714 .join(" "),
715 serde_yaml::Value::Mapping(_) => serde_json::to_string(value).unwrap_or_default(),
716 serde_yaml::Value::Null => String::new(),
717 _ => String::new(),
718 }
719}
720
721fn yaml_to_strings(value: &serde_yaml::Value) -> Vec<String> {
722 match value {
723 serde_yaml::Value::String(s) => vec![s.clone()],
724 serde_yaml::Value::Bool(b) => vec![b.to_string()],
725 serde_yaml::Value::Number(n) => vec![n.to_string()],
726 serde_yaml::Value::Sequence(seq) => seq
727 .iter()
728 .filter_map(|v| match v {
729 serde_yaml::Value::String(s) => Some(s.clone()),
730 _ => None,
731 })
732 .collect(),
733 serde_yaml::Value::Null => vec![],
734 _ => vec![yaml_to_text(value)],
735 }
736}