1use std::fs::File;
7use std::io::{ErrorKind, Read, SeekFrom};
8use std::ops::Range;
9use std::sync::Arc;
10
11#[cfg(unix)]
13use std::os::unix::fs::FileExt;
14#[cfg(windows)]
15use std::os::windows::fs::FileExt;
16
17use async_trait::async_trait;
18use bytes::{Bytes, BytesMut};
19use deepsize::DeepSizeOf;
20use futures::future::BoxFuture;
21use lance_core::{Error, Result};
22use object_store::path::Path;
23use tokio::io::AsyncSeekExt;
24use tokio::sync::OnceCell;
25use tracing::instrument;
26
27use crate::object_reader::stream_local_range;
28use crate::object_store::DEFAULT_LOCAL_IO_PARALLELISM;
29use crate::object_writer::WriteResult;
30use crate::traits::{ByteStream, Reader, Writer};
31use crate::utils::tracking_store::IOTracker;
32
33pub fn to_local_path(path: &Path) -> String {
35 if cfg!(windows) {
36 path.to_string()
37 } else {
38 format!("/{path}")
39 }
40}
41
42pub fn remove_dir_all(path: &Path) -> Result<()> {
44 let local_path = to_local_path(path);
45 std::fs::remove_dir_all(local_path).map_err(|err| match err.kind() {
46 ErrorKind::NotFound => Error::not_found(path.to_string()),
47 _ => Error::from(err),
48 })?;
49 Ok(())
50}
51
52pub fn copy_file(from: &Path, to: &Path) -> Result<()> {
56 let from_path = to_local_path(from);
57 let to_path = to_local_path(to);
58
59 if let Some(parent) = std::path::Path::new(&to_path).parent() {
61 std::fs::create_dir_all(parent).map_err(Error::from)?;
62 }
63
64 std::fs::copy(&from_path, &to_path).map_err(|err| match err.kind() {
65 ErrorKind::NotFound => Error::not_found(from.to_string()),
66 _ => Error::from(err),
67 })?;
68 Ok(())
69}
70
/// [`Reader`] implementation backed by a file on the local filesystem.
#[derive(Debug)]
pub struct LocalObjectReader {
    /// Open handle to the file. It is wrapped in `Arc` so it can be cloned into
    /// `spawn_blocking` tasks that perform the actual reads.
    file: Arc<File>,

    /// Object-store path this reader was opened with. It is reported in not-found errors
    /// and passed to the I/O tracker.
    path: Path,

    /// File size in bytes, populated lazily from file metadata. It is pre-filled when a
    /// size is supplied at open time.
    size: OnceCell<usize>,

    /// Block size reported to callers. It is also used, with an 8 KiB floor, as the
    /// chunk size for streaming reads.
    block_size: usize,

