1use std::fs::File;
7use std::io::{ErrorKind, Read, SeekFrom};
8use std::ops::Range;
9use std::sync::Arc;
10
11#[cfg(unix)]
13use std::os::unix::fs::FileExt;
14#[cfg(windows)]
15use std::os::windows::fs::FileExt;
16
17use async_trait::async_trait;
18use bytes::{Bytes, BytesMut};
19use deepsize::DeepSizeOf;
20use futures::future::BoxFuture;
21use lance_core::{Error, Result};
22use object_store::path::Path;
23use tokio::io::AsyncSeekExt;
24use tokio::sync::OnceCell;
25use tracing::instrument;
26
27use crate::object_store::DEFAULT_LOCAL_IO_PARALLELISM;
28use crate::object_writer::WriteResult;
29use crate::traits::{Reader, Writer};
30use crate::utils::tracking_store::IOTracker;
31
32pub fn to_local_path(path: &Path) -> String {
34 if cfg!(windows) {
35 path.to_string()
36 } else {
37 format!("/{path}")
38 }
39}
40
41pub fn remove_dir_all(path: &Path) -> Result<()> {
43 let local_path = to_local_path(path);
44 std::fs::remove_dir_all(local_path).map_err(|err| match err.kind() {
45 ErrorKind::NotFound => Error::not_found(path.to_string()),
46 _ => Error::from(err),
47 })?;
48 Ok(())
49}
50
51pub fn copy_file(from: &Path, to: &Path) -> Result<()> {
55 let from_path = to_local_path(from);
56 let to_path = to_local_path(to);
57
58 if let Some(parent) = std::path::Path::new(&to_path).parent() {
60 std::fs::create_dir_all(parent).map_err(Error::from)?;
61 }
62
63 std::fs::copy(&from_path, &to_path).map_err(|err| match err.kind() {
64 ErrorKind::NotFound => Error::not_found(from.to_string()),
65 _ => Error::from(err),
66 })?;
67 Ok(())
68}
69
/// [`Reader`] implementation backed by a file on the local file system.
#[derive(Debug)]
pub struct LocalObjectReader {
    /// Shared handle to the opened file; cloned into blocking tasks for each read.
    file: Arc<File>,

    /// Object-store style path of the file, used for error messages and I/O tracking.
    path: Path,

    /// File size in bytes. Seeded from the `known_size` supplied when the
    /// reader is opened, otherwise filled in from the file metadata the first
    /// time the size is requested.
    size: OnceCell<usize>,

    /// Block size handed back by `block_size()`.
    block_size: usize,

