1use crate::cost_model::transferred_byte_cycles;
2use crate::syscalls::{
3 EXEC_LOAD_ELF_V2_CYCLES_BASE, INVALID_FD, MAX_FDS_CREATED, MAX_VMS_SPAWNED, OTHER_END_CLOSED,
4 SPAWN_EXTRA_CYCLES_BASE, SUCCESS, WAIT_FAILURE,
5};
6
7use crate::types::{
8 DataLocation, DataPieceId, FIRST_FD_SLOT, FIRST_VM_ID, Fd, FdArgs, FullSuspendedState, Message,
9 ReadState, RunMode, SgData, SyscallGenerator, VmArgs, VmContext, VmId, VmState, WriteState,
10};
11use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider};
12use ckb_types::core::Cycle;
13use ckb_vm::snapshot2::Snapshot2Context;
14use ckb_vm::{
15 Error, FlattenedArgsReader, Register,
16 bytes::Bytes,
17 cost_model::estimate_cycles,
18 elf::parse_elf,
19 machine::{CoreMachine, DefaultMachineBuilder, DefaultMachineRunner, Pause, SupportMachine},
20 memory::Memory,
21 registers::A0,
22 snapshot2::Snapshot2,
23};
24use std::collections::{BTreeMap, HashMap};
25use std::sync::{
26 Arc, Mutex,
27 atomic::{AtomicU64, Ordering},
28};
29
30pub const ROOT_VM_ID: VmId = FIRST_VM_ID;
32pub const MAX_VMS_COUNT: u64 = 16;
34pub const MAX_INSTANTIATED_VMS: usize = 4;
36pub const MAX_FDS: u64 = 64;
38
39pub struct Scheduler<DL, V, M>
46where
47 DL: CellDataProvider,
48 M: DefaultMachineRunner,
49{
50 pub sg_data: SgData<DL>,
52
53 pub syscall_generator: SyscallGenerator<DL, V, M::Inner>,
55 pub syscall_context: V,
57
58 pub total_cycles: Arc<AtomicU64>,
87 pub iteration_cycles: Cycle,
89 pub next_vm_id: VmId,
91 pub next_fd_slot: u64,
93 pub states: BTreeMap<VmId, VmState>,
95 pub fds: BTreeMap<Fd, VmId>,
97 pub inherited_fd: BTreeMap<VmId, Vec<Fd>>,
99 pub instantiated: BTreeMap<VmId, (VmContext<DL>, M)>,
101 pub suspended: BTreeMap<VmId, Snapshot2<DataPieceId>>,
103 pub terminated_vms: BTreeMap<VmId, i8>,
105
106 pub message_box: Arc<Mutex<Vec<Message>>>,
109}
110
111impl<DL, V, M> Scheduler<DL, V, M>
112where
113 DL: CellDataProvider + HeaderProvider + ExtensionProvider + Clone,
114 V: Clone,
115 M: DefaultMachineRunner,
116{
117 pub fn new(
119 sg_data: SgData<DL>,
120 syscall_generator: SyscallGenerator<DL, V, M::Inner>,
121 syscall_context: V,
122 ) -> Self {
123 Self {
124 sg_data,
125 syscall_generator,
126 syscall_context,
127 total_cycles: Arc::new(AtomicU64::new(0)),
128 iteration_cycles: 0,
129 next_vm_id: FIRST_VM_ID,
130 next_fd_slot: FIRST_FD_SLOT,
131 states: BTreeMap::default(),
132 fds: BTreeMap::default(),
133 inherited_fd: BTreeMap::default(),
134 instantiated: BTreeMap::default(),
135 suspended: BTreeMap::default(),
136 message_box: Arc::new(Mutex::new(Vec::new())),
137 terminated_vms: BTreeMap::default(),
138 }
139 }
140
141 pub fn consumed_cycles(&self) -> Cycle {
143 self.total_cycles.load(Ordering::Acquire)
144 }
145
146 pub fn consume_cycles(&mut self, cycles: Cycle) -> Result<(), Error> {
148 match self
149 .total_cycles
150 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |total_cycles| {
151 total_cycles.checked_add(cycles)
152 }) {
153 Ok(_) => Ok(()),
154 Err(_) => Err(Error::CyclesExceeded),
155 }
156 }
157
158 pub fn resume(
160 sg_data: SgData<DL>,
161 syscall_generator: SyscallGenerator<DL, V, M::Inner>,
162 syscall_context: V,
163 full: FullSuspendedState,
164 ) -> Self {
165 let mut scheduler = Self {
166 sg_data,
167 syscall_generator,
168 syscall_context,
169 total_cycles: Arc::new(AtomicU64::new(full.total_cycles)),
170 iteration_cycles: 0,
171 next_vm_id: full.next_vm_id,
172 next_fd_slot: full.next_fd_slot,
173 states: full
174 .vms
175 .iter()
176 .map(|(id, state, _)| (*id, state.clone()))
177 .collect(),
178 fds: full.fds.into_iter().collect(),
179 inherited_fd: full.inherited_fd.into_iter().collect(),
180 instantiated: BTreeMap::default(),
181 suspended: full
182 .vms
183 .into_iter()
184 .map(|(id, _, snapshot)| (id, snapshot))
185 .collect(),
186 message_box: Arc::new(Mutex::new(Vec::new())),
187 terminated_vms: full.terminated_vms.into_iter().collect(),
188 };
189 scheduler
190 .ensure_vms_instantiated(&full.instantiated_ids)
191 .unwrap();
192 scheduler.iteration_cycles = 0;
196 scheduler
197 }
198
199 pub fn suspend(mut self) -> Result<FullSuspendedState, Error> {
201 assert!(self.message_box.lock().expect("lock").is_empty());
202 let mut vms = Vec::with_capacity(self.states.len());
203 let instantiated_ids: Vec<_> = self.instantiated.keys().cloned().collect();
204 for id in &instantiated_ids {
205 self.suspend_vm(id)?;
206 }
207 for (id, state) in self.states {
208 let snapshot = self
209 .suspended
210 .remove(&id)
211 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
212 vms.push((id, state, snapshot));
213 }
214 Ok(FullSuspendedState {
215 total_cycles: self.total_cycles.load(Ordering::Acquire),
220 next_vm_id: self.next_vm_id,
221 next_fd_slot: self.next_fd_slot,
222 vms,
223 fds: self.fds.into_iter().collect(),
224 inherited_fd: self.inherited_fd.into_iter().collect(),
225 terminated_vms: self.terminated_vms.into_iter().collect(),
226 instantiated_ids,
227 })
228 }
229
230 pub fn run(&mut self, mode: RunMode) -> Result<(i8, Cycle), Error> {
244 if self.states.is_empty() {
245 let program_id = self.sg_data.sg_info.program_data_piece_id.clone();
247 assert_eq!(
248 self.boot_vm(
249 &DataLocation {
250 data_piece_id: program_id,
251 offset: 0,
252 length: u64::MAX,
253 },
254 VmArgs::Vector(vec![]),
255 )?,
256 ROOT_VM_ID
257 );
258 }
259 assert!(self.states.contains_key(&ROOT_VM_ID));
260
261 let (pause, mut limit_cycles) = match mode {
262 RunMode::LimitCycles(limit_cycles) => (Pause::new(), limit_cycles),
263 RunMode::Pause(pause) => (pause, u64::MAX),
264 };
265
266 while self.states[&ROOT_VM_ID] != VmState::Terminated {
267 assert_eq!(self.iteration_cycles, 0);
268 let iterate_return = self.iterate(pause.clone(), limit_cycles);
269 self.consume_cycles(self.iteration_cycles)?;
270 limit_cycles = limit_cycles
271 .checked_sub(self.iteration_cycles)
272 .ok_or(Error::CyclesExceeded)?;
273 self.iteration_cycles = 0;
275 iterate_return?;
276 }
277
278 let root_vm = &self.instantiated[&ROOT_VM_ID];
280 Ok((root_vm.1.machine().exit_code(), self.consumed_cycles()))
281 }
282
283 pub fn iterate_prepare_machine(&mut self) -> Result<(u64, &mut M), Error> {
285 self.process_io()?;
287 let vm_id_to_run = self
289 .states
290 .iter()
291 .rev()
292 .filter(|(_, state)| matches!(state, VmState::Runnable))
293 .map(|(id, _)| *id)
294 .next();
295 let vm_id_to_run = vm_id_to_run.ok_or_else(|| {
296 Error::Unexpected("A deadlock situation has been reached!".to_string())
297 })?;
298 let (_context, machine) = self.ensure_get_instantiated(&vm_id_to_run)?;
299 Ok((vm_id_to_run, machine))
300 }
301
302 pub fn iterate_process_results(
304 &mut self,
305 vm_id_to_run: u64,
306 result: Result<i8, Error>,
307 ) -> Result<(), Error> {
308 self.process_message_box()?;
310 assert!(self.message_box.lock().expect("lock").is_empty());
311 match result {
313 Ok(code) => {
314 self.terminated_vms.insert(vm_id_to_run, code);
315 if vm_id_to_run == ROOT_VM_ID {
319 self.ensure_vms_instantiated(&[vm_id_to_run])?;
320 self.instantiated.retain(|id, _| *id == vm_id_to_run);
321 self.suspended.clear();
322 self.states.clear();
323 self.states.insert(vm_id_to_run, VmState::Terminated);
324 } else {
325 let joining_vms: Vec<(VmId, u64)> = self
326 .states
327 .iter()
328 .filter_map(|(vm_id, state)| match state {
329 VmState::Wait {
330 target_vm_id,
331 exit_code_addr,
332 } if *target_vm_id == vm_id_to_run => Some((*vm_id, *exit_code_addr)),
333 _ => None,
334 })
335 .collect();
336 for (vm_id, exit_code_addr) in joining_vms {
339 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
340 machine
341 .inner_mut()
342 .memory_mut()
343 .store8(&Self::u64_to_reg(exit_code_addr), &Self::i8_to_reg(code))?;
344 machine
345 .inner_mut()
346 .set_register(A0, Self::u8_to_reg(SUCCESS));
347 self.states.insert(vm_id, VmState::Runnable);
348 }
349 self.fds.retain(|_, vm_id| *vm_id != vm_id_to_run);
351 self.states.remove(&vm_id_to_run);
353 self.instantiated.remove(&vm_id_to_run);
354 self.suspended.remove(&vm_id_to_run);
355 }
356 Ok(())
357 }
358 Err(Error::Yield) => Ok(()),
359 Err(e) => Err(e),
360 }
361 }
362
363 fn iterate(&mut self, pause: Pause, limit_cycles: Cycle) -> Result<(), Error> {
367 let (id, result, cycles) = {
371 let (id, vm) = self.iterate_prepare_machine()?;
372 vm.inner_mut().set_max_cycles(limit_cycles);
373 vm.machine_mut().set_pause(pause);
374 let result = vm.run();
375 let cycles = vm.machine().cycles();
376 vm.inner_mut().set_cycles(0);
377 (id, result, cycles)
378 };
379 self.iteration_cycles = self
380 .iteration_cycles
381 .checked_add(cycles)
382 .ok_or(Error::CyclesExceeded)?;
383 self.iterate_process_results(id, result)
384 }
385
386 fn process_message_box(&mut self) -> Result<(), Error> {
387 let messages: Vec<Message> = self.message_box.lock().expect("lock").drain(..).collect();
388 for message in messages {
389 match message {
390 Message::ExecV2(vm_id, args) => {
391 let (old_context, old_machine) = self
392 .instantiated
393 .get_mut(&vm_id)
394 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
395 old_machine
396 .inner_mut()
397 .add_cycles_no_checking(EXEC_LOAD_ELF_V2_CYCLES_BASE)?;
398 let old_cycles = old_machine.machine().cycles();
399 let max_cycles = old_machine.machine().max_cycles();
400 let program = {
401 let sc = old_context.snapshot2_context.lock().expect("lock");
402 sc.load_data(
403 &args.location.data_piece_id,
404 args.location.offset,
405 args.location.length,
406 )?
407 .0
408 };
409 let (context, mut new_machine) = self.create_dummy_vm(&vm_id)?;
410 new_machine.inner_mut().set_max_cycles(max_cycles);
411 new_machine.inner_mut().add_cycles_no_checking(old_cycles)?;
412 self.load_vm_program(
413 &context,
414 &mut new_machine,
415 &args.location,
416 program,
417 VmArgs::Reader {
418 vm_id,
419 argc: args.argc,
420 argv: args.argv,
421 },
422 )?;
423 debug_assert!(self.instantiated.contains_key(&vm_id));
425 self.instantiated.insert(vm_id, (context, new_machine));
426 }
427 Message::Spawn(vm_id, args) => {
428 if args.fds.iter().any(|fd| self.fds.get(fd) != Some(&vm_id)) {
430 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
431 machine
432 .inner_mut()
433 .set_register(A0, Self::u8_to_reg(INVALID_FD));
434 continue;
435 }
436 if self.suspended.len() + self.instantiated.len() > MAX_VMS_COUNT as usize {
437 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
438 machine
439 .inner_mut()
440 .set_register(A0, Self::u8_to_reg(MAX_VMS_SPAWNED));
441 continue;
442 }
443 let spawned_vm_id = self.boot_vm(
444 &args.location,
445 VmArgs::Reader {
446 vm_id,
447 argc: args.argc,
448 argv: args.argv,
449 },
450 )?;
451 for fd in &args.fds {
453 self.fds.insert(*fd, spawned_vm_id);
454 }
455 self.inherited_fd.insert(spawned_vm_id, args.fds.clone());
458
459 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
460 machine.inner_mut().memory_mut().store64(
461 &Self::u64_to_reg(args.process_id_addr),
462 &Self::u64_to_reg(spawned_vm_id),
463 )?;
464 machine
465 .inner_mut()
466 .set_register(A0, Self::u8_to_reg(SUCCESS));
467 }
468 Message::Wait(vm_id, args) => {
469 if let Some(exit_code) = self.terminated_vms.get(&args.target_id).copied() {
470 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
471 machine.inner_mut().memory_mut().store8(
472 &Self::u64_to_reg(args.exit_code_addr),
473 &Self::i8_to_reg(exit_code),
474 )?;
475 machine
476 .inner_mut()
477 .set_register(A0, Self::u8_to_reg(SUCCESS));
478 self.states.insert(vm_id, VmState::Runnable);
479 self.terminated_vms.retain(|id, _| id != &args.target_id);
480 continue;
481 }
482 if !self.states.contains_key(&args.target_id) {
483 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
484 machine
485 .inner_mut()
486 .set_register(A0, Self::u8_to_reg(WAIT_FAILURE));
487 continue;
488 }
489 self.states.insert(
491 vm_id,
492 VmState::Wait {
493 target_vm_id: args.target_id,
494 exit_code_addr: args.exit_code_addr,
495 },
496 );
497 }
498 Message::Pipe(vm_id, args) => {
499 if self.fds.len() as u64 >= MAX_FDS {
500 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
501 machine
502 .inner_mut()
503 .set_register(A0, Self::u8_to_reg(MAX_FDS_CREATED));
504 continue;
505 }
506 let (p1, p2, slot) = Fd::create(self.next_fd_slot);
507 self.next_fd_slot = slot;
508 self.fds.insert(p1, vm_id);
509 self.fds.insert(p2, vm_id);
510 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
511 machine
512 .inner_mut()
513 .memory_mut()
514 .store64(&Self::u64_to_reg(args.fd1_addr), &Self::u64_to_reg(p1.0))?;
515 machine
516 .inner_mut()
517 .memory_mut()
518 .store64(&Self::u64_to_reg(args.fd2_addr), &Self::u64_to_reg(p2.0))?;
519 machine
520 .inner_mut()
521 .set_register(A0, Self::u8_to_reg(SUCCESS));
522 }
523 Message::FdRead(vm_id, args) => {
524 if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
525 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
526 machine
527 .inner_mut()
528 .set_register(A0, Self::u8_to_reg(INVALID_FD));
529 continue;
530 }
531 if !self.fds.contains_key(&args.fd.other_fd()) {
532 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
533 machine
534 .inner_mut()
535 .set_register(A0, Self::u8_to_reg(OTHER_END_CLOSED));
536 continue;
537 }
538 self.states.insert(
540 vm_id,
541 VmState::WaitForRead(ReadState {
542 fd: args.fd,
543 length: args.length,
544 buffer_addr: args.buffer_addr,
545 length_addr: args.length_addr,
546 }),
547 );
548 }
549 Message::FdWrite(vm_id, args) => {
550 if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
551 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
552 machine
553 .inner_mut()
554 .set_register(A0, Self::u8_to_reg(INVALID_FD));
555 continue;
556 }
557 if !self.fds.contains_key(&args.fd.other_fd()) {
558 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
559 machine
560 .inner_mut()
561 .set_register(A0, Self::u8_to_reg(OTHER_END_CLOSED));
562 continue;
563 }
564 self.states.insert(
566 vm_id,
567 VmState::WaitForWrite(WriteState {
568 fd: args.fd,
569 consumed: 0,
570 length: args.length,
571 buffer_addr: args.buffer_addr,
572 length_addr: args.length_addr,
573 }),
574 );
575 }
576 Message::InheritedFileDescriptor(vm_id, args) => {
577 let inherited_fd = if vm_id == ROOT_VM_ID {
578 Vec::new()
579 } else {
580 self.inherited_fd[&vm_id].clone()
581 };
582 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
583 let FdArgs {
584 buffer_addr,
585 length_addr,
586 ..
587 } = args;
588 let full_length = machine
589 .inner_mut()
590 .memory_mut()
591 .load64(&Self::u64_to_reg(length_addr))?
592 .to_u64();
593 let real_length = inherited_fd.len() as u64;
594 let copy_length = u64::min(full_length, real_length);
595 for i in 0..copy_length {
596 let fd = inherited_fd[i as usize].0;
597 let addr = buffer_addr.checked_add(i * 8).ok_or(Error::MemOutOfBound)?;
598 machine
599 .inner_mut()
600 .memory_mut()
601 .store64(&Self::u64_to_reg(addr), &Self::u64_to_reg(fd))?;
602 }
603 machine.inner_mut().memory_mut().store64(
604 &Self::u64_to_reg(length_addr),
605 &Self::u64_to_reg(real_length),
606 )?;
607 machine
608 .inner_mut()
609 .set_register(A0, Self::u8_to_reg(SUCCESS));
610 }
611 Message::Close(vm_id, fd) => {
612 if self.fds.get(&fd) != Some(&vm_id) {
613 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
614 machine
615 .inner_mut()
616 .set_register(A0, Self::u8_to_reg(INVALID_FD));
617 } else {
618 self.fds.remove(&fd);
619 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
620 machine
621 .inner_mut()
622 .set_register(A0, Self::u8_to_reg(SUCCESS));
623 }
624 }
625 }
626 }
627 Ok(())
628 }
629
630 fn process_io(&mut self) -> Result<(), Error> {
631 let mut reads: HashMap<Fd, (VmId, ReadState)> = HashMap::default();
632 let mut closed_fds: Vec<VmId> = Vec::new();
633
634 self.states.iter().for_each(|(vm_id, state)| {
635 if let VmState::WaitForRead(inner_state) = state {
636 if self.fds.contains_key(&inner_state.fd.other_fd()) {
637 reads.insert(inner_state.fd, (*vm_id, inner_state.clone()));
638 } else {
639 closed_fds.push(*vm_id);
640 }
641 }
642 });
643 let mut pairs: Vec<(VmId, ReadState, VmId, WriteState)> = Vec::new();
644 self.states.iter().for_each(|(vm_id, state)| {
645 if let VmState::WaitForWrite(inner_state) = state {
646 if self.fds.contains_key(&inner_state.fd.other_fd()) {
647 if let Some((read_vm_id, read_state)) = reads.get(&inner_state.fd.other_fd()) {
648 pairs.push((*read_vm_id, read_state.clone(), *vm_id, inner_state.clone()));
649 }
650 } else {
651 closed_fds.push(*vm_id);
652 }
653 }
654 });
655 for vm_id in closed_fds {
657 match self.states[&vm_id].clone() {
658 VmState::WaitForRead(ReadState { length_addr, .. }) => {
659 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
660 machine.inner_mut().memory_mut().store64(
661 &Self::u64_to_reg(length_addr),
662 &<M::Inner as CoreMachine>::REG::zero(),
663 )?;
664 machine
665 .inner_mut()
666 .set_register(A0, Self::u8_to_reg(SUCCESS));
667 self.states.insert(vm_id, VmState::Runnable);
668 }
669 VmState::WaitForWrite(WriteState {
670 consumed,
671 length_addr,
672 ..
673 }) => {
674 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
675 machine
676 .inner_mut()
677 .memory_mut()
678 .store64(&Self::u64_to_reg(length_addr), &Self::u64_to_reg(consumed))?;
679 machine
680 .inner_mut()
681 .set_register(A0, Self::u8_to_reg(SUCCESS));
682 self.states.insert(vm_id, VmState::Runnable);
683 }
684 _ => (),
685 }
686 }
687 for (read_vm_id, read_state, write_vm_id, write_state) in pairs {
689 let ReadState {
690 length: read_length,
691 buffer_addr: read_buffer_addr,
692 length_addr: read_length_addr,
693 ..
694 } = read_state;
695 let WriteState {
696 fd: write_fd,
697 mut consumed,
698 length: write_length,
699 buffer_addr: write_buffer_addr,
700 length_addr: write_length_addr,
701 } = write_state;
702
703 self.ensure_vms_instantiated(&[read_vm_id, write_vm_id])?;
704 {
705 let fillable = read_length;
706 let consumable = write_length - consumed;
707 let copiable = std::cmp::min(fillable, consumable);
708
709 let (_, write_machine) = self
711 .instantiated
712 .get_mut(&write_vm_id)
713 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
714 write_machine
715 .inner_mut()
716 .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
717 let data = write_machine
718 .inner_mut()
719 .memory_mut()
720 .load_bytes(write_buffer_addr.wrapping_add(consumed), copiable)?;
721 let (_, read_machine) = self
722 .instantiated
723 .get_mut(&read_vm_id)
724 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
725 read_machine
726 .inner_mut()
727 .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
728 read_machine
729 .inner_mut()
730 .memory_mut()
731 .store_bytes(read_buffer_addr, &data)?;
732 read_machine.inner_mut().memory_mut().store64(
734 &Self::u64_to_reg(read_length_addr),
735 &Self::u64_to_reg(copiable),
736 )?;
737 read_machine
738 .inner_mut()
739 .set_register(A0, Self::u8_to_reg(SUCCESS));
740 self.states.insert(read_vm_id, VmState::Runnable);
741
742 consumed += copiable;
745 if consumed == write_length {
746 let (_, write_machine) = self
748 .instantiated
749 .get_mut(&write_vm_id)
750 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
751 write_machine.inner_mut().memory_mut().store64(
752 &Self::u64_to_reg(write_length_addr),
753 &Self::u64_to_reg(write_length),
754 )?;
755 write_machine
756 .inner_mut()
757 .set_register(A0, Self::u8_to_reg(SUCCESS));
758 self.states.insert(write_vm_id, VmState::Runnable);
759 } else {
760 self.states.insert(
762 write_vm_id,
763 VmState::WaitForWrite(WriteState {
764 fd: write_fd,
765 consumed,
766 length: write_length,
767 buffer_addr: write_buffer_addr,
768 length_addr: write_length_addr,
769 }),
770 );
771 }
772 }
773 }
774 Ok(())
775 }
776
777 fn ensure_vms_instantiated(&mut self, ids: &[VmId]) -> Result<(), Error> {
779 if ids.len() > MAX_INSTANTIATED_VMS {
780 return Err(Error::Unexpected(format!(
781 "At most {} VMs can be instantiated but {} are requested!",
782 MAX_INSTANTIATED_VMS,
783 ids.len()
784 )));
785 }
786
787 let mut uninstantiated_ids: Vec<VmId> = ids
788 .iter()
789 .filter(|id| !self.instantiated.contains_key(id))
790 .copied()
791 .collect();
792 while (!uninstantiated_ids.is_empty()) && (self.instantiated.len() < MAX_INSTANTIATED_VMS) {
793 let id = uninstantiated_ids
794 .pop()
795 .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?;
796 self.resume_vm(&id)?;
797 }
798
799 if !uninstantiated_ids.is_empty() {
800 let suspendable_ids: Vec<VmId> = self
802 .instantiated
803 .keys()
804 .filter(|id| !ids.contains(id))
805 .copied()
806 .collect();
807
808 assert!(suspendable_ids.len() >= uninstantiated_ids.len());
809 for i in 0..uninstantiated_ids.len() {
810 self.suspend_vm(&suspendable_ids[i])?;
811 self.resume_vm(&uninstantiated_ids[i])?;
812 }
813 }
814
815 Ok(())
816 }
817
818 fn ensure_get_instantiated(&mut self, id: &VmId) -> Result<&mut (VmContext<DL>, M), Error> {
820 self.ensure_vms_instantiated(&[*id])?;
821 self.instantiated
822 .get_mut(id)
823 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))
824 }
825
826 fn resume_vm(&mut self, id: &VmId) -> Result<(), Error> {
828 if !self.suspended.contains_key(id) {
829 return Err(Error::Unexpected(format!("VM {:?} is not suspended!", id)));
830 }
831 let snapshot = &self.suspended[id];
832 self.iteration_cycles = self
833 .iteration_cycles
834 .checked_add(SPAWN_EXTRA_CYCLES_BASE)
835 .ok_or(Error::CyclesExceeded)?;
836 let (context, mut machine) = self.create_dummy_vm(id)?;
837 {
838 let mut sc = context.snapshot2_context.lock().expect("lock");
839 sc.resume(machine.inner_mut(), snapshot)?;
840 }
841 self.instantiated.insert(*id, (context, machine));
842 self.suspended.remove(id);
843 Ok(())
844 }
845
846 fn suspend_vm(&mut self, id: &VmId) -> Result<(), Error> {
848 if !self.instantiated.contains_key(id) {
849 return Err(Error::Unexpected(format!(
850 "VM {:?} is not instantiated!",
851 id
852 )));
853 }
854 self.iteration_cycles = self
855 .iteration_cycles
856 .checked_add(SPAWN_EXTRA_CYCLES_BASE)
857 .ok_or(Error::CyclesExceeded)?;
858 let (context, machine) = self
859 .instantiated
860 .get_mut(id)
861 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
862 let snapshot = {
863 let sc = context.snapshot2_context.lock().expect("lock");
864 sc.make_snapshot(machine.inner_mut())?
865 };
866 self.suspended.insert(*id, snapshot);
867 self.instantiated.remove(id);
868 Ok(())
869 }
870
871 pub fn boot_vm(&mut self, location: &DataLocation, args: VmArgs) -> Result<VmId, Error> {
873 let id = self.next_vm_id;
874 self.next_vm_id += 1;
875 let (context, mut machine) = self.create_dummy_vm(&id)?;
876 let (program, _) = {
877 let sc = context.snapshot2_context.lock().expect("lock");
878 sc.load_data(&location.data_piece_id, location.offset, location.length)?
879 };
880 self.load_vm_program(&context, &mut machine, location, program, args)?;
881 while self.instantiated.len() >= MAX_INSTANTIATED_VMS {
883 let id = *self
885 .instantiated
886 .first_entry()
887 .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?
888 .key();
889 self.suspend_vm(&id)?;
890 }
891
892 self.instantiated.insert(id, (context, machine));
893 self.states.insert(id, VmState::Runnable);
894
895 Ok(id)
896 }
897
898 fn load_vm_program(
900 &mut self,
901 context: &VmContext<DL>,
902 machine: &mut M,
903 location: &DataLocation,
904 program: Bytes,
905 args: VmArgs,
906 ) -> Result<u64, Error> {
907 let metadata = parse_elf::<u64>(&program, machine.inner_mut().version())?;
908 let bytes = match args {
909 VmArgs::Reader { vm_id, argc, argv } => {
910 let (_, machine_from) = self.ensure_get_instantiated(&vm_id)?;
911 let argc = Self::u64_to_reg(argc);
912 let argv = Self::u64_to_reg(argv);
913 let argv =
914 FlattenedArgsReader::new(machine_from.inner_mut().memory_mut(), argc, argv);
915 machine.load_program_with_metadata(&program, &metadata, argv)?
916 }
917 VmArgs::Vector(data) => {
918 machine.load_program_with_metadata(&program, &metadata, data.into_iter().map(Ok))?
919 }
920 };
921 let mut sc = context.snapshot2_context.lock().expect("lock");
922 sc.mark_program(
923 machine.inner_mut(),
924 &metadata,
925 &location.data_piece_id,
926 location.offset,
927 )?;
928 machine
929 .inner_mut()
930 .add_cycles_no_checking(transferred_byte_cycles(bytes))?;
931 Ok(bytes)
932 }
933
934 fn create_dummy_vm(&self, id: &VmId) -> Result<(VmContext<DL>, M), Error> {
936 let version = &self.sg_data.sg_info.script_version;
937 let core_machine = M::Inner::new(
938 version.vm_isa(),
939 version.vm_version(),
940 u64::MAX,
942 );
943 let vm_context = VmContext {
944 base_cycles: Arc::clone(&self.total_cycles),
945 message_box: Arc::clone(&self.message_box),
946 snapshot2_context: Arc::new(Mutex::new(Snapshot2Context::new(self.sg_data.clone()))),
947 };
948
949 let machine_builder = DefaultMachineBuilder::new(core_machine)
950 .instruction_cycle_func(Box::new(estimate_cycles));
951 let machine_builder =
952 (self.syscall_generator)(id, &self.sg_data, &vm_context, &self.syscall_context)
953 .into_iter()
954 .fold(machine_builder, |builder, syscall| builder.syscall(syscall));
955 let default_machine = machine_builder.build();
956 Ok((vm_context, M::new(default_machine)))
957 }
958
959 fn i8_to_reg(v: i8) -> <M::Inner as CoreMachine>::REG {
960 <M::Inner as CoreMachine>::REG::from_i8(v)
961 }
962
963 fn u8_to_reg(v: u8) -> <M::Inner as CoreMachine>::REG {
964 <M::Inner as CoreMachine>::REG::from_u8(v)
965 }
966
967 fn u64_to_reg(v: u64) -> <M::Inner as CoreMachine>::REG {
968 <M::Inner as CoreMachine>::REG::from_u64(v)
969 }
970}