mako_engine/envelope.rs
1//! The [`EventEnvelope`] — the standard wrapper for every persisted event.
2//!
3//! Infrastructure metadata (routing, tracing, auditing) lives in the envelope.
4//! The business payload is opaque JSON, allowing each domain crate to own its
5//! own schema without leaking it into the engine layer.
6//!
7//! # Write / read split
8//!
9//! - [`NewEvent`] is what the caller submits to [`EventStore::append`][crate::event_store::EventStore::append]. It
10//! contains everything the *caller* knows: context IDs, the typed payload.
11//! It intentionally **omits** `event_id`, `stream_id`, `sequence_number`,
12//! and `timestamp` — the store assigns those atomically during the append.
13//!
14//! - [`EventEnvelope`] is what the store returns and persists. It is the
15//! complete, immutable record.
16
17use serde_json::Value;
18use time::OffsetDateTime;
19
20use crate::ids::{
21 CausationId, ConversationId, CorrelationId, EventId, ProcessId, StreamId, TenantId,
22};
23use crate::version::WorkflowId;
24
25// ── NewEvent ──────────────────────────────────────────────────────────────────
26
27/// A pending event ready to be appended to a stream.
28///
29/// The caller constructs a `NewEvent` for each domain event produced by a
30/// workflow command. Fields that the store assigns (`event_id`,
31/// `sequence_number`, `timestamp`, `stream_id`) are absent.
32///
33/// ## Idiomatic construction
34///
35/// Prefer [`CommandContext::new_event`][crate::workflow::CommandContext::new_event]
36/// inside workflow handlers and transport adapters — it propagates all
37/// correlation IDs from the command context automatically:
38///
39/// ```rust,ignore
40/// // Inside a MessageAdapter or test:
41/// let new_event = ctx.new_event(&SupplierChangeEvent::Initiated { .. })?;
42/// store.append(&stream_id, ExpectedVersion::Any, &[new_event]).await?;
43/// ```
44///
45/// Use [`EventEnvelope::new_caused_event`] when building a follow-up event
46/// causally linked to a prior persisted event.
47///
48/// For test scaffolding that needs a `NewEvent` without a typed payload or
49/// context, use [`NewEvent::new`]. The `#[non_exhaustive]` attribute
50/// future-proofs callers against new optional fields being added without
51/// requiring a semver-breaking change.
52#[derive(Debug, Clone)]
53#[non_exhaustive]
54pub struct NewEvent {
55 /// Groups all events that originate from the same root command.
56 pub correlation_id: CorrelationId,
57 /// The event or command that directly caused this event, if any.
58 pub causation_id: Option<CausationId>,
59 /// Links events belonging to the same business conversation.
60 pub conversation_id: ConversationId,
61 /// Stable identifier for the MaKo process instance.
62 pub process_id: ProcessId,
63 /// Tenant that owns this event.
64 pub tenant_id: TenantId,
65 /// Workflow definition that produced this event.
66 pub workflow_id: WorkflowId,
67 /// Stable, human-readable type discriminant (e.g. `"SupplierChangeInitiated"`).
68 pub event_type: Box<str>,
69 /// Schema version of the serialized payload.
70 pub schema_version: u32,
71 /// The domain event payload, serialized as JSON.
72 pub payload: Value,
73}
74
75impl NewEvent {
76 /// Construct a `NewEvent` from its constituent parts.
77 ///
78 /// This is the escape hatch for callers that need full control over all
79 /// fields (e.g. test scaffolding, migration tooling, storage-layer tests).
80 /// In application code, prefer
81 /// [`CommandContext::new_event`][crate::workflow::CommandContext::new_event]
82 /// which propagates correlation metadata automatically.
83 #[allow(clippy::too_many_arguments)]
84 #[must_use]
85 pub fn new(
86 correlation_id: CorrelationId,
87 causation_id: Option<CausationId>,
88 conversation_id: ConversationId,
89 process_id: ProcessId,
90 tenant_id: TenantId,
91 workflow_id: WorkflowId,
92 event_type: impl Into<Box<str>>,
93 schema_version: u32,
94 payload: Value,
95 ) -> Self {
96 Self {
97 correlation_id,
98 causation_id,
99 conversation_id,
100 process_id,
101 tenant_id,
102 workflow_id,
103 event_type: event_type.into(),
104 schema_version,
105 payload,
106 }
107 }
108}
109
110// ── EventEnvelope ─────────────────────────────────────────────────────────────
111
112/// A single persisted event, wrapped in engine-level metadata.
113///
114/// The envelope separates **infrastructure concerns** (identity, ordering,
115/// tracing) from **domain concerns** (the business event payload). The payload
116/// is stored as [`serde_json::Value`] so the engine remains domain-agnostic.
117///
118/// ## Sequence numbers
119///
120/// `sequence_number` is 1-based and monotonically increasing **per stream**.
121/// It is assigned by the [`EventStore`] during [`EventStore::append`] and
122/// must not be set by callers.
123///
124/// ## Immutability
125///
126/// Once persisted, an envelope is never modified. Corrections are modelled as
127/// new events.
128///
129/// [`EventStore`]: crate::event_store::EventStore
130/// [`EventStore::append`]: crate::event_store::EventStore::append
131#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
132pub struct EventEnvelope {
133 /// Globally unique event instance identifier (assigned by store).
134 pub event_id: EventId,
135
136 /// The stream this event belongs to (e.g. `process/xxxxxxxx`).
137 pub stream_id: StreamId,
138
139 /// 1-based monotonic position within the stream (assigned by store).
140 pub sequence_number: u64,
141
142 /// Wall-clock time at which the event was appended to the store (assigned by store).
143 #[serde(with = "time::serde::rfc3339")]
144 pub timestamp: OffsetDateTime,
145
146 /// Groups all events that originate from the same root command.
147 pub correlation_id: CorrelationId,
148
149 /// The event or command that directly caused this event, if any.
150 pub causation_id: Option<CausationId>,
151
152 /// Links events belonging to the same business conversation
153 /// (e.g. a UTILMD exchange and its APERAK acknowledgement).
154 pub conversation_id: ConversationId,
155
156 /// Stable identifier for the MaKo process instance that owns this stream.
157 pub process_id: ProcessId,
158
159 /// Tenant that owns this event (market participant or deployment tenant).
160 pub tenant_id: TenantId,
161
162 /// Identifies the workflow definition (name + BDEW format version) that
163 /// produced this event.
164 pub workflow_id: WorkflowId,
165
166 /// Stable, human-readable type discriminant for the domain event
167 /// (e.g. `"SupplierChangeInitiated"`).
168 ///
169 /// Used for projection routing and observability without deserializing
170 /// the full payload.
171 pub event_type: Box<str>,
172
173 /// Schema version of the serialized payload. Increment when the payload
174 /// structure changes to enable upcasting during replay.
175 pub schema_version: u32,
176
177 /// The domain event payload, serialized as JSON.
178 pub payload: Value,
179}
180
181impl EventEnvelope {
182 /// Deserialize the payload into a typed domain event.
183 ///
184 /// Clones the [`serde_json::Value`] payload and deserializes `T` from it
185 /// directly. Use [`EventEnvelope::decode_owned`] to consume the envelope
186 /// and avoid the clone when the envelope is no longer needed.
187 ///
188 /// # Errors
189 ///
190 /// Returns a [`serde_json::Error`] when deserialization fails.
191 pub fn decode<T: serde::de::DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
192 serde_json::from_value(self.payload.clone())
193 }
194
195 /// Deserialize the payload into a typed domain event, consuming the
196 /// envelope to avoid an extra clone.
197 ///
198 /// Prefer this over [`EventEnvelope::decode`] when you no longer need
199 /// access to the envelope after decoding (e.g. in one-shot projection
200 /// handlers that transform each event exactly once).
201 ///
202 /// # Errors
203 ///
204 /// Returns a [`serde_json::Error`] when deserialization fails.
205 pub fn decode_owned<T: serde::de::DeserializeOwned>(self) -> Result<T, serde_json::Error> {
206 serde_json::from_value(self.payload)
207 }
208
209 /// Construct an envelope from a [`NewEvent`] with store-assigned fields.
210 ///
211 /// Called internally by [`EventStore`] implementations during append.
212 ///
213 /// [`EventStore`]: crate::event_store::EventStore
214 #[must_use]
215 pub fn from_new(
216 new: NewEvent,
217 stream_id: StreamId,
218 sequence_number: u64,
219 timestamp: OffsetDateTime,
220 ) -> Self {
221 Self {
222 event_id: EventId::new(),
223 stream_id,
224 sequence_number,
225 timestamp,
226 correlation_id: new.correlation_id,
227 causation_id: new.causation_id,
228 conversation_id: new.conversation_id,
229 process_id: new.process_id,
230 tenant_id: new.tenant_id,
231 workflow_id: new.workflow_id,
232 event_type: new.event_type,
233 schema_version: new.schema_version,
234 payload: new.payload,
235 }
236 }
237}