Skip to main content

qubit_event_bus/core/
event_envelope.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Standard event envelope.
11// qubit-style: allow multiple-public-types
12
13use std::collections::HashMap;
14use std::sync::atomic::{
15    AtomicU64,
16    Ordering,
17};
18use std::time::{
19    Duration,
20    SystemTime,
21};
22
23use crate::{
24    Acknowledgement,
25    EventEnvelopeBuilder,
26    Topic,
27};
28
29static NEXT_EVENT_ID: AtomicU64 = AtomicU64::new(1);
30
31/// Standard message structure flowing through the event bus.
32#[derive(Debug, Clone)]
33pub struct EventEnvelope<T: 'static> {
34    id: String,
35    topic: Topic<T>,
36    payload: T,
37    headers: HashMap<String, String>,
38    ordering_key: Option<String>,
39    timestamp: SystemTime,
40    delay: Option<Duration>,
41    acknowledgement: Option<Acknowledgement>,
42    dead_letter: bool,
43}
44
45/// Type-erased event metadata exposed to global interceptors.
46#[derive(Debug, Clone)]
47pub struct EventEnvelopeMetadata {
48    id: String,
49    topic_name: String,
50    payload_type_name: &'static str,
51    headers: HashMap<String, String>,
52    ordering_key: Option<String>,
53    timestamp: SystemTime,
54    delay: Option<Duration>,
55    dead_letter: bool,
56}
57
58impl EventEnvelopeMetadata {
59    /// Returns the event ID.
60    ///
61    /// # Returns
62    /// Stable event identifier.
63    pub fn id(&self) -> &str {
64        &self.id
65    }
66
67    /// Returns the topic name.
68    ///
69    /// # Returns
70    /// Topic name without payload type information.
71    pub fn topic_name(&self) -> &str {
72        &self.topic_name
73    }
74
75    /// Returns the Rust payload type name.
76    ///
77    /// # Returns
78    /// Fully qualified payload type name.
79    pub fn payload_type_name(&self) -> &'static str {
80        self.payload_type_name
81    }
82
83    /// Returns event headers.
84    ///
85    /// # Returns
86    /// Immutable header map.
87    pub fn headers(&self) -> &HashMap<String, String> {
88        &self.headers
89    }
90
91    /// Returns the optional ordering key.
92    ///
93    /// # Returns
94    /// `Some` when an ordering key was configured.
95    pub fn ordering_key(&self) -> Option<&str> {
96        self.ordering_key.as_deref()
97    }
98
99    /// Returns event creation timestamp.
100    ///
101    /// # Returns
102    /// Timestamp assigned when the envelope was built.
103    pub fn timestamp(&self) -> SystemTime {
104        self.timestamp
105    }
106
107    /// Returns optional delivery delay.
108    ///
109    /// # Returns
110    /// `Some` when delayed delivery metadata was configured.
111    pub fn delay(&self) -> Option<Duration> {
112        self.delay
113    }
114
115    /// Returns whether this metadata represents a dead letter.
116    ///
117    /// # Returns
118    /// `true` if the source envelope is already a dead letter.
119    pub fn is_dead_letter(&self) -> bool {
120        self.dead_letter
121    }
122
123    /// Adds or replaces one header.
124    ///
125    /// # Parameters
126    /// - `key`: Header key.
127    /// - `value`: Header value converted to string.
128    ///
129    /// # Returns
130    /// Updated metadata.
131    pub fn with_header(mut self, key: &str, value: impl ToString) -> Self {
132        self.headers.insert(key.to_string(), value.to_string());
133        self
134    }
135
136    /// Removes one header.
137    ///
138    /// # Parameters
139    /// - `key`: Header key to remove.
140    ///
141    /// # Returns
142    /// Updated metadata.
143    pub fn without_header(mut self, key: &str) -> Self {
144        self.headers.remove(key);
145        self
146    }
147
148    /// Sets the ordering key.
149    ///
150    /// # Parameters
151    /// - `ordering_key`: Ordering key used by supporting backends.
152    ///
153    /// # Returns
154    /// Updated metadata.
155    pub fn with_ordering_key(mut self, ordering_key: &str) -> Self {
156        self.ordering_key = Some(ordering_key.to_string());
157        self
158    }
159
160    /// Clears the ordering key.
161    ///
162    /// # Returns
163    /// Updated metadata without an ordering key.
164    pub fn without_ordering_key(mut self) -> Self {
165        self.ordering_key = None;
166        self
167    }
168
169    /// Sets delayed delivery metadata.
170    ///
171    /// # Parameters
172    /// - `delay`: Requested delivery delay.
173    ///
174    /// # Returns
175    /// Updated metadata.
176    pub fn with_delay(mut self, delay: Duration) -> Self {
177        self.delay = Some(delay);
178        self
179    }
180
181    /// Clears delayed delivery metadata.
182    ///
183    /// # Returns
184    /// Updated metadata without a delay.
185    pub fn without_delay(mut self) -> Self {
186        self.delay = None;
187        self
188    }
189}
190
191impl<T: 'static> EventEnvelope<T> {
192    /// Creates a new event envelope with generated ID and current timestamp.
193    ///
194    /// # Parameters
195    /// - `topic`: Topic associated with the payload.
196    /// - `payload`: Business payload.
197    ///
198    /// # Returns
199    /// A new event envelope with empty headers.
200    pub fn create(topic: Topic<T>, payload: T) -> Self {
201        Self {
202            id: generate_event_id(),
203            topic,
204            payload,
205            headers: HashMap::new(),
206            ordering_key: None,
207            timestamp: SystemTime::now(),
208            delay: None,
209            acknowledgement: None,
210            dead_letter: false,
211        }
212    }
213
214    /// Creates an event envelope builder.
215    ///
216    /// # Returns
217    /// A builder with generated ID and current timestamp defaults.
218    pub fn builder() -> EventEnvelopeBuilder<T> {
219        EventEnvelopeBuilder::new()
220    }
221
222    /// Creates an event envelope from validated builder fields.
223    ///
224    /// # Parameters
225    /// - `builder`: Builder with all required fields present.
226    ///
227    /// # Returns
228    /// An immutable event envelope.
229    pub(crate) fn from_builder(builder: EventEnvelopeBuilder<T>) -> Self {
230        Self {
231            id: builder.id,
232            topic: builder
233                .topic
234                .expect("validated builder should contain a topic"),
235            payload: builder
236                .payload
237                .expect("validated builder should contain a payload"),
238            headers: builder.headers,
239            ordering_key: builder.ordering_key,
240            timestamp: builder.timestamp,
241            delay: builder.delay,
242            acknowledgement: builder.acknowledgement,
243            dead_letter: builder.dead_letter,
244        }
245    }
246
247    /// Returns the event ID.
248    ///
249    /// # Returns
250    /// Stable event identifier.
251    pub fn id(&self) -> &str {
252        &self.id
253    }
254
255    /// Returns the event topic.
256    ///
257    /// # Returns
258    /// Type-safe topic metadata.
259    pub fn topic(&self) -> &Topic<T> {
260        &self.topic
261    }
262
263    /// Returns the event payload.
264    ///
265    /// # Returns
266    /// Immutable payload reference.
267    pub fn payload(&self) -> &T {
268        &self.payload
269    }
270
271    /// Returns event headers.
272    ///
273    /// # Returns
274    /// Immutable header map.
275    pub fn headers(&self) -> &HashMap<String, String> {
276        &self.headers
277    }
278
279    /// Returns type-erased metadata for global interception.
280    ///
281    /// # Returns
282    /// Cloned event metadata without exposing the typed payload.
283    pub fn metadata(&self) -> EventEnvelopeMetadata {
284        EventEnvelopeMetadata {
285            id: self.id.clone(),
286            topic_name: self.topic.name().to_string(),
287            payload_type_name: self.topic.payload_type_name(),
288            headers: self.headers.clone(),
289            ordering_key: self.ordering_key.clone(),
290            timestamp: self.timestamp,
291            delay: self.delay,
292            dead_letter: self.dead_letter,
293        }
294    }
295
296    /// Returns the optional ordering key.
297    ///
298    /// # Returns
299    /// `Some` when an ordering key was configured.
300    pub fn ordering_key(&self) -> Option<&str> {
301        self.ordering_key.as_deref()
302    }
303
304    /// Returns event creation timestamp.
305    ///
306    /// # Returns
307    /// Timestamp assigned when the envelope was built.
308    pub fn timestamp(&self) -> SystemTime {
309        self.timestamp
310    }
311
312    /// Returns optional delivery delay.
313    ///
314    /// # Returns
315    /// `Some` when delayed delivery metadata was configured.
316    pub fn delay(&self) -> Option<Duration> {
317        self.delay
318    }
319
320    /// Returns optional acknowledgement handle.
321    ///
322    /// # Returns
323    /// `Some` for envelopes delivered to subscriber handlers.
324    pub fn acknowledgement(&self) -> Option<&Acknowledgement> {
325        self.acknowledgement.as_ref()
326    }
327
328    /// Returns whether this envelope represents a dead letter.
329    ///
330    /// # Returns
331    /// `true` if the envelope has already been routed to a dead-letter flow.
332    pub fn is_dead_letter(&self) -> bool {
333        self.dead_letter
334    }
335
336    /// Adds or replaces one header.
337    ///
338    /// # Parameters
339    /// - `key`: Header key.
340    /// - `value`: Header value converted to string.
341    ///
342    /// # Returns
343    /// Updated envelope.
344    pub fn with_header(mut self, key: impl Into<String>, value: impl ToString) -> Self {
345        self.headers.insert(key.into(), value.to_string());
346        self
347    }
348
349    /// Sets the ordering key.
350    ///
351    /// # Parameters
352    /// - `ordering_key`: Ordering key used by backends that support ordering.
353    ///
354    /// # Returns
355    /// Updated envelope.
356    pub fn with_ordering_key(mut self, ordering_key: impl Into<String>) -> Self {
357        self.ordering_key = Some(ordering_key.into());
358        self
359    }
360
361    /// Sets delayed delivery metadata.
362    ///
363    /// # Parameters
364    /// - `delay`: Requested delivery delay.
365    ///
366    /// # Returns
367    /// Updated envelope.
368    pub fn with_delay(mut self, delay: Duration) -> Self {
369        self.delay = Some(delay);
370        self
371    }
372
373    /// Injects an acknowledgement handle.
374    ///
375    /// # Parameters
376    /// - `acknowledgement`: Handle shared with processing code.
377    ///
378    /// # Returns
379    /// Updated envelope.
380    pub fn with_acknowledgement(mut self, acknowledgement: Acknowledgement) -> Self {
381        self.acknowledgement = Some(acknowledgement);
382        self
383    }
384
385    /// Marks the envelope as a dead letter.
386    ///
387    /// # Returns
388    /// Updated envelope with dead-letter marker enabled.
389    pub fn as_dead_letter(mut self) -> Self {
390        self.dead_letter = true;
391        self
392    }
393
394    /// Applies mutable metadata fields returned by a global interceptor.
395    pub(crate) fn apply_metadata(&mut self, metadata: EventEnvelopeMetadata) {
396        self.headers = metadata.headers;
397        self.ordering_key = metadata.ordering_key;
398        self.delay = metadata.delay;
399    }
400}
401
402/// Generates a process-local event ID.
403///
404/// # Returns
405/// Monotonic event ID string.
406pub(crate) fn generate_event_id() -> String {
407    let id = NEXT_EVENT_ID.fetch_add(1, Ordering::SeqCst);
408    format!("event-{id}")
409}