1use std::path::{Path, PathBuf};
2use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
3use std::sync::Arc;
4use std::{cmp::Ordering, ops::Bound};
5
6use crate::catalog::{
7 Column, DataType, Schema, SchemaRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
8};
9use crate::error::{QuillSQLError, QuillSQLResult};
10use crate::storage::codec::TupleCodec;
11use crate::storage::engine::{IndexHandle, IndexScanRequest, TableHandle, TupleStream};
12use crate::storage::record::{RecordId, TupleMeta};
13use crate::storage::tuple::Tuple;
14use crate::transaction::{TransactionId, TransactionStatus, TxnContext};
15use crate::utils::scalar::ScalarValue;
16use crate::utils::table_ref::TableReference;
17
/// Name of the Holt tree that holds catalog entries and the id counters.
const CATALOG_TREE: &str = "catalog";
/// Name of the Holt tree that maps transaction ids to their status byte.
const TXN_TREE: &str = "txn";
/// Catalog key under which the next table id is persisted.
const NEXT_TABLE_ID: &[u8] = b"next/table-id";
/// Catalog key under which the next index id is persisted.
const NEXT_INDEX_ID: &[u8] = b"next/index-id";
/// Catalog key under which the next record id is persisted.
const NEXT_RID: &[u8] = b"next/rid";
/// Key prefix for "canonical table name -> table id" catalog entries.
const TABLE_PREFIX: &str = "table";
/// Key prefix for "canonical table name / index name -> index id" entries.
const INDEX_PREFIX: &str = "index";
/// Key prefix for serialized table schemas.
const TABLE_SCHEMA_PREFIX: &str = "table-schema";
/// Key prefix for serialized index key schemas.
const INDEX_SCHEMA_PREFIX: &str = "index-schema";
27
28#[derive(Clone)]
29pub struct HoltStore {
30 db: Arc<holt::DB>,
31 next_table_id: Arc<AtomicU64>,
32 next_index_id: Arc<AtomicU64>,
33 next_rid: Arc<AtomicU64>,
34 dir: PathBuf,
35}
36
37impl std::fmt::Debug for HoltStore {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 f.debug_struct("HoltStore")
40 .field("dir", &self.dir)
41 .finish_non_exhaustive()
42 }
43}
44
45impl HoltStore {
46 pub fn open(dir: impl AsRef<Path>) -> QuillSQLResult<Self> {
47 let dir = dir.as_ref().to_path_buf();
48 let db = Arc::new(holt::DB::open(holt::TreeConfig::new(&dir)).map_err(map_holt_err)?);
49 db.open_or_create_tree(CATALOG_TREE).map_err(map_holt_err)?;
50 db.open_or_create_tree(TXN_TREE).map_err(map_holt_err)?;
51
52 let next_table_id = read_counter(&db, NEXT_TABLE_ID, 1)?;
53 let next_index_id = read_counter(&db, NEXT_INDEX_ID, 1)?;
54 let next_rid = read_counter(&db, NEXT_RID, 1)?;
55
56 Ok(Self {
57 db,
58 next_table_id: Arc::new(AtomicU64::new(next_table_id)),
59 next_index_id: Arc::new(AtomicU64::new(next_index_id)),
60 next_rid: Arc::new(AtomicU64::new(next_rid)),
61 dir,
62 })
63 }
64
65 pub fn db(&self) -> Arc<holt::DB> {
66 self.db.clone()
67 }
68
69 pub fn table_tree_name(table_id: u64) -> String {
70 format!("table/{table_id}")
71 }
72
73 pub fn index_tree_name(index_id: u64) -> String {
74 format!("index/{index_id}")
75 }
76
77 pub fn canonical_table_name(table_ref: &TableReference) -> String {
78 let catalog = table_ref.catalog().unwrap_or(DEFAULT_CATALOG_NAME);
79 let schema = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
80 format!("{catalog}.{schema}.{}", table_ref.table())
81 }
82
83 pub fn table_descriptor(&self, table_ref: &TableReference) -> QuillSQLResult<Option<u64>> {
84 let key = table_key(&Self::canonical_table_name(table_ref));
85 let catalog = self.db.open_tree(CATALOG_TREE).map_err(map_holt_err)?;
86 catalog
87 .get(&key)
88 .map_err(map_holt_err)?
89 .map(|raw| decode_u64(&raw))
90 .transpose()
91 }
92
93 pub fn index_descriptor(
94 &self,
95 table_ref: &TableReference,
96 index_name: &str,
97 ) -> QuillSQLResult<Option<u64>> {
98 let table = Self::canonical_table_name(table_ref);
99 let key = index_key(&table, index_name);
100 let catalog = self.db.open_tree(CATALOG_TREE).map_err(map_holt_err)?;
101 catalog
102 .get(&key)
103 .map_err(map_holt_err)?
104 .map(|raw| decode_u64(&raw))
105 .transpose()
106 }
107
108 pub fn create_table_descriptor(
109 &self,
110 table_ref: &TableReference,
111 schema: &Schema,
112 ) -> QuillSQLResult<u64> {
113 let table = Self::canonical_table_name(table_ref);
114 let key = table_key(&table);
115 let schema_key = table_schema_key(&table);
116 let schema_value = encode_schema(schema)?;
117 let table_id = self.next_table_id.fetch_add(1, AtomicOrdering::SeqCst);
118 let next_id = table_id
119 .checked_add(1)
120 .ok_or_else(|| QuillSQLError::Storage("Holt table id overflow".to_string()))?;
121 let tree_name = Self::table_tree_name(table_id);
122 self.db
123 .open_or_create_tree(&tree_name)
124 .map_err(map_holt_err)?;
125 self.db
126 .atomic(|batch| {
127 batch.put_if_absent(CATALOG_TREE, &key, &encode_u64(table_id));
128 batch.put(CATALOG_TREE, &schema_key, &schema_value);
129 batch.put(CATALOG_TREE, NEXT_TABLE_ID, &encode_u64(next_id));
130 })
131 .map_err(map_holt_err)
132 .and_then(committed)?;
133 Ok(table_id)
134 }
135
136 pub fn create_index_descriptor(
137 &self,
138 table_ref: &TableReference,
139 index_name: &str,
140 key_schema: &Schema,
141 ) -> QuillSQLResult<u64> {
142 let table = Self::canonical_table_name(table_ref);
143 let key = index_key(&table, index_name);
144 let schema_key = index_schema_key(&table, index_name);
145 let schema_value = encode_schema(key_schema)?;
146 let index_id = self.next_index_id.fetch_add(1, AtomicOrdering::SeqCst);
147 let next_id = index_id
148 .checked_add(1)
149 .ok_or_else(|| QuillSQLError::Storage("Holt index id overflow".to_string()))?;
150 let tree_name = Self::index_tree_name(index_id);
151 self.db
152 .open_or_create_tree(&tree_name)
153 .map_err(map_holt_err)?;
154 self.db
155 .atomic(|batch| {
156 batch.put_if_absent(CATALOG_TREE, &key, &encode_u64(index_id));
157 batch.put(CATALOG_TREE, &schema_key, &schema_value);
158 batch.put(CATALOG_TREE, NEXT_INDEX_ID, &encode_u64(next_id));
159 })
160 .map_err(map_holt_err)
161 .and_then(committed)?;
162 Ok(index_id)
163 }
164
165 pub fn drop_table_descriptor(&self, table_ref: &TableReference) -> QuillSQLResult<()> {
166 let table = Self::canonical_table_name(table_ref);
167 let table_id = self.table_descriptor(table_ref)?;
168 let key = table_key(&table);
169 let schema_key = table_schema_key(&table);
170 self.db
171 .atomic(|batch| {
172 batch.delete(CATALOG_TREE, &key);
173 batch.delete(CATALOG_TREE, &schema_key);
174 })
175 .map_err(map_holt_err)
176 .and_then(committed)?;
177 if let Some(table_id) = table_id {
178 self.drop_tree_if_exists(&Self::table_tree_name(table_id))?;
179 }
180 Ok(())
181 }
182
183 pub fn drop_index_descriptor(
184 &self,
185 table_ref: &TableReference,
186 index_name: &str,
187 ) -> QuillSQLResult<()> {
188 let table = Self::canonical_table_name(table_ref);
189 let index_id = self.index_descriptor(table_ref, index_name)?;
190 let key = index_key(&table, index_name);
191 let schema_key = index_schema_key(&table, index_name);
192 self.db
193 .atomic(|batch| {
194 batch.delete(CATALOG_TREE, &key);
195 batch.delete(CATALOG_TREE, &schema_key);
196 })
197 .map_err(map_holt_err)
198 .and_then(committed)?;
199 if let Some(index_id) = index_id {
200 self.drop_tree_if_exists(&Self::index_tree_name(index_id))?;
201 }
202 Ok(())
203 }
204
205 pub fn allocate_rid(&self) -> QuillSQLResult<RecordId> {
206 let (rid, next) = self.reserve_rid()?;
207 self.db
208 .atomic(|batch| {
209 batch.put(CATALOG_TREE, NEXT_RID, &encode_u64(next));
210 })
211 .map_err(map_holt_err)
212 .and_then(committed)?;
213 Ok(rid)
214 }
215
216 pub fn insert_system_row(&self, table_id: u64, tuple: &Tuple) -> QuillSQLResult<RecordId> {
217 let (rid, next_rid) = self.reserve_rid()?;
218 let table_tree = Self::table_tree_name(table_id);
219 let row_key = encode_rid(rid);
220 let row_value = encode_row(TupleMeta::new(0, 0), tuple);
221 self.db
222 .atomic(|batch| {
223 batch.put(CATALOG_TREE, NEXT_RID, &encode_u64(next_rid));
224 batch.put(&table_tree, &row_key, &row_value);
225 })
226 .map_err(map_holt_err)
227 .and_then(committed)?;
228 Ok(rid)
229 }
230
231 pub fn mark_system_row_deleted(
232 &self,
233 table_id: u64,
234 schema: SchemaRef,
235 rid: RecordId,
236 ) -> QuillSQLResult<()> {
237 let table_tree = Self::table_tree_name(table_id);
238 let tree = self.db.open_tree(&table_tree).map_err(map_holt_err)?;
239 let row_key = encode_rid(rid);
240 let Some(raw) = tree.get(&row_key).map_err(map_holt_err)? else {
241 return Ok(());
242 };
243 let (mut meta, tuple) = decode_row(&raw, schema)?;
244 if meta.is_deleted {
245 return Ok(());
246 }
247 meta.mark_deleted(0, 0);
248 tree.put(&row_key, &encode_row(meta, &tuple))
249 .map_err(map_holt_err)
250 }
251
252 pub fn reserve_rid(&self) -> QuillSQLResult<(RecordId, u64)> {
253 let raw = self.next_rid.fetch_add(1, AtomicOrdering::SeqCst);
254 let next = raw
255 .checked_add(1)
256 .ok_or_else(|| QuillSQLError::Storage("Holt RID overflow".to_string()))?;
257 let page_id = (raw >> 32) as u32;
258 let slot_num = raw as u32;
259 Ok((RecordId::new(page_id, slot_num), next))
260 }
261
262 fn drop_tree_if_exists(&self, tree: &str) -> QuillSQLResult<()> {
263 match self.db.drop_tree(tree) {
264 Ok(()) => Ok(()),
265 Err(holt::Error::TreeNotFound { .. }) => Ok(()),
266 Err(err) => Err(map_holt_err(err)),
267 }
268 }
269
270 pub fn put_txn_status(
271 &self,
272 txn_id: TransactionId,
273 status: TransactionStatus,
274 ) -> QuillSQLResult<()> {
275 let tree = self.db.open_tree(TXN_TREE).map_err(map_holt_err)?;
276 tree.put(&encode_u64(txn_id), &[encode_txn_status(status)])
277 .map_err(map_holt_err)
278 }
279
280 pub fn txn_status(&self, txn_id: TransactionId) -> QuillSQLResult<Option<TransactionStatus>> {
281 let tree = self.db.open_tree(TXN_TREE).map_err(map_holt_err)?;
282 tree.get(&encode_u64(txn_id))
283 .map_err(map_holt_err)?
284 .map(|raw| decode_txn_status(raw.first().copied().unwrap_or(0)))
285 .transpose()
286 }
287
288 pub fn recover_txn_statuses(&self) -> QuillSQLResult<Vec<(TransactionId, TransactionStatus)>> {
289 let tree = self.db.open_tree(TXN_TREE).map_err(map_holt_err)?;
290 let mut statuses = Vec::new();
291 for entry in tree.range().into_iter() {
292 let entry = entry.map_err(map_holt_err)?;
293 let holt::RangeEntry::Key { key, value, .. } = entry else {
294 continue;
295 };
296 let txn_id = decode_u64(&key)?;
297 let mut status = decode_txn_status(value.first().copied().unwrap_or(0))?;
298 if matches!(
299 status,
300 TransactionStatus::InProgress | TransactionStatus::Unknown
301 ) {
302 status = TransactionStatus::Aborted;
303 tree.put(&key, &[encode_txn_status(status)])
304 .map_err(map_holt_err)?;
305 }
306 statuses.push((txn_id, status));
307 }
308 Ok(statuses)
309 }
310
311 pub fn table_descriptors(&self) -> QuillSQLResult<Vec<HoltTableDescriptor>> {
312 let catalog = self.db.open_tree(CATALOG_TREE).map_err(map_holt_err)?;
313 let mut descriptors = Vec::new();
314 for entry in catalog.scan(b"table/").into_iter() {
315 let entry = entry.map_err(map_holt_err)?;
316 let holt::RangeEntry::Key { key, value, .. } = entry else {
317 continue;
318 };
319 let canonical = std::str::from_utf8(&key[b"table/".len()..])
320 .map_err(|err| QuillSQLError::Storage(format!("invalid Holt table key: {err}")))?;
321 let schema_key = table_schema_key(canonical);
322 let Some(schema_raw) = catalog.get(&schema_key).map_err(map_holt_err)? else {
323 continue;
324 };
325 let table_ref = parse_canonical_table_name(canonical)?;
326 let schema = Arc::new(decode_schema(&schema_raw)?);
327 let table_id = decode_u64(&value)?;
328 descriptors.push(HoltTableDescriptor {
329 table_name: table_ref.table().to_string(),
330 table_ref,
331 schema,
332 table_id,
333 });
334 }
335 Ok(descriptors)
336 }
337
338 pub fn index_descriptors(&self) -> QuillSQLResult<Vec<HoltIndexDescriptor>> {
339 let catalog = self.db.open_tree(CATALOG_TREE).map_err(map_holt_err)?;
340 let mut descriptors = Vec::new();
341 for entry in catalog.scan(b"index/").into_iter() {
342 let entry = entry.map_err(map_holt_err)?;
343 let holt::RangeEntry::Key { key, value, .. } = entry else {
344 continue;
345 };
346 let raw = std::str::from_utf8(&key[b"index/".len()..])
347 .map_err(|err| QuillSQLError::Storage(format!("invalid Holt index key: {err}")))?;
348 let Some((canonical, index_name)) = raw.rsplit_once('/') else {
349 continue;
350 };
351 let schema_key = index_schema_key(canonical, index_name);
352 let Some(schema_raw) = catalog.get(&schema_key).map_err(map_holt_err)? else {
353 continue;
354 };
355 descriptors.push(HoltIndexDescriptor {
356 table_ref: parse_canonical_table_name(canonical)?,
357 index_name: index_name.to_string(),
358 key_schema: Arc::new(decode_schema(&schema_raw)?),
359 index_id: decode_u64(&value)?,
360 });
361 }
362 Ok(descriptors)
363 }
364}
365
366#[derive(Debug, Clone)]
367pub struct HoltTableDescriptor {
368 pub table_name: String,
369 pub table_ref: TableReference,
370 pub schema: SchemaRef,
371 pub table_id: u64,
372}
373
374#[derive(Debug, Clone)]
375pub struct HoltIndexDescriptor {
376 pub table_ref: TableReference,
377 pub index_name: String,
378 pub key_schema: SchemaRef,
379 pub index_id: u64,
380}
381
382#[derive(Clone)]
383pub struct HoltTableHandle {
384 table_ref: TableReference,
385 schema: SchemaRef,
386 table_id: u64,
387 store: Arc<HoltStore>,
388}
389
390impl HoltTableHandle {
391 pub fn new(
392 table_ref: TableReference,
393 schema: SchemaRef,
394 table_id: u64,
395 store: Arc<HoltStore>,
396 ) -> Self {
397 Self {
398 table_ref,
399 schema,
400 table_id,
401 store,
402 }
403 }
404
405 fn tree_name(&self) -> String {
406 HoltStore::table_tree_name(self.table_id)
407 }
408
409 fn row_value(&self, meta: TupleMeta, tuple: &Tuple) -> Vec<u8> {
410 encode_row(meta, tuple)
411 }
412
413 fn put_txn_in_progress(&self, txn_id: TransactionId) -> QuillSQLResult<()> {
414 self.store
415 .put_txn_status(txn_id, TransactionStatus::InProgress)
416 }
417}
418
419impl TableHandle for HoltTableHandle {
420 fn table_ref(&self) -> &TableReference {
421 &self.table_ref
422 }
423
424 fn schema(&self) -> SchemaRef {
425 self.schema.clone()
426 }
427
428 fn full_scan(&self) -> QuillSQLResult<Box<dyn TupleStream>> {
429 let tree = self
430 .store
431 .db
432 .open_tree(&self.tree_name())
433 .map_err(map_holt_err)?;
434 Ok(Box::new(HoltTableStream {
435 iter: tree.range().into_iter(),
436 schema: self.schema.clone(),
437 }))
438 }
439
440 fn full_tuple(&self, rid: RecordId) -> QuillSQLResult<(TupleMeta, Tuple)> {
441 let tree = self
442 .store
443 .db
444 .open_tree(&self.tree_name())
445 .map_err(map_holt_err)?;
446 let key = encode_rid(rid);
447 let Some(value) = tree.get(&key).map_err(map_holt_err)? else {
448 return Err(QuillSQLError::Storage(format!(
449 "Holt tuple {} not found",
450 rid
451 )));
452 };
453 decode_row(&value, self.schema.clone())
454 }
455
456 fn insert(
457 &self,
458 txn: &mut TxnContext<'_>,
459 tuple: &Tuple,
460 indexes: &[Arc<dyn IndexHandle>],
461 ) -> QuillSQLResult<()> {
462 txn.ensure_writable(self.table_ref(), "INSERT")?;
463 self.put_txn_in_progress(txn.txn_id())?;
464 let (rid, next_rid) = self.store.reserve_rid()?;
465 let meta = TupleMeta::new(txn.txn_id(), txn.command_id());
466 let row_key = encode_rid(rid);
467 let row_value = self.row_value(meta, tuple);
468 let mut index_links = Vec::new();
469 let mut index_puts = Vec::new();
470 for handle in indexes {
471 let index_id = handle.index_id();
472 if let Ok(key_tuple) = tuple.project_with_schema(handle.key_schema()) {
473 let encoded_key = encode_index_key(&key_tuple, rid)?;
474 index_puts.push((index_id, encoded_key));
475 index_links.push((handle.clone(), key_tuple, rid));
476 }
477 }
478 let table_tree = self.tree_name();
479 let rid_value = encode_rid(rid);
480 self.store
481 .db
482 .atomic(|batch| {
483 batch.put(CATALOG_TREE, NEXT_RID, &encode_u64(next_rid));
484 batch.put(&table_tree, &row_key, &row_value);
485 for (index_id, key) in &index_puts {
486 let index_tree = HoltStore::index_tree_name(*index_id);
487 batch.put(&index_tree, key, &rid_value);
488 }
489 })
490 .map_err(map_holt_err)
491 .and_then(committed)?;
492 txn.transaction_mut()
493 .push_insert_undo(Arc::new(self.clone()), rid, index_links);
494 Ok(())
495 }
496
497 fn delete(
498 &self,
499 txn: &mut TxnContext<'_>,
500 rid: RecordId,
501 prev_meta: TupleMeta,
502 prev_tuple: Tuple,
503 indexes: &[Arc<dyn IndexHandle>],
504 ) -> QuillSQLResult<()> {
505 txn.ensure_writable(self.table_ref(), "DELETE")?;
506 self.put_txn_in_progress(txn.txn_id())?;
507 let mut deleted_meta = prev_meta;
508 if deleted_meta.is_deleted {
509 return Ok(());
510 }
511 deleted_meta.mark_deleted(txn.txn_id(), txn.command_id());
512 let row_key = encode_rid(rid);
513 let row_value = self.row_value(deleted_meta, &prev_tuple);
514 let mut index_links = Vec::new();
515 let mut index_deletes = Vec::new();
516 for handle in indexes {
517 let index_id = handle.index_id();
518 if let Ok(key_tuple) = prev_tuple.project_with_schema(handle.key_schema()) {
519 index_deletes.push((index_id, encode_index_key(&key_tuple, rid)?));
520 index_links.push((handle.clone(), key_tuple, rid));
521 }
522 }
523 let table_tree = self.tree_name();
524 self.store
525 .db
526 .atomic(|batch| {
527 batch.put(&table_tree, &row_key, &row_value);
528 for (index_id, key) in &index_deletes {
529 let index_tree = HoltStore::index_tree_name(*index_id);
530 batch.delete(&index_tree, key);
531 }
532 })
533 .map_err(map_holt_err)
534 .and_then(committed)?;
535 txn.transaction_mut().push_delete_undo(
536 Arc::new(self.clone()),
537 rid,
538 prev_meta,
539 prev_tuple,
540 index_links,
541 );
542 Ok(())
543 }
544
545 fn update(
546 &self,
547 txn: &mut TxnContext<'_>,
548 rid: RecordId,
549 new_tuple: Tuple,
550 prev_meta: TupleMeta,
551 prev_tuple: Tuple,
552 indexes: &[Arc<dyn IndexHandle>],
553 ) -> QuillSQLResult<RecordId> {
554 txn.ensure_writable(self.table_ref(), "UPDATE")?;
555 self.put_txn_in_progress(txn.txn_id())?;
556 if prev_meta.is_deleted && prev_meta.next_version.is_some() {
557 return Err(QuillSQLError::Execution(format!(
558 "tuple {} has already been updated",
559 rid
560 )));
561 }
562 let (new_rid, next_rid) = self.store.reserve_rid()?;
563 let mut old_meta = prev_meta;
564 old_meta.set_next_version(Some(new_rid));
565 old_meta.mark_deleted(txn.txn_id(), txn.command_id());
566 let mut new_meta = TupleMeta::new(txn.txn_id(), txn.command_id());
567 new_meta.set_prev_version(Some(rid));
568
569 let mut old_links = Vec::new();
570 let mut new_links = Vec::new();
571 let mut old_index_deletes = Vec::new();
572 let mut new_index_puts = Vec::new();
573 for handle in indexes {
574 let index_id = handle.index_id();
575 if let Ok(old_key) = prev_tuple.project_with_schema(handle.key_schema()) {
576 old_index_deletes.push((index_id, encode_index_key(&old_key, rid)?));
577 old_links.push((handle.clone(), old_key, rid));
578 }
579 if let Ok(new_key) = new_tuple.project_with_schema(handle.key_schema()) {
580 new_index_puts.push((index_id, encode_index_key(&new_key, new_rid)?));
581 new_links.push((handle.clone(), new_key, new_rid));
582 }
583 }
584
585 let table_tree = self.tree_name();
586 let old_row_key = encode_rid(rid);
587 let new_row_key = encode_rid(new_rid);
588 let old_row_value = self.row_value(old_meta, &prev_tuple);
589 let new_row_value = self.row_value(new_meta, &new_tuple);
590 let new_rid_value = encode_rid(new_rid);
591 self.store
592 .db
593 .atomic(|batch| {
594 batch.put(CATALOG_TREE, NEXT_RID, &encode_u64(next_rid));
595 batch.put(&table_tree, &old_row_key, &old_row_value);
596 batch.put(&table_tree, &new_row_key, &new_row_value);
597 for (index_id, key) in &old_index_deletes {
598 let index_tree = HoltStore::index_tree_name(*index_id);
599 batch.delete(&index_tree, key);
600 }
601 for (index_id, key) in &new_index_puts {
602 let index_tree = HoltStore::index_tree_name(*index_id);
603 batch.put(&index_tree, key, &new_rid_value);
604 }
605 })
606 .map_err(map_holt_err)
607 .and_then(committed)?;
608 txn.transaction_mut().push_update_undo(
609 Arc::new(self.clone()),
610 crate::transaction::UpdateUndo {
611 old_rid: rid,
612 new_rid,
613 prev_meta,
614 prev_tuple,
615 new_keys: new_links,
616 old_keys: old_links,
617 },
618 );
619 Ok(new_rid)
620 }
621
622 fn prepare_row_for_write(
623 &self,
624 txn: &mut TxnContext<'_>,
625 rid: RecordId,
626 observed_meta: &TupleMeta,
627 ) -> QuillSQLResult<Option<(TupleMeta, Tuple)>> {
628 if !txn.is_visible(observed_meta) {
629 return Ok(None);
630 }
631 txn.lock_row_exclusive(self.table_ref(), rid)?;
632 let (current_meta, current_tuple) = self.full_tuple(rid)?;
633 if !txn.is_visible(¤t_meta) {
634 txn.unlock_row(self.table_ref(), rid);
635 return Ok(None);
636 }
637 Ok(Some((current_meta, current_tuple)))
638 }
639
640 fn undo_insert(&self, rid: RecordId, txn_id: TransactionId) -> QuillSQLResult<()> {
641 self.store
642 .put_txn_status(txn_id, TransactionStatus::InProgress)?;
643 let (mut meta, tuple) = self.full_tuple(rid)?;
644 meta.mark_deleted(txn_id, 0);
645 let tree = self
646 .store
647 .db
648 .open_tree(&self.tree_name())
649 .map_err(map_holt_err)?;
650 tree.put(&encode_rid(rid), &self.row_value(meta, &tuple))
651 .map_err(map_holt_err)
652 }
653
654 fn undo_delete(
655 &self,
656 rid: RecordId,
657 prev_meta: TupleMeta,
658 prev_tuple: Tuple,
659 ) -> QuillSQLResult<()> {
660 let tree = self
661 .store
662 .db
663 .open_tree(&self.tree_name())
664 .map_err(map_holt_err)?;
665 tree.put(&encode_rid(rid), &self.row_value(prev_meta, &prev_tuple))
666 .map_err(map_holt_err)
667 }
668}
669
670pub struct HoltIndexHandle {
671 name: String,
672 key_schema: SchemaRef,
673 index_id: u64,
674 store: Arc<HoltStore>,
675}
676
677impl HoltIndexHandle {
678 pub fn new(name: String, key_schema: SchemaRef, index_id: u64, store: Arc<HoltStore>) -> Self {
679 Self {
680 name,
681 key_schema,
682 index_id,
683 store,
684 }
685 }
686
687 fn tree_name(&self) -> String {
688 HoltStore::index_tree_name(self.index_id)
689 }
690}
691
692impl IndexHandle for HoltIndexHandle {
693 fn name(&self) -> &str {
694 &self.name
695 }
696
697 fn key_schema(&self) -> SchemaRef {
698 self.key_schema.clone()
699 }
700
701 fn index_id(&self) -> u64 {
702 self.index_id
703 }
704
705 fn insert(&self, key: &Tuple, rid: RecordId, txn_id: TransactionId) -> QuillSQLResult<()> {
706 self.store
707 .put_txn_status(txn_id, TransactionStatus::InProgress)?;
708 let tree = self
709 .store
710 .db
711 .open_tree(&self.tree_name())
712 .map_err(map_holt_err)?;
713 tree.put(&encode_index_key(key, rid)?, &encode_rid(rid))
714 .map_err(map_holt_err)
715 }
716
717 fn delete(&self, key: &Tuple, rid: RecordId, txn_id: TransactionId) -> QuillSQLResult<()> {
718 self.store
719 .put_txn_status(txn_id, TransactionStatus::InProgress)?;
720 let tree = self
721 .store
722 .db
723 .open_tree(&self.tree_name())
724 .map_err(map_holt_err)?;
725 tree.delete(&encode_index_key(key, rid)?)
726 .map_err(map_holt_err)?;
727 Ok(())
728 }
729
730 fn range_scan(
731 &self,
732 table: Arc<dyn TableHandle>,
733 request: IndexScanRequest,
734 ) -> QuillSQLResult<Box<dyn TupleStream>> {
735 let tree = self
736 .store
737 .db
738 .open_tree(&self.tree_name())
739 .map_err(map_holt_err)?;
740 let start_after = index_start_after_key(&request)?;
741 let mut range = tree.range();
742 if let Some(key) = start_after.as_deref() {
743 range = range.start_after(key);
744 }
745 Ok(Box::new(HoltIndexStream {
746 iter: range.into_iter(),
747 table,
748 key_schema: self.key_schema.clone(),
749 request,
750 }))
751 }
752}
753
754struct HoltTableStream {
755 iter: holt::RangeIter,
756 schema: SchemaRef,
757}
758
759impl TupleStream for HoltTableStream {
760 fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>> {
761 for entry in self.iter.by_ref() {
762 let entry = entry.map_err(map_holt_err)?;
763 let holt::RangeEntry::Key { key, value, .. } = entry else {
764 continue;
765 };
766 let rid = decode_rid(&key)?;
767 let (meta, tuple) = decode_row(&value, self.schema.clone())?;
768 return Ok(Some((rid, meta, tuple)));
769 }
770 Ok(None)
771 }
772}
773
774struct HoltIndexStream {
775 iter: holt::RangeIter,
776 table: Arc<dyn TableHandle>,
777 key_schema: SchemaRef,
778 request: IndexScanRequest,
779}
780
781impl TupleStream for HoltIndexStream {
782 fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>> {
783 for entry in self.iter.by_ref() {
784 let entry = entry.map_err(map_holt_err)?;
785 let holt::RangeEntry::Key { value, .. } = entry else {
786 continue;
787 };
788 let rid = decode_rid(&value)?;
789 let Ok((meta, tuple)) = self.table.full_tuple(rid) else {
790 continue;
791 };
792 let key_tuple = tuple.project_with_schema(self.key_schema.clone())?;
793 if tuple_above_end(&key_tuple, &self.request) {
794 return Ok(None);
795 }
796 if tuple_within_bounds(&key_tuple, &self.request) {
797 return Ok(Some((rid, meta, tuple)));
798 }
799 }
800 Ok(None)
801 }
802}
803
804fn encode_row(meta: TupleMeta, tuple: &Tuple) -> Vec<u8> {
805 let mut out = Vec::new();
806 out.extend(meta.insert_txn_id.to_be_bytes());
807 out.extend(meta.insert_cid.to_be_bytes());
808 out.extend(meta.delete_txn_id.to_be_bytes());
809 out.extend(meta.delete_cid.to_be_bytes());
810 out.push(u8::from(meta.is_deleted));
811 encode_optional_rid(&mut out, meta.next_version);
812 encode_optional_rid(&mut out, meta.prev_version);
813 out.extend(TupleCodec::encode(tuple));
814 out
815}
816
817fn decode_row(bytes: &[u8], schema: SchemaRef) -> QuillSQLResult<(TupleMeta, Tuple)> {
818 let (meta, offset) = decode_tuple_meta(bytes)?;
819 let (tuple, _) = TupleCodec::decode(&bytes[offset..], schema)?;
820 Ok((meta, tuple))
821}
822
823fn encode_optional_rid(out: &mut Vec<u8>, rid: Option<RecordId>) {
824 match rid {
825 Some(rid) => {
826 out.push(1);
827 out.extend(encode_rid(rid));
828 }
829 None => out.push(0),
830 }
831}
832
833fn decode_tuple_meta(bytes: &[u8]) -> QuillSQLResult<(TupleMeta, usize)> {
834 let mut offset = 0usize;
835 let insert_txn_id = read_u64(bytes, &mut offset)?;
836 let insert_cid = read_u32(bytes, &mut offset)?;
837 let delete_txn_id = read_u64(bytes, &mut offset)?;
838 let delete_cid = read_u32(bytes, &mut offset)?;
839 let is_deleted = read_u8(bytes, &mut offset)? != 0;
840 let next_version = read_optional_rid(bytes, &mut offset)?;
841 let prev_version = read_optional_rid(bytes, &mut offset)?;
842 Ok((
843 TupleMeta {
844 insert_txn_id,
845 insert_cid,
846 delete_txn_id,
847 delete_cid,
848 is_deleted,
849 next_version,
850 prev_version,
851 },
852 offset,
853 ))
854}
855
856fn read_optional_rid(bytes: &[u8], offset: &mut usize) -> QuillSQLResult<Option<RecordId>> {
857 let present = read_u8(bytes, offset)?;
858 if present == 0 {
859 return Ok(None);
860 }
861 let rid_bytes = take(bytes, offset, 8)?;
862 decode_rid(rid_bytes).map(Some)
863}
864
865fn read_u8(bytes: &[u8], offset: &mut usize) -> QuillSQLResult<u8> {
866 Ok(take(bytes, offset, 1)?[0])
867}
868
869fn read_u32(bytes: &[u8], offset: &mut usize) -> QuillSQLResult<u32> {
870 let raw = take(bytes, offset, 4)?;
871 Ok(u32::from_be_bytes(raw.try_into().unwrap()))
872}
873
874fn read_u64(bytes: &[u8], offset: &mut usize) -> QuillSQLResult<u64> {
875 let raw = take(bytes, offset, 8)?;
876 Ok(u64::from_be_bytes(raw.try_into().unwrap()))
877}
878
879fn take<'a>(bytes: &'a [u8], offset: &mut usize, len: usize) -> QuillSQLResult<&'a [u8]> {
880 let end = offset
881 .checked_add(len)
882 .ok_or_else(|| QuillSQLError::Storage("Holt row offset overflow".to_string()))?;
883 if end > bytes.len() {
884 return Err(QuillSQLError::Storage(format!(
885 "Holt row is truncated: need {} bytes at offset {}, len {}",
886 len,
887 *offset,
888 bytes.len()
889 )));
890 }
891 let out = &bytes[*offset..end];
892 *offset = end;
893 Ok(out)
894}
895
896pub fn encode_index_key(tuple: &Tuple, rid: RecordId) -> QuillSQLResult<Vec<u8>> {
897 let mut out = Vec::new();
898 for (value, col) in tuple.data.iter().zip(tuple.schema.columns.iter()) {
899 encode_ordered_scalar(&mut out, value, col.data_type)?;
900 }
901 out.push(0xff);
902 out.extend(encode_rid(rid));
903 Ok(out)
904}
905
906fn index_start_after_key(request: &IndexScanRequest) -> QuillSQLResult<Option<Vec<u8>>> {
907 match &request.start {
908 Bound::Included(start) => {
909 let min_rid = RecordId::new(0, 0);
910 Ok(Some(encode_index_key(start, min_rid)?))
911 }
912 Bound::Excluded(start) => {
913 let max_rid = RecordId::new(u32::MAX, u32::MAX);
914 Ok(Some(encode_index_key(start, max_rid)?))
915 }
916 Bound::Unbounded => Ok(None),
917 }
918}
919
920fn encode_ordered_scalar(
921 out: &mut Vec<u8>,
922 value: &ScalarValue,
923 _data_type: DataType,
924) -> QuillSQLResult<()> {
925 if value.is_null() {
926 out.push(0);
927 return Ok(());
928 }
929 out.push(1);
930 match value {
931 ScalarValue::Boolean(Some(v)) => out.push(u8::from(*v)),
932 ScalarValue::Int8(Some(v)) => out.push((*v as u8) ^ 0x80),
933 ScalarValue::Int16(Some(v)) => out.extend(((*v as u16) ^ 0x8000).to_be_bytes()),
934 ScalarValue::Int32(Some(v)) => out.extend(((*v as u32) ^ 0x8000_0000).to_be_bytes()),
935 ScalarValue::Int64(Some(v)) => {
936 out.extend(((*v as u64) ^ 0x8000_0000_0000_0000).to_be_bytes())
937 }
938 ScalarValue::UInt8(Some(v)) => out.push(*v),
939 ScalarValue::UInt16(Some(v)) => out.extend(v.to_be_bytes()),
940 ScalarValue::UInt32(Some(v)) => out.extend(v.to_be_bytes()),
941 ScalarValue::UInt64(Some(v)) => out.extend(v.to_be_bytes()),
942 ScalarValue::Float32(Some(v)) => out.extend(ordered_f32_bits(*v).to_be_bytes()),
943 ScalarValue::Float64(Some(v)) => out.extend(ordered_f64_bits(*v).to_be_bytes()),
944 ScalarValue::Varchar(Some(v)) => encode_ordered_bytes(out, v.as_bytes()),
945 ScalarValue::Boolean(None)
946 | ScalarValue::Int8(None)
947 | ScalarValue::Int16(None)
948 | ScalarValue::Int32(None)
949 | ScalarValue::Int64(None)
950 | ScalarValue::UInt8(None)
951 | ScalarValue::UInt16(None)
952 | ScalarValue::UInt32(None)
953 | ScalarValue::UInt64(None)
954 | ScalarValue::Float32(None)
955 | ScalarValue::Float64(None)
956 | ScalarValue::Varchar(None) => unreachable!("null handled above"),
957 }
958 Ok(())
959}
960
/// Maps an `f32` to a `u32` whose unsigned order follows the float's numeric order.
///
/// Values with a clear sign bit get that bit set; values with the sign bit
/// set have every bit inverted.
fn ordered_f32_bits(value: f32) -> u32 {
    const SIGN: u32 = 0x8000_0000;
    let bits = value.to_bits();
    // XOR with all ones is a full inversion; XOR with SIGN flips only the sign.
    let mask = if bits & SIGN != 0 { u32::MAX } else { SIGN };
    bits ^ mask
}
969
/// Maps an `f64` to a `u64` whose unsigned order follows the float's numeric order.
///
/// Values with a clear sign bit get that bit set; values with the sign bit
/// set have every bit inverted.
fn ordered_f64_bits(value: f64) -> u64 {
    const SIGN: u64 = 0x8000_0000_0000_0000;
    let bits = value.to_bits();
    // XOR with all ones is a full inversion; XOR with SIGN flips only the sign.
    let mask = if bits & SIGN != 0 { u64::MAX } else { SIGN };
    bits ^ mask
}
978
/// Appends `bytes` in an escaped, self-terminating form.
///
/// Each `0x00` byte is written as `0x00 0xff`; every other byte is copied
/// unchanged. The sequence ends with the terminator `0x00 0x00`.
fn encode_ordered_bytes(out: &mut Vec<u8>, bytes: &[u8]) {
    for &byte in bytes {
        match byte {
            0 => out.extend_from_slice(&[0, 0xff]),
            plain => out.push(plain),
        }
    }
    out.extend_from_slice(&[0, 0]);
}
989
990fn tuple_within_bounds(tuple: &Tuple, request: &IndexScanRequest) -> bool {
991 let lower = match &request.start {
992 Bound::Included(start) => tuple.cmp(start) != Ordering::Less,
993 Bound::Excluded(start) => tuple.cmp(start) == Ordering::Greater,
994 Bound::Unbounded => true,
995 };
996 if !lower {
997 return false;
998 }
999 match &request.end {
1000 Bound::Included(end) => tuple.cmp(end) != Ordering::Greater,
1001 Bound::Excluded(end) => tuple.cmp(end) == Ordering::Less,
1002 Bound::Unbounded => true,
1003 }
1004}
1005
1006fn tuple_above_end(tuple: &Tuple, request: &IndexScanRequest) -> bool {
1007 match &request.end {
1008 Bound::Included(end) => tuple.cmp(end) == Ordering::Greater,
1009 Bound::Excluded(end) => tuple.cmp(end) != Ordering::Less,
1010 Bound::Unbounded => false,
1011 }
1012}
1013
1014fn read_counter(db: &holt::DB, key: &[u8], default_value: u64) -> QuillSQLResult<u64> {
1015 let catalog = db.open_tree(CATALOG_TREE).map_err(map_holt_err)?;
1016 match catalog.get(key).map_err(map_holt_err)? {
1017 Some(raw) => decode_u64(&raw),
1018 None => {
1019 catalog
1020 .put(key, &encode_u64(default_value))
1021 .map_err(map_holt_err)?;
1022 Ok(default_value)
1023 }
1024 }
1025}
1026
1027pub fn encode_rid(rid: RecordId) -> [u8; 8] {
1028 let mut out = [0u8; 8];
1029 out[..4].copy_from_slice(&rid.page_id.to_be_bytes());
1030 out[4..].copy_from_slice(&rid.slot_num.to_be_bytes());
1031 out
1032}
1033
1034pub fn decode_rid(bytes: &[u8]) -> QuillSQLResult<RecordId> {
1035 if bytes.len() != 8 {
1036 return Err(QuillSQLError::Storage(format!(
1037 "invalid Holt RID length {}",
1038 bytes.len()
1039 )));
1040 }
1041 let page_id = u32::from_be_bytes(bytes[..4].try_into().unwrap());
1042 let slot_num = u32::from_be_bytes(bytes[4..].try_into().unwrap());
1043 Ok(RecordId::new(page_id, slot_num))
1044}
1045
1046fn table_key(table: &str) -> Vec<u8> {
1047 format!("{TABLE_PREFIX}/{table}").into_bytes()
1048}
1049
1050fn table_schema_key(table: &str) -> Vec<u8> {
1051 format!("{TABLE_SCHEMA_PREFIX}/{table}").into_bytes()
1052}
1053
1054fn index_key(table: &str, index_name: &str) -> Vec<u8> {
1055 format!("{INDEX_PREFIX}/{table}/{index_name}").into_bytes()
1056}
1057
1058fn index_schema_key(table: &str, index_name: &str) -> Vec<u8> {
1059 format!("{INDEX_SCHEMA_PREFIX}/{table}/{index_name}").into_bytes()
1060}
1061
1062fn parse_canonical_table_name(canonical: &str) -> QuillSQLResult<TableReference> {
1063 let parts = canonical.split('.').collect::<Vec<_>>();
1064 match parts.as_slice() {
1065 [catalog, schema, table] => Ok(TableReference::Full {
1066 catalog: (*catalog).to_string(),
1067 schema: (*schema).to_string(),
1068 table: (*table).to_string(),
1069 }),
1070 _ => Err(QuillSQLError::Storage(format!(
1071 "invalid Holt canonical table name {canonical}"
1072 ))),
1073 }
1074}
1075
1076fn encode_schema(schema: &Schema) -> QuillSQLResult<Vec<u8>> {
1077 let mut out = Vec::new();
1078 put_u32(&mut out, schema.columns.len() as u32);
1079 for column in &schema.columns {
1080 put_string(&mut out, &column.name)?;
1081 let data_type = column.data_type.to_string();
1082 put_string(&mut out, &data_type)?;
1083 out.push(u8::from(column.nullable));
1084 let default = column.default.to_string();
1085 put_string(&mut out, &default)?;
1086 }
1087 Ok(out)
1088}
1089
1090fn decode_schema(bytes: &[u8]) -> QuillSQLResult<Schema> {
1091 let mut offset = 0usize;
1092 let column_count = read_u32(bytes, &mut offset)?;
1093 let mut columns = Vec::with_capacity(column_count as usize);
1094 for _ in 0..column_count {
1095 let name = read_string(bytes, &mut offset)?;
1096 let data_type_str = read_string(bytes, &mut offset)?;
1097 let data_type = DataType::try_from(data_type_str.as_str())?;
1098 let nullable = read_u8(bytes, &mut offset)? != 0;
1099 let default_str = read_string(bytes, &mut offset)?;
1100 let default = ScalarValue::from_string(&default_str, data_type)?;
1101 columns.push(Column::new(name, data_type, nullable).with_default(default));
1102 }
1103 Ok(Schema::new(columns))
1104}
1105
/// Appends `value` to `out` as four big-endian bytes.
fn put_u32(out: &mut Vec<u8>, value: u32) {
    let encoded = value.to_be_bytes();
    out.extend_from_slice(&encoded);
}
1109
1110fn put_string(out: &mut Vec<u8>, value: &str) -> QuillSQLResult<()> {
1111 let len = u32::try_from(value.len())
1112 .map_err(|_| QuillSQLError::Storage("Holt schema string is too large".to_string()))?;
1113 put_u32(out, len);
1114 out.extend(value.as_bytes());
1115 Ok(())
1116}
1117
1118fn read_string(bytes: &[u8], offset: &mut usize) -> QuillSQLResult<String> {
1119 let len = read_u32(bytes, offset)? as usize;
1120 let raw = take(bytes, offset, len)?;
1121 String::from_utf8(raw.to_vec())
1122 .map_err(|err| QuillSQLError::Storage(format!("invalid Holt schema string: {err}")))
1123}
1124
/// Encodes `value` as eight big-endian bytes.
fn encode_u64(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
1128
1129fn decode_u64(bytes: &[u8]) -> QuillSQLResult<u64> {
1130 if bytes.len() != 8 {
1131 return Err(QuillSQLError::Storage(format!(
1132 "invalid Holt u64 length {}",
1133 bytes.len()
1134 )));
1135 }
1136 Ok(u64::from_be_bytes(bytes.try_into().unwrap()))
1137}
1138
1139fn committed(committed: bool) -> QuillSQLResult<()> {
1140 if committed {
1141 Ok(())
1142 } else {
1143 Err(QuillSQLError::Storage(
1144 "Holt metadata compare-and-set failed".to_string(),
1145 ))
1146 }
1147}
1148
1149fn encode_txn_status(status: TransactionStatus) -> u8 {
1150 match status {
1151 TransactionStatus::InProgress => 1,
1152 TransactionStatus::Committed => 2,
1153 TransactionStatus::Aborted => 3,
1154 TransactionStatus::Unknown => 4,
1155 }
1156}
1157
1158fn decode_txn_status(raw: u8) -> QuillSQLResult<TransactionStatus> {
1159 match raw {
1160 1 => Ok(TransactionStatus::InProgress),
1161 2 => Ok(TransactionStatus::Committed),
1162 3 => Ok(TransactionStatus::Aborted),
1163 4 => Ok(TransactionStatus::Unknown),
1164 other => Err(QuillSQLError::Storage(format!(
1165 "invalid Holt txn status {other}"
1166 ))),
1167 }
1168}
1169
1170pub(crate) fn map_holt_err(err: holt::Error) -> QuillSQLError {
1171 QuillSQLError::Storage(format!("Holt error: {err}"))
1172}
1173
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::catalog::{Column, DataType, Schema};
    use crate::storage::holt::encode_index_key;
    use crate::storage::record::RecordId;
    use crate::storage::tuple::Tuple;
    use crate::utils::scalar::ScalarValue;

    /// Builds a shared schema of nullable columns named `c0`, `c1`, ... with
    /// the given types.
    fn schema(column_types: &[DataType]) -> Arc<Schema> {
        let columns = column_types
            .iter()
            .enumerate()
            .map(|(idx, data_type)| Column::new(format!("c{idx}"), *data_type, true))
            .collect();
        Arc::new(Schema::new(columns))
    }

    /// Turns a list of optional values into one-column rows using `wrap` to
    /// build each scalar.
    fn single_column_rows<T>(
        values: Vec<Option<T>>,
        wrap: impl Fn(Option<T>) -> ScalarValue,
    ) -> Vec<Vec<ScalarValue>> {
        values.into_iter().map(|value| vec![wrap(value)]).collect()
    }

    /// Asserts that `rows`, given in ascending `Tuple` order, also encode to
    /// strictly ascending index keys. Every key uses the same record id, so
    /// only the tuple part can decide the order.
    fn assert_order(column_types: &[DataType], rows: Vec<Vec<ScalarValue>>) {
        let row_schema = schema(column_types);
        let mut tuples = Vec::with_capacity(rows.len());
        for row in rows {
            tuples.push(Tuple::new(row_schema.clone(), row));
        }

        for pair in tuples.windows(2) {
            let (left, right) = (&pair[0], &pair[1]);
            // Guard against a mis-ordered fixture before checking the encoding.
            assert!(
                left < right,
                "test rows must be in Tuple order: {left:?} !< {right:?}"
            );
            let left_key = encode_index_key(left, RecordId::new(0, 1)).unwrap();
            let right_key = encode_index_key(right, RecordId::new(0, 1)).unwrap();
            assert!(
                left_key < right_key,
                "encoded key order differs from Tuple order for {left:?} and {right:?}"
            );
        }
    }

    #[test]
    fn ordered_index_key_matches_tuple_order_for_scalars() {
        assert_order(
            &[DataType::Int32],
            single_column_rows(
                vec![None, Some(-10), Some(0), Some(10)],
                ScalarValue::Int32,
            ),
        );
        assert_order(
            &[DataType::UInt64],
            single_column_rows(
                vec![None, Some(0), Some(10), Some(u64::MAX)],
                ScalarValue::UInt64,
            ),
        );
        assert_order(
            &[DataType::Float64],
            single_column_rows(
                vec![
                    None,
                    Some(f64::NEG_INFINITY),
                    Some(-1.0),
                    Some(-0.0),
                    Some(0.0),
                    Some(1.0),
                    Some(f64::INFINITY),
                ],
                ScalarValue::Float64,
            ),
        );
        assert_order(
            &[DataType::Varchar(None)],
            single_column_rows(
                vec![
                    None,
                    Some(String::new()),
                    Some("a".to_string()),
                    Some("aa".to_string()),
                    Some("b".to_string()),
                ],
                ScalarValue::Varchar,
            ),
        );
    }

    #[test]
    fn ordered_index_key_handles_composite_and_duplicate_rids() {
        // Two-column rows: the integer column decides first, then the string.
        let row = |number: i32, text: &str| {
            vec![
                ScalarValue::Int32(Some(number)),
                ScalarValue::Varchar(Some(text.to_string())),
            ]
        };
        assert_order(
            &[DataType::Int32, DataType::Varchar(None)],
            vec![row(1, "a"), row(1, "b"), row(2, "a")],
        );

        // Identical tuples with different record ids must still order by rid.
        let int_schema = schema(&[DataType::Int32]);
        let duplicate = Tuple::new(int_schema, vec![ScalarValue::Int32(Some(1))]);
        let low = encode_index_key(&duplicate, RecordId::new(0, 1)).unwrap();
        let high = encode_index_key(&duplicate, RecordId::new(0, 2)).unwrap();
        assert!(low < high);
    }
}