Skip to main content

crossfire/
null.rs

1//! A null flavor type that use to notify thread/future to close
2//!
3//! It's common practice we use `()` channel in async code, not intended for any message, just
4//! subscribe for close event. (For example, cancelling socket operations, stopping worker loops...)
5//! This is a module designed for that, with minimized polling cost.
6//!
7//! You can initialize a null channel with [crate::mpsc::Null::new_async()] or
8//! [crate::mpmc::Null::new_async()], which return a [CloseHandle], (which can only be `clone` or `drop`,
9//! but unable to send any message), and a normal receiver type (which recv method is always
10//! blocked until the all copy of `CloseHandle` is dropped).
11//!
12//! > NOTE: using mpsc version has less cost then mpmc version.
13//!
14//! # Examples
15//!
16//! Use null channel to stop a background loop.
17//!
18//! ```rust
19//! use crossfire::{null::CloseHandle, *};
20//! use std::time::Duration;
21//!
22//! # #[tokio::main]
23//! # async fn main() {
24//! // Create a null channel
25//! let (stop_tx, stop_rx): (CloseHandle<mpmc::Null>, MAsyncRx<mpmc::Null>)  = mpmc::Null::new().new_async();
26//! let (data_tx, data_rx): (MAsyncTx<mpmc::Array<i32>>, MAsyncRx<mpmc::Array<i32>>) = mpmc::bounded_async::<i32>(10);
27//!
28//! // Spawn a background task
29//! let task = tokio::spawn(async move {
30//!     loop {
31//!         tokio::select! {
32//!             // If the null channel is closed (stop_tx dropped), this branch will be selected
33//!             res = stop_rx.recv() => {
34//!                 if res.is_err() {
35//!                     println!("Stopping task");
36//!                     break;
37//!                 }
38//!             }
39//!             res = data_rx.recv() => {
40//!                 match res {
41//!                     Ok(data) => println!("Received data: {}", data),
42//!                     Err(_) => break,
43//!                 }
44//!             }
45//!         }
46//!     }
47//! });
48//!
49//! data_tx.send(1).await.unwrap();
50//! tokio::time::sleep(Duration::from_millis(10)).await;
51//!
52//! // Drop the stop handle to signal the task to stop
53//! drop(stop_tx);
54//!
55//! task.await.unwrap();
56//! # }
57//! ```
58
59use crate::flavor::{
60    Flavor, FlavorImpl, FlavorMC, FlavorMP, FlavorNew, FlavorSelect, Queue, Token,
61};
62use crate::shared::ChannelShared;
63use crate::SenderType;
64use core::mem::MaybeUninit;
65use std::sync::Arc;
66
67/// an flavor type can never receive any message
68pub struct Null();
69
70impl Queue for Null {
71    type Item = ();
72
73    #[inline(always)]
74    fn pop(&self) -> Option<()> {
75        None
76    }
77
78    #[inline(always)]
79    fn push(&self, _item: ()) -> Result<(), ()> {
80        unreachable!();
81    }
82
83    #[inline(always)]
84    fn len(&self) -> usize {
85        0
86    }
87
88    #[inline(always)]
89    fn capacity(&self) -> Option<usize> {
90        None
91    }
92
93    #[inline(always)]
94    fn is_full(&self) -> bool {
95        true
96    }
97
98    #[inline(always)]
99    fn is_empty(&self) -> bool {
100        true
101    }
102}
103
104impl FlavorImpl for Null {
105    const IS_BOUNDED: bool = false;
106
107    #[inline(always)]
108    fn try_send(&self, _item: &MaybeUninit<()>) -> bool {
109        // work as an /dev/null, although normally init with CloseHandle which don't have send() method
110        true
111    }
112
113    #[inline(always)]
114    fn try_send_oneshot(&self, _item: *const ()) -> Option<bool> {
115        Some(true)
116    }
117
118    #[inline(always)]
119    fn try_recv(&self) -> Option<Self::Item> {
120        // always empty
121        None
122    }
123
124    #[inline(always)]
125    fn try_recv_final(&self) -> Option<Self::Item> {
126        None
127    }
128
129    #[inline]
130    fn backoff_limit(&self) -> u16 {
131        0
132    }
133}
134
135impl FlavorNew for Null {
136    #[inline]
137    fn new() -> Self {
138        Self()
139    }
140}
141
142impl FlavorSelect for Null {
143    #[inline(always)]
144    fn try_select(&self, _final_check: bool) -> Option<Token> {
145        None
146    }
147
148    #[inline(always)]
149    fn read_with_token(&self, _token: Token) {
150        unreachable!();
151    }
152}
153
154impl FlavorMP for Null {}
155impl FlavorMC for Null {}
156
157/// The CloseHandle is a special type for flavor [Null], only impl `Clone` and `Drop`
158pub struct CloseHandle<F: Flavor>(Arc<ChannelShared<F>>);
159
160impl<F: Flavor> Clone for CloseHandle<F> {
161    #[inline(always)]
162    fn clone(&self) -> Self {
163        self.0.add_tx();
164        Self(self.0.clone())
165    }
166}
167
168impl<F: Flavor> Drop for CloseHandle<F> {
169    #[inline(always)]
170    fn drop(&mut self) {
171        self.0.close_tx();
172    }
173}
174
175impl<F: Flavor> SenderType for CloseHandle<F>
176where
177    F: Flavor<Item = ()>,
178{
179    type Flavor = F;
180
181    #[inline(always)]
182    fn new(shared: Arc<ChannelShared<Self::Flavor>>) -> Self {
183        CloseHandle(shared)
184    }
185}