use std::mem::replace;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::pin::Pin;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::{fmt, ptr};
use crate::shared::scheduler::{ProcessData, RunQueue};
use crate::ProcessId;
// Tree geometry: every node (`Branch`) has `N_BRANCHES` (4) slots. At each
// level `LEVEL_SHIFT` bits of the working pid select a slot; `LEVEL_MASK`
// extracts those bits.
const LEVEL_SHIFT: usize = 2;
const N_BRANCHES: usize = 1 << LEVEL_SHIFT;
const LEVEL_MASK: usize = (1 << LEVEL_SHIFT) - 1;
// The lowest `SKIP_BITS` bits of a process id must be zero (checked by
// `ok_ptr`); the same low bits are reused for pointer tags (see `TAG_BITS`).
const SKIP_BITS: usize = 2;
const SKIP_MASK: usize = (1 << SKIP_BITS) - 1;
pub(super) fn ok_ptr(ptr: *const ()) -> bool {
ptr as usize & SKIP_MASK == 0
}
/// Collection of processes that are not currently ready to run.
///
/// Implemented as a lock-free tree (trie) keyed on the bits of the process
/// id: each level of `Branch`es consumes `LEVEL_SHIFT` bits of the id.
#[derive(Debug)]
pub(super) struct Inactive {
    // Root node of the tree.
    root: Branch,
    // Number of processes stored in the tree. The high bit doubles as a
    // transient-underflow marker, see `len`.
    length: AtomicUsize,
}
impl Inactive {
    /// Returns an empty tree; `const` so it can be used in statics.
    pub(super) const fn empty() -> Inactive {
        Inactive {
            root: Branch::empty(),
            length: AtomicUsize::new(0),
        }
    }

    /// Returns the number of processes in the tree.
    pub(super) fn len(&self) -> usize {
        let len = self.length.load(Ordering::Relaxed);
        // If the counter transiently underflowed (a subtraction in
        // `update_length` landed before the matching addition) the high bit
        // will be set; report zero instead of a huge bogus count.
        // NOTE(review): `1 << 63` assumes a 64-bit `usize` — confirm no
        // 32-bit targets are supported.
        if len & (1 << 63) == 0 {
            len
        } else {
            0
        }
    }

    /// Returns `true` if the tree contains any processes.
    // NOTE(review): this checks the raw counter, so during a transient
    // underflow (see `len`) it returns `true` while `len()` returns 0 —
    // presumably intentional, verify.
    pub(super) fn has_process(&self) -> bool {
        self.length.load(Ordering::Relaxed) != 0
    }

    /// Adds `process` to the tree.
    ///
    /// If the tree holds a ready-to-run marker for the process' pid the
    /// process is handed to `run_queue` instead (see `Branch::_add`).
    pub(super) fn add(&self, process: Pin<Box<ProcessData>>, run_queue: &RunQueue) {
        let pid = process.as_ref().id();
        debug_assert!(ok_ptr(pid.0 as *mut ()));
        // Skip the alignment bits of the pid; they're always zero.
        let changed = self.root.add(process, pid.0 >> SKIP_BITS, 0, run_queue);
        self.update_length(changed);
    }

    /// Marks the process with `pid` as ready to run: if the process is in
    /// the tree it is moved to `run_queue`, otherwise a ready-to-run marker
    /// is left in the tree for a later `add` to pick up.
    pub(super) fn mark_ready(&self, pid: ProcessId, run_queue: &RunQueue) {
        debug_assert!(ok_ptr(pid.0 as *mut ()));
        let changed = self.root.mark_ready(pid, pid.0 >> SKIP_BITS, 0, run_queue);
        self.update_length(changed);
    }

