1use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use futures_util::{SinkExt, StreamExt};
16use tokio::sync::{broadcast, mpsc};
17use tokio_tungstenite::tungstenite::protocol::Message;
18use weave_contracts::{EdgeConfig, EdgeToServer, PatchOp, ServerToEdge};
19
20use super::cache;
21use super::device_control::{DeviceControlHook, NoopDeviceControl};
22use super::registry::GlyphRegistry;
23use super::routing::RoutingEngine;
24
/// Delay used for the first reconnect attempt; `WsClient::run` starts its backoff here.
const RECONNECT_INITIAL_DELAY: Duration = Duration::from_secs(2);
/// Ceiling that the doubling reconnect delay in `WsClient::run` is clamped to.
const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(30);
27
/// WebSocket client that connects to the server at `url`, applies the
/// `ServerToEdge` frames it receives to the routing engine / glyph registry /
/// device-control hook, and writes queued `EdgeToServer` frames to the socket.
pub struct WsClient {
    /// Address handed to `tokio_tungstenite::connect_async` on every connect attempt.
    url: String,
    /// Identifier sent in the hello frame; also used to derive `cache_path`
    /// and stamped into cache files written after patches.
    edge_id: String,
    /// Version string sent in the hello frame.
    version: String,
    /// Capability list sent in the hello frame.
    capabilities: Vec<String>,
    /// Routing engine that receives full mapping sets and individual patches.
    engine: Arc<RoutingEngine>,
    /// Glyph registry replaced wholesale on full configs and glyph updates.
    glyphs: Arc<GlyphRegistry>,
    /// Location of the on-disk config cache (from `cache::default_cache_path`).
    cache_path: PathBuf,
    /// Receiving half of the outbound queue; drained by the connection loop.
    outbox_rx: mpsc::Receiver<EdgeToServer>,
    /// Sending half of the outbound queue; clones are handed out by `outbox()`.
    outbox_tx: mpsc::Sender<EdgeToServer>,
    /// Broadcast fired once per connection, right after the hello frame is sent.
    resync_tx: broadcast::Sender<()>,
    /// Hook invoked for display-glyph / device connect / device disconnect frames.
    device_control: Arc<dyn DeviceControlHook>,
}
41
42impl WsClient {
43 pub fn new(
44 url: String,
45 edge_id: String,
46 version: String,
47 capabilities: Vec<String>,
48 engine: Arc<RoutingEngine>,
49 glyphs: Arc<GlyphRegistry>,
50 ) -> Self {
51 Self::with_device_control(
52 url,
53 edge_id,
54 version,
55 capabilities,
56 engine,
57 glyphs,
58 Arc::new(NoopDeviceControl),
59 )
60 }
61
62 pub fn with_device_control(
66 url: String,
67 edge_id: String,
68 version: String,
69 capabilities: Vec<String>,
70 engine: Arc<RoutingEngine>,
71 glyphs: Arc<GlyphRegistry>,
72 device_control: Arc<dyn DeviceControlHook>,
73 ) -> Self {
74 let cache_path = cache::default_cache_path(&edge_id);
75 let (outbox_tx, outbox_rx) = mpsc::channel(256);
76 let (resync_tx, _) = broadcast::channel(8);
79 Self {
80 url,
81 edge_id,
82 version,
83 capabilities,
84 engine,
85 glyphs,
86 cache_path,
87 outbox_rx,
88 outbox_tx,
89 resync_tx,
90 device_control,
91 }
92 }
93
94 pub fn outbox(&self) -> mpsc::Sender<EdgeToServer> {
97 self.outbox_tx.clone()
98 }
99
100 pub fn resync_sender(&self) -> broadcast::Sender<()> {
109 self.resync_tx.clone()
110 }
111
112 pub async fn prime_from_cache(&self) -> anyhow::Result<()> {
115 if let Some(cfg) = cache::load(&self.cache_path).await? {
116 tracing::info!(
117 mappings = cfg.mappings.len(),
118 glyphs = cfg.glyphs.len(),
119 path = %self.cache_path.display(),
120 "primed routing engine from cache",
121 );
122 self.engine.replace_all(cfg.mappings).await;
123 self.glyphs.replace_all(cfg.glyphs).await;
124 }
125 Ok(())
126 }
127
128 pub async fn run(mut self) {
130 let mut delay = RECONNECT_INITIAL_DELAY;
131 loop {
132 match self.connect_once().await {
133 Ok(_) => {
134 tracing::info!("ws session ended cleanly; reconnecting");
135 delay = RECONNECT_INITIAL_DELAY;
136 }
137 Err(e) => {
138 tracing::warn!(error = %e, delay_secs = delay.as_secs(), "ws session failed");
139 }
140 }
141 tokio::time::sleep(delay).await;
142 delay = (delay * 2).min(RECONNECT_MAX_DELAY);
143 }
144 }
145
146 async fn connect_once(&mut self) -> anyhow::Result<()> {
147 let (ws, _) = tokio_tungstenite::connect_async(&self.url).await?;
148 tracing::info!(url = %self.url, "ws connected");
149 let (mut tx, mut rx) = ws.split();
150
151 let hello = EdgeToServer::Hello {
152 edge_id: self.edge_id.clone(),
153 version: self.version.clone(),
154 capabilities: self.capabilities.clone(),
155 };
156 tx.send(Message::Text(serde_json::to_string(&hello)?))
157 .await?;
158
159 let _ = self.resync_tx.send(());
163
164 loop {
165 tokio::select! {
166 incoming = rx.next() => {
167 let Some(msg) = incoming else { return Ok(()); };
168 let msg = msg?;
169 match msg {
170 Message::Text(t) => self.handle_server_frame(&t).await?,
171 Message::Binary(_) => continue,
172 Message::Ping(p) => tx.send(Message::Pong(p)).await?,
173 Message::Pong(_) => continue,
174 Message::Close(_) => return Ok(()),
175 Message::Frame(_) => continue,
176 }
177 }
178 outbound = self.outbox_rx.recv() => {
179 let Some(frame) = outbound else { return Ok(()); };
180 tx.send(Message::Text(serde_json::to_string(&frame)?)).await?;
181 }
182 }
183 }
184 }
185
186 async fn handle_server_frame(&self, text: &str) -> anyhow::Result<()> {
187 let frame: ServerToEdge = serde_json::from_str(text)?;
188 match frame {
189 ServerToEdge::ConfigFull { config } => {
190 tracing::info!(
191 mappings = config.mappings.len(),
192 glyphs = config.glyphs.len(),
193 edge_id = %config.edge_id,
194 "received config_full",
195 );
196 self.apply_full(&config).await;
197 let _ = cache::save(&self.cache_path, &config).await;
198 }
199 ServerToEdge::ConfigPatch {
200 mapping_id,
201 op,
202 mapping,
203 } => match op {
204 PatchOp::Upsert => {
205 if let Some(m) = mapping {
206 tracing::info!(
207 %mapping_id,
208 device = %m.device_id,
209 service = %m.service_type,
210 "config_patch upsert",
211 );
212 self.engine.upsert_mapping(m).await;
213 self.refresh_cache().await;
214 } else {
215 tracing::warn!(%mapping_id, "config_patch upsert without mapping payload; ignoring");
216 }
217 }
218 PatchOp::Delete => {
219 tracing::info!(%mapping_id, "config_patch delete");
220 self.engine.remove_mapping(&mapping_id).await;
221 self.refresh_cache().await;
222 }
223 },
224 ServerToEdge::TargetSwitch {
225 mapping_id,
226 service_target,
227 } => {
228 tracing::info!(%mapping_id, %service_target, "target_switch");
231 let mut current = self.engine.snapshot().await;
232 if let Some(idx) = current.iter().position(|m| m.mapping_id == mapping_id) {
233 current[idx].service_target = service_target;
234 self.engine.upsert_mapping(current.remove(idx)).await;
235 self.refresh_cache().await;
236 } else {
237 tracing::warn!(%mapping_id, "target_switch for unknown mapping");
238 }
239 }
240 ServerToEdge::GlyphsUpdate { glyphs } => {
241 tracing::info!(count = glyphs.len(), "received glyphs_update");
242 self.glyphs.replace_all(glyphs).await;
243 }
244 ServerToEdge::DisplayGlyph {
245 device_type,
246 device_id,
247 pattern,
248 brightness,
249 timeout_ms,
250 transition,
251 } => {
252 tracing::info!(
253 %device_type,
254 %device_id,
255 "display_glyph",
256 );
257 if let Err(e) = self
258 .device_control
259 .display_glyph(
260 &device_type,
261 &device_id,
262 &pattern,
263 brightness,
264 timeout_ms,
265 transition.as_deref(),
266 )
267 .await
268 {
269 tracing::warn!(
270 error = %e,
271 %device_type,
272 %device_id,
273 "display_glyph failed",
274 );
275 }
276 }
277 ServerToEdge::DeviceConnect {
278 device_type,
279 device_id,
280 } => {
281 tracing::info!(%device_type, %device_id, "device_connect");
282 if let Err(e) = self
283 .device_control
284 .connect_device(&device_type, &device_id)
285 .await
286 {
287 tracing::warn!(
288 error = %e,
289 %device_type,
290 %device_id,
291 "device_connect failed",
292 );
293 }
294 }
295 ServerToEdge::DeviceDisconnect {
296 device_type,
297 device_id,
298 } => {
299 tracing::info!(%device_type, %device_id, "device_disconnect");
300 if let Err(e) = self
301 .device_control
302 .disconnect_device(&device_type, &device_id)
303 .await
304 {
305 tracing::warn!(
306 error = %e,
307 %device_type,
308 %device_id,
309 "device_disconnect failed",
310 );
311 }
312 }
313 ServerToEdge::Ping => {
314 let _ = self.outbox_tx.try_send(EdgeToServer::Pong);
317 }
318 }
319 Ok(())
320 }
321
322 async fn apply_full(&self, config: &EdgeConfig) {
323 self.engine.replace_all(config.mappings.clone()).await;
324 self.glyphs.replace_all(config.glyphs.clone()).await;
325 }
326
327 async fn refresh_cache(&self) {
331 let mappings = self.engine.snapshot().await;
332 let edge_id = self.edge_id.clone();
336 let glyphs = match cache::load(&self.cache_path).await {
337 Ok(Some(cfg)) => cfg.glyphs,
338 _ => Vec::new(),
339 };
340 let cfg = EdgeConfig {
341 edge_id,
342 mappings,
343 glyphs,
344 };
345 if let Err(e) = cache::save(&self.cache_path, &cfg).await {
346 tracing::warn!(error = %e, "failed to persist cache after patch");
347 }
348 }
349}