Skip to main content

amqp_client_rust/api/
utils.rs

1use std::{collections::HashMap, fmt::{Display, write}, pin::Pin, sync::Arc};
2use std::error::Error as StdError;
3use crate::errors::{AppError, AppErrorType};
4use tracing::error;
5
6
/// A message payload together with its optional content type.
#[derive(Debug, Clone)]
pub struct Message {
    /// Raw payload bytes. Stored behind an `Arc`, so cloning a `Message`
    /// shares the same buffer instead of copying it.
    pub body: Arc<[u8]>,
    /// Content type of `body`, when one is known.
    pub content_type: Option<String>,
}
12
13pub type Handler = Arc<
14    dyn Fn(
15            Message,
16        )
17            -> Pin<Box<dyn Future<Output = Result<(), Box<dyn StdError + Send + Sync>>> + Send>>
18        + Send
19        + Sync,
20>;
21pub type RPCHandler = Arc<
22    dyn Fn(
23            Message,
24        )
25            -> Pin<Box<dyn Future<Output = Result<Message, Box<dyn StdError + Send + Sync>>> + Send>>
26        + Send
27        + Sync,
28>;
29
/// Selects which publisher-confirmation mode is in effect.
///
/// NOTE(review): the exact runtime meaning of each variant is decided by the
/// code that consumes this enum, which is not visible here — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Confirmations {
    /// No confirmations (presumably; the name reads like a typo of
    /// "Disabled" but is part of the public API and therefore kept).
    Disables,
    /// Confirmations for regular publishing.
    PublisherConfirms,
    /// Confirmations for the RPC client side.
    RPCClientPublisherConfirms,
    /// Confirmations for the RPC server side.
    RPCServerPublisherConfirms,
}
37
/// Message delivery mode.
///
/// The discriminants are fixed explicitly at 1 and 2 so the enum can be cast
/// to an integer; presumably these are the AMQP `delivery-mode` property
/// values — confirm against the publishing code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeliveryMode {
    /// Numeric value 1.
    Transient = 1,
    /// Numeric value 2.
    Persistent = 2,
}
43
/// Kind of exchange a message is routed through.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExchangeType {
    /// Direct exchange.
    Direct,
    /// Fanout exchange.
    Fanout,
    /// Topic exchange.
    Topic,
}
50
/// A deferred acknowledgement command.
///
/// Each variant carries a `(u64, bool)` pair — presumably the delivery tag
/// and a flag such as `multiple`/`requeue`; confirm against the code that
/// consumes these commands.
///
/// Derives the same trait set as the other plain enums in this module so the
/// commands can be logged, copied and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PendingCmd {
    /// Positive acknowledgement.
    Ack((u64, bool)),
    /// Negative acknowledgement.
    Nack((u64, bool)),
}
/// Compression scheme applied to a message body.
///
/// Every variant except [`ContentEncoding::None`] exists only when the
/// corresponding Cargo feature is enabled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContentEncoding {
    /// Zstandard (feature `zstd`).
    #[cfg(feature = "zstd")]
    Zstd,
    /// LZ4 (feature `lz4_flex`).
    #[cfg(feature = "lz4_flex")]
    Lz4,
    /// Zlib (feature `flate2`).
    #[cfg(feature = "flate2")]
    Zlib,
    /// No compression.
    None,
}

impl ContentEncoding {
    /// Parses an encoding label.
    ///
    /// Returns `Option::None` when the label is unknown or names an encoding
    /// whose feature is not compiled in. The label `"none"` maps to
    /// `Some(ContentEncoding::None)`.
    pub fn from_str(s: &str) -> Option<ContentEncoding> {
        let encoding = match s {
            #[cfg(feature = "zstd")]
            "application/zstd" | "application/zstandard" => ContentEncoding::Zstd,
            #[cfg(feature = "lz4_flex")]
            "application/lz4" => ContentEncoding::Lz4,
            #[cfg(feature = "flate2")]
            "application/x-gzip" | "application/gzip" | "application/zlib" => ContentEncoding::Zlib,
            "none" => ContentEncoding::None,
            _ => return None,
        };
        Some(encoding)
    }

    /// Returns the canonical label for this encoding.
    ///
    /// NOTE(review): `Zlib` is labelled `application/x-gzip` although the
    /// helpers in this file use zlib framing — confirm peers expect this.
    pub fn as_str(&self) -> &'static str {
        match *self {
            #[cfg(feature = "zstd")]
            ContentEncoding::Zstd => "application/zstd",
            #[cfg(feature = "lz4_flex")]
            ContentEncoding::Lz4 => "application/lz4",
            #[cfg(feature = "flate2")]
            ContentEncoding::Zlib => "application/x-gzip",
            ContentEncoding::None => "none",
        }
    }
}

