etcd_detector/
meta_detect.rs

1use crate::response::parse_meta_response;
2use crate::watch::Watcher;
3use detector::{Meta, MetaKey};
4use std::time::Duration;
5
6pub struct MetaDetector {
7    meta: Meta,
8    client: etcd_client::Client,
9}
10
11impl MetaDetector {
12    async fn real_fetch(&mut self) -> Result<etcd_client::GetResponse, etcd_client::Error> {
13        self.client.get(self.meta.key.path(), None).await
14    }
15
16    async fn real_fetch_all(&mut self) -> Result<etcd_client::GetResponse, etcd_client::Error> {
17        self.client
18            .get(
19                self.meta.key.root_path(),
20                Some(etcd_client::GetOptions::new().with_prefix()),
21            )
22            .await
23    }
24
25    async fn watch_inner<G>(
26        &mut self,
27        resp: etcd_client::GetResponse,
28        key: String,
29        watch_opts_fn: G,
30    ) -> Result<Watcher<Meta, MetaKey>, etcd_client::Error>
31    where
32        G: FnOnce(i64) -> etcd_client::WatchOptions,
33    {
34        // 2. 获取 revision
35        let revision = resp.header().unwrap().revision();
36
37        // 3. 创建 watch
38        let watch = self
39            .client
40            .watch(key, Some(watch_opts_fn(revision + 1)))
41            .await?;
42
43        Ok(Watcher::new(parse_meta_response(resp), watch.0, watch.1))
44    }
45}
46
47impl MetaDetector {
48    pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
49        meta: Meta,
50        endpoints: S,
51    ) -> Result<Self, etcd_client::Error> {
52        let options = Some(
53            etcd_client::ConnectOptions::new()
54                .with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
55        );
56        let client = etcd_client::Client::connect(endpoints, options).await?;
57        Ok(Self::new(client, meta))
58    }
59
60    pub fn new(client: etcd_client::Client, meta: Meta) -> Self {
61        Self { meta, client }
62    }
63}
64
65impl MetaDetector {
66    /// 注册服务元数据, 不需要ttl
67    pub async fn register(&mut self) -> Result<(), etcd_client::Error> {
68        let key = self.meta.key.path();
69        let value = serde_json::to_string(&self.meta).expect("Serialize Error");
70        self.client.put(key, value, None).await.map(|_| ())
71    }
72
73    pub async fn unregister(&mut self) -> Result<(), etcd_client::Error> {
74        let key = self.meta.key.path();
75        self.client.delete(key, None).await.map(|_| ())
76    }
77
78    pub async fn fetch(&mut self) -> Result<Option<Meta>, etcd_client::Error> {
79        let metas = parse_meta_response(self.real_fetch().await?);
80        if metas.is_empty() {
81            Ok(None)
82        } else {
83            Ok(Some(metas[0].clone()))
84        }
85    }
86
87    pub async fn fetch_all(&mut self) -> Result<Vec<Meta>, etcd_client::Error> {
88        Ok(parse_meta_response(self.real_fetch_all().await?))
89    }
90
91    /// 监听元数据
92    pub async fn watch(&mut self) -> Result<Watcher<Meta, MetaKey>, etcd_client::Error> {
93        let resp = self.real_fetch().await?;
94        self.watch_inner(resp, self.meta.key.path(), |rev| {
95            etcd_client::WatchOptions::new().with_start_revision(rev)
96        })
97        .await
98    }
99
100    /// 监听所有的元数据
101    pub async fn watch_all(&mut self) -> Result<Watcher<Meta, MetaKey>, etcd_client::Error> {
102        let resp = self.real_fetch_all().await?;
103        self.watch_inner(resp, self.meta.key.root_path(), |rev| {
104            etcd_client::WatchOptions::new()
105                .with_prefix()
106                .with_start_revision(rev)
107        })
108        .await
109    }
110}