fuse_backend_rs/transport/fusedev/
linux_session.rs

1// Copyright 2020-2022 Ant Group. All rights reserved.
2//
3// SPDX-License-Identifier: Apache-2.0
4
5//! FUSE session management.
6//!
7//! A FUSE channel is a FUSE request handling context that takes care of handling FUSE requests
8//! sequentially. A FUSE session is a connection from a FUSE mountpoint to a FUSE server daemon.
9//! A FUSE session can have multiple FUSE channels so that FUSE requests are handled in parallel.
10
11use std::fs::{File, OpenOptions};
12use std::ops::Deref;
13use std::os::unix::fs::PermissionsExt;
14use std::os::unix::io::AsRawFd;
15use std::os::unix::net::UnixStream;
16use std::path::{Path, PathBuf};
17use std::sync::{Arc, Mutex};
18
19use crate::transport::fusedev::FuseSessionExt;
20use mio::{Events, Poll, Token, Waker};
21use nix::errno::Errno;
22use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag};
23use nix::mount::{mount, umount2, MntFlags, MsFlags};
24use nix::poll::{poll, PollFd, PollFlags};
25use nix::sys::epoll::{epoll_ctl, EpollEvent, EpollFlags, EpollOp};
26use nix::unistd::{getgid, getuid, read};
27
28use super::{
29    super::pagesize,
30    Error::{IoError, SessionFailure},
31    FuseBuf, FuseDevWriter, Reader, Result, FUSE_HEADER_SIZE, FUSE_KERN_BUF_PAGES,
32};
33
// This value follows the definition used by libfuse.
// Capacity of the `mio::Events` buffer allocated by `FuseChannel::get_request()`.
const POLL_EVENTS_CAPACITY: usize = 1024;

// Path of the FUSE character device that is opened to create or clone a session fd.
const FUSE_DEVICE: &str = "/dev/fuse";
// Base file system type used for mounting; a non-empty subtype is appended as "fuse.<subtype>".
const FUSE_FSTYPE: &str = "fuse";
// Default helper binary used when mounting or unmounting through fusermount.
const FUSERMOUNT_BIN: &str = "fusermount3";

