// genies_derive 1.6.0
//
// A powerful derive macro for configuration management in Rust
// Documentation
/*
 * @Author: tzw
 * @Date: 2021-11-01 00:25:38
 * @LastEditors: tzw
 * @LastEditTime: 2021-12-16 00:49:16
 */

use proc_macro2::*;
use quote::{quote, ToTokens};
use syn::{AttributeArgs, FnArg, ItemFn, Lit, Meta, NestedMeta};

use crate::helpers::*;
use crate::proc_macro::TokenStream;

/// Expands the `#[topic]` attribute placed on an async domain-event handler.
///
/// # Arguments
///
/// * `target_fn` - the annotated function. It must be `async` and must have a
///   parameter accepted by `is_domain_event_ref`. A parameter accepted by
///   `is_aggregate_ref` is required only when no `name` argument is given,
///   because the topic then defaults to `<AggregateTy>::atype()`.
/// * `args` - the attribute arguments. Only `key = "value"` pairs are accepted,
///   with the keys `name` (topic name), `pubsub` (pubsub component name,
///   default `"messagebus"`) and `metadata` (comma separated `key=value`
///   items).
///
/// # Returns
///
/// A token stream containing four generated items:
///
/// * the handler, re-emitted as
///   `pub async fn <name>(tx: &mut dyn rbatis::executor::Executor, <event>: <EventTy>)`
///   with the original return type and body;
/// * `<name>_hoop`, a salvo `#[handler]` that decodes the incoming cloud event,
///   de-duplicates it through a redis key and runs the handler in a transaction;
/// * `<name>_dapr`, which returns the `DaprTopicSubscription` for this handler;
/// * `<name>_hoop_router`, plus an `inventory::submit!` registering a
///   `Topicpoint` built from the two functions above.
///
/// Every misuse of the attribute is reported as a compile error spanned on the
/// offending tokens; the macro itself does not panic on bad input.
pub(crate) fn impl_topic(target_fn: &ItemFn, args: &AttributeArgs) -> TokenStream {
    let return_ty = find_return_type(target_fn);
    let func_name_ident = target_fn.sig.ident.to_token_stream();
    let func_name = func_name_ident.to_string();

    // The generated salvo wrapper awaits the handler, so a sync fn cannot work.
    if target_fn.sig.asyncness.is_none() {
        let func_args_stream = target_fn.sig.inputs.to_token_stream();
        return syn::Error::new_spanned(
            &target_fn.sig,
            format!(
                "[genies] #[topic] 'fn {}({})' must be an async fn",
                func_name_ident, func_args_stream
            ),
        )
        .to_compile_error()
        .into();
    }

    // Start from genuinely empty token streams. `"".to_token_stream()` would
    // produce a `""` string-literal token, which leaks into the generated code
    // as invalid syntax when a parameter is missing.
    let mut _aggregate_ident = quote! {};
    let mut _aggregate_name = String::new();
    let mut aggregate_ty_ident = quote! {};
    let mut _aggregate_ty_name = String::new();
    let mut event_ident = quote! {};
    let mut _event_name = String::new();
    let mut event_ty_ident = quote! {};
    let mut event_ty_name = String::new();

    // Pick the aggregate and the domain-event parameters out of the signature.
    for x in &target_fn.sig.inputs {
        match x {
            FnArg::Receiver(_) => {}
            FnArg::Typed(t) => {
                if is_aggregate_ref(t) {
                    _aggregate_ident = t.pat.to_token_stream();
                    _aggregate_name = _aggregate_ident.to_string();
                    aggregate_ty_ident = t.ty.to_token_stream();
                    _aggregate_ty_name = aggregate_ty_ident.to_string();
                }
                if is_domain_event_ref(t) {
                    event_ident = t.pat.to_token_stream();
                    _event_name = event_ident.to_string();
                    event_ty_ident = t.ty.to_token_stream();
                    event_ty_name = event_ty_ident.to_string();
                }
            }
        }
    }

    // The event type is interpolated into both the handler signature and the
    // wrapper, so without it the expansion cannot be valid code.
    if event_ty_ident.is_empty() {
        return syn::Error::new_spanned(
            &target_fn.sig,
            format!(
                "[genies] #[topic] 'fn {}' must take a domain event parameter",
                func_name
            ),
        )
        .to_compile_error()
        .into();
    }

    let mut topic_name = String::new();
    let mut pubsub_name = String::new();
    let mut metadata = String::new();

    for arg in args {
        match arg {
            NestedMeta::Meta(Meta::NameValue(nv)) => {
                // A key that is not a plain identifier (e.g. `a::b`) is
                // stringified so that it is rejected by the unsupported-key arm
                // below instead of being silently ignored.
                let key = match nv.path.get_ident() {
                    Some(ident) => ident.to_string(),
                    None => nv.path.to_token_stream().to_string(),
                };
                let lit = match &nv.lit {
                    Lit::Str(lit) => lit,
                    _ => {
                        return syn::Error::new_spanned(
                            &nv.lit,
                            format!("#[topic] 参数 `{}` 的值必须是字符串", key),
                        )
                        .to_compile_error()
                        .into();
                    }
                };
                match key.as_str() {
                    "name" => topic_name = lit.value(),
                    "pubsub" => pubsub_name = lit.value(),
                    "metadata" => metadata = lit.value(),
                    other => {
                        return syn::Error::new_spanned(
                            &nv.path,
                            format!("#[topic] 不支持的参数 `{}`,支持: name, pubsub, metadata", other),
                        )
                        .to_compile_error()
                        .into();
                    }
                }
            }
            _ => {
                return syn::Error::new_spanned(
                    arg,
                    "#[topic] 参数格式错误,应为 key = \"value\" 形式,如: #[topic(name = \"...\", pubsub = \"...\")]",
                )
                .to_compile_error()
                .into();
            }
        }
    }

    // An explicit `name` wins; otherwise the topic is derived from the
    // aggregate type, which therefore has to be present.
    let topic_ident = if !topic_name.is_empty() {
        quote! {
            #topic_name.to_string()
        }
    } else if aggregate_ty_ident.is_empty() {
        return syn::Error::new_spanned(
            &target_fn.sig,
            format!(
                "[genies] #[topic] 'fn {}' needs either `name = \"...\"` or an aggregate parameter to derive the topic name from",
                func_name
            ),
        )
        .to_compile_error()
        .into();
    } else {
        quote! {
            #aggregate_ty_ident::atype().to_string()
        }
    };

    if pubsub_name.is_empty() {
        pubsub_name = "messagebus".to_string();
    }

    // `metadata = "k1=v1,k2=v2"` becomes a `HashMap<String, String>` that is
    // built inside the generated `<name>_dapr` function.
    let (metadata_ident, metadata_hash_map) = if metadata.is_empty() {
        (quote! {None}, quote! {})
    } else {
        let mut inserts = Vec::new();
        for item in metadata.split(',') {
            let item = item.trim();
            if item.is_empty() {
                continue;
            }
            let mut parts = item.splitn(2, '=');
            // `splitn` always yields at least one piece, even for an empty input.
            let k = parts.next().unwrap().trim();
            let v = match parts.next() {
                Some(v) => v.trim(),
                None => {
                    return syn::Error::new(
                        Span::call_site(),
                        format!("#[topic] metadata 格式错误: `{}` 应为 `key=value` 格式", item),
                    )
                    .to_compile_error()
                    .into();
                }
            };
            inserts.push(quote! {
                metadata.insert(#k.to_string(), #v.to_string());
            });
        }
        let metadata_hash_map = quote! {
            let mut metadata: HashMap<String, String> = HashMap::new();
            #(#inserts)*
        };
        (quote! {Some(metadata)}, metadata_hash_map)
    };

    #[cfg(feature = "debug_mode")]
    if cfg!(debug_assertions) {
        println!("{}{}{}{}{}{}{}{}{}{}", topic_ident, metadata_ident, _aggregate_ident, _aggregate_name, aggregate_ty_ident, _aggregate_ty_name, event_ident, _event_name, event_ty_ident, event_ty_name);
    }

    let fn_body = find_fn_body(target_fn);

    let dapr_config_wrap_fn_name = format!("{}_dapr", func_name_ident);
    let dapr_config_wrap_fn = Ident::new(&dapr_config_wrap_fn_name, Span::call_site());

    let m_struct_name = format!("{}_hoop", func_name_ident);
    let m_salvo_wrap = Ident::new(&m_struct_name, Span::call_site());

    // Single source for the route: used both in the subscription returned to
    // dapr and in the salvo router path.
    let wrap_url = "/daprsub/consumers";

    let dapr_config_wrap_code = quote! {
        pub fn #dapr_config_wrap_fn() -> genies::dapr::pubsub::DaprTopicSubscription {
            use std::collections::HashMap;
            #metadata_hash_map
            let dapr_topic_subscription = genies::dapr::pubsub::DaprTopicSubscription {
                pubsub_name: Some(#pubsub_name.to_string()),
                topic: Some(#topic_ident),
                route: Some(#wrap_url.to_string()),
                routes: None,
                metadata: #metadata_ident,
            };
            return dapr_topic_subscription;
        }
    };

    // The user's handler is re-emitted with a fixed parameter list: the
    // transaction executor followed by the detected event parameter.
    let handler_code = quote! {
        pub async fn #func_name_ident(tx: &mut dyn rbatis::executor::Executor,#event_ident:#event_ty_ident) -> #return_ty
            #fn_body
    };

    let salvo_code = quote! {
        use salvo::prelude::*;
        #[handler]
        pub async fn #m_salvo_wrap(_req: &mut Request, _depot: &mut Depot, _res: &mut Response){
            use std::time::Duration;
            use genies::ddd::event::DomainEvent;
            let CONSUME_STATUS_CONSUMING: String = "CONSUMING".to_string();
            let CONSUME_STATUS_CONSUMED: String = "CONSUMED".to_string();

            let body=_req.payload().await.unwrap();

            let body=std::str::from_utf8(body).unwrap();

            log::debug!("原始:{}",body);

            let processing_expire_seconds = genies::context::CONTEXT.config.processing_expire_seconds as u64;
            let record_reserve_minutes = genies::context::CONTEXT.config.record_reserve_minutes as u64;

            let cloud_event: genies::dapr::cloud_event::CloudEvent = serde_json::from_str(&body).unwrap_or_default();

            log::debug!("{:?}",cloud_event);
            let message_imp: genies::ddd::message::MessageImpl = serde_json::from_value(cloud_event.data).unwrap_or_default();

            let payload = message_imp.payload;
            let headers = message_imp.headers;

            let event_type = headers.event_type.clone().unwrap_or_default();
            let subed_type = #event_ty_ident::default();

            // `event_type()` here is the method of the `DomainEvent` trait imported above.
            if subed_type.event_type() != event_type{
                log::debug!("不是订阅的事件类型,不进行处理。");
            }else {
                let event: #event_ty_ident = serde_json::from_str(&payload).unwrap();
                log::debug!("匹配到事件类型,事件对象为:{:?}", event);

                // Deferred guard: if the transaction has not been finished by
                // the time the guard runs, roll it back and log the outcome.
                let mut tx = genies::context::CONTEXT
                    .rbatis
                    .acquire_begin()
                    .await
                    .unwrap()
                    .defer_async(|mut tx| async move{
                        if !tx.done() {
                            let r = tx.rollback().await;
                            if let Err(e) = r {
                                log::error!("transaction [{}] rollback fail={}", tx.tx_id, e);
                            } else {
                                log::info!("transaction [{}] rollback", tx.tx_id);
                            }
                        }else {
                            log::debug!("事务正常操作成功");
                        }
                    });

                let hander_name=#func_name;
                let server_name=genies::context::CONTEXT.config.server_name.clone();
                let event_type_name=#event_ty_name;
                let key = format!("{}-{}-{}-{}",server_name,hander_name,event_type_name, headers.ID.clone().unwrap());

                // Try to claim the right to consume this event with an atomic operation.
                let acquired = genies::context::CONTEXT
                    .redis_save_service
                    .set_string_ex_nx(&key, &CONSUME_STATUS_CONSUMING, Some(Duration::new(processing_expire_seconds, 0)))
                    .await
                    .unwrap();

                log::debug!("当前事件redis key = {}, 原子抢占结果 = {}", key, acquired);

                if acquired {
                    // Claim succeeded: run the business handler.
                    log::debug!("3设置事件为正在消费中,开始调用事件处理程序,key={}",key);
                    let event_handle = #func_name_ident(&mut tx, event).await;
                    // On success, mark the event as consumed in redis.
                    if event_handle.is_ok() {
                        log::debug!("4事件处理程序处理成功,key={}",key);

                        let set_CONSUMED = genies::context::CONTEXT
                            .redis_save_service
                            .set_string_ex(
                                &key,
                                &CONSUME_STATUS_CONSUMED,
                                Some(Duration::new(record_reserve_minutes * 60, 0)),
                            )
                            .await;

                        // Commit the database transaction only once the redis status update succeeded.
                        if set_CONSUMED.is_ok(){
                            tx.commit().await;
                        }else {
                            // The redis status update failed: roll back and ask dapr to redeliver.
                            tx.rollback().await;
                            _depot.insert("is_retry", "true");
                        }
                    }else {
                        // The handler failed: roll back, release the claim and ask for a retry.
                        tx.rollback().await;
                        genies::context::CONTEXT.redis_save_service.del_string(&key).await;
                        _depot.insert("is_retry", "true");
                    }
                } else {
                    // Claim failed because the key already exists: inspect its current status.
                    let v = genies::context::CONTEXT.redis_save_service.get_string(&key).await.unwrap();
                    log::debug!("当前事件已被其他实例处理,当前状态 value={:?}", v);

                    if v.eq(&CONSUME_STATUS_CONSUMED) {
                        // Already consumed: nothing to do.
                        tx.rollback().await;
                        log::debug!("2事件已完成,key={}",key);
                    } else {
                        // Still being consumed elsewhere (or any other state): request a retry.
                        tx.rollback().await;
                        _depot.insert("is_retry", "true");
                        log::debug!("1事件正在处理中,事件将进行重发,key={}",key);
                    }
                }

            }
        }
    };

    let get_hoop_name = format!("{}_router", m_struct_name);
    let get_hoop_router = Ident::new(&get_hoop_name, Span::call_site());

    let topic_registry_code = quote! {
        pub fn #get_hoop_router()->Router{
            Router::with_path(#wrap_url).hoop(#m_salvo_wrap)
        }

        genies::context::inventory::submit!{
            genies::dapr::topicpoint::Topicpoint::new(#dapr_config_wrap_fn,#get_hoop_router)
        }
    };

    let gen_token_temple = quote! {
        #handler_code
        #salvo_code
        #dapr_config_wrap_code

        #topic_registry_code
    };
    gen_token_temple.into()
}