use crate::common::error::Result;
use crate::common::protocol::Frame;
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, warn};
#[async_trait]
pub trait MessageHandler: Send + Sync {
async fn handle(&self, frame: &Frame) -> Result<Option<Frame>>;
}
pub struct MessageRouter {
handlers: HashMap<String, Vec<Arc<dyn MessageHandler>>>,
default_handler: Option<Arc<dyn MessageHandler>>,
}
impl MessageRouter {
pub fn new() -> Self {
Self {
handlers: HashMap::new(),
default_handler: None,
}
}
pub fn register(&mut self, route: impl Into<String>, handler: Arc<dyn MessageHandler>) {
let route = route.into();
self.handlers
.entry(route)
.or_insert_with(Vec::new)
.push(handler);
}
pub fn set_default_handler(&mut self, handler: Arc<dyn MessageHandler>) {
self.default_handler = Some(handler);
}
pub async fn route(&self, frame: &Frame) -> Result<Vec<Frame>> {
let route_key = Self::extract_route_key(frame);
debug!("Routing message: route={}, frame_id={}", route_key, frame.message_id);
let mut replies = Vec::new();
if let Some(handlers) = self.handlers.get(&route_key) {
for handler in handlers {
match handler.handle(frame).await {
Ok(Some(reply)) => {
replies.push(reply);
}
Ok(None) => {
}
Err(e) => {
warn!("Handler error for route {}: {}", route_key, e);
}
}
}
} else if let Some(ref default_handler) = self.default_handler {
match default_handler.handle(frame).await {
Ok(Some(reply)) => {
replies.push(reply);
}
Ok(None) => {
}
Err(e) => {
warn!("Default handler error: {}", e);
}
}
} else {
debug!("No handler found for route: {}", route_key);
}
Ok(replies)
}
fn extract_route_key(frame: &Frame) -> String {
if let Some(ref command) = frame.command {
match &command.r#type {
Some(crate::common::protocol::command::Type::System(sys_cmd)) => {
use crate::common::protocol::system_command::Type as SysType;
match SysType::from_i32(sys_cmd.r#type) {
Some(SysType::Connect) => "system.connect".to_string(),
Some(SysType::ConnectAck) => "system.connect_ack".to_string(),
Some(SysType::Close) => "system.close".to_string(),
Some(SysType::Ping) => "system.ping".to_string(),
Some(SysType::Pong) => "system.pong".to_string(),
Some(SysType::Error) => "system.error".to_string(),
Some(SysType::Event) => "system.event".to_string(),
Some(SysType::Auth) => "system.auth".to_string(),
Some(SysType::AuthAck) => "system.auth_ack".to_string(),
_ => "system.unknown".to_string(),
}
}
Some(crate::common::protocol::command::Type::Message(msg_cmd)) => {
use crate::common::protocol::message_command::Type as MsgType;
match MsgType::from_i32(msg_cmd.r#type) {
Some(MsgType::Send) => "message.send".to_string(),
Some(MsgType::Ack) => "message.ack".to_string(),
Some(MsgType::Data) => "message.data".to_string(),
_ => format!("message.{}", msg_cmd.r#type),
}
}
Some(crate::common::protocol::command::Type::Notification(notif_cmd)) => {
use crate::common::protocol::notification_command::Type as NotifType;
match NotifType::from_i32(notif_cmd.r#type) {
Some(NotifType::System) => "notification.system".to_string(),
Some(NotifType::Broadcast) => "notification.broadcast".to_string(),
Some(NotifType::Alert) => "notification.alert".to_string(),
Some(NotifType::User) => "notification.user".to_string(),
Some(NotifType::Connection) => "notification.connection".to_string(),
_ => format!("notification.{}", notif_cmd.r#type),
}
}
Some(crate::common::protocol::command::Type::Custom(custom_cmd)) => {
format!("custom.{}", custom_cmd.name)
}
None => "unknown".to_string(),
}
} else {
"unknown".to_string()
}
}
pub fn remove_route(&mut self, route: &str) {
self.handlers.remove(route);
}
pub fn clear(&mut self) {
self.handlers.clear();
self.default_handler = None;
}
}
impl Default for MessageRouter {
fn default() -> Self {
Self::new()
}
}
pub struct SimpleHandler {
handler: Box<dyn Fn(&Frame) -> Result<Option<Frame>> + Send + Sync>,
}
impl SimpleHandler {
pub fn new<F>(handler: F) -> Self
where
F: Fn(&Frame) -> Result<Option<Frame>> + Send + Sync + 'static,
{
Self {
handler: Box::new(handler),
}
}
}
#[async_trait]
impl MessageHandler for SimpleHandler {
async fn handle(&self, frame: &Frame) -> Result<Option<Frame>> {
(self.handler)(frame)
}
}
pub struct AsyncHandler {
handler: Arc<dyn Fn(&Frame) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<Frame>>> + Send + '_>> + Send + Sync>,
}
impl AsyncHandler {
pub fn new<F, Fut>(handler: F) -> Self
where
F: Fn(&Frame) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = Result<Option<Frame>>> + Send + 'static,
{
Self {
handler: Arc::new(move |frame| Box::pin(handler(frame))),
}
}
}
#[async_trait]
impl MessageHandler for AsyncHandler {
async fn handle(&self, frame: &Frame) -> Result<Option<Frame>> {
(self.handler)(frame).await
}
}