kimberlite_types/lib.rs
//! # `kimberlite_types`: Core types for `Kimberlite`
2//!
3//! This crate contains shared types used across the `Kimberlite` system:
4//! - Entity IDs ([`TenantId`], [`StreamId`], [`Offset`], [`GroupId`])
5//! - Cryptographic types ([`struct@Hash`])
6//! - Temporal types ([`Timestamp`])
7//! - Log record types ([`RecordKind`], [`RecordHeader`], [`Checkpoint`], [`CheckpointPolicy`])
8//! - Projection tracking ([`AppliedIndex`])
9//! - Idempotency ([`IdempotencyId`])
10//! - Recovery tracking ([`Generation`], [`RecoveryRecord`], [`RecoveryReason`])
11//! - Data classification ([`DataClass`])
12//! - Placement rules ([`Placement`], [`Region`])
13//! - Stream metadata ([`StreamMetadata`])
14//! - Audit actions ([`AuditAction`])
15//! - Event persistence ([`EventPersister`], [`PersistError`])
16
17use std::{
18 fmt::{Debug, Display},
19 ops::{Add, AddAssign, Range, Sub},
20 time::{SystemTime, UNIX_EPOCH},
21};
22
23use bytes::Bytes;
24use serde::{Deserialize, Serialize};
25
26// ============================================================================
27// Entity IDs - All Copy (cheap 8-byte values)
28// ============================================================================
29
30/// Unique identifier for a tenant (organization/customer).
31#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
32pub struct TenantId(u64);
33
34impl TenantId {
35 pub fn new(id: u64) -> Self {
36 Self(id)
37 }
38
39 /// Extracts tenant ID from stream ID (upper 32 bits).
40 ///
41 /// **Bit Layout**:
42 /// - Upper 32 bits: `tenant_id` (supports 4.3B tenants)
43 /// - Lower 32 bits: `local_stream_id` (4.3B streams per tenant)
44 ///
45 /// # Examples
46 ///
47 /// ```
48 /// # use kimberlite_types::{TenantId, StreamId};
49 /// let stream_id = StreamId::from_tenant_and_local(TenantId::from(5), 1);
50 /// assert_eq!(TenantId::from_stream_id(stream_id), TenantId::from(5));
51 /// ```
52 pub fn from_stream_id(stream_id: StreamId) -> Self {
53 TenantId::from(u64::from(stream_id) >> 32)
54 }
55}
56
57impl From<u64> for TenantId {
58 fn from(value: u64) -> Self {
59 Self(value)
60 }
61}
62
63impl From<TenantId> for u64 {
64 fn from(id: TenantId) -> Self {
65 id.0
66 }
67}
68
69/// Unique identifier for a stream within the system.
70#[derive(
71 Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Default,
72)]
73pub struct StreamId(u64);
74
75impl StreamId {
76 pub fn new(id: u64) -> Self {
77 Self(id)
78 }
79
80 /// Creates stream ID from tenant ID and local stream number.
81 ///
82 /// **Bit Layout**:
83 /// - Upper 32 bits: `tenant_id` (supports 4.3B tenants)
84 /// - Lower 32 bits: `local_stream_id` (4.3B streams per tenant)
85 ///
86 /// # Examples
87 ///
88 /// ```
89 /// # use kimberlite_types::{TenantId, StreamId};
90 /// let stream_id = StreamId::from_tenant_and_local(TenantId::from(5), 1);
91 /// assert_eq!(u64::from(stream_id), 21474836481); // (5 << 32) | 1
92 /// assert_eq!(TenantId::from_stream_id(stream_id), TenantId::from(5));
93 /// ```
94 pub fn from_tenant_and_local(tenant_id: TenantId, local_id: u32) -> Self {
95 let tenant_bits = u64::from(tenant_id) << 32;
96 let local_bits = u64::from(local_id);
97 StreamId::from(tenant_bits | local_bits)
98 }
99
100 /// Extracts local stream ID (lower 32 bits).
101 ///
102 /// # Examples
103 ///
104 /// ```
105 /// # use kimberlite_types::{TenantId, StreamId};
106 /// let stream_id = StreamId::from_tenant_and_local(TenantId::from(5), 42);
107 /// assert_eq!(stream_id.local_id(), 42);
108 /// ```
109 pub fn local_id(self) -> u32 {
110 (u64::from(self) & 0xFFFF_FFFF) as u32
111 }
112}
113
114impl Add for StreamId {
115 type Output = StreamId;
116
117 fn add(self, rhs: Self) -> Self::Output {
118 let v = self.0 + rhs.0;
119 StreamId::new(v)
120 }
121}
122
123impl Display for StreamId {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 write!(f, "{}", self.0)
126 }
127}
128
129impl From<u64> for StreamId {
130 fn from(value: u64) -> Self {
131 Self(value)
132 }
133}
134
135impl From<StreamId> for u64 {
136 fn from(id: StreamId) -> Self {
137 id.0
138 }
139}
140
141/// Position of an event within a stream.
142///
143/// Offsets are zero-indexed and sequential. The first event in a stream
144/// has offset 0, the second has offset 1, and so on.
145///
146/// Uses `u64` internally — offsets are never negative by definition.
147#[derive(
148 Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Default,
149)]
150pub struct Offset(u64);
151
152impl Offset {
153 pub const ZERO: Offset = Offset(0);
154
155 pub fn new(offset: u64) -> Self {
156 Self(offset)
157 }
158
159 /// Returns the offset as a `u64`.
160 pub fn as_u64(&self) -> u64 {
161 self.0
162 }
163
164 /// Returns the offset as a `usize` for indexing.
165 ///
166 /// # Panics
167 ///
168 /// Panics on 32-bit platforms if the offset exceeds `usize::MAX`.
169 pub fn as_usize(&self) -> usize {
170 self.0 as usize
171 }
172}
173
174impl Display for Offset {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 write!(f, "{}", self.0)
177 }
178}
179
180impl Add for Offset {
181 type Output = Self;
182 fn add(self, rhs: Self) -> Self::Output {
183 Self(self.0 + rhs.0)
184 }
185}
186
187impl AddAssign for Offset {
188 fn add_assign(&mut self, rhs: Self) {
189 self.0 += rhs.0;
190 }
191}
192
193impl Sub for Offset {
194 type Output = Self;
195 fn sub(self, rhs: Self) -> Self::Output {
196 Self(self.0 - rhs.0)
197 }
198}
199
200impl From<u64> for Offset {
201 fn from(value: u64) -> Self {
202 Self(value)
203 }
204}
205
206impl From<Offset> for u64 {
207 fn from(offset: Offset) -> Self {
208 offset.0
209 }
210}
211
212/// Unique identifier for a replication group.
213#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
214pub struct GroupId(u64);
215
216impl Display for GroupId {
217 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218 write!(f, "{}", self.0)
219 }
220}
221
222impl GroupId {
223 pub fn new(id: u64) -> Self {
224 Self(id)
225 }
226}
227
228impl From<u64> for GroupId {
229 fn from(value: u64) -> Self {
230 Self(value)
231 }
232}
233
234impl From<GroupId> for u64 {
235 fn from(id: GroupId) -> Self {
236 id.0
237 }
238}
239
240// ============================================================================
241// Cryptographic Hash - Copy (fixed 32-byte value)
242// ============================================================================
243
244/// Length of cryptographic hashes in bytes (SHA-256 / BLAKE3).
245pub const HASH_LENGTH: usize = 32;
246
247/// A 32-byte cryptographic hash.
248///
249/// This is a foundation type used across `Kimberlite` for:
250/// - Hash chain links (`prev_hash` in records)
251/// - Verification anchors (in checkpoints and projections)
252/// - Content addressing
253///
254/// The specific algorithm (SHA-256 for compliance, BLAKE3 for internal)
255/// is determined by the context where the hash is computed. This type
256/// only stores the resulting 32-byte digest.
257#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
258pub struct Hash([u8; HASH_LENGTH]);
259
260impl Hash {
261 /// The genesis hash (all zeros) used as the `prev_hash` for the first record.
262 pub const GENESIS: Hash = Hash([0u8; HASH_LENGTH]);
263
264 /// Creates a hash from raw bytes.
265 pub fn from_bytes(bytes: [u8; HASH_LENGTH]) -> Self {
266 Self(bytes)
267 }
268
269 /// Returns the hash as a byte slice.
270 pub fn as_bytes(&self) -> &[u8; HASH_LENGTH] {
271 &self.0
272 }
273
274 /// Returns true if this is the genesis hash (all zeros).
275 pub fn is_genesis(&self) -> bool {
276 self.0 == [0u8; HASH_LENGTH]
277 }
278}
279
280impl Debug for Hash {
281 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282 // Show first 8 bytes in hex for debugging without exposing full hash
283 write!(
284 f,
285 "Hash({:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}...)",
286 self.0[0], self.0[1], self.0[2], self.0[3], self.0[4], self.0[5], self.0[6], self.0[7]
287 )
288 }
289}
290
291impl Display for Hash {
292 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293 // Full hex representation for display
294 for byte in &self.0 {
295 write!(f, "{byte:02x}")?;
296 }
297 Ok(())
298 }
299}
300
301impl Default for Hash {
302 fn default() -> Self {
303 Self::GENESIS
304 }
305}
306
307impl From<[u8; HASH_LENGTH]> for Hash {
308 fn from(bytes: [u8; HASH_LENGTH]) -> Self {
309 Self(bytes)
310 }
311}
312
313impl From<Hash> for [u8; HASH_LENGTH] {
314 fn from(hash: Hash) -> Self {
315 hash.0
316 }
317}
318
319impl AsRef<[u8]> for Hash {
320 fn as_ref(&self) -> &[u8] {
321 &self.0
322 }
323}
324
325// ============================================================================
326// Timestamp - Copy (8-byte value with monotonic guarantee)
327// ============================================================================
328
329/// Wall-clock timestamp with monotonic guarantee within the system.
330///
331/// Compliance requires real-world time for audit trails; monotonicity
332/// prevents ordering issues when system clocks are adjusted.
333///
334/// Stored as nanoseconds since Unix epoch (1970-01-01 00:00:00 UTC).
335/// This gives us ~584 years of range, well beyond any practical use.
336#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
337pub struct Timestamp(u64);
338
339impl Timestamp {
340 /// The Unix epoch (1970-01-01 00:00:00 UTC).
341 pub const EPOCH: Timestamp = Timestamp(0);
342
343 /// Creates a timestamp from nanoseconds since Unix epoch.
344 pub fn from_nanos(nanos: u64) -> Self {
345 Self(nanos)
346 }
347
348 /// Returns the timestamp as nanoseconds since Unix epoch.
349 pub fn as_nanos(&self) -> u64 {
350 self.0
351 }
352
353 /// Returns the timestamp as seconds since Unix epoch (truncates nanoseconds).
354 pub fn as_secs(&self) -> u64 {
355 self.0 / 1_000_000_000
356 }
357
358 /// Creates a timestamp for the current time.
359 ///
360 /// # Panics
361 ///
362 /// Panics if the system clock is before Unix epoch (should never happen).
363 pub fn now() -> Self {
364 let duration = SystemTime::now()
365 .duration_since(UNIX_EPOCH)
366 .expect("system clock is before Unix epoch");
367 Self(duration.as_nanos() as u64)
368 }
369
370 /// Creates a timestamp ensuring monotonicity: `max(now, last + 1ns)`.
371 ///
372 /// This guarantees that each timestamp is strictly greater than the previous,
373 /// even if the system clock moves backwards or two events occur in the same
374 /// nanosecond.
375 ///
376 /// # Arguments
377 ///
378 /// * `last` - The previous timestamp, if any. Pass `None` for the first timestamp.
379 pub fn now_monotonic(last: Option<Timestamp>) -> Self {
380 let now = Self::now();
381 match last {
382 Some(prev) => {
383 // Ensure strictly increasing: at least prev + 1 nanosecond
384 if now.0 <= prev.0 {
385 Timestamp(prev.0.saturating_add(1))
386 } else {
387 now
388 }
389 }
390 None => now,
391 }
392 }
393}
394
395impl Display for Timestamp {
396 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
397 // Display as seconds.nanoseconds for readability
398 let secs = self.0 / 1_000_000_000;
399 let nanos = self.0 % 1_000_000_000;
400 write!(f, "{secs}.{nanos:09}")
401 }
402}
403
404impl Default for Timestamp {
405 fn default() -> Self {
406 Self::EPOCH
407 }
408}
409
410impl From<u64> for Timestamp {
411 fn from(nanos: u64) -> Self {
412 Self(nanos)
413 }
414}
415
416impl From<Timestamp> for u64 {
417 fn from(ts: Timestamp) -> Self {
418 ts.0
419 }
420}
421
422// ============================================================================
423// Record Types - Copy (small enum and struct)
424// ============================================================================
425
426/// The kind of record stored in the log.
427///
428/// This enum distinguishes between different record types to enable
429/// efficient processing and verification.
430#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
431pub enum RecordKind {
432 /// Normal application data record.
433 #[default]
434 Data,
435 /// Periodic verification checkpoint (contains cumulative hash).
436 Checkpoint,
437 /// Logical deletion marker (data is not physically deleted).
438 Tombstone,
439}
440
441impl RecordKind {
442 /// Returns the single-byte discriminant for serialization.
443 pub fn as_byte(&self) -> u8 {
444 match self {
445 RecordKind::Data => 0,
446 RecordKind::Checkpoint => 1,
447 RecordKind::Tombstone => 2,
448 }
449 }
450
451 /// Creates a `RecordKind` from its byte discriminant.
452 ///
453 /// # Errors
454 ///
455 /// Returns `None` if the byte is not a valid discriminant.
456 pub fn from_byte(byte: u8) -> Option<Self> {
457 match byte {
458 0 => Some(RecordKind::Data),
459 1 => Some(RecordKind::Checkpoint),
460 2 => Some(RecordKind::Tombstone),
461 _ => None,
462 }
463 }
464}
465
466/// Metadata header for every log record.
467///
468/// This structure contains all metadata needed to verify and process
469/// a log record without reading its payload.
470#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
471pub struct RecordHeader {
472 /// Position in the log (0-indexed).
473 pub offset: Offset,
474 /// SHA-256 link to the previous record's hash (genesis for first record).
475 pub prev_hash: Hash,
476 /// When the record was committed (monotonic wall-clock).
477 pub timestamp: Timestamp,
478 /// Size of the payload in bytes.
479 pub payload_len: u32,
480 /// Type of record (Data, Checkpoint, Tombstone).
481 pub record_kind: RecordKind,
482}
483
484impl RecordHeader {
485 /// Creates a new record header.
486 ///
487 /// # Arguments
488 ///
489 /// * `offset` - Position in the log
490 /// * `prev_hash` - Hash of the previous record (or GENESIS for first)
491 /// * `timestamp` - When this record was committed
492 /// * `payload_len` - Size of the payload in bytes
493 /// * `record_kind` - Type of record
494 pub fn new(
495 offset: Offset,
496 prev_hash: Hash,
497 timestamp: Timestamp,
498 payload_len: u32,
499 record_kind: RecordKind,
500 ) -> Self {
501 Self {
502 offset,
503 prev_hash,
504 timestamp,
505 payload_len,
506 record_kind,
507 }
508 }
509
510 /// Returns true if this is the first record in the log.
511 pub fn is_genesis(&self) -> bool {
512 self.offset == Offset::ZERO && self.prev_hash.is_genesis()
513 }
514}
515
516// ============================================================================
517// Projection Tracking - Copy (small struct for projections)
518// ============================================================================
519
520/// Tracks which log entry a projection row was derived from.
521///
522/// Projections embed this in each row to enable:
523/// - Point-in-time queries (find rows at a specific offset)
524/// - Verification without walking the hash chain (hash provides direct check)
525/// - Audit trails (know exactly which event created/updated a row)
526#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
527pub struct AppliedIndex {
528 /// The log offset this row was derived from.
529 pub offset: Offset,
530 /// The hash at this offset for direct verification.
531 pub hash: Hash,
532}
533
534impl AppliedIndex {
535 /// Creates a new applied index.
536 pub fn new(offset: Offset, hash: Hash) -> Self {
537 Self { offset, hash }
538 }
539
540 /// Creates the initial applied index (before any records).
541 pub fn genesis() -> Self {
542 Self {
543 offset: Offset::ZERO,
544 hash: Hash::GENESIS,
545 }
546 }
547}
548
549impl Default for AppliedIndex {
550 fn default() -> Self {
551 Self::genesis()
552 }
553}
554
555// ============================================================================
556// Checkpoints - Copy (verification anchors in the log)
557// ============================================================================
558
559/// A periodic verification checkpoint stored in the log.
560///
561/// Checkpoints are records IN the log (not separate files), which means:
562/// - They are part of the hash chain (tamper-evident)
563/// - Checkpoint history is immutable
564/// - Single source of truth
565///
566/// Checkpoints enable efficient verified reads by providing trusted
567/// anchor points, reducing verification from O(n) to O(k) where k is
568/// the distance to the nearest checkpoint.
569#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
570pub struct Checkpoint {
571 /// Log position of this checkpoint record.
572 pub offset: Offset,
573 /// Cumulative hash of the chain at this point.
574 pub chain_hash: Hash,
575 /// Total number of records from genesis to this checkpoint.
576 pub record_count: u64,
577 /// When this checkpoint was created.
578 pub created_at: Timestamp,
579}
580
581impl Checkpoint {
582 /// Creates a new checkpoint.
583 ///
584 /// # Preconditions
585 ///
586 /// - `record_count` should equal `offset.as_u64() + 1` (0-indexed offset)
587 pub fn new(offset: Offset, chain_hash: Hash, record_count: u64, created_at: Timestamp) -> Self {
588 debug_assert_eq!(
589 record_count,
590 offset.as_u64() + 1,
591 "record_count should equal offset + 1"
592 );
593 Self {
594 offset,
595 chain_hash,
596 record_count,
597 created_at,
598 }
599 }
600}
601
602/// Policy for when to create checkpoints.
603///
604/// Checkpoints bound the worst-case verification cost. The default policy
605/// creates a checkpoint every 1000 records and on graceful shutdown.
606#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
607pub struct CheckpointPolicy {
608 /// Create a checkpoint every N records. Set to 0 to disable.
609 pub every_n_records: u64,
610 /// Create a checkpoint on graceful shutdown.
611 pub on_shutdown: bool,
612 /// If true, disable automatic checkpoints (only explicit calls).
613 pub explicit_only: bool,
614}
615
616impl CheckpointPolicy {
617 /// Creates a policy that checkpoints every N records.
618 pub fn every(n: u64) -> Self {
619 Self {
620 every_n_records: n,
621 on_shutdown: true,
622 explicit_only: false,
623 }
624 }
625
626 /// Creates a policy that only creates explicit checkpoints.
627 pub fn explicit_only() -> Self {
628 Self {
629 every_n_records: 0,
630 on_shutdown: false,
631 explicit_only: true,
632 }
633 }
634
635 /// Returns true if a checkpoint should be created at this offset.
636 pub fn should_checkpoint(&self, offset: Offset) -> bool {
637 if self.explicit_only {
638 return false;
639 }
640 if self.every_n_records == 0 {
641 return false;
642 }
643 // Checkpoint at offsets that are multiples of every_n_records
644 // (offset 999 for every_n_records=1000, etc.)
645 (offset.as_u64() + 1) % self.every_n_records == 0
646 }
647}
648
649impl Default for CheckpointPolicy {
650 /// Default policy: checkpoint every 1000 records, on shutdown.
651 fn default() -> Self {
652 Self {
653 every_n_records: 1000,
654 on_shutdown: true,
655 explicit_only: false,
656 }
657 }
658}
659
660// ============================================================================
661// Idempotency - Copy (16-byte identifier for duplicate prevention)
662// ============================================================================
663
664/// Length of idempotency IDs in bytes.
665pub const IDEMPOTENCY_ID_LENGTH: usize = 16;
666
667/// Unique identifier for duplicate transaction prevention.
668///
669/// Clients generate an `IdempotencyId` before their first attempt at a
670/// transaction. If the transaction needs to be retried (e.g., network
671/// timeout), the client reuses the same ID. The server tracks committed
672/// IDs to return the same result for duplicate requests.
673///
674/// Inspired by `FoundationDB`'s idempotency key design.
675///
676/// # FCIS Pattern
677///
678/// This type follows the Functional Core / Imperative Shell pattern:
679/// - `from_bytes()`: Pure restoration from storage
680/// - `from_random_bytes()`: Pure construction from bytes (`pub(crate)`)
681/// - `generate()`: Impure shell that invokes CSPRNG
682#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
683pub struct IdempotencyId([u8; IDEMPOTENCY_ID_LENGTH]);
684
685impl IdempotencyId {
686 // ========================================================================
687 // Functional Core (pure, testable)
688 // ========================================================================
689
690 /// Pure construction from random bytes.
691 ///
692 /// Restricted to `pub(crate)` to prevent misuse with weak random sources.
693 /// External callers should use `generate()` or `from_bytes()`.
694 pub(crate) fn from_random_bytes(bytes: [u8; IDEMPOTENCY_ID_LENGTH]) -> Self {
695 debug_assert!(
696 bytes.iter().any(|&b| b != 0),
697 "idempotency ID bytes are all zeros"
698 );
699 Self(bytes)
700 }
701
702 /// Restoration from stored bytes (pure).
703 ///
704 /// Use this when loading an `IdempotencyId` from storage or wire protocol.
705 pub fn from_bytes(bytes: [u8; IDEMPOTENCY_ID_LENGTH]) -> Self {
706 Self(bytes)
707 }
708
709 /// Returns the ID as a byte slice.
710 pub fn as_bytes(&self) -> &[u8; IDEMPOTENCY_ID_LENGTH] {
711 &self.0
712 }
713
714 // ========================================================================
715 // Imperative Shell (IO boundary)
716 // ========================================================================
717
718 /// Generates a new random idempotency ID using the OS CSPRNG.
719 ///
720 /// # Panics
721 ///
722 /// Panics if the OS CSPRNG fails, which indicates a catastrophic
723 /// system error (e.g., no entropy source available).
724 pub fn generate() -> Self {
725 let mut bytes = [0u8; IDEMPOTENCY_ID_LENGTH];
726 getrandom::fill(&mut bytes).expect("CSPRNG failure is catastrophic");
727 Self::from_random_bytes(bytes)
728 }
729}
730
731impl Debug for IdempotencyId {
732 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
733 // Show full hex for debugging (IDs are meant to be logged)
734 write!(f, "IdempotencyId(")?;
735 for byte in &self.0 {
736 write!(f, "{byte:02x}")?;
737 }
738 write!(f, ")")
739 }
740}
741
742impl Display for IdempotencyId {
743 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
744 // Full hex representation
745 for byte in &self.0 {
746 write!(f, "{byte:02x}")?;
747 }
748 Ok(())
749 }
750}
751
752impl From<[u8; IDEMPOTENCY_ID_LENGTH]> for IdempotencyId {
753 fn from(bytes: [u8; IDEMPOTENCY_ID_LENGTH]) -> Self {
754 Self::from_bytes(bytes)
755 }
756}
757
758impl From<IdempotencyId> for [u8; IDEMPOTENCY_ID_LENGTH] {
759 fn from(id: IdempotencyId) -> Self {
760 id.0
761 }
762}
763
764// ============================================================================
765// Recovery Tracking - Copy (generation-based recovery for compliance)
766// ============================================================================
767
768/// Monotonically increasing recovery generation.
769///
770/// Each recovery event creates a new generation. This provides natural
771/// audit checkpoints and explicit tracking of system recovery events.
772///
773/// Inspired by `FoundationDB`'s generation-based recovery tracking.
774#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
775pub struct Generation(u64);
776
777impl Generation {
778 /// The initial generation (before any recovery).
779 pub const INITIAL: Generation = Generation(0);
780
781 /// Creates a generation from a raw value.
782 pub fn new(value: u64) -> Self {
783 Self(value)
784 }
785
786 /// Returns the generation as a u64.
787 pub fn as_u64(&self) -> u64 {
788 self.0
789 }
790
791 /// Returns the next generation (incremented by 1).
792 pub fn next(&self) -> Self {
793 Generation(self.0.saturating_add(1))
794 }
795}
796
797impl Display for Generation {
798 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
799 write!(f, "gen:{}", self.0)
800 }
801}
802
803impl Default for Generation {
804 fn default() -> Self {
805 Self::INITIAL
806 }
807}
808
809impl From<u64> for Generation {
810 fn from(value: u64) -> Self {
811 Self(value)
812 }
813}
814
815impl From<Generation> for u64 {
816 fn from(generation: Generation) -> Self {
817 generation.0
818 }
819}
820
821/// Reason why a recovery was triggered.
822///
823/// This is recorded in the recovery log for compliance auditing.
824#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
825pub enum RecoveryReason {
826 /// Normal node restart (graceful or crash recovery).
827 NodeRestart,
828 /// Lost quorum and had to recover from remaining replicas.
829 QuorumLoss,
830 /// Detected data corruption requiring recovery.
831 CorruptionDetected,
832 /// Operator manually triggered recovery.
833 ManualIntervention,
834}
835
836impl Display for RecoveryReason {
837 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
838 match self {
839 RecoveryReason::NodeRestart => write!(f, "node_restart"),
840 RecoveryReason::QuorumLoss => write!(f, "quorum_loss"),
841 RecoveryReason::CorruptionDetected => write!(f, "corruption_detected"),
842 RecoveryReason::ManualIntervention => write!(f, "manual_intervention"),
843 }
844 }
845}
846
847/// Records a recovery event with explicit tracking of any data loss.
848///
849/// Critical for compliance: auditors can see exactly what happened during
850/// recovery, including any mutations that were discarded.
851///
852/// Inspired by `FoundationDB`'s 9-phase recovery with explicit data loss tracking.
853#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
854pub struct RecoveryRecord {
855 /// New generation after recovery.
856 pub generation: Generation,
857 /// Previous generation before recovery.
858 pub previous_generation: Generation,
859 /// Last known committed offset before recovery.
860 pub known_committed: Offset,
861 /// Offset we recovered to.
862 pub recovery_point: Offset,
863 /// Range of discarded prepares (if any) - EXPLICIT LOSS TRACKING.
864 ///
865 /// If `Some`, this range of offsets contained prepared but uncommitted
866 /// mutations that were discarded during recovery. This is the critical
867 /// compliance field: it explicitly documents any data loss.
868 pub discarded_range: Option<Range<Offset>>,
869 /// When recovery occurred.
870 pub timestamp: Timestamp,
871 /// Why recovery was triggered.
872 pub reason: RecoveryReason,
873}
874
875impl RecoveryRecord {
876 /// Creates a new recovery record.
877 ///
878 /// # Arguments
879 ///
880 /// * `generation` - The new generation after recovery
881 /// * `previous_generation` - The generation before recovery
882 /// * `known_committed` - Last known committed offset
883 /// * `recovery_point` - The offset we recovered to
884 /// * `discarded_range` - Range of discarded uncommitted prepares, if any
885 /// * `timestamp` - When recovery occurred
886 /// * `reason` - Why recovery was triggered
887 ///
888 /// # Preconditions
889 ///
890 /// - `generation` must be greater than `previous_generation`
891 /// - `recovery_point` must be <= `known_committed`
892 pub fn new(
893 generation: Generation,
894 previous_generation: Generation,
895 known_committed: Offset,
896 recovery_point: Offset,
897 discarded_range: Option<Range<Offset>>,
898 timestamp: Timestamp,
899 reason: RecoveryReason,
900 ) -> Self {
901 debug_assert!(
902 generation > previous_generation,
903 "new generation must be greater than previous"
904 );
905 debug_assert!(
906 recovery_point <= known_committed,
907 "recovery point cannot exceed known committed"
908 );
909
910 Self {
911 generation,
912 previous_generation,
913 known_committed,
914 recovery_point,
915 discarded_range,
916 timestamp,
917 reason,
918 }
919 }
920
921 /// Returns true if any data was lost during this recovery.
922 pub fn had_data_loss(&self) -> bool {
923 self.discarded_range.is_some()
924 }
925
926 /// Returns the number of discarded records, if any.
927 pub fn discarded_count(&self) -> u64 {
928 self.discarded_range
929 .as_ref()
930 .map_or(0, |r| r.end.as_u64().saturating_sub(r.start.as_u64()))
931 }
932}
933
934// ============================================================================
935// Stream Name - Clone (contains String, but rarely cloned)
936// ============================================================================
937
938/// Human-readable name for a stream.
939#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
940pub struct StreamName(String);
941
942impl StreamName {
943 pub fn new(name: impl Into<String>) -> Self {
944 Self(name.into())
945 }
946
947 pub fn as_str(&self) -> &str {
948 &self.0
949 }
950}
951
952impl Display for StreamName {
953 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
954 write!(f, "{}", self.0)
955 }
956}
957
958impl From<String> for StreamName {
959 fn from(name: String) -> Self {
960 Self(name)
961 }
962}
963
964impl From<&str> for StreamName {
965 fn from(name: &str) -> Self {
966 Self(name.to_string())
967 }
968}
969
970impl From<StreamName> for String {
971 fn from(value: StreamName) -> Self {
972 value.0
973 }
974}
975
976// ============================================================================
977// Data Classification - Copy (simple enum, no heap data)
978// ============================================================================
979
/// Classification of data for compliance purposes.
///
/// Note: the derived `PartialOrd`/`Ord` follow declaration order, i.e.
/// `PHI < NonPHI < Deidentified`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum DataClass {
    /// Protected Health Information - subject to HIPAA restrictions.
    PHI,
    /// Non-PHI data that doesn't contain health information.
    NonPHI,
    /// Data that has been de-identified per HIPAA Safe Harbor.
    Deidentified,
}
990
991// ============================================================================
992// Placement - Clone (Region::Custom contains String)
993// ============================================================================
994
995/// Placement policy for a stream.
996#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
997pub enum Placement {
998 /// Data must remain within the specified region.
999 Region(Region),
1000 /// Data can be replicated globally across all regions.
1001 Global,
1002}
1003
1004/// Geographic region for data placement.
1005#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
1006pub enum Region {
1007 /// US East (N. Virginia) - us-east-1
1008 USEast1,
1009 /// Asia Pacific (Sydney) - ap-southeast-2
1010 APSoutheast2,
1011 /// Custom region identifier
1012 Custom(String),
1013}
1014
1015impl Region {
1016 pub fn custom(name: impl Into<String>) -> Self {
1017 Self::Custom(name.into())
1018 }
1019}
1020
1021impl Display for Region {
1022 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1023 match self {
1024 Region::USEast1 => write!(f, "us-east-1"),
1025 Region::APSoutheast2 => write!(f, "ap-southeast-2"),
1026 Region::Custom(custom) => write!(f, "{custom}"),
1027 }
1028 }
1029}
1030
1031// ============================================================================
1032// Stream Metadata - Clone (created once per stream, cloned rarely)
1033// ============================================================================
1034
1035/// Metadata describing a stream's configuration and current state.
1036#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
1037pub struct StreamMetadata {
1038 pub stream_id: StreamId,
1039 pub stream_name: StreamName,
1040 pub data_class: DataClass,
1041 pub placement: Placement,
1042 pub current_offset: Offset,
1043}
1044
1045impl StreamMetadata {
1046 /// Creates new stream metadata with offset initialized to 0.
1047 pub fn new(
1048 stream_id: StreamId,
1049 stream_name: StreamName,
1050 data_class: DataClass,
1051 placement: Placement,
1052 ) -> Self {
1053 Self {
1054 stream_id,
1055 stream_name,
1056 data_class,
1057 placement,
1058 current_offset: Offset::default(),
1059 }
1060 }
1061}
1062
1063// ============================================================================
1064// Batch Payload - NOT Clone (contains Vec<Bytes>, move only)
1065// ============================================================================
1066
/// A batch of events to append to a stream.
///
/// Intentionally not `Clone`: a batch owns `Vec<Bytes>` and is meant to
/// be moved, not copied (see the section comment above).
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BatchPayload {
    /// Target stream for the append.
    pub stream_id: StreamId,
    /// The events to append (zero-copy Bytes).
    pub events: Vec<Bytes>,
    /// Expected current offset for optimistic concurrency.
    pub expected_offset: Offset,
}
1076
1077impl BatchPayload {
1078 pub fn new(stream_id: StreamId, events: Vec<Bytes>, expected_offset: Offset) -> Self {
1079 Self {
1080 stream_id,
1081 events,
1082 expected_offset,
1083 }
1084 }
1085}
1086
1087// ============================================================================
1088// Audit Actions - Clone (for flexibility in logging)
1089// ============================================================================
1090
/// Actions recorded in the audit log.
///
/// Each variant captures the facts needed to record what happened to a
/// stream: its creation, or a batch of events appended to it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AuditAction {
    /// A new stream was created.
    StreamCreated {
        /// Identifier assigned to the new stream.
        stream_id: StreamId,
        /// Name of the stream.
        stream_name: StreamName,
        /// Data classification of the stream's contents.
        data_class: DataClass,
        /// Placement policy for the stream's data.
        placement: Placement,
    },
    /// Events were appended to a stream.
    EventsAppended {
        /// Stream that received the events.
        stream_id: StreamId,
        /// Number of events appended in this batch.
        count: u32,
        /// Stream offset at which the appended batch starts.
        from_offset: Offset,
    },
}
1108
1109// ============================================================================
1110// Event Persistence - Trait for durable event log writes
1111// ============================================================================
1112
/// Abstraction for persisting events to the durable event log.
///
/// This trait is the bridge between the projection layer and the
/// `Kimberlite` replication system. Implementations must block until
/// persistence is confirmed.
///
/// # Healthcare Compliance
///
/// This is the critical path for HIPAA compliance. The implementation must:
/// - **Block until VSR consensus** completes (quorum durability)
/// - **Return `Err`** if consensus fails (triggers rollback)
/// - **Never return `Ok`** unless events are durably stored
///
/// # Implementation Notes
///
/// The implementor (typically `Runtime`) must handle the sync→async bridge:
///
/// ```ignore
/// impl EventPersister for RuntimeHandle {
///     fn persist_blocking(&self, stream_id: StreamId, events: Vec<Bytes>) -> Result<Offset, PersistError> {
///         // Bridge sync callback to async runtime
///         tokio::task::block_in_place(|| {
///             tokio::runtime::Handle::current().block_on(async {
///                 self.inner.append(stream_id, events).await
///             })
///         })
///         .map_err(|e| {
///             tracing::error!(error = %e, "VSR persistence failed");
///             PersistError::ConsensusFailed
///         })
///     }
/// }
/// ```
///
/// # Why `Vec<Bytes>` instead of typed events?
///
/// Events are serialized before reaching this trait. This keeps `kmb-types`
/// decoupled from domain-specific event schemas.
///
/// The `Send + Sync + Debug` supertraits let persister handles be shared
/// across threads and included in diagnostic output.
pub trait EventPersister: Send + Sync + Debug {
    /// Persist a batch of serialized events to the durable event log.
    ///
    /// This method **blocks** until VSR consensus confirms the events are
    /// durably stored on a quorum of nodes.
    ///
    /// # Arguments
    ///
    /// * `stream_id` - The stream to append events to
    /// * `events` - Serialized events
    ///
    /// # Returns
    ///
    /// * `Ok(offset)` - Events persisted, returns the new stream offset
    /// * `Err(PersistError)` - Persistence failed, caller should rollback
    ///
    /// # Errors
    ///
    /// * [`PersistError::ConsensusFailed`] - VSR quorum unavailable after retries
    /// * [`PersistError::StorageError`] - Disk I/O or serialization failure
    /// * [`PersistError::ShuttingDown`] - System is terminating
    fn persist_blocking(
        &self,
        stream_id: StreamId,
        events: Vec<Bytes>,
    ) -> Result<Offset, PersistError>;
}
1178
/// Error returned when event persistence fails.
///
/// The hook uses this to decide whether to rollback the transaction.
/// Specific underlying errors are logged by the implementation.
///
/// All variants are fieldless, so the enum is a cheap value type and
/// derives `Copy` (and `Hash`) alongside the comparison traits.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PersistError {
    /// VSR consensus failed after retries (quorum unavailable)
    ConsensusFailed,
    /// Storage I/O error
    StorageError,
    /// System is shutting down
    ShuttingDown,
}
1192
1193impl std::fmt::Display for PersistError {
1194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1195 match self {
1196 Self::ConsensusFailed => write!(f, "consensus failed after retries"),
1197 Self::StorageError => write!(f, "storage I/O error"),
1198 Self::ShuttingDown => write!(f, "system is shutting down"),
1199 }
1200 }
1201}
1202
// Marker impl: `PersistError` wraps no source error, so the default
// `Error` trait methods suffice (underlying causes are logged by the
// persister implementation instead).
impl std::error::Error for PersistError {}
1204
1205#[cfg(test)]
1206mod tests;