1use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use futures_util::{SinkExt, StreamExt};
16use tokio::sync::{broadcast, mpsc};
17use tokio_tungstenite::tungstenite::protocol::Message;
18use weave_contracts::{EdgeConfig, EdgeToServer, PatchOp, ServerToEdge};
19
20use super::cache;
21use super::device_control::{DeviceControlHook, NoopDeviceControl};
22use super::intent::Intent;
23use super::registry::GlyphRegistry;
24use super::routing::{RoutedIntent, RoutingEngine};
25
26const RECONNECT_INITIAL_DELAY: Duration = Duration::from_secs(2);
27const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(30);
28
29pub struct WsClient {
30 url: String,
31 edge_id: String,
32 version: String,
33 capabilities: Vec<String>,
34 engine: Arc<RoutingEngine>,
35 glyphs: Arc<GlyphRegistry>,
36 cache_path: PathBuf,
37 outbox_rx: mpsc::Receiver<EdgeToServer>,
38 outbox_tx: mpsc::Sender<EdgeToServer>,
39 resync_tx: broadcast::Sender<()>,
40 device_control: Arc<dyn DeviceControlHook>,
41 dispatch_tx: Option<mpsc::Sender<RoutedIntent>>,
48}
49
50impl WsClient {
51 pub fn new(
52 url: String,
53 edge_id: String,
54 version: String,
55 capabilities: Vec<String>,
56 engine: Arc<RoutingEngine>,
57 glyphs: Arc<GlyphRegistry>,
58 ) -> Self {
59 Self::with_device_control(
60 url,
61 edge_id,
62 version,
63 capabilities,
64 engine,
65 glyphs,
66 Arc::new(NoopDeviceControl),
67 )
68 }
69
70 pub fn with_device_control(
74 url: String,
75 edge_id: String,
76 version: String,
77 capabilities: Vec<String>,
78 engine: Arc<RoutingEngine>,
79 glyphs: Arc<GlyphRegistry>,
80 device_control: Arc<dyn DeviceControlHook>,
81 ) -> Self {
82 let cache_path = cache::default_cache_path(&edge_id);
83 let (outbox_tx, outbox_rx) = mpsc::channel(256);
84 let (resync_tx, _) = broadcast::channel(8);
87 Self {
88 url,
89 edge_id,
90 version,
91 capabilities,
92 engine,
93 glyphs,
94 cache_path,
95 outbox_rx,
96 outbox_tx,
97 resync_tx,
98 device_control,
99 dispatch_tx: None,
100 }
101 }
102
103 #[must_use]
110 pub fn with_intent_dispatcher(mut self, tx: mpsc::Sender<RoutedIntent>) -> Self {
111 self.dispatch_tx = Some(tx);
112 self
113 }
114
115 pub fn outbox(&self) -> mpsc::Sender<EdgeToServer> {
118 self.outbox_tx.clone()
119 }
120
121 pub fn resync_sender(&self) -> broadcast::Sender<()> {
130 self.resync_tx.clone()
131 }
132
133 pub async fn prime_from_cache(&self) -> anyhow::Result<()> {
136 if let Some(cfg) = cache::load(&self.cache_path).await? {
137 tracing::info!(
138 mappings = cfg.mappings.len(),
139 glyphs = cfg.glyphs.len(),
140 path = %self.cache_path.display(),
141 "primed routing engine from cache",
142 );
143 self.engine.replace_all(cfg.mappings).await;
144 self.glyphs.replace_all(cfg.glyphs).await;
145 }
146 Ok(())
147 }
148
149 pub async fn run(mut self) {
151 let mut delay = RECONNECT_INITIAL_DELAY;
152 loop {
153 match self.connect_once().await {
154 Ok(_) => {
155 tracing::info!("ws session ended cleanly; reconnecting");
156 delay = RECONNECT_INITIAL_DELAY;
157 }
158 Err(e) => {
159 tracing::warn!(error = %e, delay_secs = delay.as_secs(), "ws session failed");
160 }
161 }
162 tokio::time::sleep(delay).await;
163 delay = (delay * 2).min(RECONNECT_MAX_DELAY);
164 }
165 }
166
167 async fn connect_once(&mut self) -> anyhow::Result<()> {
168 let (ws, _) = tokio_tungstenite::connect_async(&self.url).await?;
169 tracing::info!(url = %self.url, "ws connected");
170 let (mut tx, mut rx) = ws.split();
171
172 let hello = EdgeToServer::Hello {
173 edge_id: self.edge_id.clone(),
174 version: self.version.clone(),
175 capabilities: self.capabilities.clone(),
176 };
177 tx.send(Message::Text(serde_json::to_string(&hello)?))
178 .await?;
179
180 let _ = self.resync_tx.send(());
184
185 loop {
186 tokio::select! {
187 incoming = rx.next() => {
188 let Some(msg) = incoming else { return Ok(()); };
189 let msg = msg?;
190 match msg {
191 Message::Text(t) => self.handle_server_frame(&t).await?,
192 Message::Binary(_) => continue,
193 Message::Ping(p) => tx.send(Message::Pong(p)).await?,
194 Message::Pong(_) => continue,
195 Message::Close(_) => return Ok(()),
196 Message::Frame(_) => continue,
197 }
198 }
199 outbound = self.outbox_rx.recv() => {
200 let Some(frame) = outbound else { return Ok(()); };
201 tx.send(Message::Text(serde_json::to_string(&frame)?)).await?;
202 }
203 }
204 }
205 }
206
207 async fn handle_server_frame(&self, text: &str) -> anyhow::Result<()> {
208 let frame: ServerToEdge = serde_json::from_str(text)?;
209 match frame {
210 ServerToEdge::ConfigFull { config } => {
211 tracing::info!(
212 mappings = config.mappings.len(),
213 glyphs = config.glyphs.len(),
214 edge_id = %config.edge_id,
215 "received config_full",
216 );
217 self.apply_full(&config).await;
218 let _ = cache::save(&self.cache_path, &config).await;
219 }
220 ServerToEdge::ConfigPatch {
221 mapping_id,
222 op,
223 mapping,
224 } => match op {
225 PatchOp::Upsert => {
226 if let Some(m) = mapping {
227 tracing::info!(
228 %mapping_id,
229 device = %m.device_id,
230 service = %m.service_type,
231 "config_patch upsert",
232 );
233 self.engine.upsert_mapping(m).await;
234 self.refresh_cache().await;
235 } else {
236 tracing::warn!(%mapping_id, "config_patch upsert without mapping payload; ignoring");
237 }
238 }
239 PatchOp::Delete => {
240 tracing::info!(%mapping_id, "config_patch delete");
241 self.engine.remove_mapping(&mapping_id).await;
242 self.refresh_cache().await;
243 }
244 },
245 ServerToEdge::TargetSwitch {
246 mapping_id,
247 service_target,
248 } => {
249 tracing::info!(%mapping_id, %service_target, "target_switch");
252 let mut current = self.engine.snapshot().await;
253 if let Some(idx) = current.iter().position(|m| m.mapping_id == mapping_id) {
254 current[idx].service_target = service_target;
255 self.engine.upsert_mapping(current.remove(idx)).await;
256 self.refresh_cache().await;
257 } else {
258 tracing::warn!(%mapping_id, "target_switch for unknown mapping");
259 }
260 }
261 ServerToEdge::GlyphsUpdate { glyphs } => {
262 tracing::info!(count = glyphs.len(), "received glyphs_update");
263 self.glyphs.replace_all(glyphs).await;
264 }
265 ServerToEdge::DisplayGlyph {
266 device_type,
267 device_id,
268 pattern,
269 brightness,
270 timeout_ms,
271 transition,
272 } => {
273 tracing::info!(
274 %device_type,
275 %device_id,
276 "display_glyph",
277 );
278 if let Err(e) = self
279 .device_control
280 .display_glyph(
281 &device_type,
282 &device_id,
283 &pattern,
284 brightness,
285 timeout_ms,
286 transition.as_deref(),
287 )
288 .await
289 {
290 tracing::warn!(
291 error = %e,
292 %device_type,
293 %device_id,
294 "display_glyph failed",
295 );
296 }
297 }
298 ServerToEdge::DeviceConnect {
299 device_type,
300 device_id,
301 } => {
302 tracing::info!(%device_type, %device_id, "device_connect");
303 if let Err(e) = self
304 .device_control
305 .connect_device(&device_type, &device_id)
306 .await
307 {
308 tracing::warn!(
309 error = %e,
310 %device_type,
311 %device_id,
312 "device_connect failed",
313 );
314 }
315 }
316 ServerToEdge::DeviceDisconnect {
317 device_type,
318 device_id,
319 } => {
320 tracing::info!(%device_type, %device_id, "device_disconnect");
321 if let Err(e) = self
322 .device_control
323 .disconnect_device(&device_type, &device_id)
324 .await
325 {
326 tracing::warn!(
327 error = %e,
328 %device_type,
329 %device_id,
330 "device_disconnect failed",
331 );
332 }
333 }
334 ServerToEdge::DispatchIntent {
335 service_type,
336 service_target,
337 intent,
338 params,
339 output_id,
340 } => {
341 let Some(dispatch_tx) = self.dispatch_tx.as_ref() else {
342 tracing::debug!(
343 %service_type,
344 target = %service_target,
345 intent = %intent,
346 "dispatch_intent: no dispatcher wired on this edge; dropping",
347 );
348 return Ok(());
349 };
350 let intent_obj = match Intent::reassemble(&intent, ¶ms) {
351 Ok(i) => i,
352 Err(e) => {
353 tracing::warn!(
354 error = %e,
355 %service_type,
356 target = %service_target,
357 %intent,
358 "dispatch_intent: failed to reassemble intent payload",
359 );
360 return Ok(());
361 }
362 };
363 let _ = output_id; let routed = RoutedIntent {
365 service_type,
366 service_target,
367 intent: intent_obj,
368 };
369 if let Err(e) = dispatch_tx.try_send(routed) {
370 tracing::warn!(error = %e, "dispatch_intent: dispatcher full or closed; dropping");
371 }
372 }
373 ServerToEdge::Ping => {
374 let _ = self.outbox_tx.try_send(EdgeToServer::Pong);
377 }
378 ServerToEdge::DeviceCyclePatch { cycle, op } => match op {
379 weave_contracts::PatchOp::Upsert => {
380 tracing::info!(
381 device_type = %cycle.device_type,
382 device_id = %cycle.device_id,
383 active = ?cycle.active_mapping_id,
384 gesture = ?cycle.cycle_gesture,
385 "device_cycle_patch upsert"
386 );
387 self.engine.upsert_cycle(cycle).await;
388 self.refresh_cache().await;
389 }
390 weave_contracts::PatchOp::Delete => {
391 tracing::info!(
392 device_type = %cycle.device_type,
393 device_id = %cycle.device_id,
394 "device_cycle_patch delete"
395 );
396 self.engine
397 .remove_cycle(&cycle.device_type, &cycle.device_id)
398 .await;
399 self.refresh_cache().await;
400 }
401 },
402 ServerToEdge::SwitchActiveConnection {
403 device_type,
404 device_id,
405 active_mapping_id,
406 } => {
407 tracing::info!(
408 %device_type,
409 %device_id,
410 %active_mapping_id,
411 "switch_active_connection from server"
412 );
413 let applied = self
414 .engine
415 .set_cycle_active(&device_type, &device_id, active_mapping_id)
416 .await;
417 if !applied {
418 tracing::warn!(
419 %device_type, %device_id, %active_mapping_id,
420 "switch_active_connection: cycle missing or active not in mapping_ids — ignoring"
421 );
422 }
423 self.refresh_cache().await;
424 }
425 ServerToEdge::ServiceState {
426 edge_id: source_edge,
427 service_type,
428 target,
429 property,
430 ..
431 } => {
432 tracing::debug!(
440 %source_edge,
441 %service_type,
442 %target,
443 %property,
444 "service_state echo from peer edge (Linux/Mac feedback echo deferred)"
445 );
446 }
447 }
448 Ok(())
449 }
450
451 async fn apply_full(&self, config: &EdgeConfig) {
452 self.engine.replace_all(config.mappings.clone()).await;
453 self.engine
454 .replace_cycles(config.device_cycles.clone())
455 .await;
456 self.glyphs.replace_all(config.glyphs.clone()).await;
457 }
458
459 async fn refresh_cache(&self) {
463 let mappings = self.engine.snapshot().await;
464 let device_cycles = self.engine.cycles_snapshot().await;
465 let edge_id = self.edge_id.clone();
469 let glyphs = match cache::load(&self.cache_path).await {
470 Ok(Some(cfg)) => cfg.glyphs,
471 _ => Vec::new(),
472 };
473 let cfg = EdgeConfig {
474 edge_id,
475 mappings,
476 glyphs,
477 device_cycles,
478 };
479 if let Err(e) = cache::save(&self.cache_path, &cfg).await {
480 tracing::warn!(error = %e, "failed to persist cache after patch");
481 }
482 }
483}