1use std::collections::HashMap;
8use std::net::{Ipv4Addr, Ipv6Addr};
9use std::sync::Arc;
10use std::time::Duration;
11
12use tokio::sync::{Mutex, broadcast, mpsc, watch};
13use tokio::task::JoinHandle;
14use tokio_util::sync::CancellationToken;
15use tracing::{debug, info, warn};
16
17use crate::command::{Command, CommandEnvelope, CommandResult};
18use crate::config::{AuthCredentials, ControllerConfig, TlsVerification};
19use crate::core_error::CoreError;
20use crate::model::{
21 AclRule, Admin, Alarm, Client, Country, Device, DnsPolicy, DpiApplication, DpiCategory,
22 EntityId, Event, FirewallAction, FirewallPolicy, FirewallZone, HealthSummary, MacAddress,
23 Network, NetworkManagement, NetworkPurpose, RadiusProfile, Site, SysInfo, SystemInfo,
24 TrafficMatchingList, Voucher, VpnServer, VpnTunnel, WanInterface, WifiBroadcast,
25};
26use crate::store::DataStore;
27use crate::stream::EntityStream;
28
29use crate::transport::{TlsMode, TransportConfig};
30use crate::websocket::{ReconnectConfig, WebSocketHandle};
31use crate::{IntegrationClient, LegacyClient};
32
/// Capacity of the bounded `mpsc` channel that carries command envelopes.
const COMMAND_CHANNEL_SIZE: usize = 64;

