Skip to main content

rustdb/
table.rs

1use crate::*;
2
/// Table Index: a sorted file of index records together with the table columns it covers.
pub struct Index {
    /// Sorted file holding the index records.
    pub file: Rc<SortedFile>,
    /// Column numbers ( positions in the table's column list ) that form the index key, in key order.
    pub cols: Rc<Vec<usize>>,
    /// Index id.
    pub id: i64,
}
12
/// List of indexes for a table. Each index has a file and a list of column numbers.
/// Positions in this list are the index numbers accepted by the [Table] index methods.
pub type IxList = Vec<Index>;
15
/// Save or Rollback: the operation that [Table::save] forwards to the table file and every index file.
#[derive(PartialEq, Eq, PartialOrd, Clone, Copy)]
pub enum SaveOp {
    /// Save.
    Save,
    /// Rollback.
    RollBack,
}
24
/// Database base table. Underlying file, type information about the columns and id allocation.
pub struct Table {
    /// Underlying SortedFile holding the table records ( constructed with an 8 byte key ).
    pub file: Rc<SortedFile>,

    /// Type information about the columns.
    pub info: Rc<ColInfo>,

    /// List of indexes. ( Maybe could eliminate the RefCell )
    pub ixlist: RefCell<IxList>,

    /// Table id in sys.Table.
    pub id: i64,

    /// Row id allocator: the next id to hand out.
    /// When `None`, the value is fetched with `sys::get_id_gen` on first use.
    pub id_gen: Cell<Option<i64>>,

