Skip to main content

tlq_client/
message.rs

1use serde::{Deserialize, Serialize};
2use uuid::Uuid;
3
4/// Represents a message in the TLQ queue system.
5///
6/// Each message has a unique identifier, content, and metadata about its processing state.
7/// Messages are automatically assigned UUID v7 identifiers which provide time-ordering.
8///
9/// # Examples
10///
11/// ```
12/// use tlq_client::Message;
13///
14/// // Create a new message
15/// let message = Message::new("Hello, World!".to_string());
16/// println!("Message ID: {}", message.id);
17/// println!("Message body: {}", message.body);
18/// ```
19#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
20pub struct Message {
21    /// Unique identifier for the message (UUID v7 format for time-ordering)
22    pub id: Uuid,
23    /// The message content/body as a string
24    pub body: String,
25    /// Current processing state of the message
26    pub state: MessageState,
27    /// Unix timestamp in milliseconds indicating when the message lock expires
28    #[serde(skip_serializing_if = "Option::is_none")]
29    pub lock_until: Option<u64>,
30    /// Number of times this message has been retried after failure
31    pub retry_count: u32,
32}
33
34/// Represents the current processing state of a message in the queue.
35///
36/// Messages transition through these states as they are processed:
37/// - `Ready` → `Processing` (when retrieved by a consumer)
38/// - `Processing` → `Ready` (when retried)
39/// - Any state → deleted (when explicitly deleted)
40///
41/// Messages that exceed the server's max retry limit are automatically
42/// removed by the background reaper and counted as "dead" in queue statistics.
43///
44/// # Serialization
45///
46/// States are serialized in PascalCase format ("Ready", "Processing")
47/// to match the TLQ server API expectations.
48///
49/// # Examples
50///
51/// ```
52/// use tlq_client::MessageState;
53///
54/// let state = MessageState::Ready;
55/// assert_eq!(serde_json::to_string(&state).unwrap(), "\"Ready\"");
56/// ```
57#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
58#[serde(rename_all = "PascalCase")]
59pub enum MessageState {
60    /// Message is ready to be processed by a consumer
61    Ready,
62    /// Message is currently being processed by a consumer
63    Processing,
64}
65
66impl Message {
67    /// Creates a new message with the specified body content.
68    ///
69    /// The message is initialized with:
70    /// - A new UUID v7 identifier (provides time-ordering)
71    /// - State set to [`MessageState::Ready`]
72    /// - No lock expiration time
73    /// - Zero retry count
74    ///
75    /// # Arguments
76    ///
77    /// * `body` - The message content as a String
78    ///
79    /// # Examples
80    ///
81    /// ```
82    /// use tlq_client::{Message, MessageState};
83    ///
84    /// let message = Message::new("Process this task".to_string());
85    /// assert_eq!(message.body, "Process this task");
86    /// assert_eq!(message.state, MessageState::Ready);
87    /// assert_eq!(message.retry_count, 0);
88    /// assert!(message.lock_until.is_none());
89    /// ```
90    pub fn new(body: String) -> Self {
91        Self {
92            id: Uuid::now_v7(),
93            body,
94            state: MessageState::Ready,
95            lock_until: None,
96            retry_count: 0,
97        }
98    }
99}
100
101/// Represents queue statistics returned by the TLQ server.
102///
103/// Provides counts of messages in each state, plus the cumulative count of
104/// dead messages (those removed by the reaper after exceeding max retries).
105#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
106pub struct QueueStats {
107    /// Number of messages in Ready state
108    pub ready: u64,
109    /// Number of messages currently being processed
110    pub processing: u64,
111    /// Cumulative count of messages removed after exceeding max retries
112    pub dead: u64,
113}
114
115// Internal request structures for TLQ API communication
116
117/// Request structure for adding a message to the queue
118#[derive(Debug, Serialize)]
119pub struct AddMessageRequest {
120    pub body: String,
121}
122
123/// Request structure for retrieving messages from the queue
124#[derive(Debug, Serialize)]
125pub struct GetMessagesRequest {
126    pub count: u32,
127}
128
129/// Request structure for deleting messages from the queue
130#[derive(Debug, Serialize)]
131pub struct DeleteMessagesRequest {
132    pub ids: Vec<Uuid>,
133}
134
135/// Request structure for retrying failed messages
136#[derive(Debug, Serialize)]
137pub struct RetryMessagesRequest {
138    pub ids: Vec<Uuid>,
139}
140
#[cfg(test)]
mod tests {
    // Unit tests for the message types: construction, JSON serialization and
    // deserialization, and rejection of malformed payloads.
    //
    // `serde_json` is reached through the extern prelude and `Uuid` through
    // the glob import of the parent module, so neither needs its own `use`.
    use super::*;

