1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
use super::Context; use crate::block_client::BlockClient; use crate::raw; use crate::string::RStr; use crate::{FromPtr, GetPtr}; use std::os::raw::c_void; use std::time::Duration; impl Context { /// Get the key that is ready when the reply callback is called in the context /// of a client blocked by `Context::block_client_on_keys`. pub fn get_blocked_client_ready_key(&self) -> Option<RStr> { let p: *mut raw::RedisModuleString = unsafe { raw::RedisModule_GetBlockedClientReadyKey.unwrap()(self.ptr) }; if p.is_null() { None } else { Some(RStr::from_ptr(p)) } } /// This call is similar to `Context::block_client`, however in this case we /// don't just block the client, but also ask Redis to unblock it automatically /// once certain keys become "ready", that is, contain more data. /// /// Basically this is similar to what a typical Redis command usually does, /// like BLPOP or ZPOPMAX: the client blocks if it cannot be served ASAP, /// and later when the key receives new data (a list push for instance), the /// client is unblocked and served. /// /// However in the case of this module API, when the client is unblocked? /// /// 1. If you block ok a key of a type that has blocking operations associated, /// like a list, a sorted set, a stream, and so forth, the client may be /// unblocked once the relevant key is targeted by an operation that normally /// unblocks the native blocking operations for that type. So if we block /// on a list key, an RPUSH command may unblock our client and so forth. /// 2. If you are implementing your native data type, or if you want to add new /// unblocking conditions in addition to "1", you can call the modules API /// `Context::signal_key_as_ready`. /// /// Anyway we can't be sure if the client should be unblocked just because the /// key is signaled as ready: for instance a successive operation may change the /// key, or a client in queue before this one can be served, modifying the key /// as well and making it empty again. So when a client is blocked with /// `Context::block_client_on_keys` the reply callback is not called after /// `BlockClient::unlock` is called, but every time a key is signaled as ready: /// if the reply callback can serve the client, it returns OK and the client /// is unblocked, otherwise it will return ERR and we'll try again later. /// /// The reply callback can access the key that was signaled as ready by /// calling the API `Context::get_blocked_client_ready_key`, that returns /// just the string name of the key as a `string::RStr` object. /// /// Thanks to this system we can setup complex blocking scenarios, like /// unblocking a client only if a list contains at least 5 items or other /// more fancy logics. /// /// Note that another difference with `Context::block_client` is that here /// we pass the private data directly when blocking the client: it will /// be accessible later in the reply callback. Normally when blocking with /// `Context::block_client` the private data to reply to the client is /// passed when calling `BlockClient::unblock` but here the unblocking /// is performed by Redis itself, so we need to have some private data before /// hand. The private data is used to store any information about the specific /// unblocking operation that you are implementing. Such information will be /// freed using the free_privdata callback provided by the user. /// /// However the reply callback will be able to access the argument vector of /// the command, so the private data is often not needed. /// /// Note: Under normal circumstances `BlockClient::unblock` should not be /// called for clients that are blocked on keys (Either the key will /// become ready or a timeout will occur). If for some reason you do want /// to call `BlockClient::unblock` it is possible: Client will be /// handled as if it were timed-out (You must implement the timeout /// callback in that case). /// pub fn block_client_on_keys<T>( &self, reply_callbck: raw::RedisModuleCmdFunc, timeout_callback: raw::RedisModuleCmdFunc, free_privdata: raw::FreePrivateDataFunc, timeout_ms: Duration, keys: &[&RStr], privdata: T, ) -> Option<BlockClient> { let mut keys: Vec<*mut raw::RedisModuleString> = keys.iter().map(|s| s.get_ptr()).collect(); let data = Box::into_raw(Box::from(privdata)); let bc: *mut raw::RedisModuleBlockedClient = unsafe { raw::RedisModule_BlockClientOnKeys.unwrap()( self.ptr, reply_callbck, timeout_callback, free_privdata, timeout_ms.as_millis() as i64, keys.as_mut_ptr(), keys.len() as i32, data as *mut c_void, ) }; if bc.is_null() { None } else { Some(BlockClient::from_ptr(bc)) } } /// Block a client in the context of a blocking command, returning an handle /// which will be used, later, in order to unblock the client with a call to /// `BlockClient::unblock`. The arguments specify callback functions /// and a timeout after which the client is unblocked. /// /// The callbacks are called in the following contexts: /// reply_callback: called after a successful `BlockClient.unblock` /// call in order to reply to the client and unblock it. /// reply_timeout: called when the timeout is reached in order to send an /// error to the client. /// free_privdata: called in order to free the private data that is passed /// by `BlockClient.unblock` call. /// Note: `BlockClient.unblock` should be called for every blocked client, /// even if client was killed, timed-out or disconnected. Failing to do so /// will result in memory leaks. pub fn block_client( &self, reply_callbck: raw::RedisModuleCmdFunc, timeout_callback: raw::RedisModuleCmdFunc, free_privdata: raw::FreePrivateDataFunc, timeout_ms: Duration, ) -> Option<BlockClient> { let bc: *mut raw::RedisModuleBlockedClient = unsafe { raw::RedisModule_BlockClient.unwrap()( self.ptr, reply_callbck, timeout_callback, free_privdata, timeout_ms.as_millis() as i64, ) }; if bc.is_null() { None } else { Some(BlockClient::from_ptr(bc)) } } /// Return false if a module command was called in order to fill the /// reply for a blocked client. pub fn is_blocked_reply_request(&self) -> bool { let ret = unsafe { raw::RedisModule_IsBlockedReplyRequest.unwrap()(self.ptr) }; ret != 0 } /// Return false if a module command was called in order to fill the /// reply for a blocked client that timed out. pub fn is_blocked_timeout_request(&self) -> bool { let ret = unsafe { raw::RedisModule_IsBlockedTimeoutRequest.unwrap()(self.ptr) }; ret != 0 } /// Get the private data set by `BlockClient.unlock` pub fn get_block_client_private_data<T>(&self) -> &mut T { let data: *mut c_void = unsafe { raw::RedisModule_GetBlockedClientPrivateData.unwrap()(self.ptr) }; unsafe { &mut *(data as *mut T) } } /// Get the blocked client associated with a given context. /// This is useful in the reply and timeout callbacks of blocked clients, /// before sometimes the module has the blocked client handle references /// around, and wants to cleanup it pub fn get_block_client_handle(&self) -> Option<BlockClient> { let bc: *mut raw::RedisModuleBlockedClient = unsafe { raw::RedisModule_GetBlockedClientHandle.unwrap()(self.ptr) }; if bc.is_null() { None } else { Some(BlockClient::from_ptr(bc)) } } /// Return true if when the free callback of a blocked client is called, /// the reason for the client to be unblocked is that it disconnected /// while it was blocked. pub fn blocked_client_disconnected(&self) -> bool { let ret = unsafe { raw::RedisModule_BlockedClientDisconnected.unwrap()(self.ptr) }; ret != 0 } }