dynamo_runtime/transports/
etcd.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::runtime::Runtime;
5use anyhow::{Context, Result};
6
7use async_nats::jetstream::kv;
8use derive_builder::Builder;
9use derive_getters::Dissolve;
10use futures::StreamExt;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::{RwLock, mpsc};
14use validator::Validate;
15
16use etcd_client::{
17    Certificate, Compare, CompareOp, DeleteOptions, GetOptions, Identity, LockClient, LockOptions,
18    LockResponse, PutOptions, PutResponse, TlsOptions, Txn, TxnOp, TxnOpResponse, WatchOptions,
19    WatchStream, Watcher,
20};
21pub use etcd_client::{ConnectOptions, KeyValue, LeaseClient};
22use tokio::time::{Duration, interval};
23use tokio_util::sync::CancellationToken;
24
25mod connector;
26mod lease;
27mod lock;
28mod path;
29
30use connector::Connector;
31use lease::*;
32pub use lock::*;
33pub use path::*;
34
35use super::utils::build_in_runtime;
36
/// ETCD Client
///
/// Cloneable handle to an etcd cluster: a shared connection manager plus the
/// primary lease created at startup (see [`Client::new`]).
#[derive(Clone)]
pub struct Client {
    // Shared connection manager; `get_client()` hands out etcd_client handles.
    connector: Arc<Connector>,
    // Lease created at startup; 0 when `ClientOptions::attach_lease` was false.
    primary_lease: u64,
    runtime: Runtime,
    // Exclusive runtime for etcd lease keep-alive and watch tasks
    // Avoid those tasks from being starved when the main runtime is busy
    // WARNING: Do not await on main runtime from this runtime or deadlocks may occur
    rt: Arc<tokio::runtime::Runtime>,
}
48
49impl std::fmt::Debug for Client {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        write!(f, "etcd::Client primary_lease={}", self.primary_lease)
52    }
53}
54
55impl Client {
    /// Returns a builder for [`ClientOptions`], the configuration consumed by
    /// [`Client::new`].
    pub fn builder() -> ClientOptionsBuilder {
        ClientOptionsBuilder::default()
    }
59
    /// Create a new discovery client
    ///
    /// This will establish a connection to the etcd server, create a primary lease,
    /// and spawn a task to keep the lease alive and tie the lifetime of the [`Runtime`]
    /// to the lease.
    ///
    /// If the lease expires, the [`Runtime`] will be shutdown.
    /// If the [`Runtime`] is shutdown, the lease will be revoked.
    pub async fn new(config: ClientOptions, runtime: Runtime) -> Result<Self> {
        // Cancellation token from the runtime; passed to the lease task so the
        // lease and the runtime lifetimes are coupled.
        let token = runtime.primary_token();

        // Connect (and optionally create the lease) on a dedicated runtime so
        // lease keep-alive is not starved by load on the main runtime.
        let ((connector, lease_id), rt) = build_in_runtime(
            async move {
                let etcd_urls = config.etcd_url.clone();
                let connect_options = config.etcd_connect_options.clone();

                // Create the connector
                let connector = Connector::new(etcd_urls, connect_options)
                    .await
                    .with_context(|| {
                        format!(
                            "Unable to connect to etcd server at {}. Check etcd server status",
                            config.etcd_url.join(", ")
                        )
                    })?;

                // Create the lease (TTL arg = 10; presumably seconds — see
                // `create_lease`). 0 is the "no lease" sentinel.
                let lease_id = if config.attach_lease {
                    create_lease(connector.clone(), 10, token)
                        .await
                        .with_context(|| {
                            format!(
                                "Unable to create lease. Check etcd server status at {}",
                                config.etcd_url.join(", ")
                            )
                        })?
                } else {
                    0
                };

                Ok((connector, lease_id))
            },
            1,
        )
        .await?;

        Ok(Client {
            connector,
            primary_lease: lease_id,
            rt,
            runtime,
        })
    }
112
    /// Get a clone of the underlying [`etcd_client::Client`] instance.
    /// This returns a clone since the client is behind an RwLock.
    ///
    /// Use this for direct access to etcd APIs not wrapped by this type.
    pub fn etcd_client(&self) -> etcd_client::Client {
        self.connector.get_client()
    }
118
    /// Get the primary lease ID.
    ///
    /// Returns 0 when the client was built with `attach_lease == false`
    /// (no lease was created).
    pub fn lease_id(&self) -> u64 {
        self.primary_lease
    }
123
124    pub async fn kv_create(&self, key: &str, value: Vec<u8>, lease_id: Option<u64>) -> Result<()> {
125        let id = lease_id.unwrap_or(self.lease_id());
126        let put_options = PutOptions::new().with_lease(id as i64);
127
128        // Build the transaction
129        let txn = Txn::new()
130            .when(vec![Compare::version(key, CompareOp::Equal, 0)]) // Ensure the lock does not exist
131            .and_then(vec![
132                TxnOp::put(key, value, Some(put_options)), // Create the object
133            ]);
134
135        // Execute the transaction
136        let result = self.connector.get_client().kv_client().txn(txn).await?;
137
138        if result.succeeded() {
139            Ok(())
140        } else {
141            for resp in result.op_responses() {
142                tracing::warn!(response = ?resp, "kv_create etcd op response");
143            }
144            anyhow::bail!("Unable to create key. Check etcd server status")
145        }
146    }
147
148    /// Atomically create a key if it does not exist, or validate the values are identical if the key exists.
149    pub async fn kv_create_or_validate(
150        &self,
151        key: String,
152        value: Vec<u8>,
153        lease_id: Option<u64>,
154    ) -> Result<()> {
155        let id = lease_id.unwrap_or(self.lease_id());
156        let put_options = PutOptions::new().with_lease(id as i64);
157
158        // Build the transaction that either creates the key if it doesn't exist,
159        // or validates the existing value matches what we expect
160        let txn = Txn::new()
161            .when(vec![Compare::version(key.as_str(), CompareOp::Equal, 0)]) // Key doesn't exist
162            .and_then(vec![
163                TxnOp::put(key.as_str(), value.clone(), Some(put_options)), // Create it
164            ])
165            .or_else(vec![
166                // If key exists but values don't match, this will fail the transaction
167                TxnOp::txn(Txn::new().when(vec![Compare::value(
168                    key.as_str(),
169                    CompareOp::Equal,
170                    value.clone(),
171                )])),
172            ]);
173
174        // Execute the transaction
175        let result = self.connector.get_client().kv_client().txn(txn).await?;
176
177        // We have to enumerate the response paths to determine if the transaction succeeded
178        if result.succeeded() {
179            Ok(())
180        } else {
181            match result.op_responses().first() {
182                Some(response) => match response {
183                    TxnOpResponse::Txn(response) => match response.succeeded() {
184                        true => Ok(()),
185                        false => anyhow::bail!(
186                            "Unable to create or validate key. Check etcd server status"
187                        ),
188                    },
189                    _ => {
190                        anyhow::bail!("Unable to validate key operation. Check etcd server status")
191                    }
192                },
193                None => anyhow::bail!("Unable to create or validate key. Check etcd server status"),
194            }
195        }
196    }
197
198    pub async fn kv_put(
199        &self,
200        key: impl AsRef<str>,
201        value: impl AsRef<[u8]>,
202        lease_id: Option<u64>,
203    ) -> Result<()> {
204        let id = lease_id.unwrap_or(self.lease_id());
205        let put_options = PutOptions::new().with_lease(id as i64);
206        let _ = self
207            .connector
208            .get_client()
209            .kv_client()
210            .put(key.as_ref(), value.as_ref(), Some(put_options))
211            .await?;
212        Ok(())
213    }
214
215    pub async fn kv_put_with_options(
216        &self,
217        key: impl AsRef<str>,
218        value: impl AsRef<[u8]>,
219        options: Option<PutOptions>,
220    ) -> Result<PutResponse> {
221        let options = options
222            .unwrap_or_default()
223            .with_lease(self.lease_id() as i64);
224        self.connector
225            .get_client()
226            .kv_client()
227            .put(key.as_ref(), value.as_ref(), Some(options))
228            .await
229            .map_err(|err| err.into())
230    }
231
232    pub async fn kv_get(
233        &self,
234        key: impl Into<Vec<u8>>,
235        options: Option<GetOptions>,
236    ) -> Result<Vec<KeyValue>> {
237        let mut get_response = self
238            .connector
239            .get_client()
240            .kv_client()
241            .get(key, options)
242            .await?;
243        Ok(get_response.take_kvs())
244    }
245
246    pub async fn kv_delete(
247        &self,
248        key: impl Into<Vec<u8>>,
249        options: Option<DeleteOptions>,
250    ) -> Result<u64> {
251        self.connector
252            .get_client()
253            .kv_client()
254            .delete(key, options)
255            .await
256            .map(|del_response| del_response.deleted() as u64)
257            .map_err(|err| err.into())
258    }
259
260    pub async fn kv_get_prefix(&self, prefix: impl AsRef<str>) -> Result<Vec<KeyValue>> {
261        let mut get_response = self
262            .connector
263            .get_client()
264            .kv_client()
265            .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
266            .await?;
267
268        Ok(get_response.take_kvs())
269    }
270
271    /// Acquire a distributed lock using etcd's native lock mechanism
272    /// Returns a LockResponse that can be used to unlock later
273    pub async fn lock(
274        &self,
275        key: impl Into<Vec<u8>>,
276        lease_id: Option<u64>,
277    ) -> Result<LockResponse> {
278        let mut lock_client = self.connector.get_client().lock_client();
279        let id = lease_id.unwrap_or(self.lease_id());
280        let options = LockOptions::new().with_lease(id as i64);
281        lock_client
282            .lock(key, Some(options))
283            .await
284            .map_err(|err| err.into())
285    }
286
287    /// Release a distributed lock using the key from the LockResponse
288    pub async fn unlock(&self, lock_key: impl Into<Vec<u8>>) -> Result<()> {
289        let mut lock_client = self.connector.get_client().lock_client();
290        lock_client
291            .unlock(lock_key)
292            .await
293            .map_err(|err: etcd_client::Error| anyhow::anyhow!(err))?;
294        Ok(())
295    }
296
    /// Like kv_get_and_watch_prefix but only for new changes, does not include existing values.
    ///
    /// Returns a [`PrefixWatcher`] whose channel receives subsequent
    /// Put/Delete events under `prefix`.
    pub async fn kv_watch_prefix(
        &self,
        prefix: impl AsRef<str> + std::fmt::Display,
    ) -> Result<PrefixWatcher> {
        self.watch_internal(prefix, false).await
    }
304
    /// Watch `prefix`, first emitting a [`WatchEvent::Put`] for every existing
    /// key under the prefix, then streaming subsequent changes.
    pub async fn kv_get_and_watch_prefix(
        &self,
        prefix: impl AsRef<str> + std::fmt::Display,
    ) -> Result<PrefixWatcher> {
        self.watch_internal(prefix, true).await
    }
311
    /// Core watch implementation that sets up a resilient watcher for a key prefix.
    ///
    /// Creates a background task that maintains a watch stream with automatic reconnection
    /// on recoverable errors. If `include_existing` is true, existing keys are included
    /// in the initial watch events.
    async fn watch_internal(
        &self,
        prefix: impl AsRef<str> + std::fmt::Display,
        include_existing: bool,
    ) -> Result<PrefixWatcher> {
        // Bounded channel: a slow consumer applies backpressure to the watcher task.
        let (tx, rx) = mpsc::channel(32);

        // Get start revision and send existing KVs
        let mut start_revision = self
            .get_start_revision(
                prefix.as_ref(),
                if include_existing { Some(&tx) } else { None },
            )
            .await?;

        // Resilience watch stream in background
        // NOTE: spawned on the dedicated etcd runtime (`self.rt`) so watch
        // processing is not starved by load on the main runtime.
        let connector = self.connector.clone();
        let prefix_str = prefix.as_ref().to_string();
        self.rt.spawn(async move {
            let mut reconnect = true;
            while reconnect {
                // Start a new watch stream
                let watch_stream =
                    match Self::new_watch_stream(&connector, &prefix_str, start_revision).await {
                        Ok(stream) => stream,
                        // Permanent failure (reconnect timed out). Returning
                        // drops `tx`, which closes the receiver side.
                        Err(_) => return,
                    };

                // Watch the stream; `true` means a recoverable error occurred
                // and a fresh stream should be established from `start_revision`.
                reconnect =
                    Self::monitor_watch_stream(watch_stream, &prefix_str, &mut start_revision, &tx)
                        .await;
            }
        });

        Ok(PrefixWatcher {
            prefix: prefix.as_ref().to_string(),
            rx,
        })
    }
357
358    /// Fetch the initial revision for watching and optionally send existing key-values.
359    ///
360    /// Returns the next revision to watch from. If `existing_kvs_tx` is provided,
361    /// all existing keys with the prefix are sent through the channel first.
362    async fn get_start_revision(
363        &self,
364        prefix: impl AsRef<str> + std::fmt::Display,
365        existing_kvs_tx: Option<&mpsc::Sender<WatchEvent>>,
366    ) -> Result<i64> {
367        let mut kv_client = self.connector.get_client().kv_client();
368        let mut get_response = kv_client
369            .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
370            .await?;
371
372        // Get the start revision
373        let mut start_revision = get_response
374            .header()
375            .ok_or(anyhow::anyhow!("missing header; unable to get revision"))?
376            .revision();
377        tracing::trace!("{prefix}: start_revision: {start_revision}");
378        start_revision += 1;
379
380        // Send existing KVs from response if requested
381        if let Some(tx) = existing_kvs_tx {
382            let kvs = get_response.take_kvs();
383            tracing::trace!("initial kv count: {:?}", kvs.len());
384            for kv in kvs.into_iter() {
385                tx.send(WatchEvent::Put(kv)).await?;
386            }
387        }
388
389        Ok(start_revision)
390    }
391
    /// Establish a new watch stream with automatic retry and reconnection.
    ///
    /// Attempts to create a watch stream, reconnecting to ETCD if necessary.
    /// Uses a 10-second timeout for reconnection attempts before giving up.
    async fn new_watch_stream(
        connector: &Arc<Connector>,
        prefix: &String,
        start_revision: i64,
    ) -> Result<WatchStream> {
        loop {
            match connector
                .get_client()
                .watch_client()
                .watch(
                    prefix.as_str(),
                    Some(
                        WatchOptions::new()
                            .with_prefix()
                            // Resume from the caller's revision so no events
                            // are missed across reconnects.
                            .with_start_revision(start_revision)
                            .with_prev_key(),
                    ),
                )
                .await
            {
                Ok((_, watch_stream)) => {
                    tracing::debug!("Watch stream established for prefix '{}'", prefix);
                    return Ok(watch_stream);
                }
                Err(err) => {
                    tracing::debug!(error = %err, "Failed to establish watch stream for prefix '{}'", prefix);
                    // Try to re-establish the connection itself; give up (and
                    // propagate the error) if that fails within the deadline.
                    let deadline = std::time::Instant::now() + Duration::from_secs(10);
                    if let Err(err) = connector.reconnect(deadline).await {
                        tracing::error!(
                            "Failed to reconnect to ETCD within 10 secs for watching prefix '{}': {}",
                            prefix,
                            err
                        );
                        return Err(err);
                    }
                    // continue - retry establishing the watch stream
                }
            }
        }
    }
436
    /// Monitor a watch stream and forward events to receivers.
    ///
    /// Returns `true` for recoverable errors (network issues, stream closure) that warrant
    /// reconnection attempts. Returns `false` for permanent failures (protocol violations,
    /// channel errors, no receivers) where watching should stop.
    ///
    /// `start_revision` is advanced as responses arrive so a reconnect resumes
    /// exactly where this stream left off.
    async fn monitor_watch_stream(
        mut watch_stream: WatchStream,
        prefix: &String,
        start_revision: &mut i64,
        tx: &mpsc::Sender<WatchEvent>,
    ) -> bool {
        loop {
            tokio::select! {
                maybe_resp = watch_stream.next() => {
                    // Handle the watch response
                    let response = match maybe_resp {
                        Some(Ok(res)) => res,
                        Some(Err(err)) => {
                            tracing::warn!(error = %err, "Error watching stream for prefix '{}'", prefix);
                            return true; // Exit to reconnect
                        }
                        None => {
                            tracing::warn!("Watch stream unexpectedly closed for prefix '{}'", prefix);
                            return true; // Exit to reconnect
                        }
                    };

                    // Update revision for reconnect
                    *start_revision = match response.header() {
                        Some(header) => header.revision() + 1,
                        None => {
                            tracing::error!("Missing header in watch response for prefix '{}'", prefix);
                            return false;
                        }
                    };

                    // Process events
                    // A send failure means all receivers are gone: stop for good.
                    if Self::process_watch_events(response.events(), tx).await.is_err() {
                        return false;
                    };
                }
                // Stop as soon as the last receiver disappears.
                _ = tx.closed() => {
                    tracing::debug!("no more receivers, stopping watcher");
                    return false;
                }
            }
        }
    }
485
486    /// Process etcd events and forward them as Put/Delete watch events.
487    ///
488    /// Filters out events without key-values and transforms etcd events into
489    /// appropriate WatchEvent types for channel transmission.
490    async fn process_watch_events(
491        events: &[etcd_client::Event],
492        tx: &mpsc::Sender<WatchEvent>,
493    ) -> Result<()> {
494        for event in events {
495            // Extract the KeyValue if it exists
496            let Some(kv) = event.kv() else {
497                continue; // Skip events with no KV
498            };
499
500            // Handle based on event type
501            match event.event_type() {
502                etcd_client::EventType::Put => {
503                    if let Err(err) = tx.send(WatchEvent::Put(kv.clone())).await {
504                        tracing::error!("kv watcher error forwarding WatchEvent::Put: {err}");
505                        return Err(err.into());
506                    }
507                }
508                etcd_client::EventType::Delete => {
509                    if tx.send(WatchEvent::Delete(kv.clone())).await.is_err() {
510                        return Err(anyhow::anyhow!("failed to send WatchEvent::Delete"));
511                    }
512                }
513            }
514        }
515        Ok(())
516    }
517}
518
/// Handle to an active prefix watch: the watched prefix plus the receiving end
/// of the event channel. `Dissolve` allows splitting it into its parts.
#[derive(Dissolve)]
pub struct PrefixWatcher {
    // The key prefix this watcher was created for.
    prefix: String,
    // Receives Put/Delete events; closes when the background watcher stops.
    rx: mpsc::Receiver<WatchEvent>,
}
524
/// A single change observed by a prefix watcher.
#[derive(Debug)]
pub enum WatchEvent {
    /// Key created or updated; carries the new key-value pair.
    Put(KeyValue),
    /// Key deleted; carries the key-value reported by the delete event.
    Delete(KeyValue),
}
530
/// ETCD client configuration options
#[derive(Debug, Clone, Builder, Validate)]
pub struct ClientOptions {
    /// One or more etcd endpoint URLs; must be non-empty.
    #[validate(length(min = 1))]
    pub etcd_url: Vec<String>,

