fluxion_stream/map_ordered/
mod.rs

1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5//! Map ordered operator - transforms stream items while preserving temporal ordering.
6//!
7//! The [`map_ordered`](MapOrderedExt::map_ordered) operator applies a transformation function
8//! to each item in the stream while maintaining the original temporal ordering.
9//!
10//! ## Characteristics
11//!
12//! - **Chainable**: Returns a stream that can be further chained
13//! - **Timestamp-preserving**: Original timestamps are maintained
14//! - **Lazy**: Transformations are applied only when items are polled
15//! - **Type-flexible**: Can transform between different types
16//! - **Error-passthrough**: Errors pass through unchanged
17//!
18//! ## Example
19//!
20//! ```rust
21//! use fluxion_stream::{IntoFluxionStream, MapOrderedExt};
22//! use fluxion_test_utils::Sequenced;
23//! use futures::StreamExt;
24//!
25//! # async fn example() {
26//! let (tx, rx) = async_channel::unbounded();
27//!
28//! // Transform integers to their doubles
29//! let mut stream = rx.into_fluxion_stream()
30//!     .map_ordered(|n: Sequenced<i32>| Sequenced::new(n.into_inner() * 2));
31//!
32//! tx.try_send(Sequenced::new(1)).unwrap();
33//! tx.try_send(Sequenced::new(2)).unwrap();
34//! tx.try_send(Sequenced::new(3)).unwrap();
35//! drop(tx);
36//!
37//! assert_eq!(stream.next().await.unwrap().unwrap().into_inner(), 2);
38//! assert_eq!(stream.next().await.unwrap().unwrap().into_inner(), 4);
39//! assert_eq!(stream.next().await.unwrap().unwrap().into_inner(), 6);
40//! # }
41//! ```
42//!
43//! ## Use Cases
44//!
45//! - **Data transformation**: Convert between types or modify values
46//! - **Normalization**: Scale or adjust values consistently
47//! - **Extraction**: Pull specific fields from complex types
48//! - **Enrichment**: Add computed properties to data
49
50#[macro_use]
51mod implementation;
52
53#[cfg(any(
54    all(feature = "runtime-tokio", not(target_arch = "wasm32")),
55    feature = "runtime-smol",
56    feature = "runtime-async-std"
57))]
58mod multi_threaded;
59
60#[cfg(any(
61    all(feature = "runtime-tokio", not(target_arch = "wasm32")),
62    feature = "runtime-smol",
63    feature = "runtime-async-std"
64))]
65pub use multi_threaded::MapOrderedExt;
66
67#[cfg(not(any(
68    all(feature = "runtime-tokio", not(target_arch = "wasm32")),
69    feature = "runtime-smol",
70    feature = "runtime-async-std"
71)))]
72mod single_threaded;
73
74#[cfg(not(any(
75    all(feature = "runtime-tokio", not(target_arch = "wasm32")),
76    feature = "runtime-smol",
77    feature = "runtime-async-std"
78)))]
79pub use single_threaded::MapOrderedExt;