use super::visibility::{extend_visible_entries, VisibilitySnapshot};
use super::{IndexEntry, QueryHit, StoreIndex};
use crate::coordinate::{KindFilter, Region};
impl StoreIndex {
/// Looks up the entry stored in `by_id` under `event_id`.
///
/// The lookup runs while a read guard from `swap_gate` is held, and the result
/// is checked against a visibility snapshot taken from `sequence` just before
/// the lookup.
///
/// Returns `None` when `by_id` has no entry for `event_id`, or when the
/// entry's `global_sequence` is not visible under that snapshot.
pub(crate) fn get_by_id(&self, event_id: u128) -> Option<IndexEntry> {
    let _read_guard = self.swap_gate.read();
    let visibility = self.sequence.snapshot();
    // Clone the entry out so the map reference is released at the end of this statement.
    let entry = self.by_id.get(&event_id)?.value().as_ref().clone();
    if visibility.is_visible(entry.global_sequence) {
        Some(entry)
    } else {
        None
    }
}
/// Resolves `hit` to its full entry using a visibility snapshot taken at call time.
///
/// Returns `None` when `by_id` has no entry for the hit's `event_id`, or when
/// the stored entry is not visible under the fresh snapshot.
pub(crate) fn upgrade_hit(&self, hit: QueryHit) -> Option<IndexEntry> {
    self.upgrade_hit_with_visibility(hit, &self.sequence.snapshot())
}
/// Resolves `hit` to its full entry and applies `visibility` to the result.
///
/// Returns `None` in two cases:
/// * `by_id` has no entry for `hit.event_id`; this case is also reported via
///   `tracing::error!` with the hit's `event_id` and `global_sequence`.
/// * the stored entry's `global_sequence` is not visible under `visibility`.
pub(crate) fn upgrade_hit_with_visibility(
    &self,
    hit: QueryHit,
    visibility: &VisibilitySnapshot,
) -> Option<IndexEntry> {
    // Clone the entry out first so no map reference is held while filtering or logging.
    let backing = self
        .by_id
        .get(&hit.event_id)
        .map(|slot| slot.value().as_ref().clone());
    match backing {
        Some(entry) if visibility.is_visible(entry.global_sequence) => Some(entry),
        Some(_) => None,
        None => {
            tracing::error!(
                target: "batpak::index",
                event_id = hit.event_id,
                global_sequence = hit.global_sequence,
                "dropping query hit with no backing by_id entry"
            );
            None
        }
    }
}
/// Runs a region query and returns the hits together with the visibility
/// snapshot they were filtered against.
///
/// The snapshot is taken, and the query evaluated, while a read guard from
/// `swap_gate` is held. The guard is released when this function returns; the
/// returned snapshot can be passed to `upgrade_hit_with_visibility` so later
/// upgrades apply the same visibility.
pub(crate) fn query_hits_with_snapshot(
    &self,
    region: &Region,
) -> (Vec<QueryHit>, VisibilitySnapshot) {
    let _read_guard = self.swap_gate.read();
    let snapshot = self.sequence.snapshot();
    let matched = self.query_hits_with_visibility(region, &snapshot);
    (matched, snapshot)
}
/// Collects every hit matching `region` that is visible under `visibility`,
/// ordered by ascending `global_sequence`.
///
/// Candidates are gathered without any per-entry pre-filter, narrowed by
/// `filter_region_hits`, then sorted with a stable sort.
fn query_hits_with_visibility(
    &self,
    region: &Region,
    visibility: &VisibilitySnapshot,
) -> Vec<QueryHit> {
    let mut matched = self.query_candidate_hits(region, |_| true);
    self.filter_region_hits(&mut matched, region, visibility);
    matched.sort_by(|a, b| a.global_sequence.cmp(&b.global_sequence));
    matched
}
fn query_candidate_hits<F>(&self, region: &Region, mut include_stream_entry: F) -> Vec<QueryHit>
where
F: FnMut(&IndexEntry) -> bool,
{
if region.entity_prefix.is_some() {
return self.stream_hits_matching(region, |entry| include_stream_entry(entry));
}
if let Some(ref scope) = region.scope {
let scan_hits = self.scan.query_hits_by_scope(scope.as_ref());
if !scan_hits.is_empty() {
return scan_hits;
}
return self
.scan
.scope_entity_set(scope.as_ref())
.map(|entities| {
let mut candidates = Vec::new();
for entity in &entities {
if let Some(stream) = self.streams.get(entity.as_ref()) {
for entry in stream.value().values() {
if include_stream_entry(entry) {
candidates.push(QueryHit::from_entry(entry));
}
}
}
}
candidates
})
.unwrap_or_default();
}
if let Some(ref fact) = region.fact {
return match fact {
KindFilter::Exact(k) => self.scan.query_hits_by_kind(*k),
KindFilter::Category(c) => self.scan.query_hits_by_category(*c),
KindFilter::Any => self.all_stream_hits_where(include_stream_entry),
};
}
self.all_stream_hits_where(include_stream_entry)
}
/// Builds hits from every stream whose key satisfies `region.matches_entity`,
/// keeping only the entries accepted by `include_entry`.
///
/// Hits are produced in stream iteration order; no sorting is done here.
fn stream_hits_matching<F>(&self, region: &Region, mut include_entry: F) -> Vec<QueryHit>
where
    F: FnMut(&IndexEntry) -> bool,
{
    let mut candidates = Vec::new();
    for stream in self.streams.iter() {
        if !region.matches_entity(stream.key().as_ref()) {
            continue;
        }
        candidates.extend(
            stream
                .value()
                .values()
                .filter(|entry| include_entry(*entry))
                .map(|entry| QueryHit::from_entry(entry)),
        );
    }
    candidates
}
/// Builds hits from the entries of every stream, keeping only those accepted
/// by `include_entry`.
///
/// Hits are produced in stream iteration order; no sorting is done here.
fn all_stream_hits_where<F>(&self, mut include_entry: F) -> Vec<QueryHit>
where
    F: FnMut(&IndexEntry) -> bool,
{
    let mut candidates = Vec::new();
    for stream in self.streams.iter() {
        candidates.extend(
            stream
                .value()
                .values()
                .filter(|entry| include_entry(*entry))
                .map(|entry| QueryHit::from_entry(entry)),
        );
    }
    candidates
}
/// Removes from `hits`, in place, every hit that does not satisfy `region`
/// under `visibility`.
///
/// A hit is kept only if all of the following hold:
/// * its `global_sequence` is visible under `visibility`;
/// * when the region names a scope, `by_id` has an entry for the hit and that
///   entry's coordinate scope equals the requested one (a hit with no `by_id`
///   entry is dropped);
/// * when the region has a fact filter, the hit's kind matches it;
/// * when the region has a clock range, the hit's clock is within the
///   inclusive bounds.
fn filter_region_hits(
    &self,
    hits: &mut Vec<QueryHit>,
    region: &Region,
    visibility: &VisibilitySnapshot,
) {
    let wanted_scope = region.scope.as_deref();
    let wanted_fact = region.fact.as_ref();
    let wanted_clock = region.clock_range;

    hits.retain(|hit| {
        if !visibility.is_visible(hit.global_sequence) {
            return false;
        }

        // The scope is not carried on the hit, so it is read from the backing entry.
        let scope_ok = match wanted_scope {
            None => true,
            Some(scope) => self
                .by_id
                .get(&hit.event_id)
                .map_or(false, |entry| entry.value().coord.scope() == scope),
        };
        if !scope_ok {
            return false;
        }

        let kind_ok = match wanted_fact {
            None | Some(KindFilter::Any) => true,
            Some(KindFilter::Exact(kind)) => hit.kind == *kind,
            Some(KindFilter::Category(category)) => hit.kind.category() == *category,
        };
        if !kind_ok {
            return false;
        }

        match wanted_clock {
            None => true,
            Some((min, max)) => !(hit.clock < min || hit.clock > max),
        }
    });
}
/// Returns up to `limit` hits matching `region`, ordered by ascending
/// `global_sequence`, resuming after a cursor position.
///
/// When `started` is `true`, only hits with `global_sequence > after_seq` are
/// returned; when it is `false`, `after_seq` is ignored. The whole query runs
/// under a read guard from `swap_gate` and a single visibility snapshot.
///
/// A region with no entity prefix, no scope, and an explicit
/// `KindFilter::Any` fact filter is served by `query_any_hits_after`; every
/// other region goes through candidate collection and `filter_region_hits`.
pub(crate) fn query_hits_after(
    &self,
    region: &Region,
    after_seq: u64,
    started: bool,
    limit: usize,
) -> Vec<QueryHit> {
    let _read_guard = self.swap_gate.read();
    let visibility = self.sequence.snapshot();
    let past_cursor = |seq: u64| !started || seq > after_seq;

    let has_selector = region.entity_prefix.is_some() || region.scope.is_some();
    if !has_selector && matches!(region.fact, Some(KindFilter::Any)) {
        return self.query_any_hits_after(region, &visibility, past_cursor, limit);
    }

    let mut page = self.query_candidate_hits(region, |entry| past_cursor(entry.global_sequence));
    self.filter_region_hits(&mut page, region, &visibility);
    // Candidates taken from the scan index bypass the pre-filter above, so the
    // cursor is applied again here (a no-op when `started` is false).
    page.retain(|hit| past_cursor(hit.global_sequence));
    page.sort_by_key(|hit| hit.global_sequence);
    page.truncate(limit);
    page
}
/// Scans every stream and returns the `limit` lowest-sequence hits that pass
/// `seq_ok`, are visible under `visibility`, and fall inside the region's
/// clock range (when one is set).
///
/// Only `region.clock_range` is consulted; the caller is expected to have
/// established that no other region criterion applies.
///
/// To avoid buffering every matching entry, the buffer is periodically
/// sorted and cut back to `limit` once it reaches a trim threshold. The
/// threshold is always strictly greater than `limit`, with slack
/// `min(max(limit, 1), 2^20)`:
/// * a trim therefore always discards at least one element, and
/// * a very large `limit` (e.g. `usize::MAX` used as "no limit") never trips
///   the threshold at all, instead of re-sorting the whole buffer on every
///   push once 2^20 hits have been collected.
///
/// The returned hits are sorted by ascending `global_sequence`.
fn query_any_hits_after<F>(
    &self,
    region: &Region,
    visibility: &VisibilitySnapshot,
    mut seq_ok: F,
    limit: usize,
) -> Vec<QueryHit>
where
    F: FnMut(u64) -> bool,
{
    // Upper bound on both the eager allocation and the slack kept above `limit`.
    const MAX_SLACK: usize = 1 << 20;

    let clock_range = region.clock_range;
    // Equals `2 * limit` for limits up to MAX_SLACK (and 1 for `limit == 0`);
    // beyond that the slack stops growing, and the sum saturates for huge limits.
    let trim_threshold = limit.saturating_add(limit.max(1).min(MAX_SLACK));
    let initial_cap = limit.min(MAX_SLACK);
    let mut buf: Vec<QueryHit> = Vec::with_capacity(initial_cap);
    let trim = |buf: &mut Vec<QueryHit>, limit: usize| {
        buf.sort_by_key(|h| h.global_sequence);
        buf.truncate(limit);
    };
    for stream in self.streams.iter() {
        for entry in stream.value().values() {
            if !seq_ok(entry.global_sequence) {
                continue;
            }
            if !visibility.is_visible(entry.global_sequence) {
                continue;
            }
            if let Some((min, max)) = clock_range {
                if entry.clock < min || entry.clock > max {
                    continue;
                }
            }
            buf.push(QueryHit::from_entry(entry));
            if buf.len() >= trim_threshold {
                trim(&mut buf, limit);
            }
        }
    }
    // Final pass: establishes the output order and enforces `limit`.
    trim(&mut buf, limit);
    buf
}
/// Returns the entry recorded in `latest` for `entity`, if it is visible.
///
/// The lookup runs under a read guard from `swap_gate` and is checked against
/// a visibility snapshot taken from `sequence`. Returns `None` when `latest`
/// has no entry for `entity` or when that entry's `global_sequence` is not
/// visible under the snapshot; there is no fallback to an older entry of the
/// same entity in that case.
pub(crate) fn get_latest(&self, entity: &str) -> Option<IndexEntry> {
    let _read_guard = self.swap_gate.read();
    let visibility = self.sequence.snapshot();
    let newest = self.latest.get(entity)?.value().as_ref().clone();
    match visibility.is_visible(newest.global_sequence) {
        true => Some(newest),
        false => None,
    }
}
/// Returns the visible entries of the stream stored under `entity`.
///
/// The lookup runs under a read guard from `swap_gate`; entries are selected
/// by `extend_visible_entries` against a visibility snapshot taken from
/// `sequence`. Returns an empty vector when `streams` has no stream for
/// `entity`.
pub(crate) fn stream(&self, entity: &str) -> Vec<IndexEntry> {
    let _read_guard = self.swap_gate.read();
    let visibility = self.sequence.snapshot();
    let found = self.streams.get(entity);
    match found {
        Some(stream) => {
            let events = stream.value();
            // Capacity is the stream's full length, an upper bound on the visible count.
            let mut visible = Vec::with_capacity(events.len());
            extend_visible_entries(&mut visible, events.values(), &visibility);
            visible
        }
        None => Vec::new(),
    }
}
/// Returns the full entries matching `region`, ordered by ascending
/// `global_sequence`.
///
/// Hit collection and the upgrade of each hit to its `by_id` entry both run
/// under one read guard from `swap_gate` and one visibility snapshot, so the
/// hits cannot be resolved against index contents swapped in after they were
/// collected. Hits with no backing `by_id` entry are dropped (and logged by
/// `upgrade_hit_with_visibility`).
///
/// NOTE(review): this assumes `swap_gate` is what guards `by_id` against
/// replacement, as the guarded lookup in `get_by_id` suggests — confirm.
pub(crate) fn query(&self, region: &Region) -> Vec<IndexEntry> {
    // Take the gate here, not via `query_hits_with_snapshot`, so it stays held
    // while the hits are upgraded below.
    let _read_guard = self.swap_gate.read();
    let visibility = self.sequence.snapshot();
    self.query_hits_with_visibility(region, &visibility)
        .into_iter()
        .filter_map(|hit| self.upgrade_hit_with_visibility(hit, &visibility))
        .collect()
}
}