use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::time::Instant;
use uuid::Uuid;
pub trait Message: Send + 'static {
type Reply: Send + 'static;
}
pub trait HeaderValue: Send + Sync + 'static {
fn header_name(&self) -> &'static str;
fn to_bytes(&self) -> Option<Vec<u8>>;
fn as_any(&self) -> &dyn Any;
}
#[derive(Default)]
pub struct Headers {
map: HashMap<TypeId, Box<dyn HeaderValue>>,
}
const _: () = {
fn _assert<T: Send + Sync>() {}
fn _check() {
_assert::<Headers>();
}
};
impl Headers {
pub fn new() -> Self {
Self {
map: HashMap::new(),
}
}
pub fn insert<H: HeaderValue>(&mut self, value: H) {
self.map.insert(TypeId::of::<H>(), Box::new(value));
}
pub fn get<H: HeaderValue + 'static>(&self) -> Option<&H> {
self.map
.get(&TypeId::of::<H>())
.and_then(|v| v.as_any().downcast_ref::<H>())
}
pub fn remove<H: HeaderValue + 'static>(&mut self) -> Option<Box<dyn HeaderValue>> {
self.map.remove(&TypeId::of::<H>())
}
pub fn insert_boxed(&mut self, value: Box<dyn HeaderValue>) {
let type_id = value.as_any().type_id();
self.map.insert(type_id, value);
}
pub fn is_empty(&self) -> bool {
self.map.is_empty()
}
pub fn len(&self) -> usize {
self.map.len()
}
pub fn to_wire(&self) -> crate::remote::WireHeaders {
let mut wire = crate::remote::WireHeaders::new();
for value in self.map.values() {
if let Some(bytes) = value.to_bytes() {
wire.insert(value.header_name().to_string(), bytes);
}
}
wire
}
}
impl std::fmt::Debug for Headers {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Headers")
.field("count", &self.map.len())
.finish()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct MessageId(pub Uuid);
impl MessageId {
pub fn next() -> Self {
Self(Uuid::new_v4())
}
}
impl std::fmt::Display for MessageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "msg-{}", self.0)
}
}
#[derive(Debug, Clone)]
pub struct RuntimeHeaders {
pub message_id: MessageId,
pub timestamp: Instant,
}
impl RuntimeHeaders {
pub fn new() -> Self {
Self {
message_id: MessageId::next(),
timestamp: Instant::now(),
}
}
}
impl Default for RuntimeHeaders {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Priority(pub u8);
impl Priority {
pub const CRITICAL: Self = Self(0);
pub const HIGH: Self = Self(64);
pub const NORMAL: Self = Self(128);
pub const LOW: Self = Self(192);
pub const BACKGROUND: Self = Self(255);
}
impl Default for Priority {
fn default() -> Self {
Self::NORMAL
}
}
impl HeaderValue for Priority {
fn header_name(&self) -> &'static str {
"dactor.Priority"
}
fn to_bytes(&self) -> Option<Vec<u8>> {
Some(vec![self.0])
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl std::fmt::Display for Priority {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.0 {
0 => write!(f, "CRITICAL"),
64 => write!(f, "HIGH"),
128 => write!(f, "NORMAL"),
192 => write!(f, "LOW"),
255 => write!(f, "BACKGROUND"),
n => write!(f, "Priority({})", n),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
struct Increment(#[allow(dead_code)] u64);
impl Message for Increment {
type Reply = ();
}
struct GetCount;
impl Message for GetCount {
type Reply = u64;
}
struct Reset;
impl Message for Reset {
type Reply = u64;
}
#[test]
fn test_message_reply_types() {
fn assert_reply_unit<M: Message<Reply = ()>>() {}
fn assert_reply_u64<M: Message<Reply = u64>>() {}
assert_reply_unit::<Increment>();
assert_reply_u64::<GetCount>();
assert_reply_u64::<Reset>();
}
#[test]
fn test_headers_insert_get() {
let mut headers = Headers::new();
headers.insert(Priority::HIGH);
let p = headers.get::<Priority>().unwrap();
assert_eq!(*p, Priority::HIGH);
}
#[test]
fn test_headers_insert_replace() {
let mut headers = Headers::new();
headers.insert(Priority::LOW);
headers.insert(Priority::CRITICAL);
let p = headers.get::<Priority>().unwrap();
assert_eq!(*p, Priority::CRITICAL);
}
#[test]
fn test_headers_remove() {
let mut headers = Headers::new();
headers.insert(Priority::NORMAL);
assert!(!headers.is_empty());
headers.remove::<Priority>();
assert!(headers.is_empty());
assert!(headers.get::<Priority>().is_none());
}
#[test]
fn test_headers_get_missing() {
let headers = Headers::new();
assert!(headers.get::<Priority>().is_none());
}
#[test]
fn test_multiple_header_types() {
#[derive(Debug)]
struct TraceId(String);
impl HeaderValue for TraceId {
fn header_name(&self) -> &'static str {
"app.TraceId"
}
fn to_bytes(&self) -> Option<Vec<u8>> {
Some(self.0.as_bytes().to_vec())
}
fn as_any(&self) -> &dyn Any {
self
}
}
let mut headers = Headers::new();
headers.insert(Priority::HIGH);
headers.insert(TraceId("abc-123".into()));
assert_eq!(headers.len(), 2);
assert_eq!(headers.get::<Priority>().unwrap().0, 64);
assert_eq!(headers.get::<TraceId>().unwrap().0, "abc-123");
}
#[test]
fn test_message_id_uniqueness() {
let ids: Vec<MessageId> = (0..1000).map(|_| MessageId::next()).collect();
let unique: std::collections::HashSet<_> = ids.iter().collect();
assert_eq!(unique.len(), 1000);
}
#[test]
fn test_message_id_display() {
let id = MessageId::next();
let display = format!("{}", id);
assert!(display.starts_with("msg-"));
assert!(display.len() > 10); }
#[test]
fn test_runtime_headers_creation() {
let rh1 = RuntimeHeaders::new();
let rh2 = RuntimeHeaders::new();
assert_ne!(rh1.message_id, rh2.message_id);
}
#[test]
fn test_priority_constants() {
let critical = Priority::CRITICAL.0;
let high = Priority::HIGH.0;
let normal = Priority::NORMAL.0;
let low = Priority::LOW.0;
let background = Priority::BACKGROUND.0;
assert!(critical < high);
assert!(high < normal);
assert!(normal < low);
assert!(low < background);
}
#[test]
fn test_priority_display() {
assert_eq!(format!("{}", Priority::CRITICAL), "CRITICAL");
assert_eq!(format!("{}", Priority::HIGH), "HIGH");
assert_eq!(format!("{}", Priority::NORMAL), "NORMAL");
assert_eq!(format!("{}", Priority(100)), "Priority(100)");
}
#[test]
fn test_priority_to_bytes() {
let p = Priority::HIGH;
assert_eq!(p.to_bytes(), Some(vec![64]));
assert_eq!(p.header_name(), "dactor.Priority");
}
#[test]
fn test_headers_len() {
let mut headers = Headers::new();
assert_eq!(headers.len(), 0);
headers.insert(Priority::NORMAL);
assert_eq!(headers.len(), 1);
}
#[test]
fn test_local_only_header() {
#[derive(Debug)]
struct HandlerStartTime(#[allow(dead_code)] Instant);
impl HeaderValue for HandlerStartTime {
fn header_name(&self) -> &'static str {
"dactor.internal.HandlerStartTime"
}
fn to_bytes(&self) -> Option<Vec<u8>> {
None
}
fn as_any(&self) -> &dyn Any {
self
}
}
let mut headers = Headers::new();
headers.insert(HandlerStartTime(Instant::now()));
let h = headers.get::<HandlerStartTime>().unwrap();
assert!(h.to_bytes().is_none());
}
}