near_async/multithread/
sender.rs

1use std::fmt::Debug;
2
3use futures::FutureExt;
4use futures::future::BoxFuture;
5
6use crate::messaging::{AsyncSendError, CanSend, CanSendAsync, Handler};
7use crate::multithread::runtime_handle::{MultithreadRuntimeHandle, MultithreadRuntimeMessage};
8use crate::{next_message_sequence_num, pretty_type_name};
9
10impl<A, M> CanSend<M> for MultithreadRuntimeHandle<A>
11where
12    A: Handler<M> + 'static,
13    M: Debug + Send + 'static,
14{
15    fn send(&self, message: M) {
16        let seq = next_message_sequence_num();
17        let message_type = pretty_type_name::<M>();
18        tracing::trace!(target: "multithread_runtime", seq, message_type, "sending sync message");
19
20        let function = |actor: &mut A| {
21            actor.handle(message);
22        };
23
24        let message = MultithreadRuntimeMessage {
25            seq,
26            enqueued_time_ns: self.instrumentation.current_time(),
27            name: message_type,
28            function: Box::new(function),
29        };
30        if let Err(_) = self.send_message(message) {
31            tracing::info!(target: "multithread_runtime", seq, "Ignoring sync message, receiving actor is being shut down");
32        }
33    }
34}
35
36impl<A, M, R> CanSendAsync<M, R> for MultithreadRuntimeHandle<A>
37where
38    A: Handler<M, R> + 'static,
39    M: Debug + Send + 'static,
40    R: Send + 'static,
41{
42    fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
43        let seq = next_message_sequence_num();
44        let message_type = pretty_type_name::<M>();
45        tracing::trace!(target: "multithread_runtime", seq, message_type, ?message, "sending async message");
46
47        let (sender, receiver) = tokio::sync::oneshot::channel();
48        let future = async move { receiver.await.map_err(|_| AsyncSendError::Dropped) };
49        let function = move |actor: &mut A| {
50            let result = actor.handle(message);
51            sender.send(result).ok(); // OK if the sender doesn't care about the result anymore.
52        };
53
54        let message = MultithreadRuntimeMessage {
55            seq,
56            enqueued_time_ns: self.instrumentation.current_time(),
57            name: message_type,
58            function: Box::new(function),
59        };
60        if let Err(_) = self.send_message(message) {
61            async { Err(AsyncSendError::Dropped) }.boxed()
62        } else {
63            future.boxed()
64        }
65    }
66}