//! # Crossfire
//!
//! High-performance lockless spsc/mpsc/mpmc channels. The algorithm is derived from crossbeam, with improvements.
//!
//! It supports async contexts and bridges the gap between async and blocking contexts.
//!
//! For the concept, please refer to the [wiki](https://github.com/frostyplanet/crossfire-rs/wiki).
//!
//! ## Version history
//!
//! * v1.0: Used in production since 2022.12.
//!
//! * v2.0: [2025.6] Refactored the codebase and API
//! by removing generic types from the ChannelShared type, which made it easier to code with.
//!
//! * v2.1: [2025.9] Removed the dependency on crossbeam-channel
//! and reimplemented on top of [a modified version of crossbeam-queue](https://github.com/frostyplanet/crossfire-rs/wiki/crossbeam-related),
//! which brings 2x performance improvements for both async and blocking contexts.
//!
//! * v3.0: [2026.1] Refactored the API back to a generic flavor interface and added [select].
//! Dedicated optimizations: Bounded SPSC +70%, MPSC +30%, one-size +20%.
//! Eliminated the enum dispatch cost, improving async performance by another 33%. Check out [compat] for migration from v2.x.
//!
//! ## Test status
//!
//! Refer to the [README](https://github.com/frostyplanet/crossfire-rs?tab=readme-ov-file#test-status) page for known issues on specific platforms and runtimes.
//!
//! ## Performance
//!
//! Being a lockless channel, crossfire outperforms other async-capable channels.
//! Thanks to a lighter notification mechanism, most cases in a blocking context are even
//! faster than the original crossbeam-channel.
//!
//! Benchmark data is posted on the [wiki](https://github.com/frostyplanet/crossfire-rs/wiki/benchmark-v3.0.0-2026%E2%80%9001%E2%80%9018).
//!
//! Also, being a lockless channel, the algorithm relies on spinning and yielding. Spinning works well on
//! multi-core systems, but is unfriendly to single-core systems (like virtual machines).
//! So we provide the function [detect_backoff_cfg()] to detect the running platform.
//! Calling it in the initialization section of your code can give a 2x performance boost on
//! a VPS.
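//!
//! To illustrate the trade-off described above, here is a minimal std-only sketch of a
//! spin-then-yield wait loop whose spin budget depends on the number of cores. It is not
//! crossfire's implementation: the helper names `spin_limit_for`, `detected_spin_limit` and
//! `wait_until_set`, and the spin budget of 6 rounds, are assumptions made for illustration only.
//!
//! ```rust
//! use std::hint::spin_loop;
//! use std::sync::atomic::{AtomicBool, Ordering};
//! use std::thread;
//!
//! // Hypothetical helper: no spinning on a single core, a small budget otherwise.
//! fn spin_limit_for(cores: usize) -> u32 {
//!     if cores <= 1 { 0 } else { 6 }
//! }
//!
//! // Hypothetical helper: derive the spin budget from the cores visible to this process.
//! fn detected_spin_limit() -> u32 {
//!     let cores = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
//!     spin_limit_for(cores)
//! }
//!
//! // Hypothetical helper: wait until `flag` is set. The first `spin_limit` rounds
//! // busy-spin with exponentially growing length, later rounds yield to the scheduler.
//! // Returns the number of rounds spent waiting.
//! fn wait_until_set(flag: &AtomicBool, spin_limit: u32) -> u32 {
//!     let mut rounds = 0u32;
//!     while !flag.load(Ordering::Acquire) {
//!         if rounds < spin_limit {
//!             // Cap the shift so a large budget cannot overflow.
//!             for _ in 0..(1u32 << rounds.min(10)) {
//!                 spin_loop();
//!             }
//!         } else {
//!             thread::yield_now();
//!         }
//!         rounds = rounds.saturating_add(1);
//!     }
//!     rounds
//! }
//! ```
//!
//! A caller would compute `detected_spin_limit()` once at startup and pass the result to every
//! `wait_until_set` call, so that a single-core machine goes straight to yielding.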
//!
//! The benchmarks are written with the criterion framework. You can run them with:
//!
//! ``` shell
//! make bench crossfire
//! make bench crossfire_select
//! ```
//!
//! ## APIs
//!
//! ### Concurrency Modules
//!
//! - [spsc], [mpsc], [mpmc]. Each has a different underlying implementation
//! optimized for its concurrency model.
//! The SP or SC interface is only for non-concurrent operation. It is more memory-efficient in waker registration,
//! and reduces the cost of atomic operations in the lockless algorithm.
//!
//! - [oneshot] has its own dedicated sender/receiver types, because using `Tx` / `Rx` would be too heavy.
//!
//! - [select]:
//!   - [Select<'a>](crate::select::Select): a crossbeam-channel style, type-erased API that borrows receiver addresses and selects with a "token".
//!   - [Multiplex](crate::select::Multiplex): a multiplexed stream that owns multiple receivers and selects among
//!     channels of the same flavor carrying the same message type.
//!
//! - [waitgroup]: A high-performance WaitGroup that allows a custom threshold.
//!
//! ### Flavors
//!
//! The following lockless queues are exposed in the [flavor] module, and each one has a type alias in spsc/mpsc/mpmc:
//!
//! - `List` (which uses crossbeam `SegQueue`)
//! - `Array` (an enum that wraps crossbeam `ArrayQueue`, or a `One` if initialized with size<=1)
//!   - For a bounded channel, size 0 is not supported yet (it is rewritten as size 1).
//!   - The implementation for spsc & mpsc is simplified from the mpmc version.
//! - `One` (derived from the `ArrayQueue` algorithm, but with better performance in the size=1
//! scenario, because it has two slots that allow the reader and writer to work concurrently)
//! - `Null` (see the doc [crate::null]): a channel for cancellation purposes that only wakes up on
//! closing.
//!
//! **NOTE**:
//! Although the names [Array](crate::mpmc::Array) and [List](crate::mpmc::List) are the same across the spsc/mpsc/mpmc modules,
//! they are different type aliases, each local to its parent module. We suggest distinguishing them by
//! namespace when importing.
//!
//! ### Channel builder function
//!
//! Aside from the `bounded_*` and `unbounded_*` functions, which specify the sender / receiver type,
//! each module has [build()](crate::mpmc::build()) and [new()](crate::mpmc::new()) functions, which apply to any channel flavor and any async/blocking combination.
//!
//!
//! ### Types
//!
//! <table align="center" cellpadding="20">
//! <tr> <th rowspan="2"> Context</th><th colspan="2" align="center">Sender (Producer)</th> <th colspan="2" align="center">Receiver (Consumer)</th> </tr>
//! <tr> <td>Single</td> <td>Multiple</td><td>Single</td><td>Multiple</td></tr>
//! <tr><td align="center" rowspan="2"><b>Blocking</b></td><td colspan="2" align="center"><a href="trait.BlockingTxTrait.html">BlockingTxTrait</a></td>
//! <td colspan="2" align="center"><a href="trait.BlockingRxTrait.html">BlockingRxTrait</a></td></tr>
//! <tr>
//! <td align="center"><a href="struct.Tx.html">Tx</a></td>
//! <td align="center"><a href="struct.MTx.html">MTx</a></td>
//! <td align="center"><a href="struct.Rx.html">Rx</a></td>
//! <td align="center"><a href="struct.MRx.html">MRx</a></td> </tr>
//!
//! <tr><td><b>Weak reference</b></td><td></td><td><a href="struct.WeakTx.html">WeakTx</a></td></tr>
//!
//! <tr><td align="center" rowspan="2"><b>Async</b></td>
//! <td colspan="2" align="center"><a href="trait.AsyncTxTrait.html">AsyncTxTrait</a></td>
//! <td colspan="2" align="center"><a href="trait.AsyncRxTrait.html">AsyncRxTrait</a></td></tr>
//! <tr><td><a href="struct.AsyncTx.html">AsyncTx</a></td>
//! <td><a href="struct.MAsyncTx.html">MAsyncTx</a></td><td><a href="struct.AsyncRx.html">AsyncRx</a></td>
//! <td><a href="struct.MAsyncRx.html">MAsyncRx</a></td></tr>
//! </table>
//!
//! *Safety*: For the SP / SC version, [AsyncTx], [AsyncRx], [Tx], and [Rx] are not `Clone` and do not implement `Sync`.
//! They can be moved to other threads, but send/recv cannot be used while they are in an `Arc`. (Refer to the compile_fail
//! examples in the type documentation.)
//!
//! The benefit of using the SP / SC API is completely lockless waker registration, which gives a performance boost in exchange for these restrictions.
//!
//! The sender/receiver can use the **`From`** trait to convert between blocking and async context
//! counterparts (refer to the [example](#example) below)
//!
//! ### Error types
//!
//! Error types are the same as crossbeam-channel:
//!
//! [TrySendError], [SendError], [SendTimeoutError], [TryRecvError], [RecvError], [RecvTimeoutError]
//!
//! ### Async compatibility
//!
//! Crossfire is runtime-agnostic, and is tested on tokio-1.x and async-std-1.x.
//!
//! The following scenarios are considered:
//!
//! * The [AsyncTx::send()] and [AsyncRx::recv()] operations are **cancellation-safe** in an async context.
//! You can safely use the select! macro and timeout() function in tokio/futures in combination with recv().
//! On cancellation, [SendFuture] and [RecvFuture] will trigger drop(), which cleans up the state of the waker,
//! making sure there is no memory leak or deadlock.
//! However, you cannot know the true result of a SendFuture, since it is dropped
//! upon cancellation. Thus, we suggest using [AsyncTx::send_timeout()] instead.
//!
//! * When the "tokio" or "async_std" feature is enabled, we also provide two additional functions:
//!
//! - [send_timeout()](crate::AsyncTx::send_timeout()), which will return the message that failed to be sent in
//! [SendTimeoutError]. We guarantee the result is atomic. Alternatively, you can use
//! [send_with_timer()](crate::AsyncTx::send_with_timer()).
//!
//! - [recv_timeout()](crate::AsyncRx::recv_timeout()), for which we also guarantee the result is atomic.
//! Alternatively, you can use [recv_with_timer()](crate::AsyncRx::recv_with_timer()).
//!
//! * The waker footprint:
//!
//! In a multi-producer, multi-consumer scenario, there is a small memory overhead to pass along a `Weak`
//! reference of wakers.
//! Because we aim to be lockless, when a sending/receiving future is canceled (for example by tokio::time::timeout()),
//! it might trigger an immediate cleanup if the try-lock is successful; otherwise it relies on lazy cleanup.
//! (This won't be an issue, because weak wakers will be consumed by actual message send and recv.)
//! In an idle-select scenario, like a notification for close, the waker will be reused as much as possible
//! if poll() returns pending.
//!
//! * Hand-written futures:
//!
//! The future object created by [AsyncTx::send()], [AsyncTx::send_timeout()], [AsyncRx::recv()],
//! [AsyncRx::recv_timeout()] is `Sized`. You don't need to put them in `Box`.
//!
//! If you would like to use the poll functions directly for complex behavior, you can call
//! [AsyncSink::poll_send()](crate::sink::AsyncSink::poll_send()) or [AsyncStream::poll_item()](crate::stream::AsyncStream::poll_item()) with a `Context`.
//!
//! ## Usage
//!
//! Cargo.toml:
//! ```toml
//! [dependencies]
//! crossfire = "3.1"
//! ```
//!
//! ### Feature flags
//!
//! * `compat`: Enable the [compat] module, which has the same API namespace structure as v2.x.
//!
//! * `tokio`: Enable [send_timeout](crate::AsyncTx::send_timeout()) and [recv_timeout](crate::AsyncRx::recv_timeout()) with the tokio sleep function. (Conflicts
//! with the `async_std` feature.)
//!
//! * `async_std`: Enable send_timeout and recv_timeout with the async-std sleep function. (Conflicts
//! with the `tokio` feature.)
//!
//! * `trace_log`: Development mode. Enables internal logging while testing or benchmarking, to debug deadlock issues.
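//!
//! As a sketch of how a feature from the list above is selected, the dependency entry below
//! enables `tokio` using the standard Cargo feature syntax. The version is the one shown in the
//! Usage section; adjust the feature name to the runtime you use.
//!
//! ```toml
//! [dependencies]
//! crossfire = { version = "3.1", features = ["tokio"] }
//! ```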
//!
//! ### Example
//!
//! Blocking and async senders and receivers mixed together:
//!
//! ```rust
//! use crossfire::*;
//!
//! #[tokio::main]
//! async fn main() {
//!     let (tx, rx) = mpmc::bounded_async::<usize>(100);
//!     let mut recv_counter = 0;
//!     let mut co_tx = Vec::new();
//!     let mut co_rx = Vec::new();
//!     const ROUND: usize = 1000;
//!
//!     // Blocking sender, converted from a clone of the async sender.
//!     let _tx: MTx<mpmc::Array<usize>> = tx.clone().into_blocking();
//!     co_tx.push(tokio::task::spawn_blocking(move || {
//!         for i in 0..ROUND {
//!             _tx.send(i).expect("send ok");
//!         }
//!     }));
//!     // Async sender.
//!     co_tx.push(tokio::spawn(async move {
//!         for i in 0..ROUND {
//!             tx.send(i).await.expect("send ok");
//!         }
//!     }));
//!     // Blocking receiver, converted from a clone of the async receiver.
//!     // It counts messages until recv() returns an error.
//!     let _rx: MRx<mpmc::Array<usize>> = rx.clone().into_blocking();
//!     co_rx.push(tokio::task::spawn_blocking(move || {
//!         let mut count: usize = 0;
//!         'A: loop {
//!             match _rx.recv() {
//!                 Ok(_i) => {
//!                     count += 1;
//!                 }
//!                 Err(_) => break 'A,
//!             }
//!         }
//!         count
//!     }));
//!     // Async receiver, with the same counting loop.
//!     co_rx.push(tokio::spawn(async move {
//!         let mut count: usize = 0;
//!         'A: loop {
//!             match rx.recv().await {
//!                 Ok(_i) => {
//!                     count += 1;
//!                 }
//!                 Err(_) => break 'A,
//!             }
//!         }
//!         count
//!     }));
//!     // Wait for both senders to finish, then sum what the receivers got.
//!     for th in co_tx {
//!         let _ = th.await.unwrap();
//!     }
//!     for th in co_rx {
//!         recv_counter += th.await.unwrap();
//!     }
//!     assert_eq!(recv_counter, ROUND * 2);
//! }
//! ```
/// lockless queue implementation and channel flavor traits
pub use ChannelShared;
pub use detect_backoff_cfg;
pub use *;
pub use *;
pub use *;
pub use *;
pub use WeakTx;
pub use *;
/// logging macro for development
/// logging macro for development under tokio
use Flavor;
use Arc;
/// type limiter for channel builder
/// type limiter for channel builder