extern crate sids;
use env_logger::{Builder, Env};
use log::info;
use sids::actors::actor::{Actor, ActorImpl};
use sids::actors::actor_ref::ActorRef;
use sids::actors::messages::{Message, ResponseMessage};
use sids::actors::response_handler::from_oneshot;
use sids::actors::{get_response_handler, send_message_by_id, spawn_actor, start_actor_system};
use std::collections::HashMap;
/// Installs the global `env_logger` logger for this example.
///
/// The filter is read from the `MY_LOG_LEVEL` environment variable; when that
/// variable is not set, the `info` level is used.
fn get_loggings() {
    Builder::from_env(Env::default().filter_or("MY_LOG_LEVEL", "info")).init();
}
/// Payload type exchanged between the actors of this chat example.
#[derive(Debug, Clone)]
enum ChatMessage {
/// A greeting. `name` is used by Alice as the key of the partner the
/// greeting is forwarded to, and is logged by Bob when he receives it.
Hello { name: String },
/// A farewell; Alice only logs it, Bob has no dedicated handling for it.
Goodbye,
/// Free-form text. Both actors answer it with `ResponseMessage::Complete`
/// when the message carries a responder.
StringMessage(String),
}
/// Actor that forwards `Hello` messages to partner actors it knows by name.
struct Alice {
/// Partner actors keyed by the name they were registered under.
partners: HashMap<String, ActorRef<ChatMessage, ResponseMessage>>,
}
impl Alice {
    /// Creates an `Alice` with no registered partners.
    fn new() -> Self {
        Self {
            partners: HashMap::new(),
        }
    }

    /// Registers `partner` under `name` so that Alice can message it later.
    ///
    /// The partner is wrapped in an `ActorImpl` fed by a bounded tokio mpsc
    /// channel with capacity 100, and the resulting `ActorRef` is stored in
    /// `partners`. An existing entry with the same `name` is replaced.
    ///
    /// `thread_ref` and `message_ref` are handed to `ActorRef::new` unchanged.
    fn add_partner<T: Actor<ChatMessage, ResponseMessage> + 'static>(
        &mut self,
        partner: T,
        name: String,
        thread_ref: &'static std::sync::atomic::AtomicUsize,
        message_ref: &'static std::sync::atomic::AtomicUsize,
    ) {
        let (mailbox_tx, mailbox_rx) =
            tokio::sync::mpsc::channel::<Message<ChatMessage, ResponseMessage>>(100);
        // The actor keeps its own copy of the name; the original is used as the map key.
        let partner_actor = ActorImpl::<T, ChatMessage, ResponseMessage>::new(
            Some(name.clone()),
            partner,
            mailbox_rx,
            None,
        );
        self.partners.insert(
            name,
            ActorRef::new(partner_actor, mailbox_tx, thread_ref, message_ref),
        );
    }
}
impl Actor<ChatMessage, ResponseMessage> for Alice {
async fn receive(&mut self, message: Message<ChatMessage, ResponseMessage>)
where
Self: Sized + 'static,
{
match message {
Message {
payload: Some(ChatMessage::Hello { name: name_string }),
stop: _,
responder: _,
blocking: _,
} => {
info!("Alice received a Hello message");
let (tx, rx) = tokio::sync::oneshot::channel::<ResponseMessage>();
let handler = from_oneshot(tx);
let reference = self.partners.get_mut(&name_string).unwrap();
info!("Alice is sending a message to Bob");
let _ = reference
.send(Message {
payload: Some(ChatMessage::Hello { name: name_string }),
stop: false,
responder: Some(handler),
blocking: None,
})
.await;
info!("Alice sent a message to Bob. Awaiting response.");
let response = rx.await.expect("Failed to receive response");
info!("Alice received a response: {:?}", response);
}
Message {
payload: Some(ChatMessage::Goodbye),
stop: _,
responder: _,
blocking: _,
} => {
info!("Alice received a Goodbye message");
}
Message {
payload: Some(ChatMessage::StringMessage(message)),
stop: _,
responder: Some(response),
blocking: _,
} => {
info!("Alice received a message: {}", message);
response.handle(ResponseMessage::Complete).await;
}
_ => {
info!("Alice received a message with no information.");
}
}
}
}
/// Stateless actor that answers the messages Alice forwards to it.
struct Bob;

