Skip to main content

async_fuser/
session.rs

1//! Filesystem session
2//!
3//! A session runs a filesystem implementation while it is being mounted to a specific mount
4//! point. A session begins by mounting the filesystem and ends by unmounting it. While the
5//! filesystem is mounted, the session loop receives, dispatches and replies to kernel requests
6//! for filesystem operations under its mount point.
7
8use std::borrow::Cow;
9use std::fs::File;
10use std::io;
11use std::os::fd::AsFd;
12use std::os::fd::BorrowedFd;
13use std::os::fd::OwnedFd;
14use std::path::Path;
15use std::sync::Arc;
16use std::thread::JoinHandle;
17use std::thread::{self};
18
19use log::debug;
20use log::error;
21use log::info;
22use log::warn;
23use nix::unistd::Uid;
24use nix::unistd::geteuid;
25use parking_lot::Mutex;
26
27use crate::Errno;
28use crate::Filesystem;
29use crate::KernelConfig;
30use crate::MountOption;
31use crate::ReplyEmpty;
32use crate::Request;
33use crate::channel::Channel;
34use crate::channel::ChannelSender;
35use crate::dev_fuse::DevFuse;
36use crate::ll;
37use crate::ll::Operation;
38use crate::ll::ResponseErrno;
39use crate::ll::Version;
40use crate::ll::flags::init_flags::InitFlags;
41use crate::ll::fuse_abi as abi;
42use crate::mnt::Mount;
43use crate::mnt::mount_options::Config;
44use crate::mnt::mount_options::check_option_conflicts;
45use crate::notify::Notifier;
46use crate::read_buf::FuseReadBuf;
47use crate::reply::Reply;
48use crate::reply::ReplyRaw;
49use crate::reply::ReplySender;
50use crate::request::RequestWithSender;
51
/// The max size of write requests from the kernel (16 MiB as defined here).
///
/// The absolute minimum is 4k; FUSE recommends at least 128k and at most 16M.
/// The FUSE default is 16M on macOS and 128k on other systems.
pub(crate) const MAX_WRITE_SIZE: usize = 16 * 1024 * 1024;
56
/// Access-control policy deciding which calling UIDs may issue requests.
#[derive(Default, Debug, Eq, PartialEq, Clone, Copy)]
pub enum SessionACL {
    /// Requests from every user are accepted. Corresponds to the `allow_other` mount option.
    All,
    /// Requests from root and from the owning UID are accepted. Corresponds to the
    /// `allow_root` mount option.
    RootAndOwner,
    /// Only requests from the owning UID are accepted. This is FUSE's default mode of
    /// operation.
    #[default]
    Owner,
}

