use axum::{extract::State, Json};
use freshblu_core::{
error::FreshBluError,
forwarder::ForwarderEvent,
message::{DeviceEvent, Message, SendMessageParams},
permissions::PermissionChecker,
subscription::SubscriptionType,
};
use std::sync::Arc;
use uuid::Uuid;
use super::AuthenticatedDevice;
use crate::metrics::MESSAGES_SENT;
use crate::{ApiError, AppState};
type ApiResult<T> = Result<Json<T>, ApiError>;
pub async fn send_message(
State(state): State<AppState>,
AuthenticatedDevice(actor, as_uuid): AuthenticatedDevice,
Json(params): Json<SendMessageParams>,
) -> ApiResult<serde_json::Value> {
{
let payload_size = params
.payload
.as_ref()
.and_then(|p| serde_json::to_string(p).ok())
.map(|s| s.len())
.unwrap_or(0);
let extra_size = if params.extra.is_empty() {
0
} else {
serde_json::to_string(¶ms.extra)
.map(|s| s.len())
.unwrap_or(0)
};
if payload_size + extra_size > state.config.max_message_size {
return Err(FreshBluError::MessageTooLarge.into());
}
}
let sender_uuid = as_uuid.unwrap_or(actor.uuid);
if let Some(ref as_u) = as_uuid {
let target = state
.store
.get_device(as_u)
.await?
.ok_or(FreshBluError::NotFound)
.map_err(ApiError::from)?;
let checker = PermissionChecker::new(&target.meshblu.whitelists, &actor.uuid, as_u);
if !checker.can_message_as() {
return Err(FreshBluError::Forbidden.into());
}
if params.devices.iter().any(|d| d == "*") && !checker.can_broadcast_as() {
return Err(FreshBluError::Forbidden.into());
}
}
let is_broadcast = params.is_broadcast();
let message = Arc::new(Message {
devices: params.devices.clone(),
from_uuid: Some(sender_uuid),
topic: params.topic.clone(),
payload: params.payload.clone(),
metadata: None,
extra: params.extra.clone(),
});
for device_id in ¶ms.devices {
if device_id == "*" {
continue; }
let target_uuid = match Uuid::parse_str(device_id) {
Ok(u) => u,
Err(_) => continue,
};
let target_device = match state.store.get_device(&target_uuid).await {
Ok(Some(d)) => d,
_ => continue,
};
let checker = PermissionChecker::new(
&target_device.meshblu.whitelists,
&sender_uuid,
&target_uuid,
);
if !checker.can_message_from() {
continue; }
let msg_event = DeviceEvent::Message((*message).clone());
let _ = state.bus.publish(&target_uuid, msg_event.clone()).await;
let received_subs = state
.store
.get_subscribers(&target_uuid, &SubscriptionType::MessageReceived)
.await
.unwrap_or_default();
if !received_subs.is_empty() {
let _ = state.bus.publish_many(&received_subs, msg_event.clone()).await;
}
let sent_subs = state
.store
.get_subscribers(&sender_uuid, &SubscriptionType::MessageSent)
.await
.unwrap_or_default();
if !sent_subs.is_empty() {
let _ = state.bus.publish_many(&sent_subs, msg_event.clone()).await;
}
}
if is_broadcast {
let broadcast_subs = state
.store
.get_subscribers(&sender_uuid, &SubscriptionType::BroadcastSent)
.await
.unwrap_or_default();
let broadcast_event = DeviceEvent::Broadcast((*message).clone());
if !broadcast_subs.is_empty() {
let _ = state
.bus
.publish_many(&broadcast_subs, broadcast_event.clone())
.await;
}
for sub_uuid in &broadcast_subs {
let br_subs = state
.store
.get_subscribers(sub_uuid, &SubscriptionType::BroadcastReceived)
.await
.unwrap_or_default();
if !br_subs.is_empty() {
let _ = state
.bus
.publish_many(&br_subs, broadcast_event.clone())
.await;
}
}
}
if let Ok(Some(sender_device)) = state.store.get_device(&sender_uuid).await {
let payload = serde_json::to_value(&*message).unwrap_or_default();
let executor = state.webhook_executor.clone();
let dev = sender_device.clone();
tokio::spawn(async move {
executor
.execute(&dev, ForwarderEvent::MessageSent, &payload, &[])
.await;
});
}
MESSAGES_SENT.inc();
Ok(Json(serde_json::json!({ "sent": true })))
}
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BroadcastParams {
#[serde(default)]
pub devices: Option<Vec<String>>,
pub topic: Option<String>,
pub payload: Option<serde_json::Value>,
#[serde(flatten)]
pub extra: std::collections::HashMap<String, serde_json::Value>,
}
pub async fn broadcast(
State(state): State<AppState>,
AuthenticatedDevice(actor, as_uuid): AuthenticatedDevice,
Json(params): Json<BroadcastParams>,
) -> ApiResult<serde_json::Value> {
let msg_params = SendMessageParams {
devices: vec!["*".to_string()],
topic: params.topic,
payload: params.payload,
extra: params.extra,
};
send_message(
State(state),
AuthenticatedDevice(actor, as_uuid),
Json(msg_params),
)
.await
}