1use momento_functions_wit::host::momento::host;
4
5use crate::encoding::{Encode, EncodeError, Extract, ExtractError};
6use crate::redis::RedisSetError::UnexpectedValueResponse;
7
8pub struct RedisClient {
15 client: host::redis::Client,
16}
17
18#[derive(Debug, thiserror::Error)]
20pub enum RedisGetError<E: ExtractError> {
21 #[error(transparent)]
23 RedisError(#[from] host::redis::RedisError),
24 #[error("Failed to extract value.")]
26 ExtractFailed {
27 cause: E,
29 },
30 #[error("Status message returned from redis: {status}")]
32 StatusMessage {
33 status: String,
35 },
36 #[error("Unexpected bulk response: {response:?}")]
38 UnexpectedBulkResponse {
39 response: host::redis::ResponseStream,
41 },
42 #[error("Unexpected okay response.")]
44 UnexpectedOkayResponse,
45}
46
47#[derive(Debug, thiserror::Error)]
49pub enum RedisSetError<E: EncodeError> {
50 #[error(transparent)]
52 RedisError(#[from] host::redis::RedisError),
53 #[error("Failed to encode value.")]
55 EncodeError {
56 cause: E,
58 },
59 #[error("Status message returned from redis: {status}")]
61 StatusMessage {
62 status: String,
64 },
65 #[error("Unexpected value response: {value:?}")]
67 UnexpectedValueResponse {
68 value: Option<host::redis::Value>,
70 },
71}
72
73#[derive(Debug, thiserror::Error)]
75pub enum RedisDeleteError {
76 #[error(transparent)]
78 RedisError(#[from] host::redis::RedisError),
79 #[error("Status message returned from redis: {status}")]
81 StatusMessage {
82 status: String,
84 },
85 #[error("Unexpected value response: {value:?}")]
87 UnexpectedValueResponse {
88 value: Option<host::redis::Value>,
90 },
91}
92
93impl RedisClient {
94 pub fn new(connection_string: impl Into<String>) -> Self {
109 Self {
110 client: host::redis::Client::new(&host::redis::RedisConnectionType::BasicConnection(
111 connection_string.into(),
112 )),
113 }
114 }
115
116 pub fn get<T: Extract>(
118 &self,
119 key: impl Into<Vec<u8>>,
120 ) -> Result<Option<T>, RedisGetError<T::Error>> {
121 let response = self.client.pipe(&[host::redis::Command {
122 command: "get".to_string(),
123 arguments: vec![key.into()],
124 }])?;
125 Ok(match response.next() {
126 Some(value) => {
127 log::debug!("Redis get response: {value:?}");
128 match value {
129 host::redis::Value::Nil => None,
130 host::redis::Value::Int(i) => Some(
131 T::extract(i.to_string().into_bytes())
132 .map_err(|e| RedisGetError::ExtractFailed { cause: e })?,
133 ),
134 host::redis::Value::Data(value) => Some(
135 T::extract(value).map_err(|e| RedisGetError::ExtractFailed { cause: e })?,
136 ),
137 host::redis::Value::Bulk(response_stream) => {
138 return Err(RedisGetError::UnexpectedBulkResponse {
139 response: response_stream,
140 });
141 }
142 host::redis::Value::Status(status) => {
143 return Err(RedisGetError::StatusMessage { status });
144 }
145 host::redis::Value::Okay => {
146 return Err(RedisGetError::UnexpectedOkayResponse);
147 }
148 }
149 }
150 None => None,
151 })
152 }
153
154 pub fn set<T: Encode>(
156 &self,
157 key: impl Into<Vec<u8>>,
158 value: T,
159 ) -> Result<(), RedisSetError<T::Error>> {
160 let serialized_value = value
161 .try_serialize()
162 .map_err(|e| RedisSetError::EncodeError { cause: e })?
163 .into();
164 let response = self.client.pipe(&[host::redis::Command {
165 command: "set".to_string(),
166 arguments: vec![key.into(), serialized_value],
167 }])?;
168 match response.next() {
169 Some(host::redis::Value::Okay) => Ok(()),
170 Some(host::redis::Value::Status(status)) => {
171 Err(RedisSetError::StatusMessage { status })
172 }
173 e => Err(UnexpectedValueResponse { value: e }),
174 }
175 }
176
177 pub fn delete(&self, key: impl Into<Vec<u8>>) -> Result<(), RedisDeleteError> {
179 let response = self.client.pipe(&[host::redis::Command {
180 command: "del".to_string(),
181 arguments: vec![key.into()],
182 }])?;
183 match response.next() {
184 Some(host::redis::Value::Int(count)) => {
185 log::debug!("delete response: {count}");
186 Ok(())
187 }
188 Some(host::redis::Value::Status(status)) => {
189 Err(RedisDeleteError::StatusMessage { status })
190 }
191 e => Err(RedisDeleteError::UnexpectedValueResponse { value: e }),
192 }
193 }
194
195 pub fn pipe(&self, commands: Vec<Command>) -> Result<ResponseStream, host::redis::RedisError> {
214 let response_stream = self.client.pipe(
215 &commands
216 .into_iter()
217 .map(|Command { command, arguments }| host::redis::Command { command, arguments })
218 .collect::<Vec<_>>(),
219 )?;
220
221 Ok(ResponseStream {
222 inner: response_stream,
223 })
224 }
225}
226
227#[derive(Debug, Clone)]
229pub struct Command {
230 command: String,
231 arguments: Vec<Vec<u8>>,
232}
233impl Command {
234 pub fn builder() -> CommandBuilder<SelectCommand> {
236 CommandBuilder {
237 command: SelectCommand,
238 }
239 }
240}
241
242#[derive(Debug)]
244pub struct ResponseStream {
245 inner: host::redis::ResponseStream,
246}
247impl ResponseStream {
248 fn next_value(&mut self) -> Option<RedisValue> {
250 let next = self.inner.next();
251 next.map(|value| match value {
252 host::redis::Value::Nil => RedisValue::Nil,
253 host::redis::Value::Int(i) => RedisValue::Int(i),
254 host::redis::Value::Data(data) => RedisValue::Data(data),
255 host::redis::Value::Bulk(response_stream) => RedisValue::Bulk(ResponseStream {
256 inner: response_stream,
257 }),
258 host::redis::Value::Status(status) => RedisValue::Status(status),
259 host::redis::Value::Okay => RedisValue::Okay,
260 })
261 }
262}
263impl Iterator for ResponseStream {
264 type Item = RedisValue;
265
266 fn next(&mut self) -> Option<Self::Item> {
267 self.next_value()
268 }
269}
270
271#[derive(Debug, thiserror::Error)]
273pub enum ValueError<E: ExtractError> {
274 #[error("Failed to extract value.")]
276 ExtractFailed {
277 cause: E,
279 },
280 #[error("Value cannot be extracted from {value:?}")]
282 UnextractableValue {
283 value: RedisValue,
285 },
286}
287
288#[derive(Debug)]
290pub enum RedisValue {
291 Nil,
293 Int(i64),
295 Data(Vec<u8>),
297 Bulk(ResponseStream),
301 Status(String),
303 Okay,
305}
306impl RedisValue {
307 pub fn extract<T: Extract>(self) -> Result<T, ValueError<T::Error>> {
311 match self {
312 RedisValue::Data(data) => {
313 T::extract(data).map_err(|e| ValueError::ExtractFailed { cause: e })
314 }
315 v => Err(ValueError::UnextractableValue { value: v }),
316 }
317 }
318}
319
/// Builder for a [`Command`].
///
/// `State` holds what has been chosen so far. A fresh builder from
/// [`Command::builder`] uses the `SelectCommand` marker; picking a command kind
/// moves the builder to a different state type, and the methods available
/// depend on that state.
// The parameter is deliberately not called `SelectCommand`: that name belongs to
// the concrete marker struct, and shadowing it made the generic signature read
// as if it were tied to that one state.
#[derive(Debug, Clone)]
pub struct CommandBuilder<State> {
    /// State of the command under construction.
    command: State,
}
325
326#[doc(hidden)]
327pub struct SelectCommand;
328impl CommandBuilder<SelectCommand> {
329 pub fn get(self, key: impl Into<Vec<u8>>) -> CommandBuilder<Get> {
331 CommandBuilder {
332 command: Get { key: key.into() },
333 }
334 }
335
336 pub fn set<T: Encode>(
338 self,
339 key: impl Into<Vec<u8>>,
340 value: T,
341 ) -> Result<CommandBuilder<Set>, T::Error> {
342 Ok(CommandBuilder {
343 command: Set {
344 key: key.into(),
345 value: value.try_serialize()?.into(),
346 existence_check: Default::default(),
347 },
348 })
349 }
350
351 pub fn any(self, command: impl Into<String>) -> CommandBuilder<Any> {
353 CommandBuilder {
354 command: Any {
355 command: command.into(),
356 arguments: Default::default(),
357 },
358 }
359 }
360}
361
362#[doc(hidden)]
363#[derive(Debug, Clone)]
364pub struct Get {
365 key: Vec<u8>,
366}
367impl CommandBuilder<Get> {
368 pub fn build(self) -> Command {
370 Command {
371 command: "get".to_string(),
372 arguments: vec![self.command.key],
373 }
374 }
375}
376
377#[doc(hidden)]
378#[derive(Debug, Clone)]
379pub struct Set {
380 key: Vec<u8>,
381 value: Vec<u8>,
382 existence_check: Option<bool>,
383}
384impl CommandBuilder<Set> {
385 pub fn if_not_exists(mut self) -> Self {
387 self.command.existence_check = Some(false);
388 self
389 }
390
391 pub fn if_exists(mut self) -> Self {
393 self.command.existence_check = Some(true);
394 self
395 }
396
397 pub fn build(self) -> Command {
399 let mut arguments = vec![self.command.key, self.command.value];
400 if let Some(existence_check) = self.command.existence_check {
401 if existence_check {
402 arguments.push(b"XX".to_vec());
403 } else {
404 arguments.push(b"NX".to_vec());
405 }
406 }
407 Command {
408 command: "set".to_string(),
409 arguments,
410 }
411 }
412}
413
414#[doc(hidden)]
415#[derive(Debug, Clone)]
416pub struct Any {
417 command: String,
418 arguments: Vec<Vec<u8>>,
419}
420impl CommandBuilder<Any> {
421 pub fn value<T: Encode>(mut self, arg: T) -> Result<Self, T::Error> {
423 self.command.arguments.push(arg.try_serialize()?.into());
424 Ok(self)
425 }
426
427 pub fn arg(mut self, arg: impl Into<Vec<u8>>) -> Self {
429 self.command.arguments.push(arg.into());
430 self
431 }
432
433 pub fn build(self) -> Command {
435 Command {
436 command: self.command.command,
437 arguments: self.command.arguments,
438 }
439 }
440}