use std::sync::Arc;
use serde_json::{Map, Value as Json};
use uuid::Uuid;
use super::core::Core;
use crate::core::{Device, DeviceConfig, DeviceError, DeviceId, TransitionInput};
/// Boxed, thread-safe error type returned by [`App::run`] and by the callbacks
/// passed to `ServerHandle::observe` and `ServerHandle::observe_loop`.
pub type AppError = Box<dyn std::error::Error + Send + Sync>;
/// An application that runs against a [`ServerHandle`].
///
/// Implementors must be `Send + Sync + 'static`. The async method is provided
/// through the `async_trait` attribute macro.
#[async_trait::async_trait]
pub trait App: Send + Sync + 'static {
/// Runs the application.
///
/// `server` is the handle through which devices can be queried and observed.
/// The receiver is taken as `Arc<Self>`.
///
/// NOTE(review): when and how often the host invokes this is not visible in
/// this file — confirm against the caller.
async fn run(self: Arc<Self>, server: ServerHandle) -> Result<(), AppError>;
}
/// A component that runs with a [`ScoutCtx`], which exposes
/// [`ScoutCtx::discover`] for registering devices with the core.
///
/// Implementors must be `Send + Sync + 'static`. The async method is provided
/// through the `async_trait` attribute macro.
#[async_trait::async_trait]
pub trait Scout: Send + Sync + 'static {
/// Runs the scout with the given context.
///
/// The receiver is taken as `Arc<Self>`; failures are reported as
/// [`DeviceError`].
async fn run(self: Arc<Self>, ctx: ScoutCtx) -> Result<(), DeviceError>;
}
/// Context handed to a [`Scout`].
///
/// Cloning is cheap: the only field is an `Arc` to the shared core.
#[derive(Clone)]
pub struct ScoutCtx {
// Shared core that discovered devices are registered with.
core: Arc<Core>,
}
impl ScoutCtx {
#[doc(hidden)]
pub fn new_internal(core: Arc<Core>) -> Self {
Self { core }
}
pub fn name(&self) -> &str {
&self.core.name
}
pub async fn discover<D: Device + 'static>(&self, device: D) -> DeviceId {
let id = Uuid::new_v4();
let mut cfg = DeviceConfig::default();
device.config(&mut cfg);
self.core.register_device(id, cfg, Box::new(device)).await;
id
}
pub fn server(&self) -> ServerHandle {
ServerHandle::new_internal(self.core.clone())
}
}
/// Handle for querying and observing the devices known to the core.
///
/// Cloning is cheap: the only field is an `Arc` to the shared core.
#[derive(Clone)]
pub struct ServerHandle {
// Shared core; crate-visible so sibling modules can reach it directly.
pub(crate) core: Arc<Core>,
}
impl ServerHandle {
    /// Builds a handle around a shared [`Core`].
    ///
    /// Marked `#[doc(hidden)]`, so it does not appear in the rendered docs.
    #[doc(hidden)]
    pub fn new_internal(core: Arc<Core>) -> Self {
        Self { core }
    }

    /// Returns the `name` stored on the underlying core.
    pub fn name(&self) -> &str {
        &self.core.name
    }

    /// Returns a proxy for every device listed by the core that matches the
    /// CaQL expression `ql`, in listing order.
    ///
    /// An unparsable `ql` is logged at `warn` level and yields an empty vector.
    /// A device whose match evaluation returns an error is treated as
    /// non-matching.
    pub async fn query(&self, ql: &str) -> Vec<DeviceProxy> {
        let q = match crate::caql::parse(ql) {
            Ok(q) => q,
            Err(e) => {
                tracing::warn!(%ql, error = %e, "invalid CaQL in app query");
                return Vec::new();
            }
        };
        self.device_targets()
            .await
            .into_iter()
            .filter(|(_, target)| crate::caql::matches(&q, target).unwrap_or(false))
            .map(|(id, _)| self.proxy(id))
            .collect()
    }

    /// Returns a proxy for the device with the given `id`, or `None` if the
    /// core has no such device at the time of the call.
    pub async fn device(&self, id: DeviceId) -> Option<DeviceProxy> {
        self.core.get_device(&id).await.map(|_| self.proxy(id))
    }

    /// Waits until every query in `queries` matches a device, then invokes
    /// `callback` once and returns its result.
    ///
    /// The callback receives one proxy per query, in query order; each is the
    /// first matching device in the core's listing order, so the same device
    /// may appear for more than one query. The queries are re-evaluated after
    /// every notification on the core's `device_changes` channel; a lagged
    /// receiver is treated like a notification.
    ///
    /// # Errors
    ///
    /// Returns an error if any query fails to parse, or whatever error the
    /// callback returns. If the change channel is closed before all queries
    /// match, returns `Ok(())` without invoking the callback.
    pub async fn observe<F, Fut>(&self, queries: Vec<&str>, callback: F) -> Result<(), AppError>
    where
        F: FnOnce(Vec<DeviceProxy>) -> Fut + Send,
        Fut: std::future::Future<Output = Result<(), AppError>> + Send,
    {
        let parsed = Self::parse_queries(&queries)?;
        // Subscribe before the first listing so that a change occurring between
        // the listing and the `recv` below is not missed.
        let mut rx = self.core.device_changes.subscribe();
        loop {
            // Fetch the targets in their own statement so that no borrow of
            // `parsed` is held across the await.
            let resolved = {
                let targets = self.device_targets().await;
                self.resolve(&parsed, &targets)
            };
            if let Some(proxies) = resolved {
                return callback(proxies).await;
            }
            match rx.recv().await {
                Ok(()) | Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
                Err(tokio::sync::broadcast::error::RecvError::Closed) => return Ok(()),
            }
        }
    }

