zmux 1.0.2

Rust implementation of the ZMux v1 stream multiplexing protocol
Documentation
use crate::settings::Settings;
use crate::varint::MAX_VARINT62;

#[inline]
pub(super) fn negotiated_frame_payload(local: &Settings, peer: &Settings) -> u64 {
    min_nonzero(local.max_frame_payload, peer.max_frame_payload)
        .unwrap_or(Settings::DEFAULT.max_frame_payload)
}

#[inline]
pub(super) fn session_window_target(local: &Settings, session_data_high_watermark: usize) -> u64 {
    local
        .initial_max_data
        .max(scaled_high_watermark(session_data_high_watermark, 4))
}

#[inline]
pub(super) fn stream_window_target(
    initial_receive_window: u64,
    per_stream_data_high_watermark: usize,
) -> u64 {
    initial_receive_window.max(scaled_high_watermark(per_stream_data_high_watermark, 2))
}

#[inline]
pub(super) fn session_emergency_threshold(payload: u64) -> u64 {
    payload.saturating_mul(2)
}

#[inline]
pub(super) fn stream_emergency_threshold(target: u64, payload: u64) -> u64 {
    let threshold = quarter_threshold(target);
    if payload == 0 {
        threshold
    } else {
        payload.min(threshold)
    }
}

#[inline]
pub(super) fn replenish_min_pending(target: u64, payload: u64) -> u64 {
    let min_pending = quarter_threshold(target);
    if payload == 0 {
        min_pending
    } else {
        min_pending.min(payload)
    }
}

#[inline]
pub(super) fn receive_window_exceeded(received: u64, advertised: u64, amount: u64) -> bool {
    amount > advertised.saturating_sub(received)
}

#[inline]
pub(super) fn should_flush_receive_credit(
    advertised: u64,
    received: u64,
    pending: u64,
    target: u64,
    emergency_threshold: u64,
    min_pending: u64,
    force: bool,
) -> bool {
    if pending == 0 {
        return false;
    }
    if force {
        return true;
    }
    let remaining = advertised.saturating_sub(received);
    if remaining <= emergency_threshold {
        return true;
    }
    if remaining <= quarter_threshold(target) && advertised >= target {
        return true;
    }
    pending >= min_pending
}

#[inline]
pub(super) fn next_credit_limit(
    advertised: u64,
    pending: u64,
    received: u64,
    target: u64,
    standing_growth_allowed: bool,
) -> u64 {
    let floor = advertised.saturating_add(pending);
    let desired = if standing_growth_allowed {
        floor.max(received.saturating_add(target))
    } else {
        floor
    };
    desired.min(MAX_VARINT62)
}

#[inline]
pub(super) fn session_standing_growth_allowed(
    memory_pressure_high: bool,
    buffered: u64,
    pending: u64,
    session_data_high_watermark: usize,
) -> bool {
    standing_growth_allowed(
        memory_pressure_high,
        buffered,
        pending,
        session_data_high_watermark,
    )
}

#[inline]
pub(super) fn stream_standing_growth_allowed(
    memory_pressure_high: bool,
    buffered: u64,
    pending: u64,
    per_stream_data_high_watermark: usize,
) -> bool {
    standing_growth_allowed(
        memory_pressure_high,
        buffered,
        pending,
        per_stream_data_high_watermark,
    )
}

#[inline]
fn quarter_threshold(value: u64) -> u64 {
    if value <= 4 {
        1
    } else {
        value / 4
    }
}

#[inline]
fn standing_growth_allowed(
    memory_pressure_high: bool,
    buffered: u64,
    pending: u64,
    high_watermark: usize,
) -> bool {
    if memory_pressure_high {
        return false;
    }
    let high_watermark = usize_to_u64_saturating(high_watermark);
    buffered < high_watermark && pending < high_watermark - buffered
}

#[inline]
fn min_nonzero(a: u64, b: u64) -> Option<u64> {
    match (a, b) {
        (0, 0) => None,
        (0, b) => Some(b),
        (a, 0) => Some(a),
        (a, b) => Some(a.min(b)),
    }
}

#[inline]
fn scaled_high_watermark(value: usize, scale: u64) -> u64 {
    usize_to_u64_saturating(value)
        .saturating_mul(scale)
        .min(MAX_VARINT62)
}