// Poll token of the channel waker: seeing it means the channel was asked to exit.
const EXIT_FUSE_EVENT: Token = Token(0);
// Poll token of the fuse device fd: seeing it means a request may be available to read.
const FUSE_DEV_EVENT: Token = Token(1);
43
/// A fuse session manager to manage the connection with the in kernel fuse driver.
///
/// A session is created unmounted; `mount()` establishes the connection with the kernel and
/// `umount()` (also invoked when the session is dropped) tears it down.
pub struct FuseSession {
    // Canonicalized mountpoint directory.
    mountpoint: PathBuf,
    // File system name, passed as the mount source.
    fsname: String,
    // Subtype appended to the "fuse" file system type; an empty string means no subtype.
    subtype: String,
    // Fuse device file; `None` until `mount()` succeeds or `set_fuse_file()` is called.
    file: Option<File>,
    // Socket to keep alive / drop for fusermount's auto_unmount.
    keep_alive: Option<UnixStream>,
    // Size in bytes of the request buffer allocated for every channel.
    bufsize: usize,
    // Mount the file system read-only when set.
    readonly: bool,
    // Wakers of all channels created by `new_channel()`, signalled by `wake()`.
    wakers: Mutex<Vec<Arc<Waker>>>,
    // Mount through fusermount with the `auto_unmount` option when set.
    auto_unmount: bool,
    // Add the `allow_other` mount option when set.
    allow_other: bool,
    // Pid whose mount namespace the file system should be mounted in, if any.
    target_mntns: Option<libc::pid_t>,
    // fusermount binary, default to fusermount3
    fusermount: String,
}
61
62impl FuseSession {
63    /// Create a new fuse session, without mounting/connecting to the in kernel fuse driver.
64    pub fn new(
65        mountpoint: &Path,
66        fsname: &str,
67        subtype: &str,
68        readonly: bool,
69    ) -> Result<FuseSession> {
70        FuseSession::new_with_autounmount(mountpoint, fsname, subtype, readonly, false)
71    }
72
73    /// Create a new fuse session, without mounting/connecting to the in kernel fuse driver.
74    pub fn new_with_autounmount(
75        mountpoint: &Path,
76        fsname: &str,
77        subtype: &str,
78        readonly: bool,
79        auto_unmount: bool,
80    ) -> Result<FuseSession> {
81        let dest = mountpoint
82            .canonicalize()
83            .map_err(|_| SessionFailure(format!("invalid mountpoint {mountpoint:?}")))?;
84        if !dest.is_dir() {
85            return Err(SessionFailure(format!("{dest:?} is not a directory")));
86        }
87
88        Ok(FuseSession {
89            mountpoint: dest,
90            fsname: fsname.to_owned(),
91            subtype: subtype.to_owned(),
92            file: None,
93            keep_alive: None,
94            bufsize: FUSE_KERN_BUF_PAGES * pagesize() + FUSE_HEADER_SIZE,
95            readonly,
96            wakers: Mutex::new(Vec::new()),
97            auto_unmount,
98            target_mntns: None,
99            fusermount: FUSERMOUNT_BIN.to_string(),
100            allow_other: true,
101        })
102    }
103
104    /// Set the target pid of mount namespace of the fuse session mount, the fuse will be mounted
105    /// under the given mnt ns.
106    pub fn set_target_mntns(&mut self, pid: Option<libc::pid_t>) {
107        self.target_mntns = pid;
108    }
109
110    /// Set fusermount binary, default to fusermount3.
111    pub fn set_fusermount(&mut self, bin: &str) {
112        self.fusermount = bin.to_string();
113    }
114
115    /// Set the allow_other mount option. This allows other users than the one mounting the
116    /// filesystem to access the filesystem. However, this option is usually restricted to the root
117    /// user unless configured otherwise.
118    pub fn set_allow_other(&mut self, allow_other: bool) {
119        self.allow_other = allow_other;
120    }
121
122    /// Get current fusermount binary.
123    pub fn get_fusermount(&self) -> &str {
124        self.fusermount.as_str()
125    }
126
127    /// Expose the associated FUSE session file.
128    pub fn get_fuse_file(&self) -> Option<&File> {
129        self.file.as_ref()
130    }
131
132    /// Force setting the associated FUSE session file.
133    pub fn set_fuse_file(&mut self, file: File) {
134        self.file = Some(file);
135    }
136
137    /// Clone fuse file using ioctl FUSE_DEV_IOC_CLONE.
138    pub fn clone_fuse_file(&self) -> Result<File> {
139        let mut old_fd = self
140            .file
141            .as_ref()
142            .ok_or(SessionFailure(
143                "fuse session file doesn't exist".to_string(),
144            ))?
145            .as_raw_fd();
146
147        let cloned_file = OpenOptions::new()
148            .create(false)
149            .read(true)
150            .write(true)
151            .open(FUSE_DEVICE)
152            .map_err(|e| SessionFailure(format!("open {FUSE_DEVICE}: {e}")))?;
153
154        // define the function which invokes "ioctl FUSE_DEV_IOC_CLONE"
155        // refer: https://github.com/torvalds/linux/blob/c42d9eeef8e5ba9292eda36fd8e3c11f35ee065c/include/uapi/linux/fuse.h#L1051-L1052
156        // #define FUSE_DEV_IOC_MAGIC   229
157        // #define FUSE_DEV_IOC_CLONE   _IOR(FUSE_DEV_IOC_MAGIC, 0, uint32_t)
158        nix::ioctl_read!(clone_fuse_fd, 229, 0, i32);
159
160        unsafe { clone_fuse_fd(cloned_file.as_raw_fd(), (&mut old_fd) as *mut i32) }
161            .map_err(|e| SessionFailure(format!("failed to clone fuse file: {:?}", e)))?;
162
163        Ok(cloned_file)
164    }
165
166    /// Get the mountpoint of the session.
167    pub fn mountpoint(&self) -> &Path {
168        &self.mountpoint
169    }
170
171    /// Get the file system name of the session.
172    pub fn fsname(&self) -> &str {
173        &self.fsname
174    }
175
176    /// Get the subtype of the session.
177    pub fn subtype(&self) -> &str {
178        &self.subtype
179    }
180
181    /// Get the default buffer size of the session.
182    pub fn bufsize(&self) -> usize {
183        self.bufsize
184    }
185
186    /// Mount the fuse mountpoint, building connection with the in kernel fuse driver.
187    pub fn mount(&mut self) -> Result<()> {
188        let mut flags = MsFlags::MS_NOSUID | MsFlags::MS_NODEV | MsFlags::MS_NOATIME;
189        if self.readonly {
190            flags |= MsFlags::MS_RDONLY;
191        }
192        let (file, socket) = fuse_kern_mount(
193            &self.mountpoint,
194            &self.fsname,
195            &self.subtype,
196            flags,
197            self.auto_unmount,
198            self.allow_other,
199            self.target_mntns,
200            &self.fusermount,
201        )?;
202
203        fcntl(file.as_raw_fd(), FcntlArg::F_SETFL(OFlag::O_NONBLOCK))
204            .map_err(|e| SessionFailure(format!("set fd nonblocking: {e}")))?;
205        self.file = Some(file);
206        self.keep_alive = socket;
207
208        Ok(())
209    }
210
211    /// Destroy a fuse session.
212    pub fn umount(&mut self) -> Result<()> {
213        // If we have a keep_alive socket, just drop it,
214        // and let fusermount do the unmount.
215        if let (None, Some(file)) = (self.keep_alive.take(), self.file.take()) {
216            if let Some(mountpoint) = self.mountpoint.to_str() {
217                fuse_kern_umount(mountpoint, file, self.fusermount.as_str())
218            } else {
219                Err(SessionFailure("invalid mountpoint".to_string()))
220            }
221        } else {
222            Ok(())
223        }
224    }
225
226    /// Create a new fuse message channel.
227    pub fn new_channel(&self) -> Result<FuseChannel> {
228        if let Some(file) = &self.file {
229            let file = file
230                .try_clone()
231                .map_err(|e| SessionFailure(format!("dup fd: {e}")))?;
232            let channel = FuseChannel::new(file, self.bufsize)?;
233            let waker = channel.get_waker();
234            self.add_waker(waker)?;
235
236            Ok(channel)
237        } else {
238            Err(SessionFailure("invalid fuse session".to_string()))
239        }
240    }
241
242    /// Wake channel loop and exit
243    pub fn wake(&self) -> Result<()> {
244        let wakers = self
245            .wakers
246            .lock()
247            .map_err(|e| SessionFailure(format!("lock wakers: {e}")))?;
248        for waker in wakers.iter() {
249            waker
250                .wake()
251                .map_err(|e| SessionFailure(format!("wake channel: {e}")))?;
252        }
253        Ok(())
254    }
255
256    fn add_waker(&self, waker: Arc<Waker>) -> Result<()> {
257        let mut wakers = self
258            .wakers
259            .lock()
260            .map_err(|e| SessionFailure(format!("lock wakers: {e}")))?;
261        wakers.push(waker);
262        Ok(())
263    }
264}
265
266impl Drop for FuseSession {
267    fn drop(&mut self) {
268        let _ = self.umount();
269    }
270}
271
272impl FuseSessionExt for FuseSession {
273    fn file(&self) -> Option<&File> {
274        self.file.as_ref()
275    }
276
277    fn bufsize(&self) -> usize {
278        self.bufsize
279    }
280}
281
/// A fuse channel abstraction.
///
/// Each session can hold multiple channels.
pub struct FuseChannel {
    // Fuse device file the channel reads requests from.
    file: File,
    // Poll instance watching both the waker and the fuse device fd.
    poll: Poll,
    // Waker registered with `poll`; shared with the session so it can request an exit.
    waker: Arc<Waker>,
    // Buffer that receives each request; it is also handed out as the reply buffer.
    buf: Vec<u8>,
}
291
impl FuseChannel {
    /// Create a channel reading from `file` with a request buffer of `bufsize` bytes.
    ///
    /// A poll instance is created with a waker registered under `EXIT_FUSE_EVENT` and the
    /// fuse device fd registered under `FUSE_DEV_EVENT`.
    ///
    /// # Errors
    /// Returns `SessionFailure` if the poll instance, the waker or the fd registration fails.
    fn new(file: File, bufsize: usize) -> Result<Self> {
        let poll = Poll::new().map_err(|e| SessionFailure(format!("epoll create: {e}")))?;
        let waker = Waker::new(poll.registry(), EXIT_FUSE_EVENT)
            .map_err(|e| SessionFailure(format!("epoll register session fd: {e}")))?;
        let waker = Arc::new(waker);

        // mio default add EPOLLET to event flags, so epoll will use edge-triggered mode.
        // It may let poll miss some event, so manually register the fd with only EPOLLIN flag
        // to use level-triggered mode.
        let epoll = poll.as_raw_fd();
        let mut event = EpollEvent::new(EpollFlags::EPOLLIN, usize::from(FUSE_DEV_EVENT) as u64);
        epoll_ctl(
            epoll,
            EpollOp::EpollCtlAdd,
            file.as_raw_fd(),
            Some(&mut event),
        )
        .map_err(|e| SessionFailure(format!("epoll register channel fd: {e}")))?;

        Ok(FuseChannel {
            file,
            poll,
            waker,
            buf: vec![0x0u8; bufsize],
        })
    }

