fluxion_stream/
lib.rs

1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5//! Stream operators with temporal ordering guarantees.
6//!
7//! This crate provides reactive stream combinators that maintain temporal ordering
8//! across asynchronous operations. All operators work with types implementing the
9//! [`Timestamped`](fluxion_core::Timestamped) trait, which provides timestamp-based ordering for correct temporal sequencing.
10//!
11//! # Architecture
12//!
13//! The crate is built around several key concepts:
14//!
15//! - **[`FluxionStream`]**: A wrapper around any `Stream` that provides access to all operators
16//! - **[`Timestamped`](fluxion_core::Timestamped) trait**: Types must have intrinsic timestamps for temporal ordering
17//! - **Extension traits**: Each operator is provided via an extension trait for composability
18//! - **Temporal correctness**: All operators respect the timestamp ordering of items across streams
19//!
20//! ## Operator Categories
21//!
22//! ### Combination Operators
23//!
24//! - **[`combine_latest`](CombineLatestExt::combine_latest)**: Emits when any stream emits, combining latest values from all streams
25//! - **[`with_latest_from`](WithLatestFromExt::with_latest_from)**: Samples secondary streams only when primary emits
26//! - **[`ordered_merge`](OrderedStreamExt::ordered_merge)**: Merges multiple streams preserving temporal order
27//!
28//! ### Filtering Operators
29//!
30//! - **[`emit_when`](EmitWhenExt::emit_when)**: Gates source emissions based on filter stream conditions
31//! - **[`take_latest_when`](TakeLatestWhenExt::take_latest_when)**: Samples source when filter condition is met
32//! - **[`take_while_with`](TakeWhileExt::take_while_with)**: Emits while condition holds, terminates when false
33//!
34//! ### Transformation Operators
35//!
36//! - **[`combine_with_previous`](CombineWithPreviousExt::combine_with_previous)**: Pairs each value with previous value
37//!
38//! # Temporal Ordering Explained
39//!
40//! All operators in this crate maintain **temporal ordering** - items are processed in the
41//! order of their intrinsic ordering value, not the order they arrive at the operator.
42//!
43//! ## How It Works
44//!
45//! When multiple streams are combined:
46//!
47//! 1. Each stream item must implement [`Timestamped`](fluxion_core::Timestamped), providing a comparable timestamp
48//! 2. Operators use [`ordered_merge`](OrderedStreamExt::ordered_merge) internally to sequence items
49//! 3. Items are buffered and emitted in order of their timestamp
50//! 4. Late-arriving items with earlier timestamps are placed correctly in the sequence
51//!
52//! ## Example: Out-of-Order Delivery
53//!
54//! ```
55//! use fluxion_stream::{FluxionStream, OrderedStreamExt};
56//! use fluxion_test_utils::{Sequenced, helpers::unwrap_stream, test_channel};
57//!
58//! # #[tokio::main]
59//! # async fn main() {
60//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
61//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
62//!
63//! let mut merged = stream1.ordered_merge(vec![stream2]);
64//!
65//! // Send out of order - stream2 sends seq=1, stream1 sends seq=2
66//! tx2.send((100, 1).into()).unwrap();
67//! tx1.send((200, 2).into()).unwrap();
68//!
69//! // Items are emitted in temporal order (seq 1, then seq 2)
70//! let first = unwrap_stream(&mut merged, 500).await.unwrap();
71//! assert_eq!(first.value, 100); // seq=1 arrives first despite being sent second
72//! # }
73//! ```
74//!
75//! # Operator Selection Guide
76//!
77//! Choose the right operator for your use case:
78//!
79//! ## When You Need Combined State
80//!
81//! | Operator | Use When | Triggers On | Example Use Case |
82//! |----------|----------|-------------|------------------|
83//! | [`combine_latest`] | You need latest from all streams | Any stream emits | Dashboard combining multiple data sources |
84//! | [`with_latest_from`] | You have primary + context streams | Primary emits only | User clicks enriched with latest config |
85//!
86//! ## When You Need All Items
87//!
88//! | Operator | Use When | Output | Example Use Case |
89//! |----------|----------|--------|------------------|
90//! | [`ordered_merge`] | Merge multiple sources in order | Every item from all streams | Event log from multiple services |
91//! | [`combine_with_previous`] | Compare consecutive items | Pairs of (previous, current) | Detecting value changes |
92//!
93//! ## When You Need Conditional Emission
94//!
95//! | Operator | Use When | Behavior | Example Use Case |
96//! |----------|----------|----------|------------------|
97//! | [`emit_when`] | Gate by condition | Emits source when filter is true | Send notifications only when enabled |
98//! | [`take_latest_when`] | Sample on condition | Emits latest source when filter triggers | Sample sensor on button press |
99//! | [`take_while_with`] | Stop on condition | Emits until condition false, then stops | Process until timeout |
100//!
101//! # Performance Characteristics
102//!
103//! ## Memory Usage
104//!
105//! - **[`combine_latest`]**: $O(n)$ - stores one latest value per stream
106//! - **[`ordered_merge`]**: $O(k)$ - buffers items until ordering confirmed ($k$ = buffer size)
107//! - **[`with_latest_from`]**: $O(n)$ - stores one value per secondary stream
108//! - **[`combine_with_previous`]**: $O(1)$ - stores only previous value
109//!
110//! ## Latency Considerations
111//!
112//! - **Ordered operators**: May buffer items waiting for earlier-ordered items
113//! - **Unordered operators**: Process items immediately as they arrive
114//! - **Combining operators**: Wait for all streams to emit at least once before first emission
115//!
116//! ## Throughput
117//!
118//! All operators use lock-free or minimally-locked designs:
119//!
120//! - Single mutex per operator (not per item)
121//! - No blocking operations in hot paths
122//! - Efficient polling with `futures::StreamExt`
123//!
124//! # Return Type Patterns
125//!
126//! Fluxion operators use three different return type patterns, each chosen for specific
127//! reasons related to type erasure, composability, and performance.
128//!
129//! ## Pattern 1: `impl Stream<Item = T>`
130//!
131//! **When used:** Lightweight operators with simple transformations
132//!
133//! **Examples:**
134//! - [`ordered_merge`](OrderedStreamExt::ordered_merge)
135//! - [`map_ordered`](FluxionStream::map_ordered)
136//! - [`filter_ordered`](FluxionStream::filter_ordered)
137//!
138//! **Benefits:**
139//! - Zero-cost abstraction (no boxing)
140//! - Compiler can fully optimize the stream pipeline
141//! - Type information preserved for further optimizations
142//!
143//! **Tradeoffs:**
144//! - Concrete type exposed in signatures (can be complex)
145//! - May increase compile times for deeply nested operators
146//!
147//! ## Pattern 2: `FluxionStream<impl Stream<Item = T>>`
148//!
149//! **When used:** Operators that should compose with other FluxionStream methods
150//!
151//! **Examples:**
152//! - [`combine_with_previous`](CombineWithPreviousExt::combine_with_previous)
153//! - [`with_latest_from`](WithLatestFromExt::with_latest_from)
154//! - [`combine_latest`](CombineLatestExt::combine_latest)
155//!
156//! **Benefits:**
157//! - Enables method chaining with `FluxionStream` convenience methods
158//! - Still zero-cost (no boxing)
159//! - Provides consistent API surface
160//!
161//! **Use cases:**
162//! - When users are likely to chain multiple operators
163//! - When the operator produces a complex transformed type
164//!
165//! ## Pattern 3: `Pin<Box<dyn Stream<Item = T>>>`
166//!
167//! **When used:** Operators with dynamic dispatch requirements or complex internal state
168//!
169//! **Examples:**
170//! - [`emit_when`](EmitWhenExt::emit_when)
171//! - [`take_latest_when`](TakeLatestWhenExt::take_latest_when)
172//! - [`take_while_with`](TakeWhileExt::take_while_with)
173//!
174//! **Benefits:**
175//! - Type erasure simplifies signatures
176//! - Reduces compile time for complex operator chains
177//! - Hides internal implementation details
178//!
179//! **Tradeoffs:**
180//! - Heap allocation (small overhead)
181//! - Dynamic dispatch prevents some optimizations
182//! - Runtime cost typically negligible compared to async operations
183//!
184//! **Why used for these operators:**
185//! These operators maintain internal state machines with multiple branches and complex
186//! lifetime requirements. Type erasure keeps the public API simple while allowing
187//! internal flexibility.
188//!
189//! ## Choosing the Right Pattern
190//!
191//! As a user, you typically don't need to worry about these patterns - all three compose
192//! seamlessly. For example, combining different operators in a single chain works naturally
193//! regardless of their internal implementation patterns. Each operator returns either
194//! `impl Stream` or `FluxionStream<impl Stream>`, and they compose transparently.
195//!
196//! The patterns are implementation details chosen to balance performance, ergonomics,
197//! and maintainability.
198//!
199//! # Common Patterns
200//!
201//! ## Pattern: Enriching Events with Context
202//!
203//! Uses [`with_latest_from`](WithLatestFromExt::with_latest_from) to combine a primary stream
204//! with context from a secondary stream.
205//!
206//! ```rust
207//! use fluxion_stream::{FluxionStream, WithLatestFromExt};
208//! use fluxion_test_utils::{Sequenced, helpers::unwrap_stream, unwrap_value, test_channel};
209//! use fluxion_core::Timestamped as TimestampedTrait;
210//!
211//! # async fn example() {
212//! // User clicks enriched with latest configuration
213//! let (click_tx, clicks) = test_channel::<Sequenced<String>>();
214//! let (config_tx, configs) = test_channel::<Sequenced<String>>();
215//!
216//! let mut enriched = clicks.with_latest_from(
217//!     configs,
218//!     |state| state.clone()
219//! );
220//!
221//! // Send config first, then click
222//! config_tx.send(("theme=dark".to_string(), 1).into()).unwrap();
223//! click_tx.send(("button1".to_string(), 2).into()).unwrap();
224//!
225//! let result = unwrap_value(Some(unwrap_stream(&mut enriched, 500).await));
226//! assert_eq!(result.values().len(), 2); // Has both click and config
227//! # }
228//! ```
229//!
230//! ## Pattern: Merging Multiple Event Sources
231//!
232//! Uses [`ordered_merge`](OrderedStreamExt::ordered_merge) to combine logs from multiple
233//! services in temporal order.
234//!
235//! ```rust
236//! use fluxion_stream::{FluxionStream, OrderedStreamExt};
237//! use fluxion_test_utils::{Sequenced, helpers::unwrap_stream, unwrap_value, test_channel};
238//!
239//! # async fn example() {
240//! // Combine logs from multiple services in temporal order
241//! let (service1_tx, service1) = test_channel::<Sequenced<String>>();
242//! let (service2_tx, service2) = test_channel::<Sequenced<String>>();
243//!
244//! let mut unified_log = service1.ordered_merge(vec![service2]);
245//!
246//! // Send logs with different timestamps
247//! service1_tx.send(("service1: started".to_string(), 1).into()).unwrap();
248//! service2_tx.send(("service2: ready".to_string(), 2).into()).unwrap();
249//!
250//! let first = unwrap_value(Some(unwrap_stream(&mut unified_log, 500).await));
251//! assert_eq!(first.value, "service1: started");
252//! # }
253//! ```
254//!
255//! ## Pattern: Change Detection
256//!
257//! Uses [`combine_with_previous`](CombineWithPreviousExt::combine_with_previous) to detect
258//! when values change by comparing with the previous value.
259//!
260//! ```rust
261//! use fluxion_stream::{FluxionStream, CombineWithPreviousExt};
262//! use fluxion_test_utils::{Sequenced, helpers::unwrap_stream, unwrap_value, test_channel};
263//! use fluxion_core::Timestamped as TimestampedTrait;
264//!
265//! # async fn example() {
266//! let (tx, stream) = test_channel::<Sequenced<i32>>();
267//!
268//! // Pair each value with its previous value
269//! let mut paired = stream.combine_with_previous();
270//!
271//! // Send values
272//! tx.send((1, 1).into()).unwrap();
273//! tx.send((1, 2).into()).unwrap(); // Same value
274//! tx.send((2, 3).into()).unwrap(); // Changed!
275//!
276//! let result = unwrap_value(Some(unwrap_stream(&mut paired, 500).await));
277//! assert!(result.previous.is_none()); // First has no previous
278//!
279//! let result = unwrap_value(Some(unwrap_stream(&mut paired, 500).await));
280//! let changed = result.previous.as_ref().unwrap().value != result.current.value;
281//! assert!(!changed); // 1 == 1, no change
282//!
283//! let result = unwrap_value(Some(unwrap_stream(&mut paired, 500).await));
284//! let changed = result.previous.as_ref().unwrap().value != result.current.value;
285//! assert!(changed); // 1 != 2, changed!
286//! # }
287//! ```
288//!
289//! ## Pattern: Conditional Processing
290//!
291//! Uses [`emit_when`](EmitWhenExt::emit_when) to gate emissions based on a filter stream,
292//! only emitting when the condition is satisfied.
293//!
294//! ```rust
295//! use fluxion_stream::{FluxionStream, EmitWhenExt};
296//! use fluxion_test_utils::{Sequenced, helpers::unwrap_stream, unwrap_value, test_channel};
297//! use fluxion_core::Timestamped as TimestampedTrait;
298//!
299//! # async fn example() {
300//! // Send notifications only when enabled
301//! let (event_tx, events) = test_channel::<Sequenced<i32>>();
302//! let (enabled_tx, enabled) = test_channel::<Sequenced<i32>>();
303//!
304//! let mut notifications = events.emit_when(
305//!     enabled,
306//!     |state| state.values().get(1).map(|v| *v > 0).unwrap_or(false)
307//! );
308//!
309//! // Enable notifications
310//! enabled_tx.send((1, 1).into()).unwrap();
311//! // Send event
312//! event_tx.send((999, 2).into()).unwrap();
313//!
314//! let result = unwrap_value(Some(unwrap_stream(&mut notifications, 500).await));
315//! assert_eq!(result.value, 999);
316//! # }
317//! ```
318//!
319//! # Anti-Patterns
320//!
321//! ## ❌ Don't: Use `ordered_merge` When Order Doesn't Matter
322//!
323//! ```text
324//! // BAD: Ordering overhead when you don't need it
325//! let merged = stream1.ordered_merge(vec![stream2]);
326//! ```
327//!
328//! Use standard futures combinators instead:
329//!
330//! ```text
331//! // GOOD: Use futures::stream::select for unordered merging
332//! use futures::stream::select;
333//! let merged = select(stream1, stream2);
334//! ```
335//!
336//! ## ❌ Don't: Use `combine_latest` for All Items
337//!
338//! ```text
339//! // BAD: combine_latest only emits latest, loses intermediate values
340//! let combined = stream1.combine_latest(vec![stream2], |_| true);
341//! ```
342//!
343//! Use `ordered_merge` to get all items:
344//!
345//! ```text
346//! // GOOD: ordered_merge emits every item
347//! let merged = stream1.ordered_merge(vec![stream2]);
348//! ```
349//!
350//! ## ❌ Don't: Complex Filter Logic in Operators
351//!
352//! ```text
353//! // BAD: Complex business logic in filter predicate
354//! stream1.combine_latest(vec![stream2], |state| {
355//!     // 50 lines of complex filtering logic...
356//! });
357//! ```
358//!
359//! Extract to well-tested function:
360//!
361//! ```text
362//! // GOOD: Testable, reusable filter logic
363//! fn should_emit(state: &CombinedState<i32>) -> bool {
364//!     // Clear, testable logic
365//!     state.values().iter().all(|&v| v > 0)
366//! }
367//!
368//! stream1.combine_latest(vec![stream2], should_emit);
369//! ```
370//!
371//! # Operator Chaining
372//!
373//! Fluxion operators are designed to be composed together, creating sophisticated
374//! data flows from simple building blocks. The key to successful chaining is
375//! understanding how each operator transforms the stream.
376//!
377//! ## Basic Chaining Pattern
378//!
379//! ```rust
380//! use fluxion_stream::{FluxionStream, CombineWithPreviousExt, TakeLatestWhenExt};
381//! use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
382//! use fluxion_core::Timestamped as TimestampedTrait;
383//!
384//! async fn example() {
385//! let (source_tx, source_stream) = test_channel::<Sequenced<i32>>();
386//! let (filter_tx, filter_stream) = test_channel::<Sequenced<i32>>();
387//!
388//! // Chain: sample when filter emits, then pair with previous value
389//! let sampled = source_stream.take_latest_when(filter_stream, |_| true);
390//! let mut composed = FluxionStream::new(sampled).combine_with_previous();
391//!
392//! source_tx.send(Sequenced::new(42)).unwrap();
393//! filter_tx.send(Sequenced::new(1)).unwrap();
394//!
395//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
396//! assert!(item.previous.is_none());
397//! assert_eq!(&item.current.value, &42);
398//! }
399//! ```
400//!
401//! ## Chaining with Transformation
402//!
403//! Use [`map_ordered`] and [`filter_ordered`] to transform streams while preserving
404//! temporal ordering:
405//!
406//! ```rust
407//! use fluxion_stream::{FluxionStream};
408//! use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
409//! use fluxion_core::Timestamped as TimestampedTrait;
410//!
411//! async fn example() {
412//! let (tx, stream) = test_channel::<Sequenced<i32>>();
413//!
414//! // Chain: filter positives, map to string
415//! let mut composed = FluxionStream::new(stream)
416//!     .filter_ordered(|&n| n > 0)  // filter_ordered receives &T::Inner
417//!     .map_ordered(|seq| format!("Value: {}", seq.value));  // map_ordered receives T
418//!
419//! tx.send(Sequenced::new(-1)).unwrap();
420//! tx.send(Sequenced::new(5)).unwrap();
421//!
422//! let result = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
423//! assert_eq!(result, "Value: 5");
424//! }
425//! ```
426//!
427//! ## Multi-Stream Chaining
428//!
429//! Combine multiple streams and then process the result:
430//!
431//! ```rust
432//! use fluxion_stream::{FluxionStream, CombineLatestExt, CombineWithPreviousExt};
433//! use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
434//! use fluxion_core::Timestamped as TimestampedTrait;
435//!
436//! async fn example() {
437//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
438//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
439//!
440//! // Chain: combine latest from both streams, then track changes
441//! let mut composed = stream1
442//!     .combine_latest(vec![stream2], |_| true)
443//!     .combine_with_previous();
444//!
445//! tx1.send(Sequenced::new(1)).unwrap();
446//! tx2.send(Sequenced::new(2)).unwrap();
447//!
448//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
449//! assert!(item.previous.is_none());
450//! assert_eq!(item.current.values().len(), 2);
451//! }
452//! ```
453//!
454//! ## Key Principles for Chaining
455//!
456//! 1. **Use `map_ordered` and `filter_ordered`**: These preserve the `FluxionStream` wrapper
457//!    and maintain temporal ordering guarantees
458//! 2. **Order matters**: `combine_with_previous().filter_ordered()` is different from
459//!    `filter_ordered().combine_with_previous()`
460//! 3. **Type awareness**: Each operator changes the item type - track what flows through
461//!    the chain
462//! 4. **Test incrementally**: Build complex chains step by step, testing each addition
463//!
464//! ## Advanced Composition Examples
465//!
466//! ### 1. Ordered Merge → Combine With Previous
467//!
468//! Merge multiple streams in temporal order, then track consecutive values:
469//!
470//! ```rust
471//! use fluxion_stream::{FluxionStream, OrderedStreamExt, CombineWithPreviousExt};
472//! use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
473//! use fluxion_core::Timestamped as TimestampedTrait;
474//!
475//! async fn example() {
476//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
477//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
478//!
479//! // Merge streams in temporal order, then pair consecutive values
480//! let mut composed = stream1
481//!     .ordered_merge(vec![FluxionStream::new(stream2)])
482//!     .combine_with_previous();
483//!
484//! tx1.send(Sequenced::new(1)).unwrap();
485//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
486//! assert!(item.previous.is_none());
487//! assert_eq!(&item.current.value, &1);
488//!
489//! tx2.send(Sequenced::new(2)).unwrap();
490//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
491//! assert_eq!(&item.previous.unwrap().value, &1);
492//! assert_eq!(&item.current.value, &2);
493//! }
494//! ```
495//!
496//! ### 2. Combine Latest → Combine With Previous
497//!
498//! Combine latest values from multiple streams, then track state changes:
499//!
500//! ```rust
501//! use fluxion_stream::{FluxionStream, CombineLatestExt, CombineWithPreviousExt};
502//! use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
503//! use fluxion_core::Timestamped as TimestampedTrait;
504//!
505//! async fn example() {
506//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
507//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
508//!
509//! // Combine latest, then track previous combined state
510//! let mut composed = stream1
511//!     .combine_latest(vec![stream2], |_| true)
512//!     .combine_with_previous();
513//!
514//! tx1.send(Sequenced::new(1)).unwrap();
515//! tx2.send(Sequenced::new(2)).unwrap();
516//!
517//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
518//! assert!(item.previous.is_none());
519//! assert_eq!(item.current.values().len(), 2);
520//!
521//! tx1.send(Sequenced::new(3)).unwrap();
522//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
523//! // Previous state had [1, 2], current has [3, 2]
524//! assert!(item.previous.is_some());
525//! }
526//! ```
527//!
528//! ### 3. Combine Latest → Take While With
529//!
530//! Combine streams and continue only while a condition holds:
531//!
532//! ```rust
533//! use fluxion_stream::{FluxionStream, CombineLatestExt, TakeWhileExt};
534//! use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
535//! use fluxion_core::Timestamped as TimestampedTrait;
536//!
537//! async fn example() {
538//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
539//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
540//! let (filter_tx, filter_stream) = test_channel::<Sequenced<bool>>();
541//!
542//! // Combine latest values, but stop when filter becomes false
543//! let mut composed = stream1
544//!     .combine_latest(vec![stream2], |_| true)
545//!     .take_while_with(filter_stream, |f| *f);
546//!
547//! filter_tx.send(Sequenced::new(true)).unwrap();
548//! tx1.send(Sequenced::new(1)).unwrap();
549//! tx2.send(Sequenced::new(2)).unwrap();
550//!
551//! let combined = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
552//! assert_eq!(combined.values().len(), 2);
553//! }
554//! ```
555//!
556//! ### 4. Ordered Merge → Take While With
557//!
558//! Merge streams in order and terminate based on external condition:
559//!
560//! ```rust
561//! use fluxion_stream::{FluxionStream, OrderedStreamExt, TakeWhileExt};
562//! use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
563//! use fluxion_core::Timestamped as TimestampedTrait;
564//!
565//! async fn example() {
566//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
567//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
568//! let (filter_tx, filter_stream) = test_channel::<Sequenced<bool>>();
569//!
570//! // Merge all values in order, but stop when filter says so
571//! let mut composed = stream1
572//!     .ordered_merge(vec![FluxionStream::new(stream2)])
573//!     .take_while_with(filter_stream, |f| *f);
574//!
575//! filter_tx.send(Sequenced::new(true)).unwrap();
576//! tx1.send(Sequenced::new(1)).unwrap();
577//!
578//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await)).value.clone();
579//! assert_eq!(item, 1);
580//!
581//! tx2.send(Sequenced::new(2)).unwrap();
582//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await)).value.clone();
583//! assert_eq!(item, 2);
584//! }
585//! ```
586//!
587//! ### 5. Take Latest When → Combine With Previous
588//!
589//! Sample latest value on trigger, then pair with previous sampled value:
590//!
591//! ```rust
592//! use fluxion_stream::{FluxionStream, TakeLatestWhenExt, CombineWithPreviousExt};
593//! use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
594//! use fluxion_core::Timestamped as TimestampedTrait;
595//!
596//! async fn example() {
597//! let (source_tx, source_stream) = test_channel::<Sequenced<i32>>();
598//! let (filter_tx, filter_stream) = test_channel::<Sequenced<i32>>();
599//!
600//! // Sample source when filter emits, then track consecutive samples
601//! let sampled = source_stream.take_latest_when(filter_stream, |_| true);
602//! let mut composed = FluxionStream::new(sampled).combine_with_previous();
603//!
604//! source_tx.send(Sequenced::new(42)).unwrap();
605//! filter_tx.send(Sequenced::new(0)).unwrap();
606//!
607//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
608//! assert!(item.previous.is_none());
609//! assert_eq!(&item.current.value, &42);
610//!
611//! source_tx.send(Sequenced::new(99)).unwrap();
612//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
613//! assert_eq!(&item.previous.unwrap().value, &42);
614//! assert_eq!(&item.current.value, &99);
615//! }
616//! ```
617//!
618//! These patterns demonstrate how Fluxion operators compose to create sophisticated
619//! data flows. See the composition tests in the source repository for more examples.
620//!
621//! [`map_ordered`]: fluxion_stream::FluxionStream::map_ordered
622//! [`filter_ordered`]: fluxion_stream::FluxionStream::filter_ordered
623//!
624//! # Getting Started
625//!
626//! Add to your `Cargo.toml`:
627//!
628//! ```toml
629//! [dependencies]
630//! fluxion-stream = { path = "../fluxion-stream" }
631//! tokio = { version = "1.48", features = ["sync", "rt"] }
632//! futures = "0.3"
633//! ```
634//!
635//! See individual operator documentation for detailed examples.
636//!
637//! [`combine_latest`]: CombineLatestExt::combine_latest
638//! [`with_latest_from`]: WithLatestFromExt::with_latest_from
639//! [`ordered_merge`]: OrderedStreamExt::ordered_merge
640//! [`emit_when`]: EmitWhenExt::emit_when
641//! [`take_latest_when`]: TakeLatestWhenExt::take_latest_when
642//! [`take_while_with`]: TakeWhileExt::take_while_with
643//! [`combine_with_previous`]: CombineWithPreviousExt::combine_with_previous
644
645#![allow(clippy::multiple_crate_versions, clippy::doc_markdown)]
646#[macro_use]
647mod logging;
648pub mod combine_latest;
649pub mod combine_with_previous;
650pub mod emit_when;
651pub mod fluxion_stream;
652pub mod merge_with;
653pub mod ordered_merge;
654pub mod take_latest_when;
655pub mod take_while_with;
656pub mod types;
657pub mod with_latest_from;
658
659// Re-export commonly used types
660pub use combine_latest::CombineLatestExt;
661pub use combine_with_previous::CombineWithPreviousExt;
662pub use emit_when::EmitWhenExt;
663pub use fluxion_stream::FluxionStream;
664pub use merge_with::MergedStream;
665pub use ordered_merge::OrderedStreamExt;
666pub use take_latest_when::TakeLatestWhenExt;
667pub use take_while_with::TakeWhileExt;
668pub use types::{CombinedState, WithPrevious};
669pub use with_latest_from::WithLatestFromExt;