use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;
/// Kind of upstream system a [`DataSource`] represents.
///
/// The human-readable label for each variant is produced by
/// [`SourceType::name`]; the labels quoted below are the strings it returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SourceType {
/// Labelled `"GraphQL"`.
GraphQL,
/// Labelled `"REST"`.
REST,
/// Labelled `"OpenAPI"`.
OpenAPI,
/// Labelled `"SOAP"`.
SOAP,
/// Labelled `"gRPC"`.
GRPC,
/// Labelled `"OData"`.
OData,
/// Labelled `"Database"`.
Database,
/// Labelled `"JSON Schema"`.
JsonSchema,
/// Labelled `"Custom"`.
Custom,
}
/// Description of one upstream source registered with the mesh.
///
/// Construct with [`DataSource::new`] and refine with the `with_*` builders.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataSource {
/// Identifier of the source; `GraphQLMesh::add_source` rejects a second
/// source with the same id, and merges/relations refer to sources by it.
pub id: String,
/// Display name; emitted in the `# Source: ...` comment of the built schema.
pub name: String,
/// Kind of upstream system.
pub source_type: SourceType,
/// Endpoint of the source (presumably a URL; not interpreted in this file).
pub endpoint: String,
/// Optional schema text for the source (not read in this file).
pub schema: Option<String>,
/// Optional credentials for the source.
pub auth: Option<AuthConfig>,
/// Extra headers, keyed by header name (not read in this file).
pub headers: HashMap<String, String>,
/// Transforms attached to this source. `GraphQLMesh::build_schema` only
/// looks at the first `Transform::Prefix` entry.
pub transforms: Vec<Transform>,
/// Optional cache settings for the source.
pub cache: Option<CacheConfig>,
/// Optional health-check setting (not read in this file; presumably a
/// URL or query — confirm with the consumer).
pub health_check: Option<String>,
/// Disabled sources are skipped by `GraphQLMesh::build_schema`.
pub enabled: bool,
/// Free-form key/value metadata (not read in this file).
pub metadata: HashMap<String, String>,
}
impl DataSource {
    /// Creates an enabled source from its identity, kind and endpoint.
    ///
    /// Optional settings (`schema`, `auth`, `cache`, `health_check`) start as
    /// `None`; `headers`, `transforms` and `metadata` start empty.
    pub fn new(id: &str, name: &str, source_type: SourceType, endpoint: &str) -> Self {
        Self {
            id: String::from(id),
            name: String::from(name),
            source_type,
            endpoint: String::from(endpoint),
            enabled: true,
            schema: None,
            auth: None,
            cache: None,
            health_check: None,
            headers: HashMap::default(),
            metadata: HashMap::default(),
            transforms: Vec::default(),
        }
    }

    /// Returns the source with `auth` set, replacing any previous credentials.
    pub fn with_auth(self, auth: AuthConfig) -> Self {
        Self {
            auth: Some(auth),
            ..self
        }
    }

    /// Returns the source with header `key` set to `value`; an existing
    /// header with the same key is overwritten.
    pub fn with_header(mut self, key: &str, value: &str) -> Self {
        let (header_name, header_value) = (key.to_owned(), value.to_owned());
        self.headers.insert(header_name, header_value);
        self
    }

    /// Returns the source with `transform` appended after the existing ones.
    pub fn with_transform(self, transform: Transform) -> Self {
        let mut updated = self;
        updated.transforms.push(transform);
        updated
    }

