// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0

//! Stream operators with temporal ordering guarantees.
//!
//! This crate provides reactive stream combinators that maintain temporal ordering
//! across asynchronous operations. All operators work with types implementing the
//! [`Ordered`] trait, which provides an ordering value for correct temporal sequencing.
//!
//! # Architecture
//!
//! The crate is built around several key concepts:
//!
//! - **[`FluxionStream`]**: A wrapper around any `Stream` that provides access to all operators
//! - **[`Ordered`] trait**: Types must have an intrinsic ordering (timestamp, sequence number, etc.)
//! - **Extension traits**: Each operator is provided via an extension trait for composability
//! - **Temporal correctness**: All operators respect the ordering of items across streams
//!
//! ## Operator Categories
//!
//! ### Combination Operators
//!
//! - **[`combine_latest`](CombineLatestExt::combine_latest)**: Emits when any stream emits, combining latest values from all streams
//! - **[`with_latest_from`](WithLatestFromExt::with_latest_from)**: Samples secondary streams only when primary emits
//! - **[`ordered_merge`](OrderedStreamExt::ordered_merge)**: Merges multiple streams preserving temporal order
//!
//! ### Filtering Operators
//!
//! - **[`emit_when`](EmitWhenExt::emit_when)**: Gates source emissions based on filter stream conditions
//! - **[`take_latest_when`](TakeLatestWhenExt::take_latest_when)**: Samples source when filter condition is met
//! - **[`take_while_with`](TakeWhileExt::take_while_with)**: Emits while condition holds, terminates when false
//!
//! ### Transformation Operators
//!
//! - **[`combine_with_previous`](CombineWithPreviousExt::combine_with_previous)**: Pairs each value with the previous value
//!
//! # Temporal Ordering Explained
//!
//! All operators in this crate maintain **temporal ordering** - items are processed in the
//! order of their intrinsic ordering value, not the order they arrive at the operator.
//!
//! ## How It Works
//!
//! When multiple streams are combined:
//!
//! 1. Each stream item must implement [`Ordered`], providing a comparable ordering value
//! 2. Operators use [`ordered_merge`](OrderedStreamExt::ordered_merge) internally to sequence items
//! 3. Items are buffered and emitted in order of their ordering value
//! 4. Late-arriving items with earlier ordering are placed correctly in the sequence
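//!
//! The buffering in steps 3 and 4 can be pictured with a standard-library sketch.
//! This is an illustration only, not how the crate is implemented: `Item` and
//! `emit_in_order` are hypothetical names, and a real operator emits incrementally
//! as streams are polled instead of draining a complete buffer.
//!
//! ```rust
//! use std::cmp::Reverse;
//! use std::collections::BinaryHeap;
//!
//! /// Hypothetical stand-in for an ordered item: (sequence number, payload).
//! type Item = (u64, &'static str);
//!
//! /// Buffers items in a min-heap keyed by sequence number, then drains them in order.
//! fn emit_in_order(arrivals: &[Item]) -> Vec<&'static str> {
//!     let mut buffer = BinaryHeap::new();
//!     for &(seq, payload) in arrivals {
//!         // `Reverse` turns the max-heap into a min-heap on the sequence number.
//!         buffer.push(Reverse((seq, payload)));
//!     }
//!     let mut out = Vec::new();
//!     while let Some(Reverse((_, payload))) = buffer.pop() {
//!         out.push(payload);
//!     }
//!     out
//! }
//!
//! fn main() {
//!     // Arrival order is 2, 1, 3; emission order follows the sequence numbers.
//!     let arrivals = [(2, "b"), (1, "a"), (3, "c")];
//!     assert_eq!(emit_in_order(&arrivals), vec!["a", "b", "c"]);
//! }
//! ```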
//!
//! ## Example: Out-of-Order Delivery
//!
//! ```
//! use fluxion_stream::{FluxionStream, OrderedStreamExt};
//! use fluxion_test_utils::Sequenced;
//! use futures::StreamExt;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let (tx1, rx1) = tokio::sync::mpsc::unbounded_channel();
//! let (tx2, rx2) = tokio::sync::mpsc::unbounded_channel();
//!
//! let stream1 = FluxionStream::from_unbounded_receiver(rx1);
//! let stream2 = FluxionStream::from_unbounded_receiver(rx2);
//!
//! let mut merged = stream1.ordered_merge(vec![stream2]);
//!
//! // The earlier item (seq=1) travels on stream2, the later one (seq=2) on stream1
//! tx2.send(Sequenced::with_sequence(100, 1)).unwrap();
//! tx1.send(Sequenced::with_sequence(200, 2)).unwrap();
//!
//! // Items are emitted in temporal order (seq 1, then seq 2)
//! let first = merged.next().await.unwrap().unwrap();
//! assert_eq!(first.value, 100); // seq=1 is emitted first, regardless of which stream carried it
//! # }
//! ```
//!
//! # Operator Selection Guide
//!
//! Choose the right operator for your use case:
//!
//! ## When You Need Combined State
//!
//! | Operator | Use When | Triggers On | Example Use Case |
//! |----------|----------|-------------|------------------|
//! | [`combine_latest`] | You need latest from all streams | Any stream emits | Dashboard combining multiple data sources |
//! | [`with_latest_from`] | You have primary + context streams | Primary emits only | User clicks enriched with latest config |
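//!
//! The "Triggers On" column is the main difference between the two operators. The
//! following standard-library sketch replays one interleaved event sequence through
//! both trigger rules. `Event`, `combine_latest_sketch`, and `with_latest_from_sketch`
//! are hypothetical names used for illustration; dropping primary events that arrive
//! before any secondary value is an assumption of the sketch, not a documented guarantee.
//!
//! ```rust
//! /// Hypothetical event tags: which of two streams produced a value.
//! #[derive(Clone, Copy)]
//! enum Event {
//!     Primary(i32),
//!     Secondary(i32),
//! }
//!
//! /// Emits a pair whenever either side updates, once both sides have a value.
//! fn combine_latest_sketch(events: &[Event]) -> Vec<(i32, i32)> {
//!     let (mut primary, mut secondary) = (None, None);
//!     let mut out = Vec::new();
//!     for &event in events {
//!         match event {
//!             Event::Primary(v) => primary = Some(v),
//!             Event::Secondary(v) => secondary = Some(v),
//!         }
//!         if let (Some(p), Some(s)) = (primary, secondary) {
//!             out.push((p, s));
//!         }
//!     }
//!     out
//! }
//!
//! /// Emits a pair only when the primary side updates, sampling the latest secondary.
//! fn with_latest_from_sketch(events: &[Event]) -> Vec<(i32, i32)> {
//!     let mut secondary = None;
//!     let mut out = Vec::new();
//!     for &event in events {
//!         match event {
//!             Event::Secondary(v) => secondary = Some(v),
//!             Event::Primary(p) => {
//!                 if let Some(s) = secondary {
//!                     out.push((p, s));
//!                 }
//!             }
//!         }
//!     }
//!     out
//! }
//!
//! fn main() {
//!     use Event::{Primary, Secondary};
//!     let events = [Secondary(10), Primary(1), Secondary(20), Primary(2)];
//!     assert_eq!(combine_latest_sketch(&events), vec![(1, 10), (1, 20), (2, 20)]);
//!     assert_eq!(with_latest_from_sketch(&events), vec![(1, 10), (2, 20)]);
//! }
//! ```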
//!
//! ## When You Need All Items
//!
//! | Operator | Use When | Output | Example Use Case |
//! |----------|----------|--------|------------------|
//! | [`ordered_merge`] | Merge multiple sources in order | Every item from all streams | Event log from multiple services |
//! | [`combine_with_previous`] | Compare consecutive items | Pairs of (previous, current) | Detecting value changes |
//!
//! ## When You Need Conditional Emission
//!
//! | Operator | Use When | Behavior | Example Use Case |
//! |----------|----------|----------|------------------|
//! | [`emit_when`] | Gate by condition | Emits source when filter is true | Send notifications only when enabled |
//! | [`take_latest_when`] | Sample on condition | Emits latest source when filter triggers | Sample sensor on button press |
//! | [`take_while_with`] | Stop on condition | Emits until condition false, then stops | Process until timeout |
//!
//! # Performance Characteristics
//!
//! ## Memory Usage
//!
//! - **[`combine_latest`]**: O(n) - stores one latest value per stream (n = number of streams)
//! - **[`ordered_merge`]**: O(k) - buffers items until their ordering is confirmed (k = buffer size)
//! - **[`with_latest_from`]**: O(n) - stores one value per secondary stream
//! - **[`combine_with_previous`]**: O(1) - stores only the previous value
//!
//! ## Latency Considerations
//!
//! - **Ordered operators**: May buffer items waiting for earlier-ordered items
//! - **Unordered operators**: Process items immediately as they arrive
//! - **Combining operators**: Wait for all streams to emit at least once before first emission
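//!
//! Both the one-value-per-stream memory bound and the "wait for all streams" latency
//! rule follow from keeping a single slot per input. A minimal sketch under that
//! assumption, using only the standard library (`LatestSlots` is a hypothetical name,
//! not a type exported by this crate):
//!
//! ```rust
//! /// Hypothetical per-operator state: one slot per input stream.
//! struct LatestSlots<T> {
//!     slots: Vec<Option<T>>,
//! }
//!
//! impl<T: Clone> LatestSlots<T> {
//!     fn new(stream_count: usize) -> Self {
//!         Self { slots: vec![None; stream_count] }
//!     }
//!
//!     /// Overwrites the slot for `stream_index`. Older values are dropped, so
//!     /// memory stays proportional to the number of streams.
//!     fn update(&mut self, stream_index: usize, value: T) -> Option<Vec<T>> {
//!         self.slots[stream_index] = Some(value);
//!         // Collecting into `Option<Vec<_>>` yields `None` until every slot is filled.
//!         self.slots.iter().cloned().collect()
//!     }
//! }
//!
//! fn main() {
//!     let mut state = LatestSlots::new(3);
//!     assert_eq!(state.update(0, 1), None);
//!     assert_eq!(state.update(1, 2), None);
//!     assert_eq!(state.update(2, 3), Some(vec![1, 2, 3]));
//!     assert_eq!(state.update(0, 9), Some(vec![9, 2, 3]));
//! }
//! ```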
//!
//! ## Throughput
//!
//! All operators are designed to keep locking to a minimum:
//!
//! - Single mutex per operator (not per item)
//! - No blocking operations in hot paths
//! - Efficient polling with `futures::StreamExt`
//!
//! # Return Type Patterns
//!
//! Fluxion operators use three different return type patterns, each chosen for specific
//! reasons related to type erasure, composability, and performance.
//!
//! ## Pattern 1: `impl Stream<Item = T>`
//!
//! **When used:** Lightweight operators with simple transformations
//!
//! **Examples:**
//! - [`ordered_merge`](OrderedStreamExt::ordered_merge)
//! - [`map_ordered`](FluxionStream::map_ordered)
//! - [`filter_ordered`](FluxionStream::filter_ordered)
//!
//! **Benefits:**
//! - Zero-cost abstraction (no boxing)
//! - Compiler can fully optimize the stream pipeline
//! - Type information preserved for further optimizations
//!
//! **Tradeoffs:**
//! - Opaque return type cannot be named (awkward to store in struct fields)
//! - May increase compile times for deeply nested operators
//!
//! ## Pattern 2: `FluxionStream<impl Stream<Item = T>>`
//!
//! **When used:** Operators that should compose with other FluxionStream methods
//!
//! **Examples:**
//! - [`combine_with_previous`](CombineWithPreviousExt::combine_with_previous)
//! - [`with_latest_from`](WithLatestFromExt::with_latest_from)
//! - [`combine_latest`](CombineLatestExt::combine_latest)
//!
//! **Benefits:**
//! - Enables method chaining with `FluxionStream` convenience methods
//! - Still zero-cost (no boxing)
//! - Provides consistent API surface
//!
//! **Use cases:**
//! - When users are likely to chain multiple operators
//! - When the operator produces a complex transformed type
//!
//! ## Pattern 3: `Pin<Box<dyn Stream<Item = T>>>`
//!
//! **When used:** Operators with dynamic dispatch requirements or complex internal state
//!
//! **Examples:**
//! - [`emit_when`](EmitWhenExt::emit_when)
//! - [`take_latest_when`](TakeLatestWhenExt::take_latest_when)
//! - [`take_while_with`](TakeWhileExt::take_while_with)
//!
//! **Benefits:**
//! - Type erasure simplifies signatures
//! - Reduces compile time for complex operator chains
//! - Hides internal implementation details
//!
//! **Tradeoffs:**
//! - Heap allocation (small overhead)
//! - Dynamic dispatch prevents some optimizations
//! - Runtime cost typically negligible compared to async operations
//!
//! **Why used for these operators:**
//! These operators maintain internal state machines with multiple branches and complex
//! lifetime requirements. Type erasure keeps the public API simple while allowing
//! internal flexibility.
//!
//! ## Choosing the Right Pattern
//!
//! As a user, you typically don't need to worry about these patterns - all three compose
//! seamlessly. For example, combining different operators in a single chain works naturally
//! regardless of their internal implementation patterns. Each operator returns
//! `impl Stream`, `FluxionStream<impl Stream>`, or `Pin<Box<dyn Stream>>`; all three
//! implement `Stream`, so they compose transparently.
//!
//! The patterns are implementation details chosen to balance performance, ergonomics,
//! and maintainability.
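//!
//! The three shapes can be compared side by side in a standard-library sketch. Since
//! `std` has no `Stream` trait, `Iterator` stands in for it here; `doubled`, `Wrapper`,
//! `positives`, and `boxed` are hypothetical names, not part of this crate's API.
//!
//! ```rust
//! /// Pattern 1: opaque `impl Trait` return, no boxing.
//! fn doubled(input: Vec<i32>) -> impl Iterator<Item = i32> {
//!     input.into_iter().map(|n| n * 2)
//! }
//!
//! /// Hypothetical wrapper playing the role of `FluxionStream`.
//! struct Wrapper<I>(I);
//!
//! impl<I: Iterator<Item = i32>> Wrapper<I> {
//!     /// Pattern 2: wrapper around an opaque type, so wrapper methods keep chaining.
//!     fn positives(self) -> Wrapper<impl Iterator<Item = i32>> {
//!         Wrapper(self.0.filter(|n| *n > 0))
//!     }
//!
//!     /// Pattern 3: type-erased return, paid for with one heap allocation
//!     /// and dynamic dispatch.
//!     fn boxed(self) -> Box<dyn Iterator<Item = i32>>
//!     where
//!         I: 'static,
//!     {
//!         Box::new(self.0)
//!     }
//! }
//!
//! fn main() {
//!     // All three shapes chain because each one is still an iterator underneath.
//!     let out: Vec<i32> = Wrapper(doubled(vec![-1, 2, 3])).positives().boxed().collect();
//!     assert_eq!(out, vec![4, 6]);
//! }
//! ```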
//!
//! # Common Patterns
//!
//! ## Pattern: Enriching Events with Context
//!
//! Uses [`with_latest_from`](WithLatestFromExt::with_latest_from) to combine a primary stream
//! with context from a secondary stream.
//!
//! ```rust
//! use fluxion_stream::{FluxionStream, WithLatestFromExt};
//! use fluxion_test_utils::Sequenced;
//! use fluxion_core::Ordered;
//! use futures::StreamExt;
//!
//! # async fn example() {
//! // User clicks enriched with latest configuration
//! let (click_tx, click_rx) = tokio::sync::mpsc::unbounded_channel();
//! let (config_tx, config_rx) = tokio::sync::mpsc::unbounded_channel();
//!
//! let clicks = FluxionStream::from_unbounded_receiver(click_rx);
//! let configs = FluxionStream::from_unbounded_receiver(config_rx);
//!
//! let mut enriched = clicks.with_latest_from(
//!     configs,
//!     |state| state.values().clone()
//! );
//!
//! // Send config first, then click
//! config_tx.send(Sequenced::with_sequence("theme=dark".to_string(), 1)).unwrap();
//! click_tx.send(Sequenced::with_sequence("button1".to_string(), 2)).unwrap();
//!
//! let result = enriched.next().await.unwrap().unwrap();
//! assert_eq!(result.get().len(), 2); // Has both click and config
//! # }
//! ```
//!
//! ## Pattern: Merging Multiple Event Sources
//!
//! Uses [`ordered_merge`](OrderedStreamExt::ordered_merge) to combine logs from multiple
//! services in temporal order.
//!
//! ```rust
//! use fluxion_stream::{FluxionStream, OrderedStreamExt};
//! use fluxion_test_utils::Sequenced;
//! use futures::StreamExt;
//!
//! # async fn example() {
//! // Combine logs from multiple services in temporal order
//! let (service1_tx, service1_rx) = tokio::sync::mpsc::unbounded_channel();
//! let (service2_tx, service2_rx) = tokio::sync::mpsc::unbounded_channel();
//!
//! let service1 = FluxionStream::from_unbounded_receiver(service1_rx);
//! let service2 = FluxionStream::from_unbounded_receiver(service2_rx);
//!
//! let mut unified_log = service1.ordered_merge(vec![service2]);
//!
//! // Send logs with different sequence numbers
//! service1_tx.send(Sequenced::with_sequence("service1: started".to_string(), 1)).unwrap();
//! service2_tx.send(Sequenced::with_sequence("service2: ready".to_string(), 2)).unwrap();
//!
//! let first = unified_log.next().await.unwrap().unwrap();
//! assert_eq!(first.value, "service1: started");
//! # }
//! ```
//!
//! ## Pattern: Change Detection
//!
//! Uses [`combine_with_previous`](CombineWithPreviousExt::combine_with_previous) to detect
//! when values change by comparing with the previous value.
//!
//! ```rust
//! use fluxion_stream::{FluxionStream, CombineWithPreviousExt};
//! use fluxion_test_utils::Sequenced;
//! use fluxion_core::Ordered;
//! use futures::StreamExt;
//!
//! # async fn example() {
//! let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
//! let stream = FluxionStream::from_unbounded_receiver(rx);
//!
//! // Pair each value with its previous value
//! let mut paired = stream.combine_with_previous();
//!
//! // Send values
//! tx.send(Sequenced::with_sequence(1, 1)).unwrap();
//! tx.send(Sequenced::with_sequence(1, 2)).unwrap(); // Same value
//! tx.send(Sequenced::with_sequence(2, 3)).unwrap(); // Changed!
//!
//! let result = paired.next().await.unwrap().unwrap();
//! assert!(result.previous.is_none()); // First has no previous
//!
//! let result = paired.next().await.unwrap().unwrap();
//! let changed = result.previous.as_ref().unwrap().value != result.current.value;
//! assert!(!changed); // 1 == 1, no change
//!
//! let result = paired.next().await.unwrap().unwrap();
//! let changed = result.previous.as_ref().unwrap().value != result.current.value;
//! assert!(changed); // 1 != 2, changed!
//! # }
//! ```
//!
//! ## Pattern: Conditional Processing
//!
//! Uses [`emit_when`](EmitWhenExt::emit_when) to gate emissions based on a filter stream,
//! only emitting when the condition is satisfied.
//!
//! ```rust
//! use fluxion_stream::{FluxionStream, EmitWhenExt};
//! use fluxion_test_utils::Sequenced;
//! use fluxion_core::Ordered;
//! use futures::StreamExt;
//!
//! # async fn example() {
//! // Send notifications only when enabled
//! let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
//! let (enabled_tx, enabled_rx) = tokio::sync::mpsc::unbounded_channel();
//!
//! let events = FluxionStream::from_unbounded_receiver(event_rx);
//! let enabled = FluxionStream::from_unbounded_receiver(enabled_rx);
//!
//! let mut notifications = events.emit_when(
//!     enabled,
//!     |state| state.values().get(1).map(|v| *v > 0).unwrap_or(false)
//! );
//!
//! // Enable notifications
//! enabled_tx.send(Sequenced::with_sequence(1, 1)).unwrap();
//! // Send event
//! event_tx.send(Sequenced::with_sequence(999, 2)).unwrap();
//!
//! let result = notifications.next().await.unwrap().unwrap();
//! assert_eq!(*result.get(), 999);
//! # }
//! ```
//!
//! # Anti-Patterns
//!
//! ## ❌ Don't: Use `ordered_merge` When Order Doesn't Matter
//!
//! ```text
//! // BAD: Ordering overhead when you don't need it
//! let merged = stream1.ordered_merge(vec![stream2]);
//! ```
//!
//! Use standard futures combinators instead:
//!
//! ```text
//! // GOOD: Use futures::stream::select for unordered merging
//! use futures::stream::select;
//! let merged = select(stream1, stream2);
//! ```
//!
//! ## ❌ Don't: Use `combine_latest` for All Items
//!
//! ```text
//! // BAD: combine_latest only emits latest, loses intermediate values
//! let combined = stream1.combine_latest(vec![stream2], |_| true);
//! ```
//!
//! Use `ordered_merge` to get all items:
//!
//! ```text
//! // GOOD: ordered_merge emits every item
//! let merged = stream1.ordered_merge(vec![stream2]);
//! ```
//!
//! ## ❌ Don't: Complex Filter Logic in Operators
//!
//! ```text
//! // BAD: Complex business logic in filter predicate
//! stream1.combine_latest(vec![stream2], |state| {
//!     // 50 lines of complex filtering logic...
//! });
//! ```
//!
//! Extract the logic into a well-tested function:
//!
//! ```text
//! // GOOD: Testable, reusable filter logic
//! fn should_emit(state: &CombinedState<i32>) -> bool {
//!     // Clear, testable logic
//!     state.values().iter().all(|&v| v > 0)
//! }
//!
//! stream1.combine_latest(vec![stream2], should_emit);
//! ```
//!
//! # Operator Chaining
//!
//! Fluxion operators are designed to be composed together, creating sophisticated
//! data flows from simple building blocks. The key to successful chaining is
//! understanding how each operator transforms the stream.
//!
//! ## Basic Chaining Pattern
//!
//! ```rust
//! use fluxion_stream::{FluxionStream, CombineWithPreviousExt, TakeLatestWhenExt, Ordered};
//! use fluxion_test_utils::{Sequenced, test_channel};
//! use futures::StreamExt;
//!
//! async fn example() {
//! let (source_tx, source_stream) = test_channel::<Sequenced<i32>>();
//! let (filter_tx, filter_stream) = test_channel::<Sequenced<i32>>();
//!
//! // Chain: sample when filter emits, then pair with previous value
//! let sampled = source_stream.take_latest_when(filter_stream, |_| true);
//! let mut composed = FluxionStream::new(sampled).combine_with_previous();
//!
//! source_tx.send(Sequenced::new(42)).unwrap();
//! filter_tx.send(Sequenced::new(1)).unwrap();
//!
//! let item = composed.next().await.unwrap().unwrap();
//! assert!(item.previous.is_none());
//! assert_eq!(item.current.get(), &42);
//! }
//! ```
//!
//! ## Chaining with Transformation
//!
//! Use [`map_ordered`] and [`filter_ordered`] to transform streams while preserving
//! temporal ordering:
//!
//! ```rust
//! use fluxion_stream::{FluxionStream, Ordered};
//! use fluxion_test_utils::{Sequenced, test_channel};
//! use futures::StreamExt;
//!
//! async fn example() {
//! let (tx, stream) = test_channel::<Sequenced<i32>>();
//!
//! // Chain: filter positives, map to string
//! let mut composed = FluxionStream::new(stream)
//!     .filter_ordered(|n| *n > 0)
//!     .map_ordered(|sequenced| format!("Value: {}", sequenced.get()));
//!
//! tx.send(Sequenced::new(-1)).unwrap();
//! tx.send(Sequenced::new(5)).unwrap();
//!
//! let result = composed.next().await.unwrap().unwrap();
//! assert_eq!(result, "Value: 5");
//! }
//! ```
//!
//! ## Multi-Stream Chaining
//!
//! Combine multiple streams and then process the result:
//!
//! ```rust
//! use fluxion_stream::{FluxionStream, CombineLatestExt, CombineWithPreviousExt, Ordered};
//! use fluxion_test_utils::{Sequenced, test_channel};
//! use futures::StreamExt;
//!
//! async fn example() {
//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
//!
//! // Chain: combine latest from both streams, then track changes
//! let mut composed = stream1
//!     .combine_latest(vec![stream2], |_| true)
//!     .combine_with_previous();
//!
//! tx1.send(Sequenced::new(1)).unwrap();
//! tx2.send(Sequenced::new(2)).unwrap();
//!
//! let item = composed.next().await.unwrap().unwrap();
//! assert!(item.previous.is_none());
//! assert_eq!(item.current.get().values().len(), 2);
//! }
//! ```
//!
//! ## Key Principles for Chaining
//!
//! 1. **Use `map_ordered` and `filter_ordered`**: These preserve the `FluxionStream` wrapper
//!    and maintain temporal ordering guarantees
//! 2. **Order matters**: `combine_with_previous().filter_ordered()` is different from
//!    `filter_ordered().combine_with_previous()`
//! 3. **Type awareness**: Each operator changes the item type - track what flows through
//!    the chain
//! 4. **Test incrementally**: Build complex chains step by step, testing each addition
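//!
//! Principle 2 is easy to check on plain slices. The sketch below uses only the
//! standard library; `with_previous` is a hypothetical helper that mimics the pairing
//! idea of `combine_with_previous` in simplified form, not the operator itself.
//!
//! ```rust
//! /// Pairs each item with its predecessor (`None` for the first item).
//! fn with_previous(items: &[i32]) -> Vec<(Option<i32>, i32)> {
//!     let mut previous = None;
//!     items
//!         .iter()
//!         .map(|&current| {
//!             let pair = (previous, current);
//!             previous = Some(current);
//!             pair
//!         })
//!         .collect()
//! }
//!
//! fn main() {
//!     let source = [1, -5, 2];
//!
//!     // Pair first, then filter on the current value: the dropped item
//!     // still shows up as a `previous`.
//!     let pair_then_filter: Vec<_> = with_previous(&source)
//!         .into_iter()
//!         .filter(|(_, current)| *current > 0)
//!         .collect();
//!     assert_eq!(pair_then_filter, vec![(None, 1), (Some(-5), 2)]);
//!
//!     // Filter first, then pair: the dropped item never reaches the pairing step.
//!     let positives: Vec<i32> = source.iter().copied().filter(|n| *n > 0).collect();
//!     assert_eq!(with_previous(&positives), vec![(None, 1), (Some(1), 2)]);
//! }
//! ```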
//!
//! ## Advanced Composition Examples
//!
//! ### 1. Ordered Merge → Combine With Previous
//!
//! Merge multiple streams in temporal order, then track consecutive values:
//!
//! ```rust
//! use fluxion_stream::{FluxionStream, OrderedStreamExt, CombineWithPreviousExt, Ordered};
//! use fluxion_test_utils::{Sequenced, test_channel};
//! use futures::StreamExt;
//!
//! async fn example() {
//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
//!
//! // Merge streams in temporal order, then pair consecutive values
//! let mut composed = stream1
//!     .ordered_merge(vec![FluxionStream::new(stream2)])
//!     .combine_with_previous();
//!
//! tx1.send(Sequenced::new(1)).unwrap();
//! let item = composed.next().await.unwrap().unwrap();
//! assert!(item.previous.is_none());
//! assert_eq!(item.current.get(), &1);
//!
//! tx2.send(Sequenced::new(2)).unwrap();
//! let item = composed.next().await.unwrap().unwrap();
//! assert_eq!(item.previous.unwrap().get(), &1);
//! assert_eq!(item.current.get(), &2);
//! }
//! ```
//!
//! ### 2. Combine Latest → Combine With Previous
//!
//! Combine latest values from multiple streams, then track state changes:
//!
//! ```rust
//! use fluxion_stream::{FluxionStream, CombineLatestExt, CombineWithPreviousExt, Ordered};
//! use fluxion_test_utils::{Sequenced, test_channel};
//! use futures::StreamExt;
//!
//! async fn example() {
//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
//!
//! // Combine latest, then track previous combined state
//! let mut composed = stream1
//!     .combine_latest(vec![stream2], |_| true)
//!     .combine_with_previous();
//!
//! tx1.send(Sequenced::new(1)).unwrap();
//! tx2.send(Sequenced::new(2)).unwrap();
//!
//! let item = composed.next().await.unwrap().unwrap();
//! assert!(item.previous.is_none());
//! assert_eq!(item.current.get().values().len(), 2);
//!
//! tx1.send(Sequenced::new(3)).unwrap();
//! let item = composed.next().await.unwrap().unwrap();
//! // Previous state had [1, 2], current has [3, 2]
//! assert!(item.previous.is_some());
//! }
//! ```
//!
//! ### 3. Combine Latest → Take While With
//!
//! Combine streams and continue only while a condition holds:
//!
//! ```rust
//! use fluxion_stream::{FluxionStream, CombineLatestExt, TakeWhileExt, Ordered};
//! use fluxion_test_utils::{Sequenced, test_channel};
//! use futures::StreamExt;
//!
//! async fn example() {
//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
//! let (filter_tx, filter_stream) = test_channel::<Sequenced<bool>>();
//!
//! // Combine latest values, but stop when filter becomes false
//! let mut composed = stream1
//!     .combine_latest(vec![stream2], |_| true)
//!     .take_while_with(filter_stream, |f| *f);
//!
//! filter_tx.send(Sequenced::new(true)).unwrap();
//! tx1.send(Sequenced::new(1)).unwrap();
//! tx2.send(Sequenced::new(2)).unwrap();
//!
//! let combined = composed.next().await.unwrap().unwrap();
//! assert_eq!(combined.get().values().len(), 2);
//! }
//! ```
//!
//! ### 4. Ordered Merge → Take While With
//!
//! Merge streams in order and terminate based on an external condition:
//!
//! ```rust
//! use fluxion_stream::{FluxionStream, OrderedStreamExt, TakeWhileExt};
//! use fluxion_test_utils::{Sequenced, test_channel};
//! use futures::StreamExt;
//!
//! async fn example() {
//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
//! let (filter_tx, filter_stream) = test_channel::<Sequenced<bool>>();
//!
//! // Merge all values in order, but stop when filter says so
//! let mut composed = stream1
//!     .ordered_merge(vec![FluxionStream::new(stream2)])
//!     .take_while_with(filter_stream, |f| *f);
//!
//! filter_tx.send(Sequenced::new(true)).unwrap();
//! tx1.send(Sequenced::new(1)).unwrap();
//!
//! let item = composed.next().await.unwrap().unwrap().get().clone();
//! assert_eq!(item, 1);
//!
//! tx2.send(Sequenced::new(2)).unwrap();
//! let item = composed.next().await.unwrap().unwrap().get().clone();
//! assert_eq!(item, 2);
//! }
//! ```
//!
//! ### 5. Take Latest When → Combine With Previous
//!
//! Sample latest value on trigger, then pair with previous sampled value:
//!
//! ```rust
//! use fluxion_stream::{FluxionStream, TakeLatestWhenExt, CombineWithPreviousExt, Ordered};
//! use fluxion_test_utils::{Sequenced, test_channel};
//! use futures::StreamExt;
//!
//! async fn example() {
//! let (source_tx, source_stream) = test_channel::<Sequenced<i32>>();
//! let (filter_tx, filter_stream) = test_channel::<Sequenced<i32>>();
//!
//! // Sample source when filter emits, then track consecutive samples
//! let sampled = source_stream.take_latest_when(filter_stream, |_| true);
//! let mut composed = FluxionStream::new(sampled).combine_with_previous();
//!
//! source_tx.send(Sequenced::new(42)).unwrap();
//! filter_tx.send(Sequenced::new(0)).unwrap();
//!
//! let item = composed.next().await.unwrap().unwrap();
//! assert!(item.previous.is_none());
//! assert_eq!(item.current.get(), &42);
//!
//! source_tx.send(Sequenced::new(99)).unwrap();
//! let item = composed.next().await.unwrap().unwrap();
//! assert_eq!(item.previous.unwrap().get(), &42);
//! assert_eq!(item.current.get(), &99);
//! }
//! ```
//!
//! These patterns demonstrate how Fluxion operators compose to create sophisticated
//! data flows. See the composition tests in the source repository for more examples.
//!
//! [`map_ordered`]: fluxion_stream::FluxionStream::map_ordered
//! [`filter_ordered`]: fluxion_stream::FluxionStream::filter_ordered
//!
//! # Getting Started
//!
//! Add to your `Cargo.toml`:
//!
//! ```toml
//! [dependencies]
//! fluxion-stream = { path = "../fluxion-stream" }
//! tokio = { version = "1.48", features = ["sync", "rt"] }
//! futures = "0.3"
//! ```
//!
//! See individual operator documentation for detailed examples.
//!
//! [`combine_latest`]: CombineLatestExt::combine_latest
//! [`with_latest_from`]: WithLatestFromExt::with_latest_from
//! [`ordered_merge`]: OrderedStreamExt::ordered_merge
//! [`emit_when`]: EmitWhenExt::emit_when
//! [`take_latest_when`]: TakeLatestWhenExt::take_latest_when
//! [`take_while_with`]: TakeWhileExt::take_while_with
//! [`combine_with_previous`]: CombineWithPreviousExt::combine_with_previous

#![allow(clippy::multiple_crate_versions, clippy::doc_markdown)]
#[macro_use]
mod logging;
pub mod combine_latest;
pub mod combine_with_previous;
pub mod emit_when;
pub mod fluxion_stream;
pub mod ordered_merge;
pub mod take_latest_when;
pub mod take_while_with;
pub mod types;
pub mod with_latest_from;

// Re-export commonly used types
pub use combine_latest::CombineLatestExt;
pub use combine_with_previous::CombineWithPreviousExt;
pub use emit_when::EmitWhenExt;
pub use fluxion_core::{Ordered, OrderedWrapper};
pub use fluxion_stream::FluxionStream;
pub use ordered_merge::OrderedStreamExt;
pub use take_latest_when::TakeLatestWhenExt;
pub use take_while_with::TakeWhileExt;
pub use types::{
    CombinedState, OrderedInner, OrderedInnerUnwrapped, OrderedStreamItem, WithPrevious,
};
pub use with_latest_from::WithLatestFromExt;
691    CombinedState, OrderedInner, OrderedInnerUnwrapped, OrderedStreamItem, WithPrevious,
692};
693pub use with_latest_from::WithLatestFromExt;