Skip to main content

mako_engine/
envelope.rs

1//! The [`EventEnvelope`] — the standard wrapper for every persisted event.
2//!
3//! Infrastructure metadata (routing, tracing, auditing) lives in the envelope.
4//! The business payload is opaque JSON, allowing each domain crate to own its
5//! own schema without leaking it into the engine layer.
6//!
7//! # Write / read split
8//!
9//! - [`NewEvent`] is what the caller submits to [`EventStore::append`][crate::event_store::EventStore::append]. It
10//!   contains everything the *caller* knows: context IDs, the typed payload.
11//!   It intentionally **omits** `event_id`, `stream_id`, `sequence_number`,
12//!   and `timestamp` — the store assigns those atomically during the append.
13//!
14//! - [`EventEnvelope`] is what the store returns and persists. It is the
15//!   complete, immutable record.
16
17use serde_json::Value;
18use time::OffsetDateTime;
19
20use crate::ids::{
21    CausationId, ConversationId, CorrelationId, EventId, ProcessId, StreamId, TenantId,
22};
23use crate::version::WorkflowId;
24
25// ── NewEvent ──────────────────────────────────────────────────────────────────
26
27/// A pending event ready to be appended to a stream.
28///
29/// The caller constructs a `NewEvent` for each domain event produced by a
30/// workflow command. Fields that the store assigns (`event_id`,
31/// `sequence_number`, `timestamp`, `stream_id`) are absent.
32///
33/// ## Idiomatic construction
34///
35/// Prefer [`CommandContext::new_event`][crate::workflow::CommandContext::new_event]
36/// inside workflow handlers and transport adapters — it propagates all
37/// correlation IDs from the command context automatically:
38///
39/// ```rust,ignore
40/// // Inside a MessageAdapter or test:
41/// let new_event = ctx.new_event(&SupplierChangeEvent::Initiated { .. })?;
42/// store.append(&stream_id, ExpectedVersion::Any, &[new_event]).await?;
43/// ```
44///
45/// Use [`EventEnvelope::new_caused_event`] when building a follow-up event
46/// causally linked to a prior persisted event.
47///
48/// For test scaffolding that needs a `NewEvent` without a typed payload or
49/// context, use [`NewEvent::new`].  The `#[non_exhaustive]` attribute
50/// future-proofs callers against new optional fields being added without
51/// requiring a semver-breaking change.
52#[derive(Debug, Clone)]
53#[non_exhaustive]
54pub struct NewEvent {
55    /// Groups all events that originate from the same root command.
56    pub correlation_id: CorrelationId,
57    /// The event or command that directly caused this event, if any.
58    pub causation_id: Option<CausationId>,
59    /// Links events belonging to the same business conversation.
60    pub conversation_id: ConversationId,
61    /// Stable identifier for the MaKo process instance.
62    pub process_id: ProcessId,
63    /// Tenant that owns this event.
64    pub tenant_id: TenantId,
65    /// Workflow definition that produced this event.
66    pub workflow_id: WorkflowId,
67    /// Stable, human-readable type discriminant (e.g. `"SupplierChangeInitiated"`).
68    pub event_type: Box<str>,
69    /// Schema version of the serialized payload.
70    pub schema_version: u32,
71    /// The domain event payload, serialized as JSON.
72    pub payload: Value,
73}
74
75impl NewEvent {
76    /// Construct a `NewEvent` from its constituent parts.
77    ///
78    /// This is the escape hatch for callers that need full control over all
79    /// fields (e.g. test scaffolding, migration tooling, storage-layer tests).
80    /// In application code, prefer
81    /// [`CommandContext::new_event`][crate::workflow::CommandContext::new_event]
82    /// which propagates correlation metadata automatically.
83    #[allow(clippy::too_many_arguments)]
84    #[must_use]
85    pub fn new(
86        correlation_id: CorrelationId,
87        causation_id: Option<CausationId>,
88        conversation_id: ConversationId,
89        process_id: ProcessId,
90        tenant_id: TenantId,
91        workflow_id: WorkflowId,
92        event_type: impl Into<Box<str>>,
93        schema_version: u32,
94        payload: Value,
95    ) -> Self {
96        Self {
97            correlation_id,
98            causation_id,
99            conversation_id,
100            process_id,
101            tenant_id,
102            workflow_id,
103            event_type: event_type.into(),
104            schema_version,
105            payload,
106        }
107    }
108}
109
110// ── EventEnvelope ─────────────────────────────────────────────────────────────
111
112/// A single persisted event, wrapped in engine-level metadata.
113///
114/// The envelope separates **infrastructure concerns** (identity, ordering,
115/// tracing) from **domain concerns** (the business event payload). The payload
116/// is stored as [`serde_json::Value`] so the engine remains domain-agnostic.
117///
118/// ## Sequence numbers
119///
120/// `sequence_number` is 1-based and monotonically increasing **per stream**.
121/// It is assigned by the [`EventStore`] during [`EventStore::append`] and
122/// must not be set by callers.
123///
124/// ## Immutability
125///
126/// Once persisted, an envelope is never modified. Corrections are modelled as
127/// new events.
128///
129/// [`EventStore`]: crate::event_store::EventStore
130/// [`EventStore::append`]: crate::event_store::EventStore::append
131#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
132pub struct EventEnvelope {
133    /// Globally unique event instance identifier (assigned by store).
134    pub event_id: EventId,
135
136    /// The stream this event belongs to (e.g. `process/xxxxxxxx`).
137    pub stream_id: StreamId,
138
139    /// 1-based monotonic position within the stream (assigned by store).
140    pub sequence_number: u64,
141
142    /// Wall-clock time at which the event was appended to the store (assigned by store).
143    #[serde(with = "time::serde::rfc3339")]
144    pub timestamp: OffsetDateTime,
145
146    /// Groups all events that originate from the same root command.
147    pub correlation_id: CorrelationId,
148
149    /// The event or command that directly caused this event, if any.
150    pub causation_id: Option<CausationId>,
151
152    /// Links events belonging to the same business conversation
153    /// (e.g. a UTILMD exchange and its APERAK acknowledgement).
154    pub conversation_id: ConversationId,
155
156    /// Stable identifier for the MaKo process instance that owns this stream.
157    pub process_id: ProcessId,
158
159    /// Tenant that owns this event (market participant or deployment tenant).
160    pub tenant_id: TenantId,
161
162    /// Identifies the workflow definition (name + BDEW format version) that
163    /// produced this event.
164    pub workflow_id: WorkflowId,
165
166    /// Stable, human-readable type discriminant for the domain event
167    /// (e.g. `"SupplierChangeInitiated"`).
168    ///
169    /// Used for projection routing and observability without deserializing
170    /// the full payload.
171    pub event_type: Box<str>,
172
173    /// Schema version of the serialized payload. Increment when the payload
174    /// structure changes to enable upcasting during replay.
175    pub schema_version: u32,
176
177    /// The domain event payload, serialized as JSON.
178    pub payload: Value,
179}
180
181impl EventEnvelope {
182    /// Deserialize the payload into a typed domain event.
183    ///
184    /// Clones the [`serde_json::Value`] payload and deserializes `T` from it
185    /// directly.  Use [`EventEnvelope::decode_owned`] to consume the envelope
186    /// and avoid the clone when the envelope is no longer needed.
187    ///
188    /// # Errors
189    ///
190    /// Returns a [`serde_json::Error`] when deserialization fails.
191    pub fn decode<T: serde::de::DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
192        serde_json::from_value(self.payload.clone())
193    }
194
195    /// Deserialize the payload into a typed domain event, consuming the
196    /// envelope to avoid an extra clone.
197    ///
198    /// Prefer this over [`EventEnvelope::decode`] when you no longer need
199    /// access to the envelope after decoding (e.g. in one-shot projection
200    /// handlers that transform each event exactly once).
201    ///
202    /// # Errors
203    ///
204    /// Returns a [`serde_json::Error`] when deserialization fails.
205    pub fn decode_owned<T: serde::de::DeserializeOwned>(self) -> Result<T, serde_json::Error> {
206        serde_json::from_value(self.payload)
207    }
208
209    /// Construct an envelope from a [`NewEvent`] with store-assigned fields.
210    ///
211    /// Called internally by [`EventStore`] implementations during append.
212    ///
213    /// [`EventStore`]: crate::event_store::EventStore
214    #[must_use]
215    pub fn from_new(
216        new: NewEvent,
217        stream_id: StreamId,
218        sequence_number: u64,
219        timestamp: OffsetDateTime,
220    ) -> Self {
221        Self {
222            event_id: EventId::new(),
223            stream_id,
224            sequence_number,
225            timestamp,
226            correlation_id: new.correlation_id,
227            causation_id: new.causation_id,
228            conversation_id: new.conversation_id,
229            process_id: new.process_id,
230            tenant_id: new.tenant_id,
231            workflow_id: new.workflow_id,
232            event_type: new.event_type,
233            schema_version: new.schema_version,
234            payload: new.payload,
235        }
236    }
237}