batpak 0.8.0

Event sourcing with causal graphs and caller-defined gates. Sync API, no async runtime.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
//! Deterministic evidence report for a single projection run.

use crate::store::projection::flow::{
    project_outcome, ProjectionCacheObservation, ProjectionObservedFreshness,
};
use crate::store::{Freshness, HlcPoint, Store, StoreError};
use serde::{Deserialize, Serialize};

/// Report-body schema version for projection run evidence.
///
/// Every report body assembled in this module stores this value in its
/// `schema_version` field.
pub const PROJECTION_RUN_REPORT_SCHEMA_VERSION: u16 = 1;

/// Fixed-width hash used by projection run evidence.
///
/// A 32-byte array. It is the type of both the report `body_hash` and the
/// payload of a known output hash.
pub type ProjectionRunHash = [u8; 32];

/// Source reference included in a projection run report.
///
/// `project_run_evidence` records one `Entity` reference for the requested
/// entity plus one `RelevantKind` reference per kind returned by
/// `T::relevant_event_kinds()`, then sorts the list with the derived `Ord`.
/// Derived ordering compares variants by declaration order first, so
/// reordering the variants below would change the sorted report content.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionSourceRef {
    /// Entity-scoped source reference.
    Entity {
        /// Entity identifier, copied from the `entity` argument of the run.
        entity: String,
    },
    /// Event kind admitted by the projection fold.
    RelevantKind {
        /// Event kind category (value of `kind.category()`).
        category: u8,
        /// Event kind type identifier (value of `kind.type_id()`).
        type_id: u16,
    },
}

/// Replay boundary mode for the run.
///
/// `Current` is the only mode defined here, and it is the value
/// `project_run_evidence` records for every run.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionRunReplayMode {
    /// Current visible replay boundary.
    Current,
}

/// Requested freshness policy.
///
/// Report-side mirror of the caller-supplied [`Freshness`] value: each
/// `Freshness` variant maps to the variant of the same name here.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionRunRequestedFreshness {
    /// Force current replay.
    Consistent,
    /// Allow stale reads within the provided age bound.
    MaybeStale {
        /// Maximum stale age in milliseconds, copied unchanged from the request.
        max_stale_ms: u64,
    },
}

/// Observed freshness status for the run result.
///
/// On a successful run this is mapped one-to-one from the freshness the
/// projection flow observed; `Unavailable` is only produced by this module
/// when the run itself fails.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionRunFreshnessStatus {
    /// Output reflects current replay boundary.
    Fresh,
    /// Output came from stale-allowed cache semantics.
    StaleAllowed,
    /// Freshness does not apply for this run.
    NotApplicable,
    /// Freshness could not be acquired because the run failed.
    Unavailable {
        /// Availability reason. `project_run_evidence` uses the fixed string
        /// `projection_failed` here.
        reason: String,
    },
}

/// Cache status observed during the run.
///
/// On a successful run this is mapped one-to-one from the cache observation
/// reported by the projection flow. On a failed run `project_run_evidence`
/// records `Unavailable` with the fixed reason `projection_failed`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionRunCacheStatus {
    /// Cache hit path served or seeded the run.
    Hit,
    /// Cache miss path required replay.
    Miss,
    /// Cache path was bypassed.
    Bypassed,
    /// Cache observation was unavailable with deterministic reason.
    Unavailable {
        /// Availability reason, either forwarded from the projection flow or
        /// the fixed `projection_failed` marker.
        reason: String,
    },
}

/// Optional checkpoint reference availability for this run path.
///
/// Only `NotApplicable` is defined; `project_run_evidence` records it for both
/// successful and failed runs.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionRunCheckpointRef {
    /// Checkpoint does not apply to this run path.
    NotApplicable,
}

/// Output hash availability for the run.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionRunOutputHash {
    /// Canonical output hash is known: the content hash of the projected
    /// state's canonical byte encoding.
    Known(ProjectionRunHash),
    /// Output hash does not apply (for example empty projection state).
    /// Recorded when the run yields no state (`None`).
    NotApplicable,
    /// Output hash is unavailable with deterministic reason.
    Unavailable {
        /// Availability reason: the canonical encoder's error text when
        /// encoding the state fails, or the fixed `projection_failed` marker
        /// when the run itself fails.
        reason: String,
    },
}

