conreg_client/
lib.rs

1//! # Conreg Client
2//!
3//! conreg是一个参考了Nacos设计的分布式服务注册和配置中心。
4//!
5//! conreg-client是conreg的客户端SDK,用于集成到您的服务中和conreg-server通信。
6//!
7//! > 注意:当前conreg的0.1.x版本仍处于快速迭代中,API在未来可能会发生变化
8//!
9//! # 快速开始
10//!
11//! ## 基本使用
12//! 在项目的根目录下添加`bootstrap.yaml`配置文件:
13//! ```yaml
14//! conreg:
15//!   # 服务ID
16//!   # 服务ID是服务的唯一标识,同一命名空间下的服务ID不能重复
17//!   service-id: test
18//!   # 客户端配置,这些信息将会作为服务实例的基本信息提交到注册中心
19//!   client:
20//!     # 监听地址
21//!     address: 127.0.0.1
22//!     # 端口
23//!     port: 8000
24//!   # 配置中心配置
25//!   config:
26//!     # 配置中心地址
27//!     server-addr: 127.0.0.1:8000
28//!     # 配置ID
29//!     # 如果多个配置中存在同名配置key,则靠后的配置将会覆盖之前的配置
30//!     config-ids:
31//!       - test.yaml
32//!   # 注册中心配置
33//!   discovery:
34//!     # 注册中心地址
35//!     server-addr:
36//!       - 127.0.0.1:8000
37//!       - 127.0.0.1:8001
38//!       - 127.0.0.1:8002
39//! ```
40//!
41//! 然后,在`main`函数中初始化:
42//! ```rust
43//! #[tokio::main]
44//! async fn main(){
45//!     // 初始化
46//!     init().await;
47//!
48//!     // 获取配置项
49//!     println!("{:?}", AppConfig::get::<String>("name"));
50//!
51//!     // 获取服务实例
52//!     let instances = AppDiscovery::get_instances("your_service_id").await.unwrap();
53//!     println!("service instances: {:?}", instances);
54//! }
55//! ```
56//!
57//! ## 命名空间
58//! conreg使用命名空间(Namespace)来对配置和服务进行隔离,默认命名空间为`public`。
59//!
60//! ## 配置中心
61//! 从配置中心中加载,并使用这些配置。目前仅支持`yaml`格式的配置。
62//!
63//! ### 初始化并加载配置
64//! ```rust
65//! #[tokio::main]
66//!  async fn main() {
67//!     init_with(
68//!         ConRegConfigBuilder::default()
69//!             .config(
70//!                 ConfigConfigBuilder::default()
71//!                     .server_addr("127.0.0.1:8000")
72//!                     .namespace("public")
73//!                     .config_ids(vec!["test.yaml".into()])
74//!                     .build()
75//!                     .unwrap(),
76//!             )
77//!             .build()
78//!             .unwrap(),
79//!
80//!     )
81//!     .await;
82//!     println!("{:?}", AppConfig::get::<String>("name"));
83//!     println!("{:?}", AppConfig::get::<u32>("age"));
84//!  }
85//! ```
86//!
87//! ### 从配置文件初始化
88//! conreg-client默认从项目根目录下的bootstrap.yaml加载配置初始化配置,就像SpringCloud一样。
89//!
90//! 以下是`bootstrap.yaml`配置示例
91//!
92//! ```yaml
93//! conreg:
94//!   config:
95//!     server-addr: 127.0.0.1:8000
96//!     config-ids:
97//!       - your_config.yaml
98//!
99//! ```
100//!
101//! 然后调用`init`方法即可初始化并获取配置内容。
102//! ```rust
103//! #[tokio::main]
104//!  async fn main() {
105//!     init().await;
106//!     // 或者指定配置文件路径
107//!     // init_from_file("config.yaml").await;
108//!     println!("{:?}", AppConfig::get::<String>("name"));
109//!     println!("{:?}", AppConfig::get::<u32>("age"));
110//!  }
111//! ```
112//!
113//! ## 注册中心
114//! 用于服务注册和发现。
115//!
116//! ### 初始化并加载配置
117//! ```rust
118//! #[tokio::main]
119//! async fn main() {
120//! let config = ConRegConfigBuilder::default()
121//!     .service_id("your_service_id")
122//!     .client(
123//!         ClientConfigBuilder::default()
124//!             .address("127.0.0.1")
125//!             .port(8080)
126//!             .build()
127//!             .unwrap(),
128//!     )
129//!     .discovery(
130//!         DiscoveryConfigBuilder::default()
131//!             .server_addr("127.0.0.1:8000")
132//!             .build()
133//!             .unwrap(),
134//!     )
135//!     .build()
136//!     .unwrap();
137//!     let service_id = config.service_id.clone();
138//!     init_with(config).await;
139//!     let instances = AppDiscovery::get_instances(&service_id).await.unwrap();
140//!     println!("service instances: {:?}", instances);
141//! }
142//! ```
143//!
144//! ### 从配置文件初始化
145//!
146//! 默认从`bootstrap.yaml`中加载配置。
147//!
148//! 以下是示例配置:
149//! ```yaml
150//! conreg:
151//!   service-id: your_service_id
152//!   client:
153//!     address: 127.0.0.1
154//!     port: 8000
155//!   discovery:
156//!     server-addr:
157//!       - 127.0.0.1:8000
158//!       - 127.0.0.1:8001
159//!       - 127.0.0.1:8002
160//! ```
161//! ```rust
162//! #[tokio::main]
163//!  async fn main() {
164//!     init().await;
165//!     // 或者指定配置文件路径
166//!     // init_from_file("config.yaml").await;
167//!     init_with(config).await;
168//!
169//!     let service_id = "your_service_id";
170//!     let instances = AppDiscovery::get_instances(service_id).await.unwrap();
171//!     println!("service instances: {:?}", instances);
172//!  }
173//! ```
174
175use crate::conf::{ConRegConfig, ConRegConfigWrapper};
176use crate::config::Configs;
177use crate::discovery::{Discovery, DiscoveryClient};
178pub use crate::protocol::Instance;
179use anyhow::bail;
180use env_logger::WriteStyle;
181use serde::Deserialize;
182use serde::de::DeserializeOwned;
183use std::path::PathBuf;
184use std::process::exit;
185use std::sync::{Arc, OnceLock, RwLock};
186
187mod conf;
188mod config;
189mod discovery;
190mod network;
191mod protocol;
192mod utils;
193
194struct ConReg;
195
196/// 存储配置内容
197static CONFIGS: OnceLock<Arc<RwLock<Configs>>> = OnceLock::new();
198/// 服务发现全局实例
199static DISCOVERY: OnceLock<Discovery> = OnceLock::new();
200
201impl ConReg {
202    /// 初始化配置中心和注册中心
203    async fn init(file: Option<PathBuf>) -> anyhow::Result<()> {
204        let mut file = file.unwrap_or("bootstrap.yaml".into());
205        if !file.exists() {
206            file = "bootstrap.yml".into();
207        }
208        let s = match std::fs::read_to_string(&file) {
209            Ok(s) => s,
210            Err(e) => {
211                log::error!("no bootstrap.yaml found, {}", e);
212                exit(1);
213            }
214        };
215
216        log::info!("loaded bootstrap config from {}", file.display());
217
218        let config = match serde_yaml::from_str::<ConRegConfigWrapper>(&s) {
219            Ok(config) => config,
220            Err(e) => {
221                log::error!("parse bootstrap.yaml failed, {}", e);
222                exit(1);
223            }
224        };
225
226        Self::init_with(&config.conreg).await?;
227
228        log::info!("conreg init completed");
229        Ok(())
230    }
231
232    async fn init_with(config: &ConRegConfig) -> anyhow::Result<()> {
233        init_log();
234
235        if config.config.is_some() {
236            let config_client = config::ConfigClient::new(&config);
237            let configs = config_client.load().await?;
238            CONFIGS.set(Arc::new(RwLock::new(configs))).map_err(|_| {
239                anyhow::anyhow!(
240                    "config has already been initialized, please do not initialize repeatedly"
241                )
242            })?;
243        }
244
245        if config.discovery.is_some() {
246            let discovery_client = DiscoveryClient::new(config);
247            discovery_client.register().await?;
248            let discovery = Discovery::new(discovery_client).await;
249            DISCOVERY.set(discovery).map_err(|_| {
250                anyhow::anyhow!(
251                    "discovery has already been initialized, please do not initialize repeatedly"
252                )
253            })?;
254        }
255
256        Ok(())
257    }
258}
259
260/// 初始化配置中心和注册中心
261pub async fn init() {
262    match ConReg::init(None).await {
263        Ok(_) => {}
264        Err(e) => {
265            log::error!("conreg init failed: {}", e);
266            exit(1);
267        }
268    };
269}
270
271/// 从配置文件初始化配置中心和注册中心
272pub async fn init_from_file(path: impl Into<PathBuf>) {
273    match ConReg::init(Some(path.into())).await {
274        Ok(_) => {}
275        Err(e) => {
276            log::error!("conreg init failed: {}", e);
277            exit(1);
278        }
279    };
280}
281
282/// 从自定义配置初始化
283pub async fn init_with(config: ConRegConfig) {
284    match ConReg::init_with(&config).await {
285        Ok(_) => {}
286        Err(e) => {
287            log::error!("conreg init failed: {}", e);
288            exit(1);
289        }
290    };
291}
292
293pub struct AppConfig;
294impl AppConfig {
295    fn reload(configs: Configs) {
296        match CONFIGS.get() {
297            None => {
298                log::error!("config not init");
299            }
300            Some(config) => {
301                *config.write().unwrap() = configs;
302            }
303        }
304    }
305
306    /// 获取配置值
307    ///
308    /// 注意:获取的值类型需要与配置中的值类型保持一致,如果不一致,可能会导致转换失败,
309    /// 转换失败时将返回`None`
310    pub fn get<V: DeserializeOwned>(key: &str) -> Option<V> {
311        match CONFIGS.get() {
312            None => {
313                log::error!("config not init");
314                None
315            }
316            Some(config) => match config.read().expect("read lock error").get(key) {
317                None => None,
318                Some(value) => match serde_yaml::from_value::<V>(value.clone()) {
319                    Ok(value) => Some(value),
320                    Err(e) => {
321                        log::error!("parse config failed, {}", e);
322                        None
323                    }
324                },
325            },
326        }
327    }
328
329    /// 绑定配置内容到一个struct。
330    pub fn bind<T: DeserializeOwned>() -> anyhow::Result<T> {
331        match CONFIGS.get() {
332            None => {
333                bail!("config not init");
334            }
335            Some(config) => {
336                let value: T = serde_yaml::from_value(
337                    config.read().expect("read lock error").content.clone(),
338                )?;
339                Ok(value)
340            }
341        }
342    }
343}
344
345fn init_log() {
346    use std::io::Write;
347    env_logger::Builder::new()
348        .filter_level(log::LevelFilter::Info)
349        .parse_default_env()
350        .format(|buf, record| {
351            let level = record.level().as_str();
352            writeln!(
353                buf,
354                "[{}][{}] - {}",
355                chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"),
356                level,
357                record.args()
358            )
359        })
360        .write_style(WriteStyle::Always)
361        .init();
362}
363
364pub struct AppDiscovery;
365impl AppDiscovery {
366    pub async fn get_instances(service_id: &str) -> anyhow::Result<Vec<Instance>> {
367        match DISCOVERY.get() {
368            Some(discovery) => {
369                //let discovery = discovery.read().expect("read lock error");
370                let instances = discovery.get_instances(service_id).await;
371                Ok(instances)
372            }
373            None => {
374                bail!("discovery not initialized")
375            }
376        }
377    }
378}
379
380#[cfg(test)]
381#[allow(unused)]
382mod tests {
383    use super::*;
384    use crate::conf::{ClientConfigBuilder, ConRegConfigBuilder, DiscoveryConfigBuilder};
385    #[tokio::test]
386    async fn test_config() {
387        //init_log();
388        init().await;
389        //init_from_file("bootstrap.yaml").await;
390        /*init_with(
391            ConRegConfigBuilder::default()
392                .config(
393                    ConfigConfigBuilder::default()
394                        .server_addr("127.0.0.1:8000")
395                        .namespace("public")
396                        .config_ids(vec!["test.yaml".into()])
397                        .build()
398                        .unwrap(),
399                )
400                .build()
401                .unwrap(),
402        )
403        .await;*/
404        println!("{:?}", AppConfig::get::<String>("name"));
405        println!("{:?}", AppConfig::get::<u32>("age"));
406
407        #[derive(Deserialize)]
408        struct MyConfig {
409            name: String,
410        }
411        let my_config = AppConfig::bind::<MyConfig>().unwrap();
412        println!("my config, name: {:?}", my_config.name);
413
414        let h = tokio::spawn(async move {
415            loop {
416                println!("{:?}", AppConfig::get::<String>("name"));
417                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
418            }
419        });
420        tokio::join!(h);
421    }
422
423    #[tokio::test]
424    async fn test_discovery() {
425        //init_log();
426        //init().await;
427        let config = ConRegConfigBuilder::default()
428            .service_id("your_service_id")
429            .client(
430                ClientConfigBuilder::default()
431                    .address("127.0.0.1")
432                    .port(8080)
433                    .build()
434                    .unwrap(),
435            )
436            .discovery(
437                DiscoveryConfigBuilder::default()
438                    .server_addr("127.0.0.1:8000")
439                    .build()
440                    .unwrap(),
441            )
442            .build()
443            .unwrap();
444        // println!("config: {:?}", config);
445        let service_id = config.service_id.clone();
446        init_with(config).await;
447        let h = tokio::spawn(async move {
448            loop {
449                let instances = AppDiscovery::get_instances(&service_id).await.unwrap();
450                println!("current: {:?}", instances);
451                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
452            }
453        });
454        tokio::join!(h);
455    }
456}