use crate::config::Config;
use crate::error::{Result, RouterError};
use crate::router::MessageBus;
use crate::routing::RoutingTable;
use parking_lot::RwLock;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::info;
/// Handle to a started router: owns the cancellation token, the join handles
/// of the background tasks, the message bus and the shared routing table
/// obtained from `crate::orchestration::spawn_all` in [`Router::start`].
pub struct Router {
/// Token cancelled by [`Router::stop`]; clones are handed out by
/// [`Router::cancel_token`] and its state backs [`Router::is_running`].
cancel_token: CancellationToken,
/// Join handles of the spawned background tasks; awaited one by one in
/// [`Router::stop`].
handles: Vec<JoinHandle<()>>,
/// Message bus returned by the orchestration layer; exposed through
/// [`Router::bus`].
bus: MessageBus,
/// Routing table shared behind an `Arc<RwLock<_>>`; exposed through
/// [`Router::routing_table`].
routing_table: Arc<RwLock<RoutingTable>>,
}
impl Router {
pub async fn start(config: Config) -> Result<Self> {
let has_endpoints = !config.endpoint.is_empty() || config.general.tcp_port.is_some();
if !has_endpoints {
return Err(RouterError::config("No endpoints configured"));
}
let cancel_token = CancellationToken::new();
let orchestrated = crate::orchestration::spawn_all(&config, &cancel_token);
info!(
"Router started with {} endpoint(s) and {} background task(s)",
config.endpoint.len(),
orchestrated.handles.len()
);
Ok(Self {
cancel_token,
handles: orchestrated.handles,
bus: orchestrated.bus,
routing_table: orchestrated.routing_table,
})
}
pub async fn from_str(toml: &str) -> Result<Self> {
let config = Config::parse(toml)?;
Self::start(config).await
}
pub async fn from_file(path: impl AsRef<Path>) -> Result<Self> {
let config = Config::load(path).await?;
Self::start(config).await
}
pub async fn stop(self) {
info!("Router stopping...");
self.cancel_token.cancel();
tokio::time::sleep(Duration::from_millis(500)).await;
for handle in self.handles {
let _ = handle.await;
}
info!("Router stopped");
}
pub fn cancel_token(&self) -> CancellationToken {
self.cancel_token.clone()
}
pub fn bus(&self) -> &MessageBus {
&self.bus
}
pub fn routing_table(&self) -> &Arc<RwLock<RoutingTable>> {
&self.routing_table
}
pub fn is_running(&self) -> bool {
!self.cancel_token.is_cancelled()
}
}
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::*;

    /// Builds the TOML for a single UDP server endpoint on `127.0.0.1:<port>`.
    fn udp_server_toml(port: u16) -> String {
        format!(
            "\n[[endpoint]]\ntype = \"udp\"\naddress = \"127.0.0.1:{}\"\nmode = \"server\"\n",
            port
        )
    }

    /// Starts a router with one UDP server endpoint on the given local port.
    async fn start_udp_router(port: u16) -> Router {
        let toml = udp_server_toml(port);
        Router::from_str(&toml).await.expect("should start")
    }

    /// An empty configuration has no endpoints and must be rejected.
    #[tokio::test]
    async fn test_router_from_str_no_endpoints() {
        let outcome = Router::from_str("").await;
        assert!(outcome.is_err());
    }

    /// A freshly started router reports running and can be stopped.
    #[tokio::test]
    async fn test_router_start_and_stop() {
        let router = start_udp_router(24550).await;
        assert!(router.is_running());
        router.stop().await;
    }

    /// The message bus is reachable and accepts a subscriber.
    #[tokio::test]
    async fn test_router_bus_access() {
        let router = start_udp_router(24551).await;
        let _subscriber = router.bus().subscribe();
        router.stop().await;
    }

    /// The routing table is reachable and starts with no known systems.
    #[tokio::test]
    async fn test_router_routing_table_access() {
        let router = start_udp_router(24552).await;
        let table_stats = router.routing_table().read().stats();
        assert_eq!(table_stats.total_systems, 0);
        router.stop().await;
    }
}