1use std::collections::{BTreeMap, BTreeSet, HashMap};
5use std::collections::btree_map;
6use std::collections::hash_map;
7use index::hash::{CheckIndex, HashIndex};
8use join::SkipIterator;
9use aggregator::Aggregator;
10
/// One row of a relation: a single `usize` value per column.
type Tuple = Vec<usize>;
12
13fn permute(perm: &[usize], tup: &[usize]) -> Tuple {
14 let mut out = Vec::new();
15 for col in perm {
16 out.push(tup[*col]);
17 }
18 out
19}
20
21pub struct Projection {
23 perm: Vec<usize>,
24 inner: BTreeMap<Tuple, Vec<usize>>,
25}
26
27impl Projection {
28 fn new(perm: &[usize]) -> Self {
29 Self {
30 perm: perm.iter().cloned().collect(),
31 inner: BTreeMap::new(),
32 }
33 }
34 fn take(&mut self) -> Self {
35 let mut out_inner = BTreeMap::new();
36 ::std::mem::swap(&mut self.inner, &mut out_inner);
37 Self {
38 perm: self.perm.clone(),
39 inner: out_inner,
40 }
41 }
42 fn insert(&mut self, tup: &[usize], fids: Vec<usize>) {
43 self.inner.insert(permute(&self.perm, &tup), fids);
44 }
45 fn remove(&mut self, tup: &[usize]) {
46 self.inner.remove(&permute(&self.perm, &tup));
47 }
48 fn arity(&self) -> usize {
49 self.perm.len()
50 }
51 fn len(&self) -> usize {
52 self.inner.len()
53 }
54 pub fn skip_iter<'a>(&'a self) -> ProjectionIter<'a> {
56 ProjectionIter {
57 proj: self,
58 iter: self.inner.range(vec![]..),
59 }
60 }
61}
62
63pub struct ProjectionIter<'a> {
65 proj: &'a Projection,
66 iter: btree_map::Range<'a, Tuple, Vec<usize>>,
67}
68
69impl<'a> SkipIterator for ProjectionIter<'a> {
70 fn skip(&mut self, tup: Tuple) {
71 self.iter = self.proj.inner.range(tup..);
72 }
73 fn next(&mut self) -> Option<(Tuple, Vec<usize>)> {
74 self.iter.next().map(|(t, f)| (t.clone(), f.clone()))
75 }
76 fn arity(&self) -> usize {
77 self.proj.arity()
78 }
79 fn len(&self) -> usize {
80 self.proj.len()
81 }
82}
83
/// Index of a fact (row) inside a `Tuples` relation; `Tuples::insert` hands these out
/// as consecutive row numbers.
pub type FactId = usize;
/// Index into `Tuples::meta`, as returned by `Tuples::make_meta` for an interned
/// list of fact ids.
pub type MetaId = usize;

