conreg_client/lib.rs
1//! # Conreg Client
2//!
3//! conreg是一个参考了Nacos设计的分布式服务注册和配置中心。
4//!
5//! conreg-client是conreg的客户端SDK,用于集成到您的服务中和conreg-server通信。
6//!
7//! > 注意:当前conreg的0.1.x版本仍处于快速迭代中,API在未来可能会发生变化
8//!
9//! # 快速开始
10//!
11//! ## 基本使用
12//! 在项目的根目录下添加`bootstrap.yaml`配置文件:
13//! ```yaml
14//! conreg:
15//! # 服务ID
16//! # 服务ID是服务的唯一标识,同一命名空间下的服务ID不能重复
17//! service-id: test
18//! # 客户端配置,这些信息将会作为服务实例的基本信息提交到注册中心
19//! client:
20//! # 监听地址
21//! address: 127.0.0.1
22//! # 端口
23//! port: 8000
24//! # 配置中心配置
25//! config:
26//! # 配置中心地址
27//! server-addr: 127.0.0.1:8000
28//! # 配置ID
29//! # 如果多个配置中存在同名配置key,则靠后的配置将会覆盖之前的配置
30//! config-ids:
31//! - test.yaml
32//! # 注册中心配置
33//! discovery:
34//! # 注册中心地址
35//! server-addr:
36//! - 127.0.0.1:8000
37//! - 127.0.0.1:8001
38//! - 127.0.0.1:8002
39//! ```
40//!
41//! 然后,在`main`函数中初始化:
42//! ```rust
43//! #[tokio::main]
44//! async fn main(){
45//! // 初始化
46//! init().await;
47//!
48//! // 获取配置项
49//! println!("{:?}", AppConfig::get::<String>("name"));
50//!
51//! // 获取服务实例
52//! let instances = AppDiscovery::get_instances("your_service_id").await.unwrap();
53//! println!("service instances: {:?}", instances);
54//! }
55//! ```
56//!
57//! ## 命名空间
58//! conreg使用命名空间(Namespace)来对配置和服务进行隔离,默认命名空间为`public`。
59//!
60//! ## 配置中心
61//! 从配置中心中加载,并使用这些配置。目前仅支持`yaml`格式的配置。
62//!
63//! ### 初始化并加载配置
64//! ```rust
65//! #[tokio::main]
66//! async fn main() {
67//! init_with(
68//! ConRegConfigBuilder::default()
69//! .config(
70//! ConfigConfigBuilder::default()
71//! .server_addr("127.0.0.1:8000")
72//! .namespace("public")
73//! .config_ids(vec!["test.yaml".into()])
74//! .build()
75//! .unwrap(),
76//! )
77//! .build()
78//! .unwrap(),
79//!
80//! )
81//! .await;
82//! println!("{:?}", AppConfig::get::<String>("name"));
83//! println!("{:?}", AppConfig::get::<u32>("age"));
84//! }
85//! ```
86//!
87//! ### 从配置文件初始化
88//! conreg-client默认从项目根目录下的bootstrap.yaml加载配置初始化配置,就像SpringCloud一样。
89//!
90//! 以下是`bootstrap.yaml`配置示例
91//!
92//! ```yaml
93//! conreg:
94//! config:
95//! server-addr: 127.0.0.1:8000
96//! config-ids:
97//! - your_config.yaml
98//!
99//! ```
100//!
101//! 然后调用`init`方法即可初始化并获取配置内容。
102//! ```rust
103//! #[tokio::main]
104//! async fn main() {
105//! init().await;
106//! // 或者指定配置文件路径
107//! // init_from_file("config.yaml").await;
108//! println!("{:?}", AppConfig::get::<String>("name"));
109//! println!("{:?}", AppConfig::get::<u32>("age"));
110//! }
111//! ```
112//!
113//! ## 注册中心
114//! 用于服务注册和发现。
115//!
116//! ### 初始化并加载配置
117//! ```rust
118//! #[tokio::main]
119//! async fn main() {
120//! let config = ConRegConfigBuilder::default()
121//! .service_id("your_service_id")
122//! .client(
123//! ClientConfigBuilder::default()
124//! .address("127.0.0.1")
125//! .port(8080)
126//! .build()
127//! .unwrap(),
128//! )
129//! .discovery(
130//! DiscoveryConfigBuilder::default()
131//! .server_addr("127.0.0.1:8000")
132//! .build()
133//! .unwrap(),
134//! )
135//! .build()
136//! .unwrap();
137//! let service_id = config.service_id.clone();
138//! init_with(config).await;
139//! let instances = AppDiscovery::get_instances(&service_id).await.unwrap();
140//! println!("service instances: {:?}", instances);
141//! }
142//! ```
143//!
144//! ### 从配置文件初始化
145//!
146//! 默认从`bootstrap.yaml`中加载配置。
147//!
148//! 以下是示例配置:
149//! ```yaml
150//! conreg:
151//! service-id: your_service_id
152//! client:
153//! address: 127.0.0.1
154//! port: 8000
155//! discovery:
156//! server-addr:
157//! - 127.0.0.1:8000
158//! - 127.0.0.1:8001
159//! - 127.0.0.1:8002
160//! ```
161//! ```rust
162//! #[tokio::main]
163//! async fn main() {
164//! init().await;
165//! // 或者指定配置文件路径
166//! // init_from_file("config.yaml").await;
167//! init_with(config).await;
168//!
169//! let service_id = "your_service_id";
170//! let instances = AppDiscovery::get_instances(service_id).await.unwrap();
171//! println!("service instances: {:?}", instances);
172//! }
173//! ```
174
175use crate::conf::{ConRegConfig, ConRegConfigWrapper};
176use crate::config::Configs;
177use crate::discovery::{Discovery, DiscoveryClient};
178pub use crate::protocol::Instance;
179use anyhow::bail;
180use env_logger::WriteStyle;
181use serde::Deserialize;
182use serde::de::DeserializeOwned;
183use std::path::PathBuf;
184use std::process::exit;
185use std::sync::{Arc, OnceLock, RwLock};
186
187mod conf;
188mod config;
189mod discovery;
190mod network;
191mod protocol;
192mod utils;
193
194struct ConReg;
195
196/// 存储配置内容
197static CONFIGS: OnceLock<Arc<RwLock<Configs>>> = OnceLock::new();
198/// 服务发现全局实例
199static DISCOVERY: OnceLock<Discovery> = OnceLock::new();
200
201impl ConReg {
202 /// 初始化配置中心和注册中心
203 async fn init(file: Option<PathBuf>) -> anyhow::Result<()> {
204 let mut file = file.unwrap_or("bootstrap.yaml".into());
205 if !file.exists() {
206 file = "bootstrap.yml".into();
207 }
208 let s = match std::fs::read_to_string(&file) {
209 Ok(s) => s,
210 Err(e) => {
211 log::error!("no bootstrap.yaml found, {}", e);
212 exit(1);
213 }
214 };
215
216 log::info!("loaded bootstrap config from {}", file.display());
217
218 let config = match serde_yaml::from_str::<ConRegConfigWrapper>(&s) {
219 Ok(config) => config,
220 Err(e) => {
221 log::error!("parse bootstrap.yaml failed, {}", e);
222 exit(1);
223 }
224 };
225
226 Self::init_with(&config.conreg).await?;
227
228 log::info!("conreg init completed");
229 Ok(())
230 }
231
232 async fn init_with(config: &ConRegConfig) -> anyhow::Result<()> {
233 init_log();
234
235 if config.config.is_some() {
236 let config_client = config::ConfigClient::new(&config);
237 let configs = config_client.load().await?;
238 CONFIGS.set(Arc::new(RwLock::new(configs))).map_err(|_| {
239 anyhow::anyhow!(
240 "config has already been initialized, please do not initialize repeatedly"
241 )
242 })?;
243 }
244
245 if config.discovery.is_some() {
246 let discovery_client = DiscoveryClient::new(config);
247 discovery_client.register().await?;
248 let discovery = Discovery::new(discovery_client).await;
249 DISCOVERY.set(discovery).map_err(|_| {
250 anyhow::anyhow!(
251 "discovery has already been initialized, please do not initialize repeatedly"
252 )
253 })?;
254 }
255
256 Ok(())
257 }
258}
259
260/// 初始化配置中心和注册中心
261pub async fn init() {
262 match ConReg::init(None).await {
263 Ok(_) => {}
264 Err(e) => {
265 log::error!("conreg init failed: {}", e);
266 exit(1);
267 }
268 };
269}
270
271/// 从配置文件初始化配置中心和注册中心
272pub async fn init_from_file(path: impl Into<PathBuf>) {
273 match ConReg::init(Some(path.into())).await {
274 Ok(_) => {}
275 Err(e) => {
276 log::error!("conreg init failed: {}", e);
277 exit(1);
278 }
279 };
280}
281
282/// 从自定义配置初始化
283pub async fn init_with(config: ConRegConfig) {
284 match ConReg::init_with(&config).await {
285 Ok(_) => {}
286 Err(e) => {
287 log::error!("conreg init failed: {}", e);
288 exit(1);
289 }
290 };
291}
292
293pub struct AppConfig;
294impl AppConfig {
295 fn reload(configs: Configs) {
296 match CONFIGS.get() {
297 None => {
298 log::error!("config not init");
299 }
300 Some(config) => {
301 *config.write().unwrap() = configs;
302 }
303 }
304 }
305
306 /// 获取配置值
307 ///
308 /// 注意:获取的值类型需要与配置中的值类型保持一致,如果不一致,可能会导致转换失败,
309 /// 转换失败时将返回`None`
310 pub fn get<V: DeserializeOwned>(key: &str) -> Option<V> {
311 match CONFIGS.get() {
312 None => {
313 log::error!("config not init");
314 None
315 }
316 Some(config) => match config.read().expect("read lock error").get(key) {
317 None => None,
318 Some(value) => match serde_yaml::from_value::<V>(value.clone()) {
319 Ok(value) => Some(value),
320 Err(e) => {
321 log::error!("parse config failed, {}", e);
322 None
323 }
324 },
325 },
326 }
327 }
328
329 /// 绑定配置内容到一个struct。
330 pub fn bind<T: DeserializeOwned>() -> anyhow::Result<T> {
331 match CONFIGS.get() {
332 None => {
333 bail!("config not init");
334 }
335 Some(config) => {
336 let value: T = serde_yaml::from_value(
337 config.read().expect("read lock error").content.clone(),
338 )?;
339 Ok(value)
340 }
341 }
342 }
343}
344
345fn init_log() {
346 use std::io::Write;
347 env_logger::Builder::new()
348 .filter_level(log::LevelFilter::Info)
349 .parse_default_env()
350 .format(|buf, record| {
351 let level = record.level().as_str();
352 writeln!(
353 buf,
354 "[{}][{}] - {}",
355 chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"),
356 level,
357 record.args()
358 )
359 })
360 .write_style(WriteStyle::Always)
361 .init();
362}
363
364pub struct AppDiscovery;
365impl AppDiscovery {
366 pub async fn get_instances(service_id: &str) -> anyhow::Result<Vec<Instance>> {
367 match DISCOVERY.get() {
368 Some(discovery) => {
369 //let discovery = discovery.read().expect("read lock error");
370 let instances = discovery.get_instances(service_id).await;
371 Ok(instances)
372 }
373 None => {
374 bail!("discovery not initialized")
375 }
376 }
377 }
378}
379
380#[cfg(test)]
381#[allow(unused)]
382mod tests {
383 use super::*;
384 use crate::conf::{ClientConfigBuilder, ConRegConfigBuilder, DiscoveryConfigBuilder};
385 #[tokio::test]
386 async fn test_config() {
387 //init_log();
388 init().await;
389 //init_from_file("bootstrap.yaml").await;
390 /*init_with(
391 ConRegConfigBuilder::default()
392 .config(
393 ConfigConfigBuilder::default()
394 .server_addr("127.0.0.1:8000")
395 .namespace("public")
396 .config_ids(vec!["test.yaml".into()])
397 .build()
398 .unwrap(),
399 )
400 .build()
401 .unwrap(),
402 )
403 .await;*/
404 println!("{:?}", AppConfig::get::<String>("name"));
405 println!("{:?}", AppConfig::get::<u32>("age"));
406
407 #[derive(Deserialize)]
408 struct MyConfig {
409 name: String,
410 }
411 let my_config = AppConfig::bind::<MyConfig>().unwrap();
412 println!("my config, name: {:?}", my_config.name);
413
414 let h = tokio::spawn(async move {
415 loop {
416 println!("{:?}", AppConfig::get::<String>("name"));
417 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
418 }
419 });
420 tokio::join!(h);
421 }
422
423 #[tokio::test]
424 async fn test_discovery() {
425 //init_log();
426 //init().await;
427 let config = ConRegConfigBuilder::default()
428 .service_id("your_service_id")
429 .client(
430 ClientConfigBuilder::default()
431 .address("127.0.0.1")
432 .port(8080)
433 .build()
434 .unwrap(),
435 )
436 .discovery(
437 DiscoveryConfigBuilder::default()
438 .server_addr("127.0.0.1:8000")
439 .build()
440 .unwrap(),
441 )
442 .build()
443 .unwrap();
444 // println!("config: {:?}", config);
445 let service_id = config.service_id.clone();
446 init_with(config).await;
447 let h = tokio::spawn(async move {
448 loop {
449 let instances = AppDiscovery::get_instances(&service_id).await.unwrap();
450 println!("current: {:?}", instances);
451 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
452 }
453 });
454 tokio::join!(h);
455 }
456}