use crate::constants::FIELD_ID_META_KEY;
use crate::reserved::{MVCC_CREATED_BY_FIELD_ID, MVCC_DELETED_BY_FIELD_ID, ROW_ID_FIELD_ID};
use crate::types::FieldId;
use arrow::datatypes::Schema;
use rustc_hash::FxHashMap;
use std::sync::Arc;
/// An Arrow schema paired with precomputed field-id lookups.
///
/// Construction walks the schema once and records, for every field, the id
/// parsed from its `FIELD_ID_META_KEY` metadata entry, a reverse
/// id-to-position map, and which reserved system columns are present.
#[derive(Debug, Clone)]
pub struct CachedSchema {
    /// The wrapped schema; held behind an `Arc`, so cloning the cache shares it.
    schema: Arc<Schema>,
    /// Field id per schema position; `None` where the metadata entry is
    /// missing or does not parse as a `FieldId`.
    field_ids: Vec<Option<FieldId>>,
    /// Reverse lookup from field id to schema position.
    id_to_index: FxHashMap<FieldId, usize>,
    /// Reserved system columns seen while scanning the schema.
    system_columns: SystemColumnPresence,
}

impl CachedSchema {
    /// Builds the cache by scanning every field of `schema` once.
    ///
    /// A field whose metadata has no `FIELD_ID_META_KEY` entry, or whose entry
    /// does not parse as a `FieldId`, is recorded as having no id. If several
    /// fields carry the same id, the last one wins in the reverse lookup.
    pub fn new(schema: Arc<Schema>) -> Self {
        let field_count = schema.fields().len();
        let mut field_ids = Vec::with_capacity(field_count);
        // At most one entry per field, so the field count is a safe upper bound.
        let mut id_to_index =
            FxHashMap::with_capacity_and_hasher(field_count, Default::default());
        let mut system_columns = SystemColumnPresence::default();

        for (idx, field) in schema.fields().iter().enumerate() {
            let field_id = field
                .metadata()
                .get(FIELD_ID_META_KEY)
                .and_then(|raw| raw.parse::<FieldId>().ok());

            if let Some(fid) = field_id {
                id_to_index.insert(fid, idx);
                match fid {
                    ROW_ID_FIELD_ID => system_columns.has_row_id = true,
                    MVCC_CREATED_BY_FIELD_ID => system_columns.has_created_by = true,
                    MVCC_DELETED_BY_FIELD_ID => system_columns.has_deleted_by = true,
                    _ => {}
                }
            }
            // Pushed unconditionally so positions stay aligned with the schema.
            field_ids.push(field_id);
        }

        Self {
            schema,
            field_ids,
            id_to_index,
            system_columns,
        }
    }

    /// Returns the wrapped schema.
    #[inline]
    pub fn schema(&self) -> &Arc<Schema> {
        &self.schema
    }

    /// Returns the field id stored for the field at `field_index`.
    ///
    /// Yields `None` both when the field has no parsable id and when
    /// `field_index` is out of range, matching how [`Self::has_field_id`]
    /// treats out-of-range positions (no panic).
    #[inline]
    pub fn field_id(&self, field_index: usize) -> Option<FieldId> {
        self.field_ids.get(field_index).copied().flatten()
    }

    /// Returns the schema position of the field carrying `field_id`, if any.
    #[inline]
    pub fn index_of_field_id(&self, field_id: FieldId) -> Option<usize> {
        self.id_to_index.get(&field_id).copied()
    }

    /// Returns which reserved system columns were found in the schema.
    #[inline]
    pub fn system_columns(&self) -> SystemColumnPresence {
        self.system_columns
    }

    /// Returns the number of fields in the cached schema.
    #[inline]
    pub fn field_count(&self) -> usize {
        self.field_ids.len()
    }

    /// Returns `true` when `field_index` is in range and that field has an id.
    #[inline]
    pub fn has_field_id(&self, field_index: usize) -> bool {
        self.field_id(field_index).is_some()
    }
}
/// Flags recording which reserved system columns a schema contains.
///
/// The default value has every flag cleared. The type is a small `Copy`
/// value and is comparable with `==`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct SystemColumnPresence {
    /// A field with the reserved row-id field id is present.
    pub has_row_id: bool,
    /// A field with the reserved MVCC created-by field id is present.
    pub has_created_by: bool,
    /// A field with the reserved MVCC deleted-by field id is present.
    pub has_deleted_by: bool,
}

impl SystemColumnPresence {
    /// Returns `true` when both the created-by and deleted-by columns exist.
    ///
    /// The row-id flag is not consulted.
    #[inline]
    #[must_use]
    pub const fn has_full_mvcc(&self) -> bool {
        self.has_created_by && self.has_deleted_by
    }

