etcd_detector/
meta.rs

1use std::error::Error;
2use std::time::Duration;
3
4use detector::{Meta, MetaEvent, WatchRx, build_watch};
5
6use crate::util;
7
8pub struct DetectMeta {
9    meta: Meta,
10    client: etcd_client::Client,
11}
12
13impl DetectMeta {
14    pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
15        meta: Meta,
16        endpoints: S,
17    ) -> Result<DetectMeta, etcd_client::Error> {
18        let options = Some(
19            etcd_client::ConnectOptions::new()
20                .with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
21        );
22        let client = etcd_client::Client::connect(endpoints, options).await?;
23        Ok(DetectMeta::new(client, meta))
24    }
25
26    pub fn new(client: etcd_client::Client, meta: Meta) -> Self {
27        DetectMeta { meta, client }
28    }
29}
30
31impl detector::DetectMeta for DetectMeta {
32    /// 注册服务元数据, 不需要ttl
33    async fn register(&mut self) -> Result<(), Box<dyn Error>> {
34        let key = self.meta.key.path();
35        let value = serde_json::to_string(&self.meta)?;
36        self.client
37            .put(key, value, None)
38            .await
39            .map(|_| ())
40            .map_err(|e| Box::new(e) as Box<dyn Error>)
41    }
42
43    async fn fetch(&mut self) -> Result<Option<Meta>, Box<dyn Error>> {
44        let resp = self.client.get(self.meta.key.path(), None).await?;
45        let metas = util::parse_meta_response(resp);
46        if metas.is_empty() {
47            Ok(None)
48        } else {
49            Ok(Some(metas[0].clone()))
50        }
51    }
52
53    async fn fetch_all(&mut self) -> Result<Vec<Meta>, Box<dyn Error>> {
54        let resp = self
55            .client
56            .get(
57                self.meta.key.root_path(),
58                Some(etcd_client::GetOptions::new().with_prefix()),
59            )
60            .await?;
61        Ok(util::parse_meta_response(resp))
62    }
63
64    /// 监听元数据
65    async fn watch(&mut self) -> Result<WatchRx<MetaEvent>, Box<dyn Error>> {
66        let watch = self.client.watch(self.meta.key.path(), None).await?;
67        let (tx, rx) = build_watch();
68
69        util::watch_meta(watch, tx);
70        Ok(rx)
71    }
72
73    async fn watch_all(&mut self) -> Result<WatchRx<MetaEvent>, Box<dyn Error>> {
74        let watch = self
75            .client
76            .watch(
77                self.meta.key.root_path(),
78                Some(etcd_client::WatchOptions::new().with_prefix()),
79            )
80            .await?;
81        let (tx, rx) = build_watch();
82
83        util::watch_meta(watch, tx);
84        Ok(rx)
85    }
86}
87
88/// 别名
89pub type MetaDetector = DetectMeta;