impl Bob {
    /// Creates a `Bob`; the type carries no data.
    fn new() -> Self {
        Self
    }
}
impl Actor<ChatMessage, ResponseMessage> for Bob {
    /// Handles one incoming message.
    ///
    /// Only messages that carry a responder are acted upon:
    /// * `Hello { name }` — logged using `name`, answered with
    ///   `ResponseMessage::Success`.
    /// * `StringMessage` — logged, answered with `ResponseMessage::Complete`.
    ///
    /// Every other combination is logged as irrelevant.
    async fn receive(&mut self, message: Message<ChatMessage, ResponseMessage>)
    where
        Self: Sized + 'static,
    {
        // Only the payload and the responder influence the dispatch.
        let Message {
            payload, responder, ..
        } = message;

        match (payload, responder) {
            (Some(ChatMessage::Hello { name }), Some(courier)) => {
                info!("{} received a Hello message", name);
                courier.handle(ResponseMessage::Success).await;
            }
            (Some(ChatMessage::StringMessage(text)), Some(courier)) => {
                info!("Bob received a message: {}", text);
                courier.handle(ResponseMessage::Complete).await;
            }
            _ => {
                info!("Bob received a message with irrelevant information.");
            }
        }
    }
}
/// Runs the chat demo end to end.
///
/// Starts an actor system, registers Bob as Alice's partner, spawns Alice and
/// sends her three messages (`Hello`, `Goodbye`, and a `StringMessage` that
/// carries a responder). It then waits for the reply to the last message and
/// logs the system's message and thread counts.
///
/// # Errors
///
/// Returns an error when any `send_message_by_id` call fails or when the reply
/// channel closes before a response arrives.
async fn start_sample_actor_system() -> Result<(), Box<dyn std::error::Error>> {
    let mut actor_system = start_actor_system::<ChatMessage, ResponseMessage>();
    let thread_counter = actor_system.get_thread_count_reference();
    let message_counter = actor_system.get_message_count_reference();

    // Bob is not spawned in the system; he is only reachable through Alice.
    let mut alice = Alice::new();
    alice.add_partner(
        Bob::new(),
        String::from("Bob"),
        thread_counter,
        message_counter,
    );

    let (reply_handler, reply_rx) = get_response_handler::<ResponseMessage>();
    spawn_actor(&mut actor_system, alice, Some(String::from("Alice"))).await;

    // NOTE(review): presumably 0 is the id given to the first spawned actor
    // (Alice) — confirm against the sids id assignment.
    let alice_id = 0;

    send_message_by_id(
        &mut actor_system,
        alice_id,
        Message {
            payload: Some(ChatMessage::Hello {
                name: String::from("Bob"),
            }),
            stop: false,
            responder: None,
            blocking: None,
        },
    )
    .await?;

    send_message_by_id(
        &mut actor_system,
        alice_id,
        Message {
            payload: Some(ChatMessage::Goodbye),
            stop: false,
            responder: None,
            blocking: None,
        },
    )
    .await?;

    // Only this last message carries a responder, so its reply is the one awaited below.
    send_message_by_id(
        &mut actor_system,
        alice_id,
        Message {
            payload: Some(ChatMessage::StringMessage(String::from("Hello Alice"))),
            stop: false,
            responder: Some(reply_handler),
            blocking: None,
        },
    )
    .await?;

    let response = match reply_rx.await {
        Ok(reply) => reply,
        Err(e) => return Err(format!("Failed to receive response: {}", e).into()),
    };

    info!("We received a response: {:?} from Alice", response);
    info!("Total messages sent: {}", actor_system.get_message_count());
    info!("Total threads: {}", actor_system.get_thread_count());
    Ok(())
}
/// Entry point: sets up logging, runs the chat demo, and exits with status 1
/// after printing the error to stderr if the demo fails.
#[tokio::main]
async fn main() {
    get_loggings();
    match start_sample_actor_system().await {
        Ok(()) => {}
        Err(e) => {
            eprintln!("Error running chatbot example: {}", e);
            std::process::exit(1);
        }
    }
}