    /// Removes any ready-to-run marker left for a completed `process` and
    /// drops the process, catching a panic from its `Drop` implementation.
    pub(super) fn complete(&self, process: Pin<Box<ProcessData>>) {
        let pid = process.as_ref().id();
        debug_assert!(ok_ptr(pid.0 as *mut ()));
        let tagged_pid = ready_to_run(pid);
        let mut node = &self.root;
        let mut w_pid = pid.0 >> SKIP_BITS;
        let mut old_ptr = node.branches[w_pid & LEVEL_MASK].load(Ordering::Acquire);
        loop {
            if old_ptr == tagged_pid {
                // Found a marker for this exact pid: clear it back to null.
                debug_assert!(is_ready_marker(old_ptr));
                match node.branches[w_pid & LEVEL_MASK].compare_exchange(
                    tagged_pid,
                    ptr::null_mut(),
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        break;
                    }
                    // Slot changed concurrently: re-examine it.
                    Err(old) => old_ptr = old,
                }
            } else if old_ptr.is_null() || is_process(old_ptr) || is_ready_marker(old_ptr) {
                // Empty slot, or a process/marker for a different pid: no
                // marker for this pid exists.
                break;
            } else {
                // Sub-branch: descend one level and keep looking.
                debug_assert!(is_branch(old_ptr));
                let branch_ptr: *mut Branch = as_ptr(old_ptr).cast();
                w_pid >>= LEVEL_SHIFT;
                debug_assert!(!branch_ptr.is_null());
                node = unsafe { &*branch_ptr };
                old_ptr = node.branches[w_pid & LEVEL_MASK].load(Ordering::Acquire);
            }
        }
        // Drop the process; a panicking `Drop` must not unwind into the
        // scheduler.
        drop(catch_unwind(AssertUnwindSafe(move || drop(process))));
    }

    /// Applies the signed length change `n` (returned by the `Branch`
    /// methods) to `length`.
    fn update_length(&self, n: isize) {
        #[allow(clippy::cast_sign_loss)]
        match n {
            0 => {}
            n if n.is_negative() => {
                let _ = self.length.fetch_sub(-n as usize, Ordering::AcqRel);
            }
            n => {
                let _ = self.length.fetch_add(n as usize, Ordering::AcqRel);
            }
        }
    }
}
/// Single node in the [`Inactive`] tree.
///
/// Each slot holds a tagged pointer: null, a pointer to a child `Branch`, a
/// pointer to a process, or a ready-to-run marker (see the `*_TAG`
/// constants).
struct Branch {
    branches: [AtomicPtr<()>; N_BRANCHES],
}
/// Pointer stored in a `Branch` slot; the lowest `TAG_BITS` bits encode
/// what it points to (see the `*_TAG` constants below).
type TaggedPointer = *mut ();
// Number of low bits used for the tag; fits in the alignment bits that
// `ok_ptr` requires to be zero.
const TAG_BITS: usize = 2;
const TAG_MASK: usize = (1 << TAG_BITS) - 1;
// Tag for a pointer to a child `Branch`. Note: null also carries this tag.
const BRANCH_TAG: usize = 0b00;
// Tag for a pointer to a `ProcessData`.
const PROCESS_TAG: usize = 0b01;
// Tag for a ready-to-run marker: not a real pointer, but a process id with
// this tag OR-ed in (see `ready_to_run`).
const READY_TO_RUN: usize = 0b10;
impl Branch {
    /// Returns a branch with all slots empty (null).
    const fn empty() -> Branch {
        #[allow(clippy::declare_interior_mutable_const)]
        const NONE: AtomicPtr<()> = AtomicPtr::new(ptr::null_mut());
        Branch {
            branches: [NONE; N_BRANCHES],
        }
    }

    /// Adds `process` to the (sub)tree rooted at this branch.
    ///
    /// `w_pid` is the "working" pid: the process id shifted right so its
    /// low `LEVEL_SHIFT` bits select the slot at the current `depth`.
    /// Returns the change in the number of stored processes (fed into
    /// `Inactive::update_length`).
    fn add(
        &self,
        process: Pin<Box<ProcessData>>,
        w_pid: usize,
        depth: usize,
        run_queue: &RunQueue,
    ) -> isize {
        let process = tag_process(process);
        self._add(process, w_pid, depth, run_queue)
    }

