declarative_dataflow/server/mod.rs

//! Server logic for driving the library via commands.

use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::rc::Rc;
use std::time::{Duration, Instant};

use timely::communication::Allocate;
use timely::dataflow::operators::capture::event::link::EventLink;
use timely::dataflow::{ProbeHandle, Scope};
use timely::logging::{BatchLogger, TimelyEvent};
use timely::progress::Timestamp;
use timely::worker::Worker;

use differential_dataflow::collection::Collection;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::logging::DifferentialEvent;

use crate::domain::Domain;
use crate::logging::DeclarativeEvent;
use crate::plan::ImplContext;
use crate::sinks::Sink;
use crate::sources::{Source, Sourceable, SourcingContext};
use crate::Rule;
use crate::{implement, implement_neu, AttributeConfig, RelationHandle, ShutdownHandle};
use crate::{Aid, Error, Rewind, Time, TxData, Value};
use crate::{TraceKeyHandle, TraceValHandle};

pub mod scheduler;
use self::scheduler::Scheduler;

/// Server configuration.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Configuration {
    /// Automatic domain tick interval.
    pub tick: Option<Duration>,
    /// Do clients have to call AdvanceDomain explicitly?
    pub manual_advance: bool,
    /// Should logging streams be created?
    pub enable_logging: bool,
    /// Should queries use the optimizer during implementation?
    pub enable_optimizer: bool,
}

impl Default for Configuration {
    fn default() -> Self {
        Configuration {
            tick: None,
            manual_advance: false,
            enable_logging: false,
            enable_optimizer: false,
        }
    }
}

#[cfg(feature = "getopts")]
impl Configuration {
    /// Returns a `getopts::Options` struct describing all available
    /// configuration options.
    pub fn options() -> getopts::Options {
        let mut opts = getopts::Options::new();

        opts.optopt(
            "",
            "tick",
            "advance domain at a regular interval",
            "SECONDS",
        );
        opts.optflag(
            "",
            "manual-advance",
            "forces clients to call AdvanceDomain explicitly",
        );
        opts.optflag("", "enable-logging", "enable log event sources");
        opts.optflag("", "enable-optimizer", "enable WCO queries");
        opts.optflag("", "enable-meta", "enable queries on the query graph");

        opts
    }

    /// Parses configuration options from the provided arguments.
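    ///
    /// A minimal sketch of how this might be called (the argument
    /// values below are purely illustrative):
    ///
    /// ```ignore
    /// let args = vec![
    ///     "--tick".to_string(),
    ///     "5".to_string(),
    ///     "--enable-logging".to_string(),
    /// ];
    /// let config = Configuration::from_args(args.into_iter()).unwrap();
    /// assert_eq!(config.tick, Some(std::time::Duration::from_secs(5)));
    /// assert!(config.enable_logging);
    /// ```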
    pub fn from_args<I: Iterator<Item = String>>(args: I) -> Result<Self, String> {
        let opts = Self::options();

        let matches = opts.parse(args).map_err(|err| err.to_string())?;

        let tick: Option<Duration> = matches
            .opt_str("tick")
            .map(|x| Duration::from_secs(x.parse().expect("failed to parse tick duration")));

        Ok(Self {
            tick,
            manual_advance: matches.opt_present("manual-advance"),
            enable_logging: matches.opt_present("enable-logging"),
            enable_optimizer: matches.opt_present("enable-optimizer"),
        })
    }
}

/// Transaction ids.
pub type TxId = u64;

/// A request expressing interest in receiving results published under
/// the specified name.
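///
/// A minimal sketch of constructing such a request (the dataflow name
/// is hypothetical), usually wrapped in `Request::Interest`:
///
/// ```ignore
/// let interest = Interest {
///     name: "my-query".to_string(),
///     granularity: None,
///     sink: None,
///     disable_logging: None,
/// };
/// let request = Request::Interest(interest);
/// ```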
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct Interest {
    /// The name of a previously registered dataflow.
    pub name: String,
    /// Granularity at which to send results. None indicates no delay.
    pub granularity: Option<Time>,
    /// An optional sink configuration.
    pub sink: Option<Sink>,
    /// Whether to disable logging of events from this dataflow.
    pub disable_logging: Option<bool>,
}

impl std::convert::From<&Interest> for crate::sinks::SinkingContext {
    fn from(interest: &Interest) -> Self {
        Self {
            name: interest.name.clone(),
            granularity: interest.granularity.clone(),
        }
    }
}

/// A request with the intent of synthesising one or more new rules
/// and optionally publishing one or more of them.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct Register {
    /// A list of rules to synthesise in order.
    pub rules: Vec<Rule>,
    /// The names of rules that should be published.
    pub publish: Vec<String>,
}

/// A request with the intent of creating a new named, globally
/// available input that can be transacted upon.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct CreateAttribute {
    /// A globally unique name under which to publish data sent via
    /// this input.
    pub name: String,
    /// Semantics enforced on this attribute by 3DF.
    pub config: AttributeConfig,
}

/// Possible request types.
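///
/// A minimal sketch of issuing a transaction request. The `TxData`
/// layout assumed here (diff, entity, attribute, value, optional time)
/// follows its use elsewhere in this module:
///
/// ```ignore
/// let request = Request::Transact(vec![TxData(
///     1,
///     Value::Eid(100),
///     "person/name".to_string(),
///     Value::String("Alice".to_string()),
///     None,
/// )]);
/// ```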
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum Request {
    /// Sends inputs via one or more registered handles.
    Transact(Vec<TxData>),
    /// Expresses interest in a named relation.
    Interest(Interest),
    /// Expresses that the interest in a named relation has
    /// stopped. Once all interested clients have sent this, the
    /// dataflow can be cleaned up.
    Uninterest(String),
    /// Registers one or more named relations.
    Register(Register),
    /// A request with the intent of attaching to an external data
    /// source that publishes one or more attributes and relations.
    RegisterSource(Source),
    /// Creates a named input handle that can be `Transact`ed upon.
    CreateAttribute(CreateAttribute),
    /// Advances the specified domain to the specified time.
    AdvanceDomain(Option<String>, Time),
    /// Requests a domain advance to whatever epoch the server
    /// determines is *now*. Used by clients to enforce a minimum
    /// granularity of responses, if inputs happen only infrequently.
    Tick,
    /// Closes a named input handle.
    CloseInput(String),
    /// Client has disconnected.
    Disconnect,
    /// Requests any setup logic that needs to be executed
    /// deterministically across all workers.
    Setup,
    /// Requests a heartbeat containing status information.
    Status,
    /// Requests orderly shutdown of the system.
    Shutdown,
}

/// Server context maintaining globally registered arrangements and
/// input handles.
pub struct Server<T, Token>
where
    T: Timestamp + Lattice,
    Token: Hash + Eq + Copy,
{
    /// Server configuration.
    pub config: Configuration,
    /// A timer started at the initiation of the timely computation
    /// (copied from worker).
    pub t0: Instant,
    /// Implementation context.
    pub context: Context<T>,
    /// Mapping from query names to interested client tokens.
    pub interests: HashMap<String, HashSet<Token>>,
    // Mapping from query names to their shutdown handles.
    shutdown_handles: HashMap<String, ShutdownHandle>,
    /// Probe keeping track of overall dataflow progress.
    pub probe: ProbeHandle<T>,
    /// Scheduler managing deferred operator activations.
    pub scheduler: Rc<RefCell<Scheduler>>,
    // Link to replayable Timely logging events.
    timely_events: Option<Rc<EventLink<Duration, (Duration, usize, TimelyEvent)>>>,
    // Link to replayable Differential logging events.
    differential_events: Option<Rc<EventLink<Duration, (Duration, usize, DifferentialEvent)>>>,
}

/// Implementation context.
pub struct Context<T>
where
    T: Timestamp + Lattice,
{
    /// Representation of named rules.
    pub rules: HashMap<Aid, Rule>,
    /// Set of rules known to be underconstrained.
    pub underconstrained: HashSet<Aid>,
    /// Internal domain of command sequence numbers.
    pub internal: Domain<T>,
}

impl<T> ImplContext<T> for Context<T>
where
    T: Timestamp + Lattice,
{
    fn rule(&self, name: &str) -> Option<&Rule> {
        self.rules.get(name)
    }

    fn global_arrangement(&mut self, name: &str) -> Option<&mut RelationHandle<T>> {
        self.internal.arrangements.get_mut(name)
    }

    fn has_attribute(&self, name: &str) -> bool {
        self.internal.attributes.contains_key(name)
    }

    fn forward_count(&mut self, name: &str) -> Option<&mut TraceKeyHandle<Value, T, isize>> {
        self.internal.forward_count.get_mut(name)
    }

    fn forward_propose(
        &mut self,
        name: &str,
    ) -> Option<&mut TraceValHandle<Value, Value, T, isize>> {
        self.internal.forward_propose.get_mut(name)
    }

    fn forward_validate(
        &mut self,
        name: &str,
    ) -> Option<&mut TraceKeyHandle<(Value, Value), T, isize>> {
        self.internal.forward_validate.get_mut(name)
    }

    fn reverse_count(&mut self, name: &str) -> Option<&mut TraceKeyHandle<Value, T, isize>> {
        self.internal.reverse_count.get_mut(name)
    }

    fn reverse_propose(
        &mut self,
        name: &str,
    ) -> Option<&mut TraceValHandle<Value, Value, T, isize>> {
        self.internal.reverse_propose.get_mut(name)
    }

    fn reverse_validate(
        &mut self,
        name: &str,
    ) -> Option<&mut TraceKeyHandle<(Value, Value), T, isize>> {
        self.internal.reverse_validate.get_mut(name)
    }

    fn is_underconstrained(&self, _name: &str) -> bool {
        // self.underconstrained.contains(name)
        true
    }
}

impl<T, Token> Server<T, Token>
where
    T: Timestamp + Lattice + Default + Rewind,
    Token: Hash + Eq + Copy,
{
    /// Creates a new server state from a configuration.
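    ///
    /// A minimal sketch (the concrete timestamp and token types are
    /// illustrative; `Duration` timestamps and `usize` client tokens
    /// are used elsewhere in this module):
    ///
    /// ```ignore
    /// let server = Server::<Duration, usize>::new(Configuration::default());
    /// assert!(server.config.tick.is_none());
    /// ```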
    pub fn new(config: Configuration) -> Self {
        Server::new_at(config, Instant::now())
    }

    /// Creates a new server state from a configuration with an
    /// additionally specified beginning of the computation: an
    /// instant in relation to which all durations will be measured.
    pub fn new_at(config: Configuration, t0: Instant) -> Self {
        let timely_events = Some(Rc::new(EventLink::new()));
        let differential_events = Some(Rc::new(EventLink::new()));

        Server {
            config,
            t0,
            context: Context {
                rules: HashMap::new(),
                internal: Domain::new(Default::default()),
                underconstrained: HashSet::new(),
            },
            interests: HashMap::new(),
            shutdown_handles: HashMap::new(),
            probe: ProbeHandle::new(),
            scheduler: Rc::new(RefCell::new(Scheduler::new())),
            timely_events,
            differential_events,
        }
    }

    /// Returns commands to install built-in plans.
    pub fn builtins() -> Vec<Request> {
        vec![
            // Request::CreateAttribute(CreateAttribute {
            //     name: "df.pattern/e".to_string(),
            //     semantics: InputSemantics::Raw,
            // }),
            // Request::CreateAttribute(CreateAttribute {
            //     name: "df.pattern/a".to_string(),
            //     semantics: InputSemantics::Raw,
            // }),
            // Request::CreateAttribute(CreateAttribute {
            //     name: "df.pattern/v".to_string(),
            //     semantics: InputSemantics::Raw,
            // }),
        ]
    }

    /// Drops all shutdown handles associated with the specified
    /// query, resulting in its dataflow getting cleaned up.
    fn shutdown_query(&mut self, name: &str) {
        info!("Shutting down {}", name);
        self.shutdown_handles.remove(name);
    }

    /// Handle a Transact request.
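    ///
    /// A minimal sketch, with `tx_data` built as in the `Request::Transact`
    /// example above. Only the owning worker actually introduces the
    /// inputs; all other workers treat the call as a no-op:
    ///
    /// ```ignore
    /// server.transact(tx_data, 0, worker.index())?;
    /// ```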
    pub fn transact(
        &mut self,
        tx_data: Vec<TxData>,
        owner: usize,
        worker_index: usize,
    ) -> Result<(), Error> {
        // only the owner should actually introduce new inputs
        if owner == worker_index {
            self.context.internal.transact(tx_data)
        } else {
            Ok(())
        }
    }

    /// Handles an Interest request.
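    ///
    /// A minimal sketch of how this might be used inside a dataflow
    /// (the query name is hypothetical and assumed to have been
    /// registered beforehand):
    ///
    /// ```ignore
    /// worker.dataflow::<Duration, _, _>(|scope| {
    ///     let results = server
    ///         .interest("my-query", scope)
    ///         .expect("failed to implement query");
    ///     results.probe_with(&mut server.probe);
    /// });
    /// ```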
    pub fn interest<S: Scope<Timestamp = T>>(
        &mut self,
        name: &str,
        scope: &mut S,
    ) -> Result<Collection<S, Vec<Value>, isize>, Error> {
        // We need to do a `contains_key` here to avoid taking
        // a mut ref on context.
        if self.context.internal.arrangements.contains_key(name) {
            // Rule is already implemented.
            let relation = self
                .context
                .global_arrangement(name)
                .unwrap()
                .import_named(scope, name)
                .as_collection(|tuple, _| tuple.clone());

            Ok(relation)
        } else {
            let (mut rel_map, shutdown_handle) = if self.config.enable_optimizer {
                implement_neu(name, scope, &mut self.context)?
            } else {
                implement(name, scope, &mut self.context)?
            };

            // @TODO when do we actually want to register result traces for re-use?
            // for (name, relation) in rel_map.into_iter() {
            // let trace = relation.map(|t| (t, ())).arrange_named(name).trace;
            //     self.context.register_arrangement(name, config, trace);
            // }

            match rel_map.remove(name) {
                None => Err(Error::fault(format!(
                    "Relation of interest ({}) wasn't actually implemented.",
                    name
                ))),
                Some(relation) => {
                    self.shutdown_handles
                        .insert(name.to_string(), shutdown_handle);

                    Ok(relation)
                }
            }
        }
    }

    /// Handle a Register request.
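    ///
    /// A minimal sketch (the rule name is hypothetical and `plan` is
    /// assumed to be a query plan constructed elsewhere):
    ///
    /// ```ignore
    /// server.register(Register {
    ///     rules: vec![Rule { name: "my-query".to_string(), plan }],
    ///     publish: vec!["my-query".to_string()],
    /// })?;
    /// ```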
    pub fn register(&mut self, req: Register) -> Result<(), Error> {
        let Register { rules, .. } = req;

        for rule in rules.into_iter() {
            if self.context.rules.contains_key(&rule.name) {
                // @TODO panic if hashes don't match
                // panic!("Attempted to re-register a named relation");
                continue;
            } else {
                // if self.config.enable_meta {
                //     let mut data = rule.plan.datafy();
                //     let tx_data: Vec<TxData> = data
                //         .drain(..)
                //         .map(|(e, a, v)| TxData(1, Value::Eid(e), a, v, None))
                //         .collect();

                //     self.transact(tx_data, 0, 0)?;
                // }

                self.context.rules.insert(rule.name.to_string(), rule);
            }
        }

        Ok(())
    }

    /// Handle a RegisterSource request.
    pub fn register_source<S: Scope<Timestamp = T>>(
        &mut self,
        source: Box<dyn Sourceable<S>>,
        scope: &mut S,
    ) -> Result<(), Error> {
        // use timely::logging::Logger;
        // let timely_logger = scope.log_register().remove("timely");

        // let differential_logger = scope.log_register().remove("differential/arrange");

        let context = SourcingContext {
            t0: self.t0,
            scheduler: Rc::downgrade(&self.scheduler),
            domain_probe: self.context.internal.domain_probe().clone(),
            timely_events: self.timely_events.clone().unwrap(),
            differential_events: self.differential_events.clone().unwrap(),
        };

        // self.timely_events = None;
        // self.differential_events = None;

        let mut attribute_streams = source.source(scope, context);

        for (aid, config, datoms) in attribute_streams.drain(..) {
            self.context
                .internal
                .create_sourced_attribute(&aid, config, &datoms)?;
        }

        // if let Some(logger) = timely_logger {
        //     if let Ok(logger) = logger.downcast::<Logger<TimelyEvent>>() {
        //         scope
        //             .log_register()
        //             .insert_logger::<TimelyEvent>("timely", *logger);
        //     }
        // }

        // if let Some(logger) = differential_logger {
        //     if let Ok(logger) = logger.downcast::<Logger<DifferentialEvent>>() {
        //         scope
        //             .log_register()
        //             .insert_logger::<DifferentialEvent>("differential/arrange", *logger);
        //     }
        // }

        Ok(())
    }

    /// Handle an AdvanceDomain request.
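    ///
    /// A minimal sketch, assuming `Duration` timestamps (passing a
    /// domain name is not yet supported and returns an error):
    ///
    /// ```ignore
    /// server.advance_domain(None, Duration::from_secs(10))?;
    /// ```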
    pub fn advance_domain(&mut self, name: Option<String>, next: T) -> Result<(), Error> {
        match name {
            None => self.context.internal.advance_epoch(next),
            Some(_) => Err(Error::unsupported("Named domains are not yet supported.")),
        }
    }

    /// Handles an Uninterest request, possibly cleaning up dataflows
    /// that are no longer interesting to any client.
    pub fn uninterest(&mut self, client: Token, name: &str) -> Result<(), Error> {
        // All workers keep track of every client's interests, s.t. they
        // know when to clean up unused dataflows.
        if let Some(entry) = self.interests.get_mut(name) {
            entry.remove(&client);

            if entry.is_empty() {
                self.shutdown_query(name);
                self.interests.remove(name);
            }
        }

        Ok(())
    }

    /// Cleans up all bookkeeping state for the specified client.
    pub fn disconnect_client(&mut self, client: Token) -> Result<(), Error> {
        let names: Vec<String> = self.interests.keys().cloned().collect();

        for query_name in names.iter() {
            self.uninterest(client, query_name)?
        }

        Ok(())
    }

    /// Returns true iff the probe is behind any input handle. Mostly
    /// used as a convenience method during testing. Using this within
    /// `step_while` is not safe in general and might lead to stalls.
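    ///
    /// A typical use in tests might look as follows (sketch):
    ///
    /// ```ignore
    /// while server.is_any_outdated() {
    ///     worker.step();
    /// }
    /// ```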
    pub fn is_any_outdated(&self) -> bool {
        self.probe
            .with_frontier(|out_frontier| self.context.internal.dominates(out_frontier))
    }

    /// Helper for registering, publishing, and indicating interest in
    /// a single, named query. Used for testing.
    pub fn test_single<S: Scope<Timestamp = T>>(
        &mut self,
        scope: &mut S,
        rule: Rule,
    ) -> Collection<S, Vec<Value>, isize> {
        let interest_name = rule.name.clone();
        let publish_name = rule.name.clone();

        self.register(Register {
            rules: vec![rule],
            publish: vec![publish_name],
        })
        .unwrap();

        match self.interest(&interest_name, scope) {
            Err(error) => panic!("{:?}", error),
            Ok(relation) => relation.probe_with(&mut self.probe),
        }
    }
}

impl<Token> Server<Duration, Token>
where
    Token: Hash + Eq + Copy,
{
    /// Registers loggers for use in the various logging sources.
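    ///
    /// A minimal sketch of wiring this up when logging is enabled
    /// (worker setup shown only in outline):
    ///
    /// ```ignore
    /// let config = Configuration {
    ///     enable_logging: true,
    ///     ..Default::default()
    /// };
    /// let server = Server::<Duration, usize>::new(config);
    /// if server.config.enable_logging {
    ///     server.enable_logging(worker)?;
    /// }
    /// ```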
    pub fn enable_logging<A: Allocate>(&self, worker: &mut Worker<A>) -> Result<(), Error> {
        let mut timely_logger = BatchLogger::new(self.timely_events.clone().unwrap());
        worker
            .log_register()
            .insert::<TimelyEvent, _>("timely", move |time, data| {
                timely_logger.publish_batch(time, data)
            });

        let mut differential_logger = BatchLogger::new(self.differential_events.clone().unwrap());
        worker
            .log_register()
            .insert::<DifferentialEvent, _>("differential/arrange", move |time, data| {
                differential_logger.publish_batch(time, data)
            });

        Ok(())
    }

    /// Unregisters loggers.
    pub fn shutdown_logging<A: Allocate>(&self, worker: &mut Worker<A>) -> Result<(), Error> {
        worker
            .log_register()
            .insert::<TimelyEvent, _>("timely", move |_time, _data| {});

        worker
            .log_register()
            .insert::<DifferentialEvent, _>("differential/arrange", move |_time, _data| {});

        worker
            .log_register()
            .insert::<DeclarativeEvent, _>("declarative", move |_time, _data| {});

        Ok(())
    }
}