/// Kind of projection input boundary recorded by this report.
///
/// `Visible` is the only kind defined here and is the kind attached to every
/// input frontier this module records.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionRunFrontierKind {
    /// Replay/cache watermark selected from the visible index. Durable and
    /// process-wide applied watermarks remain available through
    /// [`crate::store::FrontierView`]; they are not the projection input
    /// boundary consumed by this run.
    Visible,
}

/// Input frontier boundary observed by the run.
///
/// Built from the [`HlcPoint`] reported by the projection outcome: the
/// `wall_ms` and `global_sequence` fields are copied across unchanged.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ProjectionRunInputFrontier {
    /// Frontier kind used as run boundary.
    pub kind: ProjectionRunFrontierKind,
    /// HLC wall milliseconds at boundary.
    pub wall_ms: u64,
    /// Global sequence at boundary.
    pub global_sequence: u64,
}

/// Structural findings emitted by projection run evidence.
///
/// Findings are derived purely from the other report fields and are passed
/// through `crate::evidence::sort_findings` before being stored in the body.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ProjectionRunFinding {
    /// Observed freshness was unavailable.
    ObservedFreshnessUnavailable,
    /// Input frontier could not be determined. Emitted when no input frontier
    /// was recorded and observed freshness is anything other than
    /// `NotApplicable`.
    InputFrontierUnknown,
    /// Output hash was unavailable.
    OutputHashUnavailable,
    /// Cache status was unavailable.
    CacheStatusUnavailable,
    /// Partial visibility does not apply to this run path. Emitted for every
    /// report produced by this module.
    PartialVisibilityNotApplicable,
    /// Projection run failed.
    ProjectionFailed,
    /// Run served stale-allowed output.
    StaleUsed,
}

/// Deterministic report body for one projection run.
///
/// This is the part of the report that is hashed: the envelope's `body_hash`
/// is computed from this struct via `crate::evidence::report_body_hash`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProjectionRunReportBody {
    /// Report-body schema version.
    pub schema_version: u16,
    /// Stable projection identifier.
    pub projection_id: String,
    /// Projection source references, stored in sorted order.
    pub source_refs: Vec<ProjectionSourceRef>,
    /// Replay mode used for this run.
    pub replay_mode: ProjectionRunReplayMode,
    /// Requested freshness policy.
    pub requested_freshness: ProjectionRunRequestedFreshness,
    /// Observed freshness status.
    pub observed_freshness: ProjectionRunFreshnessStatus,
    /// Input frontier boundary if known. Always `None` for a failed run.
    pub input_frontier: Option<ProjectionRunInputFrontier>,
    /// Output hash availability.
    pub output_hash: ProjectionRunOutputHash,
    /// Cache status.
    pub cache_status: ProjectionRunCacheStatus,
    /// Checkpoint reference availability.
    pub checkpoint_ref: ProjectionRunCheckpointRef,
    /// Deterministic structural findings.
    pub findings: Vec<ProjectionRunFinding>,
}

/// Projection run evidence report envelope.
///
/// Wraps the hashed [`ProjectionRunReportBody`] together with metadata that
/// is deliberately kept outside the hash.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProjectionRunEvidenceReport {
    /// Deterministic report body.
    pub body: ProjectionRunReportBody,
    /// Canonical hash of `body`.
    pub body_hash: ProjectionRunHash,
    /// Optional generation timestamp metadata outside deterministic identity.
    /// Reports built by this module leave it as `None`.
    pub generated_at_unix_ms: Option<u64>,
    /// Optional producer version metadata outside deterministic identity.
    /// Reports built by this module leave it as `None`.
    pub batpak_version: Option<String>,
    /// Optional diagnostics outside deterministic identity. Empty for a
    /// successful run; for a failed run it holds the store error's display
    /// text.
    pub diagnostics: Vec<String>,
}

