astra-core 0.1.0

Core datastore engine, Raft state machine, and control-plane semantics for Astra
Documentation
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use parking_lot::RwLock;
use tokio::sync::broadcast;
use tracing::debug;

use crate::config::WatchBacklogMode;

#[derive(Debug, Clone)]
pub enum WatchEventKind {
    Put,
    Delete,
}

#[derive(Debug, Clone)]
pub struct WatchEvent {
    pub kind: WatchEventKind,
    pub key: Vec<u8>,
    pub value: Arc<[u8]>,
    pub prev_value: Arc<[u8]>,
    pub create_revision: i64,
    pub mod_revision: i64,
    pub version: i64,
    pub lease: i64,
}

#[derive(Debug, Clone)]
pub struct WatchFilter {
    pub key: Vec<u8>,
    pub range_end: Vec<u8>,
    pub start_revision: i64,
}

impl WatchFilter {
    pub fn matches(&self, candidate: &[u8]) -> bool {
        if self.range_end.is_empty() {
            return candidate == self.key.as_slice();
        }

        if self.key.is_empty() && self.range_end == [0] {
            return true;
        }

        candidate >= self.key.as_slice() && candidate < self.range_end.as_slice()
    }
}

pub struct WatchSubscription {
    pub backlog: Vec<Arc<WatchEvent>>,
    pub receiver: broadcast::Receiver<Arc<WatchEvent>>,
}

#[derive(Debug)]
pub struct WatchRing {
    ring: RwLock<VecDeque<Arc<WatchEvent>>>,
    ring_bytes: AtomicUsize,
    capacity: usize,
    tx: broadcast::Sender<Arc<WatchEvent>>,
    backlog_mode: WatchBacklogMode,
    dropped_no_subscriber: AtomicUsize,
}

impl WatchRing {
    pub fn new(
        ring_capacity: usize,
        broadcast_capacity: usize,
        backlog_mode: WatchBacklogMode,
    ) -> Self {
        let (tx, _rx) = broadcast::channel(broadcast_capacity.max(32));
        Self {
            ring: RwLock::new(VecDeque::with_capacity(ring_capacity)),
            ring_bytes: AtomicUsize::new(0),
            capacity: ring_capacity,
            tx,
            backlog_mode,
            dropped_no_subscriber: AtomicUsize::new(0),
        }
    }

    pub fn publish(&self, event: WatchEvent) {
        let event = Arc::new(event);
        let subscribers = self.tx.receiver_count();
        let keep_backlog = match self.backlog_mode {
            WatchBacklogMode::Strict => true,
            WatchBacklogMode::Relaxed => subscribers > 0,
        };

        if keep_backlog {
            let mut ring = self.ring.write();
            let mut ring_bytes = self.ring_bytes.load(Ordering::Relaxed);
            while ring.len() >= self.capacity && !ring.is_empty() {
                if let Some(ev) = ring.pop_front() {
                    ring_bytes = ring_bytes.saturating_sub(event_bytes(&ev));
                }
            }

            ring.push_back(event.clone());
            ring_bytes = ring_bytes.saturating_add(event_bytes(&event));
            self.ring_bytes.store(ring_bytes, Ordering::Relaxed);
        } else if subscribers == 0 {
            let dropped = self.dropped_no_subscriber.fetch_add(1, Ordering::Relaxed) + 1;
            if dropped % 4096 == 0 {
                debug!(
                    dropped,
                    ring_len = self.ring.read().len(),
                    ring_bytes = self.ring_bytes.load(Ordering::Relaxed),
                    "watch event dropped because no subscribers and backlog_mode=relaxed"
                );
            }
        }

        if subscribers > 0 {
            let _ = self.tx.send(event);
        }
    }

    pub fn subscribe(&self, filter: &WatchFilter) -> WatchSubscription {
        let backlog = {
            let ring = self.ring.read();
            ring.iter()
                .filter(|ev| {
                    ev.mod_revision >= filter.start_revision && filter.matches(ev.key.as_slice())
                })
                .cloned()
                .collect::<Vec<_>>()
        };

        WatchSubscription {
            backlog,
            receiver: self.tx.subscribe(),
        }
    }
}

fn event_bytes(event: &WatchEvent) -> usize {
    event
        .key
        .len()
        .saturating_add(event.value.len())
        .saturating_add(event.prev_value.len())
        .saturating_add(64)
}