1use serde::Serialize;
2use std::{
3 hash::{Hash, Hasher},
4 time::Duration,
5};
6
7pub trait Queue: Send + Sync + Serialize {
8 fn key(&self) -> String {
9 match Self::to_config().kind {
10 QueueKind::Static { key } => key,
11 QueueKind::Dynamic { prefix, .. } => {
12 let value = serde_json::to_value(self).unwrap_or_default();
13 format!("{}#{}", prefix, value_to_queue_key(value))
14 }
15 }
16 }
17 fn to_config() -> QueueConfig;
18 fn config(&self) -> QueueConfig {
19 Self::to_config()
20 }
21}
22
/// Settings for a queue type, as returned by [`Queue::to_config`].
///
/// The `PartialEq` and `Hash` implementations for this type look only at `kind`;
/// `concurrency` and `throttle` do not take part in comparisons.
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// Whether the queue uses a fixed key or a per-instance key built from a prefix.
    pub kind: QueueKind,
    /// Concurrency setting; `as_static` and `as_dynamic` initialise it to `1`.
    // NOTE(review): presumably the maximum number of jobs processed at once — confirm
    // against the code that consumes this config.
    pub concurrency: usize,
    /// Optional throttling settings; `None` unless set through `throttle(..)`.
    pub throttle: Option<QueueThrottle>,
}
29
30impl PartialEq for QueueConfig {
31 fn eq(&self, other: &Self) -> bool {
32 self.kind == other.kind
33 }
34}
35
// Marker impl: `eq` delegates to `QueueKind`, which is itself `Eq`, so the comparison
// is a full equivalence relation.
impl Eq for QueueConfig {}
37
38impl Hash for QueueConfig {
39 fn hash<H: Hasher>(&self, state: &mut H) {
40 self.kind.hash(state);
41 }
42}
43
44impl QueueConfig {
45 pub fn as_dynamic(prefix: impl Into<String>) -> Self {
46 Self {
47 kind: QueueKind::Dynamic {
48 prefix: prefix.into(),
49 sleep_period: Duration::from_millis(500),
50 },
51 concurrency: 1,
52 throttle: None,
53 }
54 }
55
56 pub fn as_static(key: impl Into<String>) -> Self {
57 Self {
58 kind: QueueKind::Static { key: key.into() },
59 concurrency: 1,
60 throttle: None,
61 }
62 }
63
64 pub fn concurrency(mut self, concurrency: usize) -> Self {
65 self.concurrency = concurrency;
66 self
67 }
68
69 pub fn throttle(mut self, throttle: QueueThrottle) -> Self {
70 self.throttle = Some(throttle);
71 self
72 }
73
74 pub fn static_key(&self) -> Option<String> {
75 match &self.kind {
76 QueueKind::Static { key } => Some(key.clone()),
77 QueueKind::Dynamic { .. } => None,
78 }
79 }
80}
81
/// How a queue's key is determined.
///
/// The `PartialEq` and `Hash` implementations for this type compare only `key`
/// (static) or `prefix` (dynamic); `sleep_period` is ignored.
#[derive(Debug, Clone)]
pub enum QueueKind {
    /// A queue with one fixed key.
    Static {
        /// The complete queue key, returned as-is by `Queue::key`.
        key: String,
    },
    /// A queue whose key is built per instance as `<prefix>#<suffix>`.
    Dynamic {
        /// Leading part of the key, placed before the `#` separator.
        prefix: String,
        /// Set to 500 ms by `QueueConfig::as_dynamic`.
        // NOTE(review): presumably how long a worker sleeps between polls for dynamic
        // queues — the consumer is not visible here, confirm.
        sleep_period: Duration,
    },
}
92
93impl PartialEq for QueueKind {
94 fn eq(&self, other: &Self) -> bool {
95 match (self, other) {
96 (QueueKind::Static { key: k1 }, QueueKind::Static { key: k2 }) => k1 == k2,
97 (QueueKind::Dynamic { prefix: p1, .. }, QueueKind::Dynamic { prefix: p2, .. }) => {
98 p1 == p2
99 }
100 _ => false,
101 }
102 }
103}
104
// Marker impl: `eq` only compares `String` fields within matching variants, so the
// relation is reflexive, symmetric and transitive.
impl Eq for QueueKind {}
106
107impl Hash for QueueKind {
108 fn hash<H: Hasher>(&self, state: &mut H) {
109 match self {
110 QueueKind::Static { key } => key.hash(state),
111 QueueKind::Dynamic { prefix, .. } => prefix.hash(state),
112 }
113 }
114}
115
116impl QueueKind {
117 pub fn is_dynamic(&self) -> bool {
118 matches!(self, QueueKind::Dynamic { .. })
119 }
120
121 pub fn is_static(&self) -> bool {
122 matches!(self, QueueKind::Static { .. })
123 }
124}
125
/// Throttling settings that can be attached to a [`QueueConfig`] via its `throttle`
/// builder method.
// NOTE(review): the code that enforces the throttle is not visible here; the field
// descriptions below are inferred from the names and should be confirmed.
#[derive(Debug, Clone)]
pub struct QueueThrottle {
    /// Presumably the length of the throttling window in milliseconds (per the `_ms`
    /// suffix) — TODO confirm.
    pub window_ms: i64,
    /// Presumably the maximum number of jobs allowed per window — TODO confirm.
    pub limit: u64,
}
131
132fn value_to_queue_key(value: serde_json::Value) -> String {
133 match value {
134 serde_json::Value::Null => "".to_string(),
135 serde_json::Value::String(s) => s,
136 serde_json::Value::Number(n) => n.to_string(),
137 serde_json::Value::Bool(b) => b.to_string(),
138 serde_json::Value::Array(a) => a
139 .into_iter()
140 .map(value_to_queue_key)
141 .collect::<Vec<String>>()
142 .join(":"),
143 serde_json::Value::Object(object) => object
144 .into_iter()
145 .map(|(k, v)| format!("{}={}", k, value_to_queue_key(v)))
146 .collect::<Vec<String>>()
147 .join(":"),
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    // Unit queue with a hand-written `Queue` impl that uses a fixed key.
    #[derive(Serialize)]
    struct TestStaticQueue;

    impl Queue for TestStaticQueue {
        fn to_config() -> QueueConfig {
            QueueConfig::as_static("test_static_queue")
        }
    }

    // Queue with fields; its key is built from the prefix plus the serialized fields.
    #[derive(Serialize)]
    struct TestDynamicQueue {
        name: String,
        age: u32,
        is_student: bool,
    }

    impl Queue for TestDynamicQueue {
        fn to_config() -> QueueConfig {
            QueueConfig::as_dynamic("test_dynamic_queue")
        }
    }

    // Checks `Queue::key` for both a static and a dynamic queue.
    #[test]
    fn test_queue_key() {
        let static_queue = TestStaticQueue;
        let dynamic_queue = TestDynamicQueue {
            name: "John".to_string(),
            age: 30,
            is_student: true,
        };

        // A static queue returns the configured key unchanged.
        assert_eq!(static_queue.key(), "test_static_queue");
        // A dynamic queue appends `#` and the `name=value` pairs joined with `:`.
        // NOTE(review): the expected pairs follow field declaration order, which
        // presumably relies on serde_json's `preserve_order` feature (its default map
        // is sorted by key) — confirm the feature is enabled.
        assert_eq!(
            dynamic_queue.key(),
            "test_dynamic_queue#name=John:age=30:is_student=true"
        );
    }

    // Exercises `#[derive(oxanus::Queue)]`; compiled only with the `macros` feature.
    #[cfg(feature = "macros")]
    #[test]
    fn test_define_queue_with_macro() {
        // Alias the current crate so `oxanus::...` paths resolve from inside it.
        use crate as oxanus;

        #[derive(oxanus::Registry)]
        #[allow(dead_code)]
        struct ComponentRegistry(oxanus::ComponentRegistry<(), ()>);

        // No attributes: the expected key is `default_queue` and concurrency is 1.
        #[derive(Serialize, oxanus::Queue)]
        struct DefaultQueue;

        assert_eq!(DefaultQueue.key(), "default_queue");
        assert_eq!(DefaultQueue.config().concurrency, 1);

        // Explicit `key` attribute: that key is used; concurrency stays 1.
        #[derive(Serialize, oxanus::Queue)]
        #[oxanus(key = "static_queue")]
        struct QueueWithKey;

        assert_eq!(QueueWithKey.key(), "static_queue");
        assert_eq!(QueueWithKey.config().concurrency, 1);

        // Only `concurrency` given: the expected key is `queue_with_concurrency`.
        #[derive(Serialize, oxanus::Queue)]
        #[oxanus(concurrency = 2)]
        struct QueueWithConcurrency;

        assert_eq!(QueueWithConcurrency.key(), "queue_with_concurrency");
        assert_eq!(QueueWithConcurrency.config().concurrency, 2);

        // Concurrency and throttle supplied through separate attributes.
        #[derive(Serialize, oxanus::Queue)]
        #[oxanus(concurrency = 2)]
        #[oxanus(throttle(window_ms = 3, limit = 4))]
        struct QueueWithThrottle;

        assert_eq!(QueueWithThrottle.key(), "queue_with_throttle");
        assert_eq!(QueueWithThrottle.config().concurrency, 2);
        assert_eq!(QueueWithThrottle.config().throttle.unwrap().window_ms, 3);
        assert_eq!(QueueWithThrottle.config().throttle.unwrap().limit, 4);

        // Key and concurrency supplied through separate attributes.
        #[derive(Serialize, oxanus::Queue)]
        #[oxanus(key = "static_queue_key")]
        #[oxanus(concurrency = 2)]
        struct QueueWithKeyAndConcurrency;

        assert_eq!(QueueWithKeyAndConcurrency.key(), "static_queue_key");
        assert_eq!(QueueWithKeyAndConcurrency.config().concurrency, 2);

        // Key and concurrency combined in one attribute, on an empty braced struct.
        #[derive(Serialize, oxanus::Queue)]
        #[oxanus(key = "static_queue_key", concurrency = 3)]
        struct QueueWithKeyAndConcurrency1 {}

        assert_eq!(QueueWithKeyAndConcurrency1 {}.key(), "static_queue_key");
        assert_eq!(QueueWithKeyAndConcurrency1 {}.config().concurrency, 3);

        // `prefix` attribute: the key is the prefix plus the serialized field.
        #[derive(Serialize, oxanus::Queue)]
        #[oxanus(prefix = "dyn_queue", concurrency = 2)]
        struct DynQueue {
            i: i32,
        }

        assert_eq!(DynQueue { i: 2 }.key(), "dyn_queue#i=2");
        assert_eq!(DynQueue::to_config().concurrency, 2);
    }
}