quill-sql 0.2.1

An educational Rust relational database (RDBMS) inspired by CMU 15445
Documentation
use crate::recovery::wal::Lsn;
use crate::utils::ring_buffer::ConcurrentRingBuffer;

use super::record::WalRecord;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

#[derive(Debug)]
pub struct WalBuffer {
    queue: ConcurrentRingBuffer<WalRecord>,
    len: AtomicUsize,
    bytes: AtomicUsize,
    last_enqueued_end: AtomicU64,
}

impl WalBuffer {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            queue: ConcurrentRingBuffer::with_capacity(capacity.max(1)),
            len: AtomicUsize::new(0),
            bytes: AtomicUsize::new(0),
            last_enqueued_end: AtomicU64::new(0),
        }
    }

    pub fn push(&self, record: WalRecord) {
        let encoded_len = record.encoded_len() as usize;
        let end_lsn = record.end_lsn;
        let mut pending = record;
        loop {
            match self.queue.try_push(pending) {
                Ok(()) => break,
                Err(returned) => {
                    pending = returned;
                    std::hint::spin_loop();
                }
            }
        }
        self.len.fetch_add(1, Ordering::Release);
        self.bytes.fetch_add(encoded_len, Ordering::Release);
        self.last_enqueued_end.store(end_lsn, Ordering::Release);
    }

    #[inline]
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Acquire)
    }

    #[inline]
    pub fn bytes(&self) -> usize {
        self.bytes.load(Ordering::Acquire)
    }

    #[inline]
    pub fn highest_end_lsn(&self) -> Lsn {
        self.last_enqueued_end.load(Ordering::Acquire)
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len.load(Ordering::Acquire) == 0
    }

    pub fn drain_until(&self, upto: Lsn) -> (Vec<WalRecord>, usize) {
        let mut drained = Vec::new();
        let mut released = 0usize;
        loop {
            let Some(front) = self.queue.peek_clone() else {
                break;
            };
            if front.end_lsn > upto {
                break;
            }
            if let Some(record) = self.queue.pop() {
                released += record.encoded_len() as usize;
                drained.push(record);
            } else {
                break;
            }
        }
        if !drained.is_empty() {
            self.len.fetch_sub(drained.len(), Ordering::Release);
            self.bytes.fetch_sub(released, Ordering::Release);
        }
        (drained, released)
    }

    pub fn pending(&self) -> Vec<WalRecord> {
        self.queue.snapshot()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    fn make_record(start: Lsn, len: usize) -> WalRecord {
        WalRecord {
            start_lsn: start,
            end_lsn: start + len as u64,
            payload: Bytes::from(vec![0u8; len]),
        }
    }

    #[test]
    fn push_updates_length_and_bytes() {
        let buffer = WalBuffer::with_capacity(8);
        buffer.push(make_record(0, 16));
        buffer.push(make_record(16, 32));

        assert_eq!(buffer.len(), 2);
        assert_eq!(buffer.bytes(), 48);
        assert_eq!(buffer.highest_end_lsn(), 48);
    }

    #[test]
    fn drain_until_releases_records_and_bytes() {
        let buffer = WalBuffer::with_capacity(8);
        buffer.push(make_record(0, 10));
        buffer.push(make_record(10, 20));
        buffer.push(make_record(30, 5));

        let (drained, released) = buffer.drain_until(30);
        assert_eq!(drained.len(), 2);
        assert_eq!(released, 30);
        assert_eq!(buffer.len(), 1);
        assert_eq!(buffer.bytes(), 5);
        assert_eq!(buffer.highest_end_lsn(), 35);
    }
}