Skip to main content

dactor/
message.rs

1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::time::Instant;
4
5use uuid::Uuid;
6
/// Associates a message type with the type of reply it expects.
///
/// The trait is implemented on the message itself rather than on an actor,
/// which keeps the definition of a message independent of whichever actors
/// choose to handle it.
pub trait Message: Send + 'static {
    /// Type of the value sent back in response to this message.
    ///
    /// Set this to `()` for fire-and-forget (tell) messages.
    type Reply: Send + 'static;
}
15
/// A value that can be stored in a `Headers` collection.
///
/// Inside a process the value is keyed by its `TypeId`, which gives typed
/// lookup. For remote transport it is encoded with
/// [`to_bytes`](HeaderValue::to_bytes) and keyed by
/// [`header_name`](HeaderValue::header_name).
pub trait HeaderValue: Send + Sync + 'static {
    /// Name identifying this header on the wire; it should be stable and unique.
    fn header_name(&self) -> &'static str;

    /// Encodes the header for remote transport.
    ///
    /// Returning `None` marks the header as local-only.
    fn to_bytes(&self) -> Option<Vec<u8>>;

    /// Exposes the value as `&dyn Any` so it can be downcast to its concrete type.
    fn as_any(&self) -> &dyn Any;
}
29
30/// Type-safe header collection keyed by TypeId.
31/// Thread-safe: all values implement `Send + Sync` (via `HeaderValue` bounds).
32#[derive(Default)]
33pub struct Headers {
34    map: HashMap<TypeId, Box<dyn HeaderValue>>,
35}
36
37// Compile-time assertion that Headers is Send + Sync.
38const _: () = {
39    fn _assert<T: Send + Sync>() {}
40    fn _check() {
41        _assert::<Headers>();
42    }
43};
44
45impl Headers {
46    /// Create an empty header collection.
47    pub fn new() -> Self {
48        Self {
49            map: HashMap::new(),
50        }
51    }
52
53    /// Insert a typed header. Replaces any existing header of the same type.
54    pub fn insert<H: HeaderValue>(&mut self, value: H) {
55        self.map.insert(TypeId::of::<H>(), Box::new(value));
56    }
57
58    /// Get a reference to a typed header, if present.
59    pub fn get<H: HeaderValue + 'static>(&self) -> Option<&H> {
60        self.map
61            .get(&TypeId::of::<H>())
62            .and_then(|v| v.as_any().downcast_ref::<H>())
63    }
64
65    /// Remove a typed header, returning it if present.
66    pub fn remove<H: HeaderValue + 'static>(&mut self) -> Option<Box<dyn HeaderValue>> {
67        self.map.remove(&TypeId::of::<H>())
68    }
69
70    /// Insert a boxed header value. Used by wire deserialization to insert
71    /// headers whose concrete type is resolved at runtime via a registry.
72    pub fn insert_boxed(&mut self, value: Box<dyn HeaderValue>) {
73        let type_id = value.as_any().type_id();
74        self.map.insert(type_id, value);
75    }
76
77    /// Check if the headers collection is empty.
78    pub fn is_empty(&self) -> bool {
79        self.map.is_empty()
80    }
81
82    /// Number of headers.
83    pub fn len(&self) -> usize {
84        self.map.len()
85    }
86
87    /// Convert typed headers to wire-format [`WireHeaders`](crate::remote::WireHeaders).
88    ///
89    /// Iterates all headers and calls [`HeaderValue::to_bytes()`] on each.
90    /// Headers that return `None` (local-only) are skipped.
91    pub fn to_wire(&self) -> crate::remote::WireHeaders {
92        let mut wire = crate::remote::WireHeaders::new();
93        for value in self.map.values() {
94            if let Some(bytes) = value.to_bytes() {
95                wire.insert(value.header_name().to_string(), bytes);
96            }
97        }
98        wire
99    }
100}
101
102impl std::fmt::Debug for Headers {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        f.debug_struct("Headers")
105            .field("count", &self.map.len())
106            .finish()
107    }
108}
109
110/// Globally unique identifier for a message, auto-assigned by the runtime.
111/// Uses UUID v4 for uniqueness across nodes without coordination.
112#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
113pub struct MessageId(pub Uuid);
114
115impl MessageId {
116    /// Generate a new globally unique MessageId (UUID v4).
117    pub fn next() -> Self {
118        Self(Uuid::new_v4())
119    }
120}
121
122impl std::fmt::Display for MessageId {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        write!(f, "msg-{}", self.0)
125    }
126}
127
128/// Read-only headers auto-generated by the runtime.
129/// Interceptors can read these but cannot modify or remove them.
130///
131/// **Clone semantics:** Cloning preserves the original `message_id` and
132/// `timestamp`. This is intentional — cloned headers refer to the same
133/// logical message (e.g., when forwarding or replicating an envelope).
134/// Use `RuntimeHeaders::new()` to create headers for a genuinely new message.
135#[derive(Debug, Clone)]
136pub struct RuntimeHeaders {
137    /// Unique message ID — auto-assigned by the runtime for every message.
138    pub message_id: MessageId,
139    /// When the runtime received/created this message.
140    pub timestamp: Instant,
141}
142
143impl RuntimeHeaders {
144    /// Create new RuntimeHeaders with a fresh MessageId and current timestamp.
145    pub fn new() -> Self {
146        Self {
147            message_id: MessageId::next(),
148            timestamp: Instant::now(),
149        }
150    }
151}
152
153impl Default for RuntimeHeaders {
154    fn default() -> Self {
155        Self::new()
156    }
157}
158
/// Built-in priority header, used by priority mailboxes and interceptors.
///
/// A lower numeric value means higher urgency (like Unix nice values):
/// `CRITICAL(0) < HIGH(64) < NORMAL(128) < LOW(192) < BACKGROUND(255)`.
///
/// The derived ordering compares the raw `u8`, so sorting in ascending order
/// puts the most urgent priority first. `Hash` is derived alongside `Eq`, so
/// a priority can be used directly as a `HashMap`/`HashSet` key (for example
/// to bucket messages per priority level).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Priority(pub u8);
165
166impl Priority {
167    /// Highest urgency (value 0).
168    pub const CRITICAL: Self = Self(0);
169    /// High urgency (value 64).
170    pub const HIGH: Self = Self(64);
171    /// Default priority (value 128).
172    pub const NORMAL: Self = Self(128);
173    /// Low priority (value 192).
174    pub const LOW: Self = Self(192);
175    /// Lowest priority — background work (value 255).
176    pub const BACKGROUND: Self = Self(255);
177}
178
179impl Default for Priority {
180    fn default() -> Self {
181        Self::NORMAL
182    }
183}
184
185impl HeaderValue for Priority {
186    fn header_name(&self) -> &'static str {
187        "dactor.Priority"
188    }
189
190    fn to_bytes(&self) -> Option<Vec<u8>> {
191        Some(vec![self.0])
192    }
193
194    fn as_any(&self) -> &dyn Any {
195        self
196    }
197}
198
199impl std::fmt::Display for Priority {
200    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201        match self.0 {
202            0 => write!(f, "CRITICAL"),
203            64 => write!(f, "HIGH"),
204            128 => write!(f, "NORMAL"),
205            192 => write!(f, "LOW"),
206            255 => write!(f, "BACKGROUND"),
207            n => write!(f, "Priority({})", n),
208        }
209    }
210}
211
#[cfg(test)]
mod tests {
    //! Unit tests for the message, header and priority types in this module.

