1use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use futures_util::{SinkExt, StreamExt};
16use tokio::sync::{broadcast, mpsc};
17use tokio_tungstenite::tungstenite::protocol::Message;
18use weave_contracts::{EdgeConfig, EdgeToServer, PatchOp, ServerToEdge};
19
20use super::cache;
21use super::device_control::{DeviceControlHook, NoopDeviceControl};
22use super::intent::Intent;
23use super::registry::GlyphRegistry;
24use super::routing::{RoutedIntent, RoutingEngine};
25
26const RECONNECT_INITIAL_DELAY: Duration = Duration::from_secs(2);
27const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(30);
28
29pub struct WsClient {
30 url: String,
31 edge_id: String,
32 version: String,
33 capabilities: Vec<String>,
34 engine: Arc<RoutingEngine>,
35 glyphs: Arc<GlyphRegistry>,
36 cache_path: PathBuf,
37 outbox_rx: mpsc::Receiver<EdgeToServer>,
38 outbox_tx: mpsc::Sender<EdgeToServer>,
39 resync_tx: broadcast::Sender<()>,
40 device_control: Arc<dyn DeviceControlHook>,
41 dispatch_tx: Option<mpsc::Sender<RoutedIntent>>,
48}
49
50impl WsClient {
51 pub fn new(
52 url: String,
53 edge_id: String,
54 version: String,
55 capabilities: Vec<String>,
56 engine: Arc<RoutingEngine>,
57 glyphs: Arc<GlyphRegistry>,
58 ) -> Self {
59 Self::with_device_control(
60 url,
61 edge_id,
62 version,
63 capabilities,
64 engine,
65 glyphs,
66 Arc::new(NoopDeviceControl),
67 )
68 }
69
70 pub fn with_device_control(
74 url: String,
75 edge_id: String,
76 version: String,
77 capabilities: Vec<String>,
78 engine: Arc<RoutingEngine>,
79 glyphs: Arc<GlyphRegistry>,
80 device_control: Arc<dyn DeviceControlHook>,
81 ) -> Self {
82 let cache_path = cache::default_cache_path(&edge_id);
83 let (outbox_tx, outbox_rx) = mpsc::channel(256);
84 let (resync_tx, _) = broadcast::channel(8);
87 Self {
88 url,
89 edge_id,
90 version,
91 capabilities,
92 engine,
93 glyphs,
94 cache_path,
95 outbox_rx,
96 outbox_tx,
97 resync_tx,
98 device_control,
99 dispatch_tx: None,
100 }
101 }
102
103 #[must_use]
110 pub fn with_intent_dispatcher(mut self, tx: mpsc::Sender<RoutedIntent>) -> Self {
111 self.dispatch_tx = Some(tx);
112 self
113 }
114
115 pub fn outbox(&self) -> mpsc::Sender<EdgeToServer> {
118 self.outbox_tx.clone()
119 }
120
121 pub fn resync_sender(&self) -> broadcast::Sender<()> {
130 self.resync_tx.clone()
131 }
132
133 pub async fn prime_from_cache(&self) -> anyhow::Result<()> {
136 if let Some(cfg) = cache::load(&self.cache_path).await? {
137 tracing::info!(
138 mappings = cfg.mappings.len(),
139 glyphs = cfg.glyphs.len(),
140 path = %self.cache_path.display(),
141 "primed routing engine from cache",
142 );
143 self.engine.replace_all(cfg.mappings).await;
144 self.glyphs.replace_all(cfg.glyphs).await;
145 }
146 Ok(())
147 }
148
149 pub async fn run(mut self) {
151 let mut delay = RECONNECT_INITIAL_DELAY;
152 loop {
153 match self.connect_once().await {
154 Ok(_) => {
155 tracing::info!("ws session ended cleanly; reconnecting");
156 delay = RECONNECT_INITIAL_DELAY;
157 }
158 Err(e) => {
159 tracing::warn!(error = %e, delay_secs = delay.as_secs(), "ws session failed");
160 }
161 }
162 tokio::time::sleep(delay).await;
163 delay = (delay * 2).min(RECONNECT_MAX_DELAY);
164 }
165 }
166
167 async fn connect_once(&mut self) -> anyhow::Result<()> {
168 let (ws, _) = tokio_tungstenite::connect_async(&self.url).await?;
169 tracing::info!(url = %self.url, "ws connected");
170 let (mut tx, mut rx) = ws.split();
171
172 let hello = EdgeToServer::Hello {
173 edge_id: self.edge_id.clone(),
174 version: self.version.clone(),
175 capabilities: self.capabilities.clone(),
176 };
177 tx.send(Message::Text(serde_json::to_string(&hello)?))
178 .await?;
179
180 let _ = self.resync_tx.send(());
184
185 loop {
186 tokio::select! {
187 incoming = rx.next() => {
188 let Some(msg) = incoming else { return Ok(()); };
189 let msg = msg?;
190 match msg {
191 Message::Text(t) => self.handle_server_frame(&t).await?,
192 Message::Binary(_) => continue,
193 Message::Ping(p) => tx.send(Message::Pong(p)).await?,
194 Message::Pong(_) => continue,
195 Message::Close(_) => return Ok(()),
196 Message::Frame(_) => continue,
197 }
198 }
199 outbound = self.outbox_rx.recv() => {
200 let Some(frame) = outbound else { return Ok(()); };
201 tx.send(Message::Text(serde_json::to_string(&frame)?)).await?;
202 }
203 }
204 }
205 }
206
207 async fn handle_server_frame(&self, text: &str) -> anyhow::Result<()> {
208 let frame: ServerToEdge = serde_json::from_str(text)?;
209 match frame {
210 ServerToEdge::ConfigFull { config } => {
211 tracing::info!(
212 mappings = config.mappings.len(),
213 glyphs = config.glyphs.len(),
214 edge_id = %config.edge_id,
215 "received config_full",
216 );
217 self.apply_full(&config).await;
218 let _ = cache::save(&self.cache_path, &config).await;
219 }
220 ServerToEdge::ConfigPatch {
221 mapping_id,
222 op,
223 mapping,
224 } => match op {
225 PatchOp::Upsert => {
226 if let Some(m) = mapping {
227 tracing::info!(
228 %mapping_id,
229 device = %m.device_id,
230 service = %m.service_type,
231 "config_patch upsert",
232 );
233 self.engine.upsert_mapping(m).await;
234 self.refresh_cache().await;
235 } else {
236 tracing::warn!(%mapping_id, "config_patch upsert without mapping payload; ignoring");
237 }
238 }
239 PatchOp::Delete => {
240 tracing::info!(%mapping_id, "config_patch delete");
241 self.engine.remove_mapping(&mapping_id).await;
242 self.refresh_cache().await;
243 }
244 },
245 ServerToEdge::TargetSwitch {
246 mapping_id,
247 service_target,
248 } => {
249 tracing::info!(%mapping_id, %service_target, "target_switch");
252 let mut current = self.engine.snapshot().await;
253 if let Some(idx) = current.iter().position(|m| m.mapping_id == mapping_id) {
254 current[idx].service_target = service_target;
255 self.engine.upsert_mapping(current.remove(idx)).await;
256 self.refresh_cache().await;
257 } else {
258 tracing::warn!(%mapping_id, "target_switch for unknown mapping");
259 }
260 }
261 ServerToEdge::GlyphsUpdate { glyphs } => {
262 tracing::info!(count = glyphs.len(), "received glyphs_update");
263 self.glyphs.replace_all(glyphs).await;
264 }
265 ServerToEdge::DisplayGlyph {
266 device_type,
267 device_id,
268 pattern,
269 brightness,
270 timeout_ms,
271 transition,
272 } => {
273 tracing::info!(
274 %device_type,
275 %device_id,
276 "display_glyph",
277 );
278 if let Err(e) = self
279 .device_control
280 .display_glyph(
281 &device_type,
282 &device_id,
283 &pattern,
284 brightness,
285 timeout_ms,
286 transition.as_deref(),
287 )
288 .await
289 {
290 tracing::warn!(
291 error = %e,
292 %device_type,
293 %device_id,
294 "display_glyph failed",
295 );
296 }
297 }
298 ServerToEdge::DeviceConnect {
299 device_type,
300 device_id,
301 } => {
302 tracing::info!(%device_type, %device_id, "device_connect");
303 if let Err(e) = self
304 .device_control
305 .connect_device(&device_type, &device_id)
306 .await
307 {
308 tracing::warn!(
309 error = %e,
310 %device_type,
311 %device_id,
312 "device_connect failed",
313 );
314 }
315 }
316 ServerToEdge::DeviceDisconnect {
317 device_type,
318 device_id,
319 } => {
320 tracing::info!(%device_type, %device_id, "device_disconnect");
321 if let Err(e) = self
322 .device_control
323 .disconnect_device(&device_type, &device_id)
324 .await
325 {
326 tracing::warn!(
327 error = %e,
328 %device_type,
329 %device_id,
330 "device_disconnect failed",
331 );
332 }
333 }
334 ServerToEdge::DispatchIntent {
335 service_type,
336 service_target,
337 intent,
338 params,
339 output_id,
340 } => {
341 let Some(dispatch_tx) = self.dispatch_tx.as_ref() else {
342 tracing::debug!(
343 %service_type,
344 target = %service_target,
345 intent = %intent,
346 "dispatch_intent: no dispatcher wired on this edge; dropping",
347 );
348 return Ok(());
349 };
350 let intent_obj = match Intent::reassemble(&intent, ¶ms) {
351 Ok(i) => i,
352 Err(e) => {
353 tracing::warn!(
354 error = %e,
355 %service_type,
356 target = %service_target,
357 %intent,
358 "dispatch_intent: failed to reassemble intent payload",
359 );
360 return Ok(());
361 }
362 };
363 let _ = output_id; let routed = RoutedIntent {
365 service_type,
366 service_target,
367 intent: intent_obj,
368 };
369 if let Err(e) = dispatch_tx.try_send(routed) {
370 tracing::warn!(error = %e, "dispatch_intent: dispatcher full or closed; dropping");
371 }
372 }
373 ServerToEdge::Ping => {
374 let _ = self.outbox_tx.try_send(EdgeToServer::Pong);
377 }
378 }
379 Ok(())
380 }
381
382 async fn apply_full(&self, config: &EdgeConfig) {
383 self.engine.replace_all(config.mappings.clone()).await;
384 self.glyphs.replace_all(config.glyphs.clone()).await;
385 }
386
387 async fn refresh_cache(&self) {
391 let mappings = self.engine.snapshot().await;
392 let edge_id = self.edge_id.clone();
396 let glyphs = match cache::load(&self.cache_path).await {
397 Ok(Some(cfg)) => cfg.glyphs,
398 _ => Vec::new(),
399 };
400 let cfg = EdgeConfig {
401 edge_id,
402 mappings,
403 glyphs,
404 };
405 if let Err(e) = cache::save(&self.cache_path, &cfg).await {
406 tracing::warn!(error = %e, "failed to persist cache after patch");
407 }
408 }
409}