1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::time::Instant;
4
5use uuid::Uuid;
6
/// Trait implemented by every message type.
///
/// A message must be `Send + 'static`, and it names the type of the reply
/// it expects through the associated type [`Message::Reply`].
pub trait Message: 'static + Send {
    /// Reply type paired with this message. It must itself be
    /// `Send + 'static`; `()` can be used when there is nothing to return.
    type Reply: 'static + Send;
}
15
/// A typed value that can be stored in [`Headers`].
///
/// Implementors must be `Send + Sync + 'static` so that a collection of
/// boxed header values can itself be shared between threads.
pub trait HeaderValue: 'static + Send + Sync {
    /// Name under which the header is written by [`Headers::to_wire`].
    fn header_name(&self) -> &'static str;

    /// Byte representation of the value, or `None` when there is none.
    ///
    /// [`Headers::to_wire`] skips every header that returns `None` here.
    fn to_bytes(&self) -> Option<Vec<u8>>;

    /// Exposes the value as `&dyn Any` so that it can be downcast back to
    /// its concrete type. Implementations are expected to return `self`.
    fn as_any(&self) -> &dyn Any;
}
29
30#[derive(Default)]
33pub struct Headers {
34 map: HashMap<TypeId, Box<dyn HeaderValue>>,
35}
36
37const _: () = {
39 fn _assert<T: Send + Sync>() {}
40 fn _check() {
41 _assert::<Headers>();
42 }
43};
44
45impl Headers {
46 pub fn new() -> Self {
48 Self {
49 map: HashMap::new(),
50 }
51 }
52
53 pub fn insert<H: HeaderValue>(&mut self, value: H) {
55 self.map.insert(TypeId::of::<H>(), Box::new(value));
56 }
57
58 pub fn get<H: HeaderValue + 'static>(&self) -> Option<&H> {
60 self.map
61 .get(&TypeId::of::<H>())
62 .and_then(|v| v.as_any().downcast_ref::<H>())
63 }
64
65 pub fn remove<H: HeaderValue + 'static>(&mut self) -> Option<Box<dyn HeaderValue>> {
67 self.map.remove(&TypeId::of::<H>())
68 }
69
70 pub fn insert_boxed(&mut self, value: Box<dyn HeaderValue>) {
73 let type_id = value.as_any().type_id();
74 self.map.insert(type_id, value);
75 }
76
77 pub fn is_empty(&self) -> bool {
79 self.map.is_empty()
80 }
81
82 pub fn len(&self) -> usize {
84 self.map.len()
85 }
86
87 pub fn to_wire(&self) -> crate::remote::WireHeaders {
92 let mut wire = crate::remote::WireHeaders::new();
93 for value in self.map.values() {
94 if let Some(bytes) = value.to_bytes() {
95 wire.insert(value.header_name().to_string(), bytes);
96 }
97 }
98 wire
99 }
100}
101
102impl std::fmt::Debug for Headers {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 f.debug_struct("Headers")
105 .field("count", &self.map.len())
106 .finish()
107 }
108}
109
110#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
113pub struct MessageId(pub Uuid);
114
115impl MessageId {
116 pub fn next() -> Self {
118 Self(Uuid::new_v4())
119 }
120}
121
122impl std::fmt::Display for MessageId {
123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124 write!(f, "msg-{}", self.0)
125 }
126}
127
128#[derive(Debug, Clone)]
136pub struct RuntimeHeaders {
137 pub message_id: MessageId,
139 pub timestamp: Instant,
141}
142
143impl RuntimeHeaders {
144 pub fn new() -> Self {
146 Self {
147 message_id: MessageId::next(),
148 timestamp: Instant::now(),
149 }
150 }
151}
152
153impl Default for RuntimeHeaders {
154 fn default() -> Self {
155 Self::new()
156 }
157}
158
/// Priority level of a message, stored as a single byte.
///
/// Equality, ordering and hashing are all derived from the wrapped `u8`, so
/// `Priority(0)` compares as the smallest value and `Priority(255)` as the
/// largest. `Hash` is derived alongside `Eq` so that priorities can be used
/// as `HashMap` keys or stored in a `HashSet`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Priority(pub u8);
165
166impl Priority {
167 pub const CRITICAL: Self = Self(0);
169 pub const HIGH: Self = Self(64);
171 pub const NORMAL: Self = Self(128);
173 pub const LOW: Self = Self(192);
175 pub const BACKGROUND: Self = Self(255);
177}
178
179impl Default for Priority {
180 fn default() -> Self {
181 Self::NORMAL
182 }
183}
184
185impl HeaderValue for Priority {
186 fn header_name(&self) -> &'static str {
187 "dactor.Priority"
188 }
189
190 fn to_bytes(&self) -> Option<Vec<u8>> {
191 Some(vec![self.0])
192 }
193
194 fn as_any(&self) -> &dyn Any {
195 self
196 }
197}
198
199impl std::fmt::Display for Priority {
200 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201 match self.0 {
202 0 => write!(f, "CRITICAL"),
203 64 => write!(f, "HIGH"),
204 128 => write!(f, "NORMAL"),
205 192 => write!(f, "LOW"),
206 255 => write!(f, "BACKGROUND"),
207 n => write!(f, "Priority({})", n),
208 }
209 }
210}
211
/// Unit tests for the message, header and priority primitives.
#[cfg(test)]
mod tests {
    use super::*;

