lance_io/
local.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Optimized local I/Os
5
6use std::fs::File;
7use std::io::{ErrorKind, Read, SeekFrom};
8use std::ops::Range;
9use std::sync::Arc;
10
11// TODO: Clean up windows/unix stuff
12#[cfg(unix)]
13use std::os::unix::fs::FileExt;
14#[cfg(windows)]
15use std::os::windows::fs::FileExt;
16
17use async_trait::async_trait;
18use bytes::{Bytes, BytesMut};
19use deepsize::DeepSizeOf;
20use lance_core::{Error, Result};
21use object_store::path::Path;
22use snafu::location;
23use tokio::io::AsyncSeekExt;
24use tokio::sync::OnceCell;
25use tracing::instrument;
26
27use crate::object_store::DEFAULT_LOCAL_IO_PARALLELISM;
28use crate::traits::{Reader, Writer};
29use crate::utils::tracking_store::IOTracker;
30
31/// Convert an [`object_store::path::Path`] to a [`std::path::Path`].
32pub fn to_local_path(path: &Path) -> String {
33    if cfg!(windows) {
34        path.to_string()
35    } else {
36        format!("/{path}")
37    }
38}
39
40/// Recursively remove a directory, specified by [`object_store::path::Path`].
41pub fn remove_dir_all(path: &Path) -> Result<()> {
42    let local_path = to_local_path(path);
43    std::fs::remove_dir_all(local_path).map_err(|err| match err.kind() {
44        ErrorKind::NotFound => Error::NotFound {
45            uri: path.to_string(),
46            location: location!(),
47        },
48        _ => Error::from(err),
49    })?;
50    Ok(())
51}
52
53/// Copy a file from one location to another, supporting cross-filesystem copies.
54///
55/// Unlike hard links, this function works across filesystem boundaries.
56pub fn copy_file(from: &Path, to: &Path) -> Result<()> {
57    let from_path = to_local_path(from);
58    let to_path = to_local_path(to);
59
60    // Ensure the parent directory exists
61    if let Some(parent) = std::path::Path::new(&to_path).parent() {
62        std::fs::create_dir_all(parent).map_err(Error::from)?;
63    }
64
65    std::fs::copy(&from_path, &to_path).map_err(|err| match err.kind() {
66        ErrorKind::NotFound => Error::NotFound {
67            uri: from.to_string(),
68            location: location!(),
69        },
70        _ => Error::from(err),
71    })?;
72    Ok(())
73}
74
/// [`Reader`] implementation for the local file system.
#[derive(Debug)]
pub struct LocalObjectReader {
    /// Open file handle, shared with the blocking tasks that perform reads.
    file: Arc<File>,

    /// File path.
    path: Path,

    /// Known size of the file. This is either passed in on construction or
    /// cached on the first metadata call.
    size: OnceCell<usize>,

    /// Block size, in bytes.
    block_size: usize,

