use super::{EtcdConnection, EtcdEntry, EtcdEvent, EtcdEventKind};
use crate::nodes::{RunParams, produce_async};
use crate::types::*;
use etcd_client::{Client, GetOptions, WatchOptions};
use futures::StreamExt;
use std::rc::Rc;
/// Subscribes to every etcd key under `prefix`, emitting the current contents
/// followed by live changes.
///
/// The returned node connects to `connection.endpoints` and then:
///
/// 1. opens a prefix watch,
/// 2. reads the current key/value pairs under the prefix with a prefix `get`,
/// 3. emits each of those pairs as an [`EtcdEventKind::Put`] event whose
///    `revision` is the store revision reported in the `get` response header
///    (or `0` when the response carries no header),
/// 4. forwards watch events as `Put` / `Delete` events carrying the key's
///    `mod_revision`.
///
/// The watch is opened *before* the snapshot is read, and watch events whose
/// `mod_revision` is not greater than the snapshot revision are dropped, so a
/// change that lands between the two calls is neither lost nor reported twice.
///
/// Keys are decoded with lossy UTF-8 conversion; values are passed through as
/// raw bytes. Every emitted item is stamped with `NanoTime::now()` at the
/// moment it is yielded.
///
/// # Errors
///
/// Connecting, opening the watch, or reading the snapshot can fail before any
/// item is produced. Once streaming, a cancelled watch, a watch transport
/// error, or the watch stream ending each yield a single `Err` item, after
/// which the stream terminates.
#[must_use]
pub fn etcd_sub(
    connection: EtcdConnection,
    prefix: impl Into<String>,
) -> Rc<dyn Stream<Burst<EtcdEvent>>> {
    let prefix = prefix.into();
    produce_async(move |_ctx: RunParams| async move {
        let mut client = Client::connect(&connection.endpoints, None).await?;

        // Start watching first so that nothing written while the snapshot is
        // being read can slip past unobserved.
        let watch_opts = WatchOptions::new().with_prefix();
        let mut watch_stream = client
            .watch(prefix.as_bytes(), Some(watch_opts))
            .await
            .map_err(|e| anyhow::anyhow!("etcd watch failed: {e}"))?;

        let get_opts = GetOptions::new().with_prefix();
        let get_resp = client
            .get(prefix.as_bytes(), Some(get_opts))
            .await
            .map_err(|e| anyhow::anyhow!("etcd get failed: {e}"))?;

        // Revision the snapshot was taken at; also the cut-off used below to
        // discard watch events the snapshot already reflects.
        let snapshot_rev = get_resp.header().map(|h| h.revision()).unwrap_or(0);
        let snapshot: Vec<EtcdEvent> = get_resp
            .kvs()
            .iter()
            .map(|kv| EtcdEvent {
                kind: EtcdEventKind::Put,
                entry: EtcdEntry {
                    key: String::from_utf8_lossy(kv.key()).into_owned(),
                    value: kv.value().to_vec(),
                },
                revision: snapshot_rev,
            })
            .collect();

        Ok(async_stream::stream! {
            for event in snapshot {
                yield Ok((NanoTime::now(), event));
            }
            loop {
                match watch_stream.next().await {
                    Some(Ok(resp)) => {
                        // Cancellation must be tested before `created`: etcd
                        // reports a watch that failed at creation time (for
                        // example, permission denied) with both flags set, and
                        // skipping that response would hide the failure.
                        if resp.canceled() {
                            yield Err(anyhow::anyhow!(
                                "etcd watch cancelled: {}",
                                resp.cancel_reason()
                            ));
                            break;
                        }
                        // A plain creation acknowledgement carries no events.
                        if resp.created() {
                            continue;
                        }
                        for event in resp.events() {
                            let kv = match event.kv() {
                                Some(kv) => kv,
                                None => continue,
                            };
                            let mod_rev = kv.mod_revision();
                            // Already covered by the snapshot.
                            if mod_rev <= snapshot_rev {
                                continue;
                            }
                            let kind = match event.event_type() {
                                etcd_client::EventType::Put => EtcdEventKind::Put,
                                etcd_client::EventType::Delete => EtcdEventKind::Delete,
                            };
                            yield Ok((NanoTime::now(), EtcdEvent {
                                kind,
                                entry: EtcdEntry {
                                    key: String::from_utf8_lossy(kv.key()).into_owned(),
                                    value: kv.value().to_vec(),
                                },
                                revision: mod_rev,
                            }));
                        }
                    }
                    Some(Err(e)) => {
                        yield Err(anyhow::anyhow!("etcd watch error: {e}"));
                        break;
                    }
                    None => {
                        yield Err(anyhow::anyhow!("etcd watch stream closed unexpectedly"));
                        break;
                    }
                }
            }
        })
    })
}