ckb_script/
scheduler.rs

1use crate::cost_model::transferred_byte_cycles;
2use crate::syscalls::{
3    generator::generate_ckb_syscalls, EXEC_LOAD_ELF_V2_CYCLES_BASE, INVALID_FD, MAX_FDS_CREATED,
4    MAX_VMS_SPAWNED, OTHER_END_CLOSED, SPAWN_EXTRA_CYCLES_BASE, SUCCESS, WAIT_FAILURE,
5};
6
7use crate::types::{
8    CoreMachineType, DataLocation, DataPieceId, DebugContext, Fd, FdArgs, FullSuspendedState,
9    Machine, Message, ReadState, RunMode, SgData, VmArgs, VmContext, VmId, VmState, WriteState,
10    FIRST_FD_SLOT, FIRST_VM_ID,
11};
12use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider};
13use ckb_types::core::Cycle;
14use ckb_vm::snapshot2::Snapshot2Context;
15use ckb_vm::{
16    bytes::Bytes,
17    cost_model::estimate_cycles,
18    elf::parse_elf,
19    machine::{CoreMachine, DefaultMachineBuilder, Pause, SupportMachine},
20    memory::Memory,
21    registers::A0,
22    snapshot2::Snapshot2,
23    Error, FlattenedArgsReader, Register,
24};
25use std::collections::{BTreeMap, HashMap};
26use std::sync::{
27    atomic::{AtomicU64, Ordering},
28    Arc, Mutex,
29};
30
/// Root process's id. The first VM booted by the scheduler always
/// receives this id (see the assertion in `run`).
pub const ROOT_VM_ID: VmId = FIRST_VM_ID;
/// The maximum number of VMs that can exist at the same time,
/// counting both instantiated and suspended VMs.
pub const MAX_VMS_COUNT: u64 = 16;
/// The maximum number of VMs kept instantiated (in memory) at once;
/// the rest are kept as suspended snapshots.
pub const MAX_INSTANTIATED_VMS: usize = 4;
/// The maximum number of fds that can be alive at once across all VMs.
pub const MAX_FDS: u64 = 64;
39
40/// A single Scheduler instance is used to verify a single script
41/// within a CKB transaction.
42///
43/// A scheduler holds & manipulates a core, the scheduler also holds
44/// all CKB-VM machines, each CKB-VM machine also gets a mutable reference
45/// of the core for IO operations.
pub struct Scheduler<DL>
where
    DL: CellDataProvider,
{
    /// Immutable context data for current running transaction & script.
    pub sg_data: SgData<DL>,

    /// Mutable context data used by current scheduler
    pub debug_context: DebugContext,

    /// Total cycles. When a scheduler executes, there are 3 variables
    /// that might all contain charged cycles: +total_cycles+,
    /// +iteration_cycles+ and +machine.cycles()+ from the current
    /// executing virtual machine. At any given time, the sum of all 3
    /// variables here, represent the total consumed cycles by the current
    /// scheduler.
    /// But there are also exceptions: at certain period of time, the cycles
    /// stored in `machine.cycles()` are moved over to +iteration_cycles+,
    /// the cycles stored in +iteration_cycles+ would also be moved over to
    /// +total_cycles+:
    ///
    /// * The current running virtual machine would contain consumed
    ///   cycles in its own machine.cycles() structure.
    /// * +iteration_cycles+ holds the current consumed cycles each time
    ///   we executed a virtual machine(also named an iteration). It will
    ///   always be zero before each iteration(i.e., before each VM starts
    ///   execution). When a virtual machine finishes execution, the cycles
    ///   stored in `machine.cycles()` will be moved over to +iteration_cycles+.
    ///   `machine.cycles()` will then be reset to zero.
    /// * Processing messages in the message box would also charge cycles
    ///   for operations, such as suspending/resuming VMs, transferring data
    ///   etc. Those cycles were added to +iteration_cycles+ directly. When all
    ///   postprocessing work is completed, the cycles consumed in
    ///   +iteration_cycles+ will then be moved to +total_cycles+.
    ///   +iteration_cycles+ will then be reset to zero.
    ///
    /// One can consider that +total_cycles+ contains the total cycles
    /// consumed in current scheduler, when the scheduler is not busy executing.
    pub total_cycles: Arc<AtomicU64>,
    /// Iteration cycles, see +total_cycles+ on its usage
    pub iteration_cycles: Cycle,
    /// Next vm id used by spawn.
    pub next_vm_id: VmId,
    /// Next fd used by pipe.
    pub next_fd_slot: u64,
    /// Current state (runnable / waiting / terminated …) of each live VM.
    pub states: BTreeMap<VmId, VmState>,
    /// Maps each fd to its current owning VM id; used to confirm ownership
    /// on every fd operation.
    pub fds: BTreeMap<Fd, VmId>,
    /// Each VM's inherited fd list, captured once at spawn time and not
    /// updated when fds are later moved.
    pub inherited_fd: BTreeMap<VmId, Vec<Fd>>,
    /// Instantiated vms, i.e. VMs currently held in memory with a live
    /// machine instance.
    pub instantiated: BTreeMap<VmId, (VmContext<DL>, Machine)>,
    /// Suspended vms, kept only as serializable snapshots.
    pub suspended: BTreeMap<VmId, Snapshot2<DataPieceId>>,
    /// Terminated vms together with their exit codes, retained until a
    /// `wait` collects them.
    pub terminated_vms: BTreeMap<VmId, i8>,

    /// MessageBox is expected to be empty before returning from `run`
    /// function, there is no need to persist messages.
    pub message_box: Arc<Mutex<Vec<Message>>>,
}
108
109impl<DL> Scheduler<DL>
110where
111    DL: CellDataProvider + HeaderProvider + ExtensionProvider + Send + Sync + Clone + 'static,
112{
113    /// Create a new scheduler from empty state
114    pub fn new(sg_data: SgData<DL>, debug_context: DebugContext) -> Self {
115        Self {
116            sg_data,
117            debug_context,
118            total_cycles: Arc::new(AtomicU64::new(0)),
119            iteration_cycles: 0,
120            next_vm_id: FIRST_VM_ID,
121            next_fd_slot: FIRST_FD_SLOT,
122            states: BTreeMap::default(),
123            fds: BTreeMap::default(),
124            inherited_fd: BTreeMap::default(),
125            instantiated: BTreeMap::default(),
126            suspended: BTreeMap::default(),
127            message_box: Arc::new(Mutex::new(Vec::new())),
128            terminated_vms: BTreeMap::default(),
129        }
130    }
131
    /// Return total cycles.
    ///
    /// The `Acquire` load pairs with the `AcqRel` update in
    /// `consume_cycles`, so callers observe a fully up-to-date total.
    pub fn consumed_cycles(&self) -> Cycle {
        self.total_cycles.load(Ordering::Acquire)
    }
136
137    /// Add cycles to total cycles.
138    pub fn consume_cycles(&mut self, cycles: Cycle) -> Result<(), Error> {
139        match self
140            .total_cycles
141            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |total_cycles| {
142                total_cycles.checked_add(cycles)
143            }) {
144            Ok(_) => Ok(()),
145            Err(_) => Err(Error::CyclesExceeded),
146        }
147    }
148
149    /// Resume a previously suspended scheduler state
150    pub fn resume(
151        sg_data: SgData<DL>,
152        debug_context: DebugContext,
153        full: FullSuspendedState,
154    ) -> Self {
155        let mut scheduler = Self {
156            sg_data,
157            debug_context,
158            total_cycles: Arc::new(AtomicU64::new(full.total_cycles)),
159            iteration_cycles: 0,
160            next_vm_id: full.next_vm_id,
161            next_fd_slot: full.next_fd_slot,
162            states: full
163                .vms
164                .iter()
165                .map(|(id, state, _)| (*id, state.clone()))
166                .collect(),
167            fds: full.fds.into_iter().collect(),
168            inherited_fd: full.inherited_fd.into_iter().collect(),
169            instantiated: BTreeMap::default(),
170            suspended: full
171                .vms
172                .into_iter()
173                .map(|(id, _, snapshot)| (id, snapshot))
174                .collect(),
175            message_box: Arc::new(Mutex::new(Vec::new())),
176            terminated_vms: full.terminated_vms.into_iter().collect(),
177        };
178        scheduler
179            .ensure_vms_instantiated(&full.instantiated_ids)
180            .unwrap();
181        // NOTE: suspending/resuming a scheduler is part of CKB's implementation
182        // details. It is not part of execution consensue. We should not charge
183        // cycles for them.
184        scheduler.iteration_cycles = 0;
185        scheduler
186    }
187
    /// Suspend current scheduler into a serializable full state.
    ///
    /// Consumes the scheduler; the returned `FullSuspendedState` can be
    /// fed back into `resume` to reconstruct an equivalent scheduler.
    pub fn suspend(mut self) -> Result<FullSuspendedState, Error> {
        // All pending messages must have been processed before suspending.
        assert!(self.message_box.lock().expect("lock").is_empty());
        let mut vms = Vec::with_capacity(self.states.len());
        // First snapshot every still-instantiated VM, so that afterwards
        // +suspended+ holds a snapshot for every VM listed in +states+.
        let instantiated_ids: Vec<_> = self.instantiated.keys().cloned().collect();
        for id in &instantiated_ids {
            self.suspend_vm(id)?;
        }
        // Pair each VM's state with its snapshot into a flattened list.
        // A missing snapshot here indicates an internal invariant
        // violation, surfaced as `Error::Unexpected`.
        for (id, state) in self.states {
            let snapshot = self
                .suspended
                .remove(&id)
                .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
            vms.push((id, state, snapshot));
        }
        Ok(FullSuspendedState {
            // NOTE: suspending a scheduler is actually part of CKB's
            // internal execution logic, it does not belong to VM execution
            // consensus. We are not charging cycles for suspending
            // a VM in the process of suspending the whole scheduler.
            total_cycles: self.total_cycles.load(Ordering::Acquire),
            next_vm_id: self.next_vm_id,
            next_fd_slot: self.next_fd_slot,
            vms,
            fds: self.fds.into_iter().collect(),
            inherited_fd: self.inherited_fd.into_iter().collect(),
            terminated_vms: self.terminated_vms.into_iter().collect(),
            // Recorded so `resume` can re-instantiate exactly this set.
            instantiated_ids,
        })
    }
