Skip to main content

unifly_api/store/
data_store.rs

// ── Central reactive data store ──
//
// Thread-safe storage for all UniFi domain entities: reads are wait-free,
// writes take fine-grained per-shard locks.
// Mutations are broadcast to subscribers via `watch` channels.
5
6use std::sync::Arc;
7
8use chrono::{DateTime, Utc};
9use tokio::sync::watch;
10
11use super::collection::EntityCollection;
12use crate::model::{
13    AclRule, Client, Device, DnsPolicy, EntityId, Event, FirewallPolicy, FirewallZone,
14    HealthSummary, MacAddress, Network, Site, TrafficMatchingList, Voucher, WifiBroadcast,
15};
16use crate::stream::EntityStream;
17
18/// Central reactive store for all UniFi domain entities.
19///
20/// Thread-safe and lock-free: all reads are wait-free, writes use
21/// fine-grained per-shard locks within `DashMap`. Mutations are
22/// broadcast to subscribers via `watch` channels.
23pub struct DataStore {
24    pub(crate) devices: EntityCollection<Device>,
25    pub(crate) clients: EntityCollection<Client>,
26    pub(crate) networks: EntityCollection<Network>,
27    pub(crate) wifi_broadcasts: EntityCollection<WifiBroadcast>,
28    pub(crate) firewall_policies: EntityCollection<FirewallPolicy>,
29    pub(crate) firewall_zones: EntityCollection<FirewallZone>,
30    pub(crate) acl_rules: EntityCollection<AclRule>,
31    pub(crate) dns_policies: EntityCollection<DnsPolicy>,
32    pub(crate) vouchers: EntityCollection<Voucher>,
33    pub(crate) sites: EntityCollection<Site>,
34    pub(crate) events: EntityCollection<Event>,
35    pub(crate) traffic_matching_lists: EntityCollection<TrafficMatchingList>,
36    pub(crate) site_health: watch::Sender<Arc<Vec<HealthSummary>>>,
37    pub(crate) last_full_refresh: watch::Sender<Option<DateTime<Utc>>>,
38    pub(crate) last_ws_event: watch::Sender<Option<DateTime<Utc>>>,
39}
40
41impl DataStore {
42    pub fn new() -> Self {
43        let (site_health, _) = watch::channel(Arc::new(Vec::new()));
44        let (last_full_refresh, _) = watch::channel(None);
45        let (last_ws_event, _) = watch::channel(None);
46
47        Self {
48            devices: EntityCollection::new(),
49            clients: EntityCollection::new(),
50            networks: EntityCollection::new(),
51            wifi_broadcasts: EntityCollection::new(),
52            firewall_policies: EntityCollection::new(),
53            firewall_zones: EntityCollection::new(),
54            acl_rules: EntityCollection::new(),
55            dns_policies: EntityCollection::new(),
56            vouchers: EntityCollection::new(),
57            sites: EntityCollection::new(),
58            events: EntityCollection::new(),
59            traffic_matching_lists: EntityCollection::new(),
60            site_health,
61            last_full_refresh,
62            last_ws_event,
63        }
64    }
65
66    // ── Snapshot accessors ───────────────────────────────────────────
67
68    pub fn devices_snapshot(&self) -> Arc<Vec<Arc<Device>>> {
69        self.devices.snapshot()
70    }
71
72    pub fn clients_snapshot(&self) -> Arc<Vec<Arc<Client>>> {
73        self.clients.snapshot()
74    }
75
76    pub fn networks_snapshot(&self) -> Arc<Vec<Arc<Network>>> {
77        self.networks.snapshot()
78    }
79
80    pub fn wifi_broadcasts_snapshot(&self) -> Arc<Vec<Arc<WifiBroadcast>>> {
81        self.wifi_broadcasts.snapshot()
82    }
83
84    pub fn firewall_policies_snapshot(&self) -> Arc<Vec<Arc<FirewallPolicy>>> {
85        self.firewall_policies.snapshot()
86    }
87
88    pub fn firewall_zones_snapshot(&self) -> Arc<Vec<Arc<FirewallZone>>> {
89        self.firewall_zones.snapshot()
90    }
91
92    pub fn acl_rules_snapshot(&self) -> Arc<Vec<Arc<AclRule>>> {
93        self.acl_rules.snapshot()
94    }
95
96    pub fn dns_policies_snapshot(&self) -> Arc<Vec<Arc<DnsPolicy>>> {
97        self.dns_policies.snapshot()
98    }
99
100    pub fn vouchers_snapshot(&self) -> Arc<Vec<Arc<Voucher>>> {
101        self.vouchers.snapshot()
102    }
103
104    pub fn sites_snapshot(&self) -> Arc<Vec<Arc<Site>>> {
105        self.sites.snapshot()
106    }
107
108    pub fn events_snapshot(&self) -> Arc<Vec<Arc<Event>>> {
109        self.events.snapshot()
110    }
111
112    pub fn traffic_matching_lists_snapshot(&self) -> Arc<Vec<Arc<TrafficMatchingList>>> {
113        self.traffic_matching_lists.snapshot()
114    }
115
116    // ── Single-entity lookups ────────────────────────────────────────
117
118    pub fn device_by_mac(&self, mac: &MacAddress) -> Option<Arc<Device>> {
119        self.devices.get_by_key(mac.as_str())
120    }
121
122    pub fn device_by_id(&self, id: &EntityId) -> Option<Arc<Device>> {
123        self.devices.get_by_id(id)
124    }
125
126    pub fn client_by_mac(&self, mac: &MacAddress) -> Option<Arc<Client>> {
127        self.clients.get_by_key(mac.as_str())
128    }
129
130    pub fn client_by_id(&self, id: &EntityId) -> Option<Arc<Client>> {
131        self.clients.get_by_id(id)
132    }
133
134    pub fn network_by_id(&self, id: &EntityId) -> Option<Arc<Network>> {
135        self.networks.get_by_id(id)
136    }
137
138    // ── Count accessors ──────────────────────────────────────────────
139
140    pub fn device_count(&self) -> usize {
141        self.devices.len()
142    }
143
144    pub fn client_count(&self) -> usize {
145        self.clients.len()
146    }
147
148    pub fn network_count(&self) -> usize {
149        self.networks.len()
150    }
151
152    // ── Subscriptions ────────────────────────────────────────────────
153
154    pub fn subscribe_devices(&self) -> EntityStream<Device> {
155        EntityStream::new(self.devices.subscribe())
156    }
157
158    pub fn subscribe_clients(&self) -> EntityStream<Client> {
159        EntityStream::new(self.clients.subscribe())
160    }
161
162    pub fn subscribe_networks(&self) -> EntityStream<Network> {
163        EntityStream::new(self.networks.subscribe())
164    }
165
166    pub fn subscribe_wifi_broadcasts(&self) -> EntityStream<WifiBroadcast> {
167        EntityStream::new(self.wifi_broadcasts.subscribe())
168    }
169
170    pub fn subscribe_firewall_policies(&self) -> EntityStream<FirewallPolicy> {
171        EntityStream::new(self.firewall_policies.subscribe())
172    }
173
174    pub fn subscribe_firewall_zones(&self) -> EntityStream<FirewallZone> {
175        EntityStream::new(self.firewall_zones.subscribe())
176    }
177
178    pub fn subscribe_acl_rules(&self) -> EntityStream<AclRule> {
179        EntityStream::new(self.acl_rules.subscribe())
180    }
181
182    pub fn subscribe_dns_policies(&self) -> EntityStream<DnsPolicy> {
183        EntityStream::new(self.dns_policies.subscribe())
184    }
185
186    pub fn subscribe_vouchers(&self) -> EntityStream<Voucher> {
187        EntityStream::new(self.vouchers.subscribe())
188    }
189
190    pub fn subscribe_sites(&self) -> EntityStream<Site> {
191        EntityStream::new(self.sites.subscribe())
192    }
193
194    pub fn subscribe_events(&self) -> EntityStream<Event> {
195        EntityStream::new(self.events.subscribe())
196    }
197
198    pub fn subscribe_traffic_matching_lists(&self) -> EntityStream<TrafficMatchingList> {
199        EntityStream::new(self.traffic_matching_lists.subscribe())
200    }
201
202    // ── Site health ──────────────────────────────────────────────────
203
204    pub fn site_health_snapshot(&self) -> Arc<Vec<HealthSummary>> {
205        self.site_health.borrow().clone()
206    }
207
208    pub fn subscribe_site_health(&self) -> watch::Receiver<Arc<Vec<HealthSummary>>> {
209        self.site_health.subscribe()
210    }
211
212    // ── Metadata ─────────────────────────────────────────────────────
213
214    pub fn last_full_refresh(&self) -> Option<DateTime<Utc>> {
215        *self.last_full_refresh.borrow()
216    }
217
218    pub fn last_ws_event(&self) -> Option<DateTime<Utc>> {
219        *self.last_ws_event.borrow()
220    }
221
222    /// How long ago the last full refresh occurred, or `None` if never refreshed.
223    pub fn data_age(&self) -> Option<chrono::Duration> {
224        self.last_full_refresh().map(|t| Utc::now() - t)
225    }
226}
227
228impl Default for DataStore {
229    fn default() -> Self {
230        Self::new()
231    }
232}