// dactor 0.3.0
//
// An abstract framework for distributed actors in Rust.
// Documentation
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::time::Instant;

use uuid::Uuid;

/// Describes a message type together with the reply type it expects.
///
/// The trait is implemented on the MESSAGE type, not on the actor. Keeping
/// the definition of a message separate from its handling lets several
/// different actors handle the same message type.
///
/// The `Send + 'static` supertraits mean an implementor must be movable to
/// another thread and must not borrow non-`'static` data.
pub trait Message: Send + 'static {
    /// The value sent back in response to this message.
    ///
    /// Use `()` for fire-and-forget (tell) messages. The reply carries the
    /// same `Send + 'static` requirements as the message itself.
    type Reply: Send + 'static;
}

/// A value that can be stored in a [`Headers`] map.
///
/// Locally, values are stored by `TypeId` for type-safe access.
/// Remotely, they are serialized via `to_bytes()` and keyed by
/// `header_name()`.
///
/// The `Send + Sync + 'static` supertraits are what make [`Headers`] itself
/// `Send + Sync`.
pub trait HeaderValue: Send + Sync + 'static {
    /// Stable, unique name for wire transport.
    ///
    /// [`Headers::to_wire`] uses this string as the key of the serialized
    /// entry.
    fn header_name(&self) -> &'static str;

    /// Serialize for remote transport.
    ///
    /// Returning `None` marks the header as local-only: [`Headers::to_wire`]
    /// skips it.
    fn to_bytes(&self) -> Option<Vec<u8>>;

    /// Downcast support.
    ///
    /// [`Headers::get`] downcasts through the returned reference and
    /// [`Headers::insert_boxed`] derives the map key from its `TypeId`, so
    /// implementations are expected to return `self`.
    fn as_any(&self) -> &dyn Any;
}

/// Type-safe header collection keyed by `TypeId`.
///
/// Holds at most one value per concrete header type: the key is the type's
/// `TypeId`, so inserting a second value of the same type replaces the first.
///
/// The collection is `Send + Sync` because every stored value is (via the
/// `HeaderValue` supertraits). It contains no lock of its own; mutation goes
/// through `&mut self`.
#[derive(Default)]
pub struct Headers {
    // One boxed header per concrete type, looked up by that type's `TypeId`.
    map: HashMap<TypeId, Box<dyn HeaderValue>>,
}

// Static guarantee: the build fails if `Headers` ever stops being
// `Send + Sync`. Nothing in this block is executed at runtime.
const _: () = {
    fn _require_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    fn _headers_is_send_sync() {
        _require_send_sync::<Headers>();
    }
};

impl Headers {
    /// Create an empty header collection.
    pub fn new() -> Self {
        Self {
            map: HashMap::new(),
        }
    }

    /// Insert a typed header. Replaces any existing header of the same type.
    pub fn insert<H: HeaderValue>(&mut self, value: H) {
        self.map.insert(TypeId::of::<H>(), Box::new(value));
    }

    /// Get a reference to a typed header, if present.
    pub fn get<H: HeaderValue + 'static>(&self) -> Option<&H> {
        self.map
            .get(&TypeId::of::<H>())
            .and_then(|v| v.as_any().downcast_ref::<H>())
    }

    /// Remove a typed header, returning it if present.
    pub fn remove<H: HeaderValue + 'static>(&mut self) -> Option<Box<dyn HeaderValue>> {
        self.map.remove(&TypeId::of::<H>())
    }

    /// Insert a boxed header value. Used by wire deserialization to insert
    /// headers whose concrete type is resolved at runtime via a registry.
    pub fn insert_boxed(&mut self, value: Box<dyn HeaderValue>) {
        let type_id = value.as_any().type_id();
        self.map.insert(type_id, value);
    }

    /// Check if the headers collection is empty.
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of headers.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Convert typed headers to wire-format [`WireHeaders`](crate::remote::WireHeaders).
    ///
    /// Iterates all headers and calls [`HeaderValue::to_bytes()`] on each.
    /// Headers that return `None` (local-only) are skipped.
    pub fn to_wire(&self) -> crate::remote::WireHeaders {
        let mut wire = crate::remote::WireHeaders::new();
        for value in self.map.values() {
            if let Some(bytes) = value.to_bytes() {
                wire.insert(value.header_name().to_string(), bytes);
            }
        }
        wire
    }
}

impl std::fmt::Debug for Headers {
    /// Prints only the number of stored headers: the boxed values are not
    /// required to implement `Debug`, so their contents are not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let count = self.map.len();
        let mut out = f.debug_struct("Headers");
        out.field("count", &count);
        out.finish()
    }
}

/// Globally unique identifier for a message, auto-assigned by the runtime.
///
/// Wraps a `Uuid`; [`MessageId::next`] fills it with a UUID v4 so that ids
/// are unique across nodes without any coordination.
///
/// The type is `Copy` and implements `Eq + Hash`, so it can be passed by
/// value and used as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct MessageId(pub Uuid);

impl MessageId {
    /// Generate a new globally unique MessageId (UUID v4).
    pub fn next() -> Self {
        Self(Uuid::new_v4())
    }
}

