use std::sync::Arc;
use tokio::sync::RwLock;
use tonic::service::interceptor::InterceptedService;
use tonic::transport::Channel;
use tracing::{debug, instrument, warn};
use crate::auth::{ChannelAuthenticator, ChannelIdInterceptor, SaslStreamGuard};
use crate::client::master_inquire::{create_master_inquire_client, MasterInquireClient};
use crate::config::GoosefsConfig;
use crate::error::{Error, Result};
use crate::fs::options::DeleteOptions;
use crate::proto::grpc::file::{
file_system_master_client_service_client::FileSystemMasterClientServiceClient,
CompleteFilePOptions, CompleteFilePRequest, CreateDirectoryPOptions, CreateDirectoryPRequest,
CreateFilePOptions, CreateFilePRequest, DeletePOptions, DeletePRequest, FileInfo,
FileSystemMasterCommonPOptions, FsOpPId, GetStatusPOptions, GetStatusPRequest,
ListStatusPOptions, ListStatusPRequest, RemoveBlocksPRequest, RenamePOptions, RenamePRequest,
ScheduleAsyncPersistencePOptions, ScheduleAsyncPersistencePRequest,
};
use crate::proto::grpc::{Bits, PMode};
/// Maximum number of retries `MasterClient::with_retry` performs after the
/// initial attempt, i.e. an operation is tried at most `MAX_RPC_RETRIES + 1`
/// times.
const MAX_RPC_RETRIES: u32 = 2;
/// File-system master gRPC client whose channel is wrapped in a
/// [`ChannelIdInterceptor`].
type AuthenticatedFsClient =
FileSystemMasterClientServiceClient<InterceptedService<Channel, ChannelIdInterceptor>>;
/// Returns the permission mode used for newly created directories:
/// `All` for the owner and `ReadExecute` for both group and others.
pub fn default_dir_mode() -> PMode {
    // Group and others share the same permission bits.
    let read_execute = Bits::ReadExecute as i32;
    PMode {
        owner_bits: Bits::All as i32,
        group_bits: read_execute,
        other_bits: read_execute,
    }
}
/// Returns the permission mode used for newly created files:
/// `ReadWrite` for the owner and `Read` for both group and others.
pub fn default_file_mode() -> PMode {
    // Group and others share the same permission bits.
    let read_only = Bits::Read as i32;
    PMode {
        owner_bits: Bits::ReadWrite as i32,
        group_bits: read_only,
        other_bits: read_only,
    }
}
/// Client for the Goosefs master file-system gRPC service.
///
/// Clones share the same underlying gRPC client and SASL guard (both live
/// behind `Arc`s), so a reconnect performed through one clone is visible to
/// all of them. The configuration itself is cloned by value.
#[derive(Clone)]
pub struct MasterClient {
/// Current gRPC client; replaced under the write lock by `reconnect`.
inner: Arc<RwLock<AuthenticatedFsClient>>,
/// Configuration used whenever a connection is (re)built.
config: GoosefsConfig,
/// Used to look up the primary master's RPC address.
inquire_client: Arc<dyn MasterInquireClient>,
/// SASL stream guard returned by authentication, if any. It is only stored
/// here and never read.
/// NOTE(review): presumably held to keep the authentication stream alive
/// for as long as the client exists — confirm against `SaslStreamGuard`.
_sasl_guard: Arc<RwLock<Option<SaslStreamGuard>>>,
}
impl MasterClient {
/// Connects to the primary master, using an inquire client created from
/// `config` to locate it.
///
/// # Errors
/// Propagates any error from [`MasterClient::connect_with_inquire`].
pub async fn connect(config: &GoosefsConfig) -> Result<Self> {
    Self::connect_with_inquire(config, create_master_inquire_client(config)).await
}
/// Connects to the primary master reported by `inquire_client` and
/// authenticates the channel according to `config`.
///
/// # Errors
/// Fails if the primary address cannot be resolved, or if building or
/// authenticating the channel fails.
pub async fn connect_with_inquire(
    config: &GoosefsConfig,
    inquire_client: Arc<dyn MasterInquireClient>,
) -> Result<Self> {
    let master_addr = inquire_client.get_primary_rpc_address().await?;
    let (fs_client, guard) = Self::build_authenticated_client(config, &master_addr).await?;
    debug!(addr = %master_addr, auth_type = %config.auth_type, "connected to Goosefs Master");

    let connected = Self {
        inner: Arc::new(RwLock::new(fs_client)),
        config: config.clone(),
        inquire_client,
        _sasl_guard: Arc::new(RwLock::new(guard)),
    };
    Ok(connected)
}
/// Wraps an already-established `channel` without performing any
/// authentication handshake.
///
/// The channel is tagged with the fixed channel id `"test-no-auth"` and no
/// SASL guard is stored.
/// NOTE(review): the channel id suggests this constructor is meant for
/// tests — confirm before using it elsewhere.
pub fn from_channel(channel: Channel, config: GoosefsConfig) -> Self {
    let inquire_client = create_master_inquire_client(&config);
    let service = InterceptedService::new(
        channel,
        ChannelIdInterceptor::new("test-no-auth".to_string()),
    );
    let fs_client = FileSystemMasterClientServiceClient::new(service);

    Self {
        inner: Arc::new(RwLock::new(fs_client)),
        config,
        inquire_client,
        _sasl_guard: Arc::new(RwLock::new(None)),
    }
}
/// Opens a channel to `addr`, authenticates it as configured, and returns
/// the resulting file-system client together with the SASL guard (if the
/// authenticated channel carried one).
///
/// # Errors
/// Fails if the raw channel cannot be built or authentication fails.
async fn build_authenticated_client(
    config: &GoosefsConfig,
    addr: &str,
) -> Result<(AuthenticatedFsClient, Option<SaslStreamGuard>)> {
    let raw_channel = Self::build_raw_channel(config, addr).await?;

    let mut authenticated =
        ChannelAuthenticator::new(config.auth_type, config.auth_username.clone(), None)
            .with_auth_timeout(config.auth_timeout)
            .authenticate(raw_channel)
            .await?;

    // Take the guard out before the channel field is moved into the client.
    let sasl_guard = authenticated.take_sasl_guard();
    let fs_client = FileSystemMasterClientServiceClient::new(authenticated.channel);
    Ok((fs_client, sasl_guard))
}
async fn build_raw_channel(config: &GoosefsConfig, addr: &str) -> Result<Channel> {
let endpoint_uri = format!("http://{}", addr);
let endpoint = Channel::from_shared(endpoint_uri)
.map_err(|e| Error::ConfigError {
message: format!("invalid master endpoint: {}", e),
})?
.connect_timeout(config.connect_timeout)
.timeout(config.request_timeout);
let channel = endpoint.connect().await?;
Ok(channel)
}
/// Re-resolves the primary master and swaps in a freshly authenticated
/// client and SASL guard.
///
/// The cached primary address is reset first so the inquire client performs
/// a new lookup. The new connection is fully built before any lock is
/// taken; if building fails, the existing client is left untouched.
///
/// # Errors
/// Fails if the primary cannot be resolved or the new connection cannot be
/// built or authenticated.
async fn reconnect(&self) -> Result<()> {
    self.inquire_client.reset_cached_primary().await;
    let new_primary = self.inquire_client.get_primary_rpc_address().await?;
    let (fresh_client, fresh_guard) =
        Self::build_authenticated_client(&self.config, &new_primary).await?;

    // The client write lock stays held while the guard is replaced, so the
    // two are swapped together.
    let mut client_slot = self.inner.write().await;
    *client_slot = fresh_client;
    let mut guard_slot = self._sasl_guard.write().await;
    *guard_slot = fresh_guard;

    debug!(addr = %new_primary, "reconnected to Goosefs Master after failover");
    Ok(())
}
/// Runs `f` against a snapshot of the current client, reconnecting and
/// retrying when it fails with a retriable error.
///
/// `f` is invoked at most `MAX_RPC_RETRIES + 1` times. Each invocation
/// receives a fresh clone of the current client, so a successful reconnect
/// is picked up by the next attempt. `op_name` is only used for logging.
///
/// # Errors
/// Returns the error from `f` as soon as it is not retriable, or once the
/// retry budget is exhausted. A failed reconnect is logged but does not
/// abort the loop; the next attempt simply runs against the existing client.
async fn with_retry<F, Fut, T>(&self, op_name: &str, f: F) -> Result<T>
where
    F: Fn(AuthenticatedFsClient) -> Fut,
    Fut: std::future::Future<Output = Result<T>>,
{
    let mut attempt: u32 = 0;
    loop {
        // Clone the client out of the lock; the read guard is a temporary
        // that is released at the end of this statement, so the RPC itself
        // never runs while holding the lock.
        let client: AuthenticatedFsClient = self.inner.read().await.clone();

        let err = match f(client).await {
            Ok(value) => return Ok(value),
            Err(err) => err,
        };

        // Every exit path is explicit: give up on non-retriable errors or
        // once the retry budget has been used.
        if !err.is_retriable() || attempt >= MAX_RPC_RETRIES {
            return Err(err);
        }

        warn!(
            op = op_name,
            attempt = attempt + 1,
            max = MAX_RPC_RETRIES,
            error = %err,
            "retriable error, reconnecting and retrying"
        );
        if let Err(reconnect_err) = self.reconnect().await {
            warn!(error = %reconnect_err, "reconnect failed");
        }
        attempt += 1;
    }
}
#[instrument(skip(self), fields(path = %path))]
pub async fn get_status(&self, path: &str) -> Result<FileInfo> {
let path = path.to_string();
self.with_retry("get_status", |mut client| {
let path = path.clone();
async move {
let req = GetStatusPRequest {
path: Some(path),
options: Some(GetStatusPOptions::default()),
request_id: None,
};
let resp = client.get_status(req).await?;
resp.into_inner()
.file_info
.ok_or_else(|| Error::missing_field("file_info"))
}
})
.await
}
/// Lists the entries under `path`, draining the server's response stream
/// into a single vector. `recursive` is forwarded in the request options.
///
/// # Errors
/// Fails if the RPC or any streamed message fails; a retried attempt starts
/// over with an empty result.
#[instrument(skip(self), fields(path = %path))]
pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileInfo>> {
    // Built once; each attempt sends its own clone.
    let request = ListStatusPRequest {
        path: Some(path.to_owned()),
        options: Some(ListStatusPOptions {
            recursive: Some(recursive),
            ..Default::default()
        }),
        request_id: None,
    };
    self.with_retry("list_status", |mut client| {
        let request = request.clone();
        async move {
            let mut pages = client.list_status(request).await?.into_inner();
            let mut entries = Vec::new();
            loop {
                match pages.message().await? {
                    Some(page) => entries.extend(page.file_infos),
                    None => break Ok(entries),
                }
            }
        }
    })
    .await
}
#[instrument(skip(self, options), fields(path = %path))]
pub async fn create_file(&self, path: &str, options: CreateFilePOptions) -> Result<FileInfo> {
let path = path.to_string();
self.with_retry("create_file", |mut client| {
let path = path.clone();
async move {
let req = CreateFilePRequest {
path: Some(path),
options: Some(options),
};
let resp = client.create_file(req).await?;
resp.into_inner()
.file_info
.ok_or_else(|| Error::missing_field("file_info"))
}
})
.await
}
/// Marks the file at `path` as complete.
///
/// `ufs_length` is forwarded as-is in the request options. When
/// `operation_id` is present it is sent inside the common options;
/// otherwise no common options are sent.
///
/// # Errors
/// Fails if the RPC fails (after any retries performed by `with_retry`).
#[instrument(skip(self), fields(path = %path))]
pub async fn complete_file(
    &self,
    path: &str,
    ufs_length: Option<i64>,
    operation_id: Option<FsOpPId>,
) -> Result<()> {
    let common_options = operation_id.map(|op_id| FileSystemMasterCommonPOptions {
        operation_id: Some(op_id),
        ..Default::default()
    });
    // Built once; each attempt sends its own clone.
    let request = CompleteFilePRequest {
        path: Some(path.to_owned()),
        options: Some(CompleteFilePOptions {
            ufs_length,
            common_options,
            ..Default::default()
        }),
        inode_id: None,
    };
    self.with_retry("complete_file", |mut client| {
        let request = request.clone();
        async move {
            let _ack = client.complete_file(request).await?;
            Ok(())
        }
    })
    .await
}
/// Asks the master to remove the given blocks.
///
/// Returns immediately without issuing an RPC when `block_ids` is empty.
///
/// # Errors
/// Fails if the RPC fails (after any retries performed by `with_retry`).
#[instrument(skip(self, block_ids), fields(block_count = block_ids.len()))]
pub async fn remove_blocks(&self, block_ids: Vec<i64>) -> Result<()> {
    if block_ids.is_empty() {
        return Ok(());
    }
    // Move the owned vector straight into the request instead of copying it
    // first; the only remaining copy is the one each attempt needs.
    let request = RemoveBlocksPRequest { block_ids };
    self.with_retry("remove_blocks", |mut client| {
        let request = request.clone();
        async move {
            client.remove_blocks(request).await?;
            Ok(())
        }
    })
    .await
}
/// Deletes `path`, forwarding the `recursive`, `unchecked` and
/// `goosefs_only` flags from `opts` to the master.
///
/// # Errors
/// Fails if the RPC fails (after any retries performed by `with_retry`).
#[instrument(skip(self, opts), fields(path = %path))]
pub async fn delete_with_options(&self, path: &str, opts: DeleteOptions) -> Result<()> {
    // Built once; each attempt sends its own clone.
    let request = DeletePRequest {
        path: Some(path.to_owned()),
        options: Some(DeletePOptions {
            recursive: Some(opts.recursive),
            unchecked: Some(opts.unchecked),
            goosefs_only: Some(opts.goosefs_only),
            ..Default::default()
        }),
    };
    self.with_retry("delete_with_options", |mut client| {
        let request = request.clone();
        async move {
            client.remove(request).await?;
            Ok(())
        }
    })
    .await
}
/// Deletes `path` using default [`DeleteOptions`] apart from `recursive`.
///
/// # Errors
/// Propagates any error from [`MasterClient::delete_with_options`].
#[instrument(skip(self), fields(path = %path, recursive = %recursive))]
pub async fn delete(&self, path: &str, recursive: bool) -> Result<()> {
    let opts = DeleteOptions {
        recursive,
        ..Default::default()
    };
    self.delete_with_options(path, opts).await
}
/// Renames `src` to `dst` using default rename options.
///
/// # Errors
/// Fails if the RPC fails (after any retries performed by `with_retry`).
#[instrument(skip(self), fields(src = %src, dst = %dst))]
pub async fn rename(&self, src: &str, dst: &str) -> Result<()> {
    // Built once; each attempt sends its own clone.
    let request = RenamePRequest {
        path: Some(src.to_owned()),
        dst_path: Some(dst.to_owned()),
        options: Some(RenamePOptions::default()),
    };
    self.with_retry("rename", |mut client| {
        let request = request.clone();
        async move {
            client.rename(request).await?;
            Ok(())
        }
    })
    .await
}
/// Creates a directory at `path` with the mode from [`default_dir_mode`].
///
/// `recursive` is forwarded in the request options and the request always
/// sets `allow_exists`.
/// NOTE(review): `allow_exists` presumably makes the call succeed when the
/// directory is already present — confirm against the master's semantics.
///
/// # Errors
/// Fails if the RPC fails (after any retries performed by `with_retry`).
#[instrument(skip(self), fields(path = %path))]
pub async fn create_directory(&self, path: &str, recursive: bool) -> Result<()> {
    // Built once; each attempt sends its own clone.
    let request = CreateDirectoryPRequest {
        path: Some(path.to_owned()),
        options: Some(CreateDirectoryPOptions {
            recursive: Some(recursive),
            allow_exists: Some(true),
            mode: Some(default_dir_mode()),
            ..Default::default()
        }),
    };
    self.with_retry("create_directory", |mut client| {
        let request = request.clone();
        async move {
            client.create_directory(request).await?;
            Ok(())
        }
    })
    .await
}
/// Asks the master to schedule asynchronous persistence of `path`.
///
/// `persistence_wait_time` is forwarded unchanged; no common options are
/// sent.
///
/// # Errors
/// Fails if the RPC fails (after any retries performed by `with_retry`).
#[instrument(skip(self), fields(path = %path))]
pub async fn schedule_async_persistence(
    &self,
    path: &str,
    persistence_wait_time: Option<i64>,
) -> Result<()> {
    // Built once; each attempt sends its own clone.
    let request = ScheduleAsyncPersistencePRequest {
        path: Some(path.to_owned()),
        options: Some(ScheduleAsyncPersistencePOptions {
            common_options: None,
            persistence_wait_time,
        }),
    };
    self.with_retry("schedule_async_persistence", |mut client| {
        let request = request.clone();
        async move {
            client.schedule_async_persistence(request).await?;
            Ok(())
        }
    })
    .await
}
/// Returns the configuration stored in this client.
pub fn config(&self) -> &GoosefsConfig {
&self.config
}
/// Returns the inquire client this client uses to resolve the primary
/// master address.
pub fn inquire_client(&self) -> &Arc<dyn MasterInquireClient> {
&self.inquire_client
}
}