use crate::client::Client;
use detector::{Meta, MetaKey, Service, ServiceStatus};
use etcd_client::{Error, PutOptions};
use serde::{Serialize, Serializer};
use std::time::Duration;
use tokio::sync::watch::{Receiver, Sender};
/// Internal state of a [`Registrar`]; the variant in use depends on which
/// `From<(Client, _)>` conversion built the registrar.
enum Inner<T> {
/// Payload plus an optional lease TTL. The TTL starts as `None` and is set by
/// `Registrar::<Meta>::with_ttl`; when present it is passed to `lease_grant`.
Meta(T, Option<i64>),
/// Payload plus both halves of the watch channel that carries the current
/// `ServiceStatus` of the registration.
Service(T, Sender<ServiceStatus>, Receiver<ServiceStatus>),
/// Payload only; this variant carries no status channel.
SimpleService(T),
}
/// Newtype around [`Service`] used as the type parameter of
/// `Registrar<SimpleService>`, which registers the service once without a
/// status channel or keep-alive task.
pub struct SimpleService(Service);
impl Serialize for SimpleService {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
self.0.serialize(serializer)
}
}
/// Registers a payload of type `T` in etcd.
///
/// Instances are built through the `From<(Client, _)>` conversions in this
/// file; the type parameter selects which `register` implementation applies.
pub struct Registrar<T> {
/// Project client wrapper; its inner `client` field performs the etcd calls.
client: Client,
/// Payload together with the state specific to the registration flavor.
inner: Inner<T>,
}
impl<T> Registrar<T> {
    /// Writes `value` at `path`, attaching a freshly granted lease when `ttl` is set.
    ///
    /// Returns the id of the lease bound to the key, or `None` when no TTL was
    /// requested.
    ///
    /// # Errors
    ///
    /// Propagates any error from granting the lease or from the `put` itself.
    /// If the `put` fails after a lease was granted, the lease is revoked on a
    /// best-effort basis so it does not linger on the server until its TTL
    /// runs out.
    async fn do_register(
        &mut self,
        path: String,
        value: String,
        ttl: Option<i64>,
    ) -> Result<Option<i64>, Error> {
        let lease_id = match ttl {
            Some(ttl) => Some(self.client.client.lease_grant(ttl, None).await?.id()),
            None => None,
        };
        let options = lease_id.map(|id| PutOptions::new().with_lease(id));
        let put_result = self.client.client.put(path, value, options).await;
        if let Err(err) = put_result {
            if let Some(id) = lease_id {
                // Best effort: the original error is what the caller needs to see,
                // so a failure to revoke is deliberately ignored.
                let _ = self.client.client.lease_revoke(id).await;
            }
            return Err(err);
        }
        Ok(lease_id)
    }

    /// Deletes the key at `path`, discarding the delete response.
    async fn do_unregister(&mut self, path: String) -> Result<(), Error> {
        self.client.client.delete(path, None).await?;
        Ok(())
    }
}
impl<T: Serialize> Registrar<T> {
fn serialize_value(&self) -> Result<String, Error> {
match self.inner {
Inner::Meta(ref m, _) => {
serde_json::to_string(m).map_err(|e| Error::InvalidArgs(format!("{:?}", e)))
}
Inner::Service(ref s, _, _) => {
serde_json::to_string(s).map_err(|e| Error::InvalidArgs(format!("{:?}", e)))
}
Inner::SimpleService(ref s) => {
serde_json::to_string(s).map_err(|e| Error::InvalidArgs(format!("{:?}", e)))
}
}
}
}
impl From<(Client, Meta)> for Registrar<Meta> {
    /// Pairs the client with the meta payload; no TTL is configured until
    /// `with_ttl` is called.
    fn from(value: (Client, Meta)) -> Self {
        let (client, meta) = value;
        let inner = Inner::Meta(meta, None);
        Self { client, inner }
    }
}
impl Registrar<Meta> {
    /// Sets the lease TTL used by `register`.
    ///
    /// `None` and non-positive values both clear the TTL, in which case the
    /// key is written without a lease.
    pub fn with_ttl(mut self, ttl: Option<i64>) -> Self {
        if let Inner::Meta(_, slot) = &mut self.inner {
            *slot = match ttl {
                Some(t) if t > 0 => Some(t),
                _ => None,
            };
        }
        self
    }

