1use std::{alloc, fmt, mem, ptr, ptr::NonNull};
2
3use elfo_utils::time::Instant;
4
5use crate::{
6 mailbox,
7 message::{AnyMessageRef, Message, MessageRepr, MessageTypeId, Request},
8 request_table::{RequestId, ResponseToken},
9 tracing::TraceId,
10 Addr,
11};
12
/// An owning, pointer-sized handle to a heap-allocated envelope.
///
/// The allocation starts with an [`EnvelopeHeader`] and is followed, at
/// `EnvelopeHeader::message_offset` bytes from its start, by the message
/// representation (see `Envelope::with_trace_id`).
pub struct Envelope(NonNull<EnvelopeHeader>);

// Static guarantees about the handle: it is never `Sync`, ...
assert_not_impl_any!(Envelope: Sync);
// ... it is always `Send`, ...
assert_impl_all!(Envelope: Send);
// ... and it occupies exactly one machine word.
assert_eq_size!(Envelope, usize);
30
/// Metadata stored at the beginning of every envelope allocation.
pub(crate) struct EnvelopeHeader {
    /// Link owned by the mailbox; default-initialized when an envelope is
    /// created or duplicated.
    // NOTE(review): presumably an intrusive queue link — confirm in `mailbox`.
    pub(crate) link: mailbox::Link,
    /// `Instant::now()` at construction time; copied as-is by `duplicate()`.
    created_time: Instant,
    /// Trace ID the envelope was created with.
    trace_id: TraceId,
    /// Sender / request / response information of the message.
    kind: MessageKind,
    /// Byte offset from the start of the header to the message
    /// representation, as computed by `envelope_repr_layout()`.
    message_offset: u32,
}

