1use std::error::Error;
2use std::time::Duration;
3
4use detector::{Meta, MetaEvent, WatchRx, build_watch};
5
6use crate::util;
7
8pub struct DetectMeta {
9 meta: Meta,
10 client: etcd_client::Client,
11}
12
13impl DetectMeta {
14 pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
15 meta: Meta,
16 endpoints: S,
17 ) -> Result<DetectMeta, etcd_client::Error> {
18 let options = Some(
19 etcd_client::ConnectOptions::new()
20 .with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
21 );
22 let client = etcd_client::Client::connect(endpoints, options).await?;
23 Ok(DetectMeta::new(client, meta))
24 }
25
26 pub fn new(client: etcd_client::Client, meta: Meta) -> Self {
27 DetectMeta { meta, client }
28 }
29}
30
31impl detector::DetectMeta for DetectMeta {
32 async fn register(&mut self) -> Result<(), Box<dyn Error + 'static + Send>> {
34 let key = self.meta.key.path();
35 let value = serde_json::to_string(&self.meta)
36 .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)?;
37 self.client
38 .put(key, value, None)
39 .await
40 .map(|_| ())
41 .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)
42 }
43
44 async fn fetch(&mut self) -> Result<Option<Meta>, Box<dyn Error + 'static + Send>> {
45 let resp = self
46 .client
47 .get(self.meta.key.path(), None)
48 .await
49 .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)?;
50 let metas = util::parse_meta_response(resp);
51 if metas.is_empty() {
52 Ok(None)
53 } else {
54 Ok(Some(metas[0].clone()))
55 }
56 }
57
58 async fn fetch_all(&mut self) -> Result<Vec<Meta>, Box<dyn Error + 'static + Send>> {
59 let resp = self
60 .client
61 .get(
62 self.meta.key.root_path(),
63 Some(etcd_client::GetOptions::new().with_prefix()),
64 )
65 .await
66 .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)?;
67 Ok(util::parse_meta_response(resp))
68 }
69
70 async fn watch(&mut self) -> Result<WatchRx<MetaEvent>, Box<dyn Error + 'static + Send>> {
72 let watch = self
73 .client
74 .watch(self.meta.key.path(), None)
75 .await
76 .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)?;
77 let (tx, rx) = build_watch();
78
79 util::watch_meta(watch, tx);
80 Ok(rx)
81 }
82
83 async fn watch_all(&mut self) -> Result<WatchRx<MetaEvent>, Box<dyn Error + 'static + Send>> {
84 let watch = self
85 .client
86 .watch(
87 self.meta.key.root_path(),
88 Some(etcd_client::WatchOptions::new().with_prefix()),
89 )
90 .await
91 .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)?;
92 let (tx, rx) = build_watch();
93
94 util::watch_meta(watch, tx);
95 Ok(rx)
96 }
97}
98
99pub type MetaDetector = DetectMeta;