1use std::any::Any;
8use std::collections::VecDeque;
9use std::future::Future;
10use std::io::{self, ErrorKind, Read, Seek, SeekFrom, Write};
11use std::os::unix::ffi::OsStrExt;
12use std::os::unix::fs::DirBuilderExt;
13use std::os::unix::fs::MetadataExt;
14use std::os::unix::fs::OpenOptionsExt;
15use std::os::unix::fs::PermissionsExt;
16use std::path::{Path, PathBuf};
17use std::pin::Pin;
18use std::sync::atomic::{AtomicU32, Ordering};
19use std::sync::Arc;
20use std::task::{Context, Poll};
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22
23use bytes::{Buf, Bytes, BytesMut};
24use futures::{future, future::BoxFuture, FutureExt, Stream};
25use pin_utils::pin_mut;
26use tokio::task;
27
28use libc;
29
30use crate::davpath::DavPath;
31use crate::fs::*;
32use crate::localfs_macos::DUCacheBuilder;
33
34const RUNTIME_TYPE_BASIC: u32 = 1;
35const RUNTIME_TYPE_THREADPOOL: u32 = 2;
36static RUNTIME_TYPE: AtomicU32 = AtomicU32::new(0);
37
38#[derive(Clone, Copy)]
39#[repr(u32)]
40enum RuntimeType {
41 Basic = RUNTIME_TYPE_BASIC,
42 ThreadPool = RUNTIME_TYPE_THREADPOOL,
43}
44
45impl RuntimeType {
46 #[inline]
47 fn get() -> RuntimeType {
48 match RUNTIME_TYPE.load(Ordering::Relaxed) {
49 RUNTIME_TYPE_BASIC => RuntimeType::Basic,
50 RUNTIME_TYPE_THREADPOOL => RuntimeType::ThreadPool,
51 _ => {
52 let dbg = format!("{:?}", tokio::runtime::Handle::current());
53 let rt = if dbg.contains("ThreadPool") {
54 RuntimeType::ThreadPool
55 } else {
56 RuntimeType::Basic
57 };
58 RUNTIME_TYPE.store(rt as u32, Ordering::SeqCst);
59 rt
60 },
61 }
62 }
63}
64
65#[inline]
70async fn blocking<F, R>(func: F) -> R
71where
72 F: FnOnce() -> R,
73 F: Send + 'static,
74 R: Send + 'static,
75{
76 match RuntimeType::get() {
77 RuntimeType::Basic => task::spawn_blocking(func).await.unwrap(),
78 RuntimeType::ThreadPool => task::block_in_place(func),
79 }
80}
81
/// Newtype around `std::fs::Metadata`; `DavMetaData` is implemented on it.
#[derive(Clone, Debug)]
struct LocalFsMetaData(std::fs::Metadata);
84
/// Handle to a local-filesystem backend.
///
/// Cloning is cheap: it only clones the `Arc` around the shared configuration.
#[derive(Clone)]
pub struct LocalFs {
    pub(crate) inner: Arc<LocalFsInner>,
}

