use std::cmp::Ordering;
use std::fmt::{self, Debug, Formatter};
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::pin::Pin;
use std::task::{Context, Poll};
use event_listener::EventListener;
use futures_util::FutureExt;
use crate::refcount::{Either, RefCounter, Strong, Weak};
use crate::send_future::{ActorNamedBroadcasting, Broadcast, ResolveToHandlerReturn};
use crate::{chan, ActorNamedSending, Handler, SendFuture};
pub struct Address<A, Rc: RefCounter = Strong>(pub(crate) chan::Ptr<A, Rc>);
impl<A, Rc: RefCounter> Debug for Address<A, Rc> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let actor_type = std::any::type_name::<A>();
let rc_type = std::any::type_name::<Rc>()
.replace("xtra::chan::ptr::", "")
.replace("Tx", "");
f.debug_struct(&format!("Address<{}, {}>", actor_type, rc_type))
.field("addresses", &self.0.sender_count())
.field("mailboxes", &self.0.receiver_count())
.finish()
}
}
pub type WeakAddress<A> = Address<A, Weak>;
impl<A> WeakAddress<A> {
pub fn try_upgrade(&self) -> Option<Address<A>> {
Some(Address(self.0.try_to_tx_strong()?))
}
}
impl<A> Address<A, Strong> {
pub fn downgrade(&self) -> WeakAddress<A> {
Address(self.0.to_tx_weak())
}
}
impl<A> Address<A, Either> {
pub fn downgrade(&self) -> WeakAddress<A> {
Address(self.0.to_tx_weak())
}
}
impl<A, Rc> Address<A, Rc>
where
Rc: RefCounter + Into<Either>,
{
pub fn as_either(&self) -> Address<A, Either> {
Address(self.0.to_tx_either())
}
}
impl<A, Rc: RefCounter> Address<A, Rc> {
pub fn is_connected(&self) -> bool {
self.0.is_connected()
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn capacity(&self) -> Option<usize> {
self.0.capacity()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[allow(clippy::type_complexity)]
pub fn send<M>(
&self,
message: M,
) -> SendFuture<ActorNamedSending<A, Rc>, ResolveToHandlerReturn<<A as Handler<M>>::Return>>
where
M: Send + 'static,
A: Handler<M>,
{
SendFuture::sending_named(message, self.0.clone())
}
pub fn broadcast<M>(&self, msg: M) -> SendFuture<ActorNamedBroadcasting<A, Rc>, Broadcast>
where
M: Clone + Send + Sync + 'static,
A: Handler<M, Return = ()>,
{
SendFuture::broadcast_named(msg, self.0.clone())
}
pub fn join(&self) -> ActorJoinHandle {
ActorJoinHandle(self.0.disconnect_listener())
}
pub fn same_actor<Rc2: RefCounter>(&self, other: &Address<A, Rc2>) -> bool {
self.0.inner_ptr() == other.0.inner_ptr()
}
#[cfg(feature = "sink")]
pub fn into_sink<M>(self) -> impl futures_sink::Sink<M, Error = crate::Error>
where
A: Handler<M, Return = ()>,
M: Send + 'static,
{
futures_util::sink::unfold((), move |(), message| self.send(message))
}
}
#[must_use = "Futures do nothing unless polled"]
pub struct ActorJoinHandle(Option<EventListener>);
impl Future for ActorJoinHandle {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.0.take() {
Some(mut listener) => match listener.poll_unpin(cx) {
Poll::Ready(()) => Poll::Ready(()),
Poll::Pending => {
self.0 = Some(listener);
Poll::Pending
}
},
None => Poll::Ready(()),
}
}
}
impl<A, Rc: RefCounter> Clone for Address<A, Rc> {
fn clone(&self) -> Self {
Address(self.0.clone())
}
}
impl<A, Rc: RefCounter, Rc2: RefCounter> PartialEq<Address<A, Rc2>> for Address<A, Rc> {
fn eq(&self, other: &Address<A, Rc2>) -> bool {
(self.same_actor(other)) && (self.0.is_strong() == other.0.is_strong())
}
}
impl<A, Rc: RefCounter> Eq for Address<A, Rc> {}
impl<A, Rc: RefCounter, Rc2: RefCounter> PartialOrd<Address<A, Rc2>> for Address<A, Rc> {
fn partial_cmp(&self, other: &Address<A, Rc2>) -> Option<Ordering> {
Some(match self.0.inner_ptr().cmp(&other.0.inner_ptr()) {
Ordering::Equal => self.0.is_strong().cmp(&other.0.is_strong()),
ord => ord,
})
}
}
impl<A, Rc: RefCounter> Ord for Address<A, Rc> {
fn cmp(&self, other: &Self) -> Ordering {
self.0.inner_ptr().cmp(&other.0.inner_ptr())
}
}
impl<A, Rc: RefCounter> Hash for Address<A, Rc> {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write_usize(self.0.inner_ptr() as *const _ as usize);
state.write_u8(self.0.is_strong() as u8);
state.finish();
}
}