1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.
//! Remote procedure calls (RPC) are helpful communication primitives to communicate with actors
//!
//! There are generally 2 kinds of RPCs, `cast` and `call`, and their definition comes from the
//! standard [Erlang `gen_server`](https://www.erlang.org/doc/man/gen_server.html#cast-2).
//! The tl;dr is that `cast` is an send without waiting on a reply while `call` is expecting
//! a reply from the actor being communicated with.
//!
//! ## Examples
//!
//! ```rust
//! use ractor::concurrency::Duration;
//! use ractor::{call, call_t, cast};
//! use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
//!
//! struct ExampleActor;
//!
//! enum ExampleMessage {
//! Cast,
//! Call(RpcReplyPort<String>),
//! }
//!
//! #[cfg(feature = "cluster")]
//! impl ractor::Message for ExampleMessage {}
//!
//! #[cfg_attr(feature = "async-trait", ractor::async_trait)]
//! impl Actor for ExampleActor {
//! type Msg = ExampleMessage;
//! type State = ();
//! type Arguments = ();
//!
//! async fn pre_start(
//! &self,
//! _myself: ActorRef<Self::Msg>,
//! _args: Self::Arguments,
//! ) -> Result<Self::State, ActorProcessingErr> {
//! println!("Starting");
//! Ok(())
//! }
//!
//! async fn handle(
//! &self,
//! _myself: ActorRef<Self::Msg>,
//! message: Self::Msg,
//! _state: &mut Self::State,
//! ) -> Result<(), ActorProcessingErr> {
//! match message {
//! ExampleMessage::Cast => println!("Cast message"),
//! ExampleMessage::Call(reply) => {
//! println!("Call message");
//! let _ = reply.send("a reply".to_string());
//! }
//! }
//! Ok(())
//! }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//! let (actor, handle) = Actor::spawn(None, ExampleActor, ())
//! .await
//! .expect("Failed to startup dummy actor");
//!
//! // send a 1-way message (equivalent patterns)
//! actor
//! .cast(ExampleMessage::Cast)
//! .expect("Failed to send message");
//! cast!(actor, ExampleMessage::Cast).expect("Failed to send message");
//!
//! // Send a message to the actor, with an associated reply channel,
//! // and wait for the reply from the actor (optionally up to a timeout)
//! let _result = actor
//! .call(ExampleMessage::Call, Some(Duration::from_millis(100)))
//! .await
//! .expect("Failed to call actor");
//! let _result = call!(actor, ExampleMessage::Call).expect("Failed to call actor");
//! let _result =
//! call_t!(actor, ExampleMessage::Call, 100).expect("Failed to call actor with timeout");
//!
//! // wait for actor exit
//! actor.stop(None);
//! handle.await.unwrap();
//! }
//! ```
use crate::concurrency::{self, Duration, JoinHandle};
use crate::{ActorCell, ActorRef, Message, MessagingErr, RpcReplyPort};
pub mod call_result;
pub use call_result::CallResult;
#[cfg(test)]
mod tests;
/// Sends an asynchronous request to the specified actor, ignoring if the
/// actor is alive or healthy and simply returns immediately
///
/// * `actor` - A reference to the [ActorCell] to communicate with
/// * `msg` - The message to send to the actor
///
/// Returns [Ok(())] upon successful send, [Err(MessagingErr)] otherwise
pub fn cast<TMessage>(actor: &ActorCell, msg: TMessage) -> Result<(), MessagingErr<TMessage>>
where
TMessage: Message,
{
actor.send_message::<TMessage>(msg)
}
/// Sends an asynchronous request to the specified actor, building a one-time
/// use reply channel and awaiting the result with the specified timeout
///
/// * `actor` - A reference to the [ActorCell] to communicate with
/// * `msg_builder` - The [FnOnce] to construct the message
/// * `timeout_option` - An optional [Duration] which represents the amount of
/// time until the operation times out
///
/// Returns [Ok(CallResult)] upon successful initial sending with the reply from
/// the [crate::Actor], [Err(MessagingErr)] if the initial send operation failed
pub async fn call<TMessage, TReply, TMsgBuilder>(
actor: &ActorCell,
msg_builder: TMsgBuilder,
timeout_option: Option<Duration>,
) -> Result<CallResult<TReply>, MessagingErr<TMessage>>
where
TMessage: Message,
TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TMessage,
{
let (tx, rx) = concurrency::oneshot();
let port: RpcReplyPort<TReply> = match timeout_option {
Some(duration) => (tx, duration).into(),
None => tx.into(),
};
actor.send_message::<TMessage>(msg_builder(port))?;
// wait for the reply
Ok(if let Some(duration) = timeout_option {
match crate::concurrency::timeout(duration, rx).await {
Ok(Ok(result)) => CallResult::Success(result),
Ok(Err(_send_err)) => CallResult::SenderError,
Err(_timeout_err) => CallResult::Timeout,
}
} else {
match rx.await {
Ok(result) => CallResult::Success(result),
Err(_send_err) => CallResult::SenderError,
}
})
}
/// Sends an asynchronous request to the specified actors, building a one-time
/// use reply channel for each actor and awaiting the results with the
/// specified timeout
///
/// * `actors` - A reference to the group of [ActorCell]s to communicate with
/// * `msg_builder` - The [FnOnce] to construct the message
/// * `timeout_option` - An optional [Duration] which represents the amount of
/// time until the operation times out
///
/// Returns [Ok(`Vec<CallResult<TReply>>>`)] upon successful initial sending with the reply from
/// the [crate::Actor]s, [Err(MessagingErr)] if the initial send operation failed
pub async fn multi_call<TMessage, TReply, TMsgBuilder>(
actors: &[ActorRef<TMessage>],
msg_builder: TMsgBuilder,
timeout_option: Option<Duration>,
) -> Result<Vec<CallResult<TReply>>, MessagingErr<TMessage>>
where
TMessage: Message,
TReply: Send + 'static,
TMsgBuilder: Fn(RpcReplyPort<TReply>) -> TMessage,
{
let mut rx_ports = Vec::with_capacity(actors.len());
// send to all actors
for actor in actors {
let (tx, rx) = concurrency::oneshot();
let port: RpcReplyPort<TReply> = match timeout_option {
Some(duration) => (tx, duration).into(),
None => tx.into(),
};
actor.cast(msg_builder(port))?;
rx_ports.push(rx);
}
let mut results = Vec::new();
let mut join_set = crate::concurrency::JoinSet::new();
for (i, rx) in rx_ports.into_iter().enumerate() {
if let Some(duration) = timeout_option {
join_set.spawn(async move {
(
i,
match crate::concurrency::timeout(duration, rx).await {
Ok(Ok(result)) => CallResult::Success(result),
Ok(Err(_send_err)) => CallResult::SenderError,
Err(_) => CallResult::Timeout,
},
)
});
} else {
join_set.spawn(async move {
(
i,
match rx.await {
Ok(result) => CallResult::Success(result),
Err(_send_err) => CallResult::SenderError,
},
)
});
}
}
// we threaded the index in order to maintain ordering from the originally called
// actors.
results.resize_with(join_set.len(), || CallResult::Timeout);
while let Some(result) = join_set.join_next().await {
match result {
Ok((i, r)) => results[i] = r,
_ => return Err(MessagingErr::ChannelClosed),
}
}
// wait for the replies
Ok(results)
}
/// Send a message asynchronously to another actor, waiting in a new task for the reply
/// and then forwarding the reply to a followup-actor. If this [CallResult] from the first
/// actor is not success, the forward is not sent.
///
/// * `actor` - A reference to the [ActorCell] to communicate with
/// * `msg_builder` - The [FnOnce] to construct the message
/// * `response_forward` - The [ActorCell] to forward the message to
/// * `forward_mapping` - The [FnOnce] which maps the response from the `actor` [ActorCell]'s reply message
/// type to the `response_forward` [ActorCell]'s message type
/// * `timeout_option` - An optional [Duration] which represents the amount of
/// time until the operation times out
///
/// Returns: A [JoinHandle<CallResult<()>>] which can be awaited to see if the
/// forward was successful or ignored
#[allow(clippy::type_complexity)]
pub fn call_and_forward<TMessage, TForwardMessage, TReply, TMsgBuilder, FwdMapFn>(
actor: &ActorCell,
msg_builder: TMsgBuilder,
response_forward: ActorCell,
forward_mapping: FwdMapFn,
timeout_option: Option<Duration>,
) -> Result<JoinHandle<CallResult<Result<(), MessagingErr<TForwardMessage>>>>, MessagingErr<TMessage>>
where
TMessage: Message,
TReply: Send + 'static,
TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TMessage,
TForwardMessage: Message,
FwdMapFn: FnOnce(TReply) -> TForwardMessage + Send + 'static,
{
let (tx, rx) = concurrency::oneshot();
let port: RpcReplyPort<TReply> = match timeout_option {
Some(duration) => (tx, duration).into(),
None => tx.into(),
};
actor.send_message::<TMessage>(msg_builder(port))?;
// wait for the reply
Ok(crate::concurrency::spawn(async move {
if let Some(duration) = timeout_option {
match crate::concurrency::timeout(duration, rx).await {
Ok(Ok(result)) => CallResult::Success(result),
Ok(Err(_send_err)) => CallResult::SenderError,
Err(_timeout_err) => CallResult::Timeout,
}
} else {
match rx.await {
Ok(result) => CallResult::Success(result),
Err(_send_err) => CallResult::SenderError,
}
}
.map(|msg| response_forward.send_message::<TForwardMessage>(forward_mapping(msg)))
}))
}
impl<TMessage> ActorRef<TMessage>
where
TMessage: Message,
{
/// Alias of [cast]
pub fn cast(&self, msg: TMessage) -> Result<(), MessagingErr<TMessage>> {
cast::<TMessage>(&self.inner, msg)
}
/// Alias of [call]
pub async fn call<TReply, TMsgBuilder>(
&self,
msg_builder: TMsgBuilder,
timeout_option: Option<Duration>,
) -> Result<CallResult<TReply>, MessagingErr<TMessage>>
where
TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TMessage,
{
call::<TMessage, TReply, TMsgBuilder>(&self.inner, msg_builder, timeout_option).await
}
/// Alias of [call_and_forward]
#[allow(clippy::type_complexity)]
pub fn call_and_forward<TReply, TForwardMessage, TMsgBuilder, TFwdMessageBuilder>(
&self,
msg_builder: TMsgBuilder,
response_forward: &ActorRef<TForwardMessage>,
forward_mapping: TFwdMessageBuilder,
timeout_option: Option<Duration>,
) -> Result<
crate::concurrency::JoinHandle<CallResult<Result<(), MessagingErr<TForwardMessage>>>>,
MessagingErr<TMessage>,
>
where
TReply: Send + 'static,
TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TMessage,
TForwardMessage: Message,
TFwdMessageBuilder: FnOnce(TReply) -> TForwardMessage + Send + 'static,
{
call_and_forward::<TMessage, TForwardMessage, TReply, TMsgBuilder, TFwdMessageBuilder>(
&self.inner,
msg_builder,
response_forward.inner.clone(),
forward_mapping,
timeout_option,
)
}
}