    /// Work horse of [`Branch::add`]; `process` is already tagged.
    fn _add(
        &self,
        process: TaggedPointer,
        mut w_pid: usize,
        mut depth: usize,
        run_queue: &RunQueue,
    ) -> isize {
        debug_assert!(is_process(process));
        let mut node = self;
        let mut old_ptr = node.branches[w_pid & LEVEL_MASK].load(Ordering::Acquire);
        let mut changed = 0;
        loop {
            if old_ptr.is_null() {
                // Slot is empty: try to claim it for the process.
                match node.branches[w_pid & LEVEL_MASK].compare_exchange(
                    ptr::null_mut(),
                    process,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => return changed + 1,
                    Err(old) => old_ptr = old,
                }
            } else if is_branch(old_ptr) {
                // Slot holds a sub-branch: descend one level and retry.
                let branch_ptr: *mut Branch = as_ptr(old_ptr).cast();
                w_pid >>= LEVEL_SHIFT;
                depth += 1;
                debug_assert!(!branch_ptr.is_null());
                node = unsafe { &*branch_ptr };
                old_ptr = node.branches[w_pid & LEVEL_MASK].load(Ordering::Acquire);
            } else if is_ready_marker(old_ptr) && as_pid(old_ptr) == as_pid(process) {
                // The process was marked ready-to-run while it was out of
                // the tree: clear the marker and hand the process straight
                // to the run queue instead of storing it.
                match node.branches[w_pid & LEVEL_MASK].compare_exchange(
                    old_ptr,
                    ptr::null_mut(),
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(old) => {
                        debug_assert!(is_ready_marker(old));
                        debug_assert!(as_pid(old) == as_pid(process));
                        debug_assert!(is_process(process));
                        // SAFETY: `process` was tagged by us in `add`, so we
                        // hold the unique owning pointer.
                        let process = unsafe { process_from_tagged(process) };
                        run_queue.add(process);
                        return changed;
                    }
                    Err(old) => old_ptr = old,
                }
            } else {
                // Slot holds a different process, or a marker for a
                // different pid: take it out, grow the tree deep enough to
                // distinguish the two pids, then reinsert both.
                debug_assert!(is_process(old_ptr) || is_ready_marker(old_ptr));
                match node.branches[w_pid & LEVEL_MASK].compare_exchange(
                    old_ptr,
                    ptr::null_mut(),
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(other_process) => {
                        let req_depth = diff_branch_depth(as_pid(other_process), as_pid(process));
                        debug_assert!(req_depth > depth);
                        changed +=
                            node.add_branches(req_depth, ptr::null_mut(), w_pid, depth, run_queue);
                        // Reinsert the displaced pointer; the `- 1` undoes
                        // `_add` counting it as a brand-new addition.
                        changed += if is_process(other_process) {
                            let w_pid = wpid_for(other_process, depth);
                            node._add(other_process, w_pid, depth, run_queue) - 1
                        } else {
                            debug_assert!(is_ready_marker(other_process));
                            let w_pid = wpid_for(other_process, depth);
                            node._mark_ready(other_process, w_pid, depth, run_queue)
                        };
                        // Retry inserting `process` from the current slot
                        // (now expected to hold a branch).
                        old_ptr = node.branches[w_pid & LEVEL_MASK].load(Ordering::Acquire);
                    }
                    Err(old) => old_ptr = old,
                }
            }
        }
    }

    /// Marks `pid` as ready to run in this (sub)tree, see
    /// `Inactive::mark_ready`. Returns the change in stored processes.
    fn mark_ready(
        &self,
        pid: ProcessId,
        w_pid: usize,
        depth: usize,
        run_queue: &RunQueue,
    ) -> isize {
        let marker = ready_to_run(pid);
        self._mark_ready(marker, w_pid, depth, run_queue)
    }

