1use crate::cost_model::transferred_byte_cycles;
2use crate::syscalls::{
3 EXEC_LOAD_ELF_V2_CYCLES_BASE, INVALID_FD, MAX_FDS_CREATED, MAX_VMS_SPAWNED, OTHER_END_CLOSED,
4 SPAWN_EXTRA_CYCLES_BASE, SUCCESS, WAIT_FAILURE,
5};
6
7use crate::types::{
8 DataLocation, DataPieceId, FIRST_FD_SLOT, FIRST_VM_ID, Fd, FdArgs, FullSuspendedState,
9 IterationResult, Message, ReadState, RunMode, SgData, SyscallGenerator, TerminatedResult,
10 VmArgs, VmContext, VmId, VmState, WriteState,
11};
12use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider};
13use ckb_types::core::Cycle;
14use ckb_vm::snapshot2::Snapshot2Context;
15use ckb_vm::{
16 Error, FlattenedArgsReader, Register,
17 bytes::Bytes,
18 cost_model::estimate_cycles,
19 elf::parse_elf,
20 machine::{CoreMachine, DefaultMachineBuilder, DefaultMachineRunner, Pause, SupportMachine},
21 memory::Memory,
22 registers::A0,
23 snapshot2::Snapshot2,
24};
25use std::collections::{BTreeMap, HashMap};
26use std::sync::{
27 Arc, Mutex,
28 atomic::{AtomicU64, Ordering},
29};
30
31pub const ROOT_VM_ID: VmId = FIRST_VM_ID;
33pub const MAX_VMS_COUNT: u64 = 16;
35pub const MAX_INSTANTIATED_VMS: usize = 4;
37pub const MAX_FDS: u64 = 64;
39
40pub struct Scheduler<DL, V, M>
47where
48 DL: CellDataProvider,
49 M: DefaultMachineRunner,
50{
51 sg_data: SgData<DL>,
53
54 syscall_generator: SyscallGenerator<DL, V, M::Inner>,
56 syscall_context: V,
58
59 total_cycles: Arc<AtomicU64>,
96 iteration_cycles: Cycle,
98 next_vm_id: VmId,
100 next_fd_slot: u64,
102 states: BTreeMap<VmId, VmState>,
104 fds: BTreeMap<Fd, VmId>,
106 inherited_fd: BTreeMap<VmId, Vec<Fd>>,
108 instantiated: BTreeMap<VmId, (VmContext<DL>, M)>,
110 suspended: BTreeMap<VmId, Snapshot2<DataPieceId>>,
112 terminated_vms: BTreeMap<VmId, i8>,
114
115 message_box: Arc<Mutex<Vec<Message>>>,
118}
119
120impl<DL, V, M> Scheduler<DL, V, M>
121where
122 DL: CellDataProvider + HeaderProvider + ExtensionProvider + Clone,
123 V: Clone,
124 M: DefaultMachineRunner,
125{
126 pub fn new(
128 sg_data: SgData<DL>,
129 syscall_generator: SyscallGenerator<DL, V, M::Inner>,
130 syscall_context: V,
131 ) -> Self {
132 Self {
133 sg_data,
134 syscall_generator,
135 syscall_context,
136 total_cycles: Arc::new(AtomicU64::new(0)),
137 iteration_cycles: 0,
138 next_vm_id: FIRST_VM_ID,
139 next_fd_slot: FIRST_FD_SLOT,
140 states: BTreeMap::default(),
141 fds: BTreeMap::default(),
142 inherited_fd: BTreeMap::default(),
143 instantiated: BTreeMap::default(),
144 suspended: BTreeMap::default(),
145 message_box: Arc::new(Mutex::new(Vec::new())),
146 terminated_vms: BTreeMap::default(),
147 }
148 }
149
150 pub fn consumed_cycles(&self) -> Cycle {
152 self.total_cycles.load(Ordering::Acquire)
153 }
154
155 pub fn state(&self, vm_id: &VmId) -> Option<VmState> {
157 self.states.get(vm_id).cloned()
158 }
159
160 pub fn sg_data(&self) -> &SgData<DL> {
162 &self.sg_data
163 }
164
165 pub fn peek<F, G, W>(&mut self, vm_id: &VmId, mut f: F, mut g: G) -> Result<W, Error>
174 where
175 F: FnMut(&mut M) -> Result<W, Error>,
176 G: FnMut(&Snapshot2<DataPieceId>, &SgData<DL>) -> Result<W, Error>,
177 {
178 if let Some((_, machine)) = self.instantiated.get_mut(vm_id) {
179 return f(machine);
180 }
181 if let Some(snapshot) = self.suspended.get(vm_id) {
182 return g(snapshot, &self.sg_data);
183 }
184 Err(Error::Unexpected(format!("VM {} does not exist!", vm_id)))
185 }
186
187 fn consume_cycles(&mut self, cycles: Cycle) -> Result<(), Error> {
189 match self
190 .total_cycles
191 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |total_cycles| {
192 total_cycles.checked_add(cycles)
193 }) {
194 Ok(_) => Ok(()),
195 Err(_) => Err(Error::CyclesExceeded),
196 }
197 }
198
199 pub fn resume(
201 sg_data: SgData<DL>,
202 syscall_generator: SyscallGenerator<DL, V, M::Inner>,
203 syscall_context: V,
204 full: FullSuspendedState,
205 ) -> Self {
206 let mut scheduler = Self {
207 sg_data,
208 syscall_generator,
209 syscall_context,
210 total_cycles: Arc::new(AtomicU64::new(full.total_cycles)),
211 iteration_cycles: full.iteration_cycles,
212 next_vm_id: full.next_vm_id,
213 next_fd_slot: full.next_fd_slot,
214 states: full
215 .vms
216 .iter()
217 .map(|(id, state, _)| (*id, state.clone()))
218 .collect(),
219 fds: full.fds.into_iter().collect(),
220 inherited_fd: full.inherited_fd.into_iter().collect(),
221 instantiated: BTreeMap::default(),
222 suspended: full
223 .vms
224 .into_iter()
225 .map(|(id, _, snapshot)| (id, snapshot))
226 .collect(),
227 message_box: Arc::new(Mutex::new(Vec::new())),
228 terminated_vms: full.terminated_vms.into_iter().collect(),
229 };
230 scheduler
231 .ensure_vms_instantiated(&full.instantiated_ids)
232 .unwrap();
233 scheduler.iteration_cycles = 0;
237 scheduler
238 }
239
240 pub fn suspend(mut self) -> Result<FullSuspendedState, Error> {
242 assert!(self.message_box.lock().expect("lock").is_empty());
243 let mut vms = Vec::with_capacity(self.states.len());
244 let instantiated_ids: Vec<_> = self.instantiated.keys().cloned().collect();
245 for id in &instantiated_ids {
246 self.suspend_vm(id)?;
247 }
248 for (id, state) in self.states {
249 let snapshot = self
250 .suspended
251 .remove(&id)
252 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
253 vms.push((id, state, snapshot));
254 }
255 Ok(FullSuspendedState {
256 total_cycles: self.total_cycles.load(Ordering::Acquire),
261 iteration_cycles: self.iteration_cycles,
262 next_vm_id: self.next_vm_id,
263 next_fd_slot: self.next_fd_slot,
264 vms,
265 fds: self.fds.into_iter().collect(),
266 inherited_fd: self.inherited_fd.into_iter().collect(),
267 terminated_vms: self.terminated_vms.into_iter().collect(),
268 instantiated_ids,
269 })
270 }
271
272 pub fn run(&mut self, mode: RunMode) -> Result<TerminatedResult, Error> {
286 self.boot_root_vm_if_needed()?;
287
288 let (pause, mut limit_cycles) = match mode {
289 RunMode::LimitCycles(limit_cycles) => (Pause::new(), limit_cycles),
290 RunMode::Pause(pause, limit_cycles) => (pause, limit_cycles),
291 };
292
293 while !self.terminated() {
294 limit_cycles = self.iterate_outer(&pause, limit_cycles)?.1;
295 }
296 assert_eq!(self.iteration_cycles, 0);
297
298 self.terminated_result()
299 }
300
301 pub fn iterate(&mut self) -> Result<IterationResult, Error> {
305 self.boot_root_vm_if_needed()?;
306
307 if self.terminated() {
308 return Ok(IterationResult {
309 executed_vm: ROOT_VM_ID,
310 terminated_status: Some(self.terminated_result()?),
311 });
312 }
313
314 let (id, _) = self.iterate_outer(&Pause::new(), u64::MAX)?;
315 let terminated_status = if self.terminated() {
316 assert_eq!(self.iteration_cycles, 0);
317 Some(self.terminated_result()?)
318 } else {
319 None
320 };
321
322 Ok(IterationResult {
323 executed_vm: id,
324 terminated_status,
325 })
326 }
327
328 fn iterate_prepare_machine(&mut self) -> Result<(u64, &mut M), Error> {
330 let vm_id_to_run = self
332 .states
333 .iter()
334 .rev()
335 .filter(|(_, state)| matches!(state, VmState::Runnable))
336 .map(|(id, _)| *id)
337 .next();
338 let vm_id_to_run = vm_id_to_run.ok_or_else(|| {
339 Error::Unexpected("A deadlock situation has been reached!".to_string())
340 })?;
341 let (_context, machine) = self.ensure_get_instantiated(&vm_id_to_run)?;
342 Ok((vm_id_to_run, machine))
343 }
344
345 fn iterate_process_results(
347 &mut self,
348 vm_id_to_run: u64,
349 result: Result<i8, Error>,
350 ) -> Result<(), Error> {
351 self.process_message_box()?;
353 assert!(self.message_box.lock().expect("lock").is_empty());
354 let result = match result {
356 Ok(code) => {
357 self.terminated_vms.insert(vm_id_to_run, code);
358 if vm_id_to_run == ROOT_VM_ID {
362 self.ensure_vms_instantiated(&[vm_id_to_run])?;
363 self.instantiated.retain(|id, _| *id == vm_id_to_run);
364 self.suspended.clear();
365 self.states.clear();
366 self.states.insert(vm_id_to_run, VmState::Terminated);
367 } else {
368 let joining_vms: Vec<(VmId, u64)> = self
369 .states
370 .iter()
371 .filter_map(|(vm_id, state)| match state {
372 VmState::Wait {
373 target_vm_id,
374 exit_code_addr,
375 } if *target_vm_id == vm_id_to_run => Some((*vm_id, *exit_code_addr)),
376 _ => None,
377 })
378 .collect();
379 for (vm_id, exit_code_addr) in joining_vms {
382 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
383 machine
384 .inner_mut()
385 .memory_mut()
386 .store8(&Self::u64_to_reg(exit_code_addr), &Self::i8_to_reg(code))?;
387 machine
388 .inner_mut()
389 .set_register(A0, Self::u8_to_reg(SUCCESS));
390 self.states.insert(vm_id, VmState::Runnable);
391 }
392 self.fds.retain(|_, vm_id| *vm_id != vm_id_to_run);
394 self.states.remove(&vm_id_to_run);
396 self.instantiated.remove(&vm_id_to_run);
397 self.suspended.remove(&vm_id_to_run);
398 }
399 Ok(())
400 }
401 Err(Error::Yield) => Ok(()),
402 Err(e) => Err(e),
403 };
404 result
405 }
406
407 #[inline]
411 fn iterate_outer(
412 &mut self,
413 pause: &Pause,
414 limit_cycles: Cycle,
415 ) -> Result<(VmId, Cycle), Error> {
416 let iterate_return = self.iterate_inner(pause.clone(), limit_cycles);
417 self.consume_cycles(self.iteration_cycles)?;
418 let remaining_cycles = limit_cycles
419 .checked_sub(self.iteration_cycles)
420 .ok_or(Error::CyclesExceeded)?;
421 self.iteration_cycles = 0;
423 self.process_io()?;
448 let id = iterate_return?;
449 Ok((id, remaining_cycles))
450 }
451
452 fn iterate_inner(&mut self, pause: Pause, limit_cycles: Cycle) -> Result<VmId, Error> {
456 let (id, result, cycles) = {
460 let (id, vm) = self.iterate_prepare_machine()?;
461 vm.inner_mut().set_max_cycles(limit_cycles);
462 vm.machine_mut().set_pause(pause);
463 let result = vm.run();
464 let cycles = vm.machine().cycles();
465 vm.inner_mut().set_cycles(0);
466 (id, result, cycles)
467 };
468 self.iteration_cycles = self
469 .iteration_cycles
470 .checked_add(cycles)
471 .ok_or(Error::CyclesExceeded)?;
472 self.iterate_process_results(id, result)?;
473 Ok(id)
474 }
475
476 fn process_message_box(&mut self) -> Result<(), Error> {
477 let messages: Vec<Message> = self.message_box.lock().expect("lock").drain(..).collect();
478 for message in messages {
479 match message {
480 Message::ExecV2(vm_id, args) => {
481 let (old_context, old_machine) = self
482 .instantiated
483 .get_mut(&vm_id)
484 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
485 old_machine
486 .inner_mut()
487 .add_cycles_no_checking(EXEC_LOAD_ELF_V2_CYCLES_BASE)?;
488 let old_cycles = old_machine.machine().cycles();
489 let max_cycles = old_machine.machine().max_cycles();
490 let program = {
491 let sc = old_context.snapshot2_context.lock().expect("lock");
492 sc.load_data(
493 &args.location.data_piece_id,
494 args.location.offset,
495 args.location.length,
496 )?
497 .0
498 };
499 let (context, mut new_machine) = self.create_dummy_vm(&vm_id)?;
500 new_machine.inner_mut().set_max_cycles(max_cycles);
501 new_machine.inner_mut().add_cycles_no_checking(old_cycles)?;
502 self.load_vm_program(
503 &context,
504 &mut new_machine,
505 &args.location,
506 program,
507 VmArgs::Reader {
508 vm_id,
509 argc: args.argc,
510 argv: args.argv,
511 },
512 )?;
513 debug_assert!(self.instantiated.contains_key(&vm_id));
515 self.instantiated.insert(vm_id, (context, new_machine));
516 }
517 Message::Spawn(vm_id, args) => {
518 if args.fds.iter().any(|fd| self.fds.get(fd) != Some(&vm_id)) {
520 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
521 machine
522 .inner_mut()
523 .set_register(A0, Self::u8_to_reg(INVALID_FD));
524 continue;
525 }
526 if self.suspended.len() + self.instantiated.len() > MAX_VMS_COUNT as usize {
527 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
528 machine
529 .inner_mut()
530 .set_register(A0, Self::u8_to_reg(MAX_VMS_SPAWNED));
531 continue;
532 }
533 let spawned_vm_id = self.boot_vm(
534 &args.location,
535 VmArgs::Reader {
536 vm_id,
537 argc: args.argc,
538 argv: args.argv,
539 },
540 )?;
541 for fd in &args.fds {
543 self.fds.insert(*fd, spawned_vm_id);
544 }
545 self.inherited_fd.insert(spawned_vm_id, args.fds.clone());
548
549 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
550 machine.inner_mut().memory_mut().store64(
551 &Self::u64_to_reg(args.process_id_addr),
552 &Self::u64_to_reg(spawned_vm_id),
553 )?;
554 machine
555 .inner_mut()
556 .set_register(A0, Self::u8_to_reg(SUCCESS));
557 }
558 Message::Wait(vm_id, args) => {
559 if let Some(exit_code) = self.terminated_vms.get(&args.target_id).copied() {
560 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
561 machine.inner_mut().memory_mut().store8(
562 &Self::u64_to_reg(args.exit_code_addr),
563 &Self::i8_to_reg(exit_code),
564 )?;
565 machine
566 .inner_mut()
567 .set_register(A0, Self::u8_to_reg(SUCCESS));
568 self.states.insert(vm_id, VmState::Runnable);
569 self.terminated_vms.retain(|id, _| id != &args.target_id);
570 continue;
571 }
572 if !self.states.contains_key(&args.target_id) {
573 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
574 machine
575 .inner_mut()
576 .set_register(A0, Self::u8_to_reg(WAIT_FAILURE));
577 continue;
578 }
579 self.states.insert(
581 vm_id,
582 VmState::Wait {
583 target_vm_id: args.target_id,
584 exit_code_addr: args.exit_code_addr,
585 },
586 );
587 }
588 Message::Pipe(vm_id, args) => {
589 if self.fds.len() as u64 >= MAX_FDS {
590 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
591 machine
592 .inner_mut()
593 .set_register(A0, Self::u8_to_reg(MAX_FDS_CREATED));
594 continue;
595 }
596 let (p1, p2, slot) = Fd::create(self.next_fd_slot);
597 self.next_fd_slot = slot;
598 self.fds.insert(p1, vm_id);
599 self.fds.insert(p2, vm_id);
600 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
601 machine
602 .inner_mut()
603 .memory_mut()
604 .store64(&Self::u64_to_reg(args.fd1_addr), &Self::u64_to_reg(p1.0))?;
605 machine
606 .inner_mut()
607 .memory_mut()
608 .store64(&Self::u64_to_reg(args.fd2_addr), &Self::u64_to_reg(p2.0))?;
609 machine
610 .inner_mut()
611 .set_register(A0, Self::u8_to_reg(SUCCESS));
612 }
613 Message::FdRead(vm_id, args) => {
614 if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
615 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
616 machine
617 .inner_mut()
618 .set_register(A0, Self::u8_to_reg(INVALID_FD));
619 continue;
620 }
621 if !self.fds.contains_key(&args.fd.other_fd()) {
622 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
623 machine
624 .inner_mut()
625 .set_register(A0, Self::u8_to_reg(OTHER_END_CLOSED));
626 continue;
627 }
628 self.states.insert(
630 vm_id,
631 VmState::WaitForRead(ReadState {
632 fd: args.fd,
633 length: args.length,
634 buffer_addr: args.buffer_addr,
635 length_addr: args.length_addr,
636 }),
637 );
638 }
639 Message::FdWrite(vm_id, args) => {
640 if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
641 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
642 machine
643 .inner_mut()
644 .set_register(A0, Self::u8_to_reg(INVALID_FD));
645 continue;
646 }
647 if !self.fds.contains_key(&args.fd.other_fd()) {
648 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
649 machine
650 .inner_mut()
651 .set_register(A0, Self::u8_to_reg(OTHER_END_CLOSED));
652 continue;
653 }
654 self.states.insert(
656 vm_id,
657 VmState::WaitForWrite(WriteState {
658 fd: args.fd,
659 consumed: 0,
660 length: args.length,
661 buffer_addr: args.buffer_addr,
662 length_addr: args.length_addr,
663 }),
664 );
665 }
666 Message::InheritedFileDescriptor(vm_id, args) => {
667 let inherited_fd = if vm_id == ROOT_VM_ID {
668 Vec::new()
669 } else {
670 self.inherited_fd[&vm_id].clone()
671 };
672 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
673 let FdArgs {
674 buffer_addr,
675 length_addr,
676 ..
677 } = args;
678 let full_length = machine
679 .inner_mut()
680 .memory_mut()
681 .load64(&Self::u64_to_reg(length_addr))?
682 .to_u64();
683 let real_length = inherited_fd.len() as u64;
684 let copy_length = u64::min(full_length, real_length);
685 for i in 0..copy_length {
686 let fd = inherited_fd[i as usize].0;
687 let addr = buffer_addr.checked_add(i * 8).ok_or(Error::MemOutOfBound)?;
688 machine
689 .inner_mut()
690 .memory_mut()
691 .store64(&Self::u64_to_reg(addr), &Self::u64_to_reg(fd))?;
692 }
693 machine.inner_mut().memory_mut().store64(
694 &Self::u64_to_reg(length_addr),
695 &Self::u64_to_reg(real_length),
696 )?;
697 machine
698 .inner_mut()
699 .set_register(A0, Self::u8_to_reg(SUCCESS));
700 }
701 Message::Close(vm_id, fd) => {
702 if self.fds.get(&fd) != Some(&vm_id) {
703 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
704 machine
705 .inner_mut()
706 .set_register(A0, Self::u8_to_reg(INVALID_FD));
707 } else {
708 self.fds.remove(&fd);
709 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
710 machine
711 .inner_mut()
712 .set_register(A0, Self::u8_to_reg(SUCCESS));
713 }
714 }
715 }
716 }
717 Ok(())
718 }
719
720 fn process_io(&mut self) -> Result<(), Error> {
721 let mut reads: HashMap<Fd, (VmId, ReadState)> = HashMap::default();
722 let mut closed_fds: Vec<VmId> = Vec::new();
723
724 self.states.iter().for_each(|(vm_id, state)| {
725 if let VmState::WaitForRead(inner_state) = state {
726 if self.fds.contains_key(&inner_state.fd.other_fd()) {
727 reads.insert(inner_state.fd, (*vm_id, inner_state.clone()));
728 } else {
729 closed_fds.push(*vm_id);
730 }
731 }
732 });
733 let mut pairs: Vec<(VmId, ReadState, VmId, WriteState)> = Vec::new();
734 self.states.iter().for_each(|(vm_id, state)| {
735 if let VmState::WaitForWrite(inner_state) = state {
736 if self.fds.contains_key(&inner_state.fd.other_fd()) {
737 if let Some((read_vm_id, read_state)) = reads.get(&inner_state.fd.other_fd()) {
738 pairs.push((*read_vm_id, read_state.clone(), *vm_id, inner_state.clone()));
739 }
740 } else {
741 closed_fds.push(*vm_id);
742 }
743 }
744 });
745 for vm_id in closed_fds {
747 match self.states[&vm_id].clone() {
748 VmState::WaitForRead(ReadState { length_addr, .. }) => {
749 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
750 machine.inner_mut().memory_mut().store64(
751 &Self::u64_to_reg(length_addr),
752 &<M::Inner as CoreMachine>::REG::zero(),
753 )?;
754 machine
755 .inner_mut()
756 .set_register(A0, Self::u8_to_reg(SUCCESS));
757 self.states.insert(vm_id, VmState::Runnable);
758 }
759 VmState::WaitForWrite(WriteState {
760 consumed,
761 length_addr,
762 ..
763 }) => {
764 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
765 machine
766 .inner_mut()
767 .memory_mut()
768 .store64(&Self::u64_to_reg(length_addr), &Self::u64_to_reg(consumed))?;
769 machine
770 .inner_mut()
771 .set_register(A0, Self::u8_to_reg(SUCCESS));
772 self.states.insert(vm_id, VmState::Runnable);
773 }
774 _ => (),
775 }
776 }
777 for (read_vm_id, read_state, write_vm_id, write_state) in pairs {
779 let ReadState {
780 length: read_length,
781 buffer_addr: read_buffer_addr,
782 length_addr: read_length_addr,
783 ..
784 } = read_state;
785 let WriteState {
786 fd: write_fd,
787 mut consumed,
788 length: write_length,
789 buffer_addr: write_buffer_addr,
790 length_addr: write_length_addr,
791 } = write_state;
792
793 self.ensure_vms_instantiated(&[read_vm_id, write_vm_id])?;
794 {
795 let fillable = read_length;
796 let consumable = write_length - consumed;
797 let copiable = std::cmp::min(fillable, consumable);
798
799 let (_, write_machine) = self
801 .instantiated
802 .get_mut(&write_vm_id)
803 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
804 write_machine
805 .inner_mut()
806 .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
807 let data = write_machine
808 .inner_mut()
809 .memory_mut()
810 .load_bytes(write_buffer_addr.wrapping_add(consumed), copiable)?;
811 let (_, read_machine) = self
812 .instantiated
813 .get_mut(&read_vm_id)
814 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
815 read_machine
816 .inner_mut()
817 .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
818 read_machine
819 .inner_mut()
820 .memory_mut()
821 .store_bytes(read_buffer_addr, &data)?;
822 read_machine.inner_mut().memory_mut().store64(
824 &Self::u64_to_reg(read_length_addr),
825 &Self::u64_to_reg(copiable),
826 )?;
827 read_machine
828 .inner_mut()
829 .set_register(A0, Self::u8_to_reg(SUCCESS));
830 self.states.insert(read_vm_id, VmState::Runnable);
831
832 consumed += copiable;
835 if consumed == write_length {
836 let (_, write_machine) = self
838 .instantiated
839 .get_mut(&write_vm_id)
840 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
841 write_machine.inner_mut().memory_mut().store64(
842 &Self::u64_to_reg(write_length_addr),
843 &Self::u64_to_reg(write_length),
844 )?;
845 write_machine
846 .inner_mut()
847 .set_register(A0, Self::u8_to_reg(SUCCESS));
848 self.states.insert(write_vm_id, VmState::Runnable);
849 } else {
850 self.states.insert(
852 write_vm_id,
853 VmState::WaitForWrite(WriteState {
854 fd: write_fd,
855 consumed,
856 length: write_length,
857 buffer_addr: write_buffer_addr,
858 length_addr: write_length_addr,
859 }),
860 );
861 }
862 }
863 }
864 Ok(())
865 }
866
867 pub fn terminated(&self) -> bool {
869 self.states
870 .get(&ROOT_VM_ID)
871 .map(|state| *state == VmState::Terminated)
872 .unwrap_or(false)
873 }
874
875 fn terminated_result(&mut self) -> Result<TerminatedResult, Error> {
876 assert!(self.terminated());
877
878 let exit_code = {
879 let root_vm = &self.ensure_get_instantiated(&ROOT_VM_ID)?.1;
880 root_vm.machine().exit_code()
881 };
882 Ok(TerminatedResult {
883 exit_code,
884 consumed_cycles: self.consumed_cycles(),
885 })
886 }
887
888 fn ensure_vms_instantiated(&mut self, ids: &[VmId]) -> Result<(), Error> {
890 if ids.len() > MAX_INSTANTIATED_VMS {
891 return Err(Error::Unexpected(format!(
892 "At most {} VMs can be instantiated but {} are requested!",
893 MAX_INSTANTIATED_VMS,
894 ids.len()
895 )));
896 }
897
898 let mut uninstantiated_ids: Vec<VmId> = ids
899 .iter()
900 .filter(|id| !self.instantiated.contains_key(id))
901 .copied()
902 .collect();
903 while (!uninstantiated_ids.is_empty()) && (self.instantiated.len() < MAX_INSTANTIATED_VMS) {
904 let id = uninstantiated_ids
905 .pop()
906 .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?;
907 self.resume_vm(&id)?;
908 }
909
910 if !uninstantiated_ids.is_empty() {
911 let suspendable_ids: Vec<VmId> = self
913 .instantiated
914 .keys()
915 .filter(|id| !ids.contains(id))
916 .copied()
917 .collect();
918
919 assert!(suspendable_ids.len() >= uninstantiated_ids.len());
920 for i in 0..uninstantiated_ids.len() {
921 self.suspend_vm(&suspendable_ids[i])?;
922 self.resume_vm(&uninstantiated_ids[i])?;
923 }
924 }
925
926 Ok(())
927 }
928
929 fn ensure_get_instantiated(&mut self, id: &VmId) -> Result<&mut (VmContext<DL>, M), Error> {
931 self.ensure_vms_instantiated(&[*id])?;
932 self.instantiated
933 .get_mut(id)
934 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))
935 }
936
937 fn resume_vm(&mut self, id: &VmId) -> Result<(), Error> {
939 if !self.suspended.contains_key(id) {
940 return Err(Error::Unexpected(format!("VM {:?} is not suspended!", id)));
941 }
942 let snapshot = &self.suspended[id];
943 self.iteration_cycles = self
944 .iteration_cycles
945 .checked_add(SPAWN_EXTRA_CYCLES_BASE)
946 .ok_or(Error::CyclesExceeded)?;
947 let (context, mut machine) = self.create_dummy_vm(id)?;
948 {
949 let mut sc = context.snapshot2_context.lock().expect("lock");
950 sc.resume(machine.inner_mut(), snapshot)?;
951 }
952 self.instantiated.insert(*id, (context, machine));
953 self.suspended.remove(id);
954 Ok(())
955 }
956
957 fn suspend_vm(&mut self, id: &VmId) -> Result<(), Error> {
959 if !self.instantiated.contains_key(id) {
960 return Err(Error::Unexpected(format!(
961 "VM {:?} is not instantiated!",
962 id
963 )));
964 }
965 self.iteration_cycles = self
966 .iteration_cycles
967 .checked_add(SPAWN_EXTRA_CYCLES_BASE)
968 .ok_or(Error::CyclesExceeded)?;
969 let (context, machine) = self
970 .instantiated
971 .get_mut(id)
972 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
973 let snapshot = {
974 let sc = context.snapshot2_context.lock().expect("lock");
975 sc.make_snapshot(machine.inner_mut())?
976 };
977 self.suspended.insert(*id, snapshot);
978 self.instantiated.remove(id);
979 Ok(())
980 }
981
982 fn boot_root_vm_if_needed(&mut self) -> Result<(), Error> {
983 if self.states.is_empty() {
984 let program_id = self.sg_data.sg_info.program_data_piece_id.clone();
986 assert_eq!(
987 self.boot_vm(
988 &DataLocation {
989 data_piece_id: program_id,
990 offset: 0,
991 length: u64::MAX,
992 },
993 VmArgs::Vector(vec![]),
994 )?,
995 ROOT_VM_ID
996 );
997 }
998 assert!(self.states.contains_key(&ROOT_VM_ID));
999
1000 Ok(())
1001 }
1002
1003 fn boot_vm(&mut self, location: &DataLocation, args: VmArgs) -> Result<VmId, Error> {
1005 let id = self.next_vm_id;
1006 self.next_vm_id += 1;
1007 let (context, mut machine) = self.create_dummy_vm(&id)?;
1008 let (program, _) = {
1009 let sc = context.snapshot2_context.lock().expect("lock");
1010 sc.load_data(&location.data_piece_id, location.offset, location.length)?
1011 };
1012 self.load_vm_program(&context, &mut machine, location, program, args)?;
1013 while self.instantiated.len() >= MAX_INSTANTIATED_VMS {
1015 let id = *self
1017 .instantiated
1018 .first_entry()
1019 .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?
1020 .key();
1021 self.suspend_vm(&id)?;
1022 }
1023
1024 self.instantiated.insert(id, (context, machine));
1025 self.states.insert(id, VmState::Runnable);
1026
1027 Ok(id)
1028 }
1029
1030 fn load_vm_program(
1032 &mut self,
1033 context: &VmContext<DL>,
1034 machine: &mut M,
1035 location: &DataLocation,
1036 program: Bytes,
1037 args: VmArgs,
1038 ) -> Result<u64, Error> {
1039 let metadata = parse_elf::<u64>(&program, machine.inner_mut().version())?;
1040 let bytes = match args {
1041 VmArgs::Reader { vm_id, argc, argv } => {
1042 let (_, machine_from) = self.ensure_get_instantiated(&vm_id)?;
1043 let argc = Self::u64_to_reg(argc);
1044 let argv = Self::u64_to_reg(argv);
1045 let argv =
1046 FlattenedArgsReader::new(machine_from.inner_mut().memory_mut(), argc, argv);
1047 machine.load_program_with_metadata(&program, &metadata, argv)?
1048 }
1049 VmArgs::Vector(data) => {
1050 machine.load_program_with_metadata(&program, &metadata, data.into_iter().map(Ok))?
1051 }
1052 };
1053 let mut sc = context.snapshot2_context.lock().expect("lock");
1054 sc.mark_program(
1055 machine.inner_mut(),
1056 &metadata,
1057 &location.data_piece_id,
1058 location.offset,
1059 )?;
1060 machine
1061 .inner_mut()
1062 .add_cycles_no_checking(transferred_byte_cycles(bytes))?;
1063 Ok(bytes)
1064 }
1065
1066 fn create_dummy_vm(&self, id: &VmId) -> Result<(VmContext<DL>, M), Error> {
1068 let version = &self.sg_data.sg_info.script_version;
1069 let core_machine = M::Inner::new(
1070 version.vm_isa(),
1071 version.vm_version(),
1072 u64::MAX,
1074 );
1075 let vm_context = VmContext {
1076 base_cycles: Arc::clone(&self.total_cycles),
1077 message_box: Arc::clone(&self.message_box),
1078 snapshot2_context: Arc::new(Mutex::new(Snapshot2Context::new(self.sg_data.clone()))),
1079 };
1080
1081 let machine_builder = DefaultMachineBuilder::new(core_machine)
1082 .instruction_cycle_func(Box::new(estimate_cycles));
1083 let machine_builder =
1084 (self.syscall_generator)(id, &self.sg_data, &vm_context, &self.syscall_context)
1085 .into_iter()
1086 .fold(machine_builder, |builder, syscall| builder.syscall(syscall));
1087 let default_machine = machine_builder.build();
1088 Ok((vm_context, M::new(default_machine)))
1089 }
1090
1091 fn i8_to_reg(v: i8) -> <M::Inner as CoreMachine>::REG {
1092 <M::Inner as CoreMachine>::REG::from_i8(v)
1093 }
1094
1095 fn u8_to_reg(v: u8) -> <M::Inner as CoreMachine>::REG {
1096 <M::Inner as CoreMachine>::REG::from_u8(v)
1097 }
1098
1099 fn u64_to_reg(v: u64) -> <M::Inner as CoreMachine>::REG {
1100 <M::Inner as CoreMachine>::REG::from_u64(v)
1101 }
1102}