// near_async/messaging.rs
use crate::break_apart::BreakApart;
use crate::functional::{SendAsyncFunction, SendFunction};
use crate::futures::DelayedActionRunner;
use futures::future::BoxFuture;
use futures::FutureExt;
use once_cell::sync::OnceCell;
use std::fmt::{Debug, Display};
use std::sync::Arc;
use tokio::sync::oneshot;
/// Trait for an actor.
///
/// An actor is a struct that can handle messages and implements the [`Handler`] or
/// [`HandlerWithContext`] trait for each message type it accepts.
pub trait Actor {
    /// Optional hook that is executed at the beginning of the actor's lifecycle.
    ///
    /// The default implementation does nothing. This corresponds to the `started`
    /// function of the `actix::Actor` trait.
    fn start_actor(&mut self, _ctx: &mut dyn DelayedActionRunner<Self>) {}

    /// Hook that surrounds the handling of a single message.
    ///
    /// `f` is the actual handler. The default implementation just calls `f` with the
    /// actor, the message and the delayed action runner, and returns whatever `f`
    /// returns. The blanket [`HandlerWithContext`] implementation for `Actor + Handler`
    /// types routes every message through this method.
    fn wrap_handler<M>(
        &mut self,
        msg: M,
        ctx: &mut dyn DelayedActionRunner<Self>,
        f: impl FnOnce(&mut Self, M, &mut dyn DelayedActionRunner<Self>) -> M::Result,
    ) -> M::Result
    where
        M: actix::Message,
    {
        f(self, msg, ctx)
    }
}
/// Trait for handling a message of type `M`.
///
/// This works in unison with the [`CanSend`] trait: an actor implements `Handler` for
/// every message it would like to handle, while `CanSend` implements the logic that
/// delivers the message to the actor. The two traits are typically not implemented by
/// the same struct.
///
/// Note that an "actor" here is any struct that implements `Handler`, not just actix
/// actors.
pub trait Handler<M>
where
    M: actix::Message,
{
    /// Handles `msg` and returns the result type declared by the message.
    fn handle(&mut self, msg: M) -> M::Result;
}
/// Trait for handling a message of type `M` with access to a context.
///
/// This is similar to the [`Handler`] trait, but the handler additionally receives the
/// delayed action runner that is used to schedule actions to be run in the future. For
/// an `actix::Actor`, the context defined as `actix::Context<Self>` implements
/// `DelayedActionRunner<T>`.
///
/// The implementer of a handler for a message only needs to implement one of `Handler`
/// or `HandlerWithContext`, not both.
pub trait HandlerWithContext<M>
where
    M: actix::Message,
{
    /// Handles `msg`, using `ctx` to schedule any delayed actions, and returns the
    /// result type declared by the message.
    fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner<Self>) -> M::Result;
}
/// Every [`Actor`] that implements the context-free [`Handler`] automatically gets a
/// [`HandlerWithContext`] implementation: the message is passed through
/// [`Actor::wrap_handler`], and the context is ignored by the inner handler.
impl<A, M> HandlerWithContext<M> for A
where
    A: Actor + Handler<M>,
    M: actix::Message,
{
    fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner<Self>) -> M::Result {
        // Name the trait explicitly: `A` implements both `Handler<M>` and (through this
        // impl) `HandlerWithContext<M>`, and both have a method called `handle`.
        self.wrap_handler(msg, ctx, |actor, message, _ctx| {
            <Self as Handler<M>>::handle(actor, message)
        })
    }
}
/// Trait for sending a typed message of type `M`.
///
/// The sent message is then handled through the [`Handler`] trait; see its
/// documentation for how the two fit together. `actix::Addr`, which is derived from an
/// `actix::Actor`, is an example of a struct that implements `CanSend`.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait CanSend<M>: Send + Sync + 'static {
    /// Sends `message`. What happens to it is up to the implementation.
    fn send(&self, message: M);
}
/// Wraps a [`CanSend`] implementation behind a shared, type-erased handle.
///
/// Pass this around instead of spelling out `Arc<dyn CanSend<M>>`. Using a wrapper
/// struct allows us to define more flexible APIs.
pub struct Sender<M>
where
    M: 'static,
{
    /// The wrapped implementation that every `send` call is forwarded to.
    sender: Arc<dyn CanSend<M>>,
}
impl<M> Clone for Sender<M> {
fn clone(&self) -> Self {
Self { sender: self.sender.clone() }
}
}
/// Extension functions that wrap a `CanSend` as a [`Sender`].
pub trait IntoSender<M> {
    /// Converts an owned `CanSend` into a `Sender`, consuming it.
    fn into_sender(self) -> Sender<M>;

