momento_functions_host/
redis.rs

1//! Host interfaces for working with redis or valkey
2
3use momento_functions_wit::host::momento::host;
4
5use crate::{
6    FunctionResult,
7    encoding::{Encode, Extract},
8};
9
10/// Redis client for Function host interfaces.
11///
12/// This client is used to connect to a Redis or Valkey instance that you own.
13///
14/// This client uses Momento's host-provided connection cache, which keeps connections
15/// alive across invocations of your Function for reuse.
16pub struct RedisClient {
17    client: host::redis::Client,
18}
19
20impl RedisClient {
21    /// Create a new Redis client from a connection string.
22    ///
23    /// Note that the redis/valkey you are connecting to must be accessible to the
24    /// Functions host environment. If you are using public Momento endpoints, you
25    /// will only be able to connect to public caches - that is not a reasonable
26    /// way to set up a production environment. If you want to use a private cache
27    /// for a real application, please get in touch with support@momentohq.com
28    ///
29    /// ```rust
30    /// # use momento_functions_host::redis::RedisClient;
31    /// # fn f() -> FunctionResult<()> {
32    /// let client = RedisClient::new("valkey://my.valkey.instance:6379");
33    /// # Ok(())
34    /// # }
35    /// ```
36    pub fn new(connection_string: impl Into<String>) -> Self {
37        Self {
38            client: host::redis::Client::new(&host::redis::RedisConnectionType::BasicConnection(
39                connection_string.into(),
40            )),
41        }
42    }
43
44    /// Get a value from Redis by key.
45    pub fn get<T: Extract>(&self, key: impl Into<Vec<u8>>) -> FunctionResult<Option<T>> {
46        let response = self.client.pipe(&[host::redis::Command {
47            command: "get".to_string(),
48            arguments: vec![key.into()],
49        }])?;
50        Ok(match response.next() {
51            Some(value) => {
52                log::debug!("Redis get response: {value:?}");
53                match value {
54                    host::redis::Value::Nil => None,
55                    host::redis::Value::Int(i) => Some(T::extract(i.to_string().into_bytes())?),
56                    host::redis::Value::Data(value) => Some(T::extract(value)?),
57                    host::redis::Value::Bulk(response_stream) => {
58                        return Err(crate::Error::MessageError(format!(
59                            "Bulk response not supported in this context {response_stream:?}"
60                        )));
61                    }
62                    host::redis::Value::Status(status) => {
63                        return Err(crate::Error::MessageError(status));
64                    }
65                    host::redis::Value::Okay => {
66                        return Err(crate::Error::MessageError(
67                            "Okay response not supported in this context".into(),
68                        ));
69                    }
70                }
71            }
72            None => None,
73        })
74    }
75
76    /// Set a value in Redis with a key.
77    pub fn set<T: Encode>(&self, key: impl Into<Vec<u8>>, value: T) -> FunctionResult<()> {
78        let serialized_value = value.try_serialize()?.into();
79        let response = self.client.pipe(&[host::redis::Command {
80            command: "set".to_string(),
81            arguments: vec![key.into(), serialized_value],
82        }])?;
83        match response.next() {
84            Some(host::redis::Value::Okay) => Ok(()),
85            Some(host::redis::Value::Status(status)) => Err(crate::Error::MessageError(status)),
86            e => Err(crate::Error::MessageError(format!(
87                "unexpected response: {e:?}"
88            ))),
89        }
90    }
91
92    /// Delete a key from Redis.
93    pub fn delete(&self, key: impl Into<Vec<u8>>) -> FunctionResult<()> {
94        let response = self.client.pipe(&[host::redis::Command {
95            command: "del".to_string(),
96            arguments: vec![key.into()],
97        }])?;
98        match response.next() {
99            Some(host::redis::Value::Int(count)) => {
100                log::debug!("delete response: {count}");
101                Ok(())
102            }
103            Some(host::redis::Value::Status(status)) => Err(crate::Error::MessageError(status)),
104            e => Err(crate::Error::MessageError(format!(
105                "unexpected response: {e:?}"
106            ))),
107        }
108    }
109
110    /// Execute redis commands
111    ///
112    /// ```rust
113    /// # use momento_functions_host::redis::{RedisClient, Command};
114    /// # use momento_functions_host::FunctionResult;
115    /// # fn f(client: &RedisClient) -> FunctionResult<()> {
116    /// let response_stream = client.pipe(vec![
117    ///     Command::builder().set("my_key", "my_value")?.build(),
118    ///     Command::builder().get("my_key").build(),
119    ///     Command::builder()
120    ///         .any("FT.SEARCH")
121    ///         .arg(r#"test_index "*=>[KNN 5 @vector_a $query_vector]" PARAMS 2 query_vector "\xcd\xccL?\x00\x00\x00\x00\x00\x00\x00\x00""#)
122    ///         .build(),
123    /// ]);
124    ///
125    /// #    Ok(())
126    /// #}
127    /// ```
128    pub fn pipe(&self, commands: Vec<Command>) -> FunctionResult<ResponseStream> {
129        let response_stream = self.client.pipe(
130            &commands
131                .into_iter()
132                .map(|Command { command, arguments }| host::redis::Command { command, arguments })
133                .collect::<Vec<_>>(),
134        )?;
135
136        Ok(ResponseStream {
137            inner: response_stream,
138        })
139    }
140}
141
142impl From<host::redis::RedisError> for crate::Error {
143    fn from(e: host::redis::RedisError) -> Self {
144        crate::Error::MessageError(format!("Redis error: {e:?}"))
145    }
146}
147
148/// A raw redis command
149#[derive(Debug, Clone)]
150pub struct Command {
151    command: String,
152    arguments: Vec<Vec<u8>>,
153}
154impl Command {
155    /// A builder for creating redis commands
156    pub fn builder() -> CommandBuilder<SelectCommand> {
157        CommandBuilder {
158            command: SelectCommand,
159        }
160    }
161}
162
163/// A stream of responses from a redis pipe
164#[derive(Debug)]
165pub struct ResponseStream {
166    inner: host::redis::ResponseStream,
167}
168impl ResponseStream {
169    /// Get the next response from the stream
170    fn next_value(&mut self) -> Option<RedisValue> {
171        let next = self.inner.next();
172        next.map(|value| match value {
173            host::redis::Value::Nil => RedisValue::Nil,
174            host::redis::Value::Int(i) => RedisValue::Int(i),
175            host::redis::Value::Data(data) => RedisValue::Data(data),
176            host::redis::Value::Bulk(response_stream) => RedisValue::Bulk(ResponseStream {
177                inner: response_stream,
178            }),
179            host::redis::Value::Status(status) => RedisValue::Status(status),
180            host::redis::Value::Okay => RedisValue::Okay,
181        })
182    }
183}
184impl Iterator for ResponseStream {
185    type Item = RedisValue;
186
187    fn next(&mut self) -> Option<Self::Item> {
188        self.next_value()
189    }
190}
191
192/// A value returned from a redis command
193#[derive(Debug)]
194pub enum RedisValue {
195    /// An explicit nil value was returned from the server
196    Nil,
197    /// An explicit integer value was returned from the server
198    Int(i64),
199    /// A data blob was returned from the server
200    Data(Vec<u8>),
201    /// A bulk response stream was returned from the server.
202    /// This is used for commands that return multiple values. You iterate over it
203    /// to get each individual value.
204    Bulk(ResponseStream),
205    /// A status message was returned from the server
206    Status(String),
207    /// An okay response was returned from the server
208    Okay,
209}
210impl RedisValue {
211    /// try to extract the value as a specific type
212    ///
213    /// Only works for Data responses.
214    pub fn extract<T: Extract>(self) -> FunctionResult<T> {
215        match self {
216            RedisValue::Data(data) => T::extract(data),
217            v => Err(crate::Error::MessageError(format!(
218                "cannot extract value from {v:?}"
219            ))),
220        }
221    }
222}
223
224/// A builder for creating raw redis commands
225#[derive(Debug, Clone)]
226pub struct CommandBuilder<SelectCommand> {
227    command: SelectCommand,
228}
229
230#[doc(hidden)]
231pub struct SelectCommand;
232impl CommandBuilder<SelectCommand> {
233    /// Set the command to execute
234    pub fn get(self, key: impl Into<Vec<u8>>) -> CommandBuilder<Get> {
235        CommandBuilder {
236            command: Get { key: key.into() },
237        }
238    }
239
240    /// Set the command to execute
241    pub fn set<T: Encode>(
242        self,
243        key: impl Into<Vec<u8>>,
244        value: T,
245    ) -> FunctionResult<CommandBuilder<Set>> {
246        Ok(CommandBuilder {
247            command: Set {
248                key: key.into(),
249                value: value.try_serialize()?.into(),
250                existence_check: Default::default(),
251            },
252        })
253    }
254
255    /// Set the command to execute
256    pub fn any(self, command: impl Into<String>) -> CommandBuilder<Any> {
257        CommandBuilder {
258            command: Any {
259                command: command.into(),
260                arguments: Default::default(),
261            },
262        }
263    }
264}
265
266#[doc(hidden)]
267#[derive(Debug, Clone)]
268pub struct Get {
269    key: Vec<u8>,
270}
271impl CommandBuilder<Get> {
272    /// Finalize the command
273    pub fn build(self) -> Command {
274        Command {
275            command: "get".to_string(),
276            arguments: vec![self.command.key],
277        }
278    }
279}
280
281#[doc(hidden)]
282#[derive(Debug, Clone)]
283pub struct Set {
284    key: Vec<u8>,
285    value: Vec<u8>,
286    existence_check: Option<bool>,
287}
288impl CommandBuilder<Set> {
289    /// Only set the value if the key does not already exist
290    pub fn if_not_exists(mut self) -> Self {
291        self.command.existence_check = Some(false);
292        self
293    }
294
295    /// Only set the value if the key already exists
296    pub fn if_exists(mut self) -> Self {
297        self.command.existence_check = Some(true);
298        self
299    }
300
301    /// Finalize the command
302    pub fn build(self) -> Command {
303        let mut arguments = vec![self.command.key, self.command.value];
304        if let Some(existence_check) = self.command.existence_check {
305            if existence_check {
306                arguments.push(b"XX".to_vec());
307            } else {
308                arguments.push(b"NX".to_vec());
309            }
310        }
311        Command {
312            command: "set".to_string(),
313            arguments,
314        }
315    }
316}
317
318#[doc(hidden)]
319#[derive(Debug, Clone)]
320pub struct Any {
321    command: String,
322    arguments: Vec<Vec<u8>>,
323}
324impl CommandBuilder<Any> {
325    /// Add an argument to the command
326    pub fn value<T: Encode>(mut self, arg: T) -> FunctionResult<Self> {
327        self.command.arguments.push(arg.try_serialize()?.into());
328        Ok(self)
329    }
330
331    /// Add a pre-encoded argument to the command
332    pub fn arg(mut self, arg: impl Into<Vec<u8>>) -> Self {
333        self.command.arguments.push(arg.into());
334        self
335    }
336
337    /// Finalize the command
338    pub fn build(self) -> Command {
339        Command {
340            command: self.command.command,
341            arguments: self.command.arguments,
342        }
343    }
344}