etcd_detector/
meta_detect.rs1use crate::response::parse_meta_response;
2use crate::watch::Watcher;
3use detector::{Meta, MetaKey};
4use std::time::Duration;
5
6pub struct MetaDetector {
7 meta: Meta,
8 client: etcd_client::Client,
9}
10
11impl MetaDetector {
12 async fn real_fetch(&mut self) -> Result<etcd_client::GetResponse, etcd_client::Error> {
13 self.client.get(self.meta.key.path(), None).await
14 }
15
16 async fn real_fetch_all(&mut self) -> Result<etcd_client::GetResponse, etcd_client::Error> {
17 self.client
18 .get(
19 self.meta.key.root_path(),
20 Some(etcd_client::GetOptions::new().with_prefix()),
21 )
22 .await
23 }
24
25 async fn watch_inner<G>(
26 &mut self,
27 resp: etcd_client::GetResponse,
28 key: String,
29 watch_opts_fn: G,
30 ) -> Result<Watcher<Meta, MetaKey>, etcd_client::Error>
31 where
32 G: FnOnce(i64) -> etcd_client::WatchOptions,
33 {
34 let revision = resp.header().unwrap().revision();
36
37 let watch = self
39 .client
40 .watch(key, Some(watch_opts_fn(revision + 1)))
41 .await?;
42
43 Ok(Watcher::new(parse_meta_response(resp), watch.0, watch.1))
44 }
45}
46
47impl MetaDetector {
48 pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
49 meta: Meta,
50 endpoints: S,
51 ) -> Result<Self, etcd_client::Error> {
52 let options = Some(
53 etcd_client::ConnectOptions::new()
54 .with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
55 );
56 let client = etcd_client::Client::connect(endpoints, options).await?;
57 Ok(Self::new(client, meta))
58 }
59
60 pub fn new(client: etcd_client::Client, meta: Meta) -> Self {
61 Self { meta, client }
62 }
63}
64
65impl MetaDetector {
66 pub async fn register(&mut self) -> Result<(), etcd_client::Error> {
68 let key = self.meta.key.path();
69 let value = serde_json::to_string(&self.meta).expect("Serialize Error");
70 self.client.put(key, value, None).await.map(|_| ())
71 }
72
73 pub async fn unregister(&mut self) -> Result<(), etcd_client::Error> {
74 let key = self.meta.key.path();
75 self.client.delete(key, None).await.map(|_| ())
76 }
77
78 pub async fn fetch(&mut self) -> Result<Option<Meta>, etcd_client::Error> {
79 let metas = parse_meta_response(self.real_fetch().await?);
80 if metas.is_empty() {
81 Ok(None)
82 } else {
83 Ok(Some(metas[0].clone()))
84 }
85 }
86
87 pub async fn fetch_all(&mut self) -> Result<Vec<Meta>, etcd_client::Error> {
88 Ok(parse_meta_response(self.real_fetch_all().await?))
89 }
90
91 pub async fn watch(&mut self) -> Result<Watcher<Meta, MetaKey>, etcd_client::Error> {
93 let resp = self.real_fetch().await?;
94 self.watch_inner(resp, self.meta.key.path(), |rev| {
95 etcd_client::WatchOptions::new().with_start_revision(rev)
96 })
97 .await
98 }
99
100 pub async fn watch_all(&mut self) -> Result<Watcher<Meta, MetaKey>, etcd_client::Error> {
102 let resp = self.real_fetch_all().await?;
103 self.watch_inner(resp, self.meta.key.root_path(), |rev| {
104 etcd_client::WatchOptions::new()
105 .with_prefix()
106 .with_start_revision(rev)
107 })
108 .await
109 }
110}