Skip to main content

edge_core/
ws_client.rs

//! WebSocket client for `weave-server`'s `/ws/edge` endpoint.
//!
//! Runs a long-lived reconnect loop: after connecting it sends a `Hello`
//! frame, reads frames from the server (updating the `RoutingEngine` and the
//! local cache on `ConfigFull`/`ConfigPatch`), and forwards outbound
//! `EdgeToServer` frames produced by adapters.
//!
//! When the server is unreachable, the agent loads the cached config so
//! local routing keeps working while the reconnect loop retries in the
//! background.
10
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use futures_util::{SinkExt, StreamExt};
16use tokio::sync::{broadcast, mpsc};
17use tokio_tungstenite::tungstenite::protocol::Message;
18use weave_contracts::{EdgeConfig, EdgeToServer, PatchOp, ServerToEdge};
19
20use super::cache;
21use super::device_control::{DeviceControlHook, NoopDeviceControl};
22use super::intent::Intent;
23use super::registry::GlyphRegistry;
24use super::routing::{RoutedIntent, RoutingEngine};
25
/// Delay before the first reconnect retry; the exponential backoff in
/// `WsClient::run` starts from (and resets to) this value.
const RECONNECT_INITIAL_DELAY: Duration = Duration::from_millis(2_000);
/// Ceiling for the exponential reconnect backoff.
const RECONNECT_MAX_DELAY: Duration = Duration::from_millis(30_000);
28
29pub struct WsClient {
30    url: String,
31    edge_id: String,
32    version: String,
33    capabilities: Vec<String>,
34    engine: Arc<RoutingEngine>,
35    glyphs: Arc<GlyphRegistry>,
36    cache_path: PathBuf,
37    outbox_rx: mpsc::Receiver<EdgeToServer>,
38    outbox_tx: mpsc::Sender<EdgeToServer>,
39    resync_tx: broadcast::Sender<()>,
40    device_control: Arc<dyn DeviceControlHook>,
41    /// Optional sender into the edge-agent's dispatcher queue. Set via
42    /// `with_intent_dispatcher` when the host wants to handle
43    /// `ServerToEdge::DispatchIntent` (cross-edge intent forwarding).
44    /// `None` causes incoming forwarded intents to be logged and
45    /// dropped — appropriate for hosts that don't have any service
46    /// adapter the server might forward into.
47    dispatch_tx: Option<mpsc::Sender<RoutedIntent>>,
48}
49
50impl WsClient {
51    pub fn new(
52        url: String,
53        edge_id: String,
54        version: String,
55        capabilities: Vec<String>,
56        engine: Arc<RoutingEngine>,
57        glyphs: Arc<GlyphRegistry>,
58    ) -> Self {
59        Self::with_device_control(
60            url,
61            edge_id,
62            version,
63            capabilities,
64            engine,
65            glyphs,
66            Arc::new(NoopDeviceControl),
67        )
68    }
69
70    /// Construct a `WsClient` with a host-supplied `DeviceControlHook`. The
71    /// edge-agent binary wires this to its per-device map so weave-server
72    /// can drive Connect / Disconnect / Test-LED via the WS protocol.
73    pub fn with_device_control(
74        url: String,
75        edge_id: String,
76        version: String,
77        capabilities: Vec<String>,
78        engine: Arc<RoutingEngine>,
79        glyphs: Arc<GlyphRegistry>,
80        device_control: Arc<dyn DeviceControlHook>,
81    ) -> Self {
82        let cache_path = cache::default_cache_path(&edge_id);
83        let (outbox_tx, outbox_rx) = mpsc::channel(256);
84        // Small buffer is fine — subscribers only care about the latest
85        // reconnect event; lagged ones can be dropped silently.
86        let (resync_tx, _) = broadcast::channel(8);
87        Self {
88            url,
89            edge_id,
90            version,
91            capabilities,
92            engine,
93            glyphs,
94            cache_path,
95            outbox_rx,
96            outbox_tx,
97            resync_tx,
98            device_control,
99            dispatch_tx: None,
100        }
101    }
102
103    /// Wire the edge-agent's dispatcher channel so this WS client can
104    /// hand `ServerToEdge::DispatchIntent` payloads to the same
105    /// `(service_type, target)` worker pool that locally-routed intents
106    /// flow through. Without this hook, forwarded intents are logged
107    /// and dropped — fine for hosts whose adapters never receive
108    /// cross-edge work.
109    #[must_use]
110    pub fn with_intent_dispatcher(mut self, tx: mpsc::Sender<RoutedIntent>) -> Self {
111        self.dispatch_tx = Some(tx);
112        self
113    }
114
115    /// Get a sender for outbound `EdgeToServer` frames. Clone as many times
116    /// as needed; adapters publish state updates via this channel.
117    pub fn outbox(&self) -> mpsc::Sender<EdgeToServer> {
118        self.outbox_tx.clone()
119    }
120
121    /// Clone the `ws/edge` (re)connect broadcaster. Fires once per
122    /// successful connect + Hello exchange. State-pumps subscribe to this
123    /// and replay the most recent frame per
124    /// (service_type, target, property, output_id) key so weave-server
125    /// recovers its full snapshot after a restart — otherwise idle zones
126    /// / lights that haven't changed since the last connect disappear
127    /// from the UI because the adapter's source-side dedup suppresses
128    /// re-sends.
129    pub fn resync_sender(&self) -> broadcast::Sender<()> {
130        self.resync_tx.clone()
131    }
132
133    /// Populate the routing engine + glyph registry from the local cache, if
134    /// one exists. Call this once at startup before entering `run()`.
135    pub async fn prime_from_cache(&self) -> anyhow::Result<()> {
136        if let Some(cfg) = cache::load(&self.cache_path).await? {
137            tracing::info!(
138                mappings = cfg.mappings.len(),
139                glyphs = cfg.glyphs.len(),
140                path = %self.cache_path.display(),
141                "primed routing engine from cache",
142            );
143            self.engine.replace_all(cfg.mappings).await;
144            self.glyphs.replace_all(cfg.glyphs).await;
145        }
146        Ok(())
147    }
148
149    /// Run the reconnect loop. Never returns under normal operation.
150    pub async fn run(mut self) {
151        let mut delay = RECONNECT_INITIAL_DELAY;
152        loop {
153            match self.connect_once().await {
154                Ok(_) => {
155                    tracing::info!("ws session ended cleanly; reconnecting");
156                    delay = RECONNECT_INITIAL_DELAY;
157                }
158                Err(e) => {
159                    tracing::warn!(error = %e, delay_secs = delay.as_secs(), "ws session failed");
160                }
161            }
162            tokio::time::sleep(delay).await;
163            delay = (delay * 2).min(RECONNECT_MAX_DELAY);
164        }
165    }
166
167    async fn connect_once(&mut self) -> anyhow::Result<()> {
168        let (ws, _) = tokio_tungstenite::connect_async(&self.url).await?;
169        tracing::info!(url = %self.url, "ws connected");
170        let (mut tx, mut rx) = ws.split();
171
172        let hello = EdgeToServer::Hello {
173            edge_id: self.edge_id.clone(),
174            version: self.version.clone(),
175            capabilities: self.capabilities.clone(),
176        };
177        tx.send(Message::Text(serde_json::to_string(&hello)?))
178            .await?;
179
180        // Fire after the Hello is on the wire so subscribers replay only
181        // once the server is ready to accept frames. `Err` here just means
182        // no live subscribers yet, which is fine.
183        let _ = self.resync_tx.send(());
184
185        loop {
186            tokio::select! {
187                incoming = rx.next() => {
188                    let Some(msg) = incoming else { return Ok(()); };
189                    let msg = msg?;
190                    match msg {
191                        Message::Text(t) => self.handle_server_frame(&t).await?,
192                        Message::Binary(_) => continue,
193                        Message::Ping(p) => tx.send(Message::Pong(p)).await?,
194                        Message::Pong(_) => continue,
195                        Message::Close(_) => return Ok(()),
196                        Message::Frame(_) => continue,
197                    }
198                }
199                outbound = self.outbox_rx.recv() => {
200                    let Some(frame) = outbound else { return Ok(()); };
201                    tx.send(Message::Text(serde_json::to_string(&frame)?)).await?;
202                }
203            }
204        }
205    }
206
207    async fn handle_server_frame(&self, text: &str) -> anyhow::Result<()> {
208        let frame: ServerToEdge = serde_json::from_str(text)?;
209        match frame {
210            ServerToEdge::ConfigFull { config } => {
211                tracing::info!(
212                    mappings = config.mappings.len(),
213                    glyphs = config.glyphs.len(),
214                    edge_id = %config.edge_id,
215                    "received config_full",
216                );
217                self.apply_full(&config).await;
218                let _ = cache::save(&self.cache_path, &config).await;
219            }
220            ServerToEdge::ConfigPatch {
221                mapping_id,
222                op,
223                mapping,
224            } => match op {
225                PatchOp::Upsert => {
226                    if let Some(m) = mapping {
227                        tracing::info!(
228                            %mapping_id,
229                            device = %m.device_id,
230                            service = %m.service_type,
231                            "config_patch upsert",
232                        );
233                        self.engine.upsert_mapping(m).await;
234                        self.refresh_cache().await;
235                    } else {
236                        tracing::warn!(%mapping_id, "config_patch upsert without mapping payload; ignoring");
237                    }
238                }
239                PatchOp::Delete => {
240                    tracing::info!(%mapping_id, "config_patch delete");
241                    self.engine.remove_mapping(&mapping_id).await;
242                    self.refresh_cache().await;
243                }
244            },
245            ServerToEdge::TargetSwitch {
246                mapping_id,
247                service_target,
248            } => {
249                // Express as an upsert of the current mapping with the new
250                // service_target. Cheap since we already have it locally.
251                tracing::info!(%mapping_id, %service_target, "target_switch");
252                let mut current = self.engine.snapshot().await;
253                if let Some(idx) = current.iter().position(|m| m.mapping_id == mapping_id) {
254                    current[idx].service_target = service_target;
255                    self.engine.upsert_mapping(current.remove(idx)).await;
256                    self.refresh_cache().await;
257                } else {
258                    tracing::warn!(%mapping_id, "target_switch for unknown mapping");
259                }
260            }
261            ServerToEdge::GlyphsUpdate { glyphs } => {
262                tracing::info!(count = glyphs.len(), "received glyphs_update");
263                self.glyphs.replace_all(glyphs).await;
264            }
265            ServerToEdge::DisplayGlyph {
266                device_type,
267                device_id,
268                pattern,
269                brightness,
270                timeout_ms,
271                transition,
272            } => {
273                tracing::info!(
274                    %device_type,
275                    %device_id,
276                    "display_glyph",
277                );
278                if let Err(e) = self
279                    .device_control
280                    .display_glyph(
281                        &device_type,
282                        &device_id,
283                        &pattern,
284                        brightness,
285                        timeout_ms,
286                        transition.as_deref(),
287                    )
288                    .await
289                {
290                    tracing::warn!(
291                        error = %e,
292                        %device_type,
293                        %device_id,
294                        "display_glyph failed",
295                    );
296                }
297            }
298            ServerToEdge::DeviceConnect {
299                device_type,
300                device_id,
301            } => {
302                tracing::info!(%device_type, %device_id, "device_connect");
303                if let Err(e) = self
304                    .device_control
305                    .connect_device(&device_type, &device_id)
306                    .await
307                {
308                    tracing::warn!(
309                        error = %e,
310                        %device_type,
311                        %device_id,
312                        "device_connect failed",
313                    );
314                }
315            }
316            ServerToEdge::DeviceDisconnect {
317                device_type,
318                device_id,
319            } => {
320                tracing::info!(%device_type, %device_id, "device_disconnect");
321                if let Err(e) = self
322                    .device_control
323                    .disconnect_device(&device_type, &device_id)
324                    .await
325                {
326                    tracing::warn!(
327                        error = %e,
328                        %device_type,
329                        %device_id,
330                        "device_disconnect failed",
331                    );
332                }
333            }
334            ServerToEdge::DispatchIntent {
335                service_type,
336                service_target,
337                intent,
338                params,
339                output_id,
340            } => {
341                let Some(dispatch_tx) = self.dispatch_tx.as_ref() else {
342                    tracing::debug!(
343                        %service_type,
344                        target = %service_target,
345                        intent = %intent,
346                        "dispatch_intent: no dispatcher wired on this edge; dropping",
347                    );
348                    return Ok(());
349                };
350                let intent_obj = match Intent::reassemble(&intent, &params) {
351                    Ok(i) => i,
352                    Err(e) => {
353                        tracing::warn!(
354                            error = %e,
355                            %service_type,
356                            target = %service_target,
357                            %intent,
358                            "dispatch_intent: failed to reassemble intent payload",
359                        );
360                        return Ok(());
361                    }
362                };
363                let _ = output_id; // reserved for future zone-specific routing
364                let routed = RoutedIntent {
365                    service_type,
366                    service_target,
367                    intent: intent_obj,
368                };
369                if let Err(e) = dispatch_tx.try_send(routed) {
370                    tracing::warn!(error = %e, "dispatch_intent: dispatcher full or closed; dropping");
371                }
372            }
373            ServerToEdge::Ping => {
374                // Pong is handled via the outbox channel to avoid tx contention here;
375                // fire-and-forget.
376                let _ = self.outbox_tx.try_send(EdgeToServer::Pong);
377            }
378            ServerToEdge::DeviceCyclePatch { cycle, op } => match op {
379                weave_contracts::PatchOp::Upsert => {
380                    tracing::info!(
381                        device_type = %cycle.device_type,
382                        device_id = %cycle.device_id,
383                        active = ?cycle.active_mapping_id,
384                        gesture = ?cycle.cycle_gesture,
385                        "device_cycle_patch upsert"
386                    );
387                    self.engine.upsert_cycle(cycle).await;
388                    self.refresh_cache().await;
389                }
390                weave_contracts::PatchOp::Delete => {
391                    tracing::info!(
392                        device_type = %cycle.device_type,
393                        device_id = %cycle.device_id,
394                        "device_cycle_patch delete"
395                    );
396                    self.engine
397                        .remove_cycle(&cycle.device_type, &cycle.device_id)
398                        .await;
399                    self.refresh_cache().await;
400                }
401            },
402            ServerToEdge::SwitchActiveConnection {
403                device_type,
404                device_id,
405                active_mapping_id,
406            } => {
407                tracing::info!(
408                    %device_type,
409                    %device_id,
410                    %active_mapping_id,
411                    "switch_active_connection from server"
412                );
413                let applied = self
414                    .engine
415                    .set_cycle_active(&device_type, &device_id, active_mapping_id)
416                    .await;
417                if !applied {
418                    tracing::warn!(
419                        %device_type, %device_id, %active_mapping_id,
420                        "switch_active_connection: cycle missing or active not in mapping_ids — ignoring"
421                    );
422                }
423                self.refresh_cache().await;
424            }
425            ServerToEdge::ServiceState {
426                edge_id: source_edge,
427                service_type,
428                target,
429                property,
430                ..
431            } => {
432                // Cross-edge feedback echo from a peer edge. Linux/Mac
433                // edge-agent's feedback pumps are wired per-adapter
434                // (each adapter publishes into its own broadcast
435                // channel), so plumbing imported state into a feedback
436                // pump here would require a refactor that's out of
437                // scope for the iOS-first echo work. Log for diagnosis;
438                // wire up in a follow-up.
439                tracing::debug!(
440                    %source_edge,
441                    %service_type,
442                    %target,
443                    %property,
444                    "service_state echo from peer edge (Linux/Mac feedback echo deferred)"
445                );
446            }
447        }
448        Ok(())
449    }
450
451    async fn apply_full(&self, config: &EdgeConfig) {
452        self.engine.replace_all(config.mappings.clone()).await;
453        self.engine
454            .replace_cycles(config.device_cycles.clone())
455            .await;
456        self.glyphs.replace_all(config.glyphs.clone()).await;
457    }
458
459    /// Persist a fresh cache after an incremental patch so the agent
460    /// comes back up with the latest config even if the server is
461    /// unreachable on the next boot.
462    async fn refresh_cache(&self) {
463        let mappings = self.engine.snapshot().await;
464        let device_cycles = self.engine.cycles_snapshot().await;
465        // The cache stores an EdgeConfig; we need edge_id + current glyphs.
466        // Glyphs aren't kept in a cheap-to-read form on the engine, so
467        // derive from the last saved cache if present.
468        let edge_id = self.edge_id.clone();
469        let glyphs = match cache::load(&self.cache_path).await {
470            Ok(Some(cfg)) => cfg.glyphs,
471            _ => Vec::new(),
472        };
473        let cfg = EdgeConfig {
474            edge_id,
475            mappings,
476            glyphs,
477            device_cycles,
478        };
479        if let Err(e) = cache::save(&self.cache_path, &cfg).await {
480            tracing::warn!(error = %e, "failed to persist cache after patch");
481        }
482    }
483}