218
    /// This is the only entrypoint for running the scheduler,
    /// both newly created instance and resumed instance are supported.
    /// It accepts 2 run mode, one can either limit the cycles to execute,
    /// or use a pause signal to trigger termination.
    ///
    /// Only when the execution terminates without VM errors, will this
    /// function return an exit code(could still be non-zero) and total
    /// consumed cycles.
    ///
    /// Err would be returned in the following cases:
    /// * Cycle limit reached, the returned error would be ckb_vm::Error::CyclesExceeded,
    /// * Pause trigger, the returned error would be ckb_vm::Error::Pause,
    /// * Other terminating errors
    pub fn run(&mut self, mode: RunMode) -> Result<(i8, Cycle), Error> {
        if self.states.is_empty() {
            // Booting phase, we will need to initialize the first VM.
            // The whole program data piece is loaded (offset 0, max length),
            // and the booted VM must receive the root id.
            let program_id = self.sg_data.sg_info.program_data_piece_id.clone();
            assert_eq!(
                self.boot_vm(
                    &DataLocation {
                        data_piece_id: program_id,
                        offset: 0,
                        length: u64::MAX,
                    },
                    VmArgs::Vector(vec![]),
                )?,
                ROOT_VM_ID
            );
        }
        assert!(self.states.contains_key(&ROOT_VM_ID));

        // In pause mode there is no explicit cycle limit, so use u64::MAX.
        let (pause, mut limit_cycles) = match mode {
            RunMode::LimitCycles(limit_cycles) => (Pause::new(), limit_cycles),
            RunMode::Pause(pause) => (pause, u64::MAX),
        };

        // Keep iterating until the root VM terminates. Each iteration runs
        // one VM, then moves the iteration's cycles into +total_cycles+ and
        // shrinks the remaining +limit_cycles+ accordingly.
        while self.states[&ROOT_VM_ID] != VmState::Terminated {
            assert_eq!(self.iteration_cycles, 0);
            let iterate_return = self.iterate(pause.clone(), limit_cycles);
            // Charge cycles even when the iteration errored, so accounting
            // stays accurate before the error is propagated below.
            self.consume_cycles(self.iteration_cycles)?;
            limit_cycles = limit_cycles
                .checked_sub(self.iteration_cycles)
                .ok_or(Error::CyclesExceeded)?;
            // Clear iteration cycles intentionally after each run
            self.iteration_cycles = 0;
            iterate_return?;
        }

        // At this point, root VM cannot be suspended
        let root_vm = &self.instantiated[&ROOT_VM_ID];
        Ok((root_vm.1.machine.exit_code(), self.consumed_cycles()))
    }
