crossfire/null.rs
//! A null flavor type that is used to notify a thread or future to close.
//!
//! It is common practice in async code to use a `()` channel that is not intended to carry any
//! message, only to subscribe to a close event (for example, cancelling socket operations or
//! stopping worker loops). This module is designed for that purpose, with minimized polling cost.
//!
//! You can initialize a null channel with [crate::mpsc::Null::new_async()] or
//! [crate::mpmc::Null::new_async()], which return a [CloseHandle] (which can only be cloned or
//! dropped, and is unable to send any message) and a normal receiver type (whose `recv` method
//! always blocks until every copy of the `CloseHandle` is dropped).
//!
//! > NOTE: using the mpsc version has lower cost than the mpmc version.
//!
//! # Examples
//!
//! Use a null channel to stop a background loop.
//!
//! ```rust
//! use crossfire::{null::CloseHandle, *};
//! use std::time::Duration;
//!
//! # #[tokio::main]
//! # async fn main() {
//! // Create a null channel
//! let (stop_tx, stop_rx): (CloseHandle<mpmc::Null>, MAsyncRx<mpmc::Null>) = mpmc::Null::new().new_async();
//! let (data_tx, data_rx): (MAsyncTx<mpmc::Array<i32>>, MAsyncRx<mpmc::Array<i32>>) = mpmc::bounded_async::<i32>(10);
//!
//! // Spawn a background task
//! let task = tokio::spawn(async move {
//!     loop {
//!         tokio::select! {
//!             // If the null channel is closed (stop_tx dropped), this branch will be selected
//!             res = stop_rx.recv() => {
//!                 if res.is_err() {
//!                     println!("Stopping task");
//!                     break;
//!                 }
//!             }
//!             res = data_rx.recv() => {
//!                 match res {
//!                     Ok(data) => println!("Received data: {}", data),
//!                     Err(_) => break,
//!                 }
//!             }
//!         }
//!     }
//! });
//!
//! data_tx.send(1).await.unwrap();
//! tokio::time::sleep(Duration::from_millis(10)).await;
//!
//! // Drop the stop handle to signal the task to stop
//! drop(stop_tx);
//!
//! task.await.unwrap();
//! # }
//! ```
58
59use crate::flavor::Flavor;
60use crate::flavor::{FlavorImpl, FlavorNew, FlavorSelect, Queue, Token};
61use crate::shared::ChannelShared;
62use crate::SenderType;
63use core::mem::MaybeUninit;
64use std::sync::Arc;
65
/// A flavor type that can never receive any message.
///
/// It is a field-less marker: the trait implementations in this module report it as permanently
/// empty and never hand an item back to a receiver.
pub struct Null();
68
69impl Queue for Null {
70 type Item = ();
71
72 #[inline(always)]
73 fn pop(&self) -> Option<()> {
74 None
75 }
76
77 #[inline(always)]
78 fn push(&self, _item: ()) -> Result<(), ()> {
79 unreachable!();
80 }
81
82 #[inline(always)]
83 fn len(&self) -> usize {
84 0
85 }
86
87 #[inline(always)]
88 fn capacity(&self) -> Option<usize> {
89 None
90 }
91
92 #[inline(always)]
93 fn is_full(&self) -> bool {
94 true
95 }
96
97 #[inline(always)]
98 fn is_empty(&self) -> bool {
99 true
100 }
101}
102
103impl FlavorImpl for Null {
104 #[inline(always)]
105 fn try_send(&self, _item: &MaybeUninit<()>) -> bool {
106 // work as an /dev/null, although normally init with CloseHandle which don't have send() method
107 true
108 }
109
110 #[inline(always)]
111 fn try_send_oneshot(&self, _item: *const ()) -> Option<bool> {
112 Some(true)
113 }
114
115 #[inline(always)]
116 fn try_recv(&self) -> Option<Self::Item> {
117 // always empty
118 None
119 }
120
121 #[inline(always)]
122 fn try_recv_final(&self) -> Option<Self::Item> {
123 None
124 }
125
126 #[inline]
127 fn backoff_limit(&self) -> u16 {
128 0
129 }
130}
131
132impl FlavorNew for Null {
133 #[inline]
134 fn new() -> Self {
135 Self()
136 }
137}
138
139impl FlavorSelect for Null {
140 #[inline(always)]
141 fn try_select(&self, _final_check: bool) -> Option<Token> {
142 None
143 }
144
145 #[inline(always)]
146 fn read_with_token(&self, _token: Token) -> () {
147 unreachable!();
148 }
149}
150
151/// The CloseHandle is a special type for flavor [Null], only impl `Clone` and `Drop`
152pub struct CloseHandle<F: Flavor>(Arc<ChannelShared<F>>);
153
154impl<F: Flavor> Clone for CloseHandle<F> {
155 #[inline(always)]
156 fn clone(&self) -> Self {
157 self.0.add_tx();
158 Self(self.0.clone())
159 }
160}
161
162impl<F: Flavor> Drop for CloseHandle<F> {
163 #[inline(always)]
164 fn drop(&mut self) {
165 self.0.close_tx();
166 }
167}
168
169impl<F: Flavor> SenderType for CloseHandle<F>
170where
171 F: Flavor<Item = ()>,
172{
173 type Flavor = F;
174
175 #[inline(always)]
176 fn new(shared: Arc<ChannelShared<Self::Flavor>>) -> Self {
177 CloseHandle(shared)
178 }
179}