use super::{Batch, Disposition};
use crate::packets::Packet;
use std::cell::Cell;
use std::collections::{HashMap, VecDeque};
use std::hash::Hash;
use std::rc::Rc;
#[allow(missing_debug_implementations)]
#[derive(Default)]
pub struct Bridge<T: Packet>(Rc<Cell<Option<T>>>);
impl<T: Packet> Bridge<T> {
pub fn new() -> Self {
Bridge(Rc::new(Cell::new(None)))
}
pub fn set(&self, pkt: T) {
self.0.set(Some(pkt));
}
}
impl<T: Packet> Clone for Bridge<T> {
fn clone(&self) -> Self {
Bridge(Rc::clone(&self.0))
}
}
impl<T: Packet> Batch for Bridge<T> {
type Item = T;
fn replenish(&mut self) {
}
fn next(&mut self) -> Option<Disposition<Self::Item>> {
self.0.take().map(Disposition::Act)
}
}
pub type GroupByBatchBuilder<T> = dyn FnOnce(Bridge<T>) -> Box<dyn Batch<Item = T>>;
#[allow(missing_debug_implementations)]
pub struct GroupBy<B: Batch, D, S>
where
D: Eq + Clone + Hash,
S: Fn(&B::Item) -> D,
{
batch: B,
selector: S,
bridge: Bridge<B::Item>,
groups: HashMap<D, Box<dyn Batch<Item = B::Item>>>,
catchall: Box<dyn Batch<Item = B::Item>>,
fanouts: VecDeque<Disposition<B::Item>>,
}
impl<B: Batch, D, S> GroupBy<B, D, S>
where
D: Eq + Clone + Hash,
S: Fn(&B::Item) -> D,
{
#[inline]
pub fn new<C>(batch: B, selector: S, composer: C) -> Self
where
C: FnOnce(&mut HashMap<Option<D>, Box<GroupByBatchBuilder<B::Item>>>),
{
let mut builders = HashMap::new();
composer(&mut builders);
let bridge = Bridge::new();
let catchall = builders.remove(&None).unwrap()(bridge.clone());
let groups = builders
.into_iter()
.map(|(key, build)| {
let key = key.unwrap();
let group = build(bridge.clone());
(key, group)
})
.collect::<HashMap<_, _>>();
GroupBy {
batch,
selector,
bridge,
groups,
catchall,
fanouts: VecDeque::new(),
}
}
}
impl<B: Batch, D, S> Batch for GroupBy<B, D, S>
where
D: Eq + Clone + Hash,
S: Fn(&B::Item) -> D,
{
type Item = B::Item;
#[inline]
fn replenish(&mut self) {
self.batch.replenish();
}
#[inline]
fn next(&mut self) -> Option<Disposition<Self::Item>> {
if let Some(disp) = self.fanouts.pop_front() {
Some(disp)
} else {
self.batch.next().map(|disp| {
disp.map(|pkt| {
let key = (self.selector)(&pkt);
self.bridge.set(pkt);
let batch = match self.groups.get_mut(&key) {
Some(group) => group,
None => &mut self.catchall,
};
while let Some(next) = batch.next() {
self.fanouts.push_back(next)
}
self.fanouts.pop_front().unwrap()
})
})
}
}
}
#[doc(hidden)]
#[macro_export]
macro_rules! __compose {
($map:ident, $($key:expr => |$arg:tt| $body:block),*) => {{
$(
$map.insert(Some($key), Box::new(|$arg| Box::new($body)));
)*
}};
}
#[macro_export]
macro_rules! compose {
($map:ident { $($key:expr => |$arg:tt| $body:block)+ }) => {{
$crate::__compose!($map, $($key => |$arg| $body),*);
$map.insert(None, Box::new(|group| Box::new(group)));
}};
($map:ident { $($key:expr => |$arg:tt| $body:block)+ _ => |$_arg:tt| $_body:block }) => {{
$crate::__compose!($map, $($key => |$arg| $body),*);
$map.insert(None, Box::new(|$_arg| Box::new($_body)));
}};
($map:ident { $($key:expr),+ => |$arg:tt| $body:block }) => {{
$crate::compose!($map { $($key => |$arg| $body)+ });
}};
($map:ident { $($key:expr),+ => |$arg:tt| $body:block _ => |$_arg:tt| $_body:block }) => {{
$crate::compose!($map { $($key => |$arg| $body)+ _ => |$_arg| $_body });
}};
($map:ident { _ => |$_arg:tt| $_body:block }) => {{
$map.insert(None, Box::new(|$_arg| Box::new($_body)));
}};
}