1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
use std::{
    any,
    collections::HashMap,
    fmt,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc, Mutex, TryLockError,
    },
    time::Duration,
};

use dyn_clone::DynClone;
use futures::stream::AbortHandle;
use tokio::{
    sync::{mpsc, oneshot},
    task::JoinHandle,
};

use crate::{
    error::{ActorStopReason, SendError},
    message::{BoxReply, DynMessage, DynQuery, Message, Query},
};

/// Process-wide counter that supplies actor ids, starting at 0.
///
/// `ActorRef::new` increments it once for every reference it constructs.
static ACTOR_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Context value stored in the `CURRENT_CTX` task-local.
///
/// `ActorRef::current` reads `actor_ref` from it to recover a typed reference to the actor
/// running on the current task.
pub(crate) struct Ctx {
    /// Actor identifier. NOTE(review): presumably the same value as `ActorRef::id` for the
    /// actor owning this context — confirm at the site that builds `Ctx`.
    pub(crate) id: u64,
    /// Type-erased actor reference; `ActorRef::<A>::current` downcasts it to `ActorRef<A>`.
    pub(crate) actor_ref: Box<dyn any::Any + Send>,
    /// Type-erased mailbox handle limited to the control signals exposed by `SignalMailbox`.
    pub(crate) signal_mailbox: Box<dyn SignalMailbox>,
    /// Shared table of linked actors, keyed by actor id.
    pub(crate) links: Links,
}
// Task-local slot holding the `Ctx` for the current tokio task. `ActorRef::current` reads it with
// `try_with`, so code running on a task where the slot was never set simply gets `None`.
// NOTE(review): the code that scopes this value is not in this file section — presumably the
// actor spawn logic; confirm there.
tokio::task_local! {
    pub(crate) static CURRENT_CTX: Ctx;
}

/// Sending half of an actor's unbounded signal channel.
type Mailbox<A> = mpsc::UnboundedSender<Signal<A>>;
/// Shared, mutex-guarded map from a linked actor's id to a handle that can deliver control
/// signals to that actor.
pub(crate) type Links = Arc<Mutex<HashMap<u64, Box<dyn SignalMailbox>>>>;

/// A reference to an actor for sending messages/queries and managing the actor.
///
/// Cloning an `ActorRef` yields another handle with the same id, mailbox, abort handle and link
/// table.
pub struct ActorRef<A> {
    /// Identifier taken from the global actor counter when the reference was created.
    id: u64,
    /// Channel sender used to deliver `Signal`s (messages, queries, control signals) to the actor.
    mailbox: Mailbox<A>,
    /// Handle used by `kill` to abort the actor.
    abort_handle: AbortHandle,
    /// Link table of this actor: ids of linked actors mapped to their signal mailboxes.
    links: Links,
}

impl<A> ActorRef<A> {
    pub(crate) fn new(mailbox: Mailbox<A>, abort_handle: AbortHandle, links: Links) -> Self {
        ActorRef {
            id: ACTOR_COUNTER.fetch_add(1, Ordering::Relaxed),
            mailbox,
            abort_handle,
            links,
        }
    }

    /// Returns the actor identifier.
    ///
    /// The id is assigned when the reference is first created and is shared by all of its clones.
    pub fn id(&self) -> u64 {
        self.id
    }

    /// Returns whether the actor is currently alive.
    ///
    /// The actor is considered alive for as long as its mailbox is open, i.e. the receiving half
    /// of the channel has been neither closed nor dropped.
    pub fn is_alive(&self) -> bool {
        // A closed mailbox means the actor is gone, so liveness is the negation.
        !self.mailbox.is_closed()
    }

    /// Returns the current actor ref if called within an actor.
    pub fn current() -> Option<ActorRef<A>>
    where
        A: 'static,
    {
        CURRENT_CTX
            .try_with(|ctx| ctx.actor_ref.downcast_ref().cloned())
            .ok()
            .flatten()
    }

    /// Signals the actor to stop after processing all messages currently in its mailbox.
    ///
    /// This method sends a special stop message to the end of the actor's mailbox, ensuring
    /// that the actor will process all preceding messages before stopping. Any messages sent
    /// after this stop signal will be ignored and dropped. This approach allows for a graceful
    /// shutdown of the actor, ensuring all pending work is completed before termination.
    pub fn stop_gracefully(&self) -> Result<(), SendError>
    where
        A: 'static,
    {
        self.mailbox.signal_stop()
    }

    /// Kills the actor immediately by triggering its abort handle.
    ///
    /// Messages still in the mailbox will be ignored and dropped.
    ///
    /// The actor's `on_stop` hook will still be called.
    ///
    /// Note: if the actor is in the middle of processing a message, processing of that message
    /// is aborted.
    pub fn kill(&self) {
        let handle = &self.abort_handle;
        handle.abort();
    }

