use serde::{Deserialize, Serialize};
use std::fmt;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::time::{Duration, Instant};
use crate::auth::AuthType;
use crate::proto::grpc::file::WritePType;
/// Errors that can occur while loading configuration from a file.
#[derive(Debug)]
pub enum ConfigLoadError {
/// The file at `path` could not be read; `source` is the underlying error rendered as text.
IoError { path: String, source: String },
/// The configuration content could not be parsed; `message` describes the failure.
ParseError { message: String },
}
impl std::fmt::Display for ConfigLoadError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ConfigLoadError::IoError { path, source } => {
write!(f, "failed to read config file '{}': {}", path, source)
}
ConfigLoadError::ParseError { message } => {
write!(f, "failed to parse YAML config: {}", message)
}
}
}
}
// Marker impl: relies on the `Debug` and `Display` impls above and provides no `source()` chain.
impl std::error::Error for ConfigLoadError {}
use std::collections::HashMap;
/// In-memory view of a Java-style `.properties` file: a flat map of string keys to string values.
#[derive(Debug, Default)]
struct PropertiesMap {
    /// Parsed `key -> value` pairs. When a key appears more than once the last occurrence wins.
    props: HashMap<String, String>,
}

impl PropertiesMap {
    /// Parses `.properties`-style text.
    ///
    /// Blank lines and lines whose first non-blank character is `#` or `!` are skipped.
    /// Every other line is split at its first separator, which is whichever of `=` or `:`
    /// appears earliest on the line. Key and value are trimmed; lines without a separator
    /// or with an empty key are ignored.
    fn parse(content: &str) -> Self {
        let props = content
            .lines()
            .map(str::trim)
            .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.starts_with('!'))
            .filter_map(|line| {
                // Split at the earliest `=` or `:` so that `key: a=b` keeps `a=b` as its value
                // instead of being cut at the later `=`.
                let (key, value) = line.split_once(|c: char| c == '=' || c == ':')?;
                let key = key.trim();
                if key.is_empty() {
                    None
                } else {
                    Some((key.to_string(), value.trim().to_string()))
                }
            })
            .collect();
        PropertiesMap { props }
    }

    /// Returns the raw value stored for `key`, if any.
    fn get(&self, key: &str) -> Option<&str> {
        self.props.get(key).map(String::as_str)
    }

    /// Returns the value for `key` parsed as `T`; `None` when the key is absent or parsing fails.
    fn get_parsed<T: FromStr>(&self, key: &str) -> Option<T> {
        self.get(key)?.parse::<T>().ok()
    }

    /// Returns the value for `key` as a boolean, accepting `true`/`false` in any ASCII case.
    fn get_bool(&self, key: &str) -> Option<bool> {
        self.get(key)?.to_ascii_lowercase().parse::<bool>().ok()
    }

    /// Returns the value for `key` split on commas, with entries trimmed and blank entries dropped.
    ///
    /// A present key always yields `Some`, possibly with an empty vector.
    fn get_list(&self, key: &str) -> Option<Vec<String>> {
        let raw = self.get(key)?;
        Some(
            raw.split(',')
                .map(str::trim)
                .filter(|item| !item.is_empty())
                .map(String::from)
                .collect(),
        )
    }
}
/// Parses a byte count such as `1048576`, `512KB`, `64MB` or `1GB`.
///
/// Surrounding whitespace is ignored and the unit suffix is matched without regard to ASCII
/// case. Units are binary multiples (`KB` = 1024 bytes). A value without a suffix is taken as
/// a plain number of bytes.
///
/// # Errors
///
/// Returns a message when the numeric part is not a valid `u64` or when applying the unit
/// would overflow `u64`.
fn parse_byte_size(s: &str) -> Result<u64, String> {
    const UNITS: [(&str, u64); 3] = [("GB", 1 << 30), ("MB", 1 << 20), ("KB", 1 << 10)];

    let s = s.trim();
    let (digits, multiplier) = UNITS
        .iter()
        .find_map(|&(suffix, factor)| {
            let split = s.len().checked_sub(suffix.len())?;
            // `get` returns `None` when `split` is not a char boundary, so non-ASCII input
            // can never cause a slicing panic here.
            let tail = s.get(split..)?;
            if tail.eq_ignore_ascii_case(suffix) {
                Some((&s[..split], factor))
            } else {
                None
            }
        })
        .unwrap_or((s, 1));

    let count = digits
        .trim()
        .parse::<u64>()
        .map_err(|e| format!("invalid byte size '{}': {}", s, e))?;

    // A plain `*` would panic in debug builds and silently wrap in release builds.
    count
        .checked_mul(multiplier)
        .ok_or_else(|| format!("invalid byte size '{}': value overflows u64", s))
}
impl PropertiesMap {
    /// Builds a [`GooseFsConfig`] by overlaying the recognised property keys on the defaults.
    ///
    /// A key that is absent, or whose value does not parse, leaves the corresponding default
    /// untouched. Empty user names and zero byte sizes are ignored as well.
    fn into_goosefs_config(self) -> GooseFsConfig {
        let mut cfg = GooseFsConfig::default();

        // Master address: when the address-list key is present it takes precedence over
        // hostname/port, even if the list turns out to contain no entries.
        match self.get_list("goosefs.master.rpc.addresses") {
            Some(addrs) => {
                if let Some(first) = addrs.first() {
                    cfg.master_addr = first.clone();
                    if addrs.len() > 1 {
                        cfg.master_addrs = addrs;
                    }
                }
            }
            None => {
                if let Some(host) = self.get("goosefs.master.hostname") {
                    let port = self
                        .get_parsed::<u16>("goosefs.master.rpc.port")
                        .unwrap_or(9200);
                    cfg.master_addr = format!("{}:{}", host, port);
                }
            }
        }

        // Config manager endpoint.
        if let Some(addrs) = self
            .get_list("goosefs.config.manager.rpc.addresses")
            .filter(|list| !list.is_empty())
        {
            cfg.config_manager_rpc_addresses = addrs;
        }
        if let Some(port) = self.get_parsed::<u16>("goosefs.config.rpc.port") {
            cfg.config_rpc_port = port;
        }

        // Security settings.
        if let Some(auth) = self
            .get("goosefs.security.authentication.type")
            .and_then(|raw| raw.parse::<AuthType>().ok())
        {
            cfg.auth_type = auth;
        }
        if let Some(flag) = self.get_bool("goosefs.security.authorization.permission.enabled") {
            cfg.authorization_permission_enabled = flag;
        }
        if let Some(name) = self
            .get("goosefs.security.login.impersonation.username")
            .filter(|name| !name.is_empty())
        {
            cfg.login_impersonation_username = name.to_string();
        }
        if let Some(name) = self
            .get("goosefs.security.login.username")
            .filter(|name| !name.is_empty())
        {
            cfg.auth_username = name.to_string();
        }

        // Transparent acceleration switches.
        if let Some(flag) = self.get_bool("goosefs.user.client.transparent_acceleration.enabled") {
            cfg.transparent_acceleration_enabled = flag;
        }
        if let Some(flag) =
            self.get_bool("goosefs.user.client.transparent_acceleration.cosranger.enabled")
        {
            cfg.transparent_acceleration_cosranger_enabled = flag;
        }

        // I/O settings.
        if let Some(write_type) = self
            .get("goosefs.user.file.writetype.default")
            .and_then(|raw| raw.parse::<WriteType>().ok())
        {
            cfg.write_type = Some(write_type.as_i32());
        }
        if let Some(bytes) = self
            .get("goosefs.user.block.size.bytes.default")
            .and_then(|raw| parse_byte_size(raw).ok())
            .filter(|&bytes| bytes > 0)
        {
            cfg.block_size = bytes;
        }
        if let Some(bytes) = self
            .get("goosefs.user.network.data.transfer.chunk.size")
            .and_then(|raw| parse_byte_size(raw).ok())
            .filter(|&bytes| bytes > 0)
        {
            cfg.chunk_size = bytes;
        }

        cfg
    }
}
/// File name that `discover_config_file` looks for inside each candidate directory.
const PROPERTIES_FILENAME: &str = "goosefs-site.properties";
pub fn discover_config_file() -> Option<std::path::PathBuf> {
use std::path::PathBuf;
if let Ok(p) = std::env::var(ENV_CONFIG_FILE) {
let pb = PathBuf::from(&p);
if pb.exists() {
return Some(pb);
}
}
if let Ok(conf_dir) = std::env::var(CONF_DIR) {
let p = PathBuf::from(&conf_dir).join(PROPERTIES_FILENAME);
if p.exists() {
return Some(p);
}
}
if let Ok(home) = std::env::var(ENV_HOME) {
let p = PathBuf::from(&home).join("conf").join(PROPERTIES_FILENAME);
if p.exists() {
return Some(p);
}
}
if let Some(home) = dirs_next_home() {
let p = home.join(".goosefs").join(PROPERTIES_FILENAME);
if p.exists() {
return Some(p);
}
}
let system = PathBuf::from("/etc/goosefs").join(PROPERTIES_FILENAME);
if system.exists() {
return Some(system);
}
None
}
/// Returns the current user's home directory as given by `HOME`, falling back to `USERPROFILE`.
///
/// Yields `None` when neither variable can be read.
fn dirs_next_home() -> Option<std::path::PathBuf> {
    let home = match std::env::var("HOME") {
        Ok(dir) => dir,
        Err(_) => std::env::var("USERPROFILE").ok()?,
    };
    Some(std::path::PathBuf::from(home))
}
// ---- Built-in defaults -------------------------------------------------------------------

