Skip to main content

unifly_api/
controller.rs

1// ── Controller abstraction ──
2//
3// Full lifecycle management for a UniFi controller connection.
4// Handles authentication, background refresh, command routing,
5// and reactive data streaming through the DataStore.
6
7use std::sync::Arc;
8
9use crate::command::{Command, CommandEnvelope, CommandResult};
10use crate::config::ControllerConfig;
11use crate::core_error::CoreError;
12use crate::model::Event;
13use crate::store::DataStore;
14use crate::websocket::WebSocketHandle;
15use crate::{IntegrationClient, SessionClient};
16use tokio::sync::{Mutex, broadcast, mpsc, watch};
17use tokio::task::JoinHandle;
18use tokio_util::sync::CancellationToken;
19
20mod commands;
21mod lifecycle;
22mod payloads;
23mod query;
24mod refresh;
25mod runtime;
26mod session_queries;
27mod subscriptions;
28mod support;
29
30use self::support::{
31    client_mac, device_mac, integration_client_context, integration_site_context,
32    require_integration, require_session, require_uuid,
33};
34
35const COMMAND_CHANNEL_SIZE: usize = 64;
36const EVENT_CHANNEL_SIZE: usize = 256;
37const REFRESH_DETAIL_CONCURRENCY: usize = 16;
38
39// ── ConnectionState ──────────────────────────────────────────────
40
41/// Connection state observable by consumers.
42#[derive(Debug, Clone, PartialEq, Eq)]
43pub enum ConnectionState {
44    Disconnected,
45    Connecting,
46    Connected,
47    Reconnecting { attempt: u32 },
48    Failed,
49}
50
51// ── Controller ───────────────────────────────────────────────────
52
53/// The main entry point for consumers.
54///
55/// Cheaply cloneable via `Arc<ControllerInner>`. Manages the full
56/// connection lifecycle: authentication, background data refresh,
57/// command routing, and reactive entity streaming.
58#[derive(Clone)]
59pub struct Controller {
60    inner: Arc<ControllerInner>,
61}
62
63struct ControllerInner {
64    config: ControllerConfig,
65    store: Arc<DataStore>,
66    connection_state: watch::Sender<ConnectionState>,
67    event_tx: broadcast::Sender<Arc<Event>>,
68    command_tx: Mutex<mpsc::Sender<CommandEnvelope>>,
69    command_rx: Mutex<Option<mpsc::Receiver<CommandEnvelope>>>,
70    cancel: CancellationToken,
71    /// Child token for the current connection — cancelled on disconnect,
72    /// replaced on reconnect (avoids permanent cancellation).
73    cancel_child: Mutex<CancellationToken>,
74    session_client: Mutex<Option<Arc<SessionClient>>>,
75    integration_client: Mutex<Option<Arc<IntegrationClient>>>,
76    /// Resolved Integration API site UUID (populated on connect).
77    site_id: Mutex<Option<uuid::Uuid>>,
78    /// WebSocket event stream handle (populated on connect if enabled).
79    ws_handle: Mutex<Option<WebSocketHandle>>,
80    task_handles: Mutex<Vec<JoinHandle<()>>>,
81    /// Warnings accumulated during connect (e.g. Session auth failure in Hybrid mode).
82    warnings: Mutex<Vec<String>>,
83}
84
85impl Controller {
86    /// Create a new Controller from configuration. Does NOT connect --
87    /// call [`connect()`](Self::connect) to authenticate and start background tasks.
88    pub fn new(config: ControllerConfig) -> Self {
89        let store = Arc::new(DataStore::new());
90        let (connection_state, _) = watch::channel(ConnectionState::Disconnected);
91        let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_SIZE);
92        let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
93        let cancel = CancellationToken::new();
94        let cancel_child = cancel.child_token();
95
96        Self {
97            inner: Arc::new(ControllerInner {
98                config,
99                store,
100                connection_state,
101                event_tx,
102                command_tx: Mutex::new(command_tx),
103                command_rx: Mutex::new(Some(command_rx)),
104                cancel,
105                cancel_child: Mutex::new(cancel_child),
106                session_client: Mutex::new(None),
107                integration_client: Mutex::new(None),
108                warnings: Mutex::new(Vec::new()),
109                site_id: Mutex::new(None),
110                ws_handle: Mutex::new(None),
111                task_handles: Mutex::new(Vec::new()),
112            }),
113        }
114    }
115
116    /// Access the controller configuration.
117    pub fn config(&self) -> &ControllerConfig {
118        &self.inner.config
119    }
120
121    /// Access the underlying DataStore.
122    pub fn store(&self) -> &Arc<DataStore> {
123        &self.inner.store
124    }
125
126    // ── Command execution ────────────────────────────────────────
127
128    /// Execute a command against the controller.
129    ///
130    /// Sends the command through the internal channel to the command
131    /// processor task and awaits the result.
132    pub async fn execute(&self, cmd: Command) -> Result<CommandResult, CoreError> {
133        let (tx, rx) = tokio::sync::oneshot::channel();
134
135        let command_tx = self.inner.command_tx.lock().await.clone();
136
137        command_tx
138            .send(CommandEnvelope {
139                command: cmd,
140                response_tx: tx,
141            })
142            .await
143            .map_err(|_| CoreError::ControllerDisconnected)?;
144
145        rx.await.map_err(|_| CoreError::ControllerDisconnected)?
146    }
147
148    // ── One-shot convenience ─────────────────────────────────────
149
150    /// One-shot: connect, run closure, disconnect.
151    ///
152    /// Optimized for CLI: disables WebSocket and periodic refresh since
153    /// we only need a single request-response cycle.
154    pub async fn oneshot<F, Fut, T>(config: ControllerConfig, f: F) -> Result<T, CoreError>
155    where
156        F: FnOnce(Controller) -> Fut,
157        Fut: std::future::Future<Output = Result<T, CoreError>>,
158    {
159        let mut cfg = config;
160        cfg.websocket_enabled = false;
161        cfg.refresh_interval_secs = 0;
162
163        let controller = Controller::new(cfg);
164        controller.connect().await?;
165        let result = f(controller.clone()).await;
166        controller.disconnect().await;
167        result
168    }
169}