Skip to main content

lance_io/
local.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Optimized local I/Os
5
6use std::fs::File;
7use std::io::{ErrorKind, Read, SeekFrom};
8use std::ops::Range;
9use std::sync::Arc;
10
11// TODO: Clean up windows/unix stuff
12#[cfg(unix)]
13use std::os::unix::fs::FileExt;
14#[cfg(windows)]
15use std::os::windows::fs::FileExt;
16
17use async_trait::async_trait;
18use bytes::{Bytes, BytesMut};
19use deepsize::DeepSizeOf;
20use futures::future::BoxFuture;
21use lance_core::{Error, Result};
22use object_store::path::Path;
23use tokio::io::AsyncSeekExt;
24use tokio::sync::OnceCell;
25use tracing::instrument;
26
27use crate::object_store::DEFAULT_LOCAL_IO_PARALLELISM;
28use crate::object_writer::WriteResult;
29use crate::traits::{Reader, Writer};
30use crate::utils::tracking_store::IOTracker;
31
32/// Convert an [`object_store::path::Path`] to a [`std::path::Path`].
33pub fn to_local_path(path: &Path) -> String {
34    if cfg!(windows) {
35        path.to_string()
36    } else {
37        format!("/{path}")
38    }
39}
40
41/// Recursively remove a directory, specified by [`object_store::path::Path`].
42pub fn remove_dir_all(path: &Path) -> Result<()> {
43    let local_path = to_local_path(path);
44    std::fs::remove_dir_all(local_path).map_err(|err| match err.kind() {
45        ErrorKind::NotFound => Error::not_found(path.to_string()),
46        _ => Error::from(err),
47    })?;
48    Ok(())
49}
50
51/// Copy a file from one location to another, supporting cross-filesystem copies.
52///
53/// Unlike hard links, this function works across filesystem boundaries.
54pub fn copy_file(from: &Path, to: &Path) -> Result<()> {
55    let from_path = to_local_path(from);
56    let to_path = to_local_path(to);
57
58    // Ensure the parent directory exists
59    if let Some(parent) = std::path::Path::new(&to_path).parent() {
60        std::fs::create_dir_all(parent).map_err(Error::from)?;
61    }
62
63    std::fs::copy(&from_path, &to_path).map_err(|err| match err.kind() {
64        ErrorKind::NotFound => Error::not_found(from.to_string()),
65        _ => Error::from(err),
66    })?;
67    Ok(())
68}
69
/// Object reader for local file system.
///
/// Holds a shared handle to an already-opened file together with the
/// object-store path it was opened from.
#[derive(Debug)]
pub struct LocalObjectReader {
    /// Shared handle to the opened file.
    file: Arc<File>,

    /// File path, in object-store form.
    path: Path,

    /// Known size of the file. This is either passed in on construction or
    /// cached on the first metadata call.
    size: OnceCell<usize>,

    /// Block size, in bytes.
    block_size: usize,

