mycommon-utils 0.1.2

Common utilities library for database operations, Redis caching and system utilities
Documentation

use sea_orm::prelude::DateTime;
use redis::{streams, Commands, ErrorKind, FromRedisValue, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value as RedisValue};
use serde::{Deserialize, Serialize};
use crate::database::BigIntPrimaryKey;
use crate::database::config::REDIS_POOL;
use crate::error::{Error, Result};

/// Category of a pushed message.
///
/// Each variant wraps the payload that belongs to that category.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum MessagePushCategory {
    /// A log message carrying a [`SysOpeLogCache`] record.
    LOG(SysOpeLogCache),
}

/// Log record payload carried by [`MessagePushCategory::LOG`].
///
/// Every field is optional, and `Default` yields a record with all fields
/// set to `None`.
///
/// NOTE(review): judging by the field names this looks like a system
/// operation-log entry (who called what, with which parameters and result).
/// The exact meaning of individual fields is not visible in this file —
/// confirm against the code that fills them in.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct SysOpeLogCache {
    pub store_id: Option<BigIntPrimaryKey>,
    pub title: Option<String>,
    pub business_type: Option<i32>,
    pub method: Option<String>,
    pub request_method: Option<String>,
    pub operator_type: Option<i32>,
    pub oper_name: Option<String>,
    pub dept_name: Option<String>,
    pub oper_url: Option<String>,
    pub oper_ip: Option<String>,
    pub oper_location: Option<String>,
    pub oper_param: Option<String>,
    pub json_result: Option<String>,
    pub status: Option<i32>,
    pub error_msg: Option<String>,
    // Timestamp type comes from `sea_orm::prelude::DateTime`.
    pub oper_time: Option<DateTime>,
    // NOTE(review): unit (ms? µs?) is not visible here — confirm with the producer.
    pub cost_time: Option<i64>,
}

/// Envelope for a message exchanged through Redis.
///
/// The `ToRedisArgs` / `FromRedisValue` implementations in this file encode
/// and decode the whole envelope as a JSON document.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MessageRedisCache {
    /// The categorized message payload.
    pub content: MessagePushCategory,
}

impl ToRedisArgs for MessageRedisCache {
    fn write_redis_args<W>(&self, out: &mut W) where W: ?Sized + RedisWrite,
    {
        let result = serde_json::to_string(self);
        if let Ok(result) = result {
            out.write_arg(result.as_bytes())
        }else {
            tracing::error!("TemplateMessageRedisCache::write_redis_args() failed");
        }
    }
}
impl FromRedisValue for MessageRedisCache {
    /// Decodes a [`MessageRedisCache`] from a Redis bulk string holding its
    /// JSON encoding.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::TypeError` — the value is not a bulk string, or its
    ///   bytes are not valid UTF-8.
    /// * `ErrorKind::ParseError` — the text is not valid JSON for this type.
    ///
    /// The underlying UTF-8 / JSON error is appended to the error detail so
    /// the cause is not lost.
    fn from_redis_value(v: &RedisValue) -> RedisResult<MessageRedisCache> {
        let bytes = match v {
            RedisValue::BulkString(bytes) => bytes,
            _ => {
                return Err(RedisError::from((
                    ErrorKind::TypeError,
                    "TypeError Error",
                    "MessageRedisCache 数据类型错误".to_string(),
                )));
            }
        };

        // Borrow the payload as `&str` instead of copying it into a new `String`.
        let text = std::str::from_utf8(bytes).map_err(|err| {
            RedisError::from((
                ErrorKind::TypeError,
                "序列化失败",
                format!("MessageRedisCache 数据序列化错误: {}", err),
            ))
        })?;

        serde_json::from_str::<MessageRedisCache>(text).map_err(|err| {
            RedisError::from((
                ErrorKind::ParseError,
                "解析数据失败",
                format!("MessageRedisCache 数据序列化错误: {}", err),
            ))
        })
    }
}


/// Publisher handle for a single Redis stream.
#[allow(dead_code)]
#[derive(Clone, Default)]
pub struct MessagePushStream {
    /// Name of the stream; used verbatim as the Redis key when publishing.
    pub cache_name: String,
}

#[allow(dead_code)]
impl MessagePushStream {
    pub fn new(cache_name:  String) -> Self {
        MessagePushStream{
            cache_name,
        }
    }
    // stream 队列名称
    fn keys(&self) ->String {
        format!("{}",self.cache_name)
    }

    /// 添加到队列
    pub fn publish(&self,item:MessageRedisCache) ->Result<String> {
        let redis = REDIS_POOL.get();
        if let Some(client)= redis {
            let connect = client.get();
            if let Ok(mut connect) = connect {
                let queue_name = self.keys();
                let len = streams::StreamMaxlen::Equals(100000);
                let id = connect.xadd_maxlen(queue_name,len,"*",&[("message",item)])?;
                return Ok(id);
            }
        }
        Err(Error::ErrorRedisConnectError())
    }

}