1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use std::io;
use std::os::fd::AsFd;
use std::os::fd::BorrowedFd;
use std::sync::Arc;
use nix::errno::Errno;
use crate::dev_fuse::DevFuse;
use crate::passthrough::BackingId;
/// A raw communication channel to the FUSE kernel driver
#[derive(Debug, Clone)]
pub(crate) struct Channel(Arc<DevFuse>);
impl AsFd for Channel {
fn as_fd(&self) -> BorrowedFd<'_> {
self.0.as_fd()
}
}
impl Channel {
/// Create a new communication channel to the kernel driver by mounting the
/// given path. The kernel driver will delegate filesystem operations of
/// the given path to the channel.
pub(crate) fn new(device: Arc<DevFuse>) -> Self {
Self(device)
}
/// Receives data up to the capacity of the given buffer (can block).
fn receive(&self, buffer: &mut [u8]) -> nix::Result<usize> {
nix::unistd::read(&self.0, buffer)
}
/// Receives data up to the capacity of the given buffer (can block),
/// retrying on errors that are safe to retry (ENOENT, EINTR, EAGAIN).
///
/// - ENOENT: Operation interrupted. According to FUSE, this is safe to retry.
/// - EINTR: Interrupted system call, retry.
/// - EAGAIN: Explicitly instructed to try again.
pub(crate) fn receive_retrying(&self, buffer: &mut [u8]) -> nix::Result<usize> {
loop {
match self.receive(buffer) {
Ok(size) => return Ok(size),
Err(Errno::ENOENT | Errno::EINTR | Errno::EAGAIN) => continue,
Err(err) => return Err(err),
}
}
}
/// Returns a sender object for this channel. The sender object can be
/// used to send to the channel. Multiple sender objects can be used
/// and they can safely be sent to other threads.
pub(crate) fn sender(&self) -> ChannelSender {
// Since write/writev syscalls are threadsafe, we can simply create
// a sender by using the same file and use it in other threads.
ChannelSender(self.0.clone())
}
/// Clone the FUSE device fd using FUSE_DEV_IOC_CLONE ioctl.
///
/// This creates a new fd that can read FUSE requests independently,
/// enabling true parallel request processing. The kernel distributes
/// requests across all cloned fds.
///
/// Requires Linux 4.5+. Returns an error on older kernels or non-Linux.
#[cfg(target_os = "linux")]
pub(crate) fn clone_fd(&self) -> io::Result<Channel> {
use std::os::fd::AsRawFd;
let new_dev = DevFuse::open()?;
let mut source_fd = self.0.as_raw_fd() as u32;
// SAFETY: fuse_dev_ioc_clone is a valid ioctl for /dev/fuse
unsafe {
crate::ll::ioctl::fuse_dev_ioc_clone(new_dev.as_raw_fd(), &mut source_fd)?;
}
Ok(Channel::new(Arc::new(new_dev)))
}
}
#[derive(Clone, Debug)]
pub(crate) struct ChannelSender(Arc<DevFuse>);
impl ChannelSender {
pub(crate) fn send(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<()> {
let rc = nix::sys::uio::writev(&self.0, bufs)?;
// writev is atomic, so do not need to check how many bytes are written.
// libfuse does not do it either
// https://github.com/libfuse/libfuse/blob/6278995cca991978abd25ebb2c20ebd3fc9e8a13/lib/fuse_lowlevel.c#L267
debug_assert_eq!(bufs.iter().map(|b| b.len()).sum::<usize>(), rc);
Ok(())
}
pub(crate) fn open_backing(&self, fd: BorrowedFd<'_>) -> std::io::Result<BackingId> {
BackingId::create(&self.0, fd)
}
}