arcium-primitives 0.6.0

Arcium primitives
Documentation
// ================================
// ===== Buffer Configuration =====
// ================================

use std::{sync::Arc, time::Duration};

use blanket::blanket;
use parking_lot::{Mutex, MutexGuard};

use crate::correlated_randomness::stream::CorrelatedStreamError;

/// Configuration for the `PreprocessingStream`.
#[derive(Debug, Clone)]
pub struct BufferConfig {
    /// Maximum number of elements the buffer can hold.
    capacity: usize,
    /// Maximum number of elements that can be requested at once.
    max_request_size: usize,
    /// Threshold to trigger generation of elements.
    refill_threshold: usize,
}

impl BufferConfig {
    /// Creates a new `BufferConfig` with the specified capacity, and sets:
    /// - `max_request_size = capacity` (allowing requests up to the full buffer size).
    /// - `refill_threshold = capacity` (always refill when any elements are consumed).
    pub fn eager(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be greater than 0");
        Self {
            capacity,
            max_request_size: capacity,
            refill_threshold: capacity,
        }
    }

    /// Creates a new `BufferConfig` with the specified capacity and max_request_size, and sets:
    /// - `refill_threshold = capacity` (always refill when any elements are consumed).
    pub fn eager_with(capacity: usize, max_request_size: usize) -> Self {
        assert!(capacity > 0, "capacity must be greater than 0");
        Self {
            capacity,
            max_request_size: max_request_size.min(capacity),
            refill_threshold: capacity,
        }
    }

    /// Creates a new `BufferConfig` with the specified capacity and refill threshold, and sets:
    /// - `max_request_size = capacity` (allowing requests up to the full buffer size).
    ///
    /// If `refill_threshold` > `capacity`, it is adjusted down to match `capacity`.
    pub fn lazy(capacity: usize, refill_threshold: usize) -> Self {
        assert!(capacity > 0, "capacity must be greater than 0");
        Self {
            capacity,
            max_request_size: capacity,
            refill_threshold: refill_threshold.min(capacity),
        }
    }

    /// Creates a new `BufferConfig` with the specified capacity, max_request_size, and
    /// refill_threshold.
    ///
    /// If `max_request_size` > `capacity` or `refill_threshold` > `capacity`, they are adjusted
    /// down to match `capacity`.
    pub fn lazy_with(capacity: usize, refill_threshold: usize, max_request_size: usize) -> Self {
        assert!(capacity > 0, "capacity must be greater than 0");
        Self {
            capacity,
            max_request_size: max_request_size.min(capacity),
            refill_threshold: refill_threshold.min(capacity),
        }
    }

    /// The maximum number of elements the buffer can hold (validated to be >= max_request_size).
    #[inline]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// The maximum number of elements that can be requested at once (validated to be <= capacity).
    #[inline]
    pub fn max_request_size(&self) -> usize {
        self.max_request_size
    }

    /// The threshold to trigger generation of elements (validated to be <= capacity).
    #[inline]
    pub fn refill_threshold(&self) -> usize {
        self.refill_threshold
    }

    /// Sets the capacity. If the new capacity is smaller than the current
    /// max_request_size or refill_threshold, also updates them to match the new capacity.
    pub fn set_capacity(&mut self, capacity: usize) {
        assert!(capacity > 0, "capacity must be greater than 0");
        self.capacity = capacity;
        if self.max_request_size > capacity {
            self.max_request_size = capacity;
        }
        if self.refill_threshold > capacity {
            self.refill_threshold = capacity;
        }
    }

    /// Sets the max_request_size. If the new max_request_size is greater
    /// than the current capacity, also updates capacity to match the new max_request_size.
    pub fn set_max_request_size(&mut self, max_request_size: usize) {
        self.max_request_size = max_request_size;
        if self.max_request_size > self.capacity {
            self.capacity = max_request_size;
        }
    }

    /// Sets the refill_threshold. If the new refill_threshold is greater than the current capacity,
    /// also updates capacity to match the new refill_threshold.
    pub fn set_refill_threshold(&mut self, refill_threshold: usize) {
        self.refill_threshold = refill_threshold;
        if self.refill_threshold > self.capacity {
            self.capacity = refill_threshold;
        }
    }
}

// ================================
// ===== Buffer trait =============
// ================================

/// Default timeout when acquiring the config lock. Critical sections are tiny CPU work
/// (a few `usize` reads/writes) and never held across `.await`, so any contention longer
/// than this indicates a logic bug or stuck thread.
pub const LOCK_TIMEOUT: Duration = Duration::from_millis(500);

/// Tries to lock the given `parking_lot::Mutex` for at most `LOCK_TIMEOUT`. Returns
/// `LockTimeout` on failure. `parking_lot::Mutex` does not poison, so this never panics.
#[inline]
pub fn try_lock_config(
    m: &Mutex<BufferConfig>,
) -> Result<MutexGuard<'_, BufferConfig>, CorrelatedStreamError> {
    m.try_lock_for(LOCK_TIMEOUT)
        .ok_or(CorrelatedStreamError::LockTimeout {
            timeout_ms: LOCK_TIMEOUT.as_millis() as u64,
        })
}

/// A buffer-backed entity exposing a shared `BufferConfig`.
///
/// Default-impl methods provide thread-safe getters and setters through an
/// `Arc<parking_lot::Mutex<_>>`, with a bounded timeout (`LOCK_TIMEOUT`) on acquisition.
/// Critical sections are tiny and never held across `.await`, so a sync mutex is sufficient.
#[blanket(derive(Arc, Ref, Mut))]
pub trait Buffer {
    /// The shared configuration handle.
    fn config(&self) -> &Arc<Mutex<BufferConfig>>;

    #[inline]
    fn capacity(&self) -> Result<usize, CorrelatedStreamError> {
        Ok(try_lock_config(self.config())?.capacity())
    }
    #[inline]
    fn max_request_size(&self) -> Result<usize, CorrelatedStreamError> {
        Ok(try_lock_config(self.config())?.max_request_size())
    }
    #[inline]
    fn refill_threshold(&self) -> Result<usize, CorrelatedStreamError> {
        Ok(try_lock_config(self.config())?.refill_threshold())
    }

    #[inline]
    fn set_capacity(&self, capacity: usize) -> Result<(), CorrelatedStreamError> {
        try_lock_config(self.config())?.set_capacity(capacity);
        Ok(())
    }
    #[inline]
    fn set_max_request_size(&self, n: usize) -> Result<(), CorrelatedStreamError> {
        try_lock_config(self.config())?.set_max_request_size(n);
        Ok(())
    }
    #[inline]
    fn set_refill_threshold(&self, n: usize) -> Result<(), CorrelatedStreamError> {
        try_lock_config(self.config())?.set_refill_threshold(n);
        Ok(())
    }
}