1use redis::{AsyncCommands, RedisResult, Value};
2use tokio_stream::StreamExt;
3use tonic::IntoRequest;
4
5#[tonic::async_trait]
6pub trait MessageConsumer {
7 async fn consume(&self, msg: redis::Msg) -> anyhow::Result<()>;
8}
9
10pub struct RedisFacade {
11 pub conn: redis::aio::MultiplexedConnection,
12 pub url: String,
13}
14
15impl RedisFacade {
16 pub async fn new(url: &str) -> Self {
17 let client = redis::Client::open(url)
18 .expect(format!("Failed to establish redis connection at {}", url).as_str());
19 let conn = client
20 .get_multiplexed_async_connection()
21 .await
22 .expect("Failed to get multiplexed async connection.");
23 RedisFacade {
24 conn,
25 url: url.to_string(),
26 }
27 }
28
29 fn get_conn(&self) -> redis::aio::MultiplexedConnection {
30 self.conn.clone()
31 }
32
33 fn format_redis_value(value: redis::Value) -> String {
34 match value {
35 Value::Nil => String::from("(nil)"),
36 Value::Int(value) => format!("{}", value),
37 Value::Data(data) => String::from_utf8(data).unwrap(),
38 Value::Bulk(data) => format!("{:?}", data),
39 Value::Status(status) => format!("{}", status),
40 Value::Okay => String::from("OK"),
41 }
42 }
43
44 pub async fn command(&self, command: &str) -> anyhow::Result<String> {
45 let mut conn = self.get_conn();
46 let args = command.split(" ").collect::<Vec<&str>>();
47 let mut cmd = redis::cmd(args.get(0).unwrap());
48 for arg in args[1..].iter() {
49 cmd.arg(arg);
50 }
51 let msg = cmd
52 .query_async::<redis::aio::MultiplexedConnection, redis::Value>(&mut conn)
53 .await?;
54 Ok(RedisFacade::format_redis_value(msg))
55 }
56
57 pub async fn keys(&self, pattern: &str) -> anyhow::Result<Vec<String>> {
58 let mut conn = self.get_conn();
59 let response = conn.keys::<&str, Vec<String>>(pattern).await?;
60 Ok(response)
61 }
62
63 pub async fn set(&self, k: &str, v: &str) -> anyhow::Result<String> {
64 let mut conn = self.get_conn();
65 let response = conn.set::<&str, &str, Value>(k, v).await?;
66 Ok(RedisFacade::format_redis_value(response))
67 }
68
69 pub async fn get(&self, k: &str) -> anyhow::Result<String> {
70 let mut conn = self.get_conn();
71 let response = conn.get::<&str, Value>(k).await?;
72 Ok(RedisFacade::format_redis_value(response))
73 }
74
75 pub async fn subscribe_channels<T>(
76 url: &str,
77 channels: &Vec<String>,
78 consumer: &T,
79 ) -> anyhow::Result<()>
80 where
81 T: MessageConsumer,
82 {
83 let client = redis::Client::open(url)?;
84 let conn = client.get_async_connection().await?;
85 let mut pubsub = conn.into_pubsub();
86 for channel in channels {
87 info!("subscribe channel: {}", channel);
88 pubsub.subscribe(channel).await?;
89 }
90 let mut stream = pubsub.on_message();
91 while let Some(msg) = stream.next().await {
92 consumer.consume(msg).await?
93 }
94 Ok(())
95 }
96
97 pub async fn publish(&self, channel: &str, message: &str) -> anyhow::Result<i64> {
98 let mut conn = self.conn.clone();
99 let result = conn.publish::<&str, &str, i64>(channel, message).await;
100 match result {
101 Ok(n) => Ok(n),
102 Err(err) => Err(anyhow::anyhow!("{}", err)),
103 }
104 }
105}