    /// Return a shared handle to the channel's waker.
    fn get_waker(&self) -> Arc<Waker> {
        self.waker.clone()
    }

    /// Get next available FUSE request from the underlying fuse device file.
    ///
    /// Blocks in the poll loop until a request can be read, an exit is requested, or an
    /// error occurs.
    ///
    /// Returns:
    /// - Ok(None): an exit was signalled through the waker, or reading the fuse device
    ///   reported ENODEV (file system unmounted)
    /// - Ok(Some((reader, writer))): reader to receive request and writer to send reply
    /// - Err(e): error message
    ///
    /// # Panics
    /// Panics if the poll instance reports an event that is neither readable nor an error.
    pub fn get_request(&mut self) -> Result<Option<(Reader<'_>, FuseDevWriter<'_>)>> {
        let mut events = Events::with_capacity(POLL_EVENTS_CAPACITY);
        // Sticky across loop iterations: once an exit is seen it is never cleared.
        let mut need_exit = false;
        loop {
            let mut fusereq_available = false;
            // Wait without timeout; a poll interrupted by a signal is simply retried.
            match self.poll.poll(&mut events, None) {
                Ok(_) => {}
                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(SessionFailure(format!("epoll wait: {e}"))),
            }

            // Classify every reported event before acting on any of them.
            for event in events.iter() {
                if event.is_readable() {
                    match event.token() {
                        EXIT_FUSE_EVENT => need_exit = true,
                        FUSE_DEV_EVENT => fusereq_available = true,
                        x => {
                            error!("unexpected epoll event");
                            return Err(SessionFailure(format!("unexpected epoll event: {}", x.0)));
                        }
                    }
                } else if event.is_error() {
                    info!("FUSE channel already closed!");
                    return Err(SessionFailure("epoll error".to_string()));
                } else {
                    // We should not step into this branch as other event is not registered.
                    panic!("unknown epoll result events");
                }
            }

            // Handle wake up event first. We don't read the event fd so that a LEVEL triggered
            // event can still be delivered to other threads/daemons.
            if need_exit {
                info!("Will exit from fuse service");
                return Ok(None);
            }
            if fusereq_available {
                let fd = self.file.as_raw_fd();
                match read(fd, &mut self.buf) {
                    Ok(len) => {
                        // ###############################################
                        // Note: it's a heavy hack to reuse the same underlying data
                        // buffer for both Reader and Writer, in order to reduce memory
                        // consumption. Here we assume Reader won't be used anymore once
                        // we start to write to the Writer. To get rid of this hack,
                        // just allocate a dedicated data buffer for Writer.
                        //
                        // NOTE(review): this creates a second mutable slice aliasing
                        // `self.buf`; it relies entirely on the usage assumption above.
                        let buf = unsafe {
                            std::slice::from_raw_parts_mut(self.buf.as_mut_ptr(), self.buf.len())
                        };
                        // Reader::new() and Writer::new() should always return success.
                        // The reader only sees the `len` bytes just read; the writer gets
                        // the whole buffer.
                        let reader =
                            Reader::from_fuse_buffer(FuseBuf::new(&mut self.buf[..len])).unwrap();
                        let writer = FuseDevWriter::new(fd, buf).unwrap();
                        return Ok(Some((reader, writer)));
                    }
                    // The first three errors restart the loop, i.e. poll again before reading.
                    Err(e) => match e {
                        Errno::ENOENT => {
                            // ENOENT means the operation was interrupted, it's safe to restart
                            trace!("restart reading due to ENOENT");
                            continue;
                        }
                        Errno::EAGAIN => {
                            trace!("restart reading due to EAGAIN");
                            continue;
                        }
                        Errno::EINTR => {
                            trace!("syscall interrupted");
                            continue;
                        }
                        Errno::ENODEV => {
                            info!("fuse filesystem umounted");
                            return Ok(None);
                        }
                        e => {
                            warn! {"read fuse dev failed on fd {}: {}", fd, e};
                            return Err(SessionFailure(format!("read new request: {e:?}")));
                        }
                    },
                }
            }
        }
    }
}
413
414/// Mount a fuse file system
415#[allow(clippy::too_many_arguments)]
416fn fuse_kern_mount(
417    mountpoint: &Path,
418    fsname: &str,
419    subtype: &str,
420    flags: MsFlags,
421    auto_unmount: bool,
422    allow_other: bool,
423    target_mntns: Option<libc::pid_t>,
424    fusermount: &str,
425) -> Result<(File, Option<UnixStream>)> {
426    let file = OpenOptions::new()
427        .create(false)
428        .read(true)
429        .write(true)
430        .open(FUSE_DEVICE)
431        .map_err(|e| SessionFailure(format!("open {FUSE_DEVICE}: {e}")))?;
432    let meta = mountpoint
433        .metadata()
434        .map_err(|e| SessionFailure(format!("stat {mountpoint:?}: {e}")))?;
435    // the current implementation of fuse-backend-rs uses a fixed buffer to store the fuse response,
436    // the default value of this buffer is as follows, but in fact, the kernel in the direct io path,
437    // the size of the request may be larger than the length of this buffer (this is determined by
438    // the max_read option to determine the maximum size of kernel requests, the default value is
439    // a very large number), which leads to the buffer is not enough to fill the read content,
440    // resulting in read failure. so here we limit the size of max_read to the length of our buffer,
441    // so that the fuse kernel will not send requests that exceed the length of the buffer.
442    // in virtiofs scene max_read can't be adjusted, his default is UINT_MAX, but we don't have to
443    // worry about it, because the buffer is allocated by the kernel driver, we just use this buffer
444    // to fill the response, so we don't need to do any adjustment.
445    let max_read = FUSE_KERN_BUF_PAGES * pagesize() + FUSE_HEADER_SIZE;
446
447    let mut opts = format!(
448        "default_permissions,fd={},rootmode={:o},user_id={},group_id={},max_read={}",
449        file.as_raw_fd(),
450        meta.permissions().mode() & libc::S_IFMT,
451        getuid(),
452        getgid(),
453        max_read
454    );
455    if allow_other {
456        opts.push_str(",allow_other");
457    }
458    let mut fstype = String::from(FUSE_FSTYPE);
459    if !subtype.is_empty() {
460        fstype.push('.');
461        fstype.push_str(subtype);
462    }
463
464    if let Some(mountpoint) = mountpoint.to_str() {
465        info!(
466            "mount source {} dest {} with fstype {} opts {} fd {}",
467            fsname,
468            mountpoint,
469            fstype,
470            opts,
471            file.as_raw_fd(),
472        );
473    }
474
475    // mount in another mntns requires mounting with fusermount, which is a new process, as
476    // multithreaded program is not allowed to join to another mntns, and the process running fuse
477    // session might be multithreaded.
478    if auto_unmount || target_mntns.is_some() {
479        fuse_fusermount_mount(
480            mountpoint,
481            fsname,
482            subtype,
483            opts,
484            flags,
485            auto_unmount,
486            target_mntns,
487            fusermount,
488        )
489    } else {
490        match mount(
491            Some(fsname),
492            mountpoint,
493            Some(fstype.deref()),
494            flags,
495            Some(opts.deref()),
496        ) {
497            Ok(()) => Ok((file, None)),
498            Err(Errno::EPERM) => fuse_fusermount_mount(
499                mountpoint,
500                fsname,
501                subtype,
502                opts,
503                flags,
504                auto_unmount,
505                target_mntns,
506                fusermount,
507            ),
508            Err(e) => Err(SessionFailure(format!(
509                "failed to mount {mountpoint:?}: {e}"
510            ))),
511        }
512    }
513}
514
515fn msflags_to_string(flags: MsFlags) -> String {
516    [
517        (MsFlags::MS_RDONLY, ("rw", "ro")),
518        (MsFlags::MS_NOSUID, ("suid", "nosuid")),
519        (MsFlags::MS_NODEV, ("dev", "nodev")),
520        (MsFlags::MS_NOEXEC, ("exec", "noexec")),
521        (MsFlags::MS_SYNCHRONOUS, ("async", "sync")),
522        (MsFlags::MS_NOATIME, ("atime", "noatime")),
523    ]
524    .map(
525        |(flag, (neg, pos))| {
526            if flags.contains(flag) {
527                pos
528            } else {
529                neg
530            }
531        },
532    )
533    .join(",")
534}
535
536/// Mount a fuse file system with fusermount
537#[allow(clippy::too_many_arguments)]
538fn fuse_fusermount_mount(
539    mountpoint: &Path,
540    fsname: &str,
541    subtype: &str,
542    opts: String,
543    flags: MsFlags,
544    auto_unmount: bool,
545    target_mntns: Option<libc::pid_t>,
546    fusermount: &str,
547) -> Result<(File, Option<UnixStream>)> {
548    let mut opts = vec![format!("fsname={fsname}"), opts, msflags_to_string(flags)];
549    if !subtype.is_empty() {
550        opts.push(format!("subtype={subtype}"));
551    }
552    if auto_unmount {
553        opts.push("auto_unmount".to_owned());
554    }
555    let opts = opts.join(",");
556
557    let (send, recv) = UnixStream::pair().unwrap();
558
559    // Keep the sending socket around after exec to pass to fusermount.
560    // When its partner recv closes, fusermount will unmount.
561    // Remove the close-on-exec flag from the socket, so we can pass it to
562    // fusermount.
563    fcntl(send.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::empty()))
564        .map_err(|e| SessionFailure(format!("Failed to remove close-on-exec flag: {e}")))?;
565
566    let mut cmd = match target_mntns {
567        Some(pid) => {
568            let mut c = std::process::Command::new("nsenter");
569            c.arg("-t")
570                .arg(format!("{}", pid))
571                .arg("-m")
572                .arg(fusermount);
573            c
574        }
575        None => std::process::Command::new(fusermount),
576    };
577    // Old version of fusermount doesn't support long --options, yet.
578    let mut proc = cmd
579        .env("_FUSE_COMMFD", format!("{}", send.as_raw_fd()))
580        .arg("-o")
581        .arg(opts)
582        .arg("--")
583        .arg(mountpoint)
584        .spawn()
585        .map_err(IoError)?;
586
587    if auto_unmount {
588        std::thread::spawn(move || {
589            let _ = proc.wait();
590        });
591    } else {
592        match proc.wait().map_err(IoError)?.code() {
593            Some(0) => {}
594            exit_code => {
595                return Err(SessionFailure(format!(
596                    "Unexpected exit code when running fusermount: {exit_code:?}"
597                )))
598            }
599        }
600    }
601    drop(send);
602
603    match vmm_sys_util::sock_ctrl_msg::ScmSocket::recv_with_fd(&recv, &mut [0u8; 8]).map_err(
604        |e| {
605            SessionFailure(format!(
606                "Unexpected error when receiving fuse file descriptor from fusermount: {}",
607                e
608            ))
609        },
610    )? {
611        (_recv_bytes, Some(file)) => Ok((file, if auto_unmount { Some(recv) } else { None })),
612        (recv_bytes, None) => Err(SessionFailure(format!(
613            "fusermount did not send a file descriptor.  We received {recv_bytes} bytes."
614        ))),
615    }
616}
617
618/// Umount a fuse file system
619fn fuse_kern_umount(mountpoint: &str, file: File, fusermount: &str) -> Result<()> {
620    let mut fds = [PollFd::new(file.as_raw_fd(), PollFlags::empty())];
621
622    if poll(&mut fds, 0).is_ok() {
623        // POLLERR means the file system is already umounted,
624        // or the connection has been aborted via /sys/fs/fuse/connections/NNN/abort
625        if let Some(event) = fds[0].revents() {
626            if event == PollFlags::POLLERR {
627                return Ok(());
628            }
629        }
630    }
631
632    // Drop to close fuse session fd, otherwise synchronous umount can recurse into filesystem and
633    // cause deadlock.
634    drop(file);
635    match umount2(mountpoint, MntFlags::MNT_DETACH) {
636        Ok(()) => Ok(()),
637        Err(Errno::EPERM) => fuse_fusermount_umount(mountpoint, fusermount),
638        Err(e) => Err(SessionFailure(format!(
639            "failed to umount {mountpoint}: {e}"
640        ))),
641    }
642}
643
644/// Umount a fuse file system by fusermount helper
645fn fuse_fusermount_umount(mountpoint: &str, fusermount: &str) -> Result<()> {
646    match std::process::Command::new(fusermount)
647        .arg("--unmount")
648        .arg("--quiet")
649        .arg("--lazy")
650        .arg("--")
651        .arg(mountpoint)
652        .status()
653        .map_err(IoError)?
654        .code()
655    {
656        Some(0) => Ok(()),
657        exit_code => Err(SessionFailure(format!(
658            "Unexpected exit code when unmounting via running fusermount: {exit_code:?}"
659        ))),
660    }
661}
662
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;
    use std::os::unix::io::FromRawFd;
    use std::path::Path;
    use vmm_sys_util::tempdir::TempDir;

    /// A session cannot be created on a missing mountpoint but can on an existing directory.
    #[test]
    fn test_new_session() {
        assert!(FuseSession::new(Path::new("haha"), "foo", "bar", true).is_err());

        let tmp = TempDir::new().unwrap();
        assert!(FuseSession::new(tmp.as_path(), "foo", "bar", false).is_ok());
    }

    /// A channel can be built on top of an arbitrary pollable fd.
    #[test]
    fn test_new_channel() {
        let dup_fd = nix::unistd::dup(std::io::stdout().as_raw_fd()).unwrap();
        // SAFETY: `dup_fd` was just returned by dup(2) and is owned exclusively by `file`.
        let file = unsafe { File::from_raw_fd(dup_fd) };
        drop(FuseChannel::new(file, 3).unwrap());
    }

    /// The fusermount binary defaults to `FUSERMOUNT_BIN` and can be overridden.
    #[test]
    fn test_fusermount() {
        let tmp = TempDir::new().unwrap();
        let created = FuseSession::new(tmp.as_path(), "foo", "bar", true);
        assert!(created.is_ok());
        let mut session = created.unwrap();
        assert_eq!(session.get_fusermount(), FUSERMOUNT_BIN);

        session.set_fusermount("fusermount");
        assert_eq!(session.get_fusermount(), "fusermount");
    }

    /// A mounted session can clone its fuse file, and can be mounted again after unmounting.
    #[test]
    fn test_clone_fuse_file() {
        let tmp = TempDir::new().unwrap();
        let mut session = FuseSession::new(tmp.as_path(), "foo", "bar", true).unwrap();
        session.mount().unwrap();

        let cloned = session.clone_fuse_file().unwrap();
        assert!(cloned.as_raw_fd() > 0);

        session.umount().unwrap();
        session.set_fuse_file(cloned);
        session.mount().unwrap();
    }
}
714
715#[cfg(feature = "async_io")]
716pub use asyncio::FuseDevTask;
717
#[cfg(feature = "async_io")]
/// Task context to handle fuse request in asynchronous mode.
// NOTE(review): `AsyncExecutorState`, `AsyncDriver` and `AsyncUtil` are used below but are not
// brought into scope by the `use` lines visible in this module — confirm it still builds with
// the `async_io` feature enabled.
mod asyncio {
    use std::os::unix::io::RawFd;
    use std::sync::Arc;

