1use std::sync::Arc;
2
3use tokio::sync::mpsc;
4use tokio::task::JoinHandle;
5use tokio_util::sync::CancellationToken;
6use tracing::{debug, info, warn};
7
8use reqwest::header::{HeaderMap, HeaderValue};
9use secrecy::ExposeSecret;
10
11use crate::config::AuthCredentials;
12use crate::core_error::CoreError;
13use crate::websocket::{ReconnectConfig, WebSocketHandle};
14use crate::{IntegrationClient, SessionClient};
15
16use super::support::{build_transport, resolve_site_id, tls_to_transport};
17use super::{COMMAND_CHANNEL_SIZE, ConnectionState, Controller, refresh};
18
19impl Controller {
20 #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
28 pub async fn connect(&self) -> Result<(), CoreError> {
29 let _ = self
30 .inner
31 .connection_state
32 .send(ConnectionState::Connecting);
33
34 let child = self.inner.cancel.child_token();
36 *self.inner.cancel_child.lock().await = child.clone();
37
38 let config = &self.inner.config;
39 let transport = build_transport(config);
40
41 match &config.auth {
42 AuthCredentials::ApiKey(api_key) => {
43 let platform = SessionClient::detect_platform(&config.url).await?;
45 debug!(?platform, "detected controller platform");
46
47 let integration = IntegrationClient::from_api_key(
49 config.url.as_str(),
50 api_key,
51 &transport,
52 platform,
53 )?;
54
55 let site_id = resolve_site_id(&integration, &config.site)
60 .await
61 .map_err(|e| match &e {
62 CoreError::Api {
63 status: Some(404), ..
64 } => {
65 debug!(error = %e, "Integration API returned 404 during site resolution");
66 CoreError::Unsupported {
67 operation: "API-key authentication".into(),
68 required: "a controller with the Integration API \
69 (Settings > Integrations).\n\
70 For older UniFi Network Application installs, \
71 use --username/--password instead"
72 .into(),
73 }
74 }
75 _ => e,
76 })?;
77 debug!(site_id = %site_id, "resolved Integration API site UUID");
78
79 *self.inner.integration_client.lock().await = Some(Arc::new(integration));
80 *self.inner.site_id.lock().await = Some(site_id);
81
82 let mut headers = HeaderMap::new();
88 let mut key_value =
89 HeaderValue::from_str(api_key.expose_secret()).map_err(|e| {
90 CoreError::from(crate::error::Error::Authentication {
91 message: format!("invalid API key header value: {e}"),
92 })
93 })?;
94 key_value.set_sensitive(true);
95 headers.insert("X-API-KEY", key_value);
96 let legacy_http = transport.build_client_with_headers(headers)?;
97 let session = SessionClient::with_client(
98 legacy_http,
99 config.url.clone(),
100 config.site.clone(),
101 platform,
102 crate::session::client::SessionAuth::ApiKey,
103 );
104 *self.inner.session_client.lock().await = Some(Arc::new(session));
105 }
106 AuthCredentials::Credentials { username, password } => {
107 let platform = SessionClient::detect_platform(&config.url).await?;
109 debug!(?platform, "detected controller platform");
110
111 let client = SessionClient::new(
112 config.url.clone(),
113 config.site.clone(),
114 platform,
115 &transport,
116 )?;
117
118 let cache = build_session_cache(config);
119 if let Some(ref cache) = cache {
120 client
121 .login_with_cache(username, password, config.totp_token.as_ref(), cache)
122 .await?;
123 } else {
124 client
125 .login(username, password, config.totp_token.as_ref())
126 .await?;
127 }
128 debug!("session authentication successful");
129
130 *self.inner.session_client.lock().await = Some(Arc::new(client));
131 }
132 AuthCredentials::Hybrid {
133 api_key,
134 username,
135 password,
136 } => {
137 let platform = SessionClient::detect_platform(&config.url).await?;
139 debug!(?platform, "detected controller platform (hybrid)");
140
141 let integration = IntegrationClient::from_api_key(
143 config.url.as_str(),
144 api_key,
145 &transport,
146 platform,
147 )?;
148
149 let site_id = resolve_site_id(&integration, &config.site)
150 .await
151 .map_err(|e| match &e {
152 CoreError::Api {
153 status: Some(404), ..
154 } => {
155 debug!(error = %e, "Integration API returned 404 during site resolution");
156 CoreError::Unsupported {
157 operation: "API-key authentication".into(),
158 required: "a controller with the Integration API \
159 (Settings > Integrations).\n\
160 For older UniFi Network Application installs, \
161 use --username/--password instead"
162 .into(),
163 }
164 }
165 _ => e,
166 })?;
167 debug!(site_id = %site_id, "resolved Integration API site UUID");
168
169 *self.inner.integration_client.lock().await = Some(Arc::new(integration));
170 *self.inner.site_id.lock().await = Some(site_id);
171
172 match SessionClient::new(
176 config.url.clone(),
177 config.site.clone(),
178 platform,
179 &transport,
180 ) {
181 Ok(client) => {
182 let cache = build_session_cache(config);
183 let login_result = if let Some(ref cache) = cache {
184 client
185 .login_with_cache(
186 username,
187 password,
188 config.totp_token.as_ref(),
189 cache,
190 )
191 .await
192 } else {
193 client
194 .login(username, password, config.totp_token.as_ref())
195 .await
196 };
197 match login_result {
198 Ok(()) => {
199 debug!("session authentication successful (hybrid)");
200 *self.inner.session_client.lock().await = Some(Arc::new(client));
201 }
202 Err(e) => {
203 let msg = format!(
204 "Session login failed: {e} — events, health stats, and client traffic will be unavailable"
205 );
206 warn!("{msg}");
207 self.inner.warnings.lock().await.push(msg);
208 }
209 }
210 }
211 Err(e) => {
212 let msg = format!("Session client setup failed: {e}");
213 warn!("{msg}");
214 self.inner.warnings.lock().await.push(msg);
215 }
216 }
217 }
218 AuthCredentials::Cloud { api_key, host_id } => {
219 let integration = IntegrationClient::from_api_key(
220 config.url.as_str(),
221 api_key,
222 &transport,
223 crate::ControllerPlatform::Cloud,
224 )?;
225
226 let site_id = if let Ok(uuid) = uuid::Uuid::parse_str(&config.site) {
227 uuid
228 } else if let Ok(uuid) = uuid::Uuid::parse_str(host_id) {
229 uuid
230 } else {
231 resolve_site_id(&integration, &config.site).await?
232 };
233 debug!(site_id = %site_id, "resolved cloud Integration API site UUID");
234
235 *self.inner.integration_client.lock().await = Some(Arc::new(integration));
236 *self.inner.site_id.lock().await = Some(site_id);
237
238 let msg =
239 "Cloud auth mode active: Session API and WebSocket features are unavailable"
240 .to_string();
241 self.inner.warnings.lock().await.push(msg);
242 }
243 }
244
245 self.full_refresh().await?;
247
248 let mut handles = self.inner.task_handles.lock().await;
250
251 if let Some(rx) = self.inner.command_rx.lock().await.take() {
252 let ctrl = self.clone();
253 handles.push(tokio::spawn(super::runtime::command_processor_task(
254 ctrl, rx,
255 )));
256 }
257
258 let interval_secs = config.refresh_interval_secs;
259 if interval_secs > 0 {
260 let ctrl = self.clone();
261 let cancel = child.clone();
262 handles.push(tokio::spawn(refresh::refresh_task(
263 ctrl,
264 interval_secs,
265 cancel,
266 )));
267 }
268
269 if config.websocket_enabled {
270 self.spawn_websocket(&child, &mut handles).await;
271 }
272
273 let _ = self.inner.connection_state.send(ConnectionState::Connected);
274 info!("connected to controller");
275 Ok(())
276 }
277
278 async fn spawn_websocket(&self, cancel: &CancellationToken, handles: &mut Vec<JoinHandle<()>>) {
283 let Some(session) = self.inner.session_client.lock().await.clone() else {
284 debug!("no session client — WebSocket unavailable");
285 return;
286 };
287
288 let platform = session.platform();
289 let Some(ws_path_template) = platform.websocket_path() else {
290 debug!("platform does not support WebSocket");
291 return;
292 };
293
294 let ws_path = ws_path_template.replace("{site}", &self.inner.config.site);
295 let base_url = &self.inner.config.url;
296 let scheme = if base_url.scheme() == "https" {
297 "wss"
298 } else {
299 "ws"
300 };
301 let host = base_url.host_str().unwrap_or("localhost");
302 let ws_url_str = match base_url.port() {
303 Some(port) => format!("{scheme}://{host}:{port}{ws_path}"),
304 None => format!("{scheme}://{host}{ws_path}"),
305 };
306 let ws_url = match url::Url::parse(&ws_url_str) {
307 Ok(url) => url,
308 Err(error) => {
309 warn!(error = %error, url = %ws_url_str, "invalid WebSocket URL");
310 return;
311 }
312 };
313
314 let cookie = session.cookie_header();
315
316 if cookie.is_none() {
317 warn!("no session cookie — WebSocket requires session auth (skipping)");
318 return;
319 }
320
321 let ws_tls = tls_to_transport(&self.inner.config.tls);
322 let ws_cancel = cancel.child_token();
323 let handle = match WebSocketHandle::connect(
324 ws_url,
325 ReconnectConfig::default(),
326 ws_cancel.clone(),
327 cookie,
328 ws_tls,
329 ) {
330 Ok(handle) => handle,
331 Err(error) => {
332 warn!(error = %error, "WebSocket connection failed (non-fatal)");
333 return;
334 }
335 };
336
337 let mut ws_rx = handle.subscribe();
341 let event_tx = self.inner.event_tx.clone();
342 let store = Arc::clone(&self.inner.store);
343 let bridge_cancel = ws_cancel;
344
345 handles.push(tokio::spawn(async move {
346 loop {
347 tokio::select! {
348 biased;
349 () = bridge_cancel.cancelled() => break,
350 result = ws_rx.recv() => {
351 match result {
352 Ok(ws_event) => {
353 store.mark_ws_event(chrono::Utc::now());
354
355 if ws_event.key == "device:sync" || ws_event.key == "device:update" {
356 super::runtime::apply_device_sync(&store, &ws_event.extra);
357 }
358
359 if ws_event.key.starts_with("EVT_") {
360 let event = crate::model::event::Event::from((*ws_event).clone());
361 let _ = event_tx.send(Arc::new(event));
362 }
363 }
364 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
365 warn!(skipped, "WS bridge: receiver lagged");
366 }
367 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
368 }
369 }
370 }
371 }
372 }));
373
374 *self.inner.ws_handle.lock().await = Some(handle);
375 info!("WebSocket event stream spawned (handshake in progress)");
376 }
377
378 pub async fn disconnect(&self) {
383 self.inner.cancel_child.lock().await.cancel();
384
385 if let Some(handle) = self.inner.ws_handle.lock().await.take() {
386 handle.shutdown();
387 }
388
389 let mut handles = self.inner.task_handles.lock().await;
390 for handle in handles.drain(..) {
391 let _ = handle.await;
392 }
393
394 let session = self.inner.session_client.lock().await.clone();
395
396 let cache_active = build_session_cache(&self.inner.config).is_some();
399
400 if !cache_active
401 && matches!(
402 self.inner.config.auth,
403 AuthCredentials::Credentials { .. } | AuthCredentials::Hybrid { .. }
404 )
405 && let Some(client) = session
406 && let Err(error) = client.logout().await
407 {
408 warn!(error = %error, "logout failed (non-fatal)");
409 }
410
411 *self.inner.session_client.lock().await = None;
412 *self.inner.integration_client.lock().await = None;
413 *self.inner.site_id.lock().await = None;
414
415 {
416 let (tx, rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
417 *self.inner.command_tx.lock().await = tx;
418 *self.inner.command_rx.lock().await = Some(rx);
419 }
420
421 let _ = self
422 .inner
423 .connection_state
424 .send(ConnectionState::Disconnected);
425 debug!("disconnected");
426 }
427}
428
429fn build_session_cache(
431 config: &crate::config::ControllerConfig,
432) -> Option<crate::session::session_cache::SessionCache> {
433 if config.no_session_cache {
434 return None;
435 }
436 let name = config.profile_name.as_deref()?;
437 crate::session::session_cache::SessionCache::new(name, config.url.as_str())
438}