/// Configuration shared by all clones of a `LocalFs`.
pub(crate) struct LocalFsInner {
    /// Base directory, or the served file itself when `is_file` is set.
    pub basedir: PathBuf,
    /// Selects the permission bits for newly created entries:
    /// `0o644` / `0o755` when set, `0o600` / `0o700` otherwise.
    pub public: bool,
    /// When set, paths are resolved through `crate::localfs_windows::resolve`.
    pub case_insensitive: bool,
    /// Stored by the constructors; not read in this part of the file.
    /// Presumably enables the macOS-specific handling — verify.
    pub macos: bool,
    /// When set, `basedir` is used as-is and the request path is not appended.
    pub is_file: bool,
    /// Optional hook invoked at the start of blocking filesystem work; the
    /// value it returns is kept alive until that work has finished.
    pub fs_access_guard: Option<Box<dyn Fn() -> Box<dyn Any> + Send + Sync + 'static>>,
}
100
/// An open file.
///
/// The handle lives in an `Option` so that I/O methods can move it into a
/// blocking closure and store it back once the closure has returned.
#[derive(Debug)]
struct LocalFsFile(Option<std::fs::File>);
103
104struct LocalFsReadDir {
105 fs: LocalFs,
106 do_meta: ReadDirMeta,
107 buffer: VecDeque<io::Result<LocalFsDirEntry>>,
108 dir_cache: Option<DUCacheBuilder>,
109 iterator: Option<std::fs::ReadDir>,
110 fut: Option<BoxFuture<'static, ReadDirBatch>>,
111}
112
113enum Meta {
116 Data(io::Result<std::fs::Metadata>),
117 Fs(LocalFs),
118}
119
120struct LocalFsDirEntry {
122 meta: Meta,
123 entry: std::fs::DirEntry,
124}
125
126impl LocalFs {
127 pub fn new<P: AsRef<Path>>(base: P, public: bool, case_insensitive: bool, macos: bool) -> Box<LocalFs> {
136 let inner = LocalFsInner {
137 basedir: base.as_ref().to_path_buf(),
138 public: public,
139 macos: macos,
140 case_insensitive: case_insensitive,
141 is_file: false,
142 fs_access_guard: None,
143 };
144 Box::new({
145 LocalFs {
146 inner: Arc::new(inner),
147 }
148 })
149 }
150
151 pub fn new_file<P: AsRef<Path>>(file: P, public: bool) -> Box<LocalFs> {
156 let inner = LocalFsInner {
157 basedir: file.as_ref().to_path_buf(),
158 public: public,
159 macos: false,
160 case_insensitive: false,
161 is_file: true,
162 fs_access_guard: None,
163 };
164 Box::new({
165 LocalFs {
166 inner: Arc::new(inner),
167 }
168 })
169 }
170
171 #[doc(hidden)]
173 pub fn new_with_fs_access_guard<P: AsRef<Path>>(
174 base: P,
175 public: bool,
176 case_insensitive: bool,
177 macos: bool,
178 fs_access_guard: Option<Box<dyn Fn() -> Box<dyn Any> + Send + Sync + 'static>>,
179 ) -> Box<LocalFs>
180 {
181 let inner = LocalFsInner {
182 basedir: base.as_ref().to_path_buf(),
183 public: public,
184 macos: macos,
185 case_insensitive: case_insensitive,
186 is_file: false,
187 fs_access_guard: fs_access_guard,
188 };
189 Box::new({
190 LocalFs {
191 inner: Arc::new(inner),
192 }
193 })
194 }
195
196 fn fspath_dbg(&self, path: &DavPath) -> PathBuf {
197 let mut pathbuf = self.inner.basedir.clone();
198 if !self.inner.is_file {
199 pathbuf.push(path.as_rel_ospath());
200 }
201 pathbuf
202 }
203
204 fn fspath(&self, path: &DavPath) -> PathBuf {
205 if self.inner.case_insensitive {
206 crate::localfs_windows::resolve(&self.inner.basedir, &path)
207 } else {
208 let mut pathbuf = self.inner.basedir.clone();
209 if !self.inner.is_file {
210 pathbuf.push(path.as_rel_ospath());
211 }
212 pathbuf
213 }
214 }
215
216 #[doc(hidden)]
218 pub async fn blocking<F, R>(&self, func: F) -> R
219 where
220 F: FnOnce() -> R + Send + 'static,
221 R: Send + 'static,
222 {
223 let this = self.clone();
224 blocking(move || {
225 let _guard = this.inner.fs_access_guard.as_ref().map(|f| f());
226 func()
227 })
228 .await
229 }
230}
231
232impl DavFileSystem for LocalFs {
235 fn metadata<'a>(&'a self, davpath: &'a DavPath) -> FsFuture<Box<dyn DavMetaData>> {
236 async move {
237 if let Some(meta) = self.is_virtual(davpath) {
238 return Ok(meta);
239 }
240 let path = self.fspath(davpath);
241 if self.is_notfound(&path) {
242 return Err(FsError::NotFound);
243 }
244 self.blocking(move || {
245 match std::fs::metadata(path) {
246 Ok(meta) => Ok(Box::new(LocalFsMetaData(meta)) as Box<dyn DavMetaData>),
247 Err(e) => Err(e.into()),
248 }
249 })
250 .await
251 }
252 .boxed()
253 }
254
255 fn symlink_metadata<'a>(&'a self, davpath: &'a DavPath) -> FsFuture<Box<dyn DavMetaData>> {
256 async move {
257 if let Some(meta) = self.is_virtual(davpath) {
258 return Ok(meta);
259 }
260 let path = self.fspath(davpath);
261 if self.is_notfound(&path) {
262 return Err(FsError::NotFound);
263 }
264 self.blocking(move || {
265 match std::fs::symlink_metadata(path) {
266 Ok(meta) => Ok(Box::new(LocalFsMetaData(meta)) as Box<dyn DavMetaData>),
267 Err(e) => Err(e.into()),
268 }
269 })
270 .await
271 }
272 .boxed()
273 }
274
275 fn read_dir<'a>(
278 &'a self,
279 davpath: &'a DavPath,
280 meta: ReadDirMeta,
281 ) -> FsFuture<FsStream<Box<dyn DavDirEntry>>>
282 {
283 async move {
284 trace!("FS: read_dir {:?}", self.fspath_dbg(davpath));
285 let path = self.fspath(davpath);
286 let path2 = path.clone();
287 let iter = self.blocking(move || std::fs::read_dir(&path)).await;
288 match iter {
289 Ok(iterator) => {
290 let strm = LocalFsReadDir {
291 fs: self.clone(),
292 do_meta: meta,
293 buffer: VecDeque::new(),
294 dir_cache: self.dir_cache_builder(path2),
295 iterator: Some(iterator),
296 fut: None,
297 };
298 Ok(Box::pin(strm) as FsStream<Box<dyn DavDirEntry>>)
299 },
300 Err(e) => Err(e.into()),
301 }
302 }
303 .boxed()
304 }
305
306 fn open<'a>(&'a self, path: &'a DavPath, options: OpenOptions) -> FsFuture<Box<dyn DavFile>> {
307 async move {
308 trace!("FS: open {:?}", self.fspath_dbg(path));
309 if self.is_forbidden(path) {
310 return Err(FsError::Forbidden);
311 }
312 let mode = if self.inner.public { 0o644 } else { 0o600 };
313 let path = self.fspath(path);
314 self.blocking(move || {
315 let res = std::fs::OpenOptions::new()
316 .read(options.read)
317 .write(options.write)
318 .append(options.append)
319 .truncate(options.truncate)
320 .create(options.create)
321 .create_new(options.create_new)
322 .mode(mode)
323 .open(path);
324 match res {
325 Ok(file) => Ok(Box::new(LocalFsFile(Some(file))) as Box<dyn DavFile>),
326 Err(e) => Err(e.into()),
327 }
328 })
329 .await
330 }
331 .boxed()
332 }
333
334 fn create_dir<'a>(&'a self, path: &'a DavPath) -> FsFuture<()> {
335 async move {
336 trace!("FS: create_dir {:?}", self.fspath_dbg(path));
337 if self.is_forbidden(path) {
338 return Err(FsError::Forbidden);
339 }
340 let mode = if self.inner.public { 0o755 } else { 0o700 };
341 let path = self.fspath(path);
342 self.blocking(move || {
343 std::fs::DirBuilder::new()
344 .mode(mode)
345 .create(path)
346 .map_err(|e| e.into())
347 })
348 .await
349 }
350 .boxed()
351 }
352
353 fn remove_dir<'a>(&'a self, path: &'a DavPath) -> FsFuture<()> {
354 async move {
355 trace!("FS: remove_dir {:?}", self.fspath_dbg(path));
356 let path = self.fspath(path);
357 self.blocking(move || std::fs::remove_dir(path).map_err(|e| e.into()))
358 .await
359 }
360 .boxed()
361 }
362
363 fn remove_file<'a>(&'a self, path: &'a DavPath) -> FsFuture<()> {
364 async move {
365 trace!("FS: remove_file {:?}", self.fspath_dbg(path));
366 if self.is_forbidden(path) {
367 return Err(FsError::Forbidden);
368 }
369 let path = self.fspath(path);
370 self.blocking(move || std::fs::remove_file(path).map_err(|e| e.into()))
371 .await
372 }
373 .boxed()
374 }
375
376 fn rename<'a>(&'a self, from: &'a DavPath, to: &'a DavPath) -> FsFuture<()> {
377 async move {
378 trace!("FS: rename {:?} {:?}", self.fspath_dbg(from), self.fspath_dbg(to));
379 if self.is_forbidden(from) || self.is_forbidden(to) {
380 return Err(FsError::Forbidden);
381 }
382 let frompath = self.fspath(from);
383 let topath = self.fspath(to);
384 self.blocking(move || {
385 match std::fs::rename(&frompath, &topath) {
386 Ok(v) => Ok(v),
387 Err(e) => {
388 if e.raw_os_error() == Some(libc::ENOTDIR) && frompath.is_dir() {
392 let _ = std::fs::remove_file(&topath);
394 std::fs::rename(frompath, topath).map_err(|e| e.into())
395 } else {
396 Err(e.into())
397 }
398 },
399 }
400 })
401 .await
402 }
403 .boxed()
404 }
405
406 fn copy<'a>(&'a self, from: &'a DavPath, to: &'a DavPath) -> FsFuture<()> {
407 async move {
408 trace!("FS: copy {:?} {:?}", self.fspath_dbg(from), self.fspath_dbg(to));
409 if self.is_forbidden(from) || self.is_forbidden(to) {
410 return Err(FsError::Forbidden);
411 }
412 let path_from = self.fspath(from);
413 let path_to = self.fspath(to);
414
415 match self.blocking(move || std::fs::copy(path_from, path_to)).await {
416 Ok(_) => Ok(()),
417 Err(e) => {
418 debug!(
419 "copy({:?}, {:?}) failed: {}",
420 self.fspath_dbg(from),
421 self.fspath_dbg(to),
422 e
423 );
424 Err(e.into())
425 },
426 }
427 }
428 .boxed()
429 }
430}
431
432struct ReadDirBatch {
434 iterator: Option<std::fs::ReadDir>,
435 buffer: VecDeque<io::Result<LocalFsDirEntry>>,
436}
437
438fn read_batch(iterator: Option<std::fs::ReadDir>, fs: LocalFs, do_meta: ReadDirMeta) -> ReadDirBatch {
441 let mut buffer = VecDeque::new();
442 let mut iterator = match iterator {
443 Some(i) => i,
444 None => {
445 return ReadDirBatch {
446 buffer,
447 iterator: None,
448 }
449 },
450 };
451 let _guard = match do_meta {
452 ReadDirMeta::None => None,
453 _ => fs.inner.fs_access_guard.as_ref().map(|f| f()),
454 };
455 for _ in 0..256 {
456 match iterator.next() {
457 Some(Ok(entry)) => {
458 let meta = match do_meta {
459 ReadDirMeta::Data => Meta::Data(std::fs::metadata(entry.path())),
460 ReadDirMeta::DataSymlink => Meta::Data(entry.metadata()),
461 ReadDirMeta::None => Meta::Fs(fs.clone()),
462 };
463 let d = LocalFsDirEntry {
464 meta: meta,
465 entry: entry,
466 };
467 buffer.push_back(Ok(d))
468 },
469 Some(Err(e)) => {
470 buffer.push_back(Err(e));
471 break;
472 },
473 None => break,
474 }
475 }
476 ReadDirBatch {
477 buffer,
478 iterator: Some(iterator),
479 }
480}
481
482impl LocalFsReadDir {
483 fn read_batch(&mut self) -> BoxFuture<'static, ReadDirBatch> {
488 let iterator = self.iterator.take();
489 let fs = self.fs.clone();
490 let do_meta = self.do_meta;
491
492 let fut: BoxFuture<ReadDirBatch> = blocking(move || read_batch(iterator, fs, do_meta)).boxed();
493 fut
494 }
495}
496
497impl<'a> Stream for LocalFsReadDir {
499 type Item = Box<dyn DavDirEntry>;
500
501 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
502 let this = Pin::into_inner(self);
503
504 if this.buffer.len() == 0 {
506 if this.fut.is_none() {
508 if this.iterator.is_none() {
509 return Poll::Ready(None);
510 }
511 this.fut = Some(this.read_batch());
512 }
513
514 let fut = this.fut.as_mut().unwrap();
516 pin_mut!(fut);
517 match Pin::new(&mut fut).poll(cx) {
518 Poll::Ready(batch) => {
519 this.fut.take();
520 if let Some(ref mut nb) = this.dir_cache {
521 for e in &batch.buffer {
522 if let Ok(ref e) = e {
523 nb.add(e.entry.file_name());
524 }
525 }
526 }
527 this.buffer = batch.buffer;
528 this.iterator = batch.iterator;
529 },
530 Poll::Pending => return Poll::Pending,
531 }
532 }
533
534 match this.buffer.pop_front() {
536 Some(Ok(item)) => Poll::Ready(Some(Box::new(item))),
537 Some(Err(_)) | None => {
538 this.iterator.take();
540 if let Some(ref mut nb) = this.dir_cache {
542 nb.finish();
543 }
544 Poll::Ready(None)
546 },
547 }
548 }
549}
550
/// The kind of filesystem object a directory entry is tested for.
enum Is {
    /// A regular file.
    File,
    /// A directory.
    Dir,
    /// A symbolic link.
    Symlink,
}
556
557impl LocalFsDirEntry {
558 async fn is_a(&self, is: Is) -> FsResult<bool> {
559 match self.meta {
560 Meta::Data(Ok(ref meta)) => {
561 Ok(match is {
562 Is::File => meta.file_type().is_file(),
563 Is::Dir => meta.file_type().is_dir(),
564 Is::Symlink => meta.file_type().is_symlink(),
565 })
566 },
567 Meta::Data(Err(ref e)) => Err(e.into()),
568 Meta::Fs(ref fs) => {
569 let fullpath = self.entry.path();
570 let ft = fs
571 .blocking(move || std::fs::metadata(&fullpath))
572 .await?
573 .file_type();
574 Ok(match is {
575 Is::File => ft.is_file(),
576 Is::Dir => ft.is_dir(),
577 Is::Symlink => ft.is_symlink(),
578 })
579 },
580 }
581 }
582}
583
584impl DavDirEntry for LocalFsDirEntry {
585 fn metadata<'a>(&'a self) -> FsFuture<Box<dyn DavMetaData>> {
586 match self.meta {
587 Meta::Data(ref meta) => {
588 let m = match meta {
589 Ok(meta) => Ok(Box::new(LocalFsMetaData(meta.clone())) as Box<dyn DavMetaData>),
590 Err(e) => Err(e.into()),
591 };
592 Box::pin(future::ready(m))
593 },
594 Meta::Fs(ref fs) => {
595 let fullpath = self.entry.path();
596 fs.blocking(move || {
597 match std::fs::metadata(&fullpath) {
598 Ok(meta) => Ok(Box::new(LocalFsMetaData(meta)) as Box<dyn DavMetaData>),
599 Err(e) => Err(e.into()),
600 }
601 })
602 .boxed()
603 },
604 }
605 }
606
607 fn name(&self) -> Vec<u8> {
608 self.entry.file_name().as_bytes().to_vec()
609 }
610
611 fn is_dir<'a>(&'a self) -> FsFuture<bool> {
612 Box::pin(self.is_a(Is::Dir))
613 }
614
615 fn is_file<'a>(&'a self) -> FsFuture<bool> {
616 Box::pin(self.is_a(Is::File))
617 }
618
619 fn is_symlink<'a>(&'a self) -> FsFuture<bool> {
620 Box::pin(self.is_a(Is::Symlink))
621 }
622}
623
624impl DavFile for LocalFsFile {
625 fn metadata<'a>(&'a mut self) -> FsFuture<Box<dyn DavMetaData>> {
626 async move {
627 let file = self.0.take().unwrap();
628 let (meta, file) = blocking(move || (file.metadata(), file)).await;
629 self.0 = Some(file);
630 Ok(Box::new(LocalFsMetaData(meta?)) as Box<dyn DavMetaData>)
631 }
632 .boxed()
633 }
634
635 fn write_bytes<'a>(&'a mut self, buf: Bytes) -> FsFuture<()> {
636 async move {
637 let mut file = self.0.take().unwrap();
638 let (res, file) = blocking(move || (file.write_all(&buf), file)).await;
639 self.0 = Some(file);
640 res.map_err(|e| e.into())
641 }
642 .boxed()
643 }
644
645 fn write_buf<'a>(&'a mut self, mut buf: Box<dyn Buf + Send>) -> FsFuture<()> {
646 async move {
647 let mut file = self.0.take().unwrap();
648 let (res, file) = blocking(move || {
649 while buf.remaining() > 0 {
650 let n = match file.write(buf.chunk()) {
651 Ok(n) => n,
652 Err(e) => return (Err(e), file),
653 };
654 buf.advance(n);
655 }
656 (Ok(()), file)
657 })
658 .await;
659 self.0 = Some(file);
660 res.map_err(|e| e.into())
661 }
662 .boxed()
663 }
664
665 fn read_bytes<'a>(&'a mut self, count: usize) -> FsFuture<Bytes> {
666 async move {
667 let mut file = self.0.take().unwrap();
668 let (res, file) = blocking(move || {
669 let mut buf = BytesMut::with_capacity(count);
670 let res = unsafe {
671 buf.set_len(count);
672 file.read(&mut buf).map(|n| {
673 buf.set_len(n);
674 buf.freeze()
675 })
676 };
677 (res, file)
678 })
679 .await;
680 self.0 = Some(file);
681 res.map_err(|e| e.into())
682 }
683 .boxed()
684 }
685
686 fn seek<'a>(&'a mut self, pos: SeekFrom) -> FsFuture<u64> {
687 async move {
688 let mut file = self.0.take().unwrap();
689 let (res, file) = blocking(move || (file.seek(pos), file)).await;
690 self.0 = Some(file);
691 res.map_err(|e| e.into())
692 }
693 .boxed()
694 }
695
696 fn flush<'a>(&'a mut self) -> FsFuture<()> {
697 async move {
698 let mut file = self.0.take().unwrap();
699 let (res, file) = blocking(move || (file.flush(), file)).await;
700 self.0 = Some(file);
701 res.map_err(|e| e.into())
702 }
703 .boxed()
704 }
705}
706
707impl DavMetaData for LocalFsMetaData {
708 fn len(&self) -> u64 {
709 self.0.len()
710 }
711 fn created(&self) -> FsResult<SystemTime> {
712 self.0.created().map_err(|e| e.into())
713 }
714 fn modified(&self) -> FsResult<SystemTime> {
715 self.0.modified().map_err(|e| e.into())
716 }
717 fn accessed(&self) -> FsResult<SystemTime> {
718 self.0.accessed().map_err(|e| e.into())
719 }
720
721 fn status_changed(&self) -> FsResult<SystemTime> {
722 Ok(UNIX_EPOCH + Duration::new(self.0.ctime() as u64, 0))
723 }
724
725 fn is_dir(&self) -> bool {
726 self.0.is_dir()
727 }
728 fn is_file(&self) -> bool {
729 self.0.is_file()
730 }
731 fn is_symlink(&self) -> bool {
732 self.0.file_type().is_symlink()
733 }
734 fn executable(&self) -> FsResult<bool> {
735 if self.0.is_file() {
736 return Ok((self.0.permissions().mode() & 0o100) > 0);
737 }
738 Err(FsError::NotImplemented)
739 }
740
741 fn etag(&self) -> Option<String> {
743 let modified = self.0.modified().ok()?;
744 let t = modified.duration_since(UNIX_EPOCH).ok()?;
745 let t = t.as_secs() * 1000000 + t.subsec_nanos() as u64 / 1000;
746 if self.is_file() {
747 Some(format!("{:x}-{:x}-{:x}", self.0.ino(), self.0.len(), t))
748 } else {
749 Some(format!("{:x}-{:x}", self.0.ino(), t))
750 }
751 }
752}
753
754impl From<&io::Error> for FsError {
755 fn from(e: &io::Error) -> Self {
756 if let Some(errno) = e.raw_os_error() {
757 match errno {
759 libc::EMLINK | libc::ENOSPC | libc::EDQUOT => return FsError::InsufficientStorage,
760 libc::EFBIG => return FsError::TooLarge,
761 libc::EACCES | libc::EPERM => return FsError::Forbidden,
762 libc::ENOTEMPTY | libc::EEXIST => return FsError::Exists,
763 libc::ELOOP => return FsError::LoopDetected,
764 libc::ENAMETOOLONG => return FsError::PathTooLong,
765 libc::ENOTDIR => return FsError::Forbidden,
766 libc::EISDIR => return FsError::Forbidden,
767 libc::EROFS => return FsError::Forbidden,
768 libc::ENOENT => return FsError::NotFound,
769 libc::ENOSYS => return FsError::NotImplemented,
770 libc::EXDEV => return FsError::IsRemote,
771 _ => {},
772 }
773 } else {
774 return FsError::NotImplemented;
777 }
778 match e.kind() {
780 ErrorKind::NotFound => FsError::NotFound,
781 ErrorKind::PermissionDenied => FsError::Forbidden,
782 _ => FsError::GeneralFailure,
783 }
784 }
785}
786
787impl From<io::Error> for FsError {
788 fn from(e: io::Error) -> Self {
789 (&e).into()
790 }
791}