use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
use serde_json::{Map, Value, json};
use tracing::debug;
/// Spec-standard GraphQL introspection query used by `infer_from_graphql`.
///
/// The `TypeRef` fragment unwraps up to six nested `ofType` wrappers, which
/// covers typical NON_NULL / LIST nesting depths; deeper nesting is truncated.
const INTROSPECTION_QUERY: &str = r"
query IntrospectionQuery {
__schema {
queryType { name }
mutationType { name }
subscriptionType { name }
types { ...FullType }
directives {
name description locations
args { ...InputValue }
}
}
}
fragment FullType on __Type {
kind name description
fields(includeDeprecated: true) {
name description
args { ...InputValue }
type { ...TypeRef }
isDeprecated deprecationReason
}
inputFields { ...InputValue }
interfaces { ...TypeRef }
enumValues(includeDeprecated: true) {
name description isDeprecated deprecationReason
}
possibleTypes { ...TypeRef }
}
fragment InputValue on __InputValue {
name description
type { ...TypeRef }
defaultValue
}
fragment TypeRef on __Type {
kind name
ofType {
kind name
ofType {
kind name
ofType {
kind name
ofType {
kind name
ofType {
kind name
ofType { kind name }
}
}
}
}
}
}
";
use crate::domain::error::{ProviderError, Result, ServiceError, StygianError};
use crate::ports::{AIProvider, GraphQlAuth, ScrapingService, ServiceInput};
/// A JSON Schema produced by one of the discovery strategies, together with
/// its provenance and a confidence estimate.
#[derive(Debug, Clone)]
pub struct DiscoveredSchema {
    /// The inferred JSON Schema document.
    pub schema: Value,
    /// How this schema was obtained (example, AI, cache, user correction).
    pub source: SchemaSource,
    /// Confidence in the schema; expected to lie in [0.0, 1.0].
    pub confidence: f32,
    /// URL pattern (or GraphQL endpoint) this schema applies to; doubles as
    /// the cache key when present.
    pub url_pattern: Option<String>,
}
/// Provenance of a [`DiscoveredSchema`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SchemaSource {
    /// Inferred structurally from a concrete example value.
    ExampleInference,
    /// Inferred by an AI provider (also used for GraphQL introspection).
    AiInference,
    /// Served from the in-memory schema cache.
    Cached,
    /// Supplied or overridden by a user correction.
    UserCorrection,
}
/// Tunables for [`SchemaDiscoveryService`].
#[derive(Debug, Clone)]
pub struct SchemaDiscoveryConfig {
    /// Maximum amount of HTML forwarded to the AI provider.
    /// NOTE(review): despite the name, this is compared against `html.len()`,
    /// i.e. it is a byte budget, not a character count — confirm intent.
    pub max_content_chars: usize,
    /// When true, discovered schemas are cached per URL pattern / endpoint.
    pub cache_schemas: bool,
    /// Minimum AI confidence considered acceptable (currently unused by the
    /// visible code paths — TODO confirm with callers).
    pub min_ai_confidence: f32,
}
impl Default for SchemaDiscoveryConfig {
    /// Defaults: 16 000-byte HTML budget, caching enabled, 0.6 minimum
    /// AI confidence.
    fn default() -> Self {
        Self {
            max_content_chars: 16_000,
            cache_schemas: true,
            min_ai_confidence: 0.6,
        }
    }
}
/// Result of comparing two JSON Schemas for structural drift.
#[derive(Debug, Clone)]
pub struct SchemaDiff {
    /// Fields present in the new schema but absent from the old one.
    pub added_fields: Vec<String>,
    /// Fields present in the old schema but missing from the new one.
    pub removed_fields: Vec<String>,
    /// `(field, old_type, new_type)` triples for fields whose `type` changed.
    pub type_changes: Vec<(String, String, String)>,
    /// True when nothing was removed and no types changed.
    pub is_compatible: bool,
}
/// Discovers JSON Schemas for scraped data from concrete examples, raw HTML
/// (via an AI provider), or GraphQL introspection, with per-pattern caching
/// and user-supplied field corrections.
pub struct SchemaDiscoveryService {
    config: SchemaDiscoveryConfig,
    // Optional: example-based inference works without an AI provider.
    ai_provider: Option<Arc<dyn AIProvider>>,
    // URL pattern (or GraphQL endpoint) -> previously discovered schema.
    cache: Arc<RwLock<HashMap<String, DiscoveredSchema>>>,
    // Field name -> replacement schema fragment, applied after inference.
    corrections: Arc<RwLock<HashMap<String, Value>>>,
    // Needed only for GraphQL introspection; set via `with_graphql_service`.
    graphql_service: Option<Arc<dyn ScrapingService>>,
}
impl SchemaDiscoveryService {
/// Creates a service with empty cache and correction tables and no
/// GraphQL service attached.
pub fn new(config: SchemaDiscoveryConfig, ai_provider: Option<Arc<dyn AIProvider>>) -> Self {
    let cache = Arc::new(RwLock::new(HashMap::new()));
    let corrections = Arc::new(RwLock::new(HashMap::new()));
    Self {
        config,
        ai_provider,
        cache,
        corrections,
        graphql_service: None,
    }
}
#[must_use]
pub fn with_graphql_service(mut self, service: Arc<dyn ScrapingService>) -> Self {
self.graphql_service = Some(service);
self
}
/// Infers a JSON Schema from a single example value.
///
/// Every observed object key is listed under `required`; array item
/// schemas are taken from the first element only.
pub fn infer_from_example(&self, example: &Value) -> Value {
    Self::infer_schema_for_value(example)
}
fn infer_schema_for_value(value: &Value) -> Value {
match value {
Value::Object(map) => {
let mut properties = Map::new();
let mut required = Vec::new();
for (key, val) in map {
properties.insert(key.clone(), Self::infer_schema_for_value(val));
required.push(json!(key));
}
json!({
"type": "object",
"properties": Value::Object(properties),
"required": required
})
}
Value::Array(items) => {
let item_schema = items
.first()
.map_or_else(|| json!({}), Self::infer_schema_for_value);
json!({
"type": "array",
"items": item_schema
})
}
Value::String(_) => json!({"type": "string"}),
Value::Number(n) => {
if n.is_i64() || n.is_u64() {
json!({"type": "integer"})
} else {
json!({"type": "number"})
}
}
Value::Bool(_) => json!({"type": "boolean"}),
Value::Null => json!({"type": "null"}),
}
}
/// Infers a JSON Schema for the structured data embedded in an HTML page
/// using the configured AI provider.
///
/// Cached schemas (keyed by `url_pattern`) are returned immediately with
/// their source re-labelled [`SchemaSource::Cached`]. User corrections are
/// applied to the inferred schema before it is cached and returned.
///
/// # Errors
/// Fails when no AI provider is configured or the provider call fails.
pub async fn infer_from_html(
    &self,
    html: &str,
    url_pattern: Option<&str>,
) -> Result<DiscoveredSchema> {
    if self.config.cache_schemas
        && let Some(pattern) = url_pattern
    {
        let cache = self.cache.read();
        if let Some(cached) = cache.get(pattern) {
            debug!(pattern, "Schema cache hit");
            return Ok(DiscoveredSchema {
                source: SchemaSource::Cached,
                ..cached.clone()
            });
        }
    }
    let provider = self.ai_provider.as_ref().ok_or_else(|| {
        StygianError::Provider(ProviderError::ApiError(
            "No AI provider configured for HTML schema discovery".to_string(),
        ))
    })?;
    // Truncate oversized pages. `max_content_chars` is a byte budget
    // (compared against `len()`); back the cut point up to a char
    // boundary so the slice cannot panic inside a multi-byte UTF-8
    // sequence (the previous `&html[..max]` would).
    let truncated = if html.len() > self.config.max_content_chars {
        let mut end = self.config.max_content_chars;
        while !html.is_char_boundary(end) {
            end -= 1;
        }
        &html[..end]
    } else {
        html
    };
    // Response contract handed to the provider: the schema itself plus a
    // self-reported confidence and optional description.
    let discovery_schema = json!({
        "type": "object",
        "properties": {
            "schema": {
                "type": "object",
                "description": "A valid JSON Schema object describing the structured data in the HTML"
            },
            "confidence": {
                "type": "number",
                "minimum": 0.0,
                "maximum": 1.0,
                "description": "Confidence in the inferred schema [0.0, 1.0]"
            },
            "description": {
                "type": "string",
                "description": "Human-readable explanation of what data was detected"
            }
        },
        "required": ["schema", "confidence"]
    });
    let prompt = format!(
        "Analyze the following HTML page and infer a JSON Schema for the main structured \
         data it contains (e.g. product listings, articles, search results). \
         Return the schema definition.\n\nHTML:\n{truncated}"
    );
    let result = provider.extract(prompt, discovery_schema).await?;
    // Fall back to a permissive object schema if the provider omitted one.
    let schema = result
        .get("schema")
        .cloned()
        .unwrap_or_else(|| json!({"type": "object"}));
    #[allow(clippy::cast_possible_truncation)]
    let confidence = result
        .get("confidence")
        .and_then(Value::as_f64)
        .unwrap_or(0.7) as f32;
    // User-supplied field corrections take precedence over inference.
    let schema = self.apply_corrections(schema);
    let discovered = DiscoveredSchema {
        schema,
        source: SchemaSource::AiInference,
        confidence,
        url_pattern: url_pattern.map(str::to_string),
    };
    if self.config.cache_schemas
        && let Some(pattern) = url_pattern
    {
        self.cache
            .write()
            .insert(pattern.to_string(), discovered.clone());
    }
    Ok(discovered)
}
/// Overwrites properties of `schema` with any registered user corrections.
///
/// Only fields that already exist under `properties` are replaced; a
/// correction for an absent field is ignored.
fn apply_corrections(&self, mut schema: Value) -> Value {
    let overrides = self.corrections.read();
    if overrides.is_empty() {
        return schema;
    }
    if let Some(props) = schema.get_mut("properties").and_then(Value::as_object_mut) {
        for (field, replacement) in overrides.iter() {
            if let Some(slot) = props.get_mut(field) {
                *slot = replacement.clone();
            }
        }
    }
    schema
}
/// Registers a user-supplied schema fragment for `field_name`; later
/// inferred schemas containing that field have it replaced by
/// `apply_corrections`.
pub fn add_correction(&self, field_name: String, schema_fragment: Value) {
    self.corrections.write().insert(field_name, schema_fragment);
}
/// Stores a schema in the cache under `url_pattern`, overwriting any
/// previous entry for that pattern.
pub fn cache_schema(
    &self,
    url_pattern: String,
    schema: Value,
    source: SchemaSource,
    confidence: f32,
) {
    let entry = DiscoveredSchema {
        schema,
        source,
        confidence,
        url_pattern: Some(url_pattern.clone()),
    };
    let mut cache = self.cache.write();
    cache.insert(url_pattern, entry);
}
pub fn detect_evolution(&self, old_schema: &Value, new_schema: &Value) -> SchemaDiff {
let old_props = old_schema
.get("properties")
.and_then(Value::as_object)
.cloned()
.unwrap_or_default();
let new_props = new_schema
.get("properties")
.and_then(Value::as_object)
.cloned()
.unwrap_or_default();
let added_fields: Vec<String> = new_props
.keys()
.filter(|k| !old_props.contains_key(*k))
.cloned()
.collect();
let removed_fields: Vec<String> = old_props
.keys()
.filter(|k| !new_props.contains_key(*k))
.cloned()
.collect();
let type_changes: Vec<(String, String, String)> = old_props
.iter()
.filter_map(|(k, old_v)| {
let new_v = new_props.get(k)?;
let old_t = old_v.get("type").and_then(Value::as_str).unwrap_or("any");
let new_t = new_v.get("type").and_then(Value::as_str).unwrap_or("any");
(old_t != new_t).then(|| (k.clone(), old_t.to_string(), new_t.to_string()))
})
.collect();
let is_compatible = removed_fields.is_empty() && type_changes.is_empty();
SchemaDiff {
added_fields,
removed_fields,
type_changes,
is_compatible,
}
}
/// Discovers a JSON Schema for a GraphQL endpoint via introspection.
///
/// Successful results are cached per endpoint (when `cache_schemas` is on)
/// and returned with source [`SchemaSource::Cached`] on subsequent calls.
///
/// # Errors
/// Fails when no GraphQL service is configured, the service call fails, or
/// the response is not valid JSON / lacks a `__schema` key.
#[allow(clippy::indexing_slicing)]
pub async fn infer_from_graphql(
    &self,
    endpoint: &str,
    auth: Option<GraphQlAuth>,
) -> Result<DiscoveredSchema> {
    if self.config.cache_schemas {
        let cache = self.cache.read();
        if let Some(cached) = cache.get(endpoint) {
            debug!(endpoint, "GraphQL introspection cache hit");
            return Ok(DiscoveredSchema {
                source: SchemaSource::Cached,
                ..cached.clone()
            });
        }
    }
    let service = self.graphql_service.as_ref().ok_or_else(|| {
        StygianError::Service(ServiceError::Unavailable(
            "No GraphQL service configured for introspection".to_string(),
        ))
    })?;
    let mut params = json!({
        "query": INTROSPECTION_QUERY,
    });
    // Serialize auth details into the service params; presumably the
    // service turns these into request headers — confirm against the
    // ScrapingService implementation.
    if let Some(a) = auth {
        params["auth"] = json!({
            "kind": match a.kind {
                crate::ports::GraphQlAuthKind::Bearer => "bearer",
                crate::ports::GraphQlAuthKind::ApiKey => "api_key",
                crate::ports::GraphQlAuthKind::Header => "header",
                crate::ports::GraphQlAuthKind::None => "none",
            },
            "token": a.token,
        });
        if let Some(header) = a.header_name {
            params["auth"]["header_name"] = json!(header);
        }
    }
    let input = ServiceInput {
        url: endpoint.to_string(),
        params,
    };
    let output = service.execute(input).await?;
    let response_json: Value = serde_json::from_str(&output.data)
        .map_err(|e| StygianError::Service(ServiceError::InvalidResponse(e.to_string())))?;
    // NOTE(review): expects `__schema` at the top level of the payload;
    // raw GraphQL responses usually nest it under `data` — confirm the
    // service unwraps that envelope before returning.
    let introspection = response_json.get("__schema").ok_or_else(|| {
        StygianError::Service(ServiceError::InvalidResponse(
            "introspection response missing __schema key".to_string(),
        ))
    })?;
    let query_type_name = introspection
        .get("queryType")
        .and_then(|qt| qt.get("name"))
        .and_then(Value::as_str)
        .unwrap_or("Query")
        .to_string();
    let types = introspection
        .get("types")
        .and_then(Value::as_array)
        .cloned()
        .unwrap_or_default();
    // Convert each named type to a JSON Schema definition, skipping
    // GraphQL built-ins (names starting with `_`).
    let mut definitions = Map::new();
    for gql_type in &types {
        let name = gql_type.get("name").and_then(Value::as_str).unwrap_or("");
        if name.is_empty() || name.starts_with('_') {
            continue;
        }
        let def = Self::convert_type_to_definition(gql_type);
        definitions.insert(name.to_string(), def);
    }
    let schema = json!({
        "type": "object",
        "description": format!("GraphQL schema for {endpoint}"),
        "definitions": Value::Object(definitions),
        "queryType": query_type_name,
    });
    let discovered = DiscoveredSchema {
        schema,
        // Introspection is exact, hence confidence 1.0; source reuses
        // AiInference (there is no dedicated introspection variant).
        source: SchemaSource::AiInference,
        confidence: 1.0,
        url_pattern: Some(endpoint.to_string()),
    };
    if self.config.cache_schemas {
        self.cache
            .write()
            .insert(endpoint.to_string(), discovered.clone());
    }
    Ok(discovered)
}
/// Converts one introspected GraphQL type into a JSON Schema definition.
///
/// Objects/interfaces/input objects become `object` schemas (NON_NULL
/// fields go into `required`), enums become string enums, scalars map via
/// [`Self::scalar_to_json_schema`], and anything else becomes `{}`.
#[allow(clippy::indexing_slicing)]
fn convert_type_to_definition(gql_type: &Value) -> Value {
    let kind = gql_type.get("kind").and_then(Value::as_str).unwrap_or("");
    match kind {
        "OBJECT" | "INTERFACE" | "INPUT_OBJECT" => {
            // Borrow the field list instead of deep-cloning the whole
            // array (the previous `.cloned().unwrap_or_default()` copied
            // every field on every call). OBJECT/INTERFACE use `fields`;
            // INPUT_OBJECT uses `inputFields`.
            let fields = gql_type
                .get("fields")
                .or_else(|| gql_type.get("inputFields"))
                .and_then(Value::as_array);
            let mut properties = Map::new();
            let mut required: Vec<Value> = Vec::new();
            for field in fields.into_iter().flatten() {
                let field_name = field.get("name").and_then(Value::as_str).unwrap_or("");
                if field_name.is_empty() {
                    continue;
                }
                // A missing `type` falls back to Null, matching the
                // infallible `field["type"]` indexing semantics the
                // previous code relied on.
                let type_ref = field.get("type").unwrap_or(&Value::Null);
                if type_ref.get("kind").and_then(Value::as_str) == Some("NON_NULL") {
                    required.push(json!(field_name));
                }
                properties.insert(field_name.to_string(), Self::convert_type_ref(type_ref));
            }
            let mut obj = json!({
                "type": "object",
                "properties": Value::Object(properties),
            });
            if !required.is_empty() {
                obj["required"] = json!(required);
            }
            obj
        }
        "ENUM" => {
            let values: Vec<Value> = gql_type
                .get("enumValues")
                .and_then(Value::as_array)
                .map(|vals| {
                    vals.iter()
                        .filter_map(|v| v.get("name").and_then(Value::as_str))
                        .map(|s| json!(s))
                        .collect()
                })
                .unwrap_or_default();
            json!({ "type": "string", "enum": values })
        }
        "SCALAR" => {
            let name = gql_type.get("name").and_then(Value::as_str).unwrap_or("");
            Self::scalar_to_json_schema(name)
        }
        _ => json!({}),
    }
}
/// Converts a (possibly wrapped) GraphQL type reference to a JSON Schema
/// fragment: NON_NULL is unwrapped, LIST becomes an array, named types
/// become `$ref`s into `#/definitions`, scalars map directly.
fn convert_type_ref(type_ref: &Value) -> Value {
    let null = Value::Null;
    let kind = type_ref.get("kind").and_then(Value::as_str).unwrap_or("");
    let type_name = || type_ref.get("name").and_then(Value::as_str).unwrap_or("");
    match kind {
        // NON_NULL carries no schema of its own; recurse into the inner
        // type (a missing `ofType` degrades to Null and thus `{}`).
        "NON_NULL" => Self::convert_type_ref(type_ref.get("ofType").unwrap_or(&null)),
        "LIST" => json!({
            "type": "array",
            "items": Self::convert_type_ref(type_ref.get("ofType").unwrap_or(&null)),
        }),
        "OBJECT" | "INTERFACE" | "INPUT_OBJECT" | "ENUM" => {
            json!({ "$ref": format!("#/definitions/{}", type_name()) })
        }
        "SCALAR" => Self::scalar_to_json_schema(type_name()),
        _ => json!({}),
    }
}
/// Maps a GraphQL scalar name to a JSON Schema type. Unknown scalars whose
/// lowercased name mentions "date" are treated as date-time strings;
/// anything else is a permissive `{}`.
fn scalar_to_json_schema(name: &str) -> Value {
    match name {
        "String" | "ID" => json!({ "type": "string" }),
        "Int" => json!({ "type": "integer" }),
        "Float" => json!({ "type": "number" }),
        "Boolean" => json!({ "type": "boolean" }),
        other => {
            // "datetime" contains "date", so one substring check covers
            // both custom Date and DateTime scalars.
            let lowered = other.to_ascii_lowercase();
            if lowered.contains("date") {
                json!({ "type": "string", "format": "date-time" })
            } else {
                json!({})
            }
        }
    }
}
/// Minimal sanity check: a schema must be a JSON object carrying at least
/// a `type` or a `properties` key.
pub fn is_valid_schema(schema: &Value) -> bool {
    schema.is_object() && (schema.get("type").is_some() || schema.get("properties").is_some())
}
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::indexing_slicing,
clippy::approx_constant,
clippy::significant_drop_tightening,
clippy::float_cmp,
clippy::unnecessary_map_or
)]
mod tests {
use super::*;
use serde_json::json;
/// Builds a service with default config and no AI provider.
fn svc() -> SchemaDiscoveryService {
    SchemaDiscoveryService::new(SchemaDiscoveryConfig::default(), None)
}
// --- Example-based inference -----------------------------------------------
#[test]
fn test_infer_object_schema() {
    let s = svc();
    let schema = s.infer_from_example(&json!({"name": "Alice", "age": 30, "active": true}));
    assert_eq!(schema["type"].as_str().unwrap(), "object");
    assert_eq!(
        schema["properties"]["name"]["type"].as_str().unwrap(),
        "string"
    );
    assert_eq!(
        schema["properties"]["age"]["type"].as_str().unwrap(),
        "integer"
    );
    assert_eq!(
        schema["properties"]["active"]["type"].as_str().unwrap(),
        "boolean"
    );
}
#[test]
fn test_infer_array_schema() {
    let s = svc();
    let schema = s.infer_from_example(&json!([{"id": 1}, {"id": 2}]));
    assert_eq!(schema["type"].as_str().unwrap(), "array");
    // Item schema is inferred from the first element only.
    assert_eq!(schema["items"]["type"].as_str().unwrap(), "object");
}
#[test]
fn test_infer_string_schema() {
    let s = svc();
    let schema = s.infer_from_example(&json!("hello"));
    assert_eq!(schema["type"].as_str().unwrap(), "string");
}
#[test]
fn test_infer_number_float() {
    let s = svc();
    let schema = s.infer_from_example(&json!(3.14));
    assert_eq!(schema["type"].as_str().unwrap(), "number");
}
#[test]
fn test_infer_null() {
    let s = svc();
    let schema = s.infer_from_example(&Value::Null);
    assert_eq!(schema["type"].as_str().unwrap(), "null");
}
// --- Schema validity checks -------------------------------------------------
#[test]
fn test_valid_schema_with_type() {
    assert!(SchemaDiscoveryService::is_valid_schema(
        &json!({"type": "object"})
    ));
}
#[test]
fn test_valid_schema_with_properties() {
    assert!(SchemaDiscoveryService::is_valid_schema(
        &json!({"properties": {"x": {"type": "string"}}})
    ));
}
#[test]
fn test_invalid_schema_string() {
    assert!(!SchemaDiscoveryService::is_valid_schema(&json!(
        "not schema"
    )));
}
#[test]
fn test_invalid_schema_empty_object() {
    assert!(!SchemaDiscoveryService::is_valid_schema(&json!({})));
}
// --- Schema evolution detection ---------------------------------------------
#[test]
fn test_evolution_added_field() {
    let s = svc();
    let old = json!({"type": "object", "properties": {"name": {"type": "string"}}});
    let new = json!({"type": "object", "properties": {"name": {"type": "string"}, "email": {"type": "string"}}});
    let diff = s.detect_evolution(&old, &new);
    assert!(diff.added_fields.contains(&"email".to_string()));
    assert!(diff.removed_fields.is_empty());
    // Additions alone do not break compatibility.
    assert!(diff.is_compatible);
}
#[test]
fn test_evolution_removed_field() {
    let s = svc();
    let old = json!({"type": "object", "properties": {"name": {"type": "string"}, "age": {"type": "integer"}}});
    let new = json!({"type": "object", "properties": {"name": {"type": "string"}}});
    let diff = s.detect_evolution(&old, &new);
    assert!(diff.removed_fields.contains(&"age".to_string()));
    assert!(!diff.is_compatible);
}
#[test]
fn test_evolution_type_change() {
    let s = svc();
    let old = json!({"type": "object", "properties": {"id": {"type": "string"}}});
    let new = json!({"type": "object", "properties": {"id": {"type": "integer"}}});
    let diff = s.detect_evolution(&old, &new);
    assert_eq!(diff.type_changes.len(), 1);
    assert_eq!(
        diff.type_changes[0],
        (
            "id".to_string(),
            "string".to_string(),
            "integer".to_string()
        )
    );
    assert!(!diff.is_compatible);
}
#[test]
fn test_evolution_no_change() {
    let s = svc();
    let schema = json!({"type": "object", "properties": {"x": {"type": "string"}}});
    let diff = s.detect_evolution(&schema, &schema);
    assert!(diff.added_fields.is_empty());
    assert!(diff.removed_fields.is_empty());
    assert!(diff.is_compatible);
}
// --- Corrections, caching, and HTML inference --------------------------------
#[test]
fn test_correction_overrides_inferred_type() {
    let s = svc();
    s.add_correction("id".to_string(), json!({"type": "integer"}));
    let schema = json!({
        "type": "object",
        "properties": {
            "id": {"type": "string"},
            "name": {"type": "string"}
        }
    });
    let corrected = s.apply_corrections(schema);
    assert_eq!(
        corrected["properties"]["id"]["type"].as_str().unwrap(),
        "integer"
    );
    // Fields without a registered correction stay untouched.
    assert_eq!(
        corrected["properties"]["name"]["type"].as_str().unwrap(),
        "string"
    );
}
#[test]
fn test_cache_and_retrieve() {
    let s = svc();
    let schema = json!({"type": "object", "properties": {"title": {"type": "string"}}});
    s.cache_schema(
        "example.com".to_string(),
        schema.clone(),
        SchemaSource::UserCorrection,
        1.0,
    );
    let cache = s.cache.read();
    let entry = cache.get("example.com").unwrap();
    assert_eq!(entry.schema, schema);
    assert_eq!(entry.source, SchemaSource::UserCorrection);
}
#[tokio::test]
async fn test_infer_from_html_no_provider() {
    let s = svc();
    let result = s
        .infer_from_html("<html><body>test</body></html>", None)
        .await;
    assert!(result.is_err());
    assert!(result.unwrap_err().to_string().contains("No AI provider"));
}
#[tokio::test]
async fn test_infer_from_html_cache_hit() {
    let s = svc();
    let schema = json!({"type": "object"});
    s.cache_schema(
        "example.com".to_string(),
        schema.clone(),
        SchemaSource::Cached,
        0.9,
    );
    // A cache hit short-circuits before the (absent) AI provider is needed.
    let result = s
        .infer_from_html("<html/>", Some("example.com"))
        .await
        .unwrap();
    assert_eq!(result.source, SchemaSource::Cached);
    assert_eq!(result.schema, schema);
}
/// ScrapingService stub that returns a canned JSON payload as its data.
struct MockGraphQlService(Value);
#[async_trait::async_trait]
impl ScrapingService for MockGraphQlService {
    fn name(&self) -> &'static str {
        "mock_graphql"
    }
    async fn execute(
        &self,
        _input: crate::ports::ServiceInput,
    ) -> Result<crate::ports::ServiceOutput> {
        Ok(crate::ports::ServiceOutput {
            data: serde_json::to_string(&self.0).unwrap_or_default(),
            metadata: Value::default(),
        })
    }
}
// Test-only convenience constructor wiring in the mock GraphQL service.
impl SchemaDiscoveryService {
    fn with_mock_graphql(config: SchemaDiscoveryConfig, response: Value) -> Self {
        let svc = Arc::new(MockGraphQlService(response));
        Self::new(config, None).with_graphql_service(svc)
    }
}
/// Minimal introspection payload: a `Query` type exposing `user`, a `User`
/// type with a NON_NULL `id` and nullable `name`, plus the two scalars.
fn make_introspection_response_simple() -> Value {
    json!({
        "__schema": {
            "queryType": { "name": "Query" },
            "mutationType": null,
            "subscriptionType": null,
            "types": [
                {
                    "kind": "OBJECT",
                    "name": "Query",
                    "fields": [
                        {
                            "name": "user",
                            "type": { "kind": "OBJECT", "name": "User", "ofType": null }
                        }
                    ],
                    "inputFields": null,
                    "interfaces": [],
                    "enumValues": null,
                    "possibleTypes": null
                },
                {
                    "kind": "OBJECT",
                    "name": "User",
                    "fields": [
                        {
                            "name": "id",
                            "type": { "kind": "NON_NULL", "name": null, "ofType": { "kind": "SCALAR", "name": "ID", "ofType": null } }
                        },
                        {
                            "name": "name",
                            "type": { "kind": "SCALAR", "name": "String", "ofType": null }
                        }
                    ],
                    "inputFields": null,
                    "interfaces": [],
                    "enumValues": null,
                    "possibleTypes": null
                },
                {
                    "kind": "SCALAR",
                    "name": "String",
                    "fields": null,
                    "inputFields": null,
                    "interfaces": null,
                    "enumValues": null,
                    "possibleTypes": null
                },
                {
                    "kind": "SCALAR",
                    "name": "ID",
                    "fields": null,
                    "inputFields": null,
                    "interfaces": null,
                    "enumValues": null,
                    "possibleTypes": null
                }
            ],
            "directives": []
        }
    })
}
// --- GraphQL introspection ---------------------------------------------------
#[tokio::test]
async fn introspection_no_graphql_service() {
    let s = svc();
    let result = s
        .infer_from_graphql("https://api.example.com/graphql", None)
        .await;
    assert!(result.is_err());
    assert!(
        result
            .unwrap_err()
            .to_string()
            .contains("No GraphQL service")
    );
}
#[test]
fn graphql_type_to_json_schema_scalar() {
    assert_eq!(
        SchemaDiscoveryService::scalar_to_json_schema("String"),
        json!({"type": "string"})
    );
    assert_eq!(
        SchemaDiscoveryService::scalar_to_json_schema("Int"),
        json!({"type": "integer"})
    );
    assert_eq!(
        SchemaDiscoveryService::scalar_to_json_schema("Float"),
        json!({"type": "number"})
    );
    assert_eq!(
        SchemaDiscoveryService::scalar_to_json_schema("Boolean"),
        json!({"type": "boolean"})
    );
    assert_eq!(
        SchemaDiscoveryService::scalar_to_json_schema("ID"),
        json!({"type": "string"})
    );
    // Custom date-like scalars map to date-time formatted strings.
    assert_eq!(
        SchemaDiscoveryService::scalar_to_json_schema("DateTime"),
        json!({"type": "string", "format": "date-time"})
    );
    // Unknown scalars degrade to a permissive empty schema.
    assert_eq!(
        SchemaDiscoveryService::scalar_to_json_schema("JSON"),
        json!({})
    );
}
#[test]
fn graphql_type_to_json_schema_object() {
    let gql_type = json!({
        "kind": "OBJECT",
        "name": "User",
        "fields": [
            {
                "name": "id",
                "type": { "kind": "SCALAR", "name": "ID", "ofType": null }
            },
            {
                "name": "email",
                "type": { "kind": "SCALAR", "name": "String", "ofType": null }
            }
        ]
    });
    let schema = SchemaDiscoveryService::convert_type_to_definition(&gql_type);
    assert_eq!(schema["type"].as_str().unwrap(), "object");
    assert_eq!(
        schema["properties"]["id"]["type"].as_str().unwrap(),
        "string"
    );
    assert_eq!(
        schema["properties"]["email"]["type"].as_str().unwrap(),
        "string"
    );
    // No NON_NULL fields, so `required` should be absent or empty.
    assert!(
        schema.get("required").is_none()
            || schema["required"].as_array().is_none_or(Vec::is_empty)
    );
}
#[test]
fn graphql_non_null_adds_to_required() {
    let gql_type = json!({
        "kind": "OBJECT",
        "name": "Product",
        "fields": [
            {
                "name": "sku",
                "type": {
                    "kind": "NON_NULL",
                    "name": null,
                    "ofType": { "kind": "SCALAR", "name": "String", "ofType": null }
                }
            },
            {
                "name": "description",
                "type": { "kind": "SCALAR", "name": "String", "ofType": null }
            }
        ]
    });
    let schema = SchemaDiscoveryService::convert_type_to_definition(&gql_type);
    let required = schema["required"].as_array().unwrap();
    assert!(required.contains(&json!("sku")));
    assert!(!required.contains(&json!("description")));
}
#[tokio::test]
async fn introspection_result_cached() {
    let response = make_introspection_response_simple();
    let s =
        SchemaDiscoveryService::with_mock_graphql(SchemaDiscoveryConfig::default(), response);
    let first = s
        .infer_from_graphql("https://api.example.com/graphql", None)
        .await
        .unwrap();
    assert_eq!(first.source, SchemaSource::AiInference);
    assert_eq!(first.confidence, 1.0);
    // The second call must be served from the cache and re-labelled.
    let second = s
        .infer_from_graphql("https://api.example.com/graphql", None)
        .await
        .unwrap();
    assert_eq!(second.source, SchemaSource::Cached);
    assert_eq!(second.schema["queryType"].as_str().unwrap(), "Query");
}
}