/// Error returned when projection run evidence generation fails.
///
/// Marked `#[non_exhaustive]`, so downstream matches need a wildcard arm.
#[derive(Debug)]
#[non_exhaustive]
pub enum ProjectionRunReportError {
    /// Canonical report-body encoding failed.
    BodyEncoding {
        /// Human-readable encoding error.
        message: String,
    },
    /// Projection execution failed; includes a deterministic report.
    ProjectionFailed {
        /// Underlying store error. Also exposed through
        /// `std::error::Error::source`.
        source: StoreError,
        /// Deterministic report produced for the failed run. Boxed, which
        /// keeps this variant (and the error type) small.
        report: Box<ProjectionRunEvidenceReport>,
    },
}

/// User-facing rendering of [`ProjectionRunReportError`].
impl std::fmt::Display for ProjectionRunReportError {
    /// Writes a one-line description of the failure.
    ///
    /// For `ProjectionFailed` only the underlying store error is rendered;
    /// the attached report is not part of the message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ProjectionFailed { source, .. } => write!(f, "projection run failed: {source}"),
            Self::BodyEncoding { message } => {
                write!(f, "projection run report body encoding failed: {message}")
            }
        }
    }
}

/// Error-chain integration for [`ProjectionRunReportError`].
impl std::error::Error for ProjectionRunReportError {
    /// Returns the underlying store error for `ProjectionFailed`.
    ///
    /// `BodyEncoding` only carries a message string, so it has no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::ProjectionFailed { source, .. } => Some(source),
            Self::BodyEncoding { .. } => None,
        }
    }
}

impl<State> Store<State> {
    /// Run a projection and return both materialized state and deterministic
    /// projection run evidence.
    ///
    /// # Errors
    /// Returns [`ProjectionRunReportError::BodyEncoding`] when deterministic
    /// report-body encoding fails, or [`ProjectionRunReportError::ProjectionFailed`]
    /// when the projection run fails.
    pub fn project_run_evidence<T>(
        &self,
        entity: &str,
        freshness: &Freshness,
    ) -> Result<(Option<T>, ProjectionRunEvidenceReport), ProjectionRunReportError>
    where
        T: crate::event::EventSourced + serde::Serialize + serde::de::DeserializeOwned + 'static,
        T::Input: crate::store::projection::flow::ReplayInput,
    {
        let projection_id =
            crate::store::projection::registry::ProjectionRegistry::id_for_type::<T>(entity);
        let mut source_refs = Vec::new();
        source_refs.push(ProjectionSourceRef::Entity {
            entity: entity.to_owned(),
        });
        for kind in T::relevant_event_kinds() {
            source_refs.push(ProjectionSourceRef::RelevantKind {
                category: kind.category(),
                type_id: kind.type_id(),
            });
        }
        source_refs.sort();

        let requested_freshness = map_requested_freshness(freshness);
        let replay_mode = ProjectionRunReplayMode::Current;

        let run_result = project_outcome::<T, State>(self, entity, freshness);
        match run_result {
            Ok(outcome) => {
                let observed_freshness = map_observed_freshness(outcome.observed_freshness());
                let cache_status = map_cache_status(outcome.cache_status());
                let input_frontier = outcome.input_frontier().map(map_input_frontier);
                let state = outcome.into_state();
                let output_hash = output_hash_for_state(state.as_ref());
                let checkpoint_ref = ProjectionRunCheckpointRef::NotApplicable;

                let mut findings = Vec::new();
                append_common_findings(
                    &mut findings,
                    &observed_freshness,
                    input_frontier,
                    &output_hash,
                    &cache_status,
                );
                crate::evidence::sort_findings(&mut findings);

                let report = build_report(
                    ProjectionRunReportBody {
                        schema_version: PROJECTION_RUN_REPORT_SCHEMA_VERSION,
                        projection_id,
                        source_refs,
                        replay_mode,
                        requested_freshness,
                        observed_freshness,
                        input_frontier,
                        output_hash,
                        cache_status,
                        checkpoint_ref,
                        findings,
                    },
                    Vec::new(),
                )?;
                Ok((state, report))
            }
            Err(error) => {
                let observed_freshness = ProjectionRunFreshnessStatus::Unavailable {
                    reason: "projection_failed".to_owned(),
                };
                let cache_status = ProjectionRunCacheStatus::Unavailable {
                    reason: "projection_failed".to_owned(),
                };
                let input_frontier = None;
                let output_hash = ProjectionRunOutputHash::Unavailable {
                    reason: "projection_failed".to_owned(),
                };
                let checkpoint_ref = ProjectionRunCheckpointRef::NotApplicable;
                let mut findings = vec![ProjectionRunFinding::ProjectionFailed];
                append_common_findings(
                    &mut findings,
                    &observed_freshness,
                    input_frontier,
                    &output_hash,
                    &cache_status,
                );
                crate::evidence::sort_findings(&mut findings);

                let report = build_report(
                    ProjectionRunReportBody {
                        schema_version: PROJECTION_RUN_REPORT_SCHEMA_VERSION,
                        projection_id,
                        source_refs,
                        replay_mode,
                        requested_freshness,
                        observed_freshness,
                        input_frontier,
                        output_hash,
                        cache_status,
                        checkpoint_ref,
                        findings,
                    },
                    vec![error.to_string()],
                )?;
                Err(ProjectionRunReportError::ProjectionFailed {
                    source: error,
                    report: Box::new(report),
                })
            }
        }
    }
}

