1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures_util::stream::{self, StreamExt};
6use tokio_util::sync::CancellationToken;
7use tracing::{debug, info, warn};
8
9use crate::core_error::CoreError;
10use crate::model::{
11 AclRule, Client, Device, DnsPolicy, EntityId, Event, FirewallPolicy, FirewallZone,
12 HealthSummary, NatPolicy, Network, Site, TrafficMatchingList, Voucher, WifiBroadcast,
13};
14use crate::store::{DataStore, event_storage_key};
15
16use super::support::{convert_health_summaries, parse_session_device_wan_ipv6};
17use super::{Controller, REFRESH_DETAIL_CONCURRENCY};
18
19impl Controller {
20 #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
26 pub async fn full_refresh(&self) -> Result<(), CoreError> {
27 let integration = self.inner.integration_client.lock().await.clone();
28 let site_id = *self.inner.site_id.lock().await;
29
30 if let (Some(integration), Some(sid)) = (integration, site_id) {
31 let page_limit = 200;
32
33 let (devices_res, clients_res, networks_res, wifi_res) = tokio::join!(
34 integration.paginate_all(page_limit, |off, lim| {
35 integration.list_devices(&sid, off, lim)
36 }),
37 integration.paginate_all(page_limit, |off, lim| {
38 integration.list_clients(&sid, off, lim)
39 }),
40 integration.paginate_all(page_limit, |off, lim| {
41 integration.list_networks(&sid, off, lim)
42 }),
43 integration.paginate_all(page_limit, |off, lim| {
44 integration.list_wifi_broadcasts(&sid, off, lim)
45 }),
46 );
47
48 let (policies_res, zones_res, acls_res, dns_res, vouchers_res) = tokio::join!(
49 integration.paginate_all(page_limit, |off, lim| {
50 integration.list_firewall_policies(&sid, off, lim)
51 }),
52 integration.paginate_all(page_limit, |off, lim| {
53 integration.list_firewall_zones(&sid, off, lim)
54 }),
55 integration.paginate_all(page_limit, |off, lim| {
56 integration.list_acl_rules(&sid, off, lim)
57 }),
58 integration.paginate_all(page_limit, |off, lim| {
59 integration.list_dns_policies(&sid, off, lim)
60 }),
61 integration.paginate_all(page_limit, |off, lim| {
62 integration.list_vouchers(&sid, off, lim)
63 }),
64 );
65
66 let (sites_res, tml_res) = tokio::join!(
67 integration.paginate_all(50, |off, lim| integration.list_sites(off, lim)),
68 integration.paginate_all(page_limit, |off, lim| {
69 integration.list_traffic_matching_lists(&sid, off, lim)
70 }),
71 );
72
73 let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
74 let mut clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
75 let network_ids: Vec<uuid::Uuid> = networks_res?
76 .into_iter()
77 .map(|network| network.id)
78 .collect();
79 info!(
80 network_count = network_ids.len(),
81 "fetching network details"
82 );
83 let networks: Vec<Network> = {
84 stream::iter(network_ids.into_iter().map(|network_id| {
85 let integration = Arc::clone(&integration);
86 async move {
87 match integration.get_network(&sid, &network_id).await {
88 Ok(detail) => Some(Network::from(detail)),
89 Err(error) => {
90 warn!(network_id = %network_id, error = %error, "network detail fetch failed");
91 None
92 }
93 }
94 }
95 }))
96 .buffer_unordered(REFRESH_DETAIL_CONCURRENCY)
97 .filter_map(async move |network| network)
98 .collect::<Vec<_>>()
99 .await
100 };
101 let wifi: Vec<WifiBroadcast> = wifi_res?.into_iter().map(WifiBroadcast::from).collect();
102 let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
103 let traffic_matching_lists: Vec<TrafficMatchingList> = tml_res?
104 .into_iter()
105 .map(TrafficMatchingList::from)
106 .collect();
107
108 let policies: Vec<FirewallPolicy> = unwrap_or_empty("firewall/policies", policies_res);
110 let zones: Vec<FirewallZone> = unwrap_or_empty("firewall/zones", zones_res);
111 let acls: Vec<AclRule> = unwrap_or_empty("acl/rules", acls_res);
112 let dns: Vec<DnsPolicy> = unwrap_or_empty("dns/policies", dns_res);
113 let vouchers: Vec<Voucher> = unwrap_or_empty("vouchers", vouchers_res);
114
115 info!(
116 device_count = devices.len(),
117 "enriching devices with statistics"
118 );
119 let mut devices = {
120 stream::iter(devices.into_iter().map(|mut device| {
121 let integration = Arc::clone(&integration);
122 async move {
123 if let EntityId::Uuid(device_uuid) = &device.id {
124 match integration.get_device_statistics(&sid, device_uuid).await {
125 Ok(stats_resp) => {
126 device.stats =
127 crate::convert::device_stats_from_integration(&stats_resp);
128 }
129 Err(error) => {
130 warn!(device = ?device.name, error = %error, "device stats fetch failed");
131 }
132 }
133 }
134 device
135 }
136 }))
137 .buffer_unordered(REFRESH_DETAIL_CONCURRENCY)
138 .collect::<Vec<_>>()
139 .await
140 };
141
142 #[allow(clippy::type_complexity)]
143 let (
144 session_events,
145 session_health,
146 session_clients,
147 session_devices,
148 session_users,
149 nat,
150 ): (
151 Vec<Event>,
152 Vec<HealthSummary>,
153 Vec<crate::session::models::SessionClientEntry>,
154 Vec<crate::session::models::SessionDevice>,
155 Vec<crate::session::models::SessionUserEntry>,
156 Vec<NatPolicy>,
157 ) = match self.inner.session_client.lock().await.clone() {
158 Some(session) => {
159 let (events_res, health_res, clients_res, devices_res, users_res, nat_res) = tokio::join!(
160 session.list_events(Some(100)),
161 session.get_health(),
162 session.list_clients(),
163 session.list_devices(),
164 session.list_users(),
165 session.list_nat_rules(),
166 );
167
168 let events = match events_res {
169 Ok(raw) => raw.into_iter().map(Event::from).collect(),
170 Err(ref error) if error.is_not_found() => {
171 debug!(
172 auth = ?session.auth(),
173 error = %error,
174 "session event endpoint unavailable; treating as empty"
175 );
176 Vec::new()
177 }
178 Err(error) => {
179 warn!(
180 auth = ?session.auth(),
181 error = %error,
182 "session event fetch failed (non-fatal)"
183 );
184 Vec::new()
185 }
186 };
187
188 let health = match health_res {
189 Ok(raw) => convert_health_summaries(raw),
190 Err(error) => {
191 warn!(error = %error, "session health fetch failed (non-fatal)");
192 Vec::new()
193 }
194 };
195
196 let session_clients = match clients_res {
197 Ok(raw) => raw,
198 Err(error) => {
199 warn!(error = %error, "session client fetch failed (non-fatal)");
200 Vec::new()
201 }
202 };
203
204 let session_devices = match devices_res {
205 Ok(raw) => raw,
206 Err(error) => {
207 warn!(error = %error, "session device fetch failed (non-fatal)");
208 Vec::new()
209 }
210 };
211
212 let session_users = match users_res {
213 Ok(raw) => raw,
214 Err(error) => {
215 warn!(error = %error, "session user fetch failed (non-fatal)");
216 Vec::new()
217 }
218 };
219
220 let nat = match nat_res {
221 Ok(raw) => raw
222 .iter()
223 .filter_map(crate::convert::nat_policy_from_v2)
224 .collect(),
225 Err(error) => {
226 warn!(error = %error, "v2 NAT fetch failed (non-fatal)");
227 Vec::new()
228 }
229 };
230
231 (
232 events,
233 health,
234 session_clients,
235 session_devices,
236 session_users,
237 nat,
238 )
239 }
240 None => (
241 Vec::new(),
242 Vec::new(),
243 Vec::new(),
244 Vec::new(),
245 Vec::new(),
246 Vec::new(),
247 ),
248 };
249
250 if !session_clients.is_empty() {
251 let session_by_ip: HashMap<&str, &crate::session::models::SessionClientEntry> =
252 session_clients
253 .iter()
254 .filter_map(|client| client.ip.as_deref().map(|ip| (ip, client)))
255 .collect();
256 let mut merged = 0u32;
257 for client in &mut clients {
258 let ip_key = client.ip.map(|ip| ip.to_string());
259 if let Some(session_client) =
260 ip_key.as_deref().and_then(|ip| session_by_ip.get(ip))
261 {
262 if client.tx_bytes.is_none() {
263 client.tx_bytes = session_client
264 .tx_bytes
265 .and_then(|bytes| u64::try_from(bytes).ok());
266 }
267 if client.rx_bytes.is_none() {
268 client.rx_bytes = session_client
269 .rx_bytes
270 .and_then(|bytes| u64::try_from(bytes).ok());
271 }
272 if client.hostname.is_none() {
273 client.hostname.clone_from(&session_client.hostname);
274 }
275 if client.wireless.is_none() {
276 let session_client: Client = Client::from((*session_client).clone());
277 client.wireless = session_client.wireless;
278 if client.uplink_device_mac.is_none() {
279 client.uplink_device_mac = session_client.uplink_device_mac;
280 }
281 }
282 merged += 1;
283 }
284 }
285 debug!(
286 total_clients = clients.len(),
287 legacy_available = session_by_ip.len(),
288 merged,
289 "client traffic merge (by IP)"
290 );
291 }
292
293 if !session_users.is_empty() {
294 let users_by_mac: HashMap<String, &crate::session::models::SessionUserEntry> =
295 session_users
296 .iter()
297 .map(|user| (user.mac.to_lowercase(), user))
298 .collect();
299 let mut merged_users = 0u32;
300 for client in &mut clients {
301 let user = users_by_mac
306 .get(&client.mac.as_str().to_lowercase())
307 .or_else(|| {
308 let ip_str = client.ip.map(|ip| ip.to_string())?;
309 let session_client = session_clients
310 .iter()
311 .find(|lc| lc.ip.as_deref() == Some(ip_str.as_str()))?;
312 users_by_mac.get(&session_client.mac.to_lowercase())
313 });
314 if let Some(user) = user {
315 client.use_fixedip = user.use_fixedip.unwrap_or(false);
316 client.fixed_ip = user.fixed_ip.as_deref().and_then(|ip| ip.parse().ok());
317 if client.use_fixedip {
318 merged_users += 1;
319 }
320 }
321 }
322 debug!(
323 users_available = users_by_mac.len(),
324 merged_users, "user DHCP reservation merge"
325 );
326 }
327
328 if !session_devices.is_empty() {
329 let session_by_mac: HashMap<&str, &crate::session::models::SessionDevice> =
330 session_devices
331 .iter()
332 .map(|device| (device.mac.as_str(), device))
333 .collect();
334 for device in &mut devices {
335 if let Some(legacy_device) = session_by_mac.get(device.mac.as_str()) {
336 if device.client_count.is_none() {
337 device.client_count = legacy_device
338 .num_sta
339 .and_then(|count| count.try_into().ok());
340 }
341 if device.wan_ipv6.is_none() {
342 device.wan_ipv6 = parse_session_device_wan_ipv6(&legacy_device.extra);
343 }
344 }
345 }
346 }
347
348 if !session_health.is_empty() {
349 self.inner
350 .store
351 .site_health
352 .send_modify(|health| *health = Arc::new(session_health));
353 }
354
355 let fresh_legacy_events = unseen_events(self.store(), &session_events);
356
357 self.inner
358 .store
359 .apply_integration_snapshot(crate::store::RefreshSnapshot {
360 devices,
361 clients,
362 networks,
363 wifi,
364 policies,
365 zones,
366 acls,
367 nat,
368 dns,
369 vouchers,
370 sites,
371 events: session_events,
372 traffic_matching_lists,
373 });
374
375 for event in fresh_legacy_events {
376 let _ = self.inner.event_tx.send(Arc::new(event));
377 }
378 } else {
379 let session = self
380 .inner
381 .session_client
382 .lock()
383 .await
384 .clone()
385 .ok_or(CoreError::ControllerDisconnected)?;
386
387 let (devices_res, clients_res, events_res, sites_res) = tokio::join!(
388 session.list_devices(),
389 session.list_clients(),
390 session.list_events(Some(100)),
391 session.list_sites(),
392 );
393
394 let devices: Vec<Device> = devices_res?.into_iter().map(Device::from).collect();
395 let clients: Vec<Client> = clients_res?.into_iter().map(Client::from).collect();
396 let events: Vec<Event> = events_res?.into_iter().map(Event::from).collect();
397 let sites: Vec<Site> = sites_res?.into_iter().map(Site::from).collect();
398 let fresh_events = unseen_events(self.store(), &events);
399
400 self.inner
401 .store
402 .apply_integration_snapshot(crate::store::RefreshSnapshot {
403 devices,
404 clients,
405 networks: Vec::new(),
406 wifi: Vec::new(),
407 policies: Vec::new(),
408 zones: Vec::new(),
409 acls: Vec::new(),
410 nat: Vec::new(),
411 dns: Vec::new(),
412 vouchers: Vec::new(),
413 sites,
414 events,
415 traffic_matching_lists: Vec::new(),
416 });
417
418 for event in fresh_events {
419 let _ = self.inner.event_tx.send(Arc::new(event));
420 }
421 }
422
423 debug!(
424 devices = self.inner.store.device_count(),
425 clients = self.inner.store.client_count(),
426 "data refresh complete"
427 );
428
429 Ok(())
430 }
431}
432
433pub(super) async fn refresh_task(
435 controller: Controller,
436 interval_secs: u64,
437 cancel: CancellationToken,
438) {
439 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
440 interval.tick().await;
441
442 loop {
443 tokio::select! {
444 biased;
445 () = cancel.cancelled() => break,
446 _ = interval.tick() => {
447 if let Err(error) = controller.full_refresh().await {
448 warn!(error = %error, "periodic refresh failed");
449 }
450 }
451 }
452 }
453}
454
455fn unwrap_or_empty<S, D>(endpoint: &str, result: Result<Vec<S>, crate::error::Error>) -> Vec<D>
459where
460 D: From<S>,
461{
462 match result {
463 Ok(items) => items.into_iter().map(D::from).collect(),
464 Err(ref error) if error.is_not_found() => {
465 debug!("{endpoint}: not available (404), treating as empty");
466 Vec::new()
467 }
468 Err(error) => {
469 warn!("{endpoint}: unexpected error {error}, treating as empty");
470 Vec::new()
471 }
472 }
473}
474
475fn unseen_events(store: &DataStore, events: &[Event]) -> Vec<Event> {
476 let mut seen: HashSet<String> = store
477 .events_snapshot()
478 .iter()
479 .map(|event| event_storage_key(event))
480 .collect();
481
482 events
483 .iter()
484 .filter(|event| seen.insert(event_storage_key(event)))
485 .cloned()
486 .collect()
487}