use super::state::MeshBrokerState;
use super::{routes, ws};
use axum::{routing::get, Router};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tower_http::cors::{Any, CorsLayer};
use tower_http::timeout::TimeoutLayer;
use tower_http::trace::TraceLayer;
#[derive(Debug, Clone)]
pub struct BrokerConfig {
pub bind_addr: SocketAddr,
pub timeout_secs: u64,
}
impl Default for BrokerConfig {
fn default() -> Self {
Self {
bind_addr: "0.0.0.0:8081"
.parse()
.expect("static bind address is valid"),
timeout_secs: 30,
}
}
}
pub struct Broker {
state: Arc<dyn MeshBrokerState>,
config: BrokerConfig,
}
impl Broker {
pub fn new(state: Arc<dyn MeshBrokerState>) -> Self {
Self {
state,
config: BrokerConfig::default(),
}
}
pub fn with_config(mut self, config: BrokerConfig) -> Self {
self.config = config;
self
}
pub fn build_router(&self) -> Router {
let api = Router::new()
.route("/health", get(routes::health))
.route("/ready", get(routes::readiness))
.route("/node", get(routes::node_info))
.route("/peers", get(routes::list_peers))
.route("/peers/:id", get(routes::get_peer))
.route("/topology", get(routes::topology))
.route("/documents/:collection", get(routes::list_documents))
.route("/documents/:collection/:id", get(routes::get_document))
.route("/ws", get(ws::ws_handler))
.with_state(self.state.clone());
Router::new()
.nest("/api/v1", api)
.layer(
CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any),
)
.layer(TimeoutLayer::new(Duration::from_secs(
self.config.timeout_secs,
)))
.layer(TraceLayer::new_for_http())
}
#[cfg(feature = "lite-bridge")]
pub fn build_router_with_ota(&self, ota_state: Arc<super::ota_routes::OtaAppState>) -> Router {
use axum::routing::post;
let api = Router::new()
.route("/health", get(routes::health))
.route("/ready", get(routes::readiness))
.route("/node", get(routes::node_info))
.route("/peers", get(routes::list_peers))
.route("/peers/:id", get(routes::get_peer))
.route("/topology", get(routes::topology))
.route("/documents/:collection", get(routes::list_documents))
.route("/documents/:collection/:id", get(routes::get_document))
.route("/ws", get(ws::ws_handler))
.with_state(self.state.clone());
let ota_api = Router::new()
.route("/ota/:peer_id", post(super::ota_routes::upload_firmware))
.route("/ota/:peer_id/status", get(super::ota_routes::ota_status))
.with_state(ota_state);
Router::new()
.nest("/api/v1", api)
.nest("/api/v1", ota_api)
.layer(
CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any),
)
.layer(TimeoutLayer::new(Duration::from_secs(
self.config.timeout_secs,
)))
.layer(TraceLayer::new_for_http())
}
pub async fn serve(self) -> Result<(), BrokerError> {
let router = self.build_router();
let addr = self.config.bind_addr;
tracing::info!("Starting mesh broker on {}", addr);
let listener = tokio::net::TcpListener::bind(addr)
.await
.map_err(|e| BrokerError(format!("failed to bind to {}: {}", addr, e)))?;
tracing::info!("Mesh broker listening on http://{}", addr);
axum::serve(listener, router)
.await
.map_err(|e| BrokerError(format!("broker server error: {}", e)))?;
Ok(())
}
}
#[derive(Debug, thiserror::Error)]
#[error("broker: {0}")]
pub struct BrokerError(pub String);
#[cfg(test)]
mod tests {
use super::*;
use crate::broker::state::{MeshEvent, MeshNodeInfo, PeerSummary, TopologySummary};
use axum::body::Body;
use http_body_util::BodyExt;
use serde_json::{json, Value};
use tokio::sync::broadcast;
use tower::ServiceExt;
struct MockState {
tx: broadcast::Sender<MeshEvent>,
}
impl MockState {
fn new() -> Self {
let (tx, _) = broadcast::channel(16);
Self { tx }
}
}
#[async_trait::async_trait]
impl MeshBrokerState for MockState {
fn node_info(&self) -> MeshNodeInfo {
MeshNodeInfo {
node_id: "test-node".into(),
uptime_secs: 100,
version: "0.1.0-test".into(),
}
}
async fn list_peers(&self) -> Vec<PeerSummary> {
vec![PeerSummary {
id: "peer-a".into(),
connected: true,
state: "active".into(),
rtt_ms: Some(12),
}]
}
async fn get_peer(&self, id: &str) -> Option<PeerSummary> {
if id == "peer-a" {
Some(PeerSummary {
id: "peer-a".into(),
connected: true,
state: "active".into(),
rtt_ms: Some(12),
})
} else {
None
}
}
fn topology(&self) -> TopologySummary {
TopologySummary {
peer_count: 1,
role: "leader".into(),
hierarchy_level: 0,
}
}
fn subscribe_events(&self) -> broadcast::Receiver<MeshEvent> {
self.tx.subscribe()
}
}
struct RichMockState {
tx: broadcast::Sender<MeshEvent>,
}
impl RichMockState {
fn new() -> Self {
let (tx, _) = broadcast::channel(16);
Self { tx }
}
}
#[async_trait::async_trait]
impl MeshBrokerState for RichMockState {
fn node_info(&self) -> MeshNodeInfo {
MeshNodeInfo {
node_id: "rich-node".into(),
uptime_secs: 9999,
version: "1.2.3".into(),
}
}
async fn list_peers(&self) -> Vec<PeerSummary> {
vec![
PeerSummary {
id: "peer-a".into(),
connected: true,
state: "active".into(),
rtt_ms: Some(12),
},
PeerSummary {
id: "peer-b".into(),
connected: false,
state: "disconnected".into(),
rtt_ms: None,
},
PeerSummary {
id: "peer-c".into(),
connected: true,
state: "syncing".into(),
rtt_ms: Some(150),
},
]
}
async fn get_peer(&self, id: &str) -> Option<PeerSummary> {
self.list_peers().await.into_iter().find(|p| p.id == id)
}
fn topology(&self) -> TopologySummary {
TopologySummary {
peer_count: 3,
role: "coordinator".into(),
hierarchy_level: 2,
}
}
fn subscribe_events(&self) -> broadcast::Receiver<MeshEvent> {
self.tx.subscribe()
}
async fn list_documents(&self, collection: &str) -> Option<Vec<Value>> {
match collection {
"sensors" => Some(vec![
json!({"id": "s1", "type": "temperature", "value": 22.5}),
json!({"id": "s2", "type": "humidity", "value": 65.0}),
]),
"empty" => Some(vec![]),
_ => None,
}
}
async fn get_document(&self, collection: &str, id: &str) -> Option<Value> {
self.list_documents(collection)
.await?
.into_iter()
.find(|d| d["id"].as_str() == Some(id))
}
}
fn make_broker() -> Broker {
Broker::new(Arc::new(MockState::new()))
}
fn make_rich_broker() -> Broker {
Broker::new(Arc::new(RichMockState::new()))
}
async fn get_json(router: &Router, uri: &str) -> (u16, Value) {
let req = axum::http::Request::builder()
.uri(uri)
.body(Body::empty())
.unwrap();
let resp = router.clone().oneshot(req).await.unwrap();
let status = resp.status().as_u16();
let body = resp.into_body().collect().await.unwrap().to_bytes();
let json: Value = serde_json::from_slice(&body).unwrap();
(status, json)
}
async fn get_status(router: &Router, uri: &str) -> u16 {
let req = axum::http::Request::builder()
.uri(uri)
.body(Body::empty())
.unwrap();
router.clone().oneshot(req).await.unwrap().status().as_u16()
}
#[test]
fn test_default_config() {
let config = BrokerConfig::default();
assert_eq!(config.bind_addr.port(), 8081);
assert_eq!(config.timeout_secs, 30);
}
#[test]
fn test_with_config() {
let custom = BrokerConfig {
bind_addr: "127.0.0.1:9090".parse().unwrap(),
timeout_secs: 10,
};
let broker = Broker::new(Arc::new(MockState::new())).with_config(custom.clone());
assert_eq!(broker.config.bind_addr.port(), 9090);
assert_eq!(broker.config.timeout_secs, 10);
}
#[test]
fn test_config_debug_and_clone() {
let config = BrokerConfig::default();
let cloned = config.clone();
assert_eq!(format!("{:?}", config), format!("{:?}", cloned));
}
#[tokio::test]
async fn test_health_endpoint() {
let router = make_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/health").await;
assert_eq!(status, 200);
assert_eq!(json["status"], "healthy");
assert_eq!(json["node_id"], "test-node");
}
#[tokio::test]
async fn test_readiness_endpoint_default_ready() {
let router = make_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/ready").await;
assert_eq!(status, 200);
assert_eq!(json["ready"], true);
assert_eq!(json["node_id"], "test-node");
assert!(json["checks"].as_array().unwrap().is_empty());
}
#[tokio::test]
async fn test_readiness_endpoint_not_ready() {
use crate::broker::state::{
MeshBrokerState, MeshEvent, MeshNodeInfo, PeerSummary, ReadinessCheck,
ReadinessResponse, TopologySummary,
};
struct NotReadyState {
tx: broadcast::Sender<MeshEvent>,
}
#[async_trait::async_trait]
impl MeshBrokerState for NotReadyState {
fn node_info(&self) -> MeshNodeInfo {
MeshNodeInfo {
node_id: "unready-node".into(),
uptime_secs: 0,
version: "0.1.0".into(),
}
}
async fn list_peers(&self) -> Vec<PeerSummary> {
vec![]
}
async fn get_peer(&self, _id: &str) -> Option<PeerSummary> {
None
}
fn topology(&self) -> TopologySummary {
TopologySummary {
peer_count: 0,
role: "none".into(),
hierarchy_level: 0,
}
}
fn subscribe_events(&self) -> broadcast::Receiver<MeshEvent> {
self.tx.subscribe()
}
fn readiness(&self) -> ReadinessResponse {
ReadinessResponse {
ready: false,
node_id: "unready-node".into(),
checks: vec![ReadinessCheck {
name: "transport".into(),
ready: false,
message: Some("not connected".into()),
}],
}
}
}
let (tx, _) = broadcast::channel(16);
let broker = Broker::new(Arc::new(NotReadyState { tx }));
let router = broker.build_router();
let (status, json) = get_json(&router, "/api/v1/ready").await;
assert_eq!(status, 503);
assert_eq!(json["ready"], false);
assert_eq!(json["node_id"], "unready-node");
assert_eq!(json["checks"][0]["name"], "transport");
assert_eq!(json["checks"][0]["message"], "not connected");
}
#[tokio::test]
async fn test_node_info_endpoint() {
let router = make_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/node").await;
assert_eq!(status, 200);
assert_eq!(json["node_id"], "test-node");
assert_eq!(json["uptime_secs"], 100);
assert_eq!(json["version"], "0.1.0-test");
}
#[tokio::test]
async fn test_node_info_rich() {
let router = make_rich_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/node").await;
assert_eq!(status, 200);
assert_eq!(json["node_id"], "rich-node");
assert_eq!(json["uptime_secs"], 9999);
assert_eq!(json["version"], "1.2.3");
}
#[tokio::test]
async fn test_list_peers_single() {
let router = make_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/peers").await;
assert_eq!(status, 200);
assert_eq!(json["count"], 1);
assert_eq!(json["peers"][0]["id"], "peer-a");
assert_eq!(json["peers"][0]["connected"], true);
assert_eq!(json["peers"][0]["rtt_ms"], 12);
}
#[tokio::test]
async fn test_list_peers_multiple() {
let router = make_rich_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/peers").await;
assert_eq!(status, 200);
assert_eq!(json["count"], 3);
let peers = json["peers"].as_array().unwrap();
assert_eq!(peers[0]["id"], "peer-a");
assert_eq!(peers[0]["connected"], true);
assert_eq!(peers[0]["rtt_ms"], 12);
assert_eq!(peers[1]["id"], "peer-b");
assert_eq!(peers[1]["connected"], false);
assert!(peers[1]["rtt_ms"].is_null());
assert_eq!(peers[2]["id"], "peer-c");
assert_eq!(peers[2]["state"], "syncing");
assert_eq!(peers[2]["rtt_ms"], 150);
}
#[tokio::test]
async fn test_get_peer_found() {
let router = make_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/peers/peer-a").await;
assert_eq!(status, 200);
assert_eq!(json["id"], "peer-a");
assert_eq!(json["connected"], true);
assert_eq!(json["state"], "active");
assert_eq!(json["rtt_ms"], 12);
}
#[tokio::test]
async fn test_get_peer_disconnected_with_null_rtt() {
let router = make_rich_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/peers/peer-b").await;
assert_eq!(status, 200);
assert_eq!(json["id"], "peer-b");
assert_eq!(json["connected"], false);
assert_eq!(json["state"], "disconnected");
assert!(json["rtt_ms"].is_null());
}
#[tokio::test]
async fn test_get_peer_not_found() {
let router = make_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/peers/no-such").await;
assert_eq!(status, 404);
assert_eq!(json["status"], 404);
assert!(json["error"].as_str().unwrap().contains("not found"));
}
#[tokio::test]
async fn test_topology_endpoint() {
let router = make_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/topology").await;
assert_eq!(status, 200);
assert_eq!(json["role"], "leader");
assert_eq!(json["peer_count"], 1);
assert_eq!(json["hierarchy_level"], 0);
}
#[tokio::test]
async fn test_topology_rich() {
let router = make_rich_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/topology").await;
assert_eq!(status, 200);
assert_eq!(json["role"], "coordinator");
assert_eq!(json["peer_count"], 3);
assert_eq!(json["hierarchy_level"], 2);
}
#[tokio::test]
async fn test_documents_not_implemented() {
let router = make_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/documents/test-col").await;
assert_eq!(status, 404);
assert!(json["error"].as_str().unwrap().contains("not available"));
}
#[tokio::test]
async fn test_list_documents_success() {
let router = make_rich_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/documents/sensors").await;
assert_eq!(status, 200);
assert_eq!(json["collection"], "sensors");
assert_eq!(json["count"], 2);
let docs = json["documents"].as_array().unwrap();
assert_eq!(docs[0]["id"], "s1");
assert_eq!(docs[0]["type"], "temperature");
assert_eq!(docs[0]["value"], 22.5);
assert_eq!(docs[1]["id"], "s2");
assert_eq!(docs[1]["type"], "humidity");
}
#[tokio::test]
async fn test_list_documents_empty_collection() {
let router = make_rich_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/documents/empty").await;
assert_eq!(status, 200);
assert_eq!(json["collection"], "empty");
assert_eq!(json["count"], 0);
assert_eq!(json["documents"].as_array().unwrap().len(), 0);
}
#[tokio::test]
async fn test_list_documents_unknown_collection() {
let router = make_rich_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/documents/unknown").await;
assert_eq!(status, 404);
assert!(json["error"].as_str().unwrap().contains("not available"));
}
#[tokio::test]
async fn test_get_document_success() {
let router = make_rich_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/documents/sensors/s1").await;
assert_eq!(status, 200);
assert_eq!(json["collection"], "sensors");
assert_eq!(json["id"], "s1");
assert_eq!(json["document"]["type"], "temperature");
assert_eq!(json["document"]["value"], 22.5);
}
#[tokio::test]
async fn test_get_document_not_found_wrong_id() {
let router = make_rich_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/documents/sensors/no-such").await;
assert_eq!(status, 404);
assert!(json["error"]
.as_str()
.unwrap()
.contains("document not found"));
}
#[tokio::test]
async fn test_get_document_not_found_wrong_collection() {
let router = make_rich_broker().build_router();
let (status, json) = get_json(&router, "/api/v1/documents/unknown/s1").await;
assert_eq!(status, 404);
assert!(json["error"]
.as_str()
.unwrap()
.contains("document not found"));
}
#[tokio::test]
async fn test_unknown_route_returns_404() {
let router = make_broker().build_router();
let status = get_status(&router, "/api/v1/nonexistent").await;
assert_eq!(status, 404);
}
#[tokio::test]
async fn test_root_returns_404() {
let router = make_broker().build_router();
let status = get_status(&router, "/").await;
assert_eq!(status, 404);
}
#[tokio::test]
async fn test_ws_streams_events() {
use futures_util::StreamExt;
let mock = Arc::new(MockState::new());
let tx = mock.tx.clone();
let broker = Broker::new(mock);
let router = broker.build_router();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, router).await.unwrap();
});
let (mut ws, _) = tokio_tungstenite::connect_async(format!("ws://{}/api/v1/ws", addr))
.await
.expect("ws connect failed");
tx.send(MeshEvent::PeerConnected {
peer_id: "peer-x".into(),
})
.unwrap();
let msg = tokio::time::timeout(Duration::from_secs(2), ws.next())
.await
.expect("timed out waiting for ws message")
.expect("stream ended")
.expect("ws error");
let text = msg.into_text().expect("expected text frame");
let event: Value = serde_json::from_str(&text).unwrap();
assert_eq!(event["type"], "PeerConnected");
assert_eq!(event["peer_id"], "peer-x");
tx.send(MeshEvent::TopologyChanged {
new_role: "follower".into(),
peer_count: 4,
})
.unwrap();
let msg = tokio::time::timeout(Duration::from_secs(2), ws.next())
.await
.expect("timed out")
.expect("stream ended")
.expect("ws error");
let text = msg.into_text().unwrap();
let event: Value = serde_json::from_str(&text).unwrap();
assert_eq!(event["type"], "TopologyChanged");
assert_eq!(event["new_role"], "follower");
assert_eq!(event["peer_count"], 4);
ws.close(None).await.ok();
}
#[tokio::test]
async fn test_ws_multiple_event_types() {
use futures_util::StreamExt;
let mock = Arc::new(MockState::new());
let tx = mock.tx.clone();
let broker = Broker::new(mock);
let router = broker.build_router();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, router).await.unwrap();
});
let (mut ws, _) = tokio_tungstenite::connect_async(format!("ws://{}/api/v1/ws", addr))
.await
.unwrap();
let events = vec![
MeshEvent::PeerConnected {
peer_id: "p1".into(),
},
MeshEvent::PeerDisconnected {
peer_id: "p2".into(),
reason: "timeout".into(),
},
MeshEvent::TopologyChanged {
new_role: "member".into(),
peer_count: 5,
},
MeshEvent::SyncEvent {
collection: "sensors".into(),
doc_id: "d1".into(),
action: "update".into(),
},
];
for event in &events {
tx.send(event.clone()).unwrap();
}
for expected_type in &[
"PeerConnected",
"PeerDisconnected",
"TopologyChanged",
"SyncEvent",
] {
let msg = tokio::time::timeout(Duration::from_secs(2), ws.next())
.await
.expect("timed out")
.expect("stream ended")
.expect("ws error");
let text = msg.into_text().unwrap();
let json: Value = serde_json::from_str(&text).unwrap();
assert_eq!(json["type"], *expected_type);
}
}
#[tokio::test]
async fn test_ws_sender_drop_closes_stream() {
use futures_util::StreamExt;
use std::sync::Mutex;
struct ClosableMockState {
tx: Mutex<Option<broadcast::Sender<MeshEvent>>>,
}
impl ClosableMockState {
fn new(tx: broadcast::Sender<MeshEvent>) -> Self {
Self {
tx: Mutex::new(Some(tx)),
}
}
fn close(&self) {
self.tx.lock().unwrap_or_else(|e| e.into_inner()).take();
}
}
#[async_trait::async_trait]
impl MeshBrokerState for ClosableMockState {
fn node_info(&self) -> MeshNodeInfo {
MeshNodeInfo {
node_id: "n".into(),
uptime_secs: 0,
version: "0".into(),
}
}
async fn list_peers(&self) -> Vec<PeerSummary> {
vec![]
}
async fn get_peer(&self, _id: &str) -> Option<PeerSummary> {
None
}
fn topology(&self) -> TopologySummary {
TopologySummary {
peer_count: 0,
role: "none".into(),
hierarchy_level: 0,
}
}
fn subscribe_events(&self) -> broadcast::Receiver<MeshEvent> {
let guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
match &*guard {
Some(tx) => tx.subscribe(),
None => {
let (tx, rx) = broadcast::channel(1);
drop(tx); rx
}
}
}
}
let (tx, _) = broadcast::channel::<MeshEvent>(16);
let mock = Arc::new(ClosableMockState::new(tx));
let mock_ref = Arc::clone(&mock);
let broker = Broker::new(mock);
let router = broker.build_router();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, router).await.unwrap();
});
let (mut ws, _) = tokio_tungstenite::connect_async(format!("ws://{}/api/v1/ws", addr))
.await
.unwrap();
mock_ref.close();
let result = tokio::time::timeout(Duration::from_secs(2), ws.next()).await;
match result {
Ok(Some(Ok(msg))) => {
assert!(
msg.is_close(),
"expected close or end of stream, got: {:?}",
msg
);
}
Ok(None) => {} Ok(Some(Err(_))) => {} Err(_) => panic!("timed out waiting for ws stream to close after sender drop"),
}
}
#[tokio::test]
async fn test_ws_client_sends_message_then_close() {
use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::tungstenite::Message as WsMessage;
let mock = Arc::new(MockState::new());
let tx = mock.tx.clone();
let broker = Broker::new(mock);
let router = broker.build_router();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, router).await.unwrap();
});
let (mut ws, _) = tokio_tungstenite::connect_async(format!("ws://{}/api/v1/ws", addr))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
ws.send(WsMessage::Text("hello from client".into()))
.await
.ok();
tokio::time::sleep(Duration::from_millis(50)).await;
tx.send(MeshEvent::PeerConnected {
peer_id: "after-msg".into(),
})
.unwrap();
let msg = tokio::time::timeout(Duration::from_secs(2), ws.next())
.await
.expect("timed out")
.expect("stream ended")
.expect("ws error");
let text = msg.into_text().unwrap();
let json: Value = serde_json::from_str(&text).unwrap();
assert_eq!(json["type"], "PeerConnected");
ws.close(None).await.ok();
}
#[tokio::test]
async fn test_ws_lagged_events() {
use futures_util::StreamExt;
struct TinyChannelMock {
tx: broadcast::Sender<MeshEvent>,
}
impl TinyChannelMock {
fn new() -> Self {
let (tx, _) = broadcast::channel(1);
Self { tx }
}
}
#[async_trait::async_trait]
impl MeshBrokerState for TinyChannelMock {
fn node_info(&self) -> MeshNodeInfo {
MeshNodeInfo {
node_id: "n".into(),
uptime_secs: 0,
version: "0".into(),
}
}
async fn list_peers(&self) -> Vec<PeerSummary> {
vec![]
}
async fn get_peer(&self, _id: &str) -> Option<PeerSummary> {
None
}
fn topology(&self) -> TopologySummary {
TopologySummary {
peer_count: 0,
role: "none".into(),
hierarchy_level: 0,
}
}
fn subscribe_events(&self) -> broadcast::Receiver<MeshEvent> {
self.tx.subscribe()
}
}
let mock = Arc::new(TinyChannelMock::new());
let tx = mock.tx.clone();
let broker = Broker::new(mock);
let router = broker.build_router();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, router).await.unwrap();
});
let (mut ws, _) = tokio_tungstenite::connect_async(format!("ws://{}/api/v1/ws", addr))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
for i in 0..10 {
let _ = tx.send(MeshEvent::PeerConnected {
peer_id: format!("p{}", i),
});
}
tokio::time::sleep(Duration::from_millis(100)).await;
tx.send(MeshEvent::TopologyChanged {
new_role: "after-lag".into(),
peer_count: 99,
})
.unwrap();
let msg = tokio::time::timeout(Duration::from_secs(2), ws.next()).await;
let _ = msg;
}
#[tokio::test]
async fn test_ws_client_drop_connection() {
let mock = Arc::new(MockState::new());
let broker = Broker::new(mock);
let router = broker.build_router();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, router).await.unwrap();
});
let (ws, _) = tokio_tungstenite::connect_async(format!("ws://{}/api/v1/ws", addr))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
drop(ws);
tokio::time::sleep(Duration::from_millis(200)).await;
}
#[tokio::test]
async fn test_ws_send_to_dropped_client() {
use futures_util::StreamExt;
let mock = Arc::new(MockState::new());
let tx = mock.tx.clone();
let broker = Broker::new(mock);
let router = broker.build_router();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, router).await.unwrap();
});
let (mut ws, _) = tokio_tungstenite::connect_async(format!("ws://{}/api/v1/ws", addr))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
tx.send(MeshEvent::PeerConnected {
peer_id: "warmup".into(),
})
.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(1), ws.next()).await;
for i in 0..20 {
let _ = tx.send(MeshEvent::PeerConnected {
peer_id: format!("flood-{}", i),
});
}
drop(ws);
tokio::time::sleep(Duration::from_millis(300)).await;
}
#[test]
fn test_broker_error_display() {
let err = BrokerError("bind failed".into());
assert_eq!(err.to_string(), "broker: bind failed");
}
#[test]
fn test_rich_mock_subscribe_events() {
let mock = RichMockState::new();
let _rx = mock.subscribe_events();
}
}