mm1_core/
envelope.rs

1use std::any::TypeId;
2use std::fmt;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use mm1_address::address::Address;
6pub use mm1_proc_macros::dispatch;
7use mm1_proto::Message;
8
9use crate::message::AnyMessage;
10use crate::tracing::TraceId;
11
/// Process-wide counter from which `EnvelopeHeader::to_address` draws the
/// sequence number (`no`) of each header it creates.
static ENVELOPE_SEQ_NO: AtomicU64 = AtomicU64::new(0);

/// TTL given to headers created by `EnvelopeHeader::to_address`; it can be
/// overridden per header with `EnvelopeHeader::with_ttl`.
const DEFAULT_TTL: usize = 3;
15
16#[derive(Debug)]
17pub struct EnvelopeHeader {
18    pub to:  Address,
19    pub ttl: usize,
20
21    #[allow(dead_code)]
22    no:       u64,
23    #[allow(dead_code)]
24    trace_id: TraceId,
25}
26
/// A message of type `M` bundled with the [`EnvelopeHeader`] that describes it.
///
/// When `M` is not spelled out it defaults to [`AnyMessage`], the type-erased
/// form produced by `Envelope::into_erased`.
pub struct Envelope<M = AnyMessage> {
    header:  EnvelopeHeader,
    message: M,
}
31
32impl EnvelopeHeader {
33    pub fn to_address(to: Address) -> Self {
34        Self {
35            to,
36            no: ENVELOPE_SEQ_NO.fetch_add(1, Ordering::Relaxed),
37            ttl: DEFAULT_TTL,
38            trace_id: TraceId::current(),
39        }
40    }
41
42    pub fn with_ttl(self, ttl: usize) -> Self {
43        Self { ttl, ..self }
44    }
45
46    pub fn with_trace_id(self, trace_id: TraceId) -> Self {
47        Self { trace_id, ..self }
48    }
49}
50
51impl EnvelopeHeader {
52    pub fn trace_id(&self) -> TraceId {
53        self.trace_id
54    }
55}
56
57impl<M> Envelope<M>
58where
59    M: Message,
60{
61    pub fn into_erased(self) -> Envelope<AnyMessage> {
62        let Self {
63            header: info,
64            message,
65        } = self;
66        let message = AnyMessage::new(message);
67        Envelope {
68            header: info,
69            message,
70        }
71    }
72}
73
74impl<M> Envelope<M> {
75    pub fn new(header: EnvelopeHeader, message: M) -> Self {
76        Self { header, message }
77    }
78
79    pub fn header(&self) -> &EnvelopeHeader {
80        &self.header
81    }
82}
83
84impl Envelope<AnyMessage> {
85    pub fn cast<M>(self) -> Result<Envelope<M>, Self>
86    where
87        M: Message,
88    {
89        let Self {
90            header: info,
91            message,
92        } = self;
93        match message.cast() {
94            Ok(message) => {
95                Ok(Envelope {
96                    header: info,
97                    message,
98                })
99            },
100            Err(message) => {
101                Err(Self {
102                    header: info,
103                    message,
104                })
105            },
106        }
107    }
108
109    pub fn peek<M>(&self) -> Option<&M>
110    where
111        M: Message,
112    {
113        self.message.peek()
114    }
115
116    pub fn is<M>(&self) -> bool
117    where
118        M: Message,
119    {
120        self.message.is::<M>()
121    }
122
123    pub fn tid(&self) -> TypeId {
124        self.message.tid()
125    }
126
127    pub fn message_name(&self) -> &str {
128        self.message.type_name()
129    }
130}
131impl<M> Envelope<M> {
132    pub fn take(self) -> (M, Envelope<()>) {
133        (
134            self.message,
135            Envelope {
136                header:  self.header,
137                message: (),
138            },
139        )
140    }
141}
142
143impl<M> fmt::Debug for Envelope<M>
144where
145    M: fmt::Debug,
146{
147    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148        f.debug_struct("Envelope")
149            .field("info", &self.header)
150            .field("message", &self.message)
151            .finish()
152    }
153}