    /// Converts a reference-counted `CanSend` into a `Sender` without consuming the
    /// caller's `Arc`.
    fn as_sender(self: &Arc<Self>) -> Sender<M>;
}
/// Anything that implements `CanSend<M>` can be turned into a `Sender<M>`.
impl<M, T> IntoSender<M> for T
where
    T: CanSend<M>,
{
    fn into_sender(self) -> Sender<M> {
        Sender::from_impl(self)
    }

    fn as_sender(self: &Arc<Self>) -> Sender<M> {
        // Only the reference count is bumped; the underlying object is shared.
        let shared = Arc::clone(self);
        Sender::from_arc(shared)
    }
}
impl<M> Sender<M> {
    /// Sends a message.
    ///
    /// It is the responsibility of the underlying `CanSend` implementation to decide
    /// how to handle the message.
    pub fn send(&self, message: M) {
        self.sender.send(message)
    }

    /// Wraps an owned `CanSend` implementation in a fresh `Arc`.
    fn from_impl(sender: impl CanSend<M> + 'static) -> Self {
        let sender: Arc<dyn CanSend<M>> = Arc::new(sender);
        Self { sender }
    }

    /// Wraps an already reference-counted `CanSend` implementation.
    fn from_arc<T: CanSend<M> + 'static>(arc: Arc<T>) -> Self {
        // Erase the concrete type; the allocation itself is reused.
        let sender: Arc<dyn CanSend<M>> = arc;
        Self { sender }
    }

    /// Creates a sender that handles messages using the given function.
    pub fn from_fn(send: impl Fn(M) + Send + Sync + 'static) -> Self {
        let function_sender = SendFunction::new(send);
        Self::from_impl(function_sender)
    }

    /// Creates an object that implements `CanSend<Inner>` for any message `Inner`
    /// that can be converted to `M`.
    pub fn break_apart(self) -> BreakApart<M> {
        BreakApart { sender: self }
    }
}
/// Extension trait (not for anyone to implement) that allows a
/// `Sender<MessageWithCallback<M, R>>` to be used to send a message and then obtain a
/// future of the response.
///
/// See [`AsyncSendError`] for the reasons the future may resolve to an error result.
pub trait SendAsync<M, R>
where
    R: Send + 'static,
{
    /// Sends `message` and returns a future that resolves to the response, or to an
    /// `AsyncSendError` if no response is produced.
    fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>>;
}
/// Provides `send_async` for everything that can send a `MessageWithCallback<M, R>`.
impl<M, R, A> SendAsync<M, R> for A
where
    R: Send + 'static,
    A: CanSend<MessageWithCallback<M, R>> + ?Sized,
{
    fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
        // The response travels back over a oneshot channel: the callback handed to the
        // receiver feeds the channel, and the returned future awaits it.
        let (reply_tx, reply_rx) = oneshot::channel::<Result<R, AsyncSendError>>();

        let callback: Box<dyn FnOnce(Result<R, AsyncSendError>) + Send> =
            Box::new(move |outcome| {
                // A failed send means the returned future was dropped, i.e. the caller
                // no longer cares about the result, so the error is ignored on purpose.
                let _ = reply_tx.send(outcome);
            });

        self.send(MessageWithCallback { message, callback });

        async move {
            match reply_rx.await {
                Ok(outcome) => outcome,
                // The receiver of the message dropped it without ever invoking the
                // callback, which closes the channel.
                Err(_) => Err(AsyncSendError::Dropped),
            }
        }
        .boxed()
    }
}
impl<M, R> Sender<MessageWithCallback<M, R>>
where
    R: Send + 'static,
{
    /// Sends `message` and returns a future of the response.
    ///
    /// Same as `SendAsync::send_async`, but available directly on the concrete
    /// `Sender` type.
    pub fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
        self.sender.send_async(message)
    }

    /// Creates a sender that handles `send_async` messages by producing a result
    /// synchronously. Note that the provided function is NOT async.
    ///
    /// (Implementing similar functionality with an async function is possible, but it
    /// requires deciding who drives the async function; a FutureSpawner can be a good
    /// idea.)
    pub fn from_async_fn(send_async: impl Fn(M) -> R + Send + Sync + 'static) -> Self {
        let function_sender = SendAsyncFunction::new(send_async);
        Self::from_impl(function_sender)
    }
}
/// Generic failure for async send.
///
/// Implements [`std::error::Error`], so it can be boxed as `dyn Error` or propagated
/// with `?` into error types that accept any standard error.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AsyncSendError {
    /// The receiver side could not accept the message.
    Closed,
    /// The receiver side could not process the message in time.
    Timeout,
    /// The message was ignored entirely.
    Dropped,
}

