use crossbeam_channel::{Receiver, Sender, TryRecvError};
use differential_dataflow::{
difference::{Abelian, Semigroup},
lattice::Lattice,
operators::Threshold,
Collection, Data,
};
use std::{collections::HashMap, fmt::Debug, hash::Hash, mem, num::NonZeroUsize};
use timely::{
dataflow::{
operators::capture::{Event, EventPusher, Extract},
Scope,
},
progress::ChangeBatch,
};
use crate::dataflow::utils::{Diff, XXHasher};
/// Extra operators for differential-dataflow [`Collection`]s.
pub trait OperatorExt<G, D, R> {
    /// Builds a named `threshold` operator that maps every record present in the
    /// collection to the multiplicity `R2::from(1i8)`, ignoring the record and its
    /// accumulated count.
    ///
    /// The operator is labelled with `name`.
    fn distinct_named<N, R2>(&self, name: N) -> Collection<G, D, R2>
    where
        N: AsRef<str>,
        R2: Abelian + From<i8>,
        Self: Threshold<G, D, R>,
        G: Scope,
        G::Timestamp: Lattice,
        D: Data,
        R: Semigroup,
    {
        let operator_name: &str = name.as_ref();
        self.threshold_named(operator_name, |_record, _count| R2::from(1i8))
    }
}

/// Every differential [`Collection`] gets the [`OperatorExt`] operators.
impl<G, D, R> OperatorExt<G, D, R> for Collection<G, D, R>
where
    G: Scope,
    D: Data,
    R: Semigroup,
{
}
/// A timely [`EventPusher`] that forwards captured events into a crossbeam channel.
#[derive(Debug, Clone)]
pub struct CrossbeamPusher<T>(Sender<T>);

impl<T> CrossbeamPusher<T> {
    /// Wraps the sending half of a crossbeam channel.
    pub const fn new(sender: Sender<T>) -> Self {
        CrossbeamPusher(sender)
    }
}

impl<T, D> EventPusher<T, D> for CrossbeamPusher<Event<T, D>> {
    /// Sends `event` down the channel.
    ///
    /// A send error is deliberately discarded. Crossbeam's `send` only fails once
    /// the channel is disconnected, at which point nobody is left to receive it.
    fn push(&mut self, event: Event<T, D>) {
        self.0.send(event).ok();
    }
}
/// Reads captured timely events back out of a crossbeam channel.
#[derive(Debug, Clone)]
pub struct CrossbeamExtractor<T>(Receiver<T>);

impl<T> CrossbeamExtractor<T> {
    /// Wraps the receiving half of a crossbeam channel.
    pub const fn new(receiver: Receiver<T>) -> Self {
        CrossbeamExtractor(receiver)
    }
}
impl<T, D> CrossbeamExtractor<Event<T, (D, T, Diff)>>
where
    T: Debug + Ord + Hash + Clone,
    D: Debug + Ord + Hash + Clone,
{
    /// Drains captured events from the channel, bounded by `fuel`.
    ///
    /// * `Event::Messages` batches are folded into `sink`. The diffs of equal
    ///   records are summed and the timestamps are discarded.
    /// * `Event::Progress` updates are appended to `consumed`.
    ///
    /// Each receive attempt costs one unit of fuel. Each message batch of length
    /// `n` is additionally charged `n * (bit_length(n) / 3)`, a rough `n log n`
    /// estimate.
    ///
    /// The loop stops when the channel is empty, when it disconnects, or when fuel
    /// runs out. If fuel remains afterwards, zero-diff entries are pruned from
    /// `sink`, which costs `sink.len()` fuel. The map is then shrunk if it is less
    /// than half full.
    ///
    /// Returns `true` if the channel disconnected. On that path the sink is not
    /// compacted. Otherwise returns `false`.
    pub fn extract_with_fuel(
        &self,
        fuel: &mut Fuel,
        sink: &mut HashMap<D, Diff, XXHasher>,
        consumed: &mut ChangeBatch<T>,
    ) -> bool {
        while !fuel.is_exhausted() {
            fuel.exert(1);
            match self.0.try_recv() {
                Ok(Event::Messages(time, mut data)) => {
                    tracing::trace!(
                        target: "extracted_data",
                        time = ?time,
                        data = ?data,
                        "extracted {} events",
                        data.len(),
                    );

                    // Use the real width of `usize`. A hard-coded 64 over-charges
                    // on 32-bit targets.
                    let bit_length = (usize::BITS - data.len().leading_zeros()) as usize;
                    let complexity = data.len() * (bit_length / 3);
                    fuel.exert(complexity);

                    // Order by timestamp, comparing by reference instead of cloning
                    // a key for every comparison.
                    data.sort_unstable_by(|(_, lhs, _), (_, rhs, _)| lhs.cmp(rhs));
                    for (record, _time, diff) in data {
                        sink.entry(record).and_modify(|d| *d += &diff).or_insert(diff);
                    }
                }
                // `ChangeBatch::extend` takes an `Iterator`, hence `into_iter()`.
                Ok(Event::Progress(progress)) => consumed.extend(progress.into_iter()),
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Disconnected) => return true,
            }
        }

        if !fuel.is_exhausted() {
            // Compaction visits every entry, so charge for it.
            fuel.exert(sink.len());
            sink.retain(|_, diff| !diff.is_zero());
            if sink.capacity() > sink.len() * 2 {
                sink.shrink_to_fit();
            }
        }
        false
    }

    /// Consumes the extractor and returns every record whose summed diff is non-zero.
    ///
    /// The records come from `Extract::extract`, so only events already buffered in
    /// the channel are seen. The output order follows hash-map iteration and is
    /// unspecified.
    // NOTE(review): presumably only exercised by tests/debug code; confirm before
    // removing the `dead_code` allowance.
    #[allow(dead_code)]
    pub fn extract_all(self) -> Vec<D> {
        let mut totals = HashMap::with_hasher(XXHasher::default());
        for (record, _time, diff) in self.extract().into_iter().flat_map(|(_, batch)| batch) {
            totals
                .entry(record)
                .and_modify(|d| *d += &diff)
                .or_insert(diff);
        }
        totals
            .into_iter()
            .filter(|(_, diff)| !diff.is_zero())
            .map(|(record, _)| record)
            .collect()
    }
}
impl<T: Ord, D: Ord> Extract<T, D> for CrossbeamExtractor<Event<T, D>> {
    /// Collects the `Event::Messages` batches already buffered in the channel.
    ///
    /// Events are read with `try_iter`, so this does not wait for further events.
    /// `Event::Progress` events are skipped.
    ///
    /// In the result:
    /// * batches are sorted by time;
    /// * batches that share a timestamp are concatenated;
    /// * empty batches are dropped;
    /// * each remaining batch's records are sorted.
    fn extract(self) -> Vec<(T, Vec<D>)> {
        let mut batches: Vec<(T, Vec<D>)> = self
            .0
            .try_iter()
            .filter_map(|event| match event {
                Event::Messages(time, data) => Some((time, data)),
                _ => None,
            })
            .collect();

        // Stable sort, so equal-time batches keep their arrival order when merged.
        batches.sort_by(|left, right| left.0.cmp(&right.0));

        // `dedup_by` passes (later, earlier). Move the later batch's records into
        // the earlier one and drop the later entry.
        batches.dedup_by(|later, earlier| {
            if later.0 == earlier.0 {
                earlier.1.extend(mem::take(&mut later.1));
                true
            } else {
                false
            }
        });

        batches.retain(|(_, data)| !data.is_empty());
        batches.iter_mut().for_each(|(_, data)| data.sort());
        batches
    }
}
/// A budget of work units that bounds how much a single call may do.
///
/// `Unlimited` never runs out. `Limited` tracks the remaining `fuel` and remembers
/// the `default` amount that [`Fuel::reset`] refills it to.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Fuel {
    Unlimited,
    Limited { fuel: usize, default: NonZeroUsize },
}

