Skip to main content

actors_rs/kernel/
mod.rs

1pub(crate) mod kernel_ref;
2pub(crate) mod mailbox;
3pub(crate) mod provider;
4pub(crate) mod queue;
5
6use crate::actor::actor_cell::ExtendedCell;
7use crate::actor::{Actor, ActorProducer, BoxActorProd, Context, CreateError};
8use crate::actor_ref::{ActorRef, ActorReference, BasicActorRef};
9use crate::kernel::kernel_ref::KernelRef;
10use crate::kernel::mailbox::{flush_to_deadletters, run_mailbox, Mailbox};
11use crate::system::{ActorRestarted, ActorSystem, ActorTerminated, SystemMsg};
12use crate::Message;
13use futures::{channel::mpsc::channel, task::SpawnExt, StreamExt};
14use slog::warn;
15use std::{
16    panic::{catch_unwind, AssertUnwindSafe},
17    sync::{Arc, Mutex},
18};
19
20// TODO: fix clippy issue https://github.com/actors-rs/actors.rs/issues/7
21#[derive(Debug)]
22#[allow(clippy::large_enum_variant)]
23pub enum KernelMsg {
24    TerminateActor,
25    RestartActor,
26    RunActor,
27    Sys(ActorSystem),
28}
29
30pub struct Dock<A: Actor> {
31    pub actor: Arc<Mutex<Option<A>>>,
32    pub cell: ExtendedCell<A::Msg>,
33}
34
35impl<A: Actor> Clone for Dock<A> {
36    fn clone(&self) -> Self {
37        Self {
38            actor: self.actor.clone(),
39            cell: self.cell.clone(),
40        }
41    }
42}
43
44pub fn kernel<A>(
45    props: BoxActorProd<A>,
46    cell: ExtendedCell<A::Msg>,
47    mailbox: Mailbox<A::Msg>,
48    sys: &ActorSystem,
49) -> Result<KernelRef, CreateError>
50where
51    A: Actor + 'static,
52{
53    let (tx, mut rx) = channel::<KernelMsg>(1000); // todo config?
54    let kr = KernelRef { tx };
55
56    let mut sys = sys.clone();
57    let mut child_sys = sys.clone();
58    let akr = kr.clone();
59    let actor = start_actor(&props)?;
60    let cell = cell.init(&kr);
61
62    let dock = Dock {
63        actor: Arc::new(Mutex::new(Some(actor))),
64        cell: cell.clone(),
65    };
66
67    let actor_ref = ActorRef::new(cell);
68
69    let f = async move {
70        while let Some(msg) = rx.next().await {
71            match msg {
72                KernelMsg::RunActor => {
73                    let ctx = Context {
74                        myself: actor_ref.clone(),
75                        system: child_sys.clone(),
76                        kernel: akr.clone(),
77                    };
78
79                    let mb = mailbox.clone();
80                    let d = dock.clone();
81
82                    let _ =
83                        std::panic::catch_unwind(AssertUnwindSafe(|| run_mailbox(&mb, &ctx, d)));
84                    //.unwrap();
85                }
86                KernelMsg::RestartActor => {
87                    restart_actor(&dock, actor_ref.clone().into(), &props, &child_sys);
88                }
89                KernelMsg::TerminateActor => {
90                    terminate_actor(&mailbox, actor_ref.clone().into(), &child_sys);
91                    break;
92                }
93                KernelMsg::Sys(s) => {
94                    child_sys = s;
95                }
96            }
97        }
98    };
99
100    sys.exec.spawn(f).unwrap();
101    Ok(kr)
102}
103
104fn restart_actor<A>(
105    dock: &Dock<A>,
106    actor_ref: BasicActorRef,
107    props: &BoxActorProd<A>,
108    sys: &ActorSystem,
109) where
110    A: Actor,
111{
112    let mut a = dock.actor.lock().unwrap();
113    if let Ok(actor) = start_actor(props) {
114        *a = Some(actor);
115        actor_ref.sys_tell(SystemMsg::ActorInit);
116        sys.publish_event(ActorRestarted { actor: actor_ref }.into());
117    } else {
118        warn!(sys.log(), "Actor failed to restart: {:?}", actor_ref);
119    }
120}
121
122fn terminate_actor<Msg>(mbox: &Mailbox<Msg>, actor_ref: BasicActorRef, sys: &ActorSystem)
123where
124    Msg: Message,
125{
126    sys.provider.unregister(actor_ref.path());
127    flush_to_deadletters(mbox, &actor_ref, sys);
128    sys.publish_event(
129        ActorTerminated {
130            actor: actor_ref.clone(),
131        }
132        .into(),
133    );
134
135    let parent = actor_ref.parent();
136    if !parent.is_root() {
137        parent.sys_tell(ActorTerminated { actor: actor_ref }.into());
138    }
139}
140
141fn start_actor<A>(props: &BoxActorProd<A>) -> Result<A, CreateError>
142where
143    A: Actor,
144{
145    let actor = catch_unwind(|| props.produce()).map_err(|_| CreateError::Panicked)?;
146
147    Ok(actor)
148}