batpak 0.9.0

Event sourcing with causal graphs and caller-defined gates. Sync API, no async runtime.
Documentation
#[cfg(test)]
use super::apply_after_bounds;
use super::{CachedProjectionSlot, EntryQuery, ProjectionCandidates, ScanIndex};
use crate::event::EventKind;
use crate::store::index::{ProjectionCacheStoreStatus, QueryHit};
use std::any::TypeId;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) enum ScanRoute {
    BaseAoS,
    SoA,
    SoAoS,
    AoSoA64,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) struct ProjectionSupport {
    pub(super) entity_generation_fast_path: bool,
    pub(super) cached_projection: bool,
    pub(super) projection_candidates: bool,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) struct ScanCapabilities {
    pub(super) by_kind: ScanRoute,
    pub(super) by_scope: ScanRoute,
    pub(super) by_category: ScanRoute,
    pub(super) projection: ProjectionSupport,
    pub(super) topology_name: &'static str,
    pub(super) tile_count: usize,
}

impl ScanIndex {
    pub(super) fn capabilities(&self) -> ScanCapabilities {
        let (has_soa, has_entity_groups, has_tiles64) = (
            self.soa.is_some(),
            self.entity_groups.is_some(),
            self.tiles64.is_some(),
        );

        let topology_name = match (has_soa, has_entity_groups, has_tiles64) {
            (false, false, false) => "aos",
            (true, false, false) => "scan",
            (false, true, false) => "entity-local",
            (false, false, true) => "tiled",
            (true, true, true) => "all",
            _ => "hybrid",
        };

        ScanCapabilities {
            by_kind: if has_soa {
                ScanRoute::SoA
            } else if has_tiles64 {
                ScanRoute::AoSoA64
            } else if has_entity_groups {
                ScanRoute::SoAoS
            } else {
                ScanRoute::BaseAoS
            },
            by_scope: if has_entity_groups {
                ScanRoute::SoAoS
            } else if has_soa {
                ScanRoute::SoA
            } else if has_tiles64 {
                ScanRoute::AoSoA64
            } else {
                ScanRoute::BaseAoS
            },
            by_category: if has_soa {
                ScanRoute::SoA
            } else if has_tiles64 {
                ScanRoute::AoSoA64
            } else if has_entity_groups {
                ScanRoute::SoAoS
            } else {
                ScanRoute::BaseAoS
            },
            projection: ProjectionSupport {
                entity_generation_fast_path: has_entity_groups,
                cached_projection: has_entity_groups,
                projection_candidates: has_entity_groups,
            },
            topology_name,
            tile_count: self
                .tiles64
                .as_ref()
                .map_or(0, super::ColumnarIndex::tile_count),
        }
    }

    fn query_base_hits_by_kind(&self, kind: EventKind) -> Vec<QueryHit> {
        let mut results: Vec<QueryHit> = self
            .by_fact
            .get(&kind)
            .map(|r| {
                r.value()
                    .values()
                    .map(|entry| QueryHit::from_entry(entry))
                    .collect()
            })
            .unwrap_or_default();
        results.sort_by_key(|h| h.global_sequence);
        results
    }

    fn query_base_hits_by_category(&self, category: u8) -> Vec<QueryHit> {
        let mut results = Vec::new();
        for entries in self
            .by_fact
            .iter()
            .filter(|r| r.key().category() == category)
        {
            results.extend(
                entries
                    .value()
                    .values()
                    .map(|entry| QueryHit::from_entry(entry)),
            );
        }
        results.sort_by_key(|h| h.global_sequence);
        results
    }

