dubbo_registry_zookeeper/
lib.rs1#![allow(unused_variables, dead_code, missing_docs)]
19
20use std::{collections::HashMap, env, sync::Arc, time::Duration};
21
22use async_trait::async_trait;
23use dubbo::{
24 logger::tracing::{debug, error, info},
25 params::constants::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY},
26 url::UrlParam,
27 StdError, Url,
28};
29use serde::{Deserialize, Serialize};
30use tokio::{select, sync::mpsc};
31use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
32
33use dubbo::{
34 extension::registry_extension::{DiscoverStream, Registry, ServiceChange},
35 params::registry_param::InterfaceName,
36};
37
/// Key string `"registry.group"`.
///
/// NOTE(review): nothing in this file reads the constant; it is presumably a
/// registry/URL parameter name consumed by callers — confirm before relying on it.
pub const REGISTRY_GROUP_KEY: &str = "registry.group";
43
44struct LoggingWatcher;
45impl Watcher for LoggingWatcher {
46 fn handle(&self, e: WatchedEvent) {
47 info!("{:?}", e)
48 }
49}
50
/// Registry backed by a ZooKeeper client; implements the `Registry` trait below.
pub struct ZookeeperRegistry {
    /// Set to `"/services"` by `new`; not read anywhere else in this file.
    root_path: String,
    /// ZooKeeper client, reference-counted so `subscribe` can clone it into its
    /// background task.
    zk_client: Arc<ZooKeeper>,
}
55
56#[derive(Serialize, Deserialize, Debug)]
57pub struct ZkServiceInstance {
58 name: String,
59 address: String,
60 port: i32,
61}
62
63impl ZkServiceInstance {
64 pub fn get_service_name(&self) -> &str {
65 self.name.as_str()
66 }
67
68 pub fn get_host(&self) -> &str {
69 self.address.as_str()
70 }
71
72 pub fn get_port(&self) -> i32 {
73 self.port
74 }
75}
76
77impl ZookeeperRegistry {
78 pub fn new(connect_string: &str) -> ZookeeperRegistry {
79 let zk_client =
80 ZooKeeper::connect(connect_string, Duration::from_secs(15), LoggingWatcher).unwrap();
81 info!("zk server connect string: {}", connect_string);
82 ZookeeperRegistry {
83 root_path: "/services".to_string(),
84 zk_client: Arc::new(zk_client),
85 }
86 }
87
88 fn get_app_name(&self, service_name: String) -> String {
91 let res = self
92 .zk_client
93 .get_data(&("/dubbo/mapping/".to_owned() + &service_name), false);
94
95 let x = res.unwrap().0;
96 let s = match std::str::from_utf8(&x) {
97 Ok(v) => v,
98 Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
99 };
100 s.to_string()
101 }
102
103 pub fn create_path(
105 &self,
106 path: &str,
107 data: &str,
108 create_mode: CreateMode,
109 ) -> Result<(), StdError> {
110 if self.exists_path(path) {
111 self.zk_client
112 .set_data(path, data.as_bytes().to_vec(), None)
113 .unwrap_or_else(|_| panic!("set data to {} failed.", path));
114 return Ok(());
115 }
116 let zk_result = self.zk_client.create(
117 path,
118 data.as_bytes().to_vec(),
119 Acl::open_unsafe().clone(),
120 create_mode,
121 );
122 match zk_result {
123 Ok(_) => Ok(()),
124 Err(err) => {
125 error!("zk path {} parent not exists.", path);
126 Err(err.into())
127 }
128 }
129 }
130
131 pub fn create_path_with_parent_check(
133 &self,
134 path: &str,
135 data: &str,
136 create_mode: CreateMode,
137 ) -> Result<(), StdError> {
138 let nodes: Vec<&str> = path.split('/').collect();
139 let mut current: String = String::new();
140 let children = *nodes.last().unwrap();
141 for node_key in nodes {
142 if node_key.is_empty() {
143 continue;
144 };
145 current.push('/');
146 current.push_str(node_key);
147 if !self.exists_path(current.as_str()) {
148 let (new_create_mode, new_data) = match children == node_key {
149 true => (create_mode, data),
150 false => (CreateMode::Persistent, ""),
151 };
152
153 self.create_path(current.as_str(), new_data, new_create_mode)?;
154 }
155 }
156 Ok(())
157 }
158
159 pub fn delete_path(&self, path: &str) {
160 if self.exists_path(path) {
161 self.zk_client.delete(path, None).unwrap()
162 }
163 }
164
165 pub fn exists_path(&self, path: &str) -> bool {
166 self.zk_client.exists(path, false).unwrap().is_some()
167 }
168
169 pub fn get_data(&self, path: &str, watch: bool) -> Option<String> {
170 if self.exists_path(path) {
171 let zk_result = self.zk_client.get_data(path, watch);
172 if let Ok(..) = zk_result {
173 Some(String::from_utf8(zk_result.unwrap().0).unwrap())
174 } else {
175 None
176 }
177 } else {
178 None
179 }
180 }
181
182 pub fn diff<'a>(
183 old_urls: &'a Vec<String>,
184 new_urls: &'a Vec<String>,
185 ) -> (Vec<String>, Vec<String>) {
186 let old_urls_map: HashMap<String, String> = old_urls
187 .iter()
188 .map(|url| url.parse())
189 .filter(|item| item.is_ok())
190 .map(|item| item.unwrap())
191 .map(|item: Url| {
192 let ip_port = item.authority().to_owned();
193 let url = item.as_str().to_owned();
194 (ip_port, url)
195 })
196 .collect();
197
198 let new_urls_map: HashMap<String, String> = new_urls
199 .iter()
200 .map(|url| url.parse())
201 .filter(|item| item.is_ok())
202 .map(|item| item.unwrap())
203 .map(|item: Url| {
204 let ip_port = item.authority().to_owned();
205 let url = item.as_str().to_owned();
206 (ip_port, url)
207 })
208 .collect();
209
210 let mut add_hosts = Vec::new();
211 let mut removed_hosts = Vec::new();
212
213 for (key, new_host) in new_urls_map.iter() {
214 let old_host = old_urls_map.get(key);
215 match old_host {
216 None => {
217 add_hosts.push(new_host.clone());
218 }
219 Some(old_host) => {
220 if !old_host.eq(new_host) {
221 removed_hosts.push(old_host.clone());
222 add_hosts.push(new_host.clone());
223 }
224 }
225 }
226 }
227
228 for (key, old_host) in old_urls_map.iter() {
229 let new_host = old_urls_map.get(key);
230 match new_host {
231 None => {
232 removed_hosts.push(old_host.clone());
233 }
234 Some(_) => {}
235 }
236 }
237
238 (removed_hosts, add_hosts)
239 }
240}
241
242impl Default for ZookeeperRegistry {
243 fn default() -> ZookeeperRegistry {
244 let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
245 Ok(val) => val,
246 Err(_) => {
247 let default_connect_string = "localhost:2181";
248 info!(
249 "No ZOOKEEPER_SERVERS env value, using {} as default.",
250 default_connect_string
251 );
252 default_connect_string.to_string()
253 }
254 };
255 println!(
256 "using external registry with it's connect string {}",
257 zk_connect_string.as_str()
258 );
259 ZookeeperRegistry::new(zk_connect_string.as_str())
260 }
261}
262
263#[async_trait]
264impl Registry for ZookeeperRegistry {
265 async fn register(&self, url: Url) -> Result<(), StdError> {
266 debug!("register url: {}", url);
267 let interface_name = url.query::<InterfaceName>().unwrap().value();
268 let url_str = url.as_str();
269 let zk_path = format!(
270 "/{}/{}/{}/{}",
271 DUBBO_KEY, interface_name, PROVIDERS_KEY, url_str
272 );
273 self.create_path_with_parent_check(zk_path.as_str(), LOCALHOST_IP, CreateMode::Ephemeral)?;
274 Ok(())
275 }
276
277 async fn unregister(&self, url: Url) -> Result<(), StdError> {
278 let interface_name = url.query::<InterfaceName>().unwrap().value();
279 let url_str = url.as_str();
280
281 let zk_path = format!(
282 "/{}/{}/{}/{}",
283 DUBBO_KEY, interface_name, PROVIDERS_KEY, url_str
284 );
285 self.delete_path(zk_path.as_str());
286 Ok(())
287 }
288
289 async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
291 let interface_name = url.query::<InterfaceName>().unwrap().value();
292
293 let zk_path = format!("/{}/{}/{}", DUBBO_KEY, interface_name, PROVIDERS_KEY);
294
295 debug!("subscribe service: {}", zk_path);
296
297 let (listener, mut change_rx) = ZooKeeperListener::new();
298 let arc_listener = Arc::new(listener);
299
300 let watcher = ZooKeeperWatcher::new(arc_listener.clone(), zk_path.clone());
301
302 let (discover_tx, discover_rx) = mpsc::channel(64);
303
304 let zk_client_in_task = self.zk_client.clone();
305 let zk_path_in_task = zk_path.clone();
306 let interface_name_in_task = interface_name.clone();
307 let arc_listener_in_task = arc_listener.clone();
308 tokio::spawn(async move {
309 let zk_client = zk_client_in_task;
310 let zk_path = zk_path_in_task;
311 let interface_name = interface_name_in_task;
312 let listener = arc_listener_in_task;
313
314 let mut current_urls = Vec::new();
315
316 loop {
317 let changed = select! {
318 _ = discover_tx.closed() => {
319 info!("discover task quit, discover channel closed");
320 None
321 },
322 changed = change_rx.recv() => {
323 changed
324 }
325 };
326
327 match changed {
328 Some(_) => {
329 let zookeeper_watcher =
330 ZooKeeperWatcher::new(listener.clone(), zk_path.clone());
331
332 match zk_client.get_children_w(&zk_path, zookeeper_watcher) {
333 Ok(children) => {
334 let (removed, add) =
335 ZookeeperRegistry::diff(¤t_urls, &children);
336
337 for url in removed {
338 match discover_tx
339 .send(Ok(ServiceChange::Remove(url.clone())))
340 .await
341 {
342 Ok(_) => {}
343 Err(e) => {
344 error!("send service change failed: {:?}, maybe user unsubscribe", e);
345 break;
346 }
347 }
348 }
349
350 for url in add {
351 match discover_tx
352 .send(Ok(ServiceChange::Insert(url.clone(), ())))
353 .await
354 {
355 Ok(_) => {}
356 Err(e) => {
357 error!("send service change failed: {:?}, maybe user unsubscribe", e);
358 break;
359 }
360 }
361 }
362
363 current_urls = children;
364 }
365 Err(err) => {
366 error!("zk subscribe error: {}", err);
367 break;
368 }
369 }
370 }
371 None => {
372 error!("receive service change task quit, unsubscribe {}.", zk_path);
373 break;
374 }
375 }
376 }
377
378 debug!("unsubscribe service: {}", zk_path);
379 });
380
381 arc_listener.changed(zk_path);
382
383 Ok(discover_rx)
384 }
385
386 async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
387 let interface_name = url.query::<InterfaceName>().unwrap().value();
388
389 let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &interface_name, PROVIDERS_KEY);
390
391 info!("unsubscribe service: {}", zk_path);
392 Ok(())
393 }
394
395 fn url(&self) -> &Url {
396 todo!()
397 }
398}
399
400pub struct ZooKeeperListener {
401 tx: mpsc::Sender<String>,
402}
403
404impl ZooKeeperListener {
405 pub fn new() -> (ZooKeeperListener, mpsc::Receiver<String>) {
406 let (tx, rx) = mpsc::channel(64);
407 let this = ZooKeeperListener { tx };
408 (this, rx)
409 }
410
411 pub fn changed(&self, path: String) {
412 match self.tx.try_send(path) {
413 Ok(_) => {}
414 Err(err) => {
415 error!("send change list to listener occur an error: {}", err);
416 return;
417 }
418 }
419 }
420}
421
422pub struct ZooKeeperWatcher {
423 listener: Arc<ZooKeeperListener>,
424 path: String,
425}
426
427impl ZooKeeperWatcher {
428 pub fn new(listener: Arc<ZooKeeperListener>, path: String) -> ZooKeeperWatcher {
429 ZooKeeperWatcher { listener, path }
430 }
431}
432
433impl Watcher for ZooKeeperWatcher {
434 fn handle(&self, event: WatchedEvent) {
435 info!("receive zookeeper event: {:?}", event);
436 let event_type: WatchedEventType = event.event_type;
437 match event_type {
438 WatchedEventType::None => {
439 info!("event type is none, ignore it.");
440 return;
441 }
442 _ => {}
443 }
444
445 self.listener.changed(self.path.clone());
446 }
447}