    /// Optional auth/TLS connection options (the `Default` impl derives these
    /// from `ETCD_AUTH_*` environment variables).
    #[builder(default)]
    pub etcd_connect_options: Option<ConnectOptions>,

    /// If true, the client will attach a lease to the primary [`CancellationToken`].
    #[builder(default = "true")]
    pub attach_lease: bool,
}
544
545impl Default for ClientOptions {
546    fn default() -> Self {
547        let mut connect_options = None;
548
549        if let (Ok(username), Ok(password)) = (
550            std::env::var("ETCD_AUTH_USERNAME"),
551            std::env::var("ETCD_AUTH_PASSWORD"),
552        ) {
553            // username and password are set
554            connect_options = Some(ConnectOptions::new().with_user(username, password));
555        } else if let (Ok(ca), Ok(cert), Ok(key)) = (
556            std::env::var("ETCD_AUTH_CA"),
557            std::env::var("ETCD_AUTH_CLIENT_CERT"),
558            std::env::var("ETCD_AUTH_CLIENT_KEY"),
559        ) {
560            // TLS is set
561            connect_options = Some(
562                ConnectOptions::new().with_tls(
563                    TlsOptions::new()
564                        .ca_certificate(Certificate::from_pem(ca))
565                        .identity(Identity::from_pem(cert, key)),
566                ),
567            );
568        }
569
570        ClientOptions {
571            etcd_url: default_servers(),
572            etcd_connect_options: connect_options,
573            attach_lease: true,
574        }
575    }
576}
577
/// Resolve the default etcd endpoints.
///
/// Reads the comma-separated `ETCD_ENDPOINTS` environment variable; entries
/// are trimmed and empty entries dropped, so trailing commas and stray spaces
/// are tolerated. Falls back to `http://localhost:2379` when the variable is
/// unset or contains no usable entries.
fn default_servers() -> Vec<String> {
    let fallback = || vec!["http://localhost:2379".to_string()];
    match std::env::var("ETCD_ENDPOINTS") {
        Ok(possible_list_of_urls) => {
            let urls: Vec<String> = possible_list_of_urls
                .split(',')
                .map(str::trim)
                .filter(|s| !s.is_empty())
                .map(str::to_string)
                .collect();
            // An env var like "" or "," previously produced empty endpoint
            // strings; treat that as "not configured".
            if urls.is_empty() { fallback() } else { urls }
        }
        Err(_) => fallback(),
    }
}
587
/// A cache for etcd key-value pairs that watches for changes
///
/// Cache keys are full etcd keys (`prefix + key`); a background task applies
/// watch events so reads are served locally.
pub struct KvCache {
    client: Client,
    /// Key prefix this cache mirrors.
    pub prefix: String,
    // Local mirror of etcd state, updated by the background watcher task.
    cache: Arc<RwLock<HashMap<String, Vec<u8>>>>,
    // Taken (set to None) by `start_watcher` when the background task starts.
    watcher: Option<PrefixWatcher>,
}
595
596impl KvCache {
    /// Create a new KV cache for the given prefix
    ///
    /// Seeds the cache from keys already present under `prefix`, writes any
    /// `initial_values` not yet in etcd, then starts the background watcher.
    pub async fn new(
        client: Client,
        prefix: String,
        initial_values: HashMap<String, Vec<u8>>,
    ) -> Result<Self> {
        let mut cache = HashMap::new();

        // First get all existing keys with this prefix
        let existing_kvs = client.kv_get_prefix(&prefix).await?;
        for kv in existing_kvs {
            // Cache keys are the full etcd keys (prefix included).
            let key = String::from_utf8_lossy(kv.key()).to_string();
            cache.insert(key, kv.value().to_vec());
        }

        // For any keys in initial_values that don't exist in etcd, write them
        // TODO: proper lease handling, this requires the first process that writes to a prefix atomically
        // create a lease and write the lease to etcd. Later processes will attach to the lease and
        // help refresh the lease.
        for (key, value) in initial_values.iter() {
            let full_key = format!("{}{}", prefix, key);
            if let std::collections::hash_map::Entry::Vacant(e) = cache.entry(full_key.clone()) {
                client.kv_put(&full_key, value.clone(), None).await?;
                e.insert(value.clone());
            }
        }

        // Start watching for changes
        // we won't miss events between the initial push and the watcher starting because
        // client.kv_get_and_watch_prefix() will get all kv pairs and put them back again
        let watcher = client.kv_get_and_watch_prefix(&prefix).await?;

        let cache = Arc::new(RwLock::new(cache));
        let mut result = Self {
            client,
            prefix,
            cache,
            watcher: Some(watcher),
        };

        // Start the background watcher task
        result.start_watcher().await?;

        Ok(result)
    }
642
    /// Start the background watcher task
    ///
    /// Takes the stored [`PrefixWatcher`] (no-op if already taken) and spawns a
    /// task that applies Put/Delete events to the local cache until the event
    /// channel closes.
    // NOTE(review): spawns on the ambient tokio runtime, unlike the Client's
    // watch task which runs on its dedicated runtime — confirm intentional.
    async fn start_watcher(&mut self) -> Result<()> {
        if let Some(watcher) = self.watcher.take() {
            let cache = self.cache.clone();
            let prefix = self.prefix.clone();

            tokio::spawn(async move {
                let mut rx = watcher.rx;

                // Loop ends when the watcher side drops the sender.
                while let Some(event) = rx.recv().await {
                    match event {
                        WatchEvent::Put(kv) => {
                            let key = String::from_utf8_lossy(kv.key()).to_string();
                            let value = kv.value().to_vec();

                            tracing::trace!("KvCache update: {} = {:?}", key, value);
                            let mut cache_write = cache.write().await;
                            cache_write.insert(key, value);
                        }
                        WatchEvent::Delete(kv) => {
                            let key = String::from_utf8_lossy(kv.key()).to_string();

                            tracing::trace!("KvCache delete: {}", key);
                            let mut cache_write = cache.write().await;
                            cache_write.remove(&key);
                        }
                    }
                }

                tracing::debug!("KvCache watcher for prefix '{}' stopped", prefix);
            });
        }

        Ok(())
    }
678
679    /// Get a value from the cache
680    pub async fn get(&self, key: &str) -> Option<Vec<u8>> {
681        let full_key = format!("{}{}", self.prefix, key);
682        let cache_read = self.cache.read().await;
683        cache_read.get(&full_key).cloned()
684    }
685
686    /// Get all key-value pairs in the cache
687    pub async fn get_all(&self) -> HashMap<String, Vec<u8>> {
688        let cache_read = self.cache.read().await;
689        cache_read.clone()
690    }
691
692    /// Update a value in both the cache and etcd
693    pub async fn put(&self, key: &str, value: Vec<u8>, lease_id: Option<u64>) -> Result<()> {
694        let full_key = format!("{}{}", self.prefix, key);
695
696        // Update etcd first
697        self.client
698            .kv_put(&full_key, value.clone(), lease_id)
699            .await?;
700
701        // Then update local cache
702        let mut cache_write = self.cache.write().await;
703        cache_write.insert(full_key, value);
704
705        Ok(())
706    }
707
708    /// Delete a key from both the cache and etcd
709    pub async fn delete(&self, key: &str) -> Result<()> {
710        let full_key = format!("{}{}", self.prefix, key);
711
712        // Delete from etcd first
713        self.client.kv_delete(full_key.clone(), None).await?;
714
715        // Then remove from local cache
716        let mut cache_write = self.cache.write().await;
717        cache_write.remove(&full_key);
718
719        Ok(())
720    }
721}
722
723#[cfg(feature = "integration")]
724#[cfg(test)]
725mod tests {
726    use crate::{DistributedRuntime, distributed::DistributedConfig};
727
728    use super::*;
729
730    #[test]
731    fn test_ectd_client() {
732        let rt = Runtime::from_settings().unwrap();
733        let rt_clone = rt.clone();
734        let config = DistributedConfig::from_settings(false);
735
736        rt_clone.primary().block_on(async move {
737            let drt = DistributedRuntime::new(rt, config).await.unwrap();
738            test_kv_create_or_validate(drt).await.unwrap();
739        });
740    }
741
742    async fn test_kv_create_or_validate(drt: DistributedRuntime) -> Result<()> {
743        let key = "__integration_test_key";
744        let value = b"test_value";
745
746        let client = drt.etcd_client().expect("etcd client should be available");
747        let lease_id = drt.connection_id();
748
749        // Create the key
750        let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
751        assert!(result.is_ok(), "");
752
753        // Try to create the key again - this should fail
754        let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
755        assert!(result.is_err());
756
757        // Create or validate should succeed as the values match
758        let result = client
759            .kv_create_or_validate(key.to_string(), value.to_vec(), Some(lease_id))
760            .await;
761        assert!(result.is_ok());
762
763        // Try to create the key with a different value
764        let different_value = b"different_value";
765        let result = client
766            .kv_create_or_validate(key.to_string(), different_value.to_vec(), Some(lease_id))
767            .await;
768        assert!(result.is_err(), "");
769
770        Ok(())
771    }
772
773    #[test]
774    fn test_kv_cache() {
775        let rt = Runtime::from_settings().unwrap();
776        let rt_clone = rt.clone();
777        let config = DistributedConfig::from_settings(false);
778
779        rt_clone.primary().block_on(async move {
780            let drt = DistributedRuntime::new(rt, config).await.unwrap();
781            test_kv_cache_operations(drt).await.unwrap();
782        });
783    }
784
    /// End-to-end KvCache scenario: seed, read, put, update, observe an
    /// external write, then clean up the test prefix.
    ///
    /// NOTE: relies on 100ms sleeps for watch propagation; may be flaky under
    /// heavy load.
    async fn test_kv_cache_operations(drt: DistributedRuntime) -> Result<()> {
        // Get the client and unwrap it
        let client = drt.etcd_client().expect("etcd client should be available");

        // Create a unique test prefix to avoid conflicts with other tests
        let test_id = uuid::Uuid::new_v4().to_string();
        let prefix = format!("v1/test_kv_cache_{}/", test_id);

        // Initial values
        let mut initial_values = HashMap::new();
        initial_values.insert("key1".to_string(), b"value1".to_vec());
        initial_values.insert("key2".to_string(), b"value2".to_vec());

        // Create the KV cache
        let kv_cache = KvCache::new(client.clone(), prefix.clone(), initial_values).await?;

        // Test get
        let value1 = kv_cache.get("key1").await;
        assert_eq!(value1, Some(b"value1".to_vec()));

        let value2 = kv_cache.get("key2").await;
        assert_eq!(value2, Some(b"value2".to_vec()));

        // Test get_all (keys come back fully prefixed)
        let all_values = kv_cache.get_all().await;
        assert_eq!(all_values.len(), 2);
        assert_eq!(
            all_values.get(&format!("{}key1", prefix)),
            Some(&b"value1".to_vec())
        );
        assert_eq!(
            all_values.get(&format!("{}key2", prefix)),
            Some(&b"value2".to_vec())
        );

        // Test put - using None for lease_id
        kv_cache.put("key3", b"value3".to_vec(), None).await?;

        // Allow some time for the update to propagate
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Verify the new value
        let value3 = kv_cache.get("key3").await;
        assert_eq!(value3, Some(b"value3".to_vec()));

        // Test update
        kv_cache
            .put("key1", b"updated_value1".to_vec(), None)
            .await?;

        // Allow some time for the update to propagate
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Verify the updated value
        let updated_value1 = kv_cache.get("key1").await;
        assert_eq!(updated_value1, Some(b"updated_value1".to_vec()));

        // Test external update (simulating another client updating a value)
        client
            .kv_put(
                &format!("{}key2", prefix),
                b"external_update".to_vec(),
                None,
            )
            .await?;

        // Allow some time for the update to propagate
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Verify the cache was updated via the watch stream
        let external_update = kv_cache.get("key2").await;
        assert_eq!(external_update, Some(b"external_update".to_vec()));

        // Clean up - delete the test keys
        let etcd_client = client.etcd_client();
        let _ = etcd_client
            .kv_client()
            .delete(
                prefix,
                Some(etcd_client::DeleteOptions::new().with_prefix()),
            )
            .await?;

        Ok(())
    }
870}