use crate::conf::{ConfigConfig, ServerAddr};
use crate::network::HTTP;
use crate::protocol::request::{GetConfigReq, WatchConfigChangeReq};
use crate::{AppConfig, ConRegConfig};
use anyhow::Context;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use serde_yaml::{Mapping, Value};
use std::collections::HashMap;
use std::sync::LazyLock;
use std::time::Duration;
pub struct ConfigClient {
config: ConfigConfig,
}
impl ConfigClient {
pub fn new(config: &ConRegConfig) -> Self {
ConfigClient {
config: config
.config
.clone()
.context("config not set, unable to create config client")
.unwrap(),
}
}
pub(crate) async fn load(&self) -> anyhow::Result<Configs> {
let mut contents = vec![];
for id in self.config.config_ids.iter() {
let content = Self::fetch_config(
&self.config.server_addr,
&self.config.namespace,
id,
&self.config.auth_token,
)
.await?;
contents.push((id.clone(), content));
}
self.start_watch().await?;
self.start_compensate().await?;
Configs::from_contents(contents)
}
async fn fetch_config(
server_addr: &ServerAddr,
namespace: &str,
config_id: &str,
auth_token: &Option<String>,
) -> anyhow::Result<String> {
let url = server_addr.build_url("/api/config/get")?;
let query = GetConfigReq {
namespace_id: namespace.to_string(),
id: config_id.to_string(),
};
let result = HTTP
.get::<HashMap<String, Value>>(
&url,
query,
match auth_token {
Some(token) => Some(vec![(crate::NS_TOKEN_HEADER, token.as_str())]),
None => None,
},
)
.await?;
let content = result
.get("content")
.ok_or(anyhow::anyhow!(
"config id [ {} ] not found in server",
config_id
))?
.as_str()
.unwrap();
log::info!("config {} fetched", config_id);
Ok(content.to_string())
}
async fn start_watch(&self) -> anyhow::Result<()> {
let config_clone = self.config.clone();
tokio::spawn(async move {
log::info!(
"start watch config changes in namespace: {}",
config_clone.namespace
);
let url = config_clone
.server_addr
.build_url("/api/config/watch")
.context("build url error from server addr")
.unwrap();
let query = WatchConfigChangeReq {
namespace_id: config_clone.namespace.clone(),
};
loop {
match HTTP.get::<Option<String>>(&url, &query, None).await {
Ok(changed_config_id) => {
if changed_config_id.is_none() {
log::info!("config no changed");
continue;
}
log::info!("config changed, reloading config");
let mut contents = vec![];
for id in config_clone.config_ids.iter() {
let content = Self::fetch_config(
&config_clone.server_addr,
&config_clone.namespace,
id,
&config_clone.auth_token,
)
.await
.unwrap();
contents.push((id.clone(), content));
}
let config = Configs::from_contents(contents).unwrap();
let new_configs = config.get_all().clone();
AppConfig::reload(config);
log::info!("config reloaded");
Self::notify_config_change(
&changed_config_id.unwrap(), &new_configs,
);
}
Err(e) => {
log::error!("watch config changes error: {}", e);
tokio::time::sleep(Duration::from_millis(500)).await;
}
};
}
});
Ok(())
}
async fn start_compensate(&self) -> anyhow::Result<()> {
let config_clone = self.config.clone();
tokio::spawn(async move {
log::info!(
"start config compensate in namespace: {}",
config_clone.namespace
);
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
log::debug!("starting fetch config");
let mut contents = vec![];
for id in config_clone.config_ids.iter() {
match Self::fetch_config(
&config_clone.server_addr,
&config_clone.namespace,
id,
&config_clone.auth_token,
)
.await
{
Ok(res) => contents.push((id.clone(), res)),
Err(e) => {
log::error!("fetch config error: {}", e);
tokio::time::sleep(Duration::from_millis(500)).await;
}
};
}
AppConfig::reload(Configs::from_contents(contents).unwrap());
log::debug!("config fetch success");
}
});
Ok(())
}
fn notify_config_change(config_id: &str, changed_configs: &HashMap<String, Value>) {
let listeners = CONFIG_LISTENER.listeners.get(config_id);
if let Some(listeners) = listeners
&& !listeners.is_empty()
{
for handler in &*listeners {
handler(changed_configs)
}
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Configs {
pub flatten_config: HashMap<String, Value>,
pub merged_config: HashMap<String, Value>,
}
type ConfigListeners = DashMap<String, Vec<fn(&HashMap<String, Value>)>>;
struct ConfigListener {
listeners: ConfigListeners,
}
static CONFIG_LISTENER: LazyLock<ConfigListener> = LazyLock::new(|| ConfigListener {
listeners: DashMap::new(),
});
impl Configs {
fn from_contents(contents: Vec<(String, String)>) -> anyhow::Result<Self> {
let mut builder = config::Config::builder();
for (config_id, content) in contents {
let format = Self::get_format(config_id.as_str())?;
builder = builder.add_source(config::File::from_str(&content, format));
}
let merged_config = builder
.build()?
.try_deserialize::<HashMap<String, Value>>()?;
let mut flatten_config = HashMap::new();
Self::flatten_yaml_value(
&mut flatten_config,
"",
Value::Mapping(Mapping::from_iter(
merged_config
.clone()
.into_iter()
.map(|(k, v)| (k.into(), v)),
)),
);
Ok(Configs {
flatten_config,
merged_config,
})
}
fn get_format(config_id: &str) -> anyhow::Result<config::FileFormat> {
let format = config_id.split('.').last().expect("invalid config id");
let format = match format {
"yaml" | "yml" => config::FileFormat::Yaml,
"json" => config::FileFormat::Json,
"ini" | "properties" => config::FileFormat::Ini,
"toml" => config::FileFormat::Toml,
_ => anyhow::bail!("unsupported config format: {}", config_id),
};
Ok(format)
}
fn flatten_yaml_value(result: &mut HashMap<String, Value>, prefix: &str, value: Value) {
match value {
Value::Mapping(mapping) => {
for (key, val) in mapping {
let key_str = match key {
Value::String(s) => s.clone(),
Value::Number(num) => num.to_string(),
_ => "unknown".to_string(),
};
let new_prefix = if prefix.is_empty() {
key_str
} else {
format!("{}.{}", prefix, key_str)
};
Self::flatten_yaml_value(result, &new_prefix, val);
}
}
_ => {
result.insert(prefix.to_string(), value.clone());
}
}
}
pub fn get(&self, key: &str) -> Option<&Value> {
self.flatten_config.get(key)
}
pub fn get_raw(&self, key: &str) -> Option<&Value> {
self.merged_config.get(key)
}
#[allow(unused)]
pub fn get_all(&self) -> &HashMap<String, Value> {
&self.flatten_config
}
#[allow(unused)]
pub fn contains(&self, key: &str) -> bool {
self.flatten_config.contains_key(key)
}
pub fn add_listener(config_id: &str, handler: fn(&HashMap<String, Value>)) {
if let Some(mut handlers) = CONFIG_LISTENER.listeners.get_mut(config_id) {
handlers.push(handler);
} else {
CONFIG_LISTENER
.listeners
.insert(config_id.to_string(), vec![handler]);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_app_config() {
let contents = vec![
(
"test1.yaml".to_string(),
r#"
a: 1
b: 2
c:
d: 3
e: 4
f:
g: 0
h:
- 1
- 2
"#
.to_string(),
),
(
"test2.yaml".to_string(),
r#"
a: 5
b: 6
c:
d: 7
e: 8
f: x
1: -1
h:
- 1
- 3
"#
.to_string(),
),
];
let config = Configs::from_contents(contents).unwrap();
println!("{:?}", config);
println!("{:?}", config.get("a"));
println!("{:?}", config.get("h"));
}
}