astra-core 0.1.0

Core datastore engine, Raft state machine, and control-plane semantics for Astra
Documentation
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Mutex as AsyncMutex;
use tokio::time::Instant;

use crate::metrics;

/// Identifies which budget an [`IoTokenBucket`] meters.
///
/// Within this file the lane only selects which `metrics` gauge and which
/// wait observation a bucket reports to; the token accounting itself is the
/// same for every lane.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IoBudgetLane {
    /// Reports via `metrics::set_bg_io_tokens_available` and
    /// `metrics::observe_bg_io_throttle_wait_ms`. This is the lane chosen by
    /// `IoTokenBucket::new`.
    Bytes,
    /// Reports via `metrics::set_bg_io_sqe_tokens_available` and
    /// `metrics::observe_bg_io_sqe_wait_ms`.
    // NOTE(review): "Sqe" presumably stands for io_uring submission-queue
    // entries — confirm against the callers.
    Sqe,
}

/// Handle to a token bucket used to throttle I/O.
///
/// The handle wraps its state in an [`Arc`], so cloning it yields another
/// handle to the same bucket rather than an independent copy.
#[derive(Debug, Clone)]
pub struct IoTokenBucket {
    /// Shared configuration and token balance.
    inner: Arc<IoTokenBucketInner>,
}

/// State shared by every clone of an [`IoTokenBucket`].
#[derive(Debug)]
struct IoTokenBucketInner {
    /// Selects which metrics this bucket reports to.
    lane: IoBudgetLane,
    /// Set once at construction; when `false`, `configure` and `acquire`
    /// return immediately without touching the bucket.
    enabled: bool,
    /// Refill rate in tokens per second. Stored atomically so `configure`
    /// can change it without taking the `state` lock.
    refill_tokens_per_sec: AtomicU64,
    /// Upper bound on the token balance. Stored atomically so `configure`
    /// can change it without taking the `state` lock.
    burst_tokens: AtomicU64,
    /// Token balance and refill timestamp, guarded by an async mutex.
    state: AsyncMutex<IoTokenBucketState>,
}

/// Mutable part of the bucket, only accessed while holding the `state` lock.
#[derive(Debug)]
struct IoTokenBucketState {
    /// Current token balance. Kept as a float because a refill adds
    /// `elapsed_seconds * rate`, which is generally fractional.
    tokens: f64,
    /// Instant up to which elapsed time has already been credited as tokens.
    last_refill: Instant,
}

impl IoTokenBucket {
    /// Shortest sleep `acquire` will schedule while waiting for a refill.
    const MIN_THROTTLE_WAIT_SECS: f64 = 0.001;

    /// Longest single sleep `acquire` will schedule.
    ///
    /// `Duration::from_secs_f64` panics when its argument overflows, which a
    /// near-`u64::MAX` burst combined with a refill rate of 1 can produce.
    /// Bounding one sleep is safe because `acquire` re-evaluates the balance
    /// after every sleep.
    const MAX_THROTTLE_WAIT_SECS: f64 = u32::MAX as f64;

    /// Creates a bucket on the [`IoBudgetLane::Bytes`] lane.
    ///
    /// The arguments are normalized exactly as in
    /// [`IoTokenBucket::new_for_lane`].
    pub fn new(enabled: bool, refill_tokens_per_sec: u64, burst_tokens: u64) -> Self {
        Self::new_for_lane(
            IoBudgetLane::Bytes,
            enabled,
            refill_tokens_per_sec,
            burst_tokens,
        )
    }

    /// Creates a bucket that reports to the metrics of `lane`.
    ///
    /// When `enabled` is true, both `refill_tokens_per_sec` and
    /// `burst_tokens` are raised to at least 1 and the bucket starts full
    /// (balance equal to the burst capacity). When `enabled` is false, both
    /// values are stored as 0 and the bucket never throttles.
    pub fn new_for_lane(
        lane: IoBudgetLane,
        enabled: bool,
        refill_tokens_per_sec: u64,
        burst_tokens: u64,
    ) -> Self {
        let (refill_tokens_per_sec, burst_tokens) = if enabled {
            (refill_tokens_per_sec.max(1), burst_tokens.max(1))
        } else {
            (0, 0)
        };
        let state = IoTokenBucketState {
            tokens: burst_tokens as f64,
            last_refill: Instant::now(),
        };
        Self {
            inner: Arc::new(IoTokenBucketInner {
                lane,
                enabled,
                refill_tokens_per_sec: AtomicU64::new(refill_tokens_per_sec),
                burst_tokens: AtomicU64::new(burst_tokens),
                state: AsyncMutex::new(state),
            }),
        }
    }

