use std::{
sync::Arc,
thread,
};
use bytes::{
Bytes,
BytesMut,
};
use crossbeam_channel::{
Receiver,
Sender,
bounded,
};
use crate::{
block::{
BLOCK_SIZE,
Block,
},
io::buffer_pool::{
BufferPool,
PooledBuffer,
},
segment::BlockType,
segment_reader::SegmentReader,
utils::Deserializer,
};
/// A block that a worker thread read and decoded, tagged with its origin.
pub struct ReadResult {
    /// Index of the block within its handle; the worker reads it from byte
    /// offset `block_index * BLOCK_SIZE`.
    pub block_index: usize,
    /// Whether the block came from the segment's key handle or value handle.
    pub block_type: BlockType,
    /// The decoded block contents.
    pub block: crate::block::ReadOnlyBlock,
}
/// Tuning knobs for [`ParallelReader`].
#[derive(Clone)]
pub struct ParallelReaderConfig {
    /// Number of worker threads spawned by [`ParallelReader::new`].
    pub num_threads: usize,
    /// Capacity shared by the task queue and the result queue.
    pub queue_size: usize,
    /// Pool cloned into every worker (the workers do not currently use it) and
    /// reported by [`ParallelReader::buffer_pool_stats`].
    pub buffer_pool: BufferPool,
}
impl ParallelReaderConfig {
    /// Builds a config with one worker per available CPU (4 when that cannot be
    /// queried), queues of 128 entries and a fresh `BufferPool::new()`.
    pub fn new() -> Self {
        let detected = std::thread::available_parallelism();
        Self {
            num_threads: detected.map_or(4, |count| count.get()),
            queue_size: 128,
            buffer_pool: BufferPool::new(),
        }
    }

    /// Sets the worker count; values below 1 are raised to 1.
    pub fn with_threads(self, threads: usize) -> Self {
        Self {
            num_threads: std::cmp::max(threads, 1),
            ..self
        }
    }

    /// Sets the capacity of both queues; values below 1 are raised to 1.
    pub fn with_queue_size(self, size: usize) -> Self {
        Self {
            queue_size: std::cmp::max(size, 1),
            ..self
        }
    }

