1use crate::util;
2use detector::{Meta, MetaEvent, WatchRx, build_watch};
3use std::error::Error;
4use std::time::Duration;
5
6pub struct DetectMeta {
7 meta: Meta,
8 client: etcd_client::Client,
9}
10
11impl DetectMeta {
12 pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
13 meta: Meta,
14 endpoints: S,
15 ) -> Result<DetectMeta, etcd_client::Error> {
16 let options = Some(
17 etcd_client::ConnectOptions::new()
18 .with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
19 );
20 let client = etcd_client::Client::connect(endpoints, options).await?;
21 Ok(DetectMeta::new(client, meta))
22 }
23
24 pub fn new(client: etcd_client::Client, meta: Meta) -> Self {
25 DetectMeta { meta, client }
26 }
27
28 async fn real_fetch(
29 &mut self,
30 ) -> Result<etcd_client::GetResponse, Box<dyn Error + 'static + Send>> {
31 let resp = self
32 .client
33 .get(self.meta.key.path(), None)
34 .await
35 .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>);
36
37 resp
38 }
39
40 async fn real_fetch_all(
41 &mut self,
42 ) -> Result<etcd_client::GetResponse, Box<dyn Error + 'static + Send>> {
43 let resp = self
44 .client
45 .get(
46 self.meta.key.root_path(),
47 Some(etcd_client::GetOptions::new().with_prefix()),
48 )
49 .await
50 .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>);
51
52 resp
53 }
54
55 async fn watch_inner<G>(
56 &mut self,
57 resp: etcd_client::GetResponse,
58 key: String,
59 watch_opts_fn: G,
60 ) -> Result<WatchRx<MetaEvent>, Box<dyn Error + 'static + Send>>
61 where
62 G: FnOnce(i64) -> etcd_client::WatchOptions,
63 {
64 let revision = resp.header().unwrap().revision();
66
67 let watch = self
69 .client
70 .watch(key, Some(watch_opts_fn(revision + 1)))
71 .await
72 .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)?;
73
74 let (tx, rx) = build_watch();
76
77 for m in util::parse_meta_response(resp) {
79 let _ = tx.send(MetaEvent::Changed(m));
80 }
81
82 util::watch_meta(watch, tx);
84
85 Ok(rx)
86 }
87}
88
89impl detector::DetectMeta for DetectMeta {
90 async fn register(&mut self) -> Result<(), Box<dyn Error + 'static + Send>> {
92 let key = self.meta.key.path();
93 let value = serde_json::to_string(&self.meta)
94 .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)?;
95 self.client
96 .put(key, value, None)
97 .await
98 .map(|_| ())
99 .map_err(|e| Box::new(e) as Box<dyn Error + 'static + Send>)
100 }
101
102 async fn fetch(&mut self) -> Result<Option<Meta>, Box<dyn Error + 'static + Send>> {
103 let metas = util::parse_meta_response(self.real_fetch().await?);
104 if metas.is_empty() {
105 Ok(None)
106 } else {
107 Ok(Some(metas[0].clone()))
108 }
109 }
110
111 async fn fetch_all(&mut self) -> Result<Vec<Meta>, Box<dyn Error + 'static + Send>> {
112 Ok(util::parse_meta_response(self.real_fetch_all().await?))
113 }
114
115 async fn watch(&mut self) -> Result<WatchRx<MetaEvent>, Box<dyn Error + 'static + Send>> {
117 let resp = self.real_fetch().await?;
118 self.watch_inner(resp, self.meta.key.path(), |rev| {
119 etcd_client::WatchOptions::new().with_start_revision(rev)
120 })
121 .await
122 }
123
124 async fn watch_all(&mut self) -> Result<WatchRx<MetaEvent>, Box<dyn Error + 'static + Send>> {
126 let resp = self.real_fetch_all().await?;
127 self.watch_inner(resp, self.meta.key.root_path(), |rev| {
128 etcd_client::WatchOptions::new()
129 .with_prefix()
130 .with_start_revision(rev)
131 })
132 .await
133 }
134}
135
136pub type MetaDetector = DetectMeta;