use reifydb_core::{
interface::catalog::{
column::ColumnIndex,
id::{NamespaceId, RingBufferId, SeriesId, TableId, ViewId},
series::SeriesKey,
view::{
View, ViewKind,
ViewKind::{Deferred, Transactional},
ViewStorageKind,
},
},
key::{namespace_view::NamespaceViewKey, view::ViewKey},
};
use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
use reifydb_type::{
fragment::Fragment,
value::{constraint::TypeConstraint, sumtype::SumTypeId},
};
use crate::{
CatalogStore, Result,
error::{CatalogError, CatalogObjectKind},
store::{
column::create::ColumnToCreate,
sequence::system::SystemSequence,
view::shape::{view, view_namespace},
},
};
/// Description of a single column to create as part of a new view.
///
/// Each entry is turned into a `ColumnToCreate` when the owning view is created.
#[derive(Debug, Clone)]
pub struct ViewColumnToCreate {
/// Fragment whose text is used as the column name.
pub name: Fragment,
/// Fragment forwarded as the column's `fragment` on creation
/// (presumably the source location used for diagnostics; confirm).
pub fragment: Fragment,
/// Type constraint forwarded unchanged to the created column.
pub constraint: TypeConstraint,
}
/// Storage backing selected for a view, together with the settings that are
/// persisted in the view's catalog row.
///
/// Settings that do not apply to a variant are persisted as zero / empty values.
#[derive(Debug, Clone)]
pub enum ViewStorageConfig {
/// View backed by a table.
Table {
/// Id of the underlying table.
underlying: TableId,
},
/// View backed by a ring buffer.
RingBuffer {
/// Id of the underlying ring buffer.
underlying: RingBufferId,
/// Ring buffer capacity (presumably the maximum number of retained rows; confirm).
capacity: u64,
/// Persisted as `1` when `true` and `0` when `false`.
propagate_evictions: bool,
},
/// View backed by a series.
Series {
/// Id of the underlying series.
underlying: SeriesId,
/// Series key; its column name, kind and (for date-time keys) precision are persisted.
key: SeriesKey,
/// Optional sum type id; `None` is persisted as `0`.
tag: Option<SumTypeId>,
},
}
impl Default for ViewStorageConfig {
fn default() -> Self {
ViewStorageConfig::Table {
underlying: TableId(0),
}
}
}
/// Full description of a view to create in the catalog.
#[derive(Debug, Clone)]
pub struct ViewToCreate {
/// Fragment whose text is the view name; also reported in the
/// "already exists" error when the name is taken.
pub name: Fragment,
/// Namespace the view is created in.
pub namespace: NamespaceId,
/// Columns of the view; each column's index is its position in this list.
pub columns: Vec<ViewColumnToCreate>,
/// Storage backing persisted alongside the view.
pub storage: ViewStorageConfig,
}
impl CatalogStore {
pub(crate) fn create_deferred_view(txn: &mut AdminTransaction, to_create: ViewToCreate) -> Result<View> {
Self::create_view(txn, to_create, Deferred)
}
pub(crate) fn create_transactional_view(txn: &mut AdminTransaction, to_create: ViewToCreate) -> Result<View> {
Self::create_view(txn, to_create, Transactional)
}
fn create_view(txn: &mut AdminTransaction, to_create: ViewToCreate, kind: ViewKind) -> Result<View> {
let namespace_id = to_create.namespace;
if let Some(view) = CatalogStore::find_view_by_name(
&mut Transaction::Admin(&mut *txn),
namespace_id,
to_create.name.text(),
)? {
let namespace = CatalogStore::get_namespace(&mut Transaction::Admin(&mut *txn), namespace_id)?;
return Err(CatalogError::AlreadyExists {
kind: CatalogObjectKind::View,
namespace: namespace.name().to_string(),
name: view.name().to_string(),
fragment: to_create.name.clone(),
}
.into());
}
let view_id = SystemSequence::next_view_id(txn)?;
Self::store_view(txn, view_id, namespace_id, &to_create, kind)?;
Self::link_view_to_namespace(txn, namespace_id, view_id, to_create.name.text())?;
Self::insert_columns_for_view(txn, view_id, to_create)?;
Self::get_view(&mut Transaction::Admin(&mut *txn), view_id)
}
fn store_view(
txn: &mut AdminTransaction,
view: ViewId,
namespace: NamespaceId,
to_create: &ViewToCreate,
kind: ViewKind,
) -> Result<()> {
let mut row = view::SHAPE.allocate();
view::SHAPE.set_u64(&mut row, view::ID, view);
view::SHAPE.set_u64(&mut row, view::NAMESPACE, namespace);
view::SHAPE.set_utf8(&mut row, view::NAME, to_create.name.text());
view::SHAPE.set_u8(
&mut row,
view::KIND,
match kind {
Deferred => 0,
Transactional => 1,
},
);
view::SHAPE.set_u64(&mut row, view::PRIMARY_KEY, 0u64);
match &to_create.storage {
ViewStorageConfig::Table {
underlying,
} => {
view::SHAPE.set_u8(&mut row, view::STORAGE_KIND, ViewStorageKind::Table as u8);
view::SHAPE.set_u64(&mut row, view::UNDERLYING_SHAPE_ID, *underlying);
view::SHAPE.set_u64(&mut row, view::CAPACITY, 0u64);
view::SHAPE.set_u8(&mut row, view::PROPAGATE_EVICTIONS, 0u8);
view::SHAPE.set_utf8(&mut row, view::KEY_COLUMN, "");
view::SHAPE.set_u8(&mut row, view::KEY_KIND, 0u8);
view::SHAPE.set_u8(&mut row, view::PRECISION, 0u8);
view::SHAPE.set_u64(&mut row, view::TAG_ID, 0u64);
}
ViewStorageConfig::RingBuffer {
underlying,
capacity,
propagate_evictions,
} => {
view::SHAPE.set_u8(&mut row, view::STORAGE_KIND, ViewStorageKind::RingBuffer as u8);
view::SHAPE.set_u64(&mut row, view::UNDERLYING_SHAPE_ID, *underlying);
view::SHAPE.set_u64(&mut row, view::CAPACITY, *capacity);
view::SHAPE.set_u8(
&mut row,
view::PROPAGATE_EVICTIONS,
if *propagate_evictions {
1
} else {
0
},
);
view::SHAPE.set_utf8(&mut row, view::KEY_COLUMN, "");
view::SHAPE.set_u8(&mut row, view::KEY_KIND, 0u8);
view::SHAPE.set_u8(&mut row, view::PRECISION, 0u8);
view::SHAPE.set_u64(&mut row, view::TAG_ID, 0u64);
}
ViewStorageConfig::Series {
underlying,
key,
tag,
} => {
view::SHAPE.set_u8(&mut row, view::STORAGE_KIND, ViewStorageKind::Series as u8);
view::SHAPE.set_u64(&mut row, view::UNDERLYING_SHAPE_ID, *underlying);
view::SHAPE.set_u64(&mut row, view::CAPACITY, 0u64);
view::SHAPE.set_u8(&mut row, view::PROPAGATE_EVICTIONS, 0u8);
view::SHAPE.set_utf8(&mut row, view::KEY_COLUMN, key.column());
let (key_kind_u8, precision_u8) = match key {
SeriesKey::DateTime {
precision,
..
} => (0u8, *precision as u8),
SeriesKey::Integer {
..
} => (1u8, 0u8),
};
view::SHAPE.set_u8(&mut row, view::KEY_KIND, key_kind_u8);
view::SHAPE.set_u8(&mut row, view::PRECISION, precision_u8);
view::SHAPE.set_u64(&mut row, view::TAG_ID, tag.map(|t| t.0).unwrap_or(0));
}
}
txn.set(&ViewKey::encoded(view), row)?;
Ok(())
}
fn link_view_to_namespace(
txn: &mut AdminTransaction,
namespace: NamespaceId,
view: ViewId,
name: &str,
) -> Result<()> {
let mut row = view_namespace::SHAPE.allocate();
view_namespace::SHAPE.set_u64(&mut row, view_namespace::ID, view);
view_namespace::SHAPE.set_utf8(&mut row, view_namespace::NAME, name);
txn.set(&NamespaceViewKey::encoded(namespace, view), row)?;
Ok(())
}
fn insert_columns_for_view(txn: &mut AdminTransaction, view: ViewId, to_create: ViewToCreate) -> Result<()> {
let namespace = Self::get_namespace(&mut Transaction::Admin(&mut *txn), to_create.namespace)?;
for (idx, column_to_create) in to_create.columns.into_iter().enumerate() {
Self::create_column(
txn,
view,
ColumnToCreate {
fragment: Some(column_to_create.fragment.clone()),
namespace_name: namespace.name().to_string(),
shape_name: to_create.name.text().to_string(),
column: column_to_create.name.text().to_string(),
constraint: column_to_create.constraint.clone(),
properties: vec![],
index: ColumnIndex(idx as u8),
auto_increment: false,
dictionary_id: None, },
)?;
}
Ok(())
}
}
#[cfg(test)]
pub mod tests {
    use reifydb_core::{
        interface::catalog::id::{NamespaceId, ViewId},
        key::namespace_view::NamespaceViewKey,
    };
    use reifydb_engine::test_harness::create_test_admin_transaction;
    use reifydb_type::fragment::Fragment;

