async_fusex/
session.rs

1//! The implementation of FUSE session
2
3use std::fs::File;
4use std::io::Read;
5use std::os::fd::FromRawFd;
6use std::os::unix::io::RawFd;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use std::thread;
10use std::time::{Duration, UNIX_EPOCH};
11
12use aligned_utils::bytes::AlignedBytes;
13use anyhow::{anyhow, Context};
14use clippy_utilities::Cast;
15use crossbeam_channel::{Receiver, Sender};
16use crossbeam_utils::atomic::AtomicCell;
17use nix::errno::Errno;
18use nix::sys::stat::SFlag;
19use nix::unistd;
20use tokio::runtime::Handle;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, error, info, instrument};
23
24use super::context::ProtoVersion;
25use super::file_system::{FileSystem};
26use super::fuse_reply::{
27    ReplyAttr, ReplyBMap, ReplyCreate, ReplyData, ReplyDirectory, ReplyEmpty, ReplyEntry,
28    ReplyInit, ReplyLock, ReplyOpen, ReplyStatFs, ReplyWrite, ReplyXAttr,
29};
30use super::fuse_request::{Operation, Request};
31use super::mount;
32#[cfg(feature = "abi-7-23")]
33use super::protocol::FATTR_CTIME;
34#[cfg(feature = "abi-7-9")]
35use super::protocol::FATTR_LOCKOWNER; // {FATTR_ATIME_NOW, FATTR_MTIME_NOW};
36use super::protocol::{
37    FuseInitIn, FuseInitOut, FuseSetXAttrIn, FATTR_ATIME, FATTR_FH, FATTR_GID, FATTR_MODE,
38    FATTR_MTIME, FATTR_SIZE, FATTR_UID, FUSE_ASYNC_READ, FUSE_KERNEL_MINOR_VERSION,
39    FUSE_KERNEL_VERSION, FUSE_RELEASE_FLUSH,
40};
41use crate::fs_util::{CreateParam, FileLockParam, RenameParam, SetAttrParam};
42
43/// We generally support async reads
44#[cfg(target_os = "linux")]
45const INIT_FLAGS: u32 = FUSE_ASYNC_READ;
46// TODO: Add FUSE_EXPORT_SUPPORT and FUSE_BIG_WRITES (requires ABI 7.10)
47
48/// The max size of write requests from the kernel. The absolute minimum is 4k,
49/// FUSE recommends at least 128k, max 16M. The FUSE default is  128k on Linux.
50#[cfg(target_os = "linux")]
51const MAX_WRITE_SIZE: u32 = 128 * 1024;
52
53/// Size of the buffer for reading a request from the kernel. Since the kernel
54/// may send up to `MAX_WRITE_SIZE` bytes in a write request, we use that value
55/// plus some extra space.
56const BUFFER_SIZE: u32 = MAX_WRITE_SIZE + 512;
57
58/// We use `PAGE_SIZE` (4 KiB) as the alignment of the buffer.
59const PAGE_SIZE: usize = 4096;
60/// Max background pending requests under processing, at least to be 4,
61/// otherwise deadlock
62const MAX_BACKGROUND: u16 = 10; // TODO: set to larger value when release
63/// The max number of FUSE device reader threads.
64const MAX_FUSE_READER: usize = 2; // TODO: make it custom
65
66/// The implementation of fuse fd clone.
67/// This module is just for avoiding the `missing_docs` of `ioctl_read` macro.
68#[allow(missing_docs)] // Raised by `ioctl_read!`
69mod _fuse_fd_clone {
70    use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
71
72    use clippy_utilities::Cast;
73    use nix::fcntl::{self, FcntlArg, FdFlag, OFlag};
74    use nix::ioctl_read;
75    use nix::sys::stat::Mode;
76    ioctl_read!(fuse_fd_clone_impl, 229, 0, u32);
77
78    /// Clones a FUSE session fd into a FUSE worker fd.
79    ///
80    /// # Safety
81    /// Behavior is undefined if any of the following conditions are violated:
82    ///
83    /// - `session_fd` must be a valid file descriptor to an open FUSE device.
84    #[allow(clippy::unnecessary_safety_comment)]
85    pub unsafe fn fuse_fd_clone(session_fd: RawFd) -> nix::Result<RawFd> {
86        let devname = "/dev/fuse";
87        let cloned_fd = fcntl::open(devname, OFlag::O_RDWR | OFlag::O_CLOEXEC, Mode::empty())?;
88        // use `OwnedFd` here is just to release the fd when error occurs
89        // SAFETY: the `cloned_fd` is just opened
90        let cloned_fd = OwnedFd::from_raw_fd(cloned_fd);
91
92        fcntl::fcntl(cloned_fd.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC))?;
93
94        let mut result_fd: u32 = session_fd.cast();
95        // SAFETY: `cloned_fd` is ensured to be valid, and `&mut result_fd` is a valid
96        // pointer to a value on stack
97        fuse_fd_clone_impl(cloned_fd.as_raw_fd(), &mut result_fd)?;
98        Ok(cloned_fd.into_raw_fd()) // use `into_raw_fd` to transfer the
99                                    // ownership of the fd
100    }
101}
102
103use _fuse_fd_clone::fuse_fd_clone;
104use crate::de::DeserializeError;
105
106/// A loop to read requests from FUSE device continuously
107#[allow(clippy::needless_pass_by_value)]
108fn fuse_device_reader(
109    buffer_tx: Sender<(File, AlignedBytes)>,
110    buffer_rx: Receiver<(File, AlignedBytes)>,
111    runtime_handle: Handle,
112    proto_version: ProtoVersion,
113    fs: Arc<dyn FileSystem + Send + Sync>,
114) {
115    loop {
116        let Ok((mut file, mut buffer)) = buffer_rx.recv() else {
117            error!("The channel of buffer is exhausted and closed.");
118            return;
119        };
120
121        let size = match file.read(&mut buffer) {
122            Ok(size) => size,
123            Err(e) => {
124                let errno = e.raw_os_error().map(Errno::from_raw);
125                match errno {
126                    None => {
127                        panic!("Failed to get the errno on reading failure. The error is {e}");
128                    }
129                    // Operation interrupted. Accordingly to FUSE, this is safe to retry
130                    Some(Errno::ENOENT) => {
131                        info!("operation interrupted, retry.");
132                        buffer_tx.send((file, buffer)).unwrap_or_else(|_| {
133                            unreachable!("The buffer channel is not to be closed.")
134                        });
135                        continue;
136                    }
137                    // Interrupted system call, retry
138                    Some(Errno::EINTR) => {
139                        info!("interrupted system call, retry");
140                        buffer_tx.send((file, buffer)).unwrap_or_else(|_| {
141                            unreachable!("The buffer channel is not to be closed.")
142                        });
143                        continue;
144                    }
145                    // Explicitly try again
146                    Some(Errno::EAGAIN) => {
147                        info!("Explicitly retry");
148                        buffer_tx.send((file, buffer)).unwrap_or_else(|_| {
149                            unreachable!("The buffer channel is not to be closed.")
150                        });
151                        continue;
152                    }
153                    // Filesystem was unmounted, quit the loop
154                    Some(Errno::ENODEV) => {
155                        info!("filesystem destroyed, quit the run loop");
156                        return;
157                    }
158                    // Unhandled error
159                    Some(errno) => {
160                        panic!(
161                            "non-recoverable io error when read FUSE device, \
162                            the error is: {errno}",
163                        );
164                    }
165                }
166            }
167        };
168
169        runtime_handle.block_on(async{
170            process_fuse_request(
171                buffer,
172                size,
173                file,
174                Arc::clone(&fs),
175                buffer_tx.clone(),
176                proto_version,
177            ).await
178        });
179        // if spawn_result.is_err() {
180        //     info!("Try to spawn task of `FuseRequest` after shutdow.");
181        //     return;
182        // }
183    }
184}
185
186/// Process one FUSE request
187async fn process_fuse_request(
188    byte_buffer: AlignedBytes,
189    read_size: usize,
190    mut file: File,
191    fs: Arc<dyn FileSystem + Send + Sync + 'static>,
192    sender: Sender<(File, AlignedBytes)>,
193    proto_version: ProtoVersion,
194) {
195    let bytes = byte_buffer
196        .get(..read_size)
197        .unwrap_or_else(|| panic!("failed to read {read_size} bytes from the buffer",));
198    let fuse_req = match Request::new(bytes, proto_version) {
199        // Dispatch request
200        Ok(r) => r,
201        // Quit on illegal request
202        Err(e) => {
203            if let &DeserializeError::UnknownOpCode { code, unique } = &e {
204                error!("Unknown operation code found: {code}, with context: {e}");
205                let unique = unique.unwrap_or_else(|| {
206                    unreachable!("A `unique` must be filled in by deserializer.")
207                });
208                ReplyEmpty::new(unique, &mut file)
209                    .error_code(Errno::ENOSYS)
210                    .await
211                    .unwrap_or_else(|reply_err| {
212                        panic!("Failed to reply an error code: {reply_err}.")
213                    });
214                sender.send((file, byte_buffer)).unwrap_or_else(|_| {
215                    error!("The buffer pool is closed.");
216                });
217                return;
218            }
219
220            // TODO: graceful handle request build failure
221            panic!("failed to build FUSE request, the error is: {e}");
222        }
223    };
224    debug!("received FUSE req={}", fuse_req);
225    let res = dispatch(&fuse_req, &mut file, fs).await;
226    if let Err(e) = res {
227        panic!(
228            "failed to process req={:?}, the error is: {}",
229            fuse_req,
230            crate::util::format_nix_error(e), // TODO: refactor format_nix_error()
231        );
232    }
233    let res = sender.send((file, byte_buffer));
234    if let Err(e) = res {
235        panic!("failed to put the buffer back to buffer pool, the error is: {e}",);
236    }
237}
238
239/// FUSE session
240#[allow(missing_debug_implementations)]
241pub struct Session<F: FileSystem + Send + Sync + 'static> {
242    /// FUSE device fd
243    fuse_fd: Arc<FuseFd>,
244    /// Kernel FUSE protocol version
245    proto_version: AtomicCell<ProtoVersion>,
246    /// Mount path (relative)
247    mount_path: PathBuf,
248    /// The underlying FUSE file system
249    filesystem: Arc<F>,
250    // fuse_request_spawn_handle: GcHandle,
251}
252pub async fn new_session<F>( mount_path: &Path, fs: F)->anyhow::Result<Session<F>> where F: FileSystem + Send + Sync + 'static {
253    let fuse_fd = mount::mount(mount_path).await.context("Failed to mount FUSE")?;
254    let fsarc = Arc::new(fs);
255
256    Ok(Session{
257        fuse_fd: Arc::new(FuseFd(fuse_fd)),
258        proto_version: AtomicCell::new(ProtoVersion::UNSPECIFIED),
259        mount_path: mount_path.to_owned(),
260        filesystem: fsarc,
261    })
262}
263
264/// FUSE device fd
265#[derive(Debug)]
266struct FuseFd(RawFd);
267
268impl Drop for FuseFd {
269    fn drop(&mut self) {
270        println!("Dropping FUSE fd {}", self.0);
271        unistd::close(self.0).ok();
272    }
273}
274
275impl<F: FileSystem + Send + Sync + 'static> Drop for Session<F> {
276    fn drop(&mut self) {
277        println!("Dropping FUSE session");
278        futures::executor::block_on(async {
279            let mount_path = &self.mount_path;
280            let res = mount::umount(mount_path).await;
281            println!("umount result: {:?}", res);
282            match res {
283                Ok(..) => info!("Session::drop() successfully umount {:?}", mount_path),
284                Err(e) => error!(
285                    "Session::drop() failed to umount {:?}, the error is: {}",
286                    mount_path, e,
287                ),
288            };
289        });
290    }
291}
292
293impl<F: FileSystem + Send + Sync + 'static> Session<F> {
294    /// Get FUSE device fd
295    #[inline]
296    pub fn dev_fd(&self) -> RawFd {
297        self.fuse_fd.0
298    }
299
300    /// Run the FUSE session
301    #[allow(clippy::arithmetic_side_effects, clippy::pattern_type_mismatch)] // The `select!` macro will generate code that goes against these rules.
302    pub async fn run(self, token: CancellationToken) -> anyhow::Result<()> {
303        // For recycling the buffers used by process_fuse_request.
304        let (pool_sender, pool_receiver) = self
305            .setup_buffer_pool()
306            .await
307            .context("failed to setup buffer pool")?;
308
309        for _ in 0..MAX_FUSE_READER {
310            let pool_tx = pool_sender.clone();
311            let pool_rx = pool_receiver.clone();
312            // let gc_handle = self.fuse_request_spawn_handle.clone();
313            let handle = Handle::current();
314            let fs = Arc::clone(&self.filesystem);
315            let protocol_version = self.proto_version.load();
316            // The `JoinHandle` is ignored
317            thread::spawn(move || {
318                fuse_device_reader(pool_tx, pool_rx, handle, protocol_version, fs);
319            });
320        }
321
322        drop(pool_receiver);
323
324        token.cancelled().await;
325        info!("Async FUSE session exits.");
326
327        Ok(())
328    }
329
330    /// Setup buffer pool
331    async fn setup_buffer_pool(
332        &self,
333    ) -> anyhow::Result<(Sender<(File, AlignedBytes)>, Receiver<(File, AlignedBytes)>)> {
334        let (pool_sender, pool_receiver) =
335            crossbeam_channel::bounded::<(File, AlignedBytes)>(MAX_BACKGROUND.into());
336
337        for _ in 0..MAX_BACKGROUND {
338            let buf = AlignedBytes::new_zeroed(BUFFER_SIZE.cast(), PAGE_SIZE);
339            let session_fd = self.dev_fd();
340
341            let file = unsafe {
342                // SAFETY: We assume that the fuse session fd is valid.
343                let worker_fd = fuse_fd_clone(session_fd)?;
344                // SAFETY: The worker fd is just cloned.
345                File::from_raw_fd(worker_fd)
346            };
347
348            let res = pool_sender.send((file, buf));
349            if let Err(e) = res {
350                panic!(
351                    "failed to insert buffer to buffer pool when initializing, the error is: {e}",
352                );
353            }
354        }
355
356        let (mut file, mut byte_buf) = pool_receiver.recv()?;
357        let (read_result, mut file, byte_buf) = tokio::task::spawn_blocking(move || {
358            let res = file.read(&mut byte_buf);
359            (res, file, byte_buf)
360        })
361        .await?;
362        if let Ok(read_size) = read_result {
363            debug!("read successfully {} byte data from FUSE device", read_size);
364            let bytes = byte_buf.get(..read_size).unwrap_or_else(|| {
365                panic!(
366                    "read_size is greater than buffer size: read_size = {}, buffer size = {}",
367                    read_size,
368                    byte_buf.len()
369                )
370            });
371            if let Ok(req) = Request::new(bytes, self.proto_version.load()) {
372                if let Operation::Init { arg } = *req.operation() {
373                    let filesystem = Arc::clone(&self.filesystem);
374                    self.init(arg, &req, &*filesystem, &mut file).await?;
375                }
376            }
377        }
378        pool_sender
379            .send((file, byte_buf))
380            .context("failed to put buffer back to buffer pool after FUSE init")?;
381
382        Ok((pool_sender, pool_receiver))
383    }
384
385    /// Initialize FUSE session
386    #[allow(single_use_lifetimes)] // false positive
387    async fn init<'a>(
388        &self,
389        arg: &'_ FuseInitIn,
390        req: &'_ Request<'a>,
391        fs: &'_ (dyn FileSystem + Send + Sync + 'static),
392        file: &mut File,
393    ) -> anyhow::Result<()> {
394        debug!("Init args={:?}", arg);
395        // TODO: rewrite init based on do_init() in fuse_lowlevel.c
396        // https://github.com/libfuse/libfuse/blob/master/lib/fuse_lowlevel.c#L1892
397        let reply = ReplyInit::new(req.unique(), file);
398        // We don't support ABI versions before 7.8
399        if arg.major < 7 || (arg.major == 7 && arg.minor < 8) {
400            error!("Unsupported FUSE ABI version={}.{}", arg.major, arg.minor);
401            reply.error_code(Errno::EPROTO).await?;
402            return Err(anyhow!("FUSE ABI version too low"));
403        }
404        // Call filesystem init method and give it a chance to return an error
405        let filesystem = fs;
406        let init_res = filesystem.init(req).await;
407        if let Err(err) = init_res {
408            reply.error_code(Errno::ENOSYS).await?;
409            return Err(anyhow!("user defined init failed, the error is: {}", err,));
410        }
411        let flags = arg.flags & INIT_FLAGS; // TODO: handle init flags properly
412        #[cfg(not(feature = "abi-7-13"))]
413        let unused = 0_u32;
414        #[cfg(feature = "abi-7-13")]
415        let congestion_threshold = 10_u16; // TODO: set congestion threshold
416        #[cfg(feature = "abi-7-23")]
417        let time_gran = 1_u32; // TODO: set time_gran
418        #[cfg(all(feature = "abi-7-23", not(feature = "abi-7-28")))]
419        let unused = [0_u32; 9];
420        #[cfg(feature = "abi-7-28")]
421        let max_pages = 0_u16; // TODO: max_pages = (max_write - 1) / getpagesize() + 1;
422        #[cfg(feature = "abi-7-28")]
423        let padding = 0_u16;
424        #[cfg(feature = "abi-7-28")]
425        let unused = [0_u32; 8];
426        // Reply with our desired version and settings. If the kernel supports a
427        // larger major version, it'll re-send a matching init message. If it
428        // supports only lower major versions, we replied with an error above.
429        reply
430            .init(FuseInitOut {
431                major: FUSE_KERNEL_VERSION,
432                minor: FUSE_KERNEL_MINOR_VERSION, /* Do not change minor version, otherwise
433                                                   * unknown panic */
434                max_readahead: arg.max_readahead, // accept FUSE kernel module max_readahead
435                flags,                            /* TODO: use features given in INIT_FLAGS and
436                                                   * reported as capable */
437                #[cfg(not(feature = "abi-7-13"))]
438                unused,
439                #[cfg(feature = "abi-7-13")]
440                max_background: MAX_BACKGROUND,
441                #[cfg(feature = "abi-7-13")]
442                congestion_threshold,
443                max_write: MAX_WRITE_SIZE,
444                #[cfg(feature = "abi-7-23")]
445                time_gran,
446                #[cfg(all(feature = "abi-7-23", not(feature = "abi-7-28")))]
447                unused,
448                #[cfg(feature = "abi-7-28")]
449                max_pages,
450                #[cfg(feature = "abi-7-28")]
451                padding,
452                #[cfg(feature = "abi-7-28")]
453                unused,
454            })
455            .await?;
456        debug!(
457            "INIT response: ABI version={}.{}, flags={:#x}, max readahead={}, max write={}",
458            FUSE_KERNEL_VERSION,
459            FUSE_KERNEL_MINOR_VERSION,
460            flags,
461            arg.max_readahead,
462            MAX_WRITE_SIZE,
463        );
464
465        // Store the kernel FUSE major and minor version
466        self.proto_version.store(ProtoVersion {
467            major: arg.major,
468            minor: arg.minor,
469        });
470
471        Ok(())
472    }
473}
474
475/// Dispatch request to the filesystem
476/// This calls the appropriate filesystem operation method for the
477/// request and sends back the returned reply to the kernel
478#[allow(clippy::too_many_lines)]
479#[instrument(name="request",skip(req, file, fs), fields(fuse_id =req.unique(),ino=req.nodeid(), op=%req.operation(), len=req.len()),ret)]
480async fn dispatch<'a>(
481    req: &'a Request<'a>,
482    file: &mut File,
483    fs: Arc<dyn FileSystem + Send + Sync + 'static>,
484) -> nix::Result<usize> {
485    let result = match *req.operation() {
486        // Filesystem initialization
487        Operation::Init { .. } => panic!("FUSE should have already initialized"),
488
489        // Filesystem destroyed
490        Operation::Destroy => {
491            fs.destroy(req).await;
492            let reply = ReplyEmpty::new(req.unique(), file);
493            reply.ok().await
494        }
495
496        Operation::Interrupt { arg } => {
497            fs.interrupt(req, arg.unique).await; // No reply
498            Ok(0)
499        }
500
501        Operation::Lookup { name } => {
502            let reply = ReplyEntry::new(req.unique(), file);
503            fs.lookup(req, req.nodeid(), name, reply).await
504        }
505        Operation::Forget { arg } => {
506            fs.forget(req, arg.nlookup).await; // No reply
507            Ok(0)
508        }
509        Operation::GetAttr => {
510            let reply = ReplyAttr::new(req.unique(), file);
511            fs.getattr(req, reply).await
512        }
513        Operation::SetAttr { arg } => {
514            #[cfg(feature = "abi-7-9")]
515            use std::time::SystemTime;
516
517            #[cfg(feature = "abi-7-9")]
518            use super::protocol::{FATTR_ATIME_NOW, FATTR_MTIME_NOW};
519
520            let mode = match arg.valid & FATTR_MODE {
521                0 => None,
522                _ => Some(arg.mode),
523            };
524            let u_id = match arg.valid & FATTR_UID {
525                0 => None,
526                _ => Some(arg.uid),
527            };
528            let g_id = match arg.valid & FATTR_GID {
529                0 => None,
530                _ => Some(arg.gid),
531            };
532            let size = match arg.valid & FATTR_SIZE {
533                0 => None,
534                _ => Some(arg.size),
535            };
536            let a_time = match arg.valid & FATTR_ATIME {
537                0 => None,
538                _ => Some(UNIX_EPOCH + Duration::new(arg.atime, arg.atimensec)),
539            };
540            let m_time = match arg.valid & FATTR_MTIME {
541                0 => None,
542                _ => Some(UNIX_EPOCH + Duration::new(arg.mtime, arg.mtimensec)),
543            };
544            let fh = match arg.valid & FATTR_FH {
545                0 => None,
546                _ => Some(arg.fh),
547            };
548
549            #[cfg(feature = "abi-7-9")]
550            let a_time = match arg.valid & FATTR_ATIME_NOW {
551                0 => a_time,
552                _ => Some(SystemTime::now()),
553            };
554            #[cfg(feature = "abi-7-9")]
555            let m_time = match arg.valid & FATTR_MTIME_NOW {
556                0 => m_time,
557                _ => Some(SystemTime::now()),
558            };
559
560            #[cfg(feature = "abi-7-9")]
561            let lock_owner = match arg.valid & FATTR_LOCKOWNER {
562                0 => None,
563                _ => Some(arg.lock_owner),
564            };
565            #[cfg(feature = "abi-7-23")]
566            let c_time = match arg.valid & FATTR_CTIME {
567                0 => None,
568                _ => Some(UNIX_EPOCH + Duration::new(arg.ctime, arg.ctimensec)),
569            };
570
571            let reply = ReplyAttr::new(req.unique(), file);
572            let param = SetAttrParam {
573                valid: arg.valid,
574                fh,
575                mode,
576                u_id,
577                g_id,
578                size,
579                #[cfg(feature = "abi-7-9")]
580                lock_owner,
581                a_time,
582                m_time,
583                #[cfg(feature = "abi-7-23")]
584                c_time,
585            };
586            fs.setattr(req, param, reply).await
587        }
588        Operation::ReadLink => {
589            let reply = ReplyData::new(req.unique(), file);
590            fs.readlink(req, reply).await
591        }
592        Operation::MkNod { arg, name } => {
593            let param = CreateParam {
594                parent: req.nodeid(),
595                name: name.to_owned(),
596                mode: arg.mode,
597                rdev: arg.rdev,
598                uid: req.uid(),
599                gid: req.gid(),
600                node_type: SFlag::S_IFREG,
601                link: None,
602            };
603            let reply = ReplyEntry::new(req.unique(), file);
604            fs.mknod(req, param, reply).await
605        }
606        Operation::MkDir { arg, name } => {
607            let reply = ReplyEntry::new(req.unique(), file);
608            fs.mkdir(req, req.nodeid(), name, arg.mode, reply).await
609        }
610        Operation::Unlink { name } => {
611            let reply = ReplyEmpty::new(req.unique(), file);
612            fs.unlink(req, req.nodeid(), name, reply).await
613        }
614        Operation::RmDir { name } => {
615            let reply = ReplyEmpty::new(req.unique(), file);
616            fs.rmdir(req, req.nodeid(), name, reply).await
617        }
618        Operation::SymLink { name, link } => {
619            let reply = ReplyEntry::new(req.unique(), file);
620            fs.symlink(req, req.nodeid(), name, Path::new(link), reply)
621                .await
622        }
623        Operation::Rename {
624            arg,
625            oldname,
626            newname,
627        } => {
628            let reply = ReplyEmpty::new(req.unique(), file);
629            let param = RenameParam {
630                old_parent: req.nodeid(),
631                old_name: oldname.to_owned(),
632                new_parent: arg.newdir,
633                new_name: newname.to_owned(),
634                flags: 0,
635            };
636            fs.rename(req, param, reply).await
637        }
638        Operation::Link { arg, name } => {
639            let reply = ReplyEntry::new(req.unique(), file);
640            fs.link(req, arg.oldnodeid, name, reply).await
641        }
642        Operation::Open { arg } => {
643            let reply = ReplyOpen::new(req.unique(), file);
644            fs.open(req, arg.flags, reply).await
645        }
646        Operation::Read { arg } => {
647            let reply = ReplyData::new(req.unique(), file);
648            fs.read(req, arg.fh, arg.offset.cast(), arg.size, reply)
649                .await
650        }
651        Operation::Write { arg, data } => {
652            info!("operation:write: {:?}", arg);
653            assert_eq!(data.len(), arg.size.cast::<usize>());
654            let reply = ReplyWrite::new(req.unique(), file);
655            fs.write(
656                req,
657                arg.fh,
658                arg.offset.cast(),
659                data.to_vec(), // TODO: consider zero copy
660                arg.write_flags,
661                reply,
662            )
663            .await
664        }
665        Operation::Flush { arg } => {
666            let reply = ReplyEmpty::new(req.unique(), file);
667            fs.flush(req, arg.fh, arg.lock_owner, reply).await
668        }
669        Operation::Release { arg } => {
670            let flush = !matches!(arg.release_flags & FUSE_RELEASE_FLUSH, 0);
671            let reply = ReplyEmpty::new(req.unique(), file);
672            fs.release(req, arg.fh, arg.flags, arg.lock_owner, flush, reply)
673                .await
674        }
675        Operation::FSync { arg } => {
676            let datasync = !matches!(arg.fsync_flags & 1, 0);
677            let reply = ReplyEmpty::new(req.unique(), file);
678            fs.fsync(req, arg.fh, datasync, reply).await
679        }
680        Operation::OpenDir { arg } => {
681            let reply = ReplyOpen::new(req.unique(), file);
682            fs.opendir(req, arg.flags, reply).await
683        }
684        Operation::ReadDir { arg } => {
685            let reply = ReplyDirectory::new(req.unique(), file, arg.size.cast());
686            fs.readdir(req, arg.fh, arg.offset.cast(), reply).await
687        }
688        Operation::ReleaseDir { arg } => {
689            let reply = ReplyEmpty::new(req.unique(), file);
690            fs.releasedir(req, arg.fh, arg.flags, reply).await
691        }
692        Operation::FSyncDir { arg } => {
693            let datasync = !matches!(arg.fsync_flags & 1, 0);
694            let reply = ReplyEmpty::new(req.unique(), file);
695            fs.fsyncdir(req, arg.fh, datasync, reply).await
696        }
697        Operation::StatFs => {
698            let reply = ReplyStatFs::new(req.unique(), file);
699            fs.statfs(req, reply).await
700        }
701        Operation::SetXAttr { arg, name, value } => {
702            /// Set the position of an extended attribute
703            /// zero for Linux
704            #[cfg(target_os = "linux")]
705            #[inline]
706            const fn get_position(_arg: &FuseSetXAttrIn) -> u32 {
707                0
708            }
709            assert!(value.len() == arg.size.cast::<usize>());
710            let reply = ReplyEmpty::new(req.unique(), file);
711            fs.setxattr(req, name, value, arg.flags, get_position(arg), reply)
712                .await
713        }
714        Operation::GetXAttr { arg, name } => {
715            let reply = ReplyXAttr::new(req.unique(), file);
716            fs.getxattr(req, name, arg.size, reply).await
717        }
718        Operation::ListXAttr { arg } => {
719            let reply = ReplyXAttr::new(req.unique(), file);
720            fs.listxattr(req, arg.size, reply).await
721        }
722        Operation::RemoveXAttr { name } => {
723            let reply = ReplyEmpty::new(req.unique(), file);
724            fs.removexattr(req, name, reply).await
725        }
726        Operation::Access { arg } => {
727            let reply = ReplyEmpty::new(req.unique(), file);
728            fs.access(req, arg.mask, reply).await
729        }
730        Operation::Create { arg, name } => {
731            let reply = ReplyCreate::new(req.unique(), file);
732            fs.create(req, req.nodeid(), name, arg.mode, arg.flags, reply)
733                .await
734        }
735        Operation::GetLk { arg } => {
736            let reply = ReplyLock::new(req.unique(), file);
737            let lock_param = FileLockParam {
738                fh: arg.fh,
739                lock_owner: arg.owner,
740                start: arg.lk.start,
741                end: arg.lk.end,
742                typ: arg.lk.typ,
743                pid: arg.lk.pid,
744            };
745            fs.getlk(req, lock_param, reply).await
746        }
747        Operation::SetLk { arg } => {
748            let reply = ReplyEmpty::new(req.unique(), file);
749            let lock_param = FileLockParam {
750                fh: arg.fh,
751                lock_owner: arg.owner,
752                start: arg.lk.start,
753                end: arg.lk.end,
754                typ: arg.lk.typ,
755                pid: arg.lk.pid,
756            };
757            fs.setlk(req, lock_param, false, reply).await
758        }
759        Operation::SetLkW { arg } => {
760            let reply = ReplyEmpty::new(req.unique(), file);
761            let lock_param = FileLockParam {
762                fh: arg.fh,
763                lock_owner: arg.owner,
764                start: arg.lk.start,
765                end: arg.lk.end,
766                typ: arg.lk.typ,
767                pid: arg.lk.pid,
768            };
769            fs.setlk(
770                req, lock_param, true, // sleep
771                reply,
772            )
773            .await
774        }
775        Operation::BMap { arg } => {
776            let reply = ReplyBMap::new(req.unique(), file);
777            fs.bmap(req, arg.blocksize, arg.block, reply).await
778        }
779
780        #[cfg(feature = "abi-7-11")]
781        Operation::IoCtl { arg, data } => {
782            error!("IoCtl not implemented, arg={:?}, data={:?}", arg, data);
783            not_implement_helper(req, file).await
784        }
785        #[cfg(feature = "abi-7-11")]
786        Operation::Poll { arg } => {
787            error!("Poll not implemented, arg={:?}", arg);
788            not_implement_helper(req, file).await
789        }
790        #[cfg(feature = "abi-7-15")]
791        Operation::NotifyReply { data } => {
792            error!("NotifyReply not implemented, data={:?}", data);
793            not_implement_helper(req, file).await
794        }
795        #[cfg(feature = "abi-7-16")]
796        Operation::BatchForget { arg, nodes } => {
797            error!(
798                "BatchForget not implemented, arg={:?}, nodes={:?}",
799                arg, nodes
800            );
801            not_implement_helper(req, file).await
802        }
803        #[cfg(feature = "abi-7-19")]
804        Operation::FAllocate { arg } => {
805            error!("FAllocate not implemented, arg={:?}", arg);
806            not_implement_helper(req, file).await
807        }
808        #[cfg(feature = "abi-7-21")]
809        Operation::ReadDirPlus { arg } => {
810            error!("ReadDirPlus not implemented, arg={:?}", arg);
811            not_implement_helper(req, file).await
812        }
813        #[cfg(feature = "abi-7-23")]
814        Operation::Rename2 {
815            arg,
816            oldname,
817            newname,
818        } => {
819            let reply = ReplyEmpty::new(req.unique(), file);
820            let param = RenameParam {
821                old_parent: req.nodeid(),
822                old_name: oldname.to_owned(),
823                new_parent: arg.newdir,
824                new_name: newname.to_owned(),
825                flags: arg.flags,
826            };
827            fs.rename(req, param, reply).await
828        }
829        // #[cfg(feature = "abi-7-24")]
830        Operation::LSeek { arg } => {
831            error!("LSeek not implemented, arg={:?}", arg);
832            not_implement_helper(req, file).await
833        }
834        // #[cfg(feature = "abi-7-28")]
835        Operation::CopyFileRange { arg } => {
836            error!("ReadDirPlusCopyFileRange not implemented, arg={:?}", arg);
837            not_implement_helper(req, file).await
838        }
839        #[cfg(feature = "abi-7-11")]
840        Operation::CuseInit { arg } => {
841            panic!("unsupported CuseInit arg={arg:?}");
842        }
843    };
844
845    result
846}
847
848/// Replies ENOSYS
849async fn not_implement_helper(req: &Request<'_>, file: &mut File) -> nix::Result<usize> {
850    let reply = ReplyEmpty::new(req.unique(), file);
851    reply.error_code(Errno::ENOSYS).await
852}