use crate::{
UniResponse,
app::Context,
domain::{Aggregate, CommandEnum, EventEnum},
};
use rkyv::{
Archive, Deserialize,
de::Pool,
rancor::{Error, Strategy},
};
use tokio::sync::{mpsc::error::SendError, oneshot};
use tracing::{error, info, instrument};
use uuid::Uuid;
/// Entry point for submitting commands of type `C` to aggregates of type `A`.
///
/// Implementors provide the transport (`send`) and the aggregate type label
/// (`agg_type`). The provided methods `create` and `change` build on those to
/// submit a command and wait for the matching `UniResponse`.
pub trait Sender<A, C, E>: Sized + 'static
where
    A: Aggregate,
    C: CommandEnum<A = A, E = E>,
    <C as Archive>::Archived: Deserialize<C, Strategy<Pool, Error>>,
    E: EventEnum<A = A>,
    <E as Archive>::Archived: Deserialize<E, Strategy<Pool, Error>>,
{
    /// Asynchronously constructs the sender from the application context.
    ///
    /// The future resolves to `Err` with a textual description when
    /// construction fails.
    #[doc(hidden)]
    fn new(ctx: &'static Context) -> impl Future<Output = Result<Self, String>>;

    /// Returns the aggregate type label; `change` records it as the
    /// `agg_type` field of its tracing span.
    fn agg_type(&self) -> &'static str;

    /// Hands a unit of work to the underlying transport.
    ///
    /// On failure the rejected `Todo` is returned inside the `SendError`.
    // NOTE(review): the `SendError` type suggests a tokio mpsc channel behind
    // this call — confirm against the implementors.
    fn send(&self, todo: Todo<A, C, E>) -> Result<(), SendError<Todo<A, C, E>>>;

    /// Submits `com` against a brand-new aggregate.
    ///
    /// A fresh v4 UUID is generated as the aggregate ID and the call is then
    /// delegated to `change`, so the same panic and timeout behavior applies.
    #[inline]
    fn create(&self, com_id: Uuid, com: C) -> impl Future<Output = UniResponse> {
        async move { self.change(Uuid::new_v4(), com_id, com).await }
    }

    /// Submits `com` against the aggregate `agg_id` and waits for its response.
    ///
    /// The command is wrapped in a `Todo::Reply` carrying the sending half of
    /// a one-shot channel, passed to `send`, and the receiving half is awaited.
    /// If awaiting the receiver fails (for a tokio one-shot channel: the
    /// sending half was dropped without a value), `UniResponse::Timeout` is
    /// returned.
    ///
    /// The whole future runs inside a `send_command` tracing span carrying
    /// `agg_type`, `agg_id` and `com_id`.
    ///
    /// # Panics
    ///
    /// Panics if `send` returns an error.
    #[instrument(name = "send_command", skip_all, fields(agg_type = self.agg_type(), %agg_id, %com_id))]
    fn change(&self, agg_id: Uuid, com_id: Uuid, com: C) -> impl Future<Output = UniResponse> {
        async move {
            let (res_tx, res_rx) = oneshot::channel::<UniResponse>();
            let request = Todo::Reply {
                agg_id,
                com_id,
                com,
                res_tx,
            };

            // The send result is fully consumed here, before the await below.
            match self.send(request) {
                Ok(()) => info!("发送聚合命令"),
                Err(e) => {
                    error!("聚合命令请求反馈错误:{e}");
                    panic!("响应处理器停止工作");
                }
            }

            let reply = res_rx.await;
            match reply {
                Err(e) => {
                    error!("聚合命令接收反馈错误:{e}");
                    UniResponse::Timeout
                }
                Ok(res) => {
                    info!("聚合命令收到反馈:{res}");
                    res
                }
            }
        }
    }
}
/// A unit of work accepted by `Sender::send`.
///
/// `A` and `E` do not appear in any field; they are tied to `C` through the
/// `CommandEnum<A = A, E = E>` bound in the `where` clause.
pub enum Todo<A, C, E>
where
A: Aggregate,
C: CommandEnum<A = A, E = E>,
<C as Archive>::Archived: Deserialize<C, Strategy<Pool, Error>>,
E: EventEnum<A = A>,
<E as Archive>::Archived: Deserialize<E, Strategy<Pool, Error>>,
{
/// A command together with the one-shot sender on which its `UniResponse`
/// is to be delivered. `Sender::change` builds this variant and then awaits
/// the paired receiver.
Reply {
/// ID of the aggregate the command targets.
agg_id: Uuid,
/// ID of the command.
com_id: Uuid,
/// The command itself.
com: C,
/// Sending half of the one-shot channel the caller is waiting on.
res_tx: oneshot::Sender<UniResponse>,
},
/// A `UniResponse` paired with a command ID.
// NOTE(review): this variant is not constructed in the visible part of the
// file; presumably it reports the outcome of command `com_id` so it can be
// routed to the waiting `Reply` sender — confirm against the consumer.
Response {
/// ID of the command the response belongs to.
com_id: Uuid,
/// The response payload.
res: UniResponse,
},
}
/// Builds a `Router` with one `POST` route per command handler of an aggregate.
///
/// # Arguments
///
/// * `$agg` – aggregate identifier; used verbatim as the first path segment.
/// * `$format` – type passed as the single generic argument of every handler
///   (each handler is referenced as `handler::<$format>`).
/// * first list – "create" handlers, each mounted at
///   `/<agg>/<handler>/{com_id}`.
/// * second list – "change" handlers, each mounted at
///   `/<agg>/<handler>/{agg_id}/{com_id}`.
///
/// Either list may be empty, and both lists (as well as the argument list
/// itself) accept an optional trailing comma, so multi-line handler lists can
/// be formatted naturally.
///
/// # Call-site requirements
///
/// `Router`, `post` and every handler name are resolved where the macro is
/// invoked, so they must be in scope there.
// NOTE(review): `Router::new().route(path, post(handler))` and the `{param}`
// path syntax look like axum — confirm which items callers are expected to import.
///
/// # Example
///
/// ```ignore
/// let router = route_builder!(account, Json, [open], [deposit, withdraw,]);
/// ```
#[macro_export]
macro_rules! route_builder {
    ($agg:ident, $format:ty, [$($cr:ident),* $(,)?], [$($ch:ident),* $(,)?] $(,)?) => {{
        // With two empty lists the router is never reassigned; keep the
        // expansion warning-free at the call site.
        #[allow(unused_mut)]
        let mut router = Router::new();
        $(
            router = router.route(
                concat!("/", stringify!($agg), "/", stringify!($cr), "/{com_id}"),
                post($cr::<$format>),
            );
        )*
        $(
            router = router.route(
                concat!("/", stringify!($agg), "/", stringify!($ch), "/{agg_id}/{com_id}"),
                post($ch::<$format>),
            );
        )*
        router
    }};
}
/// Test helper: builds the request path `{path}/{op}/{com_id}`.
///
/// The three parts are joined with `/`; `com_id` is rendered through its
/// `Display` implementation. The result has the same segment layout as the
/// create routes generated by `route_builder!`.
// NOTE(review): `path` is presumably the base URL plus the aggregate segment —
// confirm against the test callers.
#[doc(hidden)]
#[cfg(any(test, feature = "test-utils"))]
pub fn create(path: &str, op: &str, com_id: uuid::Uuid) -> String {
    let com_id = com_id.to_string();
    [path, op, com_id.as_str()].join("/")
}
/// Test helper: builds the request path `{path}/{op}/{agg_id}/{com_id}`.
///
/// The four parts are joined with `/`; both IDs are rendered through their
/// `Display` implementation. The result has the same segment layout as the
/// change routes generated by `route_builder!`.
// NOTE(review): `path` is presumably the base URL plus the aggregate segment —
// confirm against the test callers.
#[doc(hidden)]
#[cfg(any(test, feature = "test-utils"))]
pub fn change(path: &str, op: &str, agg_id: uuid::Uuid, com_id: uuid::Uuid) -> String {
    let (agg_id, com_id) = (agg_id.to_string(), com_id.to_string());
    [path, op, agg_id.as_str(), com_id.as_str()].join("/")
}