    /// Tracker that reads made through this reader are reported to.
    io_tracker: Arc<IOTracker>,
}
90
91impl DeepSizeOf for LocalObjectReader {
92 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
93 self.path.as_ref().deep_size_of_children(context)
95 }
96}
97
98impl LocalObjectReader {
99 pub async fn open_local_path(
100 path: impl AsRef<std::path::Path>,
101 block_size: usize,
102 known_size: Option<usize>,
103 ) -> Result<Box<dyn Reader>> {
104 let path = path.as_ref().to_owned();
105 let object_store_path = Path::from_filesystem_path(&path)?;
106 Self::open(&object_store_path, block_size, known_size).await
107 }
108
109 #[instrument(level = "debug")]
113 pub async fn open(
114 path: &Path,
115 block_size: usize,
116 known_size: Option<usize>,
117 ) -> Result<Box<dyn Reader>> {
118 Self::open_with_tracker(path, block_size, known_size, Default::default()).await
119 }
120
121 #[instrument(level = "debug")]
123 pub(crate) async fn open_with_tracker(
124 path: &Path,
125 block_size: usize,
126 known_size: Option<usize>,
127 io_tracker: Arc<IOTracker>,
128 ) -> Result<Box<dyn Reader>> {
129 let path = path.clone();
130 let local_path = to_local_path(&path);
131 tokio::task::spawn_blocking(move || {
132 let file = File::open(&local_path).map_err(|e| match e.kind() {
133 ErrorKind::NotFound => Error::not_found(path.to_string()),
134 _ => e.into(),
135 })?;
136 let size = OnceCell::new_with(known_size);
137 Ok(Box::new(Self {
138 file: Arc::new(file),
139 block_size,
140 size,
141 path,
142 io_tracker,
143 }) as Box<dyn Reader>)
144 })
145 .await?
146 }
147}
148
149impl Reader for LocalObjectReader {
150 fn path(&self) -> &Path {
151 &self.path
152 }
153
154 fn block_size(&self) -> usize {
155 self.block_size
156 }
157
158 fn io_parallelism(&self) -> usize {
159 DEFAULT_LOCAL_IO_PARALLELISM
160 }
161
162 fn size(&self) -> BoxFuture<'_, object_store::Result<usize>> {
164 Box::pin(async move {
165 let file = self.file.clone();
166 self.size
167 .get_or_try_init(|| async move {
168 let metadata = tokio::task::spawn_blocking(move || {
169 file.metadata().map_err(|err| object_store::Error::Generic {
170 store: "LocalFileSystem",
171 source: err.into(),
172 })
173 })
174 .await??;
175 Ok(metadata.len() as usize)
176 })
177 .await
178 .cloned()
179 })
180 }
181
182 #[instrument(level = "debug", skip(self))]
184 fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, object_store::Result<Bytes>> {
185 let file = self.file.clone();
186 let io_tracker = self.io_tracker.clone();
187 let path = self.path.clone();
188 let num_bytes = range.len() as u64;
189 let range_u64 = (range.start as u64)..(range.end as u64);
190
191 Box::pin(async move {
192 let result = tokio::task::spawn_blocking(move || {
193 let mut buf = BytesMut::with_capacity(range.len());
194 unsafe { buf.set_len(range.len()) };
197 #[cfg(unix)]
198 file.read_exact_at(buf.as_mut(), range.start as u64)?;
199 #[cfg(windows)]
200 read_exact_at(file, buf.as_mut(), range.start as u64)?;
201
202 Ok(buf.freeze())
203 })
204 .await?
205 .map_err(|err: std::io::Error| object_store::Error::Generic {
206 store: "LocalFileSystem",
207 source: err.into(),
208 });
209
210 if result.is_ok() {
211 io_tracker.record_read("get_range", path, num_bytes, Some(range_u64));
212 }
213
214 result
215 })
216 }
217
218 #[instrument(level = "debug", skip(self))]
220 fn get_all(&self) -> BoxFuture<'_, object_store::Result<Bytes>> {
221 Box::pin(async move {
222 let mut file = self.file.clone();
223 let io_tracker = self.io_tracker.clone();
224 let path = self.path.clone();
225
226 let result = tokio::task::spawn_blocking(move || {
227 let mut buf = Vec::new();
228 file.read_to_end(buf.as_mut())?;
229 Ok(Bytes::from(buf))
230 })
231 .await?
232 .map_err(|err: std::io::Error| object_store::Error::Generic {
233 store: "LocalFileSystem",
234 source: err.into(),
235 });
236
237 if let Ok(bytes) = &result {
238 io_tracker.record_read("get_all", path, bytes.len() as u64, None);
239 }
240
241 result
242 })
243 }
244
245 fn get_stream(&self) -> BoxFuture<'_, object_store::Result<ByteStream>> {
246 Box::pin(async move {
247 let size = self.size().await?;
248 Ok(stream_local_range(
249 self.file.clone(),
250 self.path.clone(),
251 self.io_tracker.clone(),
252 0..size,
253 self.block_size.max(8 * 1024),
254 ))
255 })
256 }
257
258 fn get_range_stream(
259 &self,
260 range: Range<usize>,
261 ) -> BoxFuture<'_, object_store::Result<ByteStream>> {
262 let file = self.file.clone();
263 let path = self.path.clone();
264 let io_tracker = self.io_tracker.clone();
265 let chunk_size = self.block_size.max(8 * 1024);
266 Box::pin(async move {
267 Ok(stream_local_range(
268 file, path, io_tracker, range, chunk_size,
269 ))
270 })
271 }
272}
273
/// Fills `buf` with bytes read from `file`, starting at `offset`.
///
/// This is the Windows counterpart of Unix's `FileExt::read_exact_at`, built on
/// `seek_read`. It retries on short reads and on `Interrupted` errors. Unlike the Unix
/// version, `seek_read` also moves the file's cursor.
///
/// # Errors
///
/// Returns `UnexpectedEof` if end-of-file is reached before `buf` is full; the message
/// reports how many bytes were actually read. Any other I/O error is propagated.
#[cfg(windows)]
pub(crate) fn read_exact_at(
    file: Arc<File>,
    mut buf: &mut [u8],
    mut offset: u64,
) -> std::io::Result<()> {
    let expected_len = buf.len();
    while !buf.is_empty() {
        match file.seek_read(buf, offset) {
            Ok(0) => break,
            Ok(n) => {
                let tmp = buf;
                buf = &mut tmp[n..];
                offset += n as u64;
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    if !buf.is_empty() {
        // Report the bytes actually read, not the absolute file offset. The two differ
        // whenever the read did not start at offset 0.
        let bytes_read = expected_len - buf.len();
        Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            format!(
                "failed to fill whole buffer. Expected {} bytes, got {}",
                expected_len, bytes_read
            ),
        ))
    } else {
        Ok(())
    }
}
305
306#[async_trait]
307impl Writer for tokio::fs::File {
308 async fn tell(&mut self) -> Result<usize> {
309 Ok(self.seek(SeekFrom::Current(0)).await? as usize)
310 }
311
312 async fn shutdown(&mut self) -> Result<WriteResult> {
313 let size = self.seek(SeekFrom::Current(0)).await? as usize;
314 tokio::io::AsyncWriteExt::shutdown(self).await?;
315 Ok(WriteResult { size, e_tag: None })
316 }
317}