    /// Returns `true` when at least one of the created-by / deleted-by
    /// columns exists.
    ///
    /// The row-id flag is not consulted.
    #[inline]
    #[must_use]
    pub const fn has_any_mvcc(&self) -> bool {
        self.has_created_by || self.has_deleted_by
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field};
    use std::collections::HashMap;

    /// Builds a non-nullable UTF-8 field whose metadata carries `field_id`.
    fn make_field(name: &str, field_id: FieldId) -> Field {
        let metadata = HashMap::from([(FIELD_ID_META_KEY.to_string(), field_id.to_string())]);
        Field::new(name, DataType::Utf8, false).with_metadata(metadata)
    }

    /// Wraps `fields` in a schema and builds the cache under test.
    fn cache_fields(fields: Vec<Field>) -> CachedSchema {
        CachedSchema::new(Arc::new(Schema::new(fields)))
    }

    /// Snapshot of the three presence flags: (row_id, created_by, deleted_by).
    fn flags(presence: SystemColumnPresence) -> (bool, bool, bool) {
        (
            presence.has_row_id,
            presence.has_created_by,
            presence.has_deleted_by,
        )
    }

    /// Ids resolve in both directions, and an unknown id yields `None`.
    #[test]
    fn test_cached_schema_field_id_lookup() {
        let cached = cache_fields(vec![
            make_field("name", 1),
            make_field("email", 2),
            make_field("age", 3),
        ]);

        let expected: [(usize, FieldId); 3] = [(0, 1), (1, 2), (2, 3)];
        for &(position, id) in &expected {
            assert_eq!(cached.field_id(position), Some(id));
        }
        for &(position, id) in &expected {
            assert_eq!(cached.index_of_field_id(id), Some(position));
        }
        assert_eq!(cached.index_of_field_id(999), None);
    }

    /// All three reserved columns present: every flag and both helpers are set.
    #[test]
    fn test_cached_schema_system_columns() {
        let cached = cache_fields(vec![
            make_field("row_id", ROW_ID_FIELD_ID),
            make_field("name", 1),
            make_field("created_by", MVCC_CREATED_BY_FIELD_ID),
            make_field("deleted_by", MVCC_DELETED_BY_FIELD_ID),
        ]);

        let presence = cached.system_columns();
        assert_eq!(flags(presence), (true, true, true));
        assert_eq!(
            (presence.has_full_mvcc(), presence.has_any_mvcc()),
            (true, true)
        );
    }

    /// Only user columns: nothing is flagged.
    #[test]
    fn test_cached_schema_no_system_columns() {
        let cached = cache_fields(vec![make_field("name", 1), make_field("email", 2)]);

        let presence = cached.system_columns();
        assert_eq!(flags(presence), (false, false, false));
        assert_eq!(
            (presence.has_full_mvcc(), presence.has_any_mvcc()),
            (false, false)
        );
    }

    /// Created-by without deleted-by counts as "any" but not "full" MVCC.
    #[test]
    fn test_cached_schema_partial_mvcc() {
        let cached = cache_fields(vec![
            make_field("row_id", ROW_ID_FIELD_ID),
            make_field("name", 1),
            make_field("created_by", MVCC_CREATED_BY_FIELD_ID),
        ]);

        let presence = cached.system_columns();
        assert_eq!(flags(presence), (true, true, false));
        assert_eq!(
            (presence.has_full_mvcc(), presence.has_any_mvcc()),
            (false, true)
        );
    }

    /// A field lacking id metadata is reported as id-less without disturbing
    /// the lookup of its neighbours.
    #[test]
    fn test_cached_schema_field_without_id() {
        let cached = cache_fields(vec![
            Field::new("temp", DataType::Utf8, false),
            make_field("name", 1),
        ]);

        assert_eq!(cached.field_id(0), None);
        assert!(!cached.has_field_id(0));

        assert_eq!(cached.field_id(1), Some(1));
        assert!(cached.has_field_id(1));
        assert_eq!(cached.index_of_field_id(1), Some(1));
    }

    /// The cached count agrees with the wrapped schema.
    #[test]
    fn test_cached_schema_field_count() {
        let cached = cache_fields(vec![
            make_field("name", 1),
            make_field("email", 2),
            make_field("age", 3),
        ]);

        assert_eq!(cached.field_count(), 3);
        assert_eq!(cached.schema().fields().len(), 3);
    }

    /// Cloning copies the lookups but shares the underlying schema allocation.
    #[test]
    fn test_cached_schema_clone() {
        let original = cache_fields(vec![make_field("name", 1)]);
        let duplicate = original.clone();

        assert_eq!(original.field_id(0), duplicate.field_id(0));
        assert!(
            Arc::ptr_eq(original.schema(), duplicate.schema()),
            "Schema Arc should be shared"
        );
    }
}