#[inline]
fn usize_to_u64_saturating(value: usize) -> u64 {
    value.min(u64::MAX as usize) as u64
}

#[cfg(test)]
mod tests {
    use super::{
        negotiated_frame_payload, next_credit_limit, receive_window_exceeded,
        session_emergency_threshold, session_standing_growth_allowed, session_window_target,
        should_flush_receive_credit, stream_emergency_threshold, stream_standing_growth_allowed,
        stream_window_target,
    };
    use crate::settings::Settings;
    use crate::varint::MAX_VARINT62;

    #[test]
    fn next_credit_limit_can_preserve_standing_window_when_memory_allows() {
        assert_eq!(next_credit_limit(64, 8, 60, 128, true), 188);
        assert_eq!(next_credit_limit(64, 8, 60, 128, false), 72);
    }

    #[test]
    fn quarter_threshold_matches_receive_flow_boundaries() {
        assert_eq!(super::quarter_threshold(0), 1);
        assert_eq!(super::quarter_threshold(1), 1);
        assert_eq!(super::quarter_threshold(4), 1);
        assert_eq!(super::quarter_threshold(8), 2);
    }

    #[test]
    fn negotiated_frame_payload_uses_min_nonzero_or_default() {
        let local = Settings {
            max_frame_payload: 4096,
            ..Settings::default()
        };
        let peer = Settings {
            max_frame_payload: 2048,
            ..Settings::default()
        };

        assert_eq!(negotiated_frame_payload(&local, &peer), 2048);
        assert_eq!(
            negotiated_frame_payload(&Settings::default(), &Settings::default()),
            Settings::default().max_frame_payload
        );
    }

    #[test]
    fn credit_limit_clamps_to_varint62() {
        assert_eq!(
            next_credit_limit(MAX_VARINT62 - 2, 10, MAX_VARINT62 - 1, 16, true),
            MAX_VARINT62
        );
    }

    #[test]
    fn receive_credit_flush_uses_supplied_emergency_and_min_pending() {
        assert!(should_flush_receive_credit(100, 98, 1, 64, 2, 16, false));
        assert!(!should_flush_receive_credit(100, 10, 15, 64, 2, 16, false));
        assert!(should_flush_receive_credit(100, 10, 16, 64, 2, 16, false));
    }

    #[test]
    fn receive_window_exceeded_uses_remaining_credit_and_saturates() {
        assert!(!receive_window_exceeded(8, 10, 2));
        assert!(receive_window_exceeded(8, 10, 3));
        assert!(receive_window_exceeded(u64::MAX - 1, u64::MAX, 2));
    }

    #[test]
    fn stream_emergency_is_bounded_by_quarter_target() {
        assert_eq!(session_emergency_threshold(64), 128);
        assert_eq!(session_emergency_threshold(u64::MAX), u64::MAX);
        assert_eq!(stream_emergency_threshold(1024, 256), 256);
        assert_eq!(stream_emergency_threshold(64, 256), 16);
        assert_eq!(stream_emergency_threshold(1024, 0), 256);
    }

    #[test]
    fn zero_target_replenishment_thresholds_match_repository_policy() {
        assert_eq!(stream_emergency_threshold(0, 16_384), 1);
        assert_eq!(super::replenish_min_pending(0, 16_384), 1);
    }

    #[test]
    fn standing_growth_respects_buffer_pressure() {
        assert!(session_standing_growth_allowed(false, 32, 16, 128));
        assert!(!session_standing_growth_allowed(true, 32, 16, 128));
        assert!(!stream_standing_growth_allowed(false, 128, 0, 128));
        assert!(!stream_standing_growth_allowed(false, 120, 8, 128));
    }

    #[test]
    fn window_targets_follow_repo_defaults() {
        let settings = Settings {
            initial_max_data: 4096,
            ..Settings::default()
        };
        assert_eq!(session_window_target(&settings, 2048), 8192);
        assert_eq!(stream_window_target(1024, 2048), 4096);
    }

    #[test]
    fn window_targets_clamp_high_watermarks_to_varint62() {
        assert_eq!(
            session_window_target(&Settings::default(), usize::MAX),
            MAX_VARINT62
        );
        assert_eq!(stream_window_target(1024, usize::MAX), MAX_VARINT62);
    }
}