momento_functions_host/
redis.rs1use momento_functions_wit::host::momento::host;
4
5use crate::{
6 FunctionResult,
7 encoding::{Encode, Extract},
8};
9
10pub struct RedisClient {
17 client: host::redis::Client,
18}
19
20impl RedisClient {
21 pub fn new(connection_string: impl Into<String>) -> Self {
37 Self {
38 client: host::redis::Client::new(&host::redis::RedisConnectionType::BasicConnection(
39 connection_string.into(),
40 )),
41 }
42 }
43
44 pub fn get<T: Extract>(&self, key: impl Into<Vec<u8>>) -> FunctionResult<Option<T>> {
46 let response = self.client.pipe(&[host::redis::Command {
47 command: "get".to_string(),
48 arguments: vec![key.into()],
49 }])?;
50 Ok(match response.next() {
51 Some(value) => {
52 log::debug!("Redis get response: {value:?}");
53 match value {
54 host::redis::Value::Nil => None,
55 host::redis::Value::Int(i) => Some(T::extract(i.to_string().into_bytes())?),
56 host::redis::Value::Data(value) => Some(T::extract(value)?),
57 host::redis::Value::Bulk(response_stream) => {
58 return Err(crate::Error::MessageError(format!(
59 "Bulk response not supported in this context {response_stream:?}"
60 )));
61 }
62 host::redis::Value::Status(status) => {
63 return Err(crate::Error::MessageError(status));
64 }
65 host::redis::Value::Okay => {
66 return Err(crate::Error::MessageError(
67 "Okay response not supported in this context".into(),
68 ));
69 }
70 }
71 }
72 None => None,
73 })
74 }
75
76 pub fn set<T: Encode>(&self, key: impl Into<Vec<u8>>, value: T) -> FunctionResult<()> {
78 let serialized_value = value.try_serialize()?.into();
79 let response = self.client.pipe(&[host::redis::Command {
80 command: "set".to_string(),
81 arguments: vec![key.into(), serialized_value],
82 }])?;
83 match response.next() {
84 Some(host::redis::Value::Okay) => Ok(()),
85 Some(host::redis::Value::Status(status)) => Err(crate::Error::MessageError(status)),
86 e => Err(crate::Error::MessageError(format!(
87 "unexpected response: {e:?}"
88 ))),
89 }
90 }
91
92 pub fn delete(&self, key: impl Into<Vec<u8>>) -> FunctionResult<()> {
94 let response = self.client.pipe(&[host::redis::Command {
95 command: "del".to_string(),
96 arguments: vec![key.into()],
97 }])?;
98 match response.next() {
99 Some(host::redis::Value::Int(count)) => {
100 log::debug!("delete response: {count}");
101 Ok(())
102 }
103 Some(host::redis::Value::Status(status)) => Err(crate::Error::MessageError(status)),
104 e => Err(crate::Error::MessageError(format!(
105 "unexpected response: {e:?}"
106 ))),
107 }
108 }
109
110 pub fn pipe(&self, commands: Vec<Command>) -> FunctionResult<ResponseStream> {
129 let response_stream = self.client.pipe(
130 &commands
131 .into_iter()
132 .map(|Command { command, arguments }| host::redis::Command { command, arguments })
133 .collect::<Vec<_>>(),
134 )?;
135
136 Ok(ResponseStream {
137 inner: response_stream,
138 })
139 }
140}
141
142impl From<host::redis::RedisError> for crate::Error {
143 fn from(e: host::redis::RedisError) -> Self {
144 crate::Error::MessageError(format!("Redis error: {e:?}"))
145 }
146}
147
148#[derive(Debug, Clone)]
150pub struct Command {
151 command: String,
152 arguments: Vec<Vec<u8>>,
153}
154impl Command {
155 pub fn builder() -> CommandBuilder<SelectCommand> {
157 CommandBuilder {
158 command: SelectCommand,
159 }
160 }
161}
162
163#[derive(Debug)]
165pub struct ResponseStream {
166 inner: host::redis::ResponseStream,
167}
168impl ResponseStream {
169 fn next_value(&mut self) -> Option<RedisValue> {
171 let next = self.inner.next();
172 next.map(|value| match value {
173 host::redis::Value::Nil => RedisValue::Nil,
174 host::redis::Value::Int(i) => RedisValue::Int(i),
175 host::redis::Value::Data(data) => RedisValue::Data(data),
176 host::redis::Value::Bulk(response_stream) => RedisValue::Bulk(ResponseStream {
177 inner: response_stream,
178 }),
179 host::redis::Value::Status(status) => RedisValue::Status(status),
180 host::redis::Value::Okay => RedisValue::Okay,
181 })
182 }
183}
184impl Iterator for ResponseStream {
185 type Item = RedisValue;
186
187 fn next(&mut self) -> Option<Self::Item> {
188 self.next_value()
189 }
190}
191
192#[derive(Debug)]
194pub enum RedisValue {
195 Nil,
197 Int(i64),
199 Data(Vec<u8>),
201 Bulk(ResponseStream),
205 Status(String),
207 Okay,
209}
210impl RedisValue {
211 pub fn extract<T: Extract>(self) -> FunctionResult<T> {
215 match self {
216 RedisValue::Data(data) => T::extract(data),
217 v => Err(crate::Error::MessageError(format!(
218 "cannot extract value from {v:?}"
219 ))),
220 }
221 }
222}
223
/// Builder for a `Command`, parameterized by its current state.
///
/// `State` is the type describing what has been chosen so far. Each state
/// type gets its own `impl CommandBuilder<...>` block, so only the methods
/// that make sense for that state are callable.
// The type parameter is deliberately NOT named after any concrete state
// type: a parameter that shares its name with a state struct shadows it
// inside this item and makes the generic builder look state-specific.
#[derive(Debug, Clone)]
pub struct CommandBuilder<State> {
    command: State,
}
229
/// Initial builder state: no command has been chosen yet.
///
/// Derives `Debug` and `Clone` like the other builder states, so that a
/// builder in this state is itself `Debug` and `Clone`.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct SelectCommand;
232impl CommandBuilder<SelectCommand> {
233 pub fn get(self, key: impl Into<Vec<u8>>) -> CommandBuilder<Get> {
235 CommandBuilder {
236 command: Get { key: key.into() },
237 }
238 }
239
240 pub fn set<T: Encode>(
242 self,
243 key: impl Into<Vec<u8>>,
244 value: T,
245 ) -> FunctionResult<CommandBuilder<Set>> {
246 Ok(CommandBuilder {
247 command: Set {
248 key: key.into(),
249 value: value.try_serialize()?.into(),
250 existence_check: Default::default(),
251 },
252 })
253 }
254
255 pub fn any(self, command: impl Into<String>) -> CommandBuilder<Any> {
257 CommandBuilder {
258 command: Any {
259 command: command.into(),
260 arguments: Default::default(),
261 },
262 }
263 }
264}
265
266#[doc(hidden)]
267#[derive(Debug, Clone)]
268pub struct Get {
269 key: Vec<u8>,
270}
271impl CommandBuilder<Get> {
272 pub fn build(self) -> Command {
274 Command {
275 command: "get".to_string(),
276 arguments: vec![self.command.key],
277 }
278 }
279}
280
281#[doc(hidden)]
282#[derive(Debug, Clone)]
283pub struct Set {
284 key: Vec<u8>,
285 value: Vec<u8>,
286 existence_check: Option<bool>,
287}
288impl CommandBuilder<Set> {
289 pub fn if_not_exists(mut self) -> Self {
291 self.command.existence_check = Some(false);
292 self
293 }
294
295 pub fn if_exists(mut self) -> Self {
297 self.command.existence_check = Some(true);
298 self
299 }
300
301 pub fn build(self) -> Command {
303 let mut arguments = vec![self.command.key, self.command.value];
304 if let Some(existence_check) = self.command.existence_check {
305 if existence_check {
306 arguments.push(b"XX".to_vec());
307 } else {
308 arguments.push(b"NX".to_vec());
309 }
310 }
311 Command {
312 command: "set".to_string(),
313 arguments,
314 }
315 }
316}
317
318#[doc(hidden)]
319#[derive(Debug, Clone)]
320pub struct Any {
321 command: String,
322 arguments: Vec<Vec<u8>>,
323}
324impl CommandBuilder<Any> {
325 pub fn value<T: Encode>(mut self, arg: T) -> FunctionResult<Self> {
327 self.command.arguments.push(arg.try_serialize()?.into());
328 Ok(self)
329 }
330
331 pub fn arg(mut self, arg: impl Into<Vec<u8>>) -> Self {
333 self.command.arguments.push(arg.into());
334 self
335 }
336
337 pub fn build(self) -> Command {
339 Command {
340 command: self.command.command,
341 arguments: self.command.arguments,
342 }
343 }
344}