Skip to main content

mycommon_utils/log/
push.rs

1
2use sea_orm::prelude::DateTime;
3use redis::{streams, Commands, ErrorKind, FromRedisValue, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value as RedisValue};
4use serde::{Deserialize, Serialize};
5use crate::database::BigIntPrimaryKey;
6use crate::database::config::REDIS_POOL;
7use crate::error::{Error, Result};
8
9// 消息推送类型
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub enum  MessagePushCategory {
12    LOG(SysOpeLogCache)
13}
14
15#[derive(Debug, Clone, Serialize, Deserialize, Default)]
16pub struct SysOpeLogCache {
17    pub store_id: Option<BigIntPrimaryKey>,
18    pub title: Option<String>,
19    pub business_type: Option<i32>,
20    pub method: Option<String>,
21    pub request_method: Option<String>,
22    pub operator_type: Option<i32>,
23    pub oper_name: Option<String>,
24    pub dept_name: Option<String>,
25    pub oper_url: Option<String>,
26    pub oper_ip: Option<String>,
27    pub oper_location: Option<String>,
28    pub oper_param: Option<String>,
29    pub json_result: Option<String>,
30    pub status: Option<i32>,
31    pub error_msg: Option<String>,
32    pub oper_time: Option<DateTime>,
33    pub cost_time: Option<i64>,
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct  MessageRedisCache {
38    pub content: MessagePushCategory,
39}
40
41impl ToRedisArgs for MessageRedisCache {
42    fn write_redis_args<W>(&self, out: &mut W) where W: ?Sized + RedisWrite,
43    {
44        let result = serde_json::to_string(self);
45        if let Ok(result) = result {
46            out.write_arg(result.as_bytes())
47        }else {
48            tracing::error!("TemplateMessageRedisCache::write_redis_args() failed");
49        }
50    }
51}
52impl FromRedisValue for MessageRedisCache {
53    fn from_redis_value(v: &RedisValue) -> RedisResult<MessageRedisCache> {
54        match *v {
55            RedisValue::BulkString(ref bytes) => {
56                let data = String::from_utf8(bytes.to_vec());
57                if let Ok(content) = data {
58                    let result = serde_json::from_str::<MessageRedisCache>(content.as_str());
59                    if result.is_err() {
60                        return Err(RedisError::from((
61                            ErrorKind::ParseError,
62                            "解析数据失败",
63                            "MessageRedisCache 数据序列化错误".to_string(),
64                        )));
65                    }
66                    return Ok(result?);
67                }
68                Err(RedisError::from((
69                    ErrorKind::TypeError,
70                    "序列化失败",
71                    "MessageRedisCache 数据序列化错误".to_string(),
72                )))
73            }
74            _ => {
75                Err(RedisError::from((
76                    ErrorKind::TypeError,
77                    "TypeError Error",
78                    "MessageRedisCache 数据类型错误".to_string(),
79                )))
80            }
81        }
82    }
83}
84
85
/// Publisher handle bound to one named Redis stream.
// NOTE(review): the reason for allowing dead code is not visible in this file —
// confirm whether the attribute is still needed.
#[allow(dead_code)]
#[derive(Clone, Default)]
pub struct MessagePushStream {
    /// Name of the stream; used unchanged as the Redis key when publishing.
    pub cache_name: String,
}
91
92#[allow(dead_code)]
93impl MessagePushStream {
94    pub fn new(cache_name:  String) -> Self {
95        MessagePushStream{
96            cache_name,
97        }
98    }
99    // stream 队列名称
100    fn keys(&self) ->String {
101        format!("{}",self.cache_name)
102    }
103
104    /// 添加到队列
105    pub fn publish(&self,item:MessageRedisCache) ->Result<String> {
106        let redis = REDIS_POOL.get();
107        if let Some(client)= redis {
108            let connect = client.get();
109            if let Ok(mut connect) = connect {
110                let queue_name = self.keys();
111                let len = streams::StreamMaxlen::Equals(100000);
112                let id = connect.xadd_maxlen(queue_name,len,"*",&[("message",item)])?;
113                return Ok(id);
114            }
115        }
116        Err(Error::ErrorRedisConnectError())
117    }
118
119}