1use crate::distributed::coordinator::{QueryPlan, QueryResult};
10use crate::distributed::shard::ShardId;
11use crate::{GraphError, Result};
12use chrono::{DateTime, Utc};
13use dashmap::DashMap;
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::RwLock;
18use tracing::{debug, info, warn};
19use uuid::Uuid;
20
/// Identifier of a cluster taking part in the federation.
///
/// Used as the key under which a [`RemoteCluster`] is stored in the
/// [`ClusterRegistry`].
pub type ClusterId = String;
23
/// Description of a remote cluster known to the federation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteCluster {
    /// Identifier of the cluster; the registry stores the cluster under this key.
    pub cluster_id: ClusterId,
    /// Display name, used in log messages.
    pub name: String,
    /// Address of the cluster (the tests in this file use an HTTP URL such as
    /// `http://localhost:8080`).
    pub endpoint: String,
    /// Last known health status; `ClusterStatus::Unknown` for a new cluster.
    pub status: ClusterStatus,
    /// Optional authentication token.
    // NOTE(review): not read anywhere in this file — presumably consumed by
    // the code that talks to the endpoint; confirm.
    pub auth_token: Option<String>,
    /// Time of the most recent health check; initialised to the creation time.
    pub last_health_check: DateTime<Utc>,
    /// Free-form key/value metadata attached to the cluster.
    pub metadata: HashMap<String, String>,
    /// Shard count for the cluster; starts at `0`.
    // NOTE(review): never updated in this file — confirm who maintains it.
    pub shard_count: u32,
    /// Optional region label.
    pub region: Option<String>,
}
46
47impl RemoteCluster {
48 pub fn new(cluster_id: ClusterId, name: String, endpoint: String) -> Self {
50 Self {
51 cluster_id,
52 name,
53 endpoint,
54 status: ClusterStatus::Unknown,
55 auth_token: None,
56 last_health_check: Utc::now(),
57 metadata: HashMap::new(),
58 shard_count: 0,
59 region: None,
60 }
61 }
62
63 pub fn is_healthy(&self) -> bool {
65 matches!(self.status, ClusterStatus::Healthy)
66 }
67}
68
/// Health state recorded for a remote cluster.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ClusterStatus {
    /// The only state for which `RemoteCluster::is_healthy` returns `true`.
    Healthy,
    /// Not assigned anywhere in this file.
    // NOTE(review): presumably "reachable but impaired" — confirm.
    Degraded,
    /// Reported by `ClusterRegistry::health_check_all` when a health check
    /// returns an error.
    Unreachable,
    /// Initial state of a cluster created with `RemoteCluster::new`.
    Unknown,
}
81
/// Registry of the remote clusters known to this node, keyed by cluster id.
pub struct ClusterRegistry {
    /// Registered clusters in a concurrent map, so the registry can be
    /// modified through `&self`.
    clusters: Arc<DashMap<ClusterId, RemoteCluster>>,
    /// Discovery settings; only `auto_discovery` is read in this file.
    discovery_config: DiscoveryConfig,
}
89
90impl ClusterRegistry {
91 pub fn new(discovery_config: DiscoveryConfig) -> Self {
93 Self {
94 clusters: Arc::new(DashMap::new()),
95 discovery_config,
96 }
97 }
98
99 pub fn register_cluster(&self, cluster: RemoteCluster) -> Result<()> {
101 info!(
102 "Registering cluster: {} ({})",
103 cluster.name, cluster.cluster_id
104 );
105 self.clusters.insert(cluster.cluster_id.clone(), cluster);
106 Ok(())
107 }
108
109 pub fn unregister_cluster(&self, cluster_id: &ClusterId) -> Result<()> {
111 info!("Unregistering cluster: {}", cluster_id);
112 self.clusters.remove(cluster_id).ok_or_else(|| {
113 GraphError::FederationError(format!("Cluster not found: {}", cluster_id))
114 })?;
115 Ok(())
116 }
117
118 pub fn get_cluster(&self, cluster_id: &ClusterId) -> Option<RemoteCluster> {
120 self.clusters.get(cluster_id).map(|c| c.value().clone())
121 }
122
123 pub fn list_clusters(&self) -> Vec<RemoteCluster> {
125 self.clusters.iter().map(|e| e.value().clone()).collect()
126 }
127
128 pub fn healthy_clusters(&self) -> Vec<RemoteCluster> {
130 self.clusters
131 .iter()
132 .filter(|e| e.value().is_healthy())
133 .map(|e| e.value().clone())
134 .collect()
135 }
136
137 pub async fn health_check(&self, cluster_id: &ClusterId) -> Result<ClusterStatus> {
139 let cluster = self.get_cluster(cluster_id).ok_or_else(|| {
140 GraphError::FederationError(format!("Cluster not found: {}", cluster_id))
141 })?;
142
143 let status = ClusterStatus::Healthy;
146
147 if let Some(mut entry) = self.clusters.get_mut(cluster_id) {
149 entry.status = status;
150 entry.last_health_check = Utc::now();
151 }
152
153 debug!("Health check for cluster {}: {:?}", cluster_id, status);
154 Ok(status)
155 }
156
157 pub async fn health_check_all(&self) -> HashMap<ClusterId, ClusterStatus> {
159 let mut results = HashMap::new();
160
161 for cluster in self.list_clusters() {
162 match self.health_check(&cluster.cluster_id).await {
163 Ok(status) => {
164 results.insert(cluster.cluster_id, status);
165 }
166 Err(e) => {
167 warn!(
168 "Health check failed for cluster {}: {}",
169 cluster.cluster_id, e
170 );
171 results.insert(cluster.cluster_id, ClusterStatus::Unreachable);
172 }
173 }
174 }
175
176 results
177 }
178
179 pub async fn discover_clusters(&self) -> Result<Vec<RemoteCluster>> {
181 if !self.discovery_config.auto_discovery {
182 return Ok(Vec::new());
183 }
184
185 info!("Discovering clusters...");
186
187 Ok(Vec::new())
195 }
196}
197
/// Settings controlling how remote clusters are discovered.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscoveryConfig {
    /// When `false`, `ClusterRegistry::discover_clusters` returns an empty
    /// list without doing anything.
    pub auto_discovery: bool,
    /// Discovery mechanism to use.
    // NOTE(review): not read anywhere in this file.
    pub discovery_method: DiscoveryMethod,
    /// Discovery interval in seconds (by name); defaults to 60.
    // NOTE(review): no periodic task in this file reads this value.
    pub discovery_interval_seconds: u64,
    /// Health check interval in seconds (by name); defaults to 30.
    // NOTE(review): no periodic task in this file reads this value.
    pub health_check_interval_seconds: u64,
}
210
211impl Default for DiscoveryConfig {
212 fn default() -> Self {
213 Self {
214 auto_discovery: false,
215 discovery_method: DiscoveryMethod::Static,
216 discovery_interval_seconds: 60,
217 health_check_interval_seconds: 30,
218 }
219 }
220}
221
/// Mechanism used to discover remote clusters.
///
/// No variant is acted on by the code in this file; the per-variant notes
/// below are read off the names only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum DiscoveryMethod {
    /// Default method (see `DiscoveryConfig::default`).
    // NOTE(review): presumably a fixed, manually registered cluster list — confirm.
    Static,
    /// Presumably DNS-based discovery — confirm.
    Dns,
    /// Presumably discovery through Consul — confirm.
    Consul,
    /// Presumably discovery through etcd — confirm.
    Etcd,
    /// Presumably discovery through the Kubernetes API — confirm.
    Kubernetes,
}
236
/// Bookkeeping record for a federated query.
///
/// `Federation::execute_federated` creates one per call and keeps it in the
/// active-query map while the query runs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FederatedQuery {
    /// Identifier of the federated query (a freshly generated UUID string).
    pub query_id: String,
    /// Query text as passed by the caller.
    pub query: String,
    /// Ids of the clusters selected for this query.
    pub target_clusters: Vec<ClusterId>,
    /// Strategy used to run the query across the selected clusters.
    pub strategy: FederationStrategy,
    /// Time at which the record was created.
    pub created_at: DateTime<Utc>,
}
251
/// How a federated query is distributed over the selected clusters.
///
/// See `Federation::execute_federated` for how each variant is handled.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum FederationStrategy {
    /// One spawned task per selected cluster, so the clusters are queried
    /// concurrently.
    Parallel,
    /// Every selected cluster is queried, one after another.
    Sequential,
    /// By name: query a primary cluster, with the others as fallback.
    // NOTE(review): check `execute_federated` for the exact handling.
    PrimaryWithFallback,
    /// Query a single cluster; the executor in this file uses the first
    /// selected cluster.
    // NOTE(review): no distance or region metric is consulted in this file.
    Nearest,
}
264
/// Entry point for running queries across several remote clusters.
pub struct Federation {
    /// Registry of known clusters; shared with callers via `registry()`.
    registry: Arc<ClusterRegistry>,
    /// Execution settings.
    config: FederationConfig,
    /// Federated queries currently being executed, keyed by query id.
    active_queries: Arc<DashMap<String, FederatedQuery>>,
}
274
275impl Federation {
276 pub fn new(config: FederationConfig) -> Self {
278 let discovery_config = DiscoveryConfig::default();
279 Self {
280 registry: Arc::new(ClusterRegistry::new(discovery_config)),
281 config,
282 active_queries: Arc::new(DashMap::new()),
283 }
284 }
285
286 pub fn registry(&self) -> Arc<ClusterRegistry> {
288 Arc::clone(&self.registry)
289 }
290
291 pub async fn execute_federated(
293 &self,
294 query: &str,
295 target_clusters: Option<Vec<ClusterId>>,
296 ) -> Result<FederatedQueryResult> {
297 let query_id = Uuid::new_v4().to_string();
298 let start = std::time::Instant::now();
299
300 let clusters = if let Some(targets) = target_clusters {
302 targets
303 .into_iter()
304 .filter_map(|id| self.registry.get_cluster(&id))
305 .collect()
306 } else {
307 self.registry.healthy_clusters()
308 };
309
310 if clusters.is_empty() {
311 return Err(GraphError::FederationError(
312 "No healthy clusters available".to_string(),
313 ));
314 }
315
316 info!(
317 "Executing federated query {} across {} clusters",
318 query_id,
319 clusters.len()
320 );
321
322 let federated_query = FederatedQuery {
323 query_id: query_id.clone(),
324 query: query.to_string(),
325 target_clusters: clusters.iter().map(|c| c.cluster_id.clone()).collect(),
326 strategy: self.config.default_strategy,
327 created_at: Utc::now(),
328 };
329
330 self.active_queries
331 .insert(query_id.clone(), federated_query.clone());
332
333 let mut cluster_results = HashMap::new();
335
336 match self.config.default_strategy {
337 FederationStrategy::Parallel => {
338 let mut handles = Vec::new();
340
341 for cluster in &clusters {
342 let cluster_id = cluster.cluster_id.clone();
343 let query_str = query.to_string();
344 let cluster_clone = cluster.clone();
345
346 let handle = tokio::spawn(async move {
347 Self::execute_on_cluster(&cluster_clone, &query_str).await
348 });
349
350 handles.push((cluster_id, handle));
351 }
352
353 for (cluster_id, handle) in handles {
355 match handle.await {
356 Ok(Ok(result)) => {
357 cluster_results.insert(cluster_id, result);
358 }
359 Ok(Err(e)) => {
360 warn!("Query failed on cluster {}: {}", cluster_id, e);
361 }
362 Err(e) => {
363 warn!("Task failed for cluster {}: {}", cluster_id, e);
364 }
365 }
366 }
367 }
368 FederationStrategy::Sequential => {
369 for cluster in &clusters {
371 match Self::execute_on_cluster(cluster, query).await {
372 Ok(result) => {
373 cluster_results.insert(cluster.cluster_id.clone(), result);
374 }
375 Err(e) => {
376 warn!("Query failed on cluster {}: {}", cluster.cluster_id, e);
377 }
378 }
379 }
380 }
381 FederationStrategy::Nearest | FederationStrategy::PrimaryWithFallback => {
382 if let Some(cluster) = clusters.first() {
384 match Self::execute_on_cluster(cluster, query).await {
385 Ok(result) => {
386 cluster_results.insert(cluster.cluster_id.clone(), result);
387 }
388 Err(e) => {
389 warn!("Query failed on cluster {}: {}", cluster.cluster_id, e);
390 }
391 }
392 }
393 }
394 }
395
396 let merged_result = self.merge_results(cluster_results)?;
398
399 let execution_time_ms = start.elapsed().as_millis() as u64;
400
401 self.active_queries.remove(&query_id);
403
404 Ok(FederatedQueryResult {
405 query_id,
406 merged_result,
407 clusters_queried: clusters.len(),
408 execution_time_ms,
409 })
410 }
411
412 async fn execute_on_cluster(cluster: &RemoteCluster, query: &str) -> Result<QueryResult> {
414 debug!("Executing query on cluster: {}", cluster.cluster_id);
415
416 Ok(QueryResult {
419 query_id: Uuid::new_v4().to_string(),
420 nodes: Vec::new(),
421 edges: Vec::new(),
422 aggregates: HashMap::new(),
423 stats: crate::distributed::coordinator::QueryStats {
424 execution_time_ms: 0,
425 shards_queried: 0,
426 nodes_scanned: 0,
427 edges_scanned: 0,
428 cached: false,
429 },
430 })
431 }
432
433 fn merge_results(&self, results: HashMap<ClusterId, QueryResult>) -> Result<QueryResult> {
435 if results.is_empty() {
436 return Err(GraphError::FederationError(
437 "No results to merge".to_string(),
438 ));
439 }
440
441 let mut merged = QueryResult {
442 query_id: Uuid::new_v4().to_string(),
443 nodes: Vec::new(),
444 edges: Vec::new(),
445 aggregates: HashMap::new(),
446 stats: crate::distributed::coordinator::QueryStats {
447 execution_time_ms: 0,
448 shards_queried: 0,
449 nodes_scanned: 0,
450 edges_scanned: 0,
451 cached: false,
452 },
453 };
454
455 for (cluster_id, result) in results {
456 debug!("Merging results from cluster: {}", cluster_id);
457
458 for node in result.nodes {
460 if !merged.nodes.iter().any(|n| n.id == node.id) {
461 merged.nodes.push(node);
462 }
463 }
464
465 for edge in result.edges {
467 if !merged.edges.iter().any(|e| e.id == edge.id) {
468 merged.edges.push(edge);
469 }
470 }
471
472 for (key, value) in result.aggregates {
474 merged
475 .aggregates
476 .insert(format!("{}_{}", cluster_id, key), value);
477 }
478
479 merged.stats.execution_time_ms = merged
481 .stats
482 .execution_time_ms
483 .max(result.stats.execution_time_ms);
484 merged.stats.shards_queried += result.stats.shards_queried;
485 merged.stats.nodes_scanned += result.stats.nodes_scanned;
486 merged.stats.edges_scanned += result.stats.edges_scanned;
487 }
488
489 Ok(merged)
490 }
491
492 pub fn config(&self) -> &FederationConfig {
494 &self.config
495 }
496}
497
/// Execution settings for a `Federation`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FederationConfig {
    /// Strategy `Federation::execute_federated` uses for every query.
    pub default_strategy: FederationStrategy,
    /// Upper bound on the number of clusters (by name); defaults to 10.
    // NOTE(review): not enforced anywhere in this file.
    pub max_clusters: usize,
    /// Query timeout in seconds (by name); defaults to 30.
    // NOTE(review): not enforced anywhere in this file.
    pub query_timeout_seconds: u64,
    /// Caching switch; defaults to `true`.
    // NOTE(review): not read anywhere in this file.
    pub enable_caching: bool,
}
510
511impl Default for FederationConfig {
512 fn default() -> Self {
513 Self {
514 default_strategy: FederationStrategy::Parallel,
515 max_clusters: 10,
516 query_timeout_seconds: 30,
517 enable_caching: true,
518 }
519 }
520}
521
/// Outcome of `Federation::execute_federated`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FederatedQueryResult {
    /// Identifier generated for the federated query.
    pub query_id: String,
    /// Per-cluster results merged into a single result.
    pub merged_result: QueryResult,
    /// Number of clusters selected for the query.
    pub clusters_queried: usize,
    /// Wall-clock duration of the whole call, in milliseconds.
    pub execution_time_ms: u64,
}
534
#[cfg(test)]
mod tests {
    use super::*;

