etcd_detector/
detect.rs

1use crate::response::parse_service_response;
2use crate::trace::debug;
3use crate::watch::Oneself;
4use crate::{MetaDetector, Watcher};
5use detector::{Meta, MetaKey, Service, ServiceKey, ServiceStatus, Services};
6use std::collections::HashMap;
7use std::time::Duration;
8
/// State owned by a [`Detector`]: the service description, the etcd client and
/// both halves of the status channel.
struct Inner {
    /// Service this detector represents. `register_service` fills in its
    /// `key.id`, `meta` and `ttl` while registering.
    service: Service,
    /// etcd client used for every request; cloned for the background
    /// keep-alive task.
    client: etcd_client::Client,
    /// Sending half of the status channel. The keep-alive task holds a clone
    /// and stops once `sender_count()` drops to one.
    sender: tokio::sync::watch::Sender<ServiceStatus>,
    /// Receiving half of the status channel, cloned by `Detector::oneself`
    /// for every `Oneself` handle it returns.
    receiver: tokio::sync::watch::Receiver<ServiceStatus>,
}
15
/// etcd-backed service detector: fetches and watches registered service
/// instances and registers its own service under a lease.
pub struct Detector {
    /// Client, service description and status channel behind this detector.
    inner: Inner,
}
19
20impl Detector {
21    /// Connect to etcd servers from given endpoints.
22    pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
23        service: Service,
24        endpoints: S,
25    ) -> Result<crate::Detector, etcd_client::Error> {
26        let options = Some(
27            etcd_client::ConnectOptions::new()
28                .with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
29        );
30        let client = etcd_client::Client::connect(endpoints, options).await?;
31        Ok(crate::Detector::new(client, service))
32    }
33
34    pub fn new(client: etcd_client::Client, service: Service) -> Self {
35        let (sender, receiver) = tokio::sync::watch::channel(ServiceStatus::Unregistered);
36        Self {
37            inner: Inner {
38                service,
39                client,
40                sender,
41                receiver,
42            },
43        }
44    }
45
46    async fn real_fetch(&mut self) -> Result<etcd_client::GetResponse, etcd_client::Error> {
47        self.inner
48            .client
49            .get(
50                self.inner.service.key.parent_path(),
51                Some(etcd_client::GetOptions::new().with_prefix()),
52            )
53            .await
54    }
55
56    async fn real_fetch_all(&mut self) -> Result<etcd_client::GetResponse, etcd_client::Error> {
57        self.inner
58            .client
59            .get(
60                self.inner.service.key.root_path(),
61                Some(etcd_client::GetOptions::new().with_prefix()),
62            )
63            .await
64    }
65
66    async fn watch_inner(
67        &mut self,
68        resp: etcd_client::GetResponse,
69        key: String,
70    ) -> Result<Watcher<Service, ServiceKey>, etcd_client::Error> {
71        // 2. 获取 revision
72        let revision = resp.header().unwrap().revision();
73
74        // 3. 创建 watch
75        let watch = self
76            .inner
77            .client
78            .watch(
79                key,
80                Some(
81                    etcd_client::WatchOptions::new()
82                        .with_prefix()
83                        .with_start_revision(revision + 1),
84                ),
85            )
86            .await?;
87
88        let init: Vec<Service> = parse_service_response(resp)
89            .into_values()
90            .map(|s| s.services)
91            .flatten()
92            .collect();
93        Ok(Watcher::new(init, watch.0, watch.1))
94    }
95}
96
97impl Detector {
98    pub fn service(&self) -> &Service {
99        &self.inner.service
100    }
101
102    /// 获取本类型服所有实例
103    pub async fn fetch(&mut self) -> Result<Services, etcd_client::Error> {
104        let name = &self.inner.service.key.name.clone();
105        let mut resp = parse_service_response(self.real_fetch().await?);
106        match resp.remove(name) {
107            None => Ok(Services::new()),
108            Some(s) => Ok(s),
109        }
110    }
111
112    /// 获取所有注册的服务实例
113    pub async fn fetch_all(&mut self) -> Result<HashMap<String, Services>, etcd_client::Error> {
114        Ok(parse_service_response(self.real_fetch_all().await?))
115    }
116
117    /// 监听本类型服务实例
118    pub async fn watch(&mut self) -> Result<Watcher<Service, ServiceKey>, etcd_client::Error> {
119        let resp = self.real_fetch().await?;
120        let parent_path = self.inner.service.key.parent_path();
121        self.watch_inner(resp, parent_path).await
122    }
123
124    /// 监听所有注册的服务实例
125    pub async fn watch_all(&mut self) -> Result<Watcher<Service, ServiceKey>, etcd_client::Error> {
126        let resp = self.real_fetch_all().await?;
127        let root_path = self.inner.service.key.root_path();
128        self.watch_inner(resp, root_path).await
129    }
130
131    /// 监听服务自己本身状态
132    pub fn oneself(&self) -> Oneself {
133        Oneself::new(self.inner.receiver.clone())
134    }
135
136    /// 第一次注册如果失败则返回,如果成功也返回但内部会有掉线重试机制
137    pub async fn register(&mut self) -> Result<Oneself, etcd_client::Error> {
138        if !self.inner.service.key.has_id() {
139            register_service(&mut self.inner).await?;
140        }
141        Ok(self.oneself())
142    }
143}
144
145async fn register_service(inner: &mut Inner) -> Result<(), etcd_client::Error> {
146    // 先获取元数据
147    let mut meta_detect = MetaDetector::new(
148        inner.client.clone(),
149        Meta::from_key(MetaKey::new(
150            inner.service.key.name.clone(),
151            inner.service.key.ns.clone(),
152        )),
153    );
154
155    let meta = meta_detect
156        .fetch()
157        .await?
158        .ok_or(etcd_client::Error::InvalidArgs("no meta".to_string()))?;
159    let instances = meta.instances;
160
161    // 填充meta
162    inner.service.meta = Some(meta);
163    // 如果没设置ttl则给个默认的
164    inner.service.ttl = Some(
165        inner
166            .service
167            .ttl
168            .map_or(60, |t| if t < 10 { 10 } else { t }),
169    );
170
171    let lease_id = {
172        let mut lease_id = 0;
173        // 挨个尝试
174        for id in 0..instances {
175            inner.service.key.id = Some(id);
176            let result = do_register(&inner.service, &mut inner.client).await?;
177            if result != 0 {
178                lease_id = result;
179                break;
180            }
181        }
182        lease_id
183    };
184
185    if lease_id == 0 {
186        debug!(
187            "failed to register cause of no instance enough: {:?}",
188            inner.service
189        );
190        // 实例数不够
191        return Err(etcd_client::Error::InvalidArgs(
192            "no instance enough".to_string(),
193        ));
194    }
195
196    // 状态修改
197    let _ = inner.sender.send(ServiceStatus::Registered);
198
199    // 维系租约
200    keep_alive(
201        lease_id,
202        inner.service.clone(),
203        inner.client.clone(),
204        inner.sender.clone(),
205    );
206    Ok(())
207}
208
209async fn do_register(
210    service: &Service,
211    client: &mut etcd_client::Client,
212) -> Result<i64, etcd_client::Error> {
213    debug!("try to register with:{:?}", service);
214    let path = service.key.path().unwrap();
215    // 获取租约
216    let lease = client.lease_grant(service.ttl.unwrap(), None).await?;
217
218    // key不存在时版本是0
219    let cmp = etcd_client::Compare::version(&*path, etcd_client::CompareOp::Equal, 0);
220
221    // 序列化数据
222    let value = serde_json::to_string(&service).expect("Serialize Error");
223
224    // 定义事务
225    let put = etcd_client::TxnOp::put(
226        path,
227        value,
228        Some(etcd_client::PutOptions::new().with_lease(lease.id())),
229    );
230
231    let txn = etcd_client::Txn::new().when([cmp]).and_then([put]);
232
233    // 执行事务
234    let resp = client.txn(txn).await?;
235
236    if resp.succeeded() {
237        debug!("register ok, lease id:{:?}", lease.id());
238        Ok(lease.id())
239    } else {
240        Ok(0)
241    }
242}
243
244fn keep_alive(
245    lease_id: i64,
246    service: Service,
247    mut client: etcd_client::Client,
248    sender: tokio::sync::watch::Sender<ServiceStatus>,
249) {
250    async fn do_keep_alive(
251        lease_id: i64,
252        client: &mut etcd_client::Client,
253        sender: &tokio::sync::watch::Sender<ServiceStatus>,
254    ) {
255        debug!("lease id:{} enter keep_alive", lease_id);
256        loop {
257            // 退出
258            if sender.sender_count() == 1 {
259                break;
260            }
261
262            match client.lease_keep_alive(lease_id).await {
263                Ok((mut keeper, mut stream)) => {
264                    // 持续发送消息维持租约
265                    loop {
266                        // 退出
267                        if sender.sender_count() == 1 {
268                            break;
269                        }
270                        if let Err(_e) = keeper.keep_alive().await {
271                            debug!(
272                                "lease id:{} send keep_alive request happened error:{:?}",
273                                lease_id, _e
274                            );
275                            break;
276                        }
277                        match stream.message().await {
278                            Err(_e) => {
279                                debug!(
280                                    "lease id:{} recv keep_alive response happened error:{:?}",
281                                    lease_id, _e
282                                );
283                                break;
284                            }
285                            Ok(Some(r)) => {
286                                if r.ttl() == 0 {
287                                    // 租约到期了, 要重新注册
288                                    return;
289                                }
290                                let _ = sender.send(ServiceStatus::Registered);
291                            }
292                            Ok(None) => {
293                                // 流断了
294                                break;
295                            }
296                        }
297                        // 稍做停顿
298                        tokio::time::sleep(Duration::from_secs(5)).await;
299                    }
300                }
301                Err(etcd_client::Error::LeaseKeepAliveError(_e)) => {
302                    // 租约不存在了
303                    debug!(
304                        "lease id:{} get keep_alive stream happened error:{:?}",
305                        lease_id, _e
306                    );
307                    return;
308                }
309                Err(_e) => {
310                    debug!(
311                        "lease id:{} get keep_alive stream happened error:{:?}",
312                        lease_id, _e
313                    );
314                    tokio::time::sleep(Duration::from_secs(1)).await;
315                }
316            }
317
318            // 设置为未注册
319            let _ = sender.send(ServiceStatus::Unregistered);
320        }
321    }
322
323    tokio::spawn(async move {
324        do_keep_alive(lease_id, &mut client, &sender).await;
325        let _ = sender.send(ServiceStatus::Unregistered);
326        register_service_again(service, client, sender).await;
327    });
328}
329
330async fn register_service_again(
331    service: Service,
332    mut client: etcd_client::Client,
333    sender: tokio::sync::watch::Sender<ServiceStatus>,
334) {
335    // 一直重试到成功
336    let lease_id = loop {
337        if sender.sender_count() == 1 {
338            // 直接卸载掉
339            let path = service.key.path().unwrap();
340            let _ = client.delete(path, None).await;
341            debug!("unregister ok: {:?}", service);
342            return;
343        }
344
345        match do_register(&service, &mut client).await {
346            Ok(r) if r != 0 => {
347                break r;
348            }
349            _ => {
350                tokio::time::sleep(Duration::from_secs(2)).await;
351            }
352        }
353    };
354    if lease_id != 0 {
355        // 状态修改
356        let _ = sender.send(ServiceStatus::Registered);
357        keep_alive(lease_id, service, client, sender);
358    }
359}