1use crate::{
2 bytes::{Bytes, FillBytes},
3 conn::{Connection, MountOptions},
4 decoder::Decoder,
5 op::{DecodeError, Operation},
6};
7use polyfuse_kernel::*;
8use std::{
9 cmp,
10 convert::{TryFrom, TryInto as _},
11 ffi::OsStr,
12 fmt,
13 io::{self, prelude::*, IoSlice, IoSliceMut},
14 mem::{self, MaybeUninit},
15 os::unix::prelude::*,
16 path::{Path, PathBuf},
17 sync::{
18 atomic::{AtomicBool, AtomicU64, Ordering},
19 Arc,
20 },
21};
22use zerocopy::IntoBytes as _;
23
24const MINIMUM_SUPPORTED_MINOR_VERSION: u32 = 23;
26
27const DEFAULT_MAX_WRITE: u32 = 16 * 1024 * 1024;
28const MIN_MAX_WRITE: u32 = FUSE_MIN_READ_BUFFER - BUFFER_HEADER_SIZE as u32;
29
30const MAX_MAX_PAGES: usize = 256;
32const BUFFER_HEADER_SIZE: usize = 0x1000;
34
35const DEFAULT_INIT_FLAGS: u32 = FUSE_ASYNC_READ
37 | FUSE_PARALLEL_DIROPS
38 | FUSE_AUTO_INVAL_DATA
39 | FUSE_HANDLE_KILLPRIV
40 | FUSE_ASYNC_DIO
41 | FUSE_ATOMIC_O_TRUNC;
42
43const INIT_FLAGS_MASK: u32 = FUSE_ASYNC_READ
44 | FUSE_ATOMIC_O_TRUNC
45 | FUSE_AUTO_INVAL_DATA
46 | FUSE_ASYNC_DIO
47 | FUSE_PARALLEL_DIROPS
48 | FUSE_HANDLE_KILLPRIV
49 | FUSE_POSIX_LOCKS
50 | FUSE_FLOCK_LOCKS
51 | FUSE_EXPORT_SUPPORT
52 | FUSE_DONT_MASK
53 | FUSE_WRITEBACK_CACHE
54 | FUSE_POSIX_ACL
55 | FUSE_DO_READDIRPLUS
56 | FUSE_READDIRPLUS_AUTO;
57
58pub struct KernelConfig {
63 mountopts: MountOptions,
64 init_out: fuse_init_out,
65}
66
67impl Default for KernelConfig {
68 fn default() -> Self {
69 Self {
70 mountopts: MountOptions::default(),
71 init_out: default_init_out(),
72 }
73 }
74}
75
76impl KernelConfig {
77 #[doc(hidden)] pub fn auto_unmount(&mut self, enabled: bool) -> &mut Self {
79 self.mountopts.auto_unmount = enabled;
80 self
81 }
82
83 #[doc(hidden)] pub fn mount_option(&mut self, option: &str) -> &mut Self {
85 for option in option.split(',').map(|s| s.trim()) {
86 match option {
87 "auto_unmount" => {
88 self.auto_unmount(true);
89 }
90 option => self.mountopts.options.push(option.to_owned()),
91 }
92 }
93 self
94 }
95
96 #[doc(hidden)] pub fn fusermount_path(&mut self, program: impl AsRef<OsStr>) -> &mut Self {
98 let program = Path::new(program.as_ref());
99 assert!(
100 program.is_absolute(),
101 "the binary path to `fusermount` must be absolute."
102 );
103 self.mountopts.fusermount_path = Some(program.to_owned());
104 self
105 }
106
107 #[doc(hidden)] pub fn fuse_comm_fd(&mut self, name: impl AsRef<OsStr>) -> &mut Self {
109 self.mountopts.fuse_comm_fd = Some(name.as_ref().to_owned());
110 self
111 }
112
113 #[inline]
114 fn set_init_flag(&mut self, flag: u32, enabled: bool) {
115 if enabled {
116 self.init_out.flags |= flag;
117 } else {
118 self.init_out.flags &= !flag;
119 }
120 }
121
122 pub fn async_read(&mut self, enabled: bool) -> &mut Self {
126 self.set_init_flag(FUSE_ASYNC_READ, enabled);
127 self
128 }
129
130 pub fn atomic_o_trunc(&mut self, enabled: bool) -> &mut Self {
134 self.set_init_flag(FUSE_ATOMIC_O_TRUNC, enabled);
135 self
136 }
137
138 pub fn auto_inval_data(&mut self, enabled: bool) -> &mut Self {
142 self.set_init_flag(FUSE_AUTO_INVAL_DATA, enabled);
143 self
144 }
145
146 pub fn async_dio(&mut self, enabled: bool) -> &mut Self {
150 self.set_init_flag(FUSE_ASYNC_DIO, enabled);
151 self
152 }
153
154 pub fn parallel_dirops(&mut self, enabled: bool) -> &mut Self {
158 self.set_init_flag(FUSE_PARALLEL_DIROPS, enabled);
159 self
160 }
161
162 pub fn handle_killpriv(&mut self, enabled: bool) -> &mut Self {
167 self.set_init_flag(FUSE_HANDLE_KILLPRIV, enabled);
168 self
169 }
170
171 pub fn posix_locks(&mut self, enabled: bool) -> &mut Self {
173 self.set_init_flag(FUSE_POSIX_LOCKS, enabled);
174 self
175 }
176
177 pub fn flock_locks(&mut self, enabled: bool) -> &mut Self {
179 self.set_init_flag(FUSE_FLOCK_LOCKS, enabled);
180 self
181 }
182
183 pub fn export_support(&mut self, enabled: bool) -> &mut Self {
185 self.set_init_flag(FUSE_EXPORT_SUPPORT, enabled);
186 self
187 }
188
189 pub fn dont_mask(&mut self, enabled: bool) -> &mut Self {
192 self.set_init_flag(FUSE_DONT_MASK, enabled);
193 self
194 }
195
196 pub fn writeback_cache(&mut self, enabled: bool) -> &mut Self {
198 self.set_init_flag(FUSE_WRITEBACK_CACHE, enabled);
199 self
200 }
201
202 pub fn posix_acl(&mut self, enabled: bool) -> &mut Self {
204 self.set_init_flag(FUSE_POSIX_ACL, enabled);
205 self
206 }
207
208 pub fn max_pages(&mut self, enabled: bool) -> &mut Self {
210 self.set_init_flag(FUSE_MAX_PAGES, enabled);
211 self
212 }
213
214 pub fn readdirplus(&mut self, enabled: bool) -> &mut Self {
216 self.set_init_flag(FUSE_DO_READDIRPLUS, enabled);
217 self
218 }
219
220 pub fn readdirplus_auto(&mut self, enabled: bool) -> &mut Self {
224 self.set_init_flag(FUSE_READDIRPLUS_AUTO, enabled);
225 self
226 }
227
228 pub fn max_readahead(&mut self, value: u32) -> &mut Self {
230 self.init_out.max_readahead = value;
231 self
232 }
233
234 pub fn max_write(&mut self, value: u32) -> &mut Self {
239 assert!(
240 value >= MIN_MAX_WRITE,
241 "max_write must be greater or equal to {}",
242 MIN_MAX_WRITE,
243 );
244 self.init_out.max_write = value;
245 self
246 }
247
248 pub fn max_background(&mut self, max_background: u16) -> &mut Self {
250 self.init_out.max_background = max_background;
251 self
252 }
253
254 pub fn congestion_threshold(&mut self, mut threshold: u16) -> &mut Self {
262 assert!(
263 threshold <= self.init_out.max_background,
264 "The congestion_threshold must be less or equal to max_background"
265 );
266 if threshold == 0 {
267 threshold = self.init_out.max_background * 3 / 4;
268 tracing::debug!(congestion_threshold = threshold);
269 }
270 self.init_out.congestion_threshold = threshold;
271 self
272 }
273
274 pub fn time_gran(&mut self, time_gran: u32) -> &mut Self {
280 self.init_out.time_gran = time_gran;
281 self
282 }
283}
284
285pub struct Session {
289 inner: Arc<SessionInner>,
290}
291
292impl fmt::Debug for Session {
293 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294 f.debug_struct("Session").finish()
295 }
296}
297
298struct SessionInner {
299 conn: Connection,
300 init_out: fuse_init_out,
301 bufsize: usize,
302 exited: AtomicBool,
303 notify_unique: AtomicU64,
304}
305
306impl SessionInner {
307 #[inline]
308 fn exited(&self) -> bool {
309 self.exited.load(Ordering::SeqCst)
311 }
312
313 #[inline]
314 fn exit(&self) {
315 self.exited.store(true, Ordering::SeqCst)
317 }
318}
319
320impl Drop for Session {
321 fn drop(&mut self) {
322 self.inner.exit();
323 }
324}
325
326impl AsRawFd for Session {
327 fn as_raw_fd(&self) -> RawFd {
328 self.inner.conn.as_raw_fd()
329 }
330}
331
332impl Session {
333 pub fn mount(mountpoint: PathBuf, config: KernelConfig) -> io::Result<Self> {
335 let KernelConfig {
336 mountopts,
337 mut init_out,
338 } = config;
339
340 let conn = Connection::open(mountpoint, mountopts)?;
341
342 init_session(&mut init_out, &conn, &conn)?;
343 let bufsize = BUFFER_HEADER_SIZE + init_out.max_write as usize;
344
345 Ok(Self {
346 inner: Arc::new(SessionInner {
347 conn,
348 init_out,
349 bufsize,
350 exited: AtomicBool::new(false),
351 notify_unique: AtomicU64::new(0),
352 }),
353 })
354 }
355
356 pub fn no_open_support(&self) -> bool {
363 self.inner.init_out.flags & FUSE_NO_OPEN_SUPPORT != 0
364 }
365
366 pub fn no_opendir_support(&self) -> bool {
370 self.inner.init_out.flags & FUSE_NO_OPENDIR_SUPPORT != 0
371 }
372
373 pub fn next_request(&self) -> io::Result<Option<Request>> {
375 let mut conn = &self.inner.conn;
376
377 let mut header = fuse_in_header::default();
379 let mut arg = vec![0u8; self.inner.bufsize - mem::size_of::<fuse_in_header>()];
380
381 loop {
382 match conn.read_vectored(&mut [
383 io::IoSliceMut::new(header.as_mut_bytes()),
384 io::IoSliceMut::new(&mut arg[..]),
385 ]) {
386 Ok(len) => {
387 if len < mem::size_of::<fuse_in_header>() {
388 return Err(io::Error::new(
389 io::ErrorKind::InvalidData,
390 "dequeued request message is too short",
391 ));
392 }
393 unsafe {
394 arg.set_len(len - mem::size_of::<fuse_in_header>());
395 }
396
397 break;
398 }
399
400 Err(err) => match err.raw_os_error() {
401 Some(libc::ENODEV) => {
402 tracing::debug!("ENODEV");
403 return Ok(None);
404 }
405 Some(libc::ENOENT) => {
406 tracing::debug!("ENOENT");
407 continue;
408 }
409 _ => return Err(err),
410 },
411 }
412 }
413
414 Ok(Some(Request {
415 session: self.inner.clone(),
416 header,
417 arg,
418 }))
419 }
420
421 pub fn notifier(&self) -> Notifier {
423 Notifier {
424 session: self.inner.clone(),
425 }
426 }
427}
428
429fn init_session<R, W>(init_out: &mut fuse_init_out, mut reader: R, mut writer: W) -> io::Result<()>
430where
431 R: io::Read,
432 W: io::Write,
433{
434 let mut header = fuse_in_header::default();
436 let mut arg = vec![0u8; pagesize() * MAX_MAX_PAGES];
437
438 for _ in 0..10 {
439 let len = reader.read_vectored(&mut [
440 io::IoSliceMut::new(header.as_mut_bytes()),
441 io::IoSliceMut::new(&mut arg[..]),
442 ])?;
443 if len < mem::size_of::<fuse_in_header>() {
444 return Err(io::Error::new(
445 io::ErrorKind::InvalidData,
446 "request message is too short",
447 ));
448 }
449
450 let mut decoder = Decoder::new(&arg[..]);
451
452 match fuse_opcode::try_from(header.opcode) {
453 Ok(fuse_opcode::FUSE_INIT) => {
454 let init_in = decoder
455 .fetch::<fuse_init_in>() .map_err(|_| {
457 io::Error::new(io::ErrorKind::Other, "failed to decode fuse_init_in")
458 })?;
459
460 let capable = init_in.flags & INIT_FLAGS_MASK;
461 let readonly_flags = init_in.flags & !INIT_FLAGS_MASK;
462
463 tracing::debug!("INIT request:");
464 tracing::debug!(" proto = {}.{}:", init_in.major, init_in.minor);
465 tracing::debug!(" flags = 0x{:08x} ({:?})", init_in.flags, capable);
466 tracing::debug!(" max_readahead = 0x{:08X}", init_in.max_readahead);
467 tracing::debug!(" max_pages = {}", readonly_flags & FUSE_MAX_PAGES != 0);
468 tracing::debug!(
469 " no_open_support = {}",
470 readonly_flags & FUSE_NO_OPEN_SUPPORT != 0
471 );
472 tracing::debug!(
473 " no_opendir_support = {}",
474 readonly_flags & FUSE_NO_OPENDIR_SUPPORT != 0
475 );
476
477 if init_in.major > 7 {
478 tracing::debug!("wait for a second INIT request with an older version.");
479 let init_out = fuse_init_out {
480 major: FUSE_KERNEL_VERSION,
481 minor: FUSE_KERNEL_MINOR_VERSION,
482 ..Default::default()
483 };
484 write_bytes(
485 &mut writer,
486 Reply::new(header.unique, 0, init_out.as_bytes()),
487 )?;
488 continue;
489 }
490
491 if init_in.major < 7 || init_in.minor < MINIMUM_SUPPORTED_MINOR_VERSION {
492 tracing::warn!(
493 "polyfuse supports only ABI 7.{} or later. {}.{} is not supported",
494 MINIMUM_SUPPORTED_MINOR_VERSION,
495 init_in.major,
496 init_in.minor
497 );
498 write_bytes(&mut writer, Reply::new(header.unique, libc::EPROTO, ()))?;
499 continue;
500 }
501
502 init_out.minor = cmp::min(init_out.minor, init_in.minor);
503
504 init_out.max_readahead = cmp::min(init_out.max_readahead, init_in.max_readahead);
505
506 init_out.flags &= capable;
507 init_out.flags |= FUSE_BIG_WRITES; if init_in.flags & FUSE_MAX_PAGES != 0 {
510 init_out.flags |= FUSE_MAX_PAGES;
511 init_out.max_pages = cmp::min(
512 (init_out.max_write - 1) / (pagesize() as u32) + 1,
513 u16::max_value() as u32,
514 ) as u16;
515 }
516
517 debug_assert_eq!(init_out.major, FUSE_KERNEL_VERSION);
518 debug_assert!(init_out.minor >= MINIMUM_SUPPORTED_MINOR_VERSION);
519
520 tracing::debug!("Reply to INIT:");
521 tracing::debug!(" proto = {}.{}:", init_out.major, init_out.minor);
522 tracing::debug!(" flags = 0x{:08x}", init_out.flags);
523 tracing::debug!(" max_readahead = 0x{:08X}", init_out.max_readahead);
524 tracing::debug!(" max_write = 0x{:08X}", init_out.max_write);
525 tracing::debug!(" max_background = 0x{:04X}", init_out.max_background);
526 tracing::debug!(
527 " congestion_threshold = 0x{:04X}",
528 init_out.congestion_threshold
529 );
530 tracing::debug!(" time_gran = {}", init_out.time_gran);
531 write_bytes(writer, Reply::new(header.unique, 0, init_out.as_bytes()))?;
532
533 init_out.flags |= readonly_flags;
534
535 return Ok(());
536 }
537
538 _ => {
539 tracing::warn!(
540 "ignoring an operation before init (opcode={:?})",
541 header.opcode
542 );
543 write_bytes(&mut writer, Reply::new(header.unique, libc::EIO, ()))?;
544 continue;
545 }
546 }
547 }
548
549 Err(io::Error::new(
550 io::ErrorKind::ConnectionRefused,
551 "session initialization is aborted",
552 ))
553}
554
555pub struct Request {
559 session: Arc<SessionInner>,
560 header: fuse_in_header,
561 arg: Vec<u8>,
562}
563
564impl Request {
565 #[inline]
567 pub fn unique(&self) -> u64 {
568 self.header.unique
569 }
570
571 #[inline]
573 pub fn uid(&self) -> u32 {
574 self.header.uid
575 }
576
577 #[inline]
579 pub fn gid(&self) -> u32 {
580 self.header.gid
581 }
582
583 #[inline]
585 pub fn pid(&self) -> u32 {
586 self.header.pid
587 }
588
589 pub fn operation(&self) -> Result<Operation<'_, Data<'_>>, DecodeError> {
591 if self.session.exited() {
592 return Ok(Operation::unknown());
593 }
594
595 let (arg, data) = match fuse_opcode::try_from(self.header.opcode).ok() {
596 Some(fuse_opcode::FUSE_WRITE) | Some(fuse_opcode::FUSE_NOTIFY_REPLY) => {
597 self.arg.split_at(mem::size_of::<fuse_write_in>())
598 }
599 _ => (&self.arg[..], &[] as &[_]),
600 };
601
602 Operation::decode(&self.header, arg, Data { data })
603 }
604
605 pub fn reply<T>(&self, arg: T) -> io::Result<()>
606 where
607 T: Bytes,
608 {
609 write_bytes(&self.session.conn, Reply::new(self.unique(), 0, arg))
610 }
611
612 pub fn reply_error(&self, code: i32) -> io::Result<()> {
613 write_bytes(&self.session.conn, Reply::new(self.unique(), code, ()))
614 }
615}
616
/// Payload bytes attached to a request, readable through `Read`/`BufRead`.
pub struct Data<'op> {
    // Bytes not consumed yet; reading advances this slice.
    data: &'op [u8],
}

