etcd_detector/
detector.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::sync::Arc;
4use std::sync::atomic::Ordering::Relaxed;
5use std::sync::atomic::{AtomicI64, AtomicU32};
6use std::time::Duration;
7
8use crate::trace::debug;
9use crate::util;
10use detector::{
11    OneselfRx, OneselfTx, Service, ServiceEvent, ServiceStatus, Services, WatchRx, build_oneself,
12    build_watch,
13};
14
15#[derive(Clone)]
16pub(crate) struct Inner {
17    pub(crate) service: Service,
18    pub(crate) client: etcd_client::Client,
19    pub(crate) status: Arc<AtomicU32>,
20    pub(crate) oneself_tx: OneselfTx,
21    pub(crate) lease_id: Arc<AtomicI64>,
22}
23
24impl Inner {
25    pub(crate) fn change_status(&self, status: ServiceStatus) {
26        debug!("{:?} status change:{:?}", self.service, status);
27        self.status.store(status as u32, Relaxed);
28        let _ = self.oneself_tx.send(status);
29    }
30
31    pub(crate) fn get_lease_id(&self) -> i64 {
32        self.lease_id.load(Relaxed)
33    }
34
35    pub(crate) fn set_lease_id(&self, id: i64) {
36        self.lease_id.store(id, Relaxed)
37    }
38}
39
40pub struct Detector {
41    inner: Option<Inner>,
42}
43
44impl Detector {
45    /// Connect to etcd servers from given endpoints.
46    pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
47        service: Service,
48        endpoints: S,
49    ) -> Result<Detector, etcd_client::Error> {
50        let options = Some(
51            etcd_client::ConnectOptions::new()
52                .with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
53        );
54        let client = etcd_client::Client::connect(endpoints, options).await?;
55        Ok(Detector::new(client, service))
56    }
57
58    pub fn new(client: etcd_client::Client, service: Service) -> Self {
59        let (tx, _) = build_oneself(ServiceStatus::Unregistered);
60        Detector {
61            inner: Some(Inner {
62                service,
63                client,
64                status: Arc::new(AtomicU32::new(ServiceStatus::Unregistered as u32)),
65                oneself_tx: tx,
66                lease_id: Arc::new(AtomicI64::new(0)),
67            }),
68        }
69    }
70
71    fn inner_mut(&mut self) -> &mut Inner {
72        self.inner.as_mut().unwrap()
73    }
74
75    fn inner(&self) -> &Inner {
76        self.inner.as_ref().unwrap()
77    }
78
79    async fn real_fetch(
80        &mut self,
81    ) -> Result<etcd_client::GetResponse, Box<dyn Error + 'static + Send>> {
82        let inner = self.inner_mut();
83        let resp = inner
84            .client
85            .get(
86                inner.service.key.parent_path(),
87                Some(etcd_client::GetOptions::new().with_prefix()),
88            )
89            .await
90            .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>);
91        resp
92    }
93
94    async fn real_fetch_all(
95        &mut self,
96    ) -> Result<etcd_client::GetResponse, Box<dyn Error + 'static + Send>> {
97        let inner = self.inner_mut();
98        let resp = inner
99            .client
100            .get(
101                inner.service.key.root_path(),
102                Some(etcd_client::GetOptions::new().with_prefix()),
103            )
104            .await
105            .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>);
106        resp
107    }
108
109    async fn watch_inner(
110        &mut self,
111        resp: etcd_client::GetResponse,
112        key: String,
113    ) -> Result<WatchRx<ServiceEvent>, Box<dyn Error + 'static + Send>> {
114        // 2. 获取 revision
115        let revision = resp.header().unwrap().revision();
116
117        // 3. 创建 watch
118        let inner = self.inner_mut();
119        let watch = inner
120            .client
121            .watch(
122                key,
123                Some(
124                    etcd_client::WatchOptions::new()
125                        .with_prefix()
126                        .with_start_revision(revision + 1),
127                ),
128            )
129            .await
130            .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)?;
131
132        // 4. 建通道
133        let (tx, rx) = build_watch();
134
135        // 5. 把当前数据推送给 rx
136        for m in util::parse_service_response(resp) {
137            for s in m.1.services {
138                let _ = tx.send(ServiceEvent::Changed(s));
139            }
140        }
141
142        // 6. 异步监听 watch
143        util::watch_service(watch, tx);
144
145        Ok(rx)
146    }
147}
148
149impl detector::Detector for Detector {
150    fn service(&self) -> &Service {
151        &self.inner().service
152    }
153
154    fn status(&self) -> ServiceStatus {
155        ServiceStatus::from(self.inner().status.load(Relaxed))
156    }
157
158    /// 第一次注册如果失败则返回,如果成功也返回但内部会有掉线重试机制
159    async fn register(&mut self) -> Result<(), Box<dyn Error + 'static + Send>> {
160        let inner = self.inner_mut();
161
162        // 已经执行过注册了
163        if inner.service.key.has_id() {
164            return Ok(());
165        }
166
167        match util::register(inner.clone()).await {
168            Err(e) => Err(e),
169            Ok(id) => {
170                // 换上id
171                inner.service.key.id = Some(id);
172                Ok(())
173            }
174        }
175    }
176
177    /// 获取本类型服所有实例
178    async fn fetch(&mut self) -> Result<Services, Box<dyn Error + 'static + Send>> {
179        let name = &self.inner().service.key.name.clone();
180        let mut resp = util::parse_service_response(self.real_fetch().await?);
181        match resp.remove(name) {
182            None => Ok(Services::new()),
183            Some(s) => Ok(s),
184        }
185    }
186
187    /// 获取所有注册的服务实例
188    async fn fetch_all(
189        &mut self,
190    ) -> Result<HashMap<String, Services>, Box<dyn Error + 'static + Send>> {
191        Ok(util::parse_service_response(self.real_fetch_all().await?))
192    }
193
194    /// 监听本类型服务实例
195    async fn watch(&mut self) -> Result<WatchRx<ServiceEvent>, Box<dyn Error + 'static + Send>> {
196        let resp = self.real_fetch().await?;
197        let parent_path = self.inner().service.key.parent_path();
198        self.watch_inner(resp, parent_path).await
199    }
200
201    /// 监听所有注册的服务实例
202    async fn watch_all(
203        &mut self,
204    ) -> Result<WatchRx<ServiceEvent>, Box<dyn Error + 'static + Send>> {
205        let resp = self.real_fetch_all().await?;
206        let root_path = self.inner().service.key.root_path();
207        self.watch_inner(resp, root_path).await
208    }
209
210    /// 监听服务自己本身状态, 需要在第一次调用register成功后才能使用它
211    fn oneself(&mut self) -> Option<OneselfRx> {
212        let inner = self.inner();
213        if inner.service.key.has_id() {
214            Some(inner.oneself_tx.oneself_rx())
215        } else {
216            None
217        }
218    }
219}
220
221impl Drop for Detector {
222    fn drop(&mut self) {
223        let inner = self.inner.take().unwrap();
224
225        // 取消掉旧租约, 让注册的服务过段时间自己消亡掉
226        let lease_id = inner.get_lease_id();
227        if lease_id != 0 {
228            let mut client = inner.client.clone();
229            // 先释放掉inner再执行租约取消
230            drop(inner);
231            tokio::spawn(async move {
232                let _ = client.lease_revoke(lease_id).await;
233                debug!("lease revoke, lease id:{}", lease_id);
234            });
235        }
236    }
237}