fuser_async/
fuse.rs

1use std::sync::Arc;
2use std::time::{Duration, SystemTime};
3
4use fuser::TimeOrNow;
5use tokio::sync::RwLock;
6use tracing::*;
7
8use super::{error::Error, Filesystem};
9
10const TTL: Duration = Duration::from_secs(5);
11
12/// Wrapper around a [`Filesystem`], implementing [`fuser::Filesystem`].
13pub struct FilesystemFUSE<T> {
14    inner: Arc<RwLock<T>>,
15    handle: tokio::runtime::Handle,
16}
17impl<T> Clone for FilesystemFUSE<T> {
18    fn clone(&self) -> Self {
19        Self {
20            inner: self.inner.clone(),
21            handle: self.handle.clone(),
22        }
23    }
24}
25
26impl<T: Filesystem + Send + Sync> FilesystemFUSE<T> {
27    /// Create from a [`Filesystem`].
28    pub fn new(inner: T) -> Self {
29        Self {
30            inner: Arc::new(RwLock::new(inner)),
31            handle: tokio::runtime::Handle::current(),
32        }
33    }
34    /// Return a read lock guard on the inner [`Filesystem`].
35    pub async fn inner(&self) -> tokio::sync::RwLockReadGuard<T> {
36        let inner = &*self.inner;
37        inner.read().await
38    }
39    /// Return a write lock guard on the inner [`Filesystem`].
40    pub async fn inner_mut(&self) -> tokio::sync::RwLockWriteGuard<T> {
41        let inner = &*self.inner;
42        inner.write().await
43    }
44    /// Try to return a read-write lock guard on the inner [`Filesystem`].
45    /// Fails if the are already locks (e.g. ongoing filesystem operations).
46    pub async fn try_inner_mut(
47        &self,
48    ) -> Result<tokio::sync::RwLockWriteGuard<T>, tokio::sync::TryLockError> {
49        self.inner.try_write()
50    }
51}
52impl<T: Filesystem + Send + Sync + 'static> fuser::Filesystem for FilesystemFUSE<T>
53where
54    T::Error: std::fmt::Display,
55{
56    fn init(
57        &mut self,
58        _req: &fuser::Request,
59        _config: &mut fuser::KernelConfig,
60    ) -> Result<(), libc::c_int> {
61        Ok(())
62    }
63    fn destroy(&mut self) {
64        debug!("Cleaning up filesystem");
65        let mut inner = self.inner.clone();
66        self.handle.spawn(async move {
67            if let Err(e) = inner.destroy().await {
68                error!("Error while destroying filesystem: {}", e);
69            }
70        });
71    }
72    fn open(&mut self, _req: &fuser::Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) {
73        debug!(ino, "Opening file handle for inode");
74
75        let inner = self.inner.clone();
76        self.handle.spawn(async move {
77            match inner.open(ino, flags).await {
78                Ok(fh) => {
79                    reply.opened(fh, fuser::consts::FOPEN_KEEP_CACHE);
80                }
81                Err(e) => {
82                    warn!(ino, "Error when opening file: {}", e);
83                    let e: Error = e.into();
84                    reply.error((&e).into());
85                }
86            }
87        });
88    }
89    fn release(
90        &mut self,
91        _req: &fuser::Request<'_>,
92        ino: u64,
93        fh: u64,
94        _flags: i32,
95        _lock_owner: Option<u64>,
96        _flush: bool,
97        reply: fuser::ReplyEmpty,
98    ) {
99        debug!(ino, fh, "Releasing file handle");
100
101        let inner = self.inner.clone();
102        self.handle.spawn(async move {
103            match inner.clone().release(ino, fh).await {
104                Ok(_) => {
105                    reply.ok();
106                }
107                Err(e) => {
108                    warn!(ino, fh, "Error when closing file: {}", e);
109                    let e: Error = e.into();
110                    reply.error((&e).into());
111                }
112            }
113        });
114    }
115    fn lookup(
116        &mut self,
117        _req: &fuser::Request,
118        parent: u64,
119        name: &std::ffi::OsStr,
120        reply: fuser::ReplyEntry,
121    ) {
122        debug!(parent, ?name, "Looking up directory entry");
123        let inner = self.inner.clone();
124        let name = name.to_owned();
125        self.handle.spawn(async move {
126            match inner.lookup(parent, &name).await {
127                Ok(attr) => {
128                    debug!("Sending directory entry response");
129                    reply.entry(&TTL, &attr, 0);
130                }
131                Err(e) => {
132                    debug!(
133                        parent,
134                        "Error when looking up entry {:?} in directory: {}", name, e
135                    );
136                    let e: Error = e.into();
137                    reply.error((&e).into());
138                }
139            }
140        });
141    }
142    fn getattr(&mut self, _req: &fuser::Request, ino: u64, reply: fuser::ReplyAttr) {
143        debug!(ino, "Getting attribute");
144        let inner = self.inner.clone();
145        self.handle.spawn(async move {
146            match inner.getattr(ino).await {
147                Ok(attr) => {
148                    reply.attr(&TTL, &attr);
149                }
150                Err(e) => {
151                    warn!("Error when getting inode attribute: {}", e);
152                    let e: Error = e.into();
153                    reply.error((&e).into());
154                }
155            }
156        });
157    }
158    fn setattr(
159        &mut self,
160        _req: &fuser::Request<'_>,
161        ino: u64,
162        _mode: Option<u32>,
163        _uid: Option<u32>,
164        _gid: Option<u32>,
165        size: Option<u64>,
166        _atime: Option<TimeOrNow>,
167        _mtime: Option<TimeOrNow>,
168        _ctime: Option<SystemTime>,
169        _fh: Option<u64>,
170        _crtime: Option<SystemTime>,
171        _chgtime: Option<SystemTime>,
172        _bkuptime: Option<SystemTime>,
173        _flags: Option<u32>,
174        reply: fuser::ReplyAttr,
175    ) {
176        debug!(ino, "Setting attributes");
177        // TODO: Check for ignored attributes
178        let mut inner = self.inner.clone();
179        self.handle.spawn(async move {
180            match inner.setattr(ino, size).await {
181                Ok(attr) => {
182                    reply.attr(&TTL, &attr);
183                }
184                Err(e) => {
185                    warn!("Error when setting attributes: {}", e);
186                    let e: Error = e.into();
187                    reply.error((&e).into());
188                }
189            };
190        });
191    }
192    fn readdir(
193        &mut self,
194        _req: &fuser::Request,
195        ino: u64,
196        _fh: u64,
197        offset: i64,
198        mut reply: fuser::ReplyDirectory,
199    ) {
200        debug!(ino, "Reading directory");
201        if offset < 0 {
202            reply.error((&Error::InvalidArgument).into());
203            return;
204        }
205        let inner = self.inner.clone();
206        self.handle.spawn(async move {
207            match inner.readdir(ino, offset as u64).await {
208                Ok(it) => {
209                    for (mut i, e) in it.enumerate() {
210                        i += offset as usize;
211                        if reply.add(e.inode, (i + 1) as i64, e.file_type, &e.name) {
212                            break;
213                        }
214                    }
215                    reply.ok();
216                }
217                Err(e) => {
218                    warn!("Error when reading directory: {}", e);
219                    let e: Error = e.into();
220                    reply.error((&e).into());
221                }
222            };
223        });
224    }
225    fn read(
226        &mut self,
227        _req: &fuser::Request,
228        ino: u64,
229        fh: u64,
230        offset: i64,
231        size: u32,
232        _flags: i32,
233        _lock: Option<u64>,
234        reply: fuser::ReplyData,
235    ) {
236        let start = std::time::Instant::now();
237        debug!(ino, fh, "Reading {} bytes from {}", size, offset);
238        let inner = self.inner.clone();
239        self.handle.spawn(async move {
240            if offset < 0 {
241                error!("Negative offsets not supported");
242                reply.error(libc::EINVAL);
243                return;
244            }
245            match inner.read(ino, fh, offset, size).await {
246                Ok(data) => {
247                    debug!(
248                        ino,
249                        fh,
250                        speed_ms_s = data.len() as f64 / 1e6 / start.elapsed().as_secs_f64(),
251                        "Read {} bytes from {}",
252                        data.len(),
253                        offset,
254                    );
255                    reply.data(&data);
256                }
257                Err(e) => {
258                    error!(ino, fh, "Error when reading file: {}", e);
259                    let e: Error = e.into();
260                    reply.error((&e).into());
261                }
262            }
263        });
264    }
265    fn write(
266        &mut self,
267        _req: &fuser::Request,
268        ino: u64,
269        fh: u64,
270        offset: i64,
271        data: &[u8],
272        _write_flags: u32,
273        _flags: i32,
274        _lock_owner: Option<u64>,
275        reply: fuser::ReplyWrite,
276    ) {
277        let start = std::time::Instant::now();
278        debug!(ino, fh, "Writing {} bytes from {}", data.len(), offset);
279        let inner = self.inner.clone();
280        let data = bytes::Bytes::copy_from_slice(data);
281        self.handle.spawn(async move {
282            if offset < 0 {
283                error!("Negative offsets not supported");
284                reply.error(libc::EINVAL);
285                return;
286            }
287            match inner.write(ino, fh, data, offset).await {
288                Ok(written) => {
289                    debug!(
290                        ino,
291                        fh,
292                        speed_ms_s = written as f64 / 1e6 / start.elapsed().as_secs_f64(),
293                        "Wrote {} bytes from {}",
294                        written,
295                        offset,
296                    );
297                    reply.written(written);
298                }
299                Err(e) => {
300                    error!(ino, fh, "Error when writing file: {}", e);
301                    let e: Error = e.into();
302                    reply.error((&e).into());
303                }
304            }
305        });
306    }
307    fn create(
308        &mut self,
309        _req: &fuser::Request,
310        parent: u64,
311        name: &std::ffi::OsStr,
312        mode: u32,
313        umask: u32,
314        flags: i32,
315        reply: fuser::ReplyCreate,
316    ) {
317        debug!("Creating file");
318        let mut inner = self.inner.clone();
319        let name = name.into();
320        self.handle.spawn(async move {
321            match inner.create(parent, name, mode, umask, flags).await {
322                Ok((attr, fh)) => {
323                    reply.created(&TTL, &attr, 0, fh, 0);
324                }
325                Err(e) => {
326                    warn!("Error when creating file: {}", e);
327                    let e: Error = e.into();
328                    reply.error((&e).into());
329                }
330            }
331        });
332    }
333    fn mkdir(
334        &mut self,
335        _req: &fuser::Request,
336        parent: u64,
337        name: &std::ffi::OsStr,
338        _mode: u32,
339        _umask: u32,
340        reply: fuser::ReplyEntry,
341    ) {
342        let mut inner = self.inner.clone();
343        let name = name.into();
344        self.handle.spawn(async move {
345            match inner.mkdir(parent, name).await {
346                Ok(attr) => {
347                    reply.entry(&TTL, &attr, 0);
348                }
349                Err(e) => {
350                    warn!("Error when creating node: {}", e);
351                    let e: Error = e.into();
352                    reply.error((&e).into());
353                }
354            }
355        });
356    }
357}