nexus-stats-core 2.2.0

Core types and utilities shared across nexus-stats subcrates
Documentation
use super::windowed::{
    WindowedMinF32, WindowedMinF64, WindowedMinI32, WindowedMinI64, WindowedMinI128,
};
use crate::Condition;

// =========================================================================
// Raw (u64 timestamp) variants — no_std compatible
// =========================================================================

macro_rules! impl_codel_raw_int {
    ($name:ident, $builder:ident, $ty:ty, $windowed_min_raw:ty) => {
        /// Queue-delay monitor modelled on CoDel ("Controlled Delay",
        /// Nichols & Jacobson, 2012).
        ///
        /// Pairs a windowed minimum of sojourn times with a fixed target.
        /// When even the smallest sojourn time in the observation window is
        /// above the target, the monitor reports `Degraded`; otherwise it
        /// reports `Normal`. `no_std` compatible.
        #[derive(Debug, Clone)]
        pub struct $name {
            windowed_min: $windowed_min_raw,
            target: $ty,
            min_samples: u64,
        }

        #[doc = concat!("Builder for [`", stringify!($name), "`].")]
        #[derive(Debug, Clone)]
        pub struct $builder {
            target: Option<$ty>,
            window: Option<u64>,
            min_samples: u64,
        }

        impl $name {
            /// Starts a builder with no target, no window, and
            /// `min_samples` set to 1.
            #[inline]
            #[must_use]
            pub fn builder() -> $builder {
                $builder {
                    min_samples: 1,
                    window: Option::None,
                    target: Option::None,
                }
            }

            /// Records one sojourn time observed at `timestamp`.
            ///
            /// Returns `None` until the sample count reaches `min_samples`.
            /// After that it returns `Some(Condition::Degraded)` when the
            /// windowed minimum is strictly greater than the target and
            /// `Some(Condition::Normal)` otherwise.
            #[inline]
            #[must_use]
            pub fn update(&mut self, timestamp: u64, sojourn: $ty) -> Option<Condition> {
                // The sample is always recorded, even while still priming.
                let window_floor = self.windowed_min.update(timestamp, sojourn);

                if !self.is_primed() {
                    return Option::None;
                }

                Option::Some(if window_floor > self.target {
                    Condition::Degraded
                } else {
                    Condition::Normal
                })
            }

            /// Same as `update`, but takes an `i64` timestamp (e.g., wire
            /// protocol epoch nanos).
            ///
            /// The timestamp must be non-negative. This is only checked by a
            /// debug assertion; a negative value is cast as-is, wraps to a
            /// large `u64`, and will produce incorrect window expiration.
            #[inline]
            #[must_use]
            pub fn update_i64(&mut self, timestamp: i64, sojourn: $ty) -> Option<Condition> {
                debug_assert!(timestamp >= 0, "negative timestamp: {timestamp}");
                let raw_timestamp = timestamp as u64;
                self.update(raw_timestamp, sojourn)
            }

            /// Minimum sojourn time currently held by the window, or `None`
            /// when the window has no minimum to report.
            #[inline]
            #[must_use]
            pub fn min_sojourn(&self) -> Option<$ty> {
                self.windowed_min.min()
            }

            /// Whether the current windowed minimum is strictly above the
            /// target.
            ///
            /// Unlike `update`, this does not consult `min_samples`. It is
            /// `false` when there is no minimum available.
            #[inline]
            #[must_use]
            pub fn is_elevated(&self) -> bool {
                match self.windowed_min.min() {
                    Option::Some(window_floor) => window_floor > self.target,
                    Option::None => false,
                }
            }

            /// Sample count as reported by the underlying windowed minimum.
            #[inline]
            #[must_use]
            pub fn count(&self) -> u64 {
                self.windowed_min.count()
            }

            /// Whether the sample count has reached `min_samples`.
            #[inline]
            #[must_use]
            pub fn is_primed(&self) -> bool {
                self.count() >= self.min_samples
            }

            /// Resets the underlying windowed minimum. The target and
            /// `min_samples` are left as configured.
            #[inline]
            pub fn reset(&mut self) {
                self.windowed_min.reset();
            }
        }

        impl $builder {
            /// Sets the target sojourn time. The monitor is elevated when the
            /// windowed minimum exceeds this value.
            #[inline]
            #[must_use]
            pub fn target(self, target: $ty) -> Self {
                Self {
                    target: Option::Some(target),
                    ..self
                }
            }

            /// Sets the observation window, in the same raw units as the
            /// timestamps passed to `update`.
            #[inline]
            #[must_use]
            pub fn window(self, window: u64) -> Self {
                Self {
                    window: Option::Some(window),
                    ..self
                }
            }

            /// Sets how many samples are required before `update` starts
            /// returning a condition. Default: 1.
            #[inline]
            #[must_use]
            pub fn min_samples(self, min: u64) -> Self {
                Self {
                    min_samples: min,
                    ..self
                }
            }

            /// Validates the configuration and builds the CoDel monitor.
            ///
            /// # Errors
            ///
            /// - `ConfigError::Missing("target")` if no target was set.
            /// - `ConfigError::Missing("window")` if no window was set.
            /// - `ConfigError::Invalid` if the window is zero.
            /// - Any error returned while constructing the windowed minimum.
            #[inline]
            pub fn build(self) -> Result<$name, crate::ConfigError> {
                let target = self.target.ok_or(crate::ConfigError::Missing("target"))?;

                let window = match self.window {
                    Option::None => return Err(crate::ConfigError::Missing("window")),
                    Option::Some(0) => {
                        return Err(crate::ConfigError::Invalid("CoDel window must be positive"));
                    }
                    Option::Some(units) => units,
                };

                let tracker = <$windowed_min_raw>::new(window)?;

                Ok($name {
                    windowed_min: tracker,
                    target,
                    min_samples: self.min_samples,
                })
            }
        }
    };
}