    /// Work horse of [`Branch::mark_ready`]; `marker` is the tagged pid.
    #[allow(clippy::cognitive_complexity)]
    fn _mark_ready(
        &self,
        marker: TaggedPointer,
        mut w_pid: usize,
        mut depth: usize,
        run_queue: &RunQueue,
    ) -> isize {
        debug_assert!(is_ready_marker(marker));
        let mut node = self;
        let mut old_ptr = node.branches[w_pid & LEVEL_MASK].load(Ordering::Acquire);
        let mut changed = 0;
        loop {
            if old_ptr.is_null() {
                // Process not in the tree: leave the marker so a later
                // `add` routes the process to the run queue.
                match node.branches[w_pid & LEVEL_MASK].compare_exchange(
                    old_ptr,
                    marker,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => return changed,
                    Err(old) => old_ptr = old,
                }
            } else if is_branch(old_ptr) {
                // Sub-branch: descend one level and retry.
                let branch_ptr: *mut Branch = as_ptr(old_ptr).cast();
                w_pid >>= LEVEL_SHIFT;
                depth += 1;
                debug_assert!(!branch_ptr.is_null());
                node = unsafe { &*branch_ptr };
                old_ptr = node.branches[w_pid & LEVEL_MASK].load(Ordering::Acquire);
            } else if is_ready_marker(old_ptr) && as_pid(old_ptr) == as_pid(marker) {
                // Marker for this pid already present: nothing to do.
                return changed;
            } else if is_process(old_ptr) && as_pid(old_ptr) == as_pid(marker) {
                // Found the process itself: remove it from the tree and
                // move it to the run queue.
                match node.branches[w_pid & LEVEL_MASK].compare_exchange(
                    old_ptr,
                    ptr::null_mut(),
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        debug_assert!(is_process(old_ptr));
                        debug_assert!(!as_ptr(old_ptr).is_null());
                        // SAFETY: we won the exchange, so we now own the
                        // process pointer removed from the slot.
                        let process = unsafe { process_from_tagged(old_ptr) };
                        run_queue.add(process);
                        return changed - 1;
                    }
                    Err(old) => old_ptr = old,
                }
            } else {
                // Slot holds a process/marker for a *different* pid: take
                // it out, grow the tree to distinguish the pids, reinsert
                // the displaced pointer, then retry.
                debug_assert!(is_process(old_ptr) || is_ready_marker(old_ptr));
                match node.branches[w_pid & LEVEL_MASK].compare_exchange(
                    old_ptr,
                    ptr::null_mut(),
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(other_process) => {
                        debug_assert!(is_process(other_process) || is_ready_marker(other_process));
                        let req_depth = diff_branch_depth(as_pid(other_process), as_pid(marker));
                        debug_assert!(req_depth > depth);
                        changed +=
                            node.add_branches(req_depth, ptr::null_mut(), w_pid, depth, run_queue);
                        changed += if is_process(other_process) {
                            debug_assert!(is_process(other_process));
                            let w_pid = wpid_for(other_process, depth);
                            node._add(other_process, w_pid, depth, run_queue) - 1
                        } else {
                            debug_assert!(is_ready_marker(other_process));
                            let w_pid = wpid_for(other_process, depth);
                            node._mark_ready(other_process, w_pid, depth, run_queue)
                        };
                        old_ptr = node.branches[w_pid & LEVEL_MASK].load(Ordering::Acquire);
                    }
                    Err(old) => old_ptr = old,
                }
            }
        }
    }

    /// Installs branches until `depth` reaches `req_depth`, starting at the
    /// slot selected by `w_pid` whose current value is expected to be
    /// `old_ptr`. Any process/marker displaced along the way is pushed
    /// further down the tree. Returns the change in stored processes.
    fn add_branches(
        &self,
        req_depth: usize,
        mut old_ptr: TaggedPointer,
        mut w_pid: usize,
        mut depth: usize,
        run_queue: &RunQueue,
    ) -> isize {
        let mut node = self;
        // A branch we allocated but failed to install; kept for reuse on
        // the next attempt instead of reallocating.
        let mut w_branch = None;
        let mut changed = 0;
        while depth < req_depth {
            let branch = if let Some(branch) = w_branch.take() {
                debug_assert!(is_branch(branch));
                branch
            } else {
                let branch = Box::pin(Branch::empty());
                tag_branch(branch)
            };
            match node.branches[w_pid & LEVEL_MASK].compare_exchange(
                old_ptr,
                branch,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(old) => {
                    // Our new branch is installed; descend into it.
                    debug_assert!(is_branch(branch));
                    let branch_ptr: *mut Branch = as_ptr(branch).cast();
                    w_pid >>= LEVEL_SHIFT;
                    depth += 1;
                    debug_assert!(!branch_ptr.is_null());
                    node = unsafe { &*branch_ptr.cast() };
                    old_ptr = node.branches[w_pid & LEVEL_MASK].load(Ordering::Acquire);
                    // Move the displaced process/marker (if any) down into
                    // the branch we just created.
                    if is_process(old) {
                        debug_assert!(is_process(old));
                        let w_pid = wpid_for(old, depth);
                        changed += node._add(old, w_pid, depth, run_queue) - 1;
                    } else if is_ready_marker(old) {
                        debug_assert!(is_ready_marker(old));
                        let w_pid = wpid_for(old, depth);
                        changed += node._mark_ready(old, w_pid, depth, run_queue);
                    } else {
                        debug_assert!(old_ptr.is_null());
                    }
                }
                Err(old) => {
                    // Lost the race; keep the branch for the next attempt.
                    w_branch = Some(branch);
                    old_ptr = old
                }
            }
            // Skip past any branches already in place (ours, or installed
            // concurrently by other threads).
            while !as_ptr(old_ptr).is_null() && is_branch(old_ptr) {
                let branch_ptr: *mut Branch = as_ptr(old_ptr).cast();
                w_pid >>= LEVEL_SHIFT;
                depth += 1;
                debug_assert!(!branch_ptr.is_null());
                node = unsafe { &*branch_ptr.cast() };
                old_ptr = node.branches[w_pid & LEVEL_MASK].load(Ordering::Acquire);
            }
        }
        // Free a left-over branch that was allocated but never installed.
        if let Some(branch) = w_branch {
            // SAFETY: the branch was never shared with another thread; we
            // still hold the only pointer to it.
            unsafe { drop(branch_from_tagged(branch)) };
        }
        changed
    }
}
impl fmt::Debug for Branch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Renders a slot as "null", the nested branch (recursively),
        // "process" or "marker", based on the pointer's tag.
        fn debug_pointer(ptr: &AtomicPtr<()>) -> &dyn fmt::Debug {
            let ptr = ptr.load(Ordering::Relaxed);
            match ptr as usize & TAG_MASK {
                // Null also carries `BRANCH_TAG` (0b00), so check it first.
                _ if ptr.is_null() => &"null",
                BRANCH_TAG => {
                    let branch: *mut Branch = as_ptr(ptr).cast();
                    // NOTE(review): dereferences the branch without
                    // synchronisation — presumably only sound while no
                    // concurrent mutation frees it; confirm callers.
                    unsafe { &*branch }
                }
                PROCESS_TAG => &"process",
                READY_TO_RUN => &"marker",
                _ => unreachable!(),
            }
        }
        // Keys are the slot indices in binary, matching the pid bits that
        // select each slot.
        f.debug_map()
            .entry(&"00", debug_pointer(&self.branches[0]))
            .entry(&"01", debug_pointer(&self.branches[1]))
            .entry(&"10", debug_pointer(&self.branches[2]))
            .entry(&"11", debug_pointer(&self.branches[3]))
            .finish()
    }
}
/// Returns the tree depth at which the slots selected by `pid1` and `pid2`
/// first differ, i.e. how deep the tree must be to store both.
fn diff_branch_depth(pid1: ProcessId, pid2: ProcessId) -> usize {
    debug_assert!(pid1 != pid2);
    // XOR leaves only the differing bits; the first differing level is the
    // number of identical low (post-skip) bit groups.
    let differing_bits = (pid1.0 ^ pid2.0) >> SKIP_BITS;
    (differing_bits.trailing_zeros() as usize) / LEVEL_SHIFT
}
/// Converts an owned process into a tagged pointer, leaking the `Box`.
///
/// Reversed by [`process_from_tagged`], which reclaims ownership.
fn tag_process(process: Pin<Box<ProcessData>>) -> TaggedPointer {
    #[allow(trivial_casts)]
    let ptr = Box::into_raw(Pin::into_inner(process)) as *mut _;
    // The pointer must be aligned enough to leave the tag bits free.
    debug_assert!(ok_ptr(ptr));
    (ptr as usize | PROCESS_TAG) as *mut ()
}
/// Converts an owned branch into a tagged pointer, leaking the `Box`.
///
/// Reversed by [`branch_from_tagged`], which reclaims ownership.
fn tag_branch(branch: Pin<Box<Branch>>) -> TaggedPointer {
    #[allow(trivial_casts)]
    let ptr = Box::into_raw(Pin::into_inner(branch)) as *mut _;
    // The pointer must be aligned enough to leave the tag bits free.
    debug_assert!(ok_ptr(ptr));
    (ptr as usize | BRANCH_TAG) as *mut ()
}
/// Encodes `pid` as a ready-to-run marker: not a real pointer, but the pid
/// with the `READY_TO_RUN` tag OR-ed into its (zero) low bits.
fn ready_to_run(pid: ProcessId) -> TaggedPointer {
    debug_assert!(ok_ptr(pid.0 as *mut ()));
    let tagged = pid.0 | READY_TO_RUN;
    tagged as *mut ()
}
/// Converts a tagged process pointer back into an owned process.
///
/// # Safety
///
/// `ptr` must have been created by [`tag_process`] and the caller must hold
/// unique ownership of it (calling this twice double-frees).
unsafe fn process_from_tagged(ptr: TaggedPointer) -> Pin<Box<ProcessData>> {
    debug_assert!(is_process(ptr));
    Pin::new(Box::from_raw(as_ptr(ptr).cast()))
}
/// Converts a tagged branch pointer back into an owned branch.
///
/// # Safety
///
/// `ptr` must have been created by [`tag_branch`] and the caller must hold
/// unique ownership of it (calling this twice double-frees).
unsafe fn branch_from_tagged(ptr: TaggedPointer) -> Pin<Box<Branch>> {
    debug_assert!(is_branch(ptr));
    Pin::new(Box::from_raw(as_ptr(ptr).cast()))
}
fn is_branch(ptr: TaggedPointer) -> bool {
has_tag(ptr, BRANCH_TAG)
}
fn is_process(ptr: TaggedPointer) -> bool {
has_tag(ptr, PROCESS_TAG)
}
fn is_ready_marker(ptr: TaggedPointer) -> bool {
has_tag(ptr, READY_TO_RUN)
}
/// Returns `true` if `ptr`'s low tag bits equal `tag`.
fn has_tag(ptr: TaggedPointer, tag: usize) -> bool {
    let bits = ptr as usize;
    bits & TAG_MASK == tag
}
/// Extracts the process id encoded in a tagged process pointer or
/// ready-to-run marker (the untagged bits *are* the pid).
fn as_pid(ptr: TaggedPointer) -> ProcessId {
    debug_assert!(is_process(ptr) || is_ready_marker(ptr));
    let untagged = as_ptr(ptr);
    ProcessId(untagged as usize)
}
/// Strips the tag bits from `ptr`, returning the raw pointer value.
fn as_ptr(ptr: TaggedPointer) -> *mut () {
    let untagged = ptr as usize & !TAG_MASK;
    untagged as *mut ()
}
/// Recomputes the working pid of the process/marker behind `ptr` for a node
/// at `depth`: the pid shifted past the skip bits plus `depth` levels.
fn wpid_for(ptr: TaggedPointer, depth: usize) -> usize {
    let shift = depth * LEVEL_SHIFT + SKIP_BITS;
    (ptr as usize) >> shift
}
impl Drop for Branch {
    /// Drops everything still stored in this (sub)tree: processes, and
    /// sub-branches (recursively, via this same `Drop` impl).
    fn drop(&mut self) {
        for ptr in &mut self.branches {
            // Take the pointer out of the slot so it can't be seen (or
            // dropped) twice.
            let ptr = replace(ptr.get_mut(), ptr::null_mut());
            // SAFETY: `&mut self` gives exclusive access, so we own
            // whatever the slot points to.
            unsafe { drop_tagged_pointer(ptr) };
        }
    }
}
/// Drops the value behind a tagged pointer, based on its tag.
///
/// # Safety
///
/// The caller must hold unique ownership of whatever `ptr` points to.
unsafe fn drop_tagged_pointer(ptr: TaggedPointer) {
    if ptr.is_null() {
        return;
    }
    match ptr as usize & TAG_MASK {
        BRANCH_TAG => drop(branch_from_tagged(ptr)),
        PROCESS_TAG => drop(process_from_tagged(ptr)),
        // A ready-to-run marker only encodes a pid; nothing to free.
        READY_TO_RUN => {}
        _ => unreachable!(),
    }
}
#[cfg(test)]
mod tests {
    use std::mem::{align_of, size_of};
    use std::pin::Pin;
    use std::ptr;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use crate::process::{Process, ProcessId, ProcessResult};
    use crate::shared::scheduler::RunQueue;
    use crate::spawn::options::Priority;
    use crate::RuntimeRef;
    use super::{
        as_pid, branch_from_tagged, diff_branch_depth, drop_tagged_pointer, is_branch, is_process,
        is_ready_marker, process_from_tagged, ready_to_run, tag_branch, tag_process, Branch,
        Inactive, ProcessData, TaggedPointer,
    };

