use crateCoordinate;
use crate;
/// Internal-friendly marker describing which replay lane the store should use
/// for a projection. This stays tiny and data-oriented on purpose.
/// Marker trait selecting how projection replay decodes event payloads.
///
/// The store owns the concrete replay pipeline for each input mode. Projection
/// implementations pick the mode via their associated `Input` type and then
/// operate over `Event<<Self::Input as ProjectionInput>::Payload>`.
/// Default projection replay mode: payloads are decoded into `serde_json::Value`.
;
/// Raw replay mode: payloads remain in their original MessagePack bytes.
;
/// Convenience alias for the payload shape used by a projection type.
pub type ProjectionPayload<T> = Payload;
/// Convenience alias for the event shape used by a projection type.
pub type ProjectionEvent<T> = ;
/// `EventSourced`: backward-looking fold. Replay events to reconstruct state.
///
/// The associated `Input` selects the replay decode lane. The default and
/// most ergonomic choice is [`JsonValueInput`], which preserves the current
/// `serde_json::Value` projection behavior. Implement [`RawMsgpackInput`] only
/// when the projection benefits from operating directly on raw MessagePack
/// payload bytes.
/// `TypedReactive<T>`: forward-looking counterpart to `EventSourced`, keyed by
/// a single typed payload. A reactor of this shape reacts to events of one
/// `T: EventPayload` and emits derived events into a `ReactionBatch`, which
/// the typed-reactor loop gathers and later flushes after `Ok(())` from
/// `react`.
///
/// This is the typed sibling of [`Reactive<P>`]. The raw `Reactive<P>`
/// trait and [`Store::react_loop`](crate::store::Store::react_loop) stay
/// intact as the lossy push variant; `TypedReactive<T>` rides the
/// cursor canal (see ADR-0011) with the same at-least-once / checkpoint
/// semantics described on the typed reactor surface.
///
/// Decode-failure semantics are rigorously split between two modes:
///
/// * **Wrong kind**: the event's `EventKind` is not `T::KIND`. The
///   typed loop filters it silently; `react` is never called. This is a
///   normal filter, not an error.
/// * **Matched kind + decode failure**: the event's `EventKind` is
///   `T::KIND` but its payload bytes do not deserialize into `T`. This
///   is a hard correctness signal. The loop stops and the error
///   propagates through the join handle as `ReactorError::Decode`. It is
///   **never** a silent skip.
/// Error returned by [`MultiReactive::dispatch`] — identical semantics to
/// T4b's single-kind `ReactorError`, just exposed at the trait level so the
/// derive can generate matching error flow. Matched-kind decode failures
/// are always surfaced as `Decode`, never silently skipped.
/// `MultiReactive<Input>`: reactor surface bound to multiple payload types
/// via `#[derive(MultiEventReactor)]`. Generic over the replay-lane input
/// (`JsonValueInput` for `react_loop_multi`, `RawMsgpackInput` for
/// `react_loop_multi_raw`) so both lanes are first-class per invariant 5.
///
/// Implementors almost always come from the derive. The `dispatch` body
/// is a `match` on `event.header.event_kind` that uses the `DecodeTyped`
/// seam to route each matched kind to the right handler, with the same
/// wrong-kind-is-a-silent-filter vs matched-kind-decode-fail-is-a-hard-error
/// contract as T4b.
/// `Reactive<P>`: forward-looking counterpart to `EventSourced`. A reactor
/// sees an event and may emit derived events. Products compose the pieces
/// themselves: subscribe + react + append (7 lines of glue).
///
/// # Manual Glue Pattern
/// ```no_run
/// # use batpak::prelude::*;
/// # use batpak::event::sourcing::Reactive;
/// # struct MyReactor;
/// # impl Reactive<serde_json::Value> for MyReactor {
/// # fn react(&self, _event: &Event<serde_json::Value>) -> Vec<(Coordinate, EventKind, serde_json::Value)> { vec![] }
/// # }
/// # fn example(store: &Store, reactor: &MyReactor) {
/// let region = Region::entity("order:*");
/// let sub = store.subscribe_lossy(&region);
/// while let Some(notif) = sub.recv() {
///     let stored = store.get(batpak::id::EventId::from(notif.event_id)).unwrap();
///     for (coord, kind, payload) in reactor.react(&stored.event) {
///         store.append_reaction(
///             &coord,
///             kind,
///             &payload,
///             batpak::id::CorrelationId::from(notif.correlation_id),
///             batpak::id::CausationId::from(notif.event_id),
///         ).unwrap();
///     }
/// }
/// # }
/// ```
///
/// For convenience, use [`Store::react_loop`](crate::store::Store::react_loop) which
/// spawns a thread running this pattern automatically.