    use crate::api::filesystem::AsyncFileSystem;
    use crate::api::server::Server;
    use crate::transport::{FuseBuf, Reader, Writer};

    /// Task context to handle fuse request in asynchronous mode.
    ///
    /// This structure provides a context to handle fuse request in asynchronous mode, including
    /// the fuse fd, a internal buffer and a `Server` instance to serve requests.
    ///
    /// ## Examples
    /// ```ignore
    /// let buf_size = 0x1_0000;
    /// let state = AsyncExecutorState::new();
    /// let mut task = FuseDevTask::new(buf_size, fuse_dev_fd, fs_server, state.clone());
    ///
    /// // Run the task
    /// executor.spawn(async move { task.poll_handler().await });
    ///
    /// // Stop the task
    /// state.quiesce();
    /// ```
    pub struct FuseDevTask<F: AsyncFileSystem + Sync> {
        // Raw fuse device fd; it is not owned, so it is never closed by the task.
        fd: RawFd,
        // Buffer used to receive each request and to build its reply.
        buf: Vec<u8>,
        // Shared state polled to learn when the task should stop.
        state: AsyncExecutorState,
        // Server that handles the requests read from `fd`.
        server: Arc<Server<F>>,
    }

    impl<F: AsyncFileSystem + Sync> FuseDevTask<F> {
        /// Create a new fuse task context for asynchronous IO.
        ///
        /// # Parameters
        /// - buf_size: size of buffer to receive requests from/send reply to the fuse fd
        /// - fd: fuse device file descriptor
        /// - server: `Server` instance to serve requests from the fuse fd
        /// - state: shared state object to control the task object
        ///
        /// # Safety
        /// The caller must ensure `fd` is valid during the lifetime of the returned task object.
        // NOTE(review): this function is not declared `unsafe`, so the requirement above is not
        // enforced by the compiler.
        pub fn new(
            buf_size: usize,
            fd: RawFd,
            server: Arc<Server<F>>,
            state: AsyncExecutorState,
        ) -> Self {
            FuseDevTask {
                fd,
                server,
                state,
                buf: vec![0x0u8; buf_size],
            }
        }