    /// Serializes the meta and writes it under the path of its key, attaching
    /// a lease when a TTL is configured. The granted lease id is discarded.
    pub async fn register(&mut self) -> Result<(), Error> {
        let value = self.serialize_value()?;
        let (meta, ttl) = self.to_inner();
        let path = meta.key.path();
        let ttl = *ttl;
        self.do_register(path, value, ttl).await?;
        Ok(())
    }

    /// Deletes the key under which the meta is stored.
    pub async fn unregister(&mut self) -> Result<(), Error> {
        let path = self.to_inner().0.key.path();
        self.do_unregister(path).await
    }

    /// Borrows the meta and its TTL.
    ///
    /// Panics if `inner` is not the `Meta` variant; the only constructor for
    /// `Registrar<Meta>` in this file always builds that variant.
    fn to_inner(&self) -> (&Meta, &Option<i64>) {
        match &self.inner {
            Inner::Meta(meta, ttl) => (meta, ttl),
            _ => panic!("never"),
        }
    }
}
impl From<(Client, Service)> for Registrar<SimpleService> {
    /// Wraps the service in the `SimpleService` newtype and pairs it with the
    /// client. The service is stored as given; its TTL is not adjusted here.
    fn from(value: (Client, Service)) -> Self {
        let (client, service) = value;
        let inner = Inner::SimpleService(SimpleService(service));
        Self { client, inner }
    }
}
impl Registrar<SimpleService> {
pub async fn register(&mut self) -> Result<(), Error> {
let inner = self.to_inner();
let path = inner
.key
.path()
.ok_or(Error::InvalidArgs("no id".to_string()))?;
let ttl = inner.ttl.filter(|t| *t > 0);
let value = self.serialize_value()?;
self.do_register(path, value, ttl).await.map(|_| ())
}
pub async fn unregister(&mut self) -> Result<(), Error> {
let inner = self.to_inner();
let path = inner
.key
.path()
.ok_or(Error::InvalidArgs("no id".to_string()))?;
self.do_unregister(path).await
}
fn to_inner(&self) -> &Service {
match self.inner {
Inner::SimpleService(ref s) => &s.0,
_ => panic!("never"),
}
}
}
impl From<(Client, Service)> for Registrar<Service> {
    /// Builds a status-tracking registrar.
    ///
    /// The TTL of the service is normalized so that it is always present:
    /// a missing TTL becomes 60 and anything below 10 is raised to 10. The
    /// status channel starts out as `ServiceStatus::Unregistered`.
    fn from(value: (Client, Service)) -> Self {
        let (client, mut service) = value;
        let ttl = match service.ttl {
            None => 60,
            Some(t) if t < 10 => 10,
            Some(t) => t,
        };
        service.ttl = Some(ttl);
        let (tx, rx) = tokio::sync::watch::channel(ServiceStatus::Unregistered);
        Self {
            client,
            inner: Inner::Service(service, tx, rx),
        }
    }
}
impl Registrar<Service> {
pub async fn register(&mut self) -> Result<(), Error> {
let mut client = self.client.clone();
let (service, tx, _) = self.to_inner_mut();
if service.key.has_id() {
return Err(Error::InvalidArgs(
"not allow to register again".to_string(),
));
}
let meta = client
.fetch_meta(&MetaKey::new(
service.key.name.clone(),
service.key.ns.clone(),
))
.await?
.ok_or(Error::InvalidArgs("no meta".to_string()))?;
let instances = meta.instances;
service.meta = Some(meta);
let lease_id = {
let mut lease_id = 0;
for id in 0..instances {
service.key.id = Some(id);
let result = txn_register(service, client.client.clone()).await?;
if result != 0 {
lease_id = result;
break;
}
}
lease_id
};
if lease_id == 0 {
return Err(Error::InvalidArgs("no instance enough".to_string()));
}
change_status(&tx, ServiceStatus::Registered);
keep_lease_alive(lease_id, service.clone(), client.client, tx.clone());
Ok(())
}
pub fn service(&self) -> &Service {
self.to_inner().0
}
pub fn status(&self) -> ServiceStatus {
*self.to_inner().2.borrow()
}
pub async fn changed(&mut self) -> ServiceStatus {
if !self.service().key.has_id() {
return self.status();
}
let _ = self.to_inner_mut().2.changed().await.expect("impossible");
self.status()
}
}
impl Registrar<Service> {
    /// Mutably borrows the service and both channel halves.
    ///
    /// Panics if `inner` is not the `Service` variant; the only constructor
    /// for `Registrar<Service>` in this file always builds that variant.
    fn to_inner_mut(
        &mut self,
    ) -> (
        &mut Service,
        &mut Sender<ServiceStatus>,
        &mut Receiver<ServiceStatus>,
    ) {
        match &mut self.inner {
            Inner::Service(service, sender, receiver) => (service, sender, receiver),
            _ => panic!("never"),
        }
    }

