1use std::io;
5use std::mem::size_of;
6use std::sync::Arc;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use vm_memory::ByteValued;
11
12use crate::abi::fuse_abi::{
13 stat64, AttrOut, CreateIn, EntryOut, FallocateIn, FsyncIn, GetattrIn, Opcode, OpenIn, OpenOut,
14 OutHeader, ReadIn, SetattrIn, SetattrValid, WriteIn, WriteOut, FATTR_FH, GETATTR_FH,
15 KERNEL_MINOR_VERSION_LOOKUP_NEGATIVE_ENTRY_ZERO, READ_LOCKOWNER, WRITE_CACHE, WRITE_LOCKOWNER,
16};
17use crate::api::filesystem::{
18 AsyncFileSystem, AsyncZeroCopyReader, AsyncZeroCopyWriter, ZeroCopyReader, ZeroCopyWriter,
19};
20use crate::api::server::{
21 MetricsHook, Server, ServerUtil, SrvContext, BUFFER_HEADER_SIZE, MAX_BUFFER_SIZE,
22};
23use crate::file_traits::{AsyncFileReadWriteVolatile, FileReadWriteVolatile};
24use crate::transport::{FsCacheReqHandler, Reader, Writer};
25use crate::{bytes_to_cstr, encode_io_error_kind, BitmapSlice, Error, Result};
26
27struct AsyncZcReader<'a, S: BitmapSlice = ()>(Reader<'a, S>);
28
29unsafe impl<'a, S: BitmapSlice> Send for AsyncZcReader<'a, S> {}
33
34#[async_trait(?Send)]
35impl<'a, S: BitmapSlice> AsyncZeroCopyReader for AsyncZcReader<'a, S> {
36 async fn async_read_to(
37 &mut self,
38 f: Arc<dyn AsyncFileReadWriteVolatile>,
39 count: usize,
40 off: u64,
41 ) -> io::Result<usize> {
42 self.0.async_read_to_at(&f, count, off).await
43 }
44}
45
46impl<'a, S: BitmapSlice> ZeroCopyReader for AsyncZcReader<'a, S> {
47 fn read_to(
48 &mut self,
49 f: &mut dyn FileReadWriteVolatile,
50 count: usize,
51 off: u64,
52 ) -> io::Result<usize> {
53 self.0.read_to_at(f, count, off)
54 }
55}
56
57impl<'a, S: BitmapSlice> io::Read for AsyncZcReader<'a, S> {
58 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
59 self.0.read(buf)
60 }
61}
62
63struct AsyncZcWriter<'a, S: BitmapSlice = ()>(Writer<'a, S>);
64
65unsafe impl<'a, S: BitmapSlice> Send for AsyncZcWriter<'a, S> {}
69
70#[async_trait(?Send)]
71impl<'a, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, S> {
72 async fn async_write_from(
73 &mut self,
74 f: Arc<dyn AsyncFileReadWriteVolatile>,
75 count: usize,
76 off: u64,
77 ) -> io::Result<usize> {
78 self.0.async_write_from_at(&f, count, off).await
79 }
80}
81
82impl<'a, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, S> {
83 fn write_from(
84 &mut self,
85 f: &mut dyn FileReadWriteVolatile,
86 count: usize,
87 off: u64,
88 ) -> io::Result<usize> {
89 self.0.write_from_at(f, count, off)
90 }
91
92 fn available_bytes(&self) -> usize {
93 self.0.available_bytes()
94 }
95}
96
97impl<'a, S: BitmapSlice> io::Write for AsyncZcWriter<'a, S> {
98 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
99 self.0.write(buf)
100 }
101
102 fn flush(&mut self) -> io::Result<()> {
103 self.0.flush()
104 }
105}
106
107impl<F: AsyncFileSystem + Sync> Server<F> {
108 #[allow(unused_variables)]
120 pub async unsafe fn async_handle_message<S: BitmapSlice>(
121 &self,
122 mut r: Reader<'_, S>,
123 w: Writer<'_, S>,
124 vu_req: Option<&mut dyn FsCacheReqHandler>,
125 hook: Option<&dyn MetricsHook>,
126 ) -> Result<usize> {
127 let in_header = r.read_obj().map_err(Error::DecodeMessage)?;
128 let mut ctx = SrvContext::<F, S>::new(in_header, r, w);
129 if ctx.in_header.len > (MAX_BUFFER_SIZE + BUFFER_HEADER_SIZE)
130 || ctx.w.available_bytes() < size_of::<OutHeader>()
131 {
132 return ctx
133 .async_do_reply_error(io::Error::from_raw_os_error(libc::ENOMEM), true)
134 .await;
135 }
136 let in_header = &ctx.in_header;
137
138 trace!(
139 "fuse: new req {:?}: {:?}",
140 Opcode::from(in_header.opcode),
141 in_header
142 );
143
144 if let Some(h) = hook {
145 h.collect(&in_header);
146 }
147
148 let res = match in_header.opcode {
149 x if x == Opcode::Lookup as u32 => self.async_lookup(ctx).await,
150 x if x == Opcode::Forget as u32 => self.forget(ctx), x if x == Opcode::Getattr as u32 => self.async_getattr(ctx).await,
152 x if x == Opcode::Setattr as u32 => self.async_setattr(ctx).await,
153 x if x == Opcode::Readlink as u32 => self.readlink(ctx),
154 x if x == Opcode::Symlink as u32 => self.symlink(ctx),
155 x if x == Opcode::Mknod as u32 => self.mknod(ctx),
156 x if x == Opcode::Mkdir as u32 => self.mkdir(ctx),
157 x if x == Opcode::Unlink as u32 => self.unlink(ctx),
158 x if x == Opcode::Rmdir as u32 => self.rmdir(ctx),
159 x if x == Opcode::Rename as u32 => self.rename(ctx),
160 x if x == Opcode::Link as u32 => self.link(ctx),
161 x if x == Opcode::Open as u32 => self.async_open(ctx).await,
162 x if x == Opcode::Read as u32 => self.async_read(ctx).await,
163 x if x == Opcode::Write as u32 => self.async_write(ctx).await,
164 x if x == Opcode::Statfs as u32 => self.statfs(ctx),
165 x if x == Opcode::Release as u32 => self.release(ctx),
166 x if x == Opcode::Fsync as u32 => self.async_fsync(ctx).await,
167 x if x == Opcode::Setxattr as u32 => self.setxattr(ctx),
168 x if x == Opcode::Getxattr as u32 => self.getxattr(ctx),
169 x if x == Opcode::Listxattr as u32 => self.listxattr(ctx),
170 x if x == Opcode::Removexattr as u32 => self.removexattr(ctx),
171 x if x == Opcode::Flush as u32 => self.flush(ctx),
172 x if x == Opcode::Init as u32 => self.init(ctx),
173 x if x == Opcode::Opendir as u32 => self.opendir(ctx),
174 x if x == Opcode::Readdir as u32 => self.readdir(ctx),
175 x if x == Opcode::Releasedir as u32 => self.releasedir(ctx),
176 x if x == Opcode::Fsyncdir as u32 => self.async_fsyncdir(ctx).await,
177 x if x == Opcode::Getlk as u32 => self.getlk(ctx),
178 x if x == Opcode::Setlk as u32 => self.setlk(ctx),
179 x if x == Opcode::Setlkw as u32 => self.setlkw(ctx),
180 x if x == Opcode::Access as u32 => self.access(ctx),
181 x if x == Opcode::Create as u32 => self.async_create(ctx).await,
182 x if x == Opcode::Bmap as u32 => self.bmap(ctx),
183 x if x == Opcode::Ioctl as u32 => self.ioctl(ctx),
184 x if x == Opcode::Poll as u32 => self.poll(ctx),
185 x if x == Opcode::NotifyReply as u32 => self.notify_reply(ctx),
186 x if x == Opcode::BatchForget as u32 => self.batch_forget(ctx),
187 x if x == Opcode::Fallocate as u32 => self.async_fallocate(ctx).await,
188 #[cfg(target_os = "linux")]
189 x if x == Opcode::Readdirplus as u32 => self.readdirplus(ctx),
190 #[cfg(target_os = "linux")]
191 x if x == Opcode::Rename2 as u32 => self.rename2(ctx),
192 #[cfg(target_os = "linux")]
193 x if x == Opcode::Lseek as u32 => self.lseek(ctx),
194 #[cfg(feature = "virtiofs")]
195 x if x == Opcode::SetupMapping as u32 => self.setupmapping(ctx, vu_req),
196 #[cfg(feature = "virtiofs")]
197 x if x == Opcode::RemoveMapping as u32 => self.removemapping(ctx, vu_req),
198 x => match x {
200 x if x == Opcode::Interrupt as u32 => {
201 self.interrupt(ctx);
202 Ok(0)
203 }
204 x if x == Opcode::Destroy as u32 => {
205 self.destroy(ctx);
206 Ok(0)
207 }
208 _ => {
209 ctx.async_reply_error(io::Error::from_raw_os_error(libc::ENOSYS))
210 .await
211 }
212 },
213 };
214
215 if let Some(h) = hook {
219 h.release(None);
220 }
221
222 res
223 }
224
225 async fn async_lookup<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
226 let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, 0)?;
227 let name = match bytes_to_cstr(buf.as_ref()) {
228 Ok(name) => name,
229 Err(e) => {
230 error!("fuse: bytes to cstr error: {:?}, {:?}", buf, e);
231 let _ = ctx
232 .async_reply_error(io::Error::from_raw_os_error(libc::EINVAL))
233 .await;
234 return Err(e);
235 }
236 };
237
238 let version = self.vers.load();
239 let result = self
240 .fs
241 .async_lookup(ctx.context(), ctx.nodeid(), name)
242 .await;
243
244 match result {
245 Ok(entry)
247 if version.minor < KERNEL_MINOR_VERSION_LOOKUP_NEGATIVE_ENTRY_ZERO
248 && entry.inode == 0 =>
249 {
250 ctx.async_reply_error(io::Error::from_raw_os_error(libc::ENOENT))
251 .await
252 }
253 Ok(entry) => {
254 let out = EntryOut::from(entry);
255 ctx.async_reply_ok(Some(out), None).await
256 }
257 Err(e) => ctx.async_reply_error(e).await,
258 }
259 }
260
261 async fn async_getattr<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
262 let GetattrIn { flags, fh, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
263 let handle = if (flags & GETATTR_FH) != 0 {
264 Some(fh.into())
265 } else {
266 None
267 };
268 let result = self
269 .fs
270 .async_getattr(ctx.context(), ctx.nodeid(), handle)
271 .await;
272
273 ctx.async_handle_attr_result(result).await
274 }
275
276 async fn async_setattr<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
277 let setattr_in: SetattrIn = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
278 let handle = if setattr_in.valid & FATTR_FH != 0 {
279 Some(setattr_in.fh.into())
280 } else {
281 None
282 };
283 let valid = SetattrValid::from_bits_truncate(setattr_in.valid);
284 let st: stat64 = setattr_in.into();
285 let result = self
286 .fs
287 .async_setattr(ctx.context(), ctx.nodeid(), st, handle, valid)
288 .await;
289
290 ctx.async_handle_attr_result(result).await
291 }
292
293 async fn async_open<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
294 let OpenIn { flags, fuse_flags } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
295 let result = self
296 .fs
297 .async_open(ctx.context(), ctx.nodeid(), flags, fuse_flags)
298 .await;
299
300 match result {
301 Ok((handle, opts)) => {
302 let out = OpenOut {
303 fh: handle.map(Into::into).unwrap_or(0),
304 open_flags: opts.bits(),
305 ..Default::default()
306 };
307
308 ctx.async_reply_ok(Some(out), None).await
309 }
310 Err(e) => ctx.async_reply_error(e).await,
311 }
312 }
313
314 async fn async_read<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
315 let ReadIn {
316 fh,
317 offset,
318 size,
319 read_flags,
320 lock_owner,
321 flags,
322 ..
323 } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
324
325 let owner = if read_flags & READ_LOCKOWNER != 0 {
326 Some(lock_owner)
327 } else {
328 None
329 };
330
331 let w2 = match ctx.w.split_at(size_of::<OutHeader>()) {
333 Ok(v) => v,
334 Err(_e) => return Err(Error::InvalidHeaderLength),
335 };
336 let mut data_writer = AsyncZcWriter(w2);
337 let result = self
338 .fs
339 .async_read(
340 ctx.context(),
341 ctx.nodeid(),
342 fh.into(),
343 &mut data_writer,
344 size,
345 offset,
346 owner,
347 flags,
348 )
349 .await;
350
351 match result {
352 Ok(count) => {
353 let out = OutHeader {
356 len: (size_of::<OutHeader>() + count) as u32,
357 error: 0,
358 unique: ctx.unique(),
359 };
360
361 ctx.w
362 .async_write_all(out.as_slice())
363 .await
364 .map_err(Error::EncodeMessage)?;
365 ctx.w
366 .async_commit(Some(&data_writer.0))
367 .await
368 .map_err(Error::EncodeMessage)?;
369 Ok(out.len as usize)
370 }
371 Err(e) => ctx.async_reply_error_explicit(e).await,
372 }
373 }
374
375 async fn async_write<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
376 let WriteIn {
377 fh,
378 offset,
379 size,
380 fuse_flags,
381 lock_owner,
382 flags,
383 ..
384 } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
385
386 if size > MAX_BUFFER_SIZE {
387 return ctx
388 .async_reply_error_explicit(io::Error::from_raw_os_error(libc::ENOMEM))
389 .await;
390 }
391
392 let owner = if fuse_flags & WRITE_LOCKOWNER != 0 {
393 Some(lock_owner)
394 } else {
395 None
396 };
397 let delayed_write = fuse_flags & WRITE_CACHE != 0;
398 let mut data_reader = AsyncZcReader(ctx.take_reader());
399 let result = self
400 .fs
401 .async_write(
402 ctx.context(),
403 ctx.nodeid(),
404 fh.into(),
405 &mut data_reader,
406 size,
407 offset,
408 owner,
409 delayed_write,
410 flags,
411 fuse_flags,
412 )
413 .await;
414
415 match result {
416 Ok(count) => {
417 let out = WriteOut {
418 size: count as u32,
419 ..Default::default()
420 };
421 ctx.async_reply_ok(Some(out), None).await
422 }
423 Err(e) => ctx.async_reply_error_explicit(e).await,
424 }
425 }
426
427 async fn async_fsync<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
428 let FsyncIn {
429 fh, fsync_flags, ..
430 } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
431 let datasync = fsync_flags & 0x1 != 0;
432
433 match self
434 .fs
435 .async_fsync(ctx.context(), ctx.nodeid(), datasync, fh.into())
436 .await
437 {
438 Ok(()) => ctx.async_reply_ok(None::<u8>, None).await,
439 Err(e) => ctx.async_reply_error(e).await,
440 }
441 }
442
443 async fn async_fsyncdir<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
444 let FsyncIn {
445 fh, fsync_flags, ..
446 } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
447 let datasync = fsync_flags & 0x1 != 0;
448 let result = self
449 .fs
450 .async_fsyncdir(ctx.context(), ctx.nodeid(), datasync, fh.into())
451 .await;
452
453 match result {
454 Ok(()) => ctx.async_reply_ok(None::<u8>, None).await,
455 Err(e) => ctx.async_reply_error(e).await,
456 }
457 }
458
459 async fn async_create<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
460 let args: CreateIn = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
461 let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, size_of::<CreateIn>())?;
462 let name = match bytes_to_cstr(buf.as_ref()) {
463 Ok(name) => name,
464 Err(e) => {
465 error!("fuse: bytes to cstr error: {:?}, {:?}", buf, e);
466 let _ = ctx
467 .async_reply_error(io::Error::from_raw_os_error(libc::EINVAL))
468 .await;
469 return Err(e);
470 }
471 };
472
473 let result = self
474 .fs
475 .async_create(ctx.context(), ctx.nodeid(), name, args)
476 .await;
477
478 match result {
479 Ok((entry, handle, opts)) => {
480 let entry_out = EntryOut {
481 nodeid: entry.inode,
482 generation: entry.generation,
483 entry_valid: entry.entry_timeout.as_secs(),
484 attr_valid: entry.attr_timeout.as_secs(),
485 entry_valid_nsec: entry.entry_timeout.subsec_nanos(),
486 attr_valid_nsec: entry.attr_timeout.subsec_nanos(),
487 attr: entry.attr.into(),
488 };
489 let open_out = OpenOut {
490 fh: handle.map(Into::into).unwrap_or(0),
491 open_flags: opts.bits(),
492 ..Default::default()
493 };
494
495 ctx.async_reply_ok(Some(entry_out), Some(open_out.as_slice()))
497 .await
498 }
499 Err(e) => ctx.async_reply_error(e).await,
500 }
501 }
502
503 async fn async_fallocate<S: BitmapSlice>(
504 &self,
505 mut ctx: SrvContext<'_, F, S>,
506 ) -> Result<usize> {
507 let FallocateIn {
508 fh,
509 offset,
510 length,
511 mode,
512 ..
513 } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
514 let result = self
515 .fs
516 .async_fallocate(ctx.context(), ctx.nodeid(), fh.into(), mode, offset, length)
517 .await;
518
519 match result {
520 Ok(()) => ctx.async_reply_ok(None::<u8>, None).await,
521 Err(e) => ctx.async_reply_error(e).await,
522 }
523 }
524}
525
526impl<'a, F: AsyncFileSystem, S: BitmapSlice> SrvContext<'a, F, S> {
527 async fn async_reply_ok<T: ByteValued>(
528 &mut self,
529 out: Option<T>,
530 data: Option<&[u8]>,
531 ) -> Result<usize> {
532 let data2 = out.as_ref().map(|v| v.as_slice()).unwrap_or(&[]);
533 let data3 = data.unwrap_or(&[]);
534 let len = size_of::<OutHeader>() + data2.len() + data3.len();
535 let header = OutHeader {
536 len: len as u32,
537 error: 0,
538 unique: self.in_header.unique,
539 };
540 trace!("fuse: new reply {:?}", header);
541
542 let result = match (data2.len(), data3.len()) {
543 (0, 0) => self.w.async_write(header.as_slice()).await,
544 (0, _) => self.w.async_write2(header.as_slice(), data3).await,
545 (_, 0) => self.w.async_write2(header.as_slice(), data2).await,
546 (_, _) => self.w.async_write3(header.as_slice(), data2, data3).await,
547 };
548 result.map_err(Error::EncodeMessage)?;
549
550 debug_assert_eq!(len, self.w.bytes_written());
551 Ok(self.w.bytes_written())
552 }
553
554 async fn async_do_reply_error(&mut self, err: io::Error, internal_err: bool) -> Result<usize> {
555 let header = OutHeader {
556 len: size_of::<OutHeader>() as u32,
557 error: -err
558 .raw_os_error()
559 .unwrap_or_else(|| encode_io_error_kind(err.kind())),
560 unique: self.in_header.unique,
561 };
562
563 trace!("fuse: reply error header {:?}, error {:?}", header, err);
564 if internal_err {
565 error!("fuse: reply error header {:?}, error {:?}", header, err);
566 }
567 self.w
568 .async_write_all(header.as_slice())
569 .await
570 .map_err(Error::EncodeMessage)?;
571
572 self.w
574 .async_commit(None)
575 .await
576 .map(|_| {
577 debug_assert_eq!(header.len as usize, self.w.bytes_written());
578 self.w.bytes_written()
579 })
580 .map_err(Error::EncodeMessage)
581 }
582
583 async fn async_reply_error(&mut self, err: io::Error) -> Result<usize> {
586 self.async_do_reply_error(err, false).await
587 }
588
589 async fn async_reply_error_explicit(&mut self, err: io::Error) -> Result<usize> {
590 self.async_do_reply_error(err, true).await
591 }
592
593 async fn async_handle_attr_result(
594 &mut self,
595 result: io::Result<(stat64, Duration)>,
596 ) -> Result<usize> {
597 match result {
598 Ok((st, timeout)) => {
599 let out = AttrOut {
600 attr_valid: timeout.as_secs(),
601 attr_valid_nsec: timeout.subsec_nanos(),
602 dummy: 0,
603 attr: st.into(),
604 };
605 self.async_reply_ok(Some(out), None).await
606 }
607 Err(e) => self.async_reply_error(e).await,
608 }
609 }
610}
611
#[cfg(feature = "fusedev")]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::Vfs;
    use crate::transport::{FuseBuf, FuseDevWriter};

    use std::os::unix::io::AsRawFd;

    /// Feeding a one-byte request buffer to `async_handle_message` must yield an error
    /// (presumably because the buffer is too short to decode a request header - confirm).
    #[test]
    fn test_vfs_async_invalid_header() {
        let server = Server::new(Vfs::default());

        let mut request = [0u8];
        let reader = Reader::<()>::from_fuse_buffer(FuseBuf::new(&mut request)).unwrap();

        let file = vmm_sys_util::tempfile::TempFile::new().unwrap();
        let mut reply = vec![0x0u8; 1000];
        let writer = FuseDevWriter::<()>::new(file.as_file().as_raw_fd(), &mut reply)
            .unwrap()
            .into();

        let outcome = crate::async_runtime::block_on(async {
            // NOTE(review): `request`, `reply` and `file` outlive this `block_on` call, which is
            // presumably what the unsafe contract requires - confirm.
            unsafe { server.async_handle_message(reader, writer, None, None).await }
        });
        assert!(outcome.is_err());
    }
}