// The header itself must be sendable across threads.
assert_impl_all!(EnvelopeHeader: Send);
44
// SAFETY: `EnvelopeHeader` is statically asserted to be `Send` in this file.
// NOTE(review): the message stored behind the header is presumably `Send`
// as well because envelopes are built only from `M: Message` — confirm that
// `Message` requires `Send`.
unsafe impl Send for Envelope {}
48
/// Describes how the message inside an envelope was sent.
///
/// `Envelope::sender()` and `Envelope::request_id()` are derived from this
/// value.
pub enum MessageKind {
    /// A plain message; carries only the sender's address.
    Regular { sender: Addr },
    /// A request carrying a response token.
    // NOTE(review): presumably "any responder may answer" — confirm against
    // the request API.
    RequestAny(ResponseToken),
    /// A request carrying a response token.
    // NOTE(review): presumably "all responders are awaited" — confirm against
    // the request API.
    RequestAll(ResponseToken),
    /// A response to the request identified by `request_id`.
    Response { sender: Addr, request_id: RequestId },
}
56
57impl MessageKind {
58 #[inline]
59 pub fn regular(sender: Addr) -> Self {
60 Self::Regular { sender }
61 }
62}
63
// Destroys an envelope that still owns its message.
//
// Envelopes consumed through `unpack_unchecked()` or `into_header_ptr()` are
// passed to `mem::forget()` there, so this code does not run for them.
impl Drop for Envelope {
    fn drop(&mut self) {
        // Recompute the allocation layout from the stored message; it has to
        // match the layout used when the envelope was allocated.
        let message = self.message();
        let message_layout = message._repr_layout();
        let (layout, message_offset) = envelope_repr_layout(message_layout);
        debug_assert_eq!(message_offset, self.header().message_offset);

        // Drop the message first.
        // SAFETY: `message` is not used after this point.
        unsafe { message.drop_in_place() };

        // Then drop the header (this releases `kind`, e.g. a response token).
        // SAFETY: the header is not read after this point; only the raw
        // pointer value is used below.
        unsafe { ptr::drop_in_place(self.0.as_ptr()) }

        // Finally release the memory of the whole envelope.
        // SAFETY: envelopes are allocated with `alloc::alloc` using the layout
        // returned by `envelope_repr_layout()`, which is recomputed above.
        unsafe { alloc::dealloc(self.0.as_ptr().cast(), layout) };
    }
}
87
88impl Envelope {
89 #[doc(hidden)]
91 #[inline]
92 pub fn new<M: Message>(message: M, kind: MessageKind) -> Self {
93 Self::with_trace_id(message, kind, crate::scope::trace_id())
94 }
95
96 #[doc(hidden)]
98 #[inline]
99 pub fn with_trace_id<M: Message>(message: M, kind: MessageKind, trace_id: TraceId) -> Self {
100 let message_layout = message._repr_layout();
101 let (layout, message_offset) = envelope_repr_layout(message_layout);
102
103 let header = EnvelopeHeader {
104 link: <_>::default(),
105 created_time: Instant::now(),
106 trace_id,
107 kind,
108 message_offset,
109 };
110
111 let ptr = unsafe { alloc::alloc(layout) };
113
114 let Some(ptr) = NonNull::new(ptr) else {
115 alloc::handle_alloc_error(layout);
116 };
117
118 unsafe { ptr::write(ptr.cast().as_ptr(), header) };
120
121 let this = Self(ptr.cast());
122 let message_ptr = this.message_repr_ptr();
123
124 unsafe { message._write(message_ptr) };
126
127 this
128 }
129
130 pub(crate) fn stub() -> Self {
131 Self::with_trace_id(
132 crate::messages::Ping,
133 MessageKind::regular(Addr::NULL),
134 TraceId::try_from(1).unwrap(),
135 )
136 }
137
138 fn header(&self) -> &EnvelopeHeader {
139 unsafe { self.0.as_ref() }
141 }
142
143 #[inline]
144 pub fn trace_id(&self) -> TraceId {
145 self.header().trace_id
146 }
147
148 #[inline]
150 pub fn message(&self) -> AnyMessageRef<'_> {
151 let message_repr = self.message_repr_ptr();
152
153 unsafe { AnyMessageRef::new(message_repr) }
155 }
156
157 #[doc(hidden)]
159 pub fn message_kind(&self) -> &MessageKind {
160 &self.header().kind
161 }
162
163 #[doc(hidden)]
164 #[inline]
165 pub fn created_time(&self) -> Instant {
166 self.header().created_time
167 }
168
169 #[inline]
170 pub fn sender(&self) -> Addr {
171 match self.message_kind() {
172 MessageKind::Regular { sender } => *sender,
173 MessageKind::RequestAny(token) => token.sender(),
174 MessageKind::RequestAll(token) => token.sender(),
175 MessageKind::Response { sender, .. } => *sender,
176 }
177 }
178
179 #[inline]
180 pub fn request_id(&self) -> Option<RequestId> {
181 match self.message_kind() {
182 MessageKind::Regular { .. } => None,
183 MessageKind::RequestAny(token) => Some(token.request_id()),
184 MessageKind::RequestAll(token) => Some(token.request_id()),
185 MessageKind::Response { request_id, .. } => Some(*request_id),
186 }
187 }
188
189 #[doc(hidden)]
190 #[inline]
191 pub fn type_id(&self) -> MessageTypeId {
192 self.message().type_id()
193 }
194
195 #[inline]
196 pub fn is<M: Message>(&self) -> bool {
197 self.message().is::<M>()
198 }
199
200 #[doc(hidden)]
201 pub fn duplicate(&self) -> Self {
202 let header = self.header();
203 let message = self.message();
204 let message_layout = message._repr_layout();
205 let (layout, message_offset) = envelope_repr_layout(message_layout);
206 debug_assert_eq!(message_offset, header.message_offset);
207
208 let out_header = EnvelopeHeader {
209 link: <_>::default(),
210 created_time: header.created_time,
211 trace_id: header.trace_id,
212 kind: match &header.kind {
213 MessageKind::Regular { sender } => MessageKind::Regular { sender: *sender },
214 MessageKind::RequestAny(token) => MessageKind::RequestAny(token.duplicate()),
215 MessageKind::RequestAll(token) => MessageKind::RequestAll(token.duplicate()),
216 MessageKind::Response { sender, request_id } => MessageKind::Response {
217 sender: *sender,
218 request_id: *request_id,
219 },
220 },
221 message_offset,
222 };
223
224 let out_ptr = unsafe { alloc::alloc(layout) };
226
227 let Some(out_ptr) = NonNull::new(out_ptr) else {
228 alloc::handle_alloc_error(layout);
229 };
230
231 unsafe { ptr::write(out_ptr.cast().as_ptr(), out_header) };
233
234 let out = Self(out_ptr.cast());
235 let out_message_ptr = out.message_repr_ptr();
236
237 unsafe { message.clone_into(out_message_ptr) };
239
240 out
241 }
242
243 pub(crate) fn set_message<M: Message>(&mut self, message: M) {
245 assert!(self.is::<M>() && M::_type_id() != crate::message::AnyMessage::_type_id());
246
247 let repr_ptr = self.message_repr_ptr().cast::<MessageRepr<M>>().as_ptr();
248
249 unsafe { ptr::replace(repr_ptr, MessageRepr::new(message)) };
251 }
252
253 fn message_repr_ptr(&self) -> NonNull<MessageRepr> {
254 let message_offset = self.header().message_offset;
255
256 let ptr = unsafe { self.0.as_ptr().byte_add(message_offset as usize) };
258
259 unsafe { NonNull::new_unchecked(ptr.cast()) }
261 }
262
263 #[doc(hidden)]
264 #[inline]
265 pub fn unpack<M: Message>(self) -> Option<(M, MessageKind)> {
266 self.is::<M>()
267 .then(|| unsafe { self.unpack_unchecked() })
269 }
270
271 unsafe fn unpack_unchecked<M: Message>(self) -> (M, MessageKind) {
275 let message_layout = self.message()._repr_layout();
276 let (layout, message_offset) = envelope_repr_layout(message_layout);
277 debug_assert_eq!(message_offset, self.header().message_offset);
278
279 let message = M::_read(self.message_repr_ptr());
280 let kind = ptr::read(&self.0.as_ref().kind);
281
282 alloc::dealloc(self.0.as_ptr().cast(), layout);
283 mem::forget(self);
284 (message, kind)
285 }
286
287 pub(crate) fn into_header_ptr(self) -> NonNull<EnvelopeHeader> {
288 let ptr = self.0;
289 mem::forget(self);
290 ptr
291 }
292
293 pub(crate) unsafe fn from_header_ptr(ptr: NonNull<EnvelopeHeader>) -> Self {
294 Self(ptr)
295 }
296}
297
298fn envelope_repr_layout(message_layout: alloc::Layout) -> (alloc::Layout, u32) {
299 let (layout, message_offset) = alloc::Layout::new::<EnvelopeHeader>()
300 .extend(message_layout)
301 .expect("impossible envelope layout");
302
303 let message_offset =
304 u32::try_from(message_offset).expect("message requires too large alignment");
305
306 (layout.pad_to_align(), message_offset)
307}
308
309impl fmt::Debug for MessageKind {
310 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311 match self {
312 MessageKind::Regular { sender: _ } => f.debug_struct("Regular").finish(),
313 MessageKind::RequestAny(token) => f
314 .debug_tuple("RequestAny")
315 .field(&token.request_id())
316 .finish(),
317 MessageKind::RequestAll(token) => f
318 .debug_tuple("RequestAll")
319 .field(&token.request_id())
320 .finish(),
321 MessageKind::Response {
322 sender: _,
323 request_id,
324 } => f.debug_tuple("Response").field(request_id).finish(),
325 }
326 }
327}
328
329impl fmt::Debug for Envelope {
330 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
331 f.debug_struct("Envelope")
332 .field("trace_id", &self.trace_id())
333 .field("sender", &self.sender())
334 .field("kind", &self.message_kind())
335 .field("message", &self.message())
336 .finish()
337 }
338}
339
/// Unchecked unpacking operations on an envelope taken by value.
#[doc(hidden)]
pub trait EnvelopeOwned {
    /// Consumes the envelope and returns the message as `M`.
    ///
    /// # Safety
    ///
    /// No type check is performed here.
    // NOTE(review): presumably the caller must guarantee the envelope holds
    // an `M` — confirm against the callers.
    unsafe fn unpack_regular_unchecked<M: Message>(self) -> M;

