// goosefs_sdk/client/master.rs

//! Goosefs Master gRPC client for file system metadata operations.
//!
//! Wraps `FileSystemMasterClientService` (Master:9200) providing:
//! - `get_status` — stat / head
//! - `list_status` — list directory (server-side streaming)
//! - `create_file` — create a new file
//! - `complete_file` — mark file write complete (with idempotency operation-ID)
//! - `remove_blocks` — clean up block metadata for in-flight or failed writes
//! - `delete` / `delete_with_options` — delete file or directory
//! - `rename` — rename / move
//! - `create_directory` — mkdir (optionally creating parents, like `mkdir -p`)
//! - `schedule_async_persistence` — schedule asynchronous persistence of a file
//!
//! ## HA / Multi-Master Support
//!
//! When multiple Master addresses are configured, [`MasterClient::connect`]
//! uses [`MasterInquireClient`] to discover the Primary Master before
//! establishing the gRPC channel. If an RPC fails with a retriable error
//! (`Unavailable`, `DeadlineExceeded`), the client will re-discover the
//! Primary and rebuild the channel automatically.

21use std::sync::Arc;
22
23use tokio::sync::RwLock;
24use tonic::service::interceptor::InterceptedService;
25use tonic::transport::Channel;
26use tracing::{debug, instrument, warn};
27
28use crate::auth::{ChannelAuthenticator, ChannelIdInterceptor, SaslStreamGuard};
29use crate::client::master_inquire::{create_master_inquire_client, MasterInquireClient};
30use crate::config::GoosefsConfig;
31use crate::error::{Error, Result};
32use crate::fs::options::DeleteOptions;
33use crate::proto::grpc::file::{
34    file_system_master_client_service_client::FileSystemMasterClientServiceClient,
35    CompleteFilePOptions, CompleteFilePRequest, CreateDirectoryPOptions, CreateDirectoryPRequest,
36    CreateFilePOptions, CreateFilePRequest, DeletePOptions, DeletePRequest, FileInfo,
37    FileSystemMasterCommonPOptions, FsOpPId, GetStatusPOptions, GetStatusPRequest,
38    ListStatusPOptions, ListStatusPRequest, RemoveBlocksPRequest, RenamePOptions, RenamePRequest,
39    ScheduleAsyncPersistencePOptions, ScheduleAsyncPersistencePRequest,
40};
41use crate::proto::grpc::{Bits, PMode};
42
/// Maximum number of RPC-level retries on retriable errors before giving up.
///
/// This counts *retries*, not attempts: the retry loop in
/// `MasterClient::with_retry` runs the RPC at most `MAX_RPC_RETRIES + 1`
/// times (the initial attempt plus this many retries).
const MAX_RPC_RETRIES: u32 = 2;
45
/// Type alias for the authenticated gRPC client.
///
/// The generated `FileSystemMasterClientServiceClient` is layered over a
/// tonic [`Channel`] wrapped in an [`InterceptedService`] that runs
/// [`ChannelIdInterceptor`] on every request.
///
/// Both NOSASL and SIMPLE modes use `InterceptedService` wrapping;
/// the difference is that NOSASL skips the SASL handshake but still injects a channel-id.
type AuthenticatedFsClient =
    FileSystemMasterClientServiceClient<InterceptedService<Channel, ChannelIdInterceptor>>;
