conreg_client/
lib.rs

1//! conreg是一个参考了Nacos设计的分布式服务注册和配置中心。详情请看:[conreg](https://github.com/xgpxg/conreg)
2//!
3//! conreg-client是conreg的客户端SDK,用于集成到您的服务中和conreg-server通信。
4//!
5//! ℹ️ 注意:当前conreg的0.1.x版本仍处于快速迭代中,API在未来可能会发生变化
6//!
7//! # 快速开始
8//!
9//! ## 基本使用
10//! 在项目的根目录下添加`bootstrap.yaml`配置文件:
11//! ```yaml
12//! conreg:
13//!   # 服务ID
14//!   # 服务ID是服务的唯一标识,同一命名空间下的服务ID不能重复
15//!   service-id: test
16//!   # 客户端配置,这些信息将会作为服务实例的基本信息提交到注册中心
17//!   client:
18//!     # 监听地址
19//!     address: 127.0.0.1
20//!     # 端口
21//!     port: 8000
22//!   # 配置中心配置
23//!   config:
24//!     # 配置中心地址
25//!     server-addr: 127.0.0.1:8000
26//!     # 配置ID
27//!     # 如果多个配置中存在同名配置key,则靠后的配置将会覆盖之前的配置
28//!     config-ids:
29//!       - test.yaml
30//!   # 注册中心配置
31//!   discovery:
32//!     # 注册中心地址
33//!     server-addr:
34//!       - 127.0.0.1:8000
35//!       - 127.0.0.1:8001
36//!       - 127.0.0.1:8002
37//! ```
38//!
39//! 然后,在`main`函数中初始化:
40//! ```rust
41//! #[tokio::main]
42//! async fn main(){
43//!     // 初始化
44//!     init().await;
45//!
46//!     // 获取配置项
47//!     println!("{:?}", AppConfig::get::<String>("name"));
48//!
49//!     // 获取服务实例
50//!     let instances = AppDiscovery::get_instances("your_service_id").await.unwrap();
51//!     println!("service instances: {:?}", instances);
52//! }
53//! ```
54//!
55//! ## 命名空间
56//! conreg使用命名空间(Namespace)来对配置和服务进行隔离,默认命名空间为`public`。
57//!
58//! ## 配置中心
59//! 从配置中心中加载,并使用这些配置。目前仅支持`yaml`格式的配置。
60//!
61//! ### 初始化并加载配置
62//! ```rust
63//! #[tokio::main]
64//!  async fn main() {
65//!     init_with(
66//!         ConRegConfigBuilder::default()
67//!             .config(
68//!                 ConfigConfigBuilder::default()
69//!                     .server_addr("127.0.0.1:8000")
70//!                     .namespace("public")
71//!                     .config_ids(vec!["test.yaml".into()])
72//!                     .build()
73//!                     .unwrap(),
74//!             )
75//!             .build()
76//!             .unwrap(),
77//!
78//!     )
79//!     .await;
80//!     println!("{:?}", AppConfig::get::<String>("name"));
81//!     println!("{:?}", AppConfig::get::<u32>("age"));
82//!  }
83//! ```
84//!
85//! ### 从配置文件初始化
86//! conreg-client默认从项目根目录下的bootstrap.yaml加载配置初始化配置,就像SpringCloud一样。
87//!
88//! 以下是`bootstrap.yaml`配置示例
89//!
90//! ```yaml
91//! conreg:
92//!   config:
93//!     server-addr: 127.0.0.1:8000
94//!     config-ids:
95//!       - your_config.yaml
96//!
97//! ```
98//!
99//! 然后调用`init`方法即可初始化并获取配置内容。
100//! ```rust
101//! #[tokio::main]
102//!  async fn main() {
103//!     init().await;
104//!     // 或者指定配置文件路径
105//!     // init_from_file("config.yaml").await;
106//!     println!("{:?}", AppConfig::get::<String>("name"));
107//!     println!("{:?}", AppConfig::get::<u32>("age"));
108//!  }
109//! ```
110//!
111//! ## 注册中心
112//! 用于服务注册和发现。
113//!
114//! ### 初始化并加载配置
115//! ```rust
116//! #[tokio::main]
117//! async fn main() {
118//! let config = ConRegConfigBuilder::default()
119//!     .service_id("your_service_id")
120//!     .client(
121//!         ClientConfigBuilder::default()
122//!             .address("127.0.0.1")
123//!             .port(8080)
124//!             .build()
125//!             .unwrap(),
126//!     )
127//!     .discovery(
128//!         DiscoveryConfigBuilder::default()
129//!             .server_addr("127.0.0.1:8000")
130//!             .build()
131//!             .unwrap(),
132//!     )
133//!     .build()
134//!     .unwrap();
135//!     let service_id = config.service_id.clone();
136//!     init_with(config).await;
137//!     let instances = AppDiscovery::get_instances(&service_id).await.unwrap();
138//!     println!("service instances: {:?}", instances);
139//! }
140//! ```
141//!
142//! ### 从配置文件初始化
143//!
144//! 默认从`bootstrap.yaml`中加载配置。
145//!
146//! 以下是示例配置:
147//! ```yaml
148//! conreg:
149//!   service-id: your_service_id
150//!   client:
151//!     address: 127.0.0.1
152//!     port: 8000
153//!   discovery:
154//!     server-addr:
155//!       - 127.0.0.1:8000
156//!       - 127.0.0.1:8001
157//!       - 127.0.0.1:8002
158//! ```
159//! ```rust
160//! #[tokio::main]
161//!  async fn main() {
162//!     init().await;
163//!     // 或者指定配置文件路径
164//!     // init_from_file("config.yaml").await;
165//!     init_with(config).await;
166//!
167//!     let service_id = "your_service_id";
168//!     let instances = AppDiscovery::get_instances(service_id).await.unwrap();
169//!     println!("service instances: {:?}", instances);
170//!  }
171//! ```
172//!
173//! # 负载均衡
174//! conreg-client基于`reqwest`提供了负载均衡客户端,支持使用`lb://service_id`格式的自定义协议发起请求。
175//!
176//! 参考:[`lb`]
177//!
178//! # 监听配置变更
179//! 为指定的config_id添加处理函数,在配置变更时,会调用该函数。
180//! ```rust
181//! AppConfig::add_listener("test.yaml", |config| {
182//!     println!("Config changed, new config: {:?}", config);
183//! });
184//! ```
185
186use crate::conf::{ConRegConfig, ConRegConfigWrapper};
187use crate::config::Configs;
188use crate::discovery::{Discovery, DiscoveryClient};
189pub use crate::protocol::Instance;
190use anyhow::bail;
191use serde::de::DeserializeOwned;
192use std::collections::HashMap;
193use std::path::PathBuf;
194use std::process::exit;
195use std::sync::{Arc, OnceLock, RwLock};
196
197pub mod conf;
198mod config;
199mod discovery;
200pub mod lb;
201mod network;
202mod protocol;
203mod utils;
204
205struct Conreg;
206
207/// 存储配置内容
208static CONFIGS: OnceLock<Arc<RwLock<Configs>>> = OnceLock::new();
209/// 服务发现全局实例
210static DISCOVERY: OnceLock<Discovery> = OnceLock::new();
211
212impl Conreg {
213    /// 初始化配置中心和注册中心
214    async fn init(file: Option<PathBuf>) -> anyhow::Result<()> {
215        let mut file = file.unwrap_or("bootstrap.yaml".into());
216        if !file.exists() {
217            file = "bootstrap.yml".into();
218        }
219        let s = match std::fs::read_to_string(&file) {
220            Ok(s) => s,
221            Err(e) => {
222                log::error!("no bootstrap.yaml found, {}", e);
223                exit(1);
224            }
225        };
226
227        log::info!("loaded bootstrap config from {}", file.display());
228
229        let config = match serde_yaml::from_str::<ConRegConfigWrapper>(&s) {
230            Ok(config) => config,
231            Err(e) => {
232                log::error!("parse bootstrap.yaml failed, {}", e);
233                exit(1);
234            }
235        };
236
237        Self::init_with(&config.conreg).await?;
238
239        log::info!("conreg init completed");
240        Ok(())
241    }
242
243    async fn init_with(config: &ConRegConfig) -> anyhow::Result<()> {
244        #[cfg(feature = "logger")]
245        utils::init_log();
246
247        if config.config.is_some() {
248            let config_client = config::ConfigClient::new(&config);
249            let configs = config_client.load().await?;
250            CONFIGS.set(Arc::new(RwLock::new(configs))).map_err(|_| {
251                anyhow::anyhow!(
252                    "config has already been initialized, please do not initialize repeatedly"
253                )
254            })?;
255        }
256
257        if config.discovery.is_some() {
258            let discovery_client = DiscoveryClient::new(config);
259            discovery_client.register().await?;
260            let discovery = Discovery::new(discovery_client).await;
261            DISCOVERY.set(discovery).map_err(|_| {
262                anyhow::anyhow!(
263                    "discovery has already been initialized, please do not initialize repeatedly"
264                )
265            })?;
266        }
267
268        Ok(())
269    }
270}
271
272/// 初始化配置中心和注册中心
273pub async fn init() {
274    match Conreg::init(None).await {
275        Ok(_) => {}
276        Err(e) => {
277            log::error!("conreg init failed: {}", e);
278            exit(1);
279        }
280    };
281}
282
283/// 从配置文件初始化配置中心和注册中心
284pub async fn init_from_file(path: impl Into<PathBuf>) {
285    match Conreg::init(Some(path.into())).await {
286        Ok(_) => {}
287        Err(e) => {
288            log::error!("conreg init failed: {}", e);
289            exit(1);
290        }
291    };
292}
293
294/// 从自定义配置初始化
295pub async fn init_with(config: ConRegConfig) {
296    match Conreg::init_with(&config).await {
297        Ok(_) => {}
298        Err(e) => {
299            log::error!("conreg init failed: {}", e);
300            exit(1);
301        }
302    };
303}
304
305pub struct AppConfig;
306impl AppConfig {
307    fn reload(configs: Configs) {
308        match CONFIGS.get() {
309            None => {
310                log::error!("config not init");
311            }
312            Some(config) => {
313                *config.write().unwrap() = configs;
314            }
315        }
316    }
317
318    /// 获取配置值
319    ///
320    /// 注意:获取的值类型需要与配置中的值类型保持一致,如果不一致,可能会导致转换失败,
321    /// 转换失败时将返回`None`
322    pub fn get<V: DeserializeOwned>(key: &str) -> Option<V> {
323        match CONFIGS.get() {
324            None => {
325                log::error!("config not init");
326                None
327            }
328            Some(config) => match config.read().expect("read lock error").get(key) {
329                None => None,
330                Some(value) => match serde_yaml::from_value::<V>(value.clone()) {
331                    Ok(value) => Some(value),
332                    Err(e) => {
333                        log::error!("parse config failed, {}", e);
334                        None
335                    }
336                },
337            },
338        }
339    }
340
341    /// 绑定配置内容到一个struct。
342    pub fn bind<T: DeserializeOwned>() -> anyhow::Result<T> {
343        match CONFIGS.get() {
344            None => {
345                bail!("config not init");
346            }
347            Some(config) => {
348                let value: T = serde_yaml::from_value(
349                    config.read().expect("read lock error").content.clone(),
350                )?;
351                Ok(value)
352            }
353        }
354    }
355
356    /// 添加配置监听器
357    ///
358    /// - `config_id`: 配置ID
359    /// - `handler`: 配置监函数,参数为变更后、已合并并展平后的配置内容
360    pub fn add_listener(config_id: &str, handler: fn(&HashMap<String, serde_yaml::Value>)) {
361        Configs::add_listener(config_id, handler);
362    }
363}
364
365pub struct AppDiscovery;
366impl AppDiscovery {
367    /// 获取指定服务的可用的服务实例
368    pub async fn get_instances(service_id: &str) -> anyhow::Result<Vec<Instance>> {
369        match DISCOVERY.get() {
370            Some(discovery) => {
371                let instances = discovery.get_instances(service_id).await;
372                Ok(instances)
373            }
374            None => {
375                bail!("discovery not initialized")
376            }
377        }
378    }
379}
380
381#[cfg(test)]
382#[allow(unused)]
383mod tests {
384    use super::*;
385    use crate::conf::{ClientConfigBuilder, ConRegConfigBuilder, DiscoveryConfigBuilder};
386    use serde::Deserialize;
387    use std::collections::HashMap;
388    #[tokio::test]
389    async fn test_config() {
390        //init_log();
391        init().await;
392        //init_from_file("bootstrap.yaml").await;
393        /*init_with(
394            ConRegConfigBuilder::default()
395                .config(
396                    ConfigConfigBuilder::default()
397                        .server_addr("127.0.0.1:8000")
398                        .namespace("public")
399                        .config_ids(vec!["test.yaml".into()])
400                        .build()
401                        .unwrap(),
402                )
403                .build()
404                .unwrap(),
405        )
406        .await;*/
407        println!("{:?}", AppConfig::get::<String>("name"));
408        println!("{:?}", AppConfig::get::<u32>("age"));
409
410        #[derive(Deserialize)]
411        struct MyConfig {
412            name: String,
413        }
414        let my_config = AppConfig::bind::<MyConfig>().unwrap();
415        println!("my config, name: {:?}", my_config.name);
416
417        AppConfig::add_listener("test.yaml", |config| {
418            println!("Listen config change1: {:?}", config);
419        });
420        AppConfig::add_listener("test.yaml", |config| {
421            println!("Listen config change2: {:?}", config);
422        });
423        let h = tokio::spawn(async move {
424            loop {
425                println!("{:?}", AppConfig::get::<String>("name"));
426                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
427            }
428        });
429        tokio::join!(h);
430    }
431
432    #[tokio::test]
433    async fn test_discovery() {
434        //init_log();
435        init().await;
436        // let config = ConRegConfigBuilder::default()
437        //     .service_id("your_service_id")
438        //     .client(
439        //         ClientConfigBuilder::default()
440        //             .address("127.0.0.1")
441        //             .port(8080)
442        //             .build()
443        //             .unwrap(),
444        //     )
445        //     .discovery(
446        //         DiscoveryConfigBuilder::default()
447        //             .server_addr(vec!["127.0.0.1:8000", "127.0.0.1:8001"])
448        //             .build()
449        //             .unwrap(),
450        //     )
451        //     .build()
452        //     .unwrap();
453        // // println!("config: {:?}", config);
454        // let service_id = config.service_id.clone();
455        // init_with(config).await;
456        let h = tokio::spawn(async move {
457            loop {
458                println!("{:?}", AppConfig::get::<String>("name"));
459                let instances = AppDiscovery::get_instances(utils::current_process_name().as_str())
460                    .await
461                    .unwrap();
462                println!("current: {:?}", instances);
463                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
464            }
465        });
466        tokio::join!(h);
467    }
468}