    /// Tracker that successful reads are recorded with.
    io_tracker: Arc<IOTracker>,
}
89
90impl DeepSizeOf for LocalObjectReader {
91 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
92 self.path.as_ref().deep_size_of_children(context)
94 }
95}
96
97impl LocalObjectReader {
98 pub async fn open_local_path(
99 path: impl AsRef<std::path::Path>,
100 block_size: usize,
101 known_size: Option<usize>,
102 ) -> Result<Box<dyn Reader>> {
103 let path = path.as_ref().to_owned();
104 let object_store_path = Path::from_filesystem_path(&path)?;
105 Self::open(&object_store_path, block_size, known_size).await
106 }
107
108 #[instrument(level = "debug")]
112 pub async fn open(
113 path: &Path,
114 block_size: usize,
115 known_size: Option<usize>,
116 ) -> Result<Box<dyn Reader>> {
117 Self::open_with_tracker(path, block_size, known_size, Default::default()).await
118 }
119
120 #[instrument(level = "debug")]
122 pub(crate) async fn open_with_tracker(
123 path: &Path,
124 block_size: usize,
125 known_size: Option<usize>,
126 io_tracker: Arc<IOTracker>,
127 ) -> Result<Box<dyn Reader>> {
128 let path = path.clone();
129 let local_path = to_local_path(&path);
130 tokio::task::spawn_blocking(move || {
131 let file = File::open(&local_path).map_err(|e| match e.kind() {
132 ErrorKind::NotFound => Error::not_found(path.to_string()),
133 _ => e.into(),
134 })?;
135 let size = OnceCell::new_with(known_size);
136 Ok(Box::new(Self {
137 file: Arc::new(file),
138 block_size,
139 size,
140 path,
141 io_tracker,
142 }) as Box<dyn Reader>)
143 })
144 .await?
145 }
146}
147
148impl Reader for LocalObjectReader {
149 fn path(&self) -> &Path {
150 &self.path
151 }
152
153 fn block_size(&self) -> usize {
154 self.block_size
155 }
156
157 fn io_parallelism(&self) -> usize {
158 DEFAULT_LOCAL_IO_PARALLELISM
159 }
160
161 fn size(&self) -> BoxFuture<'_, object_store::Result<usize>> {
163 Box::pin(async move {
164 let file = self.file.clone();
165 self.size
166 .get_or_try_init(|| async move {
167 let metadata = tokio::task::spawn_blocking(move || {
168 file.metadata().map_err(|err| object_store::Error::Generic {
169 store: "LocalFileSystem",
170 source: err.into(),
171 })
172 })
173 .await??;
174 Ok(metadata.len() as usize)
175 })
176 .await
177 .cloned()
178 })
179 }
180
181 #[instrument(level = "debug", skip(self))]
183 fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, object_store::Result<Bytes>> {
184 let file = self.file.clone();
185 let io_tracker = self.io_tracker.clone();
186 let path = self.path.clone();
187 let num_bytes = range.len() as u64;
188 let range_u64 = (range.start as u64)..(range.end as u64);
189
190 Box::pin(async move {
191 let result = tokio::task::spawn_blocking(move || {
192 let mut buf = BytesMut::with_capacity(range.len());
193 unsafe { buf.set_len(range.len()) };
196 #[cfg(unix)]
197 file.read_exact_at(buf.as_mut(), range.start as u64)?;
198 #[cfg(windows)]
199 read_exact_at(file, buf.as_mut(), range.start as u64)?;
200
201 Ok(buf.freeze())
202 })
203 .await?
204 .map_err(|err: std::io::Error| object_store::Error::Generic {
205 store: "LocalFileSystem",
206 source: err.into(),
207 });
208
209 if result.is_ok() {
210 io_tracker.record_read("get_range", path, num_bytes, Some(range_u64));
211 }
212
213 result
214 })
215 }
216
217 #[instrument(level = "debug", skip(self))]
219 fn get_all(&self) -> BoxFuture<'_, object_store::Result<Bytes>> {
220 Box::pin(async move {
221 let mut file = self.file.clone();
222 let io_tracker = self.io_tracker.clone();
223 let path = self.path.clone();
224
225 let result = tokio::task::spawn_blocking(move || {
226 let mut buf = Vec::new();
227 file.read_to_end(buf.as_mut())?;
228 Ok(Bytes::from(buf))
229 })
230 .await?
231 .map_err(|err: std::io::Error| object_store::Error::Generic {
232 store: "LocalFileSystem",
233 source: err.into(),
234 });
235
236 if let Ok(bytes) = &result {
237 io_tracker.record_read("get_all", path, bytes.len() as u64, None);
238 }
239
240 result
241 })
242 }
243}
244
/// Windows stand-in for the Unix `FileExt::read_exact_at`: fills `buf`
/// completely with the bytes starting at `offset`, looping over `seek_read`
/// until the buffer is full.
///
/// # Errors
///
/// Returns an [`std::io::ErrorKind::UnexpectedEof`] error, stating how many
/// bytes were requested and how many were actually read, when the file ends
/// before `buf` is full. Reads failing with `Interrupted` are retried; any
/// other I/O error is returned as-is.
#[cfg(windows)]
fn read_exact_at(file: Arc<File>, mut buf: &mut [u8], mut offset: u64) -> std::io::Result<()> {
    let expected_len = buf.len();
    while !buf.is_empty() {
        match file.seek_read(buf, offset) {
            // End of file reached before the buffer was filled.
            Ok(0) => break,
            Ok(n) => {
                // Move the slice out first so the shortened tail can be
                // assigned back to `buf` without a conflicting borrow.
                let tmp = buf;
                buf = &mut tmp[n..];
                offset += n as u64;
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }

    if buf.is_empty() {
        Ok(())
    } else {
        // `buf` is the still-unfilled tail, so the difference is the number of
        // bytes read. (`offset` is an absolute file position, not a count.)
        let bytes_read = expected_len - buf.len();
        Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            format!(
                "failed to fill whole buffer. Expected {} bytes, got {}",
                expected_len, bytes_read
            ),
        ))
    }
}
272
273#[async_trait]
274impl Writer for tokio::fs::File {
275 async fn tell(&mut self) -> Result<usize> {
276 Ok(self.seek(SeekFrom::Current(0)).await? as usize)
277 }
278
279 async fn shutdown(&mut self) -> Result<WriteResult> {
280 let size = self.seek(SeekFrom::Current(0)).await? as usize;
281 tokio::io::AsyncWriteExt::shutdown(self).await?;
282 Ok(WriteResult { size, e_tag: None })
283 }
284}