/// Port used in the default `master_addr` (`127.0.0.1:<port>`).
const DEFAULT_MASTER_PORT: u16 = 9200;
// NOTE(review): not referenced in the visible code (hence `allow(dead_code)`); presumably the
// default worker RPC port — confirm before relying on it.
#[allow(dead_code)]
const DEFAULT_WORKER_PORT: u16 = 9203;
/// Default `block_size` in bytes (64 MiB).
const DEFAULT_BLOCK_SIZE: u64 = 64 * 1024 * 1024;
/// Default `chunk_size` in bytes (1 MiB).
const DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024;
/// Default `connect_timeout`, in milliseconds.
const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 30_000;
/// Default `request_timeout`, in milliseconds.
const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 300_000;
/// Default `master_polling_timeout`, in milliseconds.
const DEFAULT_MASTER_POLLING_TIMEOUT_MS: u64 = 30_000;
/// Default `auth_timeout`, in milliseconds.
const DEFAULT_AUTH_TIMEOUT_MS: u64 = 30_000;
/// Default `config_rpc_port`.
const DEFAULT_CONFIG_RPC_PORT: u16 = 9214;
/// Default `login_impersonation_username`.
const DEFAULT_IMPERSONATION_USERNAME: &str = "_HDFS_USER_";
// NOTE(review): unused in the visible code; presumably the sentinel that disables
// impersonation — confirm against the consumers of `login_impersonation_username`.
#[allow(dead_code)]
pub const IMPERSONATION_NONE: &str = "_NONE_";
/// Default `master_inquire_retry_max_duration`, in milliseconds.
const DEFAULT_MASTER_INQUIRE_MAX_DURATION_MS: u64 = 120_000;
/// Default `master_inquire_initial_sleep`, in milliseconds.
const DEFAULT_MASTER_INQUIRE_INITIAL_SLEEP_MS: u64 = 50;
/// Default `master_inquire_max_sleep`, in milliseconds.
const DEFAULT_MASTER_INQUIRE_MAX_SLEEP_MS: u64 = 3_000;
/// Default interval, in milliseconds, after which `ConfigRefresher` reloads the properties.
const DEFAULT_CONFIG_EXPIRE_MS: u64 = 30_000;

// ---- Storage-option keys -----------------------------------------------------------------
// Key names for per-storage options. NOTE(review): their consumers are not in this part of
// the file; the meaning of each key is inferred from its name.
pub const STORAGE_OPT_MASTER_ADDR: &str = "goosefs_master_addr";
pub const STORAGE_OPT_WRITE_TYPE: &str = "goosefs_write_type";
pub const STORAGE_OPT_BLOCK_SIZE: &str = "goosefs_block_size";
pub const STORAGE_OPT_CHUNK_SIZE: &str = "goosefs_chunk_size";
pub const STORAGE_OPT_AUTH_TYPE: &str = "goosefs_auth_type";
pub const STORAGE_OPT_AUTH_USERNAME: &str = "goosefs_auth_username";

// ---- Environment variable names ----------------------------------------------------------

/// Name passed to `std::env::var` by `discover_config_file` to find a configuration directory.
pub const CONF_DIR: &str = "goosefs.conf.dir";
/// Environment variable holding an explicit path to the properties file.
pub const ENV_CONFIG_FILE: &str = "GOOSEFS_CONFIG_FILE";
/// Environment variable name for the configuration directory.
pub const ENV_CONF_DIR: &str = "GOOSEFS_CONF_DIR";
/// Environment variable naming the installation root; `conf/` below it is searched.
pub const ENV_HOME: &str = "GOOSEFS_HOME";
// The variables below are read by `GooseFsConfig::apply_env`.
pub const ENV_MASTER_ADDR: &str = "GOOSEFS_MASTER_ADDR";
pub const ENV_WRITE_TYPE: &str = "GOOSEFS_WRITE_TYPE";
pub const ENV_BLOCK_SIZE: &str = "GOOSEFS_BLOCK_SIZE";
pub const ENV_CHUNK_SIZE: &str = "GOOSEFS_CHUNK_SIZE";
pub const ENV_AUTH_TYPE: &str = "GOOSEFS_AUTH_TYPE";
pub const ENV_AUTH_USERNAME: &str = "GOOSEFS_AUTH_USERNAME";
pub const ENV_CONFIG_MANAGER_RPC_ADDRESSES: &str = "GOOSEFS_CONFIG_MANAGER_RPC_ADDRESSES";
pub const ENV_CONFIG_RPC_PORT: &str = "GOOSEFS_CONFIG_RPC_PORT";
pub const ENV_TRANSPARENT_ACCELERATION_ENABLED: &str = "GOOSEFS_TRANSPARENT_ACCELERATION_ENABLED";
pub const ENV_TRANSPARENT_ACCELERATION_COSRANGER_ENABLED: &str =
"GOOSEFS_TRANSPARENT_ACCELERATION_COSRANGER_ENABLED";
pub const ENV_AUTHORIZATION_PERMISSION_ENABLED: &str = "GOOSEFS_AUTHORIZATION_PERMISSION_ENABLED";
pub const ENV_LOGIN_IMPERSONATION_USERNAME: &str = "GOOSEFS_LOGIN_IMPERSONATION_USERNAME";

// ---- More storage-option keys (same caveat as the group above) ---------------------------
pub const STORAGE_OPT_CONFIG_MANAGER_RPC_ADDRESSES: &str = "goosefs_config_manager_rpc_addresses";
pub const STORAGE_OPT_CONFIG_RPC_PORT: &str = "goosefs_config_rpc_port";
pub const STORAGE_OPT_TRANSPARENT_ACCELERATION_ENABLED: &str =
"goosefs_transparent_acceleration_enabled";
pub const STORAGE_OPT_TRANSPARENT_ACCELERATION_COSRANGER_ENABLED: &str =
"goosefs_transparent_acceleration_cosranger_enabled";
pub const STORAGE_OPT_AUTHORIZATION_PERMISSION_ENABLED: &str =
"goosefs_authorization_permission_enabled";
pub const STORAGE_OPT_LOGIN_IMPERSONATION_USERNAME: &str = "goosefs_login_impersonation_username";
/// User-facing write type, parsed from and printed as lower-case snake_case strings.
///
/// Each variant maps to the `WritePType` variant of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WriteType {
/// Printed as `must_cache`.
MustCache,
/// Printed as `try_cache`.
TryCache,
/// Printed as `cache_through`.
CacheThrough,
/// Printed as `through`.
Through,
/// Printed as `async_through`.
AsyncThrough,
}
impl WriteType {
pub const ALL: &'static [WriteType] = &[
WriteType::MustCache,
WriteType::TryCache,
WriteType::CacheThrough,
WriteType::Through,
WriteType::AsyncThrough,
];
pub fn as_str(&self) -> &'static str {
match self {
WriteType::MustCache => "must_cache",
WriteType::TryCache => "try_cache",
WriteType::CacheThrough => "cache_through",
WriteType::Through => "through",
WriteType::AsyncThrough => "async_through",
}
}
pub fn as_i32(&self) -> i32 {
WritePType::from(*self) as i32
}
}
/// Displays the write type using its canonical name from `as_str`.
impl fmt::Display for WriteType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label: &'static str = self.as_str();
        fmt::Formatter::write_str(f, label)
    }
}
/// Parses a write type from its canonical name, ignoring ASCII case.
impl FromStr for WriteType {
    type Err = String;

    /// Returns the matching variant, or a message listing every accepted name.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let matched = Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.as_str().eq_ignore_ascii_case(s));
        if let Some(write_type) = matched {
            return Ok(write_type);
        }

