polyfuse/
session.rs

1use crate::{
2    bytes::{Bytes, FillBytes},
3    conn::{Connection, MountOptions},
4    decoder::Decoder,
5    op::{DecodeError, Operation},
6};
7use polyfuse_kernel::*;
8use std::{
9    cmp,
10    convert::{TryFrom, TryInto as _},
11    ffi::OsStr,
12    fmt,
13    io::{self, prelude::*, IoSlice, IoSliceMut},
14    mem::{self, MaybeUninit},
15    os::unix::prelude::*,
16    path::{Path, PathBuf},
17    sync::{
18        atomic::{AtomicBool, AtomicU64, Ordering},
19        Arc,
20    },
21};
22use zerocopy::IntoBytes as _;
23
24// The minimum supported ABI minor version by polyfuse.
25const MINIMUM_SUPPORTED_MINOR_VERSION: u32 = 23;
26
27const DEFAULT_MAX_WRITE: u32 = 16 * 1024 * 1024;
28const MIN_MAX_WRITE: u32 = FUSE_MIN_READ_BUFFER - BUFFER_HEADER_SIZE as u32;
29
30// copied from fuse_i.h
31const MAX_MAX_PAGES: usize = 256;
32//const DEFAULT_MAX_PAGES_PER_REQ: usize = 32;
33const BUFFER_HEADER_SIZE: usize = 0x1000;
34
35// TODO: add FUSE_IOCTL_DIR
36const DEFAULT_INIT_FLAGS: u32 = FUSE_ASYNC_READ
37    | FUSE_PARALLEL_DIROPS
38    | FUSE_AUTO_INVAL_DATA
39    | FUSE_HANDLE_KILLPRIV
40    | FUSE_ASYNC_DIO
41    | FUSE_ATOMIC_O_TRUNC;
42
43const INIT_FLAGS_MASK: u32 = FUSE_ASYNC_READ
44    | FUSE_ATOMIC_O_TRUNC
45    | FUSE_AUTO_INVAL_DATA
46    | FUSE_ASYNC_DIO
47    | FUSE_PARALLEL_DIROPS
48    | FUSE_HANDLE_KILLPRIV
49    | FUSE_POSIX_LOCKS
50    | FUSE_FLOCK_LOCKS
51    | FUSE_EXPORT_SUPPORT
52    | FUSE_DONT_MASK
53    | FUSE_WRITEBACK_CACHE
54    | FUSE_POSIX_ACL
55    | FUSE_DO_READDIRPLUS
56    | FUSE_READDIRPLUS_AUTO;
57
58// ==== KernelConfig ====
59
60/// Parameters for setting up the connection with FUSE driver
61/// and the kernel side behavior.
62pub struct KernelConfig {
63    mountopts: MountOptions,
64    init_out: fuse_init_out,
65}
66
67impl Default for KernelConfig {
68    fn default() -> Self {
69        Self {
70            mountopts: MountOptions::default(),
71            init_out: default_init_out(),
72        }
73    }
74}
75
76impl KernelConfig {
77    #[doc(hidden)] // TODO: dox
78    pub fn auto_unmount(&mut self, enabled: bool) -> &mut Self {
79        self.mountopts.auto_unmount = enabled;
80        self
81    }
82
83    #[doc(hidden)] // TODO: dox
84    pub fn mount_option(&mut self, option: &str) -> &mut Self {
85        for option in option.split(',').map(|s| s.trim()) {
86            match option {
87                "auto_unmount" => {
88                    self.auto_unmount(true);
89                }
90                option => self.mountopts.options.push(option.to_owned()),
91            }
92        }
93        self
94    }
95
96    #[doc(hidden)] // TODO: dox
97    pub fn fusermount_path(&mut self, program: impl AsRef<OsStr>) -> &mut Self {
98        let program = Path::new(program.as_ref());
99        assert!(
100            program.is_absolute(),
101            "the binary path to `fusermount` must be absolute."
102        );
103        self.mountopts.fusermount_path = Some(program.to_owned());
104        self
105    }
106
107    #[doc(hidden)] // TODO: dox
108    pub fn fuse_comm_fd(&mut self, name: impl AsRef<OsStr>) -> &mut Self {
109        self.mountopts.fuse_comm_fd = Some(name.as_ref().to_owned());
110        self
111    }
112
113    #[inline]
114    fn set_init_flag(&mut self, flag: u32, enabled: bool) {
115        if enabled {
116            self.init_out.flags |= flag;
117        } else {
118            self.init_out.flags &= !flag;
119        }
120    }
121
122    /// Specify that the filesystem supports asynchronous read requests.
123    ///
124    /// Enabled by default.
125    pub fn async_read(&mut self, enabled: bool) -> &mut Self {
126        self.set_init_flag(FUSE_ASYNC_READ, enabled);
127        self
128    }
129
130    /// Specify that the filesystem supports the `O_TRUNC` open flag.
131    ///
132    /// Enabled by default.
133    pub fn atomic_o_trunc(&mut self, enabled: bool) -> &mut Self {
134        self.set_init_flag(FUSE_ATOMIC_O_TRUNC, enabled);
135        self
136    }
137
138    /// Specify that the kernel check the validity of attributes on every read.
139    ///
140    /// Enabled by default.
141    pub fn auto_inval_data(&mut self, enabled: bool) -> &mut Self {
142        self.set_init_flag(FUSE_AUTO_INVAL_DATA, enabled);
143        self
144    }
145
146    /// Specify that the filesystem supports asynchronous direct I/O submission.
147    ///
148    /// Enabled by default.
149    pub fn async_dio(&mut self, enabled: bool) -> &mut Self {
150        self.set_init_flag(FUSE_ASYNC_DIO, enabled);
151        self
152    }
153
154    /// Specify that the kernel supports parallel directory operations.
155    ///
156    /// Enabled by default.
157    pub fn parallel_dirops(&mut self, enabled: bool) -> &mut Self {
158        self.set_init_flag(FUSE_PARALLEL_DIROPS, enabled);
159        self
160    }
161
162    /// Specify that the filesystem is responsible for unsetting setuid and setgid bits
163    /// when a file is written, truncated, or its owner is changed.
164    ///
165    /// Enabled by default.
166    pub fn handle_killpriv(&mut self, enabled: bool) -> &mut Self {
167        self.set_init_flag(FUSE_HANDLE_KILLPRIV, enabled);
168        self
169    }
170
171    /// The filesystem supports the POSIX-style file lock.
172    pub fn posix_locks(&mut self, enabled: bool) -> &mut Self {
173        self.set_init_flag(FUSE_POSIX_LOCKS, enabled);
174        self
175    }
176
177    /// Specify that the filesystem supports the `flock` handling.
178    pub fn flock_locks(&mut self, enabled: bool) -> &mut Self {
179        self.set_init_flag(FUSE_FLOCK_LOCKS, enabled);
180        self
181    }
182
183    /// Specify that the filesystem supports lookups of `"."` and `".."`.
184    pub fn export_support(&mut self, enabled: bool) -> &mut Self {
185        self.set_init_flag(FUSE_EXPORT_SUPPORT, enabled);
186        self
187    }
188
189    /// Specify that the kernel should not apply the umask to the file mode
190    /// on `create` operations.
191    pub fn dont_mask(&mut self, enabled: bool) -> &mut Self {
192        self.set_init_flag(FUSE_DONT_MASK, enabled);
193        self
194    }
195
196    /// Specify that the kernel should enable writeback caching.
197    pub fn writeback_cache(&mut self, enabled: bool) -> &mut Self {
198        self.set_init_flag(FUSE_WRITEBACK_CACHE, enabled);
199        self
200    }
201
202    /// Specify that the filesystem supports POSIX access control lists.
203    pub fn posix_acl(&mut self, enabled: bool) -> &mut Self {
204        self.set_init_flag(FUSE_POSIX_ACL, enabled);
205        self
206    }
207
208    /// Specify that the value of max_pages should be derived from max_write.
209    pub fn max_pages(&mut self, enabled: bool) -> &mut Self {
210        self.set_init_flag(FUSE_MAX_PAGES, enabled);
211        self
212    }
213
214    /// Specify that the filesystem supports `readdirplus` operations.
215    pub fn readdirplus(&mut self, enabled: bool) -> &mut Self {
216        self.set_init_flag(FUSE_DO_READDIRPLUS, enabled);
217        self
218    }
219
220    /// Indicates that the kernel uses the adaptive readdirplus.
221    ///
222    /// This option is meaningful only if `readdirplus` is enabled.
223    pub fn readdirplus_auto(&mut self, enabled: bool) -> &mut Self {
224        self.set_init_flag(FUSE_READDIRPLUS_AUTO, enabled);
225        self
226    }
227
228    /// Set the maximum readahead.
229    pub fn max_readahead(&mut self, value: u32) -> &mut Self {
230        self.init_out.max_readahead = value;
231        self
232    }
233
234    /// Set the maximum size of the write buffer.
235    ///
236    /// # Panic
237    /// It causes an assertion panic if the setting value is less than the absolute minimum.
238    pub fn max_write(&mut self, value: u32) -> &mut Self {
239        assert!(
240            value >= MIN_MAX_WRITE,
241            "max_write must be greater or equal to {}",
242            MIN_MAX_WRITE,
243        );
244        self.init_out.max_write = value;
245        self
246    }
247
248    /// Return the maximum number of pending *background* requests.
249    pub fn max_background(&mut self, max_background: u16) -> &mut Self {
250        self.init_out.max_background = max_background;
251        self
252    }
253
254    /// Set the threshold number of pending background requests that the kernel marks
255    /// the filesystem as *congested*.
256    ///
257    /// If the setting value is 0, the value is automatically calculated by using max_background.
258    ///
259    /// # Panics
260    /// It cause a panic if the setting value is greater than `max_background`.
261    pub fn congestion_threshold(&mut self, mut threshold: u16) -> &mut Self {
262        assert!(
263            threshold <= self.init_out.max_background,
264            "The congestion_threshold must be less or equal to max_background"
265        );
266        if threshold == 0 {
267            threshold = self.init_out.max_background * 3 / 4;
268            tracing::debug!(congestion_threshold = threshold);
269        }
270        self.init_out.congestion_threshold = threshold;
271        self
272    }
273
274    /// Set the timestamp resolution supported by the filesystem.
275    ///
276    /// The setting value has the nanosecond unit and should be a power of 10.
277    ///
278    /// The default value is 1.
279    pub fn time_gran(&mut self, time_gran: u32) -> &mut Self {
280        self.init_out.time_gran = time_gran;
281        self
282    }
283}
284
285// ==== Session ====
286
287/// The object containing the contextrual information about a FUSE session.
288pub struct Session {
289    inner: Arc<SessionInner>,
290}
291
292impl fmt::Debug for Session {
293    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294        f.debug_struct("Session").finish()
295    }
296}
297
298struct SessionInner {
299    conn: Connection,
300    init_out: fuse_init_out,
301    bufsize: usize,
302    exited: AtomicBool,
303    notify_unique: AtomicU64,
304}
305
306impl SessionInner {
307    #[inline]
308    fn exited(&self) -> bool {
309        // FIXME: choose appropriate atomic ordering.
310        self.exited.load(Ordering::SeqCst)
311    }
312
313    #[inline]
314    fn exit(&self) {
315        // FIXME: choose appropriate atomic ordering.
316        self.exited.store(true, Ordering::SeqCst)
317    }
318}
319
320impl Drop for Session {
321    fn drop(&mut self) {
322        self.inner.exit();
323    }
324}
325
326impl AsRawFd for Session {
327    fn as_raw_fd(&self) -> RawFd {
328        self.inner.conn.as_raw_fd()
329    }
330}
331
332impl Session {
333    /// Start a FUSE daemon mount on the specified path.
334    pub fn mount(mountpoint: PathBuf, config: KernelConfig) -> io::Result<Self> {
335        let KernelConfig {
336            mountopts,
337            mut init_out,
338        } = config;
339
340        let conn = Connection::open(mountpoint, mountopts)?;
341
342        init_session(&mut init_out, &conn, &conn)?;
343        let bufsize = BUFFER_HEADER_SIZE + init_out.max_write as usize;
344
345        Ok(Self {
346            inner: Arc::new(SessionInner {
347                conn,
348                init_out,
349                bufsize,
350                exited: AtomicBool::new(false),
351                notify_unique: AtomicU64::new(0),
352            }),
353        })
354    }
355
356    /// Return whether the kernel supports for zero-message opens.
357    ///
358    /// When the returned value is `true`, the kernel treat an `ENOSYS`
359    /// error for a `FUSE_OPEN` request as successful and does not send
360    /// subsequent `open` requests.  Otherwise, the filesystem should
361    /// implement the handler for `open` requests appropriately.
362    pub fn no_open_support(&self) -> bool {
363        self.inner.init_out.flags & FUSE_NO_OPEN_SUPPORT != 0
364    }
365
366    /// Return whether the kernel supports for zero-message opendirs.
367    ///
368    /// See the documentation of `no_open_support` for details.
369    pub fn no_opendir_support(&self) -> bool {
370        self.inner.init_out.flags & FUSE_NO_OPENDIR_SUPPORT != 0
371    }
372
373    /// Receive an incoming FUSE request from the kernel.
374    pub fn next_request(&self) -> io::Result<Option<Request>> {
375        let mut conn = &self.inner.conn;
376
377        // FIXME: Align the allocated region in `arg` with the FUSE argument types.
378        let mut header = fuse_in_header::default();
379        let mut arg = vec![0u8; self.inner.bufsize - mem::size_of::<fuse_in_header>()];
380
381        loop {
382            match conn.read_vectored(&mut [
383                io::IoSliceMut::new(header.as_mut_bytes()),
384                io::IoSliceMut::new(&mut arg[..]),
385            ]) {
386                Ok(len) => {
387                    if len < mem::size_of::<fuse_in_header>() {
388                        return Err(io::Error::new(
389                            io::ErrorKind::InvalidData,
390                            "dequeued request message is too short",
391                        ));
392                    }
393                    unsafe {
394                        arg.set_len(len - mem::size_of::<fuse_in_header>());
395                    }
396
397                    break;
398                }
399
400                Err(err) => match err.raw_os_error() {
401                    Some(libc::ENODEV) => {
402                        tracing::debug!("ENODEV");
403                        return Ok(None);
404                    }
405                    Some(libc::ENOENT) => {
406                        tracing::debug!("ENOENT");
407                        continue;
408                    }
409                    _ => return Err(err),
410                },
411            }
412        }
413
414        Ok(Some(Request {
415            session: self.inner.clone(),
416            header,
417            arg,
418        }))
419    }
420
421    /// Create an instance of `Notifier` corresponding to this session.
422    pub fn notifier(&self) -> Notifier {
423        Notifier {
424            session: self.inner.clone(),
425        }
426    }
427}
428
429fn init_session<R, W>(init_out: &mut fuse_init_out, mut reader: R, mut writer: W) -> io::Result<()>
430where
431    R: io::Read,
432    W: io::Write,
433{
434    // FIXME: align the allocated buffer in `buf` with FUSE argument types.
435    let mut header = fuse_in_header::default();
436    let mut arg = vec![0u8; pagesize() * MAX_MAX_PAGES];
437
438    for _ in 0..10 {
439        let len = reader.read_vectored(&mut [
440            io::IoSliceMut::new(header.as_mut_bytes()),
441            io::IoSliceMut::new(&mut arg[..]),
442        ])?;
443        if len < mem::size_of::<fuse_in_header>() {
444            return Err(io::Error::new(
445                io::ErrorKind::InvalidData,
446                "request message is too short",
447            ));
448        }
449
450        let mut decoder = Decoder::new(&arg[..]);
451
452        match fuse_opcode::try_from(header.opcode) {
453            Ok(fuse_opcode::FUSE_INIT) => {
454                let init_in = decoder
455                    .fetch::<fuse_init_in>() //
456                    .map_err(|_| {
457                        io::Error::new(io::ErrorKind::Other, "failed to decode fuse_init_in")
458                    })?;
459
460                let capable = init_in.flags & INIT_FLAGS_MASK;
461                let readonly_flags = init_in.flags & !INIT_FLAGS_MASK;
462
463                tracing::debug!("INIT request:");
464                tracing::debug!("  proto = {}.{}:", init_in.major, init_in.minor);
465                tracing::debug!("  flags = 0x{:08x} ({:?})", init_in.flags, capable);
466                tracing::debug!("  max_readahead = 0x{:08X}", init_in.max_readahead);
467                tracing::debug!("  max_pages = {}", readonly_flags & FUSE_MAX_PAGES != 0);
468                tracing::debug!(
469                    "  no_open_support = {}",
470                    readonly_flags & FUSE_NO_OPEN_SUPPORT != 0
471                );
472                tracing::debug!(
473                    "  no_opendir_support = {}",
474                    readonly_flags & FUSE_NO_OPENDIR_SUPPORT != 0
475                );
476
477                if init_in.major > 7 {
478                    tracing::debug!("wait for a second INIT request with an older version.");
479                    let init_out = fuse_init_out {
480                        major: FUSE_KERNEL_VERSION,
481                        minor: FUSE_KERNEL_MINOR_VERSION,
482                        ..Default::default()
483                    };
484                    write_bytes(
485                        &mut writer,
486                        Reply::new(header.unique, 0, init_out.as_bytes()),
487                    )?;
488                    continue;
489                }
490
491                if init_in.major < 7 || init_in.minor < MINIMUM_SUPPORTED_MINOR_VERSION {
492                    tracing::warn!(
493                        "polyfuse supports only ABI 7.{} or later. {}.{} is not supported",
494                        MINIMUM_SUPPORTED_MINOR_VERSION,
495                        init_in.major,
496                        init_in.minor
497                    );
498                    write_bytes(&mut writer, Reply::new(header.unique, libc::EPROTO, ()))?;
499                    continue;
500                }
501
502                init_out.minor = cmp::min(init_out.minor, init_in.minor);
503
504                init_out.max_readahead = cmp::min(init_out.max_readahead, init_in.max_readahead);
505
506                init_out.flags &= capable;
507                init_out.flags |= FUSE_BIG_WRITES; // the flag was superseded by `max_write`.
508
509                if init_in.flags & FUSE_MAX_PAGES != 0 {
510                    init_out.flags |= FUSE_MAX_PAGES;
511                    init_out.max_pages = cmp::min(
512                        (init_out.max_write - 1) / (pagesize() as u32) + 1,
513                        u16::max_value() as u32,
514                    ) as u16;
515                }
516
517                debug_assert_eq!(init_out.major, FUSE_KERNEL_VERSION);
518                debug_assert!(init_out.minor >= MINIMUM_SUPPORTED_MINOR_VERSION);
519
520                tracing::debug!("Reply to INIT:");
521                tracing::debug!("  proto = {}.{}:", init_out.major, init_out.minor);
522                tracing::debug!("  flags = 0x{:08x}", init_out.flags);
523                tracing::debug!("  max_readahead = 0x{:08X}", init_out.max_readahead);
524                tracing::debug!("  max_write = 0x{:08X}", init_out.max_write);
525                tracing::debug!("  max_background = 0x{:04X}", init_out.max_background);
526                tracing::debug!(
527                    "  congestion_threshold = 0x{:04X}",
528                    init_out.congestion_threshold
529                );
530                tracing::debug!("  time_gran = {}", init_out.time_gran);
531                write_bytes(writer, Reply::new(header.unique, 0, init_out.as_bytes()))?;
532
533                init_out.flags |= readonly_flags;
534
535                return Ok(());
536            }
537
538            _ => {
539                tracing::warn!(
540                    "ignoring an operation before init (opcode={:?})",
541                    header.opcode
542                );
543                write_bytes(&mut writer, Reply::new(header.unique, libc::EIO, ()))?;
544                continue;
545            }
546        }
547    }
548
549    Err(io::Error::new(
550        io::ErrorKind::ConnectionRefused,
551        "session initialization is aborted",
552    ))
553}
554
555// ==== Request ====
556
557/// Context about an incoming FUSE request.
558pub struct Request {
559    session: Arc<SessionInner>,
560    header: fuse_in_header,
561    arg: Vec<u8>,
562}
563
564impl Request {
565    /// Return the unique ID of the request.
566    #[inline]
567    pub fn unique(&self) -> u64 {
568        self.header.unique
569    }
570
571    /// Return the user ID of the calling process.
572    #[inline]
573    pub fn uid(&self) -> u32 {
574        self.header.uid
575    }
576
577    /// Return the group ID of the calling process.
578    #[inline]
579    pub fn gid(&self) -> u32 {
580        self.header.gid
581    }
582
583    /// Return the process ID of the calling process.
584    #[inline]
585    pub fn pid(&self) -> u32 {
586        self.header.pid
587    }
588
589    /// Decode the argument of this request.
590    pub fn operation(&self) -> Result<Operation<'_, Data<'_>>, DecodeError> {
591        if self.session.exited() {
592            return Ok(Operation::unknown());
593        }
594
595        let (arg, data) = match fuse_opcode::try_from(self.header.opcode).ok() {
596            Some(fuse_opcode::FUSE_WRITE) | Some(fuse_opcode::FUSE_NOTIFY_REPLY) => {
597                self.arg.split_at(mem::size_of::<fuse_write_in>())
598            }
599            _ => (&self.arg[..], &[] as &[_]),
600        };
601
602        Operation::decode(&self.header, arg, Data { data })
603    }
604
605    pub fn reply<T>(&self, arg: T) -> io::Result<()>
606    where
607        T: Bytes,
608    {
609        write_bytes(&self.session.conn, Reply::new(self.unique(), 0, arg))
610    }
611
612    pub fn reply_error(&self, code: i32) -> io::Result<()> {
613        write_bytes(&self.session.conn, Reply::new(self.unique(), code, ()))
614    }
615}
616
/// The remaining part of a request message, following its fixed-size argument.
///
/// Reading consumes the bytes from the front. The `Debug` output does not
/// include the payload.
pub struct Data<'op> {
    data: &'op [u8],
}

