use crate::{bridge::RemoteConnection, message::Message, network::Network};
use anyhow::Result;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
/// Routes messages between the local actor network and remote networks.
///
/// Every field is held behind an `Arc`, so cloning a router yields another
/// handle onto the same registry, connection pool and local-network slot.
#[derive(Clone)]
pub struct MessageRouter {
/// Remote actors recorded by `register_remote_actor`, keyed by actor id.
remote_actor_registry: Arc<RwLock<HashMap<String, RemoteActorInfo>>>,
/// Connections that `route_message` looks up by target network id.
connection_pool: Arc<RwLock<HashMap<String, RemoteConnection>>>,
/// The local network; `None` until `set_local_network` is called.
local_network: Arc<RwLock<Option<Arc<RwLock<Network>>>>>,
/// Id stamped as `source_network` on outgoing messages; starts as `"local"`.
local_network_id: Arc<RwLock<String>>,
}
// NOTE(review): these manual impls carry no SAFETY justification. Every field
// of `MessageRouter` is an `Arc<RwLock<..>>`, so `Send`/`Sync` would normally be
// derived automatically provided `RemoteConnection` and `Network` are themselves
// `Send + Sync`. If they are not, these impls override the compiler's check.
// TODO confirm which case applies and either document the invariant here or
// remove the impls.
unsafe impl Sync for MessageRouter {}
unsafe impl Send for MessageRouter {}
/// Registry entry describing an actor that lives on a remote network.
///
/// Entries are created by `MessageRouter::register_remote_actor`.
#[derive(Debug, Clone)]
pub struct RemoteActorInfo {
/// Id of the remote actor; also used as the registry key.
pub actor_id: String,
/// Id of the network the actor was registered from.
pub network_id: String,
/// Capability labels; registration falls back to `["actor_messaging"]`
/// when the caller supplies none.
pub capabilities: Vec<String>,
}
/// Envelope for a message travelling from one network to an actor on another.
///
/// `MessageRouter::route_message` builds it and `send_over_connection`
/// serializes it to JSON; `handle_incoming_message` consumes it on the
/// receiving side.
#[derive(Debug, Serialize, Deserialize)]
pub struct RemoteMessage {
/// Unique id of this message (a v4 UUID string when built by the router).
pub message_id: String,
/// Id of the network the message originates from.
pub source_network: String,
/// Id of the sending actor; the router uses `"unknown"` when none is given.
pub source_actor: String,
/// Id of the network the message is addressed to.
pub target_network: String,
/// Id of the actor that should receive the payload.
pub target_actor: String,
/// Port on the target actor the payload is delivered to.
pub target_port: String,
/// The message being carried.
pub payload: Message,
/// UTC time at which the envelope was created.
pub timestamp: chrono::DateTime<chrono::Utc>,
}
impl Default for MessageRouter {
fn default() -> Self {
Self::new()
}
}
impl MessageRouter {
pub fn new() -> Self {
MessageRouter {
remote_actor_registry: Arc::new(RwLock::new(HashMap::new())),
connection_pool: Arc::new(RwLock::new(HashMap::new())),
local_network: Arc::new(RwLock::new(None)),
local_network_id: Arc::new(RwLock::new("local".to_string())),
}
}
pub fn with_connection_pool(
connections: Arc<RwLock<HashMap<String, RemoteConnection>>>,
) -> Self {
MessageRouter {
remote_actor_registry: Arc::new(RwLock::new(HashMap::new())),
connection_pool: connections,
local_network: Arc::new(RwLock::new(None)),
local_network_id: Arc::new(RwLock::new("local".to_string())),
}
}
/// Installs the local network and records its id.
///
/// The network handle and the id are stored under two separate locks, one
/// after the other; each lock is released before the next is taken.
pub fn set_local_network(&self, network: Arc<RwLock<Network>>, network_id: String) {
    {
        let mut network_slot = self.local_network.write();
        *network_slot = Some(network);
    }
    let mut id_slot = self.local_network_id.write();
    *id_slot = network_id;
}
pub async fn route_message(
&self,
network_id: &str,
actor_id: &str,
port: &str,
message: Message,
source_actor: Option<&str>,
) -> Result<()> {
let source_network = self.get_local_network_id().await?;
let source_actor_id = source_actor.unwrap_or("unknown").to_string();
tracing::info!(
"📨 ROUTER: Routing message from {}::{} to {}::{} on port {}",
source_network,
source_actor_id,
network_id,
actor_id,
port
);
let remote_message = RemoteMessage {
message_id: uuid::Uuid::new_v4().to_string(),
source_network,
source_actor: source_actor_id,
target_network: network_id.to_string(),
target_actor: actor_id.to_string(),
target_port: port.to_string(),
payload: message,
timestamp: chrono::Utc::now(),
};
let connection = {
let connections = self.connection_pool.read();
tracing::info!(
"🔍 ROUTER: Available connections: {:?}",
connections.keys().collect::<Vec<_>>()
);
connections.get(network_id).cloned()
};
if let Some(connection) = connection {
tracing::info!(
"✅ ROUTER: Found connection for network {}, sending message {}",
network_id,
remote_message.message_id
);
match self.send_over_connection(&connection, remote_message).await {
Ok(_) => {
tracing::info!("✅ ROUTER: Successfully sent message over connection");
Ok(())
}
Err(e) => {
tracing::error!("❌ ROUTER: Failed to send message over connection: {}", e);
Err(e)
}
}
} else {
tracing::error!("❌ ROUTER: No connection to network: {}", network_id);
Err(anyhow::anyhow!("No connection to network: {}", network_id))
}
}
pub async fn handle_incoming_message(
&self,
message: RemoteMessage,
) -> Result<(), anyhow::Error> {
tracing::info!(
"🎯 ROUTER: Routing message from {} to local actor: {} port: {}",
message.source_network,
message.target_actor,
message.target_port
);
let local_network_guard = self.local_network.read();
if let Some(ref local_network_arc) = *local_network_guard {
let network = local_network_arc.read();
tracing::info!(
"🔍 ROUTER: Sending to local network, available actors: {:?}",
network.actors.keys().collect::<Vec<_>>()
);
tracing::info!(
"🔍 ROUTER: Available nodes: {:?}",
network.nodes.keys().collect::<Vec<_>>()
);
match network.send_to_actor(
&message.target_actor,
&message.target_port,
message.payload,
) {
Ok(_) => {
tracing::info!(
"✅ ROUTER: Successfully routed message to local actor {}",
message.target_actor
);
}
Err(e) => {
tracing::error!(
"❌ ROUTER: Failed to route message to local actor {}: {}",
message.target_actor,
e
);
return Err(e);
}
}
} else {
tracing::error!("❌ ROUTER: No local network configured");
return Err(anyhow::anyhow!("No local network configured"));
}
Ok(())
}
async fn send_over_connection(
&self,
connection: &RemoteConnection,
message: RemoteMessage,
) -> Result<()> {
tracing::info!("🔗 ROUTER: Serializing message {}", message.message_id);
let serialized = match serde_json::to_string(&message) {
Ok(s) => {
tracing::info!("✅ ROUTER: Serialized message {} bytes", s.len());
s
}
Err(e) => {
tracing::error!("❌ ROUTER: Failed to serialize message: {}", e);
return Err(e.into());
}
};
tracing::info!(
"📡 ROUTER: Sending message over WebSocket to {}",
connection.network_id
);
match connection
.websocket
.send(tokio_tungstenite::tungstenite::Message::Text(
serialized.into(),
))
.await
{
Ok(_) => {
tracing::info!(
"✅ ROUTER: Successfully sent message {} over WebSocket",
message.message_id
);
Ok(())
}
Err(e) => {
tracing::error!("❌ ROUTER: Failed to send message over WebSocket: {}", e);
Err(e.into())
}
}
}
/// Returns a copy of the current local network id.
///
/// The current implementation always returns `Ok`.
async fn get_local_network_id(&self) -> Result<String> {
    let current_id = self.local_network_id.read();
    Ok(current_id.clone())
}
pub fn get_local_actor_list(&self) -> Vec<crate::bridge::ActorInfo> {
let local_network_guard = self.local_network.read();
if let Some(ref local_network_arc) = *local_network_guard {
let network = local_network_arc.read();
network
.actors
.keys()
.map(|actor_id| crate::bridge::ActorInfo {
actor_id: actor_id.clone(),
capabilities: vec!["actor_messaging".to_string()],
description: None,
})
.collect()
} else {
vec![]
}
}
/// Records `actor_id` as living on the remote network `remote_network_id`.
///
/// When `capabilities` is `None` the actor is registered with the single
/// capability `"actor_messaging"`. The registry is keyed by `actor_id`
/// alone, so registering the same id again replaces the earlier entry.
///
/// The current implementation always returns `Ok`.
pub async fn register_remote_actor(
    &self,
    actor_id: &str,
    remote_network_id: &str,
    capabilities: Option<Vec<String>>,
) -> Result<(), anyhow::Error> {
    let capabilities = match capabilities {
        Some(declared) => declared,
        None => vec!["actor_messaging".to_string()],
    };
    let registry_key = actor_id.to_owned();
    let entry = RemoteActorInfo {
        actor_id: registry_key.clone(),
        network_id: remote_network_id.to_owned(),
        capabilities,
    };

    {
        let mut registry = self.remote_actor_registry.write();
        registry.insert(registry_key, entry);
    }

    tracing::info!(
        "Registered remote actor {} from network {}",
        actor_id,
        remote_network_id
    );
    Ok(())
}
}