use super::{EtcdConnection, EtcdEntry};
use crate::burst;
use crate::nodes::{FutStream, RunParams, StreamOperators};
use crate::types::*;
use etcd_client::{Client, Compare, CompareOp, PutOptions, Txn, TxnOp};
use futures::StreamExt;
use std::pin::Pin;
use std::rc::Rc;
use std::time::Duration;
#[must_use]
pub fn etcd_pub(
connection: EtcdConnection,
upstream: &Rc<dyn Stream<Burst<EtcdEntry>>>,
lease_ttl: Option<Duration>,
force: bool,
) -> Rc<dyn Node> {
upstream.consume_async(Box::new(
move |_ctx: RunParams, source: Pin<Box<dyn FutStream<Burst<EtcdEntry>>>>| async move {
let mut client = Client::connect(&connection.endpoints, None)
.await
.map_err(|e| anyhow::anyhow!("etcd connect failed: {e}"))?;
let (lease_id, keepalive_handle) = match lease_ttl {
None => (None, None),
Some(ttl) => {
let ttl_secs = ttl.as_secs().max(1) as i64;
let lease_resp = client
.lease_grant(ttl_secs, None)
.await
.map_err(|e| anyhow::anyhow!("etcd lease_grant failed: {e}"))?;
let id = lease_resp.id();
let (mut keeper, mut ka_stream) = client
.lease_keep_alive(id)
.await
.map_err(|e| anyhow::anyhow!("etcd lease_keep_alive failed: {e}"))?;
let renew_interval = (ttl / 3).max(Duration::from_secs(1));
let handle = tokio::spawn(async move {
loop {
tokio::time::sleep(renew_interval).await;
if keeper.keep_alive().await.is_err() {
break;
}
match ka_stream.message().await {
Ok(Some(_)) => {}
_ => break,
}
}
});
(Some(id), Some(handle))
}
};
let mut source = source;
while let Some((_time, burst)) = source.next().await {
for entry in burst {
let opts = lease_id.map(|id| PutOptions::new().with_lease(id));
if force {
client
.put(entry.key, entry.value, opts)
.await
.map_err(|e| anyhow::anyhow!("etcd put failed: {e}"))?;
} else {
let key_absent = vec![Compare::create_revision(
entry.key.as_bytes(),
CompareOp::Equal,
0,
)];
let put_op = vec![TxnOp::put(
entry.key.as_bytes(),
entry.value.as_slice(),
opts,
)];
let txn = Txn::new().when(key_absent).and_then(put_op);
let resp = client
.txn(txn)
.await
.map_err(|e| anyhow::anyhow!("etcd txn failed: {e}"))?;
if !resp.succeeded() {
return Err(anyhow::anyhow!(
"etcd conditional write failed: key already exists (use force=true to overwrite): {}",
entry.key
));
}
}
}
}
if let Some(handle) = keepalive_handle {
handle.abort();
let _ = handle.await; }
if let Some(id) = lease_id {
let _ = client.lease_revoke(id).await;
}
Ok(())
},
))
}
/// Method-style access to [`etcd_pub`] for streams of etcd entries.
///
/// Implemented in this file for `dyn Stream<Burst<EtcdEntry>>` and
/// `dyn Stream<EtcdEntry>`, so a publisher can be attached with
/// `stream.etcd_pub(conn, lease_ttl, force)`.
pub trait EtcdPubOperators {
/// Attaches an etcd publisher to this stream and returns the resulting node.
///
/// `conn`, `lease_ttl` and `force` are forwarded unchanged to the free
/// function [`etcd_pub`].
#[must_use]
fn etcd_pub(
self: &Rc<Self>,
conn: EtcdConnection,
lease_ttl: Option<Duration>,
force: bool,
) -> Rc<dyn Node>;
}
impl EtcdPubOperators for dyn Stream<Burst<EtcdEntry>> {
    /// Publishes a stream that already carries bursts of entries.
    ///
    /// This is a direct delegation to the free function [`etcd_pub`]; the
    /// stream itself is passed as the upstream.
    fn etcd_pub(
        self: &Rc<Self>,
        conn: EtcdConnection,
        lease_ttl: Option<Duration>,
        force: bool,
    ) -> Rc<dyn Node> {
        let upstream = self;
        etcd_pub(conn, upstream, lease_ttl, force)
    }
}
impl EtcdPubOperators for dyn Stream<EtcdEntry> {
fn etcd_pub(
self: &Rc<Self>,
conn: EtcdConnection,
lease_ttl: Option<Duration>,
force: bool,
) -> Rc<dyn Node> {
let burst_stream = self.map(|entry| burst![entry]);
etcd_pub(conn, &burst_stream, lease_ttl, force)
}
}