use crate::workqueue::WorkQueue;
use futures::stream::{FuturesOrdered, StreamExt};
use tokio::select;
use tokio::sync::mpsc;
pub struct Engine<T> {
rx: mpsc::Receiver<T>,
}
impl<T: Send + 'static> Engine<T> {
pub fn new<S, F>(queue: WorkQueue<S>, func: F) -> Self
where
F: Send + Copy + 'static + Fn(S) -> T,
S: Send + 'static,
{
let size = queue.size();
let (tx, rx) = mpsc::channel(size);
tokio::spawn(manage_workers(queue, size, tx, func));
Self { rx }
}
pub async fn next(&mut self) -> Option<T> {
self.rx.recv().await
}
}
async fn manage_workers<S, T, F>(
mut queue: WorkQueue<S>,
queue_size: usize,
tx: mpsc::Sender<T>,
func: F,
) where
F: Send + 'static + Copy + Fn(S) -> T,
S: Send + 'static,
T: Send + 'static,
{
let mut workers = FuturesOrdered::new();
'processing: loop {
select! {
biased;
maybe_work = queue.next() => {
if let Some(work) = maybe_work {
let tx = tx.clone();
workers.push(do_work(work, tx, func));
while workers.len() >= queue_size {
workers.next().await;
}
} else {
break 'processing;
}
}
_ = workers.next(), if !workers.is_empty() => {
}
}
}
while workers.next().await.is_some() {
}
}
async fn do_work<S, T, F>(item: S, tx: mpsc::Sender<T>, func: F)
where
F: Send + 'static + Fn(S) -> T,
S: Send + 'static,
T: Send + 'static,
{
let result = tokio::task::spawn_blocking(move || func(item))
.await
.unwrap();
if let Err(err) = tx.send(result).await {
panic!("failed to send result to channel: {}", err);
}
}