use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::ops::Sub;
use std::rc::Rc;
use std::time::Instant;
use timely::dataflow::{ProbeHandle, Scope};
use timely::order::TotalOrder;
use timely::progress::Timestamp;
use differential_dataflow::collection::Collection;
use differential_dataflow::lattice::Lattice;
use crate::domain::Domain;
use crate::plan::{ImplContext, Implementable};
use crate::sinks::Sink;
use crate::sources::{Source, Sourceable};
use crate::Rule;
use crate::{
implement, implement_neu, AttributeConfig, CollectionIndex, RelationHandle, ShutdownHandle,
};
use crate::{Aid, Error, Time, TxData, Value};
pub mod scheduler;
use self::scheduler::Scheduler;
/// Server configuration.
///
/// `Config::default()` yields port `6262` with every flag switched off.
#[derive(Clone, Debug)]
pub struct Config {
    /// Port number; defaults to `6262`.
    /// NOTE(review): presumably the port the server listens on — confirm
    /// against the code that binds the socket.
    pub port: u16,
    /// NOTE(review): presumably requires clients to advance the domain
    /// explicitly — not consulted in this section of the file; confirm.
    pub manual_advance: bool,
    /// NOTE(review): presumably enables input via a command line interface —
    /// not consulted in this section of the file; confirm.
    pub enable_cli: bool,
    /// When set, `Server::interest` implements relations via `implement_neu`
    /// rather than `implement`.
    pub enable_optimizer: bool,
    /// When set, `Server::register` additionally transacts the data produced
    /// by each newly registered rule's plan.
    pub enable_meta: bool,
}

