use crate::conf::{ConRegConfig, ConRegConfigWrapper};
use crate::config::Configs;
use crate::discovery::{Discovery, DiscoveryClient};
pub use crate::protocol::Instance;
use anyhow::bail;
use serde::de::DeserializeOwned;
use std::collections::HashMap;
use std::path::PathBuf;
use std::process::exit;
use std::sync::{Arc, OnceLock, RwLock};
pub mod conf;
mod config;
mod discovery;
pub mod lb;
mod network;
mod protocol;
mod utils;
#[cfg(feature = "feign")]
pub use conreg_feign_macro::{delete, feign_client, get, patch, post, put};
/// Error type with four categories, each carrying a human-readable message.
///
/// NOTE(review): presumably returned by clients generated with the
/// `conreg_feign_macro` macros — confirm against the macro crate.
#[derive(Debug)]
pub enum FeignError {
/// Displayed as `Request error: <message>`.
RequestError(String),
/// Displayed as `Deserialization error: <message>`.
DeserializationError(String),
/// Displayed as `Instance not found: <message>`.
InstanceNotFound(String),
/// Displayed as `Load balance error: <message>`; also produced by the
/// `From<crate::lb::LoadBalanceError>` conversion, which stores the source
/// error's `to_string()` output.
LoadBalanceError(String),
}
impl std::fmt::Display for FeignError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FeignError::RequestError(msg) => write!(f, "Request error: {}", msg),
FeignError::DeserializationError(msg) => {
write!(f, "Deserialization error: {}", msg)
}
FeignError::InstanceNotFound(msg) => write!(f, "Instance not found: {}", msg),
FeignError::LoadBalanceError(msg) => write!(f, "Load balance error: {}", msg),
}
}
}
// Marker impl: `FeignError` uses the default `std::error::Error` methods, so
// no source error is exposed; the text comes from the `Display` impl.
impl std::error::Error for FeignError {}
/// Converts a load-balancer error into [`FeignError::LoadBalanceError`],
/// keeping only the textual form of the source error.
impl From<crate::lb::LoadBalanceError> for FeignError {
    fn from(err: crate::lb::LoadBalanceError) -> Self {
        let message = err.to_string();
        Self::LoadBalanceError(message)
    }
}
/// Private unit type that namespaces the initialization routines (`init` and
/// `init_with`) called by the public `init*` functions of this module.
struct Conreg;
/// Process-wide configuration store. Set once by `Conreg::init_with` when the
/// bootstrap configuration has a `config` section; the inner `RwLock` lets
/// `AppConfig::reload` replace the contents afterwards.
static CONFIGS: OnceLock<Arc<RwLock<Configs>>> = OnceLock::new();
/// Process-wide discovery handle. Set once by `Conreg::init_with` when the
/// bootstrap configuration has a `discovery` section.
static DISCOVERY: OnceLock<Discovery> = OnceLock::new();
/// Not referenced in this part of the file.
/// NOTE(review): presumably the HTTP header name used to send a namespace
/// token to the server — confirm against the client modules.
const NS_TOKEN_HEADER: &str = "X-NS-Token";
impl Conreg {
async fn init(file: Option<PathBuf>) -> anyhow::Result<()> {
#[cfg(feature = "tracing")]
utils::init_log();
let mut file = file.unwrap_or("bootstrap.yaml".into());
if !file.exists() {
file = "bootstrap.yml".into();
}
let s = match std::fs::read_to_string(&file) {
Ok(s) => s,
Err(e) => {
log::error!("no bootstrap.yaml found, {}", e);
exit(1);
}
};
log::info!("loaded bootstrap config from {}", file.display());
let config = match serde_yaml::from_str::<ConRegConfigWrapper>(&s) {
Ok(config) => config,
Err(e) => {
log::error!("parse bootstrap.yaml failed, {}", e);
exit(1);
}
};
Self::init_with(&config.conreg).await?;
log::info!("conreg init completed");
Ok(())
}
async fn init_with(config: &ConRegConfig) -> anyhow::Result<()> {
#[cfg(feature = "tracing")]
utils::init_log();
if config.config.is_some() {
let config_client = config::ConfigClient::new(config);
let configs = config_client.load().await?;
CONFIGS.set(Arc::new(RwLock::new(configs))).map_err(|_| {
anyhow::anyhow!(
"config has already been initialized, please do not initialize repeatedly"
)
})?;
}
if config.discovery.is_some() {
let discovery_client = DiscoveryClient::new(config);
discovery_client.register().await?;
let discovery = Discovery::new(discovery_client).await;
DISCOVERY.set(discovery).map_err(|_| {
anyhow::anyhow!(
"discovery has already been initialized, please do not initialize repeatedly"
)
})?;
}
Ok(())
}
}
/// Initializes conreg from the default bootstrap file.
///
/// Delegates to `Conreg::init(None)`. If that returns an error, the error is
/// logged and the process exits with status 1.
pub async fn init() {
    if let Err(e) = Conreg::init(None).await {
        log::error!("conreg init failed: {}", e);
        exit(1);
    }
}
/// Initializes conreg from the bootstrap file at `path`.
///
/// Delegates to `Conreg::init(Some(path))`. If that returns an error, the
/// error is logged and the process exits with status 1.
pub async fn init_from_file(path: impl Into<PathBuf>) {
    let bootstrap_path: PathBuf = path.into();
    if let Err(e) = Conreg::init(Some(bootstrap_path)).await {
        log::error!("conreg init failed: {}", e);
        exit(1);
    }
}
/// Initializes conreg from an already constructed [`ConRegConfig`].
///
/// Delegates to `Conreg::init_with`. If that returns an error, the error is
/// logged and the process exits with status 1.
pub async fn init_with(config: ConRegConfig) {
    if let Err(e) = Conreg::init_with(&config).await {
        log::error!("conreg init failed: {}", e);
        exit(1);
    }
}
/// Stateless entry point for reading values from the global configuration
/// store and for registering configuration-change listeners.
pub struct AppConfig;
impl AppConfig {
fn reload(configs: Configs) {
match CONFIGS.get() {
None => {
log::error!("config not init");
}
Some(config) => {
*config.write().unwrap() = configs;
}
}
}
pub fn get<V: DeserializeOwned>(key: &str) -> Option<V> {
match CONFIGS.get() {
None => {
log::error!("config not init");
None
}
Some(config) => match config.read().expect("read lock error").get(key) {
None => None,
Some(value) => match serde_yaml::from_value::<V>(value.clone()) {
Ok(value) => Some(value),
Err(e) => {
log::error!("parse config failed, {}", e);
None
}
},
},
}
}
pub fn get_raw<V: DeserializeOwned>(key: &str) -> Option<V> {
match CONFIGS.get() {
None => {
log::error!("config not init");
None
}
Some(config) => match config.read().expect("read lock error").get_raw(key) {
None => None,
Some(value) => match serde_yaml::from_value::<V>(value.clone()) {
Ok(value) => Some(value),
Err(e) => {
log::error!("parse config failed, {}", e);
None
}
},
},
}
}
pub fn add_listener(config_id: &str, handler: fn(&HashMap<String, serde_yaml::Value>)) {
Configs::add_listener(config_id, handler);
}
}
/// Stateless entry point for querying service instances through the global
/// discovery handle.
pub struct AppDiscovery;
impl AppDiscovery {
pub async fn get_instances(service_id: &str) -> anyhow::Result<Vec<Instance>> {
match DISCOVERY.get() {
Some(discovery) => {
let instances = discovery.get_instances(service_id).await;
Ok(instances)
}
None => {
bail!("discovery not initialized")
}
}
}
}
// Manual integration tests. Both tests call `init()`, which reads a bootstrap
// file from the working directory and exits the process on failure, and both
// then wait on a spawned task that loops forever, so they do not finish on
// their own.
#[cfg(test)]
// Several of the imports below are not referenced by the tests in this module.
#[allow(unused)]
mod tests {
use crate::conf::{ClientConfigBuilder, ConRegConfigBuilder, DiscoveryConfigBuilder};
use crate::{AppConfig, AppDiscovery, init};
use reqwest::StatusCode;
use reqwest::multipart::{Form, Part};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
// Initializes the client, prints the `name` and `age` values, registers
// listeners for `test.yaml` and `test2.yml`, then prints `name` once per
// second indefinitely.
#[tokio::test]
async fn test_config() {
init().await;
println!("{:?}", AppConfig::get::<String>("name"));
println!("{:?}", AppConfig::get::<u32>("age"));
AppConfig::add_listener("test.yaml", |config| {
println!("Listen config change1: {:?}", config);
});
AppConfig::add_listener("test2.yml", |config| {
println!("Listen config change2: {:?}", config);
});
let h = tokio::spawn(async move {
loop {
println!("{:?}", AppConfig::get::<String>("name"));
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
});
// Waits on the polling task; the loop above has no exit condition.
tokio::join!(h);
}
// Initializes the client, then once per second prints the `name` value and
// the instances returned for `crate::utils::current_process_name()`.
// NOTE(review): presumably that helper yields the service id this process
// registers under — confirm in `utils`.
#[tokio::test]
async fn test_discovery() {
init().await;
let h = tokio::spawn(async move {
loop {
println!("{:?}", AppConfig::get::<String>("name"));
let instances =
AppDiscovery::get_instances(crate::utils::current_process_name().as_str())
.await
.unwrap();
println!("current: {:?}", instances);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
});
// Waits on the polling task; the loop above has no exit condition.
tokio::join!(h);
}
}