1use std::fs::{File, OpenOptions};
12use std::ops::Deref;
13use std::os::unix::fs::PermissionsExt;
14use std::os::unix::io::AsRawFd;
15use std::os::unix::net::UnixStream;
16use std::path::{Path, PathBuf};
17use std::sync::{Arc, Mutex};
18
19use crate::transport::fusedev::FuseSessionExt;
20use mio::{Events, Poll, Token, Waker};
21use nix::errno::Errno;
22use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag};
23use nix::mount::{mount, umount2, MntFlags, MsFlags};
24use nix::poll::{poll, PollFd, PollFlags};
25use nix::sys::epoll::{epoll_ctl, EpollEvent, EpollFlags, EpollOp};
26use nix::unistd::{getgid, getuid, read};
27
28use super::{
29 super::pagesize,
30 Error::{IoError, SessionFailure},
31 FuseBuf, FuseDevWriter, Reader, Result, FUSE_HEADER_SIZE, FUSE_KERN_BUF_PAGES,
32};
33
/// Capacity of the event buffer handed to `Poll::poll()` by `FuseChannel::get_request()`.
const POLL_EVENTS_CAPACITY: usize = 1024;

/// Path of the FUSE character device opened for every session.
const FUSE_DEVICE: &str = "/dev/fuse";
/// Base filesystem type used for mounting; a non-empty subtype is appended as `fuse.<subtype>`.
const FUSE_FSTYPE: &str = "fuse";
/// Default name of the `fusermount` helper binary used by a new session.
const FUSERMOUNT_BIN: &str = "fusermount3";
40
41const EXIT_FUSE_EVENT: Token = Token(0);
42const FUSE_DEV_EVENT: Token = Token(1);
43
44pub struct FuseSession {
46 mountpoint: PathBuf,
47 fsname: String,
48 subtype: String,
49 file: Option<File>,
50 keep_alive: Option<UnixStream>,
52 bufsize: usize,
53 readonly: bool,
54 wakers: Mutex<Vec<Arc<Waker>>>,
55 auto_unmount: bool,
56 allow_other: bool,
57 target_mntns: Option<libc::pid_t>,
58 fusermount: String,
60}
61
62impl FuseSession {
63 pub fn new(
65 mountpoint: &Path,
66 fsname: &str,
67 subtype: &str,
68 readonly: bool,
69 ) -> Result<FuseSession> {
70 FuseSession::new_with_autounmount(mountpoint, fsname, subtype, readonly, false)
71 }
72
73 pub fn new_with_autounmount(
75 mountpoint: &Path,
76 fsname: &str,
77 subtype: &str,
78 readonly: bool,
79 auto_unmount: bool,
80 ) -> Result<FuseSession> {
81 let dest = mountpoint
82 .canonicalize()
83 .map_err(|_| SessionFailure(format!("invalid mountpoint {mountpoint:?}")))?;
84 if !dest.is_dir() {
85 return Err(SessionFailure(format!("{dest:?} is not a directory")));
86 }
87
88 Ok(FuseSession {
89 mountpoint: dest,
90 fsname: fsname.to_owned(),
91 subtype: subtype.to_owned(),
92 file: None,
93 keep_alive: None,
94 bufsize: FUSE_KERN_BUF_PAGES * pagesize() + FUSE_HEADER_SIZE,
95 readonly,
96 wakers: Mutex::new(Vec::new()),
97 auto_unmount,
98 target_mntns: None,
99 fusermount: FUSERMOUNT_BIN.to_string(),
100 allow_other: true,
101 })
102 }
103
104 pub fn set_target_mntns(&mut self, pid: Option<libc::pid_t>) {
107 self.target_mntns = pid;
108 }
109
110 pub fn set_fusermount(&mut self, bin: &str) {
112 self.fusermount = bin.to_string();
113 }
114
115 pub fn set_allow_other(&mut self, allow_other: bool) {
119 self.allow_other = allow_other;
120 }
121
122 pub fn get_fusermount(&self) -> &str {
124 self.fusermount.as_str()
125 }
126
127 pub fn get_fuse_file(&self) -> Option<&File> {
129 self.file.as_ref()
130 }
131
132 pub fn set_fuse_file(&mut self, file: File) {
134 self.file = Some(file);
135 }
136
137 pub fn clone_fuse_file(&self) -> Result<File> {
139 let mut old_fd = self
140 .file
141 .as_ref()
142 .ok_or(SessionFailure(
143 "fuse session file doesn't exist".to_string(),
144 ))?
145 .as_raw_fd();
146
147 let cloned_file = OpenOptions::new()
148 .create(false)
149 .read(true)
150 .write(true)
151 .open(FUSE_DEVICE)
152 .map_err(|e| SessionFailure(format!("open {FUSE_DEVICE}: {e}")))?;
153
154 nix::ioctl_read!(clone_fuse_fd, 229, 0, i32);
159
160 unsafe { clone_fuse_fd(cloned_file.as_raw_fd(), (&mut old_fd) as *mut i32) }
161 .map_err(|e| SessionFailure(format!("failed to clone fuse file: {:?}", e)))?;
162
163 Ok(cloned_file)
164 }
165
166 pub fn mountpoint(&self) -> &Path {
168 &self.mountpoint
169 }
170
171 pub fn fsname(&self) -> &str {
173 &self.fsname
174 }
175
176 pub fn subtype(&self) -> &str {
178 &self.subtype
179 }
180
181 pub fn bufsize(&self) -> usize {
183 self.bufsize
184 }
185
186 pub fn mount(&mut self) -> Result<()> {
188 let mut flags = MsFlags::MS_NOSUID | MsFlags::MS_NODEV | MsFlags::MS_NOATIME;
189 if self.readonly {
190 flags |= MsFlags::MS_RDONLY;
191 }
192 let (file, socket) = fuse_kern_mount(
193 &self.mountpoint,
194 &self.fsname,
195 &self.subtype,
196 flags,
197 self.auto_unmount,
198 self.allow_other,
199 self.target_mntns,
200 &self.fusermount,
201 )?;
202
203 fcntl(file.as_raw_fd(), FcntlArg::F_SETFL(OFlag::O_NONBLOCK))
204 .map_err(|e| SessionFailure(format!("set fd nonblocking: {e}")))?;
205 self.file = Some(file);
206 self.keep_alive = socket;
207
208 Ok(())
209 }
210
211 pub fn umount(&mut self) -> Result<()> {
213 if let (None, Some(file)) = (self.keep_alive.take(), self.file.take()) {
216 if let Some(mountpoint) = self.mountpoint.to_str() {
217 fuse_kern_umount(mountpoint, file, self.fusermount.as_str())
218 } else {
219 Err(SessionFailure("invalid mountpoint".to_string()))
220 }
221 } else {
222 Ok(())
223 }
224 }
225
226 pub fn new_channel(&self) -> Result<FuseChannel> {
228 if let Some(file) = &self.file {
229 let file = file
230 .try_clone()
231 .map_err(|e| SessionFailure(format!("dup fd: {e}")))?;
232 let channel = FuseChannel::new(file, self.bufsize)?;
233 let waker = channel.get_waker();
234 self.add_waker(waker)?;
235
236 Ok(channel)
237 } else {
238 Err(SessionFailure("invalid fuse session".to_string()))
239 }
240 }
241
242 pub fn wake(&self) -> Result<()> {
244 let wakers = self
245 .wakers
246 .lock()
247 .map_err(|e| SessionFailure(format!("lock wakers: {e}")))?;
248 for waker in wakers.iter() {
249 waker
250 .wake()
251 .map_err(|e| SessionFailure(format!("wake channel: {e}")))?;
252 }
253 Ok(())
254 }
255
256 fn add_waker(&self, waker: Arc<Waker>) -> Result<()> {
257 let mut wakers = self
258 .wakers
259 .lock()
260 .map_err(|e| SessionFailure(format!("lock wakers: {e}")))?;
261 wakers.push(waker);
262 Ok(())
263 }
264}
265
266impl Drop for FuseSession {
267 fn drop(&mut self) {
268 let _ = self.umount();
269 }
270}
271
272impl FuseSessionExt for FuseSession {
273 fn file(&self) -> Option<&File> {
274 self.file.as_ref()
275 }
276
277 fn bufsize(&self) -> usize {
278 self.bufsize
279 }
280}
281
282pub struct FuseChannel {
286 file: File,
287 poll: Poll,
288 waker: Arc<Waker>,
289 buf: Vec<u8>,
290}
291
292impl FuseChannel {
293 fn new(file: File, bufsize: usize) -> Result<Self> {
294 let poll = Poll::new().map_err(|e| SessionFailure(format!("epoll create: {e}")))?;
295 let waker = Waker::new(poll.registry(), EXIT_FUSE_EVENT)
296 .map_err(|e| SessionFailure(format!("epoll register session fd: {e}")))?;
297 let waker = Arc::new(waker);
298
299 let epoll = poll.as_raw_fd();
303 let mut event = EpollEvent::new(EpollFlags::EPOLLIN, usize::from(FUSE_DEV_EVENT) as u64);
304 epoll_ctl(
305 epoll,
306 EpollOp::EpollCtlAdd,
307 file.as_raw_fd(),
308 Some(&mut event),
309 )
310 .map_err(|e| SessionFailure(format!("epoll register channel fd: {e}")))?;
311
312 Ok(FuseChannel {
313 file,
314 poll,
315 waker,
316 buf: vec![0x0u8; bufsize],
317 })
318 }
319
320 fn get_waker(&self) -> Arc<Waker> {
321 self.waker.clone()
322 }
323
324 pub fn get_request(&mut self) -> Result<Option<(Reader<'_>, FuseDevWriter<'_>)>> {
331 let mut events = Events::with_capacity(POLL_EVENTS_CAPACITY);
332 let mut need_exit = false;
333 loop {
334 let mut fusereq_available = false;
335 match self.poll.poll(&mut events, None) {
336 Ok(_) => {}
337 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
338 Err(e) => return Err(SessionFailure(format!("epoll wait: {e}"))),
339 }
340
341 for event in events.iter() {
342 if event.is_readable() {
343 match event.token() {
344 EXIT_FUSE_EVENT => need_exit = true,
345 FUSE_DEV_EVENT => fusereq_available = true,
346 x => {
347 error!("unexpected epoll event");
348 return Err(SessionFailure(format!("unexpected epoll event: {}", x.0)));
349 }
350 }
351 } else if event.is_error() {
352 info!("FUSE channel already closed!");
353 return Err(SessionFailure("epoll error".to_string()));
354 } else {
355 panic!("unknown epoll result events");
357 }
358 }
359
360 if need_exit {
363 info!("Will exit from fuse service");
364 return Ok(None);
365 }
366 if fusereq_available {
367 let fd = self.file.as_raw_fd();
368 match read(fd, &mut self.buf) {
369 Ok(len) => {
370 let buf = unsafe {
377 std::slice::from_raw_parts_mut(self.buf.as_mut_ptr(), self.buf.len())
378 };
379 let reader =
381 Reader::from_fuse_buffer(FuseBuf::new(&mut self.buf[..len])).unwrap();
382 let writer = FuseDevWriter::new(fd, buf).unwrap();
383 return Ok(Some((reader, writer)));
384 }
385 Err(e) => match e {
386 Errno::ENOENT => {
387 trace!("restart reading due to ENOENT");
389 continue;
390 }
391 Errno::EAGAIN => {
392 trace!("restart reading due to EAGAIN");
393 continue;
394 }
395 Errno::EINTR => {
396 trace!("syscall interrupted");
397 continue;
398 }
399 Errno::ENODEV => {
400 info!("fuse filesystem umounted");
401 return Ok(None);
402 }
403 e => {
404 warn! {"read fuse dev failed on fd {}: {}", fd, e};
405 return Err(SessionFailure(format!("read new request: {e:?}")));
406 }
407 },
408 }
409 }
410 }
411 }
412}
413
414#[allow(clippy::too_many_arguments)]
416fn fuse_kern_mount(
417 mountpoint: &Path,
418 fsname: &str,
419 subtype: &str,
420 flags: MsFlags,
421 auto_unmount: bool,
422 allow_other: bool,
423 target_mntns: Option<libc::pid_t>,
424 fusermount: &str,
425) -> Result<(File, Option<UnixStream>)> {
426 let file = OpenOptions::new()
427 .create(false)
428 .read(true)
429 .write(true)
430 .open(FUSE_DEVICE)
431 .map_err(|e| SessionFailure(format!("open {FUSE_DEVICE}: {e}")))?;
432 let meta = mountpoint
433 .metadata()
434 .map_err(|e| SessionFailure(format!("stat {mountpoint:?}: {e}")))?;
435 let max_read = FUSE_KERN_BUF_PAGES * pagesize() + FUSE_HEADER_SIZE;
446
447 let mut opts = format!(
448 "default_permissions,fd={},rootmode={:o},user_id={},group_id={},max_read={}",
449 file.as_raw_fd(),
450 meta.permissions().mode() & libc::S_IFMT,
451 getuid(),
452 getgid(),
453 max_read
454 );
455 if allow_other {
456 opts.push_str(",allow_other");
457 }
458 let mut fstype = String::from(FUSE_FSTYPE);
459 if !subtype.is_empty() {
460 fstype.push('.');
461 fstype.push_str(subtype);
462 }
463
464 if let Some(mountpoint) = mountpoint.to_str() {
465 info!(
466 "mount source {} dest {} with fstype {} opts {} fd {}",
467 fsname,
468 mountpoint,
469 fstype,
470 opts,
471 file.as_raw_fd(),
472 );
473 }
474
475 if auto_unmount || target_mntns.is_some() {
479 fuse_fusermount_mount(
480 mountpoint,
481 fsname,
482 subtype,
483 opts,
484 flags,
485 auto_unmount,
486 target_mntns,
487 fusermount,
488 )
489 } else {
490 match mount(
491 Some(fsname),
492 mountpoint,
493 Some(fstype.deref()),
494 flags,
495 Some(opts.deref()),
496 ) {
497 Ok(()) => Ok((file, None)),
498 Err(Errno::EPERM) => fuse_fusermount_mount(
499 mountpoint,
500 fsname,
501 subtype,
502 opts,
503 flags,
504 auto_unmount,
505 target_mntns,
506 fusermount,
507 ),
508 Err(e) => Err(SessionFailure(format!(
509 "failed to mount {mountpoint:?}: {e}"
510 ))),
511 }
512 }
513}
514
515fn msflags_to_string(flags: MsFlags) -> String {
516 [
517 (MsFlags::MS_RDONLY, ("rw", "ro")),
518 (MsFlags::MS_NOSUID, ("suid", "nosuid")),
519 (MsFlags::MS_NODEV, ("dev", "nodev")),
520 (MsFlags::MS_NOEXEC, ("exec", "noexec")),
521 (MsFlags::MS_SYNCHRONOUS, ("async", "sync")),
522 (MsFlags::MS_NOATIME, ("atime", "noatime")),
523 ]
524 .map(
525 |(flag, (neg, pos))| {
526 if flags.contains(flag) {
527 pos
528 } else {
529 neg
530 }
531 },
532 )
533 .join(",")
534}
535
536#[allow(clippy::too_many_arguments)]
538fn fuse_fusermount_mount(
539 mountpoint: &Path,
540 fsname: &str,
541 subtype: &str,
542 opts: String,
543 flags: MsFlags,
544 auto_unmount: bool,
545 target_mntns: Option<libc::pid_t>,
546 fusermount: &str,
547) -> Result<(File, Option<UnixStream>)> {
548 let mut opts = vec![format!("fsname={fsname}"), opts, msflags_to_string(flags)];
549 if !subtype.is_empty() {
550 opts.push(format!("subtype={subtype}"));
551 }
552 if auto_unmount {
553 opts.push("auto_unmount".to_owned());
554 }
555 let opts = opts.join(",");
556
557 let (send, recv) = UnixStream::pair().unwrap();
558
559 fcntl(send.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::empty()))
564 .map_err(|e| SessionFailure(format!("Failed to remove close-on-exec flag: {e}")))?;
565
566 let mut cmd = match target_mntns {
567 Some(pid) => {
568 let mut c = std::process::Command::new("nsenter");
569 c.arg("-t")
570 .arg(format!("{}", pid))
571 .arg("-m")
572 .arg(fusermount);
573 c
574 }
575 None => std::process::Command::new(fusermount),
576 };
577 let mut proc = cmd
579 .env("_FUSE_COMMFD", format!("{}", send.as_raw_fd()))
580 .arg("-o")
581 .arg(opts)
582 .arg("--")
583 .arg(mountpoint)
584 .spawn()
585 .map_err(IoError)?;
586
587 if auto_unmount {
588 std::thread::spawn(move || {
589 let _ = proc.wait();
590 });
591 } else {
592 match proc.wait().map_err(IoError)?.code() {
593 Some(0) => {}
594 exit_code => {
595 return Err(SessionFailure(format!(
596 "Unexpected exit code when running fusermount: {exit_code:?}"
597 )))
598 }
599 }
600 }
601 drop(send);
602
603 match vmm_sys_util::sock_ctrl_msg::ScmSocket::recv_with_fd(&recv, &mut [0u8; 8]).map_err(
604 |e| {
605 SessionFailure(format!(
606 "Unexpected error when receiving fuse file descriptor from fusermount: {}",
607 e
608 ))
609 },
610 )? {
611 (_recv_bytes, Some(file)) => Ok((file, if auto_unmount { Some(recv) } else { None })),
612 (recv_bytes, None) => Err(SessionFailure(format!(
613 "fusermount did not send a file descriptor. We received {recv_bytes} bytes."
614 ))),
615 }
616}
617
618fn fuse_kern_umount(mountpoint: &str, file: File, fusermount: &str) -> Result<()> {
620 let mut fds = [PollFd::new(file.as_raw_fd(), PollFlags::empty())];
621
622 if poll(&mut fds, 0).is_ok() {
623 if let Some(event) = fds[0].revents() {
626 if event == PollFlags::POLLERR {
627 return Ok(());
628 }
629 }
630 }
631
632 drop(file);
635 match umount2(mountpoint, MntFlags::MNT_DETACH) {
636 Ok(()) => Ok(()),
637 Err(Errno::EPERM) => fuse_fusermount_umount(mountpoint, fusermount),
638 Err(e) => Err(SessionFailure(format!(
639 "failed to umount {mountpoint}: {e}"
640 ))),
641 }
642}
643
644fn fuse_fusermount_umount(mountpoint: &str, fusermount: &str) -> Result<()> {
646 match std::process::Command::new(fusermount)
647 .arg("--unmount")
648 .arg("--quiet")
649 .arg("--lazy")
650 .arg("--")
651 .arg(mountpoint)
652 .status()
653 .map_err(IoError)?
654 .code()
655 {
656 Some(0) => Ok(()),
657 exit_code => Err(SessionFailure(format!(
658 "Unexpected exit code when unmounting via running fusermount: {exit_code:?}"
659 ))),
660 }
661}
662
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;
    use std::os::unix::io::FromRawFd;
    use std::path::Path;
    use vmm_sys_util::tempdir::TempDir;

    /// A session can only be created for an existing directory.
    #[test]
    fn test_new_session() {
        let missing = FuseSession::new(Path::new("haha"), "foo", "bar", true);
        assert!(missing.is_err());

        let dir = TempDir::new().unwrap();
        let existing = FuseSession::new(dir.as_path(), "foo", "bar", false);
        assert!(existing.is_ok());
    }

    /// A channel can be built over a duplicate of stdout.
    #[test]
    fn test_new_channel() {
        let stdout_fd = std::io::stdout().as_raw_fd();
        let fd = nix::unistd::dup(stdout_fd).unwrap();
        // `fd` was just created by `dup`, so the `File` is its only owner.
        let file = unsafe { File::from_raw_fd(fd) };
        let _ = FuseChannel::new(file, 3).unwrap();
    }

    /// The fusermount binary defaults to `FUSERMOUNT_BIN` and can be overridden.
    #[test]
    fn test_fusermount() {
        let dir = TempDir::new().unwrap();
        let created = FuseSession::new(dir.as_path(), "foo", "bar", true);
        assert!(created.is_ok());

        let mut se = created.unwrap();
        assert_eq!(se.get_fusermount(), FUSERMOUNT_BIN);

        se.set_fusermount("fusermount");
        assert_eq!(se.get_fusermount(), "fusermount");
    }

    /// The device file of a mounted session can be cloned, and the session can
    /// be mounted again after installing the clone.
    #[test]
    fn test_clone_fuse_file() {
        let dir = TempDir::new().unwrap();
        let mut se = FuseSession::new(dir.as_path(), "foo", "bar", true).unwrap();
        se.mount().unwrap();

        let cloned_file = se.clone_fuse_file().unwrap();
        assert!(cloned_file.as_raw_fd() > 0);

        se.umount().unwrap();
        se.set_fuse_file(cloned_file);
        se.mount().unwrap();
    }
}
714
715#[cfg(feature = "async_io")]
716pub use asyncio::FuseDevTask;
717
718#[cfg(feature = "async_io")]
719mod asyncio {
721 use std::os::unix::io::RawFd;
722 use std::sync::Arc;
723
724 use crate::api::filesystem::AsyncFileSystem;
725 use crate::api::server::Server;
726 use crate::transport::{FuseBuf, Reader, Writer};
727
728 pub struct FuseDevTask<F: AsyncFileSystem + Sync> {
746 fd: RawFd,
747 buf: Vec<u8>,
748 state: AsyncExecutorState,
749 server: Arc<Server<F>>,
750 }
751
752 impl<F: AsyncFileSystem + Sync> FuseDevTask<F> {
753 pub fn new(
764 buf_size: usize,
765 fd: RawFd,
766 server: Arc<Server<F>>,
767 state: AsyncExecutorState,
768 ) -> Self {
769 FuseDevTask {
770 fd,
771 server,
772 state,
773 buf: vec![0x0u8; buf_size],
774 }
775 }
776
777 pub async fn poll_handler(&mut self) {
787 let drive = AsyncDriver::default();
789
790 while !self.state.quiescing() {
791 let result = AsyncUtil::read(drive.clone(), self.fd, &mut self.buf, 0).await;
792 match result {
793 Ok(len) => {
794 let buf = unsafe {
801 std::slice::from_raw_parts_mut(self.buf.as_mut_ptr(), self.buf.len())
802 };
803 let reader =
805 Reader::<()>::new(FuseBuf::new(&mut self.buf[0..len])).unwrap();
806 let writer = Writer::new(self.fd, buf).unwrap();
807 let result = unsafe {
808 self.server
809 .async_handle_message(drive.clone(), reader, writer, None, None)
810 .await
811 };
812
813 if let Err(e) = result {
814 error!("failed to handle fuse request, {}", e);
816 }
817 }
818 Err(e) => {
819 error!("failed to read request from fuse device fd, {}", e);
821 }
822 }
823 }
824
825 self.state.report();
829 }
830 }
831
832 impl<F: AsyncFileSystem + Sync> Clone for FuseDevTask<F> {
833 fn clone(&self) -> Self {
834 FuseDevTask {
835 fd: self.fd,
836 server: self.server.clone(),
837 state: self.state.clone(),
838 buf: vec![0x0u8; self.buf.capacity()],
839 }
840 }
841 }
842
#[cfg(test)]
mod tests {
    use std::os::unix::io::AsRawFd;

    use super::*;
    use crate::api::{Vfs, VfsOptions};
    use crate::async_util::{AsyncDriver, AsyncExecutor};

    /// Drives an executor before and after quiescing the shared state.
    #[test]
    fn test_fuse_task() {
        let state = AsyncExecutorState::new();
        let fs = Vfs::<AsyncDriver, ()>::new(VfsOptions::default());
        let _server = Arc::new(Server::<Vfs<AsyncDriver, ()>, AsyncDriver, ()>::new(fs));
        let file = vmm_sys_util::tempfile::TempFile::new().unwrap();
        let _fd = file.as_file().as_raw_fd();

        let mut executor = AsyncExecutor::new(32);
        executor.setup().unwrap();

        for _round in 0..10 {
            executor.run_once(false).unwrap();
        }

        // Ask for quiescing and close the backing file before the second batch.
        state.quiesce();
        drop(file);

        for _round in 0..10 {
            executor.run_once(false).unwrap();
        }
    }
}
892}