redis_objects/
lib.rs

1//! An object oriented wrapper around certain redis objects.
2
3#![warn(missing_docs, non_ascii_idents, trivial_numeric_casts,
4    unused_crate_dependencies, noop_method_call, single_use_lifetimes, trivial_casts,
5    unused_lifetimes, nonstandard_style, variant_size_differences)]
6#![deny(keyword_idents)]
7// #![warn(clippy::missing_docs_in_private_items)]
8// #![allow(clippy::needless_return)]
9// #![allow(clippy::while_let_on_iterator, clippy::collapsible_else_if)]
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use log::debug;
15use queue::MultiQueue;
16use quota::UserQuotaTracker;
17use redis::AsyncCommands;
18pub use redis::Msg;
19use serde::Serialize;
20use serde::de::DeserializeOwned;
21use tokio::sync::mpsc;
22use tracing::instrument;
23
24pub use self::queue::PriorityQueue;
25pub use self::queue::Queue;
26// pub use self::quota::QuotaGuard;
27pub use self::hashmap::Hashmap;
28pub use self::counters::{AutoExportingMetrics, AutoExportingMetricsBuilder, MetricMessage};
29pub use self::pubsub::{JsonListenerBuilder, ListenerBuilder, Publisher};
30pub use self::set::Set;
31
32pub mod queue;
33pub mod quota;
34pub mod hashmap;
35pub mod counters;
36pub mod pubsub;
37pub mod set;
38
39/// Handle for a pool of connections to a redis server.
40pub struct RedisObjects {
41    pool: deadpool_redis::Pool,
42    client: redis::Client,
43    hostname: String,
44}
45
46impl std::fmt::Debug for RedisObjects {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("Redis").field("host", &self.hostname).finish()
49    }
50}
51
52impl RedisObjects {
53    /// Open given more limited connection info
54    pub fn open_host(host: &str, port: u16, db: i64) -> Result<Arc<Self>, ErrorTypes> {
55        Self::open(redis::ConnectionInfo{
56            addr: redis::ConnectionAddr::Tcp(host.to_string(), port),
57            redis: redis::RedisConnectionInfo {
58                db,
59                ..Default::default()
60            },
61        })
62    }
63   
64    /// Open a connection using the local tls configuration
65    pub fn open_host_native_tls(host: &str, port: u16, db: i64) -> Result<Arc<Self>, ErrorTypes> {
66        Self::open(redis::ConnectionInfo{
67            addr: redis::ConnectionAddr::TcpTls { 
68                host: host.to_string(), 
69                port, 
70                insecure: false, 
71                tls_params: None,
72            },
73            redis: redis::RedisConnectionInfo {
74                db,
75                ..Default::default()
76            },
77        })
78    }
79
80    /// Open a connection pool
81    pub fn open(config: redis::ConnectionInfo) -> Result<Arc<Self>, ErrorTypes> {
82        debug!("Create redis connection pool.");
83        // configuration for the pool manager itself
84        let hostname = config.addr.to_string();
85        let mut pool_cfg = deadpool_redis::PoolConfig::new(1024);
86        pool_cfg.timeouts.wait = Some(Duration::from_secs(5));
87
88        // load redis configuration and create the pool
89        let mut cfg = deadpool_redis::Config::from_connection_info(config.clone());
90        cfg.pool = Some(pool_cfg);
91        let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;
92        let client = redis::Client::open(config)?;
93        Ok(Arc::new(Self{ 
94            pool,
95            client,
96            hostname
97        }))
98    }
99
100    /// Open a priority queue under the given key
101    pub fn priority_queue<T: Serialize + DeserializeOwned>(self: &Arc<Self>, name: String) -> PriorityQueue<T> {
102        PriorityQueue::new(name, self.clone())
103    }
104
105    /// Open a FIFO queue under the given key
106    pub fn queue<T: Serialize + DeserializeOwned>(self: &Arc<Self>, name: String, ttl: Option<Duration>) -> Queue<T> {
107        Queue::new(name, self.clone(), ttl)
108    }
109
110    /// an object that represents a set of queues with a common prefix
111    pub fn multiqueue<T: Serialize + DeserializeOwned>(self: &Arc<Self>, prefix: String) -> MultiQueue<T> {
112        MultiQueue::new(prefix, self.clone())
113    }
114
115    /// Open a hash map under the given key
116    pub fn hashmap<T: Serialize + DeserializeOwned>(self: &Arc<Self>, name: String, ttl: Option<Duration>) -> Hashmap<T> {
117        Hashmap::new(name, self.clone(), ttl)
118    }
119
120    /// Create a sink to publish messages to a named channel
121    pub fn publisher(self: &Arc<Self>, channel: String) -> Publisher {
122        Publisher::new(self.clone(), channel)
123    }
124
125    /// Write a message directly to the channel given
126    #[instrument]
127    pub async fn publish(&self, channel: &str, data: &[u8]) -> Result<u32, ErrorTypes> {
128        retry_call!(self.pool, publish, channel, data)
129    }
130
131    /// Write a json message directly to the channel given
132    #[instrument(skip(value))]
133    pub async fn publish_json<T: Serialize>(&self, channel: &str, value: &T) -> Result<u32, ErrorTypes> {
134        self.publish(channel, &serde_json::to_vec(value)?).await
135    }
136
137    /// Start building a metrics exporter
138    pub fn auto_exporting_metrics<T: MetricMessage>(self: &Arc<Self>, name: String, counter_type: String) -> AutoExportingMetricsBuilder<T> {
139        AutoExportingMetricsBuilder::new(self.clone(), name, counter_type)
140    }
141
142    /// Start building a json listener that produces a stream of events
143    pub fn pubsub_json_listener<T: DeserializeOwned + Send + 'static>(self: &Arc<Self>) -> JsonListenerBuilder<T> {
144        JsonListenerBuilder::new(self.clone())
145    }
146
147    /// Build a json listener that produces a stream of events using the default configuration
148    #[instrument]
149    pub async fn subscribe_json<T: DeserializeOwned + Send + 'static>(self: &Arc<Self>, channel: String) -> mpsc::Receiver<Option<T>> {
150        self.pubsub_json_listener()
151            .subscribe(channel)
152            .listen().await
153    }
154
155    /// Start building a raw data listener that produces a stream of events
156    pub fn pubsub_listener(self: &Arc<Self>) -> ListenerBuilder {
157        ListenerBuilder::new(self.clone())
158    }
159
160    /// Build a raw data listener that produces a stream of events using the default configuration
161    #[instrument]
162    pub async fn subscribe(self: &Arc<Self>, channel: String) -> mpsc::Receiver<Option<Msg>> {
163        self.pubsub_listener()
164            .subscribe(channel)
165            .listen().await
166    }
167
168    /// Open an interface for tracking user quotas on redis
169    pub fn user_quota_tracker(self: &Arc<Self>, prefix: String) -> UserQuotaTracker {
170        UserQuotaTracker::new(self.clone(), prefix)
171    }
172
173    /// Open a set of values
174    pub fn set<T: Serialize + DeserializeOwned>(self: &Arc<Self>, name: String) -> Set<T> {
175        Set::new(name, self.clone(), None)
176    }
177
178    /// Open a set of values with an expiration policy
179    pub fn expiring_set<T: Serialize + DeserializeOwned>(self: &Arc<Self>, name: String, ttl: Option<Duration>) -> Set<T> {
180        let ttl = ttl.unwrap_or_else(|| Duration::from_secs(86400));
181        Set::new(name, self.clone(), Some(ttl))
182    }
183
184    /// Erase all data on the redis server
185    pub async fn wipe(&self) -> Result<(), ErrorTypes> {
186        let mut con = self.pool.get().await?;
187        let _: () = redis::cmd("FLUSHDB").arg("SYNC").query_async(&mut con).await?;
188        Ok(())
189    }
190
191    /// List all keys on the redis server that satisfies the given pattern
192    #[instrument]
193    pub async fn keys(&self, pattern: &str) -> Result<Vec<String>, ErrorTypes> {
194        Ok(retry_call!(self.pool, keys, pattern)?)
195    }
196
197}
198
199/// Enumeration over all possible errors
200#[derive(Debug)]
201pub enum ErrorTypes {
202    /// There is something wrong with the redis configuration
203    Configuration(Box<deadpool_redis::CreatePoolError>),
204    /// Could not get a connection from the redis connection pool
205    Pool(Box<deadpool_redis::PoolError>),
206    /// Returned by the redis server
207    Redis(Box<redis::RedisError>),
208    /// Unexpected result from the redis server
209    UnknownRedisError,
210    /// Could not serialize or deserialize a payload
211    Serde(serde_json::Error),
212}
213
214impl ErrorTypes {
215    /// Test if an error was created in serializing or deserializing data
216    pub fn is_serialize_error(&self) -> bool {
217        matches!(self, ErrorTypes::Serde(_))
218    }
219}
220
221
222impl std::fmt::Display for ErrorTypes {
223    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224        match self {
225            ErrorTypes::Configuration(err) => write!(f, "Redis configuration: {err}"),
226            ErrorTypes::Pool(err) => write!(f, "Redis connection pool: {err}"),
227            ErrorTypes::Redis(err) => write!(f, "Redis runtime error: {err}"),
228            ErrorTypes::UnknownRedisError => write!(f, "Unexpected response from redis server"),
229            ErrorTypes::Serde(err) => write!(f, "Encoding issue with message: {err}"),
230        }
231    }
232}
233
234impl From<deadpool_redis::CreatePoolError> for ErrorTypes {
235    fn from(value: deadpool_redis::CreatePoolError) -> Self { Self::Configuration(Box::new(value)) }
236}
237
238impl From<deadpool_redis::PoolError> for ErrorTypes {
239    fn from(value: deadpool_redis::PoolError) -> Self { Self::Pool(Box::new(value)) }
240}
241
242impl From<redis::RedisError> for ErrorTypes {
243    fn from(value: redis::RedisError) -> Self { Self::Redis(Box::new(value)) }
244}
245
246impl From<serde_json::Error> for ErrorTypes {
247    fn from(value: serde_json::Error) -> Self { Self::Serde(value) }
248}
249
250impl std::error::Error for ErrorTypes {}
251
252/// A convenience trait that lets you pass an i32 value or None for arguments
253pub trait Ii32: Into<Option<i32>> + Copy {}
254impl<T: Into<Option<i32>> + Copy> Ii32 for T {}
255
256/// A convenience trait that lets you pass an usize value or None for arguments
257pub trait Iusize: Into<Option<usize>> + Copy {}
258impl<T: Into<Option<usize>> + Copy> Iusize for T {}
259
260
261/// A macro for retrying calls to redis when an IO error occurs
262macro_rules! retry_call {
263
264    (handle_error, $err:ident, $exponent:ident, $maximum:ident) => {
265        {
266            // If the error from redis is something not related to IO let the error propagate
267            if !$err.is_io_error() {
268                break Result::<_, ErrorTypes>::Err($err.into())
269            }
270
271            // For IO errors print a warning and sleep
272            log::warn!("No connection to Redis, reconnecting... [{}]", $err);
273            tokio::time::sleep(tokio::time::Duration::from_secs_f64(2f64.powf($exponent))).await;
274            $exponent = ($exponent + 1.0).min($maximum);
275        }
276    };
277
278    (handle_output, $call:expr, $exponent:ident, $maximum:ident) => {
279        {
280            match $call {
281                Ok(val) => {
282                    if $exponent > -7.0 {
283                        log::info!("Reconnected to Redis!")
284                    }
285                    break Ok(val)
286                },
287                Err(err) => retry_call!(handle_error, err, $exponent, $maximum),
288            }
289        }
290    };
291
292    ($pool:expr, $method:ident, $($args:expr),+) => {
293        {
294            // track our backoff parameters
295            let mut exponent = -7.0;
296            let maximum = 3.0;
297            loop {
298                // get a (fresh if needed) connection form the pool
299                let mut con = match $pool.get().await {
300                    Ok(connection) => connection,
301                    Err(deadpool_redis::PoolError::Backend(err)) => {
302                        retry_call!(handle_error, err, exponent, maximum);
303                        continue
304                    },
305                    Err(err) => break Err(err.into())
306                };
307
308                // execute the method given with the argments specified
309                retry_call!(handle_output, con.$method($($args),+).await, exponent, maximum)
310            }
311        }
312    };
313
314    (method, $pool:expr, $obj:expr, $method:ident) => {
315        {
316            // track our backoff parameters
317            let mut exponent = -7.0;
318            let maximum = 3.0;
319            loop {
320                // get a (fresh if needed) connection form the pool
321                let mut con = match $pool.get().await {
322                    Ok(connection) => connection,
323                    Err(deadpool_redis::PoolError::Backend(err)) => {
324                        retry_call!(handle_error, err, exponent, maximum);
325                        continue
326                    },
327                    Err(err) => break Err(err.into())
328                };
329                
330                // execute the method given with the argments specified
331                retry_call!(handle_output, $obj.$method(&mut con).await, exponent, maximum)
332            }
333        }
334    };
335}
336
337pub (crate) use retry_call;
338
339#[cfg(test)]
340pub (crate) mod test {
341    use std::sync::Arc;
342
343    use redis::ConnectionInfo;
344
345    use crate::{ErrorTypes, PriorityQueue, Queue, RedisObjects};
346
347
348    pub (crate) async fn redis_connection() -> Arc<RedisObjects> {
349        RedisObjects::open(ConnectionInfo{
350            addr: redis::ConnectionAddr::Tcp("localhost".to_string(), 6379),
351            redis: Default::default(),
352        }).unwrap()
353    }
354
355    fn init() {
356        let _ = env_logger::builder().is_test(true).try_init();
357    }
358
359    // simple function to help test that reconnect is working. 
360    // Enable and run this then turn your redis server on and off.
361    // #[tokio::test]
362    // async fn reconnect() {
363    //     init();
364    //     let connection = redis_connection().await;
365    //     let mut listener = connection.subscribe("abc123".to_string());
366    //     // launch a thread that sends messages every second forever
367    //     tokio::spawn(async move {
368    //         loop {
369    //             tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
370    //             connection.publish("abc123", b"100").await.unwrap();
371    //         }
372    //     });
373        
374    //     while let Some(msg) = listener.recv().await {
375    //         println!("{msg:?}");
376    //     }
377    // }
378    
379    #[tokio::test]
380    async fn test_sets() {
381        init();
382        let redis = redis_connection().await;
383        let s = redis.set::<String>("test-set".to_owned());
384
385        s.delete().await.unwrap();
386
387        let values = &["a", "b", "1", "2"];
388        let owned = values.iter().map(|v|v.to_string()).collect::<Vec<_>>();
389        assert_eq!(s.add_batch(&owned).await.unwrap(), 4);
390        assert_eq!(s.length().await.unwrap(), 4);
391        let members = s.members().await.unwrap();
392        assert_eq!(members.len(), 4);
393        for x in members {
394            assert!(owned.contains(&x));
395        }
396        assert!(owned.contains(&s.random().await.unwrap().unwrap()));
397        assert!(s.exist(&owned[2]).await.unwrap());
398        s.remove(&owned[2]).await.unwrap();
399        assert!(!s.exist(&owned[2]).await.unwrap());
400        let pop_val = s.pop().await.unwrap().unwrap();
401        assert!(owned.contains(&pop_val));
402        assert!(!s.exist(&pop_val).await.unwrap());
403        assert_eq!(s.length().await.unwrap(), 2);
404
405        assert!(s.limited_add(&"dog".to_owned(), 3).await.unwrap());
406        assert!(!s.limited_add(&"cat".to_owned(), 3).await.unwrap());
407        assert!(s.exist(&"dog".to_owned()).await.unwrap());
408        assert!(!s.exist(&"cat".to_owned()).await.unwrap());
409        assert_eq!(s.length().await.unwrap(), 3);
410
411        for pop_val in s.pop_all().await.unwrap() {
412            assert!(values.contains(&pop_val.as_str()) || ["cat", "dog"].contains(&pop_val.as_str()));
413        }
414        assert!(s.pop().await.unwrap().is_none());
415        assert_eq!(s.length().await.unwrap(), 0);
416    }
417    
418    
419    // def test_expiring_sets(redis_connection):
420    //     if redis_connection:
421    //         from assemblyline.remote.datatypes.set import ExpiringSet
422    //         with ExpiringSet('test-expiring-set', ttl=1) as es:
423    //             es.delete()
424    
425    //             values = ['a', 'b', 1, 2]
426    //             assert es.add(*values) == 4
427    //             assert es.length() == 4
428    //             assert es.exist(values[2])
429    //             for x in es.members():
430    //                 assert x in values
431    //             time.sleep(1.1)
432    //             assert es.length() == 0
433    //             assert not es.exist(values[2])
434    
435    
436    // # noinspection PyShadowingNames
437    // def test_lock(redis_connection):
438    //     if redis_connection:
439    //         from assemblyline.remote.datatypes.lock import Lock
440    
441    //         def locked_execution(next_thread=None):
442    //             with Lock('test', 10):
443    //                 if next_thread:
444    //                     next_thread.start()
445    //                 time.sleep(2)
446    
447    //         t2 = Thread(target=locked_execution)
448    //         t1 = Thread(target=locked_execution, args=(t2,))
449    //         t1.start()
450    
451    //         time.sleep(1)
452    //         assert t1.is_alive()
453    //         assert t2.is_alive()
454    //         time.sleep(2)
455    //         assert not t1.is_alive()
456    //         assert t2.is_alive()
457    //         time.sleep(2)
458    //         assert not t1.is_alive()
459    //         assert not t2.is_alive()
460    
461    #[tokio::test]
462    async fn priority_queue() -> Result<(), ErrorTypes> {
463        let redis = redis_connection().await;
464        let pq = redis.priority_queue("test-priority-queue".to_string());
465        pq.delete().await?;
466
467        for x in 0..10 {
468            pq.push(100.0, &x.to_string()).await?;
469        }
470
471        let a_key = pq.push(101.0, &"a".to_string()).await?;
472        let z_key = pq.push(99.0, &"z".to_string()).await?;
473        assert_eq!(pq.rank(&a_key).await?.unwrap(), 0);
474        assert_eq!(pq.rank(&z_key).await?.unwrap(), pq.length().await? - 1);
475        assert!(pq.rank(b"onethuosentuh").await?.is_none());
476
477        assert_eq!(pq.pop(1).await?, ["a"]);
478        assert_eq!(pq.unpush(1).await?, ["z"]);
479        assert_eq!(pq.count(100.0, 100.0).await?, 10);
480        assert_eq!(pq.pop(1).await?, ["0"]);
481        assert_eq!(pq.unpush(1).await?, ["9"]);
482        assert_eq!(pq.length().await?, 8);
483        assert_eq!(pq.pop(4).await?, ["1", "2", "3", "4"]);
484        assert_eq!(pq.unpush(3).await?, ["8", "7", "6"]);
485        assert_eq!(pq.length().await?, 1);
486        // Should be [(100, 5)] at this point
487
488        // for x in 0..5 {
489        for x in 0..5 {
490            pq.push(100.0 + x as f64, &x.to_string()).await?;
491        }
492
493        assert_eq!(pq.length().await?, 6);
494        assert!(pq.dequeue_range(Some(106), None, None, None).await?.is_empty());
495        assert_eq!(pq.length().await?, 6);
496        // 3 and 4 are both options, 4 has higher score
497        assert_eq!(pq.dequeue_range(Some(103), None, None, None).await?, vec!["4"]);
498        // 2 and 3 are both options, 3 has higher score, skip it
499        assert_eq!(pq.dequeue_range(Some(102), None, Some(1), None).await?, vec!["2"]);
500        // Take some off the other end
501        assert_eq!(pq.dequeue_range(None, Some(100), None, Some(10)).await?, vec!["5", "0"]);
502        assert_eq!(pq.length().await?, 2);
503
504        let other = redis.priority_queue("second-priority-queue".to_string());
505        other.delete().await?;
506        other.push(100.0, &"a".to_string()).await?;
507        assert_eq!(PriorityQueue::all_length(&[&other, &pq]).await?, [1, 2]);
508        assert!(PriorityQueue::select(&[&other, &pq], None).await?.is_some());
509        assert!(PriorityQueue::select(&[&other, &pq], None).await?.is_some());
510        assert!(PriorityQueue::select(&[&other, &pq], None).await?.is_some());
511        assert_eq!(PriorityQueue::all_length(&[&other, &pq]).await?, [0, 0]);
512
513        pq.push(50.0, &"first".to_string()).await?;
514        pq.push(-50.0, &"second".to_string()).await?;
515
516        assert_eq!(pq.dequeue_range(Some(0), Some(100), None, None).await?, ["first"]);
517        assert_eq!(pq.dequeue_range(Some(-100), Some(0), None, None).await?, ["second"]);
518        Ok(())
519    }
520    
521    
522    // # noinspection PyShadowingNames,PyUnusedLocal
523    // def test_unique_priority_queue(redis_connection):
524    //     from assemblyline.remote.datatypes.queues.priority import UniquePriorityQueue
525    //     with UniquePriorityQueue('test-priority-queue') as pq:
526    //         pq.delete()
527    
528    //         for x in range(10):
529    //             pq.push(100, x)
530    //         assert pq.length() == 10
531    
532    //         # Values should be unique, this should have no effect on the length
533    //         for x in range(10):
534    //             pq.push(100, x)
535    //         assert pq.length() == 10
536    
537    //         pq.push(101, 'a')
538    //         pq.push(99, 'z')
539    
540    //         assert pq.pop() == 'a'
541    //         assert pq.unpush() == 'z'
542    //         assert pq.count(100, 100) == 10
543    //         assert pq.pop() == 0
544    //         assert pq.unpush() == 9
545    //         assert pq.length() == 8
546    //         assert pq.pop(4) == [1, 2, 3, 4]
547    //         assert pq.unpush(3) == [8, 7, 6]
548    //         assert pq.length() == 1  # Should be [<100, 5>] at this point
549    
550    //         for x in range(5):
551    //             pq.push(100 + x, x)
552    
553    //         assert pq.length() == 6
554    //         assert pq.dequeue_range(lower_limit=106) == []
555    //         assert pq.length() == 6
556    //         assert pq.dequeue_range(lower_limit=103) == [4]  # 3 and 4 are both options, 4 has higher score
557    //         assert pq.dequeue_range(lower_limit=102, skip=1) == [2]  # 2 and 3 are both options, 3 has higher score, skip it
558    //         assert sorted(pq.dequeue_range(upper_limit=100, num=10)) == [0, 5]  # Take some off the other end
559    //         assert pq.length() == 2
560    //         pq.pop(2)
561    
562    //         pq.push(50, 'first')
563    //         pq.push(-50, 'second')
564    
565    //         assert pq.dequeue_range(0, 100) == ['first']
566    //         assert pq.dequeue_range(-100, 0) == ['second']   
567    
568    #[tokio::test]
569    async fn named_queue() {
570        let redis = redis_connection().await;
571
572        let nq = redis.queue("test-named-queue".to_owned(), None);
573        nq.delete().await.unwrap();
574
575        assert!(nq.pop().await.unwrap().is_none());
576        assert!(nq.pop_batch(100).await.unwrap().is_empty());
577
578        for x in 0..5 {
579            nq.push(&x).await.unwrap();
580        }
581
582        assert_eq!(nq.content().await.unwrap(), [0, 1, 2, 3, 4]);
583
584        assert_eq!(nq.pop_batch(100).await.unwrap(), [0, 1, 2, 3, 4]);
585
586        for x in 0..5 {
587            nq.push(&x).await.unwrap();
588        }
589
590        assert_eq!(nq.length().await.unwrap(), 5);
591        nq.push_batch(&(0..5).collect::<Vec<i32>>()).await.unwrap();
592        assert_eq!(nq.length().await.unwrap(), 10);
593
594        assert_eq!(nq.peek_next().await.unwrap(), nq.pop().await.unwrap());
595        assert_eq!(nq.peek_next().await.unwrap(), Some(1));
596        let v = nq.pop().await.unwrap().unwrap();
597        assert_eq!(v, 1);
598        assert_eq!(nq.peek_next().await.unwrap().unwrap(), 2);
599        nq.unpop(&v).await.unwrap();
600        assert_eq!(nq.peek_next().await.unwrap().unwrap(), 1);
601
602        assert_eq!(Queue::select(&[&nq], None).await.unwrap().unwrap(), ("test-named-queue".to_owned(), 1));
603
604        let nq1 = redis.queue("test-named-queue-1".to_owned(), None);
605        nq1.delete().await.unwrap();
606        let nq2 = redis.queue("test-named-queue-2".to_owned(), None);
607        nq2.delete().await.unwrap();
608
609        nq1.push(&1).await.unwrap();
610        nq2.push(&2).await.unwrap();
611
612        assert_eq!(Queue::select(&[&nq1, &nq2], None).await.unwrap().unwrap(), ("test-named-queue-1".to_owned(), 1));
613        assert_eq!(Queue::select(&[&nq1, &nq2], None).await.unwrap().unwrap(), ("test-named-queue-2".to_owned(), 2));
614    }
615
616    // # noinspection PyShadowingNames
617    // def test_multi_queue(redis_connection):
618    //     if redis_connection:
619    //         from assemblyline.remote.datatypes.queues.multi import MultiQueue
620    //         mq = MultiQueue()
621    //         mq.delete('test-multi-q1')
622    //         mq.delete('test-multi-q2')
623    
624    //         for x in range(5):
625    //             mq.push('test-multi-q1', x+1)
626    //             mq.push('test-multi-q2', x+6)
627    
628    //         assert mq.length('test-multi-q1') == 5
629    //         assert mq.length('test-multi-q2') == 5
630    
631    //         assert mq.pop('test-multi-q1') == 1
632    //         assert mq.pop('test-multi-q2') == 6
633    
634    //         assert mq.length('test-multi-q1') == 4
635    //         assert mq.length('test-multi-q2') == 4
636    
637    //         mq.delete('test-multi-q1')
638    //         mq.delete('test-multi-q2')
639    
640    //         assert mq.length('test-multi-q1') == 0
641    //         assert mq.length('test-multi-q2') == 0
642    
643    
644    // # noinspection PyShadowingNames
645    // def test_comms_queue(redis_connection):
646    //     if redis_connection:
647    //         from assemblyline.remote.datatypes.queues.comms import CommsQueue
648    
649    //         def publish_messages(message_list):
650    //             time.sleep(0.1)
651    //             with CommsQueue('test-comms-queue') as cq_p:
652    //                 for message in message_list:
653    //                     cq_p.publish(message)
654    
655    //         msg_list = ["bob", 1, {"bob": 1}, [1, 2, 3], None, "Nice!", "stop"]
656    //         t = Thread(target=publish_messages, args=(msg_list,))
657    //         t.start()
658    
659    //         with CommsQueue('test-comms-queue') as cq:
660    //             x = 0
661    //             for msg in cq.listen():
662    //                 if msg == "stop":
663    //                     break
664    
665    //                 assert msg == msg_list[x]
666    
667    //                 x += 1
668    
669    //         t.join()
670    //         assert not t.is_alive()
671    
672    
673    // # noinspection PyShadowingNames
674    // def test_user_quota_tracker(redis_connection):
675    //     if redis_connection:
676    //         from assemblyline.remote.datatypes.user_quota_tracker import UserQuotaTracker
677    
678    //         max_quota = 3
679    //         timeout = 2
680    //         name = get_random_id()
681    //         uqt = UserQuotaTracker('test-quota', timeout=timeout)
682    
683    //         # First 0 to max_quota items should succeed
684    //         for _ in range(max_quota):
685    //             assert uqt.begin(name, max_quota) is True
686    
687    //         # All other items should fail until items timeout
688    //         for _ in range(max_quota):
689    //             assert uqt.begin(name, max_quota) is False
690    
691    //         # if you remove and item only one should be able to go in
692    //         uqt.end(name)
693    //         assert uqt.begin(name, max_quota) is True
694    //         assert uqt.begin(name, max_quota) is False
695    
696    //         # if you wait the timeout, all items can go in
697    //         time.sleep(timeout+1)
698    //         for _ in range(max_quota):
699    //             assert uqt.begin(name, max_quota) is True
700    
701
702// def test_exact_event(redis_connection: Redis[Any]):
703//     calls: list[dict[str, Any]] = []
704
705//     def _track_call(data: Optional[dict[str, Any]]):
706//         if data is not None:
707//             calls.append(data)
708
709//     watcher = EventWatcher(redis_connection)
710//     try:
711//         watcher.register('changes.test', _track_call)
712//         watcher.start()
713//         sender = EventSender('changes.', redis_connection)
714//         start = time.time()
715
716//         while len(calls) < 5:
717//             sender.send('test', {'payload': 100})
718
719//             if time.time() - start > 10:
720//                 pytest.fail()
721//         assert len(calls) >= 5
722
723//         for row in calls:
724//             assert row == {'payload': 100}
725
726//     finally:
727//         watcher.stop()
728
729
730// def test_serialized_event(redis_connection: Redis[Any]):
731//     import threading
732//     started = threading.Event()
733
734//     class Event(enum.IntEnum):
735//         ADD = 0
736//         REM = 1
737
738//     @dataclass
739//     class Message:
740//         name: str
741//         event: Event
742
743//     def _serialize(message: Message):
744//         return json.dumps(asdict(message))
745
746//     def _deserialize(data: str) -> Message:
747//         return Message(**json.loads(data))
748
749//     calls: list[Message] = []
750
751//     def _track_call(data: Optional[Message]):
752//         if data is not None:
753//             calls.append(data)
754//         else:
755//             started.set()
756
757//     watcher = EventWatcher[Message](redis_connection, deserializer=_deserialize)
758//     try:
759//         watcher.register('changes.test', _track_call)
760//         watcher.skip_first_refresh = False
761//         watcher.start()
762//         assert started.wait(timeout=5)
763//         sender = EventSender[Message]('changes.', redis_connection, serializer=_serialize)
764//         start = time.time()
765
766//         while len(calls) < 5:
767//             sender.send('test', Message(name='test', event=Event.ADD))
768
769//             if time.time() - start > 10:
770//                 pytest.fail()
771//         assert len(calls) >= 5
772
773//         expected = Message(name='test', event=Event.ADD)
774//         for row in calls:
775//             assert row == expected
776
777//     finally:
778//         watcher.stop()
779
780
781// def test_pattern_event(redis_connection: Redis[Any]):
782//     calls: list[dict[str, Any]] = []
783
784//     def _track_call(data: Optional[dict[str, Any]]):
785//         if data is not None:
786//             calls.append(data)
787
788//     watcher = EventWatcher(redis_connection)
789//     try:
790//         watcher.register('changes.*', _track_call)
791//         watcher.start()
792//         sender = EventSender('changes.', redis_connection)
793//         start = time.time()
794
795//         while len(calls) < 5:
796//             sender.send(uuid.uuid4().hex, {'payload': 100})
797
798//             if time.time() - start > 10:
799//                 pytest.fail()
800//         assert len(calls) >= 5
801
802//         for row in calls:
803//             assert row == {'payload': 100}
804
805//     finally:
806//         watcher.stop()
807
808
809}