use super::{ActionRecipient, InteractionRecipient};
use crate::actor_runtime::{Actor, Status};
use crate::compat::watch;
use crate::forwarders::AttachStream;
use crate::handlers::{
Action, ActionHandler, Consumer, Envelope, Handler, InstantAction, InstantActionHandler,
Interact, Interaction, InteractionHandler, InteractionTask, InterruptedBy, Operation, Parcel,
Priority, Scheduled, ScheduledItem, StreamAcceptor, TerminateBy, TerminatedBy,
};
use crate::ids::{Id, IdOf};
use crate::lifecycle::Interrupt;
use crate::lite_runtime::Tag;
use anyhow::Error;
use futures::Stream;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::time::Instant;
use tokio::sync::mpsc;
pub struct AddressPair<A: Actor> {
pub(crate) joint: AddressJoint<A>,
pub(crate) address: Address<A>,
}
impl<A: Actor> AddressPair<A> {
pub fn new() -> Self {
let id = Id::unique();
let (hp_msg_tx, hp_msg_rx) = mpsc::unbounded_channel();
let (msg_tx, msg_rx) = mpsc::unbounded_channel();
let (join_tx, join_rx) = watch::channel(Status::Alive);
let joint = AddressJoint {
msg_rx,
hp_msg_rx,
join_tx,
};
let address = Address {
id,
hp_msg_tx,
msg_tx,
join_rx,
};
Self { joint, address }
}
pub fn address(&self) -> &Address<A> {
&self.address
}
}
pub(crate) struct AddressJoint<A: Actor> {
pub msg_rx: mpsc::UnboundedReceiver<Envelope<A>>,
pub hp_msg_rx: mpsc::UnboundedReceiver<Parcel<A>>,
pub join_tx: watch::Sender<Status>,
}
pub struct Address<A: Actor> {
id: Id,
hp_msg_tx: mpsc::UnboundedSender<Parcel<A>>,
msg_tx: mpsc::UnboundedSender<Envelope<A>>,
join_rx: watch::Receiver<Status>,
}
impl<A: Actor> Clone for Address<A> {
fn clone(&self) -> Self {
Self {
id: self.id.clone(),
hp_msg_tx: self.hp_msg_tx.clone(),
msg_tx: self.msg_tx.clone(),
join_rx: self.join_rx.clone(),
}
}
}
impl<A: Actor> fmt::Debug for Address<A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("Address").field(&self.id).finish()
}
}
impl<A: Actor> PartialEq for Address<A> {
fn eq(&self, other: &Self) -> bool {
self.id.eq(&other.id)
}
}
impl<A: Actor> Eq for Address<A> {}
impl<A: Actor> Hash for Address<A> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}
impl<A: Actor> Address<A> {
pub fn id(&self) -> IdOf<A> {
IdOf::new(self.id.clone())
}
pub(crate) fn raw_id(&self) -> &Id {
&self.id
}
pub fn act<I>(&self, input: I) -> Result<(), Error>
where
I: Action,
A: ActionHandler<I>,
{
let envelope = Envelope::new(input);
self.normal_priority_send(envelope)
}
pub fn instant<I>(&self, input: I) -> Result<(), Error>
where
I: InstantAction,
A: InstantActionHandler<I>,
{
let parcel = Parcel::new(Operation::Forward, input);
self.high_priority_send(parcel)
}
pub fn schedule<I>(&self, input: I, deadline: Instant) -> Result<(), Error>
where
I: Send + 'static,
A: Scheduled<I>,
{
let operation = Operation::Schedule { deadline };
let wrapped = ScheduledItem {
timestamp: deadline,
item: input,
};
let parcel = Parcel::new(operation, wrapped);
self.high_priority_send(parcel)
}
pub fn unpack_parcel(&self, parcel: Parcel<A>) -> Result<(), Error> {
self.high_priority_send(parcel)
}
fn high_priority_send(&self, parcel: Parcel<A>) -> Result<(), Error> {
self.hp_msg_tx
.send(parcel)
.map_err(|_| Error::msg("can't send a high-priority service message"))
}
fn normal_priority_send(&self, envelope: Envelope<A>) -> Result<(), Error> {
self.msg_tx
.send(envelope)
.map_err(|err| Error::msg(err.to_string()))
}
pub fn send_event(&self, handler: impl Handler<A>) -> Result<(), Error> {
let priority = handler.priority();
let envelope = Envelope::from_handler(handler);
match priority {
Priority::Normal => self.normal_priority_send(envelope),
Priority::Instant => {
let parcel = Parcel::from_envelope(envelope);
self.high_priority_send(parcel)
}
}
}
pub fn interact<I>(&self, request: I) -> InteractionTask<I>
where
I: Interaction,
A: ActionHandler<Interact<I>>,
{
InteractionTask::new(self, request)
}
pub async fn join(self) {
let mut rx = self.join_rx.clone();
drop(self);
while rx.changed().await.is_ok() {
if *rx.borrow() == Status::Stop {
break;
}
}
}
pub(crate) fn interrupt_by<T>(&self) -> Result<(), Error>
where
A: InterruptedBy<T>,
T: Actor,
{
let parcel = Parcel::new(Operation::Forward, Interrupt::new());
self.high_priority_send(parcel)
}
pub fn terminate_by<T>(&self) -> Result<(), Error>
where
A: TerminatedBy<T>,
T: 'static,
{
let input = TerminateBy::new();
let envelope = Envelope::new(input);
self.normal_priority_send(envelope)
}
pub fn attach<S, M>(&mut self, stream: S, tag: M) -> Result<(), Error>
where
A: Consumer<S::Item> + StreamAcceptor<S::Item>,
S: Stream + Send + Unpin + 'static,
S::Item: Send + 'static,
M: Tag,
{
let msg = AttachStream::new(stream, tag);
self.instant(msg)
}
pub fn link<T>(&self) -> T
where
T: From<Self>,
{
T::from(self.clone())
}
pub fn action_recipient<T>(&self) -> Box<dyn ActionRecipient<T>>
where
T: Action,
A: ActionHandler<T>,
{
Box::new(self.clone())
}
pub fn interaction_recipient<T>(&self) -> Box<dyn InteractionRecipient<T>>
where
T: Interaction,
A: InteractionHandler<T>,
{
Box::new(self.clone())
}
}
impl<T, A> From<Address<A>> for Box<dyn ActionRecipient<T>>
where
T: Action,
A: Actor + ActionHandler<T>,
{
fn from(address: Address<A>) -> Self {
Box::new(address)
}
}
impl<T, A> From<Address<A>> for Box<dyn InteractionRecipient<T>>
where
T: Interaction,
A: Actor + InteractionHandler<T>,
{
fn from(address: Address<A>) -> Self {
Box::new(address)
}
}