Skip to main content

wae_queue/
types.rs

1//! 消息队列类型定义
2
3use serde::{Serialize, de::DeserializeOwned};
4use std::time::Duration;
5use wae_types::{WaeError, WaeResult};
6
7/// 消息 ID 类型
8pub type MessageId = String;
9
10/// 队列名称类型
11pub type QueueName = String;
12
13/// 消息元数据
14#[derive(Debug, Clone, Default)]
15pub struct MessageMetadata {
16    /// 消息 ID
17    pub id: Option<MessageId>,
18    /// 关联 ID (用于消息关联)
19    pub correlation_id: Option<String>,
20    /// 回复队列
21    pub reply_to: Option<QueueName>,
22    /// 内容类型
23    pub content_type: Option<String>,
24    /// 时间戳
25    pub timestamp: Option<u64>,
26    /// 优先级 (0-9)
27    pub priority: Option<u8>,
28    /// 过期时间 (毫秒)
29    pub expiration: Option<u64>,
30    /// 自定义头信息
31    pub headers: std::collections::HashMap<String, String>,
32}
33
34/// 原始消息 (字节形式)
35#[derive(Debug, Clone)]
36pub struct RawMessage {
37    /// 消息数据
38    pub data: Vec<u8>,
39    /// 消息元数据
40    pub metadata: MessageMetadata,
41}
42
43impl RawMessage {
44    /// 创建新消息
45    pub fn new(data: Vec<u8>) -> Self {
46        Self { data, metadata: MessageMetadata::default() }
47    }
48
49    /// 设置关联 ID
50    pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
51        self.metadata.correlation_id = Some(id.into());
52        self
53    }
54
55    /// 设置回复队列
56    pub fn with_reply_to(mut self, queue: impl Into<String>) -> Self {
57        self.metadata.reply_to = Some(queue.into());
58        self
59    }
60
61    /// 设置优先级
62    pub fn with_priority(mut self, priority: u8) -> Self {
63        self.metadata.priority = Some(priority.min(9));
64        self
65    }
66
67    /// 设置过期时间
68    pub fn with_expiration(mut self, ms: u64) -> Self {
69        self.metadata.expiration = Some(ms);
70        self
71    }
72
73    /// 添加自定义头
74    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
75        self.metadata.headers.insert(key.into(), value.into());
76        self
77    }
78}
79
80/// 消息封装 (泛型)
81#[derive(Debug, Clone)]
82pub struct Message<T> {
83    /// 消息体
84    pub payload: T,
85    /// 消息元数据
86    pub metadata: MessageMetadata,
87}
88
89impl<T> Message<T> {
90    /// 创建新消息
91    pub fn new(payload: T) -> Self {
92        Self { payload, metadata: MessageMetadata::default() }
93    }
94
95    /// 设置关联 ID
96    pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
97        self.metadata.correlation_id = Some(id.into());
98        self
99    }
100
101    /// 设置回复队列
102    pub fn with_reply_to(mut self, queue: impl Into<String>) -> Self {
103        self.metadata.reply_to = Some(queue.into());
104        self
105    }
106
107    /// 设置优先级
108    pub fn with_priority(mut self, priority: u8) -> Self {
109        self.metadata.priority = Some(priority.min(9));
110        self
111    }
112
113    /// 设置过期时间
114    pub fn with_expiration(mut self, ms: u64) -> Self {
115        self.metadata.expiration = Some(ms);
116        self
117    }
118
119    /// 添加自定义头
120    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
121        self.metadata.headers.insert(key.into(), value.into());
122        self
123    }
124
125    /// 序列化为原始消息
126    pub fn into_raw(self) -> WaeResult<RawMessage>
127    where
128        T: Serialize,
129    {
130        let data = serde_json::to_vec(&self.payload).map_err(|_e| WaeError::serialization_failed("Message"))?;
131        Ok(RawMessage { data, metadata: self.metadata })
132    }
133
134    /// 序列化为原始消息 (引用版本)
135    pub fn to_raw(&self) -> WaeResult<RawMessage>
136    where
137        T: Serialize,
138    {
139        let data = serde_json::to_vec(&self.payload).map_err(|_e| WaeError::serialization_failed("Message"))?;
140        Ok(RawMessage { data, metadata: self.metadata.clone() })
141    }
142}
143
144impl RawMessage {
145    /// 反序列化为泛型消息
146    pub fn into_typed<T: DeserializeOwned>(self) -> WaeResult<Message<T>> {
147        let payload = serde_json::from_slice(&self.data).map_err(|_e| WaeError::deserialization_failed("Message"))?;
148        Ok(Message { payload, metadata: self.metadata })
149    }
150}
151
152/// 接收到的原始消息 (带确认能力)
153#[derive(Debug)]
154pub struct ReceivedRawMessage {
155    /// 消息内容
156    pub message: RawMessage,
157    /// 消息 ID (用于确认)
158    pub delivery_tag: u64,
159    /// 重投递次数
160    pub redelivery_count: u32,
161}
162
163/// 接收到的消息 (泛型)
164#[derive(Debug)]
165pub struct ReceivedMessage<T> {
166    /// 消息内容
167    pub message: Message<T>,
168    /// 消息 ID (用于确认)
169    pub delivery_tag: u64,
170    /// 重投递次数
171    pub redelivery_count: u32,
172}
173
174/// 队列配置
175#[derive(Debug, Clone)]
176pub struct QueueConfig {
177    /// 队列名称
178    pub name: QueueName,
179    /// 是否持久化
180    pub durable: bool,
181    /// 是否自动删除 (当没有消费者时)
182    pub auto_delete: bool,
183    /// 最大消息数
184    pub max_messages: Option<u64>,
185    /// 最大消息大小 (字节)
186    pub max_message_size: Option<u64>,
187    /// 消息存活时间 (毫秒)
188    pub message_ttl: Option<u64>,
189    /// 死信队列
190    pub dead_letter_queue: Option<QueueName>,
191}
192
193impl QueueConfig {
194    /// 创建新的队列配置
195    pub fn new(name: impl Into<String>) -> Self {
196        Self {
197            name: name.into(),
198            durable: true,
199            auto_delete: false,
200            max_messages: None,
201            max_message_size: None,
202            message_ttl: None,
203            dead_letter_queue: None,
204        }
205    }
206
207    /// 设置持久化
208    pub fn durable(mut self, durable: bool) -> Self {
209        self.durable = durable;
210        self
211    }
212
213    /// 设置自动删除
214    pub fn auto_delete(mut self, auto_delete: bool) -> Self {
215        self.auto_delete = auto_delete;
216        self
217    }
218
219    /// 设置最大消息数
220    pub fn max_messages(mut self, max: u64) -> Self {
221        self.max_messages = Some(max);
222        self
223    }
224
225    /// 设置消息 TTL
226    pub fn message_ttl(mut self, ttl_ms: u64) -> Self {
227        self.message_ttl = Some(ttl_ms);
228        self
229    }
230
231    /// 设置死信队列
232    pub fn dead_letter_queue(mut self, queue: impl Into<String>) -> Self {
233        self.dead_letter_queue = Some(queue.into());
234        self
235    }
236}
237
238/// 生产者配置
239#[derive(Debug, Clone)]
240pub struct ProducerConfig {
241    /// 默认队列
242    pub default_queue: Option<QueueName>,
243    /// 消息确认超时
244    pub confirm_timeout: Duration,
245    /// 重试次数
246    pub retry_count: u32,
247    /// 重试间隔
248    pub retry_interval: Duration,
249}
250
251impl Default for ProducerConfig {
252    fn default() -> Self {
253        Self {
254            default_queue: None,
255            confirm_timeout: Duration::from_secs(5),
256            retry_count: 3,
257            retry_interval: Duration::from_millis(100),
258        }
259    }
260}
261
262/// 消费者配置
263#[derive(Debug, Clone)]
264pub struct ConsumerConfig {
265    /// 队列名称
266    pub queue: QueueName,
267    /// 消费者标签
268    pub consumer_tag: Option<String>,
269    /// 是否自动确认
270    pub auto_ack: bool,
271    /// 预取数量
272    pub prefetch_count: u16,
273    /// 是否独占
274    pub exclusive: bool,
275}
276
277impl ConsumerConfig {
278    /// 创建新的消费者配置
279    pub fn new(queue: impl Into<String>) -> Self {
280        Self { queue: queue.into(), consumer_tag: None, auto_ack: false, prefetch_count: 10, exclusive: false }
281    }
282
283    /// 设置自动确认
284    pub fn auto_ack(mut self, auto_ack: bool) -> Self {
285        self.auto_ack = auto_ack;
286        self
287    }
288
289    /// 设置预取数量
290    pub fn prefetch(mut self, count: u16) -> Self {
291        self.prefetch_count = count;
292        self
293    }
294}