1use libc::{EAGAIN, EINTR, ENODEV, ENOENT};
9use log::{info, warn};
10use nix::unistd::geteuid;
11use std::fmt;
12use std::os::fd::{AsFd, BorrowedFd, OwnedFd};
13use std::path::{Path, PathBuf};
14use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
15use std::sync::{Arc, Mutex};
16use std::thread::{self, JoinHandle};
17use std::{io, ops::DerefMut};
18
19use crate::Filesystem;
20use crate::MountOption;
21use crate::ll::fuse_abi as abi;
22use crate::request::Request;
23use crate::{channel::Channel, mnt::Mount};
24use crate::{channel::ChannelSender, notify::Notifier};
25
26pub const MAX_WRITE_SIZE: usize = 16 * 1024 * 1024;
30
31const BUFFER_SIZE: usize = MAX_WRITE_SIZE + 4096;
34
35#[derive(Default, Debug, Eq, PartialEq)]
36pub enum SessionACL {
38 All,
40 RootAndOwner,
42 #[default]
44 Owner,
45}
46
47#[derive(Debug)]
49pub struct Session<FS: Filesystem> {
50 pub(crate) filesystem: FS,
52 pub(crate) ch: Channel,
54 mount: Arc<Mutex<Option<(PathBuf, Mount)>>>,
56 pub(crate) allowed: SessionACL,
59 pub(crate) session_owner: u32,
61 pub(crate) proto_major: AtomicU32,
63 pub(crate) proto_minor: AtomicU32,
65 pub(crate) initialized: AtomicBool,
67 pub(crate) destroyed: AtomicBool,
69}
70
71impl<FS: Filesystem> AsFd for Session<FS> {
72 fn as_fd(&self) -> BorrowedFd<'_> {
73 self.ch.as_fd()
74 }
75}
76
77impl<FS: Filesystem> Session<FS> {
78 pub fn new<P: AsRef<Path>>(
80 filesystem: FS,
81 mountpoint: P,
82 options: &[MountOption],
83 ) -> io::Result<Session<FS>> {
84 let mountpoint = mountpoint.as_ref();
85 info!("Mounting {}", mountpoint.display());
86 let (file, mount) = if options.contains(&MountOption::AutoUnmount)
90 && !(options.contains(&MountOption::AllowRoot)
91 || options.contains(&MountOption::AllowOther))
92 {
93 warn!(
94 "Given auto_unmount without allow_root or allow_other; adding allow_other, with userspace permission handling"
95 );
96 let mut modified_options = options.to_vec();
97 modified_options.push(MountOption::AllowOther);
98 Mount::new(mountpoint, &modified_options)?
99 } else {
100 Mount::new(mountpoint, options)?
101 };
102
103 let ch = Channel::new(file);
104 let allowed = if options.contains(&MountOption::AllowRoot) {
105 SessionACL::RootAndOwner
106 } else if options.contains(&MountOption::AllowOther) {
107 SessionACL::All
108 } else {
109 SessionACL::Owner
110 };
111
112 Ok(Session {
113 filesystem,
114 ch,
115 mount: Arc::new(Mutex::new(Some((mountpoint.to_owned(), mount)))),
116 allowed,
117 session_owner: geteuid().as_raw(),
118 proto_major: AtomicU32::new(0),
119 proto_minor: AtomicU32::new(0),
120 initialized: AtomicBool::new(false),
121 destroyed: AtomicBool::new(false),
122 })
123 }
124
125 pub fn from_fd(filesystem: FS, fd: OwnedFd, acl: SessionACL) -> Self {
128 let ch = Channel::new(Arc::new(fd.into()));
129 Session {
130 filesystem,
131 ch,
132 mount: Arc::new(Mutex::new(None)),
133 allowed: acl,
134 session_owner: geteuid().as_raw(),
135 proto_major: AtomicU32::new(0),
136 proto_minor: AtomicU32::new(0),
137 initialized: AtomicBool::new(false),
138 destroyed: AtomicBool::new(false),
139 }
140 }
141
142 pub fn run(&self) -> io::Result<()> {
145 self.run_with_callbacks(|_| {}, |_| {}, false)
146 }
147
148 pub fn run_with_callbacks<FA, FB>(
153 &self,
154 before_dispatch: FB,
155 after_dispatch: FA,
156 clone_fuse_fd: bool,
157 ) -> io::Result<()>
158 where
159 FB: FnMut(&Request<'_>),
160 FA: FnMut(&Request<'_>),
161 {
162 info!(
163 "FUSE channel cloning: {}",
164 if clone_fuse_fd { "enabled" } else { "disabled" }
165 );
166
167 if clone_fuse_fd {
168 let worker_channel = self.ch.clone_channel()?;
171 self.process_requests(&worker_channel, before_dispatch, after_dispatch)
172 } else {
173 self.process_requests(&self.ch, before_dispatch, after_dispatch)
175 }
176 }
177
178 fn process_requests<FA, FB>(
180 &self,
181 channel: &Channel,
182 mut before_dispatch: FB,
183 mut after_dispatch: FA,
184 ) -> io::Result<()>
185 where
186 FB: FnMut(&Request<'_>),
187 FA: FnMut(&Request<'_>),
188 {
189 let mut buffer = vec![0; BUFFER_SIZE];
192 let buf = aligned_sub_buf(
193 buffer.deref_mut(),
194 std::mem::align_of::<abi::fuse_in_header>(),
195 );
196 loop {
197 match channel.receive(buf) {
200 Ok(size) => match Request::new(channel.sender(), &buf[..size]) {
201 Some(req) => {
203 before_dispatch(&req);
204 req.dispatch(self);
205 after_dispatch(&req);
206 }
207 None => break,
209 },
210 Err(err) => match err.raw_os_error() {
211 Some(ENOENT) => continue,
213 Some(EINTR) => continue,
215 Some(EAGAIN) => continue,
217 Some(ENODEV) => break,
219 _ => return Err(err),
221 },
222 }
223 }
224 Ok(())
225 }
226
227 pub fn unmount(&mut self) {
229 drop(std::mem::take(&mut *self.mount.lock().unwrap()));
230 }
231
232 pub fn unmount_callable(&mut self) -> SessionUnmounter {
234 SessionUnmounter {
235 mount: self.mount.clone(),
236 }
237 }
238
239 pub fn notifier(&self) -> Notifier {
241 Notifier::new(self.ch.sender())
242 }
243}
244
245#[derive(Debug)]
246pub struct SessionUnmounter {
248 mount: Arc<Mutex<Option<(PathBuf, Mount)>>>,
249}
250
251impl SessionUnmounter {
252 pub fn unmount(&mut self) -> io::Result<()> {
254 drop(std::mem::take(&mut *self.mount.lock().unwrap()));
255 Ok(())
256 }
257}
258
259fn aligned_sub_buf(buf: &mut [u8], alignment: usize) -> &mut [u8] {
260 let off = alignment - (buf.as_ptr() as usize) % alignment;
261 if off == alignment {
262 buf
263 } else {
264 &mut buf[off..]
265 }
266}
267
268impl<FS: 'static + Filesystem + Send> Session<FS> {
269 pub fn spawn(self) -> io::Result<BackgroundSession> {
271 BackgroundSession::new(self)
272 }
273}
274
275impl<FS: Filesystem> Drop for Session<FS> {
276 fn drop(&mut self) {
277 if !self.destroyed.swap(true, Ordering::SeqCst) {
278 self.filesystem.destroy();
279 }
280
281 if let Some((mountpoint, _mount)) = std::mem::take(&mut *self.mount.lock().unwrap()) {
282 info!("unmounting session at {}", mountpoint.display());
283 }
284 }
285}
286
287pub struct BackgroundSession {
289 pub guard: JoinHandle<io::Result<()>>,
291 sender: ChannelSender,
293 _mount: Option<Mount>,
295}
296
297impl BackgroundSession {
298 pub fn new<FS: Filesystem + Send + 'static>(se: Session<FS>) -> io::Result<BackgroundSession> {
302 let sender = se.ch.sender();
303 let mount = std::mem::take(&mut *se.mount.lock().unwrap()).map(|(_, mount)| mount);
305 let guard = thread::spawn(move || se.run());
306 Ok(BackgroundSession {
307 guard,
308 sender,
309 _mount: mount,
310 })
311 }
312 pub fn join(self) {
314 let Self {
315 guard,
316 sender: _,
317 _mount,
318 } = self;
319 drop(_mount);
320 guard.join().unwrap().unwrap();
321 }
322
323 pub fn notifier(&self) -> Notifier {
325 Notifier::new(self.sender.clone())
326 }
327}
328
329impl fmt::Debug for BackgroundSession {
332 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
333 write!(f, "BackgroundSession {{ guard: JoinGuard<()> }}",)
334 }
335}