        let accepted: Vec<&str> = Self::ALL.iter().map(WriteType::as_str).collect();
        Err(format!(
            "unknown write type '{}'. Expected one of: {}",
            s,
            accepted.join(", ")
        ))
    }
}
/// Infallible mapping from the user-facing write type to the protobuf enum.
impl From<WriteType> for WritePType {
    fn from(wt: WriteType) -> Self {
        match wt {
            WriteType::MustCache => Self::MustCache,
            WriteType::TryCache => Self::TryCache,
            WriteType::CacheThrough => Self::CacheThrough,
            WriteType::Through => Self::Through,
            WriteType::AsyncThrough => Self::AsyncThrough,
        }
    }
}
impl WriteType {
    /// Converts a protobuf write type into a [`WriteType`].
    ///
    /// # Errors
    ///
    /// Returns a message naming the variant when `pt` has no [`WriteType`] counterpart.
    pub fn try_from_proto(pt: WritePType) -> Result<Self, String> {
        let converted = match pt {
            WritePType::MustCache => Self::MustCache,
            WritePType::TryCache => Self::TryCache,
            WritePType::CacheThrough => Self::CacheThrough,
            WritePType::Through => Self::Through,
            WritePType::AsyncThrough => Self::AsyncThrough,
            other => {
                return Err(format!(
                    "cannot convert WritePType::{:?} to WriteType",
                    other
                ))
            }
        };
        Ok(converted)
    }
}
/// Panicking convenience conversion from the protobuf enum.
///
/// # Panics
///
/// Panics when `pt` is a variant that `WriteType::try_from_proto` rejects. Call
/// `try_from_proto` directly to handle that case without panicking.
impl From<WritePType> for WriteType {
    fn from(pt: WritePType) -> Self {
        let converted = Self::try_from_proto(pt);
        converted.expect("cannot convert Unspecified/None WritePType to WriteType")
    }
}
/// Client configuration. Built from defaults, a properties file, environment variables, or
/// the builder-style `with_*` methods; serialisable with serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GooseFsConfig {
/// Primary master address (`host:port`); `master_endpoint` prefixes it with `http://`.
pub master_addr: String,
/// All master addresses when more than one is configured; left empty for a single master.
#[serde(default)]
pub master_addrs: Vec<String>,
/// Block size in bytes; `validate` requires it to be non-zero.
pub block_size: u64,
/// Chunk size in bytes; `validate` requires `0 < chunk_size <= block_size`.
pub chunk_size: u64,
/// Defaults to `DEFAULT_CONNECT_TIMEOUT_MS` milliseconds.
pub connect_timeout: Duration,
/// Defaults to `DEFAULT_REQUEST_TIMEOUT_MS` milliseconds.
pub request_timeout: Duration,
/// Read by `worker_endpoint`. NOTE(review): both settings currently produce the same URL.
pub use_vpc_mapping: bool,
/// Prefix that `full_path` prepends to paths; empty means paths are used unchanged.
pub root: String,
/// Write type stored as the numeric value of a `WritePType`; `None` when not configured.
pub write_type: Option<i32>,
/// Defaults to `DEFAULT_MASTER_INQUIRE_MAX_DURATION_MS` milliseconds.
#[serde(default = "default_master_inquire_max_duration")]
pub master_inquire_retry_max_duration: Duration,
/// Defaults to `DEFAULT_MASTER_INQUIRE_INITIAL_SLEEP_MS` milliseconds.
#[serde(default = "default_master_inquire_initial_sleep")]
pub master_inquire_initial_sleep: Duration,
/// Defaults to `DEFAULT_MASTER_INQUIRE_MAX_SLEEP_MS` milliseconds.
#[serde(default = "default_master_inquire_max_sleep")]
pub master_inquire_max_sleep: Duration,
/// Defaults to `DEFAULT_MASTER_POLLING_TIMEOUT_MS` milliseconds.
#[serde(default = "default_master_polling_timeout")]
pub master_polling_timeout: Duration,
/// Authentication type; defaults to `AuthType::default()`.
#[serde(default)]
pub auth_type: AuthType,
/// User name for authentication; defaults to `$USER`, then `$USERNAME`, then `"unknown"`.
#[serde(default = "default_auth_username")]
pub auth_username: String,
/// Defaults to `DEFAULT_AUTH_TIMEOUT_MS` milliseconds.
#[serde(default = "default_auth_timeout")]
pub auth_timeout: Duration,
/// Config manager RPC addresses; empty unless configured.
#[serde(default)]
pub config_manager_rpc_addresses: Vec<String>,
/// Defaults to `DEFAULT_CONFIG_RPC_PORT`.
#[serde(default = "default_config_rpc_port")]
pub config_rpc_port: u16,
/// Transparent acceleration switch; defaults to `true`.
#[serde(default = "default_transparent_acceleration_enabled")]
pub transparent_acceleration_enabled: bool,
/// CosRanger switch for transparent acceleration; defaults to `false`.
#[serde(default)]
pub transparent_acceleration_cosranger_enabled: bool,
/// Authorization permission switch; defaults to `false`.
#[serde(default)]
pub authorization_permission_enabled: bool,
/// Impersonation user name; defaults to `DEFAULT_IMPERSONATION_USERNAME`.
#[serde(default = "default_login_impersonation_username")]
pub login_impersonation_username: String,
}
/// Serde default for `master_inquire_retry_max_duration`.
fn default_master_inquire_max_duration() -> Duration {
Duration::from_millis(DEFAULT_MASTER_INQUIRE_MAX_DURATION_MS)
}
/// Serde default for `master_inquire_initial_sleep`.
fn default_master_inquire_initial_sleep() -> Duration {
Duration::from_millis(DEFAULT_MASTER_INQUIRE_INITIAL_SLEEP_MS)
}
/// Serde default for `master_inquire_max_sleep`.
fn default_master_inquire_max_sleep() -> Duration {
Duration::from_millis(DEFAULT_MASTER_INQUIRE_MAX_SLEEP_MS)
}
/// Serde default for `master_polling_timeout`.
fn default_master_polling_timeout() -> Duration {
Duration::from_millis(DEFAULT_MASTER_POLLING_TIMEOUT_MS)
}
/// Serde default for `auth_username`: the value of `USER`, else `USERNAME`, else `"unknown"`.
fn default_auth_username() -> String {
    ["USER", "USERNAME"]
        .iter()
        .find_map(|name| std::env::var(name).ok())
        .unwrap_or_else(|| "unknown".to_string())
}
/// Serde default for `auth_timeout`.
fn default_auth_timeout() -> Duration {
Duration::from_millis(DEFAULT_AUTH_TIMEOUT_MS)
}
/// Serde default for `config_rpc_port`.
fn default_config_rpc_port() -> u16 {
DEFAULT_CONFIG_RPC_PORT
}
/// Serde default for `transparent_acceleration_enabled`: on unless configured otherwise.
fn default_transparent_acceleration_enabled() -> bool {
true
}
/// Serde default for `login_impersonation_username`.
fn default_login_impersonation_username() -> String {
DEFAULT_IMPERSONATION_USERNAME.to_string()
}
/// Built-in defaults: a single local master and the `DEFAULT_*` constants for everything else.
impl Default for GooseFsConfig {
    fn default() -> Self {
        // Connection target.
        let master_addr = format!("127.0.0.1:{}", DEFAULT_MASTER_PORT);
        let master_addrs = Vec::new();

        // Timeouts that have no serde default helper of their own.
        let connect_timeout = Duration::from_millis(DEFAULT_CONNECT_TIMEOUT_MS);
        let request_timeout = Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MS);

        Self {
            master_addr,
            master_addrs,
            block_size: DEFAULT_BLOCK_SIZE,
            chunk_size: DEFAULT_CHUNK_SIZE,
            connect_timeout,
            request_timeout,
            use_vpc_mapping: false,
            root: String::new(),
            write_type: None,
            // The remaining fields share their values with the serde defaults.
            master_inquire_retry_max_duration: default_master_inquire_max_duration(),
            master_inquire_initial_sleep: default_master_inquire_initial_sleep(),
            master_inquire_max_sleep: default_master_inquire_max_sleep(),
            master_polling_timeout: default_master_polling_timeout(),
            auth_type: AuthType::default(),
            auth_username: default_auth_username(),
            auth_timeout: default_auth_timeout(),
            config_manager_rpc_addresses: Vec::new(),
            config_rpc_port: default_config_rpc_port(),
            transparent_acceleration_enabled: default_transparent_acceleration_enabled(),
            transparent_acceleration_cosranger_enabled: false,
            authorization_permission_enabled: false,
            login_impersonation_username: default_login_impersonation_username(),
        }
    }
}
impl GooseFsConfig {
    /// Creates a single-master configuration for `master_addr`; all other fields are defaults.
    pub fn new(master_addr: impl Into<String>) -> Self {
        Self {
            master_addr: master_addr.into(),
            ..Default::default()
        }
    }

    /// Creates a configuration from a list of master addresses; the first entry becomes
    /// `master_addr` and the whole list is stored in `master_addrs`.
    ///
    /// # Panics
    ///
    /// Panics if `addrs` is empty.
    pub fn new_ha(addrs: Vec<String>) -> Self {
        assert!(!addrs.is_empty(), "master addresses must not be empty");
        Self {
            master_addr: addrs[0].clone(),
            master_addrs: addrs,
            ..Default::default()
        }
    }

    /// Picks [`GooseFsConfig::new`] for exactly one address and [`GooseFsConfig::new_ha`]
    /// otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `addrs` is empty.
    pub fn from_addresses(addrs: Vec<String>) -> Self {
        assert!(!addrs.is_empty(), "master addresses must not be empty");
        if addrs.len() == 1 {
            Self::new(&addrs[0])
        } else {
            Self::new_ha(addrs)
        }
    }

    /// Returns every configured master address: `master_addrs` when it is non-empty,
    /// otherwise a one-element list holding `master_addr`.
    pub fn master_addresses(&self) -> Vec<String> {
        if self.master_addrs.is_empty() {
            vec![self.master_addr.clone()]
        } else {
            self.master_addrs.clone()
        }
    }

    /// Returns `true` when more than one master address is configured.
    pub fn is_multi_master(&self) -> bool {
        self.master_addrs.len() > 1
    }

    /// Resolves `path` against `root`.
    ///
    /// With an empty `root` the path is returned unchanged. Otherwise the two are joined with
    /// exactly one `/` between them.
    pub fn full_path(&self, path: &str) -> String {
        if self.root.is_empty() {
            return path.to_string();
        }
        let root = self.root.trim_end_matches('/');
        let path = path.trim_start_matches('/');
        format!("{}/{}", root, path)
    }

