use std::borrow::Cow;
use std::fs::File;
use std::io;
use std::os::fd::AsFd;
use std::os::fd::BorrowedFd;
use std::os::fd::OwnedFd;
use std::path::Path;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::thread::{self};
use log::debug;
use log::error;
use log::info;
use log::warn;
use nix::unistd::Uid;
use nix::unistd::geteuid;
use parking_lot::Mutex;
use crate::Errno;
use crate::Filesystem;
use crate::KernelConfig;
use crate::MountOption;
use crate::ReplyEmpty;
use crate::Request;
use crate::channel::Channel;
use crate::channel::ChannelSender;
use crate::dev_fuse::DevFuse;
use crate::ll;
use crate::ll::Operation;
use crate::ll::ResponseErrno;
use crate::ll::Version;
use crate::ll::flags::init_flags::InitFlags;
use crate::ll::fuse_abi as abi;
use crate::mnt::Mount;
use crate::mnt::mount_options::Config;
use crate::notify::Notifier;
use crate::read_buf::FuseReadBuf;
use crate::reply::Reply;
use crate::reply::ReplyRaw;
use crate::reply::ReplySender;
use crate::request::RequestWithSender;
pub(crate) const MAX_WRITE_SIZE: usize = 16 * 1024 * 1024;
#[derive(Default, Debug, Eq, PartialEq, Clone, Copy)]
pub enum SessionACL {
All,
RootAndOwner,
#[default]
Owner,
}
impl SessionACL {
#[allow(dead_code)]
pub(crate) fn to_mount_option(self) -> Option<&'static str> {
match self {
SessionACL::All | SessionACL::RootAndOwner => Some("allow_other"),
SessionACL::Owner => None,
}
}
}
#[derive(Debug)]
pub(crate) struct FilesystemHolder<FS: Filesystem> {
pub(crate) fs: Option<FS>,
}
impl<FS: Filesystem> FilesystemHolder<FS> {
fn destroy(&mut self) {
if let Some(mut fs) = self.fs.take() {
fs.destroy();
}
}
}
impl<FS: Filesystem> Drop for FilesystemHolder<FS> {
fn drop(&mut self) {
self.destroy();
}
}
#[derive(Debug)]
struct UmountOnDrop {
mount: Arc<Mutex<Option<Mount>>>,
}
impl UmountOnDrop {
fn umount(&self) -> io::Result<()> {
if let Some(mount) = self.mount.lock().take() {
mount.umount()?;
}
Ok(())
}
}
impl Drop for UmountOnDrop {
fn drop(&mut self) {
if let Err(e) = self.umount() {
warn!("Failed to umount filesystem: {}", e);
}
}
}
#[derive(Debug)]
pub struct Session<FS: Filesystem> {
pub(crate) filesystem: FilesystemHolder<FS>,
pub(crate) ch: Channel,
mount: UmountOnDrop,
pub(crate) allowed: SessionACL,
pub(crate) session_owner: Uid,
pub(crate) proto_version: Option<Version>,
pub(crate) config: Config,
}
impl<FS: Filesystem> AsFd for Session<FS> {
fn as_fd(&self) -> BorrowedFd<'_> {
self.ch.as_fd()
}
}
impl<FS: Filesystem> Session<FS> {
pub fn new<P: AsRef<Path>>(
filesystem: FS,
mountpoint: P,
options: &Config,
) -> io::Result<Session<FS>> {
let mountpoint = mountpoint.as_ref();
info!("Mounting {}", mountpoint.display());
if options.mount_options.contains(&MountOption::AutoUnmount)
&& options.acl == SessionACL::Owner
{
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("auto_unmount requires acl != Owner, got: {:?}", options.acl),
));
}
let (file, mount) = Mount::new(mountpoint, &options.mount_options, options.acl)?;
let ch = Channel::new(file);
let mut session = Session {
filesystem: FilesystemHolder {
fs: Some(filesystem),
},
ch,
mount: UmountOnDrop {
mount: Arc::new(Mutex::new(Some(mount))),
},
allowed: options.acl,
session_owner: geteuid(),
proto_version: None,
config: options.clone(),
};
session.handshake()?;
Ok(session)
}
pub fn from_fd(
filesystem: FS,
fd: OwnedFd,
acl: SessionACL,
config: Config,
) -> io::Result<Self> {
let ch = Channel::new(Arc::new(DevFuse(File::from(fd))));
let mut session = Session {
filesystem: FilesystemHolder {
fs: Some(filesystem),
},
ch,
mount: UmountOnDrop {
mount: Arc::new(Mutex::new(None)),
},
allowed: acl,
session_owner: geteuid(),
proto_version: None,
config,
};
session.handshake()?;
Ok(session)
}
pub fn spawn(self) -> io::Result<BackgroundSession> {
let sender = self.ch.sender();
let mount = std::mem::take(&mut *self.mount.mount.lock());
let guard = thread::Builder::new()
.name("fuser-bg".to_string())
.spawn(move || self.run())?;
Ok(BackgroundSession {
guard,
sender,
mount,
})
}
pub(crate) fn run(self) -> io::Result<()> {
let Session {
filesystem,
ch,
mount: _do_not_umount_yet,
allowed,
session_owner,
proto_version: _,
config,
} = self;
let n_threads = config.n_threads.unwrap_or(1);
if !cfg!(target_os = "linux") && n_threads != 1 {
return Err(io::Error::other(
"n_threads != 1 is only supported on Linux",
));
}
let Some(n_threads_minus_one) = n_threads.checked_sub(1) else {
return Err(io::Error::other("n_threads"));
};
let mut filesystem = Arc::new(filesystem);
let mut channels = Vec::with_capacity(n_threads);
for _ in 0..n_threads_minus_one {
if config.clone_fd {
#[cfg(target_os = "linux")]
{
channels.push(ch.clone_fd()?);
continue;
}
#[cfg(not(target_os = "linux"))]
{
return Err(io::Error::other("clone_fd is only supported on Linux"));
}
} else {
channels.push(ch.clone());
}
}
channels.push(ch);
let mut threads = Vec::with_capacity(n_threads);
for (i, ch) in channels.into_iter().enumerate() {
let thread_name = format!("fuser-{i}");
let event_loop = SessionEventLoop {
thread_name: thread_name.clone(),
filesystem: filesystem.clone(),
ch,
allowed,
session_owner,
};
threads.push(
thread::Builder::new()
.name(thread_name)
.spawn(move || event_loop.event_loop())?,
);
}
let mut reply: io::Result<()> = Ok(());
for thread in threads {
let res = match thread.join() {
Ok(res) => res,
Err(_) => {
return Err(io::Error::other("event loop thread panicked"));
}
};
if let Err(e) = res {
if reply.is_ok() {
reply = Err(e);
}
}
}
let Some(filesystem) = Arc::get_mut(&mut filesystem) else {
return Err(io::Error::other(
"BUG: must have one refcount for filesystem",
));
};
filesystem.destroy();
reply
}
fn handshake(&mut self) -> io::Result<()> {
let mut buf = FuseReadBuf::new();
let buf = buf.as_mut();
loop {
let size = match self.ch.receive_retrying(buf) {
Ok(size) => size,
Err(nix::errno::Errno::ENODEV) => {
return Err(io::Error::new(
io::ErrorKind::NotConnected,
"FUSE device disconnected during handshake",
));
}
Err(err) => return Err(err.into()),
};
let request = match ll::AnyRequest::try_from(&buf[..size]) {
Ok(request) => request,
Err(err) => {
error!("{err}");
return Err(io::Error::new(io::ErrorKind::InvalidData, err.to_string()));
}
};
let op = match request.operation() {
Ok(op) => op,
Err(_) => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Failed to parse FUSE operation",
));
}
};
let init = match op {
ll::Operation::Init(init) => init,
_ => {
error!("Received non-init FUSE operation before init: {}", request);
<ReplyRaw as Reply>::new(
request.unique(),
ReplySender::Channel(self.ch.sender()),
)
.send_ll(&ResponseErrno(ll::Errno::EIO));
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Received non-init FUSE operation during handshake",
));
}
};
let v = init.version();
if v.0 > abi::FUSE_KERNEL_VERSION {
debug!(
"INIT: Kernel version {} > our version {}, sending our version and waiting for next init",
v.0,
abi::FUSE_KERNEL_VERSION
);
let response = init.reply_version_only();
<ReplyRaw as Reply>::new(request.unique(), ReplySender::Channel(self.ch.sender()))
.send_ll(&response);
continue;
}
if v < Version(7, 6) {
error!("Unsupported FUSE ABI version {v}");
<ReplyRaw as Reply>::new(request.unique(), ReplySender::Channel(self.ch.sender()))
.send_ll(&ResponseErrno(ll::Errno::EPROTO));
return Err(io::Error::new(
io::ErrorKind::Unsupported,
format!("Unsupported FUSE ABI version {v}"),
));
}
let mut config = KernelConfig::new(init.capabilities(), init.max_readahead(), v);
let Some(filesystem) = &mut self.filesystem.fs else {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Bug: filesystem must be initialized during handshake",
));
};
let res = filesystem.init(Request::ref_cast(request.header()), &mut config);
if let Err(error) = res {
let errno = Errno::from_i32(error.raw_os_error().unwrap_or(0));
<ReplyRaw as Reply>::new(request.unique(), ReplySender::Channel(self.ch.sender()))
.send_ll(&ResponseErrno(errno));
return Err(error);
}
self.proto_version = Some(v);
for bit in 0..64 {
let bitflags = InitFlags::from_bits_retain(1 << bit);
if bitflags == InitFlags::FUSE_INIT_EXT {
continue;
}
let bitflag_is_known = InitFlags::all().contains(bitflags);
let kernel_supports = init.capabilities().contains(bitflags);
let we_requested = config.requested.contains(bitflags);
let name = if let Some((name, _)) = bitflags.iter_names().last() {
Cow::Borrowed(name)
} else {
Cow::Owned(format!("(1 << {bit})"))
};
if we_requested && kernel_supports {
debug!("capability {name} enabled")
} else if we_requested {
debug!("capability {name} not supported by kernel")
} else if kernel_supports {
debug!("capability {name} not requested by client")
} else if bitflag_is_known {
debug!("capability {name} not supported nor requested")
}
}
debug!(
"INIT response: ABI {}.{}, flags {:#x}, max readahead {}, max write {}",
abi::FUSE_KERNEL_VERSION,
abi::FUSE_KERNEL_MINOR_VERSION,
init.capabilities() & config.requested,
config.max_readahead,
config.max_write
);
let response = init.reply(&config);
<ReplyRaw as Reply>::new(request.unique(), ReplySender::Channel(self.ch.sender()))
.send_ll(&response);
return Ok(());
}
}
pub fn unmount(&mut self) -> io::Result<()> {
self.mount.umount()
}
pub fn unmount_callable(&mut self) -> SessionUnmounter {
SessionUnmounter {
mount: self.mount.mount.clone(),
}
}
pub fn notifier(&self) -> Notifier {
Notifier::new(self.ch.sender())
}
}
#[derive(Debug)]
pub struct SessionUnmounter {
mount: Arc<Mutex<Option<Mount>>>,
}
impl SessionUnmounter {
pub fn unmount(&mut self) -> io::Result<()> {
if let Some(mount) = std::mem::take(&mut *self.mount.lock()) {
mount.umount()?;
}
Ok(())
}
}
pub(crate) struct SessionEventLoop<FS: Filesystem> {
pub(crate) thread_name: String,
pub(crate) ch: Channel,
pub(crate) filesystem: Arc<FilesystemHolder<FS>>,
pub(crate) allowed: SessionACL,
pub(crate) session_owner: Uid,
}
impl<FS: Filesystem> SessionEventLoop<FS> {
fn event_loop(&self) -> io::Result<()> {
let mut buf = FuseReadBuf::new();
let buf = buf.as_mut();
loop {
match self.ch.receive_retrying(buf) {
Ok(size) => match RequestWithSender::new(self.ch.sender(), &buf[..size]) {
Some(req) => {
if let Ok(Operation::Destroy(_)) = req.request.operation() {
req.reply::<ReplyEmpty>().ok();
return Ok(());
} else {
req.dispatch(self)
}
}
None => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid request",
));
}
},
Err(nix::errno::Errno::ENODEV) => return Ok(()),
Err(err) => return Err(err.into()),
}
}
}
}
#[derive(Debug)]
pub struct BackgroundSession {
pub guard: JoinHandle<io::Result<()>>,
sender: ChannelSender,
mount: Option<Mount>,
}
impl BackgroundSession {
pub fn umount_and_join(mut self) -> io::Result<()> {
if let Some(mount) = self.mount.take() {
mount.umount()?;
}
self.join()
}
pub fn notifier(&self) -> Notifier {
Notifier::new(self.sender.clone())
}
pub fn join(self) -> io::Result<()> {
self.guard
.join()
.map_err(|_panic: Box<dyn std::any::Any + Send>| {
io::Error::new(
io::ErrorKind::Other,
"filesystem background thread panicked",
)
})?
}
}