1use chrono::{DateTime, Utc};
2
3use crate::causal::{CausalContext, MessageId};
4use crate::channel::SchemaId;
5
6#[derive(Clone, Debug, PartialEq, Eq, Hash)]
8pub struct PublisherId(String);
9
10impl PublisherId {
11 #[must_use]
13 pub fn new(value: impl Into<String>) -> Self {
14 Self(value.into())
15 }
16
17 #[must_use]
19 pub fn as_str(&self) -> &str {
20 &self.0
21 }
22}
23
24impl From<&str> for PublisherId {
25 fn from(value: &str) -> Self {
26 Self::new(value)
27 }
28}
29
30impl From<String> for PublisherId {
31 fn from(value: String) -> Self {
32 Self::new(value)
33 }
34}
35
36impl Default for PublisherId {
37 fn default() -> Self {
38 Self::new("anonymous")
39 }
40}
41
42#[derive(Clone, Debug, PartialEq, Eq)]
44pub struct Envelope {
45 pub message_id: MessageId,
47 pub payload: Vec<u8>,
49 pub causal_context: Option<CausalContext>,
51 pub schema_id: SchemaId,
53 pub publisher_id: PublisherId,
55 pub timestamp: DateTime<Utc>,
57}
58
59impl Envelope {
60 #[must_use]
62 pub fn new(
63 payload: Vec<u8>,
64 causal_context: Option<CausalContext>,
65 schema_id: SchemaId,
66 publisher_id: PublisherId,
67 ) -> Self {
68 Self::with_timestamp(payload, causal_context, schema_id, publisher_id, Utc::now())
69 }
70
71 #[must_use]
73 pub fn with_timestamp(
74 payload: Vec<u8>,
75 causal_context: Option<CausalContext>,
76 schema_id: SchemaId,
77 publisher_id: PublisherId,
78 timestamp: DateTime<Utc>,
79 ) -> Self {
80 Self::with_message_id_and_timestamp(
81 MessageId::new(),
82 payload,
83 causal_context,
84 schema_id,
85 publisher_id,
86 timestamp,
87 )
88 }
89
90 #[must_use]
92 pub fn with_message_id(
93 message_id: MessageId,
94 payload: Vec<u8>,
95 causal_context: Option<CausalContext>,
96 schema_id: SchemaId,
97 publisher_id: PublisherId,
98 ) -> Self {
99 Self::with_message_id_and_timestamp(
100 message_id,
101 payload,
102 causal_context,
103 schema_id,
104 publisher_id,
105 Utc::now(),
106 )
107 }
108
109 #[must_use]
111 pub const fn with_message_id_and_timestamp(
112 message_id: MessageId,
113 payload: Vec<u8>,
114 causal_context: Option<CausalContext>,
115 schema_id: SchemaId,
116 publisher_id: PublisherId,
117 timestamp: DateTime<Utc>,
118 ) -> Self {
119 Self {
120 message_id,
121 payload,
122 causal_context,
123 schema_id,
124 publisher_id,
125 timestamp,
126 }
127 }
128}
129
130#[cfg(test)]
131mod tests {
132 use chrono::Utc;
133
134 use super::{Envelope, PublisherId};
135 use crate::causal::{CausalContext, MessageId};
136 use crate::channel::SchemaId;
137
138 #[test]
139 fn envelope_carries_required_fields() {
140 let schema_id = SchemaId::new();
141 let publisher_id = PublisherId::from("publisher-1");
142 let parent = MessageId::new();
143 let causal_context = Some(CausalContext::child_of(parent));
144 let timestamp = fixed_timestamp();
145 let message_id = MessageId::new();
146
147 let envelope = Envelope::with_message_id_and_timestamp(
148 message_id,
149 b"{}".to_vec(),
150 causal_context.clone(),
151 schema_id,
152 publisher_id.clone(),
153 timestamp,
154 );
155
156 assert_eq!(envelope.message_id, message_id);
157 assert_eq!(envelope.payload, b"{}".to_vec());
158 assert_eq!(envelope.causal_context, causal_context);
159 assert_eq!(envelope.schema_id, schema_id);
160 assert_eq!(envelope.publisher_id, publisher_id);
161 assert_eq!(envelope.timestamp, timestamp);
162 }
163
164 #[test]
165 fn envelope_assigns_unique_message_ids() {
166 let first = Envelope::new(vec![], None, SchemaId::new(), PublisherId::default());
167 let second = Envelope::new(vec![], None, SchemaId::new(), PublisherId::default());
168
169 assert_ne!(first.message_id, second.message_id);
170 }
171
172 fn fixed_timestamp() -> chrono::DateTime<Utc> {
173 Utc::now()
174 }
175}