use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::rc::Rc;
use std::time::{Duration, Instant};
use timely::communication::Allocate;
use timely::dataflow::operators::capture::event::link::EventLink;
use timely::dataflow::{ProbeHandle, Scope};
use timely::logging::{BatchLogger, TimelyEvent};
use timely::progress::Timestamp;
use timely::worker::Worker;
use differential_dataflow::collection::Collection;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::logging::DifferentialEvent;
use crate::domain::Domain;
use crate::logging::DeclarativeEvent;
use crate::plan::ImplContext;
use crate::sinks::Sink;
use crate::sources::{Source, Sourceable, SourcingContext};
use crate::Rule;
use crate::{implement, implement_neu, AttributeConfig, RelationHandle, ShutdownHandle};
use crate::{Aid, Error, Rewind, Time, TxData, Value};
use crate::{TraceKeyHandle, TraceValHandle};
pub mod scheduler;
use self::scheduler::Scheduler;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Configuration {
pub tick: Option<Duration>,
pub manual_advance: bool,
pub enable_logging: bool,
pub enable_optimizer: bool,
}
impl Default for Configuration {
fn default() -> Self {
Configuration {
tick: None,
manual_advance: false,
enable_logging: false,
enable_optimizer: false,
}
}
}
#[cfg(feature = "getopts")]
impl Configuration {
pub fn options() -> getopts::Options {
let mut opts = getopts::Options::new();
opts.optopt(
"",
"tick",
"advance domain at a regular interval",
"SECONDS",
);
opts.optflag(
"",
"manual-advance",
"forces clients to call AdvanceDomain explicitely",
);
opts.optflag("", "enable-logging", "enable log event sources");
opts.optflag("", "enable-optimizer", "enable WCO queries");
opts.optflag("", "enable-meta", "enable queries on the query graph");
opts
}
pub fn from_args<I: Iterator<Item = String>>(args: I) -> Result<Self, String> {
let default: Self = Default::default();
let opts = Self::options();
let matches = opts.parse(args)?;
let tick: Option<Duration> = matches
.opt_str("tick")
.map(|x| Duration::from_secs(x.parse().expect("failed to parse tick duration")));
Self {
tick,
manual_advance: matches.opt_present("manual-advance"),
enable_logging: matches.opt_present("enable-logging"),
enable_optimizer: matches.opt_present("enable-optimizer"),
}
}
}
pub type TxId = u64;
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct Interest {
pub name: String,
pub granularity: Option<Time>,
pub sink: Option<Sink>,
pub disable_logging: Option<bool>,
}
impl std::convert::From<&Interest> for crate::sinks::SinkingContext {
fn from(interest: &Interest) -> Self {
Self {
name: interest.name.clone(),
granularity: interest.granularity.clone(),
}
}
}
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct Register {
pub rules: Vec<Rule>,
pub publish: Vec<String>,
}
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct CreateAttribute {
pub name: String,
pub config: AttributeConfig,
}
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum Request {
Transact(Vec<TxData>),
Interest(Interest),
Uninterest(String),
Register(Register),
RegisterSource(Source),
CreateAttribute(CreateAttribute),
AdvanceDomain(Option<String>, Time),
Tick,
CloseInput(String),
Disconnect,
Setup,
Status,
Shutdown,
}
pub struct Server<T, Token>
where
T: Timestamp + Lattice,
Token: Hash + Eq + Copy,
{
pub config: Configuration,
pub t0: Instant,
pub context: Context<T>,
pub interests: HashMap<String, HashSet<Token>>,
shutdown_handles: HashMap<String, ShutdownHandle>,
pub probe: ProbeHandle<T>,
pub scheduler: Rc<RefCell<Scheduler>>,
timely_events: Option<Rc<EventLink<Duration, (Duration, usize, TimelyEvent)>>>,
differential_events: Option<Rc<EventLink<Duration, (Duration, usize, DifferentialEvent)>>>,
}
pub struct Context<T>
where
T: Timestamp + Lattice,
{
pub rules: HashMap<Aid, Rule>,
pub underconstrained: HashSet<Aid>,
pub internal: Domain<T>,
}
impl<T> ImplContext<T> for Context<T>
where
T: Timestamp + Lattice,
{
fn rule(&self, name: &str) -> Option<&Rule> {
self.rules.get(name)
}
fn global_arrangement(&mut self, name: &str) -> Option<&mut RelationHandle<T>> {
self.internal.arrangements.get_mut(name)
}
fn has_attribute(&self, name: &str) -> bool {
self.internal.attributes.contains_key(name)
}
fn forward_count(&mut self, name: &str) -> Option<&mut TraceKeyHandle<Value, T, isize>> {
self.internal.forward_count.get_mut(name)
}
fn forward_propose(
&mut self,
name: &str,
) -> Option<&mut TraceValHandle<Value, Value, T, isize>> {
self.internal.forward_propose.get_mut(name)
}
fn forward_validate(
&mut self,
name: &str,
) -> Option<&mut TraceKeyHandle<(Value, Value), T, isize>> {
self.internal.forward_validate.get_mut(name)
}
fn reverse_count(&mut self, name: &str) -> Option<&mut TraceKeyHandle<Value, T, isize>> {
self.internal.reverse_count.get_mut(name)
}
fn reverse_propose(
&mut self,
name: &str,
) -> Option<&mut TraceValHandle<Value, Value, T, isize>> {
self.internal.reverse_propose.get_mut(name)
}
fn reverse_validate(
&mut self,
name: &str,
) -> Option<&mut TraceKeyHandle<(Value, Value), T, isize>> {
self.internal.reverse_validate.get_mut(name)
}
fn is_underconstrained(&self, _name: &str) -> bool {
true
}
}
impl<T, Token> Server<T, Token>
where
T: Timestamp + Lattice + Default + Rewind,
Token: Hash + Eq + Copy,
{
pub fn new(config: Configuration) -> Self {
Server::new_at(config, Instant::now())
}
pub fn new_at(config: Configuration, t0: Instant) -> Self {
let timely_events = Some(Rc::new(EventLink::new()));
let differential_events = Some(Rc::new(EventLink::new()));
Server {
config,
t0,
context: Context {
rules: HashMap::new(),
internal: Domain::new(Default::default()),
underconstrained: HashSet::new(),
},
interests: HashMap::new(),
shutdown_handles: HashMap::new(),
probe: ProbeHandle::new(),
scheduler: Rc::new(RefCell::new(Scheduler::new())),
timely_events,
differential_events,
}
}
pub fn builtins() -> Vec<Request> {
vec![
]
}
fn shutdown_query(&mut self, name: &str) {
info!("Shutting down {}", name);
self.shutdown_handles.remove(name);
}
pub fn transact(
&mut self,
tx_data: Vec<TxData>,
owner: usize,
worker_index: usize,
) -> Result<(), Error> {
if owner == worker_index {
self.context.internal.transact(tx_data)
} else {
Ok(())
}
}
pub fn interest<S: Scope<Timestamp = T>>(
&mut self,
name: &str,
scope: &mut S,
) -> Result<Collection<S, Vec<Value>, isize>, Error> {
if self.context.internal.arrangements.contains_key(name) {
let relation = self
.context
.global_arrangement(name)
.unwrap()
.import_named(scope, name)
.as_collection(|tuple, _| tuple.clone());
Ok(relation)
} else {
let (mut rel_map, shutdown_handle) = if self.config.enable_optimizer {
implement_neu(name, scope, &mut self.context)?
} else {
implement(name, scope, &mut self.context)?
};
match rel_map.remove(name) {
None => Err(Error::fault(format!(
"Relation of interest ({}) wasn't actually implemented.",
name
))),
Some(relation) => {
self.shutdown_handles
.insert(name.to_string(), shutdown_handle);
Ok(relation)
}
}
}
}
pub fn register(&mut self, req: Register) -> Result<(), Error> {
let Register { rules, .. } = req;
for rule in rules.into_iter() {
if self.context.rules.contains_key(&rule.name) {
continue;
} else {
self.context.rules.insert(rule.name.to_string(), rule);
}
}
Ok(())
}
pub fn register_source<S: Scope<Timestamp = T>>(
&mut self,
source: Box<dyn Sourceable<S>>,
scope: &mut S,
) -> Result<(), Error> {
let context = SourcingContext {
t0: self.t0,
scheduler: Rc::downgrade(&self.scheduler),
domain_probe: self.context.internal.domain_probe().clone(),
timely_events: self.timely_events.clone().unwrap(),
differential_events: self.differential_events.clone().unwrap(),
};
let mut attribute_streams = source.source(scope, context);
for (aid, config, datoms) in attribute_streams.drain(..) {
self.context
.internal
.create_sourced_attribute(&aid, config, &datoms)?;
}
Ok(())
}
pub fn advance_domain(&mut self, name: Option<String>, next: T) -> Result<(), Error> {
match name {
None => self.context.internal.advance_epoch(next),
Some(_) => Err(Error::unsupported("Named domains are not yet supported.")),
}
}
pub fn uninterest(&mut self, client: Token, name: &str) -> Result<(), Error> {
if let Some(entry) = self.interests.get_mut(name) {
entry.remove(&client);
if entry.is_empty() {
self.shutdown_query(name);
self.interests.remove(name);
}
}
Ok(())
}
pub fn disconnect_client(&mut self, client: Token) -> Result<(), Error> {
let names: Vec<String> = self.interests.keys().cloned().collect();
for query_name in names.iter() {
self.uninterest(client, query_name)?
}
Ok(())
}
pub fn is_any_outdated(&self) -> bool {
self.probe
.with_frontier(|out_frontier| self.context.internal.dominates(out_frontier))
}
pub fn test_single<S: Scope<Timestamp = T>>(
&mut self,
scope: &mut S,
rule: Rule,
) -> Collection<S, Vec<Value>, isize> {
let interest_name = rule.name.clone();
let publish_name = rule.name.clone();
self.register(Register {
rules: vec![rule],
publish: vec![publish_name],
})
.unwrap();
match self.interest(&interest_name, scope) {
Err(error) => panic!("{:?}", error),
Ok(relation) => relation.probe_with(&mut self.probe),
}
}
}
impl<Token> Server<Duration, Token>
where
Token: Hash + Eq + Copy,
{
pub fn enable_logging<A: Allocate>(&self, worker: &mut Worker<A>) -> Result<(), Error> {
let mut timely_logger = BatchLogger::new(self.timely_events.clone().unwrap());
worker
.log_register()
.insert::<TimelyEvent, _>("timely", move |time, data| {
timely_logger.publish_batch(time, data)
});
let mut differential_logger = BatchLogger::new(self.differential_events.clone().unwrap());
worker
.log_register()
.insert::<DifferentialEvent, _>("differential/arrange", move |time, data| {
differential_logger.publish_batch(time, data)
});
Ok(())
}
pub fn shutdown_logging<A: Allocate>(&self, worker: &mut Worker<A>) -> Result<(), Error> {
worker
.log_register()
.insert::<TimelyEvent, _>("timely", move |_time, _data| {});
worker
.log_register()
.insert::<DifferentialEvent, _>("differential/arrange", move |_time, _data| {});
worker
.log_register()
.insert::<DeclarativeEvent, _>("declarative", move |_time, _data| {});
Ok(())
}
}