quickwit_actors/
runner.rs1use std::future::Future;
21
22use anyhow::Context;
23use tokio::sync::watch;
24use tracing::{debug, error, info, Instrument};
25
26use crate::actor::process_command;
27use crate::actor_with_state_tx::ActorWithStateTx;
28use crate::join_handle::JoinHandle;
29use crate::mailbox::{CommandOrMessage, Inbox};
30use crate::{Actor, ActorContext, ActorExitStatus, ActorHandle, RecvError};
31
32#[derive(Clone, Copy, Debug)]
41pub enum ActorRunner {
42 GlobalRuntime,
47 DedicatedThread,
52}
53
54impl ActorRunner {
55 pub fn spawn_actor<A: Actor>(
56 &self,
57 actor: A,
58 ctx: ActorContext<A>,
59 inbox: Inbox<A>,
60 ) -> ActorHandle<A> {
61 debug!(actor_id = %ctx.actor_instance_id(), "spawn-async");
62 let (state_tx, state_rx) = watch::channel(actor.observable_state());
63 let ctx_clone = ctx.clone();
64 let span = actor.span(&ctx_clone);
65 let actor_instance_id = ctx.actor_instance_id().to_string();
66 let loop_async_actor_future =
67 async move { async_actor_loop(actor, inbox, ctx, state_tx).await }.instrument(span);
68 let join_handle = self.spawn_named_task(loop_async_actor_future, &actor_instance_id);
69 ActorHandle::new(state_rx, join_handle, ctx_clone)
70 }
71
72 fn spawn_named_task(
73 &self,
74 task: impl Future<Output = ActorExitStatus> + Send + 'static,
75 name: &str,
76 ) -> JoinHandle {
77 match *self {
78 ActorRunner::GlobalRuntime => tokio_task_runtime_spawn_named(task, name),
79 ActorRunner::DedicatedThread => dedicated_runtime_spawn_named(task, name),
80 }
81 }
82}
83
84fn dedicated_runtime_spawn_named(
85 task: impl Future<Output = ActorExitStatus> + Send + 'static,
86 name: &str,
87) -> JoinHandle {
88 let (join_handle, sender) = JoinHandle::create_for_thread();
89 std::thread::Builder::new()
90 .name(name.to_string())
91 .spawn(move || {
92 let rt = tokio::runtime::Builder::new_current_thread()
93 .enable_all()
94 .build()
95 .unwrap();
96 let exit_status = rt.block_on(task);
97 let _ = sender.send(exit_status);
98 })
99 .unwrap();
100 join_handle
101}
102
103#[allow(unused_variables)]
104fn tokio_task_runtime_spawn_named(
105 task: impl Future<Output = ActorExitStatus> + Send + 'static,
106 name: &str,
107) -> JoinHandle {
108 let tokio_task_join_handle = {
109 #[cfg(tokio_unstable)]
110 {
111 tokio::task::Builder::new().name(_name).spawn(task)
112 }
113 #[cfg(not(tokio_unstable))]
114 {
115 tokio::spawn(task)
116 }
117 };
118 JoinHandle::create_for_task(tokio_task_join_handle)
119}
120
121async fn process_msg<A: Actor>(
122 actor: &mut A,
123 msg_id: u64,
124 inbox: &mut Inbox<A>,
125 ctx: &ActorContext<A>,
126 state_tx: &watch::Sender<A::ObservableState>,
127) -> Option<ActorExitStatus> {
128 if ctx.kill_switch().is_dead() {
129 return Some(ActorExitStatus::Killed);
130 }
131 ctx.progress().record_progress();
132
133 let command_or_msg_recv_res = if ctx.state().is_running() {
134 inbox.recv_timeout().await
135 } else {
136 inbox.recv_timeout_cmd_and_scheduled_msg_only().await
138 };
139
140 ctx.progress().record_progress();
141 if ctx.kill_switch().is_dead() {
142 return Some(ActorExitStatus::Killed);
143 }
144
145 match command_or_msg_recv_res {
146 Ok(CommandOrMessage::Command(cmd)) => {
147 ctx.process();
148 process_command(actor, cmd, ctx, state_tx)
149 }
150 Ok(CommandOrMessage::Message(mut msg)) => {
151 ctx.process();
152 msg.handle_message(msg_id, actor, ctx).await.err()
153 }
154 Err(RecvError::Disconnected) => Some(ActorExitStatus::Success),
155 Err(RecvError::Timeout) => {
156 ctx.idle();
157 if ctx.mailbox().is_last_mailbox() {
158 Some(ActorExitStatus::Success)
161 } else {
162 None
163 }
164 }
165 }
166}
167
168async fn async_actor_loop<A: Actor>(
169 actor: A,
170 mut inbox: Inbox<A>,
171 ctx: ActorContext<A>,
172 state_tx: watch::Sender<A::ObservableState>,
173) -> ActorExitStatus {
174 let mut actor_with_state_tx = ActorWithStateTx { actor, state_tx };
177
178 let mut exit_status_opt: Option<ActorExitStatus> =
179 actor_with_state_tx.actor.initialize(&ctx).await.err();
180
181 let mut msg_id: u64 = 1;
182 let mut exit_status: ActorExitStatus = loop {
183 tokio::task::yield_now().await;
184 if let Some(exit_status) = exit_status_opt {
185 break exit_status;
186 }
187 exit_status_opt = process_msg(
188 &mut actor_with_state_tx.actor,
189 msg_id,
190 &mut inbox,
191 &ctx,
192 &actor_with_state_tx.state_tx,
193 )
194 .await;
195 msg_id += 1;
196 };
197 ctx.record_progress();
198 if let Err(finalize_error) = actor_with_state_tx
199 .actor
200 .finalize(&exit_status, &ctx)
201 .await
202 .with_context(|| format!("Finalization of actor {}", actor_with_state_tx.actor.name()))
203 {
204 error!(error=?finalize_error, "Finalizing failed, set exit status to panicked.");
205 exit_status = ActorExitStatus::Panicked;
206 }
207 match &exit_status {
208 ActorExitStatus::Success
209 | ActorExitStatus::Quit
210 | ActorExitStatus::DownstreamClosed
211 | ActorExitStatus::Killed => {}
212 ActorExitStatus::Failure(err) => {
213 error!(cause=?err, exit_status=?exit_status, "actor-failure");
214 }
215 ActorExitStatus::Panicked => {
216 error!(exit_status=?exit_status, "actor-failure");
217 }
218 }
219 info!(actor_id = %ctx.actor_instance_id(), exit_status = %exit_status, "actor-exit");
220 ctx.exit(&exit_status);
221 exit_status
222}