conreg_client/lib.rs
1//! conreg是一个参考了Nacos设计的分布式服务注册和配置中心。详情请看:[conreg](https://github.com/xgpxg/conreg)
2//!
3//! conreg-client是conreg的客户端SDK,用于集成到您的服务中和conreg-server通信。
4//!
5//! ℹ️ 注意:当前conreg的0.1.x版本仍处于快速迭代中,API在未来可能会发生变化
6//!
7//! # 快速开始
8//!
9//! ## 基本使用
10//! 在项目的根目录下添加`bootstrap.yaml`配置文件:
11//! ```yaml
12//! conreg:
13//! # 服务ID
14//! # 服务ID是服务的唯一标识,同一命名空间下的服务ID不能重复
15//! service-id: test
16//! # 客户端配置,这些信息将会作为服务实例的基本信息提交到注册中心
17//! client:
18//! # 监听地址
19//! address: 127.0.0.1
20//! # 端口
21//! port: 8000
22//! # 配置中心配置
23//! config:
24//! # 配置中心地址
25//! server-addr: 127.0.0.1:8000
26//! # 配置ID
27//! # 如果多个配置中存在同名配置key,则靠后的配置将会覆盖之前的配置
28//! config-ids:
29//! - test.yaml
30//! # 注册中心配置
31//! discovery:
32//! # 注册中心地址
33//! server-addr:
34//! - 127.0.0.1:8000
35//! - 127.0.0.1:8001
36//! - 127.0.0.1:8002
37//! ```
38//!
39//! 然后,在`main`函数中初始化:
40//! ```rust
41//! #[tokio::main]
42//! async fn main(){
43//! // 初始化
44//! init().await;
45//!
46//! // 获取配置项
47//! println!("{:?}", AppConfig::get::<String>("name"));
48//!
49//! // 获取服务实例
50//! let instances = AppDiscovery::get_instances("your_service_id").await.unwrap();
51//! println!("service instances: {:?}", instances);
52//! }
53//! ```
54//!
55//! ## 命名空间
56//! conreg使用命名空间(Namespace)来对配置和服务进行隔离,默认命名空间为`public`。
57//!
58//! ## 配置中心
59//! 从配置中心中加载,并使用这些配置。目前仅支持`yaml`格式的配置。
60//!
61//! ### 初始化并加载配置
62//! ```rust
63//! #[tokio::main]
64//! async fn main() {
65//! init_with(
66//! ConRegConfigBuilder::default()
67//! .config(
68//! ConfigConfigBuilder::default()
69//! .server_addr("127.0.0.1:8000")
70//! .namespace("public")
71//! .config_ids(vec!["test.yaml".into()])
72//! .build()
73//! .unwrap(),
74//! )
75//! .build()
76//! .unwrap(),
77//!
78//! )
79//! .await;
80//! println!("{:?}", AppConfig::get::<String>("name"));
81//! println!("{:?}", AppConfig::get::<u32>("age"));
82//! }
83//! ```
84//!
85//! ### 从配置文件初始化
86//! conreg-client默认从项目根目录下的bootstrap.yaml加载配置初始化配置,就像SpringCloud一样。
87//!
88//! 以下是`bootstrap.yaml`配置示例
89//!
90//! ```yaml
91//! conreg:
92//! config:
93//! server-addr: 127.0.0.1:8000
94//! config-ids:
95//! - your_config.yaml
96//!
97//! ```
98//!
99//! 然后调用`init`方法即可初始化并获取配置内容。
100//! ```rust
101//! #[tokio::main]
102//! async fn main() {
103//! init().await;
104//! // 或者指定配置文件路径
105//! // init_from_file("config.yaml").await;
106//! println!("{:?}", AppConfig::get::<String>("name"));
107//! println!("{:?}", AppConfig::get::<u32>("age"));
108//! }
109//! ```
110//!
111//! ## 注册中心
112//! 用于服务注册和发现。
113//!
114//! ### 初始化并加载配置
115//! ```rust
116//! #[tokio::main]
117//! async fn main() {
118//! let config = ConRegConfigBuilder::default()
119//! .service_id("your_service_id")
120//! .client(
121//! ClientConfigBuilder::default()
122//! .address("127.0.0.1")
123//! .port(8080)
124//! .build()
125//! .unwrap(),
126//! )
127//! .discovery(
128//! DiscoveryConfigBuilder::default()
129//! .server_addr("127.0.0.1:8000")
130//! .build()
131//! .unwrap(),
132//! )
133//! .build()
134//! .unwrap();
135//! let service_id = config.service_id.clone();
136//! init_with(config).await;
137//! let instances = AppDiscovery::get_instances(&service_id).await.unwrap();
138//! println!("service instances: {:?}", instances);
139//! }
140//! ```
141//!
142//! ### 从配置文件初始化
143//!
144//! 默认从`bootstrap.yaml`中加载配置。
145//!
146//! 以下是示例配置:
147//! ```yaml
148//! conreg:
149//! service-id: your_service_id
150//! client:
151//! address: 127.0.0.1
152//! port: 8000
153//! discovery:
154//! server-addr:
155//! - 127.0.0.1:8000
156//! - 127.0.0.1:8001
157//! - 127.0.0.1:8002
158//! ```
159//! ```rust
160//! #[tokio::main]
161//! async fn main() {
162//! init().await;
163//! // 或者指定配置文件路径
164//! // init_from_file("config.yaml").await;
165//! init_with(config).await;
166//!
167//! let service_id = "your_service_id";
168//! let instances = AppDiscovery::get_instances(service_id).await.unwrap();
169//! println!("service instances: {:?}", instances);
170//! }
171//! ```
172//!
173//! # 负载均衡
174//! conreg-client基于`reqwest`提供了负载均衡客户端,支持使用`lb://service_id`格式的自定义协议发起请求。
175//!
176//! 参考:[`lb`]
177//!
178//! # 监听配置变更
179//! 为指定的config_id添加处理函数,在配置变更时,会调用该函数。
180//! ```rust
181//! AppConfig::add_listener("test.yaml", |config| {
182//! println!("Config changed, new config: {:?}", config);
183//! });
184//! ```
185
186use crate::conf::{ConRegConfig, ConRegConfigWrapper};
187use crate::config::Configs;
188use crate::discovery::{Discovery, DiscoveryClient};
189pub use crate::protocol::Instance;
190use anyhow::bail;
191use serde::de::DeserializeOwned;
192use std::collections::HashMap;
193use std::path::PathBuf;
194use std::process::exit;
195use std::sync::{Arc, OnceLock, RwLock};
196
197pub mod conf;
198mod config;
199mod discovery;
200pub mod lb;
201mod network;
202mod protocol;
203mod utils;
204
205struct Conreg;
206
207/// 存储配置内容
208static CONFIGS: OnceLock<Arc<RwLock<Configs>>> = OnceLock::new();
209/// 服务发现全局实例
210static DISCOVERY: OnceLock<Discovery> = OnceLock::new();
211
212impl Conreg {
213 /// 初始化配置中心和注册中心
214 async fn init(file: Option<PathBuf>) -> anyhow::Result<()> {
215 let mut file = file.unwrap_or("bootstrap.yaml".into());
216 if !file.exists() {
217 file = "bootstrap.yml".into();
218 }
219 let s = match std::fs::read_to_string(&file) {
220 Ok(s) => s,
221 Err(e) => {
222 log::error!("no bootstrap.yaml found, {}", e);
223 exit(1);
224 }
225 };
226
227 log::info!("loaded bootstrap config from {}", file.display());
228
229 let config = match serde_yaml::from_str::<ConRegConfigWrapper>(&s) {
230 Ok(config) => config,
231 Err(e) => {
232 log::error!("parse bootstrap.yaml failed, {}", e);
233 exit(1);
234 }
235 };
236
237 Self::init_with(&config.conreg).await?;
238
239 log::info!("conreg init completed");
240 Ok(())
241 }
242
243 async fn init_with(config: &ConRegConfig) -> anyhow::Result<()> {
244 #[cfg(feature = "logger")]
245 utils::init_log();
246
247 if config.config.is_some() {
248 let config_client = config::ConfigClient::new(&config);
249 let configs = config_client.load().await?;
250 CONFIGS.set(Arc::new(RwLock::new(configs))).map_err(|_| {
251 anyhow::anyhow!(
252 "config has already been initialized, please do not initialize repeatedly"
253 )
254 })?;
255 }
256
257 if config.discovery.is_some() {
258 let discovery_client = DiscoveryClient::new(config);
259 discovery_client.register().await?;
260 let discovery = Discovery::new(discovery_client).await;
261 DISCOVERY.set(discovery).map_err(|_| {
262 anyhow::anyhow!(
263 "discovery has already been initialized, please do not initialize repeatedly"
264 )
265 })?;
266 }
267
268 Ok(())
269 }
270}
271
272/// 初始化配置中心和注册中心
273pub async fn init() {
274 match Conreg::init(None).await {
275 Ok(_) => {}
276 Err(e) => {
277 log::error!("conreg init failed: {}", e);
278 exit(1);
279 }
280 };
281}
282
283/// 从配置文件初始化配置中心和注册中心
284pub async fn init_from_file(path: impl Into<PathBuf>) {
285 match Conreg::init(Some(path.into())).await {
286 Ok(_) => {}
287 Err(e) => {
288 log::error!("conreg init failed: {}", e);
289 exit(1);
290 }
291 };
292}
293
294/// 从自定义配置初始化
295pub async fn init_with(config: ConRegConfig) {
296 match Conreg::init_with(&config).await {
297 Ok(_) => {}
298 Err(e) => {
299 log::error!("conreg init failed: {}", e);
300 exit(1);
301 }
302 };
303}
304
305pub struct AppConfig;
306impl AppConfig {
307 fn reload(configs: Configs) {
308 match CONFIGS.get() {
309 None => {
310 log::error!("config not init");
311 }
312 Some(config) => {
313 *config.write().unwrap() = configs;
314 }
315 }
316 }
317
318 /// 获取配置值
319 ///
320 /// 注意:获取的值类型需要与配置中的值类型保持一致,如果不一致,可能会导致转换失败,
321 /// 转换失败时将返回`None`
322 pub fn get<V: DeserializeOwned>(key: &str) -> Option<V> {
323 match CONFIGS.get() {
324 None => {
325 log::error!("config not init");
326 None
327 }
328 Some(config) => match config.read().expect("read lock error").get(key) {
329 None => None,
330 Some(value) => match serde_yaml::from_value::<V>(value.clone()) {
331 Ok(value) => Some(value),
332 Err(e) => {
333 log::error!("parse config failed, {}", e);
334 None
335 }
336 },
337 },
338 }
339 }
340
341 /// 绑定配置内容到一个struct。
342 pub fn bind<T: DeserializeOwned>() -> anyhow::Result<T> {
343 match CONFIGS.get() {
344 None => {
345 bail!("config not init");
346 }
347 Some(config) => {
348 let value: T = serde_yaml::from_value(
349 config.read().expect("read lock error").content.clone(),
350 )?;
351 Ok(value)
352 }
353 }
354 }
355
356 /// 添加配置监听器
357 ///
358 /// - `config_id`: 配置ID
359 /// - `handler`: 配置监函数,参数为变更后、已合并并展平后的配置内容
360 pub fn add_listener(config_id: &str, handler: fn(&HashMap<String, serde_yaml::Value>)) {
361 Configs::add_listener(config_id, handler);
362 }
363}
364
365pub struct AppDiscovery;
366impl AppDiscovery {
367 /// 获取指定服务的可用的服务实例
368 pub async fn get_instances(service_id: &str) -> anyhow::Result<Vec<Instance>> {
369 match DISCOVERY.get() {
370 Some(discovery) => {
371 let instances = discovery.get_instances(service_id).await;
372 Ok(instances)
373 }
374 None => {
375 bail!("discovery not initialized")
376 }
377 }
378 }
379}
380
381#[cfg(test)]
382#[allow(unused)]
383mod tests {
384 use super::*;
385 use crate::conf::{ClientConfigBuilder, ConRegConfigBuilder, DiscoveryConfigBuilder};
386 use serde::Deserialize;
387 use std::collections::HashMap;
388 #[tokio::test]
389 async fn test_config() {
390 //init_log();
391 init().await;
392 //init_from_file("bootstrap.yaml").await;
393 /*init_with(
394 ConRegConfigBuilder::default()
395 .config(
396 ConfigConfigBuilder::default()
397 .server_addr("127.0.0.1:8000")
398 .namespace("public")
399 .config_ids(vec!["test.yaml".into()])
400 .build()
401 .unwrap(),
402 )
403 .build()
404 .unwrap(),
405 )
406 .await;*/
407 println!("{:?}", AppConfig::get::<String>("name"));
408 println!("{:?}", AppConfig::get::<u32>("age"));
409
410 #[derive(Deserialize)]
411 struct MyConfig {
412 name: String,
413 }
414 let my_config = AppConfig::bind::<MyConfig>().unwrap();
415 println!("my config, name: {:?}", my_config.name);
416
417 AppConfig::add_listener("test.yaml", |config| {
418 println!("Listen config change1: {:?}", config);
419 });
420 AppConfig::add_listener("test.yaml", |config| {
421 println!("Listen config change2: {:?}", config);
422 });
423 let h = tokio::spawn(async move {
424 loop {
425 println!("{:?}", AppConfig::get::<String>("name"));
426 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
427 }
428 });
429 tokio::join!(h);
430 }
431
432 #[tokio::test]
433 async fn test_discovery() {
434 //init_log();
435 init().await;
436 // let config = ConRegConfigBuilder::default()
437 // .service_id("your_service_id")
438 // .client(
439 // ClientConfigBuilder::default()
440 // .address("127.0.0.1")
441 // .port(8080)
442 // .build()
443 // .unwrap(),
444 // )
445 // .discovery(
446 // DiscoveryConfigBuilder::default()
447 // .server_addr(vec!["127.0.0.1:8000", "127.0.0.1:8001"])
448 // .build()
449 // .unwrap(),
450 // )
451 // .build()
452 // .unwrap();
453 // // println!("config: {:?}", config);
454 // let service_id = config.service_id.clone();
455 // init_with(config).await;
456 let h = tokio::spawn(async move {
457 loop {
458 println!("{:?}", AppConfig::get::<String>("name"));
459 let instances = AppDiscovery::get_instances(utils::current_process_name().as_str())
460 .await
461 .unwrap();
462 println!("current: {:?}", instances);
463 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
464 }
465 });
466 tokio::join!(h);
467 }
468}