    /// IO tracker for monitoring read operations.
    io_tracker: Arc<IOTracker>,
}
89
90impl DeepSizeOf for LocalObjectReader {
91    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
92        // Skipping `file` as it should just be a file handle
93        self.path.as_ref().deep_size_of_children(context)
94    }
95}
96
97impl LocalObjectReader {
98    pub async fn open_local_path(
99        path: impl AsRef<std::path::Path>,
100        block_size: usize,
101        known_size: Option<usize>,
102    ) -> Result<Box<dyn Reader>> {
103        let path = path.as_ref().to_owned();
104        let object_store_path = Path::from_filesystem_path(&path)?;
105        Self::open(&object_store_path, block_size, known_size).await
106    }
107
108    /// Open a local object reader, with default prefetch size.
109    ///
110    /// For backward compatibility with existing code that doesn't need tracking.
111    #[instrument(level = "debug")]
112    pub async fn open(
113        path: &Path,
114        block_size: usize,
115        known_size: Option<usize>,
116    ) -> Result<Box<dyn Reader>> {
117        Self::open_with_tracker(path, block_size, known_size, Default::default()).await
118    }
119
120    /// Open a local object reader with optional IO tracking.
121    #[instrument(level = "debug")]
122    pub(crate) async fn open_with_tracker(
123        path: &Path,
124        block_size: usize,
125        known_size: Option<usize>,
126        io_tracker: Arc<IOTracker>,
127    ) -> Result<Box<dyn Reader>> {
128        let path = path.clone();
129        let local_path = to_local_path(&path);
130        tokio::task::spawn_blocking(move || {
131            let file = File::open(&local_path).map_err(|e| match e.kind() {
132                ErrorKind::NotFound => Error::not_found(path.to_string()),
133                _ => e.into(),
134            })?;
135            let size = OnceCell::new_with(known_size);
136            Ok(Box::new(Self {
137                file: Arc::new(file),
138                block_size,
139                size,
140                path,
141                io_tracker,
142            }) as Box<dyn Reader>)
143        })
144        .await?
145    }
146}
147
148impl Reader for LocalObjectReader {
149    fn path(&self) -> &Path {
150        &self.path
151    }
152
153    fn block_size(&self) -> usize {
154        self.block_size
155    }
156
157    fn io_parallelism(&self) -> usize {
158        DEFAULT_LOCAL_IO_PARALLELISM
159    }
160
161    /// Returns the file size.
162    fn size(&self) -> BoxFuture<'_, object_store::Result<usize>> {
163        Box::pin(async move {
164            let file = self.file.clone();
165            self.size
166                .get_or_try_init(|| async move {
167                    let metadata = tokio::task::spawn_blocking(move || {
168                        file.metadata().map_err(|err| object_store::Error::Generic {
169                            store: "LocalFileSystem",
170                            source: err.into(),
171                        })
172                    })
173                    .await??;
174                    Ok(metadata.len() as usize)
175                })
176                .await
177                .cloned()
178        })
179    }
180
181    /// Reads a range of data.
182    #[instrument(level = "debug", skip(self))]
183    fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, object_store::Result<Bytes>> {
184        let file = self.file.clone();
185        let io_tracker = self.io_tracker.clone();
186        let path = self.path.clone();
187        let num_bytes = range.len() as u64;
188        let range_u64 = (range.start as u64)..(range.end as u64);
189
190        Box::pin(async move {
191            let result = tokio::task::spawn_blocking(move || {
192                let mut buf = BytesMut::with_capacity(range.len());
193                // Safety: `buf` is set with appropriate capacity above. It is
194                // written to below and we check all data is initialized at that point.
195                unsafe { buf.set_len(range.len()) };
196                #[cfg(unix)]
197                file.read_exact_at(buf.as_mut(), range.start as u64)?;
198                #[cfg(windows)]
199                read_exact_at(file, buf.as_mut(), range.start as u64)?;
200
201                Ok(buf.freeze())
202            })
203            .await?
204            .map_err(|err: std::io::Error| object_store::Error::Generic {
205                store: "LocalFileSystem",
206                source: err.into(),
207            });
208
209            if result.is_ok() {
210                io_tracker.record_read("get_range", path, num_bytes, Some(range_u64));
211            }
212
213            result
214        })
215    }
216
217    /// Reads the entire file.
218    #[instrument(level = "debug", skip(self))]
219    fn get_all(&self) -> BoxFuture<'_, object_store::Result<Bytes>> {
220        Box::pin(async move {
221            let mut file = self.file.clone();
222            let io_tracker = self.io_tracker.clone();
223            let path = self.path.clone();
224
225            let result = tokio::task::spawn_blocking(move || {
226                let mut buf = Vec::new();
227                file.read_to_end(buf.as_mut())?;
228                Ok(Bytes::from(buf))
229            })
230            .await?
231            .map_err(|err: std::io::Error| object_store::Error::Generic {
232                store: "LocalFileSystem",
233                source: err.into(),
234            });
235
236            if let Ok(bytes) = &result {
237                io_tracker.record_read("get_all", path, bytes.len() as u64, None);
238            }
239
240            result
241        })
242    }
243}
244
/// Fill `buf` completely with bytes read from `file` starting at `offset`.
///
/// This mirrors the Unix `FileExt::read_exact_at` on top of the Windows
/// `seek_read` primitive: it keeps issuing reads, advancing the offset by
/// the number of bytes returned each time, until the buffer is full.
/// Reads that fail with `ErrorKind::Interrupted` are retried.
///
/// # Errors
///
/// Returns `ErrorKind::UnexpectedEof` when a read returns 0 bytes before the
/// buffer is full; the message states how many bytes were expected and how
/// many were actually read. Any other I/O error is returned as-is.
#[cfg(windows)]
fn read_exact_at(file: Arc<File>, mut buf: &mut [u8], mut offset: u64) -> std::io::Result<()> {
    let expected_len = buf.len();
    while !buf.is_empty() {
        match file.seek_read(buf, offset) {
            Ok(0) => break,
            Ok(n) => {
                // Move the slice out before re-slicing so the borrow checker
                // accepts shrinking `buf` in place.
                let tmp = buf;
                buf = &mut tmp[n..];
                offset += n as u64;
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    if !buf.is_empty() {
        // `buf` now holds only the unfilled tail, so the difference is the
        // number of bytes read. `offset` is an absolute file position and
        // must not be reported as a byte count.
        let bytes_read = expected_len - buf.len();
        Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            format!(
                "failed to fill whole buffer. Expected {} bytes, got {}",
                expected_len, bytes_read
            ),
        ))
    } else {
        Ok(())
    }
}
272
273#[async_trait]
274impl Writer for tokio::fs::File {
275    async fn tell(&mut self) -> Result<usize> {
276        Ok(self.seek(SeekFrom::Current(0)).await? as usize)
277    }
278
279    async fn shutdown(&mut self) -> Result<WriteResult> {
280        let size = self.seek(SeekFrom::Current(0)).await? as usize;
281        tokio::io::AsyncWriteExt::shutdown(self).await?;
282        Ok(WriteResult { size, e_tag: None })
283    }
284}