    /// Returns the URL of the primary master (`http://<master_addr>`).
    pub fn master_endpoint(&self) -> String {
        format!("http://{}", self.master_addr)
    }

    /// Returns the URL of a worker at `host:rpc_port`.
    ///
    /// `use_vpc_mapping` does not influence the result: the two branches that used to test it
    /// produced identical URLs, so the conditional has been removed.
    pub fn worker_endpoint(&self, host: &str, rpc_port: i32) -> String {
        format!("http://{}:{}", host, rpc_port)
    }

    /// Sets the authentication type.
    pub fn with_auth_type(mut self, auth_type: AuthType) -> Self {
        self.auth_type = auth_type;
        self
    }

    /// Sets the authentication type from its textual form.
    ///
    /// # Errors
    ///
    /// Returns the parse error when `auth_type` is not a valid `AuthType`.
    pub fn with_auth_type_str(self, auth_type: &str) -> Result<Self, String> {
        let at: AuthType = auth_type.parse()?;
        Ok(self.with_auth_type(at))
    }

    /// Sets the user name used for authentication.
    pub fn with_auth_username(mut self, username: impl Into<String>) -> Self {
        self.auth_username = username.into();
        self
    }

    /// Sets the write type from the protobuf enum.
    pub fn with_write_type(mut self, wt: WritePType) -> Self {
        self.write_type = Some(wt as i32);
        self
    }

    /// Sets the write type from the user-facing enum.
    pub fn with_write_type_enum(mut self, wt: WriteType) -> Self {
        self.write_type = Some(wt.as_i32());
        self
    }

    /// Sets the write type from its textual form (see `WriteType::from_str`).
    ///
    /// # Errors
    ///
    /// Returns the parse error when `wt` is not a known write type.
    pub fn with_write_type_str(self, wt: &str) -> Result<Self, String> {
        let write_type: WriteType = wt.parse()?;
        Ok(self.with_write_type_enum(write_type))
    }

    /// Decodes the stored write type. Returns `None` when no write type is configured or the
    /// stored number is outside `0..=6`.
    pub fn get_write_type(&self) -> Option<WritePType> {
        let decoded = match self.write_type? {
            0 => WritePType::UnspecifiedWriteType,
            1 => WritePType::MustCache,
            2 => WritePType::TryCache,
            3 => WritePType::CacheThrough,
            4 => WritePType::Through,
            5 => WritePType::AsyncThrough,
            6 => WritePType::None,
            _ => return Option::None,
        };
        Some(decoded)
    }

    /// Builds a configuration from the defaults overlaid with environment variables.
    pub fn from_env() -> Self {
        Self::default().apply_env()
    }

    /// Overlays the `ENV_*` environment variables on this configuration.
    ///
    /// Variables that are unset, empty, or fail to parse leave the current value untouched.
    /// Values are interpreted the same way as in the properties file: byte sizes accept
    /// `KB`/`MB`/`GB` suffixes and booleans are matched without regard to ASCII case.
    pub fn apply_env(mut self) -> Self {
        /// Reads `name` as a comma-separated list; `None` when unset or without any entry.
        fn list_var(name: &str) -> Option<Vec<String>> {
            let raw = std::env::var(name).ok()?;
            let items: Vec<String> = raw
                .split(',')
                .map(str::trim)
                .filter(|item| !item.is_empty())
                .map(String::from)
                .collect();
            if items.is_empty() {
                None
            } else {
                Some(items)
            }
        }

        /// Reads `name` as a boolean (`true`/`false`, any ASCII case).
        fn bool_var(name: &str) -> Option<bool> {
            std::env::var(name)
                .ok()?
                .trim()
                .to_ascii_lowercase()
                .parse::<bool>()
                .ok()
        }

        /// Reads `name` as a byte size, using the same syntax as the properties file.
        fn size_var(name: &str) -> Option<u64> {
            parse_byte_size(&std::env::var(name).ok()?).ok()
        }

        /// Reads `name`, treating an empty value as unset.
        fn non_empty_var(name: &str) -> Option<String> {
            std::env::var(name).ok().filter(|value| !value.is_empty())
        }

        if let Some(addrs) = list_var(ENV_MASTER_ADDR) {
            self.master_addr = addrs[0].clone();
            // A single address switches back to single-master mode.
            self.master_addrs = if addrs.len() > 1 { addrs } else { Vec::new() };
        }
        if let Some(wt) = std::env::var(ENV_WRITE_TYPE)
            .ok()
            .and_then(|raw| raw.parse::<WriteType>().ok())
        {
            self.write_type = Some(wt.as_i32());
        }
        if let Some(bytes) = size_var(ENV_BLOCK_SIZE) {
            self.block_size = bytes;
        }
        if let Some(bytes) = size_var(ENV_CHUNK_SIZE) {
            self.chunk_size = bytes;
        }
        if let Some(at) = std::env::var(ENV_AUTH_TYPE)
            .ok()
            .and_then(|raw| raw.parse::<crate::auth::AuthType>().ok())
        {
            self.auth_type = at;
        }
        if let Some(user) = non_empty_var(ENV_AUTH_USERNAME) {
            self.auth_username = user;
        }
        if let Some(addrs) = list_var(ENV_CONFIG_MANAGER_RPC_ADDRESSES) {
            self.config_manager_rpc_addresses = addrs;
        }
        if let Some(port) = std::env::var(ENV_CONFIG_RPC_PORT)
            .ok()
            .and_then(|raw| raw.parse::<u16>().ok())
        {
            self.config_rpc_port = port;
        }
        if let Some(flag) = bool_var(ENV_TRANSPARENT_ACCELERATION_ENABLED) {
            self.transparent_acceleration_enabled = flag;
        }
        if let Some(flag) = bool_var(ENV_TRANSPARENT_ACCELERATION_COSRANGER_ENABLED) {
            self.transparent_acceleration_cosranger_enabled = flag;
        }
        if let Some(flag) = bool_var(ENV_AUTHORIZATION_PERMISSION_ENABLED) {
            self.authorization_permission_enabled = flag;
        }
        if let Some(user) = non_empty_var(ENV_LOGIN_IMPERSONATION_USERNAME) {
            self.login_impersonation_username = user;
        }
        self
    }

    /// Loads a configuration from the properties file at `path`.
    ///
    /// # Errors
    ///
    /// Returns [`ConfigLoadError::IoError`] when the file cannot be read.
    pub fn from_properties(path: impl AsRef<std::path::Path>) -> Result<Self, ConfigLoadError> {
        let path = path.as_ref();
        match std::fs::read_to_string(path) {
            Ok(content) => Ok(Self::from_properties_str(&content)),
            Err(e) => Err(ConfigLoadError::IoError {
                path: path.display().to_string(),
                source: e.to_string(),
            }),
        }
    }

    /// Builds a configuration from properties text; unknown or invalid entries are ignored.
    pub fn from_properties_str(content: &str) -> Self {
        PropertiesMap::parse(content).into_goosefs_config()
    }

    /// Loads the discovered properties file (or the defaults when none is found) and then
    /// overlays the environment variables.
    ///
    /// # Errors
    ///
    /// Returns [`ConfigLoadError::IoError`] when a discovered file cannot be read.
    pub fn from_properties_auto() -> Result<Self, ConfigLoadError> {
        let base = match discover_config_file() {
            Some(path) => Self::from_properties(&path)?,
            None => Self::default(),
        };
        Ok(base.apply_env())
    }

    /// Checks the configuration for values that cannot work.
    ///
    /// # Errors
    ///
    /// Returns a message when no master address is set, when `master_addrs` contains an empty
    /// entry, when `block_size` or `chunk_size` is zero, or when `chunk_size` exceeds
    /// `block_size`.
    pub fn validate(&self) -> Result<(), String> {
        if self.master_addr.is_empty() && self.master_addrs.is_empty() {
            return Err(
                "at least one master address must be provided (master_addr or master_addrs)"
                    .to_string(),
            );
        }
        if self.master_addrs.iter().any(|addr| addr.is_empty()) {
            return Err("master_addrs contains an empty address".to_string());
        }
        if self.block_size == 0 {
            return Err("block_size must be > 0".to_string());
        }
        if self.chunk_size == 0 {
            return Err("chunk_size must be > 0".to_string());
        }
        if self.chunk_size > self.block_size {
            return Err("chunk_size must be <= block_size".to_string());
        }
        Ok(())
    }
}
/// Snapshot of the two transparent-acceleration flags held by a `ConfigRefresher`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TransparentAccelerationSwitch {
/// Value of `transparent_acceleration_enabled` at the time of the snapshot.
pub enabled: bool,
/// Value of `transparent_acceleration_cosranger_enabled` at the time of the snapshot.
pub cosranger_enabled: bool,
}
/// Holds the transparent-acceleration flags and reloads them from the properties file and
/// environment once `expire_duration` has passed since the last load.
pub struct ConfigRefresher {
/// When the flags were last loaded; `None` forces a reload on the next refresh.
last_load_time: Mutex<Option<Instant>>,
/// Minimum age of the loaded values before a refresh reloads them.
expire_duration: Duration,
/// Current value of the transparent-acceleration flag.
transparent_acceleration_enabled: AtomicBool,
/// Current value of the CosRanger flag.
cosranger_enabled: AtomicBool,
}
/// Debug output shows the expiry interval and the current flag values; the load timestamp is
/// omitted.
impl fmt::Debug for ConfigRefresher {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let acceleration = self.transparent_acceleration_enabled.load(Ordering::Relaxed);
        let cosranger = self.cosranger_enabled.load(Ordering::Relaxed);

