1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use datafusion::arrow::datatypes::{i256, DataType, Field, Schema, SchemaRef, TimeUnit};
5use exoware_sdk_rs::keys::{Key, KeyCodec};
6use exoware_sdk_rs::StoreClient;
7
8use crate::codec::{primary_key_codec, secondary_index_codec};
9
// ---------------------------------------------------------------------------
// Key-layout bit budget
// ---------------------------------------------------------------------------

/// Leading key bits that select the table a key belongs to.
pub(crate) const TABLE_PREFIX_BITS: u8 = 4;
/// Bits following the table prefix that select the key kind.
pub(crate) const KEY_KIND_BITS: u8 = 1;
/// Reserved bits in front of a primary-key payload (table prefix + key kind).
pub(crate) const PRIMARY_RESERVED_BITS: u8 = TABLE_PREFIX_BITS + KEY_KIND_BITS;
/// Bits that select which secondary index an index key belongs to.
pub(crate) const INDEX_SLOT_BITS: u8 = 4;
/// Reserved bits in front of a secondary-index payload (primary reservation + slot).
pub(crate) const INDEX_FAMILY_BITS: u8 = PRIMARY_RESERVED_BITS + INDEX_SLOT_BITS;
/// Bit offset at which primary-key payload data starts.
pub(crate) const PRIMARY_KEY_BIT_OFFSET: usize = PRIMARY_RESERVED_BITS as usize;
/// Bit offset at which secondary-index payload data starts.
pub(crate) const INDEX_KEY_BIT_OFFSET: usize = INDEX_FAMILY_BITS as usize;

// ---------------------------------------------------------------------------
// Capacity limits derived from the bit budget
// ---------------------------------------------------------------------------

/// Number of distinct table prefixes addressable with `TABLE_PREFIX_BITS`.
pub(crate) const MAX_TABLES: usize = 2usize.pow(TABLE_PREFIX_BITS as u32);
/// Number of usable index slots. Index ids are handed out starting at 1, so one of the
/// `2^INDEX_SLOT_BITS` slot values is never assigned.
pub(crate) const MAX_INDEX_SPECS: usize = 2usize.pow(INDEX_SLOT_BITS as u32) - 1;

// ---------------------------------------------------------------------------
// String key encoding
// ---------------------------------------------------------------------------

/// Inline byte budget for a string key component.
pub(crate) const STRING_KEY_INLINE_LIMIT: usize = 15;
/// Byte that terminates an encoded string key component.
pub(crate) const STRING_KEY_TERMINATOR: u8 = 0x00;
/// Escape-prefix byte used by the string key encoding.
pub(crate) const STRING_KEY_ESCAPE_PREFIX: u8 = 0x01;
/// Escape code byte used for `0xFF` in the string key encoding (NOTE(review): confirm).
pub(crate) const STRING_KEY_ESCAPE_FF: u8 = 0x02;

// ---------------------------------------------------------------------------
// Batch sizing
// ---------------------------------------------------------------------------

