Skip to main content

black_box/actors/
mod.rs

1use std::{future::Future, sync::atomic::AtomicU64};
2
3use async_channel::{Sender, WeakSender};
4
5use crate::{
6    executor::Context,
7    message::{Envelope, Message},
8};
9
10static ADDRESS_COUNTER: AtomicU64 = AtomicU64::new(0);
11
12/// Abstraction for message handling
13///
14/// Actors are spawned in an [`Executor`](crate::Executor), and run in the executor's event loop.
15/// When new messages are received by the executor, the appropriate handler [`Handler`] is invoked,
16/// allowing the actor to take any necessary action, including mutating it's internal state.
17pub trait Actor: Sized {
18    fn starting(&mut self, _ctx: &Context<Self>) -> impl Future<Output = ()> + Send {
19        std::future::ready(())
20    }
21
22    fn stopping(&mut self, _ctx: &Context<Self>) -> impl Future<Output = ()> + Send {
23        std::future::ready(())
24    }
25}
26
27/// The implementation for how an actor handles a particular message
28///
29/// An [`Actor`], can implement the Handler trait any number of time, with a unique message type for
30/// each implementation.
31pub trait Handler<M>
32where
33    Self: Actor,
34    M: Message,
35{
36    /// Asynchronously act on the message, with mutable access to self
37    fn handle(&mut self, msg: M, ctx: &Context<Self>) -> impl Future<Output = ()> + Send;
38}
39
40/// A cloneable address which can be used to send messages to the associated [`Actor`]
41///
42/// This is a cheaply cloneable type and can be used to send an actor address to other actors, other
43/// runtimes, etc.
44#[derive(Debug)]
45pub struct Address<A> {
46    id: u64,
47    sender: Sender<Envelope<A>>,
48}
49
50impl<A> PartialEq for Address<A> {
51    fn eq(&self, other: &Self) -> bool {
52        self.id == other.id
53    }
54}
55
56// SAFETY: The address is a queue abstraction for *messages* sent to the actor. Even if the actor
57// itself is not Send/Sync the address should be. The Message trait itself already requires that
58// the implementer be Send
59unsafe impl<A> std::marker::Send for Address<A> {}
60// SAFETY: As above but for Sync
61unsafe impl<A> std::marker::Sync for Address<A> {}
62impl<A> std::marker::Unpin for Address<A> {}
63
64impl<A> Clone for Address<A> {
65    fn clone(&self) -> Self {
66        Self {
67            sender: self.sender.clone(),
68            id: self.id,
69        }
70    }
71}
72
73impl<A> Address<A> {
74    pub(crate) fn new(sender: Sender<Envelope<A>>) -> Self {
75        let id = ADDRESS_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
76
77        Self { sender, id }
78    }
79
80    pub fn downgrade(&self) -> WeakAddress<A> {
81        let sender = self.sender.downgrade();
82        WeakAddress::new(self.id, sender)
83    }
84
85    /// # Warning
86    ///
87    /// For testing purposes only.
88    ///
89    /// This constructs a new address the other half of which is leaked. This can be useful for
90    /// constructing actors which have one or more other actors as dependencies, but you do not need
91    /// to test the functionality of a subset of those dependencies, you just need to satisfy the
92    /// type requirements.
93    #[cfg(feature = "mocking")]
94    pub fn new_leak(cap: usize) -> Self {
95        let (sender, receiver) = async_channel::bounded::<Envelope<A>>(cap);
96        Box::leak(Box::new(receiver));
97        Self::new(sender)
98    }
99}
100
101impl<A> Address<A>
102where
103    A: 'static + Actor + Send,
104{
105    /// Send the given message to the actor's receiver.
106    ///
107    /// If the receiver is currently full, it will await capacity to enqueue the message
108    pub async fn send<M>(&self, message: M)
109    where
110        A: Handler<M>,
111        M: Message,
112    {
113        let env = Envelope::pack(message);
114
115        // TODO: Decide what to do here
116        let _ = self.sender.send(env).await;
117    }
118
119    pub fn try_send<M>(&self, message: M)
120    where
121        A: Handler<M>,
122        M: Message,
123    {
124        let env = Envelope::pack(message);
125
126        // TODO: Decide what to do here
127        let _ = self.sender.try_send(env);
128    }
129}
130
131/// A cloneable address which can be used to send messages to the associated [`Actor`]
132///
133/// This is a cheaply cloneable type and can be used to send an actor address to other actors, other
134/// runtimes, etc.
135#[derive(Debug)]
136pub struct WeakAddress<A> {
137    id: u64,
138    sender: WeakSender<Envelope<A>>,
139}
140
141impl<A> Clone for WeakAddress<A> {
142    fn clone(&self) -> Self {
143        Self {
144            id: self.id,
145            sender: self.sender.clone(),
146        }
147    }
148}
149
150impl<A> WeakAddress<A> {
151    pub(crate) fn new(id: u64, sender: WeakSender<Envelope<A>>) -> Self {
152        Self { id, sender }
153    }
154
155    pub fn upgrade(&self) -> Option<Address<A>> {
156        let sender = self.sender.upgrade()?;
157        Some(Address::new(sender))
158    }
159}
160
161// SAFETY: The address is a queue abstraction for *messages* sent to the actor. Even if the actor
162// itself is not Send/Sync the address should be. The Message trait itself already requires that
163// the implementer be Send
164unsafe impl<A> std::marker::Send for WeakAddress<A> {}
165// SAFETY: As above but for Sync
166unsafe impl<A> std::marker::Sync for WeakAddress<A> {}
167impl<A> std::marker::Unpin for WeakAddress<A> {}
168
169#[cfg(test)]
170mod test {
171    use std::sync::Mutex;
172
173    use crate::Executor;
174
175    use super::*;
176
177    struct Msg;
178    struct Act;
179    impl Actor for Act {}
180    impl Handler<Msg> for Act {
181        async fn handle(&mut self, _msg: Msg, _ctx: &Context<Self>) {}
182    }
183
184    #[test]
185    fn partial_eq_on_clone() {
186        let (_executor, address) = Executor::new(Act);
187        let same_address = address.clone();
188        assert!(address.eq(&same_address));
189    }
190
191    #[test]
192    fn partial_eq_on_different_addrs() {
193        let (_executor_1, address_1) = Executor::new(Act);
194        let (_executor_2, address_2) = Executor::new(Act);
195        assert!(address_1.ne(&address_2));
196    }
197
198    #[test]
199    fn partial_eq_on_a_thousand_different_addrs() {
200        let mut addrs: Vec<Address<Act>> = Vec::new();
201        for _ in 0..1_000 {
202            let (_executor_1, address) = Executor::new(Act);
203            for addr in addrs.iter() {
204                assert!(addr.ne(&address));
205            }
206            addrs.push(address);
207        }
208    }
209
210    #[test]
211    fn partial_eq_on_a_thousand_different_threads() {
212        const NUM_THREAD: usize = 1_000;
213        let addrs = Mutex::new(Vec::<Address<Act>>::new());
214        std::thread::scope(|s| {
215            for _ in 0..NUM_THREAD {
216                s.spawn(|| {
217                    let (_executor_1, address) = Executor::new(Act);
218                    addrs.lock().unwrap().push(address);
219                });
220            }
221        });
222        let addrs = std::mem::take(&mut *addrs.lock().unwrap());
223        assert_eq!(addrs.len(), NUM_THREAD);
224        for i in 0..NUM_THREAD {
225            for j in (i + 1)..NUM_THREAD {
226                assert!(addrs[i].ne(&addrs[j]))
227            }
228        }
229    }
230}