271
272    /// Returns the machine that needs to be executed in the current iterate.
273    pub fn iterate_prepare_machine(&mut self) -> Result<(u64, &mut Machine), Error> {
274        // Process all pending VM reads & writes.
275        self.process_io()?;
276        // Find a runnable VM that has the largest ID.
277        let vm_id_to_run = self
278            .states
279            .iter()
280            .rev()
281            .filter(|(_, state)| matches!(state, VmState::Runnable))
282            .map(|(id, _)| *id)
283            .next();
284        let vm_id_to_run = vm_id_to_run.ok_or_else(|| {
285            Error::Unexpected("A deadlock situation has been reached!".to_string())
286        })?;
287        let (_context, machine) = self.ensure_get_instantiated(&vm_id_to_run)?;
288        Ok((vm_id_to_run, machine))
289    }
290
    /// Process machine execution results in the current iterate.
    ///
    /// Handles the message box first, then — when the VM exited — wakes
    /// any VMs waiting on it, records its exit code, and purges its
    /// per-VM bookkeeping. `Error::Yield` is not a real error: it only
    /// means the VM gave up its time slice.
    pub fn iterate_process_results(
        &mut self,
        vm_id_to_run: u64,
        result: Result<i8, Error>,
    ) -> Result<(), Error> {
        // Process message box, update VM states accordingly
        self.process_message_box()?;
        assert!(self.message_box.lock().expect("lock").is_empty());
        // If the VM terminates, update VMs in join state, also closes its fds
        match result {
            Ok(code) => {
                self.terminated_vms.insert(vm_id_to_run, code);
                // When root VM terminates, the execution stops immediately, we will purge
                // all non-root VMs, and only keep root VM in states.
                // When non-root VM terminates, we only purge the VM's own states.
                if vm_id_to_run == ROOT_VM_ID {
                    // Keep the root VM instantiated so `run` can read its exit code.
                    self.ensure_vms_instantiated(&[vm_id_to_run])?;
                    self.instantiated.retain(|id, _| *id == vm_id_to_run);
                    self.suspended.clear();
                    self.states.clear();
                    self.states.insert(vm_id_to_run, VmState::Terminated);
                } else {
                    // Collect all VMs currently waiting (joined) on the
                    // terminated VM, together with where each wants the
                    // exit code stored.
                    let joining_vms: Vec<(VmId, u64)> = self
                        .states
                        .iter()
                        .filter_map(|(vm_id, state)| match state {
                            VmState::Wait {
                                target_vm_id,
                                exit_code_addr,
                            } if *target_vm_id == vm_id_to_run => Some((*vm_id, *exit_code_addr)),
                            _ => None,
                        })
                        .collect();
                    // For all joining VMs, update exit code, then mark them as
                    // runnable state.
                    for (vm_id, exit_code_addr) in joining_vms {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine
                            .machine
                            .memory_mut()
                            .store8(&exit_code_addr, &u64::from_i8(code))?;
                        machine.machine.set_register(A0, SUCCESS as u64);
                        self.states.insert(vm_id, VmState::Runnable);
                    }
                    // Close fds
                    self.fds.retain(|_, vm_id| *vm_id != vm_id_to_run);
                    // Clear terminated VM states
                    self.states.remove(&vm_id_to_run);
                    self.instantiated.remove(&vm_id_to_run);
                    self.suspended.remove(&vm_id_to_run);
                }
                Ok(())
            }
            // Yield is a cooperative reschedule request, not a failure.
            Err(Error::Yield) => Ok(()),
            Err(e) => Err(e),
        }
    }
