use std::sync::Arc;
use serde::{Serialize, de::DeserializeOwned};
use serde_json::Value;
use uuid::Uuid;
#[cfg(not(target_arch = "wasm32"))]
use crate::server::CellServerCtx;
use crate::{
command::CommandError,
common::to_value::ToValue,
entities::client::ClientId,
event::EventOptions,
item::Eventable,
query::QueryParams,
request::RequestContext,
wire::{MEvent, MEventType},
};
#[derive(Clone)]
pub struct CommandContext {
pub req: Arc<RequestContext>,
pub command_id: Arc<str>,
#[cfg(not(target_arch = "wasm32"))]
server_ctx: Arc<CellServerCtx>,
}
impl CommandContext {
#[cfg(not(target_arch = "wasm32"))]
pub fn new(
command_id: Arc<str>,
req: Arc<RequestContext>,
server_ctx: Arc<CellServerCtx>,
) -> Self {
Self {
req,
command_id,
server_ctx,
}
}
pub fn tx(&self) -> Arc<str> {
self.req.tx.clone()
}
pub fn client_id(&self) -> Option<ClientId> {
self.req.client_id.clone()
}
pub fn host_id(&self) -> Uuid {
self.req.host_id
}
pub fn lineage(&self) -> &[Arc<str>] {
&self.req.lineage
}
pub fn created_at(&self) -> &str {
&self.req.created_at
}
pub fn emit_set<T>(&self, item: impl std::ops::Deref<Target = T>) -> Result<(), CommandError>
where
T: Eventable + Serialize + Clone + 'static,
{
self.emit_set_with_options(item, EventOptions::default())
}
pub fn emit_set_with_options<T>(
&self,
item: impl std::ops::Deref<Target = T>,
options: EventOptions,
) -> Result<(), CommandError>
where
T: Eventable + Serialize + Clone + 'static,
{
#[cfg(not(target_arch = "wasm32"))]
{
self.server_ctx
.set_with_options(&*item, Some(options))
.map_err(|e| CommandError {
tx: self.req.tx.to_string(),
command_id: self.command_id.to_string(),
message: e.to_string(),
})
}
#[cfg(target_arch = "wasm32")]
{
let _ = (item, options);
unreachable!();
}
}
pub fn emit_set_batch<T: Eventable + Serialize + Clone + 'static>(
&self,
items: &[T],
) -> Result<(), CommandError> {
#[cfg(not(target_arch = "wasm32"))]
{
if items.is_empty() {
return Ok(());
}
let source_id = Some(self.req.host_id.to_string());
let mut events = Vec::with_capacity(items.len());
for item in items {
let item_json = serde_json::to_value(item).map_err(|err| CommandError {
tx: self.req.tx.to_string(),
command_id: self.command_id.to_string(),
message: format!("Failed to serialize item for batch set: {}", err),
})?;
events.push(MEvent {
item: item_json,
change_type: MEventType::SET,
item_type: item.entity_type().to_string(),
created_at: self.req.created_at.to_string(),
tx: self.req.tx.to_string(),
source_id: source_id.clone(),
options: None,
});
}
self.server_ctx
.apply_event_batch(events)
.map_err(|e| CommandError {
tx: self.req.tx.to_string(),
command_id: self.command_id.to_string(),
message: e.to_string(),
})?;
Ok(())
}
#[cfg(target_arch = "wasm32")]
{
let _ = items;
unreachable!();
}
}
pub fn emit_set_any_batch<I>(&self, items: I) -> Result<(), CommandError>
where
I: IntoIterator<Item = Arc<dyn crate::item::AnyItem>>,
{
#[cfg(not(target_arch = "wasm32"))]
{
let items: Vec<_> = items.into_iter().collect();
if items.is_empty() {
return Ok(());
}
let source_id = Some(self.req.host_id.to_string());
let mut events = Vec::with_capacity(items.len());
for item in items {
events.push(MEvent {
item: item.to_value(),
change_type: MEventType::SET,
item_type: item.entity_type().to_string(),
created_at: self.req.created_at.to_string(),
tx: self.req.tx.to_string(),
source_id: source_id.clone(),
options: None,
});
}
self.server_ctx
.apply_event_batch(events)
.map_err(|e| CommandError {
tx: self.req.tx.to_string(),
command_id: self.command_id.to_string(),
message: e.to_string(),
})?;
Ok(())
}
#[cfg(target_arch = "wasm32")]
{
let _ = items;
unreachable!();
}
}
pub fn emit_del<T>(&self, item: impl std::ops::Deref<Target = T>) -> Result<(), CommandError>
where
T: Eventable + Serialize + Clone + 'static,
{
self.emit_del_with_options(item, EventOptions::default())
}
pub fn emit_del_batch<'a, T, I>(&self, items: I) -> Result<(), CommandError>
where
T: Eventable + Serialize + Clone + 'static,
I: IntoIterator<Item = &'a T>,
T: 'a,
{
#[cfg(not(target_arch = "wasm32"))]
{
let items: Vec<&T> = items.into_iter().collect();
if items.is_empty() {
return Ok(());
}
let source_id = Some(self.req.host_id.to_string());
let mut events = Vec::with_capacity(items.len());
for item in items {
let item_json = serde_json::to_value(item).map_err(|err| CommandError {
tx: self.req.tx.to_string(),
command_id: self.command_id.to_string(),
message: format!("Failed to serialize item for batch del: {}", err),
})?;
events.push(MEvent {
item: item_json,
change_type: MEventType::DEL,
item_type: item.entity_type().to_string(),
created_at: self.req.created_at.to_string(),
tx: self.req.tx.to_string(),
source_id: source_id.clone(),
options: None,
});
}
self.server_ctx
.apply_event_batch(events)
.map_err(|e| CommandError {
tx: self.req.tx.to_string(),
command_id: self.command_id.to_string(),
message: e.to_string(),
})?;
Ok(())
}
#[cfg(target_arch = "wasm32")]
{
let _ = items;
unreachable!();
}
}
pub fn emit_del_with_options<T>(
&self,
item: impl std::ops::Deref<Target = T>,
options: EventOptions,
) -> Result<(), CommandError>
where
T: Eventable + Serialize + Clone + 'static,
{
#[cfg(not(target_arch = "wasm32"))]
{
self.server_ctx
.del_with_options(&*item, Some(options))
.map_err(|e| CommandError {
tx: self.req.tx.to_string(),
command_id: self.command_id.to_string(),
message: e.to_string(),
})
}
#[cfg(target_arch = "wasm32")]
{
let _ = (item, options);
unreachable!();
}
}
pub fn emit_event_batch(&self, events: Vec<MEvent>) -> Result<usize, CommandError> {
#[cfg(not(target_arch = "wasm32"))]
{
self.server_ctx
.apply_event_batch(events)
.map_err(|e| CommandError {
tx: self.req.tx.to_string(),
command_id: self.command_id.to_string(),
message: e.to_string(),
})
}
#[cfg(target_arch = "wasm32")]
{
let _ = events;
unreachable!();
}
}
pub fn exec_query_first<Q>(&self, query: Q) -> Result<Option<Arc<Q::Item>>, CommandError>
where
Q: QueryParams,
Q::Item: DeserializeOwned + std::fmt::Debug + Send + Sync + Clone + 'static,
{
#[cfg(not(target_arch = "wasm32"))]
{
Ok(self
.server_ctx
.query_snapshot(query, self.req.clone())
.into_iter()
.next())
}
#[cfg(target_arch = "wasm32")]
{
let _ = query;
unreachable!();
}
}
pub fn exec_query<Q>(&self, query: Q) -> Result<Vec<Arc<Q::Item>>, CommandError>
where
Q: QueryParams,
Q::Item: DeserializeOwned + std::fmt::Debug + Send + Sync + Clone + 'static,
{
#[cfg(not(target_arch = "wasm32"))]
{
Ok(self
.server_ctx
.query_snapshot(query, self.req.clone())
.into_iter()
.collect())
}
#[cfg(target_arch = "wasm32")]
{
let _ = query;
unreachable!();
}
}
pub fn execute_command<C: CommandHandler>(&self, cmd: C) -> Result<C::Result, CommandError> {
cmd.execute(self.clone())
}
pub fn exec_report<R>(
&self,
report: R,
) -> Result<<R as crate::report::ReportHandler>::Output, CommandError>
where
R: crate::report::ReportParams + Clone,
{
#[cfg(not(target_arch = "wasm32"))]
{
use hyphae::Gettable;
Ok(self
.server_ctx
.report(report, self.req.clone())
.get()
.as_ref()
.clone())
}
#[cfg(target_arch = "wasm32")]
{
let _ = report;
unreachable!();
}
}
}
pub trait CommandHandler: crate::command::CommandParams {
fn execute(self, ctx: CommandContext) -> Result<Self::Result, CommandError>;
}
pub trait DynCommandExecutor: Send + Sync + 'static {
fn command_id(&self) -> &'static str;
fn execute_from_value(
&self,
command: Value,
ctx: CommandContext,
) -> Result<Value, CommandError>;
}
pub struct CommandExecutorAdapter<C: CommandHandler> {
_phantom: std::marker::PhantomData<C>,
}
impl<C: CommandHandler> CommandExecutorAdapter<C> {
pub fn new() -> Self {
Self {
_phantom: std::marker::PhantomData,
}
}
}
impl<C: CommandHandler> Default for CommandExecutorAdapter<C> {
fn default() -> Self {
Self::new()
}
}
impl<C: CommandHandler> DynCommandExecutor for CommandExecutorAdapter<C> {
fn command_id(&self) -> &'static str {
C::command_id_static()
}
fn execute_from_value(
&self,
command: Value,
ctx: CommandContext,
) -> Result<Value, CommandError> {
let cmd: C = serde_json::from_value(command).map_err(|e| CommandError {
tx: ctx.tx().to_string(),
command_id: C::command_id_static().to_string(),
message: format!("Failed to deserialize command: {}", e),
})?;
let result = cmd.execute(ctx)?;
serde_json::to_value(result).map_err(|e| CommandError {
tx: String::new(),
command_id: C::command_id_static().to_string(),
message: format!("Failed to serialize result: {}", e),
})
}
}
pub type CommandExecutorFactory = fn() -> Box<dyn DynCommandExecutor>;
pub struct CommandHandlerRegistration {
pub command_id: &'static str,
pub factory: CommandExecutorFactory,
}
inventory::collect!(CommandHandlerRegistration);
#[macro_export]
macro_rules! register_command_handler {
($cmd:ty) => {
$crate::inventory::submit! {
$crate::command::CommandHandlerRegistration {
command_id: <$cmd as $crate::command::CommandIdStatic>::COMMAND_ID,
factory: || Box::new($crate::command::CommandExecutorAdapter::<$cmd>::new()),
}
}
};
}