etcd_discovery/
etcd_discovery_base.rs1use std::{collections::HashMap, sync::{RwLock, Arc}};
8
9use etcd_client::*;
10use tracing::info;
11
12#[allow(dead_code)]
14pub struct EtcdDiscoveryBase {
15 etcd_client: Client,
16 service_map: Arc<RwLock<HashMap<String, String>>>,
17
18}
19
20impl EtcdDiscoveryBase {
21
22 pub fn get_service_map(&self) -> Arc<RwLock<HashMap<String, String>>> {
24 self.service_map.clone()
25 }
26
27 pub fn get_etcd_client(&self) -> Client {
29 self.etcd_client.clone()
30 }
31}
32
33impl EtcdDiscoveryBase {
34 pub fn new(client: Client) -> Self {
36
37 Self {
38 etcd_client: client,
39 service_map: Arc::new(RwLock::new(HashMap::new())),
40
41 }
42 }
43
44 pub async fn connect(etcd_addr: impl AsRef<[&str]>, options: Option<ConnectOptions>) -> Result<Self, Error> {
46 let client = Client::connect(etcd_addr, options).await?;
47 info!("etcd connect success");
48 Ok(Self::new(client))
49 }
50
51 pub async fn service_discover(&mut self, prefix: &str) -> Result<(), Error> {
53 let opt = Some(GetOptions::new().with_prefix());
54 let resp = self.etcd_client.get(prefix, opt).await?;
55 for kv in resp.kvs() {
56 let key = kv.key_str().unwrap_or_default();
57 let value = kv.value_str().unwrap_or_default();
58 info!("discover put key: {} value: {}", key, value);
59 self.add_service(key, value).await;
60 }
61
62 let opt = Some(WatchOptions::new().with_prefix());
63 let (mut watcher, mut stream) = self.etcd_client.watch(prefix, opt).await?;
64 let service_map = self.service_map.clone();
65
66 tokio::spawn(async move {
67 while let Some(resp) = stream.message().await.unwrap() {
68 for event in resp.events() {
69 match event.event_type() {
70 etcd_client::EventType::Put => {
71 if let Some(kv) = event.kv(){
72 let key = kv.key_str().unwrap_or_default();
73 let value = kv.value_str().unwrap_or_default();
74 info!("discover watch put key: {} value: {}", key, value);
75 if key.is_empty() {
76 continue
77 }
78 Self::add_service_map(&service_map, key, value).await;
79 }
80
81 }
82 etcd_client::EventType::Delete => {
83 if let Some(kv) = event.kv(){
84 let key = kv.key_str().unwrap_or_default();
85 info!("discover watch delete key: {}", key);
86 Self::remove_service_map(&service_map, key).await;
87 }
88 }
89 }
90 }
91 }
92 watcher.cancel().await.unwrap();
93
94 });
95
96
97 Ok(())
98 }
99
100 pub fn get_service(&self, key: impl AsRef<str>) -> Option<String> {
102 self.service_map.read().unwrap().get(key.as_ref()).cloned()
103 }
104
105 pub fn remove_service(&mut self, key: impl AsRef<str>) -> Option<String> {
106 self.service_map.write().unwrap().remove(key.as_ref())
107 }
108
109 pub async fn add_service(&self, key: impl AsRef<str>, url: &str) {
110 Self::add_service_map(&self.service_map, key.as_ref(), url).await;
111
112 }
113
114
115 #[inline]
116 async fn add_service_map( service_map: &RwLock<HashMap<String, String>>, key: impl Into<String>, value: &str) {
117 let key = key.into();
118 service_map
119 .write()
120 .unwrap()
121 .insert(key.clone(), value.into());
122
123
124 }
125 #[inline]
126 async fn remove_service_map( service_map: &RwLock<HashMap<String, String>>, key: impl AsRef<str>) {
127 service_map.write().unwrap().remove(key.as_ref());
128 }
129
130}
131
132
133
#[cfg(test)]
mod tests {
    use etcd_client::*;
    use super::*;

    /// Smoke test: connects to etcd at `127.0.0.1:2379` as user `root`, starts
    /// discovery on the `/hello/1` prefix and keeps the watch task alive for
    /// ten seconds. It needs that etcd instance to be reachable to pass.
    #[tokio::test]
    async fn test_discovery() -> Result<(), etcd_client::Error> {
        // `init()` panics when a global subscriber is already installed (for
        // example by another test in the same binary); `try_init()` reports
        // that case as an error instead, which is safe to ignore here.
        let _ = tracing_subscriber::fmt().try_init();

        let opt = ConnectOptions::new().with_user("root", "789789");
        let mut discover = EtcdDiscoveryBase::connect(["127.0.0.1:2379"], Some(opt)).await?;
        discover.service_discover("/hello/1").await?;

        // Give the background watch task time to receive events.
        tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;

        Ok(())
    }
}