    // Minimal message types used to check the `Message::Reply` binding.
    struct Increment(#[allow(dead_code)] u64);
    impl Message for Increment {
        type Reply = ();
    }

    struct GetCount;
    impl Message for GetCount {
        type Reply = u64;
    }

    struct Reset;
    impl Message for Reset {
        type Reply = u64;
    }

    #[test]
    fn test_message_reply_types() {
        // These only need to compile: the bounds pin each reply type.
        fn assert_reply_unit<M: Message<Reply = ()>>() {}
        fn assert_reply_u64<M: Message<Reply = u64>>() {}

        assert_reply_unit::<Increment>();
        assert_reply_u64::<GetCount>();
        assert_reply_u64::<Reset>();
    }

    #[test]
    fn test_headers_insert_get() {
        let mut headers = Headers::new();
        headers.insert(Priority::HIGH);

        let p = headers.get::<Priority>().unwrap();
        assert_eq!(*p, Priority::HIGH);
    }

    #[test]
    fn test_headers_insert_replace() {
        let mut headers = Headers::new();
        headers.insert(Priority::LOW);
        headers.insert(Priority::CRITICAL);

        // The second insert of the same type replaces the first.
        assert_eq!(headers.len(), 1);
        let p = headers.get::<Priority>().unwrap();
        assert_eq!(*p, Priority::CRITICAL);
    }

    #[test]
    fn test_headers_insert_boxed() {
        let mut headers = Headers::new();
        headers.insert_boxed(Box::new(Priority::LOW));

        // A boxed value must be keyed by its concrete type, not by the box.
        assert_eq!(headers.len(), 1);
        assert_eq!(headers.get::<Priority>(), Some(&Priority::LOW));

        // It shares the slot used by the typed `insert`.
        headers.insert(Priority::HIGH);
        assert_eq!(headers.len(), 1);
        assert_eq!(headers.get::<Priority>(), Some(&Priority::HIGH));
    }

    #[test]
    fn test_headers_remove() {
        let mut headers = Headers::new();
        headers.insert(Priority::NORMAL);
        assert!(!headers.is_empty());

        let removed = headers
            .remove::<Priority>()
            .expect("the inserted value should be returned");
        assert_eq!(removed.header_name(), "dactor.Priority");
        assert_eq!(removed.to_bytes(), Some(vec![128]));

        assert!(headers.is_empty());
        assert!(headers.get::<Priority>().is_none());
        // Removing again finds nothing.
        assert!(headers.remove::<Priority>().is_none());
    }

    #[test]
    fn test_headers_get_missing() {
        let headers = Headers::new();
        assert!(headers.get::<Priority>().is_none());
    }

    #[test]
    fn test_headers_default_is_empty() {
        let headers = Headers::default();
        assert!(headers.is_empty());
        assert_eq!(headers.len(), 0);
    }

    #[test]
    fn test_multiple_header_types() {
        #[derive(Debug)]
        struct TraceId(String);

        impl HeaderValue for TraceId {
            fn header_name(&self) -> &'static str {
                "app.TraceId"
            }
            fn to_bytes(&self) -> Option<Vec<u8>> {
                Some(self.0.as_bytes().to_vec())
            }
            fn as_any(&self) -> &dyn Any {
                self
            }
        }

        let mut headers = Headers::new();
        headers.insert(Priority::HIGH);
        headers.insert(TraceId("abc-123".into()));

        assert_eq!(headers.len(), 2);
        assert_eq!(headers.get::<Priority>().unwrap().0, 64);
        assert_eq!(headers.get::<TraceId>().unwrap().0, "abc-123");
    }

