use super::future::UringReadFuture;
use super::requests::IoRequest;
use super::thread::{SUBMITTED_COUNTER, THREAD_SELECTOR, URING_THREADS};
use super::{DEFAULT_URING_BLOCK_SIZE, DEFAULT_URING_IO_PARALLELISM, URING_BLOCK_SIZE};
use crate::local::to_local_path;
use crate::traits::Reader;
use crate::uring::requests::RequestState;
use crate::utils::tracking_store::IOTracker;
use bytes::{Bytes, BytesMut};
use deepsize::DeepSizeOf;
use futures::future::BoxFuture;
use futures::{FutureExt, TryFutureExt};
use lance_core::{Error, Result};
use object_store::path::Path;
use std::fs::File;
use std::future::Future;
use std::io::{self, ErrorKind};
use std::ops::Range;
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::{Arc, LazyLock, Mutex};
use std::time::Duration;
use tracing::instrument;
/// Key under which an opened file handle is stored in `HANDLE_CACHE`.
///
/// Two keys compare equal only when both the path string and the block size
/// match, so the same path opened with a different block size gets its own
/// cache entry.
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub(super) struct CacheKey {
/// String form of the object-store path, produced with `to_string`.
path: String,
/// Block size the reader is being opened with.
block_size: usize,
}
impl CacheKey {
    /// Builds the cache key for `path` opened with `block_size`.
    ///
    /// The path is converted to an owned string so the key does not borrow
    /// from the caller.
    pub(super) fn new(path: &Path, block_size: usize) -> Self {
        let path = path.to_string();
        Self { path, block_size }
    }
}
/// Value stored in `HANDLE_CACHE`: a shared file handle plus the file size
/// recorded when the entry was created.
#[derive(Clone)]
pub(super) struct CachedReaderData {
/// Shared handle to the opened file.
pub(super) handle: Arc<UringFileHandle>,
/// File size in bytes as recorded at insertion time: the caller-supplied
/// `known_size` if one was given to `UringReader::open`, otherwise the length
/// reported by the file's metadata.
pub(super) size: usize,
}
/// Lazily initialized, process-wide cache of opened file handles, keyed by
/// [`CacheKey`] (path plus block size).
///
/// The cache is configured with a 60 second time-to-live and a maximum
/// capacity of 10,000.
pub(super) static HANDLE_CACHE: LazyLock<moka::future::Cache<CacheKey, CachedReaderData>> =
    LazyLock::new(|| {
        // How long a cached handle may be reused before it is re-opened.
        let ttl = Duration::from_secs(60);
        moka::future::Cache::builder()
            .max_capacity(10_000)
            .time_to_live(ttl)
            .build()
    });
