use crate::store::projection::flow::{
project_outcome, ProjectionCacheObservation, ProjectionObservedFreshness,
};
use crate::store::{Freshness, HlcPoint, Store, StoreError};
use serde::{Deserialize, Serialize};
/// Schema version written into the `schema_version` field of every
/// `ProjectionRunReportBody` produced by `Store::project_run_evidence`.
pub const PROJECTION_RUN_REPORT_SCHEMA_VERSION: u16 = 1;
/// 32-byte hash value, used for a report's `body_hash` and for
/// `ProjectionRunOutputHash::Known`.
pub type ProjectionRunHash = [u8; 32];
/// One input source recorded in a run report's `source_refs` list.
///
/// `Store::project_run_evidence` records one `Entity` ref for the projected
/// entity plus one `RelevantKind` ref per kind yielded by
/// `T::relevant_event_kinds()`, then sorts the list with the derived `Ord`.
/// The declaration order of the variants therefore decides where entity refs
/// sort relative to kind refs.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionSourceRef {
/// The entity the projection was run for.
Entity {
/// Entity identifier, copied from the `entity` argument of the run.
entity: String,
},
/// An event kind the projected type reports as relevant.
RelevantKind {
/// Value of `kind.category()` for the event kind.
category: u8,
/// Value of `kind.type_id()` for the event kind.
type_id: u16,
},
}
/// Replay mode recorded in a run report body.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionRunReplayMode {
/// The only mode currently defined; `Store::project_run_evidence` always
/// records this value.
Current,
}
/// Freshness the caller asked for, as recorded in a run report body.
///
/// Produced variant-for-variant from `Freshness` by `map_requested_freshness`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionRunRequestedFreshness {
/// Recorded when the caller passed `Freshness::Consistent`.
Consistent,
/// Recorded when the caller passed `Freshness::MaybeStale`.
MaybeStale {
/// Copied from `Freshness::MaybeStale::max_stale_ms`.
/// NOTE(review): presumably the maximum tolerated staleness in
/// milliseconds — confirm against the `Freshness` definition.
max_stale_ms: u64,
},
}
/// Freshness actually observed for a run, as recorded in a run report body.
///
/// `Fresh`, `StaleAllowed` and `NotApplicable` are produced one-to-one from
/// `ProjectionObservedFreshness` by `map_observed_freshness`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionRunFreshnessStatus {
/// Mapped from `ProjectionObservedFreshness::Fresh`.
Fresh,
/// Mapped from `ProjectionObservedFreshness::StaleAllowed`; makes
/// `append_common_findings` add the `StaleUsed` finding.
StaleAllowed,
/// Mapped from `ProjectionObservedFreshness::NotApplicable`; suppresses the
/// `InputFrontierUnknown` finding in `append_common_findings`.
NotApplicable,
/// Freshness could not be observed; makes `append_common_findings` add the
/// `ObservedFreshnessUnavailable` finding.
Unavailable {
/// Reason text; `Store::project_run_evidence` uses `"projection_failed"`
/// when the projection itself returned an error.
reason: String,
},
}
/// Cache outcome of a run, as recorded in a run report body.
///
/// Produced variant-for-variant from `ProjectionCacheObservation` by
/// `map_cache_status`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionRunCacheStatus {
/// Mapped from `ProjectionCacheObservation::Hit`.
Hit,
/// Mapped from `ProjectionCacheObservation::Miss`.
Miss,
/// Mapped from `ProjectionCacheObservation::Bypassed`.
Bypassed,
/// Cache status could not be determined; makes `append_common_findings`
/// add the `CacheStatusUnavailable` finding.
Unavailable {
/// Reason text: either the reason carried by
/// `ProjectionCacheObservation::Unavailable`, or `"projection_failed"`
/// when the projection itself returned an error.
reason: String,
},
}
/// Checkpoint reference recorded in a run report body.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionRunCheckpointRef {
/// The only value currently defined; `Store::project_run_evidence` records
/// it for both successful and failed runs.
NotApplicable,
}
/// Hash of the projected state, as recorded in a run report body.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionRunOutputHash {
/// `crate::evidence::content_hash` of the state's
/// `crate::canonical::to_bytes` encoding (see `output_hash_for_state`).
Known(ProjectionRunHash),
/// The run produced no state (`None`), so there is nothing to hash.
NotApplicable,
/// No hash could be produced; makes `append_common_findings` add the
/// `OutputHashUnavailable` finding.
Unavailable {
/// Reason text: the canonical-encoding error rendered with
/// `to_string()`, or `"projection_failed"` when the projection itself
/// returned an error.
reason: String,
},
}
/// Kind of frontier described by a `ProjectionRunInputFrontier`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionRunFrontierKind {
/// The only kind currently defined; `map_input_frontier` always records it.
Visible,
}
/// Input frontier of a run, built from an `HlcPoint` by `map_input_frontier`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ProjectionRunInputFrontier {
/// Which kind of frontier this is.
pub kind: ProjectionRunFrontierKind,
/// Copied from `HlcPoint::wall_ms`.
/// NOTE(review): presumably wall-clock milliseconds — confirm against `HlcPoint`.
pub wall_ms: u64,
/// Copied from `HlcPoint::global_sequence`.
pub global_sequence: u64,
}
/// Machine-readable observation attached to a run report body.
///
/// Findings are collected by `Store::project_run_evidence` and
/// `append_common_findings`, then passed through
/// `crate::evidence::sort_findings` before being stored in the body.
/// NOTE(review): if that sort uses the derived `Ord`, the declaration order of
/// these variants determines the order in reports — confirm before reordering.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionRunFinding {
/// Observed freshness is `ProjectionRunFreshnessStatus::Unavailable`.
ObservedFreshnessUnavailable,
/// No input frontier was reported and observed freshness is not
/// `ProjectionRunFreshnessStatus::NotApplicable`.
InputFrontierUnknown,
/// Output hash is `ProjectionRunOutputHash::Unavailable`.
OutputHashUnavailable,
/// Cache status is `ProjectionRunCacheStatus::Unavailable`.
CacheStatusUnavailable,
/// Added unconditionally to every report by `append_common_findings`.
PartialVisibilityNotApplicable,
/// The projection itself returned an error.
ProjectionFailed,
/// Observed freshness is `ProjectionRunFreshnessStatus::StaleAllowed`.
StaleUsed,
}
/// Hashed portion of a projection run evidence report.
///
/// `build_report` passes this struct to `report_body_hash` to compute the
/// `body_hash` stored alongside it in `ProjectionRunEvidenceReport`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProjectionRunReportBody {
/// Set to `PROJECTION_RUN_REPORT_SCHEMA_VERSION` by `Store::project_run_evidence`.
pub schema_version: u16,
/// Identifier returned by `ProjectionRegistry::id_for_type::<T>(entity)`.
pub projection_id: String,
/// Sorted list of the entity and relevant event kinds the run drew from.
pub source_refs: Vec<ProjectionSourceRef>,
/// Replay mode of the run.
pub replay_mode: ProjectionRunReplayMode,
/// Freshness the caller requested.
pub requested_freshness: ProjectionRunRequestedFreshness,
/// Freshness the run actually observed.
pub observed_freshness: ProjectionRunFreshnessStatus,
/// Input frontier reported by the run, if any; always `None` for a failed run.
pub input_frontier: Option<ProjectionRunInputFrontier>,
/// Hash of the projected state, or why there is none.
pub output_hash: ProjectionRunOutputHash,
/// Cache outcome of the run.
pub cache_status: ProjectionRunCacheStatus,
/// Checkpoint reference of the run.
pub checkpoint_ref: ProjectionRunCheckpointRef,
/// Findings collected for the run, after `crate::evidence::sort_findings`.
pub findings: Vec<ProjectionRunFinding>,
}
/// Evidence report for one projection run: the hashed body plus unhashed
/// envelope metadata.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProjectionRunEvidenceReport {
/// The hashed report body.
pub body: ProjectionRunReportBody,
/// Hash of `body`, computed by `report_body_hash`.
pub body_hash: ProjectionRunHash,
/// Left as `None` by `build_report`.
/// NOTE(review): presumably a generation timestamp in Unix milliseconds
/// filled in elsewhere — confirm.
pub generated_at_unix_ms: Option<u64>,
/// Left as `None` by `build_report`.
pub batpak_version: Option<String>,
/// Free-form diagnostics: empty for a successful run, the projection error
/// rendered with `to_string()` for a failed one.
pub diagnostics: Vec<String>,
}
/// Errors returned by `Store::project_run_evidence`.
#[derive(Debug)]
#[non_exhaustive]
pub enum ProjectionRunReportError {
/// The report body could not be hashed.
BodyEncoding {
/// Failure message handed to the error callback by
/// `crate::evidence::report_body_hash`.
message: String,
},
/// The projection itself failed; an evidence report for the failed run is
/// still provided.
ProjectionFailed {
/// The store error returned by the projection; also exposed through
/// `std::error::Error::source`.
source: StoreError,
/// Evidence report describing the failed run.
/// NOTE(review): presumably boxed to keep the error type small — confirm.
report: Box<ProjectionRunEvidenceReport>,
},
}
impl std::fmt::Display for ProjectionRunReportError {
    /// Writes a one-line, human-readable description of the error.
    ///
    /// The attached evidence report of `ProjectionFailed` is not rendered;
    /// only the underlying store error is.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ProjectionFailed { source, report: _ } => {
                write!(out, "projection run failed: {source}")
            }
            Self::BodyEncoding { message } => {
                write!(out, "projection run report body encoding failed: {message}")
            }
        }
    }
}
impl std::error::Error for ProjectionRunReportError {
    /// Returns the underlying store error for `ProjectionFailed`; a
    /// `BodyEncoding` error has no underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::ProjectionFailed { source: cause, .. } => {
                Some(cause as &(dyn std::error::Error + 'static))
            }
            Self::BodyEncoding { .. } => None,
        }
    }
}
impl<State> Store<State> {
pub fn project_run_evidence<T>(
&self,
entity: &str,
freshness: &Freshness,
) -> Result<(Option<T>, ProjectionRunEvidenceReport), ProjectionRunReportError>
where
T: crate::event::EventSourced + serde::Serialize + serde::de::DeserializeOwned + 'static,
T::Input: crate::store::projection::flow::ReplayInput,
{
let projection_id =
crate::store::projection::registry::ProjectionRegistry::id_for_type::<T>(entity);
let mut source_refs = Vec::new();
source_refs.push(ProjectionSourceRef::Entity {
entity: entity.to_owned(),
});
for kind in T::relevant_event_kinds() {
source_refs.push(ProjectionSourceRef::RelevantKind {
category: kind.category(),
type_id: kind.type_id(),
});
}
source_refs.sort();
let requested_freshness = map_requested_freshness(freshness);
let replay_mode = ProjectionRunReplayMode::Current;
let run_result = project_outcome::<T, State>(self, entity, freshness);
match run_result {
Ok(outcome) => {
let observed_freshness = map_observed_freshness(outcome.observed_freshness());
let cache_status = map_cache_status(outcome.cache_status());
let input_frontier = outcome.input_frontier().map(map_input_frontier);
let state = outcome.into_state();
let output_hash = output_hash_for_state(state.as_ref());
let checkpoint_ref = ProjectionRunCheckpointRef::NotApplicable;
let mut findings = Vec::new();
append_common_findings(
&mut findings,
&observed_freshness,
input_frontier,
&output_hash,
&cache_status,
);
crate::evidence::sort_findings(&mut findings);
let report = build_report(
ProjectionRunReportBody {
schema_version: PROJECTION_RUN_REPORT_SCHEMA_VERSION,
projection_id,
source_refs,
replay_mode,
requested_freshness,
observed_freshness,
input_frontier,
output_hash,
cache_status,
checkpoint_ref,
findings,
},
Vec::new(),
)?;
Ok((state, report))
}
Err(error) => {
let observed_freshness = ProjectionRunFreshnessStatus::Unavailable {
reason: "projection_failed".to_owned(),
};
let cache_status = ProjectionRunCacheStatus::Unavailable {
reason: "projection_failed".to_owned(),
};
let input_frontier = None;
let output_hash = ProjectionRunOutputHash::Unavailable {
reason: "projection_failed".to_owned(),
};
let checkpoint_ref = ProjectionRunCheckpointRef::NotApplicable;
let mut findings = vec![ProjectionRunFinding::ProjectionFailed];
append_common_findings(
&mut findings,
&observed_freshness,
input_frontier,
&output_hash,
&cache_status,
);
crate::evidence::sort_findings(&mut findings);
let report = build_report(
ProjectionRunReportBody {
schema_version: PROJECTION_RUN_REPORT_SCHEMA_VERSION,
projection_id,
source_refs,
replay_mode,
requested_freshness,
observed_freshness,
input_frontier,
output_hash,
cache_status,
checkpoint_ref,
findings,
},
vec![error.to_string()],
)?;
Err(ProjectionRunReportError::ProjectionFailed {
source: error,
report: Box::new(report),
})
}
}
}
}
/// Converts the store's `HlcPoint` into the report's input-frontier record.
///
/// The frontier kind is always `Visible`; `wall_ms` and `global_sequence`
/// are copied over unchanged.
fn map_input_frontier(frontier: HlcPoint) -> ProjectionRunInputFrontier {
    let (wall_ms, global_sequence) = (frontier.wall_ms, frontier.global_sequence);
    ProjectionRunInputFrontier {
        kind: ProjectionRunFrontierKind::Visible,
        wall_ms,
        global_sequence,
    }
}
/// Translates the caller's `Freshness` request into its report representation,
/// variant for variant, carrying `max_stale_ms` over unchanged.
fn map_requested_freshness(freshness: &Freshness) -> ProjectionRunRequestedFreshness {
    type Requested = ProjectionRunRequestedFreshness;

    match *freshness {
        Freshness::MaybeStale { max_stale_ms } => Requested::MaybeStale { max_stale_ms },
        Freshness::Consistent => Requested::Consistent,
    }
}
/// Translates the freshness observed by the projection flow into its report
/// representation. The report-only `Unavailable` status is never produced here.
fn map_observed_freshness(value: ProjectionObservedFreshness) -> ProjectionRunFreshnessStatus {
    type Status = ProjectionRunFreshnessStatus;

    match value {
        ProjectionObservedFreshness::NotApplicable => Status::NotApplicable,
        ProjectionObservedFreshness::StaleAllowed => Status::StaleAllowed,
        ProjectionObservedFreshness::Fresh => Status::Fresh,
    }
}
/// Translates the cache observation made by the projection flow into its
/// report representation, turning the `Unavailable` reason into an owned value.
fn map_cache_status(value: ProjectionCacheObservation) -> ProjectionRunCacheStatus {
    type Status = ProjectionRunCacheStatus;

    match value {
        ProjectionCacheObservation::Unavailable { reason } => {
            let reason = reason.to_owned();
            Status::Unavailable { reason }
        }
        ProjectionCacheObservation::Bypassed => Status::Bypassed,
        ProjectionCacheObservation::Miss => Status::Miss,
        ProjectionCacheObservation::Hit => Status::Hit,
    }
}
/// Computes the output-hash entry for a projected state.
///
/// * `None` state: `NotApplicable`.
/// * Canonical encoding succeeds: `Known` with the content hash of the bytes.
/// * Canonical encoding fails: `Unavailable`, with the error text as reason.
fn output_hash_for_state<T: serde::Serialize>(state: Option<&T>) -> ProjectionRunOutputHash {
    state.map_or(ProjectionRunOutputHash::NotApplicable, |projected| {
        crate::canonical::to_bytes(projected).map_or_else(
            |failure| ProjectionRunOutputHash::Unavailable {
                reason: failure.to_string(),
            },
            |encoded| ProjectionRunOutputHash::Known(crate::evidence::content_hash(&encoded)),
        )
    })
}
/// Appends the findings that follow from a run's recorded observations.
///
/// Conditional findings are pushed in a fixed order: freshness unavailable,
/// stale used, input frontier unknown, output hash unavailable, cache status
/// unavailable. `PartialVisibilityNotApplicable` is then always pushed last.
/// Findings already present in `findings` are left untouched.
fn append_common_findings(
    findings: &mut Vec<ProjectionRunFinding>,
    observed_freshness: &ProjectionRunFreshnessStatus,
    input_frontier: Option<ProjectionRunInputFrontier>,
    output_hash: &ProjectionRunOutputHash,
    cache_status: &ProjectionRunCacheStatus,
) {
    let freshness_unavailable = matches!(
        observed_freshness,
        ProjectionRunFreshnessStatus::Unavailable { .. }
    );
    let stale_used = matches!(observed_freshness, ProjectionRunFreshnessStatus::StaleAllowed);
    // A missing frontier only counts as "unknown" when freshness applies at all.
    let freshness_applies = !matches!(
        observed_freshness,
        ProjectionRunFreshnessStatus::NotApplicable
    );
    let frontier_unknown = input_frontier.is_none() && freshness_applies;
    let hash_unavailable = matches!(output_hash, ProjectionRunOutputHash::Unavailable { .. });
    let cache_unavailable = matches!(cache_status, ProjectionRunCacheStatus::Unavailable { .. });

    let conditional = [
        (
            freshness_unavailable,
            ProjectionRunFinding::ObservedFreshnessUnavailable,
        ),
        (stale_used, ProjectionRunFinding::StaleUsed),
        (frontier_unknown, ProjectionRunFinding::InputFrontierUnknown),
        (hash_unavailable, ProjectionRunFinding::OutputHashUnavailable),
        (cache_unavailable, ProjectionRunFinding::CacheStatusUnavailable),
    ];
    for (applies, finding) in conditional {
        if applies {
            findings.push(finding);
        }
    }

    findings.push(ProjectionRunFinding::PartialVisibilityNotApplicable);
}
/// Wraps `body` in an evidence report, attaching its hash and `diagnostics`.
///
/// `generated_at_unix_ms` and `batpak_version` are left as `None`.
///
/// # Errors
///
/// Returns `ProjectionRunReportError::BodyEncoding` when the body cannot be hashed.
fn build_report(
    body: ProjectionRunReportBody,
    diagnostics: Vec<String>,
) -> Result<ProjectionRunEvidenceReport, ProjectionRunReportError> {
    report_body_hash(&body).map(|body_hash| ProjectionRunEvidenceReport {
        body_hash,
        body,
        diagnostics,
        generated_at_unix_ms: None,
        batpak_version: None,
    })
}
/// Hashes a report body by delegating to `crate::evidence::report_body_hash`.
///
/// # Errors
///
/// The failure message handed back by the shared helper is wrapped in
/// `ProjectionRunReportError::BodyEncoding`.
fn report_body_hash(
    body: &ProjectionRunReportBody,
) -> Result<ProjectionRunHash, ProjectionRunReportError> {
    let into_encoding_error = |message| ProjectionRunReportError::BodyEncoding { message };
    crate::evidence::report_body_hash(body, into_encoding_error)
}