use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use bytes::Bytes;
use tokio::sync::Mutex;
use tokio::time::sleep;
/// Instrumented wrapper around an inner reader `R`.
///
/// The wrapped value and both call counters live behind [`Arc`]s, so every
/// clone of a `Reader` observes and updates the same state.
pub struct Reader<R> {
    /// The wrapped reader that calls are forwarded to.
    inner: Arc<R>,
    /// Number of `read_at` calls issued through this wrapper or its clones.
    reads: Arc<AtomicUsize>,
    /// Number of `len` calls issued through this wrapper or its clones.
    len_reads: Arc<AtomicUsize>,
    /// Optional pause awaited inside `read_at` before the call is forwarded.
    read_delay: Option<Duration>,
}
// Implemented by hand rather than derived so that `R` itself does not need to
// be `Clone`: only the `Arc` handles are duplicated.
impl<R> Clone for Reader<R> {
    /// Returns a handle that shares the inner reader and both counters with
    /// `self`, and carries the same read delay.
    fn clone(&self) -> Self {
        let Self {
            inner,
            reads,
            len_reads,
            read_delay,
        } = self;
        Self {
            inner: Arc::clone(inner),
            reads: Arc::clone(reads),
            len_reads: Arc::clone(len_reads),
            read_delay: *read_delay,
        }
    }
}
impl<R> Reader<R> {
    /// Wraps `inner` with both call counters at zero and no read delay.
    pub fn new(inner: R) -> Self {
        // Each counter gets its own allocation; they are never shared with
        // one another, only with clones of this wrapper.
        let zeroed_counter = || Arc::new(AtomicUsize::new(0));
        Self {
            inner: Arc::new(inner),
            reads: zeroed_counter(),
            len_reads: zeroed_counter(),
            read_delay: None,
        }
    }

    /// Consumes the wrapper and returns it with `delay` set as the read
    /// delay; all other state is carried over unchanged.
    pub fn with_read_delay(self, delay: Duration) -> Self {
        Self {
            read_delay: Some(delay),
            ..self
        }
    }

    /// Current value of the `read_at` call counter.
    pub fn read_count(&self) -> usize {
        AtomicUsize::load(&self.reads, Ordering::SeqCst)
    }

    /// Current value of the `len` call counter.
    pub fn len_read_count(&self) -> usize {
        AtomicUsize::load(&self.len_reads, Ordering::SeqCst)
    }
}
impl<R> crate::Reader for Reader<R>
where
    R: crate::Reader + Send + Sync + 'static,
{
    /// Records the call, waits for the configured read delay (if any), then
    /// forwards to the inner reader and returns its result unchanged.
    ///
    /// The counter is bumped before anything else, so a call is counted even
    /// when the inner read subsequently fails.
    async fn read_at(&self, offset: usize, buffer: &mut [u8]) -> std::io::Result<usize> {
        let configured_delay = self.read_delay;
        self.reads.fetch_add(1, Ordering::SeqCst);

        if let Some(pause) = configured_delay {
            sleep(pause).await;
        }

        self.inner.read_at(offset, buffer).await
    }

    /// Records the call and forwards to the inner reader's `len`.
    ///
    /// No read delay is applied here.
    async fn len(&self) -> std::io::Result<usize> {
        self.len_reads.fetch_add(1, Ordering::SeqCst);

        self.inner.len().await
    }
}
/// Instrumented wrapper around an inner writer `W`.
///
/// The wrapped value sits behind a shared [`Mutex`] and the counters behind
/// [`Arc`]s, so every clone of a `Writer` operates on the same inner value
/// and the same counters.
pub struct Writer<W> {
    /// The wrapped writer; locked for the duration of each forwarded call.
    inner: Arc<Mutex<W>>,
    /// Number of `create_extent` calls issued through this wrapper or its clones.
    creates: Arc<AtomicUsize>,
    /// Number of `read_extent` calls issued through this wrapper or its clones.
    reads: Arc<AtomicUsize>,
    /// Number of `delete_extent` calls issued through this wrapper or its clones.
    deletes: Arc<AtomicUsize>,
}
// Implemented by hand rather than derived so that `W` itself does not need to
// be `Clone`: only the `Arc` handles are duplicated.
impl<W> Clone for Writer<W> {
    /// Returns a handle that shares the locked inner writer and all three
    /// counters with `self`.
    fn clone(&self) -> Self {
        let Self {
            inner,
            creates,
            reads,
            deletes,
        } = self;
        Self {
            inner: Arc::clone(inner),
            creates: Arc::clone(creates),
            reads: Arc::clone(reads),
            deletes: Arc::clone(deletes),
        }
    }
}
impl<W> Writer<W> {
    /// Wraps `inner` behind a shared mutex with all three call counters at
    /// zero.
    pub fn new(inner: W) -> Self {
        let guarded = Mutex::new(inner);
        Self {
            inner: Arc::new(guarded),
            // `AtomicUsize::default()` is zero, so each counter starts at 0.
            creates: Arc::default(),
            reads: Arc::default(),
            deletes: Arc::default(),
        }
    }

    /// Current value of the `create_extent` call counter.
    pub fn create_count(&self) -> usize {
        AtomicUsize::load(&self.creates, Ordering::SeqCst)
    }

    /// Current value of the `read_extent` call counter.
    pub fn read_count(&self) -> usize {
        AtomicUsize::load(&self.reads, Ordering::SeqCst)
    }

    /// Current value of the `delete_extent` call counter.
    pub fn delete_count(&self) -> usize {
        AtomicUsize::load(&self.deletes, Ordering::SeqCst)
    }
}
impl<W> crate::Writer for Writer<W>
where
    W: crate::Writer + Send + Sync + 'static,
{
    /// Records the call, then forwards `create_extent` to the inner writer
    /// while holding its lock.
    ///
    /// The counter is bumped before the lock is taken, so the call is counted
    /// even when the inner operation fails.
    async fn create_extent(&mut self, offset: usize, data: Bytes) -> std::io::Result<()> {
        self.creates.fetch_add(1, Ordering::SeqCst);

        let mut guard = self.inner.lock().await;
        guard.create_extent(offset, data).await
    }

    /// Records the call, then forwards `read_extent` to the inner writer
    /// while holding its lock.
    async fn read_extent(&self, offset: usize) -> std::io::Result<Bytes> {
        self.reads.fetch_add(1, Ordering::SeqCst);

        let guard = self.inner.lock().await;
        guard.read_extent(offset).await
    }

    /// Records the call, then forwards `delete_extent` to the inner writer
    /// while holding its lock.
    async fn delete_extent(&mut self, offset: usize) -> std::io::Result<()> {
        self.deletes.fetch_add(1, Ordering::SeqCst);

        let mut guard = self.inner.lock().await;
        guard.delete_extent(offset).await
    }
}