1use crate::error::Error;
3use actix::prelude::*;
4use hitbox_backend::{Backend, BackendError, Delete, DeleteStatus, Get, Lock, LockStatus, Set};
5use log::{debug, info};
6use redis::{aio::ConnectionManager, Client};
7
8pub struct RedisBackend {
16 connection: ConnectionManager,
17}
18
19impl RedisBackend {
20 pub async fn new() -> Result<RedisBackend, Error> {
32 Self::builder().build().await
33 }
34
35 pub fn builder() -> RedisBackendBuilder {
37 RedisBackendBuilder::default()
38 }
39}
40
41pub struct RedisBackendBuilder {
43 connection_info: String,
44}
45
46impl Default for RedisBackendBuilder {
47 fn default() -> Self {
48 Self {
49 connection_info: "redis://127.0.0.1/".to_owned(),
50 }
51 }
52}
53
54impl RedisBackendBuilder {
55 pub fn server(mut self, connection_info: String) -> Self {
57 self.connection_info = connection_info;
58 self
59 }
60
61 pub async fn build(&self) -> Result<RedisBackend, Error> {
63 let client = Client::open(self.connection_info.as_str())?;
64 let connection = client.get_tokio_connection_manager().await?;
65 Ok(RedisBackend { connection })
66 }
67}
68
69impl Backend for RedisBackend {
70 type Actor = Self;
71 type Context = Context<Self>;
72}
73
74impl Actor for RedisBackend {
76 type Context = Context<Self>;
77
78 fn started(&mut self, _: &mut Self::Context) {
79 info!("Cache actor started");
80 }
81}
82
83impl Handler<Get> for RedisBackend {
85 type Result = ResponseFuture<Result<Option<Vec<u8>>, BackendError>>;
86
87 fn handle(&mut self, msg: Get, _: &mut Self::Context) -> Self::Result {
88 let mut con = self.connection.clone();
89 let fut = async move {
90 redis::cmd("GET")
91 .arg(msg.key)
92 .query_async(&mut con)
93 .await
94 .map_err(Error::from)
95 .map_err(BackendError::from)
96 };
97 Box::pin(fut)
98 }
99}
100
101impl Handler<Set> for RedisBackend {
103 type Result = ResponseFuture<Result<String, BackendError>>;
104
105 fn handle(&mut self, msg: Set, _: &mut Self::Context) -> Self::Result {
106 let mut con = self.connection.clone();
107 Box::pin(async move {
108 let mut request = redis::cmd("SET");
109 request.arg(msg.key).arg(msg.value);
110 if let Some(ttl) = msg.ttl {
111 request.arg("EX").arg(ttl);
112 };
113 request
114 .query_async(&mut con)
115 .await
116 .map_err(Error::from)
117 .map_err(BackendError::from)
118 })
119 }
120}
121
122impl Handler<Delete> for RedisBackend {
124 type Result = ResponseFuture<Result<DeleteStatus, BackendError>>;
125
126 fn handle(&mut self, msg: Delete, _: &mut Self::Context) -> Self::Result {
127 let mut con = self.connection.clone();
128 Box::pin(async move {
129 redis::cmd("DEL")
130 .arg(msg.key)
131 .query_async(&mut con)
132 .await
133 .map(|res| {
134 if res > 0 {
135 DeleteStatus::Deleted(res)
136 } else {
137 DeleteStatus::Missing
138 }
139 })
140 .map_err(Error::from)
141 .map_err(BackendError::from)
142 })
143 }
144}
145
146impl Handler<Lock> for RedisBackend {
148 type Result = ResponseFuture<Result<LockStatus, BackendError>>;
149
150 fn handle(&mut self, msg: Lock, _: &mut Self::Context) -> Self::Result {
151 debug!("Redis Lock: {}", msg.key);
152 let mut con = self.connection.clone();
153 Box::pin(async move {
154 redis::cmd("SET")
155 .arg(format!("lock::{}", msg.key))
156 .arg("")
157 .arg("NX")
158 .arg("EX")
159 .arg(msg.ttl)
160 .query_async(&mut con)
161 .await
162 .map(|res: Option<String>| -> LockStatus {
163 if res.is_some() {
164 LockStatus::Acquired
165 } else {
166 LockStatus::Locked
167 }
168 })
169 .map_err(Error::from)
170 .map_err(BackendError::from)
171 })
172 }
173}