impl SessionACL {
    /// Mount option string to hand to the kernel/fusermount/libfuse paths, if any.
    ///
    /// `All` and `RootAndOwner` both yield `allow_other`: the kernel only understands
    /// `allow_other`, and the root-only restriction is enforced by fuser itself.
    /// `Owner` needs no option and yields `None`.
    #[allow(dead_code)]
    pub(crate) fn to_mount_option(self) -> Option<&'static str> {
        match self {
            Self::Owner => None,
            Self::All | Self::RootAndOwner => Some("allow_other"),
        }
    }
}
81
/// Owns the filesystem implementation and calls `destroy` on it on drop.
///
/// The filesystem is taken out of `fs` before `destroy` is called, so `destroy`
/// runs at most once per holder.
#[derive(Debug)]
pub(crate) struct FilesystemHolder<FS: Filesystem> {
    /// The wrapped filesystem; `None` once it has been destroyed.
    pub(crate) fs: Option<FS>,
}
87
88impl<FS: Filesystem> FilesystemHolder<FS> {
89    fn destroy(&mut self) {
90        if let Some(mut fs) = self.fs.take() {
91            fs.destroy();
92        }
93    }
94}
95
96impl<FS: Filesystem> Drop for FilesystemHolder<FS> {
97    fn drop(&mut self) {
98        self.destroy();
99    }
100}
101
/// Guard that unmounts the held mount (if any) when dropped.
#[derive(Debug)]
struct UmountOnDrop {
    /// Shared slot for the mount. It is `None` when there is nothing to unmount:
    /// either no mount was ever stored, or it has already been taken out of the slot.
    mount: Arc<Mutex<Option<Mount>>>,
}
106
107impl UmountOnDrop {
108    fn umount(&self) -> io::Result<()> {
109        if let Some(mount) = self.mount.lock().take() {
110            mount.umount()?;
111        }
112        Ok(())
113    }
114}
115
116impl Drop for UmountOnDrop {
117    fn drop(&mut self) {
118        if let Err(e) = self.umount() {
119            warn!("Failed to umount filesystem: {}", e);
120        }
121    }
122}
123
/// The session data structure
///
/// Bundles the filesystem implementation, the channel to the kernel driver, the mount
/// guard and the access-control settings used while serving requests.
#[derive(Debug)]
pub struct Session<FS: Filesystem> {
    /// Filesystem operation implementations. None after `destroy` called.
    pub(crate) filesystem: FilesystemHolder<FS>,
    /// Communication channel to the kernel driver
    pub(crate) ch: Channel,
    /// Handle to the mount.  Dropping this unmounts.
    mount: UmountOnDrop,
    /// Whether to restrict access to owner, root + owner, or unrestricted
    /// Used to implement `allow_root` and `auto_unmount`
    pub(crate) allowed: SessionACL,
    /// User that launched the fuser process (the effective UID read via `geteuid`
    /// when the session is constructed)
    pub(crate) session_owner: Uid,
    /// FUSE protocol version, as reported by the kernel.
    /// The field is set to `Some` when the init message is received.
    pub(crate) proto_version: Option<Version>,
    /// Options the session was created with; `run` reads `n_threads` and `clone_fd`
    /// from it.
    pub(crate) config: Config,
}
143
/// Exposes the file descriptor of the session's kernel channel.
impl<FS: Filesystem> AsFd for Session<FS> {
    /// Borrows the descriptor by delegating to the channel's `as_fd`.
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.ch.as_fd()
    }
}
149
150impl<FS: Filesystem> Session<FS> {
151    /// Create a new session by mounting the given filesystem to the given mountpoint
152    /// # Errors
153    /// Returns an error if the options are incorrect, or if the fuse device can't be mounted.
154    pub fn new<P: AsRef<Path>>(
155        filesystem: FS,
156        mountpoint: P,
157        options: &Config,
158    ) -> io::Result<Session<FS>> {
159        check_option_conflicts(options)?;
160
161        let mountpoint = mountpoint.as_ref();
162        info!("Mounting {}", mountpoint.display());
163        // If AutoUnmount is requested, but not AllowRoot or AllowOther, return an error
164        // because fusermount needs allow_root or allow_other to handle the auto_unmount option
165        if options.mount_options.contains(&MountOption::AutoUnmount)
166            && options.acl == SessionACL::Owner
167        {
168            return Err(io::Error::new(
169                io::ErrorKind::InvalidInput,
170                format!("auto_unmount requires acl != Owner, got: {:?}", options.acl),
171            ));
172        }
173        let (file, mount) = Mount::new(mountpoint, &options.mount_options, options.acl)?;
174
175        let ch = Channel::new(file);
176
177        let mut session = Session {
178            filesystem: FilesystemHolder {
179                fs: Some(filesystem),
180            },
181            ch,
182            mount: UmountOnDrop {
183                mount: Arc::new(Mutex::new(Some(mount))),
184            },
185            allowed: options.acl,
186            session_owner: geteuid(),
187            proto_version: None,
188            config: options.clone(),
189        };
190
191        session.handshake()?;
192
193        Ok(session)
194    }
195
196    /// Wrap an existing /dev/fuse file descriptor. This doesn't mount the
197    /// filesystem anywhere; that must be done separately.
198    pub fn from_fd(
199        filesystem: FS,
200        fd: OwnedFd,
201        acl: SessionACL,
202        config: Config,
203    ) -> io::Result<Self> {
204        let ch = Channel::new(Arc::new(DevFuse(File::from(fd))));
205        let mut session = Session {
206            filesystem: FilesystemHolder {
207                fs: Some(filesystem),
208            },
209            ch,
210            mount: UmountOnDrop {
211                mount: Arc::new(Mutex::new(None)),
212            },
213            allowed: acl,
214            session_owner: geteuid(),
215            proto_version: None,
216            config,
217        };
218
219        session.handshake()?;
220
221        Ok(session)
222    }
223
224    /// Run the session loop in a background thread. If the returned handle is dropped,
225    /// the filesystem is unmounted and the given session ends.
226    pub fn spawn(self) -> io::Result<BackgroundSession> {
227        let sender = self.ch.sender();
228        // Take the fuse_session, so that we can unmount it
229        let mount = std::mem::take(&mut *self.mount.mount.lock());
230        let guard = thread::Builder::new()
231            .name("fuser-bg".to_string())
232            .spawn(move || self.run())?;
233        Ok(BackgroundSession {
234            guard,
235            sender,
236            mount,
237        })
238    }
239
240    /// Run the session loop that receives kernel requests and dispatches them to method
241    /// calls into the filesystem. This read-dispatch-loop is non-concurrent to prevent
242    /// having multiple buffers (which take up much memory), but the filesystem methods
243    /// may run concurrent by spawning threads.
244    /// # Errors
245    /// Returns any final error when the session comes to an end.
246    pub fn run(self) -> io::Result<()> {
247        let Session {
248            filesystem,
249            ch,
250            mount: _do_not_umount_yet,
251            allowed,
252            session_owner,
253            proto_version: _,
254            config,
255        } = self;
256
257        let n_threads = config.n_threads.unwrap_or(1);
258
259        if !cfg!(target_os = "linux") && n_threads != 1 {
260            // TODO: check whether it works on macOS/FreeBSD and enable if it works.
261            return Err(io::Error::other(
262                "n_threads != 1 is only supported on Linux",
263            ));
264        }
265
266        let Some(n_threads_minus_one) = n_threads.checked_sub(1) else {
267            return Err(io::Error::other("n_threads"));
268        };
269
270        let mut filesystem = Arc::new(filesystem);
271
272        let mut channels = Vec::with_capacity(n_threads);
273
274        for _ in 0..n_threads_minus_one {
275            if config.clone_fd {
276                #[cfg(target_os = "linux")]
277                {
278                    channels.push(ch.clone_fd()?);
279                    continue;
280                }
281                #[cfg(not(target_os = "linux"))]
282                {
283                    return Err(io::Error::other("clone_fd is only supported on Linux"));
284                }
285            } else {
286                channels.push(ch.clone());
287            }
288        }
289        channels.push(ch);
290
291        let mut threads = Vec::with_capacity(n_threads);
292
293        for (i, ch) in channels.into_iter().enumerate() {
294            let thread_name = format!("fuser-{i}");
295            let event_loop = SessionEventLoop {
296                thread_name: thread_name.clone(),
297                filesystem: filesystem.clone(),
298                ch,
299                allowed,
300                session_owner,
301            };
302            threads.push(
303                thread::Builder::new()
304                    .name(thread_name)
305                    .spawn(move || event_loop.event_loop())?,
306            );
307        }
308
309        let mut reply: io::Result<()> = Ok(());
310        for thread in threads {
311            let res = match thread.join() {
312                Ok(res) => res,
313                Err(_) => {
314                    return Err(io::Error::other("event loop thread panicked"));
315                }
316            };
317            if let Err(e) = res {
318                if reply.is_ok() {
319                    reply = Err(e);
320                }
321            }
322        }
323
324        let Some(filesystem) = Arc::get_mut(&mut filesystem) else {
325            return Err(io::Error::other(
326                "BUG: must have one refcount for filesystem",
327            ));
328        };
329
330        filesystem.destroy();
331
332        reply
333    }
334
335    fn handshake(&mut self) -> io::Result<()> {
336        let mut buf = FuseReadBuf::new();
337        let buf = buf.as_mut();
338
339        loop {
340            // Read the init request from the kernel
341            let size = match self.ch.receive_retrying(buf) {
342                Ok(size) => size,
343                Err(nix::errno::Errno::ENODEV) => {
344                    return Err(io::Error::new(
345                        io::ErrorKind::NotConnected,
346                        "FUSE device disconnected during handshake",
347                    ));
348                }
349                Err(err) => return Err(err.into()),
350            };
351
352            // Parse the request
353            let request = match ll::AnyRequest::try_from(&buf[..size]) {
354                Ok(request) => request,
355                Err(err) => {
356                    error!("{err}");
357                    return Err(io::Error::new(io::ErrorKind::InvalidData, err.to_string()));
358                }
359            };
360
361            // Extract the init operation
362            let op = match request.operation() {
363                Ok(op) => op,
364                Err(_) => {
365                    return Err(io::Error::new(
366                        io::ErrorKind::InvalidData,
367                        "Failed to parse FUSE operation",
368                    ));
369                }
370            };
371
372            let init = match op {
373                ll::Operation::Init(init) => init,
374                _ => {
375                    error!("Received non-init FUSE operation before init: {}", request);
376                    // Send error response and return error - non-init during handshake is invalid
377                    <ReplyRaw as Reply>::new(
378                        request.unique(),
379                        ReplySender::Channel(self.ch.sender()),
380                    )
381                    .send_ll(&ResponseErrno(ll::Errno::EIO));
382                    return Err(io::Error::new(
383                        io::ErrorKind::InvalidData,
384                        "Received non-init FUSE operation during handshake",
385                    ));
386                }
387            };
388
389            let v = init.version();
390            if v.0 > abi::FUSE_KERNEL_VERSION {
391                // Kernel has a newer major version than we support.
392                // Send our version and wait for a second INIT request with a compatible version.
393                debug!(
394                    "INIT: Kernel version {} > our version {}, sending our version and waiting for next init",
395                    v.0,
396                    abi::FUSE_KERNEL_VERSION
397                );
398                let response = init.reply_version_only();
399                <ReplyRaw as Reply>::new(request.unique(), ReplySender::Channel(self.ch.sender()))
400                    .send_ll(&response);
401                continue;
402            }
403
404            // We don't support ABI versions before 7.6
405            if v < Version(7, 6) {
406                error!("Unsupported FUSE ABI version {v}");
407                <ReplyRaw as Reply>::new(request.unique(), ReplySender::Channel(self.ch.sender()))
408                    .send_ll(&ResponseErrno(ll::Errno::EPROTO));
409                return Err(io::Error::new(
410                    io::ErrorKind::Unsupported,
411                    format!("Unsupported FUSE ABI version {v}"),
412                ));
413            }
414
415            let mut config = KernelConfig::new(init.capabilities(), init.max_readahead(), v);
416
417            // Call filesystem init method and give it a chance to return an error
418            let Some(filesystem) = &mut self.filesystem.fs else {
419                return Err(io::Error::new(
420                    io::ErrorKind::InvalidInput,
421                    "Bug: filesystem must be initialized during handshake",
422                ));
423            };
424            let res = filesystem.init(Request::ref_cast(request.header()), &mut config);
425            if let Err(error) = res {
426                let errno = Errno::from_i32(error.raw_os_error().unwrap_or(0));
427                <ReplyRaw as Reply>::new(request.unique(), ReplySender::Channel(self.ch.sender()))
428                    .send_ll(&ResponseErrno(errno));
429                return Err(error);
430            }
431
432            // Remember the ABI version supported by kernel and mark the session initialized.
433            self.proto_version = Some(v);
434
435            // Log capability status for debugging
436            for bit in 0..64 {
437                let bitflags = InitFlags::from_bits_retain(1 << bit);
438                if bitflags == InitFlags::FUSE_INIT_EXT {
439                    continue;
440                }
441                let bitflag_is_known = InitFlags::all().contains(bitflags);
442                let kernel_supports = init.capabilities().contains(bitflags);
443                let we_requested = config.requested.contains(bitflags);
444                // On macOS, there's a clash between linux and macOS constants,
445                // so we pick macOS ones (last).
446                let name = if let Some((name, _)) = bitflags.iter_names().last() {
447                    Cow::Borrowed(name)
448                } else {
449                    Cow::Owned(format!("(1 << {bit})"))
450                };
451                if we_requested && kernel_supports {
452                    debug!("capability {name} enabled")
453                } else if we_requested {
454                    debug!("capability {name} not supported by kernel")
455                } else if kernel_supports {
456                    debug!("capability {name} not requested by client")
457                } else if bitflag_is_known {
458                    debug!("capability {name} not supported nor requested")
459                }
460            }
461
462            // Reply with our desired version and settings.
463            debug!(
464                "INIT response: ABI {}.{}, flags {:#x}, max readahead {}, max write {}",
465                abi::FUSE_KERNEL_VERSION,
466                abi::FUSE_KERNEL_MINOR_VERSION,
467                init.capabilities() & config.requested,
468                config.max_readahead,
469                config.max_write
470            );
471
472            let response = init.reply(&config);
473            <ReplyRaw as Reply>::new(request.unique(), ReplySender::Channel(self.ch.sender()))
474                .send_ll(&response);
475
476            return Ok(());
477        }
478    }
479
480    /// Unmount the filesystem
481    pub fn unmount(&mut self) -> io::Result<()> {
482        self.mount.umount()
483    }
484
485    /// Returns a thread-safe object that can be used to unmount the Filesystem
486    pub fn unmount_callable(&mut self) -> SessionUnmounter {
487        SessionUnmounter {
488            mount: self.mount.mount.clone(),
489        }
490    }
491
492    /// Returns an object that can be used to send notifications to the kernel
493    pub fn notifier(&self) -> Notifier {
494        Notifier::new(self.ch.sender())
495    }
496}
497
#[derive(Debug)]
/// A thread-safe object that can be used to unmount a Filesystem
///
/// Created by `Session::unmount_callable`; it holds a clone of the session's shared
/// mount slot.
pub struct SessionUnmounter {
    /// Shared slot holding the mount; `None` when there is nothing left to unmount.
    mount: Arc<Mutex<Option<Mount>>>,
}
503
504impl SessionUnmounter {
505    /// Unmount the filesystem
506    pub fn unmount(&mut self) -> io::Result<()> {
507        if let Some(mount) = std::mem::take(&mut *self.mount.lock()) {
508            mount.umount()?;
509        }
510        Ok(())
511    }
512}
513
/// State owned by one event-loop thread: it reads requests from its channel and
/// dispatches them to the filesystem.
pub(crate) struct SessionEventLoop<FS: Filesystem> {
    /// Cache thread name for faster `debug!`.
    pub(crate) thread_name: String,
    /// Channel this loop reads kernel requests from.
    pub(crate) ch: Channel,
    /// Filesystem holder; `Session::run` hands each loop a clone of the same `Arc`.
    pub(crate) filesystem: Arc<FilesystemHolder<FS>>,
    /// Access-control policy copied from the session.
    pub(crate) allowed: SessionACL,
    /// UID of the session owner, copied from the session.
    pub(crate) session_owner: Uid,
}
522
523impl<FS: Filesystem> SessionEventLoop<FS> {
524    fn event_loop(&self) -> io::Result<()> {
525        // Buffer for receiving requests from the kernel. Only one is allocated and
526        // it is reused immediately after dispatching to conserve memory and allocations.
527        let mut buf = FuseReadBuf::new();
528        let buf = buf.as_mut();
529        loop {
530            // Read the next request from the given channel to kernel driver
531            // The kernel driver makes sure that we get exactly one request per read
532            match self.ch.receive_retrying(buf) {
533                Ok(size) => match RequestWithSender::new(self.ch.sender(), &buf[..size]) {
534                    // Dispatch request
535                    Some(req) => {
536                        if let Ok(Operation::Destroy(_)) = req.request.operation() {
537                            req.reply::<ReplyEmpty>().ok();
538                            return Ok(());
539                        } else {
540                            req.dispatch(self)
541                        }
542                    }
543                    // Quit loop on illegal request
544                    None => {
545                        return Err(io::Error::new(
546                            io::ErrorKind::InvalidData,
547                            "Invalid request",
548                        ));
549                    }
550                },
551                Err(nix::errno::Errno::ENODEV) => return Ok(()),
552                Err(err) => return Err(err.into()),
553            }
554        }
555    }
556}
557
/// The background session data structure
///
/// Returned by `Session::spawn`; owns the join handle of the `fuser-bg` thread and the
/// mount that was moved out of the session.
#[derive(Debug)]
pub struct BackgroundSession {
    /// Thread guard of the background session
    pub guard: JoinHandle<io::Result<()>>,
    /// Object for creating Notifiers for client use
    sender: ChannelSender,
    /// Ensures the filesystem is unmounted when the session ends
    /// (`None` when the session had no mount to hand over).
    mount: Option<Mount>,
}
568
569impl BackgroundSession {
570    /// Unmount the filesystem and join the background thread.
571    pub fn umount_and_join(mut self) -> io::Result<()> {
572        if let Some(mount) = self.mount.take() {
573            mount.umount()?;
574        }
575        self.join()
576    }
577
578    /// Returns an object that can be used to send notifications to the kernel
579    pub fn notifier(&self) -> Notifier {
580        Notifier::new(self.sender.clone())
581    }
582
583    /// Join the filesystem thread.
584    pub fn join(self) -> io::Result<()> {
585        self.guard
586            .join()
587            .map_err(|_panic: Box<dyn std::any::Any + Send>| {
588                io::Error::new(
589                    io::ErrorKind::Other,
590                    "filesystem background thread panicked",
591                )
592            })?
593    }
594}