ruvector_graph/distributed/
rpc.rs

//! gRPC-based inter-node communication for distributed graph queries
//!
//! Provides a high-performance RPC communication layer:
//! - Query execution RPC
//! - Data replication RPC
//! - Cluster coordination RPC
//! - Streaming results for large queries
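//!
//! A minimal usage sketch (assuming this module is exposed as
//! `ruvector_graph::distributed::rpc`, and noting that the client methods below
//! currently simulate responses rather than issuing real gRPC calls):
//!
//! ```ignore
//! use ruvector_graph::distributed::rpc::{ExecuteQueryRequest, RpcConnectionPool};
//!
//! #[tokio::main]
//! async fn main() -> ruvector_graph::Result<()> {
//!     // One pooled client per remote node, keyed by node ID.
//!     let pool = RpcConnectionPool::new();
//!     let client = pool.get_client("node-1", "localhost:9000");
//!
//!     // Send a Cypher query to the remote node.
//!     let response = client
//!         .execute_query(ExecuteQueryRequest {
//!             query: "MATCH (n) RETURN n".to_string(),
//!             parameters: std::collections::HashMap::new(),
//!             transaction_id: None,
//!         })
//!         .await?;
//!     assert!(response.success);
//!     Ok(())
//! }
//! ```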

use crate::distributed::coordinator::QueryResult;
use crate::distributed::shard::{EdgeData, NodeData, NodeId, ShardId};
use crate::Result;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[cfg(feature = "federation")]
use tokio::sync::RwLock;
#[cfg(feature = "federation")]
use tonic::Status;
use tracing::{debug, info};

/// Minimal stand-in for `tonic::Status` when the `federation` feature is
/// disabled, so RPC signatures keep compiling without the gRPC dependency.
#[cfg(not(feature = "federation"))]
pub struct Status;
21
22/// RPC request for executing a query
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct ExecuteQueryRequest {
25    /// Query to execute (Cypher syntax)
26    pub query: String,
27    /// Optional parameters
28    pub parameters: std::collections::HashMap<String, serde_json::Value>,
29    /// Transaction ID (if part of a transaction)
30    pub transaction_id: Option<String>,
31}
32
33/// RPC response for query execution
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct ExecuteQueryResponse {
36    /// Query result
37    pub result: QueryResult,
38    /// Success indicator
39    pub success: bool,
40    /// Error message if failed
41    pub error: Option<String>,
42}
43
44/// RPC request for replicating data
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct ReplicateDataRequest {
47    /// Shard ID to replicate to
48    pub shard_id: ShardId,
49    /// Operation type
50    pub operation: ReplicationOperation,
51}
52
53/// Replication operation types
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub enum ReplicationOperation {
56    AddNode(NodeData),
57    AddEdge(EdgeData),
58    DeleteNode(NodeId),
59    DeleteEdge(String),
60    UpdateNode(NodeData),
61    UpdateEdge(EdgeData),
62}
63
64/// RPC response for replication
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ReplicateDataResponse {
67    /// Success indicator
68    pub success: bool,
69    /// Error message if failed
70    pub error: Option<String>,
71}
72
73/// RPC request for health check
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct HealthCheckRequest {
76    /// Node ID performing the check
77    pub node_id: String,
78}
79
80/// RPC response for health check
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct HealthCheckResponse {
83    /// Node is healthy
84    pub healthy: bool,
85    /// Current load (0.0 - 1.0)
86    pub load: f64,
87    /// Number of active queries
88    pub active_queries: usize,
89    /// Uptime in seconds
90    pub uptime_seconds: u64,
91}
92
93/// RPC request for shard info
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct GetShardInfoRequest {
96    /// Shard ID
97    pub shard_id: ShardId,
98}
99
100/// RPC response for shard info
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct GetShardInfoResponse {
103    /// Shard ID
104    pub shard_id: ShardId,
105    /// Number of nodes
106    pub node_count: usize,
107    /// Number of edges
108    pub edge_count: usize,
109    /// Shard size in bytes
110    pub size_bytes: u64,
111}

/// Graph RPC service trait (would be implemented via tonic in production)
#[cfg(feature = "federation")]
#[tonic::async_trait]
pub trait GraphRpcService: Send + Sync {
    /// Execute a query on this node
    async fn execute_query(
        &self,
        request: ExecuteQueryRequest,
    ) -> std::result::Result<ExecuteQueryResponse, Status>;

    /// Replicate data to this node
    async fn replicate_data(
        &self,
        request: ReplicateDataRequest,
    ) -> std::result::Result<ReplicateDataResponse, Status>;

    /// Health check
    async fn health_check(
        &self,
        request: HealthCheckRequest,
    ) -> std::result::Result<HealthCheckResponse, Status>;

    /// Get shard information
    async fn get_shard_info(
        &self,
        request: GetShardInfoRequest,
    ) -> std::result::Result<GetShardInfoResponse, Status>;
}

/// RPC client for communicating with remote nodes
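///
/// A minimal sketch of how a coordinator might use this client (the methods
/// below currently simulate responses instead of making real gRPC calls, so
/// the example is illustrative only):
///
/// ```ignore
/// use ruvector_graph::distributed::rpc::{ExecuteQueryRequest, RpcClient};
///
/// # async fn demo() -> ruvector_graph::Result<()> {
/// // Point the client at a remote node and tighten the timeout.
/// let client = RpcClient::new("localhost:9000".to_string()).with_timeout(10);
///
/// let response = client
///     .execute_query(ExecuteQueryRequest {
///         query: "MATCH (n:Person) RETURN n LIMIT 10".to_string(),
///         parameters: std::collections::HashMap::new(),
///         transaction_id: None,
///     })
///     .await?;
/// println!("query {} succeeded: {}", response.result.query_id, response.success);
/// # Ok(())
/// # }
/// ```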
pub struct RpcClient {
    /// Target node address
    target_address: String,
    /// Connection timeout in seconds (not yet wired into the simulated calls)
    timeout_seconds: u64,
}

impl RpcClient {
    /// Create a new RPC client
    pub fn new(target_address: String) -> Self {
        Self {
            target_address,
            timeout_seconds: 30,
        }
    }

    /// Set connection timeout
    pub fn with_timeout(mut self, timeout_seconds: u64) -> Self {
        self.timeout_seconds = timeout_seconds;
        self
    }

    /// Execute a query on the remote node
    pub async fn execute_query(
        &self,
        request: ExecuteQueryRequest,
    ) -> Result<ExecuteQueryResponse> {
        debug!(
            "Executing remote query on {}: {}",
            self.target_address, request.query
        );

        // In production, make the actual gRPC call using tonic.
        // For now, simulate an empty successful response.
        Ok(ExecuteQueryResponse {
            result: QueryResult {
                query_id: uuid::Uuid::new_v4().to_string(),
                nodes: Vec::new(),
                edges: Vec::new(),
                aggregates: std::collections::HashMap::new(),
                stats: crate::distributed::coordinator::QueryStats {
                    execution_time_ms: 0,
                    shards_queried: 0,
                    nodes_scanned: 0,
                    edges_scanned: 0,
                    cached: false,
                },
            },
            success: true,
            error: None,
        })
    }

    /// Replicate data to the remote node
    pub async fn replicate_data(
        &self,
        request: ReplicateDataRequest,
    ) -> Result<ReplicateDataResponse> {
        debug!(
            "Replicating data to {} for shard {}",
            self.target_address, request.shard_id
        );

        // In production, make the actual gRPC call
        Ok(ReplicateDataResponse {
            success: true,
            error: None,
        })
    }

    /// Perform health check on remote node
    pub async fn health_check(&self, _node_id: String) -> Result<HealthCheckResponse> {
        debug!("Health check on {}", self.target_address);

        // In production, make the actual gRPC call
        Ok(HealthCheckResponse {
            healthy: true,
            load: 0.5,
            active_queries: 0,
            uptime_seconds: 3600,
        })
    }

    /// Get shard information from remote node
    pub async fn get_shard_info(&self, shard_id: ShardId) -> Result<GetShardInfoResponse> {
        debug!(
            "Getting shard info for {} from {}",
            shard_id, self.target_address
        );

        // In production, make the actual gRPC call
        Ok(GetShardInfoResponse {
            shard_id,
            node_count: 0,
            edge_count: 0,
            size_bytes: 0,
        })
    }
}

/// RPC server for handling incoming requests
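///
/// A minimal wiring sketch, assuming the `federation` feature is enabled (with
/// the feature off, `RpcServer::new` takes only the bind address) and keeping
/// in mind that `start` currently only logs rather than binding a real tonic
/// server:
///
/// ```ignore
/// use std::sync::Arc;
/// use ruvector_graph::distributed::rpc::{DefaultGraphRpcService, RpcServer};
///
/// # async fn demo() -> ruvector_graph::Result<()> {
/// let service = Arc::new(DefaultGraphRpcService::new("node-1".to_string()));
/// let server = RpcServer::new("0.0.0.0:9000".to_string(), service);
///
/// server.start().await?;
/// // ... serve queries ...
/// server.stop().await?;
/// # Ok(())
/// # }
/// ```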
#[cfg(feature = "federation")]
pub struct RpcServer {
    /// Server address to bind to
    bind_address: String,
    /// Service implementation
    service: Arc<dyn GraphRpcService>,
}

#[cfg(not(feature = "federation"))]
pub struct RpcServer {
    /// Server address to bind to
    bind_address: String,
}

#[cfg(feature = "federation")]
impl RpcServer {
    /// Create a new RPC server
    pub fn new(bind_address: String, service: Arc<dyn GraphRpcService>) -> Self {
        Self {
            bind_address,
            service,
        }
    }

    /// Start the RPC server
    pub async fn start(&self) -> Result<()> {
        info!("Starting RPC server on {}", self.bind_address);

        // In production, start actual gRPC server using tonic
        // For now, just log
        debug!("RPC server would start on {}", self.bind_address);

        Ok(())
    }

    /// Stop the RPC server
    pub async fn stop(&self) -> Result<()> {
        info!("Stopping RPC server");
        Ok(())
    }
}

#[cfg(not(feature = "federation"))]
impl RpcServer {
    /// Create a new RPC server
    pub fn new(bind_address: String) -> Self {
        Self { bind_address }
    }

    /// Start the RPC server
    pub async fn start(&self) -> Result<()> {
        info!("Starting RPC server on {}", self.bind_address);

        // In production, start actual gRPC server using tonic
        // For now, just log
        debug!("RPC server would start on {}", self.bind_address);

        Ok(())
    }

    /// Stop the RPC server
    pub async fn stop(&self) -> Result<()> {
        info!("Stopping RPC server");
        Ok(())
    }
}

/// Default implementation of GraphRpcService
#[cfg(feature = "federation")]
pub struct DefaultGraphRpcService {
    /// Node ID
    node_id: String,
    /// Start time for uptime calculation
    start_time: std::time::Instant,
    /// Active queries counter
    active_queries: Arc<RwLock<usize>>,
}

#[cfg(feature = "federation")]
impl DefaultGraphRpcService {
    /// Create a new default service
    pub fn new(node_id: String) -> Self {
        Self {
            node_id,
            start_time: std::time::Instant::now(),
            active_queries: Arc::new(RwLock::new(0)),
        }
    }
}

#[cfg(feature = "federation")]
#[tonic::async_trait]
impl GraphRpcService for DefaultGraphRpcService {
    async fn execute_query(
        &self,
        request: ExecuteQueryRequest,
    ) -> std::result::Result<ExecuteQueryResponse, Status> {
        // Increment active queries
        {
            let mut count = self.active_queries.write().await;
            *count += 1;
        }

        debug!("Executing query: {}", request.query);

        // In production, execute actual query
        let result = QueryResult {
            query_id: uuid::Uuid::new_v4().to_string(),
            nodes: Vec::new(),
            edges: Vec::new(),
            aggregates: std::collections::HashMap::new(),
            stats: crate::distributed::coordinator::QueryStats {
                execution_time_ms: 0,
                shards_queried: 0,
                nodes_scanned: 0,
                edges_scanned: 0,
                cached: false,
            },
        };

        // Decrement active queries
        {
            let mut count = self.active_queries.write().await;
            *count -= 1;
        }

        Ok(ExecuteQueryResponse {
            result,
            success: true,
            error: None,
        })
    }

    async fn replicate_data(
        &self,
        request: ReplicateDataRequest,
    ) -> std::result::Result<ReplicateDataResponse, Status> {
        debug!("Replicating data for shard {}", request.shard_id);

        // In production, perform actual replication
        Ok(ReplicateDataResponse {
            success: true,
            error: None,
        })
    }

    async fn health_check(
        &self,
        _request: HealthCheckRequest,
    ) -> std::result::Result<HealthCheckResponse, Status> {
        let uptime = self.start_time.elapsed().as_secs();
        let active = *self.active_queries.read().await;

        Ok(HealthCheckResponse {
            healthy: true,
            load: 0.5, // Would calculate actual load
            active_queries: active,
            uptime_seconds: uptime,
        })
    }

    async fn get_shard_info(
        &self,
        request: GetShardInfoRequest,
    ) -> std::result::Result<GetShardInfoResponse, Status> {
        // In production, get actual shard info
        Ok(GetShardInfoResponse {
            shard_id: request.shard_id,
            node_count: 0,
            edge_count: 0,
            size_bytes: 0,
        })
    }
}

/// RPC connection pool for managing connections to multiple nodes
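///
/// A minimal sketch of reusing pooled clients across calls (the pool simply
/// caches one `RpcClient` per node ID; the addresses shown are placeholders):
///
/// ```ignore
/// use ruvector_graph::distributed::rpc::RpcConnectionPool;
///
/// # async fn demo() -> ruvector_graph::Result<()> {
/// let pool = RpcConnectionPool::new();
///
/// // Repeated lookups for the same node ID return the same cached client.
/// let client = pool.get_client("node-1", "10.0.0.1:9000");
/// let health = client.health_check("coordinator".to_string()).await?;
/// assert!(health.healthy);
///
/// // Drop the cached client when a node leaves the cluster.
/// pool.remove_client("node-1");
/// assert_eq!(pool.connection_count(), 0);
/// # Ok(())
/// # }
/// ```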
pub struct RpcConnectionPool {
    /// Map of node_id to RPC client
    clients: Arc<dashmap::DashMap<String, Arc<RpcClient>>>,
}

impl RpcConnectionPool {
    /// Create a new connection pool
    pub fn new() -> Self {
        Self {
            clients: Arc::new(dashmap::DashMap::new()),
        }
    }

    /// Get or create a client for a node
    pub fn get_client(&self, node_id: &str, address: &str) -> Arc<RpcClient> {
        self.clients
            .entry(node_id.to_string())
            .or_insert_with(|| Arc::new(RpcClient::new(address.to_string())))
            .clone()
    }

    /// Remove a client from the pool
    pub fn remove_client(&self, node_id: &str) {
        self.clients.remove(node_id);
    }

    /// Get number of active connections
    pub fn connection_count(&self) -> usize {
        self.clients.len()
    }
}

impl Default for RpcConnectionPool {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_rpc_client() {
        let client = RpcClient::new("localhost:9000".to_string());

        let request = ExecuteQueryRequest {
            query: "MATCH (n) RETURN n".to_string(),
            parameters: std::collections::HashMap::new(),
            transaction_id: None,
        };

        let response = client.execute_query(request).await.unwrap();
        assert!(response.success);
    }

    #[cfg(feature = "federation")]
    #[tokio::test]
    async fn test_default_service() {
        let service = DefaultGraphRpcService::new("test-node".to_string());

        let request = ExecuteQueryRequest {
            query: "MATCH (n) RETURN n".to_string(),
            parameters: std::collections::HashMap::new(),
            transaction_id: None,
        };

        let response = service.execute_query(request).await.unwrap();
        assert!(response.success);
    }

    #[tokio::test]
    async fn test_connection_pool() {
        let pool = RpcConnectionPool::new();

        let _client1 = pool.get_client("node-1", "localhost:9000");
        let _client2 = pool.get_client("node-2", "localhost:9001");

        assert_eq!(pool.connection_count(), 2);

        pool.remove_client("node-1");
        assert_eq!(pool.connection_count(), 1);
    }

    #[cfg(feature = "federation")]
    #[tokio::test]
    async fn test_health_check() {
        let service = DefaultGraphRpcService::new("test-node".to_string());

        let request = HealthCheckRequest {
            node_id: "test".to_string(),
        };

        let response = service.health_check(request).await.unwrap();
        assert!(response.healthy);
        assert_eq!(response.active_queries, 0);
    }
}