use crate::response::parse_service_response;
use crate::trace::debug;
use crate::watch::Oneself;
use crate::{MetaDetector, Watcher};
use detector::{Meta, MetaKey, Service, ServiceKey, ServiceStatus, Services};
use std::collections::HashMap;
use std::time::Duration;
struct Inner {
service: Service,
client: etcd_client::Client,
sender: tokio::sync::watch::Sender<ServiceStatus>,
receiver: tokio::sync::watch::Receiver<ServiceStatus>,
}
pub struct Detector {
inner: Inner,
}
impl Detector {
pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
service: Service,
endpoints: S,
) -> Result<crate::Detector, etcd_client::Error> {
let options = Some(
etcd_client::ConnectOptions::new()
.with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
);
let client = etcd_client::Client::connect(endpoints, options).await?;
Ok(crate::Detector::new(client, service))
}
pub fn new(client: etcd_client::Client, service: Service) -> Self {
let (sender, receiver) = tokio::sync::watch::channel(ServiceStatus::Unregistered);
Self {
inner: Inner {
service,
client,
sender,
receiver,
},
}
}
async fn real_fetch(&mut self) -> Result<etcd_client::GetResponse, etcd_client::Error> {
self.inner
.client
.get(
self.inner.service.key.parent_path(),
Some(etcd_client::GetOptions::new().with_prefix()),
)
.await
}
async fn real_fetch_all(&mut self) -> Result<etcd_client::GetResponse, etcd_client::Error> {
self.inner
.client
.get(
self.inner.service.key.root_path(),
Some(etcd_client::GetOptions::new().with_prefix()),
)
.await
}
async fn watch_inner(
&mut self,
resp: etcd_client::GetResponse,
key: String,
) -> Result<Watcher<Service, ServiceKey>, etcd_client::Error> {
let revision = resp.header().unwrap().revision();
let watch = self
.inner
.client
.watch(
key,
Some(
etcd_client::WatchOptions::new()
.with_prefix()
.with_start_revision(revision + 1),
),
)
.await?;
let init: Vec<Service> = parse_service_response(resp)
.into_values()
.map(|s| s.services)
.flatten()
.collect();
Ok(Watcher::new(init, watch.0, watch.1))
}
}
impl Detector {
pub fn service(&self) -> &Service {
&self.inner.service
}
pub async fn fetch(&mut self) -> Result<Services, etcd_client::Error> {
let name = &self.inner.service.key.name.clone();
let mut resp = parse_service_response(self.real_fetch().await?);
match resp.remove(name) {
None => Ok(Services::new()),
Some(s) => Ok(s),
}
}
pub async fn fetch_all(&mut self) -> Result<HashMap<String, Services>, etcd_client::Error> {
Ok(parse_service_response(self.real_fetch_all().await?))
}
pub async fn watch(&mut self) -> Result<Watcher<Service, ServiceKey>, etcd_client::Error> {
let resp = self.real_fetch().await?;
let parent_path = self.inner.service.key.parent_path();
self.watch_inner(resp, parent_path).await
}
pub async fn watch_all(&mut self) -> Result<Watcher<Service, ServiceKey>, etcd_client::Error> {
let resp = self.real_fetch_all().await?;
let root_path = self.inner.service.key.root_path();
self.watch_inner(resp, root_path).await
}
pub fn oneself(&self) -> Oneself {
Oneself::new(self.inner.receiver.clone())
}
pub async fn register(&mut self) -> Result<Oneself, etcd_client::Error> {
if !self.inner.service.key.has_id() {
register_service(&mut self.inner).await?;
}
Ok(self.oneself())
}
}
async fn register_service(inner: &mut Inner) -> Result<(), etcd_client::Error> {
let mut meta_detect = MetaDetector::new(
inner.client.clone(),
Meta::from_key(MetaKey::new(
inner.service.key.name.clone(),
inner.service.key.ns.clone(),
)),
);
let meta = meta_detect
.fetch()
.await?
.ok_or(etcd_client::Error::InvalidArgs("no meta".to_string()))?;
let instances = meta.instances;
inner.service.meta = Some(meta);
inner.service.ttl = Some(
inner
.service
.ttl
.map_or(60, |t| if t < 10 { 10 } else { t }),
);
let lease_id = {
let mut lease_id = 0;
for id in 0..instances {
inner.service.key.id = Some(id);
let result = do_register(&inner.service, &mut inner.client).await?;
if result != 0 {
lease_id = result;
break;
}
}
lease_id
};
if lease_id == 0 {
debug!(
"failed to register cause of no instance enough: {:?}",
inner.service
);
return Err(etcd_client::Error::InvalidArgs(
"no instance enough".to_string(),
));
}
let _ = inner.sender.send(ServiceStatus::Registered);
keep_alive(
lease_id,
inner.service.clone(),
inner.client.clone(),
inner.sender.clone(),
);
Ok(())
}
async fn do_register(
service: &Service,
client: &mut etcd_client::Client,
) -> Result<i64, etcd_client::Error> {
debug!("try to register with:{:?}", service);
let path = service.key.path().unwrap();
let lease = client.lease_grant(service.ttl.unwrap(), None).await?;
let cmp = etcd_client::Compare::version(&*path, etcd_client::CompareOp::Equal, 0);
let value = serde_json::to_string(&service).expect("Serialize Error");
let put = etcd_client::TxnOp::put(
path,
value,
Some(etcd_client::PutOptions::new().with_lease(lease.id())),
);
let txn = etcd_client::Txn::new().when([cmp]).and_then([put]);
let resp = client.txn(txn).await?;
if resp.succeeded() {
debug!("register ok, lease id:{:?}", lease.id());
Ok(lease.id())
} else {
Ok(0)
}
}
fn keep_alive(
lease_id: i64,
service: Service,
mut client: etcd_client::Client,
sender: tokio::sync::watch::Sender<ServiceStatus>,
) {
async fn do_keep_alive(
lease_id: i64,
client: &mut etcd_client::Client,
sender: &tokio::sync::watch::Sender<ServiceStatus>,
) {
debug!("lease id:{} enter keep_alive", lease_id);
loop {
if sender.sender_count() == 1 {
break;
}
match client.lease_keep_alive(lease_id).await {
Ok((mut keeper, mut stream)) => {
loop {
if sender.sender_count() == 1 {
break;
}
if let Err(_e) = keeper.keep_alive().await {
debug!(
"lease id:{} send keep_alive request happened error:{:?}",
lease_id, _e
);
break;
}
match stream.message().await {
Err(_e) => {
debug!(
"lease id:{} recv keep_alive response happened error:{:?}",
lease_id, _e
);
break;
}
Ok(Some(r)) => {
if r.ttl() == 0 {
return;
}
let _ = sender.send(ServiceStatus::Registered);
}
Ok(None) => {
break;
}
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
Err(etcd_client::Error::LeaseKeepAliveError(_e)) => {
debug!(
"lease id:{} get keep_alive stream happened error:{:?}",
lease_id, _e
);
return;
}
Err(_e) => {
debug!(
"lease id:{} get keep_alive stream happened error:{:?}",
lease_id, _e
);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
let _ = sender.send(ServiceStatus::Unregistered);
}
}
tokio::spawn(async move {
do_keep_alive(lease_id, &mut client, &sender).await;
let _ = sender.send(ServiceStatus::Unregistered);
register_service_again(service, client, sender).await;
});
}
async fn register_service_again(
service: Service,
mut client: etcd_client::Client,
sender: tokio::sync::watch::Sender<ServiceStatus>,
) {
let lease_id = loop {
if sender.sender_count() == 1 {
let path = service.key.path().unwrap();
let _ = client.delete(path, None).await;
debug!("unregister ok: {:?}", service);
return;
}
match do_register(&service, &mut client).await {
Ok(r) if r != 0 => {
break r;
}
_ => {
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
};
if lease_id != 0 {
let _ = sender.send(ServiceStatus::Registered);
keep_alive(lease_id, service, client, sender);
}
}