1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
use super::mobc_error::MobcError;
use crate::config::cache::mobc_error::Error;
use crate::config::cache::mobc_error::MobcError::{RedisCMDError, RedisTypeError};
use log::error;
use redis::RedisError;
use redis::{Commands, Connection, FromRedisValue, Value};
use std::env;

/// Opens a new Redis connection using the URL found in the `REDIS_URL`
/// environment variable.
///
/// # Panics
///
/// Panics when `REDIS_URL` cannot be read, when a client cannot be built
/// from that URL, or when the connection cannot be established.
pub fn get_con() -> Connection {
    let url = env::var("REDIS_URL").expect("redis url missing");
    let client = redis::Client::open(url.as_str()).expect("get redis client failed");
    client
        .get_connection()
        .expect("get redis connection failed")
}

/// Opens a new Redis connection to the server described by `conn_url`.
///
/// # Panics
///
/// Panics when a client cannot be built from `conn_url` or when the
/// connection cannot be established.
pub fn get_redis_conn(conn_url: &str) -> Connection {
    let client = redis::Client::open(conn_url).expect("get redis client by url failed");
    client
        .get_connection()
        .expect("get redis connection by url failed")
}

/// Stores `value` under `key` with an expiry of `ttl_seconds`, using a
/// connection freshly obtained from `get_con`.
///
/// # Errors
///
/// Returns `MobcError::RedisCMDError` wrapping the underlying redis error
/// when the command fails.
pub fn set_value(key: &str, value: &str, ttl_seconds: usize) -> Result<String, MobcError> {
    let mut conn = get_con();
    // The reply type (`String`) is fixed by the function's return type.
    conn.set_ex(key, value, ttl_seconds).map_err(RedisCMDError)
}

/// Stores `value` under `key` on the supplied connection and, when
/// `ttl_seconds` is greater than zero, sets an expiry on the key afterwards.
///
/// Failures are logged rather than returned: a failed write stops the
/// function before the expiry is attempted, and a failed expiry is logged
/// without undoing the write.
pub fn set_str_with_conn(con: &mut Connection, key: &str, value: &str, ttl_seconds: usize) {
    // `SET` replies with the status `OK`, which is not a number. Decoding the
    // reply as an integer would report a successful write as an error and
    // skip the expiry, so decode it as a `String` exactly like `set_str` does.
    let set_result: Result<String, MobcError> = con.set(key, value).map_err(RedisCMDError);
    if let Err(e) = set_result {
        error!("with conn set redis {} value error, {}", key, e);
        return;
    }

    // A TTL of zero means "no expiry requested".
    if ttl_seconds == 0 {
        return;
    }

    let expire_result = con
        .expire::<&str, usize>(key, ttl_seconds)
        .map_err(RedisCMDError);
    if expire_result.is_err() {
        error!("expire redis {} value error", key);
    }
}

/// Stores `value` under `key` using a connection from `get_con` and, when
/// `ttl_seconds` is greater than zero, sets an expiry on the key afterwards.
///
/// Failures are only logged; nothing is reported back to the caller.
pub fn set_str(key: &str, value: &str, ttl_seconds: usize) {
    let mut conn = get_con();

    let stored: Result<String, MobcError> = conn.set(key, value).map_err(RedisCMDError);
    if let Err(e) = stored {
        error!("set redis {} value error, {}", key, e);
        return;
    }

    // A TTL of zero means "no expiry requested".
    if ttl_seconds == 0 {
        return;
    }

    let expired = conn
        .expire::<&str, usize>(key, ttl_seconds)
        .map_err(RedisCMDError);
    if expired.is_err() {
        error!("expire redis {} value error", key);
    }
}

pub fn get_str_default(key: &str) -> Result<Option<String>, Error> {
    let mut redis_conn = get_con();
    let value = redis_conn.get(key).map_err(RedisCMDError)?;
    if Value::Nil == value {
        return Ok(None);
    }
    FromRedisValue::from_redis_value(&value).map_err(|e| RedisTypeError(e).into())
}

