use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwap;
use etcd_client::{Client, EventType};
use osproxy_core::Clock;
use osproxy_observe::{decode_directive_set, DirectiveSet};
use super::{EtcdDirectiveStore, EtcdError};
const RECONNECT_DELAY: Duration = Duration::from_secs(1);
impl EtcdDirectiveStore {
pub async fn connect(
endpoints: &[String],
key: impl Into<String>,
clock: Arc<dyn Clock>,
) -> Result<Self, EtcdError> {
let key = key.into();
let mut client = Client::connect(endpoints, None).await?;
let resp = client.get(key.clone(), None).await?;
let initial = resp
.kvs()
.first()
.and_then(|kv| decode_directive_set(kv.value(), clock.as_ref()).ok())
.unwrap_or_default();
let current = Arc::new(ArcSwap::from_pointee(initial));
let handle = tokio::runtime::Handle::current();
let endpoints = endpoints.to_vec();
let task_current = Arc::clone(¤t);
handle.spawn(watch_loop(endpoints, key, clock, task_current));
Ok(Self::from_snapshot(current))
}
}
async fn watch_loop(
endpoints: Vec<String>,
key: String,
clock: Arc<dyn Clock>,
current: Arc<ArcSwap<DirectiveSet>>,
) {
loop {
let _ = watch_once(&endpoints, &key, clock.as_ref(), ¤t).await;
tokio::time::sleep(RECONNECT_DELAY).await;
}
}
async fn watch_once(
endpoints: &[String],
key: &str,
clock: &dyn Clock,
current: &ArcSwap<DirectiveSet>,
) -> Result<(), etcd_client::Error> {
let mut client = Client::connect(endpoints, None).await?;
let resp = client.get(key, None).await?;
if let Some(kv) = resp.kvs().first() {
super::apply_value(current, kv.value(), clock);
}
let mut stream = client.watch(key, None).await?;
while let Some(resp) = stream.message().await? {
for event in resp.events() {
match event.event_type() {
EventType::Put => {
if let Some(kv) = event.kv() {
super::apply_value(current, kv.value(), clock);
}
}
EventType::Delete => {
current.store(Arc::new(DirectiveSet::new()));
}
}
}
}
Ok(())
}