fluxion-stream-time
Runtime-agnostic time-based operators for fluxion-stream.
This crate provides specialized time-based operators (delay, debounce, throttle, sample, timeout) that work with any async runtime through the Timer trait abstraction. The InstantTimestamped<T, TM> wrapper provides timestamping with the runtime's monotonic instant type.
Runtime Support
fluxion-stream-time supports multiple async runtimes through feature flags:
- runtime-tokio (default) - Tokio runtime with TokioTimer
- runtime-smol - smol runtime with SmolTimer
- runtime-wasm - WebAssembly with WasmTimer (Node.js and browser)
- runtime-async-std - async-std runtime ⚠️ DEPRECATED (unmaintained)
- runtime-embassy - Embassy for embedded/no_std + alloc (requires manual timer implementation)
All operators are fully runtime-agnostic thanks to the Timer trait abstraction.
⚠️ Note: async-std has been discontinued (RUSTSEC-2025-0052, Aug 2024). The implementation is kept for compatibility with existing projects only. New projects should use tokio or smol runtimes instead.
Why This Crate Exists
Fluxion's core design is timestamp-agnostic: operators in fluxion-stream work with any type implementing the HasTimestamp trait, regardless of the underlying timestamp representation (u64 counters, DateTime, custom types, etc.). This flexibility is a core strength.
However, time-based operators like delay, debounce, throttle, and timeout inherently need to:
- Perform arithmetic on timestamps (e.g., "add 100ms to this timestamp")
- Sleep asynchronously for specific durations
- Support different async runtimes (Tokio, async-std, smol)
The Timer trait abstraction solves all three requirements, enabling operators that work with any runtime while maintaining zero-cost performance.
The Two Timestamp Approaches
1. Counter-Based (fluxion-test-utils)
- Type: Sequenced<T> with u64 timestamps
- Use case: Testing, simulation, reproducible scenarios
- Operators: All core operators in fluxion-stream
- Advantage: Deterministic, no time dependencies, fast
2. Real-Time Based (fluxion-stream-time)
- Type: InstantTimestamped<T, TM: Timer> with runtime's Instant type
- Use case: Production systems, real-world scheduling, monotonic time
- Operators: Time-based operators (delay, debounce, throttle, sample, timeout)
- Advantage: Monotonic time, duration arithmetic, runtime flexibility
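The difference between the two approaches can be sketched with the standard library alone. The trait shape below is an assumption made for illustration (the real HasTimestamp trait may differ), and CounterStamped and InstantStamped are hypothetical stand-ins for Sequenced<T> and InstantTimestamped<T, TM>.

```rust
use std::time::{Duration, Instant};

/// Assumed shape of a timestamp trait; the real `HasTimestamp` may differ.
trait HasTimestamp {
    type Timestamp: Ord + Copy;
    fn timestamp(&self) -> Self::Timestamp;
}

/// Hypothetical counter-based item, in the spirit of `Sequenced<T>`.
#[allow(dead_code)]
struct CounterStamped<T> {
    value: T,
    seq: u64,
}

/// Hypothetical instant-based item, in the spirit of `InstantTimestamped<T, TM>`.
#[allow(dead_code)]
struct InstantStamped<T> {
    value: T,
    at: Instant,
}

impl<T> HasTimestamp for CounterStamped<T> {
    type Timestamp = u64;
    fn timestamp(&self) -> u64 {
        self.seq
    }
}

impl<T> HasTimestamp for InstantStamped<T> {
    type Timestamp = Instant;
    fn timestamp(&self) -> Instant {
        self.at
    }
}

/// Only needs ordering, so it accepts either timestamp representation.
fn is_time_ordered<I: HasTimestamp>(items: &[I]) -> bool {
    items.windows(2).all(|w| w[0].timestamp() <= w[1].timestamp())
}

/// Needs duration arithmetic, so it is only defined for instant-based items.
fn deadline_after<T>(item: &InstantStamped<T>, delay: Duration) -> Instant {
    item.at + delay
}
```

Ordering checks work for both item types, while deadline_after has no sensible counterpart for a plain u64 counter, which is the gap the time operators fill.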
Why Not Merge These?
Keeping fluxion-stream timestamp-agnostic means:
- ✅ Minimal dependencies: Core stream operators have no runtime dependencies
- ✅ Flexible timestamp types: You can use custom timestamp representations
- ✅ Faster compile times: Time operators are optional and lightweight
- ✅ Testing independence: Counter-based timestamps for deterministic tests
- ✅ Runtime flexibility: Choose your async runtime via feature flags
Features
Core Types
- Timer trait - Runtime-agnostic timer abstraction with sleep_future() and now()
- TokioTimer - Zero-cost Tokio implementation (when runtime-tokio is enabled)
- InstantTimestamped<T, TM> - Generic wrapper with timer's Instant type
- TokioTimestamped<T> - Type alias for InstantTimestamped<T, TokioTimer>
Operators
All time operators provide two variants:
1. Convenience Methods (.delay(), .debounce(), etc.)
- Available when compiling with std runtime features (tokio, smol, async-std, wasm)
- Automatically use the default timer for your runtime
- Simplest API for most use cases
2. Explicit Timer Methods (_with_timer suffix)
- Required for no_std environments (Embassy)
- Available in all runtimes when you need custom timer control
- Explicit timer parameter gives you full control
Operator List:
- delay(duration) / delay_with_timer(duration, timer) - Delays each emission by a specified duration
- debounce(duration) / debounce_with_timer(duration, timer) - Emits values only after a quiet period
- throttle(duration) / throttle_with_timer(duration, timer) - Emits a value and then ignores subsequent values for a duration
- sample(duration) / sample_with_timer(duration, timer) - Emits the most recent value within periodic time intervals
- timeout(duration) / timeout_with_timer(duration, timer) - Errors if no emission within duration
Quick Reference Table
| Operator | Purpose | Behavior | Use Case |
|---|---|---|---|
| delay | Time-shift emissions | Delays each item by duration, errors pass through | Artificial delays, scheduling |
| debounce | Trailing debounce | Emits after quiet period, resets on new value | Search input, button debouncing |
| throttle | Leading throttle | Emits first, ignores subsequent for duration | Rate limiting, scroll/resize handlers |
| sample | Periodic sampling | Emits latest value at intervals | Downsampling high-frequency streams |
| timeout | Watchdog timer | Errors if no emission within duration | Network reliability, health checks |
Operator Details
delay
Delays each emission by a specified duration
use *;
// Convenience method (automatically uses default timer for your runtime)
let delayed = stream.delay(duration);
// Or use explicit timer when you need custom control
let timer = TokioTimer;
let delayed = stream.delay_with_timer(duration, timer);
- Each item delayed independently
- Errors pass through immediately (no delay)
- Preserves temporal ordering
- Use when: Adding artificial delays, scheduling emissions
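The bullet points above can be modelled offline with a pure function over (arrival_ms, item) pairs. This is only a sketch of the described semantics, not the crate's implementation: delay_model is a hypothetical name, time is a plain u64 of milliseconds, and the output order is an assumption derived from "errors pass through immediately".

```rust
/// Offline model of `delay`: values are shifted by `delay_ms`,
/// errors keep their arrival time. Input must be sorted by arrival time.
fn delay_model<T: Clone, E: Clone>(
    events: &[(u64, Result<T, E>)],
    delay_ms: u64,
) -> Vec<(u64, Result<T, E>)> {
    let mut out: Vec<(u64, Result<T, E>)> = events
        .iter()
        .map(|(t, item)| {
            // Each value is delayed independently; errors are not delayed.
            let emit_at = if item.is_ok() { t + delay_ms } else { *t };
            (emit_at, item.clone())
        })
        .collect();
    // Stable sort: items with the same emission time keep their arrival order.
    out.sort_by_key(|(emit_at, _)| *emit_at);
    out
}
```

With a 10 ms delay, a value arriving at 0 ms is emitted at 10 ms, while an error arriving at 5 ms is emitted at 5 ms and therefore overtakes it in this model.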
debounce
Emits only after a period of inactivity (trailing)
use *;
// Convenience method (automatically uses default timer for your runtime)
let debounced = stream.debounce(duration);
// Or use explicit timer when you need custom control
let timer = TokioTimer;
let debounced = stream.debounce_with_timer(duration, timer);
- Emits latest value after quiet period
- Timer resets on each new value
- Pending value emitted when stream ends
- Errors pass through immediately
- Use when: Search-as-you-type, button debouncing, rate limiting user actions
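Trailing debounce can be illustrated with a small offline model over (arrival_ms, value) pairs. The function name debounce_model and the boundary rule (a gap equal to the quiet period counts as quiet) are assumptions made for this sketch; the real operator works on async streams and a Timer.

```rust
/// Offline model of trailing debounce. Input must be sorted by arrival time.
/// A value is emitted only if no newer value arrives within `quiet_ms`.
fn debounce_model<T: Clone>(events: &[(u64, T)], quiet_ms: u64) -> Vec<T> {
    let mut out = Vec::new();
    for (i, (t, value)) in events.iter().enumerate() {
        match events.get(i + 1) {
            // A newer value arrived before the quiet period elapsed:
            // the timer resets and this value is replaced.
            Some((next_t, _)) if *next_t < t + quiet_ms => {}
            // The quiet period elapsed, or the stream ended with this
            // value still pending.
            _ => out.push(value.clone()),
        }
    }
    out
}
```

For arrivals at 0, 50, 100 and 400 ms with a 100 ms quiet period, only the values from 100 ms and 400 ms survive: the first two are superseded, and the last one is flushed when the input ends.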
throttle
Rate-limits emissions (leading)
use *;
// Convenience method (automatically uses default timer for your runtime)
let throttled = stream.throttle(duration);
// Or use explicit timer when you need custom control
let timer = TokioTimer;
let throttled = stream.throttle_with_timer(duration, timer);
- Emits first value immediately
- Ignores subsequent values for duration
- Then accepts next value and repeats
- Errors pass through immediately
- Use when: Scroll/resize handlers, API rate limiting, UI event throttling
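The leading-throttle rule can be sketched the same way. throttle_model is a hypothetical helper over (arrival_ms, value) pairs, and treating a value that arrives exactly when the window ends as accepted is an assumption of this sketch.

```rust
/// Offline model of leading throttle. Input must be sorted by arrival time.
fn throttle_model<T: Clone>(events: &[(u64, T)], window_ms: u64) -> Vec<T> {
    let mut out = Vec::new();
    // Time at which the next value is accepted; `None` until the first emission.
    let mut open_at: Option<u64> = None;
    for (t, value) in events {
        if open_at.map_or(true, |open| *t >= open) {
            // Emit immediately, then ignore everything until the window closes.
            out.push(value.clone());
            open_at = Some(*t + window_ms);
        }
    }
    out
}
```

In contrast to debounce, the first value of a burst wins here and the rest of the burst is dropped.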
sample
Samples stream at periodic intervals
use *;
// Convenience method (automatically uses default timer for your runtime)
let sampled = stream.sample(duration);
// Or use explicit timer when you need custom control
let timer = TokioTimer;
let sampled = stream.sample_with_timer(duration, timer);
- Emits most recent value within each interval
- No emission if no value in interval
- Errors pass through immediately
- Use when: Downsampling sensors, periodic snapshots, metrics aggregation
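Periodic sampling can be modelled as bucketing arrivals into fixed intervals and keeping the last value of each bucket. sample_model is a hypothetical name, and aligning intervals to t = 0 with emission at the end of each interval is an assumption of this sketch rather than documented behaviour.

```rust
use std::collections::BTreeMap;

/// Offline model of periodic sampling. Input must be sorted by arrival time.
/// Returns `(emit_ms, value)` pairs, one per non-empty interval.
fn sample_model<T: Clone>(events: &[(u64, T)], period_ms: u64) -> Vec<(u64, T)> {
    assert!(period_ms > 0, "sampling period must be non-zero");
    let mut latest: BTreeMap<u64, T> = BTreeMap::new();
    for (t, value) in events {
        // Interval k covers arrivals in [k * period, (k + 1) * period);
        // later arrivals in the same interval overwrite earlier ones.
        latest.insert(t / period_ms, value.clone());
    }
    // Emit at the end of each interval; empty intervals produce nothing.
    latest
        .into_iter()
        .map(|(k, value)| ((k + 1) * period_ms, value))
        .collect()
}
```

Intervals without any arrival simply do not appear in the output, matching the "no emission if no value in interval" rule above.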
timeout
Errors if no emission within duration
use *;
// Convenience method (automatically uses default timer for your runtime)
let with_timeout = stream.timeout(duration);
// Or use explicit timer when you need custom control
let timer = TokioTimer;
let with_timeout = stream.timeout_with_timer(duration, timer);
- Monitors time between emissions
- Emits FluxionError::TimeoutError("Timeout") if exceeded
- Timer resets on each emission (value or error)
- Stream terminates on timeout
- Use when: Watchdog timers, network reliability, health checks
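The watchdog behaviour can be sketched as an offline model over (arrival_ms, value) pairs. Item and timeout_model are hypothetical names (the real operator yields a FluxionError), and starting the watchdog at t = 0 is an assumption. The model does not cover silence after the last arrival, because a finite input has no notion of "still waiting".

```rust
/// Hypothetical output item: either a forwarded value or the timeout marker.
#[derive(Debug, PartialEq)]
enum Item<T> {
    Value(T),
    Timeout,
}

/// Offline model of `timeout`. Input must be sorted by arrival time.
fn timeout_model<T: Clone>(events: &[(u64, T)], limit_ms: u64) -> Vec<Item<T>> {
    let mut out = Vec::new();
    // The watchdog is assumed to start when the stream starts (t = 0).
    let mut last_emission = 0u64;
    for (t, value) in events {
        if t.saturating_sub(last_emission) > limit_ms {
            // Too much time passed since the previous emission:
            // report the timeout and terminate the stream.
            out.push(Item::Timeout);
            return out;
        }
        out.push(Item::Value(value.clone()));
        // Every emission resets the watchdog.
        last_emission = *t;
    }
    out
}
```

Values that would have arrived after the timeout are never seen, since the stream terminates at the first gap that exceeds the limit.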
Quick Start Example
use *;
use ;
use Timer;
use StreamItem;
use StreamExt;
use Duration;
use mpsc;
use UnboundedReceiverStream;
async
Seamless Operator Chaining
Key insight: Operators from both crates chain seamlessly because they all work with streams where S: Stream<Item = StreamItem<T>> and T: HasTimestamp.
The only requirement is that your stream items implement HasTimestamp with a compatible timestamp type:
- Core operators (map_ordered, filter_ordered, combine_latest, etc.) work with any timestamp type
- Time operators (delay, debounce, etc.) work with InstantTimestamped<T, TM: Timer> types
Example: Mixing Operators
use ;
use *; // Gets convenience methods
use ;
use Duration;
// Start with time-based stream - convenience methods!
let stream = source_stream
// Time operator (no timer parameter needed!)
.debounce
// Core operators work seamlessly
.filter_ordered
.map_ordered
// Back to time operator
.delay
// More core operators
.distinct_until_changed;
This works because:
- debounce returns impl Stream<Item = StreamItem<TokioTimestamped<T>>>
- Core operators like filter_ordered accept any stream where items implement HasTimestamp
- The timestamp type (Timer's Instant) is preserved through the chain
- delay can then use it again because the type is still InstantTimestamped<T, TM>
When to Use Each Crate
Use fluxion-stream (core operators) when:
- You need ordering, combining, filtering, transformation
- You want timestamp-agnostic code
- You're testing with counter-based timestamps (Sequenced<T>)
Use fluxion-stream-time (time operators) when:
- You need real-world time-based behavior (delay, debounce, etc.)
- You're working with production event streams requiring monotonic time
- You need duration arithmetic without system clock dependencies
Use both together when:
- You need time-based rate limiting plus complex stream transformations
- You want to debounce user input, then combine it with other streams
- You're building real-world reactive systems with temporal constraints
Usage with Different Runtimes
Tokio (default)
use *; // Convenience methods
use ;
use Timer;
use Duration;
let timer = TokioTimer;
// Delay all emissions by 100ms (convenience method)
let delayed_stream = source_stream
.delay;
// Debounce emissions (convenience method)
let debounced_stream = source_stream
.debounce;
// For explicit timer control, use *_with_timer methods:
let delayed_custom = source_stream
.delay_with_timer;
// Create timestamped values
let timestamped = new;
async-std ⚠️ DEPRECATED
⚠️ WARNING: async-std has been discontinued and is unmaintained (RUSTSEC-2025-0052). This implementation is kept for compatibility only. New projects should use tokio or smol.
use *; // Convenience methods
use AsyncStdTimer;
use ;
use Timer;
use Duration;
let timer = AsyncStdTimer;
// Delay all emissions by 100ms (convenience method)
let delayed_stream = source_stream
.delay;
// Debounce emissions (convenience method)
let debounced_stream = source_stream
.debounce;
// For explicit timer control, use *_with_timer methods:
let delayed_custom = source_stream
.delay_with_timer;
// Create timestamped values
let timestamped = new;
async-std Notes:
- Uses async-io::Timer for async sleep operations
- Supports both single-threaded and multi-threaded execution
- Multi-threaded tests use async_std::task::spawn for true concurrency
- Tests run with cargo test --features runtime-async-std --no-default-features
smol
use *; // Convenience methods
use SmolTimer;
use ;
use Timer;
use Duration;
let timer = SmolTimer;
// Delay all emissions by 100ms (convenience method)
let delayed_stream = source_stream
.delay;
// Debounce emissions (convenience method)
let debounced_stream = source_stream
.debounce;
// For explicit timer control, use *_with_timer methods:
let delayed_custom = source_stream
.delay_with_timer;
// Create timestamped values
let timestamped = new;
smol Notes:
- Uses async-io for timer implementation (shared with async-std)
- Custom SmolTimer based on async_io::Timer for sleep operations
- Standard std::time::Instant for monotonic time tracking
- Tests run with cargo test --features runtime-smol --no-default-features
- 10 comprehensive tests validate all time-based operators in single & multi-threaded modes
- Supports both smol::block_on() (single-threaded) and smol::Executor (multi-threaded)
WASM (WebAssembly)
use *; // Convenience methods
use WasmTimer;
use ;
use Timer;
use Duration;
let timer = new;
// Delay all emissions by 100ms (convenience method)
let delayed_stream = source_stream
.delay;
// Debounce emissions (convenience method)
let debounced_stream = source_stream
.debounce;
// For explicit timer control, use *_with_timer methods:
let delayed_custom = source_stream
.delay_with_timer;
// Create timestamped values
let timestamped = new;
WASM Notes:
- Uses gloo-timers for async sleep (compatible with Node.js and browsers)
- Custom WasmInstant based on js-sys::Date.now() for monotonic time
- Tests run with wasm-pack test --node or --headless --chrome
- 5 comprehensive tests validate all time-based operators in WASM environments
Embassy (embedded/no_std + alloc)
⚠️ Note: Embassy runtime support is in compilation-only mode. Tests verify that time operators compile correctly with Embassy, but full runtime testing requires hardware or emulator setup.
Embassy support enables time-based operators in no_std + alloc embedded environments:
extern crate alloc;
use *;
use EmbassyTimer;
use Timer;
use Duration;
let timer = EmbassyTimer;
// Time operators work the same way in embedded environments
// But you must use *_with_timer methods (no default timer in no_std)
let delayed_stream = source_stream
.delay_with_timer;
let debounced_stream = source_stream
.debounce_with_timer;
let throttled_stream = source_stream
.throttle_with_timer;
Embassy Notes:
- Requires no_std + alloc environment
- Uses embassy_time::{Timer, Duration, Instant} for monotonic time
- Must use *_with_timer methods (convenience methods unavailable in no_std)
- Compilation tests in tests/embassy/ verify all 5 time operators
- Full runtime testing requires hardware/emulator (see embassy tests README)
Why Compilation-Only Tests?
- Embassy requires actual hardware or emulator for async execution
- GitHub Actions CI doesn't provide embedded hardware
- Compilation tests ensure operators work correctly with Embassy types
- Full integration tests can be run on target hardware
Supported Operators:
- ✅ delay_with_timer - Delays emissions
- ✅ debounce_with_timer - Trailing debounce
- ✅ throttle_with_timer - Leading throttle
- ✅ sample_with_timer - Periodic sampling
- ✅ timeout_with_timer - Watchdog timer
async-std Support ⚠️ DEPRECATED
⚠️ CRITICAL: async-std is no longer maintained (discontinued Aug 2024, RUSTSEC-2025-0052).
This implementation is provided for compatibility with existing projects only. For new projects, use tokio (default) or consider smol as an alternative.
async-std support is fully implemented via AsyncStdTimer using async-io for time-based operations. The Timer trait abstraction enabled this with zero operator changes.
Implementation Details:
- AsyncStdTimer - Zero-cost async-std implementation using async_io::Timer
- AsyncStdSleep - Future wrapper for async_io::Timer::after(duration)
- std::time::Instant - Standard monotonic instant type
- Arithmetic support - Standard Duration operations through std::time::Instant
- Runtime compatibility - Works with async-std's multi-threaded executor
Testing:
- Integration tests in tests/async_std/single_threaded/ and tests/async_std/multi_threaded/
- Tests use real delays with async_std::task::sleep and external spawning
- Run with cargo test --features runtime-async-std --no-default-features
- See .ci/async_std_tests.ps1 for CI testing configuration
Platform Support:
- ✅ Single-threaded execution (inline async)
- ✅ Multi-threaded execution (async_std::task::spawn)
- ✅ GitHub Actions CI integration
- ✅ 10 comprehensive tests (5 operators × 2 threading models)
Deprecation Timeline:
- Aug 2024: async-std discontinued by maintainers
- Dec 2024: Implementation added to Fluxion for compatibility
- Status: Maintained for existing users, not recommended for new projects
- Future: May be removed in v1.0 if ecosystem adoption drops to near-zero
Timer Trait Implementation
To add support for a custom runtime, implement the Timer trait:
use Timer;
use ;
;
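One way such an implementation could look is sketched below with the standard library only. The method names sleep_future() and now() come from this README; the associated types, their bounds, and every Manual* name are assumptions made for illustration, so the real trait definition should be checked before copying this. The timer here is driven by a manually advanced counter instead of a runtime, which also makes it easy to test.

```rust
use std::cell::Cell;
use std::future::Future;
use std::ops::Add;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::time::Duration;

/// Assumed trait shape; only the method names are taken from the README.
trait Timer {
    type Instant: Copy + Ord + Add<Duration, Output = Self::Instant>;
    type Sleep: Future<Output = ()>;
    fn sleep_future(&self, duration: Duration) -> Self::Sleep;
    fn now(&self) -> Self::Instant;
}

/// Hypothetical instant: milliseconds on a manually advanced clock.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct ManualInstant(u64);

impl Add<Duration> for ManualInstant {
    type Output = ManualInstant;
    fn add(self, rhs: Duration) -> ManualInstant {
        ManualInstant(self.0 + rhs.as_millis() as u64)
    }
}

/// Hypothetical timer backed by a shared counter instead of a runtime.
#[derive(Clone, Default)]
struct ManualTimer {
    clock_ms: Rc<Cell<u64>>,
}

impl ManualTimer {
    /// Moves the clock forward; pending sleeps observe this on their next poll.
    fn advance(&self, by: Duration) {
        self.clock_ms.set(self.clock_ms.get() + by.as_millis() as u64);
    }
}

/// Future that completes once the shared clock reaches the deadline.
struct ManualSleep {
    clock_ms: Rc<Cell<u64>>,
    deadline: ManualInstant,
}

impl Future for ManualSleep {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        // A runtime-backed timer would register the waker here;
        // this sketch is re-polled by hand instead.
        if self.clock_ms.get() >= self.deadline.0 {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

impl Timer for ManualTimer {
    type Instant = ManualInstant;
    type Sleep = ManualSleep;
    fn sleep_future(&self, duration: Duration) -> ManualSleep {
        ManualSleep {
            clock_ms: Rc::clone(&self.clock_ms),
            deadline: self.now() + duration,
        }
    }
    fn now(&self) -> ManualInstant {
        ManualInstant(self.clock_ms.get())
    }
}

/// Waker that does nothing, which is enough to poll a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a sleep future once and reports whether it has completed.
fn is_elapsed<S: Future<Output = ()> + Unpin>(sleep: &mut S) -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(sleep).poll(&mut cx).is_ready()
}
```

A sleep of 100 ms created at clock 0 stays pending until the clock has been advanced by a total of 100 ms. A production implementation would wrap the runtime's own sleep future and instant type instead of a counter.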
InstantTimestamped vs Sequenced
| Feature | Sequenced<T> (test-utils) | InstantTimestamped<T, TM> (stream-time) |
|---|---|---|
| Timestamp Type | u64 (counter) | TM::Instant (runtime's instant) |
| Crate | fluxion-test-utils | fluxion-stream-time |
| Use Case | Testing, simulation | Production, real time |
| Time Operators | ❌ No | ✅ Yes |
| Core Operators | ✅ Yes | ✅ Yes |
| Deterministic | ✅ Yes | ❌ No (monotonic) |
| Duration Math | ❌ No | ✅ Yes |
| Runtime Support | N/A | Tokio, smol, WASM, async-std (deprecated) |
⚠️ Note: async-std support is deprecated due to discontinuation (RUSTSEC-2025-0052).
Future Platform Support
no_std Feasibility
The Timer trait abstraction makes no_std support architecturally feasible. The trait design deliberately avoids std-specific dependencies, enabling potential embedded system support.
What's Already Compatible
- ✅ Timer trait - Uses only core::future::Future and core::time::Duration
- ✅ Generic operators - Work with any Timer implementation
- ✅ Pin projection - pin-project supports no_std
- ✅ Type-safe instant arithmetic - Generic over Timer::Instant
Challenges for no_std
- std::time::Instant unavailable
  - Embedded systems need platform-specific tick counters
  - Solution: Custom Instant type based on hardware timers
- alloc requirement
  - Operators use Box::pin() for stream composition
  - Requires heap allocation (minimal: one box per operator)
  - Some embedded environments may not have allocators
- Async runtime dependency
  - Tokio requires std
  - Potential alternatives: Embassy (async for embedded), RTIC (real-time)
  - Would require separate Timer implementations per runtime
Implementation Paths
Embassy Support (recommended for async embedded):
// Future: EmbassyTimer implementation
use ;
Bare Metal (custom hardware timers):
- Custom tick-based Instant type
- Manual future polling against hardware timer registers
- More complex but possible with current architecture
Trade-offs
The Timer abstraction enables no_std support without forcing it:
- std users get ergonomic APIs (Tokio, async-std)
- Embedded users can implement custom timers when needed
- No compromise in type safety or performance for either
Status: Architecturally sound, implementation work required. The generic design means no_std support won't break existing std code.
WASM Support
WASM support is fully implemented via WasmTimer using gloo-timers and js-sys. The Timer trait abstraction enabled this with zero operator changes.
Implementation Details:
- WasmTimer - Zero-cost WASM implementation using gloo_timers::future::sleep
- WasmInstant - Custom instant type based on js-sys::Date.now() (returns milliseconds since epoch)
- Arithmetic support - Implements Add<Duration>, Sub<Duration>, and Sub<Self> for duration calculations
- Runtime compatibility - Works in both Node.js and browser environments
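The arithmetic support listed above could look roughly like the following. MillisInstant is a hypothetical stand-in, not the real WasmInstant: it stores whole milliseconds in a u64 and saturates at zero on subtraction, both of which are assumptions chosen to keep the sketch free of browser dependencies.

```rust
use std::ops::{Add, Sub};
use std::time::Duration;

/// Hypothetical instant stored as whole milliseconds since some epoch.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct MillisInstant(u64);

impl Add<Duration> for MillisInstant {
    type Output = MillisInstant;
    /// Computes a deadline: "this instant plus 100 ms".
    fn add(self, rhs: Duration) -> MillisInstant {
        MillisInstant(self.0 + rhs.as_millis() as u64)
    }
}

impl Sub<Duration> for MillisInstant {
    type Output = MillisInstant;
    /// Steps back in time, clamping at the epoch instead of underflowing.
    fn sub(self, rhs: Duration) -> MillisInstant {
        MillisInstant(self.0.saturating_sub(rhs.as_millis() as u64))
    }
}

impl Sub<MillisInstant> for MillisInstant {
    type Output = Duration;
    /// Elapsed time between two instants; zero if `rhs` is later.
    fn sub(self, rhs: MillisInstant) -> Duration {
        Duration::from_millis(self.0.saturating_sub(rhs.0))
    }
}
```

These three operations are what a time operator needs: computing a deadline, stepping back by a duration, and measuring the time elapsed between two instants.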
Testing:
- Integration tests in tests/wasm/single_threaded/
- Tests use real delays with gloo_timers::future::sleep (no time control like Tokio)
- Run with wasm-pack test --node --features runtime-wasm
Platform Support:
- ✅ Node.js (v14+)
- ✅ Modern browsers (Chrome, Firefox, Safari, Edge)
- ✅ GitHub Actions CI with wasm-pack
- ✅ Single-threaded execution model
Requirements
- Rust 1.70+
- Choose runtime via feature flags (runtime-tokio enabled by default)
- No external time dependencies beyond the chosen async runtime
License
Licensed under the Apache License, Version 2.0. See LICENSE for details.