Skip to main content

redis_module/context/
call_reply.rs

1use core::slice;
2use std::os::raw::c_char;
3use std::{
4    fmt,
5    fmt::{Debug, Display, Formatter},
6    marker::PhantomData,
7    ptr::NonNull,
8};
9
10use libc::c_void;
11
12use crate::{deallocate_pointer, raw::*, Context, RedisError, RedisLockIndicator};
13
14pub struct StringCallReply<'root> {
15    reply: NonNull<RedisModuleCallReply>,
16    _dummy: PhantomData<&'root ()>,
17}
18
19impl<'root> StringCallReply<'root> {
20    /// Convert StringCallReply to String.
21    /// Return None data is not a valid utf8.
22    pub fn to_string(&self) -> Option<String> {
23        String::from_utf8(self.as_bytes().to_vec()).ok()
24    }
25
26    /// Return the StringCallReply data as &[u8]
27    pub fn as_bytes(&self) -> &[u8] {
28        let mut len: usize = 0;
29        let reply_string: *mut u8 = unsafe {
30            RedisModule_CallReplyStringPtr.unwrap()(self.reply.as_ptr(), &mut len) as *mut u8
31        };
32        unsafe { slice::from_raw_parts(reply_string, len) }
33    }
34
35    /// Return the raw pointer to the underlying [RedisModuleCallReply].
36    pub fn get_raw(&self) -> *mut RedisModuleCallReply {
37        self.reply.as_ptr()
38    }
39}
40
41impl<'root> Drop for StringCallReply<'root> {
42    fn drop(&mut self) {
43        free_call_reply(self.reply.as_ptr());
44    }
45}
46
47impl<'root> Debug for StringCallReply<'root> {
48    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
49        let mut debug_struct = f.debug_struct("StringCallReply");
50        let debug_struct = debug_struct.field("reply", &self.reply);
51        match self.to_string() {
52            Some(s) => debug_struct.field("value", &s),
53            None => debug_struct.field("value", &self.as_bytes()),
54        }
55        .finish()
56    }
57}
58
59impl<'root> Display for StringCallReply<'root> {
60    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
61        fmt::Display::fmt(&String::from_utf8_lossy(self.as_bytes()), f)
62    }
63}
64
65pub struct ErrorCallReply<'root> {
66    reply: NonNull<RedisModuleCallReply>,
67    _dummy: PhantomData<&'root ()>,
68}
69
70impl<'root> ErrorCallReply<'root> {
71    /// Convert ErrorCallReply to String.
72    /// Return None data is not a valid utf8.
73    pub fn to_utf8_string(&self) -> Option<String> {
74        String::from_utf8(self.as_bytes().to_vec()).ok()
75    }
76
77    /// Return the ErrorCallReply data as &[u8]
78    pub fn as_bytes(&self) -> &[u8] {
79        let mut len: usize = 0;
80        let reply_string: *mut u8 = unsafe {
81            RedisModule_CallReplyStringPtr.unwrap()(self.reply.as_ptr(), &mut len) as *mut u8
82        };
83        unsafe { slice::from_raw_parts(reply_string, len) }
84    }
85
86    /// Return the raw pointer to the underlying [RedisModuleCallReply].
87    pub fn get_raw(&self) -> *mut RedisModuleCallReply {
88        self.reply.as_ptr()
89    }
90}
91
92impl<'root> Drop for ErrorCallReply<'root> {
93    fn drop(&mut self) {
94        free_call_reply(self.reply.as_ptr());
95    }
96}
97
98impl<'root> Debug for ErrorCallReply<'root> {
99    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
100        let mut debug_struct = f.debug_struct("ErrorCallReply");
101        let debug_struct = debug_struct.field("reply", &self.reply);
102        match self.to_utf8_string() {
103            Some(s) => debug_struct.field("value", &s),
104            None => debug_struct.field("value", &self.as_bytes()),
105        }
106        .finish()
107    }
108}
109
110impl<'root> Display for ErrorCallReply<'root> {
111    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
112        fmt::Display::fmt(&String::from_utf8_lossy(self.as_bytes()), f)
113    }
114}
115
116#[derive(Debug)]
117pub enum ErrorReply<'root> {
118    Message(String),
119    RedisError(ErrorCallReply<'root>),
120}
121
122/// Send implementation to [ErrorCallReply].
123/// We need to implements this trait because [ErrorCallReply] hold
124/// raw pointers to C data which does not auto implement the [Send] trait.
125/// By implementing [Send] on [ErrorCallReply] we basically tells the compiler
126/// that it is safe to send the underline C data between threads.
127unsafe impl<'root> Send for ErrorCallReply<'root> {}
128
129impl<'root> ErrorReply<'root> {
130    /// Convert [ErrorCallReply] to [String] or [None] if its not a valid utf8.
131    pub fn to_utf8_string(&self) -> Option<String> {
132        match self {
133            ErrorReply::Message(s) => Some(s.clone()),
134            ErrorReply::RedisError(r) => r.to_utf8_string(),
135        }
136    }
137
138    /// Return the ErrorCallReply data as &[u8]
139    pub fn as_bytes(&self) -> &[u8] {
140        match self {
141            ErrorReply::Message(s) => s.as_bytes(),
142            ErrorReply::RedisError(r) => r.as_bytes(),
143        }
144    }
145}
146
147impl<'root> Display for ErrorReply<'root> {
148    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
149        fmt::Display::fmt(&String::from_utf8_lossy(self.as_bytes()), f)
150    }
151}
152
153pub struct I64CallReply<'root> {
154    reply: NonNull<RedisModuleCallReply>,
155    _dummy: PhantomData<&'root ()>,
156}
157
158impl<'root> I64CallReply<'root> {
159    /// Return the i64 value of the [I64CallReply]
160    pub fn to_i64(&self) -> i64 {
161        call_reply_integer(self.reply.as_ptr())
162    }
163
164    /// Return the raw pointer to the underlying [RedisModuleCallReply].
165    pub fn get_raw(&self) -> *mut RedisModuleCallReply {
166        self.reply.as_ptr()
167    }
168}
169
170impl<'root> Drop for I64CallReply<'root> {
171    fn drop(&mut self) {
172        free_call_reply(self.reply.as_ptr());
173    }
174}
175
176impl<'root> Debug for I64CallReply<'root> {
177    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
178        f.debug_struct("I64CallReply")
179            .field("reply", &self.reply)
180            .field("value", &self.to_i64())
181            .finish()
182    }
183}
184
185impl<'root> Display for I64CallReply<'root> {
186    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
187        fmt::Display::fmt(&self.to_i64(), f)
188    }
189}
190
191pub struct ArrayCallReply<'root> {
192    reply: NonNull<RedisModuleCallReply>,
193    _dummy: PhantomData<&'root ()>,
194}
195
196impl<'root> Drop for ArrayCallReply<'root> {
197    fn drop(&mut self) {
198        free_call_reply(self.reply.as_ptr());
199    }
200}
201
202impl<'root> ArrayCallReply<'root> {
203    /// Return an Iterator that allows to iterate over the elements
204    /// in the [ArrayCallReply].
205    pub fn iter(&self) -> ArrayCallReplyIterator<'root, '_> {
206        ArrayCallReplyIterator {
207            reply: self,
208            index: 0,
209        }
210    }
211
212    /// Return the array element on the given index.
213    pub fn get(&self, idx: usize) -> Option<CallResult<'_>> {
214        let res = NonNull::new(call_reply_array_element(self.reply.as_ptr(), idx))?;
215        Some(create_call_reply(res))
216    }
217
218    /// Return the number of elements in the [ArrayCallReply].
219    pub fn len(&self) -> usize {
220        call_reply_length(self.reply.as_ptr())
221    }
222
223    /// Return the raw pointer to the underlying [RedisModuleCallReply].
224    pub fn get_raw(&self) -> *mut RedisModuleCallReply {
225        self.reply.as_ptr()
226    }
227}
228
229pub struct ArrayCallReplyIterator<'root, 'curr> {
230    reply: &'curr ArrayCallReply<'root>,
231    index: usize,
232}
233
234impl<'root, 'curr> Iterator for ArrayCallReplyIterator<'root, 'curr> {
235    type Item = CallResult<'curr>;
236
237    fn next(&mut self) -> Option<Self::Item> {
238        let res = self.reply.get(self.index);
239        if res.is_some() {
240            self.index += 1;
241        }
242        res
243    }
244}
245
246impl<'root> Debug for ArrayCallReply<'root> {
247    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
248        f.debug_struct("ArrayCallReply")
249            .field("reply", &self.reply)
250            .field("elements", &self.iter().collect::<Vec<CallResult>>())
251            .finish()
252    }
253}
254
255impl<'root> Display for ArrayCallReply<'root> {
256    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
257        f.write_str("[")?;
258
259        self.iter()
260            .enumerate()
261            .try_for_each(|(index, v)| -> fmt::Result {
262                if index > 1 {
263                    f.write_str(", ")?;
264                }
265                fmt_call_result(v, f)
266            })?;
267
268        f.write_str("]")
269    }
270}
271
272pub struct NullCallReply<'root> {
273    reply: NonNull<RedisModuleCallReply>,
274    _dummy: PhantomData<&'root ()>,
275}
276
277impl<'root> NullCallReply<'root> {
278    /// Return the raw pointer to the underlying [RedisModuleCallReply].
279    pub fn get_raw(&self) -> *mut RedisModuleCallReply {
280        self.reply.as_ptr()
281    }
282}
283
284impl<'root> Drop for NullCallReply<'root> {
285    fn drop(&mut self) {
286        free_call_reply(self.reply.as_ptr());
287    }
288}
289
290impl<'root> Debug for NullCallReply<'root> {
291    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
292        f.debug_struct("NullCallReply")
293            .field("reply", &self.reply)
294            .finish()
295    }
296}
297
298impl<'root> Display for NullCallReply<'root> {
299    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
300        f.write_str("Null")
301    }
302}
303
304pub struct MapCallReply<'root> {
305    reply: NonNull<RedisModuleCallReply>,
306    _dummy: PhantomData<&'root ()>,
307}
308
309impl<'root> MapCallReply<'root> {
310    /// Return an iterator over the elements in the [MapCallReply].
311    /// The iterator return each element as a tuple representing the
312    /// key and the value.
313    pub fn iter(&self) -> MapCallReplyIterator<'root, '_> {
314        MapCallReplyIterator {
315            reply: self,
316            index: 0,
317        }
318    }
319
320    /// Return the map element on the given index.
321    pub fn get(&self, idx: usize) -> Option<(CallResult<'_>, CallResult<'_>)> {
322        let (key, val) = call_reply_map_element(self.reply.as_ptr(), idx);
323        Some((
324            create_call_reply(NonNull::new(key)?),
325            create_call_reply(NonNull::new(val)?),
326        ))
327    }
328
329    /// Return the number of elements in the [MapCallReply].
330    pub fn len(&self) -> usize {
331        call_reply_length(self.reply.as_ptr())
332    }
333
334    /// Return the raw pointer to the underlying [RedisModuleCallReply].
335    pub fn get_raw(&self) -> *mut RedisModuleCallReply {
336        self.reply.as_ptr()
337    }
338}
339
340pub struct MapCallReplyIterator<'root, 'curr> {
341    reply: &'curr MapCallReply<'root>,
342    index: usize,
343}
344
345impl<'root, 'curr> Iterator for MapCallReplyIterator<'root, 'curr> {
346    type Item = (CallResult<'curr>, CallResult<'curr>);
347
348    fn next(&mut self) -> Option<Self::Item> {
349        let res = self.reply.get(self.index);
350        if res.is_some() {
351            self.index += 1;
352        }
353        res
354    }
355}
356
357impl<'root> Drop for MapCallReply<'root> {
358    fn drop(&mut self) {
359        free_call_reply(self.reply.as_ptr());
360    }
361}
362
363impl<'root> Debug for MapCallReply<'root> {
364    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
365        f.debug_struct("MapCallReply")
366            .field("reply", &self.reply)
367            .field(
368                "elements",
369                &self.iter().collect::<Vec<(CallResult, CallResult)>>(),
370            )
371            .finish()
372    }
373}
374
375impl<'root> Display for MapCallReply<'root> {
376    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
377        f.write_str("{")?;
378
379        self.iter()
380            .enumerate()
381            .try_for_each(|(index, (key, val))| -> fmt::Result {
382                if index > 1 {
383                    f.write_str(", ")?;
384                }
385                f.write_str("")?;
386                fmt_call_result(key, f)?;
387                f.write_str(": ")?;
388                fmt_call_result(val, f)
389            })?;
390
391        f.write_str("}")
392    }
393}
394
395pub struct SetCallReply<'root> {
396    reply: NonNull<RedisModuleCallReply>,
397    _dummy: PhantomData<&'root ()>,
398}
399
400impl<'root> SetCallReply<'root> {
401    /// Return an iterator over the elements in the [SetCallReply].
402    pub fn iter(&self) -> SetCallReplyIterator<'root, '_> {
403        SetCallReplyIterator {
404            reply: self,
405            index: 0,
406        }
407    }
408
409    /// Return the set element on the given index.
410    pub fn get(&self, idx: usize) -> Option<CallResult<'_>> {
411        let res = NonNull::new(call_reply_set_element(self.reply.as_ptr(), idx))?;
412        Some(create_call_reply(res))
413    }
414
415    /// Return the number of elements in the [SetCallReply].
416    pub fn len(&self) -> usize {
417        call_reply_length(self.reply.as_ptr())
418    }
419
420    /// Return the raw pointer to the underlying [RedisModuleCallReply].
421    pub fn get_raw(&self) -> *mut RedisModuleCallReply {
422        self.reply.as_ptr()
423    }
424}
425
426pub struct SetCallReplyIterator<'root, 'curr> {
427    reply: &'curr SetCallReply<'root>,
428    index: usize,
429}
430
431impl<'root, 'curr> Iterator for SetCallReplyIterator<'root, 'curr> {
432    type Item = CallResult<'curr>;
433
434    fn next(&mut self) -> Option<Self::Item> {
435        let res = self.reply.get(self.index);
436        if res.is_some() {
437            self.index += 1;
438        }
439        res
440    }
441}
442
443impl<'root> Drop for SetCallReply<'root> {
444    fn drop(&mut self) {
445        free_call_reply(self.reply.as_ptr());
446    }
447}
448
449impl<'root> Debug for SetCallReply<'root> {
450    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
451        f.debug_struct("SetCallReply")
452            .field("reply", &self.reply)
453            .field("elements", &self.iter().collect::<Vec<CallResult>>())
454            .finish()
455    }
456}
457
458impl<'root> Display for SetCallReply<'root> {
459    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
460        f.write_str("{")?;
461
462        self.iter()
463            .enumerate()
464            .try_for_each(|(index, v)| -> fmt::Result {
465                if index > 1 {
466                    f.write_str(", ")?;
467                }
468                fmt_call_result(v, f)
469            })?;
470
471        f.write_str("}")
472    }
473}
474
475pub struct BoolCallReply<'root> {
476    reply: NonNull<RedisModuleCallReply>,
477    _dummy: PhantomData<&'root ()>,
478}
479
480impl<'root> BoolCallReply<'root> {
481    /// Return the boolean value of the [BoolCallReply].
482    pub fn to_bool(&self) -> bool {
483        call_reply_bool(self.reply.as_ptr())
484    }
485
486    /// Return the raw pointer to the underlying [RedisModuleCallReply].
487    pub fn get_raw(&self) -> *mut RedisModuleCallReply {
488        self.reply.as_ptr()
489    }
490}
491
492impl<'root> Drop for BoolCallReply<'root> {
493    fn drop(&mut self) {
494        free_call_reply(self.reply.as_ptr());
495    }
496}
497
498impl<'root> Debug for BoolCallReply<'root> {
499    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
500        f.debug_struct("BoolCallReply")
501            .field("reply", &self.reply)
502            .field("value", &self.to_bool())
503            .finish()
504    }
505}
506
507impl<'root> Display for BoolCallReply<'root> {
508    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
509        fmt::Display::fmt(&self.to_bool(), f)
510    }
511}
512
513pub struct DoubleCallReply<'root> {
514    reply: NonNull<RedisModuleCallReply>,
515    _dummy: PhantomData<&'root ()>,
516}
517
518impl<'root> DoubleCallReply<'root> {
519    /// Return the double value of the [BoolCallReply] as f64.
520    pub fn to_double(&self) -> f64 {
521        call_reply_double(self.reply.as_ptr())
522    }
523
524    /// Return the raw pointer to the underlying [RedisModuleCallReply].
525    pub fn get_raw(&self) -> *mut RedisModuleCallReply {
526        self.reply.as_ptr()
527    }
528}
529
530impl<'root> Drop for DoubleCallReply<'root> {
531    fn drop(&mut self) {
532        free_call_reply(self.reply.as_ptr());
533    }
534}
535
536impl<'root> Debug for DoubleCallReply<'root> {
537    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
538        f.debug_struct("DoubleCallReply")
539            .field("reply", &self.reply)
540            .field("value", &self.to_double())
541            .finish()
542    }
543}
544
545impl<'root> Display for DoubleCallReply<'root> {
546    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
547        fmt::Display::fmt(&self.to_double(), f)
548    }
549}
550
551pub struct BigNumberCallReply<'root> {
552    reply: NonNull<RedisModuleCallReply>,
553    _dummy: PhantomData<&'root ()>,
554}
555
556impl<'root> BigNumberCallReply<'root> {
557    /// Return the big number value of the [BigNumberCallReply] as String.
558    /// Return None if the data is not a valid utf8
559    pub fn to_string(&self) -> Option<String> {
560        call_reply_big_number(self.reply.as_ptr())
561    }
562
563    /// Return the raw pointer to the underlying [RedisModuleCallReply].
564    pub fn get_raw(&self) -> *mut RedisModuleCallReply {
565        self.reply.as_ptr()
566    }
567}
568
569impl<'root> Drop for BigNumberCallReply<'root> {
570    fn drop(&mut self) {
571        free_call_reply(self.reply.as_ptr());
572    }
573}
574
575impl<'root> Debug for BigNumberCallReply<'root> {
576    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
577        f.debug_struct("BigNumberCallReply")
578            .field("reply", &self.reply)
579            .field("value", &self.to_string())
580            .finish()
581    }
582}
583
584impl<'root> Display for BigNumberCallReply<'root> {
585    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
586        fmt::Display::fmt(self.to_string().as_deref().unwrap_or("None"), f)
587    }
588}
589
590pub struct VerbatimStringCallReply<'root> {
591    reply: NonNull<RedisModuleCallReply>,
592    _dummy: PhantomData<&'root ()>,
593}
594
595impl<'root> VerbatimStringCallReply<'root> {
596    /// Return the raw pointer to the underlying [RedisModuleCallReply].
597    pub fn get_raw(&self) -> *mut RedisModuleCallReply {
598        self.reply.as_ptr()
599    }
600}
601
602/// RESP3 state that the verbatim string format must be of length 3.
603const VERBATIM_FORMAT_LENGTH: usize = 3;
604/// The string format of a verbatim string ([VerbatimStringCallReply]).
605#[repr(transparent)]
606#[derive(Debug, Default, Copy, Clone, PartialEq)]
607pub struct VerbatimStringFormat(pub [c_char; VERBATIM_FORMAT_LENGTH]);
608
609impl TryFrom<&str> for VerbatimStringFormat {
610    type Error = RedisError;
611
612    fn try_from(value: &str) -> Result<Self, Self::Error> {
613        if value.len() != 3 {
614            return Err(RedisError::String(format!(
615                "Verbatim format length must be {VERBATIM_FORMAT_LENGTH}."
616            )));
617        }
618        let mut res = VerbatimStringFormat::default();
619        value.chars().take(3).enumerate().try_for_each(|(i, c)| {
620            if c as u32 >= 127 {
621                return Err(RedisError::String(
622                    "Verbatim format must contains only ASCI values.".to_owned(),
623                ));
624            }
625            res.0[i] = c as c_char;
626            Ok(())
627        })?;
628        Ok(res)
629    }
630}
631
632impl<'root> VerbatimStringCallReply<'root> {
633    /// Return the verbatim string value of the [VerbatimStringCallReply] as a tuple.
634    /// The first entry represents the format, the second entry represent the data.
635    /// Return None if the format is not a valid utf8
636    pub fn to_parts(&self) -> Option<(VerbatimStringFormat, Vec<u8>)> {
637        let (format, data) = self.as_parts()?;
638        Some((format.try_into().ok()?, data.to_vec()))
639    }
640
641    /// Borrow the verbatim string value of the [VerbatimStringCallReply] as a tuple.
642    /// The first entry represents the format as &str, the second entry represent the data as &[u8].
643    /// Return None if the format is not a valid utf8.
644    pub fn as_parts(&self) -> Option<(&str, &[u8])> {
645        // RESP3 state that veribatim string format must be of size 3.
646        const FORMAT_LEN: usize = 3;
647        let mut len: usize = 0;
648        let mut format: *const c_char = std::ptr::null();
649        let reply_string: *mut u8 = unsafe {
650            RedisModule_CallReplyVerbatim.unwrap()(self.reply.as_ptr(), &mut len, &mut format)
651                as *mut u8
652        };
653        Some((
654            std::str::from_utf8(unsafe { slice::from_raw_parts(format as *const u8, FORMAT_LEN) })
655                .ok()
656                .unwrap(),
657            unsafe { slice::from_raw_parts(reply_string, len) },
658        ))
659    }
660}
661
662impl<'root> Drop for VerbatimStringCallReply<'root> {
663    fn drop(&mut self) {
664        free_call_reply(self.reply.as_ptr());
665    }
666}
667
668impl<'root> Debug for VerbatimStringCallReply<'root> {
669    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
670        f.debug_struct("VerbatimStringCallReply")
671            .field("reply", &self.reply)
672            .field("value", &self.as_parts())
673            .finish()
674    }
675}
676
677impl<'root> Display for VerbatimStringCallReply<'root> {
678    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
679        match self.as_parts() {
680            Some((format, data)) => write!(f, "({}, {})", format, String::from_utf8_lossy(data)),
681            None => f.write_str("(None)"),
682        }
683    }
684}
685
686#[derive(Debug)]
687pub enum CallReply<'root> {
688    Unknown,
689    I64(I64CallReply<'root>),
690    String(StringCallReply<'root>),
691    Array(ArrayCallReply<'root>),
692    Null(NullCallReply<'root>),
693    Map(MapCallReply<'root>),
694    Set(SetCallReply<'root>),
695    Bool(BoolCallReply<'root>),
696    Double(DoubleCallReply<'root>),
697    BigNumber(BigNumberCallReply<'root>),
698    VerbatimString(VerbatimStringCallReply<'root>),
699}
700
701impl<'root> CallReply<'root> {
702    /// Return the raw pointer to the underlying [RedisModuleCallReply], or `None` if this is the `Unknown` variant.
703    pub fn get_raw(&self) -> Option<*mut RedisModuleCallReply> {
704        match self {
705            CallReply::Unknown => None,
706            CallReply::I64(inner) => Some(inner.get_raw()),
707            CallReply::String(inner) => Some(inner.get_raw()),
708            CallReply::Array(inner) => Some(inner.get_raw()),
709            CallReply::Null(inner) => Some(inner.get_raw()),
710            CallReply::Map(inner) => Some(inner.get_raw()),
711            CallReply::Set(inner) => Some(inner.get_raw()),
712            CallReply::Bool(inner) => Some(inner.get_raw()),
713            CallReply::Double(inner) => Some(inner.get_raw()),
714            CallReply::BigNumber(inner) => Some(inner.get_raw()),
715            CallReply::VerbatimString(inner) => Some(inner.get_raw()),
716        }
717    }
718}
719
720/// Send implementation to [CallReply].
721/// We need to implements this trait because [CallReply] hold
722/// raw pointers to C data which does not auto implement the [Send] trait.
723/// By implementing [Send] on [CallReply] we basically tells the compiler
724/// that it is safe to send the underline C data between threads.
725unsafe impl<'root> Send for CallReply<'root> {}
726
727impl<'root> Display for CallReply<'root> {
728    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
729        match self {
730            CallReply::Unknown => f.write_str("Unknown"),
731            CallReply::I64(inner) => fmt::Display::fmt(&inner, f),
732            CallReply::String(inner) => fmt::Display::fmt(&inner, f),
733            CallReply::Array(inner) => fmt::Display::fmt(&inner, f),
734            CallReply::Null(inner) => fmt::Display::fmt(&inner, f),
735            CallReply::Map(inner) => fmt::Display::fmt(&inner, f),
736            CallReply::Set(inner) => fmt::Display::fmt(&inner, f),
737            CallReply::Bool(inner) => fmt::Display::fmt(&inner, f),
738            CallReply::Double(inner) => fmt::Display::fmt(&inner, f),
739            CallReply::BigNumber(inner) => fmt::Display::fmt(&inner, f),
740            CallReply::VerbatimString(inner) => fmt::Display::fmt(&inner, f),
741        }
742    }
743}
744
745fn create_call_reply<'root>(reply: NonNull<RedisModuleCallReply>) -> CallResult<'root> {
746    let ty = call_reply_type(reply.as_ptr());
747    match ty {
748        ReplyType::Unknown => Ok(CallReply::Unknown), // unknown means NULL so no need to free free anything
749        ReplyType::Integer => Ok(CallReply::I64(I64CallReply {
750            reply,
751            _dummy: PhantomData,
752        })),
753        ReplyType::String => Ok(CallReply::String(StringCallReply {
754            reply,
755            _dummy: PhantomData,
756        })),
757        ReplyType::Error => Err(ErrorReply::RedisError(ErrorCallReply {
758            reply,
759            _dummy: PhantomData,
760        })),
761        ReplyType::Array => Ok(CallReply::Array(ArrayCallReply {
762            reply,
763            _dummy: PhantomData,
764        })),
765        ReplyType::Null => Ok(CallReply::Null(NullCallReply {
766            reply,
767            _dummy: PhantomData,
768        })),
769        ReplyType::Map => Ok(CallReply::Map(MapCallReply {
770            reply,
771            _dummy: PhantomData,
772        })),
773        ReplyType::Set => Ok(CallReply::Set(SetCallReply {
774            reply,
775            _dummy: PhantomData,
776        })),
777        ReplyType::Bool => Ok(CallReply::Bool(BoolCallReply {
778            reply,
779            _dummy: PhantomData,
780        })),
781        ReplyType::Double => Ok(CallReply::Double(DoubleCallReply {
782            reply,
783            _dummy: PhantomData,
784        })),
785        ReplyType::BigNumber => Ok(CallReply::BigNumber(BigNumberCallReply {
786            reply,
787            _dummy: PhantomData,
788        })),
789        ReplyType::VerbatimString => Ok(CallReply::VerbatimString(VerbatimStringCallReply {
790            reply,
791            _dummy: PhantomData,
792        })),
793    }
794}
795
796fn fmt_call_result(res: CallResult<'_>, f: &mut Formatter<'_>) -> fmt::Result {
797    match res {
798        Ok(r) => fmt::Display::fmt(&r, f),
799        Err(e) => fmt::Display::fmt(&e, f),
800    }
801}
802
803pub type CallResult<'root> = Result<CallReply<'root>, ErrorReply<'root>>;
804
805pub struct FutureHandler<C: FnOnce(&Context, CallResult<'static>)> {
806    reply: NonNull<RedisModuleCallReply>,
807    _dummy: PhantomData<C>,
808    reply_freed: bool,
809}
810
811impl<C> FutureHandler<C>
812where
813    C: FnOnce(&Context, CallResult<'static>),
814{
815    /// Dispose the future, handler. This function must be called in order to
816    /// release the [FutureHandler]. The reason we must have a dispose function
817    /// and we can not use the Drop is that [FutureHandler] must be released
818    /// when the Redis GIL is held. This is also why this function also gets a
819    /// lock indicator.
820    pub fn dispose<LockIndicator: RedisLockIndicator>(mut self, _lock_indicator: &LockIndicator) {
821        free_call_reply(self.reply.as_ptr());
822        self.reply_freed = true;
823    }
824
825    /// Aborts the invocation of the blocking commands. Return [Status::Ok] on
826    /// success and [Status::Err] on failure. In case of success it is promised
827    /// that the unblock handler will not be called.
828    /// The function also dispose the [FutureHandler].
829    pub fn abort_and_dispose<LockIndicator: RedisLockIndicator>(
830        self,
831        lock_indicator: &LockIndicator,
832    ) -> Status {
833        let mut callback: *mut C = std::ptr::null_mut();
834        let res = unsafe {
835            RedisModule_CallReplyPromiseAbort
836                .expect("RedisModule_CallReplyPromiseAbort is expected to be available if we got a promise call reply")
837                (self.reply.as_ptr(), &mut callback as *mut *mut C as *mut *mut c_void)
838        }.into();
839
840        if !callback.is_null() {
841            unsafe { deallocate_pointer(callback) };
842        }
843
844        self.dispose(lock_indicator);
845
846        res
847    }
848}
849
850impl<C: FnOnce(&Context, CallResult<'static>)> Drop for FutureHandler<C> {
851    fn drop(&mut self) {
852        if !self.reply_freed {
853            log::warn!("Memory leak detected!!! FutureHandler was freed without disposed.")
854        }
855    }
856}
857
858/// A future call reply struct that will be return in case
859/// the module invoke a blocking command using [call_blocking].
860/// This struct can be used to set unblock handler. Notice that the
861/// struct can not outlive the `ctx lifetime, This is because
862/// the future handler must be set before the Redis GIL will
863/// be released.
864pub struct FutureCallReply<'ctx> {
865    _ctx: &'ctx Context,
866    reply: Option<NonNull<RedisModuleCallReply>>,
867}
868
869extern "C" fn on_unblock<C: FnOnce(&Context, CallResult<'static>)>(
870    ctx: *mut RedisModuleCtx,
871    reply: *mut RedisModuleCallReply,
872    private_data: *mut ::std::os::raw::c_void,
873) {
874    let on_unblock = unsafe { Box::from_raw(private_data as *mut C) };
875    let ctx = Context::new(ctx);
876    let reply = NonNull::new(reply).map_or(Ok(CallReply::Unknown), create_call_reply);
877    on_unblock(&ctx, reply);
878}
879
880impl<'ctx> FutureCallReply<'ctx> {
881    /// Allow to set an handler that will be called when the command gets
882    /// unblock. Return [FutureHandler] that can be used to abort the command.
883    pub fn set_unblock_handler<C: FnOnce(&Context, CallResult<'static>)>(
884        mut self,
885        unblock_handler: C,
886    ) -> FutureHandler<C> {
887        let reply = self.reply.take().expect("Got a NULL future reply");
888        unsafe {
889            RedisModule_CallReplyPromiseSetUnblockHandler
890                .expect("RedisModule_CallReplyPromiseSetUnblockHandler is expected to be available if we got a promise call reply")
891                (reply.as_ptr(), Some(on_unblock::<C>), Box::into_raw(Box::new(unblock_handler)) as *mut c_void)
892        }
893        FutureHandler {
894            reply,
895            _dummy: PhantomData,
896            reply_freed: false,
897        }
898    }
899}
900
901impl<'ctx> Drop for FutureCallReply<'ctx> {
902    fn drop(&mut self) {
903        if let Some(v) = self.reply {
904            free_call_reply(v.as_ptr());
905        }
906    }
907}
908
909pub enum PromiseCallReply<'root, 'ctx> {
910    Resolved(CallResult<'root>),
911    Future(FutureCallReply<'ctx>),
912}
913
914pub(crate) fn create_promise_call_reply(
915    ctx: &Context,
916    reply: Option<NonNull<RedisModuleCallReply>>,
917) -> PromiseCallReply<'static, '_> {
918    reply.map_or(PromiseCallReply::Resolved(Ok(CallReply::Unknown)), |val| {
919        let ty = unsafe { RedisModule_CallReplyType.unwrap()(val.as_ptr()) };
920        if ty == REDISMODULE_REPLY_PROMISE as i32 {
921            return PromiseCallReply::Future(FutureCallReply {
922                _ctx: ctx,
923                reply: Some(val),
924            });
925        }
926        PromiseCallReply::Resolved(create_call_reply(val))
927    })
928}
929
930impl<'ctx> From<PromiseCallReply<'static, 'ctx>> for CallResult<'static> {
931    fn from(value: PromiseCallReply<'static, 'ctx>) -> Self {
932        match value {
933            PromiseCallReply::Resolved(c) => c,
934            PromiseCallReply::Future(_) => panic!("Got unexpected future call reply"),
935        }
936    }
937}