    #[test]
    fn test_message_creation() {
        let message = Message::new("Test message".to_string());

        assert_eq!(message.body, "Test message");
        assert_eq!(message.state, MessageState::Ready);
        assert_eq!(message.retry_count, 0);
        assert_eq!(message.lock_until, None);

        // The identifier must really be a version 7 UUID. Checking that its
        // string form is non-empty would pass for any UUID at all.
        assert_eq!(message.id.get_version_num(), 7);
    }

    #[test]
    fn test_message_state_serialization() {
        // Test that MessageState serializes to the expected Pascal case
        assert_eq!(
            serde_json::to_string(&MessageState::Ready).unwrap(),
            "\"Ready\""
        );
        assert_eq!(
            serde_json::to_string(&MessageState::Processing).unwrap(),
            "\"Processing\""
        );
    }

    #[test]
    fn test_message_state_deserialization() {
        // Test that MessageState deserializes from Pascal case
        assert_eq!(
            serde_json::from_str::<MessageState>("\"Ready\"").unwrap(),
            MessageState::Ready
        );
        assert_eq!(
            serde_json::from_str::<MessageState>("\"Processing\"").unwrap(),
            MessageState::Processing
        );
    }

    #[test]
    fn test_message_state_invalid_deserialization() {
        // Test that invalid states fail to deserialize
        let result = serde_json::from_str::<MessageState>("\"Invalid\"");
        assert!(result.is_err());

        let result = serde_json::from_str::<MessageState>("\"ready\""); // lowercase
        assert!(result.is_err());

        let result = serde_json::from_str::<MessageState>("\"READY\""); // uppercase
        assert!(result.is_err());
    }

    #[test]
    fn test_message_serialization() {
        let message = Message::new("test body".to_string());

        let json = serde_json::to_string(&message).unwrap();

        // Should contain all populated fields
        assert!(json.contains("\"id\":"));
        assert!(json.contains("\"body\":\"test body\""));
        assert!(json.contains("\"state\":\"Ready\""));
        assert!(json.contains("\"retry_count\":0"));

        // An unset lock must be omitted entirely rather than emitted as null
        assert!(!json.contains("lock_until"));

        // Should deserialize back to an identical message
        let deserialized: Message = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized, message);

