use std::any::Any;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::Mutex;
use std::{fmt, io, mem};
use async_trait::async_trait;
use instant::{Duration, Instant};
use tantivy::directory::error::{LockError, OpenReadError};
use tantivy::directory::{DirectoryLock, FileHandle, Lock, OwnedBytes};
use tantivy::{Directory, HasLen};
use time::OffsetDateTime;
/// Thread-safe, shareable log of recorded [`ReadOperation`]s.
///
/// The `Vec` sits behind an `Arc<Mutex<..>>`, so the derived `Clone` yields
/// another handle onto the same buffer rather than a copy of its contents.
#[derive(Clone, Default)]
struct OperationBuffer(Arc<Mutex<Vec<ReadOperation>>>);
impl fmt::Debug for OperationBuffer {
    /// Prints only the type name: the buffered operations are not shown and
    /// the mutex is never locked.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("OperationBuffer")
    }
}
impl OperationBuffer {
    /// Empties the buffer and returns an iterator over everything it held.
    ///
    /// The contents are moved out while the lock is held, so the returned
    /// iterator owns its items and does not borrow from `self`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn drain(&self) -> impl Iterator<Item = ReadOperation> + 'static {
        let drained: Vec<ReadOperation> = {
            let mut buffer = self.0.lock().expect("poisoned");
            mem::take(&mut *buffer)
        };
        drained.into_iter()
    }

    /// Appends `read_operation` to the end of the buffer.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn push(&self, read_operation: ReadOperation) {
        self.0.lock().expect("poisoned").push(read_operation);
    }
}
/// A single read that went through the proxy, together with its timing.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReadOperation {
/// Path of the file that was read.
pub path: PathBuf,
/// Byte offset at which the read started (`0` when no offset was set on the builder).
pub offset: u64,
/// Number of bytes returned by the read.
pub num_bytes: usize,
/// UTC wall-clock time captured when the read was started.
pub start_date: OffsetDateTime,
/// Time elapsed between the start of the read and its completion.
pub duration: Duration,
}
/// A read that is currently being timed; `terminate` turns it into a
/// [`ReadOperation`].
struct ReadOperationBuilder {
/// UTC wall-clock time at which the builder was created.
start_date: OffsetDateTime,
/// Clock reading taken at creation, used to compute the read's duration.
start_instant: Instant,
/// Path of the file being read.
path: PathBuf,
/// Start offset of the read, in bytes.
offset: u64,
}
impl ReadOperationBuilder {
    /// Starts timing a read of `path`.
    ///
    /// Records `Instant::now()` (used later to compute the duration) and the
    /// current UTC time (reported as the start date). The offset starts at `0`.
    pub fn new(path: &Path) -> Self {
        let start_instant = Instant::now();
        let start_date = OffsetDateTime::now_utc();
        ReadOperationBuilder {
            start_date,
            start_instant,
            path: path.to_path_buf(),
            offset: 0,
        }
    }

    /// Returns the builder with its start offset replaced by `offset`.
    ///
    /// The timestamps taken in `new` and the path are carried over unchanged.
    pub fn with_offset(self, offset: u64) -> Self {
        ReadOperationBuilder { offset, ..self }
    }

    /// Stops the clock and produces the finished [`ReadOperation`].
    ///
    /// `num_bytes` is the number of bytes the read returned. The duration is
    /// the time elapsed since the builder was created.
    fn terminate(self, num_bytes: usize) -> ReadOperation {
        let duration = self.start_instant.elapsed();
        ReadOperation {
            // `self` is consumed here, so the path is moved instead of cloned.
            path: self.path,
            offset: self.offset,
            num_bytes,
            start_date: self.start_date,
            duration,
        }
    }
}
/// A [`Directory`] wrapper that records the reads issued through it.
///
/// Reads are forwarded to `underlying`; each successful one is logged as a
/// [`ReadOperation`] that can be retrieved with `drain_read_operations`.
#[derive(Debug)]
pub struct DebugProxyDirectory {
/// Directory that actually serves the requests.
underlying: Box<dyn Directory>,
/// Log of recorded reads; clones of this directory share the same log.
operations: OperationBuffer,
}
impl Clone for DebugProxyDirectory {
fn clone(&self) -> Self {
DebugProxyDirectory {
underlying: self.underlying.box_clone(),
operations: self.operations.clone(),
}
}
}
impl DebugProxyDirectory {
    /// Wraps `directory` in a proxy that starts with an empty operation log.
    pub fn wrap(directory: Box<dyn Directory>) -> Self {
        let operations = OperationBuffer::default();
        Self {
            underlying: directory,
            operations,
        }
    }