    /// Returns the source with `cache` set, replacing any previous cache settings.
    pub fn with_cache(self, cache: CacheConfig) -> Self {
        Self {
            cache: Some(cache),
            ..self
        }
    }
}
/// Credentials attached to a [`DataSource`].
///
/// NOTE(review): secrets are stored as plain `String`s and this type derives
/// `Debug` and `Serialize`, so they appear verbatim in debug output and in any
/// serialized configuration (for example the output of
/// `GraphQLMesh::export_yaml`, which serializes the whole config).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AuthConfig {
/// Explicitly no credentials (distinct from `DataSource::auth` being `None`).
None,
/// Username/password pair (presumably HTTP Basic — confirm with the consumer).
Basic { username: String, password: String },
/// A bearer token.
Bearer { token: String },
/// An API key together with a header name (presumably the header the key
/// is sent in — confirm with the consumer).
ApiKey { key: String, header: String },
/// OAuth2 client settings: client id/secret, token URL and requested scopes.
OAuth2 {
client_id: String,
client_secret: String,
token_url: String,
scopes: Vec<String>,
},
/// Free-form key/value configuration for schemes not covered above.
Custom { config: HashMap<String, String> },
}
/// A schema transform, attached to a source (`DataSource::transforms`) or to
/// the whole mesh (`MeshConfig::global_transforms`).
///
/// Within this file only `Prefix` is interpreted (by
/// `GraphQLMesh::build_schema`); the other variants are carried as data and
/// their exact semantics are defined by whatever consumes the config.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Transform {
/// Prefix for the source's names. `build_schema` lower-cases the first
/// `Prefix` value of a source and prepends it to that source's `health` field.
Prefix { value: String },
/// Rename `from` to `to`.
Rename { from: String, to: String },
/// Type names to include.
FilterTypes { include: Vec<String> },
/// Field names to include on `type_name`.
FilterFields {
type_name: String,
include: Vec<String>,
},
/// A field (`field_name` of type `field_type`) to add to `type_name`.
AddField {
type_name: String,
field_name: String,
field_type: String,
},
/// Namespace to encapsulate under.
Encapsulate { namespace: String },
/// Named snapshot.
Snapshot { name: String },
/// Named custom transform with free-form key/value configuration.
Custom {
name: String,
config: HashMap<String, String>,
},
}
/// Cache settings for a [`DataSource`].
///
/// Nothing in this file reads these values; `Default` yields a 60-second TTL,
/// 1000 entries, the default key strategy and no invalidation events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheConfig {
/// Time-to-live of a cache entry.
pub ttl: Duration,
/// Maximum number of cache entries.
pub max_entries: usize,
/// How cache keys are derived.
pub key_strategy: CacheKeyStrategy,
/// Names of events that invalidate the cache (format defined by the consumer).
pub invalidation_events: Vec<String>,
}
impl Default for CacheConfig {
fn default() -> Self {
Self {
ttl: Duration::from_secs(60),
max_entries: 1000,
key_strategy: CacheKeyStrategy::default(),
invalidation_events: Vec::new(),
}
}
}
/// Strategy used to derive cache keys; defaults to `QueryHash`.
///
/// The variants are not interpreted in this file.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub enum CacheKeyStrategy {
/// The default strategy (presumably keyed by a hash of the query).
#[default]
QueryHash,
/// Presumably keyed per field — confirm with the cache implementation.
FieldLevel,
/// Key built from a caller-supplied template string.
Custom { template: String },
}
/// Declares that a type is merged across several sources.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypeMergeConfig {
/// Name of the merged type; also used in validation error paths
/// (`type_merges.<type_name>.sources`).
pub type_name: String,
/// Name of the key field (presumably used to match records across sources).
pub key_field: String,
/// Ids of the participating sources; `GraphQLMesh::validate` reports any
/// id that is not a configured source.
pub sources: Vec<String>,
/// How conflicts between sources are resolved.
pub resolution: MergeResolution,
}
/// Conflict-resolution policy for a [`TypeMergeConfig`].
///
/// The variants are not interpreted in this file; the notes below are inferred
/// from the names and should be confirmed against the merge implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MergeResolution {
/// Presumably: the first source wins.
First,
/// Presumably: the last source wins.
Last,
/// Presumably: combine the definitions from all sources.
MergeAll,
/// Presumably: treat any conflict as an error.
FailOnConflict,
}
/// A relation linking a field of a type in one source to a type in another.
///
/// `GraphQLMesh::validate` checks that both `source_id` and
/// `target_source_id` name configured sources.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrossSourceRelation {
/// Name of the type on the originating side.
pub source_type: String,
/// Name of the field on the originating side; also used in validation
/// error paths (`relations.<source_field>...`).
pub source_field: String,
/// Id of the source on the originating side.
pub source_id: String,
/// Name of the type on the target side.
pub target_type: String,
/// Name of the key field on the target type.
pub target_key_field: String,
/// Id of the source on the target side.
pub target_source_id: String,
/// Cardinality of the relation.
pub relation_type: RelationType,
}
/// Cardinality of a [`CrossSourceRelation`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RelationType {
/// One-to-one.
OneToOne,
/// One-to-many.
OneToMany,
/// Many-to-one.
ManyToOne,
/// Many-to-many.
ManyToMany,
}
/// Complete configuration of a mesh: its sources, how their types are merged
/// and related, and how the result is served.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeshConfig {
/// Name of the mesh (`"mesh"` by default).
pub name: String,
/// Registered sources.
pub sources: Vec<DataSource>,
/// Type-merge declarations.
pub type_merges: Vec<TypeMergeConfig>,
/// Cross-source relations.
pub relations: Vec<CrossSourceRelation>,
/// Transforms applying to the whole mesh (not read in this file).
pub global_transforms: Vec<Transform>,
/// Whether introspection is enabled (`true` by default; not read in this file).
pub enable_introspection: bool,
/// Server settings.
pub serve: ServeConfig,
}
impl Default for MeshConfig {
fn default() -> Self {
Self {
name: "mesh".to_string(),
sources: Vec::new(),
type_merges: Vec::new(),
relations: Vec::new(),
global_transforms: Vec::new(),
enable_introspection: true,
serve: ServeConfig::default(),
}
}
}
/// Settings for serving the mesh.
///
/// NOTE(review): the `Default` values are permissive — host `0.0.0.0`,
/// playground enabled and a `*` CORS entry — confirm they are intended outside
/// development.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServeConfig {
/// Host to bind to (`"0.0.0.0"` by default).
pub host: String,
/// Port to bind to (`4000` by default).
pub port: u16,
/// Whether the playground is enabled (`true` by default).
pub playground: bool,
/// CORS entries (presumably allowed origins; `["*"]` by default).
pub cors: Vec<String>,
}
impl Default for ServeConfig {
    /// Default server settings: host `0.0.0.0`, port 4000, playground
    /// enabled, and a single `*` CORS entry.
    fn default() -> Self {
        let host = String::from("0.0.0.0");
        let cors = vec![String::from("*")];
        Self {
            host,
            port: 4000,
            playground: true,
            cors,
        }
    }
}
/// Latest health report recorded for a source via
/// `GraphQLMesh::update_source_health`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceHealth {
/// Id the report was recorded under.
pub source_id: String,
/// Whether the source was reported healthy.
pub healthy: bool,
/// Wall-clock time at which the report was recorded.
pub last_check: SystemTime,
/// Error text supplied with the report, if any.
pub last_error: Option<String>,
/// Latency supplied with the report, in milliseconds.
pub latency_ms: u64,
}
/// Mutable state held behind the lock inside [`GraphQLMesh`].
struct MeshState {
/// Current configuration; mutated by the `add_*` / `remove_*` methods.
config: MeshConfig,
/// Latest health report per source id.
source_health: HashMap<String, SourceHealth>,
/// SDL produced by the last `build_schema`; reset to `None` whenever
/// sources, type merges or relations change.
merged_schema: Option<String>,
/// Time of the last `build_schema` call, if any.
last_merge: Option<SystemTime>,
}
impl MeshState {
    /// Wraps `config` in a fresh state: no health reports, no cached schema
    /// and no recorded merge time.
    fn new(config: MeshConfig) -> Self {
        let source_health = HashMap::default();
        Self {
            config,
            source_health,
            merged_schema: None,
            last_merge: None,
        }
    }
}
/// Handle to a mesh: its configuration, per-source health reports and the
/// most recently built schema, all kept in one state value behind an
/// `Arc<RwLock<_>>`.
pub struct GraphQLMesh {
/// Shared state; methods take the read or write side of the lock as needed.
state: Arc<RwLock<MeshState>>,
}
impl GraphQLMesh {
    /// Creates a mesh starting from `config`, with no health reports and no
    /// cached schema.
    pub fn new(config: MeshConfig) -> Self {
        Self {
            state: Arc::new(RwLock::new(MeshState::new(config))),
        }
    }

