Skip to main content

unifly_api/controller/
refresh.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures_util::stream::{self, StreamExt};
6use tokio_util::sync::CancellationToken;
7use tracing::{debug, info, warn};
8
9use crate::core_error::CoreError;
10use crate::model::{
11    AclRule, Client, Device, DnsPolicy, EntityId, Event, FirewallPolicy, FirewallZone,
12    HealthSummary, MacAddress, NatPolicy, Network, Site, TrafficMatchingList, Voucher,
13    WifiBroadcast,
14};
15use crate::store::{DataStore, event_storage_key};
16
17use super::support::{convert_health_summaries, parse_session_device_wan_ipv6};
18use super::{Controller, REFRESH_DETAIL_CONCURRENCY};
19
20impl Controller {
21    /// Fetch all data from the controller and update the DataStore.
22    ///
23    /// Pulls devices, clients, and events from the controller APIs, converts
24    /// them to domain types, and applies them to the store. Events are
25    /// broadcast through the event channel after snapshot application.
26    #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
27    pub async fn full_refresh(&self) -> Result<(), CoreError> {
28        let integration = self.inner.integration_client.lock().await.clone();
29        let site_id = *self.inner.site_id.lock().await;
30
31        if let (Some(integration), Some(sid)) = (integration, site_id) {
32            let page_limit = 200;
33
34            let (devices_res, clients_res, networks_res, wifi_res) = tokio::join!(
35                integration.paginate_all(page_limit, |off, lim| {
36                    integration.list_devices(&sid, off, lim)
37                }),
38                integration.paginate_all(page_limit, |off, lim| {
39                    integration.list_clients(&sid, off, lim)
40                }),
41                integration.paginate_all(page_limit, |off, lim| {
42                    integration.list_networks(&sid, off, lim)
43                }),
44                integration.paginate_all(page_limit, |off, lim| {
45                    integration.list_wifi_broadcasts(&sid, off, lim)
46                }),
47            );
48
49            let (policies_res, zones_res, acls_res, dns_res, vouchers_res) = tokio::join!(
50                integration.paginate_all(page_limit, |off, lim| {
51                    integration.list_firewall_policies(&sid, off, lim)
52                }),
53                integration.paginate_all(page_limit, |off, lim| {
54                    integration.list_firewall_zones(&sid, off, lim)
55                }),
56                integration.paginate_all(page_limit, |off, lim| {
57                    integration.list_acl_rules(&sid, off, lim)
58                }),
59                integration.paginate_all(page_limit, |off, lim| {
60                    integration.list_dns_policies(&sid, off, lim)
61                }),
62                integration.paginate_all(page_limit, |off, lim| {
63                    integration.list_vouchers(&sid, off, lim)
64                }),
65            );
66
67            let (sites_res, tml_res) = tokio::join!(
68                integration.paginate_all(50, |off, lim| integration.list_sites(off, lim)),
69                integration.paginate_all(page_limit, |off, lim| {
70                    integration.list_traffic_matching_lists(&sid, off, lim)
71                }),
72            );
73
74            let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
75            let mut clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
76            let network_ids: Vec<uuid::Uuid> = networks_res?
77                .into_iter()
78                .map(|network| network.id)
79                .collect();
80            info!(
81                network_count = network_ids.len(),
82                "fetching network details"
83            );
84            let networks: Vec<Network> = {
85                stream::iter(network_ids.into_iter().map(|network_id| {
86                    let integration = Arc::clone(&integration);
87                    async move {
88                        match integration.get_network(&sid, &network_id).await {
89                            Ok(detail) => Some(Network::from(detail)),
90                            Err(error) => {
91                                warn!(network_id = %network_id, error = %error, "network detail fetch failed");
92                                None
93                            }
94                        }
95                    }
96                }))
97                .buffer_unordered(REFRESH_DETAIL_CONCURRENCY)
98                .filter_map(async move |network| network)
99                .collect::<Vec<_>>()
100                .await
101            };
102            let wifi: Vec<WifiBroadcast> = wifi_res?.into_iter().map(WifiBroadcast::from).collect();
103            let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
104            let traffic_matching_lists: Vec<TrafficMatchingList> = tml_res?
105                .into_iter()
106                .map(TrafficMatchingList::from)
107                .collect();
108
109            // Optional endpoints — errors (404, not-configured, etc.) are non-fatal
110            let policies: Vec<FirewallPolicy> = unwrap_or_empty("firewall/policies", policies_res);
111            let zones: Vec<FirewallZone> = unwrap_or_empty("firewall/zones", zones_res);
112            let acls: Vec<AclRule> = unwrap_or_empty("acl/rules", acls_res);
113            let dns: Vec<DnsPolicy> = unwrap_or_empty("dns/policies", dns_res);
114            let vouchers: Vec<Voucher> = unwrap_or_empty("vouchers", vouchers_res);
115
116            info!(
117                device_count = devices.len(),
118                "enriching devices with statistics"
119            );
120            let mut devices = {
121                stream::iter(devices.into_iter().map(|mut device| {
122                    let integration = Arc::clone(&integration);
123                    async move {
124                        if let EntityId::Uuid(device_uuid) = &device.id {
125                            match integration.get_device_statistics(&sid, device_uuid).await {
126                                Ok(stats_resp) => {
127                                    device.stats =
128                                        crate::convert::device_stats_from_integration(&stats_resp);
129                                    crate::convert::enrich_radios_from_stats(
130                                        &mut device.radios,
131                                        &stats_resp.interfaces,
132                                    );
133                                }
134                                Err(error) => {
135                                    warn!(device = ?device.name, error = %error, "device stats fetch failed");
136                                }
137                            }
138                        }
139                        device
140                    }
141                }))
142                .buffer_unordered(REFRESH_DETAIL_CONCURRENCY)
143                .collect::<Vec<_>>()
144                .await
145            };
146
147            #[allow(clippy::type_complexity)]
148            let (
149                session_events,
150                session_health,
151                session_clients,
152                session_devices,
153                session_users,
154                nat,
155            ): (
156                Vec<Event>,
157                Vec<HealthSummary>,
158                Vec<crate::session::models::SessionClientEntry>,
159                Vec<crate::session::models::SessionDevice>,
160                Vec<crate::session::models::SessionUserEntry>,
161                Vec<NatPolicy>,
162            ) = match self.inner.session_client.lock().await.clone() {
163                Some(session) => {
164                    let (events_res, health_res, clients_res, devices_res, users_res, nat_res) = tokio::join!(
165                        session.list_events(Some(100)),
166                        session.get_health(),
167                        session.list_clients(),
168                        session.list_devices(),
169                        session.list_users(),
170                        session.list_nat_rules(),
171                    );
172
173                    let events = match events_res {
174                        Ok(raw) => raw.into_iter().map(Event::from).collect(),
175                        Err(ref error) if error.is_not_found() => {
176                            debug!(
177                                auth = ?session.auth(),
178                                error = %error,
179                                "session event endpoint unavailable; treating as empty"
180                            );
181                            Vec::new()
182                        }
183                        Err(error) => {
184                            warn!(
185                                auth = ?session.auth(),
186                                error = %error,
187                                "session event fetch failed (non-fatal)"
188                            );
189                            Vec::new()
190                        }
191                    };
192
193                    let health = match health_res {
194                        Ok(raw) => convert_health_summaries(raw),
195                        Err(error) => {
196                            warn!(error = %error, "session health fetch failed (non-fatal)");
197                            Vec::new()
198                        }
199                    };
200
201                    let session_clients = match clients_res {
202                        Ok(raw) => raw,
203                        Err(error) => {
204                            warn!(error = %error, "session client fetch failed (non-fatal)");
205                            Vec::new()
206                        }
207                    };
208
209                    let session_devices = match devices_res {
210                        Ok(raw) => raw,
211                        Err(error) => {
212                            warn!(error = %error, "session device fetch failed (non-fatal)");
213                            Vec::new()
214                        }
215                    };
216
217                    let session_users = match users_res {
218                        Ok(raw) => raw,
219                        Err(error) => {
220                            warn!(error = %error, "session user fetch failed (non-fatal)");
221                            Vec::new()
222                        }
223                    };
224
225                    let nat = match nat_res {
226                        Ok(raw) => raw
227                            .iter()
228                            .filter_map(crate::convert::nat_policy_from_v2)
229                            .collect(),
230                        Err(error) => {
231                            warn!(error = %error, "v2 NAT fetch failed (non-fatal)");
232                            Vec::new()
233                        }
234                    };
235
236                    (
237                        events,
238                        health,
239                        session_clients,
240                        session_devices,
241                        session_users,
242                        nat,
243                    )
244                }
245                None => (
246                    Vec::new(),
247                    Vec::new(),
248                    Vec::new(),
249                    Vec::new(),
250                    Vec::new(),
251                    Vec::new(),
252                ),
253            };
254
255            if !session_clients.is_empty() {
256                let session_by_ip: HashMap<&str, &crate::session::models::SessionClientEntry> =
257                    session_clients
258                        .iter()
259                        .filter_map(|client| client.ip.as_deref().map(|ip| (ip, client)))
260                        .collect();
261                let mut merged = 0u32;
262                for client in &mut clients {
263                    let ip_key = client.ip.map(|ip| ip.to_string());
264                    if let Some(session_client) =
265                        ip_key.as_deref().and_then(|ip| session_by_ip.get(ip))
266                    {
267                        if client.tx_bytes.is_none() {
268                            client.tx_bytes = session_client
269                                .tx_bytes
270                                .and_then(|bytes| u64::try_from(bytes).ok());
271                        }
272                        if client.rx_bytes.is_none() {
273                            client.rx_bytes = session_client
274                                .rx_bytes
275                                .and_then(|bytes| u64::try_from(bytes).ok());
276                        }
277                        if client.hostname.is_none() {
278                            client.hostname.clone_from(&session_client.hostname);
279                        }
280                        if client.wireless.is_none() {
281                            let session_client: Client = Client::from((*session_client).clone());
282                            client.wireless = session_client.wireless;
283                        }
284                        if client.uplink_device_mac.is_none() {
285                            let uplink = if session_client.is_wired.unwrap_or(true) {
286                                session_client.sw_mac.as_deref()
287                            } else {
288                                session_client.ap_mac.as_deref()
289                            };
290                            client.uplink_device_mac = uplink.map(MacAddress::new);
291                        }
292                        merged += 1;
293                    }
294                }
295                debug!(
296                    total_clients = clients.len(),
297                    legacy_available = session_by_ip.len(),
298                    merged,
299                    "client traffic merge (by IP)"
300                );
301            }
302
303            if !session_users.is_empty() {
304                let users_by_mac: HashMap<String, &crate::session::models::SessionUserEntry> =
305                    session_users
306                        .iter()
307                        .map(|user| (user.mac.to_lowercase(), user))
308                        .collect();
309                let mut merged_users = 0u32;
310                for client in &mut clients {
311                    // Try MAC first, then fall back to matching the session
312                    // client entry (already joined by IP) whose MAC maps to
313                    // a user record. The Integration API may return UUIDs
314                    // instead of real MACs when access.macAddress is absent.
315                    let user = users_by_mac
316                        .get(&client.mac.as_str().to_lowercase())
317                        .or_else(|| {
318                            let ip_str = client.ip.map(|ip| ip.to_string())?;
319                            let session_client = session_clients
320                                .iter()
321                                .find(|lc| lc.ip.as_deref() == Some(ip_str.as_str()))?;
322                            users_by_mac.get(&session_client.mac.to_lowercase())
323                        });
324                    if let Some(user) = user {
325                        client.use_fixedip = user.use_fixedip.unwrap_or(false);
326                        client.fixed_ip = user.fixed_ip.as_deref().and_then(|ip| ip.parse().ok());
327                        if client.use_fixedip {
328                            merged_users += 1;
329                        }
330                    }
331                }
332                debug!(
333                    users_available = users_by_mac.len(),
334                    merged_users, "user DHCP reservation merge"
335                );
336            }
337
338            if !session_devices.is_empty() {
339                let session_by_mac: HashMap<&str, &crate::session::models::SessionDevice> =
340                    session_devices
341                        .iter()
342                        .map(|device| (device.mac.as_str(), device))
343                        .collect();
344                for device in &mut devices {
345                    if let Some(legacy_device) = session_by_mac.get(device.mac.as_str()) {
346                        if device.client_count.is_none() {
347                            device.client_count = legacy_device
348                                .num_sta
349                                .and_then(|count| count.try_into().ok());
350                        }
351                        if device.wan_ipv6.is_none() {
352                            device.wan_ipv6 = parse_session_device_wan_ipv6(&legacy_device.extra);
353                        }
354                        if device.ports.is_empty() || device.radios.is_empty() {
355                            let session_dev: Device = Device::from((*legacy_device).clone());
356                            if device.ports.is_empty() && !session_dev.ports.is_empty() {
357                                device.ports = session_dev.ports;
358                            }
359                            if device.radios.is_empty() && !session_dev.radios.is_empty() {
360                                device.radios = session_dev.radios;
361                            }
362                        }
363                    }
364                }
365            }
366
367            if !session_health.is_empty() {
368                self.inner
369                    .store
370                    .site_health
371                    .send_modify(|health| *health = Arc::new(session_health));
372            }
373
374            let fresh_legacy_events = unseen_events(self.store(), &session_events);
375
376            self.inner
377                .store
378                .apply_integration_snapshot(crate::store::RefreshSnapshot {
379                    devices,
380                    clients,
381                    networks,
382                    wifi,
383                    policies,
384                    zones,
385                    acls,
386                    nat,
387                    dns,
388                    vouchers,
389                    sites,
390                    events: session_events,
391                    traffic_matching_lists,
392                });
393
394            for event in fresh_legacy_events {
395                let _ = self.inner.event_tx.send(Arc::new(event));
396            }
397        } else {
398            let session = self
399                .inner
400                .session_client
401                .lock()
402                .await
403                .clone()
404                .ok_or(CoreError::ControllerDisconnected)?;
405
406            let (devices_res, clients_res, events_res, sites_res) = tokio::join!(
407                session.list_devices(),
408                session.list_clients(),
409                session.list_events(Some(100)),
410                session.list_sites(),
411            );
412
413            let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
414            let clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
415            let events: Vec<Event> = events_res?.into_iter().map(Event::from).collect();
416            let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
417            let fresh_events = unseen_events(self.store(), &events);
418
419            self.inner
420                .store
421                .apply_integration_snapshot(crate::store::RefreshSnapshot {
422                    devices,
423                    clients,
424                    networks: Vec::new(),
425                    wifi: Vec::new(),
426                    policies: Vec::new(),
427                    zones: Vec::new(),
428                    acls: Vec::new(),
429                    nat: Vec::new(),
430                    dns: Vec::new(),
431                    vouchers: Vec::new(),
432                    sites,
433                    events,
434                    traffic_matching_lists: Vec::new(),
435                });
436
437            for event in fresh_events {
438                let _ = self.inner.event_tx.send(Arc::new(event));
439            }
440        }
441
442        debug!(
443            devices = self.inner.store.device_count(),
444            clients = self.inner.store.client_count(),
445            "data refresh complete"
446        );
447
448        Ok(())
449    }
450}
451
452/// Periodically refresh data from the controller.
453pub(super) async fn refresh_task(
454    controller: Controller,
455    interval_secs: u64,
456    cancel: CancellationToken,
457) {
458    let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
459    interval.tick().await;
460
461    loop {
462        tokio::select! {
463            biased;
464            () = cancel.cancelled() => break,
465            _ = interval.tick() => {
466                if let Err(error) = controller.full_refresh().await {
467                    warn!(error = %error, "periodic refresh failed");
468                }
469            }
470        }
471    }
472}
473
474/// Downgrade a paginated result to an empty `Vec` when the endpoint returns 404.
475///
476/// Some Integration API endpoints are optional on older controller firmware.
477fn unwrap_or_empty<S, D>(endpoint: &str, result: Result<Vec<S>, crate::error::Error>) -> Vec<D>
478where
479    D: From<S>,
480{
481    match result {
482        Ok(items) => items.into_iter().map(D::from).collect(),
483        Err(ref error) if error.is_not_found() => {
484            debug!("{endpoint}: not available (404), treating as empty");
485            Vec::new()
486        }
487        Err(error) => {
488            warn!("{endpoint}: unexpected error {error}, treating as empty");
489            Vec::new()
490        }
491    }
492}
493
494fn unseen_events(store: &DataStore, events: &[Event]) -> Vec<Event> {
495    let mut seen: HashSet<String> = store
496        .events_snapshot()
497        .iter()
498        .map(|event| event_storage_key(event))
499        .collect();
500
501    events
502        .iter()
503        .filter(|event| seen.insert(event_storage_key(event)))
504        .cloned()
505        .collect()
506}