use crate::client::Client;
use crate::watch::Watcher;
use detector::{Meta, MetaKey, Service, ServiceKey, WatchEvent};
use etcd_client::Error;
use serde::de::DeserializeOwned;
use std::collections::HashMap;
use std::fmt::Debug;
use std::pin::Pin;
/// An event produced by [`History::event`] after the internal record has been updated.
#[derive(Debug, Clone)]
pub enum HistoryEvent<T>
where
    T: Clone + Debug,
{
    /// A value was stored in the record.
    ///
    /// The first field is the new value; the second is the value that was
    /// previously recorded under the same path, if there was one.
    Changed(T, Option<T>),
    /// A value was removed from the record; carries the value that had been recorded.
    Deleted(T),
}
/// Selects which watch a `History` opens when it is built with `History::new`.
///
/// `Meta` and `Metas` are accepted only by `History<Meta, MetaKey>::new`;
/// `Service`, `Services` and `AllServices` only by `History<Service, ServiceKey>::new`.
/// Passing a variant to the other constructor panics.
#[derive(Debug, Clone)]
pub enum HistoryType {
/// Watch opened with `Client::watch_meta` for this key.
Meta(MetaKey),
/// Watch opened with `Client::watch_metas`; the string is passed through as its argument
/// (presumably a namespace; confirm against `Client`).
Metas(String),
/// Watch opened with `Client::watch_service` for this key.
Service(ServiceKey),
/// Watch opened with `Client::watch_services` for this key.
Services(ServiceKey),
/// Watch opened with `Client::watch_all_services`; the string is passed through as its
/// argument (presumably a namespace; confirm against `Client`).
AllServices(String),
}
/// Implemented by values and keys that can report a string path.
///
/// `History` uses the returned path as the key of its internal record.
pub trait HasPath {
/// Returns the path for this item, or `None` when it has none.
///
/// `History::event` skips watch events whose item returns `None` here.
fn get_path(&self) -> Option<String>;
}
/// A `Meta` reports the path of its `key`; this implementation never returns `None`.
impl HasPath for Meta {
fn get_path(&self) -> Option<String> {
// `key.path()` yields the path directly, so it is wrapped in `Some`.
Some(self.key.path())
}
}
/// A `MetaKey` reports its own `path()`; this implementation never returns `None`.
impl HasPath for MetaKey {
fn get_path(&self) -> Option<String> {
// `path()` yields the path directly, so it is wrapped in `Some`.
Some(self.path())
}
}
/// A `Service` reports whatever its `key`'s `path()` returns, which may be `None`.
impl HasPath for Service {
fn get_path(&self) -> Option<String> {
// The key's `path()` is already optional, so it is forwarded unchanged.
self.key.path()
}
}
/// A `ServiceKey` reports whatever its own `path()` returns, which may be `None`.
impl HasPath for ServiceKey {
fn get_path(&self) -> Option<String> {
// `path()` is already optional, so it is forwarded unchanged.
self.path()
}
}
/// Wraps a `Watcher` and keeps a record of the latest value seen for each path.
///
/// `T1` is the value type stored in the record and carried by `HistoryEvent`;
/// `T2` is the key type carried by the watcher's deletion events.
pub struct History<T1, T2>
where
T1: Clone + Debug + DeserializeOwned + 'static + Send,
T2: Clone + Debug + for<'a> TryFrom<&'a [u8]> + 'static + Send,
for<'a> <T2 as TryFrom<&'a [u8]>>::Error: Send,
{
// Factory called to open a watcher whenever none is currently held.
create: Box<
dyn FnMut() -> Pin<Box<dyn Future<Output = Result<Watcher<T1, T2>, Error>> + Send>> + Send,
>,
// The open watcher; `None` until first needed and again after a watch error.
watcher: Option<Watcher<T1, T2>>,
// Latest value per path; `None` until the first insert and after it has been
// handed out through an error return.
record: Option<HashMap<String, T1>>,
}
impl<T1, T2> History<T1, T2>
where
    T1: Clone + Debug + DeserializeOwned + HasPath + 'static + Send,
    T2: Clone + Debug + HasPath + for<'a> TryFrom<&'a [u8]> + 'static + Send,
    for<'a> <T2 as TryFrom<&'a [u8]>>::Error: Send,
{
    /// Builds a `History` around a watcher factory.
    ///
    /// `create` is not called here; it runs lazily the first time a watcher is
    /// needed and again after every watch failure. The record starts out empty.
    pub fn create(
        create: Box<
            dyn FnMut() -> Pin<Box<dyn Future<Output = Result<Watcher<T1, T2>, Error>> + Send>>
                + Send,
        >,
    ) -> Self {
        Self {
            create,
            watcher: None,
            record: None,
        }
    }

    /// Returns the current watcher, opening one through the factory if none is held.
    ///
    /// # Errors
    ///
    /// If the factory fails, its error is discarded and `Err` carries every value
    /// currently in the record (`None` if nothing was recorded). The record is
    /// emptied by this.
    pub async fn get_watcher(&mut self) -> Result<&mut Watcher<T1, T2>, Option<Vec<T1>>> {
        if self.watcher.is_none() {
            match (self.create)().await {
                Ok(w) => self.watcher = Some(w),
                Err(_) => return Err(self.flatten_record()),
            }
        }
        // The branch above either stored a watcher or returned early.
        Ok(self.watcher.as_mut().expect("infallible"))
    }

    /// Waits for the next watch event that affects the record and reports it.
    ///
    /// * A changed value with a path is stored, and returned together with the
    ///   value it replaced (if any).
    /// * A deleted key is reported only when its path was present in the record;
    ///   the removed value is returned.
    /// * Events without a path, and deletions of unrecorded paths, are skipped.
    ///
    /// # Errors
    ///
    /// If no watcher can be opened, or the watcher reports an error, `Err` carries
    /// every value currently in the record (`None` if nothing was recorded) and the
    /// record is emptied. After a watcher error the watcher is dropped, so the next
    /// call opens a fresh one.
    pub async fn event(&mut self) -> Result<HistoryEvent<T1>, Option<Vec<T1>>> {
        loop {
            let watcher = self.get_watcher().await?;
            let we = match watcher.event().await {
                Ok(we) => we,
                Err(_) => {
                    // Drop the broken watcher so the next call recreates it.
                    self.watcher = None;
                    return Err(self.flatten_record());
                }
            };
            match we {
                WatchEvent::Changed(m) => {
                    let Some(path) = m.get_path() else { continue };
                    let old = self.add_record(path, m.clone());
                    return Ok(HistoryEvent::Changed(m, old));
                }
                WatchEvent::Deleted(k) => {
                    let Some(path) = k.get_path() else { continue };
                    if let Some(old) = self.del_record(&path) {
                        return Ok(HistoryEvent::Deleted(old));
                    }
                    // Nothing was recorded under this path: wait for the next event.
                }
            }
        }
    }

    /// Takes the record, leaving it empty, and returns its values in unspecified order.
    ///
    /// Returns `None` when no record exists.
    fn flatten_record(&mut self) -> Option<Vec<T1>> {
        self.record.take().map(|h| h.into_values().collect())
    }

    /// Stores `t1` under `path`, creating the record on first use.
    ///
    /// Returns the value previously stored under `path`, if any.
    fn add_record(&mut self, path: String, t1: T1) -> Option<T1> {
        self.record
            .get_or_insert_with(HashMap::new)
            .insert(path, t1)
    }

    /// Removes and returns the value stored under `path`.
    ///
    /// Returns `None` when no record exists or the path is not in it.
    fn del_record(&mut self, path: &str) -> Option<T1> {
        self.record.as_mut()?.remove(path)
    }
}
impl History<Meta, MetaKey> {
    /// Builds a meta history whose watcher is opened through `client`.
    ///
    /// `HistoryType::Meta` opens the watch with `watch_meta`; `HistoryType::Metas`
    /// opens it with `watch_metas`. Each time a watcher has to be opened, the
    /// client and the key/string are cloned again.
    ///
    /// # Panics
    ///
    /// Panics with `"wrong type"` when `ht` is one of the service variants.
    pub fn new(client: Client, ht: HistoryType) -> Self {
        match ht {
            HistoryType::Meta(meta_key) => Self::create(Box::new(move || {
                let (mut conn, target) = (client.clone(), meta_key.clone());
                Box::pin(async move { conn.watch_meta(&target).await })
            })),
            HistoryType::Metas(namespace) => Self::create(Box::new(move || {
                let (mut conn, target) = (client.clone(), namespace.clone());
                Box::pin(async move { conn.watch_metas(target).await })
            })),
            HistoryType::Service(_) | HistoryType::Services(_) | HistoryType::AllServices(_) => {
                panic!("wrong type")
            }
        }
    }
}
impl History<Service, ServiceKey> {
    /// Builds a service history whose watcher is opened through `client`.
    ///
    /// `HistoryType::Service` opens the watch with `watch_service`,
    /// `HistoryType::Services` with `watch_services`, and
    /// `HistoryType::AllServices` with `watch_all_services`. Each time a watcher
    /// has to be opened, the client and the key/string are cloned again.
    ///
    /// # Panics
    ///
    /// Panics with `"wrong type"` when `ht` is one of the meta variants.
    pub fn new(client: Client, ht: HistoryType) -> Self {
        match ht {
            HistoryType::Service(service_key) => Self::create(Box::new(move || {
                let (mut conn, target) = (client.clone(), service_key.clone());
                Box::pin(async move { conn.watch_service(&target).await })
            })),
            HistoryType::Services(service_key) => Self::create(Box::new(move || {
                let (mut conn, target) = (client.clone(), service_key.clone());
                Box::pin(async move { conn.watch_services(&target).await })
            })),
            HistoryType::AllServices(namespace) => Self::create(Box::new(move || {
                let (mut conn, target) = (client.clone(), namespace.clone());
                Box::pin(async move { conn.watch_all_services(target).await })
            })),
            HistoryType::Meta(_) | HistoryType::Metas(_) => panic!("wrong type"),
        }
    }
}