/// Capacity of the `broadcast` channel used to fan events out to subscribers.
const EVENT_CHANNEL_SIZE: usize = 256;
35
/// Connection lifecycle state published through the controller's `watch` channel.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ConnectionState {
    /// No connection is active; this is the state a new controller starts in.
    Disconnected,
    /// A connection attempt is in progress.
    Connecting,
    /// The controller is connected.
    Connected,
    /// A reconnect is in progress; `attempt` is the retry counter.
    Reconnecting {
        /// Retry counter (numbering base is not established in this file).
        attempt: u32,
    },
    /// The connection has failed.
    Failed,
}
47
48#[derive(Clone)]
56pub struct Controller {
57 inner: Arc<ControllerInner>,
58}
59
60struct ControllerInner {
61 config: ControllerConfig,
62 store: Arc<DataStore>,
63 connection_state: watch::Sender<ConnectionState>,
64 event_tx: broadcast::Sender<Arc<Event>>,
65 command_tx: Mutex<mpsc::Sender<CommandEnvelope>>,
66 command_rx: Mutex<Option<mpsc::Receiver<CommandEnvelope>>>,
67 cancel: CancellationToken,
68 cancel_child: Mutex<CancellationToken>,
71 legacy_client: Mutex<Option<LegacyClient>>,
72 integration_client: Mutex<Option<IntegrationClient>>,
73 site_id: Mutex<Option<uuid::Uuid>>,
75 ws_handle: Mutex<Option<WebSocketHandle>>,
77 task_handles: Mutex<Vec<JoinHandle<()>>>,
78 warnings: Mutex<Vec<String>>,
80}
81
82impl Controller {
83 pub fn new(config: ControllerConfig) -> Self {
86 let store = Arc::new(DataStore::new());
87 let (connection_state, _) = watch::channel(ConnectionState::Disconnected);
88 let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_SIZE);
89 let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
90 let cancel = CancellationToken::new();
91 let cancel_child = cancel.child_token();
92
93 Self {
94 inner: Arc::new(ControllerInner {
95 config,
96 store,
97 connection_state,
98 event_tx,
99 command_tx: Mutex::new(command_tx),
100 command_rx: Mutex::new(Some(command_rx)),
101 cancel,
102 cancel_child: Mutex::new(cancel_child),
103 legacy_client: Mutex::new(None),
104 integration_client: Mutex::new(None),
105 warnings: Mutex::new(Vec::new()),
106 site_id: Mutex::new(None),
107 ws_handle: Mutex::new(None),
108 task_handles: Mutex::new(Vec::new()),
109 }),
110 }
111 }
112
113 pub fn config(&self) -> &ControllerConfig {
115 &self.inner.config
116 }
117
118 pub fn store(&self) -> &Arc<DataStore> {
120 &self.inner.store
121 }
122
123 #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
131 pub async fn connect(&self) -> Result<(), CoreError> {
132 let _ = self
133 .inner
134 .connection_state
135 .send(ConnectionState::Connecting);
136
137 let child = self.inner.cancel.child_token();
139 *self.inner.cancel_child.lock().await = child.clone();
140
141 let config = &self.inner.config;
142 let transport = build_transport(config);
143
144 match &config.auth {
145 AuthCredentials::ApiKey(api_key) => {
146 let platform = LegacyClient::detect_platform(&config.url).await?;
148 debug!(?platform, "detected controller platform");
149
150 let integration = IntegrationClient::from_api_key(
152 config.url.as_str(),
153 api_key,
154 &transport,
155 platform,
156 )?;
157
158 let site_id = resolve_site_id(&integration, &config.site).await?;
160 debug!(site_id = %site_id, "resolved Integration API site UUID");
161
162 *self.inner.integration_client.lock().await = Some(integration);
163 *self.inner.site_id.lock().await = Some(site_id);
164 }
165 AuthCredentials::Credentials { username, password } => {
166 let platform = LegacyClient::detect_platform(&config.url).await?;
168 debug!(?platform, "detected controller platform");
169
170 let client = LegacyClient::new(
171 config.url.clone(),
172 config.site.clone(),
173 platform,
174 &transport,
175 )?;
176 client.login(username, password).await?;
177 debug!("session authentication successful");
178
179 *self.inner.legacy_client.lock().await = Some(client);
180 }
181 AuthCredentials::Hybrid {
182 api_key,
183 username,
184 password,
185 } => {
186 let platform = LegacyClient::detect_platform(&config.url).await?;
188 debug!(?platform, "detected controller platform (hybrid)");
189
190 let integration = IntegrationClient::from_api_key(
192 config.url.as_str(),
193 api_key,
194 &transport,
195 platform,
196 )?;
197
198 let site_id = resolve_site_id(&integration, &config.site).await?;
199 debug!(site_id = %site_id, "resolved Integration API site UUID");
200
201 *self.inner.integration_client.lock().await = Some(integration);
202 *self.inner.site_id.lock().await = Some(site_id);
203
204 match LegacyClient::new(
208 config.url.clone(),
209 config.site.clone(),
210 platform,
211 &transport,
212 ) {
213 Ok(client) => match client.login(username, password).await {
214 Ok(()) => {
215 debug!("legacy session authentication successful (hybrid)");
216 *self.inner.legacy_client.lock().await = Some(client);
217 }
218 Err(e) => {
219 let msg = format!(
220 "Legacy login failed: {e} — events, health stats, and client traffic will be unavailable"
221 );
222 warn!("{msg}");
223 self.inner.warnings.lock().await.push(msg);
224 }
225 },
226 Err(e) => {
227 let msg = format!("Legacy client setup failed: {e}");
228 warn!("{msg}");
229 self.inner.warnings.lock().await.push(msg);
230 }
231 }
232 }
233 AuthCredentials::Cloud { api_key, host_id } => {
234 let integration = IntegrationClient::from_api_key(
235 config.url.as_str(),
236 api_key,
237 &transport,
238 crate::ControllerPlatform::Cloud,
239 )?;
240
241 let site_id = if let Ok(uuid) = uuid::Uuid::parse_str(&config.site) {
242 uuid
243 } else if let Ok(uuid) = uuid::Uuid::parse_str(host_id) {
244 uuid
245 } else {
246 resolve_site_id(&integration, &config.site).await?
247 };
248 debug!(site_id = %site_id, "resolved cloud Integration API site UUID");
249
250 *self.inner.integration_client.lock().await = Some(integration);
251 *self.inner.site_id.lock().await = Some(site_id);
252
253 let msg =
254 "Cloud auth mode active: Legacy API and WebSocket features are unavailable"
255 .to_string();
256 self.inner.warnings.lock().await.push(msg);
257 }
258 }
259
260 self.full_refresh().await?;
262
263 let mut handles = self.inner.task_handles.lock().await;
265
266 if let Some(rx) = self.inner.command_rx.lock().await.take() {
267 let ctrl = self.clone();
268 handles.push(tokio::spawn(command_processor_task(ctrl, rx)));
269 }
270
271 let interval_secs = config.refresh_interval_secs;
272 if interval_secs > 0 {
273 let ctrl = self.clone();
274 let cancel = child.clone();
275 handles.push(tokio::spawn(refresh_task(ctrl, interval_secs, cancel)));
276 }
277
278 if config.websocket_enabled {
280 self.spawn_websocket(&child, &mut handles).await;
281 }
282
283 let _ = self.inner.connection_state.send(ConnectionState::Connected);
284 info!("connected to controller");
285 Ok(())
286 }
287
288 async fn spawn_websocket(&self, cancel: &CancellationToken, handles: &mut Vec<JoinHandle<()>>) {
293 let legacy_guard = self.inner.legacy_client.lock().await;
294 let Some(ref legacy) = *legacy_guard else {
295 debug!("no legacy client — WebSocket unavailable");
296 return;
297 };
298
299 let platform = legacy.platform();
300 let Some(ws_path_template) = platform.websocket_path() else {
301 debug!("platform does not support WebSocket");
302 return;
303 };
304
305 let ws_path = ws_path_template.replace("{site}", &self.inner.config.site);
306 let base_url = &self.inner.config.url;
307 let scheme = if base_url.scheme() == "https" {
308 "wss"
309 } else {
310 "ws"
311 };
312 let host = base_url.host_str().unwrap_or("localhost");
313 let ws_url_str = match base_url.port() {
314 Some(p) => format!("{scheme}://{host}:{p}{ws_path}"),
315 None => format!("{scheme}://{host}{ws_path}"),
316 };
317 let ws_url = match url::Url::parse(&ws_url_str) {
318 Ok(u) => u,
319 Err(e) => {
320 warn!(error = %e, url = %ws_url_str, "invalid WebSocket URL");
321 return;
322 }
323 };
324
325 let cookie = legacy.cookie_header();
326 drop(legacy_guard);
327
328 if cookie.is_none() {
329 warn!("no session cookie — WebSocket requires legacy auth (skipping)");
330 return;
331 }
332
333 let ws_tls = tls_to_transport(&self.inner.config.tls);
334 let ws_cancel = cancel.child_token();
335 let handle = match WebSocketHandle::connect(
336 ws_url,
337 ReconnectConfig::default(),
338 ws_cancel.clone(),
339 cookie,
340 ws_tls,
341 ) {
342 Ok(h) => h,
343 Err(e) => {
344 warn!(error = %e, "WebSocket connection failed (non-fatal)");
345 return;
346 }
347 };
348
349 let mut ws_rx = handle.subscribe();
353 let event_tx = self.inner.event_tx.clone();
354 let store = Arc::clone(&self.inner.store);
355 let bridge_cancel = ws_cancel;
356
357 handles.push(tokio::spawn(async move {
358 loop {
359 tokio::select! {
360 biased;
361 () = bridge_cancel.cancelled() => break,
362 result = ws_rx.recv() => {
363 match result {
364 Ok(ws_event) => {
365 if ws_event.key == "device:sync" || ws_event.key == "device:update" {
367 apply_device_sync(&store, &ws_event.extra);
368 }
369
370 let event = crate::model::event::Event::from(
371 (*ws_event).clone(),
372 );
373 let _ = event_tx.send(Arc::new(event));
374 }
375 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
376 warn!(skipped = n, "WS bridge: receiver lagged");
377 }
378 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
379 }
380 }
381 }
382 }
383 }));
384
385 *self.inner.ws_handle.lock().await = Some(handle);
386 info!("WebSocket event stream spawned (handshake in progress)");
387 }
388
389 pub async fn disconnect(&self) {
394 self.inner.cancel_child.lock().await.cancel();
396
397 let mut handles = self.inner.task_handles.lock().await;
399 for handle in handles.drain(..) {
400 let _ = handle.await;
401 }
402
403 if matches!(
405 self.inner.config.auth,
406 AuthCredentials::Credentials { .. } | AuthCredentials::Hybrid { .. }
407 ) && let Some(ref client) = *self.inner.legacy_client.lock().await
408 && let Err(e) = client.logout().await
409 {
410 warn!(error = %e, "logout failed (non-fatal)");
411 }
412
413 if let Some(handle) = self.inner.ws_handle.lock().await.take() {
415 handle.shutdown();
416 }
417
418 *self.inner.legacy_client.lock().await = None;
419 *self.inner.integration_client.lock().await = None;
420 *self.inner.site_id.lock().await = None;
421
422 {
425 let (tx, rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
426 *self.inner.command_tx.lock().await = tx;
427 *self.inner.command_rx.lock().await = Some(rx);
428 }
429
430 let _ = self
431 .inner
432 .connection_state
433 .send(ConnectionState::Disconnected);
434 debug!("disconnected");
435 }
436
437 #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
443 pub async fn full_refresh(&self) -> Result<(), CoreError> {
444 let integration_guard = self.inner.integration_client.lock().await;
445 let site_id = *self.inner.site_id.lock().await;
446
447 if let (Some(integration), Some(sid)) = (integration_guard.as_ref(), site_id) {
448 let page_limit = 200;
450
451 let (devices_res, clients_res, networks_res, wifi_res) = tokio::join!(
452 integration.paginate_all(page_limit, |off, lim| {
453 integration.list_devices(&sid, off, lim)
454 }),
455 integration.paginate_all(page_limit, |off, lim| {
456 integration.list_clients(&sid, off, lim)
457 }),
458 integration.paginate_all(page_limit, |off, lim| {
459 integration.list_networks(&sid, off, lim)
460 }),
461 integration.paginate_all(page_limit, |off, lim| {
462 integration.list_wifi_broadcasts(&sid, off, lim)
463 }),
464 );
465
466 let (policies_res, zones_res, acls_res, dns_res, vouchers_res) = tokio::join!(
467 integration.paginate_all(page_limit, |off, lim| {
468 integration.list_firewall_policies(&sid, off, lim)
469 }),
470 integration.paginate_all(page_limit, |off, lim| {
471 integration.list_firewall_zones(&sid, off, lim)
472 }),
473 integration.paginate_all(page_limit, |off, lim| {
474 integration.list_acl_rules(&sid, off, lim)
475 }),
476 integration.paginate_all(page_limit, |off, lim| {
477 integration.list_dns_policies(&sid, off, lim)
478 }),
479 integration.paginate_all(page_limit, |off, lim| {
480 integration.list_vouchers(&sid, off, lim)
481 }),
482 );
483
484 let (sites_res, tml_res) = tokio::join!(
485 integration.paginate_all(50, |off, lim| { integration.list_sites(off, lim) }),
486 integration.paginate_all(page_limit, |off, lim| {
487 integration.list_traffic_matching_lists(&sid, off, lim)
488 }),
489 );
490
491 let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
493 let mut clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
494 let network_ids: Vec<uuid::Uuid> = networks_res?.into_iter().map(|n| n.id).collect();
496 info!(
497 network_count = network_ids.len(),
498 "fetching network details"
499 );
500 let networks: Vec<Network> = {
501 let futs = network_ids.into_iter().map(|nid| async move {
502 match integration.get_network(&sid, &nid).await {
503 Ok(detail) => Some(Network::from(detail)),
504 Err(e) => {
505 warn!(network_id = %nid, error = %e, "network detail fetch failed");
506 None
507 }
508 }
509 });
510 futures_util::future::join_all(futs)
511 .await
512 .into_iter()
513 .flatten()
514 .collect()
515 };
516 let wifi: Vec<WifiBroadcast> = wifi_res?.into_iter().map(WifiBroadcast::from).collect();
517 let policies: Vec<FirewallPolicy> = policies_res?
518 .into_iter()
519 .map(FirewallPolicy::from)
520 .collect();
521 let zones: Vec<FirewallZone> = zones_res?.into_iter().map(FirewallZone::from).collect();
522 let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
523 let traffic_matching_lists: Vec<TrafficMatchingList> = tml_res?
524 .into_iter()
525 .map(TrafficMatchingList::from)
526 .collect();
527
528 let acls: Vec<AclRule> = unwrap_or_empty("acl/rules", acls_res);
530 let dns: Vec<DnsPolicy> = unwrap_or_empty("dns/policies", dns_res);
531 let vouchers: Vec<Voucher> = unwrap_or_empty("vouchers", vouchers_res);
532
533 info!(
535 device_count = devices.len(),
536 "enriching devices with statistics"
537 );
538 let mut devices = {
539 let futs = devices.into_iter().map(|mut device| async {
540 if let EntityId::Uuid(device_uuid) = &device.id {
541 match integration.get_device_statistics(&sid, device_uuid).await {
542 Ok(stats_resp) => {
543 device.stats =
544 crate::convert::device_stats_from_integration(&stats_resp);
545 }
546 Err(e) => {
547 warn!(
548 device = ?device.name,
549 error = %e,
550 "device stats fetch failed"
551 );
552 }
553 }
554 }
555 device
556 });
557 futures_util::future::join_all(futs).await
558 };
559
560 drop(integration_guard);
561
562 #[allow(clippy::type_complexity)]
564 let (legacy_events, legacy_health, legacy_clients, legacy_devices, legacy_users): (
565 Vec<Event>,
566 Vec<HealthSummary>,
567 Vec<crate::legacy::models::LegacyClientEntry>,
568 Vec<crate::legacy::models::LegacyDevice>,
569 Vec<crate::legacy::models::LegacyUserEntry>,
570 ) = match *self.inner.legacy_client.lock().await {
571 Some(ref legacy) => {
572 let (events_res, health_res, clients_res, devices_res, users_res) =
573 tokio::join!(
574 legacy.list_events(Some(100)),
575 legacy.get_health(),
576 legacy.list_clients(),
577 legacy.list_devices(),
578 legacy.list_users(),
579 );
580
581 let events = match events_res {
582 Ok(raw) => {
583 let evts: Vec<Event> = raw.into_iter().map(Event::from).collect();
584 for evt in &evts {
585 let _ = self.inner.event_tx.send(Arc::new(evt.clone()));
586 }
587 evts
588 }
589 Err(e) => {
590 warn!(error = %e, "legacy event fetch failed (non-fatal)");
591 Vec::new()
592 }
593 };
594
595 let health = match health_res {
596 Ok(raw) => convert_health_summaries(raw),
597 Err(e) => {
598 warn!(error = %e, "legacy health fetch failed (non-fatal)");
599 Vec::new()
600 }
601 };
602
603 let lc = match clients_res {
604 Ok(raw) => raw,
605 Err(e) => {
606 warn!(
607 error = %e,
608 "legacy client fetch failed (non-fatal)"
609 );
610 Vec::new()
611 }
612 };
613
614 let ld = match devices_res {
615 Ok(raw) => raw,
616 Err(e) => {
617 warn!(error = %e, "legacy device fetch failed (non-fatal)");
618 Vec::new()
619 }
620 };
621
622 let lu = match users_res {
623 Ok(raw) => raw,
624 Err(e) => {
625 warn!(error = %e, "legacy user fetch failed (non-fatal)");
626 Vec::new()
627 }
628 };
629
630 (events, health, lc, ld, lu)
631 }
632 None => (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()),
633 };
634
635 if !legacy_clients.is_empty() {
639 let legacy_by_ip: HashMap<&str, &crate::legacy::models::LegacyClientEntry> =
640 legacy_clients
641 .iter()
642 .filter_map(|lc| lc.ip.as_deref().map(|ip| (ip, lc)))
643 .collect();
644 let mut merged = 0u32;
645 for client in &mut clients {
646 let ip_key = client.ip.map(|ip| ip.to_string());
647 if let Some(lc) = ip_key.as_deref().and_then(|ip| legacy_by_ip.get(ip)) {
648 if client.tx_bytes.is_none() {
649 client.tx_bytes = lc.tx_bytes.and_then(|b| u64::try_from(b).ok());
650 }
651 if client.rx_bytes.is_none() {
652 client.rx_bytes = lc.rx_bytes.and_then(|b| u64::try_from(b).ok());
653 }
654 if client.hostname.is_none() {
655 client.hostname.clone_from(&lc.hostname);
656 }
657 if client.wireless.is_none() {
659 let legacy_client: Client = Client::from((*lc).clone());
660 client.wireless = legacy_client.wireless;
661 if client.uplink_device_mac.is_none() {
662 client.uplink_device_mac = legacy_client.uplink_device_mac;
663 }
664 }
665 merged += 1;
666 }
667 }
668 debug!(
669 total_clients = clients.len(),
670 legacy_available = legacy_by_ip.len(),
671 merged,
672 "client traffic merge (by IP)"
673 );
674 }
675
676 if !legacy_users.is_empty() {
678 let users_by_mac: HashMap<String, &crate::legacy::models::LegacyUserEntry> =
679 legacy_users
680 .iter()
681 .map(|u| (u.mac.to_lowercase(), u))
682 .collect();
683 for client in &mut clients {
684 if let Some(user) = users_by_mac.get(&client.mac.as_str().to_lowercase()) {
685 client.use_fixedip = user.use_fixedip.unwrap_or(false);
686 client.fixed_ip = user.fixed_ip.as_deref().and_then(|s| s.parse().ok());
687 }
688 }
689 }
690
691 if !legacy_devices.is_empty() {
693 let legacy_by_mac: HashMap<&str, &crate::legacy::models::LegacyDevice> =
694 legacy_devices.iter().map(|d| (d.mac.as_str(), d)).collect();
695 for device in &mut devices {
696 if let Some(ld) = legacy_by_mac.get(device.mac.as_str()) {
697 if device.client_count.is_none() {
698 device.client_count = ld.num_sta.and_then(|n| n.try_into().ok());
699 }
700 if device.wan_ipv6.is_none() {
701 device.wan_ipv6 = parse_legacy_device_wan_ipv6(&ld.extra);
702 }
703 }
704 }
705 }
706
707 if !legacy_health.is_empty() {
709 self.inner
710 .store
711 .site_health
712 .send_modify(|h| *h = Arc::new(legacy_health));
713 }
714
715 self.inner
716 .store
717 .apply_integration_snapshot(crate::store::RefreshSnapshot {
718 devices,
719 clients,
720 networks,
721 wifi,
722 policies,
723 zones,
724 acls,
725 dns,
726 vouchers,
727 sites,
728 events: legacy_events,
729 traffic_matching_lists,
730 });
731 } else {
732 drop(integration_guard);
734
735 let legacy_guard = self.inner.legacy_client.lock().await;
736 let legacy = legacy_guard
737 .as_ref()
738 .ok_or(CoreError::ControllerDisconnected)?;
739
740 let (devices_res, clients_res, events_res, sites_res) = tokio::join!(
741 legacy.list_devices(),
742 legacy.list_clients(),
743 legacy.list_events(Some(100)),
744 legacy.list_sites(),
745 );
746
747 let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
748 let clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
749 let events: Vec<Event> = events_res?.into_iter().map(Event::from).collect();
750 let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
751
752 drop(legacy_guard);
753
754 for event in &events {
755 let _ = self.inner.event_tx.send(Arc::new(event.clone()));
756 }
757
758 self.inner
759 .store
760 .apply_integration_snapshot(crate::store::RefreshSnapshot {
761 devices,
762 clients,
763 networks: Vec::new(),
764 wifi: Vec::new(),
765 policies: Vec::new(),
766 zones: Vec::new(),
767 acls: Vec::new(),
768 dns: Vec::new(),
769 vouchers: Vec::new(),
770 sites,
771 events,
772 traffic_matching_lists: Vec::new(),
773 });
774 }
775
776 debug!(
777 devices = self.inner.store.device_count(),
778 clients = self.inner.store.client_count(),
779 "data refresh complete"
780 );
781
782 Ok(())
783 }
784
785 pub async fn execute(&self, cmd: Command) -> Result<CommandResult, CoreError> {
792 let (tx, rx) = tokio::sync::oneshot::channel();
793
794 let command_tx = self.inner.command_tx.lock().await.clone();
795
796 command_tx
797 .send(CommandEnvelope {
798 command: cmd,
799 response_tx: tx,
800 })
801 .await
802 .map_err(|_| CoreError::ControllerDisconnected)?;
803
804 rx.await.map_err(|_| CoreError::ControllerDisconnected)?
805 }
806
807 pub async fn oneshot<F, Fut, T>(config: ControllerConfig, f: F) -> Result<T, CoreError>
814 where
815 F: FnOnce(Controller) -> Fut,
816 Fut: std::future::Future<Output = Result<T, CoreError>>,
817 {
818 let mut cfg = config;
819 cfg.websocket_enabled = false;
820 cfg.refresh_interval_secs = 0;
821
822 let controller = Controller::new(cfg);
823 controller.connect().await?;
824 let result = f(controller.clone()).await;
825 controller.disconnect().await;
826 result
827 }
828
829 pub fn connection_state(&self) -> watch::Receiver<ConnectionState> {
833 self.inner.connection_state.subscribe()
834 }
835
836 pub fn events(&self) -> broadcast::Receiver<Arc<Event>> {
838 self.inner.event_tx.subscribe()
839 }
840
841 pub fn devices_snapshot(&self) -> Arc<Vec<Arc<Device>>> {
844 self.inner.store.devices_snapshot()
845 }
846
847 pub fn clients_snapshot(&self) -> Arc<Vec<Arc<Client>>> {
848 self.inner.store.clients_snapshot()
849 }
850
851 pub fn networks_snapshot(&self) -> Arc<Vec<Arc<Network>>> {
852 self.inner.store.networks_snapshot()
853 }
854
855 pub fn wifi_broadcasts_snapshot(&self) -> Arc<Vec<Arc<WifiBroadcast>>> {
856 self.inner.store.wifi_broadcasts_snapshot()
857 }
858
859 pub fn firewall_policies_snapshot(&self) -> Arc<Vec<Arc<FirewallPolicy>>> {
860 self.inner.store.firewall_policies_snapshot()
861 }
862
863 pub fn firewall_zones_snapshot(&self) -> Arc<Vec<Arc<FirewallZone>>> {
864 self.inner.store.firewall_zones_snapshot()
865 }
866
867 pub fn acl_rules_snapshot(&self) -> Arc<Vec<Arc<AclRule>>> {
868 self.inner.store.acl_rules_snapshot()
869 }
870
871 pub fn dns_policies_snapshot(&self) -> Arc<Vec<Arc<DnsPolicy>>> {
872 self.inner.store.dns_policies_snapshot()
873 }
874
875 pub fn vouchers_snapshot(&self) -> Arc<Vec<Arc<Voucher>>> {
876 self.inner.store.vouchers_snapshot()
877 }
878
879 pub fn sites_snapshot(&self) -> Arc<Vec<Arc<Site>>> {
880 self.inner.store.sites_snapshot()
881 }
882
883 pub fn events_snapshot(&self) -> Arc<Vec<Arc<Event>>> {
884 self.inner.store.events_snapshot()
885 }
886
887 pub fn traffic_matching_lists_snapshot(&self) -> Arc<Vec<Arc<TrafficMatchingList>>> {
888 self.inner.store.traffic_matching_lists_snapshot()
889 }
890
891 pub fn devices(&self) -> EntityStream<Device> {
894 self.inner.store.subscribe_devices()
895 }
896
897 pub fn clients(&self) -> EntityStream<Client> {
898 self.inner.store.subscribe_clients()
899 }
900
901 pub fn networks(&self) -> EntityStream<Network> {
902 self.inner.store.subscribe_networks()
903 }
904
905 pub fn wifi_broadcasts(&self) -> EntityStream<WifiBroadcast> {
906 self.inner.store.subscribe_wifi_broadcasts()
907 }
908
909 pub fn firewall_policies(&self) -> EntityStream<FirewallPolicy> {
910 self.inner.store.subscribe_firewall_policies()
911 }
912
913 pub fn firewall_zones(&self) -> EntityStream<FirewallZone> {
914 self.inner.store.subscribe_firewall_zones()
915 }
916
917 pub fn acl_rules(&self) -> EntityStream<AclRule> {
918 self.inner.store.subscribe_acl_rules()
919 }
920
921 pub fn dns_policies(&self) -> EntityStream<DnsPolicy> {
922 self.inner.store.subscribe_dns_policies()
923 }
924
925 pub fn vouchers(&self) -> EntityStream<Voucher> {
926 self.inner.store.subscribe_vouchers()
927 }
928
929 pub fn sites(&self) -> EntityStream<Site> {
930 self.inner.store.subscribe_sites()
931 }
932
933 pub fn traffic_matching_lists(&self) -> EntityStream<TrafficMatchingList> {
934 self.inner.store.subscribe_traffic_matching_lists()
935 }
936
937 pub fn site_health(&self) -> watch::Receiver<Arc<Vec<HealthSummary>>> {
939 self.inner.store.subscribe_site_health()
940 }
941
942 pub async fn take_warnings(&self) -> Vec<String> {
944 std::mem::take(&mut *self.inner.warnings.lock().await)
945 }
946
947 pub async fn has_legacy_access(&self) -> bool {
949 self.inner.legacy_client.lock().await.is_some()
950 }
951
952 pub async fn has_integration_access(&self) -> bool {
954 self.inner.integration_client.lock().await.is_some()
955 && self.inner.site_id.lock().await.is_some()
956 }
957
958 pub async fn list_vpn_servers(&self) -> Result<Vec<VpnServer>, CoreError> {
965 let guard = self.inner.integration_client.lock().await;
966 let site_id = *self.inner.site_id.lock().await;
967 let (ic, sid) = require_integration(&guard, site_id, "list_vpn_servers")?;
968 let raw = ic
969 .paginate_all(200, |off, lim| ic.list_vpn_servers(&sid, off, lim))
970 .await?;
971 Ok(raw
972 .into_iter()
973 .map(|s| {
974 let id = s
975 .fields
976 .get("id")
977 .and_then(|v| v.as_str())
978 .and_then(|s| uuid::Uuid::parse_str(s).ok())
979 .map_or_else(|| EntityId::Legacy("unknown".into()), EntityId::Uuid);
980 VpnServer {
981 id,
982 name: s
983 .fields
984 .get("name")
985 .and_then(|v| v.as_str())
986 .map(String::from),
987 server_type: s
988 .fields
989 .get("type")
990 .or_else(|| s.fields.get("serverType"))
991 .and_then(|v| v.as_str())
992 .unwrap_or("UNKNOWN")
993 .to_owned(),
994 enabled: s.fields.get("enabled").and_then(serde_json::Value::as_bool),
995 }
996 })
997 .collect())
998 }
999
1000 pub async fn list_vpn_tunnels(&self) -> Result<Vec<VpnTunnel>, CoreError> {
1002 let guard = self.inner.integration_client.lock().await;
1003 let site_id = *self.inner.site_id.lock().await;
1004 let (ic, sid) = require_integration(&guard, site_id, "list_vpn_tunnels")?;
1005 let raw = ic
1006 .paginate_all(200, |off, lim| ic.list_vpn_tunnels(&sid, off, lim))
1007 .await?;
1008 Ok(raw
1009 .into_iter()
1010 .map(|t| {
1011 let id = t
1012 .fields
1013 .get("id")
1014 .and_then(|v| v.as_str())
1015 .and_then(|s| uuid::Uuid::parse_str(s).ok())
1016 .map_or_else(|| EntityId::Legacy("unknown".into()), EntityId::Uuid);
1017 VpnTunnel {
1018 id,
1019 name: t
1020 .fields
1021 .get("name")
1022 .and_then(|v| v.as_str())
1023 .map(String::from),
1024 tunnel_type: t
1025 .fields
1026 .get("type")
1027 .or_else(|| t.fields.get("tunnelType"))
1028 .and_then(|v| v.as_str())
1029 .unwrap_or("UNKNOWN")
1030 .to_owned(),
1031 enabled: t.fields.get("enabled").and_then(serde_json::Value::as_bool),
1032 }
1033 })
1034 .collect())
1035 }
1036
1037 pub async fn list_wans(&self) -> Result<Vec<WanInterface>, CoreError> {
1039 let guard = self.inner.integration_client.lock().await;
1040 let site_id = *self.inner.site_id.lock().await;
1041 let (ic, sid) = require_integration(&guard, site_id, "list_wans")?;
1042 let raw = ic
1043 .paginate_all(200, |off, lim| ic.list_wans(&sid, off, lim))
1044 .await?;
1045 Ok(raw
1046 .into_iter()
1047 .map(|w| {
1048 let id = w
1049 .fields
1050 .get("id")
1051 .and_then(|v| v.as_str())
1052 .and_then(|s| uuid::Uuid::parse_str(s).ok())
1053 .map_or_else(|| EntityId::Legacy("unknown".into()), EntityId::Uuid);
1054 let parse_ip = |key: &str| -> Option<std::net::IpAddr> {
1055 w.fields
1056 .get(key)
1057 .and_then(|v| v.as_str())
1058 .and_then(|s| s.parse().ok())
1059 };
1060 let dns = w
1061 .fields
1062 .get("dns")
1063 .and_then(|v| v.as_array())
1064 .map(|arr| {
1065 arr.iter()
1066 .filter_map(|v| v.as_str().and_then(|s| s.parse().ok()))
1067 .collect()
1068 })
1069 .unwrap_or_default();
1070 WanInterface {
1071 id,
1072 name: w
1073 .fields
1074 .get("name")
1075 .and_then(|v| v.as_str())
1076 .map(String::from),
1077 ip: parse_ip("ipAddress").or_else(|| parse_ip("ip")),
1078 gateway: parse_ip("gateway"),
1079 dns,
1080 }
1081 })
1082 .collect())
1083 }
1084
1085 pub async fn list_dpi_categories(&self) -> Result<Vec<DpiCategory>, CoreError> {
1087 let guard = self.inner.integration_client.lock().await;
1088 let site_id = *self.inner.site_id.lock().await;
1089 let (ic, sid) = require_integration(&guard, site_id, "list_dpi_categories")?;
1090 let raw = ic
1091 .paginate_all(200, |off, lim| ic.list_dpi_categories(&sid, off, lim))
1092 .await?;
1093 Ok(raw
1094 .into_iter()
1095 .map(|c| {
1096 #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1097 let id = c
1098 .fields
1099 .get("id")
1100 .and_then(serde_json::Value::as_u64)
1101 .unwrap_or(0) as u32;
1102 DpiCategory {
1103 id,
1104 name: c
1105 .fields
1106 .get("name")
1107 .and_then(|v| v.as_str())
1108 .unwrap_or("Unknown")
1109 .to_owned(),
1110 tx_bytes: c
1111 .fields
1112 .get("txBytes")
1113 .and_then(serde_json::Value::as_u64)
1114 .unwrap_or(0),
1115 rx_bytes: c
1116 .fields
1117 .get("rxBytes")
1118 .and_then(serde_json::Value::as_u64)
1119 .unwrap_or(0),
1120 apps: Vec::new(),
1121 }
1122 })
1123 .collect())
1124 }
1125
1126 pub async fn list_dpi_applications(&self) -> Result<Vec<DpiApplication>, CoreError> {
1128 let guard = self.inner.integration_client.lock().await;
1129 let site_id = *self.inner.site_id.lock().await;
1130 let (ic, sid) = require_integration(&guard, site_id, "list_dpi_applications")?;
1131 let raw = ic
1132 .paginate_all(200, |off, lim| ic.list_dpi_applications(&sid, off, lim))
1133 .await?;
1134 Ok(raw
1135 .into_iter()
1136 .map(|a| {
1137 #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1138 let id = a
1139 .fields
1140 .get("id")
1141 .and_then(serde_json::Value::as_u64)
1142 .unwrap_or(0) as u32;
1143 DpiApplication {
1144 id,
1145 name: a
1146 .fields
1147 .get("name")
1148 .and_then(|v| v.as_str())
1149 .unwrap_or("Unknown")
1150 .to_owned(),
1151 #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1152 category_id: a
1153 .fields
1154 .get("categoryId")
1155 .and_then(serde_json::Value::as_u64)
1156 .unwrap_or(0) as u32,
1157 tx_bytes: a
1158 .fields
1159 .get("txBytes")
1160 .and_then(serde_json::Value::as_u64)
1161 .unwrap_or(0),
1162 rx_bytes: a
1163 .fields
1164 .get("rxBytes")
1165 .and_then(serde_json::Value::as_u64)
1166 .unwrap_or(0),
1167 }
1168 })
1169 .collect())
1170 }
1171
1172 pub async fn list_radius_profiles(&self) -> Result<Vec<RadiusProfile>, CoreError> {
1174 let guard = self.inner.integration_client.lock().await;
1175 let site_id = *self.inner.site_id.lock().await;
1176 let (ic, sid) = require_integration(&guard, site_id, "list_radius_profiles")?;
1177 let raw = ic
1178 .paginate_all(200, |off, lim| ic.list_radius_profiles(&sid, off, lim))
1179 .await?;
1180 Ok(raw
1181 .into_iter()
1182 .map(|r| {
1183 let id = r
1184 .fields
1185 .get("id")
1186 .and_then(|v| v.as_str())
1187 .and_then(|s| uuid::Uuid::parse_str(s).ok())
1188 .map_or_else(|| EntityId::Legacy("unknown".into()), EntityId::Uuid);
1189 RadiusProfile {
1190 id,
1191 name: r
1192 .fields
1193 .get("name")
1194 .and_then(|v| v.as_str())
1195 .unwrap_or("Unknown")
1196 .to_owned(),
1197 }
1198 })
1199 .collect())
1200 }
1201
1202 pub async fn list_countries(&self) -> Result<Vec<Country>, CoreError> {
1204 let guard = self.inner.integration_client.lock().await;
1205 let ic = guard
1206 .as_ref()
1207 .ok_or_else(|| unsupported("list_countries"))?;
1208 let raw = ic
1209 .paginate_all(200, |off, lim| ic.list_countries(off, lim))
1210 .await?;
1211 Ok(raw
1212 .into_iter()
1213 .map(|c| Country {
1214 code: c
1215 .fields
1216 .get("code")
1217 .and_then(|v| v.as_str())
1218 .unwrap_or("")
1219 .to_owned(),
1220 name: c
1221 .fields
1222 .get("name")
1223 .and_then(|v| v.as_str())
1224 .unwrap_or("Unknown")
1225 .to_owned(),
1226 })
1227 .collect())
1228 }
1229
1230 pub async fn get_network_references(
1232 &self,
1233 network_id: &EntityId,
1234 ) -> Result<serde_json::Value, CoreError> {
1235 let guard = self.inner.integration_client.lock().await;
1236 let site_id = *self.inner.site_id.lock().await;
1237 let (ic, sid) = require_integration(&guard, site_id, "get_network_references")?;
1238 let uuid = require_uuid(network_id)?;
1239 let refs = ic.get_network_references(&sid, &uuid).await?;
1240 Ok(serde_json::to_value(refs).unwrap_or_default())
1241 }
1242
1243 pub async fn get_firewall_policy_ordering(
1245 &self,
1246 ) -> Result<crate::integration_types::FirewallPolicyOrdering, CoreError> {
1247 let guard = self.inner.integration_client.lock().await;
1248 let site_id = *self.inner.site_id.lock().await;
1249 let (ic, sid) = require_integration(&guard, site_id, "get_firewall_policy_ordering")?;
1250 Ok(ic.get_firewall_policy_ordering(&sid).await?)
1251 }
1252
1253 pub async fn list_pending_devices(&self) -> Result<Vec<serde_json::Value>, CoreError> {
1258 let integration_guard = self.inner.integration_client.lock().await;
1259 let site_id = *self.inner.site_id.lock().await;
1260
1261 if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1262 let raw = ic
1263 .paginate_all(200, |off, lim| ic.list_pending_devices(&sid, off, lim))
1264 .await?;
1265 return Ok(raw
1266 .into_iter()
1267 .map(|v| serde_json::to_value(v).unwrap_or_default())
1268 .collect());
1269 }
1270
1271 let snapshot = self.devices_snapshot();
1272 Ok(snapshot
1273 .iter()
1274 .filter(|d| d.state == crate::model::DeviceState::PendingAdoption)
1275 .map(|d| serde_json::to_value(d.as_ref()).unwrap_or_default())
1276 .collect())
1277 }
1278
1279 pub async fn list_device_tags(&self) -> Result<Vec<serde_json::Value>, CoreError> {
1283 let integration_guard = self.inner.integration_client.lock().await;
1284 let site_id = *self.inner.site_id.lock().await;
1285 if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1286 let raw = ic
1287 .paginate_all(200, |off, lim| ic.list_device_tags(&sid, off, lim))
1288 .await?;
1289 return Ok(raw
1290 .into_iter()
1291 .map(|v| serde_json::to_value(v).unwrap_or_default())
1292 .collect());
1293 }
1294
1295 Ok(Vec::new())
1296 }
1297
1298 pub async fn list_backups(&self) -> Result<Vec<serde_json::Value>, CoreError> {
1300 let guard = self.inner.legacy_client.lock().await;
1301 let legacy = require_legacy(&guard)?;
1302 Ok(legacy.list_backups().await?)
1303 }
1304
1305 pub async fn download_backup(&self, filename: &str) -> Result<Vec<u8>, CoreError> {
1307 let guard = self.inner.legacy_client.lock().await;
1308 let legacy = require_legacy(&guard)?;
1309 Ok(legacy.download_backup(filename).await?)
1310 }
1311
1312 pub async fn get_site_stats(
1316 &self,
1317 interval: &str,
1318 start: Option<i64>,
1319 end: Option<i64>,
1320 attrs: Option<&[String]>,
1321 ) -> Result<Vec<serde_json::Value>, CoreError> {
1322 let guard = self.inner.legacy_client.lock().await;
1323 let legacy = require_legacy(&guard)?;
1324 Ok(legacy.get_site_stats(interval, start, end, attrs).await?)
1325 }
1326
1327 pub async fn get_device_stats(
1329 &self,
1330 interval: &str,
1331 macs: Option<&[String]>,
1332 attrs: Option<&[String]>,
1333 ) -> Result<Vec<serde_json::Value>, CoreError> {
1334 let guard = self.inner.legacy_client.lock().await;
1335 let legacy = require_legacy(&guard)?;
1336 Ok(legacy.get_device_stats(interval, macs, attrs).await?)
1337 }
1338
1339 pub async fn get_client_stats(
1341 &self,
1342 interval: &str,
1343 macs: Option<&[String]>,
1344 attrs: Option<&[String]>,
1345 ) -> Result<Vec<serde_json::Value>, CoreError> {
1346 let guard = self.inner.legacy_client.lock().await;
1347 let legacy = require_legacy(&guard)?;
1348 Ok(legacy.get_client_stats(interval, macs, attrs).await?)
1349 }
1350
1351 pub async fn get_gateway_stats(
1353 &self,
1354 interval: &str,
1355 start: Option<i64>,
1356 end: Option<i64>,
1357 attrs: Option<&[String]>,
1358 ) -> Result<Vec<serde_json::Value>, CoreError> {
1359 let guard = self.inner.legacy_client.lock().await;
1360 let legacy = require_legacy(&guard)?;
1361 Ok(legacy
1362 .get_gateway_stats(interval, start, end, attrs)
1363 .await?)
1364 }
1365
1366 pub async fn get_dpi_stats(
1368 &self,
1369 group_by: &str,
1370 macs: Option<&[String]>,
1371 ) -> Result<Vec<serde_json::Value>, CoreError> {
1372 let guard = self.inner.legacy_client.lock().await;
1373 let legacy = require_legacy(&guard)?;
1374 Ok(legacy.get_dpi_stats(group_by, macs).await?)
1375 }
1376
1377 pub async fn list_admins(&self) -> Result<Vec<Admin>, CoreError> {
1383 let guard = self.inner.legacy_client.lock().await;
1384 let legacy = require_legacy(&guard)?;
1385 let raw = legacy.list_admins().await?;
1386 Ok(raw
1387 .into_iter()
1388 .map(|v| Admin {
1389 id: v.get("_id").and_then(|v| v.as_str()).map_or_else(
1390 || EntityId::Legacy("unknown".into()),
1391 |s| EntityId::Legacy(s.into()),
1392 ),
1393 name: v
1394 .get("name")
1395 .and_then(|v| v.as_str())
1396 .unwrap_or("")
1397 .to_owned(),
1398 email: v.get("email").and_then(|v| v.as_str()).map(String::from),
1399 role: v
1400 .get("role")
1401 .and_then(|v| v.as_str())
1402 .unwrap_or("unknown")
1403 .to_owned(),
1404 is_super: v
1405 .get("is_super")
1406 .and_then(serde_json::Value::as_bool)
1407 .unwrap_or(false),
1408 last_login: None,
1409 })
1410 .collect())
1411 }
1412
1413 pub async fn list_alarms(&self) -> Result<Vec<Alarm>, CoreError> {
1415 let guard = self.inner.legacy_client.lock().await;
1416 let legacy = require_legacy(&guard)?;
1417 let raw = legacy.list_alarms().await?;
1418 Ok(raw.into_iter().map(Alarm::from).collect())
1419 }
1420
1421 pub async fn get_system_info(&self) -> Result<SystemInfo, CoreError> {
1426 {
1428 let guard = self.inner.integration_client.lock().await;
1429 if let Some(ic) = guard.as_ref() {
1430 let info = ic.get_info().await?;
1431 let f = &info.fields;
1432 return Ok(SystemInfo {
1433 controller_name: f
1434 .get("applicationName")
1435 .or_else(|| f.get("name"))
1436 .and_then(|v| v.as_str())
1437 .map(String::from),
1438 version: f
1439 .get("applicationVersion")
1440 .or_else(|| f.get("version"))
1441 .and_then(|v| v.as_str())
1442 .unwrap_or("unknown")
1443 .to_owned(),
1444 build: f.get("build").and_then(|v| v.as_str()).map(String::from),
1445 hostname: f.get("hostname").and_then(|v| v.as_str()).map(String::from),
1446 ip: None, uptime_secs: f.get("uptime").and_then(serde_json::Value::as_u64),
1448 update_available: f
1449 .get("isUpdateAvailable")
1450 .or_else(|| f.get("update_available"))
1451 .and_then(serde_json::Value::as_bool),
1452 });
1453 }
1454 }
1455
1456 let guard = self.inner.legacy_client.lock().await;
1458 let legacy = require_legacy(&guard)?;
1459 let raw = legacy.get_sysinfo().await?;
1460 Ok(SystemInfo {
1461 controller_name: raw
1462 .get("controller_name")
1463 .or_else(|| raw.get("name"))
1464 .and_then(|v| v.as_str())
1465 .map(String::from),
1466 version: raw
1467 .get("version")
1468 .and_then(|v| v.as_str())
1469 .unwrap_or("unknown")
1470 .to_owned(),
1471 build: raw.get("build").and_then(|v| v.as_str()).map(String::from),
1472 hostname: raw
1473 .get("hostname")
1474 .and_then(|v| v.as_str())
1475 .map(String::from),
1476 ip: raw
1477 .get("ip_addrs")
1478 .and_then(|v| v.as_array())
1479 .and_then(|a| a.first())
1480 .and_then(|v| v.as_str())
1481 .and_then(|s| s.parse().ok()),
1482 uptime_secs: raw.get("uptime").and_then(serde_json::Value::as_u64),
1483 update_available: raw
1484 .get("update_available")
1485 .and_then(serde_json::Value::as_bool),
1486 })
1487 }
1488
1489 pub async fn get_site_health(&self) -> Result<Vec<HealthSummary>, CoreError> {
1491 let guard = self.inner.legacy_client.lock().await;
1492 let legacy = require_legacy(&guard)?;
1493 let raw = legacy.get_health().await?;
1494 Ok(convert_health_summaries(raw))
1495 }
1496
1497 pub async fn get_sysinfo(&self) -> Result<SysInfo, CoreError> {
1499 let guard = self.inner.legacy_client.lock().await;
1500 let legacy = require_legacy(&guard)?;
1501 let raw = legacy.get_sysinfo().await?;
1502 Ok(SysInfo {
1503 timezone: raw
1504 .get("timezone")
1505 .and_then(|v| v.as_str())
1506 .map(String::from),
1507 autobackup: raw.get("autobackup").and_then(serde_json::Value::as_bool),
1508 hostname: raw
1509 .get("hostname")
1510 .and_then(|v| v.as_str())
1511 .map(String::from),
1512 ip_addrs: raw
1513 .get("ip_addrs")
1514 .and_then(|v| v.as_array())
1515 .map(|a| {
1516 a.iter()
1517 .filter_map(|v| v.as_str().map(String::from))
1518 .collect()
1519 })
1520 .unwrap_or_default(),
1521 live_chat: raw
1522 .get("live_chat")
1523 .and_then(|v| v.as_str())
1524 .map(String::from),
1525 #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1526 data_retention_days: raw
1527 .get("data_retention_days")
1528 .and_then(serde_json::Value::as_u64)
1529 .map(|n| n as u32),
1530 extra: raw,
1531 })
1532 }
1533}
1534
1535fn parse_f64_field(parent: Option<&serde_json::Value>, key: &str) -> Option<f64> {
1539 parent.and_then(|s| s.get(key)).and_then(|v| {
1540 v.as_str()
1541 .and_then(|s| s.parse().ok())
1542 .or_else(|| v.as_f64())
1543 })
1544}
1545
1546#[allow(clippy::cast_precision_loss)]
1552fn apply_device_sync(store: &DataStore, data: &serde_json::Value) {
1553 let Some(mac_str) = data.get("mac").and_then(serde_json::Value::as_str) else {
1554 return;
1555 };
1556 let mac = MacAddress::new(mac_str);
1557 let Some(existing) = store.device_by_mac(&mac) else {
1558 return; };
1560
1561 let sys = data.get("sys_stats");
1563 let cpu = sys
1564 .and_then(|s| s.get("cpu"))
1565 .and_then(|v| v.as_str().or_else(|| v.as_f64().map(|_| "")))
1566 .and_then(|s| {
1567 if s.is_empty() {
1568 None
1569 } else {
1570 s.parse::<f64>().ok()
1571 }
1572 })
1573 .or_else(|| {
1574 sys.and_then(|s| s.get("cpu"))
1575 .and_then(serde_json::Value::as_f64)
1576 });
1577 #[allow(clippy::as_conversions, clippy::cast_precision_loss)]
1578 let mem_pct = match (
1579 sys.and_then(|s| s.get("mem_used"))
1580 .and_then(serde_json::Value::as_i64),
1581 sys.and_then(|s| s.get("mem_total"))
1582 .and_then(serde_json::Value::as_i64),
1583 ) {
1584 (Some(used), Some(total)) if total > 0 => Some((used as f64 / total as f64) * 100.0),
1585 _ => None,
1586 };
1587 let load_averages: [Option<f64>; 3] =
1588 ["loadavg_1", "loadavg_5", "loadavg_15"].map(|key| parse_f64_field(sys, key));
1589
1590 let uplink = data.get("uplink");
1592 let tx_bps = uplink
1593 .and_then(|u| u.get("tx_bytes-r").or_else(|| u.get("tx_bytes_r")))
1594 .and_then(serde_json::Value::as_u64)
1595 .or_else(|| data.get("tx_bytes-r").and_then(serde_json::Value::as_u64));
1596 let rx_bps = uplink
1597 .and_then(|u| u.get("rx_bytes-r").or_else(|| u.get("rx_bytes_r")))
1598 .and_then(serde_json::Value::as_u64)
1599 .or_else(|| data.get("rx_bytes-r").and_then(serde_json::Value::as_u64));
1600
1601 let bandwidth = match (tx_bps, rx_bps) {
1602 (Some(tx), Some(rx)) if tx > 0 || rx > 0 => Some(crate::model::common::Bandwidth {
1603 tx_bytes_per_sec: tx,
1604 rx_bytes_per_sec: rx,
1605 }),
1606 _ => existing.stats.uplink_bandwidth, };
1608
1609 let uptime = data
1611 .get("_uptime")
1612 .or_else(|| data.get("uptime"))
1613 .and_then(serde_json::Value::as_i64)
1614 .and_then(|u| u.try_into().ok())
1615 .or(existing.stats.uptime_secs);
1616
1617 let mut device = (*existing).clone();
1619 device.stats.uplink_bandwidth = bandwidth;
1620 if let Some(c) = cpu {
1621 device.stats.cpu_utilization_pct = Some(c);
1622 }
1623 if let Some(m) = mem_pct {
1624 device.stats.memory_utilization_pct = Some(m);
1625 }
1626 if let Some(l) = load_averages[0] {
1627 device.stats.load_average_1m = Some(l);
1628 }
1629 if let Some(l) = load_averages[1] {
1630 device.stats.load_average_5m = Some(l);
1631 }
1632 if let Some(l) = load_averages[2] {
1633 device.stats.load_average_15m = Some(l);
1634 }
1635 device.stats.uptime_secs = uptime;
1636
1637 if let Some(num_sta) = data.get("num_sta").and_then(serde_json::Value::as_u64) {
1639 #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1640 {
1641 device.client_count = Some(num_sta as u32);
1642 }
1643 }
1644
1645 if let Some(obj) = data.as_object()
1646 && let Some(wan_ipv6) = parse_legacy_device_wan_ipv6(obj)
1647 {
1648 device.wan_ipv6 = Some(wan_ipv6);
1649 }
1650
1651 let key = mac.as_str().to_owned();
1652 let id = device.id.clone();
1653 store.devices.upsert(key, id, device);
1654}
1655
1656async fn refresh_task(controller: Controller, interval_secs: u64, cancel: CancellationToken) {
1658 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
1659 interval.tick().await; loop {
1662 tokio::select! {
1663 biased;
1664 () = cancel.cancelled() => break,
1665 _ = interval.tick() => {
1666 if let Err(e) = controller.full_refresh().await {
1667 warn!(error = %e, "periodic refresh failed");
1668 }
1669 }
1670 }
1671 }
1672}
1673
1674async fn command_processor_task(controller: Controller, mut rx: mpsc::Receiver<CommandEnvelope>) {
1677 let cancel = controller.inner.cancel_child.lock().await.clone();
1678
1679 loop {
1680 tokio::select! {
1681 biased;
1682 () = cancel.cancelled() => break,
1683 envelope = rx.recv() => {
1684 let Some(envelope) = envelope else { break };
1685 let result = route_command(&controller, envelope.command).await;
1686 let _ = envelope.response_tx.send(result);
1687 }
1688 }
1689 }
1690}
1691
1692#[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
1699async fn route_command(controller: &Controller, cmd: Command) -> Result<CommandResult, CoreError> {
1700 let store = &controller.inner.store;
1701
1702 let integration_guard = controller.inner.integration_client.lock().await;
1704 let legacy_guard = controller.inner.legacy_client.lock().await;
1705 let site_id = *controller.inner.site_id.lock().await;
1706
1707 match cmd {
1708 Command::AdoptDevice {
1710 mac,
1711 ignore_device_limit,
1712 } => {
1713 if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1714 ic.adopt_device(&sid, mac.as_str(), ignore_device_limit)
1715 .await?;
1716 } else {
1717 let legacy = require_legacy(&legacy_guard)?;
1718 legacy.adopt_device(mac.as_str()).await?;
1719 }
1720 Ok(CommandResult::Ok)
1721 }
1722
1723 Command::RestartDevice { id } => {
1724 if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1725 let device_uuid = require_uuid(&id)?;
1726 ic.device_action(&sid, &device_uuid, "RESTART").await?;
1727 } else {
1728 let legacy = require_legacy(&legacy_guard)?;
1729 let mac = device_mac(store, &id)?;
1730 legacy.restart_device(mac.as_str()).await?;
1731 }
1732 Ok(CommandResult::Ok)
1733 }
1734
1735 Command::LocateDevice { mac, enable } => {
1736 if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1737 let device =
1738 store
1739 .device_by_mac(&mac)
1740 .ok_or_else(|| CoreError::DeviceNotFound {
1741 identifier: mac.to_string(),
1742 })?;
1743 let device_uuid = require_uuid(&device.id)?;
1744 let action = if enable { "LOCATE_ON" } else { "LOCATE_OFF" };
1745 ic.device_action(&sid, &device_uuid, action).await?;
1746 } else {
1747 let legacy = require_legacy(&legacy_guard)?;
1748 legacy.locate_device(mac.as_str(), enable).await?;
1749 }
1750 Ok(CommandResult::Ok)
1751 }
1752
1753 Command::UpgradeDevice { mac, firmware_url } => {
1754 let legacy = require_legacy(&legacy_guard)?;
1755 legacy
1756 .upgrade_device(mac.as_str(), firmware_url.as_deref())
1757 .await?;
1758 Ok(CommandResult::Ok)
1759 }
1760
1761 Command::RemoveDevice { id } => {
1762 let (ic, sid) = require_integration(&integration_guard, site_id, "RemoveDevice")?;
1763 let device_uuid = require_uuid(&id)?;
1764 ic.remove_device(&sid, &device_uuid).await?;
1765 Ok(CommandResult::Ok)
1766 }
1767
1768 Command::ProvisionDevice { mac } => {
1769 let legacy = require_legacy(&legacy_guard)?;
1770 legacy.provision_device(mac.as_str()).await?;
1771 Ok(CommandResult::Ok)
1772 }
1773 Command::SpeedtestDevice => {
1774 let legacy = require_legacy(&legacy_guard)?;
1775 legacy.speedtest().await?;
1776 Ok(CommandResult::Ok)
1777 }
1778
1779 Command::PowerCyclePort {
1780 device_id,
1781 port_idx,
1782 } => {
1783 let (ic, sid) = require_integration(&integration_guard, site_id, "PowerCyclePort")?;
1784 let device_uuid = require_uuid(&device_id)?;
1785 ic.port_action(&sid, &device_uuid, port_idx, "POWER_CYCLE")
1786 .await?;
1787 Ok(CommandResult::Ok)
1788 }
1789
1790 Command::BlockClient { mac } => {
1792 if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1793 let client =
1794 store
1795 .client_by_mac(&mac)
1796 .ok_or_else(|| CoreError::ClientNotFound {
1797 identifier: mac.to_string(),
1798 })?;
1799 let client_uuid = require_uuid(&client.id)?;
1800 ic.client_action(&sid, &client_uuid, "BLOCK").await?;
1801 } else {
1802 let legacy = require_legacy(&legacy_guard)?;
1803 legacy.block_client(mac.as_str()).await?;
1804 }
1805 Ok(CommandResult::Ok)
1806 }
1807
1808 Command::UnblockClient { mac } => {
1809 if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1810 let client =
1811 store
1812 .client_by_mac(&mac)
1813 .ok_or_else(|| CoreError::ClientNotFound {
1814 identifier: mac.to_string(),
1815 })?;
1816 let client_uuid = require_uuid(&client.id)?;
1817 ic.client_action(&sid, &client_uuid, "UNBLOCK").await?;
1818 } else {
1819 let legacy = require_legacy(&legacy_guard)?;
1820 legacy.unblock_client(mac.as_str()).await?;
1821 }
1822 Ok(CommandResult::Ok)
1823 }
1824
1825 Command::KickClient { mac } => {
1826 if let (Some(ic), Some(sid)) = (integration_guard.as_ref(), site_id) {
1827 let client =
1828 store
1829 .client_by_mac(&mac)
1830 .ok_or_else(|| CoreError::ClientNotFound {
1831 identifier: mac.to_string(),
1832 })?;
1833 let client_uuid = require_uuid(&client.id)?;
1834 ic.client_action(&sid, &client_uuid, "RECONNECT").await?;
1835 } else {
1836 let legacy = require_legacy(&legacy_guard)?;
1837 legacy.kick_client(mac.as_str()).await?;
1838 }
1839 Ok(CommandResult::Ok)
1840 }
1841
1842 Command::ForgetClient { mac } => {
1843 let legacy = require_legacy(&legacy_guard)?;
1844 legacy.forget_client(mac.as_str()).await?;
1845 Ok(CommandResult::Ok)
1846 }
1847
1848 Command::AuthorizeGuest {
1849 client_id,
1850 time_limit_minutes,
1851 data_limit_mb,
1852 rx_rate_kbps,
1853 tx_rate_kbps,
1854 } => {
1855 let legacy = require_legacy(&legacy_guard)?;
1856 let mac = client_mac(store, &client_id)?;
1857 let minutes = time_limit_minutes.unwrap_or(60);
1858 #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
1859 {
1860 legacy
1861 .authorize_guest(
1862 mac.as_str(),
1863 minutes,
1864 tx_rate_kbps.map(|r| r as u32),
1865 rx_rate_kbps.map(|r| r as u32),
1866 data_limit_mb.map(|m| m as u32),
1867 )
1868 .await?;
1869 }
1870 Ok(CommandResult::Ok)
1871 }
1872
1873 Command::UnauthorizeGuest { client_id } => {
1874 let legacy = require_legacy(&legacy_guard)?;
1875 let mac = client_mac(store, &client_id)?;
1876 legacy.unauthorize_guest(mac.as_str()).await?;
1877 Ok(CommandResult::Ok)
1878 }
1879
1880 Command::SetClientFixedIp {
1881 mac,
1882 ip,
1883 network_id,
1884 } => {
1885 let legacy = require_legacy(&legacy_guard)?;
1886 legacy
1887 .set_client_fixed_ip(mac.as_str(), &ip.to_string(), &network_id.to_string())
1888 .await?;
1889 Ok(CommandResult::Ok)
1890 }
1891
1892 Command::RemoveClientFixedIp { mac } => {
1893 let legacy = require_legacy(&legacy_guard)?;
1894 legacy.remove_client_fixed_ip(mac.as_str()).await?;
1895 Ok(CommandResult::Ok)
1896 }
1897
1898 Command::ArchiveAlarm { id } => {
1900 let legacy = require_legacy(&legacy_guard)?;
1901 legacy.archive_alarm(&id.to_string()).await?;
1902 Ok(CommandResult::Ok)
1903 }
1904
1905 Command::ArchiveAllAlarms => {
1906 let legacy = require_legacy(&legacy_guard)?;
1907 legacy.archive_all_alarms().await?;
1908 Ok(CommandResult::Ok)
1909 }
1910
1911 Command::CreateBackup => {
1913 let legacy = require_legacy(&legacy_guard)?;
1914 legacy.create_backup().await?;
1915 Ok(CommandResult::Ok)
1916 }
1917
1918 Command::DeleteBackup { filename } => {
1919 let legacy = require_legacy(&legacy_guard)?;
1920 legacy.delete_backup(&filename).await?;
1921 Ok(CommandResult::Ok)
1922 }
1923
1924 Command::CreateNetwork(req) => {
1926 let (ic, sid) = require_integration(&integration_guard, site_id, "CreateNetwork")?;
1927 let crate::command::CreateNetworkRequest {
1928 name,
1929 vlan_id,
1930 subnet,
1931 management,
1932 purpose,
1933 dhcp_enabled,
1934 enabled,
1935 dhcp_range_start,
1936 dhcp_range_stop,
1937 dhcp_lease_time,
1938 firewall_zone_id,
1939 isolation_enabled,
1940 internet_access_enabled,
1941 } = req;
1942
1943 let management = management.unwrap_or_else(|| {
1944 if matches!(purpose, Some(NetworkPurpose::VlanOnly)) {
1945 NetworkManagement::Unmanaged
1946 } else if purpose.is_some() || subnet.is_some() || dhcp_enabled {
1947 NetworkManagement::Gateway
1948 } else {
1949 NetworkManagement::Unmanaged
1950 }
1951 });
1952 let mut extra = HashMap::new();
1953
1954 if let Some(zone) = firewall_zone_id {
1955 extra.insert("zoneId".into(), serde_json::Value::String(zone));
1956 }
1957
1958 if matches!(management, NetworkManagement::Gateway) {
1959 extra.insert(
1960 "isolationEnabled".into(),
1961 serde_json::Value::Bool(isolation_enabled),
1962 );
1963 extra.insert(
1964 "internetAccessEnabled".into(),
1965 serde_json::Value::Bool(internet_access_enabled),
1966 );
1967
1968 if let Some(cidr) = subnet {
1969 let (host_ip, prefix_len) = parse_ipv4_cidr(&cidr)?;
1970 let mut dhcp_cfg = serde_json::Map::new();
1971 dhcp_cfg.insert(
1972 "mode".into(),
1973 serde_json::Value::String(
1974 if dhcp_enabled { "SERVER" } else { "NONE" }.into(),
1975 ),
1976 );
1977 if let Some(lease) = dhcp_lease_time {
1978 dhcp_cfg.insert(
1979 "leaseTimeSeconds".into(),
1980 serde_json::Value::Number(serde_json::Number::from(u64::from(lease))),
1981 );
1982 }
1983
1984 if let (Some(start), Some(stop)) = (dhcp_range_start, dhcp_range_stop) {
1985 dhcp_cfg.insert(
1986 "ipAddressRange".into(),
1987 serde_json::json!({
1988 "start": start,
1989 "end": stop
1990 }),
1991 );
1992 }
1993
1994 extra.insert(
1995 "ipv4Configuration".into(),
1996 serde_json::json!({
1997 "hostIpAddress": host_ip.to_string(),
1998 "prefixLength": u64::from(prefix_len),
1999 "dhcpConfiguration": dhcp_cfg
2000 }),
2001 );
2002 }
2003 }
2004
2005 let body = crate::integration_types::NetworkCreateUpdate {
2006 name,
2007 enabled,
2008 management: "USER_DEFINED".into(),
2009 vlan_id: vlan_id.map_or(1, i32::from),
2010 dhcp_guarding: None,
2011 extra,
2012 };
2013 ic.create_network(&sid, &body).await?;
2014 Ok(CommandResult::Ok)
2015 }
2016
2017 Command::UpdateNetwork { id, update } => {
2018 let (ic, sid) = require_integration(&integration_guard, site_id, "UpdateNetwork")?;
2019 let uuid = require_uuid(&id)?;
2020 let existing = ic.get_network(&sid, &uuid).await?;
2022 let mut extra = existing.extra;
2024 if let Some(v) = update.isolation_enabled {
2025 extra.insert("isolationEnabled".into(), serde_json::Value::Bool(v));
2026 }
2027 if let Some(v) = update.internet_access_enabled {
2028 extra.insert("internetAccessEnabled".into(), serde_json::Value::Bool(v));
2029 }
2030 if let Some(v) = update.mdns_forwarding_enabled {
2031 extra.insert("mdnsForwardingEnabled".into(), serde_json::Value::Bool(v));
2032 }
2033 if let Some(v) = update.ipv6_enabled {
2034 if v {
2035 extra
2037 .entry("ipv6Configuration".into())
2038 .or_insert_with(|| serde_json::json!({ "type": "PREFIX_DELEGATION" }));
2039 } else {
2040 extra.remove("ipv6Configuration");
2041 }
2042 }
2043 let body = crate::integration_types::NetworkCreateUpdate {
2044 name: update.name.unwrap_or(existing.name),
2045 enabled: update.enabled.unwrap_or(existing.enabled),
2046 management: existing.management,
2047 vlan_id: update.vlan_id.map_or(existing.vlan_id, i32::from),
2048 dhcp_guarding: existing.dhcp_guarding,
2049 extra,
2050 };
2051 ic.update_network(&sid, &uuid, &body).await?;
2052 Ok(CommandResult::Ok)
2053 }
2054
2055 Command::DeleteNetwork { id, force: _ } => {
2056 let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteNetwork")?;
2057 let uuid = require_uuid(&id)?;
2058 ic.delete_network(&sid, &uuid).await?;
2059 Ok(CommandResult::Ok)
2060 }
2061
2062 Command::CreateWifiBroadcast(req) => {
2064 let (ic, sid) =
2065 require_integration(&integration_guard, site_id, "CreateWifiBroadcast")?;
2066 let body = build_create_wifi_broadcast_payload(&req);
2067 ic.create_wifi_broadcast(&sid, &body).await?;
2068 Ok(CommandResult::Ok)
2069 }
2070
2071 Command::UpdateWifiBroadcast { id, update } => {
2072 let (ic, sid) =
2073 require_integration(&integration_guard, site_id, "UpdateWifiBroadcast")?;
2074 let uuid = require_uuid(&id)?;
2075 let existing = ic.get_wifi_broadcast(&sid, &uuid).await?;
2076 let payload = build_update_wifi_broadcast_payload(&existing, &update);
2077 ic.update_wifi_broadcast(&sid, &uuid, &payload).await?;
2078 Ok(CommandResult::Ok)
2079 }
2080
2081 Command::DeleteWifiBroadcast { id, force: _ } => {
2082 let (ic, sid) =
2083 require_integration(&integration_guard, site_id, "DeleteWifiBroadcast")?;
2084 let uuid = require_uuid(&id)?;
2085 ic.delete_wifi_broadcast(&sid, &uuid).await?;
2086 Ok(CommandResult::Ok)
2087 }
2088
2089 Command::CreateFirewallPolicy(req) => {
2091 let (ic, sid) =
2092 require_integration(&integration_guard, site_id, "CreateFirewallPolicy")?;
2093 let action_str = match req.action {
2094 FirewallAction::Allow => "ALLOW",
2095 FirewallAction::Block => "DROP",
2096 FirewallAction::Reject => "REJECT",
2097 };
2098 let source =
2099 build_endpoint_json(&req.source_zone_id.to_string(), req.source_filter.as_ref());
2100 let destination = build_endpoint_json(
2101 &req.destination_zone_id.to_string(),
2102 req.destination_filter.as_ref(),
2103 );
2104 let ip_version = req.ip_version.as_deref().unwrap_or("IPV4_AND_IPV6");
2105 let body = crate::integration_types::FirewallPolicyCreateUpdate {
2106 name: req.name,
2107 description: req.description,
2108 enabled: req.enabled,
2109 action: serde_json::json!({ "type": action_str }),
2110 source,
2111 destination,
2112 ip_protocol_scope: serde_json::json!({ "ipVersion": ip_version }),
2113 logging_enabled: req.logging_enabled,
2114 ipsec_filter: None,
2115 schedule: None,
2116 connection_state_filter: req.connection_states,
2117 };
2118 ic.create_firewall_policy(&sid, &body).await?;
2119 Ok(CommandResult::Ok)
2120 }
2121
2122 Command::UpdateFirewallPolicy { id, update } => {
2123 let (ic, sid) =
2124 require_integration(&integration_guard, site_id, "UpdateFirewallPolicy")?;
2125 let uuid = require_uuid(&id)?;
2126 let existing = ic.get_firewall_policy(&sid, &uuid).await?;
2127
2128 let source = if let Some(ref spec) = update.source_filter {
2130 let zone_id = existing
2131 .source
2132 .as_ref()
2133 .and_then(|s| s.zone_id)
2134 .map(|u| u.to_string())
2135 .unwrap_or_default();
2136 build_endpoint_json(&zone_id, Some(spec))
2137 } else {
2138 serde_json::to_value(&existing.source).unwrap_or_default()
2139 };
2140
2141 let destination = if let Some(ref spec) = update.destination_filter {
2143 let zone_id = existing
2144 .destination
2145 .as_ref()
2146 .and_then(|d| d.zone_id)
2147 .map(|u| u.to_string())
2148 .unwrap_or_default();
2149 build_endpoint_json(&zone_id, Some(spec))
2150 } else {
2151 serde_json::to_value(&existing.destination).unwrap_or_default()
2152 };
2153
2154 let action = if let Some(action) = update.action {
2155 let action_type = match action {
2156 FirewallAction::Allow => "ALLOW",
2157 FirewallAction::Block => "DROP",
2158 FirewallAction::Reject => "REJECT",
2159 };
2160 serde_json::json!({ "type": action_type })
2161 } else {
2162 existing.action
2163 };
2164
2165 let ip_protocol_scope = if let Some(ref version) = update.ip_version {
2166 serde_json::json!({ "ipVersion": version })
2167 } else {
2168 existing
2169 .ip_protocol_scope
2170 .unwrap_or_else(|| serde_json::json!({ "ipVersion": "IPV4_AND_IPV6" }))
2171 };
2172
2173 let connection_state_filter = update.connection_states.or_else(|| {
2174 existing
2175 .extra
2176 .get("connectionStateFilter")
2177 .and_then(serde_json::Value::as_array)
2178 .map(|arr| {
2179 arr.iter()
2180 .filter_map(|v| v.as_str().map(ToOwned::to_owned))
2181 .collect::<Vec<_>>()
2182 })
2183 });
2184
2185 let payload = crate::integration_types::FirewallPolicyCreateUpdate {
2186 name: update.name.unwrap_or(existing.name),
2187 description: update.description.or(existing.description),
2188 enabled: update.enabled.unwrap_or(existing.enabled),
2189 action,
2190 source,
2191 destination,
2192 ip_protocol_scope,
2193 logging_enabled: update.logging_enabled.unwrap_or(existing.logging_enabled),
2194 ipsec_filter: existing
2195 .extra
2196 .get("ipsecFilter")
2197 .and_then(serde_json::Value::as_str)
2198 .map(ToOwned::to_owned),
2199 schedule: existing.extra.get("schedule").cloned(),
2200 connection_state_filter,
2201 };
2202
2203 ic.update_firewall_policy(&sid, &uuid, &payload).await?;
2204 Ok(CommandResult::Ok)
2205 }
2206
2207 Command::DeleteFirewallPolicy { id } => {
2208 let (ic, sid) =
2209 require_integration(&integration_guard, site_id, "DeleteFirewallPolicy")?;
2210 let uuid = require_uuid(&id)?;
2211 ic.delete_firewall_policy(&sid, &uuid).await?;
2212 Ok(CommandResult::Ok)
2213 }
2214
2215 Command::PatchFirewallPolicy {
2216 id,
2217 enabled,
2218 logging,
2219 } => {
2220 let (ic, sid) =
2221 require_integration(&integration_guard, site_id, "PatchFirewallPolicy")?;
2222 let uuid = require_uuid(&id)?;
2223 let body = crate::integration_types::FirewallPolicyPatch {
2224 enabled,
2225 logging_enabled: logging,
2226 };
2227 ic.patch_firewall_policy(&sid, &uuid, &body).await?;
2228 Ok(CommandResult::Ok)
2229 }
2230
2231 Command::ReorderFirewallPolicies {
2232 zone_pair: _,
2233 ordered_ids,
2234 } => {
2235 let (ic, sid) =
2236 require_integration(&integration_guard, site_id, "ReorderFirewallPolicies")?;
2237 let uuids: Result<Vec<uuid::Uuid>, _> = ordered_ids.iter().map(require_uuid).collect();
2238 let body = crate::integration_types::FirewallPolicyOrdering {
2239 before_system_defined: uuids?,
2240 after_system_defined: Vec::new(),
2241 };
2242 ic.set_firewall_policy_ordering(&sid, &body).await?;
2243 Ok(CommandResult::Ok)
2244 }
2245
2246 Command::CreateFirewallZone(req) => {
2248 let (ic, sid) = require_integration(&integration_guard, site_id, "CreateFirewallZone")?;
2249 let network_uuids: Result<Vec<uuid::Uuid>, _> =
2250 req.network_ids.iter().map(require_uuid).collect();
2251 let body = crate::integration_types::FirewallZoneCreateUpdate {
2252 name: req.name,
2253 network_ids: network_uuids?,
2254 };
2255 ic.create_firewall_zone(&sid, &body).await?;
2256 Ok(CommandResult::Ok)
2257 }
2258
2259 Command::UpdateFirewallZone { id, update } => {
2260 let (ic, sid) = require_integration(&integration_guard, site_id, "UpdateFirewallZone")?;
2261 let uuid = require_uuid(&id)?;
2262 let existing = ic.get_firewall_zone(&sid, &uuid).await?;
2263 let network_ids = if let Some(ids) = update.network_ids {
2264 let uuids: Result<Vec<uuid::Uuid>, _> = ids.iter().map(require_uuid).collect();
2265 uuids?
2266 } else {
2267 existing.network_ids
2268 };
2269 let body = crate::integration_types::FirewallZoneCreateUpdate {
2270 name: update.name.unwrap_or(existing.name),
2271 network_ids,
2272 };
2273 ic.update_firewall_zone(&sid, &uuid, &body).await?;
2274 Ok(CommandResult::Ok)
2275 }
2276
2277 Command::DeleteFirewallZone { id } => {
2278 let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteFirewallZone")?;
2279 let uuid = require_uuid(&id)?;
2280 ic.delete_firewall_zone(&sid, &uuid).await?;
2281 Ok(CommandResult::Ok)
2282 }
2283
2284 Command::CreateAclRule(req) => {
2286 let (ic, sid) = require_integration(&integration_guard, site_id, "CreateAclRule")?;
2287 let action_str = match req.action {
2288 FirewallAction::Allow => "ALLOW",
2289 FirewallAction::Block => "BLOCK",
2290 FirewallAction::Reject => "REJECT",
2291 };
2292 let mut source_filter = serde_json::Map::new();
2293 source_filter.insert(
2294 "zoneId".into(),
2295 serde_json::Value::String(req.source_zone_id.to_string()),
2296 );
2297 if let Some(source_port) = req.source_port {
2298 source_filter.insert("port".into(), serde_json::Value::String(source_port));
2299 }
2300 if let Some(protocol) = req.protocol.clone() {
2301 source_filter.insert("protocol".into(), serde_json::Value::String(protocol));
2302 }
2303
2304 let mut destination_filter = serde_json::Map::new();
2305 destination_filter.insert(
2306 "zoneId".into(),
2307 serde_json::Value::String(req.destination_zone_id.to_string()),
2308 );
2309 if let Some(destination_port) = req.destination_port {
2310 destination_filter
2311 .insert("port".into(), serde_json::Value::String(destination_port));
2312 }
2313 if let Some(protocol) = req.protocol {
2314 destination_filter.insert("protocol".into(), serde_json::Value::String(protocol));
2315 }
2316 let body = crate::integration_types::AclRuleCreateUpdate {
2317 name: req.name,
2318 rule_type: req.rule_type,
2319 action: action_str.into(),
2320 enabled: req.enabled,
2321 description: None,
2322 source_filter: Some(serde_json::Value::Object(source_filter)),
2323 destination_filter: Some(serde_json::Value::Object(destination_filter)),
2324 enforcing_device_filter: None,
2325 };
2326 ic.create_acl_rule(&sid, &body).await?;
2327 Ok(CommandResult::Ok)
2328 }
2329
2330 Command::UpdateAclRule { id, update } => {
2331 let (ic, sid) = require_integration(&integration_guard, site_id, "UpdateAclRule")?;
2332 let uuid = require_uuid(&id)?;
2333 let existing = ic.get_acl_rule(&sid, &uuid).await?;
2334 let action_str = match update.action {
2335 Some(FirewallAction::Allow) => "ALLOW".into(),
2336 Some(FirewallAction::Block) => "BLOCK".into(),
2337 Some(FirewallAction::Reject) => "REJECT".into(),
2338 None => existing.action,
2339 };
2340 let body = crate::integration_types::AclRuleCreateUpdate {
2341 name: update.name.unwrap_or(existing.name),
2342 rule_type: existing.rule_type,
2343 action: action_str,
2344 enabled: update.enabled.unwrap_or(existing.enabled),
2345 description: existing.description,
2346 source_filter: existing.source_filter,
2347 destination_filter: existing.destination_filter,
2348 enforcing_device_filter: existing.enforcing_device_filter,
2349 };
2350 ic.update_acl_rule(&sid, &uuid, &body).await?;
2351 Ok(CommandResult::Ok)
2352 }
2353
2354 Command::DeleteAclRule { id } => {
2355 let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteAclRule")?;
2356 let uuid = require_uuid(&id)?;
2357 ic.delete_acl_rule(&sid, &uuid).await?;
2358 Ok(CommandResult::Ok)
2359 }
2360
2361 Command::ReorderAclRules { ordered_ids } => {
2362 let (ic, sid) = require_integration(&integration_guard, site_id, "ReorderAclRules")?;
2363 let uuids: Result<Vec<uuid::Uuid>, _> = ordered_ids.iter().map(require_uuid).collect();
2364 let body = crate::integration_types::AclRuleOrdering {
2365 ordered_acl_rule_ids: uuids?,
2366 };
2367 ic.set_acl_rule_ordering(&sid, &body).await?;
2368 Ok(CommandResult::Ok)
2369 }
2370
2371 Command::CreateDnsPolicy(req) => {
2373 let (ic, sid) = require_integration(&integration_guard, site_id, "CreateDnsPolicy")?;
2374 let policy_type_str = dns_policy_type_name(req.policy_type);
2375 let fields = build_create_dns_policy_fields(&req)?;
2376 let body = crate::integration_types::DnsPolicyCreateUpdate {
2377 policy_type: policy_type_str.to_owned(),
2378 enabled: req.enabled,
2379 fields,
2380 };
2381 ic.create_dns_policy(&sid, &body).await?;
2382 Ok(CommandResult::Ok)
2383 }
2384
2385 Command::UpdateDnsPolicy { id, update } => {
2386 let (ic, sid) = require_integration(&integration_guard, site_id, "UpdateDnsPolicy")?;
2387 let uuid = require_uuid(&id)?;
2388 let existing = ic.get_dns_policy(&sid, &uuid).await?;
2389 let fields = build_update_dns_policy_fields(&existing, &update)?;
2390
2391 let body = crate::integration_types::DnsPolicyCreateUpdate {
2392 policy_type: existing.policy_type,
2393 enabled: update.enabled.unwrap_or(existing.enabled),
2394 fields,
2395 };
2396 ic.update_dns_policy(&sid, &uuid, &body).await?;
2397 Ok(CommandResult::Ok)
2398 }
2399
2400 Command::DeleteDnsPolicy { id } => {
2401 let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteDnsPolicy")?;
2402 let uuid = require_uuid(&id)?;
2403 ic.delete_dns_policy(&sid, &uuid).await?;
2404 Ok(CommandResult::Ok)
2405 }
2406
2407 Command::CreateTrafficMatchingList(req) => {
2409 let (ic, sid) =
2410 require_integration(&integration_guard, site_id, "CreateTrafficMatchingList")?;
2411 let mut fields = serde_json::Map::new();
2412 fields.insert(
2413 "items".into(),
2414 serde_json::Value::Array(traffic_matching_list_items(
2415 &req.entries,
2416 req.raw_items.as_deref(),
2417 )),
2418 );
2419 if let Some(desc) = req.description {
2420 fields.insert("description".into(), serde_json::Value::String(desc));
2421 }
2422 let body = crate::integration_types::TrafficMatchingListCreateUpdate {
2423 name: req.name,
2424 list_type: req.list_type,
2425 fields,
2426 };
2427 ic.create_traffic_matching_list(&sid, &body).await?;
2428 Ok(CommandResult::Ok)
2429 }
2430
2431 Command::UpdateTrafficMatchingList { id, update } => {
2432 let (ic, sid) =
2433 require_integration(&integration_guard, site_id, "UpdateTrafficMatchingList")?;
2434 let uuid = require_uuid(&id)?;
2435 let existing = ic.get_traffic_matching_list(&sid, &uuid).await?;
2436 let mut fields = serde_json::Map::new();
2437 let entries = if let Some(raw_items) = update.raw_items.as_deref() {
2438 serde_json::Value::Array(raw_items.to_vec())
2439 } else if let Some(new_entries) = &update.entries {
2440 serde_json::Value::Array(traffic_matching_list_items(new_entries, None))
2441 } else if let Some(existing_entries) = existing.extra.get("items") {
2442 existing_entries.clone()
2443 } else if let Some(existing_entries) = existing.extra.get("entries") {
2444 existing_entries.clone()
2445 } else {
2446 serde_json::Value::Array(Vec::new())
2447 };
2448 fields.insert("items".into(), entries);
2449 if let Some(desc) = update.description {
2450 fields.insert("description".into(), serde_json::Value::String(desc));
2451 } else if let Some(existing_desc) = existing.extra.get("description") {
2452 fields.insert("description".into(), existing_desc.clone());
2453 }
2454 let body = crate::integration_types::TrafficMatchingListCreateUpdate {
2455 name: update.name.unwrap_or(existing.name),
2456 list_type: existing.list_type,
2457 fields,
2458 };
2459 ic.update_traffic_matching_list(&sid, &uuid, &body).await?;
2460 Ok(CommandResult::Ok)
2461 }
2462
2463 Command::DeleteTrafficMatchingList { id } => {
2464 let (ic, sid) =
2465 require_integration(&integration_guard, site_id, "DeleteTrafficMatchingList")?;
2466 let uuid = require_uuid(&id)?;
2467 ic.delete_traffic_matching_list(&sid, &uuid).await?;
2468 Ok(CommandResult::Ok)
2469 }
2470
2471 Command::CreateVouchers(req) => {
2473 let (ic, sid) = require_integration(&integration_guard, site_id, "CreateVouchers")?;
2474 #[allow(clippy::as_conversions, clippy::cast_possible_wrap)]
2475 let body = crate::integration_types::VoucherCreateRequest {
2476 name: req.name.unwrap_or_else(|| "Voucher".into()),
2477 count: Some(req.count as i32),
2478 time_limit_minutes: i64::from(req.time_limit_minutes.unwrap_or(60)),
2479 authorized_guest_limit: req.authorized_guest_limit.map(i64::from),
2480 data_usage_limit_m_bytes: req.data_usage_limit_mb.map(|m| m as i64),
2481 rx_rate_limit_kbps: req.rx_rate_limit_kbps.map(|r| r as i64),
2482 tx_rate_limit_kbps: req.tx_rate_limit_kbps.map(|r| r as i64),
2483 };
2484 let vouchers = ic.create_vouchers(&sid, &body).await?;
2485 let domain_vouchers: Vec<Voucher> = vouchers.into_iter().map(Voucher::from).collect();
2486 Ok(CommandResult::Vouchers(domain_vouchers))
2487 }
2488
2489 Command::DeleteVoucher { id } => {
2490 let (ic, sid) = require_integration(&integration_guard, site_id, "DeleteVoucher")?;
2491 let uuid = require_uuid(&id)?;
2492 ic.delete_voucher(&sid, &uuid).await?;
2493 Ok(CommandResult::Ok)
2494 }
2495
2496 Command::PurgeVouchers { filter } => {
2497 let (ic, sid) = require_integration(&integration_guard, site_id, "PurgeVouchers")?;
2498 ic.purge_vouchers(&sid, &filter).await?;
2499 Ok(CommandResult::Ok)
2500 }
2501
2502 Command::CreateSite { name, description } => {
2504 let legacy = require_legacy(&legacy_guard)?;
2505 legacy.create_site(&name, &description).await?;
2506 Ok(CommandResult::Ok)
2507 }
2508 Command::DeleteSite { name } => {
2509 let legacy = require_legacy(&legacy_guard)?;
2510 legacy.delete_site(&name).await?;
2511 Ok(CommandResult::Ok)
2512 }
2513 Command::InviteAdmin { name, email, role } => {
2514 let legacy = require_legacy(&legacy_guard)?;
2515 legacy.invite_admin(&name, &email, &role).await?;
2516 Ok(CommandResult::Ok)
2517 }
2518 Command::RevokeAdmin { id } => {
2519 let legacy = require_legacy(&legacy_guard)?;
2520 legacy.revoke_admin(&id.to_string()).await?;
2521 Ok(CommandResult::Ok)
2522 }
2523 Command::UpdateAdmin { id, role } => {
2524 let legacy = require_legacy(&legacy_guard)?;
2525 legacy
2526 .update_admin(&id.to_string(), role.as_deref())
2527 .await?;
2528 Ok(CommandResult::Ok)
2529 }
2530
2531 Command::RebootController => {
2532 let legacy = require_legacy(&legacy_guard)?;
2533 legacy.reboot_controller().await?;
2534 Ok(CommandResult::Ok)
2535 }
2536 Command::PoweroffController => {
2537 let legacy = require_legacy(&legacy_guard)?;
2538 legacy.poweroff_controller().await?;
2539 Ok(CommandResult::Ok)
2540 }
2541 }
2542}
2543
/// Parses an IPv6 address out of free-form text.
///
/// Surrounding whitespace is ignored and everything from the first `/`
/// onward (for example a `/64` prefix length) is discarded before parsing.
/// Returns `None` when what remains is not a valid IPv6 address.
fn parse_ipv6_text(raw: &str) -> Option<Ipv6Addr> {
    let trimmed = raw.trim();
    let address_part = match trimmed.split_once('/') {
        Some((before_slash, _)) => before_slash,
        None => trimmed,
    };
    address_part.trim().parse().ok()
}
2550
2551fn pick_ipv6_from_value(value: &serde_json::Value) -> Option<String> {
2552 let mut first_link_local: Option<String> = None;
2553
2554 let iter: Box<dyn Iterator<Item = &serde_json::Value> + '_> = match value {
2555 serde_json::Value::Array(items) => Box::new(items.iter()),
2556 _ => Box::new(std::iter::once(value)),
2557 };
2558
2559 for item in iter {
2560 if let Some(ipv6) = item.as_str().and_then(parse_ipv6_text) {
2561 let ip_text = ipv6.to_string();
2562 if !ipv6.is_unicast_link_local() {
2563 return Some(ip_text);
2564 }
2565 if first_link_local.is_none() {
2566 first_link_local = Some(ip_text);
2567 }
2568 }
2569 }
2570
2571 first_link_local
2572}
2573
2574fn parse_legacy_device_wan_ipv6(
2575 extra: &serde_json::Map<String, serde_json::Value>,
2576) -> Option<String> {
2577 if let Some(v) = extra
2579 .get("wan1")
2580 .and_then(|wan| wan.get("ipv6"))
2581 .and_then(pick_ipv6_from_value)
2582 {
2583 return Some(v);
2584 }
2585
2586 extra.get("ipv6").and_then(pick_ipv6_from_value)
2588}
2589
2590fn convert_health_summaries(raw: Vec<serde_json::Value>) -> Vec<HealthSummary> {
2592 raw.into_iter()
2593 .map(|v| HealthSummary {
2594 subsystem: v
2595 .get("subsystem")
2596 .and_then(|v| v.as_str())
2597 .unwrap_or("unknown")
2598 .to_owned(),
2599 status: v
2600 .get("status")
2601 .and_then(|v| v.as_str())
2602 .unwrap_or("unknown")
2603 .to_owned(),
2604 #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
2605 num_adopted: v
2606 .get("num_adopted")
2607 .and_then(serde_json::Value::as_u64)
2608 .map(|n| n as u32),
2609 #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
2610 num_sta: v
2611 .get("num_sta")
2612 .and_then(serde_json::Value::as_u64)
2613 .map(|n| n as u32),
2614 tx_bytes_r: v.get("tx_bytes-r").and_then(serde_json::Value::as_u64),
2615 rx_bytes_r: v.get("rx_bytes-r").and_then(serde_json::Value::as_u64),
2616 latency: v.get("latency").and_then(serde_json::Value::as_f64),
2617 wan_ip: v.get("wan_ip").and_then(|v| v.as_str()).map(String::from),
2618 gateways: v.get("gateways").and_then(|v| v.as_array()).map(|a| {
2619 a.iter()
2620 .filter_map(|g| g.as_str().map(String::from))
2621 .collect()
2622 }),
2623 extra: v,
2624 })
2625 .collect()
2626}
2627
2628fn build_transport(config: &ControllerConfig) -> TransportConfig {
2630 TransportConfig {
2631 tls: tls_to_transport(&config.tls),
2632 timeout: config.timeout,
2633 cookie_jar: None, }
2635}
2636
2637fn tls_to_transport(tls: &TlsVerification) -> TlsMode {
2638 match tls {
2639 TlsVerification::SystemDefaults => TlsMode::System,
2640 TlsVerification::CustomCa(path) => TlsMode::CustomCa(path.clone()),
2641 TlsVerification::DangerAcceptInvalid => TlsMode::DangerAcceptInvalid,
2642 }
2643}
2644
2645fn unwrap_or_empty<S, D>(endpoint: &str, result: Result<Vec<S>, crate::error::Error>) -> Vec<D>
2651where
2652 D: From<S>,
2653{
2654 match result {
2655 Ok(items) => items.into_iter().map(D::from).collect(),
2656 Err(ref e) if e.is_not_found() => {
2657 debug!("{endpoint}: not available (404), treating as empty");
2658 Vec::new()
2659 }
2660 Err(e) => {
2661 warn!("{endpoint}: unexpected error {e}, treating as empty");
2662 Vec::new()
2663 }
2664 }
2665}
2666
2667async fn resolve_site_id(
2672 client: &IntegrationClient,
2673 site_name: &str,
2674) -> Result<uuid::Uuid, CoreError> {
2675 if let Ok(uuid) = uuid::Uuid::parse_str(site_name) {
2677 return Ok(uuid);
2678 }
2679
2680 let sites = client
2681 .paginate_all(50, |off, lim| client.list_sites(off, lim))
2682 .await?;
2683
2684 sites
2685 .into_iter()
2686 .find(|s| s.internal_reference == site_name)
2687 .map(|s| s.id)
2688 .ok_or_else(|| CoreError::SiteNotFound {
2689 name: site_name.to_owned(),
2690 })
2691}
2692
2693fn parse_ipv4_cidr(cidr: &str) -> Result<(Ipv4Addr, u8), CoreError> {
2694 let (host, prefix) = cidr
2695 .split_once('/')
2696 .ok_or_else(|| CoreError::ValidationFailed {
2697 message: format!("invalid ipv4 host/prefix value '{cidr}'"),
2698 })?;
2699 let host_ip = host
2700 .parse::<Ipv4Addr>()
2701 .map_err(|_| CoreError::ValidationFailed {
2702 message: format!("invalid IPv4 host address '{host}'"),
2703 })?;
2704 let prefix_len = prefix
2705 .parse::<u8>()
2706 .map_err(|_| CoreError::ValidationFailed {
2707 message: format!("invalid IPv4 prefix length '{prefix}'"),
2708 })?;
2709 if prefix_len > 32 {
2710 return Err(CoreError::ValidationFailed {
2711 message: format!("IPv4 prefix length must be <= 32, got {prefix_len}"),
2712 });
2713 }
2714 Ok((host_ip, prefix_len))
2715}
2716
2717fn require_uuid(id: &EntityId) -> Result<uuid::Uuid, CoreError> {
2719 id.as_uuid().copied().ok_or_else(|| CoreError::Unsupported {
2720 operation: "Integration API operation on legacy ID".into(),
2721 required: "UUID-based entity ID".into(),
2722 })
2723}
2724
2725fn require_legacy<'a>(
2726 guard: &'a tokio::sync::MutexGuard<'_, Option<LegacyClient>>,
2727) -> Result<&'a LegacyClient, CoreError> {
2728 guard.as_ref().ok_or_else(|| CoreError::Unsupported {
2729 operation: "Legacy API operation".into(),
2730 required: "Legacy API credentials".into(),
2731 })
2732}
2733
2734fn require_integration<'a>(
2735 guard: &'a tokio::sync::MutexGuard<'_, Option<IntegrationClient>>,
2736 site_id: Option<uuid::Uuid>,
2737 operation: &str,
2738) -> Result<(&'a IntegrationClient, uuid::Uuid), CoreError> {
2739 let client = guard.as_ref().ok_or_else(|| unsupported(operation))?;
2740 let sid = site_id.ok_or_else(|| unsupported(operation))?;
2741 Ok((client, sid))
2742}
2743
2744fn unsupported(operation: &str) -> CoreError {
2745 CoreError::Unsupported {
2746 operation: operation.into(),
2747 required: "Integration API".into(),
2748 }
2749}
2750
2751fn device_mac(store: &DataStore, id: &EntityId) -> Result<MacAddress, CoreError> {
2753 store
2754 .device_by_id(id)
2755 .map(|d| d.mac.clone())
2756 .ok_or_else(|| CoreError::DeviceNotFound {
2757 identifier: id.to_string(),
2758 })
2759}
2760
2761fn client_mac(store: &DataStore, id: &EntityId) -> Result<MacAddress, CoreError> {
2763 store
2764 .client_by_id(id)
2765 .map(|c| c.mac.clone())
2766 .ok_or_else(|| CoreError::ClientNotFound {
2767 identifier: id.to_string(),
2768 })
2769}
2770
2771fn build_endpoint_json(
2774 zone_id: &str,
2775 filter: Option<&crate::command::requests::TrafficFilterSpec>,
2776) -> serde_json::Value {
2777 use crate::command::requests::TrafficFilterSpec;
2778
2779 let mut obj = serde_json::json!({ "zoneId": zone_id });
2780
2781 if let Some(spec) = filter {
2782 let traffic_filter = match spec {
2783 TrafficFilterSpec::Network {
2784 network_ids,
2785 match_opposite,
2786 } => {
2787 serde_json::json!({
2788 "type": "NETWORK",
2789 "networkFilter": {
2790 "networkIds": network_ids,
2791 "matchOpposite": match_opposite,
2792 }
2793 })
2794 }
2795 TrafficFilterSpec::IpAddress {
2796 addresses,
2797 match_opposite,
2798 } => {
2799 let items: Vec<serde_json::Value> = addresses
2800 .iter()
2801 .map(|addr| {
2802 if addr.contains('/') {
2803 serde_json::json!({ "type": "SUBNET", "value": addr })
2804 } else if addr.contains('-') {
2805 let parts: Vec<&str> = addr.splitn(2, '-').collect();
2806 serde_json::json!({ "type": "RANGE", "start": parts[0], "stop": parts.get(1).unwrap_or(&"") })
2807 } else {
2808 serde_json::json!({ "type": "IP_ADDRESS", "value": addr })
2809 }
2810 })
2811 .collect();
2812 serde_json::json!({
2813 "type": "IP_ADDRESSES",
2814 "ipAddressFilter": {
2815 "type": "IP_ADDRESSES",
2816 "items": items,
2817 "matchOpposite": match_opposite,
2818 }
2819 })
2820 }
2821 TrafficFilterSpec::Port {
2822 ports,
2823 match_opposite,
2824 } => {
2825 let items: Vec<serde_json::Value> = ports
2826 .iter()
2827 .map(|p| {
2828 if p.contains('-') {
2829 let parts: Vec<&str> = p.splitn(2, '-').collect();
2830 serde_json::json!({ "type": "PORT_RANGE", "startPort": parts[0], "endPort": parts.get(1).unwrap_or(&"") })
2831 } else {
2832 serde_json::json!({ "type": "PORT_NUMBER", "value": p })
2833 }
2834 })
2835 .collect();
2836 serde_json::json!({
2837 "type": "PORT",
2838 "portFilter": {
2839 "type": "PORTS",
2840 "items": items,
2841 "matchOpposite": match_opposite,
2842 }
2843 })
2844 }
2845 };
2846 obj.as_object_mut()
2847 .expect("json! produces object")
2848 .insert("trafficFilter".into(), traffic_filter);
2849 }
2850
2851 obj
2852}
2853
2854fn wifi_security_mode_name(mode: crate::model::WifiSecurityMode) -> &'static str {
2855 match mode {
2856 crate::model::WifiSecurityMode::Open => "OPEN",
2857 crate::model::WifiSecurityMode::Wpa2Personal => "WPA2_PERSONAL",
2858 crate::model::WifiSecurityMode::Wpa3Personal => "WPA3_PERSONAL",
2859 crate::model::WifiSecurityMode::Wpa2Wpa3Personal => "WPA2_WPA3_PERSONAL",
2860 crate::model::WifiSecurityMode::Wpa2Enterprise => "WPA2_ENTERPRISE",
2861 crate::model::WifiSecurityMode::Wpa3Enterprise => "WPA3_ENTERPRISE",
2862 crate::model::WifiSecurityMode::Wpa2Wpa3Enterprise => "WPA2_WPA3_ENTERPRISE",
2863 }
2864}
2865
/// Chooses the name sent in a Wi-Fi payload: `name` when it is non-empty,
/// otherwise `ssid`. Only the exactly-empty string triggers the fallback;
/// whitespace-only names are kept.
fn wifi_payload_name(name: &str, ssid: &str) -> String {
    let chosen = if name.is_empty() { ssid } else { name };
    chosen.to_owned()
}
2873
2874fn wifi_frequency_values(frequencies: &[f32]) -> Vec<serde_json::Value> {
2875 frequencies
2876 .iter()
2877 .map(|frequency| serde_json::Value::from(f64::from(*frequency)))
2878 .collect()
2879}
2880
2881fn ensure_wifi_payload_defaults(
2882 body: &mut serde_json::Map<String, serde_json::Value>,
2883 broadcast_type: &str,
2884) {
2885 body.entry("clientIsolationEnabled")
2886 .or_insert(serde_json::Value::Bool(false));
2887 body.entry("multicastToUnicastConversionEnabled")
2888 .or_insert(serde_json::Value::Bool(false));
2889 body.entry("hideName")
2890 .or_insert(serde_json::Value::Bool(false));
2891 body.entry("uapsdEnabled")
2892 .or_insert(serde_json::Value::Bool(true));
2893
2894 if broadcast_type == "STANDARD" {
2895 body.entry("broadcastingFrequenciesGHz")
2896 .or_insert_with(|| serde_json::Value::Array(wifi_frequency_values(&[2.4, 5.0])));
2897 body.entry("mloEnabled")
2898 .or_insert(serde_json::Value::Bool(false));
2899 body.entry("bandSteeringEnabled")
2900 .or_insert(serde_json::Value::Bool(false));
2901 body.entry("arpProxyEnabled")
2902 .or_insert(serde_json::Value::Bool(false));
2903 body.entry("bssTransitionEnabled")
2904 .or_insert(serde_json::Value::Bool(false));
2905 body.entry("advertiseDeviceName")
2906 .or_insert(serde_json::Value::Bool(false));
2907 }
2908}
2909
2910fn build_create_wifi_broadcast_payload(
2911 req: &crate::command::CreateWifiBroadcastRequest,
2912) -> crate::integration_types::WifiBroadcastCreateUpdate {
2913 let broadcast_type = req
2914 .broadcast_type
2915 .clone()
2916 .unwrap_or_else(|| "STANDARD".into());
2917
2918 let mut body = serde_json::Map::new();
2919 let mut security_configuration = serde_json::Map::new();
2920 security_configuration.insert(
2921 "mode".into(),
2922 serde_json::Value::String(wifi_security_mode_name(req.security_mode).into()),
2923 );
2924 if let Some(passphrase) = req.passphrase.clone() {
2925 security_configuration.insert("passphrase".into(), serde_json::Value::String(passphrase));
2926 }
2927 body.insert(
2928 "securityConfiguration".into(),
2929 serde_json::Value::Object(security_configuration),
2930 );
2931
2932 if let Some(network_id) = &req.network_id {
2933 body.insert(
2934 "network".into(),
2935 serde_json::json!({ "id": network_id.to_string() }),
2936 );
2937 }
2938 body.insert("hideName".into(), serde_json::Value::Bool(req.hide_ssid));
2939 if req.band_steering {
2940 body.insert("bandSteeringEnabled".into(), serde_json::Value::Bool(true));
2941 }
2942 if req.fast_roaming {
2943 body.insert("bssTransitionEnabled".into(), serde_json::Value::Bool(true));
2944 }
2945 if let Some(frequencies) = req.frequencies_ghz.as_ref() {
2946 body.insert(
2947 "broadcastingFrequenciesGHz".into(),
2948 serde_json::Value::Array(wifi_frequency_values(frequencies)),
2949 );
2950 }
2951 ensure_wifi_payload_defaults(&mut body, &broadcast_type);
2952
2953 crate::integration_types::WifiBroadcastCreateUpdate {
2954 name: wifi_payload_name(&req.name, &req.ssid),
2955 broadcast_type,
2956 enabled: req.enabled,
2957 body,
2958 }
2959}
2960
2961fn build_update_wifi_broadcast_payload(
2962 existing: &crate::integration_types::WifiBroadcastDetailsResponse,
2963 update: &crate::command::UpdateWifiBroadcastRequest,
2964) -> crate::integration_types::WifiBroadcastCreateUpdate {
2965 let mut body: serde_json::Map<String, serde_json::Value> =
2966 existing.extra.clone().into_iter().collect();
2967
2968 body.remove("ssid");
2969 body.remove("hideSsid");
2970 body.remove("bandSteering");
2971 body.remove("fastRoaming");
2972 body.remove("frequencies");
2973
2974 if let Some(network) = existing.network.clone() {
2975 body.insert("network".into(), network);
2976 }
2977 if let Some(filter) = existing.broadcasting_device_filter.clone() {
2978 body.insert("broadcastingDeviceFilter".into(), filter);
2979 }
2980 if let Some(hidden) = update.hide_ssid {
2981 body.insert("hideName".into(), serde_json::Value::Bool(hidden));
2982 }
2983
2984 let mut security_cfg = existing
2985 .security_configuration
2986 .as_object()
2987 .cloned()
2988 .unwrap_or_default();
2989 if let Some(mode) = update.security_mode {
2990 security_cfg.insert(
2991 "mode".into(),
2992 serde_json::Value::String(wifi_security_mode_name(mode).into()),
2993 );
2994 }
2995 if let Some(passphrase) = update.passphrase.clone() {
2996 security_cfg.insert("passphrase".into(), serde_json::Value::String(passphrase));
2997 }
2998 body.insert(
2999 "securityConfiguration".into(),
3000 serde_json::Value::Object(security_cfg),
3001 );
3002 ensure_wifi_payload_defaults(&mut body, &existing.broadcast_type);
3003
3004 crate::integration_types::WifiBroadcastCreateUpdate {
3005 name: update
3006 .name
3007 .clone()
3008 .or_else(|| update.ssid.clone())
3009 .unwrap_or_else(|| existing.name.clone()),
3010 broadcast_type: existing.broadcast_type.clone(),
3011 enabled: update.enabled.unwrap_or(existing.enabled),
3012 body,
3013 }
3014}
3015
3016fn dns_policy_type_name(policy_type: crate::model::DnsPolicyType) -> &'static str {
3017 match policy_type {
3018 crate::model::DnsPolicyType::ARecord => "A",
3019 crate::model::DnsPolicyType::AaaaRecord => "AAAA",
3020 crate::model::DnsPolicyType::CnameRecord => "CNAME",
3021 crate::model::DnsPolicyType::MxRecord => "MX",
3022 crate::model::DnsPolicyType::TxtRecord => "TXT",
3023 crate::model::DnsPolicyType::SrvRecord => "SRV",
3024 crate::model::DnsPolicyType::ForwardDomain => "FORWARD_DOMAIN",
3025 }
3026}
3027
3028fn dns_policy_type_from_name(policy_type: &str) -> crate::model::DnsPolicyType {
3029 match policy_type {
3030 "A" => crate::model::DnsPolicyType::ARecord,
3031 "AAAA" => crate::model::DnsPolicyType::AaaaRecord,
3032 "CNAME" => crate::model::DnsPolicyType::CnameRecord,
3033 "MX" => crate::model::DnsPolicyType::MxRecord,
3034 "TXT" => crate::model::DnsPolicyType::TxtRecord,
3035 "SRV" => crate::model::DnsPolicyType::SrvRecord,
3036 _ => crate::model::DnsPolicyType::ForwardDomain,
3037 }
3038}
3039
3040fn validation_failed(message: impl Into<String>) -> CoreError {
3041 CoreError::ValidationFailed {
3042 message: message.into(),
3043 }
3044}
3045
/// Chooses the domain for a DNS policy from up to three sources.
///
/// Precedence:
/// 1. `domain`, used verbatim when present (even if empty);
/// 2. the first element of `domains`, when the list is present and
///    non-empty;
/// 3. `fallback`, when present and non-empty.
///
/// Returns `None` when none of these applies.
fn dns_domain_value(
    domain: Option<&str>,
    domains: Option<&[String]>,
    fallback: Option<&str>,
) -> Option<String> {
    if let Some(explicit) = domain {
        return Some(explicit.to_owned());
    }
    if let Some(first) = domains.and_then(|list| list.first()) {
        return Some(first.clone());
    }
    match fallback {
        Some(name) if !name.is_empty() => Some(name.to_owned()),
        _ => None,
    }
}
3060
3061fn insert_string_field(
3062 fields: &mut serde_json::Map<String, serde_json::Value>,
3063 key: &str,
3064 value: Option<String>,
3065) {
3066 if let Some(value) = value {
3067 fields.insert(key.into(), serde_json::Value::String(value));
3068 }
3069}
3070
3071fn insert_u16_field(
3072 fields: &mut serde_json::Map<String, serde_json::Value>,
3073 key: &str,
3074 value: Option<u16>,
3075) {
3076 if let Some(value) = value {
3077 fields.insert(
3078 key.into(),
3079 serde_json::Value::Number(serde_json::Number::from(value)),
3080 );
3081 }
3082}
3083
3084fn insert_u32_field(
3085 fields: &mut serde_json::Map<String, serde_json::Value>,
3086 key: &str,
3087 value: Option<u32>,
3088) {
3089 if let Some(value) = value {
3090 fields.insert(
3091 key.into(),
3092 serde_json::Value::Number(serde_json::Number::from(value)),
3093 );
3094 }
3095}
3096
3097fn ensure_dns_required_string(
3098 fields: &serde_json::Map<String, serde_json::Value>,
3099 key: &str,
3100 policy_type: crate::model::DnsPolicyType,
3101) -> Result<(), CoreError> {
3102 if fields
3103 .get(key)
3104 .and_then(serde_json::Value::as_str)
3105 .is_some()
3106 {
3107 Ok(())
3108 } else {
3109 Err(validation_failed(format!(
3110 "{policy_type:?} DNS policy requires `{key}`"
3111 )))
3112 }
3113}
3114
3115fn ensure_dns_required_number(
3116 fields: &serde_json::Map<String, serde_json::Value>,
3117 key: &str,
3118 policy_type: crate::model::DnsPolicyType,
3119) -> Result<(), CoreError> {
3120 if fields
3121 .get(key)
3122 .and_then(serde_json::Value::as_u64)
3123 .is_some()
3124 {
3125 Ok(())
3126 } else {
3127 Err(validation_failed(format!(
3128 "{policy_type:?} DNS policy requires `{key}`"
3129 )))
3130 }
3131}
3132
3133fn validate_dns_policy_fields(
3134 policy_type: crate::model::DnsPolicyType,
3135 fields: &serde_json::Map<String, serde_json::Value>,
3136) -> Result<(), CoreError> {
3137 ensure_dns_required_string(fields, "domain", policy_type)?;
3138
3139 match policy_type {
3140 crate::model::DnsPolicyType::ARecord => {
3141 ensure_dns_required_string(fields, "ipv4Address", policy_type)?;
3142 ensure_dns_required_number(fields, "ttlSeconds", policy_type)?;
3143 }
3144 crate::model::DnsPolicyType::AaaaRecord => {
3145 ensure_dns_required_string(fields, "ipv6Address", policy_type)?;
3146 ensure_dns_required_number(fields, "ttlSeconds", policy_type)?;
3147 }
3148 crate::model::DnsPolicyType::CnameRecord => {
3149 ensure_dns_required_string(fields, "targetDomain", policy_type)?;
3150 ensure_dns_required_number(fields, "ttlSeconds", policy_type)?;
3151 }
3152 crate::model::DnsPolicyType::MxRecord => {
3153 ensure_dns_required_string(fields, "mailServerDomain", policy_type)?;
3154 ensure_dns_required_number(fields, "priority", policy_type)?;
3155 }
3156 crate::model::DnsPolicyType::TxtRecord => {
3157 ensure_dns_required_string(fields, "text", policy_type)?;
3158 }
3159 crate::model::DnsPolicyType::SrvRecord => {
3160 for key in ["serverDomain", "service", "protocol"] {
3161 ensure_dns_required_string(fields, key, policy_type)?;
3162 }
3163 for key in ["port", "priority", "weight"] {
3164 ensure_dns_required_number(fields, key, policy_type)?;
3165 }
3166 }
3167 crate::model::DnsPolicyType::ForwardDomain => {
3168 ensure_dns_required_string(fields, "ipAddress", policy_type)?;
3169 }
3170 }
3171
3172 Ok(())
3173}
3174
3175fn build_create_dns_policy_fields(
3176 req: &crate::command::CreateDnsPolicyRequest,
3177) -> Result<serde_json::Map<String, serde_json::Value>, CoreError> {
3178 let mut fields = serde_json::Map::new();
3179 let domain = dns_domain_value(
3180 req.domain.as_deref(),
3181 req.domains.as_deref(),
3182 Some(req.name.as_str()),
3183 )
3184 .ok_or_else(|| validation_failed("DNS policy requires `domain`"))?;
3185 fields.insert("domain".into(), serde_json::Value::String(domain));
3186
3187 match req.policy_type {
3188 crate::model::DnsPolicyType::ARecord => {
3189 insert_string_field(
3190 &mut fields,
3191 "ipv4Address",
3192 req.ipv4_address.clone().or_else(|| req.value.clone()),
3193 );
3194 insert_u32_field(&mut fields, "ttlSeconds", req.ttl_seconds);
3195 }
3196 crate::model::DnsPolicyType::AaaaRecord => {
3197 insert_string_field(
3198 &mut fields,
3199 "ipv6Address",
3200 req.ipv6_address.clone().or_else(|| req.value.clone()),
3201 );
3202 insert_u32_field(&mut fields, "ttlSeconds", req.ttl_seconds);
3203 }
3204 crate::model::DnsPolicyType::CnameRecord => {
3205 insert_string_field(
3206 &mut fields,
3207 "targetDomain",
3208 req.target_domain.clone().or_else(|| req.value.clone()),
3209 );
3210 insert_u32_field(&mut fields, "ttlSeconds", req.ttl_seconds);
3211 }
3212 crate::model::DnsPolicyType::MxRecord => {
3213 insert_string_field(
3214 &mut fields,
3215 "mailServerDomain",
3216 req.mail_server_domain.clone().or_else(|| req.value.clone()),
3217 );
3218 insert_u16_field(&mut fields, "priority", req.priority);
3219 }
3220 crate::model::DnsPolicyType::TxtRecord => {
3221 insert_string_field(
3222 &mut fields,
3223 "text",
3224 req.text.clone().or_else(|| req.value.clone()),
3225 );
3226 }
3227 crate::model::DnsPolicyType::SrvRecord => {
3228 insert_string_field(
3229 &mut fields,
3230 "serverDomain",
3231 req.server_domain.clone().or_else(|| req.value.clone()),
3232 );
3233 insert_string_field(&mut fields, "service", req.service.clone());
3234 insert_string_field(&mut fields, "protocol", req.protocol.clone());
3235 insert_u16_field(&mut fields, "port", req.port);
3236 insert_u16_field(&mut fields, "priority", req.priority);
3237 insert_u16_field(&mut fields, "weight", req.weight);
3238 }
3239 crate::model::DnsPolicyType::ForwardDomain => {
3240 insert_string_field(
3241 &mut fields,
3242 "ipAddress",
3243 req.ip_address
3244 .clone()
3245 .or_else(|| req.upstream.clone())
3246 .or_else(|| req.value.clone()),
3247 );
3248 }
3249 }
3250
3251 validate_dns_policy_fields(req.policy_type, &fields)?;
3252 Ok(fields)
3253}
3254
3255fn build_update_dns_policy_fields(
3256 existing: &crate::integration_types::DnsPolicyResponse,
3257 update: &crate::command::UpdateDnsPolicyRequest,
3258) -> Result<serde_json::Map<String, serde_json::Value>, CoreError> {
3259 let policy_type = dns_policy_type_from_name(&existing.policy_type);
3260 let mut fields: serde_json::Map<String, serde_json::Value> =
3261 existing.extra.clone().into_iter().collect();
3262
3263 if let Some(domain) = dns_domain_value(
3264 update.domain.as_deref(),
3265 update.domains.as_deref(),
3266 existing.domain.as_deref(),
3267 ) {
3268 fields.insert("domain".into(), serde_json::Value::String(domain));
3269 }
3270
3271 match policy_type {
3272 crate::model::DnsPolicyType::ARecord => {
3273 insert_string_field(
3274 &mut fields,
3275 "ipv4Address",
3276 update.ipv4_address.clone().or_else(|| update.value.clone()),
3277 );
3278 insert_u32_field(&mut fields, "ttlSeconds", update.ttl_seconds);
3279 }
3280 crate::model::DnsPolicyType::AaaaRecord => {
3281 insert_string_field(
3282 &mut fields,
3283 "ipv6Address",
3284 update.ipv6_address.clone().or_else(|| update.value.clone()),
3285 );
3286 insert_u32_field(&mut fields, "ttlSeconds", update.ttl_seconds);
3287 }
3288 crate::model::DnsPolicyType::CnameRecord => {
3289 insert_string_field(
3290 &mut fields,
3291 "targetDomain",
3292 update
3293 .target_domain
3294 .clone()
3295 .or_else(|| update.value.clone()),
3296 );
3297 insert_u32_field(&mut fields, "ttlSeconds", update.ttl_seconds);
3298 }
3299 crate::model::DnsPolicyType::MxRecord => {
3300 insert_string_field(
3301 &mut fields,
3302 "mailServerDomain",
3303 update
3304 .mail_server_domain
3305 .clone()
3306 .or_else(|| update.value.clone()),
3307 );
3308 insert_u16_field(&mut fields, "priority", update.priority);
3309 }
3310 crate::model::DnsPolicyType::TxtRecord => {
3311 insert_string_field(
3312 &mut fields,
3313 "text",
3314 update.text.clone().or_else(|| update.value.clone()),
3315 );
3316 }
3317 crate::model::DnsPolicyType::SrvRecord => {
3318 insert_string_field(
3319 &mut fields,
3320 "serverDomain",
3321 update
3322 .server_domain
3323 .clone()
3324 .or_else(|| update.value.clone()),
3325 );
3326 insert_string_field(&mut fields, "service", update.service.clone());
3327 insert_string_field(&mut fields, "protocol", update.protocol.clone());
3328 insert_u16_field(&mut fields, "port", update.port);
3329 insert_u16_field(&mut fields, "priority", update.priority);
3330 insert_u16_field(&mut fields, "weight", update.weight);
3331 }
3332 crate::model::DnsPolicyType::ForwardDomain => {
3333 insert_string_field(
3334 &mut fields,
3335 "ipAddress",
3336 update
3337 .ip_address
3338 .clone()
3339 .or_else(|| update.upstream.clone())
3340 .or_else(|| update.value.clone()),
3341 );
3342 }
3343 }
3344
3345 validate_dns_policy_fields(policy_type, &fields)?;
3346 Ok(fields)
3347}
3348
3349fn traffic_matching_list_items(
3350 entries: &[String],
3351 raw_items: Option<&[serde_json::Value]>,
3352) -> Vec<serde_json::Value> {
3353 raw_items.map_or_else(
3354 || {
3355 entries
3356 .iter()
3357 .cloned()
3358 .map(serde_json::Value::String)
3359 .collect()
3360 },
3361 <[serde_json::Value]>::to_vec,
3362 )
3363}
3364
#[cfg(test)]
mod tests {
    use super::{
        build_create_dns_policy_fields, build_create_wifi_broadcast_payload, parse_ipv4_cidr,
        traffic_matching_list_items,
    };
    use crate::command::{CreateDnsPolicyRequest, CreateWifiBroadcastRequest};
    use crate::model::{DnsPolicyType, WifiSecurityMode};
    use serde_json::json;

