Skip to main content

unifly_api/store/
data_store.rs

1// ── Central reactive data store ──
2//
3// Thread-safe, lock-free storage for all UniFi domain entities.
4// Mutations are broadcast to subscribers via `watch` channels.
5
6use std::sync::Arc;
7
8use chrono::{DateTime, Utc};
9use tokio::sync::watch;
10
11use super::collection::EntityCollection;
12use crate::model::{
13    AclRule, Client, Device, DnsPolicy, EntityId, Event, FirewallGroup, FirewallPolicy,
14    FirewallZone, HealthSummary, MacAddress, NatPolicy, Network, Site, TrafficMatchingList,
15    Voucher, WifiBroadcast,
16};
17use crate::stream::EntityStream;
18
19/// Central reactive store for all UniFi domain entities.
20///
21/// Thread-safe and lock-free: all reads are wait-free, writes use
22/// fine-grained per-shard locks within `DashMap`. Mutations are
23/// broadcast to subscribers via `watch` channels.
24pub struct DataStore {
25    pub(crate) devices: EntityCollection<Device>,
26    pub(crate) clients: EntityCollection<Client>,
27    pub(crate) networks: EntityCollection<Network>,
28    pub(crate) wifi_broadcasts: EntityCollection<WifiBroadcast>,
29    pub(crate) firewall_policies: EntityCollection<FirewallPolicy>,
30    pub(crate) firewall_zones: EntityCollection<FirewallZone>,
31    pub(crate) acl_rules: EntityCollection<AclRule>,
32    pub(crate) nat_policies: EntityCollection<NatPolicy>,
33    pub(crate) dns_policies: EntityCollection<DnsPolicy>,
34    pub(crate) vouchers: EntityCollection<Voucher>,
35    pub(crate) sites: EntityCollection<Site>,
36    pub(crate) events: EntityCollection<Event>,
37    pub(crate) traffic_matching_lists: EntityCollection<TrafficMatchingList>,
38    pub(crate) firewall_groups: EntityCollection<FirewallGroup>,
39    pub(crate) site_health: watch::Sender<Arc<Vec<HealthSummary>>>,
40    pub(crate) last_full_refresh: watch::Sender<Option<DateTime<Utc>>>,
41    pub(crate) last_ws_event: watch::Sender<Option<DateTime<Utc>>>,
42}
43
44impl DataStore {
45    pub fn new() -> Self {
46        let (site_health, _) = watch::channel(Arc::new(Vec::new()));
47        let (last_full_refresh, _) = watch::channel(None);
48        let (last_ws_event, _) = watch::channel(None);
49
50        Self {
51            devices: EntityCollection::new(),
52            clients: EntityCollection::new(),
53            networks: EntityCollection::new(),
54            wifi_broadcasts: EntityCollection::new(),
55            firewall_policies: EntityCollection::new(),
56            firewall_zones: EntityCollection::new(),
57            acl_rules: EntityCollection::new(),
58            nat_policies: EntityCollection::new(),
59            dns_policies: EntityCollection::new(),
60            vouchers: EntityCollection::new(),
61            sites: EntityCollection::new(),
62            events: EntityCollection::new(),
63            traffic_matching_lists: EntityCollection::new(),
64            firewall_groups: EntityCollection::new(),
65            site_health,
66            last_full_refresh,
67            last_ws_event,
68        }
69    }
70
71    // ── Snapshot accessors ───────────────────────────────────────────
72
73    pub fn devices_snapshot(&self) -> Arc<Vec<Arc<Device>>> {
74        self.devices.snapshot()
75    }
76
77    pub fn clients_snapshot(&self) -> Arc<Vec<Arc<Client>>> {
78        self.clients.snapshot()
79    }
80
81    pub fn networks_snapshot(&self) -> Arc<Vec<Arc<Network>>> {
82        self.networks.snapshot()
83    }
84
85    pub fn wifi_broadcasts_snapshot(&self) -> Arc<Vec<Arc<WifiBroadcast>>> {
86        self.wifi_broadcasts.snapshot()
87    }
88
89    pub fn firewall_policies_snapshot(&self) -> Arc<Vec<Arc<FirewallPolicy>>> {
90        self.firewall_policies.snapshot()
91    }
92
93    pub fn firewall_zones_snapshot(&self) -> Arc<Vec<Arc<FirewallZone>>> {
94        self.firewall_zones.snapshot()
95    }
96
97    pub fn acl_rules_snapshot(&self) -> Arc<Vec<Arc<AclRule>>> {
98        self.acl_rules.snapshot()
99    }
100
101    pub fn nat_policies_snapshot(&self) -> Arc<Vec<Arc<NatPolicy>>> {
102        self.nat_policies.snapshot()
103    }
104
105    pub fn dns_policies_snapshot(&self) -> Arc<Vec<Arc<DnsPolicy>>> {
106        self.dns_policies.snapshot()
107    }
108
109    pub fn vouchers_snapshot(&self) -> Arc<Vec<Arc<Voucher>>> {
110        self.vouchers.snapshot()
111    }
112
113    pub fn sites_snapshot(&self) -> Arc<Vec<Arc<Site>>> {
114        self.sites.snapshot()
115    }
116
117    pub fn events_snapshot(&self) -> Arc<Vec<Arc<Event>>> {
118        self.events.snapshot()
119    }
120
121    pub fn traffic_matching_lists_snapshot(&self) -> Arc<Vec<Arc<TrafficMatchingList>>> {
122        self.traffic_matching_lists.snapshot()
123    }
124
125    pub fn firewall_groups_snapshot(&self) -> Arc<Vec<Arc<FirewallGroup>>> {
126        self.firewall_groups.snapshot()
127    }
128
129    // ── Single-entity lookups ────────────────────────────────────────
130
131    pub fn device_by_mac(&self, mac: &MacAddress) -> Option<Arc<Device>> {
132        self.devices.get_by_key(mac.as_str())
133    }
134
135    pub fn device_by_id(&self, id: &EntityId) -> Option<Arc<Device>> {
136        self.devices.get_by_id(id)
137    }
138
139    pub fn client_by_mac(&self, mac: &MacAddress) -> Option<Arc<Client>> {
140        self.clients.get_by_key(mac.as_str())
141    }
142
143    pub fn client_by_id(&self, id: &EntityId) -> Option<Arc<Client>> {
144        self.clients.get_by_id(id)
145    }
146
147    pub fn network_by_id(&self, id: &EntityId) -> Option<Arc<Network>> {
148        self.networks.get_by_id(id)
149    }
150
151    // ── Count accessors ──────────────────────────────────────────────
152
153    pub fn device_count(&self) -> usize {
154        self.devices.len()
155    }
156
157    pub fn client_count(&self) -> usize {
158        self.clients.len()
159    }
160
161    pub fn network_count(&self) -> usize {
162        self.networks.len()
163    }
164
165    // ── Subscriptions ────────────────────────────────────────────────
166
167    pub fn subscribe_devices(&self) -> EntityStream<Device> {
168        EntityStream::new(self.devices.subscribe())
169    }
170
171    pub fn subscribe_clients(&self) -> EntityStream<Client> {
172        EntityStream::new(self.clients.subscribe())
173    }
174
175    pub fn subscribe_networks(&self) -> EntityStream<Network> {
176        EntityStream::new(self.networks.subscribe())
177    }
178
179    pub fn subscribe_wifi_broadcasts(&self) -> EntityStream<WifiBroadcast> {
180        EntityStream::new(self.wifi_broadcasts.subscribe())
181    }
182
183    pub fn subscribe_firewall_policies(&self) -> EntityStream<FirewallPolicy> {
184        EntityStream::new(self.firewall_policies.subscribe())
185    }
186
187    pub fn subscribe_firewall_zones(&self) -> EntityStream<FirewallZone> {
188        EntityStream::new(self.firewall_zones.subscribe())
189    }
190
191    pub fn subscribe_acl_rules(&self) -> EntityStream<AclRule> {
192        EntityStream::new(self.acl_rules.subscribe())
193    }
194
195    pub fn subscribe_nat_policies(&self) -> EntityStream<NatPolicy> {
196        EntityStream::new(self.nat_policies.subscribe())
197    }
198
199    pub fn subscribe_dns_policies(&self) -> EntityStream<DnsPolicy> {
200        EntityStream::new(self.dns_policies.subscribe())
201    }
202
203    pub fn subscribe_vouchers(&self) -> EntityStream<Voucher> {
204        EntityStream::new(self.vouchers.subscribe())
205    }
206
207    pub fn subscribe_sites(&self) -> EntityStream<Site> {
208        EntityStream::new(self.sites.subscribe())
209    }
210
211    pub fn subscribe_events(&self) -> EntityStream<Event> {
212        EntityStream::new(self.events.subscribe())
213    }
214
215    pub fn subscribe_traffic_matching_lists(&self) -> EntityStream<TrafficMatchingList> {
216        EntityStream::new(self.traffic_matching_lists.subscribe())
217    }
218
219    pub fn subscribe_firewall_groups(&self) -> EntityStream<FirewallGroup> {
220        EntityStream::new(self.firewall_groups.subscribe())
221    }
222
223    // ── Site health ──────────────────────────────────────────────────
224
225    pub fn site_health_snapshot(&self) -> Arc<Vec<HealthSummary>> {
226        self.site_health.borrow().clone()
227    }
228
229    pub fn subscribe_site_health(&self) -> watch::Receiver<Arc<Vec<HealthSummary>>> {
230        self.site_health.subscribe()
231    }
232
233    // ── Metadata ─────────────────────────────────────────────────────
234
235    pub fn last_full_refresh(&self) -> Option<DateTime<Utc>> {
236        *self.last_full_refresh.borrow()
237    }
238
239    pub fn last_ws_event(&self) -> Option<DateTime<Utc>> {
240        *self.last_ws_event.borrow()
241    }
242
243    pub(crate) fn mark_ws_event(&self, timestamp: DateTime<Utc>) {
244        let _ = self.last_ws_event.send(Some(timestamp));
245    }
246
247    /// How long ago the last full refresh occurred, or `None` if never refreshed.
248    pub fn data_age(&self) -> Option<chrono::Duration> {
249        self.last_full_refresh().map(|t| Utc::now() - t)
250    }
251}
252
253impl Default for DataStore {
254    fn default() -> Self {
255        Self::new()
256    }
257}