1use std::collections::HashMap;
8use std::net::{Ipv4Addr, Ipv6Addr};
9use std::sync::Arc;
10use std::time::Duration;
11
12use tokio::sync::{Mutex, broadcast, mpsc, watch};
13use tokio::task::JoinHandle;
14use tokio_util::sync::CancellationToken;
15use tracing::{debug, info, warn};
16
17use crate::command::{Command, CommandEnvelope, CommandResult};
18use crate::config::{AuthCredentials, ControllerConfig, TlsVerification};
19use crate::error::CoreError;
20use crate::model::{
21 AclRule, Admin, Alarm, Client, Country, Device, DnsPolicy, DpiApplication, DpiCategory,
22 EntityId, Event, FirewallAction, FirewallPolicy, FirewallZone, HealthSummary, MacAddress,
23 Network, NetworkManagement, NetworkPurpose, RadiusProfile, Site, SysInfo, SystemInfo,
24 TrafficMatchingList, Voucher, VpnServer, VpnTunnel, WanInterface, WifiBroadcast,
25};
26use crate::store::DataStore;
27use crate::stream::EntityStream;
28
29use unifly_api::transport::{TlsMode, TransportConfig};
30use unifly_api::websocket::{ReconnectConfig, WebSocketHandle};
31use unifly_api::{IntegrationClient, LegacyClient};
32
/// Capacity of the bounded `mpsc` channel that carries [`CommandEnvelope`]s.
const COMMAND_CHANNEL_SIZE: usize = 64;
/// Capacity of the `broadcast` channel that fans out [`Event`]s.
const EVENT_CHANNEL_SIZE: usize = 256;
35
/// Lifecycle state of a [`Controller`] connection, published on a `watch` channel.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionState {
    /// Initial state of a new controller; also published by `disconnect`.
    Disconnected,
    /// Published when `connect` starts.
    Connecting,
    /// Published when `connect` completes; `execute` only accepts commands in this state.
    Connected,
    /// NOTE(review): presumably a reconnect is in progress and `attempt` is the
    /// retry counter — confirm against the code that publishes this variant.
    Reconnecting { attempt: u32 },
    /// NOTE(review): presumably the connection attempt failed — confirm which
    /// code paths publish this variant.
    Failed,
}
47
/// Cloneable handle to a controller connection.
///
/// All state lives in a shared [`ControllerInner`] behind an `Arc`, so every
/// clone refers to the same configuration, store, channels and clients.
#[derive(Clone)]
pub struct Controller {
    inner: Arc<ControllerInner>,
}
59
/// Shared state behind every [`Controller`] clone.
struct ControllerInner {
    /// Configuration supplied to `Controller::new`.
    config: ControllerConfig,
    /// Entity store that refreshes write into and the snapshot/stream accessors read from.
    store: Arc<DataStore>,
    /// Sender side of the connection-state `watch` channel.
    connection_state: watch::Sender<ConnectionState>,
    /// Broadcast sender for events (legacy event fetches and the WebSocket bridge).
    event_tx: broadcast::Sender<Arc<Event>>,
    /// Sender for queued commands; replaced with a fresh channel on disconnect.
    command_tx: Mutex<mpsc::Sender<CommandEnvelope>>,
    /// Receiver half of the command channel; taken by the command processor
    /// task on connect and re-created on disconnect.
    command_rx: Mutex<Option<mpsc::Receiver<CommandEnvelope>>>,
    /// Root cancellation token; each connect derives a child token from it.
    cancel: CancellationToken,
    /// Token of the current connection; replaced on connect, cancelled on disconnect.
    cancel_child: Mutex<CancellationToken>,
    /// Legacy API client, when one was set up during connect.
    legacy_client: Mutex<Option<LegacyClient>>,
    /// Integration API client, when the auth mode provides an API key.
    integration_client: Mutex<Option<IntegrationClient>>,
    /// Integration API site UUID resolved during connect.
    site_id: Mutex<Option<uuid::Uuid>>,
    /// Handle of the WebSocket connection, when one was started.
    ws_handle: Mutex<Option<WebSocketHandle>>,
    /// Join handles of the background tasks spawned by connect; awaited on disconnect.
    task_handles: Mutex<Vec<JoinHandle<()>>>,
    /// Non-fatal warnings collected during connect; drained by `take_warnings`.
    warnings: Mutex<Vec<String>>,
}
81
82impl Controller {
83 pub fn new(config: ControllerConfig) -> Self {
86 let store = Arc::new(DataStore::new());
87 let (connection_state, _) = watch::channel(ConnectionState::Disconnected);
88 let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_SIZE);
89 let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
90 let cancel = CancellationToken::new();
91 let cancel_child = cancel.child_token();
92
93 Self {
94 inner: Arc::new(ControllerInner {
95 config,
96 store,
97 connection_state,
98 event_tx,
99 command_tx: Mutex::new(command_tx),
100 command_rx: Mutex::new(Some(command_rx)),
101 cancel,
102 cancel_child: Mutex::new(cancel_child),
103 legacy_client: Mutex::new(None),
104 integration_client: Mutex::new(None),
105 warnings: Mutex::new(Vec::new()),
106 site_id: Mutex::new(None),
107 ws_handle: Mutex::new(None),
108 task_handles: Mutex::new(Vec::new()),
109 }),
110 }
111 }
112
    /// Returns the configuration this controller was created with.
    pub fn config(&self) -> &ControllerConfig {
        &self.inner.config
    }
117
    /// Returns the shared entity store backing this controller.
    pub fn store(&self) -> &Arc<DataStore> {
        &self.inner.store
    }
