use crate::response::parse_meta_response;
use crate::watch::Watcher;
use detector::{Meta, MetaKey};
use std::time::Duration;
pub struct MetaDetector {
meta: Meta,
client: etcd_client::Client,
}
impl MetaDetector {
async fn real_fetch(&mut self) -> Result<etcd_client::GetResponse, etcd_client::Error> {
self.client.get(self.meta.key.path(), None).await
}
async fn real_fetch_all(&mut self) -> Result<etcd_client::GetResponse, etcd_client::Error> {
self.client
.get(
self.meta.key.root_path(),
Some(etcd_client::GetOptions::new().with_prefix()),
)
.await
}
async fn watch_inner<G>(
&mut self,
resp: etcd_client::GetResponse,
key: String,
watch_opts_fn: G,
) -> Result<Watcher<Meta, MetaKey>, etcd_client::Error>
where
G: FnOnce(i64) -> etcd_client::WatchOptions,
{
let revision = resp.header().unwrap().revision();
let watch = self
.client
.watch(key, Some(watch_opts_fn(revision + 1)))
.await?;
Ok(Watcher::new(parse_meta_response(resp), watch.0, watch.1))
}
}
impl MetaDetector {
pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
meta: Meta,
endpoints: S,
) -> Result<Self, etcd_client::Error> {
let options = Some(
etcd_client::ConnectOptions::new()
.with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
);
let client = etcd_client::Client::connect(endpoints, options).await?;
Ok(Self::new(client, meta))
}
pub fn new(client: etcd_client::Client, meta: Meta) -> Self {
Self { meta, client }
}
}
impl MetaDetector {
pub async fn register(&mut self) -> Result<(), etcd_client::Error> {
let key = self.meta.key.path();
let value = serde_json::to_string(&self.meta).expect("Serialize Error");
self.client.put(key, value, None).await.map(|_| ())
}
pub async fn unregister(&mut self) -> Result<(), etcd_client::Error> {
let key = self.meta.key.path();
self.client.delete(key, None).await.map(|_| ())
}
pub async fn fetch(&mut self) -> Result<Option<Meta>, etcd_client::Error> {
let metas = parse_meta_response(self.real_fetch().await?);
if metas.is_empty() {
Ok(None)
} else {
Ok(Some(metas[0].clone()))
}
}
pub async fn fetch_all(&mut self) -> Result<Vec<Meta>, etcd_client::Error> {
Ok(parse_meta_response(self.real_fetch_all().await?))
}
pub async fn watch(&mut self) -> Result<Watcher<Meta, MetaKey>, etcd_client::Error> {
let resp = self.real_fetch().await?;
self.watch_inner(resp, self.meta.key.path(), |rev| {
etcd_client::WatchOptions::new().with_start_revision(rev)
})
.await
}
pub async fn watch_all(&mut self) -> Result<Watcher<Meta, MetaKey>, etcd_client::Error> {
let resp = self.real_fetch_all().await?;
self.watch_inner(resp, self.meta.key.root_path(), |rev| {
etcd_client::WatchOptions::new()
.with_prefix()
.with_start_revision(rev)
})
.await
}
}