use crate::concurrency::Duration;
use crate::concurrency::JoinHandle;
use crate::concurrency::{self};
use crate::ActorCell;
use crate::ActorRef;
use crate::DerivedActorRef;
use crate::Message;
use crate::MessagingErr;
use crate::RpcReplyPort;
pub mod call_result;
pub use call_result::CallResult;
#[cfg(test)]
mod tests;
fn internal_cast<F, TMessage>(sender: F, msg: TMessage) -> Result<(), MessagingErr<TMessage>>
where
F: Fn(TMessage) -> Result<(), MessagingErr<TMessage>>,
TMessage: Message,
{
sender(msg)
}
fn internal_call<F, TMessage, TReply, TMsgBuilder>(
sender: F,
msg_builder: TMsgBuilder,
timeout_option: Option<Duration>,
) -> impl std::future::Future<Output = Result<CallResult<TReply>, MessagingErr<TMessage>>> + Send
where
F: Fn(TMessage) -> Result<(), MessagingErr<TMessage>>,
TMessage: Message,
TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TMessage,
TReply: Send + 'static,
{
let (tx, rx) = concurrency::oneshot();
let port: RpcReplyPort<TReply> = match timeout_option {
Some(duration) => (tx, duration).into(),
None => tx.into(),
};
let sent = sender(msg_builder(port));
async move {
sent?;
Ok(if let Some(duration) = timeout_option {
match crate::concurrency::timeout(duration, rx).await {
Ok(Ok(result)) => CallResult::Success(result),
Ok(Err(_send_err)) => CallResult::SenderError,
Err(_timeout_err) => CallResult::Timeout,
}
} else {
match rx.await {
Ok(result) => CallResult::Success(result),
Err(_send_err) => CallResult::SenderError,
}
})
}
}
pub fn cast<TMessage>(actor: &ActorCell, msg: TMessage) -> Result<(), MessagingErr<TMessage>>
where
TMessage: Message,
{
internal_cast(|m| actor.send_message::<TMessage>(m), msg)
}
pub async fn call<TMessage, TReply, TMsgBuilder>(
actor: &ActorCell,
msg_builder: TMsgBuilder,
timeout_option: Option<Duration>,
) -> Result<CallResult<TReply>, MessagingErr<TMessage>>
where
TMessage: Message,
TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TMessage,
TReply: Send + 'static,
{
internal_call(|m| actor.send_message(m), msg_builder, timeout_option).await
}
pub async fn multi_call<TMessage, TReply, TMsgBuilder>(
actors: &[ActorRef<TMessage>],
msg_builder: TMsgBuilder,
timeout_option: Option<Duration>,
) -> Result<Vec<CallResult<TReply>>, MessagingErr<TMessage>>
where
TMessage: Message,
TReply: Send + 'static,
TMsgBuilder: Fn(RpcReplyPort<TReply>) -> TMessage,
{
let mut rx_ports = Vec::with_capacity(actors.len());
for actor in actors {
let (tx, rx) = concurrency::oneshot();
let port: RpcReplyPort<TReply> = match timeout_option {
Some(duration) => (tx, duration).into(),
None => tx.into(),
};
actor.cast(msg_builder(port))?;
rx_ports.push(rx);
}
let mut results = Vec::new();
let mut join_set = crate::concurrency::JoinSet::new();
for (i, rx) in rx_ports.into_iter().enumerate() {
if let Some(duration) = timeout_option {
join_set.spawn(async move {
(
i,
match crate::concurrency::timeout(duration, rx).await {
Ok(Ok(result)) => CallResult::Success(result),
Ok(Err(_send_err)) => CallResult::SenderError,
Err(_) => CallResult::Timeout,
},
)
});
} else {
join_set.spawn(async move {
(
i,
match rx.await {
Ok(result) => CallResult::Success(result),
Err(_send_err) => CallResult::SenderError,
},
)
});
}
}
results.resize_with(join_set.len(), || CallResult::Timeout);
while let Some(result) = join_set.join_next().await {
match result {
Ok((i, r)) => results[i] = r,
_ => return Err(MessagingErr::ChannelClosed),
}
}
Ok(results)
}
#[allow(clippy::type_complexity)]
pub fn call_and_forward<TMessage, TForwardMessage, TReply, TMsgBuilder, FwdMapFn>(
actor: &ActorCell,
msg_builder: TMsgBuilder,
response_forward: ActorCell,
forward_mapping: FwdMapFn,
timeout_option: Option<Duration>,
) -> Result<JoinHandle<CallResult<Result<(), MessagingErr<TForwardMessage>>>>, MessagingErr<TMessage>>
where
TMessage: Message,
TReply: Send + 'static,
TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TMessage,
TForwardMessage: Message,
FwdMapFn: FnOnce(TReply) -> TForwardMessage + Send + 'static,
{
let (tx, rx) = concurrency::oneshot();
let port: RpcReplyPort<TReply> = match timeout_option {
Some(duration) => (tx, duration).into(),
None => tx.into(),
};
actor.send_message::<TMessage>(msg_builder(port))?;
Ok(crate::concurrency::spawn(async move {
if let Some(duration) = timeout_option {
match crate::concurrency::timeout(duration, rx).await {
Ok(Ok(result)) => CallResult::Success(result),
Ok(Err(_send_err)) => CallResult::SenderError,
Err(_timeout_err) => CallResult::Timeout,
}
} else {
match rx.await {
Ok(result) => CallResult::Success(result),
Err(_send_err) => CallResult::SenderError,
}
}
.map(|msg| response_forward.send_message::<TForwardMessage>(forward_mapping(msg)))
}))
}
impl<TMessage> ActorRef<TMessage>
where
TMessage: Message,
{
pub fn cast(&self, msg: TMessage) -> Result<(), MessagingErr<TMessage>> {
cast::<TMessage>(&self.inner, msg)
}
pub async fn call<TReply, TMsgBuilder>(
&self,
msg_builder: TMsgBuilder,
timeout_option: Option<Duration>,
) -> Result<CallResult<TReply>, MessagingErr<TMessage>>
where
TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TMessage,
TReply: Send + 'static,
{
call::<TMessage, TReply, TMsgBuilder>(&self.inner, msg_builder, timeout_option).await
}
#[allow(clippy::type_complexity)]
pub fn call_and_forward<TReply, TForwardMessage, TMsgBuilder, TFwdMessageBuilder>(
&self,
msg_builder: TMsgBuilder,
response_forward: &ActorRef<TForwardMessage>,
forward_mapping: TFwdMessageBuilder,
timeout_option: Option<Duration>,
) -> Result<
crate::concurrency::JoinHandle<CallResult<Result<(), MessagingErr<TForwardMessage>>>>,
MessagingErr<TMessage>,
>
where
TReply: Send + 'static,
TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TMessage,
TForwardMessage: Message,
TFwdMessageBuilder: FnOnce(TReply) -> TForwardMessage + Send + 'static,
{
call_and_forward::<TMessage, TForwardMessage, TReply, TMsgBuilder, TFwdMessageBuilder>(
&self.inner,
msg_builder,
response_forward.inner.clone(),
forward_mapping,
timeout_option,
)
}
}
impl<TMessage> DerivedActorRef<TMessage>
where
TMessage: Message,
{
pub fn cast(&self, msg: TMessage) -> Result<(), MessagingErr<TMessage>> {
internal_cast(|m| self.send_message(m), msg)
}
pub async fn call<TReply, TMsgBuilder>(
&self,
msg_builder: TMsgBuilder,
timeout_option: Option<Duration>,
) -> Result<CallResult<TReply>, MessagingErr<TMessage>>
where
TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TMessage,
TReply: Send + 'static,
{
internal_call(|m| self.send_message(m), msg_builder, timeout_option).await
}
}