d_engine/client/
cluster.rs1use std::fmt::Debug;
2use std::sync::Arc;
3
4use arc_swap::ArcSwap;
5use tonic::codec::CompressionEncoding;
6use tracing::debug;
7use tracing::error;
8use tracing::instrument;
9
10use super::ClientInner;
11use crate::proto::cluster::cluster_management_service_client::ClusterManagementServiceClient;
12use crate::proto::cluster::JoinRequest;
13use crate::proto::cluster::JoinResponse;
14use crate::proto::cluster::NodeMeta;
15use crate::ClientApiError;
16use crate::Result;
17
18#[derive(Clone)]
23pub struct ClusterClient {
24 client_inner: Arc<ArcSwap<ClientInner>>,
25}
26
27impl Debug for ClusterClient {
28 fn fmt(
29 &self,
30 f: &mut std::fmt::Formatter<'_>,
31 ) -> std::fmt::Result {
32 f.debug_struct("ClusterClient").finish()
33 }
34}
35
36impl ClusterClient {
37 pub(crate) fn new(client_inner: Arc<ArcSwap<ClientInner>>) -> Self {
38 Self { client_inner }
39 }
40
41 pub async fn list_members(&self) -> Result<Vec<NodeMeta>> {
48 let client_inner = self.client_inner.load();
49
50 Ok(client_inner.pool.get_all_members())
51 }
52
53 #[instrument(skip(self))]
61 pub async fn join_cluster(
62 &self,
63 node: NodeMeta,
64 ) -> std::result::Result<JoinResponse, ClientApiError> {
65 let client_inner = self.client_inner.load();
66 let channel = client_inner.pool.get_leader();
67
68 let mut client = ClusterManagementServiceClient::new(channel);
69 if client_inner.pool.config.enable_compression {
70 client = client
71 .send_compressed(CompressionEncoding::Gzip)
72 .accept_compressed(CompressionEncoding::Gzip);
73 }
74
75 let request = tonic::Request::new(JoinRequest {
76 node_id: node.id,
77 node_role: node.role,
78 address: node.address,
79 });
80 let response = match client.join_cluster(request).await {
81 Ok(response) => {
82 debug!("[:ClusterClient:join_cluster] response: {:?}", response);
83 response.into_inner()
84 }
85 Err(status) => {
86 error!("[:ClusterClient:join_cluster] status: {:?}", status);
87 return Err(status.into());
88 }
89 };
90
91 Ok(response)
92 }
93}