ckb_script/
scheduler.rs

1use crate::cost_model::transferred_byte_cycles;
2use crate::syscalls::{
3    EXEC_LOAD_ELF_V2_CYCLES_BASE, INVALID_FD, MAX_FDS_CREATED, MAX_VMS_SPAWNED, OTHER_END_CLOSED,
4    SPAWN_EXTRA_CYCLES_BASE, SUCCESS, WAIT_FAILURE,
5};
6
7use crate::types::{
8    DataLocation, DataPieceId, FIRST_FD_SLOT, FIRST_VM_ID, Fd, FdArgs, FullSuspendedState, Message,
9    ReadState, RunMode, SgData, SyscallGenerator, VmArgs, VmContext, VmId, VmState, WriteState,
10};
11use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider};
12use ckb_types::core::Cycle;
13use ckb_vm::snapshot2::Snapshot2Context;
14use ckb_vm::{
15    Error, FlattenedArgsReader, Register,
16    bytes::Bytes,
17    cost_model::estimate_cycles,
18    elf::parse_elf,
19    machine::{CoreMachine, DefaultMachineBuilder, DefaultMachineRunner, Pause, SupportMachine},
20    memory::Memory,
21    registers::A0,
22    snapshot2::Snapshot2,
23};
24use std::collections::{BTreeMap, HashMap};
25use std::sync::{
26    Arc, Mutex,
27    atomic::{AtomicU64, Ordering},
28};
29
30/// Root process's id.
31pub const ROOT_VM_ID: VmId = FIRST_VM_ID;
32/// The maximum number of VMs that can be created at the same time.
33pub const MAX_VMS_COUNT: u64 = 16;
34/// The maximum number of instantiated VMs.
35pub const MAX_INSTANTIATED_VMS: usize = 4;
36/// The maximum number of fds.
37pub const MAX_FDS: u64 = 64;
38
39/// A single Scheduler instance is used to verify a single script
40/// within a CKB transaction.
41///
42/// A scheduler holds & manipulates a core, the scheduler also holds
43/// all CKB-VM machines, each CKB-VM machine also gets a mutable reference
44/// of the core for IO operations.
45pub struct Scheduler<DL, V, M>
46where
47    DL: CellDataProvider,
48    M: DefaultMachineRunner,
49{
50    /// Immutable context data for current running transaction & script.
51    pub sg_data: SgData<DL>,
52
53    /// Syscall generator
54    pub syscall_generator: SyscallGenerator<DL, V, M::Inner>,
55    /// Syscall generator context
56    pub syscall_context: V,
57
58    /// Total cycles. When a scheduler executes, there are 3 variables
59    /// that might all contain charged cycles: +total_cycles+,
60    /// +iteration_cycles+ and +machine.cycles()+ from the current
61    /// executing virtual machine. At any given time, the sum of all 3
62    /// variables here, represent the total consumed cycles by the current
63    /// scheduler.
64    /// But there are also exceptions: at certain period of time, the cycles
65    /// stored in `machine.cycles()` are moved over to +iteration_cycles+,
66    /// the cycles stored in +iteration_cycles+ would also be moved over to
67    /// +total_cycles+:
68    ///
69    /// * The current running virtual machine would contain consumed
70    ///   cycles in its own machine.cycles() structure.
71    /// * +iteration_cycles+ holds the current consumed cycles each time
72    ///   we executed a virtual machine(also named an iteration). It will
73    ///   always be zero before each iteration(i.e., before each VM starts
74    ///   execution). When a virtual machine finishes execution, the cycles
75    ///   stored in `machine.cycles()` will be moved over to +iteration_cycles+.
76    ///   `machine.cycles()` will then be reset to zero.
77    /// * Processing messages in the message box would alao charge cycles
78    ///   for operations, such as suspending/resuming VMs, transferring data
79    ///   etc. Those cycles were added to +iteration_cycles+ directly. When all
80    ///   postprocessing work is completed, the cycles consumed in
81    ///   +iteration_cycles+ will then be moved to +total_cycles+.
82    ///   +iteration_cycles+ will then be reset to zero.
83    ///
84    /// One can consider that +total_cycles+ contains the total cycles
85    /// consumed in current scheduler, when the scheduler is not busy executing.
86    pub total_cycles: Arc<AtomicU64>,
87    /// Iteration cycles, see +total_cycles+ on its usage
88    pub iteration_cycles: Cycle,
89    /// Next vm id used by spawn.
90    pub next_vm_id: VmId,
91    /// Next fd used by pipe.
92    pub next_fd_slot: u64,
93    /// Used to store VM state.
94    pub states: BTreeMap<VmId, VmState>,
95    /// Used to confirm the owner of fd.
96    pub fds: BTreeMap<Fd, VmId>,
97    /// Verify the VM's inherited fd list.
98    pub inherited_fd: BTreeMap<VmId, Vec<Fd>>,
99    /// Instantiated vms.
100    pub instantiated: BTreeMap<VmId, (VmContext<DL>, M)>,
101    /// Suspended vms.
102    pub suspended: BTreeMap<VmId, Snapshot2<DataPieceId>>,
103    /// Terminated vms.
104    pub terminated_vms: BTreeMap<VmId, i8>,
105
106    /// MessageBox is expected to be empty before returning from `run`
107    /// function, there is no need to persist messages.
108    pub message_box: Arc<Mutex<Vec<Message>>>,
109}
110
111impl<DL, V, M> Scheduler<DL, V, M>
112where
113    DL: CellDataProvider + HeaderProvider + ExtensionProvider + Clone,
114    V: Clone,
115    M: DefaultMachineRunner,
116{
117    /// Create a new scheduler from empty state
118    pub fn new(
119        sg_data: SgData<DL>,
120        syscall_generator: SyscallGenerator<DL, V, M::Inner>,
121        syscall_context: V,
122    ) -> Self {
123        Self {
124            sg_data,
125            syscall_generator,
126            syscall_context,
127            total_cycles: Arc::new(AtomicU64::new(0)),
128            iteration_cycles: 0,
129            next_vm_id: FIRST_VM_ID,
130            next_fd_slot: FIRST_FD_SLOT,
131            states: BTreeMap::default(),
132            fds: BTreeMap::default(),
133            inherited_fd: BTreeMap::default(),
134            instantiated: BTreeMap::default(),
135            suspended: BTreeMap::default(),
136            message_box: Arc::new(Mutex::new(Vec::new())),
137            terminated_vms: BTreeMap::default(),
138        }
139    }
140
141    /// Return total cycles.
142    pub fn consumed_cycles(&self) -> Cycle {
143        self.total_cycles.load(Ordering::Acquire)
144    }
145
146    /// Add cycles to total cycles.
147    pub fn consume_cycles(&mut self, cycles: Cycle) -> Result<(), Error> {
148        match self
149            .total_cycles
150            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |total_cycles| {
151                total_cycles.checked_add(cycles)
152            }) {
153            Ok(_) => Ok(()),
154            Err(_) => Err(Error::CyclesExceeded),
155        }
156    }
157
158    /// Resume a previously suspended scheduler state
159    pub fn resume(
160        sg_data: SgData<DL>,
161        syscall_generator: SyscallGenerator<DL, V, M::Inner>,
162        syscall_context: V,
163        full: FullSuspendedState,
164    ) -> Self {
165        let mut scheduler = Self {
166            sg_data,
167            syscall_generator,
168            syscall_context,
169            total_cycles: Arc::new(AtomicU64::new(full.total_cycles)),
170            iteration_cycles: 0,
171            next_vm_id: full.next_vm_id,
172            next_fd_slot: full.next_fd_slot,
173            states: full
174                .vms
175                .iter()
176                .map(|(id, state, _)| (*id, state.clone()))
177                .collect(),
178            fds: full.fds.into_iter().collect(),
179            inherited_fd: full.inherited_fd.into_iter().collect(),
180            instantiated: BTreeMap::default(),
181            suspended: full
182                .vms
183                .into_iter()
184                .map(|(id, _, snapshot)| (id, snapshot))
185                .collect(),
186            message_box: Arc::new(Mutex::new(Vec::new())),
187            terminated_vms: full.terminated_vms.into_iter().collect(),
188        };
189        scheduler
190            .ensure_vms_instantiated(&full.instantiated_ids)
191            .unwrap();
192        // NOTE: suspending/resuming a scheduler is part of CKB's implementation
193        // details. It is not part of execution consensue. We should not charge
194        // cycles for them.
195        scheduler.iteration_cycles = 0;
196        scheduler
197    }
198
199    /// Suspend current scheduler into a serializable full state
200    pub fn suspend(mut self) -> Result<FullSuspendedState, Error> {
201        assert!(self.message_box.lock().expect("lock").is_empty());
202        let mut vms = Vec::with_capacity(self.states.len());
203        let instantiated_ids: Vec<_> = self.instantiated.keys().cloned().collect();
204        for id in &instantiated_ids {
205            self.suspend_vm(id)?;
206        }
207        for (id, state) in self.states {
208            let snapshot = self
209                .suspended
210                .remove(&id)
211                .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
212            vms.push((id, state, snapshot));
213        }
214        Ok(FullSuspendedState {
215            // NOTE: suspending a scheduler is actually part of CKB's
216            // internal execution logic, it does not belong to VM execution
217            // consensus. We are not charging cycles for suspending
218            // a VM in the process of suspending the whole scheduler.
219            total_cycles: self.total_cycles.load(Ordering::Acquire),
220            next_vm_id: self.next_vm_id,
221            next_fd_slot: self.next_fd_slot,
222            vms,
223            fds: self.fds.into_iter().collect(),
224            inherited_fd: self.inherited_fd.into_iter().collect(),
225            terminated_vms: self.terminated_vms.into_iter().collect(),
226            instantiated_ids,
227        })
228    }
229
230    /// This is the only entrypoint for running the scheduler,
231    /// both newly created instance and resumed instance are supported.
232    /// It accepts 2 run mode, one can either limit the cycles to execute,
233    /// or use a pause signal to trigger termination.
234    ///
235    /// Only when the execution terminates without VM errors, will this
236    /// function return an exit code(could still be non-zero) and total
237    /// consumed cycles.
238    ///
239    /// Err would be returned in the following cases:
240    /// * Cycle limit reached, the returned error would be ckb_vm::Error::CyclesExceeded,
241    /// * Pause trigger, the returned error would be ckb_vm::Error::Pause,
242    /// * Other terminating errors
243    pub fn run(&mut self, mode: RunMode) -> Result<(i8, Cycle), Error> {
244        if self.states.is_empty() {
245            // Booting phase, we will need to initialize the first VM.
246            let program_id = self.sg_data.sg_info.program_data_piece_id.clone();
247            assert_eq!(
248                self.boot_vm(
249                    &DataLocation {
250                        data_piece_id: program_id,
251                        offset: 0,
252                        length: u64::MAX,
253                    },
254                    VmArgs::Vector(vec![]),
255                )?,
256                ROOT_VM_ID
257            );
258        }
259        assert!(self.states.contains_key(&ROOT_VM_ID));
260
261        let (pause, mut limit_cycles) = match mode {
262            RunMode::LimitCycles(limit_cycles) => (Pause::new(), limit_cycles),
263            RunMode::Pause(pause) => (pause, u64::MAX),
264        };
265
266        while self.states[&ROOT_VM_ID] != VmState::Terminated {
267            assert_eq!(self.iteration_cycles, 0);
268            let iterate_return = self.iterate(pause.clone(), limit_cycles);
269            self.consume_cycles(self.iteration_cycles)?;
270            limit_cycles = limit_cycles
271                .checked_sub(self.iteration_cycles)
272                .ok_or(Error::CyclesExceeded)?;
273            // Clear iteration cycles intentionally after each run
274            self.iteration_cycles = 0;
275            iterate_return?;
276        }
277
278        // At this point, root VM cannot be suspended
279        let root_vm = &self.instantiated[&ROOT_VM_ID];
280        Ok((root_vm.1.machine().exit_code(), self.consumed_cycles()))
281    }
282
283    /// Returns the machine that needs to be executed in the current iterate.
284    pub fn iterate_prepare_machine(&mut self) -> Result<(u64, &mut M), Error> {
285        // Process all pending VM reads & writes.
286        self.process_io()?;
287        // Find a runnable VM that has the largest ID.
288        let vm_id_to_run = self
289            .states
290            .iter()
291            .rev()
292            .filter(|(_, state)| matches!(state, VmState::Runnable))
293            .map(|(id, _)| *id)
294            .next();
295        let vm_id_to_run = vm_id_to_run.ok_or_else(|| {
296            Error::Unexpected("A deadlock situation has been reached!".to_string())
297        })?;
298        let (_context, machine) = self.ensure_get_instantiated(&vm_id_to_run)?;
299        Ok((vm_id_to_run, machine))
300    }
301
302    /// Process machine execution results in the current iterate.
303    pub fn iterate_process_results(
304        &mut self,
305        vm_id_to_run: u64,
306        result: Result<i8, Error>,
307    ) -> Result<(), Error> {
308        // Process message box, update VM states accordingly
309        self.process_message_box()?;
310        assert!(self.message_box.lock().expect("lock").is_empty());
311        // If the VM terminates, update VMs in join state, also closes its fds
312        match result {
313            Ok(code) => {
314                self.terminated_vms.insert(vm_id_to_run, code);
315                // When root VM terminates, the execution stops immediately, we will purge
316                // all non-root VMs, and only keep root VM in states.
317                // When non-root VM terminates, we only purge the VM's own states.
318                if vm_id_to_run == ROOT_VM_ID {
319                    self.ensure_vms_instantiated(&[vm_id_to_run])?;
320                    self.instantiated.retain(|id, _| *id == vm_id_to_run);
321                    self.suspended.clear();
322                    self.states.clear();
323                    self.states.insert(vm_id_to_run, VmState::Terminated);
324                } else {
325                    let joining_vms: Vec<(VmId, u64)> = self
326                        .states
327                        .iter()
328                        .filter_map(|(vm_id, state)| match state {
329                            VmState::Wait {
330                                target_vm_id,
331                                exit_code_addr,
332                            } if *target_vm_id == vm_id_to_run => Some((*vm_id, *exit_code_addr)),
333                            _ => None,
334                        })
335                        .collect();
336                    // For all joining VMs, update exit code, then mark them as
337                    // runnable state.
338                    for (vm_id, exit_code_addr) in joining_vms {
339                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
340                        machine
341                            .inner_mut()
342                            .memory_mut()
343                            .store8(&Self::u64_to_reg(exit_code_addr), &Self::i8_to_reg(code))?;
344                        machine
345                            .inner_mut()
346                            .set_register(A0, Self::u8_to_reg(SUCCESS));
347                        self.states.insert(vm_id, VmState::Runnable);
348                    }
349                    // Close fds
350                    self.fds.retain(|_, vm_id| *vm_id != vm_id_to_run);
351                    // Clear terminated VM states
352                    self.states.remove(&vm_id_to_run);
353                    self.instantiated.remove(&vm_id_to_run);
354                    self.suspended.remove(&vm_id_to_run);
355                }
356                Ok(())
357            }
358            Err(Error::Yield) => Ok(()),
359            Err(e) => Err(e),
360        }
361    }
362
363    // This is internal function that does the actual VM execution loop.
364    // Here both pause signal and limit_cycles are provided so as to simplify
365    // branches.
366    fn iterate(&mut self, pause: Pause, limit_cycles: Cycle) -> Result<(), Error> {
367        // Execute the VM for real, consumed cycles in the virtual machine is
368        // moved over to +iteration_cycles+, then we reset virtual machine's own
369        // cycle count to zero.
370        let (id, result, cycles) = {
371            let (id, vm) = self.iterate_prepare_machine()?;
372            vm.inner_mut().set_max_cycles(limit_cycles);
373            vm.machine_mut().set_pause(pause);
374            let result = vm.run();
375            let cycles = vm.machine().cycles();
376            vm.inner_mut().set_cycles(0);
377            (id, result, cycles)
378        };
379        self.iteration_cycles = self
380            .iteration_cycles
381            .checked_add(cycles)
382            .ok_or(Error::CyclesExceeded)?;
383        self.iterate_process_results(id, result)
384    }
385
386    fn process_message_box(&mut self) -> Result<(), Error> {
387        let messages: Vec<Message> = self.message_box.lock().expect("lock").drain(..).collect();
388        for message in messages {
389            match message {
390                Message::ExecV2(vm_id, args) => {
391                    let (old_context, old_machine) = self
392                        .instantiated
393                        .get_mut(&vm_id)
394                        .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
395                    old_machine
396                        .inner_mut()
397                        .add_cycles_no_checking(EXEC_LOAD_ELF_V2_CYCLES_BASE)?;
398                    let old_cycles = old_machine.machine().cycles();
399                    let max_cycles = old_machine.machine().max_cycles();
400                    let program = {
401                        let sc = old_context.snapshot2_context.lock().expect("lock");
402                        sc.load_data(
403                            &args.location.data_piece_id,
404                            args.location.offset,
405                            args.location.length,
406                        )?
407                        .0
408                    };
409                    let (context, mut new_machine) = self.create_dummy_vm(&vm_id)?;
410                    new_machine.inner_mut().set_max_cycles(max_cycles);
411                    new_machine.inner_mut().add_cycles_no_checking(old_cycles)?;
412                    self.load_vm_program(
413                        &context,
414                        &mut new_machine,
415                        &args.location,
416                        program,
417                        VmArgs::Reader {
418                            vm_id,
419                            argc: args.argc,
420                            argv: args.argv,
421                        },
422                    )?;
423                    // The insert operation removes the old vm instance and adds the new vm instance.
424                    debug_assert!(self.instantiated.contains_key(&vm_id));
425                    self.instantiated.insert(vm_id, (context, new_machine));
426                }
427                Message::Spawn(vm_id, args) => {
428                    // All fds must belong to the correct owner
429                    if args.fds.iter().any(|fd| self.fds.get(fd) != Some(&vm_id)) {
430                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
431                        machine
432                            .inner_mut()
433                            .set_register(A0, Self::u8_to_reg(INVALID_FD));
434                        continue;
435                    }
436                    if self.suspended.len() + self.instantiated.len() > MAX_VMS_COUNT as usize {
437                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
438                        machine
439                            .inner_mut()
440                            .set_register(A0, Self::u8_to_reg(MAX_VMS_SPAWNED));
441                        continue;
442                    }
443                    let spawned_vm_id = self.boot_vm(
444                        &args.location,
445                        VmArgs::Reader {
446                            vm_id,
447                            argc: args.argc,
448                            argv: args.argv,
449                        },
450                    )?;
451                    // Move passed fds from spawner to spawnee
452                    for fd in &args.fds {
453                        self.fds.insert(*fd, spawned_vm_id);
454                    }
455                    // Here we keep the original version of file descriptors.
456                    // If one fd is moved afterward, this inherited file descriptors doesn't change.
457                    self.inherited_fd.insert(spawned_vm_id, args.fds.clone());
458
459                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
460                    machine.inner_mut().memory_mut().store64(
461                        &Self::u64_to_reg(args.process_id_addr),
462                        &Self::u64_to_reg(spawned_vm_id),
463                    )?;
464                    machine
465                        .inner_mut()
466                        .set_register(A0, Self::u8_to_reg(SUCCESS));
467                }
468                Message::Wait(vm_id, args) => {
469                    if let Some(exit_code) = self.terminated_vms.get(&args.target_id).copied() {
470                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
471                        machine.inner_mut().memory_mut().store8(
472                            &Self::u64_to_reg(args.exit_code_addr),
473                            &Self::i8_to_reg(exit_code),
474                        )?;
475                        machine
476                            .inner_mut()
477                            .set_register(A0, Self::u8_to_reg(SUCCESS));
478                        self.states.insert(vm_id, VmState::Runnable);
479                        self.terminated_vms.retain(|id, _| id != &args.target_id);
480                        continue;
481                    }
482                    if !self.states.contains_key(&args.target_id) {
483                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
484                        machine
485                            .inner_mut()
486                            .set_register(A0, Self::u8_to_reg(WAIT_FAILURE));
487                        continue;
488                    }
489                    // Return code will be updated when the joining VM exits
490                    self.states.insert(
491                        vm_id,
492                        VmState::Wait {
493                            target_vm_id: args.target_id,
494                            exit_code_addr: args.exit_code_addr,
495                        },
496                    );
497                }
498                Message::Pipe(vm_id, args) => {
499                    if self.fds.len() as u64 >= MAX_FDS {
500                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
501                        machine
502                            .inner_mut()
503                            .set_register(A0, Self::u8_to_reg(MAX_FDS_CREATED));
504                        continue;
505                    }
506                    let (p1, p2, slot) = Fd::create(self.next_fd_slot);
507                    self.next_fd_slot = slot;
508                    self.fds.insert(p1, vm_id);
509                    self.fds.insert(p2, vm_id);
510                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
511                    machine
512                        .inner_mut()
513                        .memory_mut()
514                        .store64(&Self::u64_to_reg(args.fd1_addr), &Self::u64_to_reg(p1.0))?;
515                    machine
516                        .inner_mut()
517                        .memory_mut()
518                        .store64(&Self::u64_to_reg(args.fd2_addr), &Self::u64_to_reg(p2.0))?;
519                    machine
520                        .inner_mut()
521                        .set_register(A0, Self::u8_to_reg(SUCCESS));
522                }
523                Message::FdRead(vm_id, args) => {
524                    if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
525                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
526                        machine
527                            .inner_mut()
528                            .set_register(A0, Self::u8_to_reg(INVALID_FD));
529                        continue;
530                    }
531                    if !self.fds.contains_key(&args.fd.other_fd()) {
532                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
533                        machine
534                            .inner_mut()
535                            .set_register(A0, Self::u8_to_reg(OTHER_END_CLOSED));
536                        continue;
537                    }
538                    // Return code will be updated when the read operation finishes
539                    self.states.insert(
540                        vm_id,
541                        VmState::WaitForRead(ReadState {
542                            fd: args.fd,
543                            length: args.length,
544                            buffer_addr: args.buffer_addr,
545                            length_addr: args.length_addr,
546                        }),
547                    );
548                }
549                Message::FdWrite(vm_id, args) => {
550                    if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
551                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
552                        machine
553                            .inner_mut()
554                            .set_register(A0, Self::u8_to_reg(INVALID_FD));
555                        continue;
556                    }
557                    if !self.fds.contains_key(&args.fd.other_fd()) {
558                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
559                        machine
560                            .inner_mut()
561                            .set_register(A0, Self::u8_to_reg(OTHER_END_CLOSED));
562                        continue;
563                    }
564                    // Return code will be updated when the write operation finishes
565                    self.states.insert(
566                        vm_id,
567                        VmState::WaitForWrite(WriteState {
568                            fd: args.fd,
569                            consumed: 0,
570                            length: args.length,
571                            buffer_addr: args.buffer_addr,
572                            length_addr: args.length_addr,
573                        }),
574                    );
575                }
576                Message::InheritedFileDescriptor(vm_id, args) => {
577                    let inherited_fd = if vm_id == ROOT_VM_ID {
578                        Vec::new()
579                    } else {
580                        self.inherited_fd[&vm_id].clone()
581                    };
582                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
583                    let FdArgs {
584                        buffer_addr,
585                        length_addr,
586                        ..
587                    } = args;
588                    let full_length = machine
589                        .inner_mut()
590                        .memory_mut()
591                        .load64(&Self::u64_to_reg(length_addr))?
592                        .to_u64();
593                    let real_length = inherited_fd.len() as u64;
594                    let copy_length = u64::min(full_length, real_length);
595                    for i in 0..copy_length {
596                        let fd = inherited_fd[i as usize].0;
597                        let addr = buffer_addr.checked_add(i * 8).ok_or(Error::MemOutOfBound)?;
598                        machine
599                            .inner_mut()
600                            .memory_mut()
601                            .store64(&Self::u64_to_reg(addr), &Self::u64_to_reg(fd))?;
602                    }
603                    machine.inner_mut().memory_mut().store64(
604                        &Self::u64_to_reg(length_addr),
605                        &Self::u64_to_reg(real_length),
606                    )?;
607                    machine
608                        .inner_mut()
609                        .set_register(A0, Self::u8_to_reg(SUCCESS));
610                }
611                Message::Close(vm_id, fd) => {
612                    if self.fds.get(&fd) != Some(&vm_id) {
613                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
614                        machine
615                            .inner_mut()
616                            .set_register(A0, Self::u8_to_reg(INVALID_FD));
617                    } else {
618                        self.fds.remove(&fd);
619                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
620                        machine
621                            .inner_mut()
622                            .set_register(A0, Self::u8_to_reg(SUCCESS));
623                    }
624                }
625            }
626        }
627        Ok(())
628    }
629
630    fn process_io(&mut self) -> Result<(), Error> {
631        let mut reads: HashMap<Fd, (VmId, ReadState)> = HashMap::default();
632        let mut closed_fds: Vec<VmId> = Vec::new();
633
634        self.states.iter().for_each(|(vm_id, state)| {
635            if let VmState::WaitForRead(inner_state) = state {
636                if self.fds.contains_key(&inner_state.fd.other_fd()) {
637                    reads.insert(inner_state.fd, (*vm_id, inner_state.clone()));
638                } else {
639                    closed_fds.push(*vm_id);
640                }
641            }
642        });
643        let mut pairs: Vec<(VmId, ReadState, VmId, WriteState)> = Vec::new();
644        self.states.iter().for_each(|(vm_id, state)| {
645            if let VmState::WaitForWrite(inner_state) = state {
646                if self.fds.contains_key(&inner_state.fd.other_fd()) {
647                    if let Some((read_vm_id, read_state)) = reads.get(&inner_state.fd.other_fd()) {
648                        pairs.push((*read_vm_id, read_state.clone(), *vm_id, inner_state.clone()));
649                    }
650                } else {
651                    closed_fds.push(*vm_id);
652                }
653            }
654        });
655        // Finish read / write syscalls for fds that are closed on the other end
656        for vm_id in closed_fds {
657            match self.states[&vm_id].clone() {
658                VmState::WaitForRead(ReadState { length_addr, .. }) => {
659                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
660                    machine.inner_mut().memory_mut().store64(
661                        &Self::u64_to_reg(length_addr),
662                        &<M::Inner as CoreMachine>::REG::zero(),
663                    )?;
664                    machine
665                        .inner_mut()
666                        .set_register(A0, Self::u8_to_reg(SUCCESS));
667                    self.states.insert(vm_id, VmState::Runnable);
668                }
669                VmState::WaitForWrite(WriteState {
670                    consumed,
671                    length_addr,
672                    ..
673                }) => {
674                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
675                    machine
676                        .inner_mut()
677                        .memory_mut()
678                        .store64(&Self::u64_to_reg(length_addr), &Self::u64_to_reg(consumed))?;
679                    machine
680                        .inner_mut()
681                        .set_register(A0, Self::u8_to_reg(SUCCESS));
682                    self.states.insert(vm_id, VmState::Runnable);
683                }
684                _ => (),
685            }
686        }
687        // Transferring data from write fds to read fds
688        for (read_vm_id, read_state, write_vm_id, write_state) in pairs {
689            let ReadState {
690                length: read_length,
691                buffer_addr: read_buffer_addr,
692                length_addr: read_length_addr,
693                ..
694            } = read_state;
695            let WriteState {
696                fd: write_fd,
697                mut consumed,
698                length: write_length,
699                buffer_addr: write_buffer_addr,
700                length_addr: write_length_addr,
701            } = write_state;
702
703            self.ensure_vms_instantiated(&[read_vm_id, write_vm_id])?;
704            {
705                let fillable = read_length;
706                let consumable = write_length - consumed;
707                let copiable = std::cmp::min(fillable, consumable);
708
709                // Actual data copying
710                let (_, write_machine) = self
711                    .instantiated
712                    .get_mut(&write_vm_id)
713                    .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
714                write_machine
715                    .inner_mut()
716                    .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
717                let data = write_machine
718                    .inner_mut()
719                    .memory_mut()
720                    .load_bytes(write_buffer_addr.wrapping_add(consumed), copiable)?;
721                let (_, read_machine) = self
722                    .instantiated
723                    .get_mut(&read_vm_id)
724                    .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
725                read_machine
726                    .inner_mut()
727                    .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
728                read_machine
729                    .inner_mut()
730                    .memory_mut()
731                    .store_bytes(read_buffer_addr, &data)?;
732                // Read syscall terminates as soon as some data are filled
733                read_machine.inner_mut().memory_mut().store64(
734                    &Self::u64_to_reg(read_length_addr),
735                    &Self::u64_to_reg(copiable),
736                )?;
737                read_machine
738                    .inner_mut()
739                    .set_register(A0, Self::u8_to_reg(SUCCESS));
740                self.states.insert(read_vm_id, VmState::Runnable);
741
742                // Write syscall, however, terminates only when all the data
743                // have been written, or when the pairing read fd is closed.
744                consumed += copiable;
745                if consumed == write_length {
746                    // Write VM has fulfilled its write request
747                    let (_, write_machine) = self
748                        .instantiated
749                        .get_mut(&write_vm_id)
750                        .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
751                    write_machine.inner_mut().memory_mut().store64(
752                        &Self::u64_to_reg(write_length_addr),
753                        &Self::u64_to_reg(write_length),
754                    )?;
755                    write_machine
756                        .inner_mut()
757                        .set_register(A0, Self::u8_to_reg(SUCCESS));
758                    self.states.insert(write_vm_id, VmState::Runnable);
759                } else {
760                    // Only update write VM state
761                    self.states.insert(
762                        write_vm_id,
763                        VmState::WaitForWrite(WriteState {
764                            fd: write_fd,
765                            consumed,
766                            length: write_length,
767                            buffer_addr: write_buffer_addr,
768                            length_addr: write_length_addr,
769                        }),
770                    );
771                }
772            }
773        }
774        Ok(())
775    }
776
777    // Ensure VMs are instantiated
778    fn ensure_vms_instantiated(&mut self, ids: &[VmId]) -> Result<(), Error> {
779        if ids.len() > MAX_INSTANTIATED_VMS {
780            return Err(Error::Unexpected(format!(
781                "At most {} VMs can be instantiated but {} are requested!",
782                MAX_INSTANTIATED_VMS,
783                ids.len()
784            )));
785        }
786
787        let mut uninstantiated_ids: Vec<VmId> = ids
788            .iter()
789            .filter(|id| !self.instantiated.contains_key(id))
790            .copied()
791            .collect();
792        while (!uninstantiated_ids.is_empty()) && (self.instantiated.len() < MAX_INSTANTIATED_VMS) {
793            let id = uninstantiated_ids
794                .pop()
795                .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?;
796            self.resume_vm(&id)?;
797        }
798
799        if !uninstantiated_ids.is_empty() {
800            // Instantiated is a BTreeMap, an iterator on it maintains key order to ensure deterministic behavior
801            let suspendable_ids: Vec<VmId> = self
802                .instantiated
803                .keys()
804                .filter(|id| !ids.contains(id))
805                .copied()
806                .collect();
807
808            assert!(suspendable_ids.len() >= uninstantiated_ids.len());
809            for i in 0..uninstantiated_ids.len() {
810                self.suspend_vm(&suspendable_ids[i])?;
811                self.resume_vm(&uninstantiated_ids[i])?;
812            }
813        }
814
815        Ok(())
816    }
817
818    // Ensure corresponding VM is instantiated and return a mutable reference to it
819    fn ensure_get_instantiated(&mut self, id: &VmId) -> Result<&mut (VmContext<DL>, M), Error> {
820        self.ensure_vms_instantiated(&[*id])?;
821        self.instantiated
822            .get_mut(id)
823            .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))
824    }
825
826    // Resume a suspended VM
827    fn resume_vm(&mut self, id: &VmId) -> Result<(), Error> {
828        if !self.suspended.contains_key(id) {
829            return Err(Error::Unexpected(format!("VM {:?} is not suspended!", id)));
830        }
831        let snapshot = &self.suspended[id];
832        self.iteration_cycles = self
833            .iteration_cycles
834            .checked_add(SPAWN_EXTRA_CYCLES_BASE)
835            .ok_or(Error::CyclesExceeded)?;
836        let (context, mut machine) = self.create_dummy_vm(id)?;
837        {
838            let mut sc = context.snapshot2_context.lock().expect("lock");
839            sc.resume(machine.inner_mut(), snapshot)?;
840        }
841        self.instantiated.insert(*id, (context, machine));
842        self.suspended.remove(id);
843        Ok(())
844    }
845
846    // Suspend an instantiated VM
847    fn suspend_vm(&mut self, id: &VmId) -> Result<(), Error> {
848        if !self.instantiated.contains_key(id) {
849            return Err(Error::Unexpected(format!(
850                "VM {:?} is not instantiated!",
851                id
852            )));
853        }
854        self.iteration_cycles = self
855            .iteration_cycles
856            .checked_add(SPAWN_EXTRA_CYCLES_BASE)
857            .ok_or(Error::CyclesExceeded)?;
858        let (context, machine) = self
859            .instantiated
860            .get_mut(id)
861            .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
862        let snapshot = {
863            let sc = context.snapshot2_context.lock().expect("lock");
864            sc.make_snapshot(machine.inner_mut())?
865        };
866        self.suspended.insert(*id, snapshot);
867        self.instantiated.remove(id);
868        Ok(())
869    }
870
871    /// Boot a vm by given program and args.
872    pub fn boot_vm(&mut self, location: &DataLocation, args: VmArgs) -> Result<VmId, Error> {
873        let id = self.next_vm_id;
874        self.next_vm_id += 1;
875        let (context, mut machine) = self.create_dummy_vm(&id)?;
876        let (program, _) = {
877            let sc = context.snapshot2_context.lock().expect("lock");
878            sc.load_data(&location.data_piece_id, location.offset, location.length)?
879        };
880        self.load_vm_program(&context, &mut machine, location, program, args)?;
881        // Newly booted VM will be instantiated by default
882        while self.instantiated.len() >= MAX_INSTANTIATED_VMS {
883            // Instantiated is a BTreeMap, first_entry will maintain key order
884            let id = *self
885                .instantiated
886                .first_entry()
887                .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?
888                .key();
889            self.suspend_vm(&id)?;
890        }
891
892        self.instantiated.insert(id, (context, machine));
893        self.states.insert(id, VmState::Runnable);
894
895        Ok(id)
896    }
897
898    // Load the program into an empty vm.
899    fn load_vm_program(
900        &mut self,
901        context: &VmContext<DL>,
902        machine: &mut M,
903        location: &DataLocation,
904        program: Bytes,
905        args: VmArgs,
906    ) -> Result<u64, Error> {
907        let metadata = parse_elf::<u64>(&program, machine.inner_mut().version())?;
908        let bytes = match args {
909            VmArgs::Reader { vm_id, argc, argv } => {
910                let (_, machine_from) = self.ensure_get_instantiated(&vm_id)?;
911                let argc = Self::u64_to_reg(argc);
912                let argv = Self::u64_to_reg(argv);
913                let argv =
914                    FlattenedArgsReader::new(machine_from.inner_mut().memory_mut(), argc, argv);
915                machine.load_program_with_metadata(&program, &metadata, argv)?
916            }
917            VmArgs::Vector(data) => {
918                machine.load_program_with_metadata(&program, &metadata, data.into_iter().map(Ok))?
919            }
920        };
921        let mut sc = context.snapshot2_context.lock().expect("lock");
922        sc.mark_program(
923            machine.inner_mut(),
924            &metadata,
925            &location.data_piece_id,
926            location.offset,
927        )?;
928        machine
929            .inner_mut()
930            .add_cycles_no_checking(transferred_byte_cycles(bytes))?;
931        Ok(bytes)
932    }
933
934    // Create a new VM instance with syscalls attached
935    fn create_dummy_vm(&self, id: &VmId) -> Result<(VmContext<DL>, M), Error> {
936        let version = &self.sg_data.sg_info.script_version;
937        let core_machine = M::Inner::new(
938            version.vm_isa(),
939            version.vm_version(),
940            // We will update max_cycles for each machine when it gets a chance to run
941            u64::MAX,
942        );
943        let vm_context = VmContext {
944            base_cycles: Arc::clone(&self.total_cycles),
945            message_box: Arc::clone(&self.message_box),
946            snapshot2_context: Arc::new(Mutex::new(Snapshot2Context::new(self.sg_data.clone()))),
947        };
948
949        let machine_builder = DefaultMachineBuilder::new(core_machine)
950            .instruction_cycle_func(Box::new(estimate_cycles));
951        let machine_builder =
952            (self.syscall_generator)(id, &self.sg_data, &vm_context, &self.syscall_context)
953                .into_iter()
954                .fold(machine_builder, |builder, syscall| builder.syscall(syscall));
955        let default_machine = machine_builder.build();
956        Ok((vm_context, M::new(default_machine)))
957    }
958
959    fn i8_to_reg(v: i8) -> <M::Inner as CoreMachine>::REG {
960        <M::Inner as CoreMachine>::REG::from_i8(v)
961    }
962
963    fn u8_to_reg(v: u8) -> <M::Inner as CoreMachine>::REG {
964        <M::Inner as CoreMachine>::REG::from_u8(v)
965    }
966
967    fn u64_to_reg(v: u64) -> <M::Inner as CoreMachine>::REG {
968        <M::Inner as CoreMachine>::REG::from_u64(v)
969    }
970}