lite_sync/request_response/one_to_one.rs
//! Lightweight bidirectional request-response channel for one-to-one communication.
//!
//! Optimized for the strict request-response pattern in which side A sends a request
//! and side B must respond before A can send the next one, so no buffer is needed.
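//!
//! # Example
//!
//! A minimal round-trip, mirroring the doc-tests on [`channel`] below (like those
//! doc-tests, it assumes `tokio` and `tokio_test` are available):
//!
//! ```
//! use lite_sync::request_response::one_to_one::channel;
//!
//! # tokio_test::block_on(async {
//! let (side_a, side_b) = channel::<u32, u32>();
//!
//! tokio::spawn(async move {
//!     while let Ok(guard) = side_b.recv_request().await {
//!         let doubled = *guard.request() * 2;
//!         guard.reply(doubled);
//!     }
//! });
//!
//! assert_eq!(side_a.request(21).await, Ok(42));
//! # });
//! ```
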
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicU8, AtomicBool, Ordering};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::sync::Arc;

use crate::atomic_waker::AtomicWaker;
use super::common::{ChannelError, state};

// Channel states (aliases of the shared `state` constants)
const IDLE: u8 = state::IDLE;
const WAITING_RESPONSE: u8 = state::WAITING_RESPONSE;
const RESPONSE_READY: u8 = state::RESPONSE_READY;
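
// State machine overview (see `try_send_request`, `mark_response_ready`, and
// `complete_response` below for the individual transitions):
//
//   IDLE --(A stores request)--> WAITING_RESPONSE --(B stores response)--> RESPONSE_READY
//     ^                                                                         |
//     +----------------------------(A takes response)---------------------------+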

/// Internal state for the bidirectional request-response channel
///
/// Uses atomic operations and `UnsafeCell` for lock-free bidirectional communication.
/// The state machine ensures strict request-response ordering.
struct Inner<Req, Resp> {
    /// Channel state
    state: AtomicU8,

    /// Waker for A waiting for a response
    a_waker: AtomicWaker,

    /// Waker for B waiting for a request
    b_waker: AtomicWaker,

    /// Whether side A is closed
    a_closed: AtomicBool,

    /// Whether side B is closed
    b_closed: AtomicBool,

    /// Storage for the request data
    request: UnsafeCell<MaybeUninit<Req>>,

    /// Storage for the response data
    response: UnsafeCell<MaybeUninit<Resp>>,
}

// SAFETY: Access to the UnsafeCells is synchronized via the atomic state machine
unsafe impl<Req: Send, Resp: Send> Send for Inner<Req, Resp> {}
unsafe impl<Req: Send, Resp: Send> Sync for Inner<Req, Resp> {}

impl<Req, Resp> Inner<Req, Resp> {
    /// Create the channel's internal state
    #[inline]
    fn new() -> Self {
        Self {
            state: AtomicU8::new(IDLE),
            a_waker: AtomicWaker::new(),
            b_waker: AtomicWaker::new(),
            a_closed: AtomicBool::new(false),
            b_closed: AtomicBool::new(false),
            request: UnsafeCell::new(MaybeUninit::uninit()),
            response: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }

    /// Try to send a request (state transition: IDLE -> WAITING_RESPONSE)
    #[inline]
    fn try_send_request(&self) -> bool {
        self.state.compare_exchange(
            IDLE,
            WAITING_RESPONSE,
            Ordering::AcqRel,
            Ordering::Acquire,
        ).is_ok()
    }

    /// Mark the response as ready (state transition: WAITING_RESPONSE -> RESPONSE_READY)
    #[inline]
    fn mark_response_ready(&self) {
        self.state.store(RESPONSE_READY, Ordering::Release);
    }

    /// Complete response reception (state transition: RESPONSE_READY -> IDLE)
    #[inline]
    fn complete_response(&self) {
        self.state.store(IDLE, Ordering::Release);
    }

    /// Check whether side A is closed
    #[inline]
    fn is_a_closed(&self) -> bool {
        self.a_closed.load(Ordering::Acquire)
    }

    /// Check whether side B is closed
    #[inline]
    fn is_b_closed(&self) -> bool {
        self.b_closed.load(Ordering::Acquire)
    }

    /// Get the current state
    #[inline]
    fn current_state(&self) -> u8 {
        self.state.load(Ordering::Acquire)
    }
}

/// Side A endpoint (request sender, response receiver)
pub struct SideA<Req, Resp> {
    inner: Arc<Inner<Req, Resp>>,
}

/// Side B endpoint (request receiver, response sender)
pub struct SideB<Req, Resp> {
    inner: Arc<Inner<Req, Resp>>,
}

/// Create a new request-response channel
///
/// Returns a `(SideA, SideB)` tuple representing the two ends of the channel.
///
/// # Example
///
/// ```
/// use lite_sync::request_response::one_to_one::channel;
///
/// # tokio_test::block_on(async {
/// let (side_a, side_b) = channel::<String, i32>();
///
/// // Side B uses the convenient handle_request method
/// tokio::spawn(async move {
///     while side_b.handle_request(|request| request.len() as i32).await.is_ok() {
///         // Continue handling requests
///     }
/// });
///
/// let response = side_a.request("Hello".to_string()).await;
/// assert_eq!(response, Ok(5));
/// # });
/// ```
///
/// # Advanced Example: Async Processing
///
/// ```
/// use lite_sync::request_response::one_to_one::channel;
///
/// # tokio_test::block_on(async {
/// let (side_a, side_b) = channel::<String, String>();
///
/// tokio::spawn(async move {
///     while side_b.handle_request_async(|req| async move {
///         // Async processing logic
///         req.to_uppercase()
///     }).await.is_ok() {
///         // Continue handling
///     }
/// });
///
/// let result = side_a.request("hello".to_string()).await;
/// assert_eq!(result, Ok("HELLO".to_string()));
/// # });
/// ```
#[inline]
pub fn channel<Req, Resp>() -> (SideA<Req, Resp>, SideB<Req, Resp>) {
    let inner = Arc::new(Inner::new());

    let side_a = SideA {
        inner: inner.clone(),
    };

    let side_b = SideB {
        inner,
    };

    (side_a, side_b)
}

impl<Req, Resp> SideA<Req, Resp> {
    /// Send a request and wait for the response
    ///
    /// This method will:
    /// 1. Wait for the channel to become idle (if a previous request is still being processed)
    /// 2. Send the request to side B
    /// 3. Wait for side B's response
    /// 4. Return the response
    ///
    /// # Returns
    ///
    /// - `Ok(response)`: Received a response from side B
    /// - `Err(ChannelError::Closed)`: Side B has been closed
    ///
    /// # Example
    ///
    /// ```
    /// # use lite_sync::request_response::one_to_one::channel;
    /// # tokio_test::block_on(async {
    /// let (side_a, side_b) = channel::<String, i32>();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(guard) = side_b.recv_request().await {
    ///         let response = guard.request().len() as i32;
    ///         guard.reply(response);
    ///     }
    /// });
    ///
    /// let response = side_a.request("Hello".to_string()).await;
    /// assert_eq!(response, Ok(5));
    /// # });
    /// ```
    pub async fn request(&self, req: Req) -> Result<Resp, ChannelError> {
        // Send the request
        self.send_request(req).await?;

        // Wait for the response
        self.recv_response().await
    }

    /// Send a request (without waiting for the response)
    ///
    /// Waits until the channel is idle if it cannot accept a request yet.
    async fn send_request(&self, req: Req) -> Result<(), ChannelError> {
        SendRequest {
            inner: &self.inner,
            request: Some(req),
            registered: false,
        }.await
    }

    /// Wait for and receive the response
    ///
    /// # Returns
    ///
    /// - `Ok(response)`: Received the response
    /// - `Err(ChannelError::Closed)`: Side B closed
    async fn recv_response(&self) -> Result<Resp, ChannelError> {
        RecvResponse {
            inner: &self.inner,
            registered: false,
        }.await
    }
}

/// Request guard that enforces that B must reply
///
/// The guard ensures that B calls `reply()` before it is dropped.
/// If the guard is dropped without replying, it panics to prevent A from deadlocking.
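///
/// # Example
///
/// A minimal sketch of the guard protocol (same setup as the [`SideB::recv_request`]
/// doc-test; assumes `tokio` and `tokio_test`):
///
/// ```
/// # use lite_sync::request_response::one_to_one::channel;
/// # tokio_test::block_on(async {
/// let (side_a, side_b) = channel::<String, usize>();
///
/// tokio::spawn(async move {
///     if let Ok(guard) = side_b.recv_request().await {
///         // Inspect the request, then reply exactly once.
///         let len = guard.request().len();
///         guard.reply(len);
///     }
/// });
///
/// assert_eq!(side_a.request("ping".to_string()).await, Ok(4));
/// # });
/// ```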
pub struct RequestGuard<'a, Req, Resp>
where
    Req: Send, Resp: Send,
{
    inner: &'a Inner<Req, Resp>,
    req: Option<Req>,
    replied: bool,
}

impl<'a, Req, Resp> std::fmt::Debug for RequestGuard<'a, Req, Resp>
where
    Req: Send + std::fmt::Debug,
    Resp: Send,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RequestGuard")
            .field("req", &self.req)
            .finish_non_exhaustive()
    }
}

// PartialEq for testing purposes - comparing RequestGuards doesn't make sense,
// but we need it for Result<RequestGuard, _> comparisons in tests
impl<'a, Req, Resp> PartialEq for RequestGuard<'a, Req, Resp>
where
    Req: Send + PartialEq,
    Resp: Send,
{
    fn eq(&self, other: &Self) -> bool {
        // Two guards are equal if they hold the same request value
        // (mainly for testing purposes)
        self.req == other.req
    }
}

impl<'a, Req, Resp> RequestGuard<'a, Req, Resp>
where
    Req: Send, Resp: Send,
{
    /// Get a reference to the request
    #[inline]
    pub fn request(&self) -> &Req {
        self.req.as_ref().expect("RequestGuard logic error: request already consumed")
    }

    /// Consume the guard and send the reply
    ///
    /// This method will:
    /// 1. Store the response
    /// 2. Update the state to RESPONSE_READY
    /// 3. Wake up side A
    #[inline]
    pub fn reply(mut self, resp: Resp) {
        // Store the response
        // SAFETY: Side B has exclusive access to the response storage
        unsafe {
            (*self.inner.response.get()).write(resp);
        }

        // Mark the state as RESPONSE_READY
        self.inner.mark_response_ready();

        // Wake up side A
        self.inner.a_waker.wake();

        // Mark as replied (prevents the Drop panic)
        self.replied = true;
    }
}

/// Drop guard: if B drops the guard without calling `reply`, we panic.
/// This enforces the "must reply" protocol.
impl<'a, Req, Resp> Drop for RequestGuard<'a, Req, Resp>
where
    Req: Send, Resp: Send,
{
    fn drop(&mut self) {
        if !self.replied {
            // B dropped the guard without replying.
            // This is a protocol error that would cause A to deadlock,
            // so we must panic to prevent it.
            panic!("RequestGuard dropped without replying! This would cause the requester to deadlock. You must call reply() before dropping the guard.");
        }
    }
}

impl<Req, Resp> SideB<Req, Resp> {
    /// Wait for and receive a request, returning a guard that must be replied to
    ///
    /// The returned `RequestGuard` enforces that you call `reply()` on it.
    /// If you drop the guard without calling `reply()`, it panics.
    ///
    /// # Returns
    ///
    /// - `Ok(RequestGuard)`: Received a request from side A
    /// - `Err(ChannelError::Closed)`: Side A has been closed
    ///
    /// # Example
    ///
    /// ```
    /// # use lite_sync::request_response::one_to_one::channel;
    /// # tokio_test::block_on(async {
    /// let (side_a, side_b) = channel::<String, i32>();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(guard) = side_b.recv_request().await {
    ///         let len = guard.request().len() as i32;
    ///         guard.reply(len);
    ///     }
    /// });
    ///
    /// let response = side_a.request("Hello".to_string()).await;
    /// assert_eq!(response, Ok(5));
    /// # });
    /// ```
    pub async fn recv_request(&self) -> Result<RequestGuard<'_, Req, Resp>, ChannelError>
    where
        Req: Send,
        Resp: Send,
    {
        let req = RecvRequest {
            inner: &self.inner,
            registered: false,
        }.await?;

        Ok(RequestGuard {
            inner: &self.inner,
            req: Some(req),
            replied: false,
        })
    }

    /// Convenience method: handle a request and send the response
    ///
    /// This method will:
    /// 1. Wait for and receive a request
    /// 2. Call the handler function
    /// 3. Send the response via the guard
    ///
    /// # Returns
    ///
    /// - `Ok(())`: Successfully handled the request and sent the response
    /// - `Err(ChannelError::Closed)`: Side A closed
    ///
    /// # Example
    ///
    /// ```
    /// # use lite_sync::request_response::one_to_one::channel;
    /// # tokio_test::block_on(async {
    /// let (side_a, side_b) = channel::<String, i32>();
    ///
    /// tokio::spawn(async move {
    ///     while side_b.handle_request(|req| req.len() as i32).await.is_ok() {
    ///         // Continue handling
    ///     }
    /// });
    ///
    /// let response = side_a.request("Hello".to_string()).await;
    /// assert_eq!(response, Ok(5));
    /// # });
    /// ```
    pub async fn handle_request<F>(&self, handler: F) -> Result<(), ChannelError>
    where
        Req: Send,
        Resp: Send,
        F: FnOnce(&Req) -> Resp,
    {
        let guard = self.recv_request().await?;
        let resp = handler(guard.request());
        guard.reply(resp);
        Ok(())
    }

    /// Convenience method: handle a request asynchronously and send the response
    ///
    /// Similar to `handle_request`, but supports async handler functions.
    /// Note: the handler takes ownership of the request to avoid lifetime issues.
    ///
    /// # Example
    ///
    /// ```
    /// # use lite_sync::request_response::one_to_one::channel;
    /// # tokio_test::block_on(async {
    /// let (side_a, side_b) = channel::<String, String>();
    ///
    /// tokio::spawn(async move {
    ///     while side_b.handle_request_async(|req| async move {
    ///         // Async processing - req is owned
    ///         req.to_uppercase()
    ///     }).await.is_ok() {
    ///         // Continue handling
    ///     }
    /// });
    ///
    /// let response = side_a.request("hello".to_string()).await;
    /// assert_eq!(response, Ok("HELLO".to_string()));
    /// # });
    /// ```
    pub async fn handle_request_async<F, Fut>(&self, handler: F) -> Result<(), ChannelError>
    where
        Req: Send,
        Resp: Send,
        F: FnOnce(Req) -> Fut,
        Fut: Future<Output = Resp>,
    {
        let mut guard = self.recv_request().await?;
        let req = guard.req.take().expect("RequestGuard logic error: request already consumed");
        let resp = handler(req).await;

        // Manually perform what `reply()` does, since the request has been moved
        // out of the guard: store the response, publish RESPONSE_READY, and wake A.
        // SAFETY: Side B has exclusive access to the response storage.
        unsafe {
            (*guard.inner.response.get()).write(resp);
        }
        guard.inner.mark_response_ready();
        guard.inner.a_waker.wake();
        guard.replied = true;

        Ok(())
    }
}

// Drop implementations close the channel and wake the peer
impl<Req, Resp> Drop for SideA<Req, Resp> {
    fn drop(&mut self) {
        // Side A closed; wake side B in case it is waiting for a request
        self.inner.a_closed.store(true, Ordering::Release);
        self.inner.b_waker.wake();
    }
}

impl<Req, Resp> Drop for SideB<Req, Resp> {
    fn drop(&mut self) {
        // Side B closed; wake side A in case it is waiting for a response
        self.inner.b_closed.store(true, Ordering::Release);
        self.inner.a_waker.wake();
    }
}

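// The three futures below share the same poll shape:
//   1. check whether the state machine has already made the progress we wait for;
//   2. check whether the peer side has closed;
//   3. register our waker (once per future instance);
//   4. re-check both conditions, since the peer may have acted between the first
//      checks and the waker registration (otherwise that wakeup could be lost).
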
/// Future: side A sends a request
struct SendRequest<'a, Req, Resp> {
    inner: &'a Inner<Req, Resp>,
    request: Option<Req>,
    registered: bool,
}

// SendRequest is Unpin because we only need to move data, not pin it
impl<Req, Resp> Unpin for SendRequest<'_, Req, Resp> {}

impl<Req, Resp> Future for SendRequest<'_, Req, Resp> {
    type Output = Result<(), ChannelError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Check whether side B is closed
        if this.inner.is_b_closed() {
            return Poll::Ready(Err(ChannelError::Closed));
        }

        // Try to send the request
        if this.inner.try_send_request() {
            // Successfully claimed the request slot
            // SAFETY: We have exclusive access (guaranteed by the state machine)
            unsafe {
                (*this.inner.request.get()).write(this.request.take().unwrap());
            }

            // Wake up side B
            this.inner.b_waker.wake();

            return Poll::Ready(Ok(()));
        }

        // Channel busy: register the waker and wait
        if !this.registered {
            this.inner.a_waker.register(cx.waker());
            this.registered = true;

            // Check again to close the race window around registration
            if this.inner.is_b_closed() {
                return Poll::Ready(Err(ChannelError::Closed));
            }

            if this.inner.current_state() == IDLE {
                // The channel became idle in the meantime; retry on the next poll
                cx.waker().wake_by_ref();
            }
        }

        Poll::Pending
    }
}

/// Future: side A receives the response
struct RecvResponse<'a, Req, Resp> {
    inner: &'a Inner<Req, Resp>,
    registered: bool,
}

// RecvResponse is Unpin because it only holds a reference and a bool
impl<Req, Resp> Unpin for RecvResponse<'_, Req, Resp> {}

impl<Req, Resp> Future for RecvResponse<'_, Req, Resp> {
    type Output = Result<Resp, ChannelError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Check for RESPONSE_READY first: even if B has closed, we still want to
        // deliver a response that was already sent
        if self.inner.current_state() == RESPONSE_READY {
            // Take the response
            // SAFETY: The response must exist when the state is RESPONSE_READY
            let response = unsafe {
                (*self.inner.response.get()).assume_init_read()
            };

            // Reset to IDLE for the next round
            self.inner.complete_response();

            return Poll::Ready(Ok(response));
        }

        // Check whether B closed (after confirming there is no response)
        if self.inner.is_b_closed() {
            return Poll::Ready(Err(ChannelError::Closed));
        }

        // Register the waker
        if !self.registered {
            self.inner.a_waker.register(cx.waker());
            self.registered = true;

            // Check the state again to close the race window around registration
            if self.inner.current_state() == RESPONSE_READY {
                let response = unsafe {
                    (*self.inner.response.get()).assume_init_read()
                };
                self.inner.complete_response();
                return Poll::Ready(Ok(response));
            }

            // Check again whether B closed
            if self.inner.is_b_closed() {
                return Poll::Ready(Err(ChannelError::Closed));
            }
        }

        Poll::Pending
    }
}

/// Future: side B receives a request
struct RecvRequest<'a, Req, Resp> {
    inner: &'a Inner<Req, Resp>,
    registered: bool,
}

// RecvRequest is Unpin because it only holds a reference and a bool
impl<Req, Resp> Unpin for RecvRequest<'_, Req, Resp> {}

impl<Req, Resp> Future for RecvRequest<'_, Req, Resp> {
    type Output = Result<Req, ChannelError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Check the state first, then access the data: even if A has closed, we
        // still want to deliver a request that was already sent
        if self.inner.current_state() == WAITING_RESPONSE {
            // Take the request
            // SAFETY: The request must exist when the state is WAITING_RESPONSE
            let request = unsafe {
                (*self.inner.request.get()).assume_init_read()
            };

            return Poll::Ready(Ok(request));
        }

        // Check whether A closed (after confirming there is no request)
        if self.inner.is_a_closed() {
            return Poll::Ready(Err(ChannelError::Closed));
        }

        // Register the waker
        if !self.registered {
            self.inner.b_waker.register(cx.waker());
            self.registered = true;

            // Check again to close the race window around registration
            if self.inner.current_state() == WAITING_RESPONSE {
                let request = unsafe {
                    (*self.inner.request.get()).assume_init_read()
                };
                return Poll::Ready(Ok(request));
            }

            // Check again whether A closed
            if self.inner.is_a_closed() {
                return Poll::Ready(Err(ChannelError::Closed));
            }
        }

        Poll::Pending
    }
}


#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, Duration};

    #[tokio::test]
    async fn test_basic_request_response() {
        let (side_a, side_b) = channel::<String, i32>();

        tokio::spawn(async move {
            let guard = side_b.recv_request().await.unwrap();
            assert_eq!(guard.request(), "Hello");
            guard.reply(42);
        });

        let response = side_a.request("Hello".to_string()).await;
        assert_eq!(response, Ok(42));
    }

    #[tokio::test]
    async fn test_multiple_rounds() {
        let (side_a, side_b) = channel::<i32, i32>();

        tokio::spawn(async move {
            for i in 0..5 {
                let guard = side_b.recv_request().await.unwrap();
                assert_eq!(*guard.request(), i);
                guard.reply(i * 2);
            }
        });

        for i in 0..5 {
            let response = side_a.request(i).await;
            assert_eq!(response, Ok(i * 2));
        }
    }

    #[tokio::test]
    async fn test_delayed_response() {
        let (side_a, side_b) = channel::<String, String>();

        tokio::spawn(async move {
            let guard = side_b.recv_request().await.unwrap();
            sleep(Duration::from_millis(50)).await;
            let response = guard.request().to_uppercase();
            guard.reply(response);
        });

        let response = side_a.request("hello".to_string()).await;
        assert_eq!(response, Ok("HELLO".to_string()));
    }

    #[tokio::test]
    async fn test_side_b_closes() {
        let (side_a, side_b) = channel::<i32, i32>();

        // Side B closes immediately
        drop(side_b);

        // Side A should receive Err
        let response = side_a.request(42).await;
        assert_eq!(response, Err(ChannelError::Closed));
    }

    #[tokio::test]
    async fn test_side_a_closes() {
        let (side_a, side_b) = channel::<i32, i32>();

        // Side A closes immediately
        drop(side_a);

        // Side B should receive Err
        let request = side_b.recv_request().await;
        assert_eq!(request, Err(ChannelError::Closed));
    }

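    // A minimal extra check (illustrative sketch): if side B panics mid-protocol
    // because it dropped the guard without replying, side B is dropped during
    // unwinding and the requester observes `ChannelError::Closed`.
    #[tokio::test]
    async fn test_requester_errors_after_responder_panics() {
        let (side_a, side_b) = channel::<i32, i32>();

        let handle = tokio::spawn(async move {
            // Dropping the guard without replying panics, which drops side_b.
            let _guard = side_b.recv_request().await.unwrap();
        });

        let response = side_a.request(1).await;
        assert_eq!(response, Err(ChannelError::Closed));
        assert!(handle.await.is_err(), "responder task should have panicked");
    }
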
    #[tokio::test]
    async fn test_concurrent_requests() {
        let (side_a, side_b) = channel::<i32, i32>();

        let handle_b = tokio::spawn(async move {
            let mut count = 0;
            loop {
                if let Ok(guard) = side_b.recv_request().await {
                    count += 1;
                    let response = *guard.request() * 2;
                    guard.reply(response);
                } else {
                    break;
                }
            }
            count
        });

        let handle_a = tokio::spawn(async move {
            for i in 0..10 {
                let response = side_a.request(i).await.unwrap();
                assert_eq!(response, i * 2);
            }
            drop(side_a);
        });

        handle_a.await.unwrap();
        let count = handle_b.await.unwrap();
        assert_eq!(count, 10);
    }

    #[tokio::test]
    async fn test_string_messages() {
        let (side_a, side_b) = channel::<String, String>();

        tokio::spawn(async move {
            loop {
                if let Ok(guard) = side_b.recv_request().await {
                    let response = format!("Echo: {}", guard.request());
                    guard.reply(response);
                } else {
                    break;
                }
            }
        });

        let messages = vec!["Hello", "World", "Rust"];
        for msg in messages {
            let response = side_a.request(msg.to_string()).await.unwrap();
            assert_eq!(response, format!("Echo: {}", msg));
        }
    }

    #[tokio::test]
    async fn test_handle_request() {
        let (side_a, side_b) = channel::<i32, i32>();

        tokio::spawn(async move {
            // Using the handle_request convenience method
            while side_b.handle_request(|req| req * 3).await.is_ok() {
                // Continue handling
            }
        });

        for i in 0..5 {
            let response = side_a.request(i).await.unwrap();
            assert_eq!(response, i * 3);
        }
    }

    #[tokio::test]
    async fn test_handle_request_async() {
        let (side_a, side_b) = channel::<String, usize>();

        tokio::spawn(async move {
            // Using the handle_request_async convenience method
            while side_b.handle_request_async(|req| async move {
                sleep(Duration::from_millis(10)).await;
                req.len()
            }).await.is_ok() {
                // Continue handling
            }
        });

        let test_strings = vec!["Hello", "World", "Rust", "Async"];
        for s in test_strings {
            let response = side_a.request(s.to_string()).await.unwrap();
            assert_eq!(response, s.len());
        }
    }

    #[tokio::test]
    async fn test_error_display() {
        // Test the Display implementation for the error type
        assert_eq!(format!("{}", ChannelError::Closed), "channel closed");
    }

    #[tokio::test]
    async fn test_multiple_handle_request_rounds() {
        let (side_a, side_b) = channel::<String, String>();

        let handle = tokio::spawn(async move {
            let mut count = 0;
            // Handle requests manually to keep a running count
            while let Ok(guard) = side_b.recv_request().await {
                count += 1;
                let resp = format!("{}:{}", count, guard.request().to_uppercase());
                guard.reply(resp);
            }
            count
        });

        let response1 = side_a.request("hello".to_string()).await.unwrap();
        assert_eq!(response1, "1:HELLO");

        let response2 = side_a.request("world".to_string()).await.unwrap();
        assert_eq!(response2, "2:WORLD");

        let response3 = side_a.request("rust".to_string()).await.unwrap();
        assert_eq!(response3, "3:RUST");

        drop(side_a);
        let count = handle.await.unwrap();
        assert_eq!(count, 3);
    }

    #[tokio::test]
    async fn test_request_guard_must_reply() {
        let (side_a, side_b) = channel::<i32, i32>();

        let handle = tokio::spawn(async move {
            let _guard = side_b.recv_request().await.unwrap();
            // Intentionally not calling reply() - this should panic
        });

        // Send a request
        tokio::spawn(async move {
            let _ = side_a.request(42).await;
        });

        // Wait for the spawned task and verify it panicked
        let result = handle.await;
        assert!(result.is_err(), "Task should have panicked");

        // Verify the panic message contains our expected text
        if let Err(e) = result {
            if let Ok(panic_payload) = e.try_into_panic() {
                if let Some(s) = panic_payload.downcast_ref::<String>() {
                    assert!(s.contains("RequestGuard dropped without replying"),
                        "Panic message should mention RequestGuard: {}", s);
                } else if let Some(s) = panic_payload.downcast_ref::<&str>() {
                    assert!(s.contains("RequestGuard dropped without replying"),
                        "Panic message should mention RequestGuard: {}", s);
                } else {
                    panic!("Unexpected panic type");
                }
            }
        }
    }
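
    // A small sanity check of the `Debug` impl on `RequestGuard` (illustrative sketch;
    // it only assumes that the struct name and the request value appear in the
    // `debug_struct` output produced by the impl above).
    #[tokio::test]
    async fn test_request_guard_debug_format() {
        let (side_a, side_b) = channel::<i32, i32>();

        tokio::spawn(async move {
            let _ = side_a.request(7).await;
        });

        let guard = side_b.recv_request().await.unwrap();
        let rendered = format!("{:?}", guard);
        assert!(rendered.contains("RequestGuard"));
        assert!(rendered.contains("7"));
        guard.reply(7);
    }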
}