1use std::sync::Arc;
22
23use tokio::sync::RwLock;
24use tonic::service::interceptor::InterceptedService;
25use tonic::transport::Channel;
26use tracing::{debug, instrument, warn};
27
28use crate::auth::{ChannelAuthenticator, ChannelIdInterceptor, SaslStreamGuard};
29use crate::client::master_inquire::{create_master_inquire_client, MasterInquireClient};
30use crate::config::GoosefsConfig;
31use crate::error::{Error, Result};
32use crate::fs::options::DeleteOptions;
33use crate::proto::grpc::file::{
34 file_system_master_client_service_client::FileSystemMasterClientServiceClient,
35 CompleteFilePOptions, CompleteFilePRequest, CreateDirectoryPOptions, CreateDirectoryPRequest,
36 CreateFilePOptions, CreateFilePRequest, DeletePOptions, DeletePRequest, FileInfo,
37 FileSystemMasterCommonPOptions, FsOpPId, GetStatusPOptions, GetStatusPRequest,
38 ListStatusPOptions, ListStatusPRequest, RemoveBlocksPRequest, RenamePOptions, RenamePRequest,
39 ScheduleAsyncPersistencePOptions, ScheduleAsyncPersistencePRequest,
40};
41use crate::proto::grpc::{Bits, PMode};
42
/// Number of additional attempts `MasterClient` makes after a retriable RPC failure,
/// so each operation is sent at most `MAX_RPC_RETRIES + 1` times.
const MAX_RPC_RETRIES: u32 = 2;

/// File-system master gRPC client whose requests pass through [`ChannelIdInterceptor`].
type AuthenticatedFsClient =
    FileSystemMasterClientServiceClient<InterceptedService<Channel, ChannelIdInterceptor>>;
