// kimberlite_types/lib.rs
1//! # kmb-types: Core types for `Kimberlite`
2//!
3//! This crate contains shared types used across the `Kimberlite` system:
4//! - Entity IDs ([`TenantId`], [`StreamId`], [`Offset`], [`GroupId`])
5//! - Cryptographic types ([`struct@Hash`])
6//! - Temporal types ([`Timestamp`])
7//! - Log record types ([`RecordKind`], [`RecordHeader`], [`Checkpoint`], [`CheckpointPolicy`])
8//! - Projection tracking ([`AppliedIndex`])
9//! - Idempotency ([`IdempotencyId`])
10//! - Recovery tracking ([`Generation`], [`RecoveryRecord`], [`RecoveryReason`])
11//! - Data classification ([`DataClass`])
12//! - Placement rules ([`Placement`], [`Region`])
13//! - Stream metadata ([`StreamMetadata`])
14//! - Audit actions ([`AuditAction`])
15//! - Event persistence ([`EventPersister`], [`PersistError`])
16
17use std::{
18    fmt::{Debug, Display},
19    ops::{Add, AddAssign, Range, Sub},
20    time::{SystemTime, UNIX_EPOCH},
21};
22
23use bytes::Bytes;
24use serde::{Deserialize, Serialize};
25
26// ============================================================================
27// Entity IDs - All Copy (cheap 8-byte values)
28// ============================================================================
29
30/// Unique identifier for a tenant (organization/customer).
31#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
32pub struct TenantId(u64);
33
34impl TenantId {
35    pub fn new(id: u64) -> Self {
36        Self(id)
37    }
38
39    /// Extracts tenant ID from stream ID (upper 32 bits).
40    ///
41    /// **Bit Layout**:
42    /// - Upper 32 bits: `tenant_id` (supports 4.3B tenants)
43    /// - Lower 32 bits: `local_stream_id` (4.3B streams per tenant)
44    ///
45    /// # Examples
46    ///
47    /// ```
48    /// # use kimberlite_types::{TenantId, StreamId};
49    /// let stream_id = StreamId::from_tenant_and_local(TenantId::from(5), 1);
50    /// assert_eq!(TenantId::from_stream_id(stream_id), TenantId::from(5));
51    /// ```
52    pub fn from_stream_id(stream_id: StreamId) -> Self {
53        TenantId::from(u64::from(stream_id) >> 32)
54    }
55}
56
57impl From<u64> for TenantId {
58    fn from(value: u64) -> Self {
59        Self(value)
60    }
61}
62
63impl From<TenantId> for u64 {
64    fn from(id: TenantId) -> Self {
65        id.0
66    }
67}
68
69/// Unique identifier for a stream within the system.
70#[derive(
71    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Default,
72)]
73pub struct StreamId(u64);
74
75impl StreamId {
76    pub fn new(id: u64) -> Self {
77        Self(id)
78    }
79
80    /// Creates stream ID from tenant ID and local stream number.
81    ///
82    /// **Bit Layout**:
83    /// - Upper 32 bits: `tenant_id` (supports 4.3B tenants)
84    /// - Lower 32 bits: `local_stream_id` (4.3B streams per tenant)
85    ///
86    /// # Examples
87    ///
88    /// ```
89    /// # use kimberlite_types::{TenantId, StreamId};
90    /// let stream_id = StreamId::from_tenant_and_local(TenantId::from(5), 1);
91    /// assert_eq!(u64::from(stream_id), 21474836481); // (5 << 32) | 1
92    /// assert_eq!(TenantId::from_stream_id(stream_id), TenantId::from(5));
93    /// ```
94    pub fn from_tenant_and_local(tenant_id: TenantId, local_id: u32) -> Self {
95        let tenant_bits = u64::from(tenant_id) << 32;
96        let local_bits = u64::from(local_id);
97        StreamId::from(tenant_bits | local_bits)
98    }
99
100    /// Extracts local stream ID (lower 32 bits).
101    ///
102    /// # Examples
103    ///
104    /// ```
105    /// # use kimberlite_types::{TenantId, StreamId};
106    /// let stream_id = StreamId::from_tenant_and_local(TenantId::from(5), 42);
107    /// assert_eq!(stream_id.local_id(), 42);
108    /// ```
109    pub fn local_id(self) -> u32 {
110        (u64::from(self) & 0xFFFF_FFFF) as u32
111    }
112}
113
114impl Add for StreamId {
115    type Output = StreamId;
116
117    fn add(self, rhs: Self) -> Self::Output {
118        let v = self.0 + rhs.0;
119        StreamId::new(v)
120    }
121}
122
123impl Display for StreamId {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        write!(f, "{}", self.0)
126    }
127}
128
129impl From<u64> for StreamId {
130    fn from(value: u64) -> Self {
131        Self(value)
132    }
133}
134
135impl From<StreamId> for u64 {
136    fn from(id: StreamId) -> Self {
137        id.0
138    }
139}
140
141/// Position of an event within a stream.
142///
143/// Offsets are zero-indexed and sequential. The first event in a stream
144/// has offset 0, the second has offset 1, and so on.
145///
146/// Uses `u64` internally — offsets are never negative by definition.
147#[derive(
148    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Default,
149)]
150pub struct Offset(u64);
151
152impl Offset {
153    pub const ZERO: Offset = Offset(0);
154
155    pub fn new(offset: u64) -> Self {
156        Self(offset)
157    }
158
159    /// Returns the offset as a `u64`.
160    pub fn as_u64(&self) -> u64 {
161        self.0
162    }
163
164    /// Returns the offset as a `usize` for indexing.
165    ///
166    /// # Panics
167    ///
168    /// Panics on 32-bit platforms if the offset exceeds `usize::MAX`.
169    pub fn as_usize(&self) -> usize {
170        self.0 as usize
171    }
172}
173
174impl Display for Offset {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        write!(f, "{}", self.0)
177    }
178}
179
180impl Add for Offset {
181    type Output = Self;
182    fn add(self, rhs: Self) -> Self::Output {
183        Self(self.0 + rhs.0)
184    }
185}
186
187impl AddAssign for Offset {
188    fn add_assign(&mut self, rhs: Self) {
189        self.0 += rhs.0;
190    }
191}
192
193impl Sub for Offset {
194    type Output = Self;
195    fn sub(self, rhs: Self) -> Self::Output {
196        Self(self.0 - rhs.0)
197    }
198}
199
200impl From<u64> for Offset {
201    fn from(value: u64) -> Self {
202        Self(value)
203    }
204}
205
206impl From<Offset> for u64 {
207    fn from(offset: Offset) -> Self {
208        offset.0
209    }
210}
211
212/// Unique identifier for a replication group.
213#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
214pub struct GroupId(u64);
215
216impl Display for GroupId {
217    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218        write!(f, "{}", self.0)
219    }
220}
221
222impl GroupId {
223    pub fn new(id: u64) -> Self {
224        Self(id)
225    }
226}
227
228impl From<u64> for GroupId {
229    fn from(value: u64) -> Self {
230        Self(value)
231    }
232}
233
234impl From<GroupId> for u64 {
235    fn from(id: GroupId) -> Self {
236        id.0
237    }
238}
239
240// ============================================================================
241// Cryptographic Hash - Copy (fixed 32-byte value)
242// ============================================================================
243
244/// Length of cryptographic hashes in bytes (SHA-256 / BLAKE3).
245pub const HASH_LENGTH: usize = 32;
246
247/// A 32-byte cryptographic hash.
248///
249/// This is a foundation type used across `Kimberlite` for:
250/// - Hash chain links (`prev_hash` in records)
251/// - Verification anchors (in checkpoints and projections)
252/// - Content addressing
253///
254/// The specific algorithm (SHA-256 for compliance, BLAKE3 for internal)
255/// is determined by the context where the hash is computed. This type
256/// only stores the resulting 32-byte digest.
257#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
258pub struct Hash([u8; HASH_LENGTH]);
259
260impl Hash {
261    /// The genesis hash (all zeros) used as the `prev_hash` for the first record.
262    pub const GENESIS: Hash = Hash([0u8; HASH_LENGTH]);
263
264    /// Creates a hash from raw bytes.
265    pub fn from_bytes(bytes: [u8; HASH_LENGTH]) -> Self {
266        Self(bytes)
267    }
268
269    /// Returns the hash as a byte slice.
270    pub fn as_bytes(&self) -> &[u8; HASH_LENGTH] {
271        &self.0
272    }
273
274    /// Returns true if this is the genesis hash (all zeros).
275    pub fn is_genesis(&self) -> bool {
276        self.0 == [0u8; HASH_LENGTH]
277    }
278}
279
280impl Debug for Hash {
281    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282        // Show first 8 bytes in hex for debugging without exposing full hash
283        write!(
284            f,
285            "Hash({:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}...)",
286            self.0[0], self.0[1], self.0[2], self.0[3], self.0[4], self.0[5], self.0[6], self.0[7]
287        )
288    }
289}
290
291impl Display for Hash {
292    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293        // Full hex representation for display
294        for byte in &self.0 {
295            write!(f, "{byte:02x}")?;
296        }
297        Ok(())
298    }
299}
300
301impl Default for Hash {
302    fn default() -> Self {
303        Self::GENESIS
304    }
305}
306
307impl From<[u8; HASH_LENGTH]> for Hash {
308    fn from(bytes: [u8; HASH_LENGTH]) -> Self {
309        Self(bytes)
310    }
311}
312
313impl From<Hash> for [u8; HASH_LENGTH] {
314    fn from(hash: Hash) -> Self {
315        hash.0
316    }
317}
318
319impl AsRef<[u8]> for Hash {
320    fn as_ref(&self) -> &[u8] {
321        &self.0
322    }
323}
324
325// ============================================================================
326// Timestamp - Copy (8-byte value with monotonic guarantee)
327// ============================================================================
328
329/// Wall-clock timestamp with monotonic guarantee within the system.
330///
331/// Compliance requires real-world time for audit trails; monotonicity
332/// prevents ordering issues when system clocks are adjusted.
333///
334/// Stored as nanoseconds since Unix epoch (1970-01-01 00:00:00 UTC).
335/// This gives us ~584 years of range, well beyond any practical use.
336#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
337pub struct Timestamp(u64);
338
339impl Timestamp {
340    /// The Unix epoch (1970-01-01 00:00:00 UTC).
341    pub const EPOCH: Timestamp = Timestamp(0);
342
343    /// Creates a timestamp from nanoseconds since Unix epoch.
344    pub fn from_nanos(nanos: u64) -> Self {
345        Self(nanos)
346    }
347
348    /// Returns the timestamp as nanoseconds since Unix epoch.
349    pub fn as_nanos(&self) -> u64 {
350        self.0
351    }
352
353    /// Returns the timestamp as seconds since Unix epoch (truncates nanoseconds).
354    pub fn as_secs(&self) -> u64 {
355        self.0 / 1_000_000_000
356    }
357
358    /// Creates a timestamp for the current time.
359    ///
360    /// # Panics
361    ///
362    /// Panics if the system clock is before Unix epoch (should never happen).
363    pub fn now() -> Self {
364        let duration = SystemTime::now()
365            .duration_since(UNIX_EPOCH)
366            .expect("system clock is before Unix epoch");
367        Self(duration.as_nanos() as u64)
368    }
369
370    /// Creates a timestamp ensuring monotonicity: `max(now, last + 1ns)`.
371    ///
372    /// This guarantees that each timestamp is strictly greater than the previous,
373    /// even if the system clock moves backwards or two events occur in the same
374    /// nanosecond.
375    ///
376    /// # Arguments
377    ///
378    /// * `last` - The previous timestamp, if any. Pass `None` for the first timestamp.
379    pub fn now_monotonic(last: Option<Timestamp>) -> Self {
380        let now = Self::now();
381        match last {
382            Some(prev) => {
383                // Ensure strictly increasing: at least prev + 1 nanosecond
384                if now.0 <= prev.0 {
385                    Timestamp(prev.0.saturating_add(1))
386                } else {
387                    now
388                }
389            }
390            None => now,
391        }
392    }
393}
394
395impl Display for Timestamp {
396    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
397        // Display as seconds.nanoseconds for readability
398        let secs = self.0 / 1_000_000_000;
399        let nanos = self.0 % 1_000_000_000;
400        write!(f, "{secs}.{nanos:09}")
401    }
402}
403
404impl Default for Timestamp {
405    fn default() -> Self {
406        Self::EPOCH
407    }
408}
409
410impl From<u64> for Timestamp {
411    fn from(nanos: u64) -> Self {
412        Self(nanos)
413    }
414}
415
416impl From<Timestamp> for u64 {
417    fn from(ts: Timestamp) -> Self {
418        ts.0
419    }
420}
421
422// ============================================================================
423// Record Types - Copy (small enum and struct)
424// ============================================================================
425
426/// The kind of record stored in the log.
427///
428/// This enum distinguishes between different record types to enable
429/// efficient processing and verification.
430#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
431pub enum RecordKind {
432    /// Normal application data record.
433    #[default]
434    Data,
435    /// Periodic verification checkpoint (contains cumulative hash).
436    Checkpoint,
437    /// Logical deletion marker (data is not physically deleted).
438    Tombstone,
439}
440
441impl RecordKind {
442    /// Returns the single-byte discriminant for serialization.
443    pub fn as_byte(&self) -> u8 {
444        match self {
445            RecordKind::Data => 0,
446            RecordKind::Checkpoint => 1,
447            RecordKind::Tombstone => 2,
448        }
449    }
450
451    /// Creates a `RecordKind` from its byte discriminant.
452    ///
453    /// # Errors
454    ///
455    /// Returns `None` if the byte is not a valid discriminant.
456    pub fn from_byte(byte: u8) -> Option<Self> {
457        match byte {
458            0 => Some(RecordKind::Data),
459            1 => Some(RecordKind::Checkpoint),
460            2 => Some(RecordKind::Tombstone),
461            _ => None,
462        }
463    }
464}
465
466/// Metadata header for every log record.
467///
468/// This structure contains all metadata needed to verify and process
469/// a log record without reading its payload.
470#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
471pub struct RecordHeader {
472    /// Position in the log (0-indexed).
473    pub offset: Offset,
474    /// SHA-256 link to the previous record's hash (genesis for first record).
475    pub prev_hash: Hash,
476    /// When the record was committed (monotonic wall-clock).
477    pub timestamp: Timestamp,
478    /// Size of the payload in bytes.
479    pub payload_len: u32,
480    /// Type of record (Data, Checkpoint, Tombstone).
481    pub record_kind: RecordKind,
482}
483
484impl RecordHeader {
485    /// Creates a new record header.
486    ///
487    /// # Arguments
488    ///
489    /// * `offset` - Position in the log
490    /// * `prev_hash` - Hash of the previous record (or GENESIS for first)
491    /// * `timestamp` - When this record was committed
492    /// * `payload_len` - Size of the payload in bytes
493    /// * `record_kind` - Type of record
494    pub fn new(
495        offset: Offset,
496        prev_hash: Hash,
497        timestamp: Timestamp,
498        payload_len: u32,
499        record_kind: RecordKind,
500    ) -> Self {
501        Self {
502            offset,
503            prev_hash,
504            timestamp,
505            payload_len,
506            record_kind,
507        }
508    }
509
510    /// Returns true if this is the first record in the log.
511    pub fn is_genesis(&self) -> bool {
512        self.offset == Offset::ZERO && self.prev_hash.is_genesis()
513    }
514}
515
516// ============================================================================
517// Projection Tracking - Copy (small struct for projections)
518// ============================================================================
519
520/// Tracks which log entry a projection row was derived from.
521///
522/// Projections embed this in each row to enable:
523/// - Point-in-time queries (find rows at a specific offset)
524/// - Verification without walking the hash chain (hash provides direct check)
525/// - Audit trails (know exactly which event created/updated a row)
526#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
527pub struct AppliedIndex {
528    /// The log offset this row was derived from.
529    pub offset: Offset,
530    /// The hash at this offset for direct verification.
531    pub hash: Hash,
532}
533
534impl AppliedIndex {
535    /// Creates a new applied index.
536    pub fn new(offset: Offset, hash: Hash) -> Self {
537        Self { offset, hash }
538    }
539
540    /// Creates the initial applied index (before any records).
541    pub fn genesis() -> Self {
542        Self {
543            offset: Offset::ZERO,
544            hash: Hash::GENESIS,
545        }
546    }
547}
548
549impl Default for AppliedIndex {
550    fn default() -> Self {
551        Self::genesis()
552    }
553}
554
555// ============================================================================
556// Checkpoints - Copy (verification anchors in the log)
557// ============================================================================
558
559/// A periodic verification checkpoint stored in the log.
560///
561/// Checkpoints are records IN the log (not separate files), which means:
562/// - They are part of the hash chain (tamper-evident)
563/// - Checkpoint history is immutable
564/// - Single source of truth
565///
566/// Checkpoints enable efficient verified reads by providing trusted
567/// anchor points, reducing verification from O(n) to O(k) where k is
568/// the distance to the nearest checkpoint.
569#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
570pub struct Checkpoint {
571    /// Log position of this checkpoint record.
572    pub offset: Offset,
573    /// Cumulative hash of the chain at this point.
574    pub chain_hash: Hash,
575    /// Total number of records from genesis to this checkpoint.
576    pub record_count: u64,
577    /// When this checkpoint was created.
578    pub created_at: Timestamp,
579}
580
581impl Checkpoint {
582    /// Creates a new checkpoint.
583    ///
584    /// # Preconditions
585    ///
586    /// - `record_count` should equal `offset.as_u64() + 1` (0-indexed offset)
587    pub fn new(offset: Offset, chain_hash: Hash, record_count: u64, created_at: Timestamp) -> Self {
588        debug_assert_eq!(
589            record_count,
590            offset.as_u64() + 1,
591            "record_count should equal offset + 1"
592        );
593        Self {
594            offset,
595            chain_hash,
596            record_count,
597            created_at,
598        }
599    }
600}
601
602/// Policy for when to create checkpoints.
603///
604/// Checkpoints bound the worst-case verification cost. The default policy
605/// creates a checkpoint every 1000 records and on graceful shutdown.
606#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
607pub struct CheckpointPolicy {
608    /// Create a checkpoint every N records. Set to 0 to disable.
609    pub every_n_records: u64,
610    /// Create a checkpoint on graceful shutdown.
611    pub on_shutdown: bool,
612    /// If true, disable automatic checkpoints (only explicit calls).
613    pub explicit_only: bool,
614}
615
616impl CheckpointPolicy {
617    /// Creates a policy that checkpoints every N records.
618    pub fn every(n: u64) -> Self {
619        Self {
620            every_n_records: n,
621            on_shutdown: true,
622            explicit_only: false,
623        }
624    }
625
626    /// Creates a policy that only creates explicit checkpoints.
627    pub fn explicit_only() -> Self {
628        Self {
629            every_n_records: 0,
630            on_shutdown: false,
631            explicit_only: true,
632        }
633    }
634
635    /// Returns true if a checkpoint should be created at this offset.
636    pub fn should_checkpoint(&self, offset: Offset) -> bool {
637        if self.explicit_only {
638            return false;
639        }
640        if self.every_n_records == 0 {
641            return false;
642        }
643        // Checkpoint at offsets that are multiples of every_n_records
644        // (offset 999 for every_n_records=1000, etc.)
645        (offset.as_u64() + 1) % self.every_n_records == 0
646    }
647}
648
649impl Default for CheckpointPolicy {
650    /// Default policy: checkpoint every 1000 records, on shutdown.
651    fn default() -> Self {
652        Self {
653            every_n_records: 1000,
654            on_shutdown: true,
655            explicit_only: false,
656        }
657    }
658}
659
660// ============================================================================
661// Idempotency - Copy (16-byte identifier for duplicate prevention)
662// ============================================================================
663
664/// Length of idempotency IDs in bytes.
665pub const IDEMPOTENCY_ID_LENGTH: usize = 16;
666
667/// Unique identifier for duplicate transaction prevention.
668///
669/// Clients generate an `IdempotencyId` before their first attempt at a
670/// transaction. If the transaction needs to be retried (e.g., network
671/// timeout), the client reuses the same ID. The server tracks committed
672/// IDs to return the same result for duplicate requests.
673///
674/// Inspired by `FoundationDB`'s idempotency key design.
675///
676/// # FCIS Pattern
677///
678/// This type follows the Functional Core / Imperative Shell pattern:
679/// - `from_bytes()`: Pure restoration from storage
680/// - `from_random_bytes()`: Pure construction from bytes (`pub(crate)`)
681/// - `generate()`: Impure shell that invokes CSPRNG
682#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
683pub struct IdempotencyId([u8; IDEMPOTENCY_ID_LENGTH]);
684
685impl IdempotencyId {
686    // ========================================================================
687    // Functional Core (pure, testable)
688    // ========================================================================
689
690    /// Pure construction from random bytes.
691    ///
692    /// Restricted to `pub(crate)` to prevent misuse with weak random sources.
693    /// External callers should use `generate()` or `from_bytes()`.
694    pub(crate) fn from_random_bytes(bytes: [u8; IDEMPOTENCY_ID_LENGTH]) -> Self {
695        debug_assert!(
696            bytes.iter().any(|&b| b != 0),
697            "idempotency ID bytes are all zeros"
698        );
699        Self(bytes)
700    }
701
702    /// Restoration from stored bytes (pure).
703    ///
704    /// Use this when loading an `IdempotencyId` from storage or wire protocol.
705    pub fn from_bytes(bytes: [u8; IDEMPOTENCY_ID_LENGTH]) -> Self {
706        Self(bytes)
707    }
708
709    /// Returns the ID as a byte slice.
710    pub fn as_bytes(&self) -> &[u8; IDEMPOTENCY_ID_LENGTH] {
711        &self.0
712    }
713
714    // ========================================================================
715    // Imperative Shell (IO boundary)
716    // ========================================================================
717
718    /// Generates a new random idempotency ID using the OS CSPRNG.
719    ///
720    /// # Panics
721    ///
722    /// Panics if the OS CSPRNG fails, which indicates a catastrophic
723    /// system error (e.g., no entropy source available).
724    pub fn generate() -> Self {
725        let mut bytes = [0u8; IDEMPOTENCY_ID_LENGTH];
726        getrandom::fill(&mut bytes).expect("CSPRNG failure is catastrophic");
727        Self::from_random_bytes(bytes)
728    }
729}
730
731impl Debug for IdempotencyId {
732    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
733        // Show full hex for debugging (IDs are meant to be logged)
734        write!(f, "IdempotencyId(")?;
735        for byte in &self.0 {
736            write!(f, "{byte:02x}")?;
737        }
738        write!(f, ")")
739    }
740}
741
742impl Display for IdempotencyId {
743    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
744        // Full hex representation
745        for byte in &self.0 {
746            write!(f, "{byte:02x}")?;
747        }
748        Ok(())
749    }
750}
751
752impl From<[u8; IDEMPOTENCY_ID_LENGTH]> for IdempotencyId {
753    fn from(bytes: [u8; IDEMPOTENCY_ID_LENGTH]) -> Self {
754        Self::from_bytes(bytes)
755    }
756}
757
758impl From<IdempotencyId> for [u8; IDEMPOTENCY_ID_LENGTH] {
759    fn from(id: IdempotencyId) -> Self {
760        id.0
761    }
762}
763
764// ============================================================================
765// Recovery Tracking - Copy (generation-based recovery for compliance)
766// ============================================================================
767
768/// Monotonically increasing recovery generation.
769///
770/// Each recovery event creates a new generation. This provides natural
771/// audit checkpoints and explicit tracking of system recovery events.
772///
773/// Inspired by `FoundationDB`'s generation-based recovery tracking.
774#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
775pub struct Generation(u64);
776
777impl Generation {
778    /// The initial generation (before any recovery).
779    pub const INITIAL: Generation = Generation(0);
780
781    /// Creates a generation from a raw value.
782    pub fn new(value: u64) -> Self {
783        Self(value)
784    }
785
786    /// Returns the generation as a u64.
787    pub fn as_u64(&self) -> u64 {
788        self.0
789    }
790
791    /// Returns the next generation (incremented by 1).
792    pub fn next(&self) -> Self {
793        Generation(self.0.saturating_add(1))
794    }
795}
796
797impl Display for Generation {
798    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
799        write!(f, "gen:{}", self.0)
800    }
801}
802
803impl Default for Generation {
804    fn default() -> Self {
805        Self::INITIAL
806    }
807}
808
809impl From<u64> for Generation {
810    fn from(value: u64) -> Self {
811        Self(value)
812    }
813}
814
815impl From<Generation> for u64 {
816    fn from(generation: Generation) -> Self {
817        generation.0
818    }
819}
820
821/// Reason why a recovery was triggered.
822///
823/// This is recorded in the recovery log for compliance auditing.
824#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
825pub enum RecoveryReason {
826    /// Normal node restart (graceful or crash recovery).
827    NodeRestart,
828    /// Lost quorum and had to recover from remaining replicas.
829    QuorumLoss,
830    /// Detected data corruption requiring recovery.
831    CorruptionDetected,
832    /// Operator manually triggered recovery.
833    ManualIntervention,
834}
835
836impl Display for RecoveryReason {
837    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
838        match self {
839            RecoveryReason::NodeRestart => write!(f, "node_restart"),
840            RecoveryReason::QuorumLoss => write!(f, "quorum_loss"),
841            RecoveryReason::CorruptionDetected => write!(f, "corruption_detected"),
842            RecoveryReason::ManualIntervention => write!(f, "manual_intervention"),
843        }
844    }
845}
846
847/// Records a recovery event with explicit tracking of any data loss.
848///
849/// Critical for compliance: auditors can see exactly what happened during
850/// recovery, including any mutations that were discarded.
851///
852/// Inspired by `FoundationDB`'s 9-phase recovery with explicit data loss tracking.
853#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
854pub struct RecoveryRecord {
855    /// New generation after recovery.
856    pub generation: Generation,
857    /// Previous generation before recovery.
858    pub previous_generation: Generation,
859    /// Last known committed offset before recovery.
860    pub known_committed: Offset,
861    /// Offset we recovered to.
862    pub recovery_point: Offset,
863    /// Range of discarded prepares (if any) - EXPLICIT LOSS TRACKING.
864    ///
865    /// If `Some`, this range of offsets contained prepared but uncommitted
866    /// mutations that were discarded during recovery. This is the critical
867    /// compliance field: it explicitly documents any data loss.
868    pub discarded_range: Option<Range<Offset>>,
869    /// When recovery occurred.
870    pub timestamp: Timestamp,
871    /// Why recovery was triggered.
872    pub reason: RecoveryReason,
873}
874
875impl RecoveryRecord {
876    /// Creates a new recovery record.
877    ///
878    /// # Arguments
879    ///
880    /// * `generation` - The new generation after recovery
881    /// * `previous_generation` - The generation before recovery
882    /// * `known_committed` - Last known committed offset
883    /// * `recovery_point` - The offset we recovered to
884    /// * `discarded_range` - Range of discarded uncommitted prepares, if any
885    /// * `timestamp` - When recovery occurred
886    /// * `reason` - Why recovery was triggered
887    ///
888    /// # Preconditions
889    ///
890    /// - `generation` must be greater than `previous_generation`
891    /// - `recovery_point` must be <= `known_committed`
892    pub fn new(
893        generation: Generation,
894        previous_generation: Generation,
895        known_committed: Offset,
896        recovery_point: Offset,
897        discarded_range: Option<Range<Offset>>,
898        timestamp: Timestamp,
899        reason: RecoveryReason,
900    ) -> Self {
901        debug_assert!(
902            generation > previous_generation,
903            "new generation must be greater than previous"
904        );
905        debug_assert!(
906            recovery_point <= known_committed,
907            "recovery point cannot exceed known committed"
908        );
909
910        Self {
911            generation,
912            previous_generation,
913            known_committed,
914            recovery_point,
915            discarded_range,
916            timestamp,
917            reason,
918        }
919    }
920
921    /// Returns true if any data was lost during this recovery.
922    pub fn had_data_loss(&self) -> bool {
923        self.discarded_range.is_some()
924    }
925
926    /// Returns the number of discarded records, if any.
927    pub fn discarded_count(&self) -> u64 {
928        self.discarded_range
929            .as_ref()
930            .map_or(0, |r| r.end.as_u64().saturating_sub(r.start.as_u64()))
931    }
932}
933
934// ============================================================================
935// Stream Name - Clone (contains String, but rarely cloned)
936// ============================================================================
937
938/// Human-readable name for a stream.
939#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
940pub struct StreamName(String);
941
942impl StreamName {
943    pub fn new(name: impl Into<String>) -> Self {
944        Self(name.into())
945    }
946
947    pub fn as_str(&self) -> &str {
948        &self.0
949    }
950}
951
952impl Display for StreamName {
953    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
954        write!(f, "{}", self.0)
955    }
956}
957
958impl From<String> for StreamName {
959    fn from(name: String) -> Self {
960        Self(name)
961    }
962}
963
964impl From<&str> for StreamName {
965    fn from(name: &str) -> Self {
966        Self(name.to_string())
967    }
968}
969
970impl From<StreamName> for String {
971    fn from(value: StreamName) -> Self {
972        value.0
973    }
974}
975
976// ============================================================================
977// Data Classification - Copy (simple enum, no heap data)
978// ============================================================================
979
/// Classification of data for compliance purposes.
///
/// A plain marker enum: `Copy`, hashable, and totally ordered, so it can
/// be used freely as a map key or sort key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum DataClass {
    /// Protected Health Information - subject to HIPAA restrictions.
    PHI,
    /// Non-PHI data that doesn't contain health information.
    NonPHI,
    /// Data that has been de-identified per HIPAA Safe Harbor.
    Deidentified,
}
990
991// ============================================================================
992// Placement - Clone (Region::Custom contains String)
993// ============================================================================
994
/// Placement policy for a stream.
///
/// `Clone` rather than `Copy` because [`Region::Custom`] may carry a
/// heap-allocated `String`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum Placement {
    /// Data must remain within the specified region.
    Region(Region),
    /// Data can be replicated globally across all regions.
    Global,
}
1003
/// Geographic region for data placement.
///
/// The [`Display`] impl renders the AWS-style region id (e.g. `us-east-1`);
/// `Custom` values render verbatim.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum Region {
    /// US East (N. Virginia) - us-east-1
    USEast1,
    /// Asia Pacific (Sydney) - ap-southeast-2
    APSoutheast2,
    /// Custom region identifier
    Custom(String),
}
1014
1015impl Region {
1016    pub fn custom(name: impl Into<String>) -> Self {
1017        Self::Custom(name.into())
1018    }
1019}
1020
1021impl Display for Region {
1022    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1023        match self {
1024            Region::USEast1 => write!(f, "us-east-1"),
1025            Region::APSoutheast2 => write!(f, "ap-southeast-2"),
1026            Region::Custom(custom) => write!(f, "{custom}"),
1027        }
1028    }
1029}
1030
1031// ============================================================================
1032// Stream Metadata - Clone (created once per stream, cloned rarely)
1033// ============================================================================
1034
/// Metadata describing a stream's configuration and current state.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct StreamMetadata {
    /// Unique identifier of the stream.
    pub stream_id: StreamId,
    /// Human-readable name of the stream.
    pub stream_name: StreamName,
    /// Compliance classification of the stream's payload data.
    pub data_class: DataClass,
    /// Where the stream's data may be stored/replicated.
    pub placement: Placement,
    /// Current stream offset, initialized to 0 at creation.
    /// NOTE(review): whether this is "last written" or "next append"
    /// offset is not visible here — confirm against the append logic.
    pub current_offset: Offset,
}
1044
1045impl StreamMetadata {
1046    /// Creates new stream metadata with offset initialized to 0.
1047    pub fn new(
1048        stream_id: StreamId,
1049        stream_name: StreamName,
1050        data_class: DataClass,
1051        placement: Placement,
1052    ) -> Self {
1053        Self {
1054            stream_id,
1055            stream_name,
1056            data_class,
1057            placement,
1058            current_offset: Offset::default(),
1059        }
1060    }
1061}
1062
1063// ============================================================================
1064// Batch Payload - NOT Clone (contains Vec<Bytes>, move only)
1065// ============================================================================
1066
/// A batch of events to append to a stream.
///
/// Deliberately not `Clone`: it owns a `Vec<Bytes>` and is intended to be
/// moved into the append path exactly once.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BatchPayload {
    /// Target stream for the append.
    pub stream_id: StreamId,
    /// The events to append (zero-copy Bytes).
    pub events: Vec<Bytes>,
    /// Expected current offset for optimistic concurrency.
    pub expected_offset: Offset,
}
1076
1077impl BatchPayload {
1078    pub fn new(stream_id: StreamId, events: Vec<Bytes>, expected_offset: Offset) -> Self {
1079        Self {
1080            stream_id,
1081            events,
1082            expected_offset,
1083        }
1084    }
1085}
1086
1087// ============================================================================
1088// Audit Actions - Clone (for flexibility in logging)
1089// ============================================================================
1090
/// Actions recorded in the audit log.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AuditAction {
    /// A new stream was created.
    StreamCreated {
        /// Identifier assigned to the new stream.
        stream_id: StreamId,
        /// Human-readable name chosen at creation.
        stream_name: StreamName,
        /// Compliance classification of the stream.
        data_class: DataClass,
        /// Placement policy governing the stream's data.
        placement: Placement,
    },
    /// Events were appended to a stream.
    EventsAppended {
        /// Stream that received the events.
        stream_id: StreamId,
        /// Number of events appended in this batch.
        count: u32,
        /// Stream offset associated with the append.
        /// NOTE(review): presumably the offset before the batch (or of its
        /// first event) — confirm at the call site.
        from_offset: Offset,
    },
}
1108
1109// ============================================================================
1110// Event Persistence - Trait for durable event log writes
1111// ============================================================================
1112
/// Abstraction for persisting events to the durable event log.
///
/// This trait is the bridge between the projection layer and the
/// `Kimberlite` replication system. Implementations must block until
/// persistence is confirmed.
///
/// Bounds: `Send + Sync` so a persister can be shared across threads, and
/// `Debug` so it can appear in diagnostics of containing types.
///
/// # Healthcare Compliance
///
/// This is the critical path for HIPAA compliance. The implementation must:
/// - **Block until VSR consensus** completes (quorum durability)
/// - **Return `Err`** if consensus fails (triggers rollback)
/// - **Never return `Ok`** unless events are durably stored
///
/// # Implementation Notes
///
/// The implementor (typically `Runtime`) must handle the sync→async bridge:
///
/// ```ignore
/// impl EventPersister for RuntimeHandle {
///     fn persist_blocking(&self, stream_id: StreamId, events: Vec<Bytes>) -> Result<Offset, PersistError> {
///         // Bridge sync callback to async runtime
///         tokio::task::block_in_place(|| {
///             tokio::runtime::Handle::current().block_on(async {
///                 self.inner.append(stream_id, events).await
///             })
///         })
///         .map_err(|e| {
///             tracing::error!(error = %e, "VSR persistence failed");
///             PersistError::ConsensusFailed
///         })
///     }
/// }
/// ```
///
/// # Why `Vec<Bytes>` instead of typed events?
///
/// Events are serialized before reaching this trait. This keeps `kmb-types`
/// decoupled from domain-specific event schemas.
pub trait EventPersister: Send + Sync + Debug {
    /// Persist a batch of serialized events to the durable event log.
    ///
    /// This method **blocks** until VSR consensus confirms the events are
    /// durably stored on a quorum of nodes. Do not call it from a context
    /// that must not block (e.g. an async executor thread).
    ///
    /// # Arguments
    ///
    /// * `stream_id` - The stream to append events to
    /// * `events` - Serialized events
    ///
    /// # Returns
    ///
    /// * `Ok(offset)` - Events persisted, returns the new stream offset
    /// * `Err(PersistError)` - Persistence failed, caller should rollback
    ///
    /// # Errors
    ///
    /// * [`PersistError::ConsensusFailed`] - VSR quorum unavailable after retries
    /// * [`PersistError::StorageError`] - Disk I/O or serialization failure
    /// * [`PersistError::ShuttingDown`] - System is terminating
    fn persist_blocking(
        &self,
        stream_id: StreamId,
        events: Vec<Bytes>,
    ) -> Result<Offset, PersistError>;
}
1178
/// Error returned when event persistence fails.
///
/// The hook uses this to decide whether to rollback the transaction.
/// Specific underlying errors are logged by the implementation, so this
/// type carries no payload and stays cheap to clone and compare.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PersistError {
    /// VSR consensus failed after retries (quorum unavailable)
    ConsensusFailed,
    /// Storage I/O error
    StorageError,
    /// System is shutting down
    ShuttingDown,
}

// Consistency fix: use the `Display` imported at the top of the file,
// matching the other `Display` impls in this module (`StreamName`, `Region`)
// instead of the fully-qualified `std::fmt::Display` path.
impl Display for PersistError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single static message per variant; `write_str` avoids the
        // formatting machinery for these fixed strings.
        let msg = match self {
            Self::ConsensusFailed => "consensus failed after retries",
            Self::StorageError => "storage I/O error",
            Self::ShuttingDown => "system is shutting down",
        };
        f.write_str(msg)
    }
}

impl std::error::Error for PersistError {}
1204
1205#[cfg(test)]
1206mod tests;