        // A set lock must be emitted and survive the round trip
        let locked = Message {
            state: MessageState::Processing,
            lock_until: Some(1_700_000_000_000),
            ..message
        };
        let locked_json = serde_json::to_string(&locked).unwrap();
        assert!(locked_json.contains("\"lock_until\":1700000000000"));
        let round_tripped: Message = serde_json::from_str(&locked_json).unwrap();
        assert_eq!(round_tripped, locked);
    }

    #[test]
    fn test_message_with_special_characters() {
        let special_body = "Test with 🦀 emojis and \"quotes\" and \n newlines \t tabs";
        let message = Message::new(special_body.to_string());

        assert_eq!(message.body, special_body);

        // Should serialize and deserialize correctly
        let json = serde_json::to_string(&message).unwrap();
        let deserialized: Message = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.body, special_body);
    }

    #[test]
    fn test_message_with_very_long_body() {
        let long_body = "a".repeat(100_000);
        let message = Message::new(long_body.clone());

        assert_eq!(message.body, long_body);
        assert_eq!(message.body.len(), 100_000);
    }

    #[test]
    fn test_message_with_empty_body() {
        let message = Message::new("".to_string());

        assert_eq!(message.body, "");
        assert_eq!(message.state, MessageState::Ready);
        assert_eq!(message.retry_count, 0);
    }

    #[test]
    fn test_request_response_structures() {
        // Each request is compared against its exact JSON form; a substring
        // check such as `contains("\"ids\":")` would pass for any payload.

        // Test AddMessageRequest
        let add_req = AddMessageRequest {
            body: "test message".to_string(),
        };
        let json = serde_json::to_string(&add_req).unwrap();
        assert_eq!(json, r#"{"body":"test message"}"#);

        // Test GetMessagesRequest
        let get_req = GetMessagesRequest { count: 5 };
        let json = serde_json::to_string(&get_req).unwrap();
        assert_eq!(json, r#"{"count":5}"#);

        // Test DeleteMessagesRequest
        let id1 = Uuid::now_v7();
        let id2 = Uuid::now_v7();
        let delete_req = DeleteMessagesRequest {
            ids: vec![id1, id2],
        };
        let json = serde_json::to_string(&delete_req).unwrap();
        assert_eq!(json, format!(r#"{{"ids":["{id1}","{id2}"]}}"#));

        // Test RetryMessagesRequest
        let retry_req = RetryMessagesRequest { ids: vec![id1] };
        let json = serde_json::to_string(&retry_req).unwrap();
        assert_eq!(json, format!(r#"{{"ids":["{id1}"]}}"#));
    }

    #[test]
    fn test_response_deserialization() {
        // Test direct Message response (for add_message)
        let message_json = r#"{"id":"0198fbd8-344e-7b70-841f-3fbd4b371e4c","body":"test","state":"Ready","lock_until":null,"retry_count":0}"#;
        let message: Message = serde_json::from_str(message_json).unwrap();
        assert_eq!(message.body, "test");
        assert_eq!(message.state, MessageState::Ready);
        assert_eq!(message.retry_count, 0);
        assert_eq!(message.lock_until, None);

        // Test array of messages response (for get_messages)
        let messages_json = r#"[{"id":"0198fbd8-344e-7b70-841f-3fbd4b371e4c","body":"test1","state":"Processing","lock_until":null,"retry_count":1}]"#;
        let messages: Vec<Message> = serde_json::from_str(messages_json).unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].body, "test1");
        assert_eq!(messages[0].state, MessageState::Processing);

        // Test message with numeric lock_until (Unix timestamp ms)
        let locked_json = r#"{"id":"0198fbd8-344e-7b70-841f-3fbd4b371e4c","body":"locked","state":"Processing","lock_until":1700000000000,"retry_count":1}"#;
        let locked: Message = serde_json::from_str(locked_json).unwrap();
        assert_eq!(locked.lock_until, Some(1700000000000u64));
        assert_eq!(locked.state, MessageState::Processing);

        // Test success string responses (for delete/retry/purge)
        let success_response: String = serde_json::from_str(r#""Success""#).unwrap();
        assert_eq!(success_response, "Success");

        // Test health check response
        let health_response: String = serde_json::from_str(r#""Hello World""#).unwrap();
        assert_eq!(health_response, "Hello World");
    }

    #[test]
    fn test_malformed_response_deserialization() {
        // Test that malformed JSON fails gracefully
        let malformed_json = r#"{"id": invalid}"#;
        let result = serde_json::from_str::<Message>(malformed_json);
        assert!(result.is_err());

        // Test missing required fields in Message
        let incomplete_json = r#"{"id":"0198fbd8-344e-7b70-841f-3fbd4b371e4c","body":"test"}"#; // Missing state and retry_count
        let result = serde_json::from_str::<Message>(incomplete_json);
        assert!(result.is_err());

        // Test wrong field types in Message
        let wrong_type_json = r#"{"id":"0198fbd8-344e-7b70-841f-3fbd4b371e4c","body":"test","state":"Ready","retry_count":"not_a_number"}"#;
        let result = serde_json::from_str::<Message>(wrong_type_json);
        assert!(result.is_err());

        // Test malformed message with invalid UUID
        let bad_uuid_json = r#"{"id":"invalid-uuid","body":"test","state":"Ready","lock_until":null,"retry_count":0}"#;
        let result = serde_json::from_str::<Message>(bad_uuid_json);
        assert!(result.is_err()); // Should fail due to invalid UUID

        // Test malformed array
        let bad_array_json = r#"[{"id":"invalid"}]"#;
        let result = serde_json::from_str::<Vec<Message>>(bad_array_json);
        assert!(result.is_err());
    }

    #[test]
    fn test_queue_stats_deserialization() {
        let json = r#"{"ready":10,"processing":5,"dead":2}"#;
        let stats: QueueStats = serde_json::from_str(json).unwrap();
        assert_eq!(stats.ready, 10);
        assert_eq!(stats.processing, 5);
        assert_eq!(stats.dead, 2);
    }

    #[test]
    fn test_queue_stats_serialization() {
        let stats = QueueStats {
            ready: 3,
            processing: 1,
            dead: 0,
        };
        let json = serde_json::to_string(&stats).unwrap();
        let deserialized: QueueStats = serde_json::from_str(&json).unwrap();
        assert_eq!(stats, deserialized);
    }
}