Skip to main content

ckb_script/
scheduler.rs

1use crate::cost_model::transferred_byte_cycles;
2use crate::syscalls::{
3    EXEC_LOAD_ELF_V2_CYCLES_BASE, INVALID_FD, MAX_FDS_CREATED, MAX_VMS_SPAWNED, OTHER_END_CLOSED,
4    SPAWN_EXTRA_CYCLES_BASE, SUCCESS, WAIT_FAILURE,
5};
6
7use crate::types::{
8    DataLocation, DataPieceId, FIRST_FD_SLOT, FIRST_VM_ID, Fd, FdArgs, FullSuspendedState,
9    IterationResult, Message, ReadState, RunMode, SgData, SyscallGenerator, TerminatedResult,
10    VmArgs, VmContext, VmId, VmState, WriteState,
11};
12use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider};
13use ckb_types::core::Cycle;
14use ckb_vm::snapshot2::Snapshot2Context;
15use ckb_vm::{
16    Error, FlattenedArgsReader, Register,
17    bytes::Bytes,
18    cost_model::estimate_cycles,
19    elf::parse_elf,
20    machine::{CoreMachine, DefaultMachineBuilder, DefaultMachineRunner, Pause, SupportMachine},
21    memory::Memory,
22    registers::A0,
23    snapshot2::Snapshot2,
24};
25use std::collections::{BTreeMap, HashMap};
26use std::sync::{
27    Arc, Mutex,
28    atomic::{AtomicU64, Ordering},
29};
30
31/// Root process's id.
32pub const ROOT_VM_ID: VmId = FIRST_VM_ID;
/// Upper bound on the number of VMs that may exist at the same time.
pub const MAX_VMS_COUNT: u64 = 16;
/// Upper bound on the number of VMs kept instantiated (rather than
/// suspended) at once.
pub const MAX_INSTANTIATED_VMS: usize = 4;
/// Upper bound on the number of file descriptors that may be open at once.
pub const MAX_FDS: u64 = 64;
39
40/// A single Scheduler instance is used to verify a single script
41/// within a CKB transaction.
42///
43/// A scheduler holds & manipulates a core, the scheduler also holds
44/// all CKB-VM machines, each CKB-VM machine also gets a mutable reference
45/// of the core for IO operations.
46pub struct Scheduler<DL, V, M>
47where
48    DL: CellDataProvider,
49    M: DefaultMachineRunner,
50{
51    /// Immutable context data for current running transaction & script.
52    sg_data: SgData<DL>,
53
54    /// Syscall generator
55    syscall_generator: SyscallGenerator<DL, V, M::Inner>,
56    /// Syscall generator context
57    syscall_context: V,
58
59    /// Total cycles. When a scheduler executes, there are 3 variables
60    /// that might all contain charged cycles: +total_cycles+,
61    /// +iteration_cycles+ and +machine.cycles()+ from the current
62    /// executing virtual machine. At any given time, the sum of all 3
63    /// variables here, represent the total consumed cycles by the current
64    /// scheduler.
65    /// But there are also exceptions: at certain period of time, the cycles
66    /// stored in `machine.cycles()` are moved over to +iteration_cycles+,
67    /// the cycles stored in +iteration_cycles+ would also be moved over to
68    /// +total_cycles+:
69    ///
70    /// * The current running virtual machine would contain consumed
71    ///   cycles in its own machine.cycles() structure.
72    /// * +iteration_cycles+ holds the current consumed cycles each time
73    ///   we executed a virtual machine(also named an iteration). It will
74    ///   always be zero before each iteration(i.e., before each VM starts
75    ///   execution). When a virtual machine finishes execution, the cycles
76    ///   stored in `machine.cycles()` will be moved over to +iteration_cycles+.
77    ///   `machine.cycles()` will then be reset to zero.
78    /// * Processing messages in the message box would alao charge cycles
79    ///   for operations, such as suspending/resuming VMs, transferring data
80    ///   etc. Those cycles were added to +iteration_cycles+ directly. When all
81    ///   postprocessing work is completed, the cycles consumed in
82    ///   +iteration_cycles+ will then be moved to +total_cycles+.
83    ///   +iteration_cycles+ will then be reset to zero.
84    ///
85    /// One can consider that +total_cycles+ contains the total cycles
86    /// consumed in current scheduler, when the scheduler is not busy executing.
87    ///
88    /// NOTE: the above workflow describes the optimal case: `iteration_cycles`
89    /// will always be zero after each iteration. However, our initial implementation
90    /// for Meepo hardfork contains a bug: cycles charged by suspending / resuming
91    /// VMs when processing IOs, will not be reflected in `current cycles` syscalls
92    /// of the subsequent running VMs. To preserve this behavior, consumed cycles in
93    /// iteration_cycles cannot be moved at iterate boundaries. Later hardfork versions
94    /// might fix this, but for the Meepo hardfork, we will have to preserve this behavior.
95    total_cycles: Arc<AtomicU64>,
96    /// Iteration cycles, see +total_cycles+ on its usage
97    iteration_cycles: Cycle,
98    /// Next vm id used by spawn.
99    next_vm_id: VmId,
100    /// Next fd used by pipe.
101    next_fd_slot: u64,
102    /// Used to store VM state.
103    states: BTreeMap<VmId, VmState>,
104    /// Used to confirm the owner of fd.
105    fds: BTreeMap<Fd, VmId>,
106    /// Verify the VM's inherited fd list.
107    inherited_fd: BTreeMap<VmId, Vec<Fd>>,
108    /// Instantiated vms.
109    instantiated: BTreeMap<VmId, (VmContext<DL>, M)>,
110    /// Suspended vms.
111    suspended: BTreeMap<VmId, Snapshot2<DataPieceId>>,
112    /// Terminated vms.
113    terminated_vms: BTreeMap<VmId, i8>,
114    /// Root vm's arguments. Provided for compatibility with surrounding tools. You should not
115    /// read it anywhere except when initializing the root vm.
116    /// Note: This field is intentionally not serialized in FullSuspendedState.
117    root_vm_args: Vec<Bytes>,
118
119    /// MessageBox is expected to be empty before returning from `run`
120    /// function, there is no need to persist messages.
121    message_box: Arc<Mutex<Vec<Message>>>,
122}
123
124impl<DL, V, M> Scheduler<DL, V, M>
125where
126    DL: CellDataProvider + HeaderProvider + ExtensionProvider + Clone,
127    V: Clone,
128    M: DefaultMachineRunner,
129{
130    /// Create a new scheduler from empty state
131    pub fn new(
132        sg_data: SgData<DL>,
133        syscall_generator: SyscallGenerator<DL, V, M::Inner>,
134        syscall_context: V,
135    ) -> Self {
136        Self {
137            sg_data,
138            syscall_generator,
139            syscall_context,
140            total_cycles: Arc::new(AtomicU64::new(0)),
141            iteration_cycles: 0,
142            next_vm_id: FIRST_VM_ID,
143            next_fd_slot: FIRST_FD_SLOT,
144            states: BTreeMap::default(),
145            fds: BTreeMap::default(),
146            inherited_fd: BTreeMap::default(),
147            instantiated: BTreeMap::default(),
148            suspended: BTreeMap::default(),
149            message_box: Arc::new(Mutex::new(Vec::new())),
150            terminated_vms: BTreeMap::default(),
151            root_vm_args: Vec::new(),
152        }
153    }
154
155    /// Return total cycles.
156    pub fn consumed_cycles(&self) -> Cycle {
157        self.total_cycles.load(Ordering::Acquire)
158    }
159
160    /// Fetch specified VM state
161    pub fn state(&self, vm_id: &VmId) -> Option<VmState> {
162        self.states.get(vm_id).cloned()
163    }
164
165    /// Access the SgData data structure
166    pub fn sg_data(&self) -> &SgData<DL> {
167        &self.sg_data
168    }
169
170    /// This function provides a peek into one of the current created
171    /// VM. Depending on the actual state, the VM might either be instantiated
172    /// or suspended. As a result, 2 callback functions must be provided to handle
173    /// both cases. The function only provides a *peek*, meaning the caller must
174    /// not make any changes to an instantiated VMs. the VM is passed as a mutable
175    /// reference only because memory load functions in CKB-VM require mutable
176    /// references. It does not mean the caller can modify the VM in any sense.
177    /// Even a slight tampering of the VM can result in non-determinism.
178    pub fn peek<F, G, W>(&mut self, vm_id: &VmId, mut f: F, mut g: G) -> Result<W, Error>
179    where
180        F: FnMut(&mut M) -> Result<W, Error>,
181        G: FnMut(&Snapshot2<DataPieceId>, &SgData<DL>) -> Result<W, Error>,
182    {
183        if let Some((_, machine)) = self.instantiated.get_mut(vm_id) {
184            return f(machine);
185        }
186        if let Some(snapshot) = self.suspended.get(vm_id) {
187            return g(snapshot, &self.sg_data);
188        }
189        Err(Error::Unexpected(format!("VM {} does not exist!", vm_id)))
190    }
191
192    /// Add cycles to total cycles.
193    fn consume_cycles(&mut self, cycles: Cycle) -> Result<(), Error> {
194        match self
195            .total_cycles
196            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |total_cycles| {
197                total_cycles.checked_add(cycles)
198            }) {
199            Ok(_) => Ok(()),
200            Err(_) => Err(Error::CyclesExceeded),
201        }
202    }
203
204    /// Resume a previously suspended scheduler state
205    pub fn resume(
206        sg_data: SgData<DL>,
207        syscall_generator: SyscallGenerator<DL, V, M::Inner>,
208        syscall_context: V,
209        full: FullSuspendedState,
210    ) -> Self {
211        let mut scheduler = Self {
212            sg_data,
213            syscall_generator,
214            syscall_context,
215            total_cycles: Arc::new(AtomicU64::new(full.total_cycles)),
216            iteration_cycles: full.iteration_cycles,
217            next_vm_id: full.next_vm_id,
218            next_fd_slot: full.next_fd_slot,
219            states: full
220                .vms
221                .iter()
222                .map(|(id, state, _)| (*id, state.clone()))
223                .collect(),
224            fds: full.fds.into_iter().collect(),
225            inherited_fd: full.inherited_fd.into_iter().collect(),
226            instantiated: BTreeMap::default(),
227            suspended: full
228                .vms
229                .into_iter()
230                .map(|(id, _, snapshot)| (id, snapshot))
231                .collect(),
232            message_box: Arc::new(Mutex::new(Vec::new())),
233            terminated_vms: full.terminated_vms.into_iter().collect(),
234            root_vm_args: Vec::new(),
235        };
236        scheduler
237            .ensure_vms_instantiated(&full.instantiated_ids)
238            .unwrap();
239        // NOTE: suspending/resuming a scheduler is part of CKB's implementation
240        // details. It is not part of execution consensue. We should not charge
241        // cycles for them.
242        scheduler.iteration_cycles = 0;
243        scheduler
244    }
245
246    /// Suspend current scheduler into a serializable full state
247    pub fn suspend(mut self) -> Result<FullSuspendedState, Error> {
248        assert!(self.message_box.lock().expect("lock").is_empty());
249        let mut vms = Vec::with_capacity(self.states.len());
250        let instantiated_ids: Vec<_> = self.instantiated.keys().cloned().collect();
251        for id in &instantiated_ids {
252            self.suspend_vm(id)?;
253        }
254        for (id, state) in self.states {
255            let snapshot = self
256                .suspended
257                .remove(&id)
258                .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
259            vms.push((id, state, snapshot));
260        }
261        Ok(FullSuspendedState {
262            // NOTE: suspending a scheduler is actually part of CKB's
263            // internal execution logic, it does not belong to VM execution
264            // consensus. We are not charging cycles for suspending
265            // a VM in the process of suspending the whole scheduler.
266            total_cycles: self.total_cycles.load(Ordering::Acquire),
267            iteration_cycles: self.iteration_cycles,
268            next_vm_id: self.next_vm_id,
269            next_fd_slot: self.next_fd_slot,
270            vms,
271            fds: self.fds.into_iter().collect(),
272            inherited_fd: self.inherited_fd.into_iter().collect(),
273            terminated_vms: self.terminated_vms.into_iter().collect(),
274            instantiated_ids,
275        })
276    }
277
278    /// This is the only entrypoint for running the scheduler,
279    /// both newly created instance and resumed instance are supported.
280    /// It accepts 2 run mode, one can either limit the cycles to execute,
281    /// or use a pause signal to trigger termination.
282    ///
283    /// Only when the execution terminates without VM errors, will this
284    /// function return an exit code(could still be non-zero) and total
285    /// consumed cycles.
286    ///
287    /// Err would be returned in the following cases:
288    /// * Cycle limit reached, the returned error would be ckb_vm::Error::CyclesExceeded,
289    /// * Pause trigger, the returned error would be ckb_vm::Error::Pause,
290    /// * Other terminating errors
291    pub fn run(&mut self, mode: RunMode) -> Result<TerminatedResult, Error> {
292        self.boot_root_vm_if_needed()?;
293
294        let (pause, mut limit_cycles) = match mode {
295            RunMode::LimitCycles(limit_cycles) => (Pause::new(), limit_cycles),
296            RunMode::Pause(pause, limit_cycles) => (pause, limit_cycles),
297        };
298
299        while !self.terminated() {
300            limit_cycles = self.iterate_outer(&pause, limit_cycles)?.1;
301        }
302        assert_eq!(self.iteration_cycles, 0);
303
304        self.terminated_result()
305    }
306
307    /// Public API that runs a single VM, processes all messages, then returns the
308    /// executed VM ID(so caller can fetch later data). This can be used when more
309    /// finer tweaks are required for a single VM.
310    pub fn iterate(&mut self) -> Result<IterationResult, Error> {
311        self.boot_root_vm_if_needed()?;
312
313        if self.terminated() {
314            return Ok(IterationResult {
315                executed_vm: ROOT_VM_ID,
316                terminated_status: Some(self.terminated_result()?),
317            });
318        }
319
320        let (id, _) = self.iterate_outer(&Pause::new(), u64::MAX)?;
321        let terminated_status = if self.terminated() {
322            assert_eq!(self.iteration_cycles, 0);
323            Some(self.terminated_result()?)
324        } else {
325            None
326        };
327
328        Ok(IterationResult {
329            executed_vm: id,
330            terminated_status,
331        })
332    }
333
334    /// Returns the machine that needs to be executed in the current iterate.
335    fn iterate_prepare_machine(&mut self) -> Result<(u64, &mut M), Error> {
336        // Find a runnable VM that has the largest ID.
337        let vm_id_to_run = self
338            .states
339            .iter()
340            .rev()
341            .filter(|(_, state)| matches!(state, VmState::Runnable))
342            .map(|(id, _)| *id)
343            .next();
344        let vm_id_to_run = vm_id_to_run.ok_or_else(|| {
345            Error::Unexpected("A deadlock situation has been reached!".to_string())
346        })?;
347        let (_context, machine) = self.ensure_get_instantiated(&vm_id_to_run)?;
348        Ok((vm_id_to_run, machine))
349    }
350
351    /// Process machine execution results in the current iterate.
352    fn iterate_process_results(
353        &mut self,
354        vm_id_to_run: u64,
355        result: Result<i8, Error>,
356    ) -> Result<(), Error> {
357        // Process message box, update VM states accordingly
358        self.process_message_box()?;
359        assert!(self.message_box.lock().expect("lock").is_empty());
360        // If the VM terminates, update VMs in join state, also closes its fds
361
362        match result {
363            Ok(code) => {
364                self.terminated_vms.insert(vm_id_to_run, code);
365                // When root VM terminates, the execution stops immediately, we will purge
366                // all non-root VMs, and only keep root VM in states.
367                // When non-root VM terminates, we only purge the VM's own states.
368                if vm_id_to_run == ROOT_VM_ID {
369                    self.ensure_vms_instantiated(&[vm_id_to_run])?;
370                    self.instantiated.retain(|id, _| *id == vm_id_to_run);
371                    self.suspended.clear();
372                    self.states.clear();
373                    self.states.insert(vm_id_to_run, VmState::Terminated);
374                } else {
375                    let joining_vms: Vec<(VmId, u64)> = self
376                        .states
377                        .iter()
378                        .filter_map(|(vm_id, state)| match state {
379                            VmState::Wait {
380                                target_vm_id,
381                                exit_code_addr,
382                            } if *target_vm_id == vm_id_to_run => Some((*vm_id, *exit_code_addr)),
383                            _ => None,
384                        })
385                        .collect();
386                    // For all joining VMs, update exit code, then mark them as
387                    // runnable state.
388                    for (vm_id, exit_code_addr) in joining_vms {
389                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
390                        machine
391                            .inner_mut()
392                            .memory_mut()
393                            .store8(&Self::u64_to_reg(exit_code_addr), &Self::i8_to_reg(code))?;
394                        machine
395                            .inner_mut()
396                            .set_register(A0, Self::u8_to_reg(SUCCESS));
397                        self.states.insert(vm_id, VmState::Runnable);
398                    }
399                    // Close fds
400                    self.fds.retain(|_, vm_id| *vm_id != vm_id_to_run);
401                    // Clear terminated VM states
402                    self.states.remove(&vm_id_to_run);
403                    self.instantiated.remove(&vm_id_to_run);
404                    self.suspended.remove(&vm_id_to_run);
405                }
406                Ok(())
407            }
408            Err(Error::Yield) => Ok(()),
409            Err(e) => Err(e),
410        }
411    }
412
413    // This internal function is actually a wrapper over +iterate_inner+,
414    // it is split into a different function, so cycle calculation will be
415    // executed no matter what result +iterate_inner+ returns.
416    #[inline]
417    fn iterate_outer(
418        &mut self,
419        pause: &Pause,
420        limit_cycles: Cycle,
421    ) -> Result<(VmId, Cycle), Error> {
422        let iterate_return = self.iterate_inner(pause.clone(), limit_cycles);
423        self.consume_cycles(self.iteration_cycles)?;
424        let remaining_cycles = limit_cycles
425            .checked_sub(self.iteration_cycles)
426            .ok_or(Error::CyclesExceeded)?;
427        // Clear iteration cycles intentionally after each run
428        self.iteration_cycles = 0;
429        // Process all pending VM reads & writes. Notice ideally, this invocation
430        // should be put at the end of `iterate_inner` function. However, 2 things
431        // prevent this:
432        //
433        // * In earlier implementation of the Meepo hardfork version, `self.process_io`
434        // was put at the very start of +iterate_prepare_machine+ method. Meaning we used
435        // to process IO syscalls at the very start of a new iteration.
436        // * Earlier implementation contains a bug that cycles consumed by suspending / resuming
437        // VMs are not updated in the subsequent VM's `current cycles` syscalls.
438        //
439        // To make ckb-script package suitable for outside usage, we want IOs processed at
440        // the end of each iteration, not at the start of the next iteration. We also need
441        // to replicate the exact same runtime behavior of Meepo hardfork. This means the only
442        // viable change will be:
443        //
444        // * Move `self.process_io` call to the very end of `iterate_outer` method, which is
445        // exactly current location
446        // * For now we have to live with the fact that `iteration_cycles` will not always be
447        // zero at iteration boundaries, and also preserve its value in `FullSuspendedState`.
448        //
449        // One expected change is that +process_io+ is now called once more
450        // after the whole scheduler terminates, and not called at the very beginning
451        // when no VM is executing. But since no VMs will be in IO states at this 2 timeslot,
452        // we should be fine here.
453        self.process_io()?;
454        let id = iterate_return?;
455        Ok((id, remaining_cycles))
456    }
457
458    // This is internal function that does the actual VM execution loop.
459    // Here both pause signal and limit_cycles are provided so as to simplify
460    // branches.
461    fn iterate_inner(&mut self, pause: Pause, limit_cycles: Cycle) -> Result<VmId, Error> {
462        // Execute the VM for real, consumed cycles in the virtual machine is
463        // moved over to +iteration_cycles+, then we reset virtual machine's own
464        // cycle count to zero.
465        let (id, result, cycles) = {
466            let (id, vm) = self.iterate_prepare_machine()?;
467            vm.inner_mut().set_max_cycles(limit_cycles);
468            vm.machine_mut().set_pause(pause);
469            let result = vm.run();
470            let cycles = vm.machine().cycles();
471            vm.inner_mut().set_cycles(0);
472            (id, result, cycles)
473        };
474        self.iteration_cycles = self
475            .iteration_cycles
476            .checked_add(cycles)
477            .ok_or(Error::CyclesExceeded)?;
478        self.iterate_process_results(id, result)?;
479        Ok(id)
480    }
481
482    fn process_message_box(&mut self) -> Result<(), Error> {
483        let messages: Vec<Message> = self.message_box.lock().expect("lock").drain(..).collect();
484        for message in messages {
485            match message {
486                Message::ExecV2(vm_id, args) => {
487                    let (old_context, old_machine) = self
488                        .instantiated
489                        .get_mut(&vm_id)
490                        .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
491                    old_machine
492                        .inner_mut()
493                        .add_cycles_no_checking(EXEC_LOAD_ELF_V2_CYCLES_BASE)?;
494                    let old_cycles = old_machine.machine().cycles();
495                    let max_cycles = old_machine.machine().max_cycles();
496                    let program = {
497                        let sc = old_context.snapshot2_context.lock().expect("lock");
498                        sc.load_data(
499                            &args.location.data_piece_id,
500                            args.location.offset,
501                            args.location.length,
502                        )?
503                        .0
504                    };
505                    let (context, mut new_machine) = self.create_dummy_vm(&vm_id)?;
506                    new_machine.inner_mut().set_max_cycles(max_cycles);
507                    new_machine.inner_mut().add_cycles_no_checking(old_cycles)?;
508                    self.load_vm_program(
509                        &context,
510                        &mut new_machine,
511                        &args.location,
512                        program,
513                        VmArgs::Reader {
514                            vm_id,
515                            argc: args.argc,
516                            argv: args.argv,
517                        },
518                    )?;
519                    // The insert operation removes the old vm instance and adds the new vm instance.
520                    debug_assert!(self.instantiated.contains_key(&vm_id));
521                    self.instantiated.insert(vm_id, (context, new_machine));
522                }
523                Message::Spawn(vm_id, args) => {
524                    // All fds must belong to the correct owner
525                    if args.fds.iter().any(|fd| self.fds.get(fd) != Some(&vm_id)) {
526                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
527                        machine
528                            .inner_mut()
529                            .set_register(A0, Self::u8_to_reg(INVALID_FD));
530                        continue;
531                    }
532                    if self.suspended.len() + self.instantiated.len() > MAX_VMS_COUNT as usize {
533                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
534                        machine
535                            .inner_mut()
536                            .set_register(A0, Self::u8_to_reg(MAX_VMS_SPAWNED));
537                        continue;
538                    }
539                    let spawned_vm_id = self.boot_vm(
540                        &args.location,
541                        VmArgs::Reader {
542                            vm_id,
543                            argc: args.argc,
544                            argv: args.argv,
545                        },
546                    )?;
547                    // Move passed fds from spawner to spawnee
548                    for fd in &args.fds {
549                        self.fds.insert(*fd, spawned_vm_id);
550                    }
551                    // Here we keep the original version of file descriptors.
552                    // If one fd is moved afterward, this inherited file descriptors doesn't change.
553                    self.inherited_fd.insert(spawned_vm_id, args.fds.clone());
554
555                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
556                    machine.inner_mut().memory_mut().store64(
557                        &Self::u64_to_reg(args.process_id_addr),
558                        &Self::u64_to_reg(spawned_vm_id),
559                    )?;
560                    machine
561                        .inner_mut()
562                        .set_register(A0, Self::u8_to_reg(SUCCESS));
563                }
564                Message::Wait(vm_id, args) => {
565                    if let Some(exit_code) = self.terminated_vms.get(&args.target_id).copied() {
566                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
567                        machine.inner_mut().memory_mut().store8(
568                            &Self::u64_to_reg(args.exit_code_addr),
569                            &Self::i8_to_reg(exit_code),
570                        )?;
571                        machine
572                            .inner_mut()
573                            .set_register(A0, Self::u8_to_reg(SUCCESS));
574                        self.states.insert(vm_id, VmState::Runnable);
575                        self.terminated_vms.retain(|id, _| id != &args.target_id);
576                        continue;
577                    }
578                    if !self.states.contains_key(&args.target_id) {
579                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
580                        machine
581                            .inner_mut()
582                            .set_register(A0, Self::u8_to_reg(WAIT_FAILURE));
583                        continue;
584                    }
585                    // Return code will be updated when the joining VM exits
586                    self.states.insert(
587                        vm_id,
588                        VmState::Wait {
589                            target_vm_id: args.target_id,
590                            exit_code_addr: args.exit_code_addr,
591                        },
592                    );
593                }
594                Message::Pipe(vm_id, args) => {
595                    if self.fds.len() as u64 >= MAX_FDS {
596                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
597                        machine
598                            .inner_mut()
599                            .set_register(A0, Self::u8_to_reg(MAX_FDS_CREATED));
600                        continue;
601                    }
602                    let (p1, p2, slot) = Fd::create(self.next_fd_slot);
603                    self.next_fd_slot = slot;
604                    self.fds.insert(p1, vm_id);
605                    self.fds.insert(p2, vm_id);
606                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
607                    machine
608                        .inner_mut()
609                        .memory_mut()
610                        .store64(&Self::u64_to_reg(args.fd1_addr), &Self::u64_to_reg(p1.0))?;
611                    machine
612                        .inner_mut()
613                        .memory_mut()
614                        .store64(&Self::u64_to_reg(args.fd2_addr), &Self::u64_to_reg(p2.0))?;
615                    machine
616                        .inner_mut()
617                        .set_register(A0, Self::u8_to_reg(SUCCESS));
618                }
619                Message::FdRead(vm_id, args) => {
620                    if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
621                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
622                        machine
623                            .inner_mut()
624                            .set_register(A0, Self::u8_to_reg(INVALID_FD));
625                        continue;
626                    }
627                    if !self.fds.contains_key(&args.fd.other_fd()) {
628                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
629                        machine
630                            .inner_mut()
631                            .set_register(A0, Self::u8_to_reg(OTHER_END_CLOSED));
632                        continue;
633                    }
634                    // Return code will be updated when the read operation finishes
635                    self.states.insert(
636                        vm_id,
637                        VmState::WaitForRead(ReadState {
638                            fd: args.fd,
639                            length: args.length,
640                            buffer_addr: args.buffer_addr,
641                            length_addr: args.length_addr,
642                        }),
643                    );
644                }
645                Message::FdWrite(vm_id, args) => {
646                    if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
647                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
648                        machine
649                            .inner_mut()
650                            .set_register(A0, Self::u8_to_reg(INVALID_FD));
651                        continue;
652                    }
653                    if !self.fds.contains_key(&args.fd.other_fd()) {
654                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
655                        machine
656                            .inner_mut()
657                            .set_register(A0, Self::u8_to_reg(OTHER_END_CLOSED));
658                        continue;
659                    }
660                    // Return code will be updated when the write operation finishes
661                    self.states.insert(
662                        vm_id,
663                        VmState::WaitForWrite(WriteState {
664                            fd: args.fd,
665                            consumed: 0,
666                            length: args.length,
667                            buffer_addr: args.buffer_addr,
668                            length_addr: args.length_addr,
669                        }),
670                    );
671                }
672                Message::InheritedFileDescriptor(vm_id, args) => {
673                    let inherited_fd = if vm_id == ROOT_VM_ID {
674                        Vec::new()
675                    } else {
676                        self.inherited_fd[&vm_id].clone()
677                    };
678                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
679                    let FdArgs {
680                        buffer_addr,
681                        length_addr,
682                        ..
683                    } = args;
684                    let full_length = machine
685                        .inner_mut()
686                        .memory_mut()
687                        .load64(&Self::u64_to_reg(length_addr))?
688                        .to_u64();
689                    let real_length = inherited_fd.len() as u64;
690                    let copy_length = u64::min(full_length, real_length);
691                    for i in 0..copy_length {
692                        let fd = inherited_fd[i as usize].0;
693                        let offset = i.checked_mul(8).ok_or(Error::MemOutOfBound)?;
694                        let addr = buffer_addr
695                            .checked_add(offset)
696                            .ok_or(Error::MemOutOfBound)?;
697                        machine
698                            .inner_mut()
699                            .memory_mut()
700                            .store64(&Self::u64_to_reg(addr), &Self::u64_to_reg(fd))?;
701                    }
702                    machine.inner_mut().memory_mut().store64(
703                        &Self::u64_to_reg(length_addr),
704                        &Self::u64_to_reg(real_length),
705                    )?;
706                    machine
707                        .inner_mut()
708                        .set_register(A0, Self::u8_to_reg(SUCCESS));
709                }
710                Message::Close(vm_id, fd) => {
711                    if self.fds.get(&fd) != Some(&vm_id) {
712                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
713                        machine
714                            .inner_mut()
715                            .set_register(A0, Self::u8_to_reg(INVALID_FD));
716                    } else {
717                        self.fds.remove(&fd);
718                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
719                        machine
720                            .inner_mut()
721                            .set_register(A0, Self::u8_to_reg(SUCCESS));
722                    }
723                }
724            }
725        }
726        Ok(())
727    }
728
729    fn process_io(&mut self) -> Result<(), Error> {
730        let mut reads: HashMap<Fd, (VmId, ReadState)> = HashMap::default();
731        let mut closed_fds: Vec<VmId> = Vec::new();
732
733        self.states.iter().for_each(|(vm_id, state)| {
734            if let VmState::WaitForRead(inner_state) = state {
735                if self.fds.contains_key(&inner_state.fd.other_fd()) {
736                    reads.insert(inner_state.fd, (*vm_id, inner_state.clone()));
737                } else {
738                    closed_fds.push(*vm_id);
739                }
740            }
741        });
742        let mut pairs: Vec<(VmId, ReadState, VmId, WriteState)> = Vec::new();
743        self.states.iter().for_each(|(vm_id, state)| {
744            if let VmState::WaitForWrite(inner_state) = state {
745                if self.fds.contains_key(&inner_state.fd.other_fd()) {
746                    if let Some((read_vm_id, read_state)) = reads.get(&inner_state.fd.other_fd()) {
747                        pairs.push((*read_vm_id, read_state.clone(), *vm_id, inner_state.clone()));
748                    }
749                } else {
750                    closed_fds.push(*vm_id);
751                }
752            }
753        });
754        // Finish read / write syscalls for fds that are closed on the other end
755        for vm_id in closed_fds {
756            match self.states[&vm_id].clone() {
757                VmState::WaitForRead(ReadState { length_addr, .. }) => {
758                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
759                    machine.inner_mut().memory_mut().store64(
760                        &Self::u64_to_reg(length_addr),
761                        &<M::Inner as CoreMachine>::REG::zero(),
762                    )?;
763                    machine
764                        .inner_mut()
765                        .set_register(A0, Self::u8_to_reg(SUCCESS));
766                    self.states.insert(vm_id, VmState::Runnable);
767                }
768                VmState::WaitForWrite(WriteState {
769                    consumed,
770                    length_addr,
771                    ..
772                }) => {
773                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
774                    machine
775                        .inner_mut()
776                        .memory_mut()
777                        .store64(&Self::u64_to_reg(length_addr), &Self::u64_to_reg(consumed))?;
778                    machine
779                        .inner_mut()
780                        .set_register(A0, Self::u8_to_reg(SUCCESS));
781                    self.states.insert(vm_id, VmState::Runnable);
782                }
783                _ => (),
784            }
785        }
786        // Transferring data from write fds to read fds
787        for (read_vm_id, read_state, write_vm_id, write_state) in pairs {
788            let ReadState {
789                length: read_length,
790                buffer_addr: read_buffer_addr,
791                length_addr: read_length_addr,
792                ..
793            } = read_state;
794            let WriteState {
795                fd: write_fd,
796                mut consumed,
797                length: write_length,
798                buffer_addr: write_buffer_addr,
799                length_addr: write_length_addr,
800            } = write_state;
801
802            self.ensure_vms_instantiated(&[read_vm_id, write_vm_id])?;
803            {
804                let fillable = read_length;
805                let consumable = write_length - consumed;
806                let copiable = std::cmp::min(fillable, consumable);
807
808                // Actual data copying
809                let (_, write_machine) = self
810                    .instantiated
811                    .get_mut(&write_vm_id)
812                    .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
813                write_machine
814                    .inner_mut()
815                    .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
816                let data = write_machine.inner_mut().memory_mut().load_bytes(
817                    write_buffer_addr
818                        .checked_add(consumed)
819                        .ok_or(Error::MemOutOfBound)?,
820                    copiable,
821                )?;
822                let (_, read_machine) = self
823                    .instantiated
824                    .get_mut(&read_vm_id)
825                    .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
826                read_machine
827                    .inner_mut()
828                    .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
829                read_machine
830                    .inner_mut()
831                    .memory_mut()
832                    .store_bytes(read_buffer_addr, &data)?;
833                // Read syscall terminates as soon as some data are filled
834                read_machine.inner_mut().memory_mut().store64(
835                    &Self::u64_to_reg(read_length_addr),
836                    &Self::u64_to_reg(copiable),
837                )?;
838                read_machine
839                    .inner_mut()
840                    .set_register(A0, Self::u8_to_reg(SUCCESS));
841                self.states.insert(read_vm_id, VmState::Runnable);
842
843                // Write syscall, however, terminates only when all the data
844                // have been written, or when the pairing read fd is closed.
845                consumed += copiable;
846                if consumed == write_length {
847                    // Write VM has fulfilled its write request
848                    let (_, write_machine) = self
849                        .instantiated
850                        .get_mut(&write_vm_id)
851                        .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
852                    write_machine.inner_mut().memory_mut().store64(
853                        &Self::u64_to_reg(write_length_addr),
854                        &Self::u64_to_reg(write_length),
855                    )?;
856                    write_machine
857                        .inner_mut()
858                        .set_register(A0, Self::u8_to_reg(SUCCESS));
859                    self.states.insert(write_vm_id, VmState::Runnable);
860                } else {
861                    // Only update write VM state
862                    self.states.insert(
863                        write_vm_id,
864                        VmState::WaitForWrite(WriteState {
865                            fd: write_fd,
866                            consumed,
867                            length: write_length,
868                            buffer_addr: write_buffer_addr,
869                            length_addr: write_length_addr,
870                        }),
871                    );
872                }
873            }
874        }
875        Ok(())
876    }
877
878    /// If current scheduler is terminated
879    pub fn terminated(&self) -> bool {
880        self.states
881            .get(&ROOT_VM_ID)
882            .map(|state| *state == VmState::Terminated)
883            .unwrap_or(false)
884    }
885
886    fn terminated_result(&mut self) -> Result<TerminatedResult, Error> {
887        assert!(self.terminated());
888
889        let exit_code = {
890            let root_vm = &self.ensure_get_instantiated(&ROOT_VM_ID)?.1;
891            root_vm.machine().exit_code()
892        };
893        Ok(TerminatedResult {
894            exit_code,
895            consumed_cycles: self.consumed_cycles(),
896        })
897    }
898
899    // Ensure VMs are instantiated
900    fn ensure_vms_instantiated(&mut self, ids: &[VmId]) -> Result<(), Error> {
901        if ids.len() > MAX_INSTANTIATED_VMS {
902            return Err(Error::Unexpected(format!(
903                "At most {} VMs can be instantiated but {} are requested!",
904                MAX_INSTANTIATED_VMS,
905                ids.len()
906            )));
907        }
908
909        let mut uninstantiated_ids: Vec<VmId> = ids
910            .iter()
911            .filter(|id| !self.instantiated.contains_key(id))
912            .copied()
913            .collect();
914        while (!uninstantiated_ids.is_empty()) && (self.instantiated.len() < MAX_INSTANTIATED_VMS) {
915            let id = uninstantiated_ids
916                .pop()
917                .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?;
918            self.resume_vm(&id)?;
919        }
920
921        if !uninstantiated_ids.is_empty() {
922            // Instantiated is a BTreeMap, an iterator on it maintains key order to ensure deterministic behavior
923            let suspendable_ids: Vec<VmId> = self
924                .instantiated
925                .keys()
926                .filter(|id| !ids.contains(id))
927                .copied()
928                .collect();
929
930            assert!(suspendable_ids.len() >= uninstantiated_ids.len());
931            for i in 0..uninstantiated_ids.len() {
932                self.suspend_vm(&suspendable_ids[i])?;
933                self.resume_vm(&uninstantiated_ids[i])?;
934            }
935        }
936
937        Ok(())
938    }
939
940    /// Ensure corresponding VM is instantiated and return a mutable reference to it
941    fn ensure_get_instantiated(&mut self, id: &VmId) -> Result<&mut (VmContext<DL>, M), Error> {
942        self.ensure_vms_instantiated(&[*id])?;
943        self.instantiated
944            .get_mut(id)
945            .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))
946    }
947
948    // Resume a suspended VM
949    fn resume_vm(&mut self, id: &VmId) -> Result<(), Error> {
950        if !self.suspended.contains_key(id) {
951            return Err(Error::Unexpected(format!("VM {:?} is not suspended!", id)));
952        }
953        let snapshot = &self.suspended[id];
954        self.iteration_cycles = self
955            .iteration_cycles
956            .checked_add(SPAWN_EXTRA_CYCLES_BASE)
957            .ok_or(Error::CyclesExceeded)?;
958        let (context, mut machine) = self.create_dummy_vm(id)?;
959        {
960            let mut sc = context.snapshot2_context.lock().expect("lock");
961            sc.resume(machine.inner_mut(), snapshot)?;
962        }
963        self.instantiated.insert(*id, (context, machine));
964        self.suspended.remove(id);
965        Ok(())
966    }
967
968    // Suspend an instantiated VM
969    fn suspend_vm(&mut self, id: &VmId) -> Result<(), Error> {
970        if !self.instantiated.contains_key(id) {
971            return Err(Error::Unexpected(format!(
972                "VM {:?} is not instantiated!",
973                id
974            )));
975        }
976        self.iteration_cycles = self
977            .iteration_cycles
978            .checked_add(SPAWN_EXTRA_CYCLES_BASE)
979            .ok_or(Error::CyclesExceeded)?;
980        let (context, machine) = self
981            .instantiated
982            .get_mut(id)
983            .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
984        let snapshot = {
985            let sc = context.snapshot2_context.lock().expect("lock");
986            sc.make_snapshot(machine.inner_mut())?
987        };
988        self.suspended.insert(*id, snapshot);
989        self.instantiated.remove(id);
990        Ok(())
991    }
992
993    fn boot_root_vm_if_needed(&mut self) -> Result<(), Error> {
994        if self.states.is_empty() {
995            // Booting phase, we will need to initialize the first VM.
996            let program_id = self.sg_data.sg_info.program_data_piece_id.clone();
997            assert_eq!(
998                self.boot_vm(
999                    &DataLocation {
1000                        data_piece_id: program_id,
1001                        offset: 0,
1002                        length: u64::MAX,
1003                    },
1004                    VmArgs::Vector(self.root_vm_args.clone()),
1005                )?,
1006                ROOT_VM_ID
1007            );
1008        }
1009        assert!(self.states.contains_key(&ROOT_VM_ID));
1010
1011        Ok(())
1012    }
1013
1014    /// Boot a vm by given program and args.
1015    fn boot_vm(&mut self, location: &DataLocation, args: VmArgs) -> Result<VmId, Error> {
1016        let id = self.next_vm_id;
1017        self.next_vm_id += 1;
1018        let (context, mut machine) = self.create_dummy_vm(&id)?;
1019        let (program, _) = {
1020            let sc = context.snapshot2_context.lock().expect("lock");
1021            sc.load_data(&location.data_piece_id, location.offset, location.length)?
1022        };
1023        self.load_vm_program(&context, &mut machine, location, program, args)?;
1024        // Newly booted VM will be instantiated by default
1025        while self.instantiated.len() >= MAX_INSTANTIATED_VMS {
1026            // Instantiated is a BTreeMap, first_entry will maintain key order
1027            let id = *self
1028                .instantiated
1029                .first_entry()
1030                .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?
1031                .key();
1032            self.suspend_vm(&id)?;
1033        }
1034
1035        self.instantiated.insert(id, (context, machine));
1036        self.states.insert(id, VmState::Runnable);
1037
1038        Ok(id)
1039    }
1040
1041    // Load the program into an empty vm.
1042    fn load_vm_program(
1043        &mut self,
1044        context: &VmContext<DL>,
1045        machine: &mut M,
1046        location: &DataLocation,
1047        program: Bytes,
1048        args: VmArgs,
1049    ) -> Result<u64, Error> {
1050        let metadata = parse_elf::<u64>(&program, machine.inner_mut().version())?;
1051        let bytes = match args {
1052            VmArgs::Reader { vm_id, argc, argv } => {
1053                let (_, machine_from) = self.ensure_get_instantiated(&vm_id)?;
1054                let argc = Self::u64_to_reg(argc);
1055                let argv = Self::u64_to_reg(argv);
1056                let argv =
1057                    FlattenedArgsReader::new(machine_from.inner_mut().memory_mut(), argc, argv);
1058                machine.load_program_with_metadata(&program, &metadata, argv)?
1059            }
1060            VmArgs::Vector(data) => {
1061                machine.load_program_with_metadata(&program, &metadata, data.into_iter().map(Ok))?
1062            }
1063        };
1064        let mut sc = context.snapshot2_context.lock().expect("lock");
1065        sc.mark_program(
1066            machine.inner_mut(),
1067            &metadata,
1068            &location.data_piece_id,
1069            location.offset,
1070        )?;
1071        machine
1072            .inner_mut()
1073            .add_cycles_no_checking(transferred_byte_cycles(bytes))?;
1074        Ok(bytes)
1075    }
1076
1077    // Create a new VM instance with syscalls attached
1078    fn create_dummy_vm(&self, id: &VmId) -> Result<(VmContext<DL>, M), Error> {
1079        let version = &self.sg_data.sg_info.script_version;
1080        let core_machine = M::Inner::new(
1081            version.vm_isa(),
1082            version.vm_version(),
1083            // We will update max_cycles for each machine when it gets a chance to run
1084            u64::MAX,
1085        );
1086        let vm_context = VmContext {
1087            base_cycles: Arc::clone(&self.total_cycles),
1088            message_box: Arc::clone(&self.message_box),
1089            snapshot2_context: Arc::new(Mutex::new(Snapshot2Context::new(self.sg_data.clone()))),
1090        };
1091
1092        let machine_builder = DefaultMachineBuilder::new(core_machine)
1093            .instruction_cycle_func(Box::new(estimate_cycles));
1094        let machine_builder =
1095            (self.syscall_generator)(id, &self.sg_data, &vm_context, &self.syscall_context)
1096                .into_iter()
1097                .fold(machine_builder, |builder, syscall| builder.syscall(syscall));
1098        let default_machine = machine_builder.build();
1099        Ok((vm_context, M::new(default_machine)))
1100    }
1101
1102    /// Provided for compatibility with surrounding tools. Normally, you should not call this
1103    /// function from within the current crate.
1104    pub fn set_root_vm_args(&mut self, args: Vec<Bytes>) {
1105        self.root_vm_args = args;
1106    }
1107
1108    fn i8_to_reg(v: i8) -> <M::Inner as CoreMachine>::REG {
1109        <M::Inner as CoreMachine>::REG::from_i8(v)
1110    }
1111
1112    fn u8_to_reg(v: u8) -> <M::Inner as CoreMachine>::REG {
1113        <M::Inner as CoreMachine>::REG::from_u8(v)
1114    }
1115
1116    fn u64_to_reg(v: u64) -> <M::Inner as CoreMachine>::REG {
1117        <M::Inner as CoreMachine>::REG::from_u64(v)
1118    }
1119}