1use std::error::Error;
2use std::time::Duration;
3
4use detector::{Meta, MetaEvent, WatchRx, build_watch};
5
6use crate::util;
7
8pub struct DetectMeta {
9 meta: Meta,
10 client: etcd_client::Client,
11}
12
13impl DetectMeta {
14 pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
15 meta: Meta,
16 endpoints: S,
17 ) -> Result<DetectMeta, etcd_client::Error> {
18 let options = Some(
19 etcd_client::ConnectOptions::new()
20 .with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
21 );
22 let client = etcd_client::Client::connect(endpoints, options).await?;
23 Ok(DetectMeta::new(client, meta))
24 }
25
26 pub fn new(client: etcd_client::Client, meta: Meta) -> Self {
27 DetectMeta { meta, client }
28 }
29}
30
31impl detector::DetectMeta for DetectMeta {
32 async fn register(&mut self) -> Result<(), Box<dyn Error>> {
34 let key = self.meta.key.path();
35 let value = serde_json::to_string(&self.meta)?;
36 self.client
37 .put(key, value, None)
38 .await
39 .map(|_| ())
40 .map_err(|e| Box::new(e) as Box<dyn Error>)
41 }
42
43 async fn fetch(&mut self) -> Result<Option<Meta>, Box<dyn Error>> {
44 let resp = self.client.get(self.meta.key.path(), None).await?;
45 let metas = util::parse_meta_response(resp);
46 if metas.is_empty() {
47 Ok(None)
48 } else {
49 Ok(Some(metas[0].clone()))
50 }
51 }
52
53 async fn fetch_all(&mut self) -> Result<Vec<Meta>, Box<dyn Error>> {
54 let resp = self
55 .client
56 .get(
57 self.meta.key.root_path(),
58 Some(etcd_client::GetOptions::new().with_prefix()),
59 )
60 .await?;
61 Ok(util::parse_meta_response(resp))
62 }
63
64 async fn watch(&mut self) -> Result<WatchRx<MetaEvent>, Box<dyn Error>> {
66 let watch = self.client.watch(self.meta.key.path(), None).await?;
67 let (tx, rx) = build_watch();
68
69 util::watch_meta(watch, tx);
70 Ok(rx)
71 }
72
73 async fn watch_all(&mut self) -> Result<WatchRx<MetaEvent>, Box<dyn Error>> {
74 let watch = self
75 .client
76 .watch(
77 self.meta.key.root_path(),
78 Some(etcd_client::WatchOptions::new().with_prefix()),
79 )
80 .await?;
81 let (tx, rx) = build_watch();
82
83 util::watch_meta(watch, tx);
84 Ok(rx)
85 }
86}
87
88pub type MetaDetector = DetectMeta;