impl Default for Config {
    /// Builds the baseline configuration: port `6262`, all flags `false`.
    fn default() -> Self {
        Self {
            port: 6262,
            manual_advance: false,
            enable_cli: false,
            enable_optimizer: false,
            enable_meta: false,
        }
    }
}
/// Identifier type for transactions (going by its name); a plain alias for `u64`.
pub type TxId = u64;
/// Payload carried by `Request::Interest`.
///
/// Only the shape of the request is declared here; the code that interprets
/// these fields lives outside this section, so the field notes below are
/// assumptions to be confirmed against the request dispatcher.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct Interest {
/// Name of the relation of interest — presumably the name handed to
/// `Server::interest`; TODO confirm.
pub name: String,
/// Optional tenant identifier. NOTE(review): semantics are not visible here.
pub tenant: Option<usize>,
/// Optional granularity. NOTE(review): unit and meaning of `None` are not
/// visible here — confirm before relying on it.
pub granularity: Option<u64>,
/// Optional sink configuration. NOTE(review): what happens to results when
/// this is `None` is decided outside this section.
pub sink: Option<Sink>,
}
/// Payload carried by `Request::Register`: a batch of rules to make known to
/// the server.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct Register {
/// Rules to register. `Server::register` stores each rule under its name
/// and skips rules whose name is already taken.
pub rules: Vec<Rule>,
/// Names to publish. NOTE(review): `Server::register` only reads `rules`;
/// this field is not consulted anywhere in this section of the file.
pub publish: Vec<String>,
}
/// Payload carried by `Request::CreateAttribute`.
///
/// NOTE(review): the code that acts on this request is outside this section;
/// the field notes are inferred from names and types — confirm.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct CreateAttribute {
/// Name of the attribute (presumably the one to be created).
pub name: String,
/// Configuration for the attribute.
pub config: AttributeConfig,
}
/// Requests that can be sent to the server.
///
/// Dispatch of these variants happens outside this section. The per-variant
/// notes name the `Server` method that appears to serve each request, judged
/// by name and payload type — confirm against the dispatcher.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum Request {
/// Carries transaction data; presumably served by `Server::transact`.
Transact(Vec<TxData>),
/// Carries an `Interest`; presumably served by `Server::interest`.
Interest(Interest),
/// Carries a name; presumably served by `Server::uninterest`.
Uninterest(String),
/// Carries a `Register` batch; presumably served by `Server::register`.
Register(Register),
/// Carries a `Source`; presumably served by `Server::register_source`.
RegisterSource(Source),
/// Carries a `CreateAttribute` payload. NOTE(review): handler not visible here.
CreateAttribute(CreateAttribute),
/// Carries an optional domain name and a time; presumably served by
/// `Server::advance_domain`, which rejects any `Some(..)` name.
AdvanceDomain(Option<String>, Time),
/// Carries a name. NOTE(review): handler not visible here.
CloseInput(String),
/// No payload; presumably served by `Server::disconnect_client`.
Disconnect,
/// No payload. NOTE(review): handler not visible here.
Shutdown,
}
/// Server state: configuration, the rule/domain context, and bookkeeping
/// about which clients are interested in which named relations.
///
/// `T` is the timestamp type of the dataflow; `Token` identifies a client.
pub struct Server<T, Token>
where
T: Timestamp + Lattice + TotalOrder,
Token: Hash + Eq + Copy,
{
/// Server configuration; `enable_optimizer` and `enable_meta` are read by
/// `interest` and `register` respectively.
pub config: Config,
/// Reference instant fixed at construction (`Instant::now()` in `new`);
/// handed to sources in `register_source`.
pub t0: Instant,
/// Registered rules plus the internal domain.
pub context: Context<T>,
/// Maps a name to the set of client tokens interested in it. `uninterest`
/// removes clients from it and drops a name once its set becomes empty.
pub interests: HashMap<String, HashSet<Token>>,
/// Shared map from client token to a `u64`; `disconnect_client` removes the
/// disconnecting client's entry. NOTE(review): presumably the `u64` is the
/// owning tenant/worker — confirm.
pub tenant_owner: Rc<RefCell<HashMap<Token, u64>>>,
/// Shutdown handles keyed by relation name; inserted by `interest` when it
/// implements a relation, removed (and thereby dropped) by `shutdown_query`.
shutdown_handles: HashMap<String, ShutdownHandle>,
/// Probe consulted by `is_any_outdated` and attached to the relation built
/// by `test_single`.
pub probe: ProbeHandle<T>,
/// Shared scheduler; sources receive a weak reference to it in
/// `register_source`.
pub scheduler: Rc<RefCell<Scheduler>>,
}
/// Implementation context: the registered rules and the internal domain that
/// plans are implemented against (see the `ImplContext` impl for this type).
pub struct Context<T>
where
T: Timestamp + Lattice + TotalOrder,
{
/// Registered rules, keyed by rule name.
pub rules: HashMap<Aid, Rule>,
/// Set of names considered underconstrained. NOTE(review): nothing in this
/// section of the file reads or writes it after construction, and
/// `is_underconstrained` does not consult it.
pub underconstrained: HashSet<Aid>,
/// The internal domain; provides the arrangements and the forward/reverse
/// indices exposed through `ImplContext`, and receives transactions.
pub internal: Domain<T>,
}
/// Exposes the server's rules and the internal domain's arrangements and
/// indices to plan implementations.
impl<T> ImplContext<T> for Context<T>
where
    T: Timestamp + Lattice + TotalOrder,
{
    /// Returns the rule registered under `name`, if any.
    fn rule(&self, name: &str) -> Option<&Rule> {
        let registered = &self.rules;
        registered.get(name)
    }

    /// Returns the internal domain's arrangement stored under `name`, if any.
    fn global_arrangement(&mut self, name: &str) -> Option<&mut RelationHandle<T>> {
        let domain = &mut self.internal;
        domain.arrangements.get_mut(name)
    }

    /// Reports whether the internal domain has a forward index for `name`.
    fn has_attribute(&self, name: &str) -> bool {
        let forward = &self.internal.forward;
        forward.contains_key(name)
    }

    /// Returns the forward index stored under `name`, if any.
    fn forward_index(&mut self, name: &str) -> Option<&mut CollectionIndex<Value, Value, T>> {
        let domain = &mut self.internal;
        domain.forward.get_mut(name)
    }

    /// Returns the reverse index stored under `name`, if any.
    fn reverse_index(&mut self, name: &str) -> Option<&mut CollectionIndex<Value, Value, T>> {
        let domain = &mut self.internal;
        domain.reverse.get_mut(name)
    }

    /// Always answers `true`, regardless of `_name`.
    ///
    /// NOTE(review): the `underconstrained` set on `Context` is not consulted
    /// here; the underscore-prefixed parameter suggests this is deliberate —
    /// confirm before wiring the set in.
    fn is_underconstrained(&self, _name: &str) -> bool {
        true
    }
}
impl<T, Token> Server<T, Token>
where
T: Timestamp + Lattice + TotalOrder + Default + Sub<Output = T> + std::convert::From<Time>,
Token: Hash + Eq + Copy,
{
pub fn new(config: Config) -> Self {
Server::new_at(config, Instant::now())
}
pub fn new_at(config: Config, t0: Instant) -> Self {
Server {
config,
t0,
context: Context {
rules: HashMap::new(),
internal: Domain::new(Default::default()),
underconstrained: HashSet::new(),
},
interests: HashMap::new(),
tenant_owner: Rc::new(RefCell::new(HashMap::new())),
shutdown_handles: HashMap::new(),
probe: ProbeHandle::new(),
scheduler: Rc::new(RefCell::new(Scheduler::new())),
}
}
/// Returns the requests for built-in definitions; currently an empty list.
pub fn builtins() -> Vec<Request> {
    Vec::new()
}
/// Logs the shutdown of `name` and discards its shutdown handle, if one is
/// stored.
///
/// NOTE(review): presumably dropping a `ShutdownHandle` is what tears the
/// dataflow down — confirm against `ShutdownHandle`.
fn shutdown_query(&mut self, name: &str) {
    info!("Shutting down {}", name);

    let handle = self.shutdown_handles.remove(name);
    drop(handle);
}
/// Applies `tx_data` to the internal domain, but only when this worker is
/// the owner.
///
/// When `owner` differs from `worker_index` the data is discarded and
/// `Ok(())` is returned.
///
/// # Errors
///
/// Returns whatever error the internal domain's `transact` reports.
pub fn transact(
    &mut self,
    tx_data: Vec<TxData>,
    owner: usize,
    worker_index: usize,
) -> Result<(), Error> {
    if owner != worker_index {
        return Ok(());
    }

    self.context.internal.transact(tx_data)
}
pub fn interest<S: Scope<Timestamp = T>>(
&mut self,
name: &str,
scope: &mut S,
) -> Result<Collection<S, Vec<Value>, isize>, Error> {
if self.context.internal.arrangements.contains_key(name) {
let relation = self
.context
.global_arrangement(name)
.unwrap()
.import_named(scope, name)
.as_collection(|tuple, _| tuple.clone());
Ok(relation)
} else {
let (mut rel_map, shutdown_handle) = if self.config.enable_optimizer {
implement_neu(name, scope, &mut self.context)?
} else {
implement(name, scope, &mut self.context)?
};
match rel_map.remove(name) {
None => Err(Error {
category: "df.error.category/fault",
message: format!(
"Relation of interest ({}) wasn't actually implemented.",
name
),
}),
Some(relation) => {
self.shutdown_handles
.insert(name.to_string(), shutdown_handle);
Ok(relation)
}
}
}
}
/// Registers the rules contained in `req`.
///
/// Rules whose name is already registered are skipped. When
/// `config.enable_meta` is set, the data produced by the rule plan's
/// `datafy` is wrapped as `TxData(1, e, a, v)` and transacted (with owner
/// and worker index both `0`, so the transaction is always applied) before
/// the rule is stored. Only `req.rules` is read.
///
/// # Errors
///
/// Returns the first error reported by `transact`. Rules stored before the
/// failure remain registered; the failing rule is not stored.
pub fn register(&mut self, req: Register) -> Result<(), Error> {
    for rule in req.rules {
        if self.context.rules.contains_key(&rule.name) {
            continue;
        }

        if self.config.enable_meta {
            let tx_data: Vec<TxData> = rule
                .plan
                .datafy()
                .drain(..)
                .map(|(e, a, v)| TxData(1, e, a, v))
                .collect();
            self.transact(tx_data, 0, 0)?;
        }

        self.context.rules.insert(rule.name.to_string(), rule);
    }

    Ok(())
}
pub fn advance_domain(&mut self, name: Option<String>, next: T) -> Result<(), Error> {
match name {
None => self.context.internal.advance_to(next),
Some(_) => Err(Error {
category: "df.error.category/unsupported",
message: "Named domains are not yet supported.".to_string(),
}),
}
}
/// Withdraws `client`'s interest in `name`.
///
/// Does nothing when `name` has no entry in `interests`. When the entry's
/// client set is empty after the removal, the query is shut down and the
/// entry is deleted.
pub fn uninterest(&mut self, client: Token, name: &str) {
    let no_clients_left = match self.interests.get_mut(name) {
        None => return,
        Some(clients) => {
            clients.remove(&client);
            clients.is_empty()
        }
    };

    if no_clients_left {
        self.shutdown_query(name);
        self.interests.remove(name);
    }
}
/// Withdraws `client` from every known interest and removes its entry from
/// `tenant_owner`.
pub fn disconnect_client(&mut self, client: Token) {
    // Snapshot the names first: `uninterest` may remove entries from
    // `interests`, which rules out iterating the map directly.
    let query_names: Vec<String> = self.interests.keys().cloned().collect();
    for query_name in query_names {
        self.uninterest(client, &query_name);
    }

    let mut owners = self.tenant_owner.borrow_mut();
    owners.remove(&client);
}
/// Returns `true` when the server's probe is `less_than` the internal
/// domain's current time.
///
/// NOTE(review): presumably this means some probed dataflow has not yet
/// caught up with the domain — confirm against `ProbeHandle::less_than`.
pub fn is_any_outdated(&self) -> bool {
    let domain_time = self.context.internal.time();
    self.probe.less_than(domain_time)
}
/// Helper for tests: registers a single `rule`, expresses interest in it
/// within `scope`, and returns the resulting relation with the server's
/// probe attached.
///
/// # Panics
///
/// Panics when registering the rule fails, or when `interest` returns an
/// error (the error is printed with its `Debug` representation).
pub fn test_single<S: Scope<Timestamp = T>>(
    &mut self,
    scope: &mut S,
    rule: Rule,
) -> Collection<S, Vec<Value>, isize> {
    let name = rule.name.clone();

    let registration = Register {
        rules: vec![rule],
        publish: vec![name.clone()],
    };
    self.register(registration).unwrap();

    let relation = self
        .interest(&name, scope)
        .unwrap_or_else(|error| panic!("{:?}", error));

    relation.probe_with(&mut self.probe)
}
}
impl<Token> Server<u64, Token>
where
    Token: Hash + Eq + Copy,
{
    /// Registers `source` within `scope` for servers running on `u64`
    /// timestamps.
    ///
    /// The source is given the server's `t0` and a weak reference to the
    /// scheduler; every `(attribute, stream)` pair it yields is installed in
    /// the internal domain via `create_source`.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by `create_source`; sources created
    /// before the failure stay in place.
    pub fn register_source<S: Scope<Timestamp = u64>>(
        &mut self,
        source: Source,
        scope: &mut S,
    ) -> Result<(), Error> {
        let scheduler = Rc::downgrade(&self.scheduler);
        let mut streams = source.source(scope, self.t0, scheduler);

        let domain = &mut self.context.internal;
        for (aid, datoms) in streams.drain(..) {
            domain.create_source(&aid, &datoms)?;
        }

        Ok(())
    }
}
#[cfg(feature = "real-time")]
impl<Token> Server<std::time::Duration, Token>
where
    Token: Hash + Eq + Copy,
{
    /// Registers `source` within `scope` for servers running on
    /// `std::time::Duration` timestamps (only built with the `real-time`
    /// feature).
    ///
    /// The source is given the server's `t0` and a weak reference to the
    /// scheduler; every `(attribute, stream)` pair it yields is installed in
    /// the internal domain via `create_source`.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by `create_source`; sources created
    /// before the failure stay in place.
    pub fn register_source<S: Scope<Timestamp = std::time::Duration>>(
        &mut self,
        source: Source,
        scope: &mut S,
    ) -> Result<(), Error> {
        let scheduler = Rc::downgrade(&self.scheduler);
        let mut streams = source.source(scope, self.t0, scheduler);

        let domain = &mut self.context.internal;
        for (aid, datoms) in streams.drain(..) {
            domain.create_source(&aid, &datoms)?;
        }

        Ok(())
    }
}