use reifydb_core::{
interface::catalog::{
id::{SubscriptionColumnId, SubscriptionId},
subscription::{Subscription, SubscriptionColumn},
},
key::{subscription::SubscriptionKey, subscription_column::SubscriptionColumnKey},
};
use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
use reifydb_type::value::r#type::Type;
use crate::{
CatalogStore, Result,
store::{
sequence::flow::next_flow_id,
subscription::shape::{subscription, subscription_column},
},
};
#[derive(Debug, Clone)]
pub struct SubscriptionColumnToCreate {
pub name: String,
pub ty: Type,
}
#[derive(Debug, Clone)]
pub struct SubscriptionToCreate {
pub columns: Vec<SubscriptionColumnToCreate>,
}
impl CatalogStore {
pub(crate) fn create_subscription(
txn: &mut AdminTransaction,
to_create: SubscriptionToCreate,
) -> Result<Subscription> {
let flow_id = next_flow_id(txn)?;
let subscription_id = SubscriptionId(flow_id.0);
Self::store_subscription(txn, subscription_id)?;
Self::insert_columns_for_subscription(txn, subscription_id, &to_create)?;
Self::get_subscription(&mut Transaction::Admin(&mut *txn), subscription_id)
}
fn store_subscription(txn: &mut AdminTransaction, subscription: SubscriptionId) -> Result<()> {
let mut row = subscription::SHAPE.allocate();
subscription::SHAPE.set_u64(&mut row, subscription::ID, subscription.0);
subscription::SHAPE.set_u64(&mut row, subscription::ACKNOWLEDGED_VERSION, 0u64);
subscription::SHAPE.set_u64(&mut row, subscription::PRIMARY_KEY, 0u64);
txn.set(&SubscriptionKey::encoded(subscription), row)?;
Ok(())
}
fn insert_columns_for_subscription(
txn: &mut AdminTransaction,
subscription: SubscriptionId,
to_create: &SubscriptionToCreate,
) -> Result<()> {
for (idx, column_to_create) in to_create.columns.iter().enumerate() {
let column_id = SubscriptionColumnId(idx as u64);
let mut row = subscription_column::SHAPE.allocate();
subscription_column::SHAPE.set_u64(&mut row, subscription_column::ID, column_id);
subscription_column::SHAPE.set_utf8(
&mut row,
subscription_column::NAME,
&column_to_create.name,
);
subscription_column::SHAPE.set_u8(
&mut row,
subscription_column::TYPE,
column_to_create.ty.to_u8(),
);
txn.set(&SubscriptionColumnKey::encoded(subscription, column_id), row)?;
}
Ok(())
}
pub(crate) fn list_subscription_columns(
txn: &mut Transaction<'_>,
subscription: SubscriptionId,
) -> Result<Vec<SubscriptionColumn>> {
let mut stream = txn.range(SubscriptionColumnKey::subscription_range(subscription), 256)?;
let mut columns = Vec::new();
while let Some(result) = stream.next() {
let multi = result?;
let row = &multi.row;
let id = SubscriptionColumnId(subscription_column::SHAPE.get_u64(row, subscription_column::ID));
let name = subscription_column::SHAPE.get_utf8(row, subscription_column::NAME).to_string();
let ty_u8 = subscription_column::SHAPE.get_u8(row, subscription_column::TYPE);
let ty = Type::from_u8(ty_u8);
columns.push(SubscriptionColumn {
id,
name,
ty,
});
}
columns.sort_by_key(|c| c.id.0);
Ok(columns)
}
}
#[cfg(test)]
pub mod tests {
use reifydb_core::interface::catalog::id::SubscriptionColumnId;
use reifydb_engine::test_harness::create_test_admin_transaction;
use reifydb_type::value::r#type::Type;
use crate::{
CatalogStore,
store::subscription::create::{SubscriptionColumnToCreate, SubscriptionToCreate},
};
#[test]
fn test_create_subscription() {
let mut txn = create_test_admin_transaction();
let to_create = SubscriptionToCreate {
columns: vec![],
};
let result = CatalogStore::create_subscription(&mut txn, to_create).unwrap();
assert_eq!(result.acknowledged_version.0, 0);
assert!(result.columns.is_empty());
}
#[test]
fn test_create_subscription_with_columns() {
let mut txn = create_test_admin_transaction();
let to_create = SubscriptionToCreate {
columns: vec![
SubscriptionColumnToCreate {
name: "id".to_string(),
ty: Type::Int8,
},
SubscriptionColumnToCreate {
name: "name".to_string(),
ty: Type::Utf8,
},
],
};
let result = CatalogStore::create_subscription(&mut txn, to_create).unwrap();
assert_eq!(result.columns.len(), 2);
assert_eq!(result.columns[0].id, SubscriptionColumnId(0));
assert_eq!(result.columns[0].name, "id");
assert_eq!(result.columns[0].ty, Type::Int8);
assert_eq!(result.columns[1].id, SubscriptionColumnId(1));
assert_eq!(result.columns[1].name, "name");
assert_eq!(result.columns[1].ty, Type::Utf8);
}
#[test]
fn test_create_multiple_subscriptions() {
let mut txn = create_test_admin_transaction();
let sub1 = CatalogStore::create_subscription(
&mut txn,
SubscriptionToCreate {
columns: vec![],
},
)
.unwrap();
let sub2 = CatalogStore::create_subscription(
&mut txn,
SubscriptionToCreate {
columns: vec![],
},
)
.unwrap();
assert_ne!(sub1.id, sub2.id);
}
}