use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use manifoldb_core::TransactionError;
use manifoldb_storage::StorageEngine;
#[derive(Debug, Clone)]
pub struct ReadPoolConfig {
pub max_size: usize,
pub max_age: Duration,
pub refresh_after_writes: u64,
pub prefill: bool,
}
impl Default for ReadPoolConfig {
fn default() -> Self {
Self {
max_size: 16,
max_age: Duration::from_millis(100),
refresh_after_writes: 100,
prefill: false,
}
}
}
impl ReadPoolConfig {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub const fn max_size(mut self, size: usize) -> Self {
self.max_size = size;
self
}
#[must_use]
pub const fn max_age(mut self, age: Duration) -> Self {
self.max_age = age;
self
}
#[must_use]
pub const fn refresh_after_writes(mut self, count: u64) -> Self {
self.refresh_after_writes = count;
self
}
#[must_use]
pub const fn prefill(mut self, prefill: bool) -> Self {
self.prefill = prefill;
self
}
#[must_use]
pub fn disabled() -> Self {
Self { max_size: 0, ..Default::default() }
}
}
pub struct ReadPool<E: StorageEngine> {
engine: Arc<E>,
config: ReadPoolConfig,
write_counter: AtomicU64,
}
impl<E: StorageEngine> ReadPool<E> {
pub fn new(engine: Arc<E>, config: ReadPoolConfig) -> Result<Self, TransactionError> {
Ok(Self { engine, config, write_counter: AtomicU64::new(0) })
}
pub fn with_defaults(engine: Arc<E>) -> Result<Self, TransactionError> {
Self::new(engine, ReadPoolConfig::default())
}
pub fn acquire(&self) -> Result<PooledReadTx<'_, E>, TransactionError> {
let write_count = self.write_counter.load(Ordering::Relaxed);
let created_at = Instant::now();
let tx = self
.engine
.begin_read()
.map_err(|e| TransactionError::Storage(format!("failed to begin read: {e}")))?;
Ok(PooledReadTx { pool: self, tx, created_at, created_at_write: write_count })
}
pub fn notify_write(&self) {
self.write_counter.fetch_add(1, Ordering::Relaxed);
}
pub fn notify_writes(&self, count: u64) {
self.write_counter.fetch_add(count, Ordering::Relaxed);
}
#[must_use]
pub fn write_count(&self) -> u64 {
self.write_counter.load(Ordering::Relaxed)
}
#[must_use]
pub fn available_count(&self) -> usize {
0 }
#[must_use]
pub const fn config(&self) -> &ReadPoolConfig {
&self.config
}
pub fn clear(&self) {
}
pub fn refresh(&self) {
self.clear();
}
}
pub struct PooledReadTx<'pool, E: StorageEngine> {
pool: &'pool ReadPool<E>,
tx: E::Transaction<'pool>,
created_at: Instant,
created_at_write: u64,
}
impl<'pool, E: StorageEngine> PooledReadTx<'pool, E> {
#[must_use]
pub fn transaction(&self) -> &E::Transaction<'pool> {
&self.tx
}
#[must_use]
pub fn age(&self) -> Duration {
self.created_at.elapsed()
}
#[must_use]
pub fn is_stale_by_age(&self) -> bool {
self.age() > self.pool.config.max_age
}
#[must_use]
pub fn is_stale_by_writes(&self) -> bool {
let current_writes = self.pool.write_counter.load(Ordering::Relaxed);
let writes_since = current_writes.saturating_sub(self.created_at_write);
writes_since > self.pool.config.refresh_after_writes
}
#[must_use]
pub fn is_stale(&self) -> bool {
self.is_stale_by_age() || self.is_stale_by_writes()
}
#[must_use]
pub fn created_at_write(&self) -> u64 {
self.created_at_write
}
pub fn discard(self) {
}
pub fn get(
&self,
table: &str,
key: &[u8],
) -> Result<Option<Vec<u8>>, manifoldb_storage::StorageError> {
use manifoldb_storage::Transaction;
self.tx.get(table, key)
}
#[must_use]
pub fn is_read_only(&self) -> bool {
use manifoldb_storage::Transaction;
self.tx.is_read_only()
}
}
#[cfg(test)]
mod tests {
use super::*;
use manifoldb_storage::backends::RedbEngine;
use std::thread;
fn create_test_engine() -> Arc<RedbEngine> {
Arc::new(RedbEngine::in_memory().expect("failed to create in-memory engine"))
}
#[test]
fn test_config_default() {
let config = ReadPoolConfig::default();
assert_eq!(config.max_size, 16);
assert_eq!(config.max_age, Duration::from_millis(100));
assert_eq!(config.refresh_after_writes, 100);
assert!(!config.prefill);
}
#[test]
fn test_config_builder() {
let config = ReadPoolConfig::new()
.max_size(8)
.max_age(Duration::from_millis(50))
.refresh_after_writes(50)
.prefill(true);
assert_eq!(config.max_size, 8);
assert_eq!(config.max_age, Duration::from_millis(50));
assert_eq!(config.refresh_after_writes, 50);
assert!(config.prefill);
}
#[test]
fn test_config_disabled() {
let config = ReadPoolConfig::disabled();
assert_eq!(config.max_size, 0);
}
#[test]
fn test_pool_creation() {
let engine = create_test_engine();
let pool = ReadPool::new(engine, ReadPoolConfig::default()).expect("pool creation failed");
assert_eq!(pool.available_count(), 0);
}
#[test]
fn test_acquire() {
let engine = create_test_engine();
let pool = ReadPool::new(engine, ReadPoolConfig::default()).expect("pool creation failed");
let tx = pool.acquire().expect("acquire failed");
assert!(tx.is_read_only());
}
#[test]
fn test_staleness_by_age() {
let engine = create_test_engine();
let config = ReadPoolConfig::new().max_age(Duration::from_millis(1));
let pool = ReadPool::new(engine, config).expect("pool creation failed");
let tx = pool.acquire().expect("acquire failed");
assert!(!tx.is_stale_by_age());
thread::sleep(Duration::from_millis(5));
assert!(tx.is_stale_by_age());
assert!(tx.is_stale());
}
#[test]
fn test_staleness_by_writes() {
let engine = create_test_engine();
let config = ReadPoolConfig::new().refresh_after_writes(5);
let pool = ReadPool::new(engine, config).expect("pool creation failed");
let tx = pool.acquire().expect("acquire failed");
assert!(!tx.is_stale_by_writes());
pool.notify_writes(10);
assert!(tx.is_stale_by_writes());
assert!(tx.is_stale());
}
#[test]
fn test_write_counter() {
let engine = create_test_engine();
let pool = ReadPool::new(engine, ReadPoolConfig::default()).expect("pool creation failed");
assert_eq!(pool.write_count(), 0);
pool.notify_write();
assert_eq!(pool.write_count(), 1);
pool.notify_writes(5);
assert_eq!(pool.write_count(), 6);
}
#[test]
fn test_transaction_operations() {
let engine = create_test_engine();
{
use manifoldb_storage::Transaction;
let mut tx = engine.begin_write().expect("begin_write failed");
tx.put("test", b"key", b"value").expect("put failed");
tx.commit().expect("commit failed");
}
let pool = ReadPool::new(Arc::clone(&engine), ReadPoolConfig::default())
.expect("pool creation failed");
let tx = pool.acquire().expect("acquire failed");
let value = tx.get("test", b"key").expect("get failed");
assert_eq!(value, Some(b"value".to_vec()));
}
#[test]
fn test_concurrent_access() {
let engine = create_test_engine();
let pool = Arc::new(
ReadPool::new(engine, ReadPoolConfig::default()).expect("pool creation failed"),
);
let handles: Vec<_> = (0..8)
.map(|_| {
let pool = Arc::clone(&pool);
thread::spawn(move || {
for _ in 0..10 {
let tx = pool.acquire().expect("acquire failed");
assert!(tx.is_read_only());
thread::sleep(Duration::from_micros(100));
}
})
})
.collect();
for handle in handles {
handle.join().expect("thread panicked");
}
}
}