redis_objects/
lib.rs

//! An object-oriented wrapper around certain redis objects.
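//!
//! A minimal usage sketch (illustrative only): it assumes the crate is named
//! `redis_objects`, matching its directory, and that a redis server is reachable
//! on localhost. The queue name used here is just an example.
//!
//! ```no_run
//! use redis_objects::{ErrorTypes, RedisObjects};
//!
//! # async fn example() -> Result<(), ErrorTypes> {
//! // Open a connection pool against database 0 of a local redis server.
//! let redis = RedisObjects::open_host("localhost", 6379, 0)?;
//!
//! // Open a FIFO queue under the key "example-queue" and move a value through it.
//! let queue = redis.queue::<String>("example-queue".to_owned(), None);
//! queue.push(&"hello".to_owned()).await?;
//! assert_eq!(queue.pop().await?, Some("hello".to_owned()));
//! # Ok(())
//! # }
//! ```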

#![warn(missing_docs, non_ascii_idents, trivial_numeric_casts,
    unused_crate_dependencies, noop_method_call, single_use_lifetimes, trivial_casts,
    unused_lifetimes, nonstandard_style, variant_size_differences)]
#![deny(keyword_idents)]
// #![warn(clippy::missing_docs_in_private_items)]
// #![allow(clippy::needless_return)]
// #![allow(clippy::while_let_on_iterator, clippy::collapsible_else_if)]

use std::sync::Arc;
use std::time::Duration;

use log::debug;
use queue::MultiQueue;
use quota::UserQuotaTracker;
use redis::AsyncCommands;
pub use redis::Msg;
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::sync::mpsc;

pub use self::queue::PriorityQueue;
pub use self::queue::Queue;
// pub use self::quota::QuotaGuard;
pub use self::hashmap::Hashmap;
pub use self::counters::{AutoExportingMetrics, AutoExportingMetricsBuilder, MetricMessage};
pub use self::pubsub::{JsonListenerBuilder, ListenerBuilder, Publisher};
pub use self::set::Set;

pub mod queue;
pub mod quota;
pub mod hashmap;
pub mod counters;
pub mod pubsub;
pub mod set;

/// Handle for a pool of connections to a redis server.
pub struct RedisObjects {
    pool: deadpool_redis::Pool,
    client: redis::Client,
}

impl RedisObjects {
    /// Open a connection pool given only a host, port, and database number
    pub fn open_host(host: &str, port: u16, db: i64) -> Result<Arc<Self>, ErrorTypes> {
        Self::open(redis::ConnectionInfo{
            addr: redis::ConnectionAddr::Tcp(host.to_string(), port),
            redis: redis::RedisConnectionInfo {
                db,
                ..Default::default()
            },
        })
    }

    /// Open a connection pool using the system's native TLS configuration
    pub fn open_host_native_tls(host: &str, port: u16, db: i64) -> Result<Arc<Self>, ErrorTypes> {
        Self::open(redis::ConnectionInfo{
            addr: redis::ConnectionAddr::TcpTls {
                host: host.to_string(),
                port,
                insecure: false,
                tls_params: None,
            },
            redis: redis::RedisConnectionInfo {
                db,
                ..Default::default()
            },
        })
    }

    /// Open a connection pool
    pub fn open(config: redis::ConnectionInfo) -> Result<Arc<Self>, ErrorTypes> {
        debug!("Create redis connection pool.");
        let cfg = deadpool_redis::Config::from_connection_info(config.clone());
        let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;
        let client = redis::Client::open(config)?;
        Ok(Arc::new(Self{
            pool,
            client,
        }))
    }

    /// Open a priority queue under the given key
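    ///
    /// A small sketch of typical use (the queue name is illustrative and the
    /// paths assume the crate is named `redis_objects`); items are pushed with a
    /// score and popped starting from the highest score:
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use redis_objects::{ErrorTypes, RedisObjects};
    /// # async fn example(redis: Arc<RedisObjects>) -> Result<(), ErrorTypes> {
    /// let pq = redis.priority_queue::<String>("example-priority-queue".to_owned());
    /// pq.push(100.0, &"low".to_owned()).await?;
    /// pq.push(200.0, &"high".to_owned()).await?;
    /// assert_eq!(pq.pop(1).await?, vec!["high".to_owned()]);
    /// # Ok(())
    /// # }
    /// ```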
    pub fn priority_queue<T: Serialize + DeserializeOwned>(self: &Arc<Self>, name: String) -> PriorityQueue<T> {
        PriorityQueue::new(name, self.clone())
    }

    /// Open a FIFO queue under the given key
    pub fn queue<T: Serialize + DeserializeOwned>(self: &Arc<Self>, name: String, ttl: Option<Duration>) -> Queue<T> {
        Queue::new(name, self.clone(), ttl)
    }

    /// Open an object that represents a set of queues with a common prefix
    pub fn multiqueue<T: Serialize + DeserializeOwned>(self: &Arc<Self>, prefix: String) -> MultiQueue<T> {
        MultiQueue::new(prefix, self.clone())
    }

    /// Open a hash map under the given key
    pub fn hashmap<T: Serialize + DeserializeOwned>(self: &Arc<Self>, name: String, ttl: Option<Duration>) -> Hashmap<T> {
        Hashmap::new(name, self.clone(), ttl)
    }

    /// Create a sink to publish messages to a named channel
    pub fn publisher(self: &Arc<Self>, channel: String) -> Publisher {
        Publisher::new(self.clone(), channel)
    }

    /// Write a message directly to the channel given
    pub async fn publish(&self, channel: &str, data: &[u8]) -> Result<u32, ErrorTypes> {
        retry_call!(self.pool, publish, channel, data)
    }

    /// Write a json message directly to the channel given
    pub async fn publish_json<T: Serialize>(&self, channel: &str, value: &T) -> Result<u32, ErrorTypes> {
        self.publish(channel, &serde_json::to_vec(value)?).await
    }

    /// Start building a metrics exporter
    pub fn auto_exporting_metrics<T: MetricMessage>(self: &Arc<Self>, name: String, counter_type: String) -> AutoExportingMetricsBuilder<T> {
        AutoExportingMetricsBuilder::new(self.clone(), name, counter_type)
    }

    /// Start building a json listener that produces a stream of events
    pub fn pubsub_json_listener<T: DeserializeOwned + Send + 'static>(self: &Arc<Self>) -> JsonListenerBuilder<T> {
        JsonListenerBuilder::new(self.clone())
    }

    /// Build a json listener that produces a stream of events using the default configuration
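    ///
    /// A minimal sketch (channel name and payload type are illustrative; paths
    /// assume the crate is named `redis_objects`); the subscription is opened
    /// before publishing so the message is not missed:
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use redis_objects::{ErrorTypes, RedisObjects};
    /// # async fn example(redis: Arc<RedisObjects>) -> Result<(), ErrorTypes> {
    /// let mut events = redis.subscribe_json::<String>("example-channel".to_owned()).await;
    /// redis.publish_json("example-channel", &"hello".to_owned()).await?;
    /// if let Some(Some(message)) = events.recv().await {
    ///     println!("received: {message}");
    /// }
    /// # Ok(())
    /// # }
    /// ```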
    pub async fn subscribe_json<T: DeserializeOwned + Send + 'static>(self: &Arc<Self>, channel: String) -> mpsc::Receiver<Option<T>> {
        self.pubsub_json_listener()
            .subscribe(channel)
            .listen().await
    }

    /// Start building a raw data listener that produces a stream of events
    pub fn pubsub_listener(self: &Arc<Self>) -> ListenerBuilder {
        ListenerBuilder::new(self.clone())
    }

    /// Build a raw data listener that produces a stream of events using the default configuration
    pub async fn subscribe(self: &Arc<Self>, channel: String) -> mpsc::Receiver<Option<Msg>> {
        self.pubsub_listener()
            .subscribe(channel)
            .listen().await
    }

    /// Open an interface for tracking user quotas on redis
    pub fn user_quota_tracker(self: &Arc<Self>, prefix: String) -> UserQuotaTracker {
        UserQuotaTracker::new(self.clone(), prefix)
    }

    /// Open a set of values
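    ///
    /// A short sketch (set name and values are illustrative; paths assume the
    /// crate is named `redis_objects`):
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use redis_objects::{ErrorTypes, RedisObjects};
    /// # async fn example(redis: Arc<RedisObjects>) -> Result<(), ErrorTypes> {
    /// let set = redis.set::<String>("example-set".to_owned());
    /// let values = vec!["a".to_owned(), "b".to_owned()];
    /// set.add_batch(&values).await?;
    /// assert!(set.exist(&"a".to_owned()).await?);
    /// assert_eq!(set.length().await?, 2);
    /// # Ok(())
    /// # }
    /// ```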
    pub fn set<T: Serialize + DeserializeOwned>(self: &Arc<Self>, name: String) -> Set<T> {
        Set::new(name, self.clone(), None)
    }

    /// Open a set of values with an expiration policy (defaults to one day when no ttl is given)
    pub fn expiring_set<T: Serialize + DeserializeOwned>(self: &Arc<Self>, name: String, ttl: Option<Duration>) -> Set<T> {
        let ttl = ttl.unwrap_or_else(|| Duration::from_secs(86400));
        Set::new(name, self.clone(), Some(ttl))
    }

    /// Erase all data on the redis server
    pub async fn wipe(&self) -> Result<(), ErrorTypes> {
        let mut con = self.pool.get().await?;
        let _: () = redis::cmd("FLUSHDB").arg("SYNC").query_async(&mut con).await?;
        Ok(())
    }

    /// List all keys on the redis server that satisfy the given pattern
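    ///
    /// For example (the pattern here is illustrative): `let matching = redis.keys("example-*").await?;`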
    pub async fn keys(&self, pattern: &str) -> Result<Vec<String>, ErrorTypes> {
        Ok(retry_call!(self.pool, keys, pattern)?)
    }

}

/// Enumeration over all possible errors
#[derive(Debug)]
pub enum ErrorTypes {
    /// There is something wrong with the redis configuration
    Configuration(Box<deadpool_redis::CreatePoolError>),
    /// Could not get a connection from the redis connection pool
    Pool(Box<deadpool_redis::PoolError>),
    /// Returned by the redis server
    Redis(Box<redis::RedisError>),
    /// Unexpected result from the redis server
    UnknownRedisError,
    /// Could not serialize or deserialize a payload
    Serde(serde_json::Error),
}

impl ErrorTypes {
    /// Test if an error was created while serializing or deserializing data
    pub fn is_serialize_error(&self) -> bool {
        matches!(self, ErrorTypes::Serde(_))
    }
}


impl std::fmt::Display for ErrorTypes {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ErrorTypes::Configuration(err) => write!(f, "Redis configuration: {err}"),
            ErrorTypes::Pool(err) => write!(f, "Redis connection pool: {err}"),
            ErrorTypes::Redis(err) => write!(f, "Redis runtime error: {err}"),
            ErrorTypes::UnknownRedisError => write!(f, "Unexpected response from redis server"),
            ErrorTypes::Serde(err) => write!(f, "Encoding issue with message: {err}"),
        }
    }
}

impl From<deadpool_redis::CreatePoolError> for ErrorTypes {
    fn from(value: deadpool_redis::CreatePoolError) -> Self { Self::Configuration(Box::new(value)) }
}

impl From<deadpool_redis::PoolError> for ErrorTypes {
    fn from(value: deadpool_redis::PoolError) -> Self { Self::Pool(Box::new(value)) }
}

impl From<redis::RedisError> for ErrorTypes {
    fn from(value: redis::RedisError) -> Self { Self::Redis(Box::new(value)) }
}

impl From<serde_json::Error> for ErrorTypes {
    fn from(value: serde_json::Error) -> Self { Self::Serde(value) }
}

impl std::error::Error for ErrorTypes {}

/// A convenience trait that lets you pass an i32 value or None for arguments
pub trait Ii32: Into<Option<i32>> + Copy {}
impl<T: Into<Option<i32>> + Copy> Ii32 for T {}

/// A convenience trait that lets you pass a usize value or None for arguments
pub trait Iusize: Into<Option<usize>> + Copy {}
impl<T: Into<Option<usize>> + Copy> Iusize for T {}


/// A macro for retrying calls to redis when an IO error occurs
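///
/// A typical invocation elsewhere in this crate looks like
/// `retry_call!(self.pool, publish, channel, data)`: it repeatedly takes a
/// connection from the pool and calls the named command method with the given
/// arguments, sleeping with exponential backoff while the failure is IO related.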
macro_rules! retry_call {

    (handle_error, $err:ident, $exponent:ident, $maximum:ident) => {
        {
            // If the error from redis is something not related to IO let the error propagate
            if !$err.is_io_error() {
                break Result::<_, ErrorTypes>::Err($err.into())
            }

            // For IO errors print a warning and sleep
            log::warn!("No connection to Redis, reconnecting... [{}]", $err);
            tokio::time::sleep(tokio::time::Duration::from_secs_f64(2f64.powf($exponent))).await;
            $exponent = ($exponent + 1.0).min($maximum);
        }
    };

    (handle_output, $call:expr, $exponent:ident, $maximum:ident) => {
        {
            match $call {
                Ok(val) => {
                    if $exponent > -7.0 {
                        log::info!("Reconnected to Redis!")
                    }
                    break Ok(val)
                },
                Err(err) => retry_call!(handle_error, err, $exponent, $maximum),
            }
        }
    };

    ($pool:expr, $method:ident, $($args:expr),+) => {
        {
            // track our backoff parameters: the sleep starts at 2^-7 s (~8 ms) and doubles up to 2^3 s (8 s)
            let mut exponent = -7.0;
            let maximum = 3.0;
            loop {
                // get a (fresh if needed) connection from the pool
                let mut con = match $pool.get().await {
                    Ok(connection) => connection,
                    Err(deadpool_redis::PoolError::Backend(err)) => {
                        retry_call!(handle_error, err, exponent, maximum);
                        continue
                    },
                    Err(err) => break Err(err.into())
                };

                // execute the method given with the arguments specified
                retry_call!(handle_output, con.$method($($args),+).await, exponent, maximum)
            }
        }
    };

    (method, $pool:expr, $obj:expr, $method:ident) => {
        {
            // track our backoff parameters: the sleep starts at 2^-7 s (~8 ms) and doubles up to 2^3 s (8 s)
            let mut exponent = -7.0;
            let maximum = 3.0;
            loop {
                // get a (fresh if needed) connection from the pool
                let mut con = match $pool.get().await {
                    Ok(connection) => connection,
                    Err(deadpool_redis::PoolError::Backend(err)) => {
                        retry_call!(handle_error, err, exponent, maximum);
                        continue
                    },
                    Err(err) => break Err(err.into())
                };

                // execute the method given with the arguments specified
                retry_call!(handle_output, $obj.$method(&mut con).await, exponent, maximum)
            }
        }
    };
}

pub(crate) use retry_call;

#[cfg(test)]
pub(crate) mod test {
    use std::sync::Arc;

    use redis::ConnectionInfo;

    use crate::{ErrorTypes, PriorityQueue, Queue, RedisObjects};


    pub(crate) async fn redis_connection() -> Arc<RedisObjects> {
        RedisObjects::open(ConnectionInfo{
            addr: redis::ConnectionAddr::Tcp("localhost".to_string(), 6379),
            redis: Default::default(),
        }).unwrap()
    }

    fn init() {
        let _ = env_logger::builder().is_test(true).try_init();
    }

    // Simple test to help check that reconnecting is working.
    // Enable and run this then turn your redis server on and off.
    // #[tokio::test]
    // async fn reconnect() {
    //     init();
    //     let connection = redis_connection().await;
    //     let mut listener = connection.subscribe("abc123".to_string()).await;
    //     // launch a task that sends messages every second forever
    //     tokio::spawn(async move {
    //         loop {
    //             tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    //             connection.publish("abc123", b"100").await.unwrap();
    //         }
    //     });

    //     while let Some(msg) = listener.recv().await {
    //         println!("{msg:?}");
    //     }
    // }

    #[tokio::test]
    async fn test_sets() {
        init();
        let redis = redis_connection().await;
        let s = redis.set::<String>("test-set".to_owned());

        s.delete().await.unwrap();

        let values = &["a", "b", "1", "2"];
        let owned = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        assert_eq!(s.add_batch(&owned).await.unwrap(), 4);
        assert_eq!(s.length().await.unwrap(), 4);
        let members = s.members().await.unwrap();
        assert_eq!(members.len(), 4);
        for x in members {
            assert!(owned.contains(&x));
        }
        assert!(owned.contains(&s.random().await.unwrap().unwrap()));
        assert!(s.exist(&owned[2]).await.unwrap());
        s.remove(&owned[2]).await.unwrap();
        assert!(!s.exist(&owned[2]).await.unwrap());
        let pop_val = s.pop().await.unwrap().unwrap();
        assert!(owned.contains(&pop_val));
        assert!(!s.exist(&pop_val).await.unwrap());
        assert_eq!(s.length().await.unwrap(), 2);

        assert!(s.limited_add(&"dog".to_owned(), 3).await.unwrap());
        assert!(!s.limited_add(&"cat".to_owned(), 3).await.unwrap());
        assert!(s.exist(&"dog".to_owned()).await.unwrap());
        assert!(!s.exist(&"cat".to_owned()).await.unwrap());
        assert_eq!(s.length().await.unwrap(), 3);

        for pop_val in s.pop_all().await.unwrap() {
            assert!(values.contains(&pop_val.as_str()) || ["cat", "dog"].contains(&pop_val.as_str()));
        }
        assert!(s.pop().await.unwrap().is_none());
        assert_eq!(s.length().await.unwrap(), 0);
    }


    // def test_expiring_sets(redis_connection):
    //     if redis_connection:
    //         from assemblyline.remote.datatypes.set import ExpiringSet
    //         with ExpiringSet('test-expiring-set', ttl=1) as es:
    //             es.delete()

    //             values = ['a', 'b', 1, 2]
    //             assert es.add(*values) == 4
    //             assert es.length() == 4
    //             assert es.exist(values[2])
    //             for x in es.members():
    //                 assert x in values
    //             time.sleep(1.1)
    //             assert es.length() == 0
    //             assert not es.exist(values[2])


    // # noinspection PyShadowingNames
    // def test_lock(redis_connection):
    //     if redis_connection:
    //         from assemblyline.remote.datatypes.lock import Lock

    //         def locked_execution(next_thread=None):
    //             with Lock('test', 10):
    //                 if next_thread:
    //                     next_thread.start()
    //                 time.sleep(2)

    //         t2 = Thread(target=locked_execution)
    //         t1 = Thread(target=locked_execution, args=(t2,))
    //         t1.start()

    //         time.sleep(1)
    //         assert t1.is_alive()
    //         assert t2.is_alive()
    //         time.sleep(2)
    //         assert not t1.is_alive()
    //         assert t2.is_alive()
    //         time.sleep(2)
    //         assert not t1.is_alive()
    //         assert not t2.is_alive()

    #[tokio::test]
    async fn priority_queue() -> Result<(), ErrorTypes> {
        let redis = redis_connection().await;
        let pq = redis.priority_queue("test-priority-queue".to_string());
        pq.delete().await?;

        for x in 0..10 {
            pq.push(100.0, &x.to_string()).await?;
        }

        let a_key = pq.push(101.0, &"a".to_string()).await?;
        let z_key = pq.push(99.0, &"z".to_string()).await?;
        assert_eq!(pq.rank(&a_key).await?.unwrap(), 0);
        assert_eq!(pq.rank(&z_key).await?.unwrap(), pq.length().await? - 1);
        assert!(pq.rank(b"onethuosentuh").await?.is_none());

        assert_eq!(pq.pop(1).await?, ["a"]);
        assert_eq!(pq.unpush(1).await?, ["z"]);
        assert_eq!(pq.count(100.0, 100.0).await?, 10);
        assert_eq!(pq.pop(1).await?, ["0"]);
        assert_eq!(pq.unpush(1).await?, ["9"]);
        assert_eq!(pq.length().await?, 8);
        assert_eq!(pq.pop(4).await?, ["1", "2", "3", "4"]);
        assert_eq!(pq.unpush(3).await?, ["8", "7", "6"]);
        assert_eq!(pq.length().await?, 1);
        // Should be [(100, 5)] at this point

        for x in 0..5 {
            pq.push(100.0 + x as f64, &x.to_string()).await?;
        }

        assert_eq!(pq.length().await?, 6);
        assert!(pq.dequeue_range(Some(106), None, None, None).await?.is_empty());
        assert_eq!(pq.length().await?, 6);
        // 3 and 4 are both options, 4 has higher score
        assert_eq!(pq.dequeue_range(Some(103), None, None, None).await?, vec!["4"]);
        // 2 and 3 are both options, 3 has higher score, skip it
        assert_eq!(pq.dequeue_range(Some(102), None, Some(1), None).await?, vec!["2"]);
        // Take some off the other end
        assert_eq!(pq.dequeue_range(None, Some(100), None, Some(10)).await?, vec!["5", "0"]);
        assert_eq!(pq.length().await?, 2);

        let other = redis.priority_queue("second-priority-queue".to_string());
        other.delete().await?;
        other.push(100.0, &"a".to_string()).await?;
        assert_eq!(PriorityQueue::all_length(&[&other, &pq]).await?, [1, 2]);
        assert!(PriorityQueue::select(&[&other, &pq], None).await?.is_some());
        assert!(PriorityQueue::select(&[&other, &pq], None).await?.is_some());
        assert!(PriorityQueue::select(&[&other, &pq], None).await?.is_some());
        assert_eq!(PriorityQueue::all_length(&[&other, &pq]).await?, [0, 0]);

        pq.push(50.0, &"first".to_string()).await?;
        pq.push(-50.0, &"second".to_string()).await?;

        assert_eq!(pq.dequeue_range(Some(0), Some(100), None, None).await?, ["first"]);
        assert_eq!(pq.dequeue_range(Some(-100), Some(0), None, None).await?, ["second"]);
        Ok(())
    }


    // # noinspection PyShadowingNames,PyUnusedLocal
    // def test_unique_priority_queue(redis_connection):
    //     from assemblyline.remote.datatypes.queues.priority import UniquePriorityQueue
    //     with UniquePriorityQueue('test-priority-queue') as pq:
    //         pq.delete()

    //         for x in range(10):
    //             pq.push(100, x)
    //         assert pq.length() == 10

    //         # Values should be unique, this should have no effect on the length
    //         for x in range(10):
    //             pq.push(100, x)
    //         assert pq.length() == 10

    //         pq.push(101, 'a')
    //         pq.push(99, 'z')

    //         assert pq.pop() == 'a'
    //         assert pq.unpush() == 'z'
    //         assert pq.count(100, 100) == 10
    //         assert pq.pop() == 0
    //         assert pq.unpush() == 9
    //         assert pq.length() == 8
    //         assert pq.pop(4) == [1, 2, 3, 4]
    //         assert pq.unpush(3) == [8, 7, 6]
    //         assert pq.length() == 1  # Should be [<100, 5>] at this point

    //         for x in range(5):
    //             pq.push(100 + x, x)

    //         assert pq.length() == 6
    //         assert pq.dequeue_range(lower_limit=106) == []
    //         assert pq.length() == 6
    //         assert pq.dequeue_range(lower_limit=103) == [4]  # 3 and 4 are both options, 4 has higher score
    //         assert pq.dequeue_range(lower_limit=102, skip=1) == [2]  # 2 and 3 are both options, 3 has higher score, skip it
    //         assert sorted(pq.dequeue_range(upper_limit=100, num=10)) == [0, 5]  # Take some off the other end
    //         assert pq.length() == 2
    //         pq.pop(2)

    //         pq.push(50, 'first')
    //         pq.push(-50, 'second')

    //         assert pq.dequeue_range(0, 100) == ['first']
    //         assert pq.dequeue_range(-100, 0) == ['second']

    #[tokio::test]
    async fn named_queue() {
        let redis = redis_connection().await;

        let nq = redis.queue("test-named-queue".to_owned(), None);
        nq.delete().await.unwrap();

        assert!(nq.pop().await.unwrap().is_none());
        assert!(nq.pop_batch(100).await.unwrap().is_empty());

        for x in 0..5 {
            nq.push(&x).await.unwrap();
        }

        assert_eq!(nq.content().await.unwrap(), [0, 1, 2, 3, 4]);

        assert_eq!(nq.pop_batch(100).await.unwrap(), [0, 1, 2, 3, 4]);

        for x in 0..5 {
            nq.push(&x).await.unwrap();
        }

        assert_eq!(nq.length().await.unwrap(), 5);
        nq.push_batch(&(0..5).collect::<Vec<i32>>()).await.unwrap();
        assert_eq!(nq.length().await.unwrap(), 10);

        assert_eq!(nq.peek_next().await.unwrap(), nq.pop().await.unwrap());
        assert_eq!(nq.peek_next().await.unwrap(), Some(1));
        let v = nq.pop().await.unwrap().unwrap();
        assert_eq!(v, 1);
        assert_eq!(nq.peek_next().await.unwrap().unwrap(), 2);
        nq.unpop(&v).await.unwrap();
        assert_eq!(nq.peek_next().await.unwrap().unwrap(), 1);

        assert_eq!(Queue::select(&[&nq], None).await.unwrap().unwrap(), ("test-named-queue".to_owned(), 1));

        let nq1 = redis.queue("test-named-queue-1".to_owned(), None);
        nq1.delete().await.unwrap();
        let nq2 = redis.queue("test-named-queue-2".to_owned(), None);
        nq2.delete().await.unwrap();

        nq1.push(&1).await.unwrap();
        nq2.push(&2).await.unwrap();

        assert_eq!(Queue::select(&[&nq1, &nq2], None).await.unwrap().unwrap(), ("test-named-queue-1".to_owned(), 1));
        assert_eq!(Queue::select(&[&nq1, &nq2], None).await.unwrap().unwrap(), ("test-named-queue-2".to_owned(), 2));
    }

    // # noinspection PyShadowingNames
    // def test_multi_queue(redis_connection):
    //     if redis_connection:
    //         from assemblyline.remote.datatypes.queues.multi import MultiQueue
    //         mq = MultiQueue()
    //         mq.delete('test-multi-q1')
    //         mq.delete('test-multi-q2')

    //         for x in range(5):
    //             mq.push('test-multi-q1', x+1)
    //             mq.push('test-multi-q2', x+6)

    //         assert mq.length('test-multi-q1') == 5
    //         assert mq.length('test-multi-q2') == 5

    //         assert mq.pop('test-multi-q1') == 1
    //         assert mq.pop('test-multi-q2') == 6

    //         assert mq.length('test-multi-q1') == 4
    //         assert mq.length('test-multi-q2') == 4

    //         mq.delete('test-multi-q1')
    //         mq.delete('test-multi-q2')

    //         assert mq.length('test-multi-q1') == 0
    //         assert mq.length('test-multi-q2') == 0


    // # noinspection PyShadowingNames
    // def test_comms_queue(redis_connection):
    //     if redis_connection:
    //         from assemblyline.remote.datatypes.queues.comms import CommsQueue

    //         def publish_messages(message_list):
    //             time.sleep(0.1)
    //             with CommsQueue('test-comms-queue') as cq_p:
    //                 for message in message_list:
    //                     cq_p.publish(message)

    //         msg_list = ["bob", 1, {"bob": 1}, [1, 2, 3], None, "Nice!", "stop"]
    //         t = Thread(target=publish_messages, args=(msg_list,))
    //         t.start()

    //         with CommsQueue('test-comms-queue') as cq:
    //             x = 0
    //             for msg in cq.listen():
    //                 if msg == "stop":
    //                     break

    //                 assert msg == msg_list[x]

    //                 x += 1

    //         t.join()
    //         assert not t.is_alive()


    // # noinspection PyShadowingNames
    // def test_user_quota_tracker(redis_connection):
    //     if redis_connection:
    //         from assemblyline.remote.datatypes.user_quota_tracker import UserQuotaTracker

    //         max_quota = 3
    //         timeout = 2
    //         name = get_random_id()
    //         uqt = UserQuotaTracker('test-quota', timeout=timeout)

    //         # First 0 to max_quota items should succeed
    //         for _ in range(max_quota):
    //             assert uqt.begin(name, max_quota) is True

    //         # All other items should fail until items timeout
    //         for _ in range(max_quota):
    //             assert uqt.begin(name, max_quota) is False

    //         # if you remove an item only one should be able to go in
    //         uqt.end(name)
    //         assert uqt.begin(name, max_quota) is True
    //         assert uqt.begin(name, max_quota) is False

    //         # if you wait the timeout, all items can go in
    //         time.sleep(timeout+1)
    //         for _ in range(max_quota):
    //             assert uqt.begin(name, max_quota) is True


// def test_exact_event(redis_connection: Redis[Any]):
//     calls: list[dict[str, Any]] = []

//     def _track_call(data: Optional[dict[str, Any]]):
//         if data is not None:
//             calls.append(data)

//     watcher = EventWatcher(redis_connection)
//     try:
//         watcher.register('changes.test', _track_call)
//         watcher.start()
//         sender = EventSender('changes.', redis_connection)
//         start = time.time()

//         while len(calls) < 5:
//             sender.send('test', {'payload': 100})

//             if time.time() - start > 10:
//                 pytest.fail()
//         assert len(calls) >= 5

//         for row in calls:
//             assert row == {'payload': 100}

//     finally:
//         watcher.stop()


// def test_serialized_event(redis_connection: Redis[Any]):
//     import threading
//     started = threading.Event()

//     class Event(enum.IntEnum):
//         ADD = 0
//         REM = 1

//     @dataclass
//     class Message:
//         name: str
//         event: Event

//     def _serialize(message: Message):
//         return json.dumps(asdict(message))

//     def _deserialize(data: str) -> Message:
//         return Message(**json.loads(data))

//     calls: list[Message] = []

//     def _track_call(data: Optional[Message]):
//         if data is not None:
//             calls.append(data)
//         else:
//             started.set()

//     watcher = EventWatcher[Message](redis_connection, deserializer=_deserialize)
//     try:
//         watcher.register('changes.test', _track_call)
//         watcher.skip_first_refresh = False
//         watcher.start()
//         assert started.wait(timeout=5)
//         sender = EventSender[Message]('changes.', redis_connection, serializer=_serialize)
//         start = time.time()

//         while len(calls) < 5:
//             sender.send('test', Message(name='test', event=Event.ADD))

//             if time.time() - start > 10:
//                 pytest.fail()
//         assert len(calls) >= 5

//         expected = Message(name='test', event=Event.ADD)
//         for row in calls:
//             assert row == expected

//     finally:
//         watcher.stop()


// def test_pattern_event(redis_connection: Redis[Any]):
//     calls: list[dict[str, Any]] = []

//     def _track_call(data: Optional[dict[str, Any]]):
//         if data is not None:
//             calls.append(data)

//     watcher = EventWatcher(redis_connection)
//     try:
//         watcher.register('changes.*', _track_call)
//         watcher.start()
//         sender = EventSender('changes.', redis_connection)
//         start = time.time()

//         while len(calls) < 5:
//             sender.send(uuid.uuid4().hex, {'payload': 100})

//             if time.time() - start > 10:
//                 pytest.fail()
//         assert len(calls) >= 5

//         for row in calls:
//             assert row == {'payload': 100}

//     finally:
//         watcher.stop()


}