use async_trait::async_trait;
use std::hash::{Hash, Hasher};
use std::fmt;
use std::marker::Send;
use std::sync::{Arc, RwLock};
use crate::message::Message;
use crate::utils::random_string;
use tokio::sync::mpsc::{UnboundedSender, UnboundedReceiver, Receiver, Sender, channel, unbounded_channel};
use tokio::task::JoinHandle;
use futures::Future;
/// Behaviour of a message-driven actor.
///
/// The event loop in `run` calls `pre_start` once before reading any message,
/// `handle` once per received message, and `stopping` once after the loop exits.
#[async_trait]
pub trait Actor: Send + Sync + 'static {
    /// Processes a single `message` taken from this actor's mailbox.
    ///
    /// `context` is the actor's own [`ActorContext`].
    async fn handle(&mut self, message: Message, context: &ActorContext);

    /// Hook invoked before the first message is read. Does nothing by default.
    async fn pre_start(&mut self, _context: &ActorContext) {}

    /// Hook invoked after the event loop has ended. Does nothing by default.
    async fn stopping(&mut self, _context: &ActorContext) {}
}
impl dyn Actor {
    /// Drives the actor's event loop until a stop signal arrives or the mailbox closes.
    ///
    /// Calls `pre_start` first, then repeatedly waits on both channels at once:
    /// * a value (or closure) on `stop_receiver` calls `context.stop()` and ends the loop;
    /// * a message on `receiver` is passed to `handle`; `None` ends the loop
    ///   without calling `context.stop()`.
    ///
    /// `stopping` is awaited once the loop has ended, whichever way it ended.
    async fn run(
        &mut self,
        mut receiver: UnboundedReceiver<Message>,
        mut stop_receiver: Receiver<()>,
        context: ActorContext,
    ) {
        self.pre_start(&context).await;
        loop {
            tokio::select! {
                _stop = stop_receiver.recv() => {
                    // Propagate the shutdown through this actor's context before leaving.
                    context.stop();
                    break;
                },
                incoming = receiver.recv() => match incoming {
                    Some(message) => self.handle(message, &context).await,
                    // The mailbox reported that no further messages will arrive.
                    None => break,
                },
            }
        }
        self.stopping(&context).await;
    }
}
/// Runtime handle given to an actor: its own address, the router address, and the
/// bookkeeping needed to stop everything that was started through it.
///
/// Cloning is shallow for the two lists: clones share the same `stop_signals` and
/// `task_handles` through their `Arc`s.
#[derive(Clone)]
pub struct ActorContext {
    /// Identifier of the peer; child contexts receive a copy of it.
    pub peer_id: String,
    /// Address of the router actor.
    pub router: Addr,
    /// Stop channels that `stop()` signals.
    stop_signals: Arc<RwLock<Vec<Sender<()>>>>,
    /// Task handles that `stop()` aborts.
    task_handles: Arc<RwLock<Vec<JoinHandle<()>>>>,
    /// Address of the actor that owns this context.
    pub addr: Addr,
}
impl ActorContext {
    /// Creates a root context for `peer_id`.
    ///
    /// `addr` and `router` both start out as the same placeholder address. Its
    /// receiving half is dropped when this function returns, so nothing sent to it
    /// is ever delivered.
    pub fn new(peer_id: String) -> Self {
        let (sender, _receiver) = unbounded_channel::<Message>();
        let noop = Addr::new(sender);
        Self {
            addr: noop.clone(),
            stop_signals: Arc::new(RwLock::new(Vec::new())),
            task_handles: Arc::new(RwLock::new(Vec::new())),
            peer_id,
            router: noop,
        }
    }

    /// Builds the context handed to a newly started actor.
    ///
    /// The child gets its own lists: `stop_signals` is seeded with the child's own
    /// `stop_signal`, `task_handles` starts empty. `peer_id` and `router` are copied
    /// from `self`.
    fn child_context(&self, addr: Addr, stop_signal: Sender<()>) -> Self {
        Self {
            addr,
            stop_signals: Arc::new(RwLock::new(vec![stop_signal])),
            task_handles: Arc::new(RwLock::new(Vec::new())),
            peer_id: self.peer_id.clone(),
            router: self.router.clone(),
        }
    }

