Skip to main content

flowstats/
traits.rs

1//! Core traits for streaming algorithms
2//!
3//! All sketches implement the base [`Sketch`] trait, with specialized traits
4//! for different algorithm families (cardinality, frequency, quantiles, etc.)
5
6use core::fmt::Debug;
7
8#[cfg(feature = "std")]
9use std::{string::String, vec::Vec};
10
11#[cfg(not(feature = "std"))]
12extern crate alloc;
13#[cfg(not(feature = "std"))]
14use alloc::{string::String, vec::Vec};
15
/// Reason why one sketch could not be merged into another.
///
/// Returned by [`Sketch::merge`] when the two operands are not compatible.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MergeError {
    /// The two sketches were built with incompatible configurations.
    IncompatibleConfig {
        /// Description of the configuration that was expected.
        expected: String,
        /// Description of the configuration that was actually encountered.
        found: String,
    },
    /// The two sketches carry incompatible version numbers.
    VersionMismatch {
        /// Version that was expected.
        expected: u32,
        /// Version that was actually encountered.
        found: u32,
    },
}
30
31impl core::fmt::Display for MergeError {
32    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
33        match self {
34            MergeError::IncompatibleConfig { expected, found } => {
35                write!(f, "incompatible config: expected {}, found {}", expected, found)
36            }
37            MergeError::VersionMismatch { expected, found } => {
38                write!(f, "version mismatch: expected {}, found {}", expected, found)
39            }
40        }
41    }
42}
43
44#[cfg(feature = "std")]
45impl std::error::Error for MergeError {}
46
/// Reason why a serialized sketch could not be decoded.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DecodeError {
    /// The input buffer ended before all required bytes were read.
    BufferTooShort {
        /// Number of bytes that were required.
        expected: usize,
        /// Number of bytes that were actually available.
        found: usize,
    },
    /// The magic number or header did not match.
    InvalidHeader,
    /// The encoded version number is not supported.
    UnsupportedVersion(u32),
    /// The payload is corrupted; the string describes what was wrong.
    Corrupted(String),
}
59
60impl core::fmt::Display for DecodeError {
61    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
62        match self {
63            DecodeError::BufferTooShort { expected, found } => {
64                write!(f, "buffer too short: expected {}, found {}", expected, found)
65            }
66            DecodeError::InvalidHeader => write!(f, "invalid header"),
67            DecodeError::UnsupportedVersion(v) => write!(f, "unsupported version: {}", v),
68            DecodeError::Corrupted(msg) => write!(f, "corrupted data: {}", msg),
69        }
70    }
71}
72
73#[cfg(feature = "std")]
74impl std::error::Error for DecodeError {}
75
/// A point estimate together with the interval that bounds it.
///
/// `lower` and `upper` delimit the interval; `confidence` records the
/// confidence level the interval corresponds to (e.g. `0.95` for 95%).
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ErrorBounds {
    /// Lower end of the interval.
    pub lower: f64,
    /// The point estimate itself.
    pub estimate: f64,
    /// Upper end of the interval.
    pub upper: f64,
    /// Confidence level, as a fraction (e.g. `0.95` for 95%).
    pub confidence: f64,
}
88
89impl ErrorBounds {
90    /// Create new error bounds
91    pub fn new(lower: f64, estimate: f64, upper: f64, confidence: f64) -> Self {
92        Self {
93            lower,
94            estimate,
95            upper,
96            confidence,
97        }
98    }
99
100    /// Check if a value falls within bounds
101    pub fn contains(&self, value: f64) -> bool {
102        value >= self.lower && value <= self.upper
103    }
104
105    /// Width of the confidence interval
106    pub fn width(&self) -> f64 {
107        self.upper - self.lower
108    }
109
110    /// Relative width (width / estimate)
111    pub fn relative_width(&self) -> f64 {
112        if self.estimate == 0.0 {
113            0.0
114        } else {
115            self.width() / self.estimate
116        }
117    }
118}
119
120/// Core trait for all streaming sketches
121pub trait Sketch: Clone + Debug {
122    /// The type of item this sketch processes
123    type Item: ?Sized;
124
125    /// Add an item to the sketch
126    fn update(&mut self, item: &Self::Item);
127
128    /// Merge another sketch into this one
129    ///
130    /// Returns an error if sketches are incompatible
131    fn merge(&mut self, other: &Self) -> Result<(), MergeError>;
132
133    /// Reset sketch to empty state
134    fn clear(&mut self);
135
136    /// Memory usage in bytes
137    fn size_bytes(&self) -> usize;
138
139    /// Number of items processed
140    fn count(&self) -> u64;
141
142    /// Check if sketch is empty
143    fn is_empty(&self) -> bool {
144        self.count() == 0
145    }
146}
147
148/// Cardinality (distinct count) estimation sketches
149pub trait CardinalitySketch: Sketch {
150    /// Estimate number of distinct items seen
151    fn estimate(&self) -> f64;
152
153    /// Get error bounds at given confidence level (0.0 to 1.0)
154    fn error_bounds(&self, confidence: f64) -> ErrorBounds;
155
156    /// Relative standard error (RSE) of the estimate
157    ///
158    /// RSE = standard_error / true_value ≈ 1.04 / sqrt(m) for HLL
159    fn relative_error(&self) -> f64;
160
161    /// Estimate with default 95% confidence bounds
162    fn estimate_with_bounds(&self) -> ErrorBounds {
163        self.error_bounds(0.95)
164    }
165}
166
167/// Frequency estimation sketches
168pub trait FrequencySketch: Sketch {
169    /// Estimate frequency of an item
170    fn estimate_frequency(&self, item: &Self::Item) -> u64;
171
172    /// Check if frequency exceeds threshold
173    fn exceeds_threshold(&self, item: &Self::Item, threshold: u64) -> bool {
174        self.estimate_frequency(item) >= threshold
175    }
176}
177
178/// Heavy hitters / Top-K capability
179pub trait HeavyHitters: FrequencySketch
180where
181    Self::Item: Sized + Clone,
182{
183    /// Get items with estimated frequency above threshold
184    ///
185    /// Threshold is a fraction of total count (0.0 to 1.0)
186    fn heavy_hitters(&self, threshold: f64) -> Vec<(Self::Item, u64)>;
187
188    /// Get top-k most frequent items
189    fn top_k(&self, k: usize) -> Vec<(Self::Item, u64)>;
190}
191
192/// Quantile estimation sketches
193pub trait QuantileSketch: Sketch {
194    /// The value type being tracked
195    type Value: PartialOrd + Clone;
196
197    /// Add a value to the sketch
198    fn add(&mut self, value: Self::Value);
199
200    /// Get quantile value at given rank (0.0 to 1.0)
201    ///
202    /// rank=0.5 returns the median
203    fn quantile(&self, rank: f64) -> Option<Self::Value>;
204
205    /// Get rank of a value (0.0 to 1.0)
206    fn rank(&self, value: &Self::Value) -> f64;
207
208    /// Get CDF value at given point
209    fn cdf(&self, value: &Self::Value) -> f64 {
210        self.rank(value)
211    }
212
213    /// Get minimum value seen
214    fn min(&self) -> Option<Self::Value>;
215
216    /// Get maximum value seen
217    fn max(&self) -> Option<Self::Value>;
218
219    /// Get median (50th percentile)
220    fn median(&self) -> Option<Self::Value> {
221        self.quantile(0.5)
222    }
223
224    /// Get multiple quantiles at once
225    fn quantiles(&self, ranks: &[f64]) -> Vec<Option<Self::Value>> {
226        ranks.iter().map(|&r| self.quantile(r)).collect()
227    }
228}
229
230/// Membership testing sketches (Bloom filters, etc.)
231pub trait MembershipSketch: Sketch {
232    /// Test if item might be in set
233    ///
234    /// - `true` means item might be present (possible false positive)
235    /// - `false` means item is definitely not present
236    fn contains(&self, item: &Self::Item) -> bool;
237
238    /// Theoretical false positive rate given current state
239    fn false_positive_rate(&self) -> f64;
240
241    /// Number of items added
242    fn len(&self) -> usize;
243
244    /// Check if filter is empty
245    fn is_filter_empty(&self) -> bool {
246        self.len() == 0
247    }
248}
249
250/// Set operation sketches (Theta sketch, etc.)
251pub trait SetSketch: Sketch {
252    /// Create union of two sketches
253    fn union(&self, other: &Self) -> Self;
254
255    /// Create intersection of two sketches
256    fn intersection(&self, other: &Self) -> Self;
257
258    /// Difference (items in self but not in other)
259    fn difference(&self, other: &Self) -> Self;
260
261    /// Estimate Jaccard similarity between two sets
262    fn jaccard_similarity(&self, other: &Self) -> f64;
263}
264
265/// Sampling sketches
266pub trait SamplingSketch: Sketch
267where
268    Self::Item: Sized + Clone,
269{
270    /// Get current sample
271    fn sample(&self) -> &[Self::Item];
272
273    /// Sample size limit
274    fn capacity(&self) -> usize;
275
276    /// Current sample size
277    fn sample_size(&self) -> usize {
278        self.sample().len()
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    /// Renders a `Display` value into a `String` without `ToString`/`format!`,
    /// so this helper also compiles when `std` is not enabled.
    fn render<T: core::fmt::Display>(value: &T) -> String {
        use core::fmt::Write;
        let mut out = String::new();
        write!(out, "{}", value).expect("writing into a String cannot fail");
        out
    }

    /// Minimal exact counter used to exercise `Sketch`'s default methods.
    #[derive(Debug, Clone, Default)]
    struct Counter {
        seen: u64,
    }

    impl Sketch for Counter {
        type Item = str;

        fn update(&mut self, _item: &str) {
            self.seen += 1;
        }

        fn merge(&mut self, other: &Self) -> Result<(), MergeError> {
            self.seen += other.seen;
            Ok(())
        }

        fn clear(&mut self) {
            self.seen = 0;
        }

        fn size_bytes(&self) -> usize {
            core::mem::size_of::<Self>()
        }

        fn count(&self) -> u64 {
            self.seen
        }
    }

    #[test]
    fn test_error_bounds() {
        let bounds = ErrorBounds::new(90.0, 100.0, 110.0, 0.95);

        assert!(bounds.contains(100.0));
        assert!(bounds.contains(90.0));
        assert!(bounds.contains(110.0));
        assert!(!bounds.contains(89.0));
        assert!(!bounds.contains(111.0));

        assert_eq!(bounds.width(), 20.0);
        assert!((bounds.relative_width() - 0.2).abs() < 0.001);
    }

    #[test]
    fn error_bounds_never_contain_nan() {
        let bounds = ErrorBounds::new(0.0, 1.0, 2.0, 0.9);
        assert!(!bounds.contains(f64::NAN));
    }

    #[test]
    fn relative_width_of_zero_estimate_is_zero() {
        // Guards the divide-by-zero branch: a non-zero width still reports 0.0.
        let bounds = ErrorBounds::new(-1.0, 0.0, 1.0, 0.95);
        assert_eq!(bounds.width(), 2.0);
        assert_eq!(bounds.relative_width(), 0.0);
    }

    #[test]
    fn merge_error_display() {
        let cfg = MergeError::IncompatibleConfig {
            expected: String::from("p=14"),
            found: String::from("p=12"),
        };
        assert_eq!(render(&cfg), "incompatible config: expected p=14, found p=12");

        let ver = MergeError::VersionMismatch { expected: 2, found: 1 };
        assert_eq!(render(&ver), "version mismatch: expected 2, found 1");
    }

    #[test]
    fn decode_error_display() {
        assert_eq!(
            render(&DecodeError::BufferTooShort { expected: 8, found: 3 }),
            "buffer too short: expected 8, found 3"
        );
        assert_eq!(render(&DecodeError::InvalidHeader), "invalid header");
        assert_eq!(
            render(&DecodeError::UnsupportedVersion(7)),
            "unsupported version: 7"
        );
        assert_eq!(
            render(&DecodeError::Corrupted(String::from("bad crc"))),
            "corrupted data: bad crc"
        );
    }

    #[test]
    fn sketch_is_empty_follows_count() {
        let mut a = Counter::default();
        assert!(a.is_empty());

        a.update("x");
        assert!(!a.is_empty());

        let mut b = Counter::default();
        b.merge(&a).expect("counters always merge");
        assert_eq!(b.count(), 1);

        a.clear();
        assert!(a.is_empty());
    }
}