pub use cache::{TimedSizedMap, TimedSizedVec, Timestamp};
pub use reduce::{Reduce, ReduceWithEquality, ReduceWithThreshold, ReducedResult, ReductionError};
mod cache;
mod reduce;
#[cfg(test)]
mod tests;
use futures_channel::mpsc;
use futures_util::StreamExt;
use std::collections::{btree_map, btree_map::IntoIter as BTreeMapIntoIter, BTreeMap};
use std::fmt::Debug;
use std::iter::FusedIterator;
use tower::{Service, ServiceExt};
/// Sends every request from `requests` through `service` and collects the responses,
/// keyed by the identifier that accompanied each request.
///
/// Identifiers and requests are queued, in the same order, on two separate unbounded
/// channels. The response stream obtained from [`ServiceExt::call_all`] is then zipped with
/// the identifier stream, so each response is attributed to the identifier found at the same
/// position.
///
/// Returns the service, recovered from the response stream once it is drained, together with
/// the collected [`MultiResults`].
///
/// # Panics
///
/// Panics if two requests carry the same identifier, because
/// [`MultiResults::insert_once`] rejects duplicate keys.
pub async fn parallel_call<S, I, RequestId, Request, Response, Error>(
    service: S,
    requests: I,
) -> (S, MultiResults<RequestId, Response, Error>)
where
    S: Service<Request, Response = Response, Error = Error>,
    I: IntoIterator<Item = (RequestId, Request)>,
    RequestId: Ord,
{
    let (id_sender, id_receiver) = mpsc::unbounded();
    let (request_sender, request_receiver) = mpsc::unbounded();
    let response_stream = service.call_all(request_receiver);

    for (request_id, request) in requests {
        // Both receivers are still alive at this point, so a failed send is a logic error.
        id_sender
            .unbounded_send(request_id)
            .expect("BUG: channel closed");
        request_sender
            .unbounded_send(request)
            .expect("BUG: channel closed");
    }

    // Close both channels so that the receiving streams terminate after the last queued item.
    drop(id_sender);
    drop(request_sender);

    let mut collected = MultiResults::new();
    let mut id_with_response = id_receiver.zip(response_stream);
    while let Some((request_id, response)) = id_with_response.next().await {
        collected.insert_once(request_id, response);
    }

    let (_, drained_responses) = id_with_response.into_inner();
    (drained_responses.into_inner(), collected)
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MultiResults<K, V, E> {
ok_results: BTreeMap<K, V>,
errors: BTreeMap<K, E>,
}
impl<K, V, E> Default for MultiResults<K, V, E> {
fn default() -> Self {
Self::new()
}
}
impl<K, V, E> MultiResults<K, V, E> {
pub fn new() -> Self {
Self {
ok_results: BTreeMap::new(),
errors: BTreeMap::new(),
}
}
pub fn into_inner(self) -> (BTreeMap<K, V>, BTreeMap<K, E>) {
(self.ok_results, self.errors)
}
pub fn ok_results(&self) -> &BTreeMap<K, V> {
&self.ok_results
}
pub fn len(&self) -> usize {
self.ok_results.len() + self.errors.len()
}
pub fn is_empty(&self) -> bool {
self.ok_results.is_empty() && self.errors.is_empty()
}
}
impl<K: Ord, V, E> MultiResults<K, V, E> {
pub fn from_non_empty_iter<I>(iter: I) -> Self
where
I: IntoIterator<Item = (K, Result<V, E>)>,
{
let mut results = MultiResults::default();
for (key, result) in iter {
results.insert_once(key, result);
}
assert!(!results.is_empty(), "ERROR: MultiResults cannot be empty");
results
}
pub fn get(&self, id: &K) -> Option<Result<&V, &E>> {
self.ok_results
.get(id)
.map(Ok)
.or_else(|| self.errors.get(id).map(Err))
}
pub fn insert_once(&mut self, key: K, result: Result<V, E>) {
match result {
Ok(value) => {
self.insert_once_ok(key, value);
}
Err(error) => {
self.insert_once_err(key, error);
}
}
}
fn insert_once_ok(&mut self, key: K, value: V) {
assert!(
!self.errors.contains_key(&key),
"ERROR: duplicate key in `errors`"
);
assert!(
self.ok_results.insert(key, value).is_none(),
"ERROR: duplicate key in `ok_results`"
);
}
fn insert_once_err(&mut self, key: K, error: E) {
assert!(
!self.ok_results.contains_key(&key),
"ERROR: duplicate key in `ok_results`"
);
assert!(
self.errors.insert(key, error).is_none(),
"ERROR: duplicate key in `errors`"
);
}
pub fn add_errors<I>(&mut self, errors: I)
where
I: IntoIterator<Item = (K, E)>,
{
for (key, error) in errors.into_iter() {
self.insert_once_err(key, error);
}
}
pub fn iter(&self) -> Iter<'_, K, V, E> {
Iter {
ok_results_iter: self.ok_results.iter(),
errors_iter: self.errors.iter(),
}
}
}
/// Borrowing iterator over the entries of a [`MultiResults`].
///
/// All successful entries are yielded first, followed by all error entries. Within each
/// group the entries come in the order of the underlying `BTreeMap` iterator.
pub struct Iter<'a, K, V, E> {
    /// Successful entries not yet yielded.
    ok_results_iter: btree_map::Iter<'a, K, V>,
    /// Error entries not yet yielded.
    errors_iter: btree_map::Iter<'a, K, E>,
}

