1use std::convert::TryInto;
2use std::ops::Deref;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::{fmt, io};
6use futures::{join, Future, TryFutureExt};
7use safecast::AsType;
8use tokio::fs;
9use tokio::sync::{
10    OwnedRwLockMappedWriteGuard, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock,
11    RwLockMappedWriteGuard, RwLockReadGuard, RwLockWriteGuard,
12};
13
14use super::cache::Cache;
15use super::Result;
16
/// Suffix used to build the extension of the temporary file written by `persist`
/// before it is renamed over the destination path.
const TMP: &'static str = "_freqfs";
18
/// A read guard on a file of type `F`, borrowed from a [`FileLock`].
pub type FileReadGuard<'a, F> = RwLockReadGuard<'a, F>;

/// An owned read guard on a file of type `F`, mapped from contents of type `Option<FE>`.
pub type FileReadGuardOwned<FE, F> = OwnedRwLockReadGuard<Option<FE>, F>;

/// A write guard on a file of type `F`, borrowed from a [`FileLock`].
pub type FileWriteGuard<'a, F> = RwLockMappedWriteGuard<'a, F>;

/// An owned write guard on a file of type `F`, mapped from contents of type `Option<FE>`.
pub type FileWriteGuardOwned<FE, F> = OwnedRwLockMappedWriteGuard<Option<FE>, F>;
30
/// A guard (or shared guard) which can be borrowed as the file it protects.
pub trait FileDeref {
    /// The type of file accessible through this guard.
    type File;

    /// Borrow the file protected by this guard.
    fn as_file(&self) -> &Self::File;
}
39
40impl<'a, F> FileDeref for FileReadGuard<'a, F> {
41    type File = F;
42
43    fn as_file(&self) -> &F {
44        self.deref()
45    }
46}
47
48impl<'a, F> FileDeref for Arc<FileReadGuard<'a, F>> {
49    type File = F;
50
51    fn as_file(&self) -> &F {
52        self.deref().as_file()
53    }
54}
55
56impl<FE, F> FileDeref for FileReadGuardOwned<FE, F> {
57    type File = F;
58
59    fn as_file(&self) -> &F {
60        self.deref()
61    }
62}
63
64impl<FE, F> FileDeref for Arc<FileReadGuardOwned<FE, F>> {
65    type File = F;
66
67    fn as_file(&self) -> &F {
68        self.deref().as_file()
69    }
70}
71
72impl<'a, F> FileDeref for FileWriteGuard<'a, F> {
73    type File = F;
74
75    fn as_file(&self) -> &F {
76        self.deref()
77    }
78}
79
80impl<FE, F> FileDeref for FileWriteGuardOwned<FE, F> {
81    type File = F;
82
83    fn as_file(&self) -> &F {
84        self.deref()
85    }
86}
87
/// A type which can be constructed from a file on the filesystem.
#[trait_variant::make(Send)]
pub trait FileLoad: Send + Sync + Sized + 'static {
    /// Construct `Self` from the already-opened `file` located at `path`,
    /// given the `metadata` of that file.
    async fn load(path: &Path, file: fs::File, metadata: std::fs::Metadata) -> Result<Self>;
}
94
/// A type which can be written to a file on the filesystem.
#[trait_variant::make(Send)]
pub trait FileSave: Send + Sync + Sized + 'static {
    /// Write `self` into `file`.
    ///
    /// The returned `u64` is recorded by callers in this file as the new size of the file.
    async fn save(&self, file: &mut fs::File) -> Result<u64>;
}
101
/// With the `stream` feature enabled, any type decodable by `destream` with a unit context
/// can be loaded from a TBON-encoded file.
#[cfg(feature = "stream")]
impl<T> FileLoad for T
where
    T: destream::de::FromStream<Context = ()> + Send + Sync + 'static,
{
    async fn load(_path: &Path, file: fs::File, _metadata: std::fs::Metadata) -> Result<Self> {
        // decode first, then translate any decoding failure into an `InvalidData` I/O error
        let decoded = tbon::de::read_from((), file).await;
        decoded.map_err(|cause| io::Error::new(io::ErrorKind::InvalidData, cause))
    }
}
113
/// With the `stream` feature enabled, any type encodable by `destream`
/// can be saved to a file as TBON.
#[cfg(feature = "stream")]
impl<T> FileSave for T
where
    T: for<'en> destream::en::ToStream<'en> + Send + Sync + 'static,
{
    async fn save(&self, file: &mut fs::File) -> Result<u64> {
        use futures::TryStreamExt;

        // a failure to begin encoding is reported as invalid data
        let chunks = match tbon::en::encode(self) {
            Ok(chunks) => chunks,
            Err(cause) => return Err(io::Error::new(io::ErrorKind::InvalidData, cause)),
        };

        // adapt the stream of encoded chunks into a stream of `Bytes` with I/O errors
        let byte_stream = chunks
            .map_ok(bytes::Bytes::from)
            .map_err(|cause| io::Error::new(io::ErrorKind::InvalidData, cause));

        let mut source = tokio_util::io::StreamReader::new(byte_stream);

        // the number of bytes copied is the value returned to the caller
        tokio::io::copy(&mut source, file).await
    }
}
134
/// The state of a file with respect to the in-memory cache.
#[derive(Copy, Clone)]
enum FileLockState {
    /// The file contents have not been loaded into memory.
    Pending,
    /// The file is loaded with the given size and has not been marked as modified.
    Read(usize),
    /// The file is loaded with the given size and has been marked as modified.
    Modified(usize),
    /// The file has been deleted; the flag records whether a sync must still remove it.
    Deleted(bool),
}

