executor_core/mailbox.rs
1//! Mailbox-based message passing for safe cross-thread communication.
2//!
3//! This module provides a [`Mailbox`] type that enables asynchronous message passing
4//! to a value owned by a background task. The mailbox ensures thread-safe access
5//! to the contained value by serializing all operations through a message queue.
6//!
7//! # Overview
8//!
9//! The mailbox pattern is useful when you need to:
10//! - Share mutable state across threads safely
11//! - Process operations on a value sequentially
12//! - Avoid blocking when sending updates
13//! - Make async calls that return values
14
15use crate::LocalExecutor;
16use alloc::boxed::Box;
17use async_channel::{Sender, unbounded};
18
/// A boxed, one-shot unit of work that is run against the mailbox's value.
///
/// The closure only borrows the value (`&T`) and must be `Send` so it can be
/// queued from a thread other than the one that owns the value.
type Job<T> = Box<dyn FnOnce(&T) + Send>;
20
21/// A mailbox for sending messages to a value owned by a background task.
22///
23/// `Mailbox<T>` provides thread-safe access to a value of type `T` by serializing
24/// all operations through an async message queue. The value is owned by a background
25/// task that processes incoming messages sequentially.
26///
27/// # Type Parameters
28///
29/// * `T` - The type of value contained in the mailbox. Must be `'static` to ensure
30/// the background task can own it safely.
31///
32/// # Thread Safety
33///
34/// The mailbox enables other threads to safely access a value living on another thread
35/// without explicit locks. The mailbox handle itself is always `Send` and `Sync`,
36/// allowing it to be shared across threads. All operations on the value are serialized
37/// through an async message queue, providing lock-free concurrent access. When `T` is
38/// not `Send`, the value remains pinned to its original thread but can still be safely
39/// accessed from other threads through the mailbox.
40#[derive(Debug)]
41pub struct Mailbox<T: 'static> {
42 sender: Sender<Job<T>>,
43}
44
45impl<T: 'static> Mailbox<T> {
46 /// Creates a new mailbox with the given value on the specified executor.
47 ///
48 /// The value will be moved to a background task that processes incoming
49 /// messages. The executor is consumed to spawn the background task.
50 ///
51 /// # Parameters
52 ///
53 /// * `executor` - The executor to spawn the background task on
54 /// * `value` - The value to be owned by the background task
55 #[allow(clippy::needless_pass_by_value)]
56 pub fn new<E: LocalExecutor>(executor: E, value: T) -> Self {
57 let (sender, receiver) = unbounded::<Box<dyn Send + FnOnce(&T)>>();
58
59 let _fut = executor.spawn_local(async move {
60 while let Ok(update) = receiver.recv().await {
61 update(&value);
62 }
63 });
64 Self { sender }
65 }
66
67 /// Sends a non-blocking update to the mailbox value.
68 ///
69 /// The provided closure will be called with a reference to the value
70 /// in the background task. This operation is non-blocking and will
71 /// not wait for the update to be processed.
72 ///
73 /// If the background task has been dropped or the channel is full,
74 /// the update may be silently discarded.
75 ///
76 /// # Parameters
77 ///
78 /// * `update` - A closure that will be called with a reference to the value
79 pub fn handle(&self, update: impl FnOnce(&T) + Send + 'static) {
80 let _ = self.sender.try_send(Box::new(update));
81 }
82
83 /// Makes an asynchronous call to the mailbox value and returns the result.
84 ///
85 /// The provided closure will be called with a reference to the value
86 /// in the background task, and the result will be returned to the caller.
87 /// This operation blocks until the call is processed and the result is available.
88 ///
89 /// # Parameters
90 ///
91 /// * `f` - A closure that will be called with a reference to the value and returns a result
92 ///
93 /// # Returns
94 ///
95 /// The result returned by the closure after it has been executed on the value.
96 ///
97 /// # Panics
98 ///
99 /// Panics if the background task has been dropped or the channel is closed,
100 /// making it impossible to receive the result.
101 ///
102 pub async fn call<R>(&self, f: impl FnOnce(&T) -> R + Send + 'static) -> R
103 where
104 R: Send + 'static,
105 {
106 let (s, r) = async_channel::bounded(1);
107 self.handle(move |v| {
108 let res = f(v);
109 let _ = s.try_send(res);
110 });
111 r.recv().await.expect("Mailbox call failed")
112 }
113}