near_async/tokio/
sender.rs

1use std::fmt::Debug;
2
3use futures::FutureExt;
4use futures::future::BoxFuture;
5
6use crate::futures::{DelayedActionRunner, FutureSpawner};
7use crate::instrumentation::InstrumentedThreadWriterSharedPart;
8use crate::messaging::{AsyncSendError, CanSend, CanSendAsync, HandlerWithContext};
9use crate::tokio::runtime_handle::{TokioRuntimeHandle, TokioRuntimeMessage};
10use crate::{next_message_sequence_num, pretty_type_name};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14
15impl<A, M> CanSend<M> for TokioRuntimeHandle<A>
16where
17    A: HandlerWithContext<M> + 'static,
18    M: Debug + Send + 'static,
19{
20    fn send(&self, message: M) {
21        let seq = next_message_sequence_num();
22        let message_type = pretty_type_name::<M>();
23        tracing::trace!(target: "tokio_runtime", seq, message_type, ?message, "sending sync message");
24
25        let function = |actor: &mut A, ctx: &mut dyn DelayedActionRunner<A>| {
26            actor.handle(message, ctx);
27        };
28
29        let message = TokioRuntimeMessage {
30            seq,
31            enqueued_time_ns: self.instrumentation.current_time(),
32            name: message_type,
33            function: Box::new(function),
34        };
35        if let Err(_) = self.send_message(message) {
36            tracing::info!(target: "tokio_runtime", seq, "Ignoring sync message, receiving actor is being shut down");
37        }
38    }
39}
40
41impl<A, M, R> CanSendAsync<M, R> for TokioRuntimeHandle<A>
42where
43    A: HandlerWithContext<M, R> + 'static,
44    M: Debug + Send + 'static,
45    R: Send + 'static,
46{
47    fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
48        let seq = next_message_sequence_num();
49        let message_type = pretty_type_name::<M>();
50        tracing::trace!(target: "tokio_runtime", seq, message_type, ?message, "sending async message");
51        let (sender, receiver) = tokio::sync::oneshot::channel();
52        let future = async move { receiver.await.map_err(|_| AsyncSendError::Dropped) };
53        let function = move |actor: &mut A, ctx: &mut dyn DelayedActionRunner<A>| {
54            let result = actor.handle(message, ctx);
55            sender.send(result).ok(); // OK if the sender doesn't care about the result anymore.
56        };
57        let message = TokioRuntimeMessage {
58            seq,
59            enqueued_time_ns: self.instrumentation.current_time(),
60            name: message_type,
61            function: Box::new(function),
62        };
63        if let Err(_) = self.send_message(message) {
64            async { Err(AsyncSendError::Dropped) }.boxed()
65        } else {
66            future.boxed()
67        }
68    }
69}
70
71impl<A> FutureSpawner for TokioRuntimeHandle<A> {
72    fn spawn_boxed(&self, description: &'static str, f: BoxFuture<'static, ()>) {
73        tracing::debug!(target: "tokio_runtime", description, "spawning future");
74        self.runtime_handle.spawn(InstrumentingFuture::new(
75            description,
76            f,
77            self.instrumentation.clone(),
78        ));
79    }
80}
81
82impl<A> DelayedActionRunner<A> for TokioRuntimeHandle<A>
83where
84    A: 'static,
85{
86    fn run_later_boxed(
87        &mut self,
88        name: &'static str,
89        dur: near_time::Duration,
90        f: Box<dyn FnOnce(&mut A, &mut dyn DelayedActionRunner<A>) + Send + 'static>,
91    ) {
92        let seq = next_message_sequence_num();
93        tracing::debug!(target: "tokio_runtime", seq, name, "sending delayed action");
94        let handle = self.clone();
95        self.runtime_handle.spawn(async move {
96            tokio::time::sleep(dur.unsigned_abs()).await;
97            let function = move |actor: &mut A, ctx: &mut dyn DelayedActionRunner<A>| f(actor, ctx);
98            let message = TokioRuntimeMessage {
99                seq,
100                enqueued_time_ns: handle.instrumentation.current_time(),
101                name,
102                function: Box::new(function),
103            };
104            // It's ok for this to fail; it means the runtime is shutting down already.
105            handle.send_message(message).ok();
106        });
107    }
108}
109
110/// Instruments the future, recording executions and manages its existence in the queue.
111struct InstrumentingFuture {
112    description: &'static str,
113    future: futures::future::BoxFuture<'static, ()>,
114    instrumentation: Arc<InstrumentedThreadWriterSharedPart>,
115}
116
117impl InstrumentingFuture {
118    pub fn new(
119        description: &'static str,
120        future: futures::future::BoxFuture<'static, ()>,
121        shared_instrumentation: Arc<InstrumentedThreadWriterSharedPart>,
122    ) -> Self {
123        shared_instrumentation.queue().enqueue(description);
124        Self { description, future, instrumentation: shared_instrumentation }
125    }
126}
127
128impl Future for InstrumentingFuture {
129    type Output = ();
130
131    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
132        self.instrumentation.with_thread_local_writer(|writer| {
133            writer.start_event(
134                self.description,
135                0, /* we don't know the dequeue time unfortunately */
136            )
137        });
138        let result = Pin::new(&mut self.future).poll(cx);
139        self.instrumentation.with_thread_local_writer(|writer| writer.end_event(&self.description));
140        if let Poll::Ready(()) = result {
141            self.instrumentation.queue().dequeue(self.description);
142        }
143        result
144    }
145}