impl fmt::Debug for Data<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Data");
        builder.finish()
    }
}

impl<'op> io::Read for Data<'op> {
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        <&'op [u8] as io::Read>::read(&mut self.data, buf)
    }

    #[inline]
    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
        <&'op [u8] as io::Read>::read_vectored(&mut self.data, bufs)
    }
}

impl<'op> BufRead for Data<'op> {
    #[inline]
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        <&'op [u8] as BufRead>::fill_buf(&mut self.data)
    }

    #[inline]
    fn consume(&mut self, amt: usize) {
        <&'op [u8] as BufRead>::consume(&mut self.data, amt)
    }
}
651
652// ==== Notifier ====
653
654#[derive(Clone)]
655pub struct Notifier {
656    session: Arc<SessionInner>,
657}
658
659impl Notifier {
660    /// Notify the cache invalidation about an inode to the kernel.
661    pub fn inval_inode(&self, ino: u64, off: i64, len: i64) -> io::Result<()> {
662        let total_len = u32::try_from(
663            mem::size_of::<fuse_out_header>() + mem::size_of::<fuse_notify_inval_inode_out>(),
664        )
665        .unwrap();
666
667        return write_bytes(
668            &self.session.conn,
669            InvalInode {
670                header: fuse_out_header {
671                    len: total_len,
672                    error: fuse_notify_code::FUSE_NOTIFY_INVAL_INODE as i32,
673                    unique: 0,
674                },
675                arg: fuse_notify_inval_inode_out { ino, off, len },
676            },
677        );
678
679        struct InvalInode {
680            header: fuse_out_header,
681            arg: fuse_notify_inval_inode_out,
682        }
683        impl Bytes for InvalInode {
684            fn size(&self) -> usize {
685                self.header.len as usize
686            }
687
688            fn count(&self) -> usize {
689                2
690            }
691
692            fn fill_bytes<'a>(&'a self, dst: &mut dyn FillBytes<'a>) {
693                dst.put(self.header.as_bytes());
694                dst.put(self.arg.as_bytes());
695            }
696        }
697    }
698
699    /// Notify the invalidation about a directory entry to the kernel.
700    pub fn inval_entry<T>(&self, parent: u64, name: T) -> io::Result<()>
701    where
702        T: AsRef<OsStr>,
703    {
704        let namelen = u32::try_from(name.as_ref().len()).expect("provided name is too long");
705
706        let total_len = u32::try_from(
707            mem::size_of::<fuse_out_header>()
708                + mem::size_of::<fuse_notify_inval_entry_out>()
709                + name.as_ref().len()
710                + 1,
711        )
712        .unwrap();
713
714        return write_bytes(
715            &self.session.conn,
716            InvalEntry {
717                header: fuse_out_header {
718                    len: total_len,
719                    error: fuse_notify_code::FUSE_NOTIFY_INVAL_ENTRY as i32,
720                    unique: 0,
721                },
722                arg: fuse_notify_inval_entry_out {
723                    parent,
724                    namelen,
725                    padding: 0,
726                },
727                name,
728            },
729        );
730
731        struct InvalEntry<T>
732        where
733            T: AsRef<OsStr>,
734        {
735            header: fuse_out_header,
736            arg: fuse_notify_inval_entry_out,
737            name: T,
738        }
739        impl<T> Bytes for InvalEntry<T>
740        where
741            T: AsRef<OsStr>,
742        {
743            fn size(&self) -> usize {
744                self.header.len as usize
745            }
746
747            fn count(&self) -> usize {
748                4
749            }
750
751            fn fill_bytes<'a>(&'a self, dst: &mut dyn FillBytes<'a>) {
752                dst.put(self.header.as_bytes());
753                dst.put(self.arg.as_bytes());
754                dst.put(self.name.as_ref().as_bytes());
755                dst.put(b"\0"); // null terminator
756            }
757        }
758    }
759
760    /// Notify the invalidation about a directory entry to the kernel.
761    ///
762    /// The role of this notification is similar to `notify_inval_entry`.
763    /// Additionally, when the provided `child` inode matches the inode
764    /// in the dentry cache, the inotify will inform the deletion to
765    /// watchers if exists.
766    pub fn delete<T>(&self, parent: u64, child: u64, name: T) -> io::Result<()>
767    where
768        T: AsRef<OsStr>,
769    {
770        let namelen = u32::try_from(name.as_ref().len()).expect("provided name is too long");
771
772        let total_len = u32::try_from(
773            mem::size_of::<fuse_out_header>()
774                + mem::size_of::<fuse_notify_delete_out>()
775                + name.as_ref().len()
776                + 1,
777        )
778        .expect("payload is too long");
779
780        return write_bytes(
781            &self.session.conn,
782            Delete {
783                header: fuse_out_header {
784                    len: total_len,
785                    error: fuse_notify_code::FUSE_NOTIFY_DELETE as i32,
786                    unique: 0,
787                },
788                arg: fuse_notify_delete_out {
789                    parent,
790                    child,
791                    namelen,
792                    padding: 0,
793                },
794                name,
795            },
796        );
797
798        struct Delete<T>
799        where
800            T: AsRef<OsStr>,
801        {
802            header: fuse_out_header,
803            arg: fuse_notify_delete_out,
804            name: T,
805        }
806        impl<T> Bytes for Delete<T>
807        where
808            T: AsRef<OsStr>,
809        {
810            fn size(&self) -> usize {
811                self.header.len as usize
812            }
813
814            fn count(&self) -> usize {
815                4
816            }
817
818            fn fill_bytes<'a>(&'a self, dst: &mut dyn FillBytes<'a>) {
819                dst.put(self.header.as_bytes());
820                dst.put(self.arg.as_bytes());
821                dst.put(self.name.as_ref().as_bytes());
822                dst.put(b"\0"); // null terminator
823            }
824        }
825    }
826
827    /// Push the data in an inode for updating the kernel cache.
828    pub fn store<T>(&self, ino: u64, offset: u64, data: T) -> io::Result<()>
829    where
830        T: Bytes,
831    {
832        let size = u32::try_from(data.size()).expect("provided data is too large");
833
834        let total_len = u32::try_from(
835            mem::size_of::<fuse_out_header>()
836                + mem::size_of::<fuse_notify_store_out>()
837                + data.size(),
838        )
839        .expect("payload is too long");
840
841        return write_bytes(
842            &self.session.conn,
843            Store {
844                header: fuse_out_header {
845                    len: total_len,
846                    error: fuse_notify_code::FUSE_NOTIFY_STORE as i32,
847                    unique: 0,
848                },
849                arg: fuse_notify_store_out {
850                    nodeid: ino,
851                    offset,
852                    size,
853                    padding: 0,
854                },
855                data,
856            },
857        );
858
859        struct Store<T>
860        where
861            T: Bytes,
862        {
863            header: fuse_out_header,
864            arg: fuse_notify_store_out,
865            data: T,
866        }
867        impl<T> Bytes for Store<T>
868        where
869            T: Bytes,
870        {
871            fn size(&self) -> usize {
872                self.header.len as usize
873            }
874
875            fn count(&self) -> usize {
876                2 + self.data.count()
877            }
878
879            fn fill_bytes<'a>(&'a self, dst: &mut dyn FillBytes<'a>) {
880                dst.put(self.header.as_bytes());
881                dst.put(self.arg.as_bytes());
882                self.data.fill_bytes(dst);
883            }
884        }
885    }
886
887    /// Retrieve data in an inode from the kernel cache.
888    pub fn retrieve(&self, ino: u64, offset: u64, size: u32) -> io::Result<u64> {
889        let total_len = u32::try_from(
890            mem::size_of::<fuse_out_header>() + mem::size_of::<fuse_notify_retrieve_out>(),
891        )
892        .unwrap();
893
894        // FIXME: choose appropriate memory ordering.
895        let notify_unique = self.session.notify_unique.fetch_add(1, Ordering::SeqCst);
896
897        write_bytes(
898            &self.session.conn,
899            Retrieve {
900                header: fuse_out_header {
901                    len: total_len,
902                    error: fuse_notify_code::FUSE_NOTIFY_RETRIEVE as i32,
903                    unique: 0,
904                },
905                arg: fuse_notify_retrieve_out {
906                    nodeid: ino,
907                    offset,
908                    size,
909                    notify_unique,
910                    padding: 0,
911                },
912            },
913        )?;
914
915        return Ok(notify_unique);
916
917        struct Retrieve {
918            header: fuse_out_header,
919            arg: fuse_notify_retrieve_out,
920        }
921        impl Bytes for Retrieve {
922            fn size(&self) -> usize {
923                self.header.len as usize
924            }
925
926            fn count(&self) -> usize {
927                2
928            }
929
930            fn fill_bytes<'a>(&'a self, dst: &mut dyn FillBytes<'a>) {
931                dst.put(self.header.as_bytes());
932                dst.put(self.arg.as_bytes());
933            }
934        }
935    }
936
937    /// Send I/O readiness to the kernel.
938    pub fn poll_wakeup(&self, kh: u64) -> io::Result<()> {
939        let total_len = u32::try_from(
940            mem::size_of::<fuse_out_header>() + mem::size_of::<fuse_notify_poll_wakeup_out>(),
941        )
942        .unwrap();
943
944        return write_bytes(
945            &self.session.conn,
946            PollWakeup {
947                header: fuse_out_header {
948                    len: total_len,
949                    error: fuse_notify_code::FUSE_NOTIFY_POLL as i32,
950                    unique: 0,
951                },
952                arg: fuse_notify_poll_wakeup_out { kh },
953            },
954        );
955
956        struct PollWakeup {
957            header: fuse_out_header,
958            arg: fuse_notify_poll_wakeup_out,
959        }
960        impl Bytes for PollWakeup {
961            fn size(&self) -> usize {
962                self.header.len as usize
963            }
964
965            fn count(&self) -> usize {
966                2
967            }
968
969            fn fill_bytes<'a>(&'a self, dst: &mut dyn FillBytes<'a>) {
970                dst.put(self.header.as_bytes());
971                dst.put(self.arg.as_bytes());
972            }
973        }
974    }
975}
976
977// ==== utils ====
978
979struct Reply<T> {
980    header: fuse_out_header,
981    arg: T,
982}
983impl<T> Reply<T>
984where
985    T: Bytes,
986{
987    #[inline]
988    fn new(unique: u64, error: i32, arg: T) -> Self {
989        let len = (mem::size_of::<fuse_out_header>() + arg.size())
990            .try_into()
991            .expect("Argument size is too large");
992        Self {
993            header: fuse_out_header {
994                len,
995                error: -error,
996                unique,
997            },
998            arg,
999        }
1000    }
1001}
1002impl<T> Bytes for Reply<T>
1003where
1004    T: Bytes,
1005{
1006    #[inline]
1007    fn size(&self) -> usize {
1008        self.header.len as usize
1009    }
1010
1011    #[inline]
1012    fn count(&self) -> usize {
1013        self.arg.count() + 1
1014    }
1015
1016    fn fill_bytes<'a>(&'a self, dst: &mut dyn FillBytes<'a>) {
1017        dst.put(self.header.as_bytes());
1018        self.arg.fill_bytes(dst);
1019    }
1020}
1021
1022#[inline]
1023fn write_bytes<W, T>(mut writer: W, bytes: T) -> io::Result<()>
1024where
1025    W: io::Write,
1026    T: Bytes,
1027{
1028    let size = bytes.size();
1029    let count = bytes.count();
1030
1031    let written;
1032
1033    macro_rules! small_write {
1034        ($n:expr) => {{
1035            let mut vec: [MaybeUninit<IoSlice<'_>>; $n] =
1036                unsafe { MaybeUninit::uninit().assume_init() };
1037            bytes.fill_bytes(&mut FillWriteBytes {
1038                vec: &mut vec[..],
1039                offset: 0,
1040            });
1041            let vec = unsafe { slice_assume_init_ref(&vec[..]) };
1042
1043            written = writer.write_vectored(vec)?;
1044        }};
1045    }
1046
1047    match count {
1048        // Skip writing.
1049        0 => return Ok(()),
1050
1051        // Avoid heap allocation if count is small.
1052        1 => small_write!(1),
1053        2 => small_write!(2),
1054        3 => small_write!(3),
1055        4 => small_write!(4),
1056
1057        count => {
1058            let mut vec: Vec<IoSlice<'_>> = Vec::with_capacity(count);
1059            unsafe {
1060                let dst = std::slice::from_raw_parts_mut(
1061                    vec.as_mut_ptr().cast(), //
1062                    count,
1063                );
1064                bytes.fill_bytes(&mut FillWriteBytes {
1065                    vec: dst,
1066                    offset: 0,
1067                });
1068                vec.set_len(count);
1069            }
1070
1071            written = writer.write_vectored(&*vec)?;
1072        }
1073    }
1074
1075    if written < size {
1076        return Err(io::Error::new(
1077            io::ErrorKind::Other,
1078            "written data is too short",
1079        ));
1080    }
1081
1082    Ok(())
1083}
1084
1085struct FillWriteBytes<'a, 'vec> {
1086    vec: &'vec mut [MaybeUninit<IoSlice<'a>>],
1087    offset: usize,
1088}
1089
1090impl<'a, 'vec> FillBytes<'a> for FillWriteBytes<'a, 'vec> {
1091    fn put(&mut self, chunk: &'a [u8]) {
1092        self.vec[self.offset] = MaybeUninit::new(IoSlice::new(chunk));
1093        self.offset += 1;
1094    }
1095}
1096
/// Reinterprets a slice of `MaybeUninit<T>` as a slice of `T`.
///
/// FIXME: replace with stabilized `MaybeUninit::slice_assume_init_ref`.
///
/// # Safety
///
/// Every element of `slice` must be initialized. `MaybeUninit<T>` has the same
/// size and alignment as `T`, so the conversion itself does no work.
#[inline(always)]
// The explicit `unsafe` block below keeps the body valid under
// `unsafe_op_in_unsafe_fn`; where that block is redundant, silence the lint.
#[allow(unused_unsafe)]
unsafe fn slice_assume_init_ref<T>(slice: &[MaybeUninit<T>]) -> &[T] {
    // SAFETY: the caller guarantees that all elements are initialized, and
    // `MaybeUninit<T>` is layout-compatible with `T`, so this pointer/length
    // pair describes a valid `[T]` borrowed for as long as `slice`.
    unsafe { std::slice::from_raw_parts(slice.as_ptr().cast::<T>(), slice.len()) }
}
1105
1106#[inline]
1107fn pagesize() -> usize {
1108    unsafe { libc::sysconf(libc::_SC_PAGESIZE) as usize }
1109}
1110
1111#[inline]
1112const fn default_init_out() -> fuse_init_out {
1113    fuse_init_out {
1114        major: FUSE_KERNEL_VERSION,
1115        minor: FUSE_KERNEL_MINOR_VERSION,
1116        max_readahead: u32::MAX,
1117        flags: DEFAULT_INIT_FLAGS,
1118        max_background: 0,
1119        congestion_threshold: 0,
1120        max_write: DEFAULT_MAX_WRITE,
1121        time_gran: 1,
1122        max_pages: 0,
1123        padding: 0,
1124        unused: [0; 8],
1125    }
1126}
1127
#[cfg(test)]
mod tests {
    use super::*;
    use std::mem;

