use std::rc::Rc;
use std::cell::RefCell;
use progress::Timestamp;
use progress::ChangeBatch;
use progress::frontier::MutableAntichain;
use dataflow::channels::pullers::Counter as PullCounter;
use dataflow::channels::pushers::Counter as PushCounter;
use dataflow::channels::pushers::buffer::{Buffer, Session};
use dataflow::channels::Content;
use timely_communication::{Push, Pull};
use logging::Logger;
use dataflow::operators::Capability;
use dataflow::operators::capability::mint as mint_capability;
/// Handle through which an operator reads messages from one of its inputs.
///
/// Bundles a counting puller with the shared change batch that is handed to
/// `mint_capability` for each received message, plus a logger used by
/// `for_each` to bracket user logic.
pub struct InputHandle<T, D, P>
where
    T: Timestamp,
    P: Pull<(T, Content<D>)>,
{
    /// Counting puller from which `(time, content)` messages are drawn.
    pull_counter: PullCounter<T, D, P>,
    /// Shared change batch cloned into every capability minted by `next`.
    internal: Rc<RefCell<ChangeBatch<T>>>,
    /// Logger that records guarded-message start/stop events in `for_each`.
    logging: Logger,
}
/// An `InputHandle` borrowed together with a frontier for the same input.
///
/// Both references share the lifetime `'a`; the handle is borrowed mutably so
/// messages can be pulled through it, while the frontier is read-only.
pub struct FrontieredInputHandle<'a, T, D, P>
where
    T: Timestamp,
    D: 'a,
    P: Pull<(T, Content<D>)> + 'a,
{
    /// The underlying input handle.
    pub handle: &'a mut InputHandle<T, D, P>,
    /// The frontier associated with this input.
    pub frontier: &'a MutableAntichain<T>,
}
impl<'a, T: Timestamp, D, P: Pull<(T, Content<D>)>> InputHandle<T, D, P> {
#[inline(always)]
pub fn next(&mut self) -> Option<(Capability<T>, &mut Content<D>)> {
let internal = &mut self.internal;
self.pull_counter.next().map(|(time, content)| {
(mint_capability(time.clone(), internal.clone()), content)
})
}
#[inline]
pub fn for_each<F: FnMut(Capability<T>, &mut Content<D>)>(&mut self, mut logic: F) {
let logging = self.logging.clone();
while let Some((cap, data)) = self.next() {
logging.when_enabled(|l| l.log(::logging::TimelyEvent::GuardedMessage(
::logging::GuardedMessageEvent { is_start: true })));
logic(cap, data);
logging.when_enabled(|l| l.log(::logging::TimelyEvent::GuardedMessage(
::logging::GuardedMessageEvent { is_start: false })));
}
}
}
impl<'a, T, D, P> FrontieredInputHandle<'a, T, D, P>
where
    T: Timestamp,
    P: Pull<(T, Content<D>)> + 'a,
{
    /// Pulls the next message and its capability from the wrapped handle.
    ///
    /// Forwards directly to `InputHandle::next`.
    #[inline(always)]
    pub fn next(&mut self) -> Option<(Capability<T>, &mut Content<D>)> {
        let inner = &mut *self.handle;
        inner.next()
    }

    /// Applies `logic` to every available message of the wrapped handle.
    ///
    /// Forwards directly to `InputHandle::for_each`.
    #[inline]
    pub fn for_each<F>(&mut self, logic: F)
    where
        F: FnMut(Capability<T>, &mut Content<D>),
    {
        let inner = &mut *self.handle;
        inner.for_each(logic)
    }

    /// Returns the frontier reference held by this handle.
    ///
    /// The returned reference carries the full `'a` lifetime rather than the
    /// lifetime of the `&self` borrow.
    #[inline(always)]
    pub fn frontier(&self) -> &'a MutableAntichain<T> {
        self.frontier
    }
}
pub fn _access_pull_counter<T: Timestamp, D, P: Pull<(T, Content<D>)>>(input: &mut InputHandle<T, D, P>) -> &mut PullCounter<T, D, P> {
&mut input.pull_counter
}
/// Assembles an `InputHandle` from its three components.
///
/// * `pull_counter` - counting puller the handle reads messages from.
/// * `internal` - shared change batch passed along when minting capabilities.
/// * `logging` - logger used by `InputHandle::for_each`.
pub fn new_input_handle<T, D, P>(
    pull_counter: PullCounter<T, D, P>,
    internal: Rc<RefCell<ChangeBatch<T>>>,
    logging: Logger,
) -> InputHandle<T, D, P>
where
    T: Timestamp,
    P: Pull<(T, Content<D>)>,
{
    InputHandle {
        logging: logging,
        internal: internal,
        pull_counter: pull_counter,
    }
}
/// Pairs a mutably borrowed `InputHandle` with a frontier reference.
///
/// Both borrows must live for `'a`; the result simply stores the two
/// references.
pub fn new_frontier_input_handle<'a, T, D, P>(
    input_handle: &'a mut InputHandle<T, D, P>,
    frontier: &'a MutableAntichain<T>,
) -> FrontieredInputHandle<'a, T, D, P>
where
    T: Timestamp,
    D: 'a,
    P: Pull<(T, Content<D>)> + 'a,
{
    FrontieredInputHandle {
        frontier: frontier,
        handle: input_handle,
    }
}
/// Owner of an output push buffer.
///
/// An `OutputHandle` borrowing the buffer is obtained through `activate`.
pub struct OutputWrapper<T, D, P>
where
    T: Timestamp,
    P: Push<(T, Content<D>)>,
{
    /// Buffer layered over a counting pusher.
    push_buffer: Buffer<T, D, PushCounter<T, D, P>>,
}
impl<T, D, P> OutputWrapper<T, D, P>
where
    T: Timestamp,
    P: Push<(T, Content<D>)>,
{
    /// Wraps the supplied push buffer.
    pub fn new(buffer: Buffer<T, D, PushCounter<T, D, P>>) -> Self {
        OutputWrapper { push_buffer: buffer }
    }

    /// Borrows the wrapped buffer mutably and returns it as an `OutputHandle`.
    ///
    /// The handle holds the borrow for as long as it is alive.
    pub fn activate(&mut self) -> OutputHandle<T, D, P> {
        let buffer = &mut self.push_buffer;
        OutputHandle { push_buffer: buffer }
    }
}
/// Handle for sending data on an operator's output.
///
/// Holds a mutable borrow of the push buffer for the lifetime `'a`.
pub struct OutputHandle<'a, T, D, P>
where
    T: Timestamp,
    D: 'a,
    P: Push<(T, Content<D>)> + 'a,
{
    /// Borrowed buffer through which sessions are opened.
    push_buffer: &'a mut Buffer<T, D, PushCounter<T, D, P>>,
}
impl<'a, T, D, P> OutputHandle<'a, T, D, P>
where
    T: Timestamp,
    P: Push<(T, Content<D>)>,
{
    /// Opens a session on the underlying buffer using the given capability.
    ///
    /// The returned `Session` borrows both this handle and `cap` for `'b`,
    /// which may not outlive the handle's own lifetime `'a`.
    pub fn session<'b>(&'b mut self, cap: &'b Capability<T>) -> Session<'b, T, D, PushCounter<T, D, P>>
    where
        'a: 'b,
    {
        let buffer = &mut *self.push_buffer;
        buffer.session(cap)
    }
}
impl<'a, T, D, P> Drop for OutputHandle<'a, T, D, P>
where
    T: Timestamp,
    P: Push<(T, Content<D>)>,
{
    /// Calls `cease` on the borrowed buffer when the handle is dropped.
    ///
    /// NOTE(review): presumably this flushes or closes out pending output;
    /// confirm against `Buffer::cease`.
    fn drop(&mut self) {
        let buffer = &mut *self.push_buffer;
        buffer.cease();
    }
}