1#![forbid(missing_docs)]
8
9#[macro_use]
10extern crate log;
11#[macro_use]
12extern crate serde_derive;
13
14pub mod binding;
15pub mod domain;
16pub mod logging;
17pub mod operators;
18pub mod plan;
19pub mod server;
20pub mod sinks;
21pub mod sources;
22pub mod timestamp;
23
24use std::collections::{HashMap, HashSet, VecDeque};
25use std::time::Duration;
26
27use timely::dataflow::operators::CapabilitySet;
28use timely::dataflow::scopes::child::Iterative;
29use timely::dataflow::*;
30use timely::order::Product;
31use timely::progress::Timestamp;
32
33use differential_dataflow::lattice::Lattice;
34use differential_dataflow::operators::arrange::{ShutdownButton, TraceAgent};
35use differential_dataflow::operators::iterate::Variable;
36#[cfg(not(feature = "set-semantics"))]
37use differential_dataflow::operators::Consolidate;
38#[cfg(feature = "set-semantics")]
39use differential_dataflow::operators::Threshold;
40use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpine};
41use differential_dataflow::trace::TraceReader;
42use differential_dataflow::{Collection, ExchangeData};
43
44#[cfg(feature = "uuid")]
45pub use uuid::Uuid;
46
47pub use num_rational::Rational32;
48
49pub use binding::{AsBinding, AttributeBinding, Binding};
50pub use plan::{Hector, ImplContext, Implementable, Plan};
51pub use timestamp::{Rewind, Time};
52
53pub type Eid = u64;
55
56pub type Aid = String; #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
64pub enum Value {
65 Aid(Aid),
67 String(String),
69 Bool(bool),
71 Number(i64),
73 Rational32(Rational32),
75 Eid(Eid),
77 Instant(u64),
79 #[cfg(feature = "uuid")]
81 Uuid(Uuid),
82 #[cfg(feature = "real")]
84 Real(fixed::types::I16F16),
85}
86
87impl Value {
88 pub fn aid(v: &str) -> Self {
90 Value::Aid(v.to_string())
91 }
92
93 #[cfg(feature = "uuid")]
95 pub fn uuid_str(v: &str) -> Self {
96 let uuid = Uuid::parse_str(v).expect("failed to parse UUID");
97 Value::Uuid(uuid)
98 }
99}
100
101impl std::convert::From<&str> for Value {
102 fn from(v: &str) -> Self {
103 Value::String(v.to_string())
104 }
105}
106
107#[cfg(feature = "real")]
108impl std::convert::From<f64> for Value {
109 fn from(v: f64) -> Self {
110 let real =
111 fixed::types::I16F16::checked_from_float(v).expect("failed to convert to I16F16");
112
113 Value::Real(real)
114 }
115}
116
117#[cfg(feature = "serde_json")]
118impl std::convert::From<Value> for serde_json::Value {
119 fn from(v: Value) -> Self {
120 match v {
121 Value::Eid(v) => serde_json::Value::String(v.to_string()),
122 Value::Aid(v) => serde_json::Value::String(v),
123 Value::String(v) => serde_json::Value::String(v),
124 Value::Bool(v) => serde_json::Value::Bool(v),
125 Value::Number(v) => serde_json::Value::Number(serde_json::Number::from(v)),
126 _ => unimplemented!(),
127 }
128 }
129}
130
131impl std::convert::From<Value> for Eid {
132 fn from(v: Value) -> Eid {
133 if let Value::Eid(eid) = v {
134 eid
135 } else {
136 panic!("Value {:?} can't be converted to Eid", v);
137 }
138 }
139}
140
141#[derive(Clone, Debug, Serialize, Deserialize)]
143pub struct Error {
144 #[serde(rename = "df.error/category")]
146 pub category: String,
147 #[serde(rename = "df.error/message")]
149 pub message: String,
150}
151
152impl Error {
153 pub fn incorrect<E: std::string::ToString>(error: E) -> Error {
155 Error {
156 category: "df.error.category/incorrect".to_string(),
157 message: error.to_string(),
158 }
159 }
160
161 pub fn not_found<E: std::string::ToString>(error: E) -> Error {
163 Error {
164 category: "df.error.category/not-found".to_string(),
165 message: error.to_string(),
166 }
167 }
168
169 pub fn conflict<E: std::string::ToString>(error: E) -> Error {
171 Error {
172 category: "df.error.category/conflict".to_string(),
173 message: error.to_string(),
174 }
175 }
176
177 pub fn fault<E: std::string::ToString>(error: E) -> Error {
179 Error {
180 category: "df.error.category/fault".to_string(),
181 message: error.to_string(),
182 }
183 }
184
185 pub fn unsupported<E: std::string::ToString>(error: E) -> Error {
187 Error {
188 category: "df.error.category/unsupported".to_string(),
189 message: error.to_string(),
190 }
191 }
192}
193
194#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
196pub struct TxData(pub isize, pub Value, pub Aid, pub Value, pub Option<Time>);
197
198impl TxData {
199 pub fn add(e: Eid, a: &str, v: Value) -> Self {
201 TxData(1, Value::Eid(e), a.to_string(), v, None)
202 }
203
204 pub fn add_at(e: Eid, a: &str, v: Value, t: Time) -> Self {
207 TxData(1, Value::Eid(e), a.to_string(), v, Some(t))
208 }
209
210 pub fn retract(e: Eid, a: &str, v: Value) -> Self {
212 TxData(-1, Value::Eid(e), a.to_string(), v, None)
213 }
214
215 pub fn retract_at(e: Eid, a: &str, v: Value, t: Time) -> Self {
218 TxData(-1, Value::Eid(e), a.to_string(), v, Some(t))
219 }
220}
221
222pub type ResultDiff<T> = (Vec<Value>, T, isize);
224
225pub type Client = usize;
227
228#[derive(Clone, Debug, Serialize, Deserialize)]
230pub enum Output {
231 QueryDiff(String, Vec<ResultDiff<Time>>),
234 #[cfg(feature = "serde_json")]
236 Json(String, serde_json::Value, Time, isize),
237 #[cfg(feature = "serde_json")]
239 Message(Client, serde_json::Value),
240 Error(Client, Error, server::TxId),
242}
243
244pub type TraceKeyHandle<K, T, R> = TraceAgent<OrdKeySpine<K, T, R>>;
246
247pub type TraceValHandle<K, V, T, R> = TraceAgent<OrdValSpine<K, V, T, R>>;
249
250pub type RelationHandle<T> = TraceKeyHandle<Vec<Value>, T, isize>;
252
253type VariableMap<G> = HashMap<String, Variable<G, Vec<Value>, isize>>;
256
257trait Shutdownable {
258 fn press(&mut self);
259}
260
261impl<T> Shutdownable for ShutdownButton<T> {
262 #[inline(always)]
263 fn press(&mut self) {
264 self.press();
265 }
266}
267
268pub struct ShutdownHandle {
271 shutdown_buttons: Vec<Box<dyn Shutdownable>>,
272}
273
274impl Drop for ShutdownHandle {
275 fn drop(&mut self) {
276 for mut button in self.shutdown_buttons.drain(..) {
277 trace!("pressing shutdown button");
278 button.press();
279 }
280 }
281}
282
283impl ShutdownHandle {
284 pub fn empty() -> Self {
286 ShutdownHandle {
287 shutdown_buttons: Vec::new(),
288 }
289 }
290
291 pub fn from_button<T: Timestamp>(button: ShutdownButton<CapabilitySet<T>>) -> Self {
293 ShutdownHandle {
294 shutdown_buttons: vec![Box::new(button)],
295 }
296 }
297
298 pub fn add_button<T: Timestamp>(&mut self, button: ShutdownButton<CapabilitySet<T>>) {
302 self.shutdown_buttons.push(Box::new(button));
303 }
304
305 pub fn merge_with(&mut self, mut other: Self) {
307 self.shutdown_buttons.append(&mut other.shutdown_buttons);
308 }
309
310 pub fn merge(mut left: Self, mut right: Self) -> Self {
313 let mut shutdown_buttons =
314 Vec::with_capacity(left.shutdown_buttons.len() + right.shutdown_buttons.len());
315 shutdown_buttons.append(&mut left.shutdown_buttons);
316 shutdown_buttons.append(&mut right.shutdown_buttons);
317
318 ShutdownHandle { shutdown_buttons }
319 }
320}
321
322#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
325pub enum InputSemantics {
326 Raw,
329 CardinalityOne,
331 CardinalityMany,
334 }
337
338#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
344pub enum IndexDirection {
345 Forward,
347 Both,
349}
350
351#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
354pub enum QuerySupport {
355 Basic = 0,
358 Delta = 1,
361 AdaptiveWCO = 2,
365}
366
367#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
369pub struct AttributeConfig {
370 pub input_semantics: InputSemantics,
373 pub trace_slack: Option<Time>,
376 pub index_direction: IndexDirection,
378 pub query_support: QuerySupport,
380 pub timeless: bool,
384}
385
386impl Default for AttributeConfig {
387 fn default() -> Self {
388 AttributeConfig {
389 input_semantics: InputSemantics::Raw,
390 trace_slack: None,
391 index_direction: IndexDirection::Forward,
392 query_support: QuerySupport::Basic,
393 timeless: false,
394 }
395 }
396}
397
398impl AttributeConfig {
399 pub fn tx_time(input_semantics: InputSemantics) -> Self {
403 AttributeConfig {
404 input_semantics,
405 trace_slack: Some(Time::TxId(1)),
410 ..Default::default()
411 }
412 }
413
414 pub fn real_time(input_semantics: InputSemantics) -> Self {
418 AttributeConfig {
419 input_semantics,
420 trace_slack: Some(Time::Real(Duration::from_secs(0))),
421 ..Default::default()
422 }
423 }
424
425 pub fn uncompacted(input_semantics: InputSemantics) -> Self {
428 AttributeConfig {
429 input_semantics,
430 trace_slack: None,
431 ..Default::default()
432 }
433 }
434}
435
436#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
438pub struct RelationConfig {
439 pub trace_slack: Option<Time>,
442}
443
444type Var = u32;
446
447#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
449pub struct Rule {
450 pub name: String,
452 pub plan: Plan,
454}
455
456trait Relation<'a, G, I>: AsBinding
462where
463 G: Scope,
464 G::Timestamp: Lattice + ExchangeData,
465 I: ImplContext<G::Timestamp>,
466{
467 fn tuples(
469 self,
470 nested: &mut Iterative<'a, G, u64>,
471 context: &mut I,
472 ) -> (
473 Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
474 ShutdownHandle,
475 );
476
477 fn projected(
480 self,
481 nested: &mut Iterative<'a, G, u64>,
482 context: &mut I,
483 target_variables: &[Var],
484 ) -> (
485 Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
486 ShutdownHandle,
487 );
488
489 fn tuples_by_variables(
496 self,
497 nested: &mut Iterative<'a, G, u64>,
498 context: &mut I,
499 variables: &[Var],
500 ) -> (
501 Collection<Iterative<'a, G, u64>, (Vec<Value>, Vec<Value>), isize>,
502 ShutdownHandle,
503 );
504}
505
506pub struct CollectionRelation<'a, G: Scope> {
508 variables: Vec<Var>,
509 tuples: Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
510}
511
512impl<'a, G: Scope> AsBinding for CollectionRelation<'a, G>
513where
514 G::Timestamp: Lattice + ExchangeData,
515{
516 fn variables(&self) -> Vec<Var> {
517 self.variables.clone()
518 }
519
520 fn binds(&self, variable: Var) -> Option<usize> {
521 self.variables.binds(variable)
522 }
523
524 fn ready_to_extend(&self, _prefix: &AsBinding) -> Option<Var> {
525 unimplemented!();
526 }
527
528 fn required_to_extend(&self, _prefix: &AsBinding, _target: Var) -> Option<Option<Var>> {
529 unimplemented!();
530 }
531}
532
533impl<'a, G, I> Relation<'a, G, I> for CollectionRelation<'a, G>
534where
535 G: Scope,
536 G::Timestamp: Lattice + ExchangeData,
537 I: ImplContext<G::Timestamp>,
538{
539 fn tuples(
540 self,
541 _nested: &mut Iterative<'a, G, u64>,
542 _context: &mut I,
543 ) -> (
544 Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
545 ShutdownHandle,
546 ) {
547 (self.tuples, ShutdownHandle::empty())
548 }
549
550 fn projected(
551 self,
552 _nested: &mut Iterative<'a, G, u64>,
553 _context: &mut I,
554 target_variables: &[Var],
555 ) -> (
556 Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
557 ShutdownHandle,
558 ) {
559 if self.variables() == target_variables {
560 (self.tuples, ShutdownHandle::empty())
561 } else {
562 let relation_variables = self.variables();
563 let target_variables = target_variables.to_vec();
564
565 let tuples = self.tuples.map(move |tuple| {
566 target_variables
567 .iter()
568 .map(|x| {
569 let idx = relation_variables.binds(*x).unwrap();
570 tuple[idx].clone()
571 })
572 .collect()
573 });
574
575 (tuples, ShutdownHandle::empty())
576 }
577 }
578
579 fn tuples_by_variables(
580 self,
581 _nested: &mut Iterative<'a, G, u64>,
582 _context: &mut I,
583 variables: &[Var],
584 ) -> (
585 Collection<Iterative<'a, G, u64>, (Vec<Value>, Vec<Value>), isize>,
586 ShutdownHandle,
587 ) {
588 if variables == &self.variables()[..] {
589 (
590 self.tuples.map(|x| (x, Vec::new())),
591 ShutdownHandle::empty(),
592 )
593 } else if variables.is_empty() {
594 (
595 self.tuples.map(|x| (Vec::new(), x)),
596 ShutdownHandle::empty(),
597 )
598 } else {
599 let key_length = variables.len();
600 let values_length = self.variables().len() - key_length;
601
602 let mut key_offsets: Vec<usize> = Vec::with_capacity(key_length);
603 let mut value_offsets: Vec<usize> = Vec::with_capacity(values_length);
604 let variable_set: HashSet<Var> = variables.iter().cloned().collect();
605
606 for variable in variables.iter() {
609 key_offsets.push(self.binds(*variable).unwrap());
610 }
611
612 for (idx, variable) in self.variables().iter().enumerate() {
614 if !variable_set.contains(variable) {
615 value_offsets.push(idx);
616 }
617 }
618
619 let arranged = self.tuples.map(move |tuple| {
620 let key: Vec<Value> = key_offsets.iter().map(|i| tuple[*i].clone()).collect();
621 let values: Vec<Value> = value_offsets
623 .iter()
624 .map(move |i| tuple[*i].clone())
625 .collect();
626
627 (key, values)
628 });
629
630 (arranged, ShutdownHandle::empty())
631 }
632 }
633}
634
635impl<'a, G, I> Relation<'a, G, I> for AttributeBinding
636where
637 G: Scope,
638 G::Timestamp: Lattice + ExchangeData,
639 I: ImplContext<G::Timestamp>,
640{
641 fn tuples(
642 self,
643 nested: &mut Iterative<'a, G, u64>,
644 context: &mut I,
645 ) -> (
646 Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
647 ShutdownHandle,
648 ) {
649 let variables = self.variables();
650 self.projected(nested, context, &variables)
651 }
652
653 fn projected(
654 self,
655 nested: &mut Iterative<'a, G, u64>,
656 context: &mut I,
657 target_variables: &[Var],
658 ) -> (
659 Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
660 ShutdownHandle,
661 ) {
662 match context.forward_propose(&self.source_attribute) {
663 None => panic!("attribute {:?} does not exist", self.source_attribute),
664 Some(propose_trace) => {
665 let frontier = propose_trace.advance_frontier().to_vec();
666 let (propose, shutdown_propose) =
667 propose_trace.import_core(&nested.parent, &self.source_attribute);
668
669 let tuples = propose.enter_at(nested, move |_, _, time| {
670 let mut forwarded = time.clone();
671 forwarded.advance_by(&frontier);
672 Product::new(forwarded, 0)
673 });
674
675 let (e, v) = self.variables;
676 let projected = if target_variables == [e, v] {
677 tuples.as_collection(|e, v| vec![e.clone(), v.clone()])
678 } else if target_variables == [v, e] {
679 tuples.as_collection(|e, v| vec![v.clone(), e.clone()])
680 } else if target_variables == [e] {
681 tuples.as_collection(|e, _v| vec![e.clone()])
682 } else if target_variables == [v] {
683 tuples.as_collection(|_e, v| vec![v.clone()])
684 } else {
685 panic!("invalid projection")
686 };
687
688 (projected, ShutdownHandle::from_button(shutdown_propose))
689 }
690 }
691 }
692
693 fn tuples_by_variables(
694 self,
695 nested: &mut Iterative<'a, G, u64>,
696 context: &mut I,
697 variables: &[Var],
698 ) -> (
699 Collection<Iterative<'a, G, u64>, (Vec<Value>, Vec<Value>), isize>,
700 ShutdownHandle,
701 ) {
702 match context.forward_propose(&self.source_attribute) {
703 None => panic!("attribute {:?} does not exist", self.source_attribute),
704 Some(propose_trace) => {
705 let frontier = propose_trace.advance_frontier().to_vec();
706 let (propose, shutdown_propose) =
707 propose_trace.import_core(&nested.parent, &self.source_attribute);
708
709 let tuples = propose.enter_at(nested, move |_, _, time| {
710 let mut forwarded = time.clone();
711 forwarded.advance_by(&frontier);
712 Product::new(forwarded, 0)
713 });
714
715 let (e, v) = self.variables;
716 let arranged = if variables == [e, v] {
717 tuples.as_collection(|e, v| (vec![e.clone(), v.clone()], vec![]))
718 } else if variables == [v, e] {
719 tuples.as_collection(|e, v| (vec![v.clone(), e.clone()], vec![]))
720 } else if variables == [e] {
721 tuples.as_collection(|e, v| (vec![e.clone()], vec![v.clone()]))
722 } else if variables == [v] {
723 tuples.as_collection(|e, v| (vec![v.clone()], vec![e.clone()]))
724 } else {
725 panic!("invalid projection")
726 };
727
728 (arranged, ShutdownHandle::from_button(shutdown_propose))
729 }
730 }
731 }
732}
733
734pub enum Implemented<'a, G>
736where
737 G: Scope,
738 G::Timestamp: Lattice + ExchangeData,
739{
740 Attribute(AttributeBinding),
742 Collection(CollectionRelation<'a, G>),
744 }
746
747impl<'a, G: Scope> AsBinding for Implemented<'a, G>
748where
749 G::Timestamp: Lattice + ExchangeData,
750{
751 fn variables(&self) -> Vec<Var> {
752 match self {
753 Implemented::Attribute(attribute_binding) => attribute_binding.variables(),
754 Implemented::Collection(relation) => relation.variables(),
755 }
756 }
757
758 fn binds(&self, variable: Var) -> Option<usize> {
759 match self {
760 Implemented::Attribute(attribute_binding) => attribute_binding.binds(variable),
761 Implemented::Collection(relation) => relation.binds(variable),
762 }
763 }
764
765 fn ready_to_extend(&self, prefix: &AsBinding) -> Option<Var> {
766 match self {
767 Implemented::Attribute(attribute_binding) => attribute_binding.ready_to_extend(prefix),
768 Implemented::Collection(relation) => relation.ready_to_extend(prefix),
769 }
770 }
771
772 fn required_to_extend(&self, prefix: &AsBinding, target: Var) -> Option<Option<Var>> {
773 match self {
774 Implemented::Attribute(attribute_binding) => {
775 attribute_binding.required_to_extend(prefix, target)
776 }
777 Implemented::Collection(relation) => relation.required_to_extend(prefix, target),
778 }
779 }
780}
781
782impl<'a, G, I> Relation<'a, G, I> for Implemented<'a, G>
783where
784 G: Scope,
785 G::Timestamp: Lattice + ExchangeData,
786 I: ImplContext<G::Timestamp>,
787{
788 fn tuples(
789 self,
790 nested: &mut Iterative<'a, G, u64>,
791 context: &mut I,
792 ) -> (
793 Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
794 ShutdownHandle,
795 ) {
796 match self {
797 Implemented::Attribute(attribute_binding) => attribute_binding.tuples(nested, context),
798 Implemented::Collection(relation) => relation.tuples(nested, context),
799 }
800 }
801
802 fn projected(
803 self,
804 nested: &mut Iterative<'a, G, u64>,
805 context: &mut I,
806 target_variables: &[Var],
807 ) -> (
808 Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
809 ShutdownHandle,
810 ) {
811 match self {
812 Implemented::Attribute(attribute_binding) => {
813 attribute_binding.projected(nested, context, target_variables)
814 }
815 Implemented::Collection(relation) => {
816 relation.projected(nested, context, target_variables)
817 }
818 }
819 }
820
821 fn tuples_by_variables(
822 self,
823 nested: &mut Iterative<'a, G, u64>,
824 context: &mut I,
825 variables: &[Var],
826 ) -> (
827 Collection<Iterative<'a, G, u64>, (Vec<Value>, Vec<Value>), isize>,
828 ShutdownHandle,
829 ) {
830 match self {
831 Implemented::Attribute(attribute_binding) => {
832 attribute_binding.tuples_by_variables(nested, context, variables)
833 }
834 Implemented::Collection(relation) => {
835 relation.tuples_by_variables(nested, context, variables)
836 }
837 }
838 }
839}
840
841pub fn q(target_variables: Vec<Var>, bindings: Vec<Binding>) -> Plan {
855 Plan::Hector(Hector {
856 variables: target_variables,
857 bindings,
858 })
859}
860
861pub fn collect_dependencies<T, I>(context: &I, names: &[&str]) -> Result<Vec<Rule>, Error>
864where
865 T: Timestamp + Lattice,
866 I: ImplContext<T>,
867{
868 let mut seen = HashSet::new();
869 let mut rules = Vec::new();
870 let mut queue = VecDeque::new();
871
872 for name in names {
873 match context.rule(name) {
874 None => {
875 return Err(Error::not_found(format!("Unknown rule {}.", name)));
876 }
877 Some(rule) => {
878 seen.insert(name.to_string());
879 queue.push_back(rule.clone());
880 }
881 }
882 }
883
884 while let Some(next) = queue.pop_front() {
885 let dependencies = next.plan.dependencies();
886 for dep_name in dependencies.names.iter() {
887 if !seen.contains(dep_name) {
888 match context.rule(dep_name) {
889 None => {
890 return Err(Error::not_found(format!("Unknown rule {}", dep_name)));
891 }
892 Some(rule) => {
893 seen.insert(dep_name.to_string());
894 queue.push_back(rule.clone());
895 }
896 }
897 }
898 }
899
900 for aid in dependencies.attributes.iter() {
902 if !context.has_attribute(aid) {
903 return Err(Error::not_found(format!(
904 "Rule depends on unknown attribute {}",
905 aid
906 )));
907 }
908 }
909
910 rules.push(next);
911 }
912
913 Ok(rules)
914}
915
916pub fn implement<T, I, S>(
918 name: &str,
919 scope: &mut S,
920 context: &mut I,
921) -> Result<
922 (
923 HashMap<String, Collection<S, Vec<Value>, isize>>,
924 ShutdownHandle,
925 ),
926 Error,
927>
928where
929 T: Timestamp + Lattice + Default,
930 I: ImplContext<T>,
931 S: Scope<Timestamp = T>,
932{
933 scope.iterative::<u64, _, _>(|nested| {
934 let publish = vec![name];
935 let mut rules = collect_dependencies(&*context, &publish[..])?;
936
937 let mut local_arrangements = VariableMap::new();
938 let mut result_map = HashMap::new();
939
940 if rules.is_empty() {
942 return Err(Error::not_found(format!(
943 "Couldn't find any rules for name {}.",
944 name
945 )));
946 }
947
948 rules.sort_by(|x, y| x.name.cmp(&y.name));
949 for index in 1..rules.len() - 1 {
950 if rules[index].name == rules[index - 1].name {
951 return Err(Error::conflict(format!(
952 "Duplicate rule definitions for rule {}",
953 rules[index].name
954 )));
955 }
956 }
957
958 for rule in rules.iter() {
960 if context.is_underconstrained(&rule.name) {
961 local_arrangements.insert(
962 rule.name.clone(),
963 Variable::new(nested, Product::new(Default::default(), 1)),
964 );
965 }
966 }
967
968 for name in publish.into_iter() {
970 if let Some(relation) = local_arrangements.get(name) {
971 result_map.insert(name.to_string(), relation.leave());
972 } else {
973 return Err(Error::not_found(format!(
974 "Attempted to publish undefined name {}.",
975 name
976 )));
977 }
978 }
979
980 let mut executions = Vec::with_capacity(rules.len());
982 let mut shutdown_handle = ShutdownHandle::empty();
983 for rule in rules.iter() {
984 info!("planning {:?}", rule.name);
985 let (relation, shutdown) = rule.plan.implement(nested, &local_arrangements, context);
986
987 executions.push(relation);
988 shutdown_handle.merge_with(shutdown);
989 }
990
991 for (rule, execution) in rules.iter().zip(executions.drain(..)) {
993 match local_arrangements.remove(&rule.name) {
994 None => {
995 return Err(Error::not_found(format!(
996 "Rule {} should be in local arrangements, but isn't.",
997 &rule.name
998 )));
999 }
1000 Some(variable) => {
1001 let (tuples, shutdown) = execution.tuples(nested, context);
1002 shutdown_handle.merge_with(shutdown);
1003
1004 #[cfg(feature = "set-semantics")]
1005 variable.set(&tuples.distinct());
1006
1007 #[cfg(not(feature = "set-semantics"))]
1008 variable.set(&tuples.consolidate());
1009 }
1010 }
1011 }
1012
1013 Ok((result_map, shutdown_handle))
1014 })
1015}
1016
1017pub fn implement_neu<T, I, S>(
1019 name: &str,
1020 scope: &mut S,
1021 context: &mut I,
1022) -> Result<
1023 (
1024 HashMap<String, Collection<S, Vec<Value>, isize>>,
1025 ShutdownHandle,
1026 ),
1027 Error,
1028>
1029where
1030 T: Timestamp + Lattice + Default,
1031 I: ImplContext<T>,
1032 S: Scope<Timestamp = T>,
1033{
1034 scope.iterative::<u64, _, _>(move |nested| {
1035 let publish = vec![name];
1036 let mut rules = collect_dependencies(&*context, &publish[..])?;
1037
1038 let mut local_arrangements = VariableMap::new();
1039 let mut result_map = HashMap::new();
1040
1041 if rules.is_empty() {
1043 return Err(Error::not_found(format!(
1044 "Couldn't find any rules for name {}.",
1045 name
1046 )));
1047 }
1048
1049 rules.sort_by(|x, y| x.name.cmp(&y.name));
1050 for index in 1..rules.len() - 1 {
1051 if rules[index].name == rules[index - 1].name {
1052 return Err(Error::conflict(format!(
1053 "Duplicate rule definitions for rule {}",
1054 rules[index].name
1055 )));
1056 }
1057 }
1058
1059 for name in publish.iter() {
1069 if context.is_underconstrained(name) {
1070 local_arrangements.insert(
1071 name.to_string(),
1072 Variable::new(nested, Product::new(Default::default(), 1)),
1073 );
1074 }
1075 }
1076
1077 for name in publish.into_iter() {
1079 if let Some(relation) = local_arrangements.get(name) {
1080 result_map.insert(name.to_string(), relation.leave());
1081 } else {
1082 return Err(Error::not_found(format!(
1083 "Attempted to publish undefined name {}.",
1084 name
1085 )));
1086 }
1087 }
1088
1089 let mut executions = Vec::with_capacity(rules.len());
1091 let mut shutdown_handle = ShutdownHandle::empty();
1092 for rule in rules.iter() {
1093 info!("neu_planning {:?}", rule.name);
1094
1095 let plan = q(rule.plan.variables(), rule.plan.into_bindings());
1096
1097 let (relation, shutdown) = plan.implement(nested, &local_arrangements, context);
1098
1099 executions.push(relation);
1100 shutdown_handle.merge_with(shutdown);
1101 }
1102
1103 for (rule, execution) in rules.iter().zip(executions.drain(..)) {
1105 match local_arrangements.remove(&rule.name) {
1106 None => {
1107 return Err(Error::not_found(format!(
1108 "Rule {} should be in local arrangements, but isn't.",
1109 &rule.name
1110 )));
1111 }
1112 Some(variable) => {
1113 let (tuples, shutdown) = execution.tuples(nested, context);
1114 shutdown_handle.merge_with(shutdown);
1115
1116 #[cfg(feature = "set-semantics")]
1117 variable.set(&tuples.distinct());
1118
1119 #[cfg(not(feature = "set-semantics"))]
1120 variable.set(&tuples.consolidate());
1121 }
1122 }
1123 }
1124
1125 Ok((result_map, shutdown_handle))
1126 })
1127}