lite_sync/oneshot/generic.rs
1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, Ordering};
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use std::cell::UnsafeCell;
7use std::mem::MaybeUninit;
8
9use crate::atomic_waker::AtomicWaker;
10
// Values stored in `Inner::state` to describe the value cell.
const EMPTY: u8 = 0; // No value is stored in the cell
const READY: u8 = 1; // A value has been sent and is waiting to be taken
14
/// Shared state behind a one-shot value transfer.
///
/// The value is kept in an `UnsafeCell<MaybeUninit<T>>`:
/// - it is stored in place, with no `Option` discriminant;
/// - the `AtomicU8` state records whether the cell currently holds an
///   initialized value;
/// - the `UnsafeCell` provides the interior mutability needed to fill and
///   empty the cell through `&self`.
///
/// NOTE(review): the previous comment stated that the waker costs two pointers
/// and needs no extra heap allocation. That depends on `AtomicWaker`, which is
/// defined in `crate::atomic_waker` and is not visible here - confirm.
pub(crate) struct Inner<T> {
    /// Waker registered by the receiving side and woken by `send`.
    waker: AtomicWaker,
    /// State tag for `value` (see the `EMPTY` / `READY` constants).
    state: AtomicU8,
    /// In-place storage for the transferred value.
    value: UnsafeCell<MaybeUninit<T>>,
}
35
36impl<T> Inner<T> {
37 /// Create a new oneshot inner state
38 ///
39 /// Extremely fast: just initializes empty waker and uninitialized memory
40 ///
41 /// 创建一个新的 oneshot 内部状态
42 ///
43 /// 极快:仅初始化空 waker 和未初始化内存
44 #[inline]
45 pub(crate) fn new() -> Arc<Self> {
46 Arc::new(Self {
47 waker: AtomicWaker::new(),
48 state: AtomicU8::new(EMPTY),
49 value: UnsafeCell::new(MaybeUninit::uninit()),
50 })
51 }
52
53 /// Send a value (store value and wake)
54 ///
55 /// 发送一个值(存储值并唤醒)
56 #[inline]
57 pub(crate) fn send(&self, value: T) -> Result<(), T> {
58 // Try to transition from EMPTY to READY
59 match self.state.compare_exchange(
60 EMPTY,
61 READY,
62 Ordering::AcqRel,
63 Ordering::Acquire,
64 ) {
65 Ok(_) => {
66 // Successfully transitioned, store the value
67 // SAFETY: We have exclusive access via the state transition
68 // No other thread can access the value until state is READY
69 // We're initializing previously uninitialized memory
70 unsafe {
71 (*self.value.get()).write(value);
72 }
73
74 // Wake the registered waker if any
75 self.waker.wake();
76 Ok(())
77 }
78 Err(_) => {
79 // State was not EMPTY, value already sent
80 Err(value)
81 }
82 }
83 }
84
85 /// Try to receive the value without blocking
86 ///
87 /// 尝试接收值而不阻塞
88 #[inline]
89 fn try_recv(&self) -> Option<T> {
90 // Check if value is ready
91 if self.state.load(Ordering::Acquire) == READY {
92 // Try to transition from READY back to EMPTY to claim the value
93 match self.state.compare_exchange(
94 READY,
95 EMPTY,
96 Ordering::AcqRel,
97 Ordering::Acquire,
98 ) {
99 Ok(_) => {
100 // Successfully claimed the value
101 // SAFETY: We have exclusive access via the state transition
102 // State was READY, so value must be initialized
103 unsafe {
104 Some((*self.value.get()).assume_init_read())
105 }
106 }
107 Err(_) => {
108 // Someone else already claimed it
109 None
110 }
111 }
112 } else {
113 None
114 }
115 }
116
117 /// Register a waker to be notified on completion
118 ///
119 /// 注册一个 waker 以在完成时收到通知
120 #[inline]
121 fn register_waker(&self, waker: &std::task::Waker) {
122 self.waker.register(waker);
123 }
124}
125
// SAFETY: `Inner<T>` may be sent to and shared between threads whenever `T: Send`.
// - The `UnsafeCell<MaybeUninit<T>>` is only touched by code that first performs
//   an atomic transition on `state`, or that holds `&mut self` (as in `Drop`).
// - These impls rely on that state machine to give one thread at a time access
//   to the cell.
// - The waker is held in an `AtomicWaker`, which is expected to do its own
//   synchronization (defined in `crate::atomic_waker`; not visible here).
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
139
140impl<T> Drop for Inner<T> {
141 fn drop(&mut self) {
142 // Clean up the value if it was sent but not received
143 // SAFETY: We have exclusive access in drop (&mut self)
144 // Only drop if state is READY (value was initialized)
145 if *self.state.get_mut() == READY {
146 unsafe {
147 // Value is initialized, must drop it
148 (*self.value.get()).assume_init_drop();
149 }
150 }
151 // If state is EMPTY, value is uninitialized, nothing to drop
152 }
153}
154
/// Error value signalling that a one-shot channel was closed.
///
/// Its `Display` output is the fixed text `channel closed`.
///
/// NOTE(review): nothing in this file constructs or returns `RecvError`
/// (the receiver future resolves to a plain `T`); presumably it is used by
/// other parts of the crate - confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecvError;

