mycroft_support/storage/
tuple.rs

//! `tuples` contains structures related to an in-memory tuple-store for `usize` values.
//! It is intended to be used in conjunction with `storage::Data` to provide arbitrary-typed tuple
//! functionality.
4use std::collections::{BTreeMap, BTreeSet, HashMap};
5use std::collections::btree_map;
6use std::collections::hash_map;
7use index::hash::{CheckIndex, HashIndex};
8use join::SkipIterator;
9use aggregator::Aggregator;
10
/// A single row (or one column, depending on context) of `usize` values.
type Tuple = Vec<usize>;

/// Builds a new tuple whose `i`-th entry is `tup[perm[i]]`.
///
/// Panics if any entry of `perm` is out of bounds for `tup`.
fn permute(perm: &[usize], tup: &[usize]) -> Tuple {
    perm.iter().map(|&col| tup[col]).collect()
}
20
21/// A projection is an ordered view of a tuple store, under a specific permutation.
22pub struct Projection {
23    perm: Vec<usize>,
24    inner: BTreeMap<Tuple, Vec<usize>>,
25}
26
27impl Projection {
28    fn new(perm: &[usize]) -> Self {
29        Self {
30            perm: perm.iter().cloned().collect(),
31            inner: BTreeMap::new(),
32        }
33    }
34    fn take(&mut self) -> Self {
35        let mut out_inner = BTreeMap::new();
36        ::std::mem::swap(&mut self.inner, &mut out_inner);
37        Self {
38            perm: self.perm.clone(),
39            inner: out_inner,
40        }
41    }
42    fn insert(&mut self, tup: &[usize], fids: Vec<usize>) {
43        self.inner.insert(permute(&self.perm, &tup), fids);
44    }
45    fn remove(&mut self, tup: &[usize]) {
46        self.inner.remove(&permute(&self.perm, &tup));
47    }
48    fn arity(&self) -> usize {
49        self.perm.len()
50    }
51    fn len(&self) -> usize {
52        self.inner.len()
53    }
54    /// Creates a `SkipIterator` for the `Projection`
55    pub fn skip_iter<'a>(&'a self) -> ProjectionIter<'a> {
56        ProjectionIter {
57            proj: self,
58            iter: self.inner.range(vec![]..),
59        }
60    }
61}
62
63/// Iterator over a projection implementing the `SkipIterator` interface
64pub struct ProjectionIter<'a> {
65    proj: &'a Projection,
66    iter: btree_map::Range<'a, Tuple, Vec<usize>>,
67}
68
69impl<'a> SkipIterator for ProjectionIter<'a> {
70    fn skip(&mut self, tup: Tuple) {
71        self.iter = self.proj.inner.range(tup..);
72    }
73    fn next(&mut self) -> Option<(Tuple, Vec<usize>)> {
74        self.iter.next().map(|(t, f)| (t.clone(), f.clone()))
75    }
76    fn arity(&self) -> usize {
77        self.proj.arity()
78    }
79    fn len(&self) -> usize {
80        self.proj.len()
81    }
82}
83
/// Key type for referencing a fact
pub type FactId = usize;
/// Key type for referencing a group of facts in an aggregate
pub type MetaId = usize;

