crossfire/null.rs
//! A null flavor type used to notify threads/futures to close.
2//!
//! It's common practice in async code to use a `()` channel that is not intended to carry any
//! message, only to subscribe to a close event (for example, cancelling socket operations or
//! stopping worker loops). This module is designed for that purpose, with minimized polling cost.
6//!
//! You can initialize a null channel with [crate::mpsc::Null::new_async()] or
//! [crate::mpmc::Null::new_async()], which return a [CloseHandle] (which can only be cloned or
//! dropped, and cannot send any message) and a normal receiver type (whose `recv` method blocks
//! until all copies of the `CloseHandle` are dropped).
11//!
//! > NOTE: using the mpsc version has less cost than the mpmc version.
13//!
14//! # Examples
15//!
16//! Use null channel to stop a background loop.
17//!
18//! ```rust
19//! use crossfire::{null::CloseHandle, *};
20//! use std::time::Duration;
21//!
22//! # #[tokio::main]
23//! # async fn main() {
24//! // Create a null channel
25//! let (stop_tx, stop_rx): (CloseHandle<mpmc::Null>, MAsyncRx<mpmc::Null>) = mpmc::Null::new().new_async();
26//! let (data_tx, data_rx): (MAsyncTx<mpmc::Array<i32>>, MAsyncRx<mpmc::Array<i32>>) = mpmc::bounded_async::<i32>(10);
27//!
28//! // Spawn a background task
29//! let task = tokio::spawn(async move {
30//! loop {
31//! tokio::select! {
32//! // If the null channel is closed (stop_tx dropped), this branch will be selected
33//! res = stop_rx.recv() => {
34//! if res.is_err() {
35//! println!("Stopping task");
36//! break;
37//! }
38//! }
39//! res = data_rx.recv() => {
40//! match res {
41//! Ok(data) => println!("Received data: {}", data),
42//! Err(_) => break,
43//! }
44//! }
45//! }
46//! }
47//! });
48//!
49//! data_tx.send(1).await.unwrap();
50//! tokio::time::sleep(Duration::from_millis(10)).await;
51//!
52//! // Drop the stop handle to signal the task to stop
53//! drop(stop_tx);
54//!
55//! task.await.unwrap();
56//! # }
57//! ```
58
59use crate::flavor::{
60 Flavor, FlavorImpl, FlavorMC, FlavorMP, FlavorNew, FlavorSelect, Queue, Token,
61};
62use crate::shared::ChannelShared;
63use crate::SenderType;
64use core::mem::MaybeUninit;
65use std::sync::Arc;
66
/// A flavor type whose queue never holds a message.
///
/// Every receive-side query implemented for `Null` in this file reports
/// "nothing available", so a receiver built on it has no message to wait for.
#[derive(Debug)]
pub struct Null();
69
70impl Queue for Null {
71 type Item = ();
72
73 #[inline(always)]
74 fn pop(&self) -> Option<()> {
75 None
76 }
77
78 #[inline(always)]
79 fn push(&self, _item: ()) -> Result<(), ()> {
80 unreachable!();
81 }
82
83 #[inline(always)]
84 fn len(&self) -> usize {
85 0
86 }
87
88 #[inline(always)]
89 fn capacity(&self) -> Option<usize> {
90 None
91 }
92
93 #[inline(always)]
94 fn is_full(&self) -> bool {
95 true
96 }
97
98 #[inline(always)]
99 fn is_empty(&self) -> bool {
100 true
101 }
102}
103
104impl FlavorImpl for Null {
105 const IS_BOUNDED: bool = false;
106
107 #[inline(always)]
108 fn try_send(&self, _item: &MaybeUninit<()>) -> bool {
109 // work as an /dev/null, although normally init with CloseHandle which don't have send() method
110 true
111 }
112
113 #[inline(always)]
114 fn try_send_oneshot(&self, _item: *const ()) -> Option<bool> {
115 Some(true)
116 }
117
118 #[inline(always)]
119 fn try_recv(&self) -> Option<Self::Item> {
120 // always empty
121 None
122 }
123
124 #[inline(always)]
125 fn try_recv_final(&self) -> Option<Self::Item> {
126 None
127 }
128
129 #[inline]
130 fn backoff_limit(&self) -> u16 {
131 0
132 }
133}
134
135impl FlavorNew for Null {
136 #[inline]
137 fn new() -> Self {
138 Self()
139 }
140}
141
142impl FlavorSelect for Null {
143 #[inline(always)]
144 fn try_select(&self, _final_check: bool) -> Option<Token> {
145 None
146 }
147
148 #[inline(always)]
149 fn read_with_token(&self, _token: Token) {
150 unreachable!();
151 }
152}
153
// `Null` opts in to both traits with empty bodies, relying on whatever
// defaults the traits provide.
// NOTE(review): presumably FlavorMP / FlavorMC stand for multi-producer /
// multi-consumer support — confirm against `crate::flavor`.
impl FlavorMP for Null {}
impl FlavorMC for Null {}
156
/// A sender-side handle intended for the [`Null`] flavor.
///
/// The handle wraps the shared channel state and implements only `Clone` and
/// `Drop` here, with no method for sending a message. Cloning calls `add_tx`
/// on the shared state and dropping calls `close_tx` on it.
pub struct CloseHandle<F: Flavor>(Arc<ChannelShared<F>>);
159
160impl<F: Flavor> Clone for CloseHandle<F> {
161 #[inline(always)]
162 fn clone(&self) -> Self {
163 self.0.add_tx();
164 Self(self.0.clone())
165 }
166}
167
168impl<F: Flavor> Drop for CloseHandle<F> {
169 #[inline(always)]
170 fn drop(&mut self) {
171 self.0.close_tx();
172 }
173}
174
175impl<F: Flavor> SenderType for CloseHandle<F>
176where
177 F: Flavor<Item = ()>,
178{
179 type Flavor = F;
180
181 #[inline(always)]
182 fn new(shared: Arc<ChannelShared<Self::Flavor>>) -> Self {
183 CloseHandle(shared)
184 }
185}