1use std::{collections::HashMap, fmt::{Display, write}, pin::Pin, sync::Arc};
2use std::error::Error as StdError;
3use crate::errors::{AppError, AppErrorType};
4use tracing::error;
5
6
/// A message payload together with its optional content type.
///
/// The body is stored as `Arc<[u8]>`, so cloning a `Message` shares the same
/// byte buffer instead of copying it.
#[derive(Clone, Debug)]
pub struct Message {
    /// Raw message bytes.
    pub body: Arc<[u8]>,
    /// Content type attached to the message, if any.
    pub content_type: Option<String>,
}
12
13pub type Handler = Arc<
14 dyn Fn(
15 Message,
16 )
17 -> Pin<Box<dyn Future<Output = Result<(), Box<dyn StdError + Send + Sync>>> + Send>>
18 + Send
19 + Sync,
20>;
21pub type RPCHandler = Arc<
22 dyn Fn(
23 Message,
24 )
25 -> Pin<Box<dyn Future<Output = Result<Message, Box<dyn StdError + Send + Sync>>> + Send>>
26 + Send
27 + Sync,
28>;
29
/// Selects which confirmation mode is in effect.
///
/// NOTE(review): the exact semantics of each variant are defined by the code
/// that consumes this enum, which is not visible here; the per-variant notes
/// below are inferred from the names only and should be confirmed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Confirmations {
    /// Presumably: no confirmations are requested.
    Disables,
    /// Presumably: confirmations for regular publishes.
    PublisherConfirms,
    /// Presumably: confirmations on the RPC client side.
    RPCClientPublisherConfirms,
    /// Presumably: confirmations on the RPC server side.
    RPCServerPublisherConfirms,
}
37
/// Message delivery mode with explicit numeric discriminants.
///
/// NOTE(review): the values 1 and 2 presumably mirror the AMQP 0-9-1
/// `delivery-mode` property (non-persistent / persistent) — confirm against
/// the publishing code.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeliveryMode {
    /// Discriminant `1`.
    Transient = 1,
    /// Discriminant `2`.
    Persistent = 2,
}
43
/// Kind of exchange a message is routed through.
///
/// NOTE(review): routing behaviour per variant is implemented elsewhere; the
/// names match the standard AMQP exchange types (direct, fanout, topic).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExchangeType {
    /// Direct exchange.
    Direct,
    /// Fanout exchange.
    Fanout,
    /// Topic exchange.
    Topic,
}
50
/// A queued acknowledgement command.
///
/// Each variant carries a `(u64, bool)` pair.
/// NOTE(review): presumably `(delivery_tag, multiple)` — confirm against the
/// code that produces and consumes these commands.
///
/// Derives the same trait set as the other plain-data enums in this module so
/// commands can be logged, copied and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PendingCmd {
    /// Positive acknowledgement.
    Ack((u64, bool)),
    /// Negative acknowledgement.
    Nack((u64, bool)),
}
/// Compression scheme applied to a message body.
///
/// Every variant except [`ContentEncoding::None`] exists only when the
/// matching Cargo feature is enabled.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ContentEncoding {
    /// Zstandard compression (feature `zstd`).
    #[cfg(feature = "zstd")]
    Zstd,
    /// LZ4 compression (feature `lz4_flex`).
    #[cfg(feature = "lz4_flex")]
    Lz4,
    /// Zlib compression (feature `flate2`).
    #[cfg(feature = "flate2")]
    Zlib,
    /// No compression.
    None,
}
65impl ContentEncoding {
66 pub fn from_str(s: &str) -> Option<ContentEncoding> {
67 match s {
68 #[cfg(feature = "zstd")]
69 "application/zstd" | "application/zstandard" => Some(ContentEncoding::Zstd),
70 #[cfg(feature = "lz4_flex")]
71 "application/lz4" => Some(ContentEncoding::Lz4),
72 #[cfg(feature = "flate2")]
73 "application/x-gzip" | "application/gzip" | "application/zlib" => Some(ContentEncoding::Zlib),
74 "none" => Some(ContentEncoding::None),
75 _ => None,
76 }
77 }
78 pub fn as_str(&self) -> &'static str {
79 match self {
80 #[cfg(feature = "zstd")]
81 ContentEncoding::Zstd => "application/zstd",
82 #[cfg(feature = "lz4_flex")]
83 ContentEncoding::Lz4 => "application/lz4",
84 #[cfg(feature = "flate2")]
85 ContentEncoding::Zlib => "application/x-gzip",
86 ContentEncoding::None => "none",
87 }
88 }
89}
90
91impl Display for ContentEncoding {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 write!(f, "{}", self.as_str())
94 }
95}
96
/// One node of a [`TopicTrie`].
///
/// Each node corresponds to a prefix of dot-separated pattern segments.
#[derive(Debug, Clone)]
pub struct TopicNode<T> {
    /// Child nodes keyed by the next pattern segment.
    children: HashMap<String, TopicNode<T>>,
    /// Values registered for the pattern that ends exactly at this node.
    values: Vec<T>,
}
102
103impl<T> Default for TopicNode<T> {
104 fn default() -> Self {
105 Self {
106 children: HashMap::new(),
107 values: Vec::new(),
108 }
109 }
110}
111
112#[derive(Clone,Debug, Default)]
113pub struct TopicTrie<T> {
114 root: TopicNode<T>,
115}
116
117impl<T: Clone> TopicTrie<T> {
118 pub fn new() -> Self {
119 Self {
120 root: TopicNode::default(),
121 }
122 }
123
124 pub fn insert(&mut self, pattern: &str, value: T) {
126 let segments: Vec<&str> = if pattern.is_empty() {
127 vec![]
128 } else {
129 pattern.split('.').collect()
130 };
131
132 let mut current = &mut self.root;
133 for segment in segments {
134 current = current.children.entry(segment.to_string()).or_default();
136 }
137 current.values.push(value);
139 }
140
141 pub fn search(&self, routing_key: &str) -> Vec<T> {
143 let mut results = Vec::new();
144 let segments: Vec<&str> = if routing_key.is_empty() {
145 vec![]
146 } else {
147 routing_key.split('.').collect()
148 };
149
150 self.search_node(&self.root, &segments, &mut results);
151 results
152 }
153
154 fn search_node(&self, node: &TopicNode<T>, segments: &[&str], results: &mut Vec<T>) {
156 if segments.is_empty() {
157 results.extend(node.values.iter().cloned());
159
160 if let Some(hash_child) = node.children.get("#") {
164 self.search_node(hash_child, segments, results);
165 }
166 return;
167 }
168
169 let head = segments[0];
170 let tail = &segments[1..];
171
172 if let Some(child) = node.children.get(head) {
174 self.search_node(child, tail, results);
175 }
176
177 if let Some(star_child) = node.children.get("*") {
179 self.search_node(star_child, tail, results);
180 }
181
182 if let Some(hash_child) = node.children.get("#") {
184 for i in 0..=segments.len() {
187 self.search_node(hash_child, &segments[i..], results);
188 }
189 }
190 }
191}
192
/// Compresses `data` with zstd at compression level 1.
///
/// Any I/O error reported by the encoder is returned unchanged.
#[cfg(feature = "zstd")]
fn compress_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
    let level = 1;
    zstd::encode_all(data, level)
}
197
198
/// Decompresses a zstd stream into a freshly allocated buffer.
///
/// Any I/O error reported by the decoder is returned unchanged.
#[cfg(feature = "zstd")]
fn decompress_zstd(compressed_data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
    let decoded = zstd::decode_all(compressed_data)?;
    Ok(decoded)
}
203
/// Compresses `data` with LZ4 using `lz4_flex::compress_prepend_size`, the
/// counterpart of the `decompress_size_prepended` call used for decoding.
#[cfg(feature = "lz4_flex")]
fn compress_lz4(data: &[u8]) -> Vec<u8> {
    let framed: Vec<u8> = lz4_flex::compress_prepend_size(data);
    framed
}
/// Decompresses LZ4 data using `lz4_flex::decompress_size_prepended`.
///
/// A decoder error is converted into `AppError` through its `From`
/// implementation.
#[cfg(feature = "lz4_flex")]
fn decompress_lz4(compressed_data: &[u8]) -> Result<Vec<u8>, AppError> {
    lz4_flex::decompress_size_prepended(compressed_data).map_err(AppError::from)
}
212
/// Compresses `data` into a zlib stream at flate2's default compression
/// level.
///
/// A read error from the encoder is reported as an `InternalError`
/// `AppError` whose message includes the underlying error.
#[cfg(feature = "flate2")]
fn compress_zlib(data: &[u8]) -> Result<Vec<u8>, AppError> {
    use std::io::Read;

    let mut output = Vec::new();
    let mut encoder = flate2::read::ZlibEncoder::new(data, flate2::Compression::default());

    // Reading the encoder to its end drives the compression.
    encoder.read_to_end(&mut output).map_err(|e| {
        AppError::new(
            Some(format!("Zlib compression failed: {}", e)),
            None,
            AppErrorType::InternalError,
        )
    })?;

    Ok(output)
}
/// Decompresses a zlib stream or a gzip member.
///
/// This helper is dispatched for `application/gzip` and `application/x-gzip`
/// as well as `application/zlib`, so it has to accept both framings. Input
/// that starts with the gzip magic bytes `1f 8b` is decoded as gzip;
/// everything else is decoded as zlib. The check is unambiguous: the low
/// nibble of a zlib header's first byte is the compression method, which is 8
/// for deflate, so a valid zlib stream never starts with `0x1f`.
///
/// A decoder error is reported as an `InternalError` `AppError` whose message
/// includes the underlying error.
#[cfg(feature = "flate2")]
fn decompress_zlib(compressed_data: &[u8]) -> Result<Vec<u8>, AppError> {
    use std::io::Read;

    const GZIP_MAGIC: [u8; 2] = [0x1f, 0x8b];

    let mut decompressed = Vec::new();
    let (format_name, outcome) = if compressed_data.starts_with(&GZIP_MAGIC) {
        (
            "Gzip",
            flate2::read::GzDecoder::new(compressed_data).read_to_end(&mut decompressed),
        )
    } else {
        (
            "Zlib",
            flate2::read::ZlibDecoder::new(compressed_data).read_to_end(&mut decompressed),
        )
    };

    match outcome {
        Ok(_) => Ok(decompressed),
        Err(e) => Err(AppError::new(
            Some(format!("{} decompression failed: {}", format_name, e)),
            None,
            AppErrorType::InternalError,
        )),
    }
}
244
245
246pub fn decompress(content: Vec<u8>, content_encoding: Option<&str>) -> Result<Vec<u8>, AppError> {
247 if let Some(ct) = content_encoding {
248 match ct {
249 #[cfg(feature = "zstd")]
250 "application/zstd" | "application/zstandard" => {
251 match decompress_zstd(&content[..]) {
252 Ok(decompressed) => {
253 Ok(decompressed)
254 }
255 Err(e) => {
256 error!("Failed to create gzip decoder: {}", e);
257 Err(AppError::new(Some("Failed to create gzip decoder".to_string()), None, AppErrorType::InternalError).into())
258 }
259 }
260 },
261 #[cfg(feature = "lz4_flex")]
262 "application/lz4" => {
263 match decompress_lz4(&content[..]) {
264 Ok(decompressed) => Ok(decompressed),
265 Err(e) => {
266 error!("Failed to decompress LZ4 content: {}", e);
267 Err(AppError::new(Some("Failed to decompress LZ4 content".to_string()), None, AppErrorType::InternalError))
268 }
269 }
270 },
271 #[cfg(feature = "flate2")]
272 "application/x-gzip" | "application/gzip" | "application/zlib" => {
273 match decompress_zlib(&content[..]) {
274 Ok(decompressed) => Ok(decompressed),
275 Err(e) => {
276 error!("Failed to create gzip decoder: {}", e);
277 Err(AppError::new(Some("Failed to create gzip decoder".to_string()), None, AppErrorType::InternalError).into())
278 }
279 }
280 },
281 _ => Err(AppError::new(Some(format!("Unsupported content encoding: {}", ct)), None, AppErrorType::InternalError))
282 }
283 } else {
284 Ok(content)
285 }
286}
287
288pub fn compress(content: impl Into<Vec<u8>>, content_type: ContentEncoding) -> Result<Vec<u8>, AppError> {
289 match content_type {
290 #[cfg(feature = "zstd")]
291 ContentEncoding::Zstd => {
292 match compress_zstd(&content.into()) {
293 Ok(compressed) => Ok(compressed),
294 Err(e) => {
295 error!("Failed to compress with zstd: {}", e);
296 Err(AppError::new(Some("Failed to compress with zstd".to_string()), None, AppErrorType::InternalError))
297 }
298 }
299 },
300 #[cfg(feature = "lz4_flex")]
301 ContentEncoding::Lz4 => Ok(compress_lz4(&content.into())),
302 #[cfg(feature = "flate2")]
303 ContentEncoding::Zlib => Ok(compress_zlib(&mut content.into())?),
304 ContentEncoding::None => Ok(content.into()),
305 }
306}