use crate::runtime::Runtime;
use anyhow::{Context, Result};
use async_nats::jetstream::kv;
use derive_builder::Builder;
use derive_getters::Dissolve;
use futures::StreamExt;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{RwLock, mpsc};
use validator::Validate;
use etcd_client::{
Certificate, Compare, CompareOp, DeleteOptions, GetOptions, Identity, LockClient, LockOptions,
LockResponse, PutOptions, PutResponse, TlsOptions, Txn, TxnOp, TxnOpResponse, WatchOptions,
WatchStream, Watcher,
};
pub use etcd_client::{ConnectOptions, KeyValue, LeaseClient};
use tokio::time::{Duration, interval};
use tokio_util::sync::CancellationToken;
mod connector;
mod lease;
mod lock;
mod path;
use connector::Connector;
use lease::*;
pub use lock::*;
pub use path::*;
use super::utils::build_in_runtime;
#[derive(Clone)]
pub struct Client {
connector: Arc<Connector>,
primary_lease: u64,
runtime: Runtime,
rt: Arc<tokio::runtime::Runtime>,
}
impl std::fmt::Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "etcd::Client primary_lease={}", self.primary_lease)
}
}
impl Client {
pub fn builder() -> ClientOptionsBuilder {
ClientOptionsBuilder::default()
}
pub async fn new(config: ClientOptions, runtime: Runtime) -> Result<Self> {
let token = runtime.primary_token();
let ((connector, lease_id), rt) = build_in_runtime(
async move {
let etcd_urls = config.etcd_url.clone();
let connect_options = config.etcd_connect_options.clone();
let connector = Connector::new(etcd_urls, connect_options)
.await
.with_context(|| {
format!(
"Unable to connect to etcd server at {}. Check etcd server status",
config.etcd_url.join(", ")
)
})?;
let lease_id = if config.attach_lease {
create_lease(connector.clone(), 10, token)
.await
.with_context(|| {
format!(
"Unable to create lease. Check etcd server status at {}",
config.etcd_url.join(", ")
)
})?
} else {
0
};
Ok((connector, lease_id))
},
1,
)
.await?;
Ok(Client {
connector,
primary_lease: lease_id,
rt,
runtime,
})
}
pub fn etcd_client(&self) -> etcd_client::Client {
self.connector.get_client()
}
pub fn lease_id(&self) -> u64 {
self.primary_lease
}
pub async fn kv_create(&self, key: &str, value: Vec<u8>, lease_id: Option<u64>) -> Result<()> {
let id = lease_id.unwrap_or(self.lease_id());
let put_options = PutOptions::new().with_lease(id as i64);
let txn = Txn::new()
.when(vec![Compare::version(key, CompareOp::Equal, 0)]) .and_then(vec![
TxnOp::put(key, value, Some(put_options)), ]);
let result = self.connector.get_client().kv_client().txn(txn).await?;
if result.succeeded() {
Ok(())
} else {
for resp in result.op_responses() {
tracing::warn!(response = ?resp, "kv_create etcd op response");
}
anyhow::bail!("Unable to create key. Check etcd server status")
}
}
pub async fn kv_create_or_validate(
&self,
key: String,
value: Vec<u8>,
lease_id: Option<u64>,
) -> Result<()> {
let id = lease_id.unwrap_or(self.lease_id());
let put_options = PutOptions::new().with_lease(id as i64);
let txn = Txn::new()
.when(vec![Compare::version(key.as_str(), CompareOp::Equal, 0)]) .and_then(vec![
TxnOp::put(key.as_str(), value.clone(), Some(put_options)), ])
.or_else(vec![
TxnOp::txn(Txn::new().when(vec![Compare::value(
key.as_str(),
CompareOp::Equal,
value.clone(),
)])),
]);
let result = self.connector.get_client().kv_client().txn(txn).await?;
if result.succeeded() {
Ok(())
} else {
match result.op_responses().first() {
Some(response) => match response {
TxnOpResponse::Txn(response) => match response.succeeded() {
true => Ok(()),
false => anyhow::bail!(
"Unable to create or validate key. Check etcd server status"
),
},
_ => {
anyhow::bail!("Unable to validate key operation. Check etcd server status")
}
},
None => anyhow::bail!("Unable to create or validate key. Check etcd server status"),
}
}
}
pub async fn kv_put(
&self,
key: impl AsRef<str>,
value: impl AsRef<[u8]>,
lease_id: Option<u64>,
) -> Result<()> {
let id = lease_id.unwrap_or(self.lease_id());
let put_options = PutOptions::new().with_lease(id as i64);
let _ = self
.connector
.get_client()
.kv_client()
.put(key.as_ref(), value.as_ref(), Some(put_options))
.await?;
Ok(())
}
pub async fn kv_put_with_options(
&self,
key: impl AsRef<str>,
value: impl AsRef<[u8]>,
options: Option<PutOptions>,
) -> Result<PutResponse> {
let options = options
.unwrap_or_default()
.with_lease(self.lease_id() as i64);
self.connector
.get_client()
.kv_client()
.put(key.as_ref(), value.as_ref(), Some(options))
.await
.map_err(|err| err.into())
}
pub async fn kv_get(
&self,
key: impl Into<Vec<u8>>,
options: Option<GetOptions>,
) -> Result<Vec<KeyValue>> {
let mut get_response = self
.connector
.get_client()
.kv_client()
.get(key, options)
.await?;
Ok(get_response.take_kvs())
}
pub async fn kv_delete(
&self,
key: impl Into<Vec<u8>>,
options: Option<DeleteOptions>,
) -> Result<u64> {
self.connector
.get_client()
.kv_client()
.delete(key, options)
.await
.map(|del_response| del_response.deleted() as u64)
.map_err(|err| err.into())
}
pub async fn kv_get_prefix(&self, prefix: impl AsRef<str>) -> Result<Vec<KeyValue>> {
let mut get_response = self
.connector
.get_client()
.kv_client()
.get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
.await?;
Ok(get_response.take_kvs())
}
pub async fn lock(
&self,
key: impl Into<Vec<u8>>,
lease_id: Option<u64>,
) -> Result<LockResponse> {
let mut lock_client = self.connector.get_client().lock_client();
let id = lease_id.unwrap_or(self.lease_id());
let options = LockOptions::new().with_lease(id as i64);
lock_client
.lock(key, Some(options))
.await
.map_err(|err| err.into())
}
pub async fn unlock(&self, lock_key: impl Into<Vec<u8>>) -> Result<()> {
let mut lock_client = self.connector.get_client().lock_client();
lock_client
.unlock(lock_key)
.await
.map_err(|err: etcd_client::Error| anyhow::anyhow!(err))?;
Ok(())
}
pub async fn kv_watch_prefix(
&self,
prefix: impl AsRef<str> + std::fmt::Display,
) -> Result<PrefixWatcher> {
self.watch_internal(prefix, false).await
}
pub async fn kv_get_and_watch_prefix(
&self,
prefix: impl AsRef<str> + std::fmt::Display,
) -> Result<PrefixWatcher> {
self.watch_internal(prefix, true).await
}
async fn watch_internal(
&self,
prefix: impl AsRef<str> + std::fmt::Display,
include_existing: bool,
) -> Result<PrefixWatcher> {
let (tx, rx) = mpsc::channel(32);
let mut start_revision = self
.get_start_revision(
prefix.as_ref(),
if include_existing { Some(&tx) } else { None },
)
.await?;
let connector = self.connector.clone();
let prefix_str = prefix.as_ref().to_string();
self.rt.spawn(async move {
let mut reconnect = true;
while reconnect {
let watch_stream =
match Self::new_watch_stream(&connector, &prefix_str, start_revision).await {
Ok(stream) => stream,
Err(_) => return,
};
reconnect =
Self::monitor_watch_stream(watch_stream, &prefix_str, &mut start_revision, &tx)
.await;
}
});
Ok(PrefixWatcher {
prefix: prefix.as_ref().to_string(),
rx,
})
}
async fn get_start_revision(
&self,
prefix: impl AsRef<str> + std::fmt::Display,
existing_kvs_tx: Option<&mpsc::Sender<WatchEvent>>,
) -> Result<i64> {
let mut kv_client = self.connector.get_client().kv_client();
let mut get_response = kv_client
.get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
.await?;
let mut start_revision = get_response
.header()
.ok_or(anyhow::anyhow!("missing header; unable to get revision"))?
.revision();
tracing::trace!("{prefix}: start_revision: {start_revision}");
start_revision += 1;
if let Some(tx) = existing_kvs_tx {
let kvs = get_response.take_kvs();
tracing::trace!("initial kv count: {:?}", kvs.len());
for kv in kvs.into_iter() {
tx.send(WatchEvent::Put(kv)).await?;
}
}
Ok(start_revision)
}
async fn new_watch_stream(
connector: &Arc<Connector>,
prefix: &String,
start_revision: i64,
) -> Result<WatchStream> {
loop {
match connector
.get_client()
.watch_client()
.watch(
prefix.as_str(),
Some(
WatchOptions::new()
.with_prefix()
.with_start_revision(start_revision)
.with_prev_key(),
),
)
.await
{
Ok((_, watch_stream)) => {
tracing::debug!("Watch stream established for prefix '{}'", prefix);
return Ok(watch_stream);
}
Err(err) => {
tracing::debug!(error = %err, "Failed to establish watch stream for prefix '{}'", prefix);
let deadline = std::time::Instant::now() + Duration::from_secs(10);
if let Err(err) = connector.reconnect(deadline).await {
tracing::error!(
"Failed to reconnect to ETCD within 10 secs for watching prefix '{}': {}",
prefix,
err
);
return Err(err);
}
}
}
}
}
async fn monitor_watch_stream(
mut watch_stream: WatchStream,
prefix: &String,
start_revision: &mut i64,
tx: &mpsc::Sender<WatchEvent>,
) -> bool {
loop {
tokio::select! {
maybe_resp = watch_stream.next() => {
let response = match maybe_resp {
Some(Ok(res)) => res,
Some(Err(err)) => {
tracing::warn!(error = %err, "Error watching stream for prefix '{}'", prefix);
return true; }
None => {
tracing::warn!("Watch stream unexpectedly closed for prefix '{}'", prefix);
return true; }
};
*start_revision = match response.header() {
Some(header) => header.revision() + 1,
None => {
tracing::error!("Missing header in watch response for prefix '{}'", prefix);
return false;
}
};
if Self::process_watch_events(response.events(), tx).await.is_err() {
return false;
};
}
_ = tx.closed() => {
tracing::debug!("no more receivers, stopping watcher");
return false;
}
}
}
}
async fn process_watch_events(
events: &[etcd_client::Event],
tx: &mpsc::Sender<WatchEvent>,
) -> Result<()> {
for event in events {
let Some(kv) = event.kv() else {
continue; };
match event.event_type() {
etcd_client::EventType::Put => {
if let Err(err) = tx.send(WatchEvent::Put(kv.clone())).await {
tracing::error!("kv watcher error forwarding WatchEvent::Put: {err}");
return Err(err.into());
}
}
etcd_client::EventType::Delete => {
if tx.send(WatchEvent::Delete(kv.clone())).await.is_err() {
return Err(anyhow::anyhow!("failed to send WatchEvent::Delete"));
}
}
}
}
Ok(())
}
}
#[derive(Dissolve)]
pub struct PrefixWatcher {
prefix: String,
rx: mpsc::Receiver<WatchEvent>,
}
#[derive(Debug)]
pub enum WatchEvent {
Put(KeyValue),
Delete(KeyValue),
}
#[derive(Debug, Clone, Builder, Validate)]
pub struct ClientOptions {
#[validate(length(min = 1))]
pub etcd_url: Vec<String>,
#[builder(default)]
pub etcd_connect_options: Option<ConnectOptions>,
#[builder(default = "true")]
pub attach_lease: bool,
}
impl Default for ClientOptions {
fn default() -> Self {
let mut connect_options = None;
if let (Ok(username), Ok(password)) = (
std::env::var("ETCD_AUTH_USERNAME"),
std::env::var("ETCD_AUTH_PASSWORD"),
) {
connect_options = Some(ConnectOptions::new().with_user(username, password));
} else if let (Ok(ca), Ok(cert), Ok(key)) = (
std::env::var("ETCD_AUTH_CA"),
std::env::var("ETCD_AUTH_CLIENT_CERT"),
std::env::var("ETCD_AUTH_CLIENT_KEY"),
) {
connect_options = Some(
ConnectOptions::new().with_tls(
TlsOptions::new()
.ca_certificate(Certificate::from_pem(ca))
.identity(Identity::from_pem(cert, key)),
),
);
}
ClientOptions {
etcd_url: default_servers(),
etcd_connect_options: connect_options,
attach_lease: true,
}
}
}
fn default_servers() -> Vec<String> {
match std::env::var("ETCD_ENDPOINTS") {
Ok(possible_list_of_urls) => possible_list_of_urls
.split(',')
.map(|s| s.to_string())
.collect(),
Err(_) => vec!["http://localhost:2379".to_string()],
}
}
pub struct KvCache {
client: Client,
pub prefix: String,
cache: Arc<RwLock<HashMap<String, Vec<u8>>>>,
watcher: Option<PrefixWatcher>,
}
impl KvCache {
pub async fn new(
client: Client,
prefix: String,
initial_values: HashMap<String, Vec<u8>>,
) -> Result<Self> {
let mut cache = HashMap::new();
let existing_kvs = client.kv_get_prefix(&prefix).await?;
for kv in existing_kvs {
let key = String::from_utf8_lossy(kv.key()).to_string();
cache.insert(key, kv.value().to_vec());
}
for (key, value) in initial_values.iter() {
let full_key = format!("{}{}", prefix, key);
if let std::collections::hash_map::Entry::Vacant(e) = cache.entry(full_key.clone()) {
client.kv_put(&full_key, value.clone(), None).await?;
e.insert(value.clone());
}
}
let watcher = client.kv_get_and_watch_prefix(&prefix).await?;
let cache = Arc::new(RwLock::new(cache));
let mut result = Self {
client,
prefix,
cache,
watcher: Some(watcher),
};
result.start_watcher().await?;
Ok(result)
}
async fn start_watcher(&mut self) -> Result<()> {
if let Some(watcher) = self.watcher.take() {
let cache = self.cache.clone();
let prefix = self.prefix.clone();
tokio::spawn(async move {
let mut rx = watcher.rx;
while let Some(event) = rx.recv().await {
match event {
WatchEvent::Put(kv) => {
let key = String::from_utf8_lossy(kv.key()).to_string();
let value = kv.value().to_vec();
tracing::trace!("KvCache update: {} = {:?}", key, value);
let mut cache_write = cache.write().await;
cache_write.insert(key, value);
}
WatchEvent::Delete(kv) => {
let key = String::from_utf8_lossy(kv.key()).to_string();
tracing::trace!("KvCache delete: {}", key);
let mut cache_write = cache.write().await;
cache_write.remove(&key);
}
}
}
tracing::debug!("KvCache watcher for prefix '{}' stopped", prefix);
});
}
Ok(())
}
pub async fn get(&self, key: &str) -> Option<Vec<u8>> {
let full_key = format!("{}{}", self.prefix, key);
let cache_read = self.cache.read().await;
cache_read.get(&full_key).cloned()
}
pub async fn get_all(&self) -> HashMap<String, Vec<u8>> {
let cache_read = self.cache.read().await;
cache_read.clone()
}
pub async fn put(&self, key: &str, value: Vec<u8>, lease_id: Option<u64>) -> Result<()> {
let full_key = format!("{}{}", self.prefix, key);
self.client
.kv_put(&full_key, value.clone(), lease_id)
.await?;
let mut cache_write = self.cache.write().await;
cache_write.insert(full_key, value);
Ok(())
}
pub async fn delete(&self, key: &str) -> Result<()> {
let full_key = format!("{}{}", self.prefix, key);
self.client.kv_delete(full_key.clone(), None).await?;
let mut cache_write = self.cache.write().await;
cache_write.remove(&full_key);
Ok(())
}
}
#[cfg(feature = "integration")]
#[cfg(test)]
mod tests {
use crate::{DistributedRuntime, distributed::DistributedConfig};
use super::*;
#[test]
fn test_ectd_client() {
let rt = Runtime::from_settings().unwrap();
let rt_clone = rt.clone();
let config = DistributedConfig::from_settings(false);
rt_clone.primary().block_on(async move {
let drt = DistributedRuntime::new(rt, config).await.unwrap();
test_kv_create_or_validate(drt).await.unwrap();
});
}
async fn test_kv_create_or_validate(drt: DistributedRuntime) -> Result<()> {
let key = "__integration_test_key";
let value = b"test_value";
let client = drt.etcd_client().expect("etcd client should be available");
let lease_id = drt.connection_id();
let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
assert!(result.is_ok(), "");
let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
assert!(result.is_err());
let result = client
.kv_create_or_validate(key.to_string(), value.to_vec(), Some(lease_id))
.await;
assert!(result.is_ok());
let different_value = b"different_value";
let result = client
.kv_create_or_validate(key.to_string(), different_value.to_vec(), Some(lease_id))
.await;
assert!(result.is_err(), "");
Ok(())
}
#[test]
fn test_kv_cache() {
let rt = Runtime::from_settings().unwrap();
let rt_clone = rt.clone();
let config = DistributedConfig::from_settings(false);
rt_clone.primary().block_on(async move {
let drt = DistributedRuntime::new(rt, config).await.unwrap();
test_kv_cache_operations(drt).await.unwrap();
});
}
async fn test_kv_cache_operations(drt: DistributedRuntime) -> Result<()> {
let client = drt.etcd_client().expect("etcd client should be available");
let test_id = uuid::Uuid::new_v4().to_string();
let prefix = format!("v1/test_kv_cache_{}/", test_id);
let mut initial_values = HashMap::new();
initial_values.insert("key1".to_string(), b"value1".to_vec());
initial_values.insert("key2".to_string(), b"value2".to_vec());
let kv_cache = KvCache::new(client.clone(), prefix.clone(), initial_values).await?;
let value1 = kv_cache.get("key1").await;
assert_eq!(value1, Some(b"value1".to_vec()));
let value2 = kv_cache.get("key2").await;
assert_eq!(value2, Some(b"value2".to_vec()));
let all_values = kv_cache.get_all().await;
assert_eq!(all_values.len(), 2);
assert_eq!(
all_values.get(&format!("{}key1", prefix)),
Some(&b"value1".to_vec())
);
assert_eq!(
all_values.get(&format!("{}key2", prefix)),
Some(&b"value2".to_vec())
);
kv_cache.put("key3", b"value3".to_vec(), None).await?;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let value3 = kv_cache.get("key3").await;
assert_eq!(value3, Some(b"value3".to_vec()));
kv_cache
.put("key1", b"updated_value1".to_vec(), None)
.await?;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let updated_value1 = kv_cache.get("key1").await;
assert_eq!(updated_value1, Some(b"updated_value1".to_vec()));
client
.kv_put(
&format!("{}key2", prefix),
b"external_update".to_vec(),
None,
)
.await?;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let external_update = kv_cache.get("key2").await;
assert_eq!(external_update, Some(b"external_update".to_vec()));
let etcd_client = client.etcd_client();
let _ = etcd_client
.kv_client()
.delete(
prefix,
Some(etcd_client::DeleteOptions::new().with_prefix()),
)
.await?;
Ok(())
}
}