        let mut out = f.debug_struct("ConfigRefresher");
        out.field("expire_duration", &self.expire_duration);
        out.field("transparent_acceleration_enabled", &acceleration);
        out.field("cosranger_enabled", &cosranger);
        out.finish()
    }
}
impl ConfigRefresher {
    /// Creates a refresher that reloads after `DEFAULT_CONFIG_EXPIRE_MS` milliseconds.
    pub fn new() -> Self {
        Self::with_expire(Duration::from_millis(DEFAULT_CONFIG_EXPIRE_MS))
    }

    /// Creates a refresher with a custom expiry, seeded from
    /// `GooseFsConfig::from_properties_auto`.
    ///
    /// If the initial load fails, the failure is logged and the built-in defaults are used.
    pub fn with_expire(expire_duration: Duration) -> Self {
        let initial = GooseFsConfig::from_properties_auto().unwrap_or_else(|e| {
            // Mirror `reload_properties`, which also reports load failures.
            tracing::warn!("failed to load initial config: {}, using defaults", e);
            GooseFsConfig::default()
        });
        Self {
            expire_duration,
            ..Self::from_config(&initial)
        }
    }

    /// Creates a refresher seeded from `config`, using the default expiry interval.
    pub fn from_config(config: &GooseFsConfig) -> Self {
        Self {
            last_load_time: Mutex::new(Some(Instant::now())),
            expire_duration: Duration::from_millis(DEFAULT_CONFIG_EXPIRE_MS),
            transparent_acceleration_enabled: AtomicBool::new(
                config.transparent_acceleration_enabled,
            ),
            cosranger_enabled: AtomicBool::new(config.transparent_acceleration_cosranger_enabled),
        }
    }

    /// Reloads the flags if they have expired, then returns the current values.
    pub fn refresh_transparent_acceleration_switch(&self) -> TransparentAccelerationSwitch {
        self.load_if_expire();
        self.current_switch()
    }

    /// Returns the current flag values without triggering a reload.
    pub fn current_switch(&self) -> TransparentAccelerationSwitch {
        TransparentAccelerationSwitch {
            enabled: self
                .transparent_acceleration_enabled
                .load(Ordering::Relaxed),
            cosranger_enabled: self.cosranger_enabled.load(Ordering::Relaxed),
        }
    }

    /// Reloads the properties when the last load is at least `expire_duration` old.
    ///
    /// The lock is held for the whole check-and-reload, so concurrent callers cannot reload
    /// twice for the same expiry.
    fn load_if_expire(&self) {
        // The guarded value is only a timestamp, so it stays valid even if a previous holder
        // panicked; recover the guard instead of propagating the poison.
        let mut last_load = self
            .last_load_time
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let expired = match *last_load {
            None => true,
            Some(loaded_at) => loaded_at.elapsed() >= self.expire_duration,
        };
        if expired {
            self.reload_properties();
            *last_load = Some(Instant::now());
        }
    }

