1use std::sync::Arc;
8
9use crate::command::{Command, CommandEnvelope, CommandResult};
10use crate::config::ControllerConfig;
11use crate::core_error::CoreError;
12use crate::model::Event;
13use crate::store::DataStore;
14use crate::websocket::WebSocketHandle;
15use crate::{IntegrationClient, SessionClient};
16use tokio::sync::{Mutex, broadcast, mpsc, watch};
17use tokio::task::JoinHandle;
18use tokio_util::sync::CancellationToken;
19
20mod commands;
21mod lifecycle;
22mod payloads;
23mod ports;
24mod query;
25mod refresh;
26mod runtime;
27mod session_queries;
28mod subscriptions;
29mod support;
30
31pub use ports::{ApplyPortsSummary, PortProfileUpdate};
32
33use self::support::{
34 client_mac, device_mac, integration_client_context, integration_site_context,
35 require_integration, require_session, require_uuid,
36};
37
/// Capacity of the bounded `mpsc` channel that carries [`CommandEnvelope`]s.
const COMMAND_CHANNEL_SIZE: usize = 64;

/// Capacity passed to the `broadcast` channel that fans out [`Event`]s.
const EVENT_CHANNEL_SIZE: usize = 256;

/// Concurrency limit used by the refresh logic.
// NOTE(review): not referenced in this part of the file; presumably consumed by
// the `refresh` submodule to cap parallel detail fetches — confirm there.
const REFRESH_DETAIL_CONCURRENCY: usize = 16;
41
/// Connection status published by a [`Controller`].
///
/// A freshly constructed controller starts out as
/// [`ConnectionState::Disconnected`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ConnectionState {
    /// No connection is established.
    Disconnected,
    /// A connection attempt is in progress.
    Connecting,
    /// The controller is connected.
    Connected,
    /// The controller is trying to re-establish a connection.
    Reconnecting {
        /// Counter identifying the current reconnection attempt.
        // NOTE(review): whether this starts at 0 or 1 is decided by the code
        // that emits the state — confirm before relying on it.
        attempt: u32,
    },
    /// The connection is considered failed.
    // NOTE(review): presumably a terminal state after retries are exhausted —
    // confirm against the lifecycle code.
    Failed,
}
53
54#[derive(Clone)]
62pub struct Controller {
63 inner: Arc<ControllerInner>,
64}
65
66struct ControllerInner {
67 config: ControllerConfig,
68 store: Arc<DataStore>,
69 connection_state: watch::Sender<ConnectionState>,
70 event_tx: broadcast::Sender<Arc<Event>>,
71 command_tx: Mutex<mpsc::Sender<CommandEnvelope>>,
72 command_rx: Mutex<Option<mpsc::Receiver<CommandEnvelope>>>,
73 cancel: CancellationToken,
74 cancel_child: Mutex<CancellationToken>,
77 session_client: Mutex<Option<Arc<SessionClient>>>,
78 integration_client: Mutex<Option<Arc<IntegrationClient>>>,
79 site_id: Mutex<Option<uuid::Uuid>>,
81 ws_handle: Mutex<Option<WebSocketHandle>>,
83 task_handles: Mutex<Vec<JoinHandle<()>>>,
84 warnings: Mutex<Vec<String>>,
86}
87
88impl Controller {
89 pub fn new(config: ControllerConfig) -> Self {
92 let store = Arc::new(DataStore::new());
93 let (connection_state, _) = watch::channel(ConnectionState::Disconnected);
94 let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_SIZE);
95 let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
96 let cancel = CancellationToken::new();
97 let cancel_child = cancel.child_token();
98
99 Self {
100 inner: Arc::new(ControllerInner {
101 config,
102 store,
103 connection_state,
104 event_tx,
105 command_tx: Mutex::new(command_tx),
106 command_rx: Mutex::new(Some(command_rx)),
107 cancel,
108 cancel_child: Mutex::new(cancel_child),
109 session_client: Mutex::new(None),
110 integration_client: Mutex::new(None),
111 warnings: Mutex::new(Vec::new()),
112 site_id: Mutex::new(None),
113 ws_handle: Mutex::new(None),
114 task_handles: Mutex::new(Vec::new()),
115 }),
116 }
117 }
118
119 pub fn config(&self) -> &ControllerConfig {
121 &self.inner.config
122 }
123
124 pub fn store(&self) -> &Arc<DataStore> {
126 &self.inner.store
127 }
128
129 pub async fn execute(&self, cmd: Command) -> Result<CommandResult, CoreError> {
136 let (tx, rx) = tokio::sync::oneshot::channel();
137
138 let command_tx = self.inner.command_tx.lock().await.clone();
139
140 command_tx
141 .send(CommandEnvelope {
142 command: cmd,
143 response_tx: tx,
144 })
145 .await
146 .map_err(|_| CoreError::ControllerDisconnected)?;
147
148 rx.await.map_err(|_| CoreError::ControllerDisconnected)?
149 }
150
151 pub async fn oneshot<F, Fut, T>(config: ControllerConfig, f: F) -> Result<T, CoreError>
158 where
159 F: FnOnce(Controller) -> Fut,
160 Fut: std::future::Future<Output = Result<T, CoreError>>,
161 {
162 let mut cfg = config;
163 cfg.websocket_enabled = false;
164 cfg.refresh_interval_secs = 0;
165
166 let controller = Controller::new(cfg);
167 controller.connect().await?;
168 let result = f(controller.clone()).await;
169 controller.disconnect().await;
170 result
171 }
172}