macro_rules! impl_codel_raw_float {
    ($name:ident, $builder:ident, $ty:ty, $windowed_min_raw:ty) => {
        /// CoDel — Controlled Delay queue monitor (Nichols & Jacobson, 2012).
        ///
        /// Composes a windowed minimum of sojourn times with a threshold.
        /// Reports `Degraded` when even the minimum sojourn time in the
        /// observation window exceeds the target. `no_std` compatible.
        ///
        /// Non-finite sojourn samples are rejected by `update`, and a NaN
        /// target is rejected when the monitor is built.
        #[derive(Debug, Clone)]
        pub struct $name {
            windowed_min: $windowed_min_raw,
            target: $ty,
            min_samples: u64,
        }

        #[doc = concat!("Builder for [`", stringify!($name), "`].")]
        #[derive(Debug, Clone)]
        pub struct $builder {
            target: Option<$ty>,
            window: Option<u64>,
            min_samples: u64,
        }

        impl $name {
            /// Creates a builder with no target, no window, and
            /// `min_samples` set to 1.
            #[inline]
            #[must_use]
            pub fn builder() -> $builder {
                $builder {
                    target: Option::None,
                    window: Option::None,
                    min_samples: 1,
                }
            }

            /// Feeds a sojourn time at the given timestamp.
            ///
            /// Returns `Ok(None)` until the sample count reaches
            /// `min_samples`. After that, returns
            /// `Ok(Some(Condition::Degraded))` when the windowed minimum is
            /// strictly greater than the target and
            /// `Ok(Some(Condition::Normal))` otherwise.
            ///
            /// # Errors
            ///
            /// Returns `DataError::NotANumber` if the sojourn is NaN, or
            /// `DataError::Infinite` if the sojourn is infinite. Errors from
            /// the underlying windowed minimum are propagated as well.
            #[inline]
            pub fn update(
                &mut self,
                timestamp: u64,
                sojourn: $ty,
            ) -> Result<Option<Condition>, crate::DataError> {
                // Screen the sample before it can reach the window state.
                check_finite!(sojourn);
                let min = self.windowed_min.update(timestamp, sojourn)?;

                if self.windowed_min.count() < self.min_samples {
                    return Ok(Option::None);
                }

                if min > self.target {
                    Ok(Option::Some(Condition::Degraded))
                } else {
                    Ok(Option::Some(Condition::Normal))
                }
            }

            /// Convenience for `i64` timestamps (e.g., wire protocol epoch nanos).
            ///
            /// Timestamps must be non-negative. This is only checked by a
            /// debug assertion; negative values wrap to large `u64` values
            /// and will produce incorrect window expiration.
            ///
            /// # Errors
            ///
            /// Same as `update`.
            #[inline]
            pub fn update_i64(
                &mut self,
                timestamp: i64,
                sojourn: $ty,
            ) -> Result<Option<Condition>, crate::DataError> {
                debug_assert!(timestamp >= 0, "negative timestamp: {timestamp}");
                self.update(timestamp as u64, sojourn)
            }

            /// Current windowed minimum sojourn time, or `None` if empty.
            #[inline]
            #[must_use]
            pub fn min_sojourn(&self) -> Option<$ty> {
                self.windowed_min.min()
            }

            /// Whether the current windowed minimum is strictly above the
            /// target.
            ///
            /// Unlike `update`, this does not consult `min_samples`. It is
            /// `false` when there is no minimum available.
            #[inline]
            #[must_use]
            pub fn is_elevated(&self) -> bool {
                match self.windowed_min.min() {
                    Option::Some(min) => min > self.target,
                    Option::None => false,
                }
            }

            /// Sample count as reported by the underlying windowed minimum.
            #[inline]
            #[must_use]
            pub fn count(&self) -> u64 {
                self.windowed_min.count()
            }

            /// Whether the monitor has reached `min_samples`.
            #[inline]
            #[must_use]
            pub fn is_primed(&self) -> bool {
                self.windowed_min.count() >= self.min_samples
            }

            /// Resets the underlying windowed minimum. The target and
            /// `min_samples` are unchanged.
            #[inline]
            pub fn reset(&mut self) {
                self.windowed_min.reset();
            }
        }

        impl $builder {
            /// Target sojourn time. Elevated when minimum exceeds this.
            ///
            /// A NaN target is rejected by `build`.
            #[inline]
            #[must_use]
            pub fn target(mut self, target: $ty) -> Self {
                self.target = Option::Some(target);
                self
            }

            /// Observation window in raw units (same as timestamps).
            #[inline]
            #[must_use]
            pub fn window(mut self, window: u64) -> Self {
                self.window = Option::Some(window);
                self
            }

            /// Minimum samples before monitoring activates. Default: 1.
            #[inline]
            #[must_use]
            pub fn min_samples(mut self, min: u64) -> Self {
                self.min_samples = min;
                self
            }

            /// Builds the CoDel monitor.
            ///
            /// # Errors
            ///
            /// - `ConfigError::Missing("target")` if no target was set.
            /// - `ConfigError::Missing("window")` if no window was set.
            /// - `ConfigError::Invalid` if the window is zero.
            /// - `ConfigError::Invalid` if the target is NaN.
            /// - Any error returned while constructing the windowed minimum.
            #[inline]
            pub fn build(self) -> Result<$name, crate::ConfigError> {
                let target = self.target.ok_or(crate::ConfigError::Missing("target"))?;
                let window = self.window.ok_or(crate::ConfigError::Missing("window"))?;
                if window == 0 {
                    return Err(crate::ConfigError::Invalid("CoDel window must be positive"));
                }
                // `min > NaN` is always false, so a NaN target would yield a
                // monitor that can never report `Degraded`. Checked after the
                // other validations so their errors keep precedence.
                if target.is_nan() {
                    return Err(crate::ConfigError::Invalid("CoDel target must not be NaN"));
                }

                Ok($name {
                    windowed_min: <$windowed_min_raw>::new(window)?,
                    target,
                    min_samples: self.min_samples,
                })
            }
        }
    };
}

