1use std::sync::Arc;
2
3use tokio::sync::mpsc;
4use tokio::task::JoinHandle;
5use tokio_util::sync::CancellationToken;
6use tracing::{debug, info, warn};
7
8use reqwest::header::{HeaderMap, HeaderValue};
9use secrecy::ExposeSecret;
10
11use crate::config::AuthCredentials;
12use crate::core_error::CoreError;
13use crate::websocket::{ReconnectConfig, WebSocketHandle};
14use crate::{IntegrationClient, SessionClient};
15
16use super::support::{build_transport, resolve_site, tls_to_transport};
17use super::{COMMAND_CHANNEL_SIZE, ConnectionState, Controller, refresh};
18
19impl Controller {
20 #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
28 pub async fn connect(&self) -> Result<(), CoreError> {
29 self.connect_with_refresh(true).await
30 }
31
32 pub async fn connect_lightweight(&self) -> Result<(), CoreError> {
37 self.connect_with_refresh(false).await
38 }
39
40 #[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
41 async fn connect_with_refresh(&self, initial_refresh: bool) -> Result<(), CoreError> {
42 let _ = self
43 .inner
44 .connection_state
45 .send(ConnectionState::Connecting);
46
47 let child = self.inner.cancel.child_token();
49 *self.inner.cancel_child.lock().await = child.clone();
50
51 let config = &self.inner.config;
52 let transport = build_transport(config);
53
54 match &config.auth {
55 AuthCredentials::ApiKey(api_key) => {
56 let platform = SessionClient::detect_platform(&config.url).await?;
58 debug!(?platform, "detected controller platform");
59
60 let integration = IntegrationClient::from_api_key(
62 config.url.as_str(),
63 api_key,
64 &transport,
65 platform,
66 )?;
67
68 let resolved = resolve_site(&integration, &config.site)
73 .await
74 .map_err(|e| match &e {
75 CoreError::Api {
76 status: Some(404), ..
77 } => {
78 debug!(error = %e, "Integration API returned 404 during site resolution");
79 CoreError::Unsupported {
80 operation: "API-key authentication".into(),
81 required: "a controller with the Integration API \
82 (Settings > Integrations).\n\
83 For older UniFi Network Application installs, \
84 use --username/--password instead"
85 .into(),
86 }
87 }
88 _ => e,
89 })?;
90 debug!(site_id = %resolved.id, slug = %resolved.slug, "resolved Integration API site");
91
92 *self.inner.integration_client.lock().await = Some(Arc::new(integration));
93 *self.inner.site_id.lock().await = Some(resolved.id);
94
95 let mut headers = HeaderMap::new();
101 let mut key_value =
102 HeaderValue::from_str(api_key.expose_secret()).map_err(|e| {
103 CoreError::from(crate::error::Error::Authentication {
104 message: format!("invalid API key header value: {e}"),
105 })
106 })?;
107 key_value.set_sensitive(true);
108 headers.insert("X-API-KEY", key_value);
109 let legacy_http = transport.build_client_with_headers(headers)?;
110 let session = SessionClient::with_client(
111 legacy_http,
112 config.url.clone(),
113 resolved.slug,
114 platform,
115 crate::session::client::SessionAuth::ApiKey,
116 );
117 *self.inner.session_client.lock().await = Some(Arc::new(session));
118 }
119 AuthCredentials::Credentials { username, password } => {
120 let platform = SessionClient::detect_platform(&config.url).await?;
122 debug!(?platform, "detected controller platform");
123
124 let client = SessionClient::new(
125 config.url.clone(),
126 config.site.clone(),
127 platform,
128 &transport,
129 )?;
130
131 let cache = build_session_cache(config);
132 if let Some(ref cache) = cache {
133 client
134 .login_with_cache(username, password, config.totp_token.as_ref(), cache)
135 .await?;
136 } else {
137 client
138 .login(username, password, config.totp_token.as_ref())
139 .await?;
140 }
141 debug!("session authentication successful");
142
143 *self.inner.session_client.lock().await = Some(Arc::new(client));
144 }
145 AuthCredentials::Hybrid {
146 api_key,
147 username,
148 password,
149 } => {
150 let platform = SessionClient::detect_platform(&config.url).await?;
152 debug!(?platform, "detected controller platform (hybrid)");
153
154 let integration = IntegrationClient::from_api_key(
156 config.url.as_str(),
157 api_key,
158 &transport,
159 platform,
160 )?;
161
162 let resolved = resolve_site(&integration, &config.site)
163 .await
164 .map_err(|e| match &e {
165 CoreError::Api {
166 status: Some(404), ..
167 } => {
168 debug!(error = %e, "Integration API returned 404 during site resolution");
169 CoreError::Unsupported {
170 operation: "API-key authentication".into(),
171 required: "a controller with the Integration API \
172 (Settings > Integrations).\n\
173 For older UniFi Network Application installs, \
174 use --username/--password instead"
175 .into(),
176 }
177 }
178 _ => e,
179 })?;
180 debug!(site_id = %resolved.id, slug = %resolved.slug, "resolved Integration API site");
181
182 *self.inner.integration_client.lock().await = Some(Arc::new(integration));
183 *self.inner.site_id.lock().await = Some(resolved.id);
184
185 match SessionClient::new(config.url.clone(), resolved.slug, platform, &transport) {
189 Ok(client) => {
190 let cache = build_session_cache(config);
191 let login_result = if let Some(ref cache) = cache {
192 client
193 .login_with_cache(
194 username,
195 password,
196 config.totp_token.as_ref(),
197 cache,
198 )
199 .await
200 } else {
201 client
202 .login(username, password, config.totp_token.as_ref())
203 .await
204 };
205 match login_result {
206 Ok(()) => {
207 debug!("session authentication successful (hybrid)");
208 *self.inner.session_client.lock().await = Some(Arc::new(client));
209 }
210 Err(e) => {
211 let msg = format!(
212 "Session login failed: {e} — events, health stats, and client traffic will be unavailable"
213 );
214 warn!("{msg}");
215 self.inner.warnings.lock().await.push(msg);
216 }
217 }
218 }
219 Err(e) => {
220 let msg = format!("Session client setup failed: {e}");
221 warn!("{msg}");
222 self.inner.warnings.lock().await.push(msg);
223 }
224 }
225 }
226 AuthCredentials::Cloud { api_key, host_id } => {
227 let connector_base = format!(
228 "{}/v1/connector/consoles/{}",
229 config.url.as_str().trim_end_matches('/'),
230 host_id,
231 );
232
233 let integration = IntegrationClient::from_api_key(
234 &connector_base,
235 api_key,
236 &transport,
237 crate::ControllerPlatform::Cloud,
238 )?;
239
240 let resolved = resolve_site(&integration, &config.site).await?;
241 debug!(site_id = %resolved.id, slug = %resolved.slug, "resolved cloud Integration API site");
242
243 *self.inner.integration_client.lock().await = Some(Arc::new(integration));
244 *self.inner.site_id.lock().await = Some(resolved.id);
245
246 let msg =
247 "Cloud connector mode active: events watch, Wi-Fi observability, admin/session features, and live WebSocket data are unavailable"
248 .to_string();
249 self.inner.warnings.lock().await.push(msg);
250 }
251 }
252
253 if initial_refresh {
254 self.full_refresh().await?;
255 }
256
257 let mut handles = self.inner.task_handles.lock().await;
259
260 if let Some(rx) = self.inner.command_rx.lock().await.take() {
261 let ctrl = self.clone();
262 handles.push(tokio::spawn(super::runtime::command_processor_task(
263 ctrl, rx,
264 )));
265 }
266
267 let interval_secs = config.refresh_interval_secs;
268 if interval_secs > 0 {
269 let ctrl = self.clone();
270 let cancel = child.clone();
271 handles.push(tokio::spawn(refresh::refresh_task(
272 ctrl,
273 interval_secs,
274 cancel,
275 )));
276 }
277
278 if config.websocket_enabled {
279 self.spawn_websocket(&child, &mut handles).await;
280 }
281
282 let _ = self.inner.connection_state.send(ConnectionState::Connected);
283 info!("connected to controller");
284 Ok(())
285 }
286
287 async fn spawn_websocket(&self, cancel: &CancellationToken, handles: &mut Vec<JoinHandle<()>>) {
292 let Some(session) = self.inner.session_client.lock().await.clone() else {
293 debug!("no session client — WebSocket unavailable");
294 return;
295 };
296
297 let platform = session.platform();
298 let Some(ws_path_template) = platform.websocket_path() else {
299 debug!("platform does not support WebSocket");
300 return;
301 };
302
303 let ws_path = ws_path_template.replace("{site}", session.site());
307 let base_url = &self.inner.config.url;
308 let scheme = if base_url.scheme() == "https" {
309 "wss"
310 } else {
311 "ws"
312 };
313 let host = base_url.host_str().unwrap_or("localhost");
314 let ws_url_str = match base_url.port() {
315 Some(port) => format!("{scheme}://{host}:{port}{ws_path}"),
316 None => format!("{scheme}://{host}{ws_path}"),
317 };
318 let ws_url = match url::Url::parse(&ws_url_str) {
319 Ok(url) => url,
320 Err(error) => {
321 warn!(error = %error, url = %ws_url_str, "invalid WebSocket URL");
322 return;
323 }
324 };
325
326 let cookie = session.cookie_header();
327
328 if cookie.is_none() {
329 warn!("no session cookie — WebSocket requires session auth (skipping)");
330 return;
331 }
332
333 let ws_tls = tls_to_transport(&self.inner.config.tls);
334 let ws_cancel = cancel.child_token();
335 let handle = match WebSocketHandle::connect(
336 ws_url,
337 ReconnectConfig::default(),
338 ws_cancel.clone(),
339 cookie,
340 ws_tls,
341 ) {
342 Ok(handle) => handle,
343 Err(error) => {
344 warn!(error = %error, "WebSocket connection failed (non-fatal)");
345 return;
346 }
347 };
348
349 let mut ws_rx = handle.subscribe();
353 let event_tx = self.inner.event_tx.clone();
354 let store = Arc::clone(&self.inner.store);
355 let bridge_cancel = ws_cancel;
356
357 handles.push(tokio::spawn(async move {
358 loop {
359 tokio::select! {
360 biased;
361 () = bridge_cancel.cancelled() => break,
362 result = ws_rx.recv() => {
363 match result {
364 Ok(ws_event) => {
365 store.mark_ws_event(chrono::Utc::now());
366
367 if ws_event.key == "device:sync" || ws_event.key == "device:update" {
368 super::runtime::apply_device_sync(&store, &ws_event.extra);
369 }
370
371 if ws_event.key.starts_with("EVT_") {
372 let event = crate::model::event::Event::from((*ws_event).clone());
373 let _ = event_tx.send(Arc::new(event));
374 }
375 }
376 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
377 warn!(skipped, "WS bridge: receiver lagged");
378 }
379 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
380 }
381 }
382 }
383 }
384 }));
385
386 *self.inner.ws_handle.lock().await = Some(handle);
387 info!("WebSocket event stream spawned (handshake in progress)");
388 }
389
390 pub async fn disconnect(&self) {
395 self.inner.cancel_child.lock().await.cancel();
396
397 if let Some(handle) = self.inner.ws_handle.lock().await.take() {
398 handle.shutdown();
399 }
400
401 let mut handles = self.inner.task_handles.lock().await;
402 for handle in handles.drain(..) {
403 let _ = handle.await;
404 }
405
406 let session = self.inner.session_client.lock().await.clone();
407
408 let cache_active = build_session_cache(&self.inner.config).is_some();
411
412 if !cache_active
413 && matches!(
414 self.inner.config.auth,
415 AuthCredentials::Credentials { .. } | AuthCredentials::Hybrid { .. }
416 )
417 && let Some(client) = session
418 && let Err(error) = client.logout().await
419 {
420 warn!(error = %error, "logout failed (non-fatal)");
421 }
422
423 *self.inner.session_client.lock().await = None;
424 *self.inner.integration_client.lock().await = None;
425 *self.inner.site_id.lock().await = None;
426
427 {
428 let (tx, rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
429 *self.inner.command_tx.lock().await = tx;
430 *self.inner.command_rx.lock().await = Some(rx);
431 }
432
433 let _ = self
434 .inner
435 .connection_state
436 .send(ConnectionState::Disconnected);
437 debug!("disconnected");
438 }
439}
440
441fn build_session_cache(
443 config: &crate::config::ControllerConfig,
444) -> Option<crate::session::session_cache::SessionCache> {
445 if config.no_session_cache {
446 return None;
447 }
448 let name = config.profile_name.as_deref()?;
449 crate::session::session_cache::SessionCache::new(name, config.url.as_str())
450}