conreg_client/
lib.rs

1//! # Conreg Client
2//!
3//! conreg是一个参考了Nacos设计的分布式服务注册和配置中心。
4//!
5//! conreg-client是conreg的客户端SDK,用于集成到您的服务中和conreg-server通信。
6//!
7//! > 注意:当前conreg的0.1.x版本仍处于快速迭代中,API在未来可能会发生变化
8//!
9//! # 快速开始
10//!
11//! ## 基本使用
12//! 在项目的根目录下添加`bootstrap.yaml`配置文件:
13//! ```yaml
14//! conreg:
15//!   # 服务ID
16//!   # 服务ID是服务的唯一标识,同一命名空间下的服务ID不能重复
17//!   service-id: test
18//!   # 客户端配置,这些信息将会作为服务实例的基本信息提交到注册中心
19//!   client:
20//!     # 监听地址
21//!     address: 127.0.0.1
22//!     # 端口
23//!     port: 8000
24//!   # 配置中心配置
25//!   config:
26//!     # 配置中心地址
27//!     server-addr: 127.0.0.1:8000
28//!     # 配置ID
29//!     # 如果多个配置中存在同名配置key,则靠后的配置将会覆盖之前的配置
30//!     config-ids:
31//!       - test.yaml
32//!   # 注册中心配置
33//!   discovery:
34//!     # 注册中心地址
35//!     server-addr:
36//!       - 127.0.0.1:8000
37//!       - 127.0.0.1:8001
38//!       - 127.0.0.1:8002
39//! ```
40//!
41//! 然后,在`main`函数中初始化:
42//! ```rust
43//! #[tokio::main]
44//! async fn main(){
45//!     // 初始化
46//!     init().await;
47//!
48//!     // 获取配置项
49//!     println!("{:?}", AppConfig::get::<String>("name"));
50//!
51//!     // 获取服务实例
52//!     let instances = AppDiscovery::get_instances("your_service_id").await.unwrap();
53//!     println!("service instances: {:?}", instances);
54//! }
55//! ```
56//!
57//! ## 命名空间
58//! conreg使用命名空间(Namespace)来对配置和服务进行隔离,默认命名空间为`public`。
59//!
60//! ## 配置中心
61//! 从配置中心中加载,并使用这些配置。目前仅支持`yaml`格式的配置。
62//!
63//! ### 初始化并加载配置
64//! ```rust
65//! #[tokio::main]
66//!  async fn main() {
67//!     init_with(
68//!         ConRegConfigBuilder::default()
69//!             .config(
70//!                 ConfigConfigBuilder::default()
71//!                     .server_addr("127.0.0.1:8000")
72//!                     .namespace("public")
73//!                     .config_ids(vec!["test.yaml".into()])
74//!                     .build()
75//!                     .unwrap(),
76//!             )
77//!             .build()
78//!             .unwrap(),
79//!
80//!     )
81//!     .await;
82//!     println!("{:?}", AppConfig::get::<String>("name"));
83//!     println!("{:?}", AppConfig::get::<u32>("age"));
84//!  }
85//! ```
86//!
87//! ### 从配置文件初始化
88//! conreg-client默认从项目根目录下的bootstrap.yaml加载配置初始化配置,就像SpringCloud一样。
89//!
90//! 以下是`bootstrap.yaml`配置示例
91//!
92//! ```yaml
93//! conreg:
94//!   config:
95//!     server-addr: 127.0.0.1:8000
96//!     config-ids:
97//!       - your_config.yaml
98//!
99//! ```
100//!
101//! 然后调用`init`方法即可初始化并获取配置内容。
102//! ```rust
103//! #[tokio::main]
104//!  async fn main() {
105//!     init().await;
106//!     // 或者指定配置文件路径
107//!     // init_from_file("config.yaml").await;
108//!     println!("{:?}", AppConfig::get::<String>("name"));
109//!     println!("{:?}", AppConfig::get::<u32>("age"));
110//!  }
111//! ```
112//!
113//! ## 注册中心
114//! 用于服务注册和发现。
115//!
116//! ### 初始化并加载配置
117//! ```rust
118//! #[tokio::main]
119//! async fn main() {
120//! let config = ConRegConfigBuilder::default()
121//!     .service_id("your_service_id")
122//!     .client(
123//!         ClientConfigBuilder::default()
124//!             .address("127.0.0.1")
125//!             .port(8080)
126//!             .build()
127//!             .unwrap(),
128//!     )
129//!     .discovery(
130//!         DiscoveryConfigBuilder::default()
131//!             .server_addr("127.0.0.1:8000")
132//!             .build()
133//!             .unwrap(),
134//!     )
135//!     .build()
136//!     .unwrap();
137//!     let service_id = config.service_id.clone();
138//!     init_with(config).await;
139//!     let instances = AppDiscovery::get_instances(&service_id).await.unwrap();
140//!     println!("service instances: {:?}", instances);
141//! }
142//! ```
143//!
144//! ### 从配置文件初始化
145//!
146//! 默认从`bootstrap.yaml`中加载配置。
147//!
148//! 以下是示例配置:
149//! ```yaml
150//! conreg:
151//!   service-id: your_service_id
152//!   client:
153//!     address: 127.0.0.1
154//!     port: 8000
155//!   discovery:
156//!     server-addr:
157//!       - 127.0.0.1:8000
158//!       - 127.0.0.1:8001
159//!       - 127.0.0.1:8002
160//! ```
161//! ```rust
162//! #[tokio::main]
163//!  async fn main() {
164//!     init().await;
165//!     // 或者指定配置文件路径
166//!     // init_from_file("config.yaml").await;
167//!     init_with(config).await;
168//!
169//!     let service_id = "your_service_id";
170//!     let instances = AppDiscovery::get_instances(service_id).await.unwrap();
171//!     println!("service instances: {:?}", instances);
172//!  }
173//! ```
174
175use crate::conf::{ConRegConfig, ConRegConfigWrapper};
176use crate::config::Configs;
177use crate::discovery::{Discovery, DiscoveryClient};
178pub use crate::protocol::Instance;
179use anyhow::bail;
180use env_logger::WriteStyle;
181use serde::de::DeserializeOwned;
182use std::path::PathBuf;
183use std::process::exit;
184use std::sync::{Arc, OnceLock, RwLock};
185
186mod conf;
187mod config;
188mod discovery;
189mod network;
190mod protocol;
191mod utils;
192
193struct ConReg;
194
195/// 存储配置内容
196static CONFIGS: OnceLock<Arc<RwLock<Configs>>> = OnceLock::new();
197/// 服务发现全局实例
198static DISCOVERY: OnceLock<Discovery> = OnceLock::new();
199
200impl ConReg {
201    /// 初始化配置中心和注册中心
202    async fn init(file: Option<PathBuf>) -> anyhow::Result<()> {
203        let mut file = file.unwrap_or("bootstrap.yaml".into());
204        if !file.exists() {
205            file = "bootstrap.yml".into();
206        }
207        let s = match std::fs::read_to_string(&file) {
208            Ok(s) => s,
209            Err(e) => {
210                log::error!("no bootstrap.yaml found, {}", e);
211                exit(1);
212            }
213        };
214
215        log::info!("loaded bootstrap config from {}", file.display());
216
217        let config = match serde_yaml::from_str::<ConRegConfigWrapper>(&s) {
218            Ok(config) => config,
219            Err(e) => {
220                log::error!("parse bootstrap.yaml failed, {}", e);
221                exit(1);
222            }
223        };
224
225        Self::init_with(&config.conreg).await?;
226
227        log::info!("conreg init completed");
228        Ok(())
229    }
230
231    async fn init_with(config: &ConRegConfig) -> anyhow::Result<()> {
232        init_log();
233
234        if config.config.is_some() {
235            let config_client = config::ConfigClient::new(&config);
236            let configs = config_client.load().await?;
237            CONFIGS.set(Arc::new(RwLock::new(configs))).map_err(|_| {
238                anyhow::anyhow!(
239                    "config has already been initialized, please do not initialize repeatedly"
240                )
241            })?;
242        }
243
244        if config.discovery.is_some() {
245            let discovery_client = DiscoveryClient::new(config);
246            discovery_client.register().await?;
247            let discovery = Discovery::new(discovery_client).await;
248            DISCOVERY.set(discovery).map_err(|_| {
249                anyhow::anyhow!(
250                    "discovery has already been initialized, please do not initialize repeatedly"
251                )
252            })?;
253        }
254
255        Ok(())
256    }
257}
258
259/// 初始化配置中心和注册中心
260pub async fn init() {
261    match ConReg::init(None).await {
262        Ok(_) => {}
263        Err(e) => {
264            log::error!("conreg init failed: {}", e);
265            exit(1);
266        }
267    };
268}
269
270/// 从配置文件初始化配置中心和注册中心
271pub async fn init_from_file(path: impl Into<PathBuf>) {
272    match ConReg::init(Some(path.into())).await {
273        Ok(_) => {}
274        Err(e) => {
275            log::error!("conreg init failed: {}", e);
276            exit(1);
277        }
278    };
279}
280
281/// 从自定义配置初始化
282pub async fn init_with(config: ConRegConfig) {
283    match ConReg::init_with(&config).await {
284        Ok(_) => {}
285        Err(e) => {
286            log::error!("conreg init failed: {}", e);
287            exit(1);
288        }
289    };
290}
291
292pub struct AppConfig;
293impl AppConfig {
294    fn reload(configs: Configs) {
295        match CONFIGS.get() {
296            None => {
297                log::error!("config not init");
298            }
299            Some(config) => {
300                *config.write().unwrap() = configs;
301            }
302        }
303    }
304
305    /// 获取配置值
306    ///
307    /// 注意:获取的值类型需要与配置中的值类型保持一致,如果不一致,可能会导致转换失败,
308    /// 转换失败时将返回`None`
309    pub fn get<V: DeserializeOwned>(key: &str) -> Option<V> {
310        match CONFIGS.get() {
311            None => {
312                log::error!("config not init");
313                None
314            }
315            Some(config) => match config.read().expect("read lock error").get(key) {
316                None => None,
317                Some(value) => match serde_yaml::from_value::<V>(value.clone()) {
318                    Ok(value) => Some(value),
319                    Err(e) => {
320                        log::error!("parse config failed, {}", e);
321                        None
322                    }
323                },
324            },
325        }
326    }
327
328    /// 绑定配置内容到一个struct。
329    pub fn bind<T: DeserializeOwned>() -> anyhow::Result<T> {
330        match CONFIGS.get() {
331            None => {
332                bail!("config not init");
333            }
334            Some(config) => {
335                let value: T = serde_yaml::from_value(
336                    config.read().expect("read lock error").content.clone(),
337                )?;
338                Ok(value)
339            }
340        }
341    }
342}
343
344fn init_log() {
345    use std::io::Write;
346    env_logger::Builder::new()
347        .filter_level(log::LevelFilter::Info)
348        .parse_default_env()
349        .format(|buf, record| {
350            let level = record.level().as_str();
351            writeln!(
352                buf,
353                "[{}][{}] - {}",
354                chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"),
355                level,
356                record.args()
357            )
358        })
359        .write_style(WriteStyle::Always)
360        .init();
361}
362
363pub struct AppDiscovery;
364impl AppDiscovery {
365    pub async fn get_instances(service_id: &str) -> anyhow::Result<Vec<Instance>> {
366        match DISCOVERY.get() {
367            Some(discovery) => {
368                //let discovery = discovery.read().expect("read lock error");
369                let instances = discovery.get_instances(service_id).await;
370                Ok(instances)
371            }
372            None => {
373                bail!("discovery not initialized")
374            }
375        }
376    }
377}
378
379#[cfg(test)]
380#[allow(unused)]
381mod tests {
382    use super::*;
383    use crate::conf::{ClientConfigBuilder, ConRegConfigBuilder, DiscoveryConfigBuilder};
384    use serde::Deserialize;
385    #[tokio::test]
386    async fn test_config() {
387        //init_log();
388        init().await;
389        //init_from_file("bootstrap.yaml").await;
390        /*init_with(
391            ConRegConfigBuilder::default()
392                .config(
393                    ConfigConfigBuilder::default()
394                        .server_addr("127.0.0.1:8000")
395                        .namespace("public")
396                        .config_ids(vec!["test.yaml".into()])
397                        .build()
398                        .unwrap(),
399                )
400                .build()
401                .unwrap(),
402        )
403        .await;*/
404        println!("{:?}", AppConfig::get::<String>("name"));
405        println!("{:?}", AppConfig::get::<u32>("age"));
406
407        #[derive(Deserialize)]
408        struct MyConfig {
409            name: String,
410        }
411        let my_config = AppConfig::bind::<MyConfig>().unwrap();
412        println!("my config, name: {:?}", my_config.name);
413
414        let h = tokio::spawn(async move {
415            loop {
416                println!("{:?}", AppConfig::get::<String>("name"));
417                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
418            }
419        });
420        tokio::join!(h);
421    }
422
423    #[tokio::test]
424    async fn test_discovery() {
425        //init_log();
426        //init().await;
427        let config = ConRegConfigBuilder::default()
428            .service_id("your_service_id")
429            .client(
430                ClientConfigBuilder::default()
431                    .address("127.0.0.1")
432                    .port(8080)
433                    .build()
434                    .unwrap(),
435            )
436            .discovery(
437                DiscoveryConfigBuilder::default()
438                    .server_addr("127.0.0.1:8000")
439                    .build()
440                    .unwrap(),
441            )
442            .build()
443            .unwrap();
444        // println!("config: {:?}", config);
445        let service_id = config.service_id.clone();
446        init_with(config).await;
447        let h = tokio::spawn(async move {
448            loop {
449                let instances = AppDiscovery::get_instances(&service_id).await.unwrap();
450                println!("current: {:?}", instances);
451                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
452            }
453        });
454        tokio::join!(h);
455    }
456}