122
123 #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
131 pub async fn connect(&self) -> Result<(), CoreError> {
132 let _ = self
133 .inner
134 .connection_state
135 .send(ConnectionState::Connecting);
136
137 let child = self.inner.cancel.child_token();
139 *self.inner.cancel_child.lock().await = child.clone();
140
141 let config = &self.inner.config;
142 let transport = build_transport(config);
143
144 match &config.auth {
145 AuthCredentials::ApiKey(api_key) => {
146 let platform = LegacyClient::detect_platform(&config.url).await?;
148 debug!(?platform, "detected controller platform");
149
150 let integration = IntegrationClient::from_api_key(
152 config.url.as_str(),
153 api_key,
154 &transport,
155 platform,
156 )?;
157
158 let site_id = resolve_site_id(&integration, &config.site).await?;
160 debug!(site_id = %site_id, "resolved Integration API site UUID");
161
162 *self.inner.integration_client.lock().await = Some(integration);
163 *self.inner.site_id.lock().await = Some(site_id);
164
165 match setup_legacy_client(config, &transport).await {
169 Ok(client) => {
170 *self.inner.legacy_client.lock().await = Some(client);
171 debug!("legacy client available as supplement");
172 }
173 Err(e) => {
174 debug!(error = %e, "legacy client unavailable (non-fatal with API key auth)");
175 }
176 }
177 }
178 AuthCredentials::Credentials { username, password } => {
179 let platform = LegacyClient::detect_platform(&config.url).await?;
181 debug!(?platform, "detected controller platform");
182
183 let client = LegacyClient::new(
184 config.url.clone(),
185 config.site.clone(),
186 platform,
187 &transport,
188 )?;
189 client.login(username, password).await?;
190 debug!("session authentication successful");
191
192 *self.inner.legacy_client.lock().await = Some(client);
193 }
194 AuthCredentials::Hybrid {
195 api_key,
196 username,
197 password,
198 } => {
199 let platform = LegacyClient::detect_platform(&config.url).await?;
201 debug!(?platform, "detected controller platform (hybrid)");
202
203 let integration = IntegrationClient::from_api_key(
205 config.url.as_str(),
206 api_key,
207 &transport,
208 platform,
209 )?;
210
211 let site_id = resolve_site_id(&integration, &config.site).await?;
212 debug!(site_id = %site_id, "resolved Integration API site UUID");
213
214 *self.inner.integration_client.lock().await = Some(integration);
215 *self.inner.site_id.lock().await = Some(site_id);
216
217 match LegacyClient::new(
221 config.url.clone(),
222 config.site.clone(),
223 platform,
224 &transport,
225 ) {
226 Ok(client) => match client.login(username, password).await {
227 Ok(()) => {
228 debug!("legacy session authentication successful (hybrid)");
229 *self.inner.legacy_client.lock().await = Some(client);
230 }
231 Err(e) => {
232 let msg = format!(
233 "Legacy login failed: {e} — events, health stats, and client traffic will be unavailable"
234 );
235 warn!("{msg}");
236 self.inner.warnings.lock().await.push(msg);
237 }
238 },
239 Err(e) => {
240 let msg = format!("Legacy client setup failed: {e}");
241 warn!("{msg}");
242 self.inner.warnings.lock().await.push(msg);
243 }
244 }
245 }
246 AuthCredentials::Cloud { api_key, host_id } => {
247 let integration = IntegrationClient::from_api_key(
248 config.url.as_str(),
249 api_key,
250 &transport,
251 unifly_api::ControllerPlatform::Cloud,
252 )?;
253
254 let site_id = if let Ok(uuid) = uuid::Uuid::parse_str(&config.site) {
255 uuid
256 } else if let Ok(uuid) = uuid::Uuid::parse_str(host_id) {
257 uuid
258 } else {
259 resolve_site_id(&integration, &config.site).await?
260 };
261 debug!(site_id = %site_id, "resolved cloud Integration API site UUID");
262
263 *self.inner.integration_client.lock().await = Some(integration);
264 *self.inner.site_id.lock().await = Some(site_id);
265
266 let msg =
267 "Cloud auth mode active: Legacy API and WebSocket features are unavailable"
268 .to_string();
269 self.inner.warnings.lock().await.push(msg);
270 }
271 }
272
273 self.full_refresh().await?;
275
276 let mut handles = self.inner.task_handles.lock().await;
278
279 if let Some(rx) = self.inner.command_rx.lock().await.take() {
280 let ctrl = self.clone();
281 handles.push(tokio::spawn(command_processor_task(ctrl, rx)));
282 }
283
284 let interval_secs = config.refresh_interval_secs;
285 if interval_secs > 0 {
286 let ctrl = self.clone();
287 let cancel = child.clone();
288 handles.push(tokio::spawn(refresh_task(ctrl, interval_secs, cancel)));
289 }
290
291 if config.websocket_enabled {
293 self.spawn_websocket(&child, &mut handles).await;
294 }
295
296 let _ = self.inner.connection_state.send(ConnectionState::Connected);
297 info!("connected to controller");
298 Ok(())
299 }
300
301 async fn spawn_websocket(&self, cancel: &CancellationToken, handles: &mut Vec<JoinHandle<()>>) {
306 let legacy_guard = self.inner.legacy_client.lock().await;
307 let Some(ref legacy) = *legacy_guard else {
308 debug!("no legacy client — WebSocket unavailable");
309 return;
310 };
311
312 let platform = legacy.platform();
313 let Some(ws_path_template) = platform.websocket_path() else {
314 debug!("platform does not support WebSocket");
315 return;
316 };
317
318 let ws_path = ws_path_template.replace("{site}", &self.inner.config.site);
319 let base_url = &self.inner.config.url;
320 let scheme = if base_url.scheme() == "https" {
321 "wss"
322 } else {
323 "ws"
324 };
325 let host = base_url.host_str().unwrap_or("localhost");
326 let ws_url_str = match base_url.port() {
327 Some(p) => format!("{scheme}://{host}:{p}{ws_path}"),
328 None => format!("{scheme}://{host}{ws_path}"),
329 };
330 let ws_url = match url::Url::parse(&ws_url_str) {
331 Ok(u) => u,
332 Err(e) => {
333 warn!(error = %e, url = %ws_url_str, "invalid WebSocket URL");
334 return;
335 }
336 };
337
338 let cookie = legacy.cookie_header();
339 drop(legacy_guard);
340
341 if cookie.is_none() {
342 warn!("no session cookie — WebSocket requires legacy auth (skipping)");
343 return;
344 }
345
346 let ws_tls = tls_to_transport(&self.inner.config.tls);
347 let ws_cancel = cancel.child_token();
348 let handle = match WebSocketHandle::connect(
349 ws_url,
350 ReconnectConfig::default(),
351 ws_cancel.clone(),
352 cookie,
353 ws_tls,
354 ) {
355 Ok(h) => h,
356 Err(e) => {
357 warn!(error = %e, "WebSocket connection failed (non-fatal)");
358 return;
359 }
360 };
361
362 let mut ws_rx = handle.subscribe();
366 let event_tx = self.inner.event_tx.clone();
367 let store = Arc::clone(&self.inner.store);
368 let bridge_cancel = ws_cancel;
369
370 handles.push(tokio::spawn(async move {
371 loop {
372 tokio::select! {
373 biased;
374 () = bridge_cancel.cancelled() => break,
375 result = ws_rx.recv() => {
376 match result {
377 Ok(ws_event) => {
378 if ws_event.key == "device:sync" || ws_event.key == "device:update" {
380 apply_device_sync(&store, &ws_event.extra);
381 }
382
383 let event = crate::model::event::Event::from(
384 (*ws_event).clone(),
385 );
386 let _ = event_tx.send(Arc::new(event));
387 }
388 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
389 warn!(skipped = n, "WS bridge: receiver lagged");
390 }
391 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
392 }
393 }
394 }
395 }
396 }));
397
398 *self.inner.ws_handle.lock().await = Some(handle);
399 info!("WebSocket event stream spawned (handshake in progress)");
400 }
401
402 pub async fn disconnect(&self) {
407 self.inner.cancel_child.lock().await.cancel();
409
410 let mut handles = self.inner.task_handles.lock().await;
412 for handle in handles.drain(..) {
413 let _ = handle.await;
414 }
415
416 if matches!(
418 self.inner.config.auth,
419 AuthCredentials::Credentials { .. } | AuthCredentials::Hybrid { .. }
420 ) {
421 if let Some(ref client) = *self.inner.legacy_client.lock().await {
422 if let Err(e) = client.logout().await {
423 warn!(error = %e, "logout failed (non-fatal)");
424 }
425 }
426 }
427
428 if let Some(handle) = self.inner.ws_handle.lock().await.take() {
430 handle.shutdown();
431 }
432
433 *self.inner.legacy_client.lock().await = None;
434 *self.inner.integration_client.lock().await = None;
435 *self.inner.site_id.lock().await = None;
436
437 {
440 let (tx, rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
441 *self.inner.command_tx.lock().await = tx;
442 *self.inner.command_rx.lock().await = Some(rx);
443 }
444
445 let _ = self
446 .inner
447 .connection_state
448 .send(ConnectionState::Disconnected);
449 debug!("disconnected");
450 }
451
    /// Fetches a fresh view of the site and publishes it to the store.
    ///
    /// When both an Integration client and a site id are present, the
    /// Integration API is the primary source and the legacy client (if any)
    /// only supplements it with events, health, client traffic fields and a
    /// few device fields. Otherwise the legacy client alone supplies devices,
    /// clients and events.
    ///
    /// # Errors
    ///
    /// - Integration path: errors from the device, client, network, wifi,
    ///   firewall policy, firewall zone, site and traffic-matching-list
    ///   listings are returned. Per-network detail, per-device statistics and
    ///   all legacy fetches are logged and skipped instead.
    /// - Legacy-only path: `CoreError::ControllerDisconnected` when no legacy
    ///   client exists, otherwise any error from the three listings.
    #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
    pub async fn full_refresh(&self) -> Result<(), CoreError> {
        let integration_guard = self.inner.integration_client.lock().await;
        let site_id = *self.inner.site_id.lock().await;

        if let (Some(integration), Some(sid)) = (integration_guard.as_ref(), site_id) {
            let page_limit = 200;

            // The listings are fetched in three batches; requests inside a
            // batch run concurrently, the batches run one after another.
            let (devices_res, clients_res, networks_res, wifi_res) = tokio::join!(
                integration.paginate_all(page_limit, |off, lim| {
                    integration.list_devices(&sid, off, lim)
                }),
                integration.paginate_all(page_limit, |off, lim| {
                    integration.list_clients(&sid, off, lim)
                }),
                integration.paginate_all(page_limit, |off, lim| {
                    integration.list_networks(&sid, off, lim)
                }),
                integration.paginate_all(page_limit, |off, lim| {
                    integration.list_wifi_broadcasts(&sid, off, lim)
                }),
            );

            let (policies_res, zones_res, acls_res, dns_res, vouchers_res) = tokio::join!(
                integration.paginate_all(page_limit, |off, lim| {
                    integration.list_firewall_policies(&sid, off, lim)
                }),
                integration.paginate_all(page_limit, |off, lim| {
                    integration.list_firewall_zones(&sid, off, lim)
                }),
                integration.paginate_all(page_limit, |off, lim| {
                    integration.list_acl_rules(&sid, off, lim)
                }),
                integration.paginate_all(page_limit, |off, lim| {
                    integration.list_dns_policies(&sid, off, lim)
                }),
                integration.paginate_all(page_limit, |off, lim| {
                    integration.list_vouchers(&sid, off, lim)
                }),
            );

            let (sites_res, tml_res) = tokio::join!(
                integration.paginate_all(50, |off, lim| { integration.list_sites(off, lim) }),
                integration.paginate_all(page_limit, |off, lim| {
                    integration.list_traffic_matching_lists(&sid, off, lim)
                }),
            );

            let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
            let mut clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
            // Only the ids are taken from the listing; each network is then
            // fetched individually and converted from that detail response.
            let network_ids: Vec<uuid::Uuid> = networks_res?.into_iter().map(|n| n.id).collect();
            info!(
                network_count = network_ids.len(),
                "fetching network details"
            );
            let networks: Vec<Network> = {
                let futs = network_ids.into_iter().map(|nid| async move {
                    match integration.get_network(&sid, &nid).await {
                        Ok(detail) => Some(Network::from(detail)),
                        Err(e) => {
                            // A failed detail fetch drops that network from the snapshot.
                            warn!(network_id = %nid, error = %e, "network detail fetch failed");
                            None
                        }
                    }
                });
                futures_util::future::join_all(futs)
                    .await
                    .into_iter()
                    .flatten()
                    .collect()
            };
            let wifi: Vec<WifiBroadcast> = wifi_res?.into_iter().map(WifiBroadcast::from).collect();
            let policies: Vec<FirewallPolicy> = policies_res?
                .into_iter()
                .map(FirewallPolicy::from)
                .collect();
            let zones: Vec<FirewallZone> = zones_res?.into_iter().map(FirewallZone::from).collect();
            let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
            let traffic_matching_lists: Vec<TrafficMatchingList> = tml_res?
                .into_iter()
                .map(TrafficMatchingList::from)
                .collect();

            // NOTE(review): `unwrap_or_empty` is defined outside this view;
            // presumably it logs the error under the given label and yields an
            // empty list, making these three listings optional — confirm.
            let acls: Vec<AclRule> = unwrap_or_empty("acl/rules", acls_res);
            let dns: Vec<DnsPolicy> = unwrap_or_empty("dns/policies", dns_res);
            let vouchers: Vec<Voucher> = unwrap_or_empty("vouchers", vouchers_res);

            info!(
                device_count = devices.len(),
                "enriching devices with statistics"
            );
            let mut devices = {
                let futs = devices.into_iter().map(|mut device| async {
                    // Statistics are only requested for devices with a UUID id.
                    if let EntityId::Uuid(device_uuid) = &device.id {
                        match integration.get_device_statistics(&sid, device_uuid).await {
                            Ok(stats_resp) => {
                                device.stats =
                                    crate::convert::device_stats_from_integration(&stats_resp);
                            }
                            Err(e) => {
                                warn!(
                                    device = ?device.name,
                                    error = %e,
                                    "device stats fetch failed"
                                );
                            }
                        }
                    }
                    device
                });
                futures_util::future::join_all(futs).await
            };

            // Integration requests are done; release the client lock before
            // taking the legacy client lock.
            drop(integration_guard);

            // Legacy supplements: every failure below is logged and replaced
            // by an empty list.
            let (legacy_events, legacy_health, legacy_clients, legacy_devices): (
                Vec<Event>,
                Vec<HealthSummary>,
                Vec<unifly_api::legacy::models::LegacyClientEntry>,
                Vec<unifly_api::legacy::models::LegacyDevice>,
            ) = match *self.inner.legacy_client.lock().await {
                Some(ref legacy) => {
                    let (events_res, health_res, clients_res, devices_res) = tokio::join!(
                        legacy.list_events(Some(100)),
                        legacy.get_health(),
                        legacy.list_clients(),
                        legacy.list_devices(),
                    );

                    let events = match events_res {
                        Ok(raw) => {
                            let evts: Vec<Event> = raw.into_iter().map(Event::from).collect();
                            // Every fetched event is also pushed to broadcast subscribers.
                            for evt in &evts {
                                let _ = self.inner.event_tx.send(Arc::new(evt.clone()));
                            }
                            evts
                        }
                        Err(e) => {
                            warn!(error = %e, "legacy event fetch failed (non-fatal)");
                            Vec::new()
                        }
                    };

                    let health = match health_res {
                        Ok(raw) => convert_health_summaries(raw),
                        Err(e) => {
                            warn!(error = %e, "legacy health fetch failed (non-fatal)");
                            Vec::new()
                        }
                    };

                    let lc = match clients_res {
                        Ok(raw) => raw,
                        Err(e) => {
                            warn!(
                                error = %e,
                                "legacy client fetch failed (non-fatal)"
                            );
                            Vec::new()
                        }
                    };

                    let ld = match devices_res {
                        Ok(raw) => raw,
                        Err(e) => {
                            warn!(error = %e, "legacy device fetch failed (non-fatal)");
                            Vec::new()
                        }
                    };

                    (events, health, lc, ld)
                }
                None => (Vec::new(), Vec::new(), Vec::new(), Vec::new()),
            };

            if !legacy_clients.is_empty() {
                // Clients are matched by IP address string; legacy values only
                // fill fields the Integration API left unset.
                let legacy_by_ip: HashMap<&str, &unifly_api::legacy::models::LegacyClientEntry> =
                    legacy_clients
                        .iter()
                        .filter_map(|lc| lc.ip.as_deref().map(|ip| (ip, lc)))
                        .collect();
                let mut merged = 0u32;
                for client in &mut clients {
                    let ip_key = client.ip.map(|ip| ip.to_string());
                    if let Some(lc) = ip_key.as_deref().and_then(|ip| legacy_by_ip.get(ip)) {
                        if client.tx_bytes.is_none() {
                            // Values that do not fit a `u64` are dropped.
                            client.tx_bytes = lc.tx_bytes.and_then(|b| u64::try_from(b).ok());
                        }
                        if client.rx_bytes.is_none() {
                            client.rx_bytes = lc.rx_bytes.and_then(|b| u64::try_from(b).ok());
                        }
                        if client.hostname.is_none() {
                            client.hostname.clone_from(&lc.hostname);
                        }
                        if client.wireless.is_none() {
                            // Reuse the legacy -> `Client` conversion to obtain
                            // the wireless details and uplink MAC.
                            let legacy_client: Client = Client::from((*lc).clone());
                            client.wireless = legacy_client.wireless;
                            if client.uplink_device_mac.is_none() {
                                client.uplink_device_mac = legacy_client.uplink_device_mac;
                            }
                        }
                        merged += 1;
                    }
                }
                debug!(
                    total_clients = clients.len(),
                    legacy_available = legacy_by_ip.len(),
                    merged,
                    "client traffic merge (by IP)"
                );
            }

            if !legacy_devices.is_empty() {
                // Devices are matched by MAC string; only unset fields are filled.
                let legacy_by_mac: HashMap<&str, &unifly_api::legacy::models::LegacyDevice> =
                    legacy_devices.iter().map(|d| (d.mac.as_str(), d)).collect();
                for device in &mut devices {
                    if let Some(ld) = legacy_by_mac.get(device.mac.as_str()) {
                        if device.client_count.is_none() {
                            device.client_count = ld.num_sta.and_then(|n| n.try_into().ok());
                        }
                        if device.wan_ipv6.is_none() {
                            device.wan_ipv6 = parse_legacy_device_wan_ipv6(&ld.extra);
                        }
                    }
                }
            }

            // An empty health list leaves the previously published health in place.
            if !legacy_health.is_empty() {
                self.inner
                    .store
                    .site_health
                    .send_modify(|h| *h = Arc::new(legacy_health));
            }

            self.inner
                .store
                .apply_integration_snapshot(crate::store::RefreshSnapshot {
                    devices,
                    clients,
                    networks,
                    wifi,
                    policies,
                    zones,
                    acls,
                    dns,
                    vouchers,
                    sites,
                    events: legacy_events,
                    traffic_matching_lists,
                });
        } else {
            // Legacy-only mode: no Integration client or no site id.
            drop(integration_guard);

            let legacy_guard = self.inner.legacy_client.lock().await;
            let legacy = legacy_guard
                .as_ref()
                .ok_or(CoreError::ControllerDisconnected)?;

            let (devices_res, clients_res, events_res) = tokio::join!(
                legacy.list_devices(),
                legacy.list_clients(),
                legacy.list_events(Some(100)),
            );

            // Unlike the supplement path above, failures here are fatal.
            let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
            let clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
            let events: Vec<Event> = events_res?.into_iter().map(Event::from).collect();

            drop(legacy_guard);

            for event in &events {
                let _ = self.inner.event_tx.send(Arc::new(event.clone()));
            }

            // Collections only the Integration API provides are published empty.
            self.inner
                .store
                .apply_integration_snapshot(crate::store::RefreshSnapshot {
                    devices,
                    clients,
                    networks: Vec::new(),
                    wifi: Vec::new(),
                    policies: Vec::new(),
                    zones: Vec::new(),
                    acls: Vec::new(),
                    dns: Vec::new(),
                    vouchers: Vec::new(),
                    sites: Vec::new(),
                    events,
                    traffic_matching_lists: Vec::new(),
                });
        }

        debug!(
            devices = self.inner.store.device_count(),
            clients = self.inner.store.client_count(),
            "data refresh complete"
        );

        Ok(())
    }
770
771 pub async fn execute(&self, cmd: Command) -> Result<CommandResult, CoreError> {
778 if *self.inner.connection_state.borrow() != ConnectionState::Connected {
779 return Err(CoreError::ControllerDisconnected);
780 }
781
782 let (tx, rx) = tokio::sync::oneshot::channel();
783
784 let command_tx = self.inner.command_tx.lock().await.clone();
785
786 command_tx
787 .send(CommandEnvelope {
788 command: cmd,
789 response_tx: tx,
790 })
791 .await
792 .map_err(|_| CoreError::ControllerDisconnected)?;
793
794 rx.await.map_err(|_| CoreError::ControllerDisconnected)?
795 }
796
797 pub async fn oneshot<F, Fut, T>(config: ControllerConfig, f: F) -> Result<T, CoreError>
804 where
805 F: FnOnce(Controller) -> Fut,
806 Fut: std::future::Future<Output = Result<T, CoreError>>,
807 {
808 let mut cfg = config;
809 cfg.websocket_enabled = false;
810 cfg.refresh_interval_secs = 0;
811
812 let controller = Controller::new(cfg);
813 controller.connect().await?;
814 let result = f(controller.clone()).await;
815 controller.disconnect().await;
816 result
817 }
818
    /// Returns a new `watch` receiver for connection-state changes.
    pub fn connection_state(&self) -> watch::Receiver<ConnectionState> {
        self.inner.connection_state.subscribe()
    }
825
    /// Returns a new receiver subscribed to the event broadcast channel.
    pub fn events(&self) -> broadcast::Receiver<Arc<Event>> {
        self.inner.event_tx.subscribe()
    }
830
    /// Returns the store's current snapshot of devices.
    pub fn devices_snapshot(&self) -> Arc<Vec<Arc<Device>>> {
        self.inner.store.devices_snapshot()
    }
836
    /// Returns the store's current snapshot of clients.
    pub fn clients_snapshot(&self) -> Arc<Vec<Arc<Client>>> {
        self.inner.store.clients_snapshot()
    }
840
    /// Returns the store's current snapshot of networks.
    pub fn networks_snapshot(&self) -> Arc<Vec<Arc<Network>>> {
        self.inner.store.networks_snapshot()
    }
844
    /// Returns the store's current snapshot of WiFi broadcasts.
    pub fn wifi_broadcasts_snapshot(&self) -> Arc<Vec<Arc<WifiBroadcast>>> {
        self.inner.store.wifi_broadcasts_snapshot()
    }
848
    /// Returns the store's current snapshot of firewall policies.
    pub fn firewall_policies_snapshot(&self) -> Arc<Vec<Arc<FirewallPolicy>>> {
        self.inner.store.firewall_policies_snapshot()
    }
852
    /// Returns the store's current snapshot of firewall zones.
    pub fn firewall_zones_snapshot(&self) -> Arc<Vec<Arc<FirewallZone>>> {
        self.inner.store.firewall_zones_snapshot()
    }
856
    /// Returns the store's current snapshot of ACL rules.
    pub fn acl_rules_snapshot(&self) -> Arc<Vec<Arc<AclRule>>> {
        self.inner.store.acl_rules_snapshot()
    }
860
    /// Returns the store's current snapshot of DNS policies.
    pub fn dns_policies_snapshot(&self) -> Arc<Vec<Arc<DnsPolicy>>> {
        self.inner.store.dns_policies_snapshot()
    }
864
    /// Returns the store's current snapshot of vouchers.
    pub fn vouchers_snapshot(&self) -> Arc<Vec<Arc<Voucher>>> {
        self.inner.store.vouchers_snapshot()
    }
868
    /// Returns the store's current snapshot of sites.
    pub fn sites_snapshot(&self) -> Arc<Vec<Arc<Site>>> {
        self.inner.store.sites_snapshot()
    }
872
    /// Returns the store's current snapshot of events.
    pub fn events_snapshot(&self) -> Arc<Vec<Arc<Event>>> {
        self.inner.store.events_snapshot()
    }
876
    /// Returns the store's current snapshot of traffic matching lists.
    pub fn traffic_matching_lists_snapshot(&self) -> Arc<Vec<Arc<TrafficMatchingList>>> {
        self.inner.store.traffic_matching_lists_snapshot()
    }
880
    /// Subscribes to the store's device stream.
    pub fn devices(&self) -> EntityStream<Device> {
        self.inner.store.subscribe_devices()
    }
886
    /// Subscribes to the store's client stream.
    pub fn clients(&self) -> EntityStream<Client> {
        self.inner.store.subscribe_clients()
    }
890
    /// Subscribes to the store's network stream.
    pub fn networks(&self) -> EntityStream<Network> {
        self.inner.store.subscribe_networks()
    }
894
    /// Subscribes to the store's WiFi broadcast stream.
    pub fn wifi_broadcasts(&self) -> EntityStream<WifiBroadcast> {
        self.inner.store.subscribe_wifi_broadcasts()
    }
898
    /// Subscribes to the store's firewall policy stream.
    pub fn firewall_policies(&self) -> EntityStream<FirewallPolicy> {
        self.inner.store.subscribe_firewall_policies()
    }
902
    /// Subscribes to the store's firewall zone stream.
    pub fn firewall_zones(&self) -> EntityStream<FirewallZone> {
        self.inner.store.subscribe_firewall_zones()
    }
906
    /// Subscribes to the store's ACL rule stream.
    pub fn acl_rules(&self) -> EntityStream<AclRule> {
        self.inner.store.subscribe_acl_rules()
    }
910
    /// Subscribes to the store's DNS policy stream.
    pub fn dns_policies(&self) -> EntityStream<DnsPolicy> {
        self.inner.store.subscribe_dns_policies()
    }
914
    /// Subscribes to the store's voucher stream.
    pub fn vouchers(&self) -> EntityStream<Voucher> {
        self.inner.store.subscribe_vouchers()
    }
918
    /// Subscribes to the store's site stream.
    pub fn sites(&self) -> EntityStream<Site> {
        self.inner.store.subscribe_sites()
    }
922
    /// Subscribes to the store's traffic matching list stream.
    pub fn traffic_matching_lists(&self) -> EntityStream<TrafficMatchingList> {
        self.inner.store.subscribe_traffic_matching_lists()
    }
926
    /// Returns a `watch` receiver for the store's site health summaries.
    pub fn site_health(&self) -> watch::Receiver<Arc<Vec<HealthSummary>>> {
        self.inner.store.subscribe_site_health()
    }
931
    /// Returns all accumulated warnings and leaves the internal list empty.
    pub async fn take_warnings(&self) -> Vec<String> {
        std::mem::take(&mut *self.inner.warnings.lock().await)
    }
936
937 pub async fn list_vpn_servers(&self) -> Result<Vec<VpnServer>, CoreError> {
944 let guard = self.inner.integration_client.lock().await;
945 let site_id = *self.inner.site_id.lock().await;
946 let (ic, sid) = require_integration(&guard, site_id, "list_vpn_servers")?;
947 let raw = ic
948 .paginate_all(200, |off, lim| ic.list_vpn_servers(&sid, off, lim))
949 .await?;
950 Ok(raw
951 .into_iter()
952 .map(|s| {
953 let id = s
954 .fields
955 .get("id")
956 .and_then(|v| v.as_str())
957 .and_then(|s| uuid::Uuid::parse_str(s).ok())
958 .map_or_else(|| EntityId::Legacy("unknown".into()), EntityId::Uuid);
959 VpnServer {
960 id,
961 name: s
962 .fields
963 .get("name")
964 .and_then(|v| v.as_str())
965 .map(String::from),
966 server_type: s
967 .fields
968 .get("type")
969 .or_else(|| s.fields.get("serverType"))
970 .and_then(|v| v.as_str())
971 .unwrap_or("UNKNOWN")
972 .to_owned(),
973 enabled: s.fields.get("enabled").and_then(serde_json::Value::as_bool),
974 }
975 })
976 .collect())
977 }
978
979 pub async fn list_vpn_tunnels(&self) -> Result<Vec<VpnTunnel>, CoreError> {
981 let guard = self.inner.integration_client.lock().await;
982 let site_id = *self.inner.site_id.lock().await;
983 let (ic, sid) = require_integration(&guard, site_id, "list_vpn_tunnels")?;
984 let raw = ic
985 .paginate_all(200, |off, lim| ic.list_vpn_tunnels(&sid, off, lim))
986 .await?;
987 Ok(raw
988 .into_iter()
989 .map(|t| {
990 let id = t
991 .fields
992 .get("id")
993 .and_then(|v| v.as_str())
994 .and_then(|s| uuid::Uuid::parse_str(s).ok())
995 .map_or_else(|| EntityId::Legacy("unknown".into()), EntityId::Uuid);
996 VpnTunnel {
997 id,
998 name: t
999 .fields
1000 .get("name")
1001 .and_then(|v| v.as_str())
1002 .map(String::from),
1003 tunnel_type: t
1004 .fields
1005 .get("type")
1006 .or_else(|| t.fields.get("tunnelType"))
1007 .and_then(|v| v.as_str())
1008 .unwrap_or("UNKNOWN")
1009 .to_owned(),
1010 enabled: t.fields.get("enabled").and_then(serde_json::Value::as_bool),
1011 }
1012 })
1013 .collect())
1014 }
1015
1016 pub async fn list_wans(&self) -> Result<Vec<WanInterface>, CoreError> {
1018 let guard = self.inner.integration_client.lock().await;
1019 let site_id = *self.inner.site_id.lock().await;
1020 let (ic, sid) = require_integration(&guard, site_id, "list_wans")?;
1021 let raw = ic
1022 .paginate_all(200, |off, lim| ic.list_wans(&sid, off, lim))
1023 .await?;
1024 Ok(raw
1025 .into_iter()
1026 .map(|w| {
1027 let id = w
1028 .fields
1029 .get("id")
1030 .and_then(|v| v.as_str())
1031 .and_then(|s| uuid::Uuid::parse_str(s).ok())
1032 .map_or_else(|| EntityId::Legacy("unknown".into()), EntityId::Uuid);
1033 let parse_ip = |key: &str| -> Option<std::net::IpAddr> {
1034 w.fields
1035 .get(key)
1036 .and_then(|v| v.as_str())
1037 .and_then(|s| s.parse().ok())
1038 };
1039 let dns = w
1040 .fields
1041 .get("dns")
1042 .and_then(|v| v.as_array())
1043 .map(|arr| {
1044 arr.iter()
1045 .filter_map(|v| v.as_str().and_then(|s| s.parse().ok()))
1046 .collect()
1047 })
1048 .unwrap_or_default();
1049 WanInterface {
1050 id,
1051 name: w
1052 .fields
1053 .get("name")
1054 .and_then(|v| v.as_str())
1055 .map(String::from),
1056 ip: parse_ip("ipAddress").or_else(|| parse_ip("ip")),
1057 gateway: parse_ip("gateway"),
1058 dns,
1059 }
1060 })
1061 .collect())
1062 }
1063
1064 pub async fn list_dpi_categories(&self) -> Result<Vec<DpiCategory>, CoreError> {
1066 let guard = self.inner.integration_client.lock().await;
1067 let site_id = *self.inner.site_id.lock().await;
1068 let (ic, sid) = require_integration(&guard, site_id, "list_dpi_categories")?;
1069 let raw = ic
1070 .paginate_all(200, |off, lim| ic.list_dpi_categories(&sid, off, lim))
1071 .await?;
1072 Ok(raw
1073 .into_iter()
1074 .map(|c| {
1075 #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1076 let id = c
1077 .fields
1078 .get("id")
1079 .and_then(serde_json::Value::as_u64)
1080 .unwrap_or(0) as u32;
1081 DpiCategory {
1082 id,
1083 name: c
1084 .fields
1085 .get("name")
1086 .and_then(|v| v.as_str())
1087 .unwrap_or("Unknown")
1088 .to_owned(),
1089 tx_bytes: c
1090 .fields
1091 .get("txBytes")
1092 .and_then(serde_json::Value::as_u64)
1093 .unwrap_or(0),
1094 rx_bytes: c
1095 .fields
1096 .get("rxBytes")
1097 .and_then(serde_json::Value::as_u64)
1098 .unwrap_or(0),
1099 apps: Vec::new(),
1100 }
1101 })
1102 .collect())
1103 }
1104
1105 pub async fn list_dpi_applications(&self) -> Result<Vec<DpiApplication>, CoreError> {
1107 let guard = self.inner.integration_client.lock().await;
1108 let site_id = *self.inner.site_id.lock().await;
1109 let (ic, sid) = require_integration(&guard, site_id, "list_dpi_applications")?;
1110 let raw = ic
1111 .paginate_all(200, |off, lim| ic.list_dpi_applications(&sid, off, lim))
1112 .await?;
1113 Ok(raw
1114 .into_iter()
1115 .map(|a| {
1116 #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1117 let id = a
1118 .fields
1119 .get("id")
1120 .and_then(serde_json::Value::as_u64)
1121 .unwrap_or(0) as u32;
1122 DpiApplication {
1123 id,
1124 name: a
1125 .fields
1126 .get("name")
1127 .and_then(|v| v.as_str())
1128 .unwrap_or("Unknown")
1129 .to_owned(),
1130 #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1131 category_id: a
1132 .fields
1133 .get("categoryId")
1134 .and_then(serde_json::Value::as_u64)
1135 .unwrap_or(0) as u32,
1136 tx_bytes: a
1137 .fields
1138 .get("txBytes")
1139 .and_then(serde_json::Value::as_u64)
1140 .unwrap_or(0),
1141 rx_bytes: a
1142 .fields
1143 .get("rxBytes")
1144 .and_then(serde_json::Value::as_u64)
1145 .unwrap_or(0),
1146 }
1147 })
1148 .collect())
1149 }
1150
1151 pub async fn list_radius_profiles(&self) -> Result<Vec<RadiusProfile>, CoreError> {
1153 let guard = self.inner.integration_client.lock().await;
1154 let site_id = *self.inner.site_id.lock().await;
1155 let (ic, sid) = require_integration(&guard, site_id, "list_radius_profiles")?;
1156 let raw = ic
1157 .paginate_all(200, |off, lim| ic.list_radius_profiles(&sid, off, lim))
1158 .await?;
1159 Ok(raw
1160 .into_iter()
1161 .map(|r| {
1162 let id = r
1163 .fields
1164 .get("id")
1165 .and_then(|v| v.as_str())
1166 .and_then(|s| uuid::Uuid::parse_str(s).ok())
1167 .map_or_else(|| EntityId::Legacy("unknown".into()), EntityId::Uuid);
1168 RadiusProfile {
1169 id,
1170 name: r
1171 .fields
1172 .get("name")
1173 .and_then(|v| v.as_str())
1174 .unwrap_or("Unknown")
1175 .to_owned(),
1176 }
1177 })
1178 .collect())
1179 }
1180
1181 pub async fn list_countries(&self) -> Result<Vec<Country>, CoreError> {
1183 let guard = self.inner.integration_client.lock().await;
1184 let ic = guard
1185 .as_ref()
1186 .ok_or_else(|| unsupported("list_countries"))?;
1187 let raw = ic
1188 .paginate_all(200, |off, lim| ic.list_countries(off, lim))
1189 .await?;
1190 Ok(raw
1191 .into_iter()
1192 .map(|c| Country {
1193 code: c
1194 .fields
1195 .get("code")
1196 .and_then(|v| v.as_str())
1197 .unwrap_or("")
1198 .to_owned(),
1199 name: c
1200 .fields
1201 .get("name")
1202 .and_then(|v| v.as_str())
1203 .unwrap_or("Unknown")
1204 .to_owned(),
1205 })
1206 .collect())
1207 }
1208
1209 pub async fn get_network_references(
1211 &self,
1212 network_id: &EntityId,
1213 ) -> Result<serde_json::Value, CoreError> {
1214 let guard = self.inner.integration_client.lock().await;
1215 let site_id = *self.inner.site_id.lock().await;
1216 let (ic, sid) = require_integration(&guard, site_id, "get_network_references")?;
1217 let uuid = require_uuid(network_id)?;
1218 let refs = ic.get_network_references(&sid, &uuid).await?;
1219 Ok(serde_json::to_value(refs).unwrap_or_default())
1220 }
1221
1222 pub async fn get_firewall_policy_ordering(
1224 &self,
1225 ) -> Result<unifly_api::integration_types::FirewallPolicyOrdering, CoreError> {
1226 let guard = self.inner.integration_client.lock().await;
1227 let site_id = *self.inner.site_id.lock().await;
1228 let (ic, sid) = require_integration(&guard, site_id, "get_firewall_policy_ordering")?;
1229 Ok(ic.get_firewall_policy_ordering(&sid).await?)
1230 }
1231
1232 pub async fn list_pending_devices(&self) -> Result<Vec<serde_json::Value>, CoreError> {
1237 let integration_guard = self.inner.integration_client.lock().await;
1238 let site_id = *self.inner.site_id.lock().await;
1239
1240 if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1241 let raw = ic
1242 .paginate_all(200, |off, lim| ic.list_pending_devices(&sid, off, lim))
1243 .await?;
1244 return Ok(raw
1245 .into_iter()
1246 .map(|v| serde_json::to_value(v).unwrap_or_default())
1247 .collect());
1248 }
1249
1250 let snapshot = self.devices_snapshot();
1251 Ok(snapshot
1252 .iter()
1253 .filter(|d| d.state == crate::model::DeviceState::PendingAdoption)
1254 .map(|d| serde_json::to_value(d.as_ref()).unwrap_or_default())
1255 .collect())
1256 }
1257
1258 pub async fn list_device_tags(&self) -> Result<Vec<serde_json::Value>, CoreError> {
1262 let integration_guard = self.inner.integration_client.lock().await;
1263 let site_id = *self.inner.site_id.lock().await;
1264 if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1265 let raw = ic
1266 .paginate_all(200, |off, lim| ic.list_device_tags(&sid, off, lim))
1267 .await?;
1268 return Ok(raw
1269 .into_iter()
1270 .map(|v| serde_json::to_value(v).unwrap_or_default())
1271 .collect());
1272 }
1273
1274 Ok(Vec::new())
1275 }
1276
1277 pub async fn list_backups(&self) -> Result<Vec<serde_json::Value>, CoreError> {
1279 let guard = self.inner.legacy_client.lock().await;
1280 let legacy = require_legacy(&guard)?;
1281 Ok(legacy.list_backups().await?)
1282 }
1283
1284 pub async fn download_backup(&self, filename: &str) -> Result<Vec<u8>, CoreError> {
1286 let guard = self.inner.legacy_client.lock().await;
1287 let legacy = require_legacy(&guard)?;
1288 Ok(legacy.download_backup(filename).await?)
1289 }
1290
1291 pub async fn get_site_stats(
1295 &self,
1296 interval: &str,
1297 start: Option<i64>,
1298 end: Option<i64>,
1299 attrs: Option<&[String]>,
1300 ) -> Result<Vec<serde_json::Value>, CoreError> {
1301 let guard = self.inner.legacy_client.lock().await;
1302 let legacy = require_legacy(&guard)?;
1303 Ok(legacy.get_site_stats(interval, start, end, attrs).await?)
1304 }
1305
1306 pub async fn get_device_stats(
1308 &self,
1309 interval: &str,
1310 macs: Option<&[String]>,
1311 attrs: Option<&[String]>,
1312 ) -> Result<Vec<serde_json::Value>, CoreError> {
1313 let guard = self.inner.legacy_client.lock().await;
1314 let legacy = require_legacy(&guard)?;
1315 Ok(legacy.get_device_stats(interval, macs, attrs).await?)
1316 }
1317
1318 pub async fn get_client_stats(
1320 &self,
1321 interval: &str,
1322 macs: Option<&[String]>,
1323 attrs: Option<&[String]>,
1324 ) -> Result<Vec<serde_json::Value>, CoreError> {
1325 let guard = self.inner.legacy_client.lock().await;
1326 let legacy = require_legacy(&guard)?;
1327 Ok(legacy.get_client_stats(interval, macs, attrs).await?)
1328 }
1329
1330 pub async fn get_gateway_stats(
1332 &self,
1333 interval: &str,
1334 start: Option<i64>,
1335 end: Option<i64>,
1336 attrs: Option<&[String]>,
1337 ) -> Result<Vec<serde_json::Value>, CoreError> {
1338 let guard = self.inner.legacy_client.lock().await;
1339 let legacy = require_legacy(&guard)?;
1340 Ok(legacy
1341 .get_gateway_stats(interval, start, end, attrs)
1342 .await?)
1343 }
1344
1345 pub async fn get_dpi_stats(
1347 &self,
1348 group_by: &str,
1349 macs: Option<&[String]>,
1350 ) -> Result<Vec<serde_json::Value>, CoreError> {
1351 let guard = self.inner.legacy_client.lock().await;
1352 let legacy = require_legacy(&guard)?;
1353 Ok(legacy.get_dpi_stats(group_by, macs).await?)
1354 }
1355
1356 pub async fn list_admins(&self) -> Result<Vec<Admin>, CoreError> {
1362 let guard = self.inner.legacy_client.lock().await;
1363 let legacy = require_legacy(&guard)?;
1364 let raw = legacy.list_admins().await?;
1365 Ok(raw
1366 .into_iter()
1367 .map(|v| Admin {
1368 id: v.get("_id").and_then(|v| v.as_str()).map_or_else(
1369 || EntityId::Legacy("unknown".into()),
1370 |s| EntityId::Legacy(s.into()),
1371 ),
1372 name: v
1373 .get("name")
1374 .and_then(|v| v.as_str())
1375 .unwrap_or("")
1376 .to_owned(),
1377 email: v.get("email").and_then(|v| v.as_str()).map(String::from),
1378 role: v
1379 .get("role")
1380 .and_then(|v| v.as_str())
1381 .unwrap_or("unknown")
1382 .to_owned(),
1383 is_super: v
1384 .get("is_super")
1385 .and_then(serde_json::Value::as_bool)
1386 .unwrap_or(false),
1387 last_login: None,
1388 })
1389 .collect())
1390 }
1391
1392 pub async fn list_alarms(&self) -> Result<Vec<Alarm>, CoreError> {
1394 let guard = self.inner.legacy_client.lock().await;
1395 let legacy = require_legacy(&guard)?;
1396 let raw = legacy.list_alarms().await?;
1397 Ok(raw.into_iter().map(Alarm::from).collect())
1398 }
1399
1400 pub async fn get_system_info(&self) -> Result<SystemInfo, CoreError> {
1405 {
1407 let guard = self.inner.integration_client.lock().await;
1408 if let Some(ic) = guard.as_ref() {
1409 let info = ic.get_info().await?;
1410 let f = &info.fields;
1411 return Ok(SystemInfo {
1412 controller_name: f
1413 .get("applicationName")
1414 .or_else(|| f.get("name"))
1415 .and_then(|v| v.as_str())
1416 .map(String::from),
1417 version: f
1418 .get("applicationVersion")
1419 .or_else(|| f.get("version"))
1420 .and_then(|v| v.as_str())
1421 .unwrap_or("unknown")
1422 .to_owned(),
1423 build: f.get("build").and_then(|v| v.as_str()).map(String::from),
1424 hostname: f.get("hostname").and_then(|v| v.as_str()).map(String::from),
1425 ip: None, uptime_secs: f.get("uptime").and_then(serde_json::Value::as_u64),
1427 update_available: f
1428 .get("isUpdateAvailable")
1429 .or_else(|| f.get("update_available"))
1430 .and_then(serde_json::Value::as_bool),
1431 });
1432 }
1433 }
1434
1435 let guard = self.inner.legacy_client.lock().await;
1437 let legacy = require_legacy(&guard)?;
1438 let raw = legacy.get_sysinfo().await?;
1439 Ok(SystemInfo {
1440 controller_name: raw
1441 .get("controller_name")
1442 .or_else(|| raw.get("name"))
1443 .and_then(|v| v.as_str())
1444 .map(String::from),
1445 version: raw
1446 .get("version")
1447 .and_then(|v| v.as_str())
1448 .unwrap_or("unknown")
1449 .to_owned(),
1450 build: raw.get("build").and_then(|v| v.as_str()).map(String::from),
1451 hostname: raw
1452 .get("hostname")
1453 .and_then(|v| v.as_str())
1454 .map(String::from),
1455 ip: raw
1456 .get("ip_addrs")
1457 .and_then(|v| v.as_array())
1458 .and_then(|a| a.first())
1459 .and_then(|v| v.as_str())
1460 .and_then(|s| s.parse().ok()),
1461 uptime_secs: raw.get("uptime").and_then(serde_json::Value::as_u64),
1462 update_available: raw
1463 .get("update_available")
1464 .and_then(serde_json::Value::as_bool),
1465 })
1466 }
1467
1468 pub async fn get_site_health(&self) -> Result<Vec<HealthSummary>, CoreError> {
1470 let guard = self.inner.legacy_client.lock().await;
1471 let legacy = require_legacy(&guard)?;
1472 let raw = legacy.get_health().await?;
1473 Ok(convert_health_summaries(raw))
1474 }
1475
1476 pub async fn get_sysinfo(&self) -> Result<SysInfo, CoreError> {
1478 let guard = self.inner.legacy_client.lock().await;
1479 let legacy = require_legacy(&guard)?;
1480 let raw = legacy.get_sysinfo().await?;
1481 Ok(SysInfo {
1482 timezone: raw
1483 .get("timezone")
1484 .and_then(|v| v.as_str())
1485 .map(String::from),
1486 autobackup: raw.get("autobackup").and_then(serde_json::Value::as_bool),
1487 hostname: raw
1488 .get("hostname")
1489 .and_then(|v| v.as_str())
1490 .map(String::from),
1491 ip_addrs: raw
1492 .get("ip_addrs")
1493 .and_then(|v| v.as_array())
1494 .map(|a| {
1495 a.iter()
1496 .filter_map(|v| v.as_str().map(String::from))
1497 .collect()
1498 })
1499 .unwrap_or_default(),
1500 live_chat: raw
1501 .get("live_chat")
1502 .and_then(|v| v.as_str())
1503 .map(String::from),
1504 #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1505 data_retention_days: raw
1506 .get("data_retention_days")
1507 .and_then(serde_json::Value::as_u64)
1508 .map(|n| n as u32),
1509 extra: raw,
1510 })
1511 }
1512}
1513
1514fn parse_f64_field(parent: Option<&serde_json::Value>, key: &str) -> Option<f64> {
1518 parent.and_then(|s| s.get(key)).and_then(|v| {
1519 v.as_str()
1520 .and_then(|s| s.parse().ok())
1521 .or_else(|| v.as_f64())
1522 })
1523}
1524
1525#[allow(clippy::cast_precision_loss)]
1531fn apply_device_sync(store: &DataStore, data: &serde_json::Value) {
1532 let Some(mac_str) = data.get("mac").and_then(serde_json::Value::as_str) else {
1533 return;
1534 };
1535 let mac = MacAddress::new(mac_str);
1536 let Some(existing) = store.device_by_mac(&mac) else {
1537 return; };
1539
1540 let sys = data.get("sys_stats");
1542 let cpu = sys
1543 .and_then(|s| s.get("cpu"))
1544 .and_then(|v| v.as_str().or_else(|| v.as_f64().map(|_| "")))
1545 .and_then(|s| {
1546 if s.is_empty() {
1547 None
1548 } else {
1549 s.parse::<f64>().ok()
1550 }
1551 })
1552 .or_else(|| {
1553 sys.and_then(|s| s.get("cpu"))
1554 .and_then(serde_json::Value::as_f64)
1555 });
1556 #[allow(clippy::as_conversions, clippy::cast_precision_loss)]
1557 let mem_pct = match (
1558 sys.and_then(|s| s.get("mem_used"))
1559 .and_then(serde_json::Value::as_i64),
1560 sys.and_then(|s| s.get("mem_total"))
1561 .and_then(serde_json::Value::as_i64),
1562 ) {
1563 (Some(used), Some(total)) if total > 0 => Some((used as f64 / total as f64) * 100.0),
1564 _ => None,
1565 };
1566 let load_averages: [Option<f64>; 3] =
1567 ["loadavg_1", "loadavg_5", "loadavg_15"].map(|key| parse_f64_field(sys, key));
1568
1569 let uplink = data.get("uplink");
1571 let tx_bps = uplink
1572 .and_then(|u| u.get("tx_bytes-r").or_else(|| u.get("tx_bytes_r")))
1573 .and_then(serde_json::Value::as_u64)
1574 .or_else(|| data.get("tx_bytes-r").and_then(serde_json::Value::as_u64));
1575 let rx_bps = uplink
1576 .and_then(|u| u.get("rx_bytes-r").or_else(|| u.get("rx_bytes_r")))
1577 .and_then(serde_json::Value::as_u64)
1578 .or_else(|| data.get("rx_bytes-r").and_then(serde_json::Value::as_u64));
1579
1580 let bandwidth = match (tx_bps, rx_bps) {
1581 (Some(tx), Some(rx)) if tx > 0 || rx > 0 => Some(crate::model::common::Bandwidth {
1582 tx_bytes_per_sec: tx,
1583 rx_bytes_per_sec: rx,
1584 }),
1585 _ => existing.stats.uplink_bandwidth, };
1587
1588 let uptime = data
1590 .get("_uptime")
1591 .or_else(|| data.get("uptime"))
1592 .and_then(serde_json::Value::as_i64)
1593 .and_then(|u| u.try_into().ok())
1594 .or(existing.stats.uptime_secs);
1595
1596 let mut device = (*existing).clone();
1598 device.stats.uplink_bandwidth = bandwidth;
1599 if let Some(c) = cpu {
1600 device.stats.cpu_utilization_pct = Some(c);
1601 }
1602 if let Some(m) = mem_pct {
1603 device.stats.memory_utilization_pct = Some(m);
1604 }
1605 if let Some(l) = load_averages[0] {
1606 device.stats.load_average_1m = Some(l);
1607 }
1608 if let Some(l) = load_averages[1] {
1609 device.stats.load_average_5m = Some(l);
1610 }
1611 if let Some(l) = load_averages[2] {
1612 device.stats.load_average_15m = Some(l);
1613 }
1614 device.stats.uptime_secs = uptime;
1615
1616 if let Some(num_sta) = data.get("num_sta").and_then(serde_json::Value::as_u64) {
1618 #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1619 {
1620 device.client_count = Some(num_sta as u32);
1621 }
1622 }
1623
1624 if let Some(obj) = data.as_object() {
1625 if let Some(wan_ipv6) = parse_legacy_device_wan_ipv6(obj) {
1626 device.wan_ipv6 = Some(wan_ipv6);
1627 }
1628 }
1629
1630 let key = mac.as_str().to_owned();
1631 let id = device.id.clone();
1632 store.devices.upsert(key, id, device);
1633}
1634
1635async fn refresh_task(controller: Controller, interval_secs: u64, cancel: CancellationToken) {
1637 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
1638 interval.tick().await; loop {
1641 tokio::select! {
1642 biased;
1643 () = cancel.cancelled() => break,
1644 _ = interval.tick() => {
1645 if let Err(e) = controller.full_refresh().await {
1646 warn!(error = %e, "periodic refresh failed");
1647 }
1648 }
1649 }
1650 }
1651}
1652
1653async fn command_processor_task(controller: Controller, mut rx: mpsc::Receiver<CommandEnvelope>) {
1656 let cancel = controller.inner.cancel_child.lock().await.clone();
1657
1658 loop {
1659 tokio::select! {
1660 biased;
1661 () = cancel.cancelled() => break,
1662 envelope = rx.recv() => {
1663 let Some(envelope) = envelope else { break };
1664 let result = route_command(&controller, envelope.command).await;
1665 let _ = envelope.response_tx.send(result);
1666 }
1667 }
1668 }
1669}
1670
1671#[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
1678async fn route_command(controller: &Controller, cmd: Command) -> Result<CommandResult, CoreError> {
1679 let store = &controller.inner.store;
1680
1681 let integration_guard = controller.inner.integration_client.lock().await;
1683 let legacy_guard = controller.inner.legacy_client.lock().await;
1684 let site_id = *controller.inner.site_id.lock().await;
1685
1686 match cmd {
1687 Command::AdoptDevice {
1689 mac,
1690 ignore_device_limit,
1691 } => {
1692 if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1693 ic.adopt_device(&sid, mac.as_str(), ignore_device_limit)
1694 .await?;
1695 } else {
1696 let legacy = require_legacy(&legacy_guard)?;
1697 legacy.adopt_device(mac.as_str()).await?;
1698 }
1699 Ok(CommandResult::Ok)
1700 }
1701
1702 Command::RestartDevice { id } => {
1703 if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1704 let device_uuid = require_uuid(&id)?;
1705 ic.device_action(&sid, &device_uuid, "RESTART").await?;
1706 } else {
1707 let legacy = require_legacy(&legacy_guard)?;
1708 let mac = device_mac(store, &id)?;
1709 legacy.restart_device(mac.as_str()).await?;
1710 }
1711 Ok(CommandResult::Ok)
1712 }
1713
1714 Command::LocateDevice { mac, enable } => {
1715 if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1716 let device =
1717 store
1718 .device_by_mac(&mac)
1719 .ok_or_else(|| CoreError::DeviceNotFound {
1720 identifier: mac.to_string(),
1721 })?;
1722 let device_uuid = require_uuid(&device.id)?;
1723 let action = if enable { "LOCATE_ON" } else { "LOCATE_OFF" };
1724 ic.device_action(&sid, &device_uuid, action).await?;
1725 } else {
1726 let legacy = require_legacy(&legacy_guard)?;
1727 legacy.locate_device(mac.as_str(), enable).await?;
1728 }
1729 Ok(CommandResult::Ok)
1730 }
1731
1732 Command::UpgradeDevice { mac, firmware_url } => {
1733 let legacy = require_legacy(&legacy_guard)?;
1734 legacy
1735 .upgrade_device(mac.as_str(), firmware_url.as_deref())
1736 .await?;
1737 Ok(CommandResult::Ok)
1738 }
1739
1740 Command::RemoveDevice { id } => {
1741 let (ic, sid) = require_integration(&integration_guard, site_id, "RemoveDevice")?;
1742 let device_uuid = require_uuid(&id)?;
1743 ic.remove_device(&sid, &device_uuid).await?;
1744 Ok(CommandResult::Ok)
1745 }
1746
1747 Command::ProvisionDevice { mac } => {
1748 let legacy = require_legacy(&legacy_guard)?;
1749 legacy.provision_device(mac.as_str()).await?;
1750 Ok(CommandResult::Ok)
1751 }
1752 Command::SpeedtestDevice => {
1753 let legacy = require_legacy(&legacy_guard)?;
1754 legacy.speedtest().await?;
1755 Ok(CommandResult::Ok)
1756 }
1757
1758 Command::PowerCyclePort {
1759 device_id,
1760 port_idx,
1761 } => {
1762 let (ic, sid) = require_integration(&integration_guard, site_id, "PowerCyclePort")?;
1763 let device_uuid = require_uuid(&device_id)?;
1764 ic.port_action(&sid, &device_uuid, port_idx, "POWER_CYCLE")
1765 .await?;
1766 Ok(CommandResult::Ok)
1767 }
1768
1769 Command::BlockClient { mac } => {
1771 if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1772 let client =
1773 store
1774 .client_by_mac(&mac)
1775 .ok_or_else(|| CoreError::ClientNotFound {
1776 identifier: mac.to_string(),
1777 })?;
1778 let client_uuid = require_uuid(&client.id)?;
1779 ic.client_action(&sid, &client_uuid, "BLOCK").await?;
1780 } else {
1781 let legacy = require_legacy(&legacy_guard)?;
1782 legacy.block_client(mac.as_str()).await?;
1783 }
1784 Ok(CommandResult::Ok)
1785 }
1786
1787 Command::UnblockClient { mac } => {
1788 if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1789 let client =
1790 store
1791 .client_by_mac(&mac)
1792 .ok_or_else(|| CoreError::ClientNotFound {
1793 identifier: mac.to_string(),
1794 })?;
1795 let client_uuid = require_uuid(&client.id)?;
1796 ic.client_action(&sid, &client_uuid, "UNBLOCK").await?;
1797 } else {
1798 let legacy = require_legacy(&legacy_guard)?;
1799 legacy.unblock_client(mac.as_str()).await?;
1800 }
1801 Ok(CommandResult::Ok)
1802 }
1803
1804 Command::KickClient { mac } => {
1805 if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1806 let client =
1807 store
1808 .client_by_mac(&mac)
1809 .ok_or_else(|| CoreError::ClientNotFound {
1810 identifier: mac.to_string(),
1811 })?;
1812 let client_uuid = require_uuid(&client.id)?;
1813 ic.client_action(&sid, &client_uuid, "RECONNECT").await?;
1814 } else {
1815 let legacy = require_legacy(&legacy_guard)?;
1816 legacy.kick_client(mac.as_str()).await?;
1817 }
1818 Ok(CommandResult::Ok)
1819 }
1820
1821 Command::ForgetClient { mac } => {
1822 let legacy = require_legacy(&legacy_guard)?;
1823 legacy.forget_client(mac.as_str()).await?;
1824 Ok(CommandResult::Ok)
1825 }
1826
1827 Command::AuthorizeGuest {
1828 client_id,
1829 time_limit_minutes,
1830 data_limit_mb,
1831 rx_rate_kbps,
1832 tx_rate_kbps,
1833 } => {
1834 let legacy = require_legacy(&legacy_guard)?;
1835 let mac = client_mac(store, &client_id)?;
1836 let minutes = time_limit_minutes.unwrap_or(60);
1837 #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1838 {
1839 legacy
1840 .authorize_guest(
1841 mac.as_str(),
1842 minutes,
1843 tx_rate_kbps.map(|r| r as u32),
1844 rx_rate_kbps.map(|r| r as u32),
1845 data_limit_mb.map(|m| m as u32),
1846 )
1847 .await?;
1848 }
1849 Ok(CommandResult::Ok)
1850 }
1851
1852 Command::UnauthorizeGuest { client_id } => {
1853 let legacy = require_legacy(&legacy_guard)?;
1854 let mac = client_mac(store, &client_id)?;
1855 legacy.unauthorize_guest(mac.as_str()).await?;
1856 Ok(CommandResult::Ok)
1857 }
1858
1859 Command::ArchiveAlarm { id } => {
1861 let legacy = require_legacy(&legacy_guard)?;
1862 legacy.archive_alarm(&id.to_string()).await?;
1863 Ok(CommandResult::Ok)
1864 }
1865
1866 Command::ArchiveAllAlarms => {
1867 let legacy = require_legacy(&legacy_guard)?;
1868 legacy.archive_all_alarms().await?;
1869 Ok(CommandResult::Ok)
1870 }
1871
1872 Command::CreateBackup => {
1874 let legacy = require_legacy(&legacy_guard)?;
1875 legacy.create_backup().await?;
1876 Ok(CommandResult::Ok)
1877 }
1878
1879 Command::DeleteBackup { filename } => {
1880 let legacy = require_legacy(&legacy_guard)?;
1881 legacy.delete_backup(&filename).await?;
1882 Ok(CommandResult::Ok)
1883 }
1884
1885 Command::CreateNetwork(req) => {
1887 let (ic, sid) = require_integration(&integration_guard, site_id, "CreateNetwork")?;
1888 let crate::command::CreateNetworkRequest {
1889 name,
1890 vlan_id,
1891 subnet,
1892 management,
1893 purpose,
1894 dhcp_enabled,
1895 enabled,
1896 dhcp_range_start,
1897 dhcp_range_stop,
1898 dhcp_lease_time,
1899 firewall_zone_id,
1900 isolation_enabled,
1901 internet_access_enabled,
1902 } = req;
1903
1904 let management = management.unwrap_or_else(|| {
1905 if matches!(purpose, Some(NetworkPurpose::VlanOnly)) {
1906 NetworkManagement::Unmanaged
1907 } else if purpose.is_some() || subnet.is_some() || dhcp_enabled {
1908 NetworkManagement::Gateway
1909 } else {
1910 NetworkManagement::Unmanaged
1911 }
1912 });
1913 let mut extra = HashMap::new();
1914
1915 if let Some(zone) = firewall_zone_id {
1916 extra.insert("zoneId".into(), serde_json::Value::String(zone));
1917 }
1918
1919 if matches!(management, NetworkManagement::Gateway) {
1920 extra.insert(
1921 "isolationEnabled".into(),
1922 serde_json::Value::Bool(isolation_enabled),
1923 );
1924 extra.insert(
1925 "internetAccessEnabled".into(),
1926 serde_json::Value::Bool(internet_access_enabled),
1927 );
1928
1929 if let Some(cidr) = subnet {
1930 let (host_ip, prefix_len) = parse_ipv4_cidr(&cidr)?;
1931 let mut dhcp_cfg = serde_json::Map::new();
1932 dhcp_cfg.insert(
1933 "mode".into(),
1934 serde_json::Value::String(
1935 if dhcp_enabled { "SERVER" } else { "NONE" }.into(),
1936 ),
1937 );
1938 if let Some(lease) = dhcp_lease_time {
1939 dhcp_cfg.insert(
1940 "leaseTimeSeconds".into(),
1941 serde_json::Value::Number(serde_json::Number::from(u64::from(lease))),
1942 );
1943 }
1944
1945 if let (Some(start), Some(stop)) = (dhcp_range_start, dhcp_range_stop) {
1946 dhcp_cfg.insert(
1947 "ipAddressRange".into(),
1948 serde_json::json!({
1949 "start": start,
1950 "end": stop
1951 }),
1952 );
1953 }
1954
1955 extra.insert(
1956 "ipv4Configuration".into(),
1957 serde_json::json!({
1958 "hostIpAddress": host_ip.to_string(),
1959 "prefixLength": u64::from(prefix_len),
1960 "dhcpConfiguration": dhcp_cfg
1961 }),
1962 );
1963 }
1964 }
1965
1966 let body = unifly_api::integration_types::NetworkCreateUpdate {
1967 name,
1968 enabled,
1969 management: "USER_DEFINED".into(),
1970 vlan_id: vlan_id.map_or(1, i32::from),
1971 dhcp_guarding: None,
1972 extra,
1973 };
1974 ic.create_network(&sid, &body).await?;
1975 Ok(CommandResult::Ok)
1976 }
1977
1978 Command::UpdateNetwork { id, update } => {
1979 let (ic, sid) = require_integration(&integration_guard, site_id, "UpdateNetwork")?;
1980 let uuid = require_uuid(&id)?;
1981 let existing = ic.get_network(&sid, &uuid).await?;
1983 let mut extra = existing.extra;
1985 if let Some(v) = update.isolation_enabled {
1986 extra.insert("isolationEnabled".into(), serde_json::Value::Bool(v));
1987 }
1988 if let Some(v) = update.internet_access_enabled {
1989 extra.insert("internetAccessEnabled".into(), serde_json::Value::Bool(v));
1990 }
1991 if let Some(v) = update.mdns_forwarding_enabled {
1992 extra.insert("mdnsForwardingEnabled".into(), serde_json::Value::Bool(v));
1993 }
1994 if let Some(v) = update.ipv6_enabled {
1995 if v {
1996 extra
1998 .entry("ipv6Configuration".into())
1999 .or_insert_with(|| serde_json::json!({ "type": "PREFIX_DELEGATION" }));
2000 } else {
2001 extra.remove("ipv6Configuration");
2002 }
2003 }
2004 let body = unifly_api::integration_types::NetworkCreateUpdate {
2005 name: update.name.unwrap_or(existing.name),
2006 enabled: update.enabled.unwrap_or(existing.enabled),
2007 management: existing.management,
2008 vlan_id: update.vlan_id.map_or(existing.vlan_id, i32::from),
2009 dhcp_guarding: existing.dhcp_guarding,
2010 extra,
2011 };
2012 ic.update_network(&sid, &uuid, &body).await?;
2013 Ok(CommandResult::Ok)
2014 }
2015
2016 Command::DeleteNetwork { id, force: _ } => {
2017 let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteNetwork")?;
2018 let uuid = require_uuid(&id)?;
2019 ic.delete_network(&sid, &uuid).await?;
2020 Ok(CommandResult::Ok)
2021 }
2022
2023 Command::CreateWifiBroadcast(req) => {
2025 let (ic, sid) =
2026 require_integration(&integration_guard, site_id, "CreateWifiBroadcast")?;
2027 let mut extra = serde_json::Map::new();
2028 extra.insert("ssid".into(), serde_json::Value::String(req.ssid));
2029 let security_mode = match req.security_mode {
2030 crate::model::WifiSecurityMode::Open => "OPEN",
2031 crate::model::WifiSecurityMode::Wpa2Personal => "WPA2_PERSONAL",
2032 crate::model::WifiSecurityMode::Wpa3Personal => "WPA3_PERSONAL",
2033 crate::model::WifiSecurityMode::Wpa2Wpa3Personal => "WPA2_WPA3_PERSONAL",
2034 crate::model::WifiSecurityMode::Wpa2Enterprise => "WPA2_ENTERPRISE",
2035 crate::model::WifiSecurityMode::Wpa3Enterprise => "WPA3_ENTERPRISE",
2036 crate::model::WifiSecurityMode::Wpa2Wpa3Enterprise => "WPA2_WPA3_ENTERPRISE",
2037 };
2038 let mut security_configuration = serde_json::Map::new();
2039 security_configuration.insert(
2040 "mode".into(),
2041 serde_json::Value::String(security_mode.into()),
2042 );
2043 if let Some(pass) = req.passphrase {
2044 security_configuration.insert("passphrase".into(), serde_json::Value::String(pass));
2045 }
2046 extra.insert(
2047 "securityConfiguration".into(),
2048 serde_json::Value::Object(security_configuration),
2049 );
2050 if let Some(network_id) = req.network_id {
2051 extra.insert(
2052 "network".into(),
2053 serde_json::json!({ "id": network_id.to_string() }),
2054 );
2055 }
2056 extra.insert("hideSsid".into(), serde_json::Value::Bool(req.hide_ssid));
2057 if req.band_steering {
2058 extra.insert("bandSteering".into(), serde_json::Value::Bool(true));
2059 }
2060 if req.fast_roaming {
2061 extra.insert("fastRoaming".into(), serde_json::Value::Bool(true));
2062 }
2063 if let Some(freqs) = req.frequencies_ghz {
2064 let values = freqs
2065 .into_iter()
2066 .map(|f| serde_json::Value::from(f64::from(f)))
2067 .collect::<Vec<_>>();
2068 extra.insert("frequencies".into(), serde_json::Value::Array(values));
2069 }
2070 let body = unifly_api::integration_types::WifiBroadcastCreateUpdate {
2071 name: req.name,
2072 broadcast_type: req.broadcast_type.unwrap_or_else(|| "STANDARD".into()),
2073 enabled: req.enabled,
2074 body: extra,
2075 };
2076 ic.create_wifi_broadcast(&sid, &body).await?;
2077 Ok(CommandResult::Ok)
2078 }
2079
2080 Command::UpdateWifiBroadcast { id, update } => {
2081 let (ic, sid) =
2082 require_integration(&integration_guard, site_id, "UpdateWifiBroadcast")?;
2083 let uuid = require_uuid(&id)?;
2084 let existing = ic.get_wifi_broadcast(&sid, &uuid).await?;
2085
2086 let mut body = serde_json::Map::new();
2087 for (k, v) in existing.extra {
2088 body.insert(k, v);
2089 }
2090 body.insert(
2091 "securityConfiguration".into(),
2092 existing.security_configuration.clone(),
2093 );
2094 if let Some(network) = existing.network.clone() {
2095 body.insert("network".into(), network);
2096 }
2097 if let Some(filter) = existing.broadcasting_device_filter.clone() {
2098 body.insert("broadcastingDeviceFilter".into(), filter);
2099 }
2100
2101 if let Some(ssid) = update.ssid.clone() {
2102 body.insert("ssid".into(), serde_json::Value::String(ssid));
2103 }
2104 if let Some(hidden) = update.hide_ssid {
2105 body.insert("hideSsid".into(), serde_json::Value::Bool(hidden));
2106 }
2107
2108 let mut security_cfg = existing
2109 .security_configuration
2110 .as_object()
2111 .cloned()
2112 .unwrap_or_default();
2113 if let Some(mode) = update.security_mode {
2114 let mode = match mode {
2115 crate::model::WifiSecurityMode::Open => "OPEN",
2116 crate::model::WifiSecurityMode::Wpa2Personal => "WPA2_PERSONAL",
2117 crate::model::WifiSecurityMode::Wpa3Personal => "WPA3_PERSONAL",
2118 crate::model::WifiSecurityMode::Wpa2Wpa3Personal => "WPA2_WPA3_PERSONAL",
2119 crate::model::WifiSecurityMode::Wpa2Enterprise => "WPA2_ENTERPRISE",
2120 crate::model::WifiSecurityMode::Wpa3Enterprise => "WPA3_ENTERPRISE",
2121 crate::model::WifiSecurityMode::Wpa2Wpa3Enterprise => "WPA2_WPA3_ENTERPRISE",
2122 };
2123 security_cfg.insert("mode".into(), serde_json::Value::String(mode.into()));
2124 }
2125 if let Some(passphrase) = update.passphrase.clone() {
2126 security_cfg.insert("passphrase".into(), serde_json::Value::String(passphrase));
2127 }
2128 body.insert(
2129 "securityConfiguration".into(),
2130 serde_json::Value::Object(security_cfg),
2131 );
2132
2133 let payload = unifly_api::integration_types::WifiBroadcastCreateUpdate {
2134 name: update.name.unwrap_or(existing.name),
2135 broadcast_type: existing.broadcast_type,
2136 enabled: update.enabled.unwrap_or(existing.enabled),
2137 body,
2138 };
2139 ic.update_wifi_broadcast(&sid, &uuid, &payload).await?;
2140 Ok(CommandResult::Ok)
2141 }
2142
2143 Command::DeleteWifiBroadcast { id, force: _ } => {
2144 let (ic, sid) =
2145 require_integration(&integration_guard, site_id, "DeleteWifiBroadcast")?;
2146 let uuid = require_uuid(&id)?;
2147 ic.delete_wifi_broadcast(&sid, &uuid).await?;
2148 Ok(CommandResult::Ok)
2149 }
2150
2151 Command::CreateFirewallPolicy(req) => {
2153 let (ic, sid) =
2154 require_integration(&integration_guard, site_id, "CreateFirewallPolicy")?;
2155 let action_str = match req.action {
2156 FirewallAction::Allow => "ALLOW",
2157 FirewallAction::Block => "DROP",
2158 FirewallAction::Reject => "REJECT",
2159 };
2160 let body = unifly_api::integration_types::FirewallPolicyCreateUpdate {
2161 name: req.name,
2162 description: req.description,
2163 enabled: req.enabled,
2164 action: serde_json::json!({ "type": action_str }),
2165 source: serde_json::json!({ "zoneId": req.source_zone_id.to_string() }),
2166 destination: serde_json::json!({ "zoneId": req.destination_zone_id.to_string() }),
2167 ip_protocol_scope: serde_json::json!("ALL"),
2168 logging_enabled: req.logging_enabled,
2169 ipsec_filter: None,
2170 schedule: None,
2171 connection_state_filter: None,
2172 };
2173 ic.create_firewall_policy(&sid, &body).await?;
2174 Ok(CommandResult::Ok)
2175 }
2176
2177 Command::UpdateFirewallPolicy { id, update } => {
2178 let (ic, sid) =
2179 require_integration(&integration_guard, site_id, "UpdateFirewallPolicy")?;
2180 let uuid = require_uuid(&id)?;
2181 let existing = ic.get_firewall_policy(&sid, &uuid).await?;
2182
2183 let mut source = existing
2184 .extra
2185 .get("source")
2186 .cloned()
2187 .unwrap_or_else(|| serde_json::json!({}));
2188 if let Some(addr) = update.source_address.clone() {
2189 if let Some(obj) = source.as_object_mut() {
2190 obj.insert("address".into(), serde_json::Value::String(addr));
2191 }
2192 }
2193
2194 let mut destination = existing
2195 .extra
2196 .get("destination")
2197 .cloned()
2198 .unwrap_or_else(|| serde_json::json!({}));
2199 if let Some(addr) = update.destination_address.clone() {
2200 if let Some(obj) = destination.as_object_mut() {
2201 obj.insert("address".into(), serde_json::Value::String(addr));
2202 }
2203 }
2204 if let Some(port) = update.destination_port.clone() {
2205 if let Some(obj) = destination.as_object_mut() {
2206 obj.insert("port".into(), serde_json::Value::String(port));
2207 }
2208 }
2209
2210 let action = if let Some(action) = update.action {
2211 let action_type = match action {
2212 FirewallAction::Allow => "ALLOW",
2213 FirewallAction::Block => "DROP",
2214 FirewallAction::Reject => "REJECT",
2215 };
2216 serde_json::json!({ "type": action_type })
2217 } else {
2218 existing.action
2219 };
2220
2221 let ip_protocol_scope = if let Some(protocol) = update.protocol.clone() {
2222 serde_json::json!({ "protocol": protocol })
2223 } else {
2224 existing
2225 .ip_protocol_scope
2226 .unwrap_or_else(|| serde_json::json!("ALL"))
2227 };
2228
2229 let connection_state_filter = existing
2230 .extra
2231 .get("connectionStateFilter")
2232 .and_then(serde_json::Value::as_array)
2233 .map(|arr| {
2234 arr.iter()
2235 .filter_map(|v| v.as_str().map(ToOwned::to_owned))
2236 .collect::<Vec<_>>()
2237 });
2238
2239 let payload = unifly_api::integration_types::FirewallPolicyCreateUpdate {
2240 name: update.name.unwrap_or(existing.name),
2241 description: update.description.or(existing.description),
2242 enabled: update.enabled.unwrap_or(existing.enabled),
2243 action,
2244 source,
2245 destination,
2246 ip_protocol_scope,
2247 logging_enabled: existing.logging_enabled,
2248 ipsec_filter: existing
2249 .extra
2250 .get("ipsecFilter")
2251 .and_then(serde_json::Value::as_str)
2252 .map(ToOwned::to_owned),
2253 schedule: existing.extra.get("schedule").cloned(),
2254 connection_state_filter,
2255 };
2256
2257 ic.update_firewall_policy(&sid, &uuid, &payload).await?;
2258 Ok(CommandResult::Ok)
2259 }
2260
2261 Command::DeleteFirewallPolicy { id } => {
2262 let (ic, sid) =
2263 require_integration(&integration_guard, site_id, "DeleteFirewallPolicy")?;
2264 let uuid = require_uuid(&id)?;
2265 ic.delete_firewall_policy(&sid, &uuid).await?;
2266 Ok(CommandResult::Ok)
2267 }
2268
2269 Command::PatchFirewallPolicy { id, enabled } => {
2270 let (ic, sid) =
2271 require_integration(&integration_guard, site_id, "PatchFirewallPolicy")?;
2272 let uuid = require_uuid(&id)?;
2273 let body = unifly_api::integration_types::FirewallPolicyPatch {
2274 enabled: Some(enabled),
2275 logging_enabled: None,
2276 };
2277 ic.patch_firewall_policy(&sid, &uuid, &body).await?;
2278 Ok(CommandResult::Ok)
2279 }
2280
2281 Command::ReorderFirewallPolicies {
2282 zone_pair: _,
2283 ordered_ids,
2284 } => {
2285 let (ic, sid) =
2286 require_integration(&integration_guard, site_id, "ReorderFirewallPolicies")?;
2287 let uuids: Result<Vec<uuid::Uuid>, _> = ordered_ids.iter().map(require_uuid).collect();
2288 let body = unifly_api::integration_types::FirewallPolicyOrdering {
2289 before_system_defined: uuids?,
2290 after_system_defined: Vec::new(),
2291 };
2292 ic.set_firewall_policy_ordering(&sid, &body).await?;
2293 Ok(CommandResult::Ok)
2294 }
2295
2296 Command::CreateFirewallZone(req) => {
2298 let (ic, sid) = require_integration(&integration_guard, site_id, "CreateFirewallZone")?;
2299 let network_uuids: Result<Vec<uuid::Uuid>, _> =
2300 req.network_ids.iter().map(require_uuid).collect();
2301 let body = unifly_api::integration_types::FirewallZoneCreateUpdate {
2302 name: req.name,
2303 network_ids: network_uuids?,
2304 };
2305 ic.create_firewall_zone(&sid, &body).await?;
2306 Ok(CommandResult::Ok)
2307 }
2308
2309 Command::UpdateFirewallZone { id, update } => {
2310 let (ic, sid) = require_integration(&integration_guard, site_id, "UpdateFirewallZone")?;
2311 let uuid = require_uuid(&id)?;
2312 let existing = ic.get_firewall_zone(&sid, &uuid).await?;
2313 let network_ids = if let Some(ids) = update.network_ids {
2314 let uuids: Result<Vec<uuid::Uuid>, _> = ids.iter().map(require_uuid).collect();
2315 uuids?
2316 } else {
2317 existing.network_ids
2318 };
2319 let body = unifly_api::integration_types::FirewallZoneCreateUpdate {
2320 name: update.name.unwrap_or(existing.name),
2321 network_ids,
2322 };
2323 ic.update_firewall_zone(&sid, &uuid, &body).await?;
2324 Ok(CommandResult::Ok)
2325 }
2326
2327 Command::DeleteFirewallZone { id } => {
2328 let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteFirewallZone")?;
2329 let uuid = require_uuid(&id)?;
2330 ic.delete_firewall_zone(&sid, &uuid).await?;
2331 Ok(CommandResult::Ok)
2332 }
2333
2334 Command::CreateAclRule(req) => {
2336 let (ic, sid) = require_integration(&integration_guard, site_id, "CreateAclRule")?;
2337 let action_str = match req.action {
2338 FirewallAction::Allow => "ALLOW",
2339 FirewallAction::Block => "BLOCK",
2340 FirewallAction::Reject => "REJECT",
2341 };
2342 let mut source_filter = serde_json::Map::new();
2343 source_filter.insert(
2344 "zoneId".into(),
2345 serde_json::Value::String(req.source_zone_id.to_string()),
2346 );
2347 if let Some(source_port) = req.source_port {
2348 source_filter.insert("port".into(), serde_json::Value::String(source_port));
2349 }
2350 if let Some(protocol) = req.protocol.clone() {
2351 source_filter.insert("protocol".into(), serde_json::Value::String(protocol));
2352 }
2353
2354 let mut destination_filter = serde_json::Map::new();
2355 destination_filter.insert(
2356 "zoneId".into(),
2357 serde_json::Value::String(req.destination_zone_id.to_string()),
2358 );
2359 if let Some(destination_port) = req.destination_port {
2360 destination_filter
2361 .insert("port".into(), serde_json::Value::String(destination_port));
2362 }
2363 if let Some(protocol) = req.protocol {
2364 destination_filter.insert("protocol".into(), serde_json::Value::String(protocol));
2365 }
2366 let body = unifly_api::integration_types::AclRuleCreateUpdate {
2367 name: req.name,
2368 rule_type: req.rule_type,
2369 action: action_str.into(),
2370 enabled: req.enabled,
2371 description: None,
2372 source_filter: Some(serde_json::Value::Object(source_filter)),
2373 destination_filter: Some(serde_json::Value::Object(destination_filter)),
2374 enforcing_device_filter: None,
2375 };
2376 ic.create_acl_rule(&sid, &body).await?;
2377 Ok(CommandResult::Ok)
2378 }
2379
2380 Command::UpdateAclRule { id, update } => {
2381 let (ic, sid) = require_integration(&integration_guard, site_id, "UpdateAclRule")?;
2382 let uuid = require_uuid(&id)?;
2383 let existing = ic.get_acl_rule(&sid, &uuid).await?;
2384 let action_str = match update.action {
2385 Some(FirewallAction::Allow) => "ALLOW".into(),
2386 Some(FirewallAction::Block) => "BLOCK".into(),
2387 Some(FirewallAction::Reject) => "REJECT".into(),
2388 None => existing.action,
2389 };
2390 let body = unifly_api::integration_types::AclRuleCreateUpdate {
2391 name: update.name.unwrap_or(existing.name),
2392 rule_type: existing.rule_type,
2393 action: action_str,
2394 enabled: update.enabled.unwrap_or(existing.enabled),
2395 description: existing.description,
2396 source_filter: existing.source_filter,
2397 destination_filter: existing.destination_filter,
2398 enforcing_device_filter: existing.enforcing_device_filter,
2399 };
2400 ic.update_acl_rule(&sid, &uuid, &body).await?;
2401 Ok(CommandResult::Ok)
2402 }
2403
2404 Command::DeleteAclRule { id } => {
2405 let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteAclRule")?;
2406 let uuid = require_uuid(&id)?;
2407 ic.delete_acl_rule(&sid, &uuid).await?;
2408 Ok(CommandResult::Ok)
2409 }
2410
2411 Command::ReorderAclRules { ordered_ids } => {
2412 let (ic, sid) = require_integration(&integration_guard, site_id, "ReorderAclRules")?;
2413 let uuids: Result<Vec<uuid::Uuid>, _> = ordered_ids.iter().map(require_uuid).collect();
2414 let body = unifly_api::integration_types::AclRuleOrdering {
2415 ordered_acl_rule_ids: uuids?,
2416 };
2417 ic.set_acl_rule_ordering(&sid, &body).await?;
2418 Ok(CommandResult::Ok)
2419 }
2420
2421 Command::CreateDnsPolicy(req) => {
2423 let (ic, sid) = require_integration(&integration_guard, site_id, "CreateDnsPolicy")?;
2424 let policy_type_str = match req.policy_type {
2425 crate::model::DnsPolicyType::ARecord => "A",
2426 crate::model::DnsPolicyType::AaaaRecord => "AAAA",
2427 crate::model::DnsPolicyType::CnameRecord => "CNAME",
2428 crate::model::DnsPolicyType::MxRecord => "MX",
2429 crate::model::DnsPolicyType::TxtRecord => "TXT",
2430 crate::model::DnsPolicyType::SrvRecord => "SRV",
2431 crate::model::DnsPolicyType::ForwardDomain => "FORWARD_DOMAIN",
2432 };
2433 let mut fields = serde_json::Map::new();
2434 if let Some(domains) = req.domains {
2435 if let Some(first) = domains.first() {
2436 fields.insert("domain".into(), serde_json::Value::String(first.clone()));
2437 }
2438 fields.insert(
2439 "domains".into(),
2440 serde_json::Value::Array(
2441 domains.into_iter().map(serde_json::Value::String).collect(),
2442 ),
2443 );
2444 }
2445 if let Some(upstream) = req.upstream {
2446 fields.insert("upstream".into(), serde_json::Value::String(upstream));
2447 }
2448 if let Some(value) = req.value {
2449 fields.insert("value".into(), serde_json::Value::String(value));
2450 }
2451 if let Some(ttl) = req.ttl_seconds {
2452 fields.insert(
2453 "ttl".into(),
2454 serde_json::Value::Number(serde_json::Number::from(ttl)),
2455 );
2456 }
2457 if let Some(priority) = req.priority {
2458 fields.insert(
2459 "priority".into(),
2460 serde_json::Value::Number(serde_json::Number::from(priority)),
2461 );
2462 }
2463 fields.insert("name".into(), serde_json::Value::String(req.name));
2464 let body = unifly_api::integration_types::DnsPolicyCreateUpdate {
2465 policy_type: policy_type_str.into(),
2466 enabled: req.enabled,
2467 fields,
2468 };
2469 ic.create_dns_policy(&sid, &body).await?;
2470 Ok(CommandResult::Ok)
2471 }
2472
2473 Command::UpdateDnsPolicy { id, update } => {
2474 let (ic, sid) = require_integration(&integration_guard, site_id, "UpdateDnsPolicy")?;
2475 let uuid = require_uuid(&id)?;
2476 let existing = ic.get_dns_policy(&sid, &uuid).await?;
2477 let mut fields: serde_json::Map<String, serde_json::Value> =
2478 existing.extra.into_iter().collect();
2479
2480 if let Some(domains) = update.domains {
2481 if let Some(first) = domains.first() {
2482 fields.insert("domain".into(), serde_json::Value::String(first.clone()));
2483 }
2484 fields.insert(
2485 "domains".into(),
2486 serde_json::Value::Array(
2487 domains.into_iter().map(serde_json::Value::String).collect(),
2488 ),
2489 );
2490 } else if let Some(domain) = existing.domain {
2491 fields
2492 .entry("domain")
2493 .or_insert_with(|| serde_json::Value::String(domain));
2494 }
2495
2496 if let Some(name) = update.name {
2497 fields.insert("name".into(), serde_json::Value::String(name));
2498 }
2499 if let Some(upstream) = update.upstream {
2500 fields.insert("upstream".into(), serde_json::Value::String(upstream));
2501 }
2502 if let Some(value) = update.value {
2503 fields.insert("value".into(), serde_json::Value::String(value));
2504 }
2505 if let Some(ttl) = update.ttl_seconds {
2506 fields.insert(
2507 "ttl".into(),
2508 serde_json::Value::Number(serde_json::Number::from(ttl)),
2509 );
2510 }
2511 if let Some(priority) = update.priority {
2512 fields.insert(
2513 "priority".into(),
2514 serde_json::Value::Number(serde_json::Number::from(priority)),
2515 );
2516 }
2517
2518 let body = unifly_api::integration_types::DnsPolicyCreateUpdate {
2519 policy_type: existing.policy_type,
2520 enabled: update.enabled.unwrap_or(existing.enabled),
2521 fields,
2522 };
2523 ic.update_dns_policy(&sid, &uuid, &body).await?;
2524 Ok(CommandResult::Ok)
2525 }
2526
2527 Command::DeleteDnsPolicy { id } => {
2528 let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteDnsPolicy")?;
2529 let uuid = require_uuid(&id)?;
2530 ic.delete_dns_policy(&sid, &uuid).await?;
2531 Ok(CommandResult::Ok)
2532 }
2533
2534 Command::CreateTrafficMatchingList(req) => {
2536 let (ic, sid) =
2537 require_integration(&integration_guard, site_id, "CreateTrafficMatchingList")?;
2538 let mut fields = serde_json::Map::new();
2539 fields.insert(
2540 "entries".into(),
2541 serde_json::Value::Array(
2542 req.entries
2543 .into_iter()
2544 .map(serde_json::Value::String)
2545 .collect(),
2546 ),
2547 );
2548 if let Some(desc) = req.description {
2549 fields.insert("description".into(), serde_json::Value::String(desc));
2550 }
2551 let body = unifly_api::integration_types::TrafficMatchingListCreateUpdate {
2552 name: req.name,
2553 list_type: req.list_type,
2554 fields,
2555 };
2556 ic.create_traffic_matching_list(&sid, &body).await?;
2557 Ok(CommandResult::Ok)
2558 }
2559
2560 Command::UpdateTrafficMatchingList { id, update } => {
2561 let (ic, sid) =
2562 require_integration(&integration_guard, site_id, "UpdateTrafficMatchingList")?;
2563 let uuid = require_uuid(&id)?;
2564 let existing = ic.get_traffic_matching_list(&sid, &uuid).await?;
2565 let mut fields = serde_json::Map::new();
2566 let entries = if let Some(new_entries) = update.entries {
2567 serde_json::Value::Array(
2568 new_entries
2569 .into_iter()
2570 .map(serde_json::Value::String)
2571 .collect(),
2572 )
2573 } else if let Some(existing_entries) = existing.extra.get("entries") {
2574 existing_entries.clone()
2575 } else {
2576 serde_json::Value::Array(Vec::new())
2577 };
2578 fields.insert("entries".into(), entries);
2579 if let Some(desc) = update.description {
2580 fields.insert("description".into(), serde_json::Value::String(desc));
2581 } else if let Some(existing_desc) = existing.extra.get("description") {
2582 fields.insert("description".into(), existing_desc.clone());
2583 }
2584 let body = unifly_api::integration_types::TrafficMatchingListCreateUpdate {
2585 name: update.name.unwrap_or(existing.name),
2586 list_type: existing.list_type,
2587 fields,
2588 };
2589 ic.update_traffic_matching_list(&sid, &uuid, &body).await?;
2590 Ok(CommandResult::Ok)
2591 }
2592
2593 Command::DeleteTrafficMatchingList { id } => {
2594 let (ic, sid) =
2595 require_integration(&integration_guard, site_id, "DeleteTrafficMatchingList")?;
2596 let uuid = require_uuid(&id)?;
2597 ic.delete_traffic_matching_list(&sid, &uuid).await?;
2598 Ok(CommandResult::Ok)
2599 }
2600
2601 Command::CreateVouchers(req) => {
2603 let (ic, sid) = require_integration(&integration_guard, site_id, "CreateVouchers")?;
2604 #[allow(clippy::as_conversions, clippy::cast_possible_wrap)]
2605 let body = unifly_api::integration_types::VoucherCreateRequest {
2606 name: req.name.unwrap_or_else(|| "Voucher".into()),
2607 count: Some(req.count as i32),
2608 time_limit_minutes: i64::from(req.time_limit_minutes.unwrap_or(60)),
2609 authorized_guest_limit: req.authorized_guest_limit.map(i64::from),
2610 data_usage_limit_m_bytes: req.data_usage_limit_mb.map(|m| m as i64),
2611 rx_rate_limit_kbps: req.rx_rate_limit_kbps.map(|r| r as i64),
2612 tx_rate_limit_kbps: req.tx_rate_limit_kbps.map(|r| r as i64),
2613 };
2614 let vouchers = ic.create_vouchers(&sid, &body).await?;
2615 let domain_vouchers: Vec<Voucher> = vouchers.into_iter().map(Voucher::from).collect();
2616 Ok(CommandResult::Vouchers(domain_vouchers))
2617 }
2618
2619 Command::DeleteVoucher { id } => {
2620 let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteVoucher")?;
2621 let uuid = require_uuid(&id)?;
2622 ic.delete_voucher(&sid, &uuid).await?;
2623 Ok(CommandResult::Ok)
2624 }
2625
2626 Command::PurgeVouchers { filter } => {
2627 let (ic, sid) = require_integration(&integration_guard, site_id, "PurgeVouchers")?;
2628 ic.purge_vouchers(&sid, &filter).await?;
2629 Ok(CommandResult::Ok)
2630 }
2631
2632 Command::CreateSite { name, description } => {
2634 let legacy = require_legacy(&legacy_guard)?;
2635 legacy.create_site(&name, &description).await?;
2636 Ok(CommandResult::Ok)
2637 }
2638 Command::DeleteSite { name } => {
2639 let legacy = require_legacy(&legacy_guard)?;
2640 legacy.delete_site(&name).await?;
2641 Ok(CommandResult::Ok)
2642 }
2643 Command::InviteAdmin { name, email, role } => {
2644 let legacy = require_legacy(&legacy_guard)?;
2645 legacy.invite_admin(&name, &email, &role).await?;
2646 Ok(CommandResult::Ok)
2647 }
2648 Command::RevokeAdmin { id } => {
2649 let legacy = require_legacy(&legacy_guard)?;
2650 legacy.revoke_admin(&id.to_string()).await?;
2651 Ok(CommandResult::Ok)
2652 }
2653 Command::UpdateAdmin { id, role } => {
2654 let legacy = require_legacy(&legacy_guard)?;
2655 legacy
2656 .update_admin(&id.to_string(), role.as_deref())
2657 .await?;
2658 Ok(CommandResult::Ok)
2659 }
2660
2661 Command::RebootController => {
2662 let legacy = require_legacy(&legacy_guard)?;
2663 legacy.reboot_controller().await?;
2664 Ok(CommandResult::Ok)
2665 }
2666 Command::PoweroffController => {
2667 let legacy = require_legacy(&legacy_guard)?;
2668 legacy.poweroff_controller().await?;
2669 Ok(CommandResult::Ok)
2670 }
2671 }
2672}
2673
/// Parses an IPv6 address out of free-form text.
///
/// Leading and trailing whitespace is ignored, and everything from the first
/// `/` onward (for example a prefix length such as `/64`) is discarded before
/// parsing. Returns `None` when what remains is not a valid IPv6 address.
fn parse_ipv6_text(raw: &str) -> Option<Ipv6Addr> {
    let trimmed = raw.trim();
    // Keep only the part in front of the first '/', if there is one.
    let address_part = match trimmed.split_once('/') {
        Some((address, _suffix)) => address,
        None => trimmed,
    };
    address_part.trim().parse().ok()
}
2680
2681fn pick_ipv6_from_value(value: &serde_json::Value) -> Option<String> {
2682 let mut first_link_local: Option<String> = None;
2683
2684 let iter: Box<dyn Iterator<Item = &serde_json::Value> + '_> = match value {
2685 serde_json::Value::Array(items) => Box::new(items.iter()),
2686 _ => Box::new(std::iter::once(value)),
2687 };
2688
2689 for item in iter {
2690 if let Some(ipv6) = item.as_str().and_then(parse_ipv6_text) {
2691 let ip_text = ipv6.to_string();
2692 if !ipv6.is_unicast_link_local() {
2693 return Some(ip_text);
2694 }
2695 if first_link_local.is_none() {
2696 first_link_local = Some(ip_text);
2697 }
2698 }
2699 }
2700
2701 first_link_local
2702}
2703
2704fn parse_legacy_device_wan_ipv6(
2705 extra: &serde_json::Map<String, serde_json::Value>,
2706) -> Option<String> {
2707 if let Some(v) = extra
2709 .get("wan1")
2710 .and_then(|wan| wan.get("ipv6"))
2711 .and_then(pick_ipv6_from_value)
2712 {
2713 return Some(v);
2714 }
2715
2716 extra.get("ipv6").and_then(pick_ipv6_from_value)
2718}
2719
2720fn convert_health_summaries(raw: Vec<serde_json::Value>) -> Vec<HealthSummary> {
2722 raw.into_iter()
2723 .map(|v| HealthSummary {
2724 subsystem: v
2725 .get("subsystem")
2726 .and_then(|v| v.as_str())
2727 .unwrap_or("unknown")
2728 .to_owned(),
2729 status: v
2730 .get("status")
2731 .and_then(|v| v.as_str())
2732 .unwrap_or("unknown")
2733 .to_owned(),
2734 #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
2735 num_adopted: v
2736 .get("num_adopted")
2737 .and_then(serde_json::Value::as_u64)
2738 .map(|n| n as u32),
2739 #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
2740 num_sta: v
2741 .get("num_sta")
2742 .and_then(serde_json::Value::as_u64)
2743 .map(|n| n as u32),
2744 tx_bytes_r: v.get("tx_bytes-r").and_then(serde_json::Value::as_u64),
2745 rx_bytes_r: v.get("rx_bytes-r").and_then(serde_json::Value::as_u64),
2746 latency: v.get("latency").and_then(serde_json::Value::as_f64),
2747 wan_ip: v.get("wan_ip").and_then(|v| v.as_str()).map(String::from),
2748 gateways: v.get("gateways").and_then(|v| v.as_array()).map(|a| {
2749 a.iter()
2750 .filter_map(|g| g.as_str().map(String::from))
2751 .collect()
2752 }),
2753 extra: v,
2754 })
2755 .collect()
2756}
2757
2758fn build_transport(config: &ControllerConfig) -> TransportConfig {
2760 TransportConfig {
2761 tls: tls_to_transport(&config.tls),
2762 timeout: config.timeout,
2763 cookie_jar: None, }
2765}
2766
2767fn tls_to_transport(tls: &TlsVerification) -> TlsMode {
2768 match tls {
2769 TlsVerification::SystemDefaults => TlsMode::System,
2770 TlsVerification::CustomCa(path) => TlsMode::CustomCa(path.clone()),
2771 TlsVerification::DangerAcceptInvalid => TlsMode::DangerAcceptInvalid,
2772 }
2773}
2774
2775fn unwrap_or_empty<S, D>(endpoint: &str, result: Result<Vec<S>, unifly_api::Error>) -> Vec<D>
2781where
2782 D: From<S>,
2783{
2784 match result {
2785 Ok(items) => items.into_iter().map(D::from).collect(),
2786 Err(ref e) if e.is_not_found() => {
2787 debug!("{endpoint}: not available (404), treating as empty");
2788 Vec::new()
2789 }
2790 Err(e) => {
2791 warn!("{endpoint}: unexpected error {e}, treating as empty");
2792 Vec::new()
2793 }
2794 }
2795}
2796
2797async fn resolve_site_id(
2802 client: &IntegrationClient,
2803 site_name: &str,
2804) -> Result<uuid::Uuid, CoreError> {
2805 if let Ok(uuid) = uuid::Uuid::parse_str(site_name) {
2807 return Ok(uuid);
2808 }
2809
2810 let sites = client
2811 .paginate_all(50, |off, lim| client.list_sites(off, lim))
2812 .await?;
2813
2814 sites
2815 .into_iter()
2816 .find(|s| s.internal_reference == site_name)
2817 .map(|s| s.id)
2818 .ok_or_else(|| CoreError::SiteNotFound {
2819 name: site_name.to_owned(),
2820 })
2821}
2822
2823async fn setup_legacy_client(
2825 config: &ControllerConfig,
2826 transport: &TransportConfig,
2827) -> Result<LegacyClient, CoreError> {
2828 let platform = LegacyClient::detect_platform(&config.url).await?;
2829 let client = LegacyClient::new(config.url.clone(), config.site.clone(), platform, transport)?;
2830 Ok(client)
2831}
2832
2833fn parse_ipv4_cidr(cidr: &str) -> Result<(Ipv4Addr, u8), CoreError> {
2834 let (host, prefix) = cidr
2835 .split_once('/')
2836 .ok_or_else(|| CoreError::ValidationFailed {
2837 message: format!("invalid ipv4 host/prefix value '{cidr}'"),
2838 })?;
2839 let host_ip = host
2840 .parse::<Ipv4Addr>()
2841 .map_err(|_| CoreError::ValidationFailed {
2842 message: format!("invalid IPv4 host address '{host}'"),
2843 })?;
2844 let prefix_len = prefix
2845 .parse::<u8>()
2846 .map_err(|_| CoreError::ValidationFailed {
2847 message: format!("invalid IPv4 prefix length '{prefix}'"),
2848 })?;
2849 if prefix_len > 32 {
2850 return Err(CoreError::ValidationFailed {
2851 message: format!("IPv4 prefix length must be <= 32, got {prefix_len}"),
2852 });
2853 }
2854 Ok((host_ip, prefix_len))
2855}
2856
2857fn require_uuid(id: &EntityId) -> Result<uuid::Uuid, CoreError> {
2859 id.as_uuid().copied().ok_or_else(|| CoreError::Unsupported {
2860 operation: "Integration API operation on legacy ID".into(),
2861 required: "UUID-based entity ID".into(),
2862 })
2863}
2864
2865fn require_legacy<'a>(
2866 guard: &'a tokio::sync::MutexGuard<'_, Option<LegacyClient>>,
2867) -> Result<&'a LegacyClient, CoreError> {
2868 guard.as_ref().ok_or(CoreError::ControllerDisconnected)
2869}
2870
2871fn require_integration<'a>(
2872 guard: &'a tokio::sync::MutexGuard<'_, Option<IntegrationClient>>,
2873 site_id: Option<uuid::Uuid>,
2874 operation: &str,
2875) -> Result<(&'a IntegrationClient, uuid::Uuid), CoreError> {
2876 let client = guard.as_ref().ok_or_else(|| unsupported(operation))?;
2877 let sid = site_id.ok_or_else(|| unsupported(operation))?;
2878 Ok((client, sid))
2879}
2880
2881fn unsupported(operation: &str) -> CoreError {
2882 CoreError::Unsupported {
2883 operation: operation.into(),
2884 required: "Integration API".into(),
2885 }
2886}
2887
2888fn device_mac(store: &DataStore, id: &EntityId) -> Result<MacAddress, CoreError> {
2890 store
2891 .device_by_id(id)
2892 .map(|d| d.mac.clone())
2893 .ok_or_else(|| CoreError::DeviceNotFound {
2894 identifier: id.to_string(),
2895 })
2896}
2897
2898fn client_mac(store: &DataStore, id: &EntityId) -> Result<MacAddress, CoreError> {
2900 store
2901 .client_by_id(id)
2902 .map(|c| c.mac.clone())
2903 .ok_or_else(|| CoreError::ClientNotFound {
2904 identifier: id.to_string(),
2905 })
2906}
2907
#[cfg(test)]
mod tests {
    //! Unit tests for `parse_ipv4_cidr`, covering the happy path, the prefix
    //! boundaries, and each rejection branch.

