Skip to main content

unifly_api/controller/
lifecycle.rs

1use std::sync::Arc;
2
3use tokio::sync::mpsc;
4use tokio::task::JoinHandle;
5use tokio_util::sync::CancellationToken;
6use tracing::{debug, info, warn};
7
8use reqwest::header::{HeaderMap, HeaderValue};
9use secrecy::ExposeSecret;
10
11use crate::config::AuthCredentials;
12use crate::core_error::CoreError;
13use crate::websocket::{ReconnectConfig, WebSocketHandle};
14use crate::{IntegrationClient, SessionClient};
15
16use super::support::{build_transport, resolve_site_id, tls_to_transport};
17use super::{COMMAND_CHANNEL_SIZE, ConnectionState, Controller, refresh};
18
19impl Controller {
20    // ── Connection lifecycle ─────────────────────────────────────
21
22    /// Connect to the controller.
23    ///
24    /// Detects the platform, authenticates, performs an initial data
25    /// refresh, and spawns background tasks (periodic refresh, command
26    /// processor).
27    #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
28    pub async fn connect(&self) -> Result<(), CoreError> {
29        let _ = self
30            .inner
31            .connection_state
32            .send(ConnectionState::Connecting);
33
34        // Fresh child token for this connection (supports reconnect).
35        let child = self.inner.cancel.child_token();
36        *self.inner.cancel_child.lock().await = child.clone();
37
38        let config = &self.inner.config;
39        let transport = build_transport(config);
40
41        match &config.auth {
42            AuthCredentials::ApiKey(api_key) => {
43                // Detect platform so we use the right URL prefix
44                let platform = SessionClient::detect_platform(&config.url).await?;
45                debug!(?platform, "detected controller platform");
46
47                // Integration API client (preferred)
48                let integration = IntegrationClient::from_api_key(
49                    config.url.as_str(),
50                    api_key,
51                    &transport,
52                    platform,
53                )?;
54
55                // Resolve site UUID from Integration API.
56                // A 404 here usually means the controller doesn't expose
57                // the Integration API (older or self-hosted UNA without
58                // Settings > Integrations). Surface a targeted hint.
59                let site_id = resolve_site_id(&integration, &config.site)
60                    .await
61                    .map_err(|e| match &e {
62                        CoreError::Api {
63                            status: Some(404), ..
64                        } => {
65                            debug!(error = %e, "Integration API returned 404 during site resolution");
66                            CoreError::Unsupported {
67                                operation: "API-key authentication".into(),
68                                required: "a controller with the Integration API \
69                                     (Settings > Integrations).\n\
70                                     For older UniFi Network Application installs, \
71                                     use --username/--password instead"
72                                    .into(),
73                            }
74                        }
75                        _ => e,
76                    })?;
77                debug!(site_id = %site_id, "resolved Integration API site UUID");
78
79                *self.inner.integration_client.lock().await = Some(Arc::new(integration));
80                *self.inner.site_id.lock().await = Some(site_id);
81
82                // Also create a session client using the same API key.
83                // UniFi OS accepts X-API-KEY on session endpoints, which
84                // gives us access to /rest/user (DHCP reservations),
85                // /stat/sta (client stats), and health data. Some
86                // legacy routes such as /stat/event vary by controller.
87                let mut headers = HeaderMap::new();
88                let mut key_value =
89                    HeaderValue::from_str(api_key.expose_secret()).map_err(|e| {
90                        CoreError::from(crate::error::Error::Authentication {
91                            message: format!("invalid API key header value: {e}"),
92                        })
93                    })?;
94                key_value.set_sensitive(true);
95                headers.insert("X-API-KEY", key_value);
96                let legacy_http = transport.build_client_with_headers(headers)?;
97                let session = SessionClient::with_client(
98                    legacy_http,
99                    config.url.clone(),
100                    config.site.clone(),
101                    platform,
102                    crate::session::client::SessionAuth::ApiKey,
103                );
104                *self.inner.session_client.lock().await = Some(Arc::new(session));
105            }
106            AuthCredentials::Credentials { username, password } => {
107                // Session-only auth
108                let platform = SessionClient::detect_platform(&config.url).await?;
109                debug!(?platform, "detected controller platform");
110
111                let client = SessionClient::new(
112                    config.url.clone(),
113                    config.site.clone(),
114                    platform,
115                    &transport,
116                )?;
117
118                let cache = build_session_cache(config);
119                if let Some(ref cache) = cache {
120                    client
121                        .login_with_cache(username, password, config.totp_token.as_ref(), cache)
122                        .await?;
123                } else {
124                    client
125                        .login(username, password, config.totp_token.as_ref())
126                        .await?;
127                }
128                debug!("session authentication successful");
129
130                *self.inner.session_client.lock().await = Some(Arc::new(client));
131            }
132            AuthCredentials::Hybrid {
133                api_key,
134                username,
135                password,
136            } => {
137                // Hybrid: both Integration API (API key) and Session API (session auth)
138                let platform = SessionClient::detect_platform(&config.url).await?;
139                debug!(?platform, "detected controller platform (hybrid)");
140
141                // Integration API client
142                let integration = IntegrationClient::from_api_key(
143                    config.url.as_str(),
144                    api_key,
145                    &transport,
146                    platform,
147                )?;
148
149                let site_id = resolve_site_id(&integration, &config.site)
150                    .await
151                    .map_err(|e| match &e {
152                        CoreError::Api {
153                            status: Some(404), ..
154                        } => {
155                            debug!(error = %e, "Integration API returned 404 during site resolution");
156                            CoreError::Unsupported {
157                                operation: "API-key authentication".into(),
158                                required: "a controller with the Integration API \
159                                     (Settings > Integrations).\n\
160                                     For older UniFi Network Application installs, \
161                                     use --username/--password instead"
162                                    .into(),
163                            }
164                        }
165                        _ => e,
166                    })?;
167                debug!(site_id = %site_id, "resolved Integration API site UUID");
168
169                *self.inner.integration_client.lock().await = Some(Arc::new(integration));
170                *self.inner.site_id.lock().await = Some(site_id);
171
172                // Session API client — attempt login but degrade gracefully
173                // if it fails. The Integration API is the primary surface;
174                // Session adds events, stats, and admin ops.
175                match SessionClient::new(
176                    config.url.clone(),
177                    config.site.clone(),
178                    platform,
179                    &transport,
180                ) {
181                    Ok(client) => {
182                        let cache = build_session_cache(config);
183                        let login_result = if let Some(ref cache) = cache {
184                            client
185                                .login_with_cache(
186                                    username,
187                                    password,
188                                    config.totp_token.as_ref(),
189                                    cache,
190                                )
191                                .await
192                        } else {
193                            client
194                                .login(username, password, config.totp_token.as_ref())
195                                .await
196                        };
197                        match login_result {
198                            Ok(()) => {
199                                debug!("session authentication successful (hybrid)");
200                                *self.inner.session_client.lock().await = Some(Arc::new(client));
201                            }
202                            Err(e) => {
203                                let msg = format!(
204                                    "Session login failed: {e} — events, health stats, and client traffic will be unavailable"
205                                );
206                                warn!("{msg}");
207                                self.inner.warnings.lock().await.push(msg);
208                            }
209                        }
210                    }
211                    Err(e) => {
212                        let msg = format!("Session client setup failed: {e}");
213                        warn!("{msg}");
214                        self.inner.warnings.lock().await.push(msg);
215                    }
216                }
217            }
218            AuthCredentials::Cloud { api_key, host_id } => {
219                let integration = IntegrationClient::from_api_key(
220                    config.url.as_str(),
221                    api_key,
222                    &transport,
223                    crate::ControllerPlatform::Cloud,
224                )?;
225
226                let site_id = if let Ok(uuid) = uuid::Uuid::parse_str(&config.site) {
227                    uuid
228                } else if let Ok(uuid) = uuid::Uuid::parse_str(host_id) {
229                    uuid
230                } else {
231                    resolve_site_id(&integration, &config.site).await?
232                };
233                debug!(site_id = %site_id, "resolved cloud Integration API site UUID");
234
235                *self.inner.integration_client.lock().await = Some(Arc::new(integration));
236                *self.inner.site_id.lock().await = Some(site_id);
237
238                let msg =
239                    "Cloud auth mode active: Session API and WebSocket features are unavailable"
240                        .to_string();
241                self.inner.warnings.lock().await.push(msg);
242            }
243        }
244
245        // Initial data load
246        self.full_refresh().await?;
247
248        // Spawn background tasks
249        let mut handles = self.inner.task_handles.lock().await;
250
251        if let Some(rx) = self.inner.command_rx.lock().await.take() {
252            let ctrl = self.clone();
253            handles.push(tokio::spawn(super::runtime::command_processor_task(
254                ctrl, rx,
255            )));
256        }
257
258        let interval_secs = config.refresh_interval_secs;
259        if interval_secs > 0 {
260            let ctrl = self.clone();
261            let cancel = child.clone();
262            handles.push(tokio::spawn(refresh::refresh_task(
263                ctrl,
264                interval_secs,
265                cancel,
266            )));
267        }
268
269        if config.websocket_enabled {
270            self.spawn_websocket(&child, &mut handles).await;
271        }
272
273        let _ = self.inner.connection_state.send(ConnectionState::Connected);
274        info!("connected to controller");
275        Ok(())
276    }
277
278    /// Spawn the WebSocket event stream and a bridge task that converts
279    /// raw [`UnifiEvent`]s into domain [`Event`]s and broadcasts them.
280    ///
281    /// Non-fatal on failure — the TUI falls back to polling.
282    async fn spawn_websocket(&self, cancel: &CancellationToken, handles: &mut Vec<JoinHandle<()>>) {
283        let Some(session) = self.inner.session_client.lock().await.clone() else {
284            debug!("no session client — WebSocket unavailable");
285            return;
286        };
287
288        let platform = session.platform();
289        let Some(ws_path_template) = platform.websocket_path() else {
290            debug!("platform does not support WebSocket");
291            return;
292        };
293
294        let ws_path = ws_path_template.replace("{site}", &self.inner.config.site);
295        let base_url = &self.inner.config.url;
296        let scheme = if base_url.scheme() == "https" {
297            "wss"
298        } else {
299            "ws"
300        };
301        let host = base_url.host_str().unwrap_or("localhost");
302        let ws_url_str = match base_url.port() {
303            Some(port) => format!("{scheme}://{host}:{port}{ws_path}"),
304            None => format!("{scheme}://{host}{ws_path}"),
305        };
306        let ws_url = match url::Url::parse(&ws_url_str) {
307            Ok(url) => url,
308            Err(error) => {
309                warn!(error = %error, url = %ws_url_str, "invalid WebSocket URL");
310                return;
311            }
312        };
313
314        let cookie = session.cookie_header();
315
316        if cookie.is_none() {
317            warn!("no session cookie — WebSocket requires session auth (skipping)");
318            return;
319        }
320
321        let ws_tls = tls_to_transport(&self.inner.config.tls);
322        let ws_cancel = cancel.child_token();
323        let handle = match WebSocketHandle::connect(
324            ws_url,
325            ReconnectConfig::default(),
326            ws_cancel.clone(),
327            cookie,
328            ws_tls,
329        ) {
330            Ok(handle) => handle,
331            Err(error) => {
332                warn!(error = %error, "WebSocket connection failed (non-fatal)");
333                return;
334            }
335        };
336
337        // Bridge task: WS events → domain Events → broadcast channel.
338        // Also extracts real-time device stats from `device:sync` messages
339        // to feed the dashboard chart without waiting for full_refresh().
340        let mut ws_rx = handle.subscribe();
341        let event_tx = self.inner.event_tx.clone();
342        let store = Arc::clone(&self.inner.store);
343        let bridge_cancel = ws_cancel;
344
345        handles.push(tokio::spawn(async move {
346            loop {
347                tokio::select! {
348                    biased;
349                    () = bridge_cancel.cancelled() => break,
350                    result = ws_rx.recv() => {
351                        match result {
352                            Ok(ws_event) => {
353                                store.mark_ws_event(chrono::Utc::now());
354
355                                if ws_event.key == "device:sync" || ws_event.key == "device:update" {
356                                    super::runtime::apply_device_sync(&store, &ws_event.extra);
357                                }
358
359                                if ws_event.key.starts_with("EVT_") {
360                                    let event = crate::model::event::Event::from((*ws_event).clone());
361                                    let _ = event_tx.send(Arc::new(event));
362                                }
363                            }
364                            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
365                                warn!(skipped, "WS bridge: receiver lagged");
366                            }
367                            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
368                        }
369                    }
370                }
371            }
372        }));
373
374        *self.inner.ws_handle.lock().await = Some(handle);
375        info!("WebSocket event stream spawned (handshake in progress)");
376    }
377
378    /// Disconnect from the controller.
379    ///
380    /// Cancels background tasks, logs out if session-based, and resets
381    /// the connection state to [`Disconnected`](ConnectionState::Disconnected).
382    pub async fn disconnect(&self) {
383        self.inner.cancel_child.lock().await.cancel();
384
385        if let Some(handle) = self.inner.ws_handle.lock().await.take() {
386            handle.shutdown();
387        }
388
389        let mut handles = self.inner.task_handles.lock().await;
390        for handle in handles.drain(..) {
391            let _ = handle.await;
392        }
393
394        let session = self.inner.session_client.lock().await.clone();
395
396        // Skip logout when session caching is active — we want the
397        // session cookie to survive for the next CLI invocation.
398        let cache_active = build_session_cache(&self.inner.config).is_some();
399
400        if !cache_active
401            && matches!(
402                self.inner.config.auth,
403                AuthCredentials::Credentials { .. } | AuthCredentials::Hybrid { .. }
404            )
405            && let Some(client) = session
406            && let Err(error) = client.logout().await
407        {
408            warn!(error = %error, "logout failed (non-fatal)");
409        }
410
411        *self.inner.session_client.lock().await = None;
412        *self.inner.integration_client.lock().await = None;
413        *self.inner.site_id.lock().await = None;
414
415        {
416            let (tx, rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
417            *self.inner.command_tx.lock().await = tx;
418            *self.inner.command_rx.lock().await = Some(rx);
419        }
420
421        let _ = self
422            .inner
423            .connection_state
424            .send(ConnectionState::Disconnected);
425        debug!("disconnected");
426    }
427}
428
429/// Build a `SessionCache` if caching is enabled for this config.
430fn build_session_cache(
431    config: &crate::config::ControllerConfig,
432) -> Option<crate::session::session_cache::SessionCache> {
433    if config.no_session_cache {
434        return None;
435    }
436    let name = config.profile_name.as_deref()?;
437    crate::session::session_cache::SessionCache::new(name, config.url.as_str())
438}