impl std::fmt::Display for RecvError {
    /// Write the fixed message; formatter width/fill options are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("channel closed")
    }
}

impl std::error::Error for RecvError {}
168
169#[inline]
170pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
171 Sender::new()
172}
173
/// Sending half of a one-shot value transfer.
///
/// Holds a shared handle to the channel state. `send` takes the sender by
/// value, so each sender can transfer at most one value.
pub struct Sender<T> {
    /// Channel state shared with the matching `Receiver`.
    inner: Arc<Inner<T>>,
}
180
181impl<T> Sender<T> {
182 /// Create a new oneshot sender with receiver
183 ///
184 /// 创建一个新的 oneshot 发送器和接收器
185 ///
186 /// # Returns
187 /// Returns a tuple of (sender, receiver)
188 ///
189 /// 返回 (发送器, 接收器) 元组
190 #[inline]
191 pub fn new() -> (Self, Receiver<T>) {
192 let inner = Inner::new();
193
194 let sender = Sender {
195 inner: inner.clone(),
196 };
197 let receiver = Receiver {
198 inner,
199 };
200
201 (sender, receiver)
202 }
203
204 /// Send a value through the channel
205 ///
206 /// Returns `Err(value)` if the receiver was already dropped
207 ///
208 /// 通过通道发送一个值
209 ///
210 /// 如果接收器已被丢弃则返回 `Err(value)`
211 #[inline]
212 pub fn send(self, value: T) -> Result<(), T> {
213 self.inner.send(value)
214 }
215}
216
/// Receiving half of a one-shot value transfer.
///
/// Implements `Future` directly, so `.await` can be used both on an owned
/// receiver and on a mutable reference to it.
///
/// # Examples
///
/// ## Basic usage
///
/// ```
/// use lite_sync::oneshot::generic::Sender;
///
/// # tokio_test::block_on(async {
/// let (sender, receiver) = Sender::<String>::new();
///
/// tokio::spawn(async move {
///     sender.send("Hello, World!".to_string()).unwrap();
/// });
///
/// // Direct await via Future impl
/// let message = receiver.await;
/// assert_eq!(message, "Hello, World!");
/// # });
/// ```
///
/// ## Awaiting on mutable reference
///
/// ```
/// use lite_sync::oneshot::generic::Sender;
///
/// # tokio_test::block_on(async {
/// let (sender, mut receiver) = Sender::<i32>::new();
///
/// tokio::spawn(async move {
///     sender.send(42).unwrap();
/// });
///
/// // Can also await on &mut receiver
/// let value = (&mut receiver).await;
/// assert_eq!(value, 42);
/// # });
/// ```
pub struct Receiver<T> {
    /// Channel state shared with the matching `Sender`.
    inner: Arc<Inner<T>>,
}
265
// The only field of `Receiver` is an `Arc`, which is `Unpin`, so a `Receiver`
// can be moved freely even after it has been polled.
impl<T> Unpin for Receiver<T> {}
268
269impl<T> Receiver<T> {
270 /// Wait for value asynchronously
271 ///
272 /// This is equivalent to using `.await` directly on the receiver
273 ///
274 /// 异步等待值
275 ///
276 /// 这等同于直接在 receiver 上使用 `.await`
277 ///
278 /// # Returns
279 /// Returns the received value
280 ///
281 /// # 返回值
282 /// 返回接收到的值
283 #[inline]
284 pub async fn wait(self) -> T {
285 self.await
286 }
287
288 /// Try to receive a value without blocking
289 ///
290 /// Returns `None` if no value has been sent yet
291 ///
292 /// 尝试接收值而不阻塞
293 ///
294 /// 如果还没有发送值则返回 `None`
295 #[inline]
296 pub fn try_recv(&mut self) -> Option<T> {
297 self.inner.try_recv()
298 }
299}
300
301/// Direct Future implementation for Receiver
302///
303/// This allows both `receiver.await` and `(&mut receiver).await` to work
304///
305/// Optimized implementation:
306/// - Fast path: Immediate return if value already sent (no allocation)
307/// - Slow path: Direct waker registration (no Box allocation, just copy two pointers)
308/// - No intermediate future state needed
309///
310/// 为 Receiver 直接实现 Future
311///
312/// 这允许 `receiver.await` 和 `(&mut receiver).await` 都能工作
313///
314/// 优化实现:
315/// - 快速路径:如值已发送则立即返回(无分配)
316/// - 慢速路径:直接注册 waker(无 Box 分配,只复制两个指针)
317/// - 无需中间 future 状态
318impl<T> Future for Receiver<T> {
319 type Output = T;
320
321 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
322 // SAFETY: Receiver is Unpin, so we can safely get a mutable reference
323 let this = self.get_mut();
324
325 // Fast path: check if value already sent
326 if let Some(value) = this.inner.try_recv() {
327 return Poll::Ready(value);
328 }
329
330 // Slow path: register waker for notification
331 this.inner.register_waker(cx.waker());
332
333 // Check again after registering waker to avoid race condition
334 // The sender might have sent between our first check and waker registration
335 if let Some(value) = this.inner.try_recv() {
336 return Poll::Ready(value);
337 }
338
339 Poll::Pending
340 }
341}
342
// Async tests for the one-shot channel, run on the tokio test runtime.
#[cfg(test)]
mod tests {
    use super::*;

