1use std::error;
7use std::fs::File;
8use std::io;
9use std::os::fd::AsFd;
10#[cfg(feature = "postcopy")]
11use std::os::fd::FromRawFd;
12use std::os::unix::io::AsRawFd;
13use std::sync::Arc;
14use std::thread;
15
16use crate::bitmap::{BitmapReplace, MemRegionBitmap, MmapLogReg};
17#[cfg(feature = "postcopy")]
18use userfaultfd::{Uffd, UffdBuilder};
19use vhost::vhost_user::message::{
20 VhostTransferStateDirection, VhostTransferStatePhase, VhostUserConfigFlags, VhostUserLog,
21 VhostUserMemoryRegion, VhostUserProtocolFeatures, VhostUserSharedMsg,
22 VhostUserSingleMemoryRegion, VhostUserVirtioFeatures, VhostUserVringAddrFlags,
23 VhostUserVringState,
24};
25use vhost::vhost_user::GpuBackend;
26use vhost::vhost_user::{
27 Backend, Error as VhostUserError, Result as VhostUserResult, VhostUserBackendReqHandlerMut,
28};
29
30use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
31use virtio_queue::{Error as VirtQueError, QueueT};
32use vm_memory::mmap::NewBitmap;
33use vm_memory::{GuestAddress, GuestAddressSpace, GuestMemory, GuestMemoryMmap, GuestRegionMmap};
34use vmm_sys_util::epoll::EventSet;
35
36use super::backend::VhostUserBackend;
37use super::event_loop::VringEpollHandler;
38use super::event_loop::{VringEpollError, VringEpollResult};
39use super::vring::VringT;
40use super::GM;
41
/// Number of memory slots reported to the frontend by `get_max_mem_slots`.
const MAX_MEM_SLOTS: u64 = 509;
46
/// Errors produced while building or operating a `VhostUserHandler`.
#[derive(Debug)]
pub enum VhostUserHandlerError {
    /// Creating a vring failed; carries the underlying virtio-queue error.
    CreateVring(VirtQueError),
    /// Creating a per-thread vring epoll handler failed.
    CreateEpollHandler(VringEpollError),
    /// Spawning a vring worker thread failed.
    SpawnVringWorker(io::Error),
    /// No registered memory mapping covers the requested address.
    MissingMemoryMapping,
}
59
60impl std::fmt::Display for VhostUserHandlerError {
61 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
62 match self {
63 VhostUserHandlerError::CreateVring(e) => {
64 write!(f, "failed to create vring: {}", e)
65 }
66 VhostUserHandlerError::CreateEpollHandler(e) => {
67 write!(f, "failed to create vring epoll handler: {}", e)
68 }
69 VhostUserHandlerError::SpawnVringWorker(e) => {
70 write!(f, "failed spawning the vring worker: {}", e)
71 }
72 VhostUserHandlerError::MissingMemoryMapping => write!(f, "Missing memory mapping"),
73 }
74 }
75}
76
// Marker implementation: the default `std::error::Error` methods are used as-is.
impl error::Error for VhostUserHandlerError {}
78
/// Result alias whose error type is `VhostUserHandlerError`.
pub type VhostUserHandlerResult<T> = std::result::Result<T, VhostUserHandlerError>;
81
/// Address bookkeeping for one guest memory region registered by the frontend.
#[derive(Debug)]
struct AddrMapping {
    /// Address at which the region is mapped in this process (taken from the region's `as_ptr()`).
    #[cfg(feature = "postcopy")]
    local_addr: u64,
    /// Address of the region in the frontend's address space (the region's `user_addr`).
    vmm_addr: u64,
    /// Size of the region in bytes (the region's `memory_size`).
    size: u64,
    /// Guest physical address at which the region starts.
    gpa_base: u64,
}
90
/// Handles vhost-user requests on behalf of a backend `T`.
///
/// Holds the backend, its vrings, the per-thread epoll handlers with their worker threads, and
/// the state negotiated over the protocol (ownership, features, memory mappings).
pub struct VhostUserHandler<T: VhostUserBackend> {
    /// Backend implementation that requests are forwarded to.
    backend: T,
    /// One epoll handler per entry of `queues_per_thread`.
    handlers: Vec<Arc<VringEpollHandler<T>>>,
    /// Set by `set_owner`, cleared by `reset_owner`.
    owned: bool,
    /// Set once `set_features` has accepted a feature set.
    features_acked: bool,
    /// Feature bits accepted by `set_features`.
    acked_features: u64,
    /// Protocol feature bits stored by `set_protocol_features`.
    acked_protocol_features: u64,
    /// Number of queues reported by the backend at construction time.
    num_queues: usize,
    /// Maximum queue size reported by the backend at construction time.
    max_queue_size: usize,
    /// One bitmask per worker thread; bit `i` set means queue `i` is served by that thread.
    queues_per_thread: Vec<u64>,
    /// Address mappings of the currently registered memory regions.
    mappings: Vec<AddrMapping>,
    /// Shared guest memory object, replaced whenever the set of memory regions changes.
    atomic_mem: GM<T::Bitmap>,
    /// One vring per queue, indexed by queue number.
    vrings: Vec<T::Vring>,
    /// Userfaultfd handle stored by `postcopy_advice` and dropped by `postcopy_end`.
    #[cfg(feature = "postcopy")]
    uffd: Option<Uffd>,
    /// Join handles of the vring worker threads; joined when the handler is dropped.
    worker_threads: Vec<thread::JoinHandle<VringEpollResult<()>>>,
}
108
109impl<T> VhostUserHandler<T>
111where
112 T: VhostUserBackend + Clone + 'static,
113 T::Vring: Clone + Send + Sync + 'static,
114 T::Bitmap: Clone + Send + Sync + 'static,
115{
116 pub(crate) fn new(backend: T, atomic_mem: GM<T::Bitmap>) -> VhostUserHandlerResult<Self> {
117 let num_queues = backend.num_queues();
118 let max_queue_size = backend.max_queue_size();
119 let queues_per_thread = backend.queues_per_thread();
120
121 let mut vrings = Vec::new();
122 for _ in 0..num_queues {
123 let vring = T::Vring::new(atomic_mem.clone(), max_queue_size as u16)
124 .map_err(VhostUserHandlerError::CreateVring)?;
125 vrings.push(vring);
126 }
127
128 let mut handlers = Vec::new();
129 let mut worker_threads = Vec::new();
130 for (thread_id, queues_mask) in queues_per_thread.iter().enumerate() {
131 let mut thread_vrings = Vec::new();
132 for (index, vring) in vrings.iter().enumerate() {
133 if (queues_mask >> index) & 1u64 == 1u64 {
134 thread_vrings.push(vring.clone());
135 }
136 }
137
138 let handler = Arc::new(
139 VringEpollHandler::new(backend.clone(), thread_vrings, thread_id)
140 .map_err(VhostUserHandlerError::CreateEpollHandler)?,
141 );
142 let handler2 = handler.clone();
143 let worker_thread = thread::Builder::new()
144 .name("vring_worker".to_string())
145 .spawn(move || handler2.run())
146 .map_err(VhostUserHandlerError::SpawnVringWorker)?;
147
148 handlers.push(handler);
149 worker_threads.push(worker_thread);
150 }
151
152 Ok(VhostUserHandler {
153 backend,
154 handlers,
155 owned: false,
156 features_acked: false,
157 acked_features: 0,
158 acked_protocol_features: 0,
159 num_queues,
160 max_queue_size,
161 queues_per_thread,
162 mappings: Vec::new(),
163 atomic_mem,
164 vrings,
165 #[cfg(feature = "postcopy")]
166 uffd: None,
167 worker_threads,
168 })
169 }
170}
171
172impl<T: VhostUserBackend> VhostUserHandler<T> {
173 pub(crate) fn send_exit_event(&self) {
174 for handler in self.handlers.iter() {
175 handler.send_exit_event();
176 }
177 }
178
179 fn vmm_va_to_gpa(&self, vmm_va: u64) -> VhostUserHandlerResult<u64> {
180 for mapping in self.mappings.iter() {
181 if vmm_va >= mapping.vmm_addr && vmm_va < mapping.vmm_addr + mapping.size {
182 return Ok(vmm_va - mapping.vmm_addr + mapping.gpa_base);
183 }
184 }
185
186 Err(VhostUserHandlerError::MissingMemoryMapping)
187 }
188}
189
190impl<T> VhostUserHandler<T>
191where
192 T: VhostUserBackend,
193{
194 pub(crate) fn get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<T>>> {
195 self.handlers.clone()
196 }
197
198 fn vring_needs_init(&self, vring: &T::Vring) -> bool {
199 let vring_state = vring.get_ref();
200
201 !vring_state.get_queue().ready() && vring_state.get_kick().is_some()
204 }
205
206 fn initialize_vring(&self, vring: &T::Vring, index: u8) -> VhostUserResult<()> {
207 assert!(vring.get_ref().get_kick().is_some());
208
209 if let Some(fd) = vring.get_ref().get_kick() {
210 for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() {
211 let shifted_queues_mask = queues_mask >> index;
212 if shifted_queues_mask & 1u64 == 1u64 {
213 let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones();
214 self.handlers[thread_index]
215 .register_event(fd.as_raw_fd(), EventSet::IN, u64::from(evt_idx))
216 .map_err(VhostUserError::ReqHandlerError)?;
217 break;
218 }
219 }
220 }
221
222 vring.set_queue_ready(true);
223
224 Ok(())
225 }
226
227 fn check_feature(&self, feat: VhostUserVirtioFeatures) -> VhostUserResult<()> {
229 if self.acked_features & feat.bits() != 0 {
230 Ok(())
231 } else {
232 Err(VhostUserError::InactiveFeature(feat))
233 }
234 }
235}
236
237impl<T: VhostUserBackend> VhostUserBackendReqHandlerMut for VhostUserHandler<T>
238where
239 T::Bitmap: BitmapReplace + NewBitmap + Clone,
240{
241 fn set_owner(&mut self) -> VhostUserResult<()> {
242 if self.owned {
243 return Err(VhostUserError::InvalidOperation("already claimed"));
244 }
245 self.owned = true;
246 Ok(())
247 }
248
249 fn reset_owner(&mut self) -> VhostUserResult<()> {
250 self.owned = false;
251 self.features_acked = false;
252 self.acked_features = 0;
253 self.acked_protocol_features = 0;
254 Ok(())
255 }
256
257 fn reset_device(&mut self) -> VhostUserResult<()> {
258 for vring in self.vrings.iter_mut() {
260 vring.set_enabled(false);
261 }
262
263 self.features_acked = false;
265 self.acked_features = 0;
266 self.backend.reset_device();
267 Ok(())
268 }
269
270 fn get_features(&mut self) -> VhostUserResult<u64> {
271 Ok(self.backend.features())
272 }
273
274 fn set_features(&mut self, features: u64) -> VhostUserResult<()> {
275 if (features & !self.backend.features()) != 0 {
276 return Err(VhostUserError::InvalidParam);
277 }
278
279 self.acked_features = features;
280 self.features_acked = true;
281
282 if self.acked_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() == 0 {
291 for vring in self.vrings.iter_mut() {
292 vring.set_enabled(true);
293 }
294 }
295
296 let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0;
297 for vring in self.vrings.iter_mut() {
298 vring.set_queue_event_idx(event_idx);
299 }
300 self.backend.set_event_idx(event_idx);
301 self.backend.acked_features(self.acked_features);
302
303 Ok(())
304 }
305
306 fn set_mem_table(
307 &mut self,
308 ctx: &[VhostUserMemoryRegion],
309 files: Vec<File>,
310 ) -> VhostUserResult<()> {
311 let mut regions = Vec::new();
314 let mut mappings: Vec<AddrMapping> = Vec::new();
315
316 for (region, file) in ctx.iter().zip(files) {
317 let guest_region = GuestRegionMmap::new(
318 region.mmap_region(file)?,
319 GuestAddress(region.guest_phys_addr),
320 )
321 .map_err(|e| {
322 VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
323 })?;
324 mappings.push(AddrMapping {
325 #[cfg(feature = "postcopy")]
326 local_addr: guest_region.as_ptr() as u64,
327 vmm_addr: region.user_addr,
328 size: region.memory_size,
329 gpa_base: region.guest_phys_addr,
330 });
331 regions.push(guest_region);
332 }
333
334 let mem = GuestMemoryMmap::from_regions(regions).map_err(|e| {
335 VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
336 })?;
337
338 self.atomic_mem.lock().unwrap().replace(mem);
341
342 self.backend
343 .update_memory(self.atomic_mem.clone())
344 .map_err(|e| {
345 VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
346 })?;
347 self.mappings = mappings;
348
349 Ok(())
350 }
351
352 fn set_vring_num(&mut self, index: u32, num: u32) -> VhostUserResult<()> {
353 let vring = self
354 .vrings
355 .get(index as usize)
356 .ok_or(VhostUserError::InvalidParam)?;
357
358 if num == 0 || num as usize > self.max_queue_size {
359 return Err(VhostUserError::InvalidParam);
360 }
361 vring.set_queue_size(num as u16);
362 Ok(())
363 }
364
365 fn set_vring_addr(
366 &mut self,
367 index: u32,
368 _flags: VhostUserVringAddrFlags,
369 descriptor: u64,
370 used: u64,
371 available: u64,
372 _log: u64,
373 ) -> VhostUserResult<()> {
374 let vring = self
375 .vrings
376 .get(index as usize)
377 .ok_or(VhostUserError::InvalidParam)?;
378
379 if !self.mappings.is_empty() {
380 let desc_table = self.vmm_va_to_gpa(descriptor).map_err(|e| {
381 VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
382 })?;
383 let avail_ring = self.vmm_va_to_gpa(available).map_err(|e| {
384 VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
385 })?;
386 let used_ring = self.vmm_va_to_gpa(used).map_err(|e| {
387 VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
388 })?;
389 vring
390 .set_queue_info(desc_table, avail_ring, used_ring)
391 .map_err(|_| VhostUserError::InvalidParam)?;
392
393 let idx = vring
404 .queue_used_idx()
405 .map_err(|_| VhostUserError::BackendInternalError)?;
406 vring.set_queue_next_used(idx);
407
408 Ok(())
409 } else {
410 Err(VhostUserError::InvalidParam)
411 }
412 }
413
414 fn set_vring_base(&mut self, index: u32, base: u32) -> VhostUserResult<()> {
415 let vring = self
416 .vrings
417 .get(index as usize)
418 .ok_or(VhostUserError::InvalidParam)?;
419
420 vring.set_queue_next_avail(base as u16);
421
422 Ok(())
423 }
424
425 fn get_vring_base(&mut self, index: u32) -> VhostUserResult<VhostUserVringState> {
426 let vring = self
427 .vrings
428 .get(index as usize)
429 .ok_or(VhostUserError::InvalidParam)?;
430
431 vring.set_queue_ready(false);
437
438 if let Some(fd) = vring.get_ref().get_kick() {
439 for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() {
440 let shifted_queues_mask = queues_mask >> index;
441 if shifted_queues_mask & 1u64 == 1u64 {
442 let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones();
443 self.handlers[thread_index]
444 .unregister_event(fd.as_raw_fd(), EventSet::IN, u64::from(evt_idx))
445 .map_err(VhostUserError::ReqHandlerError)?;
446 break;
447 }
448 }
449 }
450
451 let next_avail = vring.queue_next_avail();
452
453 vring.set_kick(None);
454 vring.set_call(None);
455
456 Ok(VhostUserVringState::new(index, u32::from(next_avail)))
457 }
458
459 fn set_vring_kick(&mut self, index: u8, file: Option<File>) -> VhostUserResult<()> {
460 let vring = self
461 .vrings
462 .get(index as usize)
463 .ok_or(VhostUserError::InvalidParam)?;
464
465 vring.set_kick(file);
470
471 if self.vring_needs_init(vring) {
472 self.initialize_vring(vring, index)?;
473 }
474
475 Ok(())
476 }
477
478 fn set_vring_call(&mut self, index: u8, file: Option<File>) -> VhostUserResult<()> {
479 let vring = self
480 .vrings
481 .get(index as usize)
482 .ok_or(VhostUserError::InvalidParam)?;
483
484 vring.set_call(file);
485
486 if self.vring_needs_init(vring) {
487 self.initialize_vring(vring, index)?;
488 }
489
490 Ok(())
491 }
492
493 fn set_vring_err(&mut self, index: u8, file: Option<File>) -> VhostUserResult<()> {
494 let vring = self
495 .vrings
496 .get(index as usize)
497 .ok_or(VhostUserError::InvalidParam)?;
498
499 vring.set_err(file);
500
501 Ok(())
502 }
503
504 fn get_protocol_features(&mut self) -> VhostUserResult<VhostUserProtocolFeatures> {
505 Ok(self.backend.protocol_features())
506 }
507
508 fn set_protocol_features(&mut self, features: u64) -> VhostUserResult<()> {
509 self.acked_protocol_features = features;
513 Ok(())
514 }
515
516 fn get_queue_num(&mut self) -> VhostUserResult<u64> {
517 Ok(self.num_queues as u64)
518 }
519
520 fn set_vring_enable(&mut self, index: u32, enable: bool) -> VhostUserResult<()> {
521 self.check_feature(VhostUserVirtioFeatures::PROTOCOL_FEATURES)?;
524
525 let vring = self
526 .vrings
527 .get(index as usize)
528 .ok_or(VhostUserError::InvalidParam)?;
529
530 vring.set_enabled(enable);
535
536 Ok(())
537 }
538
539 fn get_config(
540 &mut self,
541 offset: u32,
542 size: u32,
543 _flags: VhostUserConfigFlags,
544 ) -> VhostUserResult<Vec<u8>> {
545 Ok(self.backend.get_config(offset, size))
546 }
547
548 fn set_config(
549 &mut self,
550 offset: u32,
551 buf: &[u8],
552 _flags: VhostUserConfigFlags,
553 ) -> VhostUserResult<()> {
554 self.backend
555 .set_config(offset, buf)
556 .map_err(VhostUserError::ReqHandlerError)
557 }
558
559 fn set_backend_req_fd(&mut self, backend: Backend) {
560 if self.acked_protocol_features & VhostUserProtocolFeatures::REPLY_ACK.bits() != 0 {
561 backend.set_reply_ack_flag(true);
562 }
563 if self.acked_protocol_features & VhostUserProtocolFeatures::SHARED_OBJECT.bits() != 0 {
564 backend.set_shared_object_flag(true);
565 }
566 self.backend.set_backend_req_fd(backend);
567 }
568
569 fn set_gpu_socket(&mut self, gpu_backend: GpuBackend) -> VhostUserResult<()> {
570 self.backend
571 .set_gpu_socket(gpu_backend)
572 .map_err(VhostUserError::ReqHandlerError)
573 }
574
575 fn get_shared_object(&mut self, uuid: VhostUserSharedMsg) -> VhostUserResult<File> {
576 match self.backend.get_shared_object(uuid) {
577 Ok(shared_file) => Ok(shared_file),
578 Err(e) => Err(VhostUserError::ReqHandlerError(io::Error::new(
579 io::ErrorKind::Other,
580 e,
581 ))),
582 }
583 }
584
585 fn get_inflight_fd(
586 &mut self,
587 _inflight: &vhost::vhost_user::message::VhostUserInflight,
588 ) -> VhostUserResult<(vhost::vhost_user::message::VhostUserInflight, File)> {
589 Err(VhostUserError::InvalidOperation("not supported"))
593 }
594
595 fn set_inflight_fd(
596 &mut self,
597 _inflight: &vhost::vhost_user::message::VhostUserInflight,
598 _file: File,
599 ) -> VhostUserResult<()> {
600 Err(VhostUserError::InvalidOperation("not supported"))
601 }
602
603 fn get_max_mem_slots(&mut self) -> VhostUserResult<u64> {
604 Ok(MAX_MEM_SLOTS)
605 }
606
607 fn add_mem_region(
608 &mut self,
609 region: &VhostUserSingleMemoryRegion,
610 file: File,
611 ) -> VhostUserResult<()> {
612 let guest_region = Arc::new(
613 GuestRegionMmap::new(
614 region.mmap_region(file)?,
615 GuestAddress(region.guest_phys_addr),
616 )
617 .map_err(|e| {
618 VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
619 })?,
620 );
621
622 let addr_mapping = AddrMapping {
623 #[cfg(feature = "postcopy")]
624 local_addr: guest_region.as_ptr() as u64,
625 vmm_addr: region.user_addr,
626 size: region.memory_size,
627 gpa_base: region.guest_phys_addr,
628 };
629
630 let mem = self
631 .atomic_mem
632 .memory()
633 .insert_region(guest_region)
634 .map_err(|e| {
635 VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
636 })?;
637
638 self.atomic_mem.lock().unwrap().replace(mem);
639
640 self.backend
641 .update_memory(self.atomic_mem.clone())
642 .map_err(|e| {
643 VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
644 })?;
645
646 self.mappings.push(addr_mapping);
647
648 Ok(())
649 }
650
651 fn remove_mem_region(&mut self, region: &VhostUserSingleMemoryRegion) -> VhostUserResult<()> {
652 let (mem, _) = self
653 .atomic_mem
654 .memory()
655 .remove_region(GuestAddress(region.guest_phys_addr), region.memory_size)
656 .map_err(|e| {
657 VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
658 })?;
659
660 self.atomic_mem.lock().unwrap().replace(mem);
661
662 self.backend
663 .update_memory(self.atomic_mem.clone())
664 .map_err(|e| {
665 VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
666 })?;
667
668 self.mappings
669 .retain(|mapping| mapping.gpa_base != region.guest_phys_addr);
670
671 Ok(())
672 }
673
674 fn set_device_state_fd(
675 &mut self,
676 direction: VhostTransferStateDirection,
677 phase: VhostTransferStatePhase,
678 file: File,
679 ) -> VhostUserResult<Option<File>> {
680 self.backend
681 .set_device_state_fd(direction, phase, file)
682 .map_err(VhostUserError::ReqHandlerError)
683 }
684
685 fn check_device_state(&mut self) -> VhostUserResult<()> {
686 self.backend
687 .check_device_state()
688 .map_err(VhostUserError::ReqHandlerError)
689 }
690
691 #[cfg(feature = "postcopy")]
692 fn postcopy_advice(&mut self) -> VhostUserResult<File> {
693 let mut uffd_builder = UffdBuilder::new();
694
695 let uffd = uffd_builder
696 .close_on_exec(true)
697 .non_blocking(true)
698 .user_mode_only(false)
699 .create()
700 .map_err(|e| {
701 VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
702 })?;
703
704 let uffd_dup = unsafe { libc::dup(uffd.as_raw_fd()) };
712 if uffd_dup < 0 {
713 return Err(VhostUserError::ReqHandlerError(io::Error::last_os_error()));
714 }
715
716 let uffd_file = unsafe { File::from_raw_fd(uffd_dup) };
719
720 self.uffd = Some(uffd);
721
722 Ok(uffd_file)
723 }
724
725 #[cfg(feature = "postcopy")]
726 fn postcopy_listen(&mut self) -> VhostUserResult<()> {
727 let Some(ref uffd) = self.uffd else {
728 return Err(VhostUserError::ReqHandlerError(io::Error::new(
729 io::ErrorKind::Other,
730 "No registered UFFD handler",
731 )));
732 };
733
734 for mapping in self.mappings.iter() {
735 uffd.register(
736 mapping.local_addr as *mut libc::c_void,
737 mapping.size as usize,
738 )
739 .map_err(|e| {
740 VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
741 })?;
742 }
743
744 Ok(())
745 }
746
747 #[cfg(feature = "postcopy")]
748 fn postcopy_end(&mut self) -> VhostUserResult<()> {
749 self.uffd = None;
750 Ok(())
751 }
752
753 fn set_log_base(&mut self, log: &VhostUserLog, file: File) -> VhostUserResult<()> {
768 let mem = self.atomic_mem.memory();
769
770 let logmem = Arc::new(
771 MmapLogReg::from_file(file.as_fd(), log.mmap_offset, log.mmap_size)
772 .map_err(VhostUserError::ReqHandlerError)?,
773 );
774
775 let mut bitmaps = Vec::new();
777 for region in mem.iter() {
778 let bitmap = <<T as VhostUserBackend>::Bitmap as BitmapReplace>::InnerBitmap::new(
779 region,
780 Arc::clone(&logmem),
781 )
782 .map_err(VhostUserError::ReqHandlerError)?;
783
784 bitmaps.push((region, bitmap));
785 }
786
787 for (region, bitmap) in bitmaps {
788 (*region).bitmap().replace(bitmap);
789 }
790
791 Ok(())
792 }
793}
794
795impl<T: VhostUserBackend> Drop for VhostUserHandler<T> {
796 fn drop(&mut self) {
797 self.send_exit_event();
799
800 for thread in self.worker_threads.drain(..) {
801 if let Err(e) = thread.join() {
802 error!("Error in vring worker: {:?}", e);
803 }
804 }
805 }
806}