impl FileLockState {
    /// Return `true` if this file has been deleted.
    fn is_deleted(&self) -> bool {
        matches!(self, Self::Deleted(_))
    }

    /// Return `true` if the contents of this file are loaded in memory.
    fn is_loaded(&self) -> bool {
        matches!(self, Self::Read(_) | Self::Modified(_))
    }

    /// Return `true` if the contents of this file have not been loaded.
    fn is_pending(&self) -> bool {
        matches!(self, Self::Pending)
    }

    /// Mark a loaded file as modified, keeping its recorded size.
    ///
    /// Panics if the file is not loaded.
    fn upgrade(&mut self) {
        *self = match *self {
            Self::Read(size) | Self::Modified(size) => Self::Modified(size),
            _ => unreachable!("upgrade a file not in the cache"),
        };
    }
}
174
/// A clonable handle to a single file, pairing its cache state with its
/// (optionally loaded) in-memory contents.
pub struct FileLock<FE> {
    // the cache which is notified when this file is loaded, accessed, resized, or removed
    cache: Arc<Cache<FE>>,
    // the location of this file on the filesystem
    path: Arc<PathBuf>,
    // whether this file is pending, loaded, modified, or deleted
    state: Arc<RwLock<FileLockState>>,
    // the in-memory contents of this file, `None` when constructed by `load`
    contents: Arc<RwLock<Option<FE>>>,
}
182
183impl<FE> Clone for FileLock<FE> {
184    fn clone(&self) -> Self {
185        Self {
186            cache: self.cache.clone(),
187            path: self.path.clone(),
188            state: self.state.clone(),
189            contents: self.contents.clone(),
190        }
191    }
192}
193
194impl<FE> FileLock<FE> {
195    pub fn new<F>(cache: Arc<Cache<FE>>, path: PathBuf, contents: F, size: usize) -> Self
197    where
198        FE: From<F>,
199    {
200        Self {
201            cache,
202            path: Arc::new(path),
203            state: Arc::new(RwLock::new(FileLockState::Modified(size))),
204            contents: Arc::new(RwLock::new(Some(contents.into()))),
205        }
206    }
207
208    pub fn path(&self) -> &Path {
210        self.path.as_path()
211    }
212
213    pub fn load<F>(cache: Arc<Cache<FE>>, path: PathBuf) -> Self
215    where
216        FE: From<F>,
217    {
218        Self {
219            cache,
220            path: Arc::new(path),
221            state: Arc::new(RwLock::new(FileLockState::Pending)),
222            contents: Arc::new(RwLock::new(None)),
223        }
224    }
225
    /// Replace the state of this file (and its in-memory contents, if loaded) with that of `other`.
    ///
    /// - If `other` is pending, its file is copied on the filesystem to this file's path
    ///   and this file becomes pending.
    /// - If `other` is deleted, this file is marked as deleted and flagged for removal on sync.
    /// - If `other` is loaded, its contents are cloned into this file, which becomes modified.
    ///
    /// Returns an error if the filesystem copy fails.
    pub async fn overwrite(&self, other: &Self) -> Result<()>
    where
        FE: Clone,
    {
        // lock both states concurrently: exclusively for this file, shared for the source
        let (mut this, that) = join!(self.state.write(), other.state.read());

        // the size currently attributed to this file (zero when nothing is loaded)
        let old_size = match &*this {
            FileLockState::Pending | FileLockState::Deleted(_) => 0,
            FileLockState::Read(size) | FileLockState::Modified(size) => *size,
        };

        let new_size = match &*that {
            FileLockState::Pending => {
                // the source is not in memory, so copy it on the filesystem instead
                debug_assert!(other.path.exists());

                create_dir(self.path.parent().expect("file parent dir")).await?;

                match fs::copy(other.path.as_path(), self.path.as_path()).await {
                    Ok(_) => {}
                    Err(cause) if cause.kind() == io::ErrorKind::NotFound => {
                        // only debug builds include the source path in the message
                        #[cfg(debug_assertions)]
                        let message = format!(
                            "tried to copy a file from a nonexistent source: {}",
                            other.path.display()
                        );

                        #[cfg(not(debug_assertions))]
                        let message = "tried to copy a file from a nonexistent source";

                        return Err(io::Error::new(io::ErrorKind::NotFound, message));
                    }
                    Err(cause) => return Err(cause),
                }

                *this = FileLockState::Pending;
                0
            }
            FileLockState::Deleted(_sync) => {
                // flag this file so that the next sync removes it from the filesystem
                *this = FileLockState::Deleted(true);
                0
            }
            FileLockState::Read(size) | FileLockState::Modified(size) => {
                // the copy exists only in memory until the next sync
                *this = FileLockState::Modified(*size);
                *size
            }
        };

        // only a loaded source has in-memory contents to clone
        if this.is_loaded() {
            let (mut this_data, that_data) = join!(self.contents.write(), other.contents.read());
            let that_data = that_data.as_ref().expect("file");
            *this_data = Some(FE::clone(&*that_data));
        }

        // report the change in size to the cache
        self.cache.resize(old_size, new_size);

        Ok(())
    }
285
286    pub async fn read<F>(&self) -> Result<FileReadGuard<F>>
288    where
289        F: FileLoad,
290        FE: AsType<F>,
291    {
292        let mut state = self.state.write().await;
293
294        if state.is_deleted() {
295            return Err(deleted());
296        }
297
298        let guard = if state.is_pending() {
299            let mut contents = self.contents.try_write().expect("file contents");
300            let (size, entry) = load(&**self.path).await?;
301
302            self.cache.bump(&self.path, Some(size));
303
304            *state = FileLockState::Read(size);
305            *contents = Some(entry);
306
307            contents.downgrade()
308        } else {
309            self.cache.bump(&self.path, None);
310            self.contents.read().await
311        };
312
313        read_type(guard)
314    }
315
316    pub fn try_read<F>(&self) -> Result<FileReadGuard<F>>
318    where
319        F: FileLoad,
320        FE: AsType<F>,
321    {
322        let state = self.state.try_read().map_err(would_block)?;
323
324        match &*state {
325            FileLockState::Pending => Err(would_block("this file is not in the cache")),
326            FileLockState::Deleted(_sync) => Err(deleted()),
327            FileLockState::Read(_size) | FileLockState::Modified(_size) => {
328                self.cache.bump(&self.path, None);
329                let guard = self.contents.try_read().map_err(would_block)?;
330                read_type(guard)
331            }
332        }
333    }
334
335    pub async fn read_owned<F>(&self) -> Result<FileReadGuardOwned<FE, F>>
337    where
338        F: FileLoad,
339        FE: AsType<F>,
340    {
341        let mut state = self.state.write().await;
342
343        if state.is_deleted() {
344            return Err(deleted());
345        }
346
347        let guard = if state.is_pending() {
348            let mut contents = self
349                .contents
350                .clone()
351                .try_write_owned()
352                .expect("file contents");
353
354            let (size, entry) = load(&**self.path).await?;
355
356            self.cache.bump(&self.path, Some(size));
357
358            *state = FileLockState::Read(size);
359            *contents = Some(entry);
360
361            contents.downgrade()
362        } else {
363            self.cache.bump(&self.path, None);
364            self.contents.clone().read_owned().await
365        };
366
367        read_type_owned(guard)
368    }
369
370    pub fn try_read_owned<F>(&self) -> Result<FileReadGuardOwned<FE, F>>
372    where
373        F: FileLoad,
374        FE: AsType<F>,
375    {
376        let state = self.state.try_read().map_err(would_block)?;
377
378        match &*state {
379            FileLockState::Pending => Err(would_block("this file is not in the cache")),
380            FileLockState::Deleted(_sync) => Err(deleted()),
381            FileLockState::Read(_size) | FileLockState::Modified(_size) => {
382                self.cache.bump(&self.path, None);
383                let guard = self
384                    .contents
385                    .clone()
386                    .try_read_owned()
387                    .map_err(would_block)?;
388
389                read_type_owned(guard)
390            }
391        }
392    }
393
394    pub async fn into_read<F>(self) -> Result<FileReadGuardOwned<FE, F>>
396    where
397        F: FileLoad,
398        FE: AsType<F>,
399    {
400        let mut state = self.state.write().await;
401
402        if state.is_deleted() {
403            return Err(deleted());
404        }
405
406        let guard = if state.is_pending() {
407            let mut contents = self.contents.try_write_owned().expect("file contents");
408            let (size, entry) = load(&**self.path).await?;
409
410            self.cache.bump(&self.path, Some(size));
411
412            *state = FileLockState::Read(size);
413            *contents = Some(entry);
414
415            contents.downgrade()
416        } else {
417            self.cache.bump(&self.path, None);
418            self.contents.read_owned().await
419        };
420
421        read_type_owned(guard)
422    }
423
424    pub async fn write<F>(&self) -> Result<FileWriteGuard<F>>
426    where
427        F: FileLoad,
428        FE: AsType<F>,
429    {
430        let mut state = self.state.write().await;
431
432        if state.is_deleted() {
433            return Err(deleted());
434        }
435
436        let guard = if state.is_pending() {
437            let mut contents = self.contents.try_write().expect("file contents");
438            let (size, entry) = load(&**self.path).await?;
439
440            self.cache.bump(&self.path, Some(size));
441
442            *state = FileLockState::Modified(size);
443            *contents = Some(entry);
444
445            self.cache.bump(&self.path, Some(size));
446
447            contents
448        } else {
449            state.upgrade();
450            self.cache.bump(&self.path, None);
451            self.contents.write().await
452        };
453
454        write_type(guard)
455    }
456
457    pub fn try_write<F>(&self) -> Result<FileWriteGuard<F>>
459    where
460        F: FileLoad,
461        FE: AsType<F>,
462    {
463        let mut state = self.state.try_write().map_err(would_block)?;
464
465        if state.is_pending() {
466            Err(would_block("this file is not in the cache"))
467        } else if state.is_deleted() {
468            Err(deleted())
469        } else {
470            state.upgrade();
471            self.cache.bump(&self.path, None);
472            let guard = self.contents.try_write().map_err(would_block)?;
473            write_type(guard)
474        }
475    }
476
477    pub async fn write_owned<F>(&self) -> Result<FileWriteGuardOwned<FE, F>>
479    where
480        F: FileLoad,
481        FE: AsType<F>,
482    {
483        let mut state = self.state.write().await;
484
485        if state.is_deleted() {
486            return Err(deleted());
487        }
488
489        let guard = if state.is_pending() {
490            let mut contents = self
491                .contents
492                .clone()
493                .try_write_owned()
494                .expect("file contents");
495
496            let (size, entry) = load(&**self.path).await?;
497            self.cache.bump(&self.path, Some(size));
498
499            *state = FileLockState::Modified(size);
500            *contents = Some(entry);
501
502            contents
503        } else {
504            state.upgrade();
505            self.cache.bump(&self.path, None);
506            self.contents.clone().write_owned().await
507        };
508
509        write_type_owned(guard)
510    }
511
512    pub fn try_write_owned<F>(&self) -> Result<FileWriteGuardOwned<FE, F>>
514    where
515        FE: AsType<F>,
516    {
517        let mut state = self.state.try_write().map_err(would_block)?;
518
519        if state.is_pending() {
520            Err(would_block("this file is not in the cache"))
521        } else if state.is_deleted() {
522            Err(deleted())
523        } else {
524            state.upgrade();
525            self.cache.bump(&self.path, None);
526
527            let guard = self
528                .contents
529                .clone()
530                .try_write_owned()
531                .map_err(would_block)?;
532
533            write_type_owned(guard)
534        }
535    }
536
537    pub async fn into_write<F>(self) -> Result<FileWriteGuardOwned<FE, F>>
539    where
540        F: FileLoad,
541        FE: AsType<F>,
542    {
543        self.write_owned().await
544    }
545
546    pub fn try_into_write<F>(self) -> Result<FileWriteGuardOwned<FE, F>>
548    where
549        F: FileLoad,
550        FE: AsType<F>,
551    {
552        self.try_write_owned()
553    }
554
555    pub async fn sync(&self) -> Result<()>
557    where
558        FE: FileSave + Clone,
559    {
560        let mut state = self.state.write().await;
561
562        let new_state = match &*state {
563            FileLockState::Pending => FileLockState::Pending,
564            FileLockState::Read(size) => FileLockState::Read(*size),
565            FileLockState::Modified(old_size) => {
566                #[cfg(feature = "logging")]
567                log::trace!("sync modified file {}...", self.path.display());
568
569                let contents = self.contents.read().await;
570                let contents = contents.as_ref().cloned().expect("file");
571
572                let new_size = persist(self.path.clone(), contents).await?;
573
574                self.cache.resize(*old_size, new_size as usize);
575                FileLockState::Read(new_size as usize)
576            }
577            FileLockState::Deleted(needs_sync) => {
578                if *needs_sync {
579                    if self.path.exists() {
580                        delete_file(&self.path).await?;
581                    }
582                }
583
584                FileLockState::Deleted(false)
585            }
586        };
587
588        *state = new_state;
589
590        Ok(())
591    }
592
593    pub(crate) async fn delete(&self, file_only: bool) {
594        let mut file_state = self.state.write().await;
595
596        let size = match &*file_state {
597            FileLockState::Pending => 0,
598            FileLockState::Read(size) => *size,
599            FileLockState::Modified(size) => *size,
600            FileLockState::Deleted(_) => return,
601        };
602
603        self.cache.remove(&self.path, size);
604
605        *file_state = FileLockState::Deleted(file_only);
606    }
607
608    pub(crate) fn evict(self) -> Option<(usize, impl Future<Output = Result<()>> + Send)>
609    where
610        FE: FileSave + Clone + 'static,
611    {
612        let mut state = self.state.try_write_owned().ok()?;
614
615        let (old_size, contents, modified) = match &*state {
616            FileLockState::Pending => {
617                return None;
619            }
620            FileLockState::Read(size) => {
621                let contents = self.contents.try_write_owned().ok()?;
622                (*size, contents, false)
623            }
624            FileLockState::Modified(size) => {
625                let contents = self.contents.try_write_owned().ok()?;
626                (*size, contents, true)
627            }
628            FileLockState::Deleted(_) => unreachable!("evict a deleted file"),
629        };
630
631        let eviction = async move {
632            if modified {
633                let contents = contents.as_ref().cloned().expect("file");
634                persist(self.path.clone(), contents).await?;
635            }
636
637            self.cache.resize(old_size, 0);
638
639            *state = FileLockState::Pending;
640            Ok(())
641        };
642
643        Some((old_size, eviction))
644    }
645}
646
647impl<FE> fmt::Debug for FileLock<FE> {
648    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
649        #[cfg(debug_assertions)]
650        write!(f, "file at {}", self.path.display())?;
651
652        #[cfg(not(debug_assertions))]
653        f.write_str("a file lock")?;
654
655        Ok(())
656    }
657}
658
659async fn load<F: FileLoad, FE: From<F>>(path: &Path) -> Result<(usize, FE)> {
660    let file = match fs::File::open(path).await {
661        Ok(file) => file,
662        Err(cause) if cause.kind() == io::ErrorKind::NotFound => {
663            #[cfg(debug_assertions)]
664            let message = format!("there is no file at {}", path.display());
665
666            #[cfg(not(debug_assertions))]
667            let message = "the requested file is not in cache and does not exist on the filesystem";
668
669            return Err(io::Error::new(io::ErrorKind::NotFound, message));
670        }
671        Err(cause) => return Err(cause),
672    };
673
674    let metadata = file.metadata().await?;
675    let size = match metadata.len().try_into() {
676        Ok(size) => size,
677        _ => {
678            return Err(io::Error::new(
679                io::ErrorKind::OutOfMemory,
680                "this file is too large to load into the cache",
681            ))
682        }
683    };
684
685    let file = F::load(path, file, metadata).await?;
686    let entry = FE::from(file);
687
688    Ok((size, entry))
689}
690
/// Save `file` to the filesystem at `path`.
///
/// The data is first written to a temporary file next to `path` (the same path with
/// the `TMP` suffix in its extension), which is then renamed to `path`.
///
/// Returns the size reported by [`FileSave::save`].
///
/// Panics if the temporary file does not exist, or is a directory, after being opened.
fn persist<'a, FE: FileSave>(
    path: Arc<PathBuf>,
    file: FE,
) -> impl Future<Output = Result<u64>> + Send {
    async move {
        // append the suffix to the existing extension, if there is one
        let tmp = if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
            path.with_extension(format!("{}_{}", ext, TMP))
        } else {
            path.with_extension(TMP)
        };

        let size = {
            let mut tmp_file = if tmp.exists() {
                // reuse an existing temporary file, discarding whatever it holds
                fs::OpenOptions::new()
                    .truncate(true)
                    .write(true)
                    .open(tmp.as_path())
                    .await?
            } else {
                let parent = tmp.parent().expect("dir");

                // keep creating the parent directory until it is observed to exist,
                // sleeping one millisecond longer after each attempt (starting at zero)
                // NOTE(review): presumably this guards against concurrent removal of the
                // directory -- confirm
                let mut i = 0;
                while !parent.exists() {
                    create_dir(parent).await?;
                    tokio::time::sleep(tokio::time::Duration::from_millis(i)).await;
                    i += 1;
                }

                assert!(parent.exists());

                let tmp_file = fs::File::create(tmp.as_path())
                    .map_err(|cause| {
                        io::Error::new(
                            cause.kind(),
                            format!("failed to create tmp file: {}", cause),
                        )
                    })
                    .await?;

                tmp_file
            };

            assert!(tmp.exists());
            assert!(!tmp.is_dir());

            let size = file
                .save(&mut tmp_file)
                .map_err(|cause| {
                    io::Error::new(cause.kind(), format!("failed to save tmp file: {}", cause))
                })
                .await?;

            size
        };

        // move the fully written temporary file into place
        tokio::fs::rename(tmp.as_path(), path.as_path())
            .map_err(|cause| {
                io::Error::new(
                    cause.kind(),
                    format!("failed to rename tmp file: {}", cause),
                )
            })
            .await?;

        Ok(size)
    }
}
759
760async fn create_dir(path: &Path) -> Result<()> {
761    if path.exists() {
762        Ok(())
763    } else {
764        match tokio::fs::create_dir_all(path).await {
765            Ok(()) => Ok(()),
766            Err(cause) => {
767                if path.exists() && path.is_dir() {
768                    Ok(())
769                } else {
770                    return Err(io::Error::new(
771                        cause.kind(),
772                        format!("failed to create directory: {}", cause),
773                    ));
774                }
775            }
776        }
777    }
778}
779
780#[inline]
781fn read_type<F, T>(maybe_file: RwLockReadGuard<Option<F>>) -> Result<RwLockReadGuard<T>>
782where
783    F: AsType<T>,
784{
785    match RwLockReadGuard::try_map(maybe_file, |file| file.as_ref().expect("file").as_type()) {
786        Ok(file) => Ok(file),
787        Err(_) => Err(invalid_data(format!(
788            "invalid file type, expected {}",
789            std::any::type_name::<F>()
790        ))),
791    }
792}
793
794#[inline]
795fn read_type_owned<F, T>(
796    maybe_file: OwnedRwLockReadGuard<Option<F>>,
797) -> Result<OwnedRwLockReadGuard<Option<F>, T>>
798where
799    F: AsType<T>,
800{
801    match OwnedRwLockReadGuard::try_map(maybe_file, |file| file.as_ref().expect("file").as_type()) {
802        Ok(file) => Ok(file),
803        Err(_) => Err(invalid_data(format!(
804            "invalid file type, expected {}",
805            std::any::type_name::<F>()
806        ))),
807    }
808}
809
810#[inline]
811fn write_type<F, T>(maybe_file: RwLockWriteGuard<Option<F>>) -> Result<RwLockMappedWriteGuard<T>>
812where
813    F: AsType<T>,
814{
815    match RwLockWriteGuard::try_map(maybe_file, |file| {
816        file.as_mut().expect("file").as_type_mut()
817    }) {
818        Ok(file) => Ok(file),
819        Err(_) => Err(invalid_data(format!(
820            "invalid file type, expected {}",
821            std::any::type_name::<F>()
822        ))),
823    }
824}
825
826#[inline]
827fn write_type_owned<F, T>(
828    maybe_file: OwnedRwLockWriteGuard<Option<F>>,
829) -> Result<OwnedRwLockMappedWriteGuard<Option<F>, T>>
830where
831    F: AsType<T>,
832{
833    match OwnedRwLockWriteGuard::try_map(maybe_file, |file| {
834        file.as_mut().expect("file").as_type_mut()
835    }) {
836        Ok(file) => Ok(file),
837        Err(_) => Err(invalid_data(format!(
838            "invalid file type, expected {}",
839            std::any::type_name::<F>()
840        ))),
841    }
842}
843
844async fn delete_file(path: &Path) -> Result<()> {
845    match fs::remove_file(path).await {
846        Ok(()) => Ok(()),
847        Err(cause) if cause.kind() == io::ErrorKind::NotFound => {
848            Ok(())
850        }
851        Err(cause) => Err(cause),
852    }
853}
854
/// Construct the `NotFound` error returned when a deleted file is accessed.
#[inline]
fn deleted() -> io::Error {
    let kind = io::ErrorKind::NotFound;
    io::Error::new(kind, "this file has been deleted")
}
859
/// Wrap `cause` in an I/O error of kind `InvalidData`.
#[inline]
fn invalid_data<E>(cause: E) -> io::Error
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    let source: Box<dyn std::error::Error + Send + Sync> = cause.into();
    io::Error::new(io::ErrorKind::InvalidData, source)
}
867
/// Wrap `cause` in an I/O error of kind `WouldBlock`.
#[inline]
fn would_block<E>(cause: E) -> io::Error
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    let source: Box<dyn std::error::Error + Send + Sync> = cause.into();
    io::Error::new(io::ErrorKind::WouldBlock, source)
}