349
350    // This is internal function that does the actual VM execution loop.
351    // Here both pause signal and limit_cycles are provided so as to simplify
352    // branches.
353    fn iterate(&mut self, pause: Pause, limit_cycles: Cycle) -> Result<(), Error> {
354        // Execute the VM for real, consumed cycles in the virtual machine is
355        // moved over to +iteration_cycles+, then we reset virtual machine's own
356        // cycle count to zero.
357        let (id, result, cycles) = {
358            let (id, vm) = self.iterate_prepare_machine()?;
359            vm.set_max_cycles(limit_cycles);
360            vm.machine.set_pause(pause);
361            let result = vm.run();
362            let cycles = vm.machine.cycles();
363            vm.machine.set_cycles(0);
364            (id, result, cycles)
365        };
366        self.iteration_cycles = self
367            .iteration_cycles
368            .checked_add(cycles)
369            .ok_or(Error::CyclesExceeded)?;
370        self.iterate_process_results(id, result)
371    }
372
    // Drains the shared message box and handles every message: exec,
    // spawn, wait, pipe, fd read/write registration, inherited-fd query
    // and close. Each arm writes results back into the requesting VM's
    // registers/memory and updates scheduler bookkeeping.
    fn process_message_box(&mut self) -> Result<(), Error> {
        // Drain under the lock once, then process without holding it.
        let messages: Vec<Message> = self.message_box.lock().expect("lock").drain(..).collect();
        for message in messages {
            match message {
                Message::ExecV2(vm_id, args) => {
                    // exec replaces the VM's program in place: charge the base
                    // cost on the old machine, carry its cycles and max-cycles
                    // over to a fresh machine, then load the new program.
                    let (old_context, old_machine) = self
                        .instantiated
                        .get_mut(&vm_id)
                        .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
                    old_machine
                        .machine
                        .add_cycles_no_checking(EXEC_LOAD_ELF_V2_CYCLES_BASE)?;
                    let old_cycles = old_machine.machine.cycles();
                    let max_cycles = old_machine.machine.max_cycles();
                    let program = {
                        let mut sc = old_context.snapshot2_context.lock().expect("lock");
                        sc.load_data(
                            &args.location.data_piece_id,
                            args.location.offset,
                            args.location.length,
                        )?
                        .0
                    };
                    let (context, mut new_machine) = self.create_dummy_vm(&vm_id)?;
                    new_machine.set_max_cycles(max_cycles);
                    new_machine.machine.add_cycles_no_checking(old_cycles)?;
                    self.load_vm_program(
                        &context,
                        &mut new_machine,
                        &args.location,
                        program,
                        VmArgs::Reader {
                            vm_id,
                            argc: args.argc,
                            argv: args.argv,
                        },
                    )?;
                    // The insert operation removes the old vm instance and adds the new vm instance.
                    debug_assert!(self.instantiated.contains_key(&vm_id));
                    self.instantiated.insert(vm_id, (context, new_machine));
                }
                Message::Spawn(vm_id, args) => {
                    // All fds must belong to the correct owner
                    if args.fds.iter().any(|fd| self.fds.get(fd) != Some(&vm_id)) {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine.machine.set_register(A0, INVALID_FD as u64);
                        continue;
                    }
                    // NOTE(review): this check uses `>`, so a spawn is still
                    // allowed when exactly MAX_VMS_COUNT VMs already exist —
                    // confirm this off-by-one is intended.
                    if self.suspended.len() + self.instantiated.len() > MAX_VMS_COUNT as usize {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine.machine.set_register(A0, MAX_VMS_SPAWNED as u64);
                        continue;
                    }
                    let spawned_vm_id = self.boot_vm(
                        &args.location,
                        VmArgs::Reader {
                            vm_id,
                            argc: args.argc,
                            argv: args.argv,
                        },
                    )?;
                    // Move passed fds from spawner to spawnee
                    for fd in &args.fds {
                        self.fds.insert(*fd, spawned_vm_id);
                    }
                    // Here we keep the original version of file descriptors.
                    // If one fd is moved afterward, this inherited file descriptors doesn't change.
                    self.inherited_fd.insert(spawned_vm_id, args.fds.clone());

                    // Report the child's id and success back to the spawner.
                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                    machine
                        .machine
                        .memory_mut()
                        .store64(&args.process_id_addr, &spawned_vm_id)?;
                    machine.machine.set_register(A0, SUCCESS as u64);
                }
                Message::Wait(vm_id, args) => {
                    // Target already terminated: deliver its exit code now and
                    // drop the stored code (each exit code is collected once).
                    if let Some(exit_code) = self.terminated_vms.get(&args.target_id).copied() {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine
                            .machine
                            .memory_mut()
                            .store8(&args.exit_code_addr, &u64::from_i8(exit_code))?;
                        machine.machine.set_register(A0, SUCCESS as u64);
                        self.states.insert(vm_id, VmState::Runnable);
                        self.terminated_vms.retain(|id, _| id != &args.target_id);
                        continue;
                    }
                    // Target is neither terminated nor alive: fail the wait.
                    if !self.states.contains_key(&args.target_id) {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine.machine.set_register(A0, WAIT_FAILURE as u64);
                        continue;
                    }
                    // Return code will be updated when the joining VM exits
                    self.states.insert(
                        vm_id,
                        VmState::Wait {
                            target_vm_id: args.target_id,
                            exit_code_addr: args.exit_code_addr,
                        },
                    );
                }
                Message::Pipe(vm_id, args) => {
                    if self.fds.len() as u64 >= MAX_FDS {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine.machine.set_register(A0, MAX_FDS_CREATED as u64);
                        continue;
                    }
                    // Create both ends of the pipe, owned by the caller, and
                    // write the two fd values back to the requested addresses.
                    let (p1, p2, slot) = Fd::create(self.next_fd_slot);
                    self.next_fd_slot = slot;
                    self.fds.insert(p1, vm_id);
                    self.fds.insert(p2, vm_id);
                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                    machine
                        .machine
                        .memory_mut()
                        .store64(&args.fd1_addr, &p1.0)?;
                    machine
                        .machine
                        .memory_mut()
                        .store64(&args.fd2_addr, &p2.0)?;
                    machine.machine.set_register(A0, SUCCESS as u64);
                }
                Message::FdRead(vm_id, args) => {
                    // The fd must exist and be owned by the caller.
                    if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine.machine.set_register(A0, INVALID_FD as u64);
                        continue;
                    }
                    // The write end must still be open somewhere.
                    if !self.fds.contains_key(&args.fd.other_fd()) {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine.machine.set_register(A0, OTHER_END_CLOSED as u64);
                        continue;
                    }
                    // Return code will be updated when the read operation finishes
                    self.states.insert(
                        vm_id,
                        VmState::WaitForRead(ReadState {
                            fd: args.fd,
                            length: args.length,
                            buffer_addr: args.buffer_addr,
                            length_addr: args.length_addr,
                        }),
                    );
                }
                Message::FdWrite(vm_id, args) => {
                    // The fd must exist and be owned by the caller.
                    if !(self.fds.contains_key(&args.fd) && (self.fds[&args.fd] == vm_id)) {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine.machine.set_register(A0, INVALID_FD as u64);
                        continue;
                    }
                    // The read end must still be open somewhere.
                    if !self.fds.contains_key(&args.fd.other_fd()) {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine.machine.set_register(A0, OTHER_END_CLOSED as u64);
                        continue;
                    }
                    // Return code will be updated when the write operation finishes
                    self.states.insert(
                        vm_id,
                        VmState::WaitForWrite(WriteState {
                            fd: args.fd,
                            consumed: 0,
                            length: args.length,
                            buffer_addr: args.buffer_addr,
                            length_addr: args.length_addr,
                        }),
                    );
                }
                Message::InheritedFileDescriptor(vm_id, args) => {
                    // The root VM was not spawned, so it inherits no fds.
                    let inherited_fd = if vm_id == ROOT_VM_ID {
                        Vec::new()
                    } else {
                        self.inherited_fd[&vm_id].clone()
                    };
                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                    let FdArgs {
                        buffer_addr,
                        length_addr,
                        ..
                    } = args;
                    // Caller passes its buffer capacity (in fd count) at
                    // length_addr; copy at most that many fds, then write the
                    // real count back so the caller can detect truncation.
                    let full_length = machine
                        .machine
                        .inner_mut()
                        .memory_mut()
                        .load64(&length_addr)?;
                    let real_length = inherited_fd.len() as u64;
                    let copy_length = u64::min(full_length, real_length);
                    for i in 0..copy_length {
                        let fd = inherited_fd[i as usize].0;
                        // Each fd occupies 8 bytes in the caller's buffer.
                        let addr = buffer_addr.checked_add(i * 8).ok_or(Error::MemOutOfBound)?;
                        machine
                            .machine
                            .inner_mut()
                            .memory_mut()
                            .store64(&addr, &fd)?;
                    }
                    machine
                        .machine
                        .inner_mut()
                        .memory_mut()
                        .store64(&length_addr, &real_length)?;
                    machine.machine.set_register(A0, SUCCESS as u64);
                }
                Message::Close(vm_id, fd) => {
                    // Only the owner may close an fd; closing removes it from
                    // the fd table which the read/write paths treat as "other
                    // end closed".
                    if self.fds.get(&fd) != Some(&vm_id) {
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine.machine.set_register(A0, INVALID_FD as u64);
                    } else {
                        self.fds.remove(&fd);
                        let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
                        machine.machine.set_register(A0, SUCCESS as u64);
                    }
                }
            }
        }
        Ok(())
    }
