Skip to main content

exoware_sql/
types.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use datafusion::arrow::datatypes::{i256, DataType, Field, Schema, SchemaRef, TimeUnit};
5use exoware_sdk_rs::keys::{Key, KeyCodec};
6use exoware_sdk_rs::StoreClient;
7
8use crate::codec::{primary_key_codec, secondary_index_codec};
9
// Key layout: bit budgets for the fixed header that precedes every key payload.

/// Leading key bits that select the owning table.
pub(crate) const TABLE_PREFIX_BITS: u8 = 4;
/// Bits following the table prefix that distinguish key kinds (primary vs. index,
/// presumably — the encoding itself lives in `codec`).
pub(crate) const KEY_KIND_BITS: u8 = 1;
/// Header bits in front of a primary-key payload.
pub(crate) const PRIMARY_RESERVED_BITS: u8 = TABLE_PREFIX_BITS + KEY_KIND_BITS;
/// Bits that select one secondary index within a table.
pub(crate) const INDEX_SLOT_BITS: u8 = 4;
/// Header bits in front of a secondary-index payload (primary header + slot).
pub(crate) const INDEX_FAMILY_BITS: u8 = PRIMARY_RESERVED_BITS + INDEX_SLOT_BITS;
/// Bit position at which primary-key payload data starts.
pub(crate) const PRIMARY_KEY_BIT_OFFSET: usize = PRIMARY_RESERVED_BITS as usize;
/// Bit position at which secondary-index payload data starts.
pub(crate) const INDEX_KEY_BIT_OFFSET: usize = INDEX_FAMILY_BITS as usize;
/// Number of table prefixes representable in `TABLE_PREFIX_BITS` bits.
pub(crate) const MAX_TABLES: usize = 2usize.pow(TABLE_PREFIX_BITS as u32);
/// Number of usable index slots. One slot value is never handed out because
/// index ids are assigned starting at 1.
pub(crate) const MAX_INDEX_SPECS: usize = 2usize.pow(INDEX_SLOT_BITS as u32) - 1;

// String key encoding.

/// Bytes of a string stored inline in a key. `ColumnKind::key_width` budgets
/// this plus one byte for variable-width columns.
pub(crate) const STRING_KEY_INLINE_LIMIT: usize = 15;
/// Terminator byte for string key components (per its name; see `codec`).
pub(crate) const STRING_KEY_TERMINATOR: u8 = 0x00;
/// Escape-prefix byte used by the string key encoding.
pub(crate) const STRING_KEY_ESCAPE_PREFIX: u8 = 0x01;
/// Escape code presumably standing in for a raw 0xFF byte — confirm in `codec`.
pub(crate) const STRING_KEY_ESCAPE_FF: u8 = 0x02;

// Paging and batching.

