1use crate::error::ValidationError;
4use crate::provider::ProviderType;
5use bytes::Bytes;
6use chrono::{DateTime, Duration, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::str::FromStr;
10
11#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
17pub struct QueueName(String);
18
19impl QueueName {
20 pub fn new(name: String) -> Result<Self, ValidationError> {
22 if name.is_empty() || name.len() > 260 {
24 return Err(ValidationError::OutOfRange {
25 field: "queue_name".to_string(),
26 message: "must be 1-260 characters".to_string(),
27 });
28 }
29
30 if !name
32 .chars()
33 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
34 {
35 return Err(ValidationError::InvalidFormat {
36 field: "queue_name".to_string(),
37 message: "only ASCII alphanumeric, hyphens, and underscores allowed".to_string(),
38 });
39 }
40
41 if name.starts_with('-') || name.ends_with('-') || name.contains("--") {
43 return Err(ValidationError::InvalidFormat {
44 field: "queue_name".to_string(),
45 message: "no leading/trailing hyphens or consecutive hyphens".to_string(),
46 });
47 }
48
49 Ok(Self(name))
50 }
51
52 pub fn with_prefix(prefix: &str, base_name: &str) -> Result<Self, ValidationError> {
54 let full_name = format!("{}-{}", prefix, base_name);
55 Self::new(full_name)
56 }
57
58 pub fn as_str(&self) -> &str {
60 &self.0
61 }
62}
63
64impl std::fmt::Display for QueueName {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 write!(f, "{}", self.0)
67 }
68}
69
70impl FromStr for QueueName {
71 type Err = ValidationError;
72
73 fn from_str(s: &str) -> Result<Self, Self::Err> {
74 Self::new(s.to_string())
75 }
76}
77
78#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
80pub struct MessageId(String);
81
82impl MessageId {
83 pub fn new() -> Self {
85 let id = uuid::Uuid::new_v4();
86 Self(id.to_string())
87 }
88
89 pub fn as_str(&self) -> &str {
91 &self.0
92 }
93}
94
95impl Default for MessageId {
96 fn default() -> Self {
97 Self::new()
98 }
99}
100
101impl std::fmt::Display for MessageId {
102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 write!(f, "{}", self.0)
104 }
105}
106
107impl FromStr for MessageId {
108 type Err = ValidationError;
109
110 fn from_str(s: &str) -> Result<Self, Self::Err> {
111 if s.is_empty() {
112 return Err(ValidationError::Required {
113 field: "message_id".to_string(),
114 });
115 }
116
117 Ok(Self(s.to_string()))
118 }
119}
120
121#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
123pub struct SessionId(String);
124
125impl SessionId {
126 pub fn new(id: String) -> Result<Self, ValidationError> {
128 if id.is_empty() {
129 return Err(ValidationError::Required {
130 field: "session_id".to_string(),
131 });
132 }
133
134 if id.len() > 128 {
135 return Err(ValidationError::OutOfRange {
136 field: "session_id".to_string(),
137 message: "maximum 128 characters".to_string(),
138 });
139 }
140
141 if !id.chars().all(|c| c.is_ascii() && !c.is_ascii_control()) {
143 return Err(ValidationError::InvalidFormat {
144 field: "session_id".to_string(),
145 message: "only ASCII printable characters allowed".to_string(),
146 });
147 }
148
149 Ok(Self(id))
150 }
151
152 pub fn from_parts(owner: &str, repo: &str, entity_type: &str, entity_id: &str) -> Self {
154 let id = format!("{}/{}/{}/{}", owner, repo, entity_type, entity_id);
155 Self(id)
157 }
158
159 pub fn as_str(&self) -> &str {
161 &self.0
162 }
163}
164
165impl std::fmt::Display for SessionId {
166 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167 write!(f, "{}", self.0)
168 }
169}
170
171impl FromStr for SessionId {
172 type Err = ValidationError;
173
174 fn from_str(s: &str) -> Result<Self, Self::Err> {
175 Self::new(s.to_string())
176 }
177}
178
179#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
181pub struct Timestamp(DateTime<Utc>);
182
183impl Timestamp {
184 pub fn now() -> Self {
186 Self(Utc::now())
187 }
188
189 pub fn from_datetime(dt: DateTime<Utc>) -> Self {
191 Self(dt)
192 }
193
194 pub fn as_datetime(&self) -> DateTime<Utc> {
196 self.0
197 }
198}
199
200impl std::fmt::Display for Timestamp {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 write!(f, "{}", self.0.format("%Y-%m-%d %H:%M:%S UTC"))
203 }
204}
205
206impl FromStr for Timestamp {
207 type Err = chrono::ParseError;
208
209 fn from_str(s: &str) -> Result<Self, Self::Err> {
210 let dt = s.parse::<DateTime<Utc>>()?;
211 Ok(Self::from_datetime(dt))
212 }
213}
214
215#[derive(Debug, Clone, Serialize, Deserialize)]
221pub struct Message {
222 #[serde(with = "bytes_serde")]
223 pub body: Bytes,
224 pub attributes: HashMap<String, String>,
225 pub session_id: Option<SessionId>,
226 pub correlation_id: Option<String>,
227 pub time_to_live: Option<Duration>,
228}
229
230mod bytes_serde {
232 use base64::{engine::general_purpose, Engine as _};
233 use bytes::Bytes;
234 use serde::{Deserialize, Deserializer, Serialize, Serializer};
235
236 pub fn serialize<S>(bytes: &Bytes, serializer: S) -> Result<S::Ok, S::Error>
237 where
238 S: Serializer,
239 {
240 let encoded = general_purpose::STANDARD.encode(bytes);
241 encoded.serialize(serializer)
242 }
243
244 pub fn deserialize<'de, D>(deserializer: D) -> Result<Bytes, D::Error>
245 where
246 D: Deserializer<'de>,
247 {
248 let encoded = String::deserialize(deserializer)?;
249 let decoded = general_purpose::STANDARD
250 .decode(encoded)
251 .map_err(serde::de::Error::custom)?;
252 Ok(Bytes::from(decoded))
253 }
254}
255
256impl Message {
257 pub fn new(body: Bytes) -> Self {
259 Self {
260 body,
261 attributes: HashMap::new(),
262 session_id: None,
263 correlation_id: None,
264 time_to_live: None,
265 }
266 }
267
268 pub fn with_session_id(mut self, session_id: SessionId) -> Self {
270 self.session_id = Some(session_id);
271 self
272 }
273
274 pub fn with_attribute(mut self, key: String, value: String) -> Self {
276 self.attributes.insert(key, value);
277 self
278 }
279
280 pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
282 self.correlation_id = Some(correlation_id);
283 self
284 }
285
286 pub fn with_ttl(mut self, ttl: Duration) -> Self {
288 self.time_to_live = Some(ttl);
289 self
290 }
291}
292
293#[derive(Debug, Clone)]
295pub struct ReceivedMessage {
296 pub message_id: MessageId,
297 pub body: Bytes,
298 pub attributes: HashMap<String, String>,
299 pub session_id: Option<SessionId>,
300 pub correlation_id: Option<String>,
301 pub receipt_handle: ReceiptHandle,
302 pub delivery_count: u32,
303 pub first_delivered_at: Timestamp,
304 pub delivered_at: Timestamp,
305}
306
307impl ReceivedMessage {
308 pub fn message(&self) -> Message {
310 Message {
311 body: self.body.clone(),
312 attributes: self.attributes.clone(),
313 session_id: self.session_id.clone(),
314 correlation_id: self.correlation_id.clone(),
315 time_to_live: None, }
317 }
318
319 pub fn has_exceeded_max_delivery_count(&self, max_count: u32) -> bool {
321 self.delivery_count > max_count
322 }
323}
324
325#[derive(Debug, Clone, PartialEq, Eq)]
327pub struct ReceiptHandle {
328 handle: String,
329 expires_at: Timestamp,
330 provider_type: ProviderType,
331}
332
333impl ReceiptHandle {
334 pub fn new(handle: String, expires_at: Timestamp, provider_type: ProviderType) -> Self {
336 Self {
337 handle,
338 expires_at,
339 provider_type,
340 }
341 }
342
343 pub fn handle(&self) -> &str {
345 &self.handle
346 }
347
348 pub fn is_expired(&self) -> bool {
350 Timestamp::now() >= self.expires_at
351 }
352
353 pub fn time_until_expiry(&self) -> Duration {
355 let now = Timestamp::now();
356 if now >= self.expires_at {
357 Duration::zero()
358 } else {
359 self.expires_at.as_datetime() - now.as_datetime()
360 }
361 }
362
363 pub fn provider_type(&self) -> ProviderType {
365 self.provider_type
366 }
367}
368
369#[derive(Debug, Clone, Default)]
375pub struct SendOptions {
376 pub session_id: Option<SessionId>,
378 pub correlation_id: Option<String>,
380 pub scheduled_enqueue_time: Option<Timestamp>,
382 pub time_to_live: Option<Duration>,
384 pub properties: HashMap<String, String>,
386 pub content_type: Option<String>,
388 pub duplicate_detection_id: Option<String>,
390}
391
392impl SendOptions {
393 pub fn new() -> Self {
395 Self::default()
396 }
397
398 pub fn with_session_id(mut self, session_id: SessionId) -> Self {
400 self.session_id = Some(session_id);
401 self
402 }
403
404 pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
406 self.correlation_id = Some(correlation_id);
407 self
408 }
409
410 pub fn with_scheduled_enqueue_time(mut self, time: Timestamp) -> Self {
412 self.scheduled_enqueue_time = Some(time);
413 self
414 }
415
416 pub fn with_delay(mut self, delay: Duration) -> Self {
418 let scheduled_time = Timestamp::from_datetime(Utc::now() + delay);
419 self.scheduled_enqueue_time = Some(scheduled_time);
420 self
421 }
422
423 pub fn with_time_to_live(mut self, ttl: Duration) -> Self {
425 self.time_to_live = Some(ttl);
426 self
427 }
428
429 pub fn with_property(mut self, key: String, value: String) -> Self {
431 self.properties.insert(key, value);
432 self
433 }
434
435 pub fn with_content_type(mut self, content_type: String) -> Self {
437 self.content_type = Some(content_type);
438 self
439 }
440
441 pub fn with_duplicate_detection_id(mut self, id: String) -> Self {
443 self.duplicate_detection_id = Some(id);
444 self
445 }
446}
447
448#[derive(Debug, Clone)]
450pub struct ReceiveOptions {
451 pub max_messages: u32,
453 pub timeout: Duration,
455 pub session_id: Option<SessionId>,
457 pub accept_any_session: bool,
459 pub lock_duration: Option<Duration>,
461 pub peek_only: bool,
463 pub from_sequence_number: Option<u64>,
465}
466
467impl Default for ReceiveOptions {
468 fn default() -> Self {
469 Self {
470 max_messages: 1,
471 timeout: Duration::seconds(30),
472 session_id: None,
473 accept_any_session: false,
474 lock_duration: None,
475 peek_only: false,
476 from_sequence_number: None,
477 }
478 }
479}
480
481impl ReceiveOptions {
482 pub fn new() -> Self {
484 Self::default()
485 }
486
487 pub fn with_max_messages(mut self, max: u32) -> Self {
489 self.max_messages = max;
490 self
491 }
492
493 pub fn with_timeout(mut self, timeout: Duration) -> Self {
495 self.timeout = timeout;
496 self
497 }
498
499 pub fn with_session_id(mut self, session_id: SessionId) -> Self {
501 self.session_id = Some(session_id);
502 self.accept_any_session = false;
503 self
504 }
505
506 pub fn accept_any_session(mut self) -> Self {
508 self.accept_any_session = true;
509 self.session_id = None;
510 self
511 }
512
513 pub fn with_lock_duration(mut self, duration: Duration) -> Self {
515 self.lock_duration = Some(duration);
516 self
517 }
518
519 pub fn peek_only(mut self) -> Self {
521 self.peek_only = true;
522 self
523 }
524
525 pub fn from_sequence_number(mut self, sequence: u64) -> Self {
527 self.from_sequence_number = Some(sequence);
528 self
529 }
530}
531
532#[cfg(test)]
533#[path = "message_tests.rs"]
534mod tests;