use bytes::BytesMut;
use noxu_sync::RawMutex;
use noxu_sync::lock_api::RawMutex as RawMutexTrait;
use noxu_util::lsn::{Lsn, NULL_LSN};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::thread;
use std::time::Duration;
/// Byte buffer together with the LSN range that has been registered against it.
///
/// Access is coordinated through the shared [`LogBufferControl`]: callers take
/// the latch with `latch_for_write` / `wait_for_zero_and_latch` and drop it
/// with `release`.
pub struct LogBuffer {
    /// Bytes currently held by the buffer; `get_data` exposes them as a slice.
    data: BytesMut,
    /// First LSN registered since construction or the last `reinit`
    /// (`NULL_LSN` while nothing has been registered via `new`).
    first_lsn: Lsn,
    /// Most recently registered LSN; `register_lsn` requires each new LSN to
    /// be strictly greater than this one.
    last_lsn: Lsn,
    /// Upper bound on `data.len()` enforced by `has_room` / `allocate`.
    capacity: usize,
    /// Latch and pin counter, shared with every `LogBufferSegment` handed out.
    control: Arc<LogBufferControl>,
    /// Flag raised by `set_rewrite_allowed` and cleared again by `reinit`.
    rewrite_allowed: bool,
    /// Length of the prefix of `data` recorded as flushed by `mark_flushed`.
    flushed_len: usize,
}
/// Synchronisation state shared (through an `Arc`) between a `LogBuffer` and
/// the `LogBufferSegment`s it hands out.
struct LogBufferControl {
    /// Raw mutex locked by `LogBuffer::latch_for_write` and by
    /// `LogBufferSegment::put`.
    read_latch: RawMutex,
    /// Set to `true` right after `read_latch` is locked and cleared right
    /// before it is unlocked; `LogBuffer::release` consults it to decide
    /// whether an unlock is due.
    latch_held: AtomicBool,
    /// Number of outstanding write pins: incremented by `LogBuffer::allocate`,
    /// decremented by `LogBuffer::free` and `LogBufferSegment::put`.
    write_pin_count: AtomicI32,
}
impl LogBufferControl {
    /// Builds the control block in its idle state: the latch at its `INIT`
    /// value, the held-flag cleared and no write pins outstanding.
    fn new() -> Self {
        Self {
            read_latch: RawMutex::INIT,
            latch_held: AtomicBool::new(false),
            write_pin_count: AtomicI32::new(0),
        }
    }
}
impl LogBuffer {
    /// Creates an empty buffer that accepts at most `capacity` bytes.
    ///
    /// Both LSN bounds start as `NULL_LSN` until `register_lsn` is called.
    pub fn new(capacity: usize) -> Self {
        LogBuffer {
            data: BytesMut::with_capacity(capacity),
            first_lsn: NULL_LSN,
            last_lsn: NULL_LSN,
            capacity,
            control: Arc::new(LogBufferControl::new()),
            rewrite_allowed: false,
            flushed_len: 0,
        }
    }

    /// Wraps already-populated bytes.
    ///
    /// `first_lsn` is used as both the first and the last LSN, and the
    /// buffer's capacity is taken from `data.capacity()`. `contains_lsn` and
    /// `get_bytes` treat `first_lsn`'s file offset as the position of
    /// `data[0]`.
    pub fn wrap(data: BytesMut, first_lsn: Lsn) -> Self {
        let capacity = data.capacity();
        LogBuffer {
            data,
            first_lsn,
            last_lsn: first_lsn,
            capacity,
            control: Arc::new(LogBufferControl::new()),
            rewrite_allowed: false,
            flushed_len: 0,
        }
    }

    /// Resets the buffer for reuse while holding the latch.
    ///
    /// Clears the bytes, both LSN bounds, the rewrite flag and the flushed
    /// length, and forces the write-pin count back to zero.
    ///
    /// NOTE(review): the pin count is overwritten, not waited on, so callers
    /// presumably guarantee that no `LogBufferSegment` is still outstanding —
    /// confirm against the call sites.
    pub fn reinit(&mut self) {
        self.latch_for_write();
        self.data.clear();
        self.first_lsn = NULL_LSN;
        self.last_lsn = NULL_LSN;
        self.rewrite_allowed = false;
        self.control.write_pin_count.store(0, Ordering::Relaxed);
        self.flushed_len = 0;
        self.release();
    }

