goosefs_sdk/client/
worker_manager.rs1use std::sync::Arc;
12
13use tonic::service::interceptor::InterceptedService;
14use tonic::transport::Channel;
15use tracing::{debug, instrument};
16
17use crate::auth::{ChannelAuthenticator, ChannelIdInterceptor, SaslStreamGuard};
18use crate::client::master_inquire::{create_master_inquire_client, MasterInquireClient};
19use crate::config::GoosefsConfig;
20use crate::error::{Error, Result};
21use crate::proto::grpc::block::{
22 worker_manager_master_client_service_client::WorkerManagerMasterClientServiceClient,
23 GetWorkerInfoListPOptions, WorkerInfo,
24};
25
26type AuthenticatedWorkerMgrClient =
28 WorkerManagerMasterClientServiceClient<InterceptedService<Channel, ChannelIdInterceptor>>;
29
30#[derive(Clone)]
34pub struct WorkerManagerClient {
35 inner: AuthenticatedWorkerMgrClient,
36 _sasl_guard: Arc<Option<SaslStreamGuard>>,
38}
39
40impl WorkerManagerClient {
41 pub async fn connect(config: &GoosefsConfig) -> Result<Self> {
45 let inquire_client = create_master_inquire_client(config);
46 Self::connect_with_inquire(config, inquire_client).await
47 }
48
49 pub async fn connect_with_inquire(
54 config: &GoosefsConfig,
55 inquire_client: Arc<dyn MasterInquireClient>,
56 ) -> Result<Self> {
57 let primary_addr = inquire_client.get_primary_rpc_address().await?;
58 let endpoint_uri = format!("http://{}", primary_addr);
59
60 let endpoint = Channel::from_shared(endpoint_uri)
61 .map_err(|e| Error::ConfigError {
62 message: format!("invalid master endpoint: {}", e),
63 })?
64 .connect_timeout(config.connect_timeout)
65 .timeout(config.request_timeout);
66
67 let channel = endpoint.connect().await?;
68
69 let authenticator =
71 ChannelAuthenticator::new(config.auth_type, config.auth_username.clone(), None)
72 .with_auth_timeout(config.auth_timeout);
73
74 let mut auth_channel = authenticator.authenticate(channel).await?;
75 let sasl_guard = auth_channel.take_sasl_guard();
76 debug!(addr = %primary_addr, auth_type = %config.auth_type, "connected to WorkerManagerMasterClientService");
77
78 Ok(Self {
79 inner: WorkerManagerMasterClientServiceClient::new(auth_channel.channel),
80 _sasl_guard: Arc::new(sasl_guard),
81 })
82 }
83
84 pub fn from_channel(channel: Channel) -> Self {
88 let interceptor = ChannelIdInterceptor::new("test-no-auth".to_string());
89 let intercepted = InterceptedService::new(channel, interceptor);
90 Self {
91 inner: WorkerManagerMasterClientServiceClient::new(intercepted),
92 _sasl_guard: Arc::new(None),
93 }
94 }
95
96 #[instrument(skip(self))]
98 pub async fn get_worker_info_list(&self) -> Result<Vec<WorkerInfo>> {
99 let req = GetWorkerInfoListPOptions {};
100
101 let resp = self.inner.clone().get_worker_info_list(req).await?;
102
103 Ok(resp.into_inner().worker_infos)
104 }
105}