oxanus/
queue.rs

1use serde::Serialize;
2use std::{
3    hash::{Hash, Hasher},
4    time::Duration,
5};
6
7pub trait Queue: Send + Sync + Serialize {
8    fn key(&self) -> String {
9        match Self::to_config().kind {
10            QueueKind::Static { key } => key,
11            QueueKind::Dynamic { prefix, .. } => {
12                let value = serde_json::to_value(self).unwrap_or_default();
13                format!("{}#{}", prefix, value_to_queue_key(value))
14            }
15        }
16    }
17    fn to_config() -> QueueConfig;
18    fn config(&self) -> QueueConfig {
19        Self::to_config()
20    }
21}
22
23#[derive(Debug, Clone)]
24pub struct QueueConfig {
25    pub kind: QueueKind,
26    pub concurrency: usize,
27    pub throttle: Option<QueueThrottle>,
28}
29
30impl PartialEq for QueueConfig {
31    fn eq(&self, other: &Self) -> bool {
32        self.kind == other.kind
33    }
34}
35
36impl Eq for QueueConfig {}
37
38impl Hash for QueueConfig {
39    fn hash<H: Hasher>(&self, state: &mut H) {
40        self.kind.hash(state);
41    }
42}
43
44impl QueueConfig {
45    pub fn as_dynamic(prefix: impl Into<String>) -> Self {
46        Self {
47            kind: QueueKind::Dynamic {
48                prefix: prefix.into(),
49                sleep_period: Duration::from_millis(500),
50            },
51            concurrency: 1,
52            throttle: None,
53        }
54    }
55
56    pub fn as_static(key: impl Into<String>) -> Self {
57        Self {
58            kind: QueueKind::Static { key: key.into() },
59            concurrency: 1,
60            throttle: None,
61        }
62    }
63
64    pub fn concurrency(mut self, concurrency: usize) -> Self {
65        self.concurrency = concurrency;
66        self
67    }
68
69    pub fn throttle(mut self, throttle: QueueThrottle) -> Self {
70        self.throttle = Some(throttle);
71        self
72    }
73}
74
/// The two flavours of queue.
///
/// Equality and hashing consider only the variant and its string (`key` or
/// `prefix`); `sleep_period` is deliberately left out of both.
#[derive(Debug, Clone)]
pub enum QueueKind {
    /// A queue with one fixed key.
    Static {
        /// The complete queue key.
        key: String,
    },
    /// A family of queues sharing a key prefix.
    Dynamic {
        /// Prefix placed in front of the per-instance part of the key.
        prefix: String,
        /// NOTE(review): presumably how long a worker waits between scans
        /// for matching queues — confirm against the consumer of this value.
        sleep_period: Duration,
    },
}

impl PartialEq for QueueKind {
    /// Same variant and same string; a `Static` never equals a `Dynamic`.
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (QueueKind::Static { key: k1 }, QueueKind::Static { key: k2 }) => k1 == k2,
            (QueueKind::Dynamic { prefix: p1, .. }, QueueKind::Dynamic { prefix: p2, .. }) => {
                p1 == p2
            }
            _ => false,
        }
    }
}

impl Eq for QueueKind {}

impl Hash for QueueKind {
    /// Hashes the variant discriminant followed by the key/prefix.
    ///
    /// Including the discriminant keeps `Static { key: "x" }` and
    /// `Dynamic { prefix: "x" }` (which compare unequal) from always landing
    /// on the same hash. `sleep_period` stays excluded so that values which
    /// compare equal still hash equal.
    fn hash<H: Hasher>(&self, state: &mut H) {
        std::mem::discriminant(self).hash(state);
        match self {
            QueueKind::Static { key } => key.hash(state),
            QueueKind::Dynamic { prefix, .. } => prefix.hash(state),
        }
    }
}

impl QueueKind {
    /// Returns `true` for the `Dynamic` variant.
    pub fn is_dynamic(&self) -> bool {
        matches!(self, QueueKind::Dynamic { .. })
    }

    /// Returns `true` for the `Static` variant.
    pub fn is_static(&self) -> bool {
        matches!(self, QueueKind::Static { .. })
    }
}
118
/// Throttle parameters that can be attached to a queue configuration.
#[derive(Clone, Debug)]
pub struct QueueThrottle {
    /// Length of the throttling window, in milliseconds (per the field name).
    pub window_ms: i64,
    /// NOTE(review): presumably the maximum number of jobs allowed per
    /// window — confirm against the code that enforces the throttle.
    pub limit: u64,
}
124
125fn value_to_queue_key(value: serde_json::Value) -> String {
126    match value {
127        serde_json::Value::Null => "".to_string(),
128        serde_json::Value::String(s) => s,
129        serde_json::Value::Number(n) => n.to_string(),
130        serde_json::Value::Bool(b) => b.to_string(),
131        serde_json::Value::Array(a) => a
132            .into_iter()
133            .map(value_to_queue_key)
134            .collect::<Vec<String>>()
135            .join(":"),
136        serde_json::Value::Object(object) => object
137            .into_iter()
138            .map(|(k, v)| format!("{}={}", k, value_to_queue_key(v)))
139            .collect::<Vec<String>>()
140            .join(":"),
141    }
142}
143
#[cfg(test)]
mod tests {
    use super::*;

    /// Field-less queue type configured as a static queue.
    #[derive(Serialize)]
    struct TestStaticQueue;

    impl Queue for TestStaticQueue {
        fn to_config() -> QueueConfig {
            QueueConfig::as_static("test_static_queue")
        }
    }

    /// Queue type with fields, configured as a dynamic queue so that its
    /// serialized fields end up in the key.
    #[derive(Serialize)]
    struct TestDynamicQueue {
        name: String,
        age: u32,
        is_student: bool,
    }

    impl Queue for TestDynamicQueue {
        fn to_config() -> QueueConfig {
            QueueConfig::as_dynamic("test_dynamic_queue")
        }
    }

    /// Static queues yield their fixed key; dynamic queues yield the prefix,
    /// a `#`, and the flattened fields.
    #[test]
    fn test_queue_key() {
        let static_key = TestStaticQueue.key();
        assert_eq!(static_key, "test_static_queue");

        // NOTE(review): the expected suffix lists fields in declaration
        // order; this presumably relies on serde_json keeping insertion
        // order for objects (`preserve_order` feature) — confirm.
        let dynamic_key = TestDynamicQueue {
            name: String::from("John"),
            age: 30,
            is_student: true,
        }
        .key();
        assert_eq!(
            dynamic_key,
            "test_dynamic_queue#name=John:age=30:is_student=true"
        );
    }
}