Skip to main content

queue_runtime/
message.rs

1//! Message types for queue operations including core domain identifiers.
2
3use crate::error::ValidationError;
4use crate::provider::ProviderType;
5use bytes::Bytes;
6use chrono::{DateTime, Duration, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::str::FromStr;
10
11// ============================================================================
12// Core Domain Identifiers
13// ============================================================================
14
/// Validated queue name with length and character restrictions.
///
/// Values built through [`QueueName::new`] (or `FromStr`) are 1-260 bytes long,
/// consist only of ASCII alphanumerics, hyphens, and underscores, and have no
/// leading, trailing, or consecutive hyphens.
// NOTE(review): `Deserialize` is derived on the newtype, so deserialized values
// do not go through `QueueName::new` — confirm that skipping validation there is intended.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct QueueName(String);
18
19impl QueueName {
20    /// Create new queue name with validation
21    pub fn new(name: String) -> Result<Self, ValidationError> {
22        // Validate length
23        if name.is_empty() || name.len() > 260 {
24            return Err(ValidationError::OutOfRange {
25                field: "queue_name".to_string(),
26                message: "must be 1-260 characters".to_string(),
27            });
28        }
29
30        // Validate characters (ASCII alphanumeric, hyphens, underscores)
31        if !name
32            .chars()
33            .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
34        {
35            return Err(ValidationError::InvalidFormat {
36                field: "queue_name".to_string(),
37                message: "only ASCII alphanumeric, hyphens, and underscores allowed".to_string(),
38            });
39        }
40
41        // Validate no consecutive hyphens or leading/trailing hyphens
42        if name.starts_with('-') || name.ends_with('-') || name.contains("--") {
43            return Err(ValidationError::InvalidFormat {
44                field: "queue_name".to_string(),
45                message: "no leading/trailing hyphens or consecutive hyphens".to_string(),
46            });
47        }
48
49        Ok(Self(name))
50    }
51
52    /// Create queue name with prefix
53    pub fn with_prefix(prefix: &str, base_name: &str) -> Result<Self, ValidationError> {
54        let full_name = format!("{}-{}", prefix, base_name);
55        Self::new(full_name)
56    }
57
58    /// Get queue name as string
59    pub fn as_str(&self) -> &str {
60        &self.0
61    }
62}
63
64impl std::fmt::Display for QueueName {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        write!(f, "{}", self.0)
67    }
68}
69
70impl FromStr for QueueName {
71    type Err = ValidationError;
72
73    fn from_str(s: &str) -> Result<Self, Self::Err> {
74        Self::new(s.to_string())
75    }
76}
77
/// Unique identifier for messages within the queue system.
///
/// Wraps a `String`. [`MessageId::new`] fills it with a freshly generated
/// UUID v4 rendered as text; parsing via `FromStr` accepts any non-empty string.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct MessageId(String);
81
82impl MessageId {
83    /// Generate new random message ID
84    pub fn new() -> Self {
85        let id = uuid::Uuid::new_v4();
86        Self(id.to_string())
87    }
88
89    /// Get message ID as string
90    pub fn as_str(&self) -> &str {
91        &self.0
92    }
93}
94
95impl Default for MessageId {
96    fn default() -> Self {
97        Self::new()
98    }
99}
100
101impl std::fmt::Display for MessageId {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        write!(f, "{}", self.0)
104    }
105}
106
107impl FromStr for MessageId {
108    type Err = ValidationError;
109
110    fn from_str(s: &str) -> Result<Self, Self::Err> {
111        if s.is_empty() {
112            return Err(ValidationError::Required {
113                field: "message_id".to_string(),
114            });
115        }
116
117        Ok(Self(s.to_string()))
118    }
119}
120
/// Identifier for grouping related messages for ordered processing.
///
/// Values built through [`SessionId::new`] (or `FromStr`) are non-empty, at
/// most 128 bytes long, and contain only ASCII characters that are not
/// control characters.
// NOTE(review): `SessionId::from_parts` and the derived `Deserialize` both
// construct values without running that validation — confirm this is intended.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SessionId(String);
124
125impl SessionId {
126    /// Create new session ID with validation
127    pub fn new(id: String) -> Result<Self, ValidationError> {
128        if id.is_empty() {
129            return Err(ValidationError::Required {
130                field: "session_id".to_string(),
131            });
132        }
133
134        if id.len() > 128 {
135            return Err(ValidationError::OutOfRange {
136                field: "session_id".to_string(),
137                message: "maximum 128 characters".to_string(),
138            });
139        }
140
141        // Validate ASCII printable characters only
142        if !id.chars().all(|c| c.is_ascii() && !c.is_ascii_control()) {
143            return Err(ValidationError::InvalidFormat {
144                field: "session_id".to_string(),
145                message: "only ASCII printable characters allowed".to_string(),
146            });
147        }
148
149        Ok(Self(id))
150    }
151
152    /// Create session ID from parts (for GitHub events)
153    pub fn from_parts(owner: &str, repo: &str, entity_type: &str, entity_id: &str) -> Self {
154        let id = format!("{}/{}/{}/{}", owner, repo, entity_type, entity_id);
155        // Use unchecked creation since we control the format
156        Self(id)
157    }
158
159    /// Get session ID as string
160    pub fn as_str(&self) -> &str {
161        &self.0
162    }
163}
164
165impl std::fmt::Display for SessionId {
166    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167        write!(f, "{}", self.0)
168    }
169}
170
171impl FromStr for SessionId {
172    type Err = ValidationError;
173
174    fn from_str(s: &str) -> Result<Self, Self::Err> {
175        Self::new(s.to_string())
176    }
177}
178
/// Timestamp wrapper for consistent time handling.
///
/// A thin newtype over `chrono::DateTime<Utc>`; it is `Copy` and orders the
/// same way as the wrapped value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Timestamp(DateTime<Utc>);
182
183impl Timestamp {
184    /// Create timestamp for current time
185    pub fn now() -> Self {
186        Self(Utc::now())
187    }
188
189    /// Create timestamp from DateTime
190    pub fn from_datetime(dt: DateTime<Utc>) -> Self {
191        Self(dt)
192    }
193
194    /// Get underlying DateTime
195    pub fn as_datetime(&self) -> DateTime<Utc> {
196        self.0
197    }
198}
199
200impl std::fmt::Display for Timestamp {
201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202        write!(f, "{}", self.0.format("%Y-%m-%d %H:%M:%S UTC"))
203    }
204}
205
206impl FromStr for Timestamp {
207    type Err = chrono::ParseError;
208
209    fn from_str(s: &str) -> Result<Self, Self::Err> {
210        let dt = s.parse::<DateTime<Utc>>()?;
211        Ok(Self::from_datetime(dt))
212    }
213}
214
215// ============================================================================
216// Message Types
217// ============================================================================
218
/// A message to be sent through the queue system.
///
/// Build one with [`Message::new`] and the `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
    /// Raw payload bytes; (de)serialized as a standard base64 string by `bytes_serde`.
    #[serde(with = "bytes_serde")]
    pub body: Bytes,
    /// String key/value attributes attached to the message.
    pub attributes: HashMap<String, String>,
    /// Optional session ID for ordered processing.
    pub session_id: Option<SessionId>,
    /// Optional correlation ID for tracking.
    pub correlation_id: Option<String>,
    /// Optional time-to-live for message expiration.
    pub time_to_live: Option<Duration>,
}
229
230/// Custom serialization for Bytes
231mod bytes_serde {
232    use base64::{engine::general_purpose, Engine as _};
233    use bytes::Bytes;
234    use serde::{Deserialize, Deserializer, Serialize, Serializer};
235
236    pub fn serialize<S>(bytes: &Bytes, serializer: S) -> Result<S::Ok, S::Error>
237    where
238        S: Serializer,
239    {
240        let encoded = general_purpose::STANDARD.encode(bytes);
241        encoded.serialize(serializer)
242    }
243
244    pub fn deserialize<'de, D>(deserializer: D) -> Result<Bytes, D::Error>
245    where
246        D: Deserializer<'de>,
247    {
248        let encoded = String::deserialize(deserializer)?;
249        let decoded = general_purpose::STANDARD
250            .decode(encoded)
251            .map_err(serde::de::Error::custom)?;
252        Ok(Bytes::from(decoded))
253    }
254}
255
256impl Message {
257    /// Create new message with body
258    pub fn new(body: Bytes) -> Self {
259        Self {
260            body,
261            attributes: HashMap::new(),
262            session_id: None,
263            correlation_id: None,
264            time_to_live: None,
265        }
266    }
267
268    /// Add session ID for ordered processing
269    pub fn with_session_id(mut self, session_id: SessionId) -> Self {
270        self.session_id = Some(session_id);
271        self
272    }
273
274    /// Add message attribute
275    pub fn with_attribute(mut self, key: String, value: String) -> Self {
276        self.attributes.insert(key, value);
277        self
278    }
279
280    /// Add correlation ID for tracking
281    pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
282        self.correlation_id = Some(correlation_id);
283        self
284    }
285
286    /// Add time-to-live for message expiration
287    pub fn with_ttl(mut self, ttl: Duration) -> Self {
288        self.time_to_live = Some(ttl);
289        self
290    }
291}
292
/// A message received from the queue with processing metadata.
#[derive(Debug, Clone)]
pub struct ReceivedMessage {
    /// Identifier of this message.
    pub message_id: MessageId,
    /// Raw payload bytes.
    pub body: Bytes,
    /// String key/value attributes carried with the message.
    pub attributes: HashMap<String, String>,
    /// Session the message belongs to, if any.
    pub session_id: Option<SessionId>,
    /// Correlation ID carried with the message, if any.
    pub correlation_id: Option<String>,
    /// Token used to acknowledge or reject this delivery.
    pub receipt_handle: ReceiptHandle,
    /// Delivery counter; compared against a limit by
    /// `has_exceeded_max_delivery_count`.
    pub delivery_count: u32,
    /// NOTE(review): presumably when the message was first delivered — confirm with the providers.
    pub first_delivered_at: Timestamp,
    /// NOTE(review): presumably when this particular delivery happened — confirm with the providers.
    pub delivered_at: Timestamp,
}
306
307impl ReceivedMessage {
308    /// Convert back to Message (for forwarding/replaying)
309    pub fn message(&self) -> Message {
310        Message {
311            body: self.body.clone(),
312            attributes: self.attributes.clone(),
313            session_id: self.session_id.clone(),
314            correlation_id: self.correlation_id.clone(),
315            time_to_live: None, // TTL is not preserved in received messages
316        }
317    }
318
319    /// Check if message has exceeded maximum delivery count
320    pub fn has_exceeded_max_delivery_count(&self, max_count: u32) -> bool {
321        self.delivery_count > max_count
322    }
323}
324
/// Opaque token for acknowledging or rejecting received messages.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReceiptHandle {
    /// The handle string itself, exposed through `handle()`.
    handle: String,
    /// Instant at or after which `is_expired()` reports the handle as expired.
    expires_at: Timestamp,
    /// Provider type recorded when the handle was created.
    provider_type: ProviderType,
}
332
333impl ReceiptHandle {
334    /// Create new receipt handle
335    pub fn new(handle: String, expires_at: Timestamp, provider_type: ProviderType) -> Self {
336        Self {
337            handle,
338            expires_at,
339            provider_type,
340        }
341    }
342
343    /// Get handle string
344    pub fn handle(&self) -> &str {
345        &self.handle
346    }
347
348    /// Check if receipt handle is expired
349    pub fn is_expired(&self) -> bool {
350        Timestamp::now() >= self.expires_at
351    }
352
353    /// Get time until expiry
354    pub fn time_until_expiry(&self) -> Duration {
355        let now = Timestamp::now();
356        if now >= self.expires_at {
357            Duration::zero()
358        } else {
359            self.expires_at.as_datetime() - now.as_datetime()
360        }
361    }
362
363    /// Get provider type
364    pub fn provider_type(&self) -> ProviderType {
365        self.provider_type
366    }
367}
368
369// ============================================================================
370// Send and Receive Options
371// ============================================================================
372
/// Configuration options for sending messages to queues.
///
/// `Default` is derived, so a default value has every optional field set to
/// `None` and an empty `properties` map.
#[derive(Debug, Clone, Default)]
pub struct SendOptions {
    /// Session ID for ordered processing workflows
    pub session_id: Option<SessionId>,
    /// Correlation ID for request/response and tracing patterns
    pub correlation_id: Option<String>,
    /// Scheduled delivery time for delayed message processing
    pub scheduled_enqueue_time: Option<Timestamp>,
    /// Time-to-live for automatic message expiration
    pub time_to_live: Option<Duration>,
    /// Custom properties for metadata and routing information
    pub properties: HashMap<String, String>,
    /// Content type override for specialized message formats
    pub content_type: Option<String>,
    /// Duplicate detection ID for exactly-once delivery guarantees
    // NOTE(review): whether this yields exactly-once delivery depends on the provider — confirm.
    pub duplicate_detection_id: Option<String>,
}
391
392impl SendOptions {
393    /// Create new send options with defaults
394    pub fn new() -> Self {
395        Self::default()
396    }
397
398    /// Set session ID for ordered processing
399    pub fn with_session_id(mut self, session_id: SessionId) -> Self {
400        self.session_id = Some(session_id);
401        self
402    }
403
404    /// Set correlation ID for tracing
405    pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
406        self.correlation_id = Some(correlation_id);
407        self
408    }
409
410    /// Set scheduled delivery time
411    pub fn with_scheduled_enqueue_time(mut self, time: Timestamp) -> Self {
412        self.scheduled_enqueue_time = Some(time);
413        self
414    }
415
416    /// Set scheduled delivery with a delay from now
417    pub fn with_delay(mut self, delay: Duration) -> Self {
418        let scheduled_time = Timestamp::from_datetime(Utc::now() + delay);
419        self.scheduled_enqueue_time = Some(scheduled_time);
420        self
421    }
422
423    /// Set time-to-live for message expiration
424    pub fn with_time_to_live(mut self, ttl: Duration) -> Self {
425        self.time_to_live = Some(ttl);
426        self
427    }
428
429    /// Add a custom property
430    pub fn with_property(mut self, key: String, value: String) -> Self {
431        self.properties.insert(key, value);
432        self
433    }
434
435    /// Set content type
436    pub fn with_content_type(mut self, content_type: String) -> Self {
437        self.content_type = Some(content_type);
438        self
439    }
440
441    /// Set duplicate detection ID
442    pub fn with_duplicate_detection_id(mut self, id: String) -> Self {
443        self.duplicate_detection_id = Some(id);
444        self
445    }
446}
447
/// Configuration options for receiving messages from queues.
///
/// The `Default` implementation receives one message at a time with a
/// 30-second timeout, no session filter, no lock duration, peek mode off, and
/// no starting sequence number.
#[derive(Debug, Clone)]
pub struct ReceiveOptions {
    /// Maximum number of messages to receive in a batch
    pub max_messages: u32,
    /// Timeout duration for receive operations
    pub timeout: Duration,
    /// Session ID for session-specific message consumption
    pub session_id: Option<SessionId>,
    /// Whether to accept any available session
    pub accept_any_session: bool,
    /// Message lock duration for processing time management
    pub lock_duration: Option<Duration>,
    /// Peek-only mode for message inspection without consumption
    pub peek_only: bool,
    /// Sequence number for replay and recovery scenarios
    pub from_sequence_number: Option<u64>,
}
466
467impl Default for ReceiveOptions {
468    fn default() -> Self {
469        Self {
470            max_messages: 1,
471            timeout: Duration::seconds(30),
472            session_id: None,
473            accept_any_session: false,
474            lock_duration: None,
475            peek_only: false,
476            from_sequence_number: None,
477        }
478    }
479}
480
481impl ReceiveOptions {
482    /// Create new receive options with defaults
483    pub fn new() -> Self {
484        Self::default()
485    }
486
487    /// Set maximum number of messages to receive
488    pub fn with_max_messages(mut self, max: u32) -> Self {
489        self.max_messages = max;
490        self
491    }
492
493    /// Set timeout duration
494    pub fn with_timeout(mut self, timeout: Duration) -> Self {
495        self.timeout = timeout;
496        self
497    }
498
499    /// Set specific session ID to consume from
500    pub fn with_session_id(mut self, session_id: SessionId) -> Self {
501        self.session_id = Some(session_id);
502        self.accept_any_session = false;
503        self
504    }
505
506    /// Accept messages from any available session
507    pub fn accept_any_session(mut self) -> Self {
508        self.accept_any_session = true;
509        self.session_id = None;
510        self
511    }
512
513    /// Set message lock duration
514    pub fn with_lock_duration(mut self, duration: Duration) -> Self {
515        self.lock_duration = Some(duration);
516        self
517    }
518
519    /// Enable peek-only mode (inspect without consuming)
520    pub fn peek_only(mut self) -> Self {
521        self.peek_only = true;
522        self
523    }
524
525    /// Set starting sequence number for replay
526    pub fn from_sequence_number(mut self, sequence: u64) -> Self {
527        self.from_sequence_number = Some(sequence);
528        self
529    }
530}
531
// Unit tests for this module are kept in `message_tests.rs` (located via the
// `#[path]` attribute) and are compiled only for test builds.
#[cfg(test)]
#[path = "message_tests.rs"]
mod tests;