    /// Feeds a FUSE_INIT request through `init_session` and checks both the
    /// negotiated `fuse_init_out` and the exact bytes of the reply.
    #[test]
    fn init_default() {
        let input_len = mem::size_of::<fuse_in_header>() + mem::size_of::<fuse_init_in>();
        let in_header = fuse_in_header {
            len: input_len as u32,
            opcode: fuse_opcode::FUSE_INIT as u32,
            unique: 2,
            nodeid: 0,
            uid: 100,
            gid: 100,
            pid: 12,
            padding: 0,
        };
        let init_in = fuse_init_in {
            major: 7,
            minor: 23,
            max_readahead: 40,
            flags: INIT_FLAGS_MASK
                | FUSE_MAX_PAGES
                | FUSE_NO_OPEN_SUPPORT
                | FUSE_NO_OPENDIR_SUPPORT,
        };

        let mut input = Vec::with_capacity(input_len);
        input.extend_from_slice(in_header.as_bytes());
        input.extend_from_slice(init_in.as_bytes());
        assert_eq!(input.len(), input_len);

        let mut output = Vec::<u8>::new();

        let mut init_out = default_init_out();
        init_session(&mut init_out, &input[..], &mut output).expect("initialization failed");

        let expected_max_pages = (DEFAULT_MAX_WRITE / (pagesize() as u32)) as u16;

        assert_eq!(init_out.major, 7);
        assert_eq!(init_out.minor, 23);
        assert_eq!(init_out.max_readahead, 40);
        assert_eq!(init_out.max_background, 0);
        assert_eq!(init_out.congestion_threshold, 0);
        assert_eq!(init_out.max_write, DEFAULT_MAX_WRITE);
        assert_eq!(init_out.max_pages, expected_max_pages);
        assert_eq!(init_out.time_gran, 1);
        assert!(init_out.flags & FUSE_NO_OPEN_SUPPORT != 0);
        assert!(init_out.flags & FUSE_NO_OPENDIR_SUPPORT != 0);

        let output_len = mem::size_of::<fuse_out_header>() + mem::size_of::<fuse_init_out>();
        let out_header = fuse_out_header {
            len: output_len as u32,
            error: 0,
            unique: 2,
        };
        let init_out = fuse_init_out {
            major: 7,
            minor: 23,
            max_readahead: 40,
            flags: DEFAULT_INIT_FLAGS | FUSE_MAX_PAGES | FUSE_BIG_WRITES,
            max_background: 0,
            congestion_threshold: 0,
            max_write: DEFAULT_MAX_WRITE,
            time_gran: 1,
            max_pages: expected_max_pages,
            padding: 0,
            unused: [0; 8],
        };

        let mut expected = Vec::with_capacity(output_len);
        expected.extend_from_slice(out_header.as_bytes());
        expected.extend_from_slice(init_out.as_bytes());
        assert_eq!(output.len(), output_len);

        assert_eq!(expected[0..4], output[0..4], "out_header.len");
        assert_eq!(expected[4..8], output[4..8], "out_header.error");
        assert_eq!(expected[8..16], output[8..16], "out_header.unique");

        let expected = &expected[mem::size_of::<fuse_out_header>()..];
        let output = &output[mem::size_of::<fuse_out_header>()..];
        assert_eq!(expected[0..4], output[0..4], "init_out.major");
        assert_eq!(expected[4..8], output[4..8], "init_out.minor");
        assert_eq!(expected[8..12], output[8..12], "init_out.max_readahead");
        assert_eq!(expected[12..16], output[12..16], "init_out.flags");
        assert_eq!(expected[16..18], output[16..18], "init_out.max_background");
        assert_eq!(
            expected[18..20],
            output[18..20],
            "init_out.congestion_threshold"
        );
        assert_eq!(expected[20..24], output[20..24], "init_out.max_write");
        assert_eq!(expected[24..28], output[24..28], "init_out.time_gran");
        assert_eq!(expected[28..30], output[28..30], "init_out.max_pages");
        assert!(
            output[30..30 + 2 + 4 * 8].iter().all(|&b| b == 0x00),
            "init_out.paddings"
        );
    }