impl<'a, K, V, E> Iterator for Iter<'a, K, V, E> {
    type Item = (&'a K, Result<&'a V, &'a E>);

    fn next(&mut self) -> Option<Self::Item> {
        // Errors are only handed out once the successes are exhausted.
        if let Some((key, value)) = self.ok_results_iter.next() {
            return Some((key, Ok(value)));
        }
        self.errors_iter
            .next()
            .map(|(key, error)| (key, Err(error)))
    }

    /// Reports the exact number of remaining entries.
    ///
    /// Both wrapped iterators know their length, so consumers such as `collect` can
    /// allocate once instead of growing incrementally.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let ok_remaining = self.ok_results_iter.len();
        let errors_remaining = self.errors_iter.len();
        (
            ok_remaining.saturating_add(errors_remaining),
            ok_remaining.checked_add(errors_remaining),
        )
    }
}

// Both wrapped `btree_map::Iter`s are fused, so `next` keeps returning `None` once drained.
impl<'a, K, V, E> FusedIterator for Iter<'a, K, V, E> {}
/// Owning iterator over the entries of a [`MultiResults`].
///
/// All successful entries are yielded first, followed by all error entries. Within each
/// group the entries come in the order of the underlying `BTreeMap` iterator.
pub struct IntoIter<K, V, E> {
    /// Successful entries not yet yielded.
    ok_results_iter: BTreeMapIntoIter<K, V>,
    /// Error entries not yet yielded.
    errors_iter: BTreeMapIntoIter<K, E>,
}

impl<K, V, E> Iterator for IntoIter<K, V, E> {
    type Item = (K, Result<V, E>);

    fn next(&mut self) -> Option<Self::Item> {
        // Errors are only handed out once the successes are exhausted.
        if let Some((key, value)) = self.ok_results_iter.next() {
            return Some((key, Ok(value)));
        }
        self.errors_iter
            .next()
            .map(|(key, error)| (key, Err(error)))
    }

    /// Reports the exact number of remaining entries.
    ///
    /// Both wrapped iterators know their length, so consumers such as `collect` can
    /// allocate once instead of growing incrementally.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let ok_remaining = self.ok_results_iter.len();
        let errors_remaining = self.errors_iter.len();
        (
            ok_remaining.saturating_add(errors_remaining),
            ok_remaining.checked_add(errors_remaining),
        )
    }
}

// Both wrapped `btree_map::IntoIter`s are fused, so `next` keeps returning `None` once drained.
impl<K, V, E> FusedIterator for IntoIter<K, V, E> {}
impl<K, V, E> IntoIterator for MultiResults<K, V, E> {
type Item = (K, Result<V, E>);
type IntoIter = IntoIter<K, V, E>;
fn into_iter(self) -> Self::IntoIter {
Self::IntoIter {
ok_results_iter: self.ok_results.into_iter(),
errors_iter: self.errors.into_iter(),
}
}
}