1use std::sync::Arc;
8
9use crate::command::{Command, CommandEnvelope, CommandResult};
10use crate::config::ControllerConfig;
11use crate::core_error::CoreError;
12use crate::model::Event;
13use crate::store::DataStore;
14use crate::websocket::WebSocketHandle;
15use crate::{IntegrationClient, SessionClient};
16use tokio::sync::{Mutex, broadcast, mpsc, watch};
17use tokio::task::JoinHandle;
18use tokio_util::sync::CancellationToken;
19
20mod commands;
21mod lifecycle;
22mod payloads;
23mod query;
24mod refresh;
25mod runtime;
26mod session_queries;
27mod subscriptions;
28mod support;
29
30use self::support::{
31 client_mac, device_mac, integration_client_context, integration_site_context,
32 require_integration, require_session, require_uuid,
33};
34
/// Capacity of the bounded `mpsc` channel that carries command envelopes
/// from callers of `Controller::execute` to whatever consumes the receiver.
const COMMAND_CHANNEL_SIZE: usize = 64;
/// Capacity of the `broadcast` channel on which events are published.
const EVENT_CHANNEL_SIZE: usize = 256;
/// Concurrency limit for refresh detail work.
///
/// NOTE(review): not referenced in this part of the file; presumably caps the
/// number of in-flight detail requests in the `refresh` module — confirm.
const REFRESH_DETAIL_CONCURRENCY: usize = 16;
38
/// Connection status of a [`Controller`].
///
/// The current value is carried by the controller's `watch` channel, which is
/// initialised to [`ConnectionState::Disconnected`] when the controller is
/// constructed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionState {
    /// No connection; this is the state a freshly constructed controller reports.
    Disconnected,
    /// A connection attempt is in progress.
    Connecting,
    /// The controller is connected.
    Connected,
    /// The controller is trying to re-establish a connection.
    ///
    /// NOTE(review): `attempt` is presumably the retry counter for the current
    /// reconnect cycle; whether it starts at 0 or 1 is not visible here — confirm.
    Reconnecting { attempt: u32 },
    /// The connection is considered failed.
    ///
    /// NOTE(review): presumably terminal until a new connect is requested — confirm.
    Failed,
}
50
/// Handle to the controller's shared state.
///
/// Cloning is cheap: every clone points at the same [`ControllerInner`]
/// through an [`Arc`], so all clones observe and drive the same controller.
#[derive(Clone)]
pub struct Controller {
    /// Shared state behind the handle.
    inner: Arc<ControllerInner>,
}
62
/// State shared by every clone of a [`Controller`].
///
/// Mutable pieces sit behind async mutexes so they can be changed through the
/// shared `Arc`.
struct ControllerInner {
    /// Configuration the controller was constructed with.
    config: ControllerConfig,
    /// Shared data store, created together with the controller.
    store: Arc<DataStore>,
    /// Sending half of the watch channel holding the current [`ConnectionState`].
    connection_state: watch::Sender<ConnectionState>,
    /// Sending half of the broadcast channel for events (capacity `EVENT_CHANNEL_SIZE`).
    event_tx: broadcast::Sender<Arc<Event>>,
    /// Sender for command envelopes; `execute` clones it out of the mutex
    /// before sending.
    ///
    /// NOTE(review): the mutex suggests the sender is replaced elsewhere
    /// (e.g. on reconnect) — confirm.
    command_tx: Mutex<mpsc::Sender<CommandEnvelope>>,
    /// Receiving half of the command channel; holds `Some` after construction.
    ///
    /// NOTE(review): presumably taken out by the task that processes
    /// commands — confirm.
    command_rx: Mutex<Option<mpsc::Receiver<CommandEnvelope>>>,
    /// Root cancellation token, created with the controller.
    cancel: CancellationToken,
    /// Child of `cancel`, derived once at construction.
    ///
    /// NOTE(review): being behind a mutex, it is presumably swapped for a
    /// fresh child on each connect cycle — confirm.
    cancel_child: Mutex<CancellationToken>,
    /// Session client; starts as `None`.
    session_client: Mutex<Option<Arc<SessionClient>>>,
    /// Integration client; starts as `None`.
    integration_client: Mutex<Option<Arc<IntegrationClient>>>,
    /// Site identifier; starts as `None`.
    site_id: Mutex<Option<uuid::Uuid>>,
    /// WebSocket handle; starts as `None`.
    ws_handle: Mutex<Option<WebSocketHandle>>,
    /// Join handles of spawned tasks; empty at construction.
    task_handles: Mutex<Vec<JoinHandle<()>>>,
    /// Collected warning messages; empty at construction.
    warnings: Mutex<Vec<String>>,
}
84
85impl Controller {
86 pub fn new(config: ControllerConfig) -> Self {
89 let store = Arc::new(DataStore::new());
90 let (connection_state, _) = watch::channel(ConnectionState::Disconnected);
91 let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_SIZE);
92 let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
93 let cancel = CancellationToken::new();
94 let cancel_child = cancel.child_token();
95
96 Self {
97 inner: Arc::new(ControllerInner {
98 config,
99 store,
100 connection_state,
101 event_tx,
102 command_tx: Mutex::new(command_tx),
103 command_rx: Mutex::new(Some(command_rx)),
104 cancel,
105 cancel_child: Mutex::new(cancel_child),
106 session_client: Mutex::new(None),
107 integration_client: Mutex::new(None),
108 warnings: Mutex::new(Vec::new()),
109 site_id: Mutex::new(None),
110 ws_handle: Mutex::new(None),
111 task_handles: Mutex::new(Vec::new()),
112 }),
113 }
114 }
115
116 pub fn config(&self) -> &ControllerConfig {
118 &self.inner.config
119 }
120
121 pub fn store(&self) -> &Arc<DataStore> {
123 &self.inner.store
124 }
125
126 pub async fn execute(&self, cmd: Command) -> Result<CommandResult, CoreError> {
133 let (tx, rx) = tokio::sync::oneshot::channel();
134
135 let command_tx = self.inner.command_tx.lock().await.clone();
136
137 command_tx
138 .send(CommandEnvelope {
139 command: cmd,
140 response_tx: tx,
141 })
142 .await
143 .map_err(|_| CoreError::ControllerDisconnected)?;
144
145 rx.await.map_err(|_| CoreError::ControllerDisconnected)?
146 }
147
148 pub async fn oneshot<F, Fut, T>(config: ControllerConfig, f: F) -> Result<T, CoreError>
155 where
156 F: FnOnce(Controller) -> Fut,
157 Fut: std::future::Future<Output = Result<T, CoreError>>,
158 {
159 let mut cfg = config;
160 cfg.websocket_enabled = false;
161 cfg.refresh_interval_secs = 0;
162
163 let controller = Controller::new(cfg);
164 controller.connect().await?;
165 let result = f(controller.clone()).await;
166 controller.disconnect().await;
167 result
168 }
169}