krpc_core/register/
zookeeper.rs1use super::{Register, Resource, SocketInfo};
2use crate::support::dubbo::{decode_url, encode_url};
3use async_recursion::async_recursion;
4use std::{collections::HashMap, sync::Arc, time::Duration};
5use tokio::sync::RwLock;
6use tracing::info;
7use zk::OneshotWatcher;
8use zookeeper_client as zk;
9
10static EPHEMERAL_OPEN: &zk::CreateOptions<'static> =
11 &zk::CreateMode::Ephemeral.with_acls(zk::Acls::anyone_all());
12static CONTAINER_OPEN: &zk::CreateOptions<'static> =
13 &zk::CreateMode::Container.with_acls(zk::Acls::anyone_all());
14
15pub struct KrpcZookeeper {
16 addr: String,
17
18 root_path: String,
19
20 map: Arc<RwLock<HashMap<String, Vec<SocketInfo>>>>,
21}
22
23impl Register for KrpcZookeeper {
24 fn add_resource(&self, resource: Resource) {
25 creat_resource_node(
26 self.addr.clone(),
27 self.root_path.clone(),
28 resource,
29 self.map.clone(),
30 )
31 }
32}
33
34impl KrpcZookeeper {
35 pub fn init(
36 addr: &str,
37 _name_space: &str,
38 map: Arc<RwLock<HashMap<String, Vec<SocketInfo>>>>,
39 ) -> Self {
40 let root_path = "/dubbo".to_string();
41 let krpc_zookeeper = KrpcZookeeper {
42 addr: addr.to_string(),
43 root_path,
44 map,
45 };
46 return krpc_zookeeper;
47 }
48}
49
50#[async_recursion]
51async fn connect(cluster: &str, chroot: &str) -> zk::Client {
52 let client = match zk::Client::connect(&cluster).await {
53 Ok(client) => client,
54 Err(err) => {
55 tokio::time::sleep(Duration::from_secs(30)).await;
56 info!("connect err {:?} ,Try to re-establish the connection", err);
57 return connect(cluster, chroot).await;
58 }
59 };
60 if chroot.len() <= 1 {
61 return client;
62 }
63 let mut i = 1;
64 while i <= chroot.len() {
65 let j = match chroot[i..].find('/') {
66 Some(j) => j + i,
67 None => chroot.len(),
68 };
69 let path = &chroot[..j];
70 match client
71 .create(path, Default::default(), CONTAINER_OPEN)
72 .await
73 {
74 Ok(_) | Err(zk::Error::NodeExists) => {}
75 Err(err) => {
76 tokio::time::sleep(Duration::from_secs(30)).await;
77 info!("connect err {:?} ,Try to re-establish the connection", err);
78 return connect(cluster, chroot).await;
79 }
80 }
81 i = j + 1;
82 }
83 match client.chroot(chroot.to_string()) {
84 Ok(client) => client,
85 Err(err) => {
86 tokio::time::sleep(Duration::from_secs(30)).await;
87 info!("connect err {:?} ,Try to re-establish the connection", err);
88 return connect(cluster, chroot).await;
89 }
90 }
91}
92
93fn creat_resource_node(
94 cluster: String,
95 root: String,
96 resource: Resource,
97 map: Arc<RwLock<HashMap<String, Vec<SocketInfo>>>>,
98) {
99 let mut path = root.to_string();
100 let info = match &resource {
101 Resource::Client(info) => {
102 listener_resource_node_change(
103 cluster.clone(),
104 root,
105 Resource::Client(info.clone()),
106 map,
107 );
108 path.push_str(&("/".to_owned() + &info.server_name + "/consumers"));
109 info
110 }
111 Resource::Server(info) => {
112 path.push_str(&("/".to_owned() + &info.server_name + "/providers"));
113 info
114 }
115 };
116 let node_name = encode_url(&resource);
117 let node_data = serde_json::to_string(&info).unwrap();
118 tokio::spawn(async move {
119 loop {
120 let client = connect(&cluster, &path).await;
121 match client
122 .create(&node_name, node_data.as_bytes(), EPHEMERAL_OPEN)
123 .await
124 {
125 Ok(_) => {}
126 Err(err) => {
127 info!("create node err {:?}", err);
128 tokio::time::sleep(Duration::from_secs(5)).await;
129 continue;
130 }
131 }
132 match client.check_and_watch_stat(&node_name).await {
133 Ok(watch) => {
134 let event = watch.1.changed().await;
135 info!("resource node event {:?}", event);
136 }
137 Err(err) => {
138 info!("resource node err {:?}", err);
139 }
140 };
141 drop(client);
142 }
143 });
144}
145
146fn listener_resource_node_change(
147 cluster: String,
148 root: String,
149 resource: Resource,
150 map: Arc<RwLock<HashMap<String, Vec<SocketInfo>>>>,
151) {
152 let mut path = root;
153 let info = match resource {
154 Resource::Client(info) => {
155 path.push_str(&("/".to_owned() + &info.server_name + "/providers"));
156 info
157 }
158 Resource::Server(_) => return,
159 };
160 tokio::spawn(async move {
161 let mut client = connect(&cluster.clone(), &path).await;
162 let map = map;
163 let info = info;
164 loop {
165 let watcher: (Vec<String>, zk::Stat, OneshotWatcher) =
166 match client.get_and_watch_children("/").await {
167 Ok(watcher) => watcher,
168 Err(_) => {
169 client = connect(&cluster.clone(), &path).await;
170 continue;
171 }
172 };
173 let mut server_list = vec![];
174 for node in watcher.0 {
175 let resource = decode_url(&node);
176 if let Ok(resource) = resource {
177 if let Resource::Server(resource_info) = resource {
178 if &info.version == &resource_info.version {
179 server_list.push(SocketInfo {
180 info : resource_info,
181 sender: Arc::new(RwLock::new(None)),
182 });
183 }
184 }
185 }
186 }
187 let mut key = info.server_name.clone();
188 if let Some(version) = &info.version {
189 key.push_str(":");
190 key.push_str(version);
191 }
192 info!("update server cache {:?} : {:?}", key, server_list);
193 let mut temp_map = map.write().await;
194 temp_map.insert(key, server_list);
195 drop(temp_map);
196 let event: zk::WatchedEvent = watcher.2.changed().await;
197 if let zk::EventType::NodeChildrenChanged = event.event_type {
198 info!("Monitor node changes");
199 } else {
200 client = connect(&cluster.clone(), &path).await;
201 }
202 }
203 });
204}