momento_functions_host/
redis.rs

1//! Host interfaces for working with redis or valkey
2
3use momento_functions_wit::host::momento::host;
4
5use crate::{
6    FunctionResult,
7    encoding::{Encode, Extract},
8};
9
10/// Redis client for Function host interfaces.
11///
12/// This client is used to connect to a Redis or Valkey instance that you own.
13///
14/// This client uses Momento's host-provided connection cache, which keeps connections
15/// alive across invocations of your Function for reuse.
16pub struct RedisClient {
17    client: host::redis::Client,
18}
19
20impl RedisClient {
21    /// Create a new Redis client from a connection string.
22    ///
23    /// Note that the redis/valkey you are connecting to must be accessible to the
24    /// Functions host environment. If you are using public Momento endpoints, you
25    /// will only be able to connect to public caches - that is not a reasonable
26    /// way to set up a production environment. If you want to use a private cache
27    /// for a real application, please get in touch with support@momentohq.com
28    ///
29    /// ```rust
30    /// # use momento_functions_host::redis::RedisClient;
31    /// # use momento_functions_host::FunctionResult;
32    /// # fn f() -> FunctionResult<()> {
33    /// let client = RedisClient::new("valkey://my.valkey.instance:6379");
34    /// # Ok(())
35    /// # }
36    /// ```
37    pub fn new(connection_string: impl Into<String>) -> Self {
38        Self {
39            client: host::redis::Client::new(&host::redis::RedisConnectionType::BasicConnection(
40                connection_string.into(),
41            )),
42        }
43    }
44
45    /// Get a value from Redis by key.
46    pub fn get<T: Extract>(&self, key: impl Into<Vec<u8>>) -> FunctionResult<Option<T>> {
47        let response = self.client.pipe(&[host::redis::Command {
48            command: "get".to_string(),
49            arguments: vec![key.into()],
50        }])?;
51        Ok(match response.next() {
52            Some(value) => {
53                log::debug!("Redis get response: {value:?}");
54                match value {
55                    host::redis::Value::Nil => None,
56                    host::redis::Value::Int(i) => Some(T::extract(i.to_string().into_bytes())?),
57                    host::redis::Value::Data(value) => Some(T::extract(value)?),
58                    host::redis::Value::Bulk(response_stream) => {
59                        return Err(crate::Error::MessageError(format!(
60                            "Bulk response not supported in this context {response_stream:?}"
61                        )));
62                    }
63                    host::redis::Value::Status(status) => {
64                        return Err(crate::Error::MessageError(status));
65                    }
66                    host::redis::Value::Okay => {
67                        return Err(crate::Error::MessageError(
68                            "Okay response not supported in this context".into(),
69                        ));
70                    }
71                }
72            }
73            None => None,
74        })
75    }
76
77    /// Set a value in Redis with a key.
78    pub fn set<T: Encode>(&self, key: impl Into<Vec<u8>>, value: T) -> FunctionResult<()> {
79        let serialized_value = value.try_serialize()?.into();
80        let response = self.client.pipe(&[host::redis::Command {
81            command: "set".to_string(),
82            arguments: vec![key.into(), serialized_value],
83        }])?;
84        match response.next() {
85            Some(host::redis::Value::Okay) => Ok(()),
86            Some(host::redis::Value::Status(status)) => Err(crate::Error::MessageError(status)),
87            e => Err(crate::Error::MessageError(format!(
88                "unexpected response: {e:?}"
89            ))),
90        }
91    }
92
93    /// Delete a key from Redis.
94    pub fn delete(&self, key: impl Into<Vec<u8>>) -> FunctionResult<()> {
95        let response = self.client.pipe(&[host::redis::Command {
96            command: "del".to_string(),
97            arguments: vec![key.into()],
98        }])?;
99        match response.next() {
100            Some(host::redis::Value::Int(count)) => {
101                log::debug!("delete response: {count}");
102                Ok(())
103            }
104            Some(host::redis::Value::Status(status)) => Err(crate::Error::MessageError(status)),
105            e => Err(crate::Error::MessageError(format!(
106                "unexpected response: {e:?}"
107            ))),
108        }
109    }
110
111    /// Execute redis commands
112    ///
113    /// ```rust
114    /// # use momento_functions_host::redis::{RedisClient, Command};
115    /// # use momento_functions_host::FunctionResult;
116    /// # fn f(client: &RedisClient) -> FunctionResult<()> {
117    /// let response_stream = client.pipe(vec![
118    ///     Command::builder().set("my_key", "my_value")?.build(),
119    ///     Command::builder().get("my_key").build(),
120    ///     Command::builder()
121    ///         .any("FT.SEARCH")
122    ///         .arg(r#"test_index "*=>[KNN 5 @vector_a $query_vector]" PARAMS 2 query_vector "\xcd\xccL?\x00\x00\x00\x00\x00\x00\x00\x00""#)
123    ///         .build(),
124    /// ]);
125    ///
126    /// #     Ok(())
127    /// # }
128    /// ```
129    pub fn pipe(&self, commands: Vec<Command>) -> FunctionResult<ResponseStream> {
130        let response_stream = self.client.pipe(
131            &commands
132                .into_iter()
133                .map(|Command { command, arguments }| host::redis::Command { command, arguments })
134                .collect::<Vec<_>>(),
135        )?;
136
137        Ok(ResponseStream {
138            inner: response_stream,
139        })
140    }
141}
142
143impl From<host::redis::RedisError> for crate::Error {
144    fn from(e: host::redis::RedisError) -> Self {
145        crate::Error::MessageError(format!("Redis error: {e:?}"))
146    }
147}
148
/// A raw redis command: a command name plus its already-encoded arguments.
#[derive(Debug, Clone)]
pub struct Command {
    /// Name of the command, for example `get` or `set`.
    command: String,
    /// Arguments in the order they are sent, each as raw bytes.
    arguments: Vec<Vec<u8>>,
}
155impl Command {
156    /// A builder for creating redis commands
157    pub fn builder() -> CommandBuilder<SelectCommand> {
158        CommandBuilder {
159            command: SelectCommand,
160        }
161    }
162}
163
164/// A stream of responses from a redis pipe
165#[derive(Debug)]
166pub struct ResponseStream {
167    inner: host::redis::ResponseStream,
168}
169impl ResponseStream {
170    /// Get the next response from the stream
171    fn next_value(&mut self) -> Option<RedisValue> {
172        let next = self.inner.next();
173        next.map(|value| match value {
174            host::redis::Value::Nil => RedisValue::Nil,
175            host::redis::Value::Int(i) => RedisValue::Int(i),
176            host::redis::Value::Data(data) => RedisValue::Data(data),
177            host::redis::Value::Bulk(response_stream) => RedisValue::Bulk(ResponseStream {
178                inner: response_stream,
179            }),
180            host::redis::Value::Status(status) => RedisValue::Status(status),
181            host::redis::Value::Okay => RedisValue::Okay,
182        })
183    }
184}
185impl Iterator for ResponseStream {
186    type Item = RedisValue;
187
188    fn next(&mut self) -> Option<Self::Item> {
189        self.next_value()
190    }
191}
192
193/// A value returned from a redis command
194#[derive(Debug)]
195pub enum RedisValue {
196    /// An explicit nil value was returned from the server
197    Nil,
198    /// An explicit integer value was returned from the server
199    Int(i64),
200    /// A data blob was returned from the server
201    Data(Vec<u8>),
202    /// A bulk response stream was returned from the server.
203    /// This is used for commands that return multiple values. You iterate over it
204    /// to get each individual value.
205    Bulk(ResponseStream),
206    /// A status message was returned from the server
207    Status(String),
208    /// An okay response was returned from the server
209    Okay,
210}
211impl RedisValue {
212    /// try to extract the value as a specific type
213    ///
214    /// Only works for Data responses.
215    pub fn extract<T: Extract>(self) -> FunctionResult<T> {
216        match self {
217            RedisValue::Data(data) => T::extract(data),
218            v => Err(crate::Error::MessageError(format!(
219                "cannot extract value from {v:?}"
220            ))),
221        }
222    }
223}
224
/// A builder for creating raw redis commands.
///
/// `State` records how far the build has progressed. A fresh builder starts in
/// the `SelectCommand` state (no command chosen yet) and moves to a
/// command-specific state once a command has been selected.
///
/// The type parameter is deliberately not named after any concrete state type,
/// so the struct reads as generic rather than as tied to one state.
#[derive(Debug, Clone)]
pub struct CommandBuilder<State> {
    /// Current build state, holding whatever has been configured so far.
    command: State,
}
230
/// Initial builder state: no command has been selected yet.
///
/// Derives `Debug` and `Clone` like the other builder states, so a builder in
/// this state can also be debug-printed and cloned.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct SelectCommand;
233impl CommandBuilder<SelectCommand> {
234    /// Set the command to execute
235    pub fn get(self, key: impl Into<Vec<u8>>) -> CommandBuilder<Get> {
236        CommandBuilder {
237            command: Get { key: key.into() },
238        }
239    }
240
241    /// Set the command to execute
242    pub fn set<T: Encode>(
243        self,
244        key: impl Into<Vec<u8>>,
245        value: T,
246    ) -> FunctionResult<CommandBuilder<Set>> {
247        Ok(CommandBuilder {
248            command: Set {
249                key: key.into(),
250                value: value.try_serialize()?.into(),
251                existence_check: Default::default(),
252            },
253        })
254    }
255
256    /// Set the command to execute
257    pub fn any(self, command: impl Into<String>) -> CommandBuilder<Any> {
258        CommandBuilder {
259            command: Any {
260                command: command.into(),
261                arguments: Default::default(),
262            },
263        }
264    }
265}
266
/// Builder state for a `get` command.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct Get {
    /// Key to look up, as raw bytes.
    key: Vec<u8>,
}
272impl CommandBuilder<Get> {
273    /// Finalize the command
274    pub fn build(self) -> Command {
275        Command {
276            command: "get".to_string(),
277            arguments: vec![self.command.key],
278        }
279    }
280}
281
/// Builder state for a `set` command.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct Set {
    /// Key to write, as raw bytes.
    key: Vec<u8>,
    /// Already-serialized value to store.
    value: Vec<u8>,
    /// Optional existence condition: `Some(true)` adds `XX`, `Some(false)` adds
    /// `NX`, and `None` adds no condition when the command is built.
    existence_check: Option<bool>,
}
289impl CommandBuilder<Set> {
290    /// Only set the value if the key does not already exist
291    pub fn if_not_exists(mut self) -> Self {
292        self.command.existence_check = Some(false);
293        self
294    }
295
296    /// Only set the value if the key already exists
297    pub fn if_exists(mut self) -> Self {
298        self.command.existence_check = Some(true);
299        self
300    }
301
302    /// Finalize the command
303    pub fn build(self) -> Command {
304        let mut arguments = vec![self.command.key, self.command.value];
305        if let Some(existence_check) = self.command.existence_check {
306            if existence_check {
307                arguments.push(b"XX".to_vec());
308            } else {
309                arguments.push(b"NX".to_vec());
310            }
311        }
312        Command {
313            command: "set".to_string(),
314            arguments,
315        }
316    }
317}
318
/// Builder state for an arbitrary, caller-named command.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct Any {
    /// Name of the command to send.
    command: String,
    /// Arguments collected so far, in the order they were added.
    arguments: Vec<Vec<u8>>,
}
325impl CommandBuilder<Any> {
326    /// Add an argument to the command
327    pub fn value<T: Encode>(mut self, arg: T) -> FunctionResult<Self> {
328        self.command.arguments.push(arg.try_serialize()?.into());
329        Ok(self)
330    }
331
332    /// Add a pre-encoded argument to the command
333    pub fn arg(mut self, arg: impl Into<Vec<u8>>) -> Self {
334        self.command.arguments.push(arg.into());
335        self
336    }
337
338    /// Finalize the command
339    pub fn build(self) -> Command {
340        Command {
341            command: self.command.command,
342            arguments: self.command.arguments,
343        }
344    }
345}