// Many handler methods below must accept `ctx` / `sender` (and similar) arguments
// to satisfy their trait signatures without using them, so the
// `unused_variables` lint is silenced for everything in this scope.
#![allow(unused_variables)]
use std::{
collections::HashMap,
hash::Hash,
};
use crate::{
actor::{
Actor, ActorRef, ActorRefFactory, ActorReference, BasicActorRef, BoxedTell, Context,
CreateError, Receive, Sender,
},
system::{SystemEvent, SystemMsg},
Message,
};
/// Subscriber table: maps a topic to the list of subscriber handles registered for it.
type Subs<Msg> = HashMap<Topic, Vec<BoxedTell<Msg>>>;
/// Actor context type of a channel carrying payloads of type `Msg`.
pub type ChannelCtx<Msg> = Context<ChannelMsg<Msg>>;
/// Actor reference type of a channel carrying payloads of type `Msg`.
pub type ChannelRef<Msg> = ActorRef<ChannelMsg<Msg>>;
/// Publish/subscribe actor: keeps a per-topic list of subscribers and forwards
/// published `Msg` values to them.
pub struct Channel<Msg: Message> {
// Topic -> subscribers registered for that topic.
subs: Subs<Msg>,
}
impl<Msg: Message> Default for Channel<Msg> {
    /// Builds a channel that has no topics and no subscribers yet.
    fn default() -> Self {
        Self {
            subs: HashMap::default(),
        }
    }
}
impl<Msg> Actor for Channel<Msg>
where
Msg: Message,
{
type Msg = ChannelMsg<Msg>;
fn pre_start(&mut self, ctx: &ChannelCtx<Msg>) {
}
fn recv(&mut self, ctx: &ChannelCtx<Msg>, msg: ChannelMsg<Msg>, sender: Sender) {
self.receive(ctx, msg, sender);
}
fn sys_recv(&mut self, _: &ChannelCtx<Msg>, msg: SystemMsg, sender: Sender) {
if let SystemMsg::Event(evt) = msg {
if let SystemEvent::ActorTerminated(terminated) = evt {
let subs = self.subs.clone();
for topic in subs.keys() {
unsubscribe(&mut self.subs, topic, &terminated.actor);
}
}
}
}
}
impl<Msg> Receive<ChannelMsg<Msg>> for Channel<Msg>
where
Msg: Message,
{
type Msg = ChannelMsg<Msg>;
fn receive(&mut self, ctx: &ChannelCtx<Msg>, msg: Self::Msg, sender: Sender) {
match msg {
ChannelMsg::Publish(p) => self.receive(ctx, p, sender),
ChannelMsg::Subscribe(sub) => self.receive(ctx, sub, sender),
ChannelMsg::Unsubscribe(unsub) => self.receive(ctx, unsub, sender),
ChannelMsg::UnsubscribeAll(unsub) => self.receive(ctx, unsub, sender),
}
}
}
impl<Msg> Receive<Subscribe<Msg>> for Channel<Msg>
where
Msg: Message,
{
type Msg = ChannelMsg<Msg>;
fn receive(&mut self, ctx: &ChannelCtx<Msg>, msg: Subscribe<Msg>, sender: Sender) {
let subs = self.subs.entry(msg.topic).or_default();
subs.push(msg.actor);
}
}
impl<Msg> Receive<Unsubscribe<Msg>> for Channel<Msg>
where
Msg: Message,
{
type Msg = ChannelMsg<Msg>;
fn receive(&mut self, ctx: &ChannelCtx<Msg>, msg: Unsubscribe<Msg>, sender: Sender) {
unsubscribe(&mut self.subs, &msg.topic, &msg.actor);
}
}
impl<Msg> Receive<UnsubscribeAll<Msg>> for Channel<Msg>
where
Msg: Message,
{
type Msg = ChannelMsg<Msg>;
fn receive(&mut self, ctx: &ChannelCtx<Msg>, msg: UnsubscribeAll<Msg>, sender: Sender) {
let subs = self.subs.clone();
for topic in subs.keys() {
unsubscribe(&mut self.subs, topic, &msg.actor);
}
}
}
impl<Msg> Receive<Publish<Msg>> for Channel<Msg>
where
Msg: Message,
{
type Msg = ChannelMsg<Msg>;
fn receive(&mut self, ctx: &ChannelCtx<Msg>, msg: Publish<Msg>, sender: Sender) {
if let Some(subs) = self.subs.get(&All.into()) {
for sub in subs.iter() {
sub.tell(msg.msg.clone(), sender.clone());
}
}
if let Some(subs) = self.subs.get(&msg.topic) {
for sub in subs.iter() {
sub.tell(msg.msg.clone(), sender.clone());
}
}
}
}
/// Removes `actor` from the subscriber list of `topic`.
///
/// Subscribers are matched by actor path; only the first matching entry is
/// removed. Does nothing when the topic is unknown or the actor is not
/// subscribed to it.
fn unsubscribe<Msg>(subs: &mut Subs<Msg>, topic: &Topic, actor: &dyn ActorReference) {
    // One lookup is enough: `get_mut` covers both the "topic exists" check
    // and the mutable access, so no `unwrap` is needed.
    if let Some(list) = subs.get_mut(topic) {
        if let Some(pos) = list.iter().position(|sub| sub.path() == actor.path()) {
            list.remove(pos);
        }
    }
}
/// Channel specialised for `SystemEvent`s. Wraps a `Channel<SystemEvent>` and
/// reuses its subscriber table; published events are delivered with
/// `sys_tell` as `SystemMsg::Event` rather than with `tell`.
#[derive(Default)]
pub struct EventsChannel(Channel<SystemEvent>);
// Actor behaviour of the events channel. Start-up and system-message handling
// are delegated to the wrapped `Channel<SystemEvent>`; user messages go through
// this type's own `Receive<ChannelMsg<SystemEvent>>` implementation.
impl Actor for EventsChannel {
type Msg = ChannelMsg<SystemEvent>;
// Delegates start-up to the inner channel.
fn pre_start(&mut self, ctx: &ChannelCtx<SystemEvent>) {
self.0.pre_start(ctx);
}
// Routes the message through `EventsChannel`'s own dispatcher (not the inner
// channel's), so that `Publish` gets the events-specific delivery.
fn recv(
&mut self,
ctx: &ChannelCtx<SystemEvent>,
msg: ChannelMsg<SystemEvent>,
sender: Sender,
) {
self.receive(ctx, msg, sender);
}
// Delegates system messages to the inner channel, which removes terminated
// actors from the subscriber table.
fn sys_recv(&mut self, ctx: &ChannelCtx<SystemEvent>, msg: SystemMsg, sender: Sender) {
self.0.sys_recv(ctx, msg, sender);
}
}
// Dispatcher for the events channel: subscription management is handled by the
// wrapped `Channel<SystemEvent>`, while publishing uses `EventsChannel`'s own
// `Receive<Publish<SystemEvent>>` implementation.
impl Receive<ChannelMsg<SystemEvent>> for EventsChannel {
type Msg = ChannelMsg<SystemEvent>;
fn receive(&mut self, ctx: &ChannelCtx<SystemEvent>, msg: Self::Msg, sender: Sender) {
match msg {
// Handled on `self`, not on the inner channel.
ChannelMsg::Publish(p) => self.receive(ctx, p, sender),
// The remaining requests only touch the subscriber table, which lives in
// the inner channel.
ChannelMsg::Subscribe(sub) => self.0.receive(ctx, sub, sender),
ChannelMsg::Unsubscribe(unsub) => self.0.receive(ctx, unsub, sender),
ChannelMsg::UnsubscribeAll(unsub) => self.0.receive(ctx, unsub, sender),
}
}
}
impl Receive<Publish<SystemEvent>> for EventsChannel {
type Msg = ChannelMsg<SystemEvent>;
fn receive(
&mut self,
ctx: &ChannelCtx<SystemEvent>,
msg: Publish<SystemEvent>,
sender: Sender,
) {
if let Some(subs) = self.0.subs.get(&All.into()) {
for sub in subs.iter() {
let evt = SystemMsg::Event(msg.msg.clone());
sub.sys_tell(evt);
}
}
if let Some(subs) = self.0.subs.get(&msg.topic) {
for sub in subs.iter() {
let evt = SystemMsg::Event(msg.msg.clone());
sub.sys_tell(evt);
}
}
}
}
/// Channel message protocol specialised to `DeadLetter` payloads.
pub type DLChannelMsg = ChannelMsg<DeadLetter>;
/// Record of a message together with its sender and intended recipient.
/// NOTE(review): judging by the name this describes an undeliverable message;
/// where and when it is produced is not visible in this file. Confirm against
/// the code that publishes it.
#[derive(Clone, Debug)]
pub struct DeadLetter {
// Message content as a `String` (presumably a textual rendering of the original message; TODO confirm).
pub msg: String,
// Sender attached to the message.
pub sender: Sender,
// Actor the message was addressed to.
pub recipient: BasicActorRef,
}
/// Request to add `actor` to the subscribers of `topic`.
#[derive(Debug, Clone)]
pub struct Subscribe<Msg: Message> {
// Topic to subscribe to.
pub topic: Topic,
// Handle that published messages will be sent to.
pub actor: BoxedTell<Msg>,
}
/// Request to remove `actor` from the subscribers of `topic`.
/// The subscriber is identified by its actor path.
#[derive(Debug, Clone)]
pub struct Unsubscribe<Msg: Message> {
// Topic to unsubscribe from.
pub topic: Topic,
// Subscriber to remove.
pub actor: BoxedTell<Msg>,
}
/// Request to remove `actor` from the subscribers of every topic.
#[derive(Debug, Clone)]
pub struct UnsubscribeAll<Msg: Message> {
// Subscriber to remove.
pub actor: BoxedTell<Msg>,
}
/// Request to publish `msg` on `topic`. Subscribers of `topic` and subscribers
/// of the `All` topic each receive a clone of `msg`.
#[derive(Debug, Clone)]
pub struct Publish<Msg: Message> {
// Topic the message is published on.
pub topic: Topic,
// Payload delivered to subscribers.
pub msg: Msg,
}
/// Message protocol understood by `Channel` and `EventsChannel`: one variant
/// per request type.
#[derive(Debug, Clone)]
pub enum ChannelMsg<Msg: Message> {
// Publish a message on a topic.
Publish(Publish<Msg>),
// Add a subscriber to a topic.
Subscribe(Subscribe<Msg>),
// Remove a subscriber from one topic.
Unsubscribe(Unsubscribe<Msg>),
// Remove a subscriber from all topics.
UnsubscribeAll(UnsubscribeAll<Msg>),
}
/// Wraps a `Publish` request in `ChannelMsg::Publish`.
///
/// Implemented as `From` rather than `Into`: the standard blanket impl still
/// provides `Publish<Msg>: Into<ChannelMsg<Msg>>`, so existing `.into()` calls
/// keep working, and `ChannelMsg::from(..)` becomes available as well.
impl<Msg: Message> From<Publish<Msg>> for ChannelMsg<Msg> {
    fn from(request: Publish<Msg>) -> Self {
        ChannelMsg::Publish(request)
    }
}
/// Wraps a `Subscribe` request in `ChannelMsg::Subscribe`.
///
/// Implemented as `From` rather than `Into`: the standard blanket impl still
/// provides `Subscribe<Msg>: Into<ChannelMsg<Msg>>`, so existing `.into()`
/// calls keep working, and `ChannelMsg::from(..)` becomes available as well.
impl<Msg: Message> From<Subscribe<Msg>> for ChannelMsg<Msg> {
    fn from(request: Subscribe<Msg>) -> Self {
        ChannelMsg::Subscribe(request)
    }
}
/// Wraps an `Unsubscribe` request in `ChannelMsg::Unsubscribe`.
///
/// Implemented as `From` rather than `Into`: the standard blanket impl still
/// provides `Unsubscribe<Msg>: Into<ChannelMsg<Msg>>`, so existing `.into()`
/// calls keep working, and `ChannelMsg::from(..)` becomes available as well.
impl<Msg: Message> From<Unsubscribe<Msg>> for ChannelMsg<Msg> {
    fn from(request: Unsubscribe<Msg>) -> Self {
        ChannelMsg::Unsubscribe(request)
    }
}
/// Wraps an `UnsubscribeAll` request in `ChannelMsg::UnsubscribeAll`.
///
/// Implemented as `From` rather than `Into`: the standard blanket impl still
/// provides `UnsubscribeAll<Msg>: Into<ChannelMsg<Msg>>`, so existing
/// `.into()` calls keep working, and `ChannelMsg::from(..)` becomes available
/// as well.
impl<Msg: Message> From<UnsubscribeAll<Msg>> for ChannelMsg<Msg> {
    fn from(request: UnsubscribeAll<Msg>) -> Self {
        ChannelMsg::UnsubscribeAll(request)
    }
}
/// Name of a channel topic, stored as an owned string.
///
/// Topics compare and hash by their string content, which is what allows
/// them to be used as keys of the subscriber table.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct Topic(String);

