use std::collections::HashMap;
use std::time::Duration;
use detector::{Meta, MetaKey, Service, ServiceKey, Services};
use etcd_client::{Error, GetOptions, GetResponse, WatchOptions};
use crate::response::{parse_meta_response, parse_service_response};
use crate::watch::Watcher;
/// Handle used by this module to read and watch meta and service entries in etcd.
///
/// It wraps an [`etcd_client::Client`]; cloning this handle clones the wrapped client.
#[derive(Clone)]
pub struct Client {
/// The underlying etcd client that every request in this module is issued through.
pub(crate) client: etcd_client::Client,
}
/// Builder that accumulates [`etcd_client::ConnectOptions`] and then connects a [`Client`].
///
/// Every `with_*` method consumes the builder, forwards its arguments to the
/// `ConnectOptions` method of the same name, and returns the updated builder so
/// calls can be chained.
pub struct Builder {
    /// Connection options collected so far; handed to `etcd_client::Client::connect` by `build`.
    opt: etcd_client::ConnectOptions,
}

impl Builder {
    /// Creates a builder holding a fresh `etcd_client::ConnectOptions::new()`.
    pub fn new() -> Self {
        Self {
            opt: etcd_client::ConnectOptions::new(),
        }
    }

    /// Connects to the endpoints in `e` using the accumulated options.
    ///
    /// # Errors
    ///
    /// Returns whatever error `etcd_client::Client::connect` reports.
    pub async fn build<E: AsRef<str>, S: AsRef<[E]>>(self, e: S) -> Result<Client, Error> {
        let client = etcd_client::Client::connect(e, Some(self.opt)).await?;
        Ok(Client { client })
    }

    /// Forwards `name` and `password` to `ConnectOptions::with_user`.
    pub fn with_user(mut self, name: impl Into<String>, password: impl Into<String>) -> Self {
        self.opt = self.opt.with_user(name, password);
        self
    }

    /// Forwards `interval` and `timeout` to `ConnectOptions::with_keep_alive`.
    pub fn with_keep_alive(mut self, interval: Duration, timeout: Duration) -> Self {
        self.opt = self.opt.with_keep_alive(interval, timeout);
        self
    }

    /// Forwards `timeout` to `ConnectOptions::with_timeout`.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.opt = self.opt.with_timeout(timeout);
        self
    }

    /// Forwards `timeout` to `ConnectOptions::with_connect_timeout`.
    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
        self.opt = self.opt.with_connect_timeout(timeout);
        self
    }

    /// Forwards `tcp_keepalive` to `ConnectOptions::with_tcp_keepalive`.
    pub fn with_tcp_keepalive(mut self, tcp_keepalive: Duration) -> Self {
        self.opt = self.opt.with_tcp_keepalive(tcp_keepalive);
        self
    }

    /// Forwards `enabled` to `ConnectOptions::with_keep_alive_while_idle`.
    pub fn with_keep_alive_while_idle(mut self, enabled: bool) -> Self {
        self.opt = self.opt.with_keep_alive_while_idle(enabled);
        self
    }

    /// Forwards `require_leader` to `ConnectOptions::with_require_leader`.
    pub fn with_require_leader(mut self, require_leader: bool) -> Self {
        self.opt = self.opt.with_require_leader(require_leader);
        self
    }
}

