1use crate::cost_model::transferred_byte_cycles;
2use crate::syscalls::{
3 generator::generate_ckb_syscalls, EXEC_LOAD_ELF_V2_CYCLES_BASE, INVALID_FD, MAX_FDS_CREATED,
4 MAX_VMS_SPAWNED, OTHER_END_CLOSED, SPAWN_EXTRA_CYCLES_BASE, SUCCESS, WAIT_FAILURE,
5};
6
7use crate::types::{
8 CoreMachineType, DataLocation, DataPieceId, DebugContext, Fd, FdArgs, FullSuspendedState,
9 Machine, Message, ReadState, RunMode, SgData, VmArgs, VmContext, VmId, VmState, WriteState,
10 FIRST_FD_SLOT, FIRST_VM_ID,
11};
12use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider};
13use ckb_types::core::Cycle;
14use ckb_vm::snapshot2::Snapshot2Context;
15use ckb_vm::{
16 bytes::Bytes,
17 cost_model::estimate_cycles,
18 elf::parse_elf,
19 machine::{CoreMachine, DefaultMachineBuilder, Pause, SupportMachine},
20 memory::Memory,
21 registers::A0,
22 snapshot2::Snapshot2,
23 Error, FlattenedArgsReader, Register,
24};
25use std::collections::{BTreeMap, HashMap};
26use std::sync::{
27 atomic::{AtomicU64, Ordering},
28 Arc, Mutex,
29};
30
31pub const ROOT_VM_ID: VmId = FIRST_VM_ID;
33pub const MAX_VMS_COUNT: u64 = 16;
35pub const MAX_INSTANTIATED_VMS: usize = 4;
37pub const MAX_FDS: u64 = 64;
39
40pub struct Scheduler<DL>
47where
48 DL: CellDataProvider,
49{
50 pub sg_data: SgData<DL>,
52
53 pub debug_context: DebugContext,
55
56 pub total_cycles: Arc<AtomicU64>,
85 pub iteration_cycles: Cycle,
87 pub next_vm_id: VmId,
89 pub next_fd_slot: u64,
91 pub states: BTreeMap<VmId, VmState>,
93 pub fds: BTreeMap<Fd, VmId>,
95 pub inherited_fd: BTreeMap<VmId, Vec<Fd>>,
97 pub instantiated: BTreeMap<VmId, (VmContext<DL>, Machine)>,
99 pub suspended: BTreeMap<VmId, Snapshot2<DataPieceId>>,
101 pub terminated_vms: BTreeMap<VmId, i8>,
103
104 pub message_box: Arc<Mutex<Vec<Message>>>,
107}
108
109impl<DL> Scheduler<DL>
110where
111 DL: CellDataProvider + HeaderProvider + ExtensionProvider + Send + Sync + Clone + 'static,
112{
113 pub fn new(sg_data: SgData<DL>, debug_context: DebugContext) -> Self {
115 Self {
116 sg_data,
117 debug_context,
118 total_cycles: Arc::new(AtomicU64::new(0)),
119 iteration_cycles: 0,
120 next_vm_id: FIRST_VM_ID,
121 next_fd_slot: FIRST_FD_SLOT,
122 states: BTreeMap::default(),
123 fds: BTreeMap::default(),
124 inherited_fd: BTreeMap::default(),
125 instantiated: BTreeMap::default(),
126 suspended: BTreeMap::default(),
127 message_box: Arc::new(Mutex::new(Vec::new())),
128 terminated_vms: BTreeMap::default(),
129 }
130 }
131
132 pub fn consumed_cycles(&self) -> Cycle {
134 self.total_cycles.load(Ordering::Acquire)
135 }
136
137 pub fn consume_cycles(&mut self, cycles: Cycle) -> Result<(), Error> {
139 match self
140 .total_cycles
141 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |total_cycles| {
142 total_cycles.checked_add(cycles)
143 }) {
144 Ok(_) => Ok(()),
145 Err(_) => Err(Error::CyclesExceeded),
146 }
147 }
148
149 pub fn resume(
151 sg_data: SgData<DL>,
152 debug_context: DebugContext,
153 full: FullSuspendedState,
154 ) -> Self {
155 let mut scheduler = Self {
156 sg_data,
157 debug_context,
158 total_cycles: Arc::new(AtomicU64::new(full.total_cycles)),
159 iteration_cycles: 0,
160 next_vm_id: full.next_vm_id,
161 next_fd_slot: full.next_fd_slot,
162 states: full
163 .vms
164 .iter()
165 .map(|(id, state, _)| (*id, state.clone()))
166 .collect(),
167 fds: full.fds.into_iter().collect(),
168 inherited_fd: full.inherited_fd.into_iter().collect(),
169 instantiated: BTreeMap::default(),
170 suspended: full
171 .vms
172 .into_iter()
173 .map(|(id, _, snapshot)| (id, snapshot))
174 .collect(),
175 message_box: Arc::new(Mutex::new(Vec::new())),
176 terminated_vms: full.terminated_vms.into_iter().collect(),
177 };
178 scheduler
179 .ensure_vms_instantiated(&full.instantiated_ids)
180 .unwrap();
181 scheduler.iteration_cycles = 0;
185 scheduler
186 }
187
188 pub fn suspend(mut self) -> Result<FullSuspendedState, Error> {
190 assert!(self.message_box.lock().expect("lock").is_empty());
191 let mut vms = Vec::with_capacity(self.states.len());
192 let instantiated_ids: Vec<_> = self.instantiated.keys().cloned().collect();
193 for id in &instantiated_ids {
194 self.suspend_vm(id)?;
195 }
196 for (id, state) in self.states {
197 let snapshot = self
198 .suspended
199 .remove(&id)
200 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
201 vms.push((id, state, snapshot));
202 }
203 Ok(FullSuspendedState {
204 total_cycles: self.total_cycles.load(Ordering::Acquire),
209 next_vm_id: self.next_vm_id,
210 next_fd_slot: self.next_fd_slot,
211 vms,
212 fds: self.fds.into_iter().collect(),
213 inherited_fd: self.inherited_fd.into_iter().collect(),
214 terminated_vms: self.terminated_vms.into_iter().collect(),
215 instantiated_ids,
216 })
217 }
218
219 pub fn run(&mut self, mode: RunMode) -> Result<(i8, Cycle), Error> {
233 if self.states.is_empty() {
234 let program_id = self.sg_data.sg_info.program_data_piece_id.clone();
236 assert_eq!(
237 self.boot_vm(
238 &DataLocation {
239 data_piece_id: program_id,
240 offset: 0,
241 length: u64::MAX,
242 },
243 VmArgs::Vector(vec![]),
244 )?,
245 ROOT_VM_ID
246 );
247 }
248 assert!(self.states.contains_key(&ROOT_VM_ID));
249
250 let (pause, mut limit_cycles) = match mode {
251 RunMode::LimitCycles(limit_cycles) => (Pause::new(), limit_cycles),
252 RunMode::Pause(pause) => (pause, u64::MAX),
253 };
254
255 while self.states[&ROOT_VM_ID] != VmState::Terminated {
256 assert_eq!(self.iteration_cycles, 0);
257 let iterate_return = self.iterate(pause.clone(), limit_cycles);
258 self.consume_cycles(self.iteration_cycles)?;
259 limit_cycles = limit_cycles
260 .checked_sub(self.iteration_cycles)
261 .ok_or(Error::CyclesExceeded)?;
262 self.iteration_cycles = 0;
264 iterate_return?;
265 }
266
267 let root_vm = &self.instantiated[&ROOT_VM_ID];
269 Ok((root_vm.1.machine.exit_code(), self.consumed_cycles()))
270 }
271
272 pub fn iterate_prepare_machine(&mut self) -> Result<(u64, &mut Machine), Error> {
274 self.process_io()?;
276 let vm_id_to_run = self
278 .states
279 .iter()
280 .rev()
281 .filter(|(_, state)| matches!(state, VmState::Runnable))
282 .map(|(id, _)| *id)
283 .next();
284 let vm_id_to_run = vm_id_to_run.ok_or_else(|| {
285 Error::Unexpected("A deadlock situation has been reached!".to_string())
286 })?;
287 let (_context, machine) = self.ensure_get_instantiated(&vm_id_to_run)?;
288 Ok((vm_id_to_run, machine))
289 }
290
291 pub fn iterate_process_results(
293 &mut self,
294 vm_id_to_run: u64,
295 result: Result<i8, Error>,
296 ) -> Result<(), Error> {
297 self.process_message_box()?;
299 assert!(self.message_box.lock().expect("lock").is_empty());
300 match result {
302 Ok(code) => {
303 self.terminated_vms.insert(vm_id_to_run, code);
304 if vm_id_to_run == ROOT_VM_ID {
308 self.ensure_vms_instantiated(&[vm_id_to_run])?;
309 self.instantiated.retain(|id, _| *id == vm_id_to_run);
310 self.suspended.clear();
311 self.states.clear();
312 self.states.insert(vm_id_to_run, VmState::Terminated);
313 } else {
314 let joining_vms: Vec<(VmId, u64)> = self
315 .states
316 .iter()
317 .filter_map(|(vm_id, state)| match state {
318 VmState::Wait {
319 target_vm_id,
320 exit_code_addr,
321 } if *target_vm_id == vm_id_to_run => Some((*vm_id, *exit_code_addr)),
322 _ => None,
323 })
324 .collect();
325 for (vm_id, exit_code_addr) in joining_vms {
328 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
329 machine
330 .machine
331 .memory_mut()
332 .store8(&exit_code_addr, &u64::from_i8(code))?;
333 machine.machine.set_register(A0, SUCCESS as u64);
334 self.states.insert(vm_id, VmState::Runnable);
335 }
336 self.fds.retain(|_, vm_id| *vm_id != vm_id_to_run);
338 self.states.remove(&vm_id_to_run);
340 self.instantiated.remove(&vm_id_to_run);
341 self.suspended.remove(&vm_id_to_run);
342 }
343 Ok(())
344 }
345 Err(Error::Yield) => Ok(()),
346 Err(e) => Err(e),
347 }
348 }
349
350 fn iterate(&mut self, pause: Pause, limit_cycles: Cycle) -> Result<(), Error> {
354 let (id, result, cycles) = {
358 let (id, vm) = self.iterate_prepare_machine()?;
359 vm.set_max_cycles(limit_cycles);
360 vm.machine.set_pause(pause);
361 let result = vm.run();
362 let cycles = vm.machine.cycles();
363 vm.machine.set_cycles(0);
364 (id, result, cycles)
365 };
366 self.iteration_cycles = self
367 .iteration_cycles
368 .checked_add(cycles)
369 .ok_or(Error::CyclesExceeded)?;
370 self.iterate_process_results(id, result)
371 }
372
373 fn process_message_box(&mut self) -> Result<(), Error> {
374 let messages: Vec<Message> = self.message_box.lock().expect("lock").drain(..).collect();
375 for message in messages {
376 match message {
377 Message::ExecV2(vm_id, args) => {
378 let (old_context, old_machine) = self
379 .instantiated
380 .get_mut(&vm_id)
381 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
382 old_machine
383 .machine
384 .add_cycles_no_checking(EXEC_LOAD_ELF_V2_CYCLES_BASE)?;
385 let old_cycles = old_machine.machine.cycles();
386 let max_cycles = old_machine.machine.max_cycles();
387 let program = {
388 let mut sc = old_context.snapshot2_context.lock().expect("lock");
389 sc.load_data(
390 &args.location.data_piece_id,
391 args.location.offset,
392 args.location.length,
393 )?
394 .0
395 };
396 let (context, mut new_machine) = self.create_dummy_vm(&vm_id)?;
397 new_machine.set_max_cycles(max_cycles);
398 new_machine.machine.add_cycles_no_checking(old_cycles)?;
399 self.load_vm_program(
400 &context,
401 &mut new_machine,
402 &args.location,
403 program,
404 VmArgs::Reader {
405 vm_id,
406 argc: args.argc,
407 argv: args.argv,
408 },
409 )?;
410 debug_assert!(self.instantiated.contains_key(&vm_id));
412 self.instantiated.insert(vm_id, (context, new_machine));
413 }
414 Message::Spawn(vm_id, args) => {
415 if args.fds.iter().any(|fd| self.fds.get(fd) != Some(&vm_id)) {
417 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
418 machine.machine.set_register(A0, INVALID_FD as u64);
419 continue;
420 }
421 if self.suspended.len() + self.instantiated.len() > MAX_VMS_COUNT as usize {
422 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
423 machine.machine.set_register(A0, MAX_VMS_SPAWNED as u64);
424 continue;
425 }
426 let spawned_vm_id = self.boot_vm(
427 &args.location,
428 VmArgs::Reader {
429 vm_id,
430 argc: args.argc,
431 argv: args.argv,
432 },
433 )?;
434 for fd in &args.fds {
436 self.fds.insert(*fd, spawned_vm_id);
437 }
438 self.inherited_fd.insert(spawned_vm_id, args.fds.clone());
441
442 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
443 machine
444 .machine
445 .memory_mut()
446 .store64(&args.process_id_addr, &spawned_vm_id)?;
447 machine.machine.set_register(A0, SUCCESS as u64);
448 }
449 Message::Wait(vm_id, args) => {
450 if let Some(exit_code) = self.terminated_vms.get(&args.target_id).copied() {
451 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
452 machine
453 .machine
454 .memory_mut()
455 .store8(&args.exit_code_addr, &u64::from_i8(exit_code))?;
456 machine.machine.set_register(A0, SUCCESS as u64);
457 self.states.insert(vm_id, VmState::Runnable);
458 self.terminated_vms.retain(|id, _| id != &args.target_id);
459 continue;
460 }
461 if !self.states.contains_key(&args.target_id) {
462 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
463 machine.machine.set_register(A0, WAIT_FAILURE as u64);
464 continue;
465 }
466 self.states.insert(
468 vm_id,
469 VmState::Wait {
470 target_vm_id: args.target_id,
471 exit_code_addr: args.exit_code_addr,
472 },
473 );
474 }
475 Message::Pipe(vm_id, args) => {
476 if self.fds.len() as u64 >= MAX_FDS {
477 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
478 machine.machine.set_register(A0, MAX_FDS_CREATED as u64);
479 continue;
480 }
481 let (p1, p2, slot) = Fd::create(self.next_fd_slot);
482 self.next_fd_slot = slot;
483 self.fds.insert(p1, vm_id);
484 self.fds.insert(p2, vm_id);
485 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
486 machine
487 .machine
488 .memory_mut()
489 .store64(&args.fd1_addr, &p1.0)?;
490 machine
491 .machine
492 .memory_mut()
493 .store64(&args.fd2_addr, &p2.0)?;
494 machine.machine.set_register(A0, SUCCESS as u64);
495 }
496 Message::FdRead(vm_id, args) => {
497 if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
498 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
499 machine.machine.set_register(A0, INVALID_FD as u64);
500 continue;
501 }
502 if !self.fds.contains_key(&args.fd.other_fd()) {
503 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
504 machine.machine.set_register(A0, OTHER_END_CLOSED as u64);
505 continue;
506 }
507 self.states.insert(
509 vm_id,
510 VmState::WaitForRead(ReadState {
511 fd: args.fd,
512 length: args.length,
513 buffer_addr: args.buffer_addr,
514 length_addr: args.length_addr,
515 }),
516 );
517 }
518 Message::FdWrite(vm_id, args) => {
519 if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
520 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
521 machine.machine.set_register(A0, INVALID_FD as u64);
522 continue;
523 }
524 if !self.fds.contains_key(&args.fd.other_fd()) {
525 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
526 machine.machine.set_register(A0, OTHER_END_CLOSED as u64);
527 continue;
528 }
529 self.states.insert(
531 vm_id,
532 VmState::WaitForWrite(WriteState {
533 fd: args.fd,
534 consumed: 0,
535 length: args.length,
536 buffer_addr: args.buffer_addr,
537 length_addr: args.length_addr,
538 }),
539 );
540 }
541 Message::InheritedFileDescriptor(vm_id, args) => {
542 let inherited_fd = if vm_id == ROOT_VM_ID {
543 Vec::new()
544 } else {
545 self.inherited_fd[&vm_id].clone()
546 };
547 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
548 let FdArgs {
549 buffer_addr,
550 length_addr,
551 ..
552 } = args;
553 let full_length = machine
554 .machine
555 .inner_mut()
556 .memory_mut()
557 .load64(&length_addr)?;
558 let real_length = inherited_fd.len() as u64;
559 let copy_length = u64::min(full_length, real_length);
560 for i in 0..copy_length {
561 let fd = inherited_fd[i as usize].0;
562 let addr = buffer_addr.checked_add(i * 8).ok_or(Error::MemOutOfBound)?;
563 machine
564 .machine
565 .inner_mut()
566 .memory_mut()
567 .store64(&addr, &fd)?;
568 }
569 machine
570 .machine
571 .inner_mut()
572 .memory_mut()
573 .store64(&length_addr, &real_length)?;
574 machine.machine.set_register(A0, SUCCESS as u64);
575 }
576 Message::Close(vm_id, fd) => {
577 if self.fds.get(&fd) != Some(&vm_id) {
578 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
579 machine.machine.set_register(A0, INVALID_FD as u64);
580 } else {
581 self.fds.remove(&fd);
582 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
583 machine.machine.set_register(A0, SUCCESS as u64);
584 }
585 }
586 }
587 }
588 Ok(())
589 }
590
591 fn process_io(&mut self) -> Result<(), Error> {
592 let mut reads: HashMap<Fd, (VmId, ReadState)> = HashMap::default();
593 let mut closed_fds: Vec<VmId> = Vec::new();
594 self.states.iter().for_each(|(vm_id, state)| {
595 if let VmState::WaitForRead(inner_state) = state {
596 if self.fds.contains_key(&inner_state.fd.other_fd()) {
597 reads.insert(inner_state.fd, (*vm_id, inner_state.clone()));
598 } else {
599 closed_fds.push(*vm_id);
600 }
601 }
602 });
603 let mut pairs: Vec<(VmId, ReadState, VmId, WriteState)> = Vec::new();
604 self.states.iter().for_each(|(vm_id, state)| {
605 if let VmState::WaitForWrite(inner_state) = state {
606 if self.fds.contains_key(&inner_state.fd.other_fd()) {
607 if let Some((read_vm_id, read_state)) = reads.get(&inner_state.fd.other_fd()) {
608 pairs.push((*read_vm_id, read_state.clone(), *vm_id, inner_state.clone()));
609 }
610 } else {
611 closed_fds.push(*vm_id);
612 }
613 }
614 });
615 for vm_id in closed_fds {
617 match self.states[&vm_id].clone() {
618 VmState::WaitForRead(ReadState { length_addr, .. }) => {
619 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
620 machine.machine.memory_mut().store64(&length_addr, &0)?;
621 machine.machine.set_register(A0, SUCCESS as u64);
622 self.states.insert(vm_id, VmState::Runnable);
623 }
624 VmState::WaitForWrite(WriteState {
625 consumed,
626 length_addr,
627 ..
628 }) => {
629 let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
630 machine
631 .machine
632 .memory_mut()
633 .store64(&length_addr, &consumed)?;
634 machine.machine.set_register(A0, SUCCESS as u64);
635 self.states.insert(vm_id, VmState::Runnable);
636 }
637 _ => (),
638 }
639 }
640 for (read_vm_id, read_state, write_vm_id, write_state) in pairs {
642 let ReadState {
643 length: read_length,
644 buffer_addr: read_buffer_addr,
645 length_addr: read_length_addr,
646 ..
647 } = read_state;
648 let WriteState {
649 fd: write_fd,
650 mut consumed,
651 length: write_length,
652 buffer_addr: write_buffer_addr,
653 length_addr: write_length_addr,
654 } = write_state;
655
656 self.ensure_vms_instantiated(&[read_vm_id, write_vm_id])?;
657 {
658 let fillable = read_length;
659 let consumable = write_length - consumed;
660 let copiable = std::cmp::min(fillable, consumable);
661
662 let (_, write_machine) = self
664 .instantiated
665 .get_mut(&write_vm_id)
666 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
667 write_machine
668 .machine
669 .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
670 let data = write_machine
671 .machine
672 .memory_mut()
673 .load_bytes(write_buffer_addr.wrapping_add(consumed), copiable)?;
674 let (_, read_machine) = self
675 .instantiated
676 .get_mut(&read_vm_id)
677 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
678 read_machine
679 .machine
680 .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
681 read_machine
682 .machine
683 .memory_mut()
684 .store_bytes(read_buffer_addr, &data)?;
685 read_machine
687 .machine
688 .memory_mut()
689 .store64(&read_length_addr, &copiable)?;
690 read_machine.machine.set_register(A0, SUCCESS as u64);
691 self.states.insert(read_vm_id, VmState::Runnable);
692
693 consumed += copiable;
696 if consumed == write_length {
697 let (_, write_machine) = self
699 .instantiated
700 .get_mut(&write_vm_id)
701 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
702 write_machine
703 .machine
704 .memory_mut()
705 .store64(&write_length_addr, &write_length)?;
706 write_machine.machine.set_register(A0, SUCCESS as u64);
707 self.states.insert(write_vm_id, VmState::Runnable);
708 } else {
709 self.states.insert(
711 write_vm_id,
712 VmState::WaitForWrite(WriteState {
713 fd: write_fd,
714 consumed,
715 length: write_length,
716 buffer_addr: write_buffer_addr,
717 length_addr: write_length_addr,
718 }),
719 );
720 }
721 }
722 }
723 Ok(())
724 }
725
726 fn ensure_vms_instantiated(&mut self, ids: &[VmId]) -> Result<(), Error> {
728 if ids.len() > MAX_INSTANTIATED_VMS {
729 return Err(Error::Unexpected(format!(
730 "At most {} VMs can be instantiated but {} are requested!",
731 MAX_INSTANTIATED_VMS,
732 ids.len()
733 )));
734 }
735
736 let mut uninstantiated_ids: Vec<VmId> = ids
737 .iter()
738 .filter(|id| !self.instantiated.contains_key(id))
739 .copied()
740 .collect();
741 while (!uninstantiated_ids.is_empty()) && (self.instantiated.len() < MAX_INSTANTIATED_VMS) {
742 let id = uninstantiated_ids
743 .pop()
744 .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?;
745 self.resume_vm(&id)?;
746 }
747
748 if !uninstantiated_ids.is_empty() {
749 let suspendable_ids: Vec<VmId> = self
751 .instantiated
752 .keys()
753 .filter(|id| !ids.contains(id))
754 .copied()
755 .collect();
756
757 assert!(suspendable_ids.len() >= uninstantiated_ids.len());
758 for i in 0..uninstantiated_ids.len() {
759 self.suspend_vm(&suspendable_ids[i])?;
760 self.resume_vm(&uninstantiated_ids[i])?;
761 }
762 }
763
764 Ok(())
765 }
766
767 fn ensure_get_instantiated(
769 &mut self,
770 id: &VmId,
771 ) -> Result<&mut (VmContext<DL>, Machine), Error> {
772 self.ensure_vms_instantiated(&[*id])?;
773 self.instantiated
774 .get_mut(id)
775 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))
776 }
777
778 fn resume_vm(&mut self, id: &VmId) -> Result<(), Error> {
780 if !self.suspended.contains_key(id) {
781 return Err(Error::Unexpected(format!("VM {:?} is not suspended!", id)));
782 }
783 let snapshot = &self.suspended[id];
784 self.iteration_cycles = self
785 .iteration_cycles
786 .checked_add(SPAWN_EXTRA_CYCLES_BASE)
787 .ok_or(Error::CyclesExceeded)?;
788 let (context, mut machine) = self.create_dummy_vm(id)?;
789 {
790 let mut sc = context.snapshot2_context.lock().expect("lock");
791 sc.resume(&mut machine.machine, snapshot)?;
792 }
793 self.instantiated.insert(*id, (context, machine));
794 self.suspended.remove(id);
795 Ok(())
796 }
797
798 fn suspend_vm(&mut self, id: &VmId) -> Result<(), Error> {
800 if !self.instantiated.contains_key(id) {
801 return Err(Error::Unexpected(format!(
802 "VM {:?} is not instantiated!",
803 id
804 )));
805 }
806 self.iteration_cycles = self
807 .iteration_cycles
808 .checked_add(SPAWN_EXTRA_CYCLES_BASE)
809 .ok_or(Error::CyclesExceeded)?;
810 let (context, machine) = self
811 .instantiated
812 .get_mut(id)
813 .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
814 let snapshot = {
815 let sc = context.snapshot2_context.lock().expect("lock");
816 sc.make_snapshot(&mut machine.machine)?
817 };
818 self.suspended.insert(*id, snapshot);
819 self.instantiated.remove(id);
820 Ok(())
821 }
822
823 pub fn boot_vm(&mut self, location: &DataLocation, args: VmArgs) -> Result<VmId, Error> {
825 let id = self.next_vm_id;
826 self.next_vm_id += 1;
827 let (context, mut machine) = self.create_dummy_vm(&id)?;
828 let (program, _) = {
829 let mut sc = context.snapshot2_context.lock().expect("lock");
830 sc.load_data(&location.data_piece_id, location.offset, location.length)?
831 };
832 self.load_vm_program(&context, &mut machine, location, program, args)?;
833 while self.instantiated.len() >= MAX_INSTANTIATED_VMS {
835 let id = *self
837 .instantiated
838 .first_entry()
839 .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?
840 .key();
841 self.suspend_vm(&id)?;
842 }
843
844 self.instantiated.insert(id, (context, machine));
845 self.states.insert(id, VmState::Runnable);
846
847 Ok(id)
848 }
849
850 fn load_vm_program(
852 &mut self,
853 context: &VmContext<DL>,
854 machine: &mut Machine,
855 location: &DataLocation,
856 program: Bytes,
857 args: VmArgs,
858 ) -> Result<u64, Error> {
859 let metadata = parse_elf::<u64>(&program, machine.machine.version())?;
860 let bytes = match args {
861 VmArgs::Reader { vm_id, argc, argv } => {
862 let (_, machine_from) = self.ensure_get_instantiated(&vm_id)?;
863 let argv = FlattenedArgsReader::new(machine_from.machine.memory_mut(), argc, argv);
864 machine.load_program_with_metadata(&program, &metadata, argv)?
865 }
866 VmArgs::Vector(data) => {
867 machine.load_program_with_metadata(&program, &metadata, data.into_iter().map(Ok))?
868 }
869 };
870 let mut sc = context.snapshot2_context.lock().expect("lock");
871 sc.mark_program(
872 &mut machine.machine,
873 &metadata,
874 &location.data_piece_id,
875 location.offset,
876 )?;
877 machine
878 .machine
879 .add_cycles_no_checking(transferred_byte_cycles(bytes))?;
880 Ok(bytes)
881 }
882
883 fn create_dummy_vm(&self, id: &VmId) -> Result<(VmContext<DL>, Machine), Error> {
885 let version = &self.sg_data.sg_info.script_version;
889 let core_machine = CoreMachineType::new(
890 version.vm_isa(),
891 version.vm_version(),
892 u64::MAX,
894 );
895 let vm_context = VmContext {
896 base_cycles: Arc::clone(&self.total_cycles),
897 message_box: Arc::clone(&self.message_box),
898 snapshot2_context: Arc::new(Mutex::new(Snapshot2Context::new(self.sg_data.clone()))),
899 };
900
901 let machine_builder = DefaultMachineBuilder::new(core_machine)
902 .instruction_cycle_func(Box::new(estimate_cycles));
903 let machine_builder =
904 generate_ckb_syscalls(id, &self.sg_data, &vm_context, &self.debug_context)
905 .into_iter()
906 .fold(machine_builder, |builder, syscall| builder.syscall(syscall));
907 let default_machine = machine_builder.build();
908 Ok((vm_context, Machine::new(default_machine)))
909 }
910}