use std::collections::HashSet;
use chrono::Utc;
use super::DataStore;
use super::collection::EntityCollection;
use crate::model::{
AclRule, Client, Device, DnsPolicy, EntityId, Event, FirewallPolicy, FirewallZone, NatPolicy,
Network, Site, TrafficMatchingList, Voucher, WifiBroadcast,
};
/// Makes `collection` mirror `items`.
///
/// Every `(key, id, entity)` triple is upserted, and afterwards any key that
/// the collection reports but that did not appear in `items` is removed. An
/// empty `items` therefore removes every entry.
///
/// The guard returned by `begin_batch` is kept alive for the whole function.
/// NOTE(review): presumably this coalesces change notifications — the unit
/// test in this file expects the collection version to advance exactly once.
fn upsert_and_prune<T: Clone + Send + Sync + 'static>(
    collection: &EntityCollection<T>,
    items: Vec<(String, EntityId, T)>,
) {
    // Bound to a name (not `_`) so the guard lives until the function returns.
    let _batch = collection.begin_batch();

    // Remember each incoming key while handing the entry to the collection.
    let mut seen: HashSet<String> = HashSet::with_capacity(items.len());
    for (key, id, entity) in items {
        seen.insert(key.clone());
        collection.upsert(key, id, entity);
    }

    // Anything still stored under a key we did not just see is stale.
    for candidate in collection.keys() {
        if seen.contains(&candidate) {
            continue;
        }
        collection.remove(&candidate);
    }
}
/// Entity lists gathered for one full refresh and consumed by
/// `DataStore::apply_integration_snapshot`.
///
/// Each list is passed to `upsert_and_prune` against its own collection, so
/// an empty list removes every entry of that kind from the store.
pub(crate) struct RefreshSnapshot {
/// Devices; stored under their MAC string.
pub devices: Vec<Device>,
/// Clients; stored under their MAC string.
pub clients: Vec<Client>,
/// Networks; stored under `net:<id>`.
pub networks: Vec<Network>,
/// WiFi broadcasts; stored under `wifi:<id>`.
pub wifi: Vec<WifiBroadcast>,
/// Firewall policies; stored under `fwp:<id>`.
pub policies: Vec<FirewallPolicy>,
/// Firewall zones; stored under `fwz:<id>`.
pub zones: Vec<FirewallZone>,
/// ACL rules; stored under `acl:<id>`.
pub acls: Vec<AclRule>,
/// NAT policies; stored under `nat:<id>`.
pub nat: Vec<NatPolicy>,
/// DNS policies; stored under `dns:<id>`.
pub dns: Vec<DnsPolicy>,
/// Vouchers; stored under `vch:<id>`.
pub vouchers: Vec<Voucher>,
/// Sites; stored under `site:<id>`.
pub sites: Vec<Site>,
/// Events; stored under the key produced by `event_storage_key`.
pub events: Vec<Event>,
/// Traffic matching lists; stored under `tml:<id>`.
pub traffic_matching_lists: Vec<TrafficMatchingList>,
}
/// Returns the key under which `event` is stored.
///
/// When the event carries an id, the key is that id's string form. Otherwise
/// a synthetic key is assembled from the event's content:
///
/// `evt:<timestamp millis>:<raw key>:<event type>:<message>:<site id>:<device mac>:<client mac>`
///
/// Optional parts that are absent contribute an empty string, so the number
/// of `:`-separated segments written by the format string is constant.
pub(crate) fn event_storage_key(event: &Event) -> String {
    // An explicit id always wins over the synthetic key.
    if let Some(id) = event.id.as_ref() {
        return id.to_string();
    }

    let millis = event.timestamp.timestamp_millis();
    let raw_key = event.raw_key.as_deref().unwrap_or_default();
    let site = event
        .site_id
        .as_ref()
        .map_or_else(String::new, ToString::to_string);
    let device = event
        .device_mac
        .as_ref()
        .map_or_else(String::new, ToString::to_string);
    let client = event
        .client_mac
        .as_ref()
        .map_or_else(String::new, ToString::to_string);

    format!(
        "evt:{}:{}:{}:{}:{}:{}:{}",
        millis, raw_key, event.event_type, event.message, site, device, client,
    )
}
/// Returns the entity id to store alongside `event`.
///
/// An event that has its own id yields a clone of it; an id-less event gets
/// `EntityId::Legacy` wrapping an owned copy of `key`.
pub(crate) fn event_storage_id(event: &Event, key: &str) -> EntityId {
    match &event.id {
        Some(existing) => existing.clone(),
        None => EntityId::Legacy(key.to_owned()),
    }
}
impl DataStore {
#[allow(clippy::too_many_lines)]
pub(crate) fn apply_integration_snapshot(&self, snap: RefreshSnapshot) {
upsert_and_prune(
&self.devices,
snap.devices
.into_iter()
.map(|d| {
let key = d.mac.as_str().to_owned();
let id = d.id.clone();
(key, id, d)
})
.collect(),
);
upsert_and_prune(
&self.clients,
snap.clients
.into_iter()
.map(|c| {
let key = c.mac.as_str().to_owned();
let id = c.id.clone();
(key, id, c)
})
.collect(),
);
upsert_and_prune(
&self.networks,
snap.networks
.into_iter()
.map(|n| {
let key = format!("net:{}", n.id);
let id = n.id.clone();
(key, id, n)
})
.collect(),
);
upsert_and_prune(
&self.wifi_broadcasts,
snap.wifi
.into_iter()
.map(|wb| {
let key = format!("wifi:{}", wb.id);
let id = wb.id.clone();
(key, id, wb)
})
.collect(),
);
upsert_and_prune(
&self.firewall_policies,
snap.policies
.into_iter()
.map(|p| {
let key = format!("fwp:{}", p.id);
let id = p.id.clone();
(key, id, p)
})
.collect(),
);
upsert_and_prune(
&self.firewall_zones,
snap.zones
.into_iter()
.map(|z| {
let key = format!("fwz:{}", z.id);
let id = z.id.clone();
(key, id, z)
})
.collect(),
);
upsert_and_prune(
&self.acl_rules,
snap.acls
.into_iter()
.map(|a| {
let key = format!("acl:{}", a.id);
let id = a.id.clone();
(key, id, a)
})
.collect(),
);
upsert_and_prune(
&self.nat_policies,
snap.nat
.into_iter()
.map(|n| {
let key = format!("nat:{}", n.id);
let id = n.id.clone();
(key, id, n)
})
.collect(),
);
upsert_and_prune(
&self.dns_policies,
snap.dns
.into_iter()
.map(|d| {
let key = format!("dns:{}", d.id);
let id = d.id.clone();
(key, id, d)
})
.collect(),
);
upsert_and_prune(
&self.vouchers,
snap.vouchers
.into_iter()
.map(|v| {
let key = format!("vch:{}", v.id);
let id = v.id.clone();
(key, id, v)
})
.collect(),
);
upsert_and_prune(
&self.sites,
snap.sites
.into_iter()
.map(|s| {
let key = format!("site:{}", s.id);
let id = s.id.clone();
(key, id, s)
})
.collect(),
);
upsert_and_prune(
&self.events,
snap.events
.into_iter()
.map(|e| {
let key = event_storage_key(&e);
let id = event_storage_id(&e, &key);
(key, id, e)
})
.collect(),
);
upsert_and_prune(
&self.traffic_matching_lists,
snap.traffic_matching_lists
.into_iter()
.map(|t| {
let key = format!("tml:{}", t.id);
let id = t.id.clone();
(key, id, t)
})
.collect(),
);
let _ = self.last_full_refresh.send(Some(Utc::now()));
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::model::{EventCategory, EventSeverity};
    use chrono::{TimeZone, Utc};

    /// Builds an id-less `EVT_TEST` system event that differs from its
    /// siblings only by `message`.
    fn id_less_event(timestamp: chrono::DateTime<Utc>, message: &'static str) -> Event {
        Event {
            id: None,
            timestamp,
            category: EventCategory::System,
            severity: EventSeverity::Info,
            event_type: "EVT_TEST".into(),
            message: message.into(),
            device_mac: None,
            client_mac: None,
            site_id: None,
            raw_key: Some("EVT_TEST".into()),
            source: crate::model::common::DataSource::SessionApi,
        }
    }

    /// Builds a snapshot whose only non-empty list is `events`.
    fn snapshot_with_events(events: Vec<Event>) -> RefreshSnapshot {
        RefreshSnapshot {
            devices: Vec::new(),
            clients: Vec::new(),
            networks: Vec::new(),
            wifi: Vec::new(),
            policies: Vec::new(),
            zones: Vec::new(),
            acls: Vec::new(),
            nat: Vec::new(),
            dns: Vec::new(),
            vouchers: Vec::new(),
            sites: Vec::new(),
            events,
            traffic_matching_lists: Vec::new(),
        }
    }

    /// Two upserts plus one prune must surface as a single version bump and
    /// leave only the incoming entries behind.
    #[test]
    fn upsert_and_prune_batches_snapshot_updates() {
        let collection: EntityCollection<String> = EntityCollection::new();
        collection.upsert(
            String::from("stale"),
            EntityId::from("stale"),
            String::from("old"),
        );

        let version_rx = collection.version_receiver();
        let start_version = *version_rx.borrow();

        let incoming = vec![
            (
                String::from("keep-a"),
                EntityId::from("a"),
                String::from("one"),
            ),
            (
                String::from("keep-b"),
                EntityId::from("b"),
                String::from("two"),
            ),
        ];
        upsert_and_prune(&collection, incoming);

        assert_eq!(*version_rx.borrow(), start_version + 1);
        assert_eq!(collection.len(), 2);
        assert!(collection.get_by_key("stale").is_none());
        assert_eq!(collection.snapshot().len(), 2);
    }

    /// Id-less events sharing a timestamp but not a message must get distinct
    /// storage keys and therefore both survive the snapshot.
    #[test]
    fn event_snapshot_keeps_distinct_id_less_events_with_same_timestamp() {
        let store = DataStore::new();
        let timestamp = Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap();

        let events = vec![
            id_less_event(timestamp, "first"),
            id_less_event(timestamp, "second"),
        ];
        store.apply_integration_snapshot(snapshot_with_events(events));

        assert_eq!(store.events_snapshot().len(), 2);
    }
}