mod check_eval;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use parking_lot::{Mutex, RwLock};
use grafeo_common::collections::{GrafeoConcurrentMap, grafeo_concurrent_map};
use grafeo_common::types::{EdgeTypeId, IndexId, LabelId, PropertyKeyId, Value};
pub struct Catalog {
labels: LabelCatalog,
property_keys: PropertyCatalog,
edge_types: EdgeTypeCatalog,
indexes: IndexCatalog,
schema: Option<SchemaCatalog>,
}
impl Catalog {
#[must_use]
pub fn new() -> Self {
Self {
labels: LabelCatalog::new(),
property_keys: PropertyCatalog::new(),
edge_types: EdgeTypeCatalog::new(),
indexes: IndexCatalog::new(),
schema: Some(SchemaCatalog::new()),
}
}
#[must_use]
pub fn with_schema() -> Self {
Self::new()
}
pub fn get_or_create_label(&self, name: &str) -> LabelId {
self.labels.get_or_create(name)
}
#[must_use]
pub fn get_label_id(&self, name: &str) -> Option<LabelId> {
self.labels.get_id(name)
}
#[must_use]
pub fn get_label_name(&self, id: LabelId) -> Option<Arc<str>> {
self.labels.get_name(id)
}
#[must_use]
pub fn label_count(&self) -> usize {
self.labels.count()
}
#[must_use]
pub fn all_labels(&self) -> Vec<Arc<str>> {
self.labels.all_names()
}
pub fn get_or_create_property_key(&self, name: &str) -> PropertyKeyId {
self.property_keys.get_or_create(name)
}
#[must_use]
pub fn get_property_key_id(&self, name: &str) -> Option<PropertyKeyId> {
self.property_keys.get_id(name)
}
#[must_use]
pub fn get_property_key_name(&self, id: PropertyKeyId) -> Option<Arc<str>> {
self.property_keys.get_name(id)
}
#[must_use]
pub fn property_key_count(&self) -> usize {
self.property_keys.count()
}
#[must_use]
pub fn all_property_keys(&self) -> Vec<Arc<str>> {
self.property_keys.all_names()
}
pub fn get_or_create_edge_type(&self, name: &str) -> EdgeTypeId {
self.edge_types.get_or_create(name)
}
#[must_use]
pub fn get_edge_type_id(&self, name: &str) -> Option<EdgeTypeId> {
self.edge_types.get_id(name)
}
#[must_use]
pub fn get_edge_type_name(&self, id: EdgeTypeId) -> Option<Arc<str>> {
self.edge_types.get_name(id)
}
#[must_use]
pub fn edge_type_count(&self) -> usize {
self.edge_types.count()
}
#[must_use]
pub fn all_edge_types(&self) -> Vec<Arc<str>> {
self.edge_types.all_names()
}
pub fn create_index(
&self,
name: &str,
label: LabelId,
property_key: PropertyKeyId,
index_type: IndexType,
) -> IndexId {
self.indexes.create(name, label, property_key, index_type)
}
pub fn drop_index(&self, id: IndexId) -> bool {
self.indexes.drop(id)
}
#[must_use]
pub fn find_index_by_name(&self, name: &str) -> Option<IndexId> {
self.indexes.find_by_name(name)
}
#[must_use]
pub fn get_index(&self, id: IndexId) -> Option<IndexDefinition> {
self.indexes.get(id)
}
#[must_use]
pub fn indexes_for_label(&self, label: LabelId) -> Vec<IndexId> {
self.indexes.for_label(label)
}
#[must_use]
pub fn indexes_for_label_property(
&self,
label: LabelId,
property_key: PropertyKeyId,
) -> Vec<IndexId> {
self.indexes.for_label_property(label, property_key)
}
#[must_use]
pub fn all_indexes(&self) -> Vec<IndexDefinition> {
self.indexes.all()
}
#[must_use]
pub fn index_count(&self) -> usize {
self.indexes.count()
}
#[must_use]
pub fn has_schema(&self) -> bool {
self.schema.is_some()
}
pub fn add_unique_constraint(
&self,
label: LabelId,
property_key: PropertyKeyId,
) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.add_unique_constraint(label, property_key),
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn add_required_property(
&self,
label: LabelId,
property_key: PropertyKeyId,
) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.add_required_property(label, property_key),
None => Err(CatalogError::SchemaNotEnabled),
}
}
#[must_use]
pub fn is_property_required(&self, label: LabelId, property_key: PropertyKeyId) -> bool {
self.schema
.as_ref()
.is_some_and(|s| s.is_property_required(label, property_key))
}
#[must_use]
pub fn is_property_unique(&self, label: LabelId, property_key: PropertyKeyId) -> bool {
self.schema
.as_ref()
.is_some_and(|s| s.is_property_unique(label, property_key))
}
#[must_use]
pub fn schema(&self) -> Option<&SchemaCatalog> {
self.schema.as_ref()
}
pub fn register_node_type(&self, def: NodeTypeDefinition) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.register_node_type(def),
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn register_or_replace_node_type(&self, def: NodeTypeDefinition) {
if let Some(schema) = &self.schema {
schema.register_or_replace_node_type(def);
}
}
pub fn drop_node_type(&self, name: &str) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.drop_node_type(name),
None => Err(CatalogError::SchemaNotEnabled),
}
}
#[must_use]
pub fn get_node_type(&self, name: &str) -> Option<NodeTypeDefinition> {
self.schema.as_ref().and_then(|s| s.get_node_type(name))
}
#[must_use]
pub fn resolved_node_type(&self, name: &str) -> Option<NodeTypeDefinition> {
self.schema
.as_ref()
.and_then(|s| s.resolved_node_type(name))
}
#[must_use]
pub fn all_node_type_names(&self) -> Vec<String> {
self.schema
.as_ref()
.map(SchemaCatalog::all_node_types)
.unwrap_or_default()
}
#[must_use]
pub fn all_edge_type_names(&self) -> Vec<String> {
self.schema
.as_ref()
.map(SchemaCatalog::all_edge_types)
.unwrap_or_default()
}
pub fn register_edge_type_def(&self, def: EdgeTypeDefinition) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.register_edge_type(def),
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn register_or_replace_edge_type_def(&self, def: EdgeTypeDefinition) {
if let Some(schema) = &self.schema {
schema.register_or_replace_edge_type(def);
}
}
pub fn drop_edge_type_def(&self, name: &str) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.drop_edge_type(name),
None => Err(CatalogError::SchemaNotEnabled),
}
}
#[must_use]
pub fn get_edge_type_def(&self, name: &str) -> Option<EdgeTypeDefinition> {
self.schema.as_ref().and_then(|s| s.get_edge_type(name))
}
pub fn register_graph_type(&self, def: GraphTypeDefinition) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.register_graph_type(def),
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn drop_graph_type(&self, name: &str) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.drop_graph_type(name),
None => Err(CatalogError::SchemaNotEnabled),
}
}
#[must_use]
pub fn all_graph_type_names(&self) -> Vec<String> {
self.schema
.as_ref()
.map(SchemaCatalog::all_graph_types)
.unwrap_or_default()
}
#[must_use]
pub fn get_graph_type_def(&self, name: &str) -> Option<GraphTypeDefinition> {
self.schema.as_ref().and_then(|s| s.get_graph_type(name))
}
pub fn register_schema_namespace(&self, name: String) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.register_schema(name),
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn drop_schema_namespace(&self, name: &str) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.drop_schema(name),
None => Err(CatalogError::SchemaNotEnabled),
}
}
#[must_use]
pub fn schema_exists(&self, name: &str) -> bool {
self.schema.as_ref().is_some_and(|s| s.schema_exists(name))
}
#[must_use]
pub fn schema_names(&self) -> Vec<String> {
self.schema
.as_ref()
.map(|s| s.schema_names())
.unwrap_or_default()
}
pub fn add_constraint_to_type(
&self,
label: &str,
constraint: TypeConstraint,
) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.add_constraint_to_type(label, constraint),
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn alter_node_type_add_property(
&self,
type_name: &str,
property: TypedProperty,
) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.alter_node_type_add_property(type_name, property),
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn alter_node_type_drop_property(
&self,
type_name: &str,
property_name: &str,
) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.alter_node_type_drop_property(type_name, property_name),
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn alter_edge_type_add_property(
&self,
type_name: &str,
property: TypedProperty,
) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.alter_edge_type_add_property(type_name, property),
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn alter_edge_type_drop_property(
&self,
type_name: &str,
property_name: &str,
) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.alter_edge_type_drop_property(type_name, property_name),
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn alter_graph_type_add_node_type(
&self,
graph_type_name: &str,
node_type: String,
) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.alter_graph_type_add_node_type(graph_type_name, node_type),
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn alter_graph_type_drop_node_type(
&self,
graph_type_name: &str,
node_type: &str,
) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.alter_graph_type_drop_node_type(graph_type_name, node_type),
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn alter_graph_type_add_edge_type(
&self,
graph_type_name: &str,
edge_type: String,
) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.alter_graph_type_add_edge_type(graph_type_name, edge_type),
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn alter_graph_type_drop_edge_type(
&self,
graph_type_name: &str,
edge_type: &str,
) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.alter_graph_type_drop_edge_type(graph_type_name, edge_type),
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn bind_graph_type(
&self,
graph_name: &str,
graph_type: String,
) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => {
if schema.get_graph_type(&graph_type).is_none() {
return Err(CatalogError::TypeNotFound(graph_type));
}
schema
.graph_type_bindings
.write()
.insert(graph_name.to_string(), graph_type);
Ok(())
}
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn get_graph_type_binding(&self, graph_name: &str) -> Option<String> {
self.schema
.as_ref()?
.graph_type_bindings
.read()
.get(graph_name)
.cloned()
}
pub fn register_procedure(&self, def: ProcedureDefinition) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.register_procedure(def),
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn replace_procedure(&self, def: ProcedureDefinition) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => {
schema.replace_procedure(def);
Ok(())
}
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn drop_procedure(&self, name: &str) -> Result<(), CatalogError> {
match &self.schema {
Some(schema) => schema.drop_procedure(name),
None => Err(CatalogError::SchemaNotEnabled),
}
}
pub fn get_procedure(&self, name: &str) -> Option<ProcedureDefinition> {
self.schema.as_ref()?.get_procedure(name)
}
#[must_use]
pub fn all_node_type_defs(&self) -> Vec<NodeTypeDefinition> {
self.schema
.as_ref()
.map(SchemaCatalog::all_node_type_defs)
.unwrap_or_default()
}
#[must_use]
pub fn all_edge_type_defs(&self) -> Vec<EdgeTypeDefinition> {
self.schema
.as_ref()
.map(SchemaCatalog::all_edge_type_defs)
.unwrap_or_default()
}
#[must_use]
pub fn all_graph_type_defs(&self) -> Vec<GraphTypeDefinition> {
self.schema
.as_ref()
.map(SchemaCatalog::all_graph_type_defs)
.unwrap_or_default()
}
#[must_use]
pub fn all_procedure_defs(&self) -> Vec<ProcedureDefinition> {
self.schema
.as_ref()
.map(SchemaCatalog::all_procedure_defs)
.unwrap_or_default()
}
#[must_use]
pub fn all_graph_type_bindings(&self) -> Vec<(String, String)> {
self.schema
.as_ref()
.map(SchemaCatalog::all_graph_type_bindings)
.unwrap_or_default()
}
}
impl Default for Catalog {
fn default() -> Self {
Self::new()
}
}
struct LabelCatalog {
name_to_id: GrafeoConcurrentMap<Arc<str>, LabelId>,
id_to_name: RwLock<Vec<Arc<str>>>,
next_id: AtomicU32,
create_lock: Mutex<()>,
}
impl LabelCatalog {
fn new() -> Self {
Self {
name_to_id: grafeo_concurrent_map(),
id_to_name: RwLock::new(Vec::new()),
next_id: AtomicU32::new(0),
create_lock: Mutex::new(()),
}
}
fn get_or_create(&self, name: &str) -> LabelId {
if let Some(id) = self.name_to_id.get(name) {
return *id;
}
let _guard = self.create_lock.lock();
if let Some(id) = self.name_to_id.get(name) {
return *id;
}
let id = LabelId::new(self.next_id.fetch_add(1, Ordering::Relaxed));
let name: Arc<str> = name.into();
self.id_to_name.write().push(Arc::clone(&name));
self.name_to_id.insert(name, id);
id
}
fn get_id(&self, name: &str) -> Option<LabelId> {
self.name_to_id.get(name).map(|r| *r)
}
fn get_name(&self, id: LabelId) -> Option<Arc<str>> {
self.id_to_name.read().get(id.as_u32() as usize).cloned()
}
fn count(&self) -> usize {
self.id_to_name.read().len()
}
fn all_names(&self) -> Vec<Arc<str>> {
self.id_to_name.read().clone()
}
}
struct PropertyCatalog {
name_to_id: GrafeoConcurrentMap<Arc<str>, PropertyKeyId>,
id_to_name: RwLock<Vec<Arc<str>>>,
next_id: AtomicU32,
create_lock: Mutex<()>,
}
impl PropertyCatalog {
fn new() -> Self {
Self {
name_to_id: grafeo_concurrent_map(),
id_to_name: RwLock::new(Vec::new()),
next_id: AtomicU32::new(0),
create_lock: Mutex::new(()),
}
}
fn get_or_create(&self, name: &str) -> PropertyKeyId {
if let Some(id) = self.name_to_id.get(name) {
return *id;
}
let _guard = self.create_lock.lock();
if let Some(id) = self.name_to_id.get(name) {
return *id;
}
let id = PropertyKeyId::new(self.next_id.fetch_add(1, Ordering::Relaxed));
let name: Arc<str> = name.into();
self.id_to_name.write().push(Arc::clone(&name));
self.name_to_id.insert(name, id);
id
}
fn get_id(&self, name: &str) -> Option<PropertyKeyId> {
self.name_to_id.get(name).map(|r| *r)
}
fn get_name(&self, id: PropertyKeyId) -> Option<Arc<str>> {
self.id_to_name.read().get(id.as_u32() as usize).cloned()
}
fn count(&self) -> usize {
self.id_to_name.read().len()
}
fn all_names(&self) -> Vec<Arc<str>> {
self.id_to_name.read().clone()
}
}
struct EdgeTypeCatalog {
name_to_id: GrafeoConcurrentMap<Arc<str>, EdgeTypeId>,
id_to_name: RwLock<Vec<Arc<str>>>,
next_id: AtomicU32,
create_lock: Mutex<()>,
}
impl EdgeTypeCatalog {
fn new() -> Self {
Self {
name_to_id: grafeo_concurrent_map(),
id_to_name: RwLock::new(Vec::new()),
next_id: AtomicU32::new(0),
create_lock: Mutex::new(()),
}
}
fn get_or_create(&self, name: &str) -> EdgeTypeId {
if let Some(id) = self.name_to_id.get(name) {
return *id;
}
let _guard = self.create_lock.lock();
if let Some(id) = self.name_to_id.get(name) {
return *id;
}
let id = EdgeTypeId::new(self.next_id.fetch_add(1, Ordering::Relaxed));
let name: Arc<str> = name.into();
self.id_to_name.write().push(Arc::clone(&name));
self.name_to_id.insert(name, id);
id
}
fn get_id(&self, name: &str) -> Option<EdgeTypeId> {
self.name_to_id.get(name).map(|r| *r)
}
fn get_name(&self, id: EdgeTypeId) -> Option<Arc<str>> {
self.id_to_name.read().get(id.as_u32() as usize).cloned()
}
fn count(&self) -> usize {
self.id_to_name.read().len()
}
fn all_names(&self) -> Vec<Arc<str>> {
self.id_to_name.read().clone()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexType {
Hash,
BTree,
FullText,
}
#[derive(Debug, Clone)]
pub struct IndexDefinition {
pub id: IndexId,
pub name: String,
pub label: LabelId,
pub property_key: PropertyKeyId,
pub index_type: IndexType,
}
struct IndexCatalog {
indexes: RwLock<HashMap<IndexId, IndexDefinition>>,
label_indexes: RwLock<HashMap<LabelId, Vec<IndexId>>>,
label_property_indexes: RwLock<HashMap<(LabelId, PropertyKeyId), Vec<IndexId>>>,
name_index: RwLock<HashMap<String, IndexId>>,
next_id: AtomicU32,
}
impl IndexCatalog {
fn new() -> Self {
Self {
indexes: RwLock::new(HashMap::new()),
label_indexes: RwLock::new(HashMap::new()),
label_property_indexes: RwLock::new(HashMap::new()),
name_index: RwLock::new(HashMap::new()),
next_id: AtomicU32::new(0),
}
}
fn create(
&self,
name: &str,
label: LabelId,
property_key: PropertyKeyId,
index_type: IndexType,
) -> IndexId {
let id = IndexId::new(self.next_id.fetch_add(1, Ordering::Relaxed));
let definition = IndexDefinition {
id,
name: name.to_string(),
label,
property_key,
index_type,
};
let mut indexes = self.indexes.write();
let mut label_indexes = self.label_indexes.write();
let mut label_property_indexes = self.label_property_indexes.write();
indexes.insert(id, definition);
label_indexes.entry(label).or_default().push(id);
label_property_indexes
.entry((label, property_key))
.or_default()
.push(id);
self.name_index.write().insert(name.to_string(), id);
id
}
fn drop(&self, id: IndexId) -> bool {
let mut indexes = self.indexes.write();
let mut label_indexes = self.label_indexes.write();
let mut label_property_indexes = self.label_property_indexes.write();
if let Some(definition) = indexes.remove(&id) {
if let Some(ids) = label_indexes.get_mut(&definition.label) {
ids.retain(|&i| i != id);
}
if let Some(ids) =
label_property_indexes.get_mut(&(definition.label, definition.property_key))
{
ids.retain(|&i| i != id);
}
self.name_index.write().remove(&definition.name);
true
} else {
false
}
}
fn find_by_name(&self, name: &str) -> Option<IndexId> {
self.name_index.read().get(name).copied()
}
fn get(&self, id: IndexId) -> Option<IndexDefinition> {
self.indexes.read().get(&id).cloned()
}
fn for_label(&self, label: LabelId) -> Vec<IndexId> {
self.label_indexes
.read()
.get(&label)
.cloned()
.unwrap_or_default()
}
fn for_label_property(&self, label: LabelId, property_key: PropertyKeyId) -> Vec<IndexId> {
self.label_property_indexes
.read()
.get(&(label, property_key))
.cloned()
.unwrap_or_default()
}
fn count(&self) -> usize {
self.indexes.read().len()
}
fn all(&self) -> Vec<IndexDefinition> {
self.indexes.read().values().cloned().collect()
}
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum PropertyDataType {
String,
Int64,
Float64,
Bool,
Date,
Time,
Timestamp,
Duration,
List,
ListTyped(Box<PropertyDataType>),
Map,
Bytes,
Node,
Edge,
Any,
}
impl PropertyDataType {
#[must_use]
pub fn from_type_name(name: &str) -> Self {
let upper = name.to_uppercase();
if let Some(inner) = upper
.strip_prefix("LIST<")
.and_then(|s| s.strip_suffix('>'))
{
return Self::ListTyped(Box::new(Self::from_type_name(inner)));
}
match upper.as_str() {
"STRING" | "VARCHAR" | "TEXT" => Self::String,
"INT" | "INT64" | "INTEGER" | "BIGINT" => Self::Int64,
"FLOAT" | "FLOAT64" | "DOUBLE" | "REAL" => Self::Float64,
"BOOL" | "BOOLEAN" => Self::Bool,
"DATE" => Self::Date,
"TIME" => Self::Time,
"TIMESTAMP" | "DATETIME" => Self::Timestamp,
"DURATION" | "INTERVAL" => Self::Duration,
"LIST" | "ARRAY" => Self::List,
"MAP" | "RECORD" => Self::Map,
"BYTES" | "BINARY" | "BLOB" => Self::Bytes,
"NODE" => Self::Node,
"EDGE" | "RELATIONSHIP" => Self::Edge,
_ => Self::Any,
}
}
#[must_use]
pub fn matches(&self, value: &Value) -> bool {
match (self, value) {
(Self::Any, _) | (_, Value::Null) => true,
(Self::String, Value::String(_)) => true,
(Self::Int64, Value::Int64(_)) => true,
(Self::Float64, Value::Float64(_)) => true,
(Self::Bool, Value::Bool(_)) => true,
(Self::Date, Value::Date(_)) => true,
(Self::Time, Value::Time(_)) => true,
(Self::Timestamp, Value::Timestamp(_)) => true,
(Self::Duration, Value::Duration(_)) => true,
(Self::List, Value::List(_)) => true,
(Self::ListTyped(elem_type), Value::List(items)) => {
items.iter().all(|item| elem_type.matches(item))
}
(Self::Bytes, Value::Bytes(_)) => true,
(Self::Node | Self::Edge, Value::Map(_)) => true,
_ => false,
}
}
}
impl std::fmt::Display for PropertyDataType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::String => write!(f, "STRING"),
Self::Int64 => write!(f, "INT64"),
Self::Float64 => write!(f, "FLOAT64"),
Self::Bool => write!(f, "BOOLEAN"),
Self::Date => write!(f, "DATE"),
Self::Time => write!(f, "TIME"),
Self::Timestamp => write!(f, "TIMESTAMP"),
Self::Duration => write!(f, "DURATION"),
Self::List => write!(f, "LIST"),
Self::ListTyped(elem) => write!(f, "LIST<{elem}>"),
Self::Map => write!(f, "MAP"),
Self::Bytes => write!(f, "BYTES"),
Self::Node => write!(f, "NODE"),
Self::Edge => write!(f, "EDGE"),
Self::Any => write!(f, "ANY"),
}
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TypedProperty {
pub name: String,
pub data_type: PropertyDataType,
pub nullable: bool,
pub default_value: Option<Value>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum TypeConstraint {
PrimaryKey(Vec<String>),
Unique(Vec<String>),
NotNull(String),
Check {
name: Option<String>,
expression: String,
},
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NodeTypeDefinition {
pub name: String,
pub properties: Vec<TypedProperty>,
pub constraints: Vec<TypeConstraint>,
pub parent_types: Vec<String>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EdgeTypeDefinition {
pub name: String,
pub properties: Vec<TypedProperty>,
pub constraints: Vec<TypeConstraint>,
pub source_node_types: Vec<String>,
pub target_node_types: Vec<String>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct GraphTypeDefinition {
pub name: String,
pub allowed_node_types: Vec<String>,
pub allowed_edge_types: Vec<String>,
pub open: bool,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ProcedureDefinition {
pub name: String,
pub params: Vec<(String, String)>,
pub returns: Vec<(String, String)>,
pub body: String,
}
pub struct SchemaCatalog {
unique_constraints: RwLock<HashSet<(LabelId, PropertyKeyId)>>,
required_properties: RwLock<HashSet<(LabelId, PropertyKeyId)>>,
node_types: RwLock<HashMap<String, NodeTypeDefinition>>,
edge_types: RwLock<HashMap<String, EdgeTypeDefinition>>,
graph_types: RwLock<HashMap<String, GraphTypeDefinition>>,
schemas: RwLock<Vec<String>>,
graph_type_bindings: RwLock<HashMap<String, String>>,
procedures: RwLock<HashMap<String, ProcedureDefinition>>,
}
impl SchemaCatalog {
fn new() -> Self {
Self {
unique_constraints: RwLock::new(HashSet::new()),
required_properties: RwLock::new(HashSet::new()),
node_types: RwLock::new(HashMap::new()),
edge_types: RwLock::new(HashMap::new()),
graph_types: RwLock::new(HashMap::new()),
schemas: RwLock::new(Vec::new()),
graph_type_bindings: RwLock::new(HashMap::new()),
procedures: RwLock::new(HashMap::new()),
}
}
pub fn register_node_type(&self, def: NodeTypeDefinition) -> Result<(), CatalogError> {
let mut types = self.node_types.write();
if types.contains_key(&def.name) {
return Err(CatalogError::TypeAlreadyExists(def.name));
}
types.insert(def.name.clone(), def);
Ok(())
}
pub fn register_or_replace_node_type(&self, def: NodeTypeDefinition) {
self.node_types.write().insert(def.name.clone(), def);
}
pub fn drop_node_type(&self, name: &str) -> Result<(), CatalogError> {
let mut types = self.node_types.write();
if types.remove(name).is_none() {
return Err(CatalogError::TypeNotFound(name.to_string()));
}
Ok(())
}
#[must_use]
pub fn get_node_type(&self, name: &str) -> Option<NodeTypeDefinition> {
self.node_types.read().get(name).cloned()
}
#[must_use]
pub fn resolved_node_type(&self, name: &str) -> Option<NodeTypeDefinition> {
let types = self.node_types.read();
let base = types.get(name)?;
if base.parent_types.is_empty() {
return Some(base.clone());
}
let mut visited = HashSet::new();
visited.insert(name.to_string());
let mut all_properties = Vec::new();
let mut all_constraints = Vec::new();
Self::collect_inherited(
&types,
name,
&mut visited,
&mut all_properties,
&mut all_constraints,
);
Some(NodeTypeDefinition {
name: base.name.clone(),
properties: all_properties,
constraints: all_constraints,
parent_types: base.parent_types.clone(),
})
}
fn collect_inherited(
types: &HashMap<String, NodeTypeDefinition>,
name: &str,
visited: &mut HashSet<String>,
properties: &mut Vec<TypedProperty>,
constraints: &mut Vec<TypeConstraint>,
) {
let Some(def) = types.get(name) else { return };
for parent in &def.parent_types {
if visited.insert(parent.clone()) {
Self::collect_inherited(types, parent, visited, properties, constraints);
}
}
for prop in &def.properties {
if let Some(pos) = properties.iter().position(|p| p.name == prop.name) {
properties[pos] = prop.clone();
} else {
properties.push(prop.clone());
}
}
constraints.extend(def.constraints.iter().cloned());
}
#[must_use]
pub fn all_node_types(&self) -> Vec<String> {
self.node_types.read().keys().cloned().collect()
}
#[must_use]
pub fn all_node_type_defs(&self) -> Vec<NodeTypeDefinition> {
self.node_types.read().values().cloned().collect()
}
pub fn register_edge_type(&self, def: EdgeTypeDefinition) -> Result<(), CatalogError> {
let mut types = self.edge_types.write();
if types.contains_key(&def.name) {
return Err(CatalogError::TypeAlreadyExists(def.name));
}
types.insert(def.name.clone(), def);
Ok(())
}
pub fn register_or_replace_edge_type(&self, def: EdgeTypeDefinition) {
self.edge_types.write().insert(def.name.clone(), def);
}
pub fn drop_edge_type(&self, name: &str) -> Result<(), CatalogError> {
let mut types = self.edge_types.write();
if types.remove(name).is_none() {
return Err(CatalogError::TypeNotFound(name.to_string()));
}
Ok(())
}
#[must_use]
pub fn get_edge_type(&self, name: &str) -> Option<EdgeTypeDefinition> {
self.edge_types.read().get(name).cloned()
}
#[must_use]
pub fn all_edge_types(&self) -> Vec<String> {
self.edge_types.read().keys().cloned().collect()
}
#[must_use]
pub fn all_edge_type_defs(&self) -> Vec<EdgeTypeDefinition> {
self.edge_types.read().values().cloned().collect()
}
pub fn register_graph_type(&self, def: GraphTypeDefinition) -> Result<(), CatalogError> {
let mut types = self.graph_types.write();
if types.contains_key(&def.name) {
return Err(CatalogError::TypeAlreadyExists(def.name));
}
types.insert(def.name.clone(), def);
Ok(())
}
pub fn drop_graph_type(&self, name: &str) -> Result<(), CatalogError> {
let mut types = self.graph_types.write();
if types.remove(name).is_none() {
return Err(CatalogError::TypeNotFound(name.to_string()));
}
Ok(())
}
#[must_use]
pub fn get_graph_type(&self, name: &str) -> Option<GraphTypeDefinition> {
self.graph_types.read().get(name).cloned()
}
#[must_use]
pub fn all_graph_types(&self) -> Vec<String> {
self.graph_types.read().keys().cloned().collect()
}
#[must_use]
pub fn all_graph_type_defs(&self) -> Vec<GraphTypeDefinition> {
self.graph_types.read().values().cloned().collect()
}
pub fn register_schema(&self, name: String) -> Result<(), CatalogError> {
let mut schemas = self.schemas.write();
if schemas.contains(&name) {
return Err(CatalogError::SchemaAlreadyExists(name));
}
schemas.push(name);
Ok(())
}
pub fn drop_schema(&self, name: &str) -> Result<(), CatalogError> {
let mut schemas = self.schemas.write();
if let Some(pos) = schemas.iter().position(|s| s == name) {
schemas.remove(pos);
Ok(())
} else {
Err(CatalogError::SchemaNotFound(name.to_string()))
}
}
#[must_use]
pub fn schema_exists(&self, name: &str) -> bool {
self.schemas
.read()
.iter()
.any(|s| s.eq_ignore_ascii_case(name))
}
#[must_use]
pub fn schema_names(&self) -> Vec<String> {
self.schemas.read().clone()
}
pub fn add_constraint_to_type(
&self,
label: &str,
constraint: TypeConstraint,
) -> Result<(), CatalogError> {
let mut types = self.node_types.write();
if let Some(def) = types.get_mut(label) {
def.constraints.push(constraint);
} else {
types.insert(
label.to_string(),
NodeTypeDefinition {
name: label.to_string(),
properties: Vec::new(),
constraints: vec![constraint],
parent_types: Vec::new(),
},
);
}
Ok(())
}
pub fn alter_node_type_add_property(
&self,
type_name: &str,
property: TypedProperty,
) -> Result<(), CatalogError> {
let mut types = self.node_types.write();
let def = types
.get_mut(type_name)
.ok_or_else(|| CatalogError::TypeNotFound(type_name.to_string()))?;
if def.properties.iter().any(|p| p.name == property.name) {
return Err(CatalogError::TypeAlreadyExists(format!(
"property {} on {}",
property.name, type_name
)));
}
def.properties.push(property);
Ok(())
}
pub fn alter_node_type_drop_property(
&self,
type_name: &str,
property_name: &str,
) -> Result<(), CatalogError> {
let mut types = self.node_types.write();
let def = types
.get_mut(type_name)
.ok_or_else(|| CatalogError::TypeNotFound(type_name.to_string()))?;
let len_before = def.properties.len();
def.properties.retain(|p| p.name != property_name);
if def.properties.len() == len_before {
return Err(CatalogError::TypeNotFound(format!(
"property {} on {}",
property_name, type_name
)));
}
Ok(())
}
pub fn alter_edge_type_add_property(
&self,
type_name: &str,
property: TypedProperty,
) -> Result<(), CatalogError> {
let mut types = self.edge_types.write();
let def = types
.get_mut(type_name)
.ok_or_else(|| CatalogError::TypeNotFound(type_name.to_string()))?;
if def.properties.iter().any(|p| p.name == property.name) {
return Err(CatalogError::TypeAlreadyExists(format!(
"property {} on {}",
property.name, type_name
)));
}
def.properties.push(property);
Ok(())
}
pub fn alter_edge_type_drop_property(
&self,
type_name: &str,
property_name: &str,
) -> Result<(), CatalogError> {
let mut types = self.edge_types.write();
let def = types
.get_mut(type_name)
.ok_or_else(|| CatalogError::TypeNotFound(type_name.to_string()))?;
let len_before = def.properties.len();
def.properties.retain(|p| p.name != property_name);
if def.properties.len() == len_before {
return Err(CatalogError::TypeNotFound(format!(
"property {} on {}",
property_name, type_name
)));
}
Ok(())
}
pub fn alter_graph_type_add_node_type(
&self,
graph_type_name: &str,
node_type: String,
) -> Result<(), CatalogError> {
let mut types = self.graph_types.write();
let def = types
.get_mut(graph_type_name)
.ok_or_else(|| CatalogError::TypeNotFound(graph_type_name.to_string()))?;
if !def.allowed_node_types.contains(&node_type) {
def.allowed_node_types.push(node_type);
}
Ok(())
}
pub fn alter_graph_type_drop_node_type(
&self,
graph_type_name: &str,
node_type: &str,
) -> Result<(), CatalogError> {
let mut types = self.graph_types.write();
let def = types
.get_mut(graph_type_name)
.ok_or_else(|| CatalogError::TypeNotFound(graph_type_name.to_string()))?;
def.allowed_node_types.retain(|t| t != node_type);
Ok(())
}
pub fn alter_graph_type_add_edge_type(
&self,
graph_type_name: &str,
edge_type: String,
) -> Result<(), CatalogError> {
let mut types = self.graph_types.write();
let def = types
.get_mut(graph_type_name)
.ok_or_else(|| CatalogError::TypeNotFound(graph_type_name.to_string()))?;
if !def.allowed_edge_types.contains(&edge_type) {
def.allowed_edge_types.push(edge_type);
}
Ok(())
}
pub fn alter_graph_type_drop_edge_type(
&self,
graph_type_name: &str,
edge_type: &str,
) -> Result<(), CatalogError> {
let mut types = self.graph_types.write();
let def = types
.get_mut(graph_type_name)
.ok_or_else(|| CatalogError::TypeNotFound(graph_type_name.to_string()))?;
def.allowed_edge_types.retain(|t| t != edge_type);
Ok(())
}
pub fn register_procedure(&self, def: ProcedureDefinition) -> Result<(), CatalogError> {
let mut procs = self.procedures.write();
if procs.contains_key(&def.name) {
return Err(CatalogError::TypeAlreadyExists(def.name.clone()));
}
procs.insert(def.name.clone(), def);
Ok(())
}
pub fn replace_procedure(&self, def: ProcedureDefinition) {
self.procedures.write().insert(def.name.clone(), def);
}
pub fn drop_procedure(&self, name: &str) -> Result<(), CatalogError> {
let mut procs = self.procedures.write();
if procs.remove(name).is_none() {
return Err(CatalogError::TypeNotFound(name.to_string()));
}
Ok(())
}
pub fn get_procedure(&self, name: &str) -> Option<ProcedureDefinition> {
self.procedures.read().get(name).cloned()
}
#[must_use]
pub fn all_procedure_defs(&self) -> Vec<ProcedureDefinition> {
self.procedures.read().values().cloned().collect()
}
#[must_use]
pub fn all_graph_type_bindings(&self) -> Vec<(String, String)> {
self.graph_type_bindings
.read()
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
}
fn add_unique_constraint(
&self,
label: LabelId,
property_key: PropertyKeyId,
) -> Result<(), CatalogError> {
let mut constraints = self.unique_constraints.write();
let key = (label, property_key);
if !constraints.insert(key) {
return Err(CatalogError::ConstraintAlreadyExists);
}
Ok(())
}
fn add_required_property(
&self,
label: LabelId,
property_key: PropertyKeyId,
) -> Result<(), CatalogError> {
let mut required = self.required_properties.write();
let key = (label, property_key);
if !required.insert(key) {
return Err(CatalogError::ConstraintAlreadyExists);
}
Ok(())
}
fn is_property_required(&self, label: LabelId, property_key: PropertyKeyId) -> bool {
self.required_properties
.read()
.contains(&(label, property_key))
}
fn is_property_unique(&self, label: LabelId, property_key: PropertyKeyId) -> bool {
self.unique_constraints
.read()
.contains(&(label, property_key))
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CatalogError {
SchemaNotEnabled,
ConstraintAlreadyExists,
LabelNotFound(String),
PropertyKeyNotFound(String),
EdgeTypeNotFound(String),
IndexNotFound(IndexId),
TypeAlreadyExists(String),
TypeNotFound(String),
SchemaAlreadyExists(String),
SchemaNotFound(String),
}
impl std::fmt::Display for CatalogError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::SchemaNotEnabled => write!(f, "Schema constraints are not enabled"),
Self::ConstraintAlreadyExists => write!(f, "Constraint already exists"),
Self::LabelNotFound(name) => write!(f, "Label not found: {name}"),
Self::PropertyKeyNotFound(name) => write!(f, "Property key not found: {name}"),
Self::EdgeTypeNotFound(name) => write!(f, "Edge type not found: {name}"),
Self::IndexNotFound(id) => write!(f, "Index not found: {id}"),
Self::TypeAlreadyExists(name) => write!(f, "Type already exists: {name}"),
Self::TypeNotFound(name) => write!(f, "Type not found: {name}"),
Self::SchemaAlreadyExists(name) => write!(f, "Schema already exists: {name}"),
Self::SchemaNotFound(name) => write!(f, "Schema not found: {name}"),
}
}
}
impl std::error::Error for CatalogError {}
use grafeo_core::execution::operators::ConstraintValidator;
use grafeo_core::execution::operators::OperatorError;
pub struct CatalogConstraintValidator {
catalog: Arc<Catalog>,
graph_name: Option<String>,
store: Option<Arc<dyn grafeo_core::graph::GraphStore>>,
}
impl CatalogConstraintValidator {
pub fn new(catalog: Arc<Catalog>) -> Self {
Self {
catalog,
graph_name: None,
store: None,
}
}
pub fn with_graph_name(mut self, name: String) -> Self {
self.graph_name = Some(name);
self
}
pub fn with_store(mut self, store: Arc<dyn grafeo_core::graph::GraphStore>) -> Self {
self.store = Some(store);
self
}
}
impl ConstraintValidator for CatalogConstraintValidator {
fn validate_node_property(
&self,
labels: &[String],
key: &str,
value: &Value,
) -> Result<(), OperatorError> {
for label in labels {
if let Some(type_def) = self.catalog.resolved_node_type(label)
&& let Some(typed_prop) = type_def.properties.iter().find(|p| p.name == key)
{
if !typed_prop.nullable && *value == Value::Null {
return Err(OperatorError::ConstraintViolation(format!(
"property '{key}' on :{label} is NOT NULL, cannot set to null"
)));
}
if *value != Value::Null && !typed_prop.data_type.matches(value) {
return Err(OperatorError::ConstraintViolation(format!(
"property '{key}' on :{label} expects {:?}, got {:?}",
typed_prop.data_type, value
)));
}
}
}
Ok(())
}
fn validate_node_complete(
&self,
labels: &[String],
properties: &[(String, Value)],
) -> Result<(), OperatorError> {
let prop_names: std::collections::HashSet<&str> =
properties.iter().map(|(n, _)| n.as_str()).collect();
for label in labels {
if let Some(type_def) = self.catalog.resolved_node_type(label) {
for typed_prop in &type_def.properties {
if !typed_prop.nullable
&& typed_prop.default_value.is_none()
&& !prop_names.contains(typed_prop.name.as_str())
{
return Err(OperatorError::ConstraintViolation(format!(
"missing required property '{}' on :{label}",
typed_prop.name
)));
}
}
for constraint in &type_def.constraints {
match constraint {
TypeConstraint::NotNull(prop_name) => {
if !prop_names.contains(prop_name.as_str()) {
return Err(OperatorError::ConstraintViolation(format!(
"missing required property '{prop_name}' on :{label} (NOT NULL constraint)"
)));
}
}
TypeConstraint::PrimaryKey(key_props) => {
for pk in key_props {
if !prop_names.contains(pk.as_str()) {
return Err(OperatorError::ConstraintViolation(format!(
"missing primary key property '{pk}' on :{label}"
)));
}
}
}
TypeConstraint::Check { name, expression } => {
match check_eval::evaluate_check(expression, properties) {
Ok(true) => {}
Ok(false) => {
let constraint_name = name.as_deref().unwrap_or("unnamed");
return Err(OperatorError::ConstraintViolation(format!(
"CHECK constraint '{constraint_name}' violated on :{label}"
)));
}
Err(err) => {
return Err(OperatorError::ConstraintViolation(format!(
"CHECK constraint evaluation error: {err}"
)));
}
}
}
TypeConstraint::Unique(_) => {}
}
}
}
}
Ok(())
}
fn check_unique_node_property(
&self,
labels: &[String],
key: &str,
value: &Value,
) -> Result<(), OperatorError> {
if *value == Value::Null {
return Ok(());
}
for label in labels {
if let Some(type_def) = self.catalog.resolved_node_type(label) {
for constraint in &type_def.constraints {
let is_unique = match constraint {
TypeConstraint::Unique(props) => props.iter().any(|p| p == key),
TypeConstraint::PrimaryKey(props) => props.iter().any(|p| p == key),
_ => false,
};
if is_unique && let Some(ref store) = self.store {
let existing = store.find_nodes_by_property(key, value);
for node_id in existing {
if let Some(node) = store.get_node(node_id) {
let has_label = node.labels.iter().any(|l| l.as_str() == label);
if has_label {
return Err(OperatorError::ConstraintViolation(format!(
"UNIQUE constraint violation: property '{key}' \
with value {value:?} already exists on :{label}"
)));
}
}
}
}
}
}
}
Ok(())
}
fn validate_edge_property(
&self,
edge_type: &str,
key: &str,
value: &Value,
) -> Result<(), OperatorError> {
if let Some(type_def) = self.catalog.get_edge_type_def(edge_type)
&& let Some(typed_prop) = type_def.properties.iter().find(|p| p.name == key)
{
if !typed_prop.nullable && *value == Value::Null {
return Err(OperatorError::ConstraintViolation(format!(
"property '{key}' on :{edge_type} is NOT NULL, cannot set to null"
)));
}
if *value != Value::Null && !typed_prop.data_type.matches(value) {
return Err(OperatorError::ConstraintViolation(format!(
"property '{key}' on :{edge_type} expects {:?}, got {:?}",
typed_prop.data_type, value
)));
}
}
Ok(())
}
fn validate_edge_complete(
&self,
edge_type: &str,
properties: &[(String, Value)],
) -> Result<(), OperatorError> {
if let Some(type_def) = self.catalog.get_edge_type_def(edge_type) {
let prop_names: std::collections::HashSet<&str> =
properties.iter().map(|(n, _)| n.as_str()).collect();
for typed_prop in &type_def.properties {
if !typed_prop.nullable
&& typed_prop.default_value.is_none()
&& !prop_names.contains(typed_prop.name.as_str())
{
return Err(OperatorError::ConstraintViolation(format!(
"missing required property '{}' on :{edge_type}",
typed_prop.name
)));
}
}
for constraint in &type_def.constraints {
if let TypeConstraint::Check { name, expression } = constraint {
match check_eval::evaluate_check(expression, properties) {
Ok(true) => {}
Ok(false) => {
let constraint_name = name.as_deref().unwrap_or("unnamed");
return Err(OperatorError::ConstraintViolation(format!(
"CHECK constraint '{constraint_name}' violated on :{edge_type}"
)));
}
Err(err) => {
return Err(OperatorError::ConstraintViolation(format!(
"CHECK constraint evaluation error: {err}"
)));
}
}
}
}
}
Ok(())
}
fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
let Some(ref graph_name) = self.graph_name else {
return Ok(());
};
let Some(type_name) = self.catalog.get_graph_type_binding(graph_name) else {
return Ok(());
};
let Some(gt) = self
.catalog
.schema()
.and_then(|s| s.get_graph_type(&type_name))
else {
return Ok(());
};
if !gt.open && !gt.allowed_node_types.is_empty() {
let allowed = labels
.iter()
.any(|l| gt.allowed_node_types.iter().any(|a| a == l));
if !allowed {
return Err(OperatorError::ConstraintViolation(format!(
"node labels {labels:?} are not allowed by graph type '{}'",
gt.name
)));
}
}
Ok(())
}
fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
let Some(ref graph_name) = self.graph_name else {
return Ok(());
};
let Some(type_name) = self.catalog.get_graph_type_binding(graph_name) else {
return Ok(());
};
let Some(gt) = self
.catalog
.schema()
.and_then(|s| s.get_graph_type(&type_name))
else {
return Ok(());
};
if !gt.open && !gt.allowed_edge_types.is_empty() {
let allowed = gt.allowed_edge_types.iter().any(|a| a == edge_type);
if !allowed {
return Err(OperatorError::ConstraintViolation(format!(
"edge type '{edge_type}' is not allowed by graph type '{}'",
gt.name
)));
}
}
Ok(())
}
fn validate_edge_endpoints(
&self,
edge_type: &str,
source_labels: &[String],
target_labels: &[String],
) -> Result<(), OperatorError> {
let Some(type_def) = self.catalog.get_edge_type_def(edge_type) else {
return Ok(());
};
if !type_def.source_node_types.is_empty() {
let source_ok = source_labels
.iter()
.any(|l| type_def.source_node_types.iter().any(|s| s == l));
if !source_ok {
return Err(OperatorError::ConstraintViolation(format!(
"source node labels {source_labels:?} are not allowed for edge type '{edge_type}', \
expected one of {:?}",
type_def.source_node_types
)));
}
}
if !type_def.target_node_types.is_empty() {
let target_ok = target_labels
.iter()
.any(|l| type_def.target_node_types.iter().any(|t| t == l));
if !target_ok {
return Err(OperatorError::ConstraintViolation(format!(
"target node labels {target_labels:?} are not allowed for edge type '{edge_type}', \
expected one of {:?}",
type_def.target_node_types
)));
}
}
Ok(())
}
fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
for label in labels {
if let Some(type_def) = self.catalog.resolved_node_type(label) {
for typed_prop in &type_def.properties {
if let Some(ref default) = typed_prop.default_value {
let already_set = properties.iter().any(|(n, _)| n == &typed_prop.name);
if !already_set {
properties.push((typed_prop.name.clone(), default.clone()));
}
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
#[test]
fn test_catalog_labels() {
let catalog = Catalog::new();
let person_id = catalog.get_or_create_label("Person");
let company_id = catalog.get_or_create_label("Company");
assert_ne!(person_id, company_id);
assert_eq!(catalog.get_or_create_label("Person"), person_id);
assert_eq!(catalog.get_label_id("Person"), Some(person_id));
assert_eq!(catalog.get_label_id("Company"), Some(company_id));
assert_eq!(catalog.get_label_id("Unknown"), None);
assert_eq!(catalog.get_label_name(person_id).as_deref(), Some("Person"));
assert_eq!(
catalog.get_label_name(company_id).as_deref(),
Some("Company")
);
assert_eq!(catalog.label_count(), 2);
}
#[test]
fn test_catalog_property_keys() {
let catalog = Catalog::new();
let name_id = catalog.get_or_create_property_key("name");
let age_id = catalog.get_or_create_property_key("age");
assert_ne!(name_id, age_id);
assert_eq!(catalog.get_or_create_property_key("name"), name_id);
assert_eq!(catalog.get_property_key_id("name"), Some(name_id));
assert_eq!(
catalog.get_property_key_name(name_id).as_deref(),
Some("name")
);
assert_eq!(catalog.property_key_count(), 2);
}
#[test]
fn test_catalog_edge_types() {
let catalog = Catalog::new();
let knows_id = catalog.get_or_create_edge_type("KNOWS");
let works_at_id = catalog.get_or_create_edge_type("WORKS_AT");
assert_ne!(knows_id, works_at_id);
assert_eq!(catalog.get_or_create_edge_type("KNOWS"), knows_id);
assert_eq!(catalog.get_edge_type_id("KNOWS"), Some(knows_id));
assert_eq!(
catalog.get_edge_type_name(knows_id).as_deref(),
Some("KNOWS")
);
assert_eq!(catalog.edge_type_count(), 2);
}
#[test]
fn test_catalog_indexes() {
let catalog = Catalog::new();
let person_id = catalog.get_or_create_label("Person");
let name_id = catalog.get_or_create_property_key("name");
let age_id = catalog.get_or_create_property_key("age");
let idx1 = catalog.create_index("idx_person_name", person_id, name_id, IndexType::Hash);
let idx2 = catalog.create_index("idx_person_age", person_id, age_id, IndexType::BTree);
assert_ne!(idx1, idx2);
assert_eq!(catalog.index_count(), 2);
let label_indexes = catalog.indexes_for_label(person_id);
assert_eq!(label_indexes.len(), 2);
assert!(label_indexes.contains(&idx1));
assert!(label_indexes.contains(&idx2));
let name_indexes = catalog.indexes_for_label_property(person_id, name_id);
assert_eq!(name_indexes.len(), 1);
assert_eq!(name_indexes[0], idx1);
let def = catalog.get_index(idx1).unwrap();
assert_eq!(def.label, person_id);
assert_eq!(def.property_key, name_id);
assert_eq!(def.index_type, IndexType::Hash);
assert!(catalog.drop_index(idx1));
assert_eq!(catalog.index_count(), 1);
assert!(catalog.get_index(idx1).is_none());
assert_eq!(catalog.indexes_for_label(person_id).len(), 1);
}
#[test]
fn test_catalog_schema_constraints() {
let catalog = Catalog::with_schema();
let person_id = catalog.get_or_create_label("Person");
let email_id = catalog.get_or_create_property_key("email");
let name_id = catalog.get_or_create_property_key("name");
assert!(catalog.add_unique_constraint(person_id, email_id).is_ok());
assert!(catalog.add_required_property(person_id, name_id).is_ok());
assert!(catalog.is_property_unique(person_id, email_id));
assert!(!catalog.is_property_unique(person_id, name_id));
assert!(catalog.is_property_required(person_id, name_id));
assert!(!catalog.is_property_required(person_id, email_id));
assert_eq!(
catalog.add_unique_constraint(person_id, email_id),
Err(CatalogError::ConstraintAlreadyExists)
);
}
#[test]
fn test_catalog_schema_always_enabled() {
let catalog = Catalog::new();
assert!(catalog.has_schema());
let person_id = catalog.get_or_create_label("Person");
let email_id = catalog.get_or_create_property_key("email");
assert_eq!(catalog.add_unique_constraint(person_id, email_id), Ok(()));
}
#[test]
fn test_catalog_default() {
let catalog = Catalog::default();
assert!(catalog.has_schema());
assert_eq!(catalog.label_count(), 0);
assert_eq!(catalog.property_key_count(), 0);
assert_eq!(catalog.edge_type_count(), 0);
assert_eq!(catalog.index_count(), 0);
}
#[test]
fn test_catalog_all_labels() {
let catalog = Catalog::new();
catalog.get_or_create_label("Person");
catalog.get_or_create_label("Company");
catalog.get_or_create_label("Product");
let all = catalog.all_labels();
assert_eq!(all.len(), 3);
assert!(all.iter().any(|l| l.as_ref() == "Person"));
assert!(all.iter().any(|l| l.as_ref() == "Company"));
assert!(all.iter().any(|l| l.as_ref() == "Product"));
}
#[test]
fn test_catalog_all_property_keys() {
let catalog = Catalog::new();
catalog.get_or_create_property_key("name");
catalog.get_or_create_property_key("age");
catalog.get_or_create_property_key("email");
let all = catalog.all_property_keys();
assert_eq!(all.len(), 3);
assert!(all.iter().any(|k| k.as_ref() == "name"));
assert!(all.iter().any(|k| k.as_ref() == "age"));
assert!(all.iter().any(|k| k.as_ref() == "email"));
}
#[test]
fn test_catalog_all_edge_types() {
let catalog = Catalog::new();
catalog.get_or_create_edge_type("KNOWS");
catalog.get_or_create_edge_type("WORKS_AT");
catalog.get_or_create_edge_type("LIVES_IN");
let all = catalog.all_edge_types();
assert_eq!(all.len(), 3);
assert!(all.iter().any(|t| t.as_ref() == "KNOWS"));
assert!(all.iter().any(|t| t.as_ref() == "WORKS_AT"));
assert!(all.iter().any(|t| t.as_ref() == "LIVES_IN"));
}
#[test]
fn test_catalog_invalid_id_lookup() {
let catalog = Catalog::new();
let _ = catalog.get_or_create_label("Person");
let invalid_label = LabelId::new(999);
let invalid_property = PropertyKeyId::new(999);
let invalid_edge_type = EdgeTypeId::new(999);
let invalid_index = IndexId::new(999);
assert!(catalog.get_label_name(invalid_label).is_none());
assert!(catalog.get_property_key_name(invalid_property).is_none());
assert!(catalog.get_edge_type_name(invalid_edge_type).is_none());
assert!(catalog.get_index(invalid_index).is_none());
}
#[test]
fn test_catalog_drop_nonexistent_index() {
let catalog = Catalog::new();
let invalid_index = IndexId::new(999);
assert!(!catalog.drop_index(invalid_index));
}
#[test]
fn test_catalog_indexes_for_nonexistent_label() {
let catalog = Catalog::new();
let invalid_label = LabelId::new(999);
let invalid_property = PropertyKeyId::new(999);
assert!(catalog.indexes_for_label(invalid_label).is_empty());
assert!(
catalog
.indexes_for_label_property(invalid_label, invalid_property)
.is_empty()
);
}
#[test]
fn test_catalog_multiple_indexes_same_property() {
let catalog = Catalog::new();
let person_id = catalog.get_or_create_label("Person");
let name_id = catalog.get_or_create_property_key("name");
let hash_idx = catalog.create_index("idx_hash", person_id, name_id, IndexType::Hash);
let btree_idx = catalog.create_index("idx_btree", person_id, name_id, IndexType::BTree);
let fulltext_idx =
catalog.create_index("idx_fulltext", person_id, name_id, IndexType::FullText);
assert_eq!(catalog.index_count(), 3);
let indexes = catalog.indexes_for_label_property(person_id, name_id);
assert_eq!(indexes.len(), 3);
assert!(indexes.contains(&hash_idx));
assert!(indexes.contains(&btree_idx));
assert!(indexes.contains(&fulltext_idx));
assert_eq!(
catalog.get_index(hash_idx).unwrap().index_type,
IndexType::Hash
);
assert_eq!(
catalog.get_index(btree_idx).unwrap().index_type,
IndexType::BTree
);
assert_eq!(
catalog.get_index(fulltext_idx).unwrap().index_type,
IndexType::FullText
);
}
#[test]
fn test_catalog_schema_required_property_duplicate() {
let catalog = Catalog::with_schema();
let person_id = catalog.get_or_create_label("Person");
let name_id = catalog.get_or_create_property_key("name");
assert!(catalog.add_required_property(person_id, name_id).is_ok());
assert_eq!(
catalog.add_required_property(person_id, name_id),
Err(CatalogError::ConstraintAlreadyExists)
);
}
#[test]
fn test_catalog_schema_check_without_constraints() {
let catalog = Catalog::new();
let person_id = catalog.get_or_create_label("Person");
let name_id = catalog.get_or_create_property_key("name");
assert!(!catalog.is_property_unique(person_id, name_id));
assert!(!catalog.is_property_required(person_id, name_id));
}
#[test]
fn test_catalog_has_schema() {
let catalog = Catalog::new();
assert!(catalog.has_schema());
let with_schema = Catalog::with_schema();
assert!(with_schema.has_schema());
}
#[test]
fn test_catalog_error_display() {
assert_eq!(
CatalogError::SchemaNotEnabled.to_string(),
"Schema constraints are not enabled"
);
assert_eq!(
CatalogError::ConstraintAlreadyExists.to_string(),
"Constraint already exists"
);
assert_eq!(
CatalogError::LabelNotFound("Person".to_string()).to_string(),
"Label not found: Person"
);
assert_eq!(
CatalogError::PropertyKeyNotFound("name".to_string()).to_string(),
"Property key not found: name"
);
assert_eq!(
CatalogError::EdgeTypeNotFound("KNOWS".to_string()).to_string(),
"Edge type not found: KNOWS"
);
let idx = IndexId::new(42);
assert!(CatalogError::IndexNotFound(idx).to_string().contains("42"));
}
#[test]
fn test_catalog_concurrent_label_creation() {
use std::sync::Arc;
let catalog = Arc::new(Catalog::new());
let mut handles = vec![];
for i in 0..10 {
let catalog = Arc::clone(&catalog);
handles.push(thread::spawn(move || {
let label_name = format!("Label{}", i % 3); catalog.get_or_create_label(&label_name)
}));
}
let mut ids: Vec<LabelId> = handles.into_iter().map(|h| h.join().unwrap()).collect();
ids.sort_by_key(|id| id.as_u32());
ids.dedup();
assert_eq!(ids.len(), 3);
assert_eq!(catalog.label_count(), 3);
}
#[test]
fn test_catalog_concurrent_property_key_creation() {
use std::sync::Arc;
let catalog = Arc::new(Catalog::new());
let mut handles = vec![];
for i in 0..10 {
let catalog = Arc::clone(&catalog);
handles.push(thread::spawn(move || {
let key_name = format!("key{}", i % 4);
catalog.get_or_create_property_key(&key_name)
}));
}
let mut ids: Vec<PropertyKeyId> = handles.into_iter().map(|h| h.join().unwrap()).collect();
ids.sort_by_key(|id| id.as_u32());
ids.dedup();
assert_eq!(ids.len(), 4);
assert_eq!(catalog.property_key_count(), 4);
}
#[test]
fn test_catalog_concurrent_index_operations() {
use std::sync::Arc;
let catalog = Arc::new(Catalog::new());
let label = catalog.get_or_create_label("Node");
let mut handles = vec![];
for i in 0..5 {
let catalog = Arc::clone(&catalog);
handles.push(thread::spawn(move || {
let prop = PropertyKeyId::new(i);
catalog.create_index(&format!("idx_{i}"), label, prop, IndexType::Hash)
}));
}
let ids: Vec<IndexId> = handles.into_iter().map(|h| h.join().unwrap()).collect();
assert_eq!(ids.len(), 5);
assert_eq!(catalog.index_count(), 5);
}
#[test]
fn test_catalog_special_characters_in_names() {
let catalog = Catalog::new();
let label1 = catalog.get_or_create_label("Label With Spaces");
let label2 = catalog.get_or_create_label("Label-With-Dashes");
let label3 = catalog.get_or_create_label("Label_With_Underscores");
let label4 = catalog.get_or_create_label("LabelWithUnicode\u{00E9}");
assert_ne!(label1, label2);
assert_ne!(label2, label3);
assert_ne!(label3, label4);
assert_eq!(
catalog.get_label_name(label1).as_deref(),
Some("Label With Spaces")
);
assert_eq!(
catalog.get_label_name(label4).as_deref(),
Some("LabelWithUnicode\u{00E9}")
);
}
#[test]
fn test_catalog_empty_names() {
let catalog = Catalog::new();
let empty_label = catalog.get_or_create_label("");
let empty_prop = catalog.get_or_create_property_key("");
let empty_edge = catalog.get_or_create_edge_type("");
assert_eq!(catalog.get_label_name(empty_label).as_deref(), Some(""));
assert_eq!(
catalog.get_property_key_name(empty_prop).as_deref(),
Some("")
);
assert_eq!(catalog.get_edge_type_name(empty_edge).as_deref(), Some(""));
assert_eq!(catalog.get_or_create_label(""), empty_label);
}
#[test]
fn test_catalog_large_number_of_entries() {
let catalog = Catalog::new();
for i in 0..1000 {
catalog.get_or_create_label(&format!("Label{}", i));
}
assert_eq!(catalog.label_count(), 1000);
let all = catalog.all_labels();
assert_eq!(all.len(), 1000);
let id = catalog.get_label_id("Label500").unwrap();
assert_eq!(catalog.get_label_name(id).as_deref(), Some("Label500"));
}
#[test]
fn test_index_definition_debug() {
let def = IndexDefinition {
id: IndexId::new(1),
name: "test_index".to_string(),
label: LabelId::new(2),
property_key: PropertyKeyId::new(3),
index_type: IndexType::Hash,
};
let debug_str = format!("{:?}", def);
assert!(debug_str.contains("IndexDefinition"));
assert!(debug_str.contains("Hash"));
}
#[test]
fn test_index_type_equality() {
assert_eq!(IndexType::Hash, IndexType::Hash);
assert_ne!(IndexType::Hash, IndexType::BTree);
assert_ne!(IndexType::BTree, IndexType::FullText);
let t = IndexType::Hash;
let t2 = t;
assert_eq!(t, t2);
}
#[test]
fn test_catalog_error_equality() {
assert_eq!(
CatalogError::SchemaNotEnabled,
CatalogError::SchemaNotEnabled
);
assert_eq!(
CatalogError::ConstraintAlreadyExists,
CatalogError::ConstraintAlreadyExists
);
assert_eq!(
CatalogError::LabelNotFound("X".to_string()),
CatalogError::LabelNotFound("X".to_string())
);
assert_ne!(
CatalogError::LabelNotFound("X".to_string()),
CatalogError::LabelNotFound("Y".to_string())
);
}
}