crossfire/
null.rs

//! A null flavor type used to notify threads/futures to close.
2//!
//! It's common practice to use a `()` channel in async code that is not intended to carry any
//! message, only to subscribe to a close event (for example, cancelling socket operations or
//! stopping worker loops). This module is designed for that purpose, with minimized polling cost.
6//!
//! You can initialize a null channel with [crate::mpsc::Null::new_async()] or
//! [crate::mpmc::Null::new_async()], which return a [CloseHandle] (which can only be cloned or
//! dropped, and cannot send any message) and a normal receiver type (whose recv method always
//! blocks until all copies of the `CloseHandle` are dropped).
11//!
//! > NOTE: using the mpsc version has less cost than the mpmc version.
13//!
14//! # Examples
15//!
16//! Use null channel to stop a background loop.
17//!
18//! ```rust
19//! use crossfire::{null::CloseHandle, *};
20//! use std::time::Duration;
21//!
22//! # #[tokio::main]
23//! # async fn main() {
24//! // Create a null channel
25//! let (stop_tx, stop_rx): (CloseHandle<mpmc::Null>, MAsyncRx<mpmc::Null>)  = mpmc::Null::new().new_async();
26//! let (data_tx, data_rx): (MAsyncTx<mpmc::Array<i32>>, MAsyncRx<mpmc::Array<i32>>) = mpmc::bounded_async::<i32>(10);
27//!
28//! // Spawn a background task
29//! let task = tokio::spawn(async move {
30//!     loop {
31//!         tokio::select! {
32//!             // If the null channel is closed (stop_tx dropped), this branch will be selected
33//!             res = stop_rx.recv() => {
34//!                 if res.is_err() {
35//!                     println!("Stopping task");
36//!                     break;
37//!                 }
38//!             }
39//!             res = data_rx.recv() => {
40//!                 match res {
41//!                     Ok(data) => println!("Received data: {}", data),
42//!                     Err(_) => break,
43//!                 }
44//!             }
45//!         }
46//!     }
47//! });
48//!
49//! data_tx.send(1).await.unwrap();
50//! tokio::time::sleep(Duration::from_millis(10)).await;
51//!
52//! // Drop the stop handle to signal the task to stop
53//! drop(stop_tx);
54//!
55//! task.await.unwrap();
56//! # }
57//! ```
58
59use crate::flavor::Flavor;
60use crate::flavor::{FlavorImpl, FlavorNew, FlavorSelect, Queue, Token};
61use crate::shared::ChannelShared;
62use crate::SenderType;
63use core::mem::MaybeUninit;
64use std::sync::Arc;
65
/// A flavor type that can never receive any message.
///
/// It stores nothing: the receive-side methods implemented for it in this module
/// (`pop`, `try_recv`, `try_recv_final`, `try_select`) always return `None`.
pub struct Null();
68
69impl Queue for Null {
70    type Item = ();
71
72    #[inline(always)]
73    fn pop(&self) -> Option<()> {
74        None
75    }
76
77    #[inline(always)]
78    fn push(&self, _item: ()) -> Result<(), ()> {
79        unreachable!();
80    }
81
82    #[inline(always)]
83    fn len(&self) -> usize {
84        0
85    }
86
87    #[inline(always)]
88    fn capacity(&self) -> Option<usize> {
89        None
90    }
91
92    #[inline(always)]
93    fn is_full(&self) -> bool {
94        true
95    }
96
97    #[inline(always)]
98    fn is_empty(&self) -> bool {
99        true
100    }
101}
102
103impl FlavorImpl for Null {
104    #[inline(always)]
105    fn try_send(&self, _item: &MaybeUninit<()>) -> bool {
106        // work as an /dev/null, although normally init with CloseHandle which don't have send() method
107        true
108    }
109
110    #[inline(always)]
111    fn try_send_oneshot(&self, _item: *const ()) -> Option<bool> {
112        Some(true)
113    }
114
115    #[inline(always)]
116    fn try_recv(&self) -> Option<Self::Item> {
117        // always empty
118        None
119    }
120
121    #[inline(always)]
122    fn try_recv_final(&self) -> Option<Self::Item> {
123        None
124    }
125
126    #[inline]
127    fn backoff_limit(&self) -> u16 {
128        0
129    }
130}
131
132impl FlavorNew for Null {
133    #[inline]
134    fn new() -> Self {
135        Self()
136    }
137}
138
139impl FlavorSelect for Null {
140    #[inline(always)]
141    fn try_select(&self, _final_check: bool) -> Option<Token> {
142        None
143    }
144
145    #[inline(always)]
146    fn read_with_token(&self, _token: Token) -> () {
147        unreachable!();
148    }
149}
150
/// The `CloseHandle` is a special sender-side type for flavor [Null] that only implements
/// `Clone` and `Drop`; it offers no way to send a message.
///
/// It wraps the channel's shared state. Cloning calls `add_tx()` on that state and dropping
/// calls `close_tx()` on it.
pub struct CloseHandle<F: Flavor>(Arc<ChannelShared<F>>);
153
154impl<F: Flavor> Clone for CloseHandle<F> {
155    #[inline(always)]
156    fn clone(&self) -> Self {
157        self.0.add_tx();
158        Self(self.0.clone())
159    }
160}
161
162impl<F: Flavor> Drop for CloseHandle<F> {
163    #[inline(always)]
164    fn drop(&mut self) {
165        self.0.close_tx();
166    }
167}
168
169impl<F: Flavor> SenderType for CloseHandle<F>
170where
171    F: Flavor<Item = ()>,
172{
173    type Flavor = F;
174
175    #[inline(always)]
176    fn new(shared: Arc<ChannelShared<Self::Flavor>>) -> Self {
177        CloseHandle(shared)
178    }
179}