    /// Registers `source` and invalidates the cached merged schema.
    ///
    /// # Errors
    /// Returns an error if a source with the same id is already registered.
    pub async fn add_source(&self, source: DataSource) -> Result<()> {
        let mut state = self.state.write().await;
        if state.config.sources.iter().any(|s| s.id == source.id) {
            return Err(anyhow!("Source '{}' already exists", source.id));
        }
        state.config.sources.push(source);
        state.merged_schema = None;
        Ok(())
    }

    /// Removes the source with id `source_id`, drops its health report and
    /// invalidates the cached merged schema.
    ///
    /// Type merges and relations that mention the source are left in place;
    /// [`GraphQLMesh::validate`] reports them afterwards.
    ///
    /// # Errors
    /// Returns an error if no source has that id.
    pub async fn remove_source(&self, source_id: &str) -> Result<()> {
        let mut state = self.state.write().await;
        let initial_len = state.config.sources.len();
        state.config.sources.retain(|s| s.id != source_id);
        if state.config.sources.len() == initial_len {
            return Err(anyhow!("Source '{}' not found", source_id));
        }
        state.source_health.remove(source_id);
        state.merged_schema = None;
        Ok(())
    }

    /// Returns a copy of every registered source, in registration order.
    pub async fn get_sources(&self) -> Vec<DataSource> {
        let state = self.state.read().await;
        state.config.sources.clone()
    }

