use anyhow::{Result, anyhow};
use datafusion::arrow::datatypes::DataType;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Validation hooks for pipeline components and their field definitions.
///
/// No implementor is visible in this part of the file, so the notes below
/// restate what the signatures show; intended semantics are marked as
/// assumptions to confirm against the implementing type.
pub trait SchemaValidation {
    /// Checks a whole [`PipelineComponent`].
    ///
    /// # Errors
    ///
    /// Returns `Err` when the implementor rejects `component`.
    fn validate_component(&self, component: &PipelineComponent) -> Result<()>;

    /// Checks one field's Arrow [`DataType`] together with its nullability flag.
    ///
    /// `field_name` is presumably used to label errors — TODO confirm.
    ///
    /// # Errors
    ///
    /// Returns `Err` when the implementor rejects the field.
    fn validate_field_type(
        &self,
        field_name: &str,
        field_type: &DataType,
        nullable: bool,
    ) -> Result<()>;

    /// Checks a field's nullability flag against its optional default value.
    ///
    /// NOTE(review): presumably rejects combinations such as a non-nullable
    /// field with a `FieldValue::Null` default — confirm with the implementor.
    ///
    /// # Errors
    ///
    /// Returns `Err` when the implementor rejects the combination.
    fn validate_nullable_constraints(
        &self,
        field_name: &str,
        nullable: bool,
        default_value: Option<&FieldValue>,
    ) -> Result<()>;

    /// Builds a [`ValidationReport`] from a slice of collected errors.
    fn generate_error_report(&self, errors: &[ValidationError]) -> ValidationReport;
}
/// Conversion of a value to and from a YAML string.
pub trait YamlSerializable {
    /// Serializes `self` into a YAML string.
    ///
    /// # Errors
    ///
    /// Returns `Err` when the value cannot be serialized.
    fn to_yaml(&self) -> Result<String>;

    /// Parses a value of the implementing type from `yaml_str`.
    ///
    /// The `Self: Sized` bound is needed because the value is returned by
    /// value inside `Result<Self>`.
    ///
    /// # Errors
    ///
    /// Returns `Err` when `yaml_str` cannot be parsed into `Self`.
    fn from_yaml(yaml_str: &str) -> Result<Self>
    where
        Self: Sized;
}
/// YAML support for [`PipelineComponent`], delegating to `serde_yaml`.
impl YamlSerializable for PipelineComponent {
    /// Serializes this component with `serde_yaml::to_string`.
    ///
    /// # Errors
    ///
    /// Any `serde_yaml` failure is re-wrapped in an `anyhow` error whose
    /// message begins with `Failed to serialize component to YAML: `.
    fn to_yaml(&self) -> Result<String> {
        let rendered = serde_yaml::to_string(self);
        rendered.map_err(|err| anyhow!("Failed to serialize component to YAML: {}", err))
    }

