1use std::fs::File;
4use std::io::Read;
5use std::os::fd::FromRawFd;
6use std::os::unix::io::RawFd;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use std::thread;
10use std::time::{Duration, UNIX_EPOCH};
11
12use aligned_utils::bytes::AlignedBytes;
13use anyhow::{anyhow, Context};
14use clippy_utilities::Cast;
15use crossbeam_channel::{Receiver, Sender};
16use crossbeam_utils::atomic::AtomicCell;
17use nix::errno::Errno;
18use nix::sys::stat::SFlag;
19use nix::unistd;
20use tokio::runtime::Handle;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, error, info, instrument};
23
24use super::context::ProtoVersion;
25use super::file_system::{FileSystem};
26use super::fuse_reply::{
27 ReplyAttr, ReplyBMap, ReplyCreate, ReplyData, ReplyDirectory, ReplyEmpty, ReplyEntry,
28 ReplyInit, ReplyLock, ReplyOpen, ReplyStatFs, ReplyWrite, ReplyXAttr,
29};
30use super::fuse_request::{Operation, Request};
31use super::{mount, FuseFs};
32#[cfg(feature = "abi-7-23")]
33use super::protocol::FATTR_CTIME;
34#[cfg(feature = "abi-7-9")]
35use super::protocol::FATTR_LOCKOWNER; use super::protocol::{
37 FuseInitIn, FuseInitOut, FuseSetXAttrIn, FATTR_ATIME, FATTR_FH, FATTR_GID, FATTR_MODE,
38 FATTR_MTIME, FATTR_SIZE, FATTR_UID, FUSE_ASYNC_READ, FUSE_KERNEL_MINOR_VERSION,
39 FUSE_KERNEL_VERSION, FUSE_RELEASE_FLUSH,
40};
41use crate::fs_util::{CreateParam, FileLockParam, RenameParam, SetAttrParam};
42
43#[cfg(target_os = "linux")]
45const INIT_FLAGS: u32 = FUSE_ASYNC_READ;
46#[cfg(target_os = "linux")]
51const MAX_WRITE_SIZE: u32 = 128 * 1024;
52
53const BUFFER_SIZE: u32 = MAX_WRITE_SIZE + 512;
57
58const PAGE_SIZE: usize = 4096;
60const MAX_BACKGROUND: u16 = 10; const MAX_FUSE_READER: usize = 2; #[allow(missing_docs)] mod _fuse_fd_clone {
70 use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
71
72 use clippy_utilities::Cast;
73 use nix::fcntl::{self, FcntlArg, FdFlag, OFlag};
74 use nix::ioctl_read;
75 use nix::sys::stat::Mode;
76 ioctl_read!(fuse_fd_clone_impl, 229, 0, u32);
77
78 #[allow(clippy::unnecessary_safety_comment)]
85 pub unsafe fn fuse_fd_clone(session_fd: RawFd) -> nix::Result<RawFd> {
86 let devname = "/dev/fuse";
87 let cloned_fd = fcntl::open(devname, OFlag::O_RDWR | OFlag::O_CLOEXEC, Mode::empty())?;
88 let cloned_fd = unsafe {
91 OwnedFd::from_raw_fd(cloned_fd)
92 };
93
94 fcntl::fcntl(cloned_fd.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC))?;
95
96 let mut result_fd: u32 = session_fd.cast();
97 unsafe {
100 fuse_fd_clone_impl(cloned_fd.as_raw_fd(), &mut result_fd)?
101 };
102 Ok(cloned_fd.into_raw_fd()) }
105}
106
107use _fuse_fd_clone::fuse_fd_clone;
108use crate::de::DeserializeError;
109
110#[allow(clippy::needless_pass_by_value)]
112fn fuse_device_reader(
113 buffer_tx: Sender<(File, AlignedBytes)>,
114 buffer_rx: Receiver<(File, AlignedBytes)>,
115 runtime_handle: Handle,
116 proto_version: ProtoVersion,
117 fs: Arc<dyn FileSystem + Send + Sync>,
118) {
119 loop {
120 let Ok((mut file, mut buffer)) = buffer_rx.recv() else {
121 error!("The channel of buffer is exhausted and closed.");
122 return;
123 };
124
125 let size = match file.read(&mut buffer) {
126 Ok(size) => size,
127 Err(e) => {
128 let errno = e.raw_os_error().map(Errno::from_raw);
129 match errno {
130 None => {
131 panic!("Failed to get the errno on reading failure. The error is {e}");
132 }
133 Some(Errno::ENOENT) => {
135 info!("operation interrupted, retry.");
136 buffer_tx.send((file, buffer)).unwrap_or_else(|_| {
137 unreachable!("The buffer channel is not to be closed.")
138 });
139 continue;
140 }
141 Some(Errno::EINTR) => {
143 info!("interrupted system call, retry");
144 buffer_tx.send((file, buffer)).unwrap_or_else(|_| {
145 unreachable!("The buffer channel is not to be closed.")
146 });
147 continue;
148 }
149 Some(Errno::EAGAIN) => {
151 info!("Explicitly retry");
152 buffer_tx.send((file, buffer)).unwrap_or_else(|_| {
153 unreachable!("The buffer channel is not to be closed.")
154 });
155 continue;
156 }
157 Some(Errno::ENODEV) => {
159 info!("filesystem destroyed, quit the run loop");
160 return;
161 }
162 Some(errno) => {
164 panic!(
165 "non-recoverable io error when read FUSE device, \
166 the error is: {errno}",
167 );
168 }
169 }
170 }
171 };
172
173 runtime_handle.block_on(async{
174 process_fuse_request(
175 buffer,
176 size,
177 file,
178 Arc::clone(&fs),
179 buffer_tx.clone(),
180 proto_version,
181 ).await
182 });
183 }
188}
189
190async fn process_fuse_request(
192 byte_buffer: AlignedBytes,
193 read_size: usize,
194 mut file: File,
195 fs: Arc<dyn FileSystem + Send + Sync + 'static>,
196 sender: Sender<(File, AlignedBytes)>,
197 proto_version: ProtoVersion,
198) {
199 let bytes = byte_buffer
200 .get(..read_size)
201 .unwrap_or_else(|| panic!("failed to read {read_size} bytes from the buffer",));
202 let fuse_req = match Request::new(bytes, proto_version) {
203 Ok(r) => r,
205 Err(e) => {
207 if let &DeserializeError::UnknownOpCode { code, unique } = &e {
208 error!("Unknown operation code found: {code}, with context: {e}");
209 let unique = unique.unwrap_or_else(|| {
210 unreachable!("A `unique` must be filled in by deserializer.")
211 });
212 ReplyEmpty::new(unique, &mut file)
213 .error_code(Errno::ENOSYS)
214 .await
215 .unwrap_or_else(|reply_err| {
216 panic!("Failed to reply an error code: {reply_err}.")
217 });
218 sender.send((file, byte_buffer)).unwrap_or_else(|_| {
219 error!("The buffer pool is closed.");
220 });
221 return;
222 }
223
224 panic!("failed to build FUSE request, the error is: {e}");
226 }
227 };
228 debug!("received FUSE req={}", fuse_req);
229 let res = dispatch(&fuse_req, &mut file, fs).await;
230 if let Err(e) = res {
231 panic!(
232 "failed to process req={:?}, the error is: {}",
233 fuse_req,
234 crate::util::format_nix_error(e), );
236 }
237 let res = sender.send((file, byte_buffer));
238 if let Err(e) = res {
239 panic!("failed to put the buffer back to buffer pool, the error is: {e}",);
240 }
241}
242
243#[allow(missing_debug_implementations)]
245pub struct Session<F: FileSystem + Send + Sync + 'static> {
246 fuse_fd: Arc<FuseFd>,
248 proto_version: AtomicCell<ProtoVersion>,
250 mount_path: PathBuf,
252 filesystem: Arc<F>,
254 }
256pub async fn new_session(mount_path: &Path, fs: FuseFs)->anyhow::Result<Session<FuseFs>>{
257 let fuse_fd = mount::mount(mount_path).await.context("Failed to mount FUSE")?;
258
259 Ok(Session{
260 fuse_fd: Arc::new(FuseFd(fuse_fd)),
261 proto_version: AtomicCell::new(ProtoVersion::UNSPECIFIED),
262 mount_path: mount_path.to_owned(),
263 filesystem: Arc::new(fs),
264 })
265}
266
267#[derive(Debug)]
269struct FuseFd(RawFd);
270
271impl Drop for FuseFd {
272 fn drop(&mut self) {
273 println!("Dropping FUSE fd {}", self.0);
274 unistd::close(self.0).ok();
275 }
276}
277
278impl<F: FileSystem + Send + Sync + 'static> Drop for Session<F> {
279 fn drop(&mut self) {
280 println!("Dropping FUSE session");
281 futures::executor::block_on(async {
282 let mount_path = &self.mount_path;
283 let res = mount::umount(mount_path).await;
284 println!("umount result: {:?}", res);
285 match res {
286 Ok(..) => info!("Session::drop() successfully umount {:?}", mount_path),
287 Err(e) => error!(
288 "Session::drop() failed to umount {:?}, the error is: {}",
289 mount_path, e,
290 ),
291 };
292 });
293 }
294}
295
296impl<F: FileSystem + Send + Sync + 'static> Session<F> {
297 #[inline]
299 pub fn dev_fd(&self) -> RawFd {
300 self.fuse_fd.0
301 }
302
303 #[allow(clippy::arithmetic_side_effects, clippy::pattern_type_mismatch)] pub async fn run(self, token: CancellationToken) -> anyhow::Result<()> {
306 let (pool_sender, pool_receiver) = self
308 .setup_buffer_pool()
309 .await
310 .context("failed to setup buffer pool")?;
311
312 for _ in 0..MAX_FUSE_READER {
313 let pool_tx = pool_sender.clone();
314 let pool_rx = pool_receiver.clone();
315 let handle = Handle::current();
317 let fs = Arc::clone(&self.filesystem);
318 let protocol_version = self.proto_version.load();
319 thread::spawn(move || {
321 fuse_device_reader(pool_tx, pool_rx, handle, protocol_version, fs);
322 });
323 }
324
325 drop(pool_receiver);
326
327 token.cancelled().await;
328 info!("Async FUSE session exits.");
329
330 Ok(())
331 }
332
333 async fn setup_buffer_pool(
335 &self,
336 ) -> anyhow::Result<(Sender<(File, AlignedBytes)>, Receiver<(File, AlignedBytes)>)> {
337 let (pool_sender, pool_receiver) =
338 crossbeam_channel::bounded::<(File, AlignedBytes)>(MAX_BACKGROUND.into());
339
340 for _ in 0..MAX_BACKGROUND {
341 let buf = AlignedBytes::new_zeroed(BUFFER_SIZE.cast(), PAGE_SIZE);
342 let session_fd = self.dev_fd();
343
344 let file = unsafe {
345 let worker_fd = fuse_fd_clone(session_fd)?;
347 File::from_raw_fd(worker_fd)
349 };
350
351 let res = pool_sender.send((file, buf));
352 if let Err(e) = res {
353 panic!(
354 "failed to insert buffer to buffer pool when initializing, the error is: {e}",
355 );
356 }
357 }
358
359 let (mut file, mut byte_buf) = pool_receiver.recv()?;
360 let (read_result, mut file, byte_buf) = tokio::task::spawn_blocking(move || {
361 let res = file.read(&mut byte_buf);
362 (res, file, byte_buf)
363 })
364 .await?;
365 if let Ok(read_size) = read_result {
366 debug!("read successfully {} byte data from FUSE device", read_size);
367 let bytes = byte_buf.get(..read_size).unwrap_or_else(|| {
368 panic!(
369 "read_size is greater than buffer size: read_size = {}, buffer size = {}",
370 read_size,
371 byte_buf.len()
372 )
373 });
374 if let Ok(req) = Request::new(bytes, self.proto_version.load()) {
375 if let Operation::Init { arg } = *req.operation() {
376 let filesystem = Arc::clone(&self.filesystem);
377 self.init(arg, &req, &*filesystem, &mut file).await?;
378 }
379 }
380 }
381 pool_sender
382 .send((file, byte_buf))
383 .context("failed to put buffer back to buffer pool after FUSE init")?;
384
385 Ok((pool_sender, pool_receiver))
386 }
387
388 #[allow(single_use_lifetimes)] async fn init<'a>(
391 &self,
392 arg: &'_ FuseInitIn,
393 req: &'_ Request<'a>,
394 fs: &'_ (dyn FileSystem + Send + Sync + 'static),
395 file: &mut File,
396 ) -> anyhow::Result<()> {
397 debug!("Init args={:?}", arg);
398 let reply = ReplyInit::new(req.unique(), file);
401 if arg.major < 7 || (arg.major == 7 && arg.minor < 8) {
403 error!("Unsupported FUSE ABI version={}.{}", arg.major, arg.minor);
404 reply.error_code(Errno::EPROTO).await?;
405 return Err(anyhow!("FUSE ABI version too low"));
406 }
407 let filesystem = fs;
409 let init_res = filesystem.init(req).await;
410 if let Err(err) = init_res {
411 reply.error_code(Errno::ENOSYS).await?;
412 return Err(anyhow!("user defined init failed, the error is: {}", err,));
413 }
414 let flags = arg.flags & INIT_FLAGS; #[cfg(not(feature = "abi-7-13"))]
416 let unused = 0_u32;
417 #[cfg(feature = "abi-7-13")]
418 let congestion_threshold = 10_u16; #[cfg(feature = "abi-7-23")]
420 let time_gran = 1_u32; #[cfg(all(feature = "abi-7-23", not(feature = "abi-7-28")))]
422 let unused = [0_u32; 9];
423 #[cfg(feature = "abi-7-28")]
424 let max_pages = 0_u16; #[cfg(feature = "abi-7-28")]
426 let padding = 0_u16;
427 #[cfg(feature = "abi-7-28")]
428 let unused = [0_u32; 8];
429 reply
433 .init(FuseInitOut {
434 major: FUSE_KERNEL_VERSION,
435 minor: FUSE_KERNEL_MINOR_VERSION, max_readahead: arg.max_readahead, flags, #[cfg(not(feature = "abi-7-13"))]
441 unused,
442 #[cfg(feature = "abi-7-13")]
443 max_background: MAX_BACKGROUND,
444 #[cfg(feature = "abi-7-13")]
445 congestion_threshold,
446 max_write: MAX_WRITE_SIZE,
447 #[cfg(feature = "abi-7-23")]
448 time_gran,
449 #[cfg(all(feature = "abi-7-23", not(feature = "abi-7-28")))]
450 unused,
451 #[cfg(feature = "abi-7-28")]
452 max_pages,
453 #[cfg(feature = "abi-7-28")]
454 padding,
455 #[cfg(feature = "abi-7-28")]
456 unused,
457 })
458 .await?;
459 debug!(
460 "INIT response: ABI version={}.{}, flags={:#x}, max readahead={}, max write={}",
461 FUSE_KERNEL_VERSION,
462 FUSE_KERNEL_MINOR_VERSION,
463 flags,
464 arg.max_readahead,
465 MAX_WRITE_SIZE,
466 );
467
468 self.proto_version.store(ProtoVersion {
470 major: arg.major,
471 minor: arg.minor,
472 });
473
474 Ok(())
475 }
476}
477
478#[allow(clippy::too_many_lines)]
482#[instrument(name="request",skip(req, file, fs), fields(fuse_id =req.unique(),ino=req.nodeid(), op=%req.operation(), len=req.len()),ret)]
483async fn dispatch<'a>(
484 req: &'a Request<'a>,
485 file: &mut File,
486 fs: Arc<dyn FileSystem + Send + Sync + 'static>,
487) -> nix::Result<usize> {
488 let result = match *req.operation() {
489 Operation::Init { .. } => panic!("FUSE should have already initialized"),
491
492 Operation::Destroy => {
494 fs.destroy(req).await;
495 let reply = ReplyEmpty::new(req.unique(), file);
496 reply.ok().await
497 }
498
499 Operation::Interrupt { arg } => {
500 fs.interrupt(req, arg.unique).await; Ok(0)
502 }
503
504 Operation::Lookup { name } => {
505 let reply = ReplyEntry::new(req.unique(), file);
506 fs.lookup(req, req.nodeid(), name, reply).await
507 }
508 Operation::Forget { arg } => {
509 fs.forget(req, arg.nlookup).await; Ok(0)
511 }
512 Operation::GetAttr => {
513 let reply = ReplyAttr::new(req.unique(), file);
514 fs.getattr(req, reply).await
515 }
516 Operation::SetAttr { arg } => {
517 #[cfg(feature = "abi-7-9")]
518 use std::time::SystemTime;
519
520 #[cfg(feature = "abi-7-9")]
521 use super::protocol::{FATTR_ATIME_NOW, FATTR_MTIME_NOW};
522
523 let mode = match arg.valid & FATTR_MODE {
524 0 => None,
525 _ => Some(arg.mode),
526 };
527 let u_id = match arg.valid & FATTR_UID {
528 0 => None,
529 _ => Some(arg.uid),
530 };
531 let g_id = match arg.valid & FATTR_GID {
532 0 => None,
533 _ => Some(arg.gid),
534 };
535 let size = match arg.valid & FATTR_SIZE {
536 0 => None,
537 _ => Some(arg.size),
538 };
539 let a_time = match arg.valid & FATTR_ATIME {
540 0 => None,
541 _ => Some(UNIX_EPOCH + Duration::new(arg.atime, arg.atimensec)),
542 };
543 let m_time = match arg.valid & FATTR_MTIME {
544 0 => None,
545 _ => Some(UNIX_EPOCH + Duration::new(arg.mtime, arg.mtimensec)),
546 };
547 let fh = match arg.valid & FATTR_FH {
548 0 => None,
549 _ => Some(arg.fh),
550 };
551
552 #[cfg(feature = "abi-7-9")]
553 let a_time = match arg.valid & FATTR_ATIME_NOW {
554 0 => a_time,
555 _ => Some(SystemTime::now()),
556 };
557 #[cfg(feature = "abi-7-9")]
558 let m_time = match arg.valid & FATTR_MTIME_NOW {
559 0 => m_time,
560 _ => Some(SystemTime::now()),
561 };
562
563 #[cfg(feature = "abi-7-9")]
564 let lock_owner = match arg.valid & FATTR_LOCKOWNER {
565 0 => None,
566 _ => Some(arg.lock_owner),
567 };
568 #[cfg(feature = "abi-7-23")]
569 let c_time = match arg.valid & FATTR_CTIME {
570 0 => None,
571 _ => Some(UNIX_EPOCH + Duration::new(arg.ctime, arg.ctimensec)),
572 };
573
574 let reply = ReplyAttr::new(req.unique(), file);
575 let param = SetAttrParam {
576 valid: arg.valid,
577 fh,
578 mode,
579 u_id,
580 g_id,
581 size,
582 #[cfg(feature = "abi-7-9")]
583 lock_owner,
584 a_time,
585 m_time,
586 #[cfg(feature = "abi-7-23")]
587 c_time,
588 };
589 fs.setattr(req, param, reply).await
590 }
591 Operation::ReadLink => {
592 let reply = ReplyData::new(req.unique(), file);
593 fs.readlink(req, reply).await
594 }
595 Operation::MkNod { arg, name } => {
596 let param = CreateParam {
597 parent: req.nodeid(),
598 name: name.to_owned(),
599 mode: arg.mode,
600 rdev: arg.rdev,
601 uid: req.uid(),
602 gid: req.gid(),
603 node_type: SFlag::S_IFREG,
604 link: None,
605 };
606 let reply = ReplyEntry::new(req.unique(), file);
607 fs.mknod(req, param, reply).await
608 }
609 Operation::MkDir { arg, name } => {
610 let reply = ReplyEntry::new(req.unique(), file);
611 fs.mkdir(req, req.nodeid(), name, arg.mode, reply).await
612 }
613 Operation::Unlink { name } => {
614 let reply = ReplyEmpty::new(req.unique(), file);
615 fs.unlink(req, req.nodeid(), name, reply).await
616 }
617 Operation::RmDir { name } => {
618 let reply = ReplyEmpty::new(req.unique(), file);
619 fs.rmdir(req, req.nodeid(), name, reply).await
620 }
621 Operation::SymLink { name, link } => {
622 let reply = ReplyEntry::new(req.unique(), file);
623 fs.symlink(req, req.nodeid(), name, Path::new(link), reply)
624 .await
625 }
626 Operation::Rename {
627 arg,
628 oldname,
629 newname,
630 } => {
631 let reply = ReplyEmpty::new(req.unique(), file);
632 let param = RenameParam {
633 old_parent: req.nodeid(),
634 old_name: oldname.to_owned(),
635 new_parent: arg.newdir,
636 new_name: newname.to_owned(),
637 flags: 0,
638 };
639 fs.rename(req, param, reply).await
640 }
641 Operation::Link { arg, name } => {
642 let reply = ReplyEntry::new(req.unique(), file);
643 fs.link(req, arg.oldnodeid, name, reply).await
644 }
645 Operation::Open { arg } => {
646 let reply = ReplyOpen::new(req.unique(), file);
647 fs.open(req, arg.flags, reply).await
648 }
649 Operation::Read { arg } => {
650 let reply = ReplyData::new(req.unique(), file);
651 fs.read(req, arg.fh, arg.offset.cast(), arg.size, reply)
652 .await
653 }
654 Operation::Write { arg, data } => {
655 info!("operation:write: {:?}", arg);
656 assert_eq!(data.len(), arg.size.cast::<usize>());
657 let reply = ReplyWrite::new(req.unique(), file);
658 fs.write(
659 req,
660 arg.fh,
661 arg.offset.cast(),
662 data.to_vec(), arg.write_flags,
664 reply,
665 )
666 .await
667 }
668 Operation::Flush { arg } => {
669 let reply = ReplyEmpty::new(req.unique(), file);
670 fs.flush(req, arg.fh, arg.lock_owner, reply).await
671 }
672 Operation::Release { arg } => {
673 let flush = !matches!(arg.release_flags & FUSE_RELEASE_FLUSH, 0);
674 let reply = ReplyEmpty::new(req.unique(), file);
675 fs.release(req, arg.fh, arg.flags, arg.lock_owner, flush, reply)
676 .await
677 }
678 Operation::FSync { arg } => {
679 let datasync = !matches!(arg.fsync_flags & 1, 0);
680 let reply = ReplyEmpty::new(req.unique(), file);
681 fs.fsync(req, arg.fh, datasync, reply).await
682 }
683 Operation::OpenDir { arg } => {
684 let reply = ReplyOpen::new(req.unique(), file);
685 fs.opendir(req, arg.flags, reply).await
686 }
687 Operation::ReadDir { arg } => {
688 let reply = ReplyDirectory::new(req.unique(), file, arg.size.cast());
689 fs.readdir(req, arg.fh, arg.offset.cast(), reply).await
690 }
691 Operation::ReleaseDir { arg } => {
692 let reply = ReplyEmpty::new(req.unique(), file);
693 fs.releasedir(req, arg.fh, arg.flags, reply).await
694 }
695 Operation::FSyncDir { arg } => {
696 let datasync = !matches!(arg.fsync_flags & 1, 0);
697 let reply = ReplyEmpty::new(req.unique(), file);
698 fs.fsyncdir(req, arg.fh, datasync, reply).await
699 }
700 Operation::StatFs => {
701 let reply = ReplyStatFs::new(req.unique(), file);
702 fs.statfs(req, reply).await
703 }
704 Operation::SetXAttr { arg, name, value } => {
705 #[cfg(target_os = "linux")]
708 #[inline]
709 const fn get_position(_arg: &FuseSetXAttrIn) -> u32 {
710 0
711 }
712 assert!(value.len() == arg.size.cast::<usize>());
713 let reply = ReplyEmpty::new(req.unique(), file);
714 fs.setxattr(req, name, value, arg.flags, get_position(arg), reply)
715 .await
716 }
717 Operation::GetXAttr { arg, name } => {
718 let reply = ReplyXAttr::new(req.unique(), file);
719 fs.getxattr(req, name, arg.size, reply).await
720 }
721 Operation::ListXAttr { arg } => {
722 let reply = ReplyXAttr::new(req.unique(), file);
723 fs.listxattr(req, arg.size, reply).await
724 }
725 Operation::RemoveXAttr { name } => {
726 let reply = ReplyEmpty::new(req.unique(), file);
727 fs.removexattr(req, name, reply).await
728 }
729 Operation::Access { arg } => {
730 let reply = ReplyEmpty::new(req.unique(), file);
731 fs.access(req, arg.mask, reply).await
732 }
733 Operation::Create { arg, name } => {
734 let reply = ReplyCreate::new(req.unique(), file);
735 fs.create(req, req.nodeid(), name, arg.mode, arg.flags, reply)
736 .await
737 }
738 Operation::GetLk { arg } => {
739 let reply = ReplyLock::new(req.unique(), file);
740 let lock_param = FileLockParam {
741 fh: arg.fh,
742 lock_owner: arg.owner,
743 start: arg.lk.start,
744 end: arg.lk.end,
745 typ: arg.lk.typ,
746 pid: arg.lk.pid,
747 };
748 fs.getlk(req, lock_param, reply).await
749 }
750 Operation::SetLk { arg } => {
751 let reply = ReplyEmpty::new(req.unique(), file);
752 let lock_param = FileLockParam {
753 fh: arg.fh,
754 lock_owner: arg.owner,
755 start: arg.lk.start,
756 end: arg.lk.end,
757 typ: arg.lk.typ,
758 pid: arg.lk.pid,
759 };
760 fs.setlk(req, lock_param, false, reply).await
761 }
762 Operation::SetLkW { arg } => {
763 let reply = ReplyEmpty::new(req.unique(), file);
764 let lock_param = FileLockParam {
765 fh: arg.fh,
766 lock_owner: arg.owner,
767 start: arg.lk.start,
768 end: arg.lk.end,
769 typ: arg.lk.typ,
770 pid: arg.lk.pid,
771 };
772 fs.setlk(
773 req, lock_param, true, reply,
775 )
776 .await
777 }
778 Operation::BMap { arg } => {
779 let reply = ReplyBMap::new(req.unique(), file);
780 fs.bmap(req, arg.blocksize, arg.block, reply).await
781 }
782
783 #[cfg(feature = "abi-7-11")]
784 Operation::IoCtl { arg, data } => {
785 error!("IoCtl not implemented, arg={:?}, data={:?}", arg, data);
786 not_implement_helper(req, file).await
787 }
788 #[cfg(feature = "abi-7-11")]
789 Operation::Poll { arg } => {
790 error!("Poll not implemented, arg={:?}", arg);
791 not_implement_helper(req, file).await
792 }
793 #[cfg(feature = "abi-7-15")]
794 Operation::NotifyReply { data } => {
795 error!("NotifyReply not implemented, data={:?}", data);
796 not_implement_helper(req, file).await
797 }
798 #[cfg(feature = "abi-7-16")]
799 Operation::BatchForget { arg, nodes } => {
800 error!(
801 "BatchForget not implemented, arg={:?}, nodes={:?}",
802 arg, nodes
803 );
804 not_implement_helper(req, file).await
805 }
806 #[cfg(feature = "abi-7-19")]
807 Operation::FAllocate { arg } => {
808 error!("FAllocate not implemented, arg={:?}", arg);
809 not_implement_helper(req, file).await
810 }
811 #[cfg(feature = "abi-7-21")]
812 Operation::ReadDirPlus { arg } => {
813 error!("ReadDirPlus not implemented, arg={:?}", arg);
814 not_implement_helper(req, file).await
815 }
816 #[cfg(feature = "abi-7-23")]
817 Operation::Rename2 {
818 arg,
819 oldname,
820 newname,
821 } => {
822 let reply = ReplyEmpty::new(req.unique(), file);
823 let param = RenameParam {
824 old_parent: req.nodeid(),
825 old_name: oldname.to_owned(),
826 new_parent: arg.newdir,
827 new_name: newname.to_owned(),
828 flags: arg.flags,
829 };
830 fs.rename(req, param, reply).await
831 }
832 Operation::LSeek { arg } => {
834 error!("LSeek not implemented, arg={:?}", arg);
835 not_implement_helper(req, file).await
836 }
837 Operation::CopyFileRange { arg } => {
839 error!("ReadDirPlusCopyFileRange not implemented, arg={:?}", arg);
840 not_implement_helper(req, file).await
841 }
842 #[cfg(feature = "abi-7-11")]
843 Operation::CuseInit { arg } => {
844 panic!("unsupported CuseInit arg={arg:?}");
845 }
846 };
847
848 result
849}
850
851async fn not_implement_helper(req: &Request<'_>, file: &mut File) -> nix::Result<usize> {
853 let reply = ReplyEmpty::new(req.unique(), file);
854 reply.error_code(Errno::ENOSYS).await
855}