etcd_detector/
detector.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::sync::Arc;
4use std::sync::atomic::Ordering::Relaxed;
5use std::sync::atomic::{AtomicI64, AtomicU32};
6use std::time::Duration;
7
8use detector::{
9    OneselfRx, OneselfTx, Service, ServiceEvent, ServiceStatus, Services, WatchRx, build_oneself,
10    build_watch,
11};
12
13use crate::trace::trace;
14use crate::util;
15
16#[derive(Clone)]
17pub(crate) struct Inner {
18    pub(crate) service: Service,
19    pub(crate) client: etcd_client::Client,
20    pub(crate) status: Arc<AtomicU32>,
21    pub(crate) oneself_tx: OneselfTx,
22    pub(crate) lease_id: Arc<AtomicI64>,
23}
24
25impl Inner {
26    pub(crate) fn change_status(&self, status: ServiceStatus) {
27        trace!("{:?} status change:{:?}", self.service, status);
28        self.status.store(status as u32, Relaxed);
29        let _ = self.oneself_tx.send(status);
30    }
31
32    pub(crate) fn get_lease_id(&self) -> i64 {
33        self.lease_id.load(Relaxed)
34    }
35
36    pub(crate) fn set_lease_id(&self, id: i64) {
37        self.lease_id.store(id, Relaxed)
38    }
39}
40
41pub struct Detector {
42    inner: Option<Inner>,
43}
44
45impl Detector {
46    /// Connect to etcd servers from given endpoints.
47    pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
48        service: Service,
49        endpoints: S,
50    ) -> Result<Detector, etcd_client::Error> {
51        let options = Some(
52            etcd_client::ConnectOptions::new()
53                .with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
54        );
55        let client = etcd_client::Client::connect(endpoints, options).await?;
56        Ok(Detector::new(client, service))
57    }
58
59    pub fn new(client: etcd_client::Client, service: Service) -> Self {
60        let (tx, _) = build_oneself(ServiceStatus::Unregistered);
61        Detector {
62            inner: Some(Inner {
63                service,
64                client,
65                status: Arc::new(AtomicU32::new(ServiceStatus::Unregistered as u32)),
66                oneself_tx: tx,
67                lease_id: Arc::new(AtomicI64::new(0)),
68            }),
69        }
70    }
71
72    fn inner_mut(&mut self) -> &mut Inner {
73        self.inner.as_mut().unwrap()
74    }
75
76    fn inner(&self) -> &Inner {
77        self.inner.as_ref().unwrap()
78    }
79}
80
81impl detector::Detector for Detector {
82    fn service(&self) -> &Service {
83        &self.inner().service
84    }
85
86    fn status(&self) -> ServiceStatus {
87        ServiceStatus::from(self.inner().status.load(Relaxed))
88    }
89
90    /// 第一次注册如果失败则返回,如果成功也返回但内部会有掉线重试机制
91    async fn register(&mut self) -> Result<(), Box<dyn Error>> {
92        let inner = self.inner_mut();
93
94        // 已经执行过注册了
95        if inner.service.key.has_id() {
96            return Ok(());
97        }
98
99        match util::register(inner.clone()).await {
100            Err(e) => Err(e),
101            Ok(id) => {
102                // 换上id
103                inner.service.key.id = Some(id);
104                Ok(())
105            }
106        }
107    }
108
109    /// 获取本类型服所有实例
110    async fn fetch(&mut self) -> Result<Services, Box<dyn Error>> {
111        let inner = self.inner_mut();
112        let resp = inner
113            .client
114            .get(
115                inner.service.key.parent_path(),
116                Some(etcd_client::GetOptions::new().with_prefix()),
117            )
118            .await?;
119        let mut resp = util::parse_service_response(resp);
120        match resp.remove(&inner.service.key.name) {
121            None => Ok(Services::new()),
122            Some(s) => Ok(s),
123        }
124    }
125
126    /// 获取所有注册的服务实例
127    async fn fetch_all(&mut self) -> Result<HashMap<String, Services>, Box<dyn Error>> {
128        let inner = self.inner_mut();
129        let resp = inner
130            .client
131            .get(
132                inner.service.key.root_path(),
133                Some(etcd_client::GetOptions::new().with_prefix()),
134            )
135            .await?;
136        Ok(util::parse_service_response(resp))
137    }
138
139    /// 监听本类型服务实例
140    async fn watch(&mut self) -> Result<WatchRx<ServiceEvent>, Box<dyn Error>> {
141        let inner = self.inner_mut();
142        let watch = inner
143            .client
144            .watch(
145                inner.service.key.parent_path(),
146                Some(etcd_client::WatchOptions::new().with_prefix()),
147            )
148            .await?;
149
150        let (tx, rx) = build_watch();
151
152        util::watch_service(watch, tx);
153        Ok(rx)
154    }
155
156    /// 监听所有注册的服务实例
157    async fn watch_all(&mut self) -> Result<WatchRx<ServiceEvent>, Box<dyn Error>> {
158        let inner = self.inner_mut();
159        let watch = inner
160            .client
161            .watch(
162                inner.service.key.root_path(),
163                Some(etcd_client::WatchOptions::new().with_prefix()),
164            )
165            .await?;
166
167        let (tx, rx) = build_watch();
168
169        util::watch_service(watch, tx);
170        Ok(rx)
171    }
172
173    /// 监听服务自己本身状态, 需要在第一次调用register成功后才能使用它
174    fn oneself(&mut self) -> Option<OneselfRx> {
175        let inner = self.inner();
176        if inner.service.key.has_id() {
177            Some(inner.oneself_tx.oneself_rx())
178        } else {
179            None
180        }
181    }
182}
183
184impl Drop for Detector {
185    fn drop(&mut self) {
186        let inner = self.inner.take().unwrap();
187
188        // 取消掉旧租约, 让注册的服务过段时间自己消亡掉
189        let lease_id = inner.get_lease_id();
190        if lease_id != 0 {
191            let mut client = inner.client.clone();
192            // 先释放掉inner再执行租约取消
193            drop(inner);
194            tokio::spawn(async move {
195                let _ = client.lease_revoke(lease_id).await;
196                trace!("lease revoke, lease id:{}", lease_id);
197            });
198        }
199    }
200}