impl Fuel {
    /// Creates a limited budget that starts at, and resets to, `fuel` units.
    pub const fn limited(fuel: NonZeroUsize) -> Self {
        Self::Limited {
            fuel: fuel.get(),
            default: fuel,
        }
    }

    /// Creates a budget that never runs out.
    pub const fn unlimited() -> Self {
        Self::Unlimited
    }

    /// Returns `true` for an [`Fuel::Unlimited`] budget.
    pub const fn is_unlimited(&self) -> bool {
        matches!(self, Self::Unlimited)
    }

    /// Consumes `effort` units. The count saturates at zero rather than underflowing.
    ///
    /// Returns `true` if the budget is now exhausted. Always returns `false` for
    /// an unlimited budget.
    pub fn exert(&mut self, effort: usize) -> bool {
        if let Self::Limited { fuel, .. } = self {
            *fuel = fuel.saturating_sub(effort);
            *fuel == 0
        } else {
            false
        }
    }

    /// Returns `true` when a limited budget has no units left. Never `true` when unlimited.
    pub const fn is_exhausted(&self) -> bool {
        match *self {
            Self::Unlimited => false,
            Self::Limited { fuel, .. } => fuel == 0,
        }
    }

    /// Refills a limited budget to its `default`. Does nothing when unlimited.
    pub fn reset(&mut self) {
        if let Self::Limited { fuel, default } = self {
            *fuel = default.get();
        }
    }

    /// Units still available, or `None` when unlimited.
    ///
    /// Previously this returned the consumed amount, swapped with [`Fuel::used`].
    pub const fn remaining(&self) -> Option<usize> {
        match *self {
            Self::Unlimited => None,
            Self::Limited { fuel, .. } => Some(fuel),
        }
    }

    /// Units consumed since creation or the last reset, or `None` when unlimited.
    pub const fn used(&self) -> Option<usize> {
        match *self {
            Self::Unlimited => None,
            // Saturating: the variant's fields are public, so `fuel > default`
            // can be constructed directly.
            Self::Limited { fuel, default } => Some(default.get().saturating_sub(fuel)),
        }
    }
}
/// Formats a name together with a source location as `"<name> @ <file>:<line>:<column>"`.
///
/// * `located!(name, caller)` uses the given `&core::panic::Location`.
/// * `located!(name)` uses `core::panic::Location::caller()` evaluated where the
///   macro expands.
macro_rules! located {
    ($name:expr, $caller:expr) => {{
        let location: &::core::panic::Location = $caller;
        ::std::format!(
            "{} @ {}:{}:{}",
            $name,
            location.file(),
            location.line(),
            location.column(),
        )
    }};
    ($name:expr) => {
        located!($name, ::core::panic::Location::caller())
    };
}