fluxion_stream/
lib.rs

1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5//! Stream operators with temporal ordering guarantees.
6//!
7//! This crate provides reactive stream combinators that maintain temporal ordering
8//! across asynchronous operations. All operators work with types implementing the
9//! [`Timestamped`](fluxion_core::Timestamped) trait, which provides timestamp-based ordering for correct temporal sequencing.
10//!
11//! # Architecture
12//!
13//! The crate is built around several key concepts:
14//!
15//! - **[`FluxionStream`]**: A wrapper around any `Stream` that provides access to all operators
16//! - **[`Timestamped`](fluxion_core::Timestamped) trait**: Types must have intrinsic timestamps for temporal ordering
17//! - **Extension traits**: Each operator is provided via an extension trait for composability
18//! - **Temporal correctness**: All operators respect the timestamp ordering of items across streams
19//!
20//! ## Operator Categories
21//!
22//! ### Combination Operators
23//!
24//! - **[`combine_latest`](CombineLatestExt::combine_latest)**: Emits when any stream emits, combining latest values from all streams
25//! - **[`with_latest_from`](WithLatestFromExt::with_latest_from)**: Samples secondary streams only when primary emits
26//! - **[`ordered_merge`](OrderedStreamExt::ordered_merge)**: Merges multiple streams preserving temporal order
27//!
28//! ### Filtering Operators
29//!
30//! - **[`emit_when`](EmitWhenExt::emit_when)**: Gates source emissions based on filter stream conditions
31//! - **[`take_latest_when`](TakeLatestWhenExt::take_latest_when)**: Samples source when filter condition is met
32//! - **[`take_while_with`](TakeWhileExt::take_while_with)**: Emits while condition holds, terminates when false
33//!
34//! ### Transformation Operators
35//!
36//! - **[`combine_with_previous`](CombineWithPreviousExt::combine_with_previous)**: Pairs each value with previous value
37//!
38//! # Temporal Ordering Explained
39//!
40//! All operators in this crate maintain **temporal ordering** - items are processed in the
41//! order of their intrinsic timestamps, not the order in which they arrive at the operator.
42//!
43//! ## How It Works
44//!
45//! When multiple streams are combined:
46//!
47//! 1. Each stream item must implement [`Timestamped`](fluxion_core::Timestamped), providing a comparable timestamp
48//! 2. Operators use [`ordered_merge`](OrderedStreamExt::ordered_merge) internally to sequence items
49//! 3. Items are buffered and emitted in order of their timestamp
50//! 4. Late-arriving items with earlier timestamps are placed correctly in the sequence
51//!
52//! ## Example: Out-of-Order Delivery
53//!
54//! ```
55//! use fluxion_stream::{FluxionStream, OrderedStreamExt};
56//! use fluxion_test_utils::Sequenced;
57//! use futures::StreamExt;
58//!
59//! # #[tokio::main]
60//! # async fn main() {
61//! let (tx1, rx1) = tokio::sync::mpsc::unbounded_channel::<Sequenced<i32>>();
62//! let (tx2, rx2) = tokio::sync::mpsc::unbounded_channel::<Sequenced<i32>>();
63//!
64//! let stream1 = FluxionStream::from_unbounded_receiver(rx1);
65//! let stream2 = FluxionStream::from_unbounded_receiver(rx2);
66//!
67//! let merged = stream1.ordered_merge(vec![stream2]);
68//! let mut merged = merged;
69//!
70//! // The earlier item (seq=1) is sent on stream2, the later item (seq=2) on stream1
71//! tx2.send((100, 1).into()).unwrap();
72//! tx1.send((200, 2).into()).unwrap();
73//!
74//! // Items are emitted in temporal order (seq 1, then seq 2)
75//! let first = merged.next().await.unwrap().unwrap();
76//! assert_eq!(first.value, 100); // seq=1 is emitted first even though it came from the secondary stream
77//! # }
78//! ```
79//!
80//! # Operator Selection Guide
81//!
82//! Choose the right operator for your use case:
83//!
84//! ## When You Need Combined State
85//!
86//! | Operator | Use When | Triggers On | Example Use Case |
87//! |----------|----------|-------------|------------------|
88//! | [`combine_latest`] | You need latest from all streams | Any stream emits | Dashboard combining multiple data sources |
89//! | [`with_latest_from`] | You have primary + context streams | Primary emits only | User clicks enriched with latest config |
90//!
91//! ## When You Need All Items
92//!
93//! | Operator | Use When | Output | Example Use Case |
94//! |----------|----------|--------|------------------|
95//! | [`ordered_merge`] | Merge multiple sources in order | Every item from all streams | Event log from multiple services |
96//! | [`combine_with_previous`] | Compare consecutive items | Pairs of (previous, current) | Detecting value changes |
97//!
98//! ## When You Need Conditional Emission
99//!
100//! | Operator | Use When | Behavior | Example Use Case |
101//! |----------|----------|----------|------------------|
102//! | [`emit_when`] | Gate by condition | Emits source when filter is true | Send notifications only when enabled |
103//! | [`take_latest_when`] | Sample on condition | Emits latest source when filter triggers | Sample sensor on button press |
104//! | [`take_while_with`] | Stop on condition | Emits until condition false, then stops | Process until timeout |
105//!
106//! # Performance Characteristics
107//!
108//! ## Memory Usage
109//!
110//! - **[`combine_latest`]**: $O(n)$ - stores one latest value per stream ($n$ = number of streams)
111//! - **[`ordered_merge`]**: $O(k)$ - buffers items until their ordering is confirmed ($k$ = number of buffered items)
112//! - **[`with_latest_from`]**: $O(n)$ - stores one value per secondary stream ($n$ = number of secondary streams)
113//! - **[`combine_with_previous`]**: $O(1)$ - stores only previous value
114//!
115//! ## Latency Considerations
116//!
117//! - **Ordered operators**: May buffer items waiting for earlier-ordered items
118//! - **Unordered operators**: Process items immediately as they arrive
119//! - **Combining operators**: Wait for all streams to emit at least once before first emission
120//!
121//! ## Throughput
122//!
123//! All operators use lock-free or minimally-locked designs:
124//!
125//! - Single mutex per operator (not per item)
126//! - No blocking operations in hot paths
127//! - Efficient polling with `futures::StreamExt`
128//!
129//! # Return Type Patterns
130//!
131//! Fluxion operators use three different return type patterns, each chosen for specific
132//! reasons related to type erasure, composability, and performance.
133//!
134//! ## Pattern 1: `impl Stream<Item = T>`
135//!
136//! **When used:** Lightweight operators with simple transformations
137//!
138//! **Examples:**
139//! - [`ordered_merge`](OrderedStreamExt::ordered_merge)
140//! - [`map_ordered`](FluxionStream::map_ordered)
141//! - [`filter_ordered`](FluxionStream::filter_ordered)
142//!
143//! **Benefits:**
144//! - Zero-cost abstraction (no boxing)
145//! - Compiler can fully optimize the stream pipeline
146//! - Type information preserved for further optimizations
147//!
148//! **Tradeoffs:**
149//! - Signatures can be complex, and the opaque return type cannot be named (e.g. in a struct field) without generics or boxing
150//! - May increase compile times for deeply nested operators
151//!
152//! ## Pattern 2: `FluxionStream<impl Stream<Item = T>>`
153//!
154//! **When used:** Operators that should compose with other FluxionStream methods
155//!
156//! **Examples:**
157//! - [`combine_with_previous`](CombineWithPreviousExt::combine_with_previous)
158//! - [`with_latest_from`](WithLatestFromExt::with_latest_from)
159//! - [`combine_latest`](CombineLatestExt::combine_latest)
160//!
161//! **Benefits:**
162//! - Enables method chaining with `FluxionStream` convenience methods
163//! - Still zero-cost (no boxing)
164//! - Provides consistent API surface
165//!
166//! **Use cases:**
167//! - When users are likely to chain multiple operators
168//! - When the operator produces a complex transformed type
169//!
170//! ## Pattern 3: `Pin<Box<dyn Stream<Item = T>>>`
171//!
172//! **When used:** Operators with dynamic dispatch requirements or complex internal state
173//!
174//! **Examples:**
175//! - [`emit_when`](EmitWhenExt::emit_when)
176//! - [`take_latest_when`](TakeLatestWhenExt::take_latest_when)
177//! - [`take_while_with`](TakeWhileExt::take_while_with)
178//!
179//! **Benefits:**
180//! - Type erasure simplifies signatures
181//! - Reduces compile time for complex operator chains
182//! - Hides internal implementation details
183//!
184//! **Tradeoffs:**
185//! - Heap allocation (small overhead)
186//! - Dynamic dispatch prevents some optimizations
187//! - Runtime cost typically negligible compared to async operations
188//!
189//! **Why used for these operators:**
190//! These operators maintain internal state machines with multiple branches and complex
191//! lifetime requirements. Type erasure keeps the public API simple while allowing
192//! internal flexibility.
193//!
194//! ## Choosing the Right Pattern
195//!
196//! As a user, you typically don't need to worry about these patterns - all three compose
197//! seamlessly. For example, combining different operators in a single chain works naturally
198//! regardless of their internal implementation patterns. Each operator returns `impl Stream`,
199//! `FluxionStream<impl Stream>`, or `Pin<Box<dyn Stream>>`, and they compose transparently.
200//!
201//! The patterns are implementation details chosen to balance performance, ergonomics,
202//! and maintainability.
203//!
204//! # Common Patterns
205//!
206//! ## Pattern: Enriching Events with Context
207//!
208//! Uses [`with_latest_from`](WithLatestFromExt::with_latest_from) to combine a primary stream
209//! with context from a secondary stream.
210//!
211//! ```rust
212//! use fluxion_stream::{FluxionStream, WithLatestFromExt};
213//! use fluxion_test_utils::Sequenced;
214//! use fluxion_core::Timestamped as TimestampedTrait;
215//! use futures::StreamExt;
216//!
217//! # async fn example() {
218//! // User clicks enriched with latest configuration
219//! let (click_tx, click_rx) = tokio::sync::mpsc::unbounded_channel::<Sequenced<String>>();
220//! let (config_tx, config_rx) = tokio::sync::mpsc::unbounded_channel::<Sequenced<String>>();
221//!
222//! let clicks = FluxionStream::from_unbounded_receiver(click_rx);
223//! let configs = FluxionStream::from_unbounded_receiver(config_rx);
224//!
225//! let enriched = clicks.with_latest_from(
226//!     configs,
227//!     |state| state.clone()
228//! );
229//! let mut enriched = enriched;
230//!
231//! // Send config first, then click
232//! config_tx.send(("theme=dark".to_string(), 1).into()).unwrap();
233//! click_tx.send(("button1".to_string(), 2).into()).unwrap();
234//!
235//! let result = enriched.next().await.unwrap().unwrap();
236//! assert_eq!(result.values().len(), 2); // Has both click and config
237//! # }
238//! ```
239//!
240//! ## Pattern: Merging Multiple Event Sources
241//!
242//! Uses [`ordered_merge`](OrderedStreamExt::ordered_merge) to combine logs from multiple
243//! services in temporal order.
244//!
245//! ```rust
246//! use fluxion_stream::{FluxionStream, OrderedStreamExt};
247//! use fluxion_test_utils::Sequenced;
248//! use futures::StreamExt;
249//!
250//! # async fn example() {
251//! // Combine logs from multiple services in temporal order
252//! let (service1_tx, service1_rx) = tokio::sync::mpsc::unbounded_channel::<Sequenced<String>>();
253//! let (service2_tx, service2_rx) = tokio::sync::mpsc::unbounded_channel::<Sequenced<String>>();
254//!
255//! let service1 = FluxionStream::from_unbounded_receiver(service1_rx);
256//! let service2 = FluxionStream::from_unbounded_receiver(service2_rx);
257//!
258//! let unified_log = service1.ordered_merge(vec![service2]);
259//! let mut unified_log = unified_log;
260//!
261//! // Send logs with different timestamps
262//! service1_tx.send(("service1: started".to_string(), 1).into()).unwrap();
263//! service2_tx.send(("service2: ready".to_string(), 2).into()).unwrap();
264//!
265//! let first = unified_log.next().await.unwrap().unwrap();
266//! assert_eq!(first.value, "service1: started");
267//! # }
268//! ```
269//!
270//! ## Pattern: Change Detection
271//!
272//! Uses [`combine_with_previous`](CombineWithPreviousExt::combine_with_previous) to detect
273//! when values change by comparing with the previous value.
274//!
275//! ```rust
276//! use fluxion_stream::{FluxionStream, CombineWithPreviousExt};
277//! use fluxion_test_utils::Sequenced;
278//! use fluxion_core::Timestamped as TimestampedTrait;
279//! use futures::StreamExt;
280//!
281//! # async fn example() {
282//! let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Sequenced<i32>>();
283//! let stream = FluxionStream::from_unbounded_receiver(rx);
284//!
285//! // Pair each value with its previous value
286//! let paired = stream.combine_with_previous();
287//! let mut paired = paired;
288//!
289//! // Send values
290//! tx.send((1, 1).into()).unwrap();
291//! tx.send((1, 2).into()).unwrap(); // Same value
292//! tx.send((2, 3).into()).unwrap(); // Changed!
293//!
294//! let result = paired.next().await.unwrap().unwrap();
295//! assert!(result.previous.is_none()); // First has no previous
296//!
297//! let result = paired.next().await.unwrap().unwrap();
298//! let changed = result.previous.as_ref().unwrap().value != result.current.value;
299//! assert!(!changed); // 1 == 1, no change
300//!
301//! let result = paired.next().await.unwrap().unwrap();
302//! let changed = result.previous.as_ref().unwrap().value != result.current.value;
303//! assert!(changed); // 1 != 2, changed!
304//! # }
305//! ```
306//!
307//! ## Pattern: Conditional Processing
308//!
309//! Uses [`emit_when`](EmitWhenExt::emit_when) to gate emissions based on a filter stream,
310//! only emitting when the condition is satisfied.
311//!
312//! ```rust
313//! use fluxion_stream::{FluxionStream, EmitWhenExt};
314//! use fluxion_test_utils::Sequenced;
315//! use fluxion_core::Timestamped as TimestampedTrait;
316//! use futures::StreamExt;
317//!
318//! # async fn example() {
319//! // Send notifications only when enabled
320//! let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::<Sequenced<i32>>();
321//! let (enabled_tx, enabled_rx) = tokio::sync::mpsc::unbounded_channel::<Sequenced<i32>>();
322//!
323//! let events = FluxionStream::from_unbounded_receiver(event_rx);
324//! let enabled = FluxionStream::from_unbounded_receiver(enabled_rx);
325//!
326//! let notifications = events.emit_when(
327//!     enabled,
328//!     |state| state.values().get(1).map(|v| *v > 0).unwrap_or(false)
329//! );
330//! let mut notifications = notifications;
331//!
332//! // Enable notifications
333//! enabled_tx.send((1, 1).into()).unwrap();
334//! // Send event
335//! event_tx.send((999, 2).into()).unwrap();
336//!
337//! let result = notifications.next().await.unwrap().unwrap();
338//! assert_eq!(result.value, 999);
339//! # }
340//! ```
341//!
342//! # Anti-Patterns
343//!
344//! ## ❌ Don't: Use `ordered_merge` When Order Doesn't Matter
345//!
346//! ```text
347//! // BAD: Ordering overhead when you don't need it
348//! let merged = stream1.ordered_merge(vec![stream2]);
349//! ```
350//!
351//! Use standard futures combinators instead:
352//!
353//! ```text
354//! // GOOD: Use futures::stream::select for unordered merging
355//! use futures::stream::select;
356//! let merged = select(stream1, stream2);
357//! ```
358//!
359//! ## ❌ Don't: Use `combine_latest` for All Items
360//!
361//! ```text
362//! // BAD: combine_latest only emits latest, loses intermediate values
363//! let combined = stream1.combine_latest(vec![stream2], |_| true);
364//! ```
365//!
366//! Use `ordered_merge` to get all items:
367//!
368//! ```text
369//! // GOOD: ordered_merge emits every item
370//! let merged = stream1.ordered_merge(vec![stream2]);
371//! ```
372//!
373//! ## ❌ Don't: Complex Filter Logic in Operators
374//!
375//! ```text
376//! // BAD: Complex business logic in filter predicate
377//! stream1.combine_latest(vec![stream2], |state| {
378//!     // 50 lines of complex filtering logic...
379//! });
380//! ```
381//!
382//! Extract it into a well-tested function:
383//!
384//! ```text
385//! // GOOD: Testable, reusable filter logic
386//! fn should_emit(state: &CombinedState<i32>) -> bool {
387//!     // Clear, testable logic
388//!     state.values().iter().all(|&v| v > 0)
389//! }
390//!
391//! stream1.combine_latest(vec![stream2], should_emit);
392//! ```
393//!
394//! # Operator Chaining
395//!
396//! Fluxion operators are designed to be composed together, creating sophisticated
397//! data flows from simple building blocks. The key to successful chaining is
398//! understanding how each operator transforms the stream.
399//!
400//! ## Basic Chaining Pattern
401//!
402//! ```rust
403//! use fluxion_stream::{FluxionStream, CombineWithPreviousExt, TakeLatestWhenExt};
404//! use fluxion_test_utils::{Sequenced, test_channel};
405//! use fluxion_core::Timestamped as TimestampedTrait;
406//! use futures::StreamExt;
407//!
408//! async fn example() {
409//! let (source_tx, source_stream) = test_channel::<Sequenced<i32>>();
410//! let (filter_tx, filter_stream) = test_channel::<Sequenced<i32>>();
411//!
412//! // Chain: sample when filter emits, then pair with previous value
413//! let sampled = source_stream.take_latest_when(filter_stream, |_| true);
414//! let mut composed = FluxionStream::new(sampled).combine_with_previous();
415//!
416//! source_tx.send(Sequenced::new(42)).unwrap();
417//! filter_tx.send(Sequenced::new(1)).unwrap();
418//!
419//! let item = composed.next().await.unwrap().unwrap();
420//! assert!(item.previous.is_none());
421//! assert_eq!(&item.current.value, &42);
422//! }
423//! ```
424//!
425//! ## Chaining with Transformation
426//!
427//! Use [`map_ordered`] and [`filter_ordered`] to transform streams while preserving
428//! temporal ordering:
429//!
430//! ```rust
431//! use fluxion_stream::{FluxionStream};
432//! use fluxion_test_utils::{Sequenced, test_channel};
433//! use fluxion_core::Timestamped as TimestampedTrait;
434//! use futures::StreamExt;
435//!
436//! async fn example() {
437//! let (tx, stream) = test_channel::<Sequenced<i32>>();
438//!
439//! // Chain: filter positives, map to string
440//! let mut composed = FluxionStream::new(stream)
441//!     .filter_ordered(|&n| n > 0)  // filter_ordered receives &T::Inner
442//!     .map_ordered(|seq| format!("Value: {}", seq.value));  // map_ordered receives T
443//!
444//! tx.send(Sequenced::new(-1)).unwrap();
445//! tx.send(Sequenced::new(5)).unwrap();
446//!
447//! let result = composed.next().await.unwrap().unwrap();
448//! assert_eq!(result, "Value: 5");
449//! }
450//! ```
451//!
452//! ## Multi-Stream Chaining
453//!
454//! Combine multiple streams and then process the result:
455//!
456//! ```rust
457//! use fluxion_stream::{FluxionStream, CombineLatestExt, CombineWithPreviousExt};
458//! use fluxion_test_utils::{Sequenced, test_channel};
459//! use fluxion_core::Timestamped as TimestampedTrait;
460//! use futures::StreamExt;
461//!
462//! async fn example() {
463//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
464//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
465//!
466//! // Chain: combine latest from both streams, then track changes
467//! let mut composed = stream1
468//!     .combine_latest(vec![stream2], |_| true)
469//!     .combine_with_previous();
470//!
471//! tx1.send(Sequenced::new(1)).unwrap();
472//! tx2.send(Sequenced::new(2)).unwrap();
473//!
474//! let item = composed.next().await.unwrap().unwrap();
475//! assert!(item.previous.is_none());
476//! assert_eq!(item.current.values().len(), 2);
477//! }
478//! ```
479//!
480//! ## Key Principles for Chaining
481//!
482//! 1. **Use `map_ordered` and `filter_ordered`**: These preserve the `FluxionStream` wrapper
483//!    and maintain temporal ordering guarantees
484//! 2. **Order matters**: `combine_with_previous().filter_ordered()` is different from
485//!    `filter_ordered().combine_with_previous()`
486//! 3. **Type awareness**: Each operator changes the item type - track what flows through
487//!    the chain
488//! 4. **Test incrementally**: Build complex chains step by step, testing each addition
489//!
490//! ## Advanced Composition Examples
491//!
492//! ### 1. Ordered Merge → Combine With Previous
493//!
494//! Merge multiple streams in temporal order, then track consecutive values:
495//!
496//! ```rust
497//! use fluxion_stream::{FluxionStream, OrderedStreamExt, CombineWithPreviousExt};
498//! use fluxion_test_utils::{Sequenced, test_channel};
499//! use fluxion_core::Timestamped as TimestampedTrait;
500//! use futures::StreamExt;
501//!
502//! async fn example() {
503//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
504//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
505//!
506//! // Merge streams in temporal order, then pair consecutive values
507//! let mut composed = stream1
508//!     .ordered_merge(vec![FluxionStream::new(stream2)])
509//!     .combine_with_previous();
510//!
511//! tx1.send(Sequenced::new(1)).unwrap();
512//! let item = composed.next().await.unwrap().unwrap();
513//! assert!(item.previous.is_none());
514//! assert_eq!(&item.current.value, &1);
515//!
516//! tx2.send(Sequenced::new(2)).unwrap();
517//! let item = composed.next().await.unwrap().unwrap();
518//! assert_eq!(&item.previous.unwrap().value, &1);
519//! assert_eq!(&item.current.value, &2);
520//! }
521//! ```
522//!
523//! ### 2. Combine Latest → Combine With Previous
524//!
525//! Combine latest values from multiple streams, then track state changes:
526//!
527//! ```rust
528//! use fluxion_stream::{FluxionStream, CombineLatestExt, CombineWithPreviousExt};
529//! use fluxion_test_utils::{Sequenced, test_channel};
530//! use fluxion_core::Timestamped as TimestampedTrait;
531//! use futures::StreamExt;
532//!
533//! async fn example() {
534//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
535//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
536//!
537//! // Combine latest, then track previous combined state
538//! let mut composed = stream1
539//!     .combine_latest(vec![stream2], |_| true)
540//!     .combine_with_previous();
541//!
542//! tx1.send(Sequenced::new(1)).unwrap();
543//! tx2.send(Sequenced::new(2)).unwrap();
544//!
545//! let item = composed.next().await.unwrap().unwrap();
546//! assert!(item.previous.is_none());
547//! assert_eq!(item.current.values().len(), 2);
548//!
549//! tx1.send(Sequenced::new(3)).unwrap();
550//! let item = composed.next().await.unwrap().unwrap();
551//! // Previous state had [1, 2], current has [3, 2]
552//! assert!(item.previous.is_some());
553//! }
554//! ```
555//!
556//! ### 3. Combine Latest → Take While With
557//!
558//! Combine streams and continue only while a condition holds:
559//!
560//! ```rust
561//! use fluxion_stream::{FluxionStream, CombineLatestExt, TakeWhileExt};
562//! use fluxion_test_utils::{Sequenced, test_channel};
563//! use fluxion_core::Timestamped as TimestampedTrait;
564//! use futures::StreamExt;
565//!
566//! async fn example() {
567//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
568//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
569//! let (filter_tx, filter_stream) = test_channel::<Sequenced<bool>>();
570//!
571//! // Combine latest values, but stop when filter becomes false
572//! let mut composed = stream1
573//!     .combine_latest(vec![stream2], |_| true)
574//!     .take_while_with(filter_stream, |f| *f);
575//!
576//! filter_tx.send(Sequenced::new(true)).unwrap();
577//! tx1.send(Sequenced::new(1)).unwrap();
578//! tx2.send(Sequenced::new(2)).unwrap();
579//!
580//! let combined = composed.next().await.unwrap().unwrap();
581//! assert_eq!(combined.values().len(), 2);
582//! }
583//! ```
584//!
585//! ### 4. Ordered Merge → Take While With
586//!
587//! Merge streams in order and terminate based on external condition:
588//!
589//! ```rust
590//! use fluxion_stream::{FluxionStream, OrderedStreamExt, TakeWhileExt};
591//! use fluxion_test_utils::{Sequenced, test_channel};
592//! use fluxion_core::Timestamped as TimestampedTrait;
593//! use futures::StreamExt;
594//!
595//! async fn example() {
596//! let (tx1, stream1) = test_channel::<Sequenced<i32>>();
597//! let (tx2, stream2) = test_channel::<Sequenced<i32>>();
598//! let (filter_tx, filter_stream) = test_channel::<Sequenced<bool>>();
599//!
600//! // Merge all values in order, but stop when filter says so
601//! let mut composed = stream1
602//!     .ordered_merge(vec![FluxionStream::new(stream2)])
603//!     .take_while_with(filter_stream, |f| *f);
604//!
605//! filter_tx.send(Sequenced::new(true)).unwrap();
606//! tx1.send(Sequenced::new(1)).unwrap();
607//!
608//! let item = composed.next().await.unwrap().unwrap().value.clone();
609//! assert_eq!(item, 1);
610//!
611//! tx2.send(Sequenced::new(2)).unwrap();
612//! let item = composed.next().await.unwrap().unwrap().value.clone();
613//! assert_eq!(item, 2);
614//! }
615//! ```
616//!
617//! ### 5. Take Latest When → Combine With Previous
618//!
619//! Sample latest value on trigger, then pair with previous sampled value:
620//!
621//! ```rust
622//! use fluxion_stream::{FluxionStream, TakeLatestWhenExt, CombineWithPreviousExt};
623//! use fluxion_test_utils::{Sequenced, test_channel};
624//! use fluxion_core::Timestamped as TimestampedTrait;
625//! use futures::StreamExt;
626//!
627//! async fn example() {
628//! let (source_tx, source_stream) = test_channel::<Sequenced<i32>>();
629//! let (filter_tx, filter_stream) = test_channel::<Sequenced<i32>>();
630//!
631//! // Sample source when filter emits, then track consecutive samples
632//! let sampled = source_stream.take_latest_when(filter_stream, |_| true);
633//! let mut composed = FluxionStream::new(sampled).combine_with_previous();
634//!
635//! source_tx.send(Sequenced::new(42)).unwrap();
636//! filter_tx.send(Sequenced::new(0)).unwrap();
637//!
638//! let item = composed.next().await.unwrap().unwrap();
639//! assert!(item.previous.is_none());
640//! assert_eq!(&item.current.value, &42);
641//!
642//! source_tx.send(Sequenced::new(99)).unwrap();
643//! let item = composed.next().await.unwrap().unwrap();
644//! assert_eq!(&item.previous.unwrap().value, &42);
645//! assert_eq!(&item.current.value, &99);
646//! }
647//! ```
648//!
649//! These patterns demonstrate how Fluxion operators compose to create sophisticated
650//! data flows. See the composition tests in the source repository for more examples.
651//!
652//! [`map_ordered`]: fluxion_stream::FluxionStream::map_ordered
653//! [`filter_ordered`]: fluxion_stream::FluxionStream::filter_ordered
654//!
655//! # Getting Started
656//!
657//! Add to your `Cargo.toml`:
658//!
659//! ```toml
660//! [dependencies]
661//! fluxion-stream = { path = "../fluxion-stream" }
662//! tokio = { version = "1.48", features = ["sync", "rt"] }
663//! futures = "0.3"
664//! ```
665//!
666//! See individual operator documentation for detailed examples.
667//!
668//! [`combine_latest`]: CombineLatestExt::combine_latest
669//! [`with_latest_from`]: WithLatestFromExt::with_latest_from
670//! [`ordered_merge`]: OrderedStreamExt::ordered_merge
671//! [`emit_when`]: EmitWhenExt::emit_when
672//! [`take_latest_when`]: TakeLatestWhenExt::take_latest_when
673//! [`take_while_with`]: TakeWhileExt::take_while_with
674//! [`combine_with_previous`]: CombineWithPreviousExt::combine_with_previous
675
// Crate-wide lint configuration. `clippy::multiple_crate_versions` is Clippy's lint
// for several versions of one crate in the dependency graph; `clippy::doc_markdown`
// is its lint for identifiers in documentation that are not wrapped in backticks.
// NOTE(review): the reason for allowing these is not recorded here - confirm and document it.
676#![allow(clippy::multiple_crate_versions, clippy::doc_markdown)]
// `#[macro_use]` on a module keeps any `macro_rules!` macros defined inside it in
// scope after the module ends, i.e. for the modules declared below. Presumably
// `logging` defines the crate's logging macros (verify); if so, this declaration
// has to stay ahead of the modules that use them.
677#[macro_use]
678mod logging;
// Public modules of the crate. The crate-root re-exports further down expose the
// main item of most of them.
679pub mod combine_latest;
680pub mod combine_with_previous;
681pub mod emit_when;
682pub mod fluxion_stream;
683pub mod merge_with;
684pub mod ordered_merge;
685pub mod take_latest_when;
686pub mod take_while_with;
687pub mod types;
688pub mod with_latest_from;
689
690// Re-export the operator extension traits, the `FluxionStream` wrapper, and the
// supporting types at the crate root, so callers can import them directly from
// the crate instead of naming each submodule.
691pub use combine_latest::CombineLatestExt;
692pub use combine_with_previous::CombineWithPreviousExt;
693pub use emit_when::EmitWhenExt;
694pub use fluxion_stream::FluxionStream;
695pub use merge_with::MergedStream;
696pub use ordered_merge::OrderedStreamExt;
697pub use take_latest_when::TakeLatestWhenExt;
698pub use take_while_with::TakeWhileExt;
699pub use types::{CombinedState, WithPrevious};
700pub use with_latest_from::WithLatestFromExt;