use super::visibility::{extend_visible_entries, VisibilitySnapshot};
use super::{IndexEntry, QueryHit, StoreIndex};
use crate::coordinate::{KindFilter, Region};
impl StoreIndex {
pub(crate) fn get_by_id(&self, event_id: u128) -> Option<IndexEntry> {
let _read_guard = self.swap_gate.read();
let visibility = self.sequence.snapshot();
self.by_id
.get(&event_id)
.map(|r| r.value().as_ref().clone())
.filter(|e| visibility.is_visible(e.global_sequence))
}
pub(crate) fn upgrade_hit(&self, hit: QueryHit) -> Option<IndexEntry> {
let visibility = self.sequence.snapshot();
self.upgrade_hit_with_visibility(hit, &visibility)
}
pub(crate) fn upgrade_hit_with_visible_upper_bound(
&self,
hit: QueryHit,
visible_upper_bound: u64,
) -> Option<IndexEntry> {
if hit.global_sequence >= visible_upper_bound {
return None;
}
let upgraded = self
.by_id
.get(&hit.event_id)
.map(|entry| entry.value().as_ref().clone());
upgraded.filter(|entry| entry.global_sequence < visible_upper_bound)
}
fn upgrade_hit_with_visibility(
&self,
hit: QueryHit,
visibility: &VisibilitySnapshot,
) -> Option<IndexEntry> {
let upgraded = self
.by_id
.get(&hit.event_id)
.map(|entry| entry.value().as_ref().clone());
if upgraded.is_none() {
tracing::error!(
target: "batpak::index",
event_id = hit.event_id,
global_sequence = hit.global_sequence,
"dropping query hit with no backing by_id entry"
);
}
upgraded.filter(|entry| visibility.is_visible(entry.global_sequence))
}
pub(crate) fn query_hits(&self, region: &Region) -> Vec<QueryHit> {
let _read_guard = self.swap_gate.read();
let visibility = self.sequence.snapshot();
self.query_hits_with_visibility(region, &visibility)
}
pub(crate) fn query_hits_with_visible_upper_bound(
&self,
region: &Region,
) -> (Vec<QueryHit>, u64) {
let _read_guard = self.swap_gate.read();
let visibility = self.sequence.snapshot();
let visible_upper_bound = visibility.visible_upper_bound();
let hits = self.query_hits_with_visibility(region, &visibility);
(hits, visible_upper_bound)
}
fn query_hits_with_visibility(
&self,
region: &Region,
visibility: &VisibilitySnapshot,
) -> Vec<QueryHit> {
let mut hits: Vec<QueryHit> = if region.entity_prefix.is_some() {
let mut candidates = Vec::new();
for stream in self
.streams
.iter()
.filter(|r| region.matches_entity(r.key().as_ref()))
{
for entry in stream.value().values() {
candidates.push(QueryHit::from_entry(entry));
}
}
candidates
} else if let Some(ref scope) = region.scope {
let scan_hits = self.scan.query_hits_by_scope(scope.as_ref());
if !scan_hits.is_empty() {
scan_hits
} else if let Some(entities) = self.scan.scope_entity_set(scope.as_ref()) {
let mut candidates = Vec::new();
for entity in &entities {
if let Some(stream) = self.streams.get(entity.as_ref()) {
for entry in stream.value().values() {
candidates.push(QueryHit::from_entry(entry));
}
}
}
candidates
} else {
Vec::new()
}
} else if let Some(ref fact) = region.fact {
match fact {
KindFilter::Exact(k) => self.scan.query_hits_by_kind(*k),
KindFilter::Category(c) => self.scan.query_hits_by_category(*c),
KindFilter::Any => {
let mut candidates = Vec::new();
for stream in self.streams.iter() {
for entry in stream.value().values() {
candidates.push(QueryHit::from_entry(entry));
}
}
candidates
}
}
} else {
let mut candidates = Vec::new();
for stream in self.streams.iter() {
for entry in stream.value().values() {
candidates.push(QueryHit::from_entry(entry));
}
}
candidates
};
self.filter_region_hits(&mut hits, region, visibility);
hits.sort_by_key(|h| h.global_sequence);
hits
}
fn filter_region_hits(
&self,
hits: &mut Vec<QueryHit>,
region: &Region,
visibility: &VisibilitySnapshot,
) {
let requested_scope = region.scope.as_deref();
let fact_filter = region.fact.as_ref();
let clock_range = region.clock_range;
hits.retain(|h| {
if !visibility.is_visible(h.global_sequence) {
return false;
}
if let Some(scope) = requested_scope {
match self.by_id.get(&h.event_id) {
Some(entry) => {
if entry.value().coord.scope() != scope {
return false;
}
}
None => return false,
}
}
if let Some(fact) = fact_filter {
let kind_ok = match fact {
KindFilter::Exact(k) => h.kind == *k,
KindFilter::Category(c) => h.kind.category() == *c,
KindFilter::Any => true,
};
if !kind_ok {
return false;
}
}
if let Some((min, max)) = clock_range {
if h.clock < min || h.clock > max {
return false;
}
}
true
});
}
pub(crate) fn query_hits_after(
&self,
region: &Region,
after_seq: u64,
started: bool,
limit: usize,
) -> Vec<QueryHit> {
let _read_guard = self.swap_gate.read();
let visibility = self.sequence.snapshot();
let seq_ok = |seq: u64| !started || seq > after_seq;
let mut hits: Vec<QueryHit> = if region.entity_prefix.is_some() {
let mut candidates = Vec::new();
for stream in self
.streams
.iter()
.filter(|r| region.matches_entity(r.key().as_ref()))
{
for entry in stream.value().values() {
if !seq_ok(entry.global_sequence) {
continue;
}
candidates.push(QueryHit::from_entry(entry));
}
}
candidates
} else if let Some(ref scope) = region.scope {
let overlay_hits = self.scan.query_hits_by_scope(scope.as_ref());
if !overlay_hits.is_empty() {
overlay_hits
} else if let Some(entities) = self.scan.scope_entity_set(scope.as_ref()) {
let mut candidates = Vec::new();
for entity in &entities {
if let Some(stream) = self.streams.get(entity.as_ref()) {
for entry in stream.value().values() {
if seq_ok(entry.global_sequence) {
candidates.push(QueryHit::from_entry(entry));
}
}
}
}
candidates
} else {
Vec::new()
}
} else if let Some(ref fact) = region.fact {
match fact {
KindFilter::Exact(k) => self.scan.query_hits_by_kind(*k),
KindFilter::Category(c) => self.scan.query_hits_by_category(*c),
KindFilter::Any => {
let clock_range = region.clock_range;
let trim_threshold = limit
.saturating_mul(2)
.max(limit.saturating_add(1))
.min(1 << 20);
let initial_cap = limit.min(1 << 20);
let mut buf: Vec<QueryHit> = Vec::with_capacity(initial_cap);
let trim = |buf: &mut Vec<QueryHit>, limit: usize| {
buf.sort_by_key(|h| h.global_sequence);
buf.truncate(limit);
};
for stream in self.streams.iter() {
for entry in stream.value().values() {
if !seq_ok(entry.global_sequence) {
continue;
}
if !visibility.is_visible(entry.global_sequence) {
continue;
}
if let Some((min, max)) = clock_range {
if entry.clock < min || entry.clock > max {
continue;
}
}
buf.push(QueryHit::from_entry(entry));
if buf.len() >= trim_threshold {
trim(&mut buf, limit);
}
}
}
trim(&mut buf, limit);
return buf;
}
}
} else {
let mut candidates = Vec::new();
for stream in self.streams.iter() {
for entry in stream.value().values() {
if !seq_ok(entry.global_sequence) {
continue;
}
candidates.push(QueryHit::from_entry(entry));
}
}
candidates
};
self.filter_region_hits(&mut hits, region, &visibility);
if started {
hits.retain(|h| h.global_sequence > after_seq);
}
hits.sort_by_key(|h| h.global_sequence);
hits.truncate(limit);
hits
}
pub(crate) fn get_latest(&self, entity: &str) -> Option<IndexEntry> {
let _read_guard = self.swap_gate.read();
let visibility = self.sequence.snapshot();
self.latest
.get(entity)
.map(|r| r.value().as_ref().clone())
.filter(|e| visibility.is_visible(e.global_sequence))
}
pub(crate) fn stream(&self, entity: &str) -> Vec<IndexEntry> {
let _read_guard = self.swap_gate.read();
let visibility = self.sequence.snapshot();
self.streams
.get(entity)
.map(|r| {
let mut entries = Vec::with_capacity(r.value().len());
extend_visible_entries(&mut entries, r.value().values(), &visibility);
entries
})
.unwrap_or_default()
}
pub(crate) fn query(&self, region: &Region) -> Vec<IndexEntry> {
self.query_hits(region)
.into_iter()
.filter_map(|hit| self.upgrade_hit(hit))
.collect()
}
}