async_fusex/
session.rs

1//! The implementation of FUSE session
2
3use std::fs::File;
4use std::io::Read;
5use std::os::fd::FromRawFd;
6use std::os::unix::io::RawFd;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use std::thread;
10use std::time::{Duration, UNIX_EPOCH};
11
12use aligned_utils::bytes::AlignedBytes;
13use anyhow::{anyhow, Context};
14use clippy_utilities::Cast;
15use crossbeam_channel::{Receiver, Sender};
16use crossbeam_utils::atomic::AtomicCell;
17use nix::errno::Errno;
18use nix::sys::stat::SFlag;
19use nix::unistd;
20use tokio::runtime::Handle;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, error, info, instrument};
23
24use super::context::ProtoVersion;
25use super::file_system::{FileSystem};
26use super::fuse_reply::{
27    ReplyAttr, ReplyBMap, ReplyCreate, ReplyData, ReplyDirectory, ReplyEmpty, ReplyEntry,
28    ReplyInit, ReplyLock, ReplyOpen, ReplyStatFs, ReplyWrite, ReplyXAttr,
29};
30use super::fuse_request::{Operation, Request};
31use super::{mount, FuseFs};
32#[cfg(feature = "abi-7-23")]
33use super::protocol::FATTR_CTIME;
34#[cfg(feature = "abi-7-9")]
35use super::protocol::FATTR_LOCKOWNER; // {FATTR_ATIME_NOW, FATTR_MTIME_NOW};
36use super::protocol::{
37    FuseInitIn, FuseInitOut, FuseSetXAttrIn, FATTR_ATIME, FATTR_FH, FATTR_GID, FATTR_MODE,
38    FATTR_MTIME, FATTR_SIZE, FATTR_UID, FUSE_ASYNC_READ, FUSE_KERNEL_MINOR_VERSION,
39    FUSE_KERNEL_VERSION, FUSE_RELEASE_FLUSH,
40};
41use crate::fs_util::{CreateParam, FileLockParam, RenameParam, SetAttrParam};
42
/// Capability flags this implementation is willing to enable. The INIT
/// handler masks the flags offered by the kernel with this value. Currently
/// only asynchronous reads are enabled.
#[cfg(target_os = "linux")]
const INIT_FLAGS: u32 = FUSE_ASYNC_READ;
// TODO: Add FUSE_EXPORT_SUPPORT and FUSE_BIG_WRITES (requires ABI 7.10)

/// The max size of write requests from the kernel. The absolute minimum is 4k,
/// FUSE recommends at least 128k, max 16M. The FUSE default is 128k on Linux.
#[cfg(target_os = "linux")]
const MAX_WRITE_SIZE: u32 = 128 * 1024;

/// Size of the buffer for reading a request from the kernel. Since the kernel
/// may send up to `MAX_WRITE_SIZE` bytes in a write request, we use that value
/// plus some extra space for the request headers.
const BUFFER_SIZE: u32 = MAX_WRITE_SIZE + 512;

