pub mod monitor;
pub mod noop;
pub mod psync;
#[cfg(target_os = "linux")]
pub mod uring;
use std::{
fmt::Debug,
future::Future,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
};
#[cfg(feature = "tracing")]
use fastrace::{future::InSpan, prelude::*};
use foyer_common::{error::Result, spawn::Spawner};
use futures_core::future::BoxFuture;
use pin_project::pin_project;
use crate::io::{
bytes::{IoB, IoBuf, IoBufMut},
device::Partition,
};
#[cfg(not(feature = "tracing"))]
type IoHandleInner = BoxFuture<'static, (Box<dyn IoB>, Result<()>)>;
#[cfg(feature = "tracing")]
type IoHandleInner = InSpan<BoxFuture<'static, (Box<dyn IoB>, Result<()>)>>;
#[pin_project]
pub struct IoHandle {
#[pin]
inner: IoHandleInner,
callback: Option<Box<dyn FnOnce() + Send + 'static>>,
}
impl Debug for IoHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IoHandle").finish()
}
}
#[cfg(not(feature = "tracing"))]
impl From<BoxFuture<'static, (Box<dyn IoB>, Result<()>)>> for IoHandle {
fn from(inner: BoxFuture<'static, (Box<dyn IoB>, Result<()>)>) -> Self {
Self { inner, callback: None }
}
}
#[cfg(feature = "tracing")]
impl From<BoxFuture<'static, (Box<dyn IoB>, Result<()>)>> for IoHandle {
fn from(inner: BoxFuture<'static, (Box<dyn IoB>, Result<()>)>) -> Self {
let inner = inner.in_span(Span::enter_with_local_parent("foyer::storage::io::io_handle"));
Self { inner, callback: None }
}
}
impl IoHandle {
pub(crate) fn with_callback<F>(mut self, callback: F) -> Self
where
F: FnOnce() + Send + 'static,
{
assert!(self.callback.is_none(), "io handle callback can only be set once");
self.callback = Some(Box::new(callback));
self
}
}
impl Future for IoHandle {
type Output = (Box<dyn IoB>, Result<()>);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let res = ready!(this.inner.poll(cx));
if let Some(callback) = this.callback.take() {
callback();
}
Poll::Ready(res)
}
}
pub struct IoEngineBuildContext {
pub spawner: Spawner,
}
pub trait IoEngineConfig: Send + Sync + 'static + Debug {
fn build(self: Box<Self>, ctx: IoEngineBuildContext) -> BoxFuture<'static, Result<Arc<dyn IoEngine>>>;
fn boxed(self) -> Box<Self>
where
Self: Sized,
{
Box::new(self)
}
}
pub trait IoEngine: Send + Sync + 'static + Debug {
fn read(&self, buf: Box<dyn IoBufMut>, partition: &dyn Partition, offset: u64) -> IoHandle;
fn write(&self, buf: Box<dyn IoBuf>, partition: &dyn Partition, offset: u64) -> IoHandle;
}
#[cfg(test)]
mod tests {
use std::path::Path;
use rand::{rng, Fill};
use tempfile::tempdir;
use super::*;
#[cfg(not(madsim))]
#[cfg(target_os = "linux")]
use crate::io::engine::uring::UringIoEngineConfig;
use crate::io::{
bytes::IoSliceMut,
device::{file::FileDeviceBuilder, Device, DeviceBuilder},
engine::psync::PsyncIoEngineConfig,
};
const KIB: usize = 1024;
const MIB: usize = 1024 * 1024;
fn build_test_file_device(path: impl AsRef<Path>) -> Result<Arc<dyn Device>> {
let device = FileDeviceBuilder::new(&path).with_capacity(16 * MIB).build()?;
for _ in 0..16 {
device.create_partition(MIB)?;
}
Ok(device)
}
async fn test_read_write(engine: Arc<dyn IoEngine>, device: &dyn Device) {
let mut b1 = Box::new(IoSliceMut::new(16 * KIB));
Fill::fill(&mut b1[..], &mut rng());
let (b1, res) = engine.write(b1, device.partition(0).as_ref(), 0).await;
res.unwrap();
let b1 = b1.try_into_io_slice_mut().unwrap();
let b2 = Box::new(IoSliceMut::new(16 * KIB));
let (b2, res) = engine.read(b2, device.partition(0).as_ref(), 0).await;
res.unwrap();
let b2 = b2.try_into_io_slice_mut().unwrap();
assert_eq!(b1, b2);
}
#[test_log::test(tokio::test)]
async fn test_io_engine() {
let dir = tempdir().unwrap();
#[cfg(not(madsim))]
#[cfg(target_os = "linux")]
{
let path = dir.path().join("test_file_1");
let device = build_test_file_device(&path).unwrap();
let engine = UringIoEngineConfig::new()
.with_threads(4)
.with_io_depth(64)
.boxed()
.build(IoEngineBuildContext {
spawner: Spawner::current(),
})
.await
.unwrap();
test_read_write(engine, device.as_ref()).await;
}
let path = dir.path().join("test_file_1");
let device = build_test_file_device(&path).unwrap();
let engine = PsyncIoEngineConfig::new()
.boxed()
.build(IoEngineBuildContext {
spawner: Spawner::current(),
})
.await
.unwrap();
test_read_write(engine, device.as_ref()).await;
}
}