590
591    fn process_io(&mut self) -> Result<(), Error> {
592        let mut reads: HashMap<Fd, (VmId, ReadState)> = HashMap::default();
593        let mut closed_fds: Vec<VmId> = Vec::new();
594        self.states.iter().for_each(|(vm_id, state)| {
595            if let VmState::WaitForRead(inner_state) = state {
596                if self.fds.contains_key(&inner_state.fd.other_fd()) {
597                    reads.insert(inner_state.fd, (*vm_id, inner_state.clone()));
598                } else {
599                    closed_fds.push(*vm_id);
600                }
601            }
602        });
603        let mut pairs: Vec<(VmId, ReadState, VmId, WriteState)> = Vec::new();
604        self.states.iter().for_each(|(vm_id, state)| {
605            if let VmState::WaitForWrite(inner_state) = state {
606                if self.fds.contains_key(&inner_state.fd.other_fd()) {
607                    if let Some((read_vm_id, read_state)) = reads.get(&inner_state.fd.other_fd()) {
608                        pairs.push((*read_vm_id, read_state.clone(), *vm_id, inner_state.clone()));
609                    }
610                } else {
611                    closed_fds.push(*vm_id);
612                }
613            }
614        });
615        // Finish read / write syscalls for fds that are closed on the other end
616        for vm_id in closed_fds {
617            match self.states[&vm_id].clone() {
618                VmState::WaitForRead(ReadState { length_addr, .. }) => {
619                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
620                    machine.machine.memory_mut().store64(&length_addr, &0)?;
621                    machine.machine.set_register(A0, SUCCESS as u64);
622                    self.states.insert(vm_id, VmState::Runnable);
623                }
624                VmState::WaitForWrite(WriteState {
625                    consumed,
626                    length_addr,
627                    ..
628                }) => {
629                    let (_, machine) = self.ensure_get_instantiated(&vm_id)?;
630                    machine
631                        .machine
632                        .memory_mut()
633                        .store64(&length_addr, &consumed)?;
634                    machine.machine.set_register(A0, SUCCESS as u64);
635                    self.states.insert(vm_id, VmState::Runnable);
636                }
637                _ => (),
638            }
639        }
640        // Transferring data from write fds to read fds
641        for (read_vm_id, read_state, write_vm_id, write_state) in pairs {
642            let ReadState {
643                length: read_length,
644                buffer_addr: read_buffer_addr,
645                length_addr: read_length_addr,
646                ..
647            } = read_state;
648            let WriteState {
649                fd: write_fd,
650                mut consumed,
651                length: write_length,
652                buffer_addr: write_buffer_addr,
653                length_addr: write_length_addr,
654            } = write_state;
655
656            self.ensure_vms_instantiated(&[read_vm_id, write_vm_id])?;
657            {
658                let fillable = read_length;
659                let consumable = write_length - consumed;
660                let copiable = std::cmp::min(fillable, consumable);
661
662                // Actual data copying
663                let (_, write_machine) = self
664                    .instantiated
665                    .get_mut(&write_vm_id)
666                    .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
667                write_machine
668                    .machine
669                    .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
670                let data = write_machine
671                    .machine
672                    .memory_mut()
673                    .load_bytes(write_buffer_addr.wrapping_add(consumed), copiable)?;
674                let (_, read_machine) = self
675                    .instantiated
676                    .get_mut(&read_vm_id)
677                    .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
678                read_machine
679                    .machine
680                    .add_cycles_no_checking(transferred_byte_cycles(copiable))?;
681                read_machine
682                    .machine
683                    .memory_mut()
684                    .store_bytes(read_buffer_addr, &data)?;
685                // Read syscall terminates as soon as some data are filled
686                read_machine
687                    .machine
688                    .memory_mut()
689                    .store64(&read_length_addr, &copiable)?;
690                read_machine.machine.set_register(A0, SUCCESS as u64);
691                self.states.insert(read_vm_id, VmState::Runnable);
692
693                // Write syscall, however, terminates only when all the data
694                // have been written, or when the pairing read fd is closed.
695                consumed += copiable;
696                if consumed == write_length {
697                    // Write VM has fulfilled its write request
698                    let (_, write_machine) = self
699                        .instantiated
700                        .get_mut(&write_vm_id)
701                        .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
702                    write_machine
703                        .machine
704                        .memory_mut()
705                        .store64(&write_length_addr, &write_length)?;
706                    write_machine.machine.set_register(A0, SUCCESS as u64);
707                    self.states.insert(write_vm_id, VmState::Runnable);
708                } else {
709                    // Only update write VM state
710                    self.states.insert(
711                        write_vm_id,
712                        VmState::WaitForWrite(WriteState {
713                            fd: write_fd,
714                            consumed,
715                            length: write_length,
716                            buffer_addr: write_buffer_addr,
717                            length_addr: write_length_addr,
718                        }),
719                    );
720                }
721            }
722        }
723        Ok(())
724    }
725
    // Ensure that every VM in `ids` is instantiated (present in
    // `self.instantiated`), suspending other VMs when necessary to stay
    // within the MAX_INSTANTIATED_VMS limit.
    //
    // Errors if more than MAX_INSTANTIATED_VMS VMs are requested at once,
    // since they cannot all be live simultaneously.
    fn ensure_vms_instantiated(&mut self, ids: &[VmId]) -> Result<(), Error> {
        if ids.len() > MAX_INSTANTIATED_VMS {
            return Err(Error::Unexpected(format!(
                "At most {} VMs can be instantiated but {} are requested!",
                MAX_INSTANTIATED_VMS,
                ids.len()
            )));
        }

        // Requested VMs that are currently suspended rather than live.
        let mut uninstantiated_ids: Vec<VmId> = ids
            .iter()
            .filter(|id| !self.instantiated.contains_key(id))
            .copied()
            .collect();
        // First fill any free instantiation slots without evicting anyone.
        // Note `pop` resumes from the back of the list.
        while (!uninstantiated_ids.is_empty()) && (self.instantiated.len() < MAX_INSTANTIATED_VMS) {
            let id = uninstantiated_ids
                .pop()
                .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?;
            self.resume_vm(&id)?;
        }

        if !uninstantiated_ids.is_empty() {
            // Instantiated is a BTreeMap, an iterator on it maintains key order to ensure deterministic behavior
            // Eviction candidates: live VMs that are not among the requested ids.
            let suspendable_ids: Vec<VmId> = self
                .instantiated
                .keys()
                .filter(|id| !ids.contains(id))
                .copied()
                .collect();

            // Guaranteed by the ids.len() bound checked above: with the map
            // full, at least `uninstantiated_ids.len()` non-requested VMs exist.
            assert!(suspendable_ids.len() >= uninstantiated_ids.len());
            // Swap one suspendable VM out for each VM that still needs a slot.
            for i in 0..uninstantiated_ids.len() {
                self.suspend_vm(&suspendable_ids[i])?;
                self.resume_vm(&uninstantiated_ids[i])?;
            }
        }

        Ok(())
    }