impl fmt::Debug for Data<'_> {
    /// Prints only the type name; the bytes are not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("Data");
        repr.finish()
    }
}

impl<'op> io::Read for Data<'op> {
    /// Delegates to the `Read` implementation of the remaining byte slice.
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        <&'op [u8] as io::Read>::read(&mut self.data, buf)
    }

    /// Delegates to the `Read` implementation of the remaining byte slice.
    #[inline]
    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
        <&'op [u8] as io::Read>::read_vectored(&mut self.data, bufs)
    }
}

impl<'op> BufRead for Data<'op> {
    /// Returns the bytes that have not been consumed yet.
    #[inline]
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        <&'op [u8] as io::BufRead>::fill_buf(&mut self.data)
    }

    /// Skips `amt` bytes of the remaining slice.
    #[inline]
    fn consume(&mut self, amt: usize) {
        <&'op [u8] as io::BufRead>::consume(&mut self.data, amt)
    }
}
651
652#[derive(Clone)]
655pub struct Notifier {
656 session: Arc<SessionInner>,
657}
658
659impl Notifier {
660 pub fn inval_inode(&self, ino: u64, off: i64, len: i64) -> io::Result<()> {
662 let total_len = u32::try_from(
663 mem::size_of::<fuse_out_header>() + mem::size_of::<fuse_notify_inval_inode_out>(),
664 )
665 .unwrap();
666
667 return write_bytes(
668 &self.session.conn,
669 InvalInode {
670 header: fuse_out_header {
671 len: total_len,
672 error: fuse_notify_code::FUSE_NOTIFY_INVAL_INODE as i32,
673 unique: 0,
674 },
675 arg: fuse_notify_inval_inode_out { ino, off, len },
676 },
677 );
678
679 struct InvalInode {
680 header: fuse_out_header,
681 arg: fuse_notify_inval_inode_out,
682 }
683 impl Bytes for InvalInode {
684 fn size(&self) -> usize {
685 self.header.len as usize
686 }
687
688 fn count(&self) -> usize {
689 2
690 }
691
692 fn fill_bytes<'a>(&'a self, dst: &mut dyn FillBytes<'a>) {
693 dst.put(self.header.as_bytes());
694 dst.put(self.arg.as_bytes());
695 }
696 }
697 }
698
699 pub fn inval_entry<T>(&self, parent: u64, name: T) -> io::Result<()>
701 where
702 T: AsRef<OsStr>,
703 {
704 let namelen = u32::try_from(name.as_ref().len()).expect("provided name is too long");
705
706 let total_len = u32::try_from(
707 mem::size_of::<fuse_out_header>()
708 + mem::size_of::<fuse_notify_inval_entry_out>()
709 + name.as_ref().len()
710 + 1,
711 )
712 .unwrap();
713
714 return write_bytes(
715 &self.session.conn,
716 InvalEntry {
717 header: fuse_out_header {
718 len: total_len,
719 error: fuse_notify_code::FUSE_NOTIFY_INVAL_ENTRY as i32,
720 unique: 0,
721 },
722 arg: fuse_notify_inval_entry_out {
723 parent,
724 namelen,
725 padding: 0,
726 },
727 name,
728 },
729 );
730
731 struct InvalEntry<T>
732 where
733 T: AsRef<OsStr>,
734 {
735 header: fuse_out_header,
736 arg: fuse_notify_inval_entry_out,
737 name: T,
738 }
739 impl<T> Bytes for InvalEntry<T>
740 where
741 T: AsRef<OsStr>,
742 {
743 fn size(&self) -> usize {
744 self.header.len as usize
745 }
746
747 fn count(&self) -> usize {
748 4
749 }
750
751 fn fill_bytes<'a>(&'a self, dst: &mut dyn FillBytes<'a>) {
752 dst.put(self.header.as_bytes());
753 dst.put(self.arg.as_bytes());
754 dst.put(self.name.as_ref().as_bytes());
755 dst.put(b"\0"); }
757 }
758 }
759
760 pub fn delete<T>(&self, parent: u64, child: u64, name: T) -> io::Result<()>
767 where
768 T: AsRef<OsStr>,
769 {
770 let namelen = u32::try_from(name.as_ref().len()).expect("provided name is too long");
771
772 let total_len = u32::try_from(
773 mem::size_of::<fuse_out_header>()
774 + mem::size_of::<fuse_notify_delete_out>()
775 + name.as_ref().len()
776 + 1,
777 )
778 .expect("payload is too long");
779
780 return write_bytes(
781 &self.session.conn,
782 Delete {
783 header: fuse_out_header {
784 len: total_len,
785 error: fuse_notify_code::FUSE_NOTIFY_DELETE as i32,
786 unique: 0,
787 },
788 arg: fuse_notify_delete_out {
789 parent,
790 child,
791 namelen,
792 padding: 0,
793 },
794 name,
795 },
796 );
797
798 struct Delete<T>
799 where
800 T: AsRef<OsStr>,
801 {
802 header: fuse_out_header,
803 arg: fuse_notify_delete_out,
804 name: T,
805 }
806 impl<T> Bytes for Delete<T>
807 where
808 T: AsRef<OsStr>,
809 {
810 fn size(&self) -> usize {
811 self.header.len as usize
812 }
813
814 fn count(&self) -> usize {
815 4
816 }
817
818 fn fill_bytes<'a>(&'a self, dst: &mut dyn FillBytes<'a>) {
819 dst.put(self.header.as_bytes());
820 dst.put(self.arg.as_bytes());
821 dst.put(self.name.as_ref().as_bytes());
822 dst.put(b"\0"); }
824 }
825 }
826
827 pub fn store<T>(&self, ino: u64, offset: u64, data: T) -> io::Result<()>
829 where
830 T: Bytes,
831 {
832 let size = u32::try_from(data.size()).expect("provided data is too large");
833
834 let total_len = u32::try_from(
835 mem::size_of::<fuse_out_header>()
836 + mem::size_of::<fuse_notify_store_out>()
837 + data.size(),
838 )
839 .expect("payload is too long");
840
841 return write_bytes(
842 &self.session.conn,
843 Store {
844 header: fuse_out_header {
845 len: total_len,
846 error: fuse_notify_code::FUSE_NOTIFY_STORE as i32,
847 unique: 0,
848 },
849 arg: fuse_notify_store_out {
850 nodeid: ino,
851 offset,
852 size,
853 padding: 0,
854 },
855 data,
856 },
857 );
858
859 struct Store<T>
860 where
861 T: Bytes,
862 {
863 header: fuse_out_header,
864 arg: fuse_notify_store_out,
865 data: T,
866 }
867 impl<T> Bytes for Store<T>
868 where
869 T: Bytes,
870 {
871 fn size(&self) -> usize {
872 self.header.len as usize
873 }
874
875 fn count(&self) -> usize {
876 2 + self.data.count()
877 }
878
879 fn fill_bytes<'a>(&'a self, dst: &mut dyn FillBytes<'a>) {
880 dst.put(self.header.as_bytes());
881 dst.put(self.arg.as_bytes());
882 self.data.fill_bytes(dst);
883 }
884 }
885 }
886
887 pub fn retrieve(&self, ino: u64, offset: u64, size: u32) -> io::Result<u64> {
889 let total_len = u32::try_from(
890 mem::size_of::<fuse_out_header>() + mem::size_of::<fuse_notify_retrieve_out>(),
891 )
892 .unwrap();
893
894 let notify_unique = self.session.notify_unique.fetch_add(1, Ordering::SeqCst);
896
897 write_bytes(
898 &self.session.conn,
899 Retrieve {
900 header: fuse_out_header {
901 len: total_len,
902 error: fuse_notify_code::FUSE_NOTIFY_RETRIEVE as i32,
903 unique: 0,
904 },
905 arg: fuse_notify_retrieve_out {
906 nodeid: ino,
907 offset,
908 size,
909 notify_unique,
910 padding: 0,
911 },
912 },
913 )?;
914
915 return Ok(notify_unique);
916
917 struct Retrieve {
918 header: fuse_out_header,
919 arg: fuse_notify_retrieve_out,
920 }
921 impl Bytes for Retrieve {
922 fn size(&self) -> usize {
923 self.header.len as usize
924 }
925
926 fn count(&self) -> usize {
927 2
928 }
929
930 fn fill_bytes<'a>(&'a self, dst: &mut dyn FillBytes<'a>) {
931 dst.put(self.header.as_bytes());
932 dst.put(self.arg.as_bytes());
933 }
934 }
935 }
936
937 pub fn poll_wakeup(&self, kh: u64) -> io::Result<()> {
939 let total_len = u32::try_from(
940 mem::size_of::<fuse_out_header>() + mem::size_of::<fuse_notify_poll_wakeup_out>(),
941 )
942 .unwrap();
943
944 return write_bytes(
945 &self.session.conn,
946 PollWakeup {
947 header: fuse_out_header {
948 len: total_len,
949 error: fuse_notify_code::FUSE_NOTIFY_POLL as i32,
950 unique: 0,
951 },
952 arg: fuse_notify_poll_wakeup_out { kh },
953 },
954 );
955
956 struct PollWakeup {
957 header: fuse_out_header,
958 arg: fuse_notify_poll_wakeup_out,
959 }
960 impl Bytes for PollWakeup {
961 fn size(&self) -> usize {
962 self.header.len as usize
963 }
964
965 fn count(&self) -> usize {
966 2
967 }
968
969 fn fill_bytes<'a>(&'a self, dst: &mut dyn FillBytes<'a>) {
970 dst.put(self.header.as_bytes());
971 dst.put(self.arg.as_bytes());
972 }
973 }
974 }
975}
976
977struct Reply<T> {
980 header: fuse_out_header,
981 arg: T,
982}
983impl<T> Reply<T>
984where
985 T: Bytes,
986{
987 #[inline]
988 fn new(unique: u64, error: i32, arg: T) -> Self {
989 let len = (mem::size_of::<fuse_out_header>() + arg.size())
990 .try_into()
991 .expect("Argument size is too large");
992 Self {
993 header: fuse_out_header {
994 len,
995 error: -error,
996 unique,
997 },
998 arg,
999 }
1000 }
1001}
1002impl<T> Bytes for Reply<T>
1003where
1004 T: Bytes,
1005{
1006 #[inline]
1007 fn size(&self) -> usize {
1008 self.header.len as usize
1009 }
1010
1011 #[inline]
1012 fn count(&self) -> usize {
1013 self.arg.count() + 1
1014 }
1015
1016 fn fill_bytes<'a>(&'a self, dst: &mut dyn FillBytes<'a>) {
1017 dst.put(self.header.as_bytes());
1018 self.arg.fill_bytes(dst);
1019 }
1020}
1021
1022#[inline]
1023fn write_bytes<W, T>(mut writer: W, bytes: T) -> io::Result<()>
1024where
1025 W: io::Write,
1026 T: Bytes,
1027{
1028 let size = bytes.size();
1029 let count = bytes.count();
1030
1031 let written;
1032
1033 macro_rules! small_write {
1034 ($n:expr) => {{
1035 let mut vec: [MaybeUninit<IoSlice<'_>>; $n] =
1036 unsafe { MaybeUninit::uninit().assume_init() };
1037 bytes.fill_bytes(&mut FillWriteBytes {
1038 vec: &mut vec[..],
1039 offset: 0,
1040 });
1041 let vec = unsafe { slice_assume_init_ref(&vec[..]) };
1042
1043 written = writer.write_vectored(vec)?;
1044 }};
1045 }
1046
1047 match count {
1048 0 => return Ok(()),
1050
1051 1 => small_write!(1),
1053 2 => small_write!(2),
1054 3 => small_write!(3),
1055 4 => small_write!(4),
1056
1057 count => {
1058 let mut vec: Vec<IoSlice<'_>> = Vec::with_capacity(count);
1059 unsafe {
1060 let dst = std::slice::from_raw_parts_mut(
1061 vec.as_mut_ptr().cast(), count,
1063 );
1064 bytes.fill_bytes(&mut FillWriteBytes {
1065 vec: dst,
1066 offset: 0,
1067 });
1068 vec.set_len(count);
1069 }
1070
1071 written = writer.write_vectored(&*vec)?;
1072 }
1073 }
1074
1075 if written < size {
1076 return Err(io::Error::new(
1077 io::ErrorKind::Other,
1078 "written data is too short",
1079 ));
1080 }
1081
1082 Ok(())
1083}
1084
1085struct FillWriteBytes<'a, 'vec> {
1086 vec: &'vec mut [MaybeUninit<IoSlice<'a>>],
1087 offset: usize,
1088}
1089
1090impl<'a, 'vec> FillBytes<'a> for FillWriteBytes<'a, 'vec> {
1091 fn put(&mut self, chunk: &'a [u8]) {
1092 self.vec[self.offset] = MaybeUninit::new(IoSlice::new(chunk));
1093 self.offset += 1;
1094 }
1095}
1096
/// Reinterpret a slice of `MaybeUninit<T>` as a slice of `T`.
///
/// # Safety
///
/// Every element of `slice` must be initialized.
#[inline(always)]
unsafe fn slice_assume_init_ref<T>(slice: &[MaybeUninit<T>]) -> &[T] {
    let first = slice.as_ptr().cast::<T>();
    let len = slice.len();
    #[allow(unused_unsafe)]
    unsafe {
        std::slice::from_raw_parts(first, len)
    }
}
1105
1106#[inline]
1107fn pagesize() -> usize {
1108 unsafe { libc::sysconf(libc::_SC_PAGESIZE) as usize }
1109}
1110
1111#[inline]
1112const fn default_init_out() -> fuse_init_out {
1113 fuse_init_out {
1114 major: FUSE_KERNEL_VERSION,
1115 minor: FUSE_KERNEL_MINOR_VERSION,
1116 max_readahead: u32::MAX,
1117 flags: DEFAULT_INIT_FLAGS,
1118 max_background: 0,
1119 congestion_threshold: 0,
1120 max_write: DEFAULT_MAX_WRITE,
1121 time_gran: 1,
1122 max_pages: 0,
1123 padding: 0,
1124 unused: [0; 8],
1125 }
1126}
1127
#[cfg(test)]
mod tests {
    use super::*;
    use std::mem;