impl Default for Builder {
    /// Equivalent to [`Builder::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl Client {
async fn do_fetch_meta(&mut self, key: &MetaKey) -> Result<GetResponse, Error> {
self.client.get(key.path(), None).await
}
async fn do_fetch_metas(&mut self, key: &MetaKey) -> Result<GetResponse, Error> {
self.client
.get(key.root_path(), Some(GetOptions::new().with_prefix()))
.await
}
async fn do_fetch_service(&mut self, key: &ServiceKey) -> Result<GetResponse, Error> {
self.client
.get(
key.path().ok_or(Error::InvalidArgs("no id".to_string()))?,
None,
)
.await
}
async fn do_fetch_services(&mut self, key: &ServiceKey) -> Result<GetResponse, Error> {
self.client
.get(key.parent_path(), Some(GetOptions::new().with_prefix()))
.await
}
async fn do_fetch_all_services(&mut self, key: &ServiceKey) -> Result<GetResponse, Error> {
self.client
.get(key.root_path(), Some(GetOptions::new().with_prefix()))
.await
}
async fn do_watch_meta<G>(
&mut self,
resp: GetResponse,
key: String,
watch_opts_fn: G,
) -> Result<Watcher<Meta, MetaKey>, Error>
where
G: FnOnce(i64) -> WatchOptions,
{
let revision = resp.header().unwrap().revision();
let watch = self
.client
.watch(key, Some(watch_opts_fn(revision + 1)))
.await?;
Ok(Watcher::new(parse_meta_response(resp), watch.0, watch.1))
}
async fn do_watch_service<G>(
&mut self,
resp: GetResponse,
key: String,
watch_opts_fn: G,
) -> Result<Watcher<Service, ServiceKey>, Error>
where
G: FnOnce(i64) -> WatchOptions,
{
let revision = resp.header().unwrap().revision();
let watch = self
.client
.watch(key, Some(watch_opts_fn(revision + 1)))
.await?;
let init: Vec<Service> = parse_service_response(resp)
.into_values()
.map(|s| s.services)
.flatten()
.collect();
Ok(Watcher::new(init, watch.0, watch.1))
}
}
// Public API: one-shot fetches and fetch-then-watch entry points.
impl Client {
    /// Fetches the entry stored at `key.path()` and returns the first parsed meta,
    /// or `None` when the response parses to no metas.
    ///
    /// # Errors
    ///
    /// Propagates the error from the underlying `get` request.
    pub async fn fetch_meta(&mut self, key: &MetaKey) -> Result<Option<Meta>, Error> {
        let metas = parse_meta_response(self.do_fetch_meta(key).await?);
        // Move the first element out of the owned vector rather than indexing and cloning.
        Ok(metas.into_iter().next())
    }

    /// Fetches every meta found under the root path derived from `namespace`.
    ///
    /// The key is built with the placeholder name `"whatever"`; only its
    /// `root_path()` is used for the query.
    ///
    /// # Errors
    ///
    /// Propagates the error from the underlying `get` request.
    pub async fn fetch_metas<S: Into<String>>(&mut self, namespace: S) -> Result<Vec<Meta>, Error> {
        let key = MetaKey::new("whatever", namespace);
        let resp = self.do_fetch_metas(&key).await?;
        Ok(parse_meta_response(resp))
    }

    /// Fetches the entry stored at `key.path()` and returns the last service parsed
    /// under `key.name`, or `None` if nothing was parsed under that name.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidArgs("no id")` when `key.path()` is `None`; otherwise
    /// propagates the error from the underlying `get` request.
    pub async fn fetch_service(&mut self, key: &ServiceKey) -> Result<Option<Service>, Error> {
        let mut by_name = parse_service_response(self.do_fetch_service(key).await?);
        Ok(by_name
            .remove(&key.name)
            .and_then(|mut found| found.services.pop()))
    }

    /// Fetches everything under `key.parent_path()` and returns the `Services`
    /// parsed under `key.name`, or `Services::new()` if that name is absent.
    ///
    /// # Errors
    ///
    /// Propagates the error from the underlying `get` request.
    pub async fn fetch_services(&mut self, key: &ServiceKey) -> Result<Services, Error> {
        let mut by_name = parse_service_response(self.do_fetch_services(key).await?);
        Ok(by_name.remove(&key.name).unwrap_or_else(Services::new))
    }

