1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures_util::stream::{self, StreamExt};
6use tokio_util::sync::CancellationToken;
7use tracing::{debug, info, warn};
8
9use crate::core_error::CoreError;
10use crate::model::{
11 AclRule, Client, Device, DnsPolicy, EntityId, Event, FirewallGroup, FirewallPolicy,
12 FirewallZone, HealthSummary, MacAddress, NatPolicy, Network, Site, TrafficMatchingList,
13 Voucher, WifiBroadcast,
14};
15use crate::store::{DataStore, event_storage_key};
16
17use super::support::{convert_health_summaries, parse_session_device_wan_ipv6};
18use super::{Controller, REFRESH_DETAIL_CONCURRENCY};
19
20impl Controller {
21 #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
27 pub async fn full_refresh(&self) -> Result<(), CoreError> {
28 let integration = self.inner.integration_client.lock().await.clone();
29 let site_id = *self.inner.site_id.lock().await;
30
31 if let (Some(integration), Some(sid)) = (integration, site_id) {
32 let page_limit = 200;
33
34 let (devices_res, clients_res, networks_res, wifi_res) = tokio::join!(
35 integration.paginate_all(page_limit, |off, lim| {
36 integration.list_devices(&sid, off, lim)
37 }),
38 integration.paginate_all(page_limit, |off, lim| {
39 integration.list_clients(&sid, off, lim)
40 }),
41 integration.paginate_all(page_limit, |off, lim| {
42 integration.list_networks(&sid, off, lim)
43 }),
44 integration.paginate_all(page_limit, |off, lim| {
45 integration.list_wifi_broadcasts(&sid, off, lim)
46 }),
47 );
48
49 let (policies_res, zones_res, acls_res, dns_res, vouchers_res) = tokio::join!(
50 integration.paginate_all(page_limit, |off, lim| {
51 integration.list_firewall_policies(&sid, off, lim)
52 }),
53 integration.paginate_all(page_limit, |off, lim| {
54 integration.list_firewall_zones(&sid, off, lim)
55 }),
56 integration.paginate_all(page_limit, |off, lim| {
57 integration.list_acl_rules(&sid, off, lim)
58 }),
59 integration.paginate_all(page_limit, |off, lim| {
60 integration.list_dns_policies(&sid, off, lim)
61 }),
62 integration.paginate_all(page_limit, |off, lim| {
63 integration.list_vouchers(&sid, off, lim)
64 }),
65 );
66
67 let (sites_res, tml_res) = tokio::join!(
68 integration.paginate_all(50, |off, lim| integration.list_sites(off, lim)),
69 integration.paginate_all(page_limit, |off, lim| {
70 integration.list_traffic_matching_lists(&sid, off, lim)
71 }),
72 );
73
74 let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
75 let mut clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
76 let network_ids: Vec<uuid::Uuid> = networks_res?
77 .into_iter()
78 .map(|network| network.id)
79 .collect();
80 info!(
81 network_count = network_ids.len(),
82 "fetching network details"
83 );
84 let networks: Vec<Network> = {
85 stream::iter(network_ids.into_iter().map(|network_id| {
86 let integration = Arc::clone(&integration);
87 async move {
88 match integration.get_network(&sid, &network_id).await {
89 Ok(detail) => Some(Network::from(detail)),
90 Err(error) => {
91 warn!(network_id = %network_id, error = %error, "network detail fetch failed");
92 None
93 }
94 }
95 }
96 }))
97 .buffer_unordered(REFRESH_DETAIL_CONCURRENCY)
98 .filter_map(async move |network| network)
99 .collect::<Vec<_>>()
100 .await
101 };
102 let wifi: Vec<WifiBroadcast> = wifi_res?.into_iter().map(WifiBroadcast::from).collect();
103 let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
104 let traffic_matching_lists: Vec<TrafficMatchingList> = tml_res?
105 .into_iter()
106 .map(TrafficMatchingList::from)
107 .collect();
108
109 let policies: Vec<FirewallPolicy> = unwrap_or_empty("firewall/policies", policies_res);
111 let zones: Vec<FirewallZone> = unwrap_or_empty("firewall/zones", zones_res);
112 let acls: Vec<AclRule> = unwrap_or_empty("acl/rules", acls_res);
113 let dns: Vec<DnsPolicy> = unwrap_or_empty("dns/policies", dns_res);
114 let vouchers: Vec<Voucher> = unwrap_or_empty("vouchers", vouchers_res);
115
116 info!(
117 device_count = devices.len(),
118 "enriching devices with statistics"
119 );
120 let mut devices = {
121 stream::iter(devices.into_iter().map(|mut device| {
122 let integration = Arc::clone(&integration);
123 async move {
124 if let EntityId::Uuid(device_uuid) = &device.id {
125 match integration.get_device_statistics(&sid, device_uuid).await {
126 Ok(stats_resp) => {
127 device.stats =
128 crate::convert::device_stats_from_integration(&stats_resp);
129 crate::convert::enrich_radios_from_stats(
130 &mut device.radios,
131 &stats_resp.interfaces,
132 );
133 }
134 Err(error) => {
135 warn!(device = ?device.name, error = %error, "device stats fetch failed");
136 }
137 }
138 }
139 device
140 }
141 }))
142 .buffer_unordered(REFRESH_DETAIL_CONCURRENCY)
143 .collect::<Vec<_>>()
144 .await
145 };
146
147 #[allow(clippy::type_complexity)]
148 let (
149 session_events,
150 session_health,
151 session_clients,
152 session_devices,
153 session_users,
154 nat,
155 firewall_groups,
156 ): (
157 Vec<Event>,
158 Vec<HealthSummary>,
159 Vec<crate::session::models::SessionClientEntry>,
160 Vec<crate::session::models::SessionDevice>,
161 Vec<crate::session::models::SessionUserEntry>,
162 Vec<NatPolicy>,
163 Vec<FirewallGroup>,
164 ) = match self.inner.session_client.lock().await.clone() {
165 Some(session) => {
166 let (
167 events_res,
168 health_res,
169 clients_res,
170 devices_res,
171 users_res,
172 nat_res,
173 fwg_res,
174 ) = tokio::join!(
175 session.list_events(Some(100)),
176 session.get_health(),
177 session.list_clients(),
178 session.list_devices(),
179 session.list_users(),
180 session.list_nat_rules(),
181 session.list_firewall_groups(),
182 );
183
184 let events = match events_res {
185 Ok(raw) => raw.into_iter().map(Event::from).collect(),
186 Err(ref error) if error.is_not_found() => {
187 debug!(
188 auth = ?session.auth(),
189 error = %error,
190 "session event endpoint unavailable; treating as empty"
191 );
192 Vec::new()
193 }
194 Err(error) => {
195 warn!(
196 auth = ?session.auth(),
197 error = %error,
198 "session event fetch failed (non-fatal)"
199 );
200 Vec::new()
201 }
202 };
203
204 let health = match health_res {
205 Ok(raw) => convert_health_summaries(raw),
206 Err(error) => {
207 warn!(error = %error, "session health fetch failed (non-fatal)");
208 Vec::new()
209 }
210 };
211
212 let session_clients = match clients_res {
213 Ok(raw) => raw,
214 Err(error) => {
215 warn!(error = %error, "session client fetch failed (non-fatal)");
216 Vec::new()
217 }
218 };
219
220 let session_devices = match devices_res {
221 Ok(raw) => raw,
222 Err(error) => {
223 warn!(error = %error, "session device fetch failed (non-fatal)");
224 Vec::new()
225 }
226 };
227
228 let session_users = match users_res {
229 Ok(raw) => raw,
230 Err(error) => {
231 warn!(error = %error, "session user fetch failed (non-fatal)");
232 Vec::new()
233 }
234 };
235
236 let nat = match nat_res {
237 Ok(raw) => raw
238 .iter()
239 .filter_map(crate::convert::nat_policy_from_v2)
240 .collect(),
241 Err(error) => {
242 warn!(error = %error, "v2 NAT fetch failed (non-fatal)");
243 Vec::new()
244 }
245 };
246
247 let firewall_groups = match fwg_res {
248 Ok(raw) => raw
249 .iter()
250 .filter_map(crate::convert::firewall_group_from_session)
251 .collect(),
252 Err(error) => {
253 warn!(error = %error, "firewall group fetch failed (non-fatal)");
254 Vec::new()
255 }
256 };
257
258 (
259 events,
260 health,
261 session_clients,
262 session_devices,
263 session_users,
264 nat,
265 firewall_groups,
266 )
267 }
268 None => (
269 Vec::new(),
270 Vec::new(),
271 Vec::new(),
272 Vec::new(),
273 Vec::new(),
274 Vec::new(),
275 Vec::new(),
276 ),
277 };
278
279 if !session_clients.is_empty() {
280 let session_by_ip: HashMap<&str, &crate::session::models::SessionClientEntry> =
281 session_clients
282 .iter()
283 .filter_map(|client| client.ip.as_deref().map(|ip| (ip, client)))
284 .collect();
285 let mut merged = 0u32;
286 for client in &mut clients {
287 let ip_key = client.ip.map(|ip| ip.to_string());
288 if let Some(session_client) =
289 ip_key.as_deref().and_then(|ip| session_by_ip.get(ip))
290 {
291 if client.tx_bytes.is_none() {
292 client.tx_bytes = session_client
293 .tx_bytes
294 .and_then(|bytes| u64::try_from(bytes).ok());
295 }
296 if client.rx_bytes.is_none() {
297 client.rx_bytes = session_client
298 .rx_bytes
299 .and_then(|bytes| u64::try_from(bytes).ok());
300 }
301 if client.hostname.is_none() {
302 client.hostname.clone_from(&session_client.hostname);
303 }
304 if client.wireless.is_none() {
305 let session_client: Client = Client::from((*session_client).clone());
306 client.wireless = session_client.wireless;
307 }
308 let session_is_wired = session_client.is_wired.unwrap_or(false);
311 if client.uplink_device_mac.is_none() {
312 let uplink = if session_is_wired {
313 session_client.sw_mac.as_deref()
314 } else {
315 session_client.ap_mac.as_deref()
316 };
317 client.uplink_device_mac = uplink.map(MacAddress::new);
318 }
319 if client.switch_port.is_none() && session_is_wired {
320 client.switch_port =
321 session_client.sw_port.and_then(|p| u32::try_from(p).ok());
322 }
323 merged += 1;
324 }
325 }
326 debug!(
327 total_clients = clients.len(),
328 legacy_available = session_by_ip.len(),
329 merged,
330 "client traffic merge (by IP)"
331 );
332 }
333
334 if !session_users.is_empty() {
335 let users_by_mac: HashMap<String, &crate::session::models::SessionUserEntry> =
336 session_users
337 .iter()
338 .map(|user| (user.mac.to_lowercase(), user))
339 .collect();
340 let mut merged_users = 0u32;
341 for client in &mut clients {
342 let user = users_by_mac
347 .get(&client.mac.as_str().to_lowercase())
348 .or_else(|| {
349 let ip_str = client.ip.map(|ip| ip.to_string())?;
350 let session_client = session_clients
351 .iter()
352 .find(|lc| lc.ip.as_deref() == Some(ip_str.as_str()))?;
353 users_by_mac.get(&session_client.mac.to_lowercase())
354 });
355 if let Some(user) = user {
356 client.use_fixedip = user.use_fixedip.unwrap_or(false);
357 client.fixed_ip = user.fixed_ip.as_deref().and_then(|ip| ip.parse().ok());
358 if client.use_fixedip {
359 merged_users += 1;
360 }
361 }
362 }
363 debug!(
364 users_available = users_by_mac.len(),
365 merged_users, "user DHCP reservation merge"
366 );
367 }
368
369 if !session_devices.is_empty() {
370 let session_by_mac: HashMap<&str, &crate::session::models::SessionDevice> =
371 session_devices
372 .iter()
373 .map(|device| (device.mac.as_str(), device))
374 .collect();
375 for device in &mut devices {
376 if let Some(legacy_device) = session_by_mac.get(device.mac.as_str()) {
377 if device.client_count.is_none() {
378 device.client_count = legacy_device
379 .num_sta
380 .and_then(|count| count.try_into().ok());
381 }
382 if device.wan_ipv6.is_none() {
383 device.wan_ipv6 = parse_session_device_wan_ipv6(&legacy_device.extra);
384 }
385 if device.ports.is_empty()
386 || device.radios.is_empty()
387 || device.uplink_device_mac.is_none()
388 || device.uplink_port_idx.is_none()
389 {
390 let session_dev: Device = Device::from((*legacy_device).clone());
391 if device.ports.is_empty() && !session_dev.ports.is_empty() {
392 device.ports = session_dev.ports;
393 }
394 if device.radios.is_empty() && !session_dev.radios.is_empty() {
395 device.radios = session_dev.radios;
396 }
397 if device.uplink_device_mac.is_none() {
398 device.uplink_device_mac = session_dev.uplink_device_mac;
399 }
400 if device.uplink_port_idx.is_none() {
401 device.uplink_port_idx = session_dev.uplink_port_idx;
402 }
403 }
404 }
405 }
406 }
407
408 if !session_health.is_empty() {
409 self.inner
410 .store
411 .site_health
412 .send_modify(|health| *health = Arc::new(session_health));
413 }
414
415 let fresh_legacy_events = unseen_events(self.store(), &session_events);
416
417 self.inner
418 .store
419 .apply_integration_snapshot(crate::store::RefreshSnapshot {
420 devices,
421 clients,
422 networks,
423 wifi,
424 policies,
425 zones,
426 acls,
427 nat,
428 dns,
429 vouchers,
430 sites,
431 events: session_events,
432 traffic_matching_lists,
433 firewall_groups,
434 });
435
436 for event in fresh_legacy_events {
437 let _ = self.inner.event_tx.send(Arc::new(event));
438 }
439 } else {
440 let session = self
441 .inner
442 .session_client
443 .lock()
444 .await
445 .clone()
446 .ok_or(CoreError::ControllerDisconnected)?;
447
448 let (devices_res, clients_res, events_res, sites_res, fwg_res) = tokio::join!(
449 session.list_devices(),
450 session.list_clients(),
451 session.list_events(Some(100)),
452 session.list_sites(),
453 session.list_firewall_groups(),
454 );
455
456 let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
457 let clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
458 let events: Vec<Event> = events_res?.into_iter().map(Event::from).collect();
459 let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
460 let firewall_groups = match fwg_res {
461 Ok(raw) => raw
462 .iter()
463 .filter_map(crate::convert::firewall_group_from_session)
464 .collect(),
465 Err(error) => {
466 warn!(error = %error, "firewall group fetch failed (non-fatal)");
467 Vec::new()
468 }
469 };
470 let fresh_events = unseen_events(self.store(), &events);
471
472 self.inner
473 .store
474 .apply_integration_snapshot(crate::store::RefreshSnapshot {
475 devices,
476 clients,
477 networks: Vec::new(),
478 wifi: Vec::new(),
479 policies: Vec::new(),
480 zones: Vec::new(),
481 acls: Vec::new(),
482 nat: Vec::new(),
483 dns: Vec::new(),
484 vouchers: Vec::new(),
485 sites,
486 events,
487 traffic_matching_lists: Vec::new(),
488 firewall_groups,
489 });
490
491 for event in fresh_events {
492 let _ = self.inner.event_tx.send(Arc::new(event));
493 }
494 }
495
496 debug!(
497 devices = self.inner.store.device_count(),
498 clients = self.inner.store.client_count(),
499 "data refresh complete"
500 );
501
502 Ok(())
503 }
504}
505
506pub(super) async fn refresh_task(
508 controller: Controller,
509 interval_secs: u64,
510 cancel: CancellationToken,
511) {
512 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
513 interval.tick().await;
514
515 loop {
516 tokio::select! {
517 biased;
518 () = cancel.cancelled() => break,
519 _ = interval.tick() => {
520 if let Err(error) = controller.full_refresh().await {
521 warn!(error = %error, "periodic refresh failed");
522 }
523 }
524 }
525 }
526}
527
528fn unwrap_or_empty<S, D>(endpoint: &str, result: Result<Vec<S>, crate::error::Error>) -> Vec<D>
532where
533 D: From<S>,
534{
535 match result {
536 Ok(items) => items.into_iter().map(D::from).collect(),
537 Err(ref error) if error.is_not_found() => {
538 debug!("{endpoint}: not available (404), treating as empty");
539 Vec::new()
540 }
541 Err(error) => {
542 warn!("{endpoint}: unexpected error {error}, treating as empty");
543 Vec::new()
544 }
545 }
546}
547
548fn unseen_events(store: &DataStore, events: &[Event]) -> Vec<Event> {
549 let mut seen: HashSet<String> = store
550 .events_snapshot()
551 .iter()
552 .map(|event| event_storage_key(event))
553 .collect();
554
555 events
556 .iter()
557 .filter(|event| seen.insert(event_storage_key(event)))
558 .cloned()
559 .collect()
560}