    /// Row id allocator has changed.
    pub id_gen_dirty: Cell<bool>,
}
45
46impl Table {
47    /// Construct a table with specified info.
48    pub fn new(id: i64, root_page: u64, id_gen: i64, info: Rc<ColInfo>) -> Rc<Table> {
49        let rec_size = info.total;
50        let key_size = 8;
51        let file = Rc::new(SortedFile::new(rec_size, key_size, root_page));
52        let ixlist = RefCell::new(Vec::new());
53        Rc::new(Table {
54            id,
55            file,
56            info,
57            ixlist,
58            id_gen: Cell::new(Some(id_gen)),
59            id_gen_dirty: Cell::new(false),
60        })
61    }
62
63    /// Save or Rollback underlying files.
64    pub fn save(&self, db: &DB, op: SaveOp) {
65        self.file.save(db, op);
66        for ix in &*self.ixlist.borrow() {
67            ix.file.save(db, op);
68        }
69    }
70
71    /// Drop the underlying file storage ( the table is not useable after this ).
72    pub fn free_pages(&self, db: &DB) {
73        let row = self.row();
74        self.file.free_pages(db, &row);
75        for ix in &*self.ixlist.borrow() {
76            let ixr = IndexRow::new(self, ix.cols.clone(), &row);
77            ix.file.free_pages(db, &ixr);
78        }
79    }
80
81    /// Insert specified row into the table.
82    pub fn insert(&self, db: &DB, row: &mut Row) {
83        row.encode(db); // Calculate codes for Binary and String values.
84        self.file.insert(db, row);
85        // Update any indexes.
86        for ix in &*self.ixlist.borrow() {
87            let ixr = IndexRow::new(self, ix.cols.clone(), row);
88            ix.file.insert(db, &ixr);
89        }
90    }
91
92    /// Remove specified loaded row from the table.
93    pub fn remove(&self, db: &DB, row: &Row) {
94        self.file.remove(db, row);
95        for ix in &*self.ixlist.borrow() {
96            let ixr = IndexRow::new(self, ix.cols.clone(), row);
97            ix.file.remove(db, &ixr);
98        }
99        row.delcodes(db); // Deletes codes for Binary and String values.
100    }
101
102    /// Look for indexed table expression based on supplied WHERE expression (we).
103    pub fn index_from(
104        self: &Rc<Table>,
105        b: &Block,
106        we: &mut Expr,
107    ) -> (Option<CExpPtr<bool>>, Option<CTableExpression>) {
108        let mut kc = SmallSet::default(); // Set of known columns.
109        get_known_cols(we, &mut kc);
110
111        let list = &*self.ixlist.borrow();
112
113        let mut best_match = 0;
114        let mut best_index = 0;
115        for (index, ix) in list.iter().enumerate() {
116            let m = covered(&ix.cols, &kc);
117            if m > best_match {
118                best_match = m;
119                best_index = index;
120            }
121        }
122        if best_match > 0 {
123            // Get the key values for the chosen index.
124            let clist = &list[best_index].cols;
125            let mut cols = SmallSet::default();
126            for col in clist.iter().take(best_match) {
127                cols.insert(*col);
128            }
129            let mut kmap = BTreeMap::new();
130            let cwe = get_keys(b, we, &mut cols, &mut kmap);
131            let keys = clist
132                .iter()
133                .take(best_match)
134                .map(|col| kmap.remove(col).unwrap())
135                .collect();
136            return (
137                cwe,
138                Some(CTableExpression::IxGet(self.clone(), keys, best_index)),
139            );
140        }
141
142        // ToDo: check for mirror expression, AND conditions, also Id = x OR Id = y ...  Id in (....) etc.
143        if let ExprIs::Binary(op, e1, e2) = &mut we.exp {
144            if *op == Token::Equal && e2.is_constant {
145                if let ExprIs::ColName(_) = &e1.exp {
146                    if e1.col == usize::MAX
147                    // Id column.
148                    {
149                        return (
150                            None,
151                            Some(CTableExpression::IdGet(self.clone(), c_int(b, e2))),
152                        );
153                    }
154                }
155            }
156        }
157        (Some(c_bool(b, we)), None)
158    }
159
160    /// Get record with specified id.
161    pub fn id_get(&self, db: &DB, id: u64) -> Option<(PagePtr, usize)> {
162        self.file.get(db, &Id { id })
163    }
164
165    /// Get record with matching key, using specified index.
166    pub fn ix_get(&self, db: &DB, key: Vec<Value>, index: usize) -> Option<(PagePtr, usize)> {
167        let list = &*self.ixlist.borrow();
168        let ix = &list[index];
169        let key = IndexKey::new(self, ix.cols.clone(), key, Ordering::Equal);
170        if let Some((pp, off)) = ix.file.get(db, &key) {
171            let p = pp.borrow();
172            let id = util::getu64(&p.data, off);
173            let row = Id { id };
174            return self.file.get(db, &row);
175        }
176        None
177    }
178
179    /// Scan all the records in the table.
180    pub fn scan(&self, db: &DB) -> Asc {
181        self.file.asc(db, Box::new(Zero {}))
182    }
183
184    /// Get a single record with specified id.
185    pub fn scan_id(self: &Rc<Table>, db: &DB, id: i64) -> IdScan {
186        IdScan {
187            table: self.clone(),
188            db: db.clone(),
189            id,
190            done: false,
191        }
192    }
193
194    /// Get records with matching key.
195    pub fn scan_key(self: &Rc<Table>, db: &DB, key: Value, index: usize) -> IndexScan {
196        let keys = vec![key];
197        self.scan_keys(db, keys, index)
198    }
199
200    /// Get records with matching keys.
201    pub fn scan_keys(self: &Rc<Table>, db: &DB, keys: Vec<Value>, index: usize) -> IndexScan {
202        let ixlist = &*self.ixlist.borrow();
203        let ix = &ixlist[index];
204        let ikey = IndexKey::new(self, ix.cols.clone(), keys.clone(), Ordering::Less);
205        let ixa = ix.file.asc(db, Box::new(ikey));
206        IndexScan {
207            ixa,
208            keys,
209            cols: ix.cols.clone(),
210            table: self.clone(),
211            db: db.clone(),
212        }
213    }
214
215    /// Add the specified index to the table.
216    pub fn add_index(&self, root: u64, cols: Vec<usize>, id: i64) {
217        let key_size = self.info.index_key_size(&cols) + 8;
218        let file = Rc::new(SortedFile::new(key_size, key_size, root));
219        let list = &mut self.ixlist.borrow_mut();
220        list.push(Index {
221            file,
222            cols: Rc::new(cols),
223            id,
224        });
225    }
226
227    /// Delete the specified index.
228    pub fn delete_index(&self, db: &DB, ix: usize) {
229        let ixlist = &*self.ixlist.borrow();
230        let ix = &ixlist[ix];
231        let row = self.row();
232        let ixr = IndexRow::new(self, ix.cols.clone(), &row);
233        ix.file.free_pages(db, &ixr);
234    }
235
236    /// Initialises last index ( called just after add_index ).
237    pub fn init_index(&self, db: &DB) {
238        let mut row = self.row();
239        let ixlist = self.ixlist.borrow();
240        let ix = ixlist.last().unwrap();
241
242        for (pp, off) in self.scan(db) {
243            let p = pp.borrow();
244            row.load(db, &p.data[off..]);
245            let ixr = IndexRow::new(self, ix.cols.clone(), &row);
246            ix.file.insert(db, &ixr);
247        }
248    }
249
250    /// Utility for accessing fields by number.
251    pub fn access<'d, 't>(&'t self, p: &'d Page, off: usize) -> Access<'d, 't> {
252        Access::<'d, 't> {
253            data: &p.data[off..],
254            info: &self.info,
255        }
256    }
257
258    /// Utility for updating fields by number.
259    pub fn write_access<'d, 't>(&'t self, p: &'d mut Page, off: usize) -> WriteAccess<'d, 't> {
260        WriteAccess::<'d, 't> {
261            data: &mut p.data[off..],
262            info: &self.info,
263        }
264    }
265
266    /// Construct a row for the table.
267    pub fn row(&self) -> Row {
268        Row::new(self.info.clone())
269    }
270
271    /// Get id generator.
272    pub fn get_id_gen(&self, db: &DB) -> i64 {
273        if let Some(result) = self.id_gen.get() {
274            result
275        } else {
276            let result = sys::get_id_gen(db, self.id as u64);
277            self.id_gen.set(Some(result));
278            result
279        }
280    }
281
282    /// Allocate row id.
283    pub fn alloc_id(&self, db: &DB) -> i64 {
284        let result = self.get_id_gen(db);
285        self.id_gen.set(Some(result + 1));
286        self.id_gen_dirty.set(true);
287        result
288    }
289
290    /// Update id allocator if supplied row id exceeds current value.
291    pub fn id_allocated(&self, db: &DB, id: i64) {
292        if id >= self.get_id_gen(db) {
293            self.id_gen.set(Some(id + 1));
294            self.id_gen_dirty.set(true);
295        }
296    }
297
298    #[cfg(feature = "pack")]
299    /// Repack the file pages.
300    pub fn repack(&self, db: &DB, k: usize) -> i64 {
301        let row = self.row();
302        if k == 0 {
303            self.file.repack(db, &row)
304        } else {
305            let list = &*self.ixlist.borrow();
306            if k <= list.len() {
307                let ix = &list[k - 1];
308                let ixr = IndexRow::new(self, ix.cols.clone(), &row);
309                ix.file.repack(db, &ixr)
310            } else {
311                -1
312            }
313        }
314    }
315
316    #[cfg(feature = "verify")]
317    /// Add the all the pages used by the table to the specified set.
318    pub fn get_used(&self, db: &DB, to: &mut HashSet<u64>) {
319        self.file.get_used(db, to);
320        for ix in &*self.ixlist.borrow() {
321            ix.file.get_used(db, to);
322        }
323    }
324} // end impl Table.
325
/// Dummy record for iterating over whole table ( used as the start record by [Table::scan] ).
struct Zero {}
328
impl Record for Zero {
    /// Always returns `Less`, whatever the stored data is.
    fn compare(&self, _db: &DB, _data: &[u8]) -> Ordering {
        Ordering::Less
    }
}
335
/// Helper struct to access byte data using ColInfo ( returned by [Table::access] method ).
pub struct Access<'d, 'i> {
    /// Record bytes, starting at the record's Id.
    data: &'d [u8],
    /// Column types and offsets used to locate fields in `data`.
    info: &'i ColInfo,
}
341
342impl<'d, 'i> Access<'d, 'i> {
343    /// Extract int from byte data for specified column.
344    pub fn int(&self, colnum: usize) -> i64 {
345        debug_assert!(data_kind(self.info.typ[colnum]) == DataKind::Int);
346        util::iget(self.data, self.info.off[colnum], self.info.siz(colnum))
347    }
348
349    /// Extract string from byte data for specified column.
350    pub fn str(&self, db: &DB, colnum: usize) -> String {
351        debug_assert!(data_kind(self.info.typ[colnum]) == DataKind::String);
352        let off = self.info.off[colnum];
353        let size = self.info.siz(colnum);
354        let bytes = get_bytes(db, &self.data[off..], size).0;
355        String::from_utf8(bytes).unwrap()
356    }
357
358    /// Extract binary from byte data for specified column.
359    pub fn bin(&self, db: &DB, colnum: usize) -> Vec<u8> {
360        debug_assert!(data_kind(self.info.typ[colnum]) == DataKind::Binary);
361        let off = self.info.off[colnum];
362        let size = self.info.siz(colnum);
363        get_bytes(db, &self.data[off..], size).0
364    }
365
366    /// Extract Id from byte data.
367    pub fn id(&self) -> u64 {
368        util::getu64(self.data, 0)
369    }
370
371    /// Extract f32 from byte data for specified column.
372    pub fn f32(&self, colnum: usize) -> f32 {
373        debug_assert!(self.info.typ[colnum] == FLOAT);
374        util::getf32(self.data, self.info.off[colnum])
375    }
376
377    /// Extract f64 from byte data for specified column.
378    pub fn f64(&self, colnum: usize) -> f64 {
379        debug_assert!(self.info.typ[colnum] == DOUBLE);
380        util::getf64(self.data, self.info.off[colnum])
381    }
382}
383
/// Helper struct to read/write byte data using ColInfo ( returned by [Table::write_access] method ).
pub struct WriteAccess<'d, 'i> {
    /// Mutable record bytes, starting at the record's Id.
    data: &'d mut [u8],
    /// Column types and offsets used to locate fields in `data`.
    info: &'i ColInfo,
}
389
390/// Helper struct to read/write byte data using ColInfo ( returned by [Table::write_access] method ).
391impl<'d, 'i> WriteAccess<'d, 'i> {
392    /// Save int to byte data.
393    pub fn set_int(&mut self, colnum: usize, val: i64) {
394        util::iset(self.data, self.info.off[colnum], val, self.info.siz(colnum));
395    }
396
397    /// Extract int from byte data for column number colnum.
398    pub fn int(&self, colnum: usize) -> i64 {
399        util::get(self.data, self.info.off[colnum], self.info.siz(colnum)) as i64
400    }
401
402    /// Extract Id from byte data.
403    pub fn id(&self) -> u64 {
404        util::getu64(self.data, 0)
405    }
406
407    /* ToDo... add more methods as appropriate */
408}
409
/// Table name, column names/types and other calculated values for a table.
#[non_exhaustive]
pub struct ColInfo {
    /// Table name.
    pub name: ObjRef,
    /// Map from column name to column number.
    pub colmap: BTreeMap<String, usize>,
    /// Column names, indexed by column number.
    pub colnames: Vec<String>,
    /// Column types, indexed by column number.
    pub typ: Vec<DataType>,
    /// Column offsets within a record, indexed by column number ( the first column is at offset 8 ).
    pub off: Vec<usize>,
    /// Total data size, including Id ( 8 for a table with no columns ).
    pub total: usize,
}
426
427impl ColInfo {
428    /// Construct an empty ColInfo struct with no columns.
429    pub fn empty(name: ObjRef) -> Self {
430        ColInfo {
431            name,
432            colmap: BTreeMap::new(),
433            typ: Vec::new(),
434            colnames: Vec::new(),
435            off: Vec::new(),
436            total: 8,
437        }
438    }
439
440    /// Construct a new ColInfo struct using supplied list of column names and types.
441    pub fn new(name: ObjRef, ct: &[(&str, DataType)]) -> Self {
442        let mut result = Self::empty(name);
443        for (n, t) in ct {
444            result.add((*n).to_string(), *t);
445        }
446        result
447    }
448
449    /// Add a column. If the column already exists ( an error ) the result is true.
450    pub fn add(&mut self, name: String, typ: DataType) -> bool {
451        if self.colmap.contains_key(&name) {
452            return true;
453        }
454        let cn = self.typ.len();
455        self.typ.push(typ);
456        let size = data_size(typ);
457        self.off.push(self.total);
458        self.total += size;
459        self.colnames.push(name.clone());
460        self.colmap.insert(name, cn);
461        false
462    }
463
464    pub(crate) fn add_altered(&mut self, ci: &ColInfo, cnum: usize, actions: &[AlterCol]) -> bool {
465        let cname = &ci.colnames[cnum];
466        let mut typ = ci.typ[cnum];
467        for act in actions {
468            match act {
469                AlterCol::Drop(name) => {
470                    if name == cname {
471                        return false;
472                    }
473                }
474                AlterCol::Modify(name, dt) => {
475                    if name == cname {
476                        if data_kind(typ) != data_kind(*dt) {
477                            panic!("Cannot change column data kind");
478                        }
479                        typ = *dt;
480                    }
481                }
482                _ => {}
483            }
484        }
485        self.add(cname.clone(), typ);
486        true
487    }
488
489    /// Get a column number from a column name.
490    /// usize::MAX is returned for "Id".
491    pub fn get(&self, name: &str) -> Option<&usize> {
492        if name == "Id" {
493            Some(&usize::MAX)
494        } else {
495            self.colmap.get(name)
496        }
497    }
498
499    /// Get the data size of specified column.
500    fn siz(&self, col: usize) -> usize {
501        data_size(self.typ[col])
502    }
503
504    /// Calculate the total data size for a list of index columns.
505    fn index_key_size(&self, cols: &[usize]) -> usize {
506        cols.iter().map(|cnum| self.siz(*cnum)).sum()
507    }
508} // impl ColInfo
509
/// Index information for creating an index.
#[non_exhaustive]
pub struct IndexInfo {
    /// Table name.
    pub tname: ObjRef,
    /// Index name.
    pub iname: String,
    /// Index columns ( column numbers ).
    pub cols: Vec<usize>,
}
520
/// Row of Values, with type information.
#[derive(Clone)]
#[non_exhaustive]
pub struct Row {
    /// Row Id.
    pub id: i64,
    /// Row values, one per column, in column order.
    pub values: Vec<Value>,
    /// Type information.
    pub info: Rc<ColInfo>,
    /// Codes for variable length ( binary, string ) values.
    /// Empty for a new row; filled by `encode` or `load`.
    pub codes: Vec<Code>,
}
534
535impl Row {
536    /// Construct a new row, values are initialised to defaults.
537    pub fn new(info: Rc<ColInfo>) -> Self {
538        let n = info.typ.len();
539        let mut result = Row {
540            id: 0,
541            values: Vec::with_capacity(n),
542            info,
543            codes: Vec::with_capacity(n),
544        };
545        for t in &result.info.typ {
546            result.values.push(Value::default(*t));
547        }
548        result
549    }
550
551    /// Calculate codes for current row values.
552    pub fn encode(&mut self, db: &DB) {
553        self.codes.clear();
554        for (i, val) in self.values.iter().enumerate() {
555            let size = data_size(self.info.typ[i]);
556            let u = db.encode(val, size);
557            self.codes.push(u);
558        }
559    }
560
561    /// Delete current codes.
562    pub fn delcodes(&self, db: &DB) {
563        for u in &self.codes {
564            if u.id != u64::MAX {
565                db.delcode(*u);
566            }
567        }
568    }
569
570    /// Load the row values and codes from data.
571    pub fn load(&mut self, db: &DB, data: &[u8]) {
572        self.values.clear();
573        self.codes.clear();
574        self.id = util::getu64(data, 0) as i64;
575        let mut off = 8;
576        for typ in &self.info.typ {
577            let (val, code) = Value::load(db, *typ, data, off);
578            self.values.push(val);
579            self.codes.push(code);
580            off += data_size(*typ);
581        }
582    }
583}
584
585impl Record for Row {
586    fn save(&self, data: &mut [u8]) {
587        util::setu64(data, self.id as u64);
588        let t = &self.info;
589        let mut off = 8;
590        for (i, typ) in t.typ.iter().enumerate() {
591            self.values[i].save(*typ, data, off, self.codes[i]);
592            off += data_size(*typ);
593        }
594    }
595
596    fn compare(&self, _db: &DB, data: &[u8]) -> Ordering {
597        let id = util::getu64(data, 0) as i64;
598        self.id.cmp(&id)
599    }
600}
601
/// Row for inserting into an index.
pub struct IndexRow {
    /// Column info of the indexed table.
    tinfo: Rc<ColInfo>,
    /// Column numbers of the indexed columns.
    cols: Rc<Vec<usize>>,
    /// Key values, one per indexed column.
    keys: Vec<Value>,
    /// Codes for the key values.
    codes: Vec<Code>,
    /// Id of the table row.
    rowid: i64,
}
610
611impl IndexRow {
612    // Construct IndexRow from Row.
613    fn new(table: &Table, cols: Rc<Vec<usize>>, row: &Row) -> Self {
614        let n = cols.len();
615        let mut keys = Vec::with_capacity(n);
616        let mut codes = Vec::with_capacity(n);
617        if !row.codes.is_empty() {
618            for c in &*cols {
619                keys.push(row.values[*c].clone());
620                codes.push(row.codes[*c]);
621            }
622        }
623        Self {
624            tinfo: table.info.clone(),
625            cols,
626            rowid: row.id,
627            keys,
628            codes,
629        }
630    }
631
632    // Load IndexRow from data ( note: new codes are computed, as old codes may be deleted ).
633    // Since it's unusual for long strings to be keys, code computation should be rare.
634    fn load(&mut self, db: &DB, data: &[u8]) {
635        self.rowid = util::getu64(data, 0) as i64;
636        let mut off = 8;
637        for col in &*self.cols {
638            let typ = self.tinfo.typ[*col];
639            let val = Value::load(db, typ, data, off).0;
640            let size = data_size(typ);
641            let code = db.encode(&val, size);
642            self.keys.push(val);
643            self.codes.push(code);
644            off += size;
645        }
646    }
647}
648
649impl Record for IndexRow {
650    fn compare(&self, db: &DB, data: &[u8]) -> Ordering {
651        let mut ix = 0;
652        let mut off = 8;
653        loop {
654            let typ = self.tinfo.typ[self.cols[ix]];
655            // Could have special purpose Value method which compares instead of loading to save heap allocations.
656            let val = Value::load(db, typ, data, off).0;
657            let cf = val.cmp(&self.keys[ix]);
658            if cf != Ordering::Equal {
659                return cf;
660            }
661            ix += 1;
662            off += data_size(typ);
663            if ix == self.cols.len() {
664                let rowid = util::getu64(data, 0) as i64;
665                return self.rowid.cmp(&rowid);
666            }
667        }
668    }
669
670    fn save(&self, data: &mut [u8]) {
671        util::setu64(data, self.rowid as u64);
672        let mut off = 8;
673        for (ix, k) in self.keys.iter().enumerate() {
674            let typ = self.tinfo.typ[self.cols[ix]];
675            k.save(typ, data, off, self.codes[ix]);
676            off += data_size(typ);
677        }
678    }
679
680    fn key(&self, db: &DB, data: &[u8]) -> Box<dyn Record> {
681        let n = self.cols.len();
682        let mut result = Box::new(IndexRow {
683            cols: self.cols.clone(),
684            tinfo: self.tinfo.clone(),
685            rowid: 0,
686            keys: Vec::with_capacity(n),
687            codes: Vec::with_capacity(n),
688        });
689        result.load(db, data);
690        result
691    }
692
693    fn drop_key(&self, db: &DB, data: &[u8]) {
694        let mut off = 8;
695        for col in &*self.cols {
696            let typ = self.tinfo.typ[*col];
697            let code = Value::load(db, typ, data, off).1;
698            if code.id != u64::MAX {
699                db.delcode(code);
700            }
701            off += data_size(typ);
702        }
703    }
704}
705
/// Key for searching index.
struct IndexKey {
    /// Information about the indexed table.
    tinfo: Rc<ColInfo>,
    /// List of indexed columns.
    cols: Rc<Vec<usize>>,
    /// Key values ( may be fewer than the number of indexed columns ).
    key: Vec<Value>,
    /// Ordering used if keys compare equal.
    def: Ordering,
}
717
718impl IndexKey {
719    fn new(table: &Table, cols: Rc<Vec<usize>>, key: Vec<Value>, def: Ordering) -> Self {
720        Self {
721            tinfo: table.info.clone(),
722            key,
723            cols,
724            def,
725        }
726    }
727}
728
729impl Record for IndexKey {
730    fn compare(&self, db: &DB, data: &[u8]) -> Ordering {
731        let mut ix = 0;
732        let mut off = 8;
733        loop {
734            if ix == self.key.len() {
735                return self.def;
736            }
737            let typ = self.tinfo.typ[self.cols[ix]];
738            let val = Value::load(db, typ, data, off).0;
739            let cf = val.cmp(&self.key[ix]);
740            if cf != Ordering::Equal {
741                return cf;
742            }
743            ix += 1;
744            off += data_size(typ);
745        }
746    }
747}
748
/// State for fetching records using an index ( returned by [Table::scan_keys] ).
pub struct IndexScan {
    /// Ascending scan of the index file.
    ixa: Asc,
    /// Table the fetched records belong to.
    table: Rc<Table>,
    /// Database handle used to load values and fetch records.
    db: DB,
    /// Column numbers of the indexed columns.
    cols: Rc<Vec<usize>>,
    /// Key values that fetched records must match.
    keys: Vec<Value>,
}
757
758impl IndexScan {
759    fn keys_equal(&self, data: &[u8]) -> bool {
760        let mut off = 8;
761        for (ix, k) in self.keys.iter().enumerate() {
762            let typ = self.table.info.typ[self.cols[ix]];
763            let val = Value::load(&self.db, typ, data, off).0;
764            let cf = val.cmp(k);
765            if cf != Ordering::Equal {
766                return false;
767            }
768            off += data_size(typ);
769        }
770        true
771    }
772}
773
774impl Iterator for IndexScan {
775    type Item = (PagePtr, usize);
776
777    fn next(&mut self) -> Option<<Self as Iterator>::Item> {
778        if let Some((pp, off)) = self.ixa.next() {
779            let p = pp.borrow();
780            let data = &p.data[off..];
781            if !self.keys_equal(data) {
782                return None;
783            }
784            let id = util::getu64(data, 0);
785            return self.table.id_get(&self.db, id);
786        }
787        None
788    }
789}
790
/// State for fetching record with specified id ( returned by [Table::scan_id] ).
pub struct IdScan {
    /// Id of the record to fetch.
    id: i64,
    /// Table to fetch from.
    table: Rc<Table>,
    /// Database handle used to fetch the record.
    db: DB,
    /// Set once the lookup has been made, so the iterator yields at most one record.
    done: bool,
}
798
799impl Iterator for IdScan {
800    type Item = (PagePtr, usize);
801
802    fn next(&mut self) -> Option<<Self as Iterator>::Item> {
803        if self.done {
804            return None;
805        }
806        self.done = true;
807        self.table.id_get(&self.db, self.id as u64)
808    }
809}
810
811/// Gets the list of columns that are known from a WHERE condition.
812fn get_known_cols(we: &Expr, kc: &mut SmallSet) {
813    match &we.exp {
814        ExprIs::Binary(Token::Equal, e1, e2) => {
815            if e2.is_constant {
816                if let ExprIs::ColName(_) = &e1.exp {
817                    kc.insert(e1.col);
818                }
819            } else if e1.is_constant {
820                if let ExprIs::ColName(_) = &e2.exp {
821                    kc.insert(e2.col);
822                }
823            }
824        }
825        ExprIs::Binary(Token::And, e1, e2) => {
826            get_known_cols(e1, kc);
827            get_known_cols(e2, kc);
828        }
829        _ => {}
830    }
831}
832
833/// Count the number of index columns that are known.
834fn covered(clist: &[usize], kc: &SmallSet) -> usize {
835    let mut result = 0;
836    for &c in clist {
837        if !kc.contains(c) {
838            break;
839        }
840        result += 1;
841    }
842    result
843}
844
845/// Get keys. Returns compiled bool expression ( taking into account conditions satisfied by index ).
846fn get_keys(
847    b: &Block,
848    we: &mut Expr,
849    cols: &mut SmallSet,
850    keys: &mut BTreeMap<usize, CExpPtr<Value>>,
851) -> Option<CExpPtr<bool>> {
852    match &mut we.exp {
853        ExprIs::Binary(Token::Equal, e1, e2) => {
854            if e2.is_constant {
855                if let ExprIs::ColName(_) = &e1.exp {
856                    if cols.remove(e1.col) {
857                        keys.insert(e1.col, c_value(b, e2));
858                        return None;
859                    }
860                }
861            } else if e1.is_constant {
862                if let ExprIs::ColName(_) = &e2.exp {
863                    if cols.remove(e2.col) {
864                        keys.insert(e2.col, c_value(b, e1));
865                        return None;
866                    }
867                }
868            }
869        }
870        ExprIs::Binary(Token::And, e1, e2) => {
871            let x1 = get_keys(b, e1, cols, keys);
872            let x2 = get_keys(b, e2, cols, keys);
873
874            return if let Some(c1) = x1 {
875                if let Some(c2) = x2 {
876                    Some(Box::new(cexp::And(c1, c2)))
877                } else {
878                    Some(c1)
879                }
880            } else {
881                x2
882            };
883        }
884        _ => {}
885    }
886    Some(c_bool(b, we))
887}
888
889/// Compare table rows.
890pub fn row_compare(a: &[Value], b: &[Value], desc: &[bool]) -> Ordering {
891    let mut ix = 0;
892    loop {
893        let cmp = a[ix].cmp(&b[ix]);
894        if cmp != Ordering::Equal {
895            if !desc[ix] {
896                return cmp;
897            };
898            return if cmp == Ordering::Less {
899                Ordering::Greater
900            } else {
901                Ordering::Less
902            };
903        }
904        ix += 1;
905        if ix == desc.len() {
906            return Ordering::Equal;
907        }
908    }
909}