1use super::future::UringReadFuture;
7use super::requests::IoRequest;
8use super::thread::{SUBMITTED_COUNTER, THREAD_SELECTOR, URING_THREADS};
9use super::{DEFAULT_URING_BLOCK_SIZE, DEFAULT_URING_IO_PARALLELISM, URING_BLOCK_SIZE};
10use crate::local::to_local_path;
11use crate::traits::Reader;
12use crate::uring::requests::RequestState;
13use crate::utils::tracking_store::IOTracker;
14use bytes::{Bytes, BytesMut};
15use deepsize::DeepSizeOf;
16use futures::future::BoxFuture;
17use futures::{FutureExt, TryFutureExt};
18use lance_core::{Error, Result};
19use object_store::path::Path;
20use std::fs::File;
21use std::future::Future;
22use std::io::{self, ErrorKind};
23use std::ops::Range;
24use std::os::unix::io::{AsRawFd, RawFd};
25use std::pin::Pin;
26use std::sync::atomic::Ordering;
27use std::sync::{Arc, LazyLock, Mutex};
28use std::time::Duration;
29use tracing::instrument;
30
/// Key for `HANDLE_CACHE`: the object-store path (as a string) together with the
/// effective block size the reader was opened with.
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub(super) struct CacheKey {
    /// String form of the object-store path (`Path::to_string`).
    path: String,
    /// Effective block size passed to `CacheKey::new`.
    block_size: usize,
}
38
39impl CacheKey {
40 pub(super) fn new(path: &Path, block_size: usize) -> Self {
41 Self {
42 path: path.to_string(),
43 block_size,
44 }
45 }
46}
47
/// Value stored in `HANDLE_CACHE`: a shared open file handle plus the size recorded for it.
#[derive(Clone)]
pub(super) struct CachedReaderData {
    /// Shared open file; cloning this struct only bumps the `Arc` reference count.
    pub(super) handle: Arc<UringFileHandle>,
    /// File size in bytes recorded when the cache entry was created.
    pub(super) size: usize,
}
54
55pub(super) static HANDLE_CACHE: LazyLock<moka::future::Cache<CacheKey, CachedReaderData>> =
58 LazyLock::new(|| {
59 moka::future::Cache::builder()
60 .time_to_live(Duration::from_secs(60))
61 .max_capacity(10_000)
62 .build()
63 });
64
65#[derive(Debug)]
69pub(super) struct UringFileHandle {
70 #[allow(unused)]
72 file: Arc<File>,
73
74 pub(super) fd: RawFd,
76
77 pub(super) path: Path,
79}
80
81impl UringFileHandle {
82 pub(super) fn new(file: File, path: Path) -> Self {
83 let fd = file.as_raw_fd();
84 Self {
85 file: Arc::new(file),
86 fd,
87 path,
88 }
89 }
90}
91
/// A [`Reader`] over a local file whose reads are executed by io_uring worker threads.
#[derive(Debug)]
pub struct UringReader {
    /// Open file handle, possibly shared with other readers through `HANDLE_CACHE`.
    handle: Arc<UringFileHandle>,

    /// Block size reported by `Reader::block_size`.
    block_size: usize,

    /// File size in bytes reported by `Reader::size` and used as the extent of `get_all`.
    size: usize,

