1use std::cell::RefCell;
4use std::collections::{HashMap, HashSet};
5use std::hash::Hash;
6use std::rc::Rc;
7use std::time::{Duration, Instant};
8
9use timely::communication::Allocate;
10use timely::dataflow::operators::capture::event::link::EventLink;
11use timely::dataflow::{ProbeHandle, Scope};
12use timely::logging::{BatchLogger, TimelyEvent};
13use timely::progress::Timestamp;
14use timely::worker::Worker;
15
16use differential_dataflow::collection::Collection;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::logging::DifferentialEvent;
19
20use crate::domain::Domain;
21use crate::logging::DeclarativeEvent;
22use crate::plan::ImplContext;
23use crate::sinks::Sink;
24use crate::sources::{Source, Sourceable, SourcingContext};
25use crate::Rule;
26use crate::{implement, implement_neu, AttributeConfig, RelationHandle, ShutdownHandle};
27use crate::{Aid, Error, Rewind, Time, TxData, Value};
28use crate::{TraceKeyHandle, TraceValHandle};
29
30pub mod scheduler;
31use self::scheduler::Scheduler;
32
33#[derive(Clone, Debug, Serialize, Deserialize)]
35pub struct Configuration {
36 pub tick: Option<Duration>,
38 pub manual_advance: bool,
40 pub enable_logging: bool,
42 pub enable_optimizer: bool,
44}
45
46impl Default for Configuration {
47 fn default() -> Self {
48 Configuration {
49 tick: None,
50 manual_advance: false,
51 enable_logging: false,
52 enable_optimizer: false,
53 }
54 }
55}
56
57#[cfg(feature = "getopts")]
58impl Configuration {
59 pub fn options() -> getopts::Options {
62 let mut opts = getopts::Options::new();
63
64 opts.optopt(
65 "",
66 "tick",
67 "advance domain at a regular interval",
68 "SECONDS",
69 );
70 opts.optflag(
71 "",
72 "manual-advance",
73 "forces clients to call AdvanceDomain explicitely",
74 );
75 opts.optflag("", "enable-logging", "enable log event sources");
76 opts.optflag("", "enable-optimizer", "enable WCO queries");
77 opts.optflag("", "enable-meta", "enable queries on the query graph");
78
79 opts
80 }
81
82 pub fn from_args<I: Iterator<Item = String>>(args: I) -> Result<Self, String> {
84 let default: Self = Default::default();
85 let opts = Self::options();
86
87 let matches = opts.parse(args)?;
88
89 let tick: Option<Duration> = matches
90 .opt_str("tick")
91 .map(|x| Duration::from_secs(x.parse().expect("failed to parse tick duration")));
92
93 Self {
94 tick,
95 manual_advance: matches.opt_present("manual-advance"),
96 enable_logging: matches.opt_present("enable-logging"),
97 enable_optimizer: matches.opt_present("enable-optimizer"),
98 }
99 }
100}
101
102pub type TxId = u64;
104
105#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
108pub struct Interest {
109 pub name: String,
111 pub granularity: Option<Time>,
113 pub sink: Option<Sink>,
115 pub disable_logging: Option<bool>,
117}
118
119impl std::convert::From<&Interest> for crate::sinks::SinkingContext {
120 fn from(interest: &Interest) -> Self {
121 Self {
122 name: interest.name.clone(),
123 granularity: interest.granularity.clone(),
124 }
125 }
126}
127
128#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
131pub struct Register {
132 pub rules: Vec<Rule>,
134 pub publish: Vec<String>,
136}
137
138#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
141pub struct CreateAttribute {
142 pub name: String,
145 pub config: AttributeConfig,
147}
148
149#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
151pub enum Request {
152 Transact(Vec<TxData>),
154 Interest(Interest),
156 Uninterest(String),
160 Register(Register),
162 RegisterSource(Source),
165 CreateAttribute(CreateAttribute),
167 AdvanceDomain(Option<String>, Time),
169 Tick,
173 CloseInput(String),
175 Disconnect,
177 Setup,
180 Status,
182 Shutdown,
184}
185
186pub struct Server<T, Token>
189where
190 T: Timestamp + Lattice,
191 Token: Hash + Eq + Copy,
192{
193 pub config: Configuration,
195 pub t0: Instant,
198 pub context: Context<T>,
200 pub interests: HashMap<String, HashSet<Token>>,
202 shutdown_handles: HashMap<String, ShutdownHandle>,
204 pub probe: ProbeHandle<T>,
206 pub scheduler: Rc<RefCell<Scheduler>>,
208 timely_events: Option<Rc<EventLink<Duration, (Duration, usize, TimelyEvent)>>>,
210 differential_events: Option<Rc<EventLink<Duration, (Duration, usize, DifferentialEvent)>>>,
212}
213
214pub struct Context<T>
216where
217 T: Timestamp + Lattice,
218{
219 pub rules: HashMap<Aid, Rule>,
221 pub underconstrained: HashSet<Aid>,
223 pub internal: Domain<T>,
225}
226
227impl<T> ImplContext<T> for Context<T>
228where
229 T: Timestamp + Lattice,
230{
231 fn rule(&self, name: &str) -> Option<&Rule> {
232 self.rules.get(name)
233 }
234
235 fn global_arrangement(&mut self, name: &str) -> Option<&mut RelationHandle<T>> {
236 self.internal.arrangements.get_mut(name)
237 }
238
239 fn has_attribute(&self, name: &str) -> bool {
240 self.internal.attributes.contains_key(name)
241 }
242
243 fn forward_count(&mut self, name: &str) -> Option<&mut TraceKeyHandle<Value, T, isize>> {
244 self.internal.forward_count.get_mut(name)
245 }
246
247 fn forward_propose(
248 &mut self,
249 name: &str,
250 ) -> Option<&mut TraceValHandle<Value, Value, T, isize>> {
251 self.internal.forward_propose.get_mut(name)
252 }
253
254 fn forward_validate(
255 &mut self,
256 name: &str,
257 ) -> Option<&mut TraceKeyHandle<(Value, Value), T, isize>> {
258 self.internal.forward_validate.get_mut(name)
259 }
260
261 fn reverse_count(&mut self, name: &str) -> Option<&mut TraceKeyHandle<Value, T, isize>> {
262 self.internal.reverse_count.get_mut(name)
263 }
264
265 fn reverse_propose(
266 &mut self,
267 name: &str,
268 ) -> Option<&mut TraceValHandle<Value, Value, T, isize>> {
269 self.internal.reverse_propose.get_mut(name)
270 }
271
272 fn reverse_validate(
273 &mut self,
274 name: &str,
275 ) -> Option<&mut TraceKeyHandle<(Value, Value), T, isize>> {
276 self.internal.reverse_validate.get_mut(name)
277 }
278
279 fn is_underconstrained(&self, _name: &str) -> bool {
280 true
282 }
283}
284
285impl<T, Token> Server<T, Token>
286where
287 T: Timestamp + Lattice + Default + Rewind,
288 Token: Hash + Eq + Copy,
289{
290 pub fn new(config: Configuration) -> Self {
292 Server::new_at(config, Instant::now())
293 }
294
295 pub fn new_at(config: Configuration, t0: Instant) -> Self {
299 let timely_events = Some(Rc::new(EventLink::new()));
300 let differential_events = Some(Rc::new(EventLink::new()));
301
302 Server {
303 config,
304 t0,
305 context: Context {
306 rules: HashMap::new(),
307 internal: Domain::new(Default::default()),
308 underconstrained: HashSet::new(),
309 },
310 interests: HashMap::new(),
311 shutdown_handles: HashMap::new(),
312 probe: ProbeHandle::new(),
313 scheduler: Rc::new(RefCell::new(Scheduler::new())),
314 timely_events,
315 differential_events,
316 }
317 }
318
319 pub fn builtins() -> Vec<Request> {
321 vec![
322 ]
335 }
336
337 fn shutdown_query(&mut self, name: &str) {
340 info!("Shutting down {}", name);
341 self.shutdown_handles.remove(name);
342 }
343
344 pub fn transact(
346 &mut self,
347 tx_data: Vec<TxData>,
348 owner: usize,
349 worker_index: usize,
350 ) -> Result<(), Error> {
351 if owner == worker_index {
353 self.context.internal.transact(tx_data)
354 } else {
355 Ok(())
356 }
357 }
358
359 pub fn interest<S: Scope<Timestamp = T>>(
361 &mut self,
362 name: &str,
363 scope: &mut S,
364 ) -> Result<Collection<S, Vec<Value>, isize>, Error> {
365 if self.context.internal.arrangements.contains_key(name) {
368 let relation = self
370 .context
371 .global_arrangement(name)
372 .unwrap()
373 .import_named(scope, name)
374 .as_collection(|tuple, _| tuple.clone());
375
376 Ok(relation)
377 } else {
378 let (mut rel_map, shutdown_handle) = if self.config.enable_optimizer {
379 implement_neu(name, scope, &mut self.context)?
380 } else {
381 implement(name, scope, &mut self.context)?
382 };
383
384 match rel_map.remove(name) {
391 None => Err(Error::fault(format!(
392 "Relation of interest ({}) wasn't actually implemented.",
393 name
394 ))),
395 Some(relation) => {
396 self.shutdown_handles
397 .insert(name.to_string(), shutdown_handle);
398
399 Ok(relation)
400 }
401 }
402 }
403 }
404
405 pub fn register(&mut self, req: Register) -> Result<(), Error> {
407 let Register { rules, .. } = req;
408
409 for rule in rules.into_iter() {
410 if self.context.rules.contains_key(&rule.name) {
411 continue;
414 } else {
415 self.context.rules.insert(rule.name.to_string(), rule);
426 }
427 }
428
429 Ok(())
430 }
431
432 pub fn register_source<S: Scope<Timestamp = T>>(
434 &mut self,
435 source: Box<dyn Sourceable<S>>,
436 scope: &mut S,
437 ) -> Result<(), Error> {
438 let context = SourcingContext {
444 t0: self.t0,
445 scheduler: Rc::downgrade(&self.scheduler),
446 domain_probe: self.context.internal.domain_probe().clone(),
447 timely_events: self.timely_events.clone().unwrap(),
448 differential_events: self.differential_events.clone().unwrap(),
449 };
450
451 let mut attribute_streams = source.source(scope, context);
455
456 for (aid, config, datoms) in attribute_streams.drain(..) {
457 self.context
458 .internal
459 .create_sourced_attribute(&aid, config, &datoms)?;
460 }
461
462 Ok(())
479 }
480
481 pub fn advance_domain(&mut self, name: Option<String>, next: T) -> Result<(), Error> {
483 match name {
484 None => self.context.internal.advance_epoch(next),
485 Some(_) => Err(Error::unsupported("Named domains are not yet supported.")),
486 }
487 }
488
489 pub fn uninterest(&mut self, client: Token, name: &str) -> Result<(), Error> {
492 if let Some(entry) = self.interests.get_mut(name) {
495 entry.remove(&client);
496
497 if entry.is_empty() {
498 self.shutdown_query(name);
499 self.interests.remove(name);
500 }
501 }
502
503 Ok(())
504 }
505
506 pub fn disconnect_client(&mut self, client: Token) -> Result<(), Error> {
508 let names: Vec<String> = self.interests.keys().cloned().collect();
509
510 for query_name in names.iter() {
511 self.uninterest(client, query_name)?
512 }
513
514 Ok(())
515 }
516
517 pub fn is_any_outdated(&self) -> bool {
521 self.probe
522 .with_frontier(|out_frontier| self.context.internal.dominates(out_frontier))
523 }
524
525 pub fn test_single<S: Scope<Timestamp = T>>(
528 &mut self,
529 scope: &mut S,
530 rule: Rule,
531 ) -> Collection<S, Vec<Value>, isize> {
532 let interest_name = rule.name.clone();
533 let publish_name = rule.name.clone();
534
535 self.register(Register {
536 rules: vec![rule],
537 publish: vec![publish_name],
538 })
539 .unwrap();
540
541 match self.interest(&interest_name, scope) {
542 Err(error) => panic!("{:?}", error),
543 Ok(relation) => relation.probe_with(&mut self.probe),
544 }
545 }
546}
547
548impl<Token> Server<Duration, Token>
549where
550 Token: Hash + Eq + Copy,
551{
552 pub fn enable_logging<A: Allocate>(&self, worker: &mut Worker<A>) -> Result<(), Error> {
554 let mut timely_logger = BatchLogger::new(self.timely_events.clone().unwrap());
555 worker
556 .log_register()
557 .insert::<TimelyEvent, _>("timely", move |time, data| {
558 timely_logger.publish_batch(time, data)
559 });
560
561 let mut differential_logger = BatchLogger::new(self.differential_events.clone().unwrap());
562 worker
563 .log_register()
564 .insert::<DifferentialEvent, _>("differential/arrange", move |time, data| {
565 differential_logger.publish_batch(time, data)
566 });
567
568 Ok(())
569 }
570
571 pub fn shutdown_logging<A: Allocate>(&self, worker: &mut Worker<A>) -> Result<(), Error> {
573 worker
574 .log_register()
575 .insert::<TimelyEvent, _>("timely", move |_time, _data| {});
576
577 worker
578 .log_register()
579 .insert::<DifferentialEvent, _>("differential/arrange", move |_time, _data| {});
580
581 worker
582 .log_register()
583 .insert::<DeclarativeEvent, _>("declarative", move |_time, _data| {});
584
585 Ok(())
586 }
587}