qubit_event_bus/core/event_envelope.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Standard event envelope.
11// qubit-style: allow multiple-public-types
12
13use std::collections::HashMap;
14use std::sync::atomic::{
15 AtomicU64,
16 Ordering,
17};
18use std::time::{
19 Duration,
20 SystemTime,
21};
22
23use crate::{
24 Acknowledgement,
25 EventEnvelopeBuilder,
26 Topic,
27};
28
29static NEXT_EVENT_ID: AtomicU64 = AtomicU64::new(1);
30
31/// Standard message structure flowing through the event bus.
32#[derive(Debug, Clone)]
33pub struct EventEnvelope<T: 'static> {
34 id: String,
35 topic: Topic<T>,
36 payload: T,
37 headers: HashMap<String, String>,
38 ordering_key: Option<String>,
39 timestamp: SystemTime,
40 delay: Option<Duration>,
41 acknowledgement: Option<Acknowledgement>,
42 dead_letter: bool,
43}
44
45/// Type-erased event metadata exposed to global interceptors.
46#[derive(Debug, Clone)]
47pub struct EventEnvelopeMetadata {
48 id: String,
49 topic_name: String,
50 payload_type_name: &'static str,
51 headers: HashMap<String, String>,
52 ordering_key: Option<String>,
53 timestamp: SystemTime,
54 delay: Option<Duration>,
55 dead_letter: bool,
56}
57
58impl EventEnvelopeMetadata {
59 /// Returns the event ID.
60 ///
61 /// # Returns
62 /// Stable event identifier.
63 pub fn id(&self) -> &str {
64 &self.id
65 }
66
67 /// Returns the topic name.
68 ///
69 /// # Returns
70 /// Topic name without payload type information.
71 pub fn topic_name(&self) -> &str {
72 &self.topic_name
73 }
74
75 /// Returns the Rust payload type name.
76 ///
77 /// # Returns
78 /// Fully qualified payload type name.
79 pub fn payload_type_name(&self) -> &'static str {
80 self.payload_type_name
81 }
82
83 /// Returns event headers.
84 ///
85 /// # Returns
86 /// Immutable header map.
87 pub fn headers(&self) -> &HashMap<String, String> {
88 &self.headers
89 }
90
91 /// Returns the optional ordering key.
92 ///
93 /// # Returns
94 /// `Some` when an ordering key was configured.
95 pub fn ordering_key(&self) -> Option<&str> {
96 self.ordering_key.as_deref()
97 }
98
99 /// Returns event creation timestamp.
100 ///
101 /// # Returns
102 /// Timestamp assigned when the envelope was built.
103 pub fn timestamp(&self) -> SystemTime {
104 self.timestamp
105 }
106
107 /// Returns optional delivery delay.
108 ///
109 /// # Returns
110 /// `Some` when delayed delivery metadata was configured.
111 pub fn delay(&self) -> Option<Duration> {
112 self.delay
113 }
114
115 /// Returns whether this metadata represents a dead letter.
116 ///
117 /// # Returns
118 /// `true` if the source envelope is already a dead letter.
119 pub fn is_dead_letter(&self) -> bool {
120 self.dead_letter
121 }
122
123 /// Adds or replaces one header.
124 ///
125 /// # Parameters
126 /// - `key`: Header key.
127 /// - `value`: Header value converted to string.
128 ///
129 /// # Returns
130 /// Updated metadata.
131 pub fn with_header(mut self, key: &str, value: impl ToString) -> Self {
132 self.headers.insert(key.to_string(), value.to_string());
133 self
134 }
135
136 /// Removes one header.
137 ///
138 /// # Parameters
139 /// - `key`: Header key to remove.
140 ///
141 /// # Returns
142 /// Updated metadata.
143 pub fn without_header(mut self, key: &str) -> Self {
144 self.headers.remove(key);
145 self
146 }
147
148 /// Sets the ordering key.
149 ///
150 /// # Parameters
151 /// - `ordering_key`: Ordering key used by supporting backends.
152 ///
153 /// # Returns
154 /// Updated metadata.
155 pub fn with_ordering_key(mut self, ordering_key: &str) -> Self {
156 self.ordering_key = Some(ordering_key.to_string());
157 self
158 }
159
160 /// Clears the ordering key.
161 ///
162 /// # Returns
163 /// Updated metadata without an ordering key.
164 pub fn without_ordering_key(mut self) -> Self {
165 self.ordering_key = None;
166 self
167 }
168
169 /// Sets delayed delivery metadata.
170 ///
171 /// # Parameters
172 /// - `delay`: Requested delivery delay.
173 ///
174 /// # Returns
175 /// Updated metadata.
176 pub fn with_delay(mut self, delay: Duration) -> Self {
177 self.delay = Some(delay);
178 self
179 }
180
181 /// Clears delayed delivery metadata.
182 ///
183 /// # Returns
184 /// Updated metadata without a delay.
185 pub fn without_delay(mut self) -> Self {
186 self.delay = None;
187 self
188 }
189}
190
191impl<T: 'static> EventEnvelope<T> {
192 /// Creates a new event envelope with generated ID and current timestamp.
193 ///
194 /// # Parameters
195 /// - `topic`: Topic associated with the payload.
196 /// - `payload`: Business payload.
197 ///
198 /// # Returns
199 /// A new event envelope with empty headers.
200 pub fn create(topic: Topic<T>, payload: T) -> Self {
201 Self {
202 id: generate_event_id(),
203 topic,
204 payload,
205 headers: HashMap::new(),
206 ordering_key: None,
207 timestamp: SystemTime::now(),
208 delay: None,
209 acknowledgement: None,
210 dead_letter: false,
211 }
212 }
213
214 /// Creates an event envelope builder.
215 ///
216 /// # Returns
217 /// A builder with generated ID and current timestamp defaults.
218 pub fn builder() -> EventEnvelopeBuilder<T> {
219 EventEnvelopeBuilder::new()
220 }
221
222 /// Creates an event envelope from validated builder fields.
223 ///
224 /// # Parameters
225 /// - `builder`: Builder with all required fields present.
226 ///
227 /// # Returns
228 /// An immutable event envelope.
229 pub(crate) fn from_builder(builder: EventEnvelopeBuilder<T>) -> Self {
230 Self {
231 id: builder.id,
232 topic: builder
233 .topic
234 .expect("validated builder should contain a topic"),
235 payload: builder
236 .payload
237 .expect("validated builder should contain a payload"),
238 headers: builder.headers,
239 ordering_key: builder.ordering_key,
240 timestamp: builder.timestamp,
241 delay: builder.delay,
242 acknowledgement: builder.acknowledgement,
243 dead_letter: builder.dead_letter,
244 }
245 }
246
247 /// Returns the event ID.
248 ///
249 /// # Returns
250 /// Stable event identifier.
251 pub fn id(&self) -> &str {
252 &self.id
253 }
254
255 /// Returns the event topic.
256 ///
257 /// # Returns
258 /// Type-safe topic metadata.
259 pub fn topic(&self) -> &Topic<T> {
260 &self.topic
261 }
262
263 /// Returns the event payload.
264 ///
265 /// # Returns
266 /// Immutable payload reference.
267 pub fn payload(&self) -> &T {
268 &self.payload
269 }
270
271 /// Returns event headers.
272 ///
273 /// # Returns
274 /// Immutable header map.
275 pub fn headers(&self) -> &HashMap<String, String> {
276 &self.headers
277 }
278
279 /// Returns type-erased metadata for global interception.
280 ///
281 /// # Returns
282 /// Cloned event metadata without exposing the typed payload.
283 pub fn metadata(&self) -> EventEnvelopeMetadata {
284 EventEnvelopeMetadata {
285 id: self.id.clone(),
286 topic_name: self.topic.name().to_string(),
287 payload_type_name: self.topic.payload_type_name(),
288 headers: self.headers.clone(),
289 ordering_key: self.ordering_key.clone(),
290 timestamp: self.timestamp,
291 delay: self.delay,
292 dead_letter: self.dead_letter,
293 }
294 }
295
296 /// Returns the optional ordering key.
297 ///
298 /// # Returns
299 /// `Some` when an ordering key was configured.
300 pub fn ordering_key(&self) -> Option<&str> {
301 self.ordering_key.as_deref()
302 }
303
304 /// Returns event creation timestamp.
305 ///
306 /// # Returns
307 /// Timestamp assigned when the envelope was built.
308 pub fn timestamp(&self) -> SystemTime {
309 self.timestamp
310 }
311
312 /// Returns optional delivery delay.
313 ///
314 /// # Returns
315 /// `Some` when delayed delivery metadata was configured.
316 pub fn delay(&self) -> Option<Duration> {
317 self.delay
318 }
319
320 /// Returns optional acknowledgement handle.
321 ///
322 /// # Returns
323 /// `Some` for envelopes delivered to subscriber handlers.
324 pub fn acknowledgement(&self) -> Option<&Acknowledgement> {
325 self.acknowledgement.as_ref()
326 }
327
328 /// Returns whether this envelope represents a dead letter.
329 ///
330 /// # Returns
331 /// `true` if the envelope has already been routed to a dead-letter flow.
332 pub fn is_dead_letter(&self) -> bool {
333 self.dead_letter
334 }
335
336 /// Adds or replaces one header.
337 ///
338 /// # Parameters
339 /// - `key`: Header key.
340 /// - `value`: Header value converted to string.
341 ///
342 /// # Returns
343 /// Updated envelope.
344 pub fn with_header(mut self, key: impl Into<String>, value: impl ToString) -> Self {
345 self.headers.insert(key.into(), value.to_string());
346 self
347 }
348
349 /// Sets the ordering key.
350 ///
351 /// # Parameters
352 /// - `ordering_key`: Ordering key used by backends that support ordering.
353 ///
354 /// # Returns
355 /// Updated envelope.
356 pub fn with_ordering_key(mut self, ordering_key: impl Into<String>) -> Self {
357 self.ordering_key = Some(ordering_key.into());
358 self
359 }
360
361 /// Sets delayed delivery metadata.
362 ///
363 /// # Parameters
364 /// - `delay`: Requested delivery delay.
365 ///
366 /// # Returns
367 /// Updated envelope.
368 pub fn with_delay(mut self, delay: Duration) -> Self {
369 self.delay = Some(delay);
370 self
371 }
372
373 /// Injects an acknowledgement handle.
374 ///
375 /// # Parameters
376 /// - `acknowledgement`: Handle shared with processing code.
377 ///
378 /// # Returns
379 /// Updated envelope.
380 pub fn with_acknowledgement(mut self, acknowledgement: Acknowledgement) -> Self {
381 self.acknowledgement = Some(acknowledgement);
382 self
383 }
384
385 /// Marks the envelope as a dead letter.
386 ///
387 /// # Returns
388 /// Updated envelope with dead-letter marker enabled.
389 pub fn as_dead_letter(mut self) -> Self {
390 self.dead_letter = true;
391 self
392 }
393
394 /// Applies mutable metadata fields returned by a global interceptor.
395 pub(crate) fn apply_metadata(&mut self, metadata: EventEnvelopeMetadata) {
396 self.headers = metadata.headers;
397 self.ordering_key = metadata.ordering_key;
398 self.delay = metadata.delay;
399 }
400}
401
402/// Generates a process-local event ID.
403///
404/// # Returns
405/// Monotonic event ID string.
406pub(crate) fn generate_event_id() -> String {
407 let id = NEXT_EVENT_ID.fetch_add(1, Ordering::SeqCst);
408 format!("event-{id}")
409}