1use std::fs::File;
4use std::io::Read;
5use std::os::fd::FromRawFd;
6use std::os::unix::io::RawFd;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use std::thread;
10use std::time::{Duration, UNIX_EPOCH};
11
12use aligned_utils::bytes::AlignedBytes;
13use anyhow::{anyhow, Context};
14use clippy_utilities::Cast;
15use crossbeam_channel::{Receiver, Sender};
16use crossbeam_utils::atomic::AtomicCell;
17use nix::errno::Errno;
18use nix::sys::stat::SFlag;
19use nix::unistd;
20use tokio::runtime::Handle;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, error, info, instrument};
23
24use super::context::ProtoVersion;
25use super::file_system::{FileSystem};
26use super::fuse_reply::{
27 ReplyAttr, ReplyBMap, ReplyCreate, ReplyData, ReplyDirectory, ReplyEmpty, ReplyEntry,
28 ReplyInit, ReplyLock, ReplyOpen, ReplyStatFs, ReplyWrite, ReplyXAttr,
29};
30use super::fuse_request::{Operation, Request};
31use super::mount;
32#[cfg(feature = "abi-7-23")]
33use super::protocol::FATTR_CTIME;
34#[cfg(feature = "abi-7-9")]
35use super::protocol::FATTR_LOCKOWNER; use super::protocol::{
37 FuseInitIn, FuseInitOut, FuseSetXAttrIn, FATTR_ATIME, FATTR_FH, FATTR_GID, FATTR_MODE,
38 FATTR_MTIME, FATTR_SIZE, FATTR_UID, FUSE_ASYNC_READ, FUSE_KERNEL_MINOR_VERSION,
39 FUSE_KERNEL_VERSION, FUSE_RELEASE_FLUSH,
40};
41use crate::fs_util::{CreateParam, FileLockParam, RenameParam, SetAttrParam};
42
/// FUSE capability flags this session is willing to enable. The flags sent by
/// the kernel in its INIT request are masked with this value before being
/// echoed back in the INIT reply.
#[cfg(target_os = "linux")]
const INIT_FLAGS: u32 = FUSE_ASYNC_READ;
/// Value reported to the kernel as `max_write` in the INIT reply (128 KiB).
#[cfg(target_os = "linux")]
const MAX_WRITE_SIZE: u32 = 128 * 1024;

/// Size in bytes of every request buffer placed in the buffer pool: a
/// maximum-sized write plus 512 extra bytes.
// NOTE(review): the extra 512 bytes presumably leave room for the FUSE request
// headers that precede the write payload - confirm.
const BUFFER_SIZE: u32 = MAX_WRITE_SIZE + 512;

