Skip to main content

unifly_api/controller/
subscriptions.rs

1use std::sync::Arc;
2
3use tokio::sync::{broadcast, watch};
4
5use crate::model::{
6    AclRule, Client, Device, DnsPolicy, Event, FirewallGroup, FirewallPolicy, FirewallZone,
7    HealthSummary, NatPolicy, Network, Site, TrafficMatchingList, Voucher, WifiBroadcast,
8};
9use crate::session::SessionAuth;
10use crate::stream::EntityStream;
11
12use super::{ConnectionState, Controller};
13
14impl Controller {
15    // ── State observation ────────────────────────────────────────
16
17    /// Subscribe to connection state changes.
18    pub fn connection_state(&self) -> watch::Receiver<ConnectionState> {
19        self.inner.connection_state.subscribe()
20    }
21
22    /// Subscribe to the event broadcast stream.
23    pub fn events(&self) -> broadcast::Receiver<Arc<Event>> {
24        self.inner.event_tx.subscribe()
25    }
26
27    // ── Snapshot accessors (delegate to DataStore) ───────────────
28
29    pub fn devices_snapshot(&self) -> Arc<Vec<Arc<Device>>> {
30        self.inner.store.devices_snapshot()
31    }
32
33    pub fn clients_snapshot(&self) -> Arc<Vec<Arc<Client>>> {
34        self.inner.store.clients_snapshot()
35    }
36
37    pub fn networks_snapshot(&self) -> Arc<Vec<Arc<Network>>> {
38        self.inner.store.networks_snapshot()
39    }
40
41    pub fn wifi_broadcasts_snapshot(&self) -> Arc<Vec<Arc<WifiBroadcast>>> {
42        self.inner.store.wifi_broadcasts_snapshot()
43    }
44
45    pub fn firewall_policies_snapshot(&self) -> Arc<Vec<Arc<FirewallPolicy>>> {
46        self.inner.store.firewall_policies_snapshot()
47    }
48
49    pub fn firewall_zones_snapshot(&self) -> Arc<Vec<Arc<FirewallZone>>> {
50        self.inner.store.firewall_zones_snapshot()
51    }
52
53    pub fn acl_rules_snapshot(&self) -> Arc<Vec<Arc<AclRule>>> {
54        self.inner.store.acl_rules_snapshot()
55    }
56
57    pub fn nat_policies_snapshot(&self) -> Arc<Vec<Arc<NatPolicy>>> {
58        self.inner.store.nat_policies_snapshot()
59    }
60
61    pub fn dns_policies_snapshot(&self) -> Arc<Vec<Arc<DnsPolicy>>> {
62        self.inner.store.dns_policies_snapshot()
63    }
64
65    pub fn vouchers_snapshot(&self) -> Arc<Vec<Arc<Voucher>>> {
66        self.inner.store.vouchers_snapshot()
67    }
68
69    pub fn sites_snapshot(&self) -> Arc<Vec<Arc<Site>>> {
70        self.inner.store.sites_snapshot()
71    }
72
73    pub fn events_snapshot(&self) -> Arc<Vec<Arc<Event>>> {
74        self.inner.store.events_snapshot()
75    }
76
77    pub fn traffic_matching_lists_snapshot(&self) -> Arc<Vec<Arc<TrafficMatchingList>>> {
78        self.inner.store.traffic_matching_lists_snapshot()
79    }
80
81    pub fn firewall_groups_snapshot(&self) -> Arc<Vec<Arc<FirewallGroup>>> {
82        self.inner.store.firewall_groups_snapshot()
83    }
84
85    // ── Stream accessors (delegate to DataStore) ─────────────────
86
87    pub fn devices(&self) -> EntityStream<Device> {
88        self.inner.store.subscribe_devices()
89    }
90
91    pub fn clients(&self) -> EntityStream<Client> {
92        self.inner.store.subscribe_clients()
93    }
94
95    pub fn networks(&self) -> EntityStream<Network> {
96        self.inner.store.subscribe_networks()
97    }
98
99    pub fn wifi_broadcasts(&self) -> EntityStream<WifiBroadcast> {
100        self.inner.store.subscribe_wifi_broadcasts()
101    }
102
103    pub fn firewall_policies(&self) -> EntityStream<FirewallPolicy> {
104        self.inner.store.subscribe_firewall_policies()
105    }
106
107    pub fn firewall_zones(&self) -> EntityStream<FirewallZone> {
108        self.inner.store.subscribe_firewall_zones()
109    }
110
111    pub fn acl_rules(&self) -> EntityStream<AclRule> {
112        self.inner.store.subscribe_acl_rules()
113    }
114
115    pub fn nat_policies(&self) -> EntityStream<NatPolicy> {
116        self.inner.store.subscribe_nat_policies()
117    }
118
119    pub fn dns_policies(&self) -> EntityStream<DnsPolicy> {
120        self.inner.store.subscribe_dns_policies()
121    }
122
123    pub fn vouchers(&self) -> EntityStream<Voucher> {
124        self.inner.store.subscribe_vouchers()
125    }
126
127    pub fn sites(&self) -> EntityStream<Site> {
128        self.inner.store.subscribe_sites()
129    }
130
131    pub fn traffic_matching_lists(&self) -> EntityStream<TrafficMatchingList> {
132        self.inner.store.subscribe_traffic_matching_lists()
133    }
134
135    pub fn firewall_groups(&self) -> EntityStream<FirewallGroup> {
136        self.inner.store.subscribe_firewall_groups()
137    }
138
139    /// Subscribe to site health updates (WAN IP, latency, bandwidth rates).
140    pub fn site_health(&self) -> watch::Receiver<Arc<Vec<HealthSummary>>> {
141        self.inner.store.subscribe_site_health()
142    }
143
144    /// Drain warnings accumulated during connect (e.g. Session auth failure).
145    pub async fn take_warnings(&self) -> Vec<String> {
146        std::mem::take(&mut *self.inner.warnings.lock().await)
147    }
148
149    /// Whether any Session API client is available.
150    pub async fn has_session_access(&self) -> bool {
151        self.inner.session_client.lock().await.is_some()
152    }
153
154    /// Whether live event streaming is available via a cookie-backed session.
155    pub async fn has_live_event_access(&self) -> bool {
156        self.inner
157            .session_client
158            .lock()
159            .await
160            .as_ref()
161            .is_some_and(|session| session.auth() == SessionAuth::Cookie)
162    }
163
164    /// Whether the Integration API is available for integration-backed features.
165    pub async fn has_integration_access(&self) -> bool {
166        self.inner.integration_client.lock().await.is_some()
167            && self.inner.site_id.lock().await.is_some()
168    }
169}
170
171#[cfg(test)]
172mod tests {
173    use std::sync::Arc;
174
175    use url::Url;
176
177    use super::{Controller, SessionAuth};
178    use crate::config::ControllerConfig;
179    use crate::{ControllerPlatform, SessionClient};
180
181    fn session_client(auth: SessionAuth) -> Arc<SessionClient> {
182        Arc::new(SessionClient::with_client(
183            reqwest::Client::new(),
184            Url::parse("https://controller.example").expect("valid test URL"),
185            "default".into(),
186            ControllerPlatform::ClassicController,
187            auth,
188        ))
189    }
190
191    #[tokio::test]
192    async fn api_key_session_client_has_session_access_but_not_live_event_access() {
193        let controller = Controller::new(ControllerConfig::default());
194        *controller.inner.session_client.lock().await = Some(session_client(SessionAuth::ApiKey));
195
196        assert!(controller.has_session_access().await);
197        assert!(!controller.has_live_event_access().await);
198    }
199
200    #[tokio::test]
201    async fn cookie_session_client_has_live_event_access() {
202        let controller = Controller::new(ControllerConfig::default());
203        *controller.inner.session_client.lock().await = Some(session_client(SessionAuth::Cookie));
204
205        assert!(controller.has_session_access().await);
206        assert!(controller.has_live_event_access().await);
207    }
208}