1use std::collections::BTreeSet;
2use std::fs;
3use std::path::{Path, PathBuf};
4
5use crate::connectors::NormalizedConversation;
6use crate::connectors::NormalizedMessage;
7use crate::evidence_bundle::{
8 EVIDENCE_BUNDLE_MANIFEST_FILE, EvidenceBundleChunk, EvidenceBundleChunkRole,
9 EvidenceBundleKind, EvidenceBundleManifest,
10};
11use crate::model::conversation_packet::{
12 ConversationPacket, ConversationPacketMessage, ConversationPacketProvenance,
13};
14use crate::search::canonicalize::is_hard_message_noise;
15use crate::sources::provenance::LOCAL_SOURCE_ID;
16use anyhow::{Context, Error, Result};
17use frankensearch::lexical::{
18 CASS_SCHEMA_HASH, CASS_SCHEMA_VERSION, CassDocument as FsCassDocument,
19 CassDocumentRef as FsCassDocumentRef, CassFields as FsCassFields,
20 CassMergeStatus as FsCassMergeStatus, CassTantivyIndex as FsCassTantivyIndex, Index,
21 IndexReader, ReloadPolicy as FsReloadPolicy, Schema, cass_build_schema as fs_build_schema,
22 cass_ensure_tokenizer as fs_ensure_tokenizer, cass_fields_from_schema as fs_fields_from_schema,
23 cass_index_dir as fs_index_dir, cass_open_search_reader as fs_cass_open_search_reader,
24 cass_schema_hash_matches, tantivy_crate,
25};
26use serde::{Deserialize, Serialize};
27use std::time::SystemTime;
28
29pub(crate) fn normalized_index_source_id(
30 source_id: Option<&str>,
31 origin_kind: Option<&str>,
32 origin_host: Option<&str>,
33) -> String {
34 let trimmed_source_id = source_id.unwrap_or_default().trim();
35 if !trimmed_source_id.is_empty() {
36 if trimmed_source_id.eq_ignore_ascii_case(LOCAL_SOURCE_ID) {
37 return LOCAL_SOURCE_ID.to_string();
38 }
39 return trimmed_source_id.to_string();
40 }
41
42 let trimmed_origin_host = origin_host.map(str::trim).filter(|value| !value.is_empty());
43 let trimmed_origin_kind = origin_kind.unwrap_or_default().trim();
44 if trimmed_origin_kind.eq_ignore_ascii_case("ssh")
45 || trimmed_origin_kind.eq_ignore_ascii_case("remote")
46 {
47 return trimmed_origin_host.unwrap_or("remote").to_string();
48 }
49 if let Some(origin_host) = trimmed_origin_host {
50 return origin_host.to_string();
51 }
52
53 LOCAL_SOURCE_ID.to_string()
54}
55
56pub(crate) fn normalized_index_origin_kind(source_id: &str, origin_kind: Option<&str>) -> String {
57 if let Some(kind) = origin_kind.map(str::trim).filter(|value| !value.is_empty()) {
58 if kind.eq_ignore_ascii_case("local") {
59 return LOCAL_SOURCE_ID.to_string();
60 }
61 if kind.eq_ignore_ascii_case("ssh") || kind.eq_ignore_ascii_case("remote") {
62 return "remote".to_string();
63 }
64 return kind.to_ascii_lowercase();
65 }
66
67 if source_id == LOCAL_SOURCE_ID {
68 LOCAL_SOURCE_ID.to_string()
69 } else {
70 "remote".to_string()
71 }
72}
73
/// Returns the trimmed origin host, or `None` when absent or blank.
pub(crate) fn normalized_index_origin_host(origin_host: Option<&str>) -> Option<String> {
    let host = origin_host?.trim();
    if host.is_empty() {
        None
    } else {
        Some(host.to_owned())
    }
}
80
/// Hash of the cass lexical schema, re-exported from frankensearch's
/// `CASS_SCHEMA_HASH`.
pub const SCHEMA_HASH: &str = CASS_SCHEMA_HASH;
/// Env override for the pending-batch content budget read by
/// `tantivy_add_batch_max_chars` (only positive integers are honored).
const ENV_TANTIVY_ADD_BATCH_MAX_CHARS: &str = "CASS_TANTIVY_ADD_BATCH_MAX_CHARS";
/// Env override for the pending-batch message count; shared by
/// `tantivy_add_batch_max_messages` and `tantivy_prebuilt_add_batch_max_messages`.
const ENV_TANTIVY_ADD_BATCH_MAX_MESSAGES: &str = "CASS_TANTIVY_ADD_BATCH_MAX_MESSAGES";
/// Env override for the upper bound on Tantivy writer threads.
const ENV_TANTIVY_MAX_WRITER_THREADS: &str = "CASS_TANTIVY_MAX_WRITER_THREADS";
/// Env knob for staged shard builders. In this file it is read by
/// `default_tantivy_assumed_concurrent_writers` to size how many writers are
/// assumed to run at once.
const ENV_TANTIVY_REBUILD_STAGED_SHARD_BUILDERS: &str =
    "CASS_TANTIVY_REBUILD_STAGED_SHARD_BUILDERS";
/// Hard ceiling on the default writer-thread count, also used when host memory
/// is unknown.
const DEFAULT_TANTIVY_MAX_WRITER_THREADS_CEILING: usize = 26;
/// Number of concurrent writers assumed when no env override is set.
const DEFAULT_TANTIVY_ASSUMED_CONCURRENT_WRITERS: u64 = 8;
/// Writer heap budgeted per thread (128 MiB) when sizing the default.
const DEFAULT_TANTIVY_WRITER_HEAP_PER_THREAD_BYTES: u64 = 128 * 1024 * 1024;
/// Divisor applied to host memory: writer heaps are budgeted 1/10 of it.
const DEFAULT_TANTIVY_WRITER_HEAP_RAM_FRACTION: u64 = 10;
91
92fn positive_usize_env(name: &str) -> Option<usize> {
93 dotenvy::var(name)
94 .ok()
95 .and_then(|value| value.parse::<usize>().ok())
96 .filter(|value| *value > 0)
97}
98
/// Returns `MemAvailable` from `/proc/meminfo`, converted from KiB to bytes.
///
/// Only the first `MemAvailable:` line is considered. Returns `None` if the
/// file cannot be read, the line is missing or malformed, or the byte count
/// would overflow `u64`.
#[cfg(target_os = "linux")]
fn linux_available_memory_bytes() -> Option<u64> {
    let meminfo = std::fs::read_to_string("/proc/meminfo").ok()?;
    let tail = meminfo
        .lines()
        .find_map(|line| line.strip_prefix("MemAvailable:"))?;
    let kib: u64 = tail.split_whitespace().next()?.parse().ok()?;
    kib.checked_mul(1024)
}
110
111#[cfg(target_os = "linux")]
112fn host_memory_bytes_for_tantivy_default() -> Option<u64> {
113 linux_available_memory_bytes()
114}
115
116#[cfg(target_os = "macos")]
117fn host_memory_bytes_for_tantivy_default() -> Option<u64> {
118 let output = std::process::Command::new("sysctl")
119 .args(["-n", "hw.memsize"])
120 .output()
121 .ok()?;
122 if !output.status.success() {
123 return None;
124 }
125 let stdout = String::from_utf8(output.stdout).ok()?;
126 stdout.trim().parse::<u64>().ok()
127}
128
129#[cfg(not(any(target_os = "linux", target_os = "macos")))]
130fn host_memory_bytes_for_tantivy_default() -> Option<u64> {
131 None
132}
133
/// Test-only helper: sizes the default writer-thread count for `memory_bytes`.
///
/// It assumes the built-in `DEFAULT_TANTIVY_ASSUMED_CONCURRENT_WRITERS`
/// concurrent writers directly, so it ignores the staged-shard-builder env
/// override that `default_tantivy_assumed_concurrent_writers` would apply.
#[cfg(test)]
pub(crate) fn default_tantivy_max_writer_threads_for_memory_bytes(
    memory_bytes: Option<u64>,
) -> usize {
    default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
        memory_bytes,
        DEFAULT_TANTIVY_ASSUMED_CONCURRENT_WRITERS,
    )
}
143
144pub(crate) fn default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
145 memory_bytes: Option<u64>,
146 concurrent_writers: u64,
147) -> usize {
148 let Some(memory_bytes) = memory_bytes else {
149 return DEFAULT_TANTIVY_MAX_WRITER_THREADS_CEILING;
150 };
151 let per_thread_peak =
152 DEFAULT_TANTIVY_WRITER_HEAP_PER_THREAD_BYTES.saturating_mul(concurrent_writers.max(1));
153 if per_thread_peak == 0 {
154 return DEFAULT_TANTIVY_MAX_WRITER_THREADS_CEILING;
155 }
156 let budget = memory_bytes / DEFAULT_TANTIVY_WRITER_HEAP_RAM_FRACTION;
157 let threads = budget / per_thread_peak;
158 usize::try_from(threads)
159 .unwrap_or(usize::MAX)
160 .clamp(1, DEFAULT_TANTIVY_MAX_WRITER_THREADS_CEILING)
161}
162
163fn default_tantivy_assumed_concurrent_writers() -> u64 {
164 positive_usize_env(ENV_TANTIVY_REBUILD_STAGED_SHARD_BUILDERS)
165 .and_then(|value| u64::try_from(value).ok())
166 .unwrap_or(DEFAULT_TANTIVY_ASSUMED_CONCURRENT_WRITERS)
167 .max(1)
168}
169
170pub fn default_tantivy_max_writer_threads() -> usize {
171 default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
172 host_memory_bytes_for_tantivy_default(),
173 default_tantivy_assumed_concurrent_writers(),
174 )
175}
176
177pub(crate) fn tantivy_writer_parallelism_hint_for_available(available_parallelism: usize) -> usize {
178 let max_threads = positive_usize_env(ENV_TANTIVY_MAX_WRITER_THREADS)
179 .unwrap_or_else(default_tantivy_max_writer_threads);
180
181 available_parallelism.max(1).clamp(1, max_threads)
182}
183
184pub(crate) fn tantivy_writer_parallelism_hint_for_available_governed(
190 available_parallelism: usize,
191) -> usize {
192 let raw = tantivy_writer_parallelism_hint_for_available(available_parallelism);
193 crate::indexer::responsiveness::effective_worker_count(raw).max(1)
194}
195
196pub(crate) fn tantivy_writer_parallelism_hint() -> usize {
197 tantivy_writer_parallelism_hint_for_available_governed(
198 std::thread::available_parallelism()
199 .map(std::num::NonZeroUsize::get)
200 .unwrap_or(1),
201 )
202}
203
204fn tantivy_add_batch_max_messages() -> usize {
205 positive_usize_env(ENV_TANTIVY_ADD_BATCH_MAX_MESSAGES)
206 .unwrap_or_else(|| 4_096.max(tantivy_writer_parallelism_hint().saturating_mul(512)))
207}
208
209fn tantivy_add_batch_max_chars() -> usize {
210 positive_usize_env(ENV_TANTIVY_ADD_BATCH_MAX_CHARS).unwrap_or_else(|| {
211 (16 * 1024 * 1024).max(tantivy_writer_parallelism_hint().saturating_mul(2 * 1024 * 1024))
212 })
213}
214
215fn tantivy_prebuilt_add_batch_max_messages() -> usize {
216 positive_usize_env(ENV_TANTIVY_ADD_BATCH_MAX_MESSAGES)
217 .unwrap_or_else(|| 16_384.max(tantivy_writer_parallelism_hint().saturating_mul(512)))
218}
219
/// Wraps a frankensearch `SearchError` in an `anyhow::Error` so it can flow
/// through this module's `anyhow::Result` plumbing and accept `.with_context`.
fn map_fs_err(err: frankensearch::SearchError) -> Error {
    Error::new(err)
}
223
/// Per-conversation fields copied into every `FsCassDocument` built for that
/// conversation's messages.
#[derive(Clone)]
struct CassDocContext {
    /// Agent slug of the conversation.
    agent: String,
    /// Workspace path rendered as a (lossy) string, if any.
    workspace: Option<String>,
    /// `metadata.cass.workspace_original`, when present as a string.
    workspace_original: Option<String>,
    /// Source path of the conversation rendered as a (lossy) string.
    source_path: String,
    /// Conversation title, if any.
    title: Option<String>,
    /// Conversation start time, used when a message has no `created_at`.
    started_at_fallback: Option<i64>,
    /// Source id as normalized by `normalized_index_source_id`.
    source_id: String,
    /// Origin kind as normalized by `normalized_index_origin_kind`.
    origin_kind: String,
    /// Trimmed, non-blank origin host, if any.
    origin_host: Option<String>,
    /// Storage conversation id, when known.
    conversation_id: Option<i64>,
}
237
238fn cass_doc_context(conv: &NormalizedConversation, conversation_id: Option<i64>) -> CassDocContext {
239 let cass_origin = conv.metadata.get("cass").and_then(|c| c.get("origin"));
240 let raw_source_id = cass_origin
241 .and_then(|o| o.get("source_id"))
242 .and_then(|v| v.as_str());
243 let raw_origin_kind = cass_origin
244 .and_then(|o| o.get("kind"))
245 .and_then(|v| v.as_str());
246 let origin_host = normalized_index_origin_host(
247 cass_origin
248 .and_then(|o| o.get("host"))
249 .and_then(|v| v.as_str()),
250 );
251 let source_id =
252 normalized_index_source_id(raw_source_id, raw_origin_kind, origin_host.as_deref());
253 let origin_kind = normalized_index_origin_kind(&source_id, raw_origin_kind);
254
255 CassDocContext {
256 agent: conv.agent_slug.clone(),
257 workspace: conv
258 .workspace
259 .as_ref()
260 .map(|ws| ws.to_string_lossy().to_string()),
261 workspace_original: conv
262 .metadata
263 .get("cass")
264 .and_then(|c| c.get("workspace_original"))
265 .and_then(|v| v.as_str())
266 .map(ToOwned::to_owned),
267 source_path: conv.source_path.to_string_lossy().to_string(),
268 title: conv.title.clone(),
269 started_at_fallback: conv.started_at,
270 source_id,
271 origin_kind,
272 origin_host,
273 conversation_id,
274 }
275}
276
277fn cass_document_for_message(
278 context: &CassDocContext,
279 msg: &NormalizedMessage,
280) -> Option<FsCassDocument> {
281 if is_hard_message_noise(Some(msg.role.as_str()), &msg.content) {
282 return None;
283 }
284
285 Some(FsCassDocument {
286 agent: context.agent.clone(),
287 workspace: context.workspace.clone(),
288 workspace_original: context.workspace_original.clone(),
289 source_path: context.source_path.clone(),
290 msg_idx: msg.idx.max(0) as u64,
291 created_at: msg.created_at.or(context.started_at_fallback),
292 title: context.title.clone(),
293 content: msg.content.clone(),
294 conversation_id: context.conversation_id,
295 source_id: context.source_id.clone(),
296 origin_kind: context.origin_kind.clone(),
297 origin_host: context.origin_host.clone(),
298 })
299}
300
301fn push_cass_document_into_pending(
302 docs: &mut Vec<FsCassDocument>,
303 pending_chars: &mut usize,
304 doc: FsCassDocument,
305) {
306 *pending_chars = pending_chars.saturating_add(doc.content.len());
307 docs.push(doc);
308}
309
310fn cass_doc_context_from_packet(packet: &ConversationPacket) -> CassDocContext {
321 let payload = &packet.payload;
322 let metadata = &payload.metadata_json;
323 let cass_origin = metadata.get("cass").and_then(|c| c.get("origin"));
324 let raw_source_id = cass_origin
325 .and_then(|o| o.get("source_id"))
326 .and_then(|v| v.as_str());
327 let raw_origin_kind = cass_origin
328 .and_then(|o| o.get("kind"))
329 .and_then(|v| v.as_str());
330 let origin_host = normalized_index_origin_host(
331 cass_origin
332 .and_then(|o| o.get("host"))
333 .and_then(|v| v.as_str()),
334 );
335 let source_id =
336 normalized_index_source_id(raw_source_id, raw_origin_kind, origin_host.as_deref());
337 let origin_kind = normalized_index_origin_kind(&source_id, raw_origin_kind);
338
339 CassDocContext {
340 agent: payload.identity.agent_slug.clone(),
341 workspace: payload.identity.workspace.clone(),
342 workspace_original: metadata
343 .get("cass")
344 .and_then(|c| c.get("workspace_original"))
345 .and_then(|v| v.as_str())
346 .map(ToOwned::to_owned),
347 source_path: payload.identity.source_path.clone(),
348 title: payload.identity.title.clone(),
349 started_at_fallback: payload.timestamps.started_at,
350 source_id,
351 origin_kind,
352 origin_host,
353 conversation_id: payload.identity.conversation_id,
354 }
355}
356
357fn cass_document_for_packet_message(
360 context: &CassDocContext,
361 msg: &ConversationPacketMessage,
362) -> Option<FsCassDocument> {
363 if is_hard_message_noise(Some(msg.role.as_str()), &msg.content) {
364 return None;
365 }
366
367 Some(FsCassDocument {
368 agent: context.agent.clone(),
369 workspace: context.workspace.clone(),
370 workspace_original: context.workspace_original.clone(),
371 source_path: context.source_path.clone(),
372 msg_idx: msg.idx.max(0) as u64,
373 created_at: msg.created_at.or(context.started_at_fallback),
374 title: context.title.clone(),
375 content: msg.content.clone(),
376 conversation_id: context.conversation_id,
377 source_id: context.source_id.clone(),
378 origin_kind: context.origin_kind.clone(),
379 origin_host: context.origin_host.clone(),
380 })
381}
382
383#[allow(clippy::too_many_arguments)]
384fn push_packet_message_into_pending<F>(
385 inner: &mut FsCassTantivyIndex,
386 context: &CassDocContext,
387 msg: &ConversationPacketMessage,
388 docs: &mut Vec<FsCassDocument>,
389 pending_chars: &mut usize,
390 max_messages: usize,
391 max_chars: usize,
392 on_batch_flushed: &mut F,
393) -> Result<()>
394where
395 F: FnMut(usize) -> Result<()>,
396{
397 let Some(doc) = cass_document_for_packet_message(context, msg) else {
398 return Ok(());
399 };
400 push_cass_document_into_pending(docs, pending_chars, doc);
401 if docs.len() >= max_messages || *pending_chars >= max_chars {
402 let flushed_docs = docs.len();
403 inner
404 .add_cass_documents(docs.as_slice())
405 .map_err(map_fs_err)?;
406 on_batch_flushed(flushed_docs)?;
407 docs.clear();
408 *pending_chars = 0;
409 }
410 Ok(())
411}
412
/// Returns `true` when `stored` (presumably a schema hash read back from an
/// existing index; confirm at call sites) is accepted for the current cass
/// lexical schema. The comparison is delegated to frankensearch's
/// `cass_schema_hash_matches`.
pub fn schema_hash_matches(stored: &str) -> bool {
    cass_schema_hash_matches(stored)
}
417
/// Tantivy field handles for the cass schema (frankensearch `CassFields`).
pub type Fields = FsCassFields;
/// Segment-merge status type re-exported from frankensearch.
pub type MergeStatus = FsCassMergeStatus;

