Skip to main content

unifly_api/
controller.rs

1// ── Controller abstraction ──
2//
3// Full lifecycle management for a UniFi controller connection.
4// Handles authentication, background refresh, command routing,
5// and reactive data streaming through the DataStore.
6
7use std::collections::HashMap;
8use std::net::{Ipv4Addr, Ipv6Addr};
9use std::sync::Arc;
10use std::time::Duration;
11
12use tokio::sync::{Mutex, broadcast, mpsc, watch};
13use tokio::task::JoinHandle;
14use tokio_util::sync::CancellationToken;
15use tracing::{debug, info, warn};
16
17use crate::command::{Command, CommandEnvelope, CommandResult};
18use crate::config::{AuthCredentials, ControllerConfig, TlsVerification};
19use crate::core_error::CoreError;
20use crate::model::{
21    AclRule, Admin, Alarm, Client, Country, Device, DnsPolicy, DpiApplication, DpiCategory,
22    EntityId, Event, FirewallAction, FirewallPolicy, FirewallZone, HealthSummary, MacAddress,
23    Network, NetworkManagement, NetworkPurpose, RadiusProfile, Site, SysInfo, SystemInfo,
24    TrafficMatchingList, Voucher, VpnServer, VpnTunnel, WanInterface, WifiBroadcast,
25};
26use crate::store::DataStore;
27use crate::stream::EntityStream;
28
29use crate::transport::{TlsMode, TransportConfig};
30use crate::websocket::{ReconnectConfig, WebSocketHandle};
31use crate::{IntegrationClient, LegacyClient};
32
/// Capacity of the bounded mpsc channel that carries `CommandEnvelope`s.
const COMMAND_CHANNEL_SIZE: usize = 64;
/// Capacity of the broadcast channel that fans out domain `Event`s.
const EVENT_CHANNEL_SIZE: usize = 256;
35
36// ── ConnectionState ──────────────────────────────────────────────
37
/// Lifecycle phase of the controller connection, as published to observers.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ConnectionState {
    /// No active connection. This is the initial state and the state
    /// published at the end of `Controller::disconnect`.
    Disconnected,
    /// Published at the start of `Controller::connect`.
    Connecting,
    /// Published once `Controller::connect` has finished successfully.
    Connected,
    /// A reconnect is in progress; `attempt` carries the attempt counter.
    /// Not produced by the code visible in this part of the file.
    Reconnecting { attempt: u32 },
    /// The connection is considered failed.
    /// Not produced by the code visible in this part of the file.
    Failed,
}
47
48// ── Controller ───────────────────────────────────────────────────
49
50/// The main entry point for consumers.
51///
52/// Cheaply cloneable via `Arc<ControllerInner>`. Manages the full
53/// connection lifecycle: authentication, background data refresh,
54/// command routing, and reactive entity streaming.
55#[derive(Clone)]
56pub struct Controller {
57    inner: Arc<ControllerInner>,
58}
59
60struct ControllerInner {
61    config: ControllerConfig,
62    store: Arc<DataStore>,
63    connection_state: watch::Sender<ConnectionState>,
64    event_tx: broadcast::Sender<Arc<Event>>,
65    command_tx: Mutex<mpsc::Sender<CommandEnvelope>>,
66    command_rx: Mutex<Option<mpsc::Receiver<CommandEnvelope>>>,
67    cancel: CancellationToken,
68    /// Child token for the current connection — cancelled on disconnect,
69    /// replaced on reconnect (avoids permanent cancellation).
70    cancel_child: Mutex<CancellationToken>,
71    legacy_client: Mutex<Option<LegacyClient>>,
72    integration_client: Mutex<Option<IntegrationClient>>,
73    /// Resolved Integration API site UUID (populated on connect).
74    site_id: Mutex<Option<uuid::Uuid>>,
75    /// WebSocket event stream handle (populated on connect if enabled).
76    ws_handle: Mutex<Option<WebSocketHandle>>,
77    task_handles: Mutex<Vec<JoinHandle<()>>>,
78    /// Warnings accumulated during connect (e.g. Legacy auth failure in Hybrid mode).
79    warnings: Mutex<Vec<String>>,
80}
81
82impl Controller {
83    /// Create a new Controller from configuration. Does NOT connect --
84    /// call [`connect()`](Self::connect) to authenticate and start background tasks.
85    pub fn new(config: ControllerConfig) -> Self {
86        let store = Arc::new(DataStore::new());
87        let (connection_state, _) = watch::channel(ConnectionState::Disconnected);
88        let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_SIZE);
89        let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
90        let cancel = CancellationToken::new();
91        let cancel_child = cancel.child_token();
92
93        Self {
94            inner: Arc::new(ControllerInner {
95                config,
96                store,
97                connection_state,
98                event_tx,
99                command_tx: Mutex::new(command_tx),
100                command_rx: Mutex::new(Some(command_rx)),
101                cancel,
102                cancel_child: Mutex::new(cancel_child),
103                legacy_client: Mutex::new(None),
104                integration_client: Mutex::new(None),
105                warnings: Mutex::new(Vec::new()),
106                site_id: Mutex::new(None),
107                ws_handle: Mutex::new(None),
108                task_handles: Mutex::new(Vec::new()),
109            }),
110        }
111    }
112
113    /// Access the controller configuration.
114    pub fn config(&self) -> &ControllerConfig {
115        &self.inner.config
116    }
117
118    /// Access the underlying DataStore.
119    pub fn store(&self) -> &Arc<DataStore> {
120        &self.inner.store
121    }
122
123    // ── Connection lifecycle ─────────────────────────────────────
124
125    /// Connect to the controller.
126    ///
127    /// Detects the platform, authenticates, performs an initial data
128    /// refresh, and spawns background tasks (periodic refresh, command
129    /// processor).
130    #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
131    pub async fn connect(&self) -> Result<(), CoreError> {
132        let _ = self
133            .inner
134            .connection_state
135            .send(ConnectionState::Connecting);
136
137        // Fresh child token for this connection (supports reconnect).
138        let child = self.inner.cancel.child_token();
139        *self.inner.cancel_child.lock().await = child.clone();
140
141        let config = &self.inner.config;
142        let transport = build_transport(config);
143
144        match &config.auth {
145            AuthCredentials::ApiKey(api_key) => {
146                // Detect platform so we use the right URL prefix
147                let platform = LegacyClient::detect_platform(&config.url).await?;
148                debug!(?platform, "detected controller platform");
149
150                // Integration API client (preferred)
151                let integration = IntegrationClient::from_api_key(
152                    config.url.as_str(),
153                    api_key,
154                    &transport,
155                    platform,
156                )?;
157
158                // Resolve site UUID from Integration API
159                let site_id = resolve_site_id(&integration, &config.site).await?;
160                debug!(site_id = %site_id, "resolved Integration API site UUID");
161
162                *self.inner.integration_client.lock().await = Some(integration);
163                *self.inner.site_id.lock().await = Some(site_id);
164
165                // Also set up Legacy client for event streams and supplementary data.
166                // API key auth may not work with Legacy API on all controllers,
167                // so we swallow errors here — it's optional.
168                match setup_legacy_client(config, &transport).await {
169                    Ok(client) => {
170                        *self.inner.legacy_client.lock().await = Some(client);
171                        debug!("legacy client available as supplement");
172                    }
173                    Err(e) => {
174                        debug!(error = %e, "legacy client unavailable (non-fatal with API key auth)");
175                    }
176                }
177            }
178            AuthCredentials::Credentials { username, password } => {
179                // Legacy-only auth
180                let platform = LegacyClient::detect_platform(&config.url).await?;
181                debug!(?platform, "detected controller platform");
182
183                let client = LegacyClient::new(
184                    config.url.clone(),
185                    config.site.clone(),
186                    platform,
187                    &transport,
188                )?;
189                client.login(username, password).await?;
190                debug!("session authentication successful");
191
192                *self.inner.legacy_client.lock().await = Some(client);
193            }
194            AuthCredentials::Hybrid {
195                api_key,
196                username,
197                password,
198            } => {
199                // Hybrid: both Integration API (API key) and Legacy API (session auth)
200                let platform = LegacyClient::detect_platform(&config.url).await?;
201                debug!(?platform, "detected controller platform (hybrid)");
202
203                // Integration API client
204                let integration = IntegrationClient::from_api_key(
205                    config.url.as_str(),
206                    api_key,
207                    &transport,
208                    platform,
209                )?;
210
211                let site_id = resolve_site_id(&integration, &config.site).await?;
212                debug!(site_id = %site_id, "resolved Integration API site UUID");
213
214                *self.inner.integration_client.lock().await = Some(integration);
215                *self.inner.site_id.lock().await = Some(site_id);
216
217                // Legacy API client — attempt login but degrade gracefully
218                // if it fails. The Integration API is the primary surface;
219                // Legacy adds events, stats, and admin ops.
220                match LegacyClient::new(
221                    config.url.clone(),
222                    config.site.clone(),
223                    platform,
224                    &transport,
225                ) {
226                    Ok(client) => match client.login(username, password).await {
227                        Ok(()) => {
228                            debug!("legacy session authentication successful (hybrid)");
229                            *self.inner.legacy_client.lock().await = Some(client);
230                        }
231                        Err(e) => {
232                            let msg = format!(
233                                "Legacy login failed: {e} — events, health stats, and client traffic will be unavailable"
234                            );
235                            warn!("{msg}");
236                            self.inner.warnings.lock().await.push(msg);
237                        }
238                    },
239                    Err(e) => {
240                        let msg = format!("Legacy client setup failed: {e}");
241                        warn!("{msg}");
242                        self.inner.warnings.lock().await.push(msg);
243                    }
244                }
245            }
246            AuthCredentials::Cloud { api_key, host_id } => {
247                let integration = IntegrationClient::from_api_key(
248                    config.url.as_str(),
249                    api_key,
250                    &transport,
251                    crate::ControllerPlatform::Cloud,
252                )?;
253
254                let site_id = if let Ok(uuid) = uuid::Uuid::parse_str(&config.site) {
255                    uuid
256                } else if let Ok(uuid) = uuid::Uuid::parse_str(host_id) {
257                    uuid
258                } else {
259                    resolve_site_id(&integration, &config.site).await?
260                };
261                debug!(site_id = %site_id, "resolved cloud Integration API site UUID");
262
263                *self.inner.integration_client.lock().await = Some(integration);
264                *self.inner.site_id.lock().await = Some(site_id);
265
266                let msg =
267                    "Cloud auth mode active: Legacy API and WebSocket features are unavailable"
268                        .to_string();
269                self.inner.warnings.lock().await.push(msg);
270            }
271        }
272
273        // Initial data load
274        self.full_refresh().await?;
275
276        // Spawn background tasks
277        let mut handles = self.inner.task_handles.lock().await;
278
279        if let Some(rx) = self.inner.command_rx.lock().await.take() {
280            let ctrl = self.clone();
281            handles.push(tokio::spawn(command_processor_task(ctrl, rx)));
282        }
283
284        let interval_secs = config.refresh_interval_secs;
285        if interval_secs > 0 {
286            let ctrl = self.clone();
287            let cancel = child.clone();
288            handles.push(tokio::spawn(refresh_task(ctrl, interval_secs, cancel)));
289        }
290
291        // WebSocket event stream
292        if config.websocket_enabled {
293            self.spawn_websocket(&child, &mut handles).await;
294        }
295
296        let _ = self.inner.connection_state.send(ConnectionState::Connected);
297        info!("connected to controller");
298        Ok(())
299    }
300
301    /// Spawn the WebSocket event stream and a bridge task that converts
302    /// raw [`UnifiEvent`]s into domain [`Event`]s and broadcasts them.
303    ///
304    /// Non-fatal on failure — the TUI falls back to polling.
305    async fn spawn_websocket(&self, cancel: &CancellationToken, handles: &mut Vec<JoinHandle<()>>) {
306        let legacy_guard = self.inner.legacy_client.lock().await;
307        let Some(ref legacy) = *legacy_guard else {
308            debug!("no legacy client — WebSocket unavailable");
309            return;
310        };
311
312        let platform = legacy.platform();
313        let Some(ws_path_template) = platform.websocket_path() else {
314            debug!("platform does not support WebSocket");
315            return;
316        };
317
318        let ws_path = ws_path_template.replace("{site}", &self.inner.config.site);
319        let base_url = &self.inner.config.url;
320        let scheme = if base_url.scheme() == "https" {
321            "wss"
322        } else {
323            "ws"
324        };
325        let host = base_url.host_str().unwrap_or("localhost");
326        let ws_url_str = match base_url.port() {
327            Some(p) => format!("{scheme}://{host}:{p}{ws_path}"),
328            None => format!("{scheme}://{host}{ws_path}"),
329        };
330        let ws_url = match url::Url::parse(&ws_url_str) {
331            Ok(u) => u,
332            Err(e) => {
333                warn!(error = %e, url = %ws_url_str, "invalid WebSocket URL");
334                return;
335            }
336        };
337
338        let cookie = legacy.cookie_header();
339        drop(legacy_guard);
340
341        if cookie.is_none() {
342            warn!("no session cookie — WebSocket requires legacy auth (skipping)");
343            return;
344        }
345
346        let ws_tls = tls_to_transport(&self.inner.config.tls);
347        let ws_cancel = cancel.child_token();
348        let handle = match WebSocketHandle::connect(
349            ws_url,
350            ReconnectConfig::default(),
351            ws_cancel.clone(),
352            cookie,
353            ws_tls,
354        ) {
355            Ok(h) => h,
356            Err(e) => {
357                warn!(error = %e, "WebSocket connection failed (non-fatal)");
358                return;
359            }
360        };
361
362        // Bridge task: WS events → domain Events → broadcast channel.
363        // Also extracts real-time device stats from `device:sync` messages
364        // to feed the dashboard chart without waiting for full_refresh().
365        let mut ws_rx = handle.subscribe();
366        let event_tx = self.inner.event_tx.clone();
367        let store = Arc::clone(&self.inner.store);
368        let bridge_cancel = ws_cancel;
369
370        handles.push(tokio::spawn(async move {
371            loop {
372                tokio::select! {
373                    biased;
374                    () = bridge_cancel.cancelled() => break,
375                    result = ws_rx.recv() => {
376                        match result {
377                            Ok(ws_event) => {
378                                // Extract real-time stats from device:sync messages
379                                if ws_event.key == "device:sync" || ws_event.key == "device:update" {
380                                    apply_device_sync(&store, &ws_event.extra);
381                                }
382
383                                let event = crate::model::event::Event::from(
384                                    (*ws_event).clone(),
385                                );
386                                let _ = event_tx.send(Arc::new(event));
387                            }
388                            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
389                                warn!(skipped = n, "WS bridge: receiver lagged");
390                            }
391                            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
392                        }
393                    }
394                }
395            }
396        }));
397
398        *self.inner.ws_handle.lock().await = Some(handle);
399        info!("WebSocket event stream spawned (handshake in progress)");
400    }
401
402    /// Disconnect from the controller.
403    ///
404    /// Cancels background tasks, logs out if session-based, and resets
405    /// the connection state to [`Disconnected`](ConnectionState::Disconnected).
406    pub async fn disconnect(&self) {
407        // Cancel the child token (not the parent — allows reconnect).
408        self.inner.cancel_child.lock().await.cancel();
409
410        // Join all background tasks
411        let mut handles = self.inner.task_handles.lock().await;
412        for handle in handles.drain(..) {
413            let _ = handle.await;
414        }
415
416        // Logout if session-based (Credentials or Hybrid both have active sessions)
417        if matches!(
418            self.inner.config.auth,
419            AuthCredentials::Credentials { .. } | AuthCredentials::Hybrid { .. }
420        ) {
421            if let Some(ref client) = *self.inner.legacy_client.lock().await {
422                if let Err(e) = client.logout().await {
423                    warn!(error = %e, "logout failed (non-fatal)");
424                }
425            }
426        }
427
428        // Shut down WebSocket if active
429        if let Some(handle) = self.inner.ws_handle.lock().await.take() {
430            handle.shutdown();
431        }
432
433        *self.inner.legacy_client.lock().await = None;
434        *self.inner.integration_client.lock().await = None;
435        *self.inner.site_id.lock().await = None;
436
437        // Recreate command channel so reconnects can spawn a fresh receiver.
438        // The previous receiver is consumed by the command processor task.
439        {
440            let (tx, rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
441            *self.inner.command_tx.lock().await = tx;
442            *self.inner.command_rx.lock().await = Some(rx);
443        }
444
445        let _ = self
446            .inner
447            .connection_state
448            .send(ConnectionState::Disconnected);
449        debug!("disconnected");
450    }
451
452    /// Fetch all data from the controller and update the DataStore.
453    ///
454    /// Pulls devices, clients, and events from the Legacy API, converts
455    /// them to domain types, and applies them to the store. Events are
456    /// broadcast through the event channel (not stored).
457    #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
458    pub async fn full_refresh(&self) -> Result<(), CoreError> {
459        let integration_guard = self.inner.integration_client.lock().await;
460        let site_id = *self.inner.site_id.lock().await;
461
462        if let (Some(integration), Some(sid)) = (integration_guard.as_ref(), site_id) {
463            // ── Integration API path (preferred) ─────────────────
464            let page_limit = 200;
465
466            let (devices_res, clients_res, networks_res, wifi_res) = tokio::join!(
467                integration.paginate_all(page_limit, |off, lim| {
468                    integration.list_devices(&sid, off, lim)
469                }),
470                integration.paginate_all(page_limit, |off, lim| {
471                    integration.list_clients(&sid, off, lim)
472                }),
473                integration.paginate_all(page_limit, |off, lim| {
474                    integration.list_networks(&sid, off, lim)
475                }),
476                integration.paginate_all(page_limit, |off, lim| {
477                    integration.list_wifi_broadcasts(&sid, off, lim)
478                }),
479            );
480
481            let (policies_res, zones_res, acls_res, dns_res, vouchers_res) = tokio::join!(
482                integration.paginate_all(page_limit, |off, lim| {
483                    integration.list_firewall_policies(&sid, off, lim)
484                }),
485                integration.paginate_all(page_limit, |off, lim| {
486                    integration.list_firewall_zones(&sid, off, lim)
487                }),
488                integration.paginate_all(page_limit, |off, lim| {
489                    integration.list_acl_rules(&sid, off, lim)
490                }),
491                integration.paginate_all(page_limit, |off, lim| {
492                    integration.list_dns_policies(&sid, off, lim)
493                }),
494                integration.paginate_all(page_limit, |off, lim| {
495                    integration.list_vouchers(&sid, off, lim)
496                }),
497            );
498
499            let (sites_res, tml_res) = tokio::join!(
500                integration.paginate_all(50, |off, lim| { integration.list_sites(off, lim) }),
501                integration.paginate_all(page_limit, |off, lim| {
502                    integration.list_traffic_matching_lists(&sid, off, lim)
503                }),
504            );
505
506            // Core endpoints — failure is fatal
507            let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
508            let mut clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
509            // Fetch full details for each network (list endpoint omits ipv4/ipv6 config)
510            let network_ids: Vec<uuid::Uuid> = networks_res?.into_iter().map(|n| n.id).collect();
511            info!(
512                network_count = network_ids.len(),
513                "fetching network details"
514            );
515            let networks: Vec<Network> = {
516                let futs = network_ids.into_iter().map(|nid| async move {
517                    match integration.get_network(&sid, &nid).await {
518                        Ok(detail) => Some(Network::from(detail)),
519                        Err(e) => {
520                            warn!(network_id = %nid, error = %e, "network detail fetch failed");
521                            None
522                        }
523                    }
524                });
525                futures_util::future::join_all(futs)
526                    .await
527                    .into_iter()
528                    .flatten()
529                    .collect()
530            };
531            let wifi: Vec<WifiBroadcast> = wifi_res?.into_iter().map(WifiBroadcast::from).collect();
532            let policies: Vec<FirewallPolicy> = policies_res?
533                .into_iter()
534                .map(FirewallPolicy::from)
535                .collect();
536            let zones: Vec<FirewallZone> = zones_res?.into_iter().map(FirewallZone::from).collect();
537            let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
538            let traffic_matching_lists: Vec<TrafficMatchingList> = tml_res?
539                .into_iter()
540                .map(TrafficMatchingList::from)
541                .collect();
542
543            // Optional endpoints — 404 means the controller doesn't support them
544            let acls: Vec<AclRule> = unwrap_or_empty("acl/rules", acls_res);
545            let dns: Vec<DnsPolicy> = unwrap_or_empty("dns/policies", dns_res);
546            let vouchers: Vec<Voucher> = unwrap_or_empty("vouchers", vouchers_res);
547
548            // Enrich devices with per-device statistics (parallel, non-fatal)
549            info!(
550                device_count = devices.len(),
551                "enriching devices with statistics"
552            );
553            let mut devices = {
554                let futs = devices.into_iter().map(|mut device| async {
555                    if let EntityId::Uuid(device_uuid) = &device.id {
556                        match integration.get_device_statistics(&sid, device_uuid).await {
557                            Ok(stats_resp) => {
558                                device.stats =
559                                    crate::convert::device_stats_from_integration(&stats_resp);
560                            }
561                            Err(e) => {
562                                warn!(
563                                    device = ?device.name,
564                                    error = %e,
565                                    "device stats fetch failed"
566                                );
567                            }
568                        }
569                    }
570                    device
571                });
572                futures_util::future::join_all(futs).await
573            };
574
575            drop(integration_guard);
576
577            // Supplement with Legacy API data (events, health, client traffic, device stats)
578            let (legacy_events, legacy_health, legacy_clients, legacy_devices): (
579                Vec<Event>,
580                Vec<HealthSummary>,
581                Vec<crate::legacy::models::LegacyClientEntry>,
582                Vec<crate::legacy::models::LegacyDevice>,
583            ) = match *self.inner.legacy_client.lock().await {
584                Some(ref legacy) => {
585                    let (events_res, health_res, clients_res, devices_res) = tokio::join!(
586                        legacy.list_events(Some(100)),
587                        legacy.get_health(),
588                        legacy.list_clients(),
589                        legacy.list_devices(),
590                    );
591
592                    let events = match events_res {
593                        Ok(raw) => {
594                            let evts: Vec<Event> = raw.into_iter().map(Event::from).collect();
595                            for evt in &evts {
596                                let _ = self.inner.event_tx.send(Arc::new(evt.clone()));
597                            }
598                            evts
599                        }
600                        Err(e) => {
601                            warn!(error = %e, "legacy event fetch failed (non-fatal)");
602                            Vec::new()
603                        }
604                    };
605
606                    let health = match health_res {
607                        Ok(raw) => convert_health_summaries(raw),
608                        Err(e) => {
609                            warn!(error = %e, "legacy health fetch failed (non-fatal)");
610                            Vec::new()
611                        }
612                    };
613
614                    let lc = match clients_res {
615                        Ok(raw) => raw,
616                        Err(e) => {
617                            warn!(
618                                error = %e,
619                                "legacy client fetch failed (non-fatal)"
620                            );
621                            Vec::new()
622                        }
623                    };
624
625                    let ld = match devices_res {
626                        Ok(raw) => raw,
627                        Err(e) => {
628                            warn!(error = %e, "legacy device fetch failed (non-fatal)");
629                            Vec::new()
630                        }
631                    };
632
633                    (events, health, lc, ld)
634                }
635                None => (Vec::new(), Vec::new(), Vec::new(), Vec::new()),
636            };
637
638            // Merge Legacy client traffic (tx/rx bytes, hostname) into Integration clients.
639            // Match by IP address — Integration API clients often lack real MAC addresses
640            // in the access object, falling back to UUIDs which don't match Legacy MACs.
641            if !legacy_clients.is_empty() {
642                let legacy_by_ip: HashMap<&str, &crate::legacy::models::LegacyClientEntry> =
643                    legacy_clients
644                        .iter()
645                        .filter_map(|lc| lc.ip.as_deref().map(|ip| (ip, lc)))
646                        .collect();
647                let mut merged = 0u32;
648                for client in &mut clients {
649                    let ip_key = client.ip.map(|ip| ip.to_string());
650                    if let Some(lc) = ip_key.as_deref().and_then(|ip| legacy_by_ip.get(ip)) {
651                        if client.tx_bytes.is_none() {
652                            client.tx_bytes = lc.tx_bytes.and_then(|b| u64::try_from(b).ok());
653                        }
654                        if client.rx_bytes.is_none() {
655                            client.rx_bytes = lc.rx_bytes.and_then(|b| u64::try_from(b).ok());
656                        }
657                        if client.hostname.is_none() {
658                            client.hostname.clone_from(&lc.hostname);
659                        }
660                        // Merge wireless info (Legacy has AP MAC, signal, channel)
661                        if client.wireless.is_none() {
662                            let legacy_client: Client = Client::from((*lc).clone());
663                            client.wireless = legacy_client.wireless;
664                            if client.uplink_device_mac.is_none() {
665                                client.uplink_device_mac = legacy_client.uplink_device_mac;
666                            }
667                        }
668                        merged += 1;
669                    }
670                }
671                debug!(
672                    total_clients = clients.len(),
673                    legacy_available = legacy_by_ip.len(),
674                    merged,
675                    "client traffic merge (by IP)"
676                );
677            }
678
679            // Merge Legacy device num_sta (client counts) into Integration devices
680            if !legacy_devices.is_empty() {
681                let legacy_by_mac: HashMap<&str, &crate::legacy::models::LegacyDevice> =
682                    legacy_devices.iter().map(|d| (d.mac.as_str(), d)).collect();
683                for device in &mut devices {
684                    if let Some(ld) = legacy_by_mac.get(device.mac.as_str()) {
685                        if device.client_count.is_none() {
686                            device.client_count = ld.num_sta.and_then(|n| n.try_into().ok());
687                        }
688                        if device.wan_ipv6.is_none() {
689                            device.wan_ipv6 = parse_legacy_device_wan_ipv6(&ld.extra);
690                        }
691                    }
692                }
693            }
694
695            // Push health to DataStore
696            if !legacy_health.is_empty() {
697                self.inner
698                    .store
699                    .site_health
700                    .send_modify(|h| *h = Arc::new(legacy_health));
701            }
702
703            self.inner
704                .store
705                .apply_integration_snapshot(crate::store::RefreshSnapshot {
706                    devices,
707                    clients,
708                    networks,
709                    wifi,
710                    policies,
711                    zones,
712                    acls,
713                    dns,
714                    vouchers,
715                    sites,
716                    events: legacy_events,
717                    traffic_matching_lists,
718                });
719        } else {
720            // ── Legacy-only path ─────────────────────────────────
721            drop(integration_guard);
722
723            let legacy_guard = self.inner.legacy_client.lock().await;
724            let legacy = legacy_guard
725                .as_ref()
726                .ok_or(CoreError::ControllerDisconnected)?;
727
728            let (devices_res, clients_res, events_res) = tokio::join!(
729                legacy.list_devices(),
730                legacy.list_clients(),
731                legacy.list_events(Some(100)),
732            );
733
734            let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
735            let clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
736            let events: Vec<Event> = events_res?.into_iter().map(Event::from).collect();
737
738            drop(legacy_guard);
739
740            for event in &events {
741                let _ = self.inner.event_tx.send(Arc::new(event.clone()));
742            }
743
744            self.inner
745                .store
746                .apply_integration_snapshot(crate::store::RefreshSnapshot {
747                    devices,
748                    clients,
749                    networks: Vec::new(),
750                    wifi: Vec::new(),
751                    policies: Vec::new(),
752                    zones: Vec::new(),
753                    acls: Vec::new(),
754                    dns: Vec::new(),
755                    vouchers: Vec::new(),
756                    sites: Vec::new(),
757                    events,
758                    traffic_matching_lists: Vec::new(),
759                });
760        }
761
762        debug!(
763            devices = self.inner.store.device_count(),
764            clients = self.inner.store.client_count(),
765            "data refresh complete"
766        );
767
768        Ok(())
769    }
770
771    // ── Command execution ────────────────────────────────────────
772
773    /// Execute a command against the controller.
774    ///
775    /// Sends the command through the internal channel to the command
776    /// processor task and awaits the result.
777    pub async fn execute(&self, cmd: Command) -> Result<CommandResult, CoreError> {
778        if *self.inner.connection_state.borrow() != ConnectionState::Connected {
779            return Err(CoreError::ControllerDisconnected);
780        }
781
782        let (tx, rx) = tokio::sync::oneshot::channel();
783
784        let command_tx = self.inner.command_tx.lock().await.clone();
785
786        command_tx
787            .send(CommandEnvelope {
788                command: cmd,
789                response_tx: tx,
790            })
791            .await
792            .map_err(|_| CoreError::ControllerDisconnected)?;
793
794        rx.await.map_err(|_| CoreError::ControllerDisconnected)?
795    }
796
797    // ── One-shot convenience ─────────────────────────────────────
798
799    /// One-shot: connect, run closure, disconnect.
800    ///
801    /// Optimized for CLI: disables WebSocket and periodic refresh since
802    /// we only need a single request-response cycle.
803    pub async fn oneshot<F, Fut, T>(config: ControllerConfig, f: F) -> Result<T, CoreError>
804    where
805        F: FnOnce(Controller) -> Fut,
806        Fut: std::future::Future<Output = Result<T, CoreError>>,
807    {
808        let mut cfg = config;
809        cfg.websocket_enabled = false;
810        cfg.refresh_interval_secs = 0;
811
812        let controller = Controller::new(cfg);
813        controller.connect().await?;
814        let result = f(controller.clone()).await;
815        controller.disconnect().await;
816        result
817    }
818
819    // ── State observation ────────────────────────────────────────
820
821    /// Subscribe to connection state changes.
822    pub fn connection_state(&self) -> watch::Receiver<ConnectionState> {
823        self.inner.connection_state.subscribe()
824    }
825
826    /// Subscribe to the event broadcast stream.
827    pub fn events(&self) -> broadcast::Receiver<Arc<Event>> {
828        self.inner.event_tx.subscribe()
829    }
830
831    // ── Snapshot accessors (delegate to DataStore) ───────────────
832
833    pub fn devices_snapshot(&self) -> Arc<Vec<Arc<Device>>> {
834        self.inner.store.devices_snapshot()
835    }
836
837    pub fn clients_snapshot(&self) -> Arc<Vec<Arc<Client>>> {
838        self.inner.store.clients_snapshot()
839    }
840
841    pub fn networks_snapshot(&self) -> Arc<Vec<Arc<Network>>> {
842        self.inner.store.networks_snapshot()
843    }
844
845    pub fn wifi_broadcasts_snapshot(&self) -> Arc<Vec<Arc<WifiBroadcast>>> {
846        self.inner.store.wifi_broadcasts_snapshot()
847    }
848
849    pub fn firewall_policies_snapshot(&self) -> Arc<Vec<Arc<FirewallPolicy>>> {
850        self.inner.store.firewall_policies_snapshot()
851    }
852
853    pub fn firewall_zones_snapshot(&self) -> Arc<Vec<Arc<FirewallZone>>> {
854        self.inner.store.firewall_zones_snapshot()
855    }
856
857    pub fn acl_rules_snapshot(&self) -> Arc<Vec<Arc<AclRule>>> {
858        self.inner.store.acl_rules_snapshot()
859    }
860
861    pub fn dns_policies_snapshot(&self) -> Arc<Vec<Arc<DnsPolicy>>> {
862        self.inner.store.dns_policies_snapshot()
863    }
864
865    pub fn vouchers_snapshot(&self) -> Arc<Vec<Arc<Voucher>>> {
866        self.inner.store.vouchers_snapshot()
867    }
868
869    pub fn sites_snapshot(&self) -> Arc<Vec<Arc<Site>>> {
870        self.inner.store.sites_snapshot()
871    }
872
873    pub fn events_snapshot(&self) -> Arc<Vec<Arc<Event>>> {
874        self.inner.store.events_snapshot()
875    }
876
877    pub fn traffic_matching_lists_snapshot(&self) -> Arc<Vec<Arc<TrafficMatchingList>>> {
878        self.inner.store.traffic_matching_lists_snapshot()
879    }
880
881    // ── Stream accessors (delegate to DataStore) ─────────────────
882
883    pub fn devices(&self) -> EntityStream<Device> {
884        self.inner.store.subscribe_devices()
885    }
886
887    pub fn clients(&self) -> EntityStream<Client> {
888        self.inner.store.subscribe_clients()
889    }
890
891    pub fn networks(&self) -> EntityStream<Network> {
892        self.inner.store.subscribe_networks()
893    }
894
895    pub fn wifi_broadcasts(&self) -> EntityStream<WifiBroadcast> {
896        self.inner.store.subscribe_wifi_broadcasts()
897    }
898
899    pub fn firewall_policies(&self) -> EntityStream<FirewallPolicy> {
900        self.inner.store.subscribe_firewall_policies()
901    }
902
903    pub fn firewall_zones(&self) -> EntityStream<FirewallZone> {
904        self.inner.store.subscribe_firewall_zones()
905    }
906
907    pub fn acl_rules(&self) -> EntityStream<AclRule> {
908        self.inner.store.subscribe_acl_rules()
909    }
910
911    pub fn dns_policies(&self) -> EntityStream<DnsPolicy> {
912        self.inner.store.subscribe_dns_policies()
913    }
914
915    pub fn vouchers(&self) -> EntityStream<Voucher> {
916        self.inner.store.subscribe_vouchers()
917    }
918
919    pub fn sites(&self) -> EntityStream<Site> {
920        self.inner.store.subscribe_sites()
921    }
922
923    pub fn traffic_matching_lists(&self) -> EntityStream<TrafficMatchingList> {
924        self.inner.store.subscribe_traffic_matching_lists()
925    }
926
927    /// Subscribe to site health updates (WAN IP, latency, bandwidth rates).
928    pub fn site_health(&self) -> watch::Receiver<Arc<Vec<HealthSummary>>> {
929        self.inner.store.subscribe_site_health()
930    }
931
932    /// Drain warnings accumulated during connect (e.g. Legacy auth failure).
933    pub async fn take_warnings(&self) -> Vec<String> {
934        std::mem::take(&mut *self.inner.warnings.lock().await)
935    }
936
937    // ── Ad-hoc Integration API queries ───────────────────────────
938    //
939    // These bypass the DataStore and query the Integration API directly.
940    // Intended for reference data that doesn't need reactive subscriptions.
941
942    /// Fetch VPN servers from the Integration API.
943    pub async fn list_vpn_servers(&self) -> Result<Vec<VpnServer>, CoreError> {
944        let guard = self.inner.integration_client.lock().await;
945        let site_id = *self.inner.site_id.lock().await;
946        let (ic, sid) = require_integration(&guard, site_id, "list_vpn_servers")?;
947        let raw = ic
948            .paginate_all(200, |off, lim| ic.list_vpn_servers(&sid, off, lim))
949            .await?;
950        Ok(raw
951            .into_iter()
952            .map(|s| {
953                let id = s
954                    .fields
955                    .get("id")
956                    .and_then(|v| v.as_str())
957                    .and_then(|s| uuid::Uuid::parse_str(s).ok())
958                    .map_or_else(|| EntityId::Legacy("unknown".into()), EntityId::Uuid);
959                VpnServer {
960                    id,
961                    name: s
962                        .fields
963                        .get("name")
964                        .and_then(|v| v.as_str())
965                        .map(String::from),
966                    server_type: s
967                        .fields
968                        .get("type")
969                        .or_else(|| s.fields.get("serverType"))
970                        .and_then(|v| v.as_str())
971                        .unwrap_or("UNKNOWN")
972                        .to_owned(),
973                    enabled: s.fields.get("enabled").and_then(serde_json::Value::as_bool),
974                }
975            })
976            .collect())
977    }
978
979    /// Fetch VPN tunnels from the Integration API.
980    pub async fn list_vpn_tunnels(&self) -> Result<Vec<VpnTunnel>, CoreError> {
981        let guard = self.inner.integration_client.lock().await;
982        let site_id = *self.inner.site_id.lock().await;
983        let (ic, sid) = require_integration(&guard, site_id, "list_vpn_tunnels")?;
984        let raw = ic
985            .paginate_all(200, |off, lim| ic.list_vpn_tunnels(&sid, off, lim))
986            .await?;
987        Ok(raw
988            .into_iter()
989            .map(|t| {
990                let id = t
991                    .fields
992                    .get("id")
993                    .and_then(|v| v.as_str())
994                    .and_then(|s| uuid::Uuid::parse_str(s).ok())
995                    .map_or_else(|| EntityId::Legacy("unknown".into()), EntityId::Uuid);
996                VpnTunnel {
997                    id,
998                    name: t
999                        .fields
1000                        .get("name")
1001                        .and_then(|v| v.as_str())
1002                        .map(String::from),
1003                    tunnel_type: t
1004                        .fields
1005                        .get("type")
1006                        .or_else(|| t.fields.get("tunnelType"))
1007                        .and_then(|v| v.as_str())
1008                        .unwrap_or("UNKNOWN")
1009                        .to_owned(),
1010                    enabled: t.fields.get("enabled").and_then(serde_json::Value::as_bool),
1011                }
1012            })
1013            .collect())
1014    }
1015
1016    /// Fetch WAN interfaces from the Integration API.
1017    pub async fn list_wans(&self) -> Result<Vec<WanInterface>, CoreError> {
1018        let guard = self.inner.integration_client.lock().await;
1019        let site_id = *self.inner.site_id.lock().await;
1020        let (ic, sid) = require_integration(&guard, site_id, "list_wans")?;
1021        let raw = ic
1022            .paginate_all(200, |off, lim| ic.list_wans(&sid, off, lim))
1023            .await?;
1024        Ok(raw
1025            .into_iter()
1026            .map(|w| {
1027                let id = w
1028                    .fields
1029                    .get("id")
1030                    .and_then(|v| v.as_str())
1031                    .and_then(|s| uuid::Uuid::parse_str(s).ok())
1032                    .map_or_else(|| EntityId::Legacy("unknown".into()), EntityId::Uuid);
1033                let parse_ip = |key: &str| -> Option<std::net::IpAddr> {
1034                    w.fields
1035                        .get(key)
1036                        .and_then(|v| v.as_str())
1037                        .and_then(|s| s.parse().ok())
1038                };
1039                let dns = w
1040                    .fields
1041                    .get("dns")
1042                    .and_then(|v| v.as_array())
1043                    .map(|arr| {
1044                        arr.iter()
1045                            .filter_map(|v| v.as_str().and_then(|s| s.parse().ok()))
1046                            .collect()
1047                    })
1048                    .unwrap_or_default();
1049                WanInterface {
1050                    id,
1051                    name: w
1052                        .fields
1053                        .get("name")
1054                        .and_then(|v| v.as_str())
1055                        .map(String::from),
1056                    ip: parse_ip("ipAddress").or_else(|| parse_ip("ip")),
1057                    gateway: parse_ip("gateway"),
1058                    dns,
1059                }
1060            })
1061            .collect())
1062    }
1063
1064    /// Fetch DPI categories from the Integration API.
1065    pub async fn list_dpi_categories(&self) -> Result<Vec<DpiCategory>, CoreError> {
1066        let guard = self.inner.integration_client.lock().await;
1067        let site_id = *self.inner.site_id.lock().await;
1068        let (ic, sid) = require_integration(&guard, site_id, "list_dpi_categories")?;
1069        let raw = ic
1070            .paginate_all(200, |off, lim| ic.list_dpi_categories(&sid, off, lim))
1071            .await?;
1072        Ok(raw
1073            .into_iter()
1074            .map(|c| {
1075                #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1076                let id = c
1077                    .fields
1078                    .get("id")
1079                    .and_then(serde_json::Value::as_u64)
1080                    .unwrap_or(0) as u32;
1081                DpiCategory {
1082                    id,
1083                    name: c
1084                        .fields
1085                        .get("name")
1086                        .and_then(|v| v.as_str())
1087                        .unwrap_or("Unknown")
1088                        .to_owned(),
1089                    tx_bytes: c
1090                        .fields
1091                        .get("txBytes")
1092                        .and_then(serde_json::Value::as_u64)
1093                        .unwrap_or(0),
1094                    rx_bytes: c
1095                        .fields
1096                        .get("rxBytes")
1097                        .and_then(serde_json::Value::as_u64)
1098                        .unwrap_or(0),
1099                    apps: Vec::new(),
1100                }
1101            })
1102            .collect())
1103    }
1104
1105    /// Fetch DPI applications from the Integration API.
1106    pub async fn list_dpi_applications(&self) -> Result<Vec<DpiApplication>, CoreError> {
1107        let guard = self.inner.integration_client.lock().await;
1108        let site_id = *self.inner.site_id.lock().await;
1109        let (ic, sid) = require_integration(&guard, site_id, "list_dpi_applications")?;
1110        let raw = ic
1111            .paginate_all(200, |off, lim| ic.list_dpi_applications(&sid, off, lim))
1112            .await?;
1113        Ok(raw
1114            .into_iter()
1115            .map(|a| {
1116                #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1117                let id = a
1118                    .fields
1119                    .get("id")
1120                    .and_then(serde_json::Value::as_u64)
1121                    .unwrap_or(0) as u32;
1122                DpiApplication {
1123                    id,
1124                    name: a
1125                        .fields
1126                        .get("name")
1127                        .and_then(|v| v.as_str())
1128                        .unwrap_or("Unknown")
1129                        .to_owned(),
1130                    #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1131                    category_id: a
1132                        .fields
1133                        .get("categoryId")
1134                        .and_then(serde_json::Value::as_u64)
1135                        .unwrap_or(0) as u32,
1136                    tx_bytes: a
1137                        .fields
1138                        .get("txBytes")
1139                        .and_then(serde_json::Value::as_u64)
1140                        .unwrap_or(0),
1141                    rx_bytes: a
1142                        .fields
1143                        .get("rxBytes")
1144                        .and_then(serde_json::Value::as_u64)
1145                        .unwrap_or(0),
1146                }
1147            })
1148            .collect())
1149    }
1150
1151    /// Fetch RADIUS profiles from the Integration API.
1152    pub async fn list_radius_profiles(&self) -> Result<Vec<RadiusProfile>, CoreError> {
1153        let guard = self.inner.integration_client.lock().await;
1154        let site_id = *self.inner.site_id.lock().await;
1155        let (ic, sid) = require_integration(&guard, site_id, "list_radius_profiles")?;
1156        let raw = ic
1157            .paginate_all(200, |off, lim| ic.list_radius_profiles(&sid, off, lim))
1158            .await?;
1159        Ok(raw
1160            .into_iter()
1161            .map(|r| {
1162                let id = r
1163                    .fields
1164                    .get("id")
1165                    .and_then(|v| v.as_str())
1166                    .and_then(|s| uuid::Uuid::parse_str(s).ok())
1167                    .map_or_else(|| EntityId::Legacy("unknown".into()), EntityId::Uuid);
1168                RadiusProfile {
1169                    id,
1170                    name: r
1171                        .fields
1172                        .get("name")
1173                        .and_then(|v| v.as_str())
1174                        .unwrap_or("Unknown")
1175                        .to_owned(),
1176                }
1177            })
1178            .collect())
1179    }
1180
1181    /// Fetch countries from the Integration API.
1182    pub async fn list_countries(&self) -> Result<Vec<Country>, CoreError> {
1183        let guard = self.inner.integration_client.lock().await;
1184        let ic = guard
1185            .as_ref()
1186            .ok_or_else(|| unsupported("list_countries"))?;
1187        let raw = ic
1188            .paginate_all(200, |off, lim| ic.list_countries(off, lim))
1189            .await?;
1190        Ok(raw
1191            .into_iter()
1192            .map(|c| Country {
1193                code: c
1194                    .fields
1195                    .get("code")
1196                    .and_then(|v| v.as_str())
1197                    .unwrap_or("")
1198                    .to_owned(),
1199                name: c
1200                    .fields
1201                    .get("name")
1202                    .and_then(|v| v.as_str())
1203                    .unwrap_or("Unknown")
1204                    .to_owned(),
1205            })
1206            .collect())
1207    }
1208
1209    /// Fetch references for a specific network (Integration API).
1210    pub async fn get_network_references(
1211        &self,
1212        network_id: &EntityId,
1213    ) -> Result<serde_json::Value, CoreError> {
1214        let guard = self.inner.integration_client.lock().await;
1215        let site_id = *self.inner.site_id.lock().await;
1216        let (ic, sid) = require_integration(&guard, site_id, "get_network_references")?;
1217        let uuid = require_uuid(network_id)?;
1218        let refs = ic.get_network_references(&sid, &uuid).await?;
1219        Ok(serde_json::to_value(refs).unwrap_or_default())
1220    }
1221
1222    /// Fetch firewall policy ordering (Integration API).
1223    pub async fn get_firewall_policy_ordering(
1224        &self,
1225    ) -> Result<crate::integration_types::FirewallPolicyOrdering, CoreError> {
1226        let guard = self.inner.integration_client.lock().await;
1227        let site_id = *self.inner.site_id.lock().await;
1228        let (ic, sid) = require_integration(&guard, site_id, "get_firewall_policy_ordering")?;
1229        Ok(ic.get_firewall_policy_ordering(&sid).await?)
1230    }
1231
1232    /// List pending devices.
1233    ///
1234    /// Prefers Integration API pending endpoint, falls back to filtering
1235    /// the canonical device snapshot by pending adoption state.
1236    pub async fn list_pending_devices(&self) -> Result<Vec<serde_json::Value>, CoreError> {
1237        let integration_guard = self.inner.integration_client.lock().await;
1238        let site_id = *self.inner.site_id.lock().await;
1239
1240        if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1241            let raw = ic
1242                .paginate_all(200, |off, lim| ic.list_pending_devices(&sid, off, lim))
1243                .await?;
1244            return Ok(raw
1245                .into_iter()
1246                .map(|v| serde_json::to_value(v).unwrap_or_default())
1247                .collect());
1248        }
1249
1250        let snapshot = self.devices_snapshot();
1251        Ok(snapshot
1252            .iter()
1253            .filter(|d| d.state == crate::model::DeviceState::PendingAdoption)
1254            .map(|d| serde_json::to_value(d.as_ref()).unwrap_or_default())
1255            .collect())
1256    }
1257
1258    /// List device tags.
1259    ///
1260    /// Uses Integration API when available.
1261    pub async fn list_device_tags(&self) -> Result<Vec<serde_json::Value>, CoreError> {
1262        let integration_guard = self.inner.integration_client.lock().await;
1263        let site_id = *self.inner.site_id.lock().await;
1264        if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1265            let raw = ic
1266                .paginate_all(200, |off, lim| ic.list_device_tags(&sid, off, lim))
1267                .await?;
1268            return Ok(raw
1269                .into_iter()
1270                .map(|v| serde_json::to_value(v).unwrap_or_default())
1271                .collect());
1272        }
1273
1274        Ok(Vec::new())
1275    }
1276
1277    /// List controller backups (legacy API).
1278    pub async fn list_backups(&self) -> Result<Vec<serde_json::Value>, CoreError> {
1279        let guard = self.inner.legacy_client.lock().await;
1280        let legacy = require_legacy(&guard)?;
1281        Ok(legacy.list_backups().await?)
1282    }
1283
1284    /// Download a controller backup file (legacy API).
1285    pub async fn download_backup(&self, filename: &str) -> Result<Vec<u8>, CoreError> {
1286        let guard = self.inner.legacy_client.lock().await;
1287        let legacy = require_legacy(&guard)?;
1288        Ok(legacy.download_backup(filename).await?)
1289    }
1290
1291    // ── Statistics (Legacy API) ────────────────────────────────────
1292
1293    /// Fetch site-level historical statistics.
1294    pub async fn get_site_stats(
1295        &self,
1296        interval: &str,
1297        start: Option<i64>,
1298        end: Option<i64>,
1299        attrs: Option<&[String]>,
1300    ) -> Result<Vec<serde_json::Value>, CoreError> {
1301        let guard = self.inner.legacy_client.lock().await;
1302        let legacy = require_legacy(&guard)?;
1303        Ok(legacy.get_site_stats(interval, start, end, attrs).await?)
1304    }
1305
1306    /// Fetch per-device historical statistics.
1307    pub async fn get_device_stats(
1308        &self,
1309        interval: &str,
1310        macs: Option<&[String]>,
1311        attrs: Option<&[String]>,
1312    ) -> Result<Vec<serde_json::Value>, CoreError> {
1313        let guard = self.inner.legacy_client.lock().await;
1314        let legacy = require_legacy(&guard)?;
1315        Ok(legacy.get_device_stats(interval, macs, attrs).await?)
1316    }
1317
1318    /// Fetch per-client historical statistics.
1319    pub async fn get_client_stats(
1320        &self,
1321        interval: &str,
1322        macs: Option<&[String]>,
1323        attrs: Option<&[String]>,
1324    ) -> Result<Vec<serde_json::Value>, CoreError> {
1325        let guard = self.inner.legacy_client.lock().await;
1326        let legacy = require_legacy(&guard)?;
1327        Ok(legacy.get_client_stats(interval, macs, attrs).await?)
1328    }
1329
1330    /// Fetch gateway historical statistics.
1331    pub async fn get_gateway_stats(
1332        &self,
1333        interval: &str,
1334        start: Option<i64>,
1335        end: Option<i64>,
1336        attrs: Option<&[String]>,
1337    ) -> Result<Vec<serde_json::Value>, CoreError> {
1338        let guard = self.inner.legacy_client.lock().await;
1339        let legacy = require_legacy(&guard)?;
1340        Ok(legacy
1341            .get_gateway_stats(interval, start, end, attrs)
1342            .await?)
1343    }
1344
1345    /// Fetch DPI statistics.
1346    pub async fn get_dpi_stats(
1347        &self,
1348        group_by: &str,
1349        macs: Option<&[String]>,
1350    ) -> Result<Vec<serde_json::Value>, CoreError> {
1351        let guard = self.inner.legacy_client.lock().await;
1352        let legacy = require_legacy(&guard)?;
1353        Ok(legacy.get_dpi_stats(group_by, macs).await?)
1354    }
1355
1356    // ── Ad-hoc Legacy API queries ──────────────────────────────────
1357    //
1358    // Legacy-only data that doesn't live in the DataStore.
1359
1360    /// Fetch admin list from the Legacy API.
1361    pub async fn list_admins(&self) -> Result<Vec<Admin>, CoreError> {
1362        let guard = self.inner.legacy_client.lock().await;
1363        let legacy = require_legacy(&guard)?;
1364        let raw = legacy.list_admins().await?;
1365        Ok(raw
1366            .into_iter()
1367            .map(|v| Admin {
1368                id: v.get("_id").and_then(|v| v.as_str()).map_or_else(
1369                    || EntityId::Legacy("unknown".into()),
1370                    |s| EntityId::Legacy(s.into()),
1371                ),
1372                name: v
1373                    .get("name")
1374                    .and_then(|v| v.as_str())
1375                    .unwrap_or("")
1376                    .to_owned(),
1377                email: v.get("email").and_then(|v| v.as_str()).map(String::from),
1378                role: v
1379                    .get("role")
1380                    .and_then(|v| v.as_str())
1381                    .unwrap_or("unknown")
1382                    .to_owned(),
1383                is_super: v
1384                    .get("is_super")
1385                    .and_then(serde_json::Value::as_bool)
1386                    .unwrap_or(false),
1387                last_login: None,
1388            })
1389            .collect())
1390    }
1391
1392    /// Fetch alarms from the Legacy API.
1393    pub async fn list_alarms(&self) -> Result<Vec<Alarm>, CoreError> {
1394        let guard = self.inner.legacy_client.lock().await;
1395        let legacy = require_legacy(&guard)?;
1396        let raw = legacy.list_alarms().await?;
1397        Ok(raw.into_iter().map(Alarm::from).collect())
1398    }
1399
1400    /// Fetch controller system info.
1401    ///
1402    /// Prefers the Integration API (`GET /v1/info`) when available,
1403    /// falls back to Legacy `stat/sysinfo`.
1404    pub async fn get_system_info(&self) -> Result<SystemInfo, CoreError> {
1405        // Try Integration API first (works with API key auth).
1406        {
1407            let guard = self.inner.integration_client.lock().await;
1408            if let Some(ic) = guard.as_ref() {
1409                let info = ic.get_info().await?;
1410                let f = &info.fields;
1411                return Ok(SystemInfo {
1412                    controller_name: f
1413                        .get("applicationName")
1414                        .or_else(|| f.get("name"))
1415                        .and_then(|v| v.as_str())
1416                        .map(String::from),
1417                    version: f
1418                        .get("applicationVersion")
1419                        .or_else(|| f.get("version"))
1420                        .and_then(|v| v.as_str())
1421                        .unwrap_or("unknown")
1422                        .to_owned(),
1423                    build: f.get("build").and_then(|v| v.as_str()).map(String::from),
1424                    hostname: f.get("hostname").and_then(|v| v.as_str()).map(String::from),
1425                    ip: None, // Not available via Integration API
1426                    uptime_secs: f.get("uptime").and_then(serde_json::Value::as_u64),
1427                    update_available: f
1428                        .get("isUpdateAvailable")
1429                        .or_else(|| f.get("update_available"))
1430                        .and_then(serde_json::Value::as_bool),
1431                });
1432            }
1433        }
1434
1435        // Fallback to Legacy API (requires session auth).
1436        let guard = self.inner.legacy_client.lock().await;
1437        let legacy = require_legacy(&guard)?;
1438        let raw = legacy.get_sysinfo().await?;
1439        Ok(SystemInfo {
1440            controller_name: raw
1441                .get("controller_name")
1442                .or_else(|| raw.get("name"))
1443                .and_then(|v| v.as_str())
1444                .map(String::from),
1445            version: raw
1446                .get("version")
1447                .and_then(|v| v.as_str())
1448                .unwrap_or("unknown")
1449                .to_owned(),
1450            build: raw.get("build").and_then(|v| v.as_str()).map(String::from),
1451            hostname: raw
1452                .get("hostname")
1453                .and_then(|v| v.as_str())
1454                .map(String::from),
1455            ip: raw
1456                .get("ip_addrs")
1457                .and_then(|v| v.as_array())
1458                .and_then(|a| a.first())
1459                .and_then(|v| v.as_str())
1460                .and_then(|s| s.parse().ok()),
1461            uptime_secs: raw.get("uptime").and_then(serde_json::Value::as_u64),
1462            update_available: raw
1463                .get("update_available")
1464                .and_then(serde_json::Value::as_bool),
1465        })
1466    }
1467
1468    /// Fetch site health dashboard from the Legacy API.
1469    pub async fn get_site_health(&self) -> Result<Vec<HealthSummary>, CoreError> {
1470        let guard = self.inner.legacy_client.lock().await;
1471        let legacy = require_legacy(&guard)?;
1472        let raw = legacy.get_health().await?;
1473        Ok(convert_health_summaries(raw))
1474    }
1475
1476    /// Fetch low-level sysinfo from the Legacy API.
1477    pub async fn get_sysinfo(&self) -> Result<SysInfo, CoreError> {
1478        let guard = self.inner.legacy_client.lock().await;
1479        let legacy = require_legacy(&guard)?;
1480        let raw = legacy.get_sysinfo().await?;
1481        Ok(SysInfo {
1482            timezone: raw
1483                .get("timezone")
1484                .and_then(|v| v.as_str())
1485                .map(String::from),
1486            autobackup: raw.get("autobackup").and_then(serde_json::Value::as_bool),
1487            hostname: raw
1488                .get("hostname")
1489                .and_then(|v| v.as_str())
1490                .map(String::from),
1491            ip_addrs: raw
1492                .get("ip_addrs")
1493                .and_then(|v| v.as_array())
1494                .map(|a| {
1495                    a.iter()
1496                        .filter_map(|v| v.as_str().map(String::from))
1497                        .collect()
1498                })
1499                .unwrap_or_default(),
1500            live_chat: raw
1501                .get("live_chat")
1502                .and_then(|v| v.as_str())
1503                .map(String::from),
1504            #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1505            data_retention_days: raw
1506                .get("data_retention_days")
1507                .and_then(serde_json::Value::as_u64)
1508                .map(|n| n as u32),
1509            extra: raw,
1510        })
1511    }
1512}
1513
1514// ── Background tasks ─────────────────────────────────────────────
1515
1516/// Parse a numeric field from a JSON object, tolerating both string and number encodings.
1517fn parse_f64_field(parent: Option<&serde_json::Value>, key: &str) -> Option<f64> {
1518    parent.and_then(|s| s.get(key)).and_then(|v| {
1519        v.as_str()
1520            .and_then(|s| s.parse().ok())
1521            .or_else(|| v.as_f64())
1522    })
1523}
1524
1525/// Apply a `device:sync` WebSocket message to the DataStore.
1526///
1527/// Extracts CPU, memory, load averages, and uplink bandwidth from the
1528/// raw Legacy API device JSON. Merges stats into the existing device
1529/// (looked up by MAC) without clobbering Integration API fields.
1530#[allow(clippy::cast_precision_loss)]
1531fn apply_device_sync(store: &DataStore, data: &serde_json::Value) {
1532    let Some(mac_str) = data.get("mac").and_then(serde_json::Value::as_str) else {
1533        return;
1534    };
1535    let mac = MacAddress::new(mac_str);
1536    let Some(existing) = store.device_by_mac(&mac) else {
1537        return; // Device not in store yet — full_refresh will add it
1538    };
1539
1540    // Parse sys_stats
1541    let sys = data.get("sys_stats");
1542    let cpu = sys
1543        .and_then(|s| s.get("cpu"))
1544        .and_then(|v| v.as_str().or_else(|| v.as_f64().map(|_| "")))
1545        .and_then(|s| {
1546            if s.is_empty() {
1547                None
1548            } else {
1549                s.parse::<f64>().ok()
1550            }
1551        })
1552        .or_else(|| {
1553            sys.and_then(|s| s.get("cpu"))
1554                .and_then(serde_json::Value::as_f64)
1555        });
1556    #[allow(clippy::as_conversions, clippy::cast_precision_loss)]
1557    let mem_pct = match (
1558        sys.and_then(|s| s.get("mem_used"))
1559            .and_then(serde_json::Value::as_i64),
1560        sys.and_then(|s| s.get("mem_total"))
1561            .and_then(serde_json::Value::as_i64),
1562    ) {
1563        (Some(used), Some(total)) if total > 0 => Some((used as f64 / total as f64) * 100.0),
1564        _ => None,
1565    };
1566    let load_averages: [Option<f64>; 3] =
1567        ["loadavg_1", "loadavg_5", "loadavg_15"].map(|key| parse_f64_field(sys, key));
1568
1569    // Uplink bandwidth: check "uplink" object or top-level fields
1570    let uplink = data.get("uplink");
1571    let tx_bps = uplink
1572        .and_then(|u| u.get("tx_bytes-r").or_else(|| u.get("tx_bytes_r")))
1573        .and_then(serde_json::Value::as_u64)
1574        .or_else(|| data.get("tx_bytes-r").and_then(serde_json::Value::as_u64));
1575    let rx_bps = uplink
1576        .and_then(|u| u.get("rx_bytes-r").or_else(|| u.get("rx_bytes_r")))
1577        .and_then(serde_json::Value::as_u64)
1578        .or_else(|| data.get("rx_bytes-r").and_then(serde_json::Value::as_u64));
1579
1580    let bandwidth = match (tx_bps, rx_bps) {
1581        (Some(tx), Some(rx)) if tx > 0 || rx > 0 => Some(crate::model::common::Bandwidth {
1582            tx_bytes_per_sec: tx,
1583            rx_bytes_per_sec: rx,
1584        }),
1585        _ => existing.stats.uplink_bandwidth, // Keep existing if no new data
1586    };
1587
1588    // Uptime from top-level `_uptime` or `uptime`
1589    let uptime = data
1590        .get("_uptime")
1591        .or_else(|| data.get("uptime"))
1592        .and_then(serde_json::Value::as_i64)
1593        .and_then(|u| u.try_into().ok())
1594        .or(existing.stats.uptime_secs);
1595
1596    // Clone and update
1597    let mut device = (*existing).clone();
1598    device.stats.uplink_bandwidth = bandwidth;
1599    if let Some(c) = cpu {
1600        device.stats.cpu_utilization_pct = Some(c);
1601    }
1602    if let Some(m) = mem_pct {
1603        device.stats.memory_utilization_pct = Some(m);
1604    }
1605    if let Some(l) = load_averages[0] {
1606        device.stats.load_average_1m = Some(l);
1607    }
1608    if let Some(l) = load_averages[1] {
1609        device.stats.load_average_5m = Some(l);
1610    }
1611    if let Some(l) = load_averages[2] {
1612        device.stats.load_average_15m = Some(l);
1613    }
1614    device.stats.uptime_secs = uptime;
1615
1616    // Update client count from num_sta (AP/switch connected stations)
1617    if let Some(num_sta) = data.get("num_sta").and_then(serde_json::Value::as_u64) {
1618        #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1619        {
1620            device.client_count = Some(num_sta as u32);
1621        }
1622    }
1623
1624    if let Some(obj) = data.as_object() {
1625        if let Some(wan_ipv6) = parse_legacy_device_wan_ipv6(obj) {
1626            device.wan_ipv6 = Some(wan_ipv6);
1627        }
1628    }
1629
1630    let key = mac.as_str().to_owned();
1631    let id = device.id.clone();
1632    store.devices.upsert(key, id, device);
1633}
1634
1635/// Periodically refresh data from the controller.
1636async fn refresh_task(controller: Controller, interval_secs: u64, cancel: CancellationToken) {
1637    let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
1638    interval.tick().await; // consume the immediate first tick
1639
1640    loop {
1641        tokio::select! {
1642            biased;
1643            () = cancel.cancelled() => break,
1644            _ = interval.tick() => {
1645                if let Err(e) = controller.full_refresh().await {
1646                    warn!(error = %e, "periodic refresh failed");
1647                }
1648            }
1649        }
1650    }
1651}
1652
1653/// Process commands from the mpsc channel, routing each to the
1654/// appropriate Legacy API call.
1655async fn command_processor_task(controller: Controller, mut rx: mpsc::Receiver<CommandEnvelope>) {
1656    let cancel = controller.inner.cancel_child.lock().await.clone();
1657
1658    loop {
1659        tokio::select! {
1660            biased;
1661            () = cancel.cancelled() => break,
1662            envelope = rx.recv() => {
1663                let Some(envelope) = envelope else { break };
1664                let result = route_command(&controller, envelope.command).await;
1665                let _ = envelope.response_tx.send(result);
1666            }
1667        }
1668    }
1669}
1670
1671// ── Command routing ──────────────────────────────────────────────
1672
1673/// Route a command to the appropriate API call.
1674///
1675/// Uses the Integration API for CRUD operations when available,
1676/// falls back to the Legacy API for session-based commands.
1677#[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
1678async fn route_command(controller: &Controller, cmd: Command) -> Result<CommandResult, CoreError> {
1679    let store = &controller.inner.store;
1680
1681    // Acquire both clients for routing decisions
1682    let integration_guard = controller.inner.integration_client.lock().await;
1683    let legacy_guard = controller.inner.legacy_client.lock().await;
1684    let site_id = *controller.inner.site_id.lock().await;
1685
1686    match cmd {
1687        // ── Device operations ────────────────────────────────────
1688        Command::AdoptDevice {
1689            mac,
1690            ignore_device_limit,
1691        } => {
1692            if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1693                ic.adopt_device(&sid, mac.as_str(), ignore_device_limit)
1694                    .await?;
1695            } else {
1696                let legacy = require_legacy(&legacy_guard)?;
1697                legacy.adopt_device(mac.as_str()).await?;
1698            }
1699            Ok(CommandResult::Ok)
1700        }
1701
1702        Command::RestartDevice { id } => {
1703            if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1704                let device_uuid = require_uuid(&id)?;
1705                ic.device_action(&sid, &device_uuid, "RESTART").await?;
1706            } else {
1707                let legacy = require_legacy(&legacy_guard)?;
1708                let mac = device_mac(store, &id)?;
1709                legacy.restart_device(mac.as_str()).await?;
1710            }
1711            Ok(CommandResult::Ok)
1712        }
1713
1714        Command::LocateDevice { mac, enable } => {
1715            if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1716                let device =
1717                    store
1718                        .device_by_mac(&mac)
1719                        .ok_or_else(|| CoreError::DeviceNotFound {
1720                            identifier: mac.to_string(),
1721                        })?;
1722                let device_uuid = require_uuid(&device.id)?;
1723                let action = if enable { "LOCATE_ON" } else { "LOCATE_OFF" };
1724                ic.device_action(&sid, &device_uuid, action).await?;
1725            } else {
1726                let legacy = require_legacy(&legacy_guard)?;
1727                legacy.locate_device(mac.as_str(), enable).await?;
1728            }
1729            Ok(CommandResult::Ok)
1730        }
1731
1732        Command::UpgradeDevice { mac, firmware_url } => {
1733            let legacy = require_legacy(&legacy_guard)?;
1734            legacy
1735                .upgrade_device(mac.as_str(), firmware_url.as_deref())
1736                .await?;
1737            Ok(CommandResult::Ok)
1738        }
1739
1740        Command::RemoveDevice { id } => {
1741            let (ic, sid) = require_integration(&integration_guard, site_id, "RemoveDevice")?;
1742            let device_uuid = require_uuid(&id)?;
1743            ic.remove_device(&sid, &device_uuid).await?;
1744            Ok(CommandResult::Ok)
1745        }
1746
1747        Command::ProvisionDevice { mac } => {
1748            let legacy = require_legacy(&legacy_guard)?;
1749            legacy.provision_device(mac.as_str()).await?;
1750            Ok(CommandResult::Ok)
1751        }
1752        Command::SpeedtestDevice => {
1753            let legacy = require_legacy(&legacy_guard)?;
1754            legacy.speedtest().await?;
1755            Ok(CommandResult::Ok)
1756        }
1757
1758        Command::PowerCyclePort {
1759            device_id,
1760            port_idx,
1761        } => {
1762            let (ic, sid) = require_integration(&integration_guard, site_id, "PowerCyclePort")?;
1763            let device_uuid = require_uuid(&device_id)?;
1764            ic.port_action(&sid, &device_uuid, port_idx, "POWER_CYCLE")
1765                .await?;
1766            Ok(CommandResult::Ok)
1767        }
1768
1769        // ── Client operations ────────────────────────────────────
1770        Command::BlockClient { mac } => {
1771            if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1772                let client =
1773                    store
1774                        .client_by_mac(&mac)
1775                        .ok_or_else(|| CoreError::ClientNotFound {
1776                            identifier: mac.to_string(),
1777                        })?;
1778                let client_uuid = require_uuid(&client.id)?;
1779                ic.client_action(&sid, &client_uuid, "BLOCK").await?;
1780            } else {
1781                let legacy = require_legacy(&legacy_guard)?;
1782                legacy.block_client(mac.as_str()).await?;
1783            }
1784            Ok(CommandResult::Ok)
1785        }
1786
1787        Command::UnblockClient { mac } => {
1788            if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1789                let client =
1790                    store
1791                        .client_by_mac(&mac)
1792                        .ok_or_else(|| CoreError::ClientNotFound {
1793                            identifier: mac.to_string(),
1794                        })?;
1795                let client_uuid = require_uuid(&client.id)?;
1796                ic.client_action(&sid, &client_uuid, "UNBLOCK").await?;
1797            } else {
1798                let legacy = require_legacy(&legacy_guard)?;
1799                legacy.unblock_client(mac.as_str()).await?;
1800            }
1801            Ok(CommandResult::Ok)
1802        }
1803
1804        Command::KickClient { mac } => {
1805            if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1806                let client =
1807                    store
1808                        .client_by_mac(&mac)
1809                        .ok_or_else(|| CoreError::ClientNotFound {
1810                            identifier: mac.to_string(),
1811                        })?;
1812                let client_uuid = require_uuid(&client.id)?;
1813                ic.client_action(&sid, &client_uuid, "RECONNECT").await?;
1814            } else {
1815                let legacy = require_legacy(&legacy_guard)?;
1816                legacy.kick_client(mac.as_str()).await?;
1817            }
1818            Ok(CommandResult::Ok)
1819        }
1820
1821        Command::ForgetClient { mac } => {
1822            let legacy = require_legacy(&legacy_guard)?;
1823            legacy.forget_client(mac.as_str()).await?;
1824            Ok(CommandResult::Ok)
1825        }
1826
1827        Command::AuthorizeGuest {
1828            client_id,
1829            time_limit_minutes,
1830            data_limit_mb,
1831            rx_rate_kbps,
1832            tx_rate_kbps,
1833        } => {
1834            let legacy = require_legacy(&legacy_guard)?;
1835            let mac = client_mac(store, &client_id)?;
1836            let minutes = time_limit_minutes.unwrap_or(60);
1837            #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1838            {
1839                legacy
1840                    .authorize_guest(
1841                        mac.as_str(),
1842                        minutes,
1843                        tx_rate_kbps.map(|r| r as u32),
1844                        rx_rate_kbps.map(|r| r as u32),
1845                        data_limit_mb.map(|m| m as u32),
1846                    )
1847                    .await?;
1848            }
1849            Ok(CommandResult::Ok)
1850        }
1851
1852        Command::UnauthorizeGuest { client_id } => {
1853            let legacy = require_legacy(&legacy_guard)?;
1854            let mac = client_mac(store, &client_id)?;
1855            legacy.unauthorize_guest(mac.as_str()).await?;
1856            Ok(CommandResult::Ok)
1857        }
1858
1859        // ── Alarm operations ─────────────────────────────────────
1860        Command::ArchiveAlarm { id } => {
1861            let legacy = require_legacy(&legacy_guard)?;
1862            legacy.archive_alarm(&id.to_string()).await?;
1863            Ok(CommandResult::Ok)
1864        }
1865
1866        Command::ArchiveAllAlarms => {
1867            let legacy = require_legacy(&legacy_guard)?;
1868            legacy.archive_all_alarms().await?;
1869            Ok(CommandResult::Ok)
1870        }
1871
1872        // ── Backup operations ────────────────────────────────────
1873        Command::CreateBackup => {
1874            let legacy = require_legacy(&legacy_guard)?;
1875            legacy.create_backup().await?;
1876            Ok(CommandResult::Ok)
1877        }
1878
1879        Command::DeleteBackup { filename } => {
1880            let legacy = require_legacy(&legacy_guard)?;
1881            legacy.delete_backup(&filename).await?;
1882            Ok(CommandResult::Ok)
1883        }
1884
1885        // ── Network CRUD (Integration API) ───────────────────────
1886        Command::CreateNetwork(req) => {
1887            let (ic, sid) = require_integration(&integration_guard, site_id, "CreateNetwork")?;
1888            let crate::command::CreateNetworkRequest {
1889                name,
1890                vlan_id,
1891                subnet,
1892                management,
1893                purpose,
1894                dhcp_enabled,
1895                enabled,
1896                dhcp_range_start,
1897                dhcp_range_stop,
1898                dhcp_lease_time,
1899                firewall_zone_id,
1900                isolation_enabled,
1901                internet_access_enabled,
1902            } = req;
1903
1904            let management = management.unwrap_or_else(|| {
1905                if matches!(purpose, Some(NetworkPurpose::VlanOnly)) {
1906                    NetworkManagement::Unmanaged
1907                } else if purpose.is_some() || subnet.is_some() || dhcp_enabled {
1908                    NetworkManagement::Gateway
1909                } else {
1910                    NetworkManagement::Unmanaged
1911                }
1912            });
1913            let mut extra = HashMap::new();
1914
1915            if let Some(zone) = firewall_zone_id {
1916                extra.insert("zoneId".into(), serde_json::Value::String(zone));
1917            }
1918
1919            if matches!(management, NetworkManagement::Gateway) {
1920                extra.insert(
1921                    "isolationEnabled".into(),
1922                    serde_json::Value::Bool(isolation_enabled),
1923                );
1924                extra.insert(
1925                    "internetAccessEnabled".into(),
1926                    serde_json::Value::Bool(internet_access_enabled),
1927                );
1928
1929                if let Some(cidr) = subnet {
1930                    let (host_ip, prefix_len) = parse_ipv4_cidr(&cidr)?;
1931                    let mut dhcp_cfg = serde_json::Map::new();
1932                    dhcp_cfg.insert(
1933                        "mode".into(),
1934                        serde_json::Value::String(
1935                            if dhcp_enabled { "SERVER" } else { "NONE" }.into(),
1936                        ),
1937                    );
1938                    if let Some(lease) = dhcp_lease_time {
1939                        dhcp_cfg.insert(
1940                            "leaseTimeSeconds".into(),
1941                            serde_json::Value::Number(serde_json::Number::from(u64::from(lease))),
1942                        );
1943                    }
1944
1945                    if let (Some(start), Some(stop)) = (dhcp_range_start, dhcp_range_stop) {
1946                        dhcp_cfg.insert(
1947                            "ipAddressRange".into(),
1948                            serde_json::json!({
1949                                "start": start,
1950                                "end": stop
1951                            }),
1952                        );
1953                    }
1954
1955                    extra.insert(
1956                        "ipv4Configuration".into(),
1957                        serde_json::json!({
1958                            "hostIpAddress": host_ip.to_string(),
1959                            "prefixLength": u64::from(prefix_len),
1960                            "dhcpConfiguration": dhcp_cfg
1961                        }),
1962                    );
1963                }
1964            }
1965
1966            let body = crate::integration_types::NetworkCreateUpdate {
1967                name,
1968                enabled,
1969                management: "USER_DEFINED".into(),
1970                vlan_id: vlan_id.map_or(1, i32::from),
1971                dhcp_guarding: None,
1972                extra,
1973            };
1974            ic.create_network(&sid, &body).await?;
1975            Ok(CommandResult::Ok)
1976        }
1977
1978        Command::UpdateNetwork { id, update } => {
1979            let (ic, sid) = require_integration(&integration_guard, site_id, "UpdateNetwork")?;
1980            let uuid = require_uuid(&id)?;
1981            // Fetch existing to merge partial update
1982            let existing = ic.get_network(&sid, &uuid).await?;
1983            // Start from existing extra fields, then apply toggle overrides
1984            let mut extra = existing.extra;
1985            if let Some(v) = update.isolation_enabled {
1986                extra.insert("isolationEnabled".into(), serde_json::Value::Bool(v));
1987            }
1988            if let Some(v) = update.internet_access_enabled {
1989                extra.insert("internetAccessEnabled".into(), serde_json::Value::Bool(v));
1990            }
1991            if let Some(v) = update.mdns_forwarding_enabled {
1992                extra.insert("mdnsForwardingEnabled".into(), serde_json::Value::Bool(v));
1993            }
1994            if let Some(v) = update.ipv6_enabled {
1995                if v {
1996                    // Enable IPv6 with prefix delegation if not already configured
1997                    extra
1998                        .entry("ipv6Configuration".into())
1999                        .or_insert_with(|| serde_json::json!({ "type": "PREFIX_DELEGATION" }));
2000                } else {
2001                    extra.remove("ipv6Configuration");
2002                }
2003            }
2004            let body = crate::integration_types::NetworkCreateUpdate {
2005                name: update.name.unwrap_or(existing.name),
2006                enabled: update.enabled.unwrap_or(existing.enabled),
2007                management: existing.management,
2008                vlan_id: update.vlan_id.map_or(existing.vlan_id, i32::from),
2009                dhcp_guarding: existing.dhcp_guarding,
2010                extra,
2011            };
2012            ic.update_network(&sid, &uuid, &body).await?;
2013            Ok(CommandResult::Ok)
2014        }
2015
2016        Command::DeleteNetwork { id, force: _ } => {
2017            let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteNetwork")?;
2018            let uuid = require_uuid(&id)?;
2019            ic.delete_network(&sid, &uuid).await?;
2020            Ok(CommandResult::Ok)
2021        }
2022
2023        // ── WiFi Broadcast CRUD ──────────────────────────────────
2024        Command::CreateWifiBroadcast(req) => {
2025            let (ic, sid) =
2026                require_integration(&integration_guard, site_id, "CreateWifiBroadcast")?;
2027            let mut extra = serde_json::Map::new();
2028            extra.insert("ssid".into(), serde_json::Value::String(req.ssid));
2029            let security_mode = match req.security_mode {
2030                crate::model::WifiSecurityMode::Open => "OPEN",
2031                crate::model::WifiSecurityMode::Wpa2Personal => "WPA2_PERSONAL",
2032                crate::model::WifiSecurityMode::Wpa3Personal => "WPA3_PERSONAL",
2033                crate::model::WifiSecurityMode::Wpa2Wpa3Personal => "WPA2_WPA3_PERSONAL",
2034                crate::model::WifiSecurityMode::Wpa2Enterprise => "WPA2_ENTERPRISE",
2035                crate::model::WifiSecurityMode::Wpa3Enterprise => "WPA3_ENTERPRISE",
2036                crate::model::WifiSecurityMode::Wpa2Wpa3Enterprise => "WPA2_WPA3_ENTERPRISE",
2037            };
2038            let mut security_configuration = serde_json::Map::new();
2039            security_configuration.insert(
2040                "mode".into(),
2041                serde_json::Value::String(security_mode.into()),
2042            );
2043            if let Some(pass) = req.passphrase {
2044                security_configuration.insert("passphrase".into(), serde_json::Value::String(pass));
2045            }
2046            extra.insert(
2047                "securityConfiguration".into(),
2048                serde_json::Value::Object(security_configuration),
2049            );
2050            if let Some(network_id) = req.network_id {
2051                extra.insert(
2052                    "network".into(),
2053                    serde_json::json!({ "id": network_id.to_string() }),
2054                );
2055            }
2056            extra.insert("hideSsid".into(), serde_json::Value::Bool(req.hide_ssid));
2057            if req.band_steering {
2058                extra.insert("bandSteering".into(), serde_json::Value::Bool(true));
2059            }
2060            if req.fast_roaming {
2061                extra.insert("fastRoaming".into(), serde_json::Value::Bool(true));
2062            }
2063            if let Some(freqs) = req.frequencies_ghz {
2064                let values = freqs
2065                    .into_iter()
2066                    .map(|f| serde_json::Value::from(f64::from(f)))
2067                    .collect::<Vec<_>>();
2068                extra.insert("frequencies".into(), serde_json::Value::Array(values));
2069            }
2070            let body = crate::integration_types::WifiBroadcastCreateUpdate {
2071                name: req.name,
2072                broadcast_type: req.broadcast_type.unwrap_or_else(|| "STANDARD".into()),
2073                enabled: req.enabled,
2074                body: extra,
2075            };
2076            ic.create_wifi_broadcast(&sid, &body).await?;
2077            Ok(CommandResult::Ok)
2078        }
2079
2080        Command::UpdateWifiBroadcast { id, update } => {
2081            let (ic, sid) =
2082                require_integration(&integration_guard, site_id, "UpdateWifiBroadcast")?;
2083            let uuid = require_uuid(&id)?;
2084            let existing = ic.get_wifi_broadcast(&sid, &uuid).await?;
2085
2086            let mut body = serde_json::Map::new();
2087            for (k, v) in existing.extra {
2088                body.insert(k, v);
2089            }
2090            body.insert(
2091                "securityConfiguration".into(),
2092                existing.security_configuration.clone(),
2093            );
2094            if let Some(network) = existing.network.clone() {
2095                body.insert("network".into(), network);
2096            }
2097            if let Some(filter) = existing.broadcasting_device_filter.clone() {
2098                body.insert("broadcastingDeviceFilter".into(), filter);
2099            }
2100
2101            if let Some(ssid) = update.ssid.clone() {
2102                body.insert("ssid".into(), serde_json::Value::String(ssid));
2103            }
2104            if let Some(hidden) = update.hide_ssid {
2105                body.insert("hideSsid".into(), serde_json::Value::Bool(hidden));
2106            }
2107
2108            let mut security_cfg = existing
2109                .security_configuration
2110                .as_object()
2111                .cloned()
2112                .unwrap_or_default();
2113            if let Some(mode) = update.security_mode {
2114                let mode = match mode {
2115                    crate::model::WifiSecurityMode::Open => "OPEN",
2116                    crate::model::WifiSecurityMode::Wpa2Personal => "WPA2_PERSONAL",
2117                    crate::model::WifiSecurityMode::Wpa3Personal => "WPA3_PERSONAL",
2118                    crate::model::WifiSecurityMode::Wpa2Wpa3Personal => "WPA2_WPA3_PERSONAL",
2119                    crate::model::WifiSecurityMode::Wpa2Enterprise => "WPA2_ENTERPRISE",
2120                    crate::model::WifiSecurityMode::Wpa3Enterprise => "WPA3_ENTERPRISE",
2121                    crate::model::WifiSecurityMode::Wpa2Wpa3Enterprise => "WPA2_WPA3_ENTERPRISE",
2122                };
2123                security_cfg.insert("mode".into(), serde_json::Value::String(mode.into()));
2124            }
2125            if let Some(passphrase) = update.passphrase.clone() {
2126                security_cfg.insert("passphrase".into(), serde_json::Value::String(passphrase));
2127            }
2128            body.insert(
2129                "securityConfiguration".into(),
2130                serde_json::Value::Object(security_cfg),
2131            );
2132
2133            let payload = crate::integration_types::WifiBroadcastCreateUpdate {
2134                name: update.name.unwrap_or(existing.name),
2135                broadcast_type: existing.broadcast_type,
2136                enabled: update.enabled.unwrap_or(existing.enabled),
2137                body,
2138            };
2139            ic.update_wifi_broadcast(&sid, &uuid, &payload).await?;
2140            Ok(CommandResult::Ok)
2141        }
2142
2143        Command::DeleteWifiBroadcast { id, force: _ } => {
2144            let (ic, sid) =
2145                require_integration(&integration_guard, site_id, "DeleteWifiBroadcast")?;
2146            let uuid = require_uuid(&id)?;
2147            ic.delete_wifi_broadcast(&sid, &uuid).await?;
2148            Ok(CommandResult::Ok)
2149        }
2150
2151        // ── Firewall Policy CRUD ─────────────────────────────────
2152        Command::CreateFirewallPolicy(req) => {
2153            let (ic, sid) =
2154                require_integration(&integration_guard, site_id, "CreateFirewallPolicy")?;
2155            let action_str = match req.action {
2156                FirewallAction::Allow => "ALLOW",
2157                FirewallAction::Block => "DROP",
2158                FirewallAction::Reject => "REJECT",
2159            };
2160            let body = crate::integration_types::FirewallPolicyCreateUpdate {
2161                name: req.name,
2162                description: req.description,
2163                enabled: req.enabled,
2164                action: serde_json::json!({ "type": action_str }),
2165                source: serde_json::json!({ "zoneId": req.source_zone_id.to_string() }),
2166                destination: serde_json::json!({ "zoneId": req.destination_zone_id.to_string() }),
2167                ip_protocol_scope: serde_json::json!("ALL"),
2168                logging_enabled: req.logging_enabled,
2169                ipsec_filter: None,
2170                schedule: None,
2171                connection_state_filter: None,
2172            };
2173            ic.create_firewall_policy(&sid, &body).await?;
2174            Ok(CommandResult::Ok)
2175        }
2176
2177        Command::UpdateFirewallPolicy { id, update } => {
2178            let (ic, sid) =
2179                require_integration(&integration_guard, site_id, "UpdateFirewallPolicy")?;
2180            let uuid = require_uuid(&id)?;
2181            let existing = ic.get_firewall_policy(&sid, &uuid).await?;
2182
2183            let mut source = existing
2184                .extra
2185                .get("source")
2186                .cloned()
2187                .unwrap_or_else(|| serde_json::json!({}));
2188            if let Some(addr) = update.source_address.clone() {
2189                if let Some(obj) = source.as_object_mut() {
2190                    obj.insert("address".into(), serde_json::Value::String(addr));
2191                }
2192            }
2193
2194            let mut destination = existing
2195                .extra
2196                .get("destination")
2197                .cloned()
2198                .unwrap_or_else(|| serde_json::json!({}));
2199            if let Some(addr) = update.destination_address.clone() {
2200                if let Some(obj) = destination.as_object_mut() {
2201                    obj.insert("address".into(), serde_json::Value::String(addr));
2202                }
2203            }
2204            if let Some(port) = update.destination_port.clone() {
2205                if let Some(obj) = destination.as_object_mut() {
2206                    obj.insert("port".into(), serde_json::Value::String(port));
2207                }
2208            }
2209
2210            let action = if let Some(action) = update.action {
2211                let action_type = match action {
2212                    FirewallAction::Allow => "ALLOW",
2213                    FirewallAction::Block => "DROP",
2214                    FirewallAction::Reject => "REJECT",
2215                };
2216                serde_json::json!({ "type": action_type })
2217            } else {
2218                existing.action
2219            };
2220
2221            let ip_protocol_scope = if let Some(protocol) = update.protocol.clone() {
2222                serde_json::json!({ "protocol": protocol })
2223            } else {
2224                existing
2225                    .ip_protocol_scope
2226                    .unwrap_or_else(|| serde_json::json!("ALL"))
2227            };
2228
2229            let connection_state_filter = existing
2230                .extra
2231                .get("connectionStateFilter")
2232                .and_then(serde_json::Value::as_array)
2233                .map(|arr| {
2234                    arr.iter()
2235                        .filter_map(|v| v.as_str().map(ToOwned::to_owned))
2236                        .collect::<Vec<_>>()
2237                });
2238
2239            let payload = crate::integration_types::FirewallPolicyCreateUpdate {
2240                name: update.name.unwrap_or(existing.name),
2241                description: update.description.or(existing.description),
2242                enabled: update.enabled.unwrap_or(existing.enabled),
2243                action,
2244                source,
2245                destination,
2246                ip_protocol_scope,
2247                logging_enabled: existing.logging_enabled,
2248                ipsec_filter: existing
2249                    .extra
2250                    .get("ipsecFilter")
2251                    .and_then(serde_json::Value::as_str)
2252                    .map(ToOwned::to_owned),
2253                schedule: existing.extra.get("schedule").cloned(),
2254                connection_state_filter,
2255            };
2256
2257            ic.update_firewall_policy(&sid, &uuid, &payload).await?;
2258            Ok(CommandResult::Ok)
2259        }
2260
2261        Command::DeleteFirewallPolicy { id } => {
2262            let (ic, sid) =
2263                require_integration(&integration_guard, site_id, "DeleteFirewallPolicy")?;
2264            let uuid = require_uuid(&id)?;
2265            ic.delete_firewall_policy(&sid, &uuid).await?;
2266            Ok(CommandResult::Ok)
2267        }
2268
2269        Command::PatchFirewallPolicy { id, enabled } => {
2270            let (ic, sid) =
2271                require_integration(&integration_guard, site_id, "PatchFirewallPolicy")?;
2272            let uuid = require_uuid(&id)?;
2273            let body = crate::integration_types::FirewallPolicyPatch {
2274                enabled: Some(enabled),
2275                logging_enabled: None,
2276            };
2277            ic.patch_firewall_policy(&sid, &uuid, &body).await?;
2278            Ok(CommandResult::Ok)
2279        }
2280
2281        Command::ReorderFirewallPolicies {
2282            zone_pair: _,
2283            ordered_ids,
2284        } => {
2285            let (ic, sid) =
2286                require_integration(&integration_guard, site_id, "ReorderFirewallPolicies")?;
2287            let uuids: Result<Vec<uuid::Uuid>, _> = ordered_ids.iter().map(require_uuid).collect();
2288            let body = crate::integration_types::FirewallPolicyOrdering {
2289                before_system_defined: uuids?,
2290                after_system_defined: Vec::new(),
2291            };
2292            ic.set_firewall_policy_ordering(&sid, &body).await?;
2293            Ok(CommandResult::Ok)
2294        }
2295
2296        // ── Firewall Zone CRUD ───────────────────────────────────
2297        Command::CreateFirewallZone(req) => {
2298            let (ic, sid) = require_integration(&integration_guard, site_id, "CreateFirewallZone")?;
2299            let network_uuids: Result<Vec<uuid::Uuid>, _> =
2300                req.network_ids.iter().map(require_uuid).collect();
2301            let body = crate::integration_types::FirewallZoneCreateUpdate {
2302                name: req.name,
2303                network_ids: network_uuids?,
2304            };
2305            ic.create_firewall_zone(&sid, &body).await?;
2306            Ok(CommandResult::Ok)
2307        }
2308
2309        Command::UpdateFirewallZone { id, update } => {
2310            let (ic, sid) = require_integration(&integration_guard, site_id, "UpdateFirewallZone")?;
2311            let uuid = require_uuid(&id)?;
2312            let existing = ic.get_firewall_zone(&sid, &uuid).await?;
2313            let network_ids = if let Some(ids) = update.network_ids {
2314                let uuids: Result<Vec<uuid::Uuid>, _> = ids.iter().map(require_uuid).collect();
2315                uuids?
2316            } else {
2317                existing.network_ids
2318            };
2319            let body = crate::integration_types::FirewallZoneCreateUpdate {
2320                name: update.name.unwrap_or(existing.name),
2321                network_ids,
2322            };
2323            ic.update_firewall_zone(&sid, &uuid, &body).await?;
2324            Ok(CommandResult::Ok)
2325        }
2326
2327        Command::DeleteFirewallZone { id } => {
2328            let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteFirewallZone")?;
2329            let uuid = require_uuid(&id)?;
2330            ic.delete_firewall_zone(&sid, &uuid).await?;
2331            Ok(CommandResult::Ok)
2332        }
2333
2334        // ── ACL Rule CRUD ────────────────────────────────────────
2335        Command::CreateAclRule(req) => {
2336            let (ic, sid) = require_integration(&integration_guard, site_id, "CreateAclRule")?;
2337            let action_str = match req.action {
2338                FirewallAction::Allow => "ALLOW",
2339                FirewallAction::Block => "BLOCK",
2340                FirewallAction::Reject => "REJECT",
2341            };
2342            let mut source_filter = serde_json::Map::new();
2343            source_filter.insert(
2344                "zoneId".into(),
2345                serde_json::Value::String(req.source_zone_id.to_string()),
2346            );
2347            if let Some(source_port) = req.source_port {
2348                source_filter.insert("port".into(), serde_json::Value::String(source_port));
2349            }
2350            if let Some(protocol) = req.protocol.clone() {
2351                source_filter.insert("protocol".into(), serde_json::Value::String(protocol));
2352            }
2353
2354            let mut destination_filter = serde_json::Map::new();
2355            destination_filter.insert(
2356                "zoneId".into(),
2357                serde_json::Value::String(req.destination_zone_id.to_string()),
2358            );
2359            if let Some(destination_port) = req.destination_port {
2360                destination_filter
2361                    .insert("port".into(), serde_json::Value::String(destination_port));
2362            }
2363            if let Some(protocol) = req.protocol {
2364                destination_filter.insert("protocol".into(), serde_json::Value::String(protocol));
2365            }
2366            let body = crate::integration_types::AclRuleCreateUpdate {
2367                name: req.name,
2368                rule_type: req.rule_type,
2369                action: action_str.into(),
2370                enabled: req.enabled,
2371                description: None,
2372                source_filter: Some(serde_json::Value::Object(source_filter)),
2373                destination_filter: Some(serde_json::Value::Object(destination_filter)),
2374                enforcing_device_filter: None,
2375            };
2376            ic.create_acl_rule(&sid, &body).await?;
2377            Ok(CommandResult::Ok)
2378        }
2379
2380        Command::UpdateAclRule { id, update } => {
2381            let (ic, sid) = require_integration(&integration_guard, site_id, "UpdateAclRule")?;
2382            let uuid = require_uuid(&id)?;
2383            let existing = ic.get_acl_rule(&sid, &uuid).await?;
2384            let action_str = match update.action {
2385                Some(FirewallAction::Allow) => "ALLOW".into(),
2386                Some(FirewallAction::Block) => "BLOCK".into(),
2387                Some(FirewallAction::Reject) => "REJECT".into(),
2388                None => existing.action,
2389            };
2390            let body = crate::integration_types::AclRuleCreateUpdate {
2391                name: update.name.unwrap_or(existing.name),
2392                rule_type: existing.rule_type,
2393                action: action_str,
2394                enabled: update.enabled.unwrap_or(existing.enabled),
2395                description: existing.description,
2396                source_filter: existing.source_filter,
2397                destination_filter: existing.destination_filter,
2398                enforcing_device_filter: existing.enforcing_device_filter,
2399            };
2400            ic.update_acl_rule(&sid, &uuid, &body).await?;
2401            Ok(CommandResult::Ok)
2402        }
2403
2404        Command::DeleteAclRule { id } => {
2405            let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteAclRule")?;
2406            let uuid = require_uuid(&id)?;
2407            ic.delete_acl_rule(&sid, &uuid).await?;
2408            Ok(CommandResult::Ok)
2409        }
2410
2411        Command::ReorderAclRules { ordered_ids } => {
2412            let (ic, sid) = require_integration(&integration_guard, site_id, "ReorderAclRules")?;
2413            let uuids: Result<Vec<uuid::Uuid>, _> = ordered_ids.iter().map(require_uuid).collect();
2414            let body = crate::integration_types::AclRuleOrdering {
2415                ordered_acl_rule_ids: uuids?,
2416            };
2417            ic.set_acl_rule_ordering(&sid, &body).await?;
2418            Ok(CommandResult::Ok)
2419        }
2420
2421        // ── DNS Policy CRUD ──────────────────────────────────────
2422        Command::CreateDnsPolicy(req) => {
2423            let (ic, sid) = require_integration(&integration_guard, site_id, "CreateDnsPolicy")?;
2424            let policy_type_str = match req.policy_type {
2425                crate::model::DnsPolicyType::ARecord => "A",
2426                crate::model::DnsPolicyType::AaaaRecord => "AAAA",
2427                crate::model::DnsPolicyType::CnameRecord => "CNAME",
2428                crate::model::DnsPolicyType::MxRecord => "MX",
2429                crate::model::DnsPolicyType::TxtRecord => "TXT",
2430                crate::model::DnsPolicyType::SrvRecord => "SRV",
2431                crate::model::DnsPolicyType::ForwardDomain => "FORWARD_DOMAIN",
2432            };
2433            let mut fields = serde_json::Map::new();
2434            if let Some(domains) = req.domains {
2435                if let Some(first) = domains.first() {
2436                    fields.insert("domain".into(), serde_json::Value::String(first.clone()));
2437                }
2438                fields.insert(
2439                    "domains".into(),
2440                    serde_json::Value::Array(
2441                        domains.into_iter().map(serde_json::Value::String).collect(),
2442                    ),
2443                );
2444            }
2445            if let Some(upstream) = req.upstream {
2446                fields.insert("upstream".into(), serde_json::Value::String(upstream));
2447            }
2448            if let Some(value) = req.value {
2449                fields.insert("value".into(), serde_json::Value::String(value));
2450            }
2451            if let Some(ttl) = req.ttl_seconds {
2452                fields.insert(
2453                    "ttl".into(),
2454                    serde_json::Value::Number(serde_json::Number::from(ttl)),
2455                );
2456            }
2457            if let Some(priority) = req.priority {
2458                fields.insert(
2459                    "priority".into(),
2460                    serde_json::Value::Number(serde_json::Number::from(priority)),
2461                );
2462            }
2463            fields.insert("name".into(), serde_json::Value::String(req.name));
2464            let body = crate::integration_types::DnsPolicyCreateUpdate {
2465                policy_type: policy_type_str.into(),
2466                enabled: req.enabled,
2467                fields,
2468            };
2469            ic.create_dns_policy(&sid, &body).await?;
2470            Ok(CommandResult::Ok)
2471        }
2472
2473        Command::UpdateDnsPolicy { id, update } => {
2474            let (ic, sid) = require_integration(&integration_guard, site_id, "UpdateDnsPolicy")?;
2475            let uuid = require_uuid(&id)?;
2476            let existing = ic.get_dns_policy(&sid, &uuid).await?;
2477            let mut fields: serde_json::Map<String, serde_json::Value> =
2478                existing.extra.into_iter().collect();
2479
2480            if let Some(domains) = update.domains {
2481                if let Some(first) = domains.first() {
2482                    fields.insert("domain".into(), serde_json::Value::String(first.clone()));
2483                }
2484                fields.insert(
2485                    "domains".into(),
2486                    serde_json::Value::Array(
2487                        domains.into_iter().map(serde_json::Value::String).collect(),
2488                    ),
2489                );
2490            } else if let Some(domain) = existing.domain {
2491                fields
2492                    .entry("domain")
2493                    .or_insert_with(|| serde_json::Value::String(domain));
2494            }
2495
2496            if let Some(name) = update.name {
2497                fields.insert("name".into(), serde_json::Value::String(name));
2498            }
2499            if let Some(upstream) = update.upstream {
2500                fields.insert("upstream".into(), serde_json::Value::String(upstream));
2501            }
2502            if let Some(value) = update.value {
2503                fields.insert("value".into(), serde_json::Value::String(value));
2504            }
2505            if let Some(ttl) = update.ttl_seconds {
2506                fields.insert(
2507                    "ttl".into(),
2508                    serde_json::Value::Number(serde_json::Number::from(ttl)),
2509                );
2510            }
2511            if let Some(priority) = update.priority {
2512                fields.insert(
2513                    "priority".into(),
2514                    serde_json::Value::Number(serde_json::Number::from(priority)),
2515                );
2516            }
2517
2518            let body = crate::integration_types::DnsPolicyCreateUpdate {
2519                policy_type: existing.policy_type,
2520                enabled: update.enabled.unwrap_or(existing.enabled),
2521                fields,
2522            };
2523            ic.update_dns_policy(&sid, &uuid, &body).await?;
2524            Ok(CommandResult::Ok)
2525        }
2526
2527        Command::DeleteDnsPolicy { id } => {
2528            let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteDnsPolicy")?;
2529            let uuid = require_uuid(&id)?;
2530            ic.delete_dns_policy(&sid, &uuid).await?;
2531            Ok(CommandResult::Ok)
2532        }
2533
2534        // ── Traffic Matching List CRUD ───────────────────────────
2535        Command::CreateTrafficMatchingList(req) => {
2536            let (ic, sid) =
2537                require_integration(&integration_guard, site_id, "CreateTrafficMatchingList")?;
2538            let mut fields = serde_json::Map::new();
2539            fields.insert(
2540                "entries".into(),
2541                serde_json::Value::Array(
2542                    req.entries
2543                        .into_iter()
2544                        .map(serde_json::Value::String)
2545                        .collect(),
2546                ),
2547            );
2548            if let Some(desc) = req.description {
2549                fields.insert("description".into(), serde_json::Value::String(desc));
2550            }
2551            let body = crate::integration_types::TrafficMatchingListCreateUpdate {
2552                name: req.name,
2553                list_type: req.list_type,
2554                fields,
2555            };
2556            ic.create_traffic_matching_list(&sid, &body).await?;
2557            Ok(CommandResult::Ok)
2558        }
2559
2560        Command::UpdateTrafficMatchingList { id, update } => {
2561            let (ic, sid) =
2562                require_integration(&integration_guard, site_id, "UpdateTrafficMatchingList")?;
2563            let uuid = require_uuid(&id)?;
2564            let existing = ic.get_traffic_matching_list(&sid, &uuid).await?;
2565            let mut fields = serde_json::Map::new();
2566            let entries = if let Some(new_entries) = update.entries {
2567                serde_json::Value::Array(
2568                    new_entries
2569                        .into_iter()
2570                        .map(serde_json::Value::String)
2571                        .collect(),
2572                )
2573            } else if let Some(existing_entries) = existing.extra.get("entries") {
2574                existing_entries.clone()
2575            } else {
2576                serde_json::Value::Array(Vec::new())
2577            };
2578            fields.insert("entries".into(), entries);
2579            if let Some(desc) = update.description {
2580                fields.insert("description".into(), serde_json::Value::String(desc));
2581            } else if let Some(existing_desc) = existing.extra.get("description") {
2582                fields.insert("description".into(), existing_desc.clone());
2583            }
2584            let body = crate::integration_types::TrafficMatchingListCreateUpdate {
2585                name: update.name.unwrap_or(existing.name),
2586                list_type: existing.list_type,
2587                fields,
2588            };
2589            ic.update_traffic_matching_list(&sid, &uuid, &body).await?;
2590            Ok(CommandResult::Ok)
2591        }
2592
2593        Command::DeleteTrafficMatchingList { id } => {
2594            let (ic, sid) =
2595                require_integration(&integration_guard, site_id, "DeleteTrafficMatchingList")?;
2596            let uuid = require_uuid(&id)?;
2597            ic.delete_traffic_matching_list(&sid, &uuid).await?;
2598            Ok(CommandResult::Ok)
2599        }
2600
2601        // ── Voucher management ───────────────────────────────────
2602        Command::CreateVouchers(req) => {
2603            let (ic, sid) = require_integration(&integration_guard, site_id, "CreateVouchers")?;
2604            #[allow(clippy::as_conversions, clippy::cast_possible_wrap)]
2605            let body = crate::integration_types::VoucherCreateRequest {
2606                name: req.name.unwrap_or_else(|| "Voucher".into()),
2607                count: Some(req.count as i32),
2608                time_limit_minutes: i64::from(req.time_limit_minutes.unwrap_or(60)),
2609                authorized_guest_limit: req.authorized_guest_limit.map(i64::from),
2610                data_usage_limit_m_bytes: req.data_usage_limit_mb.map(|m| m as i64),
2611                rx_rate_limit_kbps: req.rx_rate_limit_kbps.map(|r| r as i64),
2612                tx_rate_limit_kbps: req.tx_rate_limit_kbps.map(|r| r as i64),
2613            };
2614            let vouchers = ic.create_vouchers(&sid, &body).await?;
2615            let domain_vouchers: Vec<Voucher> = vouchers.into_iter().map(Voucher::from).collect();
2616            Ok(CommandResult::Vouchers(domain_vouchers))
2617        }
2618
2619        Command::DeleteVoucher { id } => {
2620            let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteVoucher")?;
2621            let uuid = require_uuid(&id)?;
2622            ic.delete_voucher(&sid, &uuid).await?;
2623            Ok(CommandResult::Ok)
2624        }
2625
2626        Command::PurgeVouchers { filter } => {
2627            let (ic, sid) = require_integration(&integration_guard, site_id, "PurgeVouchers")?;
2628            ic.purge_vouchers(&sid, &filter).await?;
2629            Ok(CommandResult::Ok)
2630        }
2631
2632        // ── System administration ────────────────────────────────
2633        Command::CreateSite { name, description } => {
2634            let legacy = require_legacy(&legacy_guard)?;
2635            legacy.create_site(&name, &description).await?;
2636            Ok(CommandResult::Ok)
2637        }
2638        Command::DeleteSite { name } => {
2639            let legacy = require_legacy(&legacy_guard)?;
2640            legacy.delete_site(&name).await?;
2641            Ok(CommandResult::Ok)
2642        }
2643        Command::InviteAdmin { name, email, role } => {
2644            let legacy = require_legacy(&legacy_guard)?;
2645            legacy.invite_admin(&name, &email, &role).await?;
2646            Ok(CommandResult::Ok)
2647        }
2648        Command::RevokeAdmin { id } => {
2649            let legacy = require_legacy(&legacy_guard)?;
2650            legacy.revoke_admin(&id.to_string()).await?;
2651            Ok(CommandResult::Ok)
2652        }
2653        Command::UpdateAdmin { id, role } => {
2654            let legacy = require_legacy(&legacy_guard)?;
2655            legacy
2656                .update_admin(&id.to_string(), role.as_deref())
2657                .await?;
2658            Ok(CommandResult::Ok)
2659        }
2660
2661        Command::RebootController => {
2662            let legacy = require_legacy(&legacy_guard)?;
2663            legacy.reboot_controller().await?;
2664            Ok(CommandResult::Ok)
2665        }
2666        Command::PoweroffController => {
2667            let legacy = require_legacy(&legacy_guard)?;
2668            legacy.poweroff_controller().await?;
2669            Ok(CommandResult::Ok)
2670        }
2671    }
2672}
2673
2674// ── Helpers ──────────────────────────────────────────────────────
2675
/// Parse an IPv6 address out of text, ignoring surrounding whitespace and
/// anything from the first `/` onward (such as a prefix length).
///
/// Returns `None` when the remaining text is not a valid IPv6 address.
fn parse_ipv6_text(raw: &str) -> Option<Ipv6Addr> {
    let trimmed = raw.trim();
    // Keep only what precedes the first '/', so "addr/64" is read as "addr".
    let address = match trimmed.split_once('/') {
        Some((before_slash, _)) => before_slash,
        None => trimmed,
    };
    address.trim().parse().ok()
}
2680
2681fn pick_ipv6_from_value(value: &serde_json::Value) -> Option<String> {
2682    let mut first_link_local: Option<String> = None;
2683
2684    let iter: Box<dyn Iterator<Item = &serde_json::Value> + '_> = match value {
2685        serde_json::Value::Array(items) => Box::new(items.iter()),
2686        _ => Box::new(std::iter::once(value)),
2687    };
2688
2689    for item in iter {
2690        if let Some(ipv6) = item.as_str().and_then(parse_ipv6_text) {
2691            let ip_text = ipv6.to_string();
2692            if !ipv6.is_unicast_link_local() {
2693                return Some(ip_text);
2694            }
2695            if first_link_local.is_none() {
2696                first_link_local = Some(ip_text);
2697            }
2698        }
2699    }
2700
2701    first_link_local
2702}
2703
2704fn parse_legacy_device_wan_ipv6(
2705    extra: &serde_json::Map<String, serde_json::Value>,
2706) -> Option<String> {
2707    // Primary source on gateways: wan1.ipv6 = ["global", "link-local"].
2708    if let Some(v) = extra
2709        .get("wan1")
2710        .and_then(|wan| wan.get("ipv6"))
2711        .and_then(pick_ipv6_from_value)
2712    {
2713        return Some(v);
2714    }
2715
2716    // Fallback source on some firmware: top-level ipv6 array.
2717    extra.get("ipv6").and_then(pick_ipv6_from_value)
2718}
2719
2720/// Convert raw health JSON values into domain `HealthSummary` types.
2721fn convert_health_summaries(raw: Vec<serde_json::Value>) -> Vec<HealthSummary> {
2722    raw.into_iter()
2723        .map(|v| HealthSummary {
2724            subsystem: v
2725                .get("subsystem")
2726                .and_then(|v| v.as_str())
2727                .unwrap_or("unknown")
2728                .to_owned(),
2729            status: v
2730                .get("status")
2731                .and_then(|v| v.as_str())
2732                .unwrap_or("unknown")
2733                .to_owned(),
2734            #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
2735            num_adopted: v
2736                .get("num_adopted")
2737                .and_then(serde_json::Value::as_u64)
2738                .map(|n| n as u32),
2739            #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
2740            num_sta: v
2741                .get("num_sta")
2742                .and_then(serde_json::Value::as_u64)
2743                .map(|n| n as u32),
2744            tx_bytes_r: v.get("tx_bytes-r").and_then(serde_json::Value::as_u64),
2745            rx_bytes_r: v.get("rx_bytes-r").and_then(serde_json::Value::as_u64),
2746            latency: v.get("latency").and_then(serde_json::Value::as_f64),
2747            wan_ip: v.get("wan_ip").and_then(|v| v.as_str()).map(String::from),
2748            gateways: v.get("gateways").and_then(|v| v.as_array()).map(|a| {
2749                a.iter()
2750                    .filter_map(|g| g.as_str().map(String::from))
2751                    .collect()
2752            }),
2753            extra: v,
2754        })
2755        .collect()
2756}
2757
2758/// Build a [`TransportConfig`] from the controller configuration.
2759fn build_transport(config: &ControllerConfig) -> TransportConfig {
2760    TransportConfig {
2761        tls: tls_to_transport(&config.tls),
2762        timeout: config.timeout,
2763        cookie_jar: None, // LegacyClient::new adds one automatically
2764    }
2765}
2766
2767fn tls_to_transport(tls: &TlsVerification) -> TlsMode {
2768    match tls {
2769        TlsVerification::SystemDefaults => TlsMode::System,
2770        TlsVerification::CustomCa(path) => TlsMode::CustomCa(path.clone()),
2771        TlsVerification::DangerAcceptInvalid => TlsMode::DangerAcceptInvalid,
2772    }
2773}
2774
2775/// Downgrade a paginated result to an empty `Vec` when the endpoint returns 404.
2776///
2777/// Some Integration API endpoints (ACL rules, DNS policies, vouchers) are not
2778/// available on all controller firmware versions. Rather than failing the entire
2779/// refresh, we log a debug message and return an empty collection.
2780fn unwrap_or_empty<S, D>(endpoint: &str, result: Result<Vec<S>, crate::error::Error>) -> Vec<D>
2781where
2782    D: From<S>,
2783{
2784    match result {
2785        Ok(items) => items.into_iter().map(D::from).collect(),
2786        Err(ref e) if e.is_not_found() => {
2787            debug!("{endpoint}: not available (404), treating as empty");
2788            Vec::new()
2789        }
2790        Err(e) => {
2791            warn!("{endpoint}: unexpected error {e}, treating as empty");
2792            Vec::new()
2793        }
2794    }
2795}
2796
2797/// Resolve the Integration API site UUID from a site name or UUID string.
2798///
2799/// If `site_name` is already a valid UUID, returns it directly.
2800/// Otherwise lists all sites and finds the one matching by `internal_reference`.
2801async fn resolve_site_id(
2802    client: &IntegrationClient,
2803    site_name: &str,
2804) -> Result<uuid::Uuid, CoreError> {
2805    // Fast path: if the input is already a UUID, use it directly.
2806    if let Ok(uuid) = uuid::Uuid::parse_str(site_name) {
2807        return Ok(uuid);
2808    }
2809
2810    let sites = client
2811        .paginate_all(50, |off, lim| client.list_sites(off, lim))
2812        .await?;
2813
2814    sites
2815        .into_iter()
2816        .find(|s| s.internal_reference == site_name)
2817        .map(|s| s.id)
2818        .ok_or_else(|| CoreError::SiteNotFound {
2819            name: site_name.to_owned(),
2820        })
2821}
2822
2823/// Try to set up a Legacy client (best-effort for API key auth).
2824async fn setup_legacy_client(
2825    config: &ControllerConfig,
2826    transport: &TransportConfig,
2827) -> Result<LegacyClient, CoreError> {
2828    let platform = LegacyClient::detect_platform(&config.url).await?;
2829    let client = LegacyClient::new(config.url.clone(), config.site.clone(), platform, transport)?;
2830    Ok(client)
2831}
2832
2833fn parse_ipv4_cidr(cidr: &str) -> Result<(Ipv4Addr, u8), CoreError> {
2834    let (host, prefix) = cidr
2835        .split_once('/')
2836        .ok_or_else(|| CoreError::ValidationFailed {
2837            message: format!("invalid ipv4 host/prefix value '{cidr}'"),
2838        })?;
2839    let host_ip = host
2840        .parse::<Ipv4Addr>()
2841        .map_err(|_| CoreError::ValidationFailed {
2842            message: format!("invalid IPv4 host address '{host}'"),
2843        })?;
2844    let prefix_len = prefix
2845        .parse::<u8>()
2846        .map_err(|_| CoreError::ValidationFailed {
2847            message: format!("invalid IPv4 prefix length '{prefix}'"),
2848        })?;
2849    if prefix_len > 32 {
2850        return Err(CoreError::ValidationFailed {
2851            message: format!("IPv4 prefix length must be <= 32, got {prefix_len}"),
2852        });
2853    }
2854    Ok((host_ip, prefix_len))
2855}
2856
2857/// Extract a `Uuid` from an `EntityId`, or return an error.
2858fn require_uuid(id: &EntityId) -> Result<uuid::Uuid, CoreError> {
2859    id.as_uuid().copied().ok_or_else(|| CoreError::Unsupported {
2860        operation: "Integration API operation on legacy ID".into(),
2861        required: "UUID-based entity ID".into(),
2862    })
2863}
2864
2865fn require_legacy<'a>(
2866    guard: &'a tokio::sync::MutexGuard<'_, Option<LegacyClient>>,
2867) -> Result<&'a LegacyClient, CoreError> {
2868    guard.as_ref().ok_or(CoreError::ControllerDisconnected)
2869}
2870
2871fn require_integration<'a>(
2872    guard: &'a tokio::sync::MutexGuard<'_, Option<IntegrationClient>>,
2873    site_id: Option<uuid::Uuid>,
2874    operation: &str,
2875) -> Result<(&'a IntegrationClient, uuid::Uuid), CoreError> {
2876    let client = guard.as_ref().ok_or_else(|| unsupported(operation))?;
2877    let sid = site_id.ok_or_else(|| unsupported(operation))?;
2878    Ok((client, sid))
2879}
2880
2881fn unsupported(operation: &str) -> CoreError {
2882    CoreError::Unsupported {
2883        operation: operation.into(),
2884        required: "Integration API".into(),
2885    }
2886}
2887
2888/// Resolve an [`EntityId`] to a device MAC via the DataStore.
2889fn device_mac(store: &DataStore, id: &EntityId) -> Result<MacAddress, CoreError> {
2890    store
2891        .device_by_id(id)
2892        .map(|d| d.mac.clone())
2893        .ok_or_else(|| CoreError::DeviceNotFound {
2894            identifier: id.to_string(),
2895        })
2896}
2897
2898/// Resolve an [`EntityId`] to a client MAC via the DataStore.
2899fn client_mac(store: &DataStore, id: &EntityId) -> Result<MacAddress, CoreError> {
2900    store
2901        .client_by_id(id)
2902        .map(|c| c.mac.clone())
2903        .ok_or_else(|| CoreError::ClientNotFound {
2904            identifier: id.to_string(),
2905        })
2906}
2907
#[cfg(test)]
mod tests {
    use super::parse_ipv4_cidr;

    /// A well-formed `host/prefix` string yields both components.
    #[test]
    fn parse_ipv4_cidr_accepts_valid_input() {
        let parsed = parse_ipv4_cidr("192.168.10.1/24").expect("valid CIDR");
        assert_eq!(parsed.0.to_string(), "192.168.10.1");
        assert_eq!(parsed.1, 24);
    }

    /// Prefix lengths above 32 are rejected.
    #[test]
    fn parse_ipv4_cidr_rejects_invalid_prefix() {
        let outcome = parse_ipv4_cidr("192.168.10.1/40");
        assert!(outcome.is_err());
    }

    /// Input without a `/` separator is rejected.
    #[test]
    fn parse_ipv4_cidr_rejects_missing_prefix() {
        let outcome = parse_ipv4_cidr("192.168.10.1");
        assert!(outcome.is_err());
    }
}