    /// Returns a copy of the source with id `source_id`, if registered.
    pub async fn get_source(&self, source_id: &str) -> Option<DataSource> {
        let state = self.state.read().await;
        state
            .config
            .sources
            .iter()
            .find(|s| s.id == source_id)
            .cloned()
    }

    /// Records a health report for `source_id`, stamped with the current
    /// time and replacing any previous report for that id.
    ///
    /// NOTE(review): `source_id` is not checked against the registered
    /// sources, so a report for an unknown or already-removed id is stored
    /// and will be returned by the health getters.
    pub async fn update_source_health(
        &self,
        source_id: &str,
        healthy: bool,
        error: Option<String>,
        latency_ms: u64,
    ) {
        let mut state = self.state.write().await;
        state.source_health.insert(
            source_id.to_string(),
            SourceHealth {
                source_id: source_id.to_string(),
                healthy,
                last_check: SystemTime::now(),
                last_error: error,
                latency_ms,
            },
        );
    }

    /// Returns the latest health report recorded for `source_id`, if any.
    pub async fn get_source_health(&self, source_id: &str) -> Option<SourceHealth> {
        let state = self.state.read().await;
        state.source_health.get(source_id).cloned()
    }

    /// Returns every recorded health report, in unspecified order.
    pub async fn get_all_health(&self) -> Vec<SourceHealth> {
        let state = self.state.read().await;
        state.source_health.values().cloned().collect()
    }

    /// Appends a type-merge declaration and invalidates the cached schema.
    /// The declaration is not validated here; see [`GraphQLMesh::validate`].
    pub async fn add_type_merge(&self, merge: TypeMergeConfig) {
        let mut state = self.state.write().await;
        state.config.type_merges.push(merge);
        state.merged_schema = None;
    }

    /// Appends a cross-source relation and invalidates the cached schema.
    /// The relation is not validated here; see [`GraphQLMesh::validate`].
    pub async fn add_relation(&self, relation: CrossSourceRelation) {
        let mut state = self.state.write().await;
        state.config.relations.push(relation);
        state.merged_schema = None;
    }

    /// Builds the merged SDL, caches it, records the build time and returns it.
    ///
    /// The schema is a single `Query` type holding one
    /// `<prefix>health: Boolean @source(name: "<id>")` field per enabled
    /// source, where `<prefix>` is the lower-cased value of the source's first
    /// `Transform::Prefix` (empty when it has none), followed by the
    /// `@source` directive definition.
    ///
    /// Source names and ids are sanitised before being interpolated so that a
    /// newline in a name cannot terminate the `#` comment and a quote or
    /// backslash in an id cannot terminate the string literal.
    ///
    /// NOTE(review): two enabled sources that resolve to the same prefix
    /// (including two sources without a prefix) produce duplicate field names.
    ///
    /// # Errors
    /// Currently never fails; the `Result` is part of the public signature.
    pub async fn build_schema(&self) -> Result<String> {
        let mut state = self.state.write().await;
        let mut sdl = String::from("type Query {\n");
        for source in state.config.sources.iter().filter(|s| s.enabled) {
            let prefix = source
                .transforms
                .iter()
                .find_map(|t| match t {
                    Transform::Prefix { value } => Some(value.to_lowercase()),
                    _ => None,
                })
                .unwrap_or_default();
            sdl.push_str(&format!(
                " # Source: {} ({})\n",
                Self::sanitize_sdl_comment(&source.name),
                source.source_type.name()
            ));
            sdl.push_str(&format!(
                " {}health: Boolean @source(name: \"{}\")\n",
                prefix,
                Self::escape_sdl_string(&source.id)
            ));
        }
        sdl.push_str("}\n\n");
        sdl.push_str("directive @source(name: String!) on FIELD_DEFINITION\n");
        state.merged_schema = Some(sdl.clone());
        state.last_merge = Some(SystemTime::now());
        Ok(sdl)
    }

