bios_basic/rbum/helper/
rbum_event_helper.rs1use std::collections::HashMap;
2use std::future::Future;
3
4use serde::{Deserialize, Serialize};
5use tardis::basic::dto::TardisContext;
6use tardis::basic::result::TardisResult;
7#[cfg(feature = "with-mq")]
8use tardis::chrono::Utc;
9use tardis::{TardisFuns, TardisFunsInst};
10
11#[cfg(feature = "with-mq")]
12use crate::rbum::rbum_config::RbumConfigApi;
13
14const NOTIFY_EVENT_IN_CTX_FLAG: &str = "notify";
15
16pub async fn try_notifies(event_messages: Vec<NotifyEventMessage>, funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<()> {
17 for event_message in event_messages {
18 self::try_notify(&event_message.table_name, &event_message.operate, &event_message.record_id, funs, ctx).await?;
19 }
20 Ok(())
21}
22
23#[allow(unused)]
24pub async fn try_notify<'a>(table_name: &str, operate: &str, record_id: &str, funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<bool> {
25 #[cfg(feature = "with-mq")]
26 {
27 if funs.rbum_conf_match_event(table_name, operate) {
28 funs.mq()
29 .publish(
30 &funs.rbum_conf_mq_topic_event(),
31 tardis::TardisFuns::json.obj_to_string(&RbumEventMessage {
32 table_name: table_name.to_string(),
33 operate: operate.to_string(),
34 operator: ctx.owner.clone(),
35 record_id: record_id.to_string(),
36 ts: Utc::now().timestamp_millis(),
37 })?,
38 &HashMap::new(),
39 )
40 .await?;
41 }
42 Ok(true)
43 }
44 #[cfg(not(feature = "with-mq"))]
45 {
46 Ok(false)
47 }
48}
49
50#[allow(unused)]
51pub async fn receive<F, T>(fun: F, funs: &TardisFunsInst) -> TardisResult<bool>
52where
53 F: Fn((HashMap<String, String>, String)) -> T + Send + Sync + 'static,
54 T: Future<Output = TardisResult<()>> + Send + 'static,
55{
56 #[cfg(feature = "with-mq")]
57 {
58 funs.mq().subscribe(&funs.rbum_conf_mq_topic_event(), fun).await?;
59 Ok(true)
60 }
61 #[cfg(not(feature = "with-mq"))]
62 {
63 Ok(false)
64 }
65}
66
67pub fn parse_message(message: String) -> TardisResult<RbumEventMessage> {
68 tardis::TardisFuns::json.str_to_obj::<RbumEventMessage>(&message)
69}
70
71pub async fn add_notify_event(table_name: &str, operate: &str, record_id: &str, ctx: &TardisContext) -> TardisResult<()> {
72 ctx.add_ext(
73 &format!("{}{}", NOTIFY_EVENT_IN_CTX_FLAG, TardisFuns::field.nanoid()),
74 &tardis::TardisFuns::json.obj_to_string(&NotifyEventMessage {
75 table_name: table_name.to_string(),
76 operate: operate.to_string(),
77 record_id: record_id.to_string(),
78 })?,
79 )
80 .await
81}
82
83pub async fn get_notify_event_with_ctx(ctx: &TardisContext) -> TardisResult<Option<Vec<NotifyEventMessage>>> {
84 let notify_events = ctx.ext.read().await;
85 let notify_events = notify_events
86 .iter()
87 .filter(|(k, _)| k.starts_with(NOTIFY_EVENT_IN_CTX_FLAG))
88 .map(|(_, v)| TardisFuns::json.str_to_obj::<NotifyEventMessage>(v))
89 .collect::<TardisResult<Vec<_>>>()?;
90 if notify_events.is_empty() {
91 Ok(None)
92 } else {
93 Ok(Some(notify_events))
94 }
95}
96
97#[derive(Clone, Debug, Deserialize, Serialize)]
98pub struct RbumEventMessage {
99 pub table_name: String,
100 pub operate: String,
101 pub operator: String,
102 pub record_id: String,
103 pub ts: i64,
104}
105
106#[derive(Clone, Debug, Deserialize, Serialize)]
107pub struct NotifyEventMessage {
108 pub table_name: String,
109 pub operate: String,
110 pub record_id: String,
111}