use crate::error::{LogError, Result};
use crate::file_manager::FileManager;
use crate::log_buffer::LogBuffer;
use noxu_latch::ExclusiveLatch;
use noxu_sync::Mutex;
use noxu_util::lsn::Lsn;
use std::sync::Arc;
/// A fixed set of [`LogBuffer`]s used in rotation for log writes.
///
/// One buffer is the current write buffer; buffers that were written to and
/// then rotated out form a "dirty" range (`dirty_start..=dirty_end`, wrapping
/// around the end of `buffers`) until `write_dirty` hands their data to the
/// [`FileManager`].
pub struct LogBufferPool {
/// The buffers, traversed as a ring (see `get_next_slot`).
buffers: Vec<Arc<Mutex<LogBuffer>>>,
/// Index of the first dirty buffer, or `-1` when no buffer is dirty.
dirty_start: i32,
/// Index of the last dirty buffer, or `-1` when no buffer is dirty.
dirty_end: i32,
/// Index into `buffers` of the buffer returned by `get_write_buffer`.
current_write_buffer_index: usize,
/// Buffer count passed to `new`; reported through `get_stats`.
num_buffers: usize,
/// Size passed to `LogBuffer::new` for every buffer; reported through
/// `get_stats` and `get_log_buffer_size`.
buffer_size: usize,
/// Latch acquired by `bump_current`, `write_dirty` and
/// `get_read_buffer_by_lsn` while they inspect or rotate the ring.
buffer_pool_latch: ExclusiveLatch,
/// Lookups for LSNs below this value are reported as misses without
/// scanning. Updated by `bump_current` from the first LSN of the slot that
/// follows the new write buffer, when that LSN is non-null.
min_buffer_lsn: Lsn,
/// Incremented on every call to `get_read_buffer_by_lsn`.
n_not_resident: u64,
/// Incremented whenever `get_read_buffer_by_lsn` returns `None`.
n_cache_miss: u64,
/// Incremented whenever `bump_current` cannot advance because the next
/// slot is the start of the dirty range.
n_no_free_buffer: u64,
/// Destination for dirty buffer data, and the target of
/// `sync_log_end_and_finish_file` on a file flip.
file_manager: Arc<FileManager>,
}
impl LogBufferPool {
pub fn new(
num_buffers: usize,
buffer_size: usize,
file_manager: Arc<FileManager>,
) -> Self {
let mut buffers = Vec::with_capacity(num_buffers);
for _ in 0..num_buffers {
buffers.push(Arc::new(Mutex::new(LogBuffer::new(buffer_size))));
}
LogBufferPool {
buffers,
dirty_start: -1,
dirty_end: -1,
current_write_buffer_index: 0,
num_buffers,
buffer_size,
buffer_pool_latch: ExclusiveLatch::named("LogBufferPool"),
min_buffer_lsn: Lsn::from_u64(0),
n_not_resident: 0,
n_cache_miss: 0,
n_no_free_buffer: 0,
file_manager,
}
}
/// Returns the `buffer_size` value this pool was constructed with.
pub fn get_log_buffer_size(&self) -> usize {
self.buffer_size
}
/// Returns the buffer that an entry of `size_needed` should be written into.
///
/// * When `flipped_file` is `true`, the current buffer is rotated out and the
///   dirty buffers are written regardless of remaining room, after which
///   `FileManager::sync_log_end_and_finish_file` is called.
/// * Otherwise the current buffer is reused if it has room. If it does not,
///   the pool first tries a plain rotation and only falls back to writing the
///   dirty buffers when the rotation fails or the buffer that is current
///   afterwards still lacks room.
///
/// # Errors
///
/// Propagates any error from rotating buffers, writing dirty data, or the
/// file manager sync.
pub fn get_write_buffer(
    &mut self,
    size_needed: usize,
    flipped_file: bool,
) -> Result<Arc<Mutex<LogBuffer>>> {
    if flipped_file {
        self.bump_and_write_dirty(size_needed, true)?;
        self.file_manager.sync_log_end_and_finish_file()?;
    } else {
        // The lock guard is a temporary and is gone by the end of the statement.
        let fits = self.buffers[self.current_write_buffer_index]
            .lock()
            .has_room(size_needed);
        if !fits {
            let rotated = self.bump_current(size_needed)?;
            // Only look at the (possibly new) current buffer when the rotation
            // itself reported success.
            let fits_after_rotation = rotated
                && self.buffers[self.current_write_buffer_index]
                    .lock()
                    .has_room(size_needed);
            if !fits_after_rotation {
                self.bump_and_write_dirty(size_needed, false)?;
            }
        }
    }
    let chosen = &self.buffers[self.current_write_buffer_index];
    Ok(Arc::clone(chosen))
}
/// Rotates the current write buffer out and writes all dirty buffers.
///
/// If the first rotation attempt fails, the dirty buffers are written to free
/// a slot and the rotation is retried once. A second failure is reported as
/// `LogError::Internal`. On success the dirty range (which now includes the
/// buffer just rotated out) is written before returning.
///
/// # Errors
///
/// Returns `LogError::Internal` when no buffer is free even after writing the
/// dirty range, and propagates errors from `bump_current` / `write_dirty`.
fn bump_and_write_dirty(
    &mut self,
    size_needed: usize,
    flush_write_queue: bool,
) -> Result<()> {
    let rotated_first_try = self.bump_current(size_needed)?;
    if !rotated_first_try {
        // No clean slot available: write out the dirty range, then retry.
        self.write_dirty(flush_write_queue)?;
        let rotated_after_write = self.bump_current(size_needed)?;
        if !rotated_after_write {
            let message = "No free log buffers after flushing dirty buffers";
            return Err(LogError::Internal(message.to_string()));
        }
    }
    self.write_dirty(flush_write_queue)
}
/// Tries to retire the current write buffer and advance to the next slot.
///
/// Returns:
/// * `Ok(true)` without advancing when the current buffer's first LSN is
///   null (nothing has been registered in it yet).
/// * `Ok(false)` when a dirty range exists and the next slot is its start,
///   i.e. there is no slot to advance into; `n_no_free_buffer` is incremented.
/// * `Ok(true)` after advancing: the old buffer becomes the end of the dirty
///   range (and its start if there was none), the next slot becomes the
///   current write buffer and is `reinit()`ed.
///
/// `_size_needed` is currently unused.
///
/// # Errors
///
/// Returns `LogError::LatchTimeout` if the pool latch cannot be acquired.
fn bump_current(&mut self, _size_needed: usize) -> Result<bool> {
// Hold the pool latch for the whole rotation.
let _guard = self
.buffer_pool_latch
.acquire()
.map_err(|e| LogError::LatchTimeout(e.to_string()))?;
let current = self.buffers[self.current_write_buffer_index].lock();
current.latch_for_write();
// An untouched buffer needs no rotation; report success as-is.
if current.get_first_lsn().is_null() {
current.release();
return Ok(true);
}
if self.dirty_start >= 0 {
// A dirty range exists: advancing must not run into its first slot.
let next_slot = self.get_next_slot(self.current_write_buffer_index);
if next_slot == self.dirty_start as usize {
self.n_no_free_buffer += 1;
current.release();
return Ok(false);
}
} else {
// First dirty buffer: the range starts at the buffer being retired.
// NOTE(review): with a single-buffer pool the next slot is this same
// slot, so the buffer just marked dirty is the one `reinit()` is called
// on below — confirm pools are always created with at least two buffers.
self.dirty_start = self.current_write_buffer_index as i32;
}
self.dirty_end = self.current_write_buffer_index as i32;
self.current_write_buffer_index =
self.get_next_slot(self.current_write_buffer_index);
let next_buffer_index = self.current_write_buffer_index;
// The slot after the new write buffer supplies the candidate minimum LSN.
let new_initial_buffer_index =
self.get_next_slot(self.current_write_buffer_index);
// Release the retired buffer before locking any other buffer.
current.release();
drop(current);
let mut next_to_use = self.buffers[next_buffer_index].lock();
next_to_use.reinit();
drop(next_to_use);
let new_initial_buffer = self.buffers[new_initial_buffer_index].lock();
let new_min_lsn = new_initial_buffer.get_first_lsn();
drop(new_initial_buffer);
// Keep the previous minimum when that slot has no first LSN.
if !new_min_lsn.is_null() {
self.min_buffer_lsn = new_min_lsn;
}
Ok(true)
}
/// Returns the slot that follows `slot_number` in the ring, wrapping to 0
/// after the last slot.
fn get_next_slot(&self, slot_number: usize) -> usize {
    let last_slot = self.buffers.len() - 1;
    if slot_number >= last_slot {
        return 0;
    }
    slot_number + 1
}
/// Writes the unflushed data of every buffer in the dirty range to the log
/// file and then clears the range.
///
/// Buffers are visited in ring order from `dirty_start` through `dirty_end`.
/// Buffers with a null first LSN or with no unflushed data are skipped.
///
/// A buffer is marked flushed only after `write_buffer_to_file` has succeeded,
/// and it stays locked and latched for the duration of that write. If the
/// write fails, the buffer still reports its data as unflushed and the dirty
/// range is left in place, so a later call can retry instead of silently
/// dropping the data.
///
/// `_flush_write_queue` is currently unused.
///
/// # Errors
///
/// Returns `LogError::LatchTimeout` if the pool latch cannot be acquired and
/// propagates any error from `FileManager::write_buffer_to_file`.
fn write_dirty(&mut self, _flush_write_queue: bool) -> Result<()> {
    let _guard = self
        .buffer_pool_latch
        .acquire()
        .map_err(|e| LogError::LatchTimeout(e.to_string()))?;
    if self.dirty_start < 0 {
        // Nothing is dirty.
        return Ok(());
    }
    let mut current_dirty = self.dirty_start as usize;
    loop {
        let is_last = current_dirty == self.dirty_end as usize;
        {
            let mut buffer = self.buffers[current_dirty].lock();
            buffer.wait_for_zero_and_latch();
            let first_lsn = buffer.get_first_lsn();
            // Snapshot what has to be written, if anything.
            let pending = if first_lsn.is_null() {
                None
            } else {
                let unflushed = buffer.get_unflushed_data();
                if unflushed.is_empty() {
                    None
                } else {
                    Some((buffer.flushed_file_offset(), unflushed.to_vec()))
                }
            };
            let outcome = match pending {
                Some((offset, data)) => {
                    let written = self.file_manager.write_buffer_to_file(
                        first_lsn.file_number(),
                        &data,
                        offset,
                    );
                    // Only a successful write may advance the flushed mark.
                    if written.is_ok() {
                        buffer.mark_flushed();
                    }
                    Some(written)
                }
                None => None,
            };
            // Always release the latch and lock before propagating an error.
            buffer.release();
            drop(buffer);
            if let Some(written) = outcome {
                written?;
            }
        }
        if is_last {
            break;
        }
        current_dirty = self.get_next_slot(current_dirty);
    }
    self.dirty_start = -1;
    self.dirty_end = -1;
    Ok(())
}
/// Looks for a pooled buffer that contains `lsn`.
///
/// Returns `Ok(Some(buffer))` for the first buffer whose `contains_lsn(lsn)`
/// is true, and `Ok(None)` when `lsn` is below `min_buffer_lsn` or no buffer
/// contains it.
///
/// Statistics: `n_not_resident` is incremented on every call and
/// `n_cache_miss` on every `None` result.
///
/// # Errors
///
/// Returns `LogError::LatchTimeout` if the pool latch cannot be acquired.
pub fn get_read_buffer_by_lsn(
    &mut self,
    lsn: Lsn,
) -> Result<Option<Arc<Mutex<LogBuffer>>>> {
    self.n_not_resident += 1;
    // Fast path: anything older than the pool's minimum cannot be resident.
    if lsn < self.min_buffer_lsn {
        self.n_cache_miss += 1;
        return Ok(None);
    }
    let _guard = self
        .buffer_pool_latch
        .acquire()
        .map_err(|e| LogError::LatchTimeout(e.to_string()))?;
    // Each buffer is locked only for the duration of its own check.
    let hit = self
        .buffers
        .iter()
        .find(|candidate| candidate.lock().contains_lsn(lsn))
        .map(Arc::clone);
    if hit.is_none() {
        self.n_cache_miss += 1;
    }
    Ok(hit)
}
/// Returns a new handle to every buffer in the pool, in slot order.
///
/// Only the `Arc`s are cloned; the returned handles share the pool's buffers.
pub fn get_all_buffers(&self) -> Vec<Arc<Mutex<LogBuffer>>> {
    self.buffers.iter().map(Arc::clone).collect()
}
/// Returns a snapshot of the pool's configuration and counters.
pub fn get_stats(&self) -> BufferPoolStats {
    let num_buffers = self.num_buffers;
    let buffer_size = self.buffer_size;
    let n_not_resident = self.n_not_resident;
    let n_cache_miss = self.n_cache_miss;
    let n_no_free_buffer = self.n_no_free_buffer;
    BufferPoolStats {
        num_buffers,
        buffer_size,
        n_not_resident,
        n_cache_miss,
        n_no_free_buffer,
    }
}
}
/// Point-in-time snapshot of a log buffer pool's configuration and counters.
///
/// Derives `Default`, `PartialEq` and `Eq` in addition to `Debug` and `Clone`
/// so snapshots can be compared directly (e.g. before/after an operation) and
/// a zeroed snapshot can be created without naming every field.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BufferPoolStats {
    /// Number of buffers the pool was configured with.
    pub num_buffers: usize,
    /// Size each buffer was configured with.
    pub buffer_size: usize,
    /// Number of read-buffer lookups performed.
    pub n_not_resident: u64,
    /// Number of read-buffer lookups that found no buffer.
    pub n_cache_miss: u64,
    /// Number of times the write buffer could not be rotated because no
    /// slot was free.
    pub n_no_free_buffer: u64,
}
// Unit tests for `LogBufferPool`. Each test builds its pool on top of a
// `FileManager` rooted in a fresh temporary directory.
#[cfg(test)]
mod tests {
use super::*;
use crate::file_manager::FileManager;
use tempfile::TempDir;
// Builds a pool backed by a `FileManager` in a new temp directory. The
// `TempDir` is returned so the directory outlives the pool under test.
fn make_pool(
num_buffers: usize,
buffer_size: usize,
) -> (LogBufferPool, TempDir) {
let dir = TempDir::new().unwrap();
let fm = Arc::new(
FileManager::new(dir.path(), false, 100_000_000, 10).unwrap(),
);
let pool = LogBufferPool::new(num_buffers, buffer_size, fm);
(pool, dir)
}
// Constructor stores the requested buffer count and size.
#[test]
fn test_new_pool() {
let (pool, _dir) = make_pool(3, 1024);
assert_eq!(pool.get_log_buffer_size(), 1024);
assert_eq!(pool.num_buffers, 3);
}
// A fresh pool hands out a buffer with room for a small request.
#[test]
fn test_get_write_buffer() {
let (mut pool, _dir) = make_pool(3, 1024);
let buffer =
pool.get_write_buffer(100, false).expect("get_write_buffer");
let buf = buffer.lock();
assert!(buf.has_room(100));
}
// After 50 of 100 are allocated in the first buffer, a request for 60 must
// still yield a buffer with enough room.
#[test]
fn test_buffer_cycling() {
let (mut pool, _dir) = make_pool(3, 100);
{
let buffer =
pool.get_write_buffer(50, false).expect("get_write_buffer");
let mut buf = buffer.lock();
buf.latch_for_write();
// Registering an LSN makes the buffer's first LSN non-null, which is what
// lets `bump_current` rotate it out.
buf.register_lsn(Lsn::new(0, 0));
buf.allocate(50);
buf.release();
}
{
let buffer =
pool.get_write_buffer(60, false).expect("get_write_buffer");
let buf = buffer.lock();
assert!(buf.has_room(60));
}
}
// Slot arithmetic wraps from the last slot back to 0.
#[test]
fn test_get_next_slot() {
let (pool, _dir) = make_pool(3, 1024);
assert_eq!(pool.get_next_slot(0), 1);
assert_eq!(pool.get_next_slot(1), 2);
assert_eq!(pool.get_next_slot(2), 0); }
// A new pool reports its configuration and zeroed counters.
#[test]
fn test_stats_initial() {
let (pool, _dir) = make_pool(3, 1024);
let stats = pool.get_stats();
assert_eq!(stats.num_buffers, 3);
assert_eq!(stats.buffer_size, 1024);
assert_eq!(stats.n_not_resident, 0);
assert_eq!(stats.n_cache_miss, 0);
assert_eq!(stats.n_no_free_buffer, 0);
}
// Looking up an LSN that no buffer holds returns `None` and counts one miss.
// NOTE(review): the pool's `min_buffer_lsn` starts at `Lsn::from_u64(0)`, so
// this LSN is presumably not below the minimum and the miss comes from the
// buffer scan rather than the early return the test name suggests — confirm.
#[test]
fn test_read_buffer_lsn_below_min_is_miss() {
let (mut pool, _dir) = make_pool(3, 1024);
let lsn = Lsn::new(99, 5000);
let result =
pool.get_read_buffer_by_lsn(lsn).expect("get_read_buffer_by_lsn");
assert!(result.is_none());
assert_eq!(pool.get_stats().n_cache_miss, 1);
}
// A request for half of the buffer size fits in a fresh buffer.
#[test]
fn test_write_buffer_has_enough_space() {
let (mut pool, _dir) = make_pool(3, 512);
let buf = pool.get_write_buffer(256, false).expect("get_write_buffer");
let inner = buf.lock();
assert!(inner.has_room(256));
}
// Slot arithmetic also wraps correctly for a two-buffer pool.
#[test]
fn test_two_buffers_pool_wraps_around() {
let (pool, _dir) = make_pool(2, 64);
assert_eq!(pool.get_next_slot(0), 1);
assert_eq!(pool.get_next_slot(1), 0);
}
// End-to-end check through `LogManager`: logs 20 one-byte entries through a
// manager created with the arguments (3, 64, 4096), flushes, and reads the
// first five back to verify their payloads.
#[test]
fn test_write_dirty_drains_ring_no_panic() {
use crate::entry_type::LogEntryType;
use crate::log_manager::LogManager;
use crate::provisional::Provisional;
let dir = TempDir::new().unwrap();
let fm = Arc::new(
FileManager::new(dir.path(), false, 100_000_000, 10).unwrap(),
);
let lm = LogManager::new(Arc::clone(&fm), 3, 64, 4096);
let mut lsns = Vec::new();
for i in 0u8..20 {
let lsn = lm
.log(LogEntryType::Trace, &[i], Provisional::No, false, false)
.expect("log() must not panic (DRIFT-2 fix)");
lsns.push((lsn, i));
}
lm.flush_no_sync().expect("flush_no_sync");
for (lsn, expected) in &lsns[..5] {
let (_, payload) = lm.read_entry(*lsn).expect("read_entry");
assert_eq!(payload, &[*expected], "payload mismatch at {lsn:?}");
}
}
}