use std::{fmt::Display, sync::Arc, time::Duration};
use futures::StreamExt;
use hakuban::{
tokio_runtime::{monitor::WebsocketListenerSnapshot, WebsocketConnector},
Exchange, JsonDeserializeState,
};
use serde_json::json;
use super::router::Router;
/// Router network layouts that [`Topology::setup_network`] knows how to build.
#[derive(Debug, Clone)]
pub enum Topology {
    /// One router only; every returned handle refers to that same router.
    SingleRouter,
    /// A root router plus a second router started with the root's URL after `-c`;
    /// every non-root handle refers to the second router.
    TwoStackedRouters,
    /// A root router plus one separately spawned "spoke" router per task, each
    /// started with the root's URL after `-c`.
    StarOfRouters,
}
impl Topology {
    /// Spawns the routers that make up this topology and returns their handles.
    ///
    /// The returned vector always holds `task_count + 1` entries. Index 0 is the
    /// root router; the remaining `task_count` entries are (presumably) the routers
    /// individual tasks should talk to:
    ///
    /// * `SingleRouter` - every entry is a handle to the same root router.
    /// * `TwoStackedRouters` - entries `1..` are all handles to a second router
    ///   that was started with the root's URL after `-c`.
    /// * `StarOfRouters` - entries `1..` are distinct spoke routers, each started
    ///   with the root's URL after `-c`.
    ///
    /// For the multi-router topologies this function does not return until the
    /// root router's monitor reports the expected number of downstream connections.
    /// There is no timeout: if that number is never reported, the call never
    /// completes.
    ///
    /// # Panics
    ///
    /// Panics if the monitoring websocket connector cannot be created, or if one
    /// of the monitor streams ends before the expected connection count is seen.
    pub async fn setup_network(self, task_count: usize) -> Vec<Arc<Router>> {
        match self {
            Topology::SingleRouter => {
                let root_router = Router::spawn("router", vec!["--monitor-tag=monitor".to_string()], "", true, 0).await;
                let mut routers = Vec::with_capacity(task_count + 1);
                routers.push(Arc::clone(&root_router));
                routers.extend((0..task_count).map(|_| Arc::clone(&root_router)));
                routers
            }
            Topology::TwoStackedRouters => {
                let root_router = Router::spawn("router", vec!["--monitor-tag=monitor".to_string()], "", true, 0).await;
                let main_router = Router::spawn("router", vec!["-c".to_string(), root_router.url.to_string()], "", true, 1).await;
                // NOTE(review): the option is appended with no separator, whereas the spoke
                // URLs in `StarOfRouters` use ",name=..." - confirm whether a comma is missing.
                let monitor_url = format!("{}name=regu", root_router.url.as_str());
                // Presumably 2 = `main_router` plus the temporary monitoring connection itself.
                Self::wait_for_downstream_connections(&monitor_url, 2).await;
                let mut routers = Vec::with_capacity(task_count + 1);
                routers.push(root_router);
                routers.extend((0..task_count).map(|_| Arc::clone(&main_router)));
                routers
            }
            Topology::StarOfRouters => {
                let root_router = Router::spawn("router", vec!["--monitor-tag=monitor".to_string()], "", true, 0).await;
                // Rendered once up front; every spoke gets the same base URL.
                let root_url = root_router.url.to_string();
                let spoke_routers = futures::future::join_all((0..task_count).map(|i| {
                    let spoke_number = i + 1;
                    Router::spawn(
                        "router",
                        vec![
                            "-c".to_string(),
                            format!("{},name=spoke-{}", root_url, spoke_number),
                            format!("--monitor-tag=monitor-{}", spoke_number),
                        ],
                        "",
                        true,
                        spoke_number,
                    )
                }))
                .await;
                // Presumably one connection per spoke plus the temporary monitoring connection.
                Self::wait_for_downstream_connections(root_router.url.as_str(), task_count + 1).await;
                let mut routers = Vec::with_capacity(task_count + 1);
                routers.push(root_router);
                routers.extend(spoke_routers);
                routers
            }
        }
    }

    /// Waits until the router reachable at `upstream_url` reports exactly
    /// `expected_connections` downstream connections.
    ///
    /// A throwaway exchange is connected to `upstream_url` and observes the object
    /// tagged `"monitor"` with descriptor `"websocket-listener"`. Each received
    /// state is decoded as a `WebsocketListenerSnapshot` and its
    /// `downstream_connections` length compared against the expected value. Once
    /// it matches, the connector is dropped and the function sleeps a further
    /// 100 ms before returning.
    async fn wait_for_downstream_connections(upstream_url: &str, expected_connections: usize) {
        let exchange = Exchange::new();
        let upstream =
            WebsocketConnector::new(exchange.clone(), upstream_url).expect("failed to create websocket connector for router monitoring");
        let mut listener_monitor_contract = exchange.object_observe_contract((vec![json!("monitor")], json!("websocket-listener"))).build();
        let mut listener_snapshots = listener_monitor_contract.next().await.expect("monitor contract stream ended before yielding an object");
        loop {
            // Only the count is kept, so the decoded snapshot is dropped before sleeping.
            let connected = listener_snapshots
                .next()
                .await
                .expect("monitor state stream ended before the expected connection count was reached")
                .json_deserialize::<WebsocketListenerSnapshot>()
                .data
                .downstream_connections
                .len();
            if connected == expected_connections {
                break;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        drop(upstream);
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    /// Calls `exit()` on every handle in `routers` concurrently and waits for all
    /// of them to finish.
    ///
    /// `exit()` is invoked once per entry, so a router that appears several times
    /// in the list (as in `SingleRouter`) has it invoked several times.
    ///
    /// # Errors
    ///
    /// Returns the first error (in list order) produced by any `exit()` call; all
    /// calls are still awaited to completion before the error is returned.
    pub async fn teardown_network(routers: Vec<Arc<Router>>) -> std::io::Result<()> {
        let exits = routers.into_iter().map(|router| router.exit());
        let outcomes = futures::future::join_all(exits).await;
        outcomes.into_iter().collect::<std::io::Result<()>>()
    }
}
impl Display for Topology {
    /// Writes the topology using the same text as its `Debug` representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{:?}", self))
    }
}