/// Convert the HLC point reported by a projection outcome into the report's
/// input-frontier record.
///
/// The frontier kind is always [`ProjectionRunFrontierKind::Visible`]; the
/// wall-clock milliseconds and global sequence are copied unchanged.
fn map_input_frontier(frontier: HlcPoint) -> ProjectionRunInputFrontier {
    let (wall_ms, global_sequence) = (frontier.wall_ms, frontier.global_sequence);
    ProjectionRunInputFrontier {
        kind: ProjectionRunFrontierKind::Visible,
        wall_ms,
        global_sequence,
    }
}

/// Translate the caller's [`Freshness`] policy into its report representation.
///
/// Each variant maps to the variant of the same name; the staleness bound is
/// carried over as-is.
fn map_requested_freshness(freshness: &Freshness) -> ProjectionRunRequestedFreshness {
    match *freshness {
        Freshness::MaybeStale { max_stale_ms } => {
            ProjectionRunRequestedFreshness::MaybeStale { max_stale_ms }
        }
        Freshness::Consistent => ProjectionRunRequestedFreshness::Consistent,
    }
}

/// Translate the freshness observed by the projection flow into the report's
/// freshness status.
///
/// The mapping is one-to-one by variant name. The report-only `Unavailable`
/// status is never produced here; it is reserved for failed runs.
fn map_observed_freshness(value: ProjectionObservedFreshness) -> ProjectionRunFreshnessStatus {
    match value {
        ProjectionObservedFreshness::NotApplicable => ProjectionRunFreshnessStatus::NotApplicable,
        ProjectionObservedFreshness::StaleAllowed => ProjectionRunFreshnessStatus::StaleAllowed,
        ProjectionObservedFreshness::Fresh => ProjectionRunFreshnessStatus::Fresh,
    }
}

/// Translate the cache observation from the projection flow into the report's
/// cache status.
///
/// Variants map one-to-one by name; an `Unavailable` observation keeps its
/// reason, converted into an owned `String`.
fn map_cache_status(value: ProjectionCacheObservation) -> ProjectionRunCacheStatus {
    match value {
        ProjectionCacheObservation::Unavailable { reason } => {
            let reason = reason.to_owned();
            ProjectionRunCacheStatus::Unavailable { reason }
        }
        ProjectionCacheObservation::Bypassed => ProjectionRunCacheStatus::Bypassed,
        ProjectionCacheObservation::Miss => ProjectionRunCacheStatus::Miss,
        ProjectionCacheObservation::Hit => ProjectionRunCacheStatus::Hit,
    }
}

