use std::collections::HashMap;
use std::sync::Arc;
use crate::candidate_gate::AllowedSet;
use crate::filter_ir::{AuthScope, FilterIR};
use crate::filtered_vector_search::ScoredResult;
use crate::grep_executor::GrepMode;
use crate::namespace::NamespaceScope;
/// Strategy used to merge the per-modality ranked lists into one ranking.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum FusionMethod {
    /// Weighted Reciprocal Rank Fusion: a document at 1-based rank `r` in a
    /// lane contributes `weight / (k + r)`; contributions are summed.
    Rrf {
        k: f32,
        vector_weight: f32,
        bm25_weight: f32,
    },
    /// Weighted sum of per-lane min-max normalized scores.
    Linear {
        vector_weight: f32,
        bm25_weight: f32,
    },
    /// Per-document maximum of per-lane min-max normalized scores.
    Max,
    /// Re-rank the `primary` modality's candidates using the other modality.
    Cascade { primary: Modality },
}

/// Retrieval lane a candidate list came from.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Modality {
    Vector,
    Bm25,
    Grep,
}

/// Equal-weight RRF with `k = 60`.
impl Default for FusionMethod {
    fn default() -> Self {
        FusionMethod::Rrf { k: 60.0, vector_weight: 1.0, bm25_weight: 1.0 }
    }
}

/// Tuning knobs for a fusion run.
#[derive(Debug, Clone)]
pub struct FusionConfig {
    /// How lanes are combined.
    pub method: FusionMethod,
    /// Number of candidates requested from each lane before fusion.
    pub candidates_per_modality: usize,
    /// Maximum number of fused results returned.
    pub final_k: usize,
    /// Optional lower bound on the fused score; lower-scored results are dropped.
    pub min_score: Option<f32>,
}

/// Default RRF, 100 candidates per lane, top 10 results, no score floor.
impl Default for FusionConfig {
    fn default() -> Self {
        let method = FusionMethod::default();
        FusionConfig {
            method,
            candidates_per_modality: 100,
            final_k: 10,
            min_score: None,
        }
    }
}
/// A hybrid search request: optional vector, BM25 and grep lanes, scoped by a
/// namespace and a filter, plus fusion settings.
#[derive(Debug, Clone)]
pub struct UnifiedHybridQuery {
/// Namespace scope; `effective_filter` ANDs its filter IR with `filter`.
pub namespace: NamespaceScope,
/// Vector lane; the lane is skipped when `None`.
pub vector_query: Option<VectorQuerySpec>,
/// BM25 lane; the lane is skipped when `None`.
pub bm25_query: Option<Bm25QuerySpec>,
/// Grep lane (gate or rank); only honoured when the executor has a grep executor.
pub grep_query: Option<GrepQuerySpec>,
/// Extra filter; `new` initialises it with `FilterIR::all()`.
pub filter: FilterIR,
/// Fusion settings carried with the query.
/// NOTE(review): `UnifiedHybridExecutor::execute` fuses with the executor's own
/// `FusionConfig`, not this field — confirm which is intended.
pub fusion_config: FusionConfig,
}
/// Vector-lane parameters.
#[derive(Debug, Clone)]
pub struct VectorQuerySpec {
/// Query embedding passed to `VectorExecutor::search`.
pub embedding: Vec<f32>,
/// Search-breadth setting (100 when built via `with_vector`).
/// NOTE(review): not forwarded to `VectorExecutor::search` anywhere in this file.
pub ef_search: usize,
}
/// BM25-lane parameters.
#[derive(Debug, Clone)]
pub struct Bm25QuerySpec {
/// Query text passed to `Bm25Executor::search`.
pub text: String,
/// Fields to search (`["content"]` when built via `with_bm25`).
/// NOTE(review): not forwarded to `Bm25Executor::search` anywhere in this file.
pub fields: Vec<String>,
}
/// Grep-lane parameters.
#[derive(Debug, Clone)]
pub struct GrepQuerySpec {
/// Pattern passed to `GrepLaneExecutor::grep`.
pub pattern: String,
/// `Gate` narrows the allowed set for the other lanes; `Rank` adds a fused lane.
pub mode: GrepMode,
/// Fusion weight of the grep lane; only used in `Rank` mode (1.0 via `with_grep`).
pub weight: f32,
}
impl UnifiedHybridQuery {
    /// Starts a query scoped to `namespace` with no lanes enabled,
    /// `FilterIR::all()` as the filter and the default fusion config.
    pub fn new(namespace: NamespaceScope) -> Self {
        let filter = FilterIR::all();
        let fusion_config = FusionConfig::default();
        Self {
            namespace,
            vector_query: None,
            bm25_query: None,
            grep_query: None,
            filter,
            fusion_config,
        }
    }

