momento_functions_host/
redis.rs1use momento_functions_wit::host::momento::host;
4
5use crate::{
6 FunctionResult,
7 encoding::{Encode, Extract},
8};
9
10pub struct RedisClient {
17 client: host::redis::Client,
18}
19
20impl RedisClient {
21 pub fn new(connection_string: impl Into<String>) -> Self {
38 Self {
39 client: host::redis::Client::new(&host::redis::RedisConnectionType::BasicConnection(
40 connection_string.into(),
41 )),
42 }
43 }
44
45 pub fn get<T: Extract>(&self, key: impl Into<Vec<u8>>) -> FunctionResult<Option<T>> {
47 let response = self.client.pipe(&[host::redis::Command {
48 command: "get".to_string(),
49 arguments: vec![key.into()],
50 }])?;
51 Ok(match response.next() {
52 Some(value) => {
53 log::debug!("Redis get response: {value:?}");
54 match value {
55 host::redis::Value::Nil => None,
56 host::redis::Value::Int(i) => Some(T::extract(i.to_string().into_bytes())?),
57 host::redis::Value::Data(value) => Some(T::extract(value)?),
58 host::redis::Value::Bulk(response_stream) => {
59 return Err(crate::Error::MessageError(format!(
60 "Bulk response not supported in this context {response_stream:?}"
61 )));
62 }
63 host::redis::Value::Status(status) => {
64 return Err(crate::Error::MessageError(status));
65 }
66 host::redis::Value::Okay => {
67 return Err(crate::Error::MessageError(
68 "Okay response not supported in this context".into(),
69 ));
70 }
71 }
72 }
73 None => None,
74 })
75 }
76
77 pub fn set<T: Encode>(&self, key: impl Into<Vec<u8>>, value: T) -> FunctionResult<()> {
79 let serialized_value = value.try_serialize()?.into();
80 let response = self.client.pipe(&[host::redis::Command {
81 command: "set".to_string(),
82 arguments: vec![key.into(), serialized_value],
83 }])?;
84 match response.next() {
85 Some(host::redis::Value::Okay) => Ok(()),
86 Some(host::redis::Value::Status(status)) => Err(crate::Error::MessageError(status)),
87 e => Err(crate::Error::MessageError(format!(
88 "unexpected response: {e:?}"
89 ))),
90 }
91 }
92
93 pub fn delete(&self, key: impl Into<Vec<u8>>) -> FunctionResult<()> {
95 let response = self.client.pipe(&[host::redis::Command {
96 command: "del".to_string(),
97 arguments: vec![key.into()],
98 }])?;
99 match response.next() {
100 Some(host::redis::Value::Int(count)) => {
101 log::debug!("delete response: {count}");
102 Ok(())
103 }
104 Some(host::redis::Value::Status(status)) => Err(crate::Error::MessageError(status)),
105 e => Err(crate::Error::MessageError(format!(
106 "unexpected response: {e:?}"
107 ))),
108 }
109 }
110
111 pub fn pipe(&self, commands: Vec<Command>) -> FunctionResult<ResponseStream> {
130 let response_stream = self.client.pipe(
131 &commands
132 .into_iter()
133 .map(|Command { command, arguments }| host::redis::Command { command, arguments })
134 .collect::<Vec<_>>(),
135 )?;
136
137 Ok(ResponseStream {
138 inner: response_stream,
139 })
140 }
141}
142
143impl From<host::redis::RedisError> for crate::Error {
144 fn from(e: host::redis::RedisError) -> Self {
145 crate::Error::MessageError(format!("Redis error: {e:?}"))
146 }
147}
148
149#[derive(Debug, Clone)]
151pub struct Command {
152 command: String,
153 arguments: Vec<Vec<u8>>,
154}
155impl Command {
156 pub fn builder() -> CommandBuilder<SelectCommand> {
158 CommandBuilder {
159 command: SelectCommand,
160 }
161 }
162}
163
164#[derive(Debug)]
166pub struct ResponseStream {
167 inner: host::redis::ResponseStream,
168}
169impl ResponseStream {
170 fn next_value(&mut self) -> Option<RedisValue> {
172 let next = self.inner.next();
173 next.map(|value| match value {
174 host::redis::Value::Nil => RedisValue::Nil,
175 host::redis::Value::Int(i) => RedisValue::Int(i),
176 host::redis::Value::Data(data) => RedisValue::Data(data),
177 host::redis::Value::Bulk(response_stream) => RedisValue::Bulk(ResponseStream {
178 inner: response_stream,
179 }),
180 host::redis::Value::Status(status) => RedisValue::Status(status),
181 host::redis::Value::Okay => RedisValue::Okay,
182 })
183 }
184}
185impl Iterator for ResponseStream {
186 type Item = RedisValue;
187
188 fn next(&mut self) -> Option<Self::Item> {
189 self.next_value()
190 }
191}
192
193#[derive(Debug)]
195pub enum RedisValue {
196 Nil,
198 Int(i64),
200 Data(Vec<u8>),
202 Bulk(ResponseStream),
206 Status(String),
208 Okay,
210}
211impl RedisValue {
212 pub fn extract<T: Extract>(self) -> FunctionResult<T> {
216 match self {
217 RedisValue::Data(data) => T::extract(data),
218 v => Err(crate::Error::MessageError(format!(
219 "cannot extract value from {v:?}"
220 ))),
221 }
222 }
223}
224
225#[derive(Debug, Clone)]
227pub struct CommandBuilder<SelectCommand> {
228 command: SelectCommand,
229}
230
231#[doc(hidden)]
232pub struct SelectCommand;
233impl CommandBuilder<SelectCommand> {
234 pub fn get(self, key: impl Into<Vec<u8>>) -> CommandBuilder<Get> {
236 CommandBuilder {
237 command: Get { key: key.into() },
238 }
239 }
240
241 pub fn set<T: Encode>(
243 self,
244 key: impl Into<Vec<u8>>,
245 value: T,
246 ) -> FunctionResult<CommandBuilder<Set>> {
247 Ok(CommandBuilder {
248 command: Set {
249 key: key.into(),
250 value: value.try_serialize()?.into(),
251 existence_check: Default::default(),
252 },
253 })
254 }
255
256 pub fn any(self, command: impl Into<String>) -> CommandBuilder<Any> {
258 CommandBuilder {
259 command: Any {
260 command: command.into(),
261 arguments: Default::default(),
262 },
263 }
264 }
265}
266
267#[doc(hidden)]
268#[derive(Debug, Clone)]
269pub struct Get {
270 key: Vec<u8>,
271}
272impl CommandBuilder<Get> {
273 pub fn build(self) -> Command {
275 Command {
276 command: "get".to_string(),
277 arguments: vec![self.command.key],
278 }
279 }
280}
281
282#[doc(hidden)]
283#[derive(Debug, Clone)]
284pub struct Set {
285 key: Vec<u8>,
286 value: Vec<u8>,
287 existence_check: Option<bool>,
288}
289impl CommandBuilder<Set> {
290 pub fn if_not_exists(mut self) -> Self {
292 self.command.existence_check = Some(false);
293 self
294 }
295
296 pub fn if_exists(mut self) -> Self {
298 self.command.existence_check = Some(true);
299 self
300 }
301
302 pub fn build(self) -> Command {
304 let mut arguments = vec![self.command.key, self.command.value];
305 if let Some(existence_check) = self.command.existence_check {
306 if existence_check {
307 arguments.push(b"XX".to_vec());
308 } else {
309 arguments.push(b"NX".to_vec());
310 }
311 }
312 Command {
313 command: "set".to_string(),
314 arguments,
315 }
316 }
317}
318
319#[doc(hidden)]
320#[derive(Debug, Clone)]
321pub struct Any {
322 command: String,
323 arguments: Vec<Vec<u8>>,
324}
325impl CommandBuilder<Any> {
326 pub fn value<T: Encode>(mut self, arg: T) -> FunctionResult<Self> {
328 self.command.arguments.push(arg.try_serialize()?.into());
329 Ok(self)
330 }
331
332 pub fn arg(mut self, arg: impl Into<Vec<u8>>) -> Self {
334 self.command.arguments.push(arg.into());
335 self
336 }
337
338 pub fn build(self) -> Command {
340 Command {
341 command: self.command.command,
342 arguments: self.command.arguments,
343 }
344 }
345}