use std::time::Instant;
use falkordb::{AsyncGraph, FalkorDBError, FalkorValue};
use futures::stream::{self, StreamExt};
use serde::{Deserialize, Serialize};
#[cfg(feature = "server")]
use utoipa::ToSchema;
use crate::schema::{
attribute::{Attribute, AttributeType},
entity::Entity,
relation::Relation,
};
/// Discovered shape of a graph: its entities (node labels) and relations
/// (relationship types together with the entity labels they connect).
///
/// Built by `Schema::discover_from_graph`; serializable with serde, and exposed as an
/// OpenAPI schema (`ToSchema`) when the `server` feature is enabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "server", derive(ToSchema))]
pub struct Schema {
/// Entities in the schema. `discover_from_graph` adds one entry per node label whose
/// attribute sampling succeeded; `add_entity` appends further entries without de-duplication.
pub entities: Vec<Entity>,
/// Relations in the schema. Discovery adds one entry per (relationship type, source label,
/// target label) combination for which at least one matching edge was found.
pub relations: Vec<Relation>,
}
impl std::fmt::Display for Schema {
    /// Writes a one-line summary of the schema's size,
    /// e.g. `Schema with 3 entities and 2 relations`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let entity_count = self.entities.len();
        let relation_count = self.relations.len();
        f.write_fmt(format_args!(
            "Schema with {entity_count} entities and {relation_count} relations"
        ))
    }
}
impl Schema {
const fn empty() -> Self {
Self {
entities: Vec::new(),
relations: Vec::new(),
}
}
pub fn add_entity(
&mut self,
entity: Entity,
) {
self.entities.push(entity);
}
pub fn add_relation(
&mut self,
relation: Relation,
) {
self.relations.push(relation);
}
async fn collect_entity_attributes(
graph: &mut AsyncGraph,
label: &str,
sample_size: usize,
) -> Result<Vec<Attribute>, FalkorDBError> {
let query = format!(
r"
MATCH (a:{label})
CALL {{
WITH a
RETURN [k IN keys(a) | [k, typeof(a[k])]] AS types
}}
WITH types
LIMIT {sample_size}
UNWIND types AS kt
RETURN kt, count(1)
ORDER BY kt[0]
"
);
let mut attributes = Self::collect_attributes(graph, label, &query).await?;
Self::collect_example_values(graph, label, &mut attributes, sample_size).await?;
Ok(attributes)
}
async fn collect_relationship_attributes(
graph: &mut AsyncGraph,
label: &str,
sample_size: usize,
) -> Result<Vec<Attribute>, FalkorDBError> {
let query = format!(
r"
MATCH ()-[a:{label}]->()
CALL {{
WITH a
RETURN [k IN keys(a) | [k, typeof(a[k])]] AS types
}}
WITH types
LIMIT {sample_size}
UNWIND types AS kt
RETURN kt, count(1)
ORDER BY kt[0]
"
);
Self::collect_attributes(graph, label, &query).await
}
async fn collect_attributes(
graph: &mut AsyncGraph,
label: &str,
query: &str,
) -> Result<Vec<Attribute>, FalkorDBError> {
tracing::info!("Collecting attributes for label '{}': {}", label, query);
let entity_attributes = graph.ro_query(query).execute().await?;
let mut attributes = Vec::new();
for record in entity_attributes.data {
if let (Some(FalkorValue::Array(kt_array)), Some(FalkorValue::I64(count))) = (record.first(), record.get(1))
{
if kt_array.len() >= 2
&& let (Some(FalkorValue::String(key_name)), Some(FalkorValue::String(type_name))) =
(kt_array.first(), kt_array.get(1))
{
tracing::info!("Found attribute: key={}, type={}, count={}", key_name, type_name, count);
let attr_type = type_name.parse::<AttributeType>().unwrap_or_else(|_| {
tracing::warn!("Unknown attribute type '{}', defaulting to String", type_name);
AttributeType::String
});
attributes.push(Attribute::new(key_name.clone(), attr_type, *count, false, false));
}
}
}
Ok(attributes)
}
#[allow(clippy::cognitive_complexity)]
async fn collect_example_values(
graph: &mut AsyncGraph,
label: &str,
attributes: &mut [Attribute],
sample_size: usize,
) -> Result<(), FalkorDBError> {
if !Self::is_valid_identifier(label) {
tracing::warn!("Skipping example collection for invalid label: {}", label);
return Ok(());
}
let max_examples = 3.min(sample_size);
for attribute in attributes {
if !Self::is_valid_property_name(&attribute.name) {
tracing::warn!(
"Skipping example collection for attribute '{}' - potentially unsafe characters",
attribute.name
);
continue;
}
let escaped_name = Self::escape_property_name(&attribute.name);
let escaped_label = Self::escape_property_name(label);
let query = format!(
r"MATCH (n:{escaped_label})
WHERE n.{escaped_name} IS NOT NULL
RETURN DISTINCT toString(n.{escaped_name}) AS value
LIMIT {max_examples}"
);
match graph.ro_query(&query).execute().await {
Ok(result) => {
let mut examples = Vec::new();
for record in result.data {
if let Some(FalkorValue::String(value)) = record.first() {
examples.push(value.clone());
}
}
if !examples.is_empty() {
attribute.examples = Some(examples);
tracing::debug!(
"Collected {} examples for {}.{}: {:?}",
attribute.examples.as_ref().map_or(0, std::vec::Vec::len),
label,
attribute.name,
attribute.examples
);
}
}
Err(e) => {
tracing::warn!("Failed to collect examples for {}.{}: {}", label, attribute.name, e);
}
}
}
Ok(())
}
fn is_valid_identifier(name: &str) -> bool {
if name.is_empty() {
return false;
}
let mut chars = name.chars();
if let Some(first) = chars.next()
&& !first.is_alphabetic()
&& first != '_'
{
return false;
}
chars.all(|c| c.is_alphanumeric() || c == '_')
}
fn is_valid_property_name(name: &str) -> bool {
if name.is_empty() {
return false;
}
let allowed = name.chars().all(|c| c.is_alphanumeric() || c == '_' || c == '.');
let no_sql_comments = !name.contains("--") && !name.contains("/*") && !name.contains("*/");
let no_cypher_comment = !name.contains("//");
let no_semicolon = !name.contains(';');
let no_backtick = !name.contains('`');
let no_union = !name.to_ascii_lowercase().contains("union");
allowed && no_sql_comments && no_cypher_comment && no_semicolon && no_backtick && no_union
}
fn escape_property_name(name: &str) -> String {
let escaped = name.replace('`', "``");
format!("`{escaped}`")
}
async fn get_entity_labels(graph: &mut AsyncGraph) -> Result<Vec<String>, FalkorDBError> {
let labels_result = graph.ro_query("CALL db.labels()").execute().await?;
let mut entity_labels = Vec::new();
for record in labels_result.data {
if let Some(FalkorValue::String(label)) = record.first() {
entity_labels.push(label.clone());
}
}
Ok(entity_labels)
}
async fn get_relationship_labels(graph: &mut AsyncGraph) -> Result<Vec<String>, FalkorDBError> {
let relations_result = graph.ro_query("CALL db.relationshipTypes()").execute().await?;
let mut relationship_labels = Vec::new();
for record in relations_result.data {
if let Some(FalkorValue::String(relation_label)) = record.first() {
relationship_labels.push(relation_label.clone());
}
}
Ok(relationship_labels)
}
async fn get_relationship_attributes(
graph: &AsyncGraph,
relationship_labels: &[String],
sample_size: usize,
) -> Result<Vec<(String, Vec<Attribute>)>, FalkorDBError> {
let relationship_attributes = Self::collect_attributes_parallel(
graph,
relationship_labels.to_vec(),
sample_size,
|mut graph, relationship_label, sample_size| async move {
Self::collect_relationship_attributes(&mut graph, &relationship_label, sample_size)
.await
.map(|attributes| (relationship_label, attributes))
.ok()
},
)
.await;
Ok(relationship_attributes)
}
async fn collect_attributes_parallel<T, F, Fut>(
graph: &AsyncGraph,
labels: Vec<String>,
sample_size: usize,
collector: F,
) -> Vec<T>
where
F: Fn(AsyncGraph, String, usize) -> Fut + Send + Sync + Clone + 'static,
Fut: std::future::Future<Output = Option<T>> + Send + 'static,
T: Send + 'static,
{
stream::iter(labels)
.map(move |label| {
let graph = graph.clone();
let collector = collector.clone();
async move { collector(graph, label, sample_size).await }
})
.buffer_unordered(usize::MAX)
.filter_map(|result| async move { result })
.collect()
.await
}
pub async fn discover_from_graph(
graph: &mut AsyncGraph,
sample_size: usize,
) -> Result<Self, FalkorDBError> {
let mut schema: Self = Self::empty();
let entity_labels = Self::get_entity_labels(graph).await?;
let entities = Self::collect_attributes_parallel(
graph,
entity_labels,
sample_size,
|mut graph, label, sample_size| async move {
Self::collect_entity_attributes(&mut graph, &label, sample_size)
.await
.map(|attributes| Entity::new(label, attributes, None))
.ok()
},
)
.await;
for entity in entities {
schema.add_entity(entity);
}
let relationship_labels = Self::get_relationship_labels(graph).await?;
let relationship_attributes =
Self::get_relationship_attributes(graph, &relationship_labels, sample_size).await?;
let entities = schema.entities.clone();
let start = Instant::now();
let queries = process_relationships(graph, &mut schema, relationship_attributes, entities).await?;
let duration = start.elapsed();
tracing::info!("Processed relationships ({} queries) in {:?}", queries, duration);
Ok(schema)
}
}
async fn process_relationships(
graph: &AsyncGraph,
schema: &mut Schema,
relationship_attributes: Vec<(String, Vec<Attribute>)>,
entities: Vec<Entity>,
) -> Result<usize, FalkorDBError> {
let mut queries = Vec::new();
for (label, attributes) in &relationship_attributes {
for source_entity in &entities {
for target_entity in &entities {
queries.push((
label.clone(),
source_entity.label.clone(),
target_entity.label.clone(),
attributes.clone(),
));
}
}
}
let ret = queries.len();
let relations: Vec<Relation> = stream::iter(queries)
.map(|(label, source_label, target_label, attributes)| {
let mut graph = graph.clone();
async move {
let query = format!("MATCH (s:{source_label})-[a:{label}]->(t:{target_label}) return a limit 1");
match graph.ro_query(&query).execute().await {
Ok(query_result) if !query_result.data.is_empty() => {
Some(Relation::new(label, source_label, target_label, attributes))
}
Ok(_) => None,
Err(e) => {
tracing::warn!("Query failed but ignored: {:?}", e);
None
}
}
}
})
.buffer_unordered(1000)
.filter_map(|result| async move { result })
.collect()
.await;
for relation in relations {
schema.add_relation(relation);
}
Ok(ret)
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_valid_identifier() {
        let accepted = ["Person", "_Person", "Person123", "_person_123", "PERSON"];
        let rejected = [
            "",
            "123Person",
            "Person-Name",
            "Person Name",
            "Person;DROP",
            "Person'",
            "Person\"",
        ];
        for name in accepted {
            assert!(Schema::is_valid_identifier(name), "expected {name:?} to be accepted");
        }
        for name in rejected {
            assert!(!Schema::is_valid_identifier(name), "expected {name:?} to be rejected");
        }
    }

    #[test]
    fn test_valid_property_name() {
        let accepted = ["name", "firstName", "first_name", "name123", "person.name", "_name"];
        let rejected = [
            "",
            "name;DROP",
            "name--comment",
            "name/*comment*/",
            "name'",
            "name\"",
            "name;",
        ];
        for name in accepted {
            assert!(Schema::is_valid_property_name(name), "expected {name:?} to be accepted");
        }
        for name in rejected {
            assert!(!Schema::is_valid_property_name(name), "expected {name:?} to be rejected");
        }
    }

    #[test]
    fn test_escape_property_name() {
        let cases = [
            ("name", "`name`"),
            ("firstName", "`firstName`"),
            ("first_name", "`first_name`"),
            ("na`me", "`na``me`"),
            ("`test`", "```test```"),
            ("", "``"),
        ];
        for (input, expected) in cases {
            assert_eq!(Schema::escape_property_name(input), expected, "escaping {input:?}");
        }
    }
}