Skip to main content

unifly_api/controller/
refresh.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures_util::stream::{self, StreamExt};
6use tokio_util::sync::CancellationToken;
7use tracing::{debug, info, warn};
8
9use crate::core_error::CoreError;
10use crate::model::{
11    AclRule, Client, Device, DnsPolicy, EntityId, Event, FirewallGroup, FirewallPolicy,
12    FirewallZone, HealthSummary, MacAddress, NatPolicy, Network, Site, TrafficMatchingList,
13    Voucher, WifiBroadcast,
14};
15use crate::store::{DataStore, event_storage_key};
16
17use super::support::{convert_health_summaries, parse_session_device_wan_ipv6};
18use super::{Controller, REFRESH_DETAIL_CONCURRENCY};
19
20impl Controller {
21    /// Fetch all data from the controller and update the DataStore.
22    ///
23    /// Pulls devices, clients, and events from the controller APIs, converts
24    /// them to domain types, and applies them to the store. Events are
25    /// broadcast through the event channel after snapshot application.
26    #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
27    pub async fn full_refresh(&self) -> Result<(), CoreError> {
28        let integration = self.inner.integration_client.lock().await.clone();
29        let site_id = *self.inner.site_id.lock().await;
30
31        if let (Some(integration), Some(sid)) = (integration, site_id) {
32            let page_limit = 200;
33
34            let (devices_res, clients_res, networks_res, wifi_res) = tokio::join!(
35                integration.paginate_all(page_limit, |off, lim| {
36                    integration.list_devices(&sid, off, lim)
37                }),
38                integration.paginate_all(page_limit, |off, lim| {
39                    integration.list_clients(&sid, off, lim)
40                }),
41                integration.paginate_all(page_limit, |off, lim| {
42                    integration.list_networks(&sid, off, lim)
43                }),
44                integration.paginate_all(page_limit, |off, lim| {
45                    integration.list_wifi_broadcasts(&sid, off, lim)
46                }),
47            );
48
49            let (policies_res, zones_res, acls_res, dns_res, vouchers_res) = tokio::join!(
50                integration.paginate_all(page_limit, |off, lim| {
51                    integration.list_firewall_policies(&sid, off, lim)
52                }),
53                integration.paginate_all(page_limit, |off, lim| {
54                    integration.list_firewall_zones(&sid, off, lim)
55                }),
56                integration.paginate_all(page_limit, |off, lim| {
57                    integration.list_acl_rules(&sid, off, lim)
58                }),
59                integration.paginate_all(page_limit, |off, lim| {
60                    integration.list_dns_policies(&sid, off, lim)
61                }),
62                integration.paginate_all(page_limit, |off, lim| {
63                    integration.list_vouchers(&sid, off, lim)
64                }),
65            );
66
67            let (sites_res, tml_res) = tokio::join!(
68                integration.paginate_all(50, |off, lim| integration.list_sites(off, lim)),
69                integration.paginate_all(page_limit, |off, lim| {
70                    integration.list_traffic_matching_lists(&sid, off, lim)
71                }),
72            );
73
74            let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
75            let mut clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
76            let network_ids: Vec<uuid::Uuid> = networks_res?
77                .into_iter()
78                .map(|network| network.id)
79                .collect();
80            info!(
81                network_count = network_ids.len(),
82                "fetching network details"
83            );
84            let networks: Vec<Network> = {
85                stream::iter(network_ids.into_iter().map(|network_id| {
86                    let integration = Arc::clone(&integration);
87                    async move {
88                        match integration.get_network(&sid, &network_id).await {
89                            Ok(detail) => Some(Network::from(detail)),
90                            Err(error) => {
91                                warn!(network_id = %network_id, error = %error, "network detail fetch failed");
92                                None
93                            }
94                        }
95                    }
96                }))
97                .buffer_unordered(REFRESH_DETAIL_CONCURRENCY)
98                .filter_map(async move |network| network)
99                .collect::<Vec<_>>()
100                .await
101            };
102            let wifi: Vec<WifiBroadcast> = wifi_res?.into_iter().map(WifiBroadcast::from).collect();
103            let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
104            let traffic_matching_lists: Vec<TrafficMatchingList> = tml_res?
105                .into_iter()
106                .map(TrafficMatchingList::from)
107                .collect();
108
109            // Optional endpoints — errors (404, not-configured, etc.) are non-fatal
110            let policies: Vec<FirewallPolicy> = unwrap_or_empty("firewall/policies", policies_res);
111            let zones: Vec<FirewallZone> = unwrap_or_empty("firewall/zones", zones_res);
112            let acls: Vec<AclRule> = unwrap_or_empty("acl/rules", acls_res);
113            let dns: Vec<DnsPolicy> = unwrap_or_empty("dns/policies", dns_res);
114            let vouchers: Vec<Voucher> = unwrap_or_empty("vouchers", vouchers_res);
115
116            info!(
117                device_count = devices.len(),
118                "enriching devices with statistics"
119            );
120            let mut devices = {
121                stream::iter(devices.into_iter().map(|mut device| {
122                    let integration = Arc::clone(&integration);
123                    async move {
124                        if let EntityId::Uuid(device_uuid) = &device.id {
125                            match integration.get_device_statistics(&sid, device_uuid).await {
126                                Ok(stats_resp) => {
127                                    device.stats =
128                                        crate::convert::device_stats_from_integration(&stats_resp);
129                                    crate::convert::enrich_radios_from_stats(
130                                        &mut device.radios,
131                                        &stats_resp.interfaces,
132                                    );
133                                }
134                                Err(error) => {
135                                    warn!(device = ?device.name, error = %error, "device stats fetch failed");
136                                }
137                            }
138                        }
139                        device
140                    }
141                }))
142                .buffer_unordered(REFRESH_DETAIL_CONCURRENCY)
143                .collect::<Vec<_>>()
144                .await
145            };
146
147            #[allow(clippy::type_complexity)]
148            let (
149                session_events,
150                session_health,
151                session_clients,
152                session_devices,
153                session_users,
154                nat,
155                firewall_groups,
156            ): (
157                Vec<Event>,
158                Vec<HealthSummary>,
159                Vec<crate::session::models::SessionClientEntry>,
160                Vec<crate::session::models::SessionDevice>,
161                Vec<crate::session::models::SessionUserEntry>,
162                Vec<NatPolicy>,
163                Vec<FirewallGroup>,
164            ) = match self.inner.session_client.lock().await.clone() {
165                Some(session) => {
166                    let (
167                        events_res,
168                        health_res,
169                        clients_res,
170                        devices_res,
171                        users_res,
172                        nat_res,
173                        fwg_res,
174                    ) = tokio::join!(
175                        session.list_events(Some(100)),
176                        session.get_health(),
177                        session.list_clients(),
178                        session.list_devices(),
179                        session.list_users(),
180                        session.list_nat_rules(),
181                        session.list_firewall_groups(),
182                    );
183
184                    let events = match events_res {
185                        Ok(raw) => raw.into_iter().map(Event::from).collect(),
186                        Err(ref error) if error.is_not_found() => {
187                            debug!(
188                                auth = ?session.auth(),
189                                error = %error,
190                                "session event endpoint unavailable; treating as empty"
191                            );
192                            Vec::new()
193                        }
194                        Err(error) => {
195                            warn!(
196                                auth = ?session.auth(),
197                                error = %error,
198                                "session event fetch failed (non-fatal)"
199                            );
200                            Vec::new()
201                        }
202                    };
203
204                    let health = match health_res {
205                        Ok(raw) => convert_health_summaries(raw),
206                        Err(error) => {
207                            warn!(error = %error, "session health fetch failed (non-fatal)");
208                            Vec::new()
209                        }
210                    };
211
212                    let session_clients = match clients_res {
213                        Ok(raw) => raw,
214                        Err(error) => {
215                            warn!(error = %error, "session client fetch failed (non-fatal)");
216                            Vec::new()
217                        }
218                    };
219
220                    let session_devices = match devices_res {
221                        Ok(raw) => raw,
222                        Err(error) => {
223                            warn!(error = %error, "session device fetch failed (non-fatal)");
224                            Vec::new()
225                        }
226                    };
227
228                    let session_users = match users_res {
229                        Ok(raw) => raw,
230                        Err(error) => {
231                            warn!(error = %error, "session user fetch failed (non-fatal)");
232                            Vec::new()
233                        }
234                    };
235
236                    let nat = match nat_res {
237                        Ok(raw) => raw
238                            .iter()
239                            .filter_map(crate::convert::nat_policy_from_v2)
240                            .collect(),
241                        Err(error) => {
242                            warn!(error = %error, "v2 NAT fetch failed (non-fatal)");
243                            Vec::new()
244                        }
245                    };
246
247                    let firewall_groups = match fwg_res {
248                        Ok(raw) => raw
249                            .iter()
250                            .filter_map(crate::convert::firewall_group_from_session)
251                            .collect(),
252                        Err(error) => {
253                            warn!(error = %error, "firewall group fetch failed (non-fatal)");
254                            Vec::new()
255                        }
256                    };
257
258                    (
259                        events,
260                        health,
261                        session_clients,
262                        session_devices,
263                        session_users,
264                        nat,
265                        firewall_groups,
266                    )
267                }
268                None => (
269                    Vec::new(),
270                    Vec::new(),
271                    Vec::new(),
272                    Vec::new(),
273                    Vec::new(),
274                    Vec::new(),
275                    Vec::new(),
276                ),
277            };
278
279            if !session_clients.is_empty() {
280                let session_by_ip: HashMap<&str, &crate::session::models::SessionClientEntry> =
281                    session_clients
282                        .iter()
283                        .filter_map(|client| client.ip.as_deref().map(|ip| (ip, client)))
284                        .collect();
285                let mut merged = 0u32;
286                for client in &mut clients {
287                    let ip_key = client.ip.map(|ip| ip.to_string());
288                    if let Some(session_client) =
289                        ip_key.as_deref().and_then(|ip| session_by_ip.get(ip))
290                    {
291                        if client.tx_bytes.is_none() {
292                            client.tx_bytes = session_client
293                                .tx_bytes
294                                .and_then(|bytes| u64::try_from(bytes).ok());
295                        }
296                        if client.rx_bytes.is_none() {
297                            client.rx_bytes = session_client
298                                .rx_bytes
299                                .and_then(|bytes| u64::try_from(bytes).ok());
300                        }
301                        if client.hostname.is_none() {
302                            client.hostname.clone_from(&session_client.hostname);
303                        }
304                        if client.wireless.is_none() {
305                            let session_client: Client = Client::from((*session_client).clone());
306                            client.wireless = session_client.wireless;
307                        }
308                        // Match `convert/client.rs`'s default: an absent
309                        // `is_wired` means wireless, not wired.
310                        let session_is_wired = session_client.is_wired.unwrap_or(false);
311                        if client.uplink_device_mac.is_none() {
312                            let uplink = if session_is_wired {
313                                session_client.sw_mac.as_deref()
314                            } else {
315                                session_client.ap_mac.as_deref()
316                            };
317                            client.uplink_device_mac = uplink.map(MacAddress::new);
318                        }
319                        if client.switch_port.is_none() && session_is_wired {
320                            client.switch_port =
321                                session_client.sw_port.and_then(|p| u32::try_from(p).ok());
322                        }
323                        merged += 1;
324                    }
325                }
326                debug!(
327                    total_clients = clients.len(),
328                    legacy_available = session_by_ip.len(),
329                    merged,
330                    "client traffic merge (by IP)"
331                );
332            }
333
334            if !session_users.is_empty() {
335                let users_by_mac: HashMap<String, &crate::session::models::SessionUserEntry> =
336                    session_users
337                        .iter()
338                        .map(|user| (user.mac.to_lowercase(), user))
339                        .collect();
340                let mut merged_users = 0u32;
341                for client in &mut clients {
342                    // Try MAC first, then fall back to matching the session
343                    // client entry (already joined by IP) whose MAC maps to
344                    // a user record. The Integration API may return UUIDs
345                    // instead of real MACs when access.macAddress is absent.
346                    let user = users_by_mac
347                        .get(&client.mac.as_str().to_lowercase())
348                        .or_else(|| {
349                            let ip_str = client.ip.map(|ip| ip.to_string())?;
350                            let session_client = session_clients
351                                .iter()
352                                .find(|lc| lc.ip.as_deref() == Some(ip_str.as_str()))?;
353                            users_by_mac.get(&session_client.mac.to_lowercase())
354                        });
355                    if let Some(user) = user {
356                        client.use_fixedip = user.use_fixedip.unwrap_or(false);
357                        client.fixed_ip = user.fixed_ip.as_deref().and_then(|ip| ip.parse().ok());
358                        if client.use_fixedip {
359                            merged_users += 1;
360                        }
361                    }
362                }
363                debug!(
364                    users_available = users_by_mac.len(),
365                    merged_users, "user DHCP reservation merge"
366                );
367            }
368
369            if !session_devices.is_empty() {
370                let session_by_mac: HashMap<&str, &crate::session::models::SessionDevice> =
371                    session_devices
372                        .iter()
373                        .map(|device| (device.mac.as_str(), device))
374                        .collect();
375                for device in &mut devices {
376                    if let Some(legacy_device) = session_by_mac.get(device.mac.as_str()) {
377                        if device.client_count.is_none() {
378                            device.client_count = legacy_device
379                                .num_sta
380                                .and_then(|count| count.try_into().ok());
381                        }
382                        if device.wan_ipv6.is_none() {
383                            device.wan_ipv6 = parse_session_device_wan_ipv6(&legacy_device.extra);
384                        }
385                        if device.ports.is_empty()
386                            || device.radios.is_empty()
387                            || device.uplink_device_mac.is_none()
388                            || device.uplink_port_idx.is_none()
389                        {
390                            let session_dev: Device = Device::from((*legacy_device).clone());
391                            if device.ports.is_empty() && !session_dev.ports.is_empty() {
392                                device.ports = session_dev.ports;
393                            }
394                            if device.radios.is_empty() && !session_dev.radios.is_empty() {
395                                device.radios = session_dev.radios;
396                            }
397                            if device.uplink_device_mac.is_none() {
398                                device.uplink_device_mac = session_dev.uplink_device_mac;
399                            }
400                            if device.uplink_port_idx.is_none() {
401                                device.uplink_port_idx = session_dev.uplink_port_idx;
402                            }
403                        }
404                    }
405                }
406            }
407
408            if !session_health.is_empty() {
409                self.inner
410                    .store
411                    .site_health
412                    .send_modify(|health| *health = Arc::new(session_health));
413            }
414
415            let fresh_legacy_events = unseen_events(self.store(), &session_events);
416
417            self.inner
418                .store
419                .apply_integration_snapshot(crate::store::RefreshSnapshot {
420                    devices,
421                    clients,
422                    networks,
423                    wifi,
424                    policies,
425                    zones,
426                    acls,
427                    nat,
428                    dns,
429                    vouchers,
430                    sites,
431                    events: session_events,
432                    traffic_matching_lists,
433                    firewall_groups,
434                });
435
436            for event in fresh_legacy_events {
437                let _ = self.inner.event_tx.send(Arc::new(event));
438            }
439        } else {
440            let session = self
441                .inner
442                .session_client
443                .lock()
444                .await
445                .clone()
446                .ok_or(CoreError::ControllerDisconnected)?;
447
448            let (devices_res, clients_res, events_res, sites_res, fwg_res) = tokio::join!(
449                session.list_devices(),
450                session.list_clients(),
451                session.list_events(Some(100)),
452                session.list_sites(),
453                session.list_firewall_groups(),
454            );
455
456            let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
457            let clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
458            let events: Vec<Event> = events_res?.into_iter().map(Event::from).collect();
459            let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
460            let firewall_groups = match fwg_res {
461                Ok(raw) => raw
462                    .iter()
463                    .filter_map(crate::convert::firewall_group_from_session)
464                    .collect(),
465                Err(error) => {
466                    warn!(error = %error, "firewall group fetch failed (non-fatal)");
467                    Vec::new()
468                }
469            };
470            let fresh_events = unseen_events(self.store(), &events);
471
472            self.inner
473                .store
474                .apply_integration_snapshot(crate::store::RefreshSnapshot {
475                    devices,
476                    clients,
477                    networks: Vec::new(),
478                    wifi: Vec::new(),
479                    policies: Vec::new(),
480                    zones: Vec::new(),
481                    acls: Vec::new(),
482                    nat: Vec::new(),
483                    dns: Vec::new(),
484                    vouchers: Vec::new(),
485                    sites,
486                    events,
487                    traffic_matching_lists: Vec::new(),
488                    firewall_groups,
489                });
490
491            for event in fresh_events {
492                let _ = self.inner.event_tx.send(Arc::new(event));
493            }
494        }
495
496        debug!(
497            devices = self.inner.store.device_count(),
498            clients = self.inner.store.client_count(),
499            "data refresh complete"
500        );
501
502        Ok(())
503    }
504}
505
506/// Periodically refresh data from the controller.
507pub(super) async fn refresh_task(
508    controller: Controller,
509    interval_secs: u64,
510    cancel: CancellationToken,
511) {
512    let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
513    interval.tick().await;
514
515    loop {
516        tokio::select! {
517            biased;
518            () = cancel.cancelled() => break,
519            _ = interval.tick() => {
520                if let Err(error) = controller.full_refresh().await {
521                    warn!(error = %error, "periodic refresh failed");
522                }
523            }
524        }
525    }
526}
527
528/// Downgrade a paginated result to an empty `Vec` when the endpoint returns 404.
529///
530/// Some Integration API endpoints are optional on older controller firmware.
531fn unwrap_or_empty<S, D>(endpoint: &str, result: Result<Vec<S>, crate::error::Error>) -> Vec<D>
532where
533    D: From<S>,
534{
535    match result {
536        Ok(items) => items.into_iter().map(D::from).collect(),
537        Err(ref error) if error.is_not_found() => {
538            debug!("{endpoint}: not available (404), treating as empty");
539            Vec::new()
540        }
541        Err(error) => {
542            warn!("{endpoint}: unexpected error {error}, treating as empty");
543            Vec::new()
544        }
545    }
546}
547
548fn unseen_events(store: &DataStore, events: &[Event]) -> Vec<Event> {
549    let mut seen: HashSet<String> = store
550        .events_snapshot()
551        .iter()
552        .map(|event| event_storage_key(event))
553        .collect();
554
555    events
556        .iter()
557        .filter(|event| seen.insert(event_storage_key(event)))
558        .cloned()
559        .collect()
560}