/// An open file together with its raw descriptor and the path it was opened
/// for.
#[derive(Debug)]
pub(super) struct UringFileHandle {
/// Owning handle for the open file. It is never read directly (hence the
/// `allow`); it is held so that the descriptor stored in `fd` stays open for
/// as long as this struct is alive.
#[allow(unused)]
file: Arc<File>,
/// Raw descriptor taken from `file` with `as_raw_fd`; copied into every
/// `IoRequest` built by `UringReader::submit_read`.
pub(super) fd: RawFd,
/// Object-store path this handle was opened for.
pub(super) path: Path,
}
impl UringFileHandle {
    /// Wraps an already opened `file`, recording its raw descriptor and the
    /// `path` it belongs to.
    ///
    /// The file is kept inside the returned handle, so the recorded descriptor
    /// remains valid for the handle's lifetime.
    pub(super) fn new(file: File, path: Path) -> Self {
        let file = Arc::new(file);
        Self {
            // Read the descriptor before `file` is moved into the struct.
            fd: file.as_raw_fd(),
            file,
            path,
        }
    }
}
/// [`Reader`] implementation that submits its reads to the io_uring worker
/// threads.
#[derive(Debug)]
pub struct UringReader {
/// Shared open-file handle; the same handle may also be held by
/// `HANDLE_CACHE` and by other readers of the same file.
handle: Arc<UringFileHandle>,
/// Block size reported by `Reader::block_size`.
block_size: usize,
/// File size in bytes, reported by `Reader::size` and used as the read length
/// for `Reader::get_all`.
size: usize,
/// Receives a `record_read` call for every read that completes successfully.
io_tracker: Arc<IOTracker>,
}
impl DeepSizeOf for UringReader {
/// Reports the size attributed to this reader's children.
///
/// Only the handle's path (viewed through `as_ref`) is consulted; the file
/// handle itself, the I/O tracker and the scalar fields are not counted.
fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
self.handle.path.as_ref().deep_size_of_children(context)
}
}
impl UringReader {
#[instrument(level = "debug")]
pub(crate) async fn open(
path: &Path,
block_size: usize,
known_size: Option<usize>,
io_tracker: Arc<IOTracker>,
) -> Result<Box<dyn Reader>> {
let block_size = URING_BLOCK_SIZE.unwrap_or(block_size.max(DEFAULT_URING_BLOCK_SIZE));
let cache_key = CacheKey::new(path, block_size);
if let Some(data) = HANDLE_CACHE.get(&cache_key).await {
let size = known_size.unwrap_or(data.size);
return Ok(Box::new(Self {
handle: data.handle,
block_size,
size,
io_tracker,
}) as Box<dyn Reader>);
}
let path_clone = path.clone();
let local_path = to_local_path(path);
let data = tokio::task::spawn_blocking(move || {
let file = File::open(&local_path).map_err(|e| match e.kind() {
ErrorKind::NotFound => Error::not_found(path_clone.to_string()),
_ => e.into(),
})?;
let size = match known_size {
Some(s) => s,
None => file.metadata()?.len() as usize,
};
Ok::<_, Error>(CachedReaderData {
handle: Arc::new(UringFileHandle::new(file, path_clone)),
size,
})
})
.await??;
HANDLE_CACHE.insert(cache_key, data.clone()).await;
Ok(Box::new(Self {
handle: data.handle.clone(),
block_size,
size: data.size,
io_tracker,
}) as Box<dyn Reader>)
}
fn submit_read(
&self,
offset: u64,
length: usize,
) -> Pin<Box<dyn Future<Output = object_store::Result<Bytes>> + Send>> {
let mut buffer = BytesMut::with_capacity(length);
unsafe {
buffer.set_len(length);
}
let request = Arc::new(IoRequest {
fd: self.handle.fd,
offset,
length,
thread_id: std::thread::current().id(),
state: Mutex::new(RequestState {
completed: false,
waker: None,
err: None,
buffer,
bytes_read: 0,
}),
});
SUBMITTED_COUNTER.fetch_add(1, Ordering::Relaxed);
let thread_idx =
(THREAD_SELECTOR.fetch_add(1, Ordering::Relaxed) as usize) % URING_THREADS.len();
match URING_THREADS[thread_idx]
.request_tx
.send(Arc::clone(&request))
{
Ok(()) => {
Box::pin(UringReadFuture { request })
}
Err(_) => {
SUBMITTED_COUNTER.fetch_sub(1, Ordering::Relaxed);
Box::pin(async move {
Err(object_store::Error::Generic {
store: "UringReader",
source: Box::new(io::Error::new(
io::ErrorKind::BrokenPipe,
"io_uring thread died",
)),
})
})
}
}
}
}
impl Reader for UringReader {
fn path(&self) -> &Path {
&self.handle.path
}
fn block_size(&self) -> usize {
self.block_size
}
fn io_parallelism(&self) -> usize {
std::env::var("LANCE_URING_IO_PARALLELISM")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_URING_IO_PARALLELISM)
}
fn size(&self) -> BoxFuture<'_, object_store::Result<usize>> {
Box::pin(async move { Ok(self.size) })
}
#[instrument(level = "debug", skip(self))]
fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, object_store::Result<Bytes>> {
let io_tracker = self.io_tracker.clone();
let path = self.handle.path.clone();
let num_bytes = range.len() as u64;
let range_u64 = (range.start as u64)..(range.end as u64);
self.submit_read(range.start as u64, range.len())
.map_ok(move |bytes| {
io_tracker.record_read("get_range", path, num_bytes, Some(range_u64));
bytes
})
.boxed()
}
#[instrument(level = "debug", skip(self))]
fn get_all(&self) -> BoxFuture<'static, object_store::Result<Bytes>> {
let size = self.size;
let io_tracker = self.io_tracker.clone();
let path = self.handle.path.clone();
self.submit_read(0, size)
.map_ok(move |bytes| {
io_tracker.record_read("get_all", path, bytes.len() as u64, None);
bytes
})
.boxed()
}
}