    /// Fetches everything under the root path derived from `namespace` and returns
    /// the parsed map of `Services`.
    ///
    /// The key is built with the placeholder name `"whatever"`; only its
    /// `root_path()` is used for the query.
    ///
    /// # Errors
    ///
    /// Propagates the error from the underlying `get` request.
    pub async fn fetch_all_services<S: Into<String>>(
        &mut self,
        namespace: S,
    ) -> Result<HashMap<String, Services>, Error> {
        let key = ServiceKey::new("whatever", namespace);
        let resp = self.do_fetch_all_services(&key).await?;
        Ok(parse_service_response(resp))
    }

    /// Fetches the entry at `key.path()`, then watches that exact key starting at
    /// the revision after the fetch. The returned watcher is seeded with the
    /// fetched metas.
    ///
    /// # Errors
    ///
    /// Propagates errors from the `get` and `watch` requests.
    pub async fn watch_meta(&mut self, key: &MetaKey) -> Result<Watcher<Meta, MetaKey>, Error> {
        let resp = self.do_fetch_meta(key).await?;
        self.do_watch_meta(resp, key.path(), |rev| {
            WatchOptions::new().with_start_revision(rev)
        })
        .await
    }

    /// Fetches every meta under the root path derived from `namespace`, then watches
    /// that prefix starting at the revision after the fetch.
    ///
    /// The key is built with the placeholder name `"whatever"`; only its
    /// `root_path()` is used.
    ///
    /// # Errors
    ///
    /// Propagates errors from the `get` and `watch` requests.
    pub async fn watch_metas<S: Into<String>>(
        &mut self,
        namespace: S,
    ) -> Result<Watcher<Meta, MetaKey>, Error> {
        let key = MetaKey::new("whatever", namespace);
        let resp = self.do_fetch_metas(&key).await?;
        self.do_watch_meta(resp, key.root_path(), |rev| {
            WatchOptions::new().with_prefix().with_start_revision(rev)
        })
        .await
    }

    /// Fetches the entry at `key.path()`, then watches that exact key starting at
    /// the revision after the fetch. The returned watcher is seeded with the
    /// fetched services.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidArgs("no id")` when `key.path()` is `None`; otherwise
    /// propagates errors from the `get` and `watch` requests.
    pub async fn watch_service(
        &mut self,
        key: &ServiceKey,
    ) -> Result<Watcher<Service, ServiceKey>, Error> {
        // Resolve the path up front and report a missing id as an error instead of
        // unwrapping; this is the same error the fetch helper returns for such a key.
        let path = key
            .path()
            .ok_or_else(|| Error::InvalidArgs("no id".to_string()))?;
        let resp = self.do_fetch_service(key).await?;
        self.do_watch_service(resp, path, |rev| {
            WatchOptions::new().with_start_revision(rev)
        })
        .await
    }

    /// Fetches everything under `key.parent_path()`, then watches that prefix
    /// starting at the revision after the fetch. The returned watcher is seeded
    /// with all fetched services.
    ///
    /// # Errors
    ///
    /// Propagates errors from the `get` and `watch` requests.
    pub async fn watch_services(
        &mut self,
        key: &ServiceKey,
    ) -> Result<Watcher<Service, ServiceKey>, Error> {
        let resp = self.do_fetch_services(key).await?;
        self.do_watch_service(resp, key.parent_path(), |rev| {
            WatchOptions::new().with_prefix().with_start_revision(rev)
        })
        .await
    }

    /// Fetches everything under the root path derived from `namespace`, then watches
    /// that prefix starting at the revision after the fetch.
    ///
    /// The key is built with the placeholder name `"whatever"`; only its
    /// `root_path()` is used.
    ///
    /// # Errors
    ///
    /// Propagates errors from the `get` and `watch` requests.
    pub async fn watch_all_services<S: Into<String>>(
        &mut self,
        namespace: S,
    ) -> Result<Watcher<Service, ServiceKey>, Error> {
        let key = ServiceKey::new("whatever", namespace);
        let resp = self.do_fetch_all_services(&key).await?;
        self.do_watch_service(resp, key.root_path(), |rev| {
            WatchOptions::new().with_prefix().with_start_revision(rev)
        })
        .await
    }
}