1use crate::foundation::*;
2use crate::inventory::*;
3use crate::orders::*;
4use crate::risk_privacy::*;
5
6#[derive(Clone, Debug, PartialEq, Eq)]
7#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
8pub enum DomainEvent {
9 OrderPlaced(OrderId, Money),
10 PaymentCaptured(OrderId, Money),
11 RefundIssued(OrderId, Money),
12 StockReserved(Sku, Quantity),
13 ReservationReleased(Sku, Quantity),
14 ReservedShipmentConfirmed(Sku, Quantity),
15 TaxLiabilityRecorded(Id, Money),
16 OrderShipped(OrderId),
17 LeadConverted(LeadId, OpportunityId),
18 SupportCaseOpened(SupportCaseId, Option<OrderId>),
19 ShipmentPlanned(ShipmentId, OrderId),
20 ShipmentDelivered(ShipmentId),
21 ReturnApproved(ReturnAuthorizationId, OrderId, Money),
22}
23
24#[must_use]
25pub const fn domain_event_is_crm(event: &DomainEvent) -> bool {
26 matches!(
27 event,
28 DomainEvent::LeadConverted(_, _) | DomainEvent::SupportCaseOpened(_, _)
29 )
30}
31
32#[must_use]
33pub const fn domain_event_is_logistics(event: &DomainEvent) -> bool {
34 matches!(
35 event,
36 DomainEvent::ShipmentPlanned(_, _)
37 | DomainEvent::ShipmentDelivered(_)
38 | DomainEvent::ReturnApproved(_, _, _)
39 )
40}
41
42domain_struct! {
43 pub struct EventEnvelope {
44 sequence: Nat,
45 event: DomainEvent,
46 }
47}
48
49domain_struct! {
50 pub struct EventStream {
51 events: Vec<EventEnvelope>,
52 last_sequence: Nat,
53 }
54}
55
56#[must_use]
57pub fn stream_sequences_strictly_increase_from(last: Nat, events: &[EventEnvelope]) -> bool {
58 let mut cursor = last;
59 for event in events {
60 if cursor >= event.sequence {
61 return false;
62 }
63 cursor = event.sequence;
64 }
65 true
66}
67
68#[must_use]
69pub fn stream_sequences_strictly_increase(stream: &EventStream) -> bool {
70 stream_sequences_strictly_increase_from(0, &stream.events)
71}
72
73domain_struct! {
74 pub struct WebhookOrderingState {
75 last_sequence: Nat,
76 }
77}
78
79pub const fn apply_webhook(
80 s: &WebhookOrderingState,
81 seq: Nat,
82) -> DomainResult<WebhookOrderingState> {
83 if s.last_sequence >= seq {
84 return Err(ValidationError::Invariant(
85 "webhook sequence must be newer than cursor",
86 ));
87 }
88 Ok(WebhookOrderingState::new(seq))
89}
90
91pub fn replay_webhook_stream(
92 mut state: WebhookOrderingState,
93 events: &[EventEnvelope],
94) -> DomainResult<WebhookOrderingState> {
95 for event in events {
96 state = apply_webhook(&state, event.sequence)?;
97 }
98 Ok(state)
99}
100
101domain_struct! {
102 pub struct IdempotencyState {
103 processed: Vec<IdempotencyKey>,
104 }
105}
106
107#[must_use]
108pub fn already_processed(key: IdempotencyKey, state: &IdempotencyState) -> bool {
109 state.processed.contains(&key)
110}
111
112#[must_use]
113pub fn mark_processed(key: IdempotencyKey, state: &IdempotencyState) -> IdempotencyState {
114 let mut processed = Vec::with_capacity(state.processed.len() + 1);
115 processed.push(key);
116 processed.extend(state.processed.iter().copied());
117 IdempotencyState::new(processed)
118}
119
120domain_struct! {
121 pub struct ValidSystemState {
122 stock: StockState,
123 ledger: PaymentLedger,
124 tax_liability: Money,
125 crm_event_count: Nat,
126 logistics_event_count: Nat,
127 }
128}
129
130pub fn apply_stock_reserved_event(
131 state: &ValidSystemState,
132 sku: Sku,
133 quantity: Quantity,
134) -> DomainResult<ValidSystemState> {
135 if state.stock.sku() != sku {
136 return Err(ValidationError::Invariant("stock-reserved SKU mismatch"));
137 }
138 Ok(ValidSystemState::new(
139 reserve_stock(&state.stock, quantity)?,
140 state.ledger.clone(),
141 state.tax_liability,
142 state.crm_event_count,
143 state.logistics_event_count,
144 ))
145}
146
147pub fn apply_refund_issued_event(
148 state: &ValidSystemState,
149 amount: Money,
150) -> DomainResult<ValidSystemState> {
151 Ok(ValidSystemState::new(
152 state.stock,
153 issue_refund(&state.ledger, amount)?,
154 state.tax_liability,
155 state.crm_event_count,
156 state.logistics_event_count,
157 ))
158}
159
160pub fn apply_reservation_released_event(
161 state: &ValidSystemState,
162 sku: Sku,
163 quantity: Quantity,
164) -> DomainResult<ValidSystemState> {
165 if state.stock.sku() != sku {
166 return Err(ValidationError::Invariant(
167 "reservation-released SKU mismatch",
168 ));
169 }
170 Ok(ValidSystemState::new(
171 release_reserved_stock(&state.stock, quantity)?,
172 state.ledger.clone(),
173 state.tax_liability,
174 state.crm_event_count,
175 state.logistics_event_count,
176 ))
177}
178
179pub fn apply_reserved_shipment_confirmed_event(
180 state: &ValidSystemState,
181 sku: Sku,
182 quantity: Quantity,
183) -> DomainResult<ValidSystemState> {
184 if state.stock.sku() != sku {
185 return Err(ValidationError::Invariant(
186 "reserved-shipment-confirmed SKU mismatch",
187 ));
188 }
189 Ok(ValidSystemState::new(
190 confirm_reserved_shipment(&state.stock, quantity)?,
191 state.ledger.clone(),
192 state.tax_liability,
193 state.crm_event_count,
194 state.logistics_event_count,
195 ))
196}
197
198pub fn apply_tax_liability_recorded_event(
199 state: &ValidSystemState,
200 amount: Money,
201) -> DomainResult<ValidSystemState> {
202 Ok(ValidSystemState::new(
203 state.stock,
204 state.ledger.clone(),
205 checked_add(
206 state.tax_liability,
207 amount,
208 "apply_tax_liability_recorded_event",
209 )?,
210 state.crm_event_count,
211 state.logistics_event_count,
212 ))
213}
214
215pub fn apply_crm_projected_event(state: &ValidSystemState) -> DomainResult<ValidSystemState> {
216 Ok(ValidSystemState::new(
217 state.stock,
218 state.ledger.clone(),
219 state.tax_liability,
220 checked_add(state.crm_event_count, 1, "apply_crm_projected_event")?,
221 state.logistics_event_count,
222 ))
223}
224
225pub fn apply_logistics_projected_event(state: &ValidSystemState) -> DomainResult<ValidSystemState> {
226 Ok(ValidSystemState::new(
227 state.stock,
228 state.ledger.clone(),
229 state.tax_liability,
230 state.crm_event_count,
231 checked_add(
232 state.logistics_event_count,
233 1,
234 "apply_logistics_projected_event",
235 )?,
236 ))
237}
238
239pub fn record_captured_payment(
240 ledger: &PaymentLedger,
241 amount: Money,
242) -> DomainResult<PaymentLedger> {
243 PaymentLedger::try_new(
244 checked_add(ledger.captured(), amount, "record_captured_payment")?,
245 ledger.refunded(),
246 )
247}
248
249pub fn apply_domain_event(
250 state: &ValidSystemState,
251 event: &DomainEvent,
252) -> DomainResult<ValidSystemState> {
253 match event {
254 DomainEvent::OrderPlaced(_, _) | DomainEvent::OrderShipped(_) => Ok(state.clone()),
255 DomainEvent::PaymentCaptured(_, amount) => Ok(ValidSystemState::new(
256 state.stock,
257 record_captured_payment(&state.ledger, *amount)?,
258 state.tax_liability,
259 state.crm_event_count,
260 state.logistics_event_count,
261 )),
262 DomainEvent::RefundIssued(_, amount) => apply_refund_issued_event(state, *amount),
263 DomainEvent::StockReserved(sku, quantity) => {
264 apply_stock_reserved_event(state, *sku, *quantity)
265 }
266 DomainEvent::ReservationReleased(sku, quantity) => {
267 apply_reservation_released_event(state, *sku, *quantity)
268 }
269 DomainEvent::ReservedShipmentConfirmed(sku, quantity) => {
270 apply_reserved_shipment_confirmed_event(state, *sku, *quantity)
271 }
272 DomainEvent::TaxLiabilityRecorded(_, amount) => {
273 apply_tax_liability_recorded_event(state, *amount)
274 }
275 event if domain_event_is_crm(event) => apply_crm_projected_event(state),
276 event if domain_event_is_logistics(event) => apply_logistics_projected_event(state),
277 _ => Ok(state.clone()),
278 }
279}
280
281pub fn replay_domain_events(
282 mut state: ValidSystemState,
283 events: &[DomainEvent],
284) -> DomainResult<ValidSystemState> {
285 for event in events {
286 state = apply_domain_event(&state, event)?;
287 }
288 Ok(state)
289}
290
291pub fn apply_idempotent_domain_event(
292 key: IdempotencyKey,
293 event: &DomainEvent,
294 state: ValidSystemState,
295 idempotency: IdempotencyState,
296) -> DomainResult<(ValidSystemState, IdempotencyState)> {
297 if already_processed(key, &idempotency) {
298 Ok((state, idempotency))
299 } else {
300 Ok((
301 apply_domain_event(&state, event)?,
302 mark_processed(key, &idempotency),
303 ))
304 }
305}
306
307domain_struct! {
308 pub struct EventSnapshot {
309 state: ValidSystemState,
310 last_sequence: Nat,
311 }
312}
313
314pub fn replay_from_snapshot(
315 snapshot: &EventSnapshot,
316 events: &[DomainEvent],
317) -> DomainResult<ValidSystemState> {
318 replay_domain_events(snapshot.state.clone(), events)
319}
320
321pub fn ledger_captured_fold(captured: Money, events: &[DomainEvent]) -> DomainResult<Money> {
322 events.iter().try_fold(captured, |acc, event| match event {
323 DomainEvent::PaymentCaptured(_, amount) => {
324 checked_add(acc, *amount, "ledger_captured_fold")
325 }
326 _ => Ok(acc),
327 })
328}
329
330pub fn ledger_refunded_fold(refunded: Money, events: &[DomainEvent]) -> DomainResult<Money> {
331 events.iter().try_fold(refunded, |acc, event| match event {
332 DomainEvent::RefundIssued(_, amount) => checked_add(acc, *amount, "ledger_refunded_fold"),
333 _ => Ok(acc),
334 })
335}
336
337pub fn tax_liability_fold(liability: Money, events: &[DomainEvent]) -> DomainResult<Money> {
338 events.iter().try_fold(liability, |acc, event| match event {
339 DomainEvent::TaxLiabilityRecorded(_, amount) => {
340 checked_add(acc, *amount, "tax_liability_fold")
341 }
342 _ => Ok(acc),
343 })
344}
345
346pub fn project_tax_liability(
347 opening_liability: Money,
348 events: &[DomainEvent],
349) -> DomainResult<Money> {
350 tax_liability_fold(opening_liability, events)
351}
352
353pub fn project_ledger(
354 mut ledger: PaymentLedger,
355 events: &[DomainEvent],
356) -> DomainResult<PaymentLedger> {
357 for event in events {
358 ledger = match event {
359 DomainEvent::PaymentCaptured(_, amount) => record_captured_payment(&ledger, *amount)?,
360 DomainEvent::RefundIssued(_, amount) => issue_refund(&ledger, *amount)?,
361 _ => ledger,
362 };
363 }
364 Ok(ledger)
365}
366
367pub(crate) const fn _risk_anchor(_: Option<Role>) {}