    /// Tracker into which successful reads are recorded.
    io_tracker: Arc<IOTracker>,
}
110
111impl DeepSizeOf for UringReader {
112 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
113 self.handle.path.as_ref().deep_size_of_children(context)
116 }
117}
118
119impl UringReader {
120 #[instrument(level = "debug")]
124 pub(crate) async fn open(
125 path: &Path,
126 block_size: usize,
127 known_size: Option<usize>,
128 io_tracker: Arc<IOTracker>,
129 ) -> Result<Box<dyn Reader>> {
130 let block_size = URING_BLOCK_SIZE.unwrap_or(block_size.max(DEFAULT_URING_BLOCK_SIZE));
132
133 let cache_key = CacheKey::new(path, block_size);
134
135 if let Some(data) = HANDLE_CACHE.get(&cache_key).await {
137 let size = known_size.unwrap_or(data.size);
139 return Ok(Box::new(Self {
140 handle: data.handle,
141 block_size,
142 size,
143 io_tracker,
144 }) as Box<dyn Reader>);
145 }
146
147 let path_clone = path.clone();
149 let local_path = to_local_path(path);
150
151 let data = tokio::task::spawn_blocking(move || {
152 let file = File::open(&local_path).map_err(|e| match e.kind() {
153 ErrorKind::NotFound => Error::not_found(path_clone.to_string()),
154 _ => e.into(),
155 })?;
156
157 let size = match known_size {
159 Some(s) => s,
160 None => file.metadata()?.len() as usize,
161 };
162
163 Ok::<_, Error>(CachedReaderData {
164 handle: Arc::new(UringFileHandle::new(file, path_clone)),
165 size,
166 })
167 })
168 .await??;
169
170 HANDLE_CACHE.insert(cache_key, data.clone()).await;
172
173 Ok(Box::new(Self {
175 handle: data.handle.clone(),
176 block_size,
177 size: data.size,
178 io_tracker,
179 }) as Box<dyn Reader>)
180 }
181
182 fn submit_read(
184 &self,
185 offset: u64,
186 length: usize,
187 ) -> Pin<Box<dyn Future<Output = object_store::Result<Bytes>> + Send>> {
188 let mut buffer = BytesMut::with_capacity(length);
189 unsafe {
190 buffer.set_len(length);
191 }
192
193 let request = Arc::new(IoRequest {
195 fd: self.handle.fd,
196 offset,
197 length,
198 thread_id: std::thread::current().id(),
199 state: Mutex::new(RequestState {
200 completed: false,
201 waker: None,
202 err: None,
203 buffer,
204 bytes_read: 0,
205 }),
206 });
207
208 SUBMITTED_COUNTER.fetch_add(1, Ordering::Relaxed);
210
211 let thread_idx =
213 (THREAD_SELECTOR.fetch_add(1, Ordering::Relaxed) as usize) % URING_THREADS.len();
214
215 match URING_THREADS[thread_idx]
217 .request_tx
218 .send(Arc::clone(&request))
219 {
220 Ok(()) => {
221 Box::pin(UringReadFuture { request })
223 }
224 Err(_) => {
225 SUBMITTED_COUNTER.fetch_sub(1, Ordering::Relaxed);
227 Box::pin(async move {
228 Err(object_store::Error::Generic {
229 store: "UringReader",
230 source: Box::new(io::Error::new(
231 io::ErrorKind::BrokenPipe,
232 "io_uring thread died",
233 )),
234 })
235 })
236 }
237 }
238 }
239}
240
241impl Reader for UringReader {
242 fn path(&self) -> &Path {
243 &self.handle.path
244 }
245
246 fn block_size(&self) -> usize {
247 self.block_size
248 }
249
250 fn io_parallelism(&self) -> usize {
251 std::env::var("LANCE_URING_IO_PARALLELISM")
252 .ok()
253 .and_then(|s| s.parse().ok())
254 .unwrap_or(DEFAULT_URING_IO_PARALLELISM)
255 }
256
257 fn size(&self) -> BoxFuture<'_, object_store::Result<usize>> {
259 Box::pin(async move { Ok(self.size) })
260 }
261
262 #[instrument(level = "debug", skip(self))]
264 fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, object_store::Result<Bytes>> {
265 let io_tracker = self.io_tracker.clone();
266 let path = self.handle.path.clone();
267 let num_bytes = range.len() as u64;
268 let range_u64 = (range.start as u64)..(range.end as u64);
269
270 self.submit_read(range.start as u64, range.len())
271 .map_ok(move |bytes| {
272 io_tracker.record_read("get_range", path, num_bytes, Some(range_u64));
273 bytes
274 })
275 .boxed()
276 }
277
278 #[instrument(level = "debug", skip(self))]
280 fn get_all(&self) -> BoxFuture<'static, object_store::Result<Bytes>> {
281 let size = self.size;
282 let io_tracker = self.io_tracker.clone();
283 let path = self.handle.path.clone();
284
285 self.submit_read(0, size)
286 .map_ok(move |bytes| {
287 io_tracker.record_read("get_all", path, bytes.len() as u64, None);
288 bytes
289 })
290 .boxed()
291 }
292}