// lite_sync/oneshot/generic.rs
1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, Ordering};
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use std::cell::UnsafeCell;
7use std::mem::MaybeUninit;
8
9use crate::atomic_waker::AtomicWaker;
10
// State tags stored in `Inner::state`.

/// The value cell currently holds no value.
const EMPTY: u8 = 0;
/// A value has been sent and has not been taken yet.
const READY: u8 = 1;
14
15/// Inner state for one-shot value transfer
16///
17/// Uses UnsafeCell<MaybeUninit<T>> for direct value storage without any overhead:
18/// - Waker itself is just 2 pointers (16 bytes on 64-bit), no additional heap allocation
19/// - Value is stored directly with zero overhead (no Option discriminant)
20/// - Atomic state machine (AtomicU8) tracks initialization state
21/// - UnsafeCell allows interior mutability with proper synchronization
22///
23/// 一次性值传递的内部状态
24///
25/// 使用 UnsafeCell<MaybeUninit<T>> 实现零开销的直接值存储:
26/// - Waker 本身只是 2 个指针(64 位系统上 16 字节),无额外堆分配
27/// - 值直接存储,零开销(无 Option 判别标记)
28/// - 原子状态机(AtomicU8)跟踪初始化状态
29/// - UnsafeCell 允许通过适当同步实现内部可变性
30pub(crate) struct Inner<T> {
31 waker: AtomicWaker,
32 state: AtomicU8,
33 value: UnsafeCell<MaybeUninit<T>>,
34}
35
36impl<T> Inner<T> {
37 /// Create a new oneshot inner state
38 ///
39 /// Extremely fast: just initializes empty waker and uninitialized memory
40 ///
41 /// 创建一个新的 oneshot 内部状态
42 ///
43 /// 极快:仅初始化空 waker 和未初始化内存
44 #[inline]
45 pub(crate) fn new() -> Arc<Self> {
46 Arc::new(Self {
47 waker: AtomicWaker::new(),
48 state: AtomicU8::new(EMPTY),
49 value: UnsafeCell::new(MaybeUninit::uninit()),
50 })
51 }
52
53 /// Send a value (store value and wake)
54 ///
55 /// 发送一个值(存储值并唤醒)
56 #[inline]
57 pub(crate) fn send(&self, value: T) -> Result<(), T> {
58 // Try to transition from EMPTY to READY
59 match self.state.compare_exchange(
60 EMPTY,
61 READY,
62 Ordering::AcqRel,
63 Ordering::Acquire,
64 ) {
65 Ok(_) => {
66 // Successfully transitioned, store the value
67 // SAFETY: We have exclusive access via the state transition
68 // No other thread can access the value until state is READY
69 // We're initializing previously uninitialized memory
70 unsafe {
71 (*self.value.get()).write(value);
72 }
73
74 // Wake the registered waker if any
75 self.waker.wake();
76 Ok(())
77 }
78 Err(_) => {
79 // State was not EMPTY, value already sent
80 Err(value)
81 }
82 }
83 }
84
85 /// Try to receive the value without blocking
86 ///
87 /// 尝试接收值而不阻塞
88 #[inline]
89 fn try_recv(&self) -> Option<T> {
90 // Check if value is ready
91 if self.state.load(Ordering::Acquire) == READY {
92 // Try to transition from READY back to EMPTY to claim the value
93 match self.state.compare_exchange(
94 READY,
95 EMPTY,
96 Ordering::AcqRel,
97 Ordering::Acquire,
98 ) {
99 Ok(_) => {
100 // Successfully claimed the value
101 // SAFETY: We have exclusive access via the state transition
102 // State was READY, so value must be initialized
103 unsafe {
104 Some((*self.value.get()).assume_init_read())
105 }
106 }
107 Err(_) => {
108 // Someone else already claimed it
109 None
110 }
111 }
112 } else {
113 None
114 }
115 }
116
117 /// Register a waker to be notified on completion
118 ///
119 /// 注册一个 waker 以在完成时收到通知
120 #[inline]
121 fn register_waker(&self, waker: &std::task::Waker) {
122 self.waker.register(waker);
123 }
124}
125
126// SAFETY: Inner<T> is Send + Sync as long as T is Send
127// - UnsafeCell<MaybeUninit<T>> is protected by atomic state transitions
128// - Only one thread can access the value at a time (enforced by state machine)
129// - Value is only accessed when state is READY (initialized)
130// - Waker is properly synchronized via AtomicWaker
131//
132// SAFETY: 只要 T 是 Send,Inner<T> 就是 Send + Sync
133// - UnsafeCell<MaybeUninit<T>> 由原子状态转换保护
134// - 每次只有一个线程可以访问值(由状态机强制执行)
135// - 只有在状态为 READY(已初始化)时才访问值
136// - Waker 通过 AtomicWaker 正确同步
137unsafe impl<T: Send> Send for Inner<T> {}
138unsafe impl<T: Send> Sync for Inner<T> {}
139
140impl<T> Drop for Inner<T> {
141 fn drop(&mut self) {
142 // Clean up the value if it was sent but not received
143 // SAFETY: We have exclusive access in drop (&mut self)
144 // Only drop if state is READY (value was initialized)
145 if *self.state.get_mut() == READY {
146 unsafe {
147 // Value is initialized, must drop it
148 (*self.value.get()).assume_init_drop();
149 }
150 }
151 // If state is EMPTY, value is uninitialized, nothing to drop
152 }
153}
154
155impl<T> std::fmt::Debug for Inner<T> {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 let state = self.state.load(Ordering::Acquire);
158 let state_str = match state {
159 EMPTY => "Empty",
160 READY => "Ready",
161 _ => "Unknown",
162 };
163 f.debug_struct("Inner")
164 .field("state", &state_str)
165 .finish()
166 }
167}
168
/// Error value signalling that the channel is closed.
///
/// Its `Display` output is `"channel closed"`.
///
/// NOTE(review): nothing else in this file constructs or returns this type;
/// confirm where it is meant to be surfaced.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecvError;

