1use std::io;
9use std::path::Path;
10use std::sync::Arc;
11
12use log::error;
13use log::warn;
14use nix::unistd::Uid;
15use nix::unistd::geteuid;
16
17use crate::Config;
18use crate::KernelConfig;
19use crate::MountOption;
20use crate::Request;
21use crate::SessionACL;
22use crate::channel_async::AsyncChannel;
23use crate::lib_async::AsyncFilesystem;
24use crate::ll;
25use crate::ll::AnyRequest;
26use crate::ll::Version;
27use crate::ll::fuse_abi as abi;
28use crate::ll::reply::Response;
29use crate::ll::request_async::AsyncRequestWithSender;
30use crate::mnt::AsyncMount;
31use crate::mnt::mount_options::check_option_conflicts;
32use crate::read_buf::FuseReadBuf;
33use crate::session::MAX_WRITE_SIZE;
34use parking_lot::Mutex;
35
36type DropTx<T> = Arc<Mutex<Option<tokio::sync::mpsc::Sender<T>>>>;
37
38#[derive(Debug)]
40pub(crate) struct AsyncSessionGuard<FS: AsyncFilesystem> {
41 pub(crate) fs: Option<FS>,
42 pub(crate) unmount_tx: DropTx<()>,
43}
44
45impl<FS: AsyncFilesystem> AsyncSessionGuard<FS> {
47 fn destroy(&mut self) {
48 if let Some(tx) = self.unmount_tx.lock().take() {
49 tx.try_send(()).ok();
50 }
51 if let Some(mut fs) = self.fs.take() {
52 fs.destroy();
53 }
54 }
55}
56
57impl<FS: AsyncFilesystem> Drop for AsyncSessionGuard<FS> {
59 fn drop(&mut self) {
60 self.destroy();
61 }
62}
63
64#[derive(Default, Debug)]
67pub struct AsyncSessionBuilder<FS: AsyncFilesystem> {
68 filesystem: Option<FS>,
69 mountpoint: Option<String>,
70 options: Option<Config>,
71}
72
73impl<FS: AsyncFilesystem> AsyncSessionBuilder<FS> {
74 pub fn new() -> Self {
76 Self {
77 filesystem: None,
78 mountpoint: None,
79 options: None,
80 }
81 }
82
83 pub fn filesystem(mut self, fs: FS) -> Self {
85 self.filesystem = Some(fs);
86 self
87 }
88
89 pub fn mountpoint(mut self, mountpoint: impl AsRef<Path>) -> Self {
91 self.mountpoint = Some(mountpoint.as_ref().to_string_lossy().to_string());
92 self
93 }
94
95 pub fn options(mut self, options: Config) -> io::Result<Self> {
97 check_option_conflicts(&options)?;
98
99 if options.mount_options.contains(&MountOption::AutoUnmount)
101 && options.acl == SessionACL::Owner
102 {
103 return Err(io::Error::new(
104 io::ErrorKind::InvalidInput,
105 "auto_unmount requires acl != Owner".to_string(),
106 ));
107 }
108
109 self.options = Some(options);
110 Ok(self)
111 }
112
113 pub async fn build(self) -> io::Result<AsyncSession<FS>> {
115 let filesystem = self.filesystem.ok_or_else(|| {
116 io::Error::new(io::ErrorKind::InvalidInput, "`filesystem` is required")
117 })?;
118 let mountpoint = self.mountpoint.ok_or_else(|| {
119 io::Error::new(io::ErrorKind::InvalidInput, "`mountpoint` is required")
120 })?;
121 let options = self
122 .options
123 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "`options` are required"))?;
124
125 AsyncSession::init(filesystem, mountpoint, &options).await
126 }
127}
128
129#[derive(Debug)]
131pub struct AsyncSession<FS: AsyncFilesystem> {
132 pub(crate) guard: AsyncSessionGuard<FS>,
134 pub(crate) ch: AsyncChannel,
136 pub(crate) allowed: SessionACL,
139 pub(crate) session_owner: Uid,
141 pub(crate) proto_version: Option<Version>,
144 pub(crate) config: Config,
147}
148
149impl<FS: AsyncFilesystem> AsyncSession<FS> {
150 async fn init<P: AsRef<Path>>(
155 filesystem: FS,
156 mountpoint: P,
157 options: &Config,
158 ) -> io::Result<Self> {
159 let mountpoint = mountpoint.as_ref();
160
161 let mut mount = AsyncMount::new();
163 mount = mount
164 .mount(mountpoint, &options.mount_options, options.acl)
165 .await?;
166 let file = mount.dev_fuse().ok_or_else(|| {
167 io::Error::new(
168 io::ErrorKind::Other,
169 "Failed to get /dev/fuse file descriptor from mount",
170 )
171 })?;
172 let ch = AsyncChannel::new(file.clone());
173
174 let (unmount_tx, mut unmount_rx) = tokio::sync::mpsc::channel::<()>(1);
176 tokio::spawn({
177 let mount = Arc::new(Mutex::new(Some(mount)));
178 async move {
179 let _ = unmount_rx.recv().await;
181 if let Some(mount) = mount.lock().take() {
182 drop(mount);
183 }
184 }
185 });
186
187 let mut session = AsyncSession {
188 guard: AsyncSessionGuard {
189 fs: Some(filesystem),
190 unmount_tx: Arc::new(Mutex::new(Some(unmount_tx))),
191 },
192 ch,
193 allowed: options.acl,
194 session_owner: geteuid(),
195 proto_version: None,
196 config: options.clone(),
197 };
198
199 session.handshake().await?;
200
201 Ok(session)
202 }
203
204 pub async fn run(self) -> io::Result<()> {
210 let AsyncSession {
211 guard,
212 ch,
213 allowed,
214 session_owner,
215 proto_version: _,
216 config,
217 } = self;
218 let mut filesystem = Arc::new(guard);
219
220 let n_threads = config.n_threads.unwrap_or(1);
221 if n_threads == 0 {
222 return Err(io::Error::other("n_threads"));
223 }
224 let Some(n_threads_minus_one) = n_threads.checked_sub(1) else {
225 return Err(io::Error::other("n_threads"));
226 };
227 if !cfg!(target_os = "linux") && n_threads != 1 {
228 return Err(io::Error::other(
229 "multi-threaded async sessions are only supported on Linux",
230 ));
231 }
232
233 let mut channels = Vec::with_capacity(n_threads);
236 for _ in 0..n_threads_minus_one {
237 if config.clone_fd {
238 #[cfg(target_os = "linux")]
239 {
240 channels.push(ch.clone_fd().await?);
241 continue;
242 }
243 #[cfg(not(target_os = "linux"))]
244 {
245 return Err(io::Error::other("clone_fd is only supported on Linux"));
246 }
247 } else {
248 channels.push(ch.clone());
249 }
250 }
251 channels.push(ch);
252
253 let mut tasks = Vec::with_capacity(n_threads);
255 for (i, ch) in channels.into_iter().enumerate() {
256 let thread_name = format!("fuser-async-{i}");
257 let event_loop = AsyncSessionEventLoop {
258 thread_name: thread_name.clone(),
259 filesystem: filesystem.clone(),
260 ch,
261 allowed,
262 session_owner,
263 };
264 tasks.push(tokio::spawn(async move { event_loop.event_loop().await }));
265 }
266
267 let mut reply: io::Result<()> = Ok(());
270 for task in tasks {
271 let res = match task.await {
272 Ok(res) => res,
273 Err(_) => {
274 return Err(io::Error::other("event loop task panicked"));
275 }
276 };
277 if let Err(e) = res {
278 if reply.is_ok() {
279 reply = Err(e);
280 }
281 }
282 }
283
284 let Some(filesystem) = Arc::get_mut(&mut filesystem) else {
286 return Err(io::Error::other(
287 "BUG: must have one refcount for filesystem",
288 ));
289 };
290 filesystem.destroy();
291
292 reply
293 }
294
295 async fn handshake(&mut self) -> io::Result<()> {
299 let mut buf = vec![0u8; MAX_WRITE_SIZE];
300 let sender = self.ch.sender();
301 let Some(fs) = &mut self.guard.fs else {
302 return Err(io::Error::new(
303 io::ErrorKind::Other,
304 "Filesystem was not available during handshake",
305 ));
306 };
307
308 loop {
311 let size = match self.ch.receive_retrying(&mut buf).await {
312 Ok(size) => size,
313 Err(nix::errno::Errno::ENODEV) => {
314 return Err(io::Error::new(
315 io::ErrorKind::NotConnected,
316 "FUSE device disconnected during handshake",
317 ));
318 }
319 Err(err) => return Err(err.into()),
320 };
321 let request = AnyRequest::try_from(&buf[..size])
322 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
323
324 let init = match request.operation() {
326 Ok(ll::Operation::Init(init)) => init,
327 Ok(_) => {
328 error!("Received non-init FUSE operation before init: {}", request);
329 ll::ResponseErrno(ll::Errno::EIO)
330 .send_reply(&sender, request.unique())
331 .await
332 .map_err(|e| {
333 io::Error::new(e.kind(), format!("send handshake error reply: {e}"))
334 })?;
335 return Err(io::Error::new(
336 io::ErrorKind::InvalidData,
337 "Received non-init FUSE operation during handshake",
338 ));
339 }
340 Err(_) => {
341 ll::ResponseErrno(ll::Errno::EIO)
342 .send_reply(&sender, request.unique())
343 .await
344 .map_err(|e| {
345 io::Error::new(e.kind(), format!("send handshake error reply: {e}"))
346 })?;
347 return Err(io::Error::new(
348 io::ErrorKind::InvalidData,
349 "Failed to parse FUSE operation",
350 ));
351 }
352 };
353 let v = init.version();
354
355 if v.0 > abi::FUSE_KERNEL_VERSION {
357 init.reply_version_only()
358 .send_reply(&sender, request.unique())
359 .await?;
360 continue;
361 }
362 if v < Version(7, 6) {
363 ll::ResponseErrno(ll::Errno::EPROTO)
364 .send_reply(&sender, request.unique())
365 .await
366 .map_err(|e| {
367 io::Error::new(e.kind(), format!("send handshake error reply: {e}"))
368 })?;
369 return Err(io::Error::new(
370 io::ErrorKind::Unsupported,
371 format!("Unsupported FUSE ABI version {v}"),
372 ));
373 }
374
375 let mut config = KernelConfig::new(init.capabilities(), init.max_readahead(), v);
378 if let Err(error) = fs
379 .init(Request::ref_cast(request.header()), &mut config)
380 .await
381 {
382 let errno = ll::Errno::from_i32(error.raw_os_error().unwrap_or(0));
383 ll::ResponseErrno(errno)
384 .send_reply(&sender, request.unique())
385 .await
386 .map_err(|e| {
387 io::Error::new(e.kind(), format!("send handshake error reply: {e}"))
388 })?;
389 return Err(error);
390 }
391 self.proto_version = Some(v);
392 let response = init.reply(&config);
393 response
394 .send_reply(&sender, request.unique())
395 .await
396 .map_err(|e| io::Error::new(e.kind(), format!("send init reply: {e}")))?;
397 return Ok(());
398 }
399 }
400}
401
402pub(crate) struct AsyncSessionEventLoop<FS: AsyncFilesystem> {
403 pub(crate) thread_name: String,
405 pub(crate) filesystem: Arc<AsyncSessionGuard<FS>>,
406 pub(crate) ch: AsyncChannel,
407 pub(crate) allowed: SessionACL,
408 pub(crate) session_owner: Uid,
409}
410
411impl<FS: AsyncFilesystem> Clone for AsyncSessionEventLoop<FS> {
412 fn clone(&self) -> Self {
413 Self {
414 thread_name: self.thread_name.clone(),
415 filesystem: self.filesystem.clone(),
416 ch: self.ch.clone(),
417 allowed: self.allowed,
418 session_owner: self.session_owner,
419 }
420 }
421}
422
423impl<FS: AsyncFilesystem> AsyncSessionEventLoop<FS> {
424 async fn event_loop(&self) -> io::Result<()> {
425 let mut buf = FuseReadBuf::new();
426 let buf = buf.as_mut();
427
428 loop {
429 let resp_size = match self.ch.receive_retrying(buf).await {
430 Ok(res) => res,
431 Err(nix::errno::Errno::ENODEV) => return Ok(()),
433 Err(err) => {
434 return Err(io::Error::new(
435 io::ErrorKind::Other,
436 format!("receive_retrying: {err:?}"),
437 ));
438 }
439 };
440
441 let sender = self.ch.sender();
442 let session = self.clone();
443 if let Ok(request) = AsyncRequestWithSender::new(sender, &buf[..resp_size]) {
444 tokio::spawn(async move {
445 request.dispatch(&session).await;
446 });
447 } else {
448 warn!("Received invalid request, skipping...");
449 }
450 }
451 }
452}