1use std::fs::File;
7use std::io::{ErrorKind, Read, SeekFrom};
8use std::ops::Range;
9use std::sync::Arc;
10
11#[cfg(unix)]
13use std::os::unix::fs::FileExt;
14#[cfg(windows)]
15use std::os::windows::fs::FileExt;
16
17use async_trait::async_trait;
18use bytes::{Bytes, BytesMut};
19use deepsize::DeepSizeOf;
20use lance_core::{Error, Result};
21use object_store::path::Path;
22use snafu::location;
23use tokio::io::AsyncSeekExt;
24use tokio::sync::OnceCell;
25use tracing::instrument;
26
27use crate::object_store::DEFAULT_LOCAL_IO_PARALLELISM;
28use crate::traits::{Reader, Writer};
29use crate::utils::tracking_store::IOTracker;
30
31pub fn to_local_path(path: &Path) -> String {
33 if cfg!(windows) {
34 path.to_string()
35 } else {
36 format!("/{path}")
37 }
38}
39
40pub fn remove_dir_all(path: &Path) -> Result<()> {
42 let local_path = to_local_path(path);
43 std::fs::remove_dir_all(local_path).map_err(|err| match err.kind() {
44 ErrorKind::NotFound => Error::NotFound {
45 uri: path.to_string(),
46 location: location!(),
47 },
48 _ => Error::from(err),
49 })?;
50 Ok(())
51}
52
53pub fn copy_file(from: &Path, to: &Path) -> Result<()> {
57 let from_path = to_local_path(from);
58 let to_path = to_local_path(to);
59
60 if let Some(parent) = std::path::Path::new(&to_path).parent() {
62 std::fs::create_dir_all(parent).map_err(Error::from)?;
63 }
64
65 std::fs::copy(&from_path, &to_path).map_err(|err| match err.kind() {
66 ErrorKind::NotFound => Error::NotFound {
67 uri: from.to_string(),
68 location: location!(),
69 },
70 _ => Error::from(err),
71 })?;
72 Ok(())
73}
74
75#[derive(Debug)]
77pub struct LocalObjectReader {
78 file: Arc<File>,
80
81 path: Path,
83
84 size: OnceCell<usize>,
87
88 block_size: usize,
90
91 io_tracker: Arc<IOTracker>,
93}
94
95impl DeepSizeOf for LocalObjectReader {
96 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
97 self.path.as_ref().deep_size_of_children(context)
99 }
100}
101
102impl LocalObjectReader {
103 pub async fn open_local_path(
104 path: impl AsRef<std::path::Path>,
105 block_size: usize,
106 known_size: Option<usize>,
107 ) -> Result<Box<dyn Reader>> {
108 let path = path.as_ref().to_owned();
109 let object_store_path = Path::from_filesystem_path(&path)?;
110 Self::open(&object_store_path, block_size, known_size).await
111 }
112
113 #[instrument(level = "debug")]
117 pub async fn open(
118 path: &Path,
119 block_size: usize,
120 known_size: Option<usize>,
121 ) -> Result<Box<dyn Reader>> {
122 Self::open_with_tracker(path, block_size, known_size, Default::default()).await
123 }
124
125 #[instrument(level = "debug")]
127 pub(crate) async fn open_with_tracker(
128 path: &Path,
129 block_size: usize,
130 known_size: Option<usize>,
131 io_tracker: Arc<IOTracker>,
132 ) -> Result<Box<dyn Reader>> {
133 let path = path.clone();
134 let local_path = to_local_path(&path);
135 tokio::task::spawn_blocking(move || {
136 let file = File::open(&local_path).map_err(|e| match e.kind() {
137 ErrorKind::NotFound => Error::NotFound {
138 uri: path.to_string(),
139 location: location!(),
140 },
141 _ => e.into(),
142 })?;
143 let size = OnceCell::new_with(known_size);
144 Ok(Box::new(Self {
145 file: Arc::new(file),
146 block_size,
147 size,
148 path,
149 io_tracker,
150 }) as Box<dyn Reader>)
151 })
152 .await?
153 }
154}
155
156#[async_trait]
157impl Reader for LocalObjectReader {
158 fn path(&self) -> &Path {
159 &self.path
160 }
161
162 fn block_size(&self) -> usize {
163 self.block_size
164 }
165
166 fn io_parallelism(&self) -> usize {
167 DEFAULT_LOCAL_IO_PARALLELISM
168 }
169
170 async fn size(&self) -> object_store::Result<usize> {
172 let file = self.file.clone();
173 self.size
174 .get_or_try_init(|| async move {
175 let metadata = tokio::task::spawn_blocking(move || {
176 file.metadata().map_err(|err| object_store::Error::Generic {
177 store: "LocalFileSystem",
178 source: err.into(),
179 })
180 })
181 .await??;
182 Ok(metadata.len() as usize)
183 })
184 .await
185 .cloned()
186 }
187
188 #[instrument(level = "debug", skip(self))]
190 async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
191 let file = self.file.clone();
192 let io_tracker = self.io_tracker.clone();
193 let path = self.path.clone();
194 let num_bytes = range.len() as u64;
195 let range_u64 = (range.start as u64)..(range.end as u64);
196
197 let result = tokio::task::spawn_blocking(move || {
198 let mut buf = BytesMut::with_capacity(range.len());
199 unsafe { buf.set_len(range.len()) };
202 #[cfg(unix)]
203 file.read_exact_at(buf.as_mut(), range.start as u64)?;
204 #[cfg(windows)]
205 read_exact_at(file, buf.as_mut(), range.start as u64)?;
206
207 Ok(buf.freeze())
208 })
209 .await?
210 .map_err(|err: std::io::Error| object_store::Error::Generic {
211 store: "LocalFileSystem",
212 source: err.into(),
213 });
214
215 if result.is_ok() {
216 io_tracker.record_read("get_range", path, num_bytes, Some(range_u64));
217 }
218
219 result
220 }
221
222 #[instrument(level = "debug", skip(self))]
224 async fn get_all(&self) -> object_store::Result<Bytes> {
225 let mut file = self.file.clone();
226 let io_tracker = self.io_tracker.clone();
227 let path = self.path.clone();
228
229 let result = tokio::task::spawn_blocking(move || {
230 let mut buf = Vec::new();
231 file.read_to_end(buf.as_mut())?;
232 Ok(Bytes::from(buf))
233 })
234 .await?
235 .map_err(|err: std::io::Error| object_store::Error::Generic {
236 store: "LocalFileSystem",
237 source: err.into(),
238 });
239
240 if let Ok(bytes) = &result {
241 io_tracker.record_read("get_all", path, bytes.len() as u64, None);
242 }
243
244 result
245 }
246}
247
/// Windows counterpart of the Unix `read_exact_at`: fills `buf` completely
/// from `file`, starting at byte `offset`.
///
/// Reads are issued with `seek_read` until the buffer is full; interrupted
/// reads are retried.
///
/// # Errors
///
/// Returns `ErrorKind::UnexpectedEof` if the file ends before `buf` is full.
/// The message reports how many bytes were requested and how many were
/// actually read. Any other I/O error is returned unchanged.
#[cfg(windows)]
fn read_exact_at(file: Arc<File>, mut buf: &mut [u8], mut offset: u64) -> std::io::Result<()> {
    let expected_len = buf.len();
    while !buf.is_empty() {
        match file.seek_read(buf, offset) {
            Ok(0) => break,
            Ok(n) => {
                // Advance past the bytes just read.
                let tmp = buf;
                buf = &mut tmp[n..];
                offset += n as u64;
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    if !buf.is_empty() {
        // `buf` now holds only the unfilled tail, so the difference is the
        // number of bytes read (the file offset is not a byte count).
        let bytes_read = expected_len - buf.len();
        Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            format!(
                "failed to fill whole buffer. Expected {} bytes, got {}",
                expected_len, bytes_read
            ),
        ))
    } else {
        Ok(())
    }
}
275
276#[async_trait]
277impl Writer for tokio::fs::File {
278 async fn tell(&mut self) -> Result<usize> {
279 Ok(self.seek(SeekFrom::Current(0)).await? as usize)
280 }
281}