near_async/multithread/
runtime_handle.rs

1use crate::instrumentation::queue::InstrumentedQueue;
2use crate::instrumentation::writer::InstrumentedThreadWriterSharedPart;
3use crate::messaging::Actor;
4use crate::pretty_type_name;
5use std::sync::Arc;
6use std::time::Duration;
7
8/// MultithreadRuntimeMessage is a type alias for a boxed function that can be sent to the multithread runtime,
9/// as well as a description for debugging purposes.
10pub(super) struct MultithreadRuntimeMessage<A> {
11    pub(super) seq: u64,
12    pub(super) enqueued_time_ns: u64,
13    pub(super) name: &'static str,
14    pub(super) function: Box<dyn FnOnce(&mut A) + Send>,
15}
16
17/// Allows sending messages to a multithreaded actor runtime. Implements CanSend and CanSendAsync traits
18/// for the messages that the actor can handle.
19pub struct MultithreadRuntimeHandle<A> {
20    pub(super) sender: crossbeam_channel::Sender<MultithreadRuntimeMessage<A>>,
21    /// This is used in the case where the handle controls the lifetime of the runtime,
22    /// dropping (all of) which automatically stops the runtime, as an alterative of having
23    /// the ActorSystem control it.
24    cancellation_signal_holder: Option<crossbeam_channel::Sender<()>>,
25    pub(super) instrumentation: Arc<InstrumentedThreadWriterSharedPart>,
26}
27
28impl<A> Clone for MultithreadRuntimeHandle<A> {
29    fn clone(&self) -> Self {
30        Self {
31            sender: self.sender.clone(),
32            cancellation_signal_holder: self.cancellation_signal_holder.clone(),
33            instrumentation: self.instrumentation.clone(),
34        }
35    }
36}
37
38impl<A> MultithreadRuntimeHandle<A>
39where
40    A: 'static,
41{
42    pub fn sender(&self) -> Arc<MultithreadRuntimeHandle<A>> {
43        Arc::new(self.clone())
44    }
45}
46
47impl<A> MultithreadRuntimeHandle<A> {
48    pub(super) fn send_message(
49        &self,
50        message: MultithreadRuntimeMessage<A>,
51    ) -> Result<(), crossbeam_channel::SendError<MultithreadRuntimeMessage<A>>> {
52        let name = message.name;
53        self.sender.send(message).map(|_| {
54            // Only increment the queue if the message was successfully sent.
55            self.instrumentation.queue().enqueue(name);
56        })
57    }
58}
59
60/// See ActorSystem::spawn_multithread_actor.
61///
62/// The `cancellation_signal_holder` is an optional sender that can be used to disable
63/// system-wide cancellation. If this sender is used, it is just the other side of the
64/// `cancellation_signal`.
65pub(crate) fn spawn_multithread_actor<A>(
66    num_threads: usize,
67    make_actor_fn: impl Fn() -> A + Sync + Send + 'static,
68    cancellation_signal: crossbeam_channel::Receiver<()>,
69    cancellation_signal_holder: Option<crossbeam_channel::Sender<()>>,
70) -> MultithreadRuntimeHandle<A>
71where
72    A: Actor + Send + 'static,
73{
74    let actor_name = pretty_type_name::<A>();
75    tracing::info!(
76        target: "multithread_runtime",
77        actor_name,
78        num_threads,
79        "Starting multithread actor",
80    );
81    let (sender, receiver) = crossbeam_channel::unbounded::<MultithreadRuntimeMessage<A>>();
82    let instrumented_queue = InstrumentedQueue::new(actor_name);
83    let shared_instrumentation =
84        InstrumentedThreadWriterSharedPart::new(actor_name.to_string(), instrumented_queue.clone());
85    let handle = MultithreadRuntimeHandle {
86        sender,
87        cancellation_signal_holder,
88        instrumentation: shared_instrumentation,
89    };
90    let make_actor_fn = Arc::new(make_actor_fn);
91    for thread_id in 0..num_threads {
92        let receiver = receiver.clone();
93        let cancellation_signal = cancellation_signal.clone();
94        let instrumented_queue = instrumented_queue.clone();
95        let handle = handle.clone();
96        let make_actor_fn = make_actor_fn.clone();
97
98        std::thread::spawn(move || {
99            let mut instrumentation =
100                handle.instrumentation.new_writer_with_global_registration(Some(thread_id));
101            let mut actor = make_actor_fn();
102            let window_update_ticker = crossbeam_channel::tick(Duration::from_secs(1));
103            loop {
104                crossbeam_channel::select! {
105                    recv(cancellation_signal) -> _ => {
106                        tracing::info!(target: "multithread_runtime", actor_name, thread_id, "cancellation received, exiting loop.");
107                        return;
108                    }
109                    recv(window_update_ticker) -> _ => {
110                        tracing::debug!(target: "multithread_runtime", actor_name, thread_id, "Updating instrumentation window");
111                        instrumentation.advance_window_if_needed();
112                    }
113                    recv(receiver) -> message => {
114                        let Ok(message) = message else {
115                            tracing::warn!(target: "multithread_runtime", actor_name, thread_id, "message queue closed, exiting event loop.");
116                            return;
117                        };
118                        instrumented_queue.dequeue(message.name);
119                        let seq = message.seq;
120                        let dequeue_time_ns = handle.instrumentation.current_time().saturating_sub(message.enqueued_time_ns);
121                        instrumentation.start_event(message.name, dequeue_time_ns);
122                        tracing::debug!(target: "multithread_runtime", seq, "Executing message");
123                        (message.function)(&mut actor);
124                        instrumentation.end_event(message.name);
125                    }
126                }
127            }
128        });
129    }
130    handle
131}