1use std::fs::File;
7use std::io::{ErrorKind, Read, SeekFrom};
8use std::ops::Range;
9use std::sync::Arc;
10
11#[cfg(unix)]
13use std::os::unix::fs::FileExt;
14#[cfg(windows)]
15use std::os::windows::fs::FileExt;
16
17use async_trait::async_trait;
18use bytes::{Bytes, BytesMut};
19use deepsize::DeepSizeOf;
20use lance_core::{Error, Result};
21use object_store::path::Path;
22use snafu::location;
23use tokio::io::AsyncSeekExt;
24use tokio::sync::OnceCell;
25use tracing::instrument;
26
27use crate::object_store::DEFAULT_LOCAL_IO_PARALLELISM;
28use crate::traits::{Reader, Writer};
29
30pub fn to_local_path(path: &Path) -> String {
32 if cfg!(windows) {
33 path.to_string()
34 } else {
35 format!("/{path}")
36 }
37}
38
39pub fn remove_dir_all(path: &Path) -> Result<()> {
41 let local_path = to_local_path(path);
42 std::fs::remove_dir_all(local_path).map_err(|err| match err.kind() {
43 ErrorKind::NotFound => Error::NotFound {
44 uri: path.to_string(),
45 location: location!(),
46 },
47 _ => Error::from(err),
48 })?;
49 Ok(())
50}
51
52pub fn copy_file(from: &Path, to: &Path) -> Result<()> {
56 let from_path = to_local_path(from);
57 let to_path = to_local_path(to);
58
59 if let Some(parent) = std::path::Path::new(&to_path).parent() {
61 std::fs::create_dir_all(parent).map_err(Error::from)?;
62 }
63
64 std::fs::copy(&from_path, &to_path).map_err(|err| match err.kind() {
65 ErrorKind::NotFound => Error::NotFound {
66 uri: from.to_string(),
67 location: location!(),
68 },
69 _ => Error::from(err),
70 })?;
71 Ok(())
72}
73
74#[derive(Debug)]
76pub struct LocalObjectReader {
77 file: Arc<File>,
79
80 path: Path,
82
83 size: OnceCell<usize>,
86
87 block_size: usize,
89}
90
91impl DeepSizeOf for LocalObjectReader {
92 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
93 self.path.as_ref().deep_size_of_children(context)
95 }
96}
97
98impl LocalObjectReader {
99 pub async fn open_local_path(
100 path: impl AsRef<std::path::Path>,
101 block_size: usize,
102 known_size: Option<usize>,
103 ) -> Result<Box<dyn Reader>> {
104 let path = path.as_ref().to_owned();
105 let object_store_path = Path::from_filesystem_path(&path)?;
106 Self::open(&object_store_path, block_size, known_size).await
107 }
108
109 #[instrument(level = "debug")]
111 pub async fn open(
112 path: &Path,
113 block_size: usize,
114 known_size: Option<usize>,
115 ) -> Result<Box<dyn Reader>> {
116 let path = path.clone();
117 let local_path = to_local_path(&path);
118 tokio::task::spawn_blocking(move || {
119 let file = File::open(&local_path).map_err(|e| match e.kind() {
120 ErrorKind::NotFound => Error::NotFound {
121 uri: path.to_string(),
122 location: location!(),
123 },
124 _ => e.into(),
125 })?;
126 let size = OnceCell::new_with(known_size);
127 Ok(Box::new(Self {
128 file: Arc::new(file),
129 block_size,
130 size,
131 path,
132 }) as Box<dyn Reader>)
133 })
134 .await?
135 }
136}
137
138#[async_trait]
139impl Reader for LocalObjectReader {
140 fn path(&self) -> &Path {
141 &self.path
142 }
143
144 fn block_size(&self) -> usize {
145 self.block_size
146 }
147
148 fn io_parallelism(&self) -> usize {
149 DEFAULT_LOCAL_IO_PARALLELISM
150 }
151
152 async fn size(&self) -> object_store::Result<usize> {
154 let file = self.file.clone();
155 self.size
156 .get_or_try_init(|| async move {
157 let metadata = tokio::task::spawn_blocking(move || {
158 file.metadata().map_err(|err| object_store::Error::Generic {
159 store: "LocalFileSystem",
160 source: err.into(),
161 })
162 })
163 .await??;
164 Ok(metadata.len() as usize)
165 })
166 .await
167 .cloned()
168 }
169
170 #[instrument(level = "debug", skip(self))]
172 async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
173 let file = self.file.clone();
174 tokio::task::spawn_blocking(move || {
175 let mut buf = BytesMut::with_capacity(range.len());
176 unsafe { buf.set_len(range.len()) };
179 #[cfg(unix)]
180 file.read_exact_at(buf.as_mut(), range.start as u64)?;
181 #[cfg(windows)]
182 read_exact_at(file, buf.as_mut(), range.start as u64)?;
183
184 Ok(buf.freeze())
185 })
186 .await?
187 .map_err(|err: std::io::Error| object_store::Error::Generic {
188 store: "LocalFileSystem",
189 source: err.into(),
190 })
191 }
192
193 #[instrument(level = "debug", skip(self))]
195 async fn get_all(&self) -> object_store::Result<Bytes> {
196 let mut file = self.file.clone();
197 tokio::task::spawn_blocking(move || {
198 let mut buf = Vec::new();
199 file.read_to_end(buf.as_mut())?;
200 Ok(Bytes::from(buf))
201 })
202 .await?
203 .map_err(|err: std::io::Error| object_store::Error::Generic {
204 store: "LocalFileSystem",
205 source: err.into(),
206 })
207 }
208}
209
/// Fills `buf` completely with bytes read from `file` starting at `offset`.
///
/// Windows counterpart of the Unix `FileExt::read_exact_at`, built on
/// `seek_read`. Short reads are retried from the advanced offset and
/// interrupted reads are retried as-is.
///
/// # Errors
///
/// Returns `ErrorKind::UnexpectedEof` when end-of-file is reached before
/// `buf` is full; the message reports how many bytes were expected and how
/// many were actually read. Any other I/O error is returned unchanged.
#[cfg(windows)]
fn read_exact_at(file: Arc<File>, mut buf: &mut [u8], mut offset: u64) -> std::io::Result<()> {
    let expected_len = buf.len();
    while !buf.is_empty() {
        match file.seek_read(buf, offset) {
            // A zero-length read means end-of-file.
            Ok(0) => break,
            Ok(n) => {
                // Move `buf` out before re-slicing so the shorter reborrow
                // can be assigned back to it.
                let tmp = buf;
                buf = &mut tmp[n..];
                offset += n as u64;
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }

    if buf.is_empty() {
        return Ok(());
    }

    // `buf` is now the unfilled tail, so the difference is the byte count
    // actually read (the absolute file offset would be misleading here).
    let bytes_read = expected_len - buf.len();
    Err(std::io::Error::new(
        std::io::ErrorKind::UnexpectedEof,
        format!(
            "failed to fill whole buffer. Expected {} bytes, got {}",
            expected_len, bytes_read
        ),
    ))
}
237
238#[async_trait]
239impl Writer for tokio::fs::File {
240 async fn tell(&mut self) -> Result<usize> {
241 Ok(self.seek(SeekFrom::Current(0)).await? as usize)
242 }
243}