    /// Registering a cluster makes it visible through both lookup methods.
    #[test]
    fn test_cluster_registry() {
        let config = DiscoveryConfig::default();
        let registry = ClusterRegistry::new(config);

        let cluster = RemoteCluster::new(
            "cluster-1".to_string(),
            "Test Cluster".to_string(),
            "http://localhost:8080".to_string(),
        );

        registry.register_cluster(cluster).unwrap();

        assert_eq!(registry.list_clusters().len(), 1);
        assert!(registry.get_cluster(&"cluster-1".to_string()).is_some());
    }

    /// A federation exposes its registry, selects only healthy clusters by
    /// default, and queries explicitly targeted clusters regardless of health.
    #[tokio::test]
    async fn test_federation() {
        let config = FederationConfig::default();
        let federation = Federation::new(config);

        let cluster = RemoteCluster::new(
            "cluster-1".to_string(),
            "Test Cluster".to_string(),
            "http://localhost:8080".to_string(),
        );

        federation.registry().register_cluster(cluster).unwrap();
        assert_eq!(federation.registry().list_clusters().len(), 1);

        // A freshly registered cluster has status `Unknown`, so the default
        // (healthy-only) selection finds nothing to query.
        assert!(federation
            .execute_federated("MATCH (n) RETURN n", None)
            .await
            .is_err());

        // Explicit targets are looked up by id without a health filter.
        let result = federation
            .execute_federated("MATCH (n) RETURN n", Some(vec!["cluster-1".to_string()]))
            .await
            .unwrap();
        assert_eq!(result.clusters_queried, 1);
    }

    /// A new cluster starts with an unknown status and is therefore not healthy.
    #[test]
    fn test_remote_cluster() {
        let cluster = RemoteCluster::new(
            "test".to_string(),
            "Test".to_string(),
            "http://localhost".to_string(),
        );

        assert!(!cluster.is_healthy());
    }
}