1use std::sync::atomic::{AtomicU8, Ordering};
6
7use super::common::{OneshotStorage, TakeResult};
8
9pub use super::common::error;
11pub use super::common::RecvError;
12pub use super::common::TryRecvError;
13
/// A value that can be carried through a [`LiteStorage`] as a single byte.
///
/// Implementors supply a two-way mapping between their values and `u8`, together
/// with three reserved bytes that describe the channel rather than a value: still
/// pending, sender dropped, and receiver closed.
///
/// `LiteStorage::try_take` compares the stored byte against the reserved bytes
/// before it attempts to decode it, so a value whose encoding equals one of the
/// reserved bytes is never reported as ready.
pub trait State: Sized + Send + Sync + 'static {
    /// Byte held by the storage while no value has been stored yet.
    fn pending_value() -> u8;

    /// Byte written when the sender is marked as dropped.
    fn closed_value() -> u8;

    /// Byte written when the receiver is marked as closed.
    fn receiver_closed_value() -> u8;

    /// Encodes `self` as the byte that gets stored.
    fn to_u8(&self) -> u8;

    /// Decodes a stored byte, returning `None` when it does not map to a value.
    fn from_u8(value: u8) -> Option<Self>;
}
115
116impl State for () {
120 #[inline]
121 fn to_u8(&self) -> u8 {
122 1 }
124
125 #[inline]
126 fn from_u8(value: u8) -> Option<Self> {
127 match value {
128 1 => Some(()),
129 _ => None,
130 }
131 }
132
133 #[inline]
134 fn pending_value() -> u8 {
135 0 }
137
138 #[inline]
139 fn closed_value() -> u8 {
140 255 }
142
143 #[inline]
144 fn receiver_closed_value() -> u8 {
145 254 }
147}
148
149pub struct LiteStorage<S: State> {
157 state: AtomicU8,
158 _marker: std::marker::PhantomData<S>,
159}
160
161unsafe impl<S: State> Send for LiteStorage<S> {}
162unsafe impl<S: State> Sync for LiteStorage<S> {}
163
164impl<S: State> OneshotStorage for LiteStorage<S> {
165 type Value = S;
166
167 #[inline]
168 fn new() -> Self {
169 Self {
170 state: AtomicU8::new(S::pending_value()),
171 _marker: std::marker::PhantomData,
172 }
173 }
174
175 #[inline]
176 fn store(&self, value: S) {
177 self.state.store(value.to_u8(), Ordering::Release);
178 }
179
180 #[inline]
181 fn try_take(&self) -> TakeResult<S> {
182 let current = self.state.load(Ordering::Acquire);
183
184 if current == S::closed_value() || current == S::receiver_closed_value() {
185 return TakeResult::Closed;
186 }
187
188 if current == S::pending_value() {
189 return TakeResult::Pending;
190 }
191
192 if let Some(state) = S::from_u8(current) {
194 TakeResult::Ready(state)
195 } else {
196 TakeResult::Pending
197 }
198 }
199
200 #[inline]
201 fn is_sender_dropped(&self) -> bool {
202 self.state.load(Ordering::Acquire) == S::closed_value()
203 }
204
205 #[inline]
206 fn mark_sender_dropped(&self) {
207 self.state.store(S::closed_value(), Ordering::Release);
208 }
209
210 #[inline]
211 fn is_receiver_closed(&self) -> bool {
212 self.state.load(Ordering::Acquire) == S::receiver_closed_value()
213 }
214
215 #[inline]
216 fn mark_receiver_closed(&self) {
217 self.state.store(S::receiver_closed_value(), Ordering::Release);
218 }
219}
220
221pub type Sender<S> = super::common::Sender<LiteStorage<S>>;
229
230pub type Receiver<S> = super::common::Receiver<LiteStorage<S>>;
234
235#[inline]
239pub fn channel<S: State>() -> (Sender<S>, Receiver<S>) {
240 Sender::new()
241}
242
243impl<S: State> Receiver<S> {
248 #[inline]
256 pub async fn recv(self) -> Result<S, RecvError> {
257 self.await
258 }
259
260 #[inline]
270 pub fn try_recv(&mut self) -> Result<S, TryRecvError> {
271 match self.inner.try_recv() {
272 TakeResult::Ready(v) => Ok(v),
273 TakeResult::Pending => Err(TryRecvError::Empty),
274 TakeResult::Closed => Err(TryRecvError::Closed),
275 }
276 }
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Pause used by tests that send (or drop) from a background task or thread.
    const SEND_DELAY: Duration = Duration::from_millis(10);

    /// Two-outcome payload; the discriminants are the stored byte encodings.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    #[repr(u8)]
    enum TestCompletion {
        Called = 1,
        Cancelled = 2,
    }

    impl State for TestCompletion {
        fn to_u8(&self) -> u8 {
            *self as u8
        }

        fn from_u8(value: u8) -> Option<Self> {
            [Self::Called, Self::Cancelled]
                .iter()
                .copied()
                .find(|candidate| candidate.to_u8() == value)
        }

        fn pending_value() -> u8 {
            0
        }

        fn closed_value() -> u8 {
            255
        }

        fn receiver_closed_value() -> u8 {
            254
        }
    }

    /// Three-outcome payload; the discriminants are the stored byte encodings.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    #[repr(u8)]
    enum CustomState {
        Success = 1,
        Failure = 2,
        Timeout = 3,
    }

    impl State for CustomState {
        fn to_u8(&self) -> u8 {
            *self as u8
        }

        fn from_u8(value: u8) -> Option<Self> {
            [Self::Success, Self::Failure, Self::Timeout]
                .iter()
                .copied()
                .find(|candidate| candidate.to_u8() == value)
        }

        fn pending_value() -> u8 {
            0
        }

        fn closed_value() -> u8 {
            255
        }

        fn receiver_closed_value() -> u8 {
            254
        }
    }

    // ---- recv(): value arrives from a spawned task or is already there ----

    #[tokio::test]
    async fn test_oneshot_called() {
        let (tx, rx) = Sender::<TestCompletion>::new();

        tokio::spawn(async move {
            tokio::time::sleep(SEND_DELAY).await;
            tx.send(TestCompletion::Called).unwrap();
        });

        assert_eq!(rx.recv().await, Ok(TestCompletion::Called));
    }

    #[tokio::test]
    async fn test_oneshot_cancelled() {
        let (tx, rx) = Sender::<TestCompletion>::new();

        tokio::spawn(async move {
            tokio::time::sleep(SEND_DELAY).await;
            tx.send(TestCompletion::Cancelled).unwrap();
        });

        assert_eq!(rx.recv().await, Ok(TestCompletion::Cancelled));
    }

    #[tokio::test]
    async fn test_oneshot_immediate_called() {
        let (tx, rx) = Sender::<TestCompletion>::new();

        tx.send(TestCompletion::Called).unwrap();

        assert_eq!(rx.recv().await, Ok(TestCompletion::Called));
    }

    #[tokio::test]
    async fn test_oneshot_immediate_cancelled() {
        let (tx, rx) = Sender::<TestCompletion>::new();

        tx.send(TestCompletion::Cancelled).unwrap();

        assert_eq!(rx.recv().await, Ok(TestCompletion::Cancelled));
    }

    #[tokio::test]
    async fn test_oneshot_custom_state() {
        let (tx, rx) = Sender::<CustomState>::new();

        tokio::spawn(async move {
            tokio::time::sleep(SEND_DELAY).await;
            tx.send(CustomState::Success).unwrap();
        });

        assert_eq!(rx.recv().await, Ok(CustomState::Success));
    }

    #[tokio::test]
    async fn test_oneshot_custom_state_timeout() {
        let (tx, rx) = Sender::<CustomState>::new();

        tx.send(CustomState::Timeout).unwrap();

        assert_eq!(rx.recv().await, Ok(CustomState::Timeout));
    }

    #[tokio::test]
    async fn test_oneshot_unit_type() {
        let (tx, rx) = Sender::<()>::new();

        tokio::spawn(async move {
            tokio::time::sleep(SEND_DELAY).await;
            tx.send(()).unwrap();
        });

        assert_eq!(rx.recv().await, Ok(()));
    }

    #[tokio::test]
    async fn test_oneshot_unit_type_immediate() {
        let (tx, rx) = Sender::<()>::new();

        tx.send(()).unwrap();

        assert_eq!(rx.recv().await, Ok(()));
    }

    // ---- awaiting the receiver directly, by value ----

    #[tokio::test]
    async fn test_oneshot_into_future_called() {
        let (tx, rx) = Sender::<TestCompletion>::new();

        tokio::spawn(async move {
            tokio::time::sleep(SEND_DELAY).await;
            tx.send(TestCompletion::Called).unwrap();
        });

        assert_eq!(rx.await, Ok(TestCompletion::Called));
    }

    #[tokio::test]
    async fn test_oneshot_into_future_immediate() {
        let (tx, rx) = Sender::<TestCompletion>::new();

        tx.send(TestCompletion::Cancelled).unwrap();

        assert_eq!(rx.await, Ok(TestCompletion::Cancelled));
    }

    #[tokio::test]
    async fn test_oneshot_into_future_unit_type() {
        let (tx, rx) = Sender::<()>::new();

        tokio::spawn(async move {
            tokio::time::sleep(SEND_DELAY).await;
            tx.send(()).unwrap();
        });

        assert_eq!(rx.await, Ok(()));
    }

    #[tokio::test]
    async fn test_oneshot_into_future_custom_state() {
        let (tx, rx) = Sender::<CustomState>::new();

        tokio::spawn(async move {
            tokio::time::sleep(SEND_DELAY).await;
            tx.send(CustomState::Failure).unwrap();
        });

        assert_eq!(rx.await, Ok(CustomState::Failure));
    }

    // ---- awaiting through a mutable reference ----

    #[tokio::test]
    async fn test_oneshot_await_mut_reference() {
        let (tx, mut rx) = Sender::<TestCompletion>::new();

        tokio::spawn(async move {
            tokio::time::sleep(SEND_DELAY).await;
            tx.send(TestCompletion::Called).unwrap();
        });

        let outcome = (&mut rx).await;
        assert_eq!(outcome, Ok(TestCompletion::Called));
    }

    #[tokio::test]
    async fn test_oneshot_await_mut_reference_unit_type() {
        let (tx, mut rx) = Sender::<()>::new();

        tx.send(()).unwrap();

        let outcome = (&mut rx).await;
        assert_eq!(outcome, Ok(()));
    }

    // ---- try_recv() ----

    #[tokio::test]
    async fn test_oneshot_try_recv_pending() {
        // `_tx` is a named binding on purpose: the sender must stay alive here.
        let (_tx, mut rx) = Sender::<TestCompletion>::new();

        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    }

    #[tokio::test]
    async fn test_oneshot_try_recv_ready() {
        let (tx, mut rx) = Sender::<TestCompletion>::new();

        tx.send(TestCompletion::Called).unwrap();

        assert_eq!(rx.try_recv(), Ok(TestCompletion::Called));
    }

    #[tokio::test]
    async fn test_oneshot_try_recv_sender_dropped() {
        let (tx, mut rx) = Sender::<TestCompletion>::new();

        drop(tx);

        assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
    }

    // ---- sender dropped before anything was sent ----

    #[tokio::test]
    async fn test_oneshot_sender_dropped_before_recv() {
        let (tx, rx) = Sender::<TestCompletion>::new();

        drop(tx);

        assert_eq!(rx.recv().await, Err(RecvError));
    }

    #[tokio::test]
    async fn test_oneshot_sender_dropped_unit_type() {
        let (tx, rx) = Sender::<()>::new();

        drop(tx);

        assert_eq!(rx.recv().await, Err(RecvError));
    }

    #[tokio::test]
    async fn test_oneshot_sender_dropped_custom_state() {
        let (tx, rx) = Sender::<CustomState>::new();

        drop(tx);

        assert_eq!(rx.recv().await, Err(RecvError));
    }

    // ---- Sender::is_closed() ----

    #[test]
    fn test_sender_is_closed_initially_false() {
        let (tx, _rx) = Sender::<()>::new();
        assert!(!tx.is_closed());
    }

    #[test]
    fn test_sender_is_closed_after_receiver_drop() {
        let (tx, rx) = Sender::<()>::new();
        drop(rx);
        assert!(tx.is_closed());
    }

    #[test]
    fn test_sender_is_closed_after_receiver_close() {
        let (tx, mut rx) = Sender::<()>::new();
        rx.close();
        assert!(tx.is_closed());
    }

    // ---- Receiver::close() ----

    #[test]
    fn test_receiver_close_prevents_send() {
        let (tx, mut rx) = Sender::<TestCompletion>::new();
        rx.close();

        let send_outcome = tx.send(TestCompletion::Called);
        assert!(send_outcome.is_err());
    }

    // ---- blocking_recv() ----

    #[test]
    fn test_blocking_recv_immediate() {
        let (tx, rx) = Sender::<TestCompletion>::new();

        tx.send(TestCompletion::Called).unwrap();

        assert_eq!(rx.blocking_recv(), Ok(TestCompletion::Called));
    }

    #[test]
    fn test_blocking_recv_with_thread() {
        let (tx, rx) = Sender::<()>::new();

        std::thread::spawn(move || {
            std::thread::sleep(SEND_DELAY);
            tx.send(()).unwrap();
        });

        assert_eq!(rx.blocking_recv(), Ok(()));
    }

    #[test]
    fn test_blocking_recv_sender_dropped() {
        let (tx, rx) = Sender::<()>::new();

        std::thread::spawn(move || {
            std::thread::sleep(SEND_DELAY);
            drop(tx);
        });

        assert_eq!(rx.blocking_recv(), Err(RecvError));
    }
}
685