// ckb_script/scheduler.rs

use crate::cost_model::transferred_byte_cycles;
use crate::syscalls::{
    EXEC_LOAD_ELF_V2_CYCLES_BASE, INVALID_FD, MAX_FDS_CREATED, MAX_VMS_SPAWNED, OTHER_END_CLOSED,
    SPAWN_EXTRA_CYCLES_BASE, SUCCESS, WAIT_FAILURE,
};

use crate::types::{
    DataLocation, DataPieceId, FIRST_FD_SLOT, FIRST_VM_ID, Fd, FdArgs, FullSuspendedState,
    IterationResult, Message, ReadState, RunMode, SgData, SyscallGenerator, TerminatedResult,
    VmArgs, VmContext, VmId, VmState, WriteState,
};
use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider};
use ckb_types::core::Cycle;
use ckb_vm::snapshot2::Snapshot2Context;
use ckb_vm::{
    Error, FlattenedArgsReader, Register,
    bytes::Bytes,
    cost_model::estimate_cycles,
    elf::parse_elf,
    machine::{CoreMachine, DefaultMachineBuilder, DefaultMachineRunner, Pause, SupportMachine},
    memory::Memory,
    registers::A0,
    snapshot2::Snapshot2,
};
use std::collections::{BTreeMap, HashMap};
use std::sync::{
    Arc, Mutex,
    atomic::{AtomicU64, Ordering},
};

/// Root process's id.
pub const ROOT_VM_ID: VmId = FIRST_VM_ID;
/// The maximum number of VMs that can be created at the same time.
pub const MAX_VMS_COUNT: u64 = 16;
/// The maximum number of instantiated VMs.
pub const MAX_INSTANTIATED_VMS: usize = 4;
/// The maximum number of fds.
pub const MAX_FDS: u64 = 64;

/// A single Scheduler instance is used to verify a single script
/// within a CKB transaction.
///
/// A scheduler holds and manipulates a core, and it also holds all
/// CKB-VM machines; each CKB-VM machine gets a mutable reference to
/// the core for IO operations.
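///
/// The sketch below shows the typical calling sequence, assuming the caller
/// has already prepared `sg_data`, a syscall generator with its context, and
/// a concrete machine type `M` (all of these are placeholders supplied by the
/// embedding code, not defined in this module):
///
/// ```ignore
/// // Build a scheduler for one script and run it to completion under a
/// // cycle limit.
/// let mut scheduler: Scheduler<_, _, M> =
///     Scheduler::new(sg_data, syscall_generator, syscall_context);
/// let result = scheduler.run(RunMode::LimitCycles(max_cycles))?;
/// ```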
pub struct Scheduler<DL, V, M>
where
    DL: CellDataProvider,
    M: DefaultMachineRunner,
{
    /// Immutable context data for current running transaction & script.
    sg_data: SgData<DL>,

    /// Syscall generator
    syscall_generator: SyscallGenerator<DL, V, M::Inner>,
    /// Syscall generator context
    syscall_context: V,

    /// Total cycles. When a scheduler executes, there are 3 variables
    /// that might all contain charged cycles: `total_cycles`,
    /// `iteration_cycles` and `machine.cycles()` from the currently
    /// executing virtual machine. At any given time, the sum of these 3
    /// variables represents the total cycles consumed by the current
    /// scheduler.
    /// However, at certain points in time, cycles are moved between them:
    /// the cycles stored in `machine.cycles()` are moved over to
    /// `iteration_cycles`, and the cycles stored in `iteration_cycles` are
    /// moved over to `total_cycles`:
    ///
    /// * The currently running virtual machine keeps its consumed
    ///   cycles in its own `machine.cycles()` structure.
    /// * `iteration_cycles` holds the cycles consumed each time
    ///   we execute a virtual machine (also named an iteration). It will
    ///   always be zero before each iteration (i.e., before each VM starts
    ///   execution). When a virtual machine finishes execution, the cycles
    ///   stored in `machine.cycles()` are moved over to `iteration_cycles`.
    ///   `machine.cycles()` is then reset to zero.
    /// * Processing messages in the message box also charges cycles
    ///   for operations, such as suspending/resuming VMs, transferring data,
    ///   etc. Those cycles are added to `iteration_cycles` directly. When all
    ///   postprocessing work is completed, the cycles consumed in
    ///   `iteration_cycles` are then moved to `total_cycles`.
    ///   `iteration_cycles` is then reset to zero.
    ///
    /// One can consider that `total_cycles` contains the total cycles
    /// consumed by the current scheduler, when the scheduler is not busy executing.
    ///
    /// NOTE: the above workflow describes the optimal case: `iteration_cycles`
    /// would always be zero after each iteration. However, our initial implementation
    /// for the Meepo hardfork contains a bug: cycles charged by suspending / resuming
    /// VMs when processing IOs will not be reflected in `current cycles` syscalls
    /// of the subsequently running VMs. To preserve this behavior, the cycles consumed
    /// in `iteration_cycles` cannot be moved at iteration boundaries. Later hardfork
    /// versions might fix this, but for the Meepo hardfork, we have to preserve this behavior.
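    ///
    /// As a rough illustration only (a sketch, not literal code: these are
    /// private fields and `running_machine` stands for whichever VM is
    /// currently executing):
    ///
    /// ```ignore
    /// // Total cycles consumed so far by this scheduler:
    /// let consumed = total_cycles.load(Ordering::Acquire)
    ///     + iteration_cycles
    ///     + running_machine.cycles();
    /// ```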
    total_cycles: Arc<AtomicU64>,
    /// Iteration cycles, see `total_cycles` on its usage
    iteration_cycles: Cycle,
    /// Next vm id used by spawn.
    next_vm_id: VmId,
    /// Next fd used by pipe.
    next_fd_slot: u64,
    /// Used to store VM state.
    states: BTreeMap<VmId, VmState>,
    /// Used to confirm the owner of fd.
    fds: BTreeMap<Fd, VmId>,
    /// Verify the VM's inherited fd list.
    inherited_fd: BTreeMap<VmId, Vec<Fd>>,
    /// Instantiated vms.
    instantiated: BTreeMap<VmId, (VmContext<DL>, M)>,
    /// Suspended vms.
    suspended: BTreeMap<VmId, Snapshot2<DataPieceId>>,
    /// Terminated vms.
    terminated_vms: BTreeMap<VmId, i8>,

    /// MessageBox is expected to be empty before returning from the `run`
    /// function, so there is no need to persist messages.
    message_box: Arc<Mutex<Vec<Message>>>,
}

impl<DL, V, M> Scheduler<DL, V, M>
where
    DL: CellDataProvider + HeaderProvider + ExtensionProvider + Clone,
    V: Clone,
    M: DefaultMachineRunner,
{
    /// Create a new scheduler from empty state
    pub fn new(
        sg_data: SgData<DL>,
        syscall_generator: SyscallGenerator<DL, V, M::Inner>,
        syscall_context: V,
    ) -> Self {
        Self {
            sg_data,
            syscall_generator,
            syscall_context,
            total_cycles: Arc::new(AtomicU64::new(0)),
            iteration_cycles: 0,
            next_vm_id: FIRST_VM_ID,
            next_fd_slot: FIRST_FD_SLOT,
            states: BTreeMap::default(),
            fds: BTreeMap::default(),
            inherited_fd: BTreeMap::default(),
            instantiated: BTreeMap::default(),
            suspended: BTreeMap::default(),
            message_box: Arc::new(Mutex::new(Vec::new())),
            terminated_vms: BTreeMap::default(),
        }
    }

    /// Return total cycles.
    pub fn consumed_cycles(&self) -> Cycle {
        self.total_cycles.load(Ordering::Acquire)
    }

    /// Fetch specified VM state
    pub fn state(&self, vm_id: &VmId) -> Option<VmState> {
        self.states.get(vm_id).cloned()
    }

    /// Access the SgData data structure
    pub fn sg_data(&self) -> &SgData<DL> {
        &self.sg_data
    }

    /// This function provides a peek into one of the currently created
    /// VMs. Depending on the actual state, the VM might either be instantiated
    /// or suspended. As a result, 2 callback functions must be provided to handle
    /// both cases. The function only provides a *peek*, meaning the caller must
    /// not make any changes to an instantiated VM. The VM is passed as a mutable
    /// reference only because memory load functions in CKB-VM require mutable
    /// references. It does not mean the caller can modify the VM in any sense.
    /// Even slight tampering with the VM can result in non-determinism.
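    ///
    /// A minimal sketch of calling `peek`, assuming a suitable `scheduler`
    /// and `vm_id` are already in scope (the zero fallback for suspended VMs
    /// is purely illustrative):
    ///
    /// ```ignore
    /// // Read the cycle counter of an instantiated VM, or fall back to 0
    /// // for a suspended one.
    /// let cycles = scheduler.peek(
    ///     &vm_id,
    ///     |machine| Ok(machine.machine().cycles()),
    ///     |_snapshot, _sg_data| Ok(0),
    /// )?;
    /// ```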
    pub fn peek<F, G, W>(&mut self, vm_id: &VmId, mut f: F, mut g: G) -> Result<W, Error>
    where
        F: FnMut(&mut M) -> Result<W, Error>,
        G: FnMut(&Snapshot2<DataPieceId>, &SgData<DL>) -> Result<W, Error>,
    {
        if let Some((_, machine)) = self.instantiated.get_mut(vm_id) {
            return f(machine);
        }
        if let Some(snapshot) = self.suspended.get(vm_id) {
            return g(snapshot, &self.sg_data);
        }
        Err(Error::Unexpected(format!("VM {} does not exist!", vm_id)))
    }

    /// Add cycles to total cycles.
    fn consume_cycles(&mut self, cycles: Cycle) -> Result<(), Error> {
        match self
            .total_cycles
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |total_cycles| {
                total_cycles.checked_add(cycles)
            }) {
            Ok(_) => Ok(()),
            Err(_) => Err(Error::CyclesExceeded),
        }
    }
    /// Resume a previously suspended scheduler state
    pub fn resume(
        sg_data: SgData<DL>,
        syscall_generator: SyscallGenerator<DL, V, M::Inner>,
        syscall_context: V,
        full: FullSuspendedState,
    ) -> Self {
        let mut scheduler = Self {
            sg_data,
            syscall_generator,
            syscall_context,
            total_cycles: Arc::new(AtomicU64::new(full.total_cycles)),
            iteration_cycles: full.iteration_cycles,
            next_vm_id: full.next_vm_id,
            next_fd_slot: full.next_fd_slot,
            states: full
                .vms
                .iter()
                .map(|(id, state, _)| (*id, state.clone()))
                .collect(),
            fds: full.fds.into_iter().collect(),
            inherited_fd: full.inherited_fd.into_iter().collect(),
            instantiated: BTreeMap::default(),
            suspended: full
                .vms
                .into_iter()
                .map(|(id, _, snapshot)| (id, snapshot))
                .collect(),
            message_box: Arc::new(Mutex::new(Vec::new())),
            terminated_vms: full.terminated_vms.into_iter().collect(),
        };
        scheduler
            .ensure_vms_instantiated(&full.instantiated_ids)
            .unwrap();
        // NOTE: suspending/resuming a scheduler is part of CKB's implementation
        // details. It is not part of execution consensus. We should not charge
        // cycles for them.
        scheduler.iteration_cycles = 0;
        scheduler
    }

    /// Suspend the current scheduler into a serializable full state
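    ///
    /// A hedged sketch of the suspend/resume round trip; `sg_data`,
    /// `syscall_generator` and `syscall_context` here stand in for whatever
    /// the embedding code originally passed to this scheduler:
    ///
    /// ```ignore
    /// // Persist the state somewhere, then rebuild an equivalent scheduler.
    /// let state: FullSuspendedState = scheduler.suspend()?;
    /// let mut scheduler = Scheduler::<_, _, M>::resume(
    ///     sg_data,
    ///     syscall_generator,
    ///     syscall_context,
    ///     state,
    /// );
    /// ```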
    pub fn suspend(mut self) -> Result<FullSuspendedState, Error> {
        assert!(self.message_box.lock().expect("lock").is_empty());
        let mut vms = Vec::with_capacity(self.states.len());
        let instantiated_ids: Vec<_> = self.instantiated.keys().cloned().collect();
        for id in &instantiated_ids {
            self.suspend_vm(id)?;
        }
        for (id, state) in self.states {
            let snapshot = self
                .suspended
                .remove(&id)
                .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
            vms.push((id, state, snapshot));
        }
        Ok(FullSuspendedState {
            // NOTE: suspending a scheduler is actually part of CKB's
            // internal execution logic; it does not belong to VM execution
            // consensus. We are not charging cycles for suspending
            // a VM in the process of suspending the whole scheduler.
            total_cycles: self.total_cycles.load(Ordering::Acquire),
            iteration_cycles: self.iteration_cycles,
            next_vm_id: self.next_vm_id,
            next_fd_slot: self.next_fd_slot,
            vms,
            fds: self.fds.into_iter().collect(),
            inherited_fd: self.inherited_fd.into_iter().collect(),
            terminated_vms: self.terminated_vms.into_iter().collect(),
            instantiated_ids,
        })
    }

    /// This is the only entrypoint for running the scheduler;
    /// both newly created and resumed instances are supported.
    /// It accepts 2 run modes: one can either limit the cycles to execute,
    /// or use a pause signal to trigger termination.
    ///
    /// Only when the execution terminates without VM errors will this
    /// function return an exit code (which could still be non-zero) and the
    /// total consumed cycles.
    ///
    /// Err would be returned in the following cases:
    /// * Cycle limit reached: the returned error would be ckb_vm::Error::CyclesExceeded,
    /// * Pause signal triggered: the returned error would be ckb_vm::Error::Pause,
    /// * Other terminating errors
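    ///
    /// A small sketch of the pause-driven mode; `pause` would be shared with
    /// whatever component decides to interrupt verification (the surrounding
    /// names here are illustrative only):
    ///
    /// ```ignore
    /// let pause = Pause::new();
    /// // Hand a clone of `pause` to another thread; signalling it there
    /// // makes `run` return `Err(Error::Pause)`.
    /// match scheduler.run(RunMode::Pause(pause.clone(), max_cycles)) {
    ///     Ok(result) => println!("exit code: {}", result.exit_code),
    ///     Err(Error::Pause) => { /* suspend and retry later */ }
    ///     Err(e) => return Err(e),
    /// }
    /// ```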
    pub fn run(&mut self, mode: RunMode) -> Result<TerminatedResult, Error> {
        self.boot_root_vm_if_needed()?;

        let (pause, mut limit_cycles) = match mode {
            RunMode::LimitCycles(limit_cycles) => (Pause::new(), limit_cycles),
            RunMode::Pause(pause, limit_cycles) => (pause, limit_cycles),
        };

        while !self.terminated() {
            limit_cycles = self.iterate_outer(&pause, limit_cycles)?.1;
        }
        assert_eq!(self.iteration_cycles, 0);

        self.terminated_result()
    }

    /// Public API that runs a single VM, processes all messages, then returns the
    /// executed VM ID (so the caller can fetch data from it later). This can be used
    /// when finer-grained control over a single VM is required.
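    ///
    /// A sketch of driving the scheduler manually, one iteration at a time
    /// (assuming `scheduler` is already built):
    ///
    /// ```ignore
    /// let result = loop {
    ///     let IterationResult {
    ///         executed_vm,
    ///         terminated_status,
    ///     } = scheduler.iterate()?;
    ///     // Inspect `executed_vm` here, e.g. via `peek`, before continuing.
    ///     if let Some(terminated) = terminated_status {
    ///         break terminated;
    ///     }
    /// };
    /// ```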
    pub fn iterate(&mut self) -> Result<IterationResult, Error> {
        self.boot_root_vm_if_needed()?;

        if self.terminated() {
            return Ok(IterationResult {
                executed_vm: ROOT_VM_ID,
                terminated_status: Some(self.terminated_result()?),
            });
        }

        let (id, _) = self.iterate_outer(&Pause::new(), u64::MAX)?;
        let terminated_status = if self.terminated() {
            assert_eq!(self.iteration_cycles, 0);
            Some(self.terminated_result()?)
        } else {
            None
        };

        Ok(IterationResult {
            executed_vm: id,
            terminated_status,
        })
    }

    /// Returns the machine that needs to be executed in the current iteration.
    fn iterate_prepare_machine(&mut self) -> Result<(u64, &mut M), Error> {
        // Find a runnable VM that has the largest ID.
        let vm_id_to_run = self
            .states
            .iter()
            .rev()
            .filter(|(_, state)| matches!(state, VmState::Runnable))
            .map(|(id, _)| *id)
            .next();
        let vm_id_to_run = vm_id_to_run.ok_or_else(|| {
            Error::Unexpected("A deadlock situation has been reached!".to_string())
        })?;
        let (_context, machine) = self.ensure_get_instantiated(&vm_id_to_run)?;
        Ok((vm_id_to_run, machine))
    }

    /// Process machine execution results in the current iteration.
    fn iterate_process_results(
        &mut self,
        vm_id_to_run: u64,
        result: Result<i8, Error>,
    ) -> Result<(), Error> {
        // Process message box, update VM states accordingly
        self.process_message_box()?;
        assert!(self.message_box.lock().expect("lock").is_empty());
        // If the VM terminates, update VMs in the join state and close its fds
        let result = match result {
            Ok(code) => {
                self.terminated_vms.insert(vm_id_to_run, code);
                // When the root VM terminates, the execution stops immediately; we will purge
                // all non-root VMs, and only keep the root VM in states.
                // When a non-root VM terminates, we only purge the VM's own states.
                if vm_id_to_run == ROOT_VM_ID {
                    self.ensure_vms_instantiated(&[vm_id_to_run])?;
                    self.instantiated.retain(|id, _| *id == vm_id_to_run);
                    self.suspended.clear();
                    self.states.clear();
                    self.states.insert(vm_id_to_run, VmState::Terminated);
                } else {
                    let joining_vms: Vec<(VmId, u64)> = self
                        .states
                        .iter()
                        .filter_map(|(vm_id, state)| match state {
                            VmState::Wait {
                                target_vm_id,
                                exit_code_addr,
                            } if *target_vm_id == vm_id_to_run => Some((*vm_id, *exit_code_addr)),
                            _ => None,
                        })
                        .collect();
                    // For all joining VMs, update the exit code, then mark them
                    // as runnable.
                    for (vm_id, exit_code_addr) in joining_vms {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine
                            .inner_mut()
                            .memory_mut()
                            .store8(&Self::u64_to_reg(exit_code_addr), &Self::i8_to_reg(code))?;
                        machine
                            .inner_mut()
                            .set_register(A0, Self::u8_to_reg(SUCCESS));
                        self.states.insert(vm_id, VmState::Runnable);
                    }
                    // Close fds
                    self.fds.retain(|_, vm_id| *vm_id != vm_id_to_run);
                    // Clear terminated VM states
                    self.states.remove(&vm_id_to_run);
                    self.instantiated.remove(&vm_id_to_run);
                    self.suspended.remove(&vm_id_to_run);
                }
                Ok(())
            }
            Err(Error::Yield) => Ok(()),
            Err(e) => Err(e),
        };
        result
    }

    // This internal function is a wrapper over `iterate_inner`; it is split
    // into a separate function so that cycle accounting runs no matter what
    // result `iterate_inner` returns.
    #[inline]
    fn iterate_outer(
        &mut self,
        pause: &Pause,
        limit_cycles: Cycle,
    ) -> Result<(VmId, Cycle), Error> {
        let iterate_return = self.iterate_inner(pause.clone(), limit_cycles);
        self.consume_cycles(self.iteration_cycles)?;
        let remaining_cycles = limit_cycles
            .checked_sub(self.iteration_cycles)
            .ok_or(Error::CyclesExceeded)?;
        // Clear iteration cycles intentionally after each run
        self.iteration_cycles = 0;
        // Process all pending VM reads & writes. Note that ideally, this invocation
        // should be placed at the end of the `iterate_inner` function. However, 2 things
        // prevent this:
        //
        // * In the earlier implementation of the Meepo hardfork version, `self.process_io`
        // was put at the very start of the `iterate_prepare_machine` method, meaning we used
        // to process IO syscalls at the very start of a new iteration.
        // * The earlier implementation contains a bug: cycles consumed by suspending / resuming
        // VMs are not updated in the subsequent VM's `current cycles` syscalls.
        //
        // To make the ckb-script package suitable for outside usage, we want IOs processed at
        // the end of each iteration, not at the start of the next iteration. We also need
        // to replicate the exact same runtime behavior of the Meepo hardfork. This means the
        // only viable change is:
        //
        // * Move the `self.process_io` call to the very end of the `iterate_outer` method,
        // which is exactly the current location
        // * For now we have to live with the fact that `iteration_cycles` will not always be
        // zero at iteration boundaries, and also preserve its value in `FullSuspendedState`.
        //
        // One expected change is that `process_io` is now called once more
        // after the whole scheduler terminates, and not called at the very beginning
        // when no VM is executing. But since no VMs will be in IO states at these 2 points
        // in time, we should be fine here.
        self.process_io()?;
        let id = iterate_return?;
        Ok((id, remaining_cycles))
    }

    // This is the internal function that does the actual VM execution loop.
    // Both the pause signal and limit_cycles are provided here so as to
    // simplify branching.
    fn iterate_inner(&mut self, pause: Pause, limit_cycles: Cycle) -> Result<VmId, Error> {
        // Execute the VM for real; cycles consumed in the virtual machine are
        // moved over to `iteration_cycles`, then we reset the virtual machine's
        // own cycle count to zero.
        let (id, result, cycles) = {
            let (id, vm) = self.iterate_prepare_machine()?;
            vm.inner_mut().set_max_cycles(limit_cycles);
            vm.machine_mut().set_pause(pause);
            let result = vm.run();
            let cycles = vm.machine().cycles();
            vm.inner_mut().set_cycles(0);
            (id, result, cycles)
        };
        self.iteration_cycles = self
            .iteration_cycles
            .checked_add(cycles)
            .ok_or(Error::CyclesExceeded)?;
        self.iterate_process_results(id, result)?;
        Ok(id)
    }

    fn process_message_box(&mut self) -> Result<(), Error> {
        let messages: Vec<Message> = self.message_box.lock().expect("lock").drain(..).collect();
        for message in messages {
            match message {
                Message::ExecV2(vm_id, args) => {
                    let (old_context, old_machine) = self
                        .instantiated
                        .get_mut(&vm_id)
                        .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
                    old_machine
                        .inner_mut()
                        .add_cycles_no_checking(EXEC_LOAD_ELF_V2_CYCLES_BASE)?;
                    let old_cycles = old_machine.machine().cycles();
                    let max_cycles = old_machine.machine().max_cycles();
                    let program = {
                        let sc = old_context.snapshot2_context.lock().expect("lock");
                        sc.load_data(
                            &args.location.data_piece_id,
                            args.location.offset,
                            args.location.length,
                        )?
                        .0
                    };
                    let (context, mut new_machine) = self.create_dummy_vm(&vm_id)?;
                    new_machine.inner_mut().set_max_cycles(max_cycles);
                    new_machine.inner_mut().add_cycles_no_checking(old_cycles)?;
                    self.load_vm_program(
                        &context,
                        &mut new_machine,
                        &args.location,
                        program,
                        VmArgs::Reader {
                            vm_id,
                            argc: args.argc,
                            argv: args.argv,
                        },
                    )?;
                    // The insert operation removes the old vm instance and adds the new vm instance.
                    debug_assert!(self.instantiated.contains_key(&vm_id));
                    self.instantiated.insert(vm_id, (context, new_machine));
                }
                Message::Spawn(vm_id, args) => {
                    // All fds must belong to the correct owner
                    if args.fds.iter().any(|fd| self.fds.get(fd) != Some(&vm_id)) {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine
                            .inner_mut()
                            .set_register(A0, Self::u8_to_reg(INVALID_FD));
                        continue;
                    }
                    if self.suspended.len() + self.instantiated.len() > MAX_VMS_COUNT as usize {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine
                            .inner_mut()
                            .set_register(A0, Self::u8_to_reg(MAX_VMS_SPAWNED));
                        continue;
                    }
                    let spawned_vm_id = self.boot_vm(
                        &args.location,
                        VmArgs::Reader {
                            vm_id,
                            argc: args.argc,
                            argv: args.argv,
                        },
                    )?;
                    // Move passed fds from spawner to spawnee
                    for fd in &args.fds {
                        self.fds.insert(*fd, spawned_vm_id);
                    }
                    // Here we keep the original set of file descriptors.
                    // If an fd is moved afterward, this inherited fd list doesn't change.
                    self.inherited_fd.insert(spawned_vm_id, args.fds.clone());

                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                    machine.inner_mut().memory_mut().store64(
                        &Self::u64_to_reg(args.process_id_addr),
                        &Self::u64_to_reg(spawned_vm_id),
                    )?;
                    machine
                        .inner_mut()
                        .set_register(A0, Self::u8_to_reg(SUCCESS));
                }
                Message::Wait(vm_id, args) => {
                    if let Some(exit_code) = self.terminated_vms.get(&args.target_id).copied() {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine.inner_mut().memory_mut().store8(
                            &Self::u64_to_reg(args.exit_code_addr),
                            &Self::i8_to_reg(exit_code),
                        )?;
                        machine
                            .inner_mut()
                            .set_register(A0, Self::u8_to_reg(SUCCESS));
                        self.states.insert(vm_id, VmState::Runnable);
                        self.terminated_vms.retain(|id, _| id != &args.target_id);
                        continue;
                    }
                    if !self.states.contains_key(&args.target_id) {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine
                            .inner_mut()
                            .set_register(A0, Self::u8_to_reg(WAIT_FAILURE));
                        continue;
                    }
                    // Return code will be updated when the joining VM exits
                    self.states.insert(
                        vm_id,
                        VmState::Wait {
                            target_vm_id: args.target_id,
                            exit_code_addr: args.exit_code_addr,
                        },
                    );
                }
                Message::Pipe(vm_id, args) => {
                    if self.fds.len() as u64 >= MAX_FDS {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine
                            .inner_mut()
                            .set_register(A0, Self::u8_to_reg(MAX_FDS_CREATED));
                        continue;
                    }
                    let (p1, p2, slot) = Fd::create(self.next_fd_slot);
                    self.next_fd_slot = slot;
                    self.fds.insert(p1, vm_id);
                    self.fds.insert(p2, vm_id);
                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                    machine
                        .inner_mut()
                        .memory_mut()
                        .store64(&Self::u64_to_reg(args.fd1_addr), &Self::u64_to_reg(p1.0))?;
                    machine
                        .inner_mut()
                        .memory_mut()
                        .store64(&Self::u64_to_reg(args.fd2_addr), &Self::u64_to_reg(p2.0))?;
                    machine
                        .inner_mut()
                        .set_register(A0, Self::u8_to_reg(SUCCESS));
                }
                Message::FdRead(vm_id, args) => {
                    if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine
                            .inner_mut()
                            .set_register(A0, Self::u8_to_reg(INVALID_FD));
                        continue;
                    }
                    if !self.fds.contains_key(&args.fd.other_fd()) {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine
                            .inner_mut()
                            .set_register(A0, Self::u8_to_reg(OTHER_END_CLOSED));
                        continue;
                    }
                    // Return code will be updated when the read operation finishes
                    self.states.insert(
                        vm_id,
                        VmState::WaitForRead(ReadState {
                            fd: args.fd,
                            length: args.length,
                            buffer_addr: args.buffer_addr,
                            length_addr: args.length_addr,
                        }),
                    );
                }
                Message::FdWrite(vm_id, args) => {
                    if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine
                            .inner_mut()
                            .set_register(A0, Self::u8_to_reg(INVALID_FD));
                        continue;
                    }
                    if !self.fds.contains_key(&args.fd.other_fd()) {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine
                            .inner_mut()
                            .set_register(A0, Self::u8_to_reg(OTHER_END_CLOSED));
                        continue;
                    }
                    // Return code will be updated when the write operation finishes
                    self.states.insert(
                        vm_id,
                        VmState::WaitForWrite(WriteState {
                            fd: args.fd,
                            consumed: 0,
                            length: args.length,
                            buffer_addr: args.buffer_addr,
                            length_addr: args.length_addr,
                        }),
                    );
                }
                Message::InheritedFileDescriptor(vm_id, args) => {
                    let inherited_fd = if vm_id == ROOT_VM_ID {
                        Vec::new()
                    } else {
                        self.inherited_fd[&vm_id].clone()
                    };
                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                    let FdArgs {
                        buffer_addr,
                        length_addr,
                        ..
                    } = args;
                    let full_length = machine
                        .inner_mut()
                        .memory_mut()
                        .load64(&Self::u64_to_reg(length_addr))?
                        .to_u64();
                    let real_length = inherited_fd.len() as u64;
                    let copy_length = u64::min(full_length, real_length);
                    for i in 0..copy_length {
                        let fd = inherited_fd[i as usize].0;
                        let addr = buffer_addr.checked_add(i * 8).ok_or(Error::MemOutOfBound)?;
                        machine
                            .inner_mut()
                            .memory_mut()
                            .store64(&Self::u64_to_reg(addr), &Self::u64_to_reg(fd))?;
                    }
                    machine.inner_mut().memory_mut().store64(
                        &Self::u64_to_reg(length_addr),
                        &Self::u64_to_reg(real_length),
                    )?;
                    machine
                        .inner_mut()
                        .set_register(A0, Self::u8_to_reg(SUCCESS));
                }
                Message::Close(vm_id, fd) => {
                    if self.fds.get(&fd) != Some(&vm_id) {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine
                            .inner_mut()
                            .set_register(A0, Self::u8_to_reg(INVALID_FD));
                    } else {
                        self.fds.remove(&fd);
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine
                            .inner_mut()
                            .set_register(A0, Self::u8_to_reg(SUCCESS));
                    }
                }
            }
        }
        Ok(())
    }

    fn process_io(&mut self) -> Result<(), Error> {
        let mut reads: HashMap<Fd, (VmId, ReadState)> = HashMap::default();
        let mut closed_fds: Vec<VmId> = Vec::new();

        self.states.iter().for_each(|(vm_id, state)| {
            if let VmState::WaitForRead(inner_state) = state {
                if self.fds.contains_key(&inner_state.fd.other_fd()) {
                    reads.insert(inner_state.fd, (*vm_id, inner_state.clone()));
                } else {
                    closed_fds.push(*vm_id);
                }
            }
        });
        let mut pairs: Vec<(VmId, ReadState, VmId, WriteState)> = Vec::new();
        self.states.iter().for_each(|(vm_id, state)| {
            if let VmState::WaitForWrite(inner_state) = state {
                if self.fds.contains_key(&inner_state.fd.other_fd()) {
                    if let Some((read_vm_id, read_state)) = reads.get(&inner_state.fd.other_fd()) {
                        pairs.push((*read_vm_id, read_state.clone(), *vm_id, inner_state.clone()));
                    }
                } else {
                    closed_fds.push(*vm_id);
                }
            }
        });
        // Finish read / write syscalls for fds that are closed on the other end
        for vm_id in closed_fds {
            match self.states[&vm_id].clone() {
                VmState::WaitForRead(ReadState { length_addr, .. }) => {
                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                    machine.inner_mut().memory_mut().store64(
                        &Self::u64_to_reg(length_addr),
                        &<M::Inner as CoreMachine>::REG::zero(),
                    )?;
                    machine
                        .inner_mut()
                        .set_register(A0, Self::u8_to_reg(SUCCESS));
                    self.states.insert(vm_id, VmState::Runnable);
                }
                VmState::WaitForWrite(WriteState {
                    consumed,
                    length_addr,
                    ..
                }) => {
                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                    machine
                        .inner_mut()
                        .memory_mut()
                        .store64(&Self::u64_to_reg(length_addr), &Self::u64_to_reg(consumed))?;
                    machine
                        .inner_mut()
                        .set_register(A0, Self::u8_to_reg(SUCCESS));
                    self.states.insert(vm_id, VmState::Runnable);
                }
                _ => (),
            }
        }
        // Transferring data from write fds to read fds
        for (read_vm_id, read_state, write_vm_id, write_state) in pairs {
            let ReadState {
                length: read_length,
                buffer_addr: read_buffer_addr,
                length_addr: read_length_addr,
                ..
            } = read_state;
            let WriteState {
                fd: write_fd,
                mut consumed,
                length: write_length,
                buffer_addr: write_buffer_addr,
                length_addr: write_length_addr,
            } = write_state;

            self.ensure_vms_instantiated(&[read_vm_id, write_vm_id])?;
            {
                let fillable = read_length;
                let consumable = write_length - consumed;
                let copiable = std::cmp::min(fillable, consumable);

                // Actual data copying
                let (_, write_machine) = self
                    .instantiated
                    .get_mut(&write_vm_id)
                    .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
                write_machine
                    .inner_mut()
                    .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
                let data = write_machine
                    .inner_mut()
                    .memory_mut()
                    .load_bytes(write_buffer_addr.wrapping_add(consumed), copiable)?;
                let (_, read_machine) = self
                    .instantiated
                    .get_mut(&read_vm_id)
                    .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
                read_machine
                    .inner_mut()
                    .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
                read_machine
                    .inner_mut()
                    .memory_mut()
                    .store_bytes(read_buffer_addr, &data)?;
                // Read syscall terminates as soon as some data are filled
                read_machine.inner_mut().memory_mut().store64(
                    &Self::u64_to_reg(read_length_addr),
                    &Self::u64_to_reg(copiable),
                )?;
                read_machine
                    .inner_mut()
                    .set_register(A0, Self::u8_to_reg(SUCCESS));
                self.states.insert(read_vm_id, VmState::Runnable);

                // Write syscall, however, terminates only when all the data
                // have been written, or when the pairing read fd is closed.
                consumed += copiable;
                if consumed == write_length {
                    // Write VM has fulfilled its write request
                    let (_, write_machine) = self
                        .instantiated
                        .get_mut(&write_vm_id)
                        .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
                    write_machine.inner_mut().memory_mut().store64(
                        &Self::u64_to_reg(write_length_addr),
                        &Self::u64_to_reg(write_length),
                    )?;
                    write_machine
                        .inner_mut()
                        .set_register(A0, Self::u8_to_reg(SUCCESS));
                    self.states.insert(write_vm_id, VmState::Runnable);
                } else {
                    // Only update write VM state
                    self.states.insert(
                        write_vm_id,
                        VmState::WaitForWrite(WriteState {
                            fd: write_fd,
                            consumed,
                            length: write_length,
                            buffer_addr: write_buffer_addr,
                            length_addr: write_length_addr,
                        }),
                    );
                }
            }
        }
        Ok(())
    }

    /// Whether the current scheduler has terminated
    pub fn terminated(&self) -> bool {
        self.states
            .get(&ROOT_VM_ID)
            .map(|state| *state == VmState::Terminated)
            .unwrap_or(false)
    }

    fn terminated_result(&mut self) -> Result<TerminatedResult, Error> {
        assert!(self.terminated());

        let exit_code = {
            let root_vm = &self.ensure_get_instantiated(&ROOT_VM_ID)?.1;
            root_vm.machine().exit_code()
        };
        Ok(TerminatedResult {
            exit_code,
            consumed_cycles: self.consumed_cycles(),
        })
    }

    // Ensure VMs are instantiated
    fn ensure_vms_instantiated(&mut self, ids: &[VmId]) -> Result<(), Error> {
        if ids.len() > MAX_INSTANTIATED_VMS {
            return Err(Error::Unexpected(format!(
                "At most {} VMs can be instantiated but {} are requested!",
                MAX_INSTANTIATED_VMS,
                ids.len()
            )));
        }

        let mut uninstantiated_ids: Vec<VmId> = ids
            .iter()
            .filter(|id| !self.instantiated.contains_key(id))
            .copied()
            .collect();
        while (!uninstantiated_ids.is_empty()) && (self.instantiated.len() < MAX_INSTANTIATED_VMS) {
            let id = uninstantiated_ids
                .pop()
                .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?;
            self.resume_vm(&id)?;
        }

        if !uninstantiated_ids.is_empty() {
            // `instantiated` is a BTreeMap; an iterator over it maintains key order to ensure deterministic behavior
            let suspendable_ids: Vec<VmId> = self
                .instantiated
                .keys()
                .filter(|id| !ids.contains(id))
                .copied()
                .collect();

            assert!(suspendable_ids.len() >= uninstantiated_ids.len());
            for i in 0..uninstantiated_ids.len() {
                self.suspend_vm(&suspendable_ids[i])?;
                self.resume_vm(&uninstantiated_ids[i])?;
            }
        }

        Ok(())
    }

    /// Ensure corresponding VM is instantiated and return a mutable reference to it
    fn ensure_get_instantiated(&mut self, id: &VmId) -> Result<&mut (VmContext<DL>, M), Error> {
        self.ensure_vms_instantiated(&[*id])?;
        self.instantiated
            .get_mut(id)
            .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))
    }

    // Resume a suspended VM
    fn resume_vm(&mut self, id: &VmId) -> Result<(), Error> {
        if !self.suspended.contains_key(id) {
            return Err(Error::Unexpected(format!("VM {:?} is not suspended!", id)));
        }
        let snapshot = &self.suspended[id];
        self.iteration_cycles = self
            .iteration_cycles
            .checked_add(SPAWN_EXTRA_CYCLES_BASE)
            .ok_or(Error::CyclesExceeded)?;
        let (context, mut machine) = self.create_dummy_vm(id)?;
        {
            let mut sc = context.snapshot2_context.lock().expect("lock");
            sc.resume(machine.inner_mut(), snapshot)?;
        }
        self.instantiated.insert(*id, (context, machine));
        self.suspended.remove(id);
        Ok(())
    }

    // Suspend an instantiated VM
    fn suspend_vm(&mut self, id: &VmId) -> Result<(), Error> {
        if !self.instantiated.contains_key(id) {
            return Err(Error::Unexpected(format!(
                "VM {:?} is not instantiated!",
                id
            )));
        }
        self.iteration_cycles = self
            .iteration_cycles
            .checked_add(SPAWN_EXTRA_CYCLES_BASE)
            .ok_or(Error::CyclesExceeded)?;
        let (context, machine) = self
            .instantiated
            .get_mut(id)
            .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
        let snapshot = {
            let sc = context.snapshot2_context.lock().expect("lock");
            sc.make_snapshot(machine.inner_mut())?
        };
        self.suspended.insert(*id, snapshot);
        self.instantiated.remove(id);
        Ok(())
    }

    fn boot_root_vm_if_needed(&mut self) -> Result<(), Error> {
        if self.states.is_empty() {
            // Booting phase, we will need to initialize the first VM.
            let program_id = self.sg_data.sg_info.program_data_piece_id.clone();
            assert_eq!(
                self.boot_vm(
                    &DataLocation {
                        data_piece_id: program_id,
                        offset: 0,
                        length: u64::MAX,
                    },
                    VmArgs::Vector(vec![]),
                )?,
                ROOT_VM_ID
            );
        }
        assert!(self.states.contains_key(&ROOT_VM_ID));

        Ok(())
    }

    /// Boot a VM with the given program and args.
    fn boot_vm(&mut self, location: &DataLocation, args: VmArgs) -> Result<VmId, Error> {
        let id = self.next_vm_id;
        self.next_vm_id += 1;
        let (context, mut machine) = self.create_dummy_vm(&id)?;
        let (program, _) = {
            let sc = context.snapshot2_context.lock().expect("lock");
            sc.load_data(&location.data_piece_id, location.offset, location.length)?
        };
        self.load_vm_program(&context, &mut machine, location, program, args)?;
        // Newly booted VM will be instantiated by default
        while self.instantiated.len() >= MAX_INSTANTIATED_VMS {
            // `instantiated` is a BTreeMap; first_entry will maintain key order
            let id = *self
                .instantiated
                .first_entry()
                .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?
                .key();
            self.suspend_vm(&id)?;
        }

        self.instantiated.insert(id, (context, machine));
        self.states.insert(id, VmState::Runnable);

        Ok(id)
    }

    // Load the program into an empty vm.
    fn load_vm_program(
        &mut self,
        context: &VmContext<DL>,
        machine: &mut M,
        location: &DataLocation,
        program: Bytes,
        args: VmArgs,
    ) -> Result<u64, Error> {
        let metadata = parse_elf::<u64>(&program, machine.inner_mut().version())?;
        let bytes = match args {
            VmArgs::Reader { vm_id, argc, argv } => {
                let (_, machine_from) = self.ensure_get_instantiated(&vm_id)?;
                let argc = Self::u64_to_reg(argc);
                let argv = Self::u64_to_reg(argv);
                let argv =
                    FlattenedArgsReader::new(machine_from.inner_mut().memory_mut(), argc, argv);
                machine.load_program_with_metadata(&program, &metadata, argv)?
            }
            VmArgs::Vector(data) => {
                machine.load_program_with_metadata(&program, &metadata, data.into_iter().map(Ok))?
            }
        };
        let mut sc = context.snapshot2_context.lock().expect("lock");
        sc.mark_program(
            machine.inner_mut(),
            &metadata,
            &location.data_piece_id,
            location.offset,
        )?;
        machine
            .inner_mut()
            .add_cycles_no_checking(transferred_byte_cycles(bytes))?;
        Ok(bytes)
    }

    // Create a new VM instance with syscalls attached
    fn create_dummy_vm(&self, id: &VmId) -> Result<(VmContext<DL>, M), Error> {
        let version = &self.sg_data.sg_info.script_version;
        let core_machine = M::Inner::new(
            version.vm_isa(),
            version.vm_version(),
            // We will update max_cycles for each machine when it gets a chance to run
            u64::MAX,
        );
        let vm_context = VmContext {
            base_cycles: Arc::clone(&self.total_cycles),
            message_box: Arc::clone(&self.message_box),
            snapshot2_context: Arc::new(Mutex::new(Snapshot2Context::new(self.sg_data.clone()))),
        };

        let machine_builder = DefaultMachineBuilder::new(core_machine)
            .instruction_cycle_func(Box::new(estimate_cycles));
        let machine_builder =
            (self.syscall_generator)(id, &self.sg_data, &vm_context, &self.syscall_context)
                .into_iter()
                .fold(machine_builder, |builder, syscall| builder.syscall(syscall));
        let default_machine = machine_builder.build();
        Ok((vm_context, M::new(default_machine)))
    }

    fn i8_to_reg(v: i8) -> <M::Inner as CoreMachine>::REG {
        <M::Inner as CoreMachine>::REG::from_i8(v)
    }

    fn u8_to_reg(v: u8) -> <M::Inner as CoreMachine>::REG {
        <M::Inner as CoreMachine>::REG::from_u8(v)
    }

    fn u64_to_reg(v: u64) -> <M::Inner as CoreMachine>::REG {
        <M::Inner as CoreMachine>::REG::from_u64(v)
    }
}