    // A `String` sent from a spawned task after a short delay reaches `wait()`.
    #[tokio::test]
    async fn test_oneshot_string() {
        let (sender, receiver) = Sender::<String>::new();

        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            sender.send("Hello".to_string()).unwrap();
        });

        let result = receiver.wait().await;
        assert_eq!(result, "Hello");
    }

    // Same as above with an `i32` payload.
    #[tokio::test]
    async fn test_oneshot_integer() {
        let (sender, receiver) = Sender::<i32>::new();

        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            sender.send(42).unwrap();
        });

        let result = receiver.wait().await;
        assert_eq!(result, 42);
    }

    // The value is sent before the receiver is awaited.
    #[tokio::test]
    async fn test_oneshot_immediate() {
        let (sender, receiver) = Sender::<String>::new();

        // Send before waiting (fast path)
        sender.send("Immediate".to_string()).unwrap();

        let result = receiver.wait().await;
        assert_eq!(result, "Immediate");
    }

    // A user-defined struct is transferred with its fields intact.
    #[tokio::test]
    async fn test_oneshot_custom_struct() {
        #[derive(Debug, Clone, PartialEq)]
        struct CustomData {
            id: u64,
            name: String,
        }

        let (sender, receiver) = Sender::<CustomData>::new();

        let data = CustomData {
            id: 123,
            name: "Test".to_string(),
        };

        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            sender.send(data).unwrap();
        });

        let result = receiver.wait().await;
        assert_eq!(result.id, 123);
        assert_eq!(result.name, "Test");
    }

    // The receiver is awaited directly through its `Future` impl.
    #[tokio::test]
    async fn test_oneshot_direct_await() {
        let (sender, receiver) = Sender::<i32>::new();

        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            sender.send(99).unwrap();
        });

        // Direct await without .wait()
        let result = receiver.await;
        assert_eq!(result, 99);
    }

    // Awaiting `&mut receiver` works as well as awaiting the owned receiver.
    #[tokio::test]
    async fn test_oneshot_await_mut_reference() {
        let (sender, mut receiver) = Sender::<String>::new();

        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            sender.send("Mutable".to_string()).unwrap();
        });

        // Await on mutable reference
        let result = (&mut receiver).await;
        assert_eq!(result, "Mutable");
    }

    // Value sent first, then the receiver is awaited directly.
    #[tokio::test]
    async fn test_oneshot_immediate_await() {
        let (sender, receiver) = Sender::<Vec<u8>>::new();

        // Immediate send (fast path)
        sender.send(vec![1, 2, 3]).unwrap();

        // Direct await
        let result = receiver.await;
        assert_eq!(result, vec![1, 2, 3]);
    }

    // `try_recv` yields `None` before a send and the value afterwards.
    #[tokio::test]
    async fn test_oneshot_try_recv() {
        let (sender, mut receiver) = Sender::<i32>::new();

        // Try receive before sending
        assert_eq!(receiver.try_recv(), None);

        // Send value
        sender.send(42).unwrap();

        // Try receive after sending
        assert_eq!(receiver.try_recv(), Some(42));
    }

    // A 1 MiB `Vec<u8>` is transferred; only its length is checked.
    #[tokio::test]
    async fn test_oneshot_large_data() {
        let (sender, receiver) = Sender::<Vec<u8>>::new();

        let large_vec = vec![0u8; 1024 * 1024]; // 1MB

        tokio::spawn(async move {
            sender.send(large_vec).unwrap();
        });

        let result = receiver.await;
        assert_eq!(result.len(), 1024 * 1024);
    }
}