/// The user-facing text is the variant name, i.e. the same as the `Debug` output.
impl Display for AsyncSendError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Debug::fmt(self, f)
    }
}

/// No underlying cause is carried, so the default `source()` (`None`) is correct.
impl std::error::Error for AsyncSendError {}
/// Used to implement an async sender.
///
/// An async sender is just a `Sender` whose message type is
/// `MessageWithCallback<M, R>`: the message itself plus a callback function that
/// resolves the future returned by `send_async`.
pub struct MessageWithCallback<T, R> {
    /// The actual message.
    pub message: T,
    /// Called at most once with the response (or an error) for `message`.
    pub callback: Box<dyn FnOnce(Result<R, AsyncSendError>) + Send>,
}
/// Debug output shows only the message; the callback is a boxed closure and is left
/// out.
impl<T, R> Debug for MessageWithCallback<T, R>
where
    T: Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_tuple("MessageWithCallback");
        repr.field(&self.message);
        repr.finish()
    }
}
/// Shorthand for a [`Sender`] of [`MessageWithCallback<M, R>`], i.e. a sender for
/// messages of type `M` that are answered with a response of type `R`.
pub type AsyncSender<M, R> = Sender<MessageWithCallback<M, R>>;
/// Placeholder for a sender that has not yet been fully constructed.
///
/// Sometimes we want to be able to pass in a sender before the object behind it
/// exists. A `LateBoundSender` can be handed out in its place; it passes `CanSend`
/// capabilities (and with them `send_async`) through to the inner object. `bind()`
/// should be called once the inner object is ready. All calls to `send()` and
/// `send_async()` through this wrapper block until `bind()` is called.
///
/// Example:
/// ```ignore
/// let late_bound = LateBoundSender::new();
/// let something_else = SomethingElse::new(late_bound.as_sender());
/// let implementation = Implementation::new(something_else);
/// late_bound.bind(implementation);
/// ```
pub struct LateBoundSender<S> {
    /// The inner sender; empty until `bind()` is called.
    sender: OnceCell<S>,
}
impl<S> LateBoundSender<S> {
    /// Creates a new, not yet bound, reference-counted placeholder.
    pub fn new() -> Arc<Self> {
        let unbound = Self { sender: OnceCell::new() };
        Arc::new(unbound)
    }

