fuse_backend_rs/api/server/
async_io.rs

1// Copyright (C) 2021-2022 Alibaba Cloud. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::io;
5use std::mem::size_of;
6use std::sync::Arc;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use vm_memory::ByteValued;
11
12use crate::abi::fuse_abi::{
13    stat64, AttrOut, CreateIn, EntryOut, FallocateIn, FsyncIn, GetattrIn, Opcode, OpenIn, OpenOut,
14    OutHeader, ReadIn, SetattrIn, SetattrValid, WriteIn, WriteOut, FATTR_FH, GETATTR_FH,
15    KERNEL_MINOR_VERSION_LOOKUP_NEGATIVE_ENTRY_ZERO, READ_LOCKOWNER, WRITE_CACHE, WRITE_LOCKOWNER,
16};
17use crate::api::filesystem::{
18    AsyncFileSystem, AsyncZeroCopyReader, AsyncZeroCopyWriter, ZeroCopyReader, ZeroCopyWriter,
19};
20use crate::api::server::{
21    MetricsHook, Server, ServerUtil, SrvContext, BUFFER_HEADER_SIZE, MAX_BUFFER_SIZE,
22};
23use crate::file_traits::{AsyncFileReadWriteVolatile, FileReadWriteVolatile};
24use crate::transport::{FsCacheReqHandler, Reader, Writer};
25use crate::{bytes_to_cstr, encode_io_error_kind, BitmapSlice, Error, Result};
26
27struct AsyncZcReader<'a, S: BitmapSlice = ()>(Reader<'a, S>);
28
29// The underlying VolatileSlice contains "*mut u8", which is just a pointer to a u8 array.
30// Actually we rely on the AsyncExecutor is a single-threaded worker, and we do not really send
31// 'Reader' to other threads.
32unsafe impl<'a, S: BitmapSlice> Send for AsyncZcReader<'a, S> {}
33
34#[async_trait(?Send)]
35impl<'a, S: BitmapSlice> AsyncZeroCopyReader for AsyncZcReader<'a, S> {
36    async fn async_read_to(
37        &mut self,
38        f: Arc<dyn AsyncFileReadWriteVolatile>,
39        count: usize,
40        off: u64,
41    ) -> io::Result<usize> {
42        self.0.async_read_to_at(&f, count, off).await
43    }
44}
45
46impl<'a, S: BitmapSlice> ZeroCopyReader for AsyncZcReader<'a, S> {
47    fn read_to(
48        &mut self,
49        f: &mut dyn FileReadWriteVolatile,
50        count: usize,
51        off: u64,
52    ) -> io::Result<usize> {
53        self.0.read_to_at(f, count, off)
54    }
55}
56
57impl<'a, S: BitmapSlice> io::Read for AsyncZcReader<'a, S> {
58    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
59        self.0.read(buf)
60    }
61}
62
63struct AsyncZcWriter<'a, S: BitmapSlice = ()>(Writer<'a, S>);
64
65// The underlying VolatileSlice contains "*mut u8", which is just a pointer to a u8 array.
66// Actually we rely on the AsyncExecutor is a single-threaded worker, and we do not really send
67// 'Reader' to other threads.
68unsafe impl<'a, S: BitmapSlice> Send for AsyncZcWriter<'a, S> {}
69
70#[async_trait(?Send)]
71impl<'a, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, S> {
72    async fn async_write_from(
73        &mut self,
74        f: Arc<dyn AsyncFileReadWriteVolatile>,
75        count: usize,
76        off: u64,
77    ) -> io::Result<usize> {
78        self.0.async_write_from_at(&f, count, off).await
79    }
80}
81
82impl<'a, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, S> {
83    fn write_from(
84        &mut self,
85        f: &mut dyn FileReadWriteVolatile,
86        count: usize,
87        off: u64,
88    ) -> io::Result<usize> {
89        self.0.write_from_at(f, count, off)
90    }
91
92    fn available_bytes(&self) -> usize {
93        self.0.available_bytes()
94    }
95}
96
97impl<'a, S: BitmapSlice> io::Write for AsyncZcWriter<'a, S> {
98    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
99        self.0.write(buf)
100    }
101
102    fn flush(&mut self) -> io::Result<()> {
103        self.0.flush()
104    }
105}
106
107impl<F: AsyncFileSystem + Sync> Server<F> {
108    /// Main entrance to handle requests from the transport layer.
109    ///
110    /// It receives Fuse requests from transport layers, parses the request according to Fuse ABI,
111    /// invokes filesystem drivers to server the requests, and eventually send back the result to
112    /// the transport layer.
113    ///
114    /// ## Safety
115    /// The async io framework borrows underlying buffers from `Reader` and `Writer`, so the caller
116    /// must ensure all data buffers managed by the `Reader` and `Writer` are valid until the
117    /// `Future` object returned has completed. Other subsystems, such as the transport layer, rely
118    /// on the invariant.
119    #[allow(unused_variables)]
120    pub async unsafe fn async_handle_message<S: BitmapSlice>(
121        &self,
122        mut r: Reader<'_, S>,
123        w: Writer<'_, S>,
124        vu_req: Option<&mut dyn FsCacheReqHandler>,
125        hook: Option<&dyn MetricsHook>,
126    ) -> Result<usize> {
127        let in_header = r.read_obj().map_err(Error::DecodeMessage)?;
128        let mut ctx = SrvContext::<F, S>::new(in_header, r, w);
129        if ctx.in_header.len > (MAX_BUFFER_SIZE + BUFFER_HEADER_SIZE)
130            || ctx.w.available_bytes() < size_of::<OutHeader>()
131        {
132            return ctx
133                .async_do_reply_error(io::Error::from_raw_os_error(libc::ENOMEM), true)
134                .await;
135        }
136        let in_header = &ctx.in_header;
137
138        trace!(
139            "fuse: new req {:?}: {:?}",
140            Opcode::from(in_header.opcode),
141            in_header
142        );
143
144        if let Some(h) = hook {
145            h.collect(&in_header);
146        }
147
148        let res = match in_header.opcode {
149            x if x == Opcode::Lookup as u32 => self.async_lookup(ctx).await,
150            x if x == Opcode::Forget as u32 => self.forget(ctx), // No reply.
151            x if x == Opcode::Getattr as u32 => self.async_getattr(ctx).await,
152            x if x == Opcode::Setattr as u32 => self.async_setattr(ctx).await,
153            x if x == Opcode::Readlink as u32 => self.readlink(ctx),
154            x if x == Opcode::Symlink as u32 => self.symlink(ctx),
155            x if x == Opcode::Mknod as u32 => self.mknod(ctx),
156            x if x == Opcode::Mkdir as u32 => self.mkdir(ctx),
157            x if x == Opcode::Unlink as u32 => self.unlink(ctx),
158            x if x == Opcode::Rmdir as u32 => self.rmdir(ctx),
159            x if x == Opcode::Rename as u32 => self.rename(ctx),
160            x if x == Opcode::Link as u32 => self.link(ctx),
161            x if x == Opcode::Open as u32 => self.async_open(ctx).await,
162            x if x == Opcode::Read as u32 => self.async_read(ctx).await,
163            x if x == Opcode::Write as u32 => self.async_write(ctx).await,
164            x if x == Opcode::Statfs as u32 => self.statfs(ctx),
165            x if x == Opcode::Release as u32 => self.release(ctx),
166            x if x == Opcode::Fsync as u32 => self.async_fsync(ctx).await,
167            x if x == Opcode::Setxattr as u32 => self.setxattr(ctx),
168            x if x == Opcode::Getxattr as u32 => self.getxattr(ctx),
169            x if x == Opcode::Listxattr as u32 => self.listxattr(ctx),
170            x if x == Opcode::Removexattr as u32 => self.removexattr(ctx),
171            x if x == Opcode::Flush as u32 => self.flush(ctx),
172            x if x == Opcode::Init as u32 => self.init(ctx),
173            x if x == Opcode::Opendir as u32 => self.opendir(ctx),
174            x if x == Opcode::Readdir as u32 => self.readdir(ctx),
175            x if x == Opcode::Releasedir as u32 => self.releasedir(ctx),
176            x if x == Opcode::Fsyncdir as u32 => self.async_fsyncdir(ctx).await,
177            x if x == Opcode::Getlk as u32 => self.getlk(ctx),
178            x if x == Opcode::Setlk as u32 => self.setlk(ctx),
179            x if x == Opcode::Setlkw as u32 => self.setlkw(ctx),
180            x if x == Opcode::Access as u32 => self.access(ctx),
181            x if x == Opcode::Create as u32 => self.async_create(ctx).await,
182            x if x == Opcode::Bmap as u32 => self.bmap(ctx),
183            x if x == Opcode::Ioctl as u32 => self.ioctl(ctx),
184            x if x == Opcode::Poll as u32 => self.poll(ctx),
185            x if x == Opcode::NotifyReply as u32 => self.notify_reply(ctx),
186            x if x == Opcode::BatchForget as u32 => self.batch_forget(ctx),
187            x if x == Opcode::Fallocate as u32 => self.async_fallocate(ctx).await,
188            #[cfg(target_os = "linux")]
189            x if x == Opcode::Readdirplus as u32 => self.readdirplus(ctx),
190            #[cfg(target_os = "linux")]
191            x if x == Opcode::Rename2 as u32 => self.rename2(ctx),
192            #[cfg(target_os = "linux")]
193            x if x == Opcode::Lseek as u32 => self.lseek(ctx),
194            #[cfg(feature = "virtiofs")]
195            x if x == Opcode::SetupMapping as u32 => self.setupmapping(ctx, vu_req),
196            #[cfg(feature = "virtiofs")]
197            x if x == Opcode::RemoveMapping as u32 => self.removemapping(ctx, vu_req),
198            // Group reqeusts don't need reply together
199            x => match x {
200                x if x == Opcode::Interrupt as u32 => {
201                    self.interrupt(ctx);
202                    Ok(0)
203                }
204                x if x == Opcode::Destroy as u32 => {
205                    self.destroy(ctx);
206                    Ok(0)
207                }
208                _ => {
209                    ctx.async_reply_error(io::Error::from_raw_os_error(libc::ENOSYS))
210                        .await
211                }
212            },
213        };
214
215        // Pass `None` because current API handler's design does not allow us to catch
216        // the `out_header`. Hopefully, we can reach to `out_header` after some
217        // refactoring work someday.
218        if let Some(h) = hook {
219            h.release(None);
220        }
221
222        res
223    }
224
225    async fn async_lookup<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
226        let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, 0)?;
227        let name = match bytes_to_cstr(buf.as_ref()) {
228            Ok(name) => name,
229            Err(e) => {
230                error!("fuse: bytes to cstr error: {:?}, {:?}", buf, e);
231                let _ = ctx
232                    .async_reply_error(io::Error::from_raw_os_error(libc::EINVAL))
233                    .await;
234                return Err(e);
235            }
236        };
237
238        let version = self.vers.load();
239        let result = self
240            .fs
241            .async_lookup(ctx.context(), ctx.nodeid(), name)
242            .await;
243
244        match result {
245            // before ABI 7.4 inode == 0 was invalid, only ENOENT means negative dentry
246            Ok(entry)
247                if version.minor < KERNEL_MINOR_VERSION_LOOKUP_NEGATIVE_ENTRY_ZERO
248                    && entry.inode == 0 =>
249            {
250                ctx.async_reply_error(io::Error::from_raw_os_error(libc::ENOENT))
251                    .await
252            }
253            Ok(entry) => {
254                let out = EntryOut::from(entry);
255                ctx.async_reply_ok(Some(out), None).await
256            }
257            Err(e) => ctx.async_reply_error(e).await,
258        }
259    }
260
261    async fn async_getattr<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
262        let GetattrIn { flags, fh, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
263        let handle = if (flags & GETATTR_FH) != 0 {
264            Some(fh.into())
265        } else {
266            None
267        };
268        let result = self
269            .fs
270            .async_getattr(ctx.context(), ctx.nodeid(), handle)
271            .await;
272
273        ctx.async_handle_attr_result(result).await
274    }
275
276    async fn async_setattr<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
277        let setattr_in: SetattrIn = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
278        let handle = if setattr_in.valid & FATTR_FH != 0 {
279            Some(setattr_in.fh.into())
280        } else {
281            None
282        };
283        let valid = SetattrValid::from_bits_truncate(setattr_in.valid);
284        let st: stat64 = setattr_in.into();
285        let result = self
286            .fs
287            .async_setattr(ctx.context(), ctx.nodeid(), st, handle, valid)
288            .await;
289
290        ctx.async_handle_attr_result(result).await
291    }
292
293    async fn async_open<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
294        let OpenIn { flags, fuse_flags } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
295        let result = self
296            .fs
297            .async_open(ctx.context(), ctx.nodeid(), flags, fuse_flags)
298            .await;
299
300        match result {
301            Ok((handle, opts)) => {
302                let out = OpenOut {
303                    fh: handle.map(Into::into).unwrap_or(0),
304                    open_flags: opts.bits(),
305                    ..Default::default()
306                };
307
308                ctx.async_reply_ok(Some(out), None).await
309            }
310            Err(e) => ctx.async_reply_error(e).await,
311        }
312    }
313
314    async fn async_read<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
315        let ReadIn {
316            fh,
317            offset,
318            size,
319            read_flags,
320            lock_owner,
321            flags,
322            ..
323        } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
324
325        let owner = if read_flags & READ_LOCKOWNER != 0 {
326            Some(lock_owner)
327        } else {
328            None
329        };
330
331        // Split the writer into 2 pieces: one for the `OutHeader` and the rest for the data.
332        let w2 = match ctx.w.split_at(size_of::<OutHeader>()) {
333            Ok(v) => v,
334            Err(_e) => return Err(Error::InvalidHeaderLength),
335        };
336        let mut data_writer = AsyncZcWriter(w2);
337        let result = self
338            .fs
339            .async_read(
340                ctx.context(),
341                ctx.nodeid(),
342                fh.into(),
343                &mut data_writer,
344                size,
345                offset,
346                owner,
347                flags,
348            )
349            .await;
350
351        match result {
352            Ok(count) => {
353                // Don't use `reply_ok` because we need to set a custom size length for the
354                // header.
355                let out = OutHeader {
356                    len: (size_of::<OutHeader>() + count) as u32,
357                    error: 0,
358                    unique: ctx.unique(),
359                };
360
361                ctx.w
362                    .async_write_all(out.as_slice())
363                    .await
364                    .map_err(Error::EncodeMessage)?;
365                ctx.w
366                    .async_commit(Some(&data_writer.0))
367                    .await
368                    .map_err(Error::EncodeMessage)?;
369                Ok(out.len as usize)
370            }
371            Err(e) => ctx.async_reply_error_explicit(e).await,
372        }
373    }
374
375    async fn async_write<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
376        let WriteIn {
377            fh,
378            offset,
379            size,
380            fuse_flags,
381            lock_owner,
382            flags,
383            ..
384        } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
385
386        if size > MAX_BUFFER_SIZE {
387            return ctx
388                .async_reply_error_explicit(io::Error::from_raw_os_error(libc::ENOMEM))
389                .await;
390        }
391
392        let owner = if fuse_flags & WRITE_LOCKOWNER != 0 {
393            Some(lock_owner)
394        } else {
395            None
396        };
397        let delayed_write = fuse_flags & WRITE_CACHE != 0;
398        let mut data_reader = AsyncZcReader(ctx.take_reader());
399        let result = self
400            .fs
401            .async_write(
402                ctx.context(),
403                ctx.nodeid(),
404                fh.into(),
405                &mut data_reader,
406                size,
407                offset,
408                owner,
409                delayed_write,
410                flags,
411                fuse_flags,
412            )
413            .await;
414
415        match result {
416            Ok(count) => {
417                let out = WriteOut {
418                    size: count as u32,
419                    ..Default::default()
420                };
421                ctx.async_reply_ok(Some(out), None).await
422            }
423            Err(e) => ctx.async_reply_error_explicit(e).await,
424        }
425    }
426
427    async fn async_fsync<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
428        let FsyncIn {
429            fh, fsync_flags, ..
430        } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
431        let datasync = fsync_flags & 0x1 != 0;
432
433        match self
434            .fs
435            .async_fsync(ctx.context(), ctx.nodeid(), datasync, fh.into())
436            .await
437        {
438            Ok(()) => ctx.async_reply_ok(None::<u8>, None).await,
439            Err(e) => ctx.async_reply_error(e).await,
440        }
441    }
442
443    async fn async_fsyncdir<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
444        let FsyncIn {
445            fh, fsync_flags, ..
446        } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
447        let datasync = fsync_flags & 0x1 != 0;
448        let result = self
449            .fs
450            .async_fsyncdir(ctx.context(), ctx.nodeid(), datasync, fh.into())
451            .await;
452
453        match result {
454            Ok(()) => ctx.async_reply_ok(None::<u8>, None).await,
455            Err(e) => ctx.async_reply_error(e).await,
456        }
457    }
458
459    async fn async_create<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
460        let args: CreateIn = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
461        let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, size_of::<CreateIn>())?;
462        let name = match bytes_to_cstr(buf.as_ref()) {
463            Ok(name) => name,
464            Err(e) => {
465                error!("fuse: bytes to cstr error: {:?}, {:?}", buf, e);
466                let _ = ctx
467                    .async_reply_error(io::Error::from_raw_os_error(libc::EINVAL))
468                    .await;
469                return Err(e);
470            }
471        };
472
473        let result = self
474            .fs
475            .async_create(ctx.context(), ctx.nodeid(), name, args)
476            .await;
477
478        match result {
479            Ok((entry, handle, opts)) => {
480                let entry_out = EntryOut {
481                    nodeid: entry.inode,
482                    generation: entry.generation,
483                    entry_valid: entry.entry_timeout.as_secs(),
484                    attr_valid: entry.attr_timeout.as_secs(),
485                    entry_valid_nsec: entry.entry_timeout.subsec_nanos(),
486                    attr_valid_nsec: entry.attr_timeout.subsec_nanos(),
487                    attr: entry.attr.into(),
488                };
489                let open_out = OpenOut {
490                    fh: handle.map(Into::into).unwrap_or(0),
491                    open_flags: opts.bits(),
492                    ..Default::default()
493                };
494
495                // Kind of a hack to write both structs.
496                ctx.async_reply_ok(Some(entry_out), Some(open_out.as_slice()))
497                    .await
498            }
499            Err(e) => ctx.async_reply_error(e).await,
500        }
501    }
502
503    async fn async_fallocate<S: BitmapSlice>(
504        &self,
505        mut ctx: SrvContext<'_, F, S>,
506    ) -> Result<usize> {
507        let FallocateIn {
508            fh,
509            offset,
510            length,
511            mode,
512            ..
513        } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
514        let result = self
515            .fs
516            .async_fallocate(ctx.context(), ctx.nodeid(), fh.into(), mode, offset, length)
517            .await;
518
519        match result {
520            Ok(()) => ctx.async_reply_ok(None::<u8>, None).await,
521            Err(e) => ctx.async_reply_error(e).await,
522        }
523    }
524}
525
526impl<'a, F: AsyncFileSystem, S: BitmapSlice> SrvContext<'a, F, S> {
527    async fn async_reply_ok<T: ByteValued>(
528        &mut self,
529        out: Option<T>,
530        data: Option<&[u8]>,
531    ) -> Result<usize> {
532        let data2 = out.as_ref().map(|v| v.as_slice()).unwrap_or(&[]);
533        let data3 = data.unwrap_or(&[]);
534        let len = size_of::<OutHeader>() + data2.len() + data3.len();
535        let header = OutHeader {
536            len: len as u32,
537            error: 0,
538            unique: self.in_header.unique,
539        };
540        trace!("fuse: new reply {:?}", header);
541
542        let result = match (data2.len(), data3.len()) {
543            (0, 0) => self.w.async_write(header.as_slice()).await,
544            (0, _) => self.w.async_write2(header.as_slice(), data3).await,
545            (_, 0) => self.w.async_write2(header.as_slice(), data2).await,
546            (_, _) => self.w.async_write3(header.as_slice(), data2, data3).await,
547        };
548        result.map_err(Error::EncodeMessage)?;
549
550        debug_assert_eq!(len, self.w.bytes_written());
551        Ok(self.w.bytes_written())
552    }
553
554    async fn async_do_reply_error(&mut self, err: io::Error, internal_err: bool) -> Result<usize> {
555        let header = OutHeader {
556            len: size_of::<OutHeader>() as u32,
557            error: -err
558                .raw_os_error()
559                .unwrap_or_else(|| encode_io_error_kind(err.kind())),
560            unique: self.in_header.unique,
561        };
562
563        trace!("fuse: reply error header {:?}, error {:?}", header, err);
564        if internal_err {
565            error!("fuse: reply error header {:?}, error {:?}", header, err);
566        }
567        self.w
568            .async_write_all(header.as_slice())
569            .await
570            .map_err(Error::EncodeMessage)?;
571
572        // Commit header if it is buffered otherwise kernel gets nothing back.
573        self.w
574            .async_commit(None)
575            .await
576            .map(|_| {
577                debug_assert_eq!(header.len as usize, self.w.bytes_written());
578                self.w.bytes_written()
579            })
580            .map_err(Error::EncodeMessage)
581    }
582
583    // reply operation error back to fuse client, don't print error message, as they are not
584    // server's internal error, and client could deal with them.
585    async fn async_reply_error(&mut self, err: io::Error) -> Result<usize> {
586        self.async_do_reply_error(err, false).await
587    }
588
589    async fn async_reply_error_explicit(&mut self, err: io::Error) -> Result<usize> {
590        self.async_do_reply_error(err, true).await
591    }
592
593    async fn async_handle_attr_result(
594        &mut self,
595        result: io::Result<(stat64, Duration)>,
596    ) -> Result<usize> {
597        match result {
598            Ok((st, timeout)) => {
599                let out = AttrOut {
600                    attr_valid: timeout.as_secs(),
601                    attr_valid_nsec: timeout.subsec_nanos(),
602                    dummy: 0,
603                    attr: st.into(),
604                };
605                self.async_reply_ok(Some(out), None).await
606            }
607            Err(e) => self.async_reply_error(e).await,
608        }
609    }
610}
611
612#[cfg(feature = "fusedev")]
613#[cfg(test)]
614mod tests {
615    use super::*;
616    use crate::api::Vfs;
617    use crate::transport::{FuseBuf, FuseDevWriter};
618
619    use std::os::unix::io::AsRawFd;
620
621    #[test]
622    fn test_vfs_async_invalid_header() {
623        let vfs = Vfs::default();
624        let server = Server::new(vfs);
625        let mut r_buf = [0u8];
626        let r = Reader::<()>::from_fuse_buffer(FuseBuf::new(&mut r_buf)).unwrap();
627        let file = vmm_sys_util::tempfile::TempFile::new().unwrap();
628        let mut buf = vec![0x0u8; 1000];
629        let w = FuseDevWriter::<()>::new(file.as_file().as_raw_fd(), &mut buf)
630            .unwrap()
631            .into();
632
633        let result = crate::async_runtime::block_on(async {
634            unsafe { server.async_handle_message(r, w, None, None).await }
635        });
636        assert!(result.is_err());
637    }
638}