kimberlite_types/lib.rs
1//! # kmb-types: Core types for `Kimberlite`
2//!
3//! This crate contains shared types used across the `Kimberlite` system:
4//! - Entity IDs ([`TenantId`], [`StreamId`], [`Offset`], [`GroupId`])
5//! - Cryptographic types ([`struct@Hash`])
6//! - Temporal types ([`Timestamp`])
7//! - Log record types ([`RecordKind`], [`RecordHeader`], [`Checkpoint`], [`CheckpointPolicy`])
8//! - Projection tracking ([`AppliedIndex`])
9//! - Idempotency ([`IdempotencyId`])
10//! - Recovery tracking ([`Generation`], [`RecoveryRecord`], [`RecoveryReason`])
11//! - Data classification ([`DataClass`])
12//! - Placement rules ([`Placement`], [`Region`])
13//! - Stream metadata ([`StreamMetadata`])
14//! - Audit actions ([`AuditAction`])
15//! - Event persistence ([`EventPersister`], [`PersistError`])
16//!
17//! This crate opts in to strict PRESSURECRAFT clippy lints. Test-only
18//! `unwrap()` / `panic!` are allowed via `cfg_attr(test, ...)`.
19
20#![warn(
21 clippy::unwrap_used,
22 clippy::panic,
23 clippy::todo,
24 clippy::unimplemented,
25 clippy::too_many_lines
26)]
27#![cfg_attr(
28 test,
29 allow(
30 clippy::unwrap_used,
31 clippy::panic,
32 clippy::todo,
33 clippy::unimplemented,
34 clippy::too_many_lines
35 )
36)]
37
38use std::{
39 fmt::{Debug, Display},
40 ops::{Add, AddAssign, Range, Sub},
41 time::{SystemTime, UNIX_EPOCH},
42};
43
44use bytes::Bytes;
45use serde::{Deserialize, Serialize};
46
47// ============================================================================
48// Entity IDs - All Copy (cheap 8-byte values)
49// ============================================================================
50
51/// Unique identifier for a tenant (organization/customer).
52#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
53pub struct TenantId(u64);
54
55impl TenantId {
56 pub fn new(id: u64) -> Self {
57 Self(id)
58 }
59
60 /// Extracts tenant ID from stream ID (upper 32 bits).
61 ///
62 /// **Bit Layout**:
63 /// - Upper 32 bits: `tenant_id` (supports 4.3B tenants)
64 /// - Lower 32 bits: `local_stream_id` (4.3B streams per tenant)
65 ///
66 /// # Examples
67 ///
68 /// ```
69 /// # use kimberlite_types::{TenantId, StreamId};
70 /// let stream_id = StreamId::from_tenant_and_local(TenantId::from(5), 1);
71 /// assert_eq!(TenantId::from_stream_id(stream_id), TenantId::from(5));
72 /// ```
73 pub fn from_stream_id(stream_id: StreamId) -> Self {
74 TenantId::from(u64::from(stream_id) >> 32)
75 }
76}
77
78impl Display for TenantId {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 write!(f, "{}", self.0)
81 }
82}
83
84impl From<u64> for TenantId {
85 fn from(value: u64) -> Self {
86 Self(value)
87 }
88}
89
90impl From<TenantId> for u64 {
91 fn from(id: TenantId) -> Self {
92 id.0
93 }
94}
95
/// Largest tenant id that can be packed into a [`StreamId`].
///
/// A `StreamId` is the single `u64` `(tenant_id << 32) | local_id`, so the
/// tenant only gets 32 bits. A `tenant_id` above `u32::MAX` cannot be
/// represented: two such tenants whose low 32 bits coincide would map onto
/// the same `StreamId`, silently corrupting every per-tenant filter
/// downstream (kernel state, audit log, projection store, replication
/// addressing).
///
/// Any caller that builds `StreamId`s from externally supplied `TenantId`
/// values (SDK boundary, wire deserialisation, fuzz inputs, test harnesses)
/// MUST check against this ceiling. [`StreamId::try_from_tenant_and_local`]
/// performs that check and returns an error. The panicking
/// [`StreamId::from_tenant_and_local`] is only for callers that can prove
/// structurally that the id fits.
pub const MAX_TENANT_ID_FOR_STREAM_ID: u64 = u32::MAX as u64;
112
113/// Error encoding a [`StreamId`] from `(tenant_id, local_id)`.
114///
115/// Surfaces when caller-supplied bits don't fit the bit-packed `u64`
116/// layout. The single variant today is overflow on `tenant_id`; future
117/// encoding changes (e.g. a wider `StreamId`) would extend this enum.
118#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
119pub enum StreamIdEncodingError {
120 /// The provided `tenant_id` does not fit in 32 bits and would be
121 /// silently truncated by the bit-packed `StreamId` layout.
122 #[error(
123 "tenant_id {tenant_id} exceeds the StreamId encoding limit of {max}; the encoding silently \
124 truncates which corrupts per-tenant filtering. Keep tenant ids ≤ u32::MAX."
125 )]
126 TenantIdTooLarge {
127 /// The offending tenant id.
128 tenant_id: u64,
129 /// The encoding limit ([`MAX_TENANT_ID_FOR_STREAM_ID`]).
130 max: u64,
131 },
132}
133
134/// Unique identifier for a stream within the system.
135#[derive(
136 Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Default,
137)]
138pub struct StreamId(u64);
139
140impl StreamId {
141 pub fn new(id: u64) -> Self {
142 Self(id)
143 }
144
145 /// Creates a stream id from tenant id and local stream number.
146 ///
147 /// **Bit Layout**:
148 /// - Upper 32 bits: `tenant_id` (must fit in u32 — see
149 /// [`MAX_TENANT_ID_FOR_STREAM_ID`])
150 /// - Lower 32 bits: `local_stream_id` (4.3B streams per tenant)
151 ///
152 /// Use this when `tenant_id` is structurally known to fit in
153 /// 32 bits (compile-time constants in tests, internally-allocated
154 /// ids whose upper bound the kernel controls). Prefer
155 /// [`Self::try_from_tenant_and_local`] for any caller that
156 /// receives `tenant_id` from outside (SDK, wire, fuzz, test
157 /// harness).
158 ///
159 /// # Panics
160 ///
161 /// Panics if `tenant_id > u32::MAX`. The bit-packed layout would
162 /// otherwise silently truncate the upper bits and corrupt every
163 /// per-tenant filter downstream. This panic is loud-by-design so
164 /// the encoding contract surfaces at the call site rather than as
165 /// mysterious cross-tenant data corruption later.
166 ///
167 /// # Examples
168 ///
169 /// ```
170 /// # use kimberlite_types::{TenantId, StreamId};
171 /// let stream_id = StreamId::from_tenant_and_local(TenantId::from(5), 1);
172 /// assert_eq!(u64::from(stream_id), 21474836481); // (5 << 32) | 1
173 /// assert_eq!(TenantId::from_stream_id(stream_id), TenantId::from(5));
174 /// ```
175 #[track_caller]
176 pub fn from_tenant_and_local(tenant_id: TenantId, local_id: u32) -> Self {
177 Self::try_from_tenant_and_local(tenant_id, local_id).expect(
178 "StreamId::from_tenant_and_local: tenant_id exceeds u32::MAX — use try_from_tenant_and_local for fallible construction",
179 )
180 }
181
182 /// Fallible counterpart to [`Self::from_tenant_and_local`].
183 ///
184 /// Returns [`StreamIdEncodingError::TenantIdTooLarge`] if the
185 /// `tenant_id` does not fit in 32 bits — the bit-packed `StreamId`
186 /// layout would silently truncate the upper bits and corrupt
187 /// per-tenant filtering.
188 ///
189 /// This is the correct entry point for any caller that receives
190 /// `tenant_id` from a non-kernel source (SDK adapter, wire
191 /// deserialisation, fuzz harness, test factory).
192 ///
193 /// # Errors
194 ///
195 /// Returns [`StreamIdEncodingError::TenantIdTooLarge`] when
196 /// `u64::from(tenant_id) > u32::MAX`.
197 ///
198 /// # Examples
199 ///
200 /// ```
201 /// # use kimberlite_types::{TenantId, StreamId, StreamIdEncodingError};
202 /// // Valid: tenant id fits in u32.
203 /// let ok = StreamId::try_from_tenant_and_local(TenantId::from(5), 1)
204 /// .expect("5 fits in u32");
205 /// assert_eq!(ok.local_id(), 1);
206 ///
207 /// // Invalid: tenant id overflows u32.
208 /// let err = StreamId::try_from_tenant_and_local(
209 /// TenantId::from(u64::from(u32::MAX) + 1),
210 /// 0,
211 /// );
212 /// assert!(matches!(err, Err(StreamIdEncodingError::TenantIdTooLarge { .. })));
213 /// ```
214 pub fn try_from_tenant_and_local(
215 tenant_id: TenantId,
216 local_id: u32,
217 ) -> Result<Self, StreamIdEncodingError> {
218 let tenant_raw = u64::from(tenant_id);
219 if tenant_raw > MAX_TENANT_ID_FOR_STREAM_ID {
220 return Err(StreamIdEncodingError::TenantIdTooLarge {
221 tenant_id: tenant_raw,
222 max: MAX_TENANT_ID_FOR_STREAM_ID,
223 });
224 }
225 let tenant_bits = tenant_raw << 32;
226 let local_bits = u64::from(local_id);
227 Ok(StreamId::from(tenant_bits | local_bits))
228 }
229
230 /// Extracts local stream ID (lower 32 bits).
231 ///
232 /// # Examples
233 ///
234 /// ```
235 /// # use kimberlite_types::{TenantId, StreamId};
236 /// let stream_id = StreamId::from_tenant_and_local(TenantId::from(5), 42);
237 /// assert_eq!(stream_id.local_id(), 42);
238 /// ```
239 pub fn local_id(self) -> u32 {
240 (u64::from(self) & 0xFFFF_FFFF) as u32
241 }
242
243 /// Extracts the tenant id this stream belongs to (upper 32 bits).
244 ///
245 /// Convenience over `TenantId::from_stream_id(stream_id)` for the
246 /// common `id.tenant_id() == other` call sites.
247 ///
248 /// # Examples
249 ///
250 /// ```
251 /// # use kimberlite_types::{TenantId, StreamId};
252 /// let stream_id = StreamId::from_tenant_and_local(TenantId::from(5), 1);
253 /// assert_eq!(stream_id.tenant_id(), TenantId::from(5));
254 /// ```
255 pub fn tenant_id(self) -> TenantId {
256 TenantId::from_stream_id(self)
257 }
258}
259
260impl Add for StreamId {
261 type Output = StreamId;
262
263 /// Saturating add. Fuzzers can drive kernel state to `u64::MAX`; the
264 /// old unchecked `+` panicked on overflow under debug assertions.
265 /// Saturation keeps the type total without producing a smaller id.
266 fn add(self, rhs: Self) -> Self::Output {
267 StreamId::new(self.0.saturating_add(rhs.0))
268 }
269}
270
271impl Display for StreamId {
272 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273 write!(f, "{}", self.0)
274 }
275}
276
277impl From<u64> for StreamId {
278 fn from(value: u64) -> Self {
279 Self(value)
280 }
281}
282
283impl From<StreamId> for u64 {
284 fn from(id: StreamId) -> Self {
285 id.0
286 }
287}
288
289/// Position of an event within a stream.
290///
291/// Offsets are zero-indexed and sequential. The first event in a stream
292/// has offset 0, the second has offset 1, and so on.
293///
294/// Uses `u64` internally — offsets are never negative by definition.
295#[derive(
296 Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Default,
297)]
298pub struct Offset(u64);
299
300impl Offset {
301 pub const ZERO: Offset = Offset(0);
302
303 pub fn new(offset: u64) -> Self {
304 Self(offset)
305 }
306
307 /// Returns the offset as a `u64`.
308 pub fn as_u64(&self) -> u64 {
309 self.0
310 }
311
312 /// Returns the offset as a `usize` for indexing.
313 ///
314 /// # Panics
315 ///
316 /// Panics on 32-bit platforms if the offset exceeds `usize::MAX`.
317 pub fn as_usize(&self) -> usize {
318 self.0 as usize
319 }
320}
321
322impl Display for Offset {
323 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
324 write!(f, "{}", self.0)
325 }
326}
327
328impl Add for Offset {
329 type Output = Self;
330 fn add(self, rhs: Self) -> Self::Output {
331 Self(self.0 + rhs.0)
332 }
333}
334
335impl AddAssign for Offset {
336 fn add_assign(&mut self, rhs: Self) {
337 self.0 += rhs.0;
338 }
339}
340
341impl Sub for Offset {
342 type Output = Self;
343 fn sub(self, rhs: Self) -> Self::Output {
344 Self(self.0 - rhs.0)
345 }
346}
347
348impl From<u64> for Offset {
349 fn from(value: u64) -> Self {
350 Self(value)
351 }
352}
353
354impl From<Offset> for u64 {
355 fn from(offset: Offset) -> Self {
356 offset.0
357 }
358}
359
360/// Unique identifier for a replication group.
361#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
362pub struct GroupId(u64);
363
364impl Display for GroupId {
365 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366 write!(f, "{}", self.0)
367 }
368}
369
370impl GroupId {
371 pub fn new(id: u64) -> Self {
372 Self(id)
373 }
374}
375
376impl From<u64> for GroupId {
377 fn from(value: u64) -> Self {
378 Self(value)
379 }
380}
381
382impl From<GroupId> for u64 {
383 fn from(id: GroupId) -> Self {
384 id.0
385 }
386}
387
388// ============================================================================
389// Cryptographic Hash - Copy (fixed 32-byte value)
390// ============================================================================
391
/// Size in bytes of every cryptographic digest in the system: a 256-bit
/// output, shared by SHA-256 and BLAKE3.
pub const HASH_LENGTH: usize = 256 / 8;
394
395/// A 32-byte cryptographic hash.
396///
397/// This is a foundation type used across `Kimberlite` for:
398/// - Hash chain links (`prev_hash` in records)
399/// - Verification anchors (in checkpoints and projections)
400/// - Content addressing
401///
402/// The specific algorithm (SHA-256 for compliance, BLAKE3 for internal)
403/// is determined by the context where the hash is computed. This type
404/// only stores the resulting 32-byte digest.
405#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
406pub struct Hash([u8; HASH_LENGTH]);
407
408impl Hash {
409 /// The genesis hash (all zeros) used as the `prev_hash` for the first record.
410 pub const GENESIS: Hash = Hash([0u8; HASH_LENGTH]);
411
412 /// Creates a hash from raw bytes.
413 pub fn from_bytes(bytes: [u8; HASH_LENGTH]) -> Self {
414 Self(bytes)
415 }
416
417 /// Returns the hash as a byte slice.
418 pub fn as_bytes(&self) -> &[u8; HASH_LENGTH] {
419 &self.0
420 }
421
422 /// Returns true if this is the genesis hash (all zeros).
423 pub fn is_genesis(&self) -> bool {
424 self.0 == [0u8; HASH_LENGTH]
425 }
426}
427
428impl Debug for Hash {
429 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
430 // Show first 8 bytes in hex for debugging without exposing full hash
431 write!(
432 f,
433 "Hash({:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}...)",
434 self.0[0], self.0[1], self.0[2], self.0[3], self.0[4], self.0[5], self.0[6], self.0[7]
435 )
436 }
437}
438
439impl Display for Hash {
440 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
441 // Full hex representation for display
442 for byte in &self.0 {
443 write!(f, "{byte:02x}")?;
444 }
445 Ok(())
446 }
447}
448
449impl Default for Hash {
450 fn default() -> Self {
451 Self::GENESIS
452 }
453}
454
455impl From<[u8; HASH_LENGTH]> for Hash {
456 fn from(bytes: [u8; HASH_LENGTH]) -> Self {
457 Self(bytes)
458 }
459}
460
461impl From<Hash> for [u8; HASH_LENGTH] {
462 fn from(hash: Hash) -> Self {
463 hash.0
464 }
465}
466
467impl AsRef<[u8]> for Hash {
468 fn as_ref(&self) -> &[u8] {
469 &self.0
470 }
471}
472
473// ============================================================================
474// Timestamp - Copy (8-byte value with monotonic guarantee)
475// ============================================================================
476
477/// Wall-clock timestamp with monotonic guarantee within the system.
478///
479/// Compliance requires real-world time for audit trails; monotonicity
480/// prevents ordering issues when system clocks are adjusted.
481///
482/// Stored as nanoseconds since Unix epoch (1970-01-01 00:00:00 UTC).
483/// This gives us ~584 years of range, well beyond any practical use.
484#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
485pub struct Timestamp(u64);
486
487impl Timestamp {
488 /// The Unix epoch (1970-01-01 00:00:00 UTC).
489 pub const EPOCH: Timestamp = Timestamp(0);
490
491 /// Creates a timestamp from nanoseconds since Unix epoch.
492 pub fn from_nanos(nanos: u64) -> Self {
493 Self(nanos)
494 }
495
496 /// Returns the timestamp as nanoseconds since Unix epoch.
497 pub fn as_nanos(&self) -> u64 {
498 self.0
499 }
500
501 /// Returns the timestamp as seconds since Unix epoch (truncates nanoseconds).
502 pub fn as_secs(&self) -> u64 {
503 self.0 / 1_000_000_000
504 }
505
506 /// Creates a timestamp for the current time.
507 ///
508 /// # Panics
509 ///
510 /// Panics if the system clock is before Unix epoch (should never happen).
511 pub fn now() -> Self {
512 let duration = SystemTime::now()
513 .duration_since(UNIX_EPOCH)
514 .expect("system clock is before Unix epoch");
515 Self(duration.as_nanos() as u64)
516 }
517
518 /// Creates a timestamp ensuring monotonicity: `max(now, last + 1ns)`.
519 ///
520 /// This guarantees that each timestamp is strictly greater than the previous,
521 /// even if the system clock moves backwards or two events occur in the same
522 /// nanosecond.
523 ///
524 /// # Arguments
525 ///
526 /// * `last` - The previous timestamp, if any. Pass `None` for the first timestamp.
527 pub fn now_monotonic(last: Option<Timestamp>) -> Self {
528 let now = Self::now();
529 match last {
530 Some(prev) => {
531 // Ensure strictly increasing: at least prev + 1 nanosecond
532 if now.0 <= prev.0 {
533 Timestamp(prev.0.saturating_add(1))
534 } else {
535 now
536 }
537 }
538 None => now,
539 }
540 }
541}
542
543impl Display for Timestamp {
544 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
545 // Display as seconds.nanoseconds for readability
546 let secs = self.0 / 1_000_000_000;
547 let nanos = self.0 % 1_000_000_000;
548 write!(f, "{secs}.{nanos:09}")
549 }
550}
551
552impl Default for Timestamp {
553 fn default() -> Self {
554 Self::EPOCH
555 }
556}
557
558impl From<u64> for Timestamp {
559 fn from(nanos: u64) -> Self {
560 Self(nanos)
561 }
562}
563
564impl From<Timestamp> for u64 {
565 fn from(ts: Timestamp) -> Self {
566 ts.0
567 }
568}
569
570// ============================================================================
571// Record Types - Copy (small enum and struct)
572// ============================================================================
573
574/// The kind of record stored in the log.
575///
576/// This enum distinguishes between different record types to enable
577/// efficient processing and verification.
578#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
579pub enum RecordKind {
580 /// Normal application data record.
581 #[default]
582 Data,
583 /// Periodic verification checkpoint (contains cumulative hash).
584 Checkpoint,
585 /// Logical deletion marker (data is not physically deleted).
586 Tombstone,
587}
588
589impl RecordKind {
590 /// Returns the single-byte discriminant for serialization.
591 pub fn as_byte(&self) -> u8 {
592 match self {
593 RecordKind::Data => 0,
594 RecordKind::Checkpoint => 1,
595 RecordKind::Tombstone => 2,
596 }
597 }
598
599 /// Creates a `RecordKind` from its byte discriminant.
600 ///
601 /// # Errors
602 ///
603 /// Returns `None` if the byte is not a valid discriminant.
604 pub fn from_byte(byte: u8) -> Option<Self> {
605 match byte {
606 0 => Some(RecordKind::Data),
607 1 => Some(RecordKind::Checkpoint),
608 2 => Some(RecordKind::Tombstone),
609 _ => None,
610 }
611 }
612}
613
614/// Metadata header for every log record.
615///
616/// This structure contains all metadata needed to verify and process
617/// a log record without reading its payload.
618#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
619pub struct RecordHeader {
620 /// Position in the log (0-indexed).
621 pub offset: Offset,
622 /// SHA-256 link to the previous record's hash (genesis for first record).
623 pub prev_hash: Hash,
624 /// When the record was committed (monotonic wall-clock).
625 pub timestamp: Timestamp,
626 /// Size of the payload in bytes.
627 pub payload_len: u32,
628 /// Type of record (Data, Checkpoint, Tombstone).
629 pub record_kind: RecordKind,
630}
631
632impl RecordHeader {
633 /// Creates a new record header.
634 ///
635 /// # Arguments
636 ///
637 /// * `offset` - Position in the log
638 /// * `prev_hash` - Hash of the previous record (or GENESIS for first)
639 /// * `timestamp` - When this record was committed
640 /// * `payload_len` - Size of the payload in bytes
641 /// * `record_kind` - Type of record
642 pub fn new(
643 offset: Offset,
644 prev_hash: Hash,
645 timestamp: Timestamp,
646 payload_len: u32,
647 record_kind: RecordKind,
648 ) -> Self {
649 Self {
650 offset,
651 prev_hash,
652 timestamp,
653 payload_len,
654 record_kind,
655 }
656 }
657
658 /// Returns true if this is the first record in the log.
659 pub fn is_genesis(&self) -> bool {
660 self.offset == Offset::ZERO && self.prev_hash.is_genesis()
661 }
662}
663
664// ============================================================================
665// Projection Tracking - Copy (small struct for projections)
666// ============================================================================
667
668/// Tracks which log entry a projection row was derived from.
669///
670/// Projections embed this in each row to enable:
671/// - Point-in-time queries (find rows at a specific offset)
672/// - Verification without walking the hash chain (hash provides direct check)
673/// - Audit trails (know exactly which event created/updated a row)
674#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
675pub struct AppliedIndex {
676 /// The log offset this row was derived from.
677 pub offset: Offset,
678 /// The hash at this offset for direct verification.
679 pub hash: Hash,
680}
681
682impl AppliedIndex {
683 /// Creates a new applied index.
684 pub fn new(offset: Offset, hash: Hash) -> Self {
685 Self { offset, hash }
686 }
687
688 /// Creates the initial applied index (before any records).
689 pub fn genesis() -> Self {
690 Self {
691 offset: Offset::ZERO,
692 hash: Hash::GENESIS,
693 }
694 }
695}
696
697impl Default for AppliedIndex {
698 fn default() -> Self {
699 Self::genesis()
700 }
701}
702
703// ============================================================================
704// Checkpoints - Copy (verification anchors in the log)
705// ============================================================================
706
707/// A periodic verification checkpoint stored in the log.
708///
709/// Checkpoints are records IN the log (not separate files), which means:
710/// - They are part of the hash chain (tamper-evident)
711/// - Checkpoint history is immutable
712/// - Single source of truth
713///
714/// Checkpoints enable efficient verified reads by providing trusted
715/// anchor points, reducing verification from O(n) to O(k) where k is
716/// the distance to the nearest checkpoint.
717#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
718pub struct Checkpoint {
719 /// Log position of this checkpoint record.
720 pub offset: Offset,
721 /// Cumulative hash of the chain at this point.
722 pub chain_hash: Hash,
723 /// Total number of records from genesis to this checkpoint.
724 pub record_count: u64,
725 /// When this checkpoint was created.
726 pub created_at: Timestamp,
727}
728
729impl Checkpoint {
730 /// Creates a new checkpoint.
731 ///
732 /// # Preconditions
733 ///
734 /// - `record_count` should equal `offset.as_u64() + 1` (0-indexed offset)
735 pub fn new(offset: Offset, chain_hash: Hash, record_count: u64, created_at: Timestamp) -> Self {
736 debug_assert_eq!(
737 record_count,
738 offset.as_u64() + 1,
739 "record_count should equal offset + 1"
740 );
741 Self {
742 offset,
743 chain_hash,
744 record_count,
745 created_at,
746 }
747 }
748}
749
750/// Policy for when to create checkpoints.
751///
752/// Checkpoints bound the worst-case verification cost. The default policy
753/// creates a checkpoint every 1000 records and on graceful shutdown.
754#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
755pub struct CheckpointPolicy {
756 /// Create a checkpoint every N records. Set to 0 to disable.
757 pub every_n_records: u64,
758 /// Create a checkpoint on graceful shutdown.
759 pub on_shutdown: bool,
760 /// If true, disable automatic checkpoints (only explicit calls).
761 pub explicit_only: bool,
762}
763
764impl CheckpointPolicy {
765 /// Creates a policy that checkpoints every N records.
766 pub fn every(n: u64) -> Self {
767 Self {
768 every_n_records: n,
769 on_shutdown: true,
770 explicit_only: false,
771 }
772 }
773
774 /// Creates a policy that only creates explicit checkpoints.
775 pub fn explicit_only() -> Self {
776 Self {
777 every_n_records: 0,
778 on_shutdown: false,
779 explicit_only: true,
780 }
781 }
782
783 /// Returns true if a checkpoint should be created at this offset.
784 pub fn should_checkpoint(&self, offset: Offset) -> bool {
785 if self.explicit_only {
786 return false;
787 }
788 if self.every_n_records == 0 {
789 return false;
790 }
791 // Checkpoint at offsets that are multiples of every_n_records
792 // (offset 999 for every_n_records=1000, etc.)
793 (offset.as_u64() + 1) % self.every_n_records == 0
794 }
795}
796
797impl Default for CheckpointPolicy {
798 /// Default policy: checkpoint every 1000 records, on shutdown.
799 fn default() -> Self {
800 Self {
801 every_n_records: 1000,
802 on_shutdown: true,
803 explicit_only: false,
804 }
805 }
806}
807
808// ============================================================================
809// Idempotency - Copy (16-byte identifier for duplicate prevention)
810// ============================================================================
811
/// Size in bytes of an [`IdempotencyId`]: 128 random bits.
pub const IDEMPOTENCY_ID_LENGTH: usize = 128 / 8;
814
815/// Unique identifier for duplicate transaction prevention.
816///
817/// Clients generate an `IdempotencyId` before their first attempt at a
818/// transaction. If the transaction needs to be retried (e.g., network
819/// timeout), the client reuses the same ID. The server tracks committed
820/// IDs to return the same result for duplicate requests.
821///
822/// Inspired by `FoundationDB`'s idempotency key design.
823///
824/// # FCIS Pattern
825///
826/// This type follows the Functional Core / Imperative Shell pattern:
827/// - `from_bytes()`: Pure restoration from storage
828/// - `from_random_bytes()`: Pure construction from bytes (`pub(crate)`)
829/// - `generate()`: Impure shell that invokes CSPRNG
830#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
831pub struct IdempotencyId([u8; IDEMPOTENCY_ID_LENGTH]);
832
833impl IdempotencyId {
834 // ========================================================================
835 // Functional Core (pure, testable)
836 // ========================================================================
837
838 /// Pure construction from random bytes.
839 ///
840 /// Restricted to `pub(crate)` to prevent misuse with weak random sources.
841 /// External callers should use `generate()` or `from_bytes()`.
842 pub(crate) fn from_random_bytes(bytes: [u8; IDEMPOTENCY_ID_LENGTH]) -> Self {
843 debug_assert!(
844 bytes.iter().any(|&b| b != 0),
845 "idempotency ID bytes are all zeros"
846 );
847 Self(bytes)
848 }
849
850 /// Restoration from stored bytes (pure).
851 ///
852 /// Use this when loading an `IdempotencyId` from storage or wire protocol.
853 pub fn from_bytes(bytes: [u8; IDEMPOTENCY_ID_LENGTH]) -> Self {
854 Self(bytes)
855 }
856
857 /// Returns the ID as a byte slice.
858 pub fn as_bytes(&self) -> &[u8; IDEMPOTENCY_ID_LENGTH] {
859 &self.0
860 }
861
862 // ========================================================================
863 // Imperative Shell (IO boundary)
864 // ========================================================================
865
866 /// Generates a new random idempotency ID using the OS CSPRNG.
867 ///
868 /// # Panics
869 ///
870 /// Panics if the OS CSPRNG fails, which indicates a catastrophic
871 /// system error (e.g., no entropy source available).
872 pub fn generate() -> Self {
873 let mut bytes = [0u8; IDEMPOTENCY_ID_LENGTH];
874 getrandom::fill(&mut bytes).expect("CSPRNG failure is catastrophic");
875 Self::from_random_bytes(bytes)
876 }
877}
878
879impl Debug for IdempotencyId {
880 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
881 // Show full hex for debugging (IDs are meant to be logged)
882 write!(f, "IdempotencyId(")?;
883 for byte in &self.0 {
884 write!(f, "{byte:02x}")?;
885 }
886 write!(f, ")")
887 }
888}
889
890impl Display for IdempotencyId {
891 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
892 // Full hex representation
893 for byte in &self.0 {
894 write!(f, "{byte:02x}")?;
895 }
896 Ok(())
897 }
898}
899
900impl From<[u8; IDEMPOTENCY_ID_LENGTH]> for IdempotencyId {
901 fn from(bytes: [u8; IDEMPOTENCY_ID_LENGTH]) -> Self {
902 Self::from_bytes(bytes)
903 }
904}
905
906impl From<IdempotencyId> for [u8; IDEMPOTENCY_ID_LENGTH] {
907 fn from(id: IdempotencyId) -> Self {
908 id.0
909 }
910}
911
912// ============================================================================
913// Recovery Tracking - Copy (generation-based recovery for compliance)
914// ============================================================================
915
916/// Monotonically increasing recovery generation.
917///
918/// Each recovery event creates a new generation. This provides natural
919/// audit checkpoints and explicit tracking of system recovery events.
920///
921/// Inspired by `FoundationDB`'s generation-based recovery tracking.
922#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
923pub struct Generation(u64);
924
925impl Generation {
926 /// The initial generation (before any recovery).
927 pub const INITIAL: Generation = Generation(0);
928
929 /// Creates a generation from a raw value.
930 pub fn new(value: u64) -> Self {
931 Self(value)
932 }
933
934 /// Returns the generation as a u64.
935 pub fn as_u64(&self) -> u64 {
936 self.0
937 }
938
939 /// Returns the next generation (incremented by 1).
940 pub fn next(&self) -> Self {
941 Generation(self.0.saturating_add(1))
942 }
943}
944
945impl Display for Generation {
946 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
947 write!(f, "gen:{}", self.0)
948 }
949}
950
951impl Default for Generation {
952 fn default() -> Self {
953 Self::INITIAL
954 }
955}
956
957impl From<u64> for Generation {
958 fn from(value: u64) -> Self {
959 Self(value)
960 }
961}
962
963impl From<Generation> for u64 {
964 fn from(generation: Generation) -> Self {
965 generation.0
966 }
967}
968
969/// Reason why a recovery was triggered.
970///
971/// This is recorded in the recovery log for compliance auditing.
972#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
973pub enum RecoveryReason {
974 /// Normal node restart (graceful or crash recovery).
975 NodeRestart,
976 /// Lost quorum and had to recover from remaining replicas.
977 QuorumLoss,
978 /// Detected data corruption requiring recovery.
979 CorruptionDetected,
980 /// Operator manually triggered recovery.
981 ManualIntervention,
982}
983
984impl Display for RecoveryReason {
985 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
986 match self {
987 RecoveryReason::NodeRestart => write!(f, "node_restart"),
988 RecoveryReason::QuorumLoss => write!(f, "quorum_loss"),
989 RecoveryReason::CorruptionDetected => write!(f, "corruption_detected"),
990 RecoveryReason::ManualIntervention => write!(f, "manual_intervention"),
991 }
992 }
993}
994
/// Records a recovery event with explicit tracking of any data loss.
///
/// Critical for compliance: auditors can see exactly what happened during
/// recovery, including any mutations that were discarded.
///
/// Inspired by `FoundationDB`'s 9-phase recovery with explicit data loss tracking.
///
/// All fields are public, so a record built with a struct literal bypasses
/// the (debug-only) precondition checks performed by `RecoveryRecord::new`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RecoveryRecord {
    /// New generation after recovery.
    pub generation: Generation,
    /// Previous generation before recovery.
    pub previous_generation: Generation,
    /// Last known committed offset before recovery.
    pub known_committed: Offset,
    /// Offset we recovered to.
    pub recovery_point: Offset,
    /// Range of discarded prepares (if any) - EXPLICIT LOSS TRACKING.
    ///
    /// If `Some`, this range of offsets contained prepared but uncommitted
    /// mutations that were discarded during recovery. This is the critical
    /// compliance field: it explicitly documents any data loss.
    ///
    /// The range is half-open (`start` inclusive, `end` exclusive), so it
    /// covers `end - start` offsets; an empty range covers none.
    pub discarded_range: Option<Range<Offset>>,
    /// When recovery occurred.
    pub timestamp: Timestamp,
    /// Why recovery was triggered.
    pub reason: RecoveryReason,
}
1022
1023impl RecoveryRecord {
1024 /// Creates a new recovery record.
1025 ///
1026 /// # Arguments
1027 ///
1028 /// * `generation` - The new generation after recovery
1029 /// * `previous_generation` - The generation before recovery
1030 /// * `known_committed` - Last known committed offset
1031 /// * `recovery_point` - The offset we recovered to
1032 /// * `discarded_range` - Range of discarded uncommitted prepares, if any
1033 /// * `timestamp` - When recovery occurred
1034 /// * `reason` - Why recovery was triggered
1035 ///
1036 /// # Preconditions
1037 ///
1038 /// - `generation` must be greater than `previous_generation`
1039 /// - `recovery_point` must be <= `known_committed`
1040 pub fn new(
1041 generation: Generation,
1042 previous_generation: Generation,
1043 known_committed: Offset,
1044 recovery_point: Offset,
1045 discarded_range: Option<Range<Offset>>,
1046 timestamp: Timestamp,
1047 reason: RecoveryReason,
1048 ) -> Self {
1049 debug_assert!(
1050 generation > previous_generation,
1051 "new generation must be greater than previous"
1052 );
1053 debug_assert!(
1054 recovery_point <= known_committed,
1055 "recovery point cannot exceed known committed"
1056 );
1057
1058 Self {
1059 generation,
1060 previous_generation,
1061 known_committed,
1062 recovery_point,
1063 discarded_range,
1064 timestamp,
1065 reason,
1066 }
1067 }
1068
1069 /// Returns true if any data was lost during this recovery.
1070 pub fn had_data_loss(&self) -> bool {
1071 self.discarded_range.is_some()
1072 }
1073
1074 /// Returns the number of discarded records, if any.
1075 pub fn discarded_count(&self) -> u64 {
1076 self.discarded_range
1077 .as_ref()
1078 .map_or(0, |r| r.end.as_u64().saturating_sub(r.start.as_u64()))
1079 }
1080}
1081
1082// ============================================================================
1083// Stream Name - Clone (contains String, but rarely cloned)
1084// ============================================================================
1085
/// Human-readable name for a stream.
///
/// A thin newtype over `String`; construction performs no validation.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct StreamName(String);
1089
1090impl StreamName {
1091 pub fn new(name: impl Into<String>) -> Self {
1092 Self(name.into())
1093 }
1094
1095 pub fn as_str(&self) -> &str {
1096 &self.0
1097 }
1098}
1099
1100impl Display for StreamName {
1101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1102 write!(f, "{}", self.0)
1103 }
1104}
1105
1106impl From<String> for StreamName {
1107 fn from(name: String) -> Self {
1108 Self(name)
1109 }
1110}
1111
1112impl From<&str> for StreamName {
1113 fn from(name: &str) -> Self {
1114 Self(name.to_string())
1115 }
1116}
1117
1118impl From<StreamName> for String {
1119 fn from(value: StreamName) -> Self {
1120 value.0
1121 }
1122}
1123
1124// ============================================================================
1125// Data Classification - Copy (simple enum, no heap data)
1126// ============================================================================
1127
/// Classification of data for compliance purposes.
///
/// Supports multi-framework compliance: HIPAA, GDPR, PCI DSS, SOX, ISO 27001, `FedRAMP`.
///
/// # Classification Levels (8 total)
///
/// **Healthcare (HIPAA):**
/// - `PHI`: Protected Health Information
/// - `Deidentified`: De-identified per HIPAA Safe Harbor
///
/// **Privacy (GDPR):**
/// - `PII`: Personally Identifiable Information (GDPR Article 4)
/// - `Sensitive`: Special category data (GDPR Article 9) - race, health, biometrics, etc.
///
/// **Financial (PCI DSS, SOX):**
/// - `PCI`: Payment Card Industry data (card numbers, CVV, etc.)
/// - `Financial`: Financial records subject to SOX regulations
///
/// **General:**
/// - `Confidential`: Internal business data, trade secrets
/// - `Public`: Publicly available data with no restrictions
///
/// # Framework Mappings
///
/// | Level | HIPAA | GDPR | PCI DSS | SOX | ISO 27001 | FedRAMP |
/// |-------|-------|------|---------|-----|-----------|---------|
/// | PHI | ✓ | ✓ (PII) | — | — | ✓ | ✓ |
/// | Deidentified | ✓ | — | — | — | — | — |
/// | PII | — | ✓ | — | — | ✓ | ✓ |
/// | Sensitive | — | ✓ (Art 9) | — | — | ✓ | ✓ |
/// | PCI | — | ✓ (PII) | ✓ | — | ✓ | ✓ |
/// | Financial | — | — | — | ✓ | ✓ | ✓ |
/// | Confidential | — | — | — | — | ✓ | ✓ |
/// | Public | — | — | — | — | — | — |
///
/// # Variant order
///
/// The derived `PartialOrd`/`Ord` compare variants by declaration order
/// (`PHI` < `Deidentified` < `PII` < … < `Public`). This is a declaration
/// order, not a strict sensitivity ranking: `Deidentified` sorts before
/// `PII`, for example. Derived serde encodings may also rely on variant
/// indices, so append new variants instead of reordering existing ones.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum DataClass {
    // ========================================================================
    // Healthcare (HIPAA)
    // ========================================================================
    /// Protected Health Information - subject to HIPAA restrictions.
    ///
    /// **Examples:** Medical records, diagnoses, lab results, prescriptions
    ///
    /// **Compliance:** HIPAA Privacy Rule, HIPAA Security Rule
    ///
    /// **Retention:** Minimum 6 years after last treatment (HIPAA § 164.530)
    ///
    /// NOTE(review): § 164.530(j) sets a 6-year retention period for HIPAA
    /// policies and documentation. Retention of the medical records
    /// themselves is largely governed by state law. Confirm the intended
    /// retention basis.
    PHI,

    /// Data that has been de-identified per HIPAA Safe Harbor.
    ///
    /// **Requirements:** All 18 HIPAA identifiers removed (§ 164.514(b)(2))
    ///
    /// **Examples:** Anonymized patient datasets, aggregate statistics
    ///
    /// **Compliance:** HIPAA Safe Harbor Method
    Deidentified,

    // ========================================================================
    // Privacy (GDPR)
    // ========================================================================
    /// Personally Identifiable Information (GDPR Article 4).
    ///
    /// **Examples:** Names, email addresses, IP addresses, location data
    ///
    /// **Compliance:** GDPR Articles 5-11 (lawfulness, consent, purpose limitation)
    ///
    /// **Rights:** Access, rectification, erasure, portability (GDPR Articles 15-20)
    PII,

    /// Special category data (GDPR Article 9).
    ///
    /// **Examples:** Racial/ethnic origin, political opinions, religious beliefs,
    /// trade union membership, genetic data, biometric data, health data, sex life
    ///
    /// **Compliance:** GDPR Article 9 (explicit consent required, stricter controls)
    ///
    /// **Restrictions:** Processing prohibited unless explicit exception applies
    Sensitive,

    // ========================================================================
    // Financial (PCI DSS, SOX)
    // ========================================================================
    /// Payment Card Industry data (PCI DSS).
    ///
    /// **Examples:** Credit card numbers, CVV codes, cardholder data
    ///
    /// **Compliance:** PCI DSS Requirements 1-12
    ///
    /// **Storage:** Never store CVV/CVV2/PIN after authorization
    PCI,

    /// Financial records subject to SOX regulations.
    ///
    /// **Examples:** General ledger, financial statements, audit trails
    ///
    /// **Compliance:** Sarbanes-Oxley Act § 302, § 404
    ///
    /// **Retention:** 7 years minimum (SOX § 802)
    Financial,

    // ========================================================================
    // General
    // ========================================================================
    /// Internal business data, trade secrets.
    ///
    /// **Examples:** Proprietary algorithms, business strategies, internal communications
    ///
    /// **Compliance:** ISO 27001 Annex A.8 (Asset Management)
    ///
    /// **Access:** Restricted to authorized personnel
    Confidential,

    /// Publicly available data with no restrictions.
    ///
    /// **Examples:** Public website content, press releases, published research
    ///
    /// **Compliance:** No special restrictions
    ///
    /// **Access:** Unrestricted
    Public,
}
1250
1251impl Default for DataClass {
1252 /// Healthcare-first default: any stream created without an explicit
1253 /// classification is treated as PHI. This is the opinionated stance
1254 /// of a healthcare database — if you don't know what's in a stream,
1255 /// assume the worst for compliance purposes (HIPAA-restricted, 6-year
1256 /// retention, audit-required, encryption-required).
1257 ///
1258 /// Callers that genuinely hold non-PHI data must declare it explicitly.
1259 fn default() -> Self {
1260 Self::PHI
1261 }
1262}
1263
1264// ============================================================================
1265// Placement - Clone (Region::Custom contains String)
1266// ============================================================================
1267
/// Placement policy for a stream.
///
/// Either pins the stream's data to a single [`Region`] or allows global
/// replication.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum Placement {
    /// Data must remain within the specified region.
    Region(Region),
    /// Data can be replicated globally across all regions.
    Global,
}
1276
/// Geographic region for data placement.
///
/// Built-in variants use AWS-style identifiers (see the `Display` impl).
/// `Custom` carries an arbitrary, unvalidated identifier string.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum Region {
    /// US East (N. Virginia) - us-east-1
    USEast1,
    /// Asia Pacific (Sydney) - ap-southeast-2
    APSoutheast2,
    /// Custom region identifier
    Custom(String),
}
1287
1288impl Region {
1289 pub fn custom(name: impl Into<String>) -> Self {
1290 Self::Custom(name.into())
1291 }
1292}
1293
1294impl Display for Region {
1295 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1296 match self {
1297 Region::USEast1 => write!(f, "us-east-1"),
1298 Region::APSoutheast2 => write!(f, "ap-southeast-2"),
1299 Region::Custom(custom) => write!(f, "{custom}"),
1300 }
1301 }
1302}
1303
1304// ============================================================================
1305// Stream Metadata - Clone (created once per stream, cloned rarely)
1306// ============================================================================
1307
/// Metadata describing a stream's configuration and current state.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct StreamMetadata {
    /// Identifier of the stream this metadata describes.
    pub stream_id: StreamId,
    /// Human-readable name of the stream.
    pub stream_name: StreamName,
    /// Compliance classification of the stream's data.
    pub data_class: DataClass,
    /// Where the stream's data may be stored or replicated.
    pub placement: Placement,
    /// Current stream offset; `StreamMetadata::new` starts it at `Offset::default()`.
    /// NOTE(review): presumably the next append position — confirm with writers.
    pub current_offset: Offset,
}
1317
1318impl StreamMetadata {
1319 /// Creates new stream metadata with offset initialized to 0.
1320 pub fn new(
1321 stream_id: StreamId,
1322 stream_name: StreamName,
1323 data_class: DataClass,
1324 placement: Placement,
1325 ) -> Self {
1326 Self {
1327 stream_id,
1328 stream_name,
1329 data_class,
1330 placement,
1331 current_offset: Offset::default(),
1332 }
1333 }
1334}
1335
1336// ============================================================================
// Batch Payload - NOT Clone (intentionally move-only; avoids duplicating the Vec<Bytes>)
1338// ============================================================================
1339
/// A batch of events to append to a stream.
///
/// `Clone` is intentionally not derived, so a batch is moved rather than
/// duplicated.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BatchPayload {
    /// The stream the events are appended to.
    pub stream_id: StreamId,
    /// The events to append (zero-copy Bytes).
    pub events: Vec<Bytes>,
    /// Expected current offset for optimistic concurrency.
    pub expected_offset: Offset,
}
1349
1350impl BatchPayload {
1351 pub fn new(stream_id: StreamId, events: Vec<Bytes>, expected_offset: Offset) -> Self {
1352 Self {
1353 stream_id,
1354 events,
1355 expected_offset,
1356 }
1357 }
1358}
1359
1360// ============================================================================
1361// Audit Actions - Clone (for flexibility in logging)
1362// ============================================================================
1363
/// Actions recorded in the audit log.
///
/// Variant and field names are part of the derived serde representation,
/// so renaming them affects how audit records are encoded.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AuditAction {
    /// A new stream was created.
    StreamCreated {
        stream_id: StreamId,
        stream_name: StreamName,
        data_class: DataClass,
        placement: Placement,
    },
    /// Events were appended to a stream.
    EventsAppended {
        stream_id: StreamId,
        count: u32,
        from_offset: Offset,
    },
    /// **AUDIT-2026-04 H-5** — a tenant was sealed for
    /// forensic / audit / legal-hold operations. No mutating
    /// commands (DDL, DML, AppendBatch, CreateStream) will be
    /// accepted against the tenant until an `Unseal` is applied.
    TenantSealed {
        tenant_id: TenantId,
        reason: SealReason,
    },
    /// **AUDIT-2026-04 H-5** — a previously-sealed tenant was
    /// released. Mutating commands are accepted again. Audit trail
    /// retains the seal/unseal pair as structured evidence.
    TenantUnsealed { tenant_id: TenantId },

    // ------------------------------------------------------------------------
    // v0.6.0 Tier 2 #7 — column-level masking policy lifecycle
    // ------------------------------------------------------------------------
    //
    // Every lifecycle transition on a masking policy (create, drop, attach,
    // detach) emits an audit record so HIPAA § 164.312(b) audit reviews can
    // reconstruct "who changed which column's mask when". Policy and column
    // names are carried as strings so auditors can read them directly from
    // the log. The table is identified only by its numeric `table_id`, so
    // resolving it to a table name still requires a catalog lookup.
    /// A new masking policy was declared via `CREATE MASKING POLICY`.
    MaskingPolicyCreated {
        tenant_id: TenantId,
        policy_name: String,
    },
    /// A masking policy was dropped via `DROP MASKING POLICY`.
    MaskingPolicyDropped {
        tenant_id: TenantId,
        policy_name: String,
    },
    /// A column was attached to a masking policy via
    /// `ALTER TABLE … SET MASKING POLICY`.
    MaskingPolicyAttached {
        tenant_id: TenantId,
        table_id: u64,
        column_name: String,
        policy_name: String,
    },
    /// A column's masking policy was detached via
    /// `ALTER TABLE … DROP MASKING POLICY`.
    MaskingPolicyDetached {
        tenant_id: TenantId,
        table_id: u64,
        column_name: String,
    },
    /// **v0.6.0 Tier 1 #3 — UPSERT** — an `INSERT ... ON CONFLICT`
    /// statement was applied atomically. The resolution discriminator
    /// lets compliance auditors distinguish insert-vs-update-vs-noop
    /// without re-reading the row payload.
    UpsertApplied {
        stream_id: StreamId,
        /// Which branch fired: new row inserted, existing row updated,
        /// or the conflict triggered `DO NOTHING`.
        resolution: UpsertResolution,
        /// Offset the single resulting event (if any) lives at. For
        /// `NoOp` the offset equals the pre-upsert head — no event is
        /// emitted, but we record the observed stream position so
        /// audit replays can reconstruct the exact moment the upsert
        /// was applied.
        from_offset: Offset,
    },
}
1444
/// **v0.6.0 Tier 1 #3** — the atomic resolution of a single UPSERT.
///
/// Every `UpsertApplied` event carries exactly one of these. The
/// discriminator is a structural part of the audit payload so
/// downstream readers never have to infer intent from a dual-write
/// sequence — a property the notebar helper's UPDATE+INSERT pair
/// explicitly lacked.
///
/// Because this is serialized into audit records, append new variants
/// rather than renaming or reordering existing ones.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum UpsertResolution {
    /// No prior row at the conflict key — the upsert appended a new row.
    Inserted,
    /// A prior row existed — `ON CONFLICT DO UPDATE SET ...` fired and
    /// the row was rewritten with the merged column set.
    Updated,
    /// A prior row existed and the clause was `DO NOTHING`. No storage
    /// mutation, no row-level audit beyond this `UpsertApplied` record.
    NoOp,
}
1463
/// **AUDIT-2026-04 H-5** — why a tenant was sealed.
///
/// This is an enum rather than a free-form string so downstream
/// compliance reports can aggregate on specific operational
/// categories — healthcare auditors expect to see discrete reasons
/// (e.g. `ForensicHold`) not human-written prose.
///
/// Carried inside `AuditAction::TenantSealed`, so it is part of the
/// serialized audit record; append new variants rather than reordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum SealReason {
    /// Forensic preservation during an incident investigation.
    ForensicHold,
    /// Audit is in progress; writes must freeze until audit completes.
    AuditInProgress,
    /// Breach investigation — preserve state for forensic analysis.
    BreachInvestigation,
    /// Legal hold (discovery / litigation-retention).
    LegalHold,
}
1481
1482// ============================================================================
1483// Event Persistence - Trait for durable event log writes
1484// ============================================================================
1485
/// Abstraction for persisting events to the durable event log.
///
/// This trait is the bridge between the projection layer and the
/// `Kimberlite` replication system. Implementations must block until
/// persistence is confirmed.
///
/// The `Send + Sync + Debug` supertraits let a persister be shared across
/// threads (for example behind an `Arc`) and printed in diagnostics.
///
/// # Healthcare Compliance
///
/// This is the critical path for HIPAA compliance. The implementation must:
/// - **Block until VSR consensus** completes (quorum durability)
/// - **Return `Err`** if consensus fails (triggers rollback)
/// - **Never return `Ok`** unless events are durably stored
///
/// # Implementation Notes
///
/// The implementor (typically `Runtime`) must handle the sync→async bridge:
///
/// ```ignore
/// impl EventPersister for RuntimeHandle {
///     fn persist_blocking(&self, stream_id: StreamId, events: Vec<Bytes>) -> Result<Offset, PersistError> {
///         // Bridge sync callback to async runtime
///         tokio::task::block_in_place(|| {
///             tokio::runtime::Handle::current().block_on(async {
///                 self.inner.append(stream_id, events).await
///             })
///         })
///         .map_err(|e| {
///             tracing::error!(error = %e, "VSR persistence failed");
///             PersistError::ConsensusFailed
///         })
///     }
/// }
/// ```
///
/// Note that `tokio::task::block_in_place` panics when called from a
/// `current_thread` runtime, so this particular bridge only works on
/// Tokio's multi-threaded runtime.
///
/// # Why `Vec<Bytes>` instead of typed events?
///
/// Events are serialized before reaching this trait. This keeps `kmb-types`
/// decoupled from domain-specific event schemas.
pub trait EventPersister: Send + Sync + Debug {
    /// Persist a batch of serialized events to the durable event log.
    ///
    /// This method **blocks** until VSR consensus confirms the events are
    /// durably stored on a quorum of nodes.
    ///
    /// # Arguments
    ///
    /// * `stream_id` - The stream to append events to
    /// * `events` - Serialized events
    ///
    /// # Returns
    ///
    /// * `Ok(offset)` - Events persisted, returns the new stream offset
    /// * `Err(PersistError)` - Persistence failed, caller should rollback
    ///
    /// # Errors
    ///
    /// * [`PersistError::ConsensusFailed`] - VSR quorum unavailable after retries
    /// * [`PersistError::StorageError`] - Disk I/O or serialization failure
    /// * [`PersistError::ShuttingDown`] - System is terminating
    fn persist_blocking(
        &self,
        stream_id: StreamId,
        events: Vec<Bytes>,
    ) -> Result<Offset, PersistError>;
}
1551
/// Failure modes reported when persisting events to the durable log.
///
/// Any variant tells the caller to roll back the in-flight transaction.
/// The concrete underlying cause is logged by the persister implementation
/// rather than carried in this type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PersistError {
    /// VSR consensus failed after retries (quorum unavailable).
    ConsensusFailed,
    /// Storage I/O error.
    StorageError,
    /// System is shutting down.
    ShuttingDown,
}

