pub mod sub_bus;
pub mod worker;
use crate::sub_bus::{CopyOfSubBus, SubBus};
use log::{debug, error};
use std::any::{Any, TypeId};
use std::collections::HashMap;
use anyhow::{anyhow, Result};
use std::sync::Arc;
use tokio::spawn;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
};
use worker::{CopyOfWorker, IdentityOfWorker, WorkerId};
pub type Event = Arc<dyn Any + Send + Sync + 'static>;
pub enum BusData {
Login(oneshot::Sender<IdentityOfWorker>),
Subscribe(WorkerId, TypeId),
DispatchEvent(WorkerId, Event),
Drop(WorkerId),
}
#[derive(Clone)]
pub struct CopyOfBus {
tx: UnboundedSender<BusData>,
}
impl CopyOfBus {
pub async fn login(&self) -> Result<IdentityOfWorker> {
let (tx, rx) = oneshot::channel();
self.tx
.send(BusData::Login(tx))
.map_err(|_| anyhow!("fail to contact bus"))?;
Ok(rx.await.map_err(|_| anyhow!("fail to contact bus"))?)
}
}
pub struct Bus {
rx: UnboundedReceiver<BusData>,
tx: UnboundedSender<BusData>,
workers: HashMap<WorkerId, CopyOfWorker>,
sub_buses: HashMap<TypeId, CopyOfSubBus>,
}
impl Bus {
pub fn init() -> CopyOfBus {
let (tx, rx) = unbounded_channel();
Self {
rx,
tx: tx.clone(),
workers: Default::default(),
sub_buses: Default::default(),
}
.run();
CopyOfBus { tx }
}
fn run(mut self) {
spawn(async move {
while let Some(event) = self.rx.recv().await {
match event {
BusData::Login(tx) => {
let (identity_worker, copy_of_worker) = self.init_worker();
self.workers.insert(copy_of_worker.id(), copy_of_worker);
if tx.send(identity_worker).is_err() {
error!("login fail: tx ack fail");
}
}
BusData::Drop(worker_id) => {
debug!("{:?} Drop", worker_id);
if let Some(worker) = self.workers.remove(&worker_id) {
for ty_id in worker.subscribe_events() {
let should_remove =
if let Some(sub_bus) = self.sub_buses.get_mut(&ty_id) {
sub_bus.send_unsubscribe(worker_id).await == 0
} else {
false
};
if should_remove {
self.sub_buses.remove(ty_id);
}
}
} else {
}
}
BusData::DispatchEvent(worker_id, event) => {
debug!(
"{:?} DispatchEvent {:?}",
worker_id,
event.as_ref().type_id()
);
if let Some(sub_buses) = self.sub_buses.get(&event.as_ref().type_id()) {
sub_buses.send_event(event).await;
}
}
BusData::Subscribe(worker_id, typeid) => {
debug!("Subscribe {:?} {:?}", worker_id, typeid);
if let Some(worker) = self.workers.get_mut(&worker_id) {
worker.subscribe_event(typeid);
if let Some(sub_buses) = self.sub_buses.get_mut(&typeid) {
sub_buses.send_subscribe(worker.init_subscriber()).await;
} else {
let mut copy = SubBus::init(typeid);
copy.send_subscribe(worker.init_subscriber()).await;
self.sub_buses.insert(typeid, copy);
}
}
}
}
}
});
}
fn init_worker(&self) -> (IdentityOfWorker, CopyOfWorker) {
let (tx_event, rx_event) = unbounded_channel();
let id = WorkerId::default();
(
IdentityOfWorker::init(id, rx_event, self.tx.clone()),
CopyOfWorker::init(id, tx_event),
)
}
}