impl Display for ContentEncoding {
    /// Writes the same label that [`ContentEncoding::as_str`] returns.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
96
/// One node of the topic trie; each trie level corresponds to one
/// dot-separated segment of a binding pattern.
#[derive(Clone, Debug)]
pub struct TopicNode<T> {
    /// Child nodes keyed by the next pattern segment (a literal word, `*` or `#`).
    children: HashMap<String, TopicNode<T>>,
    /// Values whose pattern ends exactly at this node.
    values: Vec<T>,
}

impl<T> Default for TopicNode<T> {
    /// Creates a node with no children and no values.
    fn default() -> Self {
        Self {
            children: HashMap::new(),
            values: Vec::new(),
        }
    }
}

/// Trie of topic binding patterns that maps a routing key to every value
/// whose pattern matches it.
///
/// Patterns and routing keys are split on `.`. In a pattern, `*` stands for
/// exactly one segment and `#` for zero or more segments.
#[derive(Clone, Debug)]
pub struct TopicTrie<T> {
    root: TopicNode<T>,
}

// Implemented by hand (instead of derived) so that `T` does not have to be
// `Default` itself, mirroring `TopicNode`.
impl<T> Default for TopicTrie<T> {
    /// Creates an empty trie.
    fn default() -> Self {
        Self {
            root: TopicNode::default(),
        }
    }
}

impl<T: Clone> TopicTrie<T> {
    /// Creates an empty trie.
    pub fn new() -> Self {
        Self::default()
    }

    /// Inserts a new subscription pattern (binding key) and its associated value.
    ///
    /// An empty pattern stores the value at the root, so it only matches the
    /// empty routing key. Inserting the same pattern several times keeps
    /// every value.
    pub fn insert(&mut self, pattern: &str, value: T) {
        let mut current = &mut self.root;
        if !pattern.is_empty() {
            for segment in pattern.split('.') {
                // Descend into the child for this segment, creating it on demand.
                current = current.children.entry(segment.to_string()).or_default();
            }
        }
        // The value lives at the node where the pattern ends.
        current.values.push(value);
    }

    /// Returns clones of all values whose pattern matches `routing_key`.
    ///
    /// Each inserted value is returned at most once per call, even when its
    /// pattern can match the key in several ways (for example `#.#`).
    pub fn search(&self, routing_key: &str) -> Vec<T> {
        let segments: Vec<&str> = if routing_key.is_empty() {
            Vec::new()
        } else {
            routing_key.split('.').collect()
        };

        let mut results = Vec::new();
        let mut visited = std::collections::HashSet::new();
        Self::search_node(&self.root, &segments, &mut visited, &mut results);
        results
    }