    /// A well-formed `address/prefix` string yields both the host and the prefix.
    #[test]
    fn parse_ipv4_cidr_accepts_valid_input() {
        let parsed = parse_ipv4_cidr("192.168.10.1/24");
        let (host, prefix) = parsed.expect("valid CIDR");
        assert_eq!(host.to_string(), "192.168.10.1");
        assert_eq!(prefix, 24);
    }

    /// A prefix length of 40 is rejected.
    #[test]
    fn parse_ipv4_cidr_rejects_invalid_prefix() {
        let outcome = parse_ipv4_cidr("192.168.10.1/40");
        assert!(outcome.is_err());
    }

    /// A bare address without `/prefix` is rejected.
    #[test]
    fn parse_ipv4_cidr_rejects_missing_prefix() {
        let outcome = parse_ipv4_cidr("192.168.10.1");
        assert!(outcome.is_err());
    }

    /// The Wi-Fi create payload uses the Integration API key names and does
    /// not emit an `ssid` key in its body.
    #[test]
    fn wifi_create_payload_uses_integration_field_names() {
        let request = CreateWifiBroadcastRequest {
            name: "Main".into(),
            ssid: "Main".into(),
            security_mode: WifiSecurityMode::Wpa2Personal,
            passphrase: Some("supersecret".into()),
            enabled: true,
            network_id: None,
            hide_ssid: true,
            broadcast_type: Some("STANDARD".into()),
            frequencies_ghz: Some(vec![2.4, 5.0]),
            band_steering: true,
            fast_roaming: true,
        };
        let payload = build_create_wifi_broadcast_payload(&request);

        assert_eq!(payload.name, "Main");
        assert!(payload.body.get("ssid").is_none());
        assert_eq!(payload.body.get("hideName"), Some(&json!(true)));

        let bands = payload
            .body
            .get("broadcastingFrequenciesGHz")
            .and_then(|value| value.as_array())
            .expect("frequencies array");
        assert_eq!(bands.len(), 2);
        assert_eq!(bands[1], json!(5.0));

        // Both roaming-related toggles must be carried under their API names.
        for flag in ["bandSteeringEnabled", "bssTransitionEnabled"] {
            assert_eq!(payload.body.get(flag), Some(&json!(true)));
        }
    }

