redis_module/context/
call_reply.rs

1use core::slice;
2use std::os::raw::c_char;
3use std::{
4    fmt,
5    fmt::{Debug, Display, Formatter},
6    marker::PhantomData,
7    ptr::NonNull,
8};
9
10use libc::c_void;
11
12use crate::{deallocate_pointer, raw::*, Context, RedisError, RedisLockIndicator};
13
14pub struct StringCallReply<'root> {
15    reply: NonNull<RedisModuleCallReply>,
16    _dummy: PhantomData<&'root ()>,
17}
18
19impl<'root> StringCallReply<'root> {
20    /// Convert StringCallReply to String.
21    /// Return None data is not a valid utf8.
22    pub fn to_string(&self) -> Option<String> {
23        String::from_utf8(self.as_bytes().to_vec()).ok()
24    }
25
26    /// Return the StringCallReply data as &[u8]
27    pub fn as_bytes(&self) -> &[u8] {
28        let mut len: usize = 0;
29        let reply_string: *mut u8 = unsafe {
30            RedisModule_CallReplyStringPtr.unwrap()(self.reply.as_ptr(), &mut len) as *mut u8
31        };
32        unsafe { slice::from_raw_parts(reply_string, len) }
33    }
34}
35
36impl<'root> Drop for StringCallReply<'root> {
37    fn drop(&mut self) {
38        free_call_reply(self.reply.as_ptr());
39    }
40}
41
42impl<'root> Debug for StringCallReply<'root> {
43    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
44        let mut debug_struct = f.debug_struct("StringCallReply");
45        let debug_struct = debug_struct.field("reply", &self.reply);
46        match self.to_string() {
47            Some(s) => debug_struct.field("value", &s),
48            None => debug_struct.field("value", &self.as_bytes()),
49        }
50        .finish()
51    }
52}
53
54impl<'root> Display for StringCallReply<'root> {
55    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
56        fmt::Display::fmt(&String::from_utf8_lossy(self.as_bytes()), f)
57    }
58}
59
60pub struct ErrorCallReply<'root> {
61    reply: NonNull<RedisModuleCallReply>,
62    _dummy: PhantomData<&'root ()>,
63}
64
65impl<'root> ErrorCallReply<'root> {
66    /// Convert ErrorCallReply to String.
67    /// Return None data is not a valid utf8.
68    pub fn to_utf8_string(&self) -> Option<String> {
69        String::from_utf8(self.as_bytes().to_vec()).ok()
70    }
71
72    /// Return the ErrorCallReply data as &[u8]
73    pub fn as_bytes(&self) -> &[u8] {
74        let mut len: usize = 0;
75        let reply_string: *mut u8 = unsafe {
76            RedisModule_CallReplyStringPtr.unwrap()(self.reply.as_ptr(), &mut len) as *mut u8
77        };
78        unsafe { slice::from_raw_parts(reply_string, len) }
79    }
80}
81
82impl<'root> Drop for ErrorCallReply<'root> {
83    fn drop(&mut self) {
84        free_call_reply(self.reply.as_ptr());
85    }
86}
87
88impl<'root> Debug for ErrorCallReply<'root> {
89    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
90        let mut debug_struct = f.debug_struct("ErrorCallReply");
91        let debug_struct = debug_struct.field("reply", &self.reply);
92        match self.to_utf8_string() {
93            Some(s) => debug_struct.field("value", &s),
94            None => debug_struct.field("value", &self.as_bytes()),
95        }
96        .finish()
97    }
98}
99
100impl<'root> Display for ErrorCallReply<'root> {
101    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
102        fmt::Display::fmt(&String::from_utf8_lossy(self.as_bytes()), f)
103    }
104}
105
106#[derive(Debug)]
107pub enum ErrorReply<'root> {
108    Message(String),
109    RedisError(ErrorCallReply<'root>),
110}
111
112/// Send implementation to [ErrorCallReply].
113/// We need to implements this trait because [ErrorCallReply] hold
114/// raw pointers to C data which does not auto implement the [Send] trait.
115/// By implementing [Send] on [ErrorCallReply] we basically tells the compiler
116/// that it is safe to send the underline C data between threads.
117unsafe impl<'root> Send for ErrorCallReply<'root> {}
118
119impl<'root> ErrorReply<'root> {
120    /// Convert [ErrorCallReply] to [String] or [None] if its not a valid utf8.
121    pub fn to_utf8_string(&self) -> Option<String> {
122        match self {
123            ErrorReply::Message(s) => Some(s.clone()),
124            ErrorReply::RedisError(r) => r.to_utf8_string(),
125        }
126    }
127
128    /// Return the ErrorCallReply data as &[u8]
129    pub fn as_bytes(&self) -> &[u8] {
130        match self {
131            ErrorReply::Message(s) => s.as_bytes(),
132            ErrorReply::RedisError(r) => r.as_bytes(),
133        }
134    }
135}
136
137impl<'root> Display for ErrorReply<'root> {
138    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
139        fmt::Display::fmt(&String::from_utf8_lossy(self.as_bytes()), f)
140    }
141}
142
143pub struct I64CallReply<'root> {
144    reply: NonNull<RedisModuleCallReply>,
145    _dummy: PhantomData<&'root ()>,
146}
147
148impl<'root> I64CallReply<'root> {
149    /// Return the i64 value of the [I64CallReply]
150    pub fn to_i64(&self) -> i64 {
151        call_reply_integer(self.reply.as_ptr())
152    }
153}
154
155impl<'root> Drop for I64CallReply<'root> {
156    fn drop(&mut self) {
157        free_call_reply(self.reply.as_ptr());
158    }
159}
160
161impl<'root> Debug for I64CallReply<'root> {
162    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
163        f.debug_struct("I64CallReply")
164            .field("reply", &self.reply)
165            .field("value", &self.to_i64())
166            .finish()
167    }
168}
169
170impl<'root> Display for I64CallReply<'root> {
171    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
172        fmt::Display::fmt(&self.to_i64(), f)
173    }
174}
175
176pub struct ArrayCallReply<'root> {
177    reply: NonNull<RedisModuleCallReply>,
178    _dummy: PhantomData<&'root ()>,
179}
180
181impl<'root> Drop for ArrayCallReply<'root> {
182    fn drop(&mut self) {
183        free_call_reply(self.reply.as_ptr());
184    }
185}
186
187impl<'root> ArrayCallReply<'root> {
188    /// Return an Iterator that allows to iterate over the elements
189    /// in the [ArrayCallReply].
190    pub fn iter(&self) -> ArrayCallReplyIterator<'root, '_> {
191        ArrayCallReplyIterator {
192            reply: self,
193            index: 0,
194        }
195    }
196
197    /// Return the array element on the given index.
198    pub fn get(&self, idx: usize) -> Option<CallResult<'_>> {
199        let res = NonNull::new(call_reply_array_element(self.reply.as_ptr(), idx))?;
200        Some(create_call_reply(res))
201    }
202
203    /// Return the number of elements in the [ArrayCallReply].
204    pub fn len(&self) -> usize {
205        call_reply_length(self.reply.as_ptr())
206    }
207}
208
209pub struct ArrayCallReplyIterator<'root, 'curr> {
210    reply: &'curr ArrayCallReply<'root>,
211    index: usize,
212}
213
214impl<'root, 'curr> Iterator for ArrayCallReplyIterator<'root, 'curr> {
215    type Item = CallResult<'curr>;
216
217    fn next(&mut self) -> Option<Self::Item> {
218        let res = self.reply.get(self.index);
219        if res.is_some() {
220            self.index += 1;
221        }
222        res
223    }
224}
225
226impl<'root> Debug for ArrayCallReply<'root> {
227    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
228        f.debug_struct("ArrayCallReply")
229            .field("reply", &self.reply)
230            .field("elements", &self.iter().collect::<Vec<CallResult>>())
231            .finish()
232    }
233}
234
235impl<'root> Display for ArrayCallReply<'root> {
236    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
237        f.write_str("[")?;
238
239        self.iter()
240            .enumerate()
241            .try_for_each(|(index, v)| -> fmt::Result {
242                if index > 1 {
243                    f.write_str(", ")?;
244                }
245                fmt_call_result(v, f)
246            })?;
247
248        f.write_str("]")
249    }
250}
251
252pub struct NullCallReply<'root> {
253    reply: NonNull<RedisModuleCallReply>,
254    _dummy: PhantomData<&'root ()>,
255}
256
257impl<'root> Drop for NullCallReply<'root> {
258    fn drop(&mut self) {
259        free_call_reply(self.reply.as_ptr());
260    }
261}
262
263impl<'root> Debug for NullCallReply<'root> {
264    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
265        f.debug_struct("NullCallReply")
266            .field("reply", &self.reply)
267            .finish()
268    }
269}
270
271impl<'root> Display for NullCallReply<'root> {
272    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
273        f.write_str("Null")
274    }
275}
276
277pub struct MapCallReply<'root> {
278    reply: NonNull<RedisModuleCallReply>,
279    _dummy: PhantomData<&'root ()>,
280}
281
282impl<'root> MapCallReply<'root> {
283    /// Return an iterator over the elements in the [MapCallReply].
284    /// The iterator return each element as a tuple representing the
285    /// key and the value.
286    pub fn iter(&self) -> MapCallReplyIterator<'root, '_> {
287        MapCallReplyIterator {
288            reply: self,
289            index: 0,
290        }
291    }
292
293    /// Return the map element on the given index.
294    pub fn get(&self, idx: usize) -> Option<(CallResult<'_>, CallResult<'_>)> {
295        let (key, val) = call_reply_map_element(self.reply.as_ptr(), idx);
296        Some((
297            create_call_reply(NonNull::new(key)?),
298            create_call_reply(NonNull::new(val)?),
299        ))
300    }
301
302    /// Return the number of elements in the [MapCallReply].
303    pub fn len(&self) -> usize {
304        call_reply_length(self.reply.as_ptr())
305    }
306}
307
308pub struct MapCallReplyIterator<'root, 'curr> {
309    reply: &'curr MapCallReply<'root>,
310    index: usize,
311}
312
313impl<'root, 'curr> Iterator for MapCallReplyIterator<'root, 'curr> {
314    type Item = (CallResult<'curr>, CallResult<'curr>);
315
316    fn next(&mut self) -> Option<Self::Item> {
317        let res = self.reply.get(self.index);
318        if res.is_some() {
319            self.index += 1;
320        }
321        res
322    }
323}
324
325impl<'root> Drop for MapCallReply<'root> {
326    fn drop(&mut self) {
327        free_call_reply(self.reply.as_ptr());
328    }
329}
330
331impl<'root> Debug for MapCallReply<'root> {
332    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
333        f.debug_struct("MapCallReply")
334            .field("reply", &self.reply)
335            .field(
336                "elements",
337                &self.iter().collect::<Vec<(CallResult, CallResult)>>(),
338            )
339            .finish()
340    }
341}
342
343impl<'root> Display for MapCallReply<'root> {
344    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
345        f.write_str("{")?;
346
347        self.iter()
348            .enumerate()
349            .try_for_each(|(index, (key, val))| -> fmt::Result {
350                if index > 1 {
351                    f.write_str(", ")?;
352                }
353                f.write_str("")?;
354                fmt_call_result(key, f)?;
355                f.write_str(": ")?;
356                fmt_call_result(val, f)
357            })?;
358
359        f.write_str("}")
360    }
361}
362
363pub struct SetCallReply<'root> {
364    reply: NonNull<RedisModuleCallReply>,
365    _dummy: PhantomData<&'root ()>,
366}
367
368impl<'root> SetCallReply<'root> {
369    /// Return an iterator over the elements in the [SetCallReply].
370    pub fn iter(&self) -> SetCallReplyIterator<'root, '_> {
371        SetCallReplyIterator {
372            reply: self,
373            index: 0,
374        }
375    }
376
377    /// Return the set element on the given index.
378    pub fn get(&self, idx: usize) -> Option<CallResult<'_>> {
379        let res = NonNull::new(call_reply_set_element(self.reply.as_ptr(), idx))?;
380        Some(create_call_reply(res))
381    }
382
383    /// Return the number of elements in the [SetCallReply].
384    pub fn len(&self) -> usize {
385        call_reply_length(self.reply.as_ptr())
386    }
387}
388
389pub struct SetCallReplyIterator<'root, 'curr> {
390    reply: &'curr SetCallReply<'root>,
391    index: usize,
392}
393
394impl<'root, 'curr> Iterator for SetCallReplyIterator<'root, 'curr> {
395    type Item = CallResult<'curr>;
396
397    fn next(&mut self) -> Option<Self::Item> {
398        let res = self.reply.get(self.index);
399        if res.is_some() {
400            self.index += 1;
401        }
402        res
403    }
404}
405
406impl<'root> Drop for SetCallReply<'root> {
407    fn drop(&mut self) {
408        free_call_reply(self.reply.as_ptr());
409    }
410}
411
412impl<'root> Debug for SetCallReply<'root> {
413    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
414        f.debug_struct("SetCallReply")
415            .field("reply", &self.reply)
416            .field("elements", &self.iter().collect::<Vec<CallResult>>())
417            .finish()
418    }
419}
420
421impl<'root> Display for SetCallReply<'root> {
422    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
423        f.write_str("{")?;
424
425        self.iter()
426            .enumerate()
427            .try_for_each(|(index, v)| -> fmt::Result {
428                if index > 1 {
429                    f.write_str(", ")?;
430                }
431                fmt_call_result(v, f)
432            })?;
433
434        f.write_str("}")
435    }
436}
437
438pub struct BoolCallReply<'root> {
439    reply: NonNull<RedisModuleCallReply>,
440    _dummy: PhantomData<&'root ()>,
441}
442
443impl<'root> BoolCallReply<'root> {
444    /// Return the boolean value of the [BoolCallReply].
445    pub fn to_bool(&self) -> bool {
446        call_reply_bool(self.reply.as_ptr())
447    }
448}
449
450impl<'root> Drop for BoolCallReply<'root> {
451    fn drop(&mut self) {
452        free_call_reply(self.reply.as_ptr());
453    }
454}
455
456impl<'root> Debug for BoolCallReply<'root> {
457    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
458        f.debug_struct("BoolCallReply")
459            .field("reply", &self.reply)
460            .field("value", &self.to_bool())
461            .finish()
462    }
463}
464
465impl<'root> Display for BoolCallReply<'root> {
466    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
467        fmt::Display::fmt(&self.to_bool(), f)
468    }
469}
470
471pub struct DoubleCallReply<'root> {
472    reply: NonNull<RedisModuleCallReply>,
473    _dummy: PhantomData<&'root ()>,
474}
475
476impl<'root> DoubleCallReply<'root> {
477    /// Return the double value of the [BoolCallReply] as f64.
478    pub fn to_double(&self) -> f64 {
479        call_reply_double(self.reply.as_ptr())
480    }
481}
482
483impl<'root> Drop for DoubleCallReply<'root> {
484    fn drop(&mut self) {
485        free_call_reply(self.reply.as_ptr());
486    }
487}
488
489impl<'root> Debug for DoubleCallReply<'root> {
490    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
491        f.debug_struct("DoubleCallReply")
492            .field("reply", &self.reply)
493            .field("value", &self.to_double())
494            .finish()
495    }
496}
497
498impl<'root> Display for DoubleCallReply<'root> {
499    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
500        fmt::Display::fmt(&self.to_double(), f)
501    }
502}
503
504pub struct BigNumberCallReply<'root> {
505    reply: NonNull<RedisModuleCallReply>,
506    _dummy: PhantomData<&'root ()>,
507}
508
509impl<'root> BigNumberCallReply<'root> {
510    /// Return the big number value of the [BigNumberCallReply] as String.
511    /// Return None if the data is not a valid utf8
512    pub fn to_string(&self) -> Option<String> {
513        call_reply_big_number(self.reply.as_ptr())
514    }
515}
516
517impl<'root> Drop for BigNumberCallReply<'root> {
518    fn drop(&mut self) {
519        free_call_reply(self.reply.as_ptr());
520    }
521}
522
523impl<'root> Debug for BigNumberCallReply<'root> {
524    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
525        f.debug_struct("BigNumberCallReply")
526            .field("reply", &self.reply)
527            .field("value", &self.to_string())
528            .finish()
529    }
530}
531
532impl<'root> Display for BigNumberCallReply<'root> {
533    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
534        fmt::Display::fmt(self.to_string().as_deref().unwrap_or("None"), f)
535    }
536}
537
538pub struct VerbatimStringCallReply<'root> {
539    reply: NonNull<RedisModuleCallReply>,
540    _dummy: PhantomData<&'root ()>,
541}
542
543/// RESP3 state that the verbatim string format must be of length 3.
544const VERBATIM_FORMAT_LENGTH: usize = 3;
545/// The string format of a verbatim string ([VerbatimStringCallReply]).
546#[repr(transparent)]
547#[derive(Debug, Default, Copy, Clone, PartialEq)]
548pub struct VerbatimStringFormat(pub [c_char; VERBATIM_FORMAT_LENGTH]);
549
550impl TryFrom<&str> for VerbatimStringFormat {
551    type Error = RedisError;
552
553    fn try_from(value: &str) -> Result<Self, Self::Error> {
554        if value.len() != 3 {
555            return Err(RedisError::String(format!(
556                "Verbatim format length must be {VERBATIM_FORMAT_LENGTH}."
557            )));
558        }
559        let mut res = VerbatimStringFormat::default();
560        value.chars().take(3).enumerate().try_for_each(|(i, c)| {
561            if c as u32 >= 127 {
562                return Err(RedisError::String(
563                    "Verbatim format must contains only ASCI values.".to_owned(),
564                ));
565            }
566            res.0[i] = c as c_char;
567            Ok(())
568        })?;
569        Ok(res)
570    }
571}
572
573impl<'root> VerbatimStringCallReply<'root> {
574    /// Return the verbatim string value of the [VerbatimStringCallReply] as a tuple.
575    /// The first entry represents the format, the second entry represent the data.
576    /// Return None if the format is not a valid utf8
577    pub fn to_parts(&self) -> Option<(VerbatimStringFormat, Vec<u8>)> {
578        let (format, data) = self.as_parts()?;
579        Some((format.try_into().ok()?, data.to_vec()))
580    }
581
582    /// Borrow the verbatim string value of the [VerbatimStringCallReply] as a tuple.
583    /// The first entry represents the format as &str, the second entry represent the data as &[u8].
584    /// Return None if the format is not a valid utf8.
585    pub fn as_parts(&self) -> Option<(&str, &[u8])> {
586        // RESP3 state that veribatim string format must be of size 3.
587        const FORMAT_LEN: usize = 3;
588        let mut len: usize = 0;
589        let mut format: *const c_char = std::ptr::null();
590        let reply_string: *mut u8 = unsafe {
591            RedisModule_CallReplyVerbatim.unwrap()(self.reply.as_ptr(), &mut len, &mut format)
592                as *mut u8
593        };
594        Some((
595            std::str::from_utf8(unsafe { slice::from_raw_parts(format as *const u8, FORMAT_LEN) })
596                .ok()
597                .unwrap(),
598            unsafe { slice::from_raw_parts(reply_string, len) },
599        ))
600    }
601}
602
603impl<'root> Drop for VerbatimStringCallReply<'root> {
604    fn drop(&mut self) {
605        free_call_reply(self.reply.as_ptr());
606    }
607}
608
609impl<'root> Debug for VerbatimStringCallReply<'root> {
610    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
611        f.debug_struct("VerbatimStringCallReply")
612            .field("reply", &self.reply)
613            .field("value", &self.as_parts())
614            .finish()
615    }
616}
617
618impl<'root> Display for VerbatimStringCallReply<'root> {
619    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
620        match self.as_parts() {
621            Some((format, data)) => write!(f, "({}, {})", format, String::from_utf8_lossy(data)),
622            None => f.write_str("(None)"),
623        }
624    }
625}
626
627#[derive(Debug)]
628pub enum CallReply<'root> {
629    Unknown,
630    I64(I64CallReply<'root>),
631    String(StringCallReply<'root>),
632    Array(ArrayCallReply<'root>),
633    Null(NullCallReply<'root>),
634    Map(MapCallReply<'root>),
635    Set(SetCallReply<'root>),
636    Bool(BoolCallReply<'root>),
637    Double(DoubleCallReply<'root>),
638    BigNumber(BigNumberCallReply<'root>),
639    VerbatimString(VerbatimStringCallReply<'root>),
640}
641
642/// Send implementation to [CallReply].
643/// We need to implements this trait because [CallReply] hold
644/// raw pointers to C data which does not auto implement the [Send] trait.
645/// By implementing [Send] on [CallReply] we basically tells the compiler
646/// that it is safe to send the underline C data between threads.
647unsafe impl<'root> Send for CallReply<'root> {}
648
649impl<'root> Display for CallReply<'root> {
650    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
651        match self {
652            CallReply::Unknown => f.write_str("Unknown"),
653            CallReply::I64(inner) => fmt::Display::fmt(&inner, f),
654            CallReply::String(inner) => fmt::Display::fmt(&inner, f),
655            CallReply::Array(inner) => fmt::Display::fmt(&inner, f),
656            CallReply::Null(inner) => fmt::Display::fmt(&inner, f),
657            CallReply::Map(inner) => fmt::Display::fmt(&inner, f),
658            CallReply::Set(inner) => fmt::Display::fmt(&inner, f),
659            CallReply::Bool(inner) => fmt::Display::fmt(&inner, f),
660            CallReply::Double(inner) => fmt::Display::fmt(&inner, f),
661            CallReply::BigNumber(inner) => fmt::Display::fmt(&inner, f),
662            CallReply::VerbatimString(inner) => fmt::Display::fmt(&inner, f),
663        }
664    }
665}
666
667fn create_call_reply<'root>(reply: NonNull<RedisModuleCallReply>) -> CallResult<'root> {
668    let ty = call_reply_type(reply.as_ptr());
669    match ty {
670        ReplyType::Unknown => Ok(CallReply::Unknown), // unknown means NULL so no need to free free anything
671        ReplyType::Integer => Ok(CallReply::I64(I64CallReply {
672            reply,
673            _dummy: PhantomData,
674        })),
675        ReplyType::String => Ok(CallReply::String(StringCallReply {
676            reply,
677            _dummy: PhantomData,
678        })),
679        ReplyType::Error => Err(ErrorReply::RedisError(ErrorCallReply {
680            reply,
681            _dummy: PhantomData,
682        })),
683        ReplyType::Array => Ok(CallReply::Array(ArrayCallReply {
684            reply,
685            _dummy: PhantomData,
686        })),
687        ReplyType::Null => Ok(CallReply::Null(NullCallReply {
688            reply,
689            _dummy: PhantomData,
690        })),
691        ReplyType::Map => Ok(CallReply::Map(MapCallReply {
692            reply,
693            _dummy: PhantomData,
694        })),
695        ReplyType::Set => Ok(CallReply::Set(SetCallReply {
696            reply,
697            _dummy: PhantomData,
698        })),
699        ReplyType::Bool => Ok(CallReply::Bool(BoolCallReply {
700            reply,
701            _dummy: PhantomData,
702        })),
703        ReplyType::Double => Ok(CallReply::Double(DoubleCallReply {
704            reply,
705            _dummy: PhantomData,
706        })),
707        ReplyType::BigNumber => Ok(CallReply::BigNumber(BigNumberCallReply {
708            reply,
709            _dummy: PhantomData,
710        })),
711        ReplyType::VerbatimString => Ok(CallReply::VerbatimString(VerbatimStringCallReply {
712            reply,
713            _dummy: PhantomData,
714        })),
715    }
716}
717
718fn fmt_call_result(res: CallResult<'_>, f: &mut Formatter<'_>) -> fmt::Result {
719    match res {
720        Ok(r) => fmt::Display::fmt(&r, f),
721        Err(e) => fmt::Display::fmt(&e, f),
722    }
723}
724
725pub type CallResult<'root> = Result<CallReply<'root>, ErrorReply<'root>>;
726
727pub struct FutureHandler<C: FnOnce(&Context, CallResult<'static>)> {
728    reply: NonNull<RedisModuleCallReply>,
729    _dummy: PhantomData<C>,
730    reply_freed: bool,
731}
732
733impl<C> FutureHandler<C>
734where
735    C: FnOnce(&Context, CallResult<'static>),
736{
737    /// Dispose the future, handler. This function must be called in order to
738    /// release the [FutureHandler]. The reason we must have a dispose function
739    /// and we can not use the Drop is that [FutureHandler] must be released
740    /// when the Redis GIL is held. This is also why this function also gets a
741    /// lock indicator.
742    pub fn dispose<LockIndicator: RedisLockIndicator>(mut self, _lock_indicator: &LockIndicator) {
743        free_call_reply(self.reply.as_ptr());
744        self.reply_freed = true;
745    }
746
747    /// Aborts the invocation of the blocking commands. Return [Status::Ok] on
748    /// success and [Status::Err] on failure. In case of success it is promised
749    /// that the unblock handler will not be called.
750    /// The function also dispose the [FutureHandler].
751    pub fn abort_and_dispose<LockIndicator: RedisLockIndicator>(
752        self,
753        lock_indicator: &LockIndicator,
754    ) -> Status {
755        let mut callback: *mut C = std::ptr::null_mut();
756        let res = unsafe {
757            RedisModule_CallReplyPromiseAbort
758                .expect("RedisModule_CallReplyPromiseAbort is expected to be available if we got a promise call reply")
759                (self.reply.as_ptr(), &mut callback as *mut *mut C as *mut *mut c_void)
760        }.into();
761
762        if !callback.is_null() {
763            unsafe { deallocate_pointer(callback) };
764        }
765
766        self.dispose(lock_indicator);
767
768        res
769    }
770}
771
772impl<C: FnOnce(&Context, CallResult<'static>)> Drop for FutureHandler<C> {
773    fn drop(&mut self) {
774        if !self.reply_freed {
775            log::warn!("Memory leak detected!!! FutureHandler was freed without disposed.")
776        }
777    }
778}
779
780/// A future call reply struct that will be return in case
781/// the module invoke a blocking command using [call_blocking].
782/// This struct can be used to set unblock handler. Notice that the
783/// struct can not outlive the `ctx lifetime, This is because
784/// the future handler must be set before the Redis GIL will
785/// be released.
786pub struct FutureCallReply<'ctx> {
787    _ctx: &'ctx Context,
788    reply: Option<NonNull<RedisModuleCallReply>>,
789}
790
791extern "C" fn on_unblock<C: FnOnce(&Context, CallResult<'static>)>(
792    ctx: *mut RedisModuleCtx,
793    reply: *mut RedisModuleCallReply,
794    private_data: *mut ::std::os::raw::c_void,
795) {
796    let on_unblock = unsafe { Box::from_raw(private_data as *mut C) };
797    let ctx = Context::new(ctx);
798    let reply = NonNull::new(reply).map_or(Ok(CallReply::Unknown), create_call_reply);
799    on_unblock(&ctx, reply);
800}
801
802impl<'ctx> FutureCallReply<'ctx> {
803    /// Allow to set an handler that will be called when the command gets
804    /// unblock. Return [FutureHandler] that can be used to abort the command.
805    pub fn set_unblock_handler<C: FnOnce(&Context, CallResult<'static>)>(
806        mut self,
807        unblock_handler: C,
808    ) -> FutureHandler<C> {
809        let reply = self.reply.take().expect("Got a NULL future reply");
810        unsafe {
811            RedisModule_CallReplyPromiseSetUnblockHandler
812                .expect("RedisModule_CallReplyPromiseSetUnblockHandler is expected to be available if we got a promise call reply")
813                (reply.as_ptr(), Some(on_unblock::<C>), Box::into_raw(Box::new(unblock_handler)) as *mut c_void)
814        }
815        FutureHandler {
816            reply,
817            _dummy: PhantomData,
818            reply_freed: false,
819        }
820    }
821}
822
823impl<'ctx> Drop for FutureCallReply<'ctx> {
824    fn drop(&mut self) {
825        if let Some(v) = self.reply {
826            free_call_reply(v.as_ptr());
827        }
828    }
829}
830
831pub enum PromiseCallReply<'root, 'ctx> {
832    Resolved(CallResult<'root>),
833    Future(FutureCallReply<'ctx>),
834}
835
836pub(crate) fn create_promise_call_reply(
837    ctx: &Context,
838    reply: Option<NonNull<RedisModuleCallReply>>,
839) -> PromiseCallReply<'static, '_> {
840    reply.map_or(PromiseCallReply::Resolved(Ok(CallReply::Unknown)), |val| {
841        let ty = unsafe { RedisModule_CallReplyType.unwrap()(val.as_ptr()) };
842        if ty == REDISMODULE_REPLY_PROMISE as i32 {
843            return PromiseCallReply::Future(FutureCallReply {
844                _ctx: ctx,
845                reply: Some(val),
846            });
847        }
848        PromiseCallReply::Resolved(create_call_reply(val))
849    })
850}
851
852impl<'ctx> From<PromiseCallReply<'static, 'ctx>> for CallResult<'static> {
853    fn from(value: PromiseCallReply<'static, 'ctx>) -> Self {
854        match value {
855            PromiseCallReply::Resolved(c) => c,
856            PromiseCallReply::Future(_) => panic!("Got unexpected future call reply"),
857        }
858    }
859}