    /// Replaces the buffer pool.
    pub fn with_buffer_pool(self, pool: BufferPool) -> Self {
        Self {
            buffer_pool: pool,
            ..self
        }
    }
}
impl Default for ParallelReaderConfig {
fn default() -> Self {
Self::new()
}
}
/// One unit of work for a worker: read block `block_index` of the given kind
/// from `reader`.
struct ReadTask {
    /// Segment to read from; shared via `Arc` so many tasks can reference it.
    reader: Arc<SegmentReader>,
    /// Selects the key handle or the value handle of `reader`.
    block_type: BlockType,
    /// Index of the block within the selected handle.
    block_index: usize,
}
pub struct ParallelReader {
workers: Option<Vec<thread::JoinHandle<()>>>,
task_sender: Sender<Option<ReadTask>>,
result_receiver: Receiver<ReadResult>,
config: ParallelReaderConfig,
}
impl ParallelReader {
pub fn new(config: ParallelReaderConfig) -> Self {
let (task_sender, task_receiver) = bounded::<Option<ReadTask>>(config.queue_size);
let (result_sender, result_receiver) = bounded::<ReadResult>(config.queue_size);
let mut workers = Vec::with_capacity(config.num_threads);
for worker_id in 0..config.num_threads {
let task_rx = task_receiver.clone();
let result_tx = result_sender.clone();
let buffer_pool = config.buffer_pool.clone();
let worker = thread::Builder::new()
.name(format!("parallel-reader-{}", worker_id))
.spawn(move || {
Self::worker_loop(task_rx, result_tx, buffer_pool);
})
.expect("failed to spawn reader thread");
workers.push(worker);
}
Self {
workers: Some(workers),
task_sender,
result_receiver,
config,
}
}
fn worker_loop(
task_rx: Receiver<Option<ReadTask>>,
result_tx: Sender<ReadResult>,
_buffer_pool: BufferPool,
) {
while let Ok(Some(task)) = task_rx.recv() {
let offset = task.block_index * BLOCK_SIZE;
let handle = match task.block_type {
| BlockType::Key => Arc::clone(task.reader.key_handle()),
| BlockType::Value => Arc::clone(task.reader.val_handle()),
};
if offset + BLOCK_SIZE > handle.len() {
continue;
}
let bytes = handle
.read_range(offset..offset + BLOCK_SIZE, |slice| {
Bytes::copy_from_slice(slice)
})
.ok();
if bytes.is_none() {
continue;
}
let block = crate::block::ReadOnlyBlock::deserialize(bytes.unwrap());
let result = ReadResult {
block_index: task.block_index,
block,
block_type: task.block_type,
};
if result_tx.send(result).is_err() {
break;
}
}
}
pub fn read_block(
&self,
reader: Arc<SegmentReader>,
block_index: usize,
block_type: BlockType,
) -> bool {
let task = ReadTask {
reader,
block_index,
block_type,
};
self.task_sender.send(Some(task)).is_ok()
}
pub fn try_recv(&self) -> Option<ReadResult> {
self.result_receiver.try_recv().ok()
}
pub fn recv(&self) -> Option<ReadResult> {
self.result_receiver.recv().ok()
}
pub fn pending_tasks(&self) -> usize {
self.task_sender.len()
}
pub fn available_results(&self) -> usize {
self.result_receiver.len()
}
pub fn buffer_pool_stats(&self) -> crate::io::buffer_pool::BufferPoolStats {
self.config.buffer_pool.stats()
}
pub fn shutdown(mut self) {
for _ in 0..self.config.num_threads {
let _ = self.task_sender.send(None);
}
if let Some(workers) = self.workers.take() {
for worker in workers {
let _ = worker.join();
}
}
}
}
impl Drop for ParallelReader {
fn drop(&mut self) {
for _ in 0..self.config.num_threads {
let _ = self.task_sender.send(None);
}
}
}
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;
    use crate::segment_builder::SegmentBuilder;

    /// Creates segment `1` in a fresh temporary directory and opens a reader on it.
    ///
    /// The `TempDir` is returned with the reader because dropping it deletes the
    /// directory. Callers must keep it alive (bind it to a named variable, not
    /// `_`) for as long as the reader is used.
    fn create_test_reader() -> (TempDir, Arc<SegmentReader>) {
        let temp_dir = TempDir::new().unwrap();
        let builder = SegmentBuilder::new(temp_dir.path().to_path_buf()).unwrap();
        let segment_id = 1;
        // Release the handle returned by `new_segment` before reopening the
        // segment through `open`.
        let created = builder
            .new_segment(segment_id, 12345, 64 * 1024 * 1024)
            .unwrap();
        drop(created);
        let segment = builder.open(segment_id).unwrap();
        let reader = Arc::new(segment.reader().unwrap());
        (temp_dir, reader)
    }

    #[test]
    fn test_reader_creation() {
        let config = ParallelReaderConfig::new();
        let reader = ParallelReader::new(config);
        assert_eq!(reader.pending_tasks(), 0);
        assert_eq!(reader.available_results(), 0);
        reader.shutdown();
    }

    #[test]
    fn test_config_builder() {
        let config = ParallelReaderConfig::new()
            .with_threads(4)
            .with_queue_size(256);
        assert_eq!(config.num_threads, 4);
        assert_eq!(config.queue_size, 256);
    }

    #[test]
    fn test_config_builder_clamps_to_one() {
        let config = ParallelReaderConfig::new()
            .with_threads(0)
            .with_queue_size(0);
        assert_eq!(config.num_threads, 1);
        assert_eq!(config.queue_size, 1);
    }

    #[test]
    fn test_read_task_submission() {
        let config = ParallelReaderConfig::new().with_queue_size(10);
        let parallel_reader = ParallelReader::new(config);
        let (_dir, reader) = create_test_reader();
        assert!(parallel_reader.read_block(reader, 0, BlockType::Key));
        parallel_reader.shutdown();
    }

    #[test]
    fn test_parallel_reads() {
        let config = ParallelReaderConfig::new()
            .with_threads(2)
            .with_queue_size(20);
        let parallel_reader = ParallelReader::new(config);
        let (_dir, reader) = create_test_reader();
        for i in 0..10 {
            assert!(parallel_reader.read_block(reader.clone(), i, BlockType::Key));
        }
        let mut results = Vec::new();
        for _ in 0..10 {
            match parallel_reader.recv() {
                Some(result) => results.push(result),
                None => break,
            }
        }
        assert_eq!(results.len(), 10);
        parallel_reader.shutdown();
    }

    #[test]
    fn test_buffer_pool_integration() {
        let pool = BufferPool::with_config(4096, 50);
        let config = ParallelReaderConfig::new().with_buffer_pool(pool.clone());
        let parallel_reader = ParallelReader::new(config);
        let (_dir, reader) = create_test_reader();
        for i in 0..5 {
            assert!(parallel_reader.read_block(reader.clone(), i, BlockType::Key));
        }
        for _ in 0..5 {
            let _ = parallel_reader.recv();
        }
        let _stats = parallel_reader.buffer_pool_stats();
        parallel_reader.shutdown();
    }

    #[test]
    fn test_shutdown() {
        let config = ParallelReaderConfig::new();
        let reader = ParallelReader::new(config);
        reader.shutdown();
    }
}