/// File name, at the index root, of the federated (sharded) lexical manifest.
const FEDERATED_SEARCH_MANIFEST_FILE: &str = "federated-search-manifest.json";
/// The only federated manifest `version` this build accepts.
const FEDERATED_SEARCH_MANIFEST_VERSION: u32 = 1;
/// Required value of the federated manifest `kind` field.
const FEDERATED_SEARCH_MANIFEST_KIND: &str = "cass-federated-lexical-index";
/// Root-level file excluded from evidence-bundle artifact listings
/// (see `is_evidence_bundle_writer_file`). Presumably the temporary file used
/// while the evidence manifest is saved; confirm against
/// `EvidenceBundleManifest::save`.
const EVIDENCE_BUNDLE_MANIFEST_TEMP_FILE: &str = "evidence-bundle-manifest.json.tmp";
425
/// Document and segment totals for a searchable lexical index, either a
/// single Tantivy index or the sum over a federated manifest's shards.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchableIndexSummary {
    /// Total document count.
    pub docs: usize,
    /// Total segment count.
    pub segments: usize,
}
431
/// On-disk JSON manifest describing a federated (sharded) lexical index.
///
/// Serde serializes fields in declaration order, so reordering them changes
/// the manifest bytes (and therefore any fingerprint taken over the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct FederatedSearchManifest {
    /// Must equal `FEDERATED_SEARCH_MANIFEST_VERSION`.
    version: u32,
    /// Must equal `FEDERATED_SEARCH_MANIFEST_KIND`.
    kind: String,
    /// Must equal `CASS_SCHEMA_HASH`.
    schema_hash: String,
    /// Shards making up the index; validation requires at least one.
    shards: Vec<FederatedSearchShardManifest>,
}