impl std::fmt::Display for RecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("channel closed")
    }
}

impl std::error::Error for RecvError {}
182
183#[inline]
184pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
185 Sender::new()
186}
187
188/// Sender for one-shot value transfer
189///
190/// 一次性值传递的发送器
191pub struct Sender<T> {
192 inner: Arc<Inner<T>>,
193}
194
195impl<T> std::fmt::Debug for Sender<T> {
196 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197 let state = self.inner.state.load(Ordering::Acquire);
198 let state_str = match state {
199 EMPTY => "Empty",
200 READY => "Ready",
201 _ => "Unknown",
202 };
203 f.debug_struct("Sender")
204 .field("state", &state_str)
205 .finish()
206 }
207}
208
209impl<T> Sender<T> {
210 /// Create a new oneshot sender with receiver
211 ///
212 /// 创建一个新的 oneshot 发送器和接收器
213 ///
214 /// # Returns
215 /// Returns a tuple of (sender, receiver)
216 ///
217 /// 返回 (发送器, 接收器) 元组
218 #[inline]
219 pub fn new() -> (Self, Receiver<T>) {
220 let inner = Inner::new();
221
222 let sender = Sender {
223 inner: inner.clone(),
224 };
225 let receiver = Receiver {
226 inner,
227 };
228
229 (sender, receiver)
230 }
231
232 /// Send a value through the channel
233 ///
234 /// Returns `Err(value)` if the receiver was already dropped
235 ///
236 /// 通过通道发送一个值
237 ///
238 /// 如果接收器已被丢弃则返回 `Err(value)`
239 #[inline]
240 pub fn send(self, value: T) -> Result<(), T> {
241 self.inner.send(value)
242 }
243}
244
245/// Receiver for one-shot value transfer
246///
247/// Implements `Future` directly, allowing direct `.await` usage on both owned values and mutable references
248///
249/// 一次性值传递的接收器
250///
251/// 直接实现了 `Future`,允许对拥有的值和可变引用都直接使用 `.await`
252///
253/// # Examples
254///
255/// ## Basic usage
256///
257/// ```
258/// use lite_sync::oneshot::generic::Sender;
259///
260/// # tokio_test::block_on(async {
261/// let (sender, receiver) = Sender::<String>::new();
262///
263/// tokio::spawn(async move {
264/// sender.send("Hello, World!".to_string()).unwrap();
265/// });
266///
267/// // Direct await via Future impl
268/// let message = receiver.await;
269/// assert_eq!(message, "Hello, World!");
270/// # });
271/// ```
272///
273/// ## Awaiting on mutable reference
274///
275/// ```
276/// use lite_sync::oneshot::generic::Sender;
277///
278/// # tokio_test::block_on(async {
279/// let (sender, mut receiver) = Sender::<i32>::new();
280///
281/// tokio::spawn(async move {
282/// sender.send(42).unwrap();
283/// });
284///
285/// // Can also await on &mut receiver
286/// let value = (&mut receiver).await;
287/// assert_eq!(value, 42);
288/// # });
289/// ```
290pub struct Receiver<T> {
291 inner: Arc<Inner<T>>,
292}
293
294impl<T> std::fmt::Debug for Receiver<T> {
295 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296 let state = self.inner.state.load(Ordering::Acquire);
297 let state_str = match state {
298 EMPTY => "Empty",
299 READY => "Ready",
300 _ => "Unknown",
301 };
302 f.debug_struct("Receiver")
303 .field("state", &state_str)
304 .finish()
305 }
306}
307
308// Receiver is Unpin because all its fields are Unpin
309impl<T> Unpin for Receiver<T> {}
310
311impl<T> Receiver<T> {
312 /// Wait for value asynchronously
313 ///
314 /// This is equivalent to using `.await` directly on the receiver
315 ///
316 /// 异步等待值
317 ///
318 /// 这等同于直接在 receiver 上使用 `.await`
319 ///
320 /// # Returns
321 /// Returns the received value
322 ///
323 /// # 返回值
324 /// 返回接收到的值
325 #[inline]
326 pub async fn wait(self) -> T {
327 self.await
328 }
329
330 /// Try to receive a value without blocking
331 ///
332 /// Returns `None` if no value has been sent yet
333 ///
334 /// 尝试接收值而不阻塞
335 ///
336 /// 如果还没有发送值则返回 `None`
337 #[inline]
338 pub fn try_recv(&mut self) -> Option<T> {
339 self.inner.try_recv()
340 }
341}
342
343/// Direct Future implementation for Receiver
344///
345/// This allows both `receiver.await` and `(&mut receiver).await` to work
346///
347/// Optimized implementation:
348/// - Fast path: Immediate return if value already sent (no allocation)
349/// - Slow path: Direct waker registration (no Box allocation, just copy two pointers)
350/// - No intermediate future state needed
351///
352/// 为 Receiver 直接实现 Future
353///
354/// 这允许 `receiver.await` 和 `(&mut receiver).await` 都能工作
355///
356/// 优化实现:
357/// - 快速路径:如值已发送则立即返回(无分配)
358/// - 慢速路径:直接注册 waker(无 Box 分配,只复制两个指针)
359/// - 无需中间 future 状态
360impl<T> Future for Receiver<T> {
361 type Output = T;
362
363 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
364 // SAFETY: Receiver is Unpin, so we can safely get a mutable reference
365 let this = self.get_mut();
366
367 // Fast path: check if value already sent
368 if let Some(value) = this.inner.try_recv() {
369 return Poll::Ready(value);
370 }
371
372 // Slow path: register waker for notification
373 this.inner.register_waker(cx.waker());
374
375 // Check again after registering waker to avoid race condition
376 // The sender might have sent between our first check and waker registration
377 if let Some(value) = this.inner.try_recv() {
378 return Poll::Ready(value);
379 }
380
381 Poll::Pending
382 }
383}
384
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Pause taken by spawned senders before they send their value.
    const SEND_DELAY: Duration = Duration::from_millis(10);

    /// A delayed `String` send is received through `wait()`.
    #[tokio::test]
    async fn test_oneshot_string() {
        let (tx, rx) = Sender::<String>::new();

        tokio::spawn(async move {
            tokio::time::sleep(SEND_DELAY).await;
            tx.send("Hello".to_string()).unwrap();
        });

        let received = rx.wait().await;
        assert_eq!(received, "Hello");
    }

    /// A delayed `i32` send is received through `wait()`.
    #[tokio::test]
    async fn test_oneshot_integer() {
        let (tx, rx) = Sender::<i32>::new();

        tokio::spawn(async move {
            tokio::time::sleep(SEND_DELAY).await;
            tx.send(42).unwrap();
        });

        let received = rx.wait().await;
        assert_eq!(received, 42);
    }

    /// Sending before waiting resolves on the first poll.
    #[tokio::test]
    async fn test_oneshot_immediate() {
        let (tx, rx) = Sender::<String>::new();

        // Value is already stored when the receiver is first polled.
        tx.send("Immediate".to_string()).unwrap();

        let received = rx.wait().await;
        assert_eq!(received, "Immediate");
    }

    /// A user-defined struct travels through the channel intact.
    #[tokio::test]
    async fn test_oneshot_custom_struct() {
        #[derive(Debug, Clone, PartialEq)]
        struct CustomData {
            id: u64,
            name: String,
        }

        let (tx, rx) = Sender::<CustomData>::new();

        let payload = CustomData {
            id: 123,
            name: "Test".to_string(),
        };

        tokio::spawn(async move {
            tokio::time::sleep(SEND_DELAY).await;
            tx.send(payload).unwrap();
        });

        let received = rx.wait().await;
        assert_eq!(received.id, 123);
        assert_eq!(received.name, "Test");
    }

    /// The receiver can be awaited directly, without `wait()`.
    #[tokio::test]
    async fn test_oneshot_direct_await() {
        let (tx, rx) = Sender::<i32>::new();

        tokio::spawn(async move {
            tokio::time::sleep(SEND_DELAY).await;
            tx.send(99).unwrap();
        });

        let received = rx.await;
        assert_eq!(received, 99);
    }

    /// A `&mut Receiver` can be awaited as well.
    #[tokio::test]
    async fn test_oneshot_await_mut_reference() {
        let (tx, mut rx) = Sender::<String>::new();

        tokio::spawn(async move {
            tokio::time::sleep(SEND_DELAY).await;
            tx.send("Mutable".to_string()).unwrap();
        });

        let received = (&mut rx).await;
        assert_eq!(received, "Mutable");
    }

    /// Direct await after an immediate send.
    #[tokio::test]
    async fn test_oneshot_immediate_await() {
        let (tx, rx) = Sender::<Vec<u8>>::new();

        // Value is already stored when the receiver is first polled.
        tx.send(vec![1, 2, 3]).unwrap();

        let received = rx.await;
        assert_eq!(received, vec![1, 2, 3]);
    }

    /// `try_recv` yields `None` before a send and the value afterwards.
    #[tokio::test]
    async fn test_oneshot_try_recv() {
        let (tx, mut rx) = Sender::<i32>::new();

        // Nothing sent yet.
        assert_eq!(rx.try_recv(), None);

        tx.send(42).unwrap();

        // Value is now available.
        assert_eq!(rx.try_recv(), Some(42));
    }

    /// A 1 MiB buffer is transferred without losing its length.
    #[tokio::test]
    async fn test_oneshot_large_data() {
        let (tx, rx) = Sender::<Vec<u8>>::new();

        let big = vec![0u8; 1024 * 1024]; // 1MB

        tokio::spawn(async move {
            tx.send(big).unwrap();
        });

        let received = rx.await;
        assert_eq!(received.len(), 1024 * 1024);
    }
}