    /// Returns the SDL cached by the last [`GraphQLMesh::build_schema`], or
    /// `None` if it was never built or has been invalidated since.
    pub async fn get_merged_schema(&self) -> Option<String> {
        let state = self.state.read().await;
        state.merged_schema.clone()
    }

    /// Returns a copy of the current configuration.
    pub async fn get_config(&self) -> MeshConfig {
        let state = self.state.read().await;
        state.config.clone()
    }

    /// Checks the configuration for dangling or duplicate source ids.
    ///
    /// Reports, in this order: each repeated occurrence of a source id, each
    /// type-merge source that is not configured, and for each relation an
    /// unknown `source_id` and/or `target_source_id`. An empty vector means
    /// no problems were found.
    pub async fn validate(&self) -> Vec<ValidationError> {
        let state = self.state.read().await;
        let config = &state.config;
        let mut errors = Vec::new();

        // Build the id set once; it doubles as the duplicate detector and as
        // the lookup table for the reference checks below.
        let mut known_ids = std::collections::HashSet::with_capacity(config.sources.len());
        for source in &config.sources {
            if !known_ids.insert(source.id.as_str()) {
                errors.push(ValidationError {
                    path: format!("sources.{}", source.id),
                    message: "Duplicate source ID".to_string(),
                });
            }
        }

        for merge in &config.type_merges {
            for source_id in &merge.sources {
                if !known_ids.contains(source_id.as_str()) {
                    errors.push(ValidationError {
                        path: format!("type_merges.{}.sources", merge.type_name),
                        message: format!("Unknown source: {}", source_id),
                    });
                }
            }
        }

        for relation in &config.relations {
            if !known_ids.contains(relation.source_id.as_str()) {
                errors.push(ValidationError {
                    path: format!("relations.{}.source_id", relation.source_field),
                    message: format!("Unknown source: {}", relation.source_id),
                });
            }
            if !known_ids.contains(relation.target_source_id.as_str()) {
                errors.push(ValidationError {
                    path: format!("relations.{}.target_source_id", relation.source_field),
                    message: format!("Unknown source: {}", relation.target_source_id),
                });
            }
        }
        errors
    }

    /// Serializes the current configuration to a pretty-printed string.
    ///
    /// NOTE(review): despite the name, the output is produced with
    /// `serde_json::to_string_pretty`, i.e. it is JSON text. Credentials held
    /// in `AuthConfig` are included verbatim.
    ///
    /// # Errors
    /// Returns an error if serialization fails.
    pub async fn export_yaml(&self) -> Result<String> {
        let state = self.state.read().await;
        let serialized = serde_json::to_string_pretty(&state.config)?;
        Ok(serialized)
    }

    /// Makes `raw` safe to place after `#` in an SDL comment by replacing
    /// line terminators with spaces, so the text cannot spill onto a new line.
    fn sanitize_sdl_comment(raw: &str) -> String {
        raw.replace(|c: char| c == '\n' || c == '\r', " ")
    }

