etcd_detector/
meta.rs

1use crate::util;
2use detector::{Meta, MetaEvent, WatchRx, build_watch};
3use std::error::Error;
4use std::time::Duration;
5
6pub struct DetectMeta {
7    meta: Meta,
8    client: etcd_client::Client,
9}
10
11impl DetectMeta {
12    pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
13        meta: Meta,
14        endpoints: S,
15    ) -> Result<DetectMeta, etcd_client::Error> {
16        let options = Some(
17            etcd_client::ConnectOptions::new()
18                .with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
19        );
20        let client = etcd_client::Client::connect(endpoints, options).await?;
21        Ok(DetectMeta::new(client, meta))
22    }
23
24    pub fn new(client: etcd_client::Client, meta: Meta) -> Self {
25        DetectMeta { meta, client }
26    }
27
28    async fn real_fetch(
29        &mut self,
30    ) -> Result<etcd_client::GetResponse, Box<dyn Error + 'static + Send>> {
31        let resp = self
32            .client
33            .get(self.meta.key.path(), None)
34            .await
35            .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>);
36
37        resp
38    }
39
40    async fn real_fetch_all(
41        &mut self,
42    ) -> Result<etcd_client::GetResponse, Box<dyn Error + 'static + Send>> {
43        let resp = self
44            .client
45            .get(
46                self.meta.key.root_path(),
47                Some(etcd_client::GetOptions::new().with_prefix()),
48            )
49            .await
50            .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>);
51
52        resp
53    }
54
55    async fn watch_inner<G>(
56        &mut self,
57        resp: etcd_client::GetResponse,
58        key: String,
59        watch_opts_fn: G,
60    ) -> Result<WatchRx<MetaEvent>, Box<dyn Error + 'static + Send>>
61    where
62        G: FnOnce(i64) -> etcd_client::WatchOptions,
63    {
64        // 2. 获取 revision
65        let revision = resp.header().unwrap().revision();
66
67        // 3. 创建 watch
68        let watch = self
69            .client
70            .watch(key, Some(watch_opts_fn(revision + 1)))
71            .await
72            .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)?;
73
74        // 4. 建通道
75        let (tx, rx) = build_watch();
76
77        // 5. 把当前数据推送给 rx
78        for m in util::parse_meta_response(resp) {
79            let _ = tx.send(MetaEvent::Changed(m));
80        }
81
82        // 6. 异步监听 watch
83        util::watch_meta(watch, tx);
84
85        Ok(rx)
86    }
87}
88
89impl detector::DetectMeta for DetectMeta {
90    /// 注册服务元数据, 不需要ttl
91    async fn register(&mut self) -> Result<(), Box<dyn Error + 'static + Send>> {
92        let key = self.meta.key.path();
93        let value = serde_json::to_string(&self.meta)
94            .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)?;
95        self.client
96            .put(key, value, None)
97            .await
98            .map(|_| ())
99            .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)
100    }
101
102    async fn fetch(&mut self) -> Result<Option<Meta>, Box<dyn Error + 'static + Send>> {
103        let metas = util::parse_meta_response(self.real_fetch().await?);
104        if metas.is_empty() {
105            Ok(None)
106        } else {
107            Ok(Some(metas[0].clone()))
108        }
109    }
110
111    async fn fetch_all(&mut self) -> Result<Vec<Meta>, Box<dyn Error + 'static + Send>> {
112        Ok(util::parse_meta_response(self.real_fetch_all().await?))
113    }
114
115    /// 监听元数据
116    async fn watch(&mut self) -> Result<WatchRx<MetaEvent>, Box<dyn Error + 'static + Send>> {
117        let resp = self.real_fetch().await?;
118        self.watch_inner(resp, self.meta.key.path(), |rev| {
119            etcd_client::WatchOptions::new().with_start_revision(rev)
120        })
121        .await
122    }
123
124    /// 监听所有的元数据
125    async fn watch_all(&mut self) -> Result<WatchRx<MetaEvent>, Box<dyn Error + 'static + Send>> {
126        let resp = self.real_fetch_all().await?;
127        self.watch_inner(resp, self.meta.key.root_path(), |rev| {
128            etcd_client::WatchOptions::new()
129                .with_prefix()
130                .with_start_revision(rev)
131        })
132        .await
133    }
134}
135
136/// 别名
137pub type MetaDetector = DetectMeta;