use proc_macro2::*;
use quote::{quote, ToTokens};
use syn::{AttributeArgs, FnArg, ItemFn, Lit, Meta, NestedMeta};
use crate::helpers::*;
use crate::proc_macro::TokenStream;
pub(crate) fn impl_topic(target_fn: &ItemFn, args: &AttributeArgs) -> TokenStream {
let return_ty = find_return_type(target_fn);
let func_name_ident = target_fn.sig.ident.to_token_stream();
let func_name = func_name_ident.to_string();
let mut _aggregate_ident = "".to_token_stream();
let mut _aggregate_name = String::new();
let mut aggregate_ty_ident = "".to_token_stream();
let mut _aggregate_ty_name = String::new();
let mut event_ident = "".to_token_stream();
let mut _event_name = String::new();
let mut event_ty_ident = "".to_token_stream();
let mut event_ty_name = String::new();
for x in &target_fn.sig.inputs {
match x {
FnArg::Receiver(_) => {}
FnArg::Typed(t) => {
if is_aggregate_ref(t) {
_aggregate_ident = t.pat.to_token_stream();
_aggregate_name = _aggregate_ident.to_string();
aggregate_ty_ident = t.ty.to_token_stream();
_aggregate_ty_name = aggregate_ty_ident.to_string();
}
if is_domain_event_ref(t) {
event_ident = t.pat.to_token_stream();
_event_name = event_ident.to_string();
event_ty_ident = t.ty.to_token_stream();
event_ty_name = event_ty_ident.to_string();
}
}
}
}
let mut topic_name = String::new();
let mut pubsub_name = String::new();
let mut metadata = String::new();
for arg in args {
match arg {
NestedMeta::Meta(Meta::NameValue(nv)) => {
let key = nv.path.get_ident().map(|i| i.to_string());
if let Some(key) = key {
if let Lit::Str(lit) = &nv.lit {
match key.as_str() {
"name" => topic_name = lit.value(),
"pubsub" => pubsub_name = lit.value(),
"metadata" => metadata = lit.value(),
other => {
return syn::Error::new_spanned(
&nv.path,
format!("#[topic] 不支持的参数 `{}`,支持: name, pubsub, metadata", other)
).to_compile_error().into();
}
}
} else {
return syn::Error::new_spanned(
&nv.lit,
format!("#[topic] 参数 `{}` 的值必须是字符串", key)
).to_compile_error().into();
}
}
}
_ => {
return syn::Error::new_spanned(
arg,
"#[topic] 参数格式错误,应为 key = \"value\" 形式,如: #[topic(name = \"...\", pubsub = \".\"..\".\")]"
).to_compile_error().into();
}
}
}
let topic_ident = if topic_name.is_empty() {
quote! {
#aggregate_ty_ident::atype().to_string()
}
} else {
quote! {
#topic_name.to_string()
}
};
if pubsub_name.is_empty() {
pubsub_name = "messagebus".to_string();
}
let (metadata_ident, metadata_hash_map) = if metadata.is_empty() {
(quote! {None}, quote! {})
} else {
let mut metadata_hash_map = quote! {
let mut metadata: HashMap<String, String> = HashMap::new();
};
for item in metadata.split(',') {
let item = item.trim();
if item.is_empty() {
continue;
}
let mut parts = item.splitn(2, '=');
let k = parts.next().unwrap().trim();
let v = match parts.next() {
Some(v) => v.trim(),
None => {
return syn::Error::new(
Span::call_site(),
format!("#[topic] metadata 格式错误: `{}` 应为 `key=value` 格式", item)
).to_compile_error().into();
}
};
metadata_hash_map = quote! {
#metadata_hash_map
metadata.insert(#k.to_string(), #v.to_string());
};
}
(quote! {Some(metadata)}, metadata_hash_map)
};
#[cfg(feature = "debug_mode")]
if cfg!(debug_assertions){
println!("{}{}{}{}{}{}{}{}{}{}", topic_ident, metadata_ident, _aggregate_ident, _aggregate_name, aggregate_ty_ident, _aggregate_ty_name, event_ident, _event_name, event_ty_ident, event_ty_name);
}
let func_args_stream = target_fn.sig.inputs.to_token_stream();
let fn_body = find_fn_body(target_fn);
let is_async = target_fn.sig.asyncness.is_some();
if !is_async {
panic!(
"[genies] #[topic] 'fn {}({})' must be async fn! ",
func_name_ident, func_args_stream
);
}
let dapr_config_wrap_fn_name = format!("{}_dapr", func_name_ident);
let dapr_config_wrap_fn = Ident::new(&dapr_config_wrap_fn_name, Span::call_site());
let m_struct_name = format!("{}_hoop", func_name_ident);
let m_salvo_wrap=Ident::new(&m_struct_name, Span::call_site());
let wrap_url = format!("/daprsub/consumers").to_lowercase();
let dapr_config_wrap_code = quote! {
pub fn #dapr_config_wrap_fn() -> genies::dapr::pubsub::DaprTopicSubscription{
use std::collections::HashMap;
#metadata_hash_map
let dapr_topic_subscription = genies::dapr::pubsub::DaprTopicSubscription {
pubsub_name: Some(#pubsub_name.to_string()),
topic: Some(#topic_ident),
route: Some(#wrap_url.to_string()),
routes: None,
metadata: #metadata_ident,
};
return dapr_topic_subscription;
}
};
let handler_code = quote! {
pub async fn #func_name_ident(tx: &mut dyn rbatis::executor::Executor,#event_ident:#event_ty_ident) -> #return_ty
#fn_body
};
let salvo_code=quote!{
use salvo::prelude::*;
#[handler]
pub async fn #m_salvo_wrap(_req: &mut Request, _depot: &mut Depot, _res: &mut Response){
use std::time::Duration;
use genies::ddd::event::DomainEvent;
let CONSUME_STATUS_CONSUMING: String = "CONSUMING".to_string();
let CONSUME_STATUS_CONSUMED: String = "CONSUMED".to_string();
let body=_req.payload().await.unwrap();
let body=std::str::from_utf8(body).unwrap();
log::debug!("原始:{}",body);
let processing_expire_seconds = genies::context::CONTEXT.config.processing_expire_seconds as u64;
let record_reserve_minutes = genies::context::CONTEXT.config.record_reserve_minutes as u64;
let cloud_event: genies::dapr::cloud_event::CloudEvent = serde_json::from_str(&body).unwrap_or_default();
log::debug!("{:?}",cloud_event);
let message_imp: genies::ddd::message::MessageImpl = serde_json::from_value(cloud_event.data).unwrap_or_default();
let payload = message_imp.payload;
let headers = message_imp.headers;
let event_type = headers.event_type.clone().unwrap_or_default();
let subed_type = #event_ty_ident::default();
if subed_type.event_type() != event_type{
log::debug!("不是订阅的事件类型,不进行处理。");
}else {
let event: #event_ty_ident = serde_json::from_str(&payload).unwrap();
log::debug!("匹配到事件类型,事件对象为:{:?}", event);
let mut tx = genies::context::CONTEXT
.rbatis
.acquire_begin()
.await
.unwrap()
.defer_async(|mut tx| async move{
if !tx.done() {
let r = tx.rollback().await;
if let Err(e) = r {
log::error!("transaction [{}] rollback fail={}", tx.tx_id, e);
} else {
log::info!("transaction [{}] rollback", tx.tx_id);
}
}else {
log::debug!("事务正常操作成功");
}
});
let hander_name=#func_name;
let server_name=genies::context::CONTEXT.config.server_name.clone();
let event_type_name=#event_ty_name;
let key = format!("{}-{}-{}-{}",server_name,hander_name,event_type_name, headers.ID.clone().unwrap());
let acquired = genies::context::CONTEXT
.redis_save_service
.set_string_ex_nx(&key, &CONSUME_STATUS_CONSUMING, Some(Duration::new(processing_expire_seconds, 0)))
.await
.unwrap();
log::debug!("当前事件redis key = {}, 原子抢占结果 = {}", key, acquired);
if acquired {
log::debug!("3设置事件为正在消费中,开始调用事件处理程序,key={}",key);
let event_handle = #func_name_ident(&mut tx, event).await;
if event_handle.is_ok() {
log::debug!("4事件处理程序处理成功,key={}",key);
let set_CONSUMED = genies::context::CONTEXT
.redis_save_service
.set_string_ex(
&key,
&CONSUME_STATUS_CONSUMED,
Some(Duration::new(record_reserve_minutes * 60, 0)),
)
.await;
if set_CONSUMED.is_ok(){
let _ = tx.commit().await;
}else {
let _ = tx.rollback().await;
_depot.insert("is_retry", "true");
}
}else {
let _ = tx.rollback().await;
let _ = genies::context::CONTEXT.redis_save_service.del_string(&key).await;
_depot.insert("is_retry", "true");
}
} else {
let v = genies::context::CONTEXT.redis_save_service.get_string(&key).await.unwrap();
log::debug!("当前事件已被其他实例处理,当前状态 value={:?}", v);
if v.eq(&CONSUME_STATUS_CONSUMED) {
let _ = tx.rollback().await;
log::debug!("2事件已完成,key={}",key);
} else {
let _ = tx.rollback().await;
_depot.insert("is_retry", "true");
log::debug!("1事件正在处理中,事件将进行重发,key={}",key);
}
}
}
}
};
let get_hoop_name=format!("{}_router", m_struct_name);
let get_hoop_router=Ident::new(&get_hoop_name, Span::call_site());
let topic_registry_code = quote! {
pub fn #get_hoop_router()->Router{
Router::with_path("/daprsub/consumers").hoop(#m_salvo_wrap)
}
genies::context::inventory::submit!{
genies::dapr::topicpoint::Topicpoint::new(#dapr_config_wrap_fn,#get_hoop_router)
}
};
let gen_token_temple = quote! {
#handler_code
#salvo_code
#dapr_config_wrap_code
#topic_registry_code
};
gen_token_temple.into()
}