    /// Waits until the actor has stopped.
    ///
    /// The returned future completes once the actor's mailbox has been closed, which makes this
    /// useful when the caller must not proceed until the actor has finished its final work and
    /// released its resources.
    ///
    /// Note: this method never initiates a stop; it only waits for one. Request the stop with
    /// `stop_gracefully` or `kill` first.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// actor.stop_gracefully()?;
    /// actor.wait_for_stop().await;
    /// ```
    pub async fn wait_for_stop(&self) {
        let mailbox_closed = self.mailbox.closed();
        mailbox_closed.await
    }

    /// Sends a message to the actor and waits for its reply.
    ///
    /// In debug builds this asserts that the caller is not the target actor itself, since an
    /// actor awaiting its own reply would deadlock; use `send_async` in that situation.
    ///
    /// ```ignore
    /// let reply = actor_ref.send(msg).await?;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an error if the message cannot be placed in the mailbox or if the reply channel
    /// is dropped before a reply arrives.
    ///
    /// # Panics
    ///
    /// Panics if the boxed reply cannot be downcast to `A::Reply`.
    pub async fn send<M>(&self, msg: M) -> Result<A::Reply, SendError<M>>
    where
        A: Message<M>,
        M: Send + 'static,
    {
        debug_assert!(
            Self::current().map_or(true, |current| current.id() != self.id()),
            "actors cannot send messages syncronously themselves as this would deadlock - use send_async instead\nthis assertion only occurs on debug builds, release builds will deadlock",
        );

        let (reply_tx, reply_rx) = oneshot::channel();
        let signal = Signal::Message {
            message: Box::new(msg),
            reply: Some(reply_tx),
        };
        self.mailbox.send(signal)?;

        // The reply arrives type-erased; restore the concrete reply type before returning it.
        let boxed_reply = reply_rx.await?;
        Ok(*boxed_reply.downcast().unwrap())
    }

    /// Sends a message to the actor asynchronously, without waiting for a reply.
    ///
    /// No reply channel is attached to the message. If the handler for this message returns an
    /// error, the actor will panic.
    ///
    /// ```ignore
    /// actor_ref.send_async(msg)?;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an error if the message cannot be placed in the mailbox.
    pub fn send_async<M>(&self, msg: M) -> Result<(), SendError<M>>
    where
        A: Message<M>,
        M: Send + 'static,
    {
        let signal = Signal::Message {
            message: Box::new(msg),
            reply: None,
        };
        self.mailbox.send(signal)?;
        Ok(())
    }

    /// Sends a message to the actor after a delay.
    ///
    /// A tokio task is spawned that sleeps for `delay` and then sends the message with
    /// `send_async`. The returned handle resolves to the outcome of that send.
    ///
    /// If the handler for this message returns an error, the actor will panic.
    ///
    /// ```ignore
    /// actor_ref.send_after(msg, Duration::from_secs(5));
    /// ```
    pub fn send_after<M>(&self, msg: M, delay: Duration) -> JoinHandle<Result<(), SendError<M>>>
    where
        A: Message<M>,
        M: Send + 'static,
    {
        // The spawned task needs its own handle because it may outlive `self`.
        let target = self.clone();
        let delayed_send = async move {
            tokio::time::sleep(delay).await;
            target.send_async(msg)
        };
        tokio::spawn(delayed_send)
    }

    /// Queries the actor for some data.
    ///
    /// Consecutive queries can run in parallel as long as no `Message` is sent between them.
    ///
    /// If the actor was spawned as `!Sync` with `Spawn::spawn_unsync`, then queries will not be
    /// supported and any query will return `Err(SendError::QueriesNotSupported)`.
    ///
    /// ```ignore
    /// // Query from the actor
    /// let reply = actor_ref.query(msg).await?;
    ///
    /// // Queries run concurrently if no `Message`'s are sent between them
    /// let (a, b, c) = tokio::try_join!(
    ///     actor_ref.query(Foo { ... }),
    ///     actor_ref.query(Foo { ... }),
    ///     actor_ref.query(Foo { ... }),
    /// )?;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an error if the query cannot be placed in the mailbox, if the reply channel is
    /// dropped before a reply arrives, or if the actor answers with `ActorNotRunning`,
    /// `ActorStopped` or `QueriesNotSupported`; each of those is passed through unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the boxed reply (or the boxed query handed back inside `ActorNotRunning`) cannot
    /// be downcast to its expected concrete type.
    pub async fn query<M>(&self, msg: M) -> Result<A::Reply, SendError<M>>
    where
        A: Query<M>,
        M: Send + 'static,
    {
        let (reply, rx) = oneshot::channel();
        self.mailbox.send(Signal::Query {
            query: Box::new(msg),
            reply: Some(reply),
        })?;
        match rx.await {
            Ok(Ok(val)) => Ok(*val.downcast().unwrap()),
            // The actor hands the undelivered query back type-erased; restore its concrete type.
            Ok(Err(SendError::ActorNotRunning(err))) => {
                Err(SendError::ActorNotRunning(*err.downcast().unwrap()))
            }
            // A stopped actor must be reported as stopped, not as lacking query support.
            Ok(Err(SendError::ActorStopped)) => Err(SendError::ActorStopped),
            Ok(Err(SendError::QueriesNotSupported)) => Err(SendError::QueriesNotSupported),
            Err(err) => Err(err.into()),
        }
    }