    /// Parses a component from `yaml_str` with `serde_yaml::from_str`.
    ///
    /// # Errors
    ///
    /// Any `serde_yaml` failure is re-wrapped in an `anyhow` error whose
    /// message begins with `Failed to deserialize YAML: `.
    fn from_yaml(yaml_str: &str) -> Result<Self>
    where
        Self: Sized,
    {
        // The mapped result is already the `Result<Self>` the caller wants,
        // so it is returned directly.
        serde_yaml::from_str::<Self>(yaml_str)
            .map_err(|err| anyhow!("Failed to deserialize YAML: {}", err))
    }
}
/// One serializable pipeline building block.
///
/// No serde attributes are applied, so serde's default enum representation is
/// used; the tests in this file write the variant as a YAML tag such as
/// `!Metadata` or `!Query`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PipelineComponent {
    /// Descriptive information carried in a [`ComponentMetadata`].
    Metadata(ComponentMetadata),
    /// A SQL query carried in a [`QueryDefinition`].
    Query(QueryDefinition),
}
/// Descriptive metadata for a pipeline component.
///
/// Every field is plain text; nothing in this type validates its contents.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentMetadata {
    /// Component name.
    pub name: String,
    /// Version string (the tests in this file use `1.0.0`; no format is enforced here).
    pub version: String,
    /// Optional free-text description.
    pub description: Option<String>,
    /// Optional creation timestamp kept as text.
    ///
    /// The tests in this file use values like `2025-01-15T10:00:00.000+00:00`;
    /// no format is enforced here.
    pub created_at: Option<String>,
    /// Optional last-update timestamp kept as text, same caveats as `created_at`.
    pub updated_at: Option<String>,
}
/// An Arrow type attached to a field, with nullability and provenance.
///
/// NOTE(review): the name suggests the type was inferred rather than declared;
/// the inference itself is not visible in this part of the file.
#[derive(Debug, Clone)]
pub struct InferredFieldType {
    /// Arrow data type of the field.
    pub field_type: DataType,
    /// Whether the field may hold null values.
    pub nullable: bool,
    /// Free-form text naming where the field came from (format not defined here).
    pub source_location: String,
}
/// Field types of a request, held in a map with `String` keys.
///
/// Structurally identical to `ResponseSchema`; the two are kept as distinct types.
#[derive(Debug, Clone)]
pub struct RequestSchema {
    /// Map to each field's [`InferredFieldType`]; keys are presumably field
    /// names — verify against the code that populates it.
    pub fields: HashMap<String, InferredFieldType>,
}
/// A SQL query together with a list of parameter names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryDefinition {
    /// SQL text. The tests in this file embed placeholders written as
    /// `{user_id}`; substitution is not performed by this type.
    pub sql: String,
    /// Parameter names, in declaration order.
    ///
    /// NOTE(review): the tests list a name (`limit`) that does not appear in
    /// the SQL text, so no one-to-one match with placeholders is enforced here.
    pub parameters: Vec<String>,
}
/// Field types of a response, held in a map with `String` keys.
///
/// Structurally identical to `RequestSchema`; the two are kept as distinct types.
#[derive(Debug, Clone)]
pub struct ResponseSchema {
    /// Map to each field's [`InferredFieldType`]; keys are presumably field
    /// names — verify against the code that populates it.
    pub fields: HashMap<String, InferredFieldType>,
}
/// Serializable description of a single field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldDefinition {
    /// Type name kept as text; how it maps to a concrete type is not shown here.
    pub field_type: String,
    /// Whether the field may hold null values.
    pub nullable: bool,
    /// Optional default value for the field.
    pub default_value: Option<FieldValue>,
    /// Optional free-text description.
    pub description: Option<String>,
}
/// A literal value that can be attached to a field, e.g. as a default.
///
/// No serde attributes are applied, so serde's default enum representation is used.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FieldValue {
    /// Text value.
    String(String),
    /// Signed 64-bit integer value.
    Integer(i64),
    /// 64-bit floating-point value.
    Float(f64),
    /// Boolean value.
    Boolean(bool),
    /// Explicit null, carrying no payload.
    Null,
}
/// One problem found during validation.
#[derive(Debug, Clone)]
pub struct ValidationError {
    /// Text locating the offending field (path syntax is not defined here).
    pub field_path: String,
    /// Category of the problem.
    pub error_type: ValidationErrorType,
    /// Description of the problem.
    pub message: String,
    /// Optional hint on how to resolve the problem.
    pub suggestion: Option<String>,
}
/// Category attached to a validation error.
///
/// NOTE(review): nothing in the visible part of this file constructs these
/// variants, so the per-variant notes are read off the names — confirm
/// against the validator that emits them.
#[derive(Debug, Clone)]
pub enum ValidationErrorType {
    /// Presumably: a value or field has an unexpected type.
    TypeMismatch,
    /// Presumably: a null/non-null constraint was broken.
    NullabilityViolation,
    /// Presumably: a value is not acceptable for its field.
    InvalidValue,
    /// Presumably: a required item is absent.
    MissingRequired,
    /// Presumably: the input is not in the expected format.
    InvalidFormat,
}
/// Aggregated outcome of a validation run.
#[derive(Debug, Clone)]
pub struct ValidationReport {
    /// Errors that were collected.
    pub errors: Vec<ValidationError>,
    /// Warning messages as plain text.
    pub warnings: Vec<String>,
    /// Summary text for the report (how it is composed is not shown here).
    pub summary: String,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the metadata fixture shared by the serialization tests.
    fn sample_metadata() -> ComponentMetadata {
        ComponentMetadata {
            name: "test_pipeline".to_string(),
            version: "1.0.0".to_string(),
            description: Some("Test pipeline description".to_string()),
            created_at: Some("2025-01-15T10:00:00.000+00:00".to_string()),
            updated_at: Some("2025-01-15T10:00:00.000+00:00".to_string()),
        }
    }