    /// Enables the vector lane with `embedding` and `ef_search = 100`.
    pub fn with_vector(self, embedding: Vec<f32>) -> Self {
        let spec = VectorQuerySpec { embedding, ef_search: 100 };
        Self { vector_query: Some(spec), ..self }
    }

    /// Enables the BM25 lane over the `content` field.
    pub fn with_bm25(self, text: impl Into<String>) -> Self {
        let spec = Bm25QuerySpec {
            text: text.into(),
            fields: vec![String::from("content")],
        };
        Self { bm25_query: Some(spec), ..self }
    }

    /// Enables the grep lane with the default weight of 1.0.
    pub fn with_grep(self, pattern: impl Into<String>, mode: GrepMode) -> Self {
        self.with_grep_weighted(pattern, mode, 1.0)
    }

    /// Enables the grep lane with an explicit fusion `weight`.
    pub fn with_grep_weighted(
        self,
        pattern: impl Into<String>,
        mode: GrepMode,
        weight: f32,
    ) -> Self {
        let spec = GrepQuerySpec { pattern: pattern.into(), mode, weight };
        Self { grep_query: Some(spec), ..self }
    }

    /// Replaces the extra filter.
    pub fn with_filter(self, filter: FilterIR) -> Self {
        Self { filter, ..self }
    }

    /// Replaces the fusion configuration carried by the query.
    pub fn with_fusion(self, config: FusionConfig) -> Self {
        Self { fusion_config: config, ..self }
    }

    /// The namespace's filter IR ANDed with the query's own filter.
    pub fn effective_filter(&self) -> FilterIR {
        let scope = self.namespace.to_filter_ir();
        scope.and(self.filter.clone())
    }
}
/// One lane's candidate list, tagged with its modality.
///
/// `filtered` records that the lane was already restricted to the caller's
/// allowed set; the fusion engine `debug_assert!`s it instead of filtering.
#[derive(Debug)]
pub struct FilteredCandidates {
    pub modality: Modality,
    pub results: Vec<ScoredResult>,
    pub filtered: bool,
}
impl FilteredCandidates {
    /// Shared constructor: every public constructor marks its list pre-filtered.
    fn pre_filtered(modality: Modality, results: Vec<ScoredResult>) -> Self {
        Self { modality, results, filtered: true }
    }

    /// Wraps vector-lane results (trusted to be pre-filtered).
    pub fn from_vector(results: Vec<ScoredResult>) -> Self {
        Self::pre_filtered(Modality::Vector, results)
    }

    /// Wraps BM25-lane results (trusted to be pre-filtered).
    pub fn from_bm25(results: Vec<ScoredResult>) -> Self {
        Self::pre_filtered(Modality::Bm25, results)
    }

    /// Wraps grep-lane results (trusted to be pre-filtered).
    pub fn from_grep(results: Vec<ScoredResult>) -> Self {
        Self::pre_filtered(Modality::Grep, results)
    }
}
/// Strongly typed document identifier wrapping the raw `u64` id.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct DocId(pub u64);

impl DocId {
    /// Returns the raw `u64` id.
    #[inline]
    pub const fn get(self) -> u64 {
        let DocId(raw) = self;
        raw
    }
}

impl From<u64> for DocId {
    fn from(raw: u64) -> Self {
        Self(raw)
    }
}