    /// Removes and returns every read operation recorded so far.
    ///
    /// The log is empty afterwards.
    pub fn drain_read_operations(&self) -> impl Iterator<Item = ReadOperation> + '_ {
        let recorded = &self.operations;
        recorded.drain()
    }

    /// Appends `read_op` to the operation log.
    fn register(&self, read_op: ReadOperation) {
        let Self { operations, .. } = self;
        operations.push(read_op);
    }

    /// Async counterpart of `register`; it performs the same push and never
    /// actually suspends.
    async fn register_async(&self, read_op: ReadOperation) {
        self.register(read_op);
    }
}
/// File handle returned by [`DebugProxyDirectory`]; records the reads it serves.
struct DebugProxyFileHandle {
/// Proxy directory whose operation log receives the recorded reads.
directory: DebugProxyDirectory,
/// Handle obtained from the wrapped directory; performs the actual reads.
underlying: Arc<dyn FileHandle>,
/// Path this handle was opened with, reported in every recorded operation.
path: PathBuf,
}
#[async_trait]
impl FileHandle for DebugProxyFileHandle {
    /// Reads `byte_range` from the wrapped handle and logs the operation.
    ///
    /// The logged entry carries the range's start offset, the number of bytes
    /// returned and the time the read took. A failed read is propagated to the
    /// caller and nothing is logged.
    fn read_bytes(&self, byte_range: Range<u64>) -> io::Result<OwnedBytes> {
        let pending = ReadOperationBuilder::new(&self.path).with_offset(byte_range.start);
        let bytes = self.underlying.read_bytes(byte_range)?;
        self.directory.register(pending.terminate(bytes.len()));
        Ok(bytes)
    }

    /// Async variant of `read_bytes`, with the same logging behaviour.
    async fn read_bytes_async(&self, byte_range: Range<u64>) -> io::Result<OwnedBytes> {
        let pending = ReadOperationBuilder::new(&self.path).with_offset(byte_range.start);
        let bytes = self.underlying.read_bytes_async(byte_range).await?;
        let finished = pending.terminate(bytes.len());
        self.directory.register_async(finished).await;
        Ok(bytes)
    }
}
impl fmt::Debug for DebugProxyFileHandle {
    /// Formats as `DebugProxyFileHandle(<underlying handle>)`; the owning
    /// directory and the path are left out.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("DebugProxyFileHandle(")?;
        // The inner handle always uses the plain `{:?}` form, whatever flags
        // the outer formatter carries.
        write!(f, "{:?}", &self.underlying)?;
        f.write_str(")")
    }
}
impl HasLen for DebugProxyFileHandle {
fn len(&self) -> u64 {
self.underlying.len()
}
}
#[async_trait]
impl Directory for DebugProxyDirectory {
fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
let underlying = self.underlying.get_file_handle(path)?;
Ok(Arc::new(DebugProxyFileHandle {
underlying,
directory: self.clone(),
path: path.to_owned(),
}))
}
fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
self.underlying.exists(path)
}
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
let read_operation_builder = ReadOperationBuilder::new(path);
let payload = self.underlying.atomic_read(path)?;
let read_operation = read_operation_builder.terminate(payload.len());
self.register(read_operation);
Ok(payload.to_vec())
}
async fn atomic_read_async(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
let read_operation_builder = ReadOperationBuilder::new(path);
let payload = self.underlying.atomic_read_async(path).await?;
let read_operation = read_operation_builder.terminate(payload.len());
self.register(read_operation);
Ok(payload.to_vec())
}
fn acquire_lock(&self, _lock: &Lock) -> Result<DirectoryLock, LockError> {
Ok(tantivy::directory::DirectoryLock::from(Box::new(|| {})))
}
fn as_any(&self) -> &dyn Any {
self
}
fn underlying_directory(&self) -> Option<&dyn Directory> {
Some(self.underlying.as_ref())
}
fn real_directory(&self) -> &dyn Directory {
self.underlying.real_directory()
}
}