Skip to main content

bios_basic/rbum/helper/
rbum_event_helper.rs

1use std::collections::HashMap;
2use std::future::Future;
3
4use serde::{Deserialize, Serialize};
5use tardis::basic::dto::TardisContext;
6use tardis::basic::result::TardisResult;
7#[cfg(feature = "with-mq")]
8use tardis::chrono::Utc;
9use tardis::{TardisFuns, TardisFunsInst};
10
11#[cfg(feature = "with-mq")]
12use crate::rbum::rbum_config::RbumConfigApi;
13
14const NOTIFY_EVENT_IN_CTX_FLAG: &str = "notify";
15
16pub async fn try_notifies(event_messages: Vec<NotifyEventMessage>, funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<()> {
17    for event_message in event_messages {
18        self::try_notify(&event_message.table_name, &event_message.operate, &event_message.record_id, funs, ctx).await?;
19    }
20    Ok(())
21}
22
23#[allow(unused)]
24pub async fn try_notify<'a>(table_name: &str, operate: &str, record_id: &str, funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<bool> {
25    #[cfg(feature = "with-mq")]
26    {
27        if funs.rbum_conf_match_event(table_name, operate) {
28            funs.mq()
29                .publish(
30                    &funs.rbum_conf_mq_topic_event(),
31                    tardis::TardisFuns::json.obj_to_string(&RbumEventMessage {
32                        table_name: table_name.to_string(),
33                        operate: operate.to_string(),
34                        operator: ctx.owner.clone(),
35                        record_id: record_id.to_string(),
36                        ts: Utc::now().timestamp_millis(),
37                    })?,
38                    &HashMap::new(),
39                )
40                .await?;
41        }
42        Ok(true)
43    }
44    #[cfg(not(feature = "with-mq"))]
45    {
46        Ok(false)
47    }
48}
49
50#[allow(unused)]
51pub async fn receive<F, T>(fun: F, funs: &TardisFunsInst) -> TardisResult<bool>
52where
53    F: Fn((HashMap<String, String>, String)) -> T + Send + Sync + 'static,
54    T: Future<Output = TardisResult<()>> + Send + 'static,
55{
56    #[cfg(feature = "with-mq")]
57    {
58        funs.mq().subscribe(&funs.rbum_conf_mq_topic_event(), fun).await?;
59        Ok(true)
60    }
61    #[cfg(not(feature = "with-mq"))]
62    {
63        Ok(false)
64    }
65}
66
67pub fn parse_message(message: String) -> TardisResult<RbumEventMessage> {
68    tardis::TardisFuns::json.str_to_obj::<RbumEventMessage>(&message)
69}
70
71pub async fn add_notify_event(table_name: &str, operate: &str, record_id: &str, ctx: &TardisContext) -> TardisResult<()> {
72    ctx.add_ext(
73        &format!("{}{}", NOTIFY_EVENT_IN_CTX_FLAG, TardisFuns::field.nanoid()),
74        &tardis::TardisFuns::json.obj_to_string(&NotifyEventMessage {
75            table_name: table_name.to_string(),
76            operate: operate.to_string(),
77            record_id: record_id.to_string(),
78        })?,
79    )
80    .await
81}
82
83pub async fn get_notify_event_with_ctx(ctx: &TardisContext) -> TardisResult<Option<Vec<NotifyEventMessage>>> {
84    let notify_events = ctx.ext.read().await;
85    let notify_events = notify_events
86        .iter()
87        .filter(|(k, _)| k.starts_with(NOTIFY_EVENT_IN_CTX_FLAG))
88        .map(|(_, v)| TardisFuns::json.str_to_obj::<NotifyEventMessage>(v))
89        .collect::<TardisResult<Vec<_>>>()?;
90    if notify_events.is_empty() {
91        Ok(None)
92    } else {
93        Ok(Some(notify_events))
94    }
95}
96
97#[derive(Clone, Debug, Deserialize, Serialize)]
98pub struct RbumEventMessage {
99    pub table_name: String,
100    pub operate: String,
101    pub operator: String,
102    pub record_id: String,
103    pub ts: i64,
104}
105
106#[derive(Clone, Debug, Deserialize, Serialize)]
107pub struct NotifyEventMessage {
108    pub table_name: String,
109    pub operate: String,
110    pub record_id: String,
111}