lite_sync/request_response/
one_to_one.rs

1/// Lightweight bidirectional request-response channel for one-to-one communication
2/// 
3/// Optimized for strict request-response pattern where side A sends requests
4/// and side B must respond before A can send the next request. No buffer needed.
5/// 
6/// 轻量级一对一双向请求-响应通道
7/// 
8/// 为严格的请求-响应模式优化,A方发送请求,B方必须响应后A方才能发送下一个请求。
9/// 无需缓冲区。
10use std::cell::UnsafeCell;
11use std::mem::MaybeUninit;
12use std::sync::atomic::{AtomicU8, AtomicBool, Ordering};
13use std::future::Future;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16use std::sync::Arc;
17
18use crate::atomic_waker::AtomicWaker;
19use super::common::{ChannelError, state};
20
21// Channel states - re-export from core
22const IDLE: u8 = state::IDLE;
23const WAITING_RESPONSE: u8 = state::WAITING_RESPONSE;
24const RESPONSE_READY: u8 = state::RESPONSE_READY;
25
26/// Internal state for bidirectional request-response channel
27/// 
28/// Uses atomic operations and UnsafeCell for lock-free bidirectional communication.
29/// State machine ensures strict request-response ordering.
30/// 
31/// 双向请求-响应 channel 的内部状态
32/// 
33/// 使用原子操作和 UnsafeCell 实现无锁的双向通信。
34/// 通过状态机确保严格的请求-响应顺序。
35struct Inner<Req, Resp> {
36    /// Channel state
37    state: AtomicU8,
38    
39    /// Waker for A waiting for response
40    a_waker: AtomicWaker,
41    
42    /// Waker for B waiting for request
43    b_waker: AtomicWaker,
44    
45    /// Whether side A is closed
46    a_closed: AtomicBool,
47    
48    /// Whether side B is closed
49    b_closed: AtomicBool,
50    
51    /// Storage for request data
52    request: UnsafeCell<MaybeUninit<Req>>,
53    
54    /// Storage for response data
55    response: UnsafeCell<MaybeUninit<Resp>>,
56}
57
58// SAFETY: Access to UnsafeCell is synchronized via atomic state machine
59unsafe impl<Req: Send, Resp: Send> Send for Inner<Req, Resp> {}
60unsafe impl<Req: Send, Resp: Send> Sync for Inner<Req, Resp> {}
61
62impl<Req, Resp> Inner<Req, Resp> {
63    /// Create new channel internal state
64    /// 
65    /// 创建新的 channel 内部状态
66    #[inline]
67    fn new() -> Self {
68        Self {
69            state: AtomicU8::new(IDLE),
70            a_waker: AtomicWaker::new(),
71            b_waker: AtomicWaker::new(),
72            a_closed: AtomicBool::new(false),
73            b_closed: AtomicBool::new(false),
74            request: UnsafeCell::new(MaybeUninit::uninit()),
75            response: UnsafeCell::new(MaybeUninit::uninit()),
76        }
77    }
78    
79    /// Try to send request (state transition: IDLE -> WAITING_RESPONSE)
80    /// 
81    /// 尝试发送请求(状态转换:IDLE -> WAITING_RESPONSE)
82    #[inline]
83    fn try_send_request(&self) -> bool {
84        self.state.compare_exchange(
85            IDLE,
86            WAITING_RESPONSE,
87            Ordering::AcqRel,
88            Ordering::Acquire,
89        ).is_ok()
90    }
91    
92    /// Mark response as ready (state transition: WAITING_RESPONSE -> RESPONSE_READY)
93    /// 
94    /// 标记响应已就绪(状态转换:WAITING_RESPONSE -> RESPONSE_READY)
95    #[inline]
96    fn mark_response_ready(&self) {
97        self.state.store(RESPONSE_READY, Ordering::Release);
98    }
99    
100    /// Complete response reception (state transition: RESPONSE_READY -> IDLE)
101    /// 
102    /// 完成响应接收(状态转换:RESPONSE_READY -> IDLE)
103    #[inline]
104    fn complete_response(&self) {
105        self.state.store(IDLE, Ordering::Release);
106    }
107    
108    /// Check if side A is closed
109    /// 
110    /// 检查 A 方是否已关闭
111    #[inline]
112    fn is_a_closed(&self) -> bool {
113        self.a_closed.load(Ordering::Acquire)
114    }
115    
116    /// Check if side B is closed
117    /// 
118    /// 检查 B 方是否已关闭
119    #[inline]
120    fn is_b_closed(&self) -> bool {
121        self.b_closed.load(Ordering::Acquire)
122    }
123    
124    /// Get current state
125    /// 
126    /// 获取当前状态
127    #[inline]
128    fn current_state(&self) -> u8 {
129        self.state.load(Ordering::Acquire)
130    }
131}
132
133/// Side A endpoint (request sender, response receiver)
134/// 
135/// A 方的 channel 端点(请求发送方,响应接收方)
136pub struct SideA<Req, Resp> {
137    inner: Arc<Inner<Req, Resp>>,
138}
139
140/// Side B endpoint (request receiver, response sender)
141/// 
142/// B 方的 channel 端点(请求接收方,响应发送方)
143pub struct SideB<Req, Resp> {
144    inner: Arc<Inner<Req, Resp>>,
145}
146
147/// Create a new request-response channel
148/// 
149/// Returns (SideA, SideB) tuple representing both ends of the channel.
150/// 
151/// 创建一个新的请求-响应 channel
152/// 
153/// 返回 (SideA, SideB) 元组,分别代表 channel 的两端。
154/// 
155/// # Example
156/// 
157/// ```
158/// use lite_sync::request_response::one_to_one::channel;
159/// 
160/// # tokio_test::block_on(async {
161/// let (side_a, side_b) = channel::<String, i32>();
162/// 
163/// // Side B uses convenient handle_request method
164/// tokio::spawn(async move {
165///     while side_b.handle_request(|request| request.len() as i32).await.is_ok() {
166///         // Continue handling requests
167///     }
168/// });
169/// 
170/// let response = side_a.request("Hello".to_string()).await;
171/// assert_eq!(response, Ok(5));
172/// # });
173/// ```
174/// 
175/// # Advanced Example: Async Processing
176/// 
177/// ```
178/// use lite_sync::request_response::one_to_one::channel;
179/// 
180/// # tokio_test::block_on(async {
181/// let (side_a, side_b) = channel::<String, String>();
182/// 
183/// tokio::spawn(async move {
184///     while side_b.handle_request_async(|req| async move {
185///         // Async processing logic
186///         req.to_uppercase()
187///     }).await.is_ok() {
188///         // Continue handling
189///     }
190/// });
191/// 
192/// let result = side_a.request("hello".to_string()).await;
193/// assert_eq!(result, Ok("HELLO".to_string()));
194/// # });
195/// ```
196#[inline]
197pub fn channel<Req, Resp>() -> (SideA<Req, Resp>, SideB<Req, Resp>) {
198    let inner = Arc::new(Inner::new());
199    
200    let side_a = SideA {
201        inner: inner.clone(),
202    };
203    
204    let side_b = SideB {
205        inner,
206    };
207    
208    (side_a, side_b)
209}
210
211impl<Req, Resp> SideA<Req, Resp> {
212    /// Send a request and wait for response
213    /// 
214    /// This method will:
215    /// 1. Wait for channel to be idle (if previous request is still being processed)
216    /// 2. Send request to side B
217    /// 3. Wait for side B's response
218    /// 4. Return the response
219    /// 
220    /// 发送请求并等待响应
221    /// 
222    /// 这个方法会:
223    /// 1. 等待 channel 进入空闲状态(如果之前的请求还在处理中)
224    /// 2. 发送请求到 B 方
225    /// 3. 等待 B 方的响应
226    /// 4. 返回响应
227    /// 
228    /// # Returns
229    /// 
230    /// - `Ok(response)`: Received response from side B
231    /// - `Err(ChannelError::Closed)`: Side B has been closed
232    /// 
233    /// # Example
234    /// 
235    /// ```
236    /// # use lite_sync::request_response::one_to_one::channel;
237    /// # tokio_test::block_on(async {
238    /// let (side_a, side_b) = channel::<String, i32>();
239    /// 
240    /// tokio::spawn(async move {
241    ///     while let Ok(guard) = side_b.recv_request().await {
242    ///         let response = guard.request().len() as i32;
243    ///         guard.reply(response);
244    ///     }
245    /// });
246    /// 
247    /// let response = side_a.request("Hello".to_string()).await;
248    /// assert_eq!(response, Ok(5));
249    /// # });
250    /// ```
251    pub async fn request(&self, req: Req) -> Result<Resp, ChannelError> {
252        // Send request
253        self.send_request(req).await?;
254        
255        // Wait for response
256        self.recv_response().await
257    }
258    
259    /// Send request (without waiting for response)
260    /// 
261    /// Will wait if channel is not in idle state.
262    /// 
263    /// 发送请求(不等待响应)
264    /// 
265    /// 如果 channel 不在空闲状态,会等待直到可以发送。
266    async fn send_request(&self, req: Req) -> Result<(), ChannelError> {
267        SendRequest {
268            inner: &self.inner,
269            request: Some(req),
270            registered: false,
271        }.await
272    }
273    
274    /// Wait for and receive response
275    /// 
276    /// 等待并接收响应
277    /// 
278    /// # Returns
279    /// 
280    /// - `Ok(response)`: Received response
281    /// - `Err(ChannelError::Closed)`: Side B closed
282    async fn recv_response(&self) -> Result<Resp, ChannelError> {
283        RecvResponse {
284            inner: &self.inner,
285            registered: false,
286        }.await
287    }
288}
289
290/// Request guard that enforces B must reply
291/// 
292/// This guard ensures that B must call `reply()` before dropping the guard.
293/// If the guard is dropped without replying, it will panic to prevent A from deadlocking.
294/// 
295/// 强制 B 必须回复的 Guard
296/// 
297/// 这个 guard 确保 B 必须在丢弃 guard 之前调用 `reply()`。
298/// 如果 guard 在没有回复的情况下被丢弃,会 panic 以防止 A 死锁。
299pub struct RequestGuard<'a, Req, Resp>
300where
301    Req: Send, Resp: Send,
302{
303    inner: &'a Inner<Req, Resp>,
304    req: Option<Req>,
305    replied: bool,
306}
307
308impl<'a, Req, Resp> std::fmt::Debug for RequestGuard<'a, Req, Resp>
309where
310    Req: Send + std::fmt::Debug,
311    Resp: Send,
312{
313    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314        f.debug_struct("RequestGuard")
315            .field("req", &self.req)
316            .finish_non_exhaustive()
317    }
318}
319
320// PartialEq for testing purposes - comparing RequestGuards doesn't make sense
321// but we need it for Result<RequestGuard, _> comparisons in tests
322impl<'a, Req, Resp> PartialEq for RequestGuard<'a, Req, Resp>
323where
324    Req: Send + PartialEq,
325    Resp: Send,
326{
327    fn eq(&self, other: &Self) -> bool {
328        // Two guards are equal if they hold the same request value
329        // This is mainly for testing purposes
330        self.req == other.req
331    }
332}
333
334impl<'a, Req, Resp> RequestGuard<'a, Req, Resp>
335where
336    Req: Send, Resp: Send,
337{
338    /// Get a reference to the request
339    /// 
340    /// 获取请求内容的引用
341    #[inline]
342    pub fn request(&self) -> &Req {
343        self.req.as_ref().expect("RequestGuard logic error: request already consumed")
344    }
345    
346    /// Consume the guard and send reply
347    /// 
348    /// This method will:
349    /// 1. Store the response
350    /// 2. Update state to RESPONSE_READY
351    /// 3. Wake up side A
352    /// 
353    /// 消耗 Guard 并发送回复
354    /// 
355    /// 这个方法会:
356    /// 1. 存储响应
357    /// 2. 更新状态为 RESPONSE_READY
358    /// 3. 唤醒 A 方
359    #[inline]
360    pub fn reply(mut self, resp: Resp) {
361        // Store response
362        // SAFETY: Side B has exclusive access to response storage
363        unsafe {
364            (*self.inner.response.get()).write(resp);
365        }
366        
367        // Mark state as RESPONSE_READY
368        self.inner.mark_response_ready();
369        
370        // Wake up side A
371        self.inner.a_waker.wake();
372        
373        // Mark as replied (prevent Drop panic)
374        self.replied = true;
375    }
376    
377}
378
379/// Drop guard: If B drops the guard without calling `reply`, we panic.
380/// This enforces the "must reply" protocol.
381/// 
382/// Drop 守卫:如果 B 不调用 `reply` 就丢弃了 Guard,我们会 panic。
383/// 这强制执行了 "必须回复" 的协议。
384impl<'a, Req, Resp> Drop for RequestGuard<'a, Req, Resp>
385where
386    Req: Send, Resp: Send,
387{
388    fn drop(&mut self) {
389        if !self.replied {
390            // B dropped the guard without replying
391            // This is a protocol error that would cause A to deadlock
392            // We must panic to prevent this
393            panic!("RequestGuard dropped without replying! This would cause the requester to deadlock. You must call reply() before dropping the guard.");
394        }
395    }
396}
397
398impl<Req, Resp> SideB<Req, Resp> {
399    /// Wait for and receive request, returning a guard that must be replied to
400    /// 
401    /// The returned `RequestGuard` enforces that you must call `reply()` on it.
402    /// If you drop the guard without calling `reply()`, it will panic.
403    /// 
404    /// 等待并接收请求,返回一个必须回复的 guard
405    /// 
406    /// 返回的 `RequestGuard` 强制你必须调用 `reply()`。
407    /// 如果你在没有调用 `reply()` 的情况下丢弃 guard,会 panic。
408    /// 
409    /// # Returns
410    /// 
411    /// - `Ok(RequestGuard)`: Received request from side A
412    /// - `Err(ChannelError::Closed)`: Side A has been closed
413    /// 
414    /// # Example
415    /// 
416    /// ```
417    /// # use lite_sync::request_response::one_to_one::channel;
418    /// # tokio_test::block_on(async {
419    /// let (side_a, side_b) = channel::<String, i32>();
420    /// 
421    /// tokio::spawn(async move {
422    ///     while let Ok(guard) = side_b.recv_request().await {
423    ///         let len = guard.request().len() as i32;
424    ///         guard.reply(len);
425    ///     }
426    /// });
427    /// 
428    /// let response = side_a.request("Hello".to_string()).await;
429    /// assert_eq!(response, Ok(5));
430    /// # });
431    /// ```
432    pub async fn recv_request(&self) -> Result<RequestGuard<'_, Req, Resp>, ChannelError>
433    where
434        Req: Send,
435        Resp: Send,
436    {
437        let req = RecvRequest {
438            inner: &self.inner,
439            registered: false,
440        }.await?;
441        
442        Ok(RequestGuard {
443            inner: &self.inner,
444            req: Some(req),
445            replied: false,
446        })
447    }
448    
449    /// Convenient method to handle request and send response
450    /// 
451    /// This method will:
452    /// 1. Wait for and receive request
453    /// 2. Call the handler function
454    /// 3. Send the response via the guard
455    /// 
456    /// 处理请求并发送响应的便捷方法
457    /// 
458    /// 这个方法会:
459    /// 1. 等待并接收请求
460    /// 2. 调用处理函数
461    /// 3. 通过 guard 发送响应
462    /// 
463    /// # Returns
464    /// 
465    /// - `Ok(())`: Successfully handled request and sent response
466    /// - `Err(ChannelError::Closed)`: Side A closed
467    /// 
468    /// # Example
469    /// 
470    /// ```
471    /// # use lite_sync::request_response::one_to_one::channel;
472    /// # tokio_test::block_on(async {
473    /// let (side_a, side_b) = channel::<String, i32>();
474    /// 
475    /// tokio::spawn(async move {
476    ///     while side_b.handle_request(|req| req.len() as i32).await.is_ok() {
477    ///         // Continue handling
478    ///     }
479    /// });
480    /// 
481    /// let response = side_a.request("Hello".to_string()).await;
482    /// assert_eq!(response, Ok(5));
483    /// # });
484    /// ```
485    pub async fn handle_request<F>(&self, handler: F) -> Result<(), ChannelError>
486    where
487        Req: Send,
488        Resp: Send,
489        F: FnOnce(&Req) -> Resp,
490    {
491        let guard = self.recv_request().await?;
492        let resp = handler(guard.request());
493        guard.reply(resp);
494        Ok(())
495    }
496    
497    /// Convenient async method to handle request and send response
498    /// 
499    /// Similar to `handle_request`, but supports async handler functions.
500    /// Note: The handler takes ownership of the request to avoid lifetime issues.
501    /// 
502    /// 处理请求并发送响应的异步便捷方法
503    /// 
504    /// 与 `handle_request` 类似,但支持异步处理函数。
505    /// 注意:处理函数会获取请求的所有权以避免生命周期问题。
506    /// 
507    /// # Example
508    /// 
509    /// ```
510    /// # use lite_sync::request_response::one_to_one::channel;
511    /// # tokio_test::block_on(async {
512    /// let (side_a, side_b) = channel::<String, String>();
513    /// 
514    /// tokio::spawn(async move {
515    ///     while side_b.handle_request_async(|req| async move {
516    ///         // Async processing - req is owned
517    ///         req.to_uppercase()
518    ///     }).await.is_ok() {
519    ///         // Continue handling
520    ///     }
521    /// });
522    /// 
523    /// let response = side_a.request("hello".to_string()).await;
524    /// assert_eq!(response, Ok("HELLO".to_string()));
525    /// # });
526    /// ```
527    pub async fn handle_request_async<F, Fut>(&self, handler: F) -> Result<(), ChannelError>
528    where
529        Req: Send,
530        Resp: Send,
531        F: FnOnce(Req) -> Fut,
532        Fut: Future<Output = Resp>,
533    {
534        let mut guard = self.recv_request().await?;
535        let req = guard.req.take().expect("RequestGuard logic error: request already consumed");
536        let resp = handler(req).await;
537        
538        // Manually send the reply since we've consumed the request
539        unsafe {
540            (*guard.inner.response.get()).write(resp);
541        }
542        guard.inner.mark_response_ready();
543        guard.inner.a_waker.wake();
544        guard.replied = true;
545        
546        Ok(())
547    }
548}
549
550// Drop implementations to clean up wakers
551impl<Req, Resp> Drop for SideA<Req, Resp> {
552    fn drop(&mut self) {
553        // Side A closed, wake up side B that might be waiting
554        self.inner.a_closed.store(true, Ordering::Release);
555        self.inner.b_waker.wake();
556    }
557}
558
559impl<Req, Resp> Drop for SideB<Req, Resp> {
560    fn drop(&mut self) {
561        // Side B closed, wake up side A that might be waiting
562        self.inner.b_closed.store(true, Ordering::Release);
563        self.inner.a_waker.wake();
564    }
565}
566
567/// Future: Side A sends request
568struct SendRequest<'a, Req, Resp> {
569    inner: &'a Inner<Req, Resp>,
570    request: Option<Req>,
571    registered: bool,
572}
573
574// SendRequest is Unpin because we only need to move data, not pin it
575impl<Req, Resp> Unpin for SendRequest<'_, Req, Resp> {}
576
577impl<Req, Resp> Future for SendRequest<'_, Req, Resp> {
578    type Output = Result<(), ChannelError>;
579    
580    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
581        let this = self.get_mut();
582        
583        // Check if side B is closed
584        if this.inner.is_b_closed() {
585            return Poll::Ready(Err(ChannelError::Closed));
586        }
587        
588        // Try to send request
589        if this.inner.try_send_request() {
590            // Successfully sent request
591            // SAFETY: We have exclusive access (guaranteed by state machine)
592            unsafe {
593                (*this.inner.request.get()).write(this.request.take().unwrap());
594            }
595            
596            // Wake up side B
597            this.inner.b_waker.wake();
598            
599            return Poll::Ready(Ok(()));
600        }
601        
602        // Channel busy, register waker and wait
603        if !this.registered {
604            this.inner.a_waker.register(cx.waker());
605            this.registered = true;
606            
607            // Check again (avoid race condition)
608            if this.inner.is_b_closed() {
609                return Poll::Ready(Err(ChannelError::Closed));
610            }
611            
612            if this.inner.current_state() == IDLE {
613                cx.waker().wake_by_ref();
614            }
615        }
616        
617        Poll::Pending
618    }
619}
620
621/// Future: Side A receives response
622struct RecvResponse<'a, Req, Resp> {
623    inner: &'a Inner<Req, Resp>,
624    registered: bool,
625}
626
627// RecvResponse is Unpin because it only holds references and a bool
628impl<Req, Resp> Unpin for RecvResponse<'_, Req, Resp> {}
629
630impl<Req, Resp> Future for RecvResponse<'_, Req, Resp> {
631    type Output = Result<Resp, ChannelError>;
632    
633    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
634        // Check if state is RESPONSE_READY first
635        // Try to receive sent response even if B closed
636        if self.inner.current_state() == RESPONSE_READY {
637            // Take the response
638            // SAFETY: Response must exist when state is RESPONSE_READY
639            let response = unsafe {
640                (*self.inner.response.get()).assume_init_read()
641            };
642            
643            // Reset to IDLE for next round
644            self.inner.complete_response();
645            
646            return Poll::Ready(Ok(response));
647        }
648        
649        // Check if B closed (after confirming no response)
650        if self.inner.is_b_closed() {
651            return Poll::Ready(Err(ChannelError::Closed));
652        }
653        
654        // Register waker
655        if !self.registered {
656            self.inner.a_waker.register(cx.waker());
657            self.registered = true;
658            
659            // Check state again (avoid race condition)
660            if self.inner.current_state() == RESPONSE_READY {
661                let response = unsafe {
662                    (*self.inner.response.get()).assume_init_read()
663                };
664                self.inner.complete_response();
665                return Poll::Ready(Ok(response));
666            }
667            
668            // Check again if B closed
669            if self.inner.is_b_closed() {
670                return Poll::Ready(Err(ChannelError::Closed));
671            }
672        }
673        
674        Poll::Pending
675    }
676}
677
678/// Future: Side B receives request
679struct RecvRequest<'a, Req, Resp> {
680    inner: &'a Inner<Req, Resp>,
681    registered: bool,
682}
683
684// RecvRequest is Unpin because it only holds references and a bool
685impl<Req, Resp> Unpin for RecvRequest<'_, Req, Resp> {}
686
687impl<Req, Resp> Future for RecvRequest<'_, Req, Resp> {
688    type Output = Result<Req, ChannelError>;
689    
690    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
691        // Check state first, then access data (fixes race condition)
692        // Try to receive sent request even if A closed
693        if self.inner.current_state() == WAITING_RESPONSE {
694            // Take the request
695            // SAFETY: Request must exist when state is WAITING_RESPONSE
696            let request = unsafe {
697                (*self.inner.request.get()).assume_init_read()
698            };
699            
700            return Poll::Ready(Ok(request));
701        }
702        
703        // Check if A closed (after confirming no request)
704        if self.inner.is_a_closed() {
705            return Poll::Ready(Err(ChannelError::Closed));
706        }
707        
708        // Register waker
709        if !self.registered {
710            self.inner.b_waker.register(cx.waker());
711            self.registered = true;
712            
713            // Check again (avoid race condition)
714            if self.inner.current_state() == WAITING_RESPONSE {
715                let request = unsafe {
716                    (*self.inner.request.get()).assume_init_read()
717                };
718                return Poll::Ready(Ok(request));
719            }
720            
721            // Check again if A closed
722            if self.inner.is_a_closed() {
723                return Poll::Ready(Err(ChannelError::Closed));
724            }
725        }
726        
727        Poll::Pending
728    }
729}
730
731
732#[cfg(test)]
733mod tests {
734    use super::*;
735    use tokio::time::{sleep, Duration};
736
737    #[tokio::test]
738    async fn test_basic_request_response() {
739        let (side_a, side_b) = channel::<String, i32>();
740        
741        tokio::spawn(async move {
742            let guard = side_b.recv_request().await.unwrap();
743            assert_eq!(guard.request(), "Hello");
744            guard.reply(42);
745        });
746        
747        let response = side_a.request("Hello".to_string()).await;
748        assert_eq!(response, Ok(42));
749    }
750
751    #[tokio::test]
752    async fn test_multiple_rounds() {
753        let (side_a, side_b) = channel::<i32, i32>();
754        
755        tokio::spawn(async move {
756            for i in 0..5 {
757                let guard = side_b.recv_request().await.unwrap();
758                assert_eq!(*guard.request(), i);
759                guard.reply(i * 2);
760            }
761        });
762        
763        for i in 0..5 {
764            let response = side_a.request(i).await;
765            assert_eq!(response, Ok(i * 2));
766        }
767    }
768
769    #[tokio::test]
770    async fn test_delayed_response() {
771        let (side_a, side_b) = channel::<String, String>();
772        
773        tokio::spawn(async move {
774            let guard = side_b.recv_request().await.unwrap();
775            sleep(Duration::from_millis(50)).await;
776            let response = guard.request().to_uppercase();
777            guard.reply(response);
778        });
779        
780        let response = side_a.request("hello".to_string()).await;
781        assert_eq!(response, Ok("HELLO".to_string()));
782    }
783
784    #[tokio::test]
785    async fn test_side_b_closes() {
786        let (side_a, side_b) = channel::<i32, i32>();
787        
788        // Side B closes immediately
789        drop(side_b);
790        
791        // Side A should receive Err
792        let response = side_a.request(42).await;
793        assert_eq!(response, Err(ChannelError::Closed));
794    }
795
796    #[tokio::test]
797    async fn test_side_a_closes() {
798        let (side_a, side_b) = channel::<i32, i32>();
799        
800        // Side A closes immediately
801        drop(side_a);
802        
803        // Side B should receive Err
804        let request = side_b.recv_request().await;
805        assert_eq!(request, Err(ChannelError::Closed));
806    }
807
808    #[tokio::test]
809    async fn test_concurrent_requests() {
810        let (side_a, side_b) = channel::<i32, i32>();
811        
812        let handle_b = tokio::spawn(async move {
813            let mut count = 0;
814            loop {
815                if let Ok(guard) = side_b.recv_request().await {
816                    count += 1;
817                    let response = *guard.request() * 2;
818                    guard.reply(response);
819                } else {
820                    break;
821                }
822            }
823            count
824        });
825        
826        let handle_a = tokio::spawn(async move {
827            for i in 0..10 {
828                let response = side_a.request(i).await.unwrap();
829                assert_eq!(response, i * 2);
830            }
831            drop(side_a);
832        });
833        
834        handle_a.await.unwrap();
835        let count = handle_b.await.unwrap();
836        assert_eq!(count, 10);
837    }
838
839    #[tokio::test]
840    async fn test_string_messages() {
841        let (side_a, side_b) = channel::<String, String>();
842        
843        tokio::spawn(async move {
844            loop {
845                if let Ok(guard) = side_b.recv_request().await {
846                    let response = format!("Echo: {}", guard.request());
847                    guard.reply(response);
848                } else {
849                    break;
850                }
851            }
852        });
853        
854        let messages = vec!["Hello", "World", "Rust"];
855        for msg in messages {
856            let response = side_a.request(msg.to_string()).await.unwrap();
857            assert_eq!(response, format!("Echo: {}", msg));
858        }
859    }
860
861    #[tokio::test]
862    async fn test_handle_request() {
863        let (side_a, side_b) = channel::<i32, i32>();
864        
865        tokio::spawn(async move {
866            // Using handle_request convenience method
867            while side_b.handle_request(|req| req * 3).await.is_ok() {
868                // Continue handling
869            }
870        });
871        
872        for i in 0..5 {
873            let response = side_a.request(i).await.unwrap();
874            assert_eq!(response, i * 3);
875        }
876    }
877
878    #[tokio::test]
879    async fn test_handle_request_async() {
880        let (side_a, side_b) = channel::<String, usize>();
881        
882        tokio::spawn(async move {
883            // Using handle_request_async async convenience method
884            while side_b.handle_request_async(|req| async move {
885                sleep(Duration::from_millis(10)).await;
886                req.len()
887            }).await.is_ok() {
888                // Continue handling
889            }
890        });
891        
892        let test_strings = vec!["Hello", "World", "Rust", "Async"];
893        for s in test_strings {
894            let response = side_a.request(s.to_string()).await.unwrap();
895            assert_eq!(response, s.len());
896        }
897    }
898
899    #[tokio::test]
900    async fn test_error_display() {
901        // Test Display implementation for error types
902        assert_eq!(format!("{}", ChannelError::Closed), "channel closed");
903    }
904
905    #[tokio::test]
906    async fn test_multiple_handle_request_rounds() {
907        let (side_a, side_b) = channel::<String, String>();
908        
909        let handle = tokio::spawn(async move {
910            let mut count = 0;
911            // Manually handle to maintain state
912            while let Ok(guard) = side_b.recv_request().await {
913                count += 1;
914                let resp = format!("{}:{}", count, guard.request().to_uppercase());
915                guard.reply(resp);
916            }
917            count
918        });
919        
920        let response1 = side_a.request("hello".to_string()).await.unwrap();
921        assert_eq!(response1, "1:HELLO");
922        
923        let response2 = side_a.request("world".to_string()).await.unwrap();
924        assert_eq!(response2, "2:WORLD");
925        
926        let response3 = side_a.request("rust".to_string()).await.unwrap();
927        assert_eq!(response3, "3:RUST");
928        
929        drop(side_a);
930        let count = handle.await.unwrap();
931        assert_eq!(count, 3);
932    }
933
934    #[tokio::test]
935    async fn test_request_guard_must_reply() {
936        let (side_a, side_b) = channel::<i32, i32>();
937        
938        let handle = tokio::spawn(async move {
939            let _guard = side_b.recv_request().await.unwrap();
940            // Intentionally not calling reply() - this should panic
941        });
942        
943        // Send a request
944        tokio::spawn(async move {
945            let _ = side_a.request(42).await;
946        });
947        
948        // Wait for the spawned task and verify it panicked
949        let result = handle.await;
950        assert!(result.is_err(), "Task should have panicked");
951        
952        // Verify the panic message contains our expected text
953        if let Err(e) = result {
954            if let Ok(panic_payload) = e.try_into_panic() {
955                if let Some(s) = panic_payload.downcast_ref::<String>() {
956                    assert!(s.contains("RequestGuard dropped without replying"), 
957                        "Panic message should mention RequestGuard: {}", s);
958                } else if let Some(s) = panic_payload.downcast_ref::<&str>() {
959                    assert!(s.contains("RequestGuard dropped without replying"), 
960                        "Panic message should mention RequestGuard: {}", s);
961                } else {
962                    panic!("Unexpected panic type");
963                }
964            }
965        }
966    }
967}