krpc_core/register/
zookeeper.rs

1use super::{Register, Resource, SocketInfo};
2use crate::support::dubbo::{decode_url, encode_url};
3use async_recursion::async_recursion;
4use std::{collections::HashMap, sync::Arc, time::Duration};
5use tokio::sync::RwLock;
6use tracing::info;
7use zk::OneshotWatcher;
8use zookeeper_client as zk;
9
10static EPHEMERAL_OPEN: &zk::CreateOptions<'static> =
11    &zk::CreateMode::Ephemeral.with_acls(zk::Acls::anyone_all());
12static CONTAINER_OPEN: &zk::CreateOptions<'static> =
13    &zk::CreateMode::Container.with_acls(zk::Acls::anyone_all());
14
15pub struct KrpcZookeeper {
16    addr: String,
17
18    root_path: String,
19
20    map: Arc<RwLock<HashMap<String, Vec<SocketInfo>>>>,
21}
22
23impl Register for KrpcZookeeper {
24    fn add_resource(&self, resource: Resource) {
25        creat_resource_node(
26            self.addr.clone(),
27            self.root_path.clone(),
28            resource,
29            self.map.clone(),
30        )
31    }
32}
33
34impl KrpcZookeeper {
35    pub fn init(
36        addr: &str,
37        _name_space: &str,
38        map: Arc<RwLock<HashMap<String, Vec<SocketInfo>>>>,
39    ) -> Self {
40        let root_path = "/dubbo".to_string();
41        let krpc_zookeeper = KrpcZookeeper {
42            addr: addr.to_string(),
43            root_path,
44            map,
45        };
46        return krpc_zookeeper;
47    }
48}
49
50#[async_recursion]
51async fn connect(cluster: &str, chroot: &str) -> zk::Client {
52    let client = match zk::Client::connect(&cluster).await {
53        Ok(client) => client,
54        Err(err) => {
55            tokio::time::sleep(Duration::from_secs(30)).await;
56            info!("connect err {:?} ,Try to re-establish the connection", err);
57            return connect(cluster, chroot).await;
58        }
59    };
60    if chroot.len() <= 1 {
61        return client;
62    }
63    let mut i = 1;
64    while i <= chroot.len() {
65        let j = match chroot[i..].find('/') {
66            Some(j) => j + i,
67            None => chroot.len(),
68        };
69        let path = &chroot[..j];
70        match client
71            .create(path, Default::default(), CONTAINER_OPEN)
72            .await
73        {
74            Ok(_) | Err(zk::Error::NodeExists) => {}
75            Err(err) => {
76                tokio::time::sleep(Duration::from_secs(30)).await;
77                info!("connect err {:?} ,Try to re-establish the connection", err);
78                return connect(cluster, chroot).await;
79            }
80        }
81        i = j + 1;
82    }
83    match client.chroot(chroot.to_string()) {
84        Ok(client) => client,
85        Err(err) => {
86            tokio::time::sleep(Duration::from_secs(30)).await;
87            info!("connect err {:?} ,Try to re-establish the connection", err);
88            return connect(cluster, chroot).await;
89        }
90    }
91}
92
93fn creat_resource_node(
94    cluster: String,
95    root: String,
96    resource: Resource,
97    map: Arc<RwLock<HashMap<String, Vec<SocketInfo>>>>,
98) {
99    let mut path = root.to_string();
100    let info = match &resource {
101        Resource::Client(info) => {
102            listener_resource_node_change(
103                cluster.clone(),
104                root,
105                Resource::Client(info.clone()),
106                map,
107            );
108            path.push_str(&("/".to_owned() + &info.server_name + "/consumers"));
109            info
110        }
111        Resource::Server(info) => {
112            path.push_str(&("/".to_owned() + &info.server_name + "/providers"));
113            info
114        }
115    };
116    let node_name = encode_url(&resource);
117    let node_data = serde_json::to_string(&info).unwrap();
118    tokio::spawn(async move {
119        loop {
120            let client = connect(&cluster, &path).await;
121            match client
122                .create(&node_name, node_data.as_bytes(), EPHEMERAL_OPEN)
123                .await
124            {
125                Ok(_) => {}
126                Err(err) => {
127                    info!("create node err {:?}", err);
128                    tokio::time::sleep(Duration::from_secs(5)).await;
129                    continue;
130                }
131            }
132            match client.check_and_watch_stat(&node_name).await {
133                Ok(watch) => {
134                    let event = watch.1.changed().await;
135                    info!("resource node event {:?}", event);
136                }
137                Err(err) => {
138                    info!("resource node err {:?}", err);
139                }
140            };
141            drop(client);
142        }
143    });
144}
145
146fn listener_resource_node_change(
147    cluster: String,
148    root: String,
149    resource: Resource,
150    map: Arc<RwLock<HashMap<String, Vec<SocketInfo>>>>,
151) {
152    let mut path = root;
153    let info = match resource {
154        Resource::Client(info) => {
155            path.push_str(&("/".to_owned() + &info.server_name + "/providers"));
156            info
157        }
158        Resource::Server(_) => return,
159    };
160    tokio::spawn(async move {
161        let mut client = connect(&cluster.clone(), &path).await;
162        let map = map;
163        let info = info;
164        loop {
165            let watcher: (Vec<String>, zk::Stat, OneshotWatcher) =
166                match client.get_and_watch_children("/").await {
167                    Ok(watcher) => watcher,
168                    Err(_) => {
169                        client = connect(&cluster.clone(), &path).await;
170                        continue;
171                    }
172                };
173            let mut server_list = vec![];
174            for node in watcher.0 {
175                let resource = decode_url(&node);
176                if let Ok(resource) = resource {
177                    if let Resource::Server(resource_info) = resource {
178                        if &info.version == &resource_info.version  {
179                            server_list.push(SocketInfo {
180                                info : resource_info,
181                                sender: Arc::new(RwLock::new(None)),
182                            });
183                        }
184                    }
185                }
186            }
187            let mut key = info.server_name.clone();
188            if let Some(version) = &info.version {
189                key.push_str(":");
190                key.push_str(version);
191            }
192            info!("update server cache {:?} : {:?}", key, server_list);
193            let mut temp_map = map.write().await;
194            temp_map.insert(key, server_list);
195            drop(temp_map);
196            let event: zk::WatchedEvent = watcher.2.changed().await;
197            if let zk::EventType::NodeChildrenChanged = event.event_type {
198                info!("Monitor node changes");
199            } else {
200                client = connect(&cluster.clone(), &path).await;
201            }
202        }
203    });
204}