    use super::*;

    // Minimal message types covering a unit reply and a value reply.
    struct Increment(#[allow(dead_code)] u64);
    impl Message for Increment {
        type Reply = ();
    }

    struct GetCount;
    impl Message for GetCount {
        type Reply = u64;
    }

    struct Reset;
    impl Message for Reset {
        type Reply = u64;
    }

    #[test]
    fn test_message_reply_types() {
        // Pure compile-time checks: these calls only type-check when the
        // associated `Reply` type is what we expect.
        fn assert_reply_unit<M: Message<Reply = ()>>() {}
        fn assert_reply_u64<M: Message<Reply = u64>>() {}

        assert_reply_unit::<Increment>();
        assert_reply_u64::<GetCount>();
        assert_reply_u64::<Reset>();
    }

    #[test]
    fn test_headers_default_is_empty() {
        let headers = Headers::default();
        assert!(headers.is_empty());
        assert_eq!(headers.len(), 0);
    }

    #[test]
    fn test_headers_insert_get() {
        let mut headers = Headers::new();
        headers.insert(Priority::HIGH);

        let p = headers.get::<Priority>().unwrap();
        assert_eq!(*p, Priority::HIGH);
    }

    #[test]
    fn test_headers_insert_replace() {
        let mut headers = Headers::new();
        headers.insert(Priority::LOW);
        headers.insert(Priority::CRITICAL);

        // Same type => the second insert replaces the first.
        assert_eq!(headers.len(), 1);
        let p = headers.get::<Priority>().unwrap();
        assert_eq!(*p, Priority::CRITICAL);
    }

    #[test]
    fn test_headers_insert_boxed() {
        let mut headers = Headers::new();
        headers.insert_boxed(Box::new(Priority::LOW));

        // A boxed insert must be reachable through the typed getter.
        assert_eq!(headers.get::<Priority>(), Some(&Priority::LOW));

        // ...and must share the slot used by the typed `insert`.
        headers.insert_boxed(Box::new(Priority::HIGH));
        assert_eq!(headers.len(), 1);
        assert_eq!(headers.get::<Priority>(), Some(&Priority::HIGH));

        headers.insert(Priority::CRITICAL);
        assert_eq!(headers.len(), 1);
        assert_eq!(headers.get::<Priority>(), Some(&Priority::CRITICAL));
    }

    #[test]
    fn test_headers_remove() {
        let mut headers = Headers::new();
        headers.insert(Priority::NORMAL);
        assert!(!headers.is_empty());

        let removed = headers
            .remove::<Priority>()
            .expect("the inserted header is returned on removal");
        assert_eq!(removed.header_name(), "dactor.Priority");
        assert_eq!(
            removed.as_any().downcast_ref::<Priority>(),
            Some(&Priority::NORMAL)
        );

        assert!(headers.is_empty());
        assert!(headers.get::<Priority>().is_none());
        // Removing again finds nothing.
        assert!(headers.remove::<Priority>().is_none());
    }

    #[test]
    fn test_headers_get_missing() {
        let headers = Headers::new();
        assert!(headers.get::<Priority>().is_none());
    }

    #[test]
    fn test_multiple_header_types() {
        #[derive(Debug)]
        struct TraceId(String);

        impl HeaderValue for TraceId {
            fn header_name(&self) -> &'static str {
                "app.TraceId"
            }
            fn to_bytes(&self) -> Option<Vec<u8>> {
                Some(self.0.as_bytes().to_vec())
            }
            fn as_any(&self) -> &dyn Any {
                self
            }
        }

        let mut headers = Headers::new();
        headers.insert(Priority::HIGH);
        headers.insert(TraceId("abc-123".into()));

        assert_eq!(headers.len(), 2);
        assert_eq!(headers.get::<Priority>().unwrap().0, 64);
        assert_eq!(headers.get::<TraceId>().unwrap().0, "abc-123");
    }

