near_async/tokio/
sender.rs1use std::fmt::Debug;
2
3use futures::FutureExt;
4use futures::future::BoxFuture;
5
6use crate::futures::{DelayedActionRunner, FutureSpawner};
7use crate::instrumentation::InstrumentedThreadWriterSharedPart;
8use crate::messaging::{AsyncSendError, CanSend, CanSendAsync, HandlerWithContext};
9use crate::tokio::runtime_handle::{TokioRuntimeHandle, TokioRuntimeMessage};
10use crate::{next_message_sequence_num, pretty_type_name};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14
15impl<A, M> CanSend<M> for TokioRuntimeHandle<A>
16where
17 A: HandlerWithContext<M> + 'static,
18 M: Debug + Send + 'static,
19{
20 fn send(&self, message: M) {
21 let seq = next_message_sequence_num();
22 let message_type = pretty_type_name::<M>();
23 tracing::trace!(target: "tokio_runtime", seq, message_type, ?message, "sending sync message");
24
25 let function = |actor: &mut A, ctx: &mut dyn DelayedActionRunner<A>| {
26 actor.handle(message, ctx);
27 };
28
29 let message = TokioRuntimeMessage {
30 seq,
31 enqueued_time_ns: self.instrumentation.current_time(),
32 name: message_type,
33 function: Box::new(function),
34 };
35 if let Err(_) = self.send_message(message) {
36 tracing::info!(target: "tokio_runtime", seq, "Ignoring sync message, receiving actor is being shut down");
37 }
38 }
39}
40
41impl<A, M, R> CanSendAsync<M, R> for TokioRuntimeHandle<A>
42where
43 A: HandlerWithContext<M, R> + 'static,
44 M: Debug + Send + 'static,
45 R: Send + 'static,
46{
47 fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
48 let seq = next_message_sequence_num();
49 let message_type = pretty_type_name::<M>();
50 tracing::trace!(target: "tokio_runtime", seq, message_type, ?message, "sending async message");
51 let (sender, receiver) = tokio::sync::oneshot::channel();
52 let future = async move { receiver.await.map_err(|_| AsyncSendError::Dropped) };
53 let function = move |actor: &mut A, ctx: &mut dyn DelayedActionRunner<A>| {
54 let result = actor.handle(message, ctx);
55 sender.send(result).ok(); };
57 let message = TokioRuntimeMessage {
58 seq,
59 enqueued_time_ns: self.instrumentation.current_time(),
60 name: message_type,
61 function: Box::new(function),
62 };
63 if let Err(_) = self.send_message(message) {
64 async { Err(AsyncSendError::Dropped) }.boxed()
65 } else {
66 future.boxed()
67 }
68 }
69}
70
71impl<A> FutureSpawner for TokioRuntimeHandle<A> {
72 fn spawn_boxed(&self, description: &'static str, f: BoxFuture<'static, ()>) {
73 tracing::debug!(target: "tokio_runtime", description, "spawning future");
74 self.runtime_handle.spawn(InstrumentingFuture::new(
75 description,
76 f,
77 self.instrumentation.clone(),
78 ));
79 }
80}
81
82impl<A> DelayedActionRunner<A> for TokioRuntimeHandle<A>
83where
84 A: 'static,
85{
86 fn run_later_boxed(
87 &mut self,
88 name: &'static str,
89 dur: near_time::Duration,
90 f: Box<dyn FnOnce(&mut A, &mut dyn DelayedActionRunner<A>) + Send + 'static>,
91 ) {
92 let seq = next_message_sequence_num();
93 tracing::debug!(target: "tokio_runtime", seq, name, "sending delayed action");
94 let handle = self.clone();
95 self.runtime_handle.spawn(async move {
96 tokio::time::sleep(dur.unsigned_abs()).await;
97 let function = move |actor: &mut A, ctx: &mut dyn DelayedActionRunner<A>| f(actor, ctx);
98 let message = TokioRuntimeMessage {
99 seq,
100 enqueued_time_ns: handle.instrumentation.current_time(),
101 name,
102 function: Box::new(function),
103 };
104 handle.send_message(message).ok();
106 });
107 }
108}
109
110struct InstrumentingFuture {
112 description: &'static str,
113 future: futures::future::BoxFuture<'static, ()>,
114 instrumentation: Arc<InstrumentedThreadWriterSharedPart>,
115}
116
117impl InstrumentingFuture {
118 pub fn new(
119 description: &'static str,
120 future: futures::future::BoxFuture<'static, ()>,
121 shared_instrumentation: Arc<InstrumentedThreadWriterSharedPart>,
122 ) -> Self {
123 shared_instrumentation.queue().enqueue(description);
124 Self { description, future, instrumentation: shared_instrumentation }
125 }
126}
127
128impl Future for InstrumentingFuture {
129 type Output = ();
130
131 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
132 self.instrumentation.with_thread_local_writer(|writer| {
133 writer.start_event(
134 self.description,
135 0, )
137 });
138 let result = Pin::new(&mut self.future).poll(cx);
139 self.instrumentation.with_thread_local_writer(|writer| writer.end_event(&self.description));
140 if let Poll::Ready(()) = result {
141 self.instrumentation.queue().dequeue(self.description);
142 }
143 result
144 }
145}