tlq_client/client.rs
1use crate::{
2 config::{Config, ConfigBuilder},
3 error::{Result, TlqError},
4 message::*,
5 retry::RetryStrategy,
6};
7use serde::{de::DeserializeOwned, Serialize};
8use std::time::Duration;
9use tokio::io::{AsyncReadExt, AsyncWriteExt};
10use tokio::net::TcpStream;
11use tokio::time::timeout;
12use uuid::Uuid;
13
14const MAX_MESSAGE_SIZE: usize = 65536;
15
16/// The main client for interacting with TLQ (Tiny Little Queue) servers.
17///
18/// `TlqClient` provides an async, type-safe interface for all TLQ operations including
19/// adding messages, retrieving messages, and managing queue state. The client handles
20/// automatic retry with exponential backoff for transient failures.
21///
22/// # Examples
23///
24/// Basic usage:
25/// ```no_run
26/// use tlq_client::TlqClient;
27///
28/// #[tokio::main]
29/// async fn main() -> Result<(), tlq_client::TlqError> {
30/// let client = TlqClient::new("localhost", 1337)?;
31///
32/// // Add a message
33/// let message = client.add_message("Hello, World!").await?;
34/// println!("Added message: {}", message.id);
35///
36/// // Get messages
37/// let messages = client.get_messages(1).await?;
38/// if let Some(msg) = messages.first() {
39/// println!("Retrieved: {}", msg.body);
40/// }
41///
42/// Ok(())
43/// }
44/// ```
45pub struct TlqClient {
46 config: Config,
47 base_url: String,
48}
49
50impl TlqClient {
51 /// Creates a new TLQ client with default configuration.
52 ///
53 /// This is the simplest way to create a client, using default values for
54 /// timeout (30s), max retries (3), and retry delay (100ms).
55 ///
56 /// # Arguments
57 ///
58 /// * `host` - The hostname or IP address of the TLQ server
59 /// * `port` - The port number of the TLQ server
60 ///
61 /// # Examples
62 ///
63 /// ```no_run
64 /// use tlq_client::TlqClient;
65 ///
66 /// # fn example() -> Result<(), tlq_client::TlqError> {
67 /// let client = TlqClient::new("localhost", 1337)?;
68 /// # Ok(())
69 /// # }
70 /// ```
71 ///
72 /// # Errors
73 ///
74 /// Currently this method always returns `Ok`, but the `Result` is preserved
75 /// for future compatibility.
76 pub fn new(host: impl Into<String>, port: u16) -> Result<Self> {
77 Ok(ConfigBuilder::new().host(host).port(port).build())
78 }
79
80 /// Creates a new TLQ client with custom configuration.
81 ///
82 /// Use this method when you need to customize timeout, retry behavior,
83 /// or other client settings.
84 ///
85 /// # Arguments
86 ///
87 /// * `config` - A [`Config`] instance with your desired settings
88 ///
89 /// # Examples
90 ///
91 /// ```no_run
92 /// use tlq_client::{TlqClient, Config};
93 ///
94 /// # fn example() {
95 /// let config = Config::default();
96 /// let client = TlqClient::with_config(config);
97 /// # }
98 /// ```
99 pub fn with_config(config: Config) -> Self {
100 let base_url = format!("{}:{}", config.host, config.port);
101 Self { config, base_url }
102 }
103
104 /// Returns a [`ConfigBuilder`] for creating custom configurations.
105 ///
106 /// This is a convenience method that's equivalent to [`ConfigBuilder::new()`].
107 ///
108 /// # Examples
109 ///
110 /// ```no_run
111 /// use tlq_client::TlqClient;
112 /// use std::time::Duration;
113 ///
114 /// # fn example() {
115 /// let client = TlqClient::builder()
116 /// .host("localhost")
117 /// .port(1337)
118 /// .timeout(Duration::from_secs(10))
119 /// .build();
120 /// # }
121 /// ```
122 pub fn builder() -> ConfigBuilder {
123 ConfigBuilder::new()
124 }
125
126 async fn request<T, R>(&self, endpoint: &str, body: &T) -> Result<R>
127 where
128 T: Serialize,
129 R: DeserializeOwned,
130 {
131 let retry_strategy = RetryStrategy::new(self.config.max_retries, self.config.retry_delay);
132
133 retry_strategy
134 .execute(|| async { self.single_request(endpoint, body).await })
135 .await
136 }
137
138 async fn single_request<T, R>(&self, endpoint: &str, body: &T) -> Result<R>
139 where
140 T: Serialize,
141 R: DeserializeOwned,
142 {
143 let json_body = serde_json::to_vec(body)?;
144
145 let request = format!(
146 "POST {} HTTP/1.1\r\n\
147 Host: {}\r\n\
148 Content-Type: application/json\r\n\
149 Content-Length: {}\r\n\
150 Connection: close\r\n\
151 \r\n",
152 endpoint,
153 self.base_url,
154 json_body.len()
155 );
156
157 let mut stream = timeout(self.config.timeout, TcpStream::connect(&self.base_url))
158 .await
159 .map_err(|_| TlqError::Timeout(self.config.timeout.as_millis() as u64))?
160 .map_err(|e| TlqError::Connection(e.to_string()))?;
161
162 stream.write_all(request.as_bytes()).await?;
163 stream.write_all(&json_body).await?;
164 stream.flush().await?;
165
166 let mut response = Vec::new();
167 stream.read_to_end(&mut response).await?;
168
169 let response_str = String::from_utf8_lossy(&response);
170 let body = Self::parse_http_response(&response_str)?;
171 serde_json::from_str(body).map_err(Into::into)
172 }
173
174 async fn get_request<R>(&self, endpoint: &str) -> Result<R>
175 where
176 R: DeserializeOwned,
177 {
178 let retry_strategy = RetryStrategy::new(self.config.max_retries, self.config.retry_delay);
179
180 retry_strategy
181 .execute(|| async { self.single_get_request(endpoint).await })
182 .await
183 }
184
185 async fn single_get_request<R>(&self, endpoint: &str) -> Result<R>
186 where
187 R: DeserializeOwned,
188 {
189 let request = format!(
190 "GET {} HTTP/1.1\r\n\
191 Host: {}\r\n\
192 Connection: close\r\n\
193 \r\n",
194 endpoint, self.base_url,
195 );
196
197 let mut stream = timeout(self.config.timeout, TcpStream::connect(&self.base_url))
198 .await
199 .map_err(|_| TlqError::Timeout(self.config.timeout.as_millis() as u64))?
200 .map_err(|e| TlqError::Connection(e.to_string()))?;
201
202 stream.write_all(request.as_bytes()).await?;
203 stream.flush().await?;
204
205 let mut response = Vec::new();
206 stream.read_to_end(&mut response).await?;
207
208 let response_str = String::from_utf8_lossy(&response);
209 let body = Self::parse_http_response(&response_str)?;
210 serde_json::from_str(body).map_err(Into::into)
211 }
212
213 /// Performs a health check against the TLQ server.
214 ///
215 /// This method sends a GET request to the `/hello` endpoint to verify
216 /// that the server is responding. It uses a fixed 5-second timeout
217 /// regardless of the client's configured timeout.
218 ///
219 /// # Returns
220 ///
221 /// * `Ok(true)` if the server responds with HTTP 200 OK
222 /// * `Ok(false)` if the server responds but not with 200 OK
223 /// * `Err` if there's a connection error or timeout
224 ///
225 /// # Examples
226 ///
227 /// ```no_run
228 /// use tlq_client::TlqClient;
229 ///
230 /// #[tokio::main]
231 /// async fn main() -> Result<(), tlq_client::TlqError> {
232 /// let client = TlqClient::new("localhost", 1337)?;
233 ///
234 /// if client.health_check().await? {
235 /// println!("Server is healthy");
236 /// } else {
237 /// println!("Server is not responding correctly");
238 /// }
239 ///
240 /// Ok(())
241 /// }
242 /// ```
243 ///
244 /// # Errors
245 ///
246 /// Returns [`TlqError::Connection`] for network issues, or [`TlqError::Timeout`]
247 /// if the server doesn't respond within 5 seconds.
248 pub async fn health_check(&self) -> Result<bool> {
249 let mut stream = timeout(Duration::from_secs(5), TcpStream::connect(&self.base_url))
250 .await
251 .map_err(|_| TlqError::Timeout(5000))?
252 .map_err(|e| TlqError::Connection(e.to_string()))?;
253
254 let request = format!(
255 "GET /hello HTTP/1.1\r\n\
256 Host: {}\r\n\
257 Connection: close\r\n\
258 \r\n",
259 self.base_url
260 );
261
262 stream.write_all(request.as_bytes()).await?;
263 stream.flush().await?;
264
265 let mut response = Vec::new();
266 stream.read_to_end(&mut response).await?;
267
268 let response_str = String::from_utf8_lossy(&response);
269 Ok(response_str.contains("200 OK"))
270 }
271
272 /// Adds a new message to the TLQ server.
273 ///
274 /// The message will be assigned a UUID v7 identifier and placed in the queue
275 /// with state [`MessageState::Ready`]. Messages have a maximum size limit of 64KB.
276 ///
277 /// # Arguments
278 ///
279 /// * `body` - The message content (any type that can be converted to String)
280 ///
281 /// # Returns
282 ///
283 /// Returns the created [`Message`] with its assigned ID and metadata.
284 ///
285 /// # Examples
286 ///
287 /// ```no_run
288 /// use tlq_client::TlqClient;
289 ///
290 /// #[tokio::main]
291 /// async fn main() -> Result<(), tlq_client::TlqError> {
292 /// let client = TlqClient::new("localhost", 1337)?;
293 ///
294 /// // Add a simple string message
295 /// let message = client.add_message("Hello, World!").await?;
296 /// println!("Created message {} with body: {}", message.id, message.body);
297 ///
298 /// // Add a formatted message
299 /// let user_data = "important data";
300 /// let message = client.add_message(format!("Processing: {}", user_data)).await?;
301 ///
302 /// Ok(())
303 /// }
304 /// ```
305 ///
306 /// # Errors
307 ///
308 /// * [`TlqError::MessageTooLarge`] if the message exceeds 64KB (65,536 bytes)
309 /// * [`TlqError::Connection`] for network connectivity issues
310 /// * [`TlqError::Timeout`] if the request times out
311 /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
312 pub async fn add_message(&self, body: impl Into<String>) -> Result<Message> {
313 let body = body.into();
314
315 if body.len() > MAX_MESSAGE_SIZE {
316 return Err(TlqError::MessageTooLarge { size: body.len() });
317 }
318
319 let request = AddMessageRequest { body };
320 let message: Message = self.request("/add", &request).await?;
321 Ok(message)
322 }
323
324 /// Retrieves multiple messages from the TLQ server.
325 ///
326 /// This method fetches up to `count` messages from the queue. Messages are returned
327 /// in the order they were added and their state is changed to [`MessageState::Processing`].
328 /// The server may return fewer messages than requested if there are not enough
329 /// messages in the queue.
330 ///
331 /// # Arguments
332 ///
333 /// * `count` - Maximum number of messages to retrieve (must be greater than 0)
334 ///
335 /// # Returns
336 ///
337 /// Returns a vector of [`Message`] objects. The vector may be empty if no messages
338 /// are available in the queue.
339 ///
340 /// # Examples
341 ///
342 /// ```no_run
343 /// use tlq_client::TlqClient;
344 ///
345 /// #[tokio::main]
346 /// async fn main() -> Result<(), tlq_client::TlqError> {
347 /// let client = TlqClient::new("localhost", 1337)?;
348 ///
349 /// // Get up to 5 messages from the queue
350 /// let messages = client.get_messages(5).await?;
351 ///
352 /// for message in messages {
353 /// println!("Processing message {}: {}", message.id, message.body);
354 ///
355 /// // Process the message...
356 ///
357 /// // Delete when done
358 /// client.delete_message(message.id).await?;
359 /// }
360 ///
361 /// Ok(())
362 /// }
363 /// ```
364 ///
365 /// # Errors
366 ///
367 /// * [`TlqError::Validation`] if count is 0
368 /// * [`TlqError::Connection`] for network connectivity issues
369 /// * [`TlqError::Timeout`] if the request times out
370 /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
371 pub async fn get_messages(&self, count: u32) -> Result<Vec<Message>> {
372 if count == 0 {
373 return Err(TlqError::Validation(
374 "Count must be greater than 0".to_string(),
375 ));
376 }
377
378 let request = GetMessagesRequest { count };
379 let messages: Vec<Message> = self.request("/get", &request).await?;
380 Ok(messages)
381 }
382
383 /// Retrieves a single message from the TLQ server.
384 ///
385 /// This is a convenience method equivalent to calling [`get_messages(1)`](Self::get_messages)
386 /// and taking the first result. If no messages are available, returns `None`.
387 ///
388 /// # Returns
389 ///
390 /// * `Ok(Some(message))` if a message was retrieved
391 /// * `Ok(None)` if no messages are available in the queue
392 /// * `Err` for connection or server errors
393 ///
394 /// # Examples
395 ///
396 /// ```no_run
397 /// use tlq_client::TlqClient;
398 ///
399 /// #[tokio::main]
400 /// async fn main() -> Result<(), tlq_client::TlqError> {
401 /// let client = TlqClient::new("localhost", 1337)?;
402 ///
403 /// // Get a single message
404 /// match client.get_message().await? {
405 /// Some(message) => {
406 /// println!("Got message: {}", message.body);
407 /// client.delete_message(message.id).await?;
408 /// }
409 /// None => println!("No messages available"),
410 /// }
411 ///
412 /// Ok(())
413 /// }
414 /// ```
415 ///
416 /// # Errors
417 ///
418 /// * [`TlqError::Connection`] for network connectivity issues
419 /// * [`TlqError::Timeout`] if the request times out
420 /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
421 pub async fn get_message(&self) -> Result<Option<Message>> {
422 let messages = self.get_messages(1).await?;
423 Ok(messages.into_iter().next())
424 }
425
426 /// Deletes a single message from the TLQ server.
427 ///
428 /// This is a convenience method that calls [`delete_messages`](Self::delete_messages)
429 /// with a single message ID.
430 ///
431 /// # Arguments
432 ///
433 /// * `id` - The UUID of the message to delete
434 ///
435 /// # Returns
436 ///
437 /// Returns a string indicating the result of the operation (typically "Success" or a count).
438 ///
439 /// # Examples
440 ///
441 /// ```no_run
442 /// use tlq_client::TlqClient;
443 ///
444 /// #[tokio::main]
445 /// async fn main() -> Result<(), tlq_client::TlqError> {
446 /// let client = TlqClient::new("localhost", 1337)?;
447 ///
448 /// if let Some(message) = client.get_message().await? {
449 /// let result = client.delete_message(message.id).await?;
450 /// println!("Delete result: {}", result);
451 /// }
452 ///
453 /// Ok(())
454 /// }
455 /// ```
456 ///
457 /// # Errors
458 ///
459 /// * [`TlqError::Connection`] for network connectivity issues
460 /// * [`TlqError::Timeout`] if the request times out
461 /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
462 pub async fn delete_message(&self, id: Uuid) -> Result<String> {
463 self.delete_messages(&[id]).await
464 }
465
466 /// Deletes multiple messages from the TLQ server.
467 ///
468 /// This method removes the specified messages from the queue permanently.
469 /// Messages can be in any state when deleted.
470 ///
471 /// # Arguments
472 ///
473 /// * `ids` - A slice of message UUIDs to delete (must not be empty)
474 ///
475 /// # Returns
476 ///
477 /// Returns a string indicating the number of messages deleted or "Success".
478 ///
479 /// # Examples
480 ///
481 /// ```no_run
482 /// use tlq_client::TlqClient;
483 ///
484 /// #[tokio::main]
485 /// async fn main() -> Result<(), tlq_client::TlqError> {
486 /// let client = TlqClient::new("localhost", 1337)?;
487 ///
488 /// let messages = client.get_messages(3).await?;
489 /// if !messages.is_empty() {
490 /// let ids: Vec<_> = messages.iter().map(|m| m.id).collect();
491 /// let result = client.delete_messages(&ids).await?;
492 /// println!("Deleted {} messages", result);
493 /// }
494 ///
495 /// Ok(())
496 /// }
497 /// ```
498 ///
499 /// # Errors
500 ///
501 /// * [`TlqError::Validation`] if the `ids` slice is empty
502 /// * [`TlqError::Connection`] for network connectivity issues
503 /// * [`TlqError::Timeout`] if the request times out
504 /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
505 pub async fn delete_messages(&self, ids: &[Uuid]) -> Result<String> {
506 if ids.is_empty() {
507 return Err(TlqError::Validation("No message IDs provided".to_string()));
508 }
509
510 let request = DeleteMessagesRequest { ids: ids.to_vec() };
511 let response: String = self.request("/delete", &request).await?;
512 Ok(response)
513 }
514
515 /// Retries a single message on the TLQ server.
516 ///
517 /// This is a convenience method that calls [`retry_messages`](Self::retry_messages)
518 /// with a single message ID. The message state will be changed back to
519 /// [`MessageState::Ready`], making it available for processing again.
520 ///
521 /// # Arguments
522 ///
523 /// * `id` - The UUID of the message to retry
524 ///
525 /// # Returns
526 ///
527 /// Returns a string indicating the result of the operation (typically "Success").
528 ///
529 /// # Examples
530 ///
531 /// ```no_run
532 /// use tlq_client::TlqClient;
533 ///
534 /// #[tokio::main]
535 /// async fn main() -> Result<(), tlq_client::TlqError> {
536 /// let client = TlqClient::new("localhost", 1337)?;
537 ///
538 /// if let Some(message) = client.get_message().await? {
539 /// // Processing failed, retry the message
540 /// let result = client.retry_message(message.id).await?;
541 /// println!("Retry result: {}", result);
542 /// }
543 ///
544 /// Ok(())
545 /// }
546 /// ```
547 ///
548 /// # Errors
549 ///
550 /// * [`TlqError::Connection`] for network connectivity issues
551 /// * [`TlqError::Timeout`] if the request times out
552 /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
553 pub async fn retry_message(&self, id: Uuid) -> Result<String> {
554 self.retry_messages(&[id]).await
555 }
556
557 /// Retries multiple messages on the TLQ server.
558 ///
559 /// This method changes the state of the specified messages back to
560 /// [`MessageState::Ready`], making them available for processing again.
561 /// The retry count for each message will be incremented.
562 ///
563 /// # Arguments
564 ///
565 /// * `ids` - A slice of message UUIDs to retry (must not be empty)
566 ///
567 /// # Returns
568 ///
569 /// Returns a string indicating the result of the operation (typically "Success").
570 ///
571 /// # Examples
572 ///
573 /// ```no_run
574 /// use tlq_client::TlqClient;
575 ///
576 /// #[tokio::main]
577 /// async fn main() -> Result<(), tlq_client::TlqError> {
578 /// let client = TlqClient::new("localhost", 1337)?;
579 ///
580 /// let messages = client.get_messages(10).await?;
581 /// let ids: Vec<_> = messages.iter().map(|m| m.id).collect();
582 ///
583 /// if !ids.is_empty() {
584 /// let result = client.retry_messages(&ids).await?;
585 /// println!("Retry result: {}", result);
586 /// }
587 ///
588 /// Ok(())
589 /// }
590 /// ```
591 ///
592 /// # Errors
593 ///
594 /// * [`TlqError::Validation`] if the `ids` slice is empty
595 /// * [`TlqError::Connection`] for network connectivity issues
596 /// * [`TlqError::Timeout`] if the request times out
597 /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
598 pub async fn retry_messages(&self, ids: &[Uuid]) -> Result<String> {
599 if ids.is_empty() {
600 return Err(TlqError::Validation("No message IDs provided".to_string()));
601 }
602
603 let request = RetryMessagesRequest { ids: ids.to_vec() };
604 let response: String = self.request("/retry", &request).await?;
605 Ok(response)
606 }
607
608 /// Removes all messages from the TLQ server queue.
609 ///
610 /// This method permanently deletes all messages in the queue regardless of their state.
611 /// Use with caution as this operation cannot be undone.
612 ///
613 /// # Returns
614 ///
615 /// Returns a string indicating the result of the operation (typically "Success").
616 ///
617 /// # Examples
618 ///
619 /// ```no_run
620 /// use tlq_client::TlqClient;
621 ///
622 /// #[tokio::main]
623 /// async fn main() -> Result<(), tlq_client::TlqError> {
624 /// let client = TlqClient::new("localhost", 1337)?;
625 ///
626 /// // Clear all messages from the queue
627 /// let result = client.purge_queue().await?;
628 /// println!("Purge result: {}", result);
629 ///
630 /// Ok(())
631 /// }
632 /// ```
633 ///
634 /// # Errors
635 ///
636 /// * [`TlqError::Connection`] for network connectivity issues
637 /// * [`TlqError::Timeout`] if the request times out
638 /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
639 pub async fn purge_queue(&self) -> Result<String> {
640 let response: String = self.request("/purge", &serde_json::json!({})).await?;
641 Ok(response)
642 }
643
644 /// Retrieves queue statistics from the TLQ server.
645 ///
646 /// Returns counts of messages in each state (ready, processing) and
647 /// the cumulative count of dead messages removed by the reaper.
648 ///
649 /// # Returns
650 ///
651 /// Returns a [`QueueStats`] with the current queue statistics.
652 ///
653 /// # Examples
654 ///
655 /// ```no_run
656 /// use tlq_client::TlqClient;
657 ///
658 /// #[tokio::main]
659 /// async fn main() -> Result<(), tlq_client::TlqError> {
660 /// let client = TlqClient::new("localhost", 1337)?;
661 ///
662 /// let stats = client.stats().await?;
663 /// println!("Ready: {}, Processing: {}, Dead: {}",
664 /// stats.ready, stats.processing, stats.dead);
665 ///
666 /// Ok(())
667 /// }
668 /// ```
669 ///
670 /// # Errors
671 ///
672 /// * [`TlqError::Connection`] for network connectivity issues
673 /// * [`TlqError::Timeout`] if the request times out
674 /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
675 pub async fn stats(&self) -> Result<QueueStats> {
676 self.get_request("/stats").await
677 }
678
679 // Helper function to parse HTTP response - extracted for testing
680 fn parse_http_response(response: &str) -> Result<&str> {
681 if let Some(body_start) = response.find("\r\n\r\n") {
682 let headers = &response[..body_start];
683 let body = &response[body_start + 4..];
684
685 if let Some(status_line) = headers.lines().next() {
686 let parts: Vec<&str> = status_line.split_whitespace().collect();
687 if parts.len() >= 2 {
688 if let Ok(status_code) = parts[1].parse::<u16>() {
689 if status_code >= 400 {
690 return Err(TlqError::Server {
691 status: status_code,
692 message: body.to_string(),
693 });
694 }
695 }
696 }
697 }
698
699 Ok(body)
700 } else {
701 Err(TlqError::Connection("Invalid HTTP response".to_string()))
702 }
703 }
704}
705
706#[cfg(test)]
707mod tests {
708 use super::*;
709
710 #[test]
711 fn test_parse_http_response_success() {
712 let response =
713 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n{\"message\":\"success\"}";
714
715 let result = TlqClient::parse_http_response(response);
716 assert!(result.is_ok());
717 assert_eq!(result.unwrap(), "{\"message\":\"success\"}");
718 }
719
720 #[test]
721 fn test_parse_http_response_server_error() {
722 let response = "HTTP/1.1 500 Internal Server Error\r\nContent-Type: text/plain\r\n\r\nInternal server error occurred";
723
724 let result = TlqClient::parse_http_response(response);
725 match result {
726 Err(TlqError::Server { status, message }) => {
727 assert_eq!(status, 500);
728 assert_eq!(message, "Internal server error occurred");
729 }
730 _ => panic!("Expected server error"),
731 }
732 }
733
734 #[test]
735 fn test_parse_http_response_client_error() {
736 let response = "HTTP/1.1 400 Bad Request\r\nContent-Type: text/plain\r\n\r\nBad request";
737
738 let result = TlqClient::parse_http_response(response);
739 match result {
740 Err(TlqError::Server { status, message }) => {
741 assert_eq!(status, 400);
742 assert_eq!(message, "Bad request");
743 }
744 _ => panic!("Expected client error"),
745 }
746 }
747
748 #[test]
749 fn test_parse_http_response_no_headers_separator() {
750 let response =
751 "HTTP/1.1 200 OK\nContent-Type: application/json\n{\"incomplete\":\"response\"}";
752
753 let result = TlqClient::parse_http_response(response);
754 match result {
755 Err(TlqError::Connection(msg)) => {
756 assert_eq!(msg, "Invalid HTTP response");
757 }
758 _ => panic!("Expected connection error"),
759 }
760 }
761
762 #[test]
763 fn test_parse_http_response_malformed_status_line() {
764 let response = "INVALID_STATUS_LINE\r\n\r\n{\"data\":\"test\"}";
765
766 let result = TlqClient::parse_http_response(response);
767 // Should still succeed because we only check if parts.len() >= 2 and parse fails gracefully
768 assert!(result.is_ok());
769 assert_eq!(result.unwrap(), "{\"data\":\"test\"}");
770 }
771
772 #[test]
773 fn test_parse_http_response_empty_body() {
774 let response = "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
775
776 let result = TlqClient::parse_http_response(response);
777 assert!(result.is_ok());
778 assert_eq!(result.unwrap(), "");
779 }
780
781 #[test]
782 fn test_parse_http_response_with_extra_headers() {
783 let response = "HTTP/1.1 201 Created\r\nContent-Type: application/json\r\nServer: TLQ/1.0\r\nConnection: close\r\n\r\n{\"id\":\"123\",\"status\":\"created\"}";
784
785 let result = TlqClient::parse_http_response(response);
786 assert!(result.is_ok());
787 assert_eq!(result.unwrap(), "{\"id\":\"123\",\"status\":\"created\"}");
788 }
789
790 #[test]
791 fn test_parse_http_response_status_code_edge_cases() {
792 // Test various status codes around the 400 boundary
793
794 // 399 should be success (< 400)
795 let response_399 = "HTTP/1.1 399 Custom Success\r\n\r\n{\"ok\":true}";
796 let result = TlqClient::parse_http_response(response_399);
797 assert!(result.is_ok());
798
799 // 400 should be error (>= 400)
800 let response_400 = "HTTP/1.1 400 Bad Request\r\n\r\nBad request";
801 let result = TlqClient::parse_http_response(response_400);
802 assert!(matches!(result, Err(TlqError::Server { status: 400, .. })));
803
804 // 599 should be error
805 let response_599 = "HTTP/1.1 599 Custom Error\r\n\r\nCustom error";
806 let result = TlqClient::parse_http_response(response_599);
807 assert!(matches!(result, Err(TlqError::Server { status: 599, .. })));
808 }
809
810 #[test]
811 fn test_max_message_size_constant() {
812 assert_eq!(MAX_MESSAGE_SIZE, 65536);
813 }
814
815 #[test]
816 fn test_client_creation() {
817 let client = TlqClient::new("test-host", 9999);
818 assert!(client.is_ok());
819
820 let client = client.unwrap();
821 assert_eq!(client.base_url, "test-host:9999");
822 }
823
824 #[test]
825 fn test_client_with_config() {
826 let config = Config {
827 host: "custom-host".to_string(),
828 port: 8080,
829 timeout: Duration::from_secs(10),
830 max_retries: 5,
831 retry_delay: Duration::from_millis(200),
832 };
833
834 let client = TlqClient::with_config(config);
835 assert_eq!(client.base_url, "custom-host:8080");
836 assert_eq!(client.config.max_retries, 5);
837 assert_eq!(client.config.timeout, Duration::from_secs(10));
838 }
839
840 #[test]
841 fn test_message_size_validation() {
842 let _client = TlqClient::new("localhost", 1337).unwrap();
843
844 // Test exact limit
845 let message_at_limit = "x".repeat(MAX_MESSAGE_SIZE);
846 let result = std::panic::catch_unwind(|| {
847 // We can't actually test async methods in sync tests without tokio,
848 // but we can verify the constant is correct
849 assert_eq!(message_at_limit.len(), MAX_MESSAGE_SIZE);
850 });
851 assert!(result.is_ok());
852
853 // Test over limit
854 let message_over_limit = "x".repeat(MAX_MESSAGE_SIZE + 1);
855 assert_eq!(message_over_limit.len(), MAX_MESSAGE_SIZE + 1);
856 }
857
858 #[tokio::test]
859 async fn test_add_message_size_validation() {
860 let client = TlqClient::new("localhost", 1337).unwrap();
861
862 // Test message at exact size limit (should be rejected because it's over the limit)
863 let large_message = "x".repeat(MAX_MESSAGE_SIZE + 1);
864 let result = client.add_message(large_message).await;
865
866 match result {
867 Err(TlqError::MessageTooLarge { size }) => {
868 assert_eq!(size, MAX_MESSAGE_SIZE + 1);
869 }
870 _ => panic!("Expected MessageTooLarge error"),
871 }
872
873 // Test empty message (should be valid)
874 let empty_message = "";
875 // We can't actually test without a server, but we can verify it passes size validation
876 assert!(empty_message.len() <= MAX_MESSAGE_SIZE);
877
878 // Test message exactly at limit (should be valid)
879 let max_message = "x".repeat(MAX_MESSAGE_SIZE);
880 // Size check should pass
881 assert_eq!(max_message.len(), MAX_MESSAGE_SIZE);
882 }
883
884 #[tokio::test]
885 async fn test_get_messages_validation() {
886 let client = TlqClient::new("localhost", 1337).unwrap();
887
888 // Test zero count (should be rejected)
889 let result = client.get_messages(0).await;
890 match result {
891 Err(TlqError::Validation(msg)) => {
892 assert_eq!(msg, "Count must be greater than 0");
893 }
894 _ => panic!("Expected validation error for zero count"),
895 }
896
897 // Test valid counts - these should pass without validation errors
898 let _ = client.get_messages(1).await; // Should be valid
899 let _ = client.get_messages(100).await; // Should be valid
900 let _ = client.get_messages(u32::MAX).await; // Should be valid
901 }
902
903 #[tokio::test]
904 async fn test_delete_messages_validation() {
905 let client = TlqClient::new("localhost", 1337).unwrap();
906
907 // Test empty IDs array
908 let result = client.delete_messages(&[]).await;
909 match result {
910 Err(TlqError::Validation(msg)) => {
911 assert_eq!(msg, "No message IDs provided");
912 }
913 _ => panic!("Expected validation error for empty IDs"),
914 }
915
916 // Test delete_message (single ID) - should not have validation issue
917 use uuid::Uuid;
918 let test_id = Uuid::now_v7();
919 // We can't test the actual call without a server, but we can verify
920 // it would call delete_messages with a single-item array
921 assert!(!vec![test_id].is_empty());
922 }
923
924 #[tokio::test]
925 async fn test_retry_messages_validation() {
926 let client = TlqClient::new("localhost", 1337).unwrap();
927
928 // Test empty IDs array
929 let result = client.retry_messages(&[]).await;
930 match result {
931 Err(TlqError::Validation(msg)) => {
932 assert_eq!(msg, "No message IDs provided");
933 }
934 _ => panic!("Expected validation error for empty IDs"),
935 }
936
937 // Test retry_message (single ID) - should not have validation issue
938 use uuid::Uuid;
939 let test_id = Uuid::now_v7();
940 // We can't test the actual call without a server, but we can verify
941 // it would call retry_messages with a single-item array
942 assert!(!vec![test_id].is_empty());
943 }
944
945 #[test]
946 fn test_client_builder_edge_cases() {
947 // Test builder with minimum values
948 let client = TlqClient::builder()
949 .host("")
950 .port(0)
951 .timeout_ms(0)
952 .max_retries(0)
953 .retry_delay_ms(0)
954 .build();
955
956 assert_eq!(client.base_url, ":0");
957 assert_eq!(client.config.max_retries, 0);
958 assert_eq!(client.config.timeout, Duration::from_millis(0));
959
960 // Test builder with maximum reasonable values
961 let client = TlqClient::builder()
962 .host("very-long-hostname-that-might-be-used-in-some-environments")
963 .port(65535)
964 .timeout_ms(600000) // 10 minutes
965 .max_retries(100)
966 .retry_delay_ms(10000) // 10 seconds
967 .build();
968
969 assert!(client.base_url.contains("very-long-hostname"));
970 assert_eq!(client.config.max_retries, 100);
971 assert_eq!(client.config.timeout, Duration::from_secs(600));
972 }
973
974 #[test]
975 fn test_config_validation() {
976 use crate::config::ConfigBuilder;
977 use std::time::Duration;
978
979 // Test various duration configurations
980 let config1 = ConfigBuilder::new()
981 .timeout(Duration::from_nanos(1))
982 .build_config();
983 assert_eq!(config1.timeout, Duration::from_nanos(1));
984
985 let config2 = ConfigBuilder::new()
986 .retry_delay(Duration::from_secs(3600)) // 1 hour
987 .build_config();
988 assert_eq!(config2.retry_delay, Duration::from_secs(3600));
989
990 // Test edge case ports
991 let config3 = ConfigBuilder::new().port(1).build_config();
992 assert_eq!(config3.port, 1);
993
994 let config4 = ConfigBuilder::new().port(65535).build_config();
995 assert_eq!(config4.port, 65535);
996
997 // Test very high retry counts
998 let config5 = ConfigBuilder::new().max_retries(1000).build_config();
999 assert_eq!(config5.max_retries, 1000);
1000 }
1001}