near_async/tokio/
runtime_handle.rs1use crate::futures::{DelayedActionRunner, FutureSpawner};
2use crate::instrumentation::queue::InstrumentedQueue;
3use crate::instrumentation::writer::InstrumentedThreadWriterSharedPart;
4use crate::messaging::Actor;
5use crate::pretty_type_name;
6use crate::tokio::runtime::AsyncDroppableRuntime;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::runtime::Runtime;
10use tokio::sync::mpsc;
11use tokio_util::sync::CancellationToken;
12
13pub(super) struct TokioRuntimeMessage<A> {
15 pub(super) seq: u64,
16 pub(super) enqueued_time_ns: u64,
17 pub(super) name: &'static str,
18 pub(super) function: Box<dyn FnOnce(&mut A, &mut dyn DelayedActionRunner<A>) + Send>,
19}
20
21pub struct TokioRuntimeHandle<A> {
24 sender: mpsc::UnboundedSender<TokioRuntimeMessage<A>>,
26 pub(super) instrumentation: Arc<InstrumentedThreadWriterSharedPart>,
27 pub(super) runtime_handle: tokio::runtime::Handle,
32 cancel: CancellationToken,
36}
37
38impl<A> Clone for TokioRuntimeHandle<A> {
39 fn clone(&self) -> Self {
40 Self {
41 sender: self.sender.clone(),
42 instrumentation: self.instrumentation.clone(),
43 runtime_handle: self.runtime_handle.clone(),
44 cancel: self.cancel.clone(),
45 }
46 }
47}
48
49impl<A> TokioRuntimeHandle<A>
50where
51 A: 'static,
52{
53 pub fn sender(&self) -> Arc<TokioRuntimeHandle<A>> {
54 Arc::new(self.clone())
55 }
56
57 pub fn future_spawner(&self) -> Box<dyn FutureSpawner> {
58 Box::new(self.clone())
59 }
60
61 pub fn stop(&self) {
62 self.cancel.cancel();
63 }
64}
65
66impl<A> TokioRuntimeHandle<A> {
67 pub(super) fn send_message(
68 &self,
69 message: TokioRuntimeMessage<A>,
70 ) -> Result<(), mpsc::error::SendError<TokioRuntimeMessage<A>>> {
71 let name = message.name;
72 self.sender.send(message).map(|_| {
73 self.instrumentation.queue().enqueue(name);
75 })
76 }
77}
78
79pub(crate) fn spawn_tokio_actor<A>(
81 actor: A,
82 actor_name: String,
83 system_cancellation_signal: CancellationToken,
84) -> TokioRuntimeHandle<A>
85where
86 A: Actor + Send + 'static,
87{
88 let runtime_builder = TokioRuntimeBuilder::new(actor_name, system_cancellation_signal);
89 let handle = runtime_builder.handle();
90 runtime_builder.spawn_tokio_actor(actor);
91 handle
92}
93
94struct CallStopWhenDropping<A: Actor> {
95 actor: A,
96}
97
98impl<A: Actor> Drop for CallStopWhenDropping<A> {
99 fn drop(&mut self) {
100 self.actor.stop_actor();
101 }
102}
103
104pub struct TokioRuntimeBuilder<A: Actor + Send + 'static> {
108 handle: TokioRuntimeHandle<A>,
109 receiver: Option<mpsc::UnboundedReceiver<TokioRuntimeMessage<A>>>,
110 shared_instrumentation: Arc<InstrumentedThreadWriterSharedPart>,
111 system_cancellation_signal: CancellationToken,
112 runtime: Option<Runtime>,
113}
114
115impl<A: Actor + Send + 'static> TokioRuntimeBuilder<A> {
116 pub fn new(actor_name: String, system_cancellation_signal: CancellationToken) -> Self {
117 let runtime = tokio::runtime::Builder::new_multi_thread()
118 .worker_threads(1)
119 .enable_all()
120 .build()
121 .expect("Failed to create Tokio runtime");
122
123 let (sender, receiver) = mpsc::unbounded_channel::<TokioRuntimeMessage<A>>();
124 let instrumented_queue = InstrumentedQueue::new(&actor_name);
125 let shared_instrumentation =
126 InstrumentedThreadWriterSharedPart::new(actor_name, instrumented_queue);
127
128 runtime.spawn({
132 let shared_instrumentation = shared_instrumentation.clone();
133 async move {
134 shared_instrumentation.with_thread_local_writer(|_writer| {});
135 }
136 });
137
138 let cancel = CancellationToken::new();
139 let handle = TokioRuntimeHandle {
140 sender,
141 runtime_handle: runtime.handle().clone(),
142 cancel,
143 instrumentation: shared_instrumentation.clone(),
144 };
145
146 Self {
147 handle,
148 receiver: Some(receiver),
149 shared_instrumentation,
150 system_cancellation_signal,
151 runtime: Some(runtime),
152 }
153 }
154
155 pub fn handle(&self) -> TokioRuntimeHandle<A> {
156 self.handle.clone()
157 }
158
159 pub fn spawn_tokio_actor(mut self, mut actor: A) {
160 let mut runtime_handle = self.handle.clone();
161 let inner_runtime_handle = runtime_handle.runtime_handle.clone();
162 let runtime = self.runtime.take().unwrap();
163 let mut receiver = self.receiver.take().unwrap();
164 let shared_instrumentation = self.shared_instrumentation.clone();
165 let actor_name = pretty_type_name::<A>();
166 inner_runtime_handle.spawn(async move {
167 actor.start_actor(&mut runtime_handle);
168 let _runtime = AsyncDroppableRuntime::new(runtime);
171 let mut actor = CallStopWhenDropping { actor };
172 let mut window_update_timer = tokio::time::interval(Duration::from_secs(1));
173 loop {
174 tokio::select! {
175 _ = self.system_cancellation_signal.cancelled() => {
176 tracing::debug!(target: "tokio_runtime", actor_name, "Shutting down Tokio runtime due to ActorSystem shutdown");
177 break;
178 }
179 _ = runtime_handle.cancel.cancelled() => {
180 tracing::debug!(target: "tokio_runtime", actor_name, "Shutting down Tokio runtime due to targeted cancellation");
181 break;
182 }
183 _ = window_update_timer.tick() => {
184 tracing::debug!(target: "tokio_runtime", "Advancing instrumentation window");
185 shared_instrumentation.with_thread_local_writer(|writer| writer.advance_window_if_needed());
186 }
187 Some(message) = receiver.recv() => {
188 let seq = message.seq;
189 shared_instrumentation.queue().dequeue(message.name);
190 tracing::debug!(target: "tokio_runtime", seq, actor_name, "Executing message");
191 let dequeue_time_ns = shared_instrumentation.current_time().saturating_sub(message.enqueued_time_ns);
192 shared_instrumentation.with_thread_local_writer(|writer| writer.start_event(message.name, dequeue_time_ns));
193 (message.function)(&mut actor.actor, &mut runtime_handle);
194 shared_instrumentation.with_thread_local_writer(|writer| writer.end_event(message.name));
195 }
196 }
200 }
201 });
202 }
203}
204
205impl<A> Drop for TokioRuntimeBuilder<A>
206where
207 A: Actor + Send + 'static,
208{
209 fn drop(&mut self) {
210 if self.runtime.is_some() {
211 panic!(
212 "TokioRuntimeBuilder must be built before dropping. Did you forget to call spawn_tokio_actor?"
213 );
214 }
215 }
216}