    /// Like [`ServerHandle::observe`], but keeps running and invokes `callback`
    /// every time the resolved set of devices changes.
    ///
    /// The callback fires when all queries match and the resulting list of
    /// device ids differs from the one passed to the previous invocation. When
    /// the queries stop being fully satisfied the remembered list is cleared,
    /// so the next full match fires the callback again even for the same
    /// devices.
    ///
    /// # Errors
    ///
    /// Returns an error if any query fails to parse, or the first error
    /// returned by the callback. Returns `Ok(())` when the change channel is
    /// closed.
    pub async fn observe_loop<F, Fut>(
        &self,
        queries: Vec<&str>,
        mut callback: F,
    ) -> Result<(), AppError>
    where
        F: FnMut(Vec<DeviceProxy>) -> Fut + Send,
        Fut: std::future::Future<Output = Result<(), AppError>> + Send,
    {
        let parsed = Self::parse_queries(&queries)?;
        // Subscribe before the first listing so that a change occurring between
        // the listing and the `recv` below is not missed.
        let mut rx = self.core.device_changes.subscribe();
        let mut prev: Option<Vec<DeviceId>> = None;
        loop {
            // Fetch the targets in their own statement so that no borrow of
            // `parsed` is held across the await.
            let resolved = {
                let targets = self.device_targets().await;
                self.resolve(&parsed, &targets)
            };
            match resolved {
                Some(proxies) => {
                    let ids: Vec<DeviceId> = proxies.iter().map(|p| p.id()).collect();
                    if prev.as_ref() != Some(&ids) {
                        callback(proxies).await?;
                        prev = Some(ids);
                    }
                }
                // Forget the last match so that a later re-match is reported.
                None => prev = None,
            }
            match rx.recv().await {
                Ok(()) | Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
                Err(tokio::sync::broadcast::error::RecvError::Closed) => return Ok(()),
            }
        }
    }

    /// Builds a proxy for `id` that shares this handle's core.
    fn proxy(&self, id: DeviceId) -> DeviceProxy {
        DeviceProxy {
            core: Arc::clone(&self.core),
            id,
        }
    }

    /// Parses every CaQL string, failing on the first one that is invalid.
    fn parse_queries(queries: &[&str]) -> Result<Vec<crate::caql::Query>, AppError> {
        queries
            .iter()
            .map(|q| crate::caql::parse(q))
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| -> AppError { Box::new(std::io::Error::other(format!("caql: {e}"))) })
    }

    /// Lists the core's devices as `(id, match target)` pairs, where the target
    /// is the JSON object CaQL queries are evaluated against.
    async fn device_targets(&self) -> Vec<(DeviceId, serde_json::Value)> {
        self.core
            .list_devices()
            .await
            .into_iter()
            .map(|d| {
                let target = serde_json::json!({
                    "id": d.id.to_string(),
                    "type": d.type_,
                    "name": d.name,
                    "state": d.state,
                });
                (d.id, target)
            })
            .collect()
    }

    /// Picks the first matching device for each query, in query order.
    ///
    /// Returns `None` as soon as one query has no match. A match evaluation
    /// that returns an error counts as "no match".
    fn resolve(
        &self,
        queries: &[crate::caql::Query],
        targets: &[(DeviceId, serde_json::Value)],
    ) -> Option<Vec<DeviceProxy>> {
        queries
            .iter()
            .map(|q| {
                targets
                    .iter()
                    .find(|(_, target)| crate::caql::matches(q, target).unwrap_or(false))
                    .map(|(id, _)| self.proxy(*id))
            })
            .collect()
    }
}
/// Handle to a single device, identified by its id.
///
/// The proxy stores no device data itself; its accessors look the device up in
/// the shared core on each call.
#[derive(Clone)]
pub struct DeviceProxy {
// Shared core used for every lookup and transition.
core: Arc<Core>,
// Id of the device this proxy refers to.
id: DeviceId,
}
impl DeviceProxy {
    /// Returns the id of the device this proxy refers to.
    pub fn id(&self) -> DeviceId {
        self.id
    }

    /// Returns the device's current state, or `None` if the core no longer
    /// has the device.
    pub async fn state(&self) -> Option<String> {
        let snapshot = self.core.get_device(&self.id).await?;
        Some(snapshot.state)
    }

    /// Reports whether `transition` is among the entries that the device's
    /// config returns from `allowed_in` for its current state.
    ///
    /// Returns `false` if the core no longer has the device.
    pub async fn available(&self, transition: &str) -> bool {
        match self.core.get_device(&self.id).await {
            None => false,
            Some(snapshot) => snapshot
                .config
                .allowed_in(&snapshot.state)
                .iter()
                .any(|t| t == transition),
        }
    }

    /// Runs `transition` on the device with the given `input`.
    ///
    /// The value produced by the core on success is discarded.
    pub async fn call(&self, transition: &str, input: TransitionInput) -> Result<(), DeviceError> {
        self.core
            .run_transition(&self.id, transition, input)
            .await?;
        Ok(())
    }

    /// Runs `transition` with a default [`TransitionInput`].
    pub async fn call_simple(&self, transition: &str) -> Result<(), DeviceError> {
        let input = TransitionInput::default();
        self.call(transition, input).await
    }

    /// Returns a copy of the property called `name`, or `None` if either the
    /// device or the property does not exist.
    pub async fn property(&self, name: &str) -> Option<Json> {
        self.core
            .get_device(&self.id)
            .await
            .and_then(|snapshot| snapshot.properties.get(name).cloned())
    }

    /// Returns all of the device's properties, or `None` if the core no longer
    /// has the device.
    pub async fn properties(&self) -> Option<Map<String, Json>> {
        self.core
            .get_device(&self.id)
            .await
            .map(|snapshot| snapshot.properties)
    }
}