declarative_dataflow/
lib.rs

1//! Declarative dataflow infrastructure
2//!
3//! This crate contains types, traits, and logic for assembling
4//! differential dataflow computations from declaratively specified
5//! programs, without any additional compilation.
6
7#![forbid(missing_docs)]
8
9#[macro_use]
10extern crate log;
11#[macro_use]
12extern crate serde_derive;
13
14pub mod binding;
15pub mod domain;
16pub mod logging;
17pub mod operators;
18pub mod plan;
19pub mod server;
20pub mod sinks;
21pub mod sources;
22pub mod timestamp;
23
24use std::collections::{HashMap, HashSet, VecDeque};
25use std::time::Duration;
26
27use timely::dataflow::operators::CapabilitySet;
28use timely::dataflow::scopes::child::Iterative;
29use timely::dataflow::*;
30use timely::order::Product;
31use timely::progress::Timestamp;
32
33use differential_dataflow::lattice::Lattice;
34use differential_dataflow::operators::arrange::{ShutdownButton, TraceAgent};
35use differential_dataflow::operators::iterate::Variable;
36#[cfg(not(feature = "set-semantics"))]
37use differential_dataflow::operators::Consolidate;
38#[cfg(feature = "set-semantics")]
39use differential_dataflow::operators::Threshold;
40use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpine};
41use differential_dataflow::trace::TraceReader;
42use differential_dataflow::{Collection, ExchangeData};
43
44#[cfg(feature = "uuid")]
45pub use uuid::Uuid;
46
47pub use num_rational::Rational32;
48
49pub use binding::{AsBinding, AttributeBinding, Binding};
50pub use plan::{Hector, ImplContext, Implementable, Plan};
51pub use timestamp::{Rewind, Time};
52
/// Identifier of an entity, unique per entity.
pub type Eid = u64;
55
/// Identifier of an attribute, unique per attribute.
pub type Aid = String; // u32
58
59/// Possible data values.
60///
61/// This enum captures the currently supported data types, and is the
62/// least common denominator for the types of records moved around.
63#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
64pub enum Value {
65    /// An attribute identifier
66    Aid(Aid),
67    /// A string
68    String(String),
69    /// A boolean
70    Bool(bool),
71    /// A 64 bit signed integer
72    Number(i64),
73    /// A 32 bit rational
74    Rational32(Rational32),
75    /// An entity identifier
76    Eid(Eid),
77    /// Milliseconds since midnight, January 1, 1970 UTC
78    Instant(u64),
79    /// A 16 byte unique identifier.
80    #[cfg(feature = "uuid")]
81    Uuid(Uuid),
82    /// A fixed-precision real number.
83    #[cfg(feature = "real")]
84    Real(fixed::types::I16F16),
85}
86
87impl Value {
88    /// Helper to create an Aid value from a string representation.
89    pub fn aid(v: &str) -> Self {
90        Value::Aid(v.to_string())
91    }
92
93    /// Helper to create a UUID value from a string representation.
94    #[cfg(feature = "uuid")]
95    pub fn uuid_str(v: &str) -> Self {
96        let uuid = Uuid::parse_str(v).expect("failed to parse UUID");
97        Value::Uuid(uuid)
98    }
99}
100
101impl std::convert::From<&str> for Value {
102    fn from(v: &str) -> Self {
103        Value::String(v.to_string())
104    }
105}
106
#[cfg(feature = "real")]
impl From<f64> for Value {
    /// Converts a float into a fixed-precision `Value::Real`.
    ///
    /// # Panics
    ///
    /// Panics if the checked conversion to `I16F16` fails.
    fn from(v: f64) -> Self {
        Value::Real(
            fixed::types::I16F16::checked_from_float(v).expect("failed to convert to I16F16"),
        )
    }
}
116
#[cfg(feature = "serde_json")]
impl From<Value> for serde_json::Value {
    /// Converts a value into its JSON representation.
    ///
    /// Entity identifiers are rendered as JSON strings, attribute
    /// identifiers and strings become JSON strings, booleans and
    /// numbers map onto their JSON counterparts.
    ///
    /// # Panics
    ///
    /// Panics (`unimplemented!`) for every other variant.
    fn from(v: Value) -> Self {
        use serde_json::Value as Json;

        match v {
            Value::Eid(eid) => Json::String(eid.to_string()),
            Value::Aid(text) | Value::String(text) => Json::String(text),
            Value::Bool(flag) => Json::Bool(flag),
            Value::Number(number) => Json::Number(serde_json::Number::from(number)),
            _ => unimplemented!(),
        }
    }
}
130
131impl std::convert::From<Value> for Eid {
132    fn from(v: Value) -> Eid {
133        if let Value::Eid(eid) = v {
134            eid
135        } else {
136            panic!("Value {:?} can't be converted to Eid", v);
137        }
138    }
139}
140
141/// A client-facing, non-exceptional error.
142#[derive(Clone, Debug, Serialize, Deserialize)]
143pub struct Error {
144    /// Error category.
145    #[serde(rename = "df.error/category")]
146    pub category: String,
147    /// Free-frorm description.
148    #[serde(rename = "df.error/message")]
149    pub message: String,
150}
151
152impl Error {
153    /// Fix client bug.
154    pub fn incorrect<E: std::string::ToString>(error: E) -> Error {
155        Error {
156            category: "df.error.category/incorrect".to_string(),
157            message: error.to_string(),
158        }
159    }
160
161    /// Fix client noun.
162    pub fn not_found<E: std::string::ToString>(error: E) -> Error {
163        Error {
164            category: "df.error.category/not-found".to_string(),
165            message: error.to_string(),
166        }
167    }
168
169    /// Coordinate with worker.
170    pub fn conflict<E: std::string::ToString>(error: E) -> Error {
171        Error {
172            category: "df.error.category/conflict".to_string(),
173            message: error.to_string(),
174        }
175    }
176
177    /// Fix worker bug.
178    pub fn fault<E: std::string::ToString>(error: E) -> Error {
179        Error {
180            category: "df.error.category/fault".to_string(),
181            message: error.to_string(),
182        }
183    }
184
185    /// Fix client verb.
186    pub fn unsupported<E: std::string::ToString>(error: E) -> Error {
187        Error {
188            category: "df.error.category/unsupported".to_string(),
189            message: error.to_string(),
190        }
191    }
192}
193
194/// Transaction data.
195#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
196pub struct TxData(pub isize, pub Value, pub Aid, pub Value, pub Option<Time>);
197
198impl TxData {
199    /// Creates TxData representing the addition of a single fact.
200    pub fn add(e: Eid, a: &str, v: Value) -> Self {
201        TxData(1, Value::Eid(e), a.to_string(), v, None)
202    }
203
204    /// Creates TxData representing the addition of a single fact at a
205    /// specific point in time.
206    pub fn add_at(e: Eid, a: &str, v: Value, t: Time) -> Self {
207        TxData(1, Value::Eid(e), a.to_string(), v, Some(t))
208    }
209
210    /// Creates TxData representing the retraction of a single fact.
211    pub fn retract(e: Eid, a: &str, v: Value) -> Self {
212        TxData(-1, Value::Eid(e), a.to_string(), v, None)
213    }
214
215    /// Creates TxData representing the retraction of a single fact at
216    /// a specific point in time.
217    pub fn retract_at(e: Eid, a: &str, v: Value, t: Time) -> Self {
218        TxData(-1, Value::Eid(e), a.to_string(), v, Some(t))
219    }
220}
221
222/// A (tuple, time, diff) triple, as sent back to clients.
223pub type ResultDiff<T> = (Vec<Value>, T, isize);
224
/// Identifies a client connection, local to a worker.
pub type Client = usize;
227
228/// Anything that can be returned to clients.
229#[derive(Clone, Debug, Serialize, Deserialize)]
230pub enum Output {
231    /// A batch of (tuple, time, diff) triples as returned by Datalog
232    /// queries.
233    QueryDiff(String, Vec<ResultDiff<Time>>),
234    /// A JSON object, e.g. as returned by GraphQL queries.
235    #[cfg(feature = "serde_json")]
236    Json(String, serde_json::Value, Time, isize),
237    /// A message forwarded to a specific client.
238    #[cfg(feature = "serde_json")]
239    Message(Client, serde_json::Value),
240    /// An error forwarded to a specific client.
241    Error(Client, Error, server::TxId),
242}
243
244/// A trace of values indexed by self.
245pub type TraceKeyHandle<K, T, R> = TraceAgent<OrdKeySpine<K, T, R>>;
246
247/// A trace of (K, V) pairs indexed by key.
248pub type TraceValHandle<K, V, T, R> = TraceAgent<OrdValSpine<K, V, T, R>>;
249
250/// A handle to an arranged relation.
251pub type RelationHandle<T> = TraceKeyHandle<Vec<Value>, T, isize>;
252
253// A map for keeping track of collections that are being actively
254// synthesized (i.e. that are not fully defined yet).
255type VariableMap<G> = HashMap<String, Variable<G, Vec<Value>, isize>>;
256
// Anything offering a button that can be pressed to shut it down.
// Lets `ShutdownHandle` store buttons of differing types.
trait Shutdownable {
    // Presses the button.
    fn press(&mut self);
}
260
261impl<T> Shutdownable for ShutdownButton<T> {
262    #[inline(always)]
263    fn press(&mut self) {
264        self.press();
265    }
266}
267
268/// A wrapper around a vector of ShutdownButton's. Ensures they will
269/// be pressed on dropping the handle.
270pub struct ShutdownHandle {
271    shutdown_buttons: Vec<Box<dyn Shutdownable>>,
272}
273
274impl Drop for ShutdownHandle {
275    fn drop(&mut self) {
276        for mut button in self.shutdown_buttons.drain(..) {
277            trace!("pressing shutdown button");
278            button.press();
279        }
280    }
281}
282
283impl ShutdownHandle {
284    /// Returns an empty shutdown handle.
285    pub fn empty() -> Self {
286        ShutdownHandle {
287            shutdown_buttons: Vec::new(),
288        }
289    }
290
291    /// Wraps a single shutdown button into a shutdown handle.
292    pub fn from_button<T: Timestamp>(button: ShutdownButton<CapabilitySet<T>>) -> Self {
293        ShutdownHandle {
294            shutdown_buttons: vec![Box::new(button)],
295        }
296    }
297
298    /// Adds another shutdown button to this handle. This button will
299    /// then also be pressed, whenever the handle is shut down or
300    /// dropped.
301    pub fn add_button<T: Timestamp>(&mut self, button: ShutdownButton<CapabilitySet<T>>) {
302        self.shutdown_buttons.push(Box::new(button));
303    }
304
305    /// Combines the buttons of another handle into self.
306    pub fn merge_with(&mut self, mut other: Self) {
307        self.shutdown_buttons.append(&mut other.shutdown_buttons);
308    }
309
310    /// Combines two shutdown handles into a single one, which will
311    /// control both.
312    pub fn merge(mut left: Self, mut right: Self) -> Self {
313        let mut shutdown_buttons =
314            Vec::with_capacity(left.shutdown_buttons.len() + right.shutdown_buttons.len());
315        shutdown_buttons.append(&mut left.shutdown_buttons);
316        shutdown_buttons.append(&mut right.shutdown_buttons);
317
318        ShutdownHandle { shutdown_buttons }
319    }
320}
321
322/// Attribute indices can have various operations applied to them,
323/// based on their semantics.
324#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
325pub enum InputSemantics {
326    /// No special semantics enforced. Source is responsible for
327    /// everything.
328    Raw,
329    /// Only a single value per eid is allowed at any given timestamp.
330    CardinalityOne,
331    /// Multiple different values for any given eid are allowed, but
332    /// (e,v) pairs are enforced to be distinct.
333    CardinalityMany,
334    // /// @TODO
335    // CAS,
336}
337
338/// Attributes can be indexed in two ways, once from eid to value and
339/// the other way around. More powerful query capabilities may rely on
340/// both directions being available, whereas simple queries, such as
341/// star-joins and pull queries, might get by with just a forward
342/// index.
343#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
344pub enum IndexDirection {
345    /// Forward index only.
346    Forward,
347    /// Both directions are maintained.
348    Both,
349}
350
351/// Attributes might only appear in certain classes of queries. If
352/// that is the case, indexing overhead can be reduced.
353#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
354pub enum QuerySupport {
355    /// Simple pull queries and star-joins require only a single
356    /// index.
357    Basic = 0,
358    /// Delta queries require an additional index for validation of
359    /// proposals.
360    Delta = 1,
361    /// Adaptive, worst-case optimal queries require three indices per
362    /// direction, one for proposals, one for validation, and one for
363    /// per-key statistics.
364    AdaptiveWCO = 2,
365}
366
367/// Per-attribute semantics.
368#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
369pub struct AttributeConfig {
370    /// Modifiers to apply on attribute inputs, such as keeping only
371    /// the most recent value per eid, or compare-and-swap.
372    pub input_semantics: InputSemantics,
373    /// How close indexed traces should follow the computation
374    /// frontier.
375    pub trace_slack: Option<Time>,
376    /// Index directions to maintain for this attribute.
377    pub index_direction: IndexDirection,
378    /// Query capabilities supported by this attribute.
379    pub query_support: QuerySupport,
380    /// Does this attribute care about its respective time
381    /// dimension? Timeless attributes do not have an
382    /// influence on the overall progress in the system.
383    pub timeless: bool,
384}
385
386impl Default for AttributeConfig {
387    fn default() -> Self {
388        AttributeConfig {
389            input_semantics: InputSemantics::Raw,
390            trace_slack: None,
391            index_direction: IndexDirection::Forward,
392            query_support: QuerySupport::Basic,
393            timeless: false,
394        }
395    }
396}
397
398impl AttributeConfig {
399    /// Shortcut to specifying an attribute that will live in some
400    /// transaction time domain and always compact up to the
401    /// computation frontier.
402    pub fn tx_time(input_semantics: InputSemantics) -> Self {
403        AttributeConfig {
404            input_semantics,
405            // @TODO It's not super clear yet, whether this can be
406            // 0. There might be an off-by-one error hidden somewhere,
407            // s.t. traces advance to t+1 when we're still accepting
408            // inputs for t+1.
409            trace_slack: Some(Time::TxId(1)),
410            ..Default::default()
411        }
412    }
413
414    /// Shortcut to specifying an attribute that will live in some
415    /// real-time domain and always compact up to the computation
416    /// frontier.
417    pub fn real_time(input_semantics: InputSemantics) -> Self {
418        AttributeConfig {
419            input_semantics,
420            trace_slack: Some(Time::Real(Duration::from_secs(0))),
421            ..Default::default()
422        }
423    }
424
425    /// Shortcut to specifying an attribute that will live in an
426    /// arbitrary time domain and never compact its trace.
427    pub fn uncompacted(input_semantics: InputSemantics) -> Self {
428        AttributeConfig {
429            input_semantics,
430            trace_slack: None,
431            ..Default::default()
432        }
433    }
434}
435
436/// Per-relation semantics.
437#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
438pub struct RelationConfig {
439    /// How close the arranged trace should follow the computation
440    /// frontier.
441    pub trace_slack: Option<Time>,
442}
443
/// A variable, as used within a query.
type Var = u32;
446
447/// A named relation.
448#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
449pub struct Rule {
450    /// The name identifying the relation.
451    pub name: String,
452    /// The plan describing contents of the relation.
453    pub plan: Plan,
454}
455
456/// A relation between a set of variables.
457///
458/// Relations can be backed by a collection of records of type
459/// `Vec<Value>`, each of a common length (with offsets corresponding
460/// to the variable offsets), or by an existing arrangement.
461trait Relation<'a, G, I>: AsBinding
462where
463    G: Scope,
464    G::Timestamp: Lattice + ExchangeData,
465    I: ImplContext<G::Timestamp>,
466{
467    /// A collection containing all tuples.
468    fn tuples(
469        self,
470        nested: &mut Iterative<'a, G, u64>,
471        context: &mut I,
472    ) -> (
473        Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
474        ShutdownHandle,
475    );
476
477    /// A collection containing all tuples projected onto the
478    /// specified variables.
479    fn projected(
480        self,
481        nested: &mut Iterative<'a, G, u64>,
482        context: &mut I,
483        target_variables: &[Var],
484    ) -> (
485        Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
486        ShutdownHandle,
487    );
488
489    /// A collection with tuples partitioned by `variables`.
490    ///
491    /// Each tuple is mapped to a pair `(Vec<Value>, Vec<Value>)`
492    /// containing first exactly those variables in `variables` in that
493    /// order, followed by the remaining values in their original
494    /// order.
495    fn tuples_by_variables(
496        self,
497        nested: &mut Iterative<'a, G, u64>,
498        context: &mut I,
499        variables: &[Var],
500    ) -> (
501        Collection<Iterative<'a, G, u64>, (Vec<Value>, Vec<Value>), isize>,
502        ShutdownHandle,
503    );
504}
505
506/// A collection and variable bindings.
507pub struct CollectionRelation<'a, G: Scope> {
508    variables: Vec<Var>,
509    tuples: Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
510}
511
512impl<'a, G: Scope> AsBinding for CollectionRelation<'a, G>
513where
514    G::Timestamp: Lattice + ExchangeData,
515{
516    fn variables(&self) -> Vec<Var> {
517        self.variables.clone()
518    }
519
520    fn binds(&self, variable: Var) -> Option<usize> {
521        self.variables.binds(variable)
522    }
523
524    fn ready_to_extend(&self, _prefix: &AsBinding) -> Option<Var> {
525        unimplemented!();
526    }
527
528    fn required_to_extend(&self, _prefix: &AsBinding, _target: Var) -> Option<Option<Var>> {
529        unimplemented!();
530    }
531}
532
533impl<'a, G, I> Relation<'a, G, I> for CollectionRelation<'a, G>
534where
535    G: Scope,
536    G::Timestamp: Lattice + ExchangeData,
537    I: ImplContext<G::Timestamp>,
538{
539    fn tuples(
540        self,
541        _nested: &mut Iterative<'a, G, u64>,
542        _context: &mut I,
543    ) -> (
544        Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
545        ShutdownHandle,
546    ) {
547        (self.tuples, ShutdownHandle::empty())
548    }
549
550    fn projected(
551        self,
552        _nested: &mut Iterative<'a, G, u64>,
553        _context: &mut I,
554        target_variables: &[Var],
555    ) -> (
556        Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
557        ShutdownHandle,
558    ) {
559        if self.variables() == target_variables {
560            (self.tuples, ShutdownHandle::empty())
561        } else {
562            let relation_variables = self.variables();
563            let target_variables = target_variables.to_vec();
564
565            let tuples = self.tuples.map(move |tuple| {
566                target_variables
567                    .iter()
568                    .map(|x| {
569                        let idx = relation_variables.binds(*x).unwrap();
570                        tuple[idx].clone()
571                    })
572                    .collect()
573            });
574
575            (tuples, ShutdownHandle::empty())
576        }
577    }
578
579    fn tuples_by_variables(
580        self,
581        _nested: &mut Iterative<'a, G, u64>,
582        _context: &mut I,
583        variables: &[Var],
584    ) -> (
585        Collection<Iterative<'a, G, u64>, (Vec<Value>, Vec<Value>), isize>,
586        ShutdownHandle,
587    ) {
588        if variables == &self.variables()[..] {
589            (
590                self.tuples.map(|x| (x, Vec::new())),
591                ShutdownHandle::empty(),
592            )
593        } else if variables.is_empty() {
594            (
595                self.tuples.map(|x| (Vec::new(), x)),
596                ShutdownHandle::empty(),
597            )
598        } else {
599            let key_length = variables.len();
600            let values_length = self.variables().len() - key_length;
601
602            let mut key_offsets: Vec<usize> = Vec::with_capacity(key_length);
603            let mut value_offsets: Vec<usize> = Vec::with_capacity(values_length);
604            let variable_set: HashSet<Var> = variables.iter().cloned().collect();
605
606            // It is important to preserve the key variables in the order
607            // they were specified.
608            for variable in variables.iter() {
609                key_offsets.push(self.binds(*variable).unwrap());
610            }
611
612            // Values we'll just take in the order they were.
613            for (idx, variable) in self.variables().iter().enumerate() {
614                if !variable_set.contains(variable) {
615                    value_offsets.push(idx);
616                }
617            }
618
619            let arranged = self.tuples.map(move |tuple| {
620                let key: Vec<Value> = key_offsets.iter().map(|i| tuple[*i].clone()).collect();
621                // @TODO second clone not really neccessary
622                let values: Vec<Value> = value_offsets
623                    .iter()
624                    .map(move |i| tuple[*i].clone())
625                    .collect();
626
627                (key, values)
628            });
629
630            (arranged, ShutdownHandle::empty())
631        }
632    }
633}
634
635impl<'a, G, I> Relation<'a, G, I> for AttributeBinding
636where
637    G: Scope,
638    G::Timestamp: Lattice + ExchangeData,
639    I: ImplContext<G::Timestamp>,
640{
641    fn tuples(
642        self,
643        nested: &mut Iterative<'a, G, u64>,
644        context: &mut I,
645    ) -> (
646        Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
647        ShutdownHandle,
648    ) {
649        let variables = self.variables();
650        self.projected(nested, context, &variables)
651    }
652
653    fn projected(
654        self,
655        nested: &mut Iterative<'a, G, u64>,
656        context: &mut I,
657        target_variables: &[Var],
658    ) -> (
659        Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
660        ShutdownHandle,
661    ) {
662        match context.forward_propose(&self.source_attribute) {
663            None => panic!("attribute {:?} does not exist", self.source_attribute),
664            Some(propose_trace) => {
665                let frontier = propose_trace.advance_frontier().to_vec();
666                let (propose, shutdown_propose) =
667                    propose_trace.import_core(&nested.parent, &self.source_attribute);
668
669                let tuples = propose.enter_at(nested, move |_, _, time| {
670                    let mut forwarded = time.clone();
671                    forwarded.advance_by(&frontier);
672                    Product::new(forwarded, 0)
673                });
674
675                let (e, v) = self.variables;
676                let projected = if target_variables == [e, v] {
677                    tuples.as_collection(|e, v| vec![e.clone(), v.clone()])
678                } else if target_variables == [v, e] {
679                    tuples.as_collection(|e, v| vec![v.clone(), e.clone()])
680                } else if target_variables == [e] {
681                    tuples.as_collection(|e, _v| vec![e.clone()])
682                } else if target_variables == [v] {
683                    tuples.as_collection(|_e, v| vec![v.clone()])
684                } else {
685                    panic!("invalid projection")
686                };
687
688                (projected, ShutdownHandle::from_button(shutdown_propose))
689            }
690        }
691    }
692
693    fn tuples_by_variables(
694        self,
695        nested: &mut Iterative<'a, G, u64>,
696        context: &mut I,
697        variables: &[Var],
698    ) -> (
699        Collection<Iterative<'a, G, u64>, (Vec<Value>, Vec<Value>), isize>,
700        ShutdownHandle,
701    ) {
702        match context.forward_propose(&self.source_attribute) {
703            None => panic!("attribute {:?} does not exist", self.source_attribute),
704            Some(propose_trace) => {
705                let frontier = propose_trace.advance_frontier().to_vec();
706                let (propose, shutdown_propose) =
707                    propose_trace.import_core(&nested.parent, &self.source_attribute);
708
709                let tuples = propose.enter_at(nested, move |_, _, time| {
710                    let mut forwarded = time.clone();
711                    forwarded.advance_by(&frontier);
712                    Product::new(forwarded, 0)
713                });
714
715                let (e, v) = self.variables;
716                let arranged = if variables == [e, v] {
717                    tuples.as_collection(|e, v| (vec![e.clone(), v.clone()], vec![]))
718                } else if variables == [v, e] {
719                    tuples.as_collection(|e, v| (vec![v.clone(), e.clone()], vec![]))
720                } else if variables == [e] {
721                    tuples.as_collection(|e, v| (vec![e.clone()], vec![v.clone()]))
722                } else if variables == [v] {
723                    tuples.as_collection(|e, v| (vec![v.clone()], vec![e.clone()]))
724                } else {
725                    panic!("invalid projection")
726                };
727
728                (arranged, ShutdownHandle::from_button(shutdown_propose))
729            }
730        }
731    }
732}
733
734/// @TODO
735pub enum Implemented<'a, G>
736where
737    G: Scope,
738    G::Timestamp: Lattice + ExchangeData,
739{
740    /// A relation backed by an attribute.
741    Attribute(AttributeBinding),
742    /// A relation backed by a Differential collection.
743    Collection(CollectionRelation<'a, G>),
744    // Arranged(ArrangedRelation<'a, G>)
745}
746
747impl<'a, G: Scope> AsBinding for Implemented<'a, G>
748where
749    G::Timestamp: Lattice + ExchangeData,
750{
751    fn variables(&self) -> Vec<Var> {
752        match self {
753            Implemented::Attribute(attribute_binding) => attribute_binding.variables(),
754            Implemented::Collection(relation) => relation.variables(),
755        }
756    }
757
758    fn binds(&self, variable: Var) -> Option<usize> {
759        match self {
760            Implemented::Attribute(attribute_binding) => attribute_binding.binds(variable),
761            Implemented::Collection(relation) => relation.binds(variable),
762        }
763    }
764
765    fn ready_to_extend(&self, prefix: &AsBinding) -> Option<Var> {
766        match self {
767            Implemented::Attribute(attribute_binding) => attribute_binding.ready_to_extend(prefix),
768            Implemented::Collection(relation) => relation.ready_to_extend(prefix),
769        }
770    }
771
772    fn required_to_extend(&self, prefix: &AsBinding, target: Var) -> Option<Option<Var>> {
773        match self {
774            Implemented::Attribute(attribute_binding) => {
775                attribute_binding.required_to_extend(prefix, target)
776            }
777            Implemented::Collection(relation) => relation.required_to_extend(prefix, target),
778        }
779    }
780}
781
782impl<'a, G, I> Relation<'a, G, I> for Implemented<'a, G>
783where
784    G: Scope,
785    G::Timestamp: Lattice + ExchangeData,
786    I: ImplContext<G::Timestamp>,
787{
788    fn tuples(
789        self,
790        nested: &mut Iterative<'a, G, u64>,
791        context: &mut I,
792    ) -> (
793        Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
794        ShutdownHandle,
795    ) {
796        match self {
797            Implemented::Attribute(attribute_binding) => attribute_binding.tuples(nested, context),
798            Implemented::Collection(relation) => relation.tuples(nested, context),
799        }
800    }
801
802    fn projected(
803        self,
804        nested: &mut Iterative<'a, G, u64>,
805        context: &mut I,
806        target_variables: &[Var],
807    ) -> (
808        Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
809        ShutdownHandle,
810    ) {
811        match self {
812            Implemented::Attribute(attribute_binding) => {
813                attribute_binding.projected(nested, context, target_variables)
814            }
815            Implemented::Collection(relation) => {
816                relation.projected(nested, context, target_variables)
817            }
818        }
819    }
820
821    fn tuples_by_variables(
822        self,
823        nested: &mut Iterative<'a, G, u64>,
824        context: &mut I,
825        variables: &[Var],
826    ) -> (
827        Collection<Iterative<'a, G, u64>, (Vec<Value>, Vec<Value>), isize>,
828        ShutdownHandle,
829    ) {
830        match self {
831            Implemented::Attribute(attribute_binding) => {
832                attribute_binding.tuples_by_variables(nested, context, variables)
833            }
834            Implemented::Collection(relation) => {
835                relation.tuples_by_variables(nested, context, variables)
836            }
837        }
838    }
839}
840
// /// An arrangement and variable bindings.
842// struct ArrangedRelation<'a, G: Scope>
843// where
844//     G::Timestamp: Lattice+ExchangeData
845// {
846//     variables: Vec<Var>,
847//     tuples: Arranged<Iterative<'a, G, u64>, Vec<Value>, Vec<Value>, isize,
848//                      TraceValHandle<Vec<Value>, Vec<Value>, Product<G::Timestamp,u64>, isize>>,
849// }
850
851/// Helper function to create a query plan. The resulting query will
852/// provide values for the requested target variables, under the
853/// constraints expressed by the bindings provided.
854pub fn q(target_variables: Vec<Var>, bindings: Vec<Binding>) -> Plan {
855    Plan::Hector(Hector {
856        variables: target_variables,
857        bindings,
858    })
859}
860
861/// Returns a deduplicates list of all rules used in the definition of
862/// the specified names. Includes the specified names.
863pub fn collect_dependencies<T, I>(context: &I, names: &[&str]) -> Result<Vec<Rule>, Error>
864where
865    T: Timestamp + Lattice,
866    I: ImplContext<T>,
867{
868    let mut seen = HashSet::new();
869    let mut rules = Vec::new();
870    let mut queue = VecDeque::new();
871
872    for name in names {
873        match context.rule(name) {
874            None => {
875                return Err(Error::not_found(format!("Unknown rule {}.", name)));
876            }
877            Some(rule) => {
878                seen.insert(name.to_string());
879                queue.push_back(rule.clone());
880            }
881        }
882    }
883
884    while let Some(next) = queue.pop_front() {
885        let dependencies = next.plan.dependencies();
886        for dep_name in dependencies.names.iter() {
887            if !seen.contains(dep_name) {
888                match context.rule(dep_name) {
889                    None => {
890                        return Err(Error::not_found(format!("Unknown rule {}", dep_name)));
891                    }
892                    Some(rule) => {
893                        seen.insert(dep_name.to_string());
894                        queue.push_back(rule.clone());
895                    }
896                }
897            }
898        }
899
900        // Ensure all required attributes exist.
901        for aid in dependencies.attributes.iter() {
902            if !context.has_attribute(aid) {
903                return Err(Error::not_found(format!(
904                    "Rule depends on unknown attribute {}",
905                    aid
906                )));
907            }
908        }
909
910        rules.push(next);
911    }
912
913    Ok(rules)
914}
915
916/// Takes a query plan and turns it into a differential dataflow.
917pub fn implement<T, I, S>(
918    name: &str,
919    scope: &mut S,
920    context: &mut I,
921) -> Result<
922    (
923        HashMap<String, Collection<S, Vec<Value>, isize>>,
924        ShutdownHandle,
925    ),
926    Error,
927>
928where
929    T: Timestamp + Lattice + Default,
930    I: ImplContext<T>,
931    S: Scope<Timestamp = T>,
932{
933    scope.iterative::<u64, _, _>(|nested| {
934        let publish = vec![name];
935        let mut rules = collect_dependencies(&*context, &publish[..])?;
936
937        let mut local_arrangements = VariableMap::new();
938        let mut result_map = HashMap::new();
939
940        // Step 0: Canonicalize, check uniqueness of bindings.
941        if rules.is_empty() {
942            return Err(Error::not_found(format!(
943                "Couldn't find any rules for name {}.",
944                name
945            )));
946        }
947
948        rules.sort_by(|x, y| x.name.cmp(&y.name));
949        for index in 1..rules.len() - 1 {
950            if rules[index].name == rules[index - 1].name {
951                return Err(Error::conflict(format!(
952                    "Duplicate rule definitions for rule {}",
953                    rules[index].name
954                )));
955            }
956        }
957
958        // Step 1: Create new recursive variables for each rule.
959        for rule in rules.iter() {
960            if context.is_underconstrained(&rule.name) {
961                local_arrangements.insert(
962                    rule.name.clone(),
963                    Variable::new(nested, Product::new(Default::default(), 1)),
964                );
965            }
966        }
967
968        // Step 2: Create public arrangements for published relations.
969        for name in publish.into_iter() {
970            if let Some(relation) = local_arrangements.get(name) {
971                result_map.insert(name.to_string(), relation.leave());
972            } else {
973                return Err(Error::not_found(format!(
974                    "Attempted to publish undefined name {}.",
975                    name
976                )));
977            }
978        }
979
980        // Step 3: Define the executions for each rule.
981        let mut executions = Vec::with_capacity(rules.len());
982        let mut shutdown_handle = ShutdownHandle::empty();
983        for rule in rules.iter() {
984            info!("planning {:?}", rule.name);
985            let (relation, shutdown) = rule.plan.implement(nested, &local_arrangements, context);
986
987            executions.push(relation);
988            shutdown_handle.merge_with(shutdown);
989        }
990
991        // Step 4: Complete named relations in a specific order (sorted by name).
992        for (rule, execution) in rules.iter().zip(executions.drain(..)) {
993            match local_arrangements.remove(&rule.name) {
994                None => {
995                    return Err(Error::not_found(format!(
996                        "Rule {} should be in local arrangements, but isn't.",
997                        &rule.name
998                    )));
999                }
1000                Some(variable) => {
1001                    let (tuples, shutdown) = execution.tuples(nested, context);
1002                    shutdown_handle.merge_with(shutdown);
1003
1004                    #[cfg(feature = "set-semantics")]
1005                    variable.set(&tuples.distinct());
1006
1007                    #[cfg(not(feature = "set-semantics"))]
1008                    variable.set(&tuples.consolidate());
1009                }
1010            }
1011        }
1012
1013        Ok((result_map, shutdown_handle))
1014    })
1015}
1016
1017/// @TODO
1018pub fn implement_neu<T, I, S>(
1019    name: &str,
1020    scope: &mut S,
1021    context: &mut I,
1022) -> Result<
1023    (
1024        HashMap<String, Collection<S, Vec<Value>, isize>>,
1025        ShutdownHandle,
1026    ),
1027    Error,
1028>
1029where
1030    T: Timestamp + Lattice + Default,
1031    I: ImplContext<T>,
1032    S: Scope<Timestamp = T>,
1033{
1034    scope.iterative::<u64, _, _>(move |nested| {
1035        let publish = vec![name];
1036        let mut rules = collect_dependencies(&*context, &publish[..])?;
1037
1038        let mut local_arrangements = VariableMap::new();
1039        let mut result_map = HashMap::new();
1040
1041        // Step 0: Canonicalize, check uniqueness of bindings.
1042        if rules.is_empty() {
1043            return Err(Error::not_found(format!(
1044                "Couldn't find any rules for name {}.",
1045                name
1046            )));
1047        }
1048
1049        rules.sort_by(|x, y| x.name.cmp(&y.name));
1050        for index in 1..rules.len() - 1 {
1051            if rules[index].name == rules[index - 1].name {
1052                return Err(Error::conflict(format!(
1053                    "Duplicate rule definitions for rule {}",
1054                    rules[index].name
1055                )));
1056            }
1057        }
1058
1059        // @TODO at this point we need to know about...
1060        // @TODO ... which rules require recursion (and thus need wrapping in a Variable)
1061        // @TODO ... which rules are supposed to be re-used
1062        // @TODO ... which rules are supposed to be re-synthesized
1063        //
1064        // but based entirely on control data written to the server by something external
1065        // (for the old implement it could just be a decision based on whether the rule has a namespace)
1066
1067        // Step 1: Create new recursive variables for each rule.
1068        for name in publish.iter() {
1069            if context.is_underconstrained(name) {
1070                local_arrangements.insert(
1071                    name.to_string(),
1072                    Variable::new(nested, Product::new(Default::default(), 1)),
1073                );
1074            }
1075        }
1076
1077        // Step 2: Create public arrangements for published relations.
1078        for name in publish.into_iter() {
1079            if let Some(relation) = local_arrangements.get(name) {
1080                result_map.insert(name.to_string(), relation.leave());
1081            } else {
1082                return Err(Error::not_found(format!(
1083                    "Attempted to publish undefined name {}.",
1084                    name
1085                )));
1086            }
1087        }
1088
1089        // Step 3: Define the executions for each rule.
1090        let mut executions = Vec::with_capacity(rules.len());
1091        let mut shutdown_handle = ShutdownHandle::empty();
1092        for rule in rules.iter() {
1093            info!("neu_planning {:?}", rule.name);
1094
1095            let plan = q(rule.plan.variables(), rule.plan.into_bindings());
1096
1097            let (relation, shutdown) = plan.implement(nested, &local_arrangements, context);
1098
1099            executions.push(relation);
1100            shutdown_handle.merge_with(shutdown);
1101        }
1102
1103        // Step 4: Complete named relations in a specific order (sorted by name).
1104        for (rule, execution) in rules.iter().zip(executions.drain(..)) {
1105            match local_arrangements.remove(&rule.name) {
1106                None => {
1107                    return Err(Error::not_found(format!(
1108                        "Rule {} should be in local arrangements, but isn't.",
1109                        &rule.name
1110                    )));
1111                }
1112                Some(variable) => {
1113                    let (tuples, shutdown) = execution.tuples(nested, context);
1114                    shutdown_handle.merge_with(shutdown);
1115
1116                    #[cfg(feature = "set-semantics")]
1117                    variable.set(&tuples.distinct());
1118
1119                    #[cfg(not(feature = "set-semantics"))]
1120                    variable.set(&tuples.consolidate());
1121                }
1122            }
1123        }
1124
1125        Ok((result_map, shutdown_handle))
1126    })
1127}