    // Identity helper so `b![...]` yields an unsized `[u8]` comparable with
    // slices of the output buffer.
    #[inline]
    fn bytes(bytes: &[u8]) -> &[u8] {
        bytes
    }
    macro_rules! b {
        ($($b:expr),*$(,)?) => ( *bytes(&[$($b),*]) );
    }

    // Each `send_msg_*` test checks the total length first, so that a short or
    // over-long buffer fails with a clear message rather than passing or hitting
    // an index panic in the later slice comparisons.

    /// A reply with an empty payload consists of the 16-byte header only.
    #[test]
    fn send_msg_empty() {
        let mut buf = vec![0u8; 0];
        write_bytes(&mut buf, Reply::new(42, -4, &[])).unwrap();
        assert_eq!(buf.len(), 0x10, "total length");
        assert_eq!(buf[0..4], b![0x10, 0x00, 0x00, 0x00], "header.len");
        assert_eq!(buf[4..8], b![0x04, 0x00, 0x00, 0x00], "header.error");
        assert_eq!(
            buf[8..16],
            b![0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "header.unique"
        );
    }

    /// A single-chunk payload follows the header verbatim.
    #[test]
    fn send_msg_single_data() {
        let mut buf = vec![0u8; 0];
        write_bytes(&mut buf, Reply::new(42, 0, "hello")).unwrap();
        assert_eq!(buf.len(), 0x15, "total length");
        assert_eq!(buf[0..4], b![0x15, 0x00, 0x00, 0x00], "header.len");
        assert_eq!(buf[4..8], b![0x00, 0x00, 0x00, 0x00], "header.error");
        assert_eq!(
            buf[8..16],
            b![0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "header.unique"
        );
        assert_eq!(buf[16..], b![0x68, 0x65, 0x6c, 0x6c, 0x6f], "payload");
    }

    /// More than four chunks (header + 4 payload pieces) exercises the heap path.
    #[test]
    fn send_msg_chunked_data() {
        let payload: &[&[u8]] = &[
            "hello, ".as_ref(), //
            "this ".as_ref(),
            "is a ".as_ref(),
            "message.".as_ref(),
        ];
        let mut buf = vec![0u8; 0];
        write_bytes(&mut buf, Reply::new(26, 0, payload)).unwrap();
        assert_eq!(buf.len(), 0x29, "total length");
        assert_eq!(buf[0..4], b![0x29, 0x00, 0x00, 0x00], "header.len");
        assert_eq!(buf[4..8], b![0x00, 0x00, 0x00, 0x00], "header.error");
        assert_eq!(
            buf[8..16],
            b![0x1a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "header.unique"
        );
        assert_eq!(buf[16..], *b"hello, this is a message.", "payload");
    }
}