    /// Consumes the envelope and returns the request `R` together with a
    /// typed response token.
    ///
    /// # Safety
    ///
    /// No type check is performed here.
    // NOTE(review): presumably the caller must guarantee the envelope holds
    // an `R` — confirm against the callers.
    unsafe fn unpack_request_unchecked<R: Request>(self) -> (R, ResponseToken<R>);
}
355
/// Unchecked unpacking operations on a borrowed envelope.
#[doc(hidden)]
pub trait EnvelopeBorrowed {
    /// Returns a reference to the stored message as `M`.
    ///
    /// # Safety
    ///
    /// No type check is performed here.
    // NOTE(review): presumably the caller must guarantee the envelope holds
    // an `M` — confirm against the callers.
    unsafe fn unpack_regular_unchecked<M: Message>(&self) -> &M;
}
363
364impl EnvelopeOwned for Envelope {
365 #[inline]
366 unsafe fn unpack_regular_unchecked<M: Message>(self) -> M {
367 let (message, kind) = self.unpack_unchecked();
368
369 #[cfg(feature = "network")]
370 if let MessageKind::RequestAny(token) | MessageKind::RequestAll(token) = kind {
371 let _ = token.into_received::<()>();
374 }
375
376 #[cfg(not(feature = "network"))]
379 debug_assert!(!matches!(
380 kind,
381 MessageKind::RequestAny(_) | MessageKind::RequestAll(_)
382 ));
383
384 message
385 }
386
387 #[inline]
388 unsafe fn unpack_request_unchecked<R: Request>(self) -> (R, ResponseToken<R>) {
389 let (message, kind) = self.unpack_unchecked();
390
391 let token = match kind {
392 MessageKind::RequestAny(token) | MessageKind::RequestAll(token) => token,
393 _ => ResponseToken::forgotten(),
396 };
397
398 (message, token.into_received())
399 }
400}
401
402impl EnvelopeBorrowed for Envelope {
403 #[inline]
404 unsafe fn unpack_regular_unchecked<M: Message>(&self) -> &M {
405 self.message().downcast_ref_unchecked()
406 }
407}
408
// Tests for the envelope's manual memory management.
// NOTE(review): judging by the module name these are meant to run under
// Miri — confirm in the CI configuration.
#[cfg(test)]
mod tests_miri {
    use std::sync::Arc;

