Skip to main content

elfo_core/
envelope.rs

1use std::{alloc, fmt, mem, ptr, ptr::NonNull};
2
3use elfo_utils::time::Instant;
4
5use crate::{
6    mailbox,
7    message::{AnyMessageRef, Message, MessageRepr, MessageTypeId, Request},
8    request_table::{RequestId, ResponseToken},
9    tracing::TraceId,
10    Addr,
11};
12
13/// An envelope is a wrapper around message with additional metadata,
14/// involved in message passing between actors.
15///
16/// Envelopes aren't created directly in code, but are produced internally
17/// by [`Context`]'s methods.
18///
19/// Converting an envelope to a message is usually done by calling the [`msg!`]
20/// macro, which supports both owned and borrowed usages.
21///
22/// [`Context`]: crate::Context
23/// [`msg!`]: crate::msg
24pub struct Envelope(NonNull<EnvelopeHeader>);
25
26// Messages aren't required to be `Sync`.
27assert_not_impl_any!(Envelope: Sync);
28assert_impl_all!(Envelope: Send);
29assert_eq_size!(Envelope, usize);
30
31// TODO: the current size (on x86-64) is 64 bytes, but it can be reduced.
32// And... it should be reduced once `TraceId` is extended to 16 bytes.
33pub(crate) struct EnvelopeHeader {
34    /// See `mailbox.rs` for more details.
35    pub(crate) link: mailbox::Link,
36    created_time: Instant, // Now used also as a sent time.
37    trace_id: TraceId,
38    kind: MessageKind,
39    /// Offset from the beginning of the envelope to the `MessageRepr`.
40    message_offset: u32,
41}
42
43assert_impl_all!(EnvelopeHeader: Send);
44
45// SAFETY: `Envelope` can point to `M: Message` only, which is `Send`.
46// `EnvelopeHeader` is checked statically above to be `Send`.
47unsafe impl Send for Envelope {}
48
49// Reexported in `elfo::_priv`.
50pub enum MessageKind {
51    Regular { sender: Addr },
52    RequestAny(ResponseToken),
53    RequestAll(ResponseToken),
54    Response { sender: Addr, request_id: RequestId },
55}
56
57impl MessageKind {
58    #[inline]
59    pub fn regular(sender: Addr) -> Self {
60        Self::Regular { sender }
61    }
62}
63
64// Called if the envelope hasn't been unpacked at all.
65// For instance, if an actor dies with a non-empty mailbox.
66// Usually, an envelope goes to `std::mem:forget()` in `unpack_*` methods.
67impl Drop for Envelope {
68    fn drop(&mut self) {
69        let message = self.message();
70        let message_layout = message._repr_layout();
71        let (layout, message_offset) = envelope_repr_layout(message_layout);
72        debug_assert_eq!(message_offset, self.header().message_offset);
73
74        // Drop the message.
75        // SAFETY: the message is not accessed anymore below.
76        unsafe { message.drop_in_place() };
77
78        // Drop the header.
79        // SAFETY: the header is not accessed anymore below.
80        unsafe { ptr::drop_in_place(self.0.as_ptr()) }
81
82        // Deallocate the whole envelope.
83        // SAFETY: memory was allocated by `alloc::alloc` with the same layout.
84        unsafe { alloc::dealloc(self.0.as_ptr().cast(), layout) };
85    }
86}
87
88impl Envelope {
89    // This is private API. Do not use it.
90    #[doc(hidden)]
91    #[inline]
92    pub fn new<M: Message>(message: M, kind: MessageKind) -> Self {
93        Self::with_trace_id(message, kind, crate::scope::trace_id())
94    }
95
96    // This is private API. Do not use it.
97    #[doc(hidden)]
98    #[inline]
99    pub fn with_trace_id<M: Message>(message: M, kind: MessageKind, trace_id: TraceId) -> Self {
100        let message_layout = message._repr_layout();
101        let (layout, message_offset) = envelope_repr_layout(message_layout);
102
103        let header = EnvelopeHeader {
104            link: <_>::default(),
105            created_time: Instant::now(),
106            trace_id,
107            kind,
108            message_offset,
109        };
110
111        // SAFETY: `layout` is correct and non-zero.
112        let ptr = unsafe { alloc::alloc(layout) };
113
114        let Some(ptr) = NonNull::new(ptr) else {
115            alloc::handle_alloc_error(layout);
116        };
117
118        // SAFETY: `ptr` is valid to write the header.
119        unsafe { ptr::write(ptr.cast().as_ptr(), header) };
120
121        let this = Self(ptr.cast());
122        let message_ptr = this.message_repr_ptr();
123
124        // SAFETY: `message_ptr` is valid to write the message.
125        unsafe { message._write(message_ptr) };
126
127        this
128    }
129
130    pub(crate) fn stub() -> Self {
131        Self::with_trace_id(
132            crate::messages::Ping,
133            MessageKind::regular(Addr::NULL),
134            TraceId::try_from(1).unwrap(),
135        )
136    }
137
138    fn header(&self) -> &EnvelopeHeader {
139        // SAFETY: `self.0` is properly initialized.
140        unsafe { self.0.as_ref() }
141    }
142
143    #[inline]
144    pub fn trace_id(&self) -> TraceId {
145        self.header().trace_id
146    }
147
148    /// Returns a reference to the untyped message inside the envelope.
149    #[inline]
150    pub fn message(&self) -> AnyMessageRef<'_> {
151        let message_repr = self.message_repr_ptr();
152
153        // SAFETY: `message_repr` is valid pointer for read.
154        unsafe { AnyMessageRef::new(message_repr) }
155    }
156
157    /// Part of private API. Do not use it.
158    #[doc(hidden)]
159    pub fn message_kind(&self) -> &MessageKind {
160        &self.header().kind
161    }
162
163    #[doc(hidden)]
164    #[inline]
165    pub fn created_time(&self) -> Instant {
166        self.header().created_time
167    }
168
169    #[inline]
170    pub fn sender(&self) -> Addr {
171        match self.message_kind() {
172            MessageKind::Regular { sender } => *sender,
173            MessageKind::RequestAny(token) => token.sender(),
174            MessageKind::RequestAll(token) => token.sender(),
175            MessageKind::Response { sender, .. } => *sender,
176        }
177    }
178
179    #[inline]
180    pub fn request_id(&self) -> Option<RequestId> {
181        match self.message_kind() {
182            MessageKind::Regular { .. } => None,
183            MessageKind::RequestAny(token) => Some(token.request_id()),
184            MessageKind::RequestAll(token) => Some(token.request_id()),
185            MessageKind::Response { request_id, .. } => Some(*request_id),
186        }
187    }
188
189    #[doc(hidden)]
190    #[inline]
191    pub fn type_id(&self) -> MessageTypeId {
192        self.message().type_id()
193    }
194
195    #[inline]
196    pub fn is<M: Message>(&self) -> bool {
197        self.message().is::<M>()
198    }
199
200    #[doc(hidden)]
201    pub fn duplicate(&self) -> Self {
202        let header = self.header();
203        let message = self.message();
204        let message_layout = message._repr_layout();
205        let (layout, message_offset) = envelope_repr_layout(message_layout);
206        debug_assert_eq!(message_offset, header.message_offset);
207
208        let out_header = EnvelopeHeader {
209            link: <_>::default(),
210            created_time: header.created_time,
211            trace_id: header.trace_id,
212            kind: match &header.kind {
213                MessageKind::Regular { sender } => MessageKind::Regular { sender: *sender },
214                MessageKind::RequestAny(token) => MessageKind::RequestAny(token.duplicate()),
215                MessageKind::RequestAll(token) => MessageKind::RequestAll(token.duplicate()),
216                MessageKind::Response { sender, request_id } => MessageKind::Response {
217                    sender: *sender,
218                    request_id: *request_id,
219                },
220            },
221            message_offset,
222        };
223
224        // SAFETY: `layout` is correct and non-zero.
225        let out_ptr = unsafe { alloc::alloc(layout) };
226
227        let Some(out_ptr) = NonNull::new(out_ptr) else {
228            alloc::handle_alloc_error(layout);
229        };
230
231        // SAFETY: `out_ptr` is valid to write the header.
232        unsafe { ptr::write(out_ptr.cast().as_ptr(), out_header) };
233
234        let out = Self(out_ptr.cast());
235        let out_message_ptr = out.message_repr_ptr();
236
237        // SAFETY: `out_message_ptr` is valid and has the same layout as `message`.
238        unsafe { message.clone_into(out_message_ptr) };
239
240        out
241    }
242
243    // TODO: remove the method
244    pub(crate) fn set_message<M: Message>(&mut self, message: M) {
245        assert!(self.is::<M>() && M::_type_id() != crate::message::AnyMessage::_type_id());
246
247        let repr_ptr = self.message_repr_ptr().cast::<MessageRepr<M>>().as_ptr();
248
249        // SAFETY: `repr_ptr` is valid to write the message.
250        unsafe { ptr::replace(repr_ptr, MessageRepr::new(message)) };
251    }
252
253    fn message_repr_ptr(&self) -> NonNull<MessageRepr> {
254        let message_offset = self.header().message_offset;
255
256        // SAFETY: `message_offset` refers to the same allocation object.
257        let ptr = unsafe { self.0.as_ptr().byte_add(message_offset as usize) };
258
259        // SAFETY: `envelope_repr_layout()` guarantees that `ptr` is valid.
260        unsafe { NonNull::new_unchecked(ptr.cast()) }
261    }
262
263    #[doc(hidden)]
264    #[inline]
265    pub fn unpack<M: Message>(self) -> Option<(M, MessageKind)> {
266        self.is::<M>()
267            // SAFETY: `self` contains a message of type `M`, checked above.
268            .then(|| unsafe { self.unpack_unchecked() })
269    }
270
271    /// # Safety
272    ///
273    /// The caller must ensure that the message is of the correct type.
274    unsafe fn unpack_unchecked<M: Message>(self) -> (M, MessageKind) {
275        let message_layout = self.message()._repr_layout();
276        let (layout, message_offset) = envelope_repr_layout(message_layout);
277        debug_assert_eq!(message_offset, self.header().message_offset);
278
279        let message = M::_read(self.message_repr_ptr());
280        let kind = ptr::read(&self.0.as_ref().kind);
281
282        alloc::dealloc(self.0.as_ptr().cast(), layout);
283        mem::forget(self);
284        (message, kind)
285    }
286
287    pub(crate) fn into_header_ptr(self) -> NonNull<EnvelopeHeader> {
288        let ptr = self.0;
289        mem::forget(self);
290        ptr
291    }
292
293    pub(crate) unsafe fn from_header_ptr(ptr: NonNull<EnvelopeHeader>) -> Self {
294        Self(ptr)
295    }
296}
297
298fn envelope_repr_layout(message_layout: alloc::Layout) -> (alloc::Layout, u32) {
299    let (layout, message_offset) = alloc::Layout::new::<EnvelopeHeader>()
300        .extend(message_layout)
301        .expect("impossible envelope layout");
302
303    let message_offset =
304        u32::try_from(message_offset).expect("message requires too large alignment");
305
306    (layout.pad_to_align(), message_offset)
307}
308
309impl fmt::Debug for MessageKind {
310    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311        match self {
312            MessageKind::Regular { sender: _ } => f.debug_struct("Regular").finish(),
313            MessageKind::RequestAny(token) => f
314                .debug_tuple("RequestAny")
315                .field(&token.request_id())
316                .finish(),
317            MessageKind::RequestAll(token) => f
318                .debug_tuple("RequestAll")
319                .field(&token.request_id())
320                .finish(),
321            MessageKind::Response {
322                sender: _,
323                request_id,
324            } => f.debug_tuple("Response").field(request_id).finish(),
325        }
326    }
327}
328
329impl fmt::Debug for Envelope {
330    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
331        f.debug_struct("Envelope")
332            .field("trace_id", &self.trace_id())
333            .field("sender", &self.sender())
334            .field("kind", &self.message_kind())
335            .field("message", &self.message())
336            .finish()
337    }
338}
339
// Extra traits to support both owned and borrowed usages of `msg!(..)`.
// Both traits are private and are reexported only in `elfo::_priv`.
342
343#[doc(hidden)]
344pub trait EnvelopeOwned {
345    /// # Safety
346    ///
347    /// The caller must ensure that the message is of the correct type.
348    unsafe fn unpack_regular_unchecked<M: Message>(self) -> M;
349
350    /// # Safety
351    ///
352    /// The caller must ensure that the request is of the correct type.
353    unsafe fn unpack_request_unchecked<R: Request>(self) -> (R, ResponseToken<R>);
354}
355
356#[doc(hidden)]
357pub trait EnvelopeBorrowed {
358    /// # Safety
359    ///
360    /// The caller must ensure that the message is of the correct type.
361    unsafe fn unpack_regular_unchecked<M: Message>(&self) -> &M;
362}
363
364impl EnvelopeOwned for Envelope {
365    #[inline]
366    unsafe fn unpack_regular_unchecked<M: Message>(self) -> M {
367        let (message, kind) = self.unpack_unchecked();
368
369        #[cfg(feature = "network")]
370        if let MessageKind::RequestAny(token) | MessageKind::RequestAll(token) = kind {
371            // The sender thought this is a request, but for the current node it isn't.
372            // Mark the token as received to return `RequestError::Ignored` to the sender.
373            let _ = token.into_received::<()>();
374        }
375
376        // This check is debug-only because it's already checked in `msg!` in
377        // compile-time, which should be the only way to call this consuming method.
378        #[cfg(not(feature = "network"))]
379        debug_assert!(!matches!(
380            kind,
381            MessageKind::RequestAny(_) | MessageKind::RequestAll(_)
382        ));
383
384        message
385    }
386
387    #[inline]
388    unsafe fn unpack_request_unchecked<R: Request>(self) -> (R, ResponseToken<R>) {
389        let (message, kind) = self.unpack_unchecked();
390
391        let token = match kind {
392            MessageKind::RequestAny(token) | MessageKind::RequestAll(token) => token,
393            // A request sent by using `ctx.send()` ("fire and forget").
394            // Also it's useful for the protocol evolution between remote nodes.
395            _ => ResponseToken::forgotten(),
396        };
397
398        (message, token.into_received())
399    }
400}
401
402impl EnvelopeBorrowed for Envelope {
403    #[inline]
404    unsafe fn unpack_regular_unchecked<M: Message>(&self) -> &M {
405        self.message().downcast_ref_unchecked()
406    }
407}
408
409#[cfg(test)]
410mod tests_miri {
411    use std::sync::Arc;
412
413    use elfo_utils::time;
414
415    use super::*;
416    use crate::{message, AnyMessage};
417
418    fn make_regular_envelope(message: impl Message) -> Envelope {
419        // Miri doesn't support asm, so mock the time.
420        // TODO: support miri in `elfo_utils::time`.
421        time::with_instant_mock(|_mock| {
422            let addr = Addr::NULL;
423            let trace_id = TraceId::try_from(1).unwrap();
424            Envelope::with_trace_id(message, MessageKind::regular(addr), trace_id)
425        })
426    }
427
428    #[message]
429    #[derive(PartialEq)]
430    struct P8(u64);
431
432    #[test]
433    fn basic_ops() {
434        let message = P8(42);
435        let envelope = make_regular_envelope(message.clone());
436
437        assert_eq!(envelope.trace_id(), TraceId::try_from(1).unwrap());
438        assert_eq!(envelope.sender(), Addr::NULL);
439
440        assert_eq!(envelope.type_id(), P8::_type_id());
441        assert!(envelope.is::<P8>());
442        assert!(envelope.is::<AnyMessage>());
443        assert!(!envelope.is::<crate::messages::Ping>());
444
445        let (actual_message, _) = envelope.unpack::<P8>().unwrap();
446        assert_eq!(actual_message, message);
447
448        // Unpack to `AnyMessage`
449        let envelope = make_regular_envelope(message.clone());
450
451        let (actual_message, _) = envelope.unpack::<AnyMessage>().unwrap();
452        assert_eq!(format!("{actual_message:?}"), format!("{message:?}"));
453    }
454
455    #[test]
456    fn set_message() {
457        let message = P8(42);
458        let mut envelope = make_regular_envelope(message.clone());
459        envelope.set_message(P8(43));
460
461        let (actual_message, _) = envelope.unpack::<P8>().unwrap();
462        assert_eq!(actual_message, P8(43));
463    }
464
465    #[test]
466    fn duplicate() {
467        #[message]
468        #[derive(PartialEq)]
469        struct Sample {
470            value: u128,
471            counter: Arc<()>,
472        }
473
474        impl Sample {
475            fn new(value: u128) -> (Arc<()>, Self) {
476                let this = Self {
477                    value,
478                    counter: Arc::new(()),
479                };
480
481                (this.counter.clone(), this)
482            }
483        }
484
485        let (counter, message) = Sample::new(42);
486        let envelope = make_regular_envelope(message);
487
488        assert_eq!(Arc::strong_count(&counter), 2);
489        let envelope2 = envelope.duplicate();
490        assert_eq!(Arc::strong_count(&counter), 3);
491        assert!(envelope2.is::<Sample>());
492        let envelope3 = envelope2.duplicate();
493        assert_eq!(Arc::strong_count(&counter), 4);
494        assert!(envelope3.is::<Sample>());
495
496        drop(envelope2);
497        assert_eq!(Arc::strong_count(&counter), 3);
498
499        drop(envelope3);
500        assert_eq!(Arc::strong_count(&counter), 2);
501
502        let envelope4 = envelope.duplicate();
503        assert_eq!(Arc::strong_count(&counter), 3);
504        assert!(envelope4.is::<Sample>());
505
506        drop(envelope);
507        assert_eq!(Arc::strong_count(&counter), 2);
508
509        drop(envelope4);
510        assert_eq!(Arc::strong_count(&counter), 1);
511    }
512}