#![allow(where_clauses_object_safety, clippy::type_complexity)]
use rand::Rng;
use serde_traitobject as st;
use std::{any, collections::VecDeque, env, marker, mem, thread, time};
use constellation::*;
/// A job handed to a worker process: a boxed `st::sc::FnOnce` trait object that takes no
/// arguments and produces a `Response`.
// NOTE(review): the `desugar` attribute presumably rewrites the `FnOnce() -> Response` sugar for
// this non-std closure trait — confirm against the serde_closure documentation.
#[serde_closure::desugar]
type Request = Box<dyn st::sc::FnOnce() -> Response>;
/// A job's result as sent back by a worker: a boxed `st::Any`, which `ProcessPool::join`
/// downcasts to the concrete result type.
type Response = Box<dyn st::Any>;
/// Parent-side bookkeeping for a single worker process of the pool.
struct Process {
/// Carries jobs to the worker; sending `None` ends the worker's receive loop.
sender: Sender<Option<Request>>,
/// Delivers the worker's results back to the parent.
receiver: Receiver<Response>,
/// One slot per job; slot `k` belongs to the job whose offset is `tail + k`. Slots are popped
/// from the front once their result has been taken.
queue: VecDeque<Queued<Response>>,
/// Number of responses read from `receiver` so far.
received: usize,
/// Number of slots already popped from the front of `queue`.
tail: usize,
}
/// Lifecycle of one job's result slot: `Awaiting` -> `Got(result)` -> `Taken`.
enum Queued<T> {
    /// The job has been submitted but its result has not arrived yet.
    Awaiting,
    /// The result has arrived and is waiting to be collected.
    Got(T),
    /// The result has been handed out; the slot can be discarded.
    Taken,
}

impl<T> Queued<T> {
    /// Stores the freshly received result `t` in a slot that is still `Awaiting`.
    ///
    /// # Panics
    ///
    /// Panics if the slot is not `Awaiting`. The slot is validated *before* it is modified, so a
    /// misuse never overwrites a result that is already stored.
    fn received(&mut self, t: T) {
        match *self {
            Queued::Awaiting => *self = Queued::Got(t),
            Queued::Got(_) => panic!("Queued::received: slot already holds a result"),
            Queued::Taken => panic!("Queued::received: slot's result was already taken"),
        }
    }

    /// Moves the stored result out of the slot, leaving it `Taken`.
    ///
    /// # Panics
    ///
    /// Panics if the slot does not currently hold a result. The slot is validated *before* it is
    /// modified, so a premature call leaves an `Awaiting` slot untouched.
    fn take(&mut self) -> T {
        match *self {
            Queued::Got(_) => {}
            Queued::Awaiting => panic!("Queued::take: result has not been received yet"),
            Queued::Taken => panic!("Queued::take: result was already taken"),
        }
        match mem::replace(self, Queued::Taken) {
            Queued::Got(value) => value,
            Queued::Awaiting | Queued::Taken => {
                unreachable!("slot was verified to hold a result just above")
            }
        }
    }
}
/// A fixed set of worker processes to which jobs are handed out in round-robin order.
struct ProcessPool {
/// The workers, in the order they were spawned.
processes: Vec<Process>,
/// Index into `processes` of the worker that receives the next job.
i: usize,
}
#[serde_closure::desugar]
impl ProcessPool {
fn new(processes: usize, resources: Resources) -> Self {
let processes = (0..processes)
.map(|_| {
let child = spawn(
resources,
FnOnce!(move |parent| {
let receiver = Receiver::<Option<Request>>::new(parent);
let sender = Sender::<Response>::new(parent);
while let Some(work) = receiver.recv().block().unwrap() {
let ret = work.call_once_box(());
sender.send(ret).block();
}
}),
)
.block()
.expect("spawn() failed to allocate process");
let sender = Sender::new(child);
let receiver = Receiver::new(child);
let (queue, received, tail) = (VecDeque::new(), 0, 0);
Process {
sender,
receiver,
queue,
received,
tail,
}
})
.collect::<Vec<_>>();
ProcessPool { processes, i: 0 }
}
fn spawn<
F: serde_closure::traits::FnOnce() -> T
+ serde::ser::Serialize
+ serde::de::DeserializeOwned
+ 'static,
T: any::Any + serde::ser::Serialize + serde::de::DeserializeOwned,
>(
&mut self, work: F,
) -> JoinHandle<T> {
let process_index = self.i;
self.i += 1;
if self.i == self.processes.len() {
self.i = 0;
}
let process = &mut self.processes[process_index];
process
.sender
.send(Some(Box::new(FnOnce!(move || {
let work: F = work;
Box::new(work.call_once(())) as Response
})) as Request))
.block();
process.queue.push_back(Queued::Awaiting);
JoinHandle(
process_index,
process.tail + process.queue.len() - 1,
marker::PhantomData,
)
}
fn join<T: any::Any>(&mut self, key: JoinHandle<T>) -> T {
let JoinHandle(process_index, process_offset, _) = key;
drop(key); let process = &mut self.processes[process_index];
while process.received <= process_offset {
process.queue[process.received - process.tail]
.received(process.receiver.recv().block().unwrap());
process.received += 1;
}
let boxed: Box<_> = process.queue[process_offset - process.tail].take();
while let Some(Queued::Taken) = process.queue.front() {
let _ = process.queue.pop_front().unwrap();
process.tail += 1;
}
*Box::<dyn any::Any>::downcast::<T>(boxed.into_any()).unwrap()
}
}
impl Drop for ProcessPool {
fn drop(&mut self) {
for Process { sender, .. } in &self.processes {
sender.send(None).block();
}
}
}
/// Handle returned by `ProcessPool::spawn` and consumed by `ProcessPool::join`.
///
/// Fields, in order: the index of the worker process in the pool, the job's offset in that
/// worker's sequence of jobs, and a marker carrying the result type `T` without storing a `T`.
struct JoinHandle<T: any::Any>(usize, usize, marker::PhantomData<fn() -> T>);
/// Spawns a pool of worker processes, submits three jobs per worker, and prints each job's
/// greeting in submission order.
fn main() {
    init(Resources::default());
    // The worker count is the first command-line argument; a missing or unparsable argument
    // falls back to 10.
    let processes = match env::args().nth(1).map(|arg| arg.parse::<usize>()) {
        Some(Ok(count)) => count,
        _ => 10,
    };
    let mut pool = ProcessPool::new(processes, Resources::default());
    let job_count = processes * 3;
    let mut handles = Vec::with_capacity(job_count);
    for i in 0..job_count {
        handles.push(pool.spawn(FnOnce!(move || -> String {
            // Each job sleeps for a random duration below five seconds before answering.
            let pause = rand::thread_rng()
                .gen_range(time::Duration::new(0, 0), time::Duration::new(5, 0));
            thread::sleep(pause);
            format!("warm greetings from job {}", i)
        })));
    }
    for greeting in handles.into_iter().map(|handle| pool.join(handle)) {
        println!("{}", greeting);
    }
}