etcd_detector/
detector.rs1use std::collections::HashMap;
2use std::error::Error;
3use std::sync::Arc;
4use std::sync::atomic::Ordering::Relaxed;
5use std::sync::atomic::{AtomicI64, AtomicU32};
6use std::time::Duration;
7
8use crate::trace::debug;
9use crate::util;
10use detector::{
11 OneselfRx, OneselfTx, Service, ServiceEvent, ServiceStatus, Services, WatchRx, build_oneself,
12 build_watch,
13};
14
15#[derive(Clone)]
16pub(crate) struct Inner {
17 pub(crate) service: Service,
18 pub(crate) client: etcd_client::Client,
19 pub(crate) status: Arc<AtomicU32>,
20 pub(crate) oneself_tx: OneselfTx,
21 pub(crate) lease_id: Arc<AtomicI64>,
22}
23
24impl Inner {
25 pub(crate) fn change_status(&self, status: ServiceStatus) {
26 debug!("{:?} status change:{:?}", self.service, status);
27 self.status.store(status as u32, Relaxed);
28 let _ = self.oneself_tx.send(status);
29 }
30
31 pub(crate) fn get_lease_id(&self) -> i64 {
32 self.lease_id.load(Relaxed)
33 }
34
35 pub(crate) fn set_lease_id(&self, id: i64) {
36 self.lease_id.store(id, Relaxed)
37 }
38}
39
40pub struct Detector {
41 inner: Option<Inner>,
42}
43
44impl Detector {
45 pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
47 service: Service,
48 endpoints: S,
49 ) -> Result<Detector, etcd_client::Error> {
50 let options = Some(
51 etcd_client::ConnectOptions::new()
52 .with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
53 );
54 let client = etcd_client::Client::connect(endpoints, options).await?;
55 Ok(Detector::new(client, service))
56 }
57
58 pub fn new(client: etcd_client::Client, service: Service) -> Self {
59 let (tx, _) = build_oneself(ServiceStatus::Unregistered);
60 Detector {
61 inner: Some(Inner {
62 service,
63 client,
64 status: Arc::new(AtomicU32::new(ServiceStatus::Unregistered as u32)),
65 oneself_tx: tx,
66 lease_id: Arc::new(AtomicI64::new(0)),
67 }),
68 }
69 }
70
71 fn inner_mut(&mut self) -> &mut Inner {
72 self.inner.as_mut().unwrap()
73 }
74
75 fn inner(&self) -> &Inner {
76 self.inner.as_ref().unwrap()
77 }
78
79 async fn real_fetch(
80 &mut self,
81 ) -> Result<etcd_client::GetResponse, Box<dyn Error + 'static + Send>> {
82 let inner = self.inner_mut();
83 let resp = inner
84 .client
85 .get(
86 inner.service.key.parent_path(),
87 Some(etcd_client::GetOptions::new().with_prefix()),
88 )
89 .await
90 .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>);
91 resp
92 }
93
94 async fn real_fetch_all(
95 &mut self,
96 ) -> Result<etcd_client::GetResponse, Box<dyn Error + 'static + Send>> {
97 let inner = self.inner_mut();
98 let resp = inner
99 .client
100 .get(
101 inner.service.key.root_path(),
102 Some(etcd_client::GetOptions::new().with_prefix()),
103 )
104 .await
105 .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>);
106 resp
107 }
108
109 async fn watch_inner(
110 &mut self,
111 resp: etcd_client::GetResponse,
112 key: String,
113 ) -> Result<WatchRx<ServiceEvent>, Box<dyn Error + 'static + Send>> {
114 let revision = resp.header().unwrap().revision();
116
117 let inner = self.inner_mut();
119 let watch = inner
120 .client
121 .watch(
122 key,
123 Some(
124 etcd_client::WatchOptions::new()
125 .with_prefix()
126 .with_start_revision(revision + 1),
127 ),
128 )
129 .await
130 .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)?;
131
132 let (tx, rx) = build_watch();
134
135 for m in util::parse_service_response(resp) {
137 for s in m.1.services {
138 let _ = tx.send(ServiceEvent::Changed(s));
139 }
140 }
141
142 util::watch_service(watch, tx);
144
145 Ok(rx)
146 }
147}
148
149impl detector::Detector for Detector {
150 fn service(&self) -> &Service {
151 &self.inner().service
152 }
153
154 fn status(&self) -> ServiceStatus {
155 ServiceStatus::from(self.inner().status.load(Relaxed))
156 }
157
158 async fn register(&mut self) -> Result<(), Box<dyn Error + 'static + Send>> {
160 let inner = self.inner_mut();
161
162 if inner.service.key.has_id() {
164 return Ok(());
165 }
166
167 match util::register(inner.clone()).await {
168 Err(e) => Err(e),
169 Ok(id) => {
170 inner.service.key.id = Some(id);
172 Ok(())
173 }
174 }
175 }
176
177 async fn fetch(&mut self) -> Result<Services, Box<dyn Error + 'static + Send>> {
179 let name = &self.inner().service.key.name.clone();
180 let mut resp = util::parse_service_response(self.real_fetch().await?);
181 match resp.remove(name) {
182 None => Ok(Services::new()),
183 Some(s) => Ok(s),
184 }
185 }
186
187 async fn fetch_all(
189 &mut self,
190 ) -> Result<HashMap<String, Services>, Box<dyn Error + 'static + Send>> {
191 Ok(util::parse_service_response(self.real_fetch_all().await?))
192 }
193
194 async fn watch(&mut self) -> Result<WatchRx<ServiceEvent>, Box<dyn Error + 'static + Send>> {
196 let resp = self.real_fetch().await?;
197 let parent_path = self.inner().service.key.parent_path();
198 self.watch_inner(resp, parent_path).await
199 }
200
201 async fn watch_all(
203 &mut self,
204 ) -> Result<WatchRx<ServiceEvent>, Box<dyn Error + 'static + Send>> {
205 let resp = self.real_fetch_all().await?;
206 let root_path = self.inner().service.key.root_path();
207 self.watch_inner(resp, root_path).await
208 }
209
210 fn oneself(&mut self) -> Option<OneselfRx> {
212 let inner = self.inner();
213 if inner.service.key.has_id() {
214 Some(inner.oneself_tx.oneself_rx())
215 } else {
216 None
217 }
218 }
219}
220
221impl Drop for Detector {
222 fn drop(&mut self) {
223 let inner = self.inner.take().unwrap();
224
225 let lease_id = inner.get_lease_id();
227 if lease_id != 0 {
228 let mut client = inner.client.clone();
229 drop(inner);
231 tokio::spawn(async move {
232 let _ = client.lease_revoke(lease_id).await;
233 debug!("lease revoke, lease id:{}", lease_id);
234 });
235 }
236 }
237}