    fn query_hits_route(&self, route: ScanRoute, query: EntryQuery<'_>) -> Vec<QueryHit> {
        let fallback = || match query {
            EntryQuery::Kind(kind) => self.query_base_hits_by_kind(kind),
            EntryQuery::Category(category) => self.query_base_hits_by_category(category),
            EntryQuery::Scope(_) => Vec::new(),
        };
        match (route, query) {
            (ScanRoute::BaseAoS, EntryQuery::Kind(kind)) => self.query_base_hits_by_kind(kind),
            (ScanRoute::BaseAoS, EntryQuery::Category(category)) => {
                self.query_base_hits_by_category(category)
            }
            (ScanRoute::BaseAoS, EntryQuery::Scope(_)) => Vec::new(),
            (ScanRoute::SoA, EntryQuery::Kind(kind)) => self
                .soa
                .as_ref()
                .map_or_else(fallback, |soa| soa.query_hits_by_kind(kind)),
            (ScanRoute::SoA, EntryQuery::Category(category)) => self
                .soa
                .as_ref()
                .map_or_else(fallback, |soa| soa.query_hits_by_category(category)),
            (ScanRoute::SoA, EntryQuery::Scope(scope)) => self
                .soa
                .as_ref()
                .map_or_else(fallback, |soa| soa.query_hits_by_scope(scope)),
            (ScanRoute::SoAoS, EntryQuery::Kind(kind)) => self
                .entity_groups
                .as_ref()
                .map_or_else(fallback, |groups| groups.query_hits_by_kind(kind)),
            (ScanRoute::SoAoS, EntryQuery::Category(category)) => self
                .entity_groups
                .as_ref()
                .map_or_else(fallback, |groups| groups.query_hits_by_category(category)),
            (ScanRoute::SoAoS, EntryQuery::Scope(scope)) => self
                .entity_groups
                .as_ref()
                .map_or_else(fallback, |groups| groups.query_hits_by_scope(scope)),
            (ScanRoute::AoSoA64, EntryQuery::Kind(kind)) => self
                .tiles64
                .as_ref()
                .map_or_else(fallback, |tiles| tiles.query_hits_by_kind(kind)),
            (ScanRoute::AoSoA64, EntryQuery::Category(category)) => self
                .tiles64
                .as_ref()
                .map_or_else(fallback, |tiles| tiles.query_hits_by_category(category)),
            (ScanRoute::AoSoA64, EntryQuery::Scope(scope)) => self
                .tiles64
                .as_ref()
                .map_or_else(fallback, |tiles| tiles.query_hits_by_scope(scope)),
        }
    }

    pub(crate) fn query_hits_by_kind(&self, kind: EventKind) -> Vec<QueryHit> {
        self.query_hits_route(self.capabilities().by_kind, EntryQuery::Kind(kind))
    }

    pub(crate) fn query_hits_by_category(&self, category: u8) -> Vec<QueryHit> {
        self.query_hits_route(
            self.capabilities().by_category,
            EntryQuery::Category(category),
        )
    }

    pub(crate) fn query_hits_by_scope(&self, scope: &str) -> Vec<QueryHit> {
        self.query_hits_route(self.capabilities().by_scope, EntryQuery::Scope(scope))
    }

    #[cfg(test)]
    fn query_hits_route_after(
        &self,
        route: ScanRoute,
        query: EntryQuery<'_>,
        after_seq: u64,
        started: bool,
        limit: usize,
    ) -> Vec<QueryHit> {
        let fallback = || match query {
            EntryQuery::Kind(kind) => {
                let mut v = self.query_base_hits_by_kind(kind);
                apply_after_bounds(&mut v, after_seq, started, limit);
                v
            }
            EntryQuery::Category(cat) => {
                let mut v = self.query_base_hits_by_category(cat);
                apply_after_bounds(&mut v, after_seq, started, limit);
                v
            }
            EntryQuery::Scope(_) => Vec::new(),
        };
        match (route, query) {
            (ScanRoute::BaseAoS, EntryQuery::Kind(kind)) => {
                let mut v = self.query_base_hits_by_kind(kind);
                apply_after_bounds(&mut v, after_seq, started, limit);
                v
            }
            (ScanRoute::BaseAoS, EntryQuery::Category(cat)) => {
                let mut v = self.query_base_hits_by_category(cat);
                apply_after_bounds(&mut v, after_seq, started, limit);
                v
            }
            (ScanRoute::BaseAoS, EntryQuery::Scope(_)) => Vec::new(),
            (ScanRoute::SoA, EntryQuery::Kind(kind)) => {
                self.soa.as_ref().map_or_else(fallback, |soa| {
                    soa.query_hits_by_kind_after(kind, after_seq, started, limit)
                })
            }
            (ScanRoute::SoA, EntryQuery::Category(cat)) => {
                self.soa.as_ref().map_or_else(fallback, |soa| {
                    soa.query_hits_by_category_after(cat, after_seq, started, limit)
                })
            }
            (ScanRoute::SoA, EntryQuery::Scope(scope)) => {
                self.soa.as_ref().map_or_else(fallback, |soa| {
                    soa.query_hits_by_scope_after(scope, after_seq, started, limit)
                })
            }
            (ScanRoute::SoAoS, EntryQuery::Kind(kind)) => {
                self.entity_groups.as_ref().map_or_else(fallback, |groups| {
                    let mut v = groups.query_hits_by_kind(kind);
                    apply_after_bounds(&mut v, after_seq, started, limit);
                    v
                })
            }
            (ScanRoute::SoAoS, EntryQuery::Category(cat)) => {
                self.entity_groups.as_ref().map_or_else(fallback, |groups| {
                    let mut v = groups.query_hits_by_category(cat);
                    apply_after_bounds(&mut v, after_seq, started, limit);
                    v
                })
            }
            (ScanRoute::SoAoS, EntryQuery::Scope(scope)) => {
                self.entity_groups.as_ref().map_or_else(fallback, |groups| {
                    let mut v = groups.query_hits_by_scope(scope);
                    apply_after_bounds(&mut v, after_seq, started, limit);
                    v
                })
            }
            (ScanRoute::AoSoA64, EntryQuery::Kind(kind)) => {
                self.tiles64.as_ref().map_or_else(fallback, |tiles| {
                    let mut v = tiles.query_hits_by_kind(kind);
                    apply_after_bounds(&mut v, after_seq, started, limit);
                    v
                })
            }
            (ScanRoute::AoSoA64, EntryQuery::Category(cat)) => {
                self.tiles64.as_ref().map_or_else(fallback, |tiles| {
                    let mut v = tiles.query_hits_by_category(cat);
                    apply_after_bounds(&mut v, after_seq, started, limit);
                    v
                })
            }
            (ScanRoute::AoSoA64, EntryQuery::Scope(scope)) => {
                self.tiles64.as_ref().map_or_else(fallback, |tiles| {
                    let mut v = tiles.query_hits_by_scope(scope);
                    apply_after_bounds(&mut v, after_seq, started, limit);
                    v
                })
            }
        }
    }