        /// Handler to process fuse requests in asynchronous mode.
        ///
        /// An async fn to handle requests from the fuse fd. It works in asynchronous IO mode when:
        /// - receiving request from fuse fd
        /// - handling requests by calling Server::async_handle_requests()
        /// - sending reply to fuse fd
        ///
        /// The async fn repeatedly return Poll::Pending when polled until the state has been set
        /// to quiesce mode.
        ///
        /// Failures to read or to handle a request are only logged; the loop then continues.
        pub async fn poll_handler(&mut self) {
            // TODO: register self.buf as io uring buffers.
            let drive = AsyncDriver::default();

            // Keep serving requests until the shared state asks the task to stop.
            while !self.state.quiescing() {
                let result = AsyncUtil::read(drive.clone(), self.fd, &mut self.buf, 0).await;
                match result {
                    Ok(len) => {
                        // ###############################################
                        // Note: it's a heavy hack to reuse the same underlying data
                        // buffer for both Reader and Writer, in order to reduce memory
                        // consumption. Here we assume Reader won't be used anymore once
                        // we start to write to the Writer. To get rid of this hack,
                        // just allocate a dedicated data buffer for Writer.
                        //
                        // NOTE(review): this creates a second mutable slice aliasing
                        // `self.buf`; it relies entirely on the usage assumption above.
                        let buf = unsafe {
                            std::slice::from_raw_parts_mut(self.buf.as_mut_ptr(), self.buf.len())
                        };
                        // Reader::new() and Writer::new() should always return success.
                        // The reader only sees the `len` bytes just read; the writer gets
                        // the whole buffer.
                        let reader =
                            Reader::<()>::new(FuseBuf::new(&mut self.buf[0..len])).unwrap();
                        let writer = Writer::new(self.fd, buf).unwrap();
                        let result = unsafe {
                            self.server
                                .async_handle_message(drive.clone(), reader, writer, None, None)
                                .await
                        };

                        if let Err(e) = result {
                            // TODO: error handling
                            error!("failed to handle fuse request, {}", e);
                        }
                    }
                    Err(e) => {
                        // TODO: error handling
                        error!("failed to read request from fuse device fd, {}", e);
                    }
                }
            }

            // TODO: unregister self.buf as io uring buffers.

            // Report that the task has been quiesced.
            self.state.report();
        }
    }

