ckb_script/
scheduler.rs

1use crate::cost_model::transferred_byte_cycles;
2use crate::syscalls::{
3    EXEC_LOAD_ELF_V2_CYCLES_BASE, INVALID_FD, MAX_FDS_CREATED, MAX_VMS_SPAWNED, OTHER_END_CLOSED,
4    SPAWN_EXTRA_CYCLES_BASE, SUCCESS, WAIT_FAILURE,
5};
6
7use crate::types::{
8    DataLocation, DataPieceId, FIRST_FD_SLOT, FIRST_VM_ID, Fd, FdArgs, FullSuspendedState,
9    IterationResult, Message, ReadState, RunMode, SgData, SyscallGenerator, TerminatedResult,
10    VmArgs, VmContext, VmId, VmState, WriteState,
11};
12use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider};
13use ckb_types::core::Cycle;
14use ckb_vm::snapshot2::Snapshot2Context;
15use ckb_vm::{
16    Error, FlattenedArgsReader, Register,
17    bytes::Bytes,
18    cost_model::estimate_cycles,
19    elf::parse_elf,
20    machine::{CoreMachine, DefaultMachineBuilder, DefaultMachineRunner, Pause, SupportMachine},
21    memory::Memory,
22    registers::A0,
23    snapshot2::Snapshot2,
24};
25use std::collections::{BTreeMap, HashMap};
26use std::sync::{
27    Arc, Mutex,
28    atomic::{AtomicU64, Ordering},
29};
30
/// Root process's id. This is an alias of `FIRST_VM_ID`.
pub const ROOT_VM_ID: VmId = FIRST_VM_ID;
/// The maximum number of VMs that can be created at the same time.
/// When a spawn request is processed, this limit is compared against the
/// combined number of instantiated and suspended VMs.
pub const MAX_VMS_COUNT: u64 = 16;
/// The maximum number of instantiated VMs.
// NOTE(review): presumably the remaining VMs are kept as suspended
// snapshots; the enforcing code is not in this part of the file.
pub const MAX_INSTANTIATED_VMS: usize = 4;
/// The maximum number of fds. When a pipe request is processed, this limit
/// is compared against the number of fds currently tracked by the scheduler.
pub const MAX_FDS: u64 = 64;
39
/// A single Scheduler instance is used to verify a single script
/// within a CKB transaction.
///
/// A scheduler holds & manipulates a core, the scheduler also holds
/// all CKB-VM machines, each CKB-VM machine also gets a mutable reference
/// of the core for IO operations.
pub struct Scheduler<DL, V, M>
where
    DL: CellDataProvider,
    M: DefaultMachineRunner,
{
    /// Immutable context data for current running transaction & script.
    sg_data: SgData<DL>,

    /// Syscall generator
    syscall_generator: SyscallGenerator<DL, V, M::Inner>,
    /// Syscall generator context
    syscall_context: V,

    /// Total cycles. When a scheduler executes, there are 3 variables
    /// that might all contain charged cycles: `total_cycles`,
    /// `iteration_cycles` and `machine.cycles()` from the currently
    /// executing virtual machine. At any given time, the sum of all 3
    /// variables represents the total cycles consumed by the current
    /// scheduler.
    /// There are also points in time at which cycles are moved between
    /// them: the cycles stored in `machine.cycles()` are moved over to
    /// `iteration_cycles`, and the cycles stored in `iteration_cycles`
    /// are in turn moved over to `total_cycles`:
    ///
    /// * The currently running virtual machine contains its consumed
    ///   cycles in its own `machine.cycles()` structure.
    /// * `iteration_cycles` holds the cycles consumed each time
    ///   we execute a virtual machine (also named an iteration). It will
    ///   always be zero before each iteration (i.e., before each VM starts
    ///   execution). When a virtual machine finishes execution, the cycles
    ///   stored in `machine.cycles()` will be moved over to `iteration_cycles`.
    ///   `machine.cycles()` will then be reset to zero.
    /// * Processing messages in the message box also charges cycles
    ///   for operations such as suspending/resuming VMs, transferring data
    ///   etc. Those cycles are added to `iteration_cycles` directly. When all
    ///   postprocessing work is completed, the cycles accumulated in
    ///   `iteration_cycles` will then be moved to `total_cycles`.
    ///   `iteration_cycles` will then be reset to zero.
    ///
    /// One can consider that `total_cycles` contains the total cycles
    /// consumed in the current scheduler, when the scheduler is not busy executing.
    ///
    /// NOTE: the above workflow describes the optimal case: `iteration_cycles`
    /// will always be zero after each iteration. However, our initial implementation
    /// for the Meepo hardfork contains a bug: cycles charged by suspending / resuming
    /// VMs when processing IOs will not be reflected in `current cycles` syscalls
    /// of the subsequently running VMs. To preserve this behavior, consumed cycles in
    /// `iteration_cycles` cannot be moved at iteration boundaries. Later hardfork versions
    /// might fix this, but for the Meepo hardfork, we will have to preserve this behavior.
    total_cycles: Arc<AtomicU64>,
    /// Iteration cycles, see `total_cycles` on its usage
    iteration_cycles: Cycle,
    /// Next vm id used by spawn.
    next_vm_id: VmId,
    /// Next fd used by pipe.
    next_fd_slot: u64,
    /// Used to store VM state.
    states: BTreeMap<VmId, VmState>,
    /// Used to confirm the owner of fd.
    fds: BTreeMap<Fd, VmId>,
    /// Verify the VM's inherited fd list.
    inherited_fd: BTreeMap<VmId, Vec<Fd>>,
    /// Instantiated vms.
    instantiated: BTreeMap<VmId, (VmContext<DL>, M)>,
    /// Suspended vms.
    suspended: BTreeMap<VmId, Snapshot2<DataPieceId>>,
    /// Terminated vms, mapped to their exit codes.
    terminated_vms: BTreeMap<VmId, i8>,
    /// Root vm's arguments. Provided for compatibility with surrounding tools. You should not
    /// read it anywhere except when initializing the root vm.
    /// Note: This field is intentionally not serialized in FullSuspendedState.
    root_vm_args: Vec<Bytes>,

    /// MessageBox is expected to be empty before returning from the `run`
    /// function, so there is no need to persist messages.
    message_box: Arc<Mutex<Vec<Message>>>,
}
123
124impl<DL, V, M> Scheduler<DL, V, M>
125where
126    DL: CellDataProvider + HeaderProvider + ExtensionProvider + Clone,
127    V: Clone,
128    M: DefaultMachineRunner,
129{
130    /// Create a new scheduler from empty state
131    pub fn new(
132        sg_data: SgData<DL>,
133        syscall_generator: SyscallGenerator<DL, V, M::Inner>,
134        syscall_context: V,
135    ) -> Self {
136        Self {
137            sg_data,
138            syscall_generator,
139            syscall_context,
140            total_cycles: Arc::new(AtomicU64::new(0)),
141            iteration_cycles: 0,
142            next_vm_id: FIRST_VM_ID,
143            next_fd_slot: FIRST_FD_SLOT,
144            states: BTreeMap::default(),
145            fds: BTreeMap::default(),
146            inherited_fd: BTreeMap::default(),
147            instantiated: BTreeMap::default(),
148            suspended: BTreeMap::default(),
149            message_box: Arc::new(Mutex::new(Vec::new())),
150            terminated_vms: BTreeMap::default(),
151            root_vm_args: Vec::new(),
152        }
153    }
154
155    /// Return total cycles.
156    pub fn consumed_cycles(&self) -> Cycle {
157        self.total_cycles.load(Ordering::Acquire)
158    }
159
160    /// Fetch specified VM state
161    pub fn state(&self, vm_id: &VmId) -> Option<VmState> {
162        self.states.get(vm_id).cloned()
163    }
164
    /// Access the SgData data structure.
    ///
    /// Returns a shared reference to the `sg_data` field held by this scheduler.
    pub fn sg_data(&self) -> &SgData<DL> {
        &self.sg_data
    }
169
170    /// This function provides a peek into one of the current created
171    /// VM. Depending on the actual state, the VM might either be instantiated
172    /// or suspended. As a result, 2 callback functions must be provided to handle
173    /// both cases. The function only provides a *peek*, meaning the caller must
174    /// not make any changes to an instantiated VMs. the VM is passed as a mutable
175    /// reference only because memory load functions in CKB-VM require mutable
176    /// references. It does not mean the caller can modify the VM in any sense.
177    /// Even a slight tampering of the VM can result in non-determinism.
178    pub fn peek<F, G, W>(&mut self, vm_id: &VmId, mut f: F, mut g: G) -> Result<W, Error>
179    where
180        F: FnMut(&mut M) -> Result<W, Error>,
181        G: FnMut(&Snapshot2<DataPieceId>, &SgData<DL>) -> Result<W, Error>,
182    {
183        if let Some((_, machine)) = self.instantiated.get_mut(vm_id) {
184            return f(machine);
185        }
186        if let Some(snapshot) = self.suspended.get(vm_id) {
187            return g(snapshot, &self.sg_data);
188        }
189        Err(Error::Unexpected(format!("VM {} does not exist!", vm_id)))
190    }
191
192    /// Add cycles to total cycles.
193    fn consume_cycles(&mut self, cycles: Cycle) -> Result<(), Error> {
194        match self
195            .total_cycles
196            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |total_cycles| {
197                total_cycles.checked_add(cycles)
198            }) {
199            Ok(_) => Ok(()),
200            Err(_) => Err(Error::CyclesExceeded),
201        }
202    }
203
204    /// Resume a previously suspended scheduler state
205    pub fn resume(
206        sg_data: SgData<DL>,
207        syscall_generator: SyscallGenerator<DL, V, M::Inner>,
208        syscall_context: V,
209        full: FullSuspendedState,
210    ) -> Self {
211        let mut scheduler = Self {
212            sg_data,
213            syscall_generator,
214            syscall_context,
215            total_cycles: Arc::new(AtomicU64::new(full.total_cycles)),
216            iteration_cycles: full.iteration_cycles,
217            next_vm_id: full.next_vm_id,
218            next_fd_slot: full.next_fd_slot,
219            states: full
220                .vms
221                .iter()
222                .map(|(id, state, _)| (*id, state.clone()))
223                .collect(),
224            fds: full.fds.into_iter().collect(),
225            inherited_fd: full.inherited_fd.into_iter().collect(),
226            instantiated: BTreeMap::default(),
227            suspended: full
228                .vms
229                .into_iter()
230                .map(|(id, _, snapshot)| (id, snapshot))
231                .collect(),
232            message_box: Arc::new(Mutex::new(Vec::new())),
233            terminated_vms: full.terminated_vms.into_iter().collect(),
234            root_vm_args: Vec::new(),
235        };
236        scheduler
237            .ensure_vms_instantiated(&full.instantiated_ids)
238            .unwrap();
239        // NOTE: suspending/resuming a scheduler is part of CKB's implementation
240        // details. It is not part of execution consensue. We should not charge
241        // cycles for them.
242        scheduler.iteration_cycles = 0;
243        scheduler
244    }
245
246    /// Suspend current scheduler into a serializable full state
247    pub fn suspend(mut self) -> Result<FullSuspendedState, Error> {
248        assert!(self.message_box.lock().expect("lock").is_empty());
249        let mut vms = Vec::with_capacity(self.states.len());
250        let instantiated_ids: Vec<_> = self.instantiated.keys().cloned().collect();
251        for id in &instantiated_ids {
252            self.suspend_vm(id)?;
253        }
254        for (id, state) in self.states {
255            let snapshot = self
256                .suspended
257                .remove(&id)
258                .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
259            vms.push((id, state, snapshot));
260        }
261        Ok(FullSuspendedState {
262            // NOTE: suspending a scheduler is actually part of CKB's
263            // internal execution logic, it does not belong to VM execution
264            // consensus. We are not charging cycles for suspending
265            // a VM in the process of suspending the whole scheduler.
266            total_cycles: self.total_cycles.load(Ordering::Acquire),
267            iteration_cycles: self.iteration_cycles,
268            next_vm_id: self.next_vm_id,
269            next_fd_slot: self.next_fd_slot,
270            vms,
271            fds: self.fds.into_iter().collect(),
272            inherited_fd: self.inherited_fd.into_iter().collect(),
273            terminated_vms: self.terminated_vms.into_iter().collect(),
274            instantiated_ids,
275        })
276    }
277
278    /// This is the only entrypoint for running the scheduler,
279    /// both newly created instance and resumed instance are supported.
280    /// It accepts 2 run mode, one can either limit the cycles to execute,
281    /// or use a pause signal to trigger termination.
282    ///
283    /// Only when the execution terminates without VM errors, will this
284    /// function return an exit code(could still be non-zero) and total
285    /// consumed cycles.
286    ///
287    /// Err would be returned in the following cases:
288    /// * Cycle limit reached, the returned error would be ckb_vm::Error::CyclesExceeded,
289    /// * Pause trigger, the returned error would be ckb_vm::Error::Pause,
290    /// * Other terminating errors
291    pub fn run(&mut self, mode: RunMode) -> Result<TerminatedResult, Error> {
292        self.boot_root_vm_if_needed()?;
293
294        let (pause, mut limit_cycles) = match mode {
295            RunMode::LimitCycles(limit_cycles) => (Pause::new(), limit_cycles),
296            RunMode::Pause(pause, limit_cycles) => (pause, limit_cycles),
297        };
298
299        while !self.terminated() {
300            limit_cycles = self.iterate_outer(&pause, limit_cycles)?.1;
301        }
302        assert_eq!(self.iteration_cycles, 0);
303
304        self.terminated_result()
305    }
306
307    /// Public API that runs a single VM, processes all messages, then returns the
308    /// executed VM ID(so caller can fetch later data). This can be used when more
309    /// finer tweaks are required for a single VM.
310    pub fn iterate(&mut self) -> Result<IterationResult, Error> {
311        self.boot_root_vm_if_needed()?;
312
313        if self.terminated() {
314            return Ok(IterationResult {
315                executed_vm: ROOT_VM_ID,
316                terminated_status: Some(self.terminated_result()?),
317            });
318        }
319
320        let (id, _) = self.iterate_outer(&Pause::new(), u64::MAX)?;
321        let terminated_status = if self.terminated() {
322            assert_eq!(self.iteration_cycles, 0);
323            Some(self.terminated_result()?)
324        } else {
325            None
326        };
327
328        Ok(IterationResult {
329            executed_vm: id,
330            terminated_status,
331        })
332    }
333
334    /// Returns the machine that needs to be executed in the current iterate.
335    fn iterate_prepare_machine(&mut self) -> Result<(u64, &mut M), Error> {
336        // Find a runnable VM that has the largest ID.
337        let vm_id_to_run = self
338            .states
339            .iter()
340            .rev()
341            .filter(|(_, state)| matches!(state, VmState::Runnable))
342            .map(|(id, _)| *id)
343            .next();
344        let vm_id_to_run = vm_id_to_run.ok_or_else(|| {
345            Error::Unexpected("A deadlock situation has been reached!".to_string())
346        })?;
347        let (_context, machine) = self.ensure_get_instantiated(&vm_id_to_run)?;
348        Ok((vm_id_to_run, machine))
349    }
350
351    /// Process machine execution results in the current iterate.
352    fn iterate_process_results(
353        &mut self,
354        vm_id_to_run: u64,
355        result: Result<i8, Error>,
356    ) -> Result<(), Error> {
357        // Process message box, update VM states accordingly
358        self.process_message_box()?;
359        assert!(self.message_box.lock().expect("lock").is_empty());
360        // If the VM terminates, update VMs in join state, also closes its fds
361        let result = match result {
362            Ok(code) => {
363                self.terminated_vms.insert(vm_id_to_run, code);
364                // When root VM terminates, the execution stops immediately, we will purge
365                // all non-root VMs, and only keep root VM in states.
366                // When non-root VM terminates, we only purge the VM's own states.
367                if vm_id_to_run == ROOT_VM_ID {
368                    self.ensure_vms_instantiated(&[vm_id_to_run])?;
369                    self.instantiated.retain(|id, _| *id == vm_id_to_run);
370                    self.suspended.clear();
371                    self.states.clear();
372                    self.states.insert(vm_id_to_run, VmState::Terminated);
373                } else {
374                    let joining_vms: Vec<(VmId, u64)> = self
375                        .states
376                        .iter()
377                        .filter_map(|(vm_id, state)| match state {
378                            VmState::Wait {
379                                target_vm_id,
380                                exit_code_addr,
381                            } if *target_vm_id == vm_id_to_run => Some((*vm_id, *exit_code_addr)),
382                            _ => None,
383                        })
384                        .collect();
385                    // For all joining VMs, update exit code, then mark them as
386                    // runnable state.
387                    for (vm_id, exit_code_addr) in joining_vms {
388                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
389                        machine
390                            .inner_mut()
391                            .memory_mut()
392                            .store8(&Self::u64_to_reg(exit_code_addr), &Self::i8_to_reg(code))?;
393                        machine
394                            .inner_mut()
395                            .set_register(A0, Self::u8_to_reg(SUCCESS));
396                        self.states.insert(vm_id, VmState::Runnable);
397                    }
398                    // Close fds
399                    self.fds.retain(|_, vm_id| *vm_id != vm_id_to_run);
400                    // Clear terminated VM states
401                    self.states.remove(&vm_id_to_run);
402                    self.instantiated.remove(&vm_id_to_run);
403                    self.suspended.remove(&vm_id_to_run);
404                }
405                Ok(())
406            }
407            Err(Error::Yield) => Ok(()),
408            Err(e) => Err(e),
409        };
410        result
411    }
412
413    // This internal function is actually a wrapper over +iterate_inner+,
414    // it is split into a different function, so cycle calculation will be
415    // executed no matter what result +iterate_inner+ returns.
416    #[inline]
417    fn iterate_outer(
418        &mut self,
419        pause: &Pause,
420        limit_cycles: Cycle,
421    ) -> Result<(VmId, Cycle), Error> {
422        let iterate_return = self.iterate_inner(pause.clone(), limit_cycles);
423        self.consume_cycles(self.iteration_cycles)?;
424        let remaining_cycles = limit_cycles
425            .checked_sub(self.iteration_cycles)
426            .ok_or(Error::CyclesExceeded)?;
427        // Clear iteration cycles intentionally after each run
428        self.iteration_cycles = 0;
429        // Process all pending VM reads & writes. Notice ideally, this invocation
430        // should be put at the end of `iterate_inner` function. However, 2 things
431        // prevent this:
432        //
433        // * In earlier implementation of the Meepo hardfork version, `self.process_io`
434        // was put at the very start of +iterate_prepare_machine+ method. Meaning we used
435        // to process IO syscalls at the very start of a new iteration.
436        // * Earlier implementation contains a bug that cycles consumed by suspending / resuming
437        // VMs are not updated in the subsequent VM's `current cycles` syscalls.
438        //
439        // To make ckb-script package suitable for outside usage, we want IOs processed at
440        // the end of each iteration, not at the start of the next iteration. We also need
441        // to replicate the exact same runtime behavior of Meepo hardfork. This means the only
442        // viable change will be:
443        //
444        // * Move `self.process_io` call to the very end of `iterate_outer` method, which is
445        // exactly current location
446        // * For now we have to live with the fact that `iteration_cycles` will not always be
447        // zero at iteration boundaries, and also preserve its value in `FullSuspendedState`.
448        //
449        // One expected change is that +process_io+ is now called once more
450        // after the whole scheduler terminates, and not called at the very beginning
451        // when no VM is executing. But since no VMs will be in IO states at this 2 timeslot,
452        // we should be fine here.
453        self.process_io()?;
454        let id = iterate_return?;
455        Ok((id, remaining_cycles))
456    }
457
458    // This is internal function that does the actual VM execution loop.
459    // Here both pause signal and limit_cycles are provided so as to simplify
460    // branches.
461    fn iterate_inner(&mut self, pause: Pause, limit_cycles: Cycle) -> Result<VmId, Error> {
462        // Execute the VM for real, consumed cycles in the virtual machine is
463        // moved over to +iteration_cycles+, then we reset virtual machine's own
464        // cycle count to zero.
465        let (id, result, cycles) = {
466            let (id, vm) = self.iterate_prepare_machine()?;
467            vm.inner_mut().set_max_cycles(limit_cycles);
468            vm.machine_mut().set_pause(pause);
469            let result = vm.run();
470            let cycles = vm.machine().cycles();
471            vm.inner_mut().set_cycles(0);
472            (id, result, cycles)
473        };
474        self.iteration_cycles = self
475            .iteration_cycles
476            .checked_add(cycles)
477            .ok_or(Error::CyclesExceeded)?;
478        self.iterate_process_results(id, result)?;
479        Ok(id)
480    }
481
482    fn process_message_box(&mut self) -> Result<(), Error> {
483        let messages: Vec<Message> = self.message_box.lock().expect("lock").drain(..).collect();
484        for message in messages {
485            match message {
486                Message::ExecV2(vm_id, args) => {
487                    let (old_context, old_machine) = self
488                        .instantiated
489                        .get_mut(&vm_id)
490                        .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
491                    old_machine
492                        .inner_mut()
493                        .add_cycles_no_checking(EXEC_LOAD_ELF_V2_CYCLES_BASE)?;
494                    let old_cycles = old_machine.machine().cycles();
495                    let max_cycles = old_machine.machine().max_cycles();
496                    let program = {
497                        let sc = old_context.snapshot2_context.lock().expect("lock");
498                        sc.load_data(
499                            &args.location.data_piece_id,
500                            args.location.offset,
501                            args.location.length,
502                        )?
503                        .0
504                    };
505                    let (context, mut new_machine) = self.create_dummy_vm(&vm_id)?;
506                    new_machine.inner_mut().set_max_cycles(max_cycles);
507                    new_machine.inner_mut().add_cycles_no_checking(old_cycles)?;
508                    self.load_vm_program(
509                        &context,
510                        &mut new_machine,
511                        &args.location,
512                        program,
513                        VmArgs::Reader {
514                            vm_id,
515                            argc: args.argc,
516                            argv: args.argv,
517                        },
518                    )?;
519                    // The insert operation removes the old vm instance and adds the new vm instance.
520                    debug_assert!(self.instantiated.contains_key(&vm_id));
521                    self.instantiated.insert(vm_id, (context, new_machine));
522                }
523                Message::Spawn(vm_id, args) => {
524                    // All fds must belong to the correct owner
525                    if args.fds.iter().any(|fd| self.fds.get(fd) != Some(&vm_id)) {
526                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
527                        machine
528                            .inner_mut()
529                            .set_register(A0, Self::u8_to_reg(INVALID_FD));
530                        continue;
531                    }
532                    if self.suspended.len() + self.instantiated.len() > MAX_VMS_COUNT as usize {
533                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
534                        machine
535                            .inner_mut()
536                            .set_register(A0, Self::u8_to_reg(MAX_VMS_SPAWNED));
537                        continue;
538                    }
539                    let spawned_vm_id = self.boot_vm(
540                        &args.location,
541                        VmArgs::Reader {
542                            vm_id,
543                            argc: args.argc,
544                            argv: args.argv,
545                        },
546                    )?;
547                    // Move passed fds from spawner to spawnee
548                    for fd in &args.fds {
549                        self.fds.insert(*fd, spawned_vm_id);
550                    }
551                    // Here we keep the original version of file descriptors.
552                    // If one fd is moved afterward, this inherited file descriptors doesn't change.
553                    self.inherited_fd.insert(spawned_vm_id, args.fds.clone());
554
555                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
556                    machine.inner_mut().memory_mut().store64(
557                        &Self::u64_to_reg(args.process_id_addr),
558                        &Self::u64_to_reg(spawned_vm_id),
559                    )?;
560                    machine
561                        .inner_mut()
562                        .set_register(A0, Self::u8_to_reg(SUCCESS));
563                }
564                Message::Wait(vm_id, args) => {
565                    if let Some(exit_code) = self.terminated_vms.get(&args.target_id).copied() {
566                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
567                        machine.inner_mut().memory_mut().store8(
568                            &Self::u64_to_reg(args.exit_code_addr),
569                            &Self::i8_to_reg(exit_code),
570                        )?;
571                        machine
572                            .inner_mut()
573                            .set_register(A0, Self::u8_to_reg(SUCCESS));
574                        self.states.insert(vm_id, VmState::Runnable);
575                        self.terminated_vms.retain(|id, _| id != &args.target_id);
576                        continue;
577                    }
578                    if !self.states.contains_key(&args.target_id) {
579                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
580                        machine
581                            .inner_mut()
582                            .set_register(A0, Self::u8_to_reg(WAIT_FAILURE));
583                        continue;
584                    }
585                    // Return code will be updated when the joining VM exits
586                    self.states.insert(
587                        vm_id,
588                        VmState::Wait {
589                            target_vm_id: args.target_id,
590                            exit_code_addr: args.exit_code_addr,
591                        },
592                    );
593                }
594                Message::Pipe(vm_id, args) => {
595                    if self.fds.len() as u64 >= MAX_FDS {
596                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
597                        machine
598                            .inner_mut()
599                            .set_register(A0, Self::u8_to_reg(MAX_FDS_CREATED));
600                        continue;
601                    }
602                    let (p1, p2, slot) = Fd::create(self.next_fd_slot);
603                    self.next_fd_slot = slot;
604                    self.fds.insert(p1, vm_id);
605                    self.fds.insert(p2, vm_id);
606                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
607                    machine
608                        .inner_mut()
609                        .memory_mut()
610                        .store64(&Self::u64_to_reg(args.fd1_addr), &Self::u64_to_reg(p1.0))?;
611                    machine
612                        .inner_mut()
613                        .memory_mut()
614                        .store64(&Self::u64_to_reg(args.fd2_addr), &Self::u64_to_reg(p2.0))?;
615                    machine
616                        .inner_mut()
617                        .set_register(A0, Self::u8_to_reg(SUCCESS));
618                }
619                Message::FdRead(vm_id, args) => {
620                    if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
621                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
622                        machine
623                            .inner_mut()
624                            .set_register(A0, Self::u8_to_reg(INVALID_FD));
625                        continue;
626                    }
627                    if !self.fds.contains_key(&args.fd.other_fd()) {
628                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
629                        machine
630                            .inner_mut()
631                            .set_register(A0, Self::u8_to_reg(OTHER_END_CLOSED));
632                        continue;
633                    }
634                    // Return code will be updated when the read operation finishes
635                    self.states.insert(
636                        vm_id,
637                        VmState::WaitForRead(ReadState {
638                            fd: args.fd,
639                            length: args.length,
640                            buffer_addr: args.buffer_addr,
641                            length_addr: args.length_addr,
642                        }),
643                    );
644                }
645                Message::FdWrite(vm_id, args) => {
646                    if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
647                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
648                        machine
649                            .inner_mut()
650                            .set_register(A0, Self::u8_to_reg(INVALID_FD));
651                        continue;
652                    }
653                    if !self.fds.contains_key(&args.fd.other_fd()) {
654                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
655                        machine
656                            .inner_mut()
657                            .set_register(A0, Self::u8_to_reg(OTHER_END_CLOSED));
658                        continue;
659                    }
660                    // Return code will be updated when the write operation finishes
661                    self.states.insert(
662                        vm_id,
663                        VmState::WaitForWrite(WriteState {
664                            fd: args.fd,
665                            consumed: 0,
666                            length: args.length,
667                            buffer_addr: args.buffer_addr,
668                            length_addr: args.length_addr,
669                        }),
670                    );
671                }
672                Message::InheritedFileDescriptor(vm_id, args) => {
673                    let inherited_fd = if vm_id == ROOT_VM_ID {
674                        Vec::new()
675                    } else {
676                        self.inherited_fd[&vm_id].clone()
677                    };
678                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
679                    let FdArgs {
680                        buffer_addr,
681                        length_addr,
682                        ..
683                    } = args;
684                    let full_length = machine
685                        .inner_mut()
686                        .memory_mut()
687                        .load64(&Self::u64_to_reg(length_addr))?
688                        .to_u64();
689                    let real_length = inherited_fd.len() as u64;
690                    let copy_length = u64::min(full_length, real_length);
691                    for i in 0..copy_length {
692                        let fd = inherited_fd[i as usize].0;
693                        let addr = buffer_addr.checked_add(i * 8).ok_or(Error::MemOutOfBound)?;
694                        machine
695                            .inner_mut()
696                            .memory_mut()
697                            .store64(&Self::u64_to_reg(addr), &Self::u64_to_reg(fd))?;
698                    }
699                    machine.inner_mut().memory_mut().store64(
700                        &Self::u64_to_reg(length_addr),
701                        &Self::u64_to_reg(real_length),
702                    )?;
703                    machine
704                        .inner_mut()
705                        .set_register(A0, Self::u8_to_reg(SUCCESS));
706                }
707                Message::Close(vm_id, fd) => {
708                    if self.fds.get(&fd) != Some(&vm_id) {
709                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
710                        machine
711                            .inner_mut()
712                            .set_register(A0, Self::u8_to_reg(INVALID_FD));
713                    } else {
714                        self.fds.remove(&fd);
715                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
716                        machine
717                            .inner_mut()
718                            .set_register(A0, Self::u8_to_reg(SUCCESS));
719                    }
720                }
721            }
722        }
723        Ok(())
724    }
725
726    fn process_io(&mut self) -> Result<(), Error> {
727        let mut reads: HashMap<Fd, (VmId, ReadState)> = HashMap::default();
728        let mut closed_fds: Vec<VmId> = Vec::new();
729
730        self.states.iter().for_each(|(vm_id, state)| {
731            if let VmState::WaitForRead(inner_state) = state {
732                if self.fds.contains_key(&inner_state.fd.other_fd()) {
733                    reads.insert(inner_state.fd, (*vm_id, inner_state.clone()));
734                } else {
735                    closed_fds.push(*vm_id);
736                }
737            }
738        });
739        let mut pairs: Vec<(VmId, ReadState, VmId, WriteState)> = Vec::new();
740        self.states.iter().for_each(|(vm_id, state)| {
741            if let VmState::WaitForWrite(inner_state) = state {
742                if self.fds.contains_key(&inner_state.fd.other_fd()) {
743                    if let Some((read_vm_id, read_state)) = reads.get(&inner_state.fd.other_fd()) {
744                        pairs.push((*read_vm_id, read_state.clone(), *vm_id, inner_state.clone()));
745                    }
746                } else {
747                    closed_fds.push(*vm_id);
748                }
749            }
750        });
751        // Finish read / write syscalls for fds that are closed on the other end
752        for vm_id in closed_fds {
753            match self.states[&vm_id].clone() {
754                VmState::WaitForRead(ReadState { length_addr, .. }) => {
755                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
756                    machine.inner_mut().memory_mut().store64(
757                        &Self::u64_to_reg(length_addr),
758                        &<M::Inner as CoreMachine>::REG::zero(),
759                    )?;
760                    machine
761                        .inner_mut()
762                        .set_register(A0, Self::u8_to_reg(SUCCESS));
763                    self.states.insert(vm_id, VmState::Runnable);
764                }
765                VmState::WaitForWrite(WriteState {
766                    consumed,
767                    length_addr,
768                    ..
769                }) => {
770                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
771                    machine
772                        .inner_mut()
773                        .memory_mut()
774                        .store64(&Self::u64_to_reg(length_addr), &Self::u64_to_reg(consumed))?;
775                    machine
776                        .inner_mut()
777                        .set_register(A0, Self::u8_to_reg(SUCCESS));
778                    self.states.insert(vm_id, VmState::Runnable);
779                }
780                _ => (),
781            }
782        }
783        // Transferring data from write fds to read fds
784        for (read_vm_id, read_state, write_vm_id, write_state) in pairs {
785            let ReadState {
786                length: read_length,
787                buffer_addr: read_buffer_addr,
788                length_addr: read_length_addr,
789                ..
790            } = read_state;
791            let WriteState {
792                fd: write_fd,
793                mut consumed,
794                length: write_length,
795                buffer_addr: write_buffer_addr,
796                length_addr: write_length_addr,
797            } = write_state;
798
799            self.ensure_vms_instantiated(&[read_vm_id, write_vm_id])?;
800            {
801                let fillable = read_length;
802                let consumable = write_length - consumed;
803                let copiable = std::cmp::min(fillable, consumable);
804
805                // Actual data copying
806                let (_, write_machine) = self
807                    .instantiated
808                    .get_mut(&write_vm_id)
809                    .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
810                write_machine
811                    .inner_mut()
812                    .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
813                let data = write_machine
814                    .inner_mut()
815                    .memory_mut()
816                    .load_bytes(write_buffer_addr.wrapping_add(consumed), copiable)?;
817                let (_, read_machine) = self
818                    .instantiated
819                    .get_mut(&read_vm_id)
820                    .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
821                read_machine
822                    .inner_mut()
823                    .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
824                read_machine
825                    .inner_mut()
826                    .memory_mut()
827                    .store_bytes(read_buffer_addr, &data)?;
828                // Read syscall terminates as soon as some data are filled
829                read_machine.inner_mut().memory_mut().store64(
830                    &Self::u64_to_reg(read_length_addr),
831                    &Self::u64_to_reg(copiable),
832                )?;
833                read_machine
834                    .inner_mut()
835                    .set_register(A0, Self::u8_to_reg(SUCCESS));
836                self.states.insert(read_vm_id, VmState::Runnable);
837
838                // Write syscall, however, terminates only when all the data
839                // have been written, or when the pairing read fd is closed.
840                consumed += copiable;
841                if consumed == write_length {
842                    // Write VM has fulfilled its write request
843                    let (_, write_machine) = self
844                        .instantiated
845                        .get_mut(&write_vm_id)
846                        .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
847                    write_machine.inner_mut().memory_mut().store64(
848                        &Self::u64_to_reg(write_length_addr),
849                        &Self::u64_to_reg(write_length),
850                    )?;
851                    write_machine
852                        .inner_mut()
853                        .set_register(A0, Self::u8_to_reg(SUCCESS));
854                    self.states.insert(write_vm_id, VmState::Runnable);
855                } else {
856                    // Only update write VM state
857                    self.states.insert(
858                        write_vm_id,
859                        VmState::WaitForWrite(WriteState {
860                            fd: write_fd,
861                            consumed,
862                            length: write_length,
863                            buffer_addr: write_buffer_addr,
864                            length_addr: write_length_addr,
865                        }),
866                    );
867                }
868            }
869        }
870        Ok(())
871    }
872
873    /// If current scheduler is terminated
874    pub fn terminated(&self) -> bool {
875        self.states
876            .get(&ROOT_VM_ID)
877            .map(|state| *state == VmState::Terminated)
878            .unwrap_or(false)
879    }
880
881    fn terminated_result(&mut self) -> Result<TerminatedResult, Error> {
882        assert!(self.terminated());
883
884        let exit_code = {
885            let root_vm = &self.ensure_get_instantiated(&ROOT_VM_ID)?.1;
886            root_vm.machine().exit_code()
887        };
888        Ok(TerminatedResult {
889            exit_code,
890            consumed_cycles: self.consumed_cycles(),
891        })
892    }
893
894    // Ensure VMs are instantiated
895    fn ensure_vms_instantiated(&mut self, ids: &[VmId]) -> Result<(), Error> {
896        if ids.len() > MAX_INSTANTIATED_VMS {
897            return Err(Error::Unexpected(format!(
898                "At most {} VMs can be instantiated but {} are requested!",
899                MAX_INSTANTIATED_VMS,
900                ids.len()
901            )));
902        }
903
904        let mut uninstantiated_ids: Vec<VmId> = ids
905            .iter()
906            .filter(|id| !self.instantiated.contains_key(id))
907            .copied()
908            .collect();
909        while (!uninstantiated_ids.is_empty()) && (self.instantiated.len() < MAX_INSTANTIATED_VMS) {
910            let id = uninstantiated_ids
911                .pop()
912                .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?;
913            self.resume_vm(&id)?;
914        }
915
916        if !uninstantiated_ids.is_empty() {
917            // Instantiated is a BTreeMap, an iterator on it maintains key order to ensure deterministic behavior
918            let suspendable_ids: Vec<VmId> = self
919                .instantiated
920                .keys()
921                .filter(|id| !ids.contains(id))
922                .copied()
923                .collect();
924
925            assert!(suspendable_ids.len() >= uninstantiated_ids.len());
926            for i in 0..uninstantiated_ids.len() {
927                self.suspend_vm(&suspendable_ids[i])?;
928                self.resume_vm(&uninstantiated_ids[i])?;
929            }
930        }
931
932        Ok(())
933    }
934
935    /// Ensure corresponding VM is instantiated and return a mutable reference to it
936    fn ensure_get_instantiated(&mut self, id: &VmId) -> Result<&mut (VmContext<DL>, M), Error> {
937        self.ensure_vms_instantiated(&[*id])?;
938        self.instantiated
939            .get_mut(id)
940            .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))
941    }
942
943    // Resume a suspended VM
944    fn resume_vm(&mut self, id: &VmId) -> Result<(), Error> {
945        if !self.suspended.contains_key(id) {
946            return Err(Error::Unexpected(format!("VM {:?} is not suspended!", id)));
947        }
948        let snapshot = &self.suspended[id];
949        self.iteration_cycles = self
950            .iteration_cycles
951            .checked_add(SPAWN_EXTRA_CYCLES_BASE)
952            .ok_or(Error::CyclesExceeded)?;
953        let (context, mut machine) = self.create_dummy_vm(id)?;
954        {
955            let mut sc = context.snapshot2_context.lock().expect("lock");
956            sc.resume(machine.inner_mut(), snapshot)?;
957        }
958        self.instantiated.insert(*id, (context, machine));
959        self.suspended.remove(id);
960        Ok(())
961    }
962
963    // Suspend an instantiated VM
964    fn suspend_vm(&mut self, id: &VmId) -> Result<(), Error> {
965        if !self.instantiated.contains_key(id) {
966            return Err(Error::Unexpected(format!(
967                "VM {:?} is not instantiated!",
968                id
969            )));
970        }
971        self.iteration_cycles = self
972            .iteration_cycles
973            .checked_add(SPAWN_EXTRA_CYCLES_BASE)
974            .ok_or(Error::CyclesExceeded)?;
975        let (context, machine) = self
976            .instantiated
977            .get_mut(id)
978            .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
979        let snapshot = {
980            let sc = context.snapshot2_context.lock().expect("lock");
981            sc.make_snapshot(machine.inner_mut())?
982        };
983        self.suspended.insert(*id, snapshot);
984        self.instantiated.remove(id);
985        Ok(())
986    }
987
988    fn boot_root_vm_if_needed(&mut self) -> Result<(), Error> {
989        if self.states.is_empty() {
990            // Booting phase, we will need to initialize the first VM.
991            let program_id = self.sg_data.sg_info.program_data_piece_id.clone();
992            assert_eq!(
993                self.boot_vm(
994                    &DataLocation {
995                        data_piece_id: program_id,
996                        offset: 0,
997                        length: u64::MAX,
998                    },
999                    VmArgs::Vector(self.root_vm_args.clone()),
1000                )?,
1001                ROOT_VM_ID
1002            );
1003        }
1004        assert!(self.states.contains_key(&ROOT_VM_ID));
1005
1006        Ok(())
1007    }
1008
1009    /// Boot a vm by given program and args.
1010    fn boot_vm(&mut self, location: &DataLocation, args: VmArgs) -> Result<VmId, Error> {
1011        let id = self.next_vm_id;
1012        self.next_vm_id += 1;
1013        let (context, mut machine) = self.create_dummy_vm(&id)?;
1014        let (program, _) = {
1015            let sc = context.snapshot2_context.lock().expect("lock");
1016            sc.load_data(&location.data_piece_id, location.offset, location.length)?
1017        };
1018        self.load_vm_program(&context, &mut machine, location, program, args)?;
1019        // Newly booted VM will be instantiated by default
1020        while self.instantiated.len() >= MAX_INSTANTIATED_VMS {
1021            // Instantiated is a BTreeMap, first_entry will maintain key order
1022            let id = *self
1023                .instantiated
1024                .first_entry()
1025                .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?
1026                .key();
1027            self.suspend_vm(&id)?;
1028        }
1029
1030        self.instantiated.insert(id, (context, machine));
1031        self.states.insert(id, VmState::Runnable);
1032
1033        Ok(id)
1034    }
1035
1036    // Load the program into an empty vm.
1037    fn load_vm_program(
1038        &mut self,
1039        context: &VmContext<DL>,
1040        machine: &mut M,
1041        location: &DataLocation,
1042        program: Bytes,
1043        args: VmArgs,
1044    ) -> Result<u64, Error> {
1045        let metadata = parse_elf::<u64>(&program, machine.inner_mut().version())?;
1046        let bytes = match args {
1047            VmArgs::Reader { vm_id, argc, argv } => {
1048                let (_, machine_from) = self.ensure_get_instantiated(&vm_id)?;
1049                let argc = Self::u64_to_reg(argc);
1050                let argv = Self::u64_to_reg(argv);
1051                let argv =
1052                    FlattenedArgsReader::new(machine_from.inner_mut().memory_mut(), argc, argv);
1053                machine.load_program_with_metadata(&program, &metadata, argv)?
1054            }
1055            VmArgs::Vector(data) => {
1056                machine.load_program_with_metadata(&program, &metadata, data.into_iter().map(Ok))?
1057            }
1058        };
1059        let mut sc = context.snapshot2_context.lock().expect("lock");
1060        sc.mark_program(
1061            machine.inner_mut(),
1062            &metadata,
1063            &location.data_piece_id,
1064            location.offset,
1065        )?;
1066        machine
1067            .inner_mut()
1068            .add_cycles_no_checking(transferred_byte_cycles(bytes))?;
1069        Ok(bytes)
1070    }
1071
1072    // Create a new VM instance with syscalls attached
1073    fn create_dummy_vm(&self, id: &VmId) -> Result<(VmContext<DL>, M), Error> {
1074        let version = &self.sg_data.sg_info.script_version;
1075        let core_machine = M::Inner::new(
1076            version.vm_isa(),
1077            version.vm_version(),
1078            // We will update max_cycles for each machine when it gets a chance to run
1079            u64::MAX,
1080        );
1081        let vm_context = VmContext {
1082            base_cycles: Arc::clone(&self.total_cycles),
1083            message_box: Arc::clone(&self.message_box),
1084            snapshot2_context: Arc::new(Mutex::new(Snapshot2Context::new(self.sg_data.clone()))),
1085        };
1086
1087        let machine_builder = DefaultMachineBuilder::new(core_machine)
1088            .instruction_cycle_func(Box::new(estimate_cycles));
1089        let machine_builder =
1090            (self.syscall_generator)(id, &self.sg_data, &vm_context, &self.syscall_context)
1091                .into_iter()
1092                .fold(machine_builder, |builder, syscall| builder.syscall(syscall));
1093        let default_machine = machine_builder.build();
1094        Ok((vm_context, M::new(default_machine)))
1095    }
1096
1097    /// Provided for compatibility with surrounding tools. Normally, you should not call this
1098    /// function from within the current crate.
1099    pub fn set_root_vm_args(&mut self, args: Vec<Bytes>) {
1100        self.root_vm_args = args;
1101    }
1102
1103    fn i8_to_reg(v: i8) -> <M::Inner as CoreMachine>::REG {
1104        <M::Inner as CoreMachine>::REG::from_i8(v)
1105    }
1106
1107    fn u8_to_reg(v: u8) -> <M::Inner as CoreMachine>::REG {
1108        <M::Inner as CoreMachine>::REG::from_u8(v)
1109    }
1110
1111    fn u64_to_reg(v: u64) -> <M::Inner as CoreMachine>::REG {
1112        <M::Inner as CoreMachine>::REG::from_u64(v)
1113    }
1114}