fluxion-stream-time
Time-based operators for fluxion-stream using real-world time.
This crate provides specialized time-based operators (delay, debounce, throttle, sample, timeout) and the ChronoTimestamped<T> wrapper for working with real-world timestamps using chrono::DateTime<Utc>.
Why This Crate Exists
Fluxion's core design is timestamp-agnostic: operators in fluxion-stream work with any type implementing the HasTimestamp trait, regardless of the underlying timestamp representation (u64 counters, DateTime, custom types, etc.). This flexibility is a core strength.
However, time-based operators like delay, debounce, throttle, and timeout inherently need to perform arithmetic on timestamps (e.g., "add 100ms to this timestamp"). This requires a specific timestamp type that supports duration arithmetic.
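To make the distinction concrete, here is a minimal sketch using only the standard library. `SystemTime` stands in for `chrono::DateTime<Utc>` (an assumption made to keep the example dependency-free; this crate itself uses chrono), and both helper functions are hypothetical names, not part of the fluxion API.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical helper: the moment at which an item stamped `ts` should be
/// re-emitted after `delay`. This works because `SystemTime + Duration` is
/// defined by the standard library.
fn deadline_after(ts: SystemTime, delay: Duration) -> SystemTime {
    ts + delay
}

/// A bare counter carries no unit, so "add 100ms" has no meaning for it.
/// The only arithmetic that makes sense is "next sequence number".
fn next_sequence(counter: u64) -> u64 {
    counter + 1
}
```

A counter can order events, but only a timestamp type with duration arithmetic can answer "when is 100ms after this event?".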
The Two Timestamp Approaches
1. Counter-Based (fluxion-test-utils)
- Type: Sequenced<T> with u64 timestamps
- Use case: Testing, simulation, reproducible scenarios
- Operators: All core operators in fluxion-stream
- Advantage: Deterministic, no time dependencies, fast
2. Real-Time Based (fluxion-stream-time)
- Type: ChronoTimestamped<T> with DateTime<Utc> timestamps
- Use case: Production systems, real-world scheduling, wall-clock time
- Operators: Time-based operators (delay, debounce, throttle, sample, timeout)
- Advantage: Real calendar time, duration arithmetic, timezone-aware
Why Not Merge These?
Keeping fluxion-stream timestamp-agnostic means:
- ✅ Zero dependencies on chrono for users who don't need time-based operators
- ✅ Flexible timestamp types: You can use custom timestamp representations
- ✅ Faster compile times: chrono is only pulled in when you need time operators
- ✅ Testing independence: Counter-based timestamps for deterministic tests
Features
- ChronoTimestamped<T> - Wrapper type with DateTime<Utc> timestamps
- delay(duration) - Delays each emission by a specified duration
- debounce(duration) - Emits values only after a quiet period
- throttle(duration) - Emits a value and then ignores subsequent values for a duration
- sample(duration) - Emits the most recent value within periodic time intervals
- timeout(duration) - Errors if no emission within duration
- ChronoStreamOps - Extension trait for FluxionStream with chrono-timestamped items
Quick Reference Table
| Operator | Purpose | Behavior | Use Case |
|---|---|---|---|
| delay | Time-shift emissions | Delays each item by duration, errors pass through | Artificial delays, scheduling |
| debounce | Trailing debounce | Emits after quiet period, resets on new value | Search input, button debouncing |
| throttle | Leading throttle | Emits first, ignores subsequent for duration | Rate limiting, scroll/resize handlers |
| sample | Periodic sampling | Emits latest value at intervals | Downsampling high-frequency streams |
| timeout | Watchdog timer | Errors if no emission within duration | Network reliability, health checks |
Operator Details
delay
Delays each emission by a specified duration
let delayed = stream.delay(duration);
- Each item delayed independently
- Errors pass through immediately (no delay)
- Preserves temporal ordering
- Use when: Adding artificial delays, scheduling emissions
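The rules above can be sketched as a small, dependency-free simulation over timestamped events. This is an illustration of the described behavior, not the crate's implementation: `Item` and `simulate_delay` are hypothetical names, and times are plain milliseconds rather than DateTime<Utc>.

```rust
/// One stream item in the simulation: a value or an error.
#[derive(Debug, Clone, PartialEq)]
enum Item {
    Value(i32),
    Error(String),
}

/// Takes `(arrival_ms, item)` pairs and returns `(emit_ms, item)` pairs.
/// Values are shifted by `delay_ms`; errors keep their arrival time.
fn simulate_delay(events: &[(u64, Item)], delay_ms: u64) -> Vec<(u64, Item)> {
    events
        .iter()
        .map(|(t, item)| match item {
            // Each value is delayed independently by the same amount,
            // so values keep their relative order.
            Item::Value(_) => (t + delay_ms, item.clone()),
            // Errors pass through immediately.
            Item::Error(_) => (*t, item.clone()),
        })
        .collect()
}
```

With a 100ms delay, a value arriving at 0ms is emitted at 100ms, while an error arriving at 10ms is still emitted at 10ms.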
debounce
Emits only after a period of inactivity (trailing)
let debounced = stream.debounce(duration);
- Emits latest value after quiet period
- Timer resets on each new value
- Pending value emitted when stream ends
- Errors pass through immediately
- Use when: Search-as-you-type, button debouncing, rate limiting user actions
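A minimal simulation of the trailing-debounce rules above, using plain millisecond timestamps. `simulate_debounce` is a hypothetical helper, not part of this crate. Two assumptions are baked in: `end_ms` (the time the source ends) is at or after the last arrival, and a value arriving exactly when the timer fires does not cancel the pending emission.

```rust
/// Takes `(arrival_ms, value)` pairs sorted by time and returns
/// `(emit_ms, value)` pairs for a trailing debounce of `quiet_ms`.
fn simulate_debounce(events: &[(u64, i32)], quiet_ms: u64, end_ms: u64) -> Vec<(u64, i32)> {
    let mut out = Vec::new();
    for (i, &(t, v)) in events.iter().enumerate() {
        let fires_at = t + quiet_ms;
        match events.get(i + 1) {
            // A newer value arrives before the timer fires: it resets the
            // timer and replaces this value, so nothing is emitted here.
            Some(&(next_t, _)) if next_t < fires_at => {}
            // The quiet period elapsed first: emit when the timer fires.
            Some(_) => out.push((fires_at, v)),
            // Last value: emit when the timer fires or when the stream
            // ends, whichever comes first.
            None => out.push((fires_at.min(end_ms), v)),
        }
    }
    out
}
```

For arrivals at 0, 50, 80 and 300ms with a 100ms quiet period and a stream that ends at 320ms, only the value from 80ms (emitted at 180ms) and the pending value from 300ms (flushed at 320ms) come out.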
throttle
Rate-limits emissions (leading)
let throttled = stream.throttle(duration);
- Emits first value immediately
- Ignores subsequent values for duration
- Then accepts next value and repeats
- Errors pass through immediately
- Use when: Scroll/resize handlers, API rate limiting, UI event throttling
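The leading-throttle rules above, sketched as a simulation over millisecond timestamps. `simulate_throttle` is a hypothetical helper rather than the crate's implementation, and it assumes a value arriving exactly when the window closes is accepted.

```rust
/// Takes `(arrival_ms, value)` pairs sorted by time and returns the pairs
/// that survive a leading throttle with a window of `window_ms`.
fn simulate_throttle(events: &[(u64, i32)], window_ms: u64) -> Vec<(u64, i32)> {
    let mut out = Vec::new();
    // Earliest time at which the next value will be accepted.
    let mut open_at = 0u64;
    for &(t, v) in events {
        if t >= open_at {
            // Accepted: emit immediately and start a new ignore window.
            out.push((t, v));
            open_at = t + window_ms;
        }
        // Otherwise the value falls inside the window and is dropped.
    }
    out
}
```

Unlike debounce, the first value of each burst is the one that gets through, and it is emitted without any added latency.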
sample
Samples stream at periodic intervals
let sampled = stream.sample(duration);
- Emits most recent value within each interval
- No emission if no value in interval
- Errors pass through immediately
- Use when: Downsampling sensors, periodic snapshots, metrics aggregation
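A sketch of the sampling rules above as a simulation. `simulate_sample` is a hypothetical helper, not the crate's implementation. It assumes ticks fire at `period_ms`, `2 * period_ms`, and so on up to `end_ms`, and that a value arriving exactly on a tick belongs to the next interval.

```rust
/// Takes `(arrival_ms, value)` pairs sorted by time and returns
/// `(tick_ms, value)` pairs: the latest value seen in each interval.
fn simulate_sample(events: &[(u64, i32)], period_ms: u64, end_ms: u64) -> Vec<(u64, i32)> {
    assert!(period_ms > 0, "period must be non-zero");
    let mut out = Vec::new();
    let mut idx = 0;
    let mut tick = period_ms;
    while tick <= end_ms {
        // Consume every value that arrived before this tick, keeping the last.
        let mut latest = None;
        while idx < events.len() && events[idx].0 < tick {
            latest = Some(events[idx].1);
            idx += 1;
        }
        // Intervals with no value produce no emission.
        if let Some(v) = latest {
            out.push((tick, v));
        }
        tick += period_ms;
    }
    out
}
```

The output rate is bounded by the tick rate no matter how fast the source produces values, which is what makes this useful for downsampling.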
timeout
Errors if no emission within duration
let with_timeout = stream.timeout(duration);
- Monitors time between emissions
- Emits FluxionError::StreamProcessingError("Timeout") if exceeded
- Timer resets on each emission (value or error)
- Stream terminates on timeout
- Use when: Watchdog timers, network reliability, health checks
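The watchdog behavior above can be sketched as a simulation. `Out` and `simulate_timeout` are hypothetical names, and `Out::Timeout` stands in for the FluxionError::StreamProcessingError("Timeout") item the real operator emits. The sketch assumes the timer starts at time 0 and that the source ending before the deadline ends the stream normally.

```rust
/// Output of the simulated timeout operator.
#[derive(Debug, PartialEq)]
enum Out {
    /// A value forwarded unchanged at its arrival time.
    Value(u64, i32),
    /// The watchdog fired at this time; nothing follows it.
    Timeout(u64),
}

/// Takes `(arrival_ms, value)` pairs sorted by time and applies a timeout
/// of `limit_ms` between consecutive emissions.
fn simulate_timeout(events: &[(u64, i32)], limit_ms: u64) -> Vec<Out> {
    let mut out = Vec::new();
    // The watchdog is armed at time 0 and re-armed by every emission.
    let mut deadline = limit_ms;
    for &(t, v) in events {
        if t > deadline {
            // The gap exceeded the limit: report the timeout and terminate.
            out.push(Out::Timeout(deadline));
            return out;
        }
        out.push(Out::Value(t, v));
        deadline = t + limit_ms;
    }
    out
}
```

With arrivals at 10, 60 and 250ms and a 100ms limit, the first two values pass through, the timeout fires at 160ms, and the value at 250ms is never seen.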
Seamless Operator Chaining
Key insight: Operators from both crates chain seamlessly because they all work with streams where S: Stream<Item = StreamItem<T>> and T: HasTimestamp.
The only requirement is that your stream items implement HasTimestamp with a compatible timestamp type:
- Core operators (map_ordered, filter_ordered, combine_latest, etc.) work with any timestamp type
- Time operators (delay, debounce, etc.) only work with DateTime<Utc> timestamps
Example: Mixing Operators
use ;
use ;
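use std::time::Duration;
// `duration`, `predicate`, and `transform` are placeholders for arguments not shown here.
// Start with time-based stream
let stream = source_stream
    // Time operator (requires DateTime<Utc>)
    .debounce(duration)
    // Core operators work seamlessly
    .filter_ordered(predicate)
    .map_ordered(transform)
    // Back to time operator
    .delay(duration)
    // More core operators
    .distinct_until_changed();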
This works because:
- debounce returns impl Stream<Item = StreamItem<ChronoTimestamped<T>>>
- Core operators like filter_ordered accept any stream where items implement HasTimestamp
- The timestamp type (DateTime<Utc>) is preserved through the chain
- delay can then use it again because the type is still ChronoTimestamped<T>
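The type-level reasoning above can be illustrated with a simplified, standard-library-only model. Everything here is a stand-in written for this example: the `HasTimestamp` trait is a guess at the general shape rather than the actual fluxion definition, `Counted` and `Timed` mimic Sequenced<T> and ChronoTimestamped<T>, `SystemTime` replaces DateTime<Utc>, and plain vectors replace streams.

```rust
use std::time::{Duration, SystemTime};

/// Simplified stand-in for the `HasTimestamp` trait.
trait HasTimestamp {
    type Timestamp;
    fn timestamp(&self) -> Self::Timestamp;
}

/// Counter-stamped item, in the spirit of `Sequenced<T>`.
#[derive(Debug, Clone, PartialEq)]
struct Counted<T> {
    seq: u64,
    value: T,
}

/// Wall-clock-stamped item, in the spirit of `ChronoTimestamped<T>`.
#[derive(Debug, Clone, PartialEq)]
struct Timed<T> {
    at: SystemTime,
    value: T,
}

impl<T> HasTimestamp for Counted<T> {
    type Timestamp = u64;
    fn timestamp(&self) -> u64 {
        self.seq
    }
}

impl<T> HasTimestamp for Timed<T> {
    type Timestamp = SystemTime;
    fn timestamp(&self) -> SystemTime {
        self.at
    }
}

/// "Core" operator: generic over any timestamped item, so it accepts both
/// `Counted<T>` and `Timed<T>` and returns the same item type it was given.
fn keep_if<I: HasTimestamp>(items: Vec<I>, pred: impl Fn(&I) -> bool) -> Vec<I> {
    items.into_iter().filter(|i| pred(i)).collect()
}

/// "Time" operator: needs duration arithmetic, so it only accepts the
/// wall-clock item type. Passing `Vec<Counted<T>>` is a compile error.
fn shift<T>(items: Vec<Timed<T>>, delay: Duration) -> Vec<Timed<T>> {
    items
        .into_iter()
        .map(|i| Timed { at: i.at + delay, value: i.value })
        .collect()
}
```

Because `keep_if` hands back the item type it received, a call such as `shift(keep_if(shift(items, d1), pred), d2)` type-checks: the wall-clock timestamp survives the generic step in the middle.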
When to Use Each Crate
Use fluxion-stream (core operators) when:
- You need ordering, combining, filtering, transformation
- You want timestamp-agnostic code
- You're testing with counter-based timestamps (Sequenced<T>)
Use fluxion-stream-time (time operators) when:
- You need real-world time-based behavior (delay, debounce, etc.)
- You're working with production event streams with wall-clock timestamps
- You need duration arithmetic or timezone handling
Use both together when:
- You need time-based rate limiting plus complex stream transformations
- You want to debounce user input, then combine it with other streams
- You're building real-world reactive systems with temporal constraints
Usage
use ;
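use std::time::Duration;
// Delay all emissions by 100ms
let delayed_stream = source_stream
    .delay(Duration::from_millis(100));
// Debounce emissions (`duration` is a placeholder for any std::time::Duration)
let debounced_stream = source_stream
    .debounce(duration);
// Sample emissions
let sampled_stream = source_stream
    .sample(duration);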
ChronoTimestamped vs Sequenced
| Feature | Sequenced<T> (test-utils) | ChronoTimestamped<T> (stream-time) |
|---|---|---|
| Timestamp Type | u64 (counter) | DateTime<Utc> |
| Crate | fluxion-test-utils | fluxion-stream-time |
| Use Case | Testing, simulation | Production, real time |
| Time Operators | ❌ No | ✅ Yes |
| Core Operators | ✅ Yes | ✅ Yes |
| Deterministic | ✅ Yes | ❌ No (wall-clock) |
| Duration Math | ❌ No | ✅ Yes |
| Dependencies | None | chrono |
Requirements
This crate uses std::time::Duration for time operations, chrono for timestamps, and tokio for async delays.
License
Licensed under the Apache License, Version 2.0. See LICENSE for details.