    /// Re-reads the configuration and stores the two flags; on failure the previous values
    /// are kept and a warning is logged.
    fn reload_properties(&self) {
        match GooseFsConfig::from_properties_auto() {
            Ok(cfg) => {
                self.transparent_acceleration_enabled
                    .store(cfg.transparent_acceleration_enabled, Ordering::Relaxed);
                self.cosranger_enabled.store(
                    cfg.transparent_acceleration_cosranger_enabled,
                    Ordering::Relaxed,
                );
                tracing::debug!(
                    transparent_acceleration_enabled = cfg.transparent_acceleration_enabled,
                    cosranger_enabled = cfg.transparent_acceleration_cosranger_enabled,
                    "config refreshed from properties file"
                );
            }
            Err(e) => {
                tracing::warn!("failed to reload config: {}, keeping previous values", e);
            }
        }
    }
}
/// Same as `ConfigRefresher::new`.
impl Default for ConfigRefresher {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
// Defaults: local single master, 64 MiB blocks, 1 MiB chunks, and a valid configuration.
#[test]
fn test_default_config() {
let config = GooseFsConfig::default();
assert_eq!(config.master_addr, "127.0.0.1:9200");
assert!(config.master_addrs.is_empty());
assert_eq!(config.block_size, 64 * 1024 * 1024);
assert_eq!(config.chunk_size, 1024 * 1024);
assert!(!config.is_multi_master());
assert!(config.validate().is_ok());
}
// `new_ha` uses the first address as primary and keeps the full list.
#[test]
fn test_new_ha_config() {
let config = GooseFsConfig::new_ha(vec![
"10.0.0.1:9200".to_string(),
"10.0.0.2:9200".to_string(),
"10.0.0.3:9200".to_string(),
]);
assert_eq!(config.master_addr, "10.0.0.1:9200");
assert_eq!(config.master_addrs.len(), 3);
assert!(config.is_multi_master());
assert!(config.validate().is_ok());
}
// With a single master, `master_addresses` falls back to `master_addr`.
#[test]
fn test_master_addresses_single() {
let config = GooseFsConfig::new("10.0.0.1:9200");
let addrs = config.master_addresses();
assert_eq!(addrs, vec!["10.0.0.1:9200"]);
assert!(!config.is_multi_master());
}
// With several masters, `master_addresses` returns the stored list.
#[test]
fn test_master_addresses_multi() {
let config = GooseFsConfig::new_ha(vec![
"10.0.0.1:9200".to_string(),
"10.0.0.2:9200".to_string(),
]);
let addrs = config.master_addresses();
assert_eq!(addrs.len(), 2);
assert!(config.is_multi_master());
}
// `new_ha` asserts that the address list is non-empty.
#[test]
#[should_panic(expected = "master addresses must not be empty")]
fn test_new_ha_empty_panics() {
GooseFsConfig::new_ha(vec![]);
}
// A non-empty root is joined to the path with exactly one slash.
#[test]
fn test_full_path_with_root() {
let config = GooseFsConfig {
root: "/data".to_string(),
..Default::default()
};
assert_eq!(config.full_path("/file.txt"), "/data/file.txt");
assert_eq!(config.full_path("file.txt"), "/data/file.txt");
}
// With an empty root the path is returned unchanged.
#[test]
fn test_full_path_without_root() {
let config = GooseFsConfig::default();
assert_eq!(config.full_path("/file.txt"), "/file.txt");
}
// Validation fails when no master address is configured at all.
#[test]
fn test_validate_empty_master() {
let config = GooseFsConfig {
master_addr: String::new(),
master_addrs: Vec::new(),
..Default::default()
};
assert!(config.validate().is_err());
}
// Validation fails when `master_addrs` contains an empty entry.
#[test]
fn test_validate_empty_addr_in_list() {
let config = GooseFsConfig {
master_addr: "10.0.0.1:9200".to_string(),
master_addrs: vec!["10.0.0.1:9200".to_string(), "".to_string()],
..Default::default()
};
assert!(config.validate().is_err());
}
// Validation fails when the chunk size exceeds the block size.
#[test]
fn test_validate_chunk_larger_than_block() {
let config = GooseFsConfig {
chunk_size: 128 * 1024 * 1024,
block_size: 64 * 1024 * 1024,
..Default::default()
};
assert!(config.validate().is_err());
}
// No write type is configured by default.
#[test]
fn test_write_type_default_is_none() {
let config = GooseFsConfig::default();
assert!(config.write_type.is_none());
assert!(config.get_write_type().is_none());
}
// `with_write_type` stores the numeric value and `get_write_type` decodes it again.
#[test]
fn test_with_write_type_builder() {
let config = GooseFsConfig::new("127.0.0.1:9200").with_write_type(WritePType::CacheThrough);
assert_eq!(config.write_type, Some(3));
assert_eq!(config.get_write_type(), Some(WritePType::CacheThrough));
}
// Pins the numeric value of each cache-related `WritePType` and checks the round trip.
#[test]
fn test_write_p_type_all_variants_config() {
let cases = vec![
(WritePType::MustCache, 1),
(WritePType::TryCache, 2),
(WritePType::CacheThrough, 3),
(WritePType::Through, 4),
(WritePType::AsyncThrough, 5),
];
for (wt, expected_i32) in cases {
let config = GooseFsConfig::new("127.0.0.1:9200").with_write_type(wt);
assert_eq!(config.write_type, Some(expected_i32));
assert_eq!(config.get_write_type(), Some(wt));
}
}
// An out-of-range stored number decodes to `None` instead of panicking.
#[test]
fn test_write_type_invalid_i32() {
let config = GooseFsConfig {
write_type: Some(999),
..Default::default()
};
assert!(config.get_write_type().is_none());
}
// Every canonical lower-case name parses to its variant.
#[test]
fn test_write_type_from_str_lowercase() {
assert_eq!(
"must_cache".parse::<WriteType>().unwrap(),
WriteType::MustCache
);
assert_eq!(
"try_cache".parse::<WriteType>().unwrap(),
WriteType::TryCache
);
assert_eq!(
"cache_through".parse::<WriteType>().unwrap(),
WriteType::CacheThrough
);
assert_eq!("through".parse::<WriteType>().unwrap(), WriteType::Through);
assert_eq!(
"async_through".parse::<WriteType>().unwrap(),
WriteType::AsyncThrough
);
}
// Upper-case names are accepted as well.
#[test]
fn test_write_type_from_str_uppercase() {
assert_eq!(
"MUST_CACHE".parse::<WriteType>().unwrap(),
WriteType::MustCache
);
assert_eq!(
"TRY_CACHE".parse::<WriteType>().unwrap(),
WriteType::TryCache
);
assert_eq!(
"CACHE_THROUGH".parse::<WriteType>().unwrap(),
WriteType::CacheThrough
);
assert_eq!("THROUGH".parse::<WriteType>().unwrap(), WriteType::Through);
assert_eq!(
"ASYNC_THROUGH".parse::<WriteType>().unwrap(),
WriteType::AsyncThrough
);
}
// Mixed-case names are accepted as well.
#[test]
fn test_write_type_from_str_mixed_case() {
assert_eq!(
"Cache_Through".parse::<WriteType>().unwrap(),
WriteType::CacheThrough
);
assert_eq!("Through".parse::<WriteType>().unwrap(), WriteType::Through);
}
// Unknown names, the empty string, and hyphenated spellings are rejected.
#[test]
fn test_write_type_from_str_invalid() {
assert!("invalid".parse::<WriteType>().is_err());
assert!("".parse::<WriteType>().is_err());
assert!("cache-through".parse::<WriteType>().is_err()); }
// `Display` prints the canonical lower-case name.
#[test]
fn test_write_type_display() {
assert_eq!(WriteType::MustCache.to_string(), "must_cache");
assert_eq!(WriteType::TryCache.to_string(), "try_cache");
assert_eq!(WriteType::CacheThrough.to_string(), "cache_through");
assert_eq!(WriteType::Through.to_string(), "through");
assert_eq!(WriteType::AsyncThrough.to_string(), "async_through");
}
// Spot-checks `as_str`.
#[test]
fn test_write_type_as_str() {
assert_eq!(WriteType::CacheThrough.as_str(), "cache_through");
assert_eq!(WriteType::Through.as_str(), "through");
}
// Pins the numeric value produced for each variant.
#[test]
fn test_write_type_as_i32() {
assert_eq!(WriteType::MustCache.as_i32(), 1);
assert_eq!(WriteType::TryCache.as_i32(), 2);
assert_eq!(WriteType::CacheThrough.as_i32(), 3);
assert_eq!(WriteType::Through.as_i32(), 4);
assert_eq!(WriteType::AsyncThrough.as_i32(), 5);
}
// `WriteType` converts to the `WritePType` variant of the same name.
#[test]
fn test_write_type_to_write_p_type() {
assert_eq!(
WritePType::from(WriteType::MustCache),
WritePType::MustCache
);
assert_eq!(
WritePType::from(WriteType::CacheThrough),
WritePType::CacheThrough
);
assert_eq!(WritePType::from(WriteType::Through), WritePType::Through);
}
// `WritePType` converts back to the `WriteType` variant of the same name.
#[test]
fn test_write_p_type_to_write_type() {
assert_eq!(WriteType::from(WritePType::MustCache), WriteType::MustCache);
assert_eq!(
WriteType::from(WritePType::CacheThrough),
WriteType::CacheThrough
);
assert_eq!(WriteType::from(WritePType::Through), WriteType::Through);
}
// Proto variants without a `WriteType` counterpart yield an error, not a panic.
#[test]
fn test_write_p_type_try_from_unspecified() {
assert!(WriteType::try_from_proto(WritePType::UnspecifiedWriteType).is_err());
assert!(WriteType::try_from_proto(WritePType::None).is_err());
}
// Every variant in `ALL` round-trips through its string form and through `WritePType`.
#[test]
fn test_write_type_all_variants() {
assert_eq!(WriteType::ALL.len(), 5);
for wt in WriteType::ALL {
let s = wt.as_str();
let parsed: WriteType = s.parse().unwrap();
assert_eq!(&parsed, wt);
let pt = WritePType::from(*wt);
let back = WriteType::from(pt);
assert_eq!(back, *wt);
}
}
// `with_write_type_enum` stores the same numeric value as the proto-based builder.
#[test]
fn test_config_with_write_type_enum() {
let config =
GooseFsConfig::new("127.0.0.1:9200").with_write_type_enum(WriteType::CacheThrough);
assert_eq!(config.write_type, Some(3));
assert_eq!(config.get_write_type(), Some(WritePType::CacheThrough));
}
// `with_write_type_str` parses the name and stores the matching numeric value.
#[test]
fn test_config_with_write_type_str() {
let config = GooseFsConfig::new("127.0.0.1:9200")
.with_write_type_str("through")
.unwrap();
assert_eq!(config.write_type, Some(4));
assert_eq!(config.get_write_type(), Some(WritePType::Through));
}
// An unknown write type name is reported as an error.
#[test]
fn test_config_with_write_type_str_invalid() {
let result = GooseFsConfig::new("127.0.0.1:9200").with_write_type_str("bad_value");
assert!(result.is_err());
}
// Pins the storage-option key strings so they are not changed by accident.
#[test]
fn test_storage_option_constants() {
assert_eq!(STORAGE_OPT_MASTER_ADDR, "goosefs_master_addr");
assert_eq!(STORAGE_OPT_WRITE_TYPE, "goosefs_write_type");
assert_eq!(STORAGE_OPT_BLOCK_SIZE, "goosefs_block_size");
assert_eq!(STORAGE_OPT_CHUNK_SIZE, "goosefs_chunk_size");
}
// Pins the environment variable names so they are not changed by accident.
#[test]
fn test_env_var_constants() {
assert_eq!(ENV_MASTER_ADDR, "GOOSEFS_MASTER_ADDR");
assert_eq!(ENV_WRITE_TYPE, "GOOSEFS_WRITE_TYPE");
assert_eq!(ENV_BLOCK_SIZE, "GOOSEFS_BLOCK_SIZE");
assert_eq!(ENV_CHUNK_SIZE, "GOOSEFS_CHUNK_SIZE");
}
// Pins the default master-inquire retry timings.
#[test]
fn test_default_retry_config() {
let config = GooseFsConfig::default();
assert_eq!(
config.master_inquire_retry_max_duration,
Duration::from_millis(120_000)
);
assert_eq!(
config.master_inquire_initial_sleep,
Duration::from_millis(50)
);
assert_eq!(
config.master_inquire_max_sleep,
Duration::from_millis(3_000)
);
}
// Hostname/port, write type, and suffixed byte sizes are all read from properties text.
#[test]
fn test_from_properties_str_basic() {
let props = "\
goosefs.master.hostname=10.0.0.1
goosefs.master.rpc.port=9200
goosefs.security.authentication.type=SIMPLE
goosefs.user.file.writetype.default=CACHE_THROUGH
goosefs.user.block.size.bytes.default=64MB
goosefs.user.network.data.transfer.chunk.size=1MB
";
let cfg = GooseFsConfig::from_properties_str(props);
assert_eq!(cfg.master_addr, "10.0.0.1:9200");
assert_eq!(cfg.get_write_type(), Some(WritePType::CacheThrough));
assert_eq!(cfg.block_size, 64 * 1024 * 1024);
assert_eq!(cfg.chunk_size, 1024 * 1024);
}
// A comma-separated address list produces a multi-master configuration.
#[test]
fn test_from_properties_str_ha_addresses() {
let props = "goosefs.master.rpc.addresses=10.0.0.1:9200,10.0.0.2:9200,10.0.0.3:9200\n";
let cfg = GooseFsConfig::from_properties_str(props);
assert_eq!(cfg.master_addr, "10.0.0.1:9200");
assert_eq!(cfg.master_addrs.len(), 3);
assert!(cfg.is_multi_master());
}
// The `KB` suffix multiplies by 1024.
#[test]
fn test_from_properties_str_byte_size_kb() {
let props = "goosefs.user.network.data.transfer.chunk.size=512KB\n";
let cfg = GooseFsConfig::from_properties_str(props);
assert_eq!(cfg.chunk_size, 512 * 1024);
}
// A byte size without a suffix is taken as a plain number of bytes.
#[test]
fn test_from_properties_str_byte_size_plain_int() {
let props = "goosefs.user.block.size.bytes.default=134217728\n";
let cfg = GooseFsConfig::from_properties_str(props);
assert_eq!(cfg.block_size, 128 * 1024 * 1024);
}
#[test]
fn test_from_properties_str_empty_uses_defaults() {
    // With no properties at all, the resulting config carries the default address and block size.
    let empty_input = "";
    let parsed = GooseFsConfig::from_properties_str(empty_input);

    assert_eq!(parsed.block_size, 64 << 20);
    assert_eq!(parsed.master_addr, "127.0.0.1:9200");
}
#[test]
fn test_from_properties_str_comments_ignored() {
    // Lines starting with `#` or `!` are comments; the commented-out port 9999 must not win.
    let content = "\
# This is a comment
goosefs.master.hostname=10.0.0.1
! Another comment style
#goosefs.master.rpc.port=9999
goosefs.master.rpc.port=9200
";
    let parsed = GooseFsConfig::from_properties_str(content);
    let expected_addr = "10.0.0.1:9200";
    assert_eq!(parsed.master_addr, expected_addr);
}
#[test]
fn test_parse_byte_size() {
    // (input, expected byte count) pairs covering each suffix plus a bare integer.
    let accepted = [
        ("64MB", 64 * 1024 * 1024),
        ("1GB", 1024 * 1024 * 1024),
        ("512KB", 512 * 1024),
        ("1048576", 1024 * 1024),
    ];
    for (input, expected_bytes) in accepted {
        let parsed = parse_byte_size(input).unwrap();
        assert_eq!(parsed, expected_bytes);
    }

    // Non-numeric input without a recognised suffix is rejected.
    let rejected = parse_byte_size("bad");
    assert!(rejected.is_err());
}
#[test]
fn test_apply_env_master_addr() {
std::env::set_var("GOOSEFS_MASTER_ADDR", "192.168.1.1:9200");
let cfg = GooseFsConfig::default().apply_env();
std::env::remove_var("GOOSEFS_MASTER_ADDR");
assert_eq!(cfg.master_addr, "192.168.1.1:9200");
}
#[test]
fn test_apply_env_ha_addresses() {
std::env::set_var("GOOSEFS_MASTER_ADDR", "10.0.0.1:9200,10.0.0.2:9200");
let cfg = GooseFsConfig::default().apply_env();
std::env::remove_var("GOOSEFS_MASTER_ADDR");
assert_eq!(cfg.master_addrs.len(), 2);
assert_eq!(cfg.master_addr, "10.0.0.1:9200");
}
#[test]
fn test_apply_env_write_type() {
std::env::set_var("GOOSEFS_WRITE_TYPE", "THROUGH");
let cfg = GooseFsConfig::default().apply_env();
std::env::remove_var("GOOSEFS_WRITE_TYPE");
assert_eq!(cfg.get_write_type(), Some(WritePType::Through));
}
#[test]
fn test_apply_env_block_size() {
std::env::set_var("GOOSEFS_BLOCK_SIZE", "134217728");
let cfg = GooseFsConfig::default().apply_env();
std::env::remove_var("GOOSEFS_BLOCK_SIZE");
assert_eq!(cfg.block_size, 128 * 1024 * 1024);
}
#[test]
fn test_default_new_fields() {
    let defaults = GooseFsConfig::default();

    // Config-manager endpoint: no addresses configured, port 9214.
    assert!(defaults.config_manager_rpc_addresses.is_empty());
    assert_eq!(defaults.config_rpc_port, 9214);

    // Transparent acceleration is on by default, its cosranger variant is off.
    assert!(defaults.transparent_acceleration_enabled);
    assert!(!defaults.transparent_acceleration_cosranger_enabled);

    // Security: permission checks off, impersonation uses the `_HDFS_USER_` placeholder.
    assert!(!defaults.authorization_permission_enabled);
    let impersonation = &defaults.login_impersonation_username;
    assert_eq!(*impersonation, "_HDFS_USER_");
}
#[test]
fn test_from_properties_str_config_manager() {
    // Config-manager address list and RPC port are both read from the properties text.
    let content = "\
goosefs.config.manager.rpc.addresses=10.0.0.1:9214,10.0.0.2:9214
goosefs.config.rpc.port=9300
";
    let parsed = GooseFsConfig::from_properties_str(content);

    let manager_addrs = &parsed.config_manager_rpc_addresses;
    assert_eq!(manager_addrs.len(), 2);
    assert_eq!(manager_addrs[0], "10.0.0.1:9214");
    assert_eq!(parsed.config_rpc_port, 9300);
}
#[test]
fn test_from_properties_str_security_extended() {
    // Security-related keys: permission flag, impersonation user and login user.
    let content = "\
goosefs.security.authentication.type=SIMPLE
goosefs.security.authorization.permission.enabled=true
goosefs.security.login.impersonation.username=_NONE_
goosefs.security.login.username=testuser
";
    let parsed = GooseFsConfig::from_properties_str(content);

    let permission_checks_on = parsed.authorization_permission_enabled;
    assert!(permission_checks_on);
    assert_eq!(parsed.login_impersonation_username, "_NONE_");
    assert_eq!(parsed.auth_username, "testuser");
}
#[test]
fn test_from_properties_str_transparent_acceleration() {
    // Both switches are set to the opposite of their defaults in the properties text.
    let content = "\
goosefs.user.client.transparent_acceleration.enabled=false
goosefs.user.client.transparent_acceleration.cosranger.enabled=true
";
    let parsed = GooseFsConfig::from_properties_str(content);

    let observed = (
        parsed.transparent_acceleration_enabled,
        parsed.transparent_acceleration_cosranger_enabled,
    );
    assert_eq!(observed, (false, true));
}
#[test]
fn test_from_properties_str_full_config() {
    // A properties document that sets every key exercised by the narrower tests at once.
    let content = "\
goosefs.master.hostname=10.0.0.1
goosefs.master.rpc.port=9200
goosefs.config.manager.rpc.addresses=10.0.0.1:9214
goosefs.config.rpc.port=9214
goosefs.security.authentication.type=SIMPLE
goosefs.security.authorization.permission.enabled=true
goosefs.security.login.impersonation.username=_HDFS_USER_
goosefs.security.login.username=myuser
goosefs.user.client.transparent_acceleration.enabled=true
goosefs.user.client.transparent_acceleration.cosranger.enabled=false
goosefs.user.file.writetype.default=CACHE_THROUGH
goosefs.user.block.size.bytes.default=64MB
goosefs.user.network.data.transfer.chunk.size=1MB
";
    let parsed = GooseFsConfig::from_properties_str(content);

    // Master and config-manager endpoints.
    assert_eq!(parsed.master_addr, "10.0.0.1:9200");
    assert_eq!(parsed.config_manager_rpc_addresses, vec!["10.0.0.1:9214"]);
    assert_eq!(parsed.config_rpc_port, 9214);

    // Security settings.
    assert!(parsed.authorization_permission_enabled);
    assert_eq!(parsed.login_impersonation_username, "_HDFS_USER_");
    assert_eq!(parsed.auth_username, "myuser");

    // Transparent-acceleration switches.
    assert!(parsed.transparent_acceleration_enabled);
    assert!(!parsed.transparent_acceleration_cosranger_enabled);

    // I/O settings.
    assert_eq!(parsed.get_write_type(), Some(WritePType::CacheThrough));
    assert_eq!(parsed.block_size, 64 << 20);
    assert_eq!(parsed.chunk_size, 1 << 20);
}
#[test]
fn test_new_env_var_constants() {
    // Each environment-variable constant must spell out the expected `GOOSEFS_*` name.
    let expected_names = [
        (
            ENV_CONFIG_MANAGER_RPC_ADDRESSES,
            "GOOSEFS_CONFIG_MANAGER_RPC_ADDRESSES",
        ),
        (ENV_CONFIG_RPC_PORT, "GOOSEFS_CONFIG_RPC_PORT"),
        (
            ENV_TRANSPARENT_ACCELERATION_ENABLED,
            "GOOSEFS_TRANSPARENT_ACCELERATION_ENABLED",
        ),
        (
            ENV_TRANSPARENT_ACCELERATION_COSRANGER_ENABLED,
            "GOOSEFS_TRANSPARENT_ACCELERATION_COSRANGER_ENABLED",
        ),
        (
            ENV_AUTHORIZATION_PERMISSION_ENABLED,
            "GOOSEFS_AUTHORIZATION_PERMISSION_ENABLED",
        ),
        (
            ENV_LOGIN_IMPERSONATION_USERNAME,
            "GOOSEFS_LOGIN_IMPERSONATION_USERNAME",
        ),
    ];
    for (constant, literal) in expected_names {
        assert_eq!(constant, literal);
    }
}
#[test]
fn test_new_storage_option_constants() {
    // Each storage-option constant must spell out the expected `goosefs_*` key.
    let expected_keys = [
        (
            STORAGE_OPT_CONFIG_MANAGER_RPC_ADDRESSES,
            "goosefs_config_manager_rpc_addresses",
        ),
        (STORAGE_OPT_CONFIG_RPC_PORT, "goosefs_config_rpc_port"),
        (
            STORAGE_OPT_TRANSPARENT_ACCELERATION_ENABLED,
            "goosefs_transparent_acceleration_enabled",
        ),
        (
            STORAGE_OPT_TRANSPARENT_ACCELERATION_COSRANGER_ENABLED,
            "goosefs_transparent_acceleration_cosranger_enabled",
        ),
        (
            STORAGE_OPT_AUTHORIZATION_PERMISSION_ENABLED,
            "goosefs_authorization_permission_enabled",
        ),
        (
            STORAGE_OPT_LOGIN_IMPERSONATION_USERNAME,
            "goosefs_login_impersonation_username",
        ),
    ];
    for (constant, literal) in expected_keys {
        assert_eq!(constant, literal);
    }
}
#[test]
fn test_impersonation_none_constant() {
    // The impersonation sentinel must be spelled exactly `_NONE_`.
    let expected_sentinel = "_NONE_";
    assert_eq!(IMPERSONATION_NONE, expected_sentinel);
}
#[test]
fn test_config_refresher_from_config_seeds_initial_values() {
    // Use the opposite of the defaults so a refresher that ignored its input would be caught.
    let seed = GooseFsConfig {
        transparent_acceleration_cosranger_enabled: true,
        transparent_acceleration_enabled: false,
        ..GooseFsConfig::default()
    };

    let seeded_refresher = ConfigRefresher::from_config(&seed);
    let observed = seeded_refresher.current_switch();

    assert!(!observed.enabled, "should seed enabled=false from config");
    assert!(
        observed.cosranger_enabled,
        "should seed cosranger=true from config"
    );
}
#[test]
fn test_config_refresher_default_creates_with_default_values() {
    // A refresher built from the default config reports the default switch values.
    let default_config = GooseFsConfig::default();
    let default_refresher = ConfigRefresher::from_config(&default_config);
    let observed = default_refresher.current_switch();

    assert!(
        observed.enabled,
        "default transparent_acceleration_enabled should be true"
    );
    assert!(
        !observed.cosranger_enabled,
        "default cosranger_enabled should be false"
    );
}
#[test]
fn test_config_refresher_current_switch_is_lock_free() {
    // On a freshly built refresher, reading the current switch and asking for a
    // refresh must report the same values.
    let both_enabled = GooseFsConfig {
        transparent_acceleration_cosranger_enabled: true,
        transparent_acceleration_enabled: true,
        ..GooseFsConfig::default()
    };
    let refresher = ConfigRefresher::from_config(&both_enabled);

    let snapshot = refresher.current_switch();
    let refreshed = refresher.refresh_transparent_acceleration_switch();

    assert_eq!(snapshot, refreshed);
}
#[test]
fn test_config_refresher_only_refreshes_switch_params() {
    // Every non-switch field is set to a distinctive value so that any accidental
    // change to the user's config would be visible in the assertions at the end.
    let user_config = GooseFsConfig {
        master_addr: "10.0.0.99:9999".to_string(),
        block_size: 128 * 1024 * 1024,
        chunk_size: 2 * 1024 * 1024,
        write_type: Some(WritePType::Through as i32),
        auth_username: "custom_user".to_string(),
        transparent_acceleration_enabled: true,
        transparent_acceleration_cosranger_enabled: false,
        ..Default::default()
    };
    let refresher = ConfigRefresher::from_config(&user_config);
    let switch = refresher.refresh_transparent_acceleration_switch();

    // The concrete switch values may come from a config file rather than from
    // `user_config` (NOTE(review): confirm against the refresher implementation),
    // so this pins only what must hold either way: the value handed back by a
    // refresh is the value the refresher reports afterwards.
    assert_eq!(
        switch,
        refresher.current_switch(),
        "refresh must return the same switch values the refresher reports afterwards"
    );

    // The refresher only borrows the config; none of the user's fields may change.
    assert_eq!(user_config.master_addr, "10.0.0.99:9999");
    assert_eq!(user_config.block_size, 128 * 1024 * 1024);
    assert_eq!(user_config.chunk_size, 2 * 1024 * 1024);
    assert_eq!(user_config.write_type, Some(WritePType::Through as i32));
    assert_eq!(user_config.auth_username, "custom_user");
}
/// Serializes the two refresher tests below.
///
/// Both mutate process-global environment variables (`ENV_CONFIG_FILE` is set by one
/// and removed by the other). The test harness runs tests on several threads, so
/// without this lock either test could observe the other's environment.
static REFRESHER_ENV_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

#[test]
fn test_config_refresher_file_overrides_only_switch_params() {
    use std::io::Write;

    // A poisoned lock only means another test panicked; the guarded data is `()`.
    let _env_guard = REFRESHER_ENV_TEST_LOCK
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    // Per-process directory so concurrent test runs on one machine do not share the file.
    let dir = std::env::temp_dir().join(format!(
        "goosefs_refresher_test_{}",
        std::process::id()
    ));
    std::fs::create_dir_all(&dir).unwrap();
    let props_path = dir.join(PROPERTIES_FILENAME);

    // The file sets non-switch keys as well, to show they do not leak into the user's config.
    let file_lines = [
        "goosefs.master.hostname=file-host-should-not-affect-user",
        "goosefs.master.rpc.port=1234",
        "goosefs.user.block.size.bytes.default=1GB",
        "goosefs.user.client.transparent_acceleration.enabled=false",
        "goosefs.user.client.transparent_acceleration.cosranger.enabled=true",
    ];
    {
        let mut f = std::fs::File::create(&props_path).unwrap();
        for line in file_lines {
            writeln!(f, "{}", line).unwrap();
        }
    }
    std::env::set_var(ENV_CONFIG_FILE, &props_path);

    let user_config = GooseFsConfig {
        master_addr: "user-master:9200".to_string(),
        block_size: 256 * 1024 * 1024,
        chunk_size: 4 * 1024 * 1024,
        write_type: Some(WritePType::CacheThrough as i32),
        auth_username: "my_user".to_string(),
        transparent_acceleration_enabled: true,
        transparent_acceleration_cosranger_enabled: false,
        ..Default::default()
    };

    // `refresher` is built the normal way; `refresher_immediate` has no recorded load
    // time and a zero expiry so that its first refresh goes back to the config source.
    let refresher = ConfigRefresher::from_config(&user_config);
    let refresher_immediate = ConfigRefresher {
        last_load_time: Mutex::new(None),
        expire_duration: Duration::from_millis(0),
        transparent_acceleration_enabled: AtomicBool::new(
            user_config.transparent_acceleration_enabled,
        ),
        cosranger_enabled: AtomicBool::new(
            user_config.transparent_acceleration_cosranger_enabled,
        ),
    };
    let switch = refresher_immediate.refresh_transparent_acceleration_switch();
    let sw_original = refresher.current_switch();

    // Clean up before asserting so a failing assertion cannot leave the environment
    // variable or the temporary file behind.
    std::env::remove_var(ENV_CONFIG_FILE);
    let _ = std::fs::remove_file(&props_path);
    let _ = std::fs::remove_dir(&dir);

    assert!(
        !switch.enabled,
        "switch.enabled should be overridden to false by file config"
    );
    assert!(
        switch.cosranger_enabled,
        "switch.cosranger_enabled should be overridden to true by file config"
    );
    assert_eq!(
        user_config.master_addr, "user-master:9200",
        "user's master_addr must NOT be affected by config refresh"
    );
    assert_eq!(
        user_config.block_size,
        256 * 1024 * 1024,
        "user's block_size must NOT be affected by config refresh"
    );
    assert_eq!(
        user_config.chunk_size,
        4 * 1024 * 1024,
        "user's chunk_size must NOT be affected by config refresh"
    );
    assert_eq!(
        user_config.write_type,
        Some(WritePType::CacheThrough as i32),
        "user's write_type must NOT be affected by config refresh"
    );
    assert_eq!(
        user_config.auth_username, "my_user",
        "user's auth_username must NOT be affected by config refresh"
    );
    assert!(
        user_config.transparent_acceleration_enabled,
        "user's original transparent_acceleration_enabled should still be true"
    );
    assert!(
        !user_config.transparent_acceleration_cosranger_enabled,
        "user's original cosranger_enabled should still be false"
    );
    assert!(
        sw_original.enabled,
        "non-expired refresher should keep user's enabled=true"
    );
    assert!(
        !sw_original.cosranger_enabled,
        "non-expired refresher should keep user's cosranger=false"
    );
}

#[test]
fn test_config_refresher_no_file_keeps_user_values() {
    // A poisoned lock only means another test panicked; the guarded data is `()`.
    let _env_guard = REFRESHER_ENV_TEST_LOCK
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    // Clear every variable this test expects to be absent during the reload.
    for var in [
        ENV_CONFIG_FILE,
        ENV_CONF_DIR,
        ENV_HOME,
        ENV_TRANSPARENT_ACCELERATION_ENABLED,
        ENV_TRANSPARENT_ACCELERATION_COSRANGER_ENABLED,
    ] {
        std::env::remove_var(var);
    }

    let user_config = GooseFsConfig {
        transparent_acceleration_enabled: false,
        transparent_acceleration_cosranger_enabled: true,
        ..Default::default()
    };
    // No recorded load time and a zero expiry, seeded with the user's switch values.
    let refresher = ConfigRefresher {
        last_load_time: Mutex::new(None),
        expire_duration: Duration::from_millis(0),
        transparent_acceleration_enabled: AtomicBool::new(
            user_config.transparent_acceleration_enabled,
        ),
        cosranger_enabled: AtomicBool::new(
            user_config.transparent_acceleration_cosranger_enabled,
        ),
    };
    let switch = refresher.refresh_transparent_acceleration_switch();

    // The user's config object keeps its values ...
    assert!(
        !user_config.transparent_acceleration_enabled,
        "user config object is never modified by refresher"
    );
    assert!(
        user_config.transparent_acceleration_cosranger_enabled,
        "user config object is never modified by refresher"
    );
    // ... while the refreshed switch reports the defaults rather than the seeded values.
    assert!(
        switch.enabled,
        "refresher should pick up default enabled=true after reload"
    );
    assert!(
        !switch.cosranger_enabled,
        "refresher should pick up default cosranger=false after reload"
    );
}
}