Skip to main content

coding_agent_search/search/
tantivy.rs

1use std::collections::BTreeSet;
2use std::fs;
3use std::path::{Path, PathBuf};
4
5use crate::connectors::NormalizedConversation;
6use crate::connectors::NormalizedMessage;
7use crate::evidence_bundle::{
8    EVIDENCE_BUNDLE_MANIFEST_FILE, EvidenceBundleChunk, EvidenceBundleChunkRole,
9    EvidenceBundleKind, EvidenceBundleManifest,
10};
11use crate::model::conversation_packet::{
12    ConversationPacket, ConversationPacketMessage, ConversationPacketProvenance,
13};
14use crate::search::canonicalize::is_hard_message_noise;
15use crate::sources::provenance::LOCAL_SOURCE_ID;
16use anyhow::{Context, Error, Result};
17use frankensearch::lexical::{
18    CASS_SCHEMA_HASH, CASS_SCHEMA_VERSION, CassDocument as FsCassDocument,
19    CassDocumentRef as FsCassDocumentRef, CassFields as FsCassFields,
20    CassMergeStatus as FsCassMergeStatus, CassTantivyIndex as FsCassTantivyIndex, Index,
21    IndexReader, ReloadPolicy as FsReloadPolicy, Schema, cass_build_schema as fs_build_schema,
22    cass_ensure_tokenizer as fs_ensure_tokenizer, cass_fields_from_schema as fs_fields_from_schema,
23    cass_index_dir as fs_index_dir, cass_open_search_reader as fs_cass_open_search_reader,
24    cass_schema_hash_matches, tantivy_crate,
25};
26use serde::{Deserialize, Serialize};
27use std::time::SystemTime;
28
29pub(crate) fn normalized_index_source_id(
30    source_id: Option<&str>,
31    origin_kind: Option<&str>,
32    origin_host: Option<&str>,
33) -> String {
34    let trimmed_source_id = source_id.unwrap_or_default().trim();
35    if !trimmed_source_id.is_empty() {
36        if trimmed_source_id.eq_ignore_ascii_case(LOCAL_SOURCE_ID) {
37            return LOCAL_SOURCE_ID.to_string();
38        }
39        return trimmed_source_id.to_string();
40    }
41
42    let trimmed_origin_host = origin_host.map(str::trim).filter(|value| !value.is_empty());
43    let trimmed_origin_kind = origin_kind.unwrap_or_default().trim();
44    if trimmed_origin_kind.eq_ignore_ascii_case("ssh")
45        || trimmed_origin_kind.eq_ignore_ascii_case("remote")
46    {
47        return trimmed_origin_host.unwrap_or("remote").to_string();
48    }
49    if let Some(origin_host) = trimmed_origin_host {
50        return origin_host.to_string();
51    }
52
53    LOCAL_SOURCE_ID.to_string()
54}
55
56pub(crate) fn normalized_index_origin_kind(source_id: &str, origin_kind: Option<&str>) -> String {
57    if let Some(kind) = origin_kind.map(str::trim).filter(|value| !value.is_empty()) {
58        if kind.eq_ignore_ascii_case("local") {
59            return LOCAL_SOURCE_ID.to_string();
60        }
61        if kind.eq_ignore_ascii_case("ssh") || kind.eq_ignore_ascii_case("remote") {
62            return "remote".to_string();
63        }
64        return kind.to_ascii_lowercase();
65    }
66
67    if source_id == LOCAL_SOURCE_ID {
68        LOCAL_SOURCE_ID.to_string()
69    } else {
70        "remote".to_string()
71    }
72}
73
/// Trim an optional origin host and drop it entirely when nothing but
/// whitespace remains.
pub(crate) fn normalized_index_origin_host(origin_host: Option<&str>) -> Option<String> {
    let host = origin_host?.trim();
    (!host.is_empty()).then(|| host.to_owned())
}
80
/// Schema hash of the current cass lexical index, re-exported from
/// `frankensearch::lexical::CASS_SCHEMA_HASH`.
pub const SCHEMA_HASH: &str = CASS_SCHEMA_HASH;
/// Env var overriding the size cap read by `tantivy_add_batch_max_chars`.
const ENV_TANTIVY_ADD_BATCH_MAX_CHARS: &str = "CASS_TANTIVY_ADD_BATCH_MAX_CHARS";
/// Env var overriding the message-count cap read by both
/// `tantivy_add_batch_max_messages` and `tantivy_prebuilt_add_batch_max_messages`.
const ENV_TANTIVY_ADD_BATCH_MAX_MESSAGES: &str = "CASS_TANTIVY_ADD_BATCH_MAX_MESSAGES";
/// Env var overriding the upper bound applied in
/// `tantivy_writer_parallelism_hint_for_available`.
const ENV_TANTIVY_MAX_WRITER_THREADS: &str = "CASS_TANTIVY_MAX_WRITER_THREADS";
/// Env var read by `default_tantivy_assumed_concurrent_writers` as the number
/// of writers assumed to run concurrently.
const ENV_TANTIVY_REBUILD_STAGED_SHARD_BUILDERS: &str =
    "CASS_TANTIVY_REBUILD_STAGED_SHARD_BUILDERS";
/// Upper clamp for the computed writer-thread default; also the value returned
/// when host memory cannot be determined.
const DEFAULT_TANTIVY_MAX_WRITER_THREADS_CEILING: usize = 26;
/// Concurrent-writer count assumed when the staged-shard-builders env var is
/// unset or unusable.
const DEFAULT_TANTIVY_ASSUMED_CONCURRENT_WRITERS: u64 = 8;
/// Heap bytes budgeted per writer thread (128 MiB); multiplied by the
/// concurrent-writer count when sizing the thread default.
const DEFAULT_TANTIVY_WRITER_HEAP_PER_THREAD_BYTES: u64 = 128 * 1024 * 1024;
/// Divisor applied to host memory to obtain the writer heap budget
/// (10 means one tenth of the reported memory).
const DEFAULT_TANTIVY_WRITER_HEAP_RAM_FRACTION: u64 = 10;
91
92fn positive_usize_env(name: &str) -> Option<usize> {
93    dotenvy::var(name)
94        .ok()
95        .and_then(|value| value.parse::<usize>().ok())
96        .filter(|value| *value > 0)
97}
98
/// Read `MemAvailable` from `/proc/meminfo` and convert it from KiB to bytes.
///
/// Returns `None` when the file cannot be read, when there is no
/// `MemAvailable:` line, when the first such line has no parseable `u64`
/// value, or when the multiplication by 1024 would overflow.
#[cfg(target_os = "linux")]
fn linux_available_memory_bytes() -> Option<u64> {
    let meminfo = std::fs::read_to_string("/proc/meminfo").ok()?;
    // Only the first `MemAvailable:` line is considered.
    let value_and_unit = meminfo
        .lines()
        .find_map(|line| line.strip_prefix("MemAvailable:"))?;
    let kib = value_and_unit
        .split_whitespace()
        .next()?
        .parse::<u64>()
        .ok()?;
    kib.checked_mul(1024)
}
110
/// Memory figure used to size the default Tantivy writer-thread cap on Linux.
///
/// Delegates to `linux_available_memory_bytes`, i.e. `MemAvailable` from
/// `/proc/meminfo` in bytes; `None` when that probe fails.
#[cfg(target_os = "linux")]
fn host_memory_bytes_for_tantivy_default() -> Option<u64> {
    linux_available_memory_bytes()
}
115
/// Memory figure used to size the default Tantivy writer-thread cap on macOS.
///
/// Runs `sysctl -n hw.memsize` and parses its trimmed stdout as a `u64`.
/// Returns `None` if the command cannot be spawned, exits unsuccessfully,
/// prints non-UTF-8 output, or prints something that is not an integer.
#[cfg(target_os = "macos")]
fn host_memory_bytes_for_tantivy_default() -> Option<u64> {
    let sysctl = std::process::Command::new("sysctl")
        .arg("-n")
        .arg("hw.memsize")
        .output()
        .ok()
        .filter(|output| output.status.success())?;
    let stdout = String::from_utf8(sysctl.stdout).ok()?;
    stdout.trim().parse::<u64>().ok()
}
128
/// Fallback for targets other than Linux and macOS: no memory probe is
/// implemented, so the host memory is reported as unknown (`None`).
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn host_memory_bytes_for_tantivy_default() -> Option<u64> {
    None
}
133
/// Test-only convenience wrapper: computes the default writer-thread cap for
/// `memory_bytes` using `DEFAULT_TANTIVY_ASSUMED_CONCURRENT_WRITERS` as the
/// concurrent-writer count.
#[cfg(test)]
pub(crate) fn default_tantivy_max_writer_threads_for_memory_bytes(
    memory_bytes: Option<u64>,
) -> usize {
    default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
        memory_bytes,
        DEFAULT_TANTIVY_ASSUMED_CONCURRENT_WRITERS,
    )
}
143
144pub(crate) fn default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
145    memory_bytes: Option<u64>,
146    concurrent_writers: u64,
147) -> usize {
148    let Some(memory_bytes) = memory_bytes else {
149        return DEFAULT_TANTIVY_MAX_WRITER_THREADS_CEILING;
150    };
151    let per_thread_peak =
152        DEFAULT_TANTIVY_WRITER_HEAP_PER_THREAD_BYTES.saturating_mul(concurrent_writers.max(1));
153    if per_thread_peak == 0 {
154        return DEFAULT_TANTIVY_MAX_WRITER_THREADS_CEILING;
155    }
156    let budget = memory_bytes / DEFAULT_TANTIVY_WRITER_HEAP_RAM_FRACTION;
157    let threads = budget / per_thread_peak;
158    usize::try_from(threads)
159        .unwrap_or(usize::MAX)
160        .clamp(1, DEFAULT_TANTIVY_MAX_WRITER_THREADS_CEILING)
161}
162
163fn default_tantivy_assumed_concurrent_writers() -> u64 {
164    positive_usize_env(ENV_TANTIVY_REBUILD_STAGED_SHARD_BUILDERS)
165        .and_then(|value| u64::try_from(value).ok())
166        .unwrap_or(DEFAULT_TANTIVY_ASSUMED_CONCURRENT_WRITERS)
167        .max(1)
168}
169
170pub fn default_tantivy_max_writer_threads() -> usize {
171    default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
172        host_memory_bytes_for_tantivy_default(),
173        default_tantivy_assumed_concurrent_writers(),
174    )
175}
176
177pub(crate) fn tantivy_writer_parallelism_hint_for_available(available_parallelism: usize) -> usize {
178    let max_threads = positive_usize_env(ENV_TANTIVY_MAX_WRITER_THREADS)
179        .unwrap_or_else(default_tantivy_max_writer_threads);
180
181    available_parallelism.max(1).clamp(1, max_threads)
182}
183
184/// Governor-aware variant of `tantivy_writer_parallelism_hint_for_available`.
185/// Returns the same value on an idle host but scales down when the machine
186/// responsiveness governor has shrunk the global capacity. Call this from
187/// production code paths; the ungoverned `_for_available` variant is kept so
188/// formula-only unit tests stay deterministic.
189pub(crate) fn tantivy_writer_parallelism_hint_for_available_governed(
190    available_parallelism: usize,
191) -> usize {
192    let raw = tantivy_writer_parallelism_hint_for_available(available_parallelism);
193    crate::indexer::responsiveness::effective_worker_count(raw).max(1)
194}
195
196pub(crate) fn tantivy_writer_parallelism_hint() -> usize {
197    tantivy_writer_parallelism_hint_for_available_governed(
198        std::thread::available_parallelism()
199            .map(std::num::NonZeroUsize::get)
200            .unwrap_or(1),
201    )
202}
203
204fn tantivy_add_batch_max_messages() -> usize {
205    positive_usize_env(ENV_TANTIVY_ADD_BATCH_MAX_MESSAGES)
206        .unwrap_or_else(|| 4_096.max(tantivy_writer_parallelism_hint().saturating_mul(512)))
207}
208
209fn tantivy_add_batch_max_chars() -> usize {
210    positive_usize_env(ENV_TANTIVY_ADD_BATCH_MAX_CHARS).unwrap_or_else(|| {
211        (16 * 1024 * 1024).max(tantivy_writer_parallelism_hint().saturating_mul(2 * 1024 * 1024))
212    })
213}
214
215fn tantivy_prebuilt_add_batch_max_messages() -> usize {
216    positive_usize_env(ENV_TANTIVY_ADD_BATCH_MAX_MESSAGES)
217        .unwrap_or_else(|| 16_384.max(tantivy_writer_parallelism_hint().saturating_mul(512)))
218}
219
220fn map_fs_err(err: frankensearch::SearchError) -> Error {
221    Error::new(err)
222}
223
/// Conversation-level values that are copied into every `FsCassDocument`
/// built for the messages of one conversation.
#[derive(Clone)]
struct CassDocContext {
    /// Agent slug of the conversation.
    agent: String,
    /// Workspace path rendered as a (lossy) string, if any.
    workspace: Option<String>,
    /// `metadata.cass.workspace_original`, when present as a string.
    workspace_original: Option<String>,
    /// Source path of the conversation rendered as a string.
    source_path: String,
    /// Conversation title, if any.
    title: Option<String>,
    /// Conversation start time, used as `created_at` for messages that carry
    /// no timestamp of their own.
    started_at_fallback: Option<i64>,
    /// Source id as produced by `normalized_index_source_id`.
    source_id: String,
    /// Origin kind as produced by `normalized_index_origin_kind`.
    origin_kind: String,
    /// Origin host as produced by `normalized_index_origin_host`.
    origin_host: Option<String>,
    /// Conversation id supplied by the caller or the packet, if known.
    conversation_id: Option<i64>,
}
237
238fn cass_doc_context(conv: &NormalizedConversation, conversation_id: Option<i64>) -> CassDocContext {
239    let cass_origin = conv.metadata.get("cass").and_then(|c| c.get("origin"));
240    let raw_source_id = cass_origin
241        .and_then(|o| o.get("source_id"))
242        .and_then(|v| v.as_str());
243    let raw_origin_kind = cass_origin
244        .and_then(|o| o.get("kind"))
245        .and_then(|v| v.as_str());
246    let origin_host = normalized_index_origin_host(
247        cass_origin
248            .and_then(|o| o.get("host"))
249            .and_then(|v| v.as_str()),
250    );
251    let source_id =
252        normalized_index_source_id(raw_source_id, raw_origin_kind, origin_host.as_deref());
253    let origin_kind = normalized_index_origin_kind(&source_id, raw_origin_kind);
254
255    CassDocContext {
256        agent: conv.agent_slug.clone(),
257        workspace: conv
258            .workspace
259            .as_ref()
260            .map(|ws| ws.to_string_lossy().to_string()),
261        workspace_original: conv
262            .metadata
263            .get("cass")
264            .and_then(|c| c.get("workspace_original"))
265            .and_then(|v| v.as_str())
266            .map(ToOwned::to_owned),
267        source_path: conv.source_path.to_string_lossy().to_string(),
268        title: conv.title.clone(),
269        started_at_fallback: conv.started_at,
270        source_id,
271        origin_kind,
272        origin_host,
273        conversation_id,
274    }
275}
276
277fn cass_document_for_message(
278    context: &CassDocContext,
279    msg: &NormalizedMessage,
280) -> Option<FsCassDocument> {
281    if is_hard_message_noise(Some(msg.role.as_str()), &msg.content) {
282        return None;
283    }
284
285    Some(FsCassDocument {
286        agent: context.agent.clone(),
287        workspace: context.workspace.clone(),
288        workspace_original: context.workspace_original.clone(),
289        source_path: context.source_path.clone(),
290        msg_idx: msg.idx.max(0) as u64,
291        created_at: msg.created_at.or(context.started_at_fallback),
292        title: context.title.clone(),
293        content: msg.content.clone(),
294        conversation_id: context.conversation_id,
295        source_id: context.source_id.clone(),
296        origin_kind: context.origin_kind.clone(),
297        origin_host: context.origin_host.clone(),
298    })
299}
300
301fn push_cass_document_into_pending(
302    docs: &mut Vec<FsCassDocument>,
303    pending_chars: &mut usize,
304    doc: FsCassDocument,
305) {
306    *pending_chars = pending_chars.saturating_add(doc.content.len());
307    docs.push(doc);
308}
309
310/// Build the per-document context the lexical sink needs from a
311/// [`ConversationPacket`]. Packet builders (raw scan + canonical replay)
312/// already normalized identity, provenance, metadata, and timestamps, so
313/// the lexical builder no longer has to re-walk the raw connector
314/// payload (`coding_agent_session_search-ibuuh.32`). We still re-derive
315/// `source_id`/`origin_kind`/`origin_host` from `metadata.cass.origin`
316/// (rather than trusting `packet.payload.provenance` blindly) so the
317/// packet pipeline produces byte-identical CassDocuments to the legacy
318/// `cass_doc_context` path — that's the equivalence gate covered by
319/// `packet_driven_lexical_pipeline_matches_legacy_for_normalized_conv`.
320fn cass_doc_context_from_packet(packet: &ConversationPacket) -> CassDocContext {
321    let payload = &packet.payload;
322    let metadata = &payload.metadata_json;
323    let cass_origin = metadata.get("cass").and_then(|c| c.get("origin"));
324    let raw_source_id = cass_origin
325        .and_then(|o| o.get("source_id"))
326        .and_then(|v| v.as_str());
327    let raw_origin_kind = cass_origin
328        .and_then(|o| o.get("kind"))
329        .and_then(|v| v.as_str());
330    let origin_host = normalized_index_origin_host(
331        cass_origin
332            .and_then(|o| o.get("host"))
333            .and_then(|v| v.as_str()),
334    );
335    let source_id =
336        normalized_index_source_id(raw_source_id, raw_origin_kind, origin_host.as_deref());
337    let origin_kind = normalized_index_origin_kind(&source_id, raw_origin_kind);
338
339    CassDocContext {
340        agent: payload.identity.agent_slug.clone(),
341        workspace: payload.identity.workspace.clone(),
342        workspace_original: metadata
343            .get("cass")
344            .and_then(|c| c.get("workspace_original"))
345            .and_then(|v| v.as_str())
346            .map(ToOwned::to_owned),
347        source_path: payload.identity.source_path.clone(),
348        title: payload.identity.title.clone(),
349        started_at_fallback: payload.timestamps.started_at,
350        source_id,
351        origin_kind,
352        origin_host,
353        conversation_id: payload.identity.conversation_id,
354    }
355}
356
357/// Build a single CassDocument from a packet message, matching the
358/// legacy `cass_document_for_message` filter and projection rules.
359fn cass_document_for_packet_message(
360    context: &CassDocContext,
361    msg: &ConversationPacketMessage,
362) -> Option<FsCassDocument> {
363    if is_hard_message_noise(Some(msg.role.as_str()), &msg.content) {
364        return None;
365    }
366
367    Some(FsCassDocument {
368        agent: context.agent.clone(),
369        workspace: context.workspace.clone(),
370        workspace_original: context.workspace_original.clone(),
371        source_path: context.source_path.clone(),
372        msg_idx: msg.idx.max(0) as u64,
373        created_at: msg.created_at.or(context.started_at_fallback),
374        title: context.title.clone(),
375        content: msg.content.clone(),
376        conversation_id: context.conversation_id,
377        source_id: context.source_id.clone(),
378        origin_kind: context.origin_kind.clone(),
379        origin_host: context.origin_host.clone(),
380    })
381}
382
383#[allow(clippy::too_many_arguments)]
384fn push_packet_message_into_pending<F>(
385    inner: &mut FsCassTantivyIndex,
386    context: &CassDocContext,
387    msg: &ConversationPacketMessage,
388    docs: &mut Vec<FsCassDocument>,
389    pending_chars: &mut usize,
390    max_messages: usize,
391    max_chars: usize,
392    on_batch_flushed: &mut F,
393) -> Result<()>
394where
395    F: FnMut(usize) -> Result<()>,
396{
397    let Some(doc) = cass_document_for_packet_message(context, msg) else {
398        return Ok(());
399    };
400    push_cass_document_into_pending(docs, pending_chars, doc);
401    if docs.len() >= max_messages || *pending_chars >= max_chars {
402        let flushed_docs = docs.len();
403        inner
404            .add_cass_documents(docs.as_slice())
405            .map_err(map_fs_err)?;
406        on_batch_flushed(flushed_docs)?;
407        docs.clear();
408        *pending_chars = 0;
409    }
410    Ok(())
411}
412
/// Returns true if the given stored hash matches the current schema hash.
///
/// Thin wrapper over `frankensearch::lexical::cass_schema_hash_matches`.
pub fn schema_hash_matches(stored: &str) -> bool {
    cass_schema_hash_matches(stored)
}
417
/// Alias for frankensearch's `CassFields`.
pub type Fields = FsCassFields;
/// Alias for frankensearch's `CassMergeStatus`.
pub type MergeStatus = FsCassMergeStatus;