    use super::parse_ipv4_cidr;

    #[test]
    fn parse_ipv4_cidr_accepts_valid_input() {
        let (host, prefix) = parse_ipv4_cidr("192.168.10.1/24").expect("valid CIDR");
        assert_eq!(host.to_string(), "192.168.10.1");
        assert_eq!(prefix, 24);
    }

    #[test]
    fn parse_ipv4_cidr_accepts_boundary_prefixes() {
        // 0 and 32 are the smallest and largest prefix lengths allowed.
        let (host, prefix) = parse_ipv4_cidr("0.0.0.0/0").expect("valid CIDR");
        assert_eq!(host.to_string(), "0.0.0.0");
        assert_eq!(prefix, 0);

        let (host, prefix) = parse_ipv4_cidr("10.0.0.1/32").expect("valid CIDR");
        assert_eq!(host.to_string(), "10.0.0.1");
        assert_eq!(prefix, 32);
    }

    #[test]
    fn parse_ipv4_cidr_rejects_invalid_prefix() {
        assert!(parse_ipv4_cidr("192.168.10.1/40").is_err());
        // First value past the allowed range.
        assert!(parse_ipv4_cidr("192.168.10.1/33").is_err());
        // Does not fit in a u8 at all.
        assert!(parse_ipv4_cidr("192.168.10.1/256").is_err());
        assert!(parse_ipv4_cidr("192.168.10.1/abc").is_err());
        assert!(parse_ipv4_cidr("192.168.10.1/").is_err());
    }

    #[test]
    fn parse_ipv4_cidr_rejects_missing_prefix() {
        assert!(parse_ipv4_cidr("192.168.10.1").is_err());
    }

    #[test]
    fn parse_ipv4_cidr_rejects_invalid_host() {
        assert!(parse_ipv4_cidr("999.1.1.1/24").is_err());
        assert!(parse_ipv4_cidr("not-an-ip/24").is_err());
        assert!(parse_ipv4_cidr("/24").is_err());
    }
}