    /// Links this actor with a child, making this one the parent.
    ///
    /// The child's signal mailbox is stored in this actor's link table under the child's id. If
    /// the parent dies, then the child will be notified with a link died signal. Linking an actor
    /// to itself is a no-op.
    ///
    /// # Panics
    ///
    /// Panics if the link table's mutex is poisoned.
    pub fn link_child<B>(&self, child: &ActorRef<B>)
    where
        B: 'static,
    {
        let child_id = child.id();
        if child_id == self.id {
            return;
        }

        let child_mailbox: Box<dyn SignalMailbox> = child.signal_mailbox();
        let mut links = self.links.lock().unwrap();
        links.insert(child_id, child_mailbox);
    }

    /// Unlinks a previously linked child actor.
    ///
    /// Removes the child's entry from this actor's link table. Passing a reference to this same
    /// actor is a no-op.
    ///
    /// # Panics
    ///
    /// Panics if the link table's mutex is poisoned.
    pub fn unlink_child<B>(&self, child: &ActorRef<B>)
    where
        B: 'static,
    {
        let child_id = child.id();
        if child_id != self.id {
            let mut links = self.links.lock().unwrap();
            links.remove(&child_id);
        }
    }

    /// Links this actor with a sibling, so that each is notified if the other dies.
    ///
    /// Each actor's signal mailbox is registered in the other's link table. Linking an actor to
    /// itself is a no-op.
    ///
    /// Both link tables stay locked while they are updated. The two mutexes are always acquired
    /// in ascending actor-id order, so concurrent calls that name the same pair of actors in
    /// opposite roles cannot deadlock by taking the locks in opposite orders.
    ///
    /// # Panics
    ///
    /// Panics if either link table's mutex is poisoned.
    pub fn link_together<B>(&self, sibbling: &ActorRef<B>)
    where
        A: 'static,
        B: 'static,
    {
        if self.id == sibbling.id() {
            return;
        }

        // Consistent lock order (lowest id first) prevents an ABBA deadlock.
        let (mut this_links, mut sibbling_links) = if self.id < sibbling.id() {
            let this_links = self.links.lock().unwrap();
            let sibbling_links = sibbling.links.lock().unwrap();
            (this_links, sibbling_links)
        } else {
            let sibbling_links = sibbling.links.lock().unwrap();
            let this_links = self.links.lock().unwrap();
            (this_links, sibbling_links)
        };
        this_links.insert(sibbling.id(), sibbling.signal_mailbox());
        sibbling_links.insert(self.id, self.signal_mailbox());
    }

    /// Unlinks two previously linked actors from each other.
    ///
    /// Each actor's entry is removed from the other's link table. Passing a reference to this
    /// same actor is a no-op.
    ///
    /// Both link tables stay locked while they are updated. The two mutexes are always acquired
    /// in ascending actor-id order, so concurrent calls that name the same pair of actors in
    /// opposite roles cannot deadlock by taking the locks in opposite orders.
    ///
    /// # Panics
    ///
    /// Panics if either link table's mutex is poisoned.
    pub fn unlink_together<B>(&self, sibbling: &ActorRef<B>)
    where
        B: 'static,
    {
        if self.id == sibbling.id() {
            return;
        }

        // Consistent lock order (lowest id first) prevents an ABBA deadlock.
        let (mut this_links, mut sibbling_links) = if self.id < sibbling.id() {
            let this_links = self.links.lock().unwrap();
            let sibbling_links = sibbling.links.lock().unwrap();
            (this_links, sibbling_links)
        } else {
            let sibbling_links = sibbling.links.lock().unwrap();
            let this_links = self.links.lock().unwrap();
            (this_links, sibbling_links)
        };
        this_links.remove(&sibbling.id());
        sibbling_links.remove(&self.id);
    }

    pub(crate) fn signal_mailbox(&self) -> Box<dyn SignalMailbox>
    where
        A: 'static,
    {
        Box::new(self.mailbox.clone())
    }