    /// IO tracker for monitoring read operations.
    io_tracker: Arc<IOTracker>,
}
94
95impl DeepSizeOf for LocalObjectReader {
96    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
97        // Skipping `file` as it should just be a file handle
98        self.path.as_ref().deep_size_of_children(context)
99    }
100}
101
102impl LocalObjectReader {
103    pub async fn open_local_path(
104        path: impl AsRef<std::path::Path>,
105        block_size: usize,
106        known_size: Option<usize>,
107    ) -> Result<Box<dyn Reader>> {
108        let path = path.as_ref().to_owned();
109        let object_store_path = Path::from_filesystem_path(&path)?;
110        Self::open(&object_store_path, block_size, known_size).await
111    }
112
113    /// Open a local object reader, with default prefetch size.
114    ///
115    /// For backward compatibility with existing code that doesn't need tracking.
116    #[instrument(level = "debug")]
117    pub async fn open(
118        path: &Path,
119        block_size: usize,
120        known_size: Option<usize>,
121    ) -> Result<Box<dyn Reader>> {
122        Self::open_with_tracker(path, block_size, known_size, Default::default()).await
123    }
124
125    /// Open a local object reader with optional IO tracking.
126    #[instrument(level = "debug")]
127    pub(crate) async fn open_with_tracker(
128        path: &Path,
129        block_size: usize,
130        known_size: Option<usize>,
131        io_tracker: Arc<IOTracker>,
132    ) -> Result<Box<dyn Reader>> {
133        let path = path.clone();
134        let local_path = to_local_path(&path);
135        tokio::task::spawn_blocking(move || {
136            let file = File::open(&local_path).map_err(|e| match e.kind() {
137                ErrorKind::NotFound => Error::NotFound {
138                    uri: path.to_string(),
139                    location: location!(),
140                },
141                _ => e.into(),
142            })?;
143            let size = OnceCell::new_with(known_size);
144            Ok(Box::new(Self {
145                file: Arc::new(file),
146                block_size,
147                size,
148                path,
149                io_tracker,
150            }) as Box<dyn Reader>)
151        })
152        .await?
153    }
154}
155
156#[async_trait]
157impl Reader for LocalObjectReader {
158    fn path(&self) -> &Path {
159        &self.path
160    }
161
162    fn block_size(&self) -> usize {
163        self.block_size
164    }
165
166    fn io_parallelism(&self) -> usize {
167        DEFAULT_LOCAL_IO_PARALLELISM
168    }
169
170    /// Returns the file size.
171    async fn size(&self) -> object_store::Result<usize> {
172        let file = self.file.clone();
173        self.size
174            .get_or_try_init(|| async move {
175                let metadata = tokio::task::spawn_blocking(move || {
176                    file.metadata().map_err(|err| object_store::Error::Generic {
177                        store: "LocalFileSystem",
178                        source: err.into(),
179                    })
180                })
181                .await??;
182                Ok(metadata.len() as usize)
183            })
184            .await
185            .cloned()
186    }
187
188    /// Reads a range of data.
189    #[instrument(level = "debug", skip(self))]
190    async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
191        let file = self.file.clone();
192        let io_tracker = self.io_tracker.clone();
193        let path = self.path.clone();
194        let num_bytes = range.len() as u64;
195        let range_u64 = (range.start as u64)..(range.end as u64);
196
197        let result = tokio::task::spawn_blocking(move || {
198            let mut buf = BytesMut::with_capacity(range.len());
199            // Safety: `buf` is set with appropriate capacity above. It is
200            // written to below and we check all data is initialized at that point.
201            unsafe { buf.set_len(range.len()) };
202            #[cfg(unix)]
203            file.read_exact_at(buf.as_mut(), range.start as u64)?;
204            #[cfg(windows)]
205            read_exact_at(file, buf.as_mut(), range.start as u64)?;
206
207            Ok(buf.freeze())
208        })
209        .await?
210        .map_err(|err: std::io::Error| object_store::Error::Generic {
211            store: "LocalFileSystem",
212            source: err.into(),
213        });
214
215        if result.is_ok() {
216            io_tracker.record_read("get_range", path, num_bytes, Some(range_u64));
217        }
218
219        result
220    }
221
222    /// Reads the entire file.
223    #[instrument(level = "debug", skip(self))]
224    async fn get_all(&self) -> object_store::Result<Bytes> {
225        let mut file = self.file.clone();
226        let io_tracker = self.io_tracker.clone();
227        let path = self.path.clone();
228
229        let result = tokio::task::spawn_blocking(move || {
230            let mut buf = Vec::new();
231            file.read_to_end(buf.as_mut())?;
232            Ok(Bytes::from(buf))
233        })
234        .await?
235        .map_err(|err: std::io::Error| object_store::Error::Generic {
236            store: "LocalFileSystem",
237            source: err.into(),
238        });
239
240        if let Ok(bytes) = &result {
241            io_tracker.record_read("get_all", path, bytes.len() as u64, None);
242        }
243
244        result
245    }
246}
247
/// Fill `buf` completely with data read from `file`, starting at `offset`.
///
/// Windows counterpart of the Unix `FileExt::read_exact_at`: it calls
/// `seek_read` repeatedly until the buffer is full, retrying reads that fail
/// with `ErrorKind::Interrupted`.
///
/// # Errors
///
/// Returns `ErrorKind::UnexpectedEof` if the file ends before the buffer is
/// full; the message gives the expected and actual byte counts. Any other
/// read error is returned unchanged.
#[cfg(windows)]
fn read_exact_at(file: Arc<File>, mut buf: &mut [u8], mut offset: u64) -> std::io::Result<()> {
    let expected_len = buf.len();
    while !buf.is_empty() {
        match file.seek_read(buf, offset) {
            // End of file reached before the buffer was filled.
            Ok(0) => break,
            Ok(n) => {
                let tmp = buf;
                buf = &mut tmp[n..];
                offset += n as u64;
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    if !buf.is_empty() {
        // `buf` is the unfilled tail, so the difference is the number of
        // bytes actually read (not the absolute file offset).
        let bytes_read = expected_len - buf.len();
        Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            format!(
                "failed to fill whole buffer. Expected {} bytes, got {}",
                expected_len, bytes_read
            ),
        ))
    } else {
        Ok(())
    }
}
275
276#[async_trait]
277impl Writer for tokio::fs::File {
278    async fn tell(&mut self) -> Result<usize> {
279        Ok(self.seek(SeekFrom::Current(0)).await? as usize)
280    }
281}