1use std::borrow::Cow;
9use std::fs::File;
10use std::io;
11use std::os::fd::AsFd;
12use std::os::fd::BorrowedFd;
13use std::os::fd::OwnedFd;
14use std::path::Path;
15use std::sync::Arc;
16use std::thread::JoinHandle;
17use std::thread::{self};
18
19use log::debug;
20use log::error;
21use log::info;
22use log::warn;
23use nix::unistd::Uid;
24use nix::unistd::geteuid;
25use parking_lot::Mutex;
26
27use crate::Errno;
28use crate::Filesystem;
29use crate::KernelConfig;
30use crate::MountOption;
31use crate::ReplyEmpty;
32use crate::Request;
33use crate::channel::Channel;
34use crate::channel::ChannelSender;
35use crate::dev_fuse::DevFuse;
36use crate::ll;
37use crate::ll::Operation;
38use crate::ll::ResponseErrno;
39use crate::ll::Version;
40use crate::ll::flags::init_flags::InitFlags;
41use crate::ll::fuse_abi as abi;
42use crate::mnt::Mount;
43use crate::mnt::mount_options::Config;
44use crate::mnt::mount_options::check_option_conflicts;
45use crate::notify::Notifier;
46use crate::read_buf::FuseReadBuf;
47use crate::reply::Reply;
48use crate::reply::ReplyRaw;
49use crate::reply::ReplySender;
50use crate::request::RequestWithSender;
51
/// Size limit of 16 MiB (`16 * 1024 * 1024` bytes).
///
/// NOTE(review): going by the name, this caps the size of a single write
/// request; it is not referenced in this part of the file — confirm at the
/// use sites.
pub(crate) const MAX_WRITE_SIZE: usize = 16 * 1024 * 1024;
56
/// Access-control policy of a session.
///
/// NOTE(review): the per-variant descriptions below follow the variant names
/// and the mount-option mapping in [`SessionACL::to_mount_option`]; the actual
/// enforcement lives outside this type — confirm against the request
/// dispatcher.
#[derive(Default, Debug, Eq, PartialEq, Clone, Copy)]
pub enum SessionACL {
    /// Presumably: requests from any user are allowed. Needs the
    /// `allow_other` mount option.
    All,
    /// Presumably: requests from root and from the session owner are allowed.
    /// Also needs the `allow_other` mount option.
    RootAndOwner,
    /// Presumably: only the session owner is allowed. Needs no extra mount
    /// option. This is the default policy.
    #[default]
    Owner,
}

