use std::collections::BTreeSet;
use std::fs;
use std::path::{Path, PathBuf};
use crate::connectors::NormalizedConversation;
use crate::connectors::NormalizedMessage;
use crate::evidence_bundle::{
EVIDENCE_BUNDLE_MANIFEST_FILE, EvidenceBundleChunk, EvidenceBundleChunkRole,
EvidenceBundleKind, EvidenceBundleManifest,
};
use crate::model::conversation_packet::{
ConversationPacket, ConversationPacketMessage, ConversationPacketProvenance,
};
use crate::search::canonicalize::is_hard_message_noise;
use crate::sources::provenance::LOCAL_SOURCE_ID;
use anyhow::{Context, Error, Result};
use frankensearch::lexical::{
CASS_SCHEMA_HASH, CASS_SCHEMA_VERSION, CassDocument as FsCassDocument,
CassDocumentRef as FsCassDocumentRef, CassFields as FsCassFields,
CassMergeStatus as FsCassMergeStatus, CassTantivyIndex as FsCassTantivyIndex, Index,
IndexReader, ReloadPolicy as FsReloadPolicy, Schema, cass_build_schema as fs_build_schema,
cass_ensure_tokenizer as fs_ensure_tokenizer, cass_fields_from_schema as fs_fields_from_schema,
cass_index_dir as fs_index_dir, cass_open_search_reader as fs_cass_open_search_reader,
cass_schema_hash_matches, tantivy_crate,
};
use serde::{Deserialize, Serialize};
use std::time::SystemTime;
pub(crate) fn normalized_index_source_id(
source_id: Option<&str>,
origin_kind: Option<&str>,
origin_host: Option<&str>,
) -> String {
let trimmed_source_id = source_id.unwrap_or_default().trim();
if !trimmed_source_id.is_empty() {
if trimmed_source_id.eq_ignore_ascii_case(LOCAL_SOURCE_ID) {
return LOCAL_SOURCE_ID.to_string();
}
return trimmed_source_id.to_string();
}
let trimmed_origin_host = origin_host.map(str::trim).filter(|value| !value.is_empty());
let trimmed_origin_kind = origin_kind.unwrap_or_default().trim();
if trimmed_origin_kind.eq_ignore_ascii_case("ssh")
|| trimmed_origin_kind.eq_ignore_ascii_case("remote")
{
return trimmed_origin_host.unwrap_or("remote").to_string();
}
if let Some(origin_host) = trimmed_origin_host {
return origin_host.to_string();
}
LOCAL_SOURCE_ID.to_string()
}
pub(crate) fn normalized_index_origin_kind(source_id: &str, origin_kind: Option<&str>) -> String {
if let Some(kind) = origin_kind.map(str::trim).filter(|value| !value.is_empty()) {
if kind.eq_ignore_ascii_case("local") {
return LOCAL_SOURCE_ID.to_string();
}
if kind.eq_ignore_ascii_case("ssh") || kind.eq_ignore_ascii_case("remote") {
return "remote".to_string();
}
return kind.to_ascii_lowercase();
}
if source_id == LOCAL_SOURCE_ID {
LOCAL_SOURCE_ID.to_string()
} else {
"remote".to_string()
}
}
pub(crate) fn normalized_index_origin_host(origin_host: Option<&str>) -> Option<String> {
origin_host
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
}
pub const SCHEMA_HASH: &str = CASS_SCHEMA_HASH;
const ENV_TANTIVY_ADD_BATCH_MAX_CHARS: &str = "CASS_TANTIVY_ADD_BATCH_MAX_CHARS";
const ENV_TANTIVY_ADD_BATCH_MAX_MESSAGES: &str = "CASS_TANTIVY_ADD_BATCH_MAX_MESSAGES";
const ENV_TANTIVY_MAX_WRITER_THREADS: &str = "CASS_TANTIVY_MAX_WRITER_THREADS";
const ENV_TANTIVY_REBUILD_STAGED_SHARD_BUILDERS: &str =
"CASS_TANTIVY_REBUILD_STAGED_SHARD_BUILDERS";
const DEFAULT_TANTIVY_MAX_WRITER_THREADS_CEILING: usize = 26;
const DEFAULT_TANTIVY_ASSUMED_CONCURRENT_WRITERS: u64 = 8;
const DEFAULT_TANTIVY_WRITER_HEAP_PER_THREAD_BYTES: u64 = 128 * 1024 * 1024;
const DEFAULT_TANTIVY_WRITER_HEAP_RAM_FRACTION: u64 = 10;
fn positive_usize_env(name: &str) -> Option<usize> {
dotenvy::var(name)
.ok()
.and_then(|value| value.parse::<usize>().ok())
.filter(|value| *value > 0)
}
#[cfg(target_os = "linux")]
fn linux_available_memory_bytes() -> Option<u64> {
let meminfo = std::fs::read_to_string("/proc/meminfo").ok()?;
for line in meminfo.lines() {
if let Some(rest) = line.strip_prefix("MemAvailable:") {
let kb = rest.split_whitespace().next()?.parse::<u64>().ok()?;
return kb.checked_mul(1024);
}
}
None
}
#[cfg(target_os = "linux")]
fn host_memory_bytes_for_tantivy_default() -> Option<u64> {
linux_available_memory_bytes()
}
#[cfg(target_os = "macos")]
fn host_memory_bytes_for_tantivy_default() -> Option<u64> {
let output = std::process::Command::new("sysctl")
.args(["-n", "hw.memsize"])
.output()
.ok()?;
if !output.status.success() {
return None;
}
let stdout = String::from_utf8(output.stdout).ok()?;
stdout.trim().parse::<u64>().ok()
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn host_memory_bytes_for_tantivy_default() -> Option<u64> {
None
}
#[cfg(test)]
pub(crate) fn default_tantivy_max_writer_threads_for_memory_bytes(
memory_bytes: Option<u64>,
) -> usize {
default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
memory_bytes,
DEFAULT_TANTIVY_ASSUMED_CONCURRENT_WRITERS,
)
}
pub(crate) fn default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
memory_bytes: Option<u64>,
concurrent_writers: u64,
) -> usize {
let Some(memory_bytes) = memory_bytes else {
return DEFAULT_TANTIVY_MAX_WRITER_THREADS_CEILING;
};
let per_thread_peak =
DEFAULT_TANTIVY_WRITER_HEAP_PER_THREAD_BYTES.saturating_mul(concurrent_writers.max(1));
if per_thread_peak == 0 {
return DEFAULT_TANTIVY_MAX_WRITER_THREADS_CEILING;
}
let budget = memory_bytes / DEFAULT_TANTIVY_WRITER_HEAP_RAM_FRACTION;
let threads = budget / per_thread_peak;
usize::try_from(threads)
.unwrap_or(usize::MAX)
.clamp(1, DEFAULT_TANTIVY_MAX_WRITER_THREADS_CEILING)
}
fn default_tantivy_assumed_concurrent_writers() -> u64 {
positive_usize_env(ENV_TANTIVY_REBUILD_STAGED_SHARD_BUILDERS)
.and_then(|value| u64::try_from(value).ok())
.unwrap_or(DEFAULT_TANTIVY_ASSUMED_CONCURRENT_WRITERS)
.max(1)
}
pub fn default_tantivy_max_writer_threads() -> usize {
default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
host_memory_bytes_for_tantivy_default(),
default_tantivy_assumed_concurrent_writers(),
)
}
pub(crate) fn tantivy_writer_parallelism_hint_for_available(available_parallelism: usize) -> usize {
let max_threads = positive_usize_env(ENV_TANTIVY_MAX_WRITER_THREADS)
.unwrap_or_else(default_tantivy_max_writer_threads);
available_parallelism.max(1).clamp(1, max_threads)
}
pub(crate) fn tantivy_writer_parallelism_hint_for_available_governed(
available_parallelism: usize,
) -> usize {
let raw = tantivy_writer_parallelism_hint_for_available(available_parallelism);
crate::indexer::responsiveness::effective_worker_count(raw).max(1)
}
pub(crate) fn tantivy_writer_parallelism_hint() -> usize {
tantivy_writer_parallelism_hint_for_available_governed(
std::thread::available_parallelism()
.map(std::num::NonZeroUsize::get)
.unwrap_or(1),
)
}
fn tantivy_add_batch_max_messages() -> usize {
positive_usize_env(ENV_TANTIVY_ADD_BATCH_MAX_MESSAGES)
.unwrap_or_else(|| 4_096.max(tantivy_writer_parallelism_hint().saturating_mul(512)))
}
fn tantivy_add_batch_max_chars() -> usize {
positive_usize_env(ENV_TANTIVY_ADD_BATCH_MAX_CHARS).unwrap_or_else(|| {
(16 * 1024 * 1024).max(tantivy_writer_parallelism_hint().saturating_mul(2 * 1024 * 1024))
})
}
fn tantivy_prebuilt_add_batch_max_messages() -> usize {
positive_usize_env(ENV_TANTIVY_ADD_BATCH_MAX_MESSAGES)
.unwrap_or_else(|| 16_384.max(tantivy_writer_parallelism_hint().saturating_mul(512)))
}
fn map_fs_err(err: frankensearch::SearchError) -> Error {
Error::new(err)
}
#[derive(Clone)]
struct CassDocContext {
agent: String,
workspace: Option<String>,
workspace_original: Option<String>,
source_path: String,
title: Option<String>,
started_at_fallback: Option<i64>,
source_id: String,
origin_kind: String,
origin_host: Option<String>,
conversation_id: Option<i64>,
}
fn cass_doc_context(conv: &NormalizedConversation, conversation_id: Option<i64>) -> CassDocContext {
let cass_origin = conv.metadata.get("cass").and_then(|c| c.get("origin"));
let raw_source_id = cass_origin
.and_then(|o| o.get("source_id"))
.and_then(|v| v.as_str());
let raw_origin_kind = cass_origin
.and_then(|o| o.get("kind"))
.and_then(|v| v.as_str());
let origin_host = normalized_index_origin_host(
cass_origin
.and_then(|o| o.get("host"))
.and_then(|v| v.as_str()),
);
let source_id =
normalized_index_source_id(raw_source_id, raw_origin_kind, origin_host.as_deref());
let origin_kind = normalized_index_origin_kind(&source_id, raw_origin_kind);
CassDocContext {
agent: conv.agent_slug.clone(),
workspace: conv
.workspace
.as_ref()
.map(|ws| ws.to_string_lossy().to_string()),
workspace_original: conv
.metadata
.get("cass")
.and_then(|c| c.get("workspace_original"))
.and_then(|v| v.as_str())
.map(ToOwned::to_owned),
source_path: conv.source_path.to_string_lossy().to_string(),
title: conv.title.clone(),
started_at_fallback: conv.started_at,
source_id,
origin_kind,
origin_host,
conversation_id,
}
}
fn cass_document_for_message(
context: &CassDocContext,
msg: &NormalizedMessage,
) -> Option<FsCassDocument> {
if is_hard_message_noise(Some(msg.role.as_str()), &msg.content) {
return None;
}
Some(FsCassDocument {
agent: context.agent.clone(),
workspace: context.workspace.clone(),
workspace_original: context.workspace_original.clone(),
source_path: context.source_path.clone(),
msg_idx: msg.idx.max(0) as u64,
created_at: msg.created_at.or(context.started_at_fallback),
title: context.title.clone(),
content: msg.content.clone(),
conversation_id: context.conversation_id,
source_id: context.source_id.clone(),
origin_kind: context.origin_kind.clone(),
origin_host: context.origin_host.clone(),
})
}
fn push_cass_document_into_pending(
docs: &mut Vec<FsCassDocument>,
pending_chars: &mut usize,
doc: FsCassDocument,
) {
*pending_chars = pending_chars.saturating_add(doc.content.len());
docs.push(doc);
}
fn cass_doc_context_from_packet(packet: &ConversationPacket) -> CassDocContext {
let payload = &packet.payload;
let metadata = &payload.metadata_json;
let cass_origin = metadata.get("cass").and_then(|c| c.get("origin"));
let raw_source_id = cass_origin
.and_then(|o| o.get("source_id"))
.and_then(|v| v.as_str());
let raw_origin_kind = cass_origin
.and_then(|o| o.get("kind"))
.and_then(|v| v.as_str());
let origin_host = normalized_index_origin_host(
cass_origin
.and_then(|o| o.get("host"))
.and_then(|v| v.as_str()),
);
let source_id =
normalized_index_source_id(raw_source_id, raw_origin_kind, origin_host.as_deref());
let origin_kind = normalized_index_origin_kind(&source_id, raw_origin_kind);
CassDocContext {
agent: payload.identity.agent_slug.clone(),
workspace: payload.identity.workspace.clone(),
workspace_original: metadata
.get("cass")
.and_then(|c| c.get("workspace_original"))
.and_then(|v| v.as_str())
.map(ToOwned::to_owned),
source_path: payload.identity.source_path.clone(),
title: payload.identity.title.clone(),
started_at_fallback: payload.timestamps.started_at,
source_id,
origin_kind,
origin_host,
conversation_id: payload.identity.conversation_id,
}
}
fn cass_document_for_packet_message(
context: &CassDocContext,
msg: &ConversationPacketMessage,
) -> Option<FsCassDocument> {
if is_hard_message_noise(Some(msg.role.as_str()), &msg.content) {
return None;
}
Some(FsCassDocument {
agent: context.agent.clone(),
workspace: context.workspace.clone(),
workspace_original: context.workspace_original.clone(),
source_path: context.source_path.clone(),
msg_idx: msg.idx.max(0) as u64,
created_at: msg.created_at.or(context.started_at_fallback),
title: context.title.clone(),
content: msg.content.clone(),
conversation_id: context.conversation_id,
source_id: context.source_id.clone(),
origin_kind: context.origin_kind.clone(),
origin_host: context.origin_host.clone(),
})
}
pub fn schema_hash_matches(stored: &str) -> bool {
cass_schema_hash_matches(stored)
}
pub type Fields = FsCassFields;
pub type MergeStatus = FsCassMergeStatus;
const FEDERATED_SEARCH_MANIFEST_FILE: &str = "federated-search-manifest.json";
const FEDERATED_SEARCH_MANIFEST_VERSION: u32 = 1;
const FEDERATED_SEARCH_MANIFEST_KIND: &str = "cass-federated-lexical-index";
const EVIDENCE_BUNDLE_MANIFEST_TEMP_FILE: &str = "evidence-bundle-manifest.json.tmp";
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchableIndexSummary {
pub docs: usize,
pub segments: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct FederatedSearchManifest {
version: u32,
kind: String,
schema_hash: String,
shards: Vec<FederatedSearchShardManifest>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct FederatedSearchShardManifest {
relative_path: String,
docs: usize,
segments: usize,
meta_fingerprint: String,
}
fn federated_search_manifest_path(index_path: &Path) -> PathBuf {
index_path.join(FEDERATED_SEARCH_MANIFEST_FILE)
}
fn write_root_schema_hash_file(index_path: &Path) -> Result<()> {
fs::write(
index_path.join("schema_hash.json"),
format!("{{\"schema_hash\":\"{CASS_SCHEMA_HASH}\"}}"),
)
.with_context(|| {
format!(
"writing cass schema hash metadata for searchable index {}",
index_path.display()
)
})?;
Ok(())
}
fn manifest_relative_shard_path(shard_idx: usize) -> String {
format!("shards/shard-{shard_idx:05}")
}
fn meta_fingerprint_for_existing_index_dir(index_path: &Path) -> Result<String> {
let meta_path = index_path.join("meta.json");
let bytes = fs::read(&meta_path)
.with_context(|| format!("reading Tantivy meta file {}", meta_path.display()))?;
Ok(blake3::hash(&bytes).to_hex().to_string())
}
fn validate_federated_shard_relative_path(relative_path: &str) -> Result<()> {
if relative_path.trim().is_empty() {
return Err(anyhow::anyhow!(
"federated lexical shard path must not be empty"
));
}
let path = Path::new(relative_path);
let mut components = path.components();
match components.next() {
Some(std::path::Component::Normal(component))
if component == std::ffi::OsStr::new("shards") => {}
_ => {
return Err(anyhow::anyhow!(
"federated lexical shard path must stay under shards/: {}",
relative_path
));
}
}
let mut has_child = false;
for component in components {
match component {
std::path::Component::Normal(_) => has_child = true,
_ => {
return Err(anyhow::anyhow!(
"federated lexical shard path must be a clean relative path: {}",
relative_path
));
}
}
}
if !has_child {
return Err(anyhow::anyhow!(
"federated lexical shard path must name a shard directory under shards/: {}",
relative_path
));
}
Ok(())
}
fn validate_federated_shard_meta_fingerprint(fingerprint: &str) -> Result<()> {
if fingerprint.len() != 64 || !fingerprint.bytes().all(|byte| byte.is_ascii_hexdigit()) {
return Err(anyhow::anyhow!(
"federated lexical shard meta fingerprint must be a 64-character hex BLAKE3 digest"
));
}
Ok(())
}
fn federated_search_manifest_summary(
index_path: &Path,
manifest: &FederatedSearchManifest,
) -> Result<SearchableIndexSummary> {
let mut docs = 0usize;
let mut segments = 0usize;
for shard in &manifest.shards {
docs = docs.checked_add(shard.docs).with_context(|| {
format!(
"federated search manifest doc count overflows platform usize: {}",
index_path.display()
)
})?;
segments = segments.checked_add(shard.segments).with_context(|| {
format!(
"federated search manifest segment count overflows platform usize: {}",
index_path.display()
)
})?;
}
Ok(SearchableIndexSummary { docs, segments })
}
fn validate_federated_search_manifest(
index_path: &Path,
manifest: &FederatedSearchManifest,
verify_shard_fingerprints: bool,
) -> Result<()> {
if manifest.version != FEDERATED_SEARCH_MANIFEST_VERSION {
return Err(anyhow::anyhow!(
"unsupported federated search manifest version: expected {}, got {}",
FEDERATED_SEARCH_MANIFEST_VERSION,
manifest.version
));
}
if manifest.kind != FEDERATED_SEARCH_MANIFEST_KIND {
return Err(anyhow::anyhow!(
"unexpected federated search manifest kind: expected {}, got {}",
FEDERATED_SEARCH_MANIFEST_KIND,
manifest.kind
));
}
if manifest.schema_hash != CASS_SCHEMA_HASH {
return Err(anyhow::anyhow!(
"federated search manifest schema mismatch: expected {}, got {}",
CASS_SCHEMA_HASH,
manifest.schema_hash
));
}
if manifest.shards.is_empty() {
return Err(anyhow::anyhow!(
"federated search manifest must contain at least one shard"
));
}
let mut seen_relative_paths = BTreeSet::new();
for shard in &manifest.shards {
validate_federated_shard_relative_path(&shard.relative_path)?;
validate_federated_shard_meta_fingerprint(&shard.meta_fingerprint)?;
if !seen_relative_paths.insert(shard.relative_path.clone()) {
return Err(anyhow::anyhow!(
"federated search manifest contains duplicate shard path: {}",
shard.relative_path
));
}
if verify_shard_fingerprints {
let shard_path = index_path.join(&shard.relative_path);
let actual = meta_fingerprint_for_existing_index_dir(&shard_path)?;
if actual != shard.meta_fingerprint {
return Err(anyhow::anyhow!(
"federated lexical shard fingerprint mismatch for {}: expected {}, got {}",
shard_path.display(),
shard.meta_fingerprint,
actual
));
}
}
}
federated_search_manifest_summary(index_path, manifest)?;
Ok(())
}
fn federated_evidence_chunk_role(relative_path: &str) -> EvidenceBundleChunkRole {
if relative_path == FEDERATED_SEARCH_MANIFEST_FILE {
EvidenceBundleChunkRole::Manifest
} else if relative_path == "schema_hash.json"
|| relative_path == "meta.json"
|| relative_path.ends_with("/meta.json")
{
EvidenceBundleChunkRole::Metadata
} else if relative_path.starts_with("shards/") {
EvidenceBundleChunkRole::LexicalShard
} else {
EvidenceBundleChunkRole::Other
}
}
fn standard_lexical_evidence_chunk_role(relative_path: &str) -> EvidenceBundleChunkRole {
if relative_path == "schema_hash.json" || relative_path == "meta.json" {
EvidenceBundleChunkRole::Metadata
} else {
EvidenceBundleChunkRole::LexicalShard
}
}
fn current_schema_hash_file_matches(index_path: &Path) -> Result<()> {
let schema_hash_path = index_path.join("schema_hash.json");
let content = fs::read_to_string(&schema_hash_path).with_context(|| {
format!(
"reading cass schema hash metadata for lexical artifact {}",
schema_hash_path.display()
)
})?;
let value: serde_json::Value = serde_json::from_str(&content).with_context(|| {
format!(
"parsing cass schema hash metadata for lexical artifact {}",
schema_hash_path.display()
)
})?;
let stored_hash = value
.get("schema_hash")
.and_then(|value| value.as_str())
.ok_or_else(|| {
anyhow::anyhow!(
"lexical artifact schema hash metadata is missing schema_hash: {}",
schema_hash_path.display()
)
})?;
if stored_hash != CASS_SCHEMA_HASH {
return Err(anyhow::anyhow!(
"lexical artifact schema mismatch: expected {}, got {}",
CASS_SCHEMA_HASH,
stored_hash
));
}
Ok(())
}
fn relative_artifact_path_string(relative_path: &Path) -> Result<String> {
let mut parts = Vec::new();
for component in relative_path.components() {
match component {
std::path::Component::Normal(part) => {
let part = part.to_str().ok_or_else(|| {
anyhow::anyhow!(
"lexical artifact path is not UTF-8: {}",
relative_path.display()
)
})?;
parts.push(part);
}
_ => {
return Err(anyhow::anyhow!(
"lexical artifact path contains an unsafe component: {}",
relative_path.display()
));
}
}
}
if parts.is_empty() {
return Err(anyhow::anyhow!("lexical artifact path must not be empty"));
}
Ok(parts.join("/"))
}
fn is_evidence_bundle_writer_file(relative_path: &str) -> bool {
relative_path == EVIDENCE_BUNDLE_MANIFEST_FILE
|| relative_path == EVIDENCE_BUNDLE_MANIFEST_TEMP_FILE
}
fn collect_federated_evidence_artifact_paths(
root: &Path,
current: &Path,
relative_paths: &mut Vec<String>,
) -> Result<()> {
let entries = fs::read_dir(current)
.with_context(|| format!("reading artifact dir {}", current.display()))?;
for entry in entries {
let entry =
entry.with_context(|| format!("reading artifact entry in {}", current.display()))?;
let path = entry.path();
let file_type = entry
.file_type()
.with_context(|| format!("reading artifact file type {}", path.display()))?;
if file_type.is_dir() {
collect_federated_evidence_artifact_paths(root, &path, relative_paths)?;
} else if file_type.is_file() {
let relative_path = path.strip_prefix(root).with_context(|| {
format!(
"computing relative artifact path for {} under {}",
path.display(),
root.display()
)
})?;
let relative_path = relative_artifact_path_string(relative_path)?;
if !is_evidence_bundle_writer_file(&relative_path) {
relative_paths.push(relative_path);
}
} else {
return Err(anyhow::anyhow!(
"lexical artifact contains unsupported non-file entry: {}",
path.display()
));
}
}
Ok(())
}
fn lexical_evidence_bundle_id(chunks: &[EvidenceBundleChunk]) -> Result<String> {
let mut hasher = blake3::Hasher::new();
hasher.update(b"cass-lexical-evidence-v1\n");
for chunk in chunks {
let bytes = serde_json::to_vec(chunk).context("serializing evidence bundle chunk")?;
hasher.update(&bytes);
hasher.update(b"\n");
}
Ok(format!("cass-lexical-{}", hasher.finalize().to_hex()))
}
fn lexical_search_evidence_bundle_manifest_with_roles(
index_path: &Path,
role_for_path: fn(&str) -> EvidenceBundleChunkRole,
) -> Result<EvidenceBundleManifest> {
let mut relative_paths = Vec::new();
collect_federated_evidence_artifact_paths(index_path, index_path, &mut relative_paths)?;
relative_paths.sort();
let chunks = relative_paths
.into_iter()
.map(|relative_path| {
let role = role_for_path(&relative_path);
EvidenceBundleChunk::from_file(index_path, relative_path, role, true, None)
})
.collect::<Result<Vec<_>>>()?;
let bundle_id = lexical_evidence_bundle_id(&chunks)?;
let mut evidence =
EvidenceBundleManifest::new(bundle_id, EvidenceBundleKind::LexicalGeneration, 0);
evidence.chunks = chunks;
Ok(evidence)
}
pub fn lexical_search_evidence_bundle_manifest(
index_path: &Path,
) -> Result<EvidenceBundleManifest> {
if let Some(manifest) = load_federated_search_manifest_internal(index_path)? {
validate_federated_search_manifest(index_path, &manifest, true)?;
return lexical_search_evidence_bundle_manifest_with_roles(
index_path,
federated_evidence_chunk_role,
);
}
current_schema_hash_file_matches(index_path)?;
searchable_index_summary(index_path)?.ok_or_else(|| {
anyhow::anyhow!(
"cannot build lexical evidence bundle because no searchable index exists in {}",
index_path.display()
)
})?;
lexical_search_evidence_bundle_manifest_with_roles(
index_path,
standard_lexical_evidence_chunk_role,
)
}
pub fn federated_search_evidence_bundle_manifest(
index_path: &Path,
) -> Result<EvidenceBundleManifest> {
let Some(manifest) = load_federated_search_manifest_internal(index_path)? else {
return Err(anyhow::anyhow!(
"cannot build federated lexical evidence bundle without {} in {}",
FEDERATED_SEARCH_MANIFEST_FILE,
index_path.display()
));
};
validate_federated_search_manifest(index_path, &manifest, true)?;
lexical_search_evidence_bundle_manifest_with_roles(index_path, federated_evidence_chunk_role)
}
pub fn write_federated_search_evidence_bundle_manifest(index_path: &Path) -> Result<PathBuf> {
let manifest = federated_search_evidence_bundle_manifest(index_path)?;
manifest.save(index_path)
}
pub fn write_lexical_search_evidence_bundle_manifest(index_path: &Path) -> Result<PathBuf> {
let manifest = lexical_search_evidence_bundle_manifest(index_path)?;
manifest.save(index_path)
}
fn load_federated_search_manifest_internal(
index_path: &Path,
) -> Result<Option<FederatedSearchManifest>> {
let manifest_path = federated_search_manifest_path(index_path);
match fs::read(&manifest_path) {
Ok(bytes) => {
let manifest =
serde_json::from_slice::<FederatedSearchManifest>(&bytes).with_context(|| {
format!(
"parsing federated search manifest {}",
manifest_path.display()
)
})?;
validate_federated_search_manifest(index_path, &manifest, false).with_context(
|| {
format!(
"validating federated search manifest {}",
manifest_path.display()
)
},
)?;
Ok(Some(manifest))
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(err).with_context(|| {
format!(
"reading federated search manifest {}",
manifest_path.display()
)
}),
}
}
pub fn searchable_index_exists(index_path: &Path) -> bool {
index_path.join("meta.json").exists() || federated_search_manifest_path(index_path).exists()
}
pub fn validate_searchable_index_contract(index_path: &Path) -> Result<()> {
if let Some(manifest) = load_federated_search_manifest_internal(index_path)? {
validate_federated_search_manifest(index_path, &manifest, true)?;
for shard in manifest.shards {
let shard_path = index_path.join(&shard.relative_path);
fs_cass_open_search_reader(&shard_path, FsReloadPolicy::Manual)
.map_err(map_fs_err)
.with_context(|| {
format!(
"opening federated lexical shard reader {}",
shard_path.display()
)
})?;
}
return Ok(());
}
if !index_path.join("meta.json").exists() {
return Err(anyhow::anyhow!(
"standard lexical index metadata is missing in {}",
index_path.display()
));
}
current_schema_hash_file_matches(index_path)?;
fs_cass_open_search_reader(index_path, FsReloadPolicy::Manual)
.map_err(map_fs_err)
.with_context(|| {
format!(
"opening standard lexical index reader {}",
index_path.display()
)
})?;
Ok(())
}
pub fn searchable_index_modified_time(index_path: &Path) -> Option<SystemTime> {
let meta_path = index_path.join("meta.json");
if meta_path.exists() {
return fs::metadata(meta_path).and_then(|m| m.modified()).ok();
}
fs::metadata(federated_search_manifest_path(index_path))
.and_then(|m| m.modified())
.ok()
}
pub fn searchable_index_fingerprint(index_path: &Path) -> Result<Option<String>> {
let meta_path = index_path.join("meta.json");
match fs::read(&meta_path) {
Ok(bytes) => Ok(Some(blake3::hash(&bytes).to_hex().to_string())),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
let manifest_path = federated_search_manifest_path(index_path);
match fs::read(&manifest_path) {
Ok(bytes) => Ok(Some(blake3::hash(&bytes).to_hex().to_string())),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(err).with_context(|| {
format!(
"reading federated search manifest {}",
manifest_path.display()
)
}),
}
}
Err(err) => {
Err(err).with_context(|| format!("reading Tantivy meta file {}", meta_path.display()))
}
}
}
pub fn searchable_index_summary(index_path: &Path) -> Result<Option<SearchableIndexSummary>> {
if let Some(manifest) = load_federated_search_manifest_internal(index_path)? {
return federated_search_manifest_summary(index_path, &manifest).map(Some);
}
let meta_path = index_path.join("meta.json");
if !meta_path.exists() {
return Ok(None);
}
if let Some(summary) = searchable_index_summary_from_tantivy_meta(index_path)? {
return Ok(Some(summary));
}
let mut index = Index::open_in_dir(index_path).with_context(|| {
format!(
"opening searchable Tantivy index directory for summary: {}",
index_path.display()
)
})?;
ensure_tokenizer(&mut index);
let segment_metas = index
.searchable_segment_metas()
.context("reading searchable segment metadata for Tantivy summary")?;
Ok(Some(SearchableIndexSummary {
docs: segment_metas
.iter()
.map(|segment| segment.num_docs() as usize)
.sum(),
segments: segment_metas.len(),
}))
}
fn searchable_index_summary_from_tantivy_meta(
index_path: &Path,
) -> Result<Option<SearchableIndexSummary>> {
let meta_path = index_path.join("meta.json");
let bytes = fs::read(&meta_path)
.with_context(|| format!("reading Tantivy meta file {}", meta_path.display()))?;
let meta: serde_json::Value = serde_json::from_slice(&bytes)
.with_context(|| format!("parsing Tantivy meta file {}", meta_path.display()))?;
let Some(segments) = meta.get("segments").and_then(|value| value.as_array()) else {
return Ok(None);
};
let mut docs = 0usize;
for segment in segments {
if segment
.get("deletes")
.is_some_and(|deletes| !deletes.is_null())
{
return Ok(None);
}
let Some(max_doc) = segment.get("max_doc").and_then(|value| value.as_u64()) else {
return Ok(None);
};
let max_doc = usize::try_from(max_doc).with_context(|| {
format!(
"Tantivy segment max_doc exceeds platform usize in {}",
meta_path.display()
)
})?;
docs = docs.checked_add(max_doc).with_context(|| {
format!(
"Tantivy segment doc count overflows platform usize in {}",
meta_path.display()
)
})?;
}
Ok(Some(SearchableIndexSummary {
docs,
segments: segments.len(),
}))
}
pub fn open_federated_search_readers(
index_path: &Path,
reload_policy: FsReloadPolicy,
) -> Result<Option<Vec<(IndexReader, Fields)>>> {
let Some(manifest) = load_federated_search_manifest_internal(index_path)? else {
return Ok(None);
};
validate_federated_search_manifest(index_path, &manifest, true)?;
let readers = manifest
.shards
.into_iter()
.map(|shard| {
let shard_path = index_path.join(&shard.relative_path);
fs_cass_open_search_reader(&shard_path, reload_policy)
.map_err(map_fs_err)
.with_context(|| {
format!(
"opening federated lexical shard reader {}",
shard_path.display()
)
})
})
.collect::<Result<Vec<_>>>()?;
Ok(Some(readers))
}
fn materialize_federated_search_bundle_for_write(index_path: &Path) -> Result<()> {
let Some(manifest) = load_federated_search_manifest_internal(index_path)? else {
return Ok(());
};
validate_federated_search_manifest(index_path, &manifest, true)?;
let stage_parent = index_path.parent().unwrap_or(index_path);
let materialize_root = tempfile::Builder::new()
.prefix("cass-federated-materialize-")
.tempdir_in(stage_parent)
.with_context(|| {
format!(
"creating staging directory to materialize federated lexical bundle {}",
index_path.display()
)
})?;
let materialized_index_path = materialize_root.path().join("index");
let shard_paths = manifest
.shards
.iter()
.map(|shard| index_path.join(&shard.relative_path))
.collect::<Vec<_>>();
TantivyIndex::assemble_compatible_index_directory_files(&materialized_index_path, &shard_paths)
.with_context(|| {
format!(
"materializing federated lexical bundle into mutable Tantivy index {}",
index_path.display()
)
})?;
if ensure_replaceable_federated_materialization_root(index_path)? {
fs::remove_dir_all(index_path).with_context(|| {
format!(
"removing federated lexical bundle before mutable materialization {}",
index_path.display()
)
})?;
}
fs::rename(&materialized_index_path, index_path).with_context(|| {
format!(
"publishing materialized mutable Tantivy index {} -> {}",
materialized_index_path.display(),
index_path.display()
)
})?;
materialize_root
.close()
.context("closing federated lexical materialization staging directory")?;
Ok(())
}
fn ensure_replaceable_federated_materialization_root(index_path: &Path) -> Result<bool> {
match fs::symlink_metadata(index_path) {
Ok(metadata) => {
let file_type = metadata.file_type();
if file_type.is_symlink() {
return Err(anyhow::anyhow!(
"refusing to materialize federated lexical bundle through symlink: {}",
index_path.display()
));
}
if !file_type.is_dir() {
return Err(anyhow::anyhow!(
"refusing to materialize federated lexical bundle because root is not a directory: {}",
index_path.display()
));
}
Ok(true)
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(false),
Err(err) => Err(err).with_context(|| {
format!(
"checking federated lexical bundle root before materialization: {}",
index_path.display()
)
}),
}
}
pub fn publish_federated_searchable_index_directories<P: AsRef<Path>>(
output_path: &Path,
input_paths: &[P],
) -> Result<SearchableIndexSummary> {
if input_paths.is_empty() {
return Err(anyhow::anyhow!(
"cannot publish federated lexical bundle without at least one input shard"
));
}
let mut input_summaries = Vec::with_capacity(input_paths.len());
for input_path in input_paths {
let input_path = input_path.as_ref();
let summary = searchable_index_summary(input_path)?.ok_or_else(|| {
anyhow::anyhow!(
"federated lexical publish input is not a searchable index: {}",
input_path.display()
)
})?;
input_summaries.push((input_path.to_path_buf(), summary));
}
publish_federated_searchable_index_directories_with_summaries(output_path, &input_summaries)
}
pub fn publish_federated_searchable_index_directories_with_summaries(
output_path: &Path,
input_shards: &[(PathBuf, SearchableIndexSummary)],
) -> Result<SearchableIndexSummary> {
if input_shards.is_empty() {
return Err(anyhow::anyhow!(
"cannot publish federated lexical bundle without at least one input shard"
));
}
ensure_empty_merge_output_directory(output_path)?;
let shard_root = output_path.join("shards");
fs::create_dir_all(&shard_root).with_context(|| {
format!(
"creating federated lexical shard root {}",
shard_root.display()
)
})?;
let mut manifest = FederatedSearchManifest {
version: FEDERATED_SEARCH_MANIFEST_VERSION,
kind: FEDERATED_SEARCH_MANIFEST_KIND.to_string(),
schema_hash: CASS_SCHEMA_HASH.to_string(),
shards: Vec::with_capacity(input_shards.len()),
};
let mut total_docs = 0usize;
let mut total_segments = 0usize;
for (shard_idx, (input_path, summary)) in input_shards.iter().enumerate() {
if !searchable_index_exists(input_path) {
return Err(anyhow::anyhow!(
"federated lexical publish input is not a searchable index: {}",
input_path.display()
));
}
let meta_fingerprint = meta_fingerprint_for_existing_index_dir(input_path)?;
let relative_path = manifest_relative_shard_path(shard_idx);
let destination_path = output_path.join(&relative_path);
if let Some(parent) = destination_path.parent() {
fs::create_dir_all(parent).with_context(|| {
format!(
"creating parent directory for federated lexical shard {}",
destination_path.display()
)
})?;
}
fs::rename(input_path, &destination_path).with_context(|| {
format!(
"moving staged lexical shard {} into federated publish bundle {}",
input_path.display(),
destination_path.display()
)
})?;
total_docs = total_docs.checked_add(summary.docs).with_context(|| {
format!(
"federated lexical publish doc count overflows platform usize for {}",
output_path.display()
)
})?;
total_segments = total_segments
.checked_add(summary.segments)
.with_context(|| {
format!(
"federated lexical publish segment count overflows platform usize for {}",
output_path.display()
)
})?;
manifest.shards.push(FederatedSearchShardManifest {
relative_path,
docs: summary.docs,
segments: summary.segments,
meta_fingerprint,
});
}
let manifest_bytes =
serde_json::to_vec_pretty(&manifest).context("serializing federated search manifest")?;
fs::write(federated_search_manifest_path(output_path), &manifest_bytes).with_context(|| {
format!(
"writing federated search manifest {}",
federated_search_manifest_path(output_path).display()
)
})?;
write_root_schema_hash_file(output_path)?;
Ok(SearchableIndexSummary {
docs: total_docs,
segments: total_segments,
})
}
pub struct TantivyIndex {
inner: FsCassTantivyIndex,
pub fields: Fields,
}
impl TantivyIndex {
pub fn open_or_create(path: &Path) -> Result<Self> {
materialize_federated_search_bundle_for_write(path)?;
let inner = FsCassTantivyIndex::open_or_create_with_writer_parallelism(
path,
tantivy_writer_parallelism_hint(),
)
.map_err(map_fs_err)?;
let fields = inner.fields();
Ok(Self { inner, fields })
}
pub fn open_or_create_with_writer_parallelism(
path: &Path,
writer_parallelism: usize,
) -> Result<Self> {
materialize_federated_search_bundle_for_write(path)?;
let inner = FsCassTantivyIndex::open_or_create_with_writer_parallelism(
path,
writer_parallelism.max(1),
)
.map_err(map_fs_err)?;
let fields = inner.fields();
Ok(Self { inner, fields })
}
pub fn add_conversation(&mut self, conv: &NormalizedConversation) -> Result<()> {
let provenance = ConversationPacketProvenance::local();
let packet = ConversationPacket::from_normalized_conversation(conv, provenance);
self.add_messages_from_packet(&packet, None, None, |_| Ok(()))
}
pub fn add_conversation_with_id(
&mut self,
conv: &NormalizedConversation,
conversation_id: Option<i64>,
) -> Result<()> {
let provenance = ConversationPacketProvenance::local();
let mut packet = ConversationPacket::from_normalized_conversation(conv, provenance);
packet.payload.identity.conversation_id = conversation_id;
self.add_messages_from_packet(&packet, None, conversation_id, |_| Ok(()))
}
pub fn delete_all(&mut self) -> Result<()> {
self.inner.delete_all().map_err(map_fs_err)
}
pub fn commit(&mut self) -> Result<()> {
self.inner.commit().map_err(map_fs_err)
}
pub fn configure_bulk_load_merge_policy(&mut self) {
self.inner.configure_bulk_load_merge_policy();
}
pub fn reader(&self) -> Result<IndexReader> {
self.inner.reader().map_err(map_fs_err)
}
pub fn segment_count(&self) -> usize {
self.inner.segment_count()
}
pub fn merge_status(&self) -> MergeStatus {
self.inner.merge_status()
}
pub fn optimize_if_idle(&mut self) -> Result<bool> {
self.inner.optimize_if_idle().map_err(map_fs_err)
}
pub fn force_merge(&mut self) -> Result<()> {
self.inner.force_merge().map_err(map_fs_err)
}
pub fn merge_compatible_index_directories<P: AsRef<Path>>(
output_path: &Path,
input_paths: &[P],
) -> Result<Self> {
if input_paths.is_empty() {
return Err(anyhow::anyhow!(
"cannot merge Tantivy index directories without at least one input"
));
}
ensure_empty_merge_output_directory(output_path)?;
let indices = input_paths
.iter()
.map(|input_path| {
let input_path = input_path.as_ref();
let mut index = Index::open_in_dir(input_path).with_context(|| {
format!(
"opening compatible Tantivy index directory for merge: {}",
input_path.display()
)
})?;
ensure_tokenizer(&mut index);
Ok(index)
})
.collect::<Result<Vec<_>>>()?;
let output_directory = tantivy_crate::directory::MmapDirectory::open(output_path)
.with_context(|| {
format!(
"opening Tantivy output directory for merged index: {}",
output_path.display()
)
})?;
let mut merged = tantivy_crate::indexer::merge_indices(&indices, output_directory)
.with_context(|| {
format!(
"merging {} compatible Tantivy index directories into {}",
indices.len(),
output_path.display()
)
})?;
ensure_tokenizer(&mut merged);
fs::write(
output_path.join("schema_hash.json"),
format!("{{\"schema_hash\":\"{CASS_SCHEMA_HASH}\"}}"),
)
.with_context(|| {
format!(
"writing cass schema hash metadata for merged Tantivy index {}",
output_path.display()
)
})?;
drop(merged);
Self::open_or_create(output_path)
}
pub fn assemble_compatible_index_directories<P: AsRef<Path>>(
output_path: &Path,
input_paths: &[P],
) -> Result<Self> {
Self::assemble_compatible_index_directory_files(output_path, input_paths)?;
Self::open_or_create(output_path)
}
fn assemble_compatible_index_directory_files<P: AsRef<Path>>(
output_path: &Path,
input_paths: &[P],
) -> Result<()> {
if input_paths.is_empty() {
return Err(anyhow::anyhow!(
"cannot assemble Tantivy index directories without at least one input"
));
}
ensure_empty_merge_output_directory(output_path)?;
let mut combined_index_meta: Option<tantivy_crate::IndexMeta> = None;
let mut combined_segments = Vec::new();
let mut max_opstamp = 0u64;
let mut managed_paths = BTreeSet::new();
for input_path in input_paths {
let input_path = input_path.as_ref();
let mut index = Index::open_in_dir(input_path).with_context(|| {
format!(
"opening compatible Tantivy index directory for assembly: {}",
input_path.display()
)
})?;
ensure_tokenizer(&mut index);
let metas = index.load_metas().with_context(|| {
format!(
"loading Tantivy metadata for assembled index input {}",
input_path.display()
)
})?;
match &mut combined_index_meta {
Some(combined_meta) => {
if metas.schema != combined_meta.schema {
return Err(anyhow::anyhow!(
"attempted to assemble Tantivy index directories with different schemas"
));
}
if metas.index_settings != combined_meta.index_settings {
return Err(anyhow::anyhow!(
"attempted to assemble Tantivy index directories with different index settings"
));
}
}
None => {
combined_index_meta = Some(tantivy_crate::IndexMeta {
index_settings: metas.index_settings.clone(),
segments: Vec::new(),
schema: metas.schema.clone(),
opstamp: 0,
payload: None,
});
}
}
max_opstamp = max_opstamp.max(metas.opstamp);
for segment in metas.segments {
for relative_path in segment.list_files() {
let source_path = input_path.join(&relative_path);
if !source_path.exists() {
continue;
}
link_or_copy_searchable_index_file(&source_path, output_path, &relative_path)?;
if !managed_paths.insert(relative_path.clone()) {
return Err(anyhow::anyhow!(
"assembled Tantivy index would contain duplicate segment file path {}",
relative_path.display()
));
}
}
combined_segments.push(segment);
}
}
let mut combined_index_meta = combined_index_meta.ok_or_else(|| {
anyhow::anyhow!("cannot assemble Tantivy index directories without index metadata")
})?;
combined_index_meta.segments = combined_segments;
combined_index_meta.opstamp = max_opstamp;
combined_index_meta.payload = Some(format!(
"Cass assembled {} compatible Tantivy segments from {} input directories",
combined_index_meta.segments.len(),
input_paths.len()
));
write_searchable_generation_metadata(
output_path,
&combined_index_meta,
&mut managed_paths,
)?;
Ok(())
}
pub fn add_messages(
&mut self,
conv: &NormalizedConversation,
messages: &[NormalizedMessage],
) -> Result<()> {
self.add_messages_with_conversation_id(conv, messages, None)
}
pub fn add_messages_with_conversation_id(
&mut self,
conv: &NormalizedConversation,
messages: &[NormalizedMessage],
conversation_id: Option<i64>,
) -> Result<()> {
self.add_messages_with_conversation_id_and_batch_hook(
conv,
messages,
conversation_id,
|_| Ok(()),
)
}
pub fn add_messages_with_conversation_id_and_batch_hook<F>(
&mut self,
conv: &NormalizedConversation,
messages: &[NormalizedMessage],
conversation_id: Option<i64>,
mut on_batch_flushed: F,
) -> Result<()>
where
F: FnMut(usize) -> Result<()>,
{
let context = cass_doc_context(conv, conversation_id);
let max_messages = tantivy_add_batch_max_messages();
let max_chars = tantivy_add_batch_max_chars();
let mut docs: Vec<FsCassDocument> = Vec::new();
let mut pending_chars = 0usize;
for msg in messages {
let Some(doc) = cass_document_for_message(&context, msg) else {
continue;
};
push_cass_document_into_pending(&mut docs, &mut pending_chars, doc);
if docs.len() >= max_messages || pending_chars >= max_chars {
let flushed_docs = docs.len();
self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
on_batch_flushed(flushed_docs)?;
docs.clear();
pending_chars = 0;
}
}
if docs.is_empty() {
Ok(())
} else {
let flushed_docs = docs.len();
self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
on_batch_flushed(flushed_docs)
}
}
pub fn add_messages_from_packet<F>(
&mut self,
packet: &ConversationPacket,
message_indices: Option<&[usize]>,
conversation_id_override: Option<i64>,
mut on_batch_flushed: F,
) -> Result<()>
where
F: FnMut(usize) -> Result<()>,
{
let mut context = cass_doc_context_from_packet(packet);
if let Some(id) = conversation_id_override {
context.conversation_id = Some(id);
}
let max_messages = tantivy_add_batch_max_messages();
let max_chars = tantivy_add_batch_max_chars();
let mut docs: Vec<FsCassDocument> = Vec::new();
let mut pending_chars = 0usize;
let messages = &packet.payload.messages;
let total = messages.len();
let indices_owned;
let indices: &[usize] = match message_indices {
Some(slice) => slice,
None => {
indices_owned = (0..total).collect::<Vec<_>>();
&indices_owned
}
};
for &i in indices {
let Some(msg) = messages.get(i) else {
anyhow::bail!(
"packet message index {} out of range for packet with {} messages",
i,
total
);
};
let Some(doc) = cass_document_for_packet_message(&context, msg) else {
continue;
};
push_cass_document_into_pending(&mut docs, &mut pending_chars, doc);
if docs.len() >= max_messages || pending_chars >= max_chars {
let flushed_docs = docs.len();
self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
on_batch_flushed(flushed_docs)?;
docs.clear();
pending_chars = 0;
}
}
if docs.is_empty() {
Ok(())
} else {
let flushed_docs = docs.len();
self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
on_batch_flushed(flushed_docs)
}
}
pub fn add_prebuilt_documents_slice(&mut self, documents: &[FsCassDocument]) -> Result<usize> {
let max_messages = tantivy_prebuilt_add_batch_max_messages();
let max_chars = tantivy_add_batch_max_chars();
let mut indexed_docs = 0usize;
let mut batch_start = 0usize;
let mut pending_chars = 0usize;
for (idx, doc) in documents.iter().enumerate() {
pending_chars = pending_chars.saturating_add(doc.content.len());
let batch_len = idx + 1 - batch_start;
if batch_len >= max_messages || pending_chars >= max_chars {
let batch_end = idx + 1;
indexed_docs = indexed_docs.saturating_add(batch_end - batch_start);
self.inner
.add_cass_documents(&documents[batch_start..batch_end])
.map_err(map_fs_err)?;
batch_start = batch_end;
pending_chars = 0;
}
}
if batch_start < documents.len() {
indexed_docs = indexed_docs.saturating_add(documents.len() - batch_start);
self.inner
.add_cass_documents(&documents[batch_start..])
.map_err(map_fs_err)?;
}
Ok(indexed_docs)
}
pub fn add_prebuilt_document_refs_slice<'a>(
&mut self,
documents: &[FsCassDocumentRef<'a>],
) -> Result<usize> {
let max_messages = tantivy_prebuilt_add_batch_max_messages();
let max_chars = tantivy_add_batch_max_chars();
let mut indexed_docs = 0usize;
let mut batch_start = 0usize;
let mut pending_chars = 0usize;
for (idx, doc) in documents.iter().enumerate() {
pending_chars = pending_chars.saturating_add(doc.content.len());
let batch_len = idx + 1 - batch_start;
if batch_len >= max_messages || pending_chars >= max_chars {
let batch_end = idx + 1;
indexed_docs = indexed_docs.saturating_add(batch_end - batch_start);
self.inner
.add_cass_document_refs(&documents[batch_start..batch_end])
.map_err(map_fs_err)?;
batch_start = batch_end;
pending_chars = 0;
}
}
if batch_start < documents.len() {
indexed_docs = indexed_docs.saturating_add(documents.len() - batch_start);
self.inner
.add_cass_document_refs(&documents[batch_start..])
.map_err(map_fs_err)?;
}
Ok(indexed_docs)
}
pub fn add_prebuilt_documents<I>(&mut self, documents: I) -> Result<usize>
where
I: IntoIterator<Item = FsCassDocument>,
{
let docs = documents.into_iter().collect::<Vec<_>>();
self.add_prebuilt_documents_slice(&docs)
}
pub fn add_conversations_with_ids<'a, I>(&mut self, conversations: I) -> Result<usize>
where
I: IntoIterator<Item = (&'a NormalizedConversation, Option<i64>)>,
{
let max_messages = tantivy_add_batch_max_messages();
let max_chars = tantivy_add_batch_max_chars();
let mut docs: Vec<FsCassDocument> = Vec::new();
let mut pending_chars = 0usize;
let mut indexed_docs = 0usize;
for (conv, conversation_id) in conversations {
let context = cass_doc_context(conv, conversation_id);
for msg in &conv.messages {
let Some(doc) = cass_document_for_message(&context, msg) else {
continue;
};
push_cass_document_into_pending(&mut docs, &mut pending_chars, doc);
if docs.len() >= max_messages || pending_chars >= max_chars {
indexed_docs = indexed_docs.saturating_add(docs.len());
self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
docs.clear();
pending_chars = 0;
}
}
}
if !docs.is_empty() {
indexed_docs = indexed_docs.saturating_add(docs.len());
self.inner.add_cass_documents(&docs).map_err(map_fs_err)?;
}
Ok(indexed_docs)
}
}
pub fn build_schema() -> Schema {
fs_build_schema()
}
pub fn fields_from_schema(schema: &Schema) -> Result<Fields> {
fs_fields_from_schema(schema).map_err(map_fs_err)
}
pub fn expected_index_dir(base: &Path) -> std::path::PathBuf {
base.join("index").join(CASS_SCHEMA_VERSION)
}
pub fn index_dir(base: &Path) -> Result<std::path::PathBuf> {
fs_index_dir(base).map_err(map_fs_err)
}
pub fn ensure_tokenizer(index: &mut Index) {
fs_ensure_tokenizer(index);
}
fn ensure_empty_merge_output_directory(output_path: &Path) -> Result<()> {
match fs::metadata(output_path) {
Ok(metadata) => {
if !metadata.is_dir() {
return Err(anyhow::anyhow!(
"merged Tantivy output path is not a directory: {}",
output_path.display()
));
}
let mut entries = fs::read_dir(output_path).with_context(|| {
format!(
"reading merged Tantivy output directory before merge: {}",
output_path.display()
)
})?;
if entries.next().transpose()?.is_some() {
return Err(anyhow::anyhow!(
"merged Tantivy output directory must be empty before merge: {}",
output_path.display()
));
}
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
fs::create_dir_all(output_path).with_context(|| {
format!(
"creating merged Tantivy output directory before merge: {}",
output_path.display()
)
})?;
}
Err(err) => {
return Err(err).with_context(|| {
format!(
"checking merged Tantivy output directory before merge: {}",
output_path.display()
)
});
}
}
Ok(())
}
fn link_or_copy_searchable_index_file(
source_path: &Path,
output_path: &Path,
relative_path: &Path,
) -> Result<()> {
let destination_path = output_path.join(relative_path);
if destination_path.exists() {
return Err(anyhow::anyhow!(
"assembled Tantivy output path already exists: {}",
destination_path.display()
));
}
match fs::hard_link(source_path, &destination_path) {
Ok(()) => Ok(()),
Err(err)
if matches!(
err.kind(),
std::io::ErrorKind::PermissionDenied
| std::io::ErrorKind::CrossesDevices
| std::io::ErrorKind::Unsupported
) =>
{
fs::copy(source_path, &destination_path).with_context(|| {
format!(
"copying Tantivy segment file into assembled generation {} -> {}",
source_path.display(),
destination_path.display()
)
})?;
Ok(())
}
Err(err) => Err(err).with_context(|| {
format!(
"hard-linking Tantivy segment file into assembled generation {} -> {}",
source_path.display(),
destination_path.display()
)
}),
}
}
fn write_searchable_generation_metadata(
output_path: &Path,
index_meta: &tantivy_crate::IndexMeta,
managed_paths: &mut BTreeSet<std::path::PathBuf>,
) -> Result<()> {
let meta_path = output_path.join("meta.json");
fs::write(
&meta_path,
serde_json::to_vec_pretty(index_meta).context("serializing assembled Tantivy meta.json")?,
)
.with_context(|| {
format!(
"writing assembled Tantivy meta.json for {}",
output_path.display()
)
})?;
managed_paths.insert(std::path::PathBuf::from("meta.json"));
fs::write(
output_path.join(".managed.json"),
serde_json::to_vec(managed_paths).context("serializing assembled Tantivy managed paths")?,
)
.with_context(|| {
format!(
"writing assembled Tantivy managed file manifest for {}",
output_path.display()
)
})?;
fs::write(
output_path.join("schema_hash.json"),
format!("{{\"schema_hash\":\"{CASS_SCHEMA_HASH}\"}}"),
)
.with_context(|| {
format!(
"writing cass schema hash metadata for assembled Tantivy index {}",
output_path.display()
)
})?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::connectors::{NormalizedConversation, NormalizedMessage};
use serde_json::Value;
use std::path::PathBuf;
use tempfile::TempDir;
#[test]
fn open_or_create_roundtrip() {
let dir = TempDir::new().expect("temp dir");
let idx = TantivyIndex::open_or_create(dir.path()).expect("create index");
let reader = idx.reader().expect("reader");
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 0);
}
#[test]
fn schema_hash_matches_current_hash() {
assert!(schema_hash_matches(SCHEMA_HASH));
assert!(!schema_hash_matches("invalid"));
}
#[test]
fn default_tantivy_writer_cap_scales_with_memory() -> Result<(), &'static str> {
const GIB: u64 = 1024 * 1024 * 1024;
require_default_tantivy_writer_cap(None, 26, "unknown memory uses ceiling")?;
require_default_tantivy_writer_cap(Some(8 * GIB), 1, "8 GiB caps at one thread")?;
require_default_tantivy_writer_cap(Some(24 * GIB), 2, "24 GiB caps at two threads")?;
require_default_tantivy_writer_cap(Some(64 * GIB), 6, "64 GiB allows six threads")?;
require_default_tantivy_writer_cap(Some(512 * GIB), 26, "512 GiB reaches ceiling")?;
Ok(())
}
#[test]
fn default_tantivy_writer_cap_accounts_for_actual_concurrent_writers() {
const GIB: u64 = 1024 * 1024 * 1024;
assert_eq!(
default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
Some(24 * GIB),
8,
),
2
);
assert_eq!(
default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
Some(24 * GIB),
2,
),
9
);
assert_eq!(
default_tantivy_max_writer_threads_for_memory_bytes_and_concurrent_writers(
Some(24 * GIB),
0,
),
19
);
}
fn require_default_tantivy_writer_cap(
memory_bytes: Option<u64>,
expected: usize,
label: &'static str,
) -> Result<(), &'static str> {
if default_tantivy_max_writer_threads_for_memory_bytes(memory_bytes) == expected {
return Ok(());
}
Err(label)
}
#[test]
fn generate_edge_ngrams_prefixes() {
let out = frankensearch::lexical::cass_generate_edge_ngrams("hello world");
assert!(out.contains("he"));
assert!(out.contains("world"));
}
#[test]
fn build_preview_truncates_with_ellipsis() {
let preview = frankensearch::lexical::cass_build_preview("abcdefghijklmnopqrstuvwxyz", 10);
assert_eq!(preview, "abcdefghij…");
}
#[test]
fn merge_status_api_is_exposed() {
let dir = TempDir::new().expect("temp dir");
let index = TantivyIndex::open_or_create(dir.path()).expect("create");
let status = index.merge_status();
assert_eq!(status.merge_threshold, 4);
}
#[test]
fn searchable_index_summary_uses_meta_file_without_opening_index() {
let dir = TempDir::new().expect("temp dir");
fs::write(
dir.path().join("meta.json"),
serde_json::to_vec(&serde_json::json!({
"segments": [
{"segment_id": "a", "max_doc": 3, "deletes": null},
{"segment_id": "b", "max_doc": 5}
]
}))
.expect("serialize meta"),
)
.expect("write meta");
assert_eq!(
searchable_index_summary(dir.path())
.expect("summary")
.expect("index exists"),
SearchableIndexSummary {
docs: 8,
segments: 2
}
);
}
#[test]
fn searchable_index_summary_meta_fast_path_declines_deleted_segments() {
let dir = TempDir::new().expect("temp dir");
fs::write(
dir.path().join("meta.json"),
serde_json::to_vec(&serde_json::json!({
"segments": [
{"segment_id": "a", "max_doc": 3, "deletes": {"opstamp": 1}}
]
}))
.expect("serialize meta"),
)
.expect("write meta");
assert_eq!(
searchable_index_summary_from_tantivy_meta(dir.path()).expect("summary"),
None
);
}
#[test]
fn merge_status_should_merge_logic() {
let status = MergeStatus {
segment_count: 5,
last_merge_ts: 0,
ms_since_last_merge: -1,
merge_threshold: 4,
cooldown_ms: 300_000,
};
assert!(status.should_merge());
}
#[test]
fn index_dir_creates_versioned_path() {
let dir = TempDir::new().expect("temp dir");
let result = index_dir(dir.path()).expect("index dir");
assert!(result.ends_with("index/v8"));
}
#[test]
fn tokenizer_registration_is_callable() {
let dir = TempDir::new().expect("temp dir");
let mut idx = Index::create_in_ram(build_schema());
ensure_tokenizer(&mut idx);
let _ = TantivyIndex::open_or_create(dir.path()).expect("open or create");
}
#[test]
fn add_messages_batches_large_payloads_without_dropping_docs() {
let dir = TempDir::new().expect("temp dir");
let mut idx = TantivyIndex::open_or_create(dir.path()).expect("create index");
let content = "x".repeat(4096);
let messages: Vec<_> = (0..1_200)
.map(|i| NormalizedMessage {
idx: i,
role: "assistant".to_string(),
author: None,
created_at: Some(1_700_000_000_000 + i),
content: format!("{i}-{content}"),
extra: Value::Null,
snippets: Vec::new(),
invocations: Vec::new(),
})
.collect();
let conv = NormalizedConversation {
agent_slug: "codex".to_string(),
external_id: Some("large-batch".to_string()),
title: Some("Large Batch".to_string()),
workspace: Some(PathBuf::from("/tmp/workspace")),
source_path: PathBuf::from("/tmp/rollout.jsonl"),
started_at: Some(1_700_000_000_000),
ended_at: Some(1_700_000_000_999),
metadata: Value::Null,
messages,
};
idx.add_messages(&conv, &conv.messages)
.expect("add messages");
idx.commit().expect("commit");
let reader = idx.reader().expect("reader");
reader.reload().expect("reload");
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), conv.messages.len() as u64);
}
#[test]
fn add_conversations_with_ids_streams_large_payloads_without_dropping_docs() {
let dir = TempDir::new().expect("temp dir");
let mut idx = TantivyIndex::open_or_create(dir.path()).expect("create index");
let content = "y".repeat(2048);
let conversations: Vec<_> = (0..24)
.map(|conv_idx| {
let messages = (0..256)
.map(|msg_idx| NormalizedMessage {
idx: msg_idx,
role: "assistant".to_string(),
author: None,
created_at: Some(1_700_000_000_000 + (conv_idx * 1_000 + msg_idx)),
content: format!("conv-{conv_idx}-msg-{msg_idx}-{content}"),
extra: Value::Null,
snippets: Vec::new(),
invocations: Vec::new(),
})
.collect();
NormalizedConversation {
agent_slug: "codex".to_string(),
external_id: Some(format!("conv-{conv_idx}")),
title: Some(format!("Conversation {conv_idx}")),
workspace: Some(PathBuf::from("/tmp/workspace")),
source_path: PathBuf::from(format!("/tmp/rollout-{conv_idx}.jsonl")),
started_at: Some(1_700_000_000_000 + conv_idx),
ended_at: Some(1_700_000_000_999 + conv_idx),
metadata: Value::Null,
messages,
}
})
.collect();
let expected_docs: usize = conversations.iter().map(|conv| conv.messages.len()).sum();
let indexed_docs = idx
.add_conversations_with_ids(conversations.iter().map(|conv| (conv, Some(42))))
.expect("add conversations");
assert_eq!(indexed_docs, expected_docs);
idx.commit().expect("commit");
let reader = idx.reader().expect("reader");
reader.reload().expect("reload");
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), expected_docs as u64);
}
#[test]
fn normalized_index_source_id_infers_remote_from_origin_host_without_kind() {
let source_id = normalized_index_source_id(Some(" "), None, Some("dev@laptop"));
assert_eq!(source_id, "dev@laptop");
assert_eq!(normalized_index_origin_kind(&source_id, None), "remote");
}
#[test]
fn add_prebuilt_documents_streams_large_payloads_without_dropping_docs() {
let dir = TempDir::new().expect("temp dir");
let mut idx = TantivyIndex::open_or_create(dir.path()).expect("create index");
let content = "z".repeat(2048);
let docs: Vec<_> = (0..6_144)
.map(|msg_idx| FsCassDocument {
agent: "codex".to_string(),
workspace: Some("/tmp/workspace".to_string()),
workspace_original: None,
source_path: "/tmp/prebuilt-rollout.jsonl".to_string(),
msg_idx: msg_idx as u64,
created_at: Some(1_700_000_000_000 + msg_idx as i64),
title: Some("Prebuilt Batch".to_string()),
content: format!("prebuilt-msg-{msg_idx}-{content}"),
conversation_id: Some(7),
source_id: crate::sources::provenance::LOCAL_SOURCE_ID.to_string(),
origin_kind: crate::sources::provenance::LOCAL_SOURCE_ID.to_string(),
origin_host: None,
})
.collect();
let expected_docs = docs.len();
let indexed_docs = idx.add_prebuilt_documents(docs).expect("add prebuilt docs");
assert_eq!(indexed_docs, expected_docs);
idx.commit().expect("commit");
let reader = idx.reader().expect("reader");
reader.reload().expect("reload");
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), expected_docs as u64);
}
#[test]
fn merge_compatible_index_directories_roundtrips_docs_into_single_segment() {
let root = TempDir::new().expect("temp dir");
let shard_a = root.path().join("shard-a");
let shard_b = root.path().join("shard-b");
let merged = root.path().join("merged");
let mut shard_a_index = TantivyIndex::open_or_create(&shard_a).expect("create shard a");
let mut shard_b_index = TantivyIndex::open_or_create(&shard_b).expect("create shard b");
let make_conv = |external_id: &str, title: &str, content: &str| NormalizedConversation {
agent_slug: "codex".to_string(),
external_id: Some(external_id.to_string()),
title: Some(title.to_string()),
workspace: Some(PathBuf::from("/tmp/workspace")),
source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
started_at: Some(1_700_000_000_000),
ended_at: Some(1_700_000_000_100),
metadata: Value::Null,
messages: vec![
NormalizedMessage {
idx: 0,
role: "user".to_string(),
author: None,
created_at: Some(1_700_000_000_010),
content: format!("{content}-a"),
extra: Value::Null,
snippets: Vec::new(),
invocations: Vec::new(),
},
NormalizedMessage {
idx: 1,
role: "assistant".to_string(),
author: None,
created_at: Some(1_700_000_000_020),
content: format!("{content}-b"),
extra: Value::Null,
snippets: Vec::new(),
invocations: Vec::new(),
},
],
};
let conv_a = make_conv("merge-a", "Merge A", "alpha");
let conv_b = make_conv("merge-b", "Merge B", "beta");
shard_a_index
.add_conversation_with_id(&conv_a, Some(10))
.expect("index shard a");
shard_b_index
.add_conversation_with_id(&conv_b, Some(20))
.expect("index shard b");
shard_a_index.commit().expect("commit shard a");
shard_b_index.commit().expect("commit shard b");
drop(shard_a_index);
drop(shard_b_index);
let merged_index =
TantivyIndex::merge_compatible_index_directories(&merged, &[&shard_a, &shard_b])
.expect("merge shard indices");
assert_eq!(
merged_index.segment_count(),
1,
"merged shard indices should collapse into a single searchable segment"
);
let reader = merged_index.reader().expect("reader");
reader.reload().expect("reload");
assert_eq!(reader.searcher().num_docs(), 4);
}
#[test]
fn merge_compatible_index_directories_rejects_non_empty_output_directory() {
let root = TempDir::new().expect("temp dir");
let shard = root.path().join("shard");
let merged = root.path().join("merged");
fs::create_dir_all(&merged).expect("create merged dir");
fs::write(merged.join("sentinel.txt"), "occupied").expect("write sentinel");
let mut shard_index = TantivyIndex::open_or_create(&shard).expect("create shard");
let conv = NormalizedConversation {
agent_slug: "codex".to_string(),
external_id: Some("merge-occupied".to_string()),
title: Some("Occupied".to_string()),
workspace: Some(PathBuf::from("/tmp/workspace")),
source_path: PathBuf::from("/tmp/merge-occupied.jsonl"),
started_at: Some(1_700_000_000_000),
ended_at: Some(1_700_000_000_100),
metadata: Value::Null,
messages: vec![NormalizedMessage {
idx: 0,
role: "assistant".to_string(),
author: None,
created_at: Some(1_700_000_000_010),
content: "occupied".to_string(),
extra: Value::Null,
snippets: Vec::new(),
invocations: Vec::new(),
}],
};
shard_index
.add_conversation_with_id(&conv, Some(1))
.expect("index shard");
shard_index.commit().expect("commit shard");
drop(shard_index);
let error = match TantivyIndex::merge_compatible_index_directories(&merged, &[&shard]) {
Ok(_) => panic!("non-empty merge output dir should be rejected"),
Err(error) => error,
};
assert!(
format!("{error:#}").contains("must be empty"),
"unexpected error: {error:#}"
);
}
#[test]
fn assemble_compatible_index_directories_roundtrips_docs_into_multi_segment_generation() {
let root = TempDir::new().expect("temp dir");
let shard_a = root.path().join("shard-a");
let shard_b = root.path().join("shard-b");
let assembled = root.path().join("assembled");
let mut shard_a_index = TantivyIndex::open_or_create(&shard_a).expect("create shard a");
let mut shard_b_index = TantivyIndex::open_or_create(&shard_b).expect("create shard b");
let make_conv = |external_id: &str, title: &str, content: &str| NormalizedConversation {
agent_slug: "codex".to_string(),
external_id: Some(external_id.to_string()),
title: Some(title.to_string()),
workspace: Some(PathBuf::from("/tmp/workspace")),
source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
started_at: Some(1_700_000_001_000),
ended_at: Some(1_700_000_001_100),
metadata: Value::Null,
messages: vec![
NormalizedMessage {
idx: 0,
role: "user".to_string(),
author: None,
created_at: Some(1_700_000_001_010),
content: format!("{content}-a"),
extra: Value::Null,
snippets: Vec::new(),
invocations: Vec::new(),
},
NormalizedMessage {
idx: 1,
role: "assistant".to_string(),
author: None,
created_at: Some(1_700_000_001_020),
content: format!("{content}-b"),
extra: Value::Null,
snippets: Vec::new(),
invocations: Vec::new(),
},
],
};
let conv_a = make_conv("assemble-a", "Assemble A", "alpha");
let conv_b = make_conv("assemble-b", "Assemble B", "beta");
shard_a_index
.add_conversation_with_id(&conv_a, Some(10))
.expect("index shard a");
shard_b_index
.add_conversation_with_id(&conv_b, Some(20))
.expect("index shard b");
shard_a_index.commit().expect("commit shard a");
shard_b_index.commit().expect("commit shard b");
drop(shard_a_index);
drop(shard_b_index);
let assembled_index =
TantivyIndex::assemble_compatible_index_directories(&assembled, &[&shard_a, &shard_b])
.expect("assemble shard indices");
let reader = assembled_index.reader().expect("reader");
reader.reload().expect("reload");
assert_eq!(reader.searcher().num_docs(), 4);
assert_eq!(
assembled_index.segment_count(),
2,
"assembled shard indices should preserve one searchable segment per input artifact"
);
assert!(
assembled.join(".managed.json").exists(),
"assembled index generation should persist a Tantivy managed-file manifest"
);
}
#[test]
fn publish_federated_searchable_index_directories_writes_manifest_without_root_meta() {
let root = TempDir::new().expect("temp dir");
let shard_a = root.path().join("shard-a");
let shard_b = root.path().join("shard-b");
let published = root.path().join("published");
let mut shard_a_index = TantivyIndex::open_or_create(&shard_a).expect("create shard a");
let mut shard_b_index = TantivyIndex::open_or_create(&shard_b).expect("create shard b");
let make_conv = |external_id: &str, title: &str, content: &str| NormalizedConversation {
agent_slug: "codex".to_string(),
external_id: Some(external_id.to_string()),
title: Some(title.to_string()),
workspace: Some(PathBuf::from("/tmp/workspace")),
source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
started_at: Some(1_700_000_002_000),
ended_at: Some(1_700_000_002_100),
metadata: Value::Null,
messages: vec![
NormalizedMessage {
idx: 0,
role: "user".to_string(),
author: None,
created_at: Some(1_700_000_002_010),
content: format!("{content}-a"),
extra: Value::Null,
snippets: Vec::new(),
invocations: Vec::new(),
},
NormalizedMessage {
idx: 1,
role: "assistant".to_string(),
author: None,
created_at: Some(1_700_000_002_020),
content: format!("{content}-b"),
extra: Value::Null,
snippets: Vec::new(),
invocations: Vec::new(),
},
],
};
shard_a_index
.add_conversation_with_id(&make_conv("fed-a", "Fed A", "alpha"), Some(10))
.expect("index shard a");
shard_b_index
.add_conversation_with_id(&make_conv("fed-b", "Fed B", "beta"), Some(20))
.expect("index shard b");
shard_a_index.commit().expect("commit shard a");
shard_b_index.commit().expect("commit shard b");
drop(shard_a_index);
drop(shard_b_index);
let summary =
publish_federated_searchable_index_directories(&published, &[&shard_a, &shard_b])
.expect("publish federated bundle");
assert_eq!(summary.docs, 4);
assert_eq!(summary.segments, 2);
assert!(
!published.join("meta.json").exists(),
"federated publish root should not force a standard single-index meta.json"
);
assert!(
published.join(FEDERATED_SEARCH_MANIFEST_FILE).exists(),
"federated publish root should persist its manifest"
);
let manifest = load_federated_search_manifest_internal(&published)
.expect("load manifest")
.expect("manifest present");
assert_eq!(manifest.shards.len(), 2);
assert_eq!(
searchable_index_summary(&published)
.expect("summary")
.expect("searchable summary")
.docs,
4
);
}
fn write_federated_manifest_for_test(index_path: &Path, manifest: &FederatedSearchManifest) {
fs::write(
federated_search_manifest_path(index_path),
serde_json::to_vec_pretty(manifest).expect("serialize manifest"),
)
.expect("write manifest");
}
fn publish_test_federated_bundle(root: &Path) -> PathBuf {
let shard_a = root.join("shard-a");
let shard_b = root.join("shard-b");
let published = root.join("published");
let mut shard_a_index = TantivyIndex::open_or_create(&shard_a).expect("create shard a");
let mut shard_b_index = TantivyIndex::open_or_create(&shard_b).expect("create shard b");
let make_conv = |external_id: &str, content: &str| NormalizedConversation {
agent_slug: "codex".to_string(),
external_id: Some(external_id.to_string()),
title: Some(format!("Bundle {external_id}")),
workspace: Some(PathBuf::from("/tmp/workspace")),
source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
started_at: Some(1_700_000_002_000),
ended_at: Some(1_700_000_002_100),
metadata: Value::Null,
messages: vec![NormalizedMessage {
idx: 0,
role: "assistant".to_string(),
author: None,
created_at: Some(1_700_000_002_010),
content: content.to_string(),
extra: Value::Null,
snippets: Vec::new(),
invocations: Vec::new(),
}],
};
shard_a_index
.add_conversation_with_id(&make_conv("bundle-a", "alpha"), Some(10))
.expect("index shard a");
shard_b_index
.add_conversation_with_id(&make_conv("bundle-b", "beta"), Some(20))
.expect("index shard b");
shard_a_index.commit().expect("commit shard a");
shard_b_index.commit().expect("commit shard b");
drop(shard_a_index);
drop(shard_b_index);
publish_federated_searchable_index_directories(&published, &[&shard_a, &shard_b])
.expect("publish federated bundle");
published
}
#[test]
fn federated_manifest_validation_rejects_unsupported_remote_contracts() {
let root = TempDir::new().expect("temp dir");
let published = root.path().join("published");
fs::create_dir_all(&published).expect("create bundle root");
let base_manifest = FederatedSearchManifest {
version: FEDERATED_SEARCH_MANIFEST_VERSION,
kind: FEDERATED_SEARCH_MANIFEST_KIND.to_string(),
schema_hash: CASS_SCHEMA_HASH.to_string(),
shards: vec![FederatedSearchShardManifest {
relative_path: "shards/shard-00000".to_string(),
docs: 1,
segments: 1,
meta_fingerprint: "a".repeat(64),
}],
};
let mut manifest = base_manifest.clone();
manifest.version = FEDERATED_SEARCH_MANIFEST_VERSION + 1;
write_federated_manifest_for_test(&published, &manifest);
let error = load_federated_search_manifest_internal(&published).unwrap_err();
assert!(
format!("{error:#}").contains("unsupported federated search manifest version"),
"unexpected version error: {error:#}"
);
let mut manifest = base_manifest.clone();
manifest.kind = "cass-unknown-artifact".to_string();
write_federated_manifest_for_test(&published, &manifest);
let error = load_federated_search_manifest_internal(&published).unwrap_err();
assert!(
format!("{error:#}").contains("unexpected federated search manifest kind"),
"unexpected kind error: {error:#}"
);
let mut manifest = base_manifest;
manifest.shards[0].relative_path = "../escape".to_string();
write_federated_manifest_for_test(&published, &manifest);
let error = load_federated_search_manifest_internal(&published).unwrap_err();
assert!(
format!("{error:#}").contains("must stay under shards/"),
"unexpected path error: {error:#}"
);
}
#[test]
fn open_federated_search_readers_rejects_corrupt_shard_fingerprint() {
let root = TempDir::new().expect("temp dir");
let published = publish_test_federated_bundle(root.path());
let mut manifest = load_federated_search_manifest_internal(&published)
.expect("load manifest")
.expect("manifest present");
manifest.shards[0].meta_fingerprint = "0".repeat(64);
write_federated_manifest_for_test(&published, &manifest);
let error = match open_federated_search_readers(&published, FsReloadPolicy::Manual) {
Ok(_) => panic!("corrupt federated shard fingerprint should be rejected"),
Err(error) => error,
};
assert!(
format!("{error:#}").contains("federated lexical shard fingerprint mismatch"),
"unexpected fingerprint error: {error:#}"
);
}
fn write_minimal_federated_artifact(root: &Path, segment_bytes: &[u8]) {
let shard = root.join("shards/shard-00000");
fs::create_dir_all(&shard).expect("create shard");
fs::write(shard.join("meta.json"), br#"{"segments":[]}"#).expect("write shard meta");
fs::write(shard.join("segment.bin"), segment_bytes).expect("write shard segment");
write_root_schema_hash_file(root).expect("write schema hash");
let manifest = FederatedSearchManifest {
version: FEDERATED_SEARCH_MANIFEST_VERSION,
kind: FEDERATED_SEARCH_MANIFEST_KIND.to_string(),
schema_hash: CASS_SCHEMA_HASH.to_string(),
shards: vec![FederatedSearchShardManifest {
relative_path: "shards/shard-00000".to_string(),
docs: 0,
segments: 0,
meta_fingerprint: meta_fingerprint_for_existing_index_dir(&shard)
.expect("meta fingerprint"),
}],
};
write_federated_manifest_for_test(root, &manifest);
}
fn write_minimal_standard_lexical_artifact(root: &Path, segment_bytes: &[u8]) {
fs::create_dir_all(root).expect("create standard lexical root");
fs::write(root.join("meta.json"), br#"{"segments":[]}"#).expect("write root meta");
fs::write(root.join("segment.bin"), segment_bytes).expect("write segment");
write_root_schema_hash_file(root).expect("write schema hash");
}
#[test]
fn lexical_evidence_manifest_supports_standard_searchable_index() {
let root = TempDir::new().expect("temp dir");
write_minimal_standard_lexical_artifact(root.path(), b"standard segment bytes");
let manifest =
lexical_search_evidence_bundle_manifest(root.path()).expect("standard manifest");
assert!(manifest.verify(root.path()).is_complete());
assert!(manifest.chunks.iter().any(|chunk| {
chunk.path == "meta.json" && chunk.role == EvidenceBundleChunkRole::Metadata
}));
assert!(manifest.chunks.iter().any(|chunk| {
chunk.path == "segment.bin" && chunk.role == EvidenceBundleChunkRole::LexicalShard
}));
}
#[test]
fn lexical_evidence_manifest_excludes_writer_temp_file_before_save() {
let root = TempDir::new().expect("temp dir");
write_minimal_standard_lexical_artifact(root.path(), b"standard segment bytes");
fs::write(
root.path().join(EVIDENCE_BUNDLE_MANIFEST_TEMP_FILE),
b"leftover temp manifest bytes",
)
.expect("write stale evidence manifest temp file");
let manifest =
lexical_search_evidence_bundle_manifest(root.path()).expect("standard manifest");
assert!(
manifest
.chunks
.iter()
.all(|chunk| chunk.path != EVIDENCE_BUNDLE_MANIFEST_TEMP_FILE),
"writer temp file must not become part of the saved proof: {manifest:?}"
);
manifest.save(root.path()).expect("save evidence manifest");
let report = crate::evidence_bundle::verify_evidence_bundle_manifest_file(
root.path(),
&crate::evidence_bundle::EvidenceBundleManifest::path(root.path()),
);
assert!(report.is_complete(), "{report:?}");
}
#[test]
fn lexical_evidence_manifest_rejects_standard_schema_mismatch() {
let root = TempDir::new().expect("temp dir");
write_minimal_standard_lexical_artifact(root.path(), b"standard segment bytes");
fs::write(
root.path().join("schema_hash.json"),
r#"{"schema_hash":"stale"}"#,
)
.expect("write stale schema hash");
let error = lexical_search_evidence_bundle_manifest(root.path()).unwrap_err();
assert!(
format!("{error:#}").contains("lexical artifact schema mismatch"),
"unexpected schema error: {error:#}"
);
}
#[test]
fn federated_evidence_manifest_is_deterministic_and_detects_mutation() {
let left = TempDir::new().expect("left temp dir");
let right = TempDir::new().expect("right temp dir");
write_minimal_federated_artifact(left.path(), b"same segment bytes");
write_minimal_federated_artifact(right.path(), b"same segment bytes");
let left_manifest =
federated_search_evidence_bundle_manifest(left.path()).expect("left manifest");
let right_manifest =
federated_search_evidence_bundle_manifest(right.path()).expect("right manifest");
assert_eq!(
serde_json::to_value(&left_manifest).expect("left json"),
serde_json::to_value(&right_manifest).expect("right json"),
"byte-identical federated artifacts should produce byte-identical evidence manifests"
);
assert!(left_manifest.verify(left.path()).is_complete());
fs::write(
left.path().join("shards/shard-00000/segment.bin"),
b"SAME segment bytes",
)
.expect("mutate shard segment");
let report = left_manifest.verify(left.path());
assert!(report.is_unsafe(), "{report:?}");
assert!(
report.issues.iter().any(|issue| issue.kind
== crate::evidence_bundle::EvidenceBundleIssueKind::DigestMismatch),
"expected digest mismatch after segment mutation: {report:?}"
);
}
#[cfg(unix)]
#[test]
fn federated_evidence_manifest_rejects_symlink_artifacts() {
use std::os::unix::fs::symlink;
let root = TempDir::new().expect("temp dir");
write_minimal_federated_artifact(root.path(), b"segment bytes");
symlink(
"/tmp/not-a-bundle-file",
root.path().join("shards/shard-00000/link"),
)
.expect("create artifact symlink");
let error = federated_search_evidence_bundle_manifest(root.path()).unwrap_err();
assert!(
format!("{error:#}").contains("unsupported non-file entry"),
"unexpected symlink error: {error:#}"
);
}
#[test]
fn open_or_create_materializes_federated_bundle_back_into_mutable_index() {
let root = TempDir::new().expect("temp dir");
let shard_a = root.path().join("shard-a");
let shard_b = root.path().join("shard-b");
let published = root.path().join("published");
let mut shard_a_index = TantivyIndex::open_or_create(&shard_a).expect("create shard a");
let mut shard_b_index = TantivyIndex::open_or_create(&shard_b).expect("create shard b");
let make_conv = |external_id: &str, title: &str, content: &str| NormalizedConversation {
agent_slug: "codex".to_string(),
external_id: Some(external_id.to_string()),
title: Some(title.to_string()),
workspace: Some(PathBuf::from("/tmp/workspace")),
source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
started_at: Some(1_700_000_003_000),
ended_at: Some(1_700_000_003_100),
metadata: Value::Null,
messages: vec![
NormalizedMessage {
idx: 0,
role: "user".to_string(),
author: None,
created_at: Some(1_700_000_003_010),
content: format!("{content}-a"),
extra: Value::Null,
snippets: Vec::new(),
invocations: Vec::new(),
},
NormalizedMessage {
idx: 1,
role: "assistant".to_string(),
author: None,
created_at: Some(1_700_000_003_020),
content: format!("{content}-b"),
extra: Value::Null,
snippets: Vec::new(),
invocations: Vec::new(),
},
],
};
shard_a_index
.add_conversation_with_id(&make_conv("mat-a", "Mat A", "alpha"), Some(10))
.expect("index shard a");
shard_b_index
.add_conversation_with_id(&make_conv("mat-b", "Mat B", "beta"), Some(20))
.expect("index shard b");
shard_a_index.commit().expect("commit shard a");
shard_b_index.commit().expect("commit shard b");
drop(shard_a_index);
drop(shard_b_index);
publish_federated_searchable_index_directories(&published, &[&shard_a, &shard_b])
.expect("publish federated bundle");
assert!(
published.join(FEDERATED_SEARCH_MANIFEST_FILE).exists(),
"test fixture should start in federated bundle form"
);
let mutable_index =
TantivyIndex::open_or_create(&published).expect("materialize mutable index");
let reader = mutable_index.reader().expect("reader");
reader.reload().expect("reload");
assert_eq!(reader.searcher().num_docs(), 4);
assert!(
published.join("meta.json").exists(),
"writer open should materialize a standard writable Tantivy index"
);
assert!(
!published.join(FEDERATED_SEARCH_MANIFEST_FILE).exists(),
"materialization should replace the federated bundle manifest"
);
}
#[test]
fn federated_materialization_target_rejects_file_root() {
let root = TempDir::new().expect("temp dir");
let published = root.path().join("published");
fs::write(&published, "not a directory").expect("write file root");
let err = ensure_replaceable_federated_materialization_root(&published)
.expect_err("file roots must not be materialized over");
assert!(
err.to_string().contains("not a directory"),
"unexpected error: {err:#}"
);
assert_eq!(
fs::read_to_string(&published).expect("file root should remain"),
"not a directory"
);
}
#[test]
#[cfg(unix)]
fn federated_materialization_target_rejects_dangling_symlink_root() {
use std::os::unix::fs::symlink;
let root = TempDir::new().expect("temp dir");
let published = root.path().join("published");
let missing_target = root.path().join("missing-published");
symlink(&missing_target, &published).expect("create dangling symlink");
let err = ensure_replaceable_federated_materialization_root(&published)
.expect_err("symlink roots must not be materialized over");
assert!(
err.to_string().contains("through symlink"),
"unexpected error: {err:#}"
);
assert!(
fs::symlink_metadata(&published)
.expect("symlink root should remain")
.file_type()
.is_symlink()
);
assert!(!missing_target.exists());
}
#[test]
fn packet_driven_lexical_pipeline_matches_legacy_for_normalized_conv() {
use crate::model::conversation_packet::{ConversationPacket, ConversationPacketProvenance};
fn make_conv() -> NormalizedConversation {
NormalizedConversation {
agent_slug: "codex".to_string(),
external_id: Some("packet-equivalence".to_string()),
title: Some("Packet Equivalence".to_string()),
workspace: Some(PathBuf::from("/work/eq")),
source_path: PathBuf::from("/work/eq/.codex/session.jsonl"),
started_at: Some(1_700_000_000_000),
ended_at: Some(1_700_000_010_000),
metadata: serde_json::json!({
"cass": {
"origin": {
"source_id": "remote-host",
"kind": "ssh",
"host": "ws-42.example",
},
"workspace_original": "/Users/dev/eq",
},
"model": "gpt-5",
}),
messages: vec![
NormalizedMessage {
idx: 0,
role: "user".to_string(),
author: Some("human".to_string()),
created_at: Some(1_700_000_000_000),
content: "explain the packet pipeline".to_string(),
extra: serde_json::json!({"turn": 1}),
snippets: Vec::new(),
invocations: Vec::new(),
},
NormalizedMessage {
idx: 1,
role: "assistant".to_string(),
author: None,
created_at: Some(1_700_000_001_000),
content: "the pipeline normalizes once".to_string(),
extra: Value::Null,
snippets: Vec::new(),
invocations: Vec::new(),
},
NormalizedMessage {
idx: 2,
role: "tool".to_string(),
author: Some("ripgrep".to_string()),
created_at: Some(1_700_000_002_000),
content: "matches: 3".to_string(),
extra: Value::Null,
snippets: Vec::new(),
invocations: Vec::new(),
},
],
}
}
let legacy_dir = TempDir::new().expect("legacy temp dir");
let mut legacy_idx = TantivyIndex::open_or_create(legacy_dir.path()).expect("legacy idx");
let conv = make_conv();
legacy_idx
.add_messages_with_conversation_id(&conv, &conv.messages, Some(99))
.expect("legacy add");
legacy_idx.commit().expect("legacy commit");
let legacy_reader = legacy_idx.reader().expect("legacy reader");
legacy_reader.reload().expect("legacy reload");
let legacy_searcher = legacy_reader.searcher();
let legacy_count = legacy_searcher.num_docs();
let packet_dir = TempDir::new().expect("packet temp dir");
let mut packet_idx = TantivyIndex::open_or_create(packet_dir.path()).expect("packet idx");
let packet = ConversationPacket::from_normalized_conversation(
&conv,
ConversationPacketProvenance::local(),
);
packet_idx
.add_messages_from_packet(&packet, None, Some(99), |_| Ok(()))
.expect("packet add");
packet_idx.commit().expect("packet commit");
let packet_reader = packet_idx.reader().expect("packet reader");
packet_reader.reload().expect("packet reload");
let packet_searcher = packet_reader.searcher();
let packet_count = packet_searcher.num_docs();
assert_eq!(
legacy_count, packet_count,
"packet pipeline must emit the same number of docs as legacy: legacy={legacy_count} packet={packet_count}"
);
assert_eq!(
legacy_count,
conv.messages.len() as u64,
"all 3 fixture messages should land (none filter as hard noise)"
);
let legacy_context = cass_doc_context(&conv, Some(99));
let legacy_docs: Vec<FsCassDocument> = conv
.messages
.iter()
.filter_map(|m| cass_document_for_message(&legacy_context, m))
.collect();
let packet_context_owned = {
let mut ctx = cass_doc_context_from_packet(&packet);
ctx.conversation_id = Some(99);
ctx
};
let packet_docs: Vec<FsCassDocument> = packet
.payload
.messages
.iter()
.filter_map(|m| cass_document_for_packet_message(&packet_context_owned, m))
.collect();
assert_eq!(
legacy_docs.len(),
packet_docs.len(),
"packet doc list length should match legacy"
);
for (legacy_doc, packet_doc) in legacy_docs.iter().zip(packet_docs.iter()) {
assert_eq!(legacy_doc.agent, packet_doc.agent);
assert_eq!(legacy_doc.workspace, packet_doc.workspace);
assert_eq!(legacy_doc.workspace_original, packet_doc.workspace_original);
assert_eq!(legacy_doc.source_path, packet_doc.source_path);
assert_eq!(legacy_doc.msg_idx, packet_doc.msg_idx);
assert_eq!(legacy_doc.created_at, packet_doc.created_at);
assert_eq!(legacy_doc.title, packet_doc.title);
assert_eq!(legacy_doc.content, packet_doc.content);
assert_eq!(legacy_doc.conversation_id, packet_doc.conversation_id);
assert_eq!(
legacy_doc.source_id, packet_doc.source_id,
"source_id must match (remote-host normalization is the bead's tripwire)"
);
assert_eq!(legacy_doc.origin_kind, packet_doc.origin_kind);
assert_eq!(legacy_doc.origin_host, packet_doc.origin_host);
}
assert_eq!(
packet_docs[0].source_id, "remote-host",
"metadata.cass.origin.source_id must be the canonical value"
);
assert_eq!(
packet_docs[0].origin_host.as_deref(),
Some("ws-42.example"),
"metadata.cass.origin.host must surface as origin_host"
);
}
#[test]
fn add_conversation_with_id_packet_path_emits_expected_doc_count() {
fn fixture(id: i64) -> NormalizedConversation {
NormalizedConversation {
agent_slug: "codex".to_string(),
external_id: Some(format!("conv-{id}")),
title: Some(format!("Conv {id}")),
workspace: None,
source_path: PathBuf::from(format!("/tmp/conv-{id}.jsonl")),
started_at: Some(1_700_000_000_000 + id),
ended_at: Some(1_700_000_001_000 + id),
metadata: Value::Null,
messages: vec![
NormalizedMessage {
idx: 0,
role: "user".to_string(),
author: None,
created_at: Some(1_700_000_000_000 + id),
content: format!("hello-{id}"),
extra: Value::Null,
snippets: Vec::new(),
invocations: Vec::new(),
},
NormalizedMessage {
idx: 1,
role: "assistant".to_string(),
author: None,
created_at: Some(1_700_000_000_500 + id),
content: format!("response-{id}"),
extra: Value::Null,
snippets: Vec::new(),
invocations: Vec::new(),
},
],
}
}
let dir = TempDir::new().expect("temp dir");
let mut idx = TantivyIndex::open_or_create(dir.path()).expect("idx");
idx.add_conversation_with_id(&fixture(1), Some(101))
.expect("conv 1");
idx.add_conversation_with_id(&fixture(2), Some(102))
.expect("conv 2");
idx.commit().expect("commit");
let reader = idx.reader().expect("reader");
reader.reload().expect("reload");
assert_eq!(
reader.searcher().num_docs(),
4,
"two conversations × two messages each ⇒ four lexical docs"
);
}
}