use std::{marker::PhantomData, sync::Arc};
use futures::{future::poll_fn, pin_mut};
use once_cell::sync::Lazy;
use tracing::{error, info, trace};
use crate::{self as elfo};
use elfo_macros::msg_raw as msg;
use crate::{
actor::{Actor, ActorStatus},
addr::Addr,
address_book::AddressBook,
config::AnyConfig,
demux::Demux,
dumping::{self, Direction, Dump, Dumper, INTERNAL_CLASS},
envelope::{Envelope, MessageKind},
errors::{RequestError, SendError, TryRecvError, TrySendError},
mailbox::RecvResult,
message::{Message, Request},
messages,
request_table::ResponseToken,
routers::Singleton,
scope,
source::{Combined, Source},
};
use self::{budget::Budget, stats::Stats};
mod budget;
mod stats;
static DUMPER: Lazy<Dumper> = Lazy::new(|| Dumper::new(INTERNAL_CLASS));
pub struct Context<C = (), K = Singleton, S = ()> {
book: AddressBook,
addr: Addr,
group: Addr,
demux: Demux,
config: Arc<C>,
key: K,
source: S,
stage: Stage,
stats: Stats,
budget: Budget,
}
#[derive(Clone, Copy, PartialEq)]
enum Stage {
PreRecv,
Working,
Closed,
}
assert_impl_all!(Context: Send);
impl<C, K, S> Context<C, K, S> {
#[inline]
pub fn addr(&self) -> Addr {
self.addr
}
#[inline]
pub fn group(&self) -> Addr {
self.group
}
#[deprecated]
#[doc(hidden)]
#[cfg(feature = "test-util")]
pub fn set_addr(&mut self, addr: Addr) {
self.addr = addr;
}
#[inline]
pub fn config(&self) -> &C {
&self.config
}
#[inline]
pub fn key(&self) -> &K {
&self.key
}
pub fn with<S1>(self, source: S1) -> Context<C, K, Combined<S, S1>> {
Context {
book: self.book,
addr: self.addr,
group: self.group,
demux: self.demux,
config: self.config,
key: self.key,
source: Combined::new(self.source, source),
stage: Stage::PreRecv,
stats: self.stats,
budget: self.budget,
}
}
pub fn set_status(&self, status: ActorStatus) {
let object = ward!(self.book.get_owned(self.addr));
let actor = ward!(object.as_actor());
actor.set_status(status);
}
pub fn close(&self) -> bool {
let object = ward!(self.book.get_owned(self.addr), return false);
ward!(object.as_actor(), return false).close()
}
pub async fn send<M: Message>(&self, message: M) -> Result<(), SendError<M>> {
let kind = MessageKind::Regular { sender: self.addr };
self.do_send(message, kind).await
}
pub fn try_send<M: Message>(&self, message: M) -> Result<(), TrySendError<M>> {
let kind = MessageKind::Regular { sender: self.addr };
self.stats.on_sent_message::<M>();
trace!("> {:?}", message);
if let Some(permit) = DUMPER.acquire_m::<M>() {
permit.record(Dump::message(message.clone(), &kind, Direction::Out));
}
let envelope = Envelope::new(message, kind).upcast();
let addrs = self.demux.filter(&envelope);
if addrs.is_empty() {
return Err(TrySendError::Closed(e2m(envelope)));
}
if addrs.len() == 1 {
return match self.book.get(addrs[0]) {
Some(object) => object.try_send(envelope).map_err(|err| err.map(e2m)),
None => Err(TrySendError::Closed(e2m(envelope))),
};
}
let mut unused = None;
let mut has_full = false;
let mut success = false;
for addr in addrs {
let envelope = unused.take().or_else(|| envelope.duplicate(&self.book));
let envelope = ward!(envelope, break);
match self.book.get(addr) {
Some(object) => match object.try_send(envelope) {
Ok(()) => success = true,
Err(err) => {
has_full |= err.is_full();
unused = Some(err.into_inner());
}
},
None => unused = Some(envelope),
};
}
if success {
Ok(())
} else if has_full {
Err(TrySendError::Full(e2m(envelope)))
} else {
Err(TrySendError::Closed(e2m(envelope)))
}
}
#[inline]
pub fn request<R: Request>(&self, request: R) -> RequestBuilder<'_, C, K, S, R, Any> {
RequestBuilder::new(self, request)
}
#[inline]
pub fn request_to<R: Request>(
&self,
recipient: Addr,
request: R,
) -> RequestBuilder<'_, C, K, S, R, Any> {
RequestBuilder::new(self, request).to(recipient)
}
async fn do_send<M: Message>(&self, message: M, kind: MessageKind) -> Result<(), SendError<M>> {
self.stats.on_sent_message::<M>();
trace!("> {:?}", message);
if let Some(permit) = DUMPER.acquire_m::<M>() {
permit.record(Dump::message(message.clone(), &kind, Direction::Out));
}
let envelope = Envelope::new(message, kind).upcast();
let addrs = self.demux.filter(&envelope);
if addrs.is_empty() {
return Err(SendError(e2m(envelope)));
}
if addrs.len() == 1 {
return match self.book.get_owned(addrs[0]) {
Some(object) => object
.send(self, envelope)
.await
.map_err(|err| SendError(e2m(err.0))),
None => Err(SendError(e2m(envelope))),
};
}
let mut unused = None;
let mut success = false;
for addr in addrs {
let envelope = unused.take().or_else(|| envelope.duplicate(&self.book));
let envelope = ward!(envelope, break);
match self.book.get_owned(addr) {
Some(object) => {
unused = object.send(self, envelope).await.err().map(|err| err.0);
if unused.is_none() {
success = true;
}
}
None => unused = Some(envelope),
};
}
if success {
Ok(())
} else {
Err(SendError(e2m(envelope)))
}
}
pub async fn send_to<M: Message>(
&self,
recipient: Addr,
message: M,
) -> Result<(), SendError<M>> {
let kind = MessageKind::Regular { sender: self.addr };
self.do_send_to(recipient, message, kind).await
}
async fn do_send_to<M: Message>(
&self,
recipient: Addr,
message: M,
kind: MessageKind,
) -> Result<(), SendError<M>> {
self.stats.on_sent_message::<M>();
trace!(to = %recipient, "> {:?}", message);
if let Some(permit) = DUMPER.acquire_m::<M>() {
permit.record(Dump::message(message.clone(), &kind, Direction::Out));
}
let entry = self.book.get_owned(recipient);
let object = ward!(entry, return Err(SendError(message)));
let envelope = Envelope::new(message, kind);
let fut = object.send(self, envelope.upcast());
let result = fut.await;
result.map_err(|err| SendError(e2m(err.0)))
}
pub fn try_send_to<M: Message>(
&self,
recipient: Addr,
message: M,
) -> Result<(), TrySendError<M>> {
self.stats.on_sent_message::<M>();
let kind = MessageKind::Regular { sender: self.addr };
trace!(to = %recipient, "> {:?}", message);
if let Some(permit) = DUMPER.acquire_m::<M>() {
permit.record(Dump::message(message.clone(), &kind, Direction::Out));
}
let entry = self.book.get(recipient);
let object = ward!(entry, return Err(TrySendError::Closed(message)));
let envelope = Envelope::new(message, kind);
object
.try_send(envelope.upcast())
.map_err(|err| err.map(e2m))
}
pub fn respond<R: Request>(&self, token: ResponseToken<R>, message: R::Response) {
if token.is_forgotten() {
return;
}
self.stats.on_sent_message::<R::Wrapper>();
let sender = token.sender;
let message = R::Wrapper::from(message);
let kind = MessageKind::Response {
sender: self.addr(),
request_id: token.request_id,
};
trace!(to = %sender, "> {:?}", message);
if let Some(permit) = DUMPER.acquire_m::<R>() {
permit.record(Dump::message(message.clone(), &kind, Direction::Out));
}
let envelope = Envelope::new(message, kind).upcast();
let object = ward!(self.book.get(token.sender));
let actor = ward!(object.as_actor());
actor
.request_table()
.respond(token.into_untyped(), envelope);
}
pub async fn recv(&mut self) -> Option<Envelope>
where
C: 'static,
S: Source,
{
loop {
self.stats.on_recv();
if self.stage == Stage::Closed {
on_recv_after_close();
}
self.budget.acquire().await;
let object = self.book.get_owned(self.addr)?;
let actor = object.as_actor()?;
if self.stage == Stage::PreRecv {
on_first_recv(&mut self.stage, actor);
}
let mailbox_fut = actor.recv();
pin_mut!(mailbox_fut);
let source_fut = poll_fn(|cx| self.source.poll_recv(cx));
pin_mut!(source_fut);
tokio::select! {
result = mailbox_fut => match result {
RecvResult::Data(envelope) => {
if let Some(envelope) = self.post_recv(envelope) {
return Some(envelope);
}
},
RecvResult::Closed(trace_id) => {
scope::set_trace_id(trace_id);
on_input_closed(&mut self.stage, actor);
return None;
}
},
option = source_fut => {
let envelope = option.expect("source cannot return None");
if let Some(envelope) = self.post_recv(envelope) {
return Some(envelope);
}
},
}
}
}
pub fn try_recv(&mut self) -> Result<Envelope, TryRecvError>
where
C: 'static,
{
loop {
self.stats.on_recv();
if self.stage == Stage::Closed {
on_recv_after_close();
}
let object = self.book.get(self.addr).ok_or(TryRecvError::Closed)?;
let actor = object.as_actor().ok_or(TryRecvError::Closed)?;
if self.stage == Stage::PreRecv {
on_first_recv(&mut self.stage, actor);
}
match actor.try_recv() {
Some(RecvResult::Data(envelope)) => {
drop(object);
if let Some(envelope) = self.post_recv(envelope) {
return Ok(envelope);
}
}
Some(RecvResult::Closed(trace_id)) => {
scope::set_trace_id(trace_id);
on_input_closed(&mut self.stage, actor);
return Err(TryRecvError::Closed);
}
None => {
self.stats.on_empty_mailbox();
return Err(TryRecvError::Empty);
}
}
}
}
fn post_recv(&mut self, envelope: Envelope) -> Option<Envelope>
where
C: 'static,
{
self.budget.decrement();
scope::set_trace_id(envelope.trace_id());
let envelope = msg!(match envelope {
(messages::UpdateConfig { config }, token) => {
self.config = config.get_user::<C>().clone();
info!("config updated");
let message = messages::ConfigUpdated {};
let kind = MessageKind::Regular { sender: self.addr };
let envelope = Envelope::new(message, kind).upcast();
self.respond(token, Ok(()));
envelope
}
envelope => envelope,
});
let message = envelope.message();
trace!("< {:?}", message);
if message.dumping_allowed() {
if let Some(permit) = DUMPER.acquire() {
let dump = Dump::builder()
.direction(Direction::In)
.message_name(message.name())
.message_protocol(message.protocol())
.message_kind(dumping::MessageKind::from_message_kind(
envelope.message_kind(),
))
.do_finish(message.erase());
permit.record(dump);
}
}
if envelope.is::<messages::Terminate>() {
self.set_status(ActorStatus::TERMINATING);
}
self.stats.on_received_envelope(&envelope);
msg!(match envelope {
(messages::Ping, token) => {
self.respond(token, ());
None
}
envelope => Some(envelope),
})
}
#[doc(hidden)]
pub async fn finished(&self, addr: Addr) {
ward!(self.book.get_owned(addr)).finished().await;
}
pub fn unpack_config<'c>(&self, config: &'c AnyConfig) -> &'c C
where
C: for<'de> serde::Deserialize<'de> + 'static,
{
config.get_user()
}
pub fn pruned(&self) -> Context {
Context {
book: self.book.clone(),
addr: self.addr,
group: self.group,
demux: self.demux.clone(),
config: Arc::new(()),
key: Singleton,
source: (),
stage: self.stage,
stats: Stats::empty(),
budget: self.budget.clone(),
}
}
pub(crate) fn book(&self) -> &AddressBook {
&self.book
}
pub(crate) fn with_config<C1>(self, config: Arc<C1>) -> Context<C1, K, S> {
Context {
book: self.book,
addr: self.addr,
group: self.group,
demux: self.demux,
config,
key: self.key,
source: self.source,
stage: self.stage,
stats: self.stats,
budget: self.budget,
}
}
pub(crate) fn with_addr(mut self, addr: Addr) -> Self {
self.addr = addr;
self.stats = Stats::startup();
self
}
pub(crate) fn with_group(mut self, group: Addr) -> Self {
self.group = group;
self
}
pub(crate) fn with_key<K1>(self, key: K1) -> Context<C, K1, S> {
Context {
book: self.book,
addr: self.addr,
group: self.group,
demux: self.demux,
config: self.config,
key,
source: self.source,
stage: self.stage,
stats: self.stats,
budget: self.budget,
}
}
}
fn e2m<M: Message>(envelope: Envelope) -> M {
envelope.do_downcast::<M>().into_message()
}
#[cold]
fn on_first_recv(stage: &mut Stage, actor: &Actor) {
if actor.is_initializing() {
actor.set_status(ActorStatus::NORMAL);
}
*stage = Stage::Working;
}
#[cold]
fn on_input_closed(stage: &mut Stage, actor: &Actor) {
if !actor.is_terminating() {
actor.set_status(ActorStatus::TERMINATING);
}
*stage = Stage::Closed;
trace!("input closed");
}
#[cold]
fn on_recv_after_close() {
error!("calling `recv()` or `try_recv()` after `None` is returned, an infinite loop?");
panic!("suicide");
}
impl Context {
pub(crate) fn new(book: AddressBook, demux: Demux) -> Self {
Self {
book,
addr: Addr::NULL,
group: Addr::NULL,
demux,
config: Arc::new(()),
key: Singleton,
source: (),
stage: Stage::PreRecv,
stats: Stats::empty(),
budget: Budget::default(),
}
}
}
impl<C, K: Clone> Clone for Context<C, K> {
fn clone(&self) -> Self {
Self {
book: self.book.clone(),
addr: self.addr,
group: self.group,
demux: self.demux.clone(),
config: self.config.clone(),
key: self.key.clone(),
source: (),
stage: self.stage,
stats: Stats::empty(),
budget: self.budget.clone(),
}
}
}
#[must_use]
pub struct RequestBuilder<'c, C, K, S, R, M> {
context: &'c Context<C, K, S>,
request: R,
to: Option<Addr>,
marker: PhantomData<M>,
}
pub struct Any;
pub struct All;
pub(crate) struct Forgotten;
impl<'c, C, K, S, R> RequestBuilder<'c, C, K, S, R, Any> {
fn new(context: &'c Context<C, K, S>, request: R) -> Self {
Self {
context,
request,
to: None,
marker: PhantomData,
}
}
#[inline]
pub fn all(self) -> RequestBuilder<'c, C, K, S, R, All> {
RequestBuilder {
context: self.context,
request: self.request,
to: self.to,
marker: PhantomData,
}
}
#[allow(unused)]
pub(crate) fn forgotten(self) -> RequestBuilder<'c, C, K, S, R, Forgotten> {
RequestBuilder {
context: self.context,
request: self.request,
to: self.to,
marker: PhantomData,
}
}
}
impl<'c, C, K, S, R, M> RequestBuilder<'c, C, K, S, R, M> {
#[deprecated(note = "use `Context::request_to()` instead")]
#[doc(hidden)]
#[inline]
pub fn from(self, addr: Addr) -> Self {
self.to(addr)
}
#[inline]
fn to(mut self, addr: Addr) -> Self {
self.to = Some(addr);
self
}
}
impl<'c, C: 'static, K, S, R: Request> RequestBuilder<'c, C, S, K, R, Any> {
pub async fn resolve(self) -> Result<R::Response, RequestError<R>> {
let this = self.context.addr;
let object = self.context.book.get_owned(this).expect("invalid addr");
let actor = object.as_actor().expect("can be called only on actors");
let token = actor
.request_table()
.new_request(self.context.book.clone(), false);
let request_id = token.request_id;
let kind = MessageKind::RequestAny(token);
let res = if let Some(recipient) = self.to {
self.context.do_send_to(recipient, self.request, kind).await
} else {
self.context.do_send(self.request, kind).await
};
if let Err(err) = res {
return Err(RequestError::Closed(err.0));
}
let mut data = actor.request_table().wait(request_id).await;
if let Some(Some(envelope)) = data.pop() {
let envelope = envelope.do_downcast::<R::Wrapper>();
trace!("< {:?}", envelope.message());
if let Some(permit) = DUMPER.acquire_m::<R>() {
permit.record(Dump::message(
envelope.message().clone(),
envelope.message_kind(),
Direction::In,
));
}
Ok(envelope.into_message().into())
} else {
Err(RequestError::Ignored)
}
}
}
impl<'c, C: 'static, K, S, R: Request> RequestBuilder<'c, C, K, S, R, All> {
pub async fn resolve(self) -> Vec<Result<R::Response, RequestError<R>>> {
let this = self.context.addr;
let object = self.context.book.get_owned(this).expect("invalid addr");
let actor = object.as_actor().expect("can be called only on actors");
let token = actor
.request_table()
.new_request(self.context.book.clone(), true);
let request_id = token.request_id;
let kind = MessageKind::RequestAll(token);
let res = if let Some(recipient) = self.to {
self.context.do_send_to(recipient, self.request, kind).await
} else {
self.context.do_send(self.request, kind).await
};
if let Err(err) = res {
return vec![Err(RequestError::Closed(err.0))];
}
actor
.request_table()
.wait(request_id)
.await
.into_iter()
.map(|opt| match opt {
Some(envelope) => Ok(envelope.do_downcast::<R::Wrapper>()),
None => Err(RequestError::Ignored),
})
.map(|res| {
let envelope = res?;
trace!("< {:?}", envelope.message());
if let Some(permit) = DUMPER.acquire_m::<R>() {
permit.record(Dump::message(
envelope.message().clone(),
envelope.message_kind(),
Direction::In,
));
}
Ok(envelope.into_message().into())
})
.collect()
}
}
impl<'c, C: 'static, K, S, R: Request> RequestBuilder<'c, C, S, K, R, Forgotten> {
pub async fn resolve(self) -> Result<R::Response, RequestError<R>> {
let token = ResponseToken::forgotten(self.context.book.clone());
let kind = MessageKind::RequestAny(token);
let res = if let Some(recipient) = self.to {
self.context.do_send_to(recipient, self.request, kind).await
} else {
self.context.do_send(self.request, kind).await
};
if let Err(err) = res {
return Err(RequestError::Closed(err.0));
}
Err(RequestError::Ignored)
}
}