Skip to main content

edge_core/
ws_client.rs

1//! WebSocket client to `weave-server` `/ws/edge`.
2//!
3//! Runs a long-lived reconnect loop: after connect, sends a `Hello` frame,
4//! reads frames from the server (updating the `RoutingEngine` and local cache
5//! on `ConfigFull`/`ConfigPatch`), and forwards outbound `EdgeToServer`
6//! frames produced by adapters.
7//!
8//! On unreachable server, the agent loads the cached config so local routing
9//! keeps working; the reconnect loop retries in the background.
10
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use futures_util::{SinkExt, StreamExt};
16use tokio::sync::{broadcast, mpsc};
17use tokio_tungstenite::tungstenite::protocol::Message;
18use weave_contracts::{EdgeConfig, EdgeToServer, PatchOp, ServerToEdge};
19
20use super::cache;
21use super::registry::GlyphRegistry;
22use super::routing::RoutingEngine;
23
/// Delay before the first reconnect attempt, and the value the backoff is
/// reset to by the reconnect loop.
const RECONNECT_INITIAL_DELAY: Duration = Duration::from_secs(2);
/// Upper bound for the doubling reconnect delay.
const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(30);
26
27pub struct WsClient {
28    url: String,
29    edge_id: String,
30    version: String,
31    capabilities: Vec<String>,
32    engine: Arc<RoutingEngine>,
33    glyphs: Arc<GlyphRegistry>,
34    cache_path: PathBuf,
35    outbox_rx: mpsc::Receiver<EdgeToServer>,
36    outbox_tx: mpsc::Sender<EdgeToServer>,
37    resync_tx: broadcast::Sender<()>,
38}
39
40impl WsClient {
41    pub fn new(
42        url: String,
43        edge_id: String,
44        version: String,
45        capabilities: Vec<String>,
46        engine: Arc<RoutingEngine>,
47        glyphs: Arc<GlyphRegistry>,
48    ) -> Self {
49        let cache_path = cache::default_cache_path(&edge_id);
50        let (outbox_tx, outbox_rx) = mpsc::channel(256);
51        // Small buffer is fine — subscribers only care about the latest
52        // reconnect event; lagged ones can be dropped silently.
53        let (resync_tx, _) = broadcast::channel(8);
54        Self {
55            url,
56            edge_id,
57            version,
58            capabilities,
59            engine,
60            glyphs,
61            cache_path,
62            outbox_rx,
63            outbox_tx,
64            resync_tx,
65        }
66    }
67
68    /// Get a sender for outbound `EdgeToServer` frames. Clone as many times
69    /// as needed; adapters publish state updates via this channel.
70    pub fn outbox(&self) -> mpsc::Sender<EdgeToServer> {
71        self.outbox_tx.clone()
72    }
73
74    /// Clone the `ws/edge` (re)connect broadcaster. Fires once per
75    /// successful connect + Hello exchange. State-pumps subscribe to this
76    /// and replay the most recent frame per
77    /// (service_type, target, property, output_id) key so weave-server
78    /// recovers its full snapshot after a restart — otherwise idle zones
79    /// / lights that haven't changed since the last connect disappear
80    /// from the UI because the adapter's source-side dedup suppresses
81    /// re-sends.
82    pub fn resync_sender(&self) -> broadcast::Sender<()> {
83        self.resync_tx.clone()
84    }
85
86    /// Populate the routing engine + glyph registry from the local cache, if
87    /// one exists. Call this once at startup before entering `run()`.
88    pub async fn prime_from_cache(&self) -> anyhow::Result<()> {
89        if let Some(cfg) = cache::load(&self.cache_path).await? {
90            tracing::info!(
91                mappings = cfg.mappings.len(),
92                glyphs = cfg.glyphs.len(),
93                path = %self.cache_path.display(),
94                "primed routing engine from cache",
95            );
96            self.engine.replace_all(cfg.mappings).await;
97            self.glyphs.replace_all(cfg.glyphs).await;
98        }
99        Ok(())
100    }
101
102    /// Run the reconnect loop. Never returns under normal operation.
103    pub async fn run(mut self) {
104        let mut delay = RECONNECT_INITIAL_DELAY;
105        loop {
106            match self.connect_once().await {
107                Ok(_) => {
108                    tracing::info!("ws session ended cleanly; reconnecting");
109                    delay = RECONNECT_INITIAL_DELAY;
110                }
111                Err(e) => {
112                    tracing::warn!(error = %e, delay_secs = delay.as_secs(), "ws session failed");
113                }
114            }
115            tokio::time::sleep(delay).await;
116            delay = (delay * 2).min(RECONNECT_MAX_DELAY);
117        }
118    }
119
120    async fn connect_once(&mut self) -> anyhow::Result<()> {
121        let (ws, _) = tokio_tungstenite::connect_async(&self.url).await?;
122        tracing::info!(url = %self.url, "ws connected");
123        let (mut tx, mut rx) = ws.split();
124
125        let hello = EdgeToServer::Hello {
126            edge_id: self.edge_id.clone(),
127            version: self.version.clone(),
128            capabilities: self.capabilities.clone(),
129        };
130        tx.send(Message::Text(serde_json::to_string(&hello)?))
131            .await?;
132
133        // Fire after the Hello is on the wire so subscribers replay only
134        // once the server is ready to accept frames. `Err` here just means
135        // no live subscribers yet, which is fine.
136        let _ = self.resync_tx.send(());
137
138        loop {
139            tokio::select! {
140                incoming = rx.next() => {
141                    let Some(msg) = incoming else { return Ok(()); };
142                    let msg = msg?;
143                    match msg {
144                        Message::Text(t) => self.handle_server_frame(&t).await?,
145                        Message::Binary(_) => continue,
146                        Message::Ping(p) => tx.send(Message::Pong(p)).await?,
147                        Message::Pong(_) => continue,
148                        Message::Close(_) => return Ok(()),
149                        Message::Frame(_) => continue,
150                    }
151                }
152                outbound = self.outbox_rx.recv() => {
153                    let Some(frame) = outbound else { return Ok(()); };
154                    tx.send(Message::Text(serde_json::to_string(&frame)?)).await?;
155                }
156            }
157        }
158    }
159
160    async fn handle_server_frame(&self, text: &str) -> anyhow::Result<()> {
161        let frame: ServerToEdge = serde_json::from_str(text)?;
162        match frame {
163            ServerToEdge::ConfigFull { config } => {
164                tracing::info!(
165                    mappings = config.mappings.len(),
166                    glyphs = config.glyphs.len(),
167                    edge_id = %config.edge_id,
168                    "received config_full",
169                );
170                self.apply_full(&config).await;
171                let _ = cache::save(&self.cache_path, &config).await;
172            }
173            ServerToEdge::ConfigPatch {
174                mapping_id,
175                op,
176                mapping,
177            } => match op {
178                PatchOp::Upsert => {
179                    if let Some(m) = mapping {
180                        tracing::info!(
181                            %mapping_id,
182                            device = %m.device_id,
183                            service = %m.service_type,
184                            "config_patch upsert",
185                        );
186                        self.engine.upsert_mapping(m).await;
187                        self.refresh_cache().await;
188                    } else {
189                        tracing::warn!(%mapping_id, "config_patch upsert without mapping payload; ignoring");
190                    }
191                }
192                PatchOp::Delete => {
193                    tracing::info!(%mapping_id, "config_patch delete");
194                    self.engine.remove_mapping(&mapping_id).await;
195                    self.refresh_cache().await;
196                }
197            },
198            ServerToEdge::TargetSwitch {
199                mapping_id,
200                service_target,
201            } => {
202                // Express as an upsert of the current mapping with the new
203                // service_target. Cheap since we already have it locally.
204                tracing::info!(%mapping_id, %service_target, "target_switch");
205                let mut current = self.engine.snapshot().await;
206                if let Some(idx) = current.iter().position(|m| m.mapping_id == mapping_id) {
207                    current[idx].service_target = service_target;
208                    self.engine.upsert_mapping(current.remove(idx)).await;
209                    self.refresh_cache().await;
210                } else {
211                    tracing::warn!(%mapping_id, "target_switch for unknown mapping");
212                }
213            }
214            ServerToEdge::GlyphsUpdate { glyphs } => {
215                tracing::info!(count = glyphs.len(), "received glyphs_update");
216                self.glyphs.replace_all(glyphs).await;
217            }
218            ServerToEdge::Ping => {
219                // Pong is handled via the outbox channel to avoid tx contention here;
220                // fire-and-forget.
221                let _ = self.outbox_tx.try_send(EdgeToServer::Pong);
222            }
223        }
224        Ok(())
225    }
226
227    async fn apply_full(&self, config: &EdgeConfig) {
228        self.engine.replace_all(config.mappings.clone()).await;
229        self.glyphs.replace_all(config.glyphs.clone()).await;
230    }
231
232    /// Persist a fresh cache after an incremental patch so the agent
233    /// comes back up with the latest config even if the server is
234    /// unreachable on the next boot.
235    async fn refresh_cache(&self) {
236        let mappings = self.engine.snapshot().await;
237        // The cache stores an EdgeConfig; we need edge_id + current glyphs.
238        // Glyphs aren't kept in a cheap-to-read form on the engine, so
239        // derive from the last saved cache if present.
240        let edge_id = self.edge_id.clone();
241        let glyphs = match cache::load(&self.cache_path).await {
242            Ok(Some(cfg)) => cfg.glyphs,
243            _ => Vec::new(),
244        };
245        let cfg = EdgeConfig {
246            edge_id,
247            mappings,
248            glyphs,
249        };
250        if let Err(e) = cache::save(&self.cache_path, &cfg).await {
251            tracing::warn!(error = %e, "failed to persist cache after patch");
252        }
253    }
254}