766
767    // Ensure corresponding VM is instantiated and return a mutable reference to it
768    fn ensure_get_instantiated(
769        &mut self,
770        id: &VmId,
771    ) -> Result<&mut (VmContext<DL>, Machine), Error> {
772        self.ensure_vms_instantiated(&[*id])?;
773        self.instantiated
774            .get_mut(id)
775            .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))
776    }
777
778    // Resume a suspended VM
779    fn resume_vm(&mut self, id: &VmId) -> Result<(), Error> {
780        if !self.suspended.contains_key(id) {
781            return Err(Error::Unexpected(format!("VM {:?} is not suspended!", id)));
782        }
783        let snapshot = &self.suspended[id];
784        self.iteration_cycles = self
785            .iteration_cycles
786            .checked_add(SPAWN_EXTRA_CYCLES_BASE)
787            .ok_or(Error::CyclesExceeded)?;
788        let (context, mut machine) = self.create_dummy_vm(id)?;
789        {
790            let mut sc = context.snapshot2_context.lock().expect("lock");
791            sc.resume(&mut machine.machine, snapshot)?;
792        }
793        self.instantiated.insert(*id, (context, machine));
794        self.suspended.remove(id);
795        Ok(())
796    }
797
798    // Suspend an instantiated VM
799    fn suspend_vm(&mut self, id: &VmId) -> Result<(), Error> {
800        if !self.instantiated.contains_key(id) {
801            return Err(Error::Unexpected(format!(
802                "VM {:?} is not instantiated!",
803                id
804            )));
805        }
806        self.iteration_cycles = self
807            .iteration_cycles
808            .checked_add(SPAWN_EXTRA_CYCLES_BASE)
809            .ok_or(Error::CyclesExceeded)?;
810        let (context, machine) = self
811            .instantiated
812            .get_mut(id)
813            .ok_or_else(|| Error::Unexpected("Unable to find VM Id".to_string()))?;
814        let snapshot = {
815            let sc = context.snapshot2_context.lock().expect("lock");
816            sc.make_snapshot(&mut machine.machine)?
817        };
818        self.suspended.insert(*id, snapshot);
819        self.instantiated.remove(id);
820        Ok(())
821    }
822
823    /// Boot a vm by given program and args.
824    pub fn boot_vm(&mut self, location: &DataLocation, args: VmArgs) -> Result<VmId, Error> {
825        let id = self.next_vm_id;
826        self.next_vm_id += 1;
827        let (context, mut machine) = self.create_dummy_vm(&id)?;
828        let (program, _) = {
829            let mut sc = context.snapshot2_context.lock().expect("lock");
830            sc.load_data(&location.data_piece_id, location.offset, location.length)?
831        };
832        self.load_vm_program(&context, &mut machine, location, program, args)?;
833        // Newly booted VM will be instantiated by default
834        while self.instantiated.len() >= MAX_INSTANTIATED_VMS {
835            // Instantiated is a BTreeMap, first_entry will maintain key order
836            let id = *self
837                .instantiated
838                .first_entry()
839                .ok_or_else(|| Error::Unexpected("Map should not be empty".to_string()))?
840                .key();
841            self.suspend_vm(&id)?;
842        }
843
844        self.instantiated.insert(id, (context, machine));
845        self.states.insert(id, VmState::Runnable);
846
847        Ok(id)
848    }
849
850    // Load the program into an empty vm.
851    fn load_vm_program(
852        &mut self,
853        context: &VmContext<DL>,
854        machine: &mut Machine,
855        location: &DataLocation,
856        program: Bytes,
857        args: VmArgs,
858    ) -> Result<u64, Error> {
859        let metadata = parse_elf::<u64>(&program, machine.machine.version())?;
860        let bytes = match args {
861            VmArgs::Reader { vm_id, argc, argv } => {
862                let (_, machine_from) = self.ensure_get_instantiated(&vm_id)?;
863                let argv = FlattenedArgsReader::new(machine_from.machine.memory_mut(), argc, argv);
864                machine.load_program_with_metadata(&program, &metadata, argv)?
865            }
866            VmArgs::Vector(data) => {
867                machine.load_program_with_metadata(&program, &metadata, data.into_iter().map(Ok))?
868            }
869        };
870        let mut sc = context.snapshot2_context.lock().expect("lock");
871        sc.mark_program(
872            &mut machine.machine,
873            &metadata,
874            &location.data_piece_id,
875            location.offset,
876        )?;
877        machine
878            .machine
879            .add_cycles_no_checking(transferred_byte_cycles(bytes))?;
880        Ok(bytes)
881    }
882
883    // Create a new VM instance with syscalls attached
884    fn create_dummy_vm(&self, id: &VmId) -> Result<(VmContext<DL>, Machine), Error> {
885        // The code here looks slightly weird, since I don't want to copy over all syscall
886        // impls here again. Ideally, this scheduler package should be merged with ckb-script,
887        // or simply replace ckb-script. That way, the quirks here will be eliminated.
888        let version = &self.sg_data.sg_info.script_version;
889        let core_machine = CoreMachineType::new(
890            version.vm_isa(),
891            version.vm_version(),
892            // We will update max_cycles for each machine when it gets a chance to run
893            u64::MAX,
894        );
895        let vm_context = VmContext {
896            base_cycles: Arc::clone(&self.total_cycles),
897            message_box: Arc::clone(&self.message_box),
898            snapshot2_context: Arc::new(Mutex::new(Snapshot2Context::new(self.sg_data.clone()))),
899        };
900
901        let machine_builder = DefaultMachineBuilder::new(core_machine)
902            .instruction_cycle_func(Box::new(estimate_cycles));
903        let machine_builder =
904            generate_ckb_syscalls(id, &self.sg_data, &vm_context, &self.debug_context)
905                .into_iter()
906                .fold(machine_builder, |builder, syscall| builder.syscall(syscall));
907        let default_machine = machine_builder.build();
908        Ok((vm_context, Machine::new(default_machine)))
909    }
910}