Skip to main content

dubbo_registry_zookeeper/
lib.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#![allow(unused_variables, dead_code, missing_docs)]
19
20use std::{collections::HashMap, env, sync::Arc, time::Duration};
21
22use async_trait::async_trait;
23use dubbo::{
24    logger::tracing::{debug, error, info},
25    params::constants::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY},
26    url::UrlParam,
27    StdError, Url,
28};
29use serde::{Deserialize, Serialize};
30use tokio::{select, sync::mpsc};
31use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
32
33use dubbo::{
34    extension::registry_extension::{DiscoverStream, Registry, ServiceChange},
35    params::registry_param::InterfaceName,
36};
37
38// Get metadata of a service registration from a URL
39// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
40// dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER])
41
/// Parameter key `registry.group`.
///
/// NOTE(review): not referenced by any code visible in this file; presumably the
/// URL parameter that selects the registry root group — confirm against callers.
pub const REGISTRY_GROUP_KEY: &str = "registry.group";
43
44struct LoggingWatcher;
45impl Watcher for LoggingWatcher {
46    fn handle(&self, e: WatchedEvent) {
47        info!("{:?}", e)
48    }
49}
50
51pub struct ZookeeperRegistry {
52    root_path: String,
53    zk_client: Arc<ZooKeeper>,
54}
55
56#[derive(Serialize, Deserialize, Debug)]
57pub struct ZkServiceInstance {
58    name: String,
59    address: String,
60    port: i32,
61}
62
63impl ZkServiceInstance {
64    pub fn get_service_name(&self) -> &str {
65        self.name.as_str()
66    }
67
68    pub fn get_host(&self) -> &str {
69        self.address.as_str()
70    }
71
72    pub fn get_port(&self) -> i32 {
73        self.port
74    }
75}
76
77impl ZookeeperRegistry {
78    pub fn new(connect_string: &str) -> ZookeeperRegistry {
79        let zk_client =
80            ZooKeeper::connect(connect_string, Duration::from_secs(15), LoggingWatcher).unwrap();
81        info!("zk server connect string: {}", connect_string);
82        ZookeeperRegistry {
83            root_path: "/services".to_string(),
84            zk_client: Arc::new(zk_client),
85        }
86    }
87
88    // metadata /dubbo/mapping designed for application discovery; deprecated for currently using interface discovery
89    // #[deprecated]
90    fn get_app_name(&self, service_name: String) -> String {
91        let res = self
92            .zk_client
93            .get_data(&("/dubbo/mapping/".to_owned() + &service_name), false);
94
95        let x = res.unwrap().0;
96        let s = match std::str::from_utf8(&x) {
97            Ok(v) => v,
98            Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
99        };
100        s.to_string()
101    }
102
103    // If the parent node does not exist in the ZooKeeper, Err(ZkError::NoNode) will be returned.
104    pub fn create_path(
105        &self,
106        path: &str,
107        data: &str,
108        create_mode: CreateMode,
109    ) -> Result<(), StdError> {
110        if self.exists_path(path) {
111            self.zk_client
112                .set_data(path, data.as_bytes().to_vec(), None)
113                .unwrap_or_else(|_| panic!("set data to {} failed.", path));
114            return Ok(());
115        }
116        let zk_result = self.zk_client.create(
117            path,
118            data.as_bytes().to_vec(),
119            Acl::open_unsafe().clone(),
120            create_mode,
121        );
122        match zk_result {
123            Ok(_) => Ok(()),
124            Err(err) => {
125                error!("zk path {} parent not exists.", path);
126                Err(err.into())
127            }
128        }
129    }
130
131    // For avoiding Err(ZkError::NoNode) when parent node is't exists
132    pub fn create_path_with_parent_check(
133        &self,
134        path: &str,
135        data: &str,
136        create_mode: CreateMode,
137    ) -> Result<(), StdError> {
138        let nodes: Vec<&str> = path.split('/').collect();
139        let mut current: String = String::new();
140        let children = *nodes.last().unwrap();
141        for node_key in nodes {
142            if node_key.is_empty() {
143                continue;
144            };
145            current.push('/');
146            current.push_str(node_key);
147            if !self.exists_path(current.as_str()) {
148                let (new_create_mode, new_data) = match children == node_key {
149                    true => (create_mode, data),
150                    false => (CreateMode::Persistent, ""),
151                };
152
153                self.create_path(current.as_str(), new_data, new_create_mode)?;
154            }
155        }
156        Ok(())
157    }
158
159    pub fn delete_path(&self, path: &str) {
160        if self.exists_path(path) {
161            self.zk_client.delete(path, None).unwrap()
162        }
163    }
164
165    pub fn exists_path(&self, path: &str) -> bool {
166        self.zk_client.exists(path, false).unwrap().is_some()
167    }
168
169    pub fn get_data(&self, path: &str, watch: bool) -> Option<String> {
170        if self.exists_path(path) {
171            let zk_result = self.zk_client.get_data(path, watch);
172            if let Ok(..) = zk_result {
173                Some(String::from_utf8(zk_result.unwrap().0).unwrap())
174            } else {
175                None
176            }
177        } else {
178            None
179        }
180    }
181
182    pub fn diff<'a>(
183        old_urls: &'a Vec<String>,
184        new_urls: &'a Vec<String>,
185    ) -> (Vec<String>, Vec<String>) {
186        let old_urls_map: HashMap<String, String> = old_urls
187            .iter()
188            .map(|url| url.parse())
189            .filter(|item| item.is_ok())
190            .map(|item| item.unwrap())
191            .map(|item: Url| {
192                let ip_port = item.authority().to_owned();
193                let url = item.as_str().to_owned();
194                (ip_port, url)
195            })
196            .collect();
197
198        let new_urls_map: HashMap<String, String> = new_urls
199            .iter()
200            .map(|url| url.parse())
201            .filter(|item| item.is_ok())
202            .map(|item| item.unwrap())
203            .map(|item: Url| {
204                let ip_port = item.authority().to_owned();
205                let url = item.as_str().to_owned();
206                (ip_port, url)
207            })
208            .collect();
209
210        let mut add_hosts = Vec::new();
211        let mut removed_hosts = Vec::new();
212
213        for (key, new_host) in new_urls_map.iter() {
214            let old_host = old_urls_map.get(key);
215            match old_host {
216                None => {
217                    add_hosts.push(new_host.clone());
218                }
219                Some(old_host) => {
220                    if !old_host.eq(new_host) {
221                        removed_hosts.push(old_host.clone());
222                        add_hosts.push(new_host.clone());
223                    }
224                }
225            }
226        }
227
228        for (key, old_host) in old_urls_map.iter() {
229            let new_host = old_urls_map.get(key);
230            match new_host {
231                None => {
232                    removed_hosts.push(old_host.clone());
233                }
234                Some(_) => {}
235            }
236        }
237
238        (removed_hosts, add_hosts)
239    }
240}
241
242impl Default for ZookeeperRegistry {
243    fn default() -> ZookeeperRegistry {
244        let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
245            Ok(val) => val,
246            Err(_) => {
247                let default_connect_string = "localhost:2181";
248                info!(
249                    "No ZOOKEEPER_SERVERS env value, using {} as default.",
250                    default_connect_string
251                );
252                default_connect_string.to_string()
253            }
254        };
255        println!(
256            "using external registry with it's connect string {}",
257            zk_connect_string.as_str()
258        );
259        ZookeeperRegistry::new(zk_connect_string.as_str())
260    }
261}
262
263#[async_trait]
264impl Registry for ZookeeperRegistry {
265    async fn register(&self, url: Url) -> Result<(), StdError> {
266        debug!("register url: {}", url);
267        let interface_name = url.query::<InterfaceName>().unwrap().value();
268        let url_str = url.as_str();
269        let zk_path = format!(
270            "/{}/{}/{}/{}",
271            DUBBO_KEY, interface_name, PROVIDERS_KEY, url_str
272        );
273        self.create_path_with_parent_check(zk_path.as_str(), LOCALHOST_IP, CreateMode::Ephemeral)?;
274        Ok(())
275    }
276
277    async fn unregister(&self, url: Url) -> Result<(), StdError> {
278        let interface_name = url.query::<InterfaceName>().unwrap().value();
279        let url_str = url.as_str();
280
281        let zk_path = format!(
282            "/{}/{}/{}/{}",
283            DUBBO_KEY, interface_name, PROVIDERS_KEY, url_str
284        );
285        self.delete_path(zk_path.as_str());
286        Ok(())
287    }
288
289    // for consumer to find the changes of providers
290    async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
291        let interface_name = url.query::<InterfaceName>().unwrap().value();
292
293        let zk_path = format!("/{}/{}/{}", DUBBO_KEY, interface_name, PROVIDERS_KEY);
294
295        debug!("subscribe service: {}", zk_path);
296
297        let (listener, mut change_rx) = ZooKeeperListener::new();
298        let arc_listener = Arc::new(listener);
299
300        let watcher = ZooKeeperWatcher::new(arc_listener.clone(), zk_path.clone());
301
302        let (discover_tx, discover_rx) = mpsc::channel(64);
303
304        let zk_client_in_task = self.zk_client.clone();
305        let zk_path_in_task = zk_path.clone();
306        let interface_name_in_task = interface_name.clone();
307        let arc_listener_in_task = arc_listener.clone();
308        tokio::spawn(async move {
309            let zk_client = zk_client_in_task;
310            let zk_path = zk_path_in_task;
311            let interface_name = interface_name_in_task;
312            let listener = arc_listener_in_task;
313
314            let mut current_urls = Vec::new();
315
316            loop {
317                let changed = select! {
318                    _ = discover_tx.closed() => {
319                        info!("discover task quit, discover channel closed");
320                        None
321                    },
322                    changed = change_rx.recv() => {
323                        changed
324                    }
325                };
326
327                match changed {
328                    Some(_) => {
329                        let zookeeper_watcher =
330                            ZooKeeperWatcher::new(listener.clone(), zk_path.clone());
331
332                        match zk_client.get_children_w(&zk_path, zookeeper_watcher) {
333                            Ok(children) => {
334                                let (removed, add) =
335                                    ZookeeperRegistry::diff(&current_urls, &children);
336
337                                for url in removed {
338                                    match discover_tx
339                                        .send(Ok(ServiceChange::Remove(url.clone())))
340                                        .await
341                                    {
342                                        Ok(_) => {}
343                                        Err(e) => {
344                                            error!("send service change failed: {:?}, maybe user unsubscribe", e);
345                                            break;
346                                        }
347                                    }
348                                }
349
350                                for url in add {
351                                    match discover_tx
352                                        .send(Ok(ServiceChange::Insert(url.clone(), ())))
353                                        .await
354                                    {
355                                        Ok(_) => {}
356                                        Err(e) => {
357                                            error!("send service change failed: {:?}, maybe user unsubscribe", e);
358                                            break;
359                                        }
360                                    }
361                                }
362
363                                current_urls = children;
364                            }
365                            Err(err) => {
366                                error!("zk subscribe error: {}", err);
367                                break;
368                            }
369                        }
370                    }
371                    None => {
372                        error!("receive service change task quit, unsubscribe {}.", zk_path);
373                        break;
374                    }
375                }
376            }
377
378            debug!("unsubscribe service: {}", zk_path);
379        });
380
381        arc_listener.changed(zk_path);
382
383        Ok(discover_rx)
384    }
385
386    async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
387        let interface_name = url.query::<InterfaceName>().unwrap().value();
388
389        let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &interface_name, PROVIDERS_KEY);
390
391        info!("unsubscribe service: {}", zk_path);
392        Ok(())
393    }
394
395    fn url(&self) -> &Url {
396        todo!()
397    }
398}
399
400pub struct ZooKeeperListener {
401    tx: mpsc::Sender<String>,
402}
403
404impl ZooKeeperListener {
405    pub fn new() -> (ZooKeeperListener, mpsc::Receiver<String>) {
406        let (tx, rx) = mpsc::channel(64);
407        let this = ZooKeeperListener { tx };
408        (this, rx)
409    }
410
411    pub fn changed(&self, path: String) {
412        match self.tx.try_send(path) {
413            Ok(_) => {}
414            Err(err) => {
415                error!("send change list to listener occur an error: {}", err);
416                return;
417            }
418        }
419    }
420}
421
422pub struct ZooKeeperWatcher {
423    listener: Arc<ZooKeeperListener>,
424    path: String,
425}
426
427impl ZooKeeperWatcher {
428    pub fn new(listener: Arc<ZooKeeperListener>, path: String) -> ZooKeeperWatcher {
429        ZooKeeperWatcher { listener, path }
430    }
431}
432
433impl Watcher for ZooKeeperWatcher {
434    fn handle(&self, event: WatchedEvent) {
435        info!("receive zookeeper event: {:?}", event);
436        let event_type: WatchedEventType = event.event_type;
437        match event_type {
438            WatchedEventType::None => {
439                info!("event type is none, ignore it.");
440                return;
441            }
442            _ => {}
443        }
444
445        self.listener.changed(self.path.clone());
446    }
447}