use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use futures_util::{SinkExt, StreamExt};
use tokio::sync::{broadcast, mpsc};
use tokio_tungstenite::tungstenite::protocol::Message;
use weave_contracts::{EdgeConfig, EdgeToServer, PatchOp, ServerToEdge};
use super::cache;
use super::registry::GlyphRegistry;
use super::routing::RoutingEngine;
const RECONNECT_INITIAL_DELAY: Duration = Duration::from_secs(2);
const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(30);
pub struct WsClient {
url: String,
edge_id: String,
version: String,
capabilities: Vec<String>,
engine: Arc<RoutingEngine>,
glyphs: Arc<GlyphRegistry>,
cache_path: PathBuf,
outbox_rx: mpsc::Receiver<EdgeToServer>,
outbox_tx: mpsc::Sender<EdgeToServer>,
resync_tx: broadcast::Sender<()>,
}
impl WsClient {
pub fn new(
url: String,
edge_id: String,
version: String,
capabilities: Vec<String>,
engine: Arc<RoutingEngine>,
glyphs: Arc<GlyphRegistry>,
) -> Self {
let cache_path = cache::default_cache_path(&edge_id);
let (outbox_tx, outbox_rx) = mpsc::channel(256);
let (resync_tx, _) = broadcast::channel(8);
Self {
url,
edge_id,
version,
capabilities,
engine,
glyphs,
cache_path,
outbox_rx,
outbox_tx,
resync_tx,
}
}
pub fn outbox(&self) -> mpsc::Sender<EdgeToServer> {
self.outbox_tx.clone()
}
pub fn resync_sender(&self) -> broadcast::Sender<()> {
self.resync_tx.clone()
}
pub async fn prime_from_cache(&self) -> anyhow::Result<()> {
if let Some(cfg) = cache::load(&self.cache_path).await? {
tracing::info!(
mappings = cfg.mappings.len(),
glyphs = cfg.glyphs.len(),
path = %self.cache_path.display(),
"primed routing engine from cache",
);
self.engine.replace_all(cfg.mappings).await;
self.glyphs.replace_all(cfg.glyphs).await;
}
Ok(())
}
pub async fn run(mut self) {
let mut delay = RECONNECT_INITIAL_DELAY;
loop {
match self.connect_once().await {
Ok(_) => {
tracing::info!("ws session ended cleanly; reconnecting");
delay = RECONNECT_INITIAL_DELAY;
}
Err(e) => {
tracing::warn!(error = %e, delay_secs = delay.as_secs(), "ws session failed");
}
}
tokio::time::sleep(delay).await;
delay = (delay * 2).min(RECONNECT_MAX_DELAY);
}
}
async fn connect_once(&mut self) -> anyhow::Result<()> {
let (ws, _) = tokio_tungstenite::connect_async(&self.url).await?;
tracing::info!(url = %self.url, "ws connected");
let (mut tx, mut rx) = ws.split();
let hello = EdgeToServer::Hello {
edge_id: self.edge_id.clone(),
version: self.version.clone(),
capabilities: self.capabilities.clone(),
};
tx.send(Message::Text(serde_json::to_string(&hello)?))
.await?;
let _ = self.resync_tx.send(());
loop {
tokio::select! {
incoming = rx.next() => {
let Some(msg) = incoming else { return Ok(()); };
let msg = msg?;
match msg {
Message::Text(t) => self.handle_server_frame(&t).await?,
Message::Binary(_) => continue,
Message::Ping(p) => tx.send(Message::Pong(p)).await?,
Message::Pong(_) => continue,
Message::Close(_) => return Ok(()),
Message::Frame(_) => continue,
}
}
outbound = self.outbox_rx.recv() => {
let Some(frame) = outbound else { return Ok(()); };
tx.send(Message::Text(serde_json::to_string(&frame)?)).await?;
}
}
}
}
async fn handle_server_frame(&self, text: &str) -> anyhow::Result<()> {
let frame: ServerToEdge = serde_json::from_str(text)?;
match frame {
ServerToEdge::ConfigFull { config } => {
tracing::info!(
mappings = config.mappings.len(),
glyphs = config.glyphs.len(),
edge_id = %config.edge_id,
"received config_full",
);
self.apply_full(&config).await;
let _ = cache::save(&self.cache_path, &config).await;
}
ServerToEdge::ConfigPatch {
mapping_id,
op,
mapping,
} => match op {
PatchOp::Upsert => {
if let Some(m) = mapping {
tracing::info!(
%mapping_id,
device = %m.device_id,
service = %m.service_type,
"config_patch upsert",
);
self.engine.upsert_mapping(m).await;
self.refresh_cache().await;
} else {
tracing::warn!(%mapping_id, "config_patch upsert without mapping payload; ignoring");
}
}
PatchOp::Delete => {
tracing::info!(%mapping_id, "config_patch delete");
self.engine.remove_mapping(&mapping_id).await;
self.refresh_cache().await;
}
},
ServerToEdge::TargetSwitch {
mapping_id,
service_target,
} => {
tracing::info!(%mapping_id, %service_target, "target_switch");
let mut current = self.engine.snapshot().await;
if let Some(idx) = current.iter().position(|m| m.mapping_id == mapping_id) {
current[idx].service_target = service_target;
self.engine.upsert_mapping(current.remove(idx)).await;
self.refresh_cache().await;
} else {
tracing::warn!(%mapping_id, "target_switch for unknown mapping");
}
}
ServerToEdge::GlyphsUpdate { glyphs } => {
tracing::info!(count = glyphs.len(), "received glyphs_update");
self.glyphs.replace_all(glyphs).await;
}
ServerToEdge::Ping => {
let _ = self.outbox_tx.try_send(EdgeToServer::Pong);
}
}
Ok(())
}
async fn apply_full(&self, config: &EdgeConfig) {
self.engine.replace_all(config.mappings.clone()).await;
self.glyphs.replace_all(config.glyphs.clone()).await;
}
async fn refresh_cache(&self) {
let mappings = self.engine.snapshot().await;
let edge_id = self.edge_id.clone();
let glyphs = match cache::load(&self.cache_path).await {
Ok(Some(cfg)) => cfg.glyphs,
_ => Vec::new(),
};
let cfg = EdgeConfig {
edge_id,
mappings,
glyphs,
};
if let Err(e) = cache::save(&self.cache_path, &cfg).await {
tracing::warn!(error = %e, "failed to persist cache after patch");
}
}
}