Skip to main content

lifeloop/router/
receipts.rs

1//! Receipt synthesis, idempotency, and run-scoped sequencing (issue #14).
2//!
3//! Fills the [`ReceiptEmitter`] seam declared in `src/router/seams.rs`
4//! (issue #7) and implements the status-mapping handoff documented on
5//! [`super::NegotiatedPlan`] (issue #13).
6//!
7//! # Boundary
8//!
9//! Owns:
10//! * [`ReceiptContext`] — caller-supplied identifiers Lifeloop cannot
11//!   synthesize (client id, harness ids, root receipt id, parent
12//!   receipt id, at-epoch clock value);
13//! * [`SequenceGenerator`] — a per-`lifeloop_run_id` monotonic counter
14//!   that scopes [`crate::LifecycleReceipt::sequence`] within a run.
15//!   Reset semantics are defined in the type docs;
16//! * [`IdempotencyStore`] (trait + [`InMemoryIdempotencyStore`]) — the
17//!   replay-boundary store keyed by
18//!   `(client_id, adapter_id, idempotency_key)`. Persistence is the
19//!   consumer's concern; the trait exists so a real consumer can plug
20//!   in a database without changing this module;
21//! * [`LifeloopReceiptEmitter`] — the concrete emitter that consumes a
22//!   [`super::NegotiatedPlan`] + [`crate::CallbackResponse`] and
23//!   produces a validated, idempotent [`crate::LifecycleReceipt`];
24//! * [`ReceiptError`] — typed failure variants for emission
25//!   (validation failures, idempotency conflicts, the
26//!   `receipt.emitted` rejection).
27//!
28//! Does **not** own:
29//! * failure-class mapping for arbitrary
30//!   [`super::validation::RouteError`]s — that is issue #15's
31//!   [`super::FailureMapper`] seam. This module reads
32//!   [`crate::FailureClass`] / [`crate::RetryClass`] directly when
33//!   building a `status=failed` receipt;
34//! * receipt persistence beyond the in-memory idempotency cache.
35//!   A consumer's durable receipt ledger plugs in via the
36//!   [`IdempotencyStore`] trait;
37//! * payload body inspection.
38//!
39//! # Status mapping (mirrors the `NegotiatedPlan` doc and seeds #15)
40//!
41//! | Source                                                         | Receipt status      | Failure class                                           |
42//! |----------------------------------------------------------------|---------------------|---------------------------------------------------------|
43//! | `event == receipt.emitted`                                     | (rejected — error)  | n/a                                                     |
44//! | `outcome=Unsupported`                                          | `failed`            | `negotiated.failure_class` (≥ `capability_unsupported`) |
45//! | `outcome=RequiresOperator`                                     | `failed`            | `operator_required`                                     |
46//! | any `placement_decisions[].Failed` (with non-blocking outcome) | `failed`            | first failed decision's `failure_class`                 |
47//! | `outcome=Degraded`                                             | `degraded`          | none                                                    |
48//! | `outcome=Satisfied` + `response.status=Failed`                 | `failed`            | `response.failure_class` (REQUIRED by validation)       |
49//! | `outcome=Satisfied` + `response.status=Skipped`                | `skipped`           | none                                                    |
50//! | `outcome=Satisfied` + `response.status=Observed`               | `observed`          | none                                                    |
51//! | `outcome=Satisfied` + `response.status=Degraded`               | `degraded`          | none                                                    |
52//! | `outcome=Satisfied` + `response.status=Delivered`              | `delivered`         | none                                                    |
53//!
54//! `retry_class` on a `failed` receipt is seeded via
55//! [`crate::FailureClass::default_retry`] unless the response or
56//! negotiation already provided one.
57//!
58//! NOTE: The spec status vocabulary is `observed | delivered | skipped
59//! | degraded | failed`. The issue brief's "blocked" shorthand for a
60//! halted dispatch maps to `failed` per the spec — there is no
61//! separate `blocked` variant on the wire.
62
63use std::collections::HashMap;
64use std::sync::Mutex;
65
66use crate::{
67    CallbackResponse, FailureClass, LifecycleEventKind, LifecycleReceipt, PayloadReceipt,
68    PlacementOutcome, ReceiptStatus, RetryClass, SCHEMA_VERSION, ValidationError,
69};
70
71use super::negotiation::{NegotiatedPlan, PayloadPlacementDecision};
72use super::seams::ReceiptEmitter;
73
74// ===========================================================================
75// ReceiptError
76// ===========================================================================
77
78/// Failure variants from receipt emission.
79///
80/// `Conflict` is the spec-named `duplicate_id_conflict`: the same
81/// `(client_id, adapter_id, idempotency_key)` tuple was reused with a
82/// receipt body that does not match the prior content.
83#[derive(Debug, Clone, PartialEq, Eq)]
84pub enum ReceiptError {
85    /// `receipt.emitted` is a notification event and must not itself
86    /// produce a receipt. Rejected at emit time so a misuse cannot
87    /// land in the idempotency store.
88    ReceiptEmittedNotEmittable,
89    /// Same `idempotency_key` reused with different content. Maps
90    /// onto the spec's `duplicate_id_conflict` failure class.
91    Conflict { idempotency_key: String },
92    /// The synthesized receipt failed [`LifecycleReceipt::validate`].
93    Invalid(ValidationError),
94}
95
96impl std::fmt::Display for ReceiptError {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        match self {
99            Self::ReceiptEmittedNotEmittable => f.write_str(
100                "receipt.emitted is a notification event and does not itself \
101                 produce a receipt",
102            ),
103            Self::Conflict { idempotency_key } => write!(
104                f,
105                "idempotency_key `{idempotency_key}` was reused with \
106                 different receipt content (duplicate_id_conflict)"
107            ),
108            Self::Invalid(e) => write!(f, "synthesized receipt failed validation: {e}"),
109        }
110    }
111}
112
113impl std::error::Error for ReceiptError {}
114
115impl From<ValidationError> for ReceiptError {
116    fn from(e: ValidationError) -> Self {
117        Self::Invalid(e)
118    }
119}
120
121// ===========================================================================
122// ReceiptContext
123// ===========================================================================
124
125/// Caller-supplied identifiers and clock value Lifeloop cannot
126/// synthesize on its own. Every field except the harness ids is
127/// required.
128#[derive(Debug, Clone, PartialEq, Eq)]
129pub struct ReceiptContext {
130    /// Client-declared stable label scoping idempotency and replay.
131    /// Each lifecycle client picks its own opaque identifier; see the
132    /// spec's "Receipt Schema" section in
133    /// `docs/specs/lifecycle-contract/body.md` for the illustrative
134    /// client-label list. Required-non-empty.
135    pub client_id: String,
136    /// Opaque identifier for *this* emitted receipt. Required-non-empty.
137    pub receipt_id: String,
138    /// Optional parent receipt id for nested or causally linked
139    /// lifecycle receipts. `None` for root receipts.
140    pub parent_receipt_id: Option<String>,
141    /// Wall-clock value to stamp on the receipt. Required.
142    pub at_epoch_s: u64,
143    /// Optional harness session id correlation.
144    pub harness_session_id: Option<String>,
145    /// Optional harness run id correlation. Receipts in the same
146    /// `lifeloop_run_id` (= `harness_run_id` when present) share a
147    /// monotonic [`SequenceGenerator`] counter; absent run id means
148    /// no in-run sequence is synthesized (`sequence` = `None`).
149    pub harness_run_id: Option<String>,
150    /// Optional harness task id correlation.
151    pub harness_task_id: Option<String>,
152}
153
154// ===========================================================================
155// SequenceGenerator
156// ===========================================================================
157
158/// Per-`lifeloop_run_id` monotonic sequence counter.
159///
160/// The spec ties `sequence` to "the strongest available durable
161/// session scope". For a Lifeloop-synthesized receipt, that scope is
162/// the `lifeloop_run_id` (= caller's `harness_run_id`). The generator
163/// keeps one counter per run id and hands out 1, 2, 3, ... within a
164/// run.
165///
166/// Sequence is per run, *not* global: emitting under run A then run
167/// B then run A again produces A=1, B=1, A=2.
168///
169/// When the caller supplies no `harness_run_id`, the emitter sets
170/// `sequence=None` rather than inventing a misleading cross-run order
171/// — see the spec's "required and nullable" rule for `sequence`.
172#[derive(Debug, Default)]
173pub struct SequenceGenerator {
174    counters: Mutex<HashMap<String, u64>>,
175}
176
177impl SequenceGenerator {
178    pub fn new() -> Self {
179        Self::default()
180    }
181
182    /// Allocate the next sequence value for `run_id` and return it.
183    /// Counters start at 1 within a run.
184    pub fn next(&self, run_id: &str) -> u64 {
185        let mut guard = self.counters.lock().expect("sequence mutex poisoned");
186        let slot = guard.entry(run_id.to_string()).or_insert(0);
187        *slot += 1;
188        *slot
189    }
190
191    /// Inspect (without advancing) the current counter for `run_id`.
192    /// Returns `0` when the run has not yet emitted.
193    pub fn peek(&self, run_id: &str) -> u64 {
194        let guard = self.counters.lock().expect("sequence mutex poisoned");
195        guard.get(run_id).copied().unwrap_or(0)
196    }
197}
198
199// ===========================================================================
200// IdempotencyStore
201// ===========================================================================
202
203/// Composite key the idempotency store uses internally. The spec
204/// scopes idempotency by `(client_id, adapter_id, idempotency_key)`
205/// — the key type pins that triple so a consumer's persistent store
206/// cannot accidentally collapse the scope.
207#[derive(Debug, Clone, Hash, PartialEq, Eq)]
208pub struct IdempotencyKey {
209    pub client_id: String,
210    pub adapter_id: String,
211    pub idempotency_key: String,
212}
213
214/// Replay-boundary store. Distinct trait so a consumer can plug in a
215/// real database without this module knowing.
216///
217/// Implementations MUST be content-equality stable: a `get` after
218/// `put(k, v)` returns a value equal (by `LifecycleReceipt::eq`) to
219/// `v`. The in-memory implementation below is the reference shape.
220pub trait IdempotencyStore {
221    /// Look up the prior receipt for `key`, if any.
222    fn get(&self, key: &IdempotencyKey) -> Option<LifecycleReceipt>;
223
224    /// Insert or replay-confirm `receipt` under `key`. Implementations
225    /// must enforce the spec's idempotency rule:
226    /// * if no prior entry exists → insert and return
227    ///   `Ok(StoreOutcome::Inserted)`;
228    /// * if a prior entry exists with content equal to `receipt` →
229    ///   return `Ok(StoreOutcome::Replayed(prior))` *without*
230    ///   overwriting;
231    /// * if a prior entry exists with different content → return
232    ///   `Err(ReceiptError::Conflict { .. })`.
233    fn put(
234        &self,
235        key: &IdempotencyKey,
236        receipt: &LifecycleReceipt,
237    ) -> Result<StoreOutcome, ReceiptError>;
238}
239
240/// Outcome of a successful [`IdempotencyStore::put`].
241///
242/// `Replayed` carries a full [`LifecycleReceipt`]. The size disparity
243/// between `Inserted` (zero-sized) and `Replayed` triggers
244/// `clippy::large_enum_variant`; we accept it rather than boxing
245/// because every `put()` returns a stack-local `StoreOutcome` and
246/// boxing would force every `IdempotencyStore` impl through an extra
247/// indirection for no measurable benefit (mirrors the choice for
248/// `AdapterResolution::Found` in `validation.rs`).
249#[derive(Debug, Clone, PartialEq, Eq)]
250#[allow(clippy::large_enum_variant)]
251pub enum StoreOutcome {
252    /// First write under this key. The supplied receipt is now the
253    /// canonical value.
254    Inserted,
255    /// Replay of an existing receipt; the prior value is returned
256    /// unchanged.
257    Replayed(LifecycleReceipt),
258}
259
260/// In-memory reference [`IdempotencyStore`] backed by a `HashMap`
261/// behind a `Mutex` for interior mutability through `&self`.
262///
263/// Suitable for tests and for single-process consumers who do not
264/// need durable replay protection.
265#[derive(Debug, Default)]
266pub struct InMemoryIdempotencyStore {
267    inner: Mutex<HashMap<IdempotencyKey, LifecycleReceipt>>,
268}
269
270impl InMemoryIdempotencyStore {
271    pub fn new() -> Self {
272        Self::default()
273    }
274
275    /// Number of receipts currently stored. Mostly for diagnostics
276    /// and tests.
277    pub fn len(&self) -> usize {
278        self.inner.lock().expect("idem mutex poisoned").len()
279    }
280
281    pub fn is_empty(&self) -> bool {
282        self.len() == 0
283    }
284}
285
286impl IdempotencyStore for InMemoryIdempotencyStore {
287    fn get(&self, key: &IdempotencyKey) -> Option<LifecycleReceipt> {
288        self.inner
289            .lock()
290            .expect("idem mutex poisoned")
291            .get(key)
292            .cloned()
293    }
294
295    fn put(
296        &self,
297        key: &IdempotencyKey,
298        receipt: &LifecycleReceipt,
299    ) -> Result<StoreOutcome, ReceiptError> {
300        let mut guard = self.inner.lock().expect("idem mutex poisoned");
301        if let Some(prior) = guard.get(key) {
302            if prior == receipt {
303                return Ok(StoreOutcome::Replayed(prior.clone()));
304            }
305            return Err(ReceiptError::Conflict {
306                idempotency_key: key.idempotency_key.clone(),
307            });
308        }
309        guard.insert(key.clone(), receipt.clone());
310        Ok(StoreOutcome::Inserted)
311    }
312}
313
314// ===========================================================================
315// LifeloopReceiptEmitter
316// ===========================================================================
317
318/// Concrete [`ReceiptEmitter`] for issue #14.
319///
320/// Holds a sequence generator and an idempotency store. The
321/// [`ReceiptEmitter::emit`] trait method takes a fully-built
322/// [`LifecycleReceipt`] (the seam from issue #7) and validates +
323/// idempotency-checks it. The richer
324/// [`Self::synthesize_and_emit`] entry point consumes a
325/// [`NegotiatedPlan`] + [`CallbackResponse`] + [`ReceiptContext`] and
326/// builds the receipt before storing it — that is the path issue #14
327/// is delivering.
328#[derive(Debug)]
329pub struct LifeloopReceiptEmitter<S: IdempotencyStore = InMemoryIdempotencyStore> {
330    sequencer: SequenceGenerator,
331    store: S,
332}
333
334impl LifeloopReceiptEmitter<InMemoryIdempotencyStore> {
335    /// Construct an emitter backed by an in-memory idempotency store.
336    pub fn in_memory() -> Self {
337        Self {
338            sequencer: SequenceGenerator::new(),
339            store: InMemoryIdempotencyStore::new(),
340        }
341    }
342}
343
344impl<S: IdempotencyStore> LifeloopReceiptEmitter<S> {
345    /// Construct an emitter with a caller-supplied store (e.g. a
346    /// database-backed implementation in production).
347    pub fn with_store(store: S) -> Self {
348        Self {
349            sequencer: SequenceGenerator::new(),
350            store,
351        }
352    }
353
354    pub fn store(&self) -> &S {
355        &self.store
356    }
357
358    pub fn sequencer(&self) -> &SequenceGenerator {
359        &self.sequencer
360    }
361
362    /// Build, validate, and idempotency-check a receipt from a
363    /// negotiation result + the client's response.
364    ///
365    /// Returns the canonical receipt (newly inserted, or the prior
366    /// replay value when the same idempotency key is reused with the
367    /// same content).
368    pub fn synthesize_and_emit(
369        &self,
370        negotiated: &NegotiatedPlan,
371        response: &CallbackResponse,
372        ctx: &ReceiptContext,
373    ) -> Result<LifecycleReceipt, ReceiptError> {
374        // `receipt.emitted` is a notification event and never itself
375        // produces a receipt. Reject at emit time so the
376        // idempotency store cannot record an illegal receipt even
377        // before validate() runs.
378        if matches!(negotiated.plan.event, LifecycleEventKind::ReceiptEmitted) {
379            return Err(ReceiptError::ReceiptEmittedNotEmittable);
380        }
381
382        let (status, failure_class, retry_class) = derive_status(negotiated, response);
383
384        // Replay check BEFORE sequence allocation: if this idempotency
385        // key has already been recorded, reuse the prior sequence so a
386        // content-equality replay short-circuits cleanly. Otherwise a
387        // replay would synthesize seq=N+1, fail the content-equality
388        // check, and surface as a false Conflict.
389        let prior_sequence = negotiated.plan.idempotency_key.as_deref().and_then(|idem| {
390            let key = IdempotencyKey {
391                client_id: ctx.client_id.clone(),
392                adapter_id: negotiated.plan.adapter.adapter_id.clone(),
393                idempotency_key: idem.to_string(),
394            };
395            self.store.get(&key).map(|prior| prior.sequence)
396        });
397
398        // Sequence is scoped to lifeloop_run_id (= harness_run_id
399        // when supplied). Without a run id, leave sequence null.
400        let sequence = match prior_sequence {
401            Some(reused) => reused,
402            None => ctx
403                .harness_run_id
404                .as_deref()
405                .map(|run| self.sequencer.next(run)),
406        };
407
408        let receipt = LifecycleReceipt {
409            schema_version: SCHEMA_VERSION.to_string(),
410            receipt_id: ctx.receipt_id.clone(),
411            idempotency_key: negotiated.plan.idempotency_key.clone(),
412            client_id: ctx.client_id.clone(),
413            adapter_id: negotiated.plan.adapter.adapter_id.clone(),
414            invocation_id: negotiated.plan.invocation_id.clone(),
415            event: negotiated.plan.event,
416            event_id: negotiated.plan.event_id.clone(),
417            sequence,
418            parent_receipt_id: ctx.parent_receipt_id.clone(),
419            integration_mode: negotiated.plan.integration_mode,
420            status,
421            at_epoch_s: ctx.at_epoch_s,
422            harness_session_id: ctx.harness_session_id.clone(),
423            harness_run_id: ctx.harness_run_id.clone(),
424            harness_task_id: ctx.harness_task_id.clone(),
425            payload_receipts: payload_receipts_from(negotiated),
426            telemetry_summary: serde_json::Map::new(),
427            capability_degradations: negotiated.capability_degradations.clone(),
428            failure_class,
429            retry_class,
430            warnings: merged_warnings(negotiated, response),
431        };
432
433        receipt.validate()?;
434
435        match negotiated.plan.idempotency_key.as_deref() {
436            Some(idem) => {
437                let key = IdempotencyKey {
438                    client_id: ctx.client_id.clone(),
439                    adapter_id: negotiated.plan.adapter.adapter_id.clone(),
440                    idempotency_key: idem.to_string(),
441                };
442                match self.store.put(&key, &receipt)? {
443                    StoreOutcome::Inserted => Ok(receipt),
444                    StoreOutcome::Replayed(prior) => Ok(prior),
445                }
446            }
447            // No idempotency key — the receipt itself is the replay
448            // boundary; we hand it back unstored. A future durable
449            // ledger may still record it under `receipt_id`, but
450            // that is the consumer's concern.
451            None => Ok(receipt),
452        }
453    }
454}
455
456impl<S: IdempotencyStore> ReceiptEmitter for LifeloopReceiptEmitter<S> {
457    type Error = ReceiptError;
458
459    /// Validate + idempotency-check an externally-built
460    /// [`LifecycleReceipt`]. The richer
461    /// [`Self::synthesize_and_emit`] is the path issue #14 is
462    /// delivering; this trait method exists to satisfy the issue #7
463    /// seam contract for callers that already hold a built receipt.
464    fn emit(&self, receipt: &LifecycleReceipt) -> Result<(), Self::Error> {
465        receipt.validate()?;
466        if let Some(idem) = receipt.idempotency_key.as_deref() {
467            let key = IdempotencyKey {
468                client_id: receipt.client_id.clone(),
469                adapter_id: receipt.adapter_id.clone(),
470                idempotency_key: idem.to_string(),
471            };
472            self.store.put(&key, receipt)?;
473        }
474        Ok(())
475    }
476}
477
478// ===========================================================================
479// Status derivation (the public mapping table mirrored at the top of file)
480// ===========================================================================
481
482fn derive_status(
483    negotiated: &NegotiatedPlan,
484    response: &CallbackResponse,
485) -> (ReceiptStatus, Option<FailureClass>, Option<RetryClass>) {
486    use crate::NegotiationOutcome as NO;
487
488    // Capability outcomes that block dispatch always win.
489    match negotiated.outcome {
490        NO::Unsupported => {
491            let fc = negotiated
492                .failure_class
493                .unwrap_or(FailureClass::CapabilityUnsupported);
494            return (ReceiptStatus::Failed, Some(fc), Some(fc.default_retry()));
495        }
496        NO::RequiresOperator => {
497            let fc = FailureClass::OperatorRequired;
498            return (ReceiptStatus::Failed, Some(fc), Some(fc.default_retry()));
499        }
500        NO::Degraded => {
501            // Fall through to client-status-aware refinement below
502            // unless the client itself reported Failed.
503            if matches!(response.status, ReceiptStatus::Failed) {
504                let fc = response
505                    .failure_class
506                    .unwrap_or(FailureClass::InternalError);
507                let rc = response.retry_class.unwrap_or(fc.default_retry());
508                return (ReceiptStatus::Failed, Some(fc), Some(rc));
509            }
510            return (ReceiptStatus::Degraded, None, None);
511        }
512        NO::Satisfied => {}
513    }
514
515    // Satisfied negotiation: a placement decision may still have
516    // failed (e.g. payload_too_large on a non-blocking outcome —
517    // negotiation already escalates to Unsupported when this
518    // happens, but be defensive).
519    if let Some(failed) = negotiated.placement_decisions.iter().find_map(|d| match d {
520        PayloadPlacementDecision::Failed { failure_class, .. } => Some(*failure_class),
521        _ => None,
522    }) {
523        return (
524            ReceiptStatus::Failed,
525            Some(failed),
526            Some(failed.default_retry()),
527        );
528    }
529
530    // Honor the client's reported status.
531    match response.status {
532        ReceiptStatus::Failed => {
533            let fc = response
534                .failure_class
535                .unwrap_or(FailureClass::InternalError);
536            let rc = response.retry_class.unwrap_or(fc.default_retry());
537            (ReceiptStatus::Failed, Some(fc), Some(rc))
538        }
539        ReceiptStatus::Skipped => (ReceiptStatus::Skipped, None, None),
540        ReceiptStatus::Observed => (ReceiptStatus::Observed, None, None),
541        ReceiptStatus::Degraded => (ReceiptStatus::Degraded, None, None),
542        ReceiptStatus::Delivered => (ReceiptStatus::Delivered, None, None),
543    }
544}
545
546fn payload_receipts_from(negotiated: &NegotiatedPlan) -> Vec<PayloadReceipt> {
547    // Map placement decisions onto wire PayloadReceipts. Provenance
548    // comes from the payload envelope that negotiation evaluated, not
549    // from CallbackResponse::client_payloads; response payloads are
550    // client output and may be sparse or derived.
551    negotiated
552        .placement_decisions
553        .iter()
554        .map(|d| match d {
555            PayloadPlacementDecision::Chosen {
556                payload_id,
557                payload_kind,
558                byte_size,
559                content_digest,
560                chosen,
561                ..
562            } => PayloadReceipt {
563                payload_id: payload_id.clone(),
564                payload_kind: payload_kind.clone(),
565                placement: *chosen,
566                status: PlacementOutcome::Delivered,
567                byte_size: *byte_size,
568                content_digest: content_digest.clone(),
569            },
570            PayloadPlacementDecision::Failed {
571                payload_id,
572                payload_kind,
573                byte_size,
574                content_digest,
575                rejected,
576                ..
577            } => PayloadReceipt {
578                payload_id: payload_id.clone(),
579                payload_kind: payload_kind.clone(),
580                // No placement won; surface the first attempted one
581                // for diagnostics. Falls back to ReceiptOnly when
582                // there were no attempts (unusual).
583                placement: rejected
584                    .first()
585                    .map(|r| r.placement())
586                    .unwrap_or(crate::PlacementClass::ReceiptOnly),
587                status: PlacementOutcome::Failed,
588                byte_size: *byte_size,
589                content_digest: content_digest.clone(),
590            },
591        })
592        .collect()
593}
594
595fn merged_warnings(
596    negotiated: &NegotiatedPlan,
597    response: &CallbackResponse,
598) -> Vec<crate::Warning> {
599    let mut out = negotiated.warnings.clone();
600    out.extend(response.warnings.iter().cloned());
601    out
602}