// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0
//! Partition operator that splits a stream into two based on a predicate.
//!
//! The [`partition`](PartitionExt::partition) operator routes each item to one of two
//! output streams based on a predicate function. Items satisfying the predicate go to
//! the "true" stream, while others go to the "false" stream.
//!
//! ## Runtime Requirements
//!
//! This operator requires one of the following runtime features:
//! - `runtime-tokio` (default)
//! - `runtime-smol`
//! - `runtime-async-std`
//! - Or compiling for `wasm32` target
//!
//! It is not available when compiling without a runtime (`no_std` + `alloc` only).
//!
//! ## Characteristics
//!
//! - **Chain-breaking**: Returns two streams, cannot chain further on the original
//! - **Spawns task**: Routing runs in a background task on the enabled runtime
//! - **Timestamp-preserving**: Original timestamps are preserved in both output streams
//! - **Routing**: Every item goes to exactly one output stream
//! - **Non-blocking**: Both streams can be consumed independently
//! - **Hot**: Uses internal subjects for broadcasting (late consumers miss items)
//! - **Error propagation**: Errors are sent to both output streams
//! - **Unbounded buffers**: Items are buffered in memory until consumed
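//!
//! The routing and error-propagation rules above can be pictured with a small
//! std-only sketch. It is not this crate's implementation: `route_batch` is a
//! hypothetical helper that works on a finished batch rather than a live stream,
//! and it assumes routing continues after an error.
//!
//! ```rust
//! /// Hypothetical helper (not part of this crate): applies the routing
//! /// rule to a batch of results and returns the two sides as vectors.
//! fn route_batch<T, E, F>(
//!     items: Vec<Result<T, E>>,
//!     predicate: F,
//! ) -> (Vec<Result<T, E>>, Vec<Result<T, E>>)
//! where
//!     E: Clone,
//!     F: Fn(&T) -> bool,
//! {
//!     let mut true_side = Vec::new();
//!     let mut false_side = Vec::new();
//!     for item in items {
//!         match item {
//!             // A value goes to exactly one side, chosen by the predicate.
//!             Ok(value) if predicate(&value) => true_side.push(Ok(value)),
//!             Ok(value) => false_side.push(Ok(value)),
//!             // An error is delivered to both sides.
//!             Err(error) => {
//!                 true_side.push(Err(error.clone()));
//!                 false_side.push(Err(error));
//!             }
//!         }
//!     }
//!     (true_side, false_side)
//! }
//! ```
//!
//! Each side keeps the relative order of the items it receives.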
//!
//! ## Buffer Behavior
//!
//! The partition operator uses unbounded internal channels. If one partition stream
//! is consumed slowly (or not at all), items destined for that stream will accumulate
//! in memory. This is typically fine for balanced workloads, but be aware:
//!
//! - If you keep both streams but only consume one, items for the other still buffer
//! - For high-throughput streams with imbalanced consumption, consider adding
//!   backpressure mechanisms downstream
//! - Dropping one partition stream is safe; items for it are simply discarded
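//!
//! The buffering behavior described above can be reproduced with a std-only
//! sketch. It is an illustration under stated assumptions, not this crate's
//! code: `partition_sketch` is a hypothetical function that uses a thread and
//! `std::sync::mpsc` channels in place of the runtime task and internal channels.
//!
//! ```rust
//! use std::sync::mpsc::{channel, Receiver};
//! use std::thread;
//!
//! /// Hypothetical stand-in for the operator: a background thread routes
//! /// each item into one of two unbounded channels.
//! fn partition_sketch<T, I, F>(items: I, predicate: F) -> (Receiver<T>, Receiver<T>)
//! where
//!     T: Send + 'static,
//!     I: IntoIterator<Item = T> + Send + 'static,
//!     F: Fn(&T) -> bool + Send + 'static,
//! {
//!     let (true_tx, true_rx) = channel();
//!     let (false_tx, false_rx) = channel();
//!     thread::spawn(move || {
//!         for item in items {
//!             // `send` never blocks on an unbounded channel, so items for a
//!             // slow consumer pile up in memory. It only fails when that
//!             // side's receiver was dropped, in which case the item is
//!             // discarded and routing continues.
//!             if predicate(&item) {
//!                 let _ = true_tx.send(item);
//!             } else {
//!                 let _ = false_tx.send(item);
//!             }
//!         }
//!     });
//!     (true_rx, false_rx)
//! }
//! ```
//!
//! In this sketch, draining only the first receiver leaves every item for the
//! second one queued in its channel, while dropping the second receiver lets the
//! router discard those items instead.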
//!
//! ## Example
//!
//! ```rust
//! use fluxion_stream::{IntoFluxionStream, PartitionExt};
//! use fluxion_test_utils::Sequenced;
//! use futures::StreamExt;
//!
//! # async fn example() {
//! let (tx, rx) = async_channel::unbounded();
//!
//! // Partition numbers into even and odd
//! let (mut evens, mut odds) = rx.into_fluxion_stream()
//!     .partition(|n: &i32| n % 2 == 0);
//!
//! tx.try_send(Sequenced::new(1)).unwrap();
//! tx.try_send(Sequenced::new(2)).unwrap();
//! tx.try_send(Sequenced::new(3)).unwrap();
//! tx.try_send(Sequenced::new(4)).unwrap();
//! drop(tx);
//!
//! // Evens: 2, 4
//! assert_eq!(evens.next().await.unwrap().unwrap().into_inner(), 2);
//! assert_eq!(evens.next().await.unwrap().unwrap().into_inner(), 4);
//!
//! // Odds: 1, 3
//! assert_eq!(odds.next().await.unwrap().unwrap().into_inner(), 1);
//! assert_eq!(odds.next().await.unwrap().unwrap().into_inner(), 3);
//! # }
//! ```
//!
//! ## Use Cases
//!
//! - **Error routing**: Separate successful values from validation failures
//! - **Priority queues**: Split high-priority and low-priority items
//! - **Type routing**: Route different enum variants to specialized handlers
//! - **Threshold filtering**: Split values above/below a threshold
// Multi-threaded runtime (tokio, smol, async-std)
pub use ;
// Single-threaded runtime (wasm32, embassy)
pub use ;