/// Default page/batch size; also the default `IndexBackfillOptions::row_batch_size`.
pub(crate) const PAGE_SIZE: usize = 1_000;
/// Row-count threshold for flushing buffered writes (presumably; see call sites).
pub(crate) const BATCH_FLUSH_ROWS: usize = 2_048;
/// Entry-count threshold for flushing buffered index writes during backfill.
pub(crate) const INDEX_BACKFILL_FLUSH_ENTRIES: usize = 4_096;
26
/// Final tally produced by an index backfill.
///
/// `Default` yields an all-zero report.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct IndexBackfillReport {
    /// Primary rows scanned during the backfill.
    pub scanned_rows: u64,
    /// Number of index definitions that were backfilled.
    pub indexes_backfilled: usize,
    /// Index entries written across all backfilled indexes.
    pub index_entries_written: u64,
}
33
34#[derive(Debug, Clone, PartialEq, Eq)]
35pub struct IndexBackfillOptions {
36    pub row_batch_size: usize,
37    pub start_from_primary_key: Option<Key>,
38}
39
40impl Default for IndexBackfillOptions {
41    fn default() -> Self {
42        Self {
43            row_batch_size: PAGE_SIZE,
44            start_from_primary_key: None,
45        }
46    }
47}
48
49#[derive(Debug, Clone, PartialEq, Eq)]
50pub enum IndexBackfillEvent {
51    Started {
52        table_name: String,
53        indexes_backfilled: usize,
54        row_batch_size: usize,
55        start_cursor: Key,
56    },
57    Progress {
58        scanned_rows: u64,
59        index_entries_written: u64,
60        last_scanned_primary_key: Key,
61        next_cursor: Option<Key>,
62    },
63    Completed {
64        report: IndexBackfillReport,
65    },
66}
67
/// Scalar element types allowed inside a `List` column.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum ListElementKind {
    /// Signed 64-bit integers.
    Int64,
    /// 64-bit floats.
    Float64,
    /// Booleans.
    Boolean,
    /// Strings (Arrow Utf8, LargeUtf8 and Utf8View all map here).
    Utf8,
}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq)]
77pub(crate) enum ColumnKind {
78    Int64,
79    UInt64,
80    Float64,
81    Boolean,
82    Utf8,
83    Date32,
84    Date64,
85    Timestamp,
86    Decimal128,
87    Decimal256,
88    FixedSizeBinary(usize),
89    List(ListElementKind),
90}
91
92impl ColumnKind {
93    pub(crate) fn from_data_type(data_type: &DataType) -> Result<Self, String> {
94        match data_type {
95            DataType::Int64 => Ok(Self::Int64),
96            DataType::UInt64 => Ok(Self::UInt64),
97            DataType::Float64 => Ok(Self::Float64),
98            DataType::Boolean => Ok(Self::Boolean),
99            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Ok(Self::Utf8),
100            DataType::Date32 => Ok(Self::Date32),
101            DataType::Date64 => Ok(Self::Date64),
102            DataType::Timestamp(_, _) => Ok(Self::Timestamp),
103            DataType::Decimal128(_, _) => Ok(Self::Decimal128),
104            DataType::Decimal256(_, _) => Ok(Self::Decimal256),
105            DataType::FixedSizeBinary(n) => Ok(Self::FixedSizeBinary(*n as usize)),
106            DataType::List(field) | DataType::LargeList(field) => {
107                let inner = Self::from_data_type(field.data_type())?;
108                let elem = match inner {
109                    Self::Int64 => ListElementKind::Int64,
110                    Self::Float64 => ListElementKind::Float64,
111                    Self::Boolean => ListElementKind::Boolean,
112                    Self::Utf8 => ListElementKind::Utf8,
113                    _ => {
114                        return Err(format!(
115                            "unsupported list element type {:?}; \
116                             list elements must be Int64, Float64, Boolean, or Utf8",
117                            field.data_type()
118                        ))
119                    }
120                };
121                Ok(Self::List(elem))
122            }
123            other => Err(format!(
124                "unsupported column type {other:?}; supported: \
125                 Int64, UInt64, Float64, Boolean, Utf8, Date32, Date64, Timestamp, \
126                 Decimal128, Decimal256, FixedSizeBinary, List"
127            )),
128        }
129    }
130
131    pub(crate) fn fixed_key_width(self) -> Option<usize> {
132        match self {
133            Self::Int64 => Some(8),
134            Self::UInt64 => Some(8),
135            Self::Float64 => Some(8),
136            Self::Boolean => Some(1),
137            Self::Utf8 => None,
138            Self::Date32 => Some(4),
139            Self::Date64 => Some(8),
140            Self::Timestamp => Some(8),
141            Self::Decimal128 => Some(16),
142            Self::Decimal256 => Some(32),
143            Self::FixedSizeBinary(n) => Some(n),
144            Self::List(_) => None,
145        }
146    }
147
148    pub(crate) fn key_width(self) -> usize {
149        self.fixed_key_width()
150            .unwrap_or(STRING_KEY_INLINE_LIMIT + 1)
151    }
152
153    pub(crate) fn indexable(self) -> bool {
154        !matches!(self, Self::List(_))
155    }
156}
157
158#[derive(Debug, Clone)]
159pub struct TableColumnConfig {
160    pub name: String,
161    pub data_type: DataType,
162    pub nullable: bool,
163}
164
165impl TableColumnConfig {
166    pub fn new(name: impl Into<String>, data_type: DataType, nullable: bool) -> Self {
167        Self {
168            name: name.into(),
169            data_type,
170            nullable,
171        }
172    }
173}
174
/// How the key columns of a secondary index are arranged in the encoded key.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IndexLayout {
    /// Key columns follow one another in declaration order.
    Lexicographic,
    /// Key columns are combined in Z-order (presumably bit-interleaved — see `codec`).
    ZOrder,
}
180
181#[derive(Debug, Clone)]
182pub struct IndexSpec {
183    name: String,
184    key_columns: Vec<String>,
185    cover_columns: Vec<String>,
186    layout: IndexLayout,
187}
188
189impl IndexSpec {
190    #[cfg(test)]
191    pub(crate) fn new(name: impl Into<String>, key_columns: Vec<String>) -> Result<Self, String> {
192        Self::lexicographic(name, key_columns)
193    }
194
195    pub fn lexicographic(
196        name: impl Into<String>,
197        key_columns: Vec<String>,
198    ) -> Result<Self, String> {
199        let name = name.into();
200        if name.trim().is_empty() {
201            return Err("index name must not be empty".to_string());
202        }
203        if key_columns.is_empty() {
204            return Err("key_columns must not be empty".to_string());
205        }
206        Ok(Self {
207            name,
208            key_columns,
209            cover_columns: Vec::new(),
210            layout: IndexLayout::Lexicographic,
211        })
212    }
213
214    pub fn z_order(name: impl Into<String>, key_columns: Vec<String>) -> Result<Self, String> {
215        Self::lexicographic(name, key_columns).map(|spec| spec.with_layout(IndexLayout::ZOrder))
216    }
217
218    pub fn with_cover_columns(mut self, cover_columns: Vec<String>) -> Self {
219        self.cover_columns = cover_columns;
220        self
221    }
222
223    pub fn with_layout(mut self, layout: IndexLayout) -> Self {
224        self.layout = layout;
225        self
226    }
227
228    pub fn name(&self) -> &str {
229        &self.name
230    }
231
232    pub fn key_columns(&self) -> &[String] {
233        &self.key_columns
234    }
235
236    pub fn cover_columns(&self) -> &[String] {
237        &self.cover_columns
238    }
239
240    pub fn layout(&self) -> &IndexLayout {
241        &self.layout
242    }
243}
244
245pub fn default_orders_index_specs() -> Vec<IndexSpec> {
246    vec![IndexSpec::lexicographic(
247        "region_customer",
248        vec!["region".to_string(), "customer_id".to_string()],
249    )
250    .expect("default orders index must be valid")]
251}
252
253#[derive(Debug, Clone)]
254pub(crate) struct KvTableConfig {
255    pub(crate) table_prefix: u8,
256    pub(crate) columns: Vec<TableColumnConfig>,
257    pub(crate) primary_key_columns: Vec<String>,
258    pub(crate) index_specs: Vec<IndexSpec>,
259}
260
261impl KvTableConfig {
262    pub(crate) fn new(
263        table_prefix: u8,
264        columns: Vec<TableColumnConfig>,
265        primary_key_columns: Vec<String>,
266        index_specs: Vec<IndexSpec>,
267    ) -> Result<Self, String> {
268        if usize::from(table_prefix) >= MAX_TABLES {
269            return Err(format!(
270                "table prefix {table_prefix} exceeds max {} for codec layout",
271                MAX_TABLES - 1
272            ));
273        }
274        if columns.is_empty() {
275            return Err("table config requires at least one column".to_string());
276        }
277        if primary_key_columns.is_empty() {
278            return Err("primary key must have at least one column".to_string());
279        }
280
281        let mut seen = HashSet::new();
282        let mut col_kinds = HashMap::new();
283        for col in &columns {
284            if col.name.trim().is_empty() {
285                return Err("column name must not be empty".to_string());
286            }
287            if !seen.insert(col.name.clone()) {
288                return Err(format!("duplicate column '{}'", col.name));
289            }
290            let kind = ColumnKind::from_data_type(&col.data_type)?;
291            col_kinds.insert(col.name.clone(), kind);
292        }
293
294        let mut total_pk_width = 0usize;
295        for pk_col in &primary_key_columns {
296            let kind = col_kinds
297                .get(pk_col)
298                .ok_or_else(|| format!("primary key column '{pk_col}' not found"))?;
299            match kind {
300                ColumnKind::Int64
301                | ColumnKind::UInt64
302                | ColumnKind::Utf8
303                | ColumnKind::FixedSizeBinary(_) => {}
304                _ => {
305                    return Err(format!(
306                        "primary key column '{pk_col}' must be Int64, UInt64, Utf8, or FixedSizeBinary"
307                    ));
308                }
309            }
310            total_pk_width += kind.key_width();
311        }
312        if total_pk_width > primary_key_codec(table_prefix)?.payload_capacity_bytes() {
313            return Err(format!(
314                "composite primary key is too wide ({total_pk_width} bytes) for codec payload"
315            ));
316        }
317
318        Ok(Self {
319            table_prefix,
320            columns,
321            primary_key_columns,
322            index_specs,
323        })
324    }
325
326    pub(crate) fn to_schema(&self) -> SchemaRef {
327        Arc::new(Schema::new(
328            self.columns
329                .iter()
330                .map(|col| {
331                    let dt = match &col.data_type {
332                        DataType::Timestamp(_, tz) => {
333                            DataType::Timestamp(TimeUnit::Microsecond, tz.clone())
334                        }
335                        DataType::LargeList(field) => DataType::List(field.clone()),
336                        other => other.clone(),
337                    };
338                    Field::new(&col.name, dt, col.nullable)
339                })
340                .collect::<Vec<_>>(),
341        ))
342    }
343}
344
345#[derive(Debug, Clone)]
346pub(crate) struct ResolvedColumn {
347    pub(crate) name: String,
348    pub(crate) kind: ColumnKind,
349    pub(crate) nullable: bool,
350}
351
352#[derive(Debug, Clone)]
353pub(crate) struct ResolvedIndexSpec {
354    pub(crate) id: u8,
355    pub(crate) codec: KeyCodec,
356    pub(crate) name: String,
357    pub(crate) layout: IndexLayout,
358    pub(crate) key_columns: Vec<usize>,
359    pub(crate) value_column_mask: Vec<bool>,
360    pub(crate) key_columns_width: usize,
361}
362
363#[derive(Debug, Clone)]
364pub(crate) struct TableModel {
365    pub(crate) table_prefix: u8,
366    pub(crate) primary_key_codec: KeyCodec,
367    pub(crate) schema: SchemaRef,
368    pub(crate) columns: Vec<ResolvedColumn>,
369    pub(crate) columns_by_name: HashMap<String, usize>,
370    pub(crate) primary_key_indices: Vec<usize>,
371    pub(crate) primary_key_kinds: Vec<ColumnKind>,
372    pub(crate) primary_key_width: usize,
373}
374
375impl TableModel {
376    pub(crate) fn from_config(config: &KvTableConfig) -> Result<Self, String> {
377        let schema = config.to_schema();
378        let mut columns = Vec::with_capacity(config.columns.len());
379        let mut columns_by_name = HashMap::with_capacity(config.columns.len());
380
381        for (idx, col) in config.columns.iter().enumerate() {
382            let kind = ColumnKind::from_data_type(&col.data_type)?;
383            columns.push(ResolvedColumn {
384                name: col.name.clone(),
385                kind,
386                nullable: col.nullable,
387            });
388            columns_by_name.insert(col.name.clone(), idx);
389        }
390
391        let mut primary_key_indices = Vec::with_capacity(config.primary_key_columns.len());
392        let mut primary_key_kinds = Vec::with_capacity(config.primary_key_columns.len());
393        let mut primary_key_width = 0usize;
394        for pk_col in &config.primary_key_columns {
395            let idx = *columns_by_name
396                .get(pk_col)
397                .ok_or_else(|| format!("primary key column '{pk_col}' not found"))?;
398            let kind = columns[idx].kind;
399            primary_key_indices.push(idx);
400            primary_key_kinds.push(kind);
401            primary_key_width += kind.key_width();
402        }
403
404        Ok(Self {
405            table_prefix: config.table_prefix,
406            primary_key_codec: primary_key_codec(config.table_prefix)?,
407            schema,
408            columns,
409            columns_by_name,
410            primary_key_indices,
411            primary_key_kinds,
412            primary_key_width,
413        })
414    }
415
416    /// Whether a column index is part of the primary key.
417    pub(crate) fn is_pk_column(&self, col_idx: usize) -> bool {
418        self.primary_key_indices.contains(&col_idx)
419    }
420
421    pub(crate) fn pk_position(&self, col_idx: usize) -> Option<usize> {
422        self.primary_key_indices
423            .iter()
424            .position(|&idx| idx == col_idx)
425    }
426
427    pub(crate) fn resolve_index_specs(
428        &self,
429        specs: &[IndexSpec],
430    ) -> Result<Vec<ResolvedIndexSpec>, String> {
431        let mut out = Vec::with_capacity(specs.len());
432        let mut names = HashSet::new();
433
434        for (idx, spec) in specs.iter().enumerate() {
435            if !names.insert(spec.name.clone()) {
436                return Err(format!("duplicate index name '{}'", spec.name));
437            }
438
439            let id = u8::try_from(idx + 1).map_err(|_| {
440                format!("too many index specs for codec layout (max {MAX_INDEX_SPECS})")
441            })?;
442            if usize::from(id) > MAX_INDEX_SPECS {
443                return Err(format!(
444                    "too many index specs for codec layout (max {MAX_INDEX_SPECS})"
445                ));
446            }
447            let mut key_columns = Vec::with_capacity(spec.key_columns.len());
448            let mut key_columns_width = 0usize;
449            let mut value_column_mask = vec![false; self.columns.len()];
450            for col_name in &spec.key_columns {
451                let Some(col_idx) = self.columns_by_name.get(col_name).copied() else {
452                    return Err(format!(
453                        "index '{}' references unknown column '{}'",
454                        spec.name, col_name
455                    ));
456                };
457                if !self.columns[col_idx].kind.indexable() {
458                    return Err(format!(
459                        "index '{}' references non-indexable column '{}'",
460                        spec.name, col_name
461                    ));
462                }
463                if self.columns[col_idx].nullable {
464                    return Err(format!(
465                        "index '{}' references nullable column '{}'; \
466                         nullable columns cannot be used in index keys",
467                        spec.name, col_name
468                    ));
469                }
470                key_columns.push(col_idx);
471                key_columns_width += self.columns[col_idx].kind.key_width();
472                if !self.is_pk_column(col_idx) {
473                    value_column_mask[col_idx] = true;
474                }
475            }
476
477            for col_name in &spec.cover_columns {
478                let Some(col_idx) = self.columns_by_name.get(col_name).copied() else {
479                    return Err(format!(
480                        "index '{}' cover list references unknown column '{}'",
481                        spec.name, col_name
482                    ));
483                };
484                if self.is_pk_column(col_idx) {
485                    return Err(format!(
486                        "index '{}' cover column '{}' is a primary key column; \
487                         PK columns are always available from key bytes",
488                        spec.name, col_name
489                    ));
490                }
491                if !value_column_mask[col_idx] {
492                    value_column_mask[col_idx] = true;
493                }
494            }
495            let codec = secondary_index_codec(self.table_prefix, id)?;
496            if key_columns_width + self.primary_key_width > codec.payload_capacity_bytes() {
497                return Err(format!(
498                    "index '{}' key layout too wide for codec payload",
499                    spec.name
500                ));
501            }
502
503            out.push(ResolvedIndexSpec {
504                id,
505                codec,
506                name: spec.name.clone(),
507                layout: spec.layout,
508                key_columns,
509                value_column_mask,
510                key_columns_width,
511            });
512        }
513
514        Ok(out)
515    }
516
517    pub(crate) fn column(&self, index: usize) -> &ResolvedColumn {
518        &self.columns[index]
519    }
520}
521
522#[derive(Debug, Clone)]
523pub enum CellValue {
524    Null,
525    Int64(i64),
526    UInt64(u64),
527    Float64(f64),
528    Boolean(bool),
529    Date32(i32),
530    Date64(i64),
531    Timestamp(i64),
532    Decimal128(i128),
533    Decimal256(i256),
534    Utf8(String),
535    FixedBinary(Vec<u8>),
536    List(Vec<CellValue>),
537}
538
539#[derive(Debug, Clone)]
540pub(crate) struct KvRow {
541    pub(crate) values: Vec<CellValue>,
542}
543
544impl KvRow {
545    pub(crate) fn primary_key_values(&self, model: &TableModel) -> Vec<&CellValue> {
546        model
547            .primary_key_indices
548            .iter()
549            .map(|&idx| &self.values[idx])
550            .collect()
551    }
552
553    pub(crate) fn value_at(&self, idx: usize) -> &CellValue {
554        &self.values[idx]
555    }
556}
557
558#[derive(Debug, Clone, Default)]
559pub(crate) struct DecodedIndexEntry {
560    pub(crate) primary_key: Key,
561    pub(crate) primary_key_values: Vec<CellValue>,
562    pub(crate) values: HashMap<usize, CellValue>,
563}
564
565#[derive(Debug, Clone, PartialEq)]
566pub(crate) struct KeyRange {
567    pub(crate) start: Key,
568    pub(crate) end: Key,
569}
570
571#[derive(Debug, Clone)]
572pub(crate) struct IndexPlan {
573    pub(crate) spec_idx: usize,
574    pub(crate) ranges: Vec<KeyRange>,
575    pub(crate) constrained_prefix_len: usize,
576    pub(crate) constrained_column_count: usize,
577}
578
579#[derive(Debug, Clone)]
580pub(crate) struct KvTable {
581    pub(crate) client: StoreClient,
582    pub(crate) model: Arc<TableModel>,
583    pub(crate) index_specs: Arc<Vec<ResolvedIndexSpec>>,
584}
585
586impl KvTable {
587    pub(crate) fn new(client: StoreClient, config: KvTableConfig) -> Result<Self, String> {
588        let model = Arc::new(TableModel::from_config(&config)?);
589        let index_specs = Arc::new(model.resolve_index_specs(&config.index_specs)?);
590        Ok(Self {
591            client,
592            model,
593            index_specs,
594        })
595    }
596}