    /// Binds the inner sender that all calls are forwarded to.
    ///
    /// # Panics
    ///
    /// Panics if a sender has already been bound.
    pub fn bind(&self, sender: S) {
        // The rejected value is discarded so that `S` does not need to be `Debug`.
        let outcome: Result<(), ()> = self.sender.set(sender).map_err(|_rejected| ());
        outcome.expect("cannot set sender twice");
    }
}
/// Allows a `LateBoundSender` to be converted to a `Sender` as long as the inner
/// object could be.
impl<M, S> CanSend<M> for LateBoundSender<S>
where
    S: CanSend<M>,
{
    fn send(&self, message: M) {
        // Waits for the inner sender to be bound before forwarding the message.
        let inner: &S = self.sender.wait();
        inner.send(message);
    }
}
/// A sender that implements `CanSend<M>` for every message type `M` and does nothing
/// with the messages it is given. Usually obtained through [`noop`].
pub struct Noop;
/// Any message type can be sent to `Noop`; the message is simply discarded.
impl<M> CanSend<M> for Noop {
    fn send(&self, message: M) {
        drop(message);
    }
}
/// Creates a no-op sender, i.e. one that does nothing with the messages it receives.
///
/// The returned type can be converted to any type of sender, sync or async, including
/// multi-senders.
pub fn noop() -> Noop {
    Noop
}
/// A trait for converting something that implements individual senders into a
/// multi-sender of type `A`.
pub trait IntoMultiSender<A> {
    /// Builds the multi-sender from a reference-counted value without consuming the
    /// caller's `Arc`.
    fn as_multi_sender(self: &Arc<Self>) -> A;

    /// Builds the multi-sender from an owned value, consuming it.
    fn into_multi_sender(self) -> A;
}
/// Counterpart of [`IntoMultiSender`]: implemented by a multi-sender type that can be
/// constructed from a reference-counted `A`.
pub trait MultiSenderFrom<A> {
    /// Builds the multi-sender from the shared value `a`.
    fn multi_sender_from(a: Arc<A>) -> Self;
}
/// Any `A` can be converted into every multi-sender `B` that knows how to build itself
/// from an `Arc<A>`.
impl<A, B> IntoMultiSender<B> for A
where
    B: MultiSenderFrom<A>,
{
    fn as_multi_sender(self: &Arc<Self>) -> B {
        // Shares the existing allocation; only the reference count is bumped.
        let shared = Arc::clone(self);
        B::multi_sender_from(shared)
    }

    fn into_multi_sender(self) -> B {
        let shared = Arc::new(self);
        B::multi_sender_from(shared)
    }
}
// Tests for `send_async` on `Sender<MessageWithCallback<..>>`: one where the handler
// never answers, and one where the caller stops waiting before the answer arrives.
#[cfg(test)]
mod tests {
use crate::messaging::{AsyncSendError, MessageWithCallback, Sender};
use tokio_util::sync::CancellationToken;
#[tokio::test]
async fn test_async_send_sender_dropped() {
// The handler drops the message (and with it the callback) without responding, so
// the response future resolves to a `Dropped` error.
let sender: Sender<MessageWithCallback<u32, u32>> = Sender::from_fn(|_| {});
let result = sender.send_async(4).await;
assert_eq!(result, Err(AsyncSendError::Dropped));
}
#[tokio::test]
async fn test_async_send_receiver_dropped() {
// Test that if the receiver is dropped, the sending side will not panic.
// `result_blocker` holds the callback back until the response future has been
// dropped; `callback_done` signals that the callback has returned.
let result_blocker = CancellationToken::new();
let callback_done = CancellationToken::new();
// Chain a hook that runs the previous hook and then aborts the process, so that a
// panic turns into a hard failure of the test run.
// NOTE: the hook is process-wide and is not restored when this test finishes.
let default_panic = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
default_panic(info);
std::process::abort();
}));
let sender = {
let result_blocker = result_blocker.clone();
let callback_done = callback_done.clone();
Sender::<MessageWithCallback<u32, u32>>::from_fn(move |msg| {
let MessageWithCallback { message, callback } = msg;
// Fresh clones per message, because the closure is `Fn` and each spawned task
// needs to own its tokens.
let result_blocker = result_blocker.clone();
let callback_done = callback_done.clone();
tokio::spawn(async move {
// Respond only after the test has dropped the response future.
result_blocker.cancelled().await;
callback(Ok(message));
callback_done.cancel();
});
})
};
drop(sender.send_async(4)); // drops the resulting future
// Unblock the spawned task and wait until it has invoked the callback.
result_blocker.cancel();
callback_done.cancelled().await;
}
}