use crate::message::Message;
use anyhow::Result;
use async_trait::async_trait;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
/// Shared, type-erased asynchronous message callback.
///
/// The callback takes a [`Message`] by value and returns a boxed, pinned, `Send`
/// future that resolves to `Result<Option<String>>`. Being wrapped in an [`Arc`],
/// the handler can be cloned and handed to several listeners.
///
/// NOTE(review): the `Option<String>` is presumably an optional reply to the
/// message — confirm against the listener implementations.
pub type MessageHandler = Arc<
dyn Fn(Message) -> Pin<Box<dyn Future<Output = Result<Option<String>>> + Send>> + Send + Sync,
>;
pub fn into_handler<F, Fut>(f: F) -> MessageHandler
where
F: Fn(Message) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<Option<String>>> + Send + 'static,
{
Arc::new(move |msg| Box::pin(f(msg)))
}
/// A single endpoint that a chat server can start and shut down.
///
/// Implementations must be `Send + Sync`.
#[async_trait]
pub trait ChatListener: Send + Sync {
/// Returns the listener's address as a string slice.
fn address(&self) -> &str;
/// Returns the listener's protocol as a string slice.
fn protocol(&self) -> &str;
/// Starts the listener.
///
/// * `handler` - shared message callback. NOTE(review): presumably invoked for
///   each incoming message — confirm against implementations.
/// * `alive` - sender half of a `()` channel. `Server::run` gives every listener
///   a clone of the same sender and returns once the receiving half yields,
///   i.e. when any clone sends a value or when all clones have been dropped.
///
/// Returns an error if the listener could not be started.
async fn start(
&mut self,
handler: MessageHandler,
alive: tokio::sync::mpsc::Sender<()>,
) -> Result<()>;
/// Shuts the listener down, returning an error if that fails.
async fn shutdown(&mut self) -> Result<()>;
}
/// A named server that exposes a set of [`ChatListener`]s and can be run and
/// shut down asynchronously.
///
/// Implementations must be `Send + Sync`.
#[async_trait]
pub trait ChatServer: Send + Sync {
/// Returns the server's name.
fn name(&self) -> &str;
/// Returns borrowed references to the server's listeners.
fn listeners(&self) -> Vec<&dyn ChatListener>;
/// Runs the server, using `handler` as the message callback.
///
/// `handler` must be `Send + Sync + 'static` and return a `Send + 'static`
/// future that resolves to `Result<Option<String>>`; these are the same
/// bounds that [`into_handler`] accepts.
async fn run<F, Fut>(&mut self, handler: F) -> Result<()>
where
F: Fn(Message) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = Result<Option<String>>> + Send + 'static;
/// Shuts the server down, returning an error if that fails.
async fn shutdown(&mut self) -> Result<()>;
}
/// [`ChatServer`] implementation that holds a name and a list of boxed
/// [`ChatListener`]s.
pub struct Server {
// Name reported by `ChatServer::name`.
name: String,
// Listeners in the order they were added.
listeners: Vec<Box<dyn ChatListener>>,
}
impl Server {
    /// Creates a server called `name` with no listeners registered.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        let listeners = Vec::new();
        Self { name, listeners }
    }

    /// Boxes `listener`, appends it to the listener list and returns the
    /// server so calls can be chained.
    pub fn add_listener(self, listener: impl ChatListener + 'static) -> Self {
        self.add_boxed_listener(Box::new(listener))
    }

    /// Appends an already boxed `listener` to the listener list and returns
    /// the server so calls can be chained.
    pub fn add_boxed_listener(self, listener: Box<dyn ChatListener>) -> Self {
        let mut server = self;
        server.listeners.push(listener);
        server
    }
}
#[async_trait]
impl ChatServer for Server {
fn name(&self) -> &str {
&self.name
}
fn listeners(&self) -> Vec<&dyn ChatListener> {
self.listeners.iter().map(|l| l.as_ref()).collect()
}
async fn run<F, Fut>(&mut self, handler: F) -> Result<()>
where
F: Fn(Message) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = Result<Option<String>>> + Send + 'static,
{
let handler: MessageHandler = into_handler(handler);
let (alive_tx, mut alive_rx) = tokio::sync::mpsc::channel::<()>(1);
for listener in &mut self.listeners {
listener.start(handler.clone(), alive_tx.clone()).await?;
}
drop(alive_tx);
let _ = alive_rx.recv().await;
Ok(())
}
async fn shutdown(&mut self) -> Result<()> {
for listener in &mut self.listeners {
listener.shutdown().await?;
}
Ok(())
}
}