    use super::ViewStorageConfig;
    use crate::{
        CatalogStore,
        store::view::{create::ViewToCreate, shape::view_namespace},
        test_utils::ensure_test_namespace,
    };

    /// Builds a column-less view definition using the default storage config.
    fn bare_view(namespace: NamespaceId, name: &'static str) -> ViewToCreate {
        ViewToCreate {
            namespace,
            name: Fragment::internal(name),
            columns: vec![],
            storage: ViewStorageConfig::default(),
        }
    }

    /// Creating a view succeeds once; a second attempt with the same name
    /// fails with diagnostic code `CA_003`.
    #[test]
    fn test_create_deferred_view() {
        let mut txn = create_test_admin_transaction();
        let namespace = ensure_test_namespace(&mut txn);
        let definition = bare_view(namespace.id(), "test_view");

        let created = CatalogStore::create_deferred_view(&mut txn, definition.clone()).unwrap();
        assert_eq!(created.id(), ViewId(16385));
        assert_eq!(created.namespace(), NamespaceId(16385));
        assert_eq!(created.name(), "test_view");

        let duplicate = CatalogStore::create_deferred_view(&mut txn, definition);
        let err = duplicate.unwrap_err();
        assert_eq!(err.diagnostic().code, "CA_003");
    }

    /// Every created view gets a namespace link row carrying its id and name.
    #[test]
    fn test_view_linked_to_namespace() {
        let mut txn = create_test_admin_transaction();
        let namespace = ensure_test_namespace(&mut txn);

        for name in ["test_view", "another_view"] {
            CatalogStore::create_deferred_view(&mut txn, bare_view(namespace.id(), name)).unwrap();
        }

        let links: Vec<_> = txn
            .range(NamespaceViewKey::full_scan(namespace.id()), 1024)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(links.len(), 2);

        // (position in scan result, expected view id, expected view name):
        // the view created first is expected at the later position.
        let expectations = [(1usize, 16385u64, "test_view"), (0usize, 16386u64, "another_view")];
        for (position, expected_id, expected_name) in expectations {
            let row = &links[position].row;
            assert_eq!(view_namespace::SHAPE.get_u64(row, view_namespace::ID), expected_id);
            assert_eq!(view_namespace::SHAPE.get_utf8(row, view_namespace::NAME), expected_name);
        }
    }

    /// Creating a view in a namespace that does not exist must fail.
    #[test]
    fn test_create_deferred_view_missing_namespace() {
        let mut txn = create_test_admin_transaction();
        let definition = bare_view(NamespaceId(999), "my_view");

        let outcome = CatalogStore::create_deferred_view(&mut txn, definition);
        outcome.unwrap_err();
    }
}