// Integer sojourn types: each line generates a monitor struct and its builder,
// backed by the matching windowed-minimum type. `update` on these returns
// `Option<Condition>` directly.
impl_codel_raw_int!(CoDelI64, CoDelI64Builder, i64, WindowedMinI64);
impl_codel_raw_int!(CoDelI32, CoDelI32Builder, i32, WindowedMinI32);
impl_codel_raw_int!(CoDelI128, CoDelI128Builder, i128, WindowedMinI128);
// Floating-point sojourn types: same shape, but `update` returns a `Result`
// because non-finite samples are rejected with a `DataError`.
impl_codel_raw_float!(CoDelF64, CoDelF64Builder, f64, WindowedMinF64);
impl_codel_raw_float!(CoDelF32, CoDelF32Builder, f32, WindowedMinF32);

#[cfg(test)]
mod raw_tests {
    use super::*;
    use crate::Condition;

    /// One sample below the target is reported as `Normal` straight away,
    /// because the default `min_samples` is 1.
    #[test]
    fn raw_codel_normal() {
        let builder = CoDelI64::builder().target(100).window(1000);
        let mut monitor = builder.build().unwrap();

        let verdict = monitor.update(0, 50);

        assert_eq!(verdict, Some(Condition::Normal));
    }

    /// Ten samples that are all above the target leave the monitor elevated.
    #[test]
    fn raw_codel_degraded() {
        let mut monitor = CoDelI64::builder().target(50).window(1000).build().unwrap();

        for timestamp in (0..10u64).map(|step| step * 100) {
            let _ = monitor.update(timestamp, 200);
        }

        assert!(monitor.is_elevated());
    }

    /// The `f64` variant reports `Normal` for a finite sample below the target.
    #[test]
    fn raw_codel_f64() {
        let builder = CoDelF64::builder().target(0.5).window(1000);
        let mut monitor = builder.build().unwrap();

        let verdict = monitor.update(0, 0.1).unwrap();

        assert_eq!(verdict, Some(Condition::Normal));
    }

    /// NaN and infinite samples are rejected with the matching `DataError`.
    #[test]
    fn rejects_nan_and_inf() {
        let builder = CoDelF64::builder().target(0.5).window(1000);
        let mut monitor = builder.build().unwrap();

        let nan_outcome = monitor.update(0, f64::NAN);
        assert!(matches!(nan_outcome, Err(crate::DataError::NotANumber)));

        let inf_outcome = monitor.update(0, f64::INFINITY);
        assert!(matches!(inf_outcome, Err(crate::DataError::Infinite)));
    }
}