impl From<&str> for Topic {
    /// Copies the string slice into a new topic.
    fn from(name: &str) -> Self {
        Self(name.to_owned())
    }
}

impl From<String> for Topic {
    /// Takes ownership of the string without copying it.
    fn from(name: String) -> Self {
        Self(name)
    }
}
impl<'a> From<&'a SystemEvent> for Topic {
fn from(evt: &SystemEvent) -> Self {
match *evt {
SystemEvent::ActorCreated(_) => Topic::from("actor.created"),
SystemEvent::ActorTerminated(_) => Topic::from("actor.terminated"),
SystemEvent::ActorRestarted(_) => Topic::from("actor.restarted"),
}
}
}
/// Marker for the wildcard topic `"*"`. Subscribers of this topic receive
/// every message published on a channel, whatever its topic.
pub struct All;

impl From<All> for Topic {
    /// Yields the wildcard topic `"*"`.
    fn from(_: All) -> Self {
        let wildcard: &str = "*";
        Topic::from(wildcard)
    }
}
/// The topics under which system events are published.
pub enum SysTopic {
    /// Topic `"actor.created"`.
    ActorCreated,
    /// Topic `"actor.terminated"`.
    ActorTerminated,
    /// Topic `"actor.restarted"`.
    ActorRestarted,
}

impl From<SysTopic> for Topic {
    /// Converts the system topic into its string-named `Topic`.
    fn from(evt: SysTopic) -> Self {
        let name = match evt {
            SysTopic::ActorCreated => "actor.created",
            SysTopic::ActorTerminated => "actor.terminated",
            SysTopic::ActorRestarted => "actor.restarted",
        };
        Topic::from(name)
    }
}
/// Creates a `Channel<Msg>` actor called `name` using the given factory.
///
/// The call is forwarded to `fact.actor_of`; the resulting channel reference,
/// or the factory's `CreateError`, is returned unchanged.
pub fn channel<Msg: Message>(
    name: &str,
    fact: &impl ActorRefFactory,
) -> Result<ChannelRef<Msg>, CreateError> {
    fact.actor_of::<Channel<Msg>>(name)
}