1use crate::cost_model::transferred_byte_cycles;
2use crate::syscalls::{
3 EXEC_LOAD_ELF_V2_CYCLES_BASE, INVALID_FD, MAX_FDS_CREATED, MAX_VMS_SPAWNED, OTHER_END_CLOSED,
4 SPAWN_EXTRA_CYCLES_BASE, SUCCESS, WAIT_FAILURE,
5};
6
7use crate::types::{
8 DataLocation, DataPieceId, FIRST_FD_SLOT, FIRST_VM_ID, Fd, FdArgs, FullSuspendedState,
9 IterationResult, Message, ReadState, RunMode, SgData, SyscallGenerator, TerminatedResult,
10 VmArgs, VmContext, VmId, VmState, WriteState,
11};
12use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider};
13use ckb_types::core::Cycle;
14use ckb_vm::snapshot2::Snapshot2Context;
15use ckb_vm::{
16 Error, FlattenedArgsReader, Register,
17 bytes::Bytes,
18 cost_model::estimate_cycles,
19 elf::parse_elf,
20 machine::{CoreMachine, DefaultMachineBuilder, DefaultMachineRunner, Pause, SupportMachine},
21 memory::Memory,
22 registers::A0,
23 snapshot2::Snapshot2,
24};
25use std::collections::{BTreeMap, HashMap};
26use std::sync::{
27 Arc, Mutex,
28 atomic::{AtomicU64, Ordering},
29};
30
31pub const ROOT_VM_ID: VmId = FIRST_VM_ID;
33pub const MAX_VMS_COUNT: u64 = 16;
35pub const MAX_INSTANTIATED_VMS: usize = 4;
37pub const MAX_FDS: u64 = 64;
39
40pub struct Scheduler<DL, V, M>
47where
48 DL: CellDataProvider,
49 M: DefaultMachineRunner,
50{
51 sg_data: SgData<DL>,
53
54 syscall_generator: SyscallGenerator<DL, V, M::Inner>,
56 syscall_context: V,
58
59 total_cycles: Arc<AtomicU64>,
96 iteration_cycles: Cycle,
98 next_vm_id: VmId,
100 next_fd_slot: u64,
102 states: BTreeMap<VmId, VmState>,
104 fds: BTreeMap<Fd, VmId>,
106 inherited_fd: BTreeMap<VmId, Vec<Fd>>,
108 instantiated: BTreeMap<VmId, (VmContext<DL>, M)>,
110 suspended: BTreeMap<VmId, Snapshot2<DataPieceId>>,
112 terminated_vms: BTreeMap<VmId, i8>,
114 root_vm_args: Vec<Bytes>,
118
119 message_box: Arc<Mutex<Vec<Message>>>,
122}
123
124impl<DL, V, M> Scheduler<DL, V, M>
125where
126 DL: CellDataProvider + HeaderProvider + ExtensionProvider + Clone,
127 V: Clone,
128 M: DefaultMachineRunner,
129{
130 pub fn new(
132 sg_data: SgData<DL>,
133 syscall_generator: SyscallGenerator<DL, V, M::Inner>,
134 syscall_context: V,
135 ) -> Self {
136 Self {
137 sg_data,
138 syscall_generator,
139 syscall_context,
140 total_cycles: Arc::new(AtomicU64::new(0)),
141 iteration_cycles: 0,
142 next_vm_id: FIRST_VM_ID,
143 next_fd_slot: FIRST_FD_SLOT,
144 states: BTreeMap::default(),
145 fds: BTreeMap::default(),
146 inherited_fd: BTreeMap::default(),
147 instantiated: BTreeMap::default(),
148 suspended: BTreeMap::default(),
149 message_box: Arc::new(Mutex::new(Vec::new())),
150 terminated_vms: BTreeMap::default(),
151 root_vm_args: Vec::new(),
152 }
153 }
154
155 pub fn consumed_cycles(&self) -> Cycle {
157 self.total_cycles.load(Ordering::Acquire)
158 }
159
160 pub fn state(&self, vm_id: &VmId) -> Option<VmState> {
162 self.states.get(vm_id).cloned()
163 }
164
165 pub fn sg_data(&self) -> &SgData<DL> {
167 &self.sg_data
168 }
169
170 pub fn peek<F, G, W>(&mut self, vm_id: &VmId, mut f: F, mut g: G) -> Result<W, Error>
179 where
180 F: FnMut(&mut M) -> Result<W, Error>,
181 G: FnMut(&Snapshot2<DataPieceId>, &SgData<DL>) -> Result<W, Error>,
182 {
183 if let Some((_, machine)) = self.instantiated.get_mut(vm_id) {
184 return f(machine);
185 }
186 if let Some(snapshot) = self.suspended.get(vm_id) {
187 return g(snapshot, &self.sg_data);
188 }
189 Err(Error::Unexpected(format!("VM {} does not exist!", vm_id)))
190 }
191
192 fn consume_cycles(&mut self, cycles: Cycle) -> Result<(), Error> {
194 match self
195 .total_cycles
196 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |total_cycles| {
197 total_cycles.checked_add(cycles)
198 }) {
199 Ok(_) => Ok(()),
200 Err(_) => Err(Error::CyclesExceeded),
201 }
202 }
203
204 pub fn resume(
206 sg_data: SgData<DL>,
207 syscall_generator: SyscallGenerator<DL, V, M::Inner>,
208 syscall_context: V,
209 full: FullSuspendedState,
210 ) -> Self {
211 let mut scheduler = Self {
212 sg_data,
213 syscall_generator,
214 syscall_context,
215 total_cycles: Arc::new(AtomicU64::new(full.total_cycles)),
216 iteration_cycles: full.iteration_cycles,
217 next_vm_id: full.next_vm_id,
218 next_fd_slot: full.next_fd_slot,
219 states: full
220 .vms
221 .iter()
222 .map(|(id, state, _)| (*id, state.clone()))
223 .collect(),
224 fds: full.fds.into_iter().collect(),
225 inherited_fd: full.inherited_fd.into_iter().collect(),
226 instantiated: BTreeMap::default(),
227 suspended: full
228 .vms
229 .into_iter()
230 .map(|(id, _, snapshot)| (id, snapshot))
231 .collect(),
232 message_box: Arc::new(Mutex::new(Vec::new())),
233 terminated_vms: full.terminated_vms.into_iter().collect(),
234 root_vm_args: Vec::new(),
235 };
236 scheduler
237 .ensure_vms_instantiated(&full.instantiated_ids)
238 .unwrap();
239 scheduler.iteration_cycles = 0;
243 scheduler
244 }
245
246 pub fn suspend(mut self) -> Result<FullSuspendedState, Error> {
248 assert!(self.message_box.lock().expect("lock").is_empty());
249 let mut vms = Vec::with_capacity(self.states.len());
250 let instantiated_ids: Vec<_> = self.instantiated.keys().cloned().collect();
251 for id in &instantiated_ids {
252 self.suspend_vm(id)?;
253 }
254 for (id, state) in self.states {
255 let snapshot = self
256 .suspended
257 .remove(&id)
258 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
259 vms.push((id, state, snapshot));
260 }
261 Ok(FullSuspendedState {
262 total_cycles: self.total_cycles.load(Ordering::Acquire),
267 iteration_cycles: self.iteration_cycles,
268 next_vm_id: self.next_vm_id,
269 next_fd_slot: self.next_fd_slot,
270 vms,
271 fds: self.fds.into_iter().collect(),
272 inherited_fd: self.inherited_fd.into_iter().collect(),
273 terminated_vms: self.terminated_vms.into_iter().collect(),
274 instantiated_ids,
275 })
276 }
277
278 pub fn run(&mut self, mode: RunMode) -> Result<TerminatedResult, Error> {
292 self.boot_root_vm_if_needed()?;
293
294 let (pause, mut limit_cycles) = match mode {
295 RunMode::LimitCycles(limit_cycles) => (Pause::new(), limit_cycles),
296 RunMode::Pause(pause, limit_cycles) => (pause, limit_cycles),
297 };
298
299 while !self.terminated() {
300 limit_cycles = self.iterate_outer(&pause, limit_cycles)?.1;
301 }
302 assert_eq!(self.iteration_cycles, 0);
303
304 self.terminated_result()
305 }
306
307 pub fn iterate(&mut self) -> Result<IterationResult, Error> {
311 self.boot_root_vm_if_needed()?;
312
313 if self.terminated() {
314 return Ok(IterationResult {
315 executed_vm: ROOT_VM_ID,
316 terminated_status: Some(self.terminated_result()?),
317 });
318 }
319
320 let (id, _) = self.iterate_outer(&Pause::new(), u64::MAX)?;
321 let terminated_status = if self.terminated() {
322 assert_eq!(self.iteration_cycles, 0);
323 Some(self.terminated_result()?)
324 } else {
325 None
326 };
327
328 Ok(IterationResult {
329 executed_vm: id,
330 terminated_status,
331 })
332 }
333
334 fn iterate_prepare_machine(&mut self) -> Result<(u64, &mut M), Error> {
336 let vm_id_to_run = self
338 .states
339 .iter()
340 .rev()
341 .filter(|(_, state)| matches!(state, VmState::Runnable))
342 .map(|(id, _)| *id)
343 .next();
344 let vm_id_to_run = vm_id_to_run.ok_or_else(|| {
345 Error::Unexpected("A deadlock situation has been reached!".to_string())
346 })?;
347 let (_context, machine) = self.ensure_get_instantiated(&vm_id_to_run)?;
348 Ok((vm_id_to_run, machine))
349 }
350
351 fn iterate_process_results(
353 &mut self,
354 vm_id_to_run: u64,
355 result: Result<i8, Error>,
356 ) -> Result<(), Error> {
357 self.process_message_box()?;
359 assert!(self.message_box.lock().expect("lock").is_empty());
360 let result = match result {
362 Ok(code) => {
363 self.terminated_vms.insert(vm_id_to_run, code);
364 if vm_id_to_run == ROOT_VM_ID {
368 self.ensure_vms_instantiated(&[vm_id_to_run])?;
369 self.instantiated.retain(|id, _| *id == vm_id_to_run);
370 self.suspended.clear();
371 self.states.clear();
372 self.states.insert(vm_id_to_run, VmState::Terminated);
373 } else {
374 let joining_vms: Vec<(VmId, u64)> = self
375 .states
376 .iter()
377 .filter_map(|(vm_id, state)| match state {
378 VmState::Wait {
379 target_vm_id,
380 exit_code_addr,
381 } if *target_vm_id == vm_id_to_run => Some((*vm_id, *exit_code_addr)),
382 _ => None,
383 })
384 .collect();
385 for (vm_id, exit_code_addr) in joining_vms {
388 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
389 machine
390 .inner_mut()
391 .memory_mut()
392 .store8(&Self::u64_to_reg(exit_code_addr), &Self::i8_to_reg(code))?;
393 machine
394 .inner_mut()
395 .set_register(A0, Self::u8_to_reg(SUCCESS));
396 self.states.insert(vm_id, VmState::Runnable);
397 }
398 self.fds.retain(|_, vm_id| *vm_id != vm_id_to_run);
400 self.states.remove(&vm_id_to_run);
402 self.instantiated.remove(&vm_id_to_run);
403 self.suspended.remove(&vm_id_to_run);
404 }
405 Ok(())
406 }
407 Err(Error::Yield) => Ok(()),
408 Err(e) => Err(e),
409 };
410 result
411 }
412
413 #[inline]
417 fn iterate_outer(
418 &mut self,
419 pause: &Pause,
420 limit_cycles: Cycle,
421 ) -> Result<(VmId, Cycle), Error> {
422 let iterate_return = self.iterate_inner(pause.clone(), limit_cycles);
423 self.consume_cycles(self.iteration_cycles)?;
424 let remaining_cycles = limit_cycles
425 .checked_sub(self.iteration_cycles)
426 .ok_or(Error::CyclesExceeded)?;
427 self.iteration_cycles = 0;
429 self.process_io()?;
454 let id = iterate_return?;
455 Ok((id, remaining_cycles))
456 }
457
458 fn iterate_inner(&mut self, pause: Pause, limit_cycles: Cycle) -> Result<VmId, Error> {
462 let (id, result, cycles) = {
466 let (id, vm) = self.iterate_prepare_machine()?;
467 vm.inner_mut().set_max_cycles(limit_cycles);
468 vm.machine_mut().set_pause(pause);
469 let result = vm.run();
470 let cycles = vm.machine().cycles();
471 vm.inner_mut().set_cycles(0);
472 (id, result, cycles)
473 };
474 self.iteration_cycles = self
475 .iteration_cycles
476 .checked_add(cycles)
477 .ok_or(Error::CyclesExceeded)?;
478 self.iterate_process_results(id, result)?;
479 Ok(id)
480 }
481
482 fn process_message_box(&mut self) -> Result<(), Error> {
483 let messages: Vec<Message> = self.message_box.lock().expect("lock").drain(..).collect();
484 for message in messages {
485 match message {
486 Message::ExecV2(vm_id, args) => {
487 let (old_context, old_machine) = self
488 .instantiated
489 .get_mut(&vm_id)
490 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
491 old_machine
492 .inner_mut()
493 .add_cycles_no_checking(EXEC_LOAD_ELF_V2_CYCLES_BASE)?;
494 let old_cycles = old_machine.machine().cycles();
495 let max_cycles = old_machine.machine().max_cycles();
496 let program = {
497 let sc = old_context.snapshot2_context.lock().expect("lock");
498 sc.load_data(
499 &args.location.data_piece_id,
500 args.location.offset,
501 args.location.length,
502 )?
503 .0
504 };
505 let (context, mut new_machine) = self.create_dummy_vm(&vm_id)?;
506 new_machine.inner_mut().set_max_cycles(max_cycles);
507 new_machine.inner_mut().add_cycles_no_checking(old_cycles)?;
508 self.load_vm_program(
509 &context,
510 &mut new_machine,
511 &args.location,
512 program,
513 VmArgs::Reader {
514 vm_id,
515 argc: args.argc,
516 argv: args.argv,
517 },
518 )?;
519 debug_assert!(self.instantiated.contains_key(&vm_id));
521 self.instantiated.insert(vm_id, (context, new_machine));
522 }
523 Message::Spawn(vm_id, args) => {
524 if args.fds.iter().any(|fd| self.fds.get(fd) != Some(&vm_id)) {
526 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
527 machine
528 .inner_mut()
529 .set_register(A0, Self::u8_to_reg(INVALID_FD));
530 continue;
531 }
532 if self.suspended.len() + self.instantiated.len() > MAX_VMS_COUNT as usize {
533 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
534 machine
535 .inner_mut()
536 .set_register(A0, Self::u8_to_reg(MAX_VMS_SPAWNED));
537 continue;
538 }
539 let spawned_vm_id = self.boot_vm(
540 &args.location,
541 VmArgs::Reader {
542 vm_id,
543 argc: args.argc,
544 argv: args.argv,
545 },
546 )?;
547 for fd in &args.fds {
549 self.fds.insert(*fd, spawned_vm_id);
550 }
551 self.inherited_fd.insert(spawned_vm_id, args.fds.clone());
554
555 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
556 machine.inner_mut().memory_mut().store64(
557 &Self::u64_to_reg(args.process_id_addr),
558 &Self::u64_to_reg(spawned_vm_id),
559 )?;
560 machine
561 .inner_mut()
562 .set_register(A0, Self::u8_to_reg(SUCCESS));
563 }
564 Message::Wait(vm_id, args) => {
565 if let Some(exit_code) = self.terminated_vms.get(&args.target_id).copied() {
566 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
567 machine.inner_mut().memory_mut().store8(
568 &Self::u64_to_reg(args.exit_code_addr),
569 &Self::i8_to_reg(exit_code),
570 )?;
571 machine
572 .inner_mut()
573 .set_register(A0, Self::u8_to_reg(SUCCESS));
574 self.states.insert(vm_id, VmState::Runnable);
575 self.terminated_vms.retain(|id, _| id != &args.target_id);
576 continue;
577 }
578 if !self.states.contains_key(&args.target_id) {
579 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
580 machine
581 .inner_mut()
582 .set_register(A0, Self::u8_to_reg(WAIT_FAILURE));
583 continue;
584 }
585 self.states.insert(
587 vm_id,
588 VmState::Wait {
589 target_vm_id: args.target_id,
590 exit_code_addr: args.exit_code_addr,
591 },
592 );
593 }
594 Message::Pipe(vm_id, args) => {
595 if self.fds.len() as u64 >= MAX_FDS {
596 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
597 machine
598 .inner_mut()
599 .set_register(A0, Self::u8_to_reg(MAX_FDS_CREATED));
600 continue;
601 }
602 let (p1, p2, slot) = Fd::create(self.next_fd_slot);
603 self.next_fd_slot = slot;
604 self.fds.insert(p1, vm_id);
605 self.fds.insert(p2, vm_id);
606 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
607 machine
608 .inner_mut()
609 .memory_mut()
610 .store64(&Self::u64_to_reg(args.fd1_addr), &Self::u64_to_reg(p1.0))?;
611 machine
612 .inner_mut()
613 .memory_mut()
614 .store64(&Self::u64_to_reg(args.fd2_addr), &Self::u64_to_reg(p2.0))?;
615 machine
616 .inner_mut()
617 .set_register(A0, Self::u8_to_reg(SUCCESS));
618 }
619 Message::FdRead(vm_id, args) => {
620 if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
621 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
622 machine
623 .inner_mut()
624 .set_register(A0, Self::u8_to_reg(INVALID_FD));
625 continue;
626 }
627 if !self.fds.contains_key(&args.fd.other_fd()) {
628 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
629 machine
630 .inner_mut()
631 .set_register(A0, Self::u8_to_reg(OTHER_END_CLOSED));
632 continue;
633 }
634 self.states.insert(
636 vm_id,
637 VmState::WaitForRead(ReadState {
638 fd: args.fd,
639 length: args.length,
640 buffer_addr: args.buffer_addr,
641 length_addr: args.length_addr,
642 }),
643 );
644 }
645 Message::FdWrite(vm_id, args) => {
646 if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
647 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
648 machine
649 .inner_mut()
650 .set_register(A0, Self::u8_to_reg(INVALID_FD));
651 continue;
652 }
653 if !self.fds.contains_key(&args.fd.other_fd()) {
654 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
655 machine
656 .inner_mut()
657 .set_register(A0, Self::u8_to_reg(OTHER_END_CLOSED));
658 continue;
659 }
660 self.states.insert(
662 vm_id,
663 VmState::WaitForWrite(WriteState {
664 fd: args.fd,
665 consumed: 0,
666 length: args.length,
667 buffer_addr: args.buffer_addr,
668 length_addr: args.length_addr,
669 }),
670 );
671 }
672 Message::InheritedFileDescriptor(vm_id, args) => {
673 let inherited_fd = if vm_id == ROOT_VM_ID {
674 Vec::new()
675 } else {
676 self.inherited_fd[&vm_id].clone()
677 };
678 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
679 let FdArgs {
680 buffer_addr,
681 length_addr,
682 ..
683 } = args;
684 let full_length = machine
685 .inner_mut()
686 .memory_mut()
687 .load64(&Self::u64_to_reg(length_addr))?
688 .to_u64();
689 let real_length = inherited_fd.len() as u64;
690 let copy_length = u64::min(full_length, real_length);
691 for i in 0..copy_length {
692 let fd = inherited_fd[i as usize].0;
693 let addr = buffer_addr.checked_add(i * 8).ok_or(Error::MemOutOfBound)?;
694 machine
695 .inner_mut()
696 .memory_mut()
697 .store64(&Self::u64_to_reg(addr), &Self::u64_to_reg(fd))?;
698 }
699 machine.inner_mut().memory_mut().store64(
700 &Self::u64_to_reg(length_addr),
701 &Self::u64_to_reg(real_length),
702 )?;
703 machine
704 .inner_mut()
705 .set_register(A0, Self::u8_to_reg(SUCCESS));
706 }
707 Message::Close(vm_id, fd) => {
708 if self.fds.get(&fd) != Some(&vm_id) {
709 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
710 machine
711 .inner_mut()
712 .set_register(A0, Self::u8_to_reg(INVALID_FD));
713 } else {
714 self.fds.remove(&fd);
715 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
716 machine
717 .inner_mut()
718 .set_register(A0, Self::u8_to_reg(SUCCESS));
719 }
720 }
721 }
722 }
723 Ok(())
724 }
725
726 fn process_io(&mut self) -> Result<(), Error> {
727 let mut reads: HashMap<Fd, (VmId, ReadState)> = HashMap::default();
728 let mut closed_fds: Vec<VmId> = Vec::new();
729
730 self.states.iter().for_each(|(vm_id, state)| {
731 if let VmState::WaitForRead(inner_state) = state {
732 if self.fds.contains_key(&inner_state.fd.other_fd()) {
733 reads.insert(inner_state.fd, (*vm_id, inner_state.clone()));
734 } else {
735 closed_fds.push(*vm_id);
736 }
737 }
738 });
739 let mut pairs: Vec<(VmId, ReadState, VmId, WriteState)> = Vec::new();
740 self.states.iter().for_each(|(vm_id, state)| {
741 if let VmState::WaitForWrite(inner_state) = state {
742 if self.fds.contains_key(&inner_state.fd.other_fd()) {
743 if let Some((read_vm_id, read_state)) = reads.get(&inner_state.fd.other_fd()) {
744 pairs.push((*read_vm_id, read_state.clone(), *vm_id, inner_state.clone()));
745 }
746 } else {
747 closed_fds.push(*vm_id);
748 }
749 }
750 });
751 for vm_id in closed_fds {
753 match self.states[&vm_id].clone() {
754 VmState::WaitForRead(ReadState { length_addr, .. }) => {
755 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
756 machine.inner_mut().memory_mut().store64(
757 &Self::u64_to_reg(length_addr),
758 &<M::Inner as CoreMachine>::REG::zero(),
759 )?;
760 machine
761 .inner_mut()
762 .set_register(A0, Self::u8_to_reg(SUCCESS));
763 self.states.insert(vm_id, VmState::Runnable);
764 }
765 VmState::WaitForWrite(WriteState {
766 consumed,
767 length_addr,
768 ..
769 }) => {
770 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
771 machine
772 .inner_mut()
773 .memory_mut()
774 .store64(&Self::u64_to_reg(length_addr), &Self::u64_to_reg(consumed))?;
775 machine
776 .inner_mut()
777 .set_register(A0, Self::u8_to_reg(SUCCESS));
778 self.states.insert(vm_id, VmState::Runnable);
779 }
780 _ => (),
781 }
782 }
783 for (read_vm_id, read_state, write_vm_id, write_state) in pairs {
785 let ReadState {
786 length: read_length,
787 buffer_addr: read_buffer_addr,
788 length_addr: read_length_addr,
789 ..
790 } = read_state;
791 let WriteState {
792 fd: write_fd,
793 mut consumed,
794 length: write_length,
795 buffer_addr: write_buffer_addr,
796 length_addr: write_length_addr,
797 } = write_state;
798
799 self.ensure_vms_instantiated(&[read_vm_id, write_vm_id])?;
800 {
801 let fillable = read_length;
802 let consumable = write_length - consumed;
803 let copiable = std::cmp::min(fillable, consumable);
804
805 let (_, write_machine) = self
807 .instantiated
808 .get_mut(&write_vm_id)
809 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
810 write_machine
811 .inner_mut()
812 .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
813 let data = write_machine
814 .inner_mut()
815 .memory_mut()
816 .load_bytes(write_buffer_addr.wrapping_add(consumed), copiable)?;
817 let (_, read_machine) = self
818 .instantiated
819 .get_mut(&read_vm_id)
820 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
821 read_machine
822 .inner_mut()
823 .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
824 read_machine
825 .inner_mut()
826 .memory_mut()
827 .store_bytes(read_buffer_addr, &data)?;
828 read_machine.inner_mut().memory_mut().store64(
830 &Self::u64_to_reg(read_length_addr),
831 &Self::u64_to_reg(copiable),
832 )?;
833 read_machine
834 .inner_mut()
835 .set_register(A0, Self::u8_to_reg(SUCCESS));
836 self.states.insert(read_vm_id, VmState::Runnable);
837
838 consumed += copiable;
841 if consumed == write_length {
842 let (_, write_machine) = self
844 .instantiated
845 .get_mut(&write_vm_id)
846 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
847 write_machine.inner_mut().memory_mut().store64(
848 &Self::u64_to_reg(write_length_addr),
849 &Self::u64_to_reg(write_length),
850 )?;
851 write_machine
852 .inner_mut()
853 .set_register(A0, Self::u8_to_reg(SUCCESS));
854 self.states.insert(write_vm_id, VmState::Runnable);
855 } else {
856 self.states.insert(
858 write_vm_id,
859 VmState::WaitForWrite(WriteState {
860 fd: write_fd,
861 consumed,
862 length: write_length,
863 buffer_addr: write_buffer_addr,
864 length_addr: write_length_addr,
865 }),
866 );
867 }
868 }
869 }
870 Ok(())
871 }
872
873 pub fn terminated(&self) -> bool {
875 self.states
876 .get(&ROOT_VM_ID)
877 .map(|state| *state == VmState::Terminated)
878 .unwrap_or(false)
879 }
880
881 fn terminated_result(&mut self) -> Result<TerminatedResult, Error> {
882 assert!(self.terminated());
883
884 let exit_code = {
885 let root_vm = &self.ensure_get_instantiated(&ROOT_VM_ID)?.1;
886 root_vm.machine().exit_code()
887 };
888 Ok(TerminatedResult {
889 exit_code,
890 consumed_cycles: self.consumed_cycles(),
891 })
892 }
893
894 fn ensure_vms_instantiated(&mut self, ids: &[VmId]) -> Result<(), Error> {
896 if ids.len() > MAX_INSTANTIATED_VMS {
897 return Err(Error::Unexpected(format!(
898 "At most {} VMs can be instantiated but {} are requested!",
899 MAX_INSTANTIATED_VMS,
900 ids.len()
901 )));
902 }
903
904 let mut uninstantiated_ids: Vec<VmId> = ids
905 .iter()
906 .filter(|id| !self.instantiated.contains_key(id))
907 .copied()
908 .collect();
909 while (!uninstantiated_ids.is_empty()) && (self.instantiated.len() < MAX_INSTANTIATED_VMS) {
910 let id = uninstantiated_ids
911 .pop()
912 .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?;
913 self.resume_vm(&id)?;
914 }
915
916 if !uninstantiated_ids.is_empty() {
917 let suspendable_ids: Vec<VmId> = self
919 .instantiated
920 .keys()
921 .filter(|id| !ids.contains(id))
922 .copied()
923 .collect();
924
925 assert!(suspendable_ids.len() >= uninstantiated_ids.len());
926 for i in 0..uninstantiated_ids.len() {
927 self.suspend_vm(&suspendable_ids[i])?;
928 self.resume_vm(&uninstantiated_ids[i])?;
929 }
930 }
931
932 Ok(())
933 }
934
935 fn ensure_get_instantiated(&mut self, id: &VmId) -> Result<&mut (VmContext<DL>, M), Error> {
937 self.ensure_vms_instantiated(&[*id])?;
938 self.instantiated
939 .get_mut(id)
940 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))
941 }
942
943 fn resume_vm(&mut self, id: &VmId) -> Result<(), Error> {
945 if !self.suspended.contains_key(id) {
946 return Err(Error::Unexpected(format!("VM {:?} is not suspended!", id)));
947 }
948 let snapshot = &self.suspended[id];
949 self.iteration_cycles = self
950 .iteration_cycles
951 .checked_add(SPAWN_EXTRA_CYCLES_BASE)
952 .ok_or(Error::CyclesExceeded)?;
953 let (context, mut machine) = self.create_dummy_vm(id)?;
954 {
955 let mut sc = context.snapshot2_context.lock().expect("lock");
956 sc.resume(machine.inner_mut(), snapshot)?;
957 }
958 self.instantiated.insert(*id, (context, machine));
959 self.suspended.remove(id);
960 Ok(())
961 }
962
963 fn suspend_vm(&mut self, id: &VmId) -> Result<(), Error> {
965 if !self.instantiated.contains_key(id) {
966 return Err(Error::Unexpected(format!(
967 "VM {:?} is not instantiated!",
968 id
969 )));
970 }
971 self.iteration_cycles = self
972 .iteration_cycles
973 .checked_add(SPAWN_EXTRA_CYCLES_BASE)
974 .ok_or(Error::CyclesExceeded)?;
975 let (context, machine) = self
976 .instantiated
977 .get_mut(id)
978 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
979 let snapshot = {
980 let sc = context.snapshot2_context.lock().expect("lock");
981 sc.make_snapshot(machine.inner_mut())?
982 };
983 self.suspended.insert(*id, snapshot);
984 self.instantiated.remove(id);
985 Ok(())
986 }
987
988 fn boot_root_vm_if_needed(&mut self) -> Result<(), Error> {
989 if self.states.is_empty() {
990 let program_id = self.sg_data.sg_info.program_data_piece_id.clone();
992 assert_eq!(
993 self.boot_vm(
994 &DataLocation {
995 data_piece_id: program_id,
996 offset: 0,
997 length: u64::MAX,
998 },
999 VmArgs::Vector(self.root_vm_args.clone()),
1000 )?,
1001 ROOT_VM_ID
1002 );
1003 }
1004 assert!(self.states.contains_key(&ROOT_VM_ID));
1005
1006 Ok(())
1007 }
1008
1009 fn boot_vm(&mut self, location: &DataLocation, args: VmArgs) -> Result<VmId, Error> {
1011 let id = self.next_vm_id;
1012 self.next_vm_id += 1;
1013 let (context, mut machine) = self.create_dummy_vm(&id)?;
1014 let (program, _) = {
1015 let sc = context.snapshot2_context.lock().expect("lock");
1016 sc.load_data(&location.data_piece_id, location.offset, location.length)?
1017 };
1018 self.load_vm_program(&context, &mut machine, location, program, args)?;
1019 while self.instantiated.len() >= MAX_INSTANTIATED_VMS {
1021 let id = *self
1023 .instantiated
1024 .first_entry()
1025 .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?
1026 .key();
1027 self.suspend_vm(&id)?;
1028 }
1029
1030 self.instantiated.insert(id, (context, machine));
1031 self.states.insert(id, VmState::Runnable);
1032
1033 Ok(id)
1034 }
1035
1036 fn load_vm_program(
1038 &mut self,
1039 context: &VmContext<DL>,
1040 machine: &mut M,
1041 location: &DataLocation,
1042 program: Bytes,
1043 args: VmArgs,
1044 ) -> Result<u64, Error> {
1045 let metadata = parse_elf::<u64>(&program, machine.inner_mut().version())?;
1046 let bytes = match args {
1047 VmArgs::Reader { vm_id, argc, argv } => {
1048 let (_, machine_from) = self.ensure_get_instantiated(&vm_id)?;
1049 let argc = Self::u64_to_reg(argc);
1050 let argv = Self::u64_to_reg(argv);
1051 let argv =
1052 FlattenedArgsReader::new(machine_from.inner_mut().memory_mut(), argc, argv);
1053 machine.load_program_with_metadata(&program, &metadata, argv)?
1054 }
1055 VmArgs::Vector(data) => {
1056 machine.load_program_with_metadata(&program, &metadata, data.into_iter().map(Ok))?
1057 }
1058 };
1059 let mut sc = context.snapshot2_context.lock().expect("lock");
1060 sc.mark_program(
1061 machine.inner_mut(),
1062 &metadata,
1063 &location.data_piece_id,
1064 location.offset,
1065 )?;
1066 machine
1067 .inner_mut()
1068 .add_cycles_no_checking(transferred_byte_cycles(bytes))?;
1069 Ok(bytes)
1070 }
1071
1072 fn create_dummy_vm(&self, id: &VmId) -> Result<(VmContext<DL>, M), Error> {
1074 let version = &self.sg_data.sg_info.script_version;
1075 let core_machine = M::Inner::new(
1076 version.vm_isa(),
1077 version.vm_version(),
1078 u64::MAX,
1080 );
1081 let vm_context = VmContext {
1082 base_cycles: Arc::clone(&self.total_cycles),
1083 message_box: Arc::clone(&self.message_box),
1084 snapshot2_context: Arc::new(Mutex::new(Snapshot2Context::new(self.sg_data.clone()))),
1085 };
1086
1087 let machine_builder = DefaultMachineBuilder::new(core_machine)
1088 .instruction_cycle_func(Box::new(estimate_cycles));
1089 let machine_builder =
1090 (self.syscall_generator)(id, &self.sg_data, &vm_context, &self.syscall_context)
1091 .into_iter()
1092 .fold(machine_builder, |builder, syscall| builder.syscall(syscall));
1093 let default_machine = machine_builder.build();
1094 Ok((vm_context, M::new(default_machine)))
1095 }
1096
1097 pub fn set_root_vm_args(&mut self, args: Vec<Bytes>) {
1100 self.root_vm_args = args;
1101 }
1102
1103 fn i8_to_reg(v: i8) -> <M::Inner as CoreMachine>::REG {
1104 <M::Inner as CoreMachine>::REG::from_i8(v)
1105 }
1106
1107 fn u8_to_reg(v: u8) -> <M::Inner as CoreMachine>::REG {
1108 <M::Inner as CoreMachine>::REG::from_u8(v)
1109 }
1110
1111 fn u64_to_reg(v: u64) -> <M::Inner as CoreMachine>::REG {
1112 <M::Inner as CoreMachine>::REG::from_u64(v)
1113 }
1114}