quickwit_actors/
runner.rs

// Copyright (C) 2021 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at hello@quickwit.io.
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20use std::future::Future;
21
22use anyhow::Context;
23use tokio::sync::watch;
24use tracing::{debug, error, info, Instrument};
25
26use crate::actor::process_command;
27use crate::actor_with_state_tx::ActorWithStateTx;
28use crate::join_handle::JoinHandle;
29use crate::mailbox::{CommandOrMessage, Inbox};
30use crate::{Actor, ActorContext, ActorExitStatus, ActorHandle, RecvError};
31
/// The `ActorRunner` defines the environment in which an actor
/// should be executed.
///
/// While all actor handlers are asynchronous, some actors are implemented
/// to do heavy blocking work and do not rely on any asynchronous constructs.
///
/// Such actors should simply rely on the `DedicatedThread` runner.
#[derive(Clone, Copy, Debug)]
pub enum ActorRunner {
    /// Runs the actor as a tokio task on the global runtime.
    ///
    /// This is the most lightweight solution.
    /// Actors running on this runtime should not block for more than 50 microseconds.
    GlobalRuntime,
    /// Spawns a dedicated thread with its own single-threaded (current-thread)
    /// Tokio runtime, and relies on `tokio::runtime::Runtime::block_on` to run
    /// the actor loop.
    ///
    /// This runner is suitable for actors that are heavily blocking.
    DedicatedThread,
}
53
54impl ActorRunner {
55    pub fn spawn_actor<A: Actor>(
56        &self,
57        actor: A,
58        ctx: ActorContext<A>,
59        inbox: Inbox<A>,
60    ) -> ActorHandle<A> {
61        debug!(actor_id = %ctx.actor_instance_id(), "spawn-async");
62        let (state_tx, state_rx) = watch::channel(actor.observable_state());
63        let ctx_clone = ctx.clone();
64        let span = actor.span(&ctx_clone);
65        let actor_instance_id = ctx.actor_instance_id().to_string();
66        let loop_async_actor_future =
67            async move { async_actor_loop(actor, inbox, ctx, state_tx).await }.instrument(span);
68        let join_handle = self.spawn_named_task(loop_async_actor_future, &actor_instance_id);
69        ActorHandle::new(state_rx, join_handle, ctx_clone)
70    }
71
72    fn spawn_named_task(
73        &self,
74        task: impl Future<Output = ActorExitStatus> + Send + 'static,
75        name: &str,
76    ) -> JoinHandle {
77        match *self {
78            ActorRunner::GlobalRuntime => tokio_task_runtime_spawn_named(task, name),
79            ActorRunner::DedicatedThread => dedicated_runtime_spawn_named(task, name),
80        }
81    }
82}
83
84fn dedicated_runtime_spawn_named(
85    task: impl Future<Output = ActorExitStatus> + Send + 'static,
86    name: &str,
87) -> JoinHandle {
88    let (join_handle, sender) = JoinHandle::create_for_thread();
89    std::thread::Builder::new()
90        .name(name.to_string())
91        .spawn(move || {
92            let rt = tokio::runtime::Builder::new_current_thread()
93                .enable_all()
94                .build()
95                .unwrap();
96            let exit_status = rt.block_on(task);
97            let _ = sender.send(exit_status);
98        })
99        .unwrap();
100    join_handle
101}
102
103#[allow(unused_variables)]
104fn tokio_task_runtime_spawn_named(
105    task: impl Future<Output = ActorExitStatus> + Send + 'static,
106    name: &str,
107) -> JoinHandle {
108    let tokio_task_join_handle = {
109        #[cfg(tokio_unstable)]
110        {
111            tokio::task::Builder::new().name(_name).spawn(task)
112        }
113        #[cfg(not(tokio_unstable))]
114        {
115            tokio::spawn(task)
116        }
117    };
118    JoinHandle::create_for_task(tokio_task_join_handle)
119}
120
121async fn process_msg<A: Actor>(
122    actor: &mut A,
123    msg_id: u64,
124    inbox: &mut Inbox<A>,
125    ctx: &ActorContext<A>,
126    state_tx: &watch::Sender<A::ObservableState>,
127) -> Option<ActorExitStatus> {
128    if ctx.kill_switch().is_dead() {
129        return Some(ActorExitStatus::Killed);
130    }
131    ctx.progress().record_progress();
132
133    let command_or_msg_recv_res = if ctx.state().is_running() {
134        inbox.recv_timeout().await
135    } else {
136        // The actor is paused. We only process command and scheduled message.
137        inbox.recv_timeout_cmd_and_scheduled_msg_only().await
138    };
139
140    ctx.progress().record_progress();
141    if ctx.kill_switch().is_dead() {
142        return Some(ActorExitStatus::Killed);
143    }
144
145    match command_or_msg_recv_res {
146        Ok(CommandOrMessage::Command(cmd)) => {
147            ctx.process();
148            process_command(actor, cmd, ctx, state_tx)
149        }
150        Ok(CommandOrMessage::Message(mut msg)) => {
151            ctx.process();
152            msg.handle_message(msg_id, actor, ctx).await.err()
153        }
154        Err(RecvError::Disconnected) => Some(ActorExitStatus::Success),
155        Err(RecvError::Timeout) => {
156            ctx.idle();
157            if ctx.mailbox().is_last_mailbox() {
158                // No one will be able to send us more messages.
159                // We can exit the actor.
160                Some(ActorExitStatus::Success)
161            } else {
162                None
163            }
164        }
165    }
166}
167
168async fn async_actor_loop<A: Actor>(
169    actor: A,
170    mut inbox: Inbox<A>,
171    ctx: ActorContext<A>,
172    state_tx: watch::Sender<A::ObservableState>,
173) -> ActorExitStatus {
174    // We rely on this object internally to fetch a post-mortem state,
175    // even in case of a panic.
176    let mut actor_with_state_tx = ActorWithStateTx { actor, state_tx };
177
178    let mut exit_status_opt: Option<ActorExitStatus> =
179        actor_with_state_tx.actor.initialize(&ctx).await.err();
180
181    let mut msg_id: u64 = 1;
182    let mut exit_status: ActorExitStatus = loop {
183        tokio::task::yield_now().await;
184        if let Some(exit_status) = exit_status_opt {
185            break exit_status;
186        }
187        exit_status_opt = process_msg(
188            &mut actor_with_state_tx.actor,
189            msg_id,
190            &mut inbox,
191            &ctx,
192            &actor_with_state_tx.state_tx,
193        )
194        .await;
195        msg_id += 1;
196    };
197    ctx.record_progress();
198    if let Err(finalize_error) = actor_with_state_tx
199        .actor
200        .finalize(&exit_status, &ctx)
201        .await
202        .with_context(|| format!("Finalization of actor {}", actor_with_state_tx.actor.name()))
203    {
204        error!(error=?finalize_error, "Finalizing failed, set exit status to panicked.");
205        exit_status = ActorExitStatus::Panicked;
206    }
207    match &exit_status {
208        ActorExitStatus::Success
209        | ActorExitStatus::Quit
210        | ActorExitStatus::DownstreamClosed
211        | ActorExitStatus::Killed => {}
212        ActorExitStatus::Failure(err) => {
213            error!(cause=?err, exit_status=?exit_status, "actor-failure");
214        }
215        ActorExitStatus::Panicked => {
216            error!(exit_status=?exit_status, "actor-failure");
217        }
218    }
219    info!(actor_id = %ctx.actor_instance_id(), exit_status = %exit_status, "actor-exit");
220    ctx.exit(&exit_status);
221    exit_status
222}