#[cfg(feature = "std")]
use crate::{
CompressionType, TableId,
table::block::{Block, BlockIdentity, BlockTransform, BlockType, PreparedBlock},
};
#[cfg(not(feature = "std"))]
use alloc::boxed::Box;
#[cfg(feature = "std")]
use std::{
collections::BTreeMap,
sync::{Arc, Condvar, Mutex, PoisonError},
};
#[cfg(all(feature = "std", zstd_any))]
use crate::compression::ZstdDictionary;
#[cfg(feature = "std")]
use crate::encryption::EncryptionProvider;
/// Executes background work, such as the block-preparation tasks queued by
/// `BlockCompressor::submit`.
///
/// Implementations decide where and when a task runs: inline on the calling
/// thread, on a thread pool, or deferred until later. Every task handed to
/// `spawn` should eventually be run, because callers such as
/// `BlockCompressor::take_next` wait for the outcome of each task they submit.
pub trait CompactionSpawner: Sync + Send {
    /// Schedules `task` for execution.
    fn spawn(&self, task: Box<dyn FnOnce() + Send + 'static>);
}
/// [`CompactionSpawner`] that runs tasks on a rayon thread pool.
#[cfg(feature = "parallel")]
pub struct RayonSpawner {
    pool: Arc<rayon::ThreadPool>,
}

#[cfg(feature = "parallel")]
impl RayonSpawner {
    /// Builds a dedicated rayon pool with `threads` workers, named
    /// `lsm-compress-<index>`.
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::Io`] carrying the builder's error message when
    /// rayon fails to construct the pool.
    pub fn with_threads(threads: usize) -> crate::Result<Self> {
        let built = rayon::ThreadPoolBuilder::new()
            .num_threads(threads)
            .thread_name(|i| format!("lsm-compress-{i}"))
            .build();
        match built {
            Ok(pool) => Ok(Self::from_pool(Arc::new(pool))),
            Err(err) => Err(crate::Error::Io(crate::io::Error::other(err.to_string()))),
        }
    }

    /// Wraps an existing pool; the `Arc` lets several owners share one pool.
    #[must_use]
    pub fn from_pool(pool: Arc<rayon::ThreadPool>) -> Self {
        Self { pool }
    }
}

#[cfg(feature = "parallel")]
impl CompactionSpawner for RayonSpawner {
    /// Hands `task` to the wrapped pool via `rayon::ThreadPool::spawn`.
    fn spawn(&self, task: Box<dyn FnOnce() + Send + 'static>) {
        let pool: &rayon::ThreadPool = &self.pool;
        pool.spawn(task);
    }
}
/// Reorder buffer plus wake-up signal shared between a `BlockCompressor` and
/// the tasks it spawns.
#[cfg(feature = "std")]
struct Shared {
    /// Notified after a result has been inserted into `ready`.
    woke: Condvar,
    /// Finished results keyed by submission sequence number, waiting to be
    /// removed in order by `BlockCompressor::take_next`.
    ready: Mutex<BTreeMap<u64, crate::Result<PreparedBlock<'static>>>>,
}
/// Prepares data blocks on a [`CompactionSpawner`] and hands them back in the
/// order they were submitted, regardless of the order in which the spawned
/// tasks finish.
#[cfg(feature = "std")]
pub struct BlockCompressor {
    /// Runs one preparation task per submitted block.
    spawner: Arc<dyn CompactionSpawner>,
    /// Reorder buffer and condvar shared with in-flight tasks.
    shared: Arc<Shared>,
    /// Table id stamped into every prepared block's identity.
    table_id: TableId,
    /// Compression applied to every submitted block.
    compression: CompressionType,
    /// Optional encryption applied to every submitted block.
    encryption: Option<Arc<dyn EncryptionProvider>>,
    /// Optional zstd dictionary passed to the block transform.
    #[cfg(zstd_any)]
    zstd_dict: Option<Arc<ZstdDictionary>>,
    /// Optional ECC parameters added to the block transform.
    ecc: Option<crate::table::block::EccParams>,
    /// Sequence number that the next `submit` call will assign.
    next_submit: u64,
    /// Sequence number of the next block `take_next` will return.
    next_drain: u64,
}
#[cfg(feature = "std")]
impl BlockCompressor {
    /// Creates a compressor that prepares data blocks for `table_id` on `spawner`.
    ///
    /// Every submitted block is prepared with the same `compression`, optional
    /// `encryption`, optional zstd dictionary (when built with zstd support) and
    /// optional ECC parameters.
    #[must_use]
    pub fn new(
        spawner: Arc<dyn CompactionSpawner>,
        table_id: TableId,
        compression: CompressionType,
        encryption: Option<Arc<dyn EncryptionProvider>>,
        #[cfg(zstd_any)] zstd_dict: Option<Arc<ZstdDictionary>>,
        ecc: Option<crate::table::block::EccParams>,
    ) -> Self {
        Self {
            spawner,
            shared: Arc::new(Shared {
                ready: Mutex::new(BTreeMap::new()),
                woke: Condvar::new(),
            }),
            table_id,
            compression,
            encryption,
            #[cfg(zstd_any)]
            zstd_dict,
            ecc,
            next_submit: 0,
            next_drain: 0,
        }
    }

    /// Returns how many submitted blocks have not yet been returned by
    /// [`Self::take_next`].
    ///
    /// Saturates at `usize::MAX` if the count does not fit in `usize`.
    #[must_use]
    pub fn pending(&self) -> usize {
        usize::try_from(self.next_submit - self.next_drain).unwrap_or(usize::MAX)
    }

    /// Queues `encoded` for preparation on the spawner.
    ///
    /// Each block gets the next sequence number. [`Self::take_next`] returns
    /// blocks in that order no matter which task finishes first. `extra_flags`
    /// is forwarded unchanged to `Block::prepare_with_flags`.
    pub fn submit(&mut self, encoded: Vec<u8>, extra_flags: u8) {
        let seq = self.next_submit;
        self.next_submit += 1;
        // The task owns this slot. If the task panics, or the spawner drops
        // it without running it, the slot's `Drop` publishes an error for
        // `seq`, so `take_next` is woken instead of waiting forever.
        let slot = CompletionSlot::new(Arc::clone(&self.shared), seq);
        let table_id = self.table_id;
        let compression = self.compression;
        let encryption = self.encryption.clone();
        #[cfg(zstd_any)]
        let zstd_dict = self.zstd_dict.clone();
        let ecc = self.ecc;
        self.spawner.spawn(Box::new(move || {
            let result = prepare_owned(
                &encoded,
                table_id,
                compression,
                encryption.as_deref(),
                #[cfg(zstd_any)]
                zstd_dict.as_deref(),
                ecc,
                extra_flags,
            );
            slot.deliver(result);
        }));
    }

    /// Returns the next block in submission order, waiting for it if necessary.
    ///
    /// Returns `None` when every submitted block has already been taken.
    /// Otherwise it blocks the calling thread until the oldest outstanding
    /// block's task has published its outcome. A task that panicked, or was
    /// dropped by the spawner without running, yields `Some(Err(_))`.
    pub fn take_next(&mut self) -> Option<crate::Result<PreparedBlock<'static>>> {
        if self.next_drain == self.next_submit {
            return None;
        }
        let seq = self.next_drain;
        let mut ready = self
            .shared
            .ready
            .lock()
            .unwrap_or_else(PoisonError::into_inner);
        // Loop because other sequence numbers may complete first; spurious
        // wakeups are handled the same way.
        loop {
            if let Some(result) = ready.remove(&seq) {
                self.next_drain += 1;
                return Some(result);
            }
            ready = self
                .shared
                .woke
                .wait(ready)
                .unwrap_or_else(PoisonError::into_inner);
        }
    }
}

/// One-shot handle through which a spawned task publishes the result for its
/// sequence number.
///
/// If the slot is dropped before [`CompletionSlot::deliver`] was called (the
/// task unwound, or the spawner discarded it unrun), `Drop` publishes an error
/// instead. This guarantees every sequence number eventually gets an entry.
#[cfg(feature = "std")]
struct CompletionSlot {
    shared: Arc<Shared>,
    seq: u64,
    delivered: bool,
}

#[cfg(feature = "std")]
impl CompletionSlot {
    fn new(shared: Arc<Shared>, seq: u64) -> Self {
        Self {
            shared,
            seq,
            delivered: false,
        }
    }

    /// Publishes `result` for this slot's sequence number and consumes the slot.
    fn deliver(mut self, result: crate::Result<PreparedBlock<'static>>) {
        self.publish(result);
    }

    /// Inserts `result` into the reorder buffer and wakes the waiter.
    fn publish(&mut self, result: crate::Result<PreparedBlock<'static>>) {
        // Mark first so `Drop` never publishes a second value for this seq.
        self.delivered = true;
        let mut ready = self
            .shared
            .ready
            .lock()
            .unwrap_or_else(PoisonError::into_inner);
        ready.insert(self.seq, result);
        drop(ready);
        self.shared.woke.notify_all();
    }
}

#[cfg(feature = "std")]
impl Drop for CompletionSlot {
    fn drop(&mut self) {
        if !self.delivered {
            self.publish(Err(crate::Error::Io(crate::io::Error::other(
                "block compression task did not complete",
            ))));
        }
    }
}

#[cfg(test)]
mod completion_slot_tests {
    use super::*;

    /// Discards every task without running it.
    struct DroppingSpawner;

    impl CompactionSpawner for DroppingSpawner {
        fn spawn(&self, task: Box<dyn FnOnce() + Send + 'static>) {
            drop(task);
        }
    }

    #[test]
    fn dropped_task_yields_error_instead_of_hanging() {
        let mut c = BlockCompressor::new(
            Arc::new(DroppingSpawner),
            7,
            CompressionType::None,
            None,
            #[cfg(zstd_any)]
            None,
            None,
        );
        c.submit(vec![0u8; 4], 0);
        assert_eq!(c.pending(), 1);
        assert!(matches!(c.take_next(), Some(Err(_))));
        assert_eq!(c.pending(), 0);
        assert!(c.take_next().is_none());
    }
}
/// Applies the configured block transform to one encoded data block and
/// returns an owned (`'static`) prepared block, independent of `encoded`.
///
/// The transform is built from `compression`, `encryption` and (with zstd
/// support) `zstd_dict`. ECC is layered on when `ecc` is `Some`. The block is
/// prepared as `BlockType::Data` for `table_id`, with `extra_flags` passed
/// through to `Block::prepare_with_flags`.
///
/// # Errors
///
/// Propagates errors from `BlockTransform::from_parts` and
/// `Block::prepare_with_flags`.
#[cfg(feature = "std")]
fn prepare_owned(
    encoded: &[u8],
    table_id: TableId,
    compression: CompressionType,
    encryption: Option<&dyn EncryptionProvider>,
    #[cfg(zstd_any)] zstd_dict: Option<&ZstdDictionary>,
    ecc: Option<crate::table::block::EccParams>,
    extra_flags: u8,
) -> crate::Result<PreparedBlock<'static>> {
    let identity = BlockIdentity {
        table_id,
        block_type: BlockType::Data,
        dict_id: compression.dict_id(),
        window_log: 0,
    };

