1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0
//! Probabilistic downsampling operator for streams.
//!
//! This module provides the [`sample_ratio`](SampleRatioExt::sample_ratio) operator that randomly
//! samples a fraction of stream items based on a probability ratio.
//!
//! # Overview
//!
//! The `sample_ratio` operator emits each item with a given probability, allowing you to
//! downsample high-frequency streams for logging, monitoring, or load reduction.
//!
//! # Deterministic Testing
//!
//! The operator requires an explicit seed parameter, enabling deterministic testing:
//!
//! ```
//! use fluxion_stream::prelude::*;
//! use fluxion_test_utils::{Sequenced, test_channel};
//! use futures::StreamExt;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let (tx, stream) = test_channel::<Sequenced<i32>>();
//!
//! // Use a fixed seed for deterministic testing
//! let sampled = stream.sample_ratio(0.5, 42);
//!
//! // Send items
//! tx.unbounded_send(Sequenced::new(1)).unwrap();
//! tx.unbounded_send(Sequenced::new(2)).unwrap();
//! tx.unbounded_send(Sequenced::new(3)).unwrap();
//! tx.unbounded_send(Sequenced::new(4)).unwrap();
//! drop(tx);
//!
//! // With seed=42, the sequence is deterministic
//! let results: Vec<_> = sampled
//! .filter_map(|item| async move {
//! match item {
//! fluxion_core::StreamItem::Value(v) => Some(v.value),
//! _ => None,
//! }
//! })
//! .collect()
//! .await;
//!
//! // Results will be consistent across runs with same seed
//! assert!(!results.is_empty());
//! # }
//! ```
//!
//! # Production Usage
//!
//! In production, use the global fastrand function for random sampling:
//!
//! ```no_run
//! use fluxion_stream::prelude::*;
//! use fluxion_test_utils::Sequenced;
//! use futures::StreamExt;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let (_tx, rx) = async_channel::unbounded::<Sequenced<i32>>();
//! let stream = rx.into_fluxion_stream();
//!
//! // Random sampling in production - use fastrand's global RNG
//! let seed = std::time::SystemTime::now()
//! .duration_since(std::time::UNIX_EPOCH)
//! .unwrap()
//! .as_secs();
//! let _sampled = stream.sample_ratio(0.1, seed); // Sample ~10%
//! # }
//! ```
//!
//! # Error Handling
//!
//! Errors always pass through unconditionally—they are never subject to sampling.
//! This ensures error observability is not affected by downsampling.
pub use SampleRatioExt;
pub use SampleRatioExt;