    /// Returns the bytes that follow the prefix recorded by `mark_flushed`.
    pub fn get_unflushed_data(&self) -> &[u8] {
        &self.data[self.flushed_len..]
    }

    /// Returns the file offset of the first LSN plus the flushed length, i.e.
    /// the file position of the first unflushed byte.
    pub fn flushed_file_offset(&self) -> u64 {
        u64::from(self.first_lsn.file_offset()) + self.flushed_len as u64
    }

    /// Records everything currently in the buffer as flushed.
    pub fn mark_flushed(&mut self) {
        self.flushed_len = self.data.len();
    }

    /// Returns the first LSN registered in this buffer (`NULL_LSN` if none).
    pub fn get_first_lsn(&self) -> Lsn {
        self.first_lsn
    }

    /// Records `lsn` as the newest LSN of the buffer; the very first
    /// registration also becomes the buffer's first LSN.
    ///
    /// # Panics
    ///
    /// Panics if the latch is not held, or if `lsn` is not strictly greater
    /// than the previously registered LSN.
    pub fn register_lsn(&mut self, lsn: Lsn) {
        assert!(
            self.control.latch_held.load(Ordering::Relaxed),
            "read_latch must be held"
        );
        if !self.last_lsn.is_null() {
            assert!(
                lsn > self.last_lsn,
                "lsn={:?} must be > last_lsn={:?}",
                lsn,
                self.last_lsn
            );
        }
        self.last_lsn = lsn;
        if self.first_lsn.is_null() {
            self.first_lsn = lsn;
        }
    }

    /// Returns `true` when `num_bytes` more bytes fit within `capacity`.
    pub fn has_room(&self, num_bytes: usize) -> bool {
        // Saturating: a length above `capacity` must read as "no room" rather
        // than underflow (panic in debug, huge remainder in release).
        num_bytes <= self.capacity.saturating_sub(self.data.len())
    }

    /// Returns every byte currently in the buffer.
    pub fn get_data(&self) -> &[u8] {
        &self.data
    }

    /// Returns the maximum number of bytes the buffer accepts.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Reports whether `lsn` falls inside the byte range held by this buffer.
    ///
    /// Waits for all write pins to drain and takes the latch before checking.
    /// When the result is `true` the latch is **still held** and the caller
    /// must call `release`; when it is `false` the latch has already been
    /// released.
    ///
    /// # Panics
    ///
    /// Panics if `lsn` is null.
    pub fn contains_lsn(&self, lsn: Lsn) -> bool {
        assert!(!lsn.is_null());
        self.wait_for_zero_and_latch();
        let same_file = !self.first_lsn.is_null()
            && self.first_lsn.file_number() == lsn.file_number();
        let found = same_file && {
            // Work in u64: the former `offset + len as u32` could truncate
            // the length and overflow the sum, panicking (debug) or wrapping
            // (release) while the latch was held.
            let start = u64::from(self.first_lsn.file_offset());
            let end = start + self.data.len() as u64;
            (start..end).contains(&u64::from(lsn.file_offset()))
        };
        if !found {
            self.release();
        }
        found
    }

    /// Locks the latch and records that it is held.
    pub fn latch_for_write(&self) {
        self.control.read_latch.lock();
        self.control.latch_held.store(true, Ordering::Relaxed);
    }

    /// Unlocks the latch if the held-flag is set; otherwise does nothing.
    ///
    /// NOTE(review): the flag is shared, so this does not verify that the
    /// calling thread is the one that locked the latch.
    pub fn release(&self) {
        if self.control.latch_held.swap(false, Ordering::Relaxed) {
            // SAFETY: the flag is only set immediately after `read_latch` has
            // been locked, so the mutex is locked at this point.
            unsafe {
                self.control.read_latch.unlock();
            }
        }
    }

    /// Returns whether `set_rewrite_allowed` has been called since
    /// construction or the last `reinit`.
    pub fn get_rewrite_allowed(&self) -> bool {
        self.rewrite_allowed
    }

    /// Raises the rewrite-allowed flag.
    pub fn set_rewrite_allowed(&mut self) {
        self.rewrite_allowed = true;
    }