impl std::fmt::Display for MessageId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "msg-{}", self.0)
    }
}

/// Read-only headers auto-generated by the runtime.
/// Interceptors can read these but cannot modify or remove them.
///
/// NOTE(review): both fields are `pub`, so the read-only guarantee is not
/// enforced by this type itself; presumably the runtime only hands out
/// shared references — confirm against the interceptor API.
///
/// **Clone semantics:** Cloning preserves the original `message_id` and
/// `timestamp`. This is intentional — cloned headers refer to the same
/// logical message (e.g., when forwarding or replicating an envelope).
/// Use `RuntimeHeaders::new()` to create headers for a genuinely new message.
#[derive(Debug, Clone)]
pub struct RuntimeHeaders {
    /// Unique message ID — auto-assigned by the runtime for every message.
    pub message_id: MessageId,
    /// When the runtime received/created this message.
    ///
    /// Stored as a `std::time::Instant`, i.e. a monotonic clock reading
    /// rather than a wall-clock time.
    pub timestamp: Instant,
}

impl RuntimeHeaders {
    /// Create new RuntimeHeaders with a fresh MessageId and current timestamp.
    pub fn new() -> Self {
        Self {
            message_id: MessageId::next(),
            timestamp: Instant::now(),
        }
    }
}

impl Default for RuntimeHeaders {
    fn default() -> Self {
        Self::new()
    }
}

/// Built-in priority header. Used by priority mailboxes and interceptors.
///
/// Lower numeric value = higher urgency (like Unix nice values).
/// `CRITICAL(0) < HIGH(64) < NORMAL(128) < LOW(192) < BACKGROUND(255)`.
///
/// The derived `PartialOrd`/`Ord` compare the wrapped `u8`, so the most
/// urgent priority is the *smallest* value: sorting ascending puts
/// `CRITICAL` first. Any `u8` is a valid priority; the named constants are
/// just well-known levels.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Priority(pub u8);

impl Priority {
    /// Highest urgency (value 0).
    pub const CRITICAL: Self = Self(0);
    /// High urgency (value 64).
    pub const HIGH: Self = Self(64);
    /// Default priority (value 128).
    pub const NORMAL: Self = Self(128);
    /// Low priority (value 192).
    pub const LOW: Self = Self(192);
    /// Lowest priority — background work (value 255).
    pub const BACKGROUND: Self = Self(255);
}

impl Default for Priority {
    fn default() -> Self {
        Self::NORMAL
    }
}

impl HeaderValue for Priority {
    /// Wire key under which the priority is transported.
    fn header_name(&self) -> &'static str {
        "dactor.Priority"
    }

    /// A priority travels as exactly one byte: its numeric value.
    fn to_bytes(&self) -> Option<Vec<u8>> {
        let mut bytes = Vec::with_capacity(1);
        bytes.push(self.0);
        Some(bytes)
    }

    /// Exposes `self` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl std::fmt::Display for Priority {
    /// Writes the symbolic name of a well-known level (`CRITICAL`, `HIGH`,
    /// `NORMAL`, `LOW`, `BACKGROUND`) or `Priority(n)` for any other value.
    ///
    /// The text is emitted through [`std::fmt::Formatter::pad`], so width,
    /// fill, alignment and precision in the format spec are honoured the
    /// same way they are for a `&str` (for example `{:>10}` right-aligns).
    /// Plain `{}` output is unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The literals mirror the values of the named `Priority` constants.
        let name = match self.0 {
            0 => "CRITICAL",
            64 => "HIGH",
            128 => "NORMAL",
            192 => "LOW",
            255 => "BACKGROUND",
            n => {
                // Without width/precision there is nothing to pad or
                // truncate, so write straight through and skip the
                // temporary `String`.
                if f.width().is_none() && f.precision().is_none() {
                    return write!(f, "Priority({})", n);
                }
                return f.pad(&format!("Priority({})", n));
            }
        };
        f.pad(name)
    }
}

// Unit tests for the message, header, message-id and priority types above.
#[cfg(test)]
mod tests {
    use super::*;

