squashfs_async/
pools.rs

1//! Readers pools, used when reading data blocks.
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use fuser_async::{FileHandle, FilesystemSSUS};
6use tokio::io::{AsyncSeekExt, BufReader};
7#[cfg(feature = "asyncfs")]
8use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};
9
10use crate::Error;
11
12/// Enumeration of available local reader pools.
13#[derive(Clone, Debug, clap::ArgEnum)]
14pub enum LocalBackend {
15    Tokio,
16    #[cfg(feature = "asyncfs")]
17    AsyncFs,
18    #[cfg(feature = "memmap")]
19    MemMap,
20}
21
22/// Reader pools for a local backend/filesystem.
23pub trait LocalReadersPool: Sized {
24    fn new(path: &Path) -> Result<Self, Error>;
25}
26
/// Local readers (backed by [`async_fs::File`]).
///
/// This has different single- and multi-threaded performance characteristics than
/// [`LocalReadersPoolTokio`].
#[cfg(feature = "asyncfs")]
pub struct LocalReadersPoolAsyncFs {
    /// Path of the file that is opened whenever a new reader is created.
    pub path: PathBuf,
}
/// Pool manager producing buffered readers over [`async_fs::File`], adapted to tokio's I/O
/// traits through [`Compat`].
#[cfg(feature = "asyncfs")]
#[async_trait::async_trait]
impl deadpool::managed::Manager for LocalReadersPoolAsyncFs {
    type Type = BufReader<Compat<async_fs::File>>;
    type Error = std::io::Error;

    /// Opens the file at `self.path` and wraps it in a [`BufReader`].
    ///
    /// # Errors
    /// Propagates the I/O error raised when opening the file.
    async fn create(&self) -> Result<Self::Type, Self::Error> {
        let file = async_fs::File::open(&self.path).await?;
        Ok(BufReader::new(file.compat()))
    }

    /// Seeks the reader back to offset 0 before it is handed out again.
    ///
    /// # Errors
    /// Fails with the seek's I/O error.
    async fn recycle(
        &self,
        reader: &mut Self::Type,
    ) -> deadpool::managed::RecycleResult<Self::Error> {
        reader.seek(std::io::SeekFrom::Start(0)).await?;
        Ok(())
    }
}
#[cfg(feature = "asyncfs")]
impl LocalReadersPool for LocalReadersPoolAsyncFs {
    /// Stores an owned copy of `path`; this constructor itself performs no I/O and always
    /// returns `Ok`.
    fn new(path: &Path) -> Result<Self, Error> {
        let path = path.to_path_buf();
        Ok(Self { path })
    }
}
57
/// Local readers (backed by [`tokio::fs::File`]).
pub struct LocalReadersPoolTokio {
    /// Path of the file that is opened whenever a new reader is created.
    pub path: PathBuf,
}
62#[async_trait::async_trait]
63impl deadpool::managed::Manager for LocalReadersPoolTokio {
64    type Type = BufReader<tokio::fs::File>;
65    type Error = std::io::Error;
66
67    async fn create(&self) -> Result<Self::Type, Self::Error> {
68        Ok(BufReader::new(tokio::fs::File::open(&self.path).await?))
69    }
70    async fn recycle(&self, f: &mut Self::Type) -> deadpool::managed::RecycleResult<Self::Error> {
71        f.seek(std::io::SeekFrom::Start(0)).await?;
72        Ok(())
73    }
74}
75impl LocalReadersPool for LocalReadersPoolTokio {
76    fn new(path: &Path) -> Result<Self, Error> {
77        Ok(Self { path: path.into() })
78    }
79}
80
/// Memory-mapped local readers (backed by [`memmap2::Mmap`]).
#[cfg(feature = "memmap")]
pub struct LocalReadersPoolMemMap {
    /// Path of the file that was mapped.
    pub path: PathBuf,
    /// The mapping itself; every reader receives a clone of this handle.
    data: MemMapArc,
}
/// [`memmap2::Mmap`] wrapped in an [`Arc`], so that clones share one mapping.
///
/// Gated on the `memmap` feature like every other item that names `memmap2`.
#[cfg(feature = "memmap")]
#[derive(Clone)]
pub struct MemMapArc(Arc<memmap2::Mmap>);

