1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
use crate::coordinate::Coordinate;
use crate::event::{EventKind, StoredEvent};
use crate::store::{DiskPos, StoreError};
use serde::Serialize;
/// Reference to causation for batch items.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CausationRef {
/// No causation.
#[default]
None,
/// Absolute event ID.
Absolute(u128),
/// Reference to previous item in same batch by index.
PriorItem(usize),
}
/// Single item in a batch append operation.
#[derive(Clone, Debug)]
pub struct BatchAppendItem {
/// Target coordinate (entity/scope) for this event.
pub coord: Coordinate,
/// Event kind classification.
pub kind: EventKind,
/// Pre-serialized payload bytes (MessagePack).
pub payload_bytes: Vec<u8>,
/// Append options (idempotency, correlation, etc.).
pub options: AppendOptions,
/// Causation reference for intra-batch linking.
pub causation: CausationRef,
}
impl BatchAppendItem {
/// Create a new batch item with serialized payload.
///
/// # Errors
/// Returns `StoreError::Serialization` if payload serialization fails.
pub fn new(
coord: Coordinate,
kind: EventKind,
payload: &impl Serialize,
options: AppendOptions,
causation: CausationRef,
) -> Result<Self, StoreError> {
let payload_bytes =
rmp_serde::to_vec(payload).map_err(|e| StoreError::Serialization(Box::new(e)))?;
Ok(Self {
coord,
kind,
payload_bytes,
options,
causation,
})
}
}
/// AppendReceipt: proof an event was persisted.
#[derive(Clone, Debug)]
pub struct AppendReceipt {
/// Unique ID of the persisted event.
pub event_id: u128,
/// Global sequence number assigned at commit time.
pub sequence: u64,
/// Location of the event frame on disk.
pub disk_pos: DiskPos,
}
/// AppendOptions: CAS, idempotency, custom correlation/causation.
/// [SPEC:src/store/contracts.rs — AppendOptions]
#[derive(Clone, Copy, Debug, Default)]
pub struct AppendOptions {
/// Expected entity sequence for compare-and-swap; `None` skips the CAS check.
pub expected_sequence: Option<u32>,
/// Idempotency key; duplicate appends with the same key return the original receipt.
pub idempotency_key: Option<u128>,
/// Custom correlation ID; defaults to the generated event ID if `None`.
pub correlation_id: Option<u128>,
/// ID of the event that caused this append; `None` for root-cause events.
pub causation_id: Option<u128>,
/// EventHeader flags (FLAG_REQUIRES_ACK, FLAG_TRANSACTIONAL, FLAG_REPLAY).
/// Default: 0 (no flags). [SPEC:src/event/header.rs — Flag bit constants]
pub flags: u8,
}
impl AppendOptions {
/// Create new AppendOptions with all defaults.
pub fn new() -> Self {
Self::default()
}
/// Set expected sequence for compare-and-swap (CAS) check.
pub fn with_cas(mut self, seq: u32) -> Self {
self.expected_sequence = Some(seq);
self
}
/// Set idempotency key. Duplicate appends with the same key return the original receipt.
pub fn with_idempotency(mut self, key: u128) -> Self {
self.idempotency_key = Some(key);
self
}
/// Set EventHeader flags (bitwise OR of FLAG_REQUIRES_ACK, FLAG_TRANSACTIONAL, FLAG_REPLAY).
pub fn with_flags(mut self, flags: u8) -> Self {
self.flags = flags;
self
}
/// Set custom correlation ID.
pub fn with_correlation(mut self, id: u128) -> Self {
self.correlation_id = Some(id);
self
}
/// Set custom causation ID.
pub fn with_causation(mut self, id: u128) -> Self {
self.causation_id = Some(id);
self
}
}
/// Predicate for filtering events during compaction. Returns true to keep, false to drop.
pub type RetentionPredicate = Box<dyn Fn(&StoredEvent<serde_json::Value>) -> bool + Send>;
/// CompactionStrategy: how compact() handles events during segment merging.
#[non_exhaustive]
pub enum CompactionStrategy {
/// Merge sealed segments into one. No events removed.
Merge,
/// Merge + drop events failing the retention predicate.
/// Dropped events are permanently lost.
Retention(RetentionPredicate),
/// Merge + write tombstone markers for dropped events.
/// Downstream consumers can detect deletions.
Tombstone(RetentionPredicate),
}
/// CompactionConfig: controls compact() behavior.
pub struct CompactionConfig {
/// Strategy for handling events during compaction.
pub strategy: CompactionStrategy,
/// Minimum number of sealed segments before compaction runs.
/// Below this threshold, compact() returns early.
pub min_segments: usize,
}
impl Default for CompactionConfig {
fn default() -> Self {
Self {
strategy: CompactionStrategy::Merge,
min_segments: 2,
}
}
}
/// Validate payload length fits in u32. Prevents silent truncation
/// when serialized payloads exceed 4GB (unlikely but possible with
/// pathological inputs or corrupted serialization).
pub(crate) fn checked_payload_len(payload_bytes: &[u8]) -> Result<u32, StoreError> {
u32::try_from(payload_bytes.len())
.map_err(|_| StoreError::ser_msg("payload size exceeds u32::MAX (4GB limit)"))
}