1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.
//! Port implementations for signaling and reception of messages in the Ractor environment
//!
//! Most of the ports we utilize are direct aliases of [tokio]'s channels
//! (in the `sync` feature of that crate). However, this module also provides
//! some helpful wrappers and utilities that make mailbox processing in
//! `ractor` easier.
use crate::concurrency;
use crate::MessagingErr;
// ============ Output Ports ============ //
pub mod output;
pub use output::*;
// ============ Rpc (one-use) Ports ============ //
/// The reply port of a remote procedure call (RPC).
///
/// This is a thin wrapper around a [`concurrency::OneshotSender`] which reports
/// send failures with a consistent error type and can optionally carry a timeout.
#[derive(Debug)]
pub struct RpcReplyPort<TMsg> {
    /// The one-shot sender the reply message is pushed into
    port: concurrency::OneshotSender<TMsg>,
    /// The timeout attached to this port, or [`None`] when no timeout was supplied
    timeout: Option<concurrency::Duration>,
}
impl<TMsg> RpcReplyPort<TMsg> {
    /// Read the timeout of this RPC reply port
    ///
    /// Returns `Some(timeout)` if a timeout is set, [`None`] otherwise
    pub fn get_timeout(&self) -> Option<concurrency::Duration> {
        self.timeout
    }

    /// Send a message to the RPC reply port. This consumes the port, so a
    /// given port can be used for at most one send.
    ///
    /// * `msg` - The message to send
    ///
    /// Returns `Ok(())` if the message send was successful.
    ///
    /// # Errors
    ///
    /// Returns [`MessagingErr::SendErr`] wrapping the value handed back by the
    /// underlying sender when the send fails.
    pub fn send(self, msg: TMsg) -> Result<(), MessagingErr<TMsg>> {
        // The variant constructor is itself a function, so it can be passed
        // directly instead of being wrapped in a closure.
        self.port.send(msg).map_err(MessagingErr::SendErr)
    }

    /// Determine if the port is closed (i.e. the receiver has been dropped)
    ///
    /// Returns `true` if the receiver has been dropped and the channel is
    /// closed, which means sends will fail, and `false` if the channel is
    /// still open and able to receive messages
    pub fn is_closed(&self) -> bool {
        self.port.is_closed()
    }
}
impl<TMsg> From<concurrency::OneshotSender<TMsg>> for RpcReplyPort<TMsg> {
    /// Wrap a bare one-shot sender into a reply port which has no timeout set
    fn from(port: concurrency::OneshotSender<TMsg>) -> Self {
        Self {
            port,
            timeout: None,
        }
    }
}
impl<TMsg> From<(concurrency::OneshotSender<TMsg>, concurrency::Duration)> for RpcReplyPort<TMsg> {
    /// Build a reply port from a `(sender, timeout)` pair, recording the
    /// supplied duration as the port's timeout
    fn from(pair: (concurrency::OneshotSender<TMsg>, concurrency::Duration)) -> Self {
        let (port, timeout) = pair;
        Self {
            port,
            timeout: Some(timeout),
        }
    }
}