1use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use fuser_async::{FileHandle, FilesystemSSUS};
6use tokio::io::{AsyncSeekExt, BufReader};
7#[cfg(feature = "asyncfs")]
8use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};
9
10use crate::Error;
11
12#[derive(Clone, Debug, clap::ArgEnum)]
14pub enum LocalBackend {
15 Tokio,
16 #[cfg(feature = "asyncfs")]
17 AsyncFs,
18 #[cfg(feature = "memmap")]
19 MemMap,
20}
21
22pub trait LocalReadersPool: Sized {
24 fn new(path: &Path) -> Result<Self, Error>;
25}
26
/// Pool manager whose readers are opened through `async_fs`.
///
/// Only compiled when the `asyncfs` feature is enabled.
#[cfg(feature = "asyncfs")]
pub struct LocalReadersPoolAsyncFs {
    /// Path of the file that every pooled reader opens.
    pub path: PathBuf,
}
#[async_trait::async_trait]
#[cfg(feature = "asyncfs")]
impl deadpool::managed::Manager for LocalReadersPoolAsyncFs {
    /// Buffered reader over an `async_fs` file wrapped with `compat()`.
    type Type = BufReader<Compat<async_fs::File>>;
    type Error = std::io::Error;

    /// Opens `self.path` and wraps the resulting file in a `BufReader`.
    ///
    /// # Errors
    /// Propagates the I/O error raised while opening the file.
    async fn create(&self) -> Result<Self::Type, Self::Error> {
        let file = async_fs::File::open(&self.path).await?;
        Ok(BufReader::new(file.compat()))
    }

    /// Seeks a recycled reader back to offset 0.
    ///
    /// # Errors
    /// Propagates the I/O error raised by the seek.
    async fn recycle(
        &self,
        reader: &mut Self::Type,
    ) -> deadpool::managed::RecycleResult<Self::Error> {
        let start = std::io::SeekFrom::Start(0);
        reader.seek(start).await?;
        Ok(())
    }
}
#[cfg(feature = "asyncfs")]
impl LocalReadersPool for LocalReadersPoolAsyncFs {
    /// Stores an owned copy of `path`.
    ///
    /// No I/O happens here, so this implementation always returns `Ok`.
    fn new(path: &Path) -> Result<Self, Error> {
        let path = path.to_path_buf();
        Ok(Self { path })
    }
}
57
/// Pool manager whose readers are opened through `tokio::fs`.
pub struct LocalReadersPoolTokio {
    /// Path of the file that every pooled reader opens.
    pub path: PathBuf,
}
62#[async_trait::async_trait]
63impl deadpool::managed::Manager for LocalReadersPoolTokio {
64 type Type = BufReader<tokio::fs::File>;
65 type Error = std::io::Error;
66
67 async fn create(&self) -> Result<Self::Type, Self::Error> {
68 Ok(BufReader::new(tokio::fs::File::open(&self.path).await?))
69 }
70 async fn recycle(&self, f: &mut Self::Type) -> deadpool::managed::RecycleResult<Self::Error> {
71 f.seek(std::io::SeekFrom::Start(0)).await?;
72 Ok(())
73 }
74}
75impl LocalReadersPool for LocalReadersPoolTokio {
76 fn new(path: &Path) -> Result<Self, Error> {
77 Ok(Self { path: path.into() })
78 }
79}
80
/// Pool manager whose readers are cursors over one shared memory mapping.
///
/// Only compiled when the `memmap` feature is enabled.
#[cfg(feature = "memmap")]
pub struct LocalReadersPoolMemMap {
    /// Path the mapping was created from.
    pub path: PathBuf,
    /// Handle to the mapping; a clone of it is given to every reader.
    data: MemMapArc,
}
87#[derive(Clone)]
89pub struct MemMapArc(Arc<memmap2::Mmap>);
90impl AsRef<[u8]> for MemMapArc {
91 fn as_ref(&self) -> &[u8] {
92 self.0.as_ref()
93 }
94}
95
#[async_trait::async_trait]
#[cfg(feature = "memmap")]
impl deadpool::managed::Manager for LocalReadersPoolMemMap {
    /// Cursor over a clone of the shared mapping handle.
    type Type = std::io::Cursor<MemMapArc>;
    type Error = std::io::Error;

    /// Creates a cursor positioned at offset 0 over the shared mapping.
    ///
    /// Cloning `self.data` only copies the reference-counted handle, so no
    /// file is opened here and this never fails.
    async fn create(&self) -> Result<Self::Type, Self::Error> {
        Ok(std::io::Cursor::new(self.data.clone()))
    }

    /// Moves a recycled cursor back to offset 0.
    ///
    /// The other backends in this file seek to the start on recycle; without
    /// this reset a reused cursor would keep the position left by its
    /// previous user. Setting the position cannot fail.
    async fn recycle(&self, f: &mut Self::Type) -> deadpool::managed::RecycleResult<Self::Error> {
        f.set_position(0);
        Ok(())
    }
}
#[cfg(feature = "memmap")]
impl LocalReadersPool for LocalReadersPoolMemMap {
    /// Opens the file at `path` and memory-maps it once for all readers.
    ///
    /// # Errors
    /// Returns `Error::MemMap` when the file cannot be opened or mapped; the
    /// underlying I/O error is discarded.
    fn new(path: &Path) -> Result<Self, Error> {
        let file = std::fs::File::open(path).map_err(|_| Error::MemMap)?;
        // SAFETY: `Mmap::map` is unsafe because the mapping's contents are only
        // stable while the underlying file is left untouched.
        // NOTE(review): nothing here enforces that — confirm the file is not
        // modified or truncated while the pool is alive.
        let mapping = unsafe { memmap2::Mmap::map(&file) }.map_err(|_| Error::MemMap)?;
        let data = MemMapArc(Arc::new(mapping));
        Ok(Self {
            path: path.to_path_buf(),
            data,
        })
    }
}
120
/// Integer flags stored in a `FilePool` and forwarded to `FileHandle::new`
/// whenever a handle is created.
// NOTE(review): presumably open(2)-style flags — confirm against
// `fuser_async::FileHandle`.
pub type ReadFlags = i32;
123
124pub struct FilePool<F: fuser_async::Filesystem>(pub F, pub u64, pub ReadFlags);
126#[async_trait::async_trait]
127impl<F: FilesystemSSUS + Clone> deadpool::managed::Manager for FilePool<F>
128where
129 F::Error: Send + Sync + std::fmt::Display + Into<Box<dyn std::error::Error + Send + Sync>>,
130{
131 type Type = BufReader<FileHandle<F>>;
132 type Error = tokio::io::Error;
133
134 async fn create(&self) -> Result<Self::Type, Self::Error> {
135 let fh = FileHandle::new(self.0.clone(), self.1, self.2)
136 .await
137 .map_err(|e| {
138 let e: Box<dyn std::error::Error + Send + Sync> = e.into();
139 tokio::io::Error::new(tokio::io::ErrorKind::Other, e)
140 })?;
141 let fh = tokio::io::BufReader::with_capacity(128 * 1024, fh);
142 Ok(fh)
143 }
144 async fn recycle(&self, f: &mut Self::Type) -> deadpool::managed::RecycleResult<Self::Error> {
145 f.seek(std::io::SeekFrom::Start(0)).await?;
146 Ok(())
147 }
148}