    /// For an A record, the generic `value` lands under `ipv4Address` and the
    /// TTL under `ttlSeconds`; the generic key names are not emitted.
    #[test]
    fn dns_create_fields_use_type_specific_schema_keys() {
        let request = CreateDnsPolicyRequest {
            name: "example.com".into(),
            policy_type: DnsPolicyType::ARecord,
            enabled: true,
            domain: Some("example.com".into()),
            domains: None,
            upstream: None,
            value: Some("192.168.1.10".into()),
            ttl_seconds: Some(600),
            priority: None,
            ipv4_address: None,
            ipv6_address: None,
            target_domain: None,
            mail_server_domain: None,
            text: None,
            ip_address: None,
            server_domain: None,
            service: None,
            protocol: None,
            port: None,
            weight: None,
        };
        let fields = build_create_dns_policy_fields(&request).expect("valid DNS fields");

        assert_eq!(fields.get("domain"), Some(&json!("example.com")));
        assert_eq!(fields.get("ipv4Address"), Some(&json!("192.168.1.10")));
        assert_eq!(fields.get("ttlSeconds"), Some(&json!(600)));

        // The generic request names must not appear in the payload.
        for absent in ["value", "ttl"] {
            assert!(fields.get(absent).is_none());
        }
    }

    /// When raw item payloads are supplied they win over the plain entries.
    #[test]
    fn traffic_matching_list_items_prefer_raw_payloads() {
        let raw_items = [json!({"type": "PORT_NUMBER", "value": 443})];
        let expected = vec![json!({"type": "PORT_NUMBER", "value": 443})];

        let items = traffic_matching_list_items(&["80".into()], Some(&raw_items));

        assert_eq!(items, expected);
    }
}