Skip to main content

tlq_client/
client.rs

1use crate::{
2    config::{Config, ConfigBuilder},
3    error::{Result, TlqError},
4    message::*,
5    retry::RetryStrategy,
6};
7use serde::{de::DeserializeOwned, Serialize};
8use std::time::Duration;
9use tokio::io::{AsyncReadExt, AsyncWriteExt};
10use tokio::net::TcpStream;
11use tokio::time::timeout;
12use uuid::Uuid;
13
14const MAX_MESSAGE_SIZE: usize = 65536;
15
16/// The main client for interacting with TLQ (Tiny Little Queue) servers.
17///
18/// `TlqClient` provides an async, type-safe interface for all TLQ operations including
19/// adding messages, retrieving messages, and managing queue state. The client handles
20/// automatic retry with exponential backoff for transient failures.
21///
22/// # Examples
23///
24/// Basic usage:
25/// ```no_run
26/// use tlq_client::TlqClient;
27///
28/// #[tokio::main]
29/// async fn main() -> Result<(), tlq_client::TlqError> {
30///     let client = TlqClient::new("localhost", 1337)?;
31///     
32///     // Add a message
33///     let message = client.add_message("Hello, World!").await?;
34///     println!("Added message: {}", message.id);
35///     
36///     // Get messages
37///     let messages = client.get_messages(1).await?;
38///     if let Some(msg) = messages.first() {
39///         println!("Retrieved: {}", msg.body);
40///     }
41///     
42///     Ok(())
43/// }
44/// ```
45pub struct TlqClient {
46    config: Config,
47    base_url: String,
48}
49
50impl TlqClient {
51    /// Creates a new TLQ client with default configuration.
52    ///
53    /// This is the simplest way to create a client, using default values for
54    /// timeout (30s), max retries (3), and retry delay (100ms).
55    ///
56    /// # Arguments
57    ///
58    /// * `host` - The hostname or IP address of the TLQ server
59    /// * `port` - The port number of the TLQ server
60    ///
61    /// # Examples
62    ///
63    /// ```no_run
64    /// use tlq_client::TlqClient;
65    ///
66    /// # fn example() -> Result<(), tlq_client::TlqError> {
67    /// let client = TlqClient::new("localhost", 1337)?;
68    /// # Ok(())
69    /// # }
70    /// ```
71    ///
72    /// # Errors
73    ///
74    /// Currently this method always returns `Ok`, but the `Result` is preserved
75    /// for future compatibility.
76    pub fn new(host: impl Into<String>, port: u16) -> Result<Self> {
77        Ok(ConfigBuilder::new().host(host).port(port).build())
78    }
79
80    /// Creates a new TLQ client with custom configuration.
81    ///
82    /// Use this method when you need to customize timeout, retry behavior,
83    /// or other client settings.
84    ///
85    /// # Arguments
86    ///
87    /// * `config` - A [`Config`] instance with your desired settings
88    ///
89    /// # Examples
90    ///
91    /// ```no_run
92    /// use tlq_client::{TlqClient, Config};
93    ///
94    /// # fn example() {
95    /// let config = Config::default();
96    /// let client = TlqClient::with_config(config);
97    /// # }
98    /// ```
99    pub fn with_config(config: Config) -> Self {
100        let base_url = format!("{}:{}", config.host, config.port);
101        Self { config, base_url }
102    }
103
104    /// Returns a [`ConfigBuilder`] for creating custom configurations.
105    ///
106    /// This is a convenience method that's equivalent to [`ConfigBuilder::new()`].
107    ///
108    /// # Examples
109    ///
110    /// ```no_run
111    /// use tlq_client::TlqClient;
112    /// use std::time::Duration;
113    ///
114    /// # fn example() {
115    /// let client = TlqClient::builder()
116    ///     .host("localhost")
117    ///     .port(1337)
118    ///     .timeout(Duration::from_secs(10))
119    ///     .build();
120    /// # }
121    /// ```
122    pub fn builder() -> ConfigBuilder {
123        ConfigBuilder::new()
124    }
125
126    async fn request<T, R>(&self, endpoint: &str, body: &T) -> Result<R>
127    where
128        T: Serialize,
129        R: DeserializeOwned,
130    {
131        let retry_strategy = RetryStrategy::new(self.config.max_retries, self.config.retry_delay);
132
133        retry_strategy
134            .execute(|| async { self.single_request(endpoint, body).await })
135            .await
136    }
137
138    async fn single_request<T, R>(&self, endpoint: &str, body: &T) -> Result<R>
139    where
140        T: Serialize,
141        R: DeserializeOwned,
142    {
143        let json_body = serde_json::to_vec(body)?;
144
145        let request = format!(
146            "POST {} HTTP/1.1\r\n\
147             Host: {}\r\n\
148             Content-Type: application/json\r\n\
149             Content-Length: {}\r\n\
150             Connection: close\r\n\
151             \r\n",
152            endpoint,
153            self.base_url,
154            json_body.len()
155        );
156
157        let mut stream = timeout(self.config.timeout, TcpStream::connect(&self.base_url))
158            .await
159            .map_err(|_| TlqError::Timeout(self.config.timeout.as_millis() as u64))?
160            .map_err(|e| TlqError::Connection(e.to_string()))?;
161
162        stream.write_all(request.as_bytes()).await?;
163        stream.write_all(&json_body).await?;
164        stream.flush().await?;
165
166        let mut response = Vec::new();
167        stream.read_to_end(&mut response).await?;
168
169        let response_str = String::from_utf8_lossy(&response);
170        let body = Self::parse_http_response(&response_str)?;
171        serde_json::from_str(body).map_err(Into::into)
172    }
173
174    async fn get_request<R>(&self, endpoint: &str) -> Result<R>
175    where
176        R: DeserializeOwned,
177    {
178        let retry_strategy = RetryStrategy::new(self.config.max_retries, self.config.retry_delay);
179
180        retry_strategy
181            .execute(|| async { self.single_get_request(endpoint).await })
182            .await
183    }
184
185    async fn single_get_request<R>(&self, endpoint: &str) -> Result<R>
186    where
187        R: DeserializeOwned,
188    {
189        let request = format!(
190            "GET {} HTTP/1.1\r\n\
191             Host: {}\r\n\
192             Connection: close\r\n\
193             \r\n",
194            endpoint, self.base_url,
195        );
196
197        let mut stream = timeout(self.config.timeout, TcpStream::connect(&self.base_url))
198            .await
199            .map_err(|_| TlqError::Timeout(self.config.timeout.as_millis() as u64))?
200            .map_err(|e| TlqError::Connection(e.to_string()))?;
201
202        stream.write_all(request.as_bytes()).await?;
203        stream.flush().await?;
204
205        let mut response = Vec::new();
206        stream.read_to_end(&mut response).await?;
207
208        let response_str = String::from_utf8_lossy(&response);
209        let body = Self::parse_http_response(&response_str)?;
210        serde_json::from_str(body).map_err(Into::into)
211    }
212
213    /// Performs a health check against the TLQ server.
214    ///
215    /// This method sends a GET request to the `/hello` endpoint to verify
216    /// that the server is responding. It uses a fixed 5-second timeout
217    /// regardless of the client's configured timeout.
218    ///
219    /// # Returns
220    ///
221    /// * `Ok(true)` if the server responds with HTTP 200 OK
222    /// * `Ok(false)` if the server responds but not with 200 OK
223    /// * `Err` if there's a connection error or timeout
224    ///
225    /// # Examples
226    ///
227    /// ```no_run
228    /// use tlq_client::TlqClient;
229    ///
230    /// #[tokio::main]
231    /// async fn main() -> Result<(), tlq_client::TlqError> {
232    ///     let client = TlqClient::new("localhost", 1337)?;
233    ///
234    ///     if client.health_check().await? {
235    ///         println!("Server is healthy");
236    ///     } else {
237    ///         println!("Server is not responding correctly");
238    ///     }
239    ///     
240    ///     Ok(())
241    /// }
242    /// ```
243    ///
244    /// # Errors
245    ///
246    /// Returns [`TlqError::Connection`] for network issues, or [`TlqError::Timeout`]
247    /// if the server doesn't respond within 5 seconds.
248    pub async fn health_check(&self) -> Result<bool> {
249        let mut stream = timeout(Duration::from_secs(5), TcpStream::connect(&self.base_url))
250            .await
251            .map_err(|_| TlqError::Timeout(5000))?
252            .map_err(|e| TlqError::Connection(e.to_string()))?;
253
254        let request = format!(
255            "GET /hello HTTP/1.1\r\n\
256             Host: {}\r\n\
257             Connection: close\r\n\
258             \r\n",
259            self.base_url
260        );
261
262        stream.write_all(request.as_bytes()).await?;
263        stream.flush().await?;
264
265        let mut response = Vec::new();
266        stream.read_to_end(&mut response).await?;
267
268        let response_str = String::from_utf8_lossy(&response);
269        Ok(response_str.contains("200 OK"))
270    }
271
272    /// Adds a new message to the TLQ server.
273    ///
274    /// The message will be assigned a UUID v7 identifier and placed in the queue
275    /// with state [`MessageState::Ready`]. Messages have a maximum size limit of 64KB.
276    ///
277    /// # Arguments
278    ///
279    /// * `body` - The message content (any type that can be converted to String)
280    ///
281    /// # Returns
282    ///
283    /// Returns the created [`Message`] with its assigned ID and metadata.
284    ///
285    /// # Examples
286    ///
287    /// ```no_run
288    /// use tlq_client::TlqClient;
289    ///
290    /// #[tokio::main]
291    /// async fn main() -> Result<(), tlq_client::TlqError> {
292    ///     let client = TlqClient::new("localhost", 1337)?;
293    ///
294    ///     // Add a simple string message
295    ///     let message = client.add_message("Hello, World!").await?;
296    ///     println!("Created message {} with body: {}", message.id, message.body);
297    ///
298    ///     // Add a formatted message
299    ///     let user_data = "important data";
300    ///     let message = client.add_message(format!("Processing: {}", user_data)).await?;
301    ///     
302    ///     Ok(())
303    /// }
304    /// ```
305    ///
306    /// # Errors
307    ///
308    /// * [`TlqError::MessageTooLarge`] if the message exceeds 64KB (65,536 bytes)
309    /// * [`TlqError::Connection`] for network connectivity issues
310    /// * [`TlqError::Timeout`] if the request times out
311    /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
312    pub async fn add_message(&self, body: impl Into<String>) -> Result<Message> {
313        let body = body.into();
314
315        if body.len() > MAX_MESSAGE_SIZE {
316            return Err(TlqError::MessageTooLarge { size: body.len() });
317        }
318
319        let request = AddMessageRequest { body };
320        let message: Message = self.request("/add", &request).await?;
321        Ok(message)
322    }
323
324    /// Retrieves multiple messages from the TLQ server.
325    ///
326    /// This method fetches up to `count` messages from the queue. Messages are returned
327    /// in the order they were added and their state is changed to [`MessageState::Processing`].
328    /// The server may return fewer messages than requested if there are not enough
329    /// messages in the queue.
330    ///
331    /// # Arguments
332    ///
333    /// * `count` - Maximum number of messages to retrieve (must be greater than 0)
334    ///
335    /// # Returns
336    ///
337    /// Returns a vector of [`Message`] objects. The vector may be empty if no messages
338    /// are available in the queue.
339    ///
340    /// # Examples
341    ///
342    /// ```no_run
343    /// use tlq_client::TlqClient;
344    ///
345    /// #[tokio::main]
346    /// async fn main() -> Result<(), tlq_client::TlqError> {
347    ///     let client = TlqClient::new("localhost", 1337)?;
348    ///
349    ///     // Get up to 5 messages from the queue
350    ///     let messages = client.get_messages(5).await?;
351    ///     
352    ///     for message in messages {
353    ///         println!("Processing message {}: {}", message.id, message.body);
354    ///         
355    ///         // Process the message...
356    ///         
357    ///         // Delete when done
358    ///         client.delete_message(message.id).await?;
359    ///     }
360    ///     
361    ///     Ok(())
362    /// }
363    /// ```
364    ///
365    /// # Errors
366    ///
367    /// * [`TlqError::Validation`] if count is 0
368    /// * [`TlqError::Connection`] for network connectivity issues  
369    /// * [`TlqError::Timeout`] if the request times out
370    /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
371    pub async fn get_messages(&self, count: u32) -> Result<Vec<Message>> {
372        if count == 0 {
373            return Err(TlqError::Validation(
374                "Count must be greater than 0".to_string(),
375            ));
376        }
377
378        let request = GetMessagesRequest { count };
379        let messages: Vec<Message> = self.request("/get", &request).await?;
380        Ok(messages)
381    }
382
383    /// Retrieves a single message from the TLQ server.
384    ///
385    /// This is a convenience method equivalent to calling [`get_messages(1)`](Self::get_messages)
386    /// and taking the first result. If no messages are available, returns `None`.
387    ///
388    /// # Returns
389    ///
390    /// * `Ok(Some(message))` if a message was retrieved
391    /// * `Ok(None)` if no messages are available in the queue
392    /// * `Err` for connection or server errors
393    ///
394    /// # Examples
395    ///
396    /// ```no_run
397    /// use tlq_client::TlqClient;
398    ///
399    /// #[tokio::main]
400    /// async fn main() -> Result<(), tlq_client::TlqError> {
401    ///     let client = TlqClient::new("localhost", 1337)?;
402    ///
403    ///     // Get a single message
404    ///     match client.get_message().await? {
405    ///         Some(message) => {
406    ///             println!("Got message: {}", message.body);
407    ///             client.delete_message(message.id).await?;
408    ///         }
409    ///         None => println!("No messages available"),
410    ///     }
411    ///     
412    ///     Ok(())
413    /// }
414    /// ```
415    ///
416    /// # Errors
417    ///
418    /// * [`TlqError::Connection`] for network connectivity issues
419    /// * [`TlqError::Timeout`] if the request times out  
420    /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
421    pub async fn get_message(&self) -> Result<Option<Message>> {
422        let messages = self.get_messages(1).await?;
423        Ok(messages.into_iter().next())
424    }
425
426    /// Deletes a single message from the TLQ server.
427    ///
428    /// This is a convenience method that calls [`delete_messages`](Self::delete_messages)
429    /// with a single message ID.
430    ///
431    /// # Arguments
432    ///
433    /// * `id` - The UUID of the message to delete
434    ///
435    /// # Returns
436    ///
437    /// Returns a string indicating the result of the operation (typically "Success" or a count).
438    ///
439    /// # Examples
440    ///
441    /// ```no_run
442    /// use tlq_client::TlqClient;
443    ///
444    /// #[tokio::main]
445    /// async fn main() -> Result<(), tlq_client::TlqError> {
446    ///     let client = TlqClient::new("localhost", 1337)?;
447    ///
448    ///     if let Some(message) = client.get_message().await? {
449    ///         let result = client.delete_message(message.id).await?;
450    ///         println!("Delete result: {}", result);
451    ///     }
452    ///     
453    ///     Ok(())
454    /// }
455    /// ```
456    ///
457    /// # Errors
458    ///
459    /// * [`TlqError::Connection`] for network connectivity issues
460    /// * [`TlqError::Timeout`] if the request times out
461    /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
462    pub async fn delete_message(&self, id: Uuid) -> Result<String> {
463        self.delete_messages(&[id]).await
464    }
465
466    /// Deletes multiple messages from the TLQ server.
467    ///
468    /// This method removes the specified messages from the queue permanently.
469    /// Messages can be in any state when deleted.
470    ///
471    /// # Arguments
472    ///
473    /// * `ids` - A slice of message UUIDs to delete (must not be empty)
474    ///
475    /// # Returns
476    ///
477    /// Returns a string indicating the number of messages deleted or "Success".
478    ///
479    /// # Examples
480    ///
481    /// ```no_run
482    /// use tlq_client::TlqClient;
483    ///
484    /// #[tokio::main]
485    /// async fn main() -> Result<(), tlq_client::TlqError> {
486    ///     let client = TlqClient::new("localhost", 1337)?;
487    ///
488    ///     let messages = client.get_messages(3).await?;
489    ///     if !messages.is_empty() {
490    ///         let ids: Vec<_> = messages.iter().map(|m| m.id).collect();
491    ///         let result = client.delete_messages(&ids).await?;
492    ///         println!("Deleted {} messages", result);
493    ///     }
494    ///     
495    ///     Ok(())
496    /// }
497    /// ```
498    ///
499    /// # Errors
500    ///
501    /// * [`TlqError::Validation`] if the `ids` slice is empty
502    /// * [`TlqError::Connection`] for network connectivity issues
503    /// * [`TlqError::Timeout`] if the request times out
504    /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
505    pub async fn delete_messages(&self, ids: &[Uuid]) -> Result<String> {
506        if ids.is_empty() {
507            return Err(TlqError::Validation("No message IDs provided".to_string()));
508        }
509
510        let request = DeleteMessagesRequest { ids: ids.to_vec() };
511        let response: String = self.request("/delete", &request).await?;
512        Ok(response)
513    }
514
515    /// Retries a single message on the TLQ server.
516    ///
517    /// This is a convenience method that calls [`retry_messages`](Self::retry_messages)
518    /// with a single message ID. The message state will be changed back to
519    /// [`MessageState::Ready`], making it available for processing again.
520    ///
521    /// # Arguments
522    ///
523    /// * `id` - The UUID of the message to retry
524    ///
525    /// # Returns
526    ///
527    /// Returns a string indicating the result of the operation (typically "Success").
528    ///
529    /// # Examples
530    ///
531    /// ```no_run
532    /// use tlq_client::TlqClient;
533    ///
534    /// #[tokio::main]
535    /// async fn main() -> Result<(), tlq_client::TlqError> {
536    ///     let client = TlqClient::new("localhost", 1337)?;
537    ///
538    ///     if let Some(message) = client.get_message().await? {
539    ///         // Processing failed, retry the message
540    ///         let result = client.retry_message(message.id).await?;
541    ///         println!("Retry result: {}", result);
542    ///     }
543    ///
544    ///     Ok(())
545    /// }
546    /// ```
547    ///
548    /// # Errors
549    ///
550    /// * [`TlqError::Connection`] for network connectivity issues
551    /// * [`TlqError::Timeout`] if the request times out
552    /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
553    pub async fn retry_message(&self, id: Uuid) -> Result<String> {
554        self.retry_messages(&[id]).await
555    }
556
557    /// Retries multiple messages on the TLQ server.
558    ///
559    /// This method changes the state of the specified messages back to
560    /// [`MessageState::Ready`], making them available for processing again.
561    /// The retry count for each message will be incremented.
562    ///
563    /// # Arguments
564    ///
565    /// * `ids` - A slice of message UUIDs to retry (must not be empty)
566    ///
567    /// # Returns
568    ///
569    /// Returns a string indicating the result of the operation (typically "Success").
570    ///
571    /// # Examples
572    ///
573    /// ```no_run
574    /// use tlq_client::TlqClient;
575    ///
576    /// #[tokio::main]
577    /// async fn main() -> Result<(), tlq_client::TlqError> {
578    ///     let client = TlqClient::new("localhost", 1337)?;
579    ///
580    ///     let messages = client.get_messages(10).await?;
581    ///     let ids: Vec<_> = messages.iter().map(|m| m.id).collect();
582    ///
583    ///     if !ids.is_empty() {
584    ///         let result = client.retry_messages(&ids).await?;
585    ///         println!("Retry result: {}", result);
586    ///     }
587    ///
588    ///     Ok(())
589    /// }
590    /// ```
591    ///
592    /// # Errors
593    ///
594    /// * [`TlqError::Validation`] if the `ids` slice is empty
595    /// * [`TlqError::Connection`] for network connectivity issues
596    /// * [`TlqError::Timeout`] if the request times out
597    /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
598    pub async fn retry_messages(&self, ids: &[Uuid]) -> Result<String> {
599        if ids.is_empty() {
600            return Err(TlqError::Validation("No message IDs provided".to_string()));
601        }
602
603        let request = RetryMessagesRequest { ids: ids.to_vec() };
604        let response: String = self.request("/retry", &request).await?;
605        Ok(response)
606    }
607
608    /// Removes all messages from the TLQ server queue.
609    ///
610    /// This method permanently deletes all messages in the queue regardless of their state.
611    /// Use with caution as this operation cannot be undone.
612    ///
613    /// # Returns
614    ///
615    /// Returns a string indicating the result of the operation (typically "Success").
616    ///
617    /// # Examples
618    ///
619    /// ```no_run
620    /// use tlq_client::TlqClient;
621    ///
622    /// #[tokio::main]
623    /// async fn main() -> Result<(), tlq_client::TlqError> {
624    ///     let client = TlqClient::new("localhost", 1337)?;
625    ///
626    ///     // Clear all messages from the queue
627    ///     let result = client.purge_queue().await?;
628    ///     println!("Purge result: {}", result);
629    ///     
630    ///     Ok(())
631    /// }
632    /// ```
633    ///
634    /// # Errors
635    ///
636    /// * [`TlqError::Connection`] for network connectivity issues
637    /// * [`TlqError::Timeout`] if the request times out
638    /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
639    pub async fn purge_queue(&self) -> Result<String> {
640        let response: String = self.request("/purge", &serde_json::json!({})).await?;
641        Ok(response)
642    }
643
644    /// Retrieves queue statistics from the TLQ server.
645    ///
646    /// Returns counts of messages in each state (ready, processing) and
647    /// the cumulative count of dead messages removed by the reaper.
648    ///
649    /// # Returns
650    ///
651    /// Returns a [`QueueStats`] with the current queue statistics.
652    ///
653    /// # Examples
654    ///
655    /// ```no_run
656    /// use tlq_client::TlqClient;
657    ///
658    /// #[tokio::main]
659    /// async fn main() -> Result<(), tlq_client::TlqError> {
660    ///     let client = TlqClient::new("localhost", 1337)?;
661    ///
662    ///     let stats = client.stats().await?;
663    ///     println!("Ready: {}, Processing: {}, Dead: {}",
664    ///         stats.ready, stats.processing, stats.dead);
665    ///
666    ///     Ok(())
667    /// }
668    /// ```
669    ///
670    /// # Errors
671    ///
672    /// * [`TlqError::Connection`] for network connectivity issues
673    /// * [`TlqError::Timeout`] if the request times out
674    /// * [`TlqError::Server`] for server-side errors (4xx/5xx HTTP responses)
675    pub async fn stats(&self) -> Result<QueueStats> {
676        self.get_request("/stats").await
677    }
678
679    // Helper function to parse HTTP response - extracted for testing
680    fn parse_http_response(response: &str) -> Result<&str> {
681        if let Some(body_start) = response.find("\r\n\r\n") {
682            let headers = &response[..body_start];
683            let body = &response[body_start + 4..];
684
685            if let Some(status_line) = headers.lines().next() {
686                let parts: Vec<&str> = status_line.split_whitespace().collect();
687                if parts.len() >= 2 {
688                    if let Ok(status_code) = parts[1].parse::<u16>() {
689                        if status_code >= 400 {
690                            return Err(TlqError::Server {
691                                status: status_code,
692                                message: body.to_string(),
693                            });
694                        }
695                    }
696                }
697            }
698
699            Ok(body)
700        } else {
701            Err(TlqError::Connection("Invalid HTTP response".to_string()))
702        }
703    }
704}
705
706#[cfg(test)]
707mod tests {
708    use super::*;
709
710    #[test]
711    fn test_parse_http_response_success() {
712        let response =
713            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n{\"message\":\"success\"}";
714
715        let result = TlqClient::parse_http_response(response);
716        assert!(result.is_ok());
717        assert_eq!(result.unwrap(), "{\"message\":\"success\"}");
718    }
719
720    #[test]
721    fn test_parse_http_response_server_error() {
722        let response = "HTTP/1.1 500 Internal Server Error\r\nContent-Type: text/plain\r\n\r\nInternal server error occurred";
723
724        let result = TlqClient::parse_http_response(response);
725        match result {
726            Err(TlqError::Server { status, message }) => {
727                assert_eq!(status, 500);
728                assert_eq!(message, "Internal server error occurred");
729            }
730            _ => panic!("Expected server error"),
731        }
732    }
733
734    #[test]
735    fn test_parse_http_response_client_error() {
736        let response = "HTTP/1.1 400 Bad Request\r\nContent-Type: text/plain\r\n\r\nBad request";
737
738        let result = TlqClient::parse_http_response(response);
739        match result {
740            Err(TlqError::Server { status, message }) => {
741                assert_eq!(status, 400);
742                assert_eq!(message, "Bad request");
743            }
744            _ => panic!("Expected client error"),
745        }
746    }
747
748    #[test]
749    fn test_parse_http_response_no_headers_separator() {
750        let response =
751            "HTTP/1.1 200 OK\nContent-Type: application/json\n{\"incomplete\":\"response\"}";
752
753        let result = TlqClient::parse_http_response(response);
754        match result {
755            Err(TlqError::Connection(msg)) => {
756                assert_eq!(msg, "Invalid HTTP response");
757            }
758            _ => panic!("Expected connection error"),
759        }
760    }
761
762    #[test]
763    fn test_parse_http_response_malformed_status_line() {
764        let response = "INVALID_STATUS_LINE\r\n\r\n{\"data\":\"test\"}";
765
766        let result = TlqClient::parse_http_response(response);
767        // Should still succeed because we only check if parts.len() >= 2 and parse fails gracefully
768        assert!(result.is_ok());
769        assert_eq!(result.unwrap(), "{\"data\":\"test\"}");
770    }
771
772    #[test]
773    fn test_parse_http_response_empty_body() {
774        let response = "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
775
776        let result = TlqClient::parse_http_response(response);
777        assert!(result.is_ok());
778        assert_eq!(result.unwrap(), "");
779    }
780
781    #[test]
782    fn test_parse_http_response_with_extra_headers() {
783        let response = "HTTP/1.1 201 Created\r\nContent-Type: application/json\r\nServer: TLQ/1.0\r\nConnection: close\r\n\r\n{\"id\":\"123\",\"status\":\"created\"}";
784
785        let result = TlqClient::parse_http_response(response);
786        assert!(result.is_ok());
787        assert_eq!(result.unwrap(), "{\"id\":\"123\",\"status\":\"created\"}");
788    }
789
790    #[test]
791    fn test_parse_http_response_status_code_edge_cases() {
792        // Test various status codes around the 400 boundary
793
794        // 399 should be success (< 400)
795        let response_399 = "HTTP/1.1 399 Custom Success\r\n\r\n{\"ok\":true}";
796        let result = TlqClient::parse_http_response(response_399);
797        assert!(result.is_ok());
798
799        // 400 should be error (>= 400)
800        let response_400 = "HTTP/1.1 400 Bad Request\r\n\r\nBad request";
801        let result = TlqClient::parse_http_response(response_400);
802        assert!(matches!(result, Err(TlqError::Server { status: 400, .. })));
803
804        // 599 should be error
805        let response_599 = "HTTP/1.1 599 Custom Error\r\n\r\nCustom error";
806        let result = TlqClient::parse_http_response(response_599);
807        assert!(matches!(result, Err(TlqError::Server { status: 599, .. })));
808    }
809
810    #[test]
811    fn test_max_message_size_constant() {
812        assert_eq!(MAX_MESSAGE_SIZE, 65536);
813    }
814
815    #[test]
816    fn test_client_creation() {
817        let client = TlqClient::new("test-host", 9999);
818        assert!(client.is_ok());
819
820        let client = client.unwrap();
821        assert_eq!(client.base_url, "test-host:9999");
822    }
823
824    #[test]
825    fn test_client_with_config() {
826        let config = Config {
827            host: "custom-host".to_string(),
828            port: 8080,
829            timeout: Duration::from_secs(10),
830            max_retries: 5,
831            retry_delay: Duration::from_millis(200),
832        };
833
834        let client = TlqClient::with_config(config);
835        assert_eq!(client.base_url, "custom-host:8080");
836        assert_eq!(client.config.max_retries, 5);
837        assert_eq!(client.config.timeout, Duration::from_secs(10));
838    }
839
840    #[test]
841    fn test_message_size_validation() {
842        let _client = TlqClient::new("localhost", 1337).unwrap();
843
844        // Test exact limit
845        let message_at_limit = "x".repeat(MAX_MESSAGE_SIZE);
846        let result = std::panic::catch_unwind(|| {
847            // We can't actually test async methods in sync tests without tokio,
848            // but we can verify the constant is correct
849            assert_eq!(message_at_limit.len(), MAX_MESSAGE_SIZE);
850        });
851        assert!(result.is_ok());
852
853        // Test over limit
854        let message_over_limit = "x".repeat(MAX_MESSAGE_SIZE + 1);
855        assert_eq!(message_over_limit.len(), MAX_MESSAGE_SIZE + 1);
856    }
857
858    #[tokio::test]
859    async fn test_add_message_size_validation() {
860        let client = TlqClient::new("localhost", 1337).unwrap();
861
862        // Test message at exact size limit (should be rejected because it's over the limit)
863        let large_message = "x".repeat(MAX_MESSAGE_SIZE + 1);
864        let result = client.add_message(large_message).await;
865
866        match result {
867            Err(TlqError::MessageTooLarge { size }) => {
868                assert_eq!(size, MAX_MESSAGE_SIZE + 1);
869            }
870            _ => panic!("Expected MessageTooLarge error"),
871        }
872
873        // Test empty message (should be valid)
874        let empty_message = "";
875        // We can't actually test without a server, but we can verify it passes size validation
876        assert!(empty_message.len() <= MAX_MESSAGE_SIZE);
877
878        // Test message exactly at limit (should be valid)
879        let max_message = "x".repeat(MAX_MESSAGE_SIZE);
880        // Size check should pass
881        assert_eq!(max_message.len(), MAX_MESSAGE_SIZE);
882    }
883
884    #[tokio::test]
885    async fn test_get_messages_validation() {
886        let client = TlqClient::new("localhost", 1337).unwrap();
887
888        // Test zero count (should be rejected)
889        let result = client.get_messages(0).await;
890        match result {
891            Err(TlqError::Validation(msg)) => {
892                assert_eq!(msg, "Count must be greater than 0");
893            }
894            _ => panic!("Expected validation error for zero count"),
895        }
896
897        // Test valid counts - these should pass without validation errors
898        let _ = client.get_messages(1).await; // Should be valid
899        let _ = client.get_messages(100).await; // Should be valid
900        let _ = client.get_messages(u32::MAX).await; // Should be valid
901    }
902
903    #[tokio::test]
904    async fn test_delete_messages_validation() {
905        let client = TlqClient::new("localhost", 1337).unwrap();
906
907        // Test empty IDs array
908        let result = client.delete_messages(&[]).await;
909        match result {
910            Err(TlqError::Validation(msg)) => {
911                assert_eq!(msg, "No message IDs provided");
912            }
913            _ => panic!("Expected validation error for empty IDs"),
914        }
915
916        // Test delete_message (single ID) - should not have validation issue
917        use uuid::Uuid;
918        let test_id = Uuid::now_v7();
919        // We can't test the actual call without a server, but we can verify
920        // it would call delete_messages with a single-item array
921        assert!(!vec![test_id].is_empty());
922    }
923
924    #[tokio::test]
925    async fn test_retry_messages_validation() {
926        let client = TlqClient::new("localhost", 1337).unwrap();
927
928        // Test empty IDs array
929        let result = client.retry_messages(&[]).await;
930        match result {
931            Err(TlqError::Validation(msg)) => {
932                assert_eq!(msg, "No message IDs provided");
933            }
934            _ => panic!("Expected validation error for empty IDs"),
935        }
936
937        // Test retry_message (single ID) - should not have validation issue
938        use uuid::Uuid;
939        let test_id = Uuid::now_v7();
940        // We can't test the actual call without a server, but we can verify
941        // it would call retry_messages with a single-item array
942        assert!(!vec![test_id].is_empty());
943    }
944
945    #[test]
946    fn test_client_builder_edge_cases() {
947        // Test builder with minimum values
948        let client = TlqClient::builder()
949            .host("")
950            .port(0)
951            .timeout_ms(0)
952            .max_retries(0)
953            .retry_delay_ms(0)
954            .build();
955
956        assert_eq!(client.base_url, ":0");
957        assert_eq!(client.config.max_retries, 0);
958        assert_eq!(client.config.timeout, Duration::from_millis(0));
959
960        // Test builder with maximum reasonable values
961        let client = TlqClient::builder()
962            .host("very-long-hostname-that-might-be-used-in-some-environments")
963            .port(65535)
964            .timeout_ms(600000) // 10 minutes
965            .max_retries(100)
966            .retry_delay_ms(10000) // 10 seconds
967            .build();
968
969        assert!(client.base_url.contains("very-long-hostname"));
970        assert_eq!(client.config.max_retries, 100);
971        assert_eq!(client.config.timeout, Duration::from_secs(600));
972    }
973
974    #[test]
975    fn test_config_validation() {
976        use crate::config::ConfigBuilder;
977        use std::time::Duration;
978
979        // Test various duration configurations
980        let config1 = ConfigBuilder::new()
981            .timeout(Duration::from_nanos(1))
982            .build_config();
983        assert_eq!(config1.timeout, Duration::from_nanos(1));
984
985        let config2 = ConfigBuilder::new()
986            .retry_delay(Duration::from_secs(3600)) // 1 hour
987            .build_config();
988        assert_eq!(config2.retry_delay, Duration::from_secs(3600));
989
990        // Test edge case ports
991        let config3 = ConfigBuilder::new().port(1).build_config();
992        assert_eq!(config3.port, 1);
993
994        let config4 = ConfigBuilder::new().port(65535).build_config();
995        assert_eq!(config4.port, 65535);
996
997        // Test very high retry counts
998        let config5 = ConfigBuilder::new().max_retries(1000).build_config();
999        assert_eq!(config5.max_retries, 1000);
1000    }
1001}