/// One shard entry in a federated manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct FederatedSearchShardManifest {
    /// Path relative to the index root; validated to name a directory under
    /// `shards/`.
    relative_path: String,
    /// Document count contributed to the summary.
    docs: usize,
    /// Segment count contributed to the summary.
    segments: usize,
    /// 64-char hex BLAKE3 digest of the shard's `meta.json`.
    meta_fingerprint: String,
}
447
448fn federated_search_manifest_path(index_path: &Path) -> PathBuf {
449 index_path.join(FEDERATED_SEARCH_MANIFEST_FILE)
450}
451
452fn write_root_schema_hash_file(index_path: &Path) -> Result<()> {
453 fs::write(
454 index_path.join("schema_hash.json"),
455 format!("{{\"schema_hash\":\"{CASS_SCHEMA_HASH}\"}}"),
456 )
457 .with_context(|| {
458 format!(
459 "writing cass schema hash metadata for searchable index {}",
460 index_path.display()
461 )
462 })?;
463 Ok(())
464}
465
/// Relative path of shard `shard_idx`, zero-padded to at least five digits
/// (e.g. `shards/shard-00007`).
fn manifest_relative_shard_path(shard_idx: usize) -> String {
    format!("shards/shard-{:05}", shard_idx)
}
469
470fn meta_fingerprint_for_existing_index_dir(index_path: &Path) -> Result<String> {
471 let meta_path = index_path.join("meta.json");
472 let bytes = fs::read(&meta_path)
473 .with_context(|| format!("reading Tantivy meta file {}", meta_path.display()))?;
474 Ok(blake3::hash(&bytes).to_hex().to_string())
475}
476
477fn validate_federated_shard_relative_path(relative_path: &str) -> Result<()> {
478 if relative_path.trim().is_empty() {
479 return Err(anyhow::anyhow!(
480 "federated lexical shard path must not be empty"
481 ));
482 }
483
484 let path = Path::new(relative_path);
485 let mut components = path.components();
486 match components.next() {
487 Some(std::path::Component::Normal(component))
488 if component == std::ffi::OsStr::new("shards") => {}
489 _ => {
490 return Err(anyhow::anyhow!(
491 "federated lexical shard path must stay under shards/: {}",
492 relative_path
493 ));
494 }
495 }
496
497 let mut has_child = false;
498 for component in components {
499 match component {
500 std::path::Component::Normal(_) => has_child = true,
501 _ => {
502 return Err(anyhow::anyhow!(
503 "federated lexical shard path must be a clean relative path: {}",
504 relative_path
505 ));
506 }
507 }
508 }
509
510 if !has_child {
511 return Err(anyhow::anyhow!(
512 "federated lexical shard path must name a shard directory under shards/: {}",
513 relative_path
514 ));
515 }
516
517 Ok(())
518}
519
520fn validate_federated_shard_meta_fingerprint(fingerprint: &str) -> Result<()> {
521 if fingerprint.len() != 64 || !fingerprint.bytes().all(|byte| byte.is_ascii_hexdigit()) {
522 return Err(anyhow::anyhow!(
523 "federated lexical shard meta fingerprint must be a 64-character hex BLAKE3 digest"
524 ));
525 }
526 Ok(())
527}
528
529fn federated_search_manifest_summary(
530 index_path: &Path,
531 manifest: &FederatedSearchManifest,
532) -> Result<SearchableIndexSummary> {
533 let mut docs = 0usize;
534 let mut segments = 0usize;
535 for shard in &manifest.shards {
536 docs = docs.checked_add(shard.docs).with_context(|| {
537 format!(
538 "federated search manifest doc count overflows platform usize: {}",
539 index_path.display()
540 )
541 })?;
542 segments = segments.checked_add(shard.segments).with_context(|| {
543 format!(
544 "federated search manifest segment count overflows platform usize: {}",
545 index_path.display()
546 )
547 })?;
548 }
549 Ok(SearchableIndexSummary { docs, segments })
550}
551
552fn validate_federated_search_manifest(
553 index_path: &Path,
554 manifest: &FederatedSearchManifest,
555 verify_shard_fingerprints: bool,
556) -> Result<()> {
557 if manifest.version != FEDERATED_SEARCH_MANIFEST_VERSION {
558 return Err(anyhow::anyhow!(
559 "unsupported federated search manifest version: expected {}, got {}",
560 FEDERATED_SEARCH_MANIFEST_VERSION,
561 manifest.version
562 ));
563 }
564 if manifest.kind != FEDERATED_SEARCH_MANIFEST_KIND {
565 return Err(anyhow::anyhow!(
566 "unexpected federated search manifest kind: expected {}, got {}",
567 FEDERATED_SEARCH_MANIFEST_KIND,
568 manifest.kind
569 ));
570 }
571 if manifest.schema_hash != CASS_SCHEMA_HASH {
572 return Err(anyhow::anyhow!(
573 "federated search manifest schema mismatch: expected {}, got {}",
574 CASS_SCHEMA_HASH,
575 manifest.schema_hash
576 ));
577 }
578 if manifest.shards.is_empty() {
579 return Err(anyhow::anyhow!(
580 "federated search manifest must contain at least one shard"
581 ));
582 }
583
584 let mut seen_relative_paths = BTreeSet::new();
585 for shard in &manifest.shards {
586 validate_federated_shard_relative_path(&shard.relative_path)?;
587 validate_federated_shard_meta_fingerprint(&shard.meta_fingerprint)?;
588 if !seen_relative_paths.insert(shard.relative_path.clone()) {
589 return Err(anyhow::anyhow!(
590 "federated search manifest contains duplicate shard path: {}",
591 shard.relative_path
592 ));
593 }
594
595 if verify_shard_fingerprints {
596 let shard_path = index_path.join(&shard.relative_path);
597 let actual = meta_fingerprint_for_existing_index_dir(&shard_path)?;
598 if actual != shard.meta_fingerprint {
599 return Err(anyhow::anyhow!(
600 "federated lexical shard fingerprint mismatch for {}: expected {}, got {}",
601 shard_path.display(),
602 shard.meta_fingerprint,
603 actual
604 ));
605 }
606 }
607 }
608
609 federated_search_manifest_summary(index_path, manifest)?;
610 Ok(())
611}
612
613fn federated_evidence_chunk_role(relative_path: &str) -> EvidenceBundleChunkRole {
614 if relative_path == FEDERATED_SEARCH_MANIFEST_FILE {
615 EvidenceBundleChunkRole::Manifest
616 } else if relative_path == "schema_hash.json"
617 || relative_path == "meta.json"
618 || relative_path.ends_with("/meta.json")
619 {
620 EvidenceBundleChunkRole::Metadata
621 } else if relative_path.starts_with("shards/") {
622 EvidenceBundleChunkRole::LexicalShard
623 } else {
624 EvidenceBundleChunkRole::Other
625 }
626}
627
628fn standard_lexical_evidence_chunk_role(relative_path: &str) -> EvidenceBundleChunkRole {
629 if relative_path == "schema_hash.json" || relative_path == "meta.json" {
630 EvidenceBundleChunkRole::Metadata
631 } else {
632 EvidenceBundleChunkRole::LexicalShard
633 }
634}
635
636fn current_schema_hash_file_matches(index_path: &Path) -> Result<()> {
637 let schema_hash_path = index_path.join("schema_hash.json");
638 let content = fs::read_to_string(&schema_hash_path).with_context(|| {
639 format!(
640 "reading cass schema hash metadata for lexical artifact {}",
641 schema_hash_path.display()
642 )
643 })?;
644 let value: serde_json::Value = serde_json::from_str(&content).with_context(|| {
645 format!(
646 "parsing cass schema hash metadata for lexical artifact {}",
647 schema_hash_path.display()
648 )
649 })?;
650 let stored_hash = value
651 .get("schema_hash")
652 .and_then(|value| value.as_str())
653 .ok_or_else(|| {
654 anyhow::anyhow!(
655 "lexical artifact schema hash metadata is missing schema_hash: {}",
656 schema_hash_path.display()
657 )
658 })?;
659 if stored_hash != CASS_SCHEMA_HASH {
660 return Err(anyhow::anyhow!(
661 "lexical artifact schema mismatch: expected {}, got {}",
662 CASS_SCHEMA_HASH,
663 stored_hash
664 ));
665 }
666 Ok(())
667}
668
669fn relative_artifact_path_string(relative_path: &Path) -> Result<String> {
670 let mut parts = Vec::new();
671 for component in relative_path.components() {
672 match component {
673 std::path::Component::Normal(part) => {
674 let part = part.to_str().ok_or_else(|| {
675 anyhow::anyhow!(
676 "lexical artifact path is not UTF-8: {}",
677 relative_path.display()
678 )
679 })?;
680 parts.push(part);
681 }
682 _ => {
683 return Err(anyhow::anyhow!(
684 "lexical artifact path contains an unsafe component: {}",
685 relative_path.display()
686 ));
687 }
688 }
689 }
690 if parts.is_empty() {
691 return Err(anyhow::anyhow!("lexical artifact path must not be empty"));
692 }
693 Ok(parts.join("/"))
694}
695
696fn is_evidence_bundle_writer_file(relative_path: &str) -> bool {
697 relative_path == EVIDENCE_BUNDLE_MANIFEST_FILE
698 || relative_path == EVIDENCE_BUNDLE_MANIFEST_TEMP_FILE
699}
700
701fn collect_federated_evidence_artifact_paths(
702 root: &Path,
703 current: &Path,
704 relative_paths: &mut Vec<String>,
705) -> Result<()> {
706 let entries = fs::read_dir(current)
707 .with_context(|| format!("reading artifact dir {}", current.display()))?;
708 for entry in entries {
709 let entry =
710 entry.with_context(|| format!("reading artifact entry in {}", current.display()))?;
711 let path = entry.path();
712 let file_type = entry
713 .file_type()
714 .with_context(|| format!("reading artifact file type {}", path.display()))?;
715 if file_type.is_dir() {
716 collect_federated_evidence_artifact_paths(root, &path, relative_paths)?;
717 } else if file_type.is_file() {
718 let relative_path = path.strip_prefix(root).with_context(|| {
719 format!(
720 "computing relative artifact path for {} under {}",
721 path.display(),
722 root.display()
723 )
724 })?;
725 let relative_path = relative_artifact_path_string(relative_path)?;
726 if !is_evidence_bundle_writer_file(&relative_path) {
727 relative_paths.push(relative_path);
728 }
729 } else {
730 return Err(anyhow::anyhow!(
731 "lexical artifact contains unsupported non-file entry: {}",
732 path.display()
733 ));
734 }
735 }
736 Ok(())
737}
738
739fn lexical_evidence_bundle_id(chunks: &[EvidenceBundleChunk]) -> Result<String> {
740 let mut hasher = blake3::Hasher::new();
741 hasher.update(b"cass-lexical-evidence-v1\n");
742 for chunk in chunks {
743 let bytes = serde_json::to_vec(chunk).context("serializing evidence bundle chunk")?;
744 hasher.update(&bytes);
745 hasher.update(b"\n");
746 }
747 Ok(format!("cass-lexical-{}", hasher.finalize().to_hex()))
748}
749
750fn lexical_search_evidence_bundle_manifest_with_roles(
751 index_path: &Path,
752 role_for_path: fn(&str) -> EvidenceBundleChunkRole,
753) -> Result<EvidenceBundleManifest> {
754 let mut relative_paths = Vec::new();
755 collect_federated_evidence_artifact_paths(index_path, index_path, &mut relative_paths)?;
756 relative_paths.sort();
757
758 let chunks = relative_paths
759 .into_iter()
760 .map(|relative_path| {
761 let role = role_for_path(&relative_path);
762 EvidenceBundleChunk::from_file(index_path, relative_path, role, true, None)
763 })
764 .collect::<Result<Vec<_>>>()?;
765 let bundle_id = lexical_evidence_bundle_id(&chunks)?;
766 let mut evidence =
767 EvidenceBundleManifest::new(bundle_id, EvidenceBundleKind::LexicalGeneration, 0);
768 evidence.chunks = chunks;
769 Ok(evidence)
770}
771
772pub fn lexical_search_evidence_bundle_manifest(
779 index_path: &Path,
780) -> Result<EvidenceBundleManifest> {
781 if let Some(manifest) = load_federated_search_manifest_internal(index_path)? {
782 validate_federated_search_manifest(index_path, &manifest, true)?;
783 return lexical_search_evidence_bundle_manifest_with_roles(
784 index_path,
785 federated_evidence_chunk_role,
786 );
787 }
788
789 current_schema_hash_file_matches(index_path)?;
790 searchable_index_summary(index_path)?.ok_or_else(|| {
791 anyhow::anyhow!(
792 "cannot build lexical evidence bundle because no searchable index exists in {}",
793 index_path.display()
794 )
795 })?;
796 lexical_search_evidence_bundle_manifest_with_roles(
797 index_path,
798 standard_lexical_evidence_chunk_role,
799 )
800}
801
802pub fn federated_search_evidence_bundle_manifest(
810 index_path: &Path,
811) -> Result<EvidenceBundleManifest> {
812 let Some(manifest) = load_federated_search_manifest_internal(index_path)? else {
813 return Err(anyhow::anyhow!(
814 "cannot build federated lexical evidence bundle without {} in {}",
815 FEDERATED_SEARCH_MANIFEST_FILE,
816 index_path.display()
817 ));
818 };
819 validate_federated_search_manifest(index_path, &manifest, true)?;
820 lexical_search_evidence_bundle_manifest_with_roles(index_path, federated_evidence_chunk_role)
821}
822
823pub fn write_federated_search_evidence_bundle_manifest(index_path: &Path) -> Result<PathBuf> {
824 let manifest = federated_search_evidence_bundle_manifest(index_path)?;
825 manifest.save(index_path)
826}
827
828pub fn write_lexical_search_evidence_bundle_manifest(index_path: &Path) -> Result<PathBuf> {
829 let manifest = lexical_search_evidence_bundle_manifest(index_path)?;
830 manifest.save(index_path)
831}
832
833fn load_federated_search_manifest_internal(
834 index_path: &Path,
835) -> Result<Option<FederatedSearchManifest>> {
836 let manifest_path = federated_search_manifest_path(index_path);
837 match fs::read(&manifest_path) {
838 Ok(bytes) => {
839 let manifest =
840 serde_json::from_slice::<FederatedSearchManifest>(&bytes).with_context(|| {
841 format!(
842 "parsing federated search manifest {}",
843 manifest_path.display()
844 )
845 })?;
846 validate_federated_search_manifest(index_path, &manifest, false).with_context(
847 || {
848 format!(
849 "validating federated search manifest {}",
850 manifest_path.display()
851 )
852 },
853 )?;
854 Ok(Some(manifest))
855 }
856 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
857 Err(err) => Err(err).with_context(|| {
858 format!(
859 "reading federated search manifest {}",
860 manifest_path.display()
861 )
862 }),
863 }
864}
865
866pub fn searchable_index_exists(index_path: &Path) -> bool {
867 index_path.join("meta.json").exists() || federated_search_manifest_path(index_path).exists()
868}
869
870pub fn validate_searchable_index_contract(index_path: &Path) -> Result<()> {
871 if let Some(manifest) = load_federated_search_manifest_internal(index_path)? {
872 validate_federated_search_manifest(index_path, &manifest, true)?;
873 for shard in manifest.shards {
874 let shard_path = index_path.join(&shard.relative_path);
875 fs_cass_open_search_reader(&shard_path, FsReloadPolicy::Manual)
876 .map_err(map_fs_err)
877 .with_context(|| {
878 format!(
879 "opening federated lexical shard reader {}",
880 shard_path.display()
881 )
882 })?;
883 }
884 return Ok(());
885 }
886
887 if !index_path.join("meta.json").exists() {
888 return Err(anyhow::anyhow!(
889 "standard lexical index metadata is missing in {}",
890 index_path.display()
891 ));
892 }
893 current_schema_hash_file_matches(index_path)?;
894 fs_cass_open_search_reader(index_path, FsReloadPolicy::Manual)
895 .map_err(map_fs_err)
896 .with_context(|| {
897 format!(
898 "opening standard lexical index reader {}",
899 index_path.display()
900 )
901 })?;
902 Ok(())
903}
904
905pub fn searchable_index_modified_time(index_path: &Path) -> Option<SystemTime> {
906 let meta_path = index_path.join("meta.json");
907 if meta_path.exists() {
908 return fs::metadata(meta_path).and_then(|m| m.modified()).ok();
909 }
910 fs::metadata(federated_search_manifest_path(index_path))
911 .and_then(|m| m.modified())
912 .ok()
913}
914
915pub fn searchable_index_fingerprint(index_path: &Path) -> Result<Option<String>> {
916 let meta_path = index_path.join("meta.json");
917 match fs::read(&meta_path) {
918 Ok(bytes) => Ok(Some(blake3::hash(&bytes).to_hex().to_string())),
919 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
920 let manifest_path = federated_search_manifest_path(index_path);
921 match fs::read(&manifest_path) {
922 Ok(bytes) => Ok(Some(blake3::hash(&bytes).to_hex().to_string())),
923 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
924 Err(err) => Err(err).with_context(|| {
925 format!(
926 "reading federated search manifest {}",
927 manifest_path.display()
928 )
929 }),
930 }
931 }
932 Err(err) => {
933 Err(err).with_context(|| format!("reading Tantivy meta file {}", meta_path.display()))
934 }
935 }
936}
937
938pub fn searchable_index_summary(index_path: &Path) -> Result<Option<SearchableIndexSummary>> {
939 if let Some(manifest) = load_federated_search_manifest_internal(index_path)? {
940 return federated_search_manifest_summary(index_path, &manifest).map(Some);
941 }
942
943 let meta_path = index_path.join("meta.json");
944 if !meta_path.exists() {
945 return Ok(None);
946 }
947
948 if let Some(summary) = searchable_index_summary_from_tantivy_meta(index_path)? {
949 return Ok(Some(summary));
950 }
951
952 let mut index = Index::open_in_dir(index_path).with_context(|| {
953 format!(
954 "opening searchable Tantivy index directory for summary: {}",
955 index_path.display()
956 )
957 })?;
958 ensure_tokenizer(&mut index);
959 let segment_metas = index
960 .searchable_segment_metas()
961 .context("reading searchable segment metadata for Tantivy summary")?;
962 Ok(Some(SearchableIndexSummary {
963 docs: segment_metas
964 .iter()
965 .map(|segment| segment.num_docs() as usize)
966 .sum(),
967 segments: segment_metas.len(),
968 }))
969}
970
971fn searchable_index_summary_from_tantivy_meta(
972 index_path: &Path,
973) -> Result<Option<SearchableIndexSummary>> {
974 let meta_path = index_path.join("meta.json");
975 let bytes = fs::read(&meta_path)
976 .with_context(|| format!("reading Tantivy meta file {}", meta_path.display()))?;
977 let meta: serde_json::Value = serde_json::from_slice(&bytes)
978 .with_context(|| format!("parsing Tantivy meta file {}", meta_path.display()))?;
979 let Some(segments) = meta.get("segments").and_then(|value| value.as_array()) else {
980 return Ok(None);
981 };
982
983 let mut docs = 0usize;
984 for segment in segments {
985 if segment
986 .get("deletes")
987 .is_some_and(|deletes| !deletes.is_null())
988 {
989 return Ok(None);
990 }
991 let Some(max_doc) = segment.get("max_doc").and_then(|value| value.as_u64()) else {
992 return Ok(None);
993 };
994 let max_doc = usize::try_from(max_doc).with_context(|| {
995 format!(
996 "Tantivy segment max_doc exceeds platform usize in {}",
997 meta_path.display()
998 )
999 })?;
1000 docs = docs.checked_add(max_doc).with_context(|| {
1001 format!(
1002 "Tantivy segment doc count overflows platform usize in {}",
1003 meta_path.display()
1004 )
1005 })?;
1006 }
1007
1008 Ok(Some(SearchableIndexSummary {
1009 docs,
1010 segments: segments.len(),
1011 }))
1012}
1013
1014pub fn open_federated_search_readers(
1015 index_path: &Path,
1016 reload_policy: FsReloadPolicy,
1017) -> Result<Option<Vec<(IndexReader, Fields)>>> {
1018 let Some(manifest) = load_federated_search_manifest_internal(index_path)? else {
1019 return Ok(None);
1020 };
1021 validate_federated_search_manifest(index_path, &manifest, true)?;
1022
1023 let readers = manifest
1024 .shards
1025 .into_iter()
1026 .map(|shard| {
1027 let shard_path = index_path.join(&shard.relative_path);
1028 fs_cass_open_search_reader(&shard_path, reload_policy)
1029 .map_err(map_fs_err)
1030 .with_context(|| {
1031 format!(
1032 "opening federated lexical shard reader {}",
1033 shard_path.display()
1034 )
1035 })
1036 })
1037 .collect::<Result<Vec<_>>>()?;
1038 Ok(Some(readers))
1039}
1040
1041fn materialize_federated_search_bundle_for_write(index_path: &Path) -> Result<()> {
1042 let Some(manifest) = load_federated_search_manifest_internal(index_path)? else {
1043 return Ok(());
1044 };
1045 validate_federated_search_manifest(index_path, &manifest, true)?;
1046
1047 let stage_parent = index_path.parent().unwrap_or(index_path);
1048 let materialize_root = tempfile::Builder::new()
1049 .prefix("cass-federated-materialize-")
1050 .tempdir_in(stage_parent)
1051 .with_context(|| {
1052 format!(
1053 "creating staging directory to materialize federated lexical bundle {}",
1054 index_path.display()
1055 )
1056 })?;
1057 let materialized_index_path = materialize_root.path().join("index");
1058 let shard_paths = manifest
1059 .shards
1060 .iter()
1061 .map(|shard| index_path.join(&shard.relative_path))
1062 .collect::<Vec<_>>();
1063
1064 TantivyIndex::assemble_compatible_index_directory_files(&materialized_index_path, &shard_paths)
1065 .with_context(|| {
1066 format!(
1067 "materializing federated lexical bundle into mutable Tantivy index {}",
1068 index_path.display()
1069 )
1070 })?;
1071
1072 if ensure_replaceable_federated_materialization_root(index_path)? {
1073 fs::remove_dir_all(index_path).with_context(|| {
1074 format!(
1075 "removing federated lexical bundle before mutable materialization {}",
1076 index_path.display()
1077 )
1078 })?;
1079 }
1080 fs::rename(&materialized_index_path, index_path).with_context(|| {
1081 format!(
1082 "publishing materialized mutable Tantivy index {} -> {}",
1083 materialized_index_path.display(),
1084 index_path.display()
1085 )
1086 })?;
1087 materialize_root
1088 .close()
1089 .context("closing federated lexical materialization staging directory")?;
1090 Ok(())
1091}
1092
1093fn ensure_replaceable_federated_materialization_root(index_path: &Path) -> Result<bool> {
1094 match fs::symlink_metadata(index_path) {
1095 Ok(metadata) => {
1096 let file_type = metadata.file_type();
1097 if file_type.is_symlink() {
1098 return Err(anyhow::anyhow!(
1099 "refusing to materialize federated lexical bundle through symlink: {}",
1100 index_path.display()
1101 ));
1102 }
1103 if !file_type.is_dir() {
1104 return Err(anyhow::anyhow!(
1105 "refusing to materialize federated lexical bundle because root is not a directory: {}",
1106 index_path.display()
1107 ));
1108 }
1109 Ok(true)
1110 }
1111 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(false),
1112 Err(err) => Err(err).with_context(|| {
1113 format!(
1114 "checking federated lexical bundle root before materialization: {}",
1115 index_path.display()
1116 )
1117 }),
1118 }
1119}
1120
1121pub fn publish_federated_searchable_index_directories<P: AsRef<Path>>(
1122 output_path: &Path,
1123 input_paths: &[P],
1124) -> Result<SearchableIndexSummary> {
1125 if input_paths.is_empty() {
1126 return Err(anyhow::anyhow!(
1127 "cannot publish federated lexical bundle without at least one input shard"
1128 ));
1129 }
1130 let mut input_summaries = Vec::with_capacity(input_paths.len());
1131 for input_path in input_paths {
1132 let input_path = input_path.as_ref();
1133 let summary = searchable_index_summary(input_path)?.ok_or_else(|| {
1134 anyhow::anyhow!(
1135 "federated lexical publish input is not a searchable index: {}",
1136 input_path.display()
1137 )
1138 })?;
1139 input_summaries.push((input_path.to_path_buf(), summary));
1140 }
1141 publish_federated_searchable_index_directories_with_summaries(output_path, &input_summaries)
1142}
1143
1144pub fn publish_federated_searchable_index_directories_with_summaries(
1145 output_path: &Path,
1146 input_shards: &[(PathBuf, SearchableIndexSummary)],
1147) -> Result<SearchableIndexSummary> {
1148 if input_shards.is_empty() {
1149 return Err(anyhow::anyhow!(
1150 "cannot publish federated lexical bundle without at least one input shard"
1151 ));
1152 }
1153 ensure_empty_merge_output_directory(output_path)?;
1154
1155 let shard_root = output_path.join("shards");
1156 fs::create_dir_all(&shard_root).with_context(|| {
1157 format!(
1158 "creating federated lexical shard root {}",
1159 shard_root.display()
1160 )
1161 })?;
1162
1163 let mut manifest = FederatedSearchManifest {
1164 version: FEDERATED_SEARCH_MANIFEST_VERSION,
1165 kind: FEDERATED_SEARCH_MANIFEST_KIND.to_string(),
1166 schema_hash: CASS_SCHEMA_HASH.to_string(),
1167 shards: Vec::with_capacity(input_shards.len()),
1168 };
1169 let mut total_docs = 0usize;
1170 let mut total_segments = 0usize;
1171
1172 for (shard_idx, (input_path, summary)) in input_shards.iter().enumerate() {
1173 if !searchable_index_exists(input_path) {
1174 return Err(anyhow::anyhow!(
1175 "federated lexical publish input is not a searchable index: {}",
1176 input_path.display()
1177 ));
1178 }
1179 let meta_fingerprint = meta_fingerprint_for_existing_index_dir(input_path)?;
1180 let relative_path = manifest_relative_shard_path(shard_idx);
1181 let destination_path = output_path.join(&relative_path);
1182 if let Some(parent) = destination_path.parent() {
1183 fs::create_dir_all(parent).with_context(|| {
1184 format!(
1185 "creating parent directory for federated lexical shard {}",
1186 destination_path.display()
1187 )
1188 })?;
1189 }
1190 fs::rename(input_path, &destination_path).with_context(|| {
1191 format!(
1192 "moving staged lexical shard {} into federated publish bundle {}",
1193 input_path.display(),
1194 destination_path.display()
1195 )
1196 })?;
1197
1198 total_docs = total_docs.checked_add(summary.docs).with_context(|| {
1199 format!(
1200 "federated lexical publish doc count overflows platform usize for {}",
1201 output_path.display()
1202 )
1203 })?;
1204 total_segments = total_segments
1205 .checked_add(summary.segments)
1206 .with_context(|| {
1207 format!(
1208 "federated lexical publish segment count overflows platform usize for {}",
1209 output_path.display()
1210 )
1211 })?;
1212 manifest.shards.push(FederatedSearchShardManifest {
1213 relative_path,
1214 docs: summary.docs,
1215 segments: summary.segments,
1216 meta_fingerprint,
1217 });
1218 }
1219
1220 let manifest_bytes =
1221 serde_json::to_vec_pretty(&manifest).context("serializing federated search manifest")?;
1222 fs::write(federated_search_manifest_path(output_path), &manifest_bytes).with_context(|| {
1223 format!(
1224 "writing federated search manifest {}",
1225 federated_search_manifest_path(output_path).display()
1226 )
1227 })?;
1228 write_root_schema_hash_file(output_path)?;
1229
1230 Ok(SearchableIndexSummary {
1231 docs: total_docs,
1232 segments: total_segments,
1233 })
1234}
1235
/// Writable handle to a cass lexical (Tantivy) index.
///
/// Wraps frankensearch's `CassTantivyIndex` and exposes the cass schema's
/// field handles next to it.
pub struct TantivyIndex {
    // The underlying frankensearch index; all writes and reads go through it.
    inner: FsCassTantivyIndex,
    /// Cass schema field handles, taken from `inner.fields()` when the index
    /// is opened.
    pub fields: Fields,
}
1240
1241impl TantivyIndex {
1242 pub fn open_or_create(path: &Path) -> Result<Self> {
1243 materialize_federated_search_bundle_for_write(path)?;
1244 let inner = FsCassTantivyIndex::open_or_create_with_writer_parallelism(
1245 path,
1246 tantivy_writer_parallelism_hint(),
1247 )
1248 .map_err(map_fs_err)?;
1249 let fields = inner.fields();
1250 Ok(Self { inner, fields })
1251 }
1252
1253 pub fn open_or_create_with_writer_parallelism(
1254 path: &Path,
1255 writer_parallelism: usize,
1256 ) -> Result<Self> {
1257 materialize_federated_search_bundle_for_write(path)?;
1258 let inner = FsCassTantivyIndex::open_or_create_with_writer_parallelism(
1259 path,
1260 writer_parallelism.max(1),
1261 )
1262 .map_err(map_fs_err)?;
1263 let fields = inner.fields();
1264 Ok(Self { inner, fields })
1265 }
1266
1267 pub fn add_conversation(&mut self, conv: &NormalizedConversation) -> Result<()> {
1268 let provenance = ConversationPacketProvenance::local();
1275 let packet = ConversationPacket::from_normalized_conversation(conv, provenance);
1276 self.add_messages_from_packet(&packet, None, None, |_| Ok(()))
1277 }
1278
1279 pub fn add_conversation_with_id(
1280 &mut self,
1281 conv: &NormalizedConversation,
1282 conversation_id: Option<i64>,
1283 ) -> Result<()> {
1284 let provenance = ConversationPacketProvenance::local();
1285 let mut packet = ConversationPacket::from_normalized_conversation(conv, provenance);
1286 packet.payload.identity.conversation_id = conversation_id;
1289 self.add_messages_from_packet(&packet, None, conversation_id, |_| Ok(()))
1290 }
1291
1292 pub fn delete_all(&mut self) -> Result<()> {
1293 self.inner.delete_all().map_err(map_fs_err)
1294 }
1295
1296 pub fn commit(&mut self) -> Result<()> {
1297 self.inner.commit().map_err(map_fs_err)
1298 }
1299
1300 pub fn configure_bulk_load_merge_policy(&mut self) {
1301 self.inner.configure_bulk_load_merge_policy();
1302 }
1303
1304 pub fn reader(&self) -> Result<IndexReader> {
1305 self.inner.reader().map_err(map_fs_err)
1306 }
1307
1308 pub fn segment_count(&self) -> usize {
1309 self.inner.segment_count()
1310 }
1311
1312 pub fn merge_status(&self) -> MergeStatus {
1313 self.inner.merge_status()
1314 }
1315
1316 pub fn optimize_if_idle(&mut self) -> Result<bool> {
1319 self.inner.optimize_if_idle().map_err(map_fs_err)
1320 }
1321
1322 pub fn force_merge(&mut self) -> Result<()> {
1325 self.inner.force_merge().map_err(map_fs_err)
1326 }
1327
1328 pub fn merge_compatible_index_directories<P: AsRef<Path>>(
1329 output_path: &Path,
1330 input_paths: &[P],
1331 ) -> Result<Self> {
1332 if input_paths.is_empty() {
1333 return Err(anyhow::anyhow!(
1334 "cannot merge Tantivy index directories without at least one input"
1335 ));
1336 }
1337 ensure_empty_merge_output_directory(output_path)?;
1338
1339 let indices = input_paths
1340 .iter()
1341 .map(|input_path| {
1342 let input_path = input_path.as_ref();
1343 let mut index = Index::open_in_dir(input_path).with_context(|| {
1344 format!(
1345 "opening compatible Tantivy index directory for merge: {}",
1346 input_path.display()
1347 )
1348 })?;
1349 ensure_tokenizer(&mut index);
1350 Ok(index)
1351 })
1352 .collect::<Result<Vec<_>>>()?;
1353 let output_directory = tantivy_crate::directory::MmapDirectory::open(output_path)
1354 .with_context(|| {
1355 format!(
1356 "opening Tantivy output directory for merged index: {}",
1357 output_path.display()
1358 )
1359 })?;
1360 let mut merged = tantivy_crate::indexer::merge_indices(&indices, output_directory)
1361 .with_context(|| {
1362 format!(
1363 "merging {} compatible Tantivy index directories into {}",
1364 indices.len(),
1365 output_path.display()
1366 )
1367 })?;
1368 ensure_tokenizer(&mut merged);
1369 fs::write(
1370 output_path.join("schema_hash.json"),
1371 format!("{{\"schema_hash\":\"{CASS_SCHEMA_HASH}\"}}"),
1372 )
1373 .with_context(|| {
1374 format!(
1375 "writing cass schema hash metadata for merged Tantivy index {}",
1376 output_path.display()
1377 )
1378 })?;
1379 drop(merged);
1380 Self::open_or_create(output_path)
1381 }
1382
1383 pub fn assemble_compatible_index_directories<P: AsRef<Path>>(
1384 output_path: &Path,
1385 input_paths: &[P],
1386 ) -> Result<Self> {
1387 Self::assemble_compatible_index_directory_files(output_path, input_paths)?;
1388 Self::open_or_create(output_path)
1389 }
1390
1391 fn assemble_compatible_index_directory_files<P: AsRef<Path>>(
1392 output_path: &Path,
1393 input_paths: &[P],
1394 ) -> Result<()> {
1395 if input_paths.is_empty() {
1396 return Err(anyhow::anyhow!(
1397 "cannot assemble Tantivy index directories without at least one input"
1398 ));
1399 }
1400 ensure_empty_merge_output_directory(output_path)?;
1401
1402 let mut combined_index_meta: Option<tantivy_crate::IndexMeta> = None;
1403 let mut combined_segments = Vec::new();
1404 let mut max_opstamp = 0u64;
1405 let mut managed_paths = BTreeSet::new();
1406
1407 for input_path in input_paths {
1408 let input_path = input_path.as_ref();
1409 let mut index = Index::open_in_dir(input_path).with_context(|| {
1410 format!(
1411 "opening compatible Tantivy index directory for assembly: {}",
1412 input_path.display()
1413 )
1414 })?;
1415 ensure_tokenizer(&mut index);
1416 let metas = index.load_metas().with_context(|| {
1417 format!(
1418 "loading Tantivy metadata for assembled index input {}",
1419 input_path.display()
1420 )
1421 })?;
1422
1423 match &mut combined_index_meta {
1424 Some(combined_meta) => {
1425 if metas.schema != combined_meta.schema {
1426 return Err(anyhow::anyhow!(
1427 "attempted to assemble Tantivy index directories with different schemas"
1428 ));
1429 }
1430 if metas.index_settings != combined_meta.index_settings {
1431 return Err(anyhow::anyhow!(
1432 "attempted to assemble Tantivy index directories with different index settings"
1433 ));
1434 }
1435 }
1436 None => {
1437 combined_index_meta = Some(tantivy_crate::IndexMeta {
1438 index_settings: metas.index_settings.clone(),
1439 segments: Vec::new(),
1440 schema: metas.schema.clone(),
1441 opstamp: 0,
1442 payload: None,
1443 });
1444 }
1445 }
1446
1447 max_opstamp = max_opstamp.max(metas.opstamp);
1448 for segment in metas.segments {
1449 for relative_path in segment.list_files() {
1450 let source_path = input_path.join(&relative_path);
1451 if !source_path.exists() {
1452 continue;
1453 }
1454 link_or_copy_searchable_index_file(&source_path, output_path, &relative_path)?;
1455 if !managed_paths.insert(relative_path.clone()) {
1456 return Err(anyhow::anyhow!(
1457 "assembled Tantivy index would contain duplicate segment file path {}",
1458 relative_path.display()
1459 ));
1460 }
1461 }
1462 combined_segments.push(segment);
1463 }
1464 }
1465
1466 let mut combined_index_meta = combined_index_meta.ok_or_else(|| {
1467 anyhow::anyhow!("cannot assemble Tantivy index directories without index metadata")
1468 })?;
1469 combined_index_meta.segments = combined_segments;
1470 combined_index_meta.opstamp = max_opstamp;
1471 combined_index_meta.payload = Some(format!(
1472 "Cass assembled {} compatible Tantivy segments from {} input directories",
1473 combined_index_meta.segments.len(),
1474 input_paths.len()
1475 ));
1476
1477 write_searchable_generation_metadata(
1478 output_path,
1479 &combined_index_meta,
1480 &mut managed_paths,
1481 )?;
1482 Ok(())
1483 }
1484
1485 pub fn add_messages(
1486 &mut self,
1487 conv: &NormalizedConversation,
1488 messages: &[NormalizedMessage],
1489 ) -> Result<()> {
1490 self.add_messages_with_conversation_id(conv, messages, None)
1491 }
1492
1493 pub fn add_messages_with_conversation_id(
1494 &mut self,
1495 conv: &NormalizedConversation,
1496 messages: &[NormalizedMessage],
1497 conversation_id: Option<i64>,
1498 ) -> Result<()> {
1499 self.add_messages_with_conversation_id_and_batch_hook(
1500 conv,
1501 messages,
1502 conversation_id,
1503 |_| Ok(()),
1504 )
1505 }
1506
1507 pub fn add_messages_with_conversation_id_and_batch_hook<F>(
1508 &mut self,
1509 conv: &NormalizedConversation,
1510 messages: &[NormalizedMessage],
1511 conversation_id: Option<i64>,
1512 mut on_batch_flushed: F,
1513 ) -> Result<()>
1514 where
1515 F: FnMut(usize) -> Result<()>,
1516 {
1517 let context = cass_doc_context(conv, conversation_id);
1518 let max_messages = tantivy_add_batch_max_messages();
1519 let max_chars = tantivy_add_batch_max_chars();
1520 let mut docs: Vec<FsCassDocument> = Vec::new();
1521 let mut pending_chars = 0usize;
1522
1523 for msg in messages {
1524 let Some(doc) = cass_document_for_message(&context, msg) else {
1525 continue;
1526 };
1527 push_cass_document_into_pending(&mut docs, &mut pending_chars, doc);
1528 if docs.len() >= max_messages || pending_chars >= max_chars {
1529 let flushed_docs = docs.len();
1530 self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
1531 on_batch_flushed(flushed_docs)?;
1532 docs.clear();
1533 pending_chars = 0;
1534 }
1535 }
1536
1537 if docs.is_empty() {
1538 Ok(())
1539 } else {
1540 let flushed_docs = docs.len();
1541 self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
1542 on_batch_flushed(flushed_docs)
1543 }
1544 }
1545
1546 pub fn add_messages_from_packet<F>(
1563 &mut self,
1564 packet: &ConversationPacket,
1565 message_indices: Option<&[usize]>,
1566 conversation_id_override: Option<i64>,
1567 mut on_batch_flushed: F,
1568 ) -> Result<()>
1569 where
1570 F: FnMut(usize) -> Result<()>,
1571 {
1572 let mut context = cass_doc_context_from_packet(packet);
1573 if let Some(id) = conversation_id_override {
1574 context.conversation_id = Some(id);
1575 }
1576
1577 let max_messages = tantivy_add_batch_max_messages();
1578 let max_chars = tantivy_add_batch_max_chars();
1579 let mut docs: Vec<FsCassDocument> = Vec::new();
1580 let mut pending_chars = 0usize;
1581
1582 let messages = &packet.payload.messages;
1583 let total = messages.len();
1584 if let Some(indices) = message_indices {
1585 for &i in indices {
1586 let Some(msg) = messages.get(i) else {
1587 anyhow::bail!(
1588 "packet message index {} out of range for packet with {} messages",
1589 i,
1590 total
1591 );
1592 };
1593 push_packet_message_into_pending(
1594 &mut self.inner,
1595 &context,
1596 msg,
1597 &mut docs,
1598 &mut pending_chars,
1599 max_messages,
1600 max_chars,
1601 &mut on_batch_flushed,
1602 )?;
1603 }
1604 } else {
1605 for msg in messages {
1606 push_packet_message_into_pending(
1607 &mut self.inner,
1608 &context,
1609 msg,
1610 &mut docs,
1611 &mut pending_chars,
1612 max_messages,
1613 max_chars,
1614 &mut on_batch_flushed,
1615 )?;
1616 }
1617 }
1618
1619 if docs.is_empty() {
1620 Ok(())
1621 } else {
1622 let flushed_docs = docs.len();
1623 self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
1624 on_batch_flushed(flushed_docs)
1625 }
1626 }
1627
1628 pub fn add_prebuilt_documents_slice(&mut self, documents: &[FsCassDocument]) -> Result<usize> {
1629 let max_messages = tantivy_prebuilt_add_batch_max_messages();
1630 let max_chars = tantivy_add_batch_max_chars();
1631 let mut indexed_docs = 0usize;
1632 let mut batch_start = 0usize;
1633 let mut pending_chars = 0usize;
1634
1635 for (idx, doc) in documents.iter().enumerate() {
1636 pending_chars = pending_chars.saturating_add(doc.content.len());
1637 let batch_len = idx + 1 - batch_start;
1638 if batch_len >= max_messages || pending_chars >= max_chars {
1639 let batch_end = idx + 1;
1640 indexed_docs = indexed_docs.saturating_add(batch_end - batch_start);
1641 let Some(batch) = documents.get(batch_start..batch_end) else {
1642 anyhow::bail!(
1643 "invalid Tantivy prebuilt document batch range {}..{} for {} documents",
1644 batch_start,
1645 batch_end,
1646 documents.len()
1647 );
1648 };
1649 self.inner.add_cass_documents(batch).map_err(map_fs_err)?;
1650 batch_start = batch_end;
1651 pending_chars = 0;
1652 }
1653 }
1654
1655 if batch_start < documents.len() {
1656 indexed_docs = indexed_docs.saturating_add(documents.len() - batch_start);
1657 let Some(batch) = documents.get(batch_start..) else {
1658 anyhow::bail!(
1659 "invalid Tantivy prebuilt document tail range {}.. for {} documents",
1660 batch_start,
1661 documents.len()
1662 );
1663 };
1664 self.inner.add_cass_documents(batch).map_err(map_fs_err)?;
1665 }
1666
1667 Ok(indexed_docs)
1668 }
1669
1670 pub fn add_prebuilt_document_refs_slice<'a>(
1671 &mut self,
1672 documents: &[FsCassDocumentRef<'a>],
1673 ) -> Result<usize> {
1674 let max_messages = tantivy_prebuilt_add_batch_max_messages();
1675 let max_chars = tantivy_add_batch_max_chars();
1676 let mut indexed_docs = 0usize;
1677 let mut batch_start = 0usize;
1678 let mut pending_chars = 0usize;
1679
1680 for (idx, doc) in documents.iter().enumerate() {
1681 pending_chars = pending_chars.saturating_add(doc.content.len());
1682 let batch_len = idx + 1 - batch_start;
1683 if batch_len >= max_messages || pending_chars >= max_chars {
1684 let batch_end = idx + 1;
1685 indexed_docs = indexed_docs.saturating_add(batch_end - batch_start);
1686 let Some(batch) = documents.get(batch_start..batch_end) else {
1687 anyhow::bail!(
1688 "invalid Tantivy prebuilt document ref batch range {}..{} for {} documents",
1689 batch_start,
1690 batch_end,
1691 documents.len()
1692 );
1693 };
1694 self.inner
1695 .add_cass_document_refs(batch)
1696 .map_err(map_fs_err)?;
1697 batch_start = batch_end;
1698 pending_chars = 0;
1699 }
1700 }
1701
1702 if batch_start < documents.len() {
1703 indexed_docs = indexed_docs.saturating_add(documents.len() - batch_start);
1704 let Some(batch) = documents.get(batch_start..) else {
1705 anyhow::bail!(
1706 "invalid Tantivy prebuilt document ref tail range {}.. for {} documents",
1707 batch_start,
1708 documents.len()
1709 );
1710 };
1711 self.inner
1712 .add_cass_document_refs(batch)
1713 .map_err(map_fs_err)?;
1714 }
1715
1716 Ok(indexed_docs)
1717 }
1718
1719 pub fn add_prebuilt_documents<I>(&mut self, documents: I) -> Result<usize>
1720 where
1721 I: IntoIterator<Item = FsCassDocument>,
1722 {
1723 let max_messages = tantivy_prebuilt_add_batch_max_messages();
1724 let max_chars = tantivy_add_batch_max_chars();
1725 let mut docs = Vec::new();
1726 let mut pending_chars = 0usize;
1727 let mut indexed_docs = 0usize;
1728
1729 for doc in documents {
1730 pending_chars = pending_chars.saturating_add(doc.content.len());
1731 docs.push(doc);
1732 if docs.len() >= max_messages || pending_chars >= max_chars {
1733 indexed_docs = indexed_docs.saturating_add(docs.len());
1734 self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
1735 docs.clear();
1736 pending_chars = 0;
1737 }
1738 }
1739
1740 if !docs.is_empty() {
1741 indexed_docs = indexed_docs.saturating_add(docs.len());
1742 self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
1743 }
1744
1745 Ok(indexed_docs)
1746 }
1747
1748 pub fn add_conversations_with_ids<'a, I>(&mut self, conversations: I) -> Result<usize>
1749 where
1750 I: IntoIterator<Item = (&'a NormalizedConversation, Option<i64>)>,
1751 {
1752 let max_messages = tantivy_add_batch_max_messages();
1753 let max_chars = tantivy_add_batch_max_chars();
1754 let mut docs: Vec<FsCassDocument> = Vec::new();
1755 let mut pending_chars = 0usize;
1756 let mut indexed_docs = 0usize;
1757
1758 for (conv, conversation_id) in conversations {
1759 let context = cass_doc_context(conv, conversation_id);
1760 for msg in &conv.messages {
1761 let Some(doc) = cass_document_for_message(&context, msg) else {
1762 continue;
1763 };
1764 push_cass_document_into_pending(&mut docs, &mut pending_chars, doc);
1765 if docs.len() >= max_messages || pending_chars >= max_chars {
1766 indexed_docs = indexed_docs.saturating_add(docs.len());
1767 self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
1768 docs.clear();
1769 pending_chars = 0;
1770 }
1771 }
1772 }
1773
1774 if !docs.is_empty() {
1775 indexed_docs = indexed_docs.saturating_add(docs.len());
1776 self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
1777 }
1778
1779 Ok(indexed_docs)
1780 }
1781}
1782
/// Returns the cass Tantivy schema (delegates to frankensearch's
/// `cass_build_schema`).
pub fn build_schema() -> Schema {
    fs_build_schema()
}
1786
/// Resolves the cass field handles from `schema` (delegates to
/// frankensearch's `cass_fields_from_schema`, mapping its error via
/// `map_fs_err`).
pub fn fields_from_schema(schema: &Schema) -> Result<Fields> {
    fs_fields_from_schema(schema).map_err(map_fs_err)
}
1790
1791pub fn expected_index_dir(base: &Path) -> std::path::PathBuf {
1792 base.join("index").join(CASS_SCHEMA_VERSION)
1793}
1794
/// Resolves the versioned index directory under `base` via frankensearch's
/// `cass_index_dir`, mapping its error via `map_fs_err`.
///
/// NOTE(review): whether the directory is created on disk is decided by
/// `cass_index_dir`; the test in this file only checks the returned path
/// ends with `index/v8`.
pub fn index_dir(base: &Path) -> Result<std::path::PathBuf> {
    fs_index_dir(base).map_err(map_fs_err)
}
1798
/// Registers the cass tokenizer on `index` (delegates to frankensearch's
/// `cass_ensure_tokenizer`). Called on every index opened in this module
/// before it is used.
pub fn ensure_tokenizer(index: &mut Index) {
    fs_ensure_tokenizer(index);
}
1802
1803fn ensure_empty_merge_output_directory(output_path: &Path) -> Result<()> {
1804 match fs::metadata(output_path) {
1805 Ok(metadata) => {
1806 if !metadata.is_dir() {
1807 return Err(anyhow::anyhow!(
1808 "merged Tantivy output path is not a directory: {}",
1809 output_path.display()
1810 ));
1811 }
1812 let mut entries = fs::read_dir(output_path).with_context(|| {
1813 format!(
1814 "reading merged Tantivy output directory before merge: {}",
1815 output_path.display()
1816 )
1817 })?;
1818 if entries.next().transpose()?.is_some() {
1819 return Err(anyhow::anyhow!(
1820 "merged Tantivy output directory must be empty before merge: {}",
1821 output_path.display()
1822 ));
1823 }
1824 }
1825 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
1826 fs::create_dir_all(output_path).with_context(|| {
1827 format!(
1828 "creating merged Tantivy output directory before merge: {}",
1829 output_path.display()
1830 )
1831 })?;
1832 }
1833 Err(err) => {
1834 return Err(err).with_context(|| {
1835 format!(
1836 "checking merged Tantivy output directory before merge: {}",
1837 output_path.display()
1838 )
1839 });
1840 }
1841 }
1842 Ok(())
1843}
1844
1845fn link_or_copy_searchable_index_file(
1846 source_path: &Path,
1847 output_path: &Path,
1848 relative_path: &Path,
1849) -> Result<()> {
1850 let destination_path = output_path.join(relative_path);
1851 if destination_path.exists() {
1852 return Err(anyhow::anyhow!(
1853 "assembled Tantivy output path already exists: {}",
1854 destination_path.display()
1855 ));
1856 }
1857
1858 match fs::hard_link(source_path, &destination_path) {
1859 Ok(()) => Ok(()),
1860 Err(err)
1861 if matches!(
1862 err.kind(),
1863 std::io::ErrorKind::PermissionDenied
1864 | std::io::ErrorKind::CrossesDevices
1865 | std::io::ErrorKind::Unsupported
1866 ) =>
1867 {
1868 fs::copy(source_path, &destination_path).with_context(|| {
1869 format!(
1870 "copying Tantivy segment file into assembled generation {} -> {}",
1871 source_path.display(),
1872 destination_path.display()
1873 )
1874 })?;
1875 Ok(())
1876 }
1877 Err(err) => Err(err).with_context(|| {
1878 format!(
1879 "hard-linking Tantivy segment file into assembled generation {} -> {}",
1880 source_path.display(),
1881 destination_path.display()
1882 )
1883 }),
1884 }
1885}
1886
1887fn write_searchable_generation_metadata(
1888 output_path: &Path,
1889 index_meta: &tantivy_crate::IndexMeta,
1890 managed_paths: &mut BTreeSet<std::path::PathBuf>,
1891) -> Result<()> {
1892 let meta_path = output_path.join("meta.json");
1893 fs::write(
1894 &meta_path,
1895 serde_json::to_vec_pretty(index_meta).context("serializing assembled Tantivy meta.json")?,
1896 )
1897 .with_context(|| {
1898 format!(
1899 "writing assembled Tantivy meta.json for {}",
1900 output_path.display()
1901 )
1902 })?;
1903 managed_paths.insert(std::path::PathBuf::from("meta.json"));
1904 fs::write(
1905 output_path.join(".managed.json"),
1906 serde_json::to_vec(managed_paths).context("serializing assembled Tantivy managed paths")?,
1907 )
1908 .with_context(|| {
1909 format!(
1910 "writing assembled Tantivy managed file manifest for {}",
1911 output_path.display()
1912 )
1913 })?;
1914 fs::write(
1915 output_path.join("schema_hash.json"),
1916 format!("{{\"schema_hash\":\"{CASS_SCHEMA_HASH}\"}}"),
1917 )
1918 .with_context(|| {
1919 format!(
1920 "writing cass schema hash metadata for assembled Tantivy index {}",
1921 output_path.display()
1922 )
1923 })?;
1924 Ok(())
1925}
1926
1927#[cfg(test)]
1928mod tests {
1929 use super::*;
1930 use crate::connectors::{NormalizedConversation, NormalizedMessage};
1931 use serde_json::Value;
1932 use std::path::PathBuf;
1933 use tempfile::TempDir;
1934
1935 #[test]
1936 fn open_or_create_roundtrip() {
1937 let dir = TempDir::new().expect("temp dir");
1938 let idx = TantivyIndex::open_or_create(dir.path()).expect("create index");
1939 let reader = idx.reader().expect("reader");
1940 let searcher = reader.searcher();
1941 assert_eq!(searcher.num_docs(), 0);
1942 }
1943
1944 #[test]
1945 fn schema_hash_matches_current_hash() {
1946 assert!(schema_hash_matches(SCHEMA_HASH));
1947 assert!(!schema_hash_matches("invalid"));
1948 }
1949
1950 #[test]
1951 fn default_tantivy_writer_cap_scales_with_memory() -> Result<(), &'static str> {
1952 const GIB: u64 = 1024 * 1024 * 1024;
1953
1954 require_default_tantivy_writer_cap(None, 26, "unknown memory uses ceiling")?;
1955 require_default_tantivy_writer_cap(Some(8 * GIB), 1, "8 GiB caps at one thread")?;
1956 require_default_tantivy_writer_cap(Some(24 * GIB), 2, "24 GiB caps at two threads")?;
1957 require_default_tantivy_writer_cap(Some(64 * GIB), 6, "64 GiB allows six threads")?;
1958 require_default_tantivy_writer_cap(Some(512 * GIB), 26, "512 GiB reaches ceiling")?;
1959 Ok(())
1960 }
1961
1962 #[test]
1963 fn default_tantivy_writer_cap_accounts_for_actual_concurrent_writers() {
1964 const GIB: u64 = 1024 * 1024 * 1024;
1965
1966 assert_eq!(
1967 default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
1968 Some(24 * GIB),
1969 8,
1970 ),
1971 2
1972 );
1973 assert_eq!(
1974 default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
1975 Some(24 * GIB),
1976 2,
1977 ),
1978 9
1979 );
1980 assert_eq!(
1981 default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
1982 Some(24 * GIB),
1983 0,
1984 ),
1985 19
1986 );
1987 }
1988
1989 fn require_default_tantivy_writer_cap(
1990 memory_bytes: Option<u64>,
1991 expected: usize,
1992 label: &'static str,
1993 ) -> Result<(), &'static str> {
1994 if default_tantivy_max_writer_threads_for_memory_bytes(memory_bytes) == expected {
1995 return Ok(());
1996 }
1997 Err(label)
1998 }
1999
2000 #[test]
2001 fn generate_edge_ngrams_prefixes() {
2002 let out = frankensearch::lexical::cass_generate_edge_ngrams("hello world");
2003 assert!(out.contains("he"));
2004 assert!(out.contains("world"));
2005 }
2006
2007 #[test]
2008 fn build_preview_truncates_with_ellipsis() {
2009 let preview = frankensearch::lexical::cass_build_preview("abcdefghijklmnopqrstuvwxyz", 10);
2010 assert_eq!(preview, "abcdefghij…");
2011 }
2012
2013 #[test]
2014 fn merge_status_api_is_exposed() {
2015 let dir = TempDir::new().expect("temp dir");
2016 let index = TantivyIndex::open_or_create(dir.path()).expect("create");
2017 let status = index.merge_status();
2018 assert_eq!(status.merge_threshold, 4);
2019 }
2020
2021 #[test]
2022 fn searchable_index_summary_uses_meta_file_without_opening_index() {
2023 let dir = TempDir::new().expect("temp dir");
2024 fs::write(
2025 dir.path().join("meta.json"),
2026 serde_json::to_vec(&serde_json::json!({
2027 "segments": [
2028 {"segment_id": "a", "max_doc": 3, "deletes": null},
2029 {"segment_id": "b", "max_doc": 5}
2030 ]
2031 }))
2032 .expect("serialize meta"),
2033 )
2034 .expect("write meta");
2035
2036 assert_eq!(
2037 searchable_index_summary(dir.path())
2038 .expect("summary")
2039 .expect("index exists"),
2040 SearchableIndexSummary {
2041 docs: 8,
2042 segments: 2
2043 }
2044 );
2045 }
2046
2047 #[test]
2048 fn searchable_index_summary_meta_fast_path_declines_deleted_segments() {
2049 let dir = TempDir::new().expect("temp dir");
2050 fs::write(
2051 dir.path().join("meta.json"),
2052 serde_json::to_vec(&serde_json::json!({
2053 "segments": [
2054 {"segment_id": "a", "max_doc": 3, "deletes": {"opstamp": 1}}
2055 ]
2056 }))
2057 .expect("serialize meta"),
2058 )
2059 .expect("write meta");
2060
2061 assert_eq!(
2062 searchable_index_summary_from_tantivy_meta(dir.path()).expect("summary"),
2063 None
2064 );
2065 }
2066
2067 #[test]
2068 fn merge_status_should_merge_logic() {
2069 let status = MergeStatus {
2070 segment_count: 5,
2071 last_merge_ts: 0,
2072 ms_since_last_merge: -1,
2073 merge_threshold: 4,
2074 cooldown_ms: 300_000,
2075 };
2076 assert!(status.should_merge());
2077 }
2078
2079 #[test]
2080 fn index_dir_creates_versioned_path() {
2081 let dir = TempDir::new().expect("temp dir");
2082 let result = index_dir(dir.path()).expect("index dir");
2083 assert!(result.ends_with("index/v8"));
2086 }
2087
/// Smoke test: registering the tokenizer on an in-memory index and opening an on-disk
/// index both complete without panicking.
#[test]
fn tokenizer_registration_is_callable() {
    let mut in_memory = Index::create_in_ram(build_schema());
    ensure_tokenizer(&mut in_memory);

    let scratch = TempDir::new().expect("temp dir");
    let _ = TantivyIndex::open_or_create(scratch.path()).expect("open or create");
}
2095
/// Indexes 1,200 messages (about 4 KiB of content each) from a single conversation via
/// `add_messages`. After commit, the reader must see exactly one doc per message.
#[test]
fn add_messages_batches_large_payloads_without_dropping_docs() {
    let scratch = TempDir::new().expect("temp dir");
    let mut index = TantivyIndex::open_or_create(scratch.path()).expect("create index");
    let filler = "x".repeat(4096);

    let mut messages = Vec::new();
    for n in 0..1_200 {
        messages.push(NormalizedMessage {
            idx: n,
            role: "assistant".to_string(),
            author: None,
            created_at: Some(1_700_000_000_000 + n),
            content: format!("{n}-{filler}"),
            extra: Value::Null,
            snippets: Vec::new(),
            invocations: Vec::new(),
        });
    }

    let conversation = NormalizedConversation {
        agent_slug: "codex".to_string(),
        external_id: Some("large-batch".to_string()),
        title: Some("Large Batch".to_string()),
        workspace: Some(PathBuf::from("/tmp/workspace")),
        source_path: PathBuf::from("/tmp/rollout.jsonl"),
        started_at: Some(1_700_000_000_000),
        ended_at: Some(1_700_000_000_999),
        metadata: Value::Null,
        messages,
    };

    index
        .add_messages(&conversation, &conversation.messages)
        .expect("add messages");
    index.commit().expect("commit");

    let reader = index.reader().expect("reader");
    reader.reload().expect("reload");
    let expected = conversation.messages.len() as u64;
    assert_eq!(reader.searcher().num_docs(), expected);
}
2134
/// Streams 24 conversations of 256 messages each (about 2 KiB of content per message)
/// through `add_conversations_with_ids`. Both the returned count and the committed doc
/// count must equal the total number of input messages.
#[test]
fn add_conversations_with_ids_streams_large_payloads_without_dropping_docs() {
    let scratch = TempDir::new().expect("temp dir");
    let mut index = TantivyIndex::open_or_create(scratch.path()).expect("create index");
    let filler = "y".repeat(2048);

    let mut conversations = Vec::new();
    for conv_no in 0..24 {
        let messages = (0..256)
            .map(|msg_no| NormalizedMessage {
                idx: msg_no,
                role: "assistant".to_string(),
                author: None,
                created_at: Some(1_700_000_000_000 + (conv_no * 1_000 + msg_no)),
                content: format!("conv-{conv_no}-msg-{msg_no}-{filler}"),
                extra: Value::Null,
                snippets: Vec::new(),
                invocations: Vec::new(),
            })
            .collect();
        conversations.push(NormalizedConversation {
            agent_slug: "codex".to_string(),
            external_id: Some(format!("conv-{conv_no}")),
            title: Some(format!("Conversation {conv_no}")),
            workspace: Some(PathBuf::from("/tmp/workspace")),
            source_path: PathBuf::from(format!("/tmp/rollout-{conv_no}.jsonl")),
            started_at: Some(1_700_000_000_000 + conv_no),
            ended_at: Some(1_700_000_000_999 + conv_no),
            metadata: Value::Null,
            messages,
        });
    }
    let expected_docs = conversations
        .iter()
        .fold(0usize, |total, conv| total + conv.messages.len());

    let indexed = index
        .add_conversations_with_ids(conversations.iter().map(|conv| (conv, Some(42))))
        .expect("add conversations");
    assert_eq!(indexed, expected_docs);
    index.commit().expect("commit");

    let reader = index.reader().expect("reader");
    reader.reload().expect("reload");
    assert_eq!(reader.searcher().num_docs(), expected_docs as u64);
}
2180
/// The inputs are a whitespace-only source id, no origin kind, and an origin host. The
/// normalized source id should be the host, and that id should classify as a `"remote"`
/// origin kind.
#[test]
fn normalized_index_source_id_infers_remote_from_origin_host_without_kind() {
    let inferred = normalized_index_source_id(Some(" "), None, Some("dev@laptop"));
    assert_eq!(inferred, "dev@laptop");

    let kind = normalized_index_origin_kind(&inferred, None);
    assert_eq!(kind, "remote");
}
2187
/// Feeds 6,144 prebuilt `FsCassDocument`s (about 2 KiB of content each) directly into
/// `add_prebuilt_documents`. Both the returned count and the committed doc count must equal
/// the number of documents supplied.
#[test]
fn add_prebuilt_documents_streams_large_payloads_without_dropping_docs() {
    let scratch = TempDir::new().expect("temp dir");
    let mut index = TantivyIndex::open_or_create(scratch.path()).expect("create index");
    let filler = "z".repeat(2048);
    let local = crate::sources::provenance::LOCAL_SOURCE_ID;

    let mut docs = Vec::with_capacity(6_144);
    for n in 0..6_144 {
        docs.push(FsCassDocument {
            agent: "codex".to_string(),
            workspace: Some("/tmp/workspace".to_string()),
            workspace_original: None,
            source_path: "/tmp/prebuilt-rollout.jsonl".to_string(),
            msg_idx: n as u64,
            created_at: Some(1_700_000_000_000 + n as i64),
            title: Some("Prebuilt Batch".to_string()),
            content: format!("prebuilt-msg-{n}-{filler}"),
            conversation_id: Some(7),
            source_id: local.to_string(),
            origin_kind: local.to_string(),
            origin_host: None,
        });
    }
    let expected_docs = docs.len();

    let indexed = index
        .add_prebuilt_documents(docs)
        .expect("add prebuilt docs");
    assert_eq!(indexed, expected_docs);
    index.commit().expect("commit");

    let reader = index.reader().expect("reader");
    reader.reload().expect("reload");
    assert_eq!(reader.searcher().num_docs(), expected_docs as u64);
}
2220
/// Builds two shards holding one two-message conversation each, then merges them into a
/// fresh output directory. The result must be a single segment holding all four docs.
#[test]
fn merge_compatible_index_directories_roundtrips_docs_into_single_segment() {
    let root = TempDir::new().expect("temp dir");
    let shard_a = root.path().join("shard-a");
    let shard_b = root.path().join("shard-b");
    let merged = root.path().join("merged");

    let mut writer_a = TantivyIndex::open_or_create(&shard_a).expect("create shard a");
    let mut writer_b = TantivyIndex::open_or_create(&shard_b).expect("create shard b");

    // Builds one message with no author, extras, snippets, or invocations.
    let message = |idx, role: &str, created_at, content: String| NormalizedMessage {
        idx,
        role: role.to_string(),
        author: None,
        created_at: Some(created_at),
        content,
        extra: Value::Null,
        snippets: Vec::new(),
        invocations: Vec::new(),
    };
    // Builds a user/assistant pair whose contents are `{body}-a` and `{body}-b`.
    let conversation = |external_id: &str, title: &str, body: &str| NormalizedConversation {
        agent_slug: "codex".to_string(),
        external_id: Some(external_id.to_string()),
        title: Some(title.to_string()),
        workspace: Some(PathBuf::from("/tmp/workspace")),
        source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
        started_at: Some(1_700_000_000_000),
        ended_at: Some(1_700_000_000_100),
        metadata: Value::Null,
        messages: vec![
            message(0, "user", 1_700_000_000_010, format!("{body}-a")),
            message(1, "assistant", 1_700_000_000_020, format!("{body}-b")),
        ],
    };

    let conv_a = conversation("merge-a", "Merge A", "alpha");
    let conv_b = conversation("merge-b", "Merge B", "beta");
    writer_a
        .add_conversation_with_id(&conv_a, Some(10))
        .expect("index shard a");
    writer_b
        .add_conversation_with_id(&conv_b, Some(20))
        .expect("index shard b");
    writer_a.commit().expect("commit shard a");
    writer_b.commit().expect("commit shard b");
    // Release both writers before the merge reads the shard directories.
    drop(writer_a);
    drop(writer_b);

    let merged_index =
        TantivyIndex::merge_compatible_index_directories(&merged, &[&shard_a, &shard_b])
            .expect("merge shard indices");
    assert_eq!(
        merged_index.segment_count(),
        1,
        "merged shard indices should collapse into a single searchable segment"
    );

    let reader = merged_index.reader().expect("reader");
    reader.reload().expect("reload");
    assert_eq!(reader.searcher().num_docs(), 4);
}
2289
/// Merging into an output directory that already contains a file must fail with a
/// "must be empty" error. The file already in that directory must survive unchanged.
#[test]
fn merge_compatible_index_directories_rejects_non_empty_output_directory() {
    let root = TempDir::new().expect("temp dir");
    let shard = root.path().join("shard");
    let merged = root.path().join("merged");
    let sentinel = merged.join("sentinel.txt");
    fs::create_dir_all(&merged).expect("create merged dir");
    fs::write(&sentinel, "occupied").expect("write sentinel");

    let mut shard_index = TantivyIndex::open_or_create(&shard).expect("create shard");
    let conv = NormalizedConversation {
        agent_slug: "codex".to_string(),
        external_id: Some("merge-occupied".to_string()),
        title: Some("Occupied".to_string()),
        workspace: Some(PathBuf::from("/tmp/workspace")),
        source_path: PathBuf::from("/tmp/merge-occupied.jsonl"),
        started_at: Some(1_700_000_000_000),
        ended_at: Some(1_700_000_000_100),
        metadata: Value::Null,
        messages: vec![NormalizedMessage {
            idx: 0,
            role: "assistant".to_string(),
            author: None,
            created_at: Some(1_700_000_000_010),
            content: "occupied".to_string(),
            extra: Value::Null,
            snippets: Vec::new(),
            invocations: Vec::new(),
        }],
    };
    shard_index
        .add_conversation_with_id(&conv, Some(1))
        .expect("index shard");
    shard_index.commit().expect("commit shard");
    drop(shard_index);

    let result = TantivyIndex::merge_compatible_index_directories(&merged, &[&shard]);
    assert!(
        result.is_err(),
        "non-empty merge output dir should be rejected"
    );
    // `.err().expect(..)` rather than `expect_err`, which would require `TantivyIndex: Debug`.
    let error = result.err().expect("merge result should contain an error");
    assert!(
        format!("{error:#}").contains("must be empty"),
        "unexpected error: {error:#}"
    );

    // A rejected merge must not clobber what was already in the output directory.
    assert_eq!(
        fs::read_to_string(&sentinel).expect("sentinel should survive a rejected merge"),
        "occupied",
        "rejected merge must leave existing output contents untouched"
    );
}
2336
/// Assembles two single-segment shards (one two-message conversation each) into a new
/// directory. All four docs must be searchable, one segment per input shard must be kept,
/// and a `.managed.json` file must exist in the assembled directory.
#[test]
fn assemble_compatible_index_directories_roundtrips_docs_into_multi_segment_generation() {
    let root = TempDir::new().expect("temp dir");
    let shard_a = root.path().join("shard-a");
    let shard_b = root.path().join("shard-b");
    let assembled = root.path().join("assembled");

    let mut writer_a = TantivyIndex::open_or_create(&shard_a).expect("create shard a");
    let mut writer_b = TantivyIndex::open_or_create(&shard_b).expect("create shard b");

    // Builds one message with no author, extras, snippets, or invocations.
    let message = |idx, role: &str, created_at, content: String| NormalizedMessage {
        idx,
        role: role.to_string(),
        author: None,
        created_at: Some(created_at),
        content,
        extra: Value::Null,
        snippets: Vec::new(),
        invocations: Vec::new(),
    };
    // Builds a user/assistant pair whose contents are `{body}-a` and `{body}-b`.
    let conversation = |external_id: &str, title: &str, body: &str| NormalizedConversation {
        agent_slug: "codex".to_string(),
        external_id: Some(external_id.to_string()),
        title: Some(title.to_string()),
        workspace: Some(PathBuf::from("/tmp/workspace")),
        source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
        started_at: Some(1_700_000_001_000),
        ended_at: Some(1_700_000_001_100),
        metadata: Value::Null,
        messages: vec![
            message(0, "user", 1_700_000_001_010, format!("{body}-a")),
            message(1, "assistant", 1_700_000_001_020, format!("{body}-b")),
        ],
    };

    let conv_a = conversation("assemble-a", "Assemble A", "alpha");
    let conv_b = conversation("assemble-b", "Assemble B", "beta");
    writer_a
        .add_conversation_with_id(&conv_a, Some(10))
        .expect("index shard a");
    writer_b
        .add_conversation_with_id(&conv_b, Some(20))
        .expect("index shard b");
    writer_a.commit().expect("commit shard a");
    writer_b.commit().expect("commit shard b");
    drop(writer_a);
    drop(writer_b);

    let assembled_index =
        TantivyIndex::assemble_compatible_index_directories(&assembled, &[&shard_a, &shard_b])
            .expect("assemble shard indices");

    let reader = assembled_index.reader().expect("reader");
    reader.reload().expect("reload");
    assert_eq!(reader.searcher().num_docs(), 4);

    let segments = assembled_index.segment_count();
    assert_eq!(
        segments,
        2,
        "assembled shard indices should preserve one searchable segment per input artifact"
    );

    let managed_manifest = assembled.join(".managed.json");
    assert!(
        managed_manifest.exists(),
        "assembled index generation should persist a Tantivy managed-file manifest"
    );
}
2409
/// Publishes two shards as a federated bundle and checks the result:
/// - the returned summary reports 4 docs across 2 segments;
/// - the root gets the federated manifest but no `meta.json`;
/// - the manifest lists two shards;
/// - `searchable_index_summary` on the root reports 4 docs.
#[test]
fn publish_federated_searchable_index_directories_writes_manifest_without_root_meta() {
    let root = TempDir::new().expect("temp dir");
    let shard_a = root.path().join("shard-a");
    let shard_b = root.path().join("shard-b");
    let published = root.path().join("published");

    let mut writer_a = TantivyIndex::open_or_create(&shard_a).expect("create shard a");
    let mut writer_b = TantivyIndex::open_or_create(&shard_b).expect("create shard b");

    // Builds one message with no author, extras, snippets, or invocations.
    let message = |idx, role: &str, created_at, content: String| NormalizedMessage {
        idx,
        role: role.to_string(),
        author: None,
        created_at: Some(created_at),
        content,
        extra: Value::Null,
        snippets: Vec::new(),
        invocations: Vec::new(),
    };
    // Builds a user/assistant pair whose contents are `{body}-a` and `{body}-b`.
    let conversation = |external_id: &str, title: &str, body: &str| NormalizedConversation {
        agent_slug: "codex".to_string(),
        external_id: Some(external_id.to_string()),
        title: Some(title.to_string()),
        workspace: Some(PathBuf::from("/tmp/workspace")),
        source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
        started_at: Some(1_700_000_002_000),
        ended_at: Some(1_700_000_002_100),
        metadata: Value::Null,
        messages: vec![
            message(0, "user", 1_700_000_002_010, format!("{body}-a")),
            message(1, "assistant", 1_700_000_002_020, format!("{body}-b")),
        ],
    };

    writer_a
        .add_conversation_with_id(&conversation("fed-a", "Fed A", "alpha"), Some(10))
        .expect("index shard a");
    writer_b
        .add_conversation_with_id(&conversation("fed-b", "Fed B", "beta"), Some(20))
        .expect("index shard b");
    writer_a.commit().expect("commit shard a");
    writer_b.commit().expect("commit shard b");
    drop(writer_a);
    drop(writer_b);

    let summary =
        publish_federated_searchable_index_directories(&published, &[&shard_a, &shard_b])
            .expect("publish federated bundle");
    assert_eq!(summary.docs, 4);
    assert_eq!(summary.segments, 2);

    let root_meta = published.join("meta.json");
    let root_manifest = published.join(FEDERATED_SEARCH_MANIFEST_FILE);
    assert!(
        !root_meta.exists(),
        "federated publish root should not force a standard single-index meta.json"
    );
    assert!(
        root_manifest.exists(),
        "federated publish root should persist its manifest"
    );

    let manifest = load_federated_search_manifest_internal(&published)
        .expect("load manifest")
        .expect("manifest present");
    assert_eq!(manifest.shards.len(), 2);

    let searchable = searchable_index_summary(&published)
        .expect("summary")
        .expect("searchable summary");
    assert_eq!(searchable.docs, 4);
}
2489
2490 fn write_federated_manifest_for_test(index_path: &Path, manifest: &FederatedSearchManifest) {
2491 fs::write(
2492 federated_search_manifest_path(index_path),
2493 serde_json::to_vec_pretty(manifest).expect("serialize manifest"),
2494 )
2495 .expect("write manifest");
2496 }
2497
2498 fn publish_test_federated_bundle(root: &Path) -> PathBuf {
2499 let shard_a = root.join("shard-a");
2500 let shard_b = root.join("shard-b");
2501 let published = root.join("published");
2502
2503 let mut shard_a_index = TantivyIndex::open_or_create(&shard_a).expect("create shard a");
2504 let mut shard_b_index = TantivyIndex::open_or_create(&shard_b).expect("create shard b");
2505
2506 let make_conv = |external_id: &str, content: &str| NormalizedConversation {
2507 agent_slug: "codex".to_string(),
2508 external_id: Some(external_id.to_string()),
2509 title: Some(format!("Bundle {external_id}")),
2510 workspace: Some(PathBuf::from("/tmp/workspace")),
2511 source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
2512 started_at: Some(1_700_000_002_000),
2513 ended_at: Some(1_700_000_002_100),
2514 metadata: Value::Null,
2515 messages: vec![NormalizedMessage {
2516 idx: 0,
2517 role: "assistant".to_string(),
2518 author: None,
2519 created_at: Some(1_700_000_002_010),
2520 content: content.to_string(),
2521 extra: Value::Null,
2522 snippets: Vec::new(),
2523 invocations: Vec::new(),
2524 }],
2525 };
2526
2527 shard_a_index
2528 .add_conversation_with_id(&make_conv("bundle-a", "alpha"), Some(10))
2529 .expect("index shard a");
2530 shard_b_index
2531 .add_conversation_with_id(&make_conv("bundle-b", "beta"), Some(20))
2532 .expect("index shard b");
2533 shard_a_index.commit().expect("commit shard a");
2534 shard_b_index.commit().expect("commit shard b");
2535 drop(shard_a_index);
2536 drop(shard_b_index);
2537
2538 publish_federated_searchable_index_directories(&published, &[&shard_a, &shard_b])
2539 .expect("publish federated bundle");
2540 published
2541 }
2542
/// Starting from a valid one-shard manifest, each of three corruptions must make
/// `load_federated_search_manifest_internal` fail with a specific message:
/// - a future manifest version;
/// - an unknown manifest kind;
/// - a shard path that escapes `shards/`.
#[test]
fn federated_manifest_validation_rejects_unsupported_remote_contracts() {
    let root = TempDir::new().expect("temp dir");
    let published = root.path().join("published");
    fs::create_dir_all(&published).expect("create bundle root");

    let base_manifest = FederatedSearchManifest {
        version: FEDERATED_SEARCH_MANIFEST_VERSION,
        kind: FEDERATED_SEARCH_MANIFEST_KIND.to_string(),
        schema_hash: CASS_SCHEMA_HASH.to_string(),
        shards: vec![FederatedSearchShardManifest {
            relative_path: "shards/shard-00000".to_string(),
            docs: 1,
            segments: 1,
            meta_fingerprint: "a".repeat(64),
        }],
    };

    // Writes `manifest`, reloads it, and checks the load error mentions `needle`.
    let assert_rejected = |manifest: &FederatedSearchManifest, needle: &str, what: &str| {
        write_federated_manifest_for_test(&published, manifest);
        let error = load_federated_search_manifest_internal(&published).unwrap_err();
        assert!(
            format!("{error:#}").contains(needle),
            "unexpected {what} error: {error:#}"
        );
    };

    let mut future_version = base_manifest.clone();
    future_version.version = FEDERATED_SEARCH_MANIFEST_VERSION + 1;
    assert_rejected(
        &future_version,
        "unsupported federated search manifest version",
        "version",
    );

    let mut unknown_kind = base_manifest.clone();
    unknown_kind.kind = "cass-unknown-artifact".to_string();
    assert_rejected(
        &unknown_kind,
        "unexpected federated search manifest kind",
        "kind",
    );

    let mut escaping_path = base_manifest;
    let shard = escaping_path
        .shards
        .first_mut()
        .expect("test manifest should contain a shard");
    shard.relative_path = "../escape".to_string();
    assert_rejected(&escaping_path, "must stay under shards/", "path");
}
2591
/// Overwrites the first shard's `meta_fingerprint` in a published bundle with zeros.
/// Opening readers must then fail with a fingerprint-mismatch error.
#[test]
fn open_federated_search_readers_rejects_corrupt_shard_fingerprint() {
    let scratch = TempDir::new().expect("temp dir");
    let published = publish_test_federated_bundle(scratch.path());

    let mut manifest = load_federated_search_manifest_internal(&published)
        .expect("load manifest")
        .expect("manifest present");
    let first_shard = manifest
        .shards
        .first_mut()
        .expect("test manifest should contain a shard");
    first_shard.meta_fingerprint = "0".repeat(64);
    write_federated_manifest_for_test(&published, &manifest);

    let error = match open_federated_search_readers(&published, FsReloadPolicy::Manual) {
        Ok(_) => panic!("corrupt federated shard fingerprint should be rejected"),
        Err(error) => error,
    };
    assert!(
        format!("{error:#}").contains("federated lexical shard fingerprint mismatch"),
        "unexpected fingerprint error: {error:#}"
    );
}
2617
2618 fn write_minimal_federated_artifact(root: &Path, segment_bytes: &[u8]) {
2619 let shard = root.join("shards/shard-00000");
2620 fs::create_dir_all(&shard).expect("create shard");
2621 fs::write(shard.join("meta.json"), br#"{"segments":[]}"#).expect("write shard meta");
2622 fs::write(shard.join("segment.bin"), segment_bytes).expect("write shard segment");
2623 write_root_schema_hash_file(root).expect("write schema hash");
2624
2625 let manifest = FederatedSearchManifest {
2626 version: FEDERATED_SEARCH_MANIFEST_VERSION,
2627 kind: FEDERATED_SEARCH_MANIFEST_KIND.to_string(),
2628 schema_hash: CASS_SCHEMA_HASH.to_string(),
2629 shards: vec![FederatedSearchShardManifest {
2630 relative_path: "shards/shard-00000".to_string(),
2631 docs: 0,
2632 segments: 0,
2633 meta_fingerprint: meta_fingerprint_for_existing_index_dir(&shard)
2634 .expect("meta fingerprint"),
2635 }],
2636 };
2637 write_federated_manifest_for_test(root, &manifest);
2638 }
2639
2640 fn write_minimal_standard_lexical_artifact(root: &Path, segment_bytes: &[u8]) {
2641 fs::create_dir_all(root).expect("create standard lexical root");
2642 fs::write(root.join("meta.json"), br#"{"segments":[]}"#).expect("write root meta");
2643 fs::write(root.join("segment.bin"), segment_bytes).expect("write segment");
2644 write_root_schema_hash_file(root).expect("write schema hash");
2645 }
2646
/// The evidence manifest of a standard lexical artifact must verify completely. It must
/// classify `meta.json` as metadata and `segment.bin` as a lexical shard chunk.
#[test]
fn lexical_evidence_manifest_supports_standard_searchable_index() {
    let scratch = TempDir::new().expect("temp dir");
    let root = scratch.path();
    write_minimal_standard_lexical_artifact(root, b"standard segment bytes");

    let manifest = lexical_search_evidence_bundle_manifest(root).expect("standard manifest");
    assert!(manifest.verify(root).is_complete());

    let has_chunk = |path: &str, role: EvidenceBundleChunkRole| {
        manifest
            .chunks
            .iter()
            .any(|chunk| chunk.path == path && chunk.role == role)
    };
    assert!(has_chunk("meta.json", EvidenceBundleChunkRole::Metadata));
    assert!(has_chunk("segment.bin", EvidenceBundleChunkRole::LexicalShard));
}
2662
/// Leaves a stale evidence-manifest temp file in a standard artifact. The file must not be
/// listed as a chunk, and once the manifest is saved, verifying the saved file must report
/// completeness.
#[test]
fn lexical_evidence_manifest_excludes_writer_temp_file_before_save() {
    let scratch = TempDir::new().expect("temp dir");
    let root = scratch.path();
    write_minimal_standard_lexical_artifact(root, b"standard segment bytes");
    let stale_temp = root.join(EVIDENCE_BUNDLE_MANIFEST_TEMP_FILE);
    fs::write(&stale_temp, b"leftover temp manifest bytes")
        .expect("write stale evidence manifest temp file");

    let manifest = lexical_search_evidence_bundle_manifest(root).expect("standard manifest");
    let temp_file_captured = manifest
        .chunks
        .iter()
        .any(|chunk| chunk.path == EVIDENCE_BUNDLE_MANIFEST_TEMP_FILE);
    assert!(
        !temp_file_captured,
        "writer temp file must not become part of the saved proof: {manifest:?}"
    );

    manifest.save(root).expect("save evidence manifest");
    let manifest_file = crate::evidence_bundle::EvidenceBundleManifest::path(root);
    let report = crate::evidence_bundle::verify_evidence_bundle_manifest_file(root, &manifest_file);
    assert!(report.is_complete(), "{report:?}");
}
2690
/// Overwrites `schema_hash.json` with a stale hash. Building the evidence manifest must then
/// fail with a schema-mismatch error.
#[test]
fn lexical_evidence_manifest_rejects_standard_schema_mismatch() {
    let scratch = TempDir::new().expect("temp dir");
    let root = scratch.path();
    write_minimal_standard_lexical_artifact(root, b"standard segment bytes");
    let stale_hash = r#"{"schema_hash":"stale"}"#;
    fs::write(root.join("schema_hash.json"), stale_hash).expect("write stale schema hash");

    let error = lexical_search_evidence_bundle_manifest(root).unwrap_err();
    let rendered = format!("{error:#}");
    assert!(
        rendered.contains("lexical artifact schema mismatch"),
        "unexpected schema error: {error:#}"
    );
}
2707
/// Two byte-identical federated artifacts must yield identical manifest JSON. Flipping
/// bytes in one shard segment afterwards must turn verification unsafe with a digest
/// mismatch.
#[test]
fn federated_evidence_manifest_is_deterministic_and_detects_mutation() {
    let left = TempDir::new().expect("left temp dir");
    let right = TempDir::new().expect("right temp dir");
    for dir in [&left, &right] {
        write_minimal_federated_artifact(dir.path(), b"same segment bytes");
    }

    let left_manifest =
        federated_search_evidence_bundle_manifest(left.path()).expect("left manifest");
    let right_manifest =
        federated_search_evidence_bundle_manifest(right.path()).expect("right manifest");
    let left_json = serde_json::to_value(&left_manifest).expect("left json");
    let right_json = serde_json::to_value(&right_manifest).expect("right json");
    assert_eq!(
        left_json, right_json,
        "byte-identical federated artifacts should produce byte-identical evidence manifests"
    );
    assert!(left_manifest.verify(left.path()).is_complete());

    let segment = left.path().join("shards/shard-00000/segment.bin");
    fs::write(&segment, b"SAME segment bytes").expect("mutate shard segment");

    let report = left_manifest.verify(left.path());
    assert!(report.is_unsafe(), "{report:?}");
    let saw_digest_mismatch = report.issues.iter().any(|issue| {
        issue.kind == crate::evidence_bundle::EvidenceBundleIssueKind::DigestMismatch
    });
    assert!(
        saw_digest_mismatch,
        "expected digest mismatch after segment mutation: {report:?}"
    );
}
2739
/// A symlink inside a shard directory must make manifest construction fail with an
/// "unsupported non-file entry" error.
#[cfg(unix)]
#[test]
fn federated_evidence_manifest_rejects_symlink_artifacts() {
    use std::os::unix::fs::symlink;

    let root = TempDir::new().expect("temp dir");
    write_minimal_federated_artifact(root.path(), b"segment bytes");

    // Target a never-created path inside the TempDir rather than a global `/tmp` path, so
    // the outcome cannot depend on whatever happens to exist on the host.
    let dangling_target = root.path().join("not-a-bundle-file");
    symlink(
        &dangling_target,
        root.path().join("shards/shard-00000/link"),
    )
    .expect("create artifact symlink");

    let error = federated_search_evidence_bundle_manifest(root.path()).unwrap_err();
    assert!(
        format!("{error:#}").contains("unsupported non-file entry"),
        "unexpected symlink error: {error:#}"
    );
}
2759
/// Opening a writer on a published federated bundle must turn it back into a standard
/// Tantivy index:
/// - all four docs remain searchable;
/// - a root `meta.json` appears;
/// - the federated manifest is removed.
#[test]
fn open_or_create_materializes_federated_bundle_back_into_mutable_index() {
    let root = TempDir::new().expect("temp dir");
    let shard_a = root.path().join("shard-a");
    let shard_b = root.path().join("shard-b");
    let published = root.path().join("published");

    let mut writer_a = TantivyIndex::open_or_create(&shard_a).expect("create shard a");
    let mut writer_b = TantivyIndex::open_or_create(&shard_b).expect("create shard b");

    // Builds one message with no author, extras, snippets, or invocations.
    let message = |idx, role: &str, created_at, content: String| NormalizedMessage {
        idx,
        role: role.to_string(),
        author: None,
        created_at: Some(created_at),
        content,
        extra: Value::Null,
        snippets: Vec::new(),
        invocations: Vec::new(),
    };
    // Builds a user/assistant pair whose contents are `{body}-a` and `{body}-b`.
    let conversation = |external_id: &str, title: &str, body: &str| NormalizedConversation {
        agent_slug: "codex".to_string(),
        external_id: Some(external_id.to_string()),
        title: Some(title.to_string()),
        workspace: Some(PathBuf::from("/tmp/workspace")),
        source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
        started_at: Some(1_700_000_003_000),
        ended_at: Some(1_700_000_003_100),
        metadata: Value::Null,
        messages: vec![
            message(0, "user", 1_700_000_003_010, format!("{body}-a")),
            message(1, "assistant", 1_700_000_003_020, format!("{body}-b")),
        ],
    };

    writer_a
        .add_conversation_with_id(&conversation("mat-a", "Mat A", "alpha"), Some(10))
        .expect("index shard a");
    writer_b
        .add_conversation_with_id(&conversation("mat-b", "Mat B", "beta"), Some(20))
        .expect("index shard b");
    writer_a.commit().expect("commit shard a");
    writer_b.commit().expect("commit shard b");
    drop(writer_a);
    drop(writer_b);

    publish_federated_searchable_index_directories(&published, &[&shard_a, &shard_b])
        .expect("publish federated bundle");
    let federated_manifest = published.join(FEDERATED_SEARCH_MANIFEST_FILE);
    assert!(
        federated_manifest.exists(),
        "test fixture should start in federated bundle form"
    );

    let mutable_index =
        TantivyIndex::open_or_create(&published).expect("materialize mutable index");
    let reader = mutable_index.reader().expect("reader");
    reader.reload().expect("reload");
    assert_eq!(reader.searcher().num_docs(), 4);
    assert!(
        published.join("meta.json").exists(),
        "writer open should materialize a standard writable Tantivy index"
    );
    assert!(
        !federated_manifest.exists(),
        "materialization should replace the federated bundle manifest"
    );
}
2835
/// A regular file sitting where a bundle directory is expected must be refused with a
/// "not a directory" error, and the file must be left intact.
#[test]
fn federated_materialization_target_rejects_file_root() {
    let root = TempDir::new().expect("temp dir");
    let published = root.path().join("published");
    fs::write(&published, "not a directory").expect("write file root");

    let err = ensure_replaceable_federated_materialization_root(&published)
        .expect_err("file roots must not be materialized over");

    // Match against the full context chain (`{:#}`), like the other error assertions in this
    // module. `to_string()` only renders the outermost context of an anyhow error.
    assert!(
        format!("{err:#}").contains("not a directory"),
        "unexpected error: {err:#}"
    );
    assert_eq!(
        fs::read_to_string(&published).expect("file root should remain"),
        "not a directory"
    );
}
2854
/// A dangling symlink sitting where a bundle directory is expected must be refused with a
/// "through symlink" error. The symlink must survive, and its target must not be created.
#[test]
#[cfg(unix)]
fn federated_materialization_target_rejects_dangling_symlink_root() {
    use std::os::unix::fs::symlink;

    let root = TempDir::new().expect("temp dir");
    let published = root.path().join("published");
    let missing_target = root.path().join("missing-published");
    symlink(&missing_target, &published).expect("create dangling symlink");

    let err = ensure_replaceable_federated_materialization_root(&published)
        .expect_err("symlink roots must not be materialized over");

    // Match against the full context chain (`{:#}`), like the other error assertions in this
    // module. `to_string()` only renders the outermost context of an anyhow error.
    assert!(
        format!("{err:#}").contains("through symlink"),
        "unexpected error: {err:#}"
    );
    assert!(
        fs::symlink_metadata(&published)
            .expect("symlink root should remain")
            .file_type()
            .is_symlink()
    );
    assert!(!missing_target.exists());
}
2880
2881 #[test]
2890 fn packet_driven_lexical_pipeline_matches_legacy_for_normalized_conv() {
2891 use crate::model::conversation_packet::{ConversationPacket, ConversationPacketProvenance};
2892
2893 fn make_conv() -> NormalizedConversation {
2894 NormalizedConversation {
2895 agent_slug: "codex".to_string(),
2896 external_id: Some("packet-equivalence".to_string()),
2897 title: Some("Packet Equivalence".to_string()),
2898 workspace: Some(PathBuf::from("/work/eq")),
2899 source_path: PathBuf::from("/work/eq/.codex/session.jsonl"),
2900 started_at: Some(1_700_000_000_000),
2901 ended_at: Some(1_700_000_010_000),
2902 metadata: serde_json::json!({
2903 "cass": {
2904 "origin": {
2905 "source_id": "remote-host",
2906 "kind": "ssh",
2907 "host": "ws-42.example",
2908 },
2909 "workspace_original": "/Users/dev/eq",
2910 },
2911 "model": "gpt-5",
2912 }),
2913 messages: vec![
2914 NormalizedMessage {
2915 idx: 0,
2916 role: "user".to_string(),
2917 author: Some("human".to_string()),
2918 created_at: Some(1_700_000_000_000),
2919 content: "explain the packet pipeline".to_string(),
2920 extra: serde_json::json!({"turn": 1}),
2921 snippets: Vec::new(),
2922 invocations: Vec::new(),
2923 },
2924 NormalizedMessage {
2925 idx: 1,
2926 role: "assistant".to_string(),
2927 author: None,
2928 created_at: Some(1_700_000_001_000),
2929 content: "the pipeline normalizes once".to_string(),
2930 extra: Value::Null,
2931 snippets: Vec::new(),
2932 invocations: Vec::new(),
2933 },
2934 NormalizedMessage {
2935 idx: 2,
2936 role: "tool".to_string(),
2937 author: Some("ripgrep".to_string()),
2938 created_at: Some(1_700_000_002_000),
2939 content: "matches: 3".to_string(),
2940 extra: Value::Null,
2941 snippets: Vec::new(),
2942 invocations: Vec::new(),
2943 },
2944 ],
2945 }
2946 }
2947
2948 let legacy_dir = TempDir::new().expect("legacy temp dir");
2949 let mut legacy_idx = TantivyIndex::open_or_create(legacy_dir.path()).expect("legacy idx");
2950 let conv = make_conv();
2951 legacy_idx
2952 .add_messages_with_conversation_id(&conv, &conv.messages, Some(99))
2953 .expect("legacy add");
2954 legacy_idx.commit().expect("legacy commit");
2955 let legacy_reader = legacy_idx.reader().expect("legacy reader");
2956 legacy_reader.reload().expect("legacy reload");
2957 let legacy_searcher = legacy_reader.searcher();
2958 let legacy_count = legacy_searcher.num_docs();
2959
2960 let packet_dir = TempDir::new().expect("packet temp dir");
2961 let mut packet_idx = TantivyIndex::open_or_create(packet_dir.path()).expect("packet idx");
2962 let packet = ConversationPacket::from_normalized_conversation(
2963 &conv,
2964 ConversationPacketProvenance::local(),
2965 );
2966 packet_idx
2967 .add_messages_from_packet(&packet, None, Some(99), |_| Ok(()))
2968 .expect("packet add");
2969 packet_idx.commit().expect("packet commit");
2970 let packet_reader = packet_idx.reader().expect("packet reader");
2971 packet_reader.reload().expect("packet reload");
2972 let packet_searcher = packet_reader.searcher();
2973 let packet_count = packet_searcher.num_docs();
2974
2975 assert_eq!(
2976 legacy_count, packet_count,
2977 "packet pipeline must emit the same number of docs as legacy: legacy={legacy_count} packet={packet_count}"
2978 );
2979 assert_eq!(
2980 legacy_count,
2981 conv.messages.len() as u64,
2982 "all 3 fixture messages should land (none filter as hard noise)"
2983 );
2984
2985 let legacy_context = cass_doc_context(&conv, Some(99));
2990 let legacy_docs: Vec<FsCassDocument> = conv
2991 .messages
2992 .iter()
2993 .filter_map(|m| cass_document_for_message(&legacy_context, m))
2994 .collect();
2995 let packet_context_owned = {
2996 let mut ctx = cass_doc_context_from_packet(&packet);
2997 ctx.conversation_id = Some(99);
2998 ctx
2999 };
3000 let packet_docs: Vec<FsCassDocument> = packet
3001 .payload
3002 .messages
3003 .iter()
3004 .filter_map(|m| cass_document_for_packet_message(&packet_context_owned, m))
3005 .collect();
3006 assert_eq!(
3007 legacy_docs.len(),
3008 packet_docs.len(),
3009 "packet doc list length should match legacy"
3010 );
3011 for (legacy_doc, packet_doc) in legacy_docs.iter().zip(packet_docs.iter()) {
3012 assert_eq!(legacy_doc.agent, packet_doc.agent);
3013 assert_eq!(legacy_doc.workspace, packet_doc.workspace);
3014 assert_eq!(legacy_doc.workspace_original, packet_doc.workspace_original);
3015 assert_eq!(legacy_doc.source_path, packet_doc.source_path);
3016 assert_eq!(legacy_doc.msg_idx, packet_doc.msg_idx);
3017 assert_eq!(legacy_doc.created_at, packet_doc.created_at);
3018 assert_eq!(legacy_doc.title, packet_doc.title);
3019 assert_eq!(legacy_doc.content, packet_doc.content);
3020 assert_eq!(legacy_doc.conversation_id, packet_doc.conversation_id);
3021 assert_eq!(
3022 legacy_doc.source_id, packet_doc.source_id,
3023 "source_id must match (remote-host normalization is the bead's tripwire)"
3024 );
3025 assert_eq!(legacy_doc.origin_kind, packet_doc.origin_kind);
3026 assert_eq!(legacy_doc.origin_host, packet_doc.origin_host);
3027 }
3028 let first_packet_doc = packet_docs
3032 .first()
3033 .expect("packet fixture should emit at least one doc");
3034 assert_eq!(
3035 first_packet_doc.source_id, "remote-host",
3036 "metadata.cass.origin.source_id must be the canonical value"
3037 );
3038 assert_eq!(
3039 first_packet_doc.origin_host.as_deref(),
3040 Some("ws-42.example"),
3041 "metadata.cass.origin.host must surface as origin_host"
3042 );
3043 }
3044
3045 #[test]
3051 fn add_conversation_with_id_packet_path_emits_expected_doc_count() {
3052 fn fixture(id: i64) -> NormalizedConversation {
3053 NormalizedConversation {
3054 agent_slug: "codex".to_string(),
3055 external_id: Some(format!("conv-{id}")),
3056 title: Some(format!("Conv {id}")),
3057 workspace: None,
3058 source_path: PathBuf::from(format!("/tmp/conv-{id}.jsonl")),
3059 started_at: Some(1_700_000_000_000 + id),
3060 ended_at: Some(1_700_000_001_000 + id),
3061 metadata: Value::Null,
3062 messages: vec![
3063 NormalizedMessage {
3064 idx: 0,
3065 role: "user".to_string(),
3066 author: None,
3067 created_at: Some(1_700_000_000_000 + id),
3068 content: format!("hello-{id}"),
3069 extra: Value::Null,
3070 snippets: Vec::new(),
3071 invocations: Vec::new(),
3072 },
3073 NormalizedMessage {
3074 idx: 1,
3075 role: "assistant".to_string(),
3076 author: None,
3077 created_at: Some(1_700_000_000_500 + id),
3078 content: format!("response-{id}"),
3079 extra: Value::Null,
3080 snippets: Vec::new(),
3081 invocations: Vec::new(),
3082 },
3083 ],
3084 }
3085 }
3086
3087 let dir = TempDir::new().expect("temp dir");
3088 let mut idx = TantivyIndex::open_or_create(dir.path()).expect("idx");
3089 idx.add_conversation_with_id(&fixture(1), Some(101))
3090 .expect("conv 1");
3091 idx.add_conversation_with_id(&fixture(2), Some(102))
3092 .expect("conv 2");
3093 idx.commit().expect("commit");
3094
3095 let reader = idx.reader().expect("reader");
3096 reader.reload().expect("reload");
3097 assert_eq!(
3098 reader.searcher().num_docs(),
3099 4,
3100 "two conversations × two messages each ⇒ four lexical docs"
3101 );
3102 }
3103}