// redis_objects/lib.rs
1//! An object oriented wrapper around certain redis objects.
2
3#![warn(missing_docs, non_ascii_idents, trivial_numeric_casts,
4    unused_crate_dependencies, noop_method_call, single_use_lifetimes, trivial_casts,
5    unused_lifetimes, nonstandard_style, variant_size_differences)]
6#![deny(keyword_idents)]
7// #![warn(clippy::missing_docs_in_private_items)]
8// #![allow(clippy::needless_return)]
9// #![allow(clippy::while_let_on_iterator, clippy::collapsible_else_if)]
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use log::debug;
15use queue::MultiQueue;
16use quota::UserQuotaTracker;
17use redis::AsyncCommands;
18pub use redis::Msg;
19use serde::Serialize;
20use serde::de::DeserializeOwned;
21use tokio::sync::mpsc;
22use tracing::instrument;
23
24pub use self::queue::PriorityQueue;
25pub use self::queue::Queue;
26// pub use self::quota::QuotaGuard;
27pub use self::hashmap::Hashmap;
28pub use self::counters::{AutoExportingMetrics, AutoExportingMetricsBuilder, MetricMessage};
29pub use self::pubsub::{JsonListenerBuilder, ListenerBuilder, Publisher};
30pub use self::set::Set;
31
32pub mod queue;
33pub mod quota;
34pub mod hashmap;
35pub mod counters;
36pub mod pubsub;
37pub mod set;
38
39/// Handle for a pool of connections to a redis server.
40pub struct RedisObjects {
41    pool: deadpool_redis::Pool,
42    client: redis::Client,
43    hostname: String,
44    pubsub_prefix: String,
45}
46
47impl std::fmt::Debug for RedisObjects {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        f.debug_struct("Redis").field("host", &self.hostname).finish()
50    }
51}
52
53impl RedisObjects {
54    /// Open given more limited connection info
55    pub fn open_host(host: &str, port: u16, db: i64) -> Result<Arc<Self>, ErrorTypes> {
56        Self::open(redis::ConnectionInfo{
57            addr: redis::ConnectionAddr::Tcp(host.to_string(), port),
58            redis: redis::RedisConnectionInfo {
59                db,
60                ..Default::default()
61            },
62        })
63    }
64
65    /// Open a connection using the local tls configuration
66    pub fn open_host_native_tls(host: &str, port: u16, db: i64) -> Result<Arc<Self>, ErrorTypes> {
67        Self::open(redis::ConnectionInfo{
68            addr: redis::ConnectionAddr::TcpTls {
69                host: host.to_string(),
70                port,
71                insecure: false,
72                tls_params: None,
73            },
74            redis: redis::RedisConnectionInfo {
75                db,
76                ..Default::default()
77            },
78        })
79    }
80
81    /// Construct a builder to construct a redis connection
82    pub fn builder() -> Builder {
83        Builder::default()
84    }
85
86    /// Open a connection pool
87    pub fn open(config: redis::ConnectionInfo) -> Result<Arc<Self>, ErrorTypes> {
88        Self::_open(config, Default::default())
89    }
90
91    /// Open a connection pool
92    fn _open(config: redis::ConnectionInfo, pubsub_prefix: String) -> Result<Arc<Self>, ErrorTypes> {
93        debug!("Create redis connection pool.");
94        // configuration for the pool manager itself
95        let hostname = config.addr.to_string();
96        let mut pool_cfg = deadpool_redis::PoolConfig::new(1024);
97        pool_cfg.timeouts.wait = Some(Duration::from_secs(5));
98
99        // load redis configuration and create the pool
100        let mut cfg = deadpool_redis::Config::from_connection_info(config.clone());
101        cfg.pool = Some(pool_cfg);
102        let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;
103        let client = redis::Client::open(config)?;
104        Ok(Arc::new(Self{
105            pool,
106            client,
107            hostname,
108            pubsub_prefix,
109        }))
110    }
111
112    /// Open a priority queue under the given key
113    pub fn priority_queue<T: Serialize + DeserializeOwned>(self: &Arc<Self>, name: String) -> PriorityQueue<T> {
114        PriorityQueue::new(name, self.clone())
115    }
116
117    /// Open a FIFO queue under the given key
118    pub fn queue<T: Serialize + DeserializeOwned>(self: &Arc<Self>, name: String, ttl: Option<Duration>) -> Queue<T> {
119        Queue::new(name, self.clone(), ttl)
120    }
121
122    /// an object that represents a set of queues with a common prefix
123    pub fn multiqueue<T: Serialize + DeserializeOwned>(self: &Arc<Self>, prefix: String) -> MultiQueue<T> {
124        MultiQueue::new(prefix, self.clone())
125    }
126
127    /// Open a hash map under the given key
128    pub fn hashmap<T: Serialize + DeserializeOwned>(self: &Arc<Self>, name: String, ttl: Option<Duration>) -> Hashmap<T> {
129        Hashmap::new(name, self.clone(), ttl)
130    }
131
132    /// Create a sink to publish messages to a named channel
133    pub fn publisher(self: &Arc<Self>, channel: String) -> Publisher {
134        Publisher::new(self.clone(), channel)
135    }
136
137    /// Write a message directly to the channel given
138    #[instrument]
139    pub async fn publish(&self, channel: &str, data: &[u8]) -> Result<u32, ErrorTypes> {
140        retry_call!(self.pool, publish, self.pubsub_prefix.clone() + channel, data)
141    }
142
143    /// Write a json message directly to the channel given
144    #[instrument(skip(value))]
145    pub async fn publish_json<T: Serialize>(&self, channel: &str, value: &T) -> Result<u32, ErrorTypes> {
146        self.publish(channel, &serde_json::to_vec(value)?).await
147    }
148
149    /// Start building a metrics exporter
150    pub fn auto_exporting_metrics<T: MetricMessage>(self: &Arc<Self>, name: String, counter_type: String) -> AutoExportingMetricsBuilder<T> {
151        AutoExportingMetricsBuilder::new(self.clone(), name, counter_type)
152    }
153
154    /// Start building a json listener that produces a stream of events
155    pub fn pubsub_json_listener<T: DeserializeOwned + Send + 'static>(self: &Arc<Self>) -> JsonListenerBuilder<T> {
156        JsonListenerBuilder::new(self.clone())
157    }
158
159    /// Build a json listener that produces a stream of events using the default configuration
160    #[instrument]
161    pub async fn subscribe_json<T: DeserializeOwned + Send + 'static>(self: &Arc<Self>, channel: String) -> mpsc::Receiver<Option<T>> {
162        self.pubsub_json_listener()
163            .subscribe(channel)
164            .listen().await
165    }
166
167    /// Start building a raw data listener that produces a stream of events
168    pub fn pubsub_listener(self: &Arc<Self>) -> ListenerBuilder {
169        ListenerBuilder::new(self.clone())
170    }
171
172    /// Build a raw data listener that produces a stream of events using the default configuration
173    #[instrument]
174    pub async fn subscribe(self: &Arc<Self>, channel: String) -> mpsc::Receiver<Option<Msg>> {
175        self.pubsub_listener()
176            .subscribe(channel)
177            .listen().await
178    }
179
180    /// Open an interface for tracking user quotas on redis
181    pub fn user_quota_tracker(self: &Arc<Self>, prefix: String) -> UserQuotaTracker {
182        UserQuotaTracker::new(self.clone(), prefix)
183    }
184
185    /// Open a set of values
186    pub fn set<T: Serialize + DeserializeOwned>(self: &Arc<Self>, name: String) -> Set<T> {
187        Set::new(name, self.clone(), None)
188    }
189
190    /// Open a set of values with an expiration policy
191    pub fn expiring_set<T: Serialize + DeserializeOwned>(self: &Arc<Self>, name: String, ttl: Option<Duration>) -> Set<T> {
192        let ttl = ttl.unwrap_or_else(|| Duration::from_secs(86400));
193        Set::new(name, self.clone(), Some(ttl))
194    }
195
196    /// Erase all data on the redis server
197    pub async fn wipe(&self) -> Result<(), ErrorTypes> {
198        let mut con = self.pool.get().await?;
199        let _: () = redis::cmd("FLUSHDB").arg("SYNC").query_async(&mut con).await?;
200        Ok(())
201    }
202
203    /// List all keys on the redis server that satisfies the given pattern
204    #[instrument]
205    pub async fn keys(&self, pattern: &str) -> Result<Vec<String>, ErrorTypes> {
206        Ok(retry_call!(self.pool, keys, pattern)?)
207    }
208
209}
210
211/// Utility class to simplify setting up a redis object
/// Utility class to simplify setting up a redis object
pub struct Builder {
    /// Server hostname or address (defaults to "localhost")
    host: String,
    /// Server port (defaults to 6379)
    port: u16,
    /// Database index to select (defaults to 0)
    db: i64,
    /// Whether to connect using native TLS (defaults to true)
    native_tls: bool,
    /// Prefix applied to pubsub channel names (defaults to empty)
    pubsub_prefix: String
}
219
220impl Default for Builder {
221    fn default() -> Self {
222        Self {
223            host: "localhost".to_string(),
224            port: 6379,
225            db: 0,
226            native_tls: true,
227            pubsub_prefix: Default::default()
228        }
229    }
230}
231
232impl Builder {
233    /// Set the host for the connection being built
234    pub fn host(mut self, host: String) -> Self {
235        self.host = host; self
236    }
237
238    /// Set the port for the connection being built
239    pub fn port(mut self, port: u16) -> Self {
240        self.port = port; self
241    }
242
243    /// Set the database index for the connection being built
244    pub fn db(mut self, db: i64) -> Self {
245        self.db = db; self
246    }
247
248    /// Set whether the connection being built should be configured
249    pub fn native_tls(mut self, native_tls: bool) -> Self {
250        self.native_tls = native_tls; self
251    }
252
253    /// Set a prefix that should be applied to all pubsub listening and publishing.
254    /// This is intended for namespacing pubsub operations for testing.
255    pub fn pubsub_prefix(mut self, pubsub_prefix: String) -> Self {
256        self.pubsub_prefix = pubsub_prefix; self
257    }
258
259    /// finalize the building process and create the redis object
260    pub fn build(self) -> Result<Arc<RedisObjects>, ErrorTypes> {
261        let config = if self.native_tls {
262            redis::ConnectionInfo{
263                addr: redis::ConnectionAddr::TcpTls {
264                    host: self.host,
265                    port: self.port,
266                    insecure: false,
267                    tls_params: None,
268                },
269                redis: redis::RedisConnectionInfo {
270                    db: self.db,
271                    ..Default::default()
272                },
273            }
274        } else {
275            redis::ConnectionInfo{
276                addr: redis::ConnectionAddr::Tcp(self.host, self.port),
277                redis: redis::RedisConnectionInfo {
278                    db: self.db,
279                    ..Default::default()
280                },
281            }
282        };
283
284        RedisObjects::_open(config, self.pubsub_prefix)
285    }
286}
287
288
/// Enumeration over all possible errors
///
/// NOTE(review): most variants box their payloads, presumably to keep
/// `Result<_, ErrorTypes>` small on the happy path; `Serde` is the exception.
#[derive(Debug)]
pub enum ErrorTypes {
    /// There is something wrong with the redis configuration
    Configuration(Box<deadpool_redis::CreatePoolError>),
    /// Could not get a connection from the redis connection pool
    Pool(Box<deadpool_redis::PoolError>),
    /// Returned by the redis server
    Redis(Box<redis::RedisError>),
    /// Unexpected result from the redis server
    UnknownRedisError,
    /// Could not serialize or deserialize a payload
    Serde(serde_json::Error),
}
303
304impl ErrorTypes {
305    /// Test if an error was created in serializing or deserializing data
306    pub fn is_serialize_error(&self) -> bool {
307        matches!(self, ErrorTypes::Serde(_))
308    }
309}
310
311
312impl std::fmt::Display for ErrorTypes {
313    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314        match self {
315            ErrorTypes::Configuration(err) => write!(f, "Redis configuration: {err}"),
316            ErrorTypes::Pool(err) => write!(f, "Redis connection pool: {err}"),
317            ErrorTypes::Redis(err) => write!(f, "Redis runtime error: {err}"),
318            ErrorTypes::UnknownRedisError => write!(f, "Unexpected response from redis server"),
319            ErrorTypes::Serde(err) => write!(f, "Encoding issue with message: {err}"),
320        }
321    }
322}
323
324impl From<deadpool_redis::CreatePoolError> for ErrorTypes {
325    fn from(value: deadpool_redis::CreatePoolError) -> Self { Self::Configuration(Box::new(value)) }
326}
327
328impl From<deadpool_redis::PoolError> for ErrorTypes {
329    fn from(value: deadpool_redis::PoolError) -> Self { Self::Pool(Box::new(value)) }
330}
331
332impl From<redis::RedisError> for ErrorTypes {
333    fn from(value: redis::RedisError) -> Self { Self::Redis(Box::new(value)) }
334}
335
336impl From<serde_json::Error> for ErrorTypes {
337    fn from(value: serde_json::Error) -> Self { Self::Serde(value) }
338}
339
340impl std::error::Error for ErrorTypes {}
341
/// A convenience trait that lets you pass an i32 value or None for arguments
///
/// Blanket-implemented for every `Copy` type convertible into `Option<i32>`,
/// so callers may write `f(5)` or `f(None)` interchangeably.
pub trait Ii32: Into<Option<i32>> + Copy {}
impl<X: Into<Option<i32>> + Copy> Ii32 for X {}
345
/// A convenience trait that lets you pass an usize value or None for arguments
///
/// Blanket-implemented for every `Copy` type convertible into `Option<usize>`,
/// so callers may write `f(5)` or `f(None)` interchangeably.
pub trait Iusize: Into<Option<usize>> + Copy {}
impl<X: Into<Option<usize>> + Copy> Iusize for X {}
349
350
/// A macro for retrying calls to redis when an IO error occurs
///
/// Backoff: the sleep is `2^exponent` seconds, with `exponent` starting at -7
/// (about 8 ms) and growing by 1 per failure up to `maximum` (3 → 8 s cap).
macro_rules! retry_call {

    // Internal arm: classify one error. Non-IO errors break out of the retry
    // loop immediately; IO errors log, back off, and let the loop retry.
    (handle_error, $err:ident, $exponent:ident, $maximum:ident) => {
        {
            // If the error from redis is something not related to IO let the error propagate
            if !$err.is_io_error() {
                break Result::<_, ErrorTypes>::Err($err.into())
            }

            // For IO errors print a warning and sleep
            log::warn!("No connection to Redis, reconnecting... [{}]", $err);
            tokio::time::sleep(tokio::time::Duration::from_secs_f64(2f64.powf($exponent))).await;
            $exponent = ($exponent + 1.0).min($maximum);
        }
    };

    // Internal arm: handle the result of one attempt. Success breaks the loop
    // with the value; an exponent above its initial -7 means at least one retry
    // happened, so log that the connection recovered.
    (handle_output, $call:expr, $exponent:ident, $maximum:ident) => {
        {
            match $call {
                Ok(val) => {
                    if $exponent > -7.0 {
                        log::info!("Reconnected to Redis!")
                    }
                    break Ok(val)
                },
                Err(err) => retry_call!(handle_error, err, $exponent, $maximum),
            }
        }
    };

    // Entry arm: call `con.$method($args...)` on a pooled connection, retrying
    // on IO errors with exponential backoff. Evaluates to Result<_, ErrorTypes>.
    ($pool:expr, $method:ident, $($args:expr),+) => {
        {
            // track our backoff parameters
            let mut exponent = -7.0;
            let maximum = 3.0;
            loop {
                // get a (fresh if needed) connection from the pool; only backend
                // (redis) errors are retried, other pool errors abort immediately
                let mut con = match $pool.get().await {
                    Ok(connection) => connection,
                    Err(deadpool_redis::PoolError::Backend(err)) => {
                        retry_call!(handle_error, err, exponent, maximum);
                        continue
                    },
                    Err(err) => break Err(err.into())
                };

                // execute the method given with the arguments specified
                retry_call!(handle_output, con.$method($($args),+).await, exponent, maximum)
            }
        }
    };

    // Entry arm: like the one above, but invokes `$obj.$method(&mut con)` —
    // used when the object itself drives the command against the connection.
    (method, $pool:expr, $obj:expr, $method:ident) => {
        {
            // track our backoff parameters
            let mut exponent = -7.0;
            let maximum = 3.0;
            loop {
                // get a (fresh if needed) connection from the pool
                let mut con = match $pool.get().await {
                    Ok(connection) => connection,
                    Err(deadpool_redis::PoolError::Backend(err)) => {
                        retry_call!(handle_error, err, exponent, maximum);
                        continue
                    },
                    Err(err) => break Err(err.into())
                };

                // execute the method given with the arguments specified
                retry_call!(handle_output, $obj.$method(&mut con).await, exponent, maximum)
            }
        }
    };
}
426
427pub (crate) use retry_call;
428
429#[cfg(test)]
430pub (crate) mod test {
431    use std::sync::Arc;
432
433    use redis::ConnectionInfo;
434
435    use crate::{ErrorTypes, PriorityQueue, Queue, RedisObjects};
436
437
438    pub (crate) async fn redis_connection() -> Arc<RedisObjects> {
439        RedisObjects::open(ConnectionInfo{
440            addr: redis::ConnectionAddr::Tcp("localhost".to_string(), 6379),
441            redis: Default::default(),
442        }).unwrap()
443    }
444
    // Install the test logger; the error is ignored because a previous
    // test in the same process may already have initialized it.
    fn init() {
        let _ = env_logger::builder().is_test(true).try_init();
    }
448
449    // simple function to help test that reconnect is working.
450    // Enable and run this then turn your redis server on and off.
451    // #[tokio::test]
452    // async fn reconnect() {
453    //     init();
454    //     let connection = redis_connection().await;
455    //     let mut listener = connection.subscribe("abc123".to_string());
456    //     // launch a thread that sends messages every second forever
457    //     tokio::spawn(async move {
458    //         loop {
459    //             tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
460    //             connection.publish("abc123", b"100").await.unwrap();
461    //         }
462    //     });
463
464    //     while let Some(msg) = listener.recv().await {
465    //         println!("{msg:?}");
466    //     }
467    // }
468
    // Exercises the Set wrapper end to end against a live local redis.
    #[tokio::test]
    async fn test_sets() {
        init();
        let redis = redis_connection().await;
        let s = redis.set::<String>("test-set".to_owned());

        // start from a clean slate in case a previous run left data behind
        s.delete().await.unwrap();

        let values = &["a", "b", "1", "2"];
        let owned = values.iter().map(|v|v.to_string()).collect::<Vec<_>>();
        assert_eq!(s.add_batch(&owned).await.unwrap(), 4);
        assert_eq!(s.length().await.unwrap(), 4);
        let members = s.members().await.unwrap();
        assert_eq!(members.len(), 4);
        for x in members {
            assert!(owned.contains(&x));
        }
        // random/exist/remove on individual members
        assert!(owned.contains(&s.random().await.unwrap().unwrap()));
        assert!(s.exist(&owned[2]).await.unwrap());
        s.remove(&owned[2]).await.unwrap();
        assert!(!s.exist(&owned[2]).await.unwrap());
        // pop removes and returns an arbitrary member
        let pop_val = s.pop().await.unwrap().unwrap();
        assert!(owned.contains(&pop_val));
        assert!(!s.exist(&pop_val).await.unwrap());
        assert_eq!(s.length().await.unwrap(), 2);

        // limited_add only inserts while the set is below the given size cap:
        // "dog" goes in at length 2, "cat" is rejected once length hits 3
        assert!(s.limited_add(&"dog".to_owned(), 3).await.unwrap());
        assert!(!s.limited_add(&"cat".to_owned(), 3).await.unwrap());
        assert!(s.exist(&"dog".to_owned()).await.unwrap());
        assert!(!s.exist(&"cat".to_owned()).await.unwrap());
        assert_eq!(s.length().await.unwrap(), 3);

        // pop_all drains the set completely
        for pop_val in s.pop_all().await.unwrap() {
            assert!(values.contains(&pop_val.as_str()) || ["cat", "dog"].contains(&pop_val.as_str()));
        }
        assert!(s.pop().await.unwrap().is_none());
        assert_eq!(s.length().await.unwrap(), 0);
    }
507
508
509    // def test_expiring_sets(redis_connection):
510    //     if redis_connection:
511    //         from assemblyline.remote.datatypes.set import ExpiringSet
512    //         with ExpiringSet('test-expiring-set', ttl=1) as es:
513    //             es.delete()
514
515    //             values = ['a', 'b', 1, 2]
516    //             assert es.add(*values) == 4
517    //             assert es.length() == 4
518    //             assert es.exist(values[2])
519    //             for x in es.members():
520    //                 assert x in values
521    //             time.sleep(1.1)
522    //             assert es.length() == 0
523    //             assert not es.exist(values[2])
524
525
526    // # noinspection PyShadowingNames
527    // def test_lock(redis_connection):
528    //     if redis_connection:
529    //         from assemblyline.remote.datatypes.lock import Lock
530
531    //         def locked_execution(next_thread=None):
532    //             with Lock('test', 10):
533    //                 if next_thread:
534    //                     next_thread.start()
535    //                 time.sleep(2)
536
537    //         t2 = Thread(target=locked_execution)
538    //         t1 = Thread(target=locked_execution, args=(t2,))
539    //         t1.start()
540
541    //         time.sleep(1)
542    //         assert t1.is_alive()
543    //         assert t2.is_alive()
544    //         time.sleep(2)
545    //         assert not t1.is_alive()
546    //         assert t2.is_alive()
547    //         time.sleep(2)
548    //         assert not t1.is_alive()
549    //         assert not t2.is_alive()
550
    // Exercises the PriorityQueue wrapper against a live local redis.
    #[tokio::test]
    async fn priority_queue() -> Result<(), ErrorTypes> {
        let redis = redis_connection().await;
        let pq = redis.priority_queue("test-priority-queue".to_string());
        pq.delete().await?;

        // ten items all at the same priority
        for x in 0..10 {
            pq.push(100.0, &x.to_string()).await?;
        }

        // higher score ranks first: "a" (101) sorts to the front, "z" (99) to the back
        let a_key = pq.push(101.0, &"a".to_string()).await?;
        let z_key = pq.push(99.0, &"z".to_string()).await?;
        assert_eq!(pq.rank(&a_key).await?.unwrap(), 0);
        assert_eq!(pq.rank(&z_key).await?.unwrap(), pq.length().await? - 1);
        assert!(pq.rank(b"onethuosentuh").await?.is_none());

        // pop takes from the high-score end, unpush from the low-score end
        assert_eq!(pq.pop(1).await?, ["a"]);
        assert_eq!(pq.unpush(1).await?, ["z"]);
        assert_eq!(pq.count(100.0, 100.0).await?, 10);
        assert_eq!(pq.pop(1).await?, ["0"]);
        assert_eq!(pq.unpush(1).await?, ["9"]);
        assert_eq!(pq.length().await?, 8);
        assert_eq!(pq.pop(4).await?, ["1", "2", "3", "4"]);
        assert_eq!(pq.unpush(3).await?, ["8", "7", "6"]);
        assert_eq!(pq.length().await?, 1);
        // Should be [(100, 5)] at this point

        for x in 0..5 {
            pq.push(100.0 + x as f64, &x.to_string()).await?;
        }

        assert_eq!(pq.length().await?, 6);
        assert!(pq.dequeue_range(Some(106), None, None, None).await?.is_empty());
        assert_eq!(pq.length().await?, 6);
        // 3 and 4 are both options, 4 has higher score
        assert_eq!(pq.dequeue_range(Some(103), None, None, None).await?, vec!["4"]);
        // 2 and 3 are both options, 3 has higher score, skip it
        assert_eq!(pq.dequeue_range(Some(102), None, Some(1), None).await?, vec!["2"]);
        // Take some off the other end
        assert_eq!(pq.dequeue_range(None, Some(100), None, Some(10)).await?, vec!["5", "0"]);
        assert_eq!(pq.length().await?, 2);

        // operations spanning multiple queues: select drains them until all are empty
        let other = redis.priority_queue("second-priority-queue".to_string());
        other.delete().await?;
        other.push(100.0, &"a".to_string()).await?;
        assert_eq!(PriorityQueue::all_length(&[&other, &pq]).await?, [1, 2]);
        assert!(PriorityQueue::select(&[&other, &pq], None).await?.is_some());
        assert!(PriorityQueue::select(&[&other, &pq], None).await?.is_some());
        assert!(PriorityQueue::select(&[&other, &pq], None).await?.is_some());
        assert_eq!(PriorityQueue::all_length(&[&other, &pq]).await?, [0, 0]);

        // dequeue_range with both bounds supplied
        pq.push(50.0, &"first".to_string()).await?;
        pq.push(-50.0, &"second".to_string()).await?;

        assert_eq!(pq.dequeue_range(Some(0), Some(100), None, None).await?, ["first"]);
        assert_eq!(pq.dequeue_range(Some(-100), Some(0), None, None).await?, ["second"]);
        Ok(())
    }
610
611
612    // # noinspection PyShadowingNames,PyUnusedLocal
613    // def test_unique_priority_queue(redis_connection):
614    //     from assemblyline.remote.datatypes.queues.priority import UniquePriorityQueue
615    //     with UniquePriorityQueue('test-priority-queue') as pq:
616    //         pq.delete()
617
618    //         for x in range(10):
619    //             pq.push(100, x)
620    //         assert pq.length() == 10
621
622    //         # Values should be unique, this should have no effect on the length
623    //         for x in range(10):
624    //             pq.push(100, x)
625    //         assert pq.length() == 10
626
627    //         pq.push(101, 'a')
628    //         pq.push(99, 'z')
629
630    //         assert pq.pop() == 'a'
631    //         assert pq.unpush() == 'z'
632    //         assert pq.count(100, 100) == 10
633    //         assert pq.pop() == 0
634    //         assert pq.unpush() == 9
635    //         assert pq.length() == 8
636    //         assert pq.pop(4) == [1, 2, 3, 4]
637    //         assert pq.unpush(3) == [8, 7, 6]
638    //         assert pq.length() == 1  # Should be [<100, 5>] at this point
639
640    //         for x in range(5):
641    //             pq.push(100 + x, x)
642
643    //         assert pq.length() == 6
644    //         assert pq.dequeue_range(lower_limit=106) == []
645    //         assert pq.length() == 6
646    //         assert pq.dequeue_range(lower_limit=103) == [4]  # 3 and 4 are both options, 4 has higher score
647    //         assert pq.dequeue_range(lower_limit=102, skip=1) == [2]  # 2 and 3 are both options, 3 has higher score, skip it
648    //         assert sorted(pq.dequeue_range(upper_limit=100, num=10)) == [0, 5]  # Take some off the other end
649    //         assert pq.length() == 2
650    //         pq.pop(2)
651
652    //         pq.push(50, 'first')
653    //         pq.push(-50, 'second')
654
655    //         assert pq.dequeue_range(0, 100) == ['first']
656    //         assert pq.dequeue_range(-100, 0) == ['second']
657
    // Exercises the FIFO Queue wrapper against a live local redis.
    #[tokio::test]
    async fn named_queue() {
        let redis = redis_connection().await;

        let nq = redis.queue("test-named-queue".to_owned(), None);
        nq.delete().await.unwrap();

        // popping an empty queue yields nothing rather than blocking
        assert!(nq.pop().await.unwrap().is_none());
        assert!(nq.pop_batch(100).await.unwrap().is_empty());

        for x in 0..5 {
            nq.push(&x).await.unwrap();
        }

        assert_eq!(nq.content().await.unwrap(), [0, 1, 2, 3, 4]);

        // pop_batch returns up to the requested count, in FIFO order
        assert_eq!(nq.pop_batch(100).await.unwrap(), [0, 1, 2, 3, 4]);

        for x in 0..5 {
            nq.push(&x).await.unwrap();
        }

        assert_eq!(nq.length().await.unwrap(), 5);
        nq.push_batch(&(0..5).collect::<Vec<i32>>()).await.unwrap();
        assert_eq!(nq.length().await.unwrap(), 10);

        // peek_next reads the head without removing it; unpop pushes a value
        // back onto the front of the queue
        assert_eq!(nq.peek_next().await.unwrap(), nq.pop().await.unwrap());
        assert_eq!(nq.peek_next().await.unwrap(), Some(1));
        let v = nq.pop().await.unwrap().unwrap();
        assert_eq!(v, 1);
        assert_eq!(nq.peek_next().await.unwrap().unwrap(), 2);
        nq.unpop(&v).await.unwrap();
        assert_eq!(nq.peek_next().await.unwrap().unwrap(), 1);

        assert_eq!(Queue::select(&[&nq], None).await.unwrap().unwrap(), ("test-named-queue".to_owned(), 1));

        let nq1 = redis.queue("test-named-queue-1".to_owned(), None);
        nq1.delete().await.unwrap();
        let nq2 = redis.queue("test-named-queue-2".to_owned(), None);
        nq2.delete().await.unwrap();

        nq1.push(&1).await.unwrap();
        nq2.push(&2).await.unwrap();

        // select reports which queue the value came from along with the value
        assert_eq!(Queue::select(&[&nq1, &nq2], None).await.unwrap().unwrap(), ("test-named-queue-1".to_owned(), 1));
        assert_eq!(Queue::select(&[&nq1, &nq2], None).await.unwrap().unwrap(), ("test-named-queue-2".to_owned(), 2));
    }
705
706    // # noinspection PyShadowingNames
707    // def test_multi_queue(redis_connection):
708    //     if redis_connection:
709    //         from assemblyline.remote.datatypes.queues.multi import MultiQueue
710    //         mq = MultiQueue()
711    //         mq.delete('test-multi-q1')
712    //         mq.delete('test-multi-q2')
713
714    //         for x in range(5):
715    //             mq.push('test-multi-q1', x+1)
716    //             mq.push('test-multi-q2', x+6)
717
718    //         assert mq.length('test-multi-q1') == 5
719    //         assert mq.length('test-multi-q2') == 5
720
721    //         assert mq.pop('test-multi-q1') == 1
722    //         assert mq.pop('test-multi-q2') == 6
723
724    //         assert mq.length('test-multi-q1') == 4
725    //         assert mq.length('test-multi-q2') == 4
726
727    //         mq.delete('test-multi-q1')
728    //         mq.delete('test-multi-q2')
729
730    //         assert mq.length('test-multi-q1') == 0
731    //         assert mq.length('test-multi-q2') == 0
732
733
734    // # noinspection PyShadowingNames
735    // def test_comms_queue(redis_connection):
736    //     if redis_connection:
737    //         from assemblyline.remote.datatypes.queues.comms import CommsQueue
738
739    //         def publish_messages(message_list):
740    //             time.sleep(0.1)
741    //             with CommsQueue('test-comms-queue') as cq_p:
742    //                 for message in message_list:
743    //                     cq_p.publish(message)
744
745    //         msg_list = ["bob", 1, {"bob": 1}, [1, 2, 3], None, "Nice!", "stop"]
746    //         t = Thread(target=publish_messages, args=(msg_list,))
747    //         t.start()
748
749    //         with CommsQueue('test-comms-queue') as cq:
750    //             x = 0
751    //             for msg in cq.listen():
752    //                 if msg == "stop":
753    //                     break
754
755    //                 assert msg == msg_list[x]
756
757    //                 x += 1
758
759    //         t.join()
760    //         assert not t.is_alive()
761
762
763    // # noinspection PyShadowingNames
764    // def test_user_quota_tracker(redis_connection):
765    //     if redis_connection:
766    //         from assemblyline.remote.datatypes.user_quota_tracker import UserQuotaTracker
767
768    //         max_quota = 3
769    //         timeout = 2
770    //         name = get_random_id()
771    //         uqt = UserQuotaTracker('test-quota', timeout=timeout)
772
773    //         # First 0 to max_quota items should succeed
774    //         for _ in range(max_quota):
775    //             assert uqt.begin(name, max_quota) is True
776
777    //         # All other items should fail until items timeout
778    //         for _ in range(max_quota):
779    //             assert uqt.begin(name, max_quota) is False
780
781    //         # if you remove and item only one should be able to go in
782    //         uqt.end(name)
783    //         assert uqt.begin(name, max_quota) is True
784    //         assert uqt.begin(name, max_quota) is False
785
786    //         # if you wait the timeout, all items can go in
787    //         time.sleep(timeout+1)
788    //         for _ in range(max_quota):
789    //             assert uqt.begin(name, max_quota) is True
790
791
792// def test_exact_event(redis_connection: Redis[Any]):
793//     calls: list[dict[str, Any]] = []
794
795//     def _track_call(data: Optional[dict[str, Any]]):
796//         if data is not None:
797//             calls.append(data)
798
799//     watcher = EventWatcher(redis_connection)
800//     try:
801//         watcher.register('changes.test', _track_call)
802//         watcher.start()
803//         sender = EventSender('changes.', redis_connection)
804//         start = time.time()
805
806//         while len(calls) < 5:
807//             sender.send('test', {'payload': 100})
808
809//             if time.time() - start > 10:
810//                 pytest.fail()
811//         assert len(calls) >= 5
812
813//         for row in calls:
814//             assert row == {'payload': 100}
815
816//     finally:
817//         watcher.stop()
818
819
820// def test_serialized_event(redis_connection: Redis[Any]):
821//     import threading
822//     started = threading.Event()
823
824//     class Event(enum.IntEnum):
825//         ADD = 0
826//         REM = 1
827
828//     @dataclass
829//     class Message:
830//         name: str
831//         event: Event
832
833//     def _serialize(message: Message):
834//         return json.dumps(asdict(message))
835
836//     def _deserialize(data: str) -> Message:
837//         return Message(**json.loads(data))
838
839//     calls: list[Message] = []
840
841//     def _track_call(data: Optional[Message]):
842//         if data is not None:
843//             calls.append(data)
844//         else:
845//             started.set()
846
847//     watcher = EventWatcher[Message](redis_connection, deserializer=_deserialize)
848//     try:
849//         watcher.register('changes.test', _track_call)
850//         watcher.skip_first_refresh = False
851//         watcher.start()
852//         assert started.wait(timeout=5)
853//         sender = EventSender[Message]('changes.', redis_connection, serializer=_serialize)
854//         start = time.time()
855
856//         while len(calls) < 5:
857//             sender.send('test', Message(name='test', event=Event.ADD))
858
859//             if time.time() - start > 10:
860//                 pytest.fail()
861//         assert len(calls) >= 5
862
863//         expected = Message(name='test', event=Event.ADD)
864//         for row in calls:
865//             assert row == expected
866
867//     finally:
868//         watcher.stop()
869
870
871// def test_pattern_event(redis_connection: Redis[Any]):
872//     calls: list[dict[str, Any]] = []
873
874//     def _track_call(data: Optional[dict[str, Any]]):
875//         if data is not None:
876//             calls.append(data)
877
878//     watcher = EventWatcher(redis_connection)
879//     try:
880//         watcher.register('changes.*', _track_call)
881//         watcher.start()
882//         sender = EventSender('changes.', redis_connection)
883//         start = time.time()
884
885//         while len(calls) < 5:
886//             sender.send(uuid.uuid4().hex, {'payload': 100})
887
888//             if time.time() - start > 10:
889//                 pytest.fail()
890//         assert len(calls) >= 5
891
892//         for row in calls:
893//             assert row == {'payload': 100}
894
895//     finally:
896//         watcher.stop()
897
898
899}