/// File name of the federated search manifest inside an index directory.
const FEDERATED_SEARCH_MANIFEST_FILE: &str = "federated-search-manifest.json";
/// The only manifest `version` accepted by `validate_federated_search_manifest`.
const FEDERATED_SEARCH_MANIFEST_VERSION: u32 = 1;
/// The only manifest `kind` accepted by `validate_federated_search_manifest`.
const FEDERATED_SEARCH_MANIFEST_KIND: &str = "cass-federated-lexical-index";
/// File name that `is_evidence_bundle_writer_file` excludes from evidence
/// collection alongside the evidence manifest itself.
const EVIDENCE_BUNDLE_MANIFEST_TEMP_FILE: &str = "evidence-bundle-manifest.json.tmp";
425
/// Document and segment totals for a searchable index. For a federated index
/// these are the sums over all shards listed in its manifest.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchableIndexSummary {
    /// Number of documents.
    pub docs: usize,
    /// Number of segments.
    pub segments: usize,
}
431
/// Serialized contents of the federated search manifest file.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct FederatedSearchManifest {
    /// Manifest format version; must equal `FEDERATED_SEARCH_MANIFEST_VERSION`.
    version: u32,
    /// Manifest kind tag; must equal `FEDERATED_SEARCH_MANIFEST_KIND`.
    kind: String,
    /// Schema hash the shards were built with; must equal `CASS_SCHEMA_HASH`.
    schema_hash: String,
    /// Shards making up the federated index; validation requires at least one.
    shards: Vec<FederatedSearchShardManifest>,
}
439
/// Manifest entry describing one shard of a federated lexical index.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct FederatedSearchShardManifest {
    /// Shard directory relative to the index root; must lie under `shards/`.
    relative_path: String,
    /// Document count recorded for the shard.
    docs: usize,
    /// Segment count recorded for the shard.
    segments: usize,
    /// 64-character hex BLAKE3 digest of the shard's `meta.json`.
    meta_fingerprint: String,
}
447
448fn federated_search_manifest_path(index_path: &Path) -> PathBuf {
449    index_path.join(FEDERATED_SEARCH_MANIFEST_FILE)
450}
451
452fn write_root_schema_hash_file(index_path: &Path) -> Result<()> {
453    fs::write(
454        index_path.join("schema_hash.json"),
455        format!("{{\"schema_hash\":\"{CASS_SCHEMA_HASH}\"}}"),
456    )
457    .with_context(|| {
458        format!(
459            "writing cass schema hash metadata for searchable index {}",
460            index_path.display()
461        )
462    })?;
463    Ok(())
464}
465
/// Manifest-relative directory of shard `shard_idx`: `shards/shard-NNNNN`,
/// with the index zero-padded to at least five digits.
fn manifest_relative_shard_path(shard_idx: usize) -> String {
    format!("shards/shard-{:05}", shard_idx)
}
469
470fn meta_fingerprint_for_existing_index_dir(index_path: &Path) -> Result<String> {
471    let meta_path = index_path.join("meta.json");
472    let bytes = fs::read(&meta_path)
473        .with_context(|| format!("reading Tantivy meta file {}", meta_path.display()))?;
474    Ok(blake3::hash(&bytes).to_hex().to_string())
475}
476
477fn validate_federated_shard_relative_path(relative_path: &str) -> Result<()> {
478    if relative_path.trim().is_empty() {
479        return Err(anyhow::anyhow!(
480            "federated lexical shard path must not be empty"
481        ));
482    }
483
484    let path = Path::new(relative_path);
485    let mut components = path.components();
486    match components.next() {
487        Some(std::path::Component::Normal(component))
488            if component == std::ffi::OsStr::new("shards") => {}
489        _ => {
490            return Err(anyhow::anyhow!(
491                "federated lexical shard path must stay under shards/: {}",
492                relative_path
493            ));
494        }
495    }
496
497    let mut has_child = false;
498    for component in components {
499        match component {
500            std::path::Component::Normal(_) => has_child = true,
501            _ => {
502                return Err(anyhow::anyhow!(
503                    "federated lexical shard path must be a clean relative path: {}",
504                    relative_path
505                ));
506            }
507        }
508    }
509
510    if !has_child {
511        return Err(anyhow::anyhow!(
512            "federated lexical shard path must name a shard directory under shards/: {}",
513            relative_path
514        ));
515    }
516
517    Ok(())
518}
519
520fn validate_federated_shard_meta_fingerprint(fingerprint: &str) -> Result<()> {
521    if fingerprint.len() != 64 || !fingerprint.bytes().all(|byte| byte.is_ascii_hexdigit()) {
522        return Err(anyhow::anyhow!(
523            "federated lexical shard meta fingerprint must be a 64-character hex BLAKE3 digest"
524        ));
525    }
526    Ok(())
527}
528
529fn federated_search_manifest_summary(
530    index_path: &Path,
531    manifest: &FederatedSearchManifest,
532) -> Result<SearchableIndexSummary> {
533    let mut docs = 0usize;
534    let mut segments = 0usize;
535    for shard in &manifest.shards {
536        docs = docs.checked_add(shard.docs).with_context(|| {
537            format!(
538                "federated search manifest doc count overflows platform usize: {}",
539                index_path.display()
540            )
541        })?;
542        segments = segments.checked_add(shard.segments).with_context(|| {
543            format!(
544                "federated search manifest segment count overflows platform usize: {}",
545                index_path.display()
546            )
547        })?;
548    }
549    Ok(SearchableIndexSummary { docs, segments })
550}
551
552fn validate_federated_search_manifest(
553    index_path: &Path,
554    manifest: &FederatedSearchManifest,
555    verify_shard_fingerprints: bool,
556) -> Result<()> {
557    if manifest.version != FEDERATED_SEARCH_MANIFEST_VERSION {
558        return Err(anyhow::anyhow!(
559            "unsupported federated search manifest version: expected {}, got {}",
560            FEDERATED_SEARCH_MANIFEST_VERSION,
561            manifest.version
562        ));
563    }
564    if manifest.kind != FEDERATED_SEARCH_MANIFEST_KIND {
565        return Err(anyhow::anyhow!(
566            "unexpected federated search manifest kind: expected {}, got {}",
567            FEDERATED_SEARCH_MANIFEST_KIND,
568            manifest.kind
569        ));
570    }
571    if manifest.schema_hash != CASS_SCHEMA_HASH {
572        return Err(anyhow::anyhow!(
573            "federated search manifest schema mismatch: expected {}, got {}",
574            CASS_SCHEMA_HASH,
575            manifest.schema_hash
576        ));
577    }
578    if manifest.shards.is_empty() {
579        return Err(anyhow::anyhow!(
580            "federated search manifest must contain at least one shard"
581        ));
582    }
583
584    let mut seen_relative_paths = BTreeSet::new();
585    for shard in &manifest.shards {
586        validate_federated_shard_relative_path(&shard.relative_path)?;
587        validate_federated_shard_meta_fingerprint(&shard.meta_fingerprint)?;
588        if !seen_relative_paths.insert(shard.relative_path.clone()) {
589            return Err(anyhow::anyhow!(
590                "federated search manifest contains duplicate shard path: {}",
591                shard.relative_path
592            ));
593        }
594
595        if verify_shard_fingerprints {
596            let shard_path = index_path.join(&shard.relative_path);
597            let actual = meta_fingerprint_for_existing_index_dir(&shard_path)?;
598            if actual != shard.meta_fingerprint {
599                return Err(anyhow::anyhow!(
600                    "federated lexical shard fingerprint mismatch for {}: expected {}, got {}",
601                    shard_path.display(),
602                    shard.meta_fingerprint,
603                    actual
604                ));
605            }
606        }
607    }
608
609    federated_search_manifest_summary(index_path, manifest)?;
610    Ok(())
611}
612
613fn federated_evidence_chunk_role(relative_path: &str) -> EvidenceBundleChunkRole {
614    if relative_path == FEDERATED_SEARCH_MANIFEST_FILE {
615        EvidenceBundleChunkRole::Manifest
616    } else if relative_path == "schema_hash.json"
617        || relative_path == "meta.json"
618        || relative_path.ends_with("/meta.json")
619    {
620        EvidenceBundleChunkRole::Metadata
621    } else if relative_path.starts_with("shards/") {
622        EvidenceBundleChunkRole::LexicalShard
623    } else {
624        EvidenceBundleChunkRole::Other
625    }
626}
627
628fn standard_lexical_evidence_chunk_role(relative_path: &str) -> EvidenceBundleChunkRole {
629    if relative_path == "schema_hash.json" || relative_path == "meta.json" {
630        EvidenceBundleChunkRole::Metadata
631    } else {
632        EvidenceBundleChunkRole::LexicalShard
633    }
634}
635
636fn current_schema_hash_file_matches(index_path: &Path) -> Result<()> {
637    let schema_hash_path = index_path.join("schema_hash.json");
638    let content = fs::read_to_string(&schema_hash_path).with_context(|| {
639        format!(
640            "reading cass schema hash metadata for lexical artifact {}",
641            schema_hash_path.display()
642        )
643    })?;
644    let value: serde_json::Value = serde_json::from_str(&content).with_context(|| {
645        format!(
646            "parsing cass schema hash metadata for lexical artifact {}",
647            schema_hash_path.display()
648        )
649    })?;
650    let stored_hash = value
651        .get("schema_hash")
652        .and_then(|value| value.as_str())
653        .ok_or_else(|| {
654            anyhow::anyhow!(
655                "lexical artifact schema hash metadata is missing schema_hash: {}",
656                schema_hash_path.display()
657            )
658        })?;
659    if stored_hash != CASS_SCHEMA_HASH {
660        return Err(anyhow::anyhow!(
661            "lexical artifact schema mismatch: expected {}, got {}",
662            CASS_SCHEMA_HASH,
663            stored_hash
664        ));
665    }
666    Ok(())
667}
668
669fn relative_artifact_path_string(relative_path: &Path) -> Result<String> {
670    let mut parts = Vec::new();
671    for component in relative_path.components() {
672        match component {
673            std::path::Component::Normal(part) => {
674                let part = part.to_str().ok_or_else(|| {
675                    anyhow::anyhow!(
676                        "lexical artifact path is not UTF-8: {}",
677                        relative_path.display()
678                    )
679                })?;
680                parts.push(part);
681            }
682            _ => {
683                return Err(anyhow::anyhow!(
684                    "lexical artifact path contains an unsafe component: {}",
685                    relative_path.display()
686                ));
687            }
688        }
689    }
690    if parts.is_empty() {
691        return Err(anyhow::anyhow!("lexical artifact path must not be empty"));
692    }
693    Ok(parts.join("/"))
694}
695
696fn is_evidence_bundle_writer_file(relative_path: &str) -> bool {
697    relative_path == EVIDENCE_BUNDLE_MANIFEST_FILE
698        || relative_path == EVIDENCE_BUNDLE_MANIFEST_TEMP_FILE
699}
700
701fn collect_federated_evidence_artifact_paths(
702    root: &Path,
703    current: &Path,
704    relative_paths: &mut Vec<String>,
705) -> Result<()> {
706    let entries = fs::read_dir(current)
707        .with_context(|| format!("reading artifact dir {}", current.display()))?;
708    for entry in entries {
709        let entry =
710            entry.with_context(|| format!("reading artifact entry in {}", current.display()))?;
711        let path = entry.path();
712        let file_type = entry
713            .file_type()
714            .with_context(|| format!("reading artifact file type {}", path.display()))?;
715        if file_type.is_dir() {
716            collect_federated_evidence_artifact_paths(root, &path, relative_paths)?;
717        } else if file_type.is_file() {
718            let relative_path = path.strip_prefix(root).with_context(|| {
719                format!(
720                    "computing relative artifact path for {} under {}",
721                    path.display(),
722                    root.display()
723                )
724            })?;
725            let relative_path = relative_artifact_path_string(relative_path)?;
726            if !is_evidence_bundle_writer_file(&relative_path) {
727                relative_paths.push(relative_path);
728            }
729        } else {
730            return Err(anyhow::anyhow!(
731                "lexical artifact contains unsupported non-file entry: {}",
732                path.display()
733            ));
734        }
735    }
736    Ok(())
737}
738
739fn lexical_evidence_bundle_id(chunks: &[EvidenceBundleChunk]) -> Result<String> {
740    let mut hasher = blake3::Hasher::new();
741    hasher.update(b"cass-lexical-evidence-v1\n");
742    for chunk in chunks {
743        let bytes = serde_json::to_vec(chunk).context("serializing evidence bundle chunk")?;
744        hasher.update(&bytes);
745        hasher.update(b"\n");
746    }
747    Ok(format!("cass-lexical-{}", hasher.finalize().to_hex()))
748}
749
750fn lexical_search_evidence_bundle_manifest_with_roles(
751    index_path: &Path,
752    role_for_path: fn(&str) -> EvidenceBundleChunkRole,
753) -> Result<EvidenceBundleManifest> {
754    let mut relative_paths = Vec::new();
755    collect_federated_evidence_artifact_paths(index_path, index_path, &mut relative_paths)?;
756    relative_paths.sort();
757
758    let chunks = relative_paths
759        .into_iter()
760        .map(|relative_path| {
761            let role = role_for_path(&relative_path);
762            EvidenceBundleChunk::from_file(index_path, relative_path, role, true, None)
763        })
764        .collect::<Result<Vec<_>>>()?;
765    let bundle_id = lexical_evidence_bundle_id(&chunks)?;
766    let mut evidence =
767        EvidenceBundleManifest::new(bundle_id, EvidenceBundleKind::LexicalGeneration, 0);
768    evidence.chunks = chunks;
769    Ok(evidence)
770}
771
772/// Build a deterministic evidence manifest for any cass lexical artifact.
773///
774/// Normal Tantivy directories and federated lexical bundles both use this proof
775/// surface before remote exchange. Federated bundles get their contract and
776/// shard fingerprints validated; standard indexes require the current
777/// `schema_hash.json` and a readable searchable summary.
778pub fn lexical_search_evidence_bundle_manifest(
779    index_path: &Path,
780) -> Result<EvidenceBundleManifest> {
781    if let Some(manifest) = load_federated_search_manifest_internal(index_path)? {
782        validate_federated_search_manifest(index_path, &manifest, true)?;
783        return lexical_search_evidence_bundle_manifest_with_roles(
784            index_path,
785            federated_evidence_chunk_role,
786        );
787    }
788
789    current_schema_hash_file_matches(index_path)?;
790    searchable_index_summary(index_path)?.ok_or_else(|| {
791        anyhow::anyhow!(
792            "cannot build lexical evidence bundle because no searchable index exists in {}",
793            index_path.display()
794        )
795    })?;
796    lexical_search_evidence_bundle_manifest_with_roles(
797        index_path,
798        standard_lexical_evidence_chunk_role,
799    )
800}
801
802/// Build a deterministic evidence manifest for a federated lexical bundle.
803///
804/// This is the admission proof remote artifact exchange can compare before
805/// accepting a copied bundle: the federated manifest is contract-validated,
806/// every shard `meta.json` fingerprint is checked, then every regular file in
807/// the bundle is recorded with a BLAKE3 digest. The evidence manifest itself is
808/// excluded so repeated verification does not self-mutate the proof.
809pub fn federated_search_evidence_bundle_manifest(
810    index_path: &Path,
811) -> Result<EvidenceBundleManifest> {
812    let Some(manifest) = load_federated_search_manifest_internal(index_path)? else {
813        return Err(anyhow::anyhow!(
814            "cannot build federated lexical evidence bundle without {} in {}",
815            FEDERATED_SEARCH_MANIFEST_FILE,
816            index_path.display()
817        ));
818    };
819    validate_federated_search_manifest(index_path, &manifest, true)?;
820    lexical_search_evidence_bundle_manifest_with_roles(index_path, federated_evidence_chunk_role)
821}
822
823pub fn write_federated_search_evidence_bundle_manifest(index_path: &Path) -> Result<PathBuf> {
824    let manifest = federated_search_evidence_bundle_manifest(index_path)?;
825    manifest.save(index_path)
826}
827
828pub fn write_lexical_search_evidence_bundle_manifest(index_path: &Path) -> Result<PathBuf> {
829    let manifest = lexical_search_evidence_bundle_manifest(index_path)?;
830    manifest.save(index_path)
831}
832
833fn load_federated_search_manifest_internal(
834    index_path: &Path,
835) -> Result<Option<FederatedSearchManifest>> {
836    let manifest_path = federated_search_manifest_path(index_path);
837    match fs::read(&manifest_path) {
838        Ok(bytes) => {
839            let manifest =
840                serde_json::from_slice::<FederatedSearchManifest>(&bytes).with_context(|| {
841                    format!(
842                        "parsing federated search manifest {}",
843                        manifest_path.display()
844                    )
845                })?;
846            validate_federated_search_manifest(index_path, &manifest, false).with_context(
847                || {
848                    format!(
849                        "validating federated search manifest {}",
850                        manifest_path.display()
851                    )
852                },
853            )?;
854            Ok(Some(manifest))
855        }
856        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
857        Err(err) => Err(err).with_context(|| {
858            format!(
859                "reading federated search manifest {}",
860                manifest_path.display()
861            )
862        }),
863    }
864}
865
866pub fn searchable_index_exists(index_path: &Path) -> bool {
867    index_path.join("meta.json").exists() || federated_search_manifest_path(index_path).exists()
868}
869
870pub fn validate_searchable_index_contract(index_path: &Path) -> Result<()> {
871    if let Some(manifest) = load_federated_search_manifest_internal(index_path)? {
872        validate_federated_search_manifest(index_path, &manifest, true)?;
873        for shard in manifest.shards {
874            let shard_path = index_path.join(&shard.relative_path);
875            fs_cass_open_search_reader(&shard_path, FsReloadPolicy::Manual)
876                .map_err(map_fs_err)
877                .with_context(|| {
878                    format!(
879                        "opening federated lexical shard reader {}",
880                        shard_path.display()
881                    )
882                })?;
883        }
884        return Ok(());
885    }
886
887    if !index_path.join("meta.json").exists() {
888        return Err(anyhow::anyhow!(
889            "standard lexical index metadata is missing in {}",
890            index_path.display()
891        ));
892    }
893    current_schema_hash_file_matches(index_path)?;
894    fs_cass_open_search_reader(index_path, FsReloadPolicy::Manual)
895        .map_err(map_fs_err)
896        .with_context(|| {
897            format!(
898                "opening standard lexical index reader {}",
899                index_path.display()
900            )
901        })?;
902    Ok(())
903}
904
905pub fn searchable_index_modified_time(index_path: &Path) -> Option<SystemTime> {
906    let meta_path = index_path.join("meta.json");
907    if meta_path.exists() {
908        return fs::metadata(meta_path).and_then(|m| m.modified()).ok();
909    }
910    fs::metadata(federated_search_manifest_path(index_path))
911        .and_then(|m| m.modified())
912        .ok()
913}
914
915pub fn searchable_index_fingerprint(index_path: &Path) -> Result<Option<String>> {
916    let meta_path = index_path.join("meta.json");
917    match fs::read(&meta_path) {
918        Ok(bytes) => Ok(Some(blake3::hash(&bytes).to_hex().to_string())),
919        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
920            let manifest_path = federated_search_manifest_path(index_path);
921            match fs::read(&manifest_path) {
922                Ok(bytes) => Ok(Some(blake3::hash(&bytes).to_hex().to_string())),
923                Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
924                Err(err) => Err(err).with_context(|| {
925                    format!(
926                        "reading federated search manifest {}",
927                        manifest_path.display()
928                    )
929                }),
930            }
931        }
932        Err(err) => {
933            Err(err).with_context(|| format!("reading Tantivy meta file {}", meta_path.display()))
934        }
935    }
936}
937
938pub fn searchable_index_summary(index_path: &Path) -> Result<Option<SearchableIndexSummary>> {
939    if let Some(manifest) = load_federated_search_manifest_internal(index_path)? {
940        return federated_search_manifest_summary(index_path, &manifest).map(Some);
941    }
942
943    let meta_path = index_path.join("meta.json");
944    if !meta_path.exists() {
945        return Ok(None);
946    }
947
948    if let Some(summary) = searchable_index_summary_from_tantivy_meta(index_path)? {
949        return Ok(Some(summary));
950    }
951
952    let mut index = Index::open_in_dir(index_path).with_context(|| {
953        format!(
954            "opening searchable Tantivy index directory for summary: {}",
955            index_path.display()
956        )
957    })?;
958    ensure_tokenizer(&mut index);
959    let segment_metas = index
960        .searchable_segment_metas()
961        .context("reading searchable segment metadata for Tantivy summary")?;
962    Ok(Some(SearchableIndexSummary {
963        docs: segment_metas
964            .iter()
965            .map(|segment| segment.num_docs() as usize)
966            .sum(),
967        segments: segment_metas.len(),
968    }))
969}
970
971fn searchable_index_summary_from_tantivy_meta(
972    index_path: &Path,
973) -> Result<Option<SearchableIndexSummary>> {
974    let meta_path = index_path.join("meta.json");
975    let bytes = fs::read(&meta_path)
976        .with_context(|| format!("reading Tantivy meta file {}", meta_path.display()))?;
977    let meta: serde_json::Value = serde_json::from_slice(&bytes)
978        .with_context(|| format!("parsing Tantivy meta file {}", meta_path.display()))?;
979    let Some(segments) = meta.get("segments").and_then(|value| value.as_array()) else {
980        return Ok(None);
981    };
982
983    let mut docs = 0usize;
984    for segment in segments {
985        if segment
986            .get("deletes")
987            .is_some_and(|deletes| !deletes.is_null())
988        {
989            return Ok(None);
990        }
991        let Some(max_doc) = segment.get("max_doc").and_then(|value| value.as_u64()) else {
992            return Ok(None);
993        };
994        let max_doc = usize::try_from(max_doc).with_context(|| {
995            format!(
996                "Tantivy segment max_doc exceeds platform usize in {}",
997                meta_path.display()
998            )
999        })?;
1000        docs = docs.checked_add(max_doc).with_context(|| {
1001            format!(
1002                "Tantivy segment doc count overflows platform usize in {}",
1003                meta_path.display()
1004            )
1005        })?;
1006    }
1007
1008    Ok(Some(SearchableIndexSummary {
1009        docs,
1010        segments: segments.len(),
1011    }))
1012}
1013
1014pub fn open_federated_search_readers(
1015    index_path: &Path,
1016    reload_policy: FsReloadPolicy,
1017) -> Result<Option<Vec<(IndexReader, Fields)>>> {
1018    let Some(manifest) = load_federated_search_manifest_internal(index_path)? else {
1019        return Ok(None);
1020    };
1021    validate_federated_search_manifest(index_path, &manifest, true)?;
1022
1023    let readers = manifest
1024        .shards
1025        .into_iter()
1026        .map(|shard| {
1027            let shard_path = index_path.join(&shard.relative_path);
1028            fs_cass_open_search_reader(&shard_path, reload_policy)
1029                .map_err(map_fs_err)
1030                .with_context(|| {
1031                    format!(
1032                        "opening federated lexical shard reader {}",
1033                        shard_path.display()
1034                    )
1035                })
1036        })
1037        .collect::<Result<Vec<_>>>()?;
1038    Ok(Some(readers))
1039}
1040
1041fn materialize_federated_search_bundle_for_write(index_path: &Path) -> Result<()> {
1042    let Some(manifest) = load_federated_search_manifest_internal(index_path)? else {
1043        return Ok(());
1044    };
1045    validate_federated_search_manifest(index_path, &manifest, true)?;
1046
1047    let stage_parent = index_path.parent().unwrap_or(index_path);
1048    let materialize_root = tempfile::Builder::new()
1049        .prefix("cass-federated-materialize-")
1050        .tempdir_in(stage_parent)
1051        .with_context(|| {
1052            format!(
1053                "creating staging directory to materialize federated lexical bundle {}",
1054                index_path.display()
1055            )
1056        })?;
1057    let materialized_index_path = materialize_root.path().join("index");
1058    let shard_paths = manifest
1059        .shards
1060        .iter()
1061        .map(|shard| index_path.join(&shard.relative_path))
1062        .collect::<Vec<_>>();
1063
1064    TantivyIndex::assemble_compatible_index_directory_files(&materialized_index_path, &shard_paths)
1065        .with_context(|| {
1066            format!(
1067                "materializing federated lexical bundle into mutable Tantivy index {}",
1068                index_path.display()
1069            )
1070        })?;
1071
1072    if ensure_replaceable_federated_materialization_root(index_path)? {
1073        fs::remove_dir_all(index_path).with_context(|| {
1074            format!(
1075                "removing federated lexical bundle before mutable materialization {}",
1076                index_path.display()
1077            )
1078        })?;
1079    }
1080    fs::rename(&materialized_index_path, index_path).with_context(|| {
1081        format!(
1082            "publishing materialized mutable Tantivy index {} -> {}",
1083            materialized_index_path.display(),
1084            index_path.display()
1085        )
1086    })?;
1087    materialize_root
1088        .close()
1089        .context("closing federated lexical materialization staging directory")?;
1090    Ok(())
1091}
1092
1093fn ensure_replaceable_federated_materialization_root(index_path: &Path) -> Result<bool> {
1094    match fs::symlink_metadata(index_path) {
1095        Ok(metadata) => {
1096            let file_type = metadata.file_type();
1097            if file_type.is_symlink() {
1098                return Err(anyhow::anyhow!(
1099                    "refusing to materialize federated lexical bundle through symlink: {}",
1100                    index_path.display()
1101                ));
1102            }
1103            if !file_type.is_dir() {
1104                return Err(anyhow::anyhow!(
1105                    "refusing to materialize federated lexical bundle because root is not a directory: {}",
1106                    index_path.display()
1107                ));
1108            }
1109            Ok(true)
1110        }
1111        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(false),
1112        Err(err) => Err(err).with_context(|| {
1113            format!(
1114                "checking federated lexical bundle root before materialization: {}",
1115                index_path.display()
1116            )
1117        }),
1118    }
1119}
1120
1121pub fn publish_federated_searchable_index_directories<P: AsRef<Path>>(
1122    output_path: &Path,
1123    input_paths: &[P],
1124) -> Result<SearchableIndexSummary> {
1125    if input_paths.is_empty() {
1126        return Err(anyhow::anyhow!(
1127            "cannot publish federated lexical bundle without at least one input shard"
1128        ));
1129    }
1130    let mut input_summaries = Vec::with_capacity(input_paths.len());
1131    for input_path in input_paths {
1132        let input_path = input_path.as_ref();
1133        let summary = searchable_index_summary(input_path)?.ok_or_else(|| {
1134            anyhow::anyhow!(
1135                "federated lexical publish input is not a searchable index: {}",
1136                input_path.display()
1137            )
1138        })?;
1139        input_summaries.push((input_path.to_path_buf(), summary));
1140    }
1141    publish_federated_searchable_index_directories_with_summaries(output_path, &input_summaries)
1142}
1143
1144pub fn publish_federated_searchable_index_directories_with_summaries(
1145    output_path: &Path,
1146    input_shards: &[(PathBuf, SearchableIndexSummary)],
1147) -> Result<SearchableIndexSummary> {
1148    if input_shards.is_empty() {
1149        return Err(anyhow::anyhow!(
1150            "cannot publish federated lexical bundle without at least one input shard"
1151        ));
1152    }
1153    ensure_empty_merge_output_directory(output_path)?;
1154
1155    let shard_root = output_path.join("shards");
1156    fs::create_dir_all(&shard_root).with_context(|| {
1157        format!(
1158            "creating federated lexical shard root {}",
1159            shard_root.display()
1160        )
1161    })?;
1162
1163    let mut manifest = FederatedSearchManifest {
1164        version: FEDERATED_SEARCH_MANIFEST_VERSION,
1165        kind: FEDERATED_SEARCH_MANIFEST_KIND.to_string(),
1166        schema_hash: CASS_SCHEMA_HASH.to_string(),
1167        shards: Vec::with_capacity(input_shards.len()),
1168    };
1169    let mut total_docs = 0usize;
1170    let mut total_segments = 0usize;
1171
1172    for (shard_idx, (input_path, summary)) in input_shards.iter().enumerate() {
1173        if !searchable_index_exists(input_path) {
1174            return Err(anyhow::anyhow!(
1175                "federated lexical publish input is not a searchable index: {}",
1176                input_path.display()
1177            ));
1178        }
1179        let meta_fingerprint = meta_fingerprint_for_existing_index_dir(input_path)?;
1180        let relative_path = manifest_relative_shard_path(shard_idx);
1181        let destination_path = output_path.join(&relative_path);
1182        if let Some(parent) = destination_path.parent() {
1183            fs::create_dir_all(parent).with_context(|| {
1184                format!(
1185                    "creating parent directory for federated lexical shard {}",
1186                    destination_path.display()
1187                )
1188            })?;
1189        }
1190        fs::rename(input_path, &destination_path).with_context(|| {
1191            format!(
1192                "moving staged lexical shard {} into federated publish bundle {}",
1193                input_path.display(),
1194                destination_path.display()
1195            )
1196        })?;
1197
1198        total_docs = total_docs.checked_add(summary.docs).with_context(|| {
1199            format!(
1200                "federated lexical publish doc count overflows platform usize for {}",
1201                output_path.display()
1202            )
1203        })?;
1204        total_segments = total_segments
1205            .checked_add(summary.segments)
1206            .with_context(|| {
1207                format!(
1208                    "federated lexical publish segment count overflows platform usize for {}",
1209                    output_path.display()
1210                )
1211            })?;
1212        manifest.shards.push(FederatedSearchShardManifest {
1213            relative_path,
1214            docs: summary.docs,
1215            segments: summary.segments,
1216            meta_fingerprint,
1217        });
1218    }
1219
1220    let manifest_bytes =
1221        serde_json::to_vec_pretty(&manifest).context("serializing federated search manifest")?;
1222    fs::write(federated_search_manifest_path(output_path), &manifest_bytes).with_context(|| {
1223        format!(
1224            "writing federated search manifest {}",
1225            federated_search_manifest_path(output_path).display()
1226        )
1227    })?;
1228    write_root_schema_hash_file(output_path)?;
1229
1230    Ok(SearchableIndexSummary {
1231        docs: total_docs,
1232        segments: total_segments,
1233    })
1234}
1235
/// Handle to the cass lexical index that the methods on this type operate on.
///
/// Wraps a `FsCassTantivyIndex` and exposes the field handles it reports.
pub struct TantivyIndex {
    // Underlying index that every method on this type delegates to.
    inner: FsCassTantivyIndex,
    /// Field handles obtained from `inner.fields()` when the index is opened.
    pub fields: Fields,
}
1240
1241impl TantivyIndex {
1242    pub fn open_or_create(path: &Path) -> Result<Self> {
1243        materialize_federated_search_bundle_for_write(path)?;
1244        let inner = FsCassTantivyIndex::open_or_create_with_writer_parallelism(
1245            path,
1246            tantivy_writer_parallelism_hint(),
1247        )
1248        .map_err(map_fs_err)?;
1249        let fields = inner.fields();
1250        Ok(Self { inner, fields })
1251    }
1252
1253    pub fn open_or_create_with_writer_parallelism(
1254        path: &Path,
1255        writer_parallelism: usize,
1256    ) -> Result<Self> {
1257        materialize_federated_search_bundle_for_write(path)?;
1258        let inner = FsCassTantivyIndex::open_or_create_with_writer_parallelism(
1259            path,
1260            writer_parallelism.max(1),
1261        )
1262        .map_err(map_fs_err)?;
1263        let fields = inner.fields();
1264        Ok(Self { inner, fields })
1265    }
1266
1267    pub fn add_conversation(&mut self, conv: &NormalizedConversation) -> Result<()> {
1268        // ibuuh.32 migration: route the in-tree convenience entrypoint
1269        // through the packet-driven pipeline so the lexical sink stops
1270        // re-deriving doc context from the raw NormalizedConversation
1271        // separately. The legacy `add_messages_with_conversation_id`
1272        // path remains for indexer/mod.rs callers until that file's
1273        // exclusive lock is released and they can migrate too.
1274        let provenance = ConversationPacketProvenance::local();
1275        let packet = ConversationPacket::from_normalized_conversation(conv, provenance);
1276        self.add_messages_from_packet(&packet, None, None, |_| Ok(()))
1277    }
1278
1279    pub fn add_conversation_with_id(
1280        &mut self,
1281        conv: &NormalizedConversation,
1282        conversation_id: Option<i64>,
1283    ) -> Result<()> {
1284        let provenance = ConversationPacketProvenance::local();
1285        let mut packet = ConversationPacket::from_normalized_conversation(conv, provenance);
1286        // Stamp the canonical id onto the packet identity so the lexical
1287        // doc carries the same conversation_id the legacy path emitted.
1288        packet.payload.identity.conversation_id = conversation_id;
1289        self.add_messages_from_packet(&packet, None, conversation_id, |_| Ok(()))
1290    }
1291
1292    pub fn delete_all(&mut self) -> Result<()> {
1293        self.inner.delete_all().map_err(map_fs_err)
1294    }
1295
1296    pub fn commit(&mut self) -> Result<()> {
1297        self.inner.commit().map_err(map_fs_err)
1298    }
1299
1300    pub fn configure_bulk_load_merge_policy(&mut self) {
1301        self.inner.configure_bulk_load_merge_policy();
1302    }
1303
1304    pub fn reader(&self) -> Result<IndexReader> {
1305        self.inner.reader().map_err(map_fs_err)
1306    }
1307
1308    pub fn segment_count(&self) -> usize {
1309        self.inner.segment_count()
1310    }
1311
1312    pub fn merge_status(&self) -> MergeStatus {
1313        self.inner.merge_status()
1314    }
1315
1316    /// Attempt to merge segments if idle conditions are met.
1317    /// Returns Ok(true) if merge was triggered, Ok(false) if skipped.
1318    pub fn optimize_if_idle(&mut self) -> Result<bool> {
1319        self.inner.optimize_if_idle().map_err(map_fs_err)
1320    }
1321
1322    /// Force immediate segment merge and wait for completion.
1323    /// Use sparingly - blocks until merge finishes.
1324    pub fn force_merge(&mut self) -> Result<()> {
1325        self.inner.force_merge().map_err(map_fs_err)
1326    }
1327
1328    pub fn merge_compatible_index_directories<P: AsRef<Path>>(
1329        output_path: &Path,
1330        input_paths: &[P],
1331    ) -> Result<Self> {
1332        if input_paths.is_empty() {
1333            return Err(anyhow::anyhow!(
1334                "cannot merge Tantivy index directories without at least one input"
1335            ));
1336        }
1337        ensure_empty_merge_output_directory(output_path)?;
1338
1339        let indices = input_paths
1340            .iter()
1341            .map(|input_path| {
1342                let input_path = input_path.as_ref();
1343                let mut index = Index::open_in_dir(input_path).with_context(|| {
1344                    format!(
1345                        "opening compatible Tantivy index directory for merge: {}",
1346                        input_path.display()
1347                    )
1348                })?;
1349                ensure_tokenizer(&mut index);
1350                Ok(index)
1351            })
1352            .collect::<Result<Vec<_>>>()?;
1353        let output_directory = tantivy_crate::directory::MmapDirectory::open(output_path)
1354            .with_context(|| {
1355                format!(
1356                    "opening Tantivy output directory for merged index: {}",
1357                    output_path.display()
1358                )
1359            })?;
1360        let mut merged = tantivy_crate::indexer::merge_indices(&indices, output_directory)
1361            .with_context(|| {
1362                format!(
1363                    "merging {} compatible Tantivy index directories into {}",
1364                    indices.len(),
1365                    output_path.display()
1366                )
1367            })?;
1368        ensure_tokenizer(&mut merged);
1369        fs::write(
1370            output_path.join("schema_hash.json"),
1371            format!("{{\"schema_hash\":\"{CASS_SCHEMA_HASH}\"}}"),
1372        )
1373        .with_context(|| {
1374            format!(
1375                "writing cass schema hash metadata for merged Tantivy index {}",
1376                output_path.display()
1377            )
1378        })?;
1379        drop(merged);
1380        Self::open_or_create(output_path)
1381    }
1382
1383    pub fn assemble_compatible_index_directories<P: AsRef<Path>>(
1384        output_path: &Path,
1385        input_paths: &[P],
1386    ) -> Result<Self> {
1387        Self::assemble_compatible_index_directory_files(output_path, input_paths)?;
1388        Self::open_or_create(output_path)
1389    }
1390
1391    fn assemble_compatible_index_directory_files<P: AsRef<Path>>(
1392        output_path: &Path,
1393        input_paths: &[P],
1394    ) -> Result<()> {
1395        if input_paths.is_empty() {
1396            return Err(anyhow::anyhow!(
1397                "cannot assemble Tantivy index directories without at least one input"
1398            ));
1399        }
1400        ensure_empty_merge_output_directory(output_path)?;
1401
1402        let mut combined_index_meta: Option<tantivy_crate::IndexMeta> = None;
1403        let mut combined_segments = Vec::new();
1404        let mut max_opstamp = 0u64;
1405        let mut managed_paths = BTreeSet::new();
1406
1407        for input_path in input_paths {
1408            let input_path = input_path.as_ref();
1409            let mut index = Index::open_in_dir(input_path).with_context(|| {
1410                format!(
1411                    "opening compatible Tantivy index directory for assembly: {}",
1412                    input_path.display()
1413                )
1414            })?;
1415            ensure_tokenizer(&mut index);
1416            let metas = index.load_metas().with_context(|| {
1417                format!(
1418                    "loading Tantivy metadata for assembled index input {}",
1419                    input_path.display()
1420                )
1421            })?;
1422
1423            match &mut combined_index_meta {
1424                Some(combined_meta) => {
1425                    if metas.schema != combined_meta.schema {
1426                        return Err(anyhow::anyhow!(
1427                            "attempted to assemble Tantivy index directories with different schemas"
1428                        ));
1429                    }
1430                    if metas.index_settings != combined_meta.index_settings {
1431                        return Err(anyhow::anyhow!(
1432                            "attempted to assemble Tantivy index directories with different index settings"
1433                        ));
1434                    }
1435                }
1436                None => {
1437                    combined_index_meta = Some(tantivy_crate::IndexMeta {
1438                        index_settings: metas.index_settings.clone(),
1439                        segments: Vec::new(),
1440                        schema: metas.schema.clone(),
1441                        opstamp: 0,
1442                        payload: None,
1443                    });
1444                }
1445            }
1446
1447            max_opstamp = max_opstamp.max(metas.opstamp);
1448            for segment in metas.segments {
1449                for relative_path in segment.list_files() {
1450                    let source_path = input_path.join(&relative_path);
1451                    if !source_path.exists() {
1452                        continue;
1453                    }
1454                    link_or_copy_searchable_index_file(&source_path, output_path, &relative_path)?;
1455                    if !managed_paths.insert(relative_path.clone()) {
1456                        return Err(anyhow::anyhow!(
1457                            "assembled Tantivy index would contain duplicate segment file path {}",
1458                            relative_path.display()
1459                        ));
1460                    }
1461                }
1462                combined_segments.push(segment);
1463            }
1464        }
1465
1466        let mut combined_index_meta = combined_index_meta.ok_or_else(|| {
1467            anyhow::anyhow!("cannot assemble Tantivy index directories without index metadata")
1468        })?;
1469        combined_index_meta.segments = combined_segments;
1470        combined_index_meta.opstamp = max_opstamp;
1471        combined_index_meta.payload = Some(format!(
1472            "Cass assembled {} compatible Tantivy segments from {} input directories",
1473            combined_index_meta.segments.len(),
1474            input_paths.len()
1475        ));
1476
1477        write_searchable_generation_metadata(
1478            output_path,
1479            &combined_index_meta,
1480            &mut managed_paths,
1481        )?;
1482        Ok(())
1483    }
1484
1485    pub fn add_messages(
1486        &mut self,
1487        conv: &NormalizedConversation,
1488        messages: &[NormalizedMessage],
1489    ) -> Result<()> {
1490        self.add_messages_with_conversation_id(conv, messages, None)
1491    }
1492
1493    pub fn add_messages_with_conversation_id(
1494        &mut self,
1495        conv: &NormalizedConversation,
1496        messages: &[NormalizedMessage],
1497        conversation_id: Option<i64>,
1498    ) -> Result<()> {
1499        self.add_messages_with_conversation_id_and_batch_hook(
1500            conv,
1501            messages,
1502            conversation_id,
1503            |_| Ok(()),
1504        )
1505    }
1506
1507    pub fn add_messages_with_conversation_id_and_batch_hook<F>(
1508        &mut self,
1509        conv: &NormalizedConversation,
1510        messages: &[NormalizedMessage],
1511        conversation_id: Option<i64>,
1512        mut on_batch_flushed: F,
1513    ) -> Result<()>
1514    where
1515        F: FnMut(usize) -> Result<()>,
1516    {
1517        let context = cass_doc_context(conv, conversation_id);
1518        let max_messages = tantivy_add_batch_max_messages();
1519        let max_chars = tantivy_add_batch_max_chars();
1520        let mut docs: Vec<FsCassDocument> = Vec::new();
1521        let mut pending_chars = 0usize;
1522
1523        for msg in messages {
1524            let Some(doc) = cass_document_for_message(&context, msg) else {
1525                continue;
1526            };
1527            push_cass_document_into_pending(&mut docs, &mut pending_chars, doc);
1528            if docs.len() >= max_messages || pending_chars >= max_chars {
1529                let flushed_docs = docs.len();
1530                self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
1531                on_batch_flushed(flushed_docs)?;
1532                docs.clear();
1533                pending_chars = 0;
1534            }
1535        }
1536
1537        if docs.is_empty() {
1538            Ok(())
1539        } else {
1540            let flushed_docs = docs.len();
1541            self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
1542            on_batch_flushed(flushed_docs)
1543        }
1544    }
1545
1546    /// Packet-driven counterpart to
1547    /// [`Self::add_messages_with_conversation_id_and_batch_hook`].
1548    ///
1549    /// This is the entrypoint the ibuuh.32 migration uses to feed the
1550    /// lexical sink straight from a normalized [`ConversationPacket`].
1551    /// Callers that already hold a packet (e.g. the rebuild pipeline,
1552    /// or the in-tree convenience entrypoints `add_conversation` and
1553    /// `add_conversation_with_id`) avoid the second normalization pass
1554    /// the legacy `cass_doc_context` path performed against the raw
1555    /// `NormalizedConversation`.
1556    ///
1557    /// `message_indices` lets incremental callers project a subset of
1558    /// the packet's messages (e.g. only newly inserted indices) without
1559    /// rebuilding the packet — when `None`, every message is emitted.
1560    /// `conversation_id_override` lets callers stamp a canonical id
1561    /// without mutating the packet identity in place.
1562    pub fn add_messages_from_packet<F>(
1563        &mut self,
1564        packet: &ConversationPacket,
1565        message_indices: Option<&[usize]>,
1566        conversation_id_override: Option<i64>,
1567        mut on_batch_flushed: F,
1568    ) -> Result<()>
1569    where
1570        F: FnMut(usize) -> Result<()>,
1571    {
1572        let mut context = cass_doc_context_from_packet(packet);
1573        if let Some(id) = conversation_id_override {
1574            context.conversation_id = Some(id);
1575        }
1576
1577        let max_messages = tantivy_add_batch_max_messages();
1578        let max_chars = tantivy_add_batch_max_chars();
1579        let mut docs: Vec<FsCassDocument> = Vec::new();
1580        let mut pending_chars = 0usize;
1581
1582        let messages = &packet.payload.messages;
1583        let total = messages.len();
1584        if let Some(indices) = message_indices {
1585            for &i in indices {
1586                let Some(msg) = messages.get(i) else {
1587                    anyhow::bail!(
1588                        "packet message index {} out of range for packet with {} messages",
1589                        i,
1590                        total
1591                    );
1592                };
1593                push_packet_message_into_pending(
1594                    &mut self.inner,
1595                    &context,
1596                    msg,
1597                    &mut docs,
1598                    &mut pending_chars,
1599                    max_messages,
1600                    max_chars,
1601                    &mut on_batch_flushed,
1602                )?;
1603            }
1604        } else {
1605            for msg in messages {
1606                push_packet_message_into_pending(
1607                    &mut self.inner,
1608                    &context,
1609                    msg,
1610                    &mut docs,
1611                    &mut pending_chars,
1612                    max_messages,
1613                    max_chars,
1614                    &mut on_batch_flushed,
1615                )?;
1616            }
1617        }
1618
1619        if docs.is_empty() {
1620            Ok(())
1621        } else {
1622            let flushed_docs = docs.len();
1623            self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
1624            on_batch_flushed(flushed_docs)
1625        }
1626    }
1627
1628    pub fn add_prebuilt_documents_slice(&mut self, documents: &[FsCassDocument]) -> Result<usize> {
1629        let max_messages = tantivy_prebuilt_add_batch_max_messages();
1630        let max_chars = tantivy_add_batch_max_chars();
1631        let mut indexed_docs = 0usize;
1632        let mut batch_start = 0usize;
1633        let mut pending_chars = 0usize;
1634
1635        for (idx, doc) in documents.iter().enumerate() {
1636            pending_chars = pending_chars.saturating_add(doc.content.len());
1637            let batch_len = idx + 1 - batch_start;
1638            if batch_len >= max_messages || pending_chars >= max_chars {
1639                let batch_end = idx + 1;
1640                indexed_docs = indexed_docs.saturating_add(batch_end - batch_start);
1641                let Some(batch) = documents.get(batch_start..batch_end) else {
1642                    anyhow::bail!(
1643                        "invalid Tantivy prebuilt document batch range {}..{} for {} documents",
1644                        batch_start,
1645                        batch_end,
1646                        documents.len()
1647                    );
1648                };
1649                self.inner.add_cass_documents(batch).map_err(map_fs_err)?;
1650                batch_start = batch_end;
1651                pending_chars = 0;
1652            }
1653        }
1654
1655        if batch_start < documents.len() {
1656            indexed_docs = indexed_docs.saturating_add(documents.len() - batch_start);
1657            let Some(batch) = documents.get(batch_start..) else {
1658                anyhow::bail!(
1659                    "invalid Tantivy prebuilt document tail range {}.. for {} documents",
1660                    batch_start,
1661                    documents.len()
1662                );
1663            };
1664            self.inner.add_cass_documents(batch).map_err(map_fs_err)?;
1665        }
1666
1667        Ok(indexed_docs)
1668    }
1669
1670    pub fn add_prebuilt_document_refs_slice<'a>(
1671        &mut self,
1672        documents: &[FsCassDocumentRef<'a>],
1673    ) -> Result<usize> {
1674        let max_messages = tantivy_prebuilt_add_batch_max_messages();
1675        let max_chars = tantivy_add_batch_max_chars();
1676        let mut indexed_docs = 0usize;
1677        let mut batch_start = 0usize;
1678        let mut pending_chars = 0usize;
1679
1680        for (idx, doc) in documents.iter().enumerate() {
1681            pending_chars = pending_chars.saturating_add(doc.content.len());
1682            let batch_len = idx + 1 - batch_start;
1683            if batch_len >= max_messages || pending_chars >= max_chars {
1684                let batch_end = idx + 1;
1685                indexed_docs = indexed_docs.saturating_add(batch_end - batch_start);
1686                let Some(batch) = documents.get(batch_start..batch_end) else {
1687                    anyhow::bail!(
1688                        "invalid Tantivy prebuilt document ref batch range {}..{} for {} documents",
1689                        batch_start,
1690                        batch_end,
1691                        documents.len()
1692                    );
1693                };
1694                self.inner
1695                    .add_cass_document_refs(batch)
1696                    .map_err(map_fs_err)?;
1697                batch_start = batch_end;
1698                pending_chars = 0;
1699            }
1700        }
1701
1702        if batch_start < documents.len() {
1703            indexed_docs = indexed_docs.saturating_add(documents.len() - batch_start);
1704            let Some(batch) = documents.get(batch_start..) else {
1705                anyhow::bail!(
1706                    "invalid Tantivy prebuilt document ref tail range {}.. for {} documents",
1707                    batch_start,
1708                    documents.len()
1709                );
1710            };
1711            self.inner
1712                .add_cass_document_refs(batch)
1713                .map_err(map_fs_err)?;
1714        }
1715
1716        Ok(indexed_docs)
1717    }
1718
1719    pub fn add_prebuilt_documents<I>(&mut self, documents: I) -> Result<usize>
1720    where
1721        I: IntoIterator<Item = FsCassDocument>,
1722    {
1723        let max_messages = tantivy_prebuilt_add_batch_max_messages();
1724        let max_chars = tantivy_add_batch_max_chars();
1725        let mut docs = Vec::new();
1726        let mut pending_chars = 0usize;
1727        let mut indexed_docs = 0usize;
1728
1729        for doc in documents {
1730            pending_chars = pending_chars.saturating_add(doc.content.len());
1731            docs.push(doc);
1732            if docs.len() >= max_messages || pending_chars >= max_chars {
1733                indexed_docs = indexed_docs.saturating_add(docs.len());
1734                self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
1735                docs.clear();
1736                pending_chars = 0;
1737            }
1738        }
1739
1740        if !docs.is_empty() {
1741            indexed_docs = indexed_docs.saturating_add(docs.len());
1742            self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
1743        }
1744
1745        Ok(indexed_docs)
1746    }
1747
1748    pub fn add_conversations_with_ids<'a, I>(&mut self, conversations: I) -> Result<usize>
1749    where
1750        I: IntoIterator<Item = (&'a NormalizedConversation, Option<i64>)>,
1751    {
1752        let max_messages = tantivy_add_batch_max_messages();
1753        let max_chars = tantivy_add_batch_max_chars();
1754        let mut docs: Vec<FsCassDocument> = Vec::new();
1755        let mut pending_chars = 0usize;
1756        let mut indexed_docs = 0usize;
1757
1758        for (conv, conversation_id) in conversations {
1759            let context = cass_doc_context(conv, conversation_id);
1760            for msg in &conv.messages {
1761                let Some(doc) = cass_document_for_message(&context, msg) else {
1762                    continue;
1763                };
1764                push_cass_document_into_pending(&mut docs, &mut pending_chars, doc);
1765                if docs.len() >= max_messages || pending_chars >= max_chars {
1766                    indexed_docs = indexed_docs.saturating_add(docs.len());
1767                    self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
1768                    docs.clear();
1769                    pending_chars = 0;
1770                }
1771            }
1772        }
1773
1774        if !docs.is_empty() {
1775            indexed_docs = indexed_docs.saturating_add(docs.len());
1776            self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
1777        }
1778
1779        Ok(indexed_docs)
1780    }
1781}
1782
1783pub fn build_schema() -> Schema {
1784    fs_build_schema()
1785}
1786
1787pub fn fields_from_schema(schema: &Schema) -> Result<Fields> {
1788    fs_fields_from_schema(schema).map_err(map_fs_err)
1789}
1790
1791pub fn expected_index_dir(base: &Path) -> std::path::PathBuf {
1792    base.join("index").join(CASS_SCHEMA_VERSION)
1793}
1794
1795pub fn index_dir(base: &Path) -> Result<std::path::PathBuf> {
1796    fs_index_dir(base).map_err(map_fs_err)
1797}
1798
1799pub fn ensure_tokenizer(index: &mut Index) {
1800    fs_ensure_tokenizer(index);
1801}
1802
1803fn ensure_empty_merge_output_directory(output_path: &Path) -> Result<()> {
1804    match fs::metadata(output_path) {
1805        Ok(metadata) => {
1806            if !metadata.is_dir() {
1807                return Err(anyhow::anyhow!(
1808                    "merged Tantivy output path is not a directory: {}",
1809                    output_path.display()
1810                ));
1811            }
1812            let mut entries = fs::read_dir(output_path).with_context(|| {
1813                format!(
1814                    "reading merged Tantivy output directory before merge: {}",
1815                    output_path.display()
1816                )
1817            })?;
1818            if entries.next().transpose()?.is_some() {
1819                return Err(anyhow::anyhow!(
1820                    "merged Tantivy output directory must be empty before merge: {}",
1821                    output_path.display()
1822                ));
1823            }
1824        }
1825        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
1826            fs::create_dir_all(output_path).with_context(|| {
1827                format!(
1828                    "creating merged Tantivy output directory before merge: {}",
1829                    output_path.display()
1830                )
1831            })?;
1832        }
1833        Err(err) => {
1834            return Err(err).with_context(|| {
1835                format!(
1836                    "checking merged Tantivy output directory before merge: {}",
1837                    output_path.display()
1838                )
1839            });
1840        }
1841    }
1842    Ok(())
1843}
1844
1845fn link_or_copy_searchable_index_file(
1846    source_path: &Path,
1847    output_path: &Path,
1848    relative_path: &Path,
1849) -> Result<()> {
1850    let destination_path = output_path.join(relative_path);
1851    if destination_path.exists() {
1852        return Err(anyhow::anyhow!(
1853            "assembled Tantivy output path already exists: {}",
1854            destination_path.display()
1855        ));
1856    }
1857
1858    match fs::hard_link(source_path, &destination_path) {
1859        Ok(()) => Ok(()),
1860        Err(err)
1861            if matches!(
1862                err.kind(),
1863                std::io::ErrorKind::PermissionDenied
1864                    | std::io::ErrorKind::CrossesDevices
1865                    | std::io::ErrorKind::Unsupported
1866            ) =>
1867        {
1868            fs::copy(source_path, &destination_path).with_context(|| {
1869                format!(
1870                    "copying Tantivy segment file into assembled generation {} -> {}",
1871                    source_path.display(),
1872                    destination_path.display()
1873                )
1874            })?;
1875            Ok(())
1876        }
1877        Err(err) => Err(err).with_context(|| {
1878            format!(
1879                "hard-linking Tantivy segment file into assembled generation {} -> {}",
1880                source_path.display(),
1881                destination_path.display()
1882            )
1883        }),
1884    }
1885}
1886
1887fn write_searchable_generation_metadata(
1888    output_path: &Path,
1889    index_meta: &tantivy_crate::IndexMeta,
1890    managed_paths: &mut BTreeSet<std::path::PathBuf>,
1891) -> Result<()> {
1892    let meta_path = output_path.join("meta.json");
1893    fs::write(
1894        &meta_path,
1895        serde_json::to_vec_pretty(index_meta).context("serializing assembled Tantivy meta.json")?,
1896    )
1897    .with_context(|| {
1898        format!(
1899            "writing assembled Tantivy meta.json for {}",
1900            output_path.display()
1901        )
1902    })?;
1903    managed_paths.insert(std::path::PathBuf::from("meta.json"));
1904    fs::write(
1905        output_path.join(".managed.json"),
1906        serde_json::to_vec(managed_paths).context("serializing assembled Tantivy managed paths")?,
1907    )
1908    .with_context(|| {
1909        format!(
1910            "writing assembled Tantivy managed file manifest for {}",
1911            output_path.display()
1912        )
1913    })?;
1914    fs::write(
1915        output_path.join("schema_hash.json"),
1916        format!("{{\"schema_hash\":\"{CASS_SCHEMA_HASH}\"}}"),
1917    )
1918    .with_context(|| {
1919        format!(
1920            "writing cass schema hash metadata for assembled Tantivy index {}",
1921            output_path.display()
1922        )
1923    })?;
1924    Ok(())
1925}
1926
1927#[cfg(test)]
1928mod tests {
1929    use super::*;
1930    use crate::connectors::{NormalizedConversation, NormalizedMessage};
1931    use serde_json::Value;
1932    use std::path::PathBuf;
1933    use tempfile::TempDir;
1934
1935    #[test]
1936    fn open_or_create_roundtrip() {
1937        let dir = TempDir::new().expect("temp dir");
1938        let idx = TantivyIndex::open_or_create(dir.path()).expect("create index");
1939        let reader = idx.reader().expect("reader");
1940        let searcher = reader.searcher();
1941        assert_eq!(searcher.num_docs(), 0);
1942    }
1943
1944    #[test]
1945    fn schema_hash_matches_current_hash() {
1946        assert!(schema_hash_matches(SCHEMA_HASH));
1947        assert!(!schema_hash_matches("invalid"));
1948    }
1949
1950    #[test]
1951    fn default_tantivy_writer_cap_scales_with_memory() -> Result<(), &'static str> {
1952        const GIB: u64 = 1024 * 1024 * 1024;
1953
1954        require_default_tantivy_writer_cap(None, 26, "unknown memory uses ceiling")?;
1955        require_default_tantivy_writer_cap(Some(8 * GIB), 1, "8 GiB caps at one thread")?;
1956        require_default_tantivy_writer_cap(Some(24 * GIB), 2, "24 GiB caps at two threads")?;
1957        require_default_tantivy_writer_cap(Some(64 * GIB), 6, "64 GiB allows six threads")?;
1958        require_default_tantivy_writer_cap(Some(512 * GIB), 26, "512 GiB reaches ceiling")?;
1959        Ok(())
1960    }
1961
1962    #[test]
1963    fn default_tantivy_writer_cap_accounts_for_actual_concurrent_writers() {
1964        const GIB: u64 = 1024 * 1024 * 1024;
1965
1966        assert_eq!(
1967            default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
1968                Some(24 * GIB),
1969                8,
1970            ),
1971            2
1972        );
1973        assert_eq!(
1974            default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
1975                Some(24 * GIB),
1976                2,
1977            ),
1978            9
1979        );
1980        assert_eq!(
1981            default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
1982                Some(24 * GIB),
1983                0,
1984            ),
1985            19
1986        );
1987    }
1988
1989    fn require_default_tantivy_writer_cap(
1990        memory_bytes: Option<u64>,
1991        expected: usize,
1992        label: &'static str,
1993    ) -> Result<(), &'static str> {
1994        if default_tantivy_max_writer_threads_for_memory_bytes(memory_bytes) == expected {
1995            return Ok(());
1996        }
1997        Err(label)
1998    }
1999
2000    #[test]
2001    fn generate_edge_ngrams_prefixes() {
2002        let out = frankensearch::lexical::cass_generate_edge_ngrams("hello world");
2003        assert!(out.contains("he"));
2004        assert!(out.contains("world"));
2005    }
2006
2007    #[test]
2008    fn build_preview_truncates_with_ellipsis() {
2009        let preview = frankensearch::lexical::cass_build_preview("abcdefghijklmnopqrstuvwxyz", 10);
2010        assert_eq!(preview, "abcdefghij…");
2011    }
2012
2013    #[test]
2014    fn merge_status_api_is_exposed() {
2015        let dir = TempDir::new().expect("temp dir");
2016        let index = TantivyIndex::open_or_create(dir.path()).expect("create");
2017        let status = index.merge_status();
2018        assert_eq!(status.merge_threshold, 4);
2019    }
2020
2021    #[test]
2022    fn searchable_index_summary_uses_meta_file_without_opening_index() {
2023        let dir = TempDir::new().expect("temp dir");
2024        fs::write(
2025            dir.path().join("meta.json"),
2026            serde_json::to_vec(&serde_json::json!({
2027                "segments": [
2028                    {"segment_id": "a", "max_doc": 3, "deletes": null},
2029                    {"segment_id": "b", "max_doc": 5}
2030                ]
2031            }))
2032            .expect("serialize meta"),
2033        )
2034        .expect("write meta");
2035
2036        assert_eq!(
2037            searchable_index_summary(dir.path())
2038                .expect("summary")
2039                .expect("index exists"),
2040            SearchableIndexSummary {
2041                docs: 8,
2042                segments: 2
2043            }
2044        );
2045    }
2046
2047    #[test]
2048    fn searchable_index_summary_meta_fast_path_declines_deleted_segments() {
2049        let dir = TempDir::new().expect("temp dir");
2050        fs::write(
2051            dir.path().join("meta.json"),
2052            serde_json::to_vec(&serde_json::json!({
2053                "segments": [
2054                    {"segment_id": "a", "max_doc": 3, "deletes": {"opstamp": 1}}
2055                ]
2056            }))
2057            .expect("serialize meta"),
2058        )
2059        .expect("write meta");
2060
2061        assert_eq!(
2062            searchable_index_summary_from_tantivy_meta(dir.path()).expect("summary"),
2063            None
2064        );
2065    }
2066
2067    #[test]
2068    fn merge_status_should_merge_logic() {
2069        let status = MergeStatus {
2070            segment_count: 5,
2071            last_merge_ts: 0,
2072            ms_since_last_merge: -1,
2073            merge_threshold: 4,
2074            cooldown_ms: 300_000,
2075        };
2076        assert!(status.should_merge());
2077    }
2078
2079    #[test]
2080    fn index_dir_creates_versioned_path() {
2081        let dir = TempDir::new().expect("temp dir");
2082        let result = index_dir(dir.path()).expect("index dir");
2083        // frankensearch CASS_SCHEMA_VERSION bumped v7 -> v8 with the tantivy 0.26.1
2084        // upgrade (rev 2cad158f / frankensearch 0.3.2).
2085        assert!(result.ends_with("index/v8"));
2086    }
2087
2088    #[test]
2089    fn tokenizer_registration_is_callable() {
2090        let dir = TempDir::new().expect("temp dir");
2091        let mut idx = Index::create_in_ram(build_schema());
2092        ensure_tokenizer(&mut idx);
2093        let _ = TantivyIndex::open_or_create(dir.path()).expect("open or create");
2094    }
2095
2096    #[test]
2097    fn add_messages_batches_large_payloads_without_dropping_docs() {
2098        let dir = TempDir::new().expect("temp dir");
2099        let mut idx = TantivyIndex::open_or_create(dir.path()).expect("create index");
2100        let content = "x".repeat(4096);
2101        let messages: Vec<_> = (0..1_200)
2102            .map(|i| NormalizedMessage {
2103                idx: i,
2104                role: "assistant".to_string(),
2105                author: None,
2106                created_at: Some(1_700_000_000_000 + i),
2107                content: format!("{i}-{content}"),
2108                extra: Value::Null,
2109                snippets: Vec::new(),
2110                invocations: Vec::new(),
2111            })
2112            .collect();
2113        let conv = NormalizedConversation {
2114            agent_slug: "codex".to_string(),
2115            external_id: Some("large-batch".to_string()),
2116            title: Some("Large Batch".to_string()),
2117            workspace: Some(PathBuf::from("/tmp/workspace")),
2118            source_path: PathBuf::from("/tmp/rollout.jsonl"),
2119            started_at: Some(1_700_000_000_000),
2120            ended_at: Some(1_700_000_000_999),
2121            metadata: Value::Null,
2122            messages,
2123        };
2124
2125        idx.add_messages(&conv, &conv.messages)
2126            .expect("add messages");
2127        idx.commit().expect("commit");
2128
2129        let reader = idx.reader().expect("reader");
2130        reader.reload().expect("reload");
2131        let searcher = reader.searcher();
2132        assert_eq!(searcher.num_docs(), conv.messages.len() as u64);
2133    }
2134
2135    #[test]
2136    fn add_conversations_with_ids_streams_large_payloads_without_dropping_docs() {
2137        let dir = TempDir::new().expect("temp dir");
2138        let mut idx = TantivyIndex::open_or_create(dir.path()).expect("create index");
2139        let content = "y".repeat(2048);
2140        let conversations: Vec<_> = (0..24)
2141            .map(|conv_idx| {
2142                let messages = (0..256)
2143                    .map(|msg_idx| NormalizedMessage {
2144                        idx: msg_idx,
2145                        role: "assistant".to_string(),
2146                        author: None,
2147                        created_at: Some(1_700_000_000_000 + (conv_idx * 1_000 + msg_idx)),
2148                        content: format!("conv-{conv_idx}-msg-{msg_idx}-{content}"),
2149                        extra: Value::Null,
2150                        snippets: Vec::new(),
2151                        invocations: Vec::new(),
2152                    })
2153                    .collect();
2154                NormalizedConversation {
2155                    agent_slug: "codex".to_string(),
2156                    external_id: Some(format!("conv-{conv_idx}")),
2157                    title: Some(format!("Conversation {conv_idx}")),
2158                    workspace: Some(PathBuf::from("/tmp/workspace")),
2159                    source_path: PathBuf::from(format!("/tmp/rollout-{conv_idx}.jsonl")),
2160                    started_at: Some(1_700_000_000_000 + conv_idx),
2161                    ended_at: Some(1_700_000_000_999 + conv_idx),
2162                    metadata: Value::Null,
2163                    messages,
2164                }
2165            })
2166            .collect();
2167        let expected_docs: usize = conversations.iter().map(|conv| conv.messages.len()).sum();
2168
2169        let indexed_docs = idx
2170            .add_conversations_with_ids(conversations.iter().map(|conv| (conv, Some(42))))
2171            .expect("add conversations");
2172        assert_eq!(indexed_docs, expected_docs);
2173        idx.commit().expect("commit");
2174
2175        let reader = idx.reader().expect("reader");
2176        reader.reload().expect("reload");
2177        let searcher = reader.searcher();
2178        assert_eq!(searcher.num_docs(), expected_docs as u64);
2179    }
2180
2181    #[test]
2182    fn normalized_index_source_id_infers_remote_from_origin_host_without_kind() {
2183        let source_id = normalized_index_source_id(Some("   "), None, Some("dev@laptop"));
2184        assert_eq!(source_id, "dev@laptop");
2185        assert_eq!(normalized_index_origin_kind(&source_id, None), "remote");
2186    }
2187
2188    #[test]
2189    fn add_prebuilt_documents_streams_large_payloads_without_dropping_docs() {
2190        let dir = TempDir::new().expect("temp dir");
2191        let mut idx = TantivyIndex::open_or_create(dir.path()).expect("create index");
2192        let content = "z".repeat(2048);
2193        let docs: Vec<_> = (0..6_144)
2194            .map(|msg_idx| FsCassDocument {
2195                agent: "codex".to_string(),
2196                workspace: Some("/tmp/workspace".to_string()),
2197                workspace_original: None,
2198                source_path: "/tmp/prebuilt-rollout.jsonl".to_string(),
2199                msg_idx: msg_idx as u64,
2200                created_at: Some(1_700_000_000_000 + msg_idx as i64),
2201                title: Some("Prebuilt Batch".to_string()),
2202                content: format!("prebuilt-msg-{msg_idx}-{content}"),
2203                conversation_id: Some(7),
2204                source_id: crate::sources::provenance::LOCAL_SOURCE_ID.to_string(),
2205                origin_kind: crate::sources::provenance::LOCAL_SOURCE_ID.to_string(),
2206                origin_host: None,
2207            })
2208            .collect();
2209        let expected_docs = docs.len();
2210
2211        let indexed_docs = idx.add_prebuilt_documents(docs).expect("add prebuilt docs");
2212        assert_eq!(indexed_docs, expected_docs);
2213        idx.commit().expect("commit");
2214
2215        let reader = idx.reader().expect("reader");
2216        reader.reload().expect("reload");
2217        let searcher = reader.searcher();
2218        assert_eq!(searcher.num_docs(), expected_docs as u64);
2219    }
2220
2221    #[test]
2222    fn merge_compatible_index_directories_roundtrips_docs_into_single_segment() {
2223        let root = TempDir::new().expect("temp dir");
2224        let shard_a = root.path().join("shard-a");
2225        let shard_b = root.path().join("shard-b");
2226        let merged = root.path().join("merged");
2227
2228        let mut shard_a_index = TantivyIndex::open_or_create(&shard_a).expect("create shard a");
2229        let mut shard_b_index = TantivyIndex::open_or_create(&shard_b).expect("create shard b");
2230
2231        let make_conv = |external_id: &str, title: &str, content: &str| NormalizedConversation {
2232            agent_slug: "codex".to_string(),
2233            external_id: Some(external_id.to_string()),
2234            title: Some(title.to_string()),
2235            workspace: Some(PathBuf::from("/tmp/workspace")),
2236            source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
2237            started_at: Some(1_700_000_000_000),
2238            ended_at: Some(1_700_000_000_100),
2239            metadata: Value::Null,
2240            messages: vec![
2241                NormalizedMessage {
2242                    idx: 0,
2243                    role: "user".to_string(),
2244                    author: None,
2245                    created_at: Some(1_700_000_000_010),
2246                    content: format!("{content}-a"),
2247                    extra: Value::Null,
2248                    snippets: Vec::new(),
2249                    invocations: Vec::new(),
2250                },
2251                NormalizedMessage {
2252                    idx: 1,
2253                    role: "assistant".to_string(),
2254                    author: None,
2255                    created_at: Some(1_700_000_000_020),
2256                    content: format!("{content}-b"),
2257                    extra: Value::Null,
2258                    snippets: Vec::new(),
2259                    invocations: Vec::new(),
2260                },
2261            ],
2262        };
2263
2264        let conv_a = make_conv("merge-a", "Merge A", "alpha");
2265        let conv_b = make_conv("merge-b", "Merge B", "beta");
2266        shard_a_index
2267            .add_conversation_with_id(&conv_a, Some(10))
2268            .expect("index shard a");
2269        shard_b_index
2270            .add_conversation_with_id(&conv_b, Some(20))
2271            .expect("index shard b");
2272        shard_a_index.commit().expect("commit shard a");
2273        shard_b_index.commit().expect("commit shard b");
2274        drop(shard_a_index);
2275        drop(shard_b_index);
2276
2277        let merged_index =
2278            TantivyIndex::merge_compatible_index_directories(&merged, &[&shard_a, &shard_b])
2279                .expect("merge shard indices");
2280        assert_eq!(
2281            merged_index.segment_count(),
2282            1,
2283            "merged shard indices should collapse into a single searchable segment"
2284        );
2285        let reader = merged_index.reader().expect("reader");
2286        reader.reload().expect("reload");
2287        assert_eq!(reader.searcher().num_docs(), 4);
2288    }
2289
2290    #[test]
2291    fn merge_compatible_index_directories_rejects_non_empty_output_directory() {
2292        let root = TempDir::new().expect("temp dir");
2293        let shard = root.path().join("shard");
2294        let merged = root.path().join("merged");
2295        fs::create_dir_all(&merged).expect("create merged dir");
2296        fs::write(merged.join("sentinel.txt"), "occupied").expect("write sentinel");
2297
2298        let mut shard_index = TantivyIndex::open_or_create(&shard).expect("create shard");
2299        let conv = NormalizedConversation {
2300            agent_slug: "codex".to_string(),
2301            external_id: Some("merge-occupied".to_string()),
2302            title: Some("Occupied".to_string()),
2303            workspace: Some(PathBuf::from("/tmp/workspace")),
2304            source_path: PathBuf::from("/tmp/merge-occupied.jsonl"),
2305            started_at: Some(1_700_000_000_000),
2306            ended_at: Some(1_700_000_000_100),
2307            metadata: Value::Null,
2308            messages: vec![NormalizedMessage {
2309                idx: 0,
2310                role: "assistant".to_string(),
2311                author: None,
2312                created_at: Some(1_700_000_000_010),
2313                content: "occupied".to_string(),
2314                extra: Value::Null,
2315                snippets: Vec::new(),
2316                invocations: Vec::new(),
2317            }],
2318        };
2319        shard_index
2320            .add_conversation_with_id(&conv, Some(1))
2321            .expect("index shard");
2322        shard_index.commit().expect("commit shard");
2323        drop(shard_index);
2324
2325        let result = TantivyIndex::merge_compatible_index_directories(&merged, &[&shard]);
2326        assert!(
2327            result.is_err(),
2328            "non-empty merge output dir should be rejected"
2329        );
2330        let error = result.err().expect("merge result should contain an error");
2331        assert!(
2332            format!("{error:#}").contains("must be empty"),
2333            "unexpected error: {error:#}"
2334        );
2335    }
2336
2337    #[test]
2338    fn assemble_compatible_index_directories_roundtrips_docs_into_multi_segment_generation() {
2339        let root = TempDir::new().expect("temp dir");
2340        let shard_a = root.path().join("shard-a");
2341        let shard_b = root.path().join("shard-b");
2342        let assembled = root.path().join("assembled");
2343
2344        let mut shard_a_index = TantivyIndex::open_or_create(&shard_a).expect("create shard a");
2345        let mut shard_b_index = TantivyIndex::open_or_create(&shard_b).expect("create shard b");
2346
2347        let make_conv = |external_id: &str, title: &str, content: &str| NormalizedConversation {
2348            agent_slug: "codex".to_string(),
2349            external_id: Some(external_id.to_string()),
2350            title: Some(title.to_string()),
2351            workspace: Some(PathBuf::from("/tmp/workspace")),
2352            source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
2353            started_at: Some(1_700_000_001_000),
2354            ended_at: Some(1_700_000_001_100),
2355            metadata: Value::Null,
2356            messages: vec![
2357                NormalizedMessage {
2358                    idx: 0,
2359                    role: "user".to_string(),
2360                    author: None,
2361                    created_at: Some(1_700_000_001_010),
2362                    content: format!("{content}-a"),
2363                    extra: Value::Null,
2364                    snippets: Vec::new(),
2365                    invocations: Vec::new(),
2366                },
2367                NormalizedMessage {
2368                    idx: 1,
2369                    role: "assistant".to_string(),
2370                    author: None,
2371                    created_at: Some(1_700_000_001_020),
2372                    content: format!("{content}-b"),
2373                    extra: Value::Null,
2374                    snippets: Vec::new(),
2375                    invocations: Vec::new(),
2376                },
2377            ],
2378        };
2379
2380        let conv_a = make_conv("assemble-a", "Assemble A", "alpha");
2381        let conv_b = make_conv("assemble-b", "Assemble B", "beta");
2382        shard_a_index
2383            .add_conversation_with_id(&conv_a, Some(10))
2384            .expect("index shard a");
2385        shard_b_index
2386            .add_conversation_with_id(&conv_b, Some(20))
2387            .expect("index shard b");
2388        shard_a_index.commit().expect("commit shard a");
2389        shard_b_index.commit().expect("commit shard b");
2390        drop(shard_a_index);
2391        drop(shard_b_index);
2392
2393        let assembled_index =
2394            TantivyIndex::assemble_compatible_index_directories(&assembled, &[&shard_a, &shard_b])
2395                .expect("assemble shard indices");
2396        let reader = assembled_index.reader().expect("reader");
2397        reader.reload().expect("reload");
2398        assert_eq!(reader.searcher().num_docs(), 4);
2399        assert_eq!(
2400            assembled_index.segment_count(),
2401            2,
2402            "assembled shard indices should preserve one searchable segment per input artifact"
2403        );
2404        assert!(
2405            assembled.join(".managed.json").exists(),
2406            "assembled index generation should persist a Tantivy managed-file manifest"
2407        );
2408    }
2409
2410    #[test]
2411    fn publish_federated_searchable_index_directories_writes_manifest_without_root_meta() {
2412        let root = TempDir::new().expect("temp dir");
2413        let shard_a = root.path().join("shard-a");
2414        let shard_b = root.path().join("shard-b");
2415        let published = root.path().join("published");
2416
2417        let mut shard_a_index = TantivyIndex::open_or_create(&shard_a).expect("create shard a");
2418        let mut shard_b_index = TantivyIndex::open_or_create(&shard_b).expect("create shard b");
2419
2420        let make_conv = |external_id: &str, title: &str, content: &str| NormalizedConversation {
2421            agent_slug: "codex".to_string(),
2422            external_id: Some(external_id.to_string()),
2423            title: Some(title.to_string()),
2424            workspace: Some(PathBuf::from("/tmp/workspace")),
2425            source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
2426            started_at: Some(1_700_000_002_000),
2427            ended_at: Some(1_700_000_002_100),
2428            metadata: Value::Null,
2429            messages: vec![
2430                NormalizedMessage {
2431                    idx: 0,
2432                    role: "user".to_string(),
2433                    author: None,
2434                    created_at: Some(1_700_000_002_010),
2435                    content: format!("{content}-a"),
2436                    extra: Value::Null,
2437                    snippets: Vec::new(),
2438                    invocations: Vec::new(),
2439                },
2440                NormalizedMessage {
2441                    idx: 1,
2442                    role: "assistant".to_string(),
2443                    author: None,
2444                    created_at: Some(1_700_000_002_020),
2445                    content: format!("{content}-b"),
2446                    extra: Value::Null,
2447                    snippets: Vec::new(),
2448                    invocations: Vec::new(),
2449                },
2450            ],
2451        };
2452
2453        shard_a_index
2454            .add_conversation_with_id(&make_conv("fed-a", "Fed A", "alpha"), Some(10))
2455            .expect("index shard a");
2456        shard_b_index
2457            .add_conversation_with_id(&make_conv("fed-b", "Fed B", "beta"), Some(20))
2458            .expect("index shard b");
2459        shard_a_index.commit().expect("commit shard a");
2460        shard_b_index.commit().expect("commit shard b");
2461        drop(shard_a_index);
2462        drop(shard_b_index);
2463
2464        let summary =
2465            publish_federated_searchable_index_directories(&published, &[&shard_a, &shard_b])
2466                .expect("publish federated bundle");
2467        assert_eq!(summary.docs, 4);
2468        assert_eq!(summary.segments, 2);
2469        assert!(
2470            !published.join("meta.json").exists(),
2471            "federated publish root should not force a standard single-index meta.json"
2472        );
2473        assert!(
2474            published.join(FEDERATED_SEARCH_MANIFEST_FILE).exists(),
2475            "federated publish root should persist its manifest"
2476        );
2477        let manifest = load_federated_search_manifest_internal(&published)
2478            .expect("load manifest")
2479            .expect("manifest present");
2480        assert_eq!(manifest.shards.len(), 2);
2481        assert_eq!(
2482            searchable_index_summary(&published)
2483                .expect("summary")
2484                .expect("searchable summary")
2485                .docs,
2486            4
2487        );
2488    }
2489
2490    fn write_federated_manifest_for_test(index_path: &Path, manifest: &FederatedSearchManifest) {
2491        fs::write(
2492            federated_search_manifest_path(index_path),
2493            serde_json::to_vec_pretty(manifest).expect("serialize manifest"),
2494        )
2495        .expect("write manifest");
2496    }
2497
2498    fn publish_test_federated_bundle(root: &Path) -> PathBuf {
2499        let shard_a = root.join("shard-a");
2500        let shard_b = root.join("shard-b");
2501        let published = root.join("published");
2502
2503        let mut shard_a_index = TantivyIndex::open_or_create(&shard_a).expect("create shard a");
2504        let mut shard_b_index = TantivyIndex::open_or_create(&shard_b).expect("create shard b");
2505
2506        let make_conv = |external_id: &str, content: &str| NormalizedConversation {
2507            agent_slug: "codex".to_string(),
2508            external_id: Some(external_id.to_string()),
2509            title: Some(format!("Bundle {external_id}")),
2510            workspace: Some(PathBuf::from("/tmp/workspace")),
2511            source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
2512            started_at: Some(1_700_000_002_000),
2513            ended_at: Some(1_700_000_002_100),
2514            metadata: Value::Null,
2515            messages: vec![NormalizedMessage {
2516                idx: 0,
2517                role: "assistant".to_string(),
2518                author: None,
2519                created_at: Some(1_700_000_002_010),
2520                content: content.to_string(),
2521                extra: Value::Null,
2522                snippets: Vec::new(),
2523                invocations: Vec::new(),
2524            }],
2525        };
2526
2527        shard_a_index
2528            .add_conversation_with_id(&make_conv("bundle-a", "alpha"), Some(10))
2529            .expect("index shard a");
2530        shard_b_index
2531            .add_conversation_with_id(&make_conv("bundle-b", "beta"), Some(20))
2532            .expect("index shard b");
2533        shard_a_index.commit().expect("commit shard a");
2534        shard_b_index.commit().expect("commit shard b");
2535        drop(shard_a_index);
2536        drop(shard_b_index);
2537
2538        publish_federated_searchable_index_directories(&published, &[&shard_a, &shard_b])
2539            .expect("publish federated bundle");
2540        published
2541    }
2542
2543    #[test]
2544    fn federated_manifest_validation_rejects_unsupported_remote_contracts() {
2545        let root = TempDir::new().expect("temp dir");
2546        let published = root.path().join("published");
2547        fs::create_dir_all(&published).expect("create bundle root");
2548        let base_manifest = FederatedSearchManifest {
2549            version: FEDERATED_SEARCH_MANIFEST_VERSION,
2550            kind: FEDERATED_SEARCH_MANIFEST_KIND.to_string(),
2551            schema_hash: CASS_SCHEMA_HASH.to_string(),
2552            shards: vec![FederatedSearchShardManifest {
2553                relative_path: "shards/shard-00000".to_string(),
2554                docs: 1,
2555                segments: 1,
2556                meta_fingerprint: "a".repeat(64),
2557            }],
2558        };
2559
2560        let mut manifest = base_manifest.clone();
2561        manifest.version = FEDERATED_SEARCH_MANIFEST_VERSION + 1;
2562        write_federated_manifest_for_test(&published, &manifest);
2563        let error = load_federated_search_manifest_internal(&published).unwrap_err();
2564        assert!(
2565            format!("{error:#}").contains("unsupported federated search manifest version"),
2566            "unexpected version error: {error:#}"
2567        );
2568
2569        let mut manifest = base_manifest.clone();
2570        manifest.kind = "cass-unknown-artifact".to_string();
2571        write_federated_manifest_for_test(&published, &manifest);
2572        let error = load_federated_search_manifest_internal(&published).unwrap_err();
2573        assert!(
2574            format!("{error:#}").contains("unexpected federated search manifest kind"),
2575            "unexpected kind error: {error:#}"
2576        );
2577
2578        let mut manifest = base_manifest;
2579        manifest
2580            .shards
2581            .first_mut()
2582            .expect("test manifest should contain a shard")
2583            .relative_path = "../escape".to_string();
2584        write_federated_manifest_for_test(&published, &manifest);
2585        let error = load_federated_search_manifest_internal(&published).unwrap_err();
2586        assert!(
2587            format!("{error:#}").contains("must stay under shards/"),
2588            "unexpected path error: {error:#}"
2589        );
2590    }
2591
2592    #[test]
2593    fn open_federated_search_readers_rejects_corrupt_shard_fingerprint() {
2594        let root = TempDir::new().expect("temp dir");
2595        let published = publish_test_federated_bundle(root.path());
2596        let mut manifest = load_federated_search_manifest_internal(&published)
2597            .expect("load manifest")
2598            .expect("manifest present");
2599        manifest
2600            .shards
2601            .first_mut()
2602            .expect("test manifest should contain a shard")
2603            .meta_fingerprint = "0".repeat(64);
2604        write_federated_manifest_for_test(&published, &manifest);
2605
2606        let result = open_federated_search_readers(&published, FsReloadPolicy::Manual);
2607        assert!(
2608            result.is_err(),
2609            "corrupt federated shard fingerprint should be rejected"
2610        );
2611        let error = result.err().expect("open result should contain an error");
2612        assert!(
2613            format!("{error:#}").contains("federated lexical shard fingerprint mismatch"),
2614            "unexpected fingerprint error: {error:#}"
2615        );
2616    }
2617
2618    fn write_minimal_federated_artifact(root: &Path, segment_bytes: &[u8]) {
2619        let shard = root.join("shards/shard-00000");
2620        fs::create_dir_all(&shard).expect("create shard");
2621        fs::write(shard.join("meta.json"), br#"{"segments":[]}"#).expect("write shard meta");
2622        fs::write(shard.join("segment.bin"), segment_bytes).expect("write shard segment");
2623        write_root_schema_hash_file(root).expect("write schema hash");
2624
2625        let manifest = FederatedSearchManifest {
2626            version: FEDERATED_SEARCH_MANIFEST_VERSION,
2627            kind: FEDERATED_SEARCH_MANIFEST_KIND.to_string(),
2628            schema_hash: CASS_SCHEMA_HASH.to_string(),
2629            shards: vec![FederatedSearchShardManifest {
2630                relative_path: "shards/shard-00000".to_string(),
2631                docs: 0,
2632                segments: 0,
2633                meta_fingerprint: meta_fingerprint_for_existing_index_dir(&shard)
2634                    .expect("meta fingerprint"),
2635            }],
2636        };
2637        write_federated_manifest_for_test(root, &manifest);
2638    }
2639
2640    fn write_minimal_standard_lexical_artifact(root: &Path, segment_bytes: &[u8]) {
2641        fs::create_dir_all(root).expect("create standard lexical root");
2642        fs::write(root.join("meta.json"), br#"{"segments":[]}"#).expect("write root meta");
2643        fs::write(root.join("segment.bin"), segment_bytes).expect("write segment");
2644        write_root_schema_hash_file(root).expect("write schema hash");
2645    }
2646
2647    #[test]
2648    fn lexical_evidence_manifest_supports_standard_searchable_index() {
2649        let root = TempDir::new().expect("temp dir");
2650        write_minimal_standard_lexical_artifact(root.path(), b"standard segment bytes");
2651
2652        let manifest =
2653            lexical_search_evidence_bundle_manifest(root.path()).expect("standard manifest");
2654        assert!(manifest.verify(root.path()).is_complete());
2655        assert!(manifest.chunks.iter().any(|chunk| {
2656            chunk.path == "meta.json" && chunk.role == EvidenceBundleChunkRole::Metadata
2657        }));
2658        assert!(manifest.chunks.iter().any(|chunk| {
2659            chunk.path == "segment.bin" && chunk.role == EvidenceBundleChunkRole::LexicalShard
2660        }));
2661    }
2662
2663    #[test]
2664    fn lexical_evidence_manifest_excludes_writer_temp_file_before_save() {
2665        let root = TempDir::new().expect("temp dir");
2666        write_minimal_standard_lexical_artifact(root.path(), b"standard segment bytes");
2667        fs::write(
2668            root.path().join(EVIDENCE_BUNDLE_MANIFEST_TEMP_FILE),
2669            b"leftover temp manifest bytes",
2670        )
2671        .expect("write stale evidence manifest temp file");
2672
2673        let manifest =
2674            lexical_search_evidence_bundle_manifest(root.path()).expect("standard manifest");
2675        assert!(
2676            manifest
2677                .chunks
2678                .iter()
2679                .all(|chunk| chunk.path != EVIDENCE_BUNDLE_MANIFEST_TEMP_FILE),
2680            "writer temp file must not become part of the saved proof: {manifest:?}"
2681        );
2682
2683        manifest.save(root.path()).expect("save evidence manifest");
2684        let report = crate::evidence_bundle::verify_evidence_bundle_manifest_file(
2685            root.path(),
2686            &crate::evidence_bundle::EvidenceBundleManifest::path(root.path()),
2687        );
2688        assert!(report.is_complete(), "{report:?}");
2689    }
2690
2691    #[test]
2692    fn lexical_evidence_manifest_rejects_standard_schema_mismatch() {
2693        let root = TempDir::new().expect("temp dir");
2694        write_minimal_standard_lexical_artifact(root.path(), b"standard segment bytes");
2695        fs::write(
2696            root.path().join("schema_hash.json"),
2697            r#"{"schema_hash":"stale"}"#,
2698        )
2699        .expect("write stale schema hash");
2700
2701        let error = lexical_search_evidence_bundle_manifest(root.path()).unwrap_err();
2702        assert!(
2703            format!("{error:#}").contains("lexical artifact schema mismatch"),
2704            "unexpected schema error: {error:#}"
2705        );
2706    }
2707
2708    #[test]
2709    fn federated_evidence_manifest_is_deterministic_and_detects_mutation() {
2710        let left = TempDir::new().expect("left temp dir");
2711        let right = TempDir::new().expect("right temp dir");
2712        write_minimal_federated_artifact(left.path(), b"same segment bytes");
2713        write_minimal_federated_artifact(right.path(), b"same segment bytes");
2714
2715        let left_manifest =
2716            federated_search_evidence_bundle_manifest(left.path()).expect("left manifest");
2717        let right_manifest =
2718            federated_search_evidence_bundle_manifest(right.path()).expect("right manifest");
2719        assert_eq!(
2720            serde_json::to_value(&left_manifest).expect("left json"),
2721            serde_json::to_value(&right_manifest).expect("right json"),
2722            "byte-identical federated artifacts should produce byte-identical evidence manifests"
2723        );
2724        assert!(left_manifest.verify(left.path()).is_complete());
2725
2726        fs::write(
2727            left.path().join("shards/shard-00000/segment.bin"),
2728            b"SAME segment bytes",
2729        )
2730        .expect("mutate shard segment");
2731        let report = left_manifest.verify(left.path());
2732        assert!(report.is_unsafe(), "{report:?}");
2733        assert!(
2734            report.issues.iter().any(|issue| issue.kind
2735                == crate::evidence_bundle::EvidenceBundleIssueKind::DigestMismatch),
2736            "expected digest mismatch after segment mutation: {report:?}"
2737        );
2738    }
2739
2740    #[cfg(unix)]
2741    #[test]
2742    fn federated_evidence_manifest_rejects_symlink_artifacts() {
2743        use std::os::unix::fs::symlink;
2744
2745        let root = TempDir::new().expect("temp dir");
2746        write_minimal_federated_artifact(root.path(), b"segment bytes");
2747        symlink(
2748            "/tmp/not-a-bundle-file",
2749            root.path().join("shards/shard-00000/link"),
2750        )
2751        .expect("create artifact symlink");
2752
2753        let error = federated_search_evidence_bundle_manifest(root.path()).unwrap_err();
2754        assert!(
2755            format!("{error:#}").contains("unsupported non-file entry"),
2756            "unexpected symlink error: {error:#}"
2757        );
2758    }
2759
2760    #[test]
2761    fn open_or_create_materializes_federated_bundle_back_into_mutable_index() {
2762        let root = TempDir::new().expect("temp dir");
2763        let shard_a = root.path().join("shard-a");
2764        let shard_b = root.path().join("shard-b");
2765        let published = root.path().join("published");
2766
2767        let mut shard_a_index = TantivyIndex::open_or_create(&shard_a).expect("create shard a");
2768        let mut shard_b_index = TantivyIndex::open_or_create(&shard_b).expect("create shard b");
2769
2770        let make_conv = |external_id: &str, title: &str, content: &str| NormalizedConversation {
2771            agent_slug: "codex".to_string(),
2772            external_id: Some(external_id.to_string()),
2773            title: Some(title.to_string()),
2774            workspace: Some(PathBuf::from("/tmp/workspace")),
2775            source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
2776            started_at: Some(1_700_000_003_000),
2777            ended_at: Some(1_700_000_003_100),
2778            metadata: Value::Null,
2779            messages: vec![
2780                NormalizedMessage {
2781                    idx: 0,
2782                    role: "user".to_string(),
2783                    author: None,
2784                    created_at: Some(1_700_000_003_010),
2785                    content: format!("{content}-a"),
2786                    extra: Value::Null,
2787                    snippets: Vec::new(),
2788                    invocations: Vec::new(),
2789                },
2790                NormalizedMessage {
2791                    idx: 1,
2792                    role: "assistant".to_string(),
2793                    author: None,
2794                    created_at: Some(1_700_000_003_020),
2795                    content: format!("{content}-b"),
2796                    extra: Value::Null,
2797                    snippets: Vec::new(),
2798                    invocations: Vec::new(),
2799                },
2800            ],
2801        };
2802
2803        shard_a_index
2804            .add_conversation_with_id(&make_conv("mat-a", "Mat A", "alpha"), Some(10))
2805            .expect("index shard a");
2806        shard_b_index
2807            .add_conversation_with_id(&make_conv("mat-b", "Mat B", "beta"), Some(20))
2808            .expect("index shard b");
2809        shard_a_index.commit().expect("commit shard a");
2810        shard_b_index.commit().expect("commit shard b");
2811        drop(shard_a_index);
2812        drop(shard_b_index);
2813
2814        publish_federated_searchable_index_directories(&published, &[&shard_a, &shard_b])
2815            .expect("publish federated bundle");
2816        assert!(
2817            published.join(FEDERATED_SEARCH_MANIFEST_FILE).exists(),
2818            "test fixture should start in federated bundle form"
2819        );
2820
2821        let mutable_index =
2822            TantivyIndex::open_or_create(&published).expect("materialize mutable index");
2823        let reader = mutable_index.reader().expect("reader");
2824        reader.reload().expect("reload");
2825        assert_eq!(reader.searcher().num_docs(), 4);
2826        assert!(
2827            published.join("meta.json").exists(),
2828            "writer open should materialize a standard writable Tantivy index"
2829        );
2830        assert!(
2831            !published.join(FEDERATED_SEARCH_MANIFEST_FILE).exists(),
2832            "materialization should replace the federated bundle manifest"
2833        );
2834    }
2835
2836    #[test]
2837    fn federated_materialization_target_rejects_file_root() {
2838        let root = TempDir::new().expect("temp dir");
2839        let published = root.path().join("published");
2840        fs::write(&published, "not a directory").expect("write file root");
2841
2842        let err = ensure_replaceable_federated_materialization_root(&published)
2843            .expect_err("file roots must not be materialized over");
2844
2845        assert!(
2846            err.to_string().contains("not a directory"),
2847            "unexpected error: {err:#}"
2848        );
2849        assert_eq!(
2850            fs::read_to_string(&published).expect("file root should remain"),
2851            "not a directory"
2852        );
2853    }
2854
2855    #[test]
2856    #[cfg(unix)]
2857    fn federated_materialization_target_rejects_dangling_symlink_root() {
2858        use std::os::unix::fs::symlink;
2859
2860        let root = TempDir::new().expect("temp dir");
2861        let published = root.path().join("published");
2862        let missing_target = root.path().join("missing-published");
2863        symlink(&missing_target, &published).expect("create dangling symlink");
2864
2865        let err = ensure_replaceable_federated_materialization_root(&published)
2866            .expect_err("symlink roots must not be materialized over");
2867
2868        assert!(
2869            err.to_string().contains("through symlink"),
2870            "unexpected error: {err:#}"
2871        );
2872        assert!(
2873            fs::symlink_metadata(&published)
2874                .expect("symlink root should remain")
2875                .file_type()
2876                .is_symlink()
2877        );
2878        assert!(!missing_target.exists());
2879    }
2880
2881    /// Equivalence gate for `coding_agent_session_search-ibuuh.32`:
2882    /// the packet-driven lexical pipeline (`add_messages_from_packet`)
2883    /// must emit byte-identical CassDocuments to the legacy
2884    /// `add_messages_with_conversation_id` path on the same input.
2885    /// This proves the migration of `add_conversation*` is a true
2886    /// no-op semantically while removing the duplicate normalization
2887    /// pass, so future migration slices in indexer/mod.rs can adopt
2888    /// `add_messages_from_packet` with confidence.
2889    #[test]
2890    fn packet_driven_lexical_pipeline_matches_legacy_for_normalized_conv() {
2891        use crate::model::conversation_packet::{ConversationPacket, ConversationPacketProvenance};
2892
2893        fn make_conv() -> NormalizedConversation {
2894            NormalizedConversation {
2895                agent_slug: "codex".to_string(),
2896                external_id: Some("packet-equivalence".to_string()),
2897                title: Some("Packet Equivalence".to_string()),
2898                workspace: Some(PathBuf::from("/work/eq")),
2899                source_path: PathBuf::from("/work/eq/.codex/session.jsonl"),
2900                started_at: Some(1_700_000_000_000),
2901                ended_at: Some(1_700_000_010_000),
2902                metadata: serde_json::json!({
2903                    "cass": {
2904                        "origin": {
2905                            "source_id": "remote-host",
2906                            "kind": "ssh",
2907                            "host": "ws-42.example",
2908                        },
2909                        "workspace_original": "/Users/dev/eq",
2910                    },
2911                    "model": "gpt-5",
2912                }),
2913                messages: vec![
2914                    NormalizedMessage {
2915                        idx: 0,
2916                        role: "user".to_string(),
2917                        author: Some("human".to_string()),
2918                        created_at: Some(1_700_000_000_000),
2919                        content: "explain the packet pipeline".to_string(),
2920                        extra: serde_json::json!({"turn": 1}),
2921                        snippets: Vec::new(),
2922                        invocations: Vec::new(),
2923                    },
2924                    NormalizedMessage {
2925                        idx: 1,
2926                        role: "assistant".to_string(),
2927                        author: None,
2928                        created_at: Some(1_700_000_001_000),
2929                        content: "the pipeline normalizes once".to_string(),
2930                        extra: Value::Null,
2931                        snippets: Vec::new(),
2932                        invocations: Vec::new(),
2933                    },
2934                    NormalizedMessage {
2935                        idx: 2,
2936                        role: "tool".to_string(),
2937                        author: Some("ripgrep".to_string()),
2938                        created_at: Some(1_700_000_002_000),
2939                        content: "matches: 3".to_string(),
2940                        extra: Value::Null,
2941                        snippets: Vec::new(),
2942                        invocations: Vec::new(),
2943                    },
2944                ],
2945            }
2946        }
2947
2948        let legacy_dir = TempDir::new().expect("legacy temp dir");
2949        let mut legacy_idx = TantivyIndex::open_or_create(legacy_dir.path()).expect("legacy idx");
2950        let conv = make_conv();
2951        legacy_idx
2952            .add_messages_with_conversation_id(&conv, &conv.messages, Some(99))
2953            .expect("legacy add");
2954        legacy_idx.commit().expect("legacy commit");
2955        let legacy_reader = legacy_idx.reader().expect("legacy reader");
2956        legacy_reader.reload().expect("legacy reload");
2957        let legacy_searcher = legacy_reader.searcher();
2958        let legacy_count = legacy_searcher.num_docs();
2959
2960        let packet_dir = TempDir::new().expect("packet temp dir");
2961        let mut packet_idx = TantivyIndex::open_or_create(packet_dir.path()).expect("packet idx");
2962        let packet = ConversationPacket::from_normalized_conversation(
2963            &conv,
2964            ConversationPacketProvenance::local(),
2965        );
2966        packet_idx
2967            .add_messages_from_packet(&packet, None, Some(99), |_| Ok(()))
2968            .expect("packet add");
2969        packet_idx.commit().expect("packet commit");
2970        let packet_reader = packet_idx.reader().expect("packet reader");
2971        packet_reader.reload().expect("packet reload");
2972        let packet_searcher = packet_reader.searcher();
2973        let packet_count = packet_searcher.num_docs();
2974
2975        assert_eq!(
2976            legacy_count, packet_count,
2977            "packet pipeline must emit the same number of docs as legacy: legacy={legacy_count} packet={packet_count}"
2978        );
2979        assert_eq!(
2980            legacy_count,
2981            conv.messages.len() as u64,
2982            "all 3 fixture messages should land (none filter as hard noise)"
2983        );
2984
2985        // Compare every stored field byte-for-byte by reconstructing the
2986        // CassDocument list both pipelines fed into Tantivy. This sidesteps
2987        // schema-coupled retrieval boilerplate and pins the property the
2988        // bead acceptance gate cares about: same projection, same docs.
2989        let legacy_context = cass_doc_context(&conv, Some(99));
2990        let legacy_docs: Vec<FsCassDocument> = conv
2991            .messages
2992            .iter()
2993            .filter_map(|m| cass_document_for_message(&legacy_context, m))
2994            .collect();
2995        let packet_context_owned = {
2996            let mut ctx = cass_doc_context_from_packet(&packet);
2997            ctx.conversation_id = Some(99);
2998            ctx
2999        };
3000        let packet_docs: Vec<FsCassDocument> = packet
3001            .payload
3002            .messages
3003            .iter()
3004            .filter_map(|m| cass_document_for_packet_message(&packet_context_owned, m))
3005            .collect();
3006        assert_eq!(
3007            legacy_docs.len(),
3008            packet_docs.len(),
3009            "packet doc list length should match legacy"
3010        );
3011        for (legacy_doc, packet_doc) in legacy_docs.iter().zip(packet_docs.iter()) {
3012            assert_eq!(legacy_doc.agent, packet_doc.agent);
3013            assert_eq!(legacy_doc.workspace, packet_doc.workspace);
3014            assert_eq!(legacy_doc.workspace_original, packet_doc.workspace_original);
3015            assert_eq!(legacy_doc.source_path, packet_doc.source_path);
3016            assert_eq!(legacy_doc.msg_idx, packet_doc.msg_idx);
3017            assert_eq!(legacy_doc.created_at, packet_doc.created_at);
3018            assert_eq!(legacy_doc.title, packet_doc.title);
3019            assert_eq!(legacy_doc.content, packet_doc.content);
3020            assert_eq!(legacy_doc.conversation_id, packet_doc.conversation_id);
3021            assert_eq!(
3022                legacy_doc.source_id, packet_doc.source_id,
3023                "source_id must match (remote-host normalization is the bead's tripwire)"
3024            );
3025            assert_eq!(legacy_doc.origin_kind, packet_doc.origin_kind);
3026            assert_eq!(legacy_doc.origin_host, packet_doc.origin_host);
3027        }
3028        // Sanity check the remote-host provenance actually round-tripped:
3029        // a regression in normalization on either side would silently
3030        // pass the per-doc compare unless we pin the expected value too.
3031        let first_packet_doc = packet_docs
3032            .first()
3033            .expect("packet fixture should emit at least one doc");
3034        assert_eq!(
3035            first_packet_doc.source_id, "remote-host",
3036            "metadata.cass.origin.source_id must be the canonical value"
3037        );
3038        assert_eq!(
3039            first_packet_doc.origin_host.as_deref(),
3040            Some("ws-42.example"),
3041            "metadata.cass.origin.host must surface as origin_host"
3042        );
3043    }
3044
3045    /// Pins the `add_conversation_with_id` migration: the convenience
3046    /// entrypoint now routes through the packet pipeline, but operators
3047    /// see no behavioral change. The doc count and conversation_id
3048    /// stamping must match the legacy `add_messages_with_conversation_id`
3049    /// path on the same fixture.
3050    #[test]
3051    fn add_conversation_with_id_packet_path_emits_expected_doc_count() {
3052        fn fixture(id: i64) -> NormalizedConversation {
3053            NormalizedConversation {
3054                agent_slug: "codex".to_string(),
3055                external_id: Some(format!("conv-{id}")),
3056                title: Some(format!("Conv {id}")),
3057                workspace: None,
3058                source_path: PathBuf::from(format!("/tmp/conv-{id}.jsonl")),
3059                started_at: Some(1_700_000_000_000 + id),
3060                ended_at: Some(1_700_000_001_000 + id),
3061                metadata: Value::Null,
3062                messages: vec![
3063                    NormalizedMessage {
3064                        idx: 0,
3065                        role: "user".to_string(),
3066                        author: None,
3067                        created_at: Some(1_700_000_000_000 + id),
3068                        content: format!("hello-{id}"),
3069                        extra: Value::Null,
3070                        snippets: Vec::new(),
3071                        invocations: Vec::new(),
3072                    },
3073                    NormalizedMessage {
3074                        idx: 1,
3075                        role: "assistant".to_string(),
3076                        author: None,
3077                        created_at: Some(1_700_000_000_500 + id),
3078                        content: format!("response-{id}"),
3079                        extra: Value::Null,
3080                        snippets: Vec::new(),
3081                        invocations: Vec::new(),
3082                    },
3083                ],
3084            }
3085        }
3086
3087        let dir = TempDir::new().expect("temp dir");
3088        let mut idx = TantivyIndex::open_or_create(dir.path()).expect("idx");
3089        idx.add_conversation_with_id(&fixture(1), Some(101))
3090            .expect("conv 1");
3091        idx.add_conversation_with_id(&fixture(2), Some(102))
3092            .expect("conv 2");
3093        idx.commit().expect("commit");
3094
3095        let reader = idx.reader().expect("reader");
3096        reader.reload().expect("reload");
3097        assert_eq!(
3098            reader.searcher().num_docs(),
3099            4,
3100            "two conversations × two messages each ⇒ four lexical docs"
3101        );
3102    }
3103}