async_duplex_channel/
lib.rs

1//! # Async Duplex Channel Library
2//!
3//! This library provides an asynchronous duplex communication channel between multiple clients and a single responder
4//! in different asynchronous blocks. It enables multiple clients to communicate with a single responder,
5//! allowing a client to send requests and receive responses, while the responder processes requests
6//! and sends back responses. It is designed for high-concurrency scenarios and supports both single
7//! and batch requests, as well as configurable timeouts and concurrency strategies.
8//!
9//! ## Key Features
10//! - **Asynchronous Communication**: Built on top of `tokio` and `futures`, enabling non-blocking I/O.
11//! - **Thread Safety**: Uses `Arc` and `Mutex` to ensure thread-safe communication.
12//! - **Flexible Concurrency**: Supports both fixed and dynamic concurrency strategies for request processing.
13//! - **Batch Requests**: Allows sending multiple requests in a batch with configurable concurrency.
14//! - **Configurable Timeouts**: Provides customizable timeouts for requests and responses.
15//! - **Error Handling**: Detailed error types for easy debugging and error recovery.
16//!
17//! ## Usage
18//!
19//! ### Basic Example
20//!
21//! ```rust
22//! use async_duplex_channel::{channel, Client, ResponderBuilder, ConcurrencyStrategy};
23//! use std::time::Duration;
24//! use tokio::runtime::Runtime;
25//!
26//! fn basic_example() {
27//!     let rt = Runtime::new().unwrap();
28//!     rt.block_on(async {
29//!         // Create a channel with a buffer size of 64 and a default timeout of 2 seconds.
30//!         let (mut client, responder_builder) = channel(64, Duration::from_secs(2));
31//!
32//!         // Build the responder with a simple echo handler.
33//!         let responder = responder_builder.build(|req: String| async move {
34//!             req
35//!         });
36//!
37//!         // Spawn the responder to process requests.
38//!         tokio::spawn(async move {
39//!             responder.process_requests().await.unwrap();
40//!         });
41//!
42//!         // Send a request and wait for the response.
43//!         let response = client.request("Hello, world!".to_string()).await.unwrap();
44//!         println!("Response: {}", response);
45//!     });
46//! }
47//! ```
48//!
49//! ### Batch Requests Example
50//!
51//! ```rust
52//! use async_duplex_channel::{channel, Client, ResponderBuilder, ConcurrencyStrategy};
53//! use std::time::Duration;
54//! use tokio::runtime::Runtime;
55//!
56//! fn batch_requests() {
57//!     let rt = Runtime::new().unwrap();
58//!     rt.block_on(async {
59//!         // Create a channel with a buffer size of 64 and a default timeout of 2 seconds.
60//!         let (mut client, responder_builder) = channel(64, Duration::from_secs(2));
61//!
62//!         // Build the responder with a simple echo handler.
63//!         let responder = responder_builder.build(|req: String| async move {
64//!             req
65//!         });
66//!
67//!         // Spawn the responder to process requests.
68//!         tokio::spawn(async move {
69//!             responder.process_requests().await.unwrap();
70//!         });
71//!
72//!         // Send a batch of requests with a concurrency limit of 4.
73//!         let requests = vec!["Request 1".to_string(), "Request 2".to_string()];
74//!         let responses = client.request_batch(requests, 4).await.unwrap();
75//!         for response in responses {
76//!             println!("Response: {}", response);
77//!         }
78//!     });
79//! }
80//! ```
81//!
82//! ### Dynamic Concurrency Example
83//!
84//! ```rust
85//! use async_duplex_channel::{channel, Client, ResponderBuilder, ConcurrencyStrategy};
86//! use std::time::Duration;
87//! use tokio::runtime::Runtime;
88//!
89//! fn dynamic_concurrency() {
90//!     let rt = Runtime::new().unwrap();
91//!     rt.block_on(async {
92//!         // Create a channel with a buffer size of 64 and a default timeout of 2 seconds.
93//!         let (mut client, responder_builder) = channel(64, Duration::from_secs(2));
94//!
95//!         // Build the responder with a simple echo handler.
96//!         let responder = responder_builder.build(|req: String| async move {
97//!             req
98//!         });
99//!
100//!         // Spawn the responder to process requests with dynamic concurrency.
101//!         tokio::spawn(async move {
102//!             responder
103//!                 .process_requests_with_strategy(ConcurrencyStrategy::Dynamic(4, 16))
104//!                 .await
105//!                 .unwrap();
106//!         });
107//!
108//!         // Send a request and wait for the response.
109//!         let response = client.request("Hello, world!".to_string()).await.unwrap();
110//!         println!("Response: {}", response);
111//!     });
112//! }
113//! ```
114//!
115//! ## Error Handling
116//!
117//! The library provides detailed error types through the `Error` enum, which covers common failure
118//! scenarios such as send failures, receive failures, timeouts, and internal errors. Each error variant
119//! includes a descriptive message for easy debugging.
120//!
121//! ## Concurrency Strategies
122//!
123//! The responder supports two concurrency strategies:
124//! - **Fixed Concurrency**: Processes a fixed number of requests concurrently.
125//! - **Dynamic Concurrency**: Adjusts the concurrency level based on response times, ensuring optimal
126//!   resource utilization under varying workloads.
127//!
128//! ## Performance Considerations
129//! - Use `request_batch` for high-throughput scenarios to reduce overhead.
130//! - Choose the appropriate concurrency strategy based on your workload characteristics.
131//! - Monitor and adjust timeouts to balance responsiveness and resource usage.
132//!
133//! ## Dependencies
134//! - `tokio`: For asynchronous runtime support.
135//! - `futures`: For future and stream utilities.
136//! - `sharded_slab`: For efficient storage of pending requests.
137//! - `thiserror`: For ergonomic error handling.
138//!
139//! ## License
140//! This library is licensed under the MIT License. See the LICENSE file for details.
141
142use std::pin::Pin;
143use std::sync::atomic::{AtomicUsize, Ordering};
144use std::sync::Arc;
145use std::time::{Duration, Instant};
146
147use future::FutureExt;
148use futures::channel::{mpsc, oneshot};
149use futures::future::join_all;
150use futures::prelude::*;
151
152use sharded_slab::Slab;
153
154use tokio::sync::Mutex;
155use tokio::time::sleep;
156
157use thiserror::Error;
158
159/// Represents errors that can occur during the operation of the client or responder.
160#[derive(Debug, Error)]
161pub enum Error {
162    #[error("Send failed: {0}")]
163    SendFailed(String),
164    #[error("Receive failed: {0}")]
165    ReceiveFailed(String),
166    #[error("Response timeout")]
167    Timeout,
168    #[error("Send response failed: {0}")]
169    SendResponseFailed(String),
170    #[error("Internal error: {0}")]
171    InternalError(String),
172}
173
174/// A client for sending requests and receiving responses.
175///
176/// The client is thread-safe and can be cloned to share across multiple tasks.
177/// It supports both single and batch requests, with configurable timeouts.
178///
179/// # Type Parameters
180/// - `Req`: The type of the request, must be `Send + Sync + 'static`.
181/// - `Resp`: The type of the response, must be `Send + Sync + 'static`.
182#[derive(Clone)]
183pub struct Client<Req, Resp>
184where
185    Req: Send + Sync + 'static,
186    Resp: Send + Sync + 'static,
187{
188    tx: Arc<Mutex<mpsc::Sender<(usize, Req)>>>,
189    pending: Arc<Slab<oneshot::Sender<Resp>>>,
190    timeout: Duration,
191}
192
193/// A builder for creating a responder.
194///
195/// The responder processes incoming requests and sends back responses.
196/// It supports both fixed and dynamic concurrency strategies.
197///
198/// # Type Parameters
199/// - `Req`: The type of the request, must be `Send + Sync + 'static`.
200/// - `Resp`: The type of the response, must be `Send + Sync + 'static`.
201pub struct ResponderBuilder<Req, Resp>
202where
203    Req: Send + Sync + 'static,
204    Resp: Send + Sync + 'static,
205{
206    req_rx: mpsc::Receiver<(usize, Req)>,
207    resp_tx: mpsc::Sender<(usize, Resp)>,
208}
209
210impl<Req, Resp> ResponderBuilder<Req, Resp>
211where
212    Req: Send + Sync + 'static,
213    Resp: Send + Sync + 'static,
214{
215    /// Builds a responder with the provided request handler.
216    ///
217    /// # Parameters
218    /// - `handler`: A function that processes a request and returns a future resolving to a response.
219    ///
220    /// # Returns
221    /// A `Responder` instance ready to process requests.
222    pub fn build<
223        Fut: Future<Output = Resp> + Send + 'static,
224        F: (FnMut(Req) -> Fut) + Send + 'static,
225    >(
226        self,
227        mut handler: F,
228    ) -> Responder<Req, Resp> {
229        // let hdl = Box::pin(move |req: Req| {
230
231        // });
232        let hdl = move |req: Req| {
233            let pinned_fut: InnerPinBoxFuture<Resp> = Box::pin(handler(req));
234            pinned_fut
235        };
236        let handler: PinBoxHandler<Req, Resp> = Box::pin(hdl);
237        Responder {
238            req_rx: self.req_rx,
239            resp_tx: self.resp_tx,
240            handler,
241        }
242    }
243}
244
245type InnerPinBoxFuture<Resp> = Pin<Box<dyn Future<Output = Resp> + Send>>;
246type PinBoxHandler<Req, Resp> = Pin<Box<dyn (FnMut(Req) -> InnerPinBoxFuture<Resp>) + Send>>;
247
248/// A responder that processes incoming requests and sends back responses.
249///
250/// The responder supports both fixed and dynamic concurrency strategies.
251///
252/// # Type Parameters
253/// - `Req`: The type of the request, must be `Send + Sync + 'static`.
254/// - `Resp`: The type of the response, must be `Send + Sync + 'static`.
255/// - `Fut`: The future returned by the request handler.
256/// - `F`: The type of the request handler function.
257pub struct Responder<Req, Resp>
258where
259    Req: Send + Sync + 'static,
260    Resp: Send + Sync + 'static,
261    // Fut: Future<Output = Resp> + Send,
262    // F: (FnMut(Req) -> Fut) + Send + Sync,
263{
264    req_rx: mpsc::Receiver<(usize, Req)>,
265    resp_tx: mpsc::Sender<(usize, Resp)>,
266    handler: PinBoxHandler<Req, Resp>,
267}
268
269/// Represents the concurrency strategy for processing requests.
270///
271/// - `Fixed`: A fixed number of concurrent requests.
272/// - `Dynamic`: Dynamically adjusts the concurrency based on response times.
273#[derive(Debug, Clone, Copy)]
274pub enum ConcurrencyStrategy {
275    Fixed(usize),
276    Dynamic(usize, usize),
277}
278
279impl<Req, Resp> Client<Req, Resp>
280where
281    Req: Send + Sync + 'static,
282    Resp: Send + Sync + 'static,
283{
284    /// Sends a single request with a custom timeout.
285    ///
286    /// # Parameters
287    /// - `req`: The request to send.
288    /// - `timeout`: The maximum duration to wait for a response.
289    ///
290    /// # Returns
291    /// - `Ok(Resp)`: The response from the server.
292    /// - `Err(Error)`: An error if the request fails or times out.
293    pub async fn request_timeout(&self, req: Req, timeout: Duration) -> Result<Resp, Error> {
294        let (tx, rx) = oneshot::channel();
295
296        let id = match self.pending.insert(tx) {
297            Some(id) => id,
298            None => {
299                return Err(Error::InternalError(
300                    "Failed to insert into pending slab".into(),
301                ));
302            }
303        };
304
305        self.tx
306            .lock()
307            .await
308            .send((id, req))
309            .map_err(|e| Error::SendFailed(e.to_string()))
310            .await?;
311
312        let pending = self.pending.clone();
313        tokio::select! {
314            resp = rx => {
315                resp.map_err(|e| Error::ReceiveFailed(e.to_string()))
316            },
317            _ = sleep(timeout) => {
318                pending.remove(id);
319                Err(Error::Timeout)
320            }
321        }
322    }
323
324    /// Sends a single request with the default timeout.
325    ///
326    /// # Parameters
327    /// - `req`: The request to send.
328    ///
329    /// # Returns
330    /// - `Ok(Resp)`: The response from the server.
331    /// - `Err(Error)`: An error if the request fails or times out.
332    pub async fn request(&self, req: Req) -> Result<Resp, Error> {
333        self.request_timeout(req, self.timeout).await
334    }
335
336    /// Sends a batch of requests with a custom timeout and concurrency limit.
337    ///
338    /// # Parameters
339    /// - `reqs`: An iterator of requests to send.
340    /// - `timeout`: The maximum duration to wait for all responses.
341    /// - `concurrency`: The maximum number of concurrent requests.
342    ///
343    /// # Returns
344    /// - `Ok(Vec<Resp>)`: A vector of responses from the server.
345    /// - `Err(Error)`: An error if any request fails or times out.
346    pub async fn request_batch_timeout<ReqSeq>(
347        &self,
348        reqs: ReqSeq,
349        timeout: Duration,
350        concurrency: usize,
351    ) -> Result<Vec<Resp>, Error>
352    where
353        ReqSeq: IntoIterator<Item = Req> + Send + 'static,
354    {
355        let req_seq: Vec<_> = reqs.into_iter().collect();
356        let count = req_seq.len();
357
358        let mut ids: Vec<usize> = Vec::with_capacity(count);
359        let mut rxs: Vec<oneshot::Receiver<Resp>> = Vec::with_capacity(count);
360
361        for _ in 0..count {
362            let (tx, rx) = oneshot::channel();
363            let id = self.pending.insert(tx).unwrap();
364
365            ids.push(id);
366            rxs.push(rx);
367        }
368
369        stream::iter(ids.clone().into_iter().zip(req_seq.into_iter()))
370            .for_each_concurrent(concurrency, |v| async {
371                self.tx.lock().await.send(v).await.unwrap();
372            })
373            .await;
374
375        tokio::select! {
376            results = join_all(rxs) => {
377                let results = results
378                    .into_iter()
379                    .collect::<Result<Vec<_>, _>>()
380                    .map_err(|e| Error::ReceiveFailed(e.to_string()))?;
381                Ok(results)
382            },
383            _ = sleep(timeout) => {
384                for &id in ids.iter() {
385                    self.pending.remove(id);
386                }
387                Err(Error::Timeout)
388            }
389        }
390    }
391
392    /// Sends a batch of requests with the default timeout and a concurrency limit.
393    ///
394    /// # Parameters
395    /// - `reqs`: An iterator of requests to send.
396    /// - `concurrency`: The maximum number of concurrent requests.
397    ///
398    /// # Returns
399    /// - `Ok(Vec<Resp>)`: A vector of responses from the server.
400    /// - `Err(Error)`: An error if any request fails or times out.
401    pub async fn request_batch<ReqSeq>(
402        &self,
403        reqs: ReqSeq,
404        concurrency: usize,
405    ) -> Result<Vec<Resp>, Error>
406    where
407        ReqSeq: IntoIterator<Item = Req> + Send + 'static,
408    {
409        self.request_batch_timeout(reqs, self.timeout * (concurrency as u32), concurrency)
410            .await
411    }
412}
413
414impl<Req, Resp> Responder<Req, Resp>
415where
416    Req: Send + Sync + 'static,
417    Resp: Send + Sync + 'static,
418{
419    /// Processes incoming requests using the specified concurrency strategy.
420    ///
421    /// # Parameters
422    /// - `strategy`: The concurrency strategy to use (`Fixed` or `Dynamic`).
423    ///
424    /// # Returns
425    /// - `Ok(())`: If all requests are processed successfully.
426    /// - `Err(Error)`: If an error occurs during processing.
427    pub async fn process_requests_with_strategy(
428        self,
429        strategy: ConcurrencyStrategy,
430    ) -> Result<(), Error> {
431        match strategy {
432            ConcurrencyStrategy::Fixed(concurrency) => {
433                self.process_requests_fixed(concurrency).await
434            }
435            ConcurrencyStrategy::Dynamic(initial, max) => {
436                self.process_requests_dynamic(initial, max).await
437            }
438        }
439    }
440
441    /// Processes incoming requests with a fixed concurrency of 16.
442    ///
443    /// # Returns
444    /// - `Ok(())`: If all requests are processed successfully.
445    /// - `Err(Error)`: If an error occurs during processing.
446    pub async fn process_requests(self) -> Result<(), Error> {
447        self.process_requests_fixed(16).await
448    }
449
450    /// Processes incoming requests with a fixed concurrency.
451    ///
452    /// # Parameters
453    /// - `concurrency`: The number of concurrent requests to process.
454    ///
455    /// # Returns
456    /// - `Ok(())`: If all requests are processed successfully.
457    /// - `Err(Error)`: If an error occurs during processing.
458    pub async fn process_requests_fixed(self, concurrency: usize) -> Result<(), Error> {
459        let Self {
460            req_rx,
461            resp_tx,
462            mut handler,
463        } = self;
464        req_rx
465            .map(move |(id, req)| {
466                let hdl = unsafe { handler.as_mut().get_unchecked_mut() };
467                let fut = hdl(req);
468                fut.map(move |resp| Ok((id, resp)))
469            })
470            .buffer_unordered(concurrency)
471            .forward(resp_tx)
472            .map_err(|e| Error::SendResponseFailed(e.to_string()))
473            .await?;
474
475        Ok(())
476    }
477
478    /// Processes incoming requests with dynamic concurrency adjustment.
479    ///
480    /// The concurrency is adjusted based on the response time of requests.
481    ///
482    /// # Parameters
483    /// - `initial_concurrency`: The initial number of concurrent requests.
484    /// - `max_concurrency`: The maximum number of concurrent requests.
485    ///
486    /// # Returns
487    /// - `Ok(())`: If all requests are processed successfully.
488    /// - `Err(Error)`: If an error occurs during processing.
489    pub async fn process_requests_dynamic(
490        self,
491        initial_concurrency: usize,
492        max_concurrency: usize,
493    ) -> Result<(), Error> {
494        let Self {
495            req_rx,
496            resp_tx,
497            mut handler,
498        } = self;
499
500        let concurrency = Arc::new(AtomicUsize::new(initial_concurrency));
501
502        let concurrency_cloned = concurrency.clone();
503        req_rx
504            .map(move |(id, req)| {
505                let concurrency = Arc::clone(&concurrency);
506                let start = Instant::now();
507
508                let hdl = unsafe { handler.as_mut().get_unchecked_mut() };
509
510                let fut = hdl(req);
511
512                fut.map(move |resp| {
513                    let dur = start.elapsed();
514                    let currency = concurrency.load(Ordering::Relaxed);
515                    if dur < Duration::from_millis(10) {
516                        if currency < max_concurrency {
517                            let increment = std::cmp::max(1, currency / 4);
518                            let new_value =
519                                std::cmp::min(max_concurrency, currency.saturating_add(increment));
520                            concurrency.store(new_value, Ordering::Relaxed);
521                        }
522                    } else if dur < Duration::from_millis(100) {
523                        if currency > 1 {
524                            concurrency.fetch_sub(1, Ordering::Relaxed);
525                        }
526                    } else {
527                        let decrement = std::cmp::max(2, currency / 4);
528                        let new_value = std::cmp::min(1, currency.saturating_sub(decrement));
529                        concurrency.store(new_value, Ordering::Relaxed);
530                    }
531                    Ok((id, resp))
532                })
533            })
534            .buffer_unordered(concurrency_cloned.load(Ordering::Relaxed))
535            .forward(resp_tx)
536            .map_err(|e| Error::SendResponseFailed(e.to_string()))
537            .await?;
538
539        Ok(())
540    }
541}
542
543/// Creates a new client-responder pair with the specified buffer size and timeout.
544///
545/// # Parameters
546/// - `buffer`: The size of the request and response buffers.
547/// - `timeout`: The default timeout for requests.
548///
549/// # Returns
550/// A tuple containing a `Client` and a `ResponderBuilder`.
551pub fn channel<Req, Resp>(
552    buffer: usize,
553    timeout: Duration,
554) -> (Client<Req, Resp>, ResponderBuilder<Req, Resp>)
555where
556    Req: Send + Sync + 'static,
557    Resp: Send + Sync + 'static,
558{
559    let (req_tx, req_rx) = mpsc::channel(buffer);
560    let (resp_tx, resp_rx) = mpsc::channel(buffer);
561
562    let client = Client {
563        tx: Arc::new(Mutex::new(req_tx)),
564        pending: Arc::new(Slab::new()),
565        timeout,
566    };
567
568    let responder_builder = ResponderBuilder { req_rx, resp_tx };
569
570    let pending = client.pending.clone();
571
572    tokio::spawn(async move {
573        resp_rx
574            .for_each_concurrent(64, |(id, res)| {
575                let pending = pending.clone();
576                async move {
577                    if let Some(tx) = pending.take(id) {
578                        let _ = tx.send(res);
579                    }
580                }
581            })
582            .await;
583    });
584
585    (client, responder_builder)
586}
587
588#[cfg(test)]
589mod tests {
590    use super::*;
591    use std::time::Duration;
592    use tokio::runtime::Runtime;
593
594    /// Test sending a single request and receiving a response.
595    #[test]
596    fn test_single_request() {
597        let rt = Runtime::new().unwrap();
598        rt.block_on(async {
599            let (client, responder_builder) = channel(64, Duration::from_secs(2));
600            let responder = responder_builder.build(|req: String| async move { req });
601
602            tokio::spawn(async move {
603                responder.process_requests().await.unwrap();
604            });
605
606            let response = client.request("Hello, world!".to_string()).await.unwrap();
607            assert_eq!(response, "Hello, world!");
608        });
609    }
610
611    /// Test sending a batch of requests and receiving responses.
612    #[test]
613    fn test_batch_requests() {
614        let rt = Runtime::new().unwrap();
615        rt.block_on(async {
616            let (client, responder_builder) = channel(64, Duration::from_secs(2));
617            let responder = responder_builder.build(|req: String| async move { req });
618
619            tokio::spawn(async move {
620                responder.process_requests().await.unwrap();
621            });
622
623            let requests = vec!["Request 1".to_string(), "Request 2".to_string()];
624            let responses = client.request_batch(requests, 4).await.unwrap();
625            assert_eq!(
626                responses,
627                vec!["Request 1".to_string(), "Request 2".to_string()]
628            );
629        });
630    }
631
632    /// Test processing requests with fixed concurrency.
633    #[test]
634    fn test_fixed_concurrency() {
635        let rt = Runtime::new().unwrap();
636        rt.block_on(async {
637            let (client, responder_builder) = channel(64, Duration::from_secs(2));
638            let responder = responder_builder.build(|req: String| async move { req });
639
640            tokio::spawn(async move {
641                responder
642                    .process_requests_with_strategy(ConcurrencyStrategy::Fixed(16))
643                    .await
644                    .unwrap();
645            });
646
647            let response = client.request("Hello, world!".to_string()).await.unwrap();
648            assert_eq!(response, "Hello, world!");
649        });
650    }
651
652    /// Test processing requests with dynamic concurrency.
653    #[test]
654    fn test_dynamic_concurrency() {
655        let rt = Runtime::new().unwrap();
656        rt.block_on(async {
657            let (client, responder_builder) = channel(64, Duration::from_secs(2));
658            let responder = responder_builder.build(|req: String| async move { req });
659
660            tokio::spawn(async move {
661                responder
662                    .process_requests_with_strategy(ConcurrencyStrategy::Dynamic(4, 16))
663                    .await
664                    .unwrap();
665            });
666
667            let response = client.request("Hello, world!".to_string()).await.unwrap();
668            assert_eq!(response, "Hello, world!");
669        });
670    }
671
672    /// Test sending a single request with a custom timeout.
673    #[test]
674    fn test_single_request_timeout() {
675        let rt = Runtime::new().unwrap();
676        rt.block_on(async {
677            let (client, responder_builder) = channel(64, Duration::from_secs(2));
678            let responder = responder_builder.build(|req: String| async move { req });
679
680            tokio::spawn(async move {
681                responder.process_requests().await.unwrap();
682            });
683
684            let response = client
685                .request_timeout("Hello, world!".to_string(), Duration::from_secs(1))
686                .await
687                .unwrap();
688            assert_eq!(response, "Hello, world!");
689        });
690    }
691
692    /// Test sending a batch of requests with a custom timeout.
693    #[test]
694    fn test_batch_requests_timeout() {
695        let rt = Runtime::new().unwrap();
696        rt.block_on(async {
697            let (client, responder_builder) = channel(64, Duration::from_secs(2));
698            let responder = responder_builder.build(|req: String| async move { req });
699
700            tokio::spawn(async move {
701                responder.process_requests().await.unwrap();
702            });
703
704            let requests = vec!["Request 1".to_string(), "Request 2".to_string()];
705            let responses = client
706                .request_batch_timeout(requests, Duration::from_secs(1), 4)
707                .await
708                .unwrap();
709            assert_eq!(
710                responses,
711                vec!["Request 1".to_string(), "Request 2".to_string()]
712            );
713        });
714    }
715
716    /// Test error handling for a failed request.
717    #[test]
718    fn test_request_failure() {
719        let rt = Runtime::new().unwrap();
720        rt.block_on(async {
721            let (client, responder_builder) = channel(64, Duration::from_secs(2));
722            let responder = responder_builder.build(|req: String| async move {
723                if req == "fail" {
724                    Err(Error::InternalError("Request failed".into()))
725                } else {
726                    Ok(req)
727                }
728            });
729
730            tokio::spawn(async move {
731                responder.process_requests().await.unwrap();
732            });
733
734            let result = client.request("fail".to_string()).await.unwrap();
735            assert!(matches!(result, Err(Error::InternalError(_))));
736        });
737    }
738
739    /// Test error handling for a timeout.
740    #[test]
741    fn test_request_timeout() {
742        let rt = Runtime::new().unwrap();
743        rt.block_on(async {
744            let (client, responder_builder) = channel(64, Duration::from_secs(2));
745            let responder = responder_builder.build(|req: String| async move {
746                tokio::time::sleep(Duration::from_secs(3)).await;
747                req
748            });
749
750            tokio::spawn(async move {
751                responder.process_requests().await.unwrap();
752            });
753
754            let result = client
755                .request_timeout("Hello, world!".to_string(), Duration::from_secs(1))
756                .await;
757            assert!(matches!(result, Err(Error::Timeout)));
758        });
759    }
760}