52
53pub fn default_dir_mode() -> PMode {
55 PMode {
56 owner_bits: Bits::All as i32, group_bits: Bits::ReadExecute as i32, other_bits: Bits::ReadExecute as i32, }
60}
61
62pub fn default_file_mode() -> PMode {
64 PMode {
65 owner_bits: Bits::ReadWrite as i32, group_bits: Bits::Read as i32, other_bits: Bits::Read as i32, }
69}
70
71#[derive(Clone)]
83pub struct MasterClient {
84 inner: Arc<RwLock<AuthenticatedFsClient>>,
85 config: GoosefsConfig,
86 inquire_client: Arc<dyn MasterInquireClient>,
87 _sasl_guard: Arc<RwLock<Option<SaslStreamGuard>>>,
90}
91
92impl MasterClient {
93 pub async fn connect(config: &GoosefsConfig) -> Result<Self> {
102 let inquire_client = create_master_inquire_client(config);
103 Self::connect_with_inquire(config, inquire_client).await
104 }
105
106 pub async fn connect_with_inquire(
111 config: &GoosefsConfig,
112 inquire_client: Arc<dyn MasterInquireClient>,
113 ) -> Result<Self> {
114 let primary_addr = inquire_client.get_primary_rpc_address().await?;
115 let (client, sasl_guard) = Self::build_authenticated_client(config, &primary_addr).await?;
116 debug!(addr = %primary_addr, auth_type = %config.auth_type, "connected to Goosefs Master");
117
118 Ok(Self {
119 inner: Arc::new(RwLock::new(client)),
120 config: config.clone(),
121 inquire_client,
122 _sasl_guard: Arc::new(RwLock::new(sasl_guard)),
123 })
124 }
125
126 pub fn from_channel(channel: Channel, config: GoosefsConfig) -> Self {
131 let inquire_client = create_master_inquire_client(&config);
132 let interceptor = ChannelIdInterceptor::new("test-no-auth".to_string());
133 let intercepted = InterceptedService::new(channel, interceptor);
134 Self {
135 inner: Arc::new(RwLock::new(FileSystemMasterClientServiceClient::new(
136 intercepted,
137 ))),
138 config,
139 inquire_client,
140 _sasl_guard: Arc::new(RwLock::new(None)),
141 }
142 }
143
144 async fn build_authenticated_client(
147 config: &GoosefsConfig,
148 addr: &str,
149 ) -> Result<(AuthenticatedFsClient, Option<SaslStreamGuard>)> {
150 let channel = Self::build_raw_channel(config, addr).await?;
151
152 let authenticator = ChannelAuthenticator::new(
154 config.auth_type,
155 config.auth_username.clone(),
156 None, )
158 .with_auth_timeout(config.auth_timeout);
159
160 let mut auth_channel = authenticator.authenticate(channel).await?;
161 let sasl_guard = auth_channel.take_sasl_guard();
162
163 Ok((
164 FileSystemMasterClientServiceClient::new(auth_channel.channel),
165 sasl_guard,
166 ))
167 }
168
169 async fn build_raw_channel(config: &GoosefsConfig, addr: &str) -> Result<Channel> {
171 let endpoint_uri = format!("http://{}", addr);
172 let endpoint = Channel::from_shared(endpoint_uri)
173 .map_err(|e| Error::ConfigError {
174 message: format!("invalid master endpoint: {}", e),
175 })?
176 .connect_timeout(config.connect_timeout)
177 .timeout(config.request_timeout);
178
179 let channel = endpoint.connect().await?;
180 Ok(channel)
181 }
182
183 async fn reconnect(&self) -> Result<()> {
188 self.inquire_client.reset_cached_primary().await;
190
191 let primary_addr = self.inquire_client.get_primary_rpc_address().await?;
192 let (client, sasl_guard) =
193 Self::build_authenticated_client(&self.config, &primary_addr).await?;
194 let mut inner = self.inner.write().await;
195 *inner = client;
196 let mut guard = self._sasl_guard.write().await;
198 *guard = sasl_guard;
199 debug!(addr = %primary_addr, "reconnected to Goosefs Master after failover");
200 Ok(())
201 }
202
203 async fn with_retry<F, Fut, T>(&self, op_name: &str, f: F) -> Result<T>
208 where
209 F: Fn(AuthenticatedFsClient) -> Fut,
210 Fut: std::future::Future<Output = Result<T>>,
211 {
212 let mut last_err: Option<Error> = None;
213
214 for attempt in 0..=MAX_RPC_RETRIES {
215 let client: AuthenticatedFsClient = {
216 let inner = self.inner.read().await;
217 inner.clone()
218 };
219
220 match f(client).await {
221 Ok(result) => return Ok(result),
222 Err(err) => {
223 if err.is_retriable() && attempt < MAX_RPC_RETRIES {
224 warn!(
225 op = op_name,
226 attempt = attempt + 1,
227 max = MAX_RPC_RETRIES,
228 error = %err,
229 "retriable error, reconnecting and retrying"
230 );
231 if let Err(reconnect_err) = self.reconnect().await {
232 warn!(error = %reconnect_err, "reconnect failed");
233 last_err = Some(err);
234 continue;
235 }
236 } else {
237 return Err(err);
238 }
239 last_err = Some(err);
240 }
241 }
242 }
243
244 Err(last_err.unwrap_or_else(|| Error::Internal {
245 message: format!("{}: exhausted all retries", op_name),
246 source: None,
247 }))
248 }
249
250 #[instrument(skip(self), fields(path = %path))]
252 pub async fn get_status(&self, path: &str) -> Result<FileInfo> {
253 let path = path.to_string();
254 self.with_retry("get_status", |mut client| {
255 let path = path.clone();
256 async move {
257 let req = GetStatusPRequest {
258 path: Some(path),
259 options: Some(GetStatusPOptions::default()),
260 request_id: None,
261 };
262 let resp = client.get_status(req).await?;
263 resp.into_inner()
264 .file_info
265 .ok_or_else(|| Error::missing_field("file_info"))
266 }
267 })
268 .await
269 }
270
271 #[instrument(skip(self), fields(path = %path))]
277 pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileInfo>> {
278 let path = path.to_string();
279 self.with_retry("list_status", |mut client| {
280 let path = path.clone();
281 async move {
282 let req = ListStatusPRequest {
283 path: Some(path),
284 options: Some(ListStatusPOptions {
285 recursive: Some(recursive),
286 ..Default::default()
287 }),
288 request_id: None,
289 };
290 let mut stream = client.list_status(req).await?.into_inner();
291 let mut result = Vec::new();
292 while let Some(resp) = stream.message().await? {
293 result.extend(resp.file_infos);
294 }
295 Ok(result)
296 }
297 })
298 .await
299 }
300
301 #[instrument(skip(self, options), fields(path = %path))]
303 pub async fn create_file(&self, path: &str, options: CreateFilePOptions) -> Result<FileInfo> {
304 let path = path.to_string();
305 self.with_retry("create_file", |mut client| {
306 let path = path.clone();
307 async move {
308 let req = CreateFilePRequest {
309 path: Some(path),
310 options: Some(options),
311 };
312 let resp = client.create_file(req).await?;
313 resp.into_inner()
314 .file_info
315 .ok_or_else(|| Error::missing_field("file_info"))
316 }
317 })
318 .await
319 }
320
321 #[instrument(skip(self), fields(path = %path))]
349 pub async fn complete_file(
350 &self,
351 path: &str,
352 ufs_length: Option<i64>,
353 operation_id: Option<FsOpPId>,
354 ) -> Result<()> {
355 let path = path.to_string();
356 self.with_retry("complete_file", |mut client| {
357 let path = path.clone();
358 async move {
359 let common_options = operation_id.map(|op_id| FileSystemMasterCommonPOptions {
360 operation_id: Some(op_id),
361 ..Default::default()
362 });
363 let req = CompleteFilePRequest {
364 path: Some(path),
365 options: Some(CompleteFilePOptions {
366 ufs_length,
367 common_options,
368 ..Default::default()
369 }),
370 inode_id: None,
371 };
372 client.complete_file(req).await?;
373 Ok(())
374 }
375 })
376 .await
377 }
378
379 #[instrument(skip(self, block_ids), fields(block_count = block_ids.len()))]
396 pub async fn remove_blocks(&self, block_ids: Vec<i64>) -> Result<()> {
397 if block_ids.is_empty() {
398 return Ok(());
399 }
400 let block_ids_clone = block_ids.clone();
401 self.with_retry("remove_blocks", |mut client| {
402 let block_ids = block_ids_clone.clone();
403 async move {
404 let req = RemoveBlocksPRequest { block_ids };
405 client.remove_blocks(req).await?;
406 Ok(())
407 }
408 })
409 .await
410 }
411
412 #[instrument(skip(self, opts), fields(path = %path))]
423 pub async fn delete_with_options(&self, path: &str, opts: DeleteOptions) -> Result<()> {
424 let path = path.to_string();
425 self.with_retry("delete_with_options", |mut client| {
426 let path = path.clone();
427 let opts = opts.clone();
428 async move {
429 let req = DeletePRequest {
430 path: Some(path),
431 options: Some(DeletePOptions {
432 recursive: Some(opts.recursive),
433 unchecked: Some(opts.unchecked),
434 goosefs_only: Some(opts.goosefs_only),
435 ..Default::default()
436 }),
437 };
438 client.remove(req).await?;
439 Ok(())
440 }
441 })
442 .await
443 }
444
445 #[instrument(skip(self), fields(path = %path, recursive = %recursive))]
450 pub async fn delete(&self, path: &str, recursive: bool) -> Result<()> {
451 self.delete_with_options(
452 path,
453 DeleteOptions {
454 recursive,
455 ..Default::default()
456 },
457 )
458 .await
459 }
460
461 #[instrument(skip(self), fields(src = %src, dst = %dst))]
463 pub async fn rename(&self, src: &str, dst: &str) -> Result<()> {
464 let src = src.to_string();
465 let dst = dst.to_string();
466 self.with_retry("rename", |mut client| {
467 let src = src.clone();
468 let dst = dst.clone();
469 async move {
470 let req = RenamePRequest {
471 path: Some(src),
472 dst_path: Some(dst),
473 options: Some(RenamePOptions::default()),
474 };
475 client.rename(req).await?;
476 Ok(())
477 }
478 })
479 .await
480 }
481
482 #[instrument(skip(self), fields(path = %path))]
487 pub async fn create_directory(&self, path: &str, recursive: bool) -> Result<()> {
488 let path = path.to_string();
489 self.with_retry("create_directory", |mut client| {
490 let path = path.clone();
491 async move {
492 let req = CreateDirectoryPRequest {
493 path: Some(path),
494 options: Some(CreateDirectoryPOptions {
495 recursive: Some(recursive),
496 allow_exists: Some(true),
497 mode: Some(default_dir_mode()),
498 ..Default::default()
499 }),
500 };
501 client.create_directory(req).await?;
502 Ok(())
503 }
504 })
505 .await
506 }
507
508 #[instrument(skip(self), fields(path = %path))]
511 pub async fn schedule_async_persistence(
512 &self,
513 path: &str,
514 persistence_wait_time: Option<i64>,
515 ) -> Result<()> {
516 let path = path.to_string();
517 self.with_retry("schedule_async_persistence", |mut client| {
518 let path = path.clone();
519 async move {
520 let req = ScheduleAsyncPersistencePRequest {
521 path: Some(path),
522 options: Some(ScheduleAsyncPersistencePOptions {
523 common_options: None,
524 persistence_wait_time,
525 }),
526 };
527 client.schedule_async_persistence(req).await?;
528 Ok(())
529 }
530 })
531 .await
532 }
533
534 pub fn config(&self) -> &GoosefsConfig {
536 &self.config
537 }
538
539 pub fn inquire_client(&self) -> &Arc<dyn MasterInquireClient> {
543 &self.inquire_client
544 }
545}