d_engine/client/
cluster.rs

1use std::fmt::Debug;
2use std::sync::Arc;
3
4use arc_swap::ArcSwap;
5use tonic::codec::CompressionEncoding;
6use tracing::debug;
7use tracing::error;
8use tracing::instrument;
9
10use super::ClientInner;
11use crate::proto::cluster::cluster_management_service_client::ClusterManagementServiceClient;
12use crate::proto::cluster::JoinRequest;
13use crate::proto::cluster::JoinResponse;
14use crate::proto::cluster::NodeMeta;
15use crate::ClientApiError;
16use crate::Result;
17
18/// Cluster administration interface
19///
20/// Currently supports member discovery. Node management operations
21/// will be added in future releases.
22#[derive(Clone)]
23pub struct ClusterClient {
24    client_inner: Arc<ArcSwap<ClientInner>>,
25}
26
27impl Debug for ClusterClient {
28    fn fmt(
29        &self,
30        f: &mut std::fmt::Formatter<'_>,
31    ) -> std::fmt::Result {
32        f.debug_struct("ClusterClient").finish()
33    }
34}
35
36impl ClusterClient {
37    pub(crate) fn new(client_inner: Arc<ArcSwap<ClientInner>>) -> Self {
38        Self { client_inner }
39    }
40
41    /// Lists all cluster members with metadata
42    ///
43    /// Returns node information including:
44    /// - IP address
45    /// - Port
46    /// - Role (Leader/Follower)
47    pub async fn list_members(&self) -> Result<Vec<NodeMeta>> {
48        let client_inner = self.client_inner.load();
49
50        Ok(client_inner.pool.get_all_members())
51    }
52
53    /// Join a new node to the cluster
54    ///
55    /// # Parameters
56    /// - `node`: NodeMeta
57    ///
58    /// # Returns
59    /// - `JoinResponse` with cluster configuration if successful
60    #[instrument(skip(self))]
61    pub async fn join_cluster(
62        &self,
63        node: NodeMeta,
64    ) -> std::result::Result<JoinResponse, ClientApiError> {
65        let client_inner = self.client_inner.load();
66        let channel = client_inner.pool.get_leader();
67
68        let mut client = ClusterManagementServiceClient::new(channel);
69        if client_inner.pool.config.enable_compression {
70            client = client
71                .send_compressed(CompressionEncoding::Gzip)
72                .accept_compressed(CompressionEncoding::Gzip);
73        }
74
75        let request = tonic::Request::new(JoinRequest {
76            node_id: node.id,
77            node_role: node.role,
78            address: node.address,
79        });
80        let response = match client.join_cluster(request).await {
81            Ok(response) => {
82                debug!("[:ClusterClient:join_cluster] response: {:?}", response);
83                response.into_inner()
84            }
85            Err(status) => {
86                error!("[:ClusterClient:join_cluster] status: {:?}", status);
87                return Err(status.into());
88            }
89        };
90
91        Ok(response)
92    }
93}