1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures_util::stream::{self, StreamExt};
6use tokio_util::sync::CancellationToken;
7use tracing::{debug, info, warn};
8
9use crate::core_error::CoreError;
10use crate::model::{
11 AclRule, Client, Device, DnsPolicy, EntityId, Event, FirewallPolicy, FirewallZone,
12 HealthSummary, MacAddress, NatPolicy, Network, Site, TrafficMatchingList, Voucher,
13 WifiBroadcast,
14};
15use crate::store::{DataStore, event_storage_key};
16
17use super::support::{convert_health_summaries, parse_session_device_wan_ipv6};
18use super::{Controller, REFRESH_DETAIL_CONCURRENCY};
19
20impl Controller {
21 #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
27 pub async fn full_refresh(&self) -> Result<(), CoreError> {
28 let integration = self.inner.integration_client.lock().await.clone();
29 let site_id = *self.inner.site_id.lock().await;
30
31 if let (Some(integration), Some(sid)) = (integration, site_id) {
32 let page_limit = 200;
33
34 let (devices_res, clients_res, networks_res, wifi_res) = tokio::join!(
35 integration.paginate_all(page_limit, |off, lim| {
36 integration.list_devices(&sid, off, lim)
37 }),
38 integration.paginate_all(page_limit, |off, lim| {
39 integration.list_clients(&sid, off, lim)
40 }),
41 integration.paginate_all(page_limit, |off, lim| {
42 integration.list_networks(&sid, off, lim)
43 }),
44 integration.paginate_all(page_limit, |off, lim| {
45 integration.list_wifi_broadcasts(&sid, off, lim)
46 }),
47 );
48
49 let (policies_res, zones_res, acls_res, dns_res, vouchers_res) = tokio::join!(
50 integration.paginate_all(page_limit, |off, lim| {
51 integration.list_firewall_policies(&sid, off, lim)
52 }),
53 integration.paginate_all(page_limit, |off, lim| {
54 integration.list_firewall_zones(&sid, off, lim)
55 }),
56 integration.paginate_all(page_limit, |off, lim| {
57 integration.list_acl_rules(&sid, off, lim)
58 }),
59 integration.paginate_all(page_limit, |off, lim| {
60 integration.list_dns_policies(&sid, off, lim)
61 }),
62 integration.paginate_all(page_limit, |off, lim| {
63 integration.list_vouchers(&sid, off, lim)
64 }),
65 );
66
67 let (sites_res, tml_res) = tokio::join!(
68 integration.paginate_all(50, |off, lim| integration.list_sites(off, lim)),
69 integration.paginate_all(page_limit, |off, lim| {
70 integration.list_traffic_matching_lists(&sid, off, lim)
71 }),
72 );
73
74 let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
75 let mut clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
76 let network_ids: Vec<uuid::Uuid> = networks_res?
77 .into_iter()
78 .map(|network| network.id)
79 .collect();
80 info!(
81 network_count = network_ids.len(),
82 "fetching network details"
83 );
84 let networks: Vec<Network> = {
85 stream::iter(network_ids.into_iter().map(|network_id| {
86 let integration = Arc::clone(&integration);
87 async move {
88 match integration.get_network(&sid, &network_id).await {
89 Ok(detail) => Some(Network::from(detail)),
90 Err(error) => {
91 warn!(network_id = %network_id, error = %error, "network detail fetch failed");
92 None
93 }
94 }
95 }
96 }))
97 .buffer_unordered(REFRESH_DETAIL_CONCURRENCY)
98 .filter_map(async move |network| network)
99 .collect::<Vec<_>>()
100 .await
101 };
102 let wifi: Vec<WifiBroadcast> = wifi_res?.into_iter().map(WifiBroadcast::from).collect();
103 let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
104 let traffic_matching_lists: Vec<TrafficMatchingList> = tml_res?
105 .into_iter()
106 .map(TrafficMatchingList::from)
107 .collect();
108
109 let policies: Vec<FirewallPolicy> = unwrap_or_empty("firewall/policies", policies_res);
111 let zones: Vec<FirewallZone> = unwrap_or_empty("firewall/zones", zones_res);
112 let acls: Vec<AclRule> = unwrap_or_empty("acl/rules", acls_res);
113 let dns: Vec<DnsPolicy> = unwrap_or_empty("dns/policies", dns_res);
114 let vouchers: Vec<Voucher> = unwrap_or_empty("vouchers", vouchers_res);
115
116 info!(
117 device_count = devices.len(),
118 "enriching devices with statistics"
119 );
120 let mut devices = {
121 stream::iter(devices.into_iter().map(|mut device| {
122 let integration = Arc::clone(&integration);
123 async move {
124 if let EntityId::Uuid(device_uuid) = &device.id {
125 match integration.get_device_statistics(&sid, device_uuid).await {
126 Ok(stats_resp) => {
127 device.stats =
128 crate::convert::device_stats_from_integration(&stats_resp);
129 crate::convert::enrich_radios_from_stats(
130 &mut device.radios,
131 &stats_resp.interfaces,
132 );
133 }
134 Err(error) => {
135 warn!(device = ?device.name, error = %error, "device stats fetch failed");
136 }
137 }
138 }
139 device
140 }
141 }))
142 .buffer_unordered(REFRESH_DETAIL_CONCURRENCY)
143 .collect::<Vec<_>>()
144 .await
145 };
146
147 #[allow(clippy::type_complexity)]
148 let (
149 session_events,
150 session_health,
151 session_clients,
152 session_devices,
153 session_users,
154 nat,
155 ): (
156 Vec<Event>,
157 Vec<HealthSummary>,
158 Vec<crate::session::models::SessionClientEntry>,
159 Vec<crate::session::models::SessionDevice>,
160 Vec<crate::session::models::SessionUserEntry>,
161 Vec<NatPolicy>,
162 ) = match self.inner.session_client.lock().await.clone() {
163 Some(session) => {
164 let (events_res, health_res, clients_res, devices_res, users_res, nat_res) = tokio::join!(
165 session.list_events(Some(100)),
166 session.get_health(),
167 session.list_clients(),
168 session.list_devices(),
169 session.list_users(),
170 session.list_nat_rules(),
171 );
172
173 let events = match events_res {
174 Ok(raw) => raw.into_iter().map(Event::from).collect(),
175 Err(ref error) if error.is_not_found() => {
176 debug!(
177 auth = ?session.auth(),
178 error = %error,
179 "session event endpoint unavailable; treating as empty"
180 );
181 Vec::new()
182 }
183 Err(error) => {
184 warn!(
185 auth = ?session.auth(),
186 error = %error,
187 "session event fetch failed (non-fatal)"
188 );
189 Vec::new()
190 }
191 };
192
193 let health = match health_res {
194 Ok(raw) => convert_health_summaries(raw),
195 Err(error) => {
196 warn!(error = %error, "session health fetch failed (non-fatal)");
197 Vec::new()
198 }
199 };
200
201 let session_clients = match clients_res {
202 Ok(raw) => raw,
203 Err(error) => {
204 warn!(error = %error, "session client fetch failed (non-fatal)");
205 Vec::new()
206 }
207 };
208
209 let session_devices = match devices_res {
210 Ok(raw) => raw,
211 Err(error) => {
212 warn!(error = %error, "session device fetch failed (non-fatal)");
213 Vec::new()
214 }
215 };
216
217 let session_users = match users_res {
218 Ok(raw) => raw,
219 Err(error) => {
220 warn!(error = %error, "session user fetch failed (non-fatal)");
221 Vec::new()
222 }
223 };
224
225 let nat = match nat_res {
226 Ok(raw) => raw
227 .iter()
228 .filter_map(crate::convert::nat_policy_from_v2)
229 .collect(),
230 Err(error) => {
231 warn!(error = %error, "v2 NAT fetch failed (non-fatal)");
232 Vec::new()
233 }
234 };
235
236 (
237 events,
238 health,
239 session_clients,
240 session_devices,
241 session_users,
242 nat,
243 )
244 }
245 None => (
246 Vec::new(),
247 Vec::new(),
248 Vec::new(),
249 Vec::new(),
250 Vec::new(),
251 Vec::new(),
252 ),
253 };
254
255 if !session_clients.is_empty() {
256 let session_by_ip: HashMap<&str, &crate::session::models::SessionClientEntry> =
257 session_clients
258 .iter()
259 .filter_map(|client| client.ip.as_deref().map(|ip| (ip, client)))
260 .collect();
261 let mut merged = 0u32;
262 for client in &mut clients {
263 let ip_key = client.ip.map(|ip| ip.to_string());
264 if let Some(session_client) =
265 ip_key.as_deref().and_then(|ip| session_by_ip.get(ip))
266 {
267 if client.tx_bytes.is_none() {
268 client.tx_bytes = session_client
269 .tx_bytes
270 .and_then(|bytes| u64::try_from(bytes).ok());
271 }
272 if client.rx_bytes.is_none() {
273 client.rx_bytes = session_client
274 .rx_bytes
275 .and_then(|bytes| u64::try_from(bytes).ok());
276 }
277 if client.hostname.is_none() {
278 client.hostname.clone_from(&session_client.hostname);
279 }
280 if client.wireless.is_none() {
281 let session_client: Client = Client::from((*session_client).clone());
282 client.wireless = session_client.wireless;
283 }
284 if client.uplink_device_mac.is_none() {
285 let uplink = if session_client.is_wired.unwrap_or(true) {
286 session_client.sw_mac.as_deref()
287 } else {
288 session_client.ap_mac.as_deref()
289 };
290 client.uplink_device_mac = uplink.map(MacAddress::new);
291 }
292 merged += 1;
293 }
294 }
295 debug!(
296 total_clients = clients.len(),
297 legacy_available = session_by_ip.len(),
298 merged,
299 "client traffic merge (by IP)"
300 );
301 }
302
303 if !session_users.is_empty() {
304 let users_by_mac: HashMap<String, &crate::session::models::SessionUserEntry> =
305 session_users
306 .iter()
307 .map(|user| (user.mac.to_lowercase(), user))
308 .collect();
309 let mut merged_users = 0u32;
310 for client in &mut clients {
311 let user = users_by_mac
316 .get(&client.mac.as_str().to_lowercase())
317 .or_else(|| {
318 let ip_str = client.ip.map(|ip| ip.to_string())?;
319 let session_client = session_clients
320 .iter()
321 .find(|lc| lc.ip.as_deref() == Some(ip_str.as_str()))?;
322 users_by_mac.get(&session_client.mac.to_lowercase())
323 });
324 if let Some(user) = user {
325 client.use_fixedip = user.use_fixedip.unwrap_or(false);
326 client.fixed_ip = user.fixed_ip.as_deref().and_then(|ip| ip.parse().ok());
327 if client.use_fixedip {
328 merged_users += 1;
329 }
330 }
331 }
332 debug!(
333 users_available = users_by_mac.len(),
334 merged_users, "user DHCP reservation merge"
335 );
336 }
337
338 if !session_devices.is_empty() {
339 let session_by_mac: HashMap<&str, &crate::session::models::SessionDevice> =
340 session_devices
341 .iter()
342 .map(|device| (device.mac.as_str(), device))
343 .collect();
344 for device in &mut devices {
345 if let Some(legacy_device) = session_by_mac.get(device.mac.as_str()) {
346 if device.client_count.is_none() {
347 device.client_count = legacy_device
348 .num_sta
349 .and_then(|count| count.try_into().ok());
350 }
351 if device.wan_ipv6.is_none() {
352 device.wan_ipv6 = parse_session_device_wan_ipv6(&legacy_device.extra);
353 }
354 if device.ports.is_empty() || device.radios.is_empty() {
355 let session_dev: Device = Device::from((*legacy_device).clone());
356 if device.ports.is_empty() && !session_dev.ports.is_empty() {
357 device.ports = session_dev.ports;
358 }
359 if device.radios.is_empty() && !session_dev.radios.is_empty() {
360 device.radios = session_dev.radios;
361 }
362 }
363 }
364 }
365 }
366
367 if !session_health.is_empty() {
368 self.inner
369 .store
370 .site_health
371 .send_modify(|health| *health = Arc::new(session_health));
372 }
373
374 let fresh_legacy_events = unseen_events(self.store(), &session_events);
375
376 self.inner
377 .store
378 .apply_integration_snapshot(crate::store::RefreshSnapshot {
379 devices,
380 clients,
381 networks,
382 wifi,
383 policies,
384 zones,
385 acls,
386 nat,
387 dns,
388 vouchers,
389 sites,
390 events: session_events,
391 traffic_matching_lists,
392 });
393
394 for event in fresh_legacy_events {
395 let _ = self.inner.event_tx.send(Arc::new(event));
396 }
397 } else {
398 let session = self
399 .inner
400 .session_client
401 .lock()
402 .await
403 .clone()
404 .ok_or(CoreError::ControllerDisconnected)?;
405
406 let (devices_res, clients_res, events_res, sites_res) = tokio::join!(
407 session.list_devices(),
408 session.list_clients(),
409 session.list_events(Some(100)),
410 session.list_sites(),
411 );
412
413 let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
414 let clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
415 let events: Vec<Event> = events_res?.into_iter().map(Event::from).collect();
416 let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
417 let fresh_events = unseen_events(self.store(), &events);
418
419 self.inner
420 .store
421 .apply_integration_snapshot(crate::store::RefreshSnapshot {
422 devices,
423 clients,
424 networks: Vec::new(),
425 wifi: Vec::new(),
426 policies: Vec::new(),
427 zones: Vec::new(),
428 acls: Vec::new(),
429 nat: Vec::new(),
430 dns: Vec::new(),
431 vouchers: Vec::new(),
432 sites,
433 events,
434 traffic_matching_lists: Vec::new(),
435 });
436
437 for event in fresh_events {
438 let _ = self.inner.event_tx.send(Arc::new(event));
439 }
440 }
441
442 debug!(
443 devices = self.inner.store.device_count(),
444 clients = self.inner.store.client_count(),
445 "data refresh complete"
446 );
447
448 Ok(())
449 }
450}
451
452pub(super) async fn refresh_task(
454 controller: Controller,
455 interval_secs: u64,
456 cancel: CancellationToken,
457) {
458 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
459 interval.tick().await;
460
461 loop {
462 tokio::select! {
463 biased;
464 () = cancel.cancelled() => break,
465 _ = interval.tick() => {
466 if let Err(error) = controller.full_refresh().await {
467 warn!(error = %error, "periodic refresh failed");
468 }
469 }
470 }
471 }
472}
473
474fn unwrap_or_empty<S, D>(endpoint: &str, result: Result<Vec<S>, crate::error::Error>) -> Vec<D>
478where
479 D: From<S>,
480{
481 match result {
482 Ok(items) => items.into_iter().map(D::from).collect(),
483 Err(ref error) if error.is_not_found() => {
484 debug!("{endpoint}: not available (404), treating as empty");
485 Vec::new()
486 }
487 Err(error) => {
488 warn!("{endpoint}: unexpected error {error}, treating as empty");
489 Vec::new()
490 }
491 }
492}
493
494fn unseen_events(store: &DataStore, events: &[Event]) -> Vec<Event> {
495 let mut seen: HashSet<String> = store
496 .events_snapshot()
497 .iter()
498 .map(|event| event_storage_key(event))
499 .collect();
500
501 events
502 .iter()
503 .filter(|event| seen.insert(event_storage_key(event)))
504 .cloned()
505 .collect()
506}