1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
//! Typed reactor output batch (Dispatch Chapter T2).
//!
//! [`ReactionBatch`] is the accumulator a [`TypedReactive`] handler writes
//! into. It is a thin, typed wrapper over [`Vec<BatchAppendItem>`]:
//!
//! * Items are pushed via [`ReactionBatch::push_typed`] — kind is inferred
//! from the payload's `T::KIND`, so handler code never writes
//! `EventKind::custom(...)`.
//! * The batch is flushed by the typed-reactor loop (via
//! [`Store::append_reaction_batch`]), and only when the handler returned
//! `Ok(())`. Each `ReactionBatch` flush is atomic with respect to its own
//! batch append, but a whole cursor poll may flush several of these
//! batches sequentially; the typed reactor is therefore at-least-once, not
//! one giant atomic multi-event append.
//! * If the handler returns `Err`, the `ReactionBatch` is dropped and no
//! items from that event land in the store — drop-on-error is structural,
//! not runtime.
//! * Construction (`new`) and `flush` are `pub(crate)`. Users never build
//! or flush a batch directly; the reactor loop owns both ends.
//!
//! [`TypedReactive`]: crate::event::sourcing::TypedReactive
//! [`Store::append_reaction_batch`]: crate::store::Store::append_reaction_batch
//! [`Vec<BatchAppendItem>`]: BatchAppendItem
use std::sync::Arc;
use crate::coordinate::Coordinate;
use crate::event::EventPayload;
use crate::store::append::{AppendOptions, BatchAppendItem, CausationRef};
use crate::store::{AppendReceipt, Open, Store, StoreError};
/// Typed output batch accumulated by a reactor handler and flushed by the
/// typed-reactor loop when the handler returns `Ok(())`.
///
/// See the module docs for the drop-on-error guarantee and the flush model.
pub struct ReactionBatch {
items: Vec<BatchAppendItem>,
}
impl ReactionBatch {
/// Construct an empty batch. Reactor loops own their own batches; users do
/// not build this directly.
pub(crate) fn new() -> Self {
Self { items: Vec::new() }
}
/// Push a typed reaction — kind is inferred from `T::KIND`.
///
/// # Errors
/// Returns [`StoreError::Serialization`] if the payload cannot be
/// serialized to MessagePack at stage time.
pub fn push_typed<T: EventPayload>(
&mut self,
coord: Coordinate,
payload: &T,
causation: CausationRef,
) -> Result<(), StoreError> {
self.push_typed_with_options(coord, payload, AppendOptions::default(), causation)
}
/// Push a typed reaction with explicit [`AppendOptions`] — kind is inferred
/// from `T::KIND`.
///
/// # Errors
/// Returns [`StoreError::Serialization`] if the payload cannot be
/// serialized to MessagePack at stage time.
pub fn push_typed_with_options<T: EventPayload>(
&mut self,
coord: Coordinate,
payload: &T,
options: AppendOptions,
causation: CausationRef,
) -> Result<(), StoreError> {
let item = BatchAppendItem::typed(coord, payload, options, causation)?;
self.items.push(item);
Ok(())
}
/// Number of staged reactions.
pub fn len(&self) -> usize {
self.items.len()
}
/// True when nothing has been staged.
pub fn is_empty(&self) -> bool {
self.items.is_empty()
}
/// Flush all staged reactions as one batch append with the supplied
/// correlation and causation IDs inherited from the triggering source
/// event.
///
/// Per-item causation overrides passed via [`CausationRef::Absolute`] are
/// preserved by [`Store::append_reaction_batch`] (it only fills the
/// default causation when the item's causation is `None`).
///
/// Called only by the typed-reactor loop after the handler returned
/// `Ok(())`. A batch that is not flushed (because the handler errored)
/// is dropped and no partial commits occur.
///
/// # Errors
/// Returns any [`StoreError`] surfaced by the underlying batch append.
pub(crate) fn flush(
self,
store: &Arc<Store<Open>>,
correlation_id: u128,
causation_id: u128,
) -> Result<Vec<AppendReceipt>, StoreError> {
if self.items.is_empty() {
return Ok(Vec::new());
}
store.append_reaction_batch(
crate::id::CorrelationId::from(correlation_id),
crate::id::CausationId::from(causation_id),
self.items,
)
}
}
impl Default for ReactionBatch {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
// Unit tests exercise `pub(crate) flush`; setup uses `.expect(..)` so each
// failure carries a message, and they are gated by #[cfg(test)] so they never
// reach non-test builds.
mod tests {
//! Internal unit tests for `ReactionBatch::flush`. `flush` is `pub(crate)`
//! because users never call it directly — the typed-reactor loop (T4b)
//! owns the call site. Until T4b ships, these unit tests are the only
//! witness that `flush` works. After T4b lands, its integration tests
//! are the primary witness; these stay as unit-level guards.
use super::*;
use crate::coordinate::Coordinate;
use crate::store::{Store, StoreConfig};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct InternalA {
n: u64,
}
impl crate::event::EventPayload for InternalA {
const KIND: crate::event::EventKind = crate::event::EventKind::custom(6, 1);
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct InternalB {
s: String,
}
impl crate::event::EventPayload for InternalB {
const KIND: crate::event::EventKind = crate::event::EventKind::custom(6, 2);
}
fn open_store() -> (Arc<Store<Open>>, tempfile::TempDir) {
let dir = tempfile::tempdir().expect("tempdir");
let store = Store::open(StoreConfig::new(dir.path())).expect("open");
(Arc::new(store), dir)
}
#[test]
fn flush_returns_empty_receipts_for_empty_batch() {
let (store, _dir) = open_store();
let batch = ReactionBatch::new();
let receipts = batch.flush(&store, 0, 0).expect("flush empty");
assert!(receipts.is_empty());
}
#[test]
fn is_empty_tracks_the_staged_item_count() {
// Kills reaction.rs `ReactionBatch::is_empty` body → `true` / `false`: a
// fresh batch is empty, but a batch with one staged item is NOT. (`len`
// body → 0/1 is separately pinned by the multi-item flush test.)
let mut batch = ReactionBatch::new();
assert!(
batch.is_empty(),
"PROPERTY: a fresh batch is empty; the body→false mutant reports non-empty"
);
let coord =
Coordinate::new("entity:reaction-is-empty", "scope:test").expect("valid coordinate");
batch
.push_typed(coord, &InternalA { n: 1 }, CausationRef::None)
.expect("push reaction item into batch");
assert!(
!batch.is_empty(),
"PROPERTY: a batch with a staged item is not empty; the body→true mutant \
reports empty"
);
assert_eq!(batch.len(), 1);
}
#[test]
fn default_yields_an_empty_batch() {
// Kills reaction.rs `Default for ReactionBatch` diverging from `new`: the
// default batch must be the same empty accumulator.
let batch = ReactionBatch::default();
assert!(
batch.is_empty(),
"PROPERTY: Default::default() is the empty batch, same as new()"
);
assert_eq!(batch.len(), 0);
}
#[test]
fn flush_commits_multi_item_batch_atomically() {
let (store, _dir) = open_store();
let source = store
.append_typed(
&Coordinate::new("entity:reaction-internal-src", "scope:test")
.expect("valid coordinate"),
&InternalA { n: 1 },
)
.expect("source append");
let before = store.stats().global_sequence;
let target_coord = Coordinate::new("entity:reaction-internal-tgt", "scope:test")
.expect("valid coordinate");
let mut batch = ReactionBatch::new();
batch
.push_typed(
target_coord.clone(),
&InternalA { n: 2 },
CausationRef::None,
)
.expect("push reaction item into batch");
batch
.push_typed(
target_coord.clone(),
&InternalB {
s: "chained".into(),
},
CausationRef::PriorItem(0),
)
.expect("push reaction item into batch");
assert_eq!(batch.len(), 2);
let receipts = batch
.flush(
&store,
{
use crate::id::EntityIdType;
source.event_id.as_u128()
},
{
use crate::id::EntityIdType;
source.event_id.as_u128()
},
)
.expect("flush");
assert_eq!(
receipts.len(),
2,
"PROPERTY: flush returns one receipt per pushed item"
);
// Atomic visibility: both events appear together.
let after = store.stats().global_sequence;
assert_eq!(
after - before,
2,
"PROPERTY: atomic flush advances sequence by exactly item count"
);
// Kind stamping survived flush.
assert_eq!(store.by_fact_typed::<InternalA>().len(), 2);
assert_eq!(store.by_fact_typed::<InternalB>().len(), 1);
}
#[test]
fn prior_item_causation_resolves_within_flush() {
let (store, _dir) = open_store();
let source = store
.append_typed(
&Coordinate::new("entity:reaction-chain-src", "scope:test")
.expect("valid coordinate"),
&InternalA { n: 10 },
)
.expect("source");
let target =
Coordinate::new("entity:reaction-chain-tgt", "scope:test").expect("valid coordinate");
let mut batch = ReactionBatch::new();
batch
.push_typed(target.clone(), &InternalA { n: 11 }, CausationRef::None)
.expect("push reaction item into batch");
batch
.push_typed(
target.clone(),
&InternalB {
s: "after-0".into(),
},
CausationRef::PriorItem(0),
)
.expect("push reaction item into batch");
let receipts = batch
.flush(
&store,
{
use crate::id::EntityIdType;
source.event_id.as_u128()
},
{
use crate::id::EntityIdType;
source.event_id.as_u128()
},
)
.expect("flush");
assert_eq!(receipts.len(), 2);
// The second item was caused by the first. Fetch and verify.
let second = store.get(receipts[1].event_id).expect("get second");
assert_eq!(
second.event.header.causation_id,
Some({
use crate::id::EntityIdType;
crate::id::CausationId::from(receipts[0].event_id.as_u128())
}),
"PROPERTY: PriorItem causation resolves to first item's event_id"
);
}
}