    #[cfg(test)]
    pub(crate) fn query_hits_by_kind_after(
        &self,
        kind: EventKind,
        after_seq: u64,
        started: bool,
        limit: usize,
    ) -> Vec<QueryHit> {
        self.query_hits_route_after(
            self.capabilities().by_kind,
            EntryQuery::Kind(kind),
            after_seq,
            started,
            limit,
        )
    }

    #[cfg(test)]
    pub(crate) fn query_hits_by_category_after(
        &self,
        category: u8,
        after_seq: u64,
        started: bool,
        limit: usize,
    ) -> Vec<QueryHit> {
        self.query_hits_route_after(
            self.capabilities().by_category,
            EntryQuery::Category(category),
            after_seq,
            started,
            limit,
        )
    }

    #[cfg(test)]
    pub(crate) fn query_hits_by_scope_after(
        &self,
        scope: &str,
        after_seq: u64,
        started: bool,
        limit: usize,
    ) -> Vec<QueryHit> {
        self.query_hits_route_after(
            self.capabilities().by_scope,
            EntryQuery::Scope(scope),
            after_seq,
            started,
            limit,
        )
    }

    pub(crate) fn topology_name(&self) -> &'static str {
        self.capabilities().topology_name
    }

    pub(crate) fn tile_count(&self) -> usize {
        self.capabilities().tile_count
    }

    pub(crate) fn entity_generation(&self, entity: &str) -> Option<u64> {
        let projection = self.capabilities().projection;
        if !projection.entity_generation_fast_path {
            return None;
        }
        self.entity_groups
            .as_ref()
            .and_then(|idx| idx.entity_generation(entity))
    }

    pub(crate) fn cached_projection(
        &self,
        entity: &str,
        type_id: TypeId,
    ) -> Option<CachedProjectionSlot> {
        let projection = self.capabilities().projection;
        if !projection.cached_projection {
            return None;
        }
        self.entity_groups
            .as_ref()
            .and_then(|idx| idx.cached_projection(entity, type_id))
    }

    pub(crate) fn store_cached_projection(
        &self,
        entity: &str,
        type_id: TypeId,
        bytes: Vec<u8>,
        watermark: u64,
    ) -> ProjectionCacheStoreStatus {
        let projection = self.capabilities().projection;
        if !projection.cached_projection {
            return ProjectionCacheStoreStatus::UnsupportedTopology;
        }
        self.entity_groups
            .as_ref()
            .map_or(ProjectionCacheStoreStatus::UnsupportedTopology, |idx| {
                idx.store_cached_projection(entity, type_id, bytes, watermark)
            })
    }

    pub(crate) fn projection_candidates(
        &self,
        entity: &str,
        relevant_kinds: &[EventKind],
    ) -> Option<ProjectionCandidates> {
        let projection = self.capabilities().projection;
        if !projection.projection_candidates {
            return None;
        }
        self.entity_groups
            .as_ref()
            .and_then(|idx| idx.projection_candidates(entity, relevant_kinds))
    }
}