/// Second argument passed to `AlignedBytes::new_zeroed` when request buffers
/// are allocated.
// NOTE(review): presumably the buffer alignment, i.e. one memory page - confirm.
const PAGE_SIZE: usize = 4096;
60const MAX_BACKGROUND: u16 = 10; const MAX_FUSE_READER: usize = 2; #[allow(missing_docs)] mod _fuse_fd_clone {
70 use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
71
72 use clippy_utilities::Cast;
73 use nix::fcntl::{self, FcntlArg, FdFlag, OFlag};
74 use nix::ioctl_read;
75 use nix::sys::stat::Mode;
76 ioctl_read!(fuse_fd_clone_impl, 229, 0, u32);
77
78 #[allow(clippy::unnecessary_safety_comment)]
85 pub unsafe fn fuse_fd_clone(session_fd: RawFd) -> nix::Result<RawFd> {
86 let devname = "/dev/fuse";
87 let cloned_fd = fcntl::open(devname, OFlag::O_RDWR | OFlag::O_CLOEXEC, Mode::empty())?;
88 let cloned_fd = OwnedFd::from_raw_fd(cloned_fd);
91
92 fcntl::fcntl(cloned_fd.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC))?;
93
94 let mut result_fd: u32 = session_fd.cast();
95 fuse_fd_clone_impl(cloned_fd.as_raw_fd(), &mut result_fd)?;
98 Ok(cloned_fd.into_raw_fd()) }
101}
102
103use _fuse_fd_clone::fuse_fd_clone;
104use crate::de::DeserializeError;
105
106#[allow(clippy::needless_pass_by_value)]
108fn fuse_device_reader(
109 buffer_tx: Sender<(File, AlignedBytes)>,
110 buffer_rx: Receiver<(File, AlignedBytes)>,
111 runtime_handle: Handle,
112 proto_version: ProtoVersion,
113 fs: Arc<dyn FileSystem + Send + Sync>,
114) {
115 loop {
116 let Ok((mut file, mut buffer)) = buffer_rx.recv() else {
117 error!("The channel of buffer is exhausted and closed.");
118 return;
119 };
120
121 let size = match file.read(&mut buffer) {
122 Ok(size) => size,
123 Err(e) => {
124 let errno = e.raw_os_error().map(Errno::from_raw);
125 match errno {
126 None => {
127 panic!("Failed to get the errno on reading failure. The error is {e}");
128 }
129 Some(Errno::ENOENT) => {
131 info!("operation interrupted, retry.");
132 buffer_tx.send((file, buffer)).unwrap_or_else(|_| {
133 unreachable!("The buffer channel is not to be closed.")
134 });
135 continue;
136 }
137 Some(Errno::EINTR) => {
139 info!("interrupted system call, retry");
140 buffer_tx.send((file, buffer)).unwrap_or_else(|_| {
141 unreachable!("The buffer channel is not to be closed.")
142 });
143 continue;
144 }
145 Some(Errno::EAGAIN) => {
147 info!("Explicitly retry");
148 buffer_tx.send((file, buffer)).unwrap_or_else(|_| {
149 unreachable!("The buffer channel is not to be closed.")
150 });
151 continue;
152 }
153 Some(Errno::ENODEV) => {
155 info!("filesystem destroyed, quit the run loop");
156 return;
157 }
158 Some(errno) => {
160 panic!(
161 "non-recoverable io error when read FUSE device, \
162 the error is: {errno}",
163 );
164 }
165 }
166 }
167 };
168
169 runtime_handle.block_on(async{
170 process_fuse_request(
171 buffer,
172 size,
173 file,
174 Arc::clone(&fs),
175 buffer_tx.clone(),
176 proto_version,
177 ).await
178 });
179 }
184}
185
186async fn process_fuse_request(
188 byte_buffer: AlignedBytes,
189 read_size: usize,
190 mut file: File,
191 fs: Arc<dyn FileSystem + Send + Sync + 'static>,
192 sender: Sender<(File, AlignedBytes)>,
193 proto_version: ProtoVersion,
194) {
195 let bytes = byte_buffer
196 .get(..read_size)
197 .unwrap_or_else(|| panic!("failed to read {read_size} bytes from the buffer",));
198 let fuse_req = match Request::new(bytes, proto_version) {
199 Ok(r) => r,
201 Err(e) => {
203 if let &DeserializeError::UnknownOpCode { code, unique } = &e {
204 error!("Unknown operation code found: {code}, with context: {e}");
205 let unique = unique.unwrap_or_else(|| {
206 unreachable!("A `unique` must be filled in by deserializer.")
207 });
208 ReplyEmpty::new(unique, &mut file)
209 .error_code(Errno::ENOSYS)
210 .await
211 .unwrap_or_else(|reply_err| {
212 panic!("Failed to reply an error code: {reply_err}.")
213 });
214 sender.send((file, byte_buffer)).unwrap_or_else(|_| {
215 error!("The buffer pool is closed.");
216 });
217 return;
218 }
219
220 panic!("failed to build FUSE request, the error is: {e}");
222 }
223 };
224 debug!("received FUSE req={}", fuse_req);
225 let res = dispatch(&fuse_req, &mut file, fs).await;
226 if let Err(e) = res {
227 panic!(
228 "failed to process req={:?}, the error is: {}",
229 fuse_req,
230 crate::util::format_nix_error(e), );
232 }
233 let res = sender.send((file, byte_buffer));
234 if let Err(e) = res {
235 panic!("failed to put the buffer back to buffer pool, the error is: {e}",);
236 }
237}
238
239#[allow(missing_debug_implementations)]
241pub struct Session<F: FileSystem + Send + Sync + 'static> {
242 fuse_fd: Arc<FuseFd>,
244 proto_version: AtomicCell<ProtoVersion>,
246 mount_path: PathBuf,
248 filesystem: Arc<F>,
250 }
252pub async fn new_session<F>( mount_path: &Path, fs: F)->anyhow::Result<Session<F>> where F: FileSystem + Send + Sync + 'static {
253 let fuse_fd = mount::mount(mount_path).await.context("Failed to mount FUSE")?;
254 let fsarc = Arc::new(fs);
255
256 Ok(Session{
257 fuse_fd: Arc::new(FuseFd(fuse_fd)),
258 proto_version: AtomicCell::new(ProtoVersion::UNSPECIFIED),
259 mount_path: mount_path.to_owned(),
260 filesystem: fsarc,
261 })
262}
263
264#[derive(Debug)]
266struct FuseFd(RawFd);
267
268impl Drop for FuseFd {
269 fn drop(&mut self) {
270 println!("Dropping FUSE fd {}", self.0);
271 unistd::close(self.0).ok();
272 }
273}
274
275impl<F: FileSystem + Send + Sync + 'static> Drop for Session<F> {
276 fn drop(&mut self) {
277 println!("Dropping FUSE session");
278 futures::executor::block_on(async {
279 let mount_path = &self.mount_path;
280 let res = mount::umount(mount_path).await;
281 println!("umount result: {:?}", res);
282 match res {
283 Ok(..) => info!("Session::drop() successfully umount {:?}", mount_path),
284 Err(e) => error!(
285 "Session::drop() failed to umount {:?}, the error is: {}",
286 mount_path, e,
287 ),
288 };
289 });
290 }
291}
292
293impl<F: FileSystem + Send + Sync + 'static> Session<F> {
294 #[inline]
296 pub fn dev_fd(&self) -> RawFd {
297 self.fuse_fd.0
298 }
299
300 #[allow(clippy::arithmetic_side_effects, clippy::pattern_type_mismatch)] pub async fn run(self, token: CancellationToken) -> anyhow::Result<()> {
303 let (pool_sender, pool_receiver) = self
305 .setup_buffer_pool()
306 .await
307 .context("failed to setup buffer pool")?;
308
309 for _ in 0..MAX_FUSE_READER {
310 let pool_tx = pool_sender.clone();
311 let pool_rx = pool_receiver.clone();
312 let handle = Handle::current();
314 let fs = Arc::clone(&self.filesystem);
315 let protocol_version = self.proto_version.load();
316 thread::spawn(move || {
318 fuse_device_reader(pool_tx, pool_rx, handle, protocol_version, fs);
319 });
320 }
321
322 drop(pool_receiver);
323
324 token.cancelled().await;
325 info!("Async FUSE session exits.");
326
327 Ok(())
328 }
329
330 async fn setup_buffer_pool(
332 &self,
333 ) -> anyhow::Result<(Sender<(File, AlignedBytes)>, Receiver<(File, AlignedBytes)>)> {
334 let (pool_sender, pool_receiver) =
335 crossbeam_channel::bounded::<(File, AlignedBytes)>(MAX_BACKGROUND.into());
336
337 for _ in 0..MAX_BACKGROUND {
338 let buf = AlignedBytes::new_zeroed(BUFFER_SIZE.cast(), PAGE_SIZE);
339 let session_fd = self.dev_fd();
340
341 let file = unsafe {
342 let worker_fd = fuse_fd_clone(session_fd)?;
344 File::from_raw_fd(worker_fd)
346 };
347
348 let res = pool_sender.send((file, buf));
349 if let Err(e) = res {
350 panic!(
351 "failed to insert buffer to buffer pool when initializing, the error is: {e}",
352 );
353 }
354 }
355
356 let (mut file, mut byte_buf) = pool_receiver.recv()?;
357 let (read_result, mut file, byte_buf) = tokio::task::spawn_blocking(move || {
358 let res = file.read(&mut byte_buf);
359 (res, file, byte_buf)
360 })
361 .await?;
362 if let Ok(read_size) = read_result {
363 debug!("read successfully {} byte data from FUSE device", read_size);
364 let bytes = byte_buf.get(..read_size).unwrap_or_else(|| {
365 panic!(
366 "read_size is greater than buffer size: read_size = {}, buffer size = {}",
367 read_size,
368 byte_buf.len()
369 )
370 });
371 if let Ok(req) = Request::new(bytes, self.proto_version.load()) {
372 if let Operation::Init { arg } = *req.operation() {
373 let filesystem = Arc::clone(&self.filesystem);
374 self.init(arg, &req, &*filesystem, &mut file).await?;
375 }
376 }
377 }
378 pool_sender
379 .send((file, byte_buf))
380 .context("failed to put buffer back to buffer pool after FUSE init")?;
381
382 Ok((pool_sender, pool_receiver))
383 }
384
385 #[allow(single_use_lifetimes)] async fn init<'a>(
388 &self,
389 arg: &'_ FuseInitIn,
390 req: &'_ Request<'a>,
391 fs: &'_ (dyn FileSystem + Send + Sync + 'static),
392 file: &mut File,
393 ) -> anyhow::Result<()> {
394 debug!("Init args={:?}", arg);
395 let reply = ReplyInit::new(req.unique(), file);
398 if arg.major < 7 || (arg.major == 7 && arg.minor < 8) {
400 error!("Unsupported FUSE ABI version={}.{}", arg.major, arg.minor);
401 reply.error_code(Errno::EPROTO).await?;
402 return Err(anyhow!("FUSE ABI version too low"));
403 }
404 let filesystem = fs;
406 let init_res = filesystem.init(req).await;
407 if let Err(err) = init_res {
408 reply.error_code(Errno::ENOSYS).await?;
409 return Err(anyhow!("user defined init failed, the error is: {}", err,));
410 }
411 let flags = arg.flags & INIT_FLAGS; #[cfg(not(feature = "abi-7-13"))]
413 let unused = 0_u32;
414 #[cfg(feature = "abi-7-13")]
415 let congestion_threshold = 10_u16; #[cfg(feature = "abi-7-23")]
417 let time_gran = 1_u32; #[cfg(all(feature = "abi-7-23", not(feature = "abi-7-28")))]
419 let unused = [0_u32; 9];
420 #[cfg(feature = "abi-7-28")]
421 let max_pages = 0_u16; #[cfg(feature = "abi-7-28")]
423 let padding = 0_u16;
424 #[cfg(feature = "abi-7-28")]
425 let unused = [0_u32; 8];
426 reply
430 .init(FuseInitOut {
431 major: FUSE_KERNEL_VERSION,
432 minor: FUSE_KERNEL_MINOR_VERSION, max_readahead: arg.max_readahead, flags, #[cfg(not(feature = "abi-7-13"))]
438 unused,
439 #[cfg(feature = "abi-7-13")]
440 max_background: MAX_BACKGROUND,
441 #[cfg(feature = "abi-7-13")]
442 congestion_threshold,
443 max_write: MAX_WRITE_SIZE,
444 #[cfg(feature = "abi-7-23")]
445 time_gran,
446 #[cfg(all(feature = "abi-7-23", not(feature = "abi-7-28")))]
447 unused,
448 #[cfg(feature = "abi-7-28")]
449 max_pages,
450 #[cfg(feature = "abi-7-28")]
451 padding,
452 #[cfg(feature = "abi-7-28")]
453 unused,
454 })
455 .await?;
456 debug!(
457 "INIT response: ABI version={}.{}, flags={:#x}, max readahead={}, max write={}",
458 FUSE_KERNEL_VERSION,
459 FUSE_KERNEL_MINOR_VERSION,
460 flags,
461 arg.max_readahead,
462 MAX_WRITE_SIZE,
463 );
464
465 self.proto_version.store(ProtoVersion {
467 major: arg.major,
468 minor: arg.minor,
469 });
470
471 Ok(())
472 }
473}
474
475#[allow(clippy::too_many_lines)]
479#[instrument(name="request",skip(req, file, fs), fields(fuse_id =req.unique(),ino=req.nodeid(), op=%req.operation(), len=req.len()),ret)]
480async fn dispatch<'a>(
481 req: &'a Request<'a>,
482 file: &mut File,
483 fs: Arc<dyn FileSystem + Send + Sync + 'static>,
484) -> nix::Result<usize> {
485 let result = match *req.operation() {
486 Operation::Init { .. } => panic!("FUSE should have already initialized"),
488
489 Operation::Destroy => {
491 fs.destroy(req).await;
492 let reply = ReplyEmpty::new(req.unique(), file);
493 reply.ok().await
494 }
495
496 Operation::Interrupt { arg } => {
497 fs.interrupt(req, arg.unique).await; Ok(0)
499 }
500
501 Operation::Lookup { name } => {
502 let reply = ReplyEntry::new(req.unique(), file);
503 fs.lookup(req, req.nodeid(), name, reply).await
504 }
505 Operation::Forget { arg } => {
506 fs.forget(req, arg.nlookup).await; Ok(0)
508 }
509 Operation::GetAttr => {
510 let reply = ReplyAttr::new(req.unique(), file);
511 fs.getattr(req, reply).await
512 }
513 Operation::SetAttr { arg } => {
514 #[cfg(feature = "abi-7-9")]
515 use std::time::SystemTime;
516
517 #[cfg(feature = "abi-7-9")]
518 use super::protocol::{FATTR_ATIME_NOW, FATTR_MTIME_NOW};
519
520 let mode = match arg.valid & FATTR_MODE {
521 0 => None,
522 _ => Some(arg.mode),
523 };
524 let u_id = match arg.valid & FATTR_UID {
525 0 => None,
526 _ => Some(arg.uid),
527 };
528 let g_id = match arg.valid & FATTR_GID {
529 0 => None,
530 _ => Some(arg.gid),
531 };
532 let size = match arg.valid & FATTR_SIZE {
533 0 => None,
534 _ => Some(arg.size),
535 };
536 let a_time = match arg.valid & FATTR_ATIME {
537 0 => None,
538 _ => Some(UNIX_EPOCH + Duration::new(arg.atime, arg.atimensec)),
539 };
540 let m_time = match arg.valid & FATTR_MTIME {
541 0 => None,
542 _ => Some(UNIX_EPOCH + Duration::new(arg.mtime, arg.mtimensec)),
543 };
544 let fh = match arg.valid & FATTR_FH {
545 0 => None,
546 _ => Some(arg.fh),
547 };
548
549 #[cfg(feature = "abi-7-9")]
550 let a_time = match arg.valid & FATTR_ATIME_NOW {
551 0 => a_time,
552 _ => Some(SystemTime::now()),
553 };
554 #[cfg(feature = "abi-7-9")]
555 let m_time = match arg.valid & FATTR_MTIME_NOW {
556 0 => m_time,
557 _ => Some(SystemTime::now()),
558 };
559
560 #[cfg(feature = "abi-7-9")]
561 let lock_owner = match arg.valid & FATTR_LOCKOWNER {
562 0 => None,
563 _ => Some(arg.lock_owner),
564 };
565 #[cfg(feature = "abi-7-23")]
566 let c_time = match arg.valid & FATTR_CTIME {
567 0 => None,
568 _ => Some(UNIX_EPOCH + Duration::new(arg.ctime, arg.ctimensec)),
569 };
570
571 let reply = ReplyAttr::new(req.unique(), file);
572 let param = SetAttrParam {
573 valid: arg.valid,
574 fh,
575 mode,
576 u_id,
577 g_id,
578 size,
579 #[cfg(feature = "abi-7-9")]
580 lock_owner,
581 a_time,
582 m_time,
583 #[cfg(feature = "abi-7-23")]
584 c_time,
585 };
586 fs.setattr(req, param, reply).await
587 }
588 Operation::ReadLink => {
589 let reply = ReplyData::new(req.unique(), file);
590 fs.readlink(req, reply).await
591 }
592 Operation::MkNod { arg, name } => {
593 let param = CreateParam {
594 parent: req.nodeid(),
595 name: name.to_owned(),
596 mode: arg.mode,
597 rdev: arg.rdev,
598 uid: req.uid(),
599 gid: req.gid(),
600 node_type: SFlag::S_IFREG,
601 link: None,
602 };
603 let reply = ReplyEntry::new(req.unique(), file);
604 fs.mknod(req, param, reply).await
605 }
606 Operation::MkDir { arg, name } => {
607 let reply = ReplyEntry::new(req.unique(), file);
608 fs.mkdir(req, req.nodeid(), name, arg.mode, reply).await
609 }
610 Operation::Unlink { name } => {
611 let reply = ReplyEmpty::new(req.unique(), file);
612 fs.unlink(req, req.nodeid(), name, reply).await
613 }
614 Operation::RmDir { name } => {
615 let reply = ReplyEmpty::new(req.unique(), file);
616 fs.rmdir(req, req.nodeid(), name, reply).await
617 }
618 Operation::SymLink { name, link } => {
619 let reply = ReplyEntry::new(req.unique(), file);
620 fs.symlink(req, req.nodeid(), name, Path::new(link), reply)
621 .await
622 }
623 Operation::Rename {
624 arg,
625 oldname,
626 newname,
627 } => {
628 let reply = ReplyEmpty::new(req.unique(), file);
629 let param = RenameParam {
630 old_parent: req.nodeid(),
631 old_name: oldname.to_owned(),
632 new_parent: arg.newdir,
633 new_name: newname.to_owned(),
634 flags: 0,
635 };
636 fs.rename(req, param, reply).await
637 }
638 Operation::Link { arg, name } => {
639 let reply = ReplyEntry::new(req.unique(), file);
640 fs.link(req, arg.oldnodeid, name, reply).await
641 }
642 Operation::Open { arg } => {
643 let reply = ReplyOpen::new(req.unique(), file);
644 fs.open(req, arg.flags, reply).await
645 }
646 Operation::Read { arg } => {
647 let reply = ReplyData::new(req.unique(), file);
648 fs.read(req, arg.fh, arg.offset.cast(), arg.size, reply)
649 .await
650 }
651 Operation::Write { arg, data } => {
652 info!("operation:write: {:?}", arg);
653 assert_eq!(data.len(), arg.size.cast::<usize>());
654 let reply = ReplyWrite::new(req.unique(), file);
655 fs.write(
656 req,
657 arg.fh,
658 arg.offset.cast(),
659 data.to_vec(), arg.write_flags,
661 reply,
662 )
663 .await
664 }
665 Operation::Flush { arg } => {
666 let reply = ReplyEmpty::new(req.unique(), file);
667 fs.flush(req, arg.fh, arg.lock_owner, reply).await
668 }
669 Operation::Release { arg } => {
670 let flush = !matches!(arg.release_flags & FUSE_RELEASE_FLUSH, 0);
671 let reply = ReplyEmpty::new(req.unique(), file);
672 fs.release(req, arg.fh, arg.flags, arg.lock_owner, flush, reply)
673 .await
674 }
675 Operation::FSync { arg } => {
676 let datasync = !matches!(arg.fsync_flags & 1, 0);
677 let reply = ReplyEmpty::new(req.unique(), file);
678 fs.fsync(req, arg.fh, datasync, reply).await
679 }
680 Operation::OpenDir { arg } => {
681 let reply = ReplyOpen::new(req.unique(), file);
682 fs.opendir(req, arg.flags, reply).await
683 }
684 Operation::ReadDir { arg } => {
685 let reply = ReplyDirectory::new(req.unique(), file, arg.size.cast());
686 fs.readdir(req, arg.fh, arg.offset.cast(), reply).await
687 }
688 Operation::ReleaseDir { arg } => {
689 let reply = ReplyEmpty::new(req.unique(), file);
690 fs.releasedir(req, arg.fh, arg.flags, reply).await
691 }
692 Operation::FSyncDir { arg } => {
693 let datasync = !matches!(arg.fsync_flags & 1, 0);
694 let reply = ReplyEmpty::new(req.unique(), file);
695 fs.fsyncdir(req, arg.fh, datasync, reply).await
696 }
697 Operation::StatFs => {
698 let reply = ReplyStatFs::new(req.unique(), file);
699 fs.statfs(req, reply).await
700 }
701 Operation::SetXAttr { arg, name, value } => {
702 #[cfg(target_os = "linux")]
705 #[inline]
706 const fn get_position(_arg: &FuseSetXAttrIn) -> u32 {
707 0
708 }
709 assert!(value.len() == arg.size.cast::<usize>());
710 let reply = ReplyEmpty::new(req.unique(), file);
711 fs.setxattr(req, name, value, arg.flags, get_position(arg), reply)
712 .await
713 }
714 Operation::GetXAttr { arg, name } => {
715 let reply = ReplyXAttr::new(req.unique(), file);
716 fs.getxattr(req, name, arg.size, reply).await
717 }
718 Operation::ListXAttr { arg } => {
719 let reply = ReplyXAttr::new(req.unique(), file);
720 fs.listxattr(req, arg.size, reply).await
721 }
722 Operation::RemoveXAttr { name } => {
723 let reply = ReplyEmpty::new(req.unique(), file);
724 fs.removexattr(req, name, reply).await
725 }
726 Operation::Access { arg } => {
727 let reply = ReplyEmpty::new(req.unique(), file);
728 fs.access(req, arg.mask, reply).await
729 }
730 Operation::Create { arg, name } => {
731 let reply = ReplyCreate::new(req.unique(), file);
732 fs.create(req, req.nodeid(), name, arg.mode, arg.flags, reply)
733 .await
734 }
735 Operation::GetLk { arg } => {
736 let reply = ReplyLock::new(req.unique(), file);
737 let lock_param = FileLockParam {
738 fh: arg.fh,
739 lock_owner: arg.owner,
740 start: arg.lk.start,
741 end: arg.lk.end,
742 typ: arg.lk.typ,
743 pid: arg.lk.pid,
744 };
745 fs.getlk(req, lock_param, reply).await
746 }
747 Operation::SetLk { arg } => {
748 let reply = ReplyEmpty::new(req.unique(), file);
749 let lock_param = FileLockParam {
750 fh: arg.fh,
751 lock_owner: arg.owner,
752 start: arg.lk.start,
753 end: arg.lk.end,
754 typ: arg.lk.typ,
755 pid: arg.lk.pid,
756 };
757 fs.setlk(req, lock_param, false, reply).await
758 }
759 Operation::SetLkW { arg } => {
760 let reply = ReplyEmpty::new(req.unique(), file);
761 let lock_param = FileLockParam {
762 fh: arg.fh,
763 lock_owner: arg.owner,
764 start: arg.lk.start,
765 end: arg.lk.end,
766 typ: arg.lk.typ,
767 pid: arg.lk.pid,
768 };
769 fs.setlk(
770 req, lock_param, true, reply,
772 )
773 .await
774 }
775 Operation::BMap { arg } => {
776 let reply = ReplyBMap::new(req.unique(), file);
777 fs.bmap(req, arg.blocksize, arg.block, reply).await
778 }
779
780 #[cfg(feature = "abi-7-11")]
781 Operation::IoCtl { arg, data } => {
782 error!("IoCtl not implemented, arg={:?}, data={:?}", arg, data);
783 not_implement_helper(req, file).await
784 }
785 #[cfg(feature = "abi-7-11")]
786 Operation::Poll { arg } => {
787 error!("Poll not implemented, arg={:?}", arg);
788 not_implement_helper(req, file).await
789 }
790 #[cfg(feature = "abi-7-15")]
791 Operation::NotifyReply { data } => {
792 error!("NotifyReply not implemented, data={:?}", data);
793 not_implement_helper(req, file).await
794 }
795 #[cfg(feature = "abi-7-16")]
796 Operation::BatchForget { arg, nodes } => {
797 error!(
798 "BatchForget not implemented, arg={:?}, nodes={:?}",
799 arg, nodes
800 );
801 not_implement_helper(req, file).await
802 }
803 #[cfg(feature = "abi-7-19")]
804 Operation::FAllocate { arg } => {
805 error!("FAllocate not implemented, arg={:?}", arg);
806 not_implement_helper(req, file).await
807 }
808 #[cfg(feature = "abi-7-21")]
809 Operation::ReadDirPlus { arg } => {
810 error!("ReadDirPlus not implemented, arg={:?}", arg);
811 not_implement_helper(req, file).await
812 }
813 #[cfg(feature = "abi-7-23")]
814 Operation::Rename2 {
815 arg,
816 oldname,
817 newname,
818 } => {
819 let reply = ReplyEmpty::new(req.unique(), file);
820 let param = RenameParam {
821 old_parent: req.nodeid(),
822 old_name: oldname.to_owned(),
823 new_parent: arg.newdir,
824 new_name: newname.to_owned(),
825 flags: arg.flags,
826 };
827 fs.rename(req, param, reply).await
828 }
829 Operation::LSeek { arg } => {
831 error!("LSeek not implemented, arg={:?}", arg);
832 not_implement_helper(req, file).await
833 }
834 Operation::CopyFileRange { arg } => {
836 error!("ReadDirPlusCopyFileRange not implemented, arg={:?}", arg);
837 not_implement_helper(req, file).await
838 }
839 #[cfg(feature = "abi-7-11")]
840 Operation::CuseInit { arg } => {
841 panic!("unsupported CuseInit arg={arg:?}");
842 }
843 };
844
845 result
846}
847
848async fn not_implement_helper(req: &Request<'_>, file: &mut File) -> nix::Result<usize> {
850 let reply = ReplyEmpty::new(req.unique(), file);
851 reply.error_code(Errno::ENOSYS).await
852}