    #[test]
    fn test_headers_len() {
        let mut headers = Headers::new();
        assert_eq!(headers.len(), 0);
        headers.insert(Priority::NORMAL);
        assert_eq!(headers.len(), 1);
    }

    #[test]
    fn test_headers_debug_shows_count() {
        let mut headers = Headers::new();
        assert_eq!(format!("{:?}", headers), "Headers { count: 0 }");
        headers.insert(Priority::NORMAL);
        assert_eq!(format!("{:?}", headers), "Headers { count: 1 }");
    }

    #[test]
    fn test_local_only_header() {
        #[derive(Debug)]
        struct HandlerStartTime(#[allow(dead_code)] Instant);

        impl HeaderValue for HandlerStartTime {
            fn header_name(&self) -> &'static str {
                "dactor.internal.HandlerStartTime"
            }
            fn to_bytes(&self) -> Option<Vec<u8>> {
                None
            }
            fn as_any(&self) -> &dyn Any {
                self
            }
        }

        let mut headers = Headers::new();
        headers.insert(HandlerStartTime(Instant::now()));

        let h = headers.get::<HandlerStartTime>().unwrap();
        assert!(h.to_bytes().is_none());
    }

    #[test]
    fn test_message_id_uniqueness() {
        let ids: Vec<MessageId> = (0..1000).map(|_| MessageId::next()).collect();
        let unique: std::collections::HashSet<_> = ids.iter().collect();
        assert_eq!(unique.len(), 1000);
    }