/// Reference to the facts supporting one premise of a rule application.
///
/// NOTE: `Ord` is derived, so the declaration order of the variants is significant.
#[derive(Ord, Eq, Debug, PartialOrd, PartialEq, Clone)]
pub enum MergeRef {
    /// An interned fact-id list (see `Tuples::make_meta`).
    MetaId(MetaId),
    /// An explicit list of fact ids.
    FactIds(Vec<FactId>),
}
98
/// One derivation of a fact.
///
/// NOTE: `Ord` is derived (these values live in a `BTreeSet`), so the declaration order
/// of variants and fields is significant.
#[derive(Ord, Eq, Debug, PartialOrd, PartialEq, Clone)]
pub enum Provenance {
    /// A derivation with no premises; presumably an asserted input fact (TODO confirm).
    Base,
    /// Derived by rule `rule_id` from `premises`.
    Rule {
        /// Identifier of the rule that produced the fact.
        rule_id: usize,
        /// One entry per premise position. The predicate of position `i` is resolved
        /// by a caller-supplied mapping `f(rule_id, i)` (see `uses_fid` / `uses_mid`).
        premises: Vec<MergeRef>,
    },
}
112
113impl Provenance {
114 fn uses_fid<F: Fn(usize, usize) -> usize>(&self, pred_id: usize, fid: FactId, f: F) -> bool {
115 match *self {
116 Provenance::Base => false,
117 Provenance::Rule {
118 rule_id,
119 ref premises,
120 } => {
121 for (col, premise) in premises.iter().enumerate() {
122 if f(rule_id, col) == pred_id {
123 match *premise {
124 MergeRef::MetaId(_) => continue,
125 MergeRef::FactIds(ref fids) => if fids.contains(&fid) {
126 return true;
127 },
128 }
129 }
130 }
131 false
132 }
133 }
134 }
135 fn uses_mid<F: Fn(usize, usize) -> usize>(&self, pred_id: usize, mid: MetaId, f: F) -> bool {
136 match *self {
137 Provenance::Base => false,
138 Provenance::Rule {
139 rule_id,
140 ref premises,
141 } => {
142 for (col, premise) in premises.iter().enumerate() {
143 if f(rule_id, col) == pred_id {
144 match *premise {
145 MergeRef::MetaId(mid2) => if mid == mid2 {
146 return true;
147 },
148 MergeRef::FactIds(_) => continue,
149 }
150 }
151 }
152 false
153 }
154 }
155 }
156}
157
158#[derive(Default, Eq, Ord, Hash, Debug, PartialOrd, PartialEq, Clone)]
159struct AggVal {
160 fids: Vec<usize>,
161 fids_loaded: usize,
162 agg: Option<Vec<usize>>,
163}
164
165impl AggVal {
166 fn new() -> Self {
167 Self::default()
168 }
169 fn partial_load(&mut self, ts: &Tuples) {
170 if self.agg.is_none() {
171 self.agg = Some(permute(&ts.agg_indices, &ts.get(self.fids[0])));
172 self.fids_loaded = 1;
173 }
174 }
175 fn force(&mut self, ts: &Tuples) -> Vec<usize> {
176 self.partial_load(ts);
177 let mut new_agg = Vec::new();
178 for (idx, agg_val) in self.agg.take().unwrap().iter().enumerate() {
179 let mut input = vec![*agg_val];
180 for fid in &self.fids[self.fids_loaded..] {
181 input.push(ts.inner[ts.agg_indices[idx]][*fid]);
182 }
183 new_agg.push(ts.aggs[idx].aggregate(&input));
184 }
185 self.agg = Some(new_agg.clone());
186 self.fids_loaded = self.fids.len();
187 new_agg
188 }
189 fn current(&self) -> Option<Vec<usize>> {
190 self.agg.clone()
191 }
192 fn purge_fid(&mut self, fid: usize) {
193 let mut old_fids = Vec::new();
194 ::std::mem::swap(&mut old_fids, &mut self.fids);
195 self.fids = old_fids.into_iter().filter(|x| *x != fid).collect();
196 self.fids_loaded = 0;
197 }
198 fn add_fid(&mut self, fid: usize) -> bool {
199 if !self.fids.contains(&fid) {
200 self.fids.push(fid);
201 true
202 } else {
203 false
204 }
205 }
206 fn is_empty(&self) -> bool {
207 self.fids.is_empty()
208 }
209}
210
/// Column-oriented storage for one relation.
///
/// It also tracks per-fact provenance, optional per-column aggregation, and
/// incrementally maintained projections and mailboxes.
pub struct Tuples {
    /// Column-major values: `inner[col][fid]`; every column holds one entry per fact.
    inner: Vec<Tuple>,
    /// Hash index over full rows, used by `find`.
    index: HashIndex<[usize]>,
    /// Aggregate state per group, keyed by the values of the `key_indices` columns.
    agg_map: HashMap<Vec<usize>, AggVal>,
    /// Columns that have an aggregator, in ascending order; parallel to `aggs`.
    agg_indices: Vec<usize>,
    /// Columns without an aggregator (the grouping key), in ascending order.
    key_indices: Vec<usize>,
    /// One aggregator per entry of `agg_indices`.
    aggs: Vec<Box<Aggregator + 'static>>,
    /// Registered projections, keyed by their column permutation.
    projections: HashMap<Vec<usize>, Projection>,
    /// Projections whose contents are handed out (and emptied) by `mailbox`.
    mailboxes: Vec<Projection>,
    /// Known derivations of each fact, indexed by fact id.
    provenance: Vec<BTreeSet<Provenance>>,
    /// Meta id -> interned fact-id list.
    meta: Vec<Vec<FactId>>,
    /// Fact-id list -> meta id (inverse of `meta`).
    inv_meta: HashMap<Vec<FactId>, MetaId>,
    /// Group keys whose aggregates must be recomputed by `force`.
    delayed: BTreeSet<Vec<usize>>,
}
230
231struct Rows<'a> {
232 inner: &'a Vec<Vec<usize>>,
233}
234
235impl<'a> CheckIndex<[usize]> for Rows<'a> {
236 fn check_index(&self, index: usize, row: &[usize]) -> bool {
237 assert!(row.len() == self.inner.len());
238 for col in 0..row.len() {
239 if self.inner[col][index] != row[col] {
240 return false;
241 }
242 }
243 return true;
244 }
245}
246
247fn get_unchecked(inner: &Vec<Vec<usize>>, key: FactId) -> Vec<usize> {
248 let mut out = Vec::new();
249 for col in inner {
250 out.push(col[key]);
251 }
252 out
253}
254
255impl Tuples {
256 fn forced(&self) -> bool {
257 self.delayed.is_empty()
258 }
259 pub fn force(&mut self) {
262 let mut all_delayed = BTreeSet::new();
263 ::std::mem::swap(&mut all_delayed, &mut self.delayed);
264 for delayed in all_delayed {
265 let mut agg_val = self.agg_map.remove(&delayed).unwrap();
266 let mut tup = Vec::new();
267 tup.resize(self.arity(), 0);
268 for (i, key_idx) in self.key_indices.iter().enumerate() {
269 tup[*key_idx] = delayed[i];
270 }
271 if let Some(cur) = agg_val.current() {
272 for (i, agg_idx) in self.agg_indices.iter().enumerate() {
273 tup[*agg_idx] = cur[i];
274 }
275
276 for proj in self.projections.values_mut() {
277 proj.remove(&tup)
278 }
279 for mailbox in self.mailboxes.iter_mut() {
280 mailbox.remove(&tup)
281 }
282 }
283
284 if !agg_val.is_empty() {
285 let new = agg_val.force(&self);
286 for (i, agg_idx) in self.agg_indices.iter().enumerate() {
287 tup[*agg_idx] = new[i]
288 }
289
290 let fids = agg_val.fids.clone();
291
292 for proj in self.projections.values_mut() {
293 proj.insert(&tup, fids.clone())
294 }
295 for mailbox in self.mailboxes.iter_mut() {
296 mailbox.insert(&tup, fids.clone())
297 }
298
299 self.agg_map.insert(delayed, agg_val);
300 }
301 }
302 }
303
304 fn integrity(&self) -> bool {
306 if self.arity() == 0 {
308 return false;
309 }
310 for col in self.inner.iter() {
312 if col.len() != self.inner[0].len() {
313 return false;
314 }
315 }
316 if self.inner[0].len() != self.provenance.len() {
319 return false;
320 }
321
322 return true;
323 }
324 pub fn projection(&self, fields: &[usize]) -> &Projection {
327 assert!(self.forced());
328 match self.projections.get(fields) {
329 Some(ref p) => p,
330 None => {
331 panic!("You must register the projection first");
337 }
338 }
339 }
340 pub fn mailbox(&mut self, mailbox: usize) -> Projection {
343 self.force();
344 self.mailboxes[mailbox].take()
345 }
346 pub fn register_projection(&mut self, fields: &[usize]) {
350 if self.projections.contains_key(fields) {
351 return;
352 }
353 let mut projection = Projection::new(fields);
354 for i in 0..self.len() {
355 projection.insert(&self.get(i), vec![i])
356 }
357 self.projections
358 .insert(fields.iter().cloned().collect(), projection);
359 }
360 pub fn register_mailbox(&mut self, fields: &[usize]) -> usize {
367 let mut projection = Projection::new(fields);
369 for i in 0..self.len() {
370 projection.insert(&self.get(i), vec![i])
371 }
372 self.mailboxes.push(projection);
373 self.mailboxes.len() - 1
374 }
375 pub fn new(m_aggs: Vec<Option<Box<Aggregator + 'static>>>) -> Self {
377 let arity = m_aggs.len();
378 assert!(arity > 0);
379 let mut inner = Vec::new();
380 for _ in 0..arity {
381 inner.push(Vec::new());
382 }
383
384 let mut agg_indices = Vec::new();
385 let mut key_indices = Vec::new();
386 let mut aggs = Vec::new();
387
388 for (i, m_agg) in m_aggs.into_iter().enumerate() {
389 match m_agg {
390 Some(agg) => {
391 agg_indices.push(i);
392 aggs.push(agg);
393 }
394 None => key_indices.push(i),
395 }
396 }
397
398 Tuples {
399 inner: inner,
400 index: HashIndex::new(),
401 projections: HashMap::new(),
402 mailboxes: Vec::new(),
403 provenance: Vec::new(),
404 meta: Vec::new(),
405 inv_meta: HashMap::new(),
406 aggs: aggs,
407 agg_map: HashMap::new(),
408 agg_indices: agg_indices,
409 key_indices: key_indices,
410 delayed: BTreeSet::new(),
411 }
412 }
413 pub fn arity(&self) -> usize {
415 self.inner.len()
416 }
417 pub fn len(&self) -> usize {
419 debug_assert!(self.integrity());
420 self.inner[0].len()
421 }
422 pub fn find(&self, needle: &[usize]) -> Option<usize> {
425 assert_eq!(needle.len(), self.arity());
426 debug_assert!(self.integrity());
427 self.index.find(needle, &Rows { inner: &self.inner })
428 }
429 pub fn get(&self, key: usize) -> Vec<usize> {
431 get_unchecked(&self.inner, key)
432 }
433 pub fn get_provenance(&self, key: usize) -> &BTreeSet<Provenance> {
435 &self.provenance[key]
436 }
437 pub fn get_meta(&self, mid: MetaId) -> Vec<FactId> {
439 self.meta[mid].clone()
440 }
441 pub fn make_meta(&mut self, fids: &[FactId]) -> MetaId {
443 match self.inv_meta.entry(fids.to_vec()) {
444 hash_map::Entry::Occupied(oe) => *oe.get(),
445 hash_map::Entry::Vacant(ve) => {
446 let key = self.meta.len();
447 self.meta.push(fids.to_vec());
448 ve.insert(key);
449 key
450 }
451 }
452 }
453 pub fn purge_mid_prov<F: Fn(usize, usize) -> usize>(
455 &mut self,
456 fid: FactId,
457 pred_id: usize,
458 dep_mid: MetaId,
459 f: F,
460 ) -> (Option<FactId>, Option<MetaId>) {
461 self.provenance[fid] = self.provenance[fid]
462 .iter()
463 .cloned()
464 .filter(|p| !p.uses_mid(pred_id, dep_mid, &f))
465 .collect();
466 if self.provenance[fid].is_empty() {
467 (Some(fid), self.purge(fid))
468 } else {
469 (None, None)
470 }
471 }
472 pub fn purge_fid_prov<F: Fn(usize, usize) -> usize>(
474 &mut self,
475 fid: FactId,
476 pred_id: usize,
477 dep_fid: FactId,
478 f: F,
479 ) -> (Option<FactId>, Option<MetaId>) {
480 self.provenance[fid] = self.provenance[fid]
481 .iter()
482 .cloned()
483 .filter(|p| !p.uses_fid(pred_id, dep_fid, &f))
484 .collect();
485 if self.provenance[fid].is_empty() {
486 (Some(fid), self.purge(fid))
487 } else {
488 (None, None)
489 }
490 }
491 fn purge(&mut self, fid: FactId) -> Option<MetaId> {
495 let vals = self.get(fid);
496 let key_tuple = permute(&self.key_indices, &vals);
497 match self.agg_map.entry(key_tuple) {
498 hash_map::Entry::Occupied(mut oe) => {
499 assert!(oe.get().fids.contains(&fid));
500 let old_fids = oe.get().fids.clone();
501
502 self.delayed.insert(oe.key().clone());
503 oe.get_mut().purge_fid(fid);
504 self.inv_meta.get(&old_fids).map(|x| x.clone())
505 }
506 hash_map::Entry::Vacant(_) => panic!("Purged a fact not in the aggmap"),
507 }
508 }
509 pub fn insert(&mut self, val: &[usize], p: Provenance) -> (FactId, bool, Option<MetaId>) {
514 let key = match self.find(&val) {
515 Some(id) => {
516 self.provenance[id].insert(p);
517 id
518 }
519 None => {
520 assert_eq!(val.len(), self.arity());
521 debug_assert!(self.integrity());
522 let key = self.len();
523 self.index.insert(key, val);
524 let mut ps = BTreeSet::new();
525 ps.insert(p);
526 self.provenance.push(ps);
527 for (col, new_val) in self.inner.iter_mut().zip(val.into_iter()) {
528 col.push(*new_val)
529 }
530 key
531 }
532 };
533
534 let key_tuple = permute(&self.key_indices, &val);
535
536 match self.agg_map.entry(key_tuple) {
537 hash_map::Entry::Occupied(mut oe) => {
538 let mmid = self.inv_meta.get(&oe.get().fids).map(|x| *x);
539 let fresh = oe.get_mut().add_fid(key);
540 if fresh {
541 self.delayed.insert(oe.key().clone());
542 }
543 (key, fresh, mmid)
544 }
545 hash_map::Entry::Vacant(ve) => {
546 let mut agg_val = AggVal::new();
547 agg_val.add_fid(key);
548 self.delayed.insert(ve.key().clone());
549 ve.insert(agg_val.clone());
550 (key, true, None)
551 }
552 }
553 }
554}