kimberlite_types/lib.rs
1//! # kmb-types: Core types for `Kimberlite`
2//!
3//! This crate contains shared types used across the `Kimberlite` system:
4//! - Entity IDs ([`TenantId`], [`StreamId`], [`Offset`], [`GroupId`])
5//! - Cryptographic types ([`struct@Hash`])
6//! - Temporal types ([`Timestamp`])
7//! - Log record types ([`RecordKind`], [`RecordHeader`], [`Checkpoint`], [`CheckpointPolicy`])
8//! - Projection tracking ([`AppliedIndex`])
9//! - Idempotency ([`IdempotencyId`])
10//! - Recovery tracking ([`Generation`], [`RecoveryRecord`], [`RecoveryReason`])
11//! - Data classification ([`DataClass`])
12//! - Placement rules ([`Placement`], [`Region`])
13//! - Stream metadata ([`StreamMetadata`])
14//! - Audit actions ([`AuditAction`])
15//! - Event persistence ([`EventPersister`], [`PersistError`])
16//!
17//! This crate opts in to strict PRESSURECRAFT clippy lints. Test-only
18//! `unwrap()` / `panic!` are allowed via `cfg_attr(test, ...)`.
19
20#![warn(
21 clippy::unwrap_used,
22 clippy::panic,
23 clippy::todo,
24 clippy::unimplemented,
25 clippy::too_many_lines
26)]
27#![cfg_attr(
28 test,
29 allow(
30 clippy::unwrap_used,
31 clippy::panic,
32 clippy::todo,
33 clippy::unimplemented,
34 clippy::too_many_lines
35 )
36)]
37
38use std::{
39 fmt::{Debug, Display},
40 ops::{Add, AddAssign, Range, Sub},
41 time::{SystemTime, UNIX_EPOCH},
42};
43
44use bytes::Bytes;
45use serde::{Deserialize, Serialize};
46
47// ============================================================================
48// Entity IDs - All Copy (cheap 8-byte values)
49// ============================================================================
50
/// Unique identifier for a tenant (organization/customer).
///
/// A newtype over a raw `u64`. Note that when a tenant is packed into a
/// [`StreamId`] via [`StreamId::from_tenant_and_local`], the raw value is
/// shifted left by 32 bits inside a `u64`, so only its low 32 bits survive.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct TenantId(u64);
54
55impl TenantId {
56 pub fn new(id: u64) -> Self {
57 Self(id)
58 }
59
60 /// Extracts tenant ID from stream ID (upper 32 bits).
61 ///
62 /// **Bit Layout**:
63 /// - Upper 32 bits: `tenant_id` (supports 4.3B tenants)
64 /// - Lower 32 bits: `local_stream_id` (4.3B streams per tenant)
65 ///
66 /// # Examples
67 ///
68 /// ```
69 /// # use kimberlite_types::{TenantId, StreamId};
70 /// let stream_id = StreamId::from_tenant_and_local(TenantId::from(5), 1);
71 /// assert_eq!(TenantId::from_stream_id(stream_id), TenantId::from(5));
72 /// ```
73 pub fn from_stream_id(stream_id: StreamId) -> Self {
74 TenantId::from(u64::from(stream_id) >> 32)
75 }
76}
77
78impl Display for TenantId {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 write!(f, "{}", self.0)
81 }
82}
83
84impl From<u64> for TenantId {
85 fn from(value: u64) -> Self {
86 Self(value)
87 }
88}
89
90impl From<TenantId> for u64 {
91 fn from(id: TenantId) -> Self {
92 id.0
93 }
94}
95
/// Unique identifier for a stream within the system.
///
/// A newtype over a raw `u64`. IDs built with
/// [`StreamId::from_tenant_and_local`] carry the tenant in the upper 32 bits
/// and a tenant-local stream number in the lower 32 bits. The derived
/// `Default` is the raw value `0`.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Default,
)]
pub struct StreamId(u64);
101
102impl StreamId {
103 pub fn new(id: u64) -> Self {
104 Self(id)
105 }
106
107 /// Creates stream ID from tenant ID and local stream number.
108 ///
109 /// **Bit Layout**:
110 /// - Upper 32 bits: `tenant_id` (supports 4.3B tenants)
111 /// - Lower 32 bits: `local_stream_id` (4.3B streams per tenant)
112 ///
113 /// # Examples
114 ///
115 /// ```
116 /// # use kimberlite_types::{TenantId, StreamId};
117 /// let stream_id = StreamId::from_tenant_and_local(TenantId::from(5), 1);
118 /// assert_eq!(u64::from(stream_id), 21474836481); // (5 << 32) | 1
119 /// assert_eq!(TenantId::from_stream_id(stream_id), TenantId::from(5));
120 /// ```
121 pub fn from_tenant_and_local(tenant_id: TenantId, local_id: u32) -> Self {
122 let tenant_bits = u64::from(tenant_id) << 32;
123 let local_bits = u64::from(local_id);
124 StreamId::from(tenant_bits | local_bits)
125 }
126
127 /// Extracts local stream ID (lower 32 bits).
128 ///
129 /// # Examples
130 ///
131 /// ```
132 /// # use kimberlite_types::{TenantId, StreamId};
133 /// let stream_id = StreamId::from_tenant_and_local(TenantId::from(5), 42);
134 /// assert_eq!(stream_id.local_id(), 42);
135 /// ```
136 pub fn local_id(self) -> u32 {
137 (u64::from(self) & 0xFFFF_FFFF) as u32
138 }
139
140 /// Extracts the tenant id this stream belongs to (upper 32 bits).
141 ///
142 /// Convenience over `TenantId::from_stream_id(stream_id)` for the
143 /// common `id.tenant_id() == other` call sites.
144 ///
145 /// # Examples
146 ///
147 /// ```
148 /// # use kimberlite_types::{TenantId, StreamId};
149 /// let stream_id = StreamId::from_tenant_and_local(TenantId::from(5), 1);
150 /// assert_eq!(stream_id.tenant_id(), TenantId::from(5));
151 /// ```
152 pub fn tenant_id(self) -> TenantId {
153 TenantId::from_stream_id(self)
154 }
155}
156
157impl Add for StreamId {
158 type Output = StreamId;
159
160 /// Saturating add. Fuzzers can drive kernel state to `u64::MAX`; the
161 /// old unchecked `+` panicked on overflow under debug assertions.
162 /// Saturation keeps the type total without producing a smaller id.
163 fn add(self, rhs: Self) -> Self::Output {
164 StreamId::new(self.0.saturating_add(rhs.0))
165 }
166}
167
168impl Display for StreamId {
169 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170 write!(f, "{}", self.0)
171 }
172}
173
174impl From<u64> for StreamId {
175 fn from(value: u64) -> Self {
176 Self(value)
177 }
178}
179
180impl From<StreamId> for u64 {
181 fn from(id: StreamId) -> Self {
182 id.0
183 }
184}
185
/// Position of an event within a stream.
///
/// Offsets are zero-indexed and sequential. The first event in a stream
/// has offset 0, the second has offset 1, and so on.
///
/// Uses `u64` internally — offsets are never negative by definition.
/// The derived `Default` is offset 0, the same value as [`Offset::ZERO`].
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Default,
)]
pub struct Offset(u64);
196
197impl Offset {
198 pub const ZERO: Offset = Offset(0);
199
200 pub fn new(offset: u64) -> Self {
201 Self(offset)
202 }
203
204 /// Returns the offset as a `u64`.
205 pub fn as_u64(&self) -> u64 {
206 self.0
207 }
208
209 /// Returns the offset as a `usize` for indexing.
210 ///
211 /// # Panics
212 ///
213 /// Panics on 32-bit platforms if the offset exceeds `usize::MAX`.
214 pub fn as_usize(&self) -> usize {
215 self.0 as usize
216 }
217}
218
219impl Display for Offset {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 write!(f, "{}", self.0)
222 }
223}
224
225impl Add for Offset {
226 type Output = Self;
227 fn add(self, rhs: Self) -> Self::Output {
228 Self(self.0 + rhs.0)
229 }
230}
231
232impl AddAssign for Offset {
233 fn add_assign(&mut self, rhs: Self) {
234 self.0 += rhs.0;
235 }
236}
237
238impl Sub for Offset {
239 type Output = Self;
240 fn sub(self, rhs: Self) -> Self::Output {
241 Self(self.0 - rhs.0)
242 }
243}
244
245impl From<u64> for Offset {
246 fn from(value: u64) -> Self {
247 Self(value)
248 }
249}
250
251impl From<Offset> for u64 {
252 fn from(offset: Offset) -> Self {
253 offset.0
254 }
255}
256
/// Unique identifier for a replication group.
///
/// A newtype over a raw `u64`; the type itself attaches no structure to the
/// value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct GroupId(u64);
260
261impl Display for GroupId {
262 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263 write!(f, "{}", self.0)
264 }
265}
266
267impl GroupId {
268 pub fn new(id: u64) -> Self {
269 Self(id)
270 }
271}
272
273impl From<u64> for GroupId {
274 fn from(value: u64) -> Self {
275 Self(value)
276 }
277}
278
279impl From<GroupId> for u64 {
280 fn from(id: GroupId) -> Self {
281 id.0
282 }
283}
284
285// ============================================================================
286// Cryptographic Hash - Copy (fixed 32-byte value)
287// ============================================================================
288
/// Length of cryptographic hashes in bytes (SHA-256 / BLAKE3).
///
/// This is the size of the byte array wrapped by [`struct@Hash`].
pub const HASH_LENGTH: usize = 32;
291
/// A 32-byte cryptographic hash.
///
/// This is a foundation type used across `Kimberlite` for:
/// - Hash chain links (`prev_hash` in records)
/// - Verification anchors (in checkpoints and projections)
/// - Content addressing
///
/// The specific algorithm (SHA-256 for compliance, BLAKE3 for internal)
/// is determined by the context where the hash is computed. This type
/// only stores the resulting 32-byte digest.
///
/// `Debug` is implemented by hand rather than derived, so that debug
/// output shows only a short prefix of the digest.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Hash([u8; HASH_LENGTH]);
304
305impl Hash {
306 /// The genesis hash (all zeros) used as the `prev_hash` for the first record.
307 pub const GENESIS: Hash = Hash([0u8; HASH_LENGTH]);
308
309 /// Creates a hash from raw bytes.
310 pub fn from_bytes(bytes: [u8; HASH_LENGTH]) -> Self {
311 Self(bytes)
312 }
313
314 /// Returns the hash as a byte slice.
315 pub fn as_bytes(&self) -> &[u8; HASH_LENGTH] {
316 &self.0
317 }
318
319 /// Returns true if this is the genesis hash (all zeros).
320 pub fn is_genesis(&self) -> bool {
321 self.0 == [0u8; HASH_LENGTH]
322 }
323}
324
325impl Debug for Hash {
326 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
327 // Show first 8 bytes in hex for debugging without exposing full hash
328 write!(
329 f,
330 "Hash({:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}...)",
331 self.0[0], self.0[1], self.0[2], self.0[3], self.0[4], self.0[5], self.0[6], self.0[7]
332 )
333 }
334}
335
336impl Display for Hash {
337 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
338 // Full hex representation for display
339 for byte in &self.0 {
340 write!(f, "{byte:02x}")?;
341 }
342 Ok(())
343 }
344}
345
346impl Default for Hash {
347 fn default() -> Self {
348 Self::GENESIS
349 }
350}
351
352impl From<[u8; HASH_LENGTH]> for Hash {
353 fn from(bytes: [u8; HASH_LENGTH]) -> Self {
354 Self(bytes)
355 }
356}
357
358impl From<Hash> for [u8; HASH_LENGTH] {
359 fn from(hash: Hash) -> Self {
360 hash.0
361 }
362}
363
364impl AsRef<[u8]> for Hash {
365 fn as_ref(&self) -> &[u8] {
366 &self.0
367 }
368}
369
370// ============================================================================
371// Timestamp - Copy (8-byte value with monotonic guarantee)
372// ============================================================================
373
/// Wall-clock timestamp with monotonic guarantee within the system.
///
/// Compliance requires real-world time for audit trails; monotonicity
/// prevents ordering issues when system clocks are adjusted.
///
/// Stored as nanoseconds since Unix epoch (1970-01-01 00:00:00 UTC).
/// This gives us ~584 years of range, well beyond any practical use.
///
/// The type itself is a plain `u64` wrapper and does not enforce ordering;
/// monotonicity comes from creating values with [`Timestamp::now_monotonic`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Timestamp(u64);
383
384impl Timestamp {
385 /// The Unix epoch (1970-01-01 00:00:00 UTC).
386 pub const EPOCH: Timestamp = Timestamp(0);
387
388 /// Creates a timestamp from nanoseconds since Unix epoch.
389 pub fn from_nanos(nanos: u64) -> Self {
390 Self(nanos)
391 }
392
393 /// Returns the timestamp as nanoseconds since Unix epoch.
394 pub fn as_nanos(&self) -> u64 {
395 self.0
396 }
397
398 /// Returns the timestamp as seconds since Unix epoch (truncates nanoseconds).
399 pub fn as_secs(&self) -> u64 {
400 self.0 / 1_000_000_000
401 }
402
403 /// Creates a timestamp for the current time.
404 ///
405 /// # Panics
406 ///
407 /// Panics if the system clock is before Unix epoch (should never happen).
408 pub fn now() -> Self {
409 let duration = SystemTime::now()
410 .duration_since(UNIX_EPOCH)
411 .expect("system clock is before Unix epoch");
412 Self(duration.as_nanos() as u64)
413 }
414
415 /// Creates a timestamp ensuring monotonicity: `max(now, last + 1ns)`.
416 ///
417 /// This guarantees that each timestamp is strictly greater than the previous,
418 /// even if the system clock moves backwards or two events occur in the same
419 /// nanosecond.
420 ///
421 /// # Arguments
422 ///
423 /// * `last` - The previous timestamp, if any. Pass `None` for the first timestamp.
424 pub fn now_monotonic(last: Option<Timestamp>) -> Self {
425 let now = Self::now();
426 match last {
427 Some(prev) => {
428 // Ensure strictly increasing: at least prev + 1 nanosecond
429 if now.0 <= prev.0 {
430 Timestamp(prev.0.saturating_add(1))
431 } else {
432 now
433 }
434 }
435 None => now,
436 }
437 }
438}
439
440impl Display for Timestamp {
441 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
442 // Display as seconds.nanoseconds for readability
443 let secs = self.0 / 1_000_000_000;
444 let nanos = self.0 % 1_000_000_000;
445 write!(f, "{secs}.{nanos:09}")
446 }
447}
448
449impl Default for Timestamp {
450 fn default() -> Self {
451 Self::EPOCH
452 }
453}
454
455impl From<u64> for Timestamp {
456 fn from(nanos: u64) -> Self {
457 Self(nanos)
458 }
459}
460
461impl From<Timestamp> for u64 {
462 fn from(ts: Timestamp) -> Self {
463 ts.0
464 }
465}
466
467// ============================================================================
468// Record Types - Copy (small enum and struct)
469// ============================================================================
470
/// The kind of record stored in the log.
///
/// This enum distinguishes between different record types to enable
/// efficient processing and verification.
///
/// The single-byte encoding of each variant is defined by
/// [`RecordKind::as_byte`] / [`RecordKind::from_byte`]. `Data` is the
/// `Default` variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
pub enum RecordKind {
    /// Normal application data record.
    #[default]
    Data,
    /// Periodic verification checkpoint (contains cumulative hash).
    Checkpoint,
    /// Logical deletion marker (data is not physically deleted).
    Tombstone,
}
485
486impl RecordKind {
487 /// Returns the single-byte discriminant for serialization.
488 pub fn as_byte(&self) -> u8 {
489 match self {
490 RecordKind::Data => 0,
491 RecordKind::Checkpoint => 1,
492 RecordKind::Tombstone => 2,
493 }
494 }
495
496 /// Creates a `RecordKind` from its byte discriminant.
497 ///
498 /// # Errors
499 ///
500 /// Returns `None` if the byte is not a valid discriminant.
501 pub fn from_byte(byte: u8) -> Option<Self> {
502 match byte {
503 0 => Some(RecordKind::Data),
504 1 => Some(RecordKind::Checkpoint),
505 2 => Some(RecordKind::Tombstone),
506 _ => None,
507 }
508 }
509}
510
/// Metadata header for every log record.
///
/// This structure contains all metadata needed to verify and process
/// a log record without reading its payload.
///
/// All fields are public and `Copy`; [`RecordHeader::new`] performs no
/// validation of their values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct RecordHeader {
    /// Position in the log (0-indexed).
    pub offset: Offset,
    /// SHA-256 link to the previous record's hash (genesis for first record).
    pub prev_hash: Hash,
    /// When the record was committed (monotonic wall-clock).
    pub timestamp: Timestamp,
    /// Size of the payload in bytes.
    pub payload_len: u32,
    /// Type of record (Data, Checkpoint, Tombstone).
    pub record_kind: RecordKind,
}
528
529impl RecordHeader {
530 /// Creates a new record header.
531 ///
532 /// # Arguments
533 ///
534 /// * `offset` - Position in the log
535 /// * `prev_hash` - Hash of the previous record (or GENESIS for first)
536 /// * `timestamp` - When this record was committed
537 /// * `payload_len` - Size of the payload in bytes
538 /// * `record_kind` - Type of record
539 pub fn new(
540 offset: Offset,
541 prev_hash: Hash,
542 timestamp: Timestamp,
543 payload_len: u32,
544 record_kind: RecordKind,
545 ) -> Self {
546 Self {
547 offset,
548 prev_hash,
549 timestamp,
550 payload_len,
551 record_kind,
552 }
553 }
554
555 /// Returns true if this is the first record in the log.
556 pub fn is_genesis(&self) -> bool {
557 self.offset == Offset::ZERO && self.prev_hash.is_genesis()
558 }
559}
560
561// ============================================================================
562// Projection Tracking - Copy (small struct for projections)
563// ============================================================================
564
/// Tracks which log entry a projection row was derived from.
///
/// Projections embed this in each row to enable:
/// - Point-in-time queries (find rows at a specific offset)
/// - Verification without walking the hash chain (hash provides direct check)
/// - Audit trails (know exactly which event created/updated a row)
///
/// The `Default` value is [`AppliedIndex::genesis`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct AppliedIndex {
    /// The log offset this row was derived from.
    pub offset: Offset,
    /// The hash at this offset for direct verification.
    pub hash: Hash,
}
578
579impl AppliedIndex {
580 /// Creates a new applied index.
581 pub fn new(offset: Offset, hash: Hash) -> Self {
582 Self { offset, hash }
583 }
584
585 /// Creates the initial applied index (before any records).
586 pub fn genesis() -> Self {
587 Self {
588 offset: Offset::ZERO,
589 hash: Hash::GENESIS,
590 }
591 }
592}
593
594impl Default for AppliedIndex {
595 fn default() -> Self {
596 Self::genesis()
597 }
598}
599
600// ============================================================================
601// Checkpoints - Copy (verification anchors in the log)
602// ============================================================================
603
/// A periodic verification checkpoint stored in the log.
///
/// Checkpoints are records IN the log (not separate files), which means:
/// - They are part of the hash chain (tamper-evident)
/// - Checkpoint history is immutable
/// - Single source of truth
///
/// Checkpoints enable efficient verified reads by providing trusted
/// anchor points, reducing verification from O(n) to O(k) where k is
/// the distance to the nearest checkpoint.
///
/// [`Checkpoint::new`] checks, in debug builds only, that `record_count`
/// equals `offset + 1`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Checkpoint {
    /// Log position of this checkpoint record.
    pub offset: Offset,
    /// Cumulative hash of the chain at this point.
    pub chain_hash: Hash,
    /// Total number of records from genesis to this checkpoint.
    pub record_count: u64,
    /// When this checkpoint was created.
    pub created_at: Timestamp,
}
625
626impl Checkpoint {
627 /// Creates a new checkpoint.
628 ///
629 /// # Preconditions
630 ///
631 /// - `record_count` should equal `offset.as_u64() + 1` (0-indexed offset)
632 pub fn new(offset: Offset, chain_hash: Hash, record_count: u64, created_at: Timestamp) -> Self {
633 debug_assert_eq!(
634 record_count,
635 offset.as_u64() + 1,
636 "record_count should equal offset + 1"
637 );
638 Self {
639 offset,
640 chain_hash,
641 record_count,
642 created_at,
643 }
644 }
645}
646
/// Policy for when to create checkpoints.
///
/// Checkpoints bound the worst-case verification cost. The default policy
/// creates a checkpoint every 1000 records and on graceful shutdown.
///
/// Only `every_n_records` and `explicit_only` are read by
/// [`CheckpointPolicy::should_checkpoint`]; `on_shutdown` is presumably
/// consumed by the shutdown path elsewhere — confirm against the caller.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct CheckpointPolicy {
    /// Create a checkpoint every N records. Set to 0 to disable.
    pub every_n_records: u64,
    /// Create a checkpoint on graceful shutdown.
    pub on_shutdown: bool,
    /// If true, disable automatic checkpoints (only explicit calls).
    pub explicit_only: bool,
}
660
661impl CheckpointPolicy {
662 /// Creates a policy that checkpoints every N records.
663 pub fn every(n: u64) -> Self {
664 Self {
665 every_n_records: n,
666 on_shutdown: true,
667 explicit_only: false,
668 }
669 }
670
671 /// Creates a policy that only creates explicit checkpoints.
672 pub fn explicit_only() -> Self {
673 Self {
674 every_n_records: 0,
675 on_shutdown: false,
676 explicit_only: true,
677 }
678 }
679
680 /// Returns true if a checkpoint should be created at this offset.
681 pub fn should_checkpoint(&self, offset: Offset) -> bool {
682 if self.explicit_only {
683 return false;
684 }
685 if self.every_n_records == 0 {
686 return false;
687 }
688 // Checkpoint at offsets that are multiples of every_n_records
689 // (offset 999 for every_n_records=1000, etc.)
690 (offset.as_u64() + 1) % self.every_n_records == 0
691 }
692}
693
694impl Default for CheckpointPolicy {
695 /// Default policy: checkpoint every 1000 records, on shutdown.
696 fn default() -> Self {
697 Self {
698 every_n_records: 1000,
699 on_shutdown: true,
700 explicit_only: false,
701 }
702 }
703}
704
705// ============================================================================
706// Idempotency - Copy (16-byte identifier for duplicate prevention)
707// ============================================================================
708
/// Length of idempotency IDs in bytes (16 bytes = 128 bits).
///
/// This is the size of the byte array wrapped by [`IdempotencyId`].
pub const IDEMPOTENCY_ID_LENGTH: usize = 16;
711
/// Unique identifier for duplicate transaction prevention.
///
/// Clients generate an `IdempotencyId` before their first attempt at a
/// transaction. If the transaction needs to be retried (e.g., network
/// timeout), the client reuses the same ID. The server tracks committed
/// IDs to return the same result for duplicate requests.
///
/// Inspired by `FoundationDB`'s idempotency key design.
///
/// # FCIS Pattern
///
/// This type follows the Functional Core / Imperative Shell pattern:
/// - `from_bytes()`: Pure restoration from storage
/// - `from_random_bytes()`: Pure construction from bytes (`pub(crate)`)
/// - `generate()`: Impure shell that invokes CSPRNG
///
/// `Debug` is implemented by hand rather than derived and prints the full
/// ID in hex.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct IdempotencyId([u8; IDEMPOTENCY_ID_LENGTH]);
729
730impl IdempotencyId {
731 // ========================================================================
732 // Functional Core (pure, testable)
733 // ========================================================================
734
735 /// Pure construction from random bytes.
736 ///
737 /// Restricted to `pub(crate)` to prevent misuse with weak random sources.
738 /// External callers should use `generate()` or `from_bytes()`.
739 pub(crate) fn from_random_bytes(bytes: [u8; IDEMPOTENCY_ID_LENGTH]) -> Self {
740 debug_assert!(
741 bytes.iter().any(|&b| b != 0),
742 "idempotency ID bytes are all zeros"
743 );
744 Self(bytes)
745 }
746
747 /// Restoration from stored bytes (pure).
748 ///
749 /// Use this when loading an `IdempotencyId` from storage or wire protocol.
750 pub fn from_bytes(bytes: [u8; IDEMPOTENCY_ID_LENGTH]) -> Self {
751 Self(bytes)
752 }
753
754 /// Returns the ID as a byte slice.
755 pub fn as_bytes(&self) -> &[u8; IDEMPOTENCY_ID_LENGTH] {
756 &self.0
757 }
758
759 // ========================================================================
760 // Imperative Shell (IO boundary)
761 // ========================================================================
762
763 /// Generates a new random idempotency ID using the OS CSPRNG.
764 ///
765 /// # Panics
766 ///
767 /// Panics if the OS CSPRNG fails, which indicates a catastrophic
768 /// system error (e.g., no entropy source available).
769 pub fn generate() -> Self {
770 let mut bytes = [0u8; IDEMPOTENCY_ID_LENGTH];
771 getrandom::fill(&mut bytes).expect("CSPRNG failure is catastrophic");
772 Self::from_random_bytes(bytes)
773 }
774}
775
776impl Debug for IdempotencyId {
777 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
778 // Show full hex for debugging (IDs are meant to be logged)
779 write!(f, "IdempotencyId(")?;
780 for byte in &self.0 {
781 write!(f, "{byte:02x}")?;
782 }
783 write!(f, ")")
784 }
785}
786
787impl Display for IdempotencyId {
788 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
789 // Full hex representation
790 for byte in &self.0 {
791 write!(f, "{byte:02x}")?;
792 }
793 Ok(())
794 }
795}
796
797impl From<[u8; IDEMPOTENCY_ID_LENGTH]> for IdempotencyId {
798 fn from(bytes: [u8; IDEMPOTENCY_ID_LENGTH]) -> Self {
799 Self::from_bytes(bytes)
800 }
801}
802
803impl From<IdempotencyId> for [u8; IDEMPOTENCY_ID_LENGTH] {
804 fn from(id: IdempotencyId) -> Self {
805 id.0
806 }
807}
808
809// ============================================================================
810// Recovery Tracking - Copy (generation-based recovery for compliance)
811// ============================================================================
812
/// Monotonically increasing recovery generation.
///
/// Each recovery event creates a new generation. This provides natural
/// audit checkpoints and explicit tracking of system recovery events.
///
/// Inspired by `FoundationDB`'s generation-based recovery tracking.
///
/// The type is a plain `u64` wrapper; [`Generation::next`] is the
/// increment step and saturates at `u64::MAX`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Generation(u64);
821
822impl Generation {
823 /// The initial generation (before any recovery).
824 pub const INITIAL: Generation = Generation(0);
825
826 /// Creates a generation from a raw value.
827 pub fn new(value: u64) -> Self {
828 Self(value)
829 }
830
831 /// Returns the generation as a u64.
832 pub fn as_u64(&self) -> u64 {
833 self.0
834 }
835
836 /// Returns the next generation (incremented by 1).
837 pub fn next(&self) -> Self {
838 Generation(self.0.saturating_add(1))
839 }
840}
841
842impl Display for Generation {
843 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
844 write!(f, "gen:{}", self.0)
845 }
846}
847
848impl Default for Generation {
849 fn default() -> Self {
850 Self::INITIAL
851 }
852}
853
854impl From<u64> for Generation {
855 fn from(value: u64) -> Self {
856 Self(value)
857 }
858}
859
860impl From<Generation> for u64 {
861 fn from(generation: Generation) -> Self {
862 generation.0
863 }
864}
865
/// Reason why a recovery was triggered.
///
/// This is recorded in the recovery log for compliance auditing.
/// The `Display` form of each variant is its `snake_case` name
/// (e.g. `node_restart`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum RecoveryReason {
    /// Normal node restart (graceful or crash recovery).
    NodeRestart,
    /// Lost quorum and had to recover from remaining replicas.
    QuorumLoss,
    /// Detected data corruption requiring recovery.
    CorruptionDetected,
    /// Operator manually triggered recovery.
    ManualIntervention,
}
880
881impl Display for RecoveryReason {
882 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
883 match self {
884 RecoveryReason::NodeRestart => write!(f, "node_restart"),
885 RecoveryReason::QuorumLoss => write!(f, "quorum_loss"),
886 RecoveryReason::CorruptionDetected => write!(f, "corruption_detected"),
887 RecoveryReason::ManualIntervention => write!(f, "manual_intervention"),
888 }
889 }
890}
891
/// Records a recovery event with explicit tracking of any data loss.
///
/// Critical for compliance: auditors can see exactly what happened during
/// recovery, including any mutations that were discarded.
///
/// Inspired by `FoundationDB`'s 9-phase recovery with explicit data loss tracking.
///
/// Unlike most types in this crate, this struct is `Clone` but not `Copy`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RecoveryRecord {
    /// New generation after recovery.
    pub generation: Generation,
    /// Previous generation before recovery.
    pub previous_generation: Generation,
    /// Last known committed offset before recovery.
    pub known_committed: Offset,
    /// Offset we recovered to.
    pub recovery_point: Offset,
    /// Range of discarded prepares (if any) - EXPLICIT LOSS TRACKING.
    ///
    /// If `Some`, this range of offsets contained prepared but uncommitted
    /// mutations that were discarded during recovery. This is the critical
    /// compliance field: it explicitly documents any data loss.
    ///
    /// As a `Range`, `start` is inclusive and `end` is exclusive.
    pub discarded_range: Option<Range<Offset>>,
    /// When recovery occurred.
    pub timestamp: Timestamp,
    /// Why recovery was triggered.
    pub reason: RecoveryReason,
}
919
920impl RecoveryRecord {
921 /// Creates a new recovery record.
922 ///
923 /// # Arguments
924 ///
925 /// * `generation` - The new generation after recovery
926 /// * `previous_generation` - The generation before recovery
927 /// * `known_committed` - Last known committed offset
928 /// * `recovery_point` - The offset we recovered to
929 /// * `discarded_range` - Range of discarded uncommitted prepares, if any
930 /// * `timestamp` - When recovery occurred
931 /// * `reason` - Why recovery was triggered
932 ///
933 /// # Preconditions
934 ///
935 /// - `generation` must be greater than `previous_generation`
936 /// - `recovery_point` must be <= `known_committed`
937 pub fn new(
938 generation: Generation,
939 previous_generation: Generation,
940 known_committed: Offset,
941 recovery_point: Offset,
942 discarded_range: Option<Range<Offset>>,
943 timestamp: Timestamp,
944 reason: RecoveryReason,
945 ) -> Self {
946 debug_assert!(
947 generation > previous_generation,
948 "new generation must be greater than previous"
949 );
950 debug_assert!(
951 recovery_point <= known_committed,
952 "recovery point cannot exceed known committed"
953 );
954
955 Self {
956 generation,
957 previous_generation,
958 known_committed,
959 recovery_point,
960 discarded_range,
961 timestamp,
962 reason,
963 }
964 }
965
966 /// Returns true if any data was lost during this recovery.
967 pub fn had_data_loss(&self) -> bool {
968 self.discarded_range.is_some()
969 }
970
971 /// Returns the number of discarded records, if any.
972 pub fn discarded_count(&self) -> u64 {
973 self.discarded_range
974 .as_ref()
975 .map_or(0, |r| r.end.as_u64().saturating_sub(r.start.as_u64()))
976 }
977}
978
979// ============================================================================
980// Stream Name - Clone (contains String, but rarely cloned)
981// ============================================================================
982
/// Human-readable name for a stream.
///
/// A newtype over an owned `String`; the constructors shown in this file
/// accept any string without validation.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct StreamName(String);
986
987impl StreamName {
988 pub fn new(name: impl Into<String>) -> Self {
989 Self(name.into())
990 }
991
992 pub fn as_str(&self) -> &str {
993 &self.0
994 }
995}
996
997impl Display for StreamName {
998 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
999 write!(f, "{}", self.0)
1000 }
1001}
1002
1003impl From<String> for StreamName {
1004 fn from(name: String) -> Self {
1005 Self(name)
1006 }
1007}
1008
1009impl From<&str> for StreamName {
1010 fn from(name: &str) -> Self {
1011 Self(name.to_string())
1012 }
1013}
1014
1015impl From<StreamName> for String {
1016 fn from(value: StreamName) -> Self {
1017 value.0
1018 }
1019}
1020
1021// ============================================================================
1022// Data Classification - Copy (simple enum, no heap data)
1023// ============================================================================
1024
/// Classification of data for compliance purposes.
///
/// Supports multi-framework compliance: HIPAA, GDPR, PCI DSS, SOX, ISO 27001, `FedRAMP`.
///
/// # Classification Levels (8 total)
///
/// **Healthcare (HIPAA):**
/// - `PHI`: Protected Health Information
/// - `Deidentified`: De-identified per HIPAA Safe Harbor
///
/// **Privacy (GDPR):**
/// - `PII`: Personally Identifiable Information (GDPR Article 4)
/// - `Sensitive`: Special category data (GDPR Article 9) - race, health, biometrics, etc.
///
/// **Financial (PCI DSS, SOX):**
/// - `PCI`: Payment Card Industry data (card numbers, CVV, etc.)
/// - `Financial`: Financial records subject to SOX regulations
///
/// **General:**
/// - `Confidential`: Internal business data, trade secrets
/// - `Public`: Publicly available data with no restrictions
///
/// # Framework Mappings
///
/// | Level | HIPAA | GDPR | PCI DSS | SOX | ISO 27001 | FedRAMP |
/// |-------|-------|------|---------|-----|-----------|---------|
/// | PHI | ✓ | ✓ (PII) | — | — | ✓ | ✓ |
/// | Deidentified | ✓ | — | — | — | — | — |
/// | PII | — | ✓ | — | — | ✓ | ✓ |
/// | Sensitive | — | ✓ (Art 9) | — | — | ✓ | ✓ |
/// | PCI | — | ✓ (PII) | ✓ | — | ✓ | ✓ |
/// | Financial | — | — | — | ✓ | ✓ | ✓ |
/// | Confidential | — | — | — | — | ✓ | ✓ |
/// | Public | — | — | — | — | — | — |
///
/// # Ordering
///
/// The derived `PartialOrd` / `Ord` compare variants by declaration order
/// (`PHI` is the smallest, `Public` the largest).
/// NOTE(review): declaration order groups variants by framework, so it is
/// presumably not a sensitivity ranking — confirm before comparing classes
/// with `<` / `>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum DataClass {
    // ========================================================================
    // Healthcare (HIPAA)
    // ========================================================================
    /// Protected Health Information - subject to HIPAA restrictions.
    ///
    /// **Examples:** Medical records, diagnoses, lab results, prescriptions
    ///
    /// **Compliance:** HIPAA Privacy Rule, HIPAA Security Rule
    ///
    /// **Retention:** Minimum 6 years after last treatment (HIPAA § 164.530)
    PHI,

    /// Data that has been de-identified per HIPAA Safe Harbor.
    ///
    /// **Requirements:** All 18 HIPAA identifiers removed (§ 164.514(b)(2))
    ///
    /// **Examples:** Anonymized patient datasets, aggregate statistics
    ///
    /// **Compliance:** HIPAA Safe Harbor Method
    Deidentified,

    // ========================================================================
    // Privacy (GDPR)
    // ========================================================================
    /// Personally Identifiable Information (GDPR Article 4).
    ///
    /// **Examples:** Names, email addresses, IP addresses, location data
    ///
    /// **Compliance:** GDPR Articles 5-11 (lawfulness, consent, purpose limitation)
    ///
    /// **Rights:** Access, rectification, erasure, portability (GDPR Articles 15-20)
    PII,

    /// Special category data (GDPR Article 9).
    ///
    /// **Examples:** Racial/ethnic origin, political opinions, religious beliefs,
    /// trade union membership, genetic data, biometric data, health data, sex life
    ///
    /// **Compliance:** GDPR Article 9 (explicit consent required, stricter controls)
    ///
    /// **Restrictions:** Processing prohibited unless explicit exception applies
    Sensitive,

    // ========================================================================
    // Financial (PCI DSS, SOX)
    // ========================================================================
    /// Payment Card Industry data (PCI DSS).
    ///
    /// **Examples:** Credit card numbers, CVV codes, cardholder data
    ///
    /// **Compliance:** PCI DSS Requirements 1-12
    ///
    /// **Storage:** Never store CVV/CVV2/PIN after authorization
    PCI,

    /// Financial records subject to SOX regulations.
    ///
    /// **Examples:** General ledger, financial statements, audit trails
    ///
    /// **Compliance:** Sarbanes-Oxley Act § 302, § 404
    ///
    /// **Retention:** 7 years minimum (SOX § 802)
    Financial,

    // ========================================================================
    // General
    // ========================================================================
    /// Internal business data, trade secrets.
    ///
    /// **Examples:** Proprietary algorithms, business strategies, internal communications
    ///
    /// **Compliance:** ISO 27001 Annex A.8 (Asset Management)
    ///
    /// **Access:** Restricted to authorized personnel
    Confidential,

    /// Publicly available data with no restrictions.
    ///
    /// **Examples:** Public website content, press releases, published research
    ///
    /// **Compliance:** No special restrictions
    ///
    /// **Access:** Unrestricted
    Public,
}
1147
1148// ============================================================================
1149// Placement - Clone (Region::Custom contains String)
1150// ============================================================================
1151
/// Placement policy for a stream.
///
/// Either pins the data to one [`Region`] or allows global replication.
/// The type is `Clone` but not `Copy` because [`Region::Custom`] owns a
/// `String`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum Placement {
    /// Data must remain within the specified region.
    Region(Region),
    /// Data can be replicated globally across all regions.
    Global,
}
1160
/// Geographic region for data placement.
///
/// The `Display` implementation renders the built-in variants as the
/// identifiers shown below (`us-east-1`, `ap-southeast-2`) and renders
/// `Custom` as the wrapped string, unchanged.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum Region {
    /// US East (N. Virginia) - us-east-1
    USEast1,
    /// Asia Pacific (Sydney) - ap-southeast-2
    APSoutheast2,
    /// Custom region identifier (free-form; no validation is applied by
    /// [`Region::custom`]).
    Custom(String),
}
1171
1172impl Region {
1173 pub fn custom(name: impl Into<String>) -> Self {
1174 Self::Custom(name.into())
1175 }
1176}
1177
1178impl Display for Region {
1179 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1180 match self {
1181 Region::USEast1 => write!(f, "us-east-1"),
1182 Region::APSoutheast2 => write!(f, "ap-southeast-2"),
1183 Region::Custom(custom) => write!(f, "{custom}"),
1184 }
1185 }
1186}
1187
1188// ============================================================================
1189// Stream Metadata - Clone (created once per stream, cloned rarely)
1190// ============================================================================
1191
/// Metadata describing a stream's configuration and current state.
///
/// `Clone` but not `Copy`; the section header above notes it is created
/// once per stream and cloned rarely.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct StreamMetadata {
    /// Identifier of the stream this metadata describes.
    pub stream_id: StreamId,
    /// Name of the stream.
    pub stream_name: StreamName,
    /// Compliance classification of the stream's data.
    pub data_class: DataClass,
    /// Placement policy: a single region or global replication.
    pub placement: Placement,
    /// Current offset of the stream. [`StreamMetadata::new`] initializes it
    /// with `Offset::default()`.
    pub current_offset: Offset,
}
1201
1202impl StreamMetadata {
1203 /// Creates new stream metadata with offset initialized to 0.
1204 pub fn new(
1205 stream_id: StreamId,
1206 stream_name: StreamName,
1207 data_class: DataClass,
1208 placement: Placement,
1209 ) -> Self {
1210 Self {
1211 stream_id,
1212 stream_name,
1213 data_class,
1214 placement,
1215 current_offset: Offset::default(),
1216 }
1217 }
1218}
1219
1220// ============================================================================
1221// Batch Payload - NOT Clone (contains Vec<Bytes>, move only)
1222// ============================================================================
1223
/// A batch of events to append to a stream.
///
/// Deliberately does not derive `Clone`: the payload is meant to be moved,
/// not copied (see the section header above).
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BatchPayload {
    /// Stream the events are appended to.
    pub stream_id: StreamId,
    /// The events to append (zero-copy Bytes).
    pub events: Vec<Bytes>,
    /// Expected current offset for optimistic concurrency.
    pub expected_offset: Offset,
}
1233
1234impl BatchPayload {
1235 pub fn new(stream_id: StreamId, events: Vec<Bytes>, expected_offset: Offset) -> Self {
1236 Self {
1237 stream_id,
1238 events,
1239 expected_offset,
1240 }
1241 }
1242}
1243
1244// ============================================================================
1245// Audit Actions - Clone (for flexibility in logging)
1246// ============================================================================
1247
/// Actions recorded in the audit log.
///
/// `Clone` but not `Copy`: the masking-policy variants own `String` fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AuditAction {
    /// A new stream was created.
    StreamCreated {
        stream_id: StreamId,
        stream_name: StreamName,
        data_class: DataClass,
        placement: Placement,
    },
    /// Events were appended to a stream.
    EventsAppended {
        stream_id: StreamId,
        // Presumably the number of events in the appended batch — confirm.
        count: u32,
        // Presumably the offset of the first appended event — confirm.
        from_offset: Offset,
    },
    /// **AUDIT-2026-04 H-5** — a tenant was sealed for
    /// forensic / audit / legal-hold operations. No mutating
    /// commands (DDL, DML, `AppendBatch`, `CreateStream`) will be
    /// accepted against the tenant until an `Unseal` is applied.
    TenantSealed {
        tenant_id: TenantId,
        reason: SealReason,
    },
    /// **AUDIT-2026-04 H-5** — a previously-sealed tenant was
    /// released. Mutating commands are accepted again. Audit trail
    /// retains the seal/unseal pair as structured evidence.
    TenantUnsealed { tenant_id: TenantId },

    // ------------------------------------------------------------------------
    // v0.6.0 Tier 2 #7 — column-level masking policy lifecycle
    // ------------------------------------------------------------------------
    //
    // Every CRUD transition on a masking policy emits an audit record
    // so HIPAA § 164.312(b) audit reviews can reconstruct "who changed
    // which column's mask when". The attachment records intentionally
    // carry the column and policy names as strings so auditors can read
    // them directly from the log.
    // NOTE(review): the table itself is identified by a numeric
    // `table_id: u64`, so reading the log without a table-id lookup only
    // holds for the column / policy names — confirm whether a table name
    // was intended here.
    /// A new masking policy was declared via `CREATE MASKING POLICY`.
    MaskingPolicyCreated {
        tenant_id: TenantId,
        policy_name: String,
    },
    /// A masking policy was dropped via `DROP MASKING POLICY`.
    MaskingPolicyDropped {
        tenant_id: TenantId,
        policy_name: String,
    },
    /// A column was attached to a masking policy via
    /// `ALTER TABLE … SET MASKING POLICY`.
    MaskingPolicyAttached {
        tenant_id: TenantId,
        table_id: u64,
        column_name: String,
        policy_name: String,
    },
    /// A column's masking policy was detached via
    /// `ALTER TABLE … DROP MASKING POLICY`.
    MaskingPolicyDetached {
        tenant_id: TenantId,
        table_id: u64,
        column_name: String,
    },
    /// **v0.6.0 Tier 1 #3 — UPSERT** — an `INSERT ... ON CONFLICT`
    /// statement was applied atomically. The resolution discriminator
    /// lets compliance auditors distinguish insert-vs-update-vs-noop
    /// without re-reading the row payload.
    UpsertApplied {
        stream_id: StreamId,
        /// Which branch fired: new row inserted, existing row updated,
        /// or the conflict triggered `DO NOTHING`.
        resolution: UpsertResolution,
        /// Offset the single resulting event (if any) lives at. For
        /// `NoOp` the offset equals the pre-upsert head — no event is
        /// emitted, but we record the observed stream position so
        /// audit replays can reconstruct the exact moment the upsert
        /// was applied.
        from_offset: Offset,
    },
}
1328
/// **v0.6.0 Tier 1 #3** — the atomic resolution of a single UPSERT.
///
/// Every [`AuditAction::UpsertApplied`] event carries exactly one of these.
/// The discriminator is a structural part of the audit payload so
/// downstream readers never have to infer intent from a dual-write
/// sequence — a property the notebar helper's UPDATE+INSERT pair
/// explicitly lacked.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum UpsertResolution {
    /// No prior row at the conflict key — the upsert appended a new row.
    Inserted,
    /// A prior row existed — `ON CONFLICT DO UPDATE SET ...` fired and
    /// the row was rewritten with the merged column set.
    Updated,
    /// A prior row existed and the clause was `DO NOTHING`. No storage
    /// mutation, no row-level audit beyond this `UpsertApplied` record.
    NoOp,
}
1347
/// **AUDIT-2026-04 H-5** — why a tenant was sealed.
///
/// Carried by [`AuditAction::TenantSealed`].
///
/// This is an enum rather than a free-form string so downstream
/// compliance reports can aggregate on specific operational
/// categories — healthcare auditors expect to see discrete reasons
/// (e.g. `ForensicHold`) not human-written prose.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum SealReason {
    /// Forensic preservation during an incident investigation.
    ForensicHold,
    /// Audit is in progress; writes must freeze until audit completes.
    AuditInProgress,
    /// Breach investigation — preserve state for forensic analysis.
    BreachInvestigation,
    /// Legal hold (discovery / litigation-retention).
    LegalHold,
}
1365
1366// ============================================================================
1367// Event Persistence - Trait for durable event log writes
1368// ============================================================================
1369
/// Abstraction for persisting events to the durable event log.
///
/// This trait is the bridge between the projection layer and the
/// `Kimberlite` replication system. Implementations must block until
/// persistence is confirmed.
///
/// # Trait bounds
///
/// Implementors must be `Send + Sync + Debug`. The single method takes
/// `&self` and has no generic parameters.
///
/// # Healthcare Compliance
///
/// This is the critical path for HIPAA compliance. The implementation must:
/// - **Block until VSR consensus** completes (quorum durability)
/// - **Return `Err`** if consensus fails (triggers rollback)
/// - **Never return `Ok`** unless events are durably stored
///
/// # Implementation Notes
///
/// The implementor (typically `Runtime`) must handle the sync→async bridge:
///
/// ```ignore
/// impl EventPersister for RuntimeHandle {
///     fn persist_blocking(&self, stream_id: StreamId, events: Vec<Bytes>) -> Result<Offset, PersistError> {
///         // Bridge sync callback to async runtime
///         tokio::task::block_in_place(|| {
///             tokio::runtime::Handle::current().block_on(async {
///                 self.inner.append(stream_id, events).await
///             })
///         })
///         .map_err(|e| {
///             tracing::error!(error = %e, "VSR persistence failed");
///             PersistError::ConsensusFailed
///         })
///     }
/// }
/// ```
///
/// # Why `Vec<Bytes>` instead of typed events?
///
/// Events are serialized before reaching this trait. This keeps `kmb-types`
/// decoupled from domain-specific event schemas.
pub trait EventPersister: Send + Sync + Debug {
    /// Persist a batch of serialized events to the durable event log.
    ///
    /// This method **blocks** until VSR consensus confirms the events are
    /// durably stored on a quorum of nodes.
    ///
    /// # Arguments
    ///
    /// * `stream_id` - The stream to append events to
    /// * `events` - Serialized events (ownership is transferred to the
    ///   implementation)
    ///
    /// # Returns
    ///
    /// * `Ok(offset)` - Events persisted, returns the new stream offset
    /// * `Err(PersistError)` - Persistence failed, caller should rollback
    ///
    /// # Errors
    ///
    /// * [`PersistError::ConsensusFailed`] - VSR quorum unavailable after retries
    /// * [`PersistError::StorageError`] - Disk I/O or serialization failure
    /// * [`PersistError::ShuttingDown`] - System is terminating
    fn persist_blocking(
        &self,
        stream_id: StreamId,
        events: Vec<Bytes>,
    ) -> Result<Offset, PersistError>;
}
1435
/// Error returned when event persistence fails.
///
/// The hook uses this to decide whether to rollback the transaction.
/// Specific underlying errors are logged by the implementation.
///
/// The variants carry no payload, so the value itself only identifies the
/// failure category. Implements `Display` and `std::error::Error`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PersistError {
    /// VSR consensus failed after retries (quorum unavailable)
    ConsensusFailed,
    /// Storage I/O error
    StorageError,
    /// System is shutting down
    ShuttingDown,
}
1449
1450impl std::fmt::Display for PersistError {
1451 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1452 match self {
1453 Self::ConsensusFailed => write!(f, "consensus failed after retries"),
1454 Self::StorageError => write!(f, "storage I/O error"),
1455 Self::ShuttingDown => write!(f, "system is shutting down"),
1456 }
1457 }
1458}
1459
// Empty impl: the trait's default methods are used, so `source()` reports
// no underlying cause.
impl std::error::Error for PersistError {}
1461
1462// ============================================================================
1463// Compression - Copy (simple enum for codec selection)
1464// ============================================================================
1465
/// Compression algorithm for record payloads.
///
/// Each record stores its compression kind so that records compressed with
/// different algorithms can coexist in the same segment. The `None` variant
/// means the payload is stored uncompressed.
///
/// The explicit discriminants (`0`, `1`, `2`) are the byte values produced
/// by [`CompressionKind::as_byte`] and accepted by
/// [`CompressionKind::from_byte`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
pub enum CompressionKind {
    /// No compression (default).
    #[default]
    None = 0,
    /// LZ4 compression (fast, moderate ratio).
    Lz4 = 1,
    /// Zstandard compression (slower, better ratio).
    Zstd = 2,
}
1481
1482impl CompressionKind {
1483 /// Returns the single-byte discriminant for serialization.
1484 pub fn as_byte(self) -> u8 {
1485 self as u8
1486 }
1487
1488 /// Creates a `CompressionKind` from its byte discriminant.
1489 pub fn from_byte(byte: u8) -> Option<Self> {
1490 match byte {
1491 0 => Some(Self::None),
1492 1 => Some(Self::Lz4),
1493 2 => Some(Self::Zstd),
1494 _ => None,
1495 }
1496 }
1497}
1498
1499impl Display for CompressionKind {
1500 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1501 match self {
1502 Self::None => write!(f, "none"),
1503 Self::Lz4 => write!(f, "lz4"),
1504 Self::Zstd => write!(f, "zstd"),
1505 }
1506 }
1507}
1508
/// Flux refinement type annotations (experimental).
///
/// These annotations provide compile-time verification when the Flux compiler is enabled.
/// They are currently commented out because Flux is experimental, but they document the intended properties.
1513pub mod flux_annotations;
1514
1515/// Typed-domain primitives for making illegal states unrepresentable.
1516///
1517/// Introduced by the fuzz-to-types hardening effort (see
1518/// `docs-internal/contributing/constructor-audit-2026-04.md`). Re-exports
1519/// [`NonEmptyVec`](domain::NonEmptyVec), [`SqlIdentifier`](domain::SqlIdentifier),
1520/// [`BoundedSize`](domain::BoundedSize), and [`ClearanceLevel`](domain::ClearanceLevel).
1521pub mod domain;
1522
1523pub use domain::{
1524 BoundedSize, BoundedSizeError, ClearanceLevel, ClearanceLevelError, EmptyVecError, NonEmptyVec,
1525 SqlIdentifier, SqlIdentifierError,
1526};
1527
1528#[cfg(test)]
1529mod tests;