    // Fire-and-forget style message: its reply type is `()`.
    struct Increment(#[allow(dead_code)] u64);
    impl Message for Increment {
        type Reply = ();
    }

    // Request style message that replies with a `u64`.
    struct GetCount;
    impl Message for GetCount {
        type Reply = u64;
    }

    // Second request style message, also replying with a `u64`.
    struct Reset;
    impl Message for Reset {
        type Reply = u64;
    }

    // Compile-time check: each message's associated `Reply` type is the one
    // declared above. The helper functions only exist for their bounds.
    #[test]
    fn test_message_reply_types() {
        fn assert_reply_unit<M: Message<Reply = ()>>() {}
        fn assert_reply_u64<M: Message<Reply = u64>>() {}

        assert_reply_unit::<Increment>();
        assert_reply_u64::<GetCount>();
        assert_reply_u64::<Reset>();
    }

    // A header that was inserted can be read back by its type.
    #[test]
    fn test_headers_insert_get() {
        let mut headers = Headers::new();
        headers.insert(Priority::HIGH);

        let p = headers.get::<Priority>().unwrap();
        assert_eq!(*p, Priority::HIGH);
    }

    // Inserting a second value of the same type replaces the first.
    #[test]
    fn test_headers_insert_replace() {
        let mut headers = Headers::new();
        headers.insert(Priority::LOW);
        headers.insert(Priority::CRITICAL);

        let p = headers.get::<Priority>().unwrap();
        assert_eq!(*p, Priority::CRITICAL);
    }

    // Removing the only header leaves the collection empty.
    // NOTE(review): the value returned by `remove` is not asserted here.
    #[test]
    fn test_headers_remove() {
        let mut headers = Headers::new();
        headers.insert(Priority::NORMAL);
        assert!(!headers.is_empty());

        headers.remove::<Priority>();
        assert!(headers.is_empty());
        assert!(headers.get::<Priority>().is_none());
    }

    // Looking up a type that was never inserted yields `None`.
    #[test]
    fn test_headers_get_missing() {
        let headers = Headers::new();
        assert!(headers.get::<Priority>().is_none());
    }

    // Two different header types coexist and are retrieved independently.
    #[test]
    fn test_multiple_header_types() {
        #[derive(Debug)]
        struct TraceId(String);

        impl HeaderValue for TraceId {
            fn header_name(&self) -> &'static str {
                "app.TraceId"
            }
            fn to_bytes(&self) -> Option<Vec<u8>> {
                Some(self.0.as_bytes().to_vec())
            }
            fn as_any(&self) -> &dyn Any {
                self
            }
        }

        let mut headers = Headers::new();
        headers.insert(Priority::HIGH);
        headers.insert(TraceId("abc-123".into()));

        assert_eq!(headers.len(), 2);
        assert_eq!(headers.get::<Priority>().unwrap().0, 64);
        assert_eq!(headers.get::<TraceId>().unwrap().0, "abc-123");
    }

    // 1000 generated ids are all distinct (checked by collecting into a set).
    #[test]
    fn test_message_id_uniqueness() {
        let ids: Vec<MessageId> = (0..1000).map(|_| MessageId::next()).collect();
        let unique: std::collections::HashSet<_> = ids.iter().collect();
        assert_eq!(unique.len(), 1000);
    }

    // The `Display` form starts with the `msg-` prefix.
    #[test]
    fn test_message_id_display() {
        let id = MessageId::next();
        let display = format!("{}", id);
        assert!(display.starts_with("msg-"));
        assert!(display.len() > 10); // UUID is 36 chars
    }

    // Two independently created `RuntimeHeaders` carry different message ids.
    #[test]
    fn test_runtime_headers_creation() {
        let rh1 = RuntimeHeaders::new();
        let rh2 = RuntimeHeaders::new();
        assert_ne!(rh1.message_id, rh2.message_id);
    }

    // The named levels are strictly increasing in numeric value from
    // CRITICAL to BACKGROUND.
    #[test]
    fn test_priority_constants() {
        // Use runtime values to avoid clippy::assertions_on_constants
        let critical = Priority::CRITICAL.0;
        let high = Priority::HIGH.0;
        let normal = Priority::NORMAL.0;
        let low = Priority::LOW.0;
        let background = Priority::BACKGROUND.0;
        assert!(critical < high);
        assert!(high < normal);
        assert!(normal < low);
        assert!(low < background);
    }

    // Named levels print their name; other values print as `Priority(n)`.
    // NOTE(review): LOW and BACKGROUND are not asserted here.
    #[test]
    fn test_priority_display() {
        assert_eq!(format!("{}", Priority::CRITICAL), "CRITICAL");
        assert_eq!(format!("{}", Priority::HIGH), "HIGH");
        assert_eq!(format!("{}", Priority::NORMAL), "NORMAL");
        assert_eq!(format!("{}", Priority(100)), "Priority(100)");
    }

    // A priority serializes to its single numeric byte under a fixed name.
    #[test]
    fn test_priority_to_bytes() {
        let p = Priority::HIGH;
        assert_eq!(p.to_bytes(), Some(vec![64]));
        assert_eq!(p.header_name(), "dactor.Priority");
    }

    // `len` goes from 0 to 1 after a single insert.
    #[test]
    fn test_headers_len() {
        let mut headers = Headers::new();
        assert_eq!(headers.len(), 0);
        headers.insert(Priority::NORMAL);
        assert_eq!(headers.len(), 1);
    }

    // A header whose `to_bytes` returns `None` can still be stored and read
    // back locally.
    #[test]
    fn test_local_only_header() {
        #[derive(Debug)]
        struct HandlerStartTime(#[allow(dead_code)] Instant);

        impl HeaderValue for HandlerStartTime {
            fn header_name(&self) -> &'static str {
                "dactor.internal.HandlerStartTime"
            }
            fn to_bytes(&self) -> Option<Vec<u8>> {
                None
            }
            fn as_any(&self) -> &dyn Any {
                self
            }
        }

        let mut headers = Headers::new();
        headers.insert(HandlerStartTime(Instant::now()));

        let h = headers.get::<HandlerStartTime>().unwrap();
        assert!(h.to_bytes().is_none());
    }
}