    let base = BlockTransform::from_parts(
        compression,
        encryption,
        #[cfg(zstd_any)]
        zstd_dict,
    )?;
    let transform = match ecc {
        Some(params) => base.with_ecc(params),
        None => base,
    };

    let prepared = Block::prepare_with_flags(encoded, identity, &transform, extra_flags)?;
    Ok(prepared.into_owned())
}
#[cfg(test)]
mod tests {
    #![expect(clippy::expect_used, reason = "test code")]
    use super::*;

    /// Runs each task immediately on the submitting thread.
    struct InlineSpawner;

    impl CompactionSpawner for InlineSpawner {
        fn spawn(&self, task: Box<dyn FnOnce() + Send + 'static>) {
            task();
        }
    }

    fn encode_plain(payload: &[u8]) -> Vec<u8> {
        payload.to_vec()
    }

    /// Buffers tasks until `run_all_reverse`, which runs them newest-first.
    #[derive(Default)]
    struct ReverseSpawner {
        tasks: Mutex<Vec<Box<dyn FnOnce() + Send + 'static>>>,
    }

    impl ReverseSpawner {
        fn run_all_reverse(&self) {
            let mut tasks =
                std::mem::take(&mut *self.tasks.lock().unwrap_or_else(PoisonError::into_inner));
            tasks.reverse();
            for task in tasks {
                task();
            }
        }
    }

