use std::rc::Rc;
use std::cell::RefCell;
use crate::scheduling::Schedule;
use crate::progress::{Timestamp, ChangeBatch, Antichain};
/// Interface through which an operator describes itself to whatever hosts it.
///
/// An implementor reports its number of input and output ports, a slice of
/// [`FrontierInterest`] values, and — once consumed by
/// [`Operate::initialize`] — hands over its connectivity, a shared progress
/// record, and a schedulable object.
pub trait Operate<T: Timestamp> {
    /// Reports whether the operator is "local"; the provided default is `true`.
    ///
    /// NOTE(review): presumably "local" means the operator lives on a single
    /// worker, so its host must exchange progress information on its behalf —
    /// confirm against the code that consumes this flag.
    fn local(&self) -> bool {
        true
    }

    /// The number of inputs of the operator.
    fn inputs(&self) -> usize;

    /// The number of outputs of the operator.
    fn outputs(&self) -> usize;

    /// Consumes the boxed operator and returns its three runtime parts:
    ///
    /// 1. a [`Connectivity`] over `T::Summary` (presumably one
    ///    [`PortConnectivity`] per input port — TODO confirm),
    /// 2. a reference-counted, interior-mutable [`SharedProgress`] record,
    /// 3. a boxed [`Schedule`] implementation.
    fn initialize(
        self: Box<Self>,
    ) -> (
        Connectivity<T::Summary>,
        Rc<RefCell<SharedProgress<T>>>,
        Box<dyn Schedule>,
    );

    /// The operator's frontier-notification interests.
    ///
    /// NOTE(review): presumably the slice holds one entry per input port, in
    /// port order — confirm against implementors.
    fn notify_me(&self) -> &[FrontierInterest];
}
/// Levels of interest in frontier changes, as reported by `Operate::notify_me`.
///
/// The derived ordering follows declaration order:
/// `Never < IfCapability < Always`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum FrontierInterest {
    /// No interest in frontier changes.
    Never,
    /// Conditional interest.
    ///
    /// NOTE(review): presumably "interested only while the operator holds
    /// capabilities" — confirm against the scheduler that reads this value.
    IfCapability,
    /// Unconditional interest in frontier changes.
    Always,
}
/// A list of [`PortConnectivity`] values, as returned by `Operate::initialize`.
///
/// NOTE(review): presumably indexed by input port, with each entry describing
/// that input's path summaries to the outputs — confirm against callers.
pub type Connectivity<TS> = Vec<PortConnectivity<TS>>;
/// A map from port index to an [`Antichain`] of summaries of type `TS`.
///
/// Entries are kept in a `BTreeMap`, so iteration is in ascending port order.
/// `add_port` and the `FromIterator` impl never store empty antichains.
#[derive(
    serde::Serialize,
    serde::Deserialize,
    columnar::Columnar,
    Debug,
    Clone,
    Eq,
    PartialEq,
)]
pub struct PortConnectivity<TS> {
    /// Summaries keyed by port index.
    tree: std::collections::BTreeMap<usize, Antichain<TS>>,
}
impl<TS> Default for PortConnectivity<TS> {
    /// Builds a connectivity with no ports recorded.
    ///
    /// Implemented by hand so that no `TS: Default` bound is imposed.
    fn default() -> Self {
        let tree = std::collections::BTreeMap::new();
        Self { tree }
    }
}
impl<TS> PortConnectivity<TS> {
    /// Inserts `element` into the antichain stored for port `index`, creating
    /// an empty antichain for that port first if none exists.
    ///
    /// Returns whatever `Antichain::insert` reports (presumably `true` when
    /// the element was actually added — confirm against `Antichain`).
    pub fn insert(&mut self, index: usize, element: TS) -> bool
    where
        TS: crate::PartialOrder,
    {
        let summaries = self.tree.entry(index).or_default();
        summaries.insert(element)
    }

    /// Borrowing counterpart of [`PortConnectivity::insert`]; forwards to
    /// `Antichain::insert_ref` for port `index`.
    ///
    /// NOTE(review): presumably `element` is cloned only when it is actually
    /// inserted — confirm against `Antichain::insert_ref`.
    pub fn insert_ref(&mut self, index: usize, element: &TS) -> bool
    where
        TS: crate::PartialOrder + Clone,
    {
        let summaries = self.tree.entry(index).or_default();
        summaries.insert_ref(element)
    }

    /// Records `summary` as the antichain for `port`.
    ///
    /// An empty `summary` is not stored.
    ///
    /// # Panics
    ///
    /// Panics if `port` already has an entry, whether or not `summary` is
    /// empty. The map has already been modified by the time the panic fires.
    pub fn add_port(&mut self, port: usize, summary: Antichain<TS>) {
        if summary.is_empty() {
            // Nothing to store, but the port must still be unused.
            assert!(self.tree.remove(&port).is_none());
        } else {
            let prior = self.tree.insert(port, summary);
            assert!(prior.is_none());
        }
    }

    /// Iterates over `(port, summaries)` pairs in ascending port order.
    pub fn iter_ports(&self) -> impl Iterator<Item = (usize, &Antichain<TS>)> {
        self.tree.iter().map(|(port, summaries)| (*port, summaries))
    }

    /// Returns the antichain recorded for port `index`, or `None` if the port
    /// has no entry.
    pub fn get(&self, index: usize) -> Option<&Antichain<TS>> {
        let key = &index;
        self.tree.get(key)
    }
}
impl<TS> FromIterator<(usize, Antichain<TS>)> for PortConnectivity<TS> {
    /// Collects `(port, summaries)` pairs into a connectivity, discarding
    /// pairs whose antichain is empty.
    ///
    /// Unlike `add_port`, a repeated port does not panic here; duplicates
    /// collapse to a single entry under `BTreeMap`'s own collection rules.
    fn from_iter<T>(iter: T) -> Self
    where
        T: IntoIterator<Item = (usize, Antichain<TS>)>,
    {
        let tree: std::collections::BTreeMap<usize, Antichain<TS>> = iter
            .into_iter()
            .filter(|(_, summaries)| !summaries.is_empty())
            .collect();
        Self { tree }
    }
}
/// Four families of [`ChangeBatch`] buffers, handed out by
/// `Operate::initialize` behind an `Rc<RefCell<_>>`.
///
/// `SharedProgress::new` sizes `frontiers` and `consumeds` by the number of
/// inputs, and `internals` and `produceds` by the number of outputs.
///
/// NOTE(review): presumably the operator and its host use these buffers to
/// exchange progress information; which side writes which buffer is not
/// visible here — confirm against the scheduler.
#[derive(Debug)]
pub struct SharedProgress<T: Timestamp> {
    /// One batch per input (presumably frontier changes reported to the
    /// operator — TODO confirm).
    pub frontiers: Vec<ChangeBatch<T>>,
    /// One batch per input (presumably counts of messages the operator
    /// consumed — TODO confirm).
    pub consumeds: Vec<ChangeBatch<T>>,
    /// One batch per output (presumably changes to capabilities the operator
    /// holds internally — TODO confirm).
    pub internals: Vec<ChangeBatch<T>>,
    /// One batch per output (presumably counts of messages the operator
    /// produced — TODO confirm).
    pub produceds: Vec<ChangeBatch<T>>,
}
impl<T: Timestamp> SharedProgress<T> {
pub fn new(inputs: usize, outputs: usize) -> Self {
SharedProgress {
frontiers: vec![ChangeBatch::new(); inputs],
consumeds: vec![ChangeBatch::new(); inputs],
internals: vec![ChangeBatch::new(); outputs],
produceds: vec![ChangeBatch::new(); outputs],
}
}
}