etcd_discovery/
etcd_discovery_base.rs

1/*
2 * @Author: plucky
3 * @Date: 2023-11-06 15:04:59
4 * @LastEditTime: 2023-11-12 09:32:04
5 */
6
7use std::{collections::HashMap, sync::{RwLock, Arc}};
8
9use etcd_client::*;
10use tracing::info;
11
12/// etcd discovery base, get service addr String
13#[allow(dead_code)]
14pub struct EtcdDiscoveryBase {
15    etcd_client: Client,
16    service_map: Arc<RwLock<HashMap<String, String>>>,
17    
18}
19
20impl EtcdDiscoveryBase {
21
22    /// 获取所有服务的map
23    pub fn get_service_map(&self) -> Arc<RwLock<HashMap<String, String>>> {
24        self.service_map.clone()
25    }
26
27    /// 获取etcd客户端
28    pub fn get_etcd_client(&self) -> Client {
29        self.etcd_client.clone()
30    }
31}
32
33impl EtcdDiscoveryBase {
34    /// 用etcd_client创建服务发现
35    pub fn new(client: Client) -> Self {
36        
37        Self {
38            etcd_client: client,
39            service_map: Arc::new(RwLock::new(HashMap::new())),
40            
41        }
42    }
43
44    /// 连接etcd, 创建服务发现
45    pub async fn connect(etcd_addr: impl AsRef<[&str]>, options: Option<ConnectOptions>) -> Result<Self, Error> {
46        let client = Client::connect(etcd_addr, options).await?;
47        info!("etcd connect success");
48        Ok(Self::new(client))
49    }
50    
51    /// 服务发现, prefix为服务前缀, 例如: /hello, 发现前缀/hello的所有服务
52    pub async fn service_discover(&mut self, prefix: &str) -> Result<(), Error> {
53        let opt = Some(GetOptions::new().with_prefix());
54        let resp = self.etcd_client.get(prefix, opt).await?;
55        for kv in resp.kvs() {
56            let key = kv.key_str().unwrap_or_default();
57            let value = kv.value_str().unwrap_or_default();
58            info!("discover put key: {} value: {}", key, value);
59            self.add_service(key, value).await;
60        }
61
62        let opt = Some(WatchOptions::new().with_prefix());
63        let (mut watcher, mut stream) = self.etcd_client.watch(prefix, opt).await?;
64        let service_map = self.service_map.clone();
65
66        tokio::spawn(async move {
67            while let Some(resp) = stream.message().await.unwrap() {
68                for event in resp.events() {
69                    match event.event_type() {
70                        etcd_client::EventType::Put => {
71                            if let Some(kv) = event.kv(){
72                                let key = kv.key_str().unwrap_or_default();
73                                let value = kv.value_str().unwrap_or_default();
74                                info!("discover watch put key: {} value: {}", key, value);
75                                if key.is_empty() {
76                                    continue
77                                }
78                                Self::add_service_map(&service_map, key, value).await;
79                            }
80                            
81                        }
82                        etcd_client::EventType::Delete => {
83                            if let Some(kv) = event.kv(){
84                                let key = kv.key_str().unwrap_or_default();
85                                info!("discover watch delete key: {}", key);
86                                Self::remove_service_map(&service_map, key).await;
87                            }
88                        }
89                    }
90                }
91            }
92            watcher.cancel().await.unwrap();
93
94        });
95
96
97        Ok(())
98    }
99
100    /// 获取一个服务的地址, 例如: /hello/1
101    pub fn get_service(&self, key: impl AsRef<str>) -> Option<String> {
102        self.service_map.read().unwrap().get(key.as_ref()).cloned()
103    }
104    
105    pub fn remove_service(&mut self, key: impl AsRef<str>) -> Option<String> {
106        self.service_map.write().unwrap().remove(key.as_ref())
107    }
108
109    pub async fn add_service(&self, key: impl AsRef<str>, url: &str) {
110        Self::add_service_map(&self.service_map, key.as_ref(), url).await;
111        
112    }
113
114
115    #[inline]
116    async fn add_service_map( service_map: &RwLock<HashMap<String, String>>, key: impl Into<String>, value: &str) {
117        let key = key.into();
118        service_map
119                .write()
120                .unwrap()
121                .insert(key.clone(), value.into());
122           
123        
124    }
125    #[inline]
126    async fn remove_service_map( service_map: &RwLock<HashMap<String, String>>, key: impl AsRef<str>) {
127        service_map.write().unwrap().remove(key.as_ref());
128    }
129
130}
131
132
133
#[cfg(test)]
mod tests {
    use etcd_client::*;
    use super::*;

    /// Manual smoke test: connects to the etcd endpoint below, starts
    /// discovery for `/hello/1`, and keeps the test alive for ten seconds so
    /// watch events can be observed in the log output.
    ///
    /// Requires a reachable etcd instance that accepts the credentials used here.
    #[tokio::test]
    async fn test_discovery() -> Result<(), etcd_client::Error> {
        tracing_subscriber::fmt().init();

        let options = ConnectOptions::new().with_user("root", "789789");
        let mut discovery = EtcdDiscoveryBase::connect(["127.0.0.1:2379"], Some(options)).await?;
        discovery.service_discover("/hello/1").await?;

        // Leave the background watch task running for a while before exiting.
        let watch_window = tokio::time::Duration::from_secs(10);
        tokio::time::sleep(watch_window).await;

        Ok(())
    }
}