Skip to main content

goosefs_sdk/client/
worker_manager.rs

//! Goosefs Worker Manager client for worker discovery.
//!
//! Wraps `WorkerManagerMasterClientService` (Master:9200) to fetch
//! the list of live workers and their addresses.
//!
//! ## HA / Multi-Master Support
//!
//! When multiple Master addresses are configured, the client uses
//! [`MasterInquireClient`] to discover the Primary Master before connecting.
10
11use std::sync::Arc;
12
13use tonic::service::interceptor::InterceptedService;
14use tonic::transport::Channel;
15use tracing::{debug, instrument};
16
17use crate::auth::{ChannelAuthenticator, ChannelIdInterceptor, SaslStreamGuard};
18use crate::client::master_inquire::{create_master_inquire_client, MasterInquireClient};
19use crate::config::GoosefsConfig;
20use crate::error::{Error, Result};
21use crate::proto::grpc::block::{
22    worker_manager_master_client_service_client::WorkerManagerMasterClientServiceClient,
23    GetWorkerInfoListPOptions, WorkerInfo,
24};
25
/// Type alias for the authenticated WorkerManager gRPC client.
///
/// The underlying tonic `Channel` is wrapped in an `InterceptedService` with a
/// [`ChannelIdInterceptor`], so every request issued through this client passes
/// through that interceptor. Note: `WorkerManagerClient::from_channel` also uses
/// this type without performing authentication.
type AuthenticatedWorkerMgrClient =
    WorkerManagerMasterClientServiceClient<InterceptedService<Channel, ChannelIdInterceptor>>;
29
/// Client for `WorkerManagerMasterClientService` (Master:9200).
///
/// Used to discover the live worker list for block routing.
///
/// The client is `Clone`: clones share the same gRPC channel handle and the
/// same SASL guard (held behind an `Arc`), so the guard is only dropped once
/// the last clone is dropped.
#[derive(Clone)]
pub struct WorkerManagerClient {
    /// gRPC client whose requests all pass through `ChannelIdInterceptor`.
    inner: AuthenticatedWorkerMgrClient,
    /// Keeps the SASL authentication stream alive for the channel's lifetime.
    ///
    /// `None` when the client was built without authentication
    /// (`from_channel`). Rust drops struct fields in declaration order, so
    /// `inner` is released before this guard — keep `inner` declared first so
    /// the SASL stream never goes away while this client's channel handle is
    /// still alive.
    _sasl_guard: Arc<Option<SaslStreamGuard>>,
}
39
40impl WorkerManagerClient {
41    /// Connect to the Goosefs Master for worker management.
42    ///
43    /// In HA mode, discovers the Primary Master first via the inquire client.
44    pub async fn connect(config: &GoosefsConfig) -> Result<Self> {
45        let inquire_client = create_master_inquire_client(config);
46        Self::connect_with_inquire(config, inquire_client).await
47    }
48
49    /// Connect using an externally-provided [`MasterInquireClient`].
50    ///
51    /// This allows sharing the same inquire client with `MasterClient`,
52    /// avoiding redundant Primary discovery.
53    pub async fn connect_with_inquire(
54        config: &GoosefsConfig,
55        inquire_client: Arc<dyn MasterInquireClient>,
56    ) -> Result<Self> {
57        let primary_addr = inquire_client.get_primary_rpc_address().await?;
58        let endpoint_uri = format!("http://{}", primary_addr);
59
60        let endpoint = Channel::from_shared(endpoint_uri)
61            .map_err(|e| Error::ConfigError {
62                message: format!("invalid master endpoint: {}", e),
63            })?
64            .connect_timeout(config.connect_timeout)
65            .timeout(config.request_timeout);
66
67        let channel = endpoint.connect().await?;
68
69        // Perform SASL authentication based on the configured auth type
70        let authenticator =
71            ChannelAuthenticator::new(config.auth_type, config.auth_username.clone(), None)
72                .with_auth_timeout(config.auth_timeout);
73
74        let mut auth_channel = authenticator.authenticate(channel).await?;
75        let sasl_guard = auth_channel.take_sasl_guard();
76        debug!(addr = %primary_addr, auth_type = %config.auth_type, "connected to WorkerManagerMasterClientService");
77
78        Ok(Self {
79            inner: WorkerManagerMasterClientServiceClient::new(auth_channel.channel),
80            _sasl_guard: Arc::new(sasl_guard),
81        })
82    }
83
84    /// Create from an existing tonic channel.
85    ///
86    /// **Note**: This bypasses authentication.
87    pub fn from_channel(channel: Channel) -> Self {
88        let interceptor = ChannelIdInterceptor::new("test-no-auth".to_string());
89        let intercepted = InterceptedService::new(channel, interceptor);
90        Self {
91            inner: WorkerManagerMasterClientServiceClient::new(intercepted),
92            _sasl_guard: Arc::new(None),
93        }
94    }
95
96    /// Fetch the full list of workers from the Master.
97    #[instrument(skip(self))]
98    pub async fn get_worker_info_list(&self) -> Result<Vec<WorkerInfo>> {
99        let req = GetWorkerInfoListPOptions {};
100
101        let resp = self.inner.clone().get_worker_info_list(req).await?;
102
103        Ok(resp.into_inner().worker_infos)
104    }
105}