// hakuban 0.8.5
//
// Data-object sharing library
// Documentation
use std::{fmt::Display, sync::Arc, time::Duration};

use futures::StreamExt;
use hakuban::{
	tokio_runtime::{monitor::WebsocketListenerSnapshot, WebsocketConnector},
	Exchange, JsonDeserializeState,
};
use serde_json::json;

use super::router::Router;

/// Router layouts that [`Topology::setup_network`] knows how to build.
#[derive(Clone, Debug)]
pub enum Topology {
	/// One router only; every returned handle points at that same router.
	SingleRouter,
	/// A root router plus one second router started with `-c <root url>`;
	/// the per-task handles all point at the second router.
	TwoStackedRouters,
	/// A root router plus one separately spawned "spoke" router per task,
	/// each started with `-c <root url>,name=spoke-N`.
	StarOfRouters,
}

impl Topology {
	pub async fn setup_network(self, task_count: usize) -> Vec<Arc<Router>> {
		match self {
			Topology::SingleRouter => {
				let mut routers = vec![];
				let root_router = Router::spawn("router", vec!["--monitor-tag=monitor".to_string()], "", true, 0).await;
				routers.push(root_router.clone());
				let task_routers = (0..task_count).map(|_| root_router.clone());
				routers.extend(task_routers);
				routers
			}
			Topology::TwoStackedRouters => {
				let root_router = Router::spawn("router", vec!["--monitor-tag=monitor".to_string()], "", true, 0).await;
				let main_router = Router::spawn("router", vec!["-c".to_string(), root_router.url.to_string()], "", true, 1).await;
				let exchange = Exchange::new();
				let upstream = WebsocketConnector::new(exchange.clone(), (root_router.url.clone().as_str().to_string() + "name=regu").as_str()).unwrap();
				let mut listener_monitor_contract = exchange.object_observe_contract((vec![json!("monitor")], json!("websocket-listener"))).build();
				let mut listener_snapshots = listener_monitor_contract.next().await.unwrap();
				while listener_snapshots.next().await.unwrap().json_deserialize::<WebsocketListenerSnapshot>().data.downstream_connections.len() != 2 {
					tokio::time::sleep(Duration::from_millis(10)).await;
				}
				drop(upstream);
				tokio::time::sleep(Duration::from_millis(100)).await;
				let mut routers = vec![];
				routers.push(root_router);
				routers.extend((0..task_count).map(|_| main_router.clone()));
				routers
			}
			Topology::StarOfRouters => {
				let root_router = Router::spawn("router", vec!["--monitor-tag=monitor".to_string()], "", true, 0).await;
				let spoke_routers = futures::future::join_all((0..task_count).map(|i| {
					Router::spawn(
						"router",
						vec!["-c".to_string(), format!("{},name=spoke-{}", root_router.url.to_string(), i + 1), format!("--monitor-tag=monitor-{}", i + 1)],
						"",
						true,
						1 + i,
					)
				}))
				.await;
				let exchange = Exchange::new();
				let upstream = WebsocketConnector::new(exchange.clone(), (root_router.url.clone().as_str().to_string()).as_str()).unwrap();
				let mut listener_monitor_contract = exchange.object_observe_contract((vec![json!("monitor")], json!("websocket-listener"))).build();
				let mut listener_snapshots = listener_monitor_contract.next().await.unwrap();
				while listener_snapshots.next().await.unwrap().json_deserialize::<WebsocketListenerSnapshot>().data.downstream_connections.len()
					!= task_count + 1
				{
					tokio::time::sleep(Duration::from_millis(10)).await;
				}
				drop(upstream);
				tokio::time::sleep(Duration::from_millis(100)).await;
				let mut routers = vec![];
				routers.push(root_router);
				routers.extend(spoke_routers);
				routers
			}
		}
	}

	pub async fn teardown_network(routers: Vec<Arc<Router>>) -> std::io::Result<()> {
		futures::future::join_all(routers.into_iter().map(|router| router.exit())).await.into_iter().collect::<Result<Vec<()>, std::io::Error>>().map(|_| ())
	}
}

/// Renders a topology as its variant name, exactly as the derived `Debug` does.
impl Display for Topology {
	fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		// Hand the work straight to the derived `Debug` implementation.
		std::fmt::Debug::fmt(self, formatter)
	}
}