    /// Spawns `actor` as a child of this context and returns its address.
    ///
    /// The child inherits this context's `router`.
    pub fn start_actor(&self, actor: Box<dyn Actor>) -> Addr {
        self.start_actor_or_router(actor, false)
    }

    /// Spawns `actor` as a child of this context and returns its address.
    ///
    /// The child's context uses the child's own address as `router`.
    pub fn start_router(&self, actor: Box<dyn Actor>) -> Addr {
        self.start_actor_or_router(actor, true)
    }

    /// Registers `handle` so that the task is aborted when [`stop`](Self::stop) is called.
    ///
    /// # Panics
    ///
    /// Panics if the task-handle lock is poisoned.
    pub fn abort_on_stop(&self, handle: JoinHandle<()>) {
        self.task_handles.write().unwrap().push(handle);
    }

    /// Shared implementation of `start_actor` and `start_router`.
    ///
    /// Creates the mailbox and a capacity-1 stop channel, records the stop sender in
    /// this context so that stopping the parent also signals the child, and spawns
    /// the actor's event loop on the tokio runtime.
    ///
    /// # Panics
    ///
    /// Panics if the stop-signal lock is poisoned.
    fn start_actor_or_router(&self, mut actor: Box<dyn Actor>, is_router: bool) -> Addr {
        let (sender, receiver) = unbounded_channel::<Message>();
        let (stop_sender, stop_receiver) = channel(1);
        let addr = Addr::new(sender);
        let mut new_context = self.child_context(addr.clone(), stop_sender.clone());
        if is_router {
            new_context.router = addr.clone();
        }
        {
            let mut signals = self.stop_signals.write().unwrap();
            // Forget children whose event loop has already ended (their stop receiver
            // is gone). Without this the list grows by one entry per actor ever
            // started from this context and is never trimmed.
            signals.retain(|signal| !signal.is_closed());
            signals.push(stop_sender);
        }
        tokio::spawn(async move { actor.run(receiver, stop_receiver, new_context).await });
        addr
    }

    /// Aborts every registered task and signals every recorded stop channel.
    ///
    /// Signals are sent with a non-blocking `try_send`; a full or closed channel is
    /// ignored on purpose, since the target is then already stopping or gone.
    ///
    /// # Panics
    ///
    /// Panics if either internal lock is poisoned.
    pub fn stop(&self) {
        for handle in self.task_handles.read().unwrap().iter() {
            handle.abort();
        }
        for signal in self.stop_signals.read().unwrap().iter() {
            let _ = signal.try_send(());
        }
    }
}
/// Address of an actor: an id paired with the sending half of the actor's mailbox.
///
/// Equality, hashing and `Display` are all based on `id` alone.
#[derive(Clone, Debug)]
pub struct Addr {
    /// Identifier assigned when the address is created.
    id: String,
    /// Sending half of the unbounded message channel.
    pub sender: UnboundedSender<Message>,
}
impl Addr {
    /// Wraps `sender` in a new address whose id comes from `random_string(32)`.
    pub fn new(sender: UnboundedSender<Message>) -> Self {
        let id = random_string(32);
        Self { id, sender }
    }
}
impl PartialEq for Addr {
fn eq(&self, other: &Addr) -> bool {
self.id == other.id
}
}
// Equality compares only the `String` id, which is a full equivalence relation,
// so `Addr` can be marked `Eq`.
impl Eq for Addr {}
/// Hashes only the id, matching the id-based equality of `Addr`.
impl Hash for Addr {
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(&self.id, state);
    }
}
/// Renders the address as `actor:` followed by its id.
impl fmt::Display for Addr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("actor:{}", self.id))
    }
}