near_async/tokio/
runtime_handle.rs

1use crate::futures::{DelayedActionRunner, FutureSpawner};
2use crate::instrumentation::queue::InstrumentedQueue;
3use crate::instrumentation::writer::InstrumentedThreadWriterSharedPart;
4use crate::messaging::Actor;
5use crate::pretty_type_name;
6use crate::tokio::runtime::AsyncDroppableRuntime;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::runtime::Runtime;
10use tokio::sync::mpsc;
11use tokio_util::sync::CancellationToken;
12
13/// TokioRuntimeMessage is a type alias for a boxed function that can be sent to the Tokio runtime.
14pub(super) struct TokioRuntimeMessage<A> {
15    pub(super) seq: u64,
16    pub(super) enqueued_time_ns: u64,
17    pub(super) name: &'static str,
18    pub(super) function: Box<dyn FnOnce(&mut A, &mut dyn DelayedActionRunner<A>) + Send>,
19}
20
21/// TokioRuntimeHandle is a handle to a Tokio runtime that can be used to send messages to an actor.
22/// It allows for sending messages and spawning futures into the Tokio runtime.
23pub struct TokioRuntimeHandle<A> {
24    /// The sender is used to send messages to the actor running in the Tokio runtime.
25    sender: mpsc::UnboundedSender<TokioRuntimeMessage<A>>,
26    pub(super) instrumentation: Arc<InstrumentedThreadWriterSharedPart>,
27    /// The runtime_handle used to post futures to the Tokio runtime.
28    /// This is a handle, meaning it does not prevent the runtime from shutting down.
29    /// Runtime shutdown is governed by cancellation only (either via TokioRuntimeHandle::stop() or
30    /// ActorSystem::stop()).
31    pub(super) runtime_handle: tokio::runtime::Handle,
32    /// Cancellation token used to signal shutdown of this specific Tokio runtime.
33    /// There is also a global shutdown signal in the ActorSystem. These are separate
34    /// shutdown mechanisms that can both be used to shut down the actor.
35    cancel: CancellationToken,
36}
37
38impl<A> Clone for TokioRuntimeHandle<A> {
39    fn clone(&self) -> Self {
40        Self {
41            sender: self.sender.clone(),
42            instrumentation: self.instrumentation.clone(),
43            runtime_handle: self.runtime_handle.clone(),
44            cancel: self.cancel.clone(),
45        }
46    }
47}
48
49impl<A> TokioRuntimeHandle<A>
50where
51    A: 'static,
52{
53    pub fn sender(&self) -> Arc<TokioRuntimeHandle<A>> {
54        Arc::new(self.clone())
55    }
56
57    pub fn future_spawner(&self) -> Box<dyn FutureSpawner> {
58        Box::new(self.clone())
59    }
60
61    pub fn stop(&self) {
62        self.cancel.cancel();
63    }
64}
65
66impl<A> TokioRuntimeHandle<A> {
67    pub(super) fn send_message(
68        &self,
69        message: TokioRuntimeMessage<A>,
70    ) -> Result<(), mpsc::error::SendError<TokioRuntimeMessage<A>>> {
71        let name = message.name;
72        self.sender.send(message).map(|_| {
73            // Only increment the queue if the message was successfully sent.
74            self.instrumentation.queue().enqueue(name);
75        })
76    }
77}
78
79/// See ActorSystem::spawn_tokio_actor.
80pub(crate) fn spawn_tokio_actor<A>(
81    actor: A,
82    actor_name: String,
83    system_cancellation_signal: CancellationToken,
84) -> TokioRuntimeHandle<A>
85where
86    A: Actor + Send + 'static,
87{
88    let runtime_builder = TokioRuntimeBuilder::new(actor_name, system_cancellation_signal);
89    let handle = runtime_builder.handle();
90    runtime_builder.spawn_tokio_actor(actor);
91    handle
92}
93
94struct CallStopWhenDropping<A: Actor> {
95    actor: A,
96}
97
98impl<A: Actor> Drop for CallStopWhenDropping<A> {
99    fn drop(&mut self) {
100        self.actor.stop_actor();
101    }
102}
103
104/// A more granular way to build a tokio runtime. It allows spawning futures and getting a handle
105/// before the actor is constructed (so that the actor can be constructed with the handle,
106/// for sending messages to itself).
107pub struct TokioRuntimeBuilder<A: Actor + Send + 'static> {
108    handle: TokioRuntimeHandle<A>,
109    receiver: Option<mpsc::UnboundedReceiver<TokioRuntimeMessage<A>>>,
110    shared_instrumentation: Arc<InstrumentedThreadWriterSharedPart>,
111    system_cancellation_signal: CancellationToken,
112    runtime: Option<Runtime>,
113}
114
115impl<A: Actor + Send + 'static> TokioRuntimeBuilder<A> {
116    pub fn new(actor_name: String, system_cancellation_signal: CancellationToken) -> Self {
117        let runtime = tokio::runtime::Builder::new_multi_thread()
118            .worker_threads(1)
119            .enable_all()
120            .build()
121            .expect("Failed to create Tokio runtime");
122
123        let (sender, receiver) = mpsc::unbounded_channel::<TokioRuntimeMessage<A>>();
124        let instrumented_queue = InstrumentedQueue::new(&actor_name);
125        let shared_instrumentation =
126            InstrumentedThreadWriterSharedPart::new(actor_name, instrumented_queue);
127
128        // Start a background task that just initializes the thread-local writer.
129        // This ensures the instrumentation is aware of the actor's existence even if
130        // it doesn't process any messages for a while.
131        runtime.spawn({
132            let shared_instrumentation = shared_instrumentation.clone();
133            async move {
134                shared_instrumentation.with_thread_local_writer(|_writer| {});
135            }
136        });
137
138        let cancel = CancellationToken::new();
139        let handle = TokioRuntimeHandle {
140            sender,
141            runtime_handle: runtime.handle().clone(),
142            cancel,
143            instrumentation: shared_instrumentation.clone(),
144        };
145
146        Self {
147            handle,
148            receiver: Some(receiver),
149            shared_instrumentation,
150            system_cancellation_signal,
151            runtime: Some(runtime),
152        }
153    }
154
155    pub fn handle(&self) -> TokioRuntimeHandle<A> {
156        self.handle.clone()
157    }
158
159    pub fn spawn_tokio_actor(mut self, mut actor: A) {
160        let mut runtime_handle = self.handle.clone();
161        let inner_runtime_handle = runtime_handle.runtime_handle.clone();
162        let runtime = self.runtime.take().unwrap();
163        let mut receiver = self.receiver.take().unwrap();
164        let shared_instrumentation = self.shared_instrumentation.clone();
165        let actor_name = pretty_type_name::<A>();
166        inner_runtime_handle.spawn(async move {
167            actor.start_actor(&mut runtime_handle);
168            // The runtime gets dropped as soon as this loop exits, cancelling all other futures on
169            // the same tokio runtime.
170            let _runtime = AsyncDroppableRuntime::new(runtime);
171            let mut actor = CallStopWhenDropping { actor };
172            let mut window_update_timer = tokio::time::interval(Duration::from_secs(1));
173            loop {
174                tokio::select! {
175                    _ = self.system_cancellation_signal.cancelled() => {
176                        tracing::debug!(target: "tokio_runtime", actor_name, "Shutting down Tokio runtime due to ActorSystem shutdown");
177                        break;
178                    }
179                    _ = runtime_handle.cancel.cancelled() => {
180                        tracing::debug!(target: "tokio_runtime", actor_name, "Shutting down Tokio runtime due to targeted cancellation");
181                        break;
182                    }
183                    _ = window_update_timer.tick() => {
184                        tracing::debug!(target: "tokio_runtime", "Advancing instrumentation window");
185                        shared_instrumentation.with_thread_local_writer(|writer| writer.advance_window_if_needed());
186                    }
187                    Some(message) = receiver.recv() => {
188                        let seq = message.seq;
189                        shared_instrumentation.queue().dequeue(message.name);
190                        tracing::debug!(target: "tokio_runtime", seq, actor_name, "Executing message");
191                        let dequeue_time_ns = shared_instrumentation.current_time().saturating_sub(message.enqueued_time_ns);
192                        shared_instrumentation.with_thread_local_writer(|writer| writer.start_event(message.name, dequeue_time_ns));
193                        (message.function)(&mut actor.actor, &mut runtime_handle);
194                        shared_instrumentation.with_thread_local_writer(|writer| writer.end_event(message.name));
195                    }
196                    // Note: If the sender is closed, that stops being a selectable option.
197                    // This is valid: we can spawn a tokio runtime without a handle, just to keep
198                    // some futures running.
199                }
200            }
201        });
202    }
203}
204
205impl<A> Drop for TokioRuntimeBuilder<A>
206where
207    A: Actor + Send + 'static,
208{
209    fn drop(&mut self) {
210        if self.runtime.is_some() {
211            panic!(
212                "TokioRuntimeBuilder must be built before dropping. Did you forget to call spawn_tokio_actor?"
213            );
214        }
215    }
216}