1use core::slice;
2use std::os::raw::c_char;
3use std::{
4 fmt,
5 fmt::{Debug, Display, Formatter},
6 marker::PhantomData,
7 ptr::NonNull,
8};
9
10use libc::c_void;
11
12use crate::{deallocate_pointer, raw::*, Context, RedisError, RedisLockIndicator};
13
/// Call reply of string type (built for `ReplyType::String`).
///
/// Wraps a non-null pointer to the underlying `RedisModuleCallReply`; the
/// pointer is passed to `free_call_reply` when this value is dropped.
pub struct StringCallReply<'root> {
    /// Raw pointer to the reply object.
    reply: NonNull<RedisModuleCallReply>,
    /// Zero-sized marker carrying the `'root` lifetime.
    _dummy: PhantomData<&'root ()>,
}
18
19impl<'root> StringCallReply<'root> {
20 pub fn to_string(&self) -> Option<String> {
23 String::from_utf8(self.as_bytes().to_vec()).ok()
24 }
25
26 pub fn as_bytes(&self) -> &[u8] {
28 let mut len: usize = 0;
29 let reply_string: *mut u8 = unsafe {
30 RedisModule_CallReplyStringPtr.unwrap()(self.reply.as_ptr(), &mut len) as *mut u8
31 };
32 unsafe { slice::from_raw_parts(reply_string, len) }
33 }
34}
35
36impl<'root> Drop for StringCallReply<'root> {
37 fn drop(&mut self) {
38 free_call_reply(self.reply.as_ptr());
39 }
40}
41
42impl<'root> Debug for StringCallReply<'root> {
43 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
44 let mut debug_struct = f.debug_struct("StringCallReply");
45 let debug_struct = debug_struct.field("reply", &self.reply);
46 match self.to_string() {
47 Some(s) => debug_struct.field("value", &s),
48 None => debug_struct.field("value", &self.as_bytes()),
49 }
50 .finish()
51 }
52}
53
54impl<'root> Display for StringCallReply<'root> {
55 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
56 fmt::Display::fmt(&String::from_utf8_lossy(self.as_bytes()), f)
57 }
58}
59
/// Call reply of error type (built for `ReplyType::Error`).
///
/// Wraps a non-null pointer to the underlying `RedisModuleCallReply`; the
/// pointer is passed to `free_call_reply` when this value is dropped.
pub struct ErrorCallReply<'root> {
    /// Raw pointer to the reply object.
    reply: NonNull<RedisModuleCallReply>,
    /// Zero-sized marker carrying the `'root` lifetime.
    _dummy: PhantomData<&'root ()>,
}
64
65impl<'root> ErrorCallReply<'root> {
66 pub fn to_utf8_string(&self) -> Option<String> {
69 String::from_utf8(self.as_bytes().to_vec()).ok()
70 }
71
72 pub fn as_bytes(&self) -> &[u8] {
74 let mut len: usize = 0;
75 let reply_string: *mut u8 = unsafe {
76 RedisModule_CallReplyStringPtr.unwrap()(self.reply.as_ptr(), &mut len) as *mut u8
77 };
78 unsafe { slice::from_raw_parts(reply_string, len) }
79 }
80}
81
82impl<'root> Drop for ErrorCallReply<'root> {
83 fn drop(&mut self) {
84 free_call_reply(self.reply.as_ptr());
85 }
86}
87
88impl<'root> Debug for ErrorCallReply<'root> {
89 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
90 let mut debug_struct = f.debug_struct("ErrorCallReply");
91 let debug_struct = debug_struct.field("reply", &self.reply);
92 match self.to_utf8_string() {
93 Some(s) => debug_struct.field("value", &s),
94 None => debug_struct.field("value", &self.as_bytes()),
95 }
96 .finish()
97 }
98}
99
100impl<'root> Display for ErrorCallReply<'root> {
101 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
102 fmt::Display::fmt(&String::from_utf8_lossy(self.as_bytes()), f)
103 }
104}
105
/// Error side of a [`CallResult`].
#[derive(Debug)]
pub enum ErrorReply<'root> {
    /// An error carried as an owned message string.
    Message(String),
    /// An error carried by an error-typed call reply.
    RedisError(ErrorCallReply<'root>),
}
111
// NOTE(review): the wrapped `NonNull` pointer is not `Send` by default, so this
// impl is a manual promise that moving an `ErrorCallReply` to another thread is
// sound. The justification is not visible in this file — confirm it against
// the locking rules callers follow.
unsafe impl<'root> Send for ErrorCallReply<'root> {}
118
119impl<'root> ErrorReply<'root> {
120 pub fn to_utf8_string(&self) -> Option<String> {
122 match self {
123 ErrorReply::Message(s) => Some(s.clone()),
124 ErrorReply::RedisError(r) => r.to_utf8_string(),
125 }
126 }
127
128 pub fn as_bytes(&self) -> &[u8] {
130 match self {
131 ErrorReply::Message(s) => s.as_bytes(),
132 ErrorReply::RedisError(r) => r.as_bytes(),
133 }
134 }
135}
136
137impl<'root> Display for ErrorReply<'root> {
138 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
139 fmt::Display::fmt(&String::from_utf8_lossy(self.as_bytes()), f)
140 }
141}
142
/// Call reply of integer type (built for `ReplyType::Integer`).
///
/// Wraps a non-null pointer to the underlying `RedisModuleCallReply`; the
/// pointer is passed to `free_call_reply` when this value is dropped.
pub struct I64CallReply<'root> {
    /// Raw pointer to the reply object.
    reply: NonNull<RedisModuleCallReply>,
    /// Zero-sized marker carrying the `'root` lifetime.
    _dummy: PhantomData<&'root ()>,
}
147
148impl<'root> I64CallReply<'root> {
149 pub fn to_i64(&self) -> i64 {
151 call_reply_integer(self.reply.as_ptr())
152 }
153}
154
155impl<'root> Drop for I64CallReply<'root> {
156 fn drop(&mut self) {
157 free_call_reply(self.reply.as_ptr());
158 }
159}
160
161impl<'root> Debug for I64CallReply<'root> {
162 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
163 f.debug_struct("I64CallReply")
164 .field("reply", &self.reply)
165 .field("value", &self.to_i64())
166 .finish()
167 }
168}
169
170impl<'root> Display for I64CallReply<'root> {
171 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
172 fmt::Display::fmt(&self.to_i64(), f)
173 }
174}
175
/// Call reply of array type (built for `ReplyType::Array`).
///
/// Wraps a non-null pointer to the underlying `RedisModuleCallReply`; the
/// pointer is passed to `free_call_reply` when this value is dropped.
pub struct ArrayCallReply<'root> {
    /// Raw pointer to the reply object.
    reply: NonNull<RedisModuleCallReply>,
    /// Zero-sized marker carrying the `'root` lifetime.
    _dummy: PhantomData<&'root ()>,
}
180
181impl<'root> Drop for ArrayCallReply<'root> {
182 fn drop(&mut self) {
183 free_call_reply(self.reply.as_ptr());
184 }
185}
186
187impl<'root> ArrayCallReply<'root> {
188 pub fn iter(&self) -> ArrayCallReplyIterator<'root, '_> {
191 ArrayCallReplyIterator {
192 reply: self,
193 index: 0,
194 }
195 }
196
197 pub fn get(&self, idx: usize) -> Option<CallResult<'_>> {
199 let res = NonNull::new(call_reply_array_element(self.reply.as_ptr(), idx))?;
200 Some(create_call_reply(res))
201 }
202
203 pub fn len(&self) -> usize {
205 call_reply_length(self.reply.as_ptr())
206 }
207}
208
/// Iterator over the elements of an [`ArrayCallReply`].
pub struct ArrayCallReplyIterator<'root, 'curr> {
    /// The array being iterated.
    reply: &'curr ArrayCallReply<'root>,
    /// Index of the next element to fetch.
    index: usize,
}
213
214impl<'root, 'curr> Iterator for ArrayCallReplyIterator<'root, 'curr> {
215 type Item = CallResult<'curr>;
216
217 fn next(&mut self) -> Option<Self::Item> {
218 let res = self.reply.get(self.index);
219 if res.is_some() {
220 self.index += 1;
221 }
222 res
223 }
224}
225
226impl<'root> Debug for ArrayCallReply<'root> {
227 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
228 f.debug_struct("ArrayCallReply")
229 .field("reply", &self.reply)
230 .field("elements", &self.iter().collect::<Vec<CallResult>>())
231 .finish()
232 }
233}
234
235impl<'root> Display for ArrayCallReply<'root> {
236 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
237 f.write_str("[")?;
238
239 self.iter()
240 .enumerate()
241 .try_for_each(|(index, v)| -> fmt::Result {
242 if index > 1 {
243 f.write_str(", ")?;
244 }
245 fmt_call_result(v, f)
246 })?;
247
248 f.write_str("]")
249 }
250}
251
/// Call reply of null type (built for `ReplyType::Null`).
///
/// Wraps a non-null pointer to the underlying `RedisModuleCallReply`; the
/// pointer is passed to `free_call_reply` when this value is dropped.
pub struct NullCallReply<'root> {
    /// Raw pointer to the reply object.
    reply: NonNull<RedisModuleCallReply>,
    /// Zero-sized marker carrying the `'root` lifetime.
    _dummy: PhantomData<&'root ()>,
}
256
257impl<'root> Drop for NullCallReply<'root> {
258 fn drop(&mut self) {
259 free_call_reply(self.reply.as_ptr());
260 }
261}
262
263impl<'root> Debug for NullCallReply<'root> {
264 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
265 f.debug_struct("NullCallReply")
266 .field("reply", &self.reply)
267 .finish()
268 }
269}
270
271impl<'root> Display for NullCallReply<'root> {
272 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
273 f.write_str("Null")
274 }
275}
276
/// Call reply of map type (built for `ReplyType::Map`).
///
/// Wraps a non-null pointer to the underlying `RedisModuleCallReply`; the
/// pointer is passed to `free_call_reply` when this value is dropped.
pub struct MapCallReply<'root> {
    /// Raw pointer to the reply object.
    reply: NonNull<RedisModuleCallReply>,
    /// Zero-sized marker carrying the `'root` lifetime.
    _dummy: PhantomData<&'root ()>,
}
281
282impl<'root> MapCallReply<'root> {
283 pub fn iter(&self) -> MapCallReplyIterator<'root, '_> {
287 MapCallReplyIterator {
288 reply: self,
289 index: 0,
290 }
291 }
292
293 pub fn get(&self, idx: usize) -> Option<(CallResult<'_>, CallResult<'_>)> {
295 let (key, val) = call_reply_map_element(self.reply.as_ptr(), idx);
296 Some((
297 create_call_reply(NonNull::new(key)?),
298 create_call_reply(NonNull::new(val)?),
299 ))
300 }
301
302 pub fn len(&self) -> usize {
304 call_reply_length(self.reply.as_ptr())
305 }
306}
307
/// Iterator over the `(key, value)` entries of a [`MapCallReply`].
pub struct MapCallReplyIterator<'root, 'curr> {
    /// The map being iterated.
    reply: &'curr MapCallReply<'root>,
    /// Index of the next entry to fetch.
    index: usize,
}
312
313impl<'root, 'curr> Iterator for MapCallReplyIterator<'root, 'curr> {
314 type Item = (CallResult<'curr>, CallResult<'curr>);
315
316 fn next(&mut self) -> Option<Self::Item> {
317 let res = self.reply.get(self.index);
318 if res.is_some() {
319 self.index += 1;
320 }
321 res
322 }
323}
324
325impl<'root> Drop for MapCallReply<'root> {
326 fn drop(&mut self) {
327 free_call_reply(self.reply.as_ptr());
328 }
329}
330
331impl<'root> Debug for MapCallReply<'root> {
332 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
333 f.debug_struct("MapCallReply")
334 .field("reply", &self.reply)
335 .field(
336 "elements",
337 &self.iter().collect::<Vec<(CallResult, CallResult)>>(),
338 )
339 .finish()
340 }
341}
342
343impl<'root> Display for MapCallReply<'root> {
344 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
345 f.write_str("{")?;
346
347 self.iter()
348 .enumerate()
349 .try_for_each(|(index, (key, val))| -> fmt::Result {
350 if index > 1 {
351 f.write_str(", ")?;
352 }
353 f.write_str("")?;
354 fmt_call_result(key, f)?;
355 f.write_str(": ")?;
356 fmt_call_result(val, f)
357 })?;
358
359 f.write_str("}")
360 }
361}
362
/// Call reply of set type (built for `ReplyType::Set`).
///
/// Wraps a non-null pointer to the underlying `RedisModuleCallReply`; the
/// pointer is passed to `free_call_reply` when this value is dropped.
pub struct SetCallReply<'root> {
    /// Raw pointer to the reply object.
    reply: NonNull<RedisModuleCallReply>,
    /// Zero-sized marker carrying the `'root` lifetime.
    _dummy: PhantomData<&'root ()>,
}
367
368impl<'root> SetCallReply<'root> {
369 pub fn iter(&self) -> SetCallReplyIterator<'root, '_> {
371 SetCallReplyIterator {
372 reply: self,
373 index: 0,
374 }
375 }
376
377 pub fn get(&self, idx: usize) -> Option<CallResult<'_>> {
379 let res = NonNull::new(call_reply_set_element(self.reply.as_ptr(), idx))?;
380 Some(create_call_reply(res))
381 }
382
383 pub fn len(&self) -> usize {
385 call_reply_length(self.reply.as_ptr())
386 }
387}
388
/// Iterator over the elements of a [`SetCallReply`].
pub struct SetCallReplyIterator<'root, 'curr> {
    /// The set being iterated.
    reply: &'curr SetCallReply<'root>,
    /// Index of the next element to fetch.
    index: usize,
}
393
394impl<'root, 'curr> Iterator for SetCallReplyIterator<'root, 'curr> {
395 type Item = CallResult<'curr>;
396
397 fn next(&mut self) -> Option<Self::Item> {
398 let res = self.reply.get(self.index);
399 if res.is_some() {
400 self.index += 1;
401 }
402 res
403 }
404}
405
406impl<'root> Drop for SetCallReply<'root> {
407 fn drop(&mut self) {
408 free_call_reply(self.reply.as_ptr());
409 }
410}
411
412impl<'root> Debug for SetCallReply<'root> {
413 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
414 f.debug_struct("SetCallReply")
415 .field("reply", &self.reply)
416 .field("elements", &self.iter().collect::<Vec<CallResult>>())
417 .finish()
418 }
419}
420
421impl<'root> Display for SetCallReply<'root> {
422 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
423 f.write_str("{")?;
424
425 self.iter()
426 .enumerate()
427 .try_for_each(|(index, v)| -> fmt::Result {
428 if index > 1 {
429 f.write_str(", ")?;
430 }
431 fmt_call_result(v, f)
432 })?;
433
434 f.write_str("}")
435 }
436}
437
/// Call reply of boolean type (built for `ReplyType::Bool`).
///
/// Wraps a non-null pointer to the underlying `RedisModuleCallReply`; the
/// pointer is passed to `free_call_reply` when this value is dropped.
pub struct BoolCallReply<'root> {
    /// Raw pointer to the reply object.
    reply: NonNull<RedisModuleCallReply>,
    /// Zero-sized marker carrying the `'root` lifetime.
    _dummy: PhantomData<&'root ()>,
}
442
443impl<'root> BoolCallReply<'root> {
444 pub fn to_bool(&self) -> bool {
446 call_reply_bool(self.reply.as_ptr())
447 }
448}
449
450impl<'root> Drop for BoolCallReply<'root> {
451 fn drop(&mut self) {
452 free_call_reply(self.reply.as_ptr());
453 }
454}
455
456impl<'root> Debug for BoolCallReply<'root> {
457 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
458 f.debug_struct("BoolCallReply")
459 .field("reply", &self.reply)
460 .field("value", &self.to_bool())
461 .finish()
462 }
463}
464
465impl<'root> Display for BoolCallReply<'root> {
466 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
467 fmt::Display::fmt(&self.to_bool(), f)
468 }
469}
470
/// Call reply of double type (built for `ReplyType::Double`).
///
/// Wraps a non-null pointer to the underlying `RedisModuleCallReply`; the
/// pointer is passed to `free_call_reply` when this value is dropped.
pub struct DoubleCallReply<'root> {
    /// Raw pointer to the reply object.
    reply: NonNull<RedisModuleCallReply>,
    /// Zero-sized marker carrying the `'root` lifetime.
    _dummy: PhantomData<&'root ()>,
}
475
476impl<'root> DoubleCallReply<'root> {
477 pub fn to_double(&self) -> f64 {
479 call_reply_double(self.reply.as_ptr())
480 }
481}
482
483impl<'root> Drop for DoubleCallReply<'root> {
484 fn drop(&mut self) {
485 free_call_reply(self.reply.as_ptr());
486 }
487}
488
489impl<'root> Debug for DoubleCallReply<'root> {
490 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
491 f.debug_struct("DoubleCallReply")
492 .field("reply", &self.reply)
493 .field("value", &self.to_double())
494 .finish()
495 }
496}
497
498impl<'root> Display for DoubleCallReply<'root> {
499 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
500 fmt::Display::fmt(&self.to_double(), f)
501 }
502}
503
/// Call reply of big-number type (built for `ReplyType::BigNumber`).
///
/// Wraps a non-null pointer to the underlying `RedisModuleCallReply`; the
/// pointer is passed to `free_call_reply` when this value is dropped.
pub struct BigNumberCallReply<'root> {
    /// Raw pointer to the reply object.
    reply: NonNull<RedisModuleCallReply>,
    /// Zero-sized marker carrying the `'root` lifetime.
    _dummy: PhantomData<&'root ()>,
}
508
509impl<'root> BigNumberCallReply<'root> {
510 pub fn to_string(&self) -> Option<String> {
513 call_reply_big_number(self.reply.as_ptr())
514 }
515}
516
517impl<'root> Drop for BigNumberCallReply<'root> {
518 fn drop(&mut self) {
519 free_call_reply(self.reply.as_ptr());
520 }
521}
522
523impl<'root> Debug for BigNumberCallReply<'root> {
524 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
525 f.debug_struct("BigNumberCallReply")
526 .field("reply", &self.reply)
527 .field("value", &self.to_string())
528 .finish()
529 }
530}
531
532impl<'root> Display for BigNumberCallReply<'root> {
533 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
534 fmt::Display::fmt(self.to_string().as_deref().unwrap_or("None"), f)
535 }
536}
537
/// Call reply of verbatim-string type (built for `ReplyType::VerbatimString`).
///
/// Wraps a non-null pointer to the underlying `RedisModuleCallReply`; the
/// pointer is passed to `free_call_reply` when this value is dropped.
pub struct VerbatimStringCallReply<'root> {
    /// Raw pointer to the reply object.
    reply: NonNull<RedisModuleCallReply>,
    /// Zero-sized marker carrying the `'root` lifetime.
    _dummy: PhantomData<&'root ()>,
}
542
543const VERBATIM_FORMAT_LENGTH: usize = 3;
545#[repr(transparent)]
547#[derive(Debug, Default, Copy, Clone, PartialEq)]
548pub struct VerbatimStringFormat(pub [c_char; VERBATIM_FORMAT_LENGTH]);
549
550impl TryFrom<&str> for VerbatimStringFormat {
551 type Error = RedisError;
552
553 fn try_from(value: &str) -> Result<Self, Self::Error> {
554 if value.len() != 3 {
555 return Err(RedisError::String(format!(
556 "Verbatim format length must be {VERBATIM_FORMAT_LENGTH}."
557 )));
558 }
559 let mut res = VerbatimStringFormat::default();
560 value.chars().take(3).enumerate().try_for_each(|(i, c)| {
561 if c as u32 >= 127 {
562 return Err(RedisError::String(
563 "Verbatim format must contains only ASCI values.".to_owned(),
564 ));
565 }
566 res.0[i] = c as c_char;
567 Ok(())
568 })?;
569 Ok(res)
570 }
571}
572
573impl<'root> VerbatimStringCallReply<'root> {
574 pub fn to_parts(&self) -> Option<(VerbatimStringFormat, Vec<u8>)> {
578 let (format, data) = self.as_parts()?;
579 Some((format.try_into().ok()?, data.to_vec()))
580 }
581
582 pub fn as_parts(&self) -> Option<(&str, &[u8])> {
586 const FORMAT_LEN: usize = 3;
588 let mut len: usize = 0;
589 let mut format: *const c_char = std::ptr::null();
590 let reply_string: *mut u8 = unsafe {
591 RedisModule_CallReplyVerbatim.unwrap()(self.reply.as_ptr(), &mut len, &mut format)
592 as *mut u8
593 };
594 Some((
595 std::str::from_utf8(unsafe { slice::from_raw_parts(format as *const u8, FORMAT_LEN) })
596 .ok()
597 .unwrap(),
598 unsafe { slice::from_raw_parts(reply_string, len) },
599 ))
600 }
601}
602
603impl<'root> Drop for VerbatimStringCallReply<'root> {
604 fn drop(&mut self) {
605 free_call_reply(self.reply.as_ptr());
606 }
607}
608
609impl<'root> Debug for VerbatimStringCallReply<'root> {
610 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
611 f.debug_struct("VerbatimStringCallReply")
612 .field("reply", &self.reply)
613 .field("value", &self.as_parts())
614 .finish()
615 }
616}
617
618impl<'root> Display for VerbatimStringCallReply<'root> {
619 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
620 match self.as_parts() {
621 Some((format, data)) => write!(f, "({}, {})", format, String::from_utf8_lossy(data)),
622 None => f.write_str("(None)"),
623 }
624 }
625}
626
/// A successful (non-error) call reply, one variant per reply type.
///
/// Error replies are not represented here; they travel in the `Err` side of
/// [`CallResult`].
#[derive(Debug)]
pub enum CallReply<'root> {
    /// Reply of unknown type; also used when no reply pointer is available.
    Unknown,
    /// Integer reply.
    I64(I64CallReply<'root>),
    /// String reply.
    String(StringCallReply<'root>),
    /// Array reply.
    Array(ArrayCallReply<'root>),
    /// Null reply.
    Null(NullCallReply<'root>),
    /// Map reply.
    Map(MapCallReply<'root>),
    /// Set reply.
    Set(SetCallReply<'root>),
    /// Boolean reply.
    Bool(BoolCallReply<'root>),
    /// Double reply.
    Double(DoubleCallReply<'root>),
    /// Big-number reply.
    BigNumber(BigNumberCallReply<'root>),
    /// Verbatim-string reply.
    VerbatimString(VerbatimStringCallReply<'root>),
}
641
// NOTE(review): the variants wrap `NonNull` pointers, which are not `Send` by
// default, so this impl is a manual promise that moving a `CallReply` to
// another thread is sound. The justification is not visible in this file —
// confirm it against the locking rules callers follow.
unsafe impl<'root> Send for CallReply<'root> {}
648
649impl<'root> Display for CallReply<'root> {
650 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
651 match self {
652 CallReply::Unknown => f.write_str("Unknown"),
653 CallReply::I64(inner) => fmt::Display::fmt(&inner, f),
654 CallReply::String(inner) => fmt::Display::fmt(&inner, f),
655 CallReply::Array(inner) => fmt::Display::fmt(&inner, f),
656 CallReply::Null(inner) => fmt::Display::fmt(&inner, f),
657 CallReply::Map(inner) => fmt::Display::fmt(&inner, f),
658 CallReply::Set(inner) => fmt::Display::fmt(&inner, f),
659 CallReply::Bool(inner) => fmt::Display::fmt(&inner, f),
660 CallReply::Double(inner) => fmt::Display::fmt(&inner, f),
661 CallReply::BigNumber(inner) => fmt::Display::fmt(&inner, f),
662 CallReply::VerbatimString(inner) => fmt::Display::fmt(&inner, f),
663 }
664 }
665}
666
667fn create_call_reply<'root>(reply: NonNull<RedisModuleCallReply>) -> CallResult<'root> {
668 let ty = call_reply_type(reply.as_ptr());
669 match ty {
670 ReplyType::Unknown => Ok(CallReply::Unknown), ReplyType::Integer => Ok(CallReply::I64(I64CallReply {
672 reply,
673 _dummy: PhantomData,
674 })),
675 ReplyType::String => Ok(CallReply::String(StringCallReply {
676 reply,
677 _dummy: PhantomData,
678 })),
679 ReplyType::Error => Err(ErrorReply::RedisError(ErrorCallReply {
680 reply,
681 _dummy: PhantomData,
682 })),
683 ReplyType::Array => Ok(CallReply::Array(ArrayCallReply {
684 reply,
685 _dummy: PhantomData,
686 })),
687 ReplyType::Null => Ok(CallReply::Null(NullCallReply {
688 reply,
689 _dummy: PhantomData,
690 })),
691 ReplyType::Map => Ok(CallReply::Map(MapCallReply {
692 reply,
693 _dummy: PhantomData,
694 })),
695 ReplyType::Set => Ok(CallReply::Set(SetCallReply {
696 reply,
697 _dummy: PhantomData,
698 })),
699 ReplyType::Bool => Ok(CallReply::Bool(BoolCallReply {
700 reply,
701 _dummy: PhantomData,
702 })),
703 ReplyType::Double => Ok(CallReply::Double(DoubleCallReply {
704 reply,
705 _dummy: PhantomData,
706 })),
707 ReplyType::BigNumber => Ok(CallReply::BigNumber(BigNumberCallReply {
708 reply,
709 _dummy: PhantomData,
710 })),
711 ReplyType::VerbatimString => Ok(CallReply::VerbatimString(VerbatimStringCallReply {
712 reply,
713 _dummy: PhantomData,
714 })),
715 }
716}
717
718fn fmt_call_result(res: CallResult<'_>, f: &mut Formatter<'_>) -> fmt::Result {
719 match res {
720 Ok(r) => fmt::Display::fmt(&r, f),
721 Err(e) => fmt::Display::fmt(&e, f),
722 }
723}
724
/// Result of a call: `Ok` holds a non-error [`CallReply`], `Err` holds an
/// [`ErrorReply`].
pub type CallResult<'root> = Result<CallReply<'root>, ErrorReply<'root>>;
726
/// Handle returned by `FutureCallReply::set_unblock_handler`.
///
/// It must be consumed with `dispose` or `abort_and_dispose`. Dropping it any
/// other way does not free the reply; the `Drop` impl only logs a warning.
pub struct FutureHandler<C: FnOnce(&Context, CallResult<'static>)> {
    /// Raw pointer to the promise reply.
    reply: NonNull<RedisModuleCallReply>,
    /// Zero-sized marker recording the callback type `C`.
    _dummy: PhantomData<C>,
    /// Set to `true` by `dispose` once the reply has been freed.
    reply_freed: bool,
}
732
733impl<C> FutureHandler<C>
734where
735 C: FnOnce(&Context, CallResult<'static>),
736{
737 pub fn dispose<LockIndicator: RedisLockIndicator>(mut self, _lock_indicator: &LockIndicator) {
743 free_call_reply(self.reply.as_ptr());
744 self.reply_freed = true;
745 }
746
747 pub fn abort_and_dispose<LockIndicator: RedisLockIndicator>(
752 self,
753 lock_indicator: &LockIndicator,
754 ) -> Status {
755 let mut callback: *mut C = std::ptr::null_mut();
756 let res = unsafe {
757 RedisModule_CallReplyPromiseAbort
758 .expect("RedisModule_CallReplyPromiseAbort is expected to be available if we got a promise call reply")
759 (self.reply.as_ptr(), &mut callback as *mut *mut C as *mut *mut c_void)
760 }.into();
761
762 if !callback.is_null() {
763 unsafe { deallocate_pointer(callback) };
764 }
765
766 self.dispose(lock_indicator);
767
768 res
769 }
770}
771
772impl<C: FnOnce(&Context, CallResult<'static>)> Drop for FutureHandler<C> {
773 fn drop(&mut self) {
774 if !self.reply_freed {
775 log::warn!("Memory leak detected!!! FutureHandler was freed without disposed.")
776 }
777 }
778}
779
/// A reply of promise type that has not been resolved yet.
///
/// Created by `create_promise_call_reply` when the reply type equals
/// `REDISMODULE_REPLY_PROMISE`.
pub struct FutureCallReply<'ctx> {
    /// Context reference; unused by the code in this file beyond tying the
    /// value to `'ctx`.
    _ctx: &'ctx Context,
    /// Promise reply pointer; `None` once `set_unblock_handler` has taken it.
    /// If still present on drop it is passed to `free_call_reply`.
    reply: Option<NonNull<RedisModuleCallReply>>,
}
790
791extern "C" fn on_unblock<C: FnOnce(&Context, CallResult<'static>)>(
792 ctx: *mut RedisModuleCtx,
793 reply: *mut RedisModuleCallReply,
794 private_data: *mut ::std::os::raw::c_void,
795) {
796 let on_unblock = unsafe { Box::from_raw(private_data as *mut C) };
797 let ctx = Context::new(ctx);
798 let reply = NonNull::new(reply).map_or(Ok(CallReply::Unknown), create_call_reply);
799 on_unblock(&ctx, reply);
800}
801
802impl<'ctx> FutureCallReply<'ctx> {
803 pub fn set_unblock_handler<C: FnOnce(&Context, CallResult<'static>)>(
806 mut self,
807 unblock_handler: C,
808 ) -> FutureHandler<C> {
809 let reply = self.reply.take().expect("Got a NULL future reply");
810 unsafe {
811 RedisModule_CallReplyPromiseSetUnblockHandler
812 .expect("RedisModule_CallReplyPromiseSetUnblockHandler is expected to be available if we got a promise call reply")
813 (reply.as_ptr(), Some(on_unblock::<C>), Box::into_raw(Box::new(unblock_handler)) as *mut c_void)
814 }
815 FutureHandler {
816 reply,
817 _dummy: PhantomData,
818 reply_freed: false,
819 }
820 }
821}
822
823impl<'ctx> Drop for FutureCallReply<'ctx> {
824 fn drop(&mut self) {
825 if let Some(v) = self.reply {
826 free_call_reply(v.as_ptr());
827 }
828 }
829}
830
/// Outcome of a call that may return a promise.
pub enum PromiseCallReply<'root, 'ctx> {
    /// The call already produced a result.
    Resolved(CallResult<'root>),
    /// The call returned a promise that is still pending.
    Future(FutureCallReply<'ctx>),
}
835
836pub(crate) fn create_promise_call_reply(
837 ctx: &Context,
838 reply: Option<NonNull<RedisModuleCallReply>>,
839) -> PromiseCallReply<'static, '_> {
840 reply.map_or(PromiseCallReply::Resolved(Ok(CallReply::Unknown)), |val| {
841 let ty = unsafe { RedisModule_CallReplyType.unwrap()(val.as_ptr()) };
842 if ty == REDISMODULE_REPLY_PROMISE as i32 {
843 return PromiseCallReply::Future(FutureCallReply {
844 _ctx: ctx,
845 reply: Some(val),
846 });
847 }
848 PromiseCallReply::Resolved(create_call_reply(val))
849 })
850}
851
852impl<'ctx> From<PromiseCallReply<'static, 'ctx>> for CallResult<'static> {
853 fn from(value: PromiseCallReply<'static, 'ctx>) -> Self {
854 match value {
855 PromiseCallReply::Resolved(c) => c,
856 PromiseCallReply::Future(_) => panic!("Got unexpected future call reply"),
857 }
858 }
859}