Skip to main content

quill_sql/storage/
holt.rs

1use std::path::{Path, PathBuf};
2use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
3use std::sync::Arc;
4use std::{cmp::Ordering, ops::Bound};
5
6use crate::catalog::{
7    Column, DataType, Schema, SchemaRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
8};
9use crate::error::{QuillSQLError, QuillSQLResult};
10use crate::storage::codec::TupleCodec;
11use crate::storage::engine::{IndexHandle, IndexScanRequest, TableHandle, TupleStream};
12use crate::storage::record::{RecordId, TupleMeta};
13use crate::storage::tuple::Tuple;
14use crate::transaction::{TransactionId, TransactionStatus, TxnContext};
15use crate::utils::scalar::ScalarValue;
16use crate::utils::table_ref::TableReference;
17
/// Tree that holds the catalog entries together with the persisted id counters.
const CATALOG_TREE: &str = "catalog";
/// Tree that maps an encoded transaction id to a one-byte transaction status.
const TXN_TREE: &str = "txn";

// Keys inside the catalog tree under which the next table id, index id and RID
// to hand out are persisted.
const NEXT_TABLE_ID: &[u8] = b"next/table-id";
const NEXT_INDEX_ID: &[u8] = b"next/index-id";
const NEXT_RID: &[u8] = b"next/rid";

