ruvector_graph/distributed/
federation.rs

1//! Cross-cluster federation for distributed graph queries
2//!
3//! Enables querying across independent RuVector graph clusters:
4//! - Cluster discovery and registration
5//! - Remote query execution
6//! - Result merging from multiple clusters
7//! - Cross-cluster authentication and authorization
8
9use crate::distributed::coordinator::{QueryPlan, QueryResult};
10use crate::distributed::shard::ShardId;
11use crate::{GraphError, Result};
12use chrono::{DateTime, Utc};
13use dashmap::DashMap;
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::RwLock;
18use tracing::{debug, info, warn};
19use uuid::Uuid;
20
/// Identifier that uniquely names a cluster within the federation.
///
/// This is a plain alias for [`String`], so any owned string can serve as an ID.
pub type ClusterId = String;
23
24/// Remote cluster information
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct RemoteCluster {
27    /// Unique cluster ID
28    pub cluster_id: ClusterId,
29    /// Cluster name
30    pub name: String,
31    /// Cluster endpoint URL
32    pub endpoint: String,
33    /// Cluster status
34    pub status: ClusterStatus,
35    /// Authentication token
36    pub auth_token: Option<String>,
37    /// Last health check timestamp
38    pub last_health_check: DateTime<Utc>,
39    /// Cluster metadata
40    pub metadata: HashMap<String, String>,
41    /// Number of shards in this cluster
42    pub shard_count: u32,
43    /// Cluster region/datacenter
44    pub region: Option<String>,
45}
46
47impl RemoteCluster {
48    /// Create a new remote cluster
49    pub fn new(cluster_id: ClusterId, name: String, endpoint: String) -> Self {
50        Self {
51            cluster_id,
52            name,
53            endpoint,
54            status: ClusterStatus::Unknown,
55            auth_token: None,
56            last_health_check: Utc::now(),
57            metadata: HashMap::new(),
58            shard_count: 0,
59            region: None,
60        }
61    }
62
63    /// Check if cluster is healthy
64    pub fn is_healthy(&self) -> bool {
65        matches!(self.status, ClusterStatus::Healthy)
66    }
67}
68
69/// Cluster status
70#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
71pub enum ClusterStatus {
72    /// Cluster is healthy and available
73    Healthy,
74    /// Cluster is degraded but operational
75    Degraded,
76    /// Cluster is unreachable
77    Unreachable,
78    /// Cluster status unknown
79    Unknown,
80}
81
82/// Cluster registry for managing federated clusters
83pub struct ClusterRegistry {
84    /// Registered clusters
85    clusters: Arc<DashMap<ClusterId, RemoteCluster>>,
86    /// Cluster discovery configuration
87    discovery_config: DiscoveryConfig,
88}
89
90impl ClusterRegistry {
91    /// Create a new cluster registry
92    pub fn new(discovery_config: DiscoveryConfig) -> Self {
93        Self {
94            clusters: Arc::new(DashMap::new()),
95            discovery_config,
96        }
97    }
98
99    /// Register a remote cluster
100    pub fn register_cluster(&self, cluster: RemoteCluster) -> Result<()> {
101        info!(
102            "Registering cluster: {} ({})",
103            cluster.name, cluster.cluster_id
104        );
105        self.clusters.insert(cluster.cluster_id.clone(), cluster);
106        Ok(())
107    }
108
109    /// Unregister a cluster
110    pub fn unregister_cluster(&self, cluster_id: &ClusterId) -> Result<()> {
111        info!("Unregistering cluster: {}", cluster_id);
112        self.clusters.remove(cluster_id).ok_or_else(|| {
113            GraphError::FederationError(format!("Cluster not found: {}", cluster_id))
114        })?;
115        Ok(())
116    }
117
118    /// Get a cluster by ID
119    pub fn get_cluster(&self, cluster_id: &ClusterId) -> Option<RemoteCluster> {
120        self.clusters.get(cluster_id).map(|c| c.value().clone())
121    }
122
123    /// List all registered clusters
124    pub fn list_clusters(&self) -> Vec<RemoteCluster> {
125        self.clusters.iter().map(|e| e.value().clone()).collect()
126    }
127
128    /// List healthy clusters only
129    pub fn healthy_clusters(&self) -> Vec<RemoteCluster> {
130        self.clusters
131            .iter()
132            .filter(|e| e.value().is_healthy())
133            .map(|e| e.value().clone())
134            .collect()
135    }
136
137    /// Perform health check on a cluster
138    pub async fn health_check(&self, cluster_id: &ClusterId) -> Result<ClusterStatus> {
139        let cluster = self.get_cluster(cluster_id).ok_or_else(|| {
140            GraphError::FederationError(format!("Cluster not found: {}", cluster_id))
141        })?;
142
143        // In production, make actual HTTP/gRPC health check request
144        // For now, simulate health check
145        let status = ClusterStatus::Healthy;
146
147        // Update cluster status
148        if let Some(mut entry) = self.clusters.get_mut(cluster_id) {
149            entry.status = status;
150            entry.last_health_check = Utc::now();
151        }
152
153        debug!("Health check for cluster {}: {:?}", cluster_id, status);
154        Ok(status)
155    }
156
157    /// Perform health checks on all clusters
158    pub async fn health_check_all(&self) -> HashMap<ClusterId, ClusterStatus> {
159        let mut results = HashMap::new();
160
161        for cluster in self.list_clusters() {
162            match self.health_check(&cluster.cluster_id).await {
163                Ok(status) => {
164                    results.insert(cluster.cluster_id, status);
165                }
166                Err(e) => {
167                    warn!(
168                        "Health check failed for cluster {}: {}",
169                        cluster.cluster_id, e
170                    );
171                    results.insert(cluster.cluster_id, ClusterStatus::Unreachable);
172                }
173            }
174        }
175
176        results
177    }
178
179    /// Discover clusters automatically (if enabled)
180    pub async fn discover_clusters(&self) -> Result<Vec<RemoteCluster>> {
181        if !self.discovery_config.auto_discovery {
182            return Ok(Vec::new());
183        }
184
185        info!("Discovering clusters...");
186
187        // In production, implement actual cluster discovery:
188        // - mDNS/DNS-SD for local network
189        // - Consul/etcd for service discovery
190        // - Static configuration file
191        // - Cloud provider APIs (AWS, GCP, Azure)
192
193        // For now, return empty list
194        Ok(Vec::new())
195    }
196}
197
198/// Cluster discovery configuration
199#[derive(Debug, Clone, Serialize, Deserialize)]
200pub struct DiscoveryConfig {
201    /// Enable automatic cluster discovery
202    pub auto_discovery: bool,
203    /// Discovery method
204    pub discovery_method: DiscoveryMethod,
205    /// Discovery interval in seconds
206    pub discovery_interval_seconds: u64,
207    /// Health check interval in seconds
208    pub health_check_interval_seconds: u64,
209}
210
211impl Default for DiscoveryConfig {
212    fn default() -> Self {
213        Self {
214            auto_discovery: false,
215            discovery_method: DiscoveryMethod::Static,
216            discovery_interval_seconds: 60,
217            health_check_interval_seconds: 30,
218        }
219    }
220}
221
222/// Cluster discovery method
223#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
224pub enum DiscoveryMethod {
225    /// Static configuration
226    Static,
227    /// DNS-based discovery
228    Dns,
229    /// Consul service discovery
230    Consul,
231    /// etcd service discovery
232    Etcd,
233    /// Kubernetes service discovery
234    Kubernetes,
235}
236
237/// Federated query spanning multiple clusters
238#[derive(Debug, Clone, Serialize, Deserialize)]
239pub struct FederatedQuery {
240    /// Query ID
241    pub query_id: String,
242    /// Original query
243    pub query: String,
244    /// Target clusters
245    pub target_clusters: Vec<ClusterId>,
246    /// Query execution strategy
247    pub strategy: FederationStrategy,
248    /// Created timestamp
249    pub created_at: DateTime<Utc>,
250}
251
252/// Federation strategy
253#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
254pub enum FederationStrategy {
255    /// Execute on all clusters in parallel
256    Parallel,
257    /// Execute on clusters sequentially
258    Sequential,
259    /// Execute on primary cluster, fallback to others
260    PrimaryWithFallback,
261    /// Execute on nearest/fastest cluster only
262    Nearest,
263}
264
265/// Federation engine for cross-cluster queries
266pub struct Federation {
267    /// Cluster registry
268    registry: Arc<ClusterRegistry>,
269    /// Federation configuration
270    config: FederationConfig,
271    /// Active federated queries
272    active_queries: Arc<DashMap<String, FederatedQuery>>,
273}
274
275impl Federation {
276    /// Create a new federation engine
277    pub fn new(config: FederationConfig) -> Self {
278        let discovery_config = DiscoveryConfig::default();
279        Self {
280            registry: Arc::new(ClusterRegistry::new(discovery_config)),
281            config,
282            active_queries: Arc::new(DashMap::new()),
283        }
284    }
285
286    /// Get the cluster registry
287    pub fn registry(&self) -> Arc<ClusterRegistry> {
288        Arc::clone(&self.registry)
289    }
290
291    /// Execute a federated query across multiple clusters
292    pub async fn execute_federated(
293        &self,
294        query: &str,
295        target_clusters: Option<Vec<ClusterId>>,
296    ) -> Result<FederatedQueryResult> {
297        let query_id = Uuid::new_v4().to_string();
298        let start = std::time::Instant::now();
299
300        // Determine target clusters
301        let clusters = if let Some(targets) = target_clusters {
302            targets
303                .into_iter()
304                .filter_map(|id| self.registry.get_cluster(&id))
305                .collect()
306        } else {
307            self.registry.healthy_clusters()
308        };
309
310        if clusters.is_empty() {
311            return Err(GraphError::FederationError(
312                "No healthy clusters available".to_string(),
313            ));
314        }
315
316        info!(
317            "Executing federated query {} across {} clusters",
318            query_id,
319            clusters.len()
320        );
321
322        let federated_query = FederatedQuery {
323            query_id: query_id.clone(),
324            query: query.to_string(),
325            target_clusters: clusters.iter().map(|c| c.cluster_id.clone()).collect(),
326            strategy: self.config.default_strategy,
327            created_at: Utc::now(),
328        };
329
330        self.active_queries
331            .insert(query_id.clone(), federated_query.clone());
332
333        // Execute query on each cluster based on strategy
334        let mut cluster_results = HashMap::new();
335
336        match self.config.default_strategy {
337            FederationStrategy::Parallel => {
338                // Execute on all clusters in parallel
339                let mut handles = Vec::new();
340
341                for cluster in &clusters {
342                    let cluster_id = cluster.cluster_id.clone();
343                    let query_str = query.to_string();
344                    let cluster_clone = cluster.clone();
345
346                    let handle = tokio::spawn(async move {
347                        Self::execute_on_cluster(&cluster_clone, &query_str).await
348                    });
349
350                    handles.push((cluster_id, handle));
351                }
352
353                // Collect results
354                for (cluster_id, handle) in handles {
355                    match handle.await {
356                        Ok(Ok(result)) => {
357                            cluster_results.insert(cluster_id, result);
358                        }
359                        Ok(Err(e)) => {
360                            warn!("Query failed on cluster {}: {}", cluster_id, e);
361                        }
362                        Err(e) => {
363                            warn!("Task failed for cluster {}: {}", cluster_id, e);
364                        }
365                    }
366                }
367            }
368            FederationStrategy::Sequential => {
369                // Execute on clusters sequentially
370                for cluster in &clusters {
371                    match Self::execute_on_cluster(cluster, query).await {
372                        Ok(result) => {
373                            cluster_results.insert(cluster.cluster_id.clone(), result);
374                        }
375                        Err(e) => {
376                            warn!("Query failed on cluster {}: {}", cluster.cluster_id, e);
377                        }
378                    }
379                }
380            }
381            FederationStrategy::Nearest | FederationStrategy::PrimaryWithFallback => {
382                // Execute on first healthy cluster
383                if let Some(cluster) = clusters.first() {
384                    match Self::execute_on_cluster(cluster, query).await {
385                        Ok(result) => {
386                            cluster_results.insert(cluster.cluster_id.clone(), result);
387                        }
388                        Err(e) => {
389                            warn!("Query failed on cluster {}: {}", cluster.cluster_id, e);
390                        }
391                    }
392                }
393            }
394        }
395
396        // Merge results from all clusters
397        let merged_result = self.merge_results(cluster_results)?;
398
399        let execution_time_ms = start.elapsed().as_millis() as u64;
400
401        // Remove from active queries
402        self.active_queries.remove(&query_id);
403
404        Ok(FederatedQueryResult {
405            query_id,
406            merged_result,
407            clusters_queried: clusters.len(),
408            execution_time_ms,
409        })
410    }
411
412    /// Execute query on a single remote cluster
413    async fn execute_on_cluster(cluster: &RemoteCluster, query: &str) -> Result<QueryResult> {
414        debug!("Executing query on cluster: {}", cluster.cluster_id);
415
416        // In production, make actual HTTP/gRPC call to remote cluster
417        // For now, return empty result
418        Ok(QueryResult {
419            query_id: Uuid::new_v4().to_string(),
420            nodes: Vec::new(),
421            edges: Vec::new(),
422            aggregates: HashMap::new(),
423            stats: crate::distributed::coordinator::QueryStats {
424                execution_time_ms: 0,
425                shards_queried: 0,
426                nodes_scanned: 0,
427                edges_scanned: 0,
428                cached: false,
429            },
430        })
431    }
432
433    /// Merge results from multiple clusters
434    fn merge_results(&self, results: HashMap<ClusterId, QueryResult>) -> Result<QueryResult> {
435        if results.is_empty() {
436            return Err(GraphError::FederationError(
437                "No results to merge".to_string(),
438            ));
439        }
440
441        let mut merged = QueryResult {
442            query_id: Uuid::new_v4().to_string(),
443            nodes: Vec::new(),
444            edges: Vec::new(),
445            aggregates: HashMap::new(),
446            stats: crate::distributed::coordinator::QueryStats {
447                execution_time_ms: 0,
448                shards_queried: 0,
449                nodes_scanned: 0,
450                edges_scanned: 0,
451                cached: false,
452            },
453        };
454
455        for (cluster_id, result) in results {
456            debug!("Merging results from cluster: {}", cluster_id);
457
458            // Merge nodes (deduplicating by ID)
459            for node in result.nodes {
460                if !merged.nodes.iter().any(|n| n.id == node.id) {
461                    merged.nodes.push(node);
462                }
463            }
464
465            // Merge edges (deduplicating by ID)
466            for edge in result.edges {
467                if !merged.edges.iter().any(|e| e.id == edge.id) {
468                    merged.edges.push(edge);
469                }
470            }
471
472            // Merge aggregates
473            for (key, value) in result.aggregates {
474                merged
475                    .aggregates
476                    .insert(format!("{}_{}", cluster_id, key), value);
477            }
478
479            // Aggregate stats
480            merged.stats.execution_time_ms = merged
481                .stats
482                .execution_time_ms
483                .max(result.stats.execution_time_ms);
484            merged.stats.shards_queried += result.stats.shards_queried;
485            merged.stats.nodes_scanned += result.stats.nodes_scanned;
486            merged.stats.edges_scanned += result.stats.edges_scanned;
487        }
488
489        Ok(merged)
490    }
491
492    /// Get configuration
493    pub fn config(&self) -> &FederationConfig {
494        &self.config
495    }
496}
497
498/// Federation configuration
499#[derive(Debug, Clone, Serialize, Deserialize)]
500pub struct FederationConfig {
501    /// Default federation strategy
502    pub default_strategy: FederationStrategy,
503    /// Maximum number of clusters to query
504    pub max_clusters: usize,
505    /// Query timeout in seconds
506    pub query_timeout_seconds: u64,
507    /// Enable result caching
508    pub enable_caching: bool,
509}
510
511impl Default for FederationConfig {
512    fn default() -> Self {
513        Self {
514            default_strategy: FederationStrategy::Parallel,
515            max_clusters: 10,
516            query_timeout_seconds: 30,
517            enable_caching: true,
518        }
519    }
520}
521
522/// Federated query result
523#[derive(Debug, Clone, Serialize, Deserialize)]
524pub struct FederatedQueryResult {
525    /// Query ID
526    pub query_id: String,
527    /// Merged result from all clusters
528    pub merged_result: QueryResult,
529    /// Number of clusters queried
530    pub clusters_queried: usize,
531    /// Total execution time
532    pub execution_time_ms: u64,
533}
534
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a cluster record with the given ID and fixed name/endpoint.
    fn test_cluster(id: &str) -> RemoteCluster {
        RemoteCluster::new(
            id.to_string(),
            "Test Cluster".to_string(),
            "http://localhost:8080".to_string(),
        )
    }

    #[test]
    fn test_cluster_registry() {
        let config = DiscoveryConfig::default();
        let registry = ClusterRegistry::new(config);
        let id = "cluster-1".to_string();

        registry.register_cluster(test_cluster("cluster-1")).unwrap();

        assert_eq!(registry.list_clusters().len(), 1);
        assert!(registry.get_cluster(&id).is_some());
        // A freshly created cluster has status Unknown, so it is not healthy.
        assert!(registry.healthy_clusters().is_empty());

        // Unregistering removes the entry; doing it twice is an error.
        registry.unregister_cluster(&id).unwrap();
        assert!(registry.list_clusters().is_empty());
        assert!(registry.get_cluster(&id).is_none());
        assert!(registry.unregister_cluster(&id).is_err());
    }

    #[tokio::test]
    async fn test_health_check() {
        let registry = ClusterRegistry::new(DiscoveryConfig::default());
        let id = "cluster-1".to_string();
        registry.register_cluster(test_cluster("cluster-1")).unwrap();

        // The health check is currently simulated and always reports Healthy.
        let status = registry.health_check(&id).await.unwrap();
        assert_eq!(status, ClusterStatus::Healthy);
        assert_eq!(registry.healthy_clusters().len(), 1);

        // Checking an unknown cluster is an error.
        assert!(registry.health_check(&"missing".to_string()).await.is_err());

        let all = registry.health_check_all().await;
        assert_eq!(all.len(), 1);
        assert_eq!(all.get(&id), Some(&ClusterStatus::Healthy));
    }

    #[tokio::test]
    async fn test_federation() {
        let config = FederationConfig::default();
        let federation = Federation::new(config);

        federation
            .registry()
            .register_cluster(test_cluster("cluster-1"))
            .unwrap();

        // Without explicit targets only healthy clusters are used, and the
        // newly registered cluster still has status Unknown.
        assert!(federation
            .execute_federated("MATCH (n) RETURN n", None)
            .await
            .is_err());

        // With an explicit target the (stubbed) remote call succeeds and
        // yields an empty merged result.
        let result = federation
            .execute_federated("MATCH (n) RETURN n", Some(vec!["cluster-1".to_string()]))
            .await
            .unwrap();

        assert_eq!(result.clusters_queried, 1);
        assert!(result.merged_result.nodes.is_empty());
        assert!(result.merged_result.edges.is_empty());
        // A completed query must not remain registered as active.
        assert!(federation.active_queries.is_empty());
    }

    #[test]
    fn test_remote_cluster() {
        let cluster = RemoteCluster::new(
            "test".to_string(),
            "Test".to_string(),
            "http://localhost".to_string(),
        );

        assert_eq!(cluster.status, ClusterStatus::Unknown);
        assert!(!cluster.is_healthy());
    }
}