use std::collections::VecDeque;
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::JoinHandle;
use std::time::Duration;
use crate::error::Result;
use crate::http::{Request, Response};
/// Identifier of a single transfer managed by a `Multi`.
///
/// Wraps a plain `u64`; `Multi::add` hands these out from a counter that
/// starts at zero and increases by one per call.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EasyId(pub u64);
/// A finished transfer: the id it was registered under, paired with the
/// value returned by `Request::send` on its worker thread.
type Completion = (EasyId, Result<Response>);
/// Drives several requests concurrently, one worker thread per request, and
/// collects their results over an internal channel.
pub struct Multi {
// Numeric value the next `EasyId` returned by `add` will carry.
next_id: u64,
// Requests registered with `add` that `perform` has not started yet.
pending: Vec<(EasyId, Request)>,
// Sending half of the completion channel; cloned into every worker thread.
tx: Sender<Completion>,
// Receiving half of the completion channel, read on the owner's side.
rx: Receiver<Completion>,
// Join handles of spawned workers, keyed by the transfer they serve.
workers: Vec<(EasyId, JoinHandle<()>)>,
// Number of spawned transfers whose completion has not been received yet.
running: usize,
// Completions already received but not yet returned by `next_completed`.
ready: VecDeque<Completion>,
}
impl Default for Multi {
fn default() -> Self {
Self::new()
}
}
impl Multi {
pub fn new() -> Self {
let (tx, rx) = mpsc::channel();
Multi {
next_id: 0,
pending: Vec::new(),
tx,
rx,
workers: Vec::new(),
running: 0,
ready: VecDeque::new(),
}
}
pub fn add(&mut self, req: Request) -> EasyId {
let id = EasyId(self.next_id);
self.next_id += 1;
self.pending.push((id, req));
id
}
pub fn remove(&mut self, id: EasyId) -> bool {
if let Some(pos) = self.pending.iter().position(|(i, _)| *i == id) {
self.pending.remove(pos);
true
} else {
false
}
}
pub fn perform(&mut self) -> usize {
for (id, req) in self.pending.drain(..) {
let tx = self.tx.clone();
let handle = std::thread::spawn(move || {
let _ = tx.send((id, req.send()));
});
self.workers.push((id, handle));
self.running += 1;
}
self.drain_ready();
self.running
}
fn drain_ready(&mut self) {
while let Ok((id, result)) = self.rx.try_recv() {
self.join_worker(id);
self.running -= 1;
self.ready.push_back((id, result));
}
}
fn join_worker(&mut self, id: EasyId) {
if let Some(pos) = self.workers.iter().position(|(i, _)| *i == id) {
let (_, handle) = self.workers.remove(pos);
let _ = handle.join();
}
}
pub fn poll(&mut self, timeout: Option<Duration>) -> bool {
self.drain_ready();
if !self.ready.is_empty() {
return true;
}
if self.running == 0 {
return false;
}
let got = match timeout {
Some(t) => self.rx.recv_timeout(t).ok(),
None => self.rx.recv().ok(),
};
if let Some((id, result)) = got {
self.join_worker(id);
self.running -= 1;
self.ready.push_back((id, result));
self.drain_ready();
true
} else {
false
}
}
pub fn next_completed(&mut self) -> Option<Completion> {
self.drain_ready();
self.ready.pop_front()
}
pub fn running(&self) -> usize {
self.running
}
pub fn is_empty(&self) -> bool {
self.pending.is_empty() && self.running == 0 && self.ready.is_empty()
}
pub fn wait_all(&mut self) -> Vec<Completion> {
self.perform();
let mut out = Vec::new();
loop {
while let Some(c) = self.next_completed() {
out.push(c);
}
if self.running == 0 {
break;
}
self.poll(None);
}
out
}
}
impl Drop for Multi {
    /// Waits for every worker thread that is still tracked to exit.
    /// The outcome of each join (including a worker panic) is ignored.
    fn drop(&mut self) {
        self.workers.drain(..).for_each(|(_, worker)| {
            let _ = worker.join();
        });
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // None of these tests call `perform`, so no worker thread is spawned.

    #[test]
    fn add_assigns_increasing_ids_and_counts_pending() {
        let mut multi = Multi::new();
        let first = m_add(&mut multi, Request::get("http://127.0.0.1:1/a").unwrap());
        let second = m_add(&mut multi, Request::get("http://127.0.0.1:1/b").unwrap());
        // Ids are handed out sequentially starting at zero.
        assert_eq!(first, EasyId(0));
        assert_eq!(second, EasyId(1));
        // Queued requests are not running, but the handle is not empty either.
        assert_eq!(multi.running(), 0);
        assert!(!multi.is_empty());
    }

    #[test]
    fn remove_drops_pending_transfer() {
        let mut multi = Multi::new();
        let queued = m_add(&mut multi, Request::get("http://127.0.0.1:1/a").unwrap());
        // The first removal succeeds, the second finds nothing left.
        assert!(multi.remove(queued));
        assert!(!multi.remove(queued));
        assert!(multi.is_empty());
    }

    #[test]
    fn poll_returns_false_when_nothing_running() {
        let mut multi = Multi::new();
        let short_wait = Some(Duration::from_millis(10));
        assert!(!multi.poll(short_wait));
        assert!(multi.next_completed().is_none());
    }

    /// Thin wrapper so each test reads as "queue this request, keep its id".
    fn m_add(multi: &mut Multi, req: Request) -> EasyId {
        multi.add(req)
    }
}