async_duplex_channel/lib.rs
1//! # Async Duplex Channel Library
2//!
3//! This library provides an asynchronous duplex communication channel between multiple clients and a single responder
4//! in different asynchronous blocks. It enables multiple clients to communicate with a single responder,
5//! allowing a client to send requests and receive responses, while the responder processes requests
6//! and sends back responses. It is designed for high-concurrency scenarios and supports both single
7//! and batch requests, as well as configurable timeouts and concurrency strategies.
8//!
9//! ## Key Features
10//! - **Asynchronous Communication**: Built on top of `tokio` and `futures`, enabling non-blocking I/O.
11//! - **Thread Safety**: Uses `Arc` and `Mutex` to ensure thread-safe communication.
12//! - **Flexible Concurrency**: Supports both fixed and dynamic concurrency strategies for request processing.
13//! - **Batch Requests**: Allows sending multiple requests in a batch with configurable concurrency.
14//! - **Configurable Timeouts**: Provides customizable timeouts for requests and responses.
15//! - **Error Handling**: Detailed error types for easy debugging and error recovery.
16//!
17//! ## Usage
18//!
19//! ### Basic Example
20//!
21//! ```rust
22//! use async_duplex_channel::{channel, Client, ResponderBuilder, ConcurrencyStrategy};
23//! use std::time::Duration;
24//! use tokio::runtime::Runtime;
25//!
26//! fn basic_example() {
27//! let rt = Runtime::new().unwrap();
28//! rt.block_on(async {
29//! // Create a channel with a buffer size of 64 and a default timeout of 2 seconds.
//! let (client, responder_builder) = channel(64, Duration::from_secs(2));
31//!
32//! // Build the responder with a simple echo handler.
33//! let responder = responder_builder.build(|req: String| async move {
34//! req
35//! });
36//!
37//! // Spawn the responder to process requests.
38//! tokio::spawn(async move {
39//! responder.process_requests().await.unwrap();
40//! });
41//!
42//! // Send a request and wait for the response.
43//! let response = client.request("Hello, world!".to_string()).await.unwrap();
44//! println!("Response: {}", response);
45//! });
46//! }
47//! ```
48//!
49//! ### Batch Requests Example
50//!
51//! ```rust
52//! use async_duplex_channel::{channel, Client, ResponderBuilder, ConcurrencyStrategy};
53//! use std::time::Duration;
54//! use tokio::runtime::Runtime;
55//!
56//! fn batch_requests() {
57//! let rt = Runtime::new().unwrap();
58//! rt.block_on(async {
59//! // Create a channel with a buffer size of 64 and a default timeout of 2 seconds.
//! let (client, responder_builder) = channel(64, Duration::from_secs(2));
61//!
62//! // Build the responder with a simple echo handler.
63//! let responder = responder_builder.build(|req: String| async move {
64//! req
65//! });
66//!
67//! // Spawn the responder to process requests.
68//! tokio::spawn(async move {
69//! responder.process_requests().await.unwrap();
70//! });
71//!
72//! // Send a batch of requests with a concurrency limit of 4.
73//! let requests = vec!["Request 1".to_string(), "Request 2".to_string()];
74//! let responses = client.request_batch(requests, 4).await.unwrap();
75//! for response in responses {
76//! println!("Response: {}", response);
77//! }
78//! });
79//! }
80//! ```
81//!
82//! ### Dynamic Concurrency Example
83//!
84//! ```rust
85//! use async_duplex_channel::{channel, Client, ResponderBuilder, ConcurrencyStrategy};
86//! use std::time::Duration;
87//! use tokio::runtime::Runtime;
88//!
89//! fn dynamic_concurrency() {
90//! let rt = Runtime::new().unwrap();
91//! rt.block_on(async {
92//! // Create a channel with a buffer size of 64 and a default timeout of 2 seconds.
//! let (client, responder_builder) = channel(64, Duration::from_secs(2));
94//!
95//! // Build the responder with a simple echo handler.
96//! let responder = responder_builder.build(|req: String| async move {
97//! req
98//! });
99//!
100//! // Spawn the responder to process requests with dynamic concurrency.
101//! tokio::spawn(async move {
102//! responder
103//! .process_requests_with_strategy(ConcurrencyStrategy::Dynamic(4, 16))
104//! .await
105//! .unwrap();
106//! });
107//!
108//! // Send a request and wait for the response.
109//! let response = client.request("Hello, world!".to_string()).await.unwrap();
110//! println!("Response: {}", response);
111//! });
112//! }
113//! ```
114//!
115//! ## Error Handling
116//!
117//! The library provides detailed error types through the `Error` enum, which covers common failure
118//! scenarios such as send failures, receive failures, timeouts, and internal errors. Each error variant
119//! includes a descriptive message for easy debugging.
120//!
121//! ## Concurrency Strategies
122//!
123//! The responder supports two concurrency strategies:
124//! - **Fixed Concurrency**: Processes a fixed number of requests concurrently.
125//! - **Dynamic Concurrency**: Adjusts the concurrency level based on response times, ensuring optimal
126//! resource utilization under varying workloads.
127//!
128//! ## Performance Considerations
129//! - Use `request_batch` for high-throughput scenarios to reduce overhead.
130//! - Choose the appropriate concurrency strategy based on your workload characteristics.
131//! - Monitor and adjust timeouts to balance responsiveness and resource usage.
132//!
133//! ## Dependencies
134//! - `tokio`: For asynchronous runtime support.
135//! - `futures`: For future and stream utilities.
136//! - `sharded_slab`: For efficient storage of pending requests.
137//! - `thiserror`: For ergonomic error handling.
138//!
139//! ## License
140//! This library is licensed under the MIT License. See the LICENSE file for details.
141
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use futures::channel::{mpsc, oneshot};
use futures::future::{join_all, FutureExt};
use futures::prelude::*;