    #[test]
    fn test_message_id_display() {
        let id = MessageId::next();
        let display = id.to_string();

        // The display form is exactly "msg-" followed by the UUID's own form.
        let uuid_part = display
            .strip_prefix("msg-")
            .expect("display form starts with `msg-`");
        assert_eq!(uuid_part, id.0.to_string());
    }

    #[test]
    fn test_runtime_headers_creation() {
        let rh1 = RuntimeHeaders::new();
        let rh2 = RuntimeHeaders::new();
        assert_ne!(rh1.message_id, rh2.message_id);
        // `Instant` is monotonic, so the later value can never be earlier.
        assert!(rh2.timestamp >= rh1.timestamp);
    }

    #[test]
    fn test_runtime_headers_default_is_fresh() {
        let rh1 = RuntimeHeaders::default();
        let rh2 = RuntimeHeaders::default();
        assert_ne!(rh1.message_id, rh2.message_id);
    }

    #[test]
    fn test_runtime_headers_clone_preserves_identity() {
        let original = RuntimeHeaders::new();
        let copy = original.clone();
        assert_eq!(copy.message_id, original.message_id);
        assert_eq!(copy.timestamp, original.timestamp);
    }

    #[test]
    fn test_priority_constants() {
        // Use runtime values to avoid clippy::assertions_on_constants
        let critical = Priority::CRITICAL.0;
        let high = Priority::HIGH.0;
        let normal = Priority::NORMAL.0;
        let low = Priority::LOW.0;
        let background = Priority::BACKGROUND.0;
        assert!(critical < high);
        assert!(high < normal);
        assert!(normal < low);
        assert!(low < background);
    }

    #[test]
    fn test_priority_ordering() {
        // The derived `Ord` sorts the most urgent priority first.
        let mut levels = vec![
            Priority::LOW,
            Priority::CRITICAL,
            Priority::BACKGROUND,
            Priority::NORMAL,
            Priority::HIGH,
        ];
        levels.sort();
        assert_eq!(
            levels,
            vec![
                Priority::CRITICAL,
                Priority::HIGH,
                Priority::NORMAL,
                Priority::LOW,
                Priority::BACKGROUND,
            ]
        );
    }

    #[test]
    fn test_priority_default() {
        assert_eq!(Priority::default(), Priority::NORMAL);
    }

    #[test]
    fn test_priority_display() {
        assert_eq!(format!("{}", Priority::CRITICAL), "CRITICAL");
        assert_eq!(format!("{}", Priority::HIGH), "HIGH");
        assert_eq!(format!("{}", Priority::NORMAL), "NORMAL");
        assert_eq!(format!("{}", Priority::LOW), "LOW");
        assert_eq!(format!("{}", Priority::BACKGROUND), "BACKGROUND");
        assert_eq!(format!("{}", Priority(100)), "Priority(100)");
    }

    #[test]
    fn test_priority_to_bytes() {
        let p = Priority::HIGH;
        assert_eq!(p.to_bytes(), Some(vec![64]));
        assert_eq!(p.header_name(), "dactor.Priority");
    }
}