52
53/// Default mode for directories: 0755 (rwxr-xr-x)
54pub fn default_dir_mode() -> PMode {
55    PMode {
56        owner_bits: Bits::All as i32,         // rwx
57        group_bits: Bits::ReadExecute as i32, // r-x
58        other_bits: Bits::ReadExecute as i32, // r-x
59    }
60}
61
62/// Default mode for files: 0644 (rw-r--r--)
63pub fn default_file_mode() -> PMode {
64    PMode {
65        owner_bits: Bits::ReadWrite as i32, // rw-
66        group_bits: Bits::Read as i32,      // r--
67        other_bits: Bits::Read as i32,      // r--
68    }
69}
70
/// Client for Goosefs `FileSystemMasterClientService` (Master:9200).
///
/// In HA mode, the client holds a reference to the [`MasterInquireClient`]
/// and can automatically re-discover the Primary Master when RPCs fail.
///
/// ## Authentication
///
/// The client supports NOSASL and SIMPLE authentication modes.
/// When `config.auth_type` is `Simple`, the client performs a SASL PLAIN
/// handshake after establishing the gRPC channel, then injects a `channel-id`
/// metadata header into all subsequent RPCs.
///
/// ## Cloning
///
/// `Clone` is derived and both the gRPC client and the SASL guard live behind
/// `Arc`s, so clones share the same underlying client slot: a reconnect
/// performed through one clone is observed by all of them.
#[derive(Clone)]
pub struct MasterClient {
    /// Current authenticated gRPC client. Each RPC clones the client out of
    /// this lock; a reconnect replaces it under the write lock.
    inner: Arc<RwLock<AuthenticatedFsClient>>,
    /// Configuration captured at construction time; reused when reconnecting.
    config: GoosefsConfig,
    /// Resolves the address of the current Primary Master.
    inquire_client: Arc<dyn MasterInquireClient>,
    /// Keeps the SASL authentication stream alive for the channel's lifetime.
    /// In SIMPLE mode, dropping this would cause the server to unregister the channel.
    _sasl_guard: Arc<RwLock<Option<SaslStreamGuard>>>,
}
91
92impl MasterClient {
93    /// Connect to the Goosefs Master.
94    ///
95    /// In single-master mode, connects directly to `config.master_addr`.
96    /// In HA mode (multiple addresses in `config.master_addrs`), uses
97    /// [`PollingMasterInquireClient`](crate::client::master_inquire::PollingMasterInquireClient)
98    /// to discover the Primary first.
99    ///
100    /// Authentication is performed according to `config.auth_type`.
101    pub async fn connect(config: &GoosefsConfig) -> Result<Self> {
102        let inquire_client = create_master_inquire_client(config);
103        Self::connect_with_inquire(config, inquire_client).await
104    }
105
106    /// Connect using an externally-provided [`MasterInquireClient`].
107    ///
108    /// This is useful when sharing a single inquire client across multiple
109    /// client types (e.g. `MasterClient` + `WorkerManagerClient`).
110    pub async fn connect_with_inquire(
111        config: &GoosefsConfig,
112        inquire_client: Arc<dyn MasterInquireClient>,
113    ) -> Result<Self> {
114        let primary_addr = inquire_client.get_primary_rpc_address().await?;
115        let (client, sasl_guard) = Self::build_authenticated_client(config, &primary_addr).await?;
116        debug!(addr = %primary_addr, auth_type = %config.auth_type, "connected to Goosefs Master");
117
118        Ok(Self {
119            inner: Arc::new(RwLock::new(client)),
120            config: config.clone(),
121            inquire_client,
122            _sasl_guard: Arc::new(RwLock::new(sasl_guard)),
123        })
124    }
125
126    /// Create from an existing tonic channel (useful for testing / channel sharing).
127    ///
128    /// **Note**: This bypasses authentication. The channel is wrapped with a
129    /// no-op channel-id interceptor for API compatibility.
130    pub fn from_channel(channel: Channel, config: GoosefsConfig) -> Self {
131        let inquire_client = create_master_inquire_client(&config);
132        let interceptor = ChannelIdInterceptor::new("test-no-auth".to_string());
133        let intercepted = InterceptedService::new(channel, interceptor);
134        Self {
135            inner: Arc::new(RwLock::new(FileSystemMasterClientServiceClient::new(
136                intercepted,
137            ))),
138            config,
139            inquire_client,
140            _sasl_guard: Arc::new(RwLock::new(None)),
141        }
142    }
143
144    /// Build a gRPC channel and perform authentication, returning an authenticated client
145    /// and the SASL stream guard that must be kept alive.
146    async fn build_authenticated_client(
147        config: &GoosefsConfig,
148        addr: &str,
149    ) -> Result<(AuthenticatedFsClient, Option<SaslStreamGuard>)> {
150        let channel = Self::build_raw_channel(config, addr).await?;
151
152        // Perform SASL authentication based on the configured auth type
153        let authenticator = ChannelAuthenticator::new(
154            config.auth_type,
155            config.auth_username.clone(),
156            None, // impersonation_user: not yet supported
157        )
158        .with_auth_timeout(config.auth_timeout);
159
160        let mut auth_channel = authenticator.authenticate(channel).await?;
161        let sasl_guard = auth_channel.take_sasl_guard();
162
163        Ok((
164            FileSystemMasterClientServiceClient::new(auth_channel.channel),
165            sasl_guard,
166        ))
167    }
168
169    /// Build a raw gRPC channel to a specific master address (without authentication).
170    async fn build_raw_channel(config: &GoosefsConfig, addr: &str) -> Result<Channel> {
171        let endpoint_uri = format!("http://{}", addr);
172        let endpoint = Channel::from_shared(endpoint_uri)
173            .map_err(|e| Error::ConfigError {
174                message: format!("invalid master endpoint: {}", e),
175            })?
176            .connect_timeout(config.connect_timeout)
177            .timeout(config.request_timeout);
178
179        let channel = endpoint.connect().await?;
180        Ok(channel)
181    }
182
183    /// Reconnect to the Primary Master after a failover.
184    ///
185    /// Resets the cached Primary in the inquire client, re-discovers the
186    /// new Primary, rebuilds the gRPC channel, and re-authenticates.
187    async fn reconnect(&self) -> Result<()> {
188        // Reset cached primary so the inquire client re-polls all addresses.
189        self.inquire_client.reset_cached_primary().await;
190
191        let primary_addr = self.inquire_client.get_primary_rpc_address().await?;
192        let (client, sasl_guard) =
193            Self::build_authenticated_client(&self.config, &primary_addr).await?;
194        let mut inner = self.inner.write().await;
195        *inner = client;
196        // Replace the old SASL guard (dropping the old one closes the old stream)
197        let mut guard = self._sasl_guard.write().await;
198        *guard = sasl_guard;
199        debug!(addr = %primary_addr, "reconnected to Goosefs Master after failover");
200        Ok(())
201    }
202
203    /// Execute an RPC with automatic retry on retriable errors.
204    ///
205    /// On retriable failure, the client reconnects to a (potentially new)
206    /// Primary Master and retries up to [`MAX_RPC_RETRIES`] times.
207    async fn with_retry<F, Fut, T>(&self, op_name: &str, f: F) -> Result<T>
208    where
209        F: Fn(AuthenticatedFsClient) -> Fut,
210        Fut: std::future::Future<Output = Result<T>>,
211    {
212        let mut last_err: Option<Error> = None;
213
214        for attempt in 0..=MAX_RPC_RETRIES {
215            let client: AuthenticatedFsClient = {
216                let inner = self.inner.read().await;
217                inner.clone()
218            };
219
220            match f(client).await {
221                Ok(result) => return Ok(result),
222                Err(err) => {
223                    if err.is_retriable() && attempt < MAX_RPC_RETRIES {
224                        warn!(
225                            op = op_name,
226                            attempt = attempt + 1,
227                            max = MAX_RPC_RETRIES,
228                            error = %err,
229                            "retriable error, reconnecting and retrying"
230                        );
231                        if let Err(reconnect_err) = self.reconnect().await {
232                            warn!(error = %reconnect_err, "reconnect failed");
233                            last_err = Some(err);
234                            continue;
235                        }
236                    } else {
237                        return Err(err);
238                    }
239                    last_err = Some(err);
240                }
241            }
242        }
243
244        Err(last_err.unwrap_or_else(|| Error::Internal {
245            message: format!("{}: exhausted all retries", op_name),
246            source: None,
247        }))
248    }
249
250    /// Get the file/directory status (equivalent to `stat` / `head`).
251    #[instrument(skip(self), fields(path = %path))]
252    pub async fn get_status(&self, path: &str) -> Result<FileInfo> {
253        let path = path.to_string();
254        self.with_retry("get_status", |mut client| {
255            let path = path.clone();
256            async move {
257                let req = GetStatusPRequest {
258                    path: Some(path),
259                    options: Some(GetStatusPOptions::default()),
260                    request_id: None,
261                };
262                let resp = client.get_status(req).await?;
263                resp.into_inner()
264                    .file_info
265                    .ok_or_else(|| Error::missing_field("file_info"))
266            }
267        })
268        .await
269    }
270
271    /// List the contents of a directory. Returns all FileInfo entries.
272    ///
273    /// This wraps a **server-side streaming** RPC — the server sends
274    /// multiple `ListStatusPResponse` messages, each containing a batch
275    /// of `FileInfo`.
276    #[instrument(skip(self), fields(path = %path))]
277    pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileInfo>> {
278        let path = path.to_string();
279        self.with_retry("list_status", |mut client| {
280            let path = path.clone();
281            async move {
282                let req = ListStatusPRequest {
283                    path: Some(path),
284                    options: Some(ListStatusPOptions {
285                        recursive: Some(recursive),
286                        ..Default::default()
287                    }),
288                    request_id: None,
289                };
290                let mut stream = client.list_status(req).await?.into_inner();
291                let mut result = Vec::new();
292                while let Some(resp) = stream.message().await? {
293                    result.extend(resp.file_infos);
294                }
295                Ok(result)
296            }
297        })
298        .await
299    }
300
301    /// Create a new file. Returns the `FileInfo` of the created file.
302    #[instrument(skip(self, options), fields(path = %path))]
303    pub async fn create_file(&self, path: &str, options: CreateFilePOptions) -> Result<FileInfo> {
304        let path = path.to_string();
305        self.with_retry("create_file", |mut client| {
306            let path = path.clone();
307            async move {
308                let req = CreateFilePRequest {
309                    path: Some(path),
310                    options: Some(options),
311                };
312                let resp = client.create_file(req).await?;
313                resp.into_inner()
314                    .file_info
315                    .ok_or_else(|| Error::missing_field("file_info"))
316            }
317        })
318        .await
319    }
320
321    /// Mark a file as completed (called after all blocks are written).
322    ///
323    /// # Idempotent operation ID
324    ///
325    /// `operation_id` is used by the Master for exactly-once semantics: if the
326    /// RPC is retried after a network hiccup the Master detects the duplicate
327    /// via `FsOpPId` and returns success without applying the operation twice.
328    ///
329    /// The caller (`GoosefsFileWriter`) generates a fresh `uuid::Uuid` at
330    /// construction time and reuses it across all `complete_file` calls for the
331    /// same write session.  The UUID is split into two `i64` halves via
332    /// `Uuid::as_u64_pair()`:
333    ///
334    /// ```text
335    /// (high, low) = uuid.as_u64_pair()
336    /// FsOpPId { most_significant_bits: high as i64,
337    ///           least_significant_bits: low  as i64 }
338    /// ```
339    ///
340    /// This matches Java `UUID.getMostSignificantBits()` / `getLeastSignificantBits()`
341    /// as verified in `DefaultFileSystemMaster.completeFile()`.
342    ///
343    /// # Note on Go SDK bug
344    ///
345    /// The Go SDK `base_filesystem.go:394-400` accepts an `operationID` parameter
346    /// but **never writes it to the proto request**.  The Rust implementation
347    /// fixes this: `operation_id` is always wired into `CompleteFilePOptions`.
348    #[instrument(skip(self), fields(path = %path))]
349    pub async fn complete_file(
350        &self,
351        path: &str,
352        ufs_length: Option<i64>,
353        operation_id: Option<FsOpPId>,
354    ) -> Result<()> {
355        let path = path.to_string();
356        self.with_retry("complete_file", |mut client| {
357            let path = path.clone();
358            async move {
359                let common_options = operation_id.map(|op_id| FileSystemMasterCommonPOptions {
360                    operation_id: Some(op_id),
361                    ..Default::default()
362                });
363                let req = CompleteFilePRequest {
364                    path: Some(path),
365                    options: Some(CompleteFilePOptions {
366                        ufs_length,
367                        common_options,
368                        ..Default::default()
369                    }),
370                    inode_id: None,
371                };
372                client.complete_file(req).await?;
373                Ok(())
374            }
375        })
376        .await
377    }
378
379    // -----------------------------------------------------------------------
380    // RemoveBlocks RPC
381    // -----------------------------------------------------------------------
382
383    /// Request the Master to free block metadata for the given block IDs.
384    ///
385    /// This is the preferred cleanup path for `GoosefsFileWriter::cancel()`:
386    /// it removes only the block metadata on the Master without touching the
387    /// file-system namespace entry (the INCOMPLETE inode).
388    ///
389    /// Falls back to `delete_with_options(unchecked=true)` when this RPC fails.
390    ///
391    /// # Java authority
392    ///
393    /// Matches `FileSystemMasterClientServiceHandler.removeBlocks()` →
394    /// `DefaultFileSystemMaster.removeBlocks(blockIds)`.
395    #[instrument(skip(self, block_ids), fields(block_count = block_ids.len()))]
396    pub async fn remove_blocks(&self, block_ids: Vec<i64>) -> Result<()> {
397        if block_ids.is_empty() {
398            return Ok(());
399        }
400        let block_ids_clone = block_ids.clone();
401        self.with_retry("remove_blocks", |mut client| {
402            let block_ids = block_ids_clone.clone();
403            async move {
404                let req = RemoveBlocksPRequest { block_ids };
405                client.remove_blocks(req).await?;
406                Ok(())
407            }
408        })
409        .await
410    }
411
412    // -----------------------------------------------------------------------
413    // Delete with full DeleteOptions
414    // -----------------------------------------------------------------------
415
416    /// Delete a file or directory with fine-grained options.
417    ///
418    /// Prefer this over the legacy [`delete`](Self::delete) wrapper when you need
419    /// `unchecked` or `goosefs_only` semantics.
420    ///
421    /// See [`DeleteOptions`] for field semantics and Java authority notes.
422    #[instrument(skip(self, opts), fields(path = %path))]
423    pub async fn delete_with_options(&self, path: &str, opts: DeleteOptions) -> Result<()> {
424        let path = path.to_string();
425        self.with_retry("delete_with_options", |mut client| {
426            let path = path.clone();
427            let opts = opts.clone();
428            async move {
429                let req = DeletePRequest {
430                    path: Some(path),
431                    options: Some(DeletePOptions {
432                        recursive: Some(opts.recursive),
433                        unchecked: Some(opts.unchecked),
434                        goosefs_only: Some(opts.goosefs_only),
435                        ..Default::default()
436                    }),
437                };
438                client.remove(req).await?;
439                Ok(())
440            }
441        })
442        .await
443    }
444
445    /// Delete a file or directory (simple recursive wrapper).
446    ///
447    /// For `unchecked` or `goosefs_only` deletion use [`delete_with_options`](Self::delete_with_options)
448    /// directly.
449    #[instrument(skip(self), fields(path = %path, recursive = %recursive))]
450    pub async fn delete(&self, path: &str, recursive: bool) -> Result<()> {
451        self.delete_with_options(
452            path,
453            DeleteOptions {
454                recursive,
455                ..Default::default()
456            },
457        )
458        .await
459    }
460
461    /// Rename (move) a file or directory.
462    #[instrument(skip(self), fields(src = %src, dst = %dst))]
463    pub async fn rename(&self, src: &str, dst: &str) -> Result<()> {
464        let src = src.to_string();
465        let dst = dst.to_string();
466        self.with_retry("rename", |mut client| {
467            let src = src.clone();
468            let dst = dst.clone();
469            async move {
470                let req = RenamePRequest {
471                    path: Some(src),
472                    dst_path: Some(dst),
473                    options: Some(RenamePOptions::default()),
474                };
475                client.rename(req).await?;
476                Ok(())
477            }
478        })
479        .await
480    }
481
482    /// Create a directory (recursive by default).
483    ///
484    /// Sets a default mode of `0755` (rwxr-xr-x) so that the corresponding
485    /// UFS directory created by Goosefs has usable permissions.
486    #[instrument(skip(self), fields(path = %path))]
487    pub async fn create_directory(&self, path: &str, recursive: bool) -> Result<()> {
488        let path = path.to_string();
489        self.with_retry("create_directory", |mut client| {
490            let path = path.clone();
491            async move {
492                let req = CreateDirectoryPRequest {
493                    path: Some(path),
494                    options: Some(CreateDirectoryPOptions {
495                        recursive: Some(recursive),
496                        allow_exists: Some(true),
497                        mode: Some(default_dir_mode()),
498                        ..Default::default()
499                    }),
500                };
501                client.create_directory(req).await?;
502                Ok(())
503            }
504        })
505        .await
506    }
507
508    /// Schedule asynchronous persistence for a file.
509    /// This will persist the file to the underlying storage system.
510    #[instrument(skip(self), fields(path = %path))]
511    pub async fn schedule_async_persistence(
512        &self,
513        path: &str,
514        persistence_wait_time: Option<i64>,
515    ) -> Result<()> {
516        let path = path.to_string();
517        self.with_retry("schedule_async_persistence", |mut client| {
518            let path = path.clone();
519            async move {
520                let req = ScheduleAsyncPersistencePRequest {
521                    path: Some(path),
522                    options: Some(ScheduleAsyncPersistencePOptions {
523                        common_options: None,
524                        persistence_wait_time,
525                    }),
526                };
527                client.schedule_async_persistence(req).await?;
528                Ok(())
529            }
530        })
531        .await
532    }
533
534    /// Get a reference to the underlying config.
535    pub fn config(&self) -> &GoosefsConfig {
536        &self.config
537    }
538
539    /// Get a reference to the underlying inquire client.
540    ///
541    /// Useful for sharing the same inquire client with `WorkerManagerClient`.
542    pub fn inquire_client(&self) -> &Arc<dyn MasterInquireClient> {
543        &self.inquire_client
544    }
545}