conreg_client/lib.rs
1//! # Conreg Client
2//!
3//! conreg是一个参考了Nacos设计的分布式服务注册和配置中心。
4//!
5//! conreg-client是conreg的客户端SDK,用于集成到您的服务中和conreg-server通信。
6//!
7//! > 注意:当前conreg的0.1.x版本仍处于快速迭代中,API在未来可能会发生变化
8//!
9//! # 快速开始
10//!
11//! ## 基本使用
12//! 在项目的根目录下添加`bootstrap.yaml`配置文件:
13//! ```yaml
14//! conreg:
15//! # 服务ID
16//! # 服务ID是服务的唯一标识,同一命名空间下的服务ID不能重复
17//! service-id: test
18//! # 客户端配置,这些信息将会作为服务实例的基本信息提交到注册中心
19//! client:
20//! # 监听地址
21//! address: 127.0.0.1
22//! # 端口
23//! port: 8000
24//! # 配置中心配置
25//! config:
26//! # 配置中心地址
27//! server-addr: 127.0.0.1:8000
28//! # 配置ID
29//! # 如果多个配置中存在同名配置key,则靠后的配置将会覆盖之前的配置
30//! config-ids:
31//! - test.yaml
32//! # 注册中心配置
33//! discovery:
34//! # 注册中心地址
35//! server-addr:
36//! - 127.0.0.1:8000
37//! - 127.0.0.1:8001
38//! - 127.0.0.1:8002
39//! ```
40//!
41//! 然后,在`main`函数中初始化:
42//! ```rust
43//! #[tokio::main]
44//! async fn main(){
45//! // 初始化
46//! init().await;
47//!
48//! // 获取配置项
49//! println!("{:?}", AppConfig::get::<String>("name"));
50//!
51//! // 获取服务实例
52//! let instances = AppDiscovery::get_instances("your_service_id").await.unwrap();
53//! println!("service instances: {:?}", instances);
54//! }
55//! ```
56//!
57//! ## 命名空间
58//! conreg使用命名空间(Namespace)来对配置和服务进行隔离,默认命名空间为`public`。
59//!
60//! ## 配置中心
61//! 从配置中心中加载,并使用这些配置。目前仅支持`yaml`格式的配置。
62//!
63//! ### 初始化并加载配置
64//! ```rust
65//! #[tokio::main]
66//! async fn main() {
67//! init_with(
68//! ConRegConfigBuilder::default()
69//! .config(
70//! ConfigConfigBuilder::default()
71//! .server_addr("127.0.0.1:8000")
72//! .namespace("public")
73//! .config_ids(vec!["test.yaml".into()])
74//! .build()
75//! .unwrap(),
76//! )
77//! .build()
78//! .unwrap(),
79//!
80//! )
81//! .await;
82//! println!("{:?}", AppConfig::get::<String>("name"));
83//! println!("{:?}", AppConfig::get::<u32>("age"));
84//! }
85//! ```
86//!
87//! ### 从配置文件初始化
88//! conreg-client默认从项目根目录下的bootstrap.yaml加载配置初始化配置,就像SpringCloud一样。
89//!
90//! 以下是`bootstrap.yaml`配置示例
91//!
92//! ```yaml
93//! conreg:
94//! config:
95//! server-addr: 127.0.0.1:8000
96//! config-ids:
97//! - your_config.yaml
98//!
99//! ```
100//!
101//! 然后调用`init`方法即可初始化并获取配置内容。
102//! ```rust
103//! #[tokio::main]
104//! async fn main() {
105//! init().await;
106//! // 或者指定配置文件路径
107//! // init_from_file("config.yaml").await;
108//! println!("{:?}", AppConfig::get::<String>("name"));
109//! println!("{:?}", AppConfig::get::<u32>("age"));
110//! }
111//! ```
112//!
113//! ## 注册中心
114//! 用于服务注册和发现。
115//!
116//! ### 初始化并加载配置
117//! ```rust
118//! #[tokio::main]
119//! async fn main() {
120//! let config = ConRegConfigBuilder::default()
121//! .service_id("your_service_id")
122//! .client(
123//! ClientConfigBuilder::default()
124//! .address("127.0.0.1")
125//! .port(8080)
126//! .build()
127//! .unwrap(),
128//! )
129//! .discovery(
130//! DiscoveryConfigBuilder::default()
131//! .server_addr("127.0.0.1:8000")
132//! .build()
133//! .unwrap(),
134//! )
135//! .build()
136//! .unwrap();
137//! let service_id = config.service_id.clone();
138//! init_with(config).await;
139//! let instances = AppDiscovery::get_instances(&service_id).await.unwrap();
140//! println!("service instances: {:?}", instances);
141//! }
142//! ```
143//!
144//! ### 从配置文件初始化
145//!
146//! 默认从`bootstrap.yaml`中加载配置。
147//!
148//! 以下是示例配置:
149//! ```yaml
150//! conreg:
151//! service-id: your_service_id
152//! client:
153//! address: 127.0.0.1
154//! port: 8000
155//! discovery:
156//! server-addr:
157//! - 127.0.0.1:8000
158//! - 127.0.0.1:8001
159//! - 127.0.0.1:8002
160//! ```
161//! ```rust
162//! #[tokio::main]
163//! async fn main() {
164//! init().await;
165//! // 或者指定配置文件路径
166//! // init_from_file("config.yaml").await;
167//! init_with(config).await;
168//!
169//! let service_id = "your_service_id";
170//! let instances = AppDiscovery::get_instances(service_id).await.unwrap();
171//! println!("service instances: {:?}", instances);
172//! }
173//! ```
174
175use crate::conf::{ConRegConfig, ConRegConfigWrapper};
176use crate::config::Configs;
177use crate::discovery::{Discovery, DiscoveryClient};
178pub use crate::protocol::Instance;
179use anyhow::bail;
180use env_logger::WriteStyle;
181use serde::de::DeserializeOwned;
182use std::path::PathBuf;
183use std::process::exit;
184use std::sync::{Arc, OnceLock, RwLock};
185
186mod conf;
187mod config;
188mod discovery;
189mod network;
190mod protocol;
191mod utils;
192
193struct ConReg;
194
195/// 存储配置内容
196static CONFIGS: OnceLock<Arc<RwLock<Configs>>> = OnceLock::new();
197/// 服务发现全局实例
198static DISCOVERY: OnceLock<Discovery> = OnceLock::new();
199
200impl ConReg {
201 /// 初始化配置中心和注册中心
202 async fn init(file: Option<PathBuf>) -> anyhow::Result<()> {
203 let mut file = file.unwrap_or("bootstrap.yaml".into());
204 if !file.exists() {
205 file = "bootstrap.yml".into();
206 }
207 let s = match std::fs::read_to_string(&file) {
208 Ok(s) => s,
209 Err(e) => {
210 log::error!("no bootstrap.yaml found, {}", e);
211 exit(1);
212 }
213 };
214
215 log::info!("loaded bootstrap config from {}", file.display());
216
217 let config = match serde_yaml::from_str::<ConRegConfigWrapper>(&s) {
218 Ok(config) => config,
219 Err(e) => {
220 log::error!("parse bootstrap.yaml failed, {}", e);
221 exit(1);
222 }
223 };
224
225 Self::init_with(&config.conreg).await?;
226
227 log::info!("conreg init completed");
228 Ok(())
229 }
230
231 async fn init_with(config: &ConRegConfig) -> anyhow::Result<()> {
232 init_log();
233
234 if config.config.is_some() {
235 let config_client = config::ConfigClient::new(&config);
236 let configs = config_client.load().await?;
237 CONFIGS.set(Arc::new(RwLock::new(configs))).map_err(|_| {
238 anyhow::anyhow!(
239 "config has already been initialized, please do not initialize repeatedly"
240 )
241 })?;
242 }
243
244 if config.discovery.is_some() {
245 let discovery_client = DiscoveryClient::new(config);
246 discovery_client.register().await?;
247 let discovery = Discovery::new(discovery_client).await;
248 DISCOVERY.set(discovery).map_err(|_| {
249 anyhow::anyhow!(
250 "discovery has already been initialized, please do not initialize repeatedly"
251 )
252 })?;
253 }
254
255 Ok(())
256 }
257}
258
259/// 初始化配置中心和注册中心
260pub async fn init() {
261 match ConReg::init(None).await {
262 Ok(_) => {}
263 Err(e) => {
264 log::error!("conreg init failed: {}", e);
265 exit(1);
266 }
267 };
268}
269
270/// 从配置文件初始化配置中心和注册中心
271pub async fn init_from_file(path: impl Into<PathBuf>) {
272 match ConReg::init(Some(path.into())).await {
273 Ok(_) => {}
274 Err(e) => {
275 log::error!("conreg init failed: {}", e);
276 exit(1);
277 }
278 };
279}
280
281/// 从自定义配置初始化
282pub async fn init_with(config: ConRegConfig) {
283 match ConReg::init_with(&config).await {
284 Ok(_) => {}
285 Err(e) => {
286 log::error!("conreg init failed: {}", e);
287 exit(1);
288 }
289 };
290}
291
292pub struct AppConfig;
293impl AppConfig {
294 fn reload(configs: Configs) {
295 match CONFIGS.get() {
296 None => {
297 log::error!("config not init");
298 }
299 Some(config) => {
300 *config.write().unwrap() = configs;
301 }
302 }
303 }
304
305 /// 获取配置值
306 ///
307 /// 注意:获取的值类型需要与配置中的值类型保持一致,如果不一致,可能会导致转换失败,
308 /// 转换失败时将返回`None`
309 pub fn get<V: DeserializeOwned>(key: &str) -> Option<V> {
310 match CONFIGS.get() {
311 None => {
312 log::error!("config not init");
313 None
314 }
315 Some(config) => match config.read().expect("read lock error").get(key) {
316 None => None,
317 Some(value) => match serde_yaml::from_value::<V>(value.clone()) {
318 Ok(value) => Some(value),
319 Err(e) => {
320 log::error!("parse config failed, {}", e);
321 None
322 }
323 },
324 },
325 }
326 }
327
328 /// 绑定配置内容到一个struct。
329 pub fn bind<T: DeserializeOwned>() -> anyhow::Result<T> {
330 match CONFIGS.get() {
331 None => {
332 bail!("config not init");
333 }
334 Some(config) => {
335 let value: T = serde_yaml::from_value(
336 config.read().expect("read lock error").content.clone(),
337 )?;
338 Ok(value)
339 }
340 }
341 }
342}
343
344fn init_log() {
345 use std::io::Write;
346 env_logger::Builder::new()
347 .filter_level(log::LevelFilter::Info)
348 .parse_default_env()
349 .format(|buf, record| {
350 let level = record.level().as_str();
351 writeln!(
352 buf,
353 "[{}][{}] - {}",
354 chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"),
355 level,
356 record.args()
357 )
358 })
359 .write_style(WriteStyle::Always)
360 .init();
361}
362
363pub struct AppDiscovery;
364impl AppDiscovery {
365 pub async fn get_instances(service_id: &str) -> anyhow::Result<Vec<Instance>> {
366 match DISCOVERY.get() {
367 Some(discovery) => {
368 //let discovery = discovery.read().expect("read lock error");
369 let instances = discovery.get_instances(service_id).await;
370 Ok(instances)
371 }
372 None => {
373 bail!("discovery not initialized")
374 }
375 }
376 }
377}
378
379#[cfg(test)]
380#[allow(unused)]
381mod tests {
382 use super::*;
383 use crate::conf::{ClientConfigBuilder, ConRegConfigBuilder, DiscoveryConfigBuilder};
384 use serde::Deserialize;
385 #[tokio::test]
386 async fn test_config() {
387 //init_log();
388 init().await;
389 //init_from_file("bootstrap.yaml").await;
390 /*init_with(
391 ConRegConfigBuilder::default()
392 .config(
393 ConfigConfigBuilder::default()
394 .server_addr("127.0.0.1:8000")
395 .namespace("public")
396 .config_ids(vec!["test.yaml".into()])
397 .build()
398 .unwrap(),
399 )
400 .build()
401 .unwrap(),
402 )
403 .await;*/
404 println!("{:?}", AppConfig::get::<String>("name"));
405 println!("{:?}", AppConfig::get::<u32>("age"));
406
407 #[derive(Deserialize)]
408 struct MyConfig {
409 name: String,
410 }
411 let my_config = AppConfig::bind::<MyConfig>().unwrap();
412 println!("my config, name: {:?}", my_config.name);
413
414 let h = tokio::spawn(async move {
415 loop {
416 println!("{:?}", AppConfig::get::<String>("name"));
417 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
418 }
419 });
420 tokio::join!(h);
421 }
422
423 #[tokio::test]
424 async fn test_discovery() {
425 //init_log();
426 //init().await;
427 let config = ConRegConfigBuilder::default()
428 .service_id("your_service_id")
429 .client(
430 ClientConfigBuilder::default()
431 .address("127.0.0.1")
432 .port(8080)
433 .build()
434 .unwrap(),
435 )
436 .discovery(
437 DiscoveryConfigBuilder::default()
438 .server_addr("127.0.0.1:8000")
439 .build()
440 .unwrap(),
441 )
442 .build()
443 .unwrap();
444 // println!("config: {:?}", config);
445 let service_id = config.service_id.clone();
446 init_with(config).await;
447 let h = tokio::spawn(async move {
448 loop {
449 let instances = AppDiscovery::get_instances(&service_id).await.unwrap();
450 println!("current: {:?}", instances);
451 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
452 }
453 });
454 tokio::join!(h);
455 }
456}