/// Alignment of the request buffers: `PAGE_SIZE` (4 KiB).
const PAGE_SIZE: usize = 4096;
/// Max number of background requests pending in processing. It is reported to
/// the kernel as `max_background` in the INIT reply and is also the number of
/// `(fd, buffer)` pairs placed in the buffer pool. It must be at least 4,
/// otherwise a deadlock may occur.
const MAX_BACKGROUND: u16 = 10; // TODO: set to larger value when release
/// The max number of FUSE device reader threads.
const MAX_FUSE_READER: usize = 2; // TODO: make it custom
65
66/// The implementation of fuse fd clone.
67/// This module is just for avoiding the `missing_docs` of `ioctl_read` macro.
68#[allow(missing_docs)] // Raised by `ioctl_read!`
69mod _fuse_fd_clone {
70    use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
71
72    use clippy_utilities::Cast;
73    use nix::fcntl::{self, FcntlArg, FdFlag, OFlag};
74    use nix::ioctl_read;
75    use nix::sys::stat::Mode;
76    ioctl_read!(fuse_fd_clone_impl, 229, 0, u32);
77
78    /// Clones a FUSE session fd into a FUSE worker fd.
79    ///
80    /// # Safety
81    /// Behavior is undefined if any of the following conditions are violated:
82    ///
83    /// - `session_fd` must be a valid file descriptor to an open FUSE device.
84    #[allow(clippy::unnecessary_safety_comment)]
85    pub unsafe fn fuse_fd_clone(session_fd: RawFd) -> nix::Result<RawFd> {
86        let devname = "/dev/fuse";
87        let cloned_fd = fcntl::open(devname, OFlag::O_RDWR | OFlag::O_CLOEXEC, Mode::empty())?;
88        // use `OwnedFd` here is just to release the fd when error occurs
89        // SAFETY: the `cloned_fd` is just opened
90        let cloned_fd = unsafe {
91            OwnedFd::from_raw_fd(cloned_fd)
92        };
93
94        fcntl::fcntl(cloned_fd.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC))?;
95
96        let mut result_fd: u32 = session_fd.cast();
97        // SAFETY: `cloned_fd` is ensured to be valid, and `&mut result_fd` is a valid
98        // pointer to a value on stack
99        unsafe {
100            fuse_fd_clone_impl(cloned_fd.as_raw_fd(), &mut result_fd)?
101        };
102        Ok(cloned_fd.into_raw_fd()) // use `into_raw_fd` to transfer the
103                                    // ownership of the fd
104    }
105}
106
107use _fuse_fd_clone::fuse_fd_clone;
108use crate::de::DeserializeError;
109
110/// A loop to read requests from FUSE device continuously
111#[allow(clippy::needless_pass_by_value)]
112fn fuse_device_reader(
113    buffer_tx: Sender<(File, AlignedBytes)>,
114    buffer_rx: Receiver<(File, AlignedBytes)>,
115    runtime_handle: Handle,
116    proto_version: ProtoVersion,
117    fs: Arc<dyn FileSystem + Send + Sync>,
118) {
119    loop {
120        let Ok((mut file, mut buffer)) = buffer_rx.recv() else {
121            error!("The channel of buffer is exhausted and closed.");
122            return;
123        };
124
125        let size = match file.read(&mut buffer) {
126            Ok(size) => size,
127            Err(e) => {
128                let errno = e.raw_os_error().map(Errno::from_raw);
129                match errno {
130                    None => {
131                        panic!("Failed to get the errno on reading failure. The error is {e}");
132                    }
133                    // Operation interrupted. Accordingly to FUSE, this is safe to retry
134                    Some(Errno::ENOENT) => {
135                        info!("operation interrupted, retry.");
136                        buffer_tx.send((file, buffer)).unwrap_or_else(|_| {
137                            unreachable!("The buffer channel is not to be closed.")
138                        });
139                        continue;
140                    }
141                    // Interrupted system call, retry
142                    Some(Errno::EINTR) => {
143                        info!("interrupted system call, retry");
144                        buffer_tx.send((file, buffer)).unwrap_or_else(|_| {
145                            unreachable!("The buffer channel is not to be closed.")
146                        });
147                        continue;
148                    }
149                    // Explicitly try again
150                    Some(Errno::EAGAIN) => {
151                        info!("Explicitly retry");
152                        buffer_tx.send((file, buffer)).unwrap_or_else(|_| {
153                            unreachable!("The buffer channel is not to be closed.")
154                        });
155                        continue;
156                    }
157                    // Filesystem was unmounted, quit the loop
158                    Some(Errno::ENODEV) => {
159                        info!("filesystem destroyed, quit the run loop");
160                        return;
161                    }
162                    // Unhandled error
163                    Some(errno) => {
164                        panic!(
165                            "non-recoverable io error when read FUSE device, \
166                            the error is: {errno}",
167                        );
168                    }
169                }
170            }
171        };
172
173        runtime_handle.block_on(async{
174            process_fuse_request(
175                buffer,
176                size,
177                file,
178                Arc::clone(&fs),
179                buffer_tx.clone(),
180                proto_version,
181            ).await
182        });
183        // if spawn_result.is_err() {
184        //     info!("Try to spawn task of `FuseRequest` after shutdow.");
185        //     return;
186        // }
187    }
188}
189
190/// Process one FUSE request
191async fn process_fuse_request(
192    byte_buffer: AlignedBytes,
193    read_size: usize,
194    mut file: File,
195    fs: Arc<dyn FileSystem + Send + Sync + 'static>,
196    sender: Sender<(File, AlignedBytes)>,
197    proto_version: ProtoVersion,
198) {
199    let bytes = byte_buffer
200        .get(..read_size)
201        .unwrap_or_else(|| panic!("failed to read {read_size} bytes from the buffer",));
202    let fuse_req = match Request::new(bytes, proto_version) {
203        // Dispatch request
204        Ok(r) => r,
205        // Quit on illegal request
206        Err(e) => {
207            if let &DeserializeError::UnknownOpCode { code, unique } = &e {
208                error!("Unknown operation code found: {code}, with context: {e}");
209                let unique = unique.unwrap_or_else(|| {
210                    unreachable!("A `unique` must be filled in by deserializer.")
211                });
212                ReplyEmpty::new(unique, &mut file)
213                    .error_code(Errno::ENOSYS)
214                    .await
215                    .unwrap_or_else(|reply_err| {
216                        panic!("Failed to reply an error code: {reply_err}.")
217                    });
218                sender.send((file, byte_buffer)).unwrap_or_else(|_| {
219                    error!("The buffer pool is closed.");
220                });
221                return;
222            }
223
224            // TODO: graceful handle request build failure
225            panic!("failed to build FUSE request, the error is: {e}");
226        }
227    };
228    debug!("received FUSE req={}", fuse_req);
229    let res = dispatch(&fuse_req, &mut file, fs).await;
230    if let Err(e) = res {
231        panic!(
232            "failed to process req={:?}, the error is: {}",
233            fuse_req,
234            crate::util::format_nix_error(e), // TODO: refactor format_nix_error()
235        );
236    }
237    let res = sender.send((file, byte_buffer));
238    if let Err(e) = res {
239        panic!("failed to put the buffer back to buffer pool, the error is: {e}",);
240    }
241}
242
/// FUSE session
///
/// Bundles the FUSE device fd, the negotiated protocol version, the mount
/// path and the file system implementation that serves requests. Dropping a
/// session unmounts `mount_path`.
#[allow(missing_debug_implementations)]
pub struct Session<F: FileSystem + Send + Sync + 'static> {
    /// FUSE device fd; closed when the last reference is dropped
    fuse_fd: Arc<FuseFd>,
    /// Kernel FUSE protocol version, stored once the INIT request is handled
    proto_version: AtomicCell<ProtoVersion>,
    /// Mount path, as given when the session was created
    mount_path: PathBuf,
    /// The underlying FUSE file system
    filesystem: Arc<F>,
}
256pub async fn new_session(mount_path: &Path, fs: FuseFs)->anyhow::Result<Session<FuseFs>>{
257    let fuse_fd = mount::mount(mount_path).await.context("Failed to mount FUSE")?;
258
259    Ok(Session{
260        fuse_fd: Arc::new(FuseFd(fuse_fd)),
261        proto_version: AtomicCell::new(ProtoVersion::UNSPECIFIED),
262        mount_path: mount_path.to_owned(),
263        filesystem: Arc::new(fs),
264    })
265}
266
267/// FUSE device fd
268#[derive(Debug)]
269struct FuseFd(RawFd);
270
271impl Drop for FuseFd {
272    fn drop(&mut self) {
273        println!("Dropping FUSE fd {}", self.0);
274        unistd::close(self.0).ok();
275    }
276}
277
278impl<F: FileSystem + Send + Sync + 'static> Drop for Session<F> {
279    fn drop(&mut self) {
280        println!("Dropping FUSE session");
281        futures::executor::block_on(async {
282            let mount_path = &self.mount_path;
283            let res = mount::umount(mount_path).await;
284            println!("umount result: {:?}", res);
285            match res {
286                Ok(..) => info!("Session::drop() successfully umount {:?}", mount_path),
287                Err(e) => error!(
288                    "Session::drop() failed to umount {:?}, the error is: {}",
289                    mount_path, e,
290                ),
291            };
292        });
293    }
294}
295
296impl<F: FileSystem + Send + Sync + 'static> Session<F> {
297    /// Get FUSE device fd
298    #[inline]
299    pub fn dev_fd(&self) -> RawFd {
300        self.fuse_fd.0
301    }
302
303    /// Run the FUSE session
304    #[allow(clippy::arithmetic_side_effects, clippy::pattern_type_mismatch)] // The `select!` macro will generate code that goes against these rules.
305    pub async fn run(self, token: CancellationToken) -> anyhow::Result<()> {
306        // For recycling the buffers used by process_fuse_request.
307        let (pool_sender, pool_receiver) = self
308            .setup_buffer_pool()
309            .await
310            .context("failed to setup buffer pool")?;
311
312        for _ in 0..MAX_FUSE_READER {
313            let pool_tx = pool_sender.clone();
314            let pool_rx = pool_receiver.clone();
315            // let gc_handle = self.fuse_request_spawn_handle.clone();
316            let handle = Handle::current();
317            let fs = Arc::clone(&self.filesystem);
318            let protocol_version = self.proto_version.load();
319            // The `JoinHandle` is ignored
320            thread::spawn(move || {
321                fuse_device_reader(pool_tx, pool_rx, handle, protocol_version, fs);
322            });
323        }
324
325        drop(pool_receiver);
326
327        token.cancelled().await;
328        info!("Async FUSE session exits.");
329
330        Ok(())
331    }
332
333    /// Setup buffer pool
334    async fn setup_buffer_pool(
335        &self,
336    ) -> anyhow::Result<(Sender<(File, AlignedBytes)>, Receiver<(File, AlignedBytes)>)> {
337        let (pool_sender, pool_receiver) =
338            crossbeam_channel::bounded::<(File, AlignedBytes)>(MAX_BACKGROUND.into());
339
340        for _ in 0..MAX_BACKGROUND {
341            let buf = AlignedBytes::new_zeroed(BUFFER_SIZE.cast(), PAGE_SIZE);
342            let session_fd = self.dev_fd();
343
344            let file = unsafe {
345                // SAFETY: We assume that the fuse session fd is valid.
346                let worker_fd = fuse_fd_clone(session_fd)?;
347                // SAFETY: The worker fd is just cloned.
348                File::from_raw_fd(worker_fd)
349            };
350
351            let res = pool_sender.send((file, buf));
352            if let Err(e) = res {
353                panic!(
354                    "failed to insert buffer to buffer pool when initializing, the error is: {e}",
355                );
356            }
357        }
358
359        let (mut file, mut byte_buf) = pool_receiver.recv()?;
360        let (read_result, mut file, byte_buf) = tokio::task::spawn_blocking(move || {
361            let res = file.read(&mut byte_buf);
362            (res, file, byte_buf)
363        })
364        .await?;
365        if let Ok(read_size) = read_result {
366            debug!("read successfully {} byte data from FUSE device", read_size);
367            let bytes = byte_buf.get(..read_size).unwrap_or_else(|| {
368                panic!(
369                    "read_size is greater than buffer size: read_size = {}, buffer size = {}",
370                    read_size,
371                    byte_buf.len()
372                )
373            });
374            if let Ok(req) = Request::new(bytes, self.proto_version.load()) {
375                if let Operation::Init { arg } = *req.operation() {
376                    let filesystem = Arc::clone(&self.filesystem);
377                    self.init(arg, &req, &*filesystem, &mut file).await?;
378                }
379            }
380        }
381        pool_sender
382            .send((file, byte_buf))
383            .context("failed to put buffer back to buffer pool after FUSE init")?;
384
385        Ok((pool_sender, pool_receiver))
386    }
387
388    /// Initialize FUSE session
389    #[allow(single_use_lifetimes)] // false positive
390    async fn init<'a>(
391        &self,
392        arg: &'_ FuseInitIn,
393        req: &'_ Request<'a>,
394        fs: &'_ (dyn FileSystem + Send + Sync + 'static),
395        file: &mut File,
396    ) -> anyhow::Result<()> {
397        debug!("Init args={:?}", arg);
398        // TODO: rewrite init based on do_init() in fuse_lowlevel.c
399        // https://github.com/libfuse/libfuse/blob/master/lib/fuse_lowlevel.c#L1892
400        let reply = ReplyInit::new(req.unique(), file);
401        // We don't support ABI versions before 7.8
402        if arg.major < 7 || (arg.major == 7 && arg.minor < 8) {
403            error!("Unsupported FUSE ABI version={}.{}", arg.major, arg.minor);
404            reply.error_code(Errno::EPROTO).await?;
405            return Err(anyhow!("FUSE ABI version too low"));
406        }
407        // Call filesystem init method and give it a chance to return an error
408        let filesystem = fs;
409        let init_res = filesystem.init(req).await;
410        if let Err(err) = init_res {
411            reply.error_code(Errno::ENOSYS).await?;
412            return Err(anyhow!("user defined init failed, the error is: {}", err,));
413        }
414        let flags = arg.flags & INIT_FLAGS; // TODO: handle init flags properly
415        #[cfg(not(feature = "abi-7-13"))]
416        let unused = 0_u32;
417        #[cfg(feature = "abi-7-13")]
418        let congestion_threshold = 10_u16; // TODO: set congestion threshold
419        #[cfg(feature = "abi-7-23")]
420        let time_gran = 1_u32; // TODO: set time_gran
421        #[cfg(all(feature = "abi-7-23", not(feature = "abi-7-28")))]
422        let unused = [0_u32; 9];
423        #[cfg(feature = "abi-7-28")]
424        let max_pages = 0_u16; // TODO: max_pages = (max_write - 1) / getpagesize() + 1;
425        #[cfg(feature = "abi-7-28")]
426        let padding = 0_u16;
427        #[cfg(feature = "abi-7-28")]
428        let unused = [0_u32; 8];
429        // Reply with our desired version and settings. If the kernel supports a
430        // larger major version, it'll re-send a matching init message. If it
431        // supports only lower major versions, we replied with an error above.
432        reply
433            .init(FuseInitOut {
434                major: FUSE_KERNEL_VERSION,
435                minor: FUSE_KERNEL_MINOR_VERSION, /* Do not change minor version, otherwise
436                                                   * unknown panic */
437                max_readahead: arg.max_readahead, // accept FUSE kernel module max_readahead
438                flags,                            /* TODO: use features given in INIT_FLAGS and
439                                                   * reported as capable */
440                #[cfg(not(feature = "abi-7-13"))]
441                unused,
442                #[cfg(feature = "abi-7-13")]
443                max_background: MAX_BACKGROUND,
444                #[cfg(feature = "abi-7-13")]
445                congestion_threshold,
446                max_write: MAX_WRITE_SIZE,
447                #[cfg(feature = "abi-7-23")]
448                time_gran,
449                #[cfg(all(feature = "abi-7-23", not(feature = "abi-7-28")))]
450                unused,
451                #[cfg(feature = "abi-7-28")]
452                max_pages,
453                #[cfg(feature = "abi-7-28")]
454                padding,
455                #[cfg(feature = "abi-7-28")]
456                unused,
457            })
458            .await?;
459        debug!(
460            "INIT response: ABI version={}.{}, flags={:#x}, max readahead={}, max write={}",
461            FUSE_KERNEL_VERSION,
462            FUSE_KERNEL_MINOR_VERSION,
463            flags,
464            arg.max_readahead,
465            MAX_WRITE_SIZE,
466        );
467
468        // Store the kernel FUSE major and minor version
469        self.proto_version.store(ProtoVersion {
470            major: arg.major,
471            minor: arg.minor,
472        });
473
474        Ok(())
475    }
476}
477
478/// Dispatch request to the filesystem
479/// This calls the appropriate filesystem operation method for the
480/// request and sends back the returned reply to the kernel
481#[allow(clippy::too_many_lines)]
482#[instrument(name="request",skip(req, file, fs), fields(fuse_id =req.unique(),ino=req.nodeid(), op=%req.operation(), len=req.len()),ret)]
483async fn dispatch<'a>(
484    req: &'a Request<'a>,
485    file: &mut File,
486    fs: Arc<dyn FileSystem + Send + Sync + 'static>,
487) -> nix::Result<usize> {
488    let result = match *req.operation() {
489        // Filesystem initialization
490        Operation::Init { .. } => panic!("FUSE should have already initialized"),
491
492        // Filesystem destroyed
493        Operation::Destroy => {
494            fs.destroy(req).await;
495            let reply = ReplyEmpty::new(req.unique(), file);
496            reply.ok().await
497        }
498
499        Operation::Interrupt { arg } => {
500            fs.interrupt(req, arg.unique).await; // No reply
501            Ok(0)
502        }
503
504        Operation::Lookup { name } => {
505            let reply = ReplyEntry::new(req.unique(), file);
506            fs.lookup(req, req.nodeid(), name, reply).await
507        }
508        Operation::Forget { arg } => {
509            fs.forget(req, arg.nlookup).await; // No reply
510            Ok(0)
511        }
512        Operation::GetAttr => {
513            let reply = ReplyAttr::new(req.unique(), file);
514            fs.getattr(req, reply).await
515        }
516        Operation::SetAttr { arg } => {
517            #[cfg(feature = "abi-7-9")]
518            use std::time::SystemTime;
519
520            #[cfg(feature = "abi-7-9")]
521            use super::protocol::{FATTR_ATIME_NOW, FATTR_MTIME_NOW};
522
523            let mode = match arg.valid & FATTR_MODE {
524                0 => None,
525                _ => Some(arg.mode),
526            };
527            let u_id = match arg.valid & FATTR_UID {
528                0 => None,
529                _ => Some(arg.uid),
530            };
531            let g_id = match arg.valid & FATTR_GID {
532                0 => None,
533                _ => Some(arg.gid),
534            };
535            let size = match arg.valid & FATTR_SIZE {
536                0 => None,
537                _ => Some(arg.size),
538            };
539            let a_time = match arg.valid & FATTR_ATIME {
540                0 => None,
541                _ => Some(UNIX_EPOCH + Duration::new(arg.atime, arg.atimensec)),
542            };
543            let m_time = match arg.valid & FATTR_MTIME {
544                0 => None,
545                _ => Some(UNIX_EPOCH + Duration::new(arg.mtime, arg.mtimensec)),
546            };
547            let fh = match arg.valid & FATTR_FH {
548                0 => None,
549                _ => Some(arg.fh),
550            };
551
552            #[cfg(feature = "abi-7-9")]
553            let a_time = match arg.valid & FATTR_ATIME_NOW {
554                0 => a_time,
555                _ => Some(SystemTime::now()),
556            };
557            #[cfg(feature = "abi-7-9")]
558            let m_time = match arg.valid & FATTR_MTIME_NOW {
559                0 => m_time,
560                _ => Some(SystemTime::now()),
561            };
562
563            #[cfg(feature = "abi-7-9")]
564            let lock_owner = match arg.valid & FATTR_LOCKOWNER {
565                0 => None,
566                _ => Some(arg.lock_owner),
567            };
568            #[cfg(feature = "abi-7-23")]
569            let c_time = match arg.valid & FATTR_CTIME {
570                0 => None,
571                _ => Some(UNIX_EPOCH + Duration::new(arg.ctime, arg.ctimensec)),
572            };
573
574            let reply = ReplyAttr::new(req.unique(), file);
575            let param = SetAttrParam {
576                valid: arg.valid,
577                fh,
578                mode,
579                u_id,
580                g_id,
581                size,
582                #[cfg(feature = "abi-7-9")]
583                lock_owner,
584                a_time,
585                m_time,
586                #[cfg(feature = "abi-7-23")]
587                c_time,
588            };
589            fs.setattr(req, param, reply).await
590        }
591        Operation::ReadLink => {
592            let reply = ReplyData::new(req.unique(), file);
593            fs.readlink(req, reply).await
594        }
595        Operation::MkNod { arg, name } => {
596            let param = CreateParam {
597                parent: req.nodeid(),
598                name: name.to_owned(),
599                mode: arg.mode,
600                rdev: arg.rdev,
601                uid: req.uid(),
602                gid: req.gid(),
603                node_type: SFlag::S_IFREG,
604                link: None,
605            };
606            let reply = ReplyEntry::new(req.unique(), file);
607            fs.mknod(req, param, reply).await
608        }
609        Operation::MkDir { arg, name } => {
610            let reply = ReplyEntry::new(req.unique(), file);
611            fs.mkdir(req, req.nodeid(), name, arg.mode, reply).await
612        }
613        Operation::Unlink { name } => {
614            let reply = ReplyEmpty::new(req.unique(), file);
615            fs.unlink(req, req.nodeid(), name, reply).await
616        }
617        Operation::RmDir { name } => {
618            let reply = ReplyEmpty::new(req.unique(), file);
619            fs.rmdir(req, req.nodeid(), name, reply).await
620        }
621        Operation::SymLink { name, link } => {
622            let reply = ReplyEntry::new(req.unique(), file);
623            fs.symlink(req, req.nodeid(), name, Path::new(link), reply)
624                .await
625        }
626        Operation::Rename {
627            arg,
628            oldname,
629            newname,
630        } => {
631            let reply = ReplyEmpty::new(req.unique(), file);
632            let param = RenameParam {
633                old_parent: req.nodeid(),
634                old_name: oldname.to_owned(),
635                new_parent: arg.newdir,
636                new_name: newname.to_owned(),
637                flags: 0,
638            };
639            fs.rename(req, param, reply).await
640        }
641        Operation::Link { arg, name } => {
642            let reply = ReplyEntry::new(req.unique(), file);
643            fs.link(req, arg.oldnodeid, name, reply).await
644        }
645        Operation::Open { arg } => {
646            let reply = ReplyOpen::new(req.unique(), file);
647            fs.open(req, arg.flags, reply).await
648        }
649        Operation::Read { arg } => {
650            let reply = ReplyData::new(req.unique(), file);
651            fs.read(req, arg.fh, arg.offset.cast(), arg.size, reply)
652                .await
653        }
654        Operation::Write { arg, data } => {
655            info!("operation:write: {:?}", arg);
656            assert_eq!(data.len(), arg.size.cast::<usize>());
657            let reply = ReplyWrite::new(req.unique(), file);
658            fs.write(
659                req,
660                arg.fh,
661                arg.offset.cast(),
662                data.to_vec(), // TODO: consider zero copy
663                arg.write_flags,
664                reply,
665            )
666            .await
667        }
668        Operation::Flush { arg } => {
669            let reply = ReplyEmpty::new(req.unique(), file);
670            fs.flush(req, arg.fh, arg.lock_owner, reply).await
671        }
672        Operation::Release { arg } => {
673            let flush = !matches!(arg.release_flags & FUSE_RELEASE_FLUSH, 0);
674            let reply = ReplyEmpty::new(req.unique(), file);
675            fs.release(req, arg.fh, arg.flags, arg.lock_owner, flush, reply)
676                .await
677        }
678        Operation::FSync { arg } => {
679            let datasync = !matches!(arg.fsync_flags & 1, 0);
680            let reply = ReplyEmpty::new(req.unique(), file);
681            fs.fsync(req, arg.fh, datasync, reply).await
682        }
683        Operation::OpenDir { arg } => {
684            let reply = ReplyOpen::new(req.unique(), file);
685            fs.opendir(req, arg.flags, reply).await
686        }
687        Operation::ReadDir { arg } => {
688            let reply = ReplyDirectory::new(req.unique(), file, arg.size.cast());
689            fs.readdir(req, arg.fh, arg.offset.cast(), reply).await
690        }
691        Operation::ReleaseDir { arg } => {
692            let reply = ReplyEmpty::new(req.unique(), file);
693            fs.releasedir(req, arg.fh, arg.flags, reply).await
694        }
695        Operation::FSyncDir { arg } => {
696            let datasync = !matches!(arg.fsync_flags & 1, 0);
697            let reply = ReplyEmpty::new(req.unique(), file);
698            fs.fsyncdir(req, arg.fh, datasync, reply).await
699        }
700        Operation::StatFs => {
701            let reply = ReplyStatFs::new(req.unique(), file);
702            fs.statfs(req, reply).await
703        }
704        Operation::SetXAttr { arg, name, value } => {
705            /// Set the position of an extended attribute
706            /// zero for Linux
707            #[cfg(target_os = "linux")]
708            #[inline]
709            const fn get_position(_arg: &FuseSetXAttrIn) -> u32 {
710                0
711            }
712            assert!(value.len() == arg.size.cast::<usize>());
713            let reply = ReplyEmpty::new(req.unique(), file);
714            fs.setxattr(req, name, value, arg.flags, get_position(arg), reply)
715                .await
716        }
717        Operation::GetXAttr { arg, name } => {
718            let reply = ReplyXAttr::new(req.unique(), file);
719            fs.getxattr(req, name, arg.size, reply).await
720        }
721        Operation::ListXAttr { arg } => {
722            let reply = ReplyXAttr::new(req.unique(), file);
723            fs.listxattr(req, arg.size, reply).await
724        }
725        Operation::RemoveXAttr { name } => {
726            let reply = ReplyEmpty::new(req.unique(), file);
727            fs.removexattr(req, name, reply).await
728        }
729        Operation::Access { arg } => {
730            let reply = ReplyEmpty::new(req.unique(), file);
731            fs.access(req, arg.mask, reply).await
732        }
733        Operation::Create { arg, name } => {
734            let reply = ReplyCreate::new(req.unique(), file);
735            fs.create(req, req.nodeid(), name, arg.mode, arg.flags, reply)
736                .await
737        }
738        Operation::GetLk { arg } => {
739            let reply = ReplyLock::new(req.unique(), file);
740            let lock_param = FileLockParam {
741                fh: arg.fh,
742                lock_owner: arg.owner,
743                start: arg.lk.start,
744                end: arg.lk.end,
745                typ: arg.lk.typ,
746                pid: arg.lk.pid,
747            };
748            fs.getlk(req, lock_param, reply).await
749        }
750        Operation::SetLk { arg } => {
751            let reply = ReplyEmpty::new(req.unique(), file);
752            let lock_param = FileLockParam {
753                fh: arg.fh,
754                lock_owner: arg.owner,
755                start: arg.lk.start,
756                end: arg.lk.end,
757                typ: arg.lk.typ,
758                pid: arg.lk.pid,
759            };
760            fs.setlk(req, lock_param, false, reply).await
761        }
762        Operation::SetLkW { arg } => {
763            let reply = ReplyEmpty::new(req.unique(), file);
764            let lock_param = FileLockParam {
765                fh: arg.fh,
766                lock_owner: arg.owner,
767                start: arg.lk.start,
768                end: arg.lk.end,
769                typ: arg.lk.typ,
770                pid: arg.lk.pid,
771            };
772            fs.setlk(
773                req, lock_param, true, // sleep
774                reply,
775            )
776            .await
777        }
778        Operation::BMap { arg } => {
779            let reply = ReplyBMap::new(req.unique(), file);
780            fs.bmap(req, arg.blocksize, arg.block, reply).await
781        }
782
783        #[cfg(feature = "abi-7-11")]
784        Operation::IoCtl { arg, data } => {
785            error!("IoCtl not implemented, arg={:?}, data={:?}", arg, data);
786            not_implement_helper(req, file).await
787        }
788        #[cfg(feature = "abi-7-11")]
789        Operation::Poll { arg } => {
790            error!("Poll not implemented, arg={:?}", arg);
791            not_implement_helper(req, file).await
792        }
793        #[cfg(feature = "abi-7-15")]
794        Operation::NotifyReply { data } => {
795            error!("NotifyReply not implemented, data={:?}", data);
796            not_implement_helper(req, file).await
797        }
798        #[cfg(feature = "abi-7-16")]
799        Operation::BatchForget { arg, nodes } => {
800            error!(
801                "BatchForget not implemented, arg={:?}, nodes={:?}",
802                arg, nodes
803            );
804            not_implement_helper(req, file).await
805        }
806        #[cfg(feature = "abi-7-19")]
807        Operation::FAllocate { arg } => {
808            error!("FAllocate not implemented, arg={:?}", arg);
809            not_implement_helper(req, file).await
810        }
811        #[cfg(feature = "abi-7-21")]
812        Operation::ReadDirPlus { arg } => {
813            error!("ReadDirPlus not implemented, arg={:?}", arg);
814            not_implement_helper(req, file).await
815        }
816        #[cfg(feature = "abi-7-23")]
817        Operation::Rename2 {
818            arg,
819            oldname,
820            newname,
821        } => {
822            let reply = ReplyEmpty::new(req.unique(), file);
823            let param = RenameParam {
824                old_parent: req.nodeid(),
825                old_name: oldname.to_owned(),
826                new_parent: arg.newdir,
827                new_name: newname.to_owned(),
828                flags: arg.flags,
829            };
830            fs.rename(req, param, reply).await
831        }
832        // #[cfg(feature = "abi-7-24")]
833        Operation::LSeek { arg } => {
834            error!("LSeek not implemented, arg={:?}", arg);
835            not_implement_helper(req, file).await
836        }
837        // #[cfg(feature = "abi-7-28")]
838        Operation::CopyFileRange { arg } => {
839            error!("ReadDirPlusCopyFileRange not implemented, arg={:?}", arg);
840            not_implement_helper(req, file).await
841        }
842        #[cfg(feature = "abi-7-11")]
843        Operation::CuseInit { arg } => {
844            panic!("unsupported CuseInit arg={arg:?}");
845        }
846    };
847
848    result
849}
850
851/// Replies ENOSYS
852async fn not_implement_helper(req: &Request<'_>, file: &mut File) -> nix::Result<usize> {
853    let reply = ReplyEmpty::new(req.unique(), file);
854    reply.error_code(Errno::ENOSYS).await
855}