/// Default page / row-batch size (also the default backfill batch size).
pub(crate) const PAGE_SIZE: usize = 1_000;
/// Row count at which buffered writes are flushed.
pub(crate) const BATCH_FLUSH_ROWS: usize = 2_048;
/// Entry count at which buffered index-backfill writes are flushed.
pub(crate) const INDEX_BACKFILL_FLUSH_ENTRIES: usize = 4_096;
26
/// Counters summarizing a secondary-index backfill run.
///
/// Carried by `IndexBackfillEvent::Completed`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct IndexBackfillReport {
    /// Primary rows read during the backfill.
    pub scanned_rows: u64,
    /// Number of index definitions that were populated.
    pub indexes_backfilled: usize,
    /// Total index entries written.
    pub index_entries_written: u64,
}
33
34#[derive(Debug, Clone, PartialEq, Eq)]
35pub struct IndexBackfillOptions {
36 pub row_batch_size: usize,
37 pub start_from_primary_key: Option<Key>,
38}
39
40impl Default for IndexBackfillOptions {
41 fn default() -> Self {
42 Self {
43 row_batch_size: PAGE_SIZE,
44 start_from_primary_key: None,
45 }
46 }
47}
48
49#[derive(Debug, Clone, PartialEq, Eq)]
50pub enum IndexBackfillEvent {
51 Started {
52 table_name: String,
53 indexes_backfilled: usize,
54 row_batch_size: usize,
55 start_cursor: Key,
56 },
57 Progress {
58 scanned_rows: u64,
59 index_entries_written: u64,
60 last_scanned_primary_key: Key,
61 next_cursor: Option<Key>,
62 },
63 Completed {
64 report: IndexBackfillReport,
65 },
66}
67
/// Element types permitted inside a list column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ListElementKind {
    /// 64-bit signed integer elements.
    Int64,
    /// 64-bit float elements.
    Float64,
    /// Boolean elements.
    Boolean,
    /// String elements (any Arrow string flavour).
    Utf8,
}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq)]
77pub(crate) enum ColumnKind {
78 Int64,
79 UInt64,
80 Float64,
81 Boolean,
82 Utf8,
83 Date32,
84 Date64,
85 Timestamp,
86 Decimal128,
87 Decimal256,
88 FixedSizeBinary(usize),
89 List(ListElementKind),
90}
91
92impl ColumnKind {
93 pub(crate) fn from_data_type(data_type: &DataType) -> Result<Self, String> {
94 match data_type {
95 DataType::Int64 => Ok(Self::Int64),
96 DataType::UInt64 => Ok(Self::UInt64),
97 DataType::Float64 => Ok(Self::Float64),
98 DataType::Boolean => Ok(Self::Boolean),
99 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Ok(Self::Utf8),
100 DataType::Date32 => Ok(Self::Date32),
101 DataType::Date64 => Ok(Self::Date64),
102 DataType::Timestamp(_, _) => Ok(Self::Timestamp),
103 DataType::Decimal128(_, _) => Ok(Self::Decimal128),
104 DataType::Decimal256(_, _) => Ok(Self::Decimal256),
105 DataType::FixedSizeBinary(n) => Ok(Self::FixedSizeBinary(*n as usize)),
106 DataType::List(field) | DataType::LargeList(field) => {
107 let inner = Self::from_data_type(field.data_type())?;
108 let elem = match inner {
109 Self::Int64 => ListElementKind::Int64,
110 Self::Float64 => ListElementKind::Float64,
111 Self::Boolean => ListElementKind::Boolean,
112 Self::Utf8 => ListElementKind::Utf8,
113 _ => {
114 return Err(format!(
115 "unsupported list element type {:?}; \
116 list elements must be Int64, Float64, Boolean, or Utf8",
117 field.data_type()
118 ))
119 }
120 };
121 Ok(Self::List(elem))
122 }
123 other => Err(format!(
124 "unsupported column type {other:?}; supported: \
125 Int64, UInt64, Float64, Boolean, Utf8, Date32, Date64, Timestamp, \
126 Decimal128, Decimal256, FixedSizeBinary, List"
127 )),
128 }
129 }
130
131 pub(crate) fn fixed_key_width(self) -> Option<usize> {
132 match self {
133 Self::Int64 => Some(8),
134 Self::UInt64 => Some(8),
135 Self::Float64 => Some(8),
136 Self::Boolean => Some(1),
137 Self::Utf8 => None,
138 Self::Date32 => Some(4),
139 Self::Date64 => Some(8),
140 Self::Timestamp => Some(8),
141 Self::Decimal128 => Some(16),
142 Self::Decimal256 => Some(32),
143 Self::FixedSizeBinary(n) => Some(n),
144 Self::List(_) => None,
145 }
146 }
147
148 pub(crate) fn key_width(self) -> usize {
149 self.fixed_key_width()
150 .unwrap_or(STRING_KEY_INLINE_LIMIT + 1)
151 }
152
153 pub(crate) fn indexable(self) -> bool {
154 !matches!(self, Self::List(_))
155 }
156}
157
158#[derive(Debug, Clone)]
159pub struct TableColumnConfig {
160 pub name: String,
161 pub data_type: DataType,
162 pub nullable: bool,
163}
164
165impl TableColumnConfig {
166 pub fn new(name: impl Into<String>, data_type: DataType, nullable: bool) -> Self {
167 Self {
168 name: name.into(),
169 data_type,
170 nullable,
171 }
172 }
173}
174
/// Ordering strategy used for the key columns of a secondary index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexLayout {
    /// Key columns ordered lexicographically (NOTE(review): presumably in declared
    /// column order — confirm against the key encoder).
    Lexicographic,
    /// Key columns combined in a Z-order layout (NOTE(review): confirm encoding details).
    ZOrder,
}
180
181#[derive(Debug, Clone)]
182pub struct IndexSpec {
183 name: String,
184 key_columns: Vec<String>,
185 cover_columns: Vec<String>,
186 layout: IndexLayout,
187}
188
189impl IndexSpec {
190 #[cfg(test)]
191 pub(crate) fn new(name: impl Into<String>, key_columns: Vec<String>) -> Result<Self, String> {
192 Self::lexicographic(name, key_columns)
193 }
194
195 pub fn lexicographic(
196 name: impl Into<String>,
197 key_columns: Vec<String>,
198 ) -> Result<Self, String> {
199 let name = name.into();
200 if name.trim().is_empty() {
201 return Err("index name must not be empty".to_string());
202 }
203 if key_columns.is_empty() {
204 return Err("key_columns must not be empty".to_string());
205 }
206 Ok(Self {
207 name,
208 key_columns,
209 cover_columns: Vec::new(),
210 layout: IndexLayout::Lexicographic,
211 })
212 }
213
214 pub fn z_order(name: impl Into<String>, key_columns: Vec<String>) -> Result<Self, String> {
215 Self::lexicographic(name, key_columns).map(|spec| spec.with_layout(IndexLayout::ZOrder))
216 }
217
218 pub fn with_cover_columns(mut self, cover_columns: Vec<String>) -> Self {
219 self.cover_columns = cover_columns;
220 self
221 }
222
223 pub fn with_layout(mut self, layout: IndexLayout) -> Self {
224 self.layout = layout;
225 self
226 }
227
228 pub fn name(&self) -> &str {
229 &self.name
230 }
231
232 pub fn key_columns(&self) -> &[String] {
233 &self.key_columns
234 }
235
236 pub fn cover_columns(&self) -> &[String] {
237 &self.cover_columns
238 }
239
240 pub fn layout(&self) -> &IndexLayout {
241 &self.layout
242 }
243}
244
245pub fn default_orders_index_specs() -> Vec<IndexSpec> {
246 vec![IndexSpec::lexicographic(
247 "region_customer",
248 vec!["region".to_string(), "customer_id".to_string()],
249 )
250 .expect("default orders index must be valid")]
251}
252
253#[derive(Debug, Clone)]
254pub(crate) struct KvTableConfig {
255 pub(crate) table_prefix: u8,
256 pub(crate) columns: Vec<TableColumnConfig>,
257 pub(crate) primary_key_columns: Vec<String>,
258 pub(crate) index_specs: Vec<IndexSpec>,
259}
260
261impl KvTableConfig {
262 pub(crate) fn new(
263 table_prefix: u8,
264 columns: Vec<TableColumnConfig>,
265 primary_key_columns: Vec<String>,
266 index_specs: Vec<IndexSpec>,
267 ) -> Result<Self, String> {
268 if usize::from(table_prefix) >= MAX_TABLES {
269 return Err(format!(
270 "table prefix {table_prefix} exceeds max {} for codec layout",
271 MAX_TABLES - 1
272 ));
273 }
274 if columns.is_empty() {
275 return Err("table config requires at least one column".to_string());
276 }
277 if primary_key_columns.is_empty() {
278 return Err("primary key must have at least one column".to_string());
279 }
280
281 let mut seen = HashSet::new();
282 let mut col_kinds = HashMap::new();
283 for col in &columns {
284 if col.name.trim().is_empty() {
285 return Err("column name must not be empty".to_string());
286 }
287 if !seen.insert(col.name.clone()) {
288 return Err(format!("duplicate column '{}'", col.name));
289 }
290 let kind = ColumnKind::from_data_type(&col.data_type)?;
291 col_kinds.insert(col.name.clone(), kind);
292 }
293
294 let mut total_pk_width = 0usize;
295 for pk_col in &primary_key_columns {
296 let kind = col_kinds
297 .get(pk_col)
298 .ok_or_else(|| format!("primary key column '{pk_col}' not found"))?;
299 match kind {
300 ColumnKind::Int64
301 | ColumnKind::UInt64
302 | ColumnKind::Utf8
303 | ColumnKind::FixedSizeBinary(_) => {}
304 _ => {
305 return Err(format!(
306 "primary key column '{pk_col}' must be Int64, UInt64, Utf8, or FixedSizeBinary"
307 ));
308 }
309 }
310 total_pk_width += kind.key_width();
311 }
312 if total_pk_width > primary_key_codec(table_prefix)?.payload_capacity_bytes() {
313 return Err(format!(
314 "composite primary key is too wide ({total_pk_width} bytes) for codec payload"
315 ));
316 }
317
318 Ok(Self {
319 table_prefix,
320 columns,
321 primary_key_columns,
322 index_specs,
323 })
324 }
325
326 pub(crate) fn to_schema(&self) -> SchemaRef {
327 Arc::new(Schema::new(
328 self.columns
329 .iter()
330 .map(|col| {
331 let dt = match &col.data_type {
332 DataType::Timestamp(_, tz) => {
333 DataType::Timestamp(TimeUnit::Microsecond, tz.clone())
334 }
335 DataType::LargeList(field) => DataType::List(field.clone()),
336 other => other.clone(),
337 };
338 Field::new(&col.name, dt, col.nullable)
339 })
340 .collect::<Vec<_>>(),
341 ))
342 }
343}
344
345#[derive(Debug, Clone)]
346pub(crate) struct ResolvedColumn {
347 pub(crate) name: String,
348 pub(crate) kind: ColumnKind,
349 pub(crate) nullable: bool,
350}
351
352#[derive(Debug, Clone)]
353pub(crate) struct ResolvedIndexSpec {
354 pub(crate) id: u8,
355 pub(crate) codec: KeyCodec,
356 pub(crate) name: String,
357 pub(crate) layout: IndexLayout,
358 pub(crate) key_columns: Vec<usize>,
359 pub(crate) value_column_mask: Vec<bool>,
360 pub(crate) key_columns_width: usize,
361}
362
363#[derive(Debug, Clone)]
364pub(crate) struct TableModel {
365 pub(crate) table_prefix: u8,
366 pub(crate) primary_key_codec: KeyCodec,
367 pub(crate) schema: SchemaRef,
368 pub(crate) columns: Vec<ResolvedColumn>,
369 pub(crate) columns_by_name: HashMap<String, usize>,
370 pub(crate) primary_key_indices: Vec<usize>,
371 pub(crate) primary_key_kinds: Vec<ColumnKind>,
372 pub(crate) primary_key_width: usize,
373}
374
375impl TableModel {
376 pub(crate) fn from_config(config: &KvTableConfig) -> Result<Self, String> {
377 let schema = config.to_schema();
378 let mut columns = Vec::with_capacity(config.columns.len());
379 let mut columns_by_name = HashMap::with_capacity(config.columns.len());
380
381 for (idx, col) in config.columns.iter().enumerate() {
382 let kind = ColumnKind::from_data_type(&col.data_type)?;
383 columns.push(ResolvedColumn {
384 name: col.name.clone(),
385 kind,
386 nullable: col.nullable,
387 });
388 columns_by_name.insert(col.name.clone(), idx);
389 }
390
391 let mut primary_key_indices = Vec::with_capacity(config.primary_key_columns.len());
392 let mut primary_key_kinds = Vec::with_capacity(config.primary_key_columns.len());
393 let mut primary_key_width = 0usize;
394 for pk_col in &config.primary_key_columns {
395 let idx = *columns_by_name
396 .get(pk_col)
397 .ok_or_else(|| format!("primary key column '{pk_col}' not found"))?;
398 let kind = columns[idx].kind;
399 primary_key_indices.push(idx);
400 primary_key_kinds.push(kind);
401 primary_key_width += kind.key_width();
402 }
403
404 Ok(Self {
405 table_prefix: config.table_prefix,
406 primary_key_codec: primary_key_codec(config.table_prefix)?,
407 schema,
408 columns,
409 columns_by_name,
410 primary_key_indices,
411 primary_key_kinds,
412 primary_key_width,
413 })
414 }
415
416 pub(crate) fn is_pk_column(&self, col_idx: usize) -> bool {
418 self.primary_key_indices.contains(&col_idx)
419 }
420
421 pub(crate) fn pk_position(&self, col_idx: usize) -> Option<usize> {
422 self.primary_key_indices
423 .iter()
424 .position(|&idx| idx == col_idx)
425 }
426
427 pub(crate) fn resolve_index_specs(
428 &self,
429 specs: &[IndexSpec],
430 ) -> Result<Vec<ResolvedIndexSpec>, String> {
431 let mut out = Vec::with_capacity(specs.len());
432 let mut names = HashSet::new();
433
434 for (idx, spec) in specs.iter().enumerate() {
435 if !names.insert(spec.name.clone()) {
436 return Err(format!("duplicate index name '{}'", spec.name));
437 }
438
439 let id = u8::try_from(idx + 1).map_err(|_| {
440 format!("too many index specs for codec layout (max {MAX_INDEX_SPECS})")
441 })?;
442 if usize::from(id) > MAX_INDEX_SPECS {
443 return Err(format!(
444 "too many index specs for codec layout (max {MAX_INDEX_SPECS})"
445 ));
446 }
447 let mut key_columns = Vec::with_capacity(spec.key_columns.len());
448 let mut key_columns_width = 0usize;
449 let mut value_column_mask = vec![false; self.columns.len()];
450 for col_name in &spec.key_columns {
451 let Some(col_idx) = self.columns_by_name.get(col_name).copied() else {
452 return Err(format!(
453 "index '{}' references unknown column '{}'",
454 spec.name, col_name
455 ));
456 };
457 if !self.columns[col_idx].kind.indexable() {
458 return Err(format!(
459 "index '{}' references non-indexable column '{}'",
460 spec.name, col_name
461 ));
462 }
463 if self.columns[col_idx].nullable {
464 return Err(format!(
465 "index '{}' references nullable column '{}'; \
466 nullable columns cannot be used in index keys",
467 spec.name, col_name
468 ));
469 }
470 key_columns.push(col_idx);
471 key_columns_width += self.columns[col_idx].kind.key_width();
472 if !self.is_pk_column(col_idx) {
473 value_column_mask[col_idx] = true;
474 }
475 }
476
477 for col_name in &spec.cover_columns {
478 let Some(col_idx) = self.columns_by_name.get(col_name).copied() else {
479 return Err(format!(
480 "index '{}' cover list references unknown column '{}'",
481 spec.name, col_name
482 ));
483 };
484 if self.is_pk_column(col_idx) {
485 return Err(format!(
486 "index '{}' cover column '{}' is a primary key column; \
487 PK columns are always available from key bytes",
488 spec.name, col_name
489 ));
490 }
491 if !value_column_mask[col_idx] {
492 value_column_mask[col_idx] = true;
493 }
494 }
495 let codec = secondary_index_codec(self.table_prefix, id)?;
496 if key_columns_width + self.primary_key_width > codec.payload_capacity_bytes() {
497 return Err(format!(
498 "index '{}' key layout too wide for codec payload",
499 spec.name
500 ));
501 }
502
503 out.push(ResolvedIndexSpec {
504 id,
505 codec,
506 name: spec.name.clone(),
507 layout: spec.layout,
508 key_columns,
509 value_column_mask,
510 key_columns_width,
511 });
512 }
513
514 Ok(out)
515 }
516
517 pub(crate) fn column(&self, index: usize) -> &ResolvedColumn {
518 &self.columns[index]
519 }
520}
521
522#[derive(Debug, Clone)]
523pub enum CellValue {
524 Null,
525 Int64(i64),
526 UInt64(u64),
527 Float64(f64),
528 Boolean(bool),
529 Date32(i32),
530 Date64(i64),
531 Timestamp(i64),
532 Decimal128(i128),
533 Decimal256(i256),
534 Utf8(String),
535 FixedBinary(Vec<u8>),
536 List(Vec<CellValue>),
537}
538
539#[derive(Debug, Clone)]
540pub(crate) struct KvRow {
541 pub(crate) values: Vec<CellValue>,
542}
543
544impl KvRow {
545 pub(crate) fn primary_key_values(&self, model: &TableModel) -> Vec<&CellValue> {
546 model
547 .primary_key_indices
548 .iter()
549 .map(|&idx| &self.values[idx])
550 .collect()
551 }
552
553 pub(crate) fn value_at(&self, idx: usize) -> &CellValue {
554 &self.values[idx]
555 }
556}
557
558#[derive(Debug, Clone, Default)]
559pub(crate) struct DecodedIndexEntry {
560 pub(crate) primary_key: Key,
561 pub(crate) primary_key_values: Vec<CellValue>,
562 pub(crate) values: HashMap<usize, CellValue>,
563}
564
565#[derive(Debug, Clone, PartialEq)]
566pub(crate) struct KeyRange {
567 pub(crate) start: Key,
568 pub(crate) end: Key,
569}
570
571#[derive(Debug, Clone)]
572pub(crate) struct IndexPlan {
573 pub(crate) spec_idx: usize,
574 pub(crate) ranges: Vec<KeyRange>,
575 pub(crate) constrained_prefix_len: usize,
576 pub(crate) constrained_column_count: usize,
577}
578
579#[derive(Debug, Clone)]
580pub(crate) struct KvTable {
581 pub(crate) client: StoreClient,
582 pub(crate) model: Arc<TableModel>,
583 pub(crate) index_specs: Arc<Vec<ResolvedIndexSpec>>,
584}
585
586impl KvTable {
587 pub(crate) fn new(client: StoreClient, config: KvTableConfig) -> Result<Self, String> {
588 let model = Arc::new(TableModel::from_config(&config)?);
589 let index_specs = Arc::new(model.resolve_index_specs(&config.index_specs)?);
590 Ok(Self {
591 client,
592 model,
593 index_specs,
594 })
595 }
596}