pub async fn get_str(con: &mut Connection, key: &str) -> Result<Option<String>, Error> {
    let value = con.get(key).map_err(RedisCMDError)?;
    if Value::Nil == value {
        return Ok(None);
    }
    FromRedisValue::from_redis_value(&value).map_err(|e| RedisTypeError(e).into())
}

pub fn del_redis_key(key: &str) -> Result<(), Error> {
    let mut redis_conn_unwrap = get_con();
    let del_result = redis_conn_unwrap.del(key).map_err(RedisCMDError)?;
    FromRedisValue::from_redis_value(&del_result).map_err(|e| RedisTypeError(e).into())
}

pub fn incre_redis_key(key: &str, incre_value: i32) -> Result<(), Error> {
    let mut redis_conn_unwrap = get_con();
    let incr_result = redis_conn_unwrap
        .incr(key, incre_value)
        .map_err(RedisCMDError)?;
    FromRedisValue::from_redis_value(&incr_result).map_err(|e| RedisTypeError(e).into())
}

pub fn sync_get_str(key: &str) -> Option<String> {
    let mut connection = get_con();
    let redis_result = connection.get(key);
    if let Err(e) = redis_result {
        error!("get redis key failed,{}", e);
        return None;
    }
    return redis_result.unwrap();
}

pub fn get_list_size(key: &str) -> Result<usize, Error> {
    let mut connection = get_con();
    let size = connection.llen(key).unwrap();
    Ok(size)
}

/// Appends the field/value pairs in `params` as a new entry on the stream
/// `stream_key`, passing `"*"` as the entry id.
///
/// Failures are only logged; nothing is reported back to the caller.
pub fn push_data_to_stream(stream_key: &str, params: &[(&str, &str)]) {
    let mut conn = get_con();
    if let Err(e) = conn.xadd::<&str, &str, &str, &str, String>(stream_key, "*", params) {
        error!("Couldn't send to redis stream,{}", e);
    }
}

/// Appends the field/value pairs in `params` as a new entry on the stream
/// `stream_key`, passing `"*"` as the entry id, and returns the server's
/// reply decoded as a `String`.
///
/// # Errors
///
/// Returns the underlying `RedisError` when the command fails.
pub fn push_to_stream(stream_key: &str, params: &[(&str, &str)]) -> Result<String, RedisError> {
    let mut conn = get_con();
    // Key, id, field and value types follow from the arguments; the reply
    // type follows from the function's return type.
    conn.xadd(stream_key, "*", params)
}

/// Makes sure the consumer group `group` exists on the stream `stream_key`
/// (creating the stream if needed), then appends the field/value pairs in
/// `params` as a new entry and returns the server's reply as a `String`.
///
/// # Errors
///
/// Returns the underlying `RedisError` when the group cannot be created for
/// any reason other than already existing, or when appending the entry fails.
pub fn push_to_group_stream(
    stream_key: &str,
    group: &str,
    params: &[(&str, &str)],
) -> Result<String, RedisError> {
    let mut connection = get_con();
    let created =
        connection.xgroup_create_mkstream::<&str, &str, &str, String>(stream_key, group, "$");
    if let Err(e) = created {
        // The server answers `BUSYGROUP` when the group already exists. That
        // is the expected outcome on every call after the first one, so it
        // must not stop the entry from being appended.
        if e.code() != Some("BUSYGROUP") {
            error!(
                "create group failed,{},stream key: {}, group: {}",
                e, stream_key, group
            );
            return Err(e);
        }
    }
    connection.xadd::<&str, &str, &str, &str, String>(stream_key, "*", params)
}

/// Deletes the entry identified by `msg_id` from the stream `stream_key`,
/// using a connection from `get_con`, and returns the server's reply decoded
/// as a `usize`.
///
/// # Errors
///
/// Returns the underlying `RedisError` when the command fails.
pub fn delete_stream_element(stream_key: &str, msg_id: String) -> Result<usize, RedisError> {
    let mut conn = get_con();
    let ids = [msg_id];
    conn.xdel(stream_key, &ids)
}