use crate::shape::Shape::{WShape, WShapeComposite, WShapeLiteral, WShapeRef};
use crate::shape::{Shape, ShapeIterator, Validate};
use polars::prelude::*;
use pregel_rs::graph_frame::GraphFrame;
use pregel_rs::pregel::{Column, MessageReceiver, PregelBuilder};
/// Schema validator that wraps the root [`Shape`] from which validation starts.
pub struct PSchema {
/// Root of the shape tree; `validate` clones it and iterates over the clones.
start: Shape,
}
impl PSchema {
pub fn new(start: Shape) -> PSchema {
Self { start }
}
pub fn validate(&self, graph: GraphFrame) -> PolarsResult<DataFrame> {
if graph.edges.schema().get_field("src").is_none() {
return Err(PolarsError::SchemaFieldNotFound("src".into()));
} else if graph.edges.column("src").unwrap().len() == 0 {
return Err(PolarsError::NoData("src".into()));
}
if graph.edges.schema().get_field("dst").is_none() {
return Err(PolarsError::SchemaFieldNotFound("dst".into()));
} else if graph.edges.column("dst").unwrap().len() == 0 {
return Err(PolarsError::NoData("dst".into()));
}
if graph.edges.schema().get_field("property_id").is_none() {
return Err(PolarsError::SchemaFieldNotFound("property_id".into()));
} else if graph.edges.column("property_id").unwrap().len() == 0 {
return Err(PolarsError::NoData("property_id".into()));
}
if graph.edges.schema().get_field("dtype").is_none() {
return Err(PolarsError::SchemaFieldNotFound("dtype".into()));
} else if graph.edges.column("dtype").unwrap().len() == 0 {
return Err(PolarsError::NoData("dtype".into()));
}
let max_iterations = self.start.clone().iter().count() as u8; let tree_send_messages = self.start.clone(); let mut send_messages_iter = tree_send_messages.iter(); let tree_v_prog = self.start.clone(); let mut v_prog_iter = tree_v_prog.iter(); v_prog_iter.next(); let pregel = PregelBuilder::new(graph)
.max_iterations(if max_iterations > 1 {
max_iterations - 1
} else {
1
})
.with_vertex_column(Column::Custom("labels"))
.initial_message(Self::initial_message())
.send_messages_function(MessageReceiver::Src, || {
Self::send_messages(send_messages_iter.by_ref())
})
.aggregate_messages_function(Self::aggregate_messages)
.v_prog_function(|| Self::v_prog(v_prog_iter.by_ref()))
.build();
match pregel.run() {
Ok(result) => result
.lazy()
.select(&[
col(Column::Id.as_ref()),
col(Column::Custom("labels").as_ref()),
])
.filter(col("labels").is_not_null())
.with_common_subplan_elimination(false)
.with_streaming(true)
.collect(),
Err(error) => Err(error),
}
}
fn initial_message() -> Expr {
lit(NULL)
}
fn send_messages(iterator: &mut ShapeIterator) -> Expr {
let mut ans = lit(NULL);
if let Some(nodes) = iterator.next() {
for node in nodes {
ans = match node {
WShape(shape) => shape.validate(ans),
WShapeRef(shape) => shape.validate(ans),
WShapeLiteral(shape) => shape.validate(ans),
WShapeComposite(_) => ans,
}
}
}
ans
}
fn aggregate_messages() -> Expr {
Column::msg(None).filter(Column::msg(None).is_not_null())
}
fn v_prog(iterator: &mut ShapeIterator) -> Expr {
let mut ans = Column::msg(None);
if let Some(nodes) = iterator.next() {
for node in nodes {
if let WShapeComposite(shape) = node {
ans = shape.validate(ans);
}
}
}
ans
}
}
#[cfg(test)]
mod tests {
use crate::pschema::tests::TestEntity::*;
use crate::pschema::PSchema;
use crate::shape::{Shape, WShapeLiteral};
use crate::shape::{WShape, WShapeComposite};
use polars::df;
use polars::prelude::*;
use pregel_rs::graph_frame::GraphFrame;
use pregel_rs::pregel::Column;
use wikidata_rs::dtype::DataType;
use wikidata_rs::id::Id;
/// Fixture entities and properties used by the tests in this module.
///
/// The identifier noted on each variant is the string that `TestEntity::id`
/// passes to `Id::from` for that variant.
enum TestEntity {
/// `Q5`
Human,
/// `Q80`
TimBernersLee,
/// `Q92743`
VintCerf,
/// `P31`
InstanceOf,
/// `Q42944`
CERN,
/// `Q3320352`
Award,
/// `Q29`
Spain,
/// `P17`
Country,
/// `P108`
Employer,
/// `P19`
BirthPlace,
/// `P569`
BirthDate,
/// `Q84`
London,
/// `P166`
AwardReceived,
/// `Q145`
UnitedKingdom,
}
impl TestEntity {
    /// Returns the numeric identifier of this fixture entity.
    ///
    /// The variant is first mapped to its identifier string, which is then
    /// converted to an `Id` and finally to `u32`.
    fn id(&self) -> u32 {
        let code = match self {
            Human => "Q5",
            TimBernersLee => "Q80",
            VintCerf => "Q92743",
            InstanceOf => "P31",
            CERN => "Q42944",
            Award => "Q3320352",
            Spain => "Q29",
            Country => "P17",
            Employer => "P108",
            BirthPlace => "P19",
            BirthDate => "P569",
            London => "Q84",
            AwardReceived => "P166",
            UnitedKingdom => "Q145",
        };
        u32::from(Id::from(code))
    }
}
fn paper_graph() -> Result<GraphFrame, String> {
let edges = match df![
Column::Src.as_ref() => [
TimBernersLee,
TimBernersLee,
London,
TimBernersLee,
TimBernersLee,
Award,
VintCerf,
CERN,
TimBernersLee,
]
.iter()
.map(TestEntity::id)
.collect::<Vec<_>>(),
Column::Custom("property_id").as_ref() => [
InstanceOf,
BirthPlace,
Country,
Employer,
AwardReceived,
Country,
InstanceOf,
AwardReceived,
BirthDate,
]
.iter()
.map(TestEntity::id)
.collect::<Vec<_>>(),
Column::Dst.as_ref() => [
Human,
London,
UnitedKingdom,
CERN,
Award,
Spain,
Human,
Award,
TimBernersLee,
]
.iter()
.map(TestEntity::id)
.collect::<Vec<_>>(),
Column::Custom("dtype").as_ref() => [
DataType::Entity,
DataType::Entity,
DataType::Entity,
DataType::Entity,
DataType::Entity,
DataType::Entity,
DataType::Entity,
DataType::Entity,
DataType::DateTime,
]
.iter()
.map(u8::from)
.collect::<Vec<_>>(),
] {
Ok(edges) => edges,
Err(_) => return Err(String::from("Error creating the edges DataFrame")),
};
match GraphFrame::from_edges(edges) {
Ok(graph) => Ok(graph),
Err(_) => Err(String::from("Error creating the GraphFrame from edges")),
}
}
/// Schema made of a single `WShape` labelled `1`, built from the `InstanceOf`
/// and `Human` identifiers.
fn simple_schema() -> Shape {
    let instance_of_human = WShape::new(1, InstanceOf.id(), Human.id());
    Shape::WShape(instance_of_human)
}
/// Composite schema labelled `1` that groups three constraints: two `WShape`s
/// (labels `2` and `3`) and one `WShapeLiteral` (label `4`).
fn paper_schema() -> Shape {
    let constraints = vec![
        WShape::new(2, InstanceOf.id(), Human.id()).into(),
        WShape::new(3, BirthPlace.id(), London.id()).into(),
        WShapeLiteral::new(4, BirthDate.id(), DataType::DateTime).into(),
    ];
    let composite = WShapeComposite::new(1, constraints);
    composite.into()
}
/// Checks the per-row label counts of `actual` against `expected`.
///
/// `actual` is sorted by `id` and reduced to the length of each `labels` list.
/// The resulting frame is then compared with `expected` using `==`.
///
/// # Errors
///
/// Returns the polars error message if the lazy query cannot be collected, or
/// a fixed message when the two frames differ.
fn test(expected: DataFrame, actual: DataFrame) -> Result<(), String> {
    let count = actual
        .lazy()
        .sort("id", Default::default())
        .select([col("labels").arr().lengths()])
        .collect()
        // Report a failed query as a test error instead of panicking.
        .map_err(|error| error.to_string())?;

    if count == expected {
        Ok(())
    } else {
        Err(String::from("The DataFrames are not equals"))
    }
}
/// Validates the fixture graph against the single-shape schema and expects two
/// result rows with one label each.
#[test]
fn simple_test() -> Result<(), String> {
    let graph = paper_graph()?;
    let expected = DataFrame::new(vec![Series::new("labels", [1u32, 1u32])])
        .map_err(|_| String::from("Error creating the expected DataFrame"))?;
    let actual = PSchema::new(simple_schema())
        .validate(graph)
        .map_err(|error| error.to_string())?;
    test(expected, actual)
}
/// Validates the fixture graph against the composite schema and expects two
/// result rows carrying four labels and one label respectively.
#[test]
fn paper_test() -> Result<(), String> {
    let graph = paper_graph()?;
    let expected = DataFrame::new(vec![Series::new("labels", [4u32, 1u32])])
        .map_err(|_| String::from("Error creating the expected DataFrame"))?;
    let actual = PSchema::new(paper_schema())
        .validate(graph)
        .map_err(|error| error.to_string())?;
    test(expected, actual)
}
/// Validation must fail for a graph whose edge frame only has the source and
/// destination columns, i.e. lacks `property_id` and `dtype`.
#[test]
fn invalid_graph() -> Result<(), String> {
    let edges = df![
        Column::Src.as_ref() => [
            TimBernersLee,
            TimBernersLee,
            London,
            TimBernersLee,
            TimBernersLee,
            Award,
            VintCerf,
            CERN,
            TimBernersLee,
        ]
        .iter()
        .map(TestEntity::id)
        .collect::<Vec<_>>(),
        Column::Dst.as_ref() => [
            Human,
            London,
            UnitedKingdom,
            CERN,
            Award,
            Spain,
            Human,
            Award,
            TimBernersLee,
        ]
        .iter()
        .map(TestEntity::id)
        .collect::<Vec<_>>(),
    ]
    .map_err(|_| String::from("Error creating the edges DataFrame"))?;

    let graph = GraphFrame::from_edges(edges)
        .map_err(|_| String::from("Error creating the GraphFrame from edges"))?;

    // Success here is the failure case: the schema check should reject the graph.
    match PSchema::new(simple_schema()).validate(graph) {
        Ok(_) => Err(String::from("An error should have occurred")),
        Err(_) => Ok(()),
    }
}
/// Validation must fail for a graph whose vertex and edge columns are all
/// built from default (empty) series.
#[test]
fn empty_graph() -> Result<(), String> {
    let vertices = df![
        Column::Id.as_ref() => Series::default(),
    ]
    .map_err(|_| String::from("Error creating the vertices DataFrame"))?;

    let edges = df![
        Column::Src.as_ref() => Series::default(),
        Column::Custom("property_id").as_ref() => Series::default(),
        Column::Dst.as_ref() => Series::default(),
        Column::Custom("dtype").as_ref() => Series::default(),
    ]
    .map_err(|_| String::from("Error creating the edges DataFrame"))?;

    let graph = GraphFrame::new(vertices, edges)
        .map_err(|_| String::from("Error creating the GraphFrame from edges"))?;

    // Success here is the failure case: an empty graph should be rejected.
    match PSchema::new(simple_schema()).validate(graph) {
        Ok(_) => Err(String::from("An error should have occurred")),
        Err(_) => Ok(()),
    }
}
}