impl SessionACL {
    /// Returns the mount option this policy requires, if any.
    ///
    /// `All` and `RootAndOwner` both map to `"allow_other"`; `Owner` maps to
    /// `None`.
    // NOTE(review): `dead_code` is allowed presumably because not every build
    // configuration calls this helper — confirm.
    #[allow(dead_code)]
    pub(crate) fn to_mount_option(self) -> Option<&'static str> {
        match self {
            Self::Owner => None,
            Self::RootAndOwner | Self::All => Some("allow_other"),
        }
    }
}
81
82#[derive(Debug)]
84pub(crate) struct FilesystemHolder<FS: Filesystem> {
85 pub(crate) fs: Option<FS>,
86}
87
88impl<FS: Filesystem> FilesystemHolder<FS> {
89 fn destroy(&mut self) {
90 if let Some(mut fs) = self.fs.take() {
91 fs.destroy();
92 }
93 }
94}
95
96impl<FS: Filesystem> Drop for FilesystemHolder<FS> {
97 fn drop(&mut self) {
98 self.destroy();
99 }
100}
101
102#[derive(Debug)]
103struct UmountOnDrop {
104 mount: Arc<Mutex<Option<Mount>>>,
105}
106
107impl UmountOnDrop {
108 fn umount(&self) -> io::Result<()> {
109 if let Some(mount) = self.mount.lock().take() {
110 mount.umount()?;
111 }
112 Ok(())
113 }
114}
115
116impl Drop for UmountOnDrop {
117 fn drop(&mut self) {
118 if let Err(e) = self.umount() {
119 warn!("Failed to umount filesystem: {}", e);
120 }
121 }
122}
123
/// A FUSE session: a filesystem implementation together with the kernel
/// channel it is served over and the mount (if any) that backs it.
#[derive(Debug)]
pub struct Session<FS: Filesystem> {
    /// The filesystem implementation, wrapped in a holder that calls its
    /// `destroy` hook at most once.
    pub(crate) filesystem: FilesystemHolder<FS>,
    /// Channel to the kernel; requests are received from it and reply senders
    /// are derived from it.
    pub(crate) ch: Channel,
    /// Mount guard that unmounts when dropped. Its slot is empty for sessions
    /// created with `from_fd`, and after the mount has been taken out.
    mount: UmountOnDrop,
    /// Access-control policy of this session.
    pub(crate) allowed: SessionACL,
    /// Effective UID of the process, captured when the session was created.
    pub(crate) session_owner: Uid,
    /// Protocol version reported by the kernel in INIT; `None` until the
    /// handshake has succeeded.
    pub(crate) proto_version: Option<Version>,
    /// Session configuration (read by `run` for `n_threads` and `clone_fd`).
    pub(crate) config: Config,
}
143
impl<FS: Filesystem> AsFd for Session<FS> {
    /// Borrows the file descriptor of the session's kernel channel.
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.ch.as_fd()
    }
}
149
150impl<FS: Filesystem> Session<FS> {
151 pub fn new<P: AsRef<Path>>(
155 filesystem: FS,
156 mountpoint: P,
157 options: &Config,
158 ) -> io::Result<Session<FS>> {
159 check_option_conflicts(options)?;
160
161 let mountpoint = mountpoint.as_ref();
162 info!("Mounting {}", mountpoint.display());
163 if options.mount_options.contains(&MountOption::AutoUnmount)
166 && options.acl == SessionACL::Owner
167 {
168 return Err(io::Error::new(
169 io::ErrorKind::InvalidInput,
170 format!("auto_unmount requires acl != Owner, got: {:?}", options.acl),
171 ));
172 }
173 let (file, mount) = Mount::new(mountpoint, &options.mount_options, options.acl)?;
174
175 let ch = Channel::new(file);
176
177 let mut session = Session {
178 filesystem: FilesystemHolder {
179 fs: Some(filesystem),
180 },
181 ch,
182 mount: UmountOnDrop {
183 mount: Arc::new(Mutex::new(Some(mount))),
184 },
185 allowed: options.acl,
186 session_owner: geteuid(),
187 proto_version: None,
188 config: options.clone(),
189 };
190
191 session.handshake()?;
192
193 Ok(session)
194 }
195
196 pub fn from_fd(
199 filesystem: FS,
200 fd: OwnedFd,
201 acl: SessionACL,
202 config: Config,
203 ) -> io::Result<Self> {
204 let ch = Channel::new(Arc::new(DevFuse(File::from(fd))));
205 let mut session = Session {
206 filesystem: FilesystemHolder {
207 fs: Some(filesystem),
208 },
209 ch,
210 mount: UmountOnDrop {
211 mount: Arc::new(Mutex::new(None)),
212 },
213 allowed: acl,
214 session_owner: geteuid(),
215 proto_version: None,
216 config,
217 };
218
219 session.handshake()?;
220
221 Ok(session)
222 }
223
224 pub fn spawn(self) -> io::Result<BackgroundSession> {
227 let sender = self.ch.sender();
228 let mount = std::mem::take(&mut *self.mount.mount.lock());
230 let guard = thread::Builder::new()
231 .name("fuser-bg".to_string())
232 .spawn(move || self.run())?;
233 Ok(BackgroundSession {
234 guard,
235 sender,
236 mount,
237 })
238 }
239
240 pub fn run(self) -> io::Result<()> {
247 let Session {
248 filesystem,
249 ch,
250 mount: _do_not_umount_yet,
251 allowed,
252 session_owner,
253 proto_version: _,
254 config,
255 } = self;
256
257 let n_threads = config.n_threads.unwrap_or(1);
258
259 if !cfg!(target_os = "linux") && n_threads != 1 {
260 return Err(io::Error::other(
262 "n_threads != 1 is only supported on Linux",
263 ));
264 }
265
266 let Some(n_threads_minus_one) = n_threads.checked_sub(1) else {
267 return Err(io::Error::other("n_threads"));
268 };
269
270 let mut filesystem = Arc::new(filesystem);
271
272 let mut channels = Vec::with_capacity(n_threads);
273
274 for _ in 0..n_threads_minus_one {
275 if config.clone_fd {
276 #[cfg(target_os = "linux")]
277 {
278 channels.push(ch.clone_fd()?);
279 continue;
280 }
281 #[cfg(not(target_os = "linux"))]
282 {
283 return Err(io::Error::other("clone_fd is only supported on Linux"));
284 }
285 } else {
286 channels.push(ch.clone());
287 }
288 }
289 channels.push(ch);
290
291 let mut threads = Vec::with_capacity(n_threads);
292
293 for (i, ch) in channels.into_iter().enumerate() {
294 let thread_name = format!("fuser-{i}");
295 let event_loop = SessionEventLoop {
296 thread_name: thread_name.clone(),
297 filesystem: filesystem.clone(),
298 ch,
299 allowed,
300 session_owner,
301 };
302 threads.push(
303 thread::Builder::new()
304 .name(thread_name)
305 .spawn(move || event_loop.event_loop())?,
306 );
307 }
308
309 let mut reply: io::Result<()> = Ok(());
310 for thread in threads {
311 let res = match thread.join() {
312 Ok(res) => res,
313 Err(_) => {
314 return Err(io::Error::other("event loop thread panicked"));
315 }
316 };
317 if let Err(e) = res {
318 if reply.is_ok() {
319 reply = Err(e);
320 }
321 }
322 }
323
324 let Some(filesystem) = Arc::get_mut(&mut filesystem) else {
325 return Err(io::Error::other(
326 "BUG: must have one refcount for filesystem",
327 ));
328 };
329
330 filesystem.destroy();
331
332 reply
333 }
334
    /// Performs the FUSE INIT handshake with the kernel over `self.ch`.
    ///
    /// Reads requests until an acceptable `Init` operation arrives, lets the
    /// filesystem's `init` callback adjust the [`KernelConfig`], and sends the
    /// INIT reply. On success `self.proto_version` holds the version reported
    /// by the kernel.
    ///
    /// # Errors
    ///
    /// - `NotConnected` if receiving fails with `ENODEV`.
    /// - `InvalidData` if a request cannot be parsed, its operation cannot be
    ///   decoded, or an operation other than `Init` arrives (the latter is
    ///   answered with `EIO`).
    /// - `Unsupported` if the kernel's ABI version is below 7.6 (answered with
    ///   `EPROTO`).
    /// - `InvalidInput` if `self.filesystem.fs` is `None`.
    /// - The error returned by the filesystem's `init` callback (its errno is
    ///   also sent to the kernel).
    /// - Any other receive error, converted into an `io::Error`.
    fn handshake(&mut self) -> io::Result<()> {
        let mut buf = FuseReadBuf::new();
        let buf = buf.as_mut();

        // Loops only when the kernel has to resend INIT (see the version check
        // below); every other path returns.
        loop {
            // Read the next request from the kernel channel.
            let size = match self.ch.receive_retrying(buf) {
                Ok(size) => size,
                Err(nix::errno::Errno::ENODEV) => {
                    return Err(io::Error::new(
                        io::ErrorKind::NotConnected,
                        "FUSE device disconnected during handshake",
                    ));
                }
                Err(err) => return Err(err.into()),
            };

            // Parse the raw bytes into a request.
            let request = match ll::AnyRequest::try_from(&buf[..size]) {
                Ok(request) => request,
                Err(err) => {
                    error!("{err}");
                    return Err(io::Error::new(io::ErrorKind::InvalidData, err.to_string()));
                }
            };

            // Decode which operation the request carries.
            let op = match request.operation() {
                Ok(op) => op,
                Err(_) => {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "Failed to parse FUSE operation",
                    ));
                }
            };

            // Anything other than INIT at this point is a protocol violation:
            // answer with EIO and abort the handshake.
            let init = match op {
                ll::Operation::Init(init) => init,
                _ => {
                    error!("Received non-init FUSE operation before init: {}", request);
                    <ReplyRaw as Reply>::new(
                        request.unique(),
                        ReplySender::Channel(self.ch.sender()),
                    )
                    .send_ll(&ResponseErrno(ll::Errno::EIO));
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "Received non-init FUSE operation during handshake",
                    ));
                }
            };

            let v = init.version();
            // The kernel's major version is newer than ours: reply with our
            // version only and wait for the kernel to send another INIT.
            if v.0 > abi::FUSE_KERNEL_VERSION {
                debug!(
                    "INIT: Kernel version {} > our version {}, sending our version and waiting for next init",
                    v.0,
                    abi::FUSE_KERNEL_VERSION
                );
                let response = init.reply_version_only();
                <ReplyRaw as Reply>::new(request.unique(), ReplySender::Channel(self.ch.sender()))
                    .send_ll(&response);
                continue;
            }

            // ABI versions below 7.6 are rejected with EPROTO.
            if v < Version(7, 6) {
                error!("Unsupported FUSE ABI version {v}");
                <ReplyRaw as Reply>::new(request.unique(), ReplySender::Channel(self.ch.sender()))
                    .send_ll(&ResponseErrno(ll::Errno::EPROTO));
                return Err(io::Error::new(
                    io::ErrorKind::Unsupported,
                    format!("Unsupported FUSE ABI version {v}"),
                ));
            }

            // Start from what the kernel offers; the filesystem may adjust it
            // in its `init` callback.
            let mut config = KernelConfig::new(init.capabilities(), init.max_readahead(), v);

            let Some(filesystem) = &mut self.filesystem.fs else {
                // The holder is only emptied by `destroy`, so this indicates a bug.
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "Bug: filesystem must be initialized during handshake",
                ));
            };
            let res = filesystem.init(Request::ref_cast(request.header()), &mut config);
            if let Err(error) = res {
                // Forward the failure to the kernel. Errors without an OS code
                // are passed to `Errno::from_i32` as 0.
                // NOTE(review): confirm that `from_i32(0)` yields a real errno.
                let errno = Errno::from_i32(error.raw_os_error().unwrap_or(0));
                <ReplyRaw as Reply>::new(request.unique(), ReplySender::Channel(self.ch.sender()))
                    .send_ll(&ResponseErrno(errno));
                return Err(error);
            }

            self.proto_version = Some(v);

            // Diagnostics only: log the negotiation outcome of each capability bit.
            for bit in 0..64 {
                let bitflags = InitFlags::from_bits_retain(1 << bit);
                // NOTE(review): FUSE_INIT_EXT is skipped, presumably because it
                // is a protocol-extension marker rather than a capability — confirm.
                if bitflags == InitFlags::FUSE_INIT_EXT {
                    continue;
                }
                let bitflag_is_known = InitFlags::all().contains(bitflags);
                let kernel_supports = init.capabilities().contains(bitflags);
                let we_requested = config.requested.contains(bitflags);
                // Use the flag's name when the bit has one, otherwise show the
                // raw bit position.
                let name = if let Some((name, _)) = bitflags.iter_names().last() {
                    Cow::Borrowed(name)
                } else {
                    Cow::Owned(format!("(1 << {bit})"))
                };
                if we_requested && kernel_supports {
                    debug!("capability {name} enabled")
                } else if we_requested {
                    debug!("capability {name} not supported by kernel")
                } else if kernel_supports {
                    debug!("capability {name} not requested by client")
                } else if bitflag_is_known {
                    debug!("capability {name} not supported nor requested")
                }
            }

            debug!(
                "INIT response: ABI {}.{}, flags {:#x}, max readahead {}, max write {}",
                abi::FUSE_KERNEL_VERSION,
                abi::FUSE_KERNEL_MINOR_VERSION,
                init.capabilities() & config.requested,
                config.max_readahead,
                config.max_write
            );

            // Send the final INIT reply built from the negotiated config.
            let response = init.reply(&config);
            <ReplyRaw as Reply>::new(request.unique(), ReplySender::Channel(self.ch.sender()))
                .send_ll(&response);

            return Ok(());
        }
    }