impl From<DocId> for u64 {
    fn from(id: DocId) -> Self {
        id.get()
    }
}
/// A borrowed ranked list and its weight, as consumed by `fuse_rrf_weighted`.
pub struct RankedList<'a> {
/// Results in rank order (index 0 is rank 1).
pub results: &'a [ScoredResult],
/// Multiplier applied to every RRF contribution from this list.
pub weight: f32,
}
/// An owned candidate list plus the weight it carries into fusion.
pub struct WeightedLane {
/// The lane's pre-filtered candidates.
pub candidates: FilteredCandidates,
/// Lane weight; used by `Rrf` and `Linear`, ignored by `Max` and `Cascade`.
pub weight: f32,
}
/// Weighted Reciprocal Rank Fusion over any number of ranked lists.
///
/// The result at 0-based position `i` of a list contributes
/// `list.weight / (k + (i + 1))`, so ranks are 1-based. Contributions for the
/// same document are summed across lists. Raw scores are ignored; only list
/// order matters.
pub fn fuse_rrf_weighted(lists: &[RankedList<'_>], k: f32) -> HashMap<DocId, f32> {
    let mut fused: HashMap<DocId, f32> = HashMap::new();
    let contributions = lists.iter().flat_map(|list| {
        list.results.iter().enumerate().map(move |(position, hit)| {
            let denominator = k + (position as f32 + 1.0);
            (DocId(hit.doc_id), list.weight / denominator)
        })
    });
    for (doc, share) in contributions {
        *fused.entry(doc).or_default() += share;
    }
    fused
}
/// Merges per-modality candidate lists into one ranked [`FusionResult`]
/// according to a [`FusionConfig`].
///
/// The engine never filters: each lane must already be restricted to the
/// caller's allowed set (checked with `debug_assert!` on
/// `FilteredCandidates::filtered`). Fused output therefore only contains ids
/// that some lane produced.
pub struct FusionEngine {
    config: FusionConfig,
}
impl FusionEngine {
    /// Creates an engine that fuses according to `config`.
    pub fn new(config: FusionConfig) -> Self {
        Self { config }
    }

    /// Fuses optional vector and BM25 candidate lists.
    ///
    /// `Cascade` is handled directly by `fuse_cascade`. Every other method
    /// wraps each present list in a [`WeightedLane`] using `method_weights`
    /// and delegates to `fuse_multi`.
    pub fn fuse(
        &self,
        vector_candidates: Option<FilteredCandidates>,
        bm25_candidates: Option<FilteredCandidates>,
    ) -> FusionResult {
        if let Some(ref vc) = vector_candidates {
            debug_assert!(vc.filtered, "Vector candidates must be pre-filtered!");
        }
        if let Some(ref bc) = bm25_candidates {
            debug_assert!(bc.filtered, "BM25 candidates must be pre-filtered!");
        }
        if let FusionMethod::Cascade { primary } = self.config.method {
            return self.fuse_cascade(vector_candidates, bm25_candidates, primary);
        }
        let (vector_weight, bm25_weight) = self.method_weights();
        let mut lanes: Vec<WeightedLane> = Vec::with_capacity(2);
        if let Some(vc) = vector_candidates {
            lanes.push(WeightedLane {
                candidates: vc,
                weight: vector_weight,
            });
        }
        if let Some(bc) = bm25_candidates {
            lanes.push(WeightedLane {
                candidates: bc,
                weight: bm25_weight,
            });
        }
        self.fuse_multi(lanes)
    }

    /// Returns the `(vector_weight, bm25_weight)` configured for the method.
    /// `Max` and `Cascade` carry no weights and yield `(1.0, 1.0)`.
    pub(crate) fn method_weights(&self) -> (f32, f32) {
        match self.config.method {
            FusionMethod::Rrf {
                vector_weight,
                bm25_weight,
                ..
            } => (vector_weight, bm25_weight),
            FusionMethod::Linear {
                vector_weight,
                bm25_weight,
            } => (vector_weight, bm25_weight),
            FusionMethod::Max | FusionMethod::Cascade { .. } => (1.0, 1.0),
        }
    }

    /// Fuses any number of pre-weighted lanes.
    ///
    /// - `Rrf`: weighted reciprocal rank fusion via [`fuse_rrf_weighted`];
    ///   only list order matters.
    /// - `Linear`: sum over lanes of min-max normalized score times lane weight.
    /// - `Max`: per-document maximum of min-max normalized scores; lane
    ///   weights are not applied.
    /// - `Cascade`: picks the vector and BM25 lanes by modality and delegates
    ///   to `fuse_cascade`. Grep lanes are dropped, and if two lanes share a
    ///   modality the later one wins.
    pub fn fuse_multi(&self, lanes: Vec<WeightedLane>) -> FusionResult {
        for lane in &lanes {
            debug_assert!(
                lane.candidates.filtered,
                "Fusion lanes must be pre-filtered!"
            );
        }
        match self.config.method {
            FusionMethod::Rrf { k, .. } => {
                let ranked: Vec<RankedList<'_>> = lanes
                    .iter()
                    .map(|lane| RankedList {
                        results: &lane.candidates.results,
                        weight: lane.weight,
                    })
                    .collect();
                let scores = fuse_rrf_weighted(&ranked, k)
                    .into_iter()
                    .map(|(doc, score)| (doc.0, score))
                    .collect();
                self.collect_top_k(scores)
            }
            FusionMethod::Linear { .. } => {
                let mut scores: HashMap<u64, f32> = HashMap::new();
                for lane in &lanes {
                    for (doc_id, score) in self.normalize_scores(&lane.candidates.results) {
                        *scores.entry(doc_id).or_insert(0.0) += score * lane.weight;
                    }
                }
                self.collect_top_k(scores)
            }
            FusionMethod::Max => {
                let mut scores: HashMap<u64, f32> = HashMap::new();
                for lane in &lanes {
                    for (doc_id, score) in self.normalize_scores(&lane.candidates.results) {
                        let entry = scores.entry(doc_id).or_insert(0.0);
                        *entry = entry.max(score);
                    }
                }
                self.collect_top_k(scores)
            }
            FusionMethod::Cascade { primary } => {
                let mut vector = None;
                let mut bm25 = None;
                for lane in lanes {
                    match lane.candidates.modality {
                        Modality::Vector => vector = Some(lane.candidates),
                        Modality::Bm25 => bm25 = Some(lane.candidates),
                        Modality::Grep => {}
                    }
                }
                self.fuse_cascade(vector, bm25, primary)
            }
        }
    }

    /// Re-ranks the `primary` modality's candidates using the other modality.
    ///
    /// Documents in both lists take the secondary list's raw score. Documents
    /// only in the primary list get `-(rank)` (0-based), so they keep their
    /// primary order. Documents only in the secondary list are dropped.
    /// `Modality::Grep` is not supported as a primary and behaves like
    /// `Modality::Vector`.
    ///
    /// NOTE(review): raw secondary scores share one scale with the `-(rank)`
    /// fallback, so overlapping documents outrank primary-only ones only when
    /// secondary scores are non-negative. Confirm this holds for both executors.
    fn fuse_cascade(
        &self,
        vector: Option<FilteredCandidates>,
        bm25: Option<FilteredCandidates>,
        primary: Modality,
    ) -> FusionResult {
        let (primary_candidates, secondary_candidates) = match primary {
            Modality::Vector => (vector, bm25),
            Modality::Bm25 => (bm25, vector),
            Modality::Grep => (vector, bm25),
        };
        let primary_ids: std::collections::HashSet<u64> = primary_candidates
            .as_ref()
            .map(|c| c.results.iter().map(|r| r.doc_id).collect())
            .unwrap_or_default();
        let mut scores: HashMap<u64, f32> = HashMap::new();
        if let Some(sc) = secondary_candidates {
            for result in &sc.results {
                if primary_ids.contains(&result.doc_id) {
                    scores.insert(result.doc_id, result.score);
                }
            }
        }
        if let Some(pc) = primary_candidates {
            for (rank, result) in pc.results.iter().enumerate() {
                scores.entry(result.doc_id).or_insert(-(rank as f32));
            }
        }
        self.collect_top_k(scores)
    }

    /// Min-max normalizes a lane's scores into `[0, 1]`.
    ///
    /// Empty input yields an empty vector. If every score is equal (zero
    /// range), every document gets `1.0`.
    fn normalize_scores(&self, results: &[ScoredResult]) -> Vec<(u64, f32)> {
        if results.is_empty() {
            return vec![];
        }
        let min = results
            .iter()
            .map(|r| r.score)
            .fold(f32::INFINITY, f32::min);
        let max = results
            .iter()
            .map(|r| r.score)
            .fold(f32::NEG_INFINITY, f32::max);
        let range = max - min;
        if range == 0.0 {
            return results.iter().map(|r| (r.doc_id, 1.0)).collect();
        }
        results
            .iter()
            .map(|r| (r.doc_id, (r.score - min) / range))
            .collect()
    }

    /// Applies `min_score`, sorts by fused score (highest first) and keeps the
    /// best `final_k`.
    ///
    /// Ties are broken by ascending `doc_id`. Without this, equal-scored
    /// documents (common with RRF) keep the randomized `HashMap` iteration
    /// order, and the `final_k` cut-off can change between runs. NaN scores
    /// sort after every real score, which also keeps the comparator a total
    /// order as the slice sort methods require.
    fn collect_top_k(&self, scores: HashMap<u64, f32>) -> FusionResult {
        let mut results: Vec<ScoredResult> = scores
            .into_iter()
            .map(|(doc_id, score)| ScoredResult::new(doc_id, score))
            .collect();
        // Filtering commutes with sorting, so drop low scores first. `>=` is
        // false for NaN, so NaN scores are removed whenever a floor is set.
        if let Some(min) = self.config.min_score {
            results.retain(|r| r.score >= min);
        }
        // Doc ids are unique map keys, so (score desc, doc_id asc) is a strict
        // total order and an unstable sort is fully deterministic.
        results.sort_unstable_by(|a, b| {
            Self::cmp_score_desc(a.score, b.score).then_with(|| a.doc_id.cmp(&b.doc_id))
        });
        results.truncate(self.config.final_k);
        FusionResult {
            results,
            method: self.config.method,
        }
    }

    /// Orders two scores highest-first, placing NaN after every real value.
    fn cmp_score_desc(a: f32, b: f32) -> std::cmp::Ordering {
        use std::cmp::Ordering;
        match (a.is_nan(), b.is_nan()) {
            // Neither is NaN, so `partial_cmp` always returns `Some`.
            (false, false) => b.partial_cmp(&a).unwrap_or(Ordering::Equal),
            (true, false) => Ordering::Greater,
            (false, true) => Ordering::Less,
            (true, true) => Ordering::Equal,
        }
    }
}
/// Fused, ranked output of a fusion run.
#[derive(Debug)]
pub struct FusionResult {
/// Fused results, highest score first, at most `final_k` entries.
pub results: Vec<ScoredResult>,
/// The configured fusion method that produced `results`.
pub method: FusionMethod,
}
/// Vector-similarity lane used by `UnifiedHybridExecutor`.
///
/// Results are wrapped with `FilteredCandidates::from_vector`, which marks them
/// pre-filtered without re-checking, so implementations are expected to return
/// only ids contained in `allowed`. The returned order is treated as rank order
/// by RRF and Cascade fusion; `k` is the per-modality candidate budget.
pub trait VectorExecutor {
fn search(&self, query: &[f32], k: usize, allowed: &AllowedSet) -> Vec<ScoredResult>;
}
/// BM25 lexical lane used by `UnifiedHybridExecutor`.
///
/// Same contract as `VectorExecutor`: results should only contain ids in
/// `allowed`, and their order is treated as rank order.
pub trait Bm25Executor {
fn search(&self, query: &str, k: usize, allowed: &AllowedSet) -> Vec<ScoredResult>;
}
/// Grep lane used by `UnifiedHybridExecutor`.
///
/// In `GrepMode::Gate`, the executor is called with `k = 0` and only the
/// returned ids are used, to narrow the other lanes. In `GrepMode::Rank`, it is
/// called with the per-modality budget and its results become a fused lane.
pub trait GrepLaneExecutor {
fn grep(
&self,
pattern: &str,
k: usize,
allowed: &AllowedSet,
mode: GrepMode,
) -> Vec<ScoredResult>;
}
/// Runs a [`UnifiedHybridQuery`] against the vector, BM25 and optional grep
/// lanes and fuses their candidates with a [`FusionEngine`].
///
/// Every lane is searched with the caller-supplied [`AllowedSet`] (or a
/// narrower grep gate derived from it), so fusion never has to post-filter.
pub struct UnifiedHybridExecutor<V: VectorExecutor, B: Bm25Executor> {
    vector_executor: Arc<V>,
    bm25_executor: Arc<B>,
    grep_executor: Option<Arc<dyn GrepLaneExecutor>>,
    fusion_engine: FusionEngine,
}
impl<V: VectorExecutor, B: Bm25Executor> UnifiedHybridExecutor<V, B> {
    /// Creates an executor without a grep lane.
    pub fn new(
        vector_executor: Arc<V>,
        bm25_executor: Arc<B>,
        fusion_config: FusionConfig,
    ) -> Self {
        Self {
            vector_executor,
            bm25_executor,
            grep_executor: None,
            fusion_engine: FusionEngine::new(fusion_config),
        }
    }

    /// Attaches a grep lane. Without one, a query's `grep_query` is ignored.
    pub fn with_grep_executor(mut self, grep_executor: Arc<dyn GrepLaneExecutor>) -> Self {
        self.grep_executor = Some(grep_executor);
        self
    }

    /// Executes `query` restricted to `allowed_set` and returns the fused top-k.
    ///
    /// The grep lane is used only when both `query.grep_query` and a grep
    /// executor are present:
    /// - `GrepMode::Gate`: the grep hits, intersected with `allowed_set`, become
    ///   the allowed set for the vector and BM25 lanes. Grep adds no scores.
    /// - `GrepMode::Rank`: the grep hits become a third lane weighted by
    ///   `GrepQuerySpec::weight`. `Cascade` fusion ignores this lane.
    ///
    /// Returns an empty result when `allowed_set`, or the gated set, is empty.
    /// Fusion uses this executor's `FusionConfig`; `query.fusion_config` and
    /// `_auth_scope` are not read here.
    pub fn execute(
        &self,
        query: &UnifiedHybridQuery,
        _auth_scope: &AuthScope,
        allowed_set: &AllowedSet,
    ) -> FusionResult {
        if allowed_set.is_empty() {
            return self.empty_result();
        }
        let k = self.fusion_engine.config.candidates_per_modality;
        let mut grep_rank: Option<FilteredCandidates> = None;
        let mut grep_weight = 1.0_f32;
        let mut gated: Option<AllowedSet> = None;
        if let (Some(gq), Some(grep)) = (query.grep_query.as_ref(), self.grep_executor.as_ref()) {
            match gq.mode {
                GrepMode::Gate => {
                    // NOTE(review): k = 0 is passed when gating; presumably the grep
                    // executor treats it as "no limit". Confirm.
                    let hits = grep.grep(&gq.pattern, 0, allowed_set, GrepMode::Gate);
                    // The gate *replaces* `allowed_set` for the vector and BM25 lanes,
                    // so it must never be wider than `allowed_set`. Intersect explicitly
                    // instead of trusting the grep executor to have honoured it.
                    gated = Some(AllowedSet::from_iter(
                        hits.into_iter()
                            .map(|r| r.doc_id)
                            .filter(|&doc_id| allowed_set.contains(doc_id)),
                    ));
                }
                GrepMode::Rank => {
                    let hits = grep.grep(&gq.pattern, k, allowed_set, GrepMode::Rank);
                    grep_rank = Some(FilteredCandidates::from_grep(hits));
                    grep_weight = gq.weight;
                }
            }
        }
        let effective_allowed: &AllowedSet = gated.as_ref().unwrap_or(allowed_set);
        if effective_allowed.is_empty() {
            return self.empty_result();
        }
        let vector_candidates = query.vector_query.as_ref().map(|vq| {
            let results = self
                .vector_executor
                .search(&vq.embedding, k, effective_allowed);
            FilteredCandidates::from_vector(results)
        });
        let bm25_candidates = query.bm25_query.as_ref().map(|bq| {
            let results = self.bm25_executor.search(&bq.text, k, effective_allowed);
            FilteredCandidates::from_bm25(results)
        });
        let (vector_weight, bm25_weight) = self.fusion_engine.method_weights();
        let mut lanes: Vec<WeightedLane> = Vec::with_capacity(3);
        if let Some(vc) = vector_candidates {
            lanes.push(WeightedLane {
                candidates: vc,
                weight: vector_weight,
            });
        }
        if let Some(bc) = bm25_candidates {
            lanes.push(WeightedLane {
                candidates: bc,
                weight: bm25_weight,
            });
        }
        if let Some(gc) = grep_rank {
            lanes.push(WeightedLane {
                candidates: gc,
                weight: grep_weight,
            });
        }
        self.fusion_engine.fuse_multi(lanes)
    }

    /// An empty result tagged with the configured fusion method.
    fn empty_result(&self) -> FusionResult {
        FusionResult {
            results: vec![],
            method: self.fusion_engine.config.method,
        }
    }
}
// Unit tests for the fusion engine and the three-lane hybrid executor.
#[cfg(test)]
mod tests {
use super::*;
// Equal-weight RRF: docs 1 and 2 appear in both lanes, so both must survive fusion.
#[test]
fn test_rrf_fusion() {
let config = FusionConfig {
method: FusionMethod::Rrf {
k: 60.0,
vector_weight: 1.0,
bm25_weight: 1.0,
},
candidates_per_modality: 10,
final_k: 5,
min_score: None,
};
let engine = FusionEngine::new(config);
let vector = FilteredCandidates::from_vector(vec![
ScoredResult::new(1, 0.9),
ScoredResult::new(2, 0.8),
ScoredResult::new(3, 0.7),
]);
let bm25 = FilteredCandidates::from_bm25(vec![
ScoredResult::new(2, 5.0), ScoredResult::new(4, 4.0),
ScoredResult::new(1, 3.0), ]);
let result = engine.fuse(Some(vector), Some(bm25));
assert!(!result.results.is_empty());
let top_ids: Vec<u64> = result.results.iter().map(|r| r.doc_id).collect();
assert!(top_ids.contains(&1));
assert!(top_ids.contains(&2));
}
// Pins the exact RRF formula: 1-based ranks, per-list weights, and summation across lists.
#[test]
fn test_fuse_rrf_weighted_is_1_indexed_and_weighted() {
let k = 60.0_f32;
let docs = [ScoredResult::new(7, 0.9), ScoredResult::new(8, 0.5)];
let scores = fuse_rrf_weighted(
&[RankedList {
results: &docs,
weight: 2.0,
}],
k,
);
let s7 = scores[&DocId(7)];
let s8 = scores[&DocId(8)];
assert!(
(s7 - 2.0 / (k + 1.0)).abs() < 1e-6,
"rank-1 must use 1-indexed weighted score"
);
assert!(
(s8 - 2.0 / (k + 2.0)).abs() < 1e-6,
"rank-2 must use 1-indexed weighted score"
);
assert!(s7 > s8, "earlier rank must score higher");
// The same doc at rank 1 in two lists must accumulate both weighted contributions.
let list_a = [ScoredResult::new(1, 0.0)];
let list_b = [ScoredResult::new(1, 0.0)];
let merged = fuse_rrf_weighted(
&[
RankedList {
results: &list_a,
weight: 1.0,
},
RankedList {
results: &list_b,
weight: 3.0,
},
],
k,
);
let expected = 1.0 / (k + 1.0) + 3.0 / (k + 1.0);
assert!(
(merged[&DocId(1)] - expected).abs() < 1e-6,
"weights must sum across lists"
);
}
// Smoke test: linear fusion of two lanes produces some output.
#[test]
fn test_linear_fusion() {
let config = FusionConfig {
method: FusionMethod::Linear {
vector_weight: 0.6,
bm25_weight: 0.4,
},
candidates_per_modality: 10,
final_k: 5,
min_score: None,
};
let engine = FusionEngine::new(config);
let vector = FilteredCandidates::from_vector(vec![
ScoredResult::new(1, 1.0),
ScoredResult::new(2, 0.5),
]);
let bm25 = FilteredCandidates::from_bm25(vec![
ScoredResult::new(2, 10.0), ScoredResult::new(3, 5.0),
]);
let result = engine.fuse(Some(vector), Some(bm25));
assert!(!result.results.is_empty());
}
// NOTE(review): despite the name, this exercises `fuse(None, None)` (no lanes at all),
// not an empty `AllowedSet`; the empty-set early return in `execute` is not covered here.
#[test]
fn test_empty_allowed_set() {
let config = FusionConfig::default();
let engine = FusionEngine::new(config);
let result = engine.fuse(None, None);
assert!(result.results.is_empty());
}
// Min-max normalization maps the max to 1.0, the min to 0.0, and interpolates linearly.
#[test]
fn test_score_normalization() {
let config = FusionConfig::default();
let engine = FusionEngine::new(config);
let results = vec![
ScoredResult::new(1, 100.0),
ScoredResult::new(2, 50.0),
ScoredResult::new(3, 0.0),
];
let normalized = engine.normalize_scores(&results);
assert_eq!(normalized.len(), 3);
let scores: HashMap<u64, f32> = normalized.into_iter().collect();
assert!((scores[&1] - 1.0).abs() < 0.001);
assert!((scores[&2] - 0.5).abs() < 0.001);
assert!((scores[&3] - 0.0).abs() < 0.001);
}
// Both lanes contain only allowed ids, so this checks that fusion itself introduces no new ids.
#[test]
fn test_no_post_filter_invariant() {
let allowed: std::collections::HashSet<u64> = [1, 2, 3, 5, 8].into_iter().collect();
let allowed_set = AllowedSet::from_iter(allowed.iter().copied());
let vector = FilteredCandidates::from_vector(vec![
ScoredResult::new(1, 0.9), ScoredResult::new(2, 0.8), ScoredResult::new(5, 0.7), ]);
let bm25 = FilteredCandidates::from_bm25(vec![
ScoredResult::new(2, 5.0), ScoredResult::new(3, 4.0), ScoredResult::new(8, 3.0), ]);
let config = FusionConfig::default();
let engine = FusionEngine::new(config);
let result = engine.fuse(Some(vector), Some(bm25));
for doc in &result.results {
assert!(
allowed_set.contains(doc.doc_id),
"INVARIANT VIOLATION: doc_id {} not in allowed set",
doc.doc_id
);
}
}
// Explicit import; `GrepMode` is also reachable through `super::*`.
use crate::grep_executor::GrepMode;
use crate::namespace::Namespace;
use crate::trigram_index::TrigramIndex;
// In-memory vector lane: honours `allowed` and `k`, returning results in stored order.
struct MockVector(Vec<ScoredResult>);
impl VectorExecutor for MockVector {
fn search(&self, _q: &[f32], k: usize, allowed: &AllowedSet) -> Vec<ScoredResult> {
self.0
.iter()
.filter(|r| allowed.contains(r.doc_id))
.take(k)
.cloned()
.collect()
}
}
// In-memory BM25 lane with the same filtering behaviour as `MockVector`.
struct MockBm25(Vec<ScoredResult>);
impl Bm25Executor for MockBm25 {
fn search(&self, _q: &str, k: usize, allowed: &AllowedSet) -> Vec<ScoredResult> {
self.0
.iter()
.filter(|r| allowed.contains(r.doc_id))
.take(k)
.cloned()
.collect()
}
}
// Grep lane backed by the real trigram index and grep executor; search errors yield no hits.
struct RealGrep {
index: TrigramIndex,
}
impl GrepLaneExecutor for RealGrep {
fn grep(
&self,
pattern: &str,
k: usize,
allowed: &AllowedSet,
mode: GrepMode,
) -> Vec<ScoredResult> {
let exec = crate::grep_executor::GrepExecutor::new(&self.index);
match exec.search(pattern, allowed, k, mode) {
Ok(results) => results
.hits
.into_iter()
.map(|h| ScoredResult::new(h.doc_id, h.score))
.collect(),
Err(_) => Vec::new(),
}
}
}
// Query scoped to the single namespace "test" with no lanes enabled.
fn test_query() -> UnifiedHybridQuery {
UnifiedHybridQuery::new(NamespaceScope::single(Namespace::new("test").unwrap()))
}
// Fixture: docs 1, 3 and 4 contain "compute_idf"; doc 2 does not.
fn grep_index() -> TrigramIndex {
let mut idx = TrigramIndex::new();
idx.insert(1, "fn alpha() { compute_idf() }");
idx.insert(2, "fn beta() { unrelated helper }");
idx.insert(3, "fn gamma() { compute_idf() twice compute_idf() }");
idx.insert(4, "struct Config { compute_idf: bool }");
idx
}
// Rank mode: grep adds a third lane, so doc 3 (absent from vector/BM25) must be fused in.
#[test]
fn test_three_lane_rank_fusion_respects_allowed_set() {
let vector = MockVector(vec![
ScoredResult::new(2, 0.9),
ScoredResult::new(1, 0.8),
ScoredResult::new(4, 0.2),
]);
let bm25 = MockBm25(vec![ScoredResult::new(2, 5.0), ScoredResult::new(1, 3.0)]);
let grep = RealGrep {
index: grep_index(),
};
let allowed = AllowedSet::from_iter([1, 2, 3, 4]);
let executor =
UnifiedHybridExecutor::new(Arc::new(vector), Arc::new(bm25), FusionConfig::default())
.with_grep_executor(Arc::new(grep));
let query = test_query()
.with_vector(vec![0.0; 4])
.with_bm25("anything")
.with_grep("compute_idf", GrepMode::Rank);
let result = executor.execute(&query, &AuthScope::for_namespace("test"), &allowed);
assert!(!result.results.is_empty());
for r in &result.results {
assert!(
allowed.contains(r.doc_id),
"result {} escaped allowed set",
r.doc_id
);
}
let ids: Vec<u64> = result.results.iter().map(|r| r.doc_id).collect();
assert!(
ids.contains(&3),
"grep-only doc 3 should appear via the third lane, got {ids:?}"
);
}
// Gate mode: grep hits {1,3,4} narrow the allowed set before the vector/BM25 lanes run.
#[test]
fn test_grep_gate_narrows_before_other_lanes() {
let vector = MockVector(vec![
ScoredResult::new(2, 0.9),
ScoredResult::new(1, 0.8),
ScoredResult::new(4, 0.7),
ScoredResult::new(3, 0.6),
]);
let bm25 = MockBm25(vec![ScoredResult::new(2, 5.0)]);
let grep = RealGrep {
index: grep_index(),
};
let allowed = AllowedSet::from_iter([1, 2, 3, 4]);
let executor =
UnifiedHybridExecutor::new(Arc::new(vector), Arc::new(bm25), FusionConfig::default())
.with_grep_executor(Arc::new(grep));
let query = test_query()
.with_vector(vec![0.0; 4])
.with_bm25("anything")
.with_grep("compute_idf", GrepMode::Gate);
let result = executor.execute(&query, &AuthScope::for_namespace("test"), &allowed);
assert!(!result.results.is_empty());
let gate: std::collections::HashSet<u64> = [1, 3, 4].into_iter().collect();
for r in &result.results {
assert!(
gate.contains(&r.doc_id),
"doc {} not in grep gate {{1,3,4}}",
r.doc_id
);
}
assert!(
!result.results.iter().any(|r| r.doc_id == 2),
"doc 2 (no compute_idf) must be gated out"
);
}
// Without a grep executor, a grep query is silently ignored and both other lanes contribute.
#[test]
fn test_grep_query_ignored_without_grep_executor() {
let vector = MockVector(vec![ScoredResult::new(1, 0.9)]);
let bm25 = MockBm25(vec![ScoredResult::new(2, 5.0)]);
let allowed = AllowedSet::from_iter([1, 2, 3, 4]);
let executor =
UnifiedHybridExecutor::new(Arc::new(vector), Arc::new(bm25), FusionConfig::default());
let query = test_query()
.with_vector(vec![0.0; 4])
.with_bm25("anything")
.with_grep("compute_idf", GrepMode::Gate);
let result = executor.execute(&query, &AuthScope::for_namespace("test"), &allowed);
let ids: Vec<u64> = result.results.iter().map(|r| r.doc_id).collect();
assert!(
ids.contains(&1) && ids.contains(&2),
"without a grep executor both lanes survive, got {ids:?}"
);
}
}
/// Checks that every fused document in `result` is a member of `allowed_set`.
///
/// Returns `InvariantVerification::Valid` when all results are allowed.
/// Otherwise returns `Violated` with the offending ids in result order.
pub fn verify_no_post_filter_invariant(
    result: &FusionResult,
    allowed_set: &AllowedSet,
) -> InvariantVerification {
    let escaped: Vec<u64> = result
        .results
        .iter()
        .map(|doc| doc.doc_id)
        .filter(|&id| !allowed_set.contains(id))
        .collect();
    if escaped.is_empty() {
        return InvariantVerification::Valid;
    }
    InvariantVerification::Violated { doc_ids: escaped }
}
/// Outcome of checking fused results against an allowed set.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InvariantVerification {
    /// Every result was inside the allowed set.
    Valid,
    /// The ids that were outside the allowed set.
    Violated { doc_ids: Vec<u64> },
}
impl InvariantVerification {
    /// Returns `true` when no violation was recorded.
    pub fn is_valid(&self) -> bool {
        match self {
            Self::Valid => true,
            Self::Violated { .. } => false,
        }
    }

    /// Does nothing when valid.
    ///
    /// # Panics
    ///
    /// Panics with the count and list of offending ids when violated.
    pub fn assert_valid(&self) {
        if let Self::Violated { doc_ids } = self {
            panic!(
                "NO-POST-FILTER INVARIANT VIOLATED: {} docs not in allowed set: {:?}",
                doc_ids.len(),
                doc_ids
            );
        }
    }
}