    /// Feeds one well-formed INIT request to `init_session` and checks both
    /// the negotiated `fuse_init_out` and the bytes of the reply.
    #[test]
    fn init_default() {
        let input_len = mem::size_of::<fuse_in_header>() + mem::size_of::<fuse_init_in>();
        let request_header = fuse_in_header {
            len: input_len as u32,
            opcode: fuse_opcode::FUSE_INIT as u32,
            unique: 2,
            nodeid: 0,
            uid: 100,
            gid: 100,
            pid: 12,
            padding: 0,
        };
        let request_arg = fuse_init_in {
            major: 7,
            minor: 23,
            max_readahead: 40,
            flags: INIT_FLAGS_MASK
                | FUSE_MAX_PAGES
                | FUSE_NO_OPEN_SUPPORT
                | FUSE_NO_OPENDIR_SUPPORT,
        };

        let input = [request_header.as_bytes(), request_arg.as_bytes()].concat();
        assert_eq!(input.len(), input_len);

        let mut output = Vec::<u8>::new();

        let mut negotiated = default_init_out();
        init_session(&mut negotiated, &input[..], &mut output).expect("initialization failed");

        let expected_max_pages = (DEFAULT_MAX_WRITE / (pagesize() as u32)) as u16;

        assert_eq!(negotiated.major, 7);
        assert_eq!(negotiated.minor, 23);
        assert_eq!(negotiated.max_readahead, 40);
        assert_eq!(negotiated.max_background, 0);
        assert_eq!(negotiated.congestion_threshold, 0);
        assert_eq!(negotiated.max_write, DEFAULT_MAX_WRITE);
        assert_eq!(negotiated.max_pages, expected_max_pages);
        assert_eq!(negotiated.time_gran, 1);
        assert!(negotiated.flags & FUSE_NO_OPEN_SUPPORT != 0);
        assert!(negotiated.flags & FUSE_NO_OPENDIR_SUPPORT != 0);

        let output_len = mem::size_of::<fuse_out_header>() + mem::size_of::<fuse_init_out>();
        let reply_header = fuse_out_header {
            len: output_len as u32,
            error: 0,
            unique: 2,
        };
        let reply_arg = fuse_init_out {
            major: 7,
            minor: 23,
            max_readahead: 40,
            flags: DEFAULT_INIT_FLAGS | FUSE_MAX_PAGES | FUSE_BIG_WRITES,
            max_background: 0,
            congestion_threshold: 0,
            max_write: DEFAULT_MAX_WRITE,
            time_gran: 1,
            max_pages: expected_max_pages,
            padding: 0,
            unused: [0; 8],
        };

        let expected = [reply_header.as_bytes(), reply_arg.as_bytes()].concat();
        assert_eq!(output.len(), output_len);

        // Header fields, compared byte range by byte range.
        for (range, field) in [
            (0..4, "out_header.len"),
            (4..8, "out_header.error"),
            (8..16, "out_header.unique"),
        ] {
            assert_eq!(expected[range.clone()], output[range], "{}", field);
        }

        // Body fields, with offsets relative to the end of the header.
        let expected = &expected[mem::size_of::<fuse_out_header>()..];
        let output = &output[mem::size_of::<fuse_out_header>()..];
        for (range, field) in [
            (0..4, "init_out.major"),
            (4..8, "init_out.minor"),
            (8..12, "init_out.max_readahead"),
            (12..16, "init_out.flags"),
            (16..18, "init_out.max_background"),
            (18..20, "init_out.congestion_threshold"),
            (20..24, "init_out.max_write"),
            (24..28, "init_out.time_gran"),
            (28..30, "init_out.max_pages"),
        ] {
            assert_eq!(expected[range.clone()], output[range], "{}", field);
        }
        assert!(
            output[30..30 + 2 + 4 * 8].iter().all(|&b| b == 0x00),
            "init_out.paddings"
        );
    }