    impl<F: AsyncFileSystem + Sync> Clone for FuseDevTask<F> {
        /// Create another task on the same fd, server and state, with its own zeroed buffer.
        fn clone(&self) -> Self {
            FuseDevTask {
                fd: self.fd,
                server: self.server.clone(),
                state: self.state.clone(),
                // NOTE(review): the new buffer is sized from `capacity()`, not `len()`, so it
                // may be longer than the original buffer — confirm this is intended.
                buf: vec![0x0u8; self.buf.capacity()],
            }
        }
    }

    #[cfg(test)]
    mod tests {
        use std::os::unix::io::AsRawFd;

        use super::*;
        use crate::api::{Vfs, VfsOptions};
        use crate::async_util::{AsyncDriver, AsyncExecutor};

        // Only drives an executor with no spawned tasks: the task creation below is commented
        // out, so `FuseDevTask` itself is not exercised by this test.
        #[test]
        fn test_fuse_task() {
            let state = AsyncExecutorState::new();
            let fs = Vfs::<AsyncDriver, ()>::new(VfsOptions::default());
            let _server = Arc::new(Server::<Vfs<AsyncDriver, ()>, AsyncDriver, ()>::new(fs));
            let file = vmm_sys_util::tempfile::TempFile::new().unwrap();
            let _fd = file.as_file().as_raw_fd();

            let mut executor = AsyncExecutor::new(32);
            executor.setup().unwrap();

            /*
            // Create three tasks, which could handle three concurrent fuse requests.
            let mut task = FuseDevTask::new(0x1000, fd, server.clone(), state.clone());
            executor
                .spawn(async move { task.poll_handler().await })
                .unwrap();
            let mut task = FuseDevTask::new(0x1000, fd, server.clone(), state.clone());
            executor
                .spawn(async move { task.poll_handler().await })
                .unwrap();
            let mut task = FuseDevTask::new(0x1000, fd, server.clone(), state.clone());
            executor
                .spawn(async move { task.poll_handler().await })
                .unwrap();
             */

            for _i in 0..10 {
                executor.run_once(false).unwrap();
            }

            // Set existing flag
            state.quiesce();
            // Close the fusedev fd, so all pending async io requests will be aborted.
            drop(file);

            for _i in 0..10 {
                executor.run_once(false).unwrap();
            }
        }
    }
}