redis_grpc/
conn.rs

1use redis::{AsyncCommands, RedisResult, Value};
2use tokio_stream::StreamExt;
3use tonic::IntoRequest;
4
5#[tonic::async_trait]
6pub trait MessageConsumer {
7    async fn consume(&self, msg: redis::Msg) -> anyhow::Result<()>;
8}
9
10pub struct RedisFacade {
11    pub conn: redis::aio::MultiplexedConnection,
12    pub url: String,
13}
14
15impl RedisFacade {
16    pub async fn new(url: &str) -> Self {
17        let client = redis::Client::open(url)
18            .expect(format!("Failed to establish redis connection at {}", url).as_str());
19        let conn = client
20            .get_multiplexed_async_connection()
21            .await
22            .expect("Failed to get multiplexed async connection.");
23        RedisFacade {
24            conn,
25            url: url.to_string(),
26        }
27    }
28
29    fn get_conn(&self) -> redis::aio::MultiplexedConnection {
30        self.conn.clone()
31    }
32
33    fn format_redis_value(value: redis::Value) -> String {
34        match value {
35            Value::Nil => String::from("(nil)"),
36            Value::Int(value) => format!("{}", value),
37            Value::Data(data) => String::from_utf8(data).unwrap(),
38            Value::Bulk(data) => format!("{:?}", data),
39            Value::Status(status) => format!("{}", status),
40            Value::Okay => String::from("OK"),
41        }
42    }
43
44    pub async fn command(&self, command: &str) -> anyhow::Result<String> {
45        let mut conn = self.get_conn();
46        let args = command.split(" ").collect::<Vec<&str>>();
47        let mut cmd = redis::cmd(args.get(0).unwrap());
48        for arg in args[1..].iter() {
49            cmd.arg(arg);
50        }
51        let msg = cmd
52            .query_async::<redis::aio::MultiplexedConnection, redis::Value>(&mut conn)
53            .await?;
54        Ok(RedisFacade::format_redis_value(msg))
55    }
56
57    pub async fn keys(&self, pattern: &str) -> anyhow::Result<Vec<String>> {
58        let mut conn = self.get_conn();
59        let response = conn.keys::<&str, Vec<String>>(pattern).await?;
60        Ok(response)
61    }
62
63    pub async fn set(&self, k: &str, v: &str) -> anyhow::Result<String> {
64        let mut conn = self.get_conn();
65        let response = conn.set::<&str, &str, Value>(k, v).await?;
66        Ok(RedisFacade::format_redis_value(response))
67    }
68
69    pub async fn get(&self, k: &str) -> anyhow::Result<String> {
70        let mut conn = self.get_conn();
71        let response = conn.get::<&str, Value>(k).await?;
72        Ok(RedisFacade::format_redis_value(response))
73    }
74
75    pub async fn subscribe_channels<T>(
76        url: &str,
77        channels: &Vec<String>,
78        consumer: &T,
79    ) -> anyhow::Result<()>
80    where
81        T: MessageConsumer,
82    {
83        let client = redis::Client::open(url)?;
84        let conn = client.get_async_connection().await?;
85        let mut pubsub = conn.into_pubsub();
86        for channel in channels {
87            info!("subscribe channel: {}", channel);
88            pubsub.subscribe(channel).await?;
89        }
90        let mut stream = pubsub.on_message();
91        while let Some(msg) = stream.next().await {
92            consumer.consume(msg).await?
93        }
94        Ok(())
95    }
96
97    pub async fn publish(&self, channel: &str, message: &str) -> anyhow::Result<i64> {
98        let mut conn = self.conn.clone();
99        let result = conn.publish::<&str, &str, i64>(channel, message).await;
100        match result {
101            Ok(n) => Ok(n),
102            Err(err) => Err(anyhow::anyhow!("{}", err)),
103        }
104    }
105}