/// Describes how a merged result was produced: either as a simple aggregate, listing its facts
/// through the `FactIds` constructor, or as a circumscribed aggregate named by a `MetaId`.
///
/// Variant order is significant: the derived ordering sorts every `MetaId` before every
/// `FactIds`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum MergeRef {
    /// Circumscribed fact grouping
    MetaId(MetaId),
    /// Aggregate fact grouping (singleton for non-aggregate)
    FactIds(Vec<FactId>),
}
98
99/// How we know a fact to be true
100#[derive(Ord, Eq, Debug, PartialOrd, PartialEq, Clone)]
101pub enum Provenance {
102    /// Part of the IDB, e.g. user defined
103    Base,
104    /// The named rule derived the fact using the provided premises
105    Rule {
106        /// Index of rule in alphabetical order
107        rule_id: usize,
108        /// Which facts were used - which tuplestore to look up from depends on the rule
109        premises: Vec<MergeRef>,
110    },
111}
112
113impl Provenance {
114    fn uses_fid<F: Fn(usize, usize) -> usize>(&self, pred_id: usize, fid: FactId, f: F) -> bool {
115        match *self {
116            Provenance::Base => false,
117            Provenance::Rule {
118                rule_id,
119                ref premises,
120            } => {
121                for (col, premise) in premises.iter().enumerate() {
122                    if f(rule_id, col) == pred_id {
123                        match *premise {
124                            MergeRef::MetaId(_) => continue,
125                            MergeRef::FactIds(ref fids) => if fids.contains(&fid) {
126                                return true;
127                            },
128                        }
129                    }
130                }
131                false
132            }
133        }
134    }
135    fn uses_mid<F: Fn(usize, usize) -> usize>(&self, pred_id: usize, mid: MetaId, f: F) -> bool {
136        match *self {
137            Provenance::Base => false,
138            Provenance::Rule {
139                rule_id,
140                ref premises,
141            } => {
142                for (col, premise) in premises.iter().enumerate() {
143                    if f(rule_id, col) == pred_id {
144                        match *premise {
145                            MergeRef::MetaId(mid2) => if mid == mid2 {
146                                return true;
147                            },
148                            MergeRef::FactIds(_) => continue,
149                        }
150                    }
151                }
152                false
153            }
154        }
155    }
156}
157
158#[derive(Default, Eq, Ord, Hash, Debug, PartialOrd, PartialEq, Clone)]
159struct AggVal {
160    fids: Vec<usize>,
161    fids_loaded: usize,
162    agg: Option<Vec<usize>>,
163}
164
165impl AggVal {
166    fn new() -> Self {
167        Self::default()
168    }
169    fn partial_load(&mut self, ts: &Tuples) {
170        if self.agg.is_none() {
171            self.agg = Some(permute(&ts.agg_indices, &ts.get(self.fids[0])));
172            self.fids_loaded = 1;
173        }
174    }
175    fn force(&mut self, ts: &Tuples) -> Vec<usize> {
176        self.partial_load(ts);
177        let mut new_agg = Vec::new();
178        for (idx, agg_val) in self.agg.take().unwrap().iter().enumerate() {
179            let mut input = vec![*agg_val];
180            for fid in &self.fids[self.fids_loaded..] {
181                input.push(ts.inner[ts.agg_indices[idx]][*fid]);
182            }
183            new_agg.push(ts.aggs[idx].aggregate(&input));
184        }
185        self.agg = Some(new_agg.clone());
186        self.fids_loaded = self.fids.len();
187        new_agg
188    }
189    fn current(&self) -> Option<Vec<usize>> {
190        self.agg.clone()
191    }
192    fn purge_fid(&mut self, fid: usize) {
193        let mut old_fids = Vec::new();
194        ::std::mem::swap(&mut old_fids, &mut self.fids);
195        self.fids = old_fids.into_iter().filter(|x| *x != fid).collect();
196        self.fids_loaded = 0;
197    }
198    fn add_fid(&mut self, fid: usize) -> bool {
199        if !self.fids.contains(&fid) {
200            self.fids.push(fid);
201            true
202        } else {
203            false
204        }
205    }
206    fn is_empty(&self) -> bool {
207        self.fids.is_empty()
208    }
209}
210
211/// In-memory Tuple store.
212///
213/// * Stores exactly one copy of each tuple
214/// * Allows for indexes on projections of the tuple (projections)
215/// * Allows for differential indexes on projections of the tuple (mailboxes)
216pub struct Tuples {
217    inner: Vec<Tuple>,
218    index: HashIndex<[usize]>,
219    agg_map: HashMap<Vec<usize>, AggVal>,
220    agg_indices: Vec<usize>,
221    key_indices: Vec<usize>,
222    aggs: Vec<Box<Aggregator + 'static>>,
223    projections: HashMap<Vec<usize>, Projection>,
224    mailboxes: Vec<Projection>,
225    provenance: Vec<BTreeSet<Provenance>>,
226    meta: Vec<Vec<FactId>>,
227    inv_meta: HashMap<Vec<FactId>, MetaId>,
228    delayed: BTreeSet<Vec<usize>>,
229}
230
231struct Rows<'a> {
232    inner: &'a Vec<Vec<usize>>,
233}
234
235impl<'a> CheckIndex<[usize]> for Rows<'a> {
236    fn check_index(&self, index: usize, row: &[usize]) -> bool {
237        assert!(row.len() == self.inner.len());
238        for col in 0..row.len() {
239            if self.inner[col][index] != row[col] {
240                return false;
241            }
242        }
243        return true;
244    }
245}
246
247fn get_unchecked(inner: &Vec<Vec<usize>>, key: FactId) -> Vec<usize> {
248    let mut out = Vec::new();
249    for col in inner {
250        out.push(col[key]);
251    }
252    out
253}
254
255impl Tuples {
256    fn forced(&self) -> bool {
257        self.delayed.is_empty()
258    }
259    /// Take all the delayed tuples and update the indices.
260    /// Must be called prior to requesting a projection
261    pub fn force(&mut self) {
262        let mut all_delayed = BTreeSet::new();
263        ::std::mem::swap(&mut all_delayed, &mut self.delayed);
264        for delayed in all_delayed {
265            let mut agg_val = self.agg_map.remove(&delayed).unwrap();
266            let mut tup = Vec::new();
267            tup.resize(self.arity(), 0);
268            for (i, key_idx) in self.key_indices.iter().enumerate() {
269                tup[*key_idx] = delayed[i];
270            }
271            if let Some(cur) = agg_val.current() {
272                for (i, agg_idx) in self.agg_indices.iter().enumerate() {
273                    tup[*agg_idx] = cur[i];
274                }
275
276                for proj in self.projections.values_mut() {
277                    proj.remove(&tup)
278                }
279                for mailbox in self.mailboxes.iter_mut() {
280                    mailbox.remove(&tup)
281                }
282            }
283
284            if !agg_val.is_empty() {
285                let new = agg_val.force(&self);
286                for (i, agg_idx) in self.agg_indices.iter().enumerate() {
287                    tup[*agg_idx] = new[i]
288                }
289
290                let fids = agg_val.fids.clone();
291
292                for proj in self.projections.values_mut() {
293                    proj.insert(&tup, fids.clone())
294                }
295                for mailbox in self.mailboxes.iter_mut() {
296                    mailbox.insert(&tup, fids.clone())
297                }
298
299                self.agg_map.insert(delayed, agg_val);
300            }
301        }
302    }
303
304    // Basic integrity check
305    fn integrity(&self) -> bool {
306        // We have non-zero arity
307        if self.arity() == 0 {
308            return false;
309        }
310        // All our columns are the same length.
311        for col in self.inner.iter() {
312            if col.len() != self.inner[0].len() {
313                return false;
314            }
315        }
316        // We have provenance for every tuple
317        // This makes integrity() slow, so it must only be used inside debug_assert!
318        if self.inner[0].len() != self.provenance.len() {
319            return false;
320        }
321
322        return true;
323    }
324    /// Acquires a **previously registered** projection for the permutation provided. If you did not
325    /// register the projection, an assertion will trip.
326    pub fn projection(&self, fields: &[usize]) -> &Projection {
327        assert!(self.forced());
328        match self.projections.get(fields) {
329            Some(ref p) => p,
330            None => {
331                // We can't just register the projection for them here, because if we do, we'll be
332                // borrowed mutably, which prevents multiple indices from being accessed
333                // simultaneously which is needed for self joins.
334                // You really should want to create all projections at the beginning of the program
335                // anyways, so this isn't that big of a hinderance.
336                panic!("You must register the projection first");
337            }
338        }
339    }
340    /// Acquires the index at a previously registered mailbox, emptying it out and returning the
341    /// projection that was there.
342    pub fn mailbox(&mut self, mailbox: usize) -> Projection {
343        self.force();
344        self.mailboxes[mailbox].take()
345    }
346    /// Requests that a projection be made available for a given permutation. This is essentially
347    /// analogous to asking a database to produce an index on a specific order or fields.
348    /// Multiple requests for the same index ordering are idempotent.
349    pub fn register_projection(&mut self, fields: &[usize]) {
350        if self.projections.contains_key(fields) {
351            return;
352        }
353        let mut projection = Projection::new(fields);
354        for i in 0..self.len() {
355            projection.insert(&self.get(i), vec![i])
356        }
357        self.projections
358            .insert(fields.iter().cloned().collect(), projection);
359    }
360    /// Requests that a mailbox be created for a given permutation, and returns the mailbox ID.
361    /// A mailbox is basically an index for which only values added since you last checked it are
362    /// present.
363    /// Unlike `register_projection`, `register_mailbox` is **not** idempotent, since multiple
364    /// clients may want to listen on the same order of fields. It will return a new mailbox ID
365    /// every time.
366    pub fn register_mailbox(&mut self, fields: &[usize]) -> usize {
367        // TODO: dedup between this and register_projection
368        let mut projection = Projection::new(fields);
369        for i in 0..self.len() {
370            projection.insert(&self.get(i), vec![i])
371        }
372        self.mailboxes.push(projection);
373        self.mailboxes.len() - 1
374    }
375    /// Constructs a new `Tuples` tuplestore of the provided arity.
376    pub fn new(m_aggs: Vec<Option<Box<Aggregator + 'static>>>) -> Self {
377        let arity = m_aggs.len();
378        assert!(arity > 0);
379        let mut inner = Vec::new();
380        for _ in 0..arity {
381            inner.push(Vec::new());
382        }
383
384        let mut agg_indices = Vec::new();
385        let mut key_indices = Vec::new();
386        let mut aggs = Vec::new();
387
388        for (i, m_agg) in m_aggs.into_iter().enumerate() {
389            match m_agg {
390                Some(agg) => {
391                    agg_indices.push(i);
392                    aggs.push(agg);
393                }
394                None => key_indices.push(i),
395            }
396        }
397
398        Tuples {
399            inner: inner,
400            index: HashIndex::new(),
401            projections: HashMap::new(),
402            mailboxes: Vec::new(),
403            provenance: Vec::new(),
404            meta: Vec::new(),
405            inv_meta: HashMap::new(),
406            aggs: aggs,
407            agg_map: HashMap::new(),
408            agg_indices: agg_indices,
409            key_indices: key_indices,
410            delayed: BTreeSet::new(),
411        }
412    }
413    /// Returns the arity of the tuples stored
414    pub fn arity(&self) -> usize {
415        self.inner.len()
416    }
417    /// Returns the number of elements stored
418    pub fn len(&self) -> usize {
419        debug_assert!(self.integrity());
420        self.inner[0].len()
421    }
422    /// Provides the ID for a given tuple if it is present
423    /// The provided needle must have an arity equal to the `Tuples` object
424    pub fn find(&self, needle: &[usize]) -> Option<usize> {
425        assert_eq!(needle.len(), self.arity());
426        debug_assert!(self.integrity());
427        self.index.find(needle, &Rows { inner: &self.inner })
428    }
429    /// Get the fact referenced by the provided key
430    pub fn get(&self, key: usize) -> Vec<usize> {
431        get_unchecked(&self.inner, key)
432    }
433    /// Return the set of ways this tuple was derived
434    pub fn get_provenance(&self, key: usize) -> &BTreeSet<Provenance> {
435        &self.provenance[key]
436    }
437    /// Gets the list of fact IDs making up a particular meta-fact
438    pub fn get_meta(&self, mid: MetaId) -> Vec<FactId> {
439        self.meta[mid].clone()
440    }
441    /// Creates or references a `MetaId` for a circumscription over facts in this tuplestore
442    pub fn make_meta(&mut self, fids: &[FactId]) -> MetaId {
443        match self.inv_meta.entry(fids.to_vec()) {
444            hash_map::Entry::Occupied(oe) => *oe.get(),
445            hash_map::Entry::Vacant(ve) => {
446                let key = self.meta.len();
447                self.meta.push(fids.to_vec());
448                ve.insert(key);
449                key
450            }
451        }
452    }
453    /// Cleanses a fact's derivations of references to a MetaId
454    pub fn purge_mid_prov<F: Fn(usize, usize) -> usize>(
455        &mut self,
456        fid: FactId,
457        pred_id: usize,
458        dep_mid: MetaId,
459        f: F,
460    ) -> (Option<FactId>, Option<MetaId>) {
461        self.provenance[fid] = self.provenance[fid]
462            .iter()
463            .cloned()
464            .filter(|p| !p.uses_mid(pred_id, dep_mid, &f))
465            .collect();
466        if self.provenance[fid].is_empty() {
467            (Some(fid), self.purge(fid))
468        } else {
469            (None, None)
470        }
471    }
472    /// Cleanses a fact's derivations of references to a FactId
473    pub fn purge_fid_prov<F: Fn(usize, usize) -> usize>(
474        &mut self,
475        fid: FactId,
476        pred_id: usize,
477        dep_fid: FactId,
478        f: F,
479    ) -> (Option<FactId>, Option<MetaId>) {
480        self.provenance[fid] = self.provenance[fid]
481            .iter()
482            .cloned()
483            .filter(|p| !p.uses_fid(pred_id, dep_fid, &f))
484            .collect();
485        if self.provenance[fid].is_empty() {
486            (Some(fid), self.purge(fid))
487        } else {
488            (None, None)
489        }
490    }
491    /// Removes a tuple from all indices
492    /// Does not actually remove it from the tuple store, it'll just stop showing up in
493    /// projections. Returns a MetaId if it broke one
494    fn purge(&mut self, fid: FactId) -> Option<MetaId> {
495        let vals = self.get(fid);
496        let key_tuple = permute(&self.key_indices, &vals);
497        match self.agg_map.entry(key_tuple) {
498            hash_map::Entry::Occupied(mut oe) => {
499                assert!(oe.get().fids.contains(&fid));
500                let old_fids = oe.get().fids.clone();
501
502                self.delayed.insert(oe.key().clone());
503                oe.get_mut().purge_fid(fid);
504                self.inv_meta.get(&old_fids).map(|x| x.clone())
505            }
506            hash_map::Entry::Vacant(_) => panic!("Purged a fact not in the aggmap"),
507        }
508    }
509    /// Adds a new element to the tuple store.
510    /// The arity of the provided value must equal the arity of the tuple store.
511    /// The returned value is a tuple of the key, whether the value was new (true for new),
512    /// and any MetaId that got broken by the insert
513    pub fn insert(&mut self, val: &[usize], p: Provenance) -> (FactId, bool, Option<MetaId>) {
514        let key = match self.find(&val) {
515            Some(id) => {
516                self.provenance[id].insert(p);
517                id
518            }
519            None => {
520                assert_eq!(val.len(), self.arity());
521                debug_assert!(self.integrity());
522                let key = self.len();
523                self.index.insert(key, val);
524                let mut ps = BTreeSet::new();
525                ps.insert(p);
526                self.provenance.push(ps);
527                for (col, new_val) in self.inner.iter_mut().zip(val.into_iter()) {
528                    col.push(*new_val)
529                }
530                key
531            }
532        };
533
534        let key_tuple = permute(&self.key_indices, &val);
535
536        match self.agg_map.entry(key_tuple) {
537            hash_map::Entry::Occupied(mut oe) => {
538                let mmid = self.inv_meta.get(&oe.get().fids).map(|x| *x);
539                let fresh = oe.get_mut().add_fid(key);
540                if fresh {
541                    self.delayed.insert(oe.key().clone());
542                }
543                (key, fresh, mmid)
544            }
545            hash_map::Entry::Vacant(ve) => {
546                let mut agg_val = AggVal::new();
547                agg_val.add_fid(key);
548                self.delayed.insert(ve.key().clone());
549                ve.insert(agg_val.clone());
550                (key, true, None)
551            }
552        }
553    }
554}