impl PersistError {
    /// Human-readable description used by the `Display` impl.
    fn message(&self) -> &'static str {
        match self {
            PersistError::ConsensusFailed => "consensus failed after retries",
            PersistError::StorageError => "storage I/O error",
            PersistError::ShuttingDown => "system is shutting down",
        }
    }
}

impl std::fmt::Display for PersistError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.message())
    }
}

impl std::error::Error for PersistError {}
1577
1578// ============================================================================
1579// Compression - Copy (simple enum for codec selection)
1580// ============================================================================
1581
/// Compression algorithm for record payloads.
///
/// Each record stores its compression kind so that records compressed with
/// different algorithms can coexist in the same segment. The `None` variant
/// means the payload is stored uncompressed.
///
/// The explicit discriminants are the single-byte tags produced by
/// `CompressionKind::as_byte` and accepted by `CompressionKind::from_byte`,
/// so existing values must never be renumbered.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
pub enum CompressionKind {
    /// No compression (default).
    #[default]
    None = 0,
    /// LZ4 compression (fast, moderate ratio).
    Lz4 = 1,
    /// Zstandard compression (slower, better ratio).
    Zstd = 2,
}
1597
1598impl CompressionKind {
1599 /// Returns the single-byte discriminant for serialization.
1600 pub fn as_byte(self) -> u8 {
1601 self as u8
1602 }
1603
1604 /// Creates a `CompressionKind` from its byte discriminant.
1605 pub fn from_byte(byte: u8) -> Option<Self> {
1606 match byte {
1607 0 => Some(Self::None),
1608 1 => Some(Self::Lz4),
1609 2 => Some(Self::Zstd),
1610 _ => None,
1611 }
1612 }
1613}
1614
1615impl Display for CompressionKind {
1616 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1617 match self {
1618 Self::None => write!(f, "none"),
1619 Self::Lz4 => write!(f, "lz4"),
1620 Self::Zstd => write!(f, "zstd"),
1621 }
1622 }
1623}
1624
/// Flux refinement type annotations (experimental).
///
/// These annotations document properties intended for compile-time
/// verification when the Flux compiler is enabled. Because Flux is still
/// experimental, the annotations are currently commented out and serve as
/// documentation of the intended properties only.
pub mod flux_annotations;
1630
/// Typed-domain primitives for making illegal states unrepresentable.
///
/// Introduced by the fuzz-to-types hardening effort (see
/// `docs-internal/contributing/constructor-audit-2026-04.md`). Provides types
/// such as [`NonEmptyVec`](domain::NonEmptyVec), [`SqlIdentifier`](domain::SqlIdentifier),
/// [`BoundedSize`](domain::BoundedSize), and [`ClearanceLevel`](domain::ClearanceLevel).
/// Those items, and the others named in the crate-root `pub use domain::{…}`,
/// are re-exported at the crate root.
pub mod domain;
1638
1639pub use domain::{
1640 AGGREGATE_BUDGET_DEFAULT_BYTES, AGGREGATE_BUDGET_MIN_BYTES, AggregateMemoryBudget,
1641 AggregateMemoryBudgetError, BoundedSize, BoundedSizeError, ClearanceLevel, ClearanceLevelError,
1642 DateField, DateFieldParseError, EmptyVecError, Interval, IntervalConstructionError,
1643 NANOS_PER_DAY, NegativeSubstringLength, NonEmptyVec, SqlIdentifier, SqlIdentifierError,
1644 SubstringRange,
1645};
1646
1647#[cfg(test)]
1648mod tests;