479
    /// Unmounts the filesystem if this session still owns a mount.
    ///
    /// Returns `Ok(())` when there is nothing to unmount, otherwise the result
    /// of the unmount call.
    pub fn unmount(&mut self) -> io::Result<()> {
        self.mount.umount()
    }
484
485 pub fn unmount_callable(&mut self) -> SessionUnmounter {
487 SessionUnmounter {
488 mount: self.mount.mount.clone(),
489 }
490 }
491
492 pub fn notifier(&self) -> Notifier {
494 Notifier::new(self.ch.sender())
495 }
496}
497
498#[derive(Debug)]
499pub struct SessionUnmounter {
501 mount: Arc<Mutex<Option<Mount>>>,
502}
503
504impl SessionUnmounter {
505 pub fn unmount(&mut self) -> io::Result<()> {
507 if let Some(mount) = std::mem::take(&mut *self.mount.lock()) {
508 mount.umount()?;
509 }
510 Ok(())
511 }
512}
513
514pub(crate) struct SessionEventLoop<FS: Filesystem> {
515 pub(crate) thread_name: String,
517 pub(crate) ch: Channel,
518 pub(crate) filesystem: Arc<FilesystemHolder<FS>>,
519 pub(crate) allowed: SessionACL,
520 pub(crate) session_owner: Uid,
521}
522
523impl<FS: Filesystem> SessionEventLoop<FS> {
524 fn event_loop(&self) -> io::Result<()> {
525 let mut buf = FuseReadBuf::new();
528 let buf = buf.as_mut();
529 loop {
530 match self.ch.receive_retrying(buf) {
533 Ok(size) => match RequestWithSender::new(self.ch.sender(), &buf[..size]) {
534 Some(req) => {
536 if let Ok(Operation::Destroy(_)) = req.request.operation() {
537 req.reply::<ReplyEmpty>().ok();
538 return Ok(());
539 } else {
540 req.dispatch(self)
541 }
542 }
543 None => {
545 return Err(io::Error::new(
546 io::ErrorKind::InvalidData,
547 "Invalid request",
548 ));
549 }
550 },
551 Err(nix::errno::Errno::ENODEV) => return Ok(()),
552 Err(err) => return Err(err.into()),
553 }
554 }
555 }
556}
557
558#[derive(Debug)]
560pub struct BackgroundSession {
561 pub guard: JoinHandle<io::Result<()>>,
563 sender: ChannelSender,
565 mount: Option<Mount>,
567}
568
569impl BackgroundSession {
570 pub fn umount_and_join(mut self) -> io::Result<()> {
572 if let Some(mount) = self.mount.take() {
573 mount.umount()?;
574 }
575 self.join()
576 }
577
578 pub fn notifier(&self) -> Notifier {
580 Notifier::new(self.sender.clone())
581 }
582
583 pub fn join(self) -> io::Result<()> {
585 self.guard
586 .join()
587 .map_err(|_panic: Box<dyn std::any::Any + Send>| {
588 io::Error::new(
589 io::ErrorKind::Other,
590 "filesystem background thread panicked",
591 )
592 })?
593 }
594}