mod cache_identity;
mod outcome;
mod replay_input;
mod strategy;
use crate::event::{EventSourced, ProjectionInput};
use crate::store::config::duration_micros;
use crate::store::{Freshness, HlcPoint, Store, StoreError};
use std::any::TypeId;
pub(crate) use cache_identity::projection_cache_key;
use outcome::{
finish_empty_projection, finish_projection, record_external_cache_probe_time,
ProjectionFinishObservation,
};
pub(crate) use outcome::{
ProjectionCacheObservation, ProjectionObservedFreshness, ProjectionOutcome, ProjectionTimings,
};
#[doc(hidden)]
pub use replay_input::ReplayInput;
#[cfg(test)]
use strategy::{compute_strategy, ProjectionStrategy};
use strategy::{
replay_execution, PreparedProjection, ProjectionDispatch, ProjectionPreparation, ReplayContext,
ReplayExecution,
};
/// Attempts to deserialize a cached projection state from JSON `bytes`.
///
/// On success the decoded value is returned in `Some`. On failure the
/// deserialization error is logged at warn level (tagged with `entity`, using
/// `warning` as the message) and `None` is returned so the caller can choose
/// another path.
fn decode_cached_state<T>(entity: &str, bytes: &[u8], warning: &str) -> Option<T>
where
    T: serde::de::DeserializeOwned,
{
    serde_json::from_slice::<T>(bytes)
        .map_err(|error| {
            tracing::warn!(entity, error = %error, "{}", warning);
        })
        .ok()
}
/// Runs a full replay on behalf of a caller whose cached bytes were unusable.
///
/// Builds a replay execution from `entity`, `freshness`, `replay` and
/// `started_at`, then delegates to `execute_full_replay`, reporting the cache
/// as a miss and the result as fresh.
///
/// # Errors
///
/// Returns whatever `StoreError` the full replay produces.
fn fallback_to_full_replay<T, I, State>(
    store: &Store<State>,
    entity: &str,
    freshness: &Freshness,
    replay: &ReplayContext,
    started_at: std::time::Instant,
    timings: &mut Option<&mut ProjectionTimings>,
) -> Result<ProjectionOutcome<T>, StoreError>
where
    T: EventSourced + serde::Serialize + serde::de::DeserializeOwned + 'static,
    I: ReplayInput<Payload = <T::Input as ProjectionInput>::Payload>,
{
    let execution = replay_execution(entity, freshness, replay, started_at);
    let cache_status = ProjectionCacheObservation::Miss;
    let observed_freshness = ProjectionObservedFreshness::Fresh;
    execute_full_replay::<T, I, State>(store, execution, cache_status, observed_freshness, timings)
}
/// Asks the store index for the HLC point associated with a global sequence.
///
/// The index's answer is returned unchanged.
fn input_frontier_for_sequence<State>(store: &Store<State>, sequence: u64) -> Option<HlcPoint> {
    let index = &store.index;
    index.hlc_for_global_sequence(sequence)
}
/// Assembles the observation record handed to `finish_projection`.
///
/// The three arguments after `store` are copied through as-is; the
/// `input_frontier` field is resolved by looking up `applied_sequence` via
/// `input_frontier_for_sequence`.
fn finish_observation<State>(
    store: &Store<State>,
    applied_sequence: u64,
    cache_status: ProjectionCacheObservation,
    observed_freshness: ProjectionObservedFreshness,
) -> ProjectionFinishObservation {
    let input_frontier = input_frontier_for_sequence(store, applied_sequence);
    ProjectionFinishObservation {
        applied_sequence,
        cache_status,
        observed_freshness,
        input_frontier,
    }
}
/// Projects `entity` into a `T` and returns only the resulting state.
///
/// Runs `project_inner` without timing collection and keeps just the state
/// part of the outcome.
///
/// # Errors
///
/// Propagates any `StoreError` from the underlying projection.
pub(crate) fn project<T, State>(
    store: &Store<State>,
    entity: &str,
    freshness: &Freshness,
) -> Result<Option<T>, StoreError>
where
    T: EventSourced + serde::Serialize + serde::de::DeserializeOwned + 'static,
    T::Input: ReplayInput,
{
    project_inner::<T, T::Input, State>(store, entity, freshness, None)
        .map(|outcome| outcome.into_state())
}
pub(crate) fn project_outcome<T, State>(
store: &Store<State>,
entity: &str,
freshness: &Freshness,
) -> Result<ProjectionOutcome<T>, StoreError>
where
T: EventSourced + serde::Serialize + serde::de::DeserializeOwned + 'static,
T::Input: ReplayInput,
{
project_inner::<T, T::Input, State>(store, entity, freshness, None)
}
/// Projects `entity` only when its generation differs from `last_seen_generation`.
///
/// Returns `Ok(None)` when the store's current generation for the entity
/// (treated as `0` when the store reports none) equals `last_seen_generation`.
/// Otherwise the projection is run and `Ok(Some(outcome.into_parts()))` is
/// returned.
///
/// A `Freshness::MaybeStale` request is upgraded to `Freshness::Consistent`
/// before projecting, so the projection itself always runs consistently.
///
/// # Errors
///
/// Propagates any `StoreError` from the underlying projection.
pub(crate) fn project_if_changed<T, State>(
    store: &Store<State>,
    entity: &str,
    last_seen_generation: u64,
    freshness: &Freshness,
) -> Result<Option<(u64, Option<T>)>, StoreError>
where
    T: EventSourced + serde::Serialize + serde::de::DeserializeOwned + 'static,
    T::Input: ReplayInput,
{
    // Guard: nothing changed since the caller last looked.
    if store.entity_generation(entity).unwrap_or(0) == last_seen_generation {
        return Ok(None);
    }

    let consistent = Freshness::Consistent;
    let effective: &Freshness = match freshness {
        Freshness::MaybeStale { .. } => &consistent,
        Freshness::Consistent => freshness,
    };

    project_inner::<T, T::Input, State>(store, entity, effective, None)
        .map(|outcome| Some(outcome.into_parts()))
}
/// Test-only variant of `project` that also fills `out` with phase timings.
///
/// # Errors
///
/// Propagates any `StoreError` from the underlying projection.
#[cfg(test)]
pub(crate) fn project_timed<T, State>(
    store: &Store<State>,
    entity: &str,
    freshness: &Freshness,
    out: &mut ProjectionTimings,
) -> Result<Option<T>, StoreError>
where
    T: EventSourced + serde::Serialize + serde::de::DeserializeOwned + 'static,
    T::Input: ReplayInput,
{
    let timings = Some(out);
    project_inner::<T, T::Input, State>(store, entity, freshness, timings)
        .map(|outcome| outcome.into_state())
}
/// Shared implementation behind `project`, `project_outcome`,
/// `project_if_changed` and `project_timed`.
///
/// Steps, in order:
/// 1. Snapshot the entity generation and ask the index for a replay plan
///    restricted to `T::relevant_event_kinds()`. No plan means an empty outcome.
/// 2. Build a `ReplayContext` (watermark, cache timestamps, type id, cache key),
///    optionally prefetch from the external cache, and look up the group-local
///    cached slot, marking it fresh when its watermark and generation match the plan.
/// 3. Let the prepared projection pick a dispatch strategy.
/// 4. Execute that strategy; undecodable group-local bytes fall back to a full replay.
/// 5. Notify the projection registry about the applied sequence.
///
/// When `timings` is `Some`, per-phase durations (microseconds, via
/// `duration_micros`) are written into it along the way.
///
/// # Errors
///
/// Returns the `StoreError` produced by whichever execution path runs
/// (incremental apply, external-cache path, or full replay).
fn project_inner<T, I, State>(
store: &Store<State>,
entity: &str,
freshness: &Freshness,
mut timings: Option<&mut ProjectionTimings>,
) -> Result<ProjectionOutcome<T>, StoreError>
where
T: EventSourced + serde::Serialize + serde::de::DeserializeOwned + 'static,
I: ReplayInput<Payload = <T::Input as ProjectionInput>::Payload>,
{
// `t_start` anchors both `plan_build_us` below and the value handed to the
// finish/replay helpers as the projection start time.
let t_start = std::time::Instant::now();
// Only consumed by the empty-projection outcome further down.
let observed_generation = store.entity_generation(entity).unwrap_or(0);
tracing::debug!(
target: "batpak::flow",
flow = "project",
entity,
freshness = match freshness {
Freshness::Consistent => "consistent",
Freshness::MaybeStale { .. } => "maybe_stale",
}
);
let relevant_kinds = T::relevant_event_kinds();
let preparation = match store.index.projection_replay_plan(entity, relevant_kinds) {
// The index has no replay plan for this entity / kind set.
None => ProjectionPreparation::Empty,
Some(plan) => {
// Phase: build the replay context, including the cache key.
let t_cache_key = std::time::Instant::now();
let replay = ReplayContext {
watermark: plan.watermark,
cached_at_us: store.runtime.cache_now_us(),
cached_at_mono_ns: crate::store::config::now_mono_ns(),
process_boot_ns: crate::store::config::process_boot_ns(),
type_id: TypeId::of::<T>(),
cache_key: projection_cache_key::<T>(entity),
plan,
};
if let Some(t) = timings.as_deref_mut() {
t.cache_key_build_us = duration_micros(t_cache_key.elapsed());
}
// Phase: optional prefetch, only when the cache advertises support for it.
let t_prefetch = std::time::Instant::now();
if store.cache.capabilities().supports_prefetch {
let predicted_meta = super::CacheMeta {
watermark: replay.watermark,
cached_at_us: replay.cached_at_us,
cached_at_mono_ns: Some(replay.cached_at_mono_ns),
process_boot_ns: Some(replay.process_boot_ns),
};
// A prefetch failure is logged and otherwise ignored.
if let Err(error) = store.cache.prefetch(&replay.cache_key, predicted_meta) {
tracing::warn!("cache prefetch failed (non-fatal): {error}");
}
}
if let Some(t) = timings.as_deref_mut() {
t.prefetch_us = duration_micros(t_prefetch.elapsed());
}
// Phase: group-local (index-held) cached projection lookup.
let t_group = std::time::Instant::now();
let group_local_slot = store.index.cached_projection(entity, replay.type_id);
// The slot counts as fresh only when both its watermark and generation
// match the plan.
// NOTE(review): both freshness arms are identical, so `MaybeStale` gets no
// extra leeway here — confirm this is intended.
let group_local_fresh = group_local_slot
.as_ref()
.map(|slot| match freshness {
Freshness::Consistent => {
slot.watermark == replay.watermark
&& slot.generation == replay.plan.generation
}
Freshness::MaybeStale { max_stale_ms: _ } => {
slot.watermark == replay.watermark
&& slot.generation == replay.plan.generation
}
})
.unwrap_or(false);
if let Some(t) = timings.as_deref_mut() {
t.group_local_lookup_us = duration_micros(t_group.elapsed());
}
ProjectionPreparation::Planned(PreparedProjection {
replay,
group_local_slot,
group_local_fresh,
})
}
};
// NOTE(review): measured from `t_start`, so this value also covers the
// cache-key, prefetch and group-local phases timed above — confirm the
// overlap is intended by consumers that sum the phase fields.
if let Some(t) = timings.as_deref_mut() {
t.plan_build_us = duration_micros(t_start.elapsed());
}
// Strategy selection is delegated to the prepared projection, parameterised
// by the runtime's incremental flag and whether the cache is a no-op.
let dispatch = match preparation {
ProjectionPreparation::Empty => ProjectionDispatch::Empty,
ProjectionPreparation::Planned(prepared) => prepared.dispatch::<T>(
store.runtime.incremental_projection,
store.cache.capabilities().is_noop,
),
};
tracing::debug!(
target: "batpak::flow",
flow = "project",
entity,
strategy = ?dispatch.strategy(),
);
let outcome = match dispatch {
ProjectionDispatch::Empty => Ok(finish_empty_projection(
&mut timings,
t_start,
observed_generation,
)),
// Group-local slot is fresh: decode it and return it as a cache hit.
ProjectionDispatch::GroupLocalHit { slot, replay } => {
if let Some(value) = decode_cached_state::<T>(
entity,
&slot.bytes,
"group-local projection cache deserialize failed (falling back)",
) {
Ok(finish_projection(
&mut timings,
t_start,
Some(value),
slot.generation,
finish_observation(
store,
slot.watermark,
ProjectionCacheObservation::Hit,
ProjectionObservedFreshness::Fresh,
),
))
} else {
fallback_to_full_replay::<T, I, State>(
store,
entity,
freshness,
&replay,
t_start,
&mut timings,
)
}
}
// Group-local slot is behind the plan: apply only the events after the
// slot's watermark, then re-store the updated state.
ProjectionDispatch::GroupLocalIncremental { slot, replay } => {
if let Some(mut cached_state) = decode_cached_state::<T>(
entity,
&slot.bytes,
"group-local incremental deser failed, falling back to full replay",
) {
let execution = replay_execution(entity, freshness, &replay, t_start);
apply_incremental_events::<T, I, State>(
store,
&execution,
&mut cached_state,
slot.watermark,
)?;
store_projection_value(store, &execution, &cached_state);
Ok(finish_projection(
&mut timings,
t_start,
Some(cached_state),
replay.plan.generation,
finish_observation(
store,
replay.watermark,
ProjectionCacheObservation::Hit,
ProjectionObservedFreshness::Fresh,
),
))
} else {
fallback_to_full_replay::<T, I, State>(
store,
entity,
freshness,
&replay,
t_start,
&mut timings,
)
}
}
// Probe the external cache first; that path replays on its own if needed.
ProjectionDispatch::ExternalCacheThenReplay { replay } => {
execute_external_cache_path::<T, I, State>(
store,
replay_execution(entity, freshness, &replay, t_start),
ProjectionCacheObservation::Miss,
&mut timings,
)
}
// No cache consulted: replay directly and report the cache as bypassed.
ProjectionDispatch::DirectReplay { replay } => execute_full_replay::<T, I, State>(
store,
replay_execution(entity, freshness, &replay, t_start),
ProjectionCacheObservation::Bypassed,
ProjectionObservedFreshness::Fresh,
&mut timings,
),
}?;
// Runs only on success; errors returned above via `?`.
notify_projection_applied::<T, State>(store, entity, &outcome);
Ok(outcome)
}
/// Tells the projection registry which HLC point a projection has applied.
///
/// Does nothing when the outcome carries no applied sequence, or when the
/// index cannot map that sequence to an HLC point.
fn notify_projection_applied<T, State>(
    store: &Store<State>,
    entity: &str,
    outcome: &ProjectionOutcome<T>,
) where
    T: 'static,
{
    let Some(sequence) = outcome.applied_sequence() else {
        return;
    };
    let Some(point) = store.index.hlc_for_global_sequence(sequence) else {
        return;
    };
    let projection_id = super::registry::ProjectionRegistry::id_for_type::<T>(entity);
    store.projection_registry.notify_applied(projection_id, point);
}
/// Serves a projection from the external cache when possible, replaying otherwise.
///
/// The external cache is probed once with the replay's cache key, and the
/// probe duration is recorded into `timings`. Then:
///
/// * **Hit, not fresh, incremental available** (`T::supports_incremental_apply()`
///   and the runtime's `incremental_projection` flag): the cached state is
///   decoded, events after the cached watermark are applied, the updated state
///   is stored back, and the result is reported as a fresh hit.
/// * **Hit and fresh**: the cached state is decoded, its bytes are copied into
///   the group-local cache, and the result is reported as a hit. It is marked
///   `StaleAllowed` when the cached watermark differs from the plan watermark.
/// * **Anything else** (miss, probe error, unusable or undecodable entry): a
///   full replay runs with `fallback_cache_status` as the reported cache status.
///
/// Freshness is watermark equality for `Freshness::Consistent`, and
/// "age strictly below `max_stale_ms`" for `Freshness::MaybeStale`.
///
/// `fallback_cache_status` is overwritten with `Miss` on a cache miss and with
/// `Unavailable { reason: "cache_get_failed" }` on a probe error; the caller's
/// value is only used when a hit could not be served.
///
/// # Errors
///
/// Returns any `StoreError` raised by incremental apply or by the full replay.
#[inline(never)]
fn execute_external_cache_path<T, I, State>(
    store: &Store<State>,
    execution: ReplayExecution<'_>,
    mut fallback_cache_status: ProjectionCacheObservation,
    timings: &mut Option<&mut ProjectionTimings>,
) -> Result<ProjectionOutcome<T>, StoreError>
where
    T: EventSourced + serde::Serialize + serde::de::DeserializeOwned + 'static,
    I: ReplayInput<Payload = <T::Input as ProjectionInput>::Payload>,
{
    let plan_generation = execution.replay.plan.generation;
    let t_ext = std::time::Instant::now();
    match store.cache.get(&execution.replay.cache_key) {
        Ok(Some((bytes, meta))) => {
            record_external_cache_probe_time(timings, t_ext);
            let is_fresh = match execution.freshness {
                Freshness::Consistent => meta.watermark == execution.replay.watermark,
                Freshness::MaybeStale { max_stale_ms } => {
                    let now_us = store.runtime.cache_now_us();
                    // A cache timestamp in the future clamps to an age of zero.
                    let age_us = now_us.saturating_sub(meta.cached_at_us).max(0);
                    // Convert the millisecond budget to microseconds without
                    // wrapping: a plain `as i64` cast followed by `* 1000` can
                    // turn a very large budget negative or overflow, which
                    // would wrongly mark every entry stale (or panic in debug
                    // builds). Out-of-range budgets saturate to `i64::MAX`.
                    let budget_ms = <i64 as std::convert::TryFrom<_>>::try_from(*max_stale_ms)
                        .unwrap_or(i64::MAX);
                    let budget_us = budget_ms.saturating_mul(1000);
                    age_us < budget_us
                }
            };
            if !is_fresh && T::supports_incremental_apply() && store.runtime.incremental_projection
            {
                if let Some(mut cached_state) = decode_cached_state::<T>(
                    execution.entity,
                    &bytes,
                    "incremental projection deser failed, falling back to full replay",
                ) {
                    // Catch the cached state up from its own watermark.
                    apply_incremental_events::<T, I, State>(
                        store,
                        &execution,
                        &mut cached_state,
                        meta.watermark,
                    )?;
                    store_projection_value(store, &execution, &cached_state);
                    return Ok(finish_projection(
                        timings,
                        execution.started_at,
                        Some(cached_state),
                        plan_generation,
                        finish_observation(
                            store,
                            execution.replay.watermark,
                            ProjectionCacheObservation::Hit,
                            ProjectionObservedFreshness::Fresh,
                        ),
                    ));
                }
            }
            if is_fresh {
                if let Some(value) = decode_cached_state::<T>(
                    execution.entity,
                    &bytes,
                    "cache deserialize failed (falling back to replay)",
                ) {
                    // Copy the external entry into the group-local cache; the
                    // call's result is intentionally discarded.
                    let _ = store.index.store_cached_projection(
                        execution.entity,
                        execution.replay.type_id,
                        bytes,
                        meta.watermark,
                    );
                    return Ok(finish_projection(
                        timings,
                        execution.started_at,
                        Some(value),
                        plan_generation,
                        finish_observation(
                            store,
                            meta.watermark,
                            ProjectionCacheObservation::Hit,
                            if meta.watermark == execution.replay.watermark {
                                ProjectionObservedFreshness::Fresh
                            } else {
                                ProjectionObservedFreshness::StaleAllowed
                            },
                        ),
                    ));
                }
            }
            // Hit that could not be served: fall through to the full replay
            // below with the caller-provided `fallback_cache_status`.
        }
        Ok(None) => {
            fallback_cache_status = ProjectionCacheObservation::Miss;
            record_external_cache_probe_time(timings, t_ext);
        }
        Err(e) => {
            fallback_cache_status = ProjectionCacheObservation::Unavailable {
                reason: "cache_get_failed",
            };
            record_external_cache_probe_time(timings, t_ext);
            tracing::warn!("cache get failed (falling back to replay): {e}");
        }
    }
    execute_full_replay::<T, I, State>(
        store,
        execution,
        fallback_cache_status,
        ProjectionObservedFreshness::Fresh,
        timings,
    )
}
/// Rebuilds the projection from every event in the replay plan.
///
/// Reads all planned disk positions in one batch, folds them with
/// `T::from_events`, stores a produced value via `store_projection_value`,
/// and finishes the projection with the supplied `cache_status` and
/// `observed_freshness`. Disk-read, fold and cache-store durations are
/// written into `timings` when present; `event_extract_us` is set to zero.
///
/// # Errors
///
/// Returns the `StoreError` from the batch read.
#[inline(never)]
fn execute_full_replay<T, I, State>(
    store: &Store<State>,
    execution: ReplayExecution<'_>,
    cache_status: ProjectionCacheObservation,
    observed_freshness: ProjectionObservedFreshness,
    timings: &mut Option<&mut ProjectionTimings>,
) -> Result<ProjectionOutcome<T>, StoreError>
where
    T: EventSourced + serde::Serialize + serde::de::DeserializeOwned + 'static,
    I: ReplayInput<Payload = <T::Input as ProjectionInput>::Payload>,
{
    let plan_generation = execution.replay.plan.generation;

    // Disk read: gather every planned position and fetch them as one batch.
    let read_started = std::time::Instant::now();
    let plan_items = &execution.replay.plan.items;
    let positions = plan_items
        .iter()
        .map(|item| &item.disk_pos)
        .collect::<Vec<&crate::store::DiskPos>>();
    let events = I::read_batch(&store.reader, &positions)?;
    if let Some(t) = timings.as_deref_mut() {
        t.disk_read_us = duration_micros(read_started.elapsed());
        t.event_extract_us = 0;
    }

    // Fold the events into a state.
    let fold_started = std::time::Instant::now();
    let result = T::from_events(&events);
    if let Some(t) = timings.as_deref_mut() {
        t.replay_fold_us = duration_micros(fold_started.elapsed());
    }
    if !events.is_empty() && result.is_none() {
        tracing::debug!(
            execution.entity,
            event_count = events.len(),
            "projection returned None despite non-empty filtered event stream"
        );
    }

    // Store the state back into the caches, only when one was produced.
    let store_started = std::time::Instant::now();
    if let Some(value) = result.as_ref() {
        store_projection_value(store, &execution, value);
    }
    if let Some(t) = timings.as_deref_mut() {
        t.cache_store_us = duration_micros(store_started.elapsed());
    }

    let observation = finish_observation(
        store,
        execution.replay.watermark,
        cache_status,
        observed_freshness,
    );
    Ok(finish_projection(
        timings,
        execution.started_at,
        result,
        plan_generation,
        observation,
    ))
}
/// Applies to `cached_state` every planned event newer than `cached_watermark`.
///
/// Plan items are visited in plan order. Items whose global sequence is at or
/// below `cached_watermark` are skipped; each remaining item is read
/// individually and passed to `apply_event`.
///
/// # Errors
///
/// Stops at, and returns, the first `StoreError` from reading an event.
/// Events applied before the failure stay applied to `cached_state`.
fn apply_incremental_events<T, I, State>(
    store: &Store<State>,
    execution: &ReplayExecution<'_>,
    cached_state: &mut T,
    cached_watermark: u64,
) -> Result<(), StoreError>
where
    T: EventSourced + serde::Serialize + serde::de::DeserializeOwned + 'static,
    I: ReplayInput<Payload = <T::Input as ProjectionInput>::Payload>,
{
    for item in execution.replay.plan.items.iter() {
        if item.global_sequence <= cached_watermark {
            continue;
        }
        let event = I::read_one(&store.reader, &item.disk_pos)?;
        cached_state.apply_event(&event);
    }
    Ok(())
}
/// Best-effort write of a projected value into both caches.
///
/// The value is serialized to JSON, written to the external cache under the
/// replay's cache key (with the replay watermark and current timestamps as
/// metadata), and then stored in the index's group-local projection cache.
///
/// Nothing here is fatal to the projection:
/// * a serialization failure is logged and both cache writes are skipped;
/// * an external `put` failure is logged and the group-local store still runs.
fn store_projection_value<T, State>(
    store: &Store<State>,
    execution: &ReplayExecution<'_>,
    value: &T,
) where
    T: serde::Serialize,
{
    let bytes = match serde_json::to_vec(value) {
        Ok(bytes) => bytes,
        Err(error) => {
            // Previously swallowed silently, which left an unserializable state
            // uncached with no diagnostics.
            tracing::warn!(
                entity = execution.entity,
                error = %error,
                "projection serialize failed, skipping cache store (non-fatal)"
            );
            return;
        }
    };
    let meta = super::CacheMeta {
        watermark: execution.replay.watermark,
        cached_at_us: store.runtime.cache_now_us(),
        cached_at_mono_ns: Some(crate::store::config::now_mono_ns()),
        process_boot_ns: Some(crate::store::config::process_boot_ns()),
    };
    if let Err(error) = store.cache.put(&execution.replay.cache_key, &bytes, meta) {
        tracing::warn!("cache put failed (non-fatal): {error}");
    }
    // The group-local store's result is intentionally discarded (best effort).
    let _ = store.index.store_cached_projection(
        execution.entity,
        execution.replay.type_id,
        bytes,
        execution.replay.watermark,
    );
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{Event, EventKind};
    use crate::store::index::columnar::CachedProjectionSlot;
    use crate::store::StoreConfig;
    use std::error::Error;
    use tempfile::TempDir;

    /// Lets test bodies use `?` with any error type.
    type TestResult = Result<(), Box<dyn Error>>;

    /// Stateless projection: it exists whenever at least one relevant event does.
    #[derive(Default, Debug, serde::Serialize, serde::Deserialize)]
    struct Counter;

    impl EventSourced for Counter {
        type Input = crate::event::JsonValueInput;

        fn apply_event(&mut self, event: &Event<serde_json::Value>) {
            std::hint::black_box(event.event_kind());
        }

        fn from_events(events: &[Event<serde_json::Value>]) -> Option<Self> {
            if events.is_empty() {
                None
            } else {
                Some(Self)
            }
        }

        fn relevant_event_kinds() -> &'static [EventKind] {
            static RELEVANT: [EventKind; 1] = [EventKind::custom(0xF, 1)];
            &RELEVANT
        }
    }

    /// The replay plan must list exactly the entries obtained by filtering the
    /// entity's full stream by relevant kind, with a matching watermark and
    /// generation.
    #[test]
    fn projection_replay_plan_matches_legacy_stream_filtering() -> TestResult {
        let dir = TempDir::new()?;
        let store = Store::open(StoreConfig::new(dir.path()))?;
        let coord = crate::coordinate::Coordinate::new("entity:proj", "scope:test")?;
        let kept = EventKind::custom(0xF, 1);
        let skipped = EventKind::custom(0xF, 2);

        // Relevant, irrelevant, relevant — in that order.
        store.append(&coord, kept, &serde_json::json!({"n": 1}))?;
        store.append(&coord, skipped, &serde_json::json!({"n": 2}))?;
        store.append(&coord, kept, &serde_json::json!({"n": 3}))?;

        let plan = store
            .index
            .projection_replay_plan("entity:proj", Counter::relevant_event_kinds())
            .ok_or_else(|| std::io::Error::other("expected projection replay plan"))?;

        let stream_entries = store.index.stream("entity:proj");
        let legacy_entries: Vec<_> = stream_entries
            .into_iter()
            .filter(|entry| Counter::relevant_event_kinds().contains(&entry.kind))
            .collect();
        let legacy_items: Vec<_> = legacy_entries
            .iter()
            .map(|entry| (entry.global_sequence, entry.disk_pos))
            .collect();
        let planned_items: Vec<_> = plan
            .items
            .iter()
            .map(|item| (item.global_sequence, item.disk_pos))
            .collect();
        let legacy_watermark = legacy_entries
            .last()
            .map(|entry| entry.global_sequence)
            .ok_or_else(|| std::io::Error::other("expected legacy filtered entries"))?;

        assert_eq!(plan.watermark, legacy_watermark);
        assert_eq!(
            plan.generation,
            store.index.entity_generation("entity:proj").unwrap_or(0)
        );
        assert_eq!(planned_items, legacy_items);

        store.close()?;
        Ok(())
    }

    /// After a reopen, a timed projection must yield a value, and the summed
    /// phase timings must stay within the reported total.
    #[test]
    fn projection_timings_cold_path_breakdown() -> TestResult {
        let dir = TempDir::new()?;

        // Populate, then close and reopen the store before projecting.
        let writer = Store::open(StoreConfig::new(dir.path()))?;
        let coord = crate::coordinate::Coordinate::new("entity:timed", "scope:test")?;
        let kind = EventKind::custom(0xF, 1);
        for i in 0..1_000u32 {
            writer.append(&coord, kind, &serde_json::json!({"i": i}))?;
        }
        writer.close()?;

        let store = Store::open(StoreConfig::new(dir.path()))?;
        let mut timings = ProjectionTimings::default();
        let projected: Option<Counter> =
            project_timed(&store, "entity:timed", &Freshness::Consistent, &mut timings)?;
        assert!(projected.is_some(), "projection must produce a value");

        let accounted = timings.plan_build_us
            + timings.cache_key_build_us
            + timings.group_local_lookup_us
            + timings.prefetch_us
            + timings.external_cache_probe_us
            + timings.disk_read_us
            + timings.event_extract_us
            + timings.replay_fold_us
            + timings.cache_store_us;
        assert!(timings.total_us > 0, "total must be positive");
        assert!(
            accounted <= timings.total_us,
            "phase timings must not exceed total"
        );

        store.close()?;
        Ok(())
    }

    /// Table-driven check of `compute_strategy`. Each row gives whether a slot
    /// is passed, the four boolean arguments in call order, and the expected
    /// strategy.
    #[test]
    fn compute_strategy_exhaustive() {
        let slot = CachedProjectionSlot {
            bytes: vec![],
            watermark: 42,
            generation: 1,
        };
        let cases = [
            (true, [true, false, false, false], ProjectionStrategy::GroupLocalHit),
            (true, [true, true, true, true], ProjectionStrategy::GroupLocalHit),
            (true, [false, true, true, false], ProjectionStrategy::GroupLocalIncremental),
            (true, [false, true, true, true], ProjectionStrategy::GroupLocalIncremental),
            (true, [false, true, false, false], ProjectionStrategy::ExternalCacheThenReplay),
            (true, [false, true, false, true], ProjectionStrategy::DirectReplay),
            (true, [false, false, false, false], ProjectionStrategy::ExternalCacheThenReplay),
            (true, [false, false, true, false], ProjectionStrategy::ExternalCacheThenReplay),
            (true, [false, false, false, true], ProjectionStrategy::DirectReplay),
            (false, [false, false, false, true], ProjectionStrategy::DirectReplay),
            (false, [false, false, false, false], ProjectionStrategy::ExternalCacheThenReplay),
            (false, [false, true, true, false], ProjectionStrategy::ExternalCacheThenReplay),
        ];
        for (has_slot, [a, b, c, d], expected) in cases {
            let slot_arg = has_slot.then_some(&slot);
            assert_eq!(
                compute_strategy(slot_arg, a, b, c, d),
                expected,
                "has_slot={has_slot}, args=({a}, {b}, {c}, {d})"
            );
        }
    }
}