Skip to main content

async_fuser/
session_async.rs

1//! Async filesystem session
2//!
3//! A session runs a filesystem implementation while it is being mounted to a specific mount
4//! point. A session begins by mounting the filesystem and ends by unmounting it. While the
5//! filesystem is mounted, the session loop receives, dispatches and replies to kernel requests
6//! for filesystem operations under its mount point.
7
8use std::io;
9use std::path::Path;
10use std::sync::Arc;
11
12use log::error;
13use log::warn;
14use nix::unistd::Uid;
15use nix::unistd::geteuid;
16
17use crate::Config;
18use crate::KernelConfig;
19use crate::MountOption;
20use crate::Request;
21use crate::SessionACL;
22use crate::channel_async::AsyncChannel;
23use crate::lib_async::AsyncFilesystem;
24use crate::ll;
25use crate::ll::AnyRequest;
26use crate::ll::Version;
27use crate::ll::fuse_abi as abi;
28use crate::ll::reply::Response;
29use crate::ll::request_async::AsyncRequestWithSender;
30use crate::mnt::AsyncMount;
31use crate::mnt::mount_options::check_option_conflicts;
32use crate::read_buf::FuseReadBuf;
33use crate::session::MAX_WRITE_SIZE;
34use parking_lot::Mutex;
35
36type DropTx<T> = Arc<Mutex<Option<tokio::sync::mpsc::Sender<T>>>>;
37
38/// Calls `destroy` on drop.
39#[derive(Debug)]
40pub(crate) struct AsyncSessionGuard<FS: AsyncFilesystem> {
41    pub(crate) fs: Option<FS>,
42    pub(crate) unmount_tx: DropTx<()>,
43}
44
45/// Calls `destroy` on drop to do any user-specific cleanup.
46impl<FS: AsyncFilesystem> AsyncSessionGuard<FS> {
47    fn destroy(&mut self) {
48        if let Some(tx) = self.unmount_tx.lock().take() {
49            tx.try_send(()).ok();
50        }
51        if let Some(mut fs) = self.fs.take() {
52            fs.destroy();
53        }
54    }
55}
56
57/// Calls `destroy` on drop to do any user-specific cleanup.
58impl<FS: AsyncFilesystem> Drop for AsyncSessionGuard<FS> {
59    fn drop(&mut self) {
60        self.destroy();
61    }
62}
63
64/// Builder for [`AsyncSession`]. This is used to construct an instance of [`AsyncSession`]
65/// within an asynchronous context.
66#[derive(Default, Debug)]
67pub struct AsyncSessionBuilder<FS: AsyncFilesystem> {
68    filesystem: Option<FS>,
69    mountpoint: Option<String>,
70    options: Option<Config>,
71}
72
73impl<FS: AsyncFilesystem> AsyncSessionBuilder<FS> {
74    /// Create a new builder for [`AsyncSession`].
75    pub fn new() -> Self {
76        Self {
77            filesystem: None,
78            mountpoint: None,
79            options: None,
80        }
81    }
82
83    /// Set the filesystem implementation for this session. This is required.
84    pub fn filesystem(mut self, fs: FS) -> Self {
85        self.filesystem = Some(fs);
86        self
87    }
88
89    /// Set the mountpoint for this session. This is required.
90    pub fn mountpoint(mut self, mountpoint: impl AsRef<Path>) -> Self {
91        self.mountpoint = Some(mountpoint.as_ref().to_string_lossy().to_string());
92        self
93    }
94
95    /// Set the options for this session. This is required.
96    pub fn options(mut self, options: Config) -> io::Result<Self> {
97        check_option_conflicts(&options)?;
98
99        // validate permissions options
100        if options.mount_options.contains(&MountOption::AutoUnmount)
101            && options.acl == SessionACL::Owner
102        {
103            return Err(io::Error::new(
104                io::ErrorKind::InvalidInput,
105                "auto_unmount requires acl != Owner".to_string(),
106            ));
107        }
108
109        self.options = Some(options);
110        Ok(self)
111    }
112
113    /// Build the session. This will mount the filesystem and return an `AsyncSession` if successful.
114    pub async fn build(self) -> io::Result<AsyncSession<FS>> {
115        let filesystem = self.filesystem.ok_or_else(|| {
116            io::Error::new(io::ErrorKind::InvalidInput, "`filesystem` is required")
117        })?;
118        let mountpoint = self.mountpoint.ok_or_else(|| {
119            io::Error::new(io::ErrorKind::InvalidInput, "`mountpoint` is required")
120        })?;
121        let options = self
122            .options
123            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "`options` are required"))?;
124
125        AsyncSession::init(filesystem, mountpoint, &options).await
126    }
127}
128
129/// The async session data structure
130#[derive(Debug)]
131pub struct AsyncSession<FS: AsyncFilesystem> {
132    /// Filesystem operation access and drop guard.
133    pub(crate) guard: AsyncSessionGuard<FS>,
134    /// Communication channel to the kernel driver
135    pub(crate) ch: AsyncChannel,
136    /// Whether to restrict access to owner, root + owner, or unrestricted
137    /// Used to implement `allow_root` and `auto_unmount`
138    pub(crate) allowed: SessionACL,
139    /// User that launched the fuser process
140    pub(crate) session_owner: Uid,
141    /// FUSE protocol version, as reported by the kernel.
142    /// The field is set to `Some` when the init message is received.
143    pub(crate) proto_version: Option<Version>,
144    /// Config options for this session, used for debugging and for
145    /// feature gating in the future.
146    pub(crate) config: Config,
147}
148
149impl<FS: AsyncFilesystem> AsyncSession<FS> {
150    /// Create a new session and mount the given async filesystem to the given mountpoint.
151    ///
152    /// # Errors
153    /// Returns an error if the options are incorrect, or if the fuse device can't be mounted.
154    async fn init<P: AsRef<Path>>(
155        filesystem: FS,
156        mountpoint: P,
157        options: &Config,
158    ) -> io::Result<Self> {
159        let mountpoint = mountpoint.as_ref();
160
161        // mount (async)
162        let mut mount = AsyncMount::new();
163        mount = mount
164            .mount(mountpoint, &options.mount_options, options.acl)
165            .await?;
166        let file = mount.dev_fuse().ok_or_else(|| {
167            io::Error::new(
168                io::ErrorKind::Other,
169                "Failed to get /dev/fuse file descriptor from mount",
170            )
171        })?;
172        let ch = AsyncChannel::new(file.clone());
173
174        // mount drop guard
175        let (unmount_tx, mut unmount_rx) = tokio::sync::mpsc::channel::<()>(1);
176        tokio::spawn({
177            let mount = Arc::new(Mutex::new(Some(mount)));
178            async move {
179                // Wait for the signal to unmount
180                let _ = unmount_rx.recv().await;
181                if let Some(mount) = mount.lock().take() {
182                    drop(mount);
183                }
184            }
185        });
186
187        let mut session = AsyncSession {
188            guard: AsyncSessionGuard {
189                fs: Some(filesystem),
190                unmount_tx: Arc::new(Mutex::new(Some(unmount_tx))),
191            },
192            ch,
193            allowed: options.acl,
194            session_owner: geteuid(),
195            proto_version: None,
196            config: options.clone(),
197        };
198
199        session.handshake().await?;
200
201        Ok(session)
202    }
203
204    /// Run the session async loop that receives kernel requests and dispatches them to method
205    /// calls into the filesystem.
206    ///
207    /// # Errors
208    /// Returns any final error when the session comes to an end.
209    pub async fn run(self) -> io::Result<()> {
210        let AsyncSession {
211            guard,
212            ch,
213            allowed,
214            session_owner,
215            proto_version: _,
216            config,
217        } = self;
218        let mut filesystem = Arc::new(guard);
219
220        let n_threads = config.n_threads.unwrap_or(1);
221        if n_threads == 0 {
222            return Err(io::Error::other("n_threads"));
223        }
224        let Some(n_threads_minus_one) = n_threads.checked_sub(1) else {
225            return Err(io::Error::other("n_threads"));
226        };
227        if !cfg!(target_os = "linux") && n_threads != 1 {
228            return Err(io::Error::other(
229                "multi-threaded async sessions are only supported on Linux",
230            ));
231        }
232
233        // Give each individual thread its own channel by cloning or using `FUSE_DEV_IOC_CLONE` if requested,
234        // which allows for more efficient request processing when multiple threads are used.
235        let mut channels = Vec::with_capacity(n_threads);
236        for _ in 0..n_threads_minus_one {
237            if config.clone_fd {
238                #[cfg(target_os = "linux")]
239                {
240                    channels.push(ch.clone_fd().await?);
241                    continue;
242                }
243                #[cfg(not(target_os = "linux"))]
244                {
245                    return Err(io::Error::other("clone_fd is only supported on Linux"));
246                }
247            } else {
248                channels.push(ch.clone());
249            }
250        }
251        channels.push(ch);
252
253        // Construct the event loop for each thread.
254        let mut tasks = Vec::with_capacity(n_threads);
255        for (i, ch) in channels.into_iter().enumerate() {
256            let thread_name = format!("fuser-async-{i}");
257            let event_loop = AsyncSessionEventLoop {
258                thread_name: thread_name.clone(),
259                filesystem: filesystem.clone(),
260                ch,
261                allowed,
262                session_owner,
263            };
264            tasks.push(tokio::spawn(async move { event_loop.event_loop().await }));
265        }
266
267        // Wait for all event loop tasks to finish (shouldn't happen till unmount), and return the first error
268        // if any of them fail.
269        let mut reply: io::Result<()> = Ok(());
270        for task in tasks {
271            let res = match task.await {
272                Ok(res) => res,
273                Err(_) => {
274                    return Err(io::Error::other("event loop task panicked"));
275                }
276            };
277            if let Err(e) = res {
278                if reply.is_ok() {
279                    reply = Err(e);
280                }
281            }
282        }
283
284        // Clean up the filesystem
285        let Some(filesystem) = Arc::get_mut(&mut filesystem) else {
286            return Err(io::Error::other(
287                "BUG: must have one refcount for filesystem",
288            ));
289        };
290        filesystem.destroy();
291
292        reply
293    }
294
295    /// Perform the initial handshake with the kernel, which involves receiving the init message,
296    /// replying with the kernel config, and setting the protocol version for this session. This must be
297    /// called before any other communication with the kernel can be done.
298    async fn handshake(&mut self) -> io::Result<()> {
299        let mut buf = vec![0u8; MAX_WRITE_SIZE];
300        let sender = self.ch.sender();
301        let Some(fs) = &mut self.guard.fs else {
302            return Err(io::Error::new(
303                io::ErrorKind::Other,
304                "Filesystem was not available during handshake",
305            ));
306        };
307
308        // Keep checking for an init message from the kernel until we get one with a supported version,
309        // at which point we reply to finish the handshake and return
310        loop {
311            let size = match self.ch.receive_retrying(&mut buf).await {
312                Ok(size) => size,
313                Err(nix::errno::Errno::ENODEV) => {
314                    return Err(io::Error::new(
315                        io::ErrorKind::NotConnected,
316                        "FUSE device disconnected during handshake",
317                    ));
318                }
319                Err(err) => return Err(err.into()),
320            };
321            let request = AnyRequest::try_from(&buf[..size])
322                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
323
324            // Convert the handshake request from the kernel to a usable operation
325            let init = match request.operation() {
326                Ok(ll::Operation::Init(init)) => init,
327                Ok(_) => {
328                    error!("Received non-init FUSE operation before init: {}", request);
329                    ll::ResponseErrno(ll::Errno::EIO)
330                        .send_reply(&sender, request.unique())
331                        .await
332                        .map_err(|e| {
333                            io::Error::new(e.kind(), format!("send handshake error reply: {e}"))
334                        })?;
335                    return Err(io::Error::new(
336                        io::ErrorKind::InvalidData,
337                        "Received non-init FUSE operation during handshake",
338                    ));
339                }
340                Err(_) => {
341                    ll::ResponseErrno(ll::Errno::EIO)
342                        .send_reply(&sender, request.unique())
343                        .await
344                        .map_err(|e| {
345                            io::Error::new(e.kind(), format!("send handshake error reply: {e}"))
346                        })?;
347                    return Err(io::Error::new(
348                        io::ErrorKind::InvalidData,
349                        "Failed to parse FUSE operation",
350                    ));
351                }
352            };
353            let v = init.version();
354
355            // Validate version support
356            if v.0 > abi::FUSE_KERNEL_VERSION {
357                init.reply_version_only()
358                    .send_reply(&sender, request.unique())
359                    .await?;
360                continue;
361            }
362            if v < Version(7, 6) {
363                ll::ResponseErrno(ll::Errno::EPROTO)
364                    .send_reply(&sender, request.unique())
365                    .await
366                    .map_err(|e| {
367                        io::Error::new(e.kind(), format!("send handshake error reply: {e}"))
368                    })?;
369                return Err(io::Error::new(
370                    io::ErrorKind::Unsupported,
371                    format!("Unsupported FUSE ABI version {v}"),
372                ));
373            }
374
375            // Construct kernel config from the init message user init() implementation and reply with it to finish
376            // the handshake
377            let mut config = KernelConfig::new(init.capabilities(), init.max_readahead(), v);
378            if let Err(error) = fs
379                .init(Request::ref_cast(request.header()), &mut config)
380                .await
381            {
382                let errno = ll::Errno::from_i32(error.raw_os_error().unwrap_or(0));
383                ll::ResponseErrno(errno)
384                    .send_reply(&sender, request.unique())
385                    .await
386                    .map_err(|e| {
387                        io::Error::new(e.kind(), format!("send handshake error reply: {e}"))
388                    })?;
389                return Err(error);
390            }
391            self.proto_version = Some(v);
392            let response = init.reply(&config);
393            response
394                .send_reply(&sender, request.unique())
395                .await
396                .map_err(|e| io::Error::new(e.kind(), format!("send init reply: {e}")))?;
397            return Ok(());
398        }
399    }
400}
401
402pub(crate) struct AsyncSessionEventLoop<FS: AsyncFilesystem> {
403    /// Cache thread name for faster `debug!`.
404    pub(crate) thread_name: String,
405    pub(crate) filesystem: Arc<AsyncSessionGuard<FS>>,
406    pub(crate) ch: AsyncChannel,
407    pub(crate) allowed: SessionACL,
408    pub(crate) session_owner: Uid,
409}
410
411impl<FS: AsyncFilesystem> Clone for AsyncSessionEventLoop<FS> {
412    fn clone(&self) -> Self {
413        Self {
414            thread_name: self.thread_name.clone(),
415            filesystem: self.filesystem.clone(),
416            ch: self.ch.clone(),
417            allowed: self.allowed,
418            session_owner: self.session_owner,
419        }
420    }
421}
422
423impl<FS: AsyncFilesystem> AsyncSessionEventLoop<FS> {
424    async fn event_loop(&self) -> io::Result<()> {
425        let mut buf = FuseReadBuf::new();
426        let buf = buf.as_mut();
427
428        loop {
429            let resp_size = match self.ch.receive_retrying(buf).await {
430                Ok(res) => res,
431                // Fs unmounted or session ended, exit the loop and end the thread
432                Err(nix::errno::Errno::ENODEV) => return Ok(()),
433                Err(err) => {
434                    return Err(io::Error::new(
435                        io::ErrorKind::Other,
436                        format!("receive_retrying: {err:?}"),
437                    ));
438                }
439            };
440
441            let sender = self.ch.sender();
442            let session = self.clone();
443            if let Ok(request) = AsyncRequestWithSender::new(sender, &buf[..resp_size]) {
444                tokio::spawn(async move {
445                    request.dispatch(&session).await;
446                });
447            } else {
448                warn!("Received invalid request, skipping...");
449            }
450        }
451    }
452}