use crate::event::EventSourced;
use crate::store::index::columnar::CachedProjectionSlot;
use crate::store::index::ProjectionReplayPlan;
use crate::store::Freshness;
use std::any::TypeId;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ProjectionStrategy {
Empty,
GroupLocalHit,
GroupLocalIncremental,
ExternalCacheThenReplay,
DirectReplay,
}
#[derive(Debug, Clone)]
pub(super) struct ReplayContext {
pub(super) plan: ProjectionReplayPlan,
pub(super) cache_key: Vec<u8>,
pub(super) watermark: u64,
pub(super) cached_at_us: i64,
pub(super) cached_at_mono_ns: i64,
pub(super) process_boot_ns: u64,
pub(super) type_id: TypeId,
}
#[derive(Debug, Clone)]
pub(super) struct PreparedProjection {
pub(super) replay: ReplayContext,
pub(super) group_local_slot: Option<CachedProjectionSlot>,
pub(super) group_local_fresh: bool,
}
#[derive(Debug, Clone)]
pub(super) enum ProjectionPreparation {
Empty,
Planned(PreparedProjection),
}
#[derive(Debug, Clone, Copy)]
pub(super) struct ReplayExecution<'a> {
pub(super) entity: &'a str,
pub(super) freshness: &'a Freshness,
pub(super) replay: &'a ReplayContext,
pub(super) started_at_ns: i64,
}
#[derive(Debug, Clone)]
pub(super) enum ProjectionDispatch {
Empty,
GroupLocalHit {
slot: CachedProjectionSlot,
replay: ReplayContext,
},
GroupLocalIncremental {
slot: CachedProjectionSlot,
replay: ReplayContext,
},
ExternalCacheThenReplay {
replay: ReplayContext,
},
DirectReplay {
replay: ReplayContext,
},
}
impl ProjectionDispatch {
pub(super) fn strategy(&self) -> ProjectionStrategy {
match self {
Self::Empty => ProjectionStrategy::Empty,
Self::GroupLocalHit { .. } => ProjectionStrategy::GroupLocalHit,
Self::GroupLocalIncremental { .. } => ProjectionStrategy::GroupLocalIncremental,
Self::ExternalCacheThenReplay { .. } => ProjectionStrategy::ExternalCacheThenReplay,
Self::DirectReplay { .. } => ProjectionStrategy::DirectReplay,
}
}
}
pub(super) fn replay_execution<'a>(
entity: &'a str,
freshness: &'a Freshness,
replay: &'a ReplayContext,
started_at_ns: i64,
) -> ReplayExecution<'a> {
ReplayExecution {
entity,
freshness,
replay,
started_at_ns,
}
}
impl PreparedProjection {
pub(super) fn dispatch<T: EventSourced>(
self,
incremental_enabled: bool,
cache_is_noop: bool,
) -> ProjectionDispatch {
let strategy = compute_strategy(
self.group_local_slot.as_ref(),
self.group_local_fresh,
T::supports_incremental_apply(),
incremental_enabled,
cache_is_noop,
);
let Self {
replay,
group_local_slot,
..
} = self;
match (strategy, group_local_slot) {
(ProjectionStrategy::GroupLocalHit, Some(slot)) => {
ProjectionDispatch::GroupLocalHit { slot, replay }
}
(ProjectionStrategy::GroupLocalIncremental, Some(slot)) => {
ProjectionDispatch::GroupLocalIncremental { slot, replay }
}
(ProjectionStrategy::ExternalCacheThenReplay, _) => {
ProjectionDispatch::ExternalCacheThenReplay { replay }
}
(ProjectionStrategy::DirectReplay, _) => ProjectionDispatch::DirectReplay { replay },
(ProjectionStrategy::Empty, _) => ProjectionDispatch::Empty,
(
ProjectionStrategy::GroupLocalHit | ProjectionStrategy::GroupLocalIncremental,
None,
) => {
debug_assert!(
false,
"compute_strategy selected a group-local projection strategy without a cached slot"
);
ProjectionDispatch::DirectReplay { replay }
}
}
}
}
pub(super) fn compute_strategy(
group_local_slot: Option<&CachedProjectionSlot>,
is_group_local_fresh: bool,
supports_incremental: bool,
incremental_enabled: bool,
cache_is_noop: bool,
) -> ProjectionStrategy {
if group_local_slot.is_some() {
if is_group_local_fresh {
return ProjectionStrategy::GroupLocalHit;
}
if supports_incremental && incremental_enabled {
return ProjectionStrategy::GroupLocalIncremental;
}
}
if cache_is_noop {
return ProjectionStrategy::DirectReplay;
}
ProjectionStrategy::ExternalCacheThenReplay
}
#[cfg(test)]
mod tests {
use super::*;
use crate::event::{Event, EventKind};
use crate::store::index::ProjectionReplayPlan;
#[derive(Default, Debug, serde::Serialize, serde::Deserialize)]
struct Counter;
impl EventSourced for Counter {
type Input = crate::event::JsonValueInput;
fn apply_event(&mut self, _event: &Event<serde_json::Value>) {}
fn from_events(events: &[Event<serde_json::Value>]) -> Option<Self> {
(!events.is_empty()).then_some(Self)
}
fn relevant_event_kinds() -> &'static [EventKind] {
static KINDS: [EventKind; 1] = [EventKind::custom(0xF, 1)];
&KINDS
}
}
fn replay_context() -> ReplayContext {
ReplayContext {
plan: ProjectionReplayPlan {
watermark: 7,
generation: 9,
items: vec![],
},
cache_key: vec![1, 2, 3],
watermark: 7,
cached_at_us: 0,
cached_at_mono_ns: 0,
process_boot_ns: 0,
type_id: std::any::TypeId::of::<Counter>(),
}
}
#[test]
fn dispatch_uses_direct_replay_for_noop_cache_without_group_local_slot() {
let prepared = PreparedProjection {
replay: replay_context(),
group_local_slot: None,
group_local_fresh: false,
};
assert!(matches!(
prepared.dispatch::<Counter>(false, true),
ProjectionDispatch::DirectReplay { replay }
if replay.plan.generation == 9
));
}
}