etcd_detector/
meta.rs

1use std::error::Error;
2use std::time::Duration;
3
4use detector::{Meta, MetaEvent, WatchRx, build_watch};
5
6use crate::util;
7
8pub struct DetectMeta {
9    meta: Meta,
10    client: etcd_client::Client,
11}
12
13impl DetectMeta {
14    pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
15        meta: Meta,
16        endpoints: S,
17    ) -> Result<DetectMeta, etcd_client::Error> {
18        let options = Some(
19            etcd_client::ConnectOptions::new()
20                .with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
21        );
22        let client = etcd_client::Client::connect(endpoints, options).await?;
23        Ok(DetectMeta::new(client, meta))
24    }
25
26    pub fn new(client: etcd_client::Client, meta: Meta) -> Self {
27        DetectMeta { meta, client }
28    }
29}
30
31impl detector::DetectMeta for DetectMeta {
32    /// 注册服务元数据, 不需要ttl
33    async fn register(&mut self) -> Result<(), Box<dyn Error + 'static + Send>> {
34        let key = self.meta.key.path();
35        let value = serde_json::to_string(&self.meta)
36            .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)?;
37        self.client
38            .put(key, value, None)
39            .await
40            .map(|_| ())
41            .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)
42    }
43
44    async fn fetch(&mut self) -> Result<Option<Meta>, Box<dyn Error + 'static + Send>> {
45        let resp = self
46            .client
47            .get(self.meta.key.path(), None)
48            .await
49            .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)?;
50        let metas = util::parse_meta_response(resp);
51        if metas.is_empty() {
52            Ok(None)
53        } else {
54            Ok(Some(metas[0].clone()))
55        }
56    }
57
58    async fn fetch_all(&mut self) -> Result<Vec<Meta>, Box<dyn Error + 'static + Send>> {
59        let resp = self
60            .client
61            .get(
62                self.meta.key.root_path(),
63                Some(etcd_client::GetOptions::new().with_prefix()),
64            )
65            .await
66            .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)?;
67        Ok(util::parse_meta_response(resp))
68    }
69
70    /// 监听元数据
71    async fn watch(&mut self) -> Result<WatchRx<MetaEvent>, Box<dyn Error + 'static + Send>> {
72        let watch = self
73            .client
74            .watch(self.meta.key.path(), None)
75            .await
76            .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)?;
77        let (tx, rx) = build_watch();
78
79        util::watch_meta(watch, tx);
80        Ok(rx)
81    }
82
83    async fn watch_all(&mut self) -> Result<WatchRx<MetaEvent>, Box<dyn Error + 'static + Send>> {
84        let watch = self
85            .client
86            .watch(
87                self.meta.key.root_path(),
88                Some(etcd_client::WatchOptions::new().with_prefix()),
89            )
90            .await
91            .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)?;
92        let (tx, rx) = build_watch();
93
94        util::watch_meta(watch, tx);
95        Ok(rx)
96    }
97}
98
99/// 别名
100pub type MetaDetector = DetectMeta;