    /// Recursive search that follows the branches created by `*` and `#`.
    ///
    /// `segments` is always a suffix of the routing key, so a search state is
    /// fully identified by the node and the number of remaining segments.
    /// `visited` records the states already explored: revisiting one could
    /// only re-add values that were already collected, so it is skipped. This
    /// both removes duplicate results and bounds the work to
    /// (number of nodes) x (number of segments + 1) states.
    fn search_node(
        node: &TopicNode<T>,
        segments: &[&str],
        visited: &mut std::collections::HashSet<(*const TopicNode<T>, usize)>,
        results: &mut Vec<T>,
    ) {
        // The pointer is used purely as an identity key; it is never dereferenced.
        if !visited.insert((node as *const TopicNode<T>, segments.len())) {
            return;
        }

        let (head, tail) = match segments.split_first() {
            Some((head, tail)) => (*head, tail),
            None => {
                // Routing key exhausted: every value stored here matches.
                results.extend(node.values.iter().cloned());

                // A trailing '#' may match zero segments, e.g. pattern
                // "stock.#" matches routing key "stock".
                if let Some(hash_child) = node.children.get("#") {
                    Self::search_node(hash_child, segments, visited, results);
                }
                return;
            }
        };

        // Path A: literal match on the next segment.
        if let Some(child) = node.children.get(head) {
            Self::search_node(child, tail, visited, results);
        }

        // Path B: '*' consumes exactly one segment.
        if let Some(star_child) = node.children.get("*") {
            Self::search_node(star_child, tail, visited, results);
        }

        // Path C: '#' consumes zero or more segments, so try every possible
        // split point from "consume nothing" up to "consume everything".
        if let Some(hash_child) = node.children.get("#") {
            for consumed in 0..=segments.len() {
                Self::search_node(hash_child, &segments[consumed..], visited, results);
            }
        }
    }
}
192
/// Compresses `data` with zstd at compression level 1.
#[cfg(feature = "zstd")]
fn compress_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
    const LEVEL: i32 = 1;
    let compressed = zstd::encode_all(data, LEVEL)?;
    Ok(compressed)
}
197
198
/// Decompresses a complete zstd stream into a freshly allocated buffer.
///
/// NOTE(review): the output size is not capped here; if bodies can come from
/// untrusted peers, consider whether a limit is needed.
#[cfg(feature = "zstd")]
fn decompress_zstd(compressed_data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
    let decompressed = zstd::decode_all(compressed_data)?;
    Ok(decompressed)
}
203
/// Compresses `data` with LZ4 via `lz4_flex::compress_prepend_size`, the
/// counterpart of the size-prepended decoder used by `decompress_lz4`.
#[cfg(feature = "lz4_flex")]
fn compress_lz4(data: &[u8]) -> Vec<u8> {
    let compressed = lz4_flex::compress_prepend_size(data);
    compressed
}
/// Decompresses a size-prepended LZ4 block.
///
/// Decoder errors are converted into `AppError` through its `From` impl.
#[cfg(feature = "lz4_flex")]
fn decompress_lz4(compressed_data: &[u8]) -> Result<Vec<u8>, AppError> {
    lz4_flex::decompress_size_prepended(compressed_data).map_err(AppError::from)
}
212
/// Compresses `data` into a zlib stream using flate2's default compression level.
///
/// # Errors
/// Returns an `AppErrorType::InternalError` carrying the I/O error text if
/// the encoder fails.
#[cfg(feature = "flate2")]
fn compress_zlib(data: &[u8]) -> Result<Vec<u8>, AppError> {
    use std::io::Read;

    let mut compressed = Vec::new();
    // The encoder is a reader: draining it yields the compressed bytes.
    flate2::read::ZlibEncoder::new(data, flate2::Compression::default())
        .read_to_end(&mut compressed)
        .map_err(|e| {
            AppError::new(
                Some(format!("Zlib compression failed: {}", e)),
                None,
                AppErrorType::InternalError,
            )
        })?;
    Ok(compressed)
}
/// Decompresses a zlib stream into a freshly allocated buffer.
///
/// # Errors
/// Returns an `AppErrorType::InternalError` carrying the I/O error text if
/// the input is not a valid zlib stream.
#[cfg(feature = "flate2")]
fn decompress_zlib(compressed_data: &[u8]) -> Result<Vec<u8>, AppError> {
    use std::io::Read;

    let mut decompressed = Vec::new();
    // The decoder is a reader: draining it yields the original bytes.
    flate2::read::ZlibDecoder::new(compressed_data)
        .read_to_end(&mut decompressed)
        .map_err(|e| {
            AppError::new(
                Some(format!("Zlib decompression failed: {}", e)),
                None,
                AppErrorType::InternalError,
            )
        })?;
    Ok(decompressed)
}
244
245
246pub fn decompress(content: Vec<u8>, content_encoding: Option<&str>) -> Result<Vec<u8>, AppError> {
247    if let Some(ct) = content_encoding {
248        match ct {
249            #[cfg(feature = "zstd")]
250            "application/zstd" | "application/zstandard" => {
251                match decompress_zstd(&content[..]) {
252                    Ok(decompressed) => {
253                        Ok(decompressed)
254                    }
255                    Err(e) => {
256                        error!("Failed to create gzip decoder: {}", e);
257                        Err(AppError::new(Some("Failed to create gzip decoder".to_string()), None, AppErrorType::InternalError).into())
258                    }
259                }
260            },
261            #[cfg(feature = "lz4_flex")]
262            "application/lz4" => {
263                match decompress_lz4(&content[..]) {
264                    Ok(decompressed) => Ok(decompressed),
265                    Err(e) => {
266                        error!("Failed to decompress LZ4 content: {}", e);
267                        Err(AppError::new(Some("Failed to decompress LZ4 content".to_string()), None, AppErrorType::InternalError))
268                    }
269                }
270            },
271            #[cfg(feature = "flate2")]
272            "application/x-gzip" | "application/gzip" | "application/zlib" => {
273                match decompress_zlib(&content[..]) {
274                    Ok(decompressed) => Ok(decompressed),
275                    Err(e) => {
276                        error!("Failed to create gzip decoder: {}", e);
277                        Err(AppError::new(Some("Failed to create gzip decoder".to_string()), None, AppErrorType::InternalError).into())
278                    }
279                }
280            },
281            _ => Err(AppError::new(Some(format!("Unsupported content encoding: {}", ct)), None, AppErrorType::InternalError))
282        }
283    } else {
284        Ok(content)
285    }
286}
287
288pub fn compress(content: impl Into<Vec<u8>>, content_type: ContentEncoding) -> Result<Vec<u8>, AppError> {
289    match content_type {
290        #[cfg(feature = "zstd")]
291        ContentEncoding::Zstd => {
292            match compress_zstd(&content.into()) {
293                Ok(compressed) => Ok(compressed),
294                Err(e) => {
295                    error!("Failed to compress with zstd: {}", e);
296                    Err(AppError::new(Some("Failed to compress with zstd".to_string()), None, AppErrorType::InternalError))
297                }
298            }
299        },
300        #[cfg(feature = "lz4_flex")]
301        ContentEncoding::Lz4 => Ok(compress_lz4(&content.into())),
302        #[cfg(feature = "flate2")]
303        ContentEncoding::Zlib => Ok(compress_zlib(&mut content.into())?),
304        ContentEncoding::None => Ok(content.into()),
305    }
306}