Skip to main content

unifly_api/
controller.rs

1// ── Controller abstraction ──
2//
3// Full lifecycle management for a UniFi controller connection.
4// Handles authentication, background refresh, command routing,
5// and reactive data streaming through the DataStore.
6
7use std::collections::HashMap;
8use std::net::{Ipv4Addr, Ipv6Addr};
9use std::sync::Arc;
10use std::time::Duration;
11
12use tokio::sync::{Mutex, broadcast, mpsc, watch};
13use tokio::task::JoinHandle;
14use tokio_util::sync::CancellationToken;
15use tracing::{debug, info, warn};
16
17use crate::command::{Command, CommandEnvelope, CommandResult};
18use crate::config::{AuthCredentials, ControllerConfig, TlsVerification};
19use crate::core_error::CoreError;
20use crate::model::{
21    AclRule, Admin, Alarm, Client, Country, Device, DnsPolicy, DpiApplication, DpiCategory,
22    EntityId, Event, FirewallAction, FirewallPolicy, FirewallZone, HealthSummary, MacAddress,
23    Network, NetworkManagement, NetworkPurpose, RadiusProfile, Site, SysInfo, SystemInfo,
24    TrafficMatchingList, Voucher, VpnServer, VpnTunnel, WanInterface, WifiBroadcast,
25};
26use crate::store::DataStore;
27use crate::stream::EntityStream;
28
29use crate::transport::{TlsMode, TransportConfig};
30use crate::websocket::{ReconnectConfig, WebSocketHandle};
31use crate::{IntegrationClient, LegacyClient};
32
33const COMMAND_CHANNEL_SIZE: usize = 64;
34const EVENT_CHANNEL_SIZE: usize = 256;
35
36// ── ConnectionState ──────────────────────────────────────────────
37
38/// Connection state observable by consumers.
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum ConnectionState {
41    Disconnected,
42    Connecting,
43    Connected,
44    Reconnecting { attempt: u32 },
45    Failed,
46}
47
48// ── Controller ───────────────────────────────────────────────────
49
50/// The main entry point for consumers.
51///
52/// Cheaply cloneable via `Arc<ControllerInner>`. Manages the full
53/// connection lifecycle: authentication, background data refresh,
54/// command routing, and reactive entity streaming.
55#[derive(Clone)]
56pub struct Controller {
57    inner: Arc<ControllerInner>,
58}
59
60struct ControllerInner {
61    config: ControllerConfig,
62    store: Arc<DataStore>,
63    connection_state: watch::Sender<ConnectionState>,
64    event_tx: broadcast::Sender<Arc<Event>>,
65    command_tx: Mutex<mpsc::Sender<CommandEnvelope>>,
66    command_rx: Mutex<Option<mpsc::Receiver<CommandEnvelope>>>,
67    cancel: CancellationToken,
68    /// Child token for the current connection — cancelled on disconnect,
69    /// replaced on reconnect (avoids permanent cancellation).
70    cancel_child: Mutex<CancellationToken>,
71    legacy_client: Mutex<Option<LegacyClient>>,
72    integration_client: Mutex<Option<IntegrationClient>>,
73    /// Resolved Integration API site UUID (populated on connect).
74    site_id: Mutex<Option<uuid::Uuid>>,
75    /// WebSocket event stream handle (populated on connect if enabled).
76    ws_handle: Mutex<Option<WebSocketHandle>>,
77    task_handles: Mutex<Vec<JoinHandle<()>>>,
78    /// Warnings accumulated during connect (e.g. Legacy auth failure in Hybrid mode).
79    warnings: Mutex<Vec<String>>,
80}
81
82impl Controller {
83    /// Create a new Controller from configuration. Does NOT connect --
84    /// call [`connect()`](Self::connect) to authenticate and start background tasks.
85    pub fn new(config: ControllerConfig) -> Self {
86        let store = Arc::new(DataStore::new());
87        let (connection_state, _) = watch::channel(ConnectionState::Disconnected);
88        let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_SIZE);
89        let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
90        let cancel = CancellationToken::new();
91        let cancel_child = cancel.child_token();
92
93        Self {
94            inner: Arc::new(ControllerInner {
95                config,
96                store,
97                connection_state,
98                event_tx,
99                command_tx: Mutex::new(command_tx),
100                command_rx: Mutex::new(Some(command_rx)),
101                cancel,
102                cancel_child: Mutex::new(cancel_child),
103                legacy_client: Mutex::new(None),
104                integration_client: Mutex::new(None),
105                warnings: Mutex::new(Vec::new()),
106                site_id: Mutex::new(None),
107                ws_handle: Mutex::new(None),
108                task_handles: Mutex::new(Vec::new()),
109            }),
110        }
111    }
112
113    /// Access the controller configuration.
114    pub fn config(&self) -> &ControllerConfig {
115        &self.inner.config
116    }
117
118    /// Access the underlying DataStore.
119    pub fn store(&self) -> &Arc<DataStore> {
120        &self.inner.store
121    }
122
123    // ── Connection lifecycle ─────────────────────────────────────
124
125    /// Connect to the controller.
126    ///
127    /// Detects the platform, authenticates, performs an initial data
128    /// refresh, and spawns background tasks (periodic refresh, command
129    /// processor).
130    #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
131    pub async fn connect(&self) -> Result<(), CoreError> {
132        let _ = self
133            .inner
134            .connection_state
135            .send(ConnectionState::Connecting);
136
137        // Fresh child token for this connection (supports reconnect).
138        let child = self.inner.cancel.child_token();
139        *self.inner.cancel_child.lock().await = child.clone();
140
141        let config = &self.inner.config;
142        let transport = build_transport(config);
143
144        match &config.auth {
145            AuthCredentials::ApiKey(api_key) => {
146                // Detect platform so we use the right URL prefix
147                let platform = LegacyClient::detect_platform(&config.url).await?;
148                debug!(?platform, "detected controller platform");
149
150                // Integration API client (preferred)
151                let integration = IntegrationClient::from_api_key(
152                    config.url.as_str(),
153                    api_key,
154                    &transport,
155                    platform,
156                )?;
157
158                // Resolve site UUID from Integration API
159                let site_id = resolve_site_id(&integration, &config.site).await?;
160                debug!(site_id = %site_id, "resolved Integration API site UUID");
161
162                *self.inner.integration_client.lock().await = Some(integration);
163                *self.inner.site_id.lock().await = Some(site_id);
164            }
165            AuthCredentials::Credentials { username, password } => {
166                // Legacy-only auth
167                let platform = LegacyClient::detect_platform(&config.url).await?;
168                debug!(?platform, "detected controller platform");
169
170                let client = LegacyClient::new(
171                    config.url.clone(),
172                    config.site.clone(),
173                    platform,
174                    &transport,
175                )?;
176                client.login(username, password).await?;
177                debug!("session authentication successful");
178
179                *self.inner.legacy_client.lock().await = Some(client);
180            }
181            AuthCredentials::Hybrid {
182                api_key,
183                username,
184                password,
185            } => {
186                // Hybrid: both Integration API (API key) and Legacy API (session auth)
187                let platform = LegacyClient::detect_platform(&config.url).await?;
188                debug!(?platform, "detected controller platform (hybrid)");
189
190                // Integration API client
191                let integration = IntegrationClient::from_api_key(
192                    config.url.as_str(),
193                    api_key,
194                    &transport,
195                    platform,
196                )?;
197
198                let site_id = resolve_site_id(&integration, &config.site).await?;
199                debug!(site_id = %site_id, "resolved Integration API site UUID");
200
201                *self.inner.integration_client.lock().await = Some(integration);
202                *self.inner.site_id.lock().await = Some(site_id);
203
204                // Legacy API client — attempt login but degrade gracefully
205                // if it fails. The Integration API is the primary surface;
206                // Legacy adds events, stats, and admin ops.
207                match LegacyClient::new(
208                    config.url.clone(),
209                    config.site.clone(),
210                    platform,
211                    &transport,
212                ) {
213                    Ok(client) => match client.login(username, password).await {
214                        Ok(()) => {
215                            debug!("legacy session authentication successful (hybrid)");
216                            *self.inner.legacy_client.lock().await = Some(client);
217                        }
218                        Err(e) => {
219                            let msg = format!(
220                                "Legacy login failed: {e} — events, health stats, and client traffic will be unavailable"
221                            );
222                            warn!("{msg}");
223                            self.inner.warnings.lock().await.push(msg);
224                        }
225                    },
226                    Err(e) => {
227                        let msg = format!("Legacy client setup failed: {e}");
228                        warn!("{msg}");
229                        self.inner.warnings.lock().await.push(msg);
230                    }
231                }
232            }
233            AuthCredentials::Cloud { api_key, host_id } => {
234                let integration = IntegrationClient::from_api_key(
235                    config.url.as_str(),
236                    api_key,
237                    &transport,
238                    crate::ControllerPlatform::Cloud,
239                )?;
240
241                let site_id = if let Ok(uuid) = uuid::Uuid::parse_str(&config.site) {
242                    uuid
243                } else if let Ok(uuid) = uuid::Uuid::parse_str(host_id) {
244                    uuid
245                } else {
246                    resolve_site_id(&integration, &config.site).await?
247                };
248                debug!(site_id = %site_id, "resolved cloud Integration API site UUID");
249
250                *self.inner.integration_client.lock().await = Some(integration);
251                *self.inner.site_id.lock().await = Some(site_id);
252
253                let msg =
254                    "Cloud auth mode active: Legacy API and WebSocket features are unavailable"
255                        .to_string();
256                self.inner.warnings.lock().await.push(msg);
257            }
258        }
259
260        // Initial data load
261        self.full_refresh().await?;
262
263        // Spawn background tasks
264        let mut handles = self.inner.task_handles.lock().await;
265
266        if let Some(rx) = self.inner.command_rx.lock().await.take() {
267            let ctrl = self.clone();
268            handles.push(tokio::spawn(command_processor_task(ctrl, rx)));
269        }
270
271        let interval_secs = config.refresh_interval_secs;
272        if interval_secs > 0 {
273            let ctrl = self.clone();
274            let cancel = child.clone();
275            handles.push(tokio::spawn(refresh_task(ctrl, interval_secs, cancel)));
276        }
277
278        // WebSocket event stream
279        if config.websocket_enabled {
280            self.spawn_websocket(&child, &mut handles).await;
281        }
282
283        let _ = self.inner.connection_state.send(ConnectionState::Connected);
284        info!("connected to controller");
285        Ok(())
286    }
287
288    /// Spawn the WebSocket event stream and a bridge task that converts
289    /// raw [`UnifiEvent`]s into domain [`Event`]s and broadcasts them.
290    ///
291    /// Non-fatal on failure — the TUI falls back to polling.
292    async fn spawn_websocket(&self, cancel: &CancellationToken, handles: &mut Vec<JoinHandle<()>>) {
293        let legacy_guard = self.inner.legacy_client.lock().await;
294        let Some(ref legacy) = *legacy_guard else {
295            debug!("no legacy client — WebSocket unavailable");
296            return;
297        };
298
299        let platform = legacy.platform();
300        let Some(ws_path_template) = platform.websocket_path() else {
301            debug!("platform does not support WebSocket");
302            return;
303        };
304
305        let ws_path = ws_path_template.replace("{site}", &self.inner.config.site);
306        let base_url = &self.inner.config.url;
307        let scheme = if base_url.scheme() == "https" {
308            "wss"
309        } else {
310            "ws"
311        };
312        let host = base_url.host_str().unwrap_or("localhost");
313        let ws_url_str = match base_url.port() {
314            Some(p) => format!("{scheme}://{host}:{p}{ws_path}"),
315            None => format!("{scheme}://{host}{ws_path}"),
316        };
317        let ws_url = match url::Url::parse(&ws_url_str) {
318            Ok(u) => u,
319            Err(e) => {
320                warn!(error = %e, url = %ws_url_str, "invalid WebSocket URL");
321                return;
322            }
323        };
324
325        let cookie = legacy.cookie_header();
326        drop(legacy_guard);
327
328        if cookie.is_none() {
329            warn!("no session cookie — WebSocket requires legacy auth (skipping)");
330            return;
331        }
332
333        let ws_tls = tls_to_transport(&self.inner.config.tls);
334        let ws_cancel = cancel.child_token();
335        let handle = match WebSocketHandle::connect(
336            ws_url,
337            ReconnectConfig::default(),
338            ws_cancel.clone(),
339            cookie,
340            ws_tls,
341        ) {
342            Ok(h) => h,
343            Err(e) => {
344                warn!(error = %e, "WebSocket connection failed (non-fatal)");
345                return;
346            }
347        };
348
349        // Bridge task: WS events → domain Events → broadcast channel.
350        // Also extracts real-time device stats from `device:sync` messages
351        // to feed the dashboard chart without waiting for full_refresh().
352        let mut ws_rx = handle.subscribe();
353        let event_tx = self.inner.event_tx.clone();
354        let store = Arc::clone(&self.inner.store);
355        let bridge_cancel = ws_cancel;
356
357        handles.push(tokio::spawn(async move {
358            loop {
359                tokio::select! {
360                    biased;
361                    () = bridge_cancel.cancelled() => break,
362                    result = ws_rx.recv() => {
363                        match result {
364                            Ok(ws_event) => {
365                                // Extract real-time stats from device:sync messages
366                                if ws_event.key == "device:sync" || ws_event.key == "device:update" {
367                                    apply_device_sync(&store, &ws_event.extra);
368                                }
369
370                                let event = crate::model::event::Event::from(
371                                    (*ws_event).clone(),
372                                );
373                                let _ = event_tx.send(Arc::new(event));
374                            }
375                            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
376                                warn!(skipped = n, "WS bridge: receiver lagged");
377                            }
378                            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
379                        }
380                    }
381                }
382            }
383        }));
384
385        *self.inner.ws_handle.lock().await = Some(handle);
386        info!("WebSocket event stream spawned (handshake in progress)");
387    }
388
389    /// Disconnect from the controller.
390    ///
391    /// Cancels background tasks, logs out if session-based, and resets
392    /// the connection state to [`Disconnected`](ConnectionState::Disconnected).
393    pub async fn disconnect(&self) {
394        // Cancel the child token (not the parent — allows reconnect).
395        self.inner.cancel_child.lock().await.cancel();
396
397        // Join all background tasks
398        let mut handles = self.inner.task_handles.lock().await;
399        for handle in handles.drain(..) {
400            let _ = handle.await;
401        }
402
403        // Logout if session-based (Credentials or Hybrid both have active sessions)
404        if matches!(
405            self.inner.config.auth,
406            AuthCredentials::Credentials { .. } | AuthCredentials::Hybrid { .. }
407        ) && let Some(ref client) = *self.inner.legacy_client.lock().await
408            && let Err(e) = client.logout().await
409        {
410            warn!(error = %e, "logout failed (non-fatal)");
411        }
412
413        // Shut down WebSocket if active
414        if let Some(handle) = self.inner.ws_handle.lock().await.take() {
415            handle.shutdown();
416        }
417
418        *self.inner.legacy_client.lock().await = None;
419        *self.inner.integration_client.lock().await = None;
420        *self.inner.site_id.lock().await = None;
421
422        // Recreate command channel so reconnects can spawn a fresh receiver.
423        // The previous receiver is consumed by the command processor task.
424        {
425            let (tx, rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
426            *self.inner.command_tx.lock().await = tx;
427            *self.inner.command_rx.lock().await = Some(rx);
428        }
429
430        let _ = self
431            .inner
432            .connection_state
433            .send(ConnectionState::Disconnected);
434        debug!("disconnected");
435    }
436
437    /// Fetch all data from the controller and update the DataStore.
438    ///
439    /// Pulls devices, clients, and events from the Legacy API, converts
440    /// them to domain types, and applies them to the store. Events are
441    /// broadcast through the event channel (not stored).
442    #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
443    pub async fn full_refresh(&self) -> Result<(), CoreError> {
444        let integration_guard = self.inner.integration_client.lock().await;
445        let site_id = *self.inner.site_id.lock().await;
446
447        if let (Some(integration), Some(sid)) = (integration_guard.as_ref(), site_id) {
448            // ── Integration API path (preferred) ─────────────────
449            let page_limit = 200;
450
451            let (devices_res, clients_res, networks_res, wifi_res) = tokio::join!(
452                integration.paginate_all(page_limit, |off, lim| {
453                    integration.list_devices(&sid, off, lim)
454                }),
455                integration.paginate_all(page_limit, |off, lim| {
456                    integration.list_clients(&sid, off, lim)
457                }),
458                integration.paginate_all(page_limit, |off, lim| {
459                    integration.list_networks(&sid, off, lim)
460                }),
461                integration.paginate_all(page_limit, |off, lim| {
462                    integration.list_wifi_broadcasts(&sid, off, lim)
463                }),
464            );
465
466            let (policies_res, zones_res, acls_res, dns_res, vouchers_res) = tokio::join!(
467                integration.paginate_all(page_limit, |off, lim| {
468                    integration.list_firewall_policies(&sid, off, lim)
469                }),
470                integration.paginate_all(page_limit, |off, lim| {
471                    integration.list_firewall_zones(&sid, off, lim)
472                }),
473                integration.paginate_all(page_limit, |off, lim| {
474                    integration.list_acl_rules(&sid, off, lim)
475                }),
476                integration.paginate_all(page_limit, |off, lim| {
477                    integration.list_dns_policies(&sid, off, lim)
478                }),
479                integration.paginate_all(page_limit, |off, lim| {
480                    integration.list_vouchers(&sid, off, lim)
481                }),
482            );
483
484            let (sites_res, tml_res) = tokio::join!(
485                integration.paginate_all(50, |off, lim| { integration.list_sites(off, lim) }),
486                integration.paginate_all(page_limit, |off, lim| {
487                    integration.list_traffic_matching_lists(&sid, off, lim)
488                }),
489            );
490
491            // Core endpoints — failure is fatal
492            let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
493            let mut clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
494            // Fetch full details for each network (list endpoint omits ipv4/ipv6 config)
495            let network_ids: Vec<uuid::Uuid> = networks_res?.into_iter().map(|n| n.id).collect();
496            info!(
497                network_count = network_ids.len(),
498                "fetching network details"
499            );
500            let networks: Vec<Network> = {
501                let futs = network_ids.into_iter().map(|nid| async move {
502                    match integration.get_network(&sid, &nid).await {
503                        Ok(detail) => Some(Network::from(detail)),
504                        Err(e) => {
505                            warn!(network_id = %nid, error = %e, "network detail fetch failed");
506                            None
507                        }
508                    }
509                });
510                futures_util::future::join_all(futs)
511                    .await
512                    .into_iter()
513                    .flatten()
514                    .collect()
515            };
516            let wifi: Vec<WifiBroadcast> = wifi_res?.into_iter().map(WifiBroadcast::from).collect();
517            let policies: Vec<FirewallPolicy> = policies_res?
518                .into_iter()
519                .map(FirewallPolicy::from)
520                .collect();
521            let zones: Vec<FirewallZone> = zones_res?.into_iter().map(FirewallZone::from).collect();
522            let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
523            let traffic_matching_lists: Vec<TrafficMatchingList> = tml_res?
524                .into_iter()
525                .map(TrafficMatchingList::from)
526                .collect();
527
528            // Optional endpoints — 404 means the controller doesn't support them
529            let acls: Vec<AclRule> = unwrap_or_empty("acl/rules", acls_res);
530            let dns: Vec<DnsPolicy> = unwrap_or_empty("dns/policies", dns_res);
531            let vouchers: Vec<Voucher> = unwrap_or_empty("vouchers", vouchers_res);
532
533            // Enrich devices with per-device statistics (parallel, non-fatal)
534            info!(
535                device_count = devices.len(),
536                "enriching devices with statistics"
537            );
538            let mut devices = {
539                let futs = devices.into_iter().map(|mut device| async {
540                    if let EntityId::Uuid(device_uuid) = &device.id {
541                        match integration.get_device_statistics(&sid, device_uuid).await {
542                            Ok(stats_resp) => {
543                                device.stats =
544                                    crate::convert::device_stats_from_integration(&stats_resp);
545                            }
546                            Err(e) => {
547                                warn!(
548                                    device = ?device.name,
549                                    error = %e,
550                                    "device stats fetch failed"
551                                );
552                            }
553                        }
554                    }
555                    device
556                });
557                futures_util::future::join_all(futs).await
558            };
559
560            drop(integration_guard);
561
562            // Supplement with Legacy API data (events, health, client traffic, device stats, DHCP reservations)
563            #[allow(clippy::type_complexity)]
564            let (legacy_events, legacy_health, legacy_clients, legacy_devices, legacy_users): (
565                Vec<Event>,
566                Vec<HealthSummary>,
567                Vec<crate::legacy::models::LegacyClientEntry>,
568                Vec<crate::legacy::models::LegacyDevice>,
569                Vec<crate::legacy::models::LegacyUserEntry>,
570            ) = match *self.inner.legacy_client.lock().await {
571                Some(ref legacy) => {
572                    let (events_res, health_res, clients_res, devices_res, users_res) =
573                        tokio::join!(
574                            legacy.list_events(Some(100)),
575                            legacy.get_health(),
576                            legacy.list_clients(),
577                            legacy.list_devices(),
578                            legacy.list_users(),
579                        );
580
581                    let events = match events_res {
582                        Ok(raw) => {
583                            let evts: Vec<Event> = raw.into_iter().map(Event::from).collect();
584                            for evt in &evts {
585                                let _ = self.inner.event_tx.send(Arc::new(evt.clone()));
586                            }
587                            evts
588                        }
589                        Err(e) => {
590                            warn!(error = %e, "legacy event fetch failed (non-fatal)");
591                            Vec::new()
592                        }
593                    };
594
595                    let health = match health_res {
596                        Ok(raw) => convert_health_summaries(raw),
597                        Err(e) => {
598                            warn!(error = %e, "legacy health fetch failed (non-fatal)");
599                            Vec::new()
600                        }
601                    };
602
603                    let lc = match clients_res {
604                        Ok(raw) => raw,
605                        Err(e) => {
606                            warn!(
607                                error = %e,
608                                "legacy client fetch failed (non-fatal)"
609                            );
610                            Vec::new()
611                        }
612                    };
613
614                    let ld = match devices_res {
615                        Ok(raw) => raw,
616                        Err(e) => {
617                            warn!(error = %e, "legacy device fetch failed (non-fatal)");
618                            Vec::new()
619                        }
620                    };
621
622                    let lu = match users_res {
623                        Ok(raw) => raw,
624                        Err(e) => {
625                            warn!(error = %e, "legacy user fetch failed (non-fatal)");
626                            Vec::new()
627                        }
628                    };
629
630                    (events, health, lc, ld, lu)
631                }
632                None => (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()),
633            };
634
635            // Merge Legacy client traffic (tx/rx bytes, hostname) into Integration clients.
636            // Match by IP address — Integration API clients often lack real MAC addresses
637            // in the access object, falling back to UUIDs which don't match Legacy MACs.
638            if !legacy_clients.is_empty() {
639                let legacy_by_ip: HashMap<&str, &crate::legacy::models::LegacyClientEntry> =
640                    legacy_clients
641                        .iter()
642                        .filter_map(|lc| lc.ip.as_deref().map(|ip| (ip, lc)))
643                        .collect();
644                let mut merged = 0u32;
645                for client in &mut clients {
646                    let ip_key = client.ip.map(|ip| ip.to_string());
647                    if let Some(lc) = ip_key.as_deref().and_then(|ip| legacy_by_ip.get(ip)) {
648                        if client.tx_bytes.is_none() {
649                            client.tx_bytes = lc.tx_bytes.and_then(|b| u64::try_from(b).ok());
650                        }
651                        if client.rx_bytes.is_none() {
652                            client.rx_bytes = lc.rx_bytes.and_then(|b| u64::try_from(b).ok());
653                        }
654                        if client.hostname.is_none() {
655                            client.hostname.clone_from(&lc.hostname);
656                        }
657                        // Merge wireless info (Legacy has AP MAC, signal, channel)
658                        if client.wireless.is_none() {
659                            let legacy_client: Client = Client::from((*lc).clone());
660                            client.wireless = legacy_client.wireless;
661                            if client.uplink_device_mac.is_none() {
662                                client.uplink_device_mac = legacy_client.uplink_device_mac;
663                            }
664                        }
665                        merged += 1;
666                    }
667                }
668                debug!(
669                    total_clients = clients.len(),
670                    legacy_available = legacy_by_ip.len(),
671                    merged,
672                    "client traffic merge (by IP)"
673                );
674            }
675
676            // Merge Legacy user DHCP reservations (fixed IP) into clients by MAC.
677            if !legacy_users.is_empty() {
678                let users_by_mac: HashMap<String, &crate::legacy::models::LegacyUserEntry> =
679                    legacy_users
680                        .iter()
681                        .map(|u| (u.mac.to_lowercase(), u))
682                        .collect();
683                for client in &mut clients {
684                    if let Some(user) = users_by_mac.get(&client.mac.as_str().to_lowercase()) {
685                        client.use_fixedip = user.use_fixedip.unwrap_or(false);
686                        client.fixed_ip = user.fixed_ip.as_deref().and_then(|s| s.parse().ok());
687                    }
688                }
689            }
690
691            // Merge Legacy device num_sta (client counts) into Integration devices
692            if !legacy_devices.is_empty() {
693                let legacy_by_mac: HashMap<&str, &crate::legacy::models::LegacyDevice> =
694                    legacy_devices.iter().map(|d| (d.mac.as_str(), d)).collect();
695                for device in &mut devices {
696                    if let Some(ld) = legacy_by_mac.get(device.mac.as_str()) {
697                        if device.client_count.is_none() {
698                            device.client_count = ld.num_sta.and_then(|n| n.try_into().ok());
699                        }
700                        if device.wan_ipv6.is_none() {
701                            device.wan_ipv6 = parse_legacy_device_wan_ipv6(&ld.extra);
702                        }
703                    }
704                }
705            }
706
707            // Push health to DataStore
708            if !legacy_health.is_empty() {
709                self.inner
710                    .store
711                    .site_health
712                    .send_modify(|h| *h = Arc::new(legacy_health));
713            }
714
715            self.inner
716                .store
717                .apply_integration_snapshot(crate::store::RefreshSnapshot {
718                    devices,
719                    clients,
720                    networks,
721                    wifi,
722                    policies,
723                    zones,
724                    acls,
725                    dns,
726                    vouchers,
727                    sites,
728                    events: legacy_events,
729                    traffic_matching_lists,
730                });
731        } else {
732            // ── Legacy-only path ─────────────────────────────────
733            drop(integration_guard);
734
735            let legacy_guard = self.inner.legacy_client.lock().await;
736            let legacy = legacy_guard
737                .as_ref()
738                .ok_or(CoreError::ControllerDisconnected)?;
739
740            let (devices_res, clients_res, events_res, sites_res) = tokio::join!(
741                legacy.list_devices(),
742                legacy.list_clients(),
743                legacy.list_events(Some(100)),
744                legacy.list_sites(),
745            );
746
747            let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
748            let clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
749            let events: Vec<Event> = events_res?.into_iter().map(Event::from).collect();
750            let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
751
752            drop(legacy_guard);
753
754            for event in &events {
755                let _ = self.inner.event_tx.send(Arc::new(event.clone()));
756            }
757
758            self.inner
759                .store
760                .apply_integration_snapshot(crate::store::RefreshSnapshot {
761                    devices,
762                    clients,
763                    networks: Vec::new(),
764                    wifi: Vec::new(),
765                    policies: Vec::new(),
766                    zones: Vec::new(),
767                    acls: Vec::new(),
768                    dns: Vec::new(),
769                    vouchers: Vec::new(),
770                    sites,
771                    events,
772                    traffic_matching_lists: Vec::new(),
773                });
774        }
775
776        debug!(
777            devices = self.inner.store.device_count(),
778            clients = self.inner.store.client_count(),
779            "data refresh complete"
780        );
781
782        Ok(())
783    }
784
785    // ── Command execution ────────────────────────────────────────
786
787    /// Execute a command against the controller.
788    ///
789    /// Sends the command through the internal channel to the command
790    /// processor task and awaits the result.
791    pub async fn execute(&self, cmd: Command) -> Result<CommandResult, CoreError> {
792        let (tx, rx) = tokio::sync::oneshot::channel();
793
794        let command_tx = self.inner.command_tx.lock().await.clone();
795
796        command_tx
797            .send(CommandEnvelope {
798                command: cmd,
799                response_tx: tx,
800            })
801            .await
802            .map_err(|_| CoreError::ControllerDisconnected)?;
803
804        rx.await.map_err(|_| CoreError::ControllerDisconnected)?
805    }
806
807    // ── One-shot convenience ─────────────────────────────────────
808
809    /// One-shot: connect, run closure, disconnect.
810    ///
811    /// Optimized for CLI: disables WebSocket and periodic refresh since
812    /// we only need a single request-response cycle.
813    pub async fn oneshot<F, Fut, T>(config: ControllerConfig, f: F) -> Result<T, CoreError>
814    where
815        F: FnOnce(Controller) -> Fut,
816        Fut: std::future::Future<Output = Result<T, CoreError>>,
817    {
818        let mut cfg = config;
819        cfg.websocket_enabled = false;
820        cfg.refresh_interval_secs = 0;
821
822        let controller = Controller::new(cfg);
823        controller.connect().await?;
824        let result = f(controller.clone()).await;
825        controller.disconnect().await;
826        result
827    }
828
829    // ── State observation ────────────────────────────────────────
830
831    /// Subscribe to connection state changes.
832    pub fn connection_state(&self) -> watch::Receiver<ConnectionState> {
833        self.inner.connection_state.subscribe()
834    }
835
836    /// Subscribe to the event broadcast stream.
837    pub fn events(&self) -> broadcast::Receiver<Arc<Event>> {
838        self.inner.event_tx.subscribe()
839    }
840
841    // ── Snapshot accessors (delegate to DataStore) ───────────────
842
843    pub fn devices_snapshot(&self) -> Arc<Vec<Arc<Device>>> {
844        self.inner.store.devices_snapshot()
845    }
846
847    pub fn clients_snapshot(&self) -> Arc<Vec<Arc<Client>>> {
848        self.inner.store.clients_snapshot()
849    }
850
851    pub fn networks_snapshot(&self) -> Arc<Vec<Arc<Network>>> {
852        self.inner.store.networks_snapshot()
853    }
854
855    pub fn wifi_broadcasts_snapshot(&self) -> Arc<Vec<Arc<WifiBroadcast>>> {
856        self.inner.store.wifi_broadcasts_snapshot()
857    }
858
859    pub fn firewall_policies_snapshot(&self) -> Arc<Vec<Arc<FirewallPolicy>>> {
860        self.inner.store.firewall_policies_snapshot()
861    }
862
863    pub fn firewall_zones_snapshot(&self) -> Arc<Vec<Arc<FirewallZone>>> {
864        self.inner.store.firewall_zones_snapshot()
865    }
866
867    pub fn acl_rules_snapshot(&self) -> Arc<Vec<Arc<AclRule>>> {
868        self.inner.store.acl_rules_snapshot()
869    }
870
871    pub fn dns_policies_snapshot(&self) -> Arc<Vec<Arc<DnsPolicy>>> {
872        self.inner.store.dns_policies_snapshot()
873    }
874
875    pub fn vouchers_snapshot(&self) -> Arc<Vec<Arc<Voucher>>> {
876        self.inner.store.vouchers_snapshot()
877    }
878
879    pub fn sites_snapshot(&self) -> Arc<Vec<Arc<Site>>> {
880        self.inner.store.sites_snapshot()
881    }
882
883    pub fn events_snapshot(&self) -> Arc<Vec<Arc<Event>>> {
884        self.inner.store.events_snapshot()
885    }
886
887    pub fn traffic_matching_lists_snapshot(&self) -> Arc<Vec<Arc<TrafficMatchingList>>> {
888        self.inner.store.traffic_matching_lists_snapshot()
889    }
890
891    // ── Stream accessors (delegate to DataStore) ─────────────────
892
893    pub fn devices(&self) -> EntityStream<Device> {
894        self.inner.store.subscribe_devices()
895    }
896
897    pub fn clients(&self) -> EntityStream<Client> {
898        self.inner.store.subscribe_clients()
899    }
900
901    pub fn networks(&self) -> EntityStream<Network> {
902        self.inner.store.subscribe_networks()
903    }
904
905    pub fn wifi_broadcasts(&self) -> EntityStream<WifiBroadcast> {
906        self.inner.store.subscribe_wifi_broadcasts()
907    }
908
909    pub fn firewall_policies(&self) -> EntityStream<FirewallPolicy> {
910        self.inner.store.subscribe_firewall_policies()
911    }
912
913    pub fn firewall_zones(&self) -> EntityStream<FirewallZone> {
914        self.inner.store.subscribe_firewall_zones()
915    }
916
917    pub fn acl_rules(&self) -> EntityStream<AclRule> {
918        self.inner.store.subscribe_acl_rules()
919    }
920
921    pub fn dns_policies(&self) -> EntityStream<DnsPolicy> {
922        self.inner.store.subscribe_dns_policies()
923    }
924
925    pub fn vouchers(&self) -> EntityStream<Voucher> {
926        self.inner.store.subscribe_vouchers()
927    }
928
929    pub fn sites(&self) -> EntityStream<Site> {
930        self.inner.store.subscribe_sites()
931    }
932
933    pub fn traffic_matching_lists(&self) -> EntityStream<TrafficMatchingList> {
934        self.inner.store.subscribe_traffic_matching_lists()
935    }
936
937    /// Subscribe to site health updates (WAN IP, latency, bandwidth rates).
938    pub fn site_health(&self) -> watch::Receiver<Arc<Vec<HealthSummary>>> {
939        self.inner.store.subscribe_site_health()
940    }
941
942    /// Drain warnings accumulated during connect (e.g. Legacy auth failure).
943    pub async fn take_warnings(&self) -> Vec<String> {
944        std::mem::take(&mut *self.inner.warnings.lock().await)
945    }
946
947    /// Whether a logged-in Legacy client is available for legacy-only features.
948    pub async fn has_legacy_access(&self) -> bool {
949        self.inner.legacy_client.lock().await.is_some()
950    }
951
952    /// Whether the Integration API is available for integration-backed features.
953    pub async fn has_integration_access(&self) -> bool {
954        self.inner.integration_client.lock().await.is_some()
955            && self.inner.site_id.lock().await.is_some()
956    }
957
958    // ── Ad-hoc Integration API queries ───────────────────────────
959    //
960    // These bypass the DataStore and query the Integration API directly.
961    // Intended for reference data that doesn't need reactive subscriptions.
962
963    /// Fetch VPN servers from the Integration API.
964    pub async fn list_vpn_servers(&self) -> Result<Vec<VpnServer>, CoreError> {
965        let guard = self.inner.integration_client.lock().await;
966        let site_id = *self.inner.site_id.lock().await;
967        let (ic, sid) = require_integration(&guard, site_id, "list_vpn_servers")?;
968        let raw = ic
969            .paginate_all(200, |off, lim| ic.list_vpn_servers(&sid, off, lim))
970            .await?;
971        Ok(raw
972            .into_iter()
973            .map(|s| {
974                let id = s
975                    .fields
976                    .get("id")
977                    .and_then(|v| v.as_str())
978                    .and_then(|s| uuid::Uuid::parse_str(s).ok())
979                    .map_or_else(|| EntityId::Legacy("unknown".into()), EntityId::Uuid);
980                VpnServer {
981                    id,
982                    name: s
983                        .fields
984                        .get("name")
985                        .and_then(|v| v.as_str())
986                        .map(String::from),
987                    server_type: s
988                        .fields
989                        .get("type")
990                        .or_else(|| s.fields.get("serverType"))
991                        .and_then(|v| v.as_str())
992                        .unwrap_or("UNKNOWN")
993                        .to_owned(),
994                    enabled: s.fields.get("enabled").and_then(serde_json::Value::as_bool),
995                }
996            })
997            .collect())
998    }
999
1000    /// Fetch VPN tunnels from the Integration API.
1001    pub async fn list_vpn_tunnels(&self) -> Result<Vec<VpnTunnel>, CoreError> {
1002        let guard = self.inner.integration_client.lock().await;
1003        let site_id = *self.inner.site_id.lock().await;
1004        let (ic, sid) = require_integration(&guard, site_id, "list_vpn_tunnels")?;
1005        let raw = ic
1006            .paginate_all(200, |off, lim| ic.list_vpn_tunnels(&sid, off, lim))
1007            .await?;
1008        Ok(raw
1009            .into_iter()
1010            .map(|t| {
1011                let id = t
1012                    .fields
1013                    .get("id")
1014                    .and_then(|v| v.as_str())
1015                    .and_then(|s| uuid::Uuid::parse_str(s).ok())
1016                    .map_or_else(|| EntityId::Legacy("unknown".into()), EntityId::Uuid);
1017                VpnTunnel {
1018                    id,
1019                    name: t
1020                        .fields
1021                        .get("name")
1022                        .and_then(|v| v.as_str())
1023                        .map(String::from),
1024                    tunnel_type: t
1025                        .fields
1026                        .get("type")
1027                        .or_else(|| t.fields.get("tunnelType"))
1028                        .and_then(|v| v.as_str())
1029                        .unwrap_or("UNKNOWN")
1030                        .to_owned(),
1031                    enabled: t.fields.get("enabled").and_then(serde_json::Value::as_bool),
1032                }
1033            })
1034            .collect())
1035    }
1036
1037    /// Fetch WAN interfaces from the Integration API.
1038    pub async fn list_wans(&self) -> Result<Vec<WanInterface>, CoreError> {
1039        let guard = self.inner.integration_client.lock().await;
1040        let site_id = *self.inner.site_id.lock().await;
1041        let (ic, sid) = require_integration(&guard, site_id, "list_wans")?;
1042        let raw = ic
1043            .paginate_all(200, |off, lim| ic.list_wans(&sid, off, lim))
1044            .await?;
1045        Ok(raw
1046            .into_iter()
1047            .map(|w| {
1048                let id = w
1049                    .fields
1050                    .get("id")
1051                    .and_then(|v| v.as_str())
1052                    .and_then(|s| uuid::Uuid::parse_str(s).ok())
1053                    .map_or_else(|| EntityId::Legacy("unknown".into()), EntityId::Uuid);
1054                let parse_ip = |key: &str| -> Option<std::net::IpAddr> {
1055                    w.fields
1056                        .get(key)
1057                        .and_then(|v| v.as_str())
1058                        .and_then(|s| s.parse().ok())
1059                };
1060                let dns = w
1061                    .fields
1062                    .get("dns")
1063                    .and_then(|v| v.as_array())
1064                    .map(|arr| {
1065                        arr.iter()
1066                            .filter_map(|v| v.as_str().and_then(|s| s.parse().ok()))
1067                            .collect()
1068                    })
1069                    .unwrap_or_default();
1070                WanInterface {
1071                    id,
1072                    name: w
1073                        .fields
1074                        .get("name")
1075                        .and_then(|v| v.as_str())
1076                        .map(String::from),
1077                    ip: parse_ip("ipAddress").or_else(|| parse_ip("ip")),
1078                    gateway: parse_ip("gateway"),
1079                    dns,
1080                }
1081            })
1082            .collect())
1083    }
1084
1085    /// Fetch DPI categories from the Integration API.
1086    pub async fn list_dpi_categories(&self) -> Result<Vec<DpiCategory>, CoreError> {
1087        let guard = self.inner.integration_client.lock().await;
1088        let site_id = *self.inner.site_id.lock().await;
1089        let (ic, sid) = require_integration(&guard, site_id, "list_dpi_categories")?;
1090        let raw = ic
1091            .paginate_all(200, |off, lim| ic.list_dpi_categories(&sid, off, lim))
1092            .await?;
1093        Ok(raw
1094            .into_iter()
1095            .map(|c| {
1096                #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1097                let id = c
1098                    .fields
1099                    .get("id")
1100                    .and_then(serde_json::Value::as_u64)
1101                    .unwrap_or(0) as u32;
1102                DpiCategory {
1103                    id,
1104                    name: c
1105                        .fields
1106                        .get("name")
1107                        .and_then(|v| v.as_str())
1108                        .unwrap_or("Unknown")
1109                        .to_owned(),
1110                    tx_bytes: c
1111                        .fields
1112                        .get("txBytes")
1113                        .and_then(serde_json::Value::as_u64)
1114                        .unwrap_or(0),
1115                    rx_bytes: c
1116                        .fields
1117                        .get("rxBytes")
1118                        .and_then(serde_json::Value::as_u64)
1119                        .unwrap_or(0),
1120                    apps: Vec::new(),
1121                }
1122            })
1123            .collect())
1124    }
1125
1126    /// Fetch DPI applications from the Integration API.
1127    pub async fn list_dpi_applications(&self) -> Result<Vec<DpiApplication>, CoreError> {
1128        let guard = self.inner.integration_client.lock().await;
1129        let site_id = *self.inner.site_id.lock().await;
1130        let (ic, sid) = require_integration(&guard, site_id, "list_dpi_applications")?;
1131        let raw = ic
1132            .paginate_all(200, |off, lim| ic.list_dpi_applications(&sid, off, lim))
1133            .await?;
1134        Ok(raw
1135            .into_iter()
1136            .map(|a| {
1137                #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1138                let id = a
1139                    .fields
1140                    .get("id")
1141                    .and_then(serde_json::Value::as_u64)
1142                    .unwrap_or(0) as u32;
1143                DpiApplication {
1144                    id,
1145                    name: a
1146                        .fields
1147                        .get("name")
1148                        .and_then(|v| v.as_str())
1149                        .unwrap_or("Unknown")
1150                        .to_owned(),
1151                    #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1152                    category_id: a
1153                        .fields
1154                        .get("categoryId")
1155                        .and_then(serde_json::Value::as_u64)
1156                        .unwrap_or(0) as u32,
1157                    tx_bytes: a
1158                        .fields
1159                        .get("txBytes")
1160                        .and_then(serde_json::Value::as_u64)
1161                        .unwrap_or(0),
1162                    rx_bytes: a
1163                        .fields
1164                        .get("rxBytes")
1165                        .and_then(serde_json::Value::as_u64)
1166                        .unwrap_or(0),
1167                }
1168            })
1169            .collect())
1170    }
1171
1172    /// Fetch RADIUS profiles from the Integration API.
1173    pub async fn list_radius_profiles(&self) -> Result<Vec<RadiusProfile>, CoreError> {
1174        let guard = self.inner.integration_client.lock().await;
1175        let site_id = *self.inner.site_id.lock().await;
1176        let (ic, sid) = require_integration(&guard, site_id, "list_radius_profiles")?;
1177        let raw = ic
1178            .paginate_all(200, |off, lim| ic.list_radius_profiles(&sid, off, lim))
1179            .await?;
1180        Ok(raw
1181            .into_iter()
1182            .map(|r| {
1183                let id = r
1184                    .fields
1185                    .get("id")
1186                    .and_then(|v| v.as_str())
1187                    .and_then(|s| uuid::Uuid::parse_str(s).ok())
1188                    .map_or_else(|| EntityId::Legacy("unknown".into()), EntityId::Uuid);
1189                RadiusProfile {
1190                    id,
1191                    name: r
1192                        .fields
1193                        .get("name")
1194                        .and_then(|v| v.as_str())
1195                        .unwrap_or("Unknown")
1196                        .to_owned(),
1197                }
1198            })
1199            .collect())
1200    }
1201
1202    /// Fetch countries from the Integration API.
1203    pub async fn list_countries(&self) -> Result<Vec<Country>, CoreError> {
1204        let guard = self.inner.integration_client.lock().await;
1205        let ic = guard
1206            .as_ref()
1207            .ok_or_else(|| unsupported("list_countries"))?;
1208        let raw = ic
1209            .paginate_all(200, |off, lim| ic.list_countries(off, lim))
1210            .await?;
1211        Ok(raw
1212            .into_iter()
1213            .map(|c| Country {
1214                code: c
1215                    .fields
1216                    .get("code")
1217                    .and_then(|v| v.as_str())
1218                    .unwrap_or("")
1219                    .to_owned(),
1220                name: c
1221                    .fields
1222                    .get("name")
1223                    .and_then(|v| v.as_str())
1224                    .unwrap_or("Unknown")
1225                    .to_owned(),
1226            })
1227            .collect())
1228    }
1229
1230    /// Fetch references for a specific network (Integration API).
1231    pub async fn get_network_references(
1232        &self,
1233        network_id: &EntityId,
1234    ) -> Result<serde_json::Value, CoreError> {
1235        let guard = self.inner.integration_client.lock().await;
1236        let site_id = *self.inner.site_id.lock().await;
1237        let (ic, sid) = require_integration(&guard, site_id, "get_network_references")?;
1238        let uuid = require_uuid(network_id)?;
1239        let refs = ic.get_network_references(&sid, &uuid).await?;
1240        Ok(serde_json::to_value(refs).unwrap_or_default())
1241    }
1242
1243    /// Fetch firewall policy ordering (Integration API).
1244    pub async fn get_firewall_policy_ordering(
1245        &self,
1246    ) -> Result<crate::integration_types::FirewallPolicyOrdering, CoreError> {
1247        let guard = self.inner.integration_client.lock().await;
1248        let site_id = *self.inner.site_id.lock().await;
1249        let (ic, sid) = require_integration(&guard, site_id, "get_firewall_policy_ordering")?;
1250        Ok(ic.get_firewall_policy_ordering(&sid).await?)
1251    }
1252
1253    /// List pending devices.
1254    ///
1255    /// Prefers Integration API pending endpoint, falls back to filtering
1256    /// the canonical device snapshot by pending adoption state.
1257    pub async fn list_pending_devices(&self) -> Result<Vec<serde_json::Value>, CoreError> {
1258        let integration_guard = self.inner.integration_client.lock().await;
1259        let site_id = *self.inner.site_id.lock().await;
1260
1261        if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1262            let raw = ic
1263                .paginate_all(200, |off, lim| ic.list_pending_devices(&sid, off, lim))
1264                .await?;
1265            return Ok(raw
1266                .into_iter()
1267                .map(|v| serde_json::to_value(v).unwrap_or_default())
1268                .collect());
1269        }
1270
1271        let snapshot = self.devices_snapshot();
1272        Ok(snapshot
1273            .iter()
1274            .filter(|d| d.state == crate::model::DeviceState::PendingAdoption)
1275            .map(|d| serde_json::to_value(d.as_ref()).unwrap_or_default())
1276            .collect())
1277    }
1278
1279    /// List device tags.
1280    ///
1281    /// Uses Integration API when available.
1282    pub async fn list_device_tags(&self) -> Result<Vec<serde_json::Value>, CoreError> {
1283        let integration_guard = self.inner.integration_client.lock().await;
1284        let site_id = *self.inner.site_id.lock().await;
1285        if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1286            let raw = ic
1287                .paginate_all(200, |off, lim| ic.list_device_tags(&sid, off, lim))
1288                .await?;
1289            return Ok(raw
1290                .into_iter()
1291                .map(|v| serde_json::to_value(v).unwrap_or_default())
1292                .collect());
1293        }
1294
1295        Ok(Vec::new())
1296    }
1297
1298    /// List controller backups (legacy API).
1299    pub async fn list_backups(&self) -> Result<Vec<serde_json::Value>, CoreError> {
1300        let guard = self.inner.legacy_client.lock().await;
1301        let legacy = require_legacy(&guard)?;
1302        Ok(legacy.list_backups().await?)
1303    }
1304
1305    /// Download a controller backup file (legacy API).
1306    pub async fn download_backup(&self, filename: &str) -> Result<Vec<u8>, CoreError> {
1307        let guard = self.inner.legacy_client.lock().await;
1308        let legacy = require_legacy(&guard)?;
1309        Ok(legacy.download_backup(filename).await?)
1310    }
1311
1312    // ── Statistics (Legacy API) ────────────────────────────────────
1313
1314    /// Fetch site-level historical statistics.
1315    pub async fn get_site_stats(
1316        &self,
1317        interval: &str,
1318        start: Option<i64>,
1319        end: Option<i64>,
1320        attrs: Option<&[String]>,
1321    ) -> Result<Vec<serde_json::Value>, CoreError> {
1322        let guard = self.inner.legacy_client.lock().await;
1323        let legacy = require_legacy(&guard)?;
1324        Ok(legacy.get_site_stats(interval, start, end, attrs).await?)
1325    }
1326
1327    /// Fetch per-device historical statistics.
1328    pub async fn get_device_stats(
1329        &self,
1330        interval: &str,
1331        macs: Option<&[String]>,
1332        attrs: Option<&[String]>,
1333    ) -> Result<Vec<serde_json::Value>, CoreError> {
1334        let guard = self.inner.legacy_client.lock().await;
1335        let legacy = require_legacy(&guard)?;
1336        Ok(legacy.get_device_stats(interval, macs, attrs).await?)
1337    }
1338
1339    /// Fetch per-client historical statistics.
1340    pub async fn get_client_stats(
1341        &self,
1342        interval: &str,
1343        macs: Option<&[String]>,
1344        attrs: Option<&[String]>,
1345    ) -> Result<Vec<serde_json::Value>, CoreError> {
1346        let guard = self.inner.legacy_client.lock().await;
1347        let legacy = require_legacy(&guard)?;
1348        Ok(legacy.get_client_stats(interval, macs, attrs).await?)
1349    }
1350
1351    /// Fetch gateway historical statistics.
1352    pub async fn get_gateway_stats(
1353        &self,
1354        interval: &str,
1355        start: Option<i64>,
1356        end: Option<i64>,
1357        attrs: Option<&[String]>,
1358    ) -> Result<Vec<serde_json::Value>, CoreError> {
1359        let guard = self.inner.legacy_client.lock().await;
1360        let legacy = require_legacy(&guard)?;
1361        Ok(legacy
1362            .get_gateway_stats(interval, start, end, attrs)
1363            .await?)
1364    }
1365
1366    /// Fetch DPI statistics.
1367    pub async fn get_dpi_stats(
1368        &self,
1369        group_by: &str,
1370        macs: Option<&[String]>,
1371    ) -> Result<Vec<serde_json::Value>, CoreError> {
1372        let guard = self.inner.legacy_client.lock().await;
1373        let legacy = require_legacy(&guard)?;
1374        Ok(legacy.get_dpi_stats(group_by, macs).await?)
1375    }
1376
1377    // ── Ad-hoc Legacy API queries ──────────────────────────────────
1378    //
1379    // Legacy-only data that doesn't live in the DataStore.
1380
1381    /// Fetch admin list from the Legacy API.
1382    pub async fn list_admins(&self) -> Result<Vec<Admin>, CoreError> {
1383        let guard = self.inner.legacy_client.lock().await;
1384        let legacy = require_legacy(&guard)?;
1385        let raw = legacy.list_admins().await?;
1386        Ok(raw
1387            .into_iter()
1388            .map(|v| Admin {
1389                id: v.get("_id").and_then(|v| v.as_str()).map_or_else(
1390                    || EntityId::Legacy("unknown".into()),
1391                    |s| EntityId::Legacy(s.into()),
1392                ),
1393                name: v
1394                    .get("name")
1395                    .and_then(|v| v.as_str())
1396                    .unwrap_or("")
1397                    .to_owned(),
1398                email: v.get("email").and_then(|v| v.as_str()).map(String::from),
1399                role: v
1400                    .get("role")
1401                    .and_then(|v| v.as_str())
1402                    .unwrap_or("unknown")
1403                    .to_owned(),
1404                is_super: v
1405                    .get("is_super")
1406                    .and_then(serde_json::Value::as_bool)
1407                    .unwrap_or(false),
1408                last_login: None,
1409            })
1410            .collect())
1411    }
1412
1413    /// Fetch alarms from the Legacy API.
1414    pub async fn list_alarms(&self) -> Result<Vec<Alarm>, CoreError> {
1415        let guard = self.inner.legacy_client.lock().await;
1416        let legacy = require_legacy(&guard)?;
1417        let raw = legacy.list_alarms().await?;
1418        Ok(raw.into_iter().map(Alarm::from).collect())
1419    }
1420
1421    /// Fetch controller system info.
1422    ///
1423    /// Prefers the Integration API (`GET /v1/info`) when available,
1424    /// falls back to Legacy `stat/sysinfo`.
1425    pub async fn get_system_info(&self) -> Result<SystemInfo, CoreError> {
1426        // Try Integration API first (works with API key auth).
1427        {
1428            let guard = self.inner.integration_client.lock().await;
1429            if let Some(ic) = guard.as_ref() {
1430                let info = ic.get_info().await?;
1431                let f = &info.fields;
1432                return Ok(SystemInfo {
1433                    controller_name: f
1434                        .get("applicationName")
1435                        .or_else(|| f.get("name"))
1436                        .and_then(|v| v.as_str())
1437                        .map(String::from),
1438                    version: f
1439                        .get("applicationVersion")
1440                        .or_else(|| f.get("version"))
1441                        .and_then(|v| v.as_str())
1442                        .unwrap_or("unknown")
1443                        .to_owned(),
1444                    build: f.get("build").and_then(|v| v.as_str()).map(String::from),
1445                    hostname: f.get("hostname").and_then(|v| v.as_str()).map(String::from),
1446                    ip: None, // Not available via Integration API
1447                    uptime_secs: f.get("uptime").and_then(serde_json::Value::as_u64),
1448                    update_available: f
1449                        .get("isUpdateAvailable")
1450                        .or_else(|| f.get("update_available"))
1451                        .and_then(serde_json::Value::as_bool),
1452                });
1453            }
1454        }
1455
1456        // Fallback to Legacy API (requires session auth).
1457        let guard = self.inner.legacy_client.lock().await;
1458        let legacy = require_legacy(&guard)?;
1459        let raw = legacy.get_sysinfo().await?;
1460        Ok(SystemInfo {
1461            controller_name: raw
1462                .get("controller_name")
1463                .or_else(|| raw.get("name"))
1464                .and_then(|v| v.as_str())
1465                .map(String::from),
1466            version: raw
1467                .get("version")
1468                .and_then(|v| v.as_str())
1469                .unwrap_or("unknown")
1470                .to_owned(),
1471            build: raw.get("build").and_then(|v| v.as_str()).map(String::from),
1472            hostname: raw
1473                .get("hostname")
1474                .and_then(|v| v.as_str())
1475                .map(String::from),
1476            ip: raw
1477                .get("ip_addrs")
1478                .and_then(|v| v.as_array())
1479                .and_then(|a| a.first())
1480                .and_then(|v| v.as_str())
1481                .and_then(|s| s.parse().ok()),
1482            uptime_secs: raw.get("uptime").and_then(serde_json::Value::as_u64),
1483            update_available: raw
1484                .get("update_available")
1485                .and_then(serde_json::Value::as_bool),
1486        })
1487    }
1488
1489    /// Fetch site health dashboard from the Legacy API.
1490    pub async fn get_site_health(&self) -> Result<Vec<HealthSummary>, CoreError> {
1491        let guard = self.inner.legacy_client.lock().await;
1492        let legacy = require_legacy(&guard)?;
1493        let raw = legacy.get_health().await?;
1494        Ok(convert_health_summaries(raw))
1495    }
1496
1497    /// Fetch low-level sysinfo from the Legacy API.
1498    pub async fn get_sysinfo(&self) -> Result<SysInfo, CoreError> {
1499        let guard = self.inner.legacy_client.lock().await;
1500        let legacy = require_legacy(&guard)?;
1501        let raw = legacy.get_sysinfo().await?;
1502        Ok(SysInfo {
1503            timezone: raw
1504                .get("timezone")
1505                .and_then(|v| v.as_str())
1506                .map(String::from),
1507            autobackup: raw.get("autobackup").and_then(serde_json::Value::as_bool),
1508            hostname: raw
1509                .get("hostname")
1510                .and_then(|v| v.as_str())
1511                .map(String::from),
1512            ip_addrs: raw
1513                .get("ip_addrs")
1514                .and_then(|v| v.as_array())
1515                .map(|a| {
1516                    a.iter()
1517                        .filter_map(|v| v.as_str().map(String::from))
1518                        .collect()
1519                })
1520                .unwrap_or_default(),
1521            live_chat: raw
1522                .get("live_chat")
1523                .and_then(|v| v.as_str())
1524                .map(String::from),
1525            #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1526            data_retention_days: raw
1527                .get("data_retention_days")
1528                .and_then(serde_json::Value::as_u64)
1529                .map(|n| n as u32),
1530            extra: raw,
1531        })
1532    }
1533}
1534
1535// ── Background tasks ─────────────────────────────────────────────
1536
1537/// Parse a numeric field from a JSON object, tolerating both string and number encodings.
1538fn parse_f64_field(parent: Option<&serde_json::Value>, key: &str) -> Option<f64> {
1539    parent.and_then(|s| s.get(key)).and_then(|v| {
1540        v.as_str()
1541            .and_then(|s| s.parse().ok())
1542            .or_else(|| v.as_f64())
1543    })
1544}
1545
1546/// Apply a `device:sync` WebSocket message to the DataStore.
1547///
1548/// Extracts CPU, memory, load averages, and uplink bandwidth from the
1549/// raw Legacy API device JSON. Merges stats into the existing device
1550/// (looked up by MAC) without clobbering Integration API fields.
1551#[allow(clippy::cast_precision_loss)]
1552fn apply_device_sync(store: &DataStore, data: &serde_json::Value) {
1553    let Some(mac_str) = data.get("mac").and_then(serde_json::Value::as_str) else {
1554        return;
1555    };
1556    let mac = MacAddress::new(mac_str);
1557    let Some(existing) = store.device_by_mac(&mac) else {
1558        return; // Device not in store yet — full_refresh will add it
1559    };
1560
1561    // Parse sys_stats
1562    let sys = data.get("sys_stats");
1563    let cpu = sys
1564        .and_then(|s| s.get("cpu"))
1565        .and_then(|v| v.as_str().or_else(|| v.as_f64().map(|_| "")))
1566        .and_then(|s| {
1567            if s.is_empty() {
1568                None
1569            } else {
1570                s.parse::<f64>().ok()
1571            }
1572        })
1573        .or_else(|| {
1574            sys.and_then(|s| s.get("cpu"))
1575                .and_then(serde_json::Value::as_f64)
1576        });
1577    #[allow(clippy::as_conversions, clippy::cast_precision_loss)]
1578    let mem_pct = match (
1579        sys.and_then(|s| s.get("mem_used"))
1580            .and_then(serde_json::Value::as_i64),
1581        sys.and_then(|s| s.get("mem_total"))
1582            .and_then(serde_json::Value::as_i64),
1583    ) {
1584        (Some(used), Some(total)) if total > 0 => Some((used as f64 / total as f64) * 100.0),
1585        _ => None,
1586    };
1587    let load_averages: [Option<f64>; 3] =
1588        ["loadavg_1", "loadavg_5", "loadavg_15"].map(|key| parse_f64_field(sys, key));
1589
1590    // Uplink bandwidth: check "uplink" object or top-level fields
1591    let uplink = data.get("uplink");
1592    let tx_bps = uplink
1593        .and_then(|u| u.get("tx_bytes-r").or_else(|| u.get("tx_bytes_r")))
1594        .and_then(serde_json::Value::as_u64)
1595        .or_else(|| data.get("tx_bytes-r").and_then(serde_json::Value::as_u64));
1596    let rx_bps = uplink
1597        .and_then(|u| u.get("rx_bytes-r").or_else(|| u.get("rx_bytes_r")))
1598        .and_then(serde_json::Value::as_u64)
1599        .or_else(|| data.get("rx_bytes-r").and_then(serde_json::Value::as_u64));
1600
1601    let bandwidth = match (tx_bps, rx_bps) {
1602        (Some(tx), Some(rx)) if tx > 0 || rx > 0 => Some(crate::model::common::Bandwidth {
1603            tx_bytes_per_sec: tx,
1604            rx_bytes_per_sec: rx,
1605        }),
1606        _ => existing.stats.uplink_bandwidth, // Keep existing if no new data
1607    };
1608
1609    // Uptime from top-level `_uptime` or `uptime`
1610    let uptime = data
1611        .get("_uptime")
1612        .or_else(|| data.get("uptime"))
1613        .and_then(serde_json::Value::as_i64)
1614        .and_then(|u| u.try_into().ok())
1615        .or(existing.stats.uptime_secs);
1616
1617    // Clone and update
1618    let mut device = (*existing).clone();
1619    device.stats.uplink_bandwidth = bandwidth;
1620    if let Some(c) = cpu {
1621        device.stats.cpu_utilization_pct = Some(c);
1622    }
1623    if let Some(m) = mem_pct {
1624        device.stats.memory_utilization_pct = Some(m);
1625    }
1626    if let Some(l) = load_averages[0] {
1627        device.stats.load_average_1m = Some(l);
1628    }
1629    if let Some(l) = load_averages[1] {
1630        device.stats.load_average_5m = Some(l);
1631    }
1632    if let Some(l) = load_averages[2] {
1633        device.stats.load_average_15m = Some(l);
1634    }
1635    device.stats.uptime_secs = uptime;
1636
1637    // Update client count from num_sta (AP/switch connected stations)
1638    if let Some(num_sta) = data.get("num_sta").and_then(serde_json::Value::as_u64) {
1639        #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1640        {
1641            device.client_count = Some(num_sta as u32);
1642        }
1643    }
1644
1645    if let Some(obj) = data.as_object()
1646        && let Some(wan_ipv6) = parse_legacy_device_wan_ipv6(obj)
1647    {
1648        device.wan_ipv6 = Some(wan_ipv6);
1649    }
1650
1651    let key = mac.as_str().to_owned();
1652    let id = device.id.clone();
1653    store.devices.upsert(key, id, device);
1654}
1655
1656/// Periodically refresh data from the controller.
1657async fn refresh_task(controller: Controller, interval_secs: u64, cancel: CancellationToken) {
1658    let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
1659    interval.tick().await; // consume the immediate first tick
1660
1661    loop {
1662        tokio::select! {
1663            biased;
1664            () = cancel.cancelled() => break,
1665            _ = interval.tick() => {
1666                if let Err(e) = controller.full_refresh().await {
1667                    warn!(error = %e, "periodic refresh failed");
1668                }
1669            }
1670        }
1671    }
1672}
1673
1674/// Process commands from the mpsc channel, routing each to the
1675/// appropriate Legacy API call.
1676async fn command_processor_task(controller: Controller, mut rx: mpsc::Receiver<CommandEnvelope>) {
1677    let cancel = controller.inner.cancel_child.lock().await.clone();
1678
1679    loop {
1680        tokio::select! {
1681            biased;
1682            () = cancel.cancelled() => break,
1683            envelope = rx.recv() => {
1684                let Some(envelope) = envelope else { break };
1685                let result = route_command(&controller, envelope.command).await;
1686                let _ = envelope.response_tx.send(result);
1687            }
1688        }
1689    }
1690}
1691
1692// ── Command routing ──────────────────────────────────────────────
1693
1694/// Route a command to the appropriate API call.
1695///
1696/// Uses the Integration API for CRUD operations when available,
1697/// falls back to the Legacy API for session-based commands.
1698#[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
1699async fn route_command(controller: &Controller, cmd: Command) -> Result<CommandResult, CoreError> {
1700    let store = &controller.inner.store;
1701
1702    // Acquire both clients for routing decisions
1703    let integration_guard = controller.inner.integration_client.lock().await;
1704    let legacy_guard = controller.inner.legacy_client.lock().await;
1705    let site_id = *controller.inner.site_id.lock().await;
1706
1707    match cmd {
1708        // ── Device operations ────────────────────────────────────
1709        Command::AdoptDevice {
1710            mac,
1711            ignore_device_limit,
1712        } => {
1713            if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1714                ic.adopt_device(&sid, mac.as_str(), ignore_device_limit)
1715                    .await?;
1716            } else {
1717                let legacy = require_legacy(&legacy_guard)?;
1718                legacy.adopt_device(mac.as_str()).await?;
1719            }
1720            Ok(CommandResult::Ok)
1721        }
1722
1723        Command::RestartDevice { id } => {
1724            if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1725                let device_uuid = require_uuid(&id)?;
1726                ic.device_action(&sid, &device_uuid, "RESTART").await?;
1727            } else {
1728                let legacy = require_legacy(&legacy_guard)?;
1729                let mac = device_mac(store, &id)?;
1730                legacy.restart_device(mac.as_str()).await?;
1731            }
1732            Ok(CommandResult::Ok)
1733        }
1734
1735        Command::LocateDevice { mac, enable } => {
1736            if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1737                let device =
1738                    store
1739                        .device_by_mac(&mac)
1740                        .ok_or_else(|| CoreError::DeviceNotFound {
1741                            identifier: mac.to_string(),
1742                        })?;
1743                let device_uuid = require_uuid(&device.id)?;
1744                let action = if enable { "LOCATE_ON" } else { "LOCATE_OFF" };
1745                ic.device_action(&sid, &device_uuid, action).await?;
1746            } else {
1747                let legacy = require_legacy(&legacy_guard)?;
1748                legacy.locate_device(mac.as_str(), enable).await?;
1749            }
1750            Ok(CommandResult::Ok)
1751        }
1752
1753        Command::UpgradeDevice { mac, firmware_url } => {
1754            let legacy = require_legacy(&legacy_guard)?;
1755            legacy
1756                .upgrade_device(mac.as_str(), firmware_url.as_deref())
1757                .await?;
1758            Ok(CommandResult::Ok)
1759        }
1760
1761        Command::RemoveDevice { id } => {
1762            let (ic, sid) = require_integration(&integration_guard, site_id, "RemoveDevice")?;
1763            let device_uuid = require_uuid(&id)?;
1764            ic.remove_device(&sid, &device_uuid).await?;
1765            Ok(CommandResult::Ok)
1766        }
1767
1768        Command::ProvisionDevice { mac } => {
1769            let legacy = require_legacy(&legacy_guard)?;
1770            legacy.provision_device(mac.as_str()).await?;
1771            Ok(CommandResult::Ok)
1772        }
1773        Command::SpeedtestDevice => {
1774            let legacy = require_legacy(&legacy_guard)?;
1775            legacy.speedtest().await?;
1776            Ok(CommandResult::Ok)
1777        }
1778
1779        Command::PowerCyclePort {
1780            device_id,
1781            port_idx,
1782        } => {
1783            let (ic, sid) = require_integration(&integration_guard, site_id, "PowerCyclePort")?;
1784            let device_uuid = require_uuid(&device_id)?;
1785            ic.port_action(&sid, &device_uuid, port_idx, "POWER_CYCLE")
1786                .await?;
1787            Ok(CommandResult::Ok)
1788        }
1789
1790        // ── Client operations ────────────────────────────────────
1791        Command::BlockClient { mac } => {
1792            if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1793                let client =
1794                    store
1795                        .client_by_mac(&mac)
1796                        .ok_or_else(|| CoreError::ClientNotFound {
1797                            identifier: mac.to_string(),
1798                        })?;
1799                let client_uuid = require_uuid(&client.id)?;
1800                ic.client_action(&sid, &client_uuid, "BLOCK").await?;
1801            } else {
1802                let legacy = require_legacy(&legacy_guard)?;
1803                legacy.block_client(mac.as_str()).await?;
1804            }
1805            Ok(CommandResult::Ok)
1806        }
1807
1808        Command::UnblockClient { mac } => {
1809            if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1810                let client =
1811                    store
1812                        .client_by_mac(&mac)
1813                        .ok_or_else(|| CoreError::ClientNotFound {
1814                            identifier: mac.to_string(),
1815                        })?;
1816                let client_uuid = require_uuid(&client.id)?;
1817                ic.client_action(&sid, &client_uuid, "UNBLOCK").await?;
1818            } else {
1819                let legacy = require_legacy(&legacy_guard)?;
1820                legacy.unblock_client(mac.as_str()).await?;
1821            }
1822            Ok(CommandResult::Ok)
1823        }
1824
1825        Command::KickClient { mac } => {
1826            if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1827                let client =
1828                    store
1829                        .client_by_mac(&mac)
1830                        .ok_or_else(|| CoreError::ClientNotFound {
1831                            identifier: mac.to_string(),
1832                        })?;
1833                let client_uuid = require_uuid(&client.id)?;
1834                ic.client_action(&sid, &client_uuid, "RECONNECT").await?;
1835            } else {
1836                let legacy = require_legacy(&legacy_guard)?;
1837                legacy.kick_client(mac.as_str()).await?;
1838            }
1839            Ok(CommandResult::Ok)
1840        }
1841
1842        Command::ForgetClient { mac } => {
1843            let legacy = require_legacy(&legacy_guard)?;
1844            legacy.forget_client(mac.as_str()).await?;
1845            Ok(CommandResult::Ok)
1846        }
1847
1848        Command::AuthorizeGuest {
1849            client_id,
1850            time_limit_minutes,
1851            data_limit_mb,
1852            rx_rate_kbps,
1853            tx_rate_kbps,
1854        } => {
1855            let legacy = require_legacy(&legacy_guard)?;
1856            let mac = client_mac(store, &client_id)?;
1857            let minutes = time_limit_minutes.unwrap_or(60);
1858            #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1859            {
1860                legacy
1861                    .authorize_guest(
1862                        mac.as_str(),
1863                        minutes,
1864                        tx_rate_kbps.map(|r| r as u32),
1865                        rx_rate_kbps.map(|r| r as u32),
1866                        data_limit_mb.map(|m| m as u32),
1867                    )
1868                    .await?;
1869            }
1870            Ok(CommandResult::Ok)
1871        }
1872
1873        Command::UnauthorizeGuest { client_id } => {
1874            let legacy = require_legacy(&legacy_guard)?;
1875            let mac = client_mac(store, &client_id)?;
1876            legacy.unauthorize_guest(mac.as_str()).await?;
1877            Ok(CommandResult::Ok)
1878        }
1879
1880        Command::SetClientFixedIp {
1881            mac,
1882            ip,
1883            network_id,
1884        } => {
1885            let legacy = require_legacy(&legacy_guard)?;
1886            legacy
1887                .set_client_fixed_ip(mac.as_str(), &ip.to_string(), &network_id.to_string())
1888                .await?;
1889            Ok(CommandResult::Ok)
1890        }
1891
1892        Command::RemoveClientFixedIp { mac } => {
1893            let legacy = require_legacy(&legacy_guard)?;
1894            legacy.remove_client_fixed_ip(mac.as_str()).await?;
1895            Ok(CommandResult::Ok)
1896        }
1897
1898        // ── Alarm operations ─────────────────────────────────────
1899        Command::ArchiveAlarm { id } => {
1900            let legacy = require_legacy(&legacy_guard)?;
1901            legacy.archive_alarm(&id.to_string()).await?;
1902            Ok(CommandResult::Ok)
1903        }
1904
1905        Command::ArchiveAllAlarms => {
1906            let legacy = require_legacy(&legacy_guard)?;
1907            legacy.archive_all_alarms().await?;
1908            Ok(CommandResult::Ok)
1909        }
1910
1911        // ── Backup operations ────────────────────────────────────
1912        Command::CreateBackup => {
1913            let legacy = require_legacy(&legacy_guard)?;
1914            legacy.create_backup().await?;
1915            Ok(CommandResult::Ok)
1916        }
1917
1918        Command::DeleteBackup { filename } => {
1919            let legacy = require_legacy(&legacy_guard)?;
1920            legacy.delete_backup(&filename).await?;
1921            Ok(CommandResult::Ok)
1922        }
1923
1924        // ── Network CRUD (Integration API) ───────────────────────
1925        Command::CreateNetwork(req) => {
1926            let (ic, sid) = require_integration(&integration_guard, site_id, "CreateNetwork")?;
1927            let crate::command::CreateNetworkRequest {
1928                name,
1929                vlan_id,
1930                subnet,
1931                management,
1932                purpose,
1933                dhcp_enabled,
1934                enabled,
1935                dhcp_range_start,
1936                dhcp_range_stop,
1937                dhcp_lease_time,
1938                firewall_zone_id,
1939                isolation_enabled,
1940                internet_access_enabled,
1941            } = req;
1942
1943            let management = management.unwrap_or_else(|| {
1944                if matches!(purpose, Some(NetworkPurpose::VlanOnly)) {
1945                    NetworkManagement::Unmanaged
1946                } else if purpose.is_some() || subnet.is_some() || dhcp_enabled {
1947                    NetworkManagement::Gateway
1948                } else {
1949                    NetworkManagement::Unmanaged
1950                }
1951            });
1952            let mut extra = HashMap::new();
1953
1954            if let Some(zone) = firewall_zone_id {
1955                extra.insert("zoneId".into(), serde_json::Value::String(zone));
1956            }
1957
1958            if matches!(management, NetworkManagement::Gateway) {
1959                extra.insert(
1960                    "isolationEnabled".into(),
1961                    serde_json::Value::Bool(isolation_enabled),
1962                );
1963                extra.insert(
1964                    "internetAccessEnabled".into(),
1965                    serde_json::Value::Bool(internet_access_enabled),
1966                );
1967
1968                if let Some(cidr) = subnet {
1969                    let (host_ip, prefix_len) = parse_ipv4_cidr(&cidr)?;
1970                    let mut dhcp_cfg = serde_json::Map::new();
1971                    dhcp_cfg.insert(
1972                        "mode".into(),
1973                        serde_json::Value::String(
1974                            if dhcp_enabled { "SERVER" } else { "NONE" }.into(),
1975                        ),
1976                    );
1977                    if let Some(lease) = dhcp_lease_time {
1978                        dhcp_cfg.insert(
1979                            "leaseTimeSeconds".into(),
1980                            serde_json::Value::Number(serde_json::Number::from(u64::from(lease))),
1981                        );
1982                    }
1983
1984                    if let (Some(start), Some(stop)) = (dhcp_range_start, dhcp_range_stop) {
1985                        dhcp_cfg.insert(
1986                            "ipAddressRange".into(),
1987                            serde_json::json!({
1988                                "start": start,
1989                                "end": stop
1990                            }),
1991                        );
1992                    }
1993
1994                    extra.insert(
1995                        "ipv4Configuration".into(),
1996                        serde_json::json!({
1997                            "hostIpAddress": host_ip.to_string(),
1998                            "prefixLength": u64::from(prefix_len),
1999                            "dhcpConfiguration": dhcp_cfg
2000                        }),
2001                    );
2002                }
2003            }
2004
2005            let body = crate::integration_types::NetworkCreateUpdate {
2006                name,
2007                enabled,
2008                management: "USER_DEFINED".into(),
2009                vlan_id: vlan_id.map_or(1, i32::from),
2010                dhcp_guarding: None,
2011                extra,
2012            };
2013            ic.create_network(&sid, &body).await?;
2014            Ok(CommandResult::Ok)
2015        }
2016
2017        Command::UpdateNetwork { id, update } => {
2018            let (ic, sid) = require_integration(&integration_guard, site_id, "UpdateNetwork")?;
2019            let uuid = require_uuid(&id)?;
2020            // Fetch existing to merge partial update
2021            let existing = ic.get_network(&sid, &uuid).await?;
2022            // Start from existing extra fields, then apply toggle overrides
2023            let mut extra = existing.extra;
2024            if let Some(v) = update.isolation_enabled {
2025                extra.insert("isolationEnabled".into(), serde_json::Value::Bool(v));
2026            }
2027            if let Some(v) = update.internet_access_enabled {
2028                extra.insert("internetAccessEnabled".into(), serde_json::Value::Bool(v));
2029            }
2030            if let Some(v) = update.mdns_forwarding_enabled {
2031                extra.insert("mdnsForwardingEnabled".into(), serde_json::Value::Bool(v));
2032            }
2033            if let Some(v) = update.ipv6_enabled {
2034                if v {
2035                    // Enable IPv6 with prefix delegation if not already configured
2036                    extra
2037                        .entry("ipv6Configuration".into())
2038                        .or_insert_with(|| serde_json::json!({ "type": "PREFIX_DELEGATION" }));
2039                } else {
2040                    extra.remove("ipv6Configuration");
2041                }
2042            }
2043            let body = crate::integration_types::NetworkCreateUpdate {
2044                name: update.name.unwrap_or(existing.name),
2045                enabled: update.enabled.unwrap_or(existing.enabled),
2046                management: existing.management,
2047                vlan_id: update.vlan_id.map_or(existing.vlan_id, i32::from),
2048                dhcp_guarding: existing.dhcp_guarding,
2049                extra,
2050            };
2051            ic.update_network(&sid, &uuid, &body).await?;
2052            Ok(CommandResult::Ok)
2053        }
2054
2055        Command::DeleteNetwork { id, force: _ } => {
2056            let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteNetwork")?;
2057            let uuid = require_uuid(&id)?;
2058            ic.delete_network(&sid, &uuid).await?;
2059            Ok(CommandResult::Ok)
2060        }
2061
2062        // ── WiFi Broadcast CRUD ──────────────────────────────────
2063        Command::CreateWifiBroadcast(req) => {
2064            let (ic, sid) =
2065                require_integration(&integration_guard, site_id, "CreateWifiBroadcast")?;
2066            let body = build_create_wifi_broadcast_payload(&req);
2067            ic.create_wifi_broadcast(&sid, &body).await?;
2068            Ok(CommandResult::Ok)
2069        }
2070
2071        Command::UpdateWifiBroadcast { id, update } => {
2072            let (ic, sid) =
2073                require_integration(&integration_guard, site_id, "UpdateWifiBroadcast")?;
2074            let uuid = require_uuid(&id)?;
2075            let existing = ic.get_wifi_broadcast(&sid, &uuid).await?;
2076            let payload = build_update_wifi_broadcast_payload(&existing, &update);
2077            ic.update_wifi_broadcast(&sid, &uuid, &payload).await?;
2078            Ok(CommandResult::Ok)
2079        }
2080
2081        Command::DeleteWifiBroadcast { id, force: _ } => {
2082            let (ic, sid) =
2083                require_integration(&integration_guard, site_id, "DeleteWifiBroadcast")?;
2084            let uuid = require_uuid(&id)?;
2085            ic.delete_wifi_broadcast(&sid, &uuid).await?;
2086            Ok(CommandResult::Ok)
2087        }
2088
2089        // ── Firewall Policy CRUD ─────────────────────────────────
2090        Command::CreateFirewallPolicy(req) => {
2091            let (ic, sid) =
2092                require_integration(&integration_guard, site_id, "CreateFirewallPolicy")?;
2093            let action_str = match req.action {
2094                FirewallAction::Allow => "ALLOW",
2095                FirewallAction::Block => "DROP",
2096                FirewallAction::Reject => "REJECT",
2097            };
2098            let source =
2099                build_endpoint_json(&req.source_zone_id.to_string(), req.source_filter.as_ref());
2100            let destination = build_endpoint_json(
2101                &req.destination_zone_id.to_string(),
2102                req.destination_filter.as_ref(),
2103            );
2104            let ip_version = req.ip_version.as_deref().unwrap_or("IPV4_AND_IPV6");
2105            let body = crate::integration_types::FirewallPolicyCreateUpdate {
2106                name: req.name,
2107                description: req.description,
2108                enabled: req.enabled,
2109                action: serde_json::json!({ "type": action_str }),
2110                source,
2111                destination,
2112                ip_protocol_scope: serde_json::json!({ "ipVersion": ip_version }),
2113                logging_enabled: req.logging_enabled,
2114                ipsec_filter: None,
2115                schedule: None,
2116                connection_state_filter: req.connection_states,
2117            };
2118            ic.create_firewall_policy(&sid, &body).await?;
2119            Ok(CommandResult::Ok)
2120        }
2121
2122        Command::UpdateFirewallPolicy { id, update } => {
2123            let (ic, sid) =
2124                require_integration(&integration_guard, site_id, "UpdateFirewallPolicy")?;
2125            let uuid = require_uuid(&id)?;
2126            let existing = ic.get_firewall_policy(&sid, &uuid).await?;
2127
2128            // Source: use new filter spec if provided, otherwise keep existing
2129            let source = if let Some(ref spec) = update.source_filter {
2130                let zone_id = existing
2131                    .source
2132                    .as_ref()
2133                    .and_then(|s| s.zone_id)
2134                    .map(|u| u.to_string())
2135                    .unwrap_or_default();
2136                build_endpoint_json(&zone_id, Some(spec))
2137            } else {
2138                serde_json::to_value(&existing.source).unwrap_or_default()
2139            };
2140
2141            // Destination: use new filter spec if provided, otherwise keep existing
2142            let destination = if let Some(ref spec) = update.destination_filter {
2143                let zone_id = existing
2144                    .destination
2145                    .as_ref()
2146                    .and_then(|d| d.zone_id)
2147                    .map(|u| u.to_string())
2148                    .unwrap_or_default();
2149                build_endpoint_json(&zone_id, Some(spec))
2150            } else {
2151                serde_json::to_value(&existing.destination).unwrap_or_default()
2152            };
2153
2154            let action = if let Some(action) = update.action {
2155                let action_type = match action {
2156                    FirewallAction::Allow => "ALLOW",
2157                    FirewallAction::Block => "DROP",
2158                    FirewallAction::Reject => "REJECT",
2159                };
2160                serde_json::json!({ "type": action_type })
2161            } else {
2162                existing.action
2163            };
2164
2165            let ip_protocol_scope = if let Some(ref version) = update.ip_version {
2166                serde_json::json!({ "ipVersion": version })
2167            } else {
2168                existing
2169                    .ip_protocol_scope
2170                    .unwrap_or_else(|| serde_json::json!({ "ipVersion": "IPV4_AND_IPV6" }))
2171            };
2172
2173            let connection_state_filter = update.connection_states.or_else(|| {
2174                existing
2175                    .extra
2176                    .get("connectionStateFilter")
2177                    .and_then(serde_json::Value::as_array)
2178                    .map(|arr| {
2179                        arr.iter()
2180                            .filter_map(|v| v.as_str().map(ToOwned::to_owned))
2181                            .collect::<Vec<_>>()
2182                    })
2183            });
2184
2185            let payload = crate::integration_types::FirewallPolicyCreateUpdate {
2186                name: update.name.unwrap_or(existing.name),
2187                description: update.description.or(existing.description),
2188                enabled: update.enabled.unwrap_or(existing.enabled),
2189                action,
2190                source,
2191                destination,
2192                ip_protocol_scope,
2193                logging_enabled: update.logging_enabled.unwrap_or(existing.logging_enabled),
2194                ipsec_filter: existing
2195                    .extra
2196                    .get("ipsecFilter")
2197                    .and_then(serde_json::Value::as_str)
2198                    .map(ToOwned::to_owned),
2199                schedule: existing.extra.get("schedule").cloned(),
2200                connection_state_filter,
2201            };
2202
2203            ic.update_firewall_policy(&sid, &uuid, &payload).await?;
2204            Ok(CommandResult::Ok)
2205        }
2206
2207        Command::DeleteFirewallPolicy { id } => {
2208            let (ic, sid) =
2209                require_integration(&integration_guard, site_id, "DeleteFirewallPolicy")?;
2210            let uuid = require_uuid(&id)?;
2211            ic.delete_firewall_policy(&sid, &uuid).await?;
2212            Ok(CommandResult::Ok)
2213        }
2214
2215        Command::PatchFirewallPolicy {
2216            id,
2217            enabled,
2218            logging,
2219        } => {
2220            let (ic, sid) =
2221                require_integration(&integration_guard, site_id, "PatchFirewallPolicy")?;
2222            let uuid = require_uuid(&id)?;
2223            let body = crate::integration_types::FirewallPolicyPatch {
2224                enabled,
2225                logging_enabled: logging,
2226            };
2227            ic.patch_firewall_policy(&sid, &uuid, &body).await?;
2228            Ok(CommandResult::Ok)
2229        }
2230
2231        Command::ReorderFirewallPolicies {
2232            zone_pair: _,
2233            ordered_ids,
2234        } => {
2235            let (ic, sid) =
2236                require_integration(&integration_guard, site_id, "ReorderFirewallPolicies")?;
2237            let uuids: Result<Vec<uuid::Uuid>, _> = ordered_ids.iter().map(require_uuid).collect();
2238            let body = crate::integration_types::FirewallPolicyOrdering {
2239                before_system_defined: uuids?,
2240                after_system_defined: Vec::new(),
2241            };
2242            ic.set_firewall_policy_ordering(&sid, &body).await?;
2243            Ok(CommandResult::Ok)
2244        }
2245
2246        // ── Firewall Zone CRUD ───────────────────────────────────
2247        Command::CreateFirewallZone(req) => {
2248            let (ic, sid) = require_integration(&integration_guard, site_id, "CreateFirewallZone")?;
2249            let network_uuids: Result<Vec<uuid::Uuid>, _> =
2250                req.network_ids.iter().map(require_uuid).collect();
2251            let body = crate::integration_types::FirewallZoneCreateUpdate {
2252                name: req.name,
2253                network_ids: network_uuids?,
2254            };
2255            ic.create_firewall_zone(&sid, &body).await?;
2256            Ok(CommandResult::Ok)
2257        }
2258
2259        Command::UpdateFirewallZone { id, update } => {
2260            let (ic, sid) = require_integration(&integration_guard, site_id, "UpdateFirewallZone")?;
2261            let uuid = require_uuid(&id)?;
2262            let existing = ic.get_firewall_zone(&sid, &uuid).await?;
2263            let network_ids = if let Some(ids) = update.network_ids {
2264                let uuids: Result<Vec<uuid::Uuid>, _> = ids.iter().map(require_uuid).collect();
2265                uuids?
2266            } else {
2267                existing.network_ids
2268            };
2269            let body = crate::integration_types::FirewallZoneCreateUpdate {
2270                name: update.name.unwrap_or(existing.name),
2271                network_ids,
2272            };
2273            ic.update_firewall_zone(&sid, &uuid, &body).await?;
2274            Ok(CommandResult::Ok)
2275        }
2276
2277        Command::DeleteFirewallZone { id } => {
2278            let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteFirewallZone")?;
2279            let uuid = require_uuid(&id)?;
2280            ic.delete_firewall_zone(&sid, &uuid).await?;
2281            Ok(CommandResult::Ok)
2282        }
2283
2284        // ── ACL Rule CRUD ────────────────────────────────────────
2285        Command::CreateAclRule(req) => {
2286            let (ic, sid) = require_integration(&integration_guard, site_id, "CreateAclRule")?;
2287            let action_str = match req.action {
2288                FirewallAction::Allow => "ALLOW",
2289                FirewallAction::Block => "BLOCK",
2290                FirewallAction::Reject => "REJECT",
2291            };
2292            let mut source_filter = serde_json::Map::new();
2293            source_filter.insert(
2294                "zoneId".into(),
2295                serde_json::Value::String(req.source_zone_id.to_string()),
2296            );
2297            if let Some(source_port) = req.source_port {
2298                source_filter.insert("port".into(), serde_json::Value::String(source_port));
2299            }
2300            if let Some(protocol) = req.protocol.clone() {
2301                source_filter.insert("protocol".into(), serde_json::Value::String(protocol));
2302            }
2303
2304            let mut destination_filter = serde_json::Map::new();
2305            destination_filter.insert(
2306                "zoneId".into(),
2307                serde_json::Value::String(req.destination_zone_id.to_string()),
2308            );
2309            if let Some(destination_port) = req.destination_port {
2310                destination_filter
2311                    .insert("port".into(), serde_json::Value::String(destination_port));
2312            }
2313            if let Some(protocol) = req.protocol {
2314                destination_filter.insert("protocol".into(), serde_json::Value::String(protocol));
2315            }
2316            let body = crate::integration_types::AclRuleCreateUpdate {
2317                name: req.name,
2318                rule_type: req.rule_type,
2319                action: action_str.into(),
2320                enabled: req.enabled,
2321                description: None,
2322                source_filter: Some(serde_json::Value::Object(source_filter)),
2323                destination_filter: Some(serde_json::Value::Object(destination_filter)),
2324                enforcing_device_filter: None,
2325            };
2326            ic.create_acl_rule(&sid, &body).await?;
2327            Ok(CommandResult::Ok)
2328        }
2329
2330        Command::UpdateAclRule { id, update } => {
2331            let (ic, sid) = require_integration(&integration_guard, site_id, "UpdateAclRule")?;
2332            let uuid = require_uuid(&id)?;
2333            let existing = ic.get_acl_rule(&sid, &uuid).await?;
2334            let action_str = match update.action {
2335                Some(FirewallAction::Allow) => "ALLOW".into(),
2336                Some(FirewallAction::Block) => "BLOCK".into(),
2337                Some(FirewallAction::Reject) => "REJECT".into(),
2338                None => existing.action,
2339            };
2340            let body = crate::integration_types::AclRuleCreateUpdate {
2341                name: update.name.unwrap_or(existing.name),
2342                rule_type: existing.rule_type,
2343                action: action_str,
2344                enabled: update.enabled.unwrap_or(existing.enabled),
2345                description: existing.description,
2346                source_filter: existing.source_filter,
2347                destination_filter: existing.destination_filter,
2348                enforcing_device_filter: existing.enforcing_device_filter,
2349            };
2350            ic.update_acl_rule(&sid, &uuid, &body).await?;
2351            Ok(CommandResult::Ok)
2352        }
2353
2354        Command::DeleteAclRule { id } => {
2355            let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteAclRule")?;
2356            let uuid = require_uuid(&id)?;
2357            ic.delete_acl_rule(&sid, &uuid).await?;
2358            Ok(CommandResult::Ok)
2359        }
2360
2361        Command::ReorderAclRules { ordered_ids } => {
2362            let (ic, sid) = require_integration(&integration_guard, site_id, "ReorderAclRules")?;
2363            let uuids: Result<Vec<uuid::Uuid>, _> = ordered_ids.iter().map(require_uuid).collect();
2364            let body = crate::integration_types::AclRuleOrdering {
2365                ordered_acl_rule_ids: uuids?,
2366            };
2367            ic.set_acl_rule_ordering(&sid, &body).await?;
2368            Ok(CommandResult::Ok)
2369        }
2370
2371        // ── DNS Policy CRUD ──────────────────────────────────────
2372        Command::CreateDnsPolicy(req) => {
2373            let (ic, sid) = require_integration(&integration_guard, site_id, "CreateDnsPolicy")?;
2374            let policy_type_str = dns_policy_type_name(req.policy_type);
2375            let fields = build_create_dns_policy_fields(&req)?;
2376            let body = crate::integration_types::DnsPolicyCreateUpdate {
2377                policy_type: policy_type_str.to_owned(),
2378                enabled: req.enabled,
2379                fields,
2380            };
2381            ic.create_dns_policy(&sid, &body).await?;
2382            Ok(CommandResult::Ok)
2383        }
2384
2385        Command::UpdateDnsPolicy { id, update } => {
2386            let (ic, sid) = require_integration(&integration_guard, site_id, "UpdateDnsPolicy")?;
2387            let uuid = require_uuid(&id)?;
2388            let existing = ic.get_dns_policy(&sid, &uuid).await?;
2389            let fields = build_update_dns_policy_fields(&existing, &update)?;
2390
2391            let body = crate::integration_types::DnsPolicyCreateUpdate {
2392                policy_type: existing.policy_type,
2393                enabled: update.enabled.unwrap_or(existing.enabled),
2394                fields,
2395            };
2396            ic.update_dns_policy(&sid, &uuid, &body).await?;
2397            Ok(CommandResult::Ok)
2398        }
2399
2400        Command::DeleteDnsPolicy { id } => {
2401            let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteDnsPolicy")?;
2402            let uuid = require_uuid(&id)?;
2403            ic.delete_dns_policy(&sid, &uuid).await?;
2404            Ok(CommandResult::Ok)
2405        }
2406
2407        // ── Traffic Matching List CRUD ───────────────────────────
2408        Command::CreateTrafficMatchingList(req) => {
2409            let (ic, sid) =
2410                require_integration(&integration_guard, site_id, "CreateTrafficMatchingList")?;
2411            let mut fields = serde_json::Map::new();
2412            fields.insert(
2413                "items".into(),
2414                serde_json::Value::Array(traffic_matching_list_items(
2415                    &req.entries,
2416                    req.raw_items.as_deref(),
2417                )),
2418            );
2419            if let Some(desc) = req.description {
2420                fields.insert("description".into(), serde_json::Value::String(desc));
2421            }
2422            let body = crate::integration_types::TrafficMatchingListCreateUpdate {
2423                name: req.name,
2424                list_type: req.list_type,
2425                fields,
2426            };
2427            ic.create_traffic_matching_list(&sid, &body).await?;
2428            Ok(CommandResult::Ok)
2429        }
2430
2431        Command::UpdateTrafficMatchingList { id, update } => {
2432            let (ic, sid) =
2433                require_integration(&integration_guard, site_id, "UpdateTrafficMatchingList")?;
2434            let uuid = require_uuid(&id)?;
2435            let existing = ic.get_traffic_matching_list(&sid, &uuid).await?;
2436            let mut fields = serde_json::Map::new();
2437            let entries = if let Some(raw_items) = update.raw_items.as_deref() {
2438                serde_json::Value::Array(raw_items.to_vec())
2439            } else if let Some(new_entries) = &update.entries {
2440                serde_json::Value::Array(traffic_matching_list_items(new_entries, None))
2441            } else if let Some(existing_entries) = existing.extra.get("items") {
2442                existing_entries.clone()
2443            } else if let Some(existing_entries) = existing.extra.get("entries") {
2444                existing_entries.clone()
2445            } else {
2446                serde_json::Value::Array(Vec::new())
2447            };
2448            fields.insert("items".into(), entries);
2449            if let Some(desc) = update.description {
2450                fields.insert("description".into(), serde_json::Value::String(desc));
2451            } else if let Some(existing_desc) = existing.extra.get("description") {
2452                fields.insert("description".into(), existing_desc.clone());
2453            }
2454            let body = crate::integration_types::TrafficMatchingListCreateUpdate {
2455                name: update.name.unwrap_or(existing.name),
2456                list_type: existing.list_type,
2457                fields,
2458            };
2459            ic.update_traffic_matching_list(&sid, &uuid, &body).await?;
2460            Ok(CommandResult::Ok)
2461        }
2462
2463        Command::DeleteTrafficMatchingList { id } => {
2464            let (ic, sid) =
2465                require_integration(&integration_guard, site_id, "DeleteTrafficMatchingList")?;
2466            let uuid = require_uuid(&id)?;
2467            ic.delete_traffic_matching_list(&sid, &uuid).await?;
2468            Ok(CommandResult::Ok)
2469        }
2470
2471        // ── Voucher management ───────────────────────────────────
2472        Command::CreateVouchers(req) => {
2473            let (ic, sid) = require_integration(&integration_guard, site_id, "CreateVouchers")?;
2474            #[allow(clippy::as_conversions, clippy::cast_possible_wrap)]
2475            let body = crate::integration_types::VoucherCreateRequest {
2476                name: req.name.unwrap_or_else(|| "Voucher".into()),
2477                count: Some(req.count as i32),
2478                time_limit_minutes: i64::from(req.time_limit_minutes.unwrap_or(60)),
2479                authorized_guest_limit: req.authorized_guest_limit.map(i64::from),
2480                data_usage_limit_m_bytes: req.data_usage_limit_mb.map(|m| m as i64),
2481                rx_rate_limit_kbps: req.rx_rate_limit_kbps.map(|r| r as i64),
2482                tx_rate_limit_kbps: req.tx_rate_limit_kbps.map(|r| r as i64),
2483            };
2484            let vouchers = ic.create_vouchers(&sid, &body).await?;
2485            let domain_vouchers: Vec<Voucher> = vouchers.into_iter().map(Voucher::from).collect();
2486            Ok(CommandResult::Vouchers(domain_vouchers))
2487        }
2488
2489        Command::DeleteVoucher { id } => {
2490            let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteVoucher")?;
2491            let uuid = require_uuid(&id)?;
2492            ic.delete_voucher(&sid, &uuid).await?;
2493            Ok(CommandResult::Ok)
2494        }
2495
2496        Command::PurgeVouchers { filter } => {
2497            let (ic, sid) = require_integration(&integration_guard, site_id, "PurgeVouchers")?;
2498            ic.purge_vouchers(&sid, &filter).await?;
2499            Ok(CommandResult::Ok)
2500        }
2501
2502        // ── System administration ────────────────────────────────
2503        Command::CreateSite { name, description } => {
2504            let legacy = require_legacy(&legacy_guard)?;
2505            legacy.create_site(&name, &description).await?;
2506            Ok(CommandResult::Ok)
2507        }
2508        Command::DeleteSite { name } => {
2509            let legacy = require_legacy(&legacy_guard)?;
2510            legacy.delete_site(&name).await?;
2511            Ok(CommandResult::Ok)
2512        }
2513        Command::InviteAdmin { name, email, role } => {
2514            let legacy = require_legacy(&legacy_guard)?;
2515            legacy.invite_admin(&name, &email, &role).await?;
2516            Ok(CommandResult::Ok)
2517        }
2518        Command::RevokeAdmin { id } => {
2519            let legacy = require_legacy(&legacy_guard)?;
2520            legacy.revoke_admin(&id.to_string()).await?;
2521            Ok(CommandResult::Ok)
2522        }
2523        Command::UpdateAdmin { id, role } => {
2524            let legacy = require_legacy(&legacy_guard)?;
2525            legacy
2526                .update_admin(&id.to_string(), role.as_deref())
2527                .await?;
2528            Ok(CommandResult::Ok)
2529        }
2530
2531        Command::RebootController => {
2532            let legacy = require_legacy(&legacy_guard)?;
2533            legacy.reboot_controller().await?;
2534            Ok(CommandResult::Ok)
2535        }
2536        Command::PoweroffController => {
2537            let legacy = require_legacy(&legacy_guard)?;
2538            legacy.poweroff_controller().await?;
2539            Ok(CommandResult::Ok)
2540        }
2541    }
2542}
2543
2544// ── Helpers ──────────────────────────────────────────────────────
2545
2546fn parse_ipv6_text(raw: &str) -> Option<Ipv6Addr> {
2547    let candidate = raw.trim().split('/').next().unwrap_or(raw).trim();
2548    candidate.parse::<Ipv6Addr>().ok()
2549}
2550
2551fn pick_ipv6_from_value(value: &serde_json::Value) -> Option<String> {
2552    let mut first_link_local: Option<String> = None;
2553
2554    let iter: Box<dyn Iterator<Item = &serde_json::Value> + '_> = match value {
2555        serde_json::Value::Array(items) => Box::new(items.iter()),
2556        _ => Box::new(std::iter::once(value)),
2557    };
2558
2559    for item in iter {
2560        if let Some(ipv6) = item.as_str().and_then(parse_ipv6_text) {
2561            let ip_text = ipv6.to_string();
2562            if !ipv6.is_unicast_link_local() {
2563                return Some(ip_text);
2564            }
2565            if first_link_local.is_none() {
2566                first_link_local = Some(ip_text);
2567            }
2568        }
2569    }
2570
2571    first_link_local
2572}
2573
2574fn parse_legacy_device_wan_ipv6(
2575    extra: &serde_json::Map<String, serde_json::Value>,
2576) -> Option<String> {
2577    // Primary source on gateways: wan1.ipv6 = ["global", "link-local"].
2578    if let Some(v) = extra
2579        .get("wan1")
2580        .and_then(|wan| wan.get("ipv6"))
2581        .and_then(pick_ipv6_from_value)
2582    {
2583        return Some(v);
2584    }
2585
2586    // Fallback source on some firmware: top-level ipv6 array.
2587    extra.get("ipv6").and_then(pick_ipv6_from_value)
2588}
2589
2590/// Convert raw health JSON values into domain `HealthSummary` types.
2591fn convert_health_summaries(raw: Vec<serde_json::Value>) -> Vec<HealthSummary> {
2592    raw.into_iter()
2593        .map(|v| HealthSummary {
2594            subsystem: v
2595                .get("subsystem")
2596                .and_then(|v| v.as_str())
2597                .unwrap_or("unknown")
2598                .to_owned(),
2599            status: v
2600                .get("status")
2601                .and_then(|v| v.as_str())
2602                .unwrap_or("unknown")
2603                .to_owned(),
2604            #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
2605            num_adopted: v
2606                .get("num_adopted")
2607                .and_then(serde_json::Value::as_u64)
2608                .map(|n| n as u32),
2609            #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
2610            num_sta: v
2611                .get("num_sta")
2612                .and_then(serde_json::Value::as_u64)
2613                .map(|n| n as u32),
2614            tx_bytes_r: v.get("tx_bytes-r").and_then(serde_json::Value::as_u64),
2615            rx_bytes_r: v.get("rx_bytes-r").and_then(serde_json::Value::as_u64),
2616            latency: v.get("latency").and_then(serde_json::Value::as_f64),
2617            wan_ip: v.get("wan_ip").and_then(|v| v.as_str()).map(String::from),
2618            gateways: v.get("gateways").and_then(|v| v.as_array()).map(|a| {
2619                a.iter()
2620                    .filter_map(|g| g.as_str().map(String::from))
2621                    .collect()
2622            }),
2623            extra: v,
2624        })
2625        .collect()
2626}
2627
2628/// Build a [`TransportConfig`] from the controller configuration.
2629fn build_transport(config: &ControllerConfig) -> TransportConfig {
2630    TransportConfig {
2631        tls: tls_to_transport(&config.tls),
2632        timeout: config.timeout,
2633        cookie_jar: None, // LegacyClient::new adds one automatically
2634    }
2635}
2636
2637fn tls_to_transport(tls: &TlsVerification) -> TlsMode {
2638    match tls {
2639        TlsVerification::SystemDefaults => TlsMode::System,
2640        TlsVerification::CustomCa(path) => TlsMode::CustomCa(path.clone()),
2641        TlsVerification::DangerAcceptInvalid => TlsMode::DangerAcceptInvalid,
2642    }
2643}
2644
2645/// Downgrade a paginated result to an empty `Vec` when the endpoint returns 404.
2646///
2647/// Some Integration API endpoints (ACL rules, DNS policies, vouchers) are not
2648/// available on all controller firmware versions. Rather than failing the entire
2649/// refresh, we log a debug message and return an empty collection.
2650fn unwrap_or_empty<S, D>(endpoint: &str, result: Result<Vec<S>, crate::error::Error>) -> Vec<D>
2651where
2652    D: From<S>,
2653{
2654    match result {
2655        Ok(items) => items.into_iter().map(D::from).collect(),
2656        Err(ref e) if e.is_not_found() => {
2657            debug!("{endpoint}: not available (404), treating as empty");
2658            Vec::new()
2659        }
2660        Err(e) => {
2661            warn!("{endpoint}: unexpected error {e}, treating as empty");
2662            Vec::new()
2663        }
2664    }
2665}
2666
2667/// Resolve the Integration API site UUID from a site name or UUID string.
2668///
2669/// If `site_name` is already a valid UUID, returns it directly.
2670/// Otherwise lists all sites and finds the one matching by `internal_reference`.
2671async fn resolve_site_id(
2672    client: &IntegrationClient,
2673    site_name: &str,
2674) -> Result<uuid::Uuid, CoreError> {
2675    // Fast path: if the input is already a UUID, use it directly.
2676    if let Ok(uuid) = uuid::Uuid::parse_str(site_name) {
2677        return Ok(uuid);
2678    }
2679
2680    let sites = client
2681        .paginate_all(50, |off, lim| client.list_sites(off, lim))
2682        .await?;
2683
2684    sites
2685        .into_iter()
2686        .find(|s| s.internal_reference == site_name)
2687        .map(|s| s.id)
2688        .ok_or_else(|| CoreError::SiteNotFound {
2689            name: site_name.to_owned(),
2690        })
2691}
2692
2693fn parse_ipv4_cidr(cidr: &str) -> Result<(Ipv4Addr, u8), CoreError> {
2694    let (host, prefix) = cidr
2695        .split_once('/')
2696        .ok_or_else(|| CoreError::ValidationFailed {
2697            message: format!("invalid ipv4 host/prefix value '{cidr}'"),
2698        })?;
2699    let host_ip = host
2700        .parse::<Ipv4Addr>()
2701        .map_err(|_| CoreError::ValidationFailed {
2702            message: format!("invalid IPv4 host address '{host}'"),
2703        })?;
2704    let prefix_len = prefix
2705        .parse::<u8>()
2706        .map_err(|_| CoreError::ValidationFailed {
2707            message: format!("invalid IPv4 prefix length '{prefix}'"),
2708        })?;
2709    if prefix_len > 32 {
2710        return Err(CoreError::ValidationFailed {
2711            message: format!("IPv4 prefix length must be <= 32, got {prefix_len}"),
2712        });
2713    }
2714    Ok((host_ip, prefix_len))
2715}
2716
2717/// Extract a `Uuid` from an `EntityId`, or return an error.
2718fn require_uuid(id: &EntityId) -> Result<uuid::Uuid, CoreError> {
2719    id.as_uuid().copied().ok_or_else(|| CoreError::Unsupported {
2720        operation: "Integration API operation on legacy ID".into(),
2721        required: "UUID-based entity ID".into(),
2722    })
2723}
2724
2725fn require_legacy<'a>(
2726    guard: &'a tokio::sync::MutexGuard<'_, Option<LegacyClient>>,
2727) -> Result<&'a LegacyClient, CoreError> {
2728    guard.as_ref().ok_or_else(|| CoreError::Unsupported {
2729        operation: "Legacy API operation".into(),
2730        required: "Legacy API credentials".into(),
2731    })
2732}
2733
2734fn require_integration<'a>(
2735    guard: &'a tokio::sync::MutexGuard<'_, Option<IntegrationClient>>,
2736    site_id: Option<uuid::Uuid>,
2737    operation: &str,
2738) -> Result<(&'a IntegrationClient, uuid::Uuid), CoreError> {
2739    let client = guard.as_ref().ok_or_else(|| unsupported(operation))?;
2740    let sid = site_id.ok_or_else(|| unsupported(operation))?;
2741    Ok((client, sid))
2742}
2743
2744fn unsupported(operation: &str) -> CoreError {
2745    CoreError::Unsupported {
2746        operation: operation.into(),
2747        required: "Integration API".into(),
2748    }
2749}
2750
2751/// Resolve an [`EntityId`] to a device MAC via the DataStore.
2752fn device_mac(store: &DataStore, id: &EntityId) -> Result<MacAddress, CoreError> {
2753    store
2754        .device_by_id(id)
2755        .map(|d| d.mac.clone())
2756        .ok_or_else(|| CoreError::DeviceNotFound {
2757            identifier: id.to_string(),
2758        })
2759}
2760
2761/// Resolve an [`EntityId`] to a client MAC via the DataStore.
2762fn client_mac(store: &DataStore, id: &EntityId) -> Result<MacAddress, CoreError> {
2763    store
2764        .client_by_id(id)
2765        .map(|c| c.mac.clone())
2766        .ok_or_else(|| CoreError::ClientNotFound {
2767            identifier: id.to_string(),
2768        })
2769}
2770
2771/// Build a firewall policy source/destination JSON object from a zone ID
2772/// and an optional traffic filter spec.
2773fn build_endpoint_json(
2774    zone_id: &str,
2775    filter: Option<&crate::command::requests::TrafficFilterSpec>,
2776) -> serde_json::Value {
2777    use crate::command::requests::TrafficFilterSpec;
2778
2779    let mut obj = serde_json::json!({ "zoneId": zone_id });
2780
2781    if let Some(spec) = filter {
2782        let traffic_filter = match spec {
2783            TrafficFilterSpec::Network {
2784                network_ids,
2785                match_opposite,
2786            } => {
2787                serde_json::json!({
2788                    "type": "NETWORK",
2789                    "networkFilter": {
2790                        "networkIds": network_ids,
2791                        "matchOpposite": match_opposite,
2792                    }
2793                })
2794            }
2795            TrafficFilterSpec::IpAddress {
2796                addresses,
2797                match_opposite,
2798            } => {
2799                let items: Vec<serde_json::Value> = addresses
2800                    .iter()
2801                    .map(|addr| {
2802                        if addr.contains('/') {
2803                            serde_json::json!({ "type": "SUBNET", "value": addr })
2804                        } else if addr.contains('-') {
2805                            let parts: Vec<&str> = addr.splitn(2, '-').collect();
2806                            serde_json::json!({ "type": "RANGE", "start": parts[0], "stop": parts.get(1).unwrap_or(&"") })
2807                        } else {
2808                            serde_json::json!({ "type": "IP_ADDRESS", "value": addr })
2809                        }
2810                    })
2811                    .collect();
2812                serde_json::json!({
2813                    "type": "IP_ADDRESSES",
2814                    "ipAddressFilter": {
2815                        "type": "IP_ADDRESSES",
2816                        "items": items,
2817                        "matchOpposite": match_opposite,
2818                    }
2819                })
2820            }
2821            TrafficFilterSpec::Port {
2822                ports,
2823                match_opposite,
2824            } => {
2825                let items: Vec<serde_json::Value> = ports
2826                    .iter()
2827                    .map(|p| {
2828                        if p.contains('-') {
2829                            let parts: Vec<&str> = p.splitn(2, '-').collect();
2830                            serde_json::json!({ "type": "PORT_RANGE", "startPort": parts[0], "endPort": parts.get(1).unwrap_or(&"") })
2831                        } else {
2832                            serde_json::json!({ "type": "PORT_NUMBER", "value": p })
2833                        }
2834                    })
2835                    .collect();
2836                serde_json::json!({
2837                    "type": "PORT",
2838                    "portFilter": {
2839                        "type": "PORTS",
2840                        "items": items,
2841                        "matchOpposite": match_opposite,
2842                    }
2843                })
2844            }
2845        };
2846        obj.as_object_mut()
2847            .expect("json! produces object")
2848            .insert("trafficFilter".into(), traffic_filter);
2849    }
2850
2851    obj
2852}
2853
2854fn wifi_security_mode_name(mode: crate::model::WifiSecurityMode) -> &'static str {
2855    match mode {
2856        crate::model::WifiSecurityMode::Open => "OPEN",
2857        crate::model::WifiSecurityMode::Wpa2Personal => "WPA2_PERSONAL",
2858        crate::model::WifiSecurityMode::Wpa3Personal => "WPA3_PERSONAL",
2859        crate::model::WifiSecurityMode::Wpa2Wpa3Personal => "WPA2_WPA3_PERSONAL",
2860        crate::model::WifiSecurityMode::Wpa2Enterprise => "WPA2_ENTERPRISE",
2861        crate::model::WifiSecurityMode::Wpa3Enterprise => "WPA3_ENTERPRISE",
2862        crate::model::WifiSecurityMode::Wpa2Wpa3Enterprise => "WPA2_WPA3_ENTERPRISE",
2863    }
2864}
2865
2866fn wifi_payload_name(name: &str, ssid: &str) -> String {
2867    if name.is_empty() {
2868        ssid.to_owned()
2869    } else {
2870        name.to_owned()
2871    }
2872}
2873
2874fn wifi_frequency_values(frequencies: &[f32]) -> Vec<serde_json::Value> {
2875    frequencies
2876        .iter()
2877        .map(|frequency| serde_json::Value::from(f64::from(*frequency)))
2878        .collect()
2879}
2880
2881fn ensure_wifi_payload_defaults(
2882    body: &mut serde_json::Map<String, serde_json::Value>,
2883    broadcast_type: &str,
2884) {
2885    body.entry("clientIsolationEnabled")
2886        .or_insert(serde_json::Value::Bool(false));
2887    body.entry("multicastToUnicastConversionEnabled")
2888        .or_insert(serde_json::Value::Bool(false));
2889    body.entry("hideName")
2890        .or_insert(serde_json::Value::Bool(false));
2891    body.entry("uapsdEnabled")
2892        .or_insert(serde_json::Value::Bool(true));
2893
2894    if broadcast_type == "STANDARD" {
2895        body.entry("broadcastingFrequenciesGHz")
2896            .or_insert_with(|| serde_json::Value::Array(wifi_frequency_values(&[2.4, 5.0])));
2897        body.entry("mloEnabled")
2898            .or_insert(serde_json::Value::Bool(false));
2899        body.entry("bandSteeringEnabled")
2900            .or_insert(serde_json::Value::Bool(false));
2901        body.entry("arpProxyEnabled")
2902            .or_insert(serde_json::Value::Bool(false));
2903        body.entry("bssTransitionEnabled")
2904            .or_insert(serde_json::Value::Bool(false));
2905        body.entry("advertiseDeviceName")
2906            .or_insert(serde_json::Value::Bool(false));
2907    }
2908}
2909
2910fn build_create_wifi_broadcast_payload(
2911    req: &crate::command::CreateWifiBroadcastRequest,
2912) -> crate::integration_types::WifiBroadcastCreateUpdate {
2913    let broadcast_type = req
2914        .broadcast_type
2915        .clone()
2916        .unwrap_or_else(|| "STANDARD".into());
2917
2918    let mut body = serde_json::Map::new();
2919    let mut security_configuration = serde_json::Map::new();
2920    security_configuration.insert(
2921        "mode".into(),
2922        serde_json::Value::String(wifi_security_mode_name(req.security_mode).into()),
2923    );
2924    if let Some(passphrase) = req.passphrase.clone() {
2925        security_configuration.insert("passphrase".into(), serde_json::Value::String(passphrase));
2926    }
2927    body.insert(
2928        "securityConfiguration".into(),
2929        serde_json::Value::Object(security_configuration),
2930    );
2931
2932    if let Some(network_id) = &req.network_id {
2933        body.insert(
2934            "network".into(),
2935            serde_json::json!({ "id": network_id.to_string() }),
2936        );
2937    }
2938    body.insert("hideName".into(), serde_json::Value::Bool(req.hide_ssid));
2939    if req.band_steering {
2940        body.insert("bandSteeringEnabled".into(), serde_json::Value::Bool(true));
2941    }
2942    if req.fast_roaming {
2943        body.insert("bssTransitionEnabled".into(), serde_json::Value::Bool(true));
2944    }
2945    if let Some(frequencies) = req.frequencies_ghz.as_ref() {
2946        body.insert(
2947            "broadcastingFrequenciesGHz".into(),
2948            serde_json::Value::Array(wifi_frequency_values(frequencies)),
2949        );
2950    }
2951    ensure_wifi_payload_defaults(&mut body, &broadcast_type);
2952
2953    crate::integration_types::WifiBroadcastCreateUpdate {
2954        name: wifi_payload_name(&req.name, &req.ssid),
2955        broadcast_type,
2956        enabled: req.enabled,
2957        body,
2958    }
2959}
2960
2961fn build_update_wifi_broadcast_payload(
2962    existing: &crate::integration_types::WifiBroadcastDetailsResponse,
2963    update: &crate::command::UpdateWifiBroadcastRequest,
2964) -> crate::integration_types::WifiBroadcastCreateUpdate {
2965    let mut body: serde_json::Map<String, serde_json::Value> =
2966        existing.extra.clone().into_iter().collect();
2967
2968    body.remove("ssid");
2969    body.remove("hideSsid");
2970    body.remove("bandSteering");
2971    body.remove("fastRoaming");
2972    body.remove("frequencies");
2973
2974    if let Some(network) = existing.network.clone() {
2975        body.insert("network".into(), network);
2976    }
2977    if let Some(filter) = existing.broadcasting_device_filter.clone() {
2978        body.insert("broadcastingDeviceFilter".into(), filter);
2979    }
2980    if let Some(hidden) = update.hide_ssid {
2981        body.insert("hideName".into(), serde_json::Value::Bool(hidden));
2982    }
2983
2984    let mut security_cfg = existing
2985        .security_configuration
2986        .as_object()
2987        .cloned()
2988        .unwrap_or_default();
2989    if let Some(mode) = update.security_mode {
2990        security_cfg.insert(
2991            "mode".into(),
2992            serde_json::Value::String(wifi_security_mode_name(mode).into()),
2993        );
2994    }
2995    if let Some(passphrase) = update.passphrase.clone() {
2996        security_cfg.insert("passphrase".into(), serde_json::Value::String(passphrase));
2997    }
2998    body.insert(
2999        "securityConfiguration".into(),
3000        serde_json::Value::Object(security_cfg),
3001    );
3002    ensure_wifi_payload_defaults(&mut body, &existing.broadcast_type);
3003
3004    crate::integration_types::WifiBroadcastCreateUpdate {
3005        name: update
3006            .name
3007            .clone()
3008            .or_else(|| update.ssid.clone())
3009            .unwrap_or_else(|| existing.name.clone()),
3010        broadcast_type: existing.broadcast_type.clone(),
3011        enabled: update.enabled.unwrap_or(existing.enabled),
3012        body,
3013    }
3014}
3015
3016fn dns_policy_type_name(policy_type: crate::model::DnsPolicyType) -> &'static str {
3017    match policy_type {
3018        crate::model::DnsPolicyType::ARecord => "A",
3019        crate::model::DnsPolicyType::AaaaRecord => "AAAA",
3020        crate::model::DnsPolicyType::CnameRecord => "CNAME",
3021        crate::model::DnsPolicyType::MxRecord => "MX",
3022        crate::model::DnsPolicyType::TxtRecord => "TXT",
3023        crate::model::DnsPolicyType::SrvRecord => "SRV",
3024        crate::model::DnsPolicyType::ForwardDomain => "FORWARD_DOMAIN",
3025    }
3026}
3027
3028fn dns_policy_type_from_name(policy_type: &str) -> crate::model::DnsPolicyType {
3029    match policy_type {
3030        "A" => crate::model::DnsPolicyType::ARecord,
3031        "AAAA" => crate::model::DnsPolicyType::AaaaRecord,
3032        "CNAME" => crate::model::DnsPolicyType::CnameRecord,
3033        "MX" => crate::model::DnsPolicyType::MxRecord,
3034        "TXT" => crate::model::DnsPolicyType::TxtRecord,
3035        "SRV" => crate::model::DnsPolicyType::SrvRecord,
3036        _ => crate::model::DnsPolicyType::ForwardDomain,
3037    }
3038}
3039
3040fn validation_failed(message: impl Into<String>) -> CoreError {
3041    CoreError::ValidationFailed {
3042        message: message.into(),
3043    }
3044}
3045
3046fn dns_domain_value(
3047    domain: Option<&str>,
3048    domains: Option<&[String]>,
3049    fallback: Option<&str>,
3050) -> Option<String> {
3051    domain
3052        .map(str::to_owned)
3053        .or_else(|| domains.and_then(|values| values.first().cloned()))
3054        .or_else(|| {
3055            fallback
3056                .filter(|value| !value.is_empty())
3057                .map(str::to_owned)
3058        })
3059}
3060
3061fn insert_string_field(
3062    fields: &mut serde_json::Map<String, serde_json::Value>,
3063    key: &str,
3064    value: Option<String>,
3065) {
3066    if let Some(value) = value {
3067        fields.insert(key.into(), serde_json::Value::String(value));
3068    }
3069}
3070
3071fn insert_u16_field(
3072    fields: &mut serde_json::Map<String, serde_json::Value>,
3073    key: &str,
3074    value: Option<u16>,
3075) {
3076    if let Some(value) = value {
3077        fields.insert(
3078            key.into(),
3079            serde_json::Value::Number(serde_json::Number::from(value)),
3080        );
3081    }
3082}
3083
3084fn insert_u32_field(
3085    fields: &mut serde_json::Map<String, serde_json::Value>,
3086    key: &str,
3087    value: Option<u32>,
3088) {
3089    if let Some(value) = value {
3090        fields.insert(
3091            key.into(),
3092            serde_json::Value::Number(serde_json::Number::from(value)),
3093        );
3094    }
3095}
3096
3097fn ensure_dns_required_string(
3098    fields: &serde_json::Map<String, serde_json::Value>,
3099    key: &str,
3100    policy_type: crate::model::DnsPolicyType,
3101) -> Result<(), CoreError> {
3102    if fields
3103        .get(key)
3104        .and_then(serde_json::Value::as_str)
3105        .is_some()
3106    {
3107        Ok(())
3108    } else {
3109        Err(validation_failed(format!(
3110            "{policy_type:?} DNS policy requires `{key}`"
3111        )))
3112    }
3113}
3114
3115fn ensure_dns_required_number(
3116    fields: &serde_json::Map<String, serde_json::Value>,
3117    key: &str,
3118    policy_type: crate::model::DnsPolicyType,
3119) -> Result<(), CoreError> {
3120    if fields
3121        .get(key)
3122        .and_then(serde_json::Value::as_u64)
3123        .is_some()
3124    {
3125        Ok(())
3126    } else {
3127        Err(validation_failed(format!(
3128            "{policy_type:?} DNS policy requires `{key}`"
3129        )))
3130    }
3131}
3132
3133fn validate_dns_policy_fields(
3134    policy_type: crate::model::DnsPolicyType,
3135    fields: &serde_json::Map<String, serde_json::Value>,
3136) -> Result<(), CoreError> {
3137    ensure_dns_required_string(fields, "domain", policy_type)?;
3138
3139    match policy_type {
3140        crate::model::DnsPolicyType::ARecord => {
3141            ensure_dns_required_string(fields, "ipv4Address", policy_type)?;
3142            ensure_dns_required_number(fields, "ttlSeconds", policy_type)?;
3143        }
3144        crate::model::DnsPolicyType::AaaaRecord => {
3145            ensure_dns_required_string(fields, "ipv6Address", policy_type)?;
3146            ensure_dns_required_number(fields, "ttlSeconds", policy_type)?;
3147        }
3148        crate::model::DnsPolicyType::CnameRecord => {
3149            ensure_dns_required_string(fields, "targetDomain", policy_type)?;
3150            ensure_dns_required_number(fields, "ttlSeconds", policy_type)?;
3151        }
3152        crate::model::DnsPolicyType::MxRecord => {
3153            ensure_dns_required_string(fields, "mailServerDomain", policy_type)?;
3154            ensure_dns_required_number(fields, "priority", policy_type)?;
3155        }
3156        crate::model::DnsPolicyType::TxtRecord => {
3157            ensure_dns_required_string(fields, "text", policy_type)?;
3158        }
3159        crate::model::DnsPolicyType::SrvRecord => {
3160            for key in ["serverDomain", "service", "protocol"] {
3161                ensure_dns_required_string(fields, key, policy_type)?;
3162            }
3163            for key in ["port", "priority", "weight"] {
3164                ensure_dns_required_number(fields, key, policy_type)?;
3165            }
3166        }
3167        crate::model::DnsPolicyType::ForwardDomain => {
3168            ensure_dns_required_string(fields, "ipAddress", policy_type)?;
3169        }
3170    }
3171
3172    Ok(())
3173}
3174
3175fn build_create_dns_policy_fields(
3176    req: &crate::command::CreateDnsPolicyRequest,
3177) -> Result<serde_json::Map<String, serde_json::Value>, CoreError> {
3178    let mut fields = serde_json::Map::new();
3179    let domain = dns_domain_value(
3180        req.domain.as_deref(),
3181        req.domains.as_deref(),
3182        Some(req.name.as_str()),
3183    )
3184    .ok_or_else(|| validation_failed("DNS policy requires `domain`"))?;
3185    fields.insert("domain".into(), serde_json::Value::String(domain));
3186
3187    match req.policy_type {
3188        crate::model::DnsPolicyType::ARecord => {
3189            insert_string_field(
3190                &mut fields,
3191                "ipv4Address",
3192                req.ipv4_address.clone().or_else(|| req.value.clone()),
3193            );
3194            insert_u32_field(&mut fields, "ttlSeconds", req.ttl_seconds);
3195        }
3196        crate::model::DnsPolicyType::AaaaRecord => {
3197            insert_string_field(
3198                &mut fields,
3199                "ipv6Address",
3200                req.ipv6_address.clone().or_else(|| req.value.clone()),
3201            );
3202            insert_u32_field(&mut fields, "ttlSeconds", req.ttl_seconds);
3203        }
3204        crate::model::DnsPolicyType::CnameRecord => {
3205            insert_string_field(
3206                &mut fields,
3207                "targetDomain",
3208                req.target_domain.clone().or_else(|| req.value.clone()),
3209            );
3210            insert_u32_field(&mut fields, "ttlSeconds", req.ttl_seconds);
3211        }
3212        crate::model::DnsPolicyType::MxRecord => {
3213            insert_string_field(
3214                &mut fields,
3215                "mailServerDomain",
3216                req.mail_server_domain.clone().or_else(|| req.value.clone()),
3217            );
3218            insert_u16_field(&mut fields, "priority", req.priority);
3219        }
3220        crate::model::DnsPolicyType::TxtRecord => {
3221            insert_string_field(
3222                &mut fields,
3223                "text",
3224                req.text.clone().or_else(|| req.value.clone()),
3225            );
3226        }
3227        crate::model::DnsPolicyType::SrvRecord => {
3228            insert_string_field(
3229                &mut fields,
3230                "serverDomain",
3231                req.server_domain.clone().or_else(|| req.value.clone()),
3232            );
3233            insert_string_field(&mut fields, "service", req.service.clone());
3234            insert_string_field(&mut fields, "protocol", req.protocol.clone());
3235            insert_u16_field(&mut fields, "port", req.port);
3236            insert_u16_field(&mut fields, "priority", req.priority);
3237            insert_u16_field(&mut fields, "weight", req.weight);
3238        }
3239        crate::model::DnsPolicyType::ForwardDomain => {
3240            insert_string_field(
3241                &mut fields,
3242                "ipAddress",
3243                req.ip_address
3244                    .clone()
3245                    .or_else(|| req.upstream.clone())
3246                    .or_else(|| req.value.clone()),
3247            );
3248        }
3249    }
3250
3251    validate_dns_policy_fields(req.policy_type, &fields)?;
3252    Ok(fields)
3253}
3254
3255fn build_update_dns_policy_fields(
3256    existing: &crate::integration_types::DnsPolicyResponse,
3257    update: &crate::command::UpdateDnsPolicyRequest,
3258) -> Result<serde_json::Map<String, serde_json::Value>, CoreError> {
3259    let policy_type = dns_policy_type_from_name(&existing.policy_type);
3260    let mut fields: serde_json::Map<String, serde_json::Value> =
3261        existing.extra.clone().into_iter().collect();
3262
3263    if let Some(domain) = dns_domain_value(
3264        update.domain.as_deref(),
3265        update.domains.as_deref(),
3266        existing.domain.as_deref(),
3267    ) {
3268        fields.insert("domain".into(), serde_json::Value::String(domain));
3269    }
3270
3271    match policy_type {
3272        crate::model::DnsPolicyType::ARecord => {
3273            insert_string_field(
3274                &mut fields,
3275                "ipv4Address",
3276                update.ipv4_address.clone().or_else(|| update.value.clone()),
3277            );
3278            insert_u32_field(&mut fields, "ttlSeconds", update.ttl_seconds);
3279        }
3280        crate::model::DnsPolicyType::AaaaRecord => {
3281            insert_string_field(
3282                &mut fields,
3283                "ipv6Address",
3284                update.ipv6_address.clone().or_else(|| update.value.clone()),
3285            );
3286            insert_u32_field(&mut fields, "ttlSeconds", update.ttl_seconds);
3287        }
3288        crate::model::DnsPolicyType::CnameRecord => {
3289            insert_string_field(
3290                &mut fields,
3291                "targetDomain",
3292                update
3293                    .target_domain
3294                    .clone()
3295                    .or_else(|| update.value.clone()),
3296            );
3297            insert_u32_field(&mut fields, "ttlSeconds", update.ttl_seconds);
3298        }
3299        crate::model::DnsPolicyType::MxRecord => {
3300            insert_string_field(
3301                &mut fields,
3302                "mailServerDomain",
3303                update
3304                    .mail_server_domain
3305                    .clone()
3306                    .or_else(|| update.value.clone()),
3307            );
3308            insert_u16_field(&mut fields, "priority", update.priority);
3309        }
3310        crate::model::DnsPolicyType::TxtRecord => {
3311            insert_string_field(
3312                &mut fields,
3313                "text",
3314                update.text.clone().or_else(|| update.value.clone()),
3315            );
3316        }
3317        crate::model::DnsPolicyType::SrvRecord => {
3318            insert_string_field(
3319                &mut fields,
3320                "serverDomain",
3321                update
3322                    .server_domain
3323                    .clone()
3324                    .or_else(|| update.value.clone()),
3325            );
3326            insert_string_field(&mut fields, "service", update.service.clone());
3327            insert_string_field(&mut fields, "protocol", update.protocol.clone());
3328            insert_u16_field(&mut fields, "port", update.port);
3329            insert_u16_field(&mut fields, "priority", update.priority);
3330            insert_u16_field(&mut fields, "weight", update.weight);
3331        }
3332        crate::model::DnsPolicyType::ForwardDomain => {
3333            insert_string_field(
3334                &mut fields,
3335                "ipAddress",
3336                update
3337                    .ip_address
3338                    .clone()
3339                    .or_else(|| update.upstream.clone())
3340                    .or_else(|| update.value.clone()),
3341            );
3342        }
3343    }
3344
3345    validate_dns_policy_fields(policy_type, &fields)?;
3346    Ok(fields)
3347}
3348
3349fn traffic_matching_list_items(
3350    entries: &[String],
3351    raw_items: Option<&[serde_json::Value]>,
3352) -> Vec<serde_json::Value> {
3353    raw_items.map_or_else(
3354        || {
3355            entries
3356                .iter()
3357                .cloned()
3358                .map(serde_json::Value::String)
3359                .collect()
3360        },
3361        <[serde_json::Value]>::to_vec,
3362    )
3363}
3364
3365#[cfg(test)]
3366mod tests {
3367    use super::{
3368        build_create_dns_policy_fields, build_create_wifi_broadcast_payload, parse_ipv4_cidr,
3369        traffic_matching_list_items,
3370    };
3371    use crate::command::{CreateDnsPolicyRequest, CreateWifiBroadcastRequest};
3372    use crate::model::{DnsPolicyType, WifiSecurityMode};
3373    use serde_json::json;
3374
3375    #[test]
3376    fn parse_ipv4_cidr_accepts_valid_input() {
3377        let (host, prefix) = parse_ipv4_cidr("192.168.10.1/24").expect("valid CIDR");
3378        assert_eq!(host.to_string(), "192.168.10.1");
3379        assert_eq!(prefix, 24);
3380    }
3381
3382    #[test]
3383    fn parse_ipv4_cidr_rejects_invalid_prefix() {
3384        assert!(parse_ipv4_cidr("192.168.10.1/40").is_err());
3385    }
3386
3387    #[test]
3388    fn parse_ipv4_cidr_rejects_missing_prefix() {
3389        assert!(parse_ipv4_cidr("192.168.10.1").is_err());
3390    }
3391
3392    #[test]
3393    fn wifi_create_payload_uses_integration_field_names() {
3394        let payload = build_create_wifi_broadcast_payload(&CreateWifiBroadcastRequest {
3395            name: "Main".into(),
3396            ssid: "Main".into(),
3397            security_mode: WifiSecurityMode::Wpa2Personal,
3398            passphrase: Some("supersecret".into()),
3399            enabled: true,
3400            network_id: None,
3401            hide_ssid: true,
3402            broadcast_type: Some("STANDARD".into()),
3403            frequencies_ghz: Some(vec![2.4, 5.0]),
3404            band_steering: true,
3405            fast_roaming: true,
3406        });
3407
3408        assert_eq!(payload.name, "Main");
3409        assert!(payload.body.get("ssid").is_none());
3410        assert_eq!(payload.body.get("hideName"), Some(&json!(true)));
3411        let frequencies = payload
3412            .body
3413            .get("broadcastingFrequenciesGHz")
3414            .and_then(serde_json::Value::as_array)
3415            .expect("frequencies array");
3416        assert_eq!(frequencies.len(), 2);
3417        assert_eq!(frequencies[1], json!(5.0));
3418        assert_eq!(payload.body.get("bandSteeringEnabled"), Some(&json!(true)));
3419        assert_eq!(payload.body.get("bssTransitionEnabled"), Some(&json!(true)));
3420    }
3421
3422    #[test]
3423    fn dns_create_fields_use_type_specific_schema_keys() {
3424        let fields = build_create_dns_policy_fields(&CreateDnsPolicyRequest {
3425            name: "example.com".into(),
3426            policy_type: DnsPolicyType::ARecord,
3427            enabled: true,
3428            domain: Some("example.com".into()),
3429            domains: None,
3430            upstream: None,
3431            value: Some("192.168.1.10".into()),
3432            ttl_seconds: Some(600),
3433            priority: None,
3434            ipv4_address: None,
3435            ipv6_address: None,
3436            target_domain: None,
3437            mail_server_domain: None,
3438            text: None,
3439            ip_address: None,
3440            server_domain: None,
3441            service: None,
3442            protocol: None,
3443            port: None,
3444            weight: None,
3445        })
3446        .expect("valid DNS fields");
3447
3448        assert_eq!(fields.get("domain"), Some(&json!("example.com")));
3449        assert_eq!(fields.get("ipv4Address"), Some(&json!("192.168.1.10")));
3450        assert_eq!(fields.get("ttlSeconds"), Some(&json!(600)));
3451        assert!(fields.get("value").is_none());
3452        assert!(fields.get("ttl").is_none());
3453    }
3454
3455    #[test]
3456    fn traffic_matching_list_items_prefer_raw_payloads() {
3457        let raw_items = [json!({"type": "PORT_NUMBER", "value": 443})];
3458        let items = traffic_matching_list_items(&["80".into()], Some(&raw_items));
3459        assert_eq!(items, vec![json!({"type": "PORT_NUMBER", "value": 443})]);
3460    }
3461}