    /// Returns whether this bucket throttles at all.
    pub fn enabled(&self) -> bool {
        self.inner.enabled
    }

    /// Replaces the refill rate and burst capacity of an enabled bucket.
    ///
    /// Both values are raised to at least 1. The current balance is trimmed
    /// to the new burst capacity (and never left below zero), and the
    /// available-tokens gauge for this lane is updated. Does nothing when the
    /// bucket is disabled.
    pub async fn configure(&self, refill_tokens_per_sec: u64, burst_tokens: u64) {
        if !self.enabled() {
            return;
        }

        let refill_tokens_per_sec = refill_tokens_per_sec.max(1);
        let burst_tokens = burst_tokens.max(1);
        self.inner
            .refill_tokens_per_sec
            .store(refill_tokens_per_sec, Ordering::Relaxed);
        self.inner
            .burst_tokens
            .store(burst_tokens, Ordering::Relaxed);

        let mut guard = self.inner.state.lock().await;
        guard.tokens = guard.tokens.min(burst_tokens as f64).max(0.0);
        self.publish_tokens_available(guard.tokens);
    }

    /// Waits until `tokens` tokens are available, then deducts them.
    ///
    /// The request is raised to at least 1 token and limited to the bucket's
    /// current burst capacity, so an oversized request costs one full bucket
    /// instead of waiting forever. Returns immediately when the bucket is
    /// disabled.
    ///
    /// While waiting, the task sleeps for the time the current refill rate
    /// needs to cover the deficit (at least 1 ms per sleep) and records each
    /// sleep in this lane's wait metric.
    pub async fn acquire(&self, tokens: u64) {
        if !self.enabled() {
            return;
        }

        let requested = tokens.max(1) as f64;

        loop {
            let wait = {
                let mut guard = self.inner.state.lock().await;
                let refill_tokens_per_sec = self
                    .inner
                    .refill_tokens_per_sec
                    .load(Ordering::Relaxed)
                    .max(1) as f64;
                let burst_tokens = self.inner.burst_tokens.load(Ordering::Relaxed).max(1) as f64;

                // Clamp against the burst capacity on every pass: `configure`
                // may shrink it while this task is asleep, and the balance is
                // capped at the *current* capacity, so a stale, larger target
                // could never be reached.
                let needed = requested.min(burst_tokens);

                let now = Instant::now();
                let elapsed = now
                    .saturating_duration_since(guard.last_refill)
                    .as_secs_f64();
                if elapsed > 0.0 {
                    let refill = elapsed * refill_tokens_per_sec;
                    guard.tokens = (guard.tokens + refill).min(burst_tokens);
                    guard.last_refill = now;
                }

                if guard.tokens >= needed {
                    guard.tokens -= needed;
                    self.publish_tokens_available(guard.tokens);
                    return;
                }

                self.publish_tokens_available(guard.tokens);
                let deficit = needed - guard.tokens;
                let secs = (deficit / refill_tokens_per_sec)
                    .max(Self::MIN_THROTTLE_WAIT_SECS)
                    .min(Self::MAX_THROTTLE_WAIT_SECS);
                Duration::from_secs_f64(secs)
                // The lock is released here so other tasks can use the bucket
                // while this one sleeps.
            };

            self.observe_throttle_wait(wait);
            tokio::time::sleep(wait).await;
        }
    }

    /// Publishes the whole-token part of `tokens` to this lane's
    /// available-tokens gauge.
    fn publish_tokens_available(&self, tokens: f64) {
        // Float-to-int `as` casts saturate, so a negative balance reports 0.
        let whole_tokens = tokens.floor() as u64;
        match self.inner.lane {
            IoBudgetLane::Bytes => metrics::set_bg_io_tokens_available(whole_tokens),
            IoBudgetLane::Sqe => metrics::set_bg_io_sqe_tokens_available(whole_tokens),
        }
    }

    /// Records one throttle sleep, in milliseconds, in this lane's wait metric.
    fn observe_throttle_wait(&self, wait: Duration) {
        let wait_ms = wait.as_millis() as u64;
        match self.inner.lane {
            IoBudgetLane::Bytes => {
                metrics::observe_bg_io_throttle_wait_ms(wait_ms);
            }
            IoBudgetLane::Sqe => {
                metrics::observe_bg_io_sqe_wait_ms(wait_ms);
            }
        }
    }
}