Skip to main content

oxanus/
queue.rs

1use serde::Serialize;
2use std::{
3    hash::{Hash, Hasher},
4    time::Duration,
5};
6
7pub trait Queue: Send + Sync + Serialize {
8    fn key(&self) -> String {
9        match Self::to_config().kind {
10            QueueKind::Static { key } => key,
11            QueueKind::Dynamic { prefix, .. } => {
12                let value = serde_json::to_value(self).unwrap_or_default();
13                format!("{}#{}", prefix, value_to_queue_key(value))
14            }
15        }
16    }
17    fn to_config() -> QueueConfig;
18    fn config(&self) -> QueueConfig {
19        Self::to_config()
20    }
21}
22
23#[derive(Debug, Clone)]
24pub struct QueueConfig {
25    pub kind: QueueKind,
26    pub concurrency: usize,
27    pub throttle: Option<QueueThrottle>,
28}
29
30impl PartialEq for QueueConfig {
31    fn eq(&self, other: &Self) -> bool {
32        self.kind == other.kind
33    }
34}
35
36impl Eq for QueueConfig {}
37
38impl Hash for QueueConfig {
39    fn hash<H: Hasher>(&self, state: &mut H) {
40        self.kind.hash(state);
41    }
42}
43
44impl QueueConfig {
45    pub fn as_dynamic(prefix: impl Into<String>) -> Self {
46        Self {
47            kind: QueueKind::Dynamic {
48                prefix: prefix.into(),
49                sleep_period: Duration::from_millis(500),
50            },
51            concurrency: 1,
52            throttle: None,
53        }
54    }
55
56    pub fn as_static(key: impl Into<String>) -> Self {
57        Self {
58            kind: QueueKind::Static { key: key.into() },
59            concurrency: 1,
60            throttle: None,
61        }
62    }
63
64    pub fn concurrency(mut self, concurrency: usize) -> Self {
65        self.concurrency = concurrency;
66        self
67    }
68
69    pub fn throttle(mut self, throttle: QueueThrottle) -> Self {
70        self.throttle = Some(throttle);
71        self
72    }
73
74    pub fn static_key(&self) -> Option<String> {
75        match &self.kind {
76            QueueKind::Static { key } => Some(key.clone()),
77            QueueKind::Dynamic { .. } => None,
78        }
79    }
80}
81
82#[derive(Debug, Clone)]
83pub enum QueueKind {
84    Static {
85        key: String,
86    },
87    Dynamic {
88        prefix: String,
89        sleep_period: Duration,
90    },
91}
92
93impl PartialEq for QueueKind {
94    fn eq(&self, other: &Self) -> bool {
95        match (self, other) {
96            (QueueKind::Static { key: k1 }, QueueKind::Static { key: k2 }) => k1 == k2,
97            (QueueKind::Dynamic { prefix: p1, .. }, QueueKind::Dynamic { prefix: p2, .. }) => {
98                p1 == p2
99            }
100            _ => false,
101        }
102    }
103}
104
105impl Eq for QueueKind {}
106
107impl Hash for QueueKind {
108    fn hash<H: Hasher>(&self, state: &mut H) {
109        match self {
110            QueueKind::Static { key } => key.hash(state),
111            QueueKind::Dynamic { prefix, .. } => prefix.hash(state),
112        }
113    }
114}
115
116impl QueueKind {
117    pub fn is_dynamic(&self) -> bool {
118        matches!(self, QueueKind::Dynamic { .. })
119    }
120
121    pub fn is_static(&self) -> bool {
122        matches!(self, QueueKind::Static { .. })
123    }
124}
125
126#[derive(Debug, Clone)]
127pub struct QueueThrottle {
128    pub window_ms: i64,
129    pub limit: u64,
130}
131
132fn value_to_queue_key(value: serde_json::Value) -> String {
133    match value {
134        serde_json::Value::Null => "".to_string(),
135        serde_json::Value::String(s) => s,
136        serde_json::Value::Number(n) => n.to_string(),
137        serde_json::Value::Bool(b) => b.to_string(),
138        serde_json::Value::Array(a) => a
139            .into_iter()
140            .map(value_to_queue_key)
141            .collect::<Vec<String>>()
142            .join(":"),
143        serde_json::Value::Object(object) => object
144            .into_iter()
145            .map(|(k, v)| format!("{}={}", k, value_to_queue_key(v)))
146            .collect::<Vec<String>>()
147            .join(":"),
148    }
149}
150
151#[cfg(test)]
152mod tests {
153    use super::*;
154
155    #[derive(Serialize)]
156    struct TestStaticQueue;
157
158    impl Queue for TestStaticQueue {
159        fn to_config() -> QueueConfig {
160            QueueConfig::as_static("test_static_queue")
161        }
162    }
163
164    #[derive(Serialize)]
165    struct TestDynamicQueue {
166        name: String,
167        age: u32,
168        is_student: bool,
169    }
170
171    impl Queue for TestDynamicQueue {
172        fn to_config() -> QueueConfig {
173            QueueConfig::as_dynamic("test_dynamic_queue")
174        }
175    }
176
177    #[test]
178    fn test_queue_key() {
179        let static_queue = TestStaticQueue;
180        let dynamic_queue = TestDynamicQueue {
181            name: "John".to_string(),
182            age: 30,
183            is_student: true,
184        };
185
186        assert_eq!(static_queue.key(), "test_static_queue");
187        assert_eq!(
188            dynamic_queue.key(),
189            "test_dynamic_queue#name=John:age=30:is_student=true"
190        );
191    }
192
193    #[cfg(feature = "macros")]
194    #[test]
195    fn test_define_queue_with_macro() {
196        use crate as oxanus; // needed for unit test
197
198        #[derive(oxanus::Registry)]
199        #[allow(dead_code)]
200        struct ComponentRegistry(oxanus::ComponentRegistry<(), ()>);
201
202        #[derive(Serialize, oxanus::Queue)]
203        struct DefaultQueue;
204
205        assert_eq!(DefaultQueue.key(), "default_queue");
206        assert_eq!(DefaultQueue.config().concurrency, 1);
207
208        #[derive(Serialize, oxanus::Queue)]
209        #[oxanus(key = "static_queue")]
210        struct QueueWithKey;
211
212        assert_eq!(QueueWithKey.key(), "static_queue");
213        assert_eq!(QueueWithKey.config().concurrency, 1);
214
215        #[derive(Serialize, oxanus::Queue)]
216        #[oxanus(concurrency = 2)]
217        struct QueueWithConcurrency;
218
219        assert_eq!(QueueWithConcurrency.key(), "queue_with_concurrency");
220        assert_eq!(QueueWithConcurrency.config().concurrency, 2);
221
222        #[derive(Serialize, oxanus::Queue)]
223        #[oxanus(concurrency = 2)]
224        #[oxanus(throttle(window_ms = 3, limit = 4))]
225        struct QueueWithThrottle;
226
227        assert_eq!(QueueWithThrottle.key(), "queue_with_throttle");
228        assert_eq!(QueueWithThrottle.config().concurrency, 2);
229        assert_eq!(QueueWithThrottle.config().throttle.unwrap().window_ms, 3);
230        assert_eq!(QueueWithThrottle.config().throttle.unwrap().limit, 4);
231
232        #[derive(Serialize, oxanus::Queue)]
233        #[oxanus(key = "static_queue_key")]
234        #[oxanus(concurrency = 2)]
235        struct QueueWithKeyAndConcurrency;
236
237        assert_eq!(QueueWithKeyAndConcurrency.key(), "static_queue_key");
238        assert_eq!(QueueWithKeyAndConcurrency.config().concurrency, 2);
239
240        #[derive(Serialize, oxanus::Queue)]
241        #[oxanus(key = "static_queue_key", concurrency = 3)]
242        struct QueueWithKeyAndConcurrency1 {}
243
244        assert_eq!(QueueWithKeyAndConcurrency1 {}.key(), "static_queue_key");
245        assert_eq!(QueueWithKeyAndConcurrency1 {}.config().concurrency, 3);
246
247        #[derive(Serialize, oxanus::Queue)]
248        #[oxanus(prefix = "dyn_queue", concurrency = 2)]
249        struct DynQueue {
250            i: i32,
251        }
252
253        assert_eq!(DynQueue { i: 2 }.key(), "dyn_queue#i=2");
254        assert_eq!(DynQueue::to_config().concurrency, 2);
255    }
256}