    /// Escapes `raw` for use inside a double-quoted GraphQL string literal:
    /// quotes, backslashes and control characters become escape sequences.
    fn escape_sdl_string(raw: &str) -> String {
        let mut escaped = String::with_capacity(raw.len());
        for ch in raw.chars() {
            match ch {
                '"' => escaped.push_str("\\\""),
                '\\' => escaped.push_str("\\\\"),
                '\n' => escaped.push_str("\\n"),
                '\r' => escaped.push_str("\\r"),
                '\t' => escaped.push_str("\\t"),
                c if c.is_control() => escaped.push_str(&format!("\\u{:04X}", c as u32)),
                c => escaped.push(c),
            }
        }
        escaped
    }
}
impl SourceType {
pub fn name(&self) -> &'static str {
match self {
SourceType::GraphQL => "GraphQL",
SourceType::REST => "REST",
SourceType::OpenAPI => "OpenAPI",
SourceType::SOAP => "SOAP",
SourceType::GRPC => "gRPC",
SourceType::OData => "OData",
SourceType::Database => "Database",
SourceType::JsonSchema => "JSON Schema",
SourceType::Custom => "Custom",
}
}
}
/// One problem found by `GraphQLMesh::validate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationError {
/// Dotted location of the problem in the configuration, e.g.
/// `sources.<id>`, `type_merges.<type_name>.sources` or
/// `relations.<source_field>.source_id`.
pub path: String,
/// Human-readable description of the problem.
pub message: String,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh mesh built from the default (empty) configuration.
    fn empty_mesh() -> GraphQLMesh {
        GraphQLMesh::new(MeshConfig::default())
    }

    /// GraphQL-typed source fixture.
    fn graphql_source(id: &str, name: &str, endpoint: &str) -> DataSource {
        DataSource::new(id, name, SourceType::GraphQL, endpoint)
    }

    /// A mesh built from the default config has no sources.
    #[tokio::test]
    async fn test_mesh_creation() {
        let mesh = GraphQLMesh::new(MeshConfig::default());
        assert!(mesh.get_sources().await.is_empty());
    }

    /// A registered source is returned by `get_sources`.
    #[tokio::test]
    async fn test_add_source() {
        let mesh = empty_mesh();
        let users = graphql_source("users", "Users API", "https://api.example.com/graphql");
        mesh.add_source(users).await.expect("should succeed");

        let registered = mesh.get_sources().await;
        assert_eq!(registered.len(), 1);
        assert_eq!(registered[0].id, "users");
    }

    /// Registering a second source with the same id fails.
    #[tokio::test]
    async fn test_duplicate_source() {
        let mesh = empty_mesh();
        let first = graphql_source("api", "API 1", "https://api1.example.com");
        let second = graphql_source("api", "API 2", "https://api2.example.com");

        mesh.add_source(first).await.expect("should succeed");
        assert!(mesh.add_source(second).await.is_err());
    }

    /// Removing the only source leaves the mesh empty.
    #[tokio::test]
    async fn test_remove_source() {
        let mesh = empty_mesh();
        let users = graphql_source("users", "Users", "https://api.example.com");
        mesh.add_source(users).await.expect("should succeed");
        mesh.remove_source("users").await.expect("should succeed");

        assert!(mesh.get_sources().await.is_empty());
    }

    /// A recorded health report can be read back.
    #[tokio::test]
    async fn test_source_health() {
        let mesh = empty_mesh();
        mesh.add_source(graphql_source("api", "API", "https://api.example.com"))
            .await
            .expect("should succeed");
        mesh.update_source_health("api", true, None, 50).await;

        let report = mesh.get_source_health("api").await.expect("should succeed");
        assert!(report.healthy);
        assert_eq!(report.latency_ms, 50);
    }

    /// The built schema contains the `Query` type and the lower-cased
    /// prefixed health field.
    #[tokio::test]
    async fn test_build_schema() {
        let mesh = empty_mesh();
        let prefix = Transform::Prefix {
            value: "Users_".to_string(),
        };
        let users =
            graphql_source("users", "Users", "https://api.example.com").with_transform(prefix);
        mesh.add_source(users).await.expect("should succeed");

        let sdl = mesh.build_schema().await.expect("should succeed");
        assert!(sdl.contains("type Query"));
        assert!(sdl.contains("users_health"));
    }

    /// A type merge naming an unknown source is reported by `validate`.
    #[tokio::test]
    async fn test_validation() {
        let mesh = empty_mesh();
        let dangling_merge = TypeMergeConfig {
            type_name: "User".to_string(),
            key_field: "id".to_string(),
            sources: vec!["nonexistent".to_string()],
            resolution: MergeResolution::MergeAll,
        };
        mesh.add_type_merge(dangling_merge).await;

        let problems = mesh.validate().await;
        assert!(!problems.is_empty());
    }

    /// The builder methods populate auth, headers and cache.
    #[test]
    fn test_data_source_builder() {
        let bearer = AuthConfig::Bearer {
            token: "secret".to_string(),
        };
        let built = DataSource::new("api", "API", SourceType::REST, "https://api.example.com")
            .with_auth(bearer)
            .with_header("X-Custom", "value")
            .with_cache(CacheConfig::default());

        assert!(built.auth.is_some());
        assert!(built.headers.contains_key("X-Custom"));
        assert!(built.cache.is_some());
    }

    /// Spot-checks the labels returned by `SourceType::name`.
    #[test]
    fn test_source_type_name() {
        let expectations = [
            (SourceType::GraphQL, "GraphQL"),
            (SourceType::REST, "REST"),
            (SourceType::GRPC, "gRPC"),
        ];
        for (kind, label) in expectations {
            assert_eq!(kind.name(), label);
        }
    }
}