/// Gives byte-slice access to the mapping, which is what allows a [`std::io::Cursor`] to read
/// from a `MemMapArc`.
#[cfg(feature = "memmap")]
impl AsRef<[u8]> for MemMapArc {
    fn as_ref(&self) -> &[u8] {
        // Borrow the mapped bytes through the shared handle.
        self.0.as_ref()
    }
}
95
/// Pool manager producing [`std::io::Cursor`]s over the shared memory map.
#[cfg(feature = "memmap")]
#[async_trait::async_trait]
impl deadpool::managed::Manager for LocalReadersPoolMemMap {
    type Type = std::io::Cursor<MemMapArc>;
    type Error = std::io::Error;

    /// Creates a cursor positioned at offset 0 over a clone of the mapping handle.
    ///
    /// This never fails: the file was already mapped when the manager was built.
    async fn create(&self) -> Result<Self::Type, Self::Error> {
        Ok(std::io::Cursor::new(self.data.clone()))
    }

    /// Resets the cursor to offset 0 before it is handed out again.
    ///
    /// The file-backed managers in this module rewind their readers on recycle; doing the same
    /// here means a recycled cursor starts where a freshly created one does. Cannot fail.
    async fn recycle(&self, f: &mut Self::Type) -> deadpool::managed::RecycleResult<Self::Error> {
        f.set_position(0);
        Ok(())
    }
}
#[cfg(feature = "memmap")]
impl LocalReadersPool for LocalReadersPoolMemMap {
    /// Opens the file at `path` and maps it into memory.
    ///
    /// # Errors
    /// Returns [`Error::MemMap`] if the file cannot be opened or cannot be mapped; the
    /// underlying I/O error is discarded in both cases.
    fn new(path: &Path) -> Result<Self, Error> {
        let mapping = std::fs::File::open(path)
            // SAFETY: NOTE(review) — `memmap2` marks file-backed mappings `unsafe` because the
            // file may be modified or truncated while it is mapped. Nothing in this function
            // guards against that; presumably the file is treated as immutable for as long as
            // the pool lives — confirm with callers.
            .and_then(|file| unsafe { memmap2::Mmap::map(&file) })
            .map_err(|_| Error::MemMap)?;
        Ok(Self {
            path: path.to_path_buf(),
            data: MemMapArc(Arc::new(mapping)),
        })
    }
}
120
/// Flags for the `open` syscall, as a raw integer value.
pub type ReadFlags = i32;
123
124/// Readers from [`fuser_async::Filesystem`] file handles.
125pub struct FilePool<F: fuser_async::Filesystem>(pub F, pub u64, pub ReadFlags);
126#[async_trait::async_trait]
127impl<F: FilesystemSSUS + Clone> deadpool::managed::Manager for FilePool<F>
128where
129    F::Error: Send + Sync + std::fmt::Display + Into<Box<dyn std::error::Error + Send + Sync>>,
130{
131    type Type = BufReader<FileHandle<F>>;
132    type Error = tokio::io::Error;
133
134    async fn create(&self) -> Result<Self::Type, Self::Error> {
135        let fh = FileHandle::new(self.0.clone(), self.1, self.2)
136            .await
137            .map_err(|e| {
138                let e: Box<dyn std::error::Error + Send + Sync> = e.into();
139                tokio::io::Error::new(tokio::io::ErrorKind::Other, e)
140            })?;
141        let fh = tokio::io::BufReader::with_capacity(128 * 1024, fh);
142        Ok(fh)
143    }
144    async fn recycle(&self, f: &mut Self::Type) -> deadpool::managed::RecycleResult<Self::Error> {
145        f.seek(std::io::SeekFrom::Start(0)).await?;
146        Ok(())
147    }
148}