etcd_detector/
detector.rs1use std::collections::HashMap;
2use std::error::Error;
3use std::sync::Arc;
4use std::sync::atomic::Ordering::Relaxed;
5use std::sync::atomic::{AtomicI64, AtomicU32};
6use std::time::Duration;
7
8use detector::{
9 OneselfRx, OneselfTx, Service, ServiceEvent, ServiceStatus, Services, WatchRx, build_oneself,
10 build_watch,
11};
12
13use crate::trace::trace;
14use crate::util;
15
16#[derive(Clone)]
17pub(crate) struct Inner {
18 pub(crate) service: Service,
19 pub(crate) client: etcd_client::Client,
20 pub(crate) status: Arc<AtomicU32>,
21 pub(crate) oneself_tx: OneselfTx,
22 pub(crate) lease_id: Arc<AtomicI64>,
23}
24
25impl Inner {
26 pub(crate) fn change_status(&self, status: ServiceStatus) {
27 trace!("{:?} status change:{:?}", self.service, status);
28 self.status.store(status as u32, Relaxed);
29 let _ = self.oneself_tx.send(status);
30 }
31
32 pub(crate) fn get_lease_id(&self) -> i64 {
33 self.lease_id.load(Relaxed)
34 }
35
36 pub(crate) fn set_lease_id(&self, id: i64) {
37 self.lease_id.store(id, Relaxed)
38 }
39}
40
41pub struct Detector {
42 inner: Option<Inner>,
43}
44
45impl Detector {
46 pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
48 service: Service,
49 endpoints: S,
50 ) -> Result<Detector, etcd_client::Error> {
51 let options = Some(
52 etcd_client::ConnectOptions::new()
53 .with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
54 );
55 let client = etcd_client::Client::connect(endpoints, options).await?;
56 Ok(Detector::new(client, service))
57 }
58
59 pub fn new(client: etcd_client::Client, service: Service) -> Self {
60 let (tx, _) = build_oneself(ServiceStatus::Unregistered);
61 Detector {
62 inner: Some(Inner {
63 service,
64 client,
65 status: Arc::new(AtomicU32::new(ServiceStatus::Unregistered as u32)),
66 oneself_tx: tx,
67 lease_id: Arc::new(AtomicI64::new(0)),
68 }),
69 }
70 }
71
72 fn inner_mut(&mut self) -> &mut Inner {
73 self.inner.as_mut().unwrap()
74 }
75
76 fn inner(&self) -> &Inner {
77 self.inner.as_ref().unwrap()
78 }
79}
80
81impl detector::Detector for Detector {
82 fn service(&self) -> &Service {
83 &self.inner().service
84 }
85
86 fn status(&self) -> ServiceStatus {
87 ServiceStatus::from(self.inner().status.load(Relaxed))
88 }
89
90 async fn register(&mut self) -> Result<(), Box<dyn Error>> {
92 let inner = self.inner_mut();
93
94 if inner.service.key.has_id() {
96 return Ok(());
97 }
98
99 match util::register(inner.clone()).await {
100 Err(e) => Err(e),
101 Ok(id) => {
102 inner.service.key.id = Some(id);
104 Ok(())
105 }
106 }
107 }
108
109 async fn fetch(&mut self) -> Result<Services, Box<dyn Error>> {
111 let inner = self.inner_mut();
112 let resp = inner
113 .client
114 .get(
115 inner.service.key.parent_path(),
116 Some(etcd_client::GetOptions::new().with_prefix()),
117 )
118 .await?;
119 let mut resp = util::parse_service_response(resp);
120 match resp.remove(&inner.service.key.name) {
121 None => Ok(Services::new()),
122 Some(s) => Ok(s),
123 }
124 }
125
126 async fn fetch_all(&mut self) -> Result<HashMap<String, Services>, Box<dyn Error>> {
128 let inner = self.inner_mut();
129 let resp = inner
130 .client
131 .get(
132 inner.service.key.root_path(),
133 Some(etcd_client::GetOptions::new().with_prefix()),
134 )
135 .await?;
136 Ok(util::parse_service_response(resp))
137 }
138
139 async fn watch(&mut self) -> Result<WatchRx<ServiceEvent>, Box<dyn Error>> {
141 let inner = self.inner_mut();
142 let watch = inner
143 .client
144 .watch(
145 inner.service.key.parent_path(),
146 Some(etcd_client::WatchOptions::new().with_prefix()),
147 )
148 .await?;
149
150 let (tx, rx) = build_watch();
151
152 util::watch_service(watch, tx);
153 Ok(rx)
154 }
155
156 async fn watch_all(&mut self) -> Result<WatchRx<ServiceEvent>, Box<dyn Error>> {
158 let inner = self.inner_mut();
159 let watch = inner
160 .client
161 .watch(
162 inner.service.key.root_path(),
163 Some(etcd_client::WatchOptions::new().with_prefix()),
164 )
165 .await?;
166
167 let (tx, rx) = build_watch();
168
169 util::watch_service(watch, tx);
170 Ok(rx)
171 }
172
173 fn oneself(&mut self) -> Option<OneselfRx> {
175 let inner = self.inner();
176 if inner.service.key.has_id() {
177 Some(inner.oneself_tx.oneself_rx())
178 } else {
179 None
180 }
181 }
182}
183
184impl Drop for Detector {
185 fn drop(&mut self) {
186 let inner = self.inner.take().unwrap();
187
188 let lease_id = inner.get_lease_id();
190 if lease_id != 0 {
191 let mut client = inner.client.clone();
192 drop(inner);
194 tokio::spawn(async move {
195 let _ = client.lease_revoke(lease_id).await;
196 trace!("lease revoke, lease id:{}", lease_id);
197 });
198 }
199 }
200}