    /// Asserts that `yaml` contains every fragment, printing the full document on failure.
    fn assert_yaml_contains(yaml: &str, fragments: &[&str]) {
        for fragment in fragments {
            assert!(
                yaml.contains(fragment),
                "expected `{}` in YAML output:\n{}",
                fragment,
                yaml
            );
        }
    }

    /// A struct literal keeps exactly the values it was given.
    #[test]
    fn test_inferred_field_type_creation() {
        let field = InferredFieldType {
            field_type: DataType::Int64,
            nullable: false,
            source_location: "test_location".to_string(),
        };

        assert_eq!(field.field_type, DataType::Int64);
        assert!(!field.nullable);
        assert_eq!(field.source_location, "test_location");
    }

    /// Serializing a metadata component emits every field as `key: value`.
    #[test]
    fn test_to_yaml_metadata_component() {
        let component = PipelineComponent::Metadata(sample_metadata());

        // `expect` surfaces the underlying error instead of a bare `is_ok()` failure.
        let yaml_output = component
            .to_yaml()
            .expect("metadata component should serialize to YAML");

        assert_yaml_contains(
            &yaml_output,
            &[
                "name: test_pipeline",
                "version: 1.0.0",
                "description: Test pipeline description",
                "created_at: 2025-01-15T10:00:00.000+00:00",
                "updated_at: 2025-01-15T10:00:00.000+00:00",
            ],
        );
    }

    /// Serializing a query component emits the SQL text and every parameter name.
    #[test]
    fn test_to_yaml_query_component() {
        let query = QueryDefinition {
            sql: "SELECT * FROM table WHERE id = {user_id}".to_string(),
            parameters: vec!["user_id".to_string(), "limit".to_string()],
        };
        let component = PipelineComponent::Query(query);

        let yaml_output = component
            .to_yaml()
            .expect("query component should serialize to YAML");

        assert_yaml_contains(
            &yaml_output,
            &[
                "sql: SELECT * FROM table WHERE id = {user_id}",
                "user_id",
                "limit",
            ],
        );
    }

    /// A `!Metadata`-tagged document deserializes into the `Metadata` variant.
    #[test]
    fn test_from_yaml_metadata_component() {
        let yaml_input = r#"
!Metadata
name: test_pipeline
version: 1.0.0
description: Test pipeline description
created_at: 2025-01-15T10:00:00.000+00:00
updated_at: 2025-01-15T10:00:00.000+00:00
"#;

        let component = PipelineComponent::from_yaml(yaml_input)
            .expect("tagged metadata YAML should deserialize");

        match component {
            PipelineComponent::Metadata(metadata) => {
                assert_eq!(metadata.name, "test_pipeline");
                assert_eq!(metadata.version, "1.0.0");
                assert_eq!(
                    metadata.description.as_deref(),
                    Some("Test pipeline description")
                );
                assert_eq!(
                    metadata.created_at.as_deref(),
                    Some("2025-01-15T10:00:00.000+00:00")
                );
                assert_eq!(
                    metadata.updated_at.as_deref(),
                    Some("2025-01-15T10:00:00.000+00:00")
                );
            }
            // Report the variant that actually came back to ease debugging.
            other => panic!("Expected Metadata component, got {:?}", other),
        }
    }

