1use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use futures_util::{SinkExt, StreamExt};
16use tokio::sync::{broadcast, mpsc};
17use tokio_tungstenite::tungstenite::protocol::Message;
18use weave_contracts::{EdgeConfig, EdgeToServer, PatchOp, ServerToEdge};
19
20use super::cache;
21use super::registry::GlyphRegistry;
22use super::routing::RoutingEngine;
23
/// Delay used before the first reconnect attempt, and the value the backoff
/// is reset to; the delay is doubled after every attempt.
const RECONNECT_INITIAL_DELAY: Duration = Duration::from_secs(2);

/// Upper bound the doubling reconnect delay is clamped to.
const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(30);
26
27pub struct WsClient {
28 url: String,
29 edge_id: String,
30 version: String,
31 capabilities: Vec<String>,
32 engine: Arc<RoutingEngine>,
33 glyphs: Arc<GlyphRegistry>,
34 cache_path: PathBuf,
35 outbox_rx: mpsc::Receiver<EdgeToServer>,
36 outbox_tx: mpsc::Sender<EdgeToServer>,
37 resync_tx: broadcast::Sender<()>,
38}
39
40impl WsClient {
41 pub fn new(
42 url: String,
43 edge_id: String,
44 version: String,
45 capabilities: Vec<String>,
46 engine: Arc<RoutingEngine>,
47 glyphs: Arc<GlyphRegistry>,
48 ) -> Self {
49 let cache_path = cache::default_cache_path(&edge_id);
50 let (outbox_tx, outbox_rx) = mpsc::channel(256);
51 let (resync_tx, _) = broadcast::channel(8);
54 Self {
55 url,
56 edge_id,
57 version,
58 capabilities,
59 engine,
60 glyphs,
61 cache_path,
62 outbox_rx,
63 outbox_tx,
64 resync_tx,
65 }
66 }
67
68 pub fn outbox(&self) -> mpsc::Sender<EdgeToServer> {
71 self.outbox_tx.clone()
72 }
73
74 pub fn resync_sender(&self) -> broadcast::Sender<()> {
83 self.resync_tx.clone()
84 }
85
86 pub async fn prime_from_cache(&self) -> anyhow::Result<()> {
89 if let Some(cfg) = cache::load(&self.cache_path).await? {
90 tracing::info!(
91 mappings = cfg.mappings.len(),
92 glyphs = cfg.glyphs.len(),
93 path = %self.cache_path.display(),
94 "primed routing engine from cache",
95 );
96 self.engine.replace_all(cfg.mappings).await;
97 self.glyphs.replace_all(cfg.glyphs).await;
98 }
99 Ok(())
100 }
101
102 pub async fn run(mut self) {
104 let mut delay = RECONNECT_INITIAL_DELAY;
105 loop {
106 match self.connect_once().await {
107 Ok(_) => {
108 tracing::info!("ws session ended cleanly; reconnecting");
109 delay = RECONNECT_INITIAL_DELAY;
110 }
111 Err(e) => {
112 tracing::warn!(error = %e, delay_secs = delay.as_secs(), "ws session failed");
113 }
114 }
115 tokio::time::sleep(delay).await;
116 delay = (delay * 2).min(RECONNECT_MAX_DELAY);
117 }
118 }
119
120 async fn connect_once(&mut self) -> anyhow::Result<()> {
121 let (ws, _) = tokio_tungstenite::connect_async(&self.url).await?;
122 tracing::info!(url = %self.url, "ws connected");
123 let (mut tx, mut rx) = ws.split();
124
125 let hello = EdgeToServer::Hello {
126 edge_id: self.edge_id.clone(),
127 version: self.version.clone(),
128 capabilities: self.capabilities.clone(),
129 };
130 tx.send(Message::Text(serde_json::to_string(&hello)?))
131 .await?;
132
133 let _ = self.resync_tx.send(());
137
138 loop {
139 tokio::select! {
140 incoming = rx.next() => {
141 let Some(msg) = incoming else { return Ok(()); };
142 let msg = msg?;
143 match msg {
144 Message::Text(t) => self.handle_server_frame(&t).await?,
145 Message::Binary(_) => continue,
146 Message::Ping(p) => tx.send(Message::Pong(p)).await?,
147 Message::Pong(_) => continue,
148 Message::Close(_) => return Ok(()),
149 Message::Frame(_) => continue,
150 }
151 }
152 outbound = self.outbox_rx.recv() => {
153 let Some(frame) = outbound else { return Ok(()); };
154 tx.send(Message::Text(serde_json::to_string(&frame)?)).await?;
155 }
156 }
157 }
158 }
159
160 async fn handle_server_frame(&self, text: &str) -> anyhow::Result<()> {
161 let frame: ServerToEdge = serde_json::from_str(text)?;
162 match frame {
163 ServerToEdge::ConfigFull { config } => {
164 tracing::info!(
165 mappings = config.mappings.len(),
166 glyphs = config.glyphs.len(),
167 edge_id = %config.edge_id,
168 "received config_full",
169 );
170 self.apply_full(&config).await;
171 let _ = cache::save(&self.cache_path, &config).await;
172 }
173 ServerToEdge::ConfigPatch {
174 mapping_id,
175 op,
176 mapping,
177 } => match op {
178 PatchOp::Upsert => {
179 if let Some(m) = mapping {
180 tracing::info!(
181 %mapping_id,
182 device = %m.device_id,
183 service = %m.service_type,
184 "config_patch upsert",
185 );
186 self.engine.upsert_mapping(m).await;
187 self.refresh_cache().await;
188 } else {
189 tracing::warn!(%mapping_id, "config_patch upsert without mapping payload; ignoring");
190 }
191 }
192 PatchOp::Delete => {
193 tracing::info!(%mapping_id, "config_patch delete");
194 self.engine.remove_mapping(&mapping_id).await;
195 self.refresh_cache().await;
196 }
197 },
198 ServerToEdge::TargetSwitch {
199 mapping_id,
200 service_target,
201 } => {
202 tracing::info!(%mapping_id, %service_target, "target_switch");
205 let mut current = self.engine.snapshot().await;
206 if let Some(idx) = current.iter().position(|m| m.mapping_id == mapping_id) {
207 current[idx].service_target = service_target;
208 self.engine.upsert_mapping(current.remove(idx)).await;
209 self.refresh_cache().await;
210 } else {
211 tracing::warn!(%mapping_id, "target_switch for unknown mapping");
212 }
213 }
214 ServerToEdge::GlyphsUpdate { glyphs } => {
215 tracing::info!(count = glyphs.len(), "received glyphs_update");
216 self.glyphs.replace_all(glyphs).await;
217 }
218 ServerToEdge::Ping => {
219 let _ = self.outbox_tx.try_send(EdgeToServer::Pong);
222 }
223 }
224 Ok(())
225 }
226
227 async fn apply_full(&self, config: &EdgeConfig) {
228 self.engine.replace_all(config.mappings.clone()).await;
229 self.glyphs.replace_all(config.glyphs.clone()).await;
230 }
231
232 async fn refresh_cache(&self) {
236 let mappings = self.engine.snapshot().await;
237 let edge_id = self.edge_id.clone();
241 let glyphs = match cache::load(&self.cache_path).await {
242 Ok(Some(cfg)) => cfg.glyphs,
243 _ => Vec::new(),
244 };
245 let cfg = EdgeConfig {
246 edge_id,
247 mappings,
248 glyphs,
249 };
250 if let Err(e) = cache::save(&self.cache_path, &cfg).await {
251 tracing::warn!(error = %e, "failed to persist cache after patch");
252 }
253 }
254}