/// A builder for a request that sends a message to an actor and obtains its reply.
///
/// The request borrows the target [`ActorRef`] for `'a` and owns the message until one of
/// the sending methods consumes it. Nothing is sent until the request is awaited or one
/// of those methods is called (see the `must_use` note below).
///
/// `Tm` and `Tr` are the types of the stored mailbox timeout and reply timeout. The
/// sending methods convert them with `Into<Option<Duration>>`; the timeout setters
/// replace them with [`WithRequestTimeout`].
#[allow(missing_debug_implementations)]
#[must_use = "request won't be sent without awaiting, or calling a send method"]
pub struct AskRequest<'a, A, M, Tm, Tr>
where
A: Actor + Message<M>,
M: Send + 'static,
{
/// The actor the message is addressed to.
actor_ref: &'a ActorRef<A>,
/// The message to deliver; boxed into a `Signal::Message` when the request is sent.
msg: M,
/// Timeout applied when handing the signal to the actor's mailbox sender.
mailbox_timeout: Tm,
/// Timeout applied when waiting on the reply channel.
reply_timeout: Tr,
/// Source location passed to `warn_deadlock` by `send`.
/// Only present in debug builds with the `tracing` feature enabled.
#[cfg(all(debug_assertions, feature = "tracing"))]
called_at: &'static std::panic::Location<'static>,
}
impl<'a, A, M, Tm, Tr> AskRequest<'a, A, M, Tm, Tr>
where
A: Actor + Message<M>,
M: Send + 'static,
{
pub(crate) fn new(
actor_ref: &'a ActorRef<A>,
msg: M,
#[cfg(all(debug_assertions, feature = "tracing"))] called_at: &'static std::panic::Location<
'static,
>,
) -> Self
where
Tm: Default,
Tr: Default,
{
AskRequest {
actor_ref,
msg,
mailbox_timeout: Tm::default(),
reply_timeout: Tr::default(),
#[cfg(all(debug_assertions, feature = "tracing"))]
called_at,
}
}
pub fn mailbox_timeout(
self,
duration: Duration,
) -> AskRequest<'a, A, M, WithRequestTimeout, Tr> {
self.mailbox_timeout_opt(Some(duration))
}
pub(crate) fn mailbox_timeout_opt(
self,
duration: Option<Duration>,
) -> AskRequest<'a, A, M, WithRequestTimeout, Tr> {
AskRequest {
actor_ref: self.actor_ref,
msg: self.msg,
mailbox_timeout: WithRequestTimeout(duration),
reply_timeout: self.reply_timeout,
#[cfg(all(debug_assertions, feature = "tracing"))]
called_at: self.called_at,
}
}
pub fn reply_timeout(self, duration: Duration) -> AskRequest<'a, A, M, Tm, WithRequestTimeout> {
self.reply_timeout_opt(Some(duration))
}
pub(crate) fn reply_timeout_opt(
self,
duration: Option<Duration>,
) -> AskRequest<'a, A, M, Tm, WithRequestTimeout> {
AskRequest {
actor_ref: self.actor_ref,
msg: self.msg,
mailbox_timeout: self.mailbox_timeout,
reply_timeout: WithRequestTimeout(duration),
#[cfg(all(debug_assertions, feature = "tracing"))]
called_at: self.called_at,
}
}
pub async fn send(
self,
) -> Result<<A::Reply as Reply>::Ok, SendError<M, <A::Reply as Reply>::Error>>
where
Tm: Into<Option<Duration>>,
Tr: Into<Option<Duration>>,
{
#[cfg(all(debug_assertions, feature = "tracing"))]
warn_deadlock(
self.actor_ref,
"An actor is sending an `ask` request to itself, which will likely lead to a deadlock. To avoid this, use a `tell` request instead.",
self.called_at,
);
let (reply, rx) = oneshot::channel();
let signal = Signal::Message {
message: Box::new(self.msg),
actor_ref: self.actor_ref.clone(),
reply: Some(reply),
sent_within_actor: self.actor_ref.is_current(),
};
let tx = self.actor_ref.mailbox_sender();
match self.mailbox_timeout.into() {
Some(timeout) => {
tx.send_timeout(signal, timeout).await?;
}
None => {
tx.send(signal).await?;
}
}
let reply = match self.reply_timeout.into() {
Some(timeout) => tokio::time::timeout(timeout, rx).await??,
None => rx.await?,
};
match reply {
Ok(val) => Ok(<A::Reply as Reply>::downcast_ok(val)),
Err(err) => Err(<A::Reply as Reply>::downcast_err(err)),
}
}
pub async fn enqueue(self) -> Result<PendingReply<M, A::Reply>, SendError>
where
Tm: Into<Option<Duration>> + Send + 'static,
Tr: Into<Option<Duration>> + Send + 'static,
{
let (reply, rx) = oneshot::channel();
let signal = Signal::Message {
message: Box::new(self.msg),
actor_ref: self.actor_ref.clone(),
reply: Some(reply),
sent_within_actor: self.actor_ref.is_current(),
};
let tx = self.actor_ref.mailbox_sender();
match self.mailbox_timeout.into() {
Some(timeout) => {
tx.send_timeout(signal, timeout).await?;
}
None => {
tx.send(signal).await?;
}
}
let fut = async move {
let reply = match self.reply_timeout.into() {
Some(timeout) => tokio::time::timeout(timeout, rx).await??,
None => rx.await?,
};
match reply {
Ok(val) => Ok(<A::Reply as Reply>::downcast_ok(val)),
Err(err) => Err(<A::Reply as Reply>::downcast_err(err)),
}
}
.boxed();
Ok(PendingReply { fut })
}
}
impl<A, M, Tm> AskRequest<'_, A, M, Tm, WithoutRequestTimeout>
where
    A: Actor + Message<M>,
    M: Send + 'static,
{
    /// Sends the message so that the actor's reply goes to `sender` instead of back
    /// to the caller.
    ///
    /// The signal is handed to the actor's mailbox sender via `send_timeout` when a
    /// mailbox timeout is configured and via `send` otherwise. This method does not
    /// wait for the reply.
    ///
    /// # Errors
    ///
    /// Returns a [`SendError`] if handing the signal to the mailbox sender fails.
    pub async fn forward(
        self,
        sender: ReplySender<<A::Reply as Reply>::Value>,
    ) -> Result<
        (),
        SendError<(M, ReplySender<<A::Reply as Reply>::Value>), <A::Reply as Reply>::Error>,
    >
    where
        Tm: Into<Option<Duration>>,
    {
        let target = self.actor_ref;
        let envelope = Signal::Message {
            message: Box::new(self.msg),
            actor_ref: target.clone(),
            reply: Some(sender.boxed()),
            sent_within_actor: target.is_current(),
        };

        let mailbox = target.mailbox_sender();
        let mailbox_limit: Option<Duration> = self.mailbox_timeout.into();
        if let Some(limit) = mailbox_limit {
            mailbox.send_timeout(envelope, limit).await?;
        } else {
            mailbox.send(envelope).await?;
        }

        Ok(())
    }
}
impl<A, M> AskRequest<'_, A, M, WithoutRequestTimeout, WithoutRequestTimeout>
where
    A: Actor + Message<M>,
    M: Send + 'static,
{
    /// Hands the message to the mailbox sender's synchronous `try_send`, directing
    /// the actor's reply to `sender`.
    ///
    /// Nothing is awaited and the reply is not waited for.
    ///
    /// # Errors
    ///
    /// Returns a [`SendError`] if `try_send` on the mailbox sender fails.
    #[allow(clippy::type_complexity)]
    pub fn try_forward(
        self,
        sender: ReplySender<<A::Reply as Reply>::Value>,
    ) -> Result<
        (),
        SendError<(M, ReplySender<<A::Reply as Reply>::Value>), <A::Reply as Reply>::Error>,
    > {
        let target = self.actor_ref;
        let envelope = Signal::Message {
            message: Box::new(self.msg),
            actor_ref: target.clone(),
            reply: Some(sender.boxed()),
            sent_within_actor: target.is_current(),
        };

        let mailbox = target.mailbox_sender();
        mailbox.try_send(envelope)?;

        Ok(())
    }
}
impl<'a, A, M, Tr> AskRequest<'a, A, M, WithoutRequestTimeout, Tr>
where
    A: Actor + Message<M>,
    M: Send + 'static,
{
    /// Hands the message to the mailbox sender's synchronous `try_send`, then waits
    /// for the actor's reply.
    ///
    /// The reply is awaited on a oneshot channel, wrapped in `tokio::time::timeout`
    /// when a reply timeout is configured.
    ///
    /// # Errors
    ///
    /// Failures of `try_send`, the reply timeout, or the reply channel are converted
    /// into [`SendError`] by `?`. An `Err` reply from the actor is returned after
    /// passing through `Reply::downcast_err`.
    pub async fn try_send(
        self,
    ) -> Result<<A::Reply as Reply>::Ok, SendError<M, <A::Reply as Reply>::Error>>
    where
        Tr: Into<Option<Duration>>,
    {
        let target = self.actor_ref;
        let (reply_tx, reply_rx) = oneshot::channel();
        let envelope = Signal::Message {
            message: Box::new(self.msg),
            actor_ref: target.clone(),
            reply: Some(reply_tx),
            sent_within_actor: target.is_current(),
        };

        let mailbox = target.mailbox_sender();
        mailbox.try_send(envelope)?;

        let reply_limit: Option<Duration> = self.reply_timeout.into();
        let outcome = if let Some(limit) = reply_limit {
            tokio::time::timeout(limit, reply_rx).await??
        } else {
            reply_rx.await?
        };

        match outcome {
            Ok(value) => Ok(<A::Reply as Reply>::downcast_ok(value)),
            Err(failure) => Err(<A::Reply as Reply>::downcast_err(failure)),
        }
    }

    /// Hands the message to the mailbox sender's synchronous `try_send` and returns a
    /// [`PendingReply`] that can be used to obtain the reply later.
    ///
    /// Waiting on the reply channel, including the optional reply timeout, happens
    /// inside the boxed future stored in the returned `PendingReply`.
    ///
    /// # Errors
    ///
    /// Returns a [`SendError`] if `try_send` on the mailbox sender fails.
    pub fn try_enqueue(self) -> Result<PendingReply<M, A::Reply>, SendError>
    where
        Tr: Into<Option<Duration>> + Send + 'static,
    {
        let target = self.actor_ref;
        let (reply_tx, reply_rx) = oneshot::channel();
        let envelope = Signal::Message {
            message: Box::new(self.msg),
            actor_ref: target.clone(),
            reply: Some(reply_tx),
            sent_within_actor: target.is_current(),
        };

        let mailbox = target.mailbox_sender();
        mailbox.try_send(envelope)?;

        // The reply timeout is converted lazily, when the pending future is first polled.
        let pending = async move {
            let reply_limit: Option<Duration> = self.reply_timeout.into();
            let outcome = if let Some(limit) = reply_limit {
                tokio::time::timeout(limit, reply_rx).await??
            } else {
                reply_rx.await?
            };

            match outcome {
                Ok(value) => Ok(<A::Reply as Reply>::downcast_ok(value)),
                Err(failure) => Err(<A::Reply as Reply>::downcast_err(failure)),
            }
        }
        .boxed();

        Ok(PendingReply { fut: pending })
    }
}
impl<'a, A, M> AskRequest<'a, A, M, WithoutRequestTimeout, WithoutRequestTimeout>
where
    A: Actor + Message<M>,
    M: Send + 'static,
{
    /// Sends the message and waits for the reply without using `async`.
    ///
    /// The signal is handed to the mailbox sender's `blocking_send`, and the reply is
    /// obtained with `blocking_recv` on the oneshot receiver.
    ///
    /// # Errors
    ///
    /// Failures of `blocking_send` or `blocking_recv` are converted into [`SendError`]
    /// by `?`. An `Err` reply from the actor is returned after passing through
    /// `Reply::downcast_err`.
    #[allow(clippy::type_complexity)]
    pub fn blocking_send(
        self,
    ) -> Result<<A::Reply as Reply>::Ok, SendError<M, <A::Reply as Reply>::Error>> {
        let target = self.actor_ref;
        let (reply_tx, reply_rx) = oneshot::channel();
        let envelope = Signal::Message {
            message: Box::new(self.msg),
            actor_ref: target.clone(),
            reply: Some(reply_tx),
            sent_within_actor: target.is_current(),
        };

        let mailbox = target.mailbox_sender();
        mailbox.blocking_send(envelope)?;

        let outcome = reply_rx.blocking_recv()?;
        match outcome {
            Ok(value) => Ok(<A::Reply as Reply>::downcast_ok(value)),
            Err(failure) => Err(<A::Reply as Reply>::downcast_err(failure)),
        }
    }

    /// Hands the message to the mailbox sender's `blocking_send`, directing the
    /// actor's reply to `sender`. The reply is not waited for.
    ///
    /// # Errors
    ///
    /// Returns a [`SendError`] if `blocking_send` on the mailbox sender fails.
    #[allow(clippy::type_complexity)]
    pub fn blocking_forward(
        self,
        sender: ReplySender<<A::Reply as Reply>::Value>,
    ) -> Result<
        (),
        SendError<(M, ReplySender<<A::Reply as Reply>::Value>), <A::Reply as Reply>::Error>,
    > {
        let target = self.actor_ref;
        let envelope = Signal::Message {
            message: Box::new(self.msg),
            actor_ref: target.clone(),
            reply: Some(sender.boxed()),
            sent_within_actor: target.is_current(),
        };

        let mailbox = target.mailbox_sender();
        mailbox.blocking_send(envelope)?;

        Ok(())
    }

    /// Hands the message to the mailbox sender's `blocking_send` and returns a
    /// [`BlockingPendingReply`] that can be used to obtain the reply later.
    ///
    /// The returned value stores a boxed closure which, when invoked, calls
    /// `blocking_recv` on the reply channel and downcasts the result.
    ///
    /// # Errors
    ///
    /// Returns a [`SendError`] if `blocking_send` on the mailbox sender fails.
    pub fn blocking_enqueue(self) -> Result<BlockingPendingReply<'a, M, A::Reply>, SendError> {
        let target = self.actor_ref;
        let (reply_tx, reply_rx) = oneshot::channel();
        let envelope = Signal::Message {
            message: Box::new(self.msg),
            actor_ref: target.clone(),
            reply: Some(reply_tx),
            sent_within_actor: target.is_current(),
        };

        let mailbox = target.mailbox_sender();
        mailbox.blocking_send(envelope)?;

        // Only the receiver is moved into the closure; the wait happens when it is called.
        let wait_for_reply = Box::new(move || {
            let outcome = reply_rx.blocking_recv()?;
            match outcome {
                Ok(value) => Ok(<A::Reply as Reply>::downcast_ok(value)),
                Err(failure) => Err(<A::Reply as Reply>::downcast_err(failure)),
            }
        });

        Ok(BlockingPendingReply { f: wait_for_reply })
    }
}
/// Allows an `AskRequest` to be `.await`ed directly; doing so is equivalent to calling
/// `send()` and awaiting the boxed result.
impl<'a, A, M, Tm, Tr> IntoFuture for AskRequest<'a, A, M, Tm, Tr>
where
    A: Actor + Message<M>,
    M: Send + 'static,
    Tm: Into<Option<Duration>> + Send + 'static,
    Tr: Into<Option<Duration>> + Send + 'static,
{
    type Output = Result<<A::Reply as Reply>::Ok, error::SendError<M, <A::Reply as Reply>::Error>>;
    type IntoFuture = BoxFuture<'a, Self::Output>;

    /// Converts the request into the boxed future produced by `send`.
    fn into_future(self) -> Self::IntoFuture {
        let request = self.send();
        request.boxed()
    }
}