    /// A `!Query`-tagged document deserializes into the `Query` variant.
    #[test]
    fn test_from_yaml_query_component() {
        let yaml_input = r#"
!Query
sql: SELECT * FROM table WHERE id = {user_id}
parameters:
- user_id
- limit
"#;

        let component = PipelineComponent::from_yaml(yaml_input)
            .expect("tagged query YAML should deserialize");

        match component {
            PipelineComponent::Query(query) => {
                assert_eq!(query.sql, "SELECT * FROM table WHERE id = {user_id}");
                assert_eq!(query.parameters, vec!["user_id", "limit"]);
            }
            other => panic!("Expected Query component, got {:?}", other),
        }
    }

    /// Input that cannot be deserialized yields the wrapped error message.
    ///
    /// The document uses a `!Request` tag, which is not a `PipelineComponent`
    /// variant, and ends with a dangling scalar; the test only pins the
    /// wrapper prefix, not which of the two problems is reported.
    #[test]
    fn test_from_yaml_malformed_yaml_error() {
        let yaml_input = r#"
!Request
fields:
name:
field_type: string
nullable: false
# Missing closing structure - malformed YAML
invalid_syntax
"#;

        let error = PipelineComponent::from_yaml(yaml_input)
            .expect_err("malformed YAML should be rejected");

        let error_msg = error.to_string();
        assert!(
            error_msg.contains("Failed to deserialize YAML"),
            "unexpected error message: {}",
            error_msg
        );
    }

    /// Metadata survives serialize -> deserialize -> serialize unchanged.
    #[test]
    fn test_round_trip_metadata_component() {
        let original = sample_metadata();
        let component = PipelineComponent::Metadata(original.clone());

        let yaml1 = component
            .to_yaml()
            .expect("first serialization should succeed");
        let parsed_component = PipelineComponent::from_yaml(&yaml1)
            .expect("serialized YAML should deserialize");
        let yaml2 = parsed_component
            .to_yaml()
            .expect("second serialization should succeed");
        assert_eq!(yaml1, yaml2);

        match parsed_component {
            PipelineComponent::Metadata(metadata) => {
                assert_eq!(metadata.name, original.name);
                assert_eq!(metadata.version, original.version);
                assert_eq!(metadata.description, original.description);
                assert_eq!(metadata.created_at, original.created_at);
                assert_eq!(metadata.updated_at, original.updated_at);
            }
            other => panic!("Expected Metadata component, got {:?}", other),
        }
    }

    /// A query survives serialize -> deserialize -> serialize unchanged.
    #[test]
    fn test_round_trip_query_component() {
        let original_query = QueryDefinition {
            sql: "SELECT * FROM table WHERE id = {user_id} AND status = {status}".to_string(),
            parameters: vec![
                "user_id".to_string(),
                "status".to_string(),
                "limit".to_string(),
            ],
        };
        let component = PipelineComponent::Query(original_query.clone());

        let yaml1 = component
            .to_yaml()
            .expect("first serialization should succeed");
        let parsed_component = PipelineComponent::from_yaml(&yaml1)
            .expect("serialized YAML should deserialize");
        let yaml2 = parsed_component
            .to_yaml()
            .expect("second serialization should succeed");
        assert_eq!(yaml1, yaml2);

        match parsed_component {
            PipelineComponent::Query(query) => {
                assert_eq!(query.sql, original_query.sql);
                assert_eq!(query.parameters, original_query.parameters);
            }
            other => panic!("Expected Query component, got {:?}", other),
        }
    }
}