use sharded_slab::Slab;

use thiserror::Error;

use tokio::sync::Mutex;
use tokio::time::sleep;
158
/// Represents errors that can occur during the operation of the client or responder.
#[derive(Debug, Error)]
pub enum Error {
    /// Delivering a request to the responder failed (e.g. the request channel closed).
    #[error("Send failed: {0}")]
    SendFailed(String),
    /// The response channel was cancelled before a response arrived.
    #[error("Receive failed: {0}")]
    ReceiveFailed(String),
    /// No response arrived within the allotted time.
    #[error("Response timeout")]
    Timeout,
    /// Forwarding a response back to the client side failed.
    #[error("Send response failed: {0}")]
    SendResponseFailed(String),
    /// A library invariant failed (e.g. the pending-request slab refused an insert).
    #[error("Internal error: {0}")]
    InternalError(String),
}
173
/// A client for sending requests and receiving responses.
///
/// The client is thread-safe and can be cloned to share across multiple tasks.
/// It supports both single and batch requests, with configurable timeouts.
///
/// # Type Parameters
/// - `Req`: The type of the request, must be `Send + Sync + 'static`.
/// - `Resp`: The type of the response, must be `Send + Sync + 'static`.
#[derive(Clone)]
pub struct Client<Req, Resp>
where
    Req: Send + Sync + 'static,
    Resp: Send + Sync + 'static,
{
    // Request pipe to the responder; the mutex serializes concurrent senders
    // across cloned clients.
    tx: Arc<Mutex<mpsc::Sender<(usize, Req)>>>,
    // Response slots keyed by request id; filled by the dispatcher task
    // spawned in `channel`.
    pending: Arc<Slab<oneshot::Sender<Resp>>>,
    // Default timeout used by `request` / `request_batch`.
    timeout: Duration,
}
192
/// A builder for creating a responder.
///
/// The responder processes incoming requests and sends back responses.
/// It supports both fixed and dynamic concurrency strategies.
///
/// # Type Parameters
/// - `Req`: The type of the request, must be `Send + Sync + 'static`.
/// - `Resp`: The type of the response, must be `Send + Sync + 'static`.
pub struct ResponderBuilder<Req, Resp>
where
    Req: Send + Sync + 'static,
    Resp: Send + Sync + 'static,
{
    // Receiving half of the request channel created by `channel`.
    req_rx: mpsc::Receiver<(usize, Req)>,
    // Sending half of the response channel consumed by the dispatcher task.
    resp_tx: mpsc::Sender<(usize, Resp)>,
}
209
210impl<Req, Resp> ResponderBuilder<Req, Resp>
211where
212 Req: Send + Sync + 'static,
213 Resp: Send + Sync + 'static,
214{
215 /// Builds a responder with the provided request handler.
216 ///
217 /// # Parameters
218 /// - `handler`: A function that processes a request and returns a future resolving to a response.
219 ///
220 /// # Returns
221 /// A `Responder` instance ready to process requests.
222 pub fn build<
223 Fut: Future<Output = Resp> + Send + 'static,
224 F: (FnMut(Req) -> Fut) + Send + 'static,
225 >(
226 self,
227 mut handler: F,
228 ) -> Responder<Req, Resp> {
229 // let hdl = Box::pin(move |req: Req| {
230
231 // });
232 let hdl = move |req: Req| {
233 let pinned_fut: InnerPinBoxFuture<Resp> = Box::pin(handler(req));
234 pinned_fut
235 };
236 let handler: PinBoxHandler<Req, Resp> = Box::pin(hdl);
237 Responder {
238 req_rx: self.req_rx,
239 resp_tx: self.resp_tx,
240 handler,
241 }
242 }
243}
244
/// A boxed, pinned future produced by a responder's handler.
type InnerPinBoxFuture<Resp> = Pin<Box<dyn Future<Output = Resp> + Send>>;
/// A boxed, pinned, type-erased request handler stored inside a `Responder`.
type PinBoxHandler<Req, Resp> = Pin<Box<dyn (FnMut(Req) -> InnerPinBoxFuture<Resp>) + Send>>;
247
248/// A responder that processes incoming requests and sends back responses.
249///
250/// The responder supports both fixed and dynamic concurrency strategies.
251///
252/// # Type Parameters
253/// - `Req`: The type of the request, must be `Send + Sync + 'static`.
254/// - `Resp`: The type of the response, must be `Send + Sync + 'static`.
255/// - `Fut`: The future returned by the request handler.
256/// - `F`: The type of the request handler function.
257pub struct Responder<Req, Resp>
258where
259 Req: Send + Sync + 'static,
260 Resp: Send + Sync + 'static,
261 // Fut: Future<Output = Resp> + Send,
262 // F: (FnMut(Req) -> Fut) + Send + Sync,
263{
264 req_rx: mpsc::Receiver<(usize, Req)>,
265 resp_tx: mpsc::Sender<(usize, Resp)>,
266 handler: PinBoxHandler<Req, Resp>,
267}
268
/// Represents the concurrency strategy for processing requests.
///
/// - `Fixed`: A fixed number of concurrent requests.
/// - `Dynamic`: Dynamically adjusts the concurrency based on response times.
#[derive(Debug, Clone, Copy)]
pub enum ConcurrencyStrategy {
    /// Process up to the given number of requests concurrently.
    Fixed(usize),
    /// Start at the first value and adjust over time, bounded above by the
    /// second (maximum) value.
    Dynamic(usize, usize),
}
278
279impl<Req, Resp> Client<Req, Resp>
280where
281 Req: Send + Sync + 'static,
282 Resp: Send + Sync + 'static,
283{
284 /// Sends a single request with a custom timeout.
285 ///
286 /// # Parameters
287 /// - `req`: The request to send.
288 /// - `timeout`: The maximum duration to wait for a response.
289 ///
290 /// # Returns
291 /// - `Ok(Resp)`: The response from the server.
292 /// - `Err(Error)`: An error if the request fails or times out.
293 pub async fn request_timeout(&self, req: Req, timeout: Duration) -> Result<Resp, Error> {
294 let (tx, rx) = oneshot::channel();
295
296 let id = match self.pending.insert(tx) {
297 Some(id) => id,
298 None => {
299 return Err(Error::InternalError(
300 "Failed to insert into pending slab".into(),
301 ));
302 }
303 };
304
305 self.tx
306 .lock()
307 .await
308 .send((id, req))
309 .map_err(|e| Error::SendFailed(e.to_string()))
310 .await?;
311
312 let pending = self.pending.clone();
313 tokio::select! {
314 resp = rx => {
315 resp.map_err(|e| Error::ReceiveFailed(e.to_string()))
316 },
317 _ = sleep(timeout) => {
318 pending.remove(id);
319 Err(Error::Timeout)
320 }
321 }
322 }
323
324 /// Sends a single request with the default timeout.
325 ///
326 /// # Parameters
327 /// - `req`: The request to send.
328 ///
329 /// # Returns
330 /// - `Ok(Resp)`: The response from the server.
331 /// - `Err(Error)`: An error if the request fails or times out.
332 pub async fn request(&self, req: Req) -> Result<Resp, Error> {
333 self.request_timeout(req, self.timeout).await
334 }
335
336 /// Sends a batch of requests with a custom timeout and concurrency limit.
337 ///
338 /// # Parameters
339 /// - `reqs`: An iterator of requests to send.
340 /// - `timeout`: The maximum duration to wait for all responses.
341 /// - `concurrency`: The maximum number of concurrent requests.
342 ///
343 /// # Returns
344 /// - `Ok(Vec<Resp>)`: A vector of responses from the server.
345 /// - `Err(Error)`: An error if any request fails or times out.
346 pub async fn request_batch_timeout<ReqSeq>(
347 &self,
348 reqs: ReqSeq,
349 timeout: Duration,
350 concurrency: usize,
351 ) -> Result<Vec<Resp>, Error>
352 where
353 ReqSeq: IntoIterator<Item = Req> + Send + 'static,
354 {
355 let req_seq: Vec<_> = reqs.into_iter().collect();
356 let count = req_seq.len();
357
358 let mut ids: Vec<usize> = Vec::with_capacity(count);
359 let mut rxs: Vec<oneshot::Receiver<Resp>> = Vec::with_capacity(count);
360
361 for _ in 0..count {
362 let (tx, rx) = oneshot::channel();
363 let id = self.pending.insert(tx).unwrap();
364
365 ids.push(id);
366 rxs.push(rx);
367 }
368
369 stream::iter(ids.clone().into_iter().zip(req_seq.into_iter()))
370 .for_each_concurrent(concurrency, |v| async {
371 self.tx.lock().await.send(v).await.unwrap();
372 })
373 .await;
374
375 tokio::select! {
376 results = join_all(rxs) => {
377 let results = results
378 .into_iter()
379 .collect::<Result<Vec<_>, _>>()
380 .map_err(|e| Error::ReceiveFailed(e.to_string()))?;
381 Ok(results)
382 },
383 _ = sleep(timeout) => {
384 for &id in ids.iter() {
385 self.pending.remove(id);
386 }
387 Err(Error::Timeout)
388 }
389 }
390 }
391
392 /// Sends a batch of requests with the default timeout and a concurrency limit.
393 ///
394 /// # Parameters
395 /// - `reqs`: An iterator of requests to send.
396 /// - `concurrency`: The maximum number of concurrent requests.
397 ///
398 /// # Returns
399 /// - `Ok(Vec<Resp>)`: A vector of responses from the server.
400 /// - `Err(Error)`: An error if any request fails or times out.
401 pub async fn request_batch<ReqSeq>(
402 &self,
403 reqs: ReqSeq,
404 concurrency: usize,
405 ) -> Result<Vec<Resp>, Error>
406 where
407 ReqSeq: IntoIterator<Item = Req> + Send + 'static,
408 {
409 self.request_batch_timeout(reqs, self.timeout * (concurrency as u32), concurrency)
410 .await
411 }
412}
413
414impl<Req, Resp> Responder<Req, Resp>
415where
416 Req: Send + Sync + 'static,
417 Resp: Send + Sync + 'static,
418{
419 /// Processes incoming requests using the specified concurrency strategy.
420 ///
421 /// # Parameters
422 /// - `strategy`: The concurrency strategy to use (`Fixed` or `Dynamic`).
423 ///
424 /// # Returns
425 /// - `Ok(())`: If all requests are processed successfully.
426 /// - `Err(Error)`: If an error occurs during processing.
427 pub async fn process_requests_with_strategy(
428 self,
429 strategy: ConcurrencyStrategy,
430 ) -> Result<(), Error> {
431 match strategy {
432 ConcurrencyStrategy::Fixed(concurrency) => {
433 self.process_requests_fixed(concurrency).await
434 }
435 ConcurrencyStrategy::Dynamic(initial, max) => {
436 self.process_requests_dynamic(initial, max).await
437 }
438 }
439 }
440
441 /// Processes incoming requests with a fixed concurrency of 16.
442 ///
443 /// # Returns
444 /// - `Ok(())`: If all requests are processed successfully.
445 /// - `Err(Error)`: If an error occurs during processing.
446 pub async fn process_requests(self) -> Result<(), Error> {
447 self.process_requests_fixed(16).await
448 }
449
450 /// Processes incoming requests with a fixed concurrency.
451 ///
452 /// # Parameters
453 /// - `concurrency`: The number of concurrent requests to process.
454 ///
455 /// # Returns
456 /// - `Ok(())`: If all requests are processed successfully.
457 /// - `Err(Error)`: If an error occurs during processing.
458 pub async fn process_requests_fixed(self, concurrency: usize) -> Result<(), Error> {
459 let Self {
460 req_rx,
461 resp_tx,
462 mut handler,
463 } = self;
464 req_rx
465 .map(move |(id, req)| {
466 let hdl = unsafe { handler.as_mut().get_unchecked_mut() };
467 let fut = hdl(req);
468 fut.map(move |resp| Ok((id, resp)))
469 })
470 .buffer_unordered(concurrency)
471 .forward(resp_tx)
472 .map_err(|e| Error::SendResponseFailed(e.to_string()))
473 .await?;
474
475 Ok(())
476 }
477
478 /// Processes incoming requests with dynamic concurrency adjustment.
479 ///
480 /// The concurrency is adjusted based on the response time of requests.
481 ///
482 /// # Parameters
483 /// - `initial_concurrency`: The initial number of concurrent requests.
484 /// - `max_concurrency`: The maximum number of concurrent requests.
485 ///
486 /// # Returns
487 /// - `Ok(())`: If all requests are processed successfully.
488 /// - `Err(Error)`: If an error occurs during processing.
489 pub async fn process_requests_dynamic(
490 self,
491 initial_concurrency: usize,
492 max_concurrency: usize,
493 ) -> Result<(), Error> {
494 let Self {
495 req_rx,
496 resp_tx,
497 mut handler,
498 } = self;
499
500 let concurrency = Arc::new(AtomicUsize::new(initial_concurrency));
501
502 let concurrency_cloned = concurrency.clone();
503 req_rx
504 .map(move |(id, req)| {
505 let concurrency = Arc::clone(&concurrency);
506 let start = Instant::now();
507
508 let hdl = unsafe { handler.as_mut().get_unchecked_mut() };
509
510 let fut = hdl(req);
511
512 fut.map(move |resp| {
513 let dur = start.elapsed();
514 let currency = concurrency.load(Ordering::Relaxed);
515 if dur < Duration::from_millis(10) {
516 if currency < max_concurrency {
517 let increment = std::cmp::max(1, currency / 4);
518 let new_value =
519 std::cmp::min(max_concurrency, currency.saturating_add(increment));
520 concurrency.store(new_value, Ordering::Relaxed);
521 }
522 } else if dur < Duration::from_millis(100) {
523 if currency > 1 {
524 concurrency.fetch_sub(1, Ordering::Relaxed);
525 }
526 } else {
527 let decrement = std::cmp::max(2, currency / 4);
528 let new_value = std::cmp::min(1, currency.saturating_sub(decrement));
529 concurrency.store(new_value, Ordering::Relaxed);
530 }
531 Ok((id, resp))
532 })
533 })
534 .buffer_unordered(concurrency_cloned.load(Ordering::Relaxed))
535 .forward(resp_tx)
536 .map_err(|e| Error::SendResponseFailed(e.to_string()))
537 .await?;
538
539 Ok(())
540 }
541}
542
543/// Creates a new client-responder pair with the specified buffer size and timeout.
544///
545/// # Parameters
546/// - `buffer`: The size of the request and response buffers.
547/// - `timeout`: The default timeout for requests.
548///
549/// # Returns
550/// A tuple containing a `Client` and a `ResponderBuilder`.
551pub fn channel<Req, Resp>(
552 buffer: usize,
553 timeout: Duration,
554) -> (Client<Req, Resp>, ResponderBuilder<Req, Resp>)
555where
556 Req: Send + Sync + 'static,
557 Resp: Send + Sync + 'static,
558{
559 let (req_tx, req_rx) = mpsc::channel(buffer);
560 let (resp_tx, resp_rx) = mpsc::channel(buffer);
561
562 let client = Client {
563 tx: Arc::new(Mutex::new(req_tx)),
564 pending: Arc::new(Slab::new()),
565 timeout,
566 };
567
568 let responder_builder = ResponderBuilder { req_rx, resp_tx };
569
570 let pending = client.pending.clone();
571
572 tokio::spawn(async move {
573 resp_rx
574 .for_each_concurrent(64, |(id, res)| {
575 let pending = pending.clone();
576 async move {
577 if let Some(tx) = pending.take(id) {
578 let _ = tx.send(res);
579 }
580 }
581 })
582 .await;
583 });
584
585 (client, responder_builder)
586}
587
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::runtime::Runtime;

    /// Test sending a single request and receiving a response.
    #[test]
    fn test_single_request() {
        let rt = Runtime::new().unwrap();
        rt.block_on(async {
            let (client, responder_builder) = channel(64, Duration::from_secs(2));
            // Echo handler: the response is the request itself.
            let responder = responder_builder.build(|req: String| async move { req });

            tokio::spawn(async move {
                responder.process_requests().await.unwrap();
            });

            let response = client.request("Hello, world!".to_string()).await.unwrap();
            assert_eq!(response, "Hello, world!");
        });
    }

    /// Test sending a batch of requests and receiving responses.
    #[test]
    fn test_batch_requests() {
        let rt = Runtime::new().unwrap();
        rt.block_on(async {
            let (client, responder_builder) = channel(64, Duration::from_secs(2));
            let responder = responder_builder.build(|req: String| async move { req });

            tokio::spawn(async move {
                responder.process_requests().await.unwrap();
            });

            // Responses must come back in request order despite concurrent sends.
            let requests = vec!["Request 1".to_string(), "Request 2".to_string()];
            let responses = client.request_batch(requests, 4).await.unwrap();
            assert_eq!(
                responses,
                vec!["Request 1".to_string(), "Request 2".to_string()]
            );
        });
    }

    /// Test processing requests with fixed concurrency.
    #[test]
    fn test_fixed_concurrency() {
        let rt = Runtime::new().unwrap();
        rt.block_on(async {
            let (client, responder_builder) = channel(64, Duration::from_secs(2));
            let responder = responder_builder.build(|req: String| async move { req });

            tokio::spawn(async move {
                responder
                    .process_requests_with_strategy(ConcurrencyStrategy::Fixed(16))
                    .await
                    .unwrap();
            });

            let response = client.request("Hello, world!".to_string()).await.unwrap();
            assert_eq!(response, "Hello, world!");
        });
    }

    /// Test processing requests with dynamic concurrency.
    #[test]
    fn test_dynamic_concurrency() {
        let rt = Runtime::new().unwrap();
        rt.block_on(async {
            let (client, responder_builder) = channel(64, Duration::from_secs(2));
            let responder = responder_builder.build(|req: String| async move { req });

            tokio::spawn(async move {
                responder
                    .process_requests_with_strategy(ConcurrencyStrategy::Dynamic(4, 16))
                    .await
                    .unwrap();
            });

            let response = client.request("Hello, world!".to_string()).await.unwrap();
            assert_eq!(response, "Hello, world!");
        });
    }

    /// Test sending a single request with a custom timeout.
    #[test]
    fn test_single_request_timeout() {
        let rt = Runtime::new().unwrap();
        rt.block_on(async {
            let (client, responder_builder) = channel(64, Duration::from_secs(2));
            let responder = responder_builder.build(|req: String| async move { req });

            tokio::spawn(async move {
                responder.process_requests().await.unwrap();
            });

            // The echo handler answers immediately, well within the 1s limit.
            let response = client
                .request_timeout("Hello, world!".to_string(), Duration::from_secs(1))
                .await
                .unwrap();
            assert_eq!(response, "Hello, world!");
        });
    }

    /// Test sending a batch of requests with a custom timeout.
    #[test]
    fn test_batch_requests_timeout() {
        let rt = Runtime::new().unwrap();
        rt.block_on(async {
            let (client, responder_builder) = channel(64, Duration::from_secs(2));
            let responder = responder_builder.build(|req: String| async move { req });

            tokio::spawn(async move {
                responder.process_requests().await.unwrap();
            });

            let requests = vec!["Request 1".to_string(), "Request 2".to_string()];
            let responses = client
                .request_batch_timeout(requests, Duration::from_secs(1), 4)
                .await
                .unwrap();
            assert_eq!(
                responses,
                vec!["Request 1".to_string(), "Request 2".to_string()]
            );
        });
    }

    /// Test error handling for a failed request.
    #[test]
    fn test_request_failure() {
        let rt = Runtime::new().unwrap();
        rt.block_on(async {
            let (client, responder_builder) = channel(64, Duration::from_secs(2));
            // Here `Resp` is itself a `Result`: transport succeeds, and the
            // application-level failure travels inside the response payload.
            let responder = responder_builder.build(|req: String| async move {
                if req == "fail" {
                    Err(Error::InternalError("Request failed".into()))
                } else {
                    Ok(req)
                }
            });

            tokio::spawn(async move {
                responder.process_requests().await.unwrap();
            });

            // The outer `unwrap` checks channel delivery; the inner value is
            // the handler's `Err`.
            let result = client.request("fail".to_string()).await.unwrap();
            assert!(matches!(result, Err(Error::InternalError(_))));
        });
    }

    /// Test error handling for a timeout.
    #[test]
    fn test_request_timeout() {
        let rt = Runtime::new().unwrap();
        rt.block_on(async {
            let (client, responder_builder) = channel(64, Duration::from_secs(2));
            // Handler sleeps for 3s, longer than the 1s client-side timeout below.
            let responder = responder_builder.build(|req: String| async move {
                tokio::time::sleep(Duration::from_secs(3)).await;
                req
            });

            tokio::spawn(async move {
                responder.process_requests().await.unwrap();
            });

            let result = client
                .request_timeout("Hello, world!".to_string(), Duration::from_secs(1))
                .await;
            assert!(matches!(result, Err(Error::Timeout)));
        });
    }
}
760}