use std::sync::Arc;
use crate::command::{Command, CommandEnvelope, CommandResult};
use crate::config::ControllerConfig;
use crate::core_error::CoreError;
use crate::model::Event;
use crate::store::DataStore;
use crate::websocket::WebSocketHandle;
use crate::{IntegrationClient, LegacyClient};
use tokio::sync::{Mutex, broadcast, mpsc, watch};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
mod commands;
mod legacy_queries;
mod lifecycle;
mod payloads;
mod query;
mod refresh;
mod runtime;
mod subscriptions;
mod support;
use self::support::{
client_mac, device_mac, integration_client_context, integration_site_context,
require_integration, require_legacy, require_uuid,
};
/// Capacity of the bounded `mpsc` channel that carries `CommandEnvelope`s
/// (passed to `mpsc::channel` in `Controller::new`).
const COMMAND_CHANNEL_SIZE: usize = 64;
/// Capacity of the `broadcast` channel that carries `Arc<Event>` values
/// (passed to `broadcast::channel` in `Controller::new`).
const EVENT_CHANNEL_SIZE: usize = 256;
/// NOTE(review): not referenced in this file's visible code; presumably the
/// concurrency limit for detail fetches in the `refresh` submodule — confirm.
const REFRESH_DETAIL_CONCURRENCY: usize = 16;
/// Connection status value published through the controller's
/// `watch::Sender<ConnectionState>`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionState {
/// Value the watch channel is seeded with in `Controller::new`.
Disconnected,
/// NOTE(review): presumably set while the initial connect is in progress — confirm.
Connecting,
/// NOTE(review): presumably set once connect has succeeded — confirm.
Connected,
/// Carries an attempt counter.
/// NOTE(review): presumably the retry number of the current reconnect;
/// confirm whether it is 0- or 1-based against the code that sets it.
Reconnecting { attempt: u32 },
/// NOTE(review): presumably a terminal failure state — confirm.
Failed,
}
/// Handle to the controller state.
///
/// The only field is an `Arc<ControllerInner>`, so cloning a `Controller`
/// produces another handle to the same underlying state rather than a copy.
#[derive(Clone)]
pub struct Controller {
// Shared state; every clone of the handle points at this same allocation.
inner: Arc<ControllerInner>,
}
/// State shared by every clone of a `Controller` (held behind an `Arc`).
struct ControllerInner {
// Configuration supplied to `Controller::new`; exposed via `Controller::config`.
config: ControllerConfig,
// Shared data store; exposed via `Controller::store`.
store: Arc<DataStore>,
// Sending half of the connection-state watch channel; seeded with
// `ConnectionState::Disconnected` in `Controller::new`.
connection_state: watch::Sender<ConnectionState>,
// Sending half of the event broadcast channel (capacity `EVENT_CHANNEL_SIZE`).
event_tx: broadcast::Sender<Arc<Event>>,
// Sender for command envelopes; `Controller::execute` clones it per call.
// NOTE(review): the `Mutex` presumably allows the sender to be swapped
// (e.g. on reconnect) — confirm in the lifecycle code.
command_tx: Mutex<mpsc::Sender<CommandEnvelope>>,
// Receiving half of the command channel, stored as `Some` at construction.
// NOTE(review): presumably `take()`n by the task that processes commands — confirm.
command_rx: Mutex<Option<mpsc::Receiver<CommandEnvelope>>>,
// Root cancellation token created in `Controller::new`.
cancel: CancellationToken,
// Child token derived from `cancel` in `Controller::new`.
// NOTE(review): the `Mutex` presumably allows it to be replaced per connection — confirm.
cancel_child: Mutex<CancellationToken>,
// Starts as `None`. NOTE(review): presumably populated during connect — confirm.
legacy_client: Mutex<Option<Arc<LegacyClient>>>,
// Starts as `None`. NOTE(review): presumably populated during connect — confirm.
integration_client: Mutex<Option<Arc<IntegrationClient>>>,
// Starts as `None`. NOTE(review): presumably the resolved site identifier — confirm.
site_id: Mutex<Option<uuid::Uuid>>,
// Starts as `None`. NOTE(review): presumably set when a websocket is opened — confirm.
ws_handle: Mutex<Option<WebSocketHandle>>,
// Starts empty. NOTE(review): presumably holds handles of spawned background tasks — confirm.
task_handles: Mutex<Vec<JoinHandle<()>>>,
// Starts empty. NOTE(review): presumably accumulates non-fatal warning messages — confirm.
warnings: Mutex<Vec<String>>,
}
impl Controller {
pub fn new(config: ControllerConfig) -> Self {
let store = Arc::new(DataStore::new());
let (connection_state, _) = watch::channel(ConnectionState::Disconnected);
let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_SIZE);
let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
let cancel = CancellationToken::new();
let cancel_child = cancel.child_token();
Self {
inner: Arc::new(ControllerInner {
config,
store,
connection_state,
event_tx,
command_tx: Mutex::new(command_tx),
command_rx: Mutex::new(Some(command_rx)),
cancel,
cancel_child: Mutex::new(cancel_child),
legacy_client: Mutex::new(None),
integration_client: Mutex::new(None),
warnings: Mutex::new(Vec::new()),
site_id: Mutex::new(None),
ws_handle: Mutex::new(None),
task_handles: Mutex::new(Vec::new()),
}),
}
}
pub fn config(&self) -> &ControllerConfig {
&self.inner.config
}
pub fn store(&self) -> &Arc<DataStore> {
&self.inner.store
}
pub async fn execute(&self, cmd: Command) -> Result<CommandResult, CoreError> {
let (tx, rx) = tokio::sync::oneshot::channel();
let command_tx = self.inner.command_tx.lock().await.clone();
command_tx
.send(CommandEnvelope {
command: cmd,
response_tx: tx,
})
.await
.map_err(|_| CoreError::ControllerDisconnected)?;
rx.await.map_err(|_| CoreError::ControllerDisconnected)?
}
pub async fn oneshot<F, Fut, T>(config: ControllerConfig, f: F) -> Result<T, CoreError>
where
F: FnOnce(Controller) -> Fut,
Fut: std::future::Future<Output = Result<T, CoreError>>,
{
let mut cfg = config;
cfg.websocket_enabled = false;
cfg.refresh_interval_secs = 0;
let controller = Controller::new(cfg);
controller.connect().await?;
let result = f(controller.clone()).await;
controller.disconnect().await;
result
}
}