Skip to main content

lance_io/uring/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! UringReader implementation.
5
6use super::future::UringReadFuture;
7use super::requests::IoRequest;
8use super::thread::{SUBMITTED_COUNTER, THREAD_SELECTOR, URING_THREADS};
9use super::{DEFAULT_URING_BLOCK_SIZE, DEFAULT_URING_IO_PARALLELISM, URING_BLOCK_SIZE};
10use crate::local::to_local_path;
11use crate::traits::Reader;
12use crate::uring::requests::RequestState;
13use crate::utils::tracking_store::IOTracker;
14use bytes::{Bytes, BytesMut};
15use deepsize::DeepSizeOf;
16use futures::future::BoxFuture;
17use futures::{FutureExt, TryFutureExt};
18use lance_core::{Error, Result};
19use object_store::path::Path;
20use std::fs::File;
21use std::future::Future;
22use std::io::{self, ErrorKind};
23use std::ops::Range;
24use std::os::unix::io::{AsRawFd, RawFd};
25use std::pin::Pin;
26use std::sync::atomic::Ordering;
27use std::sync::{Arc, LazyLock, Mutex};
28use std::time::Duration;
29use tracing::instrument;
30
31/// Cache key for UringReader instances.
32/// We cache by (path, block_size) because block_size affects reader behavior.
33#[derive(Clone, Debug, Hash, Eq, PartialEq)]
34pub(super) struct CacheKey {
35    path: String,
36    block_size: usize,
37}
38
39impl CacheKey {
40    pub(super) fn new(path: &Path, block_size: usize) -> Self {
41        Self {
42            path: path.to_string(),
43            block_size,
44        }
45    }
46}
47
48/// Data stored in the cache for each opened file.
49#[derive(Clone)]
50pub(super) struct CachedReaderData {
51    pub(super) handle: Arc<UringFileHandle>,
52    pub(super) size: usize,
53}
54
55/// Global cache of open file handles.
56/// Entries expire after 60 seconds to ensure files are eventually closed.
57pub(super) static HANDLE_CACHE: LazyLock<moka::future::Cache<CacheKey, CachedReaderData>> =
58    LazyLock::new(|| {
59        moka::future::Cache::builder()
60            .time_to_live(Duration::from_secs(60))
61            .max_capacity(10_000)
62            .build()
63    });
64
65/// File handle for io_uring operations.
66///
67/// Keeps the file alive and provides the raw file descriptor.
68#[derive(Debug)]
69pub(super) struct UringFileHandle {
70    /// The file (kept alive via Arc)
71    #[allow(unused)]
72    file: Arc<File>,
73
74    /// Raw file descriptor for io_uring
75    pub(super) fd: RawFd,
76
77    /// Object store path
78    pub(super) path: Path,
79}
80
81impl UringFileHandle {
82    pub(super) fn new(file: File, path: Path) -> Self {
83        let fd = file.as_raw_fd();
84        Self {
85            file: Arc::new(file),
86            fd,
87            path,
88        }
89    }
90}
91
92/// io_uring-based reader for local files.
93///
94/// This reader uses a dedicated process-wide thread running an io_uring event loop
95/// for high-performance asynchronous I/O.
96#[derive(Debug)]
97pub struct UringReader {
98    /// File handle
99    handle: Arc<UringFileHandle>,
100
101    /// Block size for I/O operations
102    block_size: usize,
103
104    /// File size (determined at open time)
105    size: usize,
106
107    /// I/O tracker for monitoring operations
108    io_tracker: Arc<IOTracker>,
109}
110
111impl DeepSizeOf for UringReader {
112    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
113        // Skip file handle (just a system resource)
114        // Only count the path's deep size
115        self.handle.path.as_ref().deep_size_of_children(context)
116    }
117}
118
119impl UringReader {
120    /// Open a file with io_uring.
121    ///
122    /// This is the internal constructor used by ObjectStore.
123    #[instrument(level = "debug")]
124    pub(crate) async fn open(
125        path: &Path,
126        block_size: usize,
127        known_size: Option<usize>,
128        io_tracker: Arc<IOTracker>,
129    ) -> Result<Box<dyn Reader>> {
130        // Determine block size with environment variable override
131        let block_size = URING_BLOCK_SIZE.unwrap_or(block_size.max(DEFAULT_URING_BLOCK_SIZE));
132
133        let cache_key = CacheKey::new(path, block_size);
134
135        // Try to get from cache first
136        if let Some(data) = HANDLE_CACHE.get(&cache_key).await {
137            // Use known_size if provided, otherwise use cached size
138            let size = known_size.unwrap_or(data.size);
139            return Ok(Box::new(Self {
140                handle: data.handle,
141                block_size,
142                size,
143                io_tracker,
144            }) as Box<dyn Reader>);
145        }
146
147        // Cache miss - open file and get size
148        let path_clone = path.clone();
149        let local_path = to_local_path(path);
150
151        let data = tokio::task::spawn_blocking(move || {
152            let file = File::open(&local_path).map_err(|e| match e.kind() {
153                ErrorKind::NotFound => Error::not_found(path_clone.to_string()),
154                _ => e.into(),
155            })?;
156
157            // Get size from known_size or file metadata
158            let size = match known_size {
159                Some(s) => s,
160                None => file.metadata()?.len() as usize,
161            };
162
163            Ok::<_, Error>(CachedReaderData {
164                handle: Arc::new(UringFileHandle::new(file, path_clone)),
165                size,
166            })
167        })
168        .await??;
169
170        // Insert into cache
171        HANDLE_CACHE.insert(cache_key, data.clone()).await;
172
173        // Return new reader instance
174        Ok(Box::new(Self {
175            handle: data.handle.clone(),
176            block_size,
177            size: data.size,
178            io_tracker,
179        }) as Box<dyn Reader>)
180    }
181
182    /// Submit a read request to the io_uring thread via channel and return a future.
183    fn submit_read(
184        &self,
185        offset: u64,
186        length: usize,
187    ) -> Pin<Box<dyn Future<Output = object_store::Result<Bytes>> + Send>> {
188        let mut buffer = BytesMut::with_capacity(length);
189        unsafe {
190            buffer.set_len(length);
191        }
192
193        // Create IoRequest with all data
194        let request = Arc::new(IoRequest {
195            fd: self.handle.fd,
196            offset,
197            length,
198            thread_id: std::thread::current().id(),
199            state: Mutex::new(RequestState {
200                completed: false,
201                waker: None,
202                err: None,
203                buffer,
204                bytes_read: 0,
205            }),
206        });
207
208        // Increment submitted counter before sending to channel
209        SUBMITTED_COUNTER.fetch_add(1, Ordering::Relaxed);
210
211        // Select thread in round-robin fashion
212        let thread_idx =
213            (THREAD_SELECTOR.fetch_add(1, Ordering::Relaxed) as usize) % URING_THREADS.len();
214
215        // Send to selected thread via channel
216        match URING_THREADS[thread_idx]
217            .request_tx
218            .send(Arc::clone(&request))
219        {
220            Ok(()) => {
221                // Return future that will be woken when operation completes
222                Box::pin(UringReadFuture { request })
223            }
224            Err(_) => {
225                // Thread died - decrement counter and return error future
226                SUBMITTED_COUNTER.fetch_sub(1, Ordering::Relaxed);
227                Box::pin(async move {
228                    Err(object_store::Error::Generic {
229                        store: "UringReader",
230                        source: Box::new(io::Error::new(
231                            io::ErrorKind::BrokenPipe,
232                            "io_uring thread died",
233                        )),
234                    })
235                })
236            }
237        }
238    }
239}
240
241impl Reader for UringReader {
242    fn path(&self) -> &Path {
243        &self.handle.path
244    }
245
246    fn block_size(&self) -> usize {
247        self.block_size
248    }
249
250    fn io_parallelism(&self) -> usize {
251        std::env::var("LANCE_URING_IO_PARALLELISM")
252            .ok()
253            .and_then(|s| s.parse().ok())
254            .unwrap_or(DEFAULT_URING_IO_PARALLELISM)
255    }
256
257    /// Returns the file size.
258    fn size(&self) -> BoxFuture<'_, object_store::Result<usize>> {
259        Box::pin(async move { Ok(self.size) })
260    }
261
262    /// Read a range of bytes using io_uring.
263    #[instrument(level = "debug", skip(self))]
264    fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, object_store::Result<Bytes>> {
265        let io_tracker = self.io_tracker.clone();
266        let path = self.handle.path.clone();
267        let num_bytes = range.len() as u64;
268        let range_u64 = (range.start as u64)..(range.end as u64);
269
270        self.submit_read(range.start as u64, range.len())
271            .map_ok(move |bytes| {
272                io_tracker.record_read("get_range", path, num_bytes, Some(range_u64));
273                bytes
274            })
275            .boxed()
276    }
277
278    /// Read the entire file using io_uring.
279    #[instrument(level = "debug", skip(self))]
280    fn get_all(&self) -> BoxFuture<'static, object_store::Result<Bytes>> {
281        let size = self.size;
282        let io_tracker = self.io_tracker.clone();
283        let path = self.handle.path.clone();
284
285        self.submit_read(0, size)
286            .map_ok(move |bytes| {
287                io_tracker.record_read("get_all", path, bytes.len() as u64, None);
288                bytes
289            })
290            .boxed()
291    }
292}