/// Derive the output-hash record for an optional projected state.
///
/// * no state: `NotApplicable`;
/// * state that encodes canonically: `Known` with the content hash of the
///   encoded bytes;
/// * state whose canonical encoding fails: `Unavailable`, with the encoder's
///   error text as the reason.
fn output_hash_for_state<T: serde::Serialize>(state: Option<&T>) -> ProjectionRunOutputHash {
    match state {
        None => ProjectionRunOutputHash::NotApplicable,
        Some(value) => match crate::canonical::to_bytes(value) {
            Err(error) => ProjectionRunOutputHash::Unavailable {
                reason: error.to_string(),
            },
            Ok(bytes) => ProjectionRunOutputHash::Known(crate::evidence::content_hash(&bytes)),
        },
    }
}

/// Append the findings that follow mechanically from a run's observations.
///
/// Findings are pushed in a fixed order: freshness-related, input frontier,
/// output hash, cache status, and finally the always-present
/// `PartialVisibilityNotApplicable` marker. Entries already in `findings`
/// are left untouched.
fn append_common_findings(
    findings: &mut Vec<ProjectionRunFinding>,
    observed_freshness: &ProjectionRunFreshnessStatus,
    input_frontier: Option<ProjectionRunInputFrontier>,
    output_hash: &ProjectionRunOutputHash,
    cache_status: &ProjectionRunCacheStatus,
) {
    // The freshness status is a single value, so at most one of these two
    // findings can apply.
    match observed_freshness {
        ProjectionRunFreshnessStatus::Unavailable { .. } => {
            findings.push(ProjectionRunFinding::ObservedFreshnessUnavailable);
        }
        ProjectionRunFreshnessStatus::StaleAllowed => {
            findings.push(ProjectionRunFinding::StaleUsed);
        }
        ProjectionRunFreshnessStatus::Fresh | ProjectionRunFreshnessStatus::NotApplicable => {}
    }

    // A missing frontier is only reported when freshness applies to the run.
    let freshness_applies = !matches!(
        observed_freshness,
        ProjectionRunFreshnessStatus::NotApplicable
    );
    if freshness_applies && input_frontier.is_none() {
        findings.push(ProjectionRunFinding::InputFrontierUnknown);
    }

    if let ProjectionRunOutputHash::Unavailable { .. } = output_hash {
        findings.push(ProjectionRunFinding::OutputHashUnavailable);
    }

    if let ProjectionRunCacheStatus::Unavailable { .. } = cache_status {
        findings.push(ProjectionRunFinding::CacheStatusUnavailable);
    }

    // The run path hands back an in-memory folded value and never exposes
    // partial state, so this marker is emitted unconditionally.
    findings.push(ProjectionRunFinding::PartialVisibilityNotApplicable);
}

/// Wrap a finished report body in its envelope.
///
/// Hashes `body`, then attaches `diagnostics`. The optional timestamp and
/// producer-version metadata are left unset.
///
/// # Errors
/// Propagates [`ProjectionRunReportError::BodyEncoding`] when hashing the
/// body fails.
fn build_report(
    body: ProjectionRunReportBody,
    diagnostics: Vec<String>,
) -> Result<ProjectionRunEvidenceReport, ProjectionRunReportError> {
    report_body_hash(&body).map(|body_hash| ProjectionRunEvidenceReport {
        body,
        body_hash,
        generated_at_unix_ms: None,
        batpak_version: None,
        diagnostics,
    })
}

/// Compute the canonical hash of a report body.
///
/// Delegates to `crate::evidence::report_body_hash`, supplying the conversion
/// from an encoding-failure message to this module's error type.
///
/// # Errors
/// Returns [`ProjectionRunReportError::BodyEncoding`] carrying the message
/// reported by the shared hashing helper.
fn report_body_hash(
    body: &ProjectionRunReportBody,
) -> Result<ProjectionRunHash, ProjectionRunReportError> {
    let encoding_failure = |message| ProjectionRunReportError::BodyEncoding { message };
    crate::evidence::report_body_hash(body, encoding_failure)
}