Skip to main content

commerce_theory/
event_sourcing.rs

1use crate::foundation::*;
2use crate::inventory::*;
3use crate::orders::*;
4use crate::risk_privacy::*;
5
6#[derive(Clone, Debug, PartialEq, Eq)]
7#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
8pub enum DomainEvent {
9    OrderPlaced(OrderId, Money),
10    PaymentCaptured(OrderId, Money),
11    RefundIssued(OrderId, Money),
12    StockReserved(Sku, Quantity),
13    ReservationReleased(Sku, Quantity),
14    ReservedShipmentConfirmed(Sku, Quantity),
15    TaxLiabilityRecorded(Id, Money),
16    OrderShipped(OrderId),
17    LeadConverted(LeadId, OpportunityId),
18    SupportCaseOpened(SupportCaseId, Option<OrderId>),
19    ShipmentPlanned(ShipmentId, OrderId),
20    ShipmentDelivered(ShipmentId),
21    ReturnApproved(ReturnAuthorizationId, OrderId, Money),
22}
23
24#[must_use]
25pub const fn domain_event_is_crm(event: &DomainEvent) -> bool {
26    matches!(
27        event,
28        DomainEvent::LeadConverted(_, _) | DomainEvent::SupportCaseOpened(_, _)
29    )
30}
31
32#[must_use]
33pub const fn domain_event_is_logistics(event: &DomainEvent) -> bool {
34    matches!(
35        event,
36        DomainEvent::ShipmentPlanned(_, _)
37            | DomainEvent::ShipmentDelivered(_)
38            | DomainEvent::ReturnApproved(_, _, _)
39    )
40}
41
42domain_struct! {
43    pub struct EventEnvelope {
44        sequence: Nat,
45        event: DomainEvent,
46    }
47}
48
49domain_struct! {
50    pub struct EventStream {
51        events: Vec<EventEnvelope>,
52        last_sequence: Nat,
53    }
54}
55
56#[must_use]
57pub fn stream_sequences_strictly_increase_from(last: Nat, events: &[EventEnvelope]) -> bool {
58    let mut cursor = last;
59    for event in events {
60        if cursor >= event.sequence {
61            return false;
62        }
63        cursor = event.sequence;
64    }
65    true
66}
67
68#[must_use]
69pub fn stream_sequences_strictly_increase(stream: &EventStream) -> bool {
70    stream_sequences_strictly_increase_from(0, &stream.events)
71}
72
73domain_struct! {
74    pub struct WebhookOrderingState {
75        last_sequence: Nat,
76    }
77}
78
79pub const fn apply_webhook(
80    s: &WebhookOrderingState,
81    seq: Nat,
82) -> DomainResult<WebhookOrderingState> {
83    if s.last_sequence >= seq {
84        return Err(ValidationError::Invariant(
85            "webhook sequence must be newer than cursor",
86        ));
87    }
88    Ok(WebhookOrderingState::new(seq))
89}
90
91pub fn replay_webhook_stream(
92    mut state: WebhookOrderingState,
93    events: &[EventEnvelope],
94) -> DomainResult<WebhookOrderingState> {
95    for event in events {
96        state = apply_webhook(&state, event.sequence)?;
97    }
98    Ok(state)
99}
100
101domain_struct! {
102    pub struct IdempotencyState {
103        processed: Vec<IdempotencyKey>,
104    }
105}
106
107#[must_use]
108pub fn already_processed(key: IdempotencyKey, state: &IdempotencyState) -> bool {
109    state.processed.contains(&key)
110}
111
112#[must_use]
113pub fn mark_processed(key: IdempotencyKey, state: &IdempotencyState) -> IdempotencyState {
114    let mut processed = Vec::with_capacity(state.processed.len() + 1);
115    processed.push(key);
116    processed.extend(state.processed.iter().copied());
117    IdempotencyState::new(processed)
118}
119
120domain_struct! {
121    pub struct ValidSystemState {
122        stock: StockState,
123        ledger: PaymentLedger,
124        tax_liability: Money,
125        crm_event_count: Nat,
126        logistics_event_count: Nat,
127    }
128}
129
130pub fn apply_stock_reserved_event(
131    state: &ValidSystemState,
132    sku: Sku,
133    quantity: Quantity,
134) -> DomainResult<ValidSystemState> {
135    if state.stock.sku() != sku {
136        return Err(ValidationError::Invariant("stock-reserved SKU mismatch"));
137    }
138    Ok(ValidSystemState::new(
139        reserve_stock(&state.stock, quantity)?,
140        state.ledger.clone(),
141        state.tax_liability,
142        state.crm_event_count,
143        state.logistics_event_count,
144    ))
145}
146
147pub fn apply_refund_issued_event(
148    state: &ValidSystemState,
149    amount: Money,
150) -> DomainResult<ValidSystemState> {
151    Ok(ValidSystemState::new(
152        state.stock,
153        issue_refund(&state.ledger, amount)?,
154        state.tax_liability,
155        state.crm_event_count,
156        state.logistics_event_count,
157    ))
158}
159
160pub fn apply_reservation_released_event(
161    state: &ValidSystemState,
162    sku: Sku,
163    quantity: Quantity,
164) -> DomainResult<ValidSystemState> {
165    if state.stock.sku() != sku {
166        return Err(ValidationError::Invariant(
167            "reservation-released SKU mismatch",
168        ));
169    }
170    Ok(ValidSystemState::new(
171        release_reserved_stock(&state.stock, quantity)?,
172        state.ledger.clone(),
173        state.tax_liability,
174        state.crm_event_count,
175        state.logistics_event_count,
176    ))
177}
178
179pub fn apply_reserved_shipment_confirmed_event(
180    state: &ValidSystemState,
181    sku: Sku,
182    quantity: Quantity,
183) -> DomainResult<ValidSystemState> {
184    if state.stock.sku() != sku {
185        return Err(ValidationError::Invariant(
186            "reserved-shipment-confirmed SKU mismatch",
187        ));
188    }
189    Ok(ValidSystemState::new(
190        confirm_reserved_shipment(&state.stock, quantity)?,
191        state.ledger.clone(),
192        state.tax_liability,
193        state.crm_event_count,
194        state.logistics_event_count,
195    ))
196}
197
198pub fn apply_tax_liability_recorded_event(
199    state: &ValidSystemState,
200    amount: Money,
201) -> DomainResult<ValidSystemState> {
202    Ok(ValidSystemState::new(
203        state.stock,
204        state.ledger.clone(),
205        checked_add(
206            state.tax_liability,
207            amount,
208            "apply_tax_liability_recorded_event",
209        )?,
210        state.crm_event_count,
211        state.logistics_event_count,
212    ))
213}
214
215pub fn apply_crm_projected_event(state: &ValidSystemState) -> DomainResult<ValidSystemState> {
216    Ok(ValidSystemState::new(
217        state.stock,
218        state.ledger.clone(),
219        state.tax_liability,
220        checked_add(state.crm_event_count, 1, "apply_crm_projected_event")?,
221        state.logistics_event_count,
222    ))
223}
224
225pub fn apply_logistics_projected_event(state: &ValidSystemState) -> DomainResult<ValidSystemState> {
226    Ok(ValidSystemState::new(
227        state.stock,
228        state.ledger.clone(),
229        state.tax_liability,
230        state.crm_event_count,
231        checked_add(
232            state.logistics_event_count,
233            1,
234            "apply_logistics_projected_event",
235        )?,
236    ))
237}
238
239pub fn record_captured_payment(
240    ledger: &PaymentLedger,
241    amount: Money,
242) -> DomainResult<PaymentLedger> {
243    PaymentLedger::try_new(
244        checked_add(ledger.captured(), amount, "record_captured_payment")?,
245        ledger.refunded(),
246    )
247}
248
249pub fn apply_domain_event(
250    state: &ValidSystemState,
251    event: &DomainEvent,
252) -> DomainResult<ValidSystemState> {
253    match event {
254        DomainEvent::OrderPlaced(_, _) | DomainEvent::OrderShipped(_) => Ok(state.clone()),
255        DomainEvent::PaymentCaptured(_, amount) => Ok(ValidSystemState::new(
256            state.stock,
257            record_captured_payment(&state.ledger, *amount)?,
258            state.tax_liability,
259            state.crm_event_count,
260            state.logistics_event_count,
261        )),
262        DomainEvent::RefundIssued(_, amount) => apply_refund_issued_event(state, *amount),
263        DomainEvent::StockReserved(sku, quantity) => {
264            apply_stock_reserved_event(state, *sku, *quantity)
265        }
266        DomainEvent::ReservationReleased(sku, quantity) => {
267            apply_reservation_released_event(state, *sku, *quantity)
268        }
269        DomainEvent::ReservedShipmentConfirmed(sku, quantity) => {
270            apply_reserved_shipment_confirmed_event(state, *sku, *quantity)
271        }
272        DomainEvent::TaxLiabilityRecorded(_, amount) => {
273            apply_tax_liability_recorded_event(state, *amount)
274        }
275        event if domain_event_is_crm(event) => apply_crm_projected_event(state),
276        event if domain_event_is_logistics(event) => apply_logistics_projected_event(state),
277        _ => Ok(state.clone()),
278    }
279}
280
281pub fn replay_domain_events(
282    mut state: ValidSystemState,
283    events: &[DomainEvent],
284) -> DomainResult<ValidSystemState> {
285    for event in events {
286        state = apply_domain_event(&state, event)?;
287    }
288    Ok(state)
289}
290
291pub fn apply_idempotent_domain_event(
292    key: IdempotencyKey,
293    event: &DomainEvent,
294    state: ValidSystemState,
295    idempotency: IdempotencyState,
296) -> DomainResult<(ValidSystemState, IdempotencyState)> {
297    if already_processed(key, &idempotency) {
298        Ok((state, idempotency))
299    } else {
300        Ok((
301            apply_domain_event(&state, event)?,
302            mark_processed(key, &idempotency),
303        ))
304    }
305}
306
307domain_struct! {
308    pub struct EventSnapshot {
309        state: ValidSystemState,
310        last_sequence: Nat,
311    }
312}
313
314pub fn replay_from_snapshot(
315    snapshot: &EventSnapshot,
316    events: &[DomainEvent],
317) -> DomainResult<ValidSystemState> {
318    replay_domain_events(snapshot.state.clone(), events)
319}
320
321pub fn ledger_captured_fold(captured: Money, events: &[DomainEvent]) -> DomainResult<Money> {
322    events.iter().try_fold(captured, |acc, event| match event {
323        DomainEvent::PaymentCaptured(_, amount) => {
324            checked_add(acc, *amount, "ledger_captured_fold")
325        }
326        _ => Ok(acc),
327    })
328}
329
330pub fn ledger_refunded_fold(refunded: Money, events: &[DomainEvent]) -> DomainResult<Money> {
331    events.iter().try_fold(refunded, |acc, event| match event {
332        DomainEvent::RefundIssued(_, amount) => checked_add(acc, *amount, "ledger_refunded_fold"),
333        _ => Ok(acc),
334    })
335}
336
337pub fn tax_liability_fold(liability: Money, events: &[DomainEvent]) -> DomainResult<Money> {
338    events.iter().try_fold(liability, |acc, event| match event {
339        DomainEvent::TaxLiabilityRecorded(_, amount) => {
340            checked_add(acc, *amount, "tax_liability_fold")
341        }
342        _ => Ok(acc),
343    })
344}
345
346pub fn project_tax_liability(
347    opening_liability: Money,
348    events: &[DomainEvent],
349) -> DomainResult<Money> {
350    tax_liability_fold(opening_liability, events)
351}
352
353pub fn project_ledger(
354    mut ledger: PaymentLedger,
355    events: &[DomainEvent],
356) -> DomainResult<PaymentLedger> {
357    for event in events {
358        ledger = match event {
359            DomainEvent::PaymentCaptured(_, amount) => record_captured_payment(&ledger, *amount)?,
360            DomainEvent::RefundIssued(_, amount) => issue_refund(&ledger, *amount)?,
361            _ => ledger,
362        };
363    }
364    Ok(ledger)
365}
366
367pub(crate) const fn _risk_anchor(_: Option<Role>) {}