lifeloop/router/receipts.rs
1//! Receipt synthesis, idempotency, and run-scoped sequencing (issue #14).
2//!
3//! Fills the [`ReceiptEmitter`] seam declared in `src/router/seams.rs`
4//! (issue #7) and implements the status-mapping handoff documented on
5//! [`super::NegotiatedPlan`] (issue #13).
6//!
7//! # Boundary
8//!
9//! Owns:
10//! * [`ReceiptContext`] — caller-supplied identifiers Lifeloop cannot
11//! synthesize (client id, harness ids, root receipt id, parent
12//! receipt id, at-epoch clock value);
13//! * [`SequenceGenerator`] — a per-`lifeloop_run_id` monotonic counter
14//! that scopes [`crate::LifecycleReceipt::sequence`] within a run.
15//! Reset semantics are defined in the type docs;
16//! * [`IdempotencyStore`] (trait + [`InMemoryIdempotencyStore`]) — the
17//! replay-boundary store keyed by
18//! `(client_id, adapter_id, idempotency_key)`. Persistence is the
19//! consumer's concern; the trait exists so a real consumer can plug
20//! in a database without changing this module;
21//! * [`LifeloopReceiptEmitter`] — the concrete emitter that consumes a
22//! [`super::NegotiatedPlan`] + [`crate::CallbackResponse`] and
23//! produces a validated, idempotent [`crate::LifecycleReceipt`];
24//! * [`ReceiptError`] — typed failure variants for emission
25//! (validation failures, idempotency conflicts, the
26//! `receipt.emitted` rejection).
27//!
28//! Does **not** own:
29//! * failure-class mapping for arbitrary
30//! [`super::validation::RouteError`]s — that is issue #15's
31//! [`super::FailureMapper`] seam. This module reads
32//! [`crate::FailureClass`] / [`crate::RetryClass`] directly when
33//! building a `status=failed` receipt;
34//! * receipt persistence beyond the in-memory idempotency cache.
35//! A consumer's durable receipt ledger plugs in via the
36//! [`IdempotencyStore`] trait;
37//! * payload body inspection.
38//!
39//! # Status mapping (mirrors the `NegotiatedPlan` doc and seeds #15)
40//!
41//! | Source | Receipt status | Failure class |
42//! |----------------------------------------------------------------|---------------------|---------------------------------------------------------|
43//! | `event == receipt.emitted` | (rejected — error) | n/a |
44//! | `outcome=Unsupported` | `failed` | `negotiated.failure_class` (≥ `capability_unsupported`) |
45//! | `outcome=RequiresOperator` | `failed` | `operator_required` |
46//! | any `placement_decisions[].Failed` (with non-blocking outcome) | `failed` | first failed decision's `failure_class` |
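//! | `outcome=Degraded` + `response.status=Failed` | `failed` | `response.failure_class` |
//! | `outcome=Degraded` (any other response status) | `degraded` | none |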
48//! | `outcome=Satisfied` + `response.status=Failed` | `failed` | `response.failure_class` (REQUIRED by validation) |
49//! | `outcome=Satisfied` + `response.status=Skipped` | `skipped` | none |
50//! | `outcome=Satisfied` + `response.status=Observed` | `observed` | none |
51//! | `outcome=Satisfied` + `response.status=Degraded` | `degraded` | none |
52//! | `outcome=Satisfied` + `response.status=Delivered` | `delivered` | none |
53//!
//! `retry_class` on a `failed` receipt is seeded via
//! [`crate::FailureClass::default_retry`] unless the client's response
//! already provided one. Failures derived from negotiation (blocking
//! outcomes, failed placements) always use the default.
57//!
58//! NOTE: The spec status vocabulary is `observed | delivered | skipped
59//! | degraded | failed`. The issue brief's "blocked" shorthand for a
60//! halted dispatch maps to `failed` per the spec — there is no
61//! separate `blocked` variant on the wire.
62
63use std::collections::HashMap;
64use std::sync::Mutex;
65
66use crate::{
67 CallbackResponse, FailureClass, LifecycleEventKind, LifecycleReceipt, PayloadReceipt,
68 PlacementOutcome, ReceiptStatus, RetryClass, SCHEMA_VERSION, ValidationError,
69};
70
71use super::negotiation::{NegotiatedPlan, PayloadPlacementDecision};
72use super::seams::ReceiptEmitter;
73
74// ===========================================================================
75// ReceiptError
76// ===========================================================================
77
78/// Failure variants from receipt emission.
79///
80/// `Conflict` is the spec-named `duplicate_id_conflict`: the same
81/// `(client_id, adapter_id, idempotency_key)` tuple was reused with a
82/// receipt body that does not match the prior content.
83#[derive(Debug, Clone, PartialEq, Eq)]
84pub enum ReceiptError {
85 /// `receipt.emitted` is a notification event and must not itself
86 /// produce a receipt. Rejected at emit time so a misuse cannot
87 /// land in the idempotency store.
88 ReceiptEmittedNotEmittable,
89 /// Same `idempotency_key` reused with different content. Maps
90 /// onto the spec's `duplicate_id_conflict` failure class.
91 Conflict { idempotency_key: String },
92 /// The synthesized receipt failed [`LifecycleReceipt::validate`].
93 Invalid(ValidationError),
94}
95
96impl std::fmt::Display for ReceiptError {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 match self {
99 Self::ReceiptEmittedNotEmittable => f.write_str(
100 "receipt.emitted is a notification event and does not itself \
101 produce a receipt",
102 ),
103 Self::Conflict { idempotency_key } => write!(
104 f,
105 "idempotency_key `{idempotency_key}` was reused with \
106 different receipt content (duplicate_id_conflict)"
107 ),
108 Self::Invalid(e) => write!(f, "synthesized receipt failed validation: {e}"),
109 }
110 }
111}
112
113impl std::error::Error for ReceiptError {}
114
115impl From<ValidationError> for ReceiptError {
116 fn from(e: ValidationError) -> Self {
117 Self::Invalid(e)
118 }
119}
120
121// ===========================================================================
122// ReceiptContext
123// ===========================================================================
124
/// Identifiers and the clock value that only the caller can provide;
/// Lifeloop has no way to synthesize them.
///
/// `client_id`, `receipt_id`, and `at_epoch_s` are required. The parent
/// receipt id and the three harness correlation ids are optional.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReceiptContext {
    /// Stable, client-declared label that scopes idempotency and
    /// replay. Each lifecycle client chooses its own opaque value; the
    /// "Receipt Schema" section of
    /// `docs/specs/lifecycle-contract/body.md` lists illustrative
    /// client labels. Required-non-empty.
    pub client_id: String,
    /// Opaque identifier of the receipt being emitted *now*.
    /// Required-non-empty.
    pub receipt_id: String,
    /// Parent receipt id for nested or causally linked lifecycle
    /// receipts; `None` marks a root receipt.
    pub parent_receipt_id: Option<String>,
    /// Wall-clock value stamped onto the receipt. Required.
    pub at_epoch_s: u64,
    /// Harness session id used for correlation, when available.
    pub harness_session_id: Option<String>,
    /// Harness run id used for correlation, when available. All
    /// receipts sharing a `lifeloop_run_id` (= `harness_run_id` when
    /// present) draw from one monotonic [`SequenceGenerator`] counter.
    /// Without a run id no in-run sequence is synthesized
    /// (`sequence` = `None`).
    pub harness_run_id: Option<String>,
    /// Harness task id used for correlation, when available.
    pub harness_task_id: Option<String>,
}
153
154// ===========================================================================
155// SequenceGenerator
156// ===========================================================================
157
/// Per-`lifeloop_run_id` monotonic sequence counter.
///
/// The spec ties `sequence` to "the strongest available durable
/// session scope". For a Lifeloop-synthesized receipt, that scope is
/// the `lifeloop_run_id` (= caller's `harness_run_id`). The generator
/// keeps one counter per run id and hands out 1, 2, 3, ... within a
/// run.
///
/// Sequence is per run, *not* global: emitting under run A then run
/// B then run A again produces A=1, B=1, A=2.
///
/// When the caller supplies no `harness_run_id`, the emitter sets
/// `sequence=None` rather than inventing a misleading cross-run order
/// — see the spec's "required and nullable" rule for `sequence`.
///
/// Counters live behind a [`Mutex`], so allocation works through
/// `&self`. Entries are never removed by this type.
#[derive(Debug, Default)]
pub struct SequenceGenerator {
    /// Last value handed out for each run id. A missing entry means
    /// the run has not been sequenced yet.
    counters: Mutex<HashMap<String, u64>>,
}

impl SequenceGenerator {
    /// Creates a generator with no runs recorded.
    pub fn new() -> Self {
        Self::default()
    }

    /// Allocates the next sequence value for `run_id` and returns it.
    /// Counters start at 1 within a run.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn next(&self, run_id: &str) -> u64 {
        let mut counters = self.counters.lock().expect("sequence mutex poisoned");
        // Fast path: a run that already has a counter is bumped in
        // place, so the owned `String` key is allocated only on the
        // first call for a run instead of on every call.
        if let Some(last) = counters.get_mut(run_id) {
            *last += 1;
            return *last;
        }
        counters.insert(run_id.to_owned(), 1);
        1
    }

    /// Reads the current counter for `run_id` without advancing it.
    /// Returns `0` when the run has not yet emitted.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn peek(&self, run_id: &str) -> u64 {
        let counters = self.counters.lock().expect("sequence mutex poisoned");
        counters.get(run_id).copied().unwrap_or(0)
    }
}
198
199// ===========================================================================
200// IdempotencyStore
201// ===========================================================================
202
/// Composite lookup key for the idempotency store.
///
/// The spec scopes idempotency by
/// `(client_id, adapter_id, idempotency_key)`. Carrying all three
/// parts in one type keeps a consumer's persistent store from
/// accidentally collapsing that scope.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct IdempotencyKey {
    /// Client label the key is scoped to.
    pub client_id: String,
    /// Adapter the key is scoped to.
    pub adapter_id: String,
    /// The idempotency key itself.
    pub idempotency_key: String,
}
213
214/// Replay-boundary store. Distinct trait so a consumer can plug in a
215/// real database without this module knowing.
216///
217/// Implementations MUST be content-equality stable: a `get` after
218/// `put(k, v)` returns a value equal (by `LifecycleReceipt::eq`) to
219/// `v`. The in-memory implementation below is the reference shape.
220pub trait IdempotencyStore {
221 /// Look up the prior receipt for `key`, if any.
222 fn get(&self, key: &IdempotencyKey) -> Option<LifecycleReceipt>;
223
224 /// Insert or replay-confirm `receipt` under `key`. Implementations
225 /// must enforce the spec's idempotency rule:
226 /// * if no prior entry exists → insert and return
227 /// `Ok(StoreOutcome::Inserted)`;
228 /// * if a prior entry exists with content equal to `receipt` →
229 /// return `Ok(StoreOutcome::Replayed(prior))` *without*
230 /// overwriting;
231 /// * if a prior entry exists with different content → return
232 /// `Err(ReceiptError::Conflict { .. })`.
233 fn put(
234 &self,
235 key: &IdempotencyKey,
236 receipt: &LifecycleReceipt,
237 ) -> Result<StoreOutcome, ReceiptError>;
238}
239
240/// Outcome of a successful [`IdempotencyStore::put`].
241///
242/// `Replayed` carries a full [`LifecycleReceipt`]. The size disparity
243/// between `Inserted` (zero-sized) and `Replayed` triggers
244/// `clippy::large_enum_variant`; we accept it rather than boxing
245/// because every `put()` returns a stack-local `StoreOutcome` and
246/// boxing would force every `IdempotencyStore` impl through an extra
247/// indirection for no measurable benefit (mirrors the choice for
248/// `AdapterResolution::Found` in `validation.rs`).
249#[derive(Debug, Clone, PartialEq, Eq)]
250#[allow(clippy::large_enum_variant)]
251pub enum StoreOutcome {
252 /// First write under this key. The supplied receipt is now the
253 /// canonical value.
254 Inserted,
255 /// Replay of an existing receipt; the prior value is returned
256 /// unchanged.
257 Replayed(LifecycleReceipt),
258}
259
260/// In-memory reference [`IdempotencyStore`] backed by a `HashMap`
261/// behind a `Mutex` for interior mutability through `&self`.
262///
263/// Suitable for tests and for single-process consumers who do not
264/// need durable replay protection.
265#[derive(Debug, Default)]
266pub struct InMemoryIdempotencyStore {
267 inner: Mutex<HashMap<IdempotencyKey, LifecycleReceipt>>,
268}
269
270impl InMemoryIdempotencyStore {
271 pub fn new() -> Self {
272 Self::default()
273 }
274
275 /// Number of receipts currently stored. Mostly for diagnostics
276 /// and tests.
277 pub fn len(&self) -> usize {
278 self.inner.lock().expect("idem mutex poisoned").len()
279 }
280
281 pub fn is_empty(&self) -> bool {
282 self.len() == 0
283 }
284}
285
286impl IdempotencyStore for InMemoryIdempotencyStore {
287 fn get(&self, key: &IdempotencyKey) -> Option<LifecycleReceipt> {
288 self.inner
289 .lock()
290 .expect("idem mutex poisoned")
291 .get(key)
292 .cloned()
293 }
294
295 fn put(
296 &self,
297 key: &IdempotencyKey,
298 receipt: &LifecycleReceipt,
299 ) -> Result<StoreOutcome, ReceiptError> {
300 let mut guard = self.inner.lock().expect("idem mutex poisoned");
301 if let Some(prior) = guard.get(key) {
302 if prior == receipt {
303 return Ok(StoreOutcome::Replayed(prior.clone()));
304 }
305 return Err(ReceiptError::Conflict {
306 idempotency_key: key.idempotency_key.clone(),
307 });
308 }
309 guard.insert(key.clone(), receipt.clone());
310 Ok(StoreOutcome::Inserted)
311 }
312}
313
314// ===========================================================================
315// LifeloopReceiptEmitter
316// ===========================================================================
317
318/// Concrete [`ReceiptEmitter`] for issue #14.
319///
320/// Holds a sequence generator and an idempotency store. The
321/// [`ReceiptEmitter::emit`] trait method takes a fully-built
322/// [`LifecycleReceipt`] (the seam from issue #7) and validates +
323/// idempotency-checks it. The richer
324/// [`Self::synthesize_and_emit`] entry point consumes a
325/// [`NegotiatedPlan`] + [`CallbackResponse`] + [`ReceiptContext`] and
326/// builds the receipt before storing it — that is the path issue #14
327/// is delivering.
328#[derive(Debug)]
329pub struct LifeloopReceiptEmitter<S: IdempotencyStore = InMemoryIdempotencyStore> {
330 sequencer: SequenceGenerator,
331 store: S,
332}
333
334impl LifeloopReceiptEmitter<InMemoryIdempotencyStore> {
335 /// Construct an emitter backed by an in-memory idempotency store.
336 pub fn in_memory() -> Self {
337 Self {
338 sequencer: SequenceGenerator::new(),
339 store: InMemoryIdempotencyStore::new(),
340 }
341 }
342}
343
344impl<S: IdempotencyStore> LifeloopReceiptEmitter<S> {
345 /// Construct an emitter with a caller-supplied store (e.g. a
346 /// database-backed implementation in production).
347 pub fn with_store(store: S) -> Self {
348 Self {
349 sequencer: SequenceGenerator::new(),
350 store,
351 }
352 }
353
354 pub fn store(&self) -> &S {
355 &self.store
356 }
357
358 pub fn sequencer(&self) -> &SequenceGenerator {
359 &self.sequencer
360 }
361
362 /// Build, validate, and idempotency-check a receipt from a
363 /// negotiation result + the client's response.
364 ///
365 /// Returns the canonical receipt (newly inserted, or the prior
366 /// replay value when the same idempotency key is reused with the
367 /// same content).
368 pub fn synthesize_and_emit(
369 &self,
370 negotiated: &NegotiatedPlan,
371 response: &CallbackResponse,
372 ctx: &ReceiptContext,
373 ) -> Result<LifecycleReceipt, ReceiptError> {
374 // `receipt.emitted` is a notification event and never itself
375 // produces a receipt. Reject at emit time so the
376 // idempotency store cannot record an illegal receipt even
377 // before validate() runs.
378 if matches!(negotiated.plan.event, LifecycleEventKind::ReceiptEmitted) {
379 return Err(ReceiptError::ReceiptEmittedNotEmittable);
380 }
381
382 let (status, failure_class, retry_class) = derive_status(negotiated, response);
383
384 // Replay check BEFORE sequence allocation: if this idempotency
385 // key has already been recorded, reuse the prior sequence so a
386 // content-equality replay short-circuits cleanly. Otherwise a
387 // replay would synthesize seq=N+1, fail the content-equality
388 // check, and surface as a false Conflict.
389 let prior_sequence = negotiated.plan.idempotency_key.as_deref().and_then(|idem| {
390 let key = IdempotencyKey {
391 client_id: ctx.client_id.clone(),
392 adapter_id: negotiated.plan.adapter.adapter_id.clone(),
393 idempotency_key: idem.to_string(),
394 };
395 self.store.get(&key).map(|prior| prior.sequence)
396 });
397
398 // Sequence is scoped to lifeloop_run_id (= harness_run_id
399 // when supplied). Without a run id, leave sequence null.
400 let sequence = match prior_sequence {
401 Some(reused) => reused,
402 None => ctx
403 .harness_run_id
404 .as_deref()
405 .map(|run| self.sequencer.next(run)),
406 };
407
408 let receipt = LifecycleReceipt {
409 schema_version: SCHEMA_VERSION.to_string(),
410 receipt_id: ctx.receipt_id.clone(),
411 idempotency_key: negotiated.plan.idempotency_key.clone(),
412 client_id: ctx.client_id.clone(),
413 adapter_id: negotiated.plan.adapter.adapter_id.clone(),
414 invocation_id: negotiated.plan.invocation_id.clone(),
415 event: negotiated.plan.event,
416 event_id: negotiated.plan.event_id.clone(),
417 sequence,
418 parent_receipt_id: ctx.parent_receipt_id.clone(),
419 integration_mode: negotiated.plan.integration_mode,
420 status,
421 at_epoch_s: ctx.at_epoch_s,
422 harness_session_id: ctx.harness_session_id.clone(),
423 harness_run_id: ctx.harness_run_id.clone(),
424 harness_task_id: ctx.harness_task_id.clone(),
425 payload_receipts: payload_receipts_from(negotiated),
426 telemetry_summary: serde_json::Map::new(),
427 capability_degradations: negotiated.capability_degradations.clone(),
428 failure_class,
429 retry_class,
430 warnings: merged_warnings(negotiated, response),
431 };
432
433 receipt.validate()?;
434
435 match negotiated.plan.idempotency_key.as_deref() {
436 Some(idem) => {
437 let key = IdempotencyKey {
438 client_id: ctx.client_id.clone(),
439 adapter_id: negotiated.plan.adapter.adapter_id.clone(),
440 idempotency_key: idem.to_string(),
441 };
442 match self.store.put(&key, &receipt)? {
443 StoreOutcome::Inserted => Ok(receipt),
444 StoreOutcome::Replayed(prior) => Ok(prior),
445 }
446 }
447 // No idempotency key — the receipt itself is the replay
448 // boundary; we hand it back unstored. A future durable
449 // ledger may still record it under `receipt_id`, but
450 // that is the consumer's concern.
451 None => Ok(receipt),
452 }
453 }
454}
455
456impl<S: IdempotencyStore> ReceiptEmitter for LifeloopReceiptEmitter<S> {
457 type Error = ReceiptError;
458
459 /// Validate + idempotency-check an externally-built
460 /// [`LifecycleReceipt`]. The richer
461 /// [`Self::synthesize_and_emit`] is the path issue #14 is
462 /// delivering; this trait method exists to satisfy the issue #7
463 /// seam contract for callers that already hold a built receipt.
464 fn emit(&self, receipt: &LifecycleReceipt) -> Result<(), Self::Error> {
465 receipt.validate()?;
466 if let Some(idem) = receipt.idempotency_key.as_deref() {
467 let key = IdempotencyKey {
468 client_id: receipt.client_id.clone(),
469 adapter_id: receipt.adapter_id.clone(),
470 idempotency_key: idem.to_string(),
471 };
472 self.store.put(&key, receipt)?;
473 }
474 Ok(())
475 }
476}
477
478// ===========================================================================
479// Status derivation (the public mapping table mirrored at the top of file)
480// ===========================================================================
481
482fn derive_status(
483 negotiated: &NegotiatedPlan,
484 response: &CallbackResponse,
485) -> (ReceiptStatus, Option<FailureClass>, Option<RetryClass>) {
486 use crate::NegotiationOutcome as NO;
487
488 // Capability outcomes that block dispatch always win.
489 match negotiated.outcome {
490 NO::Unsupported => {
491 let fc = negotiated
492 .failure_class
493 .unwrap_or(FailureClass::CapabilityUnsupported);
494 return (ReceiptStatus::Failed, Some(fc), Some(fc.default_retry()));
495 }
496 NO::RequiresOperator => {
497 let fc = FailureClass::OperatorRequired;
498 return (ReceiptStatus::Failed, Some(fc), Some(fc.default_retry()));
499 }
500 NO::Degraded => {
501 // Fall through to client-status-aware refinement below
502 // unless the client itself reported Failed.
503 if matches!(response.status, ReceiptStatus::Failed) {
504 let fc = response
505 .failure_class
506 .unwrap_or(FailureClass::InternalError);
507 let rc = response.retry_class.unwrap_or(fc.default_retry());
508 return (ReceiptStatus::Failed, Some(fc), Some(rc));
509 }
510 return (ReceiptStatus::Degraded, None, None);
511 }
512 NO::Satisfied => {}
513 }
514
515 // Satisfied negotiation: a placement decision may still have
516 // failed (e.g. payload_too_large on a non-blocking outcome —
517 // negotiation already escalates to Unsupported when this
518 // happens, but be defensive).
519 if let Some(failed) = negotiated.placement_decisions.iter().find_map(|d| match d {
520 PayloadPlacementDecision::Failed { failure_class, .. } => Some(*failure_class),
521 _ => None,
522 }) {
523 return (
524 ReceiptStatus::Failed,
525 Some(failed),
526 Some(failed.default_retry()),
527 );
528 }
529
530 // Honor the client's reported status.
531 match response.status {
532 ReceiptStatus::Failed => {
533 let fc = response
534 .failure_class
535 .unwrap_or(FailureClass::InternalError);
536 let rc = response.retry_class.unwrap_or(fc.default_retry());
537 (ReceiptStatus::Failed, Some(fc), Some(rc))
538 }
539 ReceiptStatus::Skipped => (ReceiptStatus::Skipped, None, None),
540 ReceiptStatus::Observed => (ReceiptStatus::Observed, None, None),
541 ReceiptStatus::Degraded => (ReceiptStatus::Degraded, None, None),
542 ReceiptStatus::Delivered => (ReceiptStatus::Delivered, None, None),
543 }
544}
545
546fn payload_receipts_from(negotiated: &NegotiatedPlan) -> Vec<PayloadReceipt> {
547 // Map placement decisions onto wire PayloadReceipts. Provenance
548 // comes from the payload envelope that negotiation evaluated, not
549 // from CallbackResponse::client_payloads; response payloads are
550 // client output and may be sparse or derived.
551 negotiated
552 .placement_decisions
553 .iter()
554 .map(|d| match d {
555 PayloadPlacementDecision::Chosen {
556 payload_id,
557 payload_kind,
558 byte_size,
559 content_digest,
560 chosen,
561 ..
562 } => PayloadReceipt {
563 payload_id: payload_id.clone(),
564 payload_kind: payload_kind.clone(),
565 placement: *chosen,
566 status: PlacementOutcome::Delivered,
567 byte_size: *byte_size,
568 content_digest: content_digest.clone(),
569 },
570 PayloadPlacementDecision::Failed {
571 payload_id,
572 payload_kind,
573 byte_size,
574 content_digest,
575 rejected,
576 ..
577 } => PayloadReceipt {
578 payload_id: payload_id.clone(),
579 payload_kind: payload_kind.clone(),
580 // No placement won; surface the first attempted one
581 // for diagnostics. Falls back to ReceiptOnly when
582 // there were no attempts (unusual).
583 placement: rejected
584 .first()
585 .map(|r| r.placement())
586 .unwrap_or(crate::PlacementClass::ReceiptOnly),
587 status: PlacementOutcome::Failed,
588 byte_size: *byte_size,
589 content_digest: content_digest.clone(),
590 },
591 })
592 .collect()
593}
594
595fn merged_warnings(
596 negotiated: &NegotiatedPlan,
597 response: &CallbackResponse,
598) -> Vec<crate::Warning> {
599 let mut out = negotiated.warnings.clone();
600 out.extend(response.warnings.iter().cloned());
601 out
602}