    // The tree and its (tagged) pointers must be shareable across worker
    // threads.
    #[test]
    fn pointer_is_send() {
        fn assert_send<T: Send>() {}
        fn assert_sync<T: Sync>() {}
        assert_send::<Pin<Box<ProcessData>>>();
        assert_sync::<Pin<Box<ProcessData>>>();
        assert_send::<Pin<Box<Branch>>>();
        assert_sync::<Pin<Box<Branch>>>();
        assert_send::<Inactive>();
        assert_sync::<Inactive>();
    }

    // Minimal process used to populate the tree; it is never actually run.
    struct TestProcess;
    impl Process for TestProcess {
        fn name(&self) -> &'static str {
            "TestProcess"
        }
        fn run(self: Pin<&mut Self>, _: &mut RuntimeRef, _: ProcessId) -> ProcessResult {
            unimplemented!()
        }
    }

    fn test_process() -> Pin<Box<ProcessData>> {
        Box::pin(ProcessData::new(Priority::default(), Box::pin(TestProcess)))
    }

    #[test]
    fn size_assertions() {
        assert_eq!(size_of::<TaggedPointer>(), size_of::<usize>());
        assert_eq!(size_of::<Branch>(), 4 * size_of::<usize>());
    }

    // The tagging scheme needs at least 2 free low bits in every pointer.
    #[test]
    fn process_data_alignment() {
        assert!(align_of::<ProcessData>() >= 2);
    }

    #[test]
    fn branch_alignment() {
        assert!(align_of::<Branch>() >= 2);
    }

    #[test]
    fn test_diff_branch_depth() {
        // (pid1, pid2, expected depth at which their slots diverge).
        #[rustfmt::skip]
        let tests = &[
            (0b0000, 0b0100, 0),
            (0b0000, 0b1000, 0),
            (0b00_0000, 0b01_0000, 1),
            (0b00_0000, 0b10_0000, 1),
            (0b0000_0000_0000_0000_0000_0000, 0b0100_0000_0000_0000_0000_0000, 10),
            (0b0000_0000_0000_0000_0000_0000, 0b1000_0000_0000_0000_0000_0000, 10),
            (0b0000_0000_0000_0000_0000_0000, 0b0100_0000_0000_0000_0000_0100, 0),
            (0b0000_0000_0000_0000_0000_0000, 0b1000_0000_0000_0000_0000_1000, 0),
            (0b0000, 0b0101, 0),
            (0b0000, 0b1001, 0),
        ];
        for (pid1, pid2, expected) in tests.iter().copied() {
            let pid1 = ProcessId(pid1);
            let pid2 = ProcessId(pid2);
            let got = diff_branch_depth(pid1, pid2);
            assert_eq!(got, expected, "pid1: {}, pid2: {}", pid1, pid2);
        }
    }

    // Round-trip a process through tag_process/process_from_tagged.
    #[test]
    fn process_tagging() {
        let process = test_process();
        let tagged_process = tag_process(process);
        assert!(is_process(tagged_process));
        assert!(!is_branch(tagged_process));
        assert!(!is_ready_marker(tagged_process));
        let process = unsafe { process_from_tagged(tagged_process) };
        drop(process);
    }

    // Round-trip a branch through tag_branch/branch_from_tagged.
    #[test]
    fn branch_tagging() {
        let branch = Box::pin(Branch::empty());
        let tagged_branch = tag_branch(branch);
        assert!(!is_process(tagged_branch));
        assert!(is_branch(tagged_branch));
        assert!(!is_ready_marker(tagged_branch));
        let branch = unsafe { branch_from_tagged(tagged_branch) };
        drop(branch);
    }

    // A pid must survive encoding as a ready-to-run marker.
    #[test]
    fn ready_marker_tagging() {
        let pid = ProcessId(500);
        let tagged_pid = ready_to_run(pid);
        assert!(!is_process(tagged_pid));
        assert!(!is_branch(tagged_pid));
        assert!(is_ready_marker(tagged_pid));
        let pid2 = as_pid(tagged_pid);
        assert_eq!(pid, pid2);
    }

    // Process that counts how often it is dropped (to catch leaks and
    // double-frees in the pointer-tagging code).
    struct DropTest(Arc<AtomicUsize>);
    impl Drop for DropTest {
        fn drop(&mut self) {
            let _ = self.0.fetch_add(1, Ordering::SeqCst);
        }
    }
    impl Process for DropTest {
        fn name(&self) -> &'static str {
            "DropTest"
        }
        fn run(self: Pin<&mut Self>, _: &mut RuntimeRef, _: ProcessId) -> ProcessResult {
            unimplemented!()
        }
    }

    #[test]
    fn dropping_tagged_process() {
        let dropped = Arc::new(AtomicUsize::new(0));
        let process = Box::pin(DropTest(dropped.clone()));
        let process = Box::pin(ProcessData::new(Priority::default(), process));
        let ptr = tag_process(process);
        assert_eq!(dropped.load(Ordering::Acquire), 0);
        unsafe { drop_tagged_pointer(ptr) };
        assert_eq!(dropped.load(Ordering::Acquire), 1);
    }

    // Dropping a branch must recursively drop the processes it holds.
    #[test]
    fn dropping_tagged_branch() {
        let dropped = Arc::new(AtomicUsize::new(0));
        let process = Box::pin(DropTest(dropped.clone()));
        let process = Box::pin(ProcessData::new(Priority::default(), process));
        let process_ptr = tag_process(process);
        let mut branch = Box::pin(Branch::empty());
        *branch.branches[0].get_mut() = process_ptr;
        let branch_ptr = tag_branch(branch);
        assert_eq!(dropped.load(Ordering::Acquire), 0);
        unsafe { drop_tagged_pointer(branch_ptr) };
        assert_eq!(dropped.load(Ordering::Acquire), 1);
    }

    // Markers carry no allocation; dropping one must be a no-op.
    #[test]
    fn dropping_tagged_pid() {
        unsafe { drop_tagged_pointer(ready_to_run(ProcessId(500))) };
    }

    #[test]
    fn dropping_null_tagged_pointer() {
        unsafe { drop_tagged_pointer(ptr::null_mut()) };
    }

    // A mark_ready before the add must leave a marker that routes the later
    // add straight to the run queue.
    #[test]
    fn marking_as_ready_to_run() {
        let tests = &[1, 2, 3, 4, 5, 100, 200];
        for n in tests {
            let tree = Inactive::empty();
            let run_queue = RunQueue::empty();
            let processes = (0..*n)
                .map(|_| {
                    let process = test_process();
                    let pid = process.as_ref().id();
                    tree.mark_ready(pid, &run_queue);
                    assert!(!run_queue.has_process());
                    process
                })
                .collect::<Vec<_>>();
            for process in processes {
                let pid = process.as_ref().id();
                tree.add(process, &run_queue);
                assert!(run_queue.has_process());
                let process = run_queue.remove().unwrap();
                assert_eq!(process.as_ref().id(), pid);
            }
        }
    }

    // Adds a fresh process to `tree` and returns its pid.
    fn add_process(tree: &Inactive, run_queue: &RunQueue) -> ProcessId {
        assert!(!run_queue.has_process());
        let process = test_process();
        let pid = process.as_ref().id();
        tree.add(process, run_queue);
        assert!(!run_queue.has_process());
        pid
    }

    // Adds `remove_order.len()` processes, then marks them ready in the
    // given order, checking each one comes out of the run queue.
    fn test(remove_order: Vec<usize>) {
        let tree = Inactive::empty();
        let run_queue = RunQueue::empty();
        let pids: Vec<ProcessId> = (0..remove_order.len())
            .map(|_| add_process(&tree, &run_queue))
            .collect();
        assert!(tree.has_process());
        println!(
            "After adding all {} processes: {:#?}",
            remove_order.len(),
            tree
        );
        let mut processes = Vec::with_capacity(pids.len());
        for index in remove_order {
            assert!(!run_queue.has_process());
            let pid = pids[index];
            tree.mark_ready(pid, &run_queue);
            let process = if let Some(p) = run_queue.remove() {
                p
            } else {
                panic!(
                    "failed to remove {}th process: pid={:064b} ({}), tree: {:#?}",
                    index + 1,
                    pid.0,
                    pid,
                    tree
                );
            };
            assert_eq!(process.as_ref().id(), pid);
            processes.push(process);
            assert!(!run_queue.has_process());
            // Marking the (now removed) process ready again leaves a marker,
            // not a runnable process.
            tree.mark_ready(pid, &run_queue);
            assert!(!run_queue.has_process());
        }
        assert!(!tree.has_process(), "tree: {:#?}", tree);
        // `complete` must clear the markers left above.
        for process in processes {
            tree.complete(process);
        }
        assert!(!tree.has_process());
        println!("Ok.");
    }

    // Generates removal orders to test: all rotations of 0..length, plus
    // every single-swap variant of each, deduplicated.
    fn combinations(length: usize) -> Vec<Vec<usize>> {
        let mut all_indices: Vec<Vec<usize>> = Vec::new();
        for start in 0..length {
            let iter = (0..length).cycle().skip(start).take(length);
            all_indices.push(iter.collect());
        }
        for idx in 0..all_indices.len() {
            for i in 0..length - 1 {
                for j in i + 1..length {
                    let mut new = all_indices[idx].clone();
                    new.swap(i, j);
                    all_indices.push(new);
                }
            }
        }
        all_indices.sort();
        all_indices.dedup();
        all_indices
    }

    // `all`: run `test` for every generated removal order of $n processes.
    // `one`: run `test` once with the in-order removal of $n processes.
    macro_rules! inactive_test {
        (all $name: ident, $n: expr) => {
            #[test]
            fn $name() {
                let n = $n;
                let remove_orders = combinations(n);
                for remove_order in remove_orders {
                    test(remove_order);
                }
            }
        };
        (one $name: ident, $n: expr) => {
            #[test]
            fn $name() {
                let n = $n;
                let remove_order = (0..n).collect();
                test(remove_order);
            }
        };
    }
    inactive_test!(all single_process, 1);
    inactive_test!(all two_processes, 2);
    inactive_test!(all three_processes, 3);
    inactive_test!(all four_processes, 4);
    inactive_test!(all five_processes, 5);
    inactive_test!(all ten_processes, 10);
    inactive_test!(one hundred_processes, 100);
    inactive_test!(one thousand_processes, 1000);
}