    /// Reserves `size` zero-filled bytes at the end of the buffer.
    ///
    /// On success the write-pin count is incremented and a segment pointing
    /// at the reserved region is returned; the pin is dropped again by
    /// `LogBufferSegment::put` or by `free`. Returns `None` when the buffer
    /// has no room for `size` bytes.
    ///
    /// # Panics
    ///
    /// Panics if the latch is not held.
    pub fn allocate(&mut self, size: usize) -> Option<LogBufferSegment> {
        assert!(
            self.control.latch_held.load(Ordering::Relaxed),
            "read_latch must be held"
        );
        if !self.has_room(size) {
            return None;
        }
        let offset = self.data.len();
        self.data.resize(offset + size, 0);
        self.control.write_pin_count.fetch_add(1, Ordering::Relaxed);
        // SAFETY: after the resize `offset <= self.data.len()`, so the
        // resulting pointer stays within (or one past) the buffer's bytes.
        let data_ptr = unsafe { self.data.as_mut_ptr().add(offset) };
        Some(LogBufferSegment {
            data_ptr,
            control: Arc::clone(&self.control),
            size,
        })
    }

    /// Drops one write pin without writing anything (the counterpart of
    /// `allocate` for a segment that will never be `put`).
    pub fn free(&self) {
        self.control.write_pin_count.fetch_sub(1, Ordering::Release);
    }

    /// Blocks until no write pins are outstanding, then returns with the
    /// latch held.
    ///
    /// The pin count is re-checked after the latch is taken because a pin may
    /// be added between the first check and the lock.
    ///
    /// NOTE(review): a negative pin count (e.g. more `free`/`put` calls than
    /// `allocate` calls) satisfies neither exit condition, so this loop would
    /// never return.
    pub fn wait_for_zero_and_latch(&self) {
        loop {
            if self.control.write_pin_count.load(Ordering::Acquire) > 0 {
                thread::park_timeout(Duration::from_nanos(100));
                continue;
            }
            self.latch_for_write();
            if self.control.write_pin_count.load(Ordering::Acquire) == 0 {
                return;
            }
            self.release();
        }
    }

