momento_functions_host/
redis.rs

1//! Host interfaces for working with redis or valkey
2
3use momento_functions_wit::host::momento::host;
4
5use crate::encoding::{Encode, EncodeError, Extract, ExtractError};
6use crate::redis::RedisSetError::UnexpectedValueResponse;
7
8/// Redis client for Function host interfaces.
9///
10/// This client is used to connect to a Redis or Valkey instance that you own.
11///
12/// This client uses Momento's host-provided connection cache, which keeps connections
13/// alive across invocations of your Function for reuse.
14pub struct RedisClient {
15    client: host::redis::Client,
16}
17
18/// An error occurred while getting a redis value.
19#[derive(Debug, thiserror::Error)]
20pub enum RedisGetError<E: ExtractError> {
21    /// An error occurred while calling the host redis function.
22    #[error(transparent)]
23    RedisError(#[from] host::redis::RedisError),
24    /// An error occurred when trying to extract the value using the provided implementation.
25    #[error("Failed to extract value.")]
26    ExtractFailed {
27        /// The underlying extraction error.
28        cause: E,
29    },
30    /// Redis returned a status message.
31    #[error("Status message returned from redis: {status}")]
32    StatusMessage {
33        /// The message from Redis.
34        status: String,
35    },
36    /// A bulk response was returned by Redis.
37    #[error("Unexpected bulk response: {response:?}")]
38    UnexpectedBulkResponse {
39        /// The bulk response.
40        response: host::redis::ResponseStream,
41    },
42    /// An 'okay' response was returned by Redis. (Get expects a value response)
43    #[error("Unexpected okay response.")]
44    UnexpectedOkayResponse,
45}
46
47/// An error occurred while setting a redis value.
48#[derive(Debug, thiserror::Error)]
49pub enum RedisSetError<E: EncodeError> {
50    /// An error occurred while calling the host redis function.
51    #[error(transparent)]
52    RedisError(#[from] host::redis::RedisError),
53    /// An error occurred when encoding the provided value.
54    #[error("Failed to encode value.")]
55    EncodeError {
56        /// The underlying encoding error.
57        cause: E,
58    },
59    /// Redis returned a status message.
60    #[error("Status message returned from redis: {status}")]
61    StatusMessage {
62        /// The message from Redis.
63        status: String,
64    },
65    /// Redis returned a value in response. (Set expects an "okay" response)
66    #[error("Unexpected value response: {value:?}")]
67    UnexpectedValueResponse {
68        /// The value Redis returned.
69        value: Option<host::redis::Value>,
70    },
71}
72
73/// An error occurred while deleting a redis value.
74#[derive(Debug, thiserror::Error)]
75pub enum RedisDeleteError {
76    /// An error occurred while calling the host redis function.
77    #[error(transparent)]
78    RedisError(#[from] host::redis::RedisError),
79    /// Redis returned a status message.
80    #[error("Status message returned from redis: {status}")]
81    StatusMessage {
82        /// The message from Redis.
83        status: String,
84    },
85    /// Redis returned a value in response. (Delete expects an "okay" response)
86    #[error("Unexpected value response: {value:?}")]
87    UnexpectedValueResponse {
88        /// The value Redis returned.
89        value: Option<host::redis::Value>,
90    },
91}
92
93impl RedisClient {
94    /// Create a new Redis client from a connection string.
95    ///
96    /// Note that the redis/valkey you are connecting to must be accessible to the
97    /// Functions host environment. If you are using public Momento endpoints, you
98    /// will only be able to connect to public caches - that is not a reasonable
99    /// way to set up a production environment. If you want to use a private cache
100    /// for a real application, please get in touch with support@momentohq.com
101    ///
102    /// ```rust
103    /// # use momento_functions_host::redis::RedisClient;
104    /// # fn f() {
105    /// let client = RedisClient::new("valkey://my.valkey.instance:6379");
106    /// # }
107    /// ```
108    pub fn new(connection_string: impl Into<String>) -> Self {
109        Self {
110            client: host::redis::Client::new(&host::redis::RedisConnectionType::BasicConnection(
111                connection_string.into(),
112            )),
113        }
114    }
115
116    /// Get a value from Redis by key.
117    pub fn get<T: Extract>(
118        &self,
119        key: impl Into<Vec<u8>>,
120    ) -> Result<Option<T>, RedisGetError<T::Error>> {
121        let response = self.client.pipe(&[host::redis::Command {
122            command: "get".to_string(),
123            arguments: vec![key.into()],
124        }])?;
125        Ok(match response.next() {
126            Some(value) => {
127                log::debug!("Redis get response: {value:?}");
128                match value {
129                    host::redis::Value::Nil => None,
130                    host::redis::Value::Int(i) => Some(
131                        T::extract(i.to_string().into_bytes())
132                            .map_err(|e| RedisGetError::ExtractFailed { cause: e })?,
133                    ),
134                    host::redis::Value::Data(value) => Some(
135                        T::extract(value).map_err(|e| RedisGetError::ExtractFailed { cause: e })?,
136                    ),
137                    host::redis::Value::Bulk(response_stream) => {
138                        return Err(RedisGetError::UnexpectedBulkResponse {
139                            response: response_stream,
140                        });
141                    }
142                    host::redis::Value::Status(status) => {
143                        return Err(RedisGetError::StatusMessage { status });
144                    }
145                    host::redis::Value::Okay => {
146                        return Err(RedisGetError::UnexpectedOkayResponse);
147                    }
148                }
149            }
150            None => None,
151        })
152    }
153
154    /// Set a value in Redis with a key.
155    pub fn set<T: Encode>(
156        &self,
157        key: impl Into<Vec<u8>>,
158        value: T,
159    ) -> Result<(), RedisSetError<T::Error>> {
160        let serialized_value = value
161            .try_serialize()
162            .map_err(|e| RedisSetError::EncodeError { cause: e })?
163            .into();
164        let response = self.client.pipe(&[host::redis::Command {
165            command: "set".to_string(),
166            arguments: vec![key.into(), serialized_value],
167        }])?;
168        match response.next() {
169            Some(host::redis::Value::Okay) => Ok(()),
170            Some(host::redis::Value::Status(status)) => {
171                Err(RedisSetError::StatusMessage { status })
172            }
173            e => Err(UnexpectedValueResponse { value: e }),
174        }
175    }
176
177    /// Delete a key from Redis.
178    pub fn delete(&self, key: impl Into<Vec<u8>>) -> Result<(), RedisDeleteError> {
179        let response = self.client.pipe(&[host::redis::Command {
180            command: "del".to_string(),
181            arguments: vec![key.into()],
182        }])?;
183        match response.next() {
184            Some(host::redis::Value::Int(count)) => {
185                log::debug!("delete response: {count}");
186                Ok(())
187            }
188            Some(host::redis::Value::Status(status)) => {
189                Err(RedisDeleteError::StatusMessage { status })
190            }
191            e => Err(RedisDeleteError::UnexpectedValueResponse { value: e }),
192        }
193    }
194
195    /// Execute redis commands
196    ///
197    /// ```rust
198    /// # use momento_functions_host::redis::{RedisClient, Command};
199    /// # use momento_functions_wit::host::momento::host;
200    /// # fn f(client: &RedisClient) -> Result<(), host::redis::RedisError> {
201    /// let response_stream = client.pipe(vec![
202    ///     Command::builder().set("my_key", "my_value")?.build(),
203    ///     Command::builder().get("my_key").build(),
204    ///     Command::builder()
205    ///         .any("FT.SEARCH")
206    ///         .arg(r#"test_index "*=>[KNN 5 @vector_a $query_vector]" PARAMS 2 query_vector "\xcd\xccL?\x00\x00\x00\x00\x00\x00\x00\x00""#)
207    ///         .build(),
208    /// ]);
209    ///
210    /// #     Ok(())
211    /// # }
212    /// ```
213    pub fn pipe(&self, commands: Vec<Command>) -> Result<ResponseStream, host::redis::RedisError> {
214        let response_stream = self.client.pipe(
215            &commands
216                .into_iter()
217                .map(|Command { command, arguments }| host::redis::Command { command, arguments })
218                .collect::<Vec<_>>(),
219        )?;
220
221        Ok(ResponseStream {
222            inner: response_stream,
223        })
224    }
225}
226
/// A raw redis command: a command name plus its byte-string arguments.
#[derive(Debug, Clone)]
pub struct Command {
    /// Name of the command, e.g. `"get"` or `"set"`.
    command: String,
    /// Arguments in the order they are sent, each as raw bytes.
    arguments: Vec<Vec<u8>>,
}
233impl Command {
234    /// A builder for creating redis commands
235    pub fn builder() -> CommandBuilder<SelectCommand> {
236        CommandBuilder {
237            command: SelectCommand,
238        }
239    }
240}
241
242/// A stream of responses from a redis pipe
243#[derive(Debug)]
244pub struct ResponseStream {
245    inner: host::redis::ResponseStream,
246}
247impl ResponseStream {
248    /// Get the next response from the stream
249    fn next_value(&mut self) -> Option<RedisValue> {
250        let next = self.inner.next();
251        next.map(|value| match value {
252            host::redis::Value::Nil => RedisValue::Nil,
253            host::redis::Value::Int(i) => RedisValue::Int(i),
254            host::redis::Value::Data(data) => RedisValue::Data(data),
255            host::redis::Value::Bulk(response_stream) => RedisValue::Bulk(ResponseStream {
256                inner: response_stream,
257            }),
258            host::redis::Value::Status(status) => RedisValue::Status(status),
259            host::redis::Value::Okay => RedisValue::Okay,
260        })
261    }
262}
263impl Iterator for ResponseStream {
264    type Item = RedisValue;
265
266    fn next(&mut self) -> Option<Self::Item> {
267        self.next_value()
268    }
269}
270
271/// An error occurred when extracting a value.
272#[derive(Debug, thiserror::Error)]
273pub enum ValueError<E: ExtractError> {
274    /// The provided extraction implementation produced an error.
275    #[error("Failed to extract value.")]
276    ExtractFailed {
277        /// The underlying extraction error.
278        cause: E,
279    },
280    /// The redis value could not be extracted from.
281    #[error("Value cannot be extracted from {value:?}")]
282    UnextractableValue {
283        /// The unextractable value in question.
284        value: RedisValue,
285    },
286}
287
288/// A value returned from a redis command
289#[derive(Debug)]
290pub enum RedisValue {
291    /// An explicit nil value was returned from the server
292    Nil,
293    /// An explicit integer value was returned from the server
294    Int(i64),
295    /// A data blob was returned from the server
296    Data(Vec<u8>),
297    /// A bulk response stream was returned from the server.
298    /// This is used for commands that return multiple values. You iterate over it
299    /// to get each individual value.
300    Bulk(ResponseStream),
301    /// A status message was returned from the server
302    Status(String),
303    /// An okay response was returned from the server
304    Okay,
305}
306impl RedisValue {
307    /// try to extract the value as a specific type
308    ///
309    /// Only works for Data responses.
310    pub fn extract<T: Extract>(self) -> Result<T, ValueError<T::Error>> {
311        match self {
312            RedisValue::Data(data) => {
313                T::extract(data).map_err(|e| ValueError::ExtractFailed { cause: e })
314            }
315            v => Err(ValueError::UnextractableValue { value: v }),
316        }
317    }
318}
319
/// A builder for creating raw redis commands.
///
/// `State` is the builder's current stage: the stage types used in this file
/// are `SelectCommand` (nothing chosen yet), `Get`, `Set`, and `Any`. Each
/// stage gets its own `impl` block exposing only the methods valid for it.
#[derive(Debug, Clone)]
pub struct CommandBuilder<State> {
    /// Stage-specific data collected so far for the command being built.
    command: State,
}
325
/// Builder stage in which no command has been chosen yet.
///
/// Derives `Debug` and `Clone` like the other stage types, so that
/// `CommandBuilder<SelectCommand>` picks up the builder's derived impls too.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct SelectCommand;
328impl CommandBuilder<SelectCommand> {
329    /// Set the command to execute
330    pub fn get(self, key: impl Into<Vec<u8>>) -> CommandBuilder<Get> {
331        CommandBuilder {
332            command: Get { key: key.into() },
333        }
334    }
335
336    /// Set the command to execute
337    pub fn set<T: Encode>(
338        self,
339        key: impl Into<Vec<u8>>,
340        value: T,
341    ) -> Result<CommandBuilder<Set>, T::Error> {
342        Ok(CommandBuilder {
343            command: Set {
344                key: key.into(),
345                value: value.try_serialize()?.into(),
346                existence_check: Default::default(),
347            },
348        })
349    }
350
351    /// Set the command to execute
352    pub fn any(self, command: impl Into<String>) -> CommandBuilder<Any> {
353        CommandBuilder {
354            command: Any {
355                command: command.into(),
356                arguments: Default::default(),
357            },
358        }
359    }
360}
361
/// Builder stage for a `get` command.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct Get {
    /// The key to read, as raw bytes.
    key: Vec<u8>,
}
367impl CommandBuilder<Get> {
368    /// Finalize the command
369    pub fn build(self) -> Command {
370        Command {
371            command: "get".to_string(),
372            arguments: vec![self.command.key],
373        }
374    }
375}
376
/// Builder stage for a `set` command.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct Set {
    /// The key to write, as raw bytes.
    key: Vec<u8>,
    /// The already-serialized value to store.
    value: Vec<u8>,
    /// Optional condition on the key: `Some(true)` means only set when the key
    /// exists, `Some(false)` only when it does not, `None` means unconditional.
    existence_check: Option<bool>,
}
384impl CommandBuilder<Set> {
385    /// Only set the value if the key does not already exist
386    pub fn if_not_exists(mut self) -> Self {
387        self.command.existence_check = Some(false);
388        self
389    }
390
391    /// Only set the value if the key already exists
392    pub fn if_exists(mut self) -> Self {
393        self.command.existence_check = Some(true);
394        self
395    }
396
397    /// Finalize the command
398    pub fn build(self) -> Command {
399        let mut arguments = vec![self.command.key, self.command.value];
400        if let Some(existence_check) = self.command.existence_check {
401            if existence_check {
402                arguments.push(b"XX".to_vec());
403            } else {
404                arguments.push(b"NX".to_vec());
405            }
406        }
407        Command {
408            command: "set".to_string(),
409            arguments,
410        }
411    }
412}
413
/// Builder stage for an arbitrary, caller-named command.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct Any {
    /// Name of the command to send.
    command: String,
    /// Arguments accumulated so far, in the order they were added.
    arguments: Vec<Vec<u8>>,
}
420impl CommandBuilder<Any> {
421    /// Add an argument to the command
422    pub fn value<T: Encode>(mut self, arg: T) -> Result<Self, T::Error> {
423        self.command.arguments.push(arg.try_serialize()?.into());
424        Ok(self)
425    }
426
427    /// Add a pre-encoded argument to the command
428    pub fn arg(mut self, arg: impl Into<Vec<u8>>) -> Self {
429        self.command.arguments.push(arg.into());
430        self
431    }
432
433    /// Finalize the command
434    pub fn build(self) -> Command {
435        Command {
436            command: self.command.command,
437            arguments: self.command.arguments,
438        }
439    }
440}