Skip to main content

liminal/
envelope.rs

1use chrono::{DateTime, Utc};
2
3use crate::causal::{CausalContext, MessageId};
4use crate::channel::SchemaId;
5
/// Names the publisher on whose behalf a message was submitted.
///
/// A thin newtype over `String`; the inner value is private and is read back
/// through the accessor methods on this type.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct PublisherId(String);
9
10impl PublisherId {
11    /// Creates a publisher identifier from a string-like value.
12    #[must_use]
13    pub fn new(value: impl Into<String>) -> Self {
14        Self(value.into())
15    }
16
17    /// Returns the publisher identifier as a string slice.
18    #[must_use]
19    pub fn as_str(&self) -> &str {
20        &self.0
21    }
22}
23
24impl From<&str> for PublisherId {
25    fn from(value: &str) -> Self {
26        Self::new(value)
27    }
28}
29
30impl From<String> for PublisherId {
31    fn from(value: String) -> Self {
32        Self::new(value)
33    }
34}
35
36impl Default for PublisherId {
37    fn default() -> Self {
38        Self::new("anonymous")
39    }
40}
41
42/// Message envelope used as the delivery unit inside the core bus.
43#[derive(Clone, Debug, PartialEq, Eq)]
44pub struct Envelope {
45    /// UUID-based unique identifier assigned when the message is published.
46    pub message_id: MessageId,
47    /// Validated payload bytes, normalized by the schema when defaults are applied.
48    pub payload: Vec<u8>,
49    /// Optional parent reference for causal-chain metadata.
50    pub causal_context: Option<CausalContext>,
51    /// Schema version that validated this payload.
52    pub schema_id: SchemaId,
53    /// Publisher identity attached at publish time.
54    pub publisher_id: PublisherId,
55    /// UTC timestamp captured when the message was published.
56    pub timestamp: DateTime<Utc>,
57}
58
59impl Envelope {
60    /// Creates an envelope with the current UTC publish timestamp.
61    #[must_use]
62    pub fn new(
63        payload: Vec<u8>,
64        causal_context: Option<CausalContext>,
65        schema_id: SchemaId,
66        publisher_id: PublisherId,
67    ) -> Self {
68        Self::with_timestamp(payload, causal_context, schema_id, publisher_id, Utc::now())
69    }
70
71    /// Creates an envelope with an explicit timestamp.
72    #[must_use]
73    pub fn with_timestamp(
74        payload: Vec<u8>,
75        causal_context: Option<CausalContext>,
76        schema_id: SchemaId,
77        publisher_id: PublisherId,
78        timestamp: DateTime<Utc>,
79    ) -> Self {
80        Self::with_message_id_and_timestamp(
81            MessageId::new(),
82            payload,
83            causal_context,
84            schema_id,
85            publisher_id,
86            timestamp,
87        )
88    }
89
90    /// Creates an envelope with an explicit message identifier and current UTC timestamp.
91    #[must_use]
92    pub fn with_message_id(
93        message_id: MessageId,
94        payload: Vec<u8>,
95        causal_context: Option<CausalContext>,
96        schema_id: SchemaId,
97        publisher_id: PublisherId,
98    ) -> Self {
99        Self::with_message_id_and_timestamp(
100            message_id,
101            payload,
102            causal_context,
103            schema_id,
104            publisher_id,
105            Utc::now(),
106        )
107    }
108
109    /// Creates an envelope with explicit message identifier and timestamp.
110    #[must_use]
111    pub const fn with_message_id_and_timestamp(
112        message_id: MessageId,
113        payload: Vec<u8>,
114        causal_context: Option<CausalContext>,
115        schema_id: SchemaId,
116        publisher_id: PublisherId,
117        timestamp: DateTime<Utc>,
118    ) -> Self {
119        Self {
120            message_id,
121            payload,
122            causal_context,
123            schema_id,
124            publisher_id,
125            timestamp,
126        }
127    }
128}
129
#[cfg(test)]
mod tests {
    use chrono::{DateTime, TimeZone, Utc};

    use super::{Envelope, PublisherId};
    use crate::causal::{CausalContext, MessageId};
    use crate::channel::SchemaId;

    /// Every field handed to the fully explicit constructor must come back unchanged.
    #[test]
    fn envelope_carries_required_fields() {
        let schema_id = SchemaId::new();
        let publisher_id = PublisherId::from("publisher-1");
        let parent = MessageId::new();
        let causal_context = Some(CausalContext::child_of(parent));
        let timestamp = fixed_timestamp();
        let message_id = MessageId::new();

        let envelope = Envelope::with_message_id_and_timestamp(
            message_id,
            b"{}".to_vec(),
            causal_context.clone(),
            schema_id,
            publisher_id.clone(),
            timestamp,
        );

        assert_eq!(envelope.message_id, message_id);
        assert_eq!(envelope.payload, b"{}".to_vec());
        assert_eq!(envelope.causal_context, causal_context);
        assert_eq!(envelope.schema_id, schema_id);
        assert_eq!(envelope.publisher_id, publisher_id);
        assert_eq!(envelope.timestamp, timestamp);
    }

    /// Two envelopes built through `Envelope::new` must not share a message identifier.
    #[test]
    fn envelope_assigns_unique_message_ids() {
        let first = Envelope::new(vec![], None, SchemaId::new(), PublisherId::default());
        let second = Envelope::new(vec![], None, SchemaId::new(), PublisherId::default());

        assert_ne!(first.message_id, second.message_id);
    }

    /// Returns the same instant on every call (1_700_000_000 seconds after the Unix
    /// epoch, UTC) so the test does not depend on the wall clock.
    fn fixed_timestamp() -> DateTime<Utc> {
        Utc.timestamp_opt(1_700_000_000, 0)
            .single()
            .expect("a whole-second UTC timestamp within range is unambiguous")
    }
}