near_async/multithread/
runtime_handle.rs1use crate::instrumentation::queue::InstrumentedQueue;
2use crate::instrumentation::writer::InstrumentedThreadWriterSharedPart;
3use crate::messaging::Actor;
4use crate::pretty_type_name;
5use std::sync::Arc;
6use std::time::Duration;
7
8pub(super) struct MultithreadRuntimeMessage<A> {
11 pub(super) seq: u64,
12 pub(super) enqueued_time_ns: u64,
13 pub(super) name: &'static str,
14 pub(super) function: Box<dyn FnOnce(&mut A) + Send>,
15}
16
17pub struct MultithreadRuntimeHandle<A> {
20 pub(super) sender: crossbeam_channel::Sender<MultithreadRuntimeMessage<A>>,
21 cancellation_signal_holder: Option<crossbeam_channel::Sender<()>>,
25 pub(super) instrumentation: Arc<InstrumentedThreadWriterSharedPart>,
26}
27
28impl<A> Clone for MultithreadRuntimeHandle<A> {
29 fn clone(&self) -> Self {
30 Self {
31 sender: self.sender.clone(),
32 cancellation_signal_holder: self.cancellation_signal_holder.clone(),
33 instrumentation: self.instrumentation.clone(),
34 }
35 }
36}
37
38impl<A> MultithreadRuntimeHandle<A>
39where
40 A: 'static,
41{
42 pub fn sender(&self) -> Arc<MultithreadRuntimeHandle<A>> {
43 Arc::new(self.clone())
44 }
45}
46
47impl<A> MultithreadRuntimeHandle<A> {
48 pub(super) fn send_message(
49 &self,
50 message: MultithreadRuntimeMessage<A>,
51 ) -> Result<(), crossbeam_channel::SendError<MultithreadRuntimeMessage<A>>> {
52 let name = message.name;
53 self.sender.send(message).map(|_| {
54 self.instrumentation.queue().enqueue(name);
56 })
57 }
58}
59
60pub(crate) fn spawn_multithread_actor<A>(
66 num_threads: usize,
67 make_actor_fn: impl Fn() -> A + Sync + Send + 'static,
68 cancellation_signal: crossbeam_channel::Receiver<()>,
69 cancellation_signal_holder: Option<crossbeam_channel::Sender<()>>,
70) -> MultithreadRuntimeHandle<A>
71where
72 A: Actor + Send + 'static,
73{
74 let actor_name = pretty_type_name::<A>();
75 tracing::info!(
76 target: "multithread_runtime",
77 actor_name,
78 num_threads,
79 "Starting multithread actor",
80 );
81 let (sender, receiver) = crossbeam_channel::unbounded::<MultithreadRuntimeMessage<A>>();
82 let instrumented_queue = InstrumentedQueue::new(actor_name);
83 let shared_instrumentation =
84 InstrumentedThreadWriterSharedPart::new(actor_name.to_string(), instrumented_queue.clone());
85 let handle = MultithreadRuntimeHandle {
86 sender,
87 cancellation_signal_holder,
88 instrumentation: shared_instrumentation,
89 };
90 let make_actor_fn = Arc::new(make_actor_fn);
91 for thread_id in 0..num_threads {
92 let receiver = receiver.clone();
93 let cancellation_signal = cancellation_signal.clone();
94 let instrumented_queue = instrumented_queue.clone();
95 let handle = handle.clone();
96 let make_actor_fn = make_actor_fn.clone();
97
98 std::thread::spawn(move || {
99 let mut instrumentation =
100 handle.instrumentation.new_writer_with_global_registration(Some(thread_id));
101 let mut actor = make_actor_fn();
102 let window_update_ticker = crossbeam_channel::tick(Duration::from_secs(1));
103 loop {
104 crossbeam_channel::select! {
105 recv(cancellation_signal) -> _ => {
106 tracing::info!(target: "multithread_runtime", actor_name, thread_id, "cancellation received, exiting loop.");
107 return;
108 }
109 recv(window_update_ticker) -> _ => {
110 tracing::debug!(target: "multithread_runtime", actor_name, thread_id, "Updating instrumentation window");
111 instrumentation.advance_window_if_needed();
112 }
113 recv(receiver) -> message => {
114 let Ok(message) = message else {
115 tracing::warn!(target: "multithread_runtime", actor_name, thread_id, "message queue closed, exiting event loop.");
116 return;
117 };
118 instrumented_queue.dequeue(message.name);
119 let seq = message.seq;
120 let dequeue_time_ns = handle.instrumentation.current_time().saturating_sub(message.enqueued_time_ns);
121 instrumentation.start_event(message.name, dequeue_time_ns);
122 tracing::debug!(target: "multithread_runtime", seq, "Executing message");
123 (message.function)(&mut actor);
124 instrumentation.end_event(message.name);
125 }
126 }
127 }
128 });
129 }
130 handle
131}