    #[test]
    fn test_message_id_uniqueness() {
        let ids: Vec<MessageId> = (0..1000).map(|_| MessageId::next()).collect();
        let unique: std::collections::HashSet<_> = ids.iter().collect();
        assert_eq!(unique.len(), 1000);
    }

    #[test]
    fn test_message_id_display() {
        let id = MessageId::next();
        let display = format!("{}", id);
        assert!(display.starts_with("msg-"));
        assert!(display.len() > 10);
    }

    #[test]
    fn test_runtime_headers_creation() {
        let rh1 = RuntimeHeaders::new();
        let rh2 = RuntimeHeaders::new();
        assert_ne!(rh1.message_id, rh2.message_id);
    }

    #[test]
    fn test_priority_constants() {
        let critical = Priority::CRITICAL.0;
        let high = Priority::HIGH.0;
        let normal = Priority::NORMAL.0;
        let low = Priority::LOW.0;
        let background = Priority::BACKGROUND.0;
        assert!(critical < high);
        assert!(high < normal);
        assert!(normal < low);
        assert!(low < background);
    }

    #[test]
    fn test_priority_default() {
        assert_eq!(Priority::default(), Priority::NORMAL);
    }

    #[test]
    fn test_priority_display() {
        assert_eq!(format!("{}", Priority::CRITICAL), "CRITICAL");
        assert_eq!(format!("{}", Priority::HIGH), "HIGH");
        assert_eq!(format!("{}", Priority::NORMAL), "NORMAL");
        assert_eq!(format!("{}", Priority::LOW), "LOW");
        assert_eq!(format!("{}", Priority::BACKGROUND), "BACKGROUND");
        assert_eq!(format!("{}", Priority(100)), "Priority(100)");
    }

    #[test]
    fn test_priority_to_bytes() {
        let p = Priority::HIGH;
        assert_eq!(p.to_bytes(), Some(vec![64]));
        assert_eq!(p.header_name(), "dactor.Priority");
    }

    #[test]
    fn test_headers_len() {
        let mut headers = Headers::new();
        assert_eq!(headers.len(), 0);
        headers.insert(Priority::NORMAL);
        assert_eq!(headers.len(), 1);
    }

    #[test]
    fn test_local_only_header() {
        // A header with no byte form: `to_bytes` returns `None`.
        #[derive(Debug)]
        struct HandlerStartTime(#[allow(dead_code)] Instant);

        impl HeaderValue for HandlerStartTime {
            fn header_name(&self) -> &'static str {
                "dactor.internal.HandlerStartTime"
            }
            fn to_bytes(&self) -> Option<Vec<u8>> {
                None
            }
            fn as_any(&self) -> &dyn Any {
                self
            }
        }

        let mut headers = Headers::new();
        headers.insert(HandlerStartTime(Instant::now()));

        let h = headers.get::<HandlerStartTime>().unwrap();
        assert!(h.to_bytes().is_none());
    }
}