    pub(crate) fn links(&self) -> Links {
        self.links.clone()
    }
}

impl<A> Clone for ActorRef<A> {
    fn clone(&self) -> Self {
        ActorRef {
            id: self.id,
            mailbox: self.mailbox.clone(),
            abort_handle: self.abort_handle.clone(),
            links: self.links.clone(),
        }
    }
}

impl<A> fmt::Debug for ActorRef<A> {
    /// Formats the reference as `ActorRef { id, links }`.
    ///
    /// The link table is only inspected with `try_lock`, so formatting never blocks: `links`
    /// shows the linked actor ids when the lock is free, and a `<poisoned>` or `<locked>`
    /// placeholder otherwise.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("ActorRef");
        out.field("id", &self.id);
        match self.links.try_lock() {
            Ok(links) => {
                out.field("links", &links.keys());
            }
            Err(err) => {
                let placeholder = match err {
                    TryLockError::Poisoned(_) => "<poisoned>",
                    TryLockError::WouldBlock => "<locked>",
                };
                // `format_args!` keeps the placeholder unquoted in the debug output.
                out.field("links", &format_args!("{}", placeholder));
            }
        }
        out.finish()
    }
}

/// Type-erased view of an actor mailbox that can only deliver control signals.
///
/// Because the trait does not mention the actor type, mailboxes of different actors can be
/// stored side by side as `Box<dyn SignalMailbox>`, as the `Links` table does.
pub(crate) trait SignalMailbox: DynClone + Send {
    /// Tells the mailbox's owner that the linked actor `id` died for the given `reason`.
    fn signal_link_died(&self, id: u64, reason: ActorStopReason) -> Result<(), SendError>;
    /// Queues a stop signal in the mailbox.
    fn signal_stop(&self) -> Result<(), SendError>;
}

// Makes `Box<dyn SignalMailbox>` implement `Clone` via the `DynClone` supertrait.
dyn_clone::clone_trait_object!(SignalMailbox);

impl<A> SignalMailbox for Mailbox<A> {
    /// Queues a `Signal::LinkDied` for the actor; a failed send is reported as
    /// `SendError::ActorNotRunning`.
    fn signal_link_died(&self, id: u64, reason: ActorStopReason) -> Result<(), SendError> {
        match self.send(Signal::LinkDied { id, reason }) {
            Ok(()) => Ok(()),
            // The undelivered signal carried by the channel error is discarded.
            Err(_) => Err(SendError::ActorNotRunning(())),
        }
    }

    /// Queues a `Signal::Stop` for the actor; a failed send is reported as
    /// `SendError::ActorNotRunning`.
    fn signal_stop(&self) -> Result<(), SendError> {
        match self.send(Signal::Stop) {
            Ok(()) => Ok(()),
            Err(_) => Err(SendError::ActorNotRunning(())),
        }
    }
}

/// Everything that can be placed in the mailbox of an actor of type `A`.
pub(crate) enum Signal<A> {
    /// A message for the actor to handle.
    Message {
        /// The type-erased message.
        message: Box<dyn DynMessage<A>>,
        /// Channel for the boxed reply; `None` when the sender does not wait for one
        /// (as in `ActorRef::send_async`).
        reply: Option<oneshot::Sender<BoxReply>>,
    },
    /// A query for the actor to answer.
    Query {
        /// The type-erased query.
        query: Box<dyn DynQuery<A>>,
        /// Channel for the boxed reply, or for a `SendError` whose payload is type-erased.
        reply: Option<oneshot::Sender<Result<BoxReply, SendError<Box<dyn any::Any + Send>>>>>,
    },
    /// Notification that a linked actor has died.
    LinkDied {
        /// Id of the actor that died.
        id: u64,
        /// Why that actor stopped.
        reason: ActorStopReason,
    },
    /// Request for the actor to stop, sent by `SignalMailbox::signal_stop`.
    Stop,
}

impl<A> Signal<A> {
    /// Consumes the signal and tries to recover its payload as a concrete `M`.
    ///
    /// Returns `Some` only for a `Message` or `Query` signal whose payload is of type `M`;
    /// control signals and payloads of any other type yield `None`. Any reply channel carried
    /// by the signal is discarded.
    pub(crate) fn downcast_message<M>(self) -> Option<M>
    where
        M: 'static,
    {
        match self {
            Signal::Message { message, .. } => match message.as_any().downcast::<M>() {
                Ok(payload) => Some(*payload),
                Err(_) => None,
            },
            Signal::Query { query, .. } => match query.as_any().downcast::<M>() {
                Ok(payload) => Some(*payload),
                Err(_) => None,
            },
            Signal::LinkDied { .. } | Signal::Stop => None,
        }
    }
}