1use serde::Serialize;
2use std::{
3 hash::{Hash, Hasher},
4 time::Duration,
5};
6
7pub trait Queue: Send + Sync + Serialize {
8 fn key(&self) -> String {
9 match Self::to_config().kind {
10 QueueKind::Static { key } => key,
11 QueueKind::Dynamic { prefix, .. } => {
12 let value = serde_json::to_value(self).unwrap_or_default();
13 format!("{}#{}", prefix, value_to_queue_key(value))
14 }
15 }
16 }
17 fn to_config() -> QueueConfig;
18 fn config(&self) -> QueueConfig {
19 Self::to_config()
20 }
21}
22
23#[derive(Debug, Clone)]
24pub struct QueueConfig {
25 pub kind: QueueKind,
26 pub concurrency: usize,
27 pub throttle: Option<QueueThrottle>,
28}
29
30impl PartialEq for QueueConfig {
31 fn eq(&self, other: &Self) -> bool {
32 self.kind == other.kind
33 }
34}
35
36impl Eq for QueueConfig {}
37
38impl Hash for QueueConfig {
39 fn hash<H: Hasher>(&self, state: &mut H) {
40 self.kind.hash(state);
41 }
42}
43
44impl QueueConfig {
45 pub fn as_dynamic(prefix: impl Into<String>) -> Self {
46 Self {
47 kind: QueueKind::Dynamic {
48 prefix: prefix.into(),
49 sleep_period: Duration::from_millis(500),
50 },
51 concurrency: 1,
52 throttle: None,
53 }
54 }
55
56 pub fn as_static(key: impl Into<String>) -> Self {
57 Self {
58 kind: QueueKind::Static { key: key.into() },
59 concurrency: 1,
60 throttle: None,
61 }
62 }
63
64 pub fn concurrency(mut self, concurrency: usize) -> Self {
65 self.concurrency = concurrency;
66 self
67 }
68
69 pub fn throttle(mut self, throttle: QueueThrottle) -> Self {
70 self.throttle = Some(throttle);
71 self
72 }
73}
74
75#[derive(Debug, Clone)]
76pub enum QueueKind {
77 Static {
78 key: String,
79 },
80 Dynamic {
81 prefix: String,
82 sleep_period: Duration,
83 },
84}
85
86impl PartialEq for QueueKind {
87 fn eq(&self, other: &Self) -> bool {
88 match (self, other) {
89 (QueueKind::Static { key: k1 }, QueueKind::Static { key: k2 }) => k1 == k2,
90 (QueueKind::Dynamic { prefix: p1, .. }, QueueKind::Dynamic { prefix: p2, .. }) => {
91 p1 == p2
92 }
93 _ => false,
94 }
95 }
96}
97
98impl Eq for QueueKind {}
99
100impl Hash for QueueKind {
101 fn hash<H: Hasher>(&self, state: &mut H) {
102 match self {
103 QueueKind::Static { key } => key.hash(state),
104 QueueKind::Dynamic { prefix, .. } => prefix.hash(state),
105 }
106 }
107}
108
109impl QueueKind {
110 pub fn is_dynamic(&self) -> bool {
111 matches!(self, QueueKind::Dynamic { .. })
112 }
113
114 pub fn is_static(&self) -> bool {
115 matches!(self, QueueKind::Static { .. })
116 }
117}
118
119#[derive(Debug, Clone)]
120pub struct QueueThrottle {
121 pub window_ms: i64,
122 pub limit: u64,
123}
124
125fn value_to_queue_key(value: serde_json::Value) -> String {
126 match value {
127 serde_json::Value::Null => "".to_string(),
128 serde_json::Value::String(s) => s,
129 serde_json::Value::Number(n) => n.to_string(),
130 serde_json::Value::Bool(b) => b.to_string(),
131 serde_json::Value::Array(a) => a
132 .into_iter()
133 .map(value_to_queue_key)
134 .collect::<Vec<String>>()
135 .join(":"),
136 serde_json::Value::Object(object) => object
137 .into_iter()
138 .map(|(k, v)| format!("{}={}", k, value_to_queue_key(v)))
139 .collect::<Vec<String>>()
140 .join(":"),
141 }
142}
143
144#[cfg(test)]
145mod tests {
146 use super::*;
147
148 #[derive(Serialize)]
149 struct TestStaticQueue;
150
151 impl Queue for TestStaticQueue {
152 fn to_config() -> QueueConfig {
153 QueueConfig::as_static("test_static_queue")
154 }
155 }
156
157 #[derive(Serialize)]
158 struct TestDynamicQueue {
159 name: String,
160 age: u32,
161 is_student: bool,
162 }
163
164 impl Queue for TestDynamicQueue {
165 fn to_config() -> QueueConfig {
166 QueueConfig::as_dynamic("test_dynamic_queue")
167 }
168 }
169
170 #[test]
171 fn test_queue_key() {
172 let static_queue = TestStaticQueue;
173 let dynamic_queue = TestDynamicQueue {
174 name: "John".to_string(),
175 age: 30,
176 is_student: true,
177 };
178
179 assert_eq!(static_queue.key(), "test_static_queue");
180 assert_eq!(
181 dynamic_queue.key(),
182 "test_dynamic_queue#name=John:age=30:is_student=true"
183 );
184 }
185}