    impl CompactionSpawner for ReverseSpawner {
        fn spawn(&self, task: Box<dyn FnOnce() + Send + 'static>) {
            self.tasks
                .lock()
                .unwrap_or_else(PoisonError::into_inner)
                .push(task);
        }
    }

    /// Takes the next block from `c` and returns the uncompressed length
    /// reported by the header it writes.
    fn take_uncompressed_len(c: &mut BlockCompressor) -> u32 {
        let prepared = c
            .take_next()
            .expect("pending > 0 yields a block")
            .expect("plain block prepares without error");
        let mut buf = Vec::new();
        prepared
            .write_to(&mut buf)
            .expect("write to vec")
            .uncompressed_length
    }

    #[test]
    fn take_next_returns_blocks_in_submission_order() {
        let mut c = BlockCompressor::new(
            Arc::new(InlineSpawner),
            7,
            CompressionType::None,
            None,
            #[cfg(zstd_any)]
            None,
            None,
        );
        assert_eq!(c.pending(), 0);
        assert!(c.take_next().is_none());
        // Distinct payload lengths make the drain order observable.
        c.submit(encode_plain(b"alpha"), 0);
        c.submit(encode_plain(b"be"), 0);
        c.submit(encode_plain(b"gamma-ray"), 0);
        assert_eq!(c.pending(), 3);
        let drained: Vec<u32> = (0..3).map(|_| take_uncompressed_len(&mut c)).collect();
        assert_eq!(drained, [5, 2, 9]);
        assert_eq!(c.pending(), 0);
        assert!(c.take_next().is_none());
    }

    #[test]
    fn take_next_reorders_out_of_order_completions() {
        let spawner = Arc::new(ReverseSpawner::default());
        let mut c = BlockCompressor::new(
            spawner.clone() as Arc<dyn CompactionSpawner>,
            7,
            CompressionType::None,
            None,
            #[cfg(zstd_any)]
            None,
            None,
        );
        c.submit(vec![0u8; 1], 0);
        c.submit(vec![0u8; 2], 0);
        c.submit(vec![0u8; 3], 0);
        assert_eq!(c.pending(), 3);
        spawner.run_all_reverse();
        for expected_len in [1u32, 2, 3] {
            assert_eq!(
                take_uncompressed_len(&mut c),
                expected_len,
                "blocks must drain in submission order regardless of completion order",
            );
        }
        assert!(c.take_next().is_none());
    }
}