near_async/multithread/
sender.rs1use std::fmt::Debug;
2
3use futures::FutureExt;
4use futures::future::BoxFuture;
5
6use crate::messaging::{AsyncSendError, CanSend, CanSendAsync, Handler};
7use crate::multithread::runtime_handle::{MultithreadRuntimeHandle, MultithreadRuntimeMessage};
8use crate::{next_message_sequence_num, pretty_type_name};
9
10impl<A, M> CanSend<M> for MultithreadRuntimeHandle<A>
11where
12 A: Handler<M> + 'static,
13 M: Debug + Send + 'static,
14{
15 fn send(&self, message: M) {
16 let seq = next_message_sequence_num();
17 let message_type = pretty_type_name::<M>();
18 tracing::trace!(target: "multithread_runtime", seq, message_type, "sending sync message");
19
20 let function = |actor: &mut A| {
21 actor.handle(message);
22 };
23
24 let message = MultithreadRuntimeMessage {
25 seq,
26 enqueued_time_ns: self.instrumentation.current_time(),
27 name: message_type,
28 function: Box::new(function),
29 };
30 if let Err(_) = self.send_message(message) {
31 tracing::info!(target: "multithread_runtime", seq, "Ignoring sync message, receiving actor is being shut down");
32 }
33 }
34}
35
36impl<A, M, R> CanSendAsync<M, R> for MultithreadRuntimeHandle<A>
37where
38 A: Handler<M, R> + 'static,
39 M: Debug + Send + 'static,
40 R: Send + 'static,
41{
42 fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
43 let seq = next_message_sequence_num();
44 let message_type = pretty_type_name::<M>();
45 tracing::trace!(target: "multithread_runtime", seq, message_type, ?message, "sending async message");
46
47 let (sender, receiver) = tokio::sync::oneshot::channel();
48 let future = async move { receiver.await.map_err(|_| AsyncSendError::Dropped) };
49 let function = move |actor: &mut A| {
50 let result = actor.handle(message);
51 sender.send(result).ok(); };
53
54 let message = MultithreadRuntimeMessage {
55 seq,
56 enqueued_time_ns: self.instrumentation.current_time(),
57 name: message_type,
58 function: Box::new(function),
59 };
60 if let Err(_) = self.send_message(message) {
61 async { Err(AsyncSendError::Dropped) }.boxed()
62 } else {
63 future.boxed()
64 }
65 }
66}