use std::rc::Rc;
use std::cell::RefCell;
use crate::Data;
use crate::progress::Timestamp;
use crate::progress::ChangeBatch;
use crate::progress::frontier::MutableAntichain;
use crate::dataflow::channels::pullers::Counter as PullCounter;
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::channels::pushers::buffer::{Buffer, Session};
use crate::dataflow::channels::Bundle;
use crate::communication::{Push, Pull, message::RefOrMut};
use crate::logging::TimelyLogger as Logger;
use crate::dataflow::operators::CapabilityRef;
use crate::dataflow::operators::capability::mint_ref as mint_capability_ref;
use crate::dataflow::operators::capability::CapabilityTrait;
pub struct InputHandle<T: Timestamp, D, P: Pull<Bundle<T, D>>> {
pull_counter: PullCounter<T, D, P>,
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
logging: Option<Logger>,
}
pub struct FrontieredInputHandle<'a, T: Timestamp, D: 'a, P: Pull<Bundle<T, D>>+'a> {
pub handle: &'a mut InputHandle<T, D, P>,
pub frontier: &'a MutableAntichain<T>,
}
impl<'a, T: Timestamp, D: Data, P: Pull<Bundle<T, D>>> InputHandle<T, D, P> {
#[inline(always)]
pub fn next(&mut self) -> Option<(CapabilityRef<T>, RefOrMut<Vec<D>>)> {
let internal = &self.internal;
self.pull_counter.next().map(|bundle| {
match bundle.as_ref_or_mut() {
RefOrMut::Ref(bundle) => {
(mint_capability_ref(&bundle.time, internal.clone()), RefOrMut::Ref(&bundle.data))
},
RefOrMut::Mut(bundle) => {
(mint_capability_ref(&bundle.time, internal.clone()), RefOrMut::Mut(&mut bundle.data))
},
}
})
}
#[inline]
pub fn for_each<F: FnMut(CapabilityRef<T>, RefOrMut<Vec<D>>)>(&mut self, mut logic: F) {
let mut logging = self.logging.clone();
while let Some((cap, data)) = self.next() {
logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true }));
logic(cap, data);
logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: false }));
}
}
}
impl<'a, T: Timestamp, D: Data, P: Pull<Bundle<T, D>>+'a> FrontieredInputHandle<'a, T, D, P> {
pub fn new(handle: &'a mut InputHandle<T, D, P>, frontier: &'a MutableAntichain<T>) -> Self {
FrontieredInputHandle {
handle,
frontier,
}
}
#[inline(always)]
pub fn next(&mut self) -> Option<(CapabilityRef<T>, RefOrMut<Vec<D>>)> {
self.handle.next()
}
#[inline]
pub fn for_each<F: FnMut(CapabilityRef<T>, RefOrMut<Vec<D>>)>(&mut self, logic: F) {
self.handle.for_each(logic)
}
#[inline(always)]
pub fn frontier(&self) -> &'a MutableAntichain<T> {
self.frontier
}
}
pub fn _access_pull_counter<T: Timestamp, D, P: Pull<Bundle<T, D>>>(input: &mut InputHandle<T, D, P>) -> &mut PullCounter<T, D, P> {
&mut input.pull_counter
}
pub fn new_input_handle<T: Timestamp, D, P: Pull<Bundle<T, D>>>(pull_counter: PullCounter<T, D, P>, internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>, logging: Option<Logger>) -> InputHandle<T, D, P> {
InputHandle {
pull_counter,
internal,
logging,
}
}
pub struct OutputWrapper<T: Timestamp, D, P: Push<Bundle<T, D>>> {
push_buffer: Buffer<T, D, PushCounter<T, D, P>>,
internal_buffer: Rc<RefCell<ChangeBatch<T>>>,
}
impl<T: Timestamp, D, P: Push<Bundle<T, D>>> OutputWrapper<T, D, P> {
pub fn new(push_buffer: Buffer<T, D, PushCounter<T, D, P>>, internal_buffer: Rc<RefCell<ChangeBatch<T>>>) -> Self {
OutputWrapper {
push_buffer,
internal_buffer,
}
}
pub fn activate(&mut self) -> OutputHandle<T, D, P> {
OutputHandle {
push_buffer: &mut self.push_buffer,
internal_buffer: &self.internal_buffer,
}
}
}
pub struct OutputHandle<'a, T: Timestamp, D: 'a, P: Push<Bundle<T, D>>+'a> {
push_buffer: &'a mut Buffer<T, D, PushCounter<T, D, P>>,
internal_buffer: &'a Rc<RefCell<ChangeBatch<T>>>,
}
impl<'a, T: Timestamp, D, P: Push<Bundle<T, D>>> OutputHandle<'a, T, D, P> {
pub fn session<'b, C: CapabilityTrait<T>>(&'b mut self, cap: &'b C) -> Session<'b, T, D, PushCounter<T, D, P>> where 'a: 'b {
assert!(cap.valid_for_output(&self.internal_buffer), "Attempted to open output session with invalid capability");
self.push_buffer.session(cap.time())
}
}
impl<'a, T: Timestamp, D, P: Push<Bundle<T, D>>> Drop for OutputHandle<'a, T, D, P> {
fn drop(&mut self) {
self.push_buffer.cease();
}
}