fluxion_stream/lib.rs
1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5#![cfg_attr(
6 not(any(
7 feature = "runtime-tokio",
8 feature = "runtime-smol",
9 feature = "runtime-async-std",
10 target_arch = "wasm32"
11 )),
12 no_std
13)]
14#![allow(clippy::multiple_crate_versions, clippy::doc_markdown)]
15
16//! Stream operators with temporal ordering guarantees.
17//!
18//! This crate provides reactive stream combinators that maintain temporal ordering
19//! across asynchronous operations. All operators work with types implementing the
20//! [`Timestamped`](fluxion_core::Timestamped) trait, which provides timestamp-based ordering for correct temporal sequencing.
21//!
22//! # Architecture
23//!
24//! The crate is built around several key concepts:
25//!
26//! - **Extension traits**: Each operator is provided via an extension trait for composability
27//! - **[`Timestamped`](fluxion_core::Timestamped) trait**: Types must have intrinsic timestamps for temporal ordering
28//! - **Temporal correctness**: All operators respect the timestamp ordering of items across streams
29//! - **[`prelude`] module**: Import all traits at once with `use fluxion_stream::prelude::*;`
30//!
31//! ## Operator Categories
32//!
33//! ### Combination Operators
34//!
35//! - **[`combine_latest`](CombineLatestExt::combine_latest)**: Emits when any stream emits, combining latest values from all streams
36//! - **[`with_latest_from`](WithLatestFromExt::with_latest_from)**: Samples secondary streams only when primary emits
37//! - **[`ordered_merge`](OrderedStreamExt::ordered_merge)**: Merges multiple streams preserving temporal order
38//!
39//! ### Filtering Operators
40//!
41//! - **[`emit_when`](EmitWhenExt::emit_when)**: Gates source emissions based on filter stream conditions
42//! - **[`take_latest_when`](TakeLatestWhenExt::take_latest_when)**: Samples source when filter condition is met
43//! - **[`take_while_with`](TakeWhileExt::take_while_with)**: Emits while condition holds, terminates when false
44//! - **[`filter_ordered`](FilterOrderedExt::filter_ordered)**: Filters items based on predicate
45//! - **[`distinct_until_changed`](DistinctUntilChangedExt::distinct_until_changed)**: Filters consecutive duplicates
46//!
47//! ### Transformation Operators
48//!
49//! - **[`scan_ordered`](ScanOrderedExt::scan_ordered)**: Accumulates state across stream items, emitting intermediate results
50//! - **[`combine_with_previous`](CombineWithPreviousExt::combine_with_previous)**: Pairs each value with previous value
51//! - **[`map_ordered`](MapOrderedExt::map_ordered)**: Transforms each item
52//! - **[`start_with`](StartWithExt::start_with)**: Prepends initial values
53//!
54//! # Temporal Ordering Explained
55//!
56//! All operators in this crate maintain **temporal ordering** - items are processed in the
57//! order of their intrinsic ordering value, not the order they arrive at the operator.
58//!
59//! ## How It Works
60//!
61//! When multiple streams are combined:
62//!
63//! 1. Each stream item must implement [`Timestamped`](fluxion_core::Timestamped), providing a comparable timestamp
64//! 2. Operators use [`ordered_merge`](OrderedStreamExt::ordered_merge) internally to sequence items
65//! 3. Items are buffered and emitted in order of their timestamp
66//! 4. Late-arriving items with earlier timestamps are placed correctly in the sequence
67//!
68//! ## Example: Out-of-Order Delivery
69//!
70//! ```
71//! use fluxion_stream::OrderedStreamExt;
72//! use fluxion_test_utils::{Sequenced, helpers::unwrap_stream, test_channel};
73//!
74//! # #[tokio::main]
75//! # async fn main() {
76//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
77//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
78//!
79//! let mut merged = stream1.ordered_merge(vec![stream2]);
80//!
81//! // Send out of order - stream2 sends seq=1, stream1 sends seq=2
82//! tx2.unbounded_send((100, 1).into()).unwrap();
83//! tx1.unbounded_send((200, 2).into()).unwrap();
84//!
85//! // Items are emitted in temporal order (seq 1, then seq 2)
86//! let first = unwrap_stream(&mut merged, 500).await.unwrap();
87//! assert_eq!(first.value, 100); // seq=1 arrives first despite being sent second
88//! # }
89//! ```
90//!
91//! # Operator Selection Guide
92//!
93//! Choose the right operator for your use case:
94//!
95//! ## When You Need Combined State
96//!
97//! | Operator | Use When | Triggers On | Example Use Case |
98//! |----------|----------|-------------|------------------|
99//! | [`combine_latest`] | You need latest from all streams | Any stream emits | Dashboard combining multiple data sources |
100//! | [`with_latest_from`] | You have primary + context streams | Primary emits only | User clicks enriched with latest config |
101//!
102//! ## When You Need All Items
103//!
104//! | Operator | Use When | Output | Example Use Case |
105//! |----------|----------|--------|------------------|
106//! | [`ordered_merge`] | Merge multiple sources in order | Every item from all streams | Event log from multiple services |
107//! | [`combine_with_previous`] | Compare consecutive items | Pairs of (previous, current) | Detecting value changes |
108//!
109//! ## When You Need Conditional Emission
110//!
111//! | Operator | Use When | Behavior | Example Use Case |
112//! |----------|----------|----------|------------------|
113//! | [`emit_when`] | Gate by condition | Emits source when filter is true | Send notifications only when enabled |
114//! | [`take_latest_when`] | Sample on condition | Emits latest source when filter triggers | Sample sensor on button press |
115//! | [`take_while_with`] | Stop on condition | Emits until condition false, then stops | Process until timeout |
116//!
117//! # Performance Characteristics
118//!
119//! ## Memory Usage
120//!
121//! - **[`combine_latest`]**: $O(n)$ - stores one latest value per stream
122//! - **[`ordered_merge`]**: $O(k)$ - buffers items until ordering confirmed ($k$ = buffer size)
123//! - **[`with_latest_from`]**: $O(n)$ - stores one value per secondary stream
124//! - **[`combine_with_previous`]**: $O(1)$ - stores only previous value
125//!
126//! ## Latency Considerations
127//!
128//! - **Ordered operators**: May buffer items waiting for earlier-ordered items
129//! - **Unordered operators**: Process items immediately as they arrive
130//! - **Combining operators**: Wait for all streams to emit at least once before first emission
131//!
132//! ## Throughput
133//!
134//! All operators use lock-free or minimally-locked designs:
135//!
136//! - Single mutex per operator (not per item)
137//! - No blocking operations in hot paths
138//! - Efficient polling with `futures::StreamExt`
139//!
140//! # Return Type Patterns
141//!
142//! Fluxion operators use two different return type patterns, each chosen for specific
143//! reasons related to type erasure, composability, and performance.
144//!
145//! ## Pattern 1: `impl Stream<Item = T>`
146//!
147//! **When used:** Lightweight operators with simple transformations
148//!
149//! **Examples:**
150//! - [`ordered_merge`](OrderedStreamExt::ordered_merge)
151//! - [`map_ordered`](MapOrderedExt::map_ordered)
152//! - [`filter_ordered`](FilterOrderedExt::filter_ordered)
153//!
154//! **Benefits:**
155//! - Zero-cost abstraction (no boxing)
156//! - Compiler can fully optimize the stream pipeline
157//! - Type information preserved for further optimizations
158//!
159//! **Tradeoffs:**
160//! - Concrete type exposed in signatures (can be complex)
161//! - May increase compile times for deeply nested operators
162//!
163//! ## Pattern 2: `Pin<Box<dyn Stream<Item = T>>>`
164//!
165//! **When used:** Operators with dynamic dispatch requirements or complex internal state
166//!
167//! **Examples:**
168//! - [`emit_when`](EmitWhenExt::emit_when)
169//! - [`take_latest_when`](TakeLatestWhenExt::take_latest_when)
170//! - [`take_while_with`](TakeWhileExt::take_while_with)
171//! - [`combine_latest`](CombineLatestExt::combine_latest)
172//! - [`combine_with_previous`](CombineWithPreviousExt::combine_with_previous)
173//!
174//! **Benefits:**
175//! - Type erasure simplifies signatures
176//! - Reduces compile time for complex operator chains
177//! - Hides internal implementation details
178//!
179//! **Tradeoffs:**
180//! - Heap allocation (small overhead)
181//! - Dynamic dispatch prevents some optimizations
182//! - Runtime cost typically negligible compared to async operations
183//!
184//! **Why used for these operators:**
185//! These operators maintain internal state machines with multiple branches and complex
186//! lifetime requirements. Type erasure keeps the public API simple while allowing
187//! internal flexibility.
188//!
189//! ## Composability
190//!
191//! As a user, you typically don't need to worry about these patterns - both compose
192//! seamlessly. Each operator returns something that implements `Stream`, so they chain
193//! naturally together. For example, combining different operators in a single chain works
194//! regardless of their internal implementation patterns.
195//!
196//! The patterns are implementation details chosen to balance performance, ergonomics,
197//! and maintainability.
198//!
199//! # Common Patterns
200//!
201//! ## Pattern: Enriching Events with Context
202//!
203//! Uses [`with_latest_from`](WithLatestFromExt::with_latest_from) to combine a primary stream
204//! with context from a secondary stream.
205//!
206//! ```rust
207//! use fluxion_stream::WithLatestFromExt;
208//! use fluxion_test_utils::{Sequenced, helpers::unwrap_stream, unwrap_value, test_channel};
209//! use fluxion_core::Timestamped as TimestampedTrait;
210//!
211//! # async fn example() {
212//! // User clicks enriched with latest configuration
213//! let (click_tx, clicks) = test_channel::<Sequenced<String>>();
214//! let (config_tx, configs) = test_channel::<Sequenced<String>>();
215//!
216//! let mut enriched = clicks.with_latest_from(
217//! configs,
218//! |state| state.clone()
219//! );
220//!
221//! // Send config first, then click
222//! config_tx.unbounded_send(("theme=dark".to_string(), 1).into()).unwrap();
223//! click_tx.unbounded_send(("button1".to_string(), 2).into()).unwrap();
224//!
225//! let result = unwrap_value(Some(unwrap_stream(&mut enriched, 500).await));
226//! assert_eq!(result.values().len(), 2); // Has both click and config
227//! # }
228//! ```
229//!
230//! ## Pattern: Merging Multiple Event Sources
231//!
232//! Uses [`ordered_merge`](OrderedStreamExt::ordered_merge) to combine logs from multiple
233//! services in temporal order.
234//!
235//! ```rust
236//! use fluxion_stream::OrderedStreamExt;
237//! use fluxion_test_utils::{Sequenced, helpers::unwrap_stream, unwrap_value, test_channel};
238//!
239//! # async fn example() {
240//! // Combine logs from multiple services in temporal order
241//! let (service1_tx, service1) = test_channel::<Sequenced<String>>();
242//! let (service2_tx, service2) = test_channel::<Sequenced<String>>();
243//!
244//! let mut unified_log = service1.ordered_merge(vec![service2]);
245//!
246//! // Send logs with different timestamps
247//! service1_tx.unbounded_send(("service1: started".to_string(), 1).into()).unwrap();
248//! service2_tx.unbounded_send(("service2: ready".to_string(), 2).into()).unwrap();
249//!
250//! let first = unwrap_value(Some(unwrap_stream(&mut unified_log, 500).await));
251//! assert_eq!(first.value, "service1: started");
252//! # }
253//! ```
254//!
255//! ## Pattern: Change Detection
256//!
257//! Uses [`combine_with_previous`](CombineWithPreviousExt::combine_with_previous) to detect
258//! when values change by comparing with the previous value.
259//!
260//! ```rust
261//! use fluxion_stream::CombineWithPreviousExt;
262//! use fluxion_test_utils::{Sequenced, helpers::unwrap_stream, unwrap_value, test_channel};
263//! use fluxion_core::Timestamped as TimestampedTrait;
264//!
265//! # async fn example() {
266//! let (tx, stream) = test_channel::<Sequenced<i32>>();
267//!
268//! // Pair each value with its previous value
269//! let mut paired = stream.combine_with_previous();
270//!
271//! // Send values
272//! tx.unbounded_send((1, 1).into()).unwrap();
273//! tx.unbounded_send((1, 2).into()).unwrap(); // Same value
274//! tx.unbounded_send((2, 3).into()).unwrap(); // Changed!
275//!
276//! let result = unwrap_value(Some(unwrap_stream(&mut paired, 500).await));
277//! assert!(result.previous.is_none()); // First has no previous
278//!
279//! let result = unwrap_value(Some(unwrap_stream(&mut paired, 500).await));
280//! let changed = result.previous.as_ref().unwrap().value != result.current.value;
281//! assert!(!changed); // 1 == 1, no change
282//!
283//! let result = unwrap_value(Some(unwrap_stream(&mut paired, 500).await));
284//! let changed = result.previous.as_ref().unwrap().value != result.current.value;
285//! assert!(changed); // 1 != 2, changed!
286//! # }
287//! ```
288//!
289//! ## Pattern: Conditional Processing
290//!
291//! Uses [`emit_when`](EmitWhenExt::emit_when) to gate emissions based on a filter stream,
292//! only emitting when the condition is satisfied.
293//!
294//! ```rust
295//! use fluxion_stream::EmitWhenExt;
296//! use fluxion_test_utils::{Sequenced, helpers::unwrap_stream, unwrap_value, test_channel};
297//! use fluxion_core::Timestamped as TimestampedTrait;
298//!
299//! # async fn example() {
300//! // Send notifications only when enabled
301//! let (event_tx, events) = test_channel::<Sequenced<i32>>();
302//! let (enabled_tx, enabled) = test_channel::<Sequenced<i32>>();
303//!
304//! let mut notifications = events.emit_when(
305//! enabled,
306//! |state| state.values().get(1).map(|v| *v > 0).unwrap_or(false)
307//! );
308//!
309//! // Enable notifications
310//! enabled_tx.unbounded_send((1, 1).into()).unwrap();
311//! // Send event
312//! event_tx.unbounded_send((999, 2).into()).unwrap();
313//!
314//! let result = unwrap_value(Some(unwrap_stream(&mut notifications, 500).await));
315//! assert_eq!(result.value, 999);
316//! # }
317//! ```
318//!
319//! # Anti-Patterns
320//!
321//! ## ? Don't: Use `ordered_merge` When Order Doesn't Matter
322//!
323//! ```text
324//! // BAD: Ordering overhead when you don't need it
325//! let merged = stream1.ordered_merge(vec![stream2]);
326//! ```
327//!
328//! Use standard futures combinators instead:
329//!
330//! ```text
331//! // GOOD: Use futures::stream::select for unordered merging
332//! use futures::stream::select;
333//! let merged = select(stream1, stream2);
334//! ```
335//!
336//! ## ? Don't: Use `combine_latest` for All Items
337//!
338//! ```text
339//! // BAD: combine_latest only emits latest, loses intermediate values
340//! let combined = stream1.combine_latest(vec![stream2], |_| true);
341//! ```
342//!
343//! Use `ordered_merge` to get all items:
344//!
345//! ```text
346//! // GOOD: ordered_merge emits every item
347//! let merged = stream1.ordered_merge(vec![stream2]);
348//! ```
349//!
350//! ## ? Don't: Complex Filter Logic in Operators
351//!
352//! ```text
353//! // BAD: Complex business logic in filter predicate
354//! stream1.combine_latest(vec![stream2], |state| {
355//! // 50 lines of complex filtering logic...
356//! });
357//! ```
358//!
359//! Extract to well-tested function:
360//!
361//! ```text
362//! // GOOD: Testable, reusable filter logic
363//! fn should_emit(state: &CombinedState<i32>) -> bool {
364//! // Clear, testable logic
365//! state.values().iter().all(|&v| v > 0)
366//! }
367//!
368//! stream1.combine_latest(vec![stream2], should_emit);
369//! ```
370//!
371//! # Operator Chaining
372//!
373//! Fluxion operators are designed to be composed together, creating sophisticated
374//! data flows from simple building blocks. The key to successful chaining is
375//! understanding how each operator transforms the stream.
376//!
377//! ## Basic Chaining Pattern
378//!
379//! ```rust
380//! use fluxion_stream::{CombineWithPreviousExt, TakeLatestWhenExt};
381//! use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
382//! use fluxion_core::Timestamped as TimestampedTrait;
383//!
384//! async fn example() {
385//! let (source_tx, source_stream) = test_channel::<Sequenced<i32>>();
386//! let (filter_tx, filter_stream) = test_channel::<Sequenced<i32>>();
387//!
388//! // Chain: sample when filter emits, then pair with previous value
389//! let mut composed = source_stream
390//! .take_latest_when(filter_stream, |_| true)
391//! .combine_with_previous();
392//!
393//! source_tx.unbounded_send(Sequenced::new(42)).unwrap();
394//! filter_tx.unbounded_send(Sequenced::new(1)).unwrap();
395//!
396//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
397//! assert!(item.previous.is_none());
398//! assert_eq!(&item.current.value, &42);
399//! }
400//! ```
401//!
402//! ## Chaining with Transformation
403//!
404//! Use [`map_ordered`](MapOrderedExt::map_ordered) and [`filter_ordered`](FilterOrderedExt::filter_ordered)
405//! to transform streams while preserving temporal ordering:
406//!
407//! ```rust
408//! use fluxion_stream::{MapOrderedExt, FilterOrderedExt};
409//! use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
410//! use fluxion_core::Timestamped as TimestampedTrait;
411//!
412//! async fn example() {
413//! let (tx, stream) = test_channel::<Sequenced<i32>>();
414//!
415//! // Chain: filter positives, map to string
416//! let mut composed = stream
417//! .filter_ordered(|&n| n > 0) // filter_ordered receives &T::Inner
418//! .map_ordered(|seq| Sequenced::new(format!("Value: {}", seq.value))); // map_ordered receives T
419//!
420//! tx.unbounded_send(Sequenced::new(-1)).unwrap();
421//! tx.unbounded_send(Sequenced::new(5)).unwrap();
422//!
423//! let result = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
424//! assert_eq!(result.value, "Value: 5");
425//! }
426//! ```
427//!
428//! ## Multi-Stream Chaining
429//!
430//! Combine multiple streams and then process the result:
431//!
432//! ```rust
433//! use fluxion_stream::{CombineLatestExt, CombineWithPreviousExt};
434//! use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
435//! use fluxion_core::Timestamped as TimestampedTrait;
436//!
437//! async fn example() {
438//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
439//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
440//!
441//! // Chain: combine latest from both streams, then track changes
442//! let mut composed = stream1
443//! .combine_latest(vec![stream2], |_| true)
444//! .combine_with_previous();
445//!
446//! tx1.unbounded_send(Sequenced::new(1)).unwrap();
447//! tx2.unbounded_send(Sequenced::new(2)).unwrap();
448//!
449//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
450//! assert!(item.previous.is_none());
451//! assert_eq!(item.current.values().len(), 2);
452//! }
453//! ```
454//!
455//! ## Key Principles for Chaining
456//!
457//! 1. **Use extension traits**: Import traits like `MapOrderedExt`, `FilterOrderedExt` for transformations
458//! 2. **Order matters**: `combine_with_previous().filter_ordered()` is different from
459//! `filter_ordered().combine_with_previous()`
460//! 3. **Type awareness**: Each operator changes the item type - track what flows through
461//! the chain
462//! 4. **Test incrementally**: Build complex chains step by step, testing each addition
463//!
464//! ## Advanced Composition Examples
465//!
466//! ### 1. Ordered Merge ? Combine With Previous
467//!
468//! Merge multiple streams in temporal order, then track consecutive values:
469//!
470//! ```rust
471//! use fluxion_stream::{OrderedStreamExt, CombineWithPreviousExt};
472//! use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
473//! use fluxion_core::Timestamped as TimestampedTrait;
474//!
475//! async fn example() {
476//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
477//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
478//!
479//! // Merge streams in temporal order, then pair consecutive values
480//! let mut composed = stream1
481//! .ordered_merge(vec![stream2])
482//! .combine_with_previous();
483//!
484//! tx1.unbounded_send(Sequenced::new(1)).unwrap();
485//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
486//! assert!(item.previous.is_none());
487//! assert_eq!(&item.current.value, &1);
488//!
489//! tx2.unbounded_send(Sequenced::new(2)).unwrap();
490//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
491//! assert_eq!(&item.previous.unwrap().value, &1);
492//! assert_eq!(&item.current.value, &2);
493//! }
494//! ```
495//!
496//! ### 2. Combine Latest ? Combine With Previous
497//!
498//! Combine latest values from multiple streams, then track state changes:
499//!
500//! ```rust
501//! use fluxion_stream::{CombineLatestExt, CombineWithPreviousExt};
502//! use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
503//! use fluxion_core::Timestamped as TimestampedTrait;
504//!
505//! async fn example() {
506//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
507//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
508//!
509//! // Combine latest, then track previous combined state
510//! let mut composed = stream1
511//! .combine_latest(vec![stream2], |_| true)
512//! .combine_with_previous();
513//!
514//! tx1.unbounded_send(Sequenced::new(1)).unwrap();
515//! tx2.unbounded_send(Sequenced::new(2)).unwrap();
516//!
517//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
518//! assert!(item.previous.is_none());
519//! assert_eq!(item.current.values().len(), 2);
520//!
521//! tx1.unbounded_send(Sequenced::new(3)).unwrap();
522//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
523//! // Previous state had [1, 2], current has [3, 2]
524//! assert!(item.previous.is_some());
525//! }
526//! ```
527//!
528//! ### 3. Combine Latest ? Take While With
529//!
530//! Combine streams and continue only while a condition holds:
531//!
532//! ```rust
533//! use fluxion_stream::{CombineLatestExt, TakeWhileExt};
534//! use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
535//! use fluxion_core::Timestamped as TimestampedTrait;
536//!
537//! async fn example() {
538//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
539//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
540//! let (filter_tx, filter_stream) = test_channel::<Sequenced<bool>>();
541//!
542//! // Combine latest values, but stop when filter becomes false
543//! let mut composed = stream1
544//! .combine_latest(vec![stream2], |_| true)
545//! .take_while_with(filter_stream, |f| *f);
546//!
547//! filter_tx.unbounded_send(Sequenced::new(true)).unwrap();
548//! tx1.unbounded_send(Sequenced::new(1)).unwrap();
549//! tx2.unbounded_send(Sequenced::new(2)).unwrap();
550//!
551//! let combined = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
552//! assert_eq!(combined.values().len(), 2);
553//! }
554//! ```
555//!
556//! ### 4. Ordered Merge ? Take While With
557//!
558//! Merge streams in order and terminate based on external condition:
559//!
560//! ```rust
561//! use fluxion_stream::{OrderedStreamExt, TakeWhileExt};
562//! use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
563//! use fluxion_core::Timestamped as TimestampedTrait;
564//!
565//! async fn example() {
566//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
567//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
568//! let (filter_tx, filter_stream) = test_channel::<Sequenced<bool>>();
569//!
570//! // Merge all values in order, but stop when filter says so
571//! let mut composed = stream1
572//! .ordered_merge(vec![stream2])
573//! .take_while_with(filter_stream, |f| *f);
574//!
575//! filter_tx.unbounded_send(Sequenced::new(true)).unwrap();
576//! tx1.unbounded_send(Sequenced::new(1)).unwrap();
577//!
578//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await)).value.clone();
579//! assert_eq!(item, 1);
580//!
581//! tx2.unbounded_send(Sequenced::new(2)).unwrap();
582//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await)).value.clone();
583//! assert_eq!(item, 2);
584//! }
585//! ```
586//!
587//! ### 5. Take Latest When ? Combine With Previous
588//!
589//! Sample latest value on trigger, then pair with previous sampled value:
590//!
591//! ```rust
592//! use fluxion_stream::{TakeLatestWhenExt, CombineWithPreviousExt};
593//! use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
594//! use fluxion_core::Timestamped as TimestampedTrait;
595//!
596//! async fn example() {
597//! let (source_tx, source_stream) = test_channel::<Sequenced<i32>>();
598//! let (filter_tx, filter_stream) = test_channel::<Sequenced<i32>>();
599//!
600//! // Sample source when filter emits, then track consecutive samples
601//! let mut composed = source_stream
602//! .take_latest_when(filter_stream, |_| true)
603//! .combine_with_previous();
604//!
605//! source_tx.unbounded_send(Sequenced::new(42)).unwrap();
606//! filter_tx.unbounded_send(Sequenced::new(0)).unwrap();
607//!
608//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
609//! assert!(item.previous.is_none());
610//! assert_eq!(&item.current.value, &42);
611//!
612//! source_tx.unbounded_send(Sequenced::new(99)).unwrap();
613//! let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
614//! assert_eq!(&item.previous.unwrap().value, &42);
615//! assert_eq!(&item.current.value, &99);
616//! }
617//! ```
618//!
619//! These patterns demonstrate how Fluxion operators compose to create sophisticated
620//! data flows. See the composition tests in the source repository for more examples.
621//!
622//! [`map_ordered`]: crate::MapOrderedExt::map_ordered
623//! [`filter_ordered`]: crate::FilterOrderedExt::filter_ordered
624//!
625//! # Getting Started
626//!
627//! Add to your `Cargo.toml`:
628//!
629//! ```toml
630//! [dependencies]
631//! fluxion-stream = { path = "../fluxion-stream" }
632//! tokio = { version = "1.48", features = ["sync", "rt"] }
633//! futures = "0.3"
634//! ```
635//!
636//! See individual operator documentation for detailed examples.
637//!
638//! [`combine_latest`]: CombineLatestExt::combine_latest
639//! [`with_latest_from`]: WithLatestFromExt::with_latest_from
640//! [`ordered_merge`]: OrderedStreamExt::ordered_merge
641//! [`emit_when`]: EmitWhenExt::emit_when
642//! [`take_latest_when`]: TakeLatestWhenExt::take_latest_when
643//! [`take_while_with`]: TakeWhileExt::take_while_with
644//! [`combine_with_previous`]: CombineWithPreviousExt::combine_with_previous
645
646extern crate alloc;
647
648pub mod combine_latest;
649pub mod combine_with_previous;
650pub mod distinct_until_changed;
651pub mod distinct_until_changed_by;
652pub mod emit_when;
653pub mod filter_ordered;
654pub mod into_fluxion_stream;
655mod logging;
656pub mod map_ordered;
657pub mod merge_with;
658pub mod on_error;
659pub mod ordered_merge;
660#[cfg(any(
661 feature = "runtime-tokio",
662 feature = "runtime-smol",
663 feature = "runtime-async-std",
664 target_arch = "wasm32"
665))]
666pub mod partition;
667pub mod prelude;
668pub mod sample_ratio;
669pub mod scan_ordered;
670#[cfg(any(
671 feature = "runtime-tokio",
672 feature = "runtime-smol",
673 feature = "runtime-async-std",
674 target_arch = "wasm32"
675))]
676pub mod share;
677pub mod skip_items;
678pub mod start_with;
679pub mod take_items;
680pub mod take_latest_when;
681pub mod take_while_with;
682pub mod tap;
683pub mod types;
684pub mod window_by_count;
685pub mod with_latest_from;
686
687// Re-export commonly used types
688pub use combine_latest::CombineLatestExt;
689pub use combine_with_previous::CombineWithPreviousExt;
690pub use distinct_until_changed::DistinctUntilChangedExt;
691pub use distinct_until_changed_by::DistinctUntilChangedByExt;
692pub use emit_when::EmitWhenExt;
693pub use filter_ordered::FilterOrderedExt;
694pub use into_fluxion_stream::IntoFluxionStream;
695pub use map_ordered::MapOrderedExt;
696pub use merge_with::MergedStream;
697pub use on_error::OnErrorExt;
698pub use ordered_merge::OrderedStreamExt;
699#[cfg(any(
700 feature = "runtime-tokio",
701 feature = "runtime-smol",
702 feature = "runtime-async-std",
703 target_arch = "wasm32"
704))]
705pub use partition::{PartitionExt, PartitionedStream};
706pub use sample_ratio::SampleRatioExt;
707pub use scan_ordered::ScanOrderedExt;
708#[cfg(any(
709 feature = "runtime-tokio",
710 feature = "runtime-smol",
711 feature = "runtime-async-std",
712 target_arch = "wasm32"
713))]
714pub use share::{FluxionShared, ShareExt};
715pub use skip_items::SkipItemsExt;
716pub use start_with::StartWithExt;
717pub use take_items::TakeItemsExt;
718pub use take_latest_when::TakeLatestWhenExt;
719pub use take_while_with::TakeWhileExt;
720pub use tap::TapExt;
721pub use types::{CombinedState, WithPrevious};
722pub use window_by_count::WindowByCountExt;
723pub use with_latest_from::WithLatestFromExt;