    /// Identity helper that fixes the element type of the literals in `b!`.
    #[inline]
    fn bytes(bytes: &[u8]) -> &[u8] {
        bytes
    }

    /// Byte-slice literal usable as an operand of `assert_eq!`.
    macro_rules! b {
        ($($b:expr),*$(,)?) => ( *bytes(&[$($b),*]) );
    }

    /// Checks the three fields of the 16-byte output header at the start of `buf`.
    fn assert_header(buf: &[u8], len: [u8; 4], error: [u8; 4], unique: [u8; 8]) {
        assert_eq!(buf[0..4], len[..], "header.len");
        assert_eq!(buf[4..8], error[..], "header.error");
        assert_eq!(buf[8..16], unique[..], "header.unique");
    }

    #[test]
    fn send_msg_empty() {
        let mut wire = vec![0u8; 0];
        write_bytes(&mut wire, Reply::new(42, -4, &[])).unwrap();
        assert_header(
            &wire,
            [0x10, 0x00, 0x00, 0x00],
            [0x04, 0x00, 0x00, 0x00],
            [0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
        );
    }

    #[test]
    fn send_msg_single_data() {
        let mut wire = vec![0u8; 0];
        write_bytes(&mut wire, Reply::new(42, 0, "hello")).unwrap();
        assert_header(
            &wire,
            [0x15, 0x00, 0x00, 0x00],
            [0x00, 0x00, 0x00, 0x00],
            [0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
        );
        assert_eq!(wire[16..], b![0x68, 0x65, 0x6c, 0x6c, 0x6f], "payload");
    }

    #[test]
    fn send_msg_chunked_data() {
        let payload: &[&[u8]] = &[
            "hello, ".as_ref(),
            "this ".as_ref(),
            "is a ".as_ref(),
            "message.".as_ref(),
        ];
        let mut wire = vec![0u8; 0];
        write_bytes(&mut wire, Reply::new(26, 0, payload)).unwrap();
        assert_header(
            &wire,
            [0x29, 0x00, 0x00, 0x00],
            [0x00, 0x00, 0x00, 0x00],
            [0x1a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
        );
        assert_eq!(wire[16..], *b"hello, this is a message.", "payload");
    }
}