    /// Returns the buffer contents starting at `file_offset`, which is
    /// translated to a buffer index relative to the first LSN's file offset.
    ///
    /// # Panics
    ///
    /// Panics if `file_offset` lies before the first LSN's file offset or
    /// past the end of the buffered bytes.
    pub fn get_bytes(&self, file_offset: u32) -> &[u8] {
        // checked_sub: an unchecked subtraction wraps in release builds and
        // would index an unrelated (or out-of-range) position.
        let buffer_offset = file_offset
            .checked_sub(self.first_lsn.file_offset())
            .expect("file_offset precedes the first LSN of this buffer")
            as usize;
        &self.data[buffer_offset..]
    }
}
/// Handle to a region reserved by `LogBuffer::allocate`; it is filled in by
/// calling `put`.
pub struct LogBufferSegment {
    /// Start of the reserved region inside the originating buffer's bytes.
    data_ptr: *mut u8,
    /// Control block shared with the originating `LogBuffer`.
    control: Arc<LogBufferControl>,
    /// Length in bytes of the reserved region; `put` requires exactly this
    /// many bytes.
    size: usize,
}
// SAFETY (NOTE(review): confirm): the raw `data_ptr` is the only field that is
// not automatically `Send`. `put` dereferences it only while holding
// `control.read_latch`. Soundness additionally presumes that the originating
// buffer's storage is still alive and has not been reallocated when `put`
// runs — nothing in this type enforces that.
unsafe impl Send for LogBufferSegment {}
impl LogBufferSegment {
    /// Copies `data` into the reserved region under the latch, then drops
    /// this segment's write pin.
    ///
    /// # Panics
    ///
    /// Panics if `data.len()` differs from the size the segment was
    /// allocated with.
    pub fn put(&self, data: &[u8]) {
        assert_eq!(
            data.len(),
            self.size,
            "data size must match allocated segment size"
        );

        let ctl = &*self.control;

        ctl.read_latch.lock();
        ctl.latch_held.store(true, Ordering::Relaxed);

        // SAFETY: `data` is a valid slice of `data.len()` bytes, and the
        // assertion above ties that length to the `size` reserved at
        // `data_ptr`. NOTE(review): validity of `data_ptr` relies on the
        // originating buffer's storage still being alive — confirm callers
        // uphold this.
        unsafe {
            std::ptr::copy_nonoverlapping(data.as_ptr(), self.data_ptr, data.len());
        }

        ctl.latch_held.store(false, Ordering::Relaxed);
        // SAFETY: the latch was locked earlier in this call.
        unsafe {
            ctl.read_latch.unlock();
        }

        // Release pairs with the Acquire loads in `wait_for_zero_and_latch`.
        ctl.write_pin_count.fetch_sub(1, Ordering::Release);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh buffer reports its capacity, has no first LSN and is empty.
    #[test]
    fn test_new_buffer() {
        let buf = LogBuffer::new(1024);
        assert_eq!(buf.capacity(), 1024);
        assert!(buf.get_first_lsn().is_null());
        assert!(buf.has_room(1024));
    }

    /// Bytes written through a segment show up in the buffer's data.
    #[test]
    fn test_allocate_and_put() {
        let mut buf = LogBuffer::new(1024);

        buf.latch_for_write();
        let seg = buf.allocate(100).expect("should allocate");
        let payload = vec![42u8; 100];
        buf.release();

        seg.put(&payload);

        buf.latch_for_write();
        assert_eq!(buf.get_data()[0..100], payload[..]);
        buf.release();
    }

    /// A segment stays usable after the `LogBuffer` value is moved and boxed.
    #[test]
    fn test_segment_survives_buffer_move() {
        let mut buf = LogBuffer::new(1024);
        buf.latch_for_write();
        let seg = buf.allocate(64).expect("should allocate");
        buf.release();

        // Move the buffer twice: into a new binding, then onto the heap.
        let relocated = buf;
        let on_heap = Box::new(relocated);

        let payload = vec![7u8; 64];
        seg.put(&payload);

        on_heap.latch_for_write();
        assert_eq!(on_heap.get_data()[0..64], payload[..]);
        on_heap.release();
    }

    /// The first registered LSN is kept as `first_lsn` by later registrations.
    #[test]
    fn test_register_lsn() {
        let mut buf = LogBuffer::new(1024);
        buf.latch_for_write();

        let first = Lsn::new(0, 100);
        buf.register_lsn(first);
        assert_eq!(buf.get_first_lsn(), first);

        let second = Lsn::new(0, 200);
        buf.register_lsn(second);
        assert_eq!(buf.get_first_lsn(), first);

        buf.release();
    }

    /// `has_room` tracks the space remaining before and after an allocation.
    #[test]
    fn test_has_room() {
        let mut buf = LogBuffer::new(100);
        buf.latch_for_write();

        assert!(buf.has_room(100));
        assert!(buf.has_room(50));
        assert!(!buf.has_room(101));

        let _pinned = buf.allocate(50);
        assert!(buf.has_room(50));
        assert!(!buf.has_room(51));

        buf.release();
        // The segment is never written, so drop its pin by hand.
        buf.free();
    }

    /// `reinit` returns the buffer to an empty state with no first LSN.
    #[test]
    fn test_reinit() {
        let mut buf = LogBuffer::new(1024);
        buf.latch_for_write();
        let _pinned = buf.allocate(100);
        buf.register_lsn(Lsn::new(0, 100));
        buf.release();

        buf.free();
        buf.reinit();

        buf.latch_for_write();
        assert!(buf.get_first_lsn().is_null());
        assert_eq!(buf.get_data().len(), 0);
        assert!(buf.has_room(1024));
        buf.release();
    }

    /// `contains_lsn` accepts offsets inside the buffered range only, and
    /// leaves the latch held on a hit.
    #[test]
    fn test_contains_lsn() {
        let mut buf = LogBuffer::new(1024);
        buf.latch_for_write();
        let seg = buf.allocate(100).unwrap();
        let start = Lsn::new(5, 1000);
        buf.register_lsn(start);
        buf.release();
        seg.put(&[0u8; 100]);

        // Hits keep the latch, so each one is followed by a release.
        assert!(buf.contains_lsn(Lsn::new(5, 1000)));
        buf.release();
        assert!(buf.contains_lsn(Lsn::new(5, 1050)));
        buf.release();

        // Misses release the latch themselves.
        assert!(!buf.contains_lsn(Lsn::new(5, 1100)));
        assert!(!buf.contains_lsn(Lsn::new(6, 1000)));
    }

    /// `wrap` adopts the given LSN and the capacity of the wrapped bytes.
    #[test]
    fn test_wrap_constructor() {
        let mut bytes = bytes::BytesMut::with_capacity(256);
        bytes.resize(64, 0xAB);
        let start = Lsn::new(2, 400);

        let buf = LogBuffer::wrap(bytes, start);

        assert_eq!(buf.get_first_lsn(), start);
        assert_eq!(buf.capacity(), 256);
    }

    /// Two segments allocated back to back land in adjacent regions.
    #[test]
    fn test_multiple_allocations() {
        let mut buf = LogBuffer::new(1024);
        buf.latch_for_write();
        let first = buf.allocate(100).expect("first allocation");
        let second = buf.allocate(200).expect("second allocation");
        // 1024 - 100 - 200 = 724 bytes remain.
        assert!(!buf.has_room(725));
        assert!(buf.has_room(724));
        buf.release();

        first.put(&[1u8; 100]);
        second.put(&[2u8; 200]);

        buf.latch_for_write();
        let contents = buf.get_data();
        assert_eq!(&contents[0..100], &[1u8; 100]);
        assert_eq!(&contents[100..300], &[2u8; 200]);
        buf.release();
    }

    /// An allocation may consume the whole capacity.
    #[test]
    fn test_allocate_exactly_capacity() {
        let mut buf = LogBuffer::new(256);
        buf.latch_for_write();
        let seg = buf.allocate(256).expect("should fill exactly");
        assert!(!buf.has_room(1));
        buf.release();

        seg.put(&[0xCCu8; 256]);

        buf.latch_for_write();
        let contents = buf.get_data();
        assert_eq!(contents.len(), 256);
        assert!(contents.iter().all(|&byte| byte == 0xCC));
        buf.release();
    }

    /// A request larger than the capacity is refused and leaves the buffer
    /// untouched.
    #[test]
    fn test_allocate_too_large_returns_none() {
        let mut buf = LogBuffer::new(128);
        buf.latch_for_write();

        let outcome = buf.allocate(129);
        assert!(outcome.is_none());
        assert!(buf.has_room(128));

        buf.release();
    }

    /// `get_bytes` maps a file offset back to the bytes written at that LSN.
    #[test]
    fn test_get_bytes_after_write() {
        let mut buf = LogBuffer::new(512);
        buf.latch_for_write();
        let start = Lsn::new(7, 2000);
        let seg = buf.allocate(50).unwrap();
        buf.register_lsn(start);
        buf.release();

        seg.put(&[0xAAu8; 50]);

        buf.latch_for_write();
        let tail = buf.get_bytes(start.file_offset());
        assert_eq!(&tail[..50], &[0xAAu8; 50]);
        buf.release();
    }

    /// The rewrite flag starts cleared and is raised by its setter.
    #[test]
    fn test_rewrite_allowed_flag() {
        let mut buf = LogBuffer::new(64);
        assert!(!buf.get_rewrite_allowed());
        buf.set_rewrite_allowed();
        assert!(buf.get_rewrite_allowed());
    }

    /// Data written by another thread is visible once the pin count has
    /// dropped to zero and the latch has been taken.
    #[test]
    fn test_pin_count_release_acquire_ordering() {
        use std::sync::{Arc, Mutex};
        use std::thread;

        let shared = Arc::new(Mutex::new(LogBuffer::new(256)));

        // Reserve the region on this thread; the guard is dropped at the end
        // of the block.
        let seg = {
            let mut guard = shared.lock().unwrap();
            guard.latch_for_write();
            let reserved = guard.allocate(64).expect("must allocate 64 bytes");
            guard.release();
            reserved
        };

        let written_pattern = [0xABu8; 64];
        let writer = thread::spawn(move || {
            seg.put(&written_pattern);
        });

        {
            let guard = shared.lock().unwrap();
            guard.wait_for_zero_and_latch();
            let contents = guard.get_data();
            assert_eq!(
                &contents[..64],
                &[0xABu8; 64],
                "C-7: writer's data must be visible after pin_count reaches zero"
            );
            guard.release();
        }

        writer.join().unwrap();
    }
}