Skip to main content

unifly_api/controller/
lifecycle.rs

1use std::sync::Arc;
2
3use tokio::sync::mpsc;
4use tokio::task::JoinHandle;
5use tokio_util::sync::CancellationToken;
6use tracing::{debug, info, warn};
7
8use reqwest::header::{HeaderMap, HeaderValue};
9use secrecy::ExposeSecret;
10
11use crate::config::AuthCredentials;
12use crate::core_error::CoreError;
13use crate::websocket::{ReconnectConfig, WebSocketHandle};
14use crate::{IntegrationClient, SessionClient};
15
16use super::support::{build_transport, resolve_site, tls_to_transport};
17use super::{COMMAND_CHANNEL_SIZE, ConnectionState, Controller, refresh};
18
19impl Controller {
20    // ── Connection lifecycle ─────────────────────────────────────
21
22    /// Connect to the controller.
23    ///
24    /// Detects the platform, authenticates, performs an initial data
25    /// refresh, and spawns background tasks (periodic refresh, command
26    /// processor).
27    #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
28    pub async fn connect(&self) -> Result<(), CoreError> {
29        self.connect_with_refresh(true).await
30    }
31
32    /// Connect to the controller without eagerly loading snapshot data.
33    ///
34    /// Useful for one-shot commands that issue a direct API call and do
35    /// not read from the reactive `DataStore`.
36    pub async fn connect_lightweight(&self) -> Result<(), CoreError> {
37        self.connect_with_refresh(false).await
38    }
39
40    #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
41    async fn connect_with_refresh(&self, initial_refresh: bool) -> Result<(), CoreError> {
42        let _ = self
43            .inner
44            .connection_state
45            .send(ConnectionState::Connecting);
46
47        // Fresh child token for this connection (supports reconnect).
48        let child = self.inner.cancel.child_token();
49        *self.inner.cancel_child.lock().await = child.clone();
50
51        let config = &self.inner.config;
52        let transport = build_transport(config);
53
54        match &config.auth {
55            AuthCredentials::ApiKey(api_key) => {
56                // Detect platform so we use the right URL prefix
57                let platform = SessionClient::detect_platform(&config.url).await?;
58                debug!(?platform, "detected controller platform");
59
60                // Integration API client (preferred)
61                let integration = IntegrationClient::from_api_key(
62                    config.url.as_str(),
63                    api_key,
64                    &transport,
65                    platform,
66                )?;
67
68                // Resolve site UUID from Integration API.
69                // A 404 here usually means the controller doesn't expose
70                // the Integration API (older or self-hosted UNA without
71                // Settings > Integrations). Surface a targeted hint.
72                let resolved = resolve_site(&integration, &config.site)
73                    .await
74                    .map_err(|e| match &e {
75                        CoreError::Api {
76                            status: Some(404), ..
77                        } => {
78                            debug!(error = %e, "Integration API returned 404 during site resolution");
79                            CoreError::Unsupported {
80                                operation: "API-key authentication".into(),
81                                required: "a controller with the Integration API \
82                                     (Settings > Integrations).\n\
83                                     For older UniFi Network Application installs, \
84                                     use --username/--password instead"
85                                    .into(),
86                            }
87                        }
88                        _ => e,
89                    })?;
90                debug!(site_id = %resolved.id, slug = %resolved.slug, "resolved Integration API site");
91
92                *self.inner.integration_client.lock().await = Some(Arc::new(integration));
93                *self.inner.site_id.lock().await = Some(resolved.id);
94
95                // Also create a session client using the same API key.
96                // UniFi OS accepts X-API-KEY on session endpoints, which
97                // gives us access to /rest/user (DHCP reservations),
98                // /stat/sta (client stats), and health data. Some
99                // legacy routes such as /stat/event vary by controller.
100                let mut headers = HeaderMap::new();
101                let mut key_value =
102                    HeaderValue::from_str(api_key.expose_secret()).map_err(|e| {
103                        CoreError::from(crate::error::Error::Authentication {
104                            message: format!("invalid API key header value: {e}"),
105                        })
106                    })?;
107                key_value.set_sensitive(true);
108                headers.insert("X-API-KEY", key_value);
109                let legacy_http = transport.build_client_with_headers(headers)?;
110                let session = SessionClient::with_client(
111                    legacy_http,
112                    config.url.clone(),
113                    resolved.slug,
114                    platform,
115                    crate::session::client::SessionAuth::ApiKey,
116                );
117                *self.inner.session_client.lock().await = Some(Arc::new(session));
118            }
119            AuthCredentials::Credentials { username, password } => {
120                // Session-only auth
121                let platform = SessionClient::detect_platform(&config.url).await?;
122                debug!(?platform, "detected controller platform");
123
124                let client = SessionClient::new(
125                    config.url.clone(),
126                    config.site.clone(),
127                    platform,
128                    &transport,
129                )?;
130
131                let cache = build_session_cache(config);
132                if let Some(ref cache) = cache {
133                    client
134                        .login_with_cache(username, password, config.totp_token.as_ref(), cache)
135                        .await?;
136                } else {
137                    client
138                        .login(username, password, config.totp_token.as_ref())
139                        .await?;
140                }
141                debug!("session authentication successful");
142
143                *self.inner.session_client.lock().await = Some(Arc::new(client));
144            }
145            AuthCredentials::Hybrid {
146                api_key,
147                username,
148                password,
149            } => {
150                // Hybrid: both Integration API (API key) and Session API (session auth)
151                let platform = SessionClient::detect_platform(&config.url).await?;
152                debug!(?platform, "detected controller platform (hybrid)");
153
154                // Integration API client
155                let integration = IntegrationClient::from_api_key(
156                    config.url.as_str(),
157                    api_key,
158                    &transport,
159                    platform,
160                )?;
161
162                let resolved = resolve_site(&integration, &config.site)
163                    .await
164                    .map_err(|e| match &e {
165                        CoreError::Api {
166                            status: Some(404), ..
167                        } => {
168                            debug!(error = %e, "Integration API returned 404 during site resolution");
169                            CoreError::Unsupported {
170                                operation: "API-key authentication".into(),
171                                required: "a controller with the Integration API \
172                                     (Settings > Integrations).\n\
173                                     For older UniFi Network Application installs, \
174                                     use --username/--password instead"
175                                    .into(),
176                            }
177                        }
178                        _ => e,
179                    })?;
180                debug!(site_id = %resolved.id, slug = %resolved.slug, "resolved Integration API site");
181
182                *self.inner.integration_client.lock().await = Some(Arc::new(integration));
183                *self.inner.site_id.lock().await = Some(resolved.id);
184
185                // Session API client — attempt login but degrade gracefully
186                // if it fails. The Integration API is the primary surface;
187                // Session adds events, stats, and admin ops.
188                match SessionClient::new(config.url.clone(), resolved.slug, platform, &transport) {
189                    Ok(client) => {
190                        let cache = build_session_cache(config);
191                        let login_result = if let Some(ref cache) = cache {
192                            client
193                                .login_with_cache(
194                                    username,
195                                    password,
196                                    config.totp_token.as_ref(),
197                                    cache,
198                                )
199                                .await
200                        } else {
201                            client
202                                .login(username, password, config.totp_token.as_ref())
203                                .await
204                        };
205                        match login_result {
206                            Ok(()) => {
207                                debug!("session authentication successful (hybrid)");
208                                *self.inner.session_client.lock().await = Some(Arc::new(client));
209                            }
210                            Err(e) => {
211                                let msg = format!(
212                                    "Session login failed: {e} — events, health stats, and client traffic will be unavailable"
213                                );
214                                warn!("{msg}");
215                                self.inner.warnings.lock().await.push(msg);
216                            }
217                        }
218                    }
219                    Err(e) => {
220                        let msg = format!("Session client setup failed: {e}");
221                        warn!("{msg}");
222                        self.inner.warnings.lock().await.push(msg);
223                    }
224                }
225            }
226            AuthCredentials::Cloud { api_key, host_id } => {
227                let connector_base = format!(
228                    "{}/v1/connector/consoles/{}",
229                    config.url.as_str().trim_end_matches('/'),
230                    host_id,
231                );
232
233                let integration = IntegrationClient::from_api_key(
234                    &connector_base,
235                    api_key,
236                    &transport,
237                    crate::ControllerPlatform::Cloud,
238                )?;
239
240                let resolved = resolve_site(&integration, &config.site).await?;
241                debug!(site_id = %resolved.id, slug = %resolved.slug, "resolved cloud Integration API site");
242
243                *self.inner.integration_client.lock().await = Some(Arc::new(integration));
244                *self.inner.site_id.lock().await = Some(resolved.id);
245
246                let msg =
247                    "Cloud connector mode active: events watch, Wi-Fi observability, admin/session features, and live WebSocket data are unavailable"
248                        .to_string();
249                self.inner.warnings.lock().await.push(msg);
250            }
251        }
252
253        if initial_refresh {
254            self.full_refresh().await?;
255        }
256
257        // Spawn background tasks
258        let mut handles = self.inner.task_handles.lock().await;
259
260        if let Some(rx) = self.inner.command_rx.lock().await.take() {
261            let ctrl = self.clone();
262            handles.push(tokio::spawn(super::runtime::command_processor_task(
263                ctrl, rx,
264            )));
265        }
266
267        let interval_secs = config.refresh_interval_secs;
268        if interval_secs > 0 {
269            let ctrl = self.clone();
270            let cancel = child.clone();
271            handles.push(tokio::spawn(refresh::refresh_task(
272                ctrl,
273                interval_secs,
274                cancel,
275            )));
276        }
277
278        if config.websocket_enabled {
279            self.spawn_websocket(&child, &mut handles).await;
280        }
281
282        let _ = self.inner.connection_state.send(ConnectionState::Connected);
283        info!("connected to controller");
284        Ok(())
285    }
286
287    /// Spawn the WebSocket event stream and a bridge task that converts
288    /// raw [`UnifiEvent`]s into domain [`Event`]s and broadcasts them.
289    ///
290    /// Non-fatal on failure — the TUI falls back to polling.
291    async fn spawn_websocket(&self, cancel: &CancellationToken, handles: &mut Vec<JoinHandle<()>>) {
292        let Some(session) = self.inner.session_client.lock().await.clone() else {
293            debug!("no session client — WebSocket unavailable");
294            return;
295        };
296
297        let platform = session.platform();
298        let Some(ws_path_template) = platform.websocket_path() else {
299            debug!("platform does not support WebSocket");
300            return;
301        };
302
303        // Use the resolved slug stored on the session client, not the
304        // raw `config.site` -- which may be a display name or UUID that
305        // the WebSocket route does not understand.
306        let ws_path = ws_path_template.replace("{site}", session.site());
307        let base_url = &self.inner.config.url;
308        let scheme = if base_url.scheme() == "https" {
309            "wss"
310        } else {
311            "ws"
312        };
313        let host = base_url.host_str().unwrap_or("localhost");
314        let ws_url_str = match base_url.port() {
315            Some(port) => format!("{scheme}://{host}:{port}{ws_path}"),
316            None => format!("{scheme}://{host}{ws_path}"),
317        };
318        let ws_url = match url::Url::parse(&ws_url_str) {
319            Ok(url) => url,
320            Err(error) => {
321                warn!(error = %error, url = %ws_url_str, "invalid WebSocket URL");
322                return;
323            }
324        };
325
326        let cookie = session.cookie_header();
327
328        if cookie.is_none() {
329            warn!("no session cookie — WebSocket requires session auth (skipping)");
330            return;
331        }
332
333        let ws_tls = tls_to_transport(&self.inner.config.tls);
334        let ws_cancel = cancel.child_token();
335        let handle = match WebSocketHandle::connect(
336            ws_url,
337            ReconnectConfig::default(),
338            ws_cancel.clone(),
339            cookie,
340            ws_tls,
341        ) {
342            Ok(handle) => handle,
343            Err(error) => {
344                warn!(error = %error, "WebSocket connection failed (non-fatal)");
345                return;
346            }
347        };
348
349        // Bridge task: WS events → domain Events → broadcast channel.
350        // Also extracts real-time device stats from `device:sync` messages
351        // to feed the dashboard chart without waiting for full_refresh().
352        let mut ws_rx = handle.subscribe();
353        let event_tx = self.inner.event_tx.clone();
354        let store = Arc::clone(&self.inner.store);
355        let bridge_cancel = ws_cancel;
356
357        handles.push(tokio::spawn(async move {
358            loop {
359                tokio::select! {
360                    biased;
361                    () = bridge_cancel.cancelled() => break,
362                    result = ws_rx.recv() => {
363                        match result {
364                            Ok(ws_event) => {
365                                store.mark_ws_event(chrono::Utc::now());
366
367                                if ws_event.key == "device:sync" || ws_event.key == "device:update" {
368                                    super::runtime::apply_device_sync(&store, &ws_event.extra);
369                                }
370
371                                if ws_event.key.starts_with("EVT_") {
372                                    let event = crate::model::event::Event::from((*ws_event).clone());
373                                    let _ = event_tx.send(Arc::new(event));
374                                }
375                            }
376                            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
377                                warn!(skipped, "WS bridge: receiver lagged");
378                            }
379                            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
380                        }
381                    }
382                }
383            }
384        }));
385
386        *self.inner.ws_handle.lock().await = Some(handle);
387        info!("WebSocket event stream spawned (handshake in progress)");
388    }
389
390    /// Disconnect from the controller.
391    ///
392    /// Cancels background tasks, logs out if session-based, and resets
393    /// the connection state to [`Disconnected`](ConnectionState::Disconnected).
394    pub async fn disconnect(&self) {
395        self.inner.cancel_child.lock().await.cancel();
396
397        if let Some(handle) = self.inner.ws_handle.lock().await.take() {
398            handle.shutdown();
399        }
400
401        let mut handles = self.inner.task_handles.lock().await;
402        for handle in handles.drain(..) {
403            let _ = handle.await;
404        }
405
406        let session = self.inner.session_client.lock().await.clone();
407
408        // Skip logout when session caching is active — we want the
409        // session cookie to survive for the next CLI invocation.
410        let cache_active = build_session_cache(&self.inner.config).is_some();
411
412        if !cache_active
413            && matches!(
414                self.inner.config.auth,
415                AuthCredentials::Credentials { .. } | AuthCredentials::Hybrid { .. }
416            )
417            && let Some(client) = session
418            && let Err(error) = client.logout().await
419        {
420            warn!(error = %error, "logout failed (non-fatal)");
421        }
422
423        *self.inner.session_client.lock().await = None;
424        *self.inner.integration_client.lock().await = None;
425        *self.inner.site_id.lock().await = None;
426
427        {
428            let (tx, rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
429            *self.inner.command_tx.lock().await = tx;
430            *self.inner.command_rx.lock().await = Some(rx);
431        }
432
433        let _ = self
434            .inner
435            .connection_state
436            .send(ConnectionState::Disconnected);
437        debug!("disconnected");
438    }
439}
440
441/// Build a `SessionCache` if caching is enabled for this config.
442fn build_session_cache(
443    config: &crate::config::ControllerConfig,
444) -> Option<crate::session::session_cache::SessionCache> {
445    if config.no_session_cache {
446        return None;
447    }
448    let name = config.profile_name.as_deref()?;
449    crate::session::session_cache::SessionCache::new(name, config.url.as_str())
450}