// Key prefixes. NOTE(review): presumably consumed by the catalog key builders
// defined elsewhere in this file (not visible here) -- confirm.
const TABLE_PREFIX: &str = "table";
const INDEX_PREFIX: &str = "index";
const TABLE_SCHEMA_PREFIX: &str = "table-schema";
const INDEX_SCHEMA_PREFIX: &str = "index-schema";
27
28#[derive(Clone)]
29pub struct HoltStore {
30    db: Arc<holt::DB>,
31    next_table_id: Arc<AtomicU64>,
32    next_index_id: Arc<AtomicU64>,
33    next_rid: Arc<AtomicU64>,
34    dir: PathBuf,
35}
36
37impl std::fmt::Debug for HoltStore {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        f.debug_struct("HoltStore")
40            .field("dir", &self.dir)
41            .finish_non_exhaustive()
42    }
43}
44
45impl HoltStore {
46    pub fn open(dir: impl AsRef<Path>) -> QuillSQLResult<Self> {
47        let dir = dir.as_ref().to_path_buf();
48        let db = Arc::new(holt::DB::open(holt::TreeConfig::new(&dir)).map_err(map_holt_err)?);
49        db.open_or_create_tree(CATALOG_TREE).map_err(map_holt_err)?;
50        db.open_or_create_tree(TXN_TREE).map_err(map_holt_err)?;
51
52        let next_table_id = read_counter(&db, NEXT_TABLE_ID, 1)?;
53        let next_index_id = read_counter(&db, NEXT_INDEX_ID, 1)?;
54        let next_rid = read_counter(&db, NEXT_RID, 1)?;
55
56        Ok(Self {
57            db,
58            next_table_id: Arc::new(AtomicU64::new(next_table_id)),
59            next_index_id: Arc::new(AtomicU64::new(next_index_id)),
60            next_rid: Arc::new(AtomicU64::new(next_rid)),
61            dir,
62        })
63    }
64
65    pub fn db(&self) -> Arc<holt::DB> {
66        self.db.clone()
67    }
68
69    pub fn table_tree_name(table_id: u64) -> String {
70        format!("table/{table_id}")
71    }
72
73    pub fn index_tree_name(index_id: u64) -> String {
74        format!("index/{index_id}")
75    }
76
77    pub fn canonical_table_name(table_ref: &TableReference) -> String {
78        let catalog = table_ref.catalog().unwrap_or(DEFAULT_CATALOG_NAME);
79        let schema = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
80        format!("{catalog}.{schema}.{}", table_ref.table())
81    }
82
83    pub fn table_descriptor(&self, table_ref: &TableReference) -> QuillSQLResult<Option<u64>> {
84        let key = table_key(&Self::canonical_table_name(table_ref));
85        let catalog = self.db.open_tree(CATALOG_TREE).map_err(map_holt_err)?;
86        catalog
87            .get(&key)
88            .map_err(map_holt_err)?
89            .map(|raw| decode_u64(&raw))
90            .transpose()
91    }
92
93    pub fn index_descriptor(
94        &self,
95        table_ref: &TableReference,
96        index_name: &str,
97    ) -> QuillSQLResult<Option<u64>> {
98        let table = Self::canonical_table_name(table_ref);
99        let key = index_key(&table, index_name);
100        let catalog = self.db.open_tree(CATALOG_TREE).map_err(map_holt_err)?;
101        catalog
102            .get(&key)
103            .map_err(map_holt_err)?
104            .map(|raw| decode_u64(&raw))
105            .transpose()
106    }
107
108    pub fn create_table_descriptor(
109        &self,
110        table_ref: &TableReference,
111        schema: &Schema,
112    ) -> QuillSQLResult<u64> {
113        let table = Self::canonical_table_name(table_ref);
114        let key = table_key(&table);
115        let schema_key = table_schema_key(&table);
116        let schema_value = encode_schema(schema)?;
117        let table_id = self.next_table_id.fetch_add(1, AtomicOrdering::SeqCst);
118        let next_id = table_id
119            .checked_add(1)
120            .ok_or_else(|| QuillSQLError::Storage("Holt table id overflow".to_string()))?;
121        let tree_name = Self::table_tree_name(table_id);
122        self.db
123            .open_or_create_tree(&tree_name)
124            .map_err(map_holt_err)?;
125        self.db
126            .atomic(|batch| {
127                batch.put_if_absent(CATALOG_TREE, &key, &encode_u64(table_id));
128                batch.put(CATALOG_TREE, &schema_key, &schema_value);
129                batch.put(CATALOG_TREE, NEXT_TABLE_ID, &encode_u64(next_id));
130            })
131            .map_err(map_holt_err)
132            .and_then(committed)?;
133        Ok(table_id)
134    }
135
136    pub fn create_index_descriptor(
137        &self,
138        table_ref: &TableReference,
139        index_name: &str,
140        key_schema: &Schema,
141    ) -> QuillSQLResult<u64> {
142        let table = Self::canonical_table_name(table_ref);
143        let key = index_key(&table, index_name);
144        let schema_key = index_schema_key(&table, index_name);
145        let schema_value = encode_schema(key_schema)?;
146        let index_id = self.next_index_id.fetch_add(1, AtomicOrdering::SeqCst);
147        let next_id = index_id
148            .checked_add(1)
149            .ok_or_else(|| QuillSQLError::Storage("Holt index id overflow".to_string()))?;
150        let tree_name = Self::index_tree_name(index_id);
151        self.db
152            .open_or_create_tree(&tree_name)
153            .map_err(map_holt_err)?;
154        self.db
155            .atomic(|batch| {
156                batch.put_if_absent(CATALOG_TREE, &key, &encode_u64(index_id));
157                batch.put(CATALOG_TREE, &schema_key, &schema_value);
158                batch.put(CATALOG_TREE, NEXT_INDEX_ID, &encode_u64(next_id));
159            })
160            .map_err(map_holt_err)
161            .and_then(committed)?;
162        Ok(index_id)
163    }
164
165    pub fn drop_table_descriptor(&self, table_ref: &TableReference) -> QuillSQLResult<()> {
166        let table = Self::canonical_table_name(table_ref);
167        let table_id = self.table_descriptor(table_ref)?;
168        let key = table_key(&table);
169        let schema_key = table_schema_key(&table);
170        self.db
171            .atomic(|batch| {
172                batch.delete(CATALOG_TREE, &key);
173                batch.delete(CATALOG_TREE, &schema_key);
174            })
175            .map_err(map_holt_err)
176            .and_then(committed)?;
177        if let Some(table_id) = table_id {
178            self.drop_tree_if_exists(&Self::table_tree_name(table_id))?;
179        }
180        Ok(())
181    }
182
183    pub fn drop_index_descriptor(
184        &self,
185        table_ref: &TableReference,
186        index_name: &str,
187    ) -> QuillSQLResult<()> {
188        let table = Self::canonical_table_name(table_ref);
189        let index_id = self.index_descriptor(table_ref, index_name)?;
190        let key = index_key(&table, index_name);
191        let schema_key = index_schema_key(&table, index_name);
192        self.db
193            .atomic(|batch| {
194                batch.delete(CATALOG_TREE, &key);
195                batch.delete(CATALOG_TREE, &schema_key);
196            })
197            .map_err(map_holt_err)
198            .and_then(committed)?;
199        if let Some(index_id) = index_id {
200            self.drop_tree_if_exists(&Self::index_tree_name(index_id))?;
201        }
202        Ok(())
203    }
204
205    pub fn allocate_rid(&self) -> QuillSQLResult<RecordId> {
206        let (rid, next) = self.reserve_rid()?;
207        self.db
208            .atomic(|batch| {
209                batch.put(CATALOG_TREE, NEXT_RID, &encode_u64(next));
210            })
211            .map_err(map_holt_err)
212            .and_then(committed)?;
213        Ok(rid)
214    }
215
216    pub fn insert_system_row(&self, table_id: u64, tuple: &Tuple) -> QuillSQLResult<RecordId> {
217        let (rid, next_rid) = self.reserve_rid()?;
218        let table_tree = Self::table_tree_name(table_id);
219        let row_key = encode_rid(rid);
220        let row_value = encode_row(TupleMeta::new(0, 0), tuple);
221        self.db
222            .atomic(|batch| {
223                batch.put(CATALOG_TREE, NEXT_RID, &encode_u64(next_rid));
224                batch.put(&table_tree, &row_key, &row_value);
225            })
226            .map_err(map_holt_err)
227            .and_then(committed)?;
228        Ok(rid)
229    }
230
231    pub fn mark_system_row_deleted(
232        &self,
233        table_id: u64,
234        schema: SchemaRef,
235        rid: RecordId,
236    ) -> QuillSQLResult<()> {
237        let table_tree = Self::table_tree_name(table_id);
238        let tree = self.db.open_tree(&table_tree).map_err(map_holt_err)?;
239        let row_key = encode_rid(rid);
240        let Some(raw) = tree.get(&row_key).map_err(map_holt_err)? else {
241            return Ok(());
242        };
243        let (mut meta, tuple) = decode_row(&raw, schema)?;
244        if meta.is_deleted {
245            return Ok(());
246        }
247        meta.mark_deleted(0, 0);
248        tree.put(&row_key, &encode_row(meta, &tuple))
249            .map_err(map_holt_err)
250    }
251
252    pub fn reserve_rid(&self) -> QuillSQLResult<(RecordId, u64)> {
253        let raw = self.next_rid.fetch_add(1, AtomicOrdering::SeqCst);
254        let next = raw
255            .checked_add(1)
256            .ok_or_else(|| QuillSQLError::Storage("Holt RID overflow".to_string()))?;
257        let page_id = (raw >> 32) as u32;
258        let slot_num = raw as u32;
259        Ok((RecordId::new(page_id, slot_num), next))
260    }
261
262    fn drop_tree_if_exists(&self, tree: &str) -> QuillSQLResult<()> {
263        match self.db.drop_tree(tree) {
264            Ok(()) => Ok(()),
265            Err(holt::Error::TreeNotFound { .. }) => Ok(()),
266            Err(err) => Err(map_holt_err(err)),
267        }
268    }
269
270    pub fn put_txn_status(
271        &self,
272        txn_id: TransactionId,
273        status: TransactionStatus,
274    ) -> QuillSQLResult<()> {
275        let tree = self.db.open_tree(TXN_TREE).map_err(map_holt_err)?;
276        tree.put(&encode_u64(txn_id), &[encode_txn_status(status)])
277            .map_err(map_holt_err)
278    }
279
280    pub fn txn_status(&self, txn_id: TransactionId) -> QuillSQLResult<Option<TransactionStatus>> {
281        let tree = self.db.open_tree(TXN_TREE).map_err(map_holt_err)?;
282        tree.get(&encode_u64(txn_id))
283            .map_err(map_holt_err)?
284            .map(|raw| decode_txn_status(raw.first().copied().unwrap_or(0)))
285            .transpose()
286    }
287
288    pub fn recover_txn_statuses(&self) -> QuillSQLResult<Vec<(TransactionId, TransactionStatus)>> {
289        let tree = self.db.open_tree(TXN_TREE).map_err(map_holt_err)?;
290        let mut statuses = Vec::new();
291        for entry in tree.range().into_iter() {
292            let entry = entry.map_err(map_holt_err)?;
293            let holt::RangeEntry::Key { key, value, .. } = entry else {
294                continue;
295            };
296            let txn_id = decode_u64(&key)?;
297            let mut status = decode_txn_status(value.first().copied().unwrap_or(0))?;
298            if matches!(
299                status,
300                TransactionStatus::InProgress | TransactionStatus::Unknown
301            ) {
302                status = TransactionStatus::Aborted;
303                tree.put(&key, &[encode_txn_status(status)])
304                    .map_err(map_holt_err)?;
305            }
306            statuses.push((txn_id, status));
307        }
308        Ok(statuses)
309    }
310
311    pub fn table_descriptors(&self) -> QuillSQLResult<Vec<HoltTableDescriptor>> {
312        let catalog = self.db.open_tree(CATALOG_TREE).map_err(map_holt_err)?;
313        let mut descriptors = Vec::new();
314        for entry in catalog.scan(b"table/").into_iter() {
315            let entry = entry.map_err(map_holt_err)?;
316            let holt::RangeEntry::Key { key, value, .. } = entry else {
317                continue;
318            };
319            let canonical = std::str::from_utf8(&key[b"table/".len()..])
320                .map_err(|err| QuillSQLError::Storage(format!("invalid Holt table key: {err}")))?;
321            let schema_key = table_schema_key(canonical);
322            let Some(schema_raw) = catalog.get(&schema_key).map_err(map_holt_err)? else {
323                continue;
324            };
325            let table_ref = parse_canonical_table_name(canonical)?;
326            let schema = Arc::new(decode_schema(&schema_raw)?);
327            let table_id = decode_u64(&value)?;
328            descriptors.push(HoltTableDescriptor {
329                table_name: table_ref.table().to_string(),
330                table_ref,
331                schema,
332                table_id,
333            });
334        }
335        Ok(descriptors)
336    }
337
338    pub fn index_descriptors(&self) -> QuillSQLResult<Vec<HoltIndexDescriptor>> {
339        let catalog = self.db.open_tree(CATALOG_TREE).map_err(map_holt_err)?;
340        let mut descriptors = Vec::new();
341        for entry in catalog.scan(b"index/").into_iter() {
342            let entry = entry.map_err(map_holt_err)?;
343            let holt::RangeEntry::Key { key, value, .. } = entry else {
344                continue;
345            };
346            let raw = std::str::from_utf8(&key[b"index/".len()..])
347                .map_err(|err| QuillSQLError::Storage(format!("invalid Holt index key: {err}")))?;
348            let Some((canonical, index_name)) = raw.rsplit_once('/') else {
349                continue;
350            };
351            let schema_key = index_schema_key(canonical, index_name);
352            let Some(schema_raw) = catalog.get(&schema_key).map_err(map_holt_err)? else {
353                continue;
354            };
355            descriptors.push(HoltIndexDescriptor {
356                table_ref: parse_canonical_table_name(canonical)?,
357                index_name: index_name.to_string(),
358                key_schema: Arc::new(decode_schema(&schema_raw)?),
359                index_id: decode_u64(&value)?,
360            });
361        }
362        Ok(descriptors)
363    }
364}
365
366#[derive(Debug, Clone)]
367pub struct HoltTableDescriptor {
368    pub table_name: String,
369    pub table_ref: TableReference,
370    pub schema: SchemaRef,
371    pub table_id: u64,
372}
373
374#[derive(Debug, Clone)]
375pub struct HoltIndexDescriptor {
376    pub table_ref: TableReference,
377    pub index_name: String,
378    pub key_schema: SchemaRef,
379    pub index_id: u64,
380}
381
382#[derive(Clone)]
383pub struct HoltTableHandle {
384    table_ref: TableReference,
385    schema: SchemaRef,
386    table_id: u64,
387    store: Arc<HoltStore>,
388}
389
390impl HoltTableHandle {
391    pub fn new(
392        table_ref: TableReference,
393        schema: SchemaRef,
394        table_id: u64,
395        store: Arc<HoltStore>,
396    ) -> Self {
397        Self {
398            table_ref,
399            schema,
400            table_id,
401            store,
402        }
403    }
404
405    fn tree_name(&self) -> String {
406        HoltStore::table_tree_name(self.table_id)
407    }
408
409    fn row_value(&self, meta: TupleMeta, tuple: &Tuple) -> Vec<u8> {
410        encode_row(meta, tuple)
411    }
412
413    fn put_txn_in_progress(&self, txn_id: TransactionId) -> QuillSQLResult<()> {
414        self.store
415            .put_txn_status(txn_id, TransactionStatus::InProgress)
416    }
417}
418
419impl TableHandle for HoltTableHandle {
420    fn table_ref(&self) -> &TableReference {
421        &self.table_ref
422    }
423
424    fn schema(&self) -> SchemaRef {
425        self.schema.clone()
426    }
427
428    fn full_scan(&self) -> QuillSQLResult<Box<dyn TupleStream>> {
429        let tree = self
430            .store
431            .db
432            .open_tree(&self.tree_name())
433            .map_err(map_holt_err)?;
434        Ok(Box::new(HoltTableStream {
435            iter: tree.range().into_iter(),
436            schema: self.schema.clone(),
437        }))
438    }
439
440    fn full_tuple(&self, rid: RecordId) -> QuillSQLResult<(TupleMeta, Tuple)> {
441        let tree = self
442            .store
443            .db
444            .open_tree(&self.tree_name())
445            .map_err(map_holt_err)?;
446        let key = encode_rid(rid);
447        let Some(value) = tree.get(&key).map_err(map_holt_err)? else {
448            return Err(QuillSQLError::Storage(format!(
449                "Holt tuple {} not found",
450                rid
451            )));
452        };
453        decode_row(&value, self.schema.clone())
454    }
455
456    fn insert(
457        &self,
458        txn: &mut TxnContext<'_>,
459        tuple: &Tuple,
460        indexes: &[Arc<dyn IndexHandle>],
461    ) -> QuillSQLResult<()> {
462        txn.ensure_writable(self.table_ref(), "INSERT")?;
463        self.put_txn_in_progress(txn.txn_id())?;
464        let (rid, next_rid) = self.store.reserve_rid()?;
465        let meta = TupleMeta::new(txn.txn_id(), txn.command_id());
466        let row_key = encode_rid(rid);
467        let row_value = self.row_value(meta, tuple);
468        let mut index_links = Vec::new();
469        let mut index_puts = Vec::new();
470        for handle in indexes {
471            let index_id = handle.index_id();
472            if let Ok(key_tuple) = tuple.project_with_schema(handle.key_schema()) {
473                let encoded_key = encode_index_key(&key_tuple, rid)?;
474                index_puts.push((index_id, encoded_key));
475                index_links.push((handle.clone(), key_tuple, rid));
476            }
477        }
478        let table_tree = self.tree_name();
479        let rid_value = encode_rid(rid);
480        self.store
481            .db
482            .atomic(|batch| {
483                batch.put(CATALOG_TREE, NEXT_RID, &encode_u64(next_rid));
484                batch.put(&table_tree, &row_key, &row_value);
485                for (index_id, key) in &index_puts {
486                    let index_tree = HoltStore::index_tree_name(*index_id);
487                    batch.put(&index_tree, key, &rid_value);
488                }
489            })
490            .map_err(map_holt_err)
491            .and_then(committed)?;
492        txn.transaction_mut()
493            .push_insert_undo(Arc::new(self.clone()), rid, index_links);
494        Ok(())
495    }
496
497    fn delete(
498        &self,
499        txn: &mut TxnContext<'_>,
500        rid: RecordId,
501        prev_meta: TupleMeta,
502        prev_tuple: Tuple,
503        indexes: &[Arc<dyn IndexHandle>],
504    ) -> QuillSQLResult<()> {
505        txn.ensure_writable(self.table_ref(), "DELETE")?;
506        self.put_txn_in_progress(txn.txn_id())?;
507        let mut deleted_meta = prev_meta;
508        if deleted_meta.is_deleted {
509            return Ok(());
510        }
511        deleted_meta.mark_deleted(txn.txn_id(), txn.command_id());
512        let row_key = encode_rid(rid);
513        let row_value = self.row_value(deleted_meta, &prev_tuple);
514        let mut index_links = Vec::new();
515        let mut index_deletes = Vec::new();
516        for handle in indexes {
517            let index_id = handle.index_id();
518            if let Ok(key_tuple) = prev_tuple.project_with_schema(handle.key_schema()) {
519                index_deletes.push((index_id, encode_index_key(&key_tuple, rid)?));
520                index_links.push((handle.clone(), key_tuple, rid));
521            }
522        }
523        let table_tree = self.tree_name();
524        self.store
525            .db
526            .atomic(|batch| {
527                batch.put(&table_tree, &row_key, &row_value);
528                for (index_id, key) in &index_deletes {
529                    let index_tree = HoltStore::index_tree_name(*index_id);
530                    batch.delete(&index_tree, key);
531                }
532            })
533            .map_err(map_holt_err)
534            .and_then(committed)?;
535        txn.transaction_mut().push_delete_undo(
536            Arc::new(self.clone()),
537            rid,
538            prev_meta,
539            prev_tuple,
540            index_links,
541        );
542        Ok(())
543    }
544
545    fn update(
546        &self,
547        txn: &mut TxnContext<'_>,
548        rid: RecordId,
549        new_tuple: Tuple,
550        prev_meta: TupleMeta,
551        prev_tuple: Tuple,
552        indexes: &[Arc<dyn IndexHandle>],
553    ) -> QuillSQLResult<RecordId> {
554        txn.ensure_writable(self.table_ref(), "UPDATE")?;
555        self.put_txn_in_progress(txn.txn_id())?;
556        if prev_meta.is_deleted && prev_meta.next_version.is_some() {
557            return Err(QuillSQLError::Execution(format!(
558                "tuple {} has already been updated",
559                rid
560            )));
561        }
562        let (new_rid, next_rid) = self.store.reserve_rid()?;
563        let mut old_meta = prev_meta;
564        old_meta.set_next_version(Some(new_rid));
565        old_meta.mark_deleted(txn.txn_id(), txn.command_id());
566        let mut new_meta = TupleMeta::new(txn.txn_id(), txn.command_id());
567        new_meta.set_prev_version(Some(rid));
568
569        let mut old_links = Vec::new();
570        let mut new_links = Vec::new();
571        let mut old_index_deletes = Vec::new();
572        let mut new_index_puts = Vec::new();
573        for handle in indexes {
574            let index_id = handle.index_id();
575            if let Ok(old_key) = prev_tuple.project_with_schema(handle.key_schema()) {
576                old_index_deletes.push((index_id, encode_index_key(&old_key, rid)?));
577                old_links.push((handle.clone(), old_key, rid));
578            }
579            if let Ok(new_key) = new_tuple.project_with_schema(handle.key_schema()) {
580                new_index_puts.push((index_id, encode_index_key(&new_key, new_rid)?));
581                new_links.push((handle.clone(), new_key, new_rid));
582            }
583        }
584
585        let table_tree = self.tree_name();
586        let old_row_key = encode_rid(rid);
587        let new_row_key = encode_rid(new_rid);
588        let old_row_value = self.row_value(old_meta, &prev_tuple);
589        let new_row_value = self.row_value(new_meta, &new_tuple);
590        let new_rid_value = encode_rid(new_rid);
591        self.store
592            .db
593            .atomic(|batch| {
594                batch.put(CATALOG_TREE, NEXT_RID, &encode_u64(next_rid));
595                batch.put(&table_tree, &old_row_key, &old_row_value);
596                batch.put(&table_tree, &new_row_key, &new_row_value);
597                for (index_id, key) in &old_index_deletes {
598                    let index_tree = HoltStore::index_tree_name(*index_id);
599                    batch.delete(&index_tree, key);
600                }
601                for (index_id, key) in &new_index_puts {
602                    let index_tree = HoltStore::index_tree_name(*index_id);
603                    batch.put(&index_tree, key, &new_rid_value);
604                }
605            })
606            .map_err(map_holt_err)
607            .and_then(committed)?;
608        txn.transaction_mut().push_update_undo(
609            Arc::new(self.clone()),
610            crate::transaction::UpdateUndo {
611                old_rid: rid,
612                new_rid,
613                prev_meta,
614                prev_tuple,
615                new_keys: new_links,
616                old_keys: old_links,
617            },
618        );
619        Ok(new_rid)
620    }
621
622    fn prepare_row_for_write(
623        &self,
624        txn: &mut TxnContext<'_>,
625        rid: RecordId,
626        observed_meta: &TupleMeta,
627    ) -> QuillSQLResult<Option<(TupleMeta, Tuple)>> {
628        if !txn.is_visible(observed_meta) {
629            return Ok(None);
630        }
631        txn.lock_row_exclusive(self.table_ref(), rid)?;
632        let (current_meta, current_tuple) = self.full_tuple(rid)?;
633        if !txn.is_visible(&current_meta) {
634            txn.unlock_row(self.table_ref(), rid);
635            return Ok(None);
636        }
637        Ok(Some((current_meta, current_tuple)))
638    }
639
640    fn undo_insert(&self, rid: RecordId, txn_id: TransactionId) -> QuillSQLResult<()> {
641        self.store
642            .put_txn_status(txn_id, TransactionStatus::InProgress)?;
643        let (mut meta, tuple) = self.full_tuple(rid)?;
644        meta.mark_deleted(txn_id, 0);
645        let tree = self
646            .store
647            .db
648            .open_tree(&self.tree_name())
649            .map_err(map_holt_err)?;
650        tree.put(&encode_rid(rid), &self.row_value(meta, &tuple))
651            .map_err(map_holt_err)
652    }
653
654    fn undo_delete(
655        &self,
656        rid: RecordId,
657        prev_meta: TupleMeta,
658        prev_tuple: Tuple,
659    ) -> QuillSQLResult<()> {
660        let tree = self
661            .store
662            .db
663            .open_tree(&self.tree_name())
664            .map_err(map_holt_err)?;
665        tree.put(&encode_rid(rid), &self.row_value(prev_meta, &prev_tuple))
666            .map_err(map_holt_err)
667    }
668}
669
670pub struct HoltIndexHandle {
671    name: String,
672    key_schema: SchemaRef,
673    index_id: u64,
674    store: Arc<HoltStore>,
675}
676
677impl HoltIndexHandle {
678    pub fn new(name: String, key_schema: SchemaRef, index_id: u64, store: Arc<HoltStore>) -> Self {
679        Self {
680            name,
681            key_schema,
682            index_id,
683            store,
684        }
685    }
686
687    fn tree_name(&self) -> String {
688        HoltStore::index_tree_name(self.index_id)
689    }
690}
691
692impl IndexHandle for HoltIndexHandle {
693    fn name(&self) -> &str {
694        &self.name
695    }
696
697    fn key_schema(&self) -> SchemaRef {
698        self.key_schema.clone()
699    }
700
701    fn index_id(&self) -> u64 {
702        self.index_id
703    }
704
705    fn insert(&self, key: &Tuple, rid: RecordId, txn_id: TransactionId) -> QuillSQLResult<()> {
706        self.store
707            .put_txn_status(txn_id, TransactionStatus::InProgress)?;
708        let tree = self
709            .store
710            .db
711            .open_tree(&self.tree_name())
712            .map_err(map_holt_err)?;
713        tree.put(&encode_index_key(key, rid)?, &encode_rid(rid))
714            .map_err(map_holt_err)
715    }
716
717    fn delete(&self, key: &Tuple, rid: RecordId, txn_id: TransactionId) -> QuillSQLResult<()> {
718        self.store
719            .put_txn_status(txn_id, TransactionStatus::InProgress)?;
720        let tree = self
721            .store
722            .db
723            .open_tree(&self.tree_name())
724            .map_err(map_holt_err)?;
725        tree.delete(&encode_index_key(key, rid)?)
726            .map_err(map_holt_err)?;
727        Ok(())
728    }
729
730    fn range_scan(
731        &self,
732        table: Arc<dyn TableHandle>,
733        request: IndexScanRequest,
734    ) -> QuillSQLResult<Box<dyn TupleStream>> {
735        let tree = self
736            .store
737            .db
738            .open_tree(&self.tree_name())
739            .map_err(map_holt_err)?;
740        let start_after = index_start_after_key(&request)?;
741        let mut range = tree.range();
742        if let Some(key) = start_after.as_deref() {
743            range = range.start_after(key);
744        }
745        Ok(Box::new(HoltIndexStream {
746            iter: range.into_iter(),
747            table,
748            key_schema: self.key_schema.clone(),
749            request,
750        }))
751    }
752}
753
754struct HoltTableStream {
755    iter: holt::RangeIter,
756    schema: SchemaRef,
757}
758
759impl TupleStream for HoltTableStream {
760    fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>> {
761        for entry in self.iter.by_ref() {
762            let entry = entry.map_err(map_holt_err)?;
763            let holt::RangeEntry::Key { key, value, .. } = entry else {
764                continue;
765            };
766            let rid = decode_rid(&key)?;
767            let (meta, tuple) = decode_row(&value, self.schema.clone())?;
768            return Ok(Some((rid, meta, tuple)));
769        }
770        Ok(None)
771    }
772}
773
774struct HoltIndexStream {
775    iter: holt::RangeIter,
776    table: Arc<dyn TableHandle>,
777    key_schema: SchemaRef,
778    request: IndexScanRequest,
779}
780
781impl TupleStream for HoltIndexStream {
782    fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>> {
783        for entry in self.iter.by_ref() {
784            let entry = entry.map_err(map_holt_err)?;
785            let holt::RangeEntry::Key { value, .. } = entry else {
786                continue;
787            };
788            let rid = decode_rid(&value)?;
789            let Ok((meta, tuple)) = self.table.full_tuple(rid) else {
790                continue;
791            };
792            let key_tuple = tuple.project_with_schema(self.key_schema.clone())?;
793            if tuple_above_end(&key_tuple, &self.request) {
794                return Ok(None);
795            }
796            if tuple_within_bounds(&key_tuple, &self.request) {
797                return Ok(Some((rid, meta, tuple)));
798            }
799        }
800        Ok(None)
801    }
802}
803
804fn encode_row(meta: TupleMeta, tuple: &Tuple) -> Vec<u8> {
805    let mut out = Vec::new();
806    out.extend(meta.insert_txn_id.to_be_bytes());
807    out.extend(meta.insert_cid.to_be_bytes());
808    out.extend(meta.delete_txn_id.to_be_bytes());
809    out.extend(meta.delete_cid.to_be_bytes());
810    out.push(u8::from(meta.is_deleted));
811    encode_optional_rid(&mut out, meta.next_version);
812    encode_optional_rid(&mut out, meta.prev_version);
813    out.extend(TupleCodec::encode(tuple));
814    out
815}
816
817fn decode_row(bytes: &[u8], schema: SchemaRef) -> QuillSQLResult<(TupleMeta, Tuple)> {
818    let (meta, offset) = decode_tuple_meta(bytes)?;
819    let (tuple, _) = TupleCodec::decode(&bytes[offset..], schema)?;
820    Ok((meta, tuple))
821}
822
823fn encode_optional_rid(out: &mut Vec<u8>, rid: Option<RecordId>) {
824    match rid {
825        Some(rid) => {
826            out.push(1);
827            out.extend(encode_rid(rid));
828        }
829        None => out.push(0),
830    }
831}
832
833fn decode_tuple_meta(bytes: &[u8]) -> QuillSQLResult<(TupleMeta, usize)> {
834    let mut offset = 0usize;
835    let insert_txn_id = read_u64(bytes, &mut offset)?;
836    let insert_cid = read_u32(bytes, &mut offset)?;
837    let delete_txn_id = read_u64(bytes, &mut offset)?;
838    let delete_cid = read_u32(bytes, &mut offset)?;
839    let is_deleted = read_u8(bytes, &mut offset)? != 0;
840    let next_version = read_optional_rid(bytes, &mut offset)?;
841    let prev_version = read_optional_rid(bytes, &mut offset)?;
842    Ok((
843        TupleMeta {
844            insert_txn_id,
845            insert_cid,
846            delete_txn_id,
847            delete_cid,
848            is_deleted,
849            next_version,
850            prev_version,
851        },
852        offset,
853    ))
854}
855
856fn read_optional_rid(bytes: &[u8], offset: &mut usize) -> QuillSQLResult<Option<RecordId>> {
857    let present = read_u8(bytes, offset)?;
858    if present == 0 {
859        return Ok(None);
860    }
861    let rid_bytes = take(bytes, offset, 8)?;
862    decode_rid(rid_bytes).map(Some)
863}
864
865fn read_u8(bytes: &[u8], offset: &mut usize) -> QuillSQLResult<u8> {
866    Ok(take(bytes, offset, 1)?[0])
867}
868
869fn read_u32(bytes: &[u8], offset: &mut usize) -> QuillSQLResult<u32> {
870    let raw = take(bytes, offset, 4)?;
871    Ok(u32::from_be_bytes(raw.try_into().unwrap()))
872}
873
874fn read_u64(bytes: &[u8], offset: &mut usize) -> QuillSQLResult<u64> {
875    let raw = take(bytes, offset, 8)?;
876    Ok(u64::from_be_bytes(raw.try_into().unwrap()))
877}
878
879fn take<'a>(bytes: &'a [u8], offset: &mut usize, len: usize) -> QuillSQLResult<&'a [u8]> {
880    let end = offset
881        .checked_add(len)
882        .ok_or_else(|| QuillSQLError::Storage("Holt row offset overflow".to_string()))?;
883    if end > bytes.len() {
884        return Err(QuillSQLError::Storage(format!(
885            "Holt row is truncated: need {} bytes at offset {}, len {}",
886            len,
887            *offset,
888            bytes.len()
889        )));
890    }
891    let out = &bytes[*offset..end];
892    *offset = end;
893    Ok(out)
894}
895
896pub fn encode_index_key(tuple: &Tuple, rid: RecordId) -> QuillSQLResult<Vec<u8>> {
897    let mut out = Vec::new();
898    for (value, col) in tuple.data.iter().zip(tuple.schema.columns.iter()) {
899        encode_ordered_scalar(&mut out, value, col.data_type)?;
900    }
901    out.push(0xff);
902    out.extend(encode_rid(rid));
903    Ok(out)
904}
905
906fn index_start_after_key(request: &IndexScanRequest) -> QuillSQLResult<Option<Vec<u8>>> {
907    match &request.start {
908        Bound::Included(start) => {
909            let min_rid = RecordId::new(0, 0);
910            Ok(Some(encode_index_key(start, min_rid)?))
911        }
912        Bound::Excluded(start) => {
913            let max_rid = RecordId::new(u32::MAX, u32::MAX);
914            Ok(Some(encode_index_key(start, max_rid)?))
915        }
916        Bound::Unbounded => Ok(None),
917    }
918}
919
920fn encode_ordered_scalar(
921    out: &mut Vec<u8>,
922    value: &ScalarValue,
923    _data_type: DataType,
924) -> QuillSQLResult<()> {
925    if value.is_null() {
926        out.push(0);
927        return Ok(());
928    }
929    out.push(1);
930    match value {
931        ScalarValue::Boolean(Some(v)) => out.push(u8::from(*v)),
932        ScalarValue::Int8(Some(v)) => out.push((*v as u8) ^ 0x80),
933        ScalarValue::Int16(Some(v)) => out.extend(((*v as u16) ^ 0x8000).to_be_bytes()),
934        ScalarValue::Int32(Some(v)) => out.extend(((*v as u32) ^ 0x8000_0000).to_be_bytes()),
935        ScalarValue::Int64(Some(v)) => {
936            out.extend(((*v as u64) ^ 0x8000_0000_0000_0000).to_be_bytes())
937        }
938        ScalarValue::UInt8(Some(v)) => out.push(*v),
939        ScalarValue::UInt16(Some(v)) => out.extend(v.to_be_bytes()),
940        ScalarValue::UInt32(Some(v)) => out.extend(v.to_be_bytes()),
941        ScalarValue::UInt64(Some(v)) => out.extend(v.to_be_bytes()),
942        ScalarValue::Float32(Some(v)) => out.extend(ordered_f32_bits(*v).to_be_bytes()),
943        ScalarValue::Float64(Some(v)) => out.extend(ordered_f64_bits(*v).to_be_bytes()),
944        ScalarValue::Varchar(Some(v)) => encode_ordered_bytes(out, v.as_bytes()),
945        ScalarValue::Boolean(None)
946        | ScalarValue::Int8(None)
947        | ScalarValue::Int16(None)
948        | ScalarValue::Int32(None)
949        | ScalarValue::Int64(None)
950        | ScalarValue::UInt8(None)
951        | ScalarValue::UInt16(None)
952        | ScalarValue::UInt32(None)
953        | ScalarValue::UInt64(None)
954        | ScalarValue::Float32(None)
955        | ScalarValue::Float64(None)
956        | ScalarValue::Varchar(None) => unreachable!("null handled above"),
957    }
958    Ok(())
959}
960
/// Maps an `f32` to a `u32` whose unsigned order follows the float's sign and magnitude.
///
/// Values with the sign bit clear get that bit set; values with the sign bit set
/// have every bit inverted, which reverses their order and places them below.
fn ordered_f32_bits(value: f32) -> u32 {
    const SIGN: u32 = 1 << 31;
    let raw = value.to_bits();
    // XOR with all ones is the same as bitwise NOT.
    let mask = if raw & SIGN != 0 { u32::MAX } else { SIGN };
    raw ^ mask
}
969
/// Maps an `f64` to a `u64` whose unsigned order follows the float's sign and magnitude.
///
/// Values with the sign bit clear get that bit set; values with the sign bit set
/// have every bit inverted, which reverses their order and places them below.
fn ordered_f64_bits(value: f64) -> u64 {
    const SIGN: u64 = 1 << 63;
    let raw = value.to_bits();
    // XOR with all ones is the same as bitwise NOT.
    let mask = if raw & SIGN != 0 { u64::MAX } else { SIGN };
    raw ^ mask
}
978
/// Appends `bytes` as a self-terminating, order-preserving byte string.
///
/// Each `0x00` in the input is escaped as `00 ff`, and the value ends with the
/// terminator `00 00`, so a string sorts before any longer string it prefixes.
fn encode_ordered_bytes(out: &mut Vec<u8>, bytes: &[u8]) {
    for &byte in bytes {
        match byte {
            0 => out.extend_from_slice(&[0, 0xff]),
            other => out.push(other),
        }
    }
    out.extend_from_slice(&[0, 0]);
}
989
990fn tuple_within_bounds(tuple: &Tuple, request: &IndexScanRequest) -> bool {
991    let lower = match &request.start {
992        Bound::Included(start) => tuple.cmp(start) != Ordering::Less,
993        Bound::Excluded(start) => tuple.cmp(start) == Ordering::Greater,
994        Bound::Unbounded => true,
995    };
996    if !lower {
997        return false;
998    }
999    match &request.end {
1000        Bound::Included(end) => tuple.cmp(end) != Ordering::Greater,
1001        Bound::Excluded(end) => tuple.cmp(end) == Ordering::Less,
1002        Bound::Unbounded => true,
1003    }
1004}
1005
1006fn tuple_above_end(tuple: &Tuple, request: &IndexScanRequest) -> bool {
1007    match &request.end {
1008        Bound::Included(end) => tuple.cmp(end) == Ordering::Greater,
1009        Bound::Excluded(end) => tuple.cmp(end) != Ordering::Less,
1010        Bound::Unbounded => false,
1011    }
1012}
1013
1014fn read_counter(db: &holt::DB, key: &[u8], default_value: u64) -> QuillSQLResult<u64> {
1015    let catalog = db.open_tree(CATALOG_TREE).map_err(map_holt_err)?;
1016    match catalog.get(key).map_err(map_holt_err)? {
1017        Some(raw) => decode_u64(&raw),
1018        None => {
1019            catalog
1020                .put(key, &encode_u64(default_value))
1021                .map_err(map_holt_err)?;
1022            Ok(default_value)
1023        }
1024    }
1025}
1026
1027pub fn encode_rid(rid: RecordId) -> [u8; 8] {
1028    let mut out = [0u8; 8];
1029    out[..4].copy_from_slice(&rid.page_id.to_be_bytes());
1030    out[4..].copy_from_slice(&rid.slot_num.to_be_bytes());
1031    out
1032}
1033
1034pub fn decode_rid(bytes: &[u8]) -> QuillSQLResult<RecordId> {
1035    if bytes.len() != 8 {
1036        return Err(QuillSQLError::Storage(format!(
1037            "invalid Holt RID length {}",
1038            bytes.len()
1039        )));
1040    }
1041    let page_id = u32::from_be_bytes(bytes[..4].try_into().unwrap());
1042    let slot_num = u32::from_be_bytes(bytes[4..].try_into().unwrap());
1043    Ok(RecordId::new(page_id, slot_num))
1044}
1045
1046fn table_key(table: &str) -> Vec<u8> {
1047    format!("{TABLE_PREFIX}/{table}").into_bytes()
1048}
1049
1050fn table_schema_key(table: &str) -> Vec<u8> {
1051    format!("{TABLE_SCHEMA_PREFIX}/{table}").into_bytes()
1052}
1053
1054fn index_key(table: &str, index_name: &str) -> Vec<u8> {
1055    format!("{INDEX_PREFIX}/{table}/{index_name}").into_bytes()
1056}
1057
1058fn index_schema_key(table: &str, index_name: &str) -> Vec<u8> {
1059    format!("{INDEX_SCHEMA_PREFIX}/{table}/{index_name}").into_bytes()
1060}
1061
1062fn parse_canonical_table_name(canonical: &str) -> QuillSQLResult<TableReference> {
1063    let parts = canonical.split('.').collect::<Vec<_>>();
1064    match parts.as_slice() {
1065        [catalog, schema, table] => Ok(TableReference::Full {
1066            catalog: (*catalog).to_string(),
1067            schema: (*schema).to_string(),
1068            table: (*table).to_string(),
1069        }),
1070        _ => Err(QuillSQLError::Storage(format!(
1071            "invalid Holt canonical table name {canonical}"
1072        ))),
1073    }
1074}
1075
1076fn encode_schema(schema: &Schema) -> QuillSQLResult<Vec<u8>> {
1077    let mut out = Vec::new();
1078    put_u32(&mut out, schema.columns.len() as u32);
1079    for column in &schema.columns {
1080        put_string(&mut out, &column.name)?;
1081        let data_type = column.data_type.to_string();
1082        put_string(&mut out, &data_type)?;
1083        out.push(u8::from(column.nullable));
1084        let default = column.default.to_string();
1085        put_string(&mut out, &default)?;
1086    }
1087    Ok(out)
1088}
1089
1090fn decode_schema(bytes: &[u8]) -> QuillSQLResult<Schema> {
1091    let mut offset = 0usize;
1092    let column_count = read_u32(bytes, &mut offset)?;
1093    let mut columns = Vec::with_capacity(column_count as usize);
1094    for _ in 0..column_count {
1095        let name = read_string(bytes, &mut offset)?;
1096        let data_type_str = read_string(bytes, &mut offset)?;
1097        let data_type = DataType::try_from(data_type_str.as_str())?;
1098        let nullable = read_u8(bytes, &mut offset)? != 0;
1099        let default_str = read_string(bytes, &mut offset)?;
1100        let default = ScalarValue::from_string(&default_str, data_type)?;
1101        columns.push(Column::new(name, data_type, nullable).with_default(default));
1102    }
1103    Ok(Schema::new(columns))
1104}
1105
/// Appends `value` to `out` as four big-endian bytes.
fn put_u32(out: &mut Vec<u8>, value: u32) {
    let encoded = value.to_be_bytes();
    out.extend_from_slice(&encoded);
}
1109
1110fn put_string(out: &mut Vec<u8>, value: &str) -> QuillSQLResult<()> {
1111    let len = u32::try_from(value.len())
1112        .map_err(|_| QuillSQLError::Storage("Holt schema string is too large".to_string()))?;
1113    put_u32(out, len);
1114    out.extend(value.as_bytes());
1115    Ok(())
1116}
1117
1118fn read_string(bytes: &[u8], offset: &mut usize) -> QuillSQLResult<String> {
1119    let len = read_u32(bytes, offset)? as usize;
1120    let raw = take(bytes, offset, len)?;
1121    String::from_utf8(raw.to_vec())
1122        .map_err(|err| QuillSQLError::Storage(format!("invalid Holt schema string: {err}")))
1123}
1124
/// Encodes `value` as eight big-endian bytes.
fn encode_u64(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
1128
1129fn decode_u64(bytes: &[u8]) -> QuillSQLResult<u64> {
1130    if bytes.len() != 8 {
1131        return Err(QuillSQLError::Storage(format!(
1132            "invalid Holt u64 length {}",
1133            bytes.len()
1134        )));
1135    }
1136    Ok(u64::from_be_bytes(bytes.try_into().unwrap()))
1137}
1138
1139fn committed(committed: bool) -> QuillSQLResult<()> {
1140    if committed {
1141        Ok(())
1142    } else {
1143        Err(QuillSQLError::Storage(
1144            "Holt metadata compare-and-set failed".to_string(),
1145        ))
1146    }
1147}
1148
1149fn encode_txn_status(status: TransactionStatus) -> u8 {
1150    match status {
1151        TransactionStatus::InProgress => 1,
1152        TransactionStatus::Committed => 2,
1153        TransactionStatus::Aborted => 3,
1154        TransactionStatus::Unknown => 4,
1155    }
1156}
1157
1158fn decode_txn_status(raw: u8) -> QuillSQLResult<TransactionStatus> {
1159    match raw {
1160        1 => Ok(TransactionStatus::InProgress),
1161        2 => Ok(TransactionStatus::Committed),
1162        3 => Ok(TransactionStatus::Aborted),
1163        4 => Ok(TransactionStatus::Unknown),
1164        other => Err(QuillSQLError::Storage(format!(
1165            "invalid Holt txn status {other}"
1166        ))),
1167    }
1168}
1169
1170pub(crate) fn map_holt_err(err: holt::Error) -> QuillSQLError {
1171    QuillSQLError::Storage(format!("Holt error: {err}"))
1172}
1173
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::catalog::{Column, DataType, Schema};
    use crate::storage::holt::encode_index_key;
    use crate::storage::record::RecordId;
    use crate::storage::tuple::Tuple;
    use crate::utils::scalar::ScalarValue;

    /// Builds a schema of nullable columns named `c0`, `c1`, ... with the given types.
    fn nullable_schema(column_types: &[DataType]) -> Arc<Schema> {
        let mut columns = Vec::with_capacity(column_types.len());
        for (idx, data_type) in column_types.iter().enumerate() {
            columns.push(Column::new(format!("c{idx}"), *data_type, true));
        }
        Arc::new(Schema::new(columns))
    }

    /// Checks that each adjacent pair of `rows` is ascending in `Tuple` order and
    /// that the encoded index keys (same RID on both sides) are ascending too.
    fn assert_order(column_types: &[DataType], rows: Vec<Vec<ScalarValue>>) {
        let row_schema = nullable_schema(column_types);
        let mut tuples = Vec::with_capacity(rows.len());
        for row in rows {
            tuples.push(Tuple::new(row_schema.clone(), row));
        }

        for (left, right) in tuples.iter().zip(tuples.iter().skip(1)) {
            assert!(
                left < right,
                "test rows must be in Tuple order: {left:?} !< {right:?}"
            );
            let left_key = encode_index_key(left, RecordId::new(0, 1)).unwrap();
            let right_key = encode_index_key(right, RecordId::new(0, 1)).unwrap();
            assert!(
                left_key < right_key,
                "encoded key order differs from Tuple order for {left:?} and {right:?}"
            );
        }
    }

    #[test]
    fn ordered_index_key_matches_tuple_order_for_scalars() {
        // NULL comes first in every case, then values in ascending order.
        assert_order(
            &[DataType::Int32],
            vec![None, Some(-10), Some(0), Some(10)]
                .into_iter()
                .map(|v| vec![ScalarValue::Int32(v)])
                .collect(),
        );
        assert_order(
            &[DataType::UInt64],
            vec![None, Some(0), Some(10), Some(u64::MAX)]
                .into_iter()
                .map(|v| vec![ScalarValue::UInt64(v)])
                .collect(),
        );
        assert_order(
            &[DataType::Float64],
            vec![
                None,
                Some(f64::NEG_INFINITY),
                Some(-1.0),
                Some(-0.0),
                Some(0.0),
                Some(1.0),
                Some(f64::INFINITY),
            ]
            .into_iter()
            .map(|v| vec![ScalarValue::Float64(v)])
            .collect(),
        );
        assert_order(
            &[DataType::Varchar(None)],
            vec![None, Some(""), Some("a"), Some("aa"), Some("b")]
                .into_iter()
                .map(|v| vec![ScalarValue::Varchar(v.map(String::from))])
                .collect(),
        );
    }

    #[test]
    fn ordered_index_key_handles_composite_and_duplicate_rids() {
        let row = |number: i32, text: &str| {
            vec![
                ScalarValue::Int32(Some(number)),
                ScalarValue::Varchar(Some(text.to_string())),
            ]
        };
        assert_order(
            &[DataType::Int32, DataType::Varchar(None)],
            vec![row(1, "a"), row(1, "b"), row(2, "a")],
        );

        // Equal tuples must still be ordered by their RID suffix.
        let single = Tuple::new(
            nullable_schema(&[DataType::Int32]),
            vec![ScalarValue::Int32(Some(1))],
        );
        let low = encode_index_key(&single, RecordId::new(0, 1)).unwrap();
        let high = encode_index_key(&single, RecordId::new(0, 2)).unwrap();
        assert!(low < high);
    }
}