Skip to main content

lance_io/
local.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Optimized local I/Os
5
6use std::fs::File;
7use std::io::{ErrorKind, Read, SeekFrom};
8use std::ops::Range;
9use std::sync::Arc;
10
11// TODO: Clean up windows/unix stuff
12#[cfg(unix)]
13use std::os::unix::fs::FileExt;
14#[cfg(windows)]
15use std::os::windows::fs::FileExt;
16
17use async_trait::async_trait;
18use bytes::{Bytes, BytesMut};
19use deepsize::DeepSizeOf;
20use futures::future::BoxFuture;
21use lance_core::{Error, Result};
22use object_store::path::Path;
23use tokio::io::AsyncSeekExt;
24use tokio::sync::OnceCell;
25use tracing::instrument;
26
27use crate::object_reader::stream_local_range;
28use crate::object_store::DEFAULT_LOCAL_IO_PARALLELISM;
29use crate::object_writer::WriteResult;
30use crate::traits::{ByteStream, Reader, Writer};
31use crate::utils::tracking_store::IOTracker;
32
33/// Convert an [`object_store::path::Path`] to a [`std::path::Path`].
34pub fn to_local_path(path: &Path) -> String {
35    if cfg!(windows) {
36        path.to_string()
37    } else {
38        format!("/{path}")
39    }
40}
41
42/// Recursively remove a directory, specified by [`object_store::path::Path`].
43pub fn remove_dir_all(path: &Path) -> Result<()> {
44    let local_path = to_local_path(path);
45    std::fs::remove_dir_all(local_path).map_err(|err| match err.kind() {
46        ErrorKind::NotFound => Error::not_found(path.to_string()),
47        _ => Error::from(err),
48    })?;
49    Ok(())
50}
51
52/// Copy a file from one location to another, supporting cross-filesystem copies.
53///
54/// Unlike hard links, this function works across filesystem boundaries.
55pub fn copy_file(from: &Path, to: &Path) -> Result<()> {
56    let from_path = to_local_path(from);
57    let to_path = to_local_path(to);
58
59    // Ensure the parent directory exists
60    if let Some(parent) = std::path::Path::new(&to_path).parent() {
61        std::fs::create_dir_all(parent).map_err(Error::from)?;
62    }
63
64    std::fs::copy(&from_path, &to_path).map_err(|err| match err.kind() {
65        ErrorKind::NotFound => Error::not_found(from.to_string()),
66        _ => Error::from(err),
67    })?;
68    Ok(())
69}
70
71/// Object reader for local file system.
72#[derive(Debug)]
73pub struct LocalObjectReader {
74    /// File handler.
75    file: Arc<File>,
76
77    /// Fie path.
78    path: Path,
79
80    /// Known size of the file. This is either passed in on construction or
81    /// cached on the first metadata call.
82    size: OnceCell<usize>,
83
84    /// Block size, in bytes.
85    block_size: usize,
86
87    /// IO tracker for monitoring read operations.
88    io_tracker: Arc<IOTracker>,
89}
90
91impl DeepSizeOf for LocalObjectReader {
92    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
93        // Skipping `file` as it should just be a file handle
94        self.path.as_ref().deep_size_of_children(context)
95    }
96}
97
98impl LocalObjectReader {
99    pub async fn open_local_path(
100        path: impl AsRef<std::path::Path>,
101        block_size: usize,
102        known_size: Option<usize>,
103    ) -> Result<Box<dyn Reader>> {
104        let path = path.as_ref().to_owned();
105        let object_store_path = Path::from_filesystem_path(&path)?;
106        Self::open(&object_store_path, block_size, known_size).await
107    }
108
109    /// Open a local object reader, with default prefetch size.
110    ///
111    /// For backward compatibility with existing code that doesn't need tracking.
112    #[instrument(level = "debug")]
113    pub async fn open(
114        path: &Path,
115        block_size: usize,
116        known_size: Option<usize>,
117    ) -> Result<Box<dyn Reader>> {
118        Self::open_with_tracker(path, block_size, known_size, Default::default()).await
119    }
120
121    /// Open a local object reader with optional IO tracking.
122    #[instrument(level = "debug")]
123    pub(crate) async fn open_with_tracker(
124        path: &Path,
125        block_size: usize,
126        known_size: Option<usize>,
127        io_tracker: Arc<IOTracker>,
128    ) -> Result<Box<dyn Reader>> {
129        let path = path.clone();
130        let local_path = to_local_path(&path);
131        tokio::task::spawn_blocking(move || {
132            let file = File::open(&local_path).map_err(|e| match e.kind() {
133                ErrorKind::NotFound => Error::not_found(path.to_string()),
134                _ => e.into(),
135            })?;
136            let size = OnceCell::new_with(known_size);
137            Ok(Box::new(Self {
138                file: Arc::new(file),
139                block_size,
140                size,
141                path,
142                io_tracker,
143            }) as Box<dyn Reader>)
144        })
145        .await?
146    }
147}
148
149impl Reader for LocalObjectReader {
150    fn path(&self) -> &Path {
151        &self.path
152    }
153
154    fn block_size(&self) -> usize {
155        self.block_size
156    }
157
158    fn io_parallelism(&self) -> usize {
159        DEFAULT_LOCAL_IO_PARALLELISM
160    }
161
162    /// Returns the file size.
163    fn size(&self) -> BoxFuture<'_, object_store::Result<usize>> {
164        Box::pin(async move {
165            let file = self.file.clone();
166            self.size
167                .get_or_try_init(|| async move {
168                    let metadata = tokio::task::spawn_blocking(move || {
169                        file.metadata().map_err(|err| object_store::Error::Generic {
170                            store: "LocalFileSystem",
171                            source: err.into(),
172                        })
173                    })
174                    .await??;
175                    Ok(metadata.len() as usize)
176                })
177                .await
178                .cloned()
179        })
180    }
181
182    /// Reads a range of data.
183    #[instrument(level = "debug", skip(self))]
184    fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, object_store::Result<Bytes>> {
185        let file = self.file.clone();
186        let io_tracker = self.io_tracker.clone();
187        let path = self.path.clone();
188        let num_bytes = range.len() as u64;
189        let range_u64 = (range.start as u64)..(range.end as u64);
190
191        Box::pin(async move {
192            let result = tokio::task::spawn_blocking(move || {
193                let mut buf = BytesMut::with_capacity(range.len());
194                // Safety: `buf` is set with appropriate capacity above. It is
195                // written to below and we check all data is initialized at that point.
196                unsafe { buf.set_len(range.len()) };
197                #[cfg(unix)]
198                file.read_exact_at(buf.as_mut(), range.start as u64)?;
199                #[cfg(windows)]
200                read_exact_at(file, buf.as_mut(), range.start as u64)?;
201
202                Ok(buf.freeze())
203            })
204            .await?
205            .map_err(|err: std::io::Error| object_store::Error::Generic {
206                store: "LocalFileSystem",
207                source: err.into(),
208            });
209
210            if result.is_ok() {
211                io_tracker.record_read("get_range", path, num_bytes, Some(range_u64));
212            }
213
214            result
215        })
216    }
217
218    /// Reads the entire file.
219    #[instrument(level = "debug", skip(self))]
220    fn get_all(&self) -> BoxFuture<'_, object_store::Result<Bytes>> {
221        Box::pin(async move {
222            let mut file = self.file.clone();
223            let io_tracker = self.io_tracker.clone();
224            let path = self.path.clone();
225
226            let result = tokio::task::spawn_blocking(move || {
227                let mut buf = Vec::new();
228                file.read_to_end(buf.as_mut())?;
229                Ok(Bytes::from(buf))
230            })
231            .await?
232            .map_err(|err: std::io::Error| object_store::Error::Generic {
233                store: "LocalFileSystem",
234                source: err.into(),
235            });
236
237            if let Ok(bytes) = &result {
238                io_tracker.record_read("get_all", path, bytes.len() as u64, None);
239            }
240
241            result
242        })
243    }
244
245    fn get_stream(&self) -> BoxFuture<'_, object_store::Result<ByteStream>> {
246        Box::pin(async move {
247            let size = self.size().await?;
248            Ok(stream_local_range(
249                self.file.clone(),
250                self.path.clone(),
251                self.io_tracker.clone(),
252                0..size,
253                self.block_size.max(8 * 1024),
254            ))
255        })
256    }
257
258    fn get_range_stream(
259        &self,
260        range: Range<usize>,
261    ) -> BoxFuture<'_, object_store::Result<ByteStream>> {
262        let file = self.file.clone();
263        let path = self.path.clone();
264        let io_tracker = self.io_tracker.clone();
265        let chunk_size = self.block_size.max(8 * 1024);
266        Box::pin(async move {
267            Ok(stream_local_range(
268                file, path, io_tracker, range, chunk_size,
269            ))
270        })
271    }
272}
273
/// Read exactly `buf.len()` bytes from `file`, starting at absolute byte `offset`.
///
/// This is the Windows counterpart of the Unix `FileExt::read_exact_at`, built on
/// repeated `seek_read` calls.
///
/// # Errors
/// Returns `UnexpectedEof` if the end of the file is reached before `buf` is
/// filled, and propagates any other I/O error. `Interrupted` errors are retried.
#[cfg(windows)]
pub(crate) fn read_exact_at(
    file: Arc<File>,
    mut buf: &mut [u8],
    mut offset: u64,
) -> std::io::Result<()> {
    let expected_len = buf.len();
    while !buf.is_empty() {
        match file.seek_read(buf, offset) {
            // End of file: stop and report the shortfall below.
            Ok(0) => break,
            Ok(n) => {
                let tmp = buf;
                buf = &mut tmp[n..];
                offset += n as u64;
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    if !buf.is_empty() {
        // Report the number of bytes actually read; `offset` is an absolute file
        // position and only equals that count when the read started at 0.
        let bytes_read = expected_len - buf.len();
        Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            format!(
                "failed to fill whole buffer. Expected {} bytes, got {}",
                expected_len, bytes_read
            ),
        ))
    } else {
        Ok(())
    }
}
305
306#[async_trait]
307impl Writer for tokio::fs::File {
308    async fn tell(&mut self) -> Result<usize> {
309        Ok(self.seek(SeekFrom::Current(0)).await? as usize)
310    }
311
312    async fn shutdown(&mut self) -> Result<WriteResult> {
313        let size = self.seek(SeekFrom::Current(0)).await? as usize;
314        tokio::io::AsyncWriteExt::shutdown(self).await?;
315        Ok(WriteResult { size, e_tag: None })
316    }
317}