lance_io/
local.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Optimized local I/Os
5
6use std::fs::File;
7use std::io::{ErrorKind, Read, SeekFrom};
8use std::ops::Range;
9use std::sync::Arc;
10
11// TODO: Clean up windows/unix stuff
12#[cfg(unix)]
13use std::os::unix::fs::FileExt;
14#[cfg(windows)]
15use std::os::windows::fs::FileExt;
16
17use async_trait::async_trait;
18use bytes::{Bytes, BytesMut};
19use deepsize::DeepSizeOf;
20use lance_core::{Error, Result};
21use object_store::path::Path;
22use snafu::location;
23use tokio::io::AsyncSeekExt;
24use tokio::sync::OnceCell;
25use tracing::instrument;
26
27use crate::object_store::DEFAULT_LOCAL_IO_PARALLELISM;
28use crate::traits::{Reader, Writer};
29
30/// Convert an [`object_store::path::Path`] to a [`std::path::Path`].
31pub fn to_local_path(path: &Path) -> String {
32    if cfg!(windows) {
33        path.to_string()
34    } else {
35        format!("/{path}")
36    }
37}
38
39/// Recursively remove a directory, specified by [`object_store::path::Path`].
40pub fn remove_dir_all(path: &Path) -> Result<()> {
41    let local_path = to_local_path(path);
42    std::fs::remove_dir_all(local_path).map_err(|err| match err.kind() {
43        ErrorKind::NotFound => Error::NotFound {
44            uri: path.to_string(),
45            location: location!(),
46        },
47        _ => Error::from(err),
48    })?;
49    Ok(())
50}
51
52/// Copy a file from one location to another, supporting cross-filesystem copies.
53///
54/// Unlike hard links, this function works across filesystem boundaries.
55pub fn copy_file(from: &Path, to: &Path) -> Result<()> {
56    let from_path = to_local_path(from);
57    let to_path = to_local_path(to);
58
59    // Ensure the parent directory exists
60    if let Some(parent) = std::path::Path::new(&to_path).parent() {
61        std::fs::create_dir_all(parent).map_err(Error::from)?;
62    }
63
64    std::fs::copy(&from_path, &to_path).map_err(|err| match err.kind() {
65        ErrorKind::NotFound => Error::NotFound {
66            uri: from.to_string(),
67            location: location!(),
68        },
69        _ => Error::from(err),
70    })?;
71    Ok(())
72}
73
74/// [ObjectReader] for local file system.
75#[derive(Debug)]
76pub struct LocalObjectReader {
77    /// File handler.
78    file: Arc<File>,
79
80    /// Fie path.
81    path: Path,
82
83    /// Known size of the file. This is either passed in on construction or
84    /// cached on the first metadata call.
85    size: OnceCell<usize>,
86
87    /// Block size, in bytes.
88    block_size: usize,
89}
90
91impl DeepSizeOf for LocalObjectReader {
92    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
93        // Skipping `file` as it should just be a file handle
94        self.path.as_ref().deep_size_of_children(context)
95    }
96}
97
98impl LocalObjectReader {
99    pub async fn open_local_path(
100        path: impl AsRef<std::path::Path>,
101        block_size: usize,
102        known_size: Option<usize>,
103    ) -> Result<Box<dyn Reader>> {
104        let path = path.as_ref().to_owned();
105        let object_store_path = Path::from_filesystem_path(&path)?;
106        Self::open(&object_store_path, block_size, known_size).await
107    }
108
109    /// Open a local object reader, with default prefetch size.
110    #[instrument(level = "debug")]
111    pub async fn open(
112        path: &Path,
113        block_size: usize,
114        known_size: Option<usize>,
115    ) -> Result<Box<dyn Reader>> {
116        let path = path.clone();
117        let local_path = to_local_path(&path);
118        tokio::task::spawn_blocking(move || {
119            let file = File::open(&local_path).map_err(|e| match e.kind() {
120                ErrorKind::NotFound => Error::NotFound {
121                    uri: path.to_string(),
122                    location: location!(),
123                },
124                _ => e.into(),
125            })?;
126            let size = OnceCell::new_with(known_size);
127            Ok(Box::new(Self {
128                file: Arc::new(file),
129                block_size,
130                size,
131                path,
132            }) as Box<dyn Reader>)
133        })
134        .await?
135    }
136}
137
138#[async_trait]
139impl Reader for LocalObjectReader {
140    fn path(&self) -> &Path {
141        &self.path
142    }
143
144    fn block_size(&self) -> usize {
145        self.block_size
146    }
147
148    fn io_parallelism(&self) -> usize {
149        DEFAULT_LOCAL_IO_PARALLELISM
150    }
151
152    /// Returns the file size.
153    async fn size(&self) -> object_store::Result<usize> {
154        let file = self.file.clone();
155        self.size
156            .get_or_try_init(|| async move {
157                let metadata = tokio::task::spawn_blocking(move || {
158                    file.metadata().map_err(|err| object_store::Error::Generic {
159                        store: "LocalFileSystem",
160                        source: err.into(),
161                    })
162                })
163                .await??;
164                Ok(metadata.len() as usize)
165            })
166            .await
167            .cloned()
168    }
169
170    /// Reads a range of data.
171    #[instrument(level = "debug", skip(self))]
172    async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
173        let file = self.file.clone();
174        tokio::task::spawn_blocking(move || {
175            let mut buf = BytesMut::with_capacity(range.len());
176            // Safety: `buf` is set with appropriate capacity above. It is
177            // written to below and we check all data is initialized at that point.
178            unsafe { buf.set_len(range.len()) };
179            #[cfg(unix)]
180            file.read_exact_at(buf.as_mut(), range.start as u64)?;
181            #[cfg(windows)]
182            read_exact_at(file, buf.as_mut(), range.start as u64)?;
183
184            Ok(buf.freeze())
185        })
186        .await?
187        .map_err(|err: std::io::Error| object_store::Error::Generic {
188            store: "LocalFileSystem",
189            source: err.into(),
190        })
191    }
192
193    /// Reads the entire file.
194    #[instrument(level = "debug", skip(self))]
195    async fn get_all(&self) -> object_store::Result<Bytes> {
196        let mut file = self.file.clone();
197        tokio::task::spawn_blocking(move || {
198            let mut buf = Vec::new();
199            file.read_to_end(buf.as_mut())?;
200            Ok(Bytes::from(buf))
201        })
202        .await?
203        .map_err(|err: std::io::Error| object_store::Error::Generic {
204            store: "LocalFileSystem",
205            source: err.into(),
206        })
207    }
208}
209
/// Fill `buf` completely with data read from `file`, starting at `offset`.
///
/// Windows counterpart of the Unix `FileExt::read_exact_at`: it repeats
/// `seek_read` until the buffer is full, retrying reads that fail with
/// [`std::io::ErrorKind::Interrupted`].
///
/// # Errors
///
/// Returns [`std::io::ErrorKind::UnexpectedEof`] if the file ends before
/// `buf` is filled; the message states how many bytes were expected and how
/// many were actually read. Any other I/O error is returned unchanged.
#[cfg(windows)]
fn read_exact_at(file: Arc<File>, mut buf: &mut [u8], mut offset: u64) -> std::io::Result<()> {
    let expected_len = buf.len();
    while !buf.is_empty() {
        match file.seek_read(buf, offset) {
            // A zero-length read means end of file.
            Ok(0) => break,
            Ok(n) => {
                // Advance past the bytes just read; the temporary lets the
                // mutable borrow be re-sliced.
                let tmp = buf;
                buf = &mut tmp[n..];
                offset += n as u64;
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    if buf.is_empty() {
        return Ok(());
    }
    // Whatever is left of `buf` is the part that could not be filled.
    let read_len = expected_len - buf.len();
    Err(std::io::Error::new(
        std::io::ErrorKind::UnexpectedEof,
        format!(
            "failed to fill whole buffer. Expected {} bytes, got {}",
            expected_len, read_len
        ),
    ))
}
237
238#[async_trait]
239impl Writer for tokio::fs::File {
240    async fn tell(&mut self) -> Result<usize> {
241        Ok(self.seek(SeekFrom::Current(0)).await? as usize)
242    }
243}