Skip to main content

unifly_api/controller/
refresh.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures_util::stream::{self, StreamExt};
6use tokio_util::sync::CancellationToken;
7use tracing::{debug, info, warn};
8
9use crate::core_error::CoreError;
10use crate::model::{
11    AclRule, Client, Device, DnsPolicy, EntityId, Event, FirewallPolicy, FirewallZone,
12    HealthSummary, NatPolicy, Network, Site, TrafficMatchingList, Voucher, WifiBroadcast,
13};
14use crate::store::{DataStore, event_storage_key};
15
16use super::support::{convert_health_summaries, parse_session_device_wan_ipv6};
17use super::{Controller, REFRESH_DETAIL_CONCURRENCY};
18
19impl Controller {
20    /// Fetch all data from the controller and update the DataStore.
21    ///
22    /// Pulls devices, clients, and events from the controller APIs, converts
23    /// them to domain types, and applies them to the store. Events are
24    /// broadcast through the event channel after snapshot application.
25    #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
26    pub async fn full_refresh(&self) -> Result<(), CoreError> {
27        let integration = self.inner.integration_client.lock().await.clone();
28        let site_id = *self.inner.site_id.lock().await;
29
30        if let (Some(integration), Some(sid)) = (integration, site_id) {
31            let page_limit = 200;
32
33            let (devices_res, clients_res, networks_res, wifi_res) = tokio::join!(
34                integration.paginate_all(page_limit, |off, lim| {
35                    integration.list_devices(&sid, off, lim)
36                }),
37                integration.paginate_all(page_limit, |off, lim| {
38                    integration.list_clients(&sid, off, lim)
39                }),
40                integration.paginate_all(page_limit, |off, lim| {
41                    integration.list_networks(&sid, off, lim)
42                }),
43                integration.paginate_all(page_limit, |off, lim| {
44                    integration.list_wifi_broadcasts(&sid, off, lim)
45                }),
46            );
47
48            let (policies_res, zones_res, acls_res, dns_res, vouchers_res) = tokio::join!(
49                integration.paginate_all(page_limit, |off, lim| {
50                    integration.list_firewall_policies(&sid, off, lim)
51                }),
52                integration.paginate_all(page_limit, |off, lim| {
53                    integration.list_firewall_zones(&sid, off, lim)
54                }),
55                integration.paginate_all(page_limit, |off, lim| {
56                    integration.list_acl_rules(&sid, off, lim)
57                }),
58                integration.paginate_all(page_limit, |off, lim| {
59                    integration.list_dns_policies(&sid, off, lim)
60                }),
61                integration.paginate_all(page_limit, |off, lim| {
62                    integration.list_vouchers(&sid, off, lim)
63                }),
64            );
65
66            let (sites_res, tml_res) = tokio::join!(
67                integration.paginate_all(50, |off, lim| integration.list_sites(off, lim)),
68                integration.paginate_all(page_limit, |off, lim| {
69                    integration.list_traffic_matching_lists(&sid, off, lim)
70                }),
71            );
72
73            let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
74            let mut clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
75            let network_ids: Vec<uuid::Uuid> = networks_res?
76                .into_iter()
77                .map(|network| network.id)
78                .collect();
79            info!(
80                network_count = network_ids.len(),
81                "fetching network details"
82            );
83            let networks: Vec<Network> = {
84                stream::iter(network_ids.into_iter().map(|network_id| {
85                    let integration = Arc::clone(&integration);
86                    async move {
87                        match integration.get_network(&sid, &network_id).await {
88                            Ok(detail) => Some(Network::from(detail)),
89                            Err(error) => {
90                                warn!(network_id = %network_id, error = %error, "network detail fetch failed");
91                                None
92                            }
93                        }
94                    }
95                }))
96                .buffer_unordered(REFRESH_DETAIL_CONCURRENCY)
97                .filter_map(async move |network| network)
98                .collect::<Vec<_>>()
99                .await
100            };
101            let wifi: Vec<WifiBroadcast> = wifi_res?.into_iter().map(WifiBroadcast::from).collect();
102            let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
103            let traffic_matching_lists: Vec<TrafficMatchingList> = tml_res?
104                .into_iter()
105                .map(TrafficMatchingList::from)
106                .collect();
107
108            // Optional endpoints — errors (404, not-configured, etc.) are non-fatal
109            let policies: Vec<FirewallPolicy> = unwrap_or_empty("firewall/policies", policies_res);
110            let zones: Vec<FirewallZone> = unwrap_or_empty("firewall/zones", zones_res);
111            let acls: Vec<AclRule> = unwrap_or_empty("acl/rules", acls_res);
112            let dns: Vec<DnsPolicy> = unwrap_or_empty("dns/policies", dns_res);
113            let vouchers: Vec<Voucher> = unwrap_or_empty("vouchers", vouchers_res);
114
115            info!(
116                device_count = devices.len(),
117                "enriching devices with statistics"
118            );
119            let mut devices = {
120                stream::iter(devices.into_iter().map(|mut device| {
121                    let integration = Arc::clone(&integration);
122                    async move {
123                        if let EntityId::Uuid(device_uuid) = &device.id {
124                            match integration.get_device_statistics(&sid, device_uuid).await {
125                                Ok(stats_resp) => {
126                                    device.stats =
127                                        crate::convert::device_stats_from_integration(&stats_resp);
128                                }
129                                Err(error) => {
130                                    warn!(device = ?device.name, error = %error, "device stats fetch failed");
131                                }
132                            }
133                        }
134                        device
135                    }
136                }))
137                .buffer_unordered(REFRESH_DETAIL_CONCURRENCY)
138                .collect::<Vec<_>>()
139                .await
140            };
141
142            #[allow(clippy::type_complexity)]
143            let (
144                session_events,
145                session_health,
146                session_clients,
147                session_devices,
148                session_users,
149                nat,
150            ): (
151                Vec<Event>,
152                Vec<HealthSummary>,
153                Vec<crate::session::models::SessionClientEntry>,
154                Vec<crate::session::models::SessionDevice>,
155                Vec<crate::session::models::SessionUserEntry>,
156                Vec<NatPolicy>,
157            ) = match self.inner.session_client.lock().await.clone() {
158                Some(session) => {
159                    let (events_res, health_res, clients_res, devices_res, users_res, nat_res) = tokio::join!(
160                        session.list_events(Some(100)),
161                        session.get_health(),
162                        session.list_clients(),
163                        session.list_devices(),
164                        session.list_users(),
165                        session.list_nat_rules(),
166                    );
167
168                    let events = match events_res {
169                        Ok(raw) => raw.into_iter().map(Event::from).collect(),
170                        Err(ref error) if error.is_not_found() => {
171                            debug!(
172                                auth = ?session.auth(),
173                                error = %error,
174                                "session event endpoint unavailable; treating as empty"
175                            );
176                            Vec::new()
177                        }
178                        Err(error) => {
179                            warn!(
180                                auth = ?session.auth(),
181                                error = %error,
182                                "session event fetch failed (non-fatal)"
183                            );
184                            Vec::new()
185                        }
186                    };
187
188                    let health = match health_res {
189                        Ok(raw) => convert_health_summaries(raw),
190                        Err(error) => {
191                            warn!(error = %error, "session health fetch failed (non-fatal)");
192                            Vec::new()
193                        }
194                    };
195
196                    let session_clients = match clients_res {
197                        Ok(raw) => raw,
198                        Err(error) => {
199                            warn!(error = %error, "session client fetch failed (non-fatal)");
200                            Vec::new()
201                        }
202                    };
203
204                    let session_devices = match devices_res {
205                        Ok(raw) => raw,
206                        Err(error) => {
207                            warn!(error = %error, "session device fetch failed (non-fatal)");
208                            Vec::new()
209                        }
210                    };
211
212                    let session_users = match users_res {
213                        Ok(raw) => raw,
214                        Err(error) => {
215                            warn!(error = %error, "session user fetch failed (non-fatal)");
216                            Vec::new()
217                        }
218                    };
219
220                    let nat = match nat_res {
221                        Ok(raw) => raw
222                            .iter()
223                            .filter_map(crate::convert::nat_policy_from_v2)
224                            .collect(),
225                        Err(error) => {
226                            warn!(error = %error, "v2 NAT fetch failed (non-fatal)");
227                            Vec::new()
228                        }
229                    };
230
231                    (
232                        events,
233                        health,
234                        session_clients,
235                        session_devices,
236                        session_users,
237                        nat,
238                    )
239                }
240                None => (
241                    Vec::new(),
242                    Vec::new(),
243                    Vec::new(),
244                    Vec::new(),
245                    Vec::new(),
246                    Vec::new(),
247                ),
248            };
249
250            if !session_clients.is_empty() {
251                let session_by_ip: HashMap<&str, &crate::session::models::SessionClientEntry> =
252                    session_clients
253                        .iter()
254                        .filter_map(|client| client.ip.as_deref().map(|ip| (ip, client)))
255                        .collect();
256                let mut merged = 0u32;
257                for client in &mut clients {
258                    let ip_key = client.ip.map(|ip| ip.to_string());
259                    if let Some(session_client) =
260                        ip_key.as_deref().and_then(|ip| session_by_ip.get(ip))
261                    {
262                        if client.tx_bytes.is_none() {
263                            client.tx_bytes = session_client
264                                .tx_bytes
265                                .and_then(|bytes| u64::try_from(bytes).ok());
266                        }
267                        if client.rx_bytes.is_none() {
268                            client.rx_bytes = session_client
269                                .rx_bytes
270                                .and_then(|bytes| u64::try_from(bytes).ok());
271                        }
272                        if client.hostname.is_none() {
273                            client.hostname.clone_from(&session_client.hostname);
274                        }
275                        if client.wireless.is_none() {
276                            let session_client: Client = Client::from((*session_client).clone());
277                            client.wireless = session_client.wireless;
278                            if client.uplink_device_mac.is_none() {
279                                client.uplink_device_mac = session_client.uplink_device_mac;
280                            }
281                        }
282                        merged += 1;
283                    }
284                }
285                debug!(
286                    total_clients = clients.len(),
287                    legacy_available = session_by_ip.len(),
288                    merged,
289                    "client traffic merge (by IP)"
290                );
291            }
292
293            if !session_users.is_empty() {
294                let users_by_mac: HashMap<String, &crate::session::models::SessionUserEntry> =
295                    session_users
296                        .iter()
297                        .map(|user| (user.mac.to_lowercase(), user))
298                        .collect();
299                let mut merged_users = 0u32;
300                for client in &mut clients {
301                    // Try MAC first, then fall back to matching the session
302                    // client entry (already joined by IP) whose MAC maps to
303                    // a user record. The Integration API may return UUIDs
304                    // instead of real MACs when access.macAddress is absent.
305                    let user = users_by_mac
306                        .get(&client.mac.as_str().to_lowercase())
307                        .or_else(|| {
308                            let ip_str = client.ip.map(|ip| ip.to_string())?;
309                            let session_client = session_clients
310                                .iter()
311                                .find(|lc| lc.ip.as_deref() == Some(ip_str.as_str()))?;
312                            users_by_mac.get(&session_client.mac.to_lowercase())
313                        });
314                    if let Some(user) = user {
315                        client.use_fixedip = user.use_fixedip.unwrap_or(false);
316                        client.fixed_ip = user.fixed_ip.as_deref().and_then(|ip| ip.parse().ok());
317                        if client.use_fixedip {
318                            merged_users += 1;
319                        }
320                    }
321                }
322                debug!(
323                    users_available = users_by_mac.len(),
324                    merged_users, "user DHCP reservation merge"
325                );
326            }
327
328            if !session_devices.is_empty() {
329                let session_by_mac: HashMap<&str, &crate::session::models::SessionDevice> =
330                    session_devices
331                        .iter()
332                        .map(|device| (device.mac.as_str(), device))
333                        .collect();
334                for device in &mut devices {
335                    if let Some(legacy_device) = session_by_mac.get(device.mac.as_str()) {
336                        if device.client_count.is_none() {
337                            device.client_count = legacy_device
338                                .num_sta
339                                .and_then(|count| count.try_into().ok());
340                        }
341                        if device.wan_ipv6.is_none() {
342                            device.wan_ipv6 = parse_session_device_wan_ipv6(&legacy_device.extra);
343                        }
344                    }
345                }
346            }
347
348            if !session_health.is_empty() {
349                self.inner
350                    .store
351                    .site_health
352                    .send_modify(|health| *health = Arc::new(session_health));
353            }
354
355            let fresh_legacy_events = unseen_events(self.store(), &session_events);
356
357            self.inner
358                .store
359                .apply_integration_snapshot(crate::store::RefreshSnapshot {
360                    devices,
361                    clients,
362                    networks,
363                    wifi,
364                    policies,
365                    zones,
366                    acls,
367                    nat,
368                    dns,
369                    vouchers,
370                    sites,
371                    events: session_events,
372                    traffic_matching_lists,
373                });
374
375            for event in fresh_legacy_events {
376                let _ = self.inner.event_tx.send(Arc::new(event));
377            }
378        } else {
379            let session = self
380                .inner
381                .session_client
382                .lock()
383                .await
384                .clone()
385                .ok_or(CoreError::ControllerDisconnected)?;
386
387            let (devices_res, clients_res, events_res, sites_res) = tokio::join!(
388                session.list_devices(),
389                session.list_clients(),
390                session.list_events(Some(100)),
391                session.list_sites(),
392            );
393
394            let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
395            let clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
396            let events: Vec<Event> = events_res?.into_iter().map(Event::from).collect();
397            let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
398            let fresh_events = unseen_events(self.store(), &events);
399
400            self.inner
401                .store
402                .apply_integration_snapshot(crate::store::RefreshSnapshot {
403                    devices,
404                    clients,
405                    networks: Vec::new(),
406                    wifi: Vec::new(),
407                    policies: Vec::new(),
408                    zones: Vec::new(),
409                    acls: Vec::new(),
410                    nat: Vec::new(),
411                    dns: Vec::new(),
412                    vouchers: Vec::new(),
413                    sites,
414                    events,
415                    traffic_matching_lists: Vec::new(),
416                });
417
418            for event in fresh_events {
419                let _ = self.inner.event_tx.send(Arc::new(event));
420            }
421        }
422
423        debug!(
424            devices = self.inner.store.device_count(),
425            clients = self.inner.store.client_count(),
426            "data refresh complete"
427        );
428
429        Ok(())
430    }
431}
432
433/// Periodically refresh data from the controller.
434pub(super) async fn refresh_task(
435    controller: Controller,
436    interval_secs: u64,
437    cancel: CancellationToken,
438) {
439    let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
440    interval.tick().await;
441
442    loop {
443        tokio::select! {
444            biased;
445            () = cancel.cancelled() => break,
446            _ = interval.tick() => {
447                if let Err(error) = controller.full_refresh().await {
448                    warn!(error = %error, "periodic refresh failed");
449                }
450            }
451        }
452    }
453}
454
455/// Downgrade a paginated result to an empty `Vec` when the endpoint returns 404.
456///
457/// Some Integration API endpoints are optional on older controller firmware.
458fn unwrap_or_empty<S, D>(endpoint: &str, result: Result<Vec<S>, crate::error::Error>) -> Vec<D>
459where
460    D: From<S>,
461{
462    match result {
463        Ok(items) => items.into_iter().map(D::from).collect(),
464        Err(ref error) if error.is_not_found() => {
465            debug!("{endpoint}: not available (404), treating as empty");
466            Vec::new()
467        }
468        Err(error) => {
469            warn!("{endpoint}: unexpected error {error}, treating as empty");
470            Vec::new()
471        }
472    }
473}
474
475fn unseen_events(store: &DataStore, events: &[Event]) -> Vec<Event> {
476    let mut seen: HashSet<String> = store
477        .events_snapshot()
478        .iter()
479        .map(|event| event_storage_key(event))
480        .collect();
481
482    events
483        .iter()
484        .filter(|event| seen.insert(event_storage_key(event)))
485        .cloned()
486        .collect()
487}