    /// Shared-borrow counterpart of `to_inner_mut`, with the same panic condition.
    fn to_inner(&self) -> (&Service, &Sender<ServiceStatus>, &Receiver<ServiceStatus>) {
        match &self.inner {
            Inner::Service(service, sender, receiver) => (service, sender, receiver),
            _ => panic!("never"),
        }
    }
}
async fn txn_register(service: &Service, mut client: etcd_client::Client) -> Result<i64, Error> {
let path = service.key.path().unwrap();
let lease = client.lease_grant(service.ttl.unwrap(), None).await?;
let cmp = etcd_client::Compare::version(&*path, etcd_client::CompareOp::Equal, 0);
let value = serde_json::to_string(&service).expect("Serialize Error");
let put = etcd_client::TxnOp::put(path, value, Some(PutOptions::new().with_lease(lease.id())));
let txn = etcd_client::Txn::new().when([cmp]).and_then([put]);
let resp = client.txn(txn).await?;
if resp.succeeded() {
Ok(lease.id())
} else {
Ok(0)
}
}
fn keep_lease_alive(
lease_id: i64,
service: Service,
mut client: etcd_client::Client,
tx: Sender<ServiceStatus>,
) {
async fn do_keep_alive(
lease_id: i64,
client: &mut etcd_client::Client,
tx: &Sender<ServiceStatus>,
) {
loop {
if tx.sender_count() == 1 {
break;
}
match client.lease_keep_alive(lease_id).await {
Ok((mut keeper, mut stream)) => {
loop {
if tx.sender_count() == 1 {
break;
}
if let Err(_e) = keeper.keep_alive().await {
break;
}
match stream.message().await {
Err(_e) => {
break;
}
Ok(Some(r)) => {
if r.ttl() == 0 {
return;
}
change_status(&tx, ServiceStatus::Registered);
}
Ok(None) => {
break;
}
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
Err(Error::LeaseKeepAliveError(_e)) => {
return;
}
Err(_e) => {
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
change_status(&tx, ServiceStatus::Unregistered);
}
}
let _ = tokio::spawn(async move {
do_keep_alive(lease_id, &mut client, &tx).await;
change_status(&tx, ServiceStatus::Unregistered);
register_service_again(service, client, tx).await;
});
}
/// Retries the transactional registration every two seconds until it succeeds,
/// then marks the service `Registered` and restarts the keep-alive task.
///
/// If `tx.sender_count()` drops to 1 first, the key of the service is deleted
/// on a best-effort basis and the function returns without re-registering.
// NOTE(review): the delete is unconditional on the path; if another instance
// has claimed the same key in the meantime it would be removed too. Confirm
// whether that can happen.
async fn register_service_again(
    service: Service,
    mut client: etcd_client::Client,
    tx: Sender<ServiceStatus>,
) {
    while tx.sender_count() != 1 {
        let attempt = txn_register(&service, client.clone()).await;
        if let Ok(lease_id) = attempt {
            // `0` means the key is still occupied; treat it like a failure.
            if lease_id != 0 {
                change_status(&tx, ServiceStatus::Registered);
                keep_lease_alive(lease_id, service, client, tx);
                return;
            }
        }
        tokio::time::sleep(Duration::from_secs(2)).await;
    }

    // No other sender is left: clean up the key instead of re-registering.
    let path = service.key.path().unwrap();
    let _ = client.delete(path, None).await;
}
/// Publishes `status` on the channel unless it equals the current value, so
/// receivers are only woken by real transitions. A failed send is ignored.
fn change_status(tx: &Sender<ServiceStatus>, status: ServiceStatus) {
    // Copy the value out so the borrow guard is released before sending.
    let current = *tx.borrow();
    if current == status {
        return;
    }
    let _ = tx.send(status);
}