1use std::convert::TryInto;
2use std::ops::Deref;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::{fmt, io};
6use futures::{join, Future, TryFutureExt};
7use safecast::AsType;
8use tokio::fs;
9use tokio::sync::{
10 OwnedRwLockMappedWriteGuard, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock,
11 RwLockMappedWriteGuard, RwLockReadGuard, RwLockWriteGuard,
12};
13
14use super::cache::Cache;
15use super::Result;
16
17const TMP: &'static str = "_freqfs";
18
19pub type FileReadGuard<'a, F> = RwLockReadGuard<'a, F>;
21
22pub type FileReadGuardOwned<FE, F> = OwnedRwLockReadGuard<Option<FE>, F>;
24
25pub type FileWriteGuard<'a, F> = RwLockMappedWriteGuard<'a, F>;
27
28pub type FileWriteGuardOwned<FE, F> = OwnedRwLockMappedWriteGuard<Option<FE>, F>;
30
31pub trait FileDeref {
33 type File;
35
36 fn as_file(&self) -> &Self::File;
38}
39
40impl<'a, F> FileDeref for FileReadGuard<'a, F> {
41 type File = F;
42
43 fn as_file(&self) -> &F {
44 self.deref()
45 }
46}
47
48impl<'a, F> FileDeref for Arc<FileReadGuard<'a, F>> {
49 type File = F;
50
51 fn as_file(&self) -> &F {
52 self.deref().as_file()
53 }
54}
55
56impl<FE, F> FileDeref for FileReadGuardOwned<FE, F> {
57 type File = F;
58
59 fn as_file(&self) -> &F {
60 self.deref()
61 }
62}
63
64impl<FE, F> FileDeref for Arc<FileReadGuardOwned<FE, F>> {
65 type File = F;
66
67 fn as_file(&self) -> &F {
68 self.deref().as_file()
69 }
70}
71
72impl<'a, F> FileDeref for FileWriteGuard<'a, F> {
73 type File = F;
74
75 fn as_file(&self) -> &F {
76 self.deref()
77 }
78}
79
80impl<FE, F> FileDeref for FileWriteGuardOwned<FE, F> {
81 type File = F;
82
83 fn as_file(&self) -> &F {
84 self.deref()
85 }
86}
87
88#[trait_variant::make(Send)]
90pub trait FileLoad: Send + Sync + Sized + 'static {
91 async fn load(path: &Path, file: fs::File, metadata: std::fs::Metadata) -> Result<Self>;
93}
94
95#[trait_variant::make(Send)]
97pub trait FileSave: Send + Sync + Sized + 'static {
98 async fn save(&self, file: &mut fs::File) -> Result<u64>;
100}
101
102#[cfg(feature = "stream")]
103impl<T> FileLoad for T
104where
105 T: destream::de::FromStream<Context = ()> + Send + Sync + 'static,
106{
107 async fn load(_path: &Path, file: fs::File, _metadata: std::fs::Metadata) -> Result<Self> {
108 tbon::de::read_from((), file)
109 .map_err(|cause| io::Error::new(io::ErrorKind::InvalidData, cause))
110 .await
111 }
112}
113
114#[cfg(feature = "stream")]
115impl<T> FileSave for T
116where
117 T: for<'en> destream::en::ToStream<'en> + Send + Sync + 'static,
118{
119 async fn save(&self, file: &mut fs::File) -> Result<u64> {
120 use futures::TryStreamExt;
121
122 let encoded = tbon::en::encode(self)
123 .map_err(|cause| io::Error::new(io::ErrorKind::InvalidData, cause))?;
124
125 let mut reader = tokio_util::io::StreamReader::new(
126 encoded
127 .map_ok(bytes::Bytes::from)
128 .map_err(|cause| io::Error::new(io::ErrorKind::InvalidData, cause)),
129 );
130
131 tokio::io::copy(&mut reader, file).await
132 }
133}
134
135#[derive(Copy, Clone)]
136enum FileLockState {
137 Pending,
138 Read(usize),
139 Modified(usize),
140 Deleted(bool),
141}
142
143impl FileLockState {
144 fn is_deleted(&self) -> bool {
145 match self {
146 Self::Deleted(_) => true,
147 _ => false,
148 }
149 }
150
151 fn is_loaded(&self) -> bool {
152 match self {
153 Self::Read(_) | Self::Modified(_) => true,
154 _ => false,
155 }
156 }
157
158 fn is_pending(&self) -> bool {
159 match self {
160 Self::Pending => true,
161 _ => false,
162 }
163 }
164
165 fn upgrade(&mut self) {
166 let size = match self {
167 Self::Read(size) | Self::Modified(size) => *size,
168 _ => unreachable!("upgrade a file not in the cache"),
169 };
170
171 *self = Self::Modified(size);
172 }
173}
174
175pub struct FileLock<FE> {
177 cache: Arc<Cache<FE>>,
178 path: Arc<PathBuf>,
179 state: Arc<RwLock<FileLockState>>,
180 contents: Arc<RwLock<Option<FE>>>,
181}
182
183impl<FE> Clone for FileLock<FE> {
184 fn clone(&self) -> Self {
185 Self {
186 cache: self.cache.clone(),
187 path: self.path.clone(),
188 state: self.state.clone(),
189 contents: self.contents.clone(),
190 }
191 }
192}
193
194impl<FE> FileLock<FE> {
195 pub fn new<F>(cache: Arc<Cache<FE>>, path: PathBuf, contents: F, size: usize) -> Self
197 where
198 FE: From<F>,
199 {
200 Self {
201 cache,
202 path: Arc::new(path),
203 state: Arc::new(RwLock::new(FileLockState::Modified(size))),
204 contents: Arc::new(RwLock::new(Some(contents.into()))),
205 }
206 }
207
208 pub fn path(&self) -> &Path {
210 self.path.as_path()
211 }
212
213 pub fn load<F>(cache: Arc<Cache<FE>>, path: PathBuf) -> Self
215 where
216 FE: From<F>,
217 {
218 Self {
219 cache,
220 path: Arc::new(path),
221 state: Arc::new(RwLock::new(FileLockState::Pending)),
222 contents: Arc::new(RwLock::new(None)),
223 }
224 }
225
226 pub async fn overwrite(&self, other: &Self) -> Result<()>
229 where
230 FE: Clone,
231 {
232 let (mut this, that) = join!(self.state.write(), other.state.read());
233
234 let old_size = match &*this {
235 FileLockState::Pending | FileLockState::Deleted(_) => 0,
236 FileLockState::Read(size) | FileLockState::Modified(size) => *size,
237 };
238
239 let new_size = match &*that {
240 FileLockState::Pending => {
241 debug_assert!(other.path.exists());
242
243 create_dir(self.path.parent().expect("file parent dir")).await?;
244
245 match fs::copy(other.path.as_path(), self.path.as_path()).await {
246 Ok(_) => {}
247 Err(cause) if cause.kind() == io::ErrorKind::NotFound => {
248 #[cfg(debug_assertions)]
249 let message = format!(
250 "tried to copy a file from a nonexistent source: {}",
251 other.path.display()
252 );
253
254 #[cfg(not(debug_assertions))]
255 let message = "tried to copy a file from a nonexistent source";
256
257 return Err(io::Error::new(io::ErrorKind::NotFound, message));
258 }
259 Err(cause) => return Err(cause),
260 }
261
262 *this = FileLockState::Pending;
263 0
264 }
265 FileLockState::Deleted(_sync) => {
266 *this = FileLockState::Deleted(true);
267 0
268 }
269 FileLockState::Read(size) | FileLockState::Modified(size) => {
270 *this = FileLockState::Modified(*size);
271 *size
272 }
273 };
274
275 if this.is_loaded() {
276 let (mut this_data, that_data) = join!(self.contents.write(), other.contents.read());
277 let that_data = that_data.as_ref().expect("file");
278 *this_data = Some(FE::clone(&*that_data));
279 }
280
281 self.cache.resize(old_size, new_size);
282
283 Ok(())
284 }
285
286 pub async fn read<F>(&self) -> Result<FileReadGuard<F>>
288 where
289 F: FileLoad,
290 FE: AsType<F>,
291 {
292 let mut state = self.state.write().await;
293
294 if state.is_deleted() {
295 return Err(deleted());
296 }
297
298 let guard = if state.is_pending() {
299 let mut contents = self.contents.try_write().expect("file contents");
300 let (size, entry) = load(&**self.path).await?;
301
302 self.cache.bump(&self.path, Some(size));
303
304 *state = FileLockState::Read(size);
305 *contents = Some(entry);
306
307 contents.downgrade()
308 } else {
309 self.cache.bump(&self.path, None);
310 self.contents.read().await
311 };
312
313 read_type(guard)
314 }
315
316 pub fn try_read<F>(&self) -> Result<FileReadGuard<F>>
318 where
319 F: FileLoad,
320 FE: AsType<F>,
321 {
322 let state = self.state.try_read().map_err(would_block)?;
323
324 match &*state {
325 FileLockState::Pending => Err(would_block("this file is not in the cache")),
326 FileLockState::Deleted(_sync) => Err(deleted()),
327 FileLockState::Read(_size) | FileLockState::Modified(_size) => {
328 self.cache.bump(&self.path, None);
329 let guard = self.contents.try_read().map_err(would_block)?;
330 read_type(guard)
331 }
332 }
333 }
334
335 pub async fn read_owned<F>(&self) -> Result<FileReadGuardOwned<FE, F>>
337 where
338 F: FileLoad,
339 FE: AsType<F>,
340 {
341 let mut state = self.state.write().await;
342
343 if state.is_deleted() {
344 return Err(deleted());
345 }
346
347 let guard = if state.is_pending() {
348 let mut contents = self
349 .contents
350 .clone()
351 .try_write_owned()
352 .expect("file contents");
353
354 let (size, entry) = load(&**self.path).await?;
355
356 self.cache.bump(&self.path, Some(size));
357
358 *state = FileLockState::Read(size);
359 *contents = Some(entry);
360
361 contents.downgrade()
362 } else {
363 self.cache.bump(&self.path, None);
364 self.contents.clone().read_owned().await
365 };
366
367 read_type_owned(guard)
368 }
369
370 pub fn try_read_owned<F>(&self) -> Result<FileReadGuardOwned<FE, F>>
372 where
373 F: FileLoad,
374 FE: AsType<F>,
375 {
376 let state = self.state.try_read().map_err(would_block)?;
377
378 match &*state {
379 FileLockState::Pending => Err(would_block("this file is not in the cache")),
380 FileLockState::Deleted(_sync) => Err(deleted()),
381 FileLockState::Read(_size) | FileLockState::Modified(_size) => {
382 self.cache.bump(&self.path, None);
383 let guard = self
384 .contents
385 .clone()
386 .try_read_owned()
387 .map_err(would_block)?;
388
389 read_type_owned(guard)
390 }
391 }
392 }
393
394 pub async fn into_read<F>(self) -> Result<FileReadGuardOwned<FE, F>>
396 where
397 F: FileLoad,
398 FE: AsType<F>,
399 {
400 let mut state = self.state.write().await;
401
402 if state.is_deleted() {
403 return Err(deleted());
404 }
405
406 let guard = if state.is_pending() {
407 let mut contents = self.contents.try_write_owned().expect("file contents");
408 let (size, entry) = load(&**self.path).await?;
409
410 self.cache.bump(&self.path, Some(size));
411
412 *state = FileLockState::Read(size);
413 *contents = Some(entry);
414
415 contents.downgrade()
416 } else {
417 self.cache.bump(&self.path, None);
418 self.contents.read_owned().await
419 };
420
421 read_type_owned(guard)
422 }
423
424 pub async fn write<F>(&self) -> Result<FileWriteGuard<F>>
426 where
427 F: FileLoad,
428 FE: AsType<F>,
429 {
430 let mut state = self.state.write().await;
431
432 if state.is_deleted() {
433 return Err(deleted());
434 }
435
436 let guard = if state.is_pending() {
437 let mut contents = self.contents.try_write().expect("file contents");
438 let (size, entry) = load(&**self.path).await?;
439
440 self.cache.bump(&self.path, Some(size));
441
442 *state = FileLockState::Modified(size);
443 *contents = Some(entry);
444
445 self.cache.bump(&self.path, Some(size));
446
447 contents
448 } else {
449 state.upgrade();
450 self.cache.bump(&self.path, None);
451 self.contents.write().await
452 };
453
454 write_type(guard)
455 }
456
457 pub fn try_write<F>(&self) -> Result<FileWriteGuard<F>>
459 where
460 F: FileLoad,
461 FE: AsType<F>,
462 {
463 let mut state = self.state.try_write().map_err(would_block)?;
464
465 if state.is_pending() {
466 Err(would_block("this file is not in the cache"))
467 } else if state.is_deleted() {
468 Err(deleted())
469 } else {
470 state.upgrade();
471 self.cache.bump(&self.path, None);
472 let guard = self.contents.try_write().map_err(would_block)?;
473 write_type(guard)
474 }
475 }
476
477 pub async fn write_owned<F>(&self) -> Result<FileWriteGuardOwned<FE, F>>
479 where
480 F: FileLoad,
481 FE: AsType<F>,
482 {
483 let mut state = self.state.write().await;
484
485 if state.is_deleted() {
486 return Err(deleted());
487 }
488
489 let guard = if state.is_pending() {
490 let mut contents = self
491 .contents
492 .clone()
493 .try_write_owned()
494 .expect("file contents");
495
496 let (size, entry) = load(&**self.path).await?;
497 self.cache.bump(&self.path, Some(size));
498
499 *state = FileLockState::Modified(size);
500 *contents = Some(entry);
501
502 contents
503 } else {
504 state.upgrade();
505 self.cache.bump(&self.path, None);
506 self.contents.clone().write_owned().await
507 };
508
509 write_type_owned(guard)
510 }
511
512 pub fn try_write_owned<F>(&self) -> Result<FileWriteGuardOwned<FE, F>>
514 where
515 FE: AsType<F>,
516 {
517 let mut state = self.state.try_write().map_err(would_block)?;
518
519 if state.is_pending() {
520 Err(would_block("this file is not in the cache"))
521 } else if state.is_deleted() {
522 Err(deleted())
523 } else {
524 state.upgrade();
525 self.cache.bump(&self.path, None);
526
527 let guard = self
528 .contents
529 .clone()
530 .try_write_owned()
531 .map_err(would_block)?;
532
533 write_type_owned(guard)
534 }
535 }
536
537 pub async fn into_write<F>(self) -> Result<FileWriteGuardOwned<FE, F>>
539 where
540 F: FileLoad,
541 FE: AsType<F>,
542 {
543 self.write_owned().await
544 }
545
546 pub fn try_into_write<F>(self) -> Result<FileWriteGuardOwned<FE, F>>
548 where
549 F: FileLoad,
550 FE: AsType<F>,
551 {
552 self.try_write_owned()
553 }
554
555 pub async fn sync(&self) -> Result<()>
557 where
558 FE: FileSave + Clone,
559 {
560 let mut state = self.state.write().await;
561
562 let new_state = match &*state {
563 FileLockState::Pending => FileLockState::Pending,
564 FileLockState::Read(size) => FileLockState::Read(*size),
565 FileLockState::Modified(old_size) => {
566 #[cfg(feature = "logging")]
567 log::trace!("sync modified file {}...", self.path.display());
568
569 let contents = self.contents.read().await;
570 let contents = contents.as_ref().cloned().expect("file");
571
572 let new_size = persist(self.path.clone(), contents).await?;
573
574 self.cache.resize(*old_size, new_size as usize);
575 FileLockState::Read(new_size as usize)
576 }
577 FileLockState::Deleted(needs_sync) => {
578 if *needs_sync {
579 if self.path.exists() {
580 delete_file(&self.path).await?;
581 }
582 }
583
584 FileLockState::Deleted(false)
585 }
586 };
587
588 *state = new_state;
589
590 Ok(())
591 }
592
593 pub(crate) async fn delete(&self, file_only: bool) {
594 let mut file_state = self.state.write().await;
595
596 let size = match &*file_state {
597 FileLockState::Pending => 0,
598 FileLockState::Read(size) => *size,
599 FileLockState::Modified(size) => *size,
600 FileLockState::Deleted(_) => return,
601 };
602
603 self.cache.remove(&self.path, size);
604
605 *file_state = FileLockState::Deleted(file_only);
606 }
607
608 pub(crate) fn evict(self) -> Option<(usize, impl Future<Output = Result<()>> + Send)>
609 where
610 FE: FileSave + Clone + 'static,
611 {
612 let mut state = self.state.try_write_owned().ok()?;
614
615 let (old_size, contents, modified) = match &*state {
616 FileLockState::Pending => {
617 return None;
619 }
620 FileLockState::Read(size) => {
621 let contents = self.contents.try_write_owned().ok()?;
622 (*size, contents, false)
623 }
624 FileLockState::Modified(size) => {
625 let contents = self.contents.try_write_owned().ok()?;
626 (*size, contents, true)
627 }
628 FileLockState::Deleted(_) => unreachable!("evict a deleted file"),
629 };
630
631 let eviction = async move {
632 if modified {
633 let contents = contents.as_ref().cloned().expect("file");
634 persist(self.path.clone(), contents).await?;
635 }
636
637 self.cache.resize(old_size, 0);
638
639 *state = FileLockState::Pending;
640 Ok(())
641 };
642
643 Some((old_size, eviction))
644 }
645}
646
647impl<FE> fmt::Debug for FileLock<FE> {
648 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
649 #[cfg(debug_assertions)]
650 write!(f, "file at {}", self.path.display())?;
651
652 #[cfg(not(debug_assertions))]
653 f.write_str("a file lock")?;
654
655 Ok(())
656 }
657}
658
659async fn load<F: FileLoad, FE: From<F>>(path: &Path) -> Result<(usize, FE)> {
660 let file = match fs::File::open(path).await {
661 Ok(file) => file,
662 Err(cause) if cause.kind() == io::ErrorKind::NotFound => {
663 #[cfg(debug_assertions)]
664 let message = format!("there is no file at {}", path.display());
665
666 #[cfg(not(debug_assertions))]
667 let message = "the requested file is not in cache and does not exist on the filesystem";
668
669 return Err(io::Error::new(io::ErrorKind::NotFound, message));
670 }
671 Err(cause) => return Err(cause),
672 };
673
674 let metadata = file.metadata().await?;
675 let size = match metadata.len().try_into() {
676 Ok(size) => size,
677 _ => {
678 return Err(io::Error::new(
679 io::ErrorKind::OutOfMemory,
680 "this file is too large to load into the cache",
681 ))
682 }
683 };
684
685 let file = F::load(path, file, metadata).await?;
686 let entry = FE::from(file);
687
688 Ok((size, entry))
689}
690
691fn persist<'a, FE: FileSave>(
694 path: Arc<PathBuf>,
695 file: FE,
696) -> impl Future<Output = Result<u64>> + Send {
697 async move {
698 let tmp = if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
699 path.with_extension(format!("{}_{}", ext, TMP))
700 } else {
701 path.with_extension(TMP)
702 };
703
704 let size = {
705 let mut tmp_file = if tmp.exists() {
706 fs::OpenOptions::new()
707 .truncate(true)
708 .write(true)
709 .open(tmp.as_path())
710 .await?
711 } else {
712 let parent = tmp.parent().expect("dir");
713 let mut i = 0;
714 while !parent.exists() {
715 create_dir(parent).await?;
716 tokio::time::sleep(tokio::time::Duration::from_millis(i)).await;
717 i += 1;
718 }
719
720 assert!(parent.exists());
721
722 let tmp_file = fs::File::create(tmp.as_path())
723 .map_err(|cause| {
724 io::Error::new(
725 cause.kind(),
726 format!("failed to create tmp file: {}", cause),
727 )
728 })
729 .await?;
730
731 tmp_file
732 };
733
734 assert!(tmp.exists());
735 assert!(!tmp.is_dir());
736
737 let size = file
738 .save(&mut tmp_file)
739 .map_err(|cause| {
740 io::Error::new(cause.kind(), format!("failed to save tmp file: {}", cause))
741 })
742 .await?;
743
744 size
745 };
746
747 tokio::fs::rename(tmp.as_path(), path.as_path())
748 .map_err(|cause| {
749 io::Error::new(
750 cause.kind(),
751 format!("failed to rename tmp file: {}", cause),
752 )
753 })
754 .await?;
755
756 Ok(size)
757 }
758}
759
760async fn create_dir(path: &Path) -> Result<()> {
761 if path.exists() {
762 Ok(())
763 } else {
764 match tokio::fs::create_dir_all(path).await {
765 Ok(()) => Ok(()),
766 Err(cause) => {
767 if path.exists() && path.is_dir() {
768 Ok(())
769 } else {
770 return Err(io::Error::new(
771 cause.kind(),
772 format!("failed to create directory: {}", cause),
773 ));
774 }
775 }
776 }
777 }
778}
779
780#[inline]
781fn read_type<F, T>(maybe_file: RwLockReadGuard<Option<F>>) -> Result<RwLockReadGuard<T>>
782where
783 F: AsType<T>,
784{
785 match RwLockReadGuard::try_map(maybe_file, |file| file.as_ref().expect("file").as_type()) {
786 Ok(file) => Ok(file),
787 Err(_) => Err(invalid_data(format!(
788 "invalid file type, expected {}",
789 std::any::type_name::<F>()
790 ))),
791 }
792}
793
794#[inline]
795fn read_type_owned<F, T>(
796 maybe_file: OwnedRwLockReadGuard<Option<F>>,
797) -> Result<OwnedRwLockReadGuard<Option<F>, T>>
798where
799 F: AsType<T>,
800{
801 match OwnedRwLockReadGuard::try_map(maybe_file, |file| file.as_ref().expect("file").as_type()) {
802 Ok(file) => Ok(file),
803 Err(_) => Err(invalid_data(format!(
804 "invalid file type, expected {}",
805 std::any::type_name::<F>()
806 ))),
807 }
808}
809
810#[inline]
811fn write_type<F, T>(maybe_file: RwLockWriteGuard<Option<F>>) -> Result<RwLockMappedWriteGuard<T>>
812where
813 F: AsType<T>,
814{
815 match RwLockWriteGuard::try_map(maybe_file, |file| {
816 file.as_mut().expect("file").as_type_mut()
817 }) {
818 Ok(file) => Ok(file),
819 Err(_) => Err(invalid_data(format!(
820 "invalid file type, expected {}",
821 std::any::type_name::<F>()
822 ))),
823 }
824}
825
826#[inline]
827fn write_type_owned<F, T>(
828 maybe_file: OwnedRwLockWriteGuard<Option<F>>,
829) -> Result<OwnedRwLockMappedWriteGuard<Option<F>, T>>
830where
831 F: AsType<T>,
832{
833 match OwnedRwLockWriteGuard::try_map(maybe_file, |file| {
834 file.as_mut().expect("file").as_type_mut()
835 }) {
836 Ok(file) => Ok(file),
837 Err(_) => Err(invalid_data(format!(
838 "invalid file type, expected {}",
839 std::any::type_name::<F>()
840 ))),
841 }
842}
843
844async fn delete_file(path: &Path) -> Result<()> {
845 match fs::remove_file(path).await {
846 Ok(()) => Ok(()),
847 Err(cause) if cause.kind() == io::ErrorKind::NotFound => {
848 Ok(())
850 }
851 Err(cause) => Err(cause),
852 }
853}
854
855#[inline]
856fn deleted() -> io::Error {
857 io::Error::new(io::ErrorKind::NotFound, "this file has been deleted")
858}
859
860#[inline]
861fn invalid_data<E>(cause: E) -> io::Error
862where
863 E: Into<Box<dyn std::error::Error + Send + Sync>>,
864{
865 io::Error::new(io::ErrorKind::InvalidData, cause)
866}
867
868#[inline]
869fn would_block<E>(cause: E) -> io::Error
870where
871 E: Into<Box<dyn std::error::Error + Send + Sync>>,
872{
873 io::Error::new(io::ErrorKind::WouldBlock, cause)
874}