1use crate::response::parse_service_response;
2use crate::trace::debug;
3use crate::watch::Oneself;
4use crate::{MetaDetector, Watcher};
5use detector::{Meta, MetaKey, Service, ServiceKey, ServiceStatus, Services};
6use std::collections::HashMap;
7use std::time::Duration;
8
9struct Inner {
10 service: Service,
11 client: etcd_client::Client,
12 sender: tokio::sync::watch::Sender<ServiceStatus>,
13 receiver: tokio::sync::watch::Receiver<ServiceStatus>,
14}
15
16pub struct Detector {
17 inner: Inner,
18}
19
20impl Detector {
21 pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
23 service: Service,
24 endpoints: S,
25 ) -> Result<crate::Detector, etcd_client::Error> {
26 let options = Some(
27 etcd_client::ConnectOptions::new()
28 .with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
29 );
30 let client = etcd_client::Client::connect(endpoints, options).await?;
31 Ok(crate::Detector::new(client, service))
32 }
33
34 pub fn new(client: etcd_client::Client, service: Service) -> Self {
35 let (sender, receiver) = tokio::sync::watch::channel(ServiceStatus::Unregistered);
36 Self {
37 inner: Inner {
38 service,
39 client,
40 sender,
41 receiver,
42 },
43 }
44 }
45
46 async fn real_fetch(&mut self) -> Result<etcd_client::GetResponse, etcd_client::Error> {
47 self.inner
48 .client
49 .get(
50 self.inner.service.key.parent_path(),
51 Some(etcd_client::GetOptions::new().with_prefix()),
52 )
53 .await
54 }
55
56 async fn real_fetch_all(&mut self) -> Result<etcd_client::GetResponse, etcd_client::Error> {
57 self.inner
58 .client
59 .get(
60 self.inner.service.key.root_path(),
61 Some(etcd_client::GetOptions::new().with_prefix()),
62 )
63 .await
64 }
65
66 async fn watch_inner(
67 &mut self,
68 resp: etcd_client::GetResponse,
69 key: String,
70 ) -> Result<Watcher<Service, ServiceKey>, etcd_client::Error> {
71 let revision = resp.header().unwrap().revision();
73
74 let watch = self
76 .inner
77 .client
78 .watch(
79 key,
80 Some(
81 etcd_client::WatchOptions::new()
82 .with_prefix()
83 .with_start_revision(revision + 1),
84 ),
85 )
86 .await?;
87
88 let init: Vec<Service> = parse_service_response(resp)
89 .into_values()
90 .map(|s| s.services)
91 .flatten()
92 .collect();
93 Ok(Watcher::new(init, watch.0, watch.1))
94 }
95}
96
97impl Detector {
98 pub fn service(&self) -> &Service {
99 &self.inner.service
100 }
101
102 pub async fn fetch(&mut self) -> Result<Services, etcd_client::Error> {
104 let name = &self.inner.service.key.name.clone();
105 let mut resp = parse_service_response(self.real_fetch().await?);
106 match resp.remove(name) {
107 None => Ok(Services::new()),
108 Some(s) => Ok(s),
109 }
110 }
111
112 pub async fn fetch_all(&mut self) -> Result<HashMap<String, Services>, etcd_client::Error> {
114 Ok(parse_service_response(self.real_fetch_all().await?))
115 }
116
117 pub async fn watch(&mut self) -> Result<Watcher<Service, ServiceKey>, etcd_client::Error> {
119 let resp = self.real_fetch().await?;
120 let parent_path = self.inner.service.key.parent_path();
121 self.watch_inner(resp, parent_path).await
122 }
123
124 pub async fn watch_all(&mut self) -> Result<Watcher<Service, ServiceKey>, etcd_client::Error> {
126 let resp = self.real_fetch_all().await?;
127 let root_path = self.inner.service.key.root_path();
128 self.watch_inner(resp, root_path).await
129 }
130
131 pub fn oneself(&self) -> Oneself {
133 Oneself::new(self.inner.receiver.clone())
134 }
135
136 pub async fn register(&mut self) -> Result<Oneself, etcd_client::Error> {
138 if !self.inner.service.key.has_id() {
139 register_service(&mut self.inner).await?;
140 }
141 Ok(self.oneself())
142 }
143}
144
145async fn register_service(inner: &mut Inner) -> Result<(), etcd_client::Error> {
146 let mut meta_detect = MetaDetector::new(
148 inner.client.clone(),
149 Meta::from_key(MetaKey::new(
150 inner.service.key.name.clone(),
151 inner.service.key.ns.clone(),
152 )),
153 );
154
155 let meta = meta_detect
156 .fetch()
157 .await?
158 .ok_or(etcd_client::Error::InvalidArgs("no meta".to_string()))?;
159 let instances = meta.instances;
160
161 inner.service.meta = Some(meta);
163 inner.service.ttl = Some(
165 inner
166 .service
167 .ttl
168 .map_or(60, |t| if t < 10 { 10 } else { t }),
169 );
170
171 let lease_id = {
172 let mut lease_id = 0;
173 for id in 0..instances {
175 inner.service.key.id = Some(id);
176 let result = do_register(&inner.service, &mut inner.client).await?;
177 if result != 0 {
178 lease_id = result;
179 break;
180 }
181 }
182 lease_id
183 };
184
185 if lease_id == 0 {
186 debug!(
187 "failed to register cause of no instance enough: {:?}",
188 inner.service
189 );
190 return Err(etcd_client::Error::InvalidArgs(
192 "no instance enough".to_string(),
193 ));
194 }
195
196 let _ = inner.sender.send(ServiceStatus::Registered);
198
199 keep_alive(
201 lease_id,
202 inner.service.clone(),
203 inner.client.clone(),
204 inner.sender.clone(),
205 );
206 Ok(())
207}
208
209async fn do_register(
210 service: &Service,
211 client: &mut etcd_client::Client,
212) -> Result<i64, etcd_client::Error> {
213 debug!("try to register with:{:?}", service);
214 let path = service.key.path().unwrap();
215 let lease = client.lease_grant(service.ttl.unwrap(), None).await?;
217
218 let cmp = etcd_client::Compare::version(&*path, etcd_client::CompareOp::Equal, 0);
220
221 let value = serde_json::to_string(&service).expect("Serialize Error");
223
224 let put = etcd_client::TxnOp::put(
226 path,
227 value,
228 Some(etcd_client::PutOptions::new().with_lease(lease.id())),
229 );
230
231 let txn = etcd_client::Txn::new().when([cmp]).and_then([put]);
232
233 let resp = client.txn(txn).await?;
235
236 if resp.succeeded() {
237 debug!("register ok, lease id:{:?}", lease.id());
238 Ok(lease.id())
239 } else {
240 Ok(0)
241 }
242}
243
244fn keep_alive(
245 lease_id: i64,
246 service: Service,
247 mut client: etcd_client::Client,
248 sender: tokio::sync::watch::Sender<ServiceStatus>,
249) {
250 async fn do_keep_alive(
251 lease_id: i64,
252 client: &mut etcd_client::Client,
253 sender: &tokio::sync::watch::Sender<ServiceStatus>,
254 ) {
255 debug!("lease id:{} enter keep_alive", lease_id);
256 loop {
257 if sender.sender_count() == 1 {
259 break;
260 }
261
262 match client.lease_keep_alive(lease_id).await {
263 Ok((mut keeper, mut stream)) => {
264 loop {
266 if sender.sender_count() == 1 {
268 break;
269 }
270 if let Err(_e) = keeper.keep_alive().await {
271 debug!(
272 "lease id:{} send keep_alive request happened error:{:?}",
273 lease_id, _e
274 );
275 break;
276 }
277 match stream.message().await {
278 Err(_e) => {
279 debug!(
280 "lease id:{} recv keep_alive response happened error:{:?}",
281 lease_id, _e
282 );
283 break;
284 }
285 Ok(Some(r)) => {
286 if r.ttl() == 0 {
287 return;
289 }
290 let _ = sender.send(ServiceStatus::Registered);
291 }
292 Ok(None) => {
293 break;
295 }
296 }
297 tokio::time::sleep(Duration::from_secs(5)).await;
299 }
300 }
301 Err(etcd_client::Error::LeaseKeepAliveError(_e)) => {
302 debug!(
304 "lease id:{} get keep_alive stream happened error:{:?}",
305 lease_id, _e
306 );
307 return;
308 }
309 Err(_e) => {
310 debug!(
311 "lease id:{} get keep_alive stream happened error:{:?}",
312 lease_id, _e
313 );
314 tokio::time::sleep(Duration::from_secs(1)).await;
315 }
316 }
317
318 let _ = sender.send(ServiceStatus::Unregistered);
320 }
321 }
322
323 tokio::spawn(async move {
324 do_keep_alive(lease_id, &mut client, &sender).await;
325 let _ = sender.send(ServiceStatus::Unregistered);
326 register_service_again(service, client, sender).await;
327 });
328}
329
330async fn register_service_again(
331 service: Service,
332 mut client: etcd_client::Client,
333 sender: tokio::sync::watch::Sender<ServiceStatus>,
334) {
335 let lease_id = loop {
337 if sender.sender_count() == 1 {
338 let path = service.key.path().unwrap();
340 let _ = client.delete(path, None).await;
341 debug!("unregister ok: {:?}", service);
342 return;
343 }
344
345 match do_register(&service, &mut client).await {
346 Ok(r) if r != 0 => {
347 break r;
348 }
349 _ => {
350 tokio::time::sleep(Duration::from_secs(2)).await;
351 }
352 }
353 };
354 if lease_id != 0 {
355 let _ = sender.send(ServiceStatus::Registered);
357 keep_alive(lease_id, service, client, sender);
358 }
359}