Skip to main content

edge_core/
ws_client.rs

1//! WebSocket client to `weave-server` `/ws/edge`.
2//!
3//! Runs a long-lived reconnect loop: after connect, sends a `Hello` frame,
4//! reads frames from the server (updating the `RoutingEngine` and local cache
5//! on `ConfigFull`/`ConfigPatch`), and forwards outbound `EdgeToServer`
6//! frames produced by adapters.
7//!
//! The agent primes local routing from the cached config at startup (see
//! `WsClient::prime_from_cache`), so routing keeps working while the server
//! is unreachable; the reconnect loop keeps retrying in the background.
10
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use futures_util::{SinkExt, StreamExt};
16use tokio::sync::{broadcast, mpsc};
17use tokio_tungstenite::tungstenite::protocol::Message;
18use weave_contracts::{EdgeConfig, EdgeToServer, PatchOp, ServerToEdge};
19
20use super::cache;
21use super::device_control::{DeviceControlHook, NoopDeviceControl};
22use super::registry::GlyphRegistry;
23use super::routing::RoutingEngine;
24
/// Delay the reconnect backoff starts from (and is reset to) before the
/// next connection attempt.
const RECONNECT_INITIAL_DELAY: Duration = Duration::from_secs(2);
/// Upper bound for the exponentially growing reconnect delay.
const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(30);
27
28pub struct WsClient {
29    url: String,
30    edge_id: String,
31    version: String,
32    capabilities: Vec<String>,
33    engine: Arc<RoutingEngine>,
34    glyphs: Arc<GlyphRegistry>,
35    cache_path: PathBuf,
36    outbox_rx: mpsc::Receiver<EdgeToServer>,
37    outbox_tx: mpsc::Sender<EdgeToServer>,
38    resync_tx: broadcast::Sender<()>,
39    device_control: Arc<dyn DeviceControlHook>,
40}
41
42impl WsClient {
43    pub fn new(
44        url: String,
45        edge_id: String,
46        version: String,
47        capabilities: Vec<String>,
48        engine: Arc<RoutingEngine>,
49        glyphs: Arc<GlyphRegistry>,
50    ) -> Self {
51        Self::with_device_control(
52            url,
53            edge_id,
54            version,
55            capabilities,
56            engine,
57            glyphs,
58            Arc::new(NoopDeviceControl),
59        )
60    }
61
62    /// Construct a `WsClient` with a host-supplied `DeviceControlHook`. The
63    /// edge-agent binary wires this to its per-device map so weave-server
64    /// can drive Connect / Disconnect / Test-LED via the WS protocol.
65    pub fn with_device_control(
66        url: String,
67        edge_id: String,
68        version: String,
69        capabilities: Vec<String>,
70        engine: Arc<RoutingEngine>,
71        glyphs: Arc<GlyphRegistry>,
72        device_control: Arc<dyn DeviceControlHook>,
73    ) -> Self {
74        let cache_path = cache::default_cache_path(&edge_id);
75        let (outbox_tx, outbox_rx) = mpsc::channel(256);
76        // Small buffer is fine — subscribers only care about the latest
77        // reconnect event; lagged ones can be dropped silently.
78        let (resync_tx, _) = broadcast::channel(8);
79        Self {
80            url,
81            edge_id,
82            version,
83            capabilities,
84            engine,
85            glyphs,
86            cache_path,
87            outbox_rx,
88            outbox_tx,
89            resync_tx,
90            device_control,
91        }
92    }
93
94    /// Get a sender for outbound `EdgeToServer` frames. Clone as many times
95    /// as needed; adapters publish state updates via this channel.
96    pub fn outbox(&self) -> mpsc::Sender<EdgeToServer> {
97        self.outbox_tx.clone()
98    }
99
100    /// Clone the `ws/edge` (re)connect broadcaster. Fires once per
101    /// successful connect + Hello exchange. State-pumps subscribe to this
102    /// and replay the most recent frame per
103    /// (service_type, target, property, output_id) key so weave-server
104    /// recovers its full snapshot after a restart — otherwise idle zones
105    /// / lights that haven't changed since the last connect disappear
106    /// from the UI because the adapter's source-side dedup suppresses
107    /// re-sends.
108    pub fn resync_sender(&self) -> broadcast::Sender<()> {
109        self.resync_tx.clone()
110    }
111
112    /// Populate the routing engine + glyph registry from the local cache, if
113    /// one exists. Call this once at startup before entering `run()`.
114    pub async fn prime_from_cache(&self) -> anyhow::Result<()> {
115        if let Some(cfg) = cache::load(&self.cache_path).await? {
116            tracing::info!(
117                mappings = cfg.mappings.len(),
118                glyphs = cfg.glyphs.len(),
119                path = %self.cache_path.display(),
120                "primed routing engine from cache",
121            );
122            self.engine.replace_all(cfg.mappings).await;
123            self.glyphs.replace_all(cfg.glyphs).await;
124        }
125        Ok(())
126    }
127
128    /// Run the reconnect loop. Never returns under normal operation.
129    pub async fn run(mut self) {
130        let mut delay = RECONNECT_INITIAL_DELAY;
131        loop {
132            match self.connect_once().await {
133                Ok(_) => {
134                    tracing::info!("ws session ended cleanly; reconnecting");
135                    delay = RECONNECT_INITIAL_DELAY;
136                }
137                Err(e) => {
138                    tracing::warn!(error = %e, delay_secs = delay.as_secs(), "ws session failed");
139                }
140            }
141            tokio::time::sleep(delay).await;
142            delay = (delay * 2).min(RECONNECT_MAX_DELAY);
143        }
144    }
145
146    async fn connect_once(&mut self) -> anyhow::Result<()> {
147        let (ws, _) = tokio_tungstenite::connect_async(&self.url).await?;
148        tracing::info!(url = %self.url, "ws connected");
149        let (mut tx, mut rx) = ws.split();
150
151        let hello = EdgeToServer::Hello {
152            edge_id: self.edge_id.clone(),
153            version: self.version.clone(),
154            capabilities: self.capabilities.clone(),
155        };
156        tx.send(Message::Text(serde_json::to_string(&hello)?))
157            .await?;
158
159        // Fire after the Hello is on the wire so subscribers replay only
160        // once the server is ready to accept frames. `Err` here just means
161        // no live subscribers yet, which is fine.
162        let _ = self.resync_tx.send(());
163
164        loop {
165            tokio::select! {
166                incoming = rx.next() => {
167                    let Some(msg) = incoming else { return Ok(()); };
168                    let msg = msg?;
169                    match msg {
170                        Message::Text(t) => self.handle_server_frame(&t).await?,
171                        Message::Binary(_) => continue,
172                        Message::Ping(p) => tx.send(Message::Pong(p)).await?,
173                        Message::Pong(_) => continue,
174                        Message::Close(_) => return Ok(()),
175                        Message::Frame(_) => continue,
176                    }
177                }
178                outbound = self.outbox_rx.recv() => {
179                    let Some(frame) = outbound else { return Ok(()); };
180                    tx.send(Message::Text(serde_json::to_string(&frame)?)).await?;
181                }
182            }
183        }
184    }
185
186    async fn handle_server_frame(&self, text: &str) -> anyhow::Result<()> {
187        let frame: ServerToEdge = serde_json::from_str(text)?;
188        match frame {
189            ServerToEdge::ConfigFull { config } => {
190                tracing::info!(
191                    mappings = config.mappings.len(),
192                    glyphs = config.glyphs.len(),
193                    edge_id = %config.edge_id,
194                    "received config_full",
195                );
196                self.apply_full(&config).await;
197                let _ = cache::save(&self.cache_path, &config).await;
198            }
199            ServerToEdge::ConfigPatch {
200                mapping_id,
201                op,
202                mapping,
203            } => match op {
204                PatchOp::Upsert => {
205                    if let Some(m) = mapping {
206                        tracing::info!(
207                            %mapping_id,
208                            device = %m.device_id,
209                            service = %m.service_type,
210                            "config_patch upsert",
211                        );
212                        self.engine.upsert_mapping(m).await;
213                        self.refresh_cache().await;
214                    } else {
215                        tracing::warn!(%mapping_id, "config_patch upsert without mapping payload; ignoring");
216                    }
217                }
218                PatchOp::Delete => {
219                    tracing::info!(%mapping_id, "config_patch delete");
220                    self.engine.remove_mapping(&mapping_id).await;
221                    self.refresh_cache().await;
222                }
223            },
224            ServerToEdge::TargetSwitch {
225                mapping_id,
226                service_target,
227            } => {
228                // Express as an upsert of the current mapping with the new
229                // service_target. Cheap since we already have it locally.
230                tracing::info!(%mapping_id, %service_target, "target_switch");
231                let mut current = self.engine.snapshot().await;
232                if let Some(idx) = current.iter().position(|m| m.mapping_id == mapping_id) {
233                    current[idx].service_target = service_target;
234                    self.engine.upsert_mapping(current.remove(idx)).await;
235                    self.refresh_cache().await;
236                } else {
237                    tracing::warn!(%mapping_id, "target_switch for unknown mapping");
238                }
239            }
240            ServerToEdge::GlyphsUpdate { glyphs } => {
241                tracing::info!(count = glyphs.len(), "received glyphs_update");
242                self.glyphs.replace_all(glyphs).await;
243            }
244            ServerToEdge::DisplayGlyph {
245                device_type,
246                device_id,
247                pattern,
248                brightness,
249                timeout_ms,
250                transition,
251            } => {
252                tracing::info!(
253                    %device_type,
254                    %device_id,
255                    "display_glyph",
256                );
257                if let Err(e) = self
258                    .device_control
259                    .display_glyph(
260                        &device_type,
261                        &device_id,
262                        &pattern,
263                        brightness,
264                        timeout_ms,
265                        transition.as_deref(),
266                    )
267                    .await
268                {
269                    tracing::warn!(
270                        error = %e,
271                        %device_type,
272                        %device_id,
273                        "display_glyph failed",
274                    );
275                }
276            }
277            ServerToEdge::DeviceConnect {
278                device_type,
279                device_id,
280            } => {
281                tracing::info!(%device_type, %device_id, "device_connect");
282                if let Err(e) = self
283                    .device_control
284                    .connect_device(&device_type, &device_id)
285                    .await
286                {
287                    tracing::warn!(
288                        error = %e,
289                        %device_type,
290                        %device_id,
291                        "device_connect failed",
292                    );
293                }
294            }
295            ServerToEdge::DeviceDisconnect {
296                device_type,
297                device_id,
298            } => {
299                tracing::info!(%device_type, %device_id, "device_disconnect");
300                if let Err(e) = self
301                    .device_control
302                    .disconnect_device(&device_type, &device_id)
303                    .await
304                {
305                    tracing::warn!(
306                        error = %e,
307                        %device_type,
308                        %device_id,
309                        "device_disconnect failed",
310                    );
311                }
312            }
313            ServerToEdge::Ping => {
314                // Pong is handled via the outbox channel to avoid tx contention here;
315                // fire-and-forget.
316                let _ = self.outbox_tx.try_send(EdgeToServer::Pong);
317            }
318        }
319        Ok(())
320    }
321
322    async fn apply_full(&self, config: &EdgeConfig) {
323        self.engine.replace_all(config.mappings.clone()).await;
324        self.glyphs.replace_all(config.glyphs.clone()).await;
325    }
326
327    /// Persist a fresh cache after an incremental patch so the agent
328    /// comes back up with the latest config even if the server is
329    /// unreachable on the next boot.
330    async fn refresh_cache(&self) {
331        let mappings = self.engine.snapshot().await;
332        // The cache stores an EdgeConfig; we need edge_id + current glyphs.
333        // Glyphs aren't kept in a cheap-to-read form on the engine, so
334        // derive from the last saved cache if present.
335        let edge_id = self.edge_id.clone();
336        let glyphs = match cache::load(&self.cache_path).await {
337            Ok(Some(cfg)) => cfg.glyphs,
338            _ => Vec::new(),
339        };
340        let cfg = EdgeConfig {
341            edge_id,
342            mappings,
343            glyphs,
344        };
345        if let Err(e) = cache::save(&self.cache_path, &cfg).await {
346            tracing::warn!(error = %e, "failed to persist cache after patch");
347        }
348    }
349}