    use elfo_utils::time;

    use super::*;
    use crate::{message, AnyMessage};

    /// Builds a regular envelope from `Addr::NULL` with trace ID `1`.
    fn make_regular_envelope(message: impl Message) -> Envelope {
        // The envelope is created inside `with_instant_mock`.
        // NOTE(review): presumably so that `Instant::now()` does not touch
        // the real clock — confirm in `elfo_utils::time`.
        time::with_instant_mock(|_mock| {
            let addr = Addr::NULL;
            let trace_id = TraceId::try_from(1).unwrap();
            Envelope::with_trace_id(message, MessageKind::regular(addr), trace_id)
        })
    }

    /// A small message carrying a single `u64`.
    #[message]
    #[derive(PartialEq)]
    struct P8(u64);

    /// Accessors, type checks and unpacking into both the concrete type and
    /// `AnyMessage`.
    #[test]
    fn basic_ops() {
        let message = P8(42);
        let envelope = make_regular_envelope(message.clone());

        assert_eq!(envelope.trace_id(), TraceId::try_from(1).unwrap());
        assert_eq!(envelope.sender(), Addr::NULL);

        // Every envelope is an `AnyMessage` in addition to its concrete type.
        assert_eq!(envelope.type_id(), P8::_type_id());
        assert!(envelope.is::<P8>());
        assert!(envelope.is::<AnyMessage>());
        assert!(!envelope.is::<crate::messages::Ping>());

        let (actual_message, _) = envelope.unpack::<P8>().unwrap();
        assert_eq!(actual_message, message);

        // `unpack` consumed the envelope, so build a new one.
        let envelope = make_regular_envelope(message.clone());

        let (actual_message, _) = envelope.unpack::<AnyMessage>().unwrap();
        assert_eq!(format!("{actual_message:?}"), format!("{message:?}"));
    }

    /// `set_message` replaces the payload in place.
    #[test]
    fn set_message() {
        let message = P8(42);
        let mut envelope = make_regular_envelope(message.clone());
        envelope.set_message(P8(43));

        let (actual_message, _) = envelope.unpack::<P8>().unwrap();
        assert_eq!(actual_message, P8(43));
    }

    /// `duplicate` clones the message and dropping envelopes drops their
    /// messages; both are observed through an `Arc` strong count.
    #[test]
    fn duplicate() {
        #[message]
        #[derive(PartialEq)]
        struct Sample {
            value: u128,
            counter: Arc<()>,
        }

        impl Sample {
            /// Returns the message together with a second handle to its
            /// counter.
            fn new(value: u128) -> (Arc<()>, Self) {
                let this = Self {
                    value,
                    counter: Arc::new(()),
                };

                (this.counter.clone(), this)
            }
        }

        let (counter, message) = Sample::new(42);
        let envelope = make_regular_envelope(message);

        // One reference is held by `counter`, one by the message in `envelope`.
        assert_eq!(Arc::strong_count(&counter), 2);
        let envelope2 = envelope.duplicate();
        assert_eq!(Arc::strong_count(&counter), 3);
        assert!(envelope2.is::<Sample>());
        // A duplicate can be duplicated again.
        let envelope3 = envelope2.duplicate();
        assert_eq!(Arc::strong_count(&counter), 4);
        assert!(envelope3.is::<Sample>());

        drop(envelope2);
        assert_eq!(Arc::strong_count(&counter), 3);

        drop(envelope3);
        assert_eq!(Arc::strong_count(&counter), 2);

        // The original is still usable after its duplicates are gone.
        let envelope4 = envelope.duplicate();
        assert_eq!(Arc::strong_count(&counter), 3);
        assert!(envelope4.is::<Sample>());

        // A duplicate outlives the original.
        drop(envelope);
        assert_eq!(Arc::strong_count(&counter), 2);

        drop(envelope4);
        assert_eq!(Arc::strong_count(&counter), 1);
    }
}