Skip to main content

unifly_api/
controller.rs

1// ── Controller abstraction ──
2//
3// Full lifecycle management for a UniFi controller connection.
4// Handles authentication, background refresh, command routing,
5// and reactive data streaming through the DataStore.
6
7use std::sync::Arc;
8
9use crate::command::{Command, CommandEnvelope, CommandResult};
10use crate::config::ControllerConfig;
11use crate::core_error::CoreError;
12use crate::model::Event;
13use crate::store::DataStore;
14use crate::websocket::WebSocketHandle;
15use crate::{IntegrationClient, SessionClient};
16use tokio::sync::{Mutex, broadcast, mpsc, watch};
17use tokio::task::JoinHandle;
18use tokio_util::sync::CancellationToken;
19
20mod commands;
21mod lifecycle;
22mod payloads;
23mod ports;
24mod query;
25mod refresh;
26mod runtime;
27mod session_queries;
28mod subscriptions;
29mod support;
30
31pub use ports::{ApplyPortsSummary, PortProfileUpdate};
32
33use self::support::{
34    client_mac, device_mac, integration_client_context, integration_site_context,
35    require_integration, require_session, require_uuid,
36};
37
38const COMMAND_CHANNEL_SIZE: usize = 64;
39const EVENT_CHANNEL_SIZE: usize = 256;
40const REFRESH_DETAIL_CONCURRENCY: usize = 16;
41
42// ── ConnectionState ──────────────────────────────────────────────
43
44/// Connection state observable by consumers.
45#[derive(Debug, Clone, PartialEq, Eq)]
46pub enum ConnectionState {
47    Disconnected,
48    Connecting,
49    Connected,
50    Reconnecting { attempt: u32 },
51    Failed,
52}
53
54// ── Controller ───────────────────────────────────────────────────
55
56/// The main entry point for consumers.
57///
58/// Cheaply cloneable via `Arc<ControllerInner>`. Manages the full
59/// connection lifecycle: authentication, background data refresh,
60/// command routing, and reactive entity streaming.
61#[derive(Clone)]
62pub struct Controller {
63    inner: Arc<ControllerInner>,
64}
65
66struct ControllerInner {
67    config: ControllerConfig,
68    store: Arc<DataStore>,
69    connection_state: watch::Sender<ConnectionState>,
70    event_tx: broadcast::Sender<Arc<Event>>,
71    command_tx: Mutex<mpsc::Sender<CommandEnvelope>>,
72    command_rx: Mutex<Option<mpsc::Receiver<CommandEnvelope>>>,
73    cancel: CancellationToken,
74    /// Child token for the current connection — cancelled on disconnect,
75    /// replaced on reconnect (avoids permanent cancellation).
76    cancel_child: Mutex<CancellationToken>,
77    session_client: Mutex<Option<Arc<SessionClient>>>,
78    integration_client: Mutex<Option<Arc<IntegrationClient>>>,
79    /// Resolved Integration API site UUID (populated on connect).
80    site_id: Mutex<Option<uuid::Uuid>>,
81    /// WebSocket event stream handle (populated on connect if enabled).
82    ws_handle: Mutex<Option<WebSocketHandle>>,
83    task_handles: Mutex<Vec<JoinHandle<()>>>,
84    /// Warnings accumulated during connect (e.g. Session auth failure in Hybrid mode).
85    warnings: Mutex<Vec<String>>,
86}
87
88impl Controller {
89    /// Create a new Controller from configuration. Does NOT connect --
90    /// call [`connect()`](Self::connect) to authenticate and start background tasks.
91    pub fn new(config: ControllerConfig) -> Self {
92        let store = Arc::new(DataStore::new());
93        let (connection_state, _) = watch::channel(ConnectionState::Disconnected);
94        let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_SIZE);
95        let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
96        let cancel = CancellationToken::new();
97        let cancel_child = cancel.child_token();
98
99        Self {
100            inner: Arc::new(ControllerInner {
101                config,
102                store,
103                connection_state,
104                event_tx,
105                command_tx: Mutex::new(command_tx),
106                command_rx: Mutex::new(Some(command_rx)),
107                cancel,
108                cancel_child: Mutex::new(cancel_child),
109                session_client: Mutex::new(None),
110                integration_client: Mutex::new(None),
111                warnings: Mutex::new(Vec::new()),
112                site_id: Mutex::new(None),
113                ws_handle: Mutex::new(None),
114                task_handles: Mutex::new(Vec::new()),
115            }),
116        }
117    }
118
119    /// Access the controller configuration.
120    pub fn config(&self) -> &ControllerConfig {
121        &self.inner.config
122    }
123
124    /// Access the underlying DataStore.
125    pub fn store(&self) -> &Arc<DataStore> {
126        &self.inner.store
127    }
128
129    // ── Command execution ────────────────────────────────────────
130
131    /// Execute a command against the controller.
132    ///
133    /// Sends the command through the internal channel to the command
134    /// processor task and awaits the result.
135    pub async fn execute(&self, cmd: Command) -> Result<CommandResult, CoreError> {
136        let (tx, rx) = tokio::sync::oneshot::channel();
137
138        let command_tx = self.inner.command_tx.lock().await.clone();
139
140        command_tx
141            .send(CommandEnvelope {
142                command: cmd,
143                response_tx: tx,
144            })
145            .await
146            .map_err(|_| CoreError::ControllerDisconnected)?;
147
148        rx.await.map_err(|_| CoreError::ControllerDisconnected)?
149    }
150
151    // ── One-shot convenience ─────────────────────────────────────
152
153    /// One-shot: connect, run closure, disconnect.
154    ///
155    /// Optimized for CLI: disables WebSocket and periodic refresh since
156    /// we only need a single request-response cycle.
157    pub async fn oneshot<F, Fut, T>(config: ControllerConfig, f: F) -> Result<T, CoreError>
158    where
159        F: FnOnce(Controller) -> Fut,
160        Fut: std::future::Future<Output = Result<T, CoreError>>,
161    {
162        let mut cfg = config;
163        cfg.websocket_enabled = false;
164        cfg.refresh_interval_secs = 0;
165
166        let controller = Controller::new(cfg);
167        controller.connect().await?;
168        let result = f(controller.clone()).await;
169        controller.disconnect().await;
170        result
171    }
172}