// dynamo_runtime/transports/etcd.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{CancellationToken, ErrorContext, Result, Runtime, error};
5
6use async_nats::jetstream::kv;
7use derive_builder::Builder;
8use derive_getters::Dissolve;
9use futures::StreamExt;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::{RwLock, mpsc};
13use validator::Validate;
14
15use etcd_client::{
16    Certificate, Compare, CompareOp, DeleteOptions, GetOptions, Identity, LockClient, LockOptions,
17    LockResponse, PutOptions, PutResponse, TlsOptions, Txn, TxnOp, TxnOpResponse, WatchOptions,
18    Watcher,
19};
20pub use etcd_client::{ConnectOptions, KeyValue, LeaseClient};
21use tokio::time::{Duration, interval};
22
23mod lease;
24mod lock;
25mod path;
26
27use lease::*;
28pub use lock::*;
29pub use path::*;
30
31use super::utils::build_in_runtime;
32
/// ETCD Client
///
/// Wraps an [`etcd_client::Client`] and ties a primary lease to the
/// [`Runtime`] lifecycle: background tasks (lease keep-alive, watchers)
/// run on a dedicated Tokio runtime held in `rt`.
#[derive(Clone)]
pub struct Client {
    // Underlying etcd client handle.
    client: etcd_client::Client,
    // ID of the primary lease; 0 when no lease was attached at construction
    // (see `ClientOptions::attach_lease`).
    primary_lease: u64,
    // Runtime whose cancellation tokens are tied to the lease lifetime.
    runtime: Runtime,
    // Dedicated Tokio runtime used to spawn etcd background tasks
    // (lease creation/revocation, watch forwarding).
    rt: Arc<tokio::runtime::Runtime>,
}
41
42impl std::fmt::Debug for Client {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        write!(f, "etcd::Client primary_lease={}", self.primary_lease)
45    }
46}
47
/// Handle to an etcd lease paired with the [`CancellationToken`] that
/// controls (and reflects) its lifetime.
#[derive(Debug, Clone)]
pub struct Lease {
    /// ETCD lease ID
    /// Delivered as i64 by etcd because of documented gRPC limitations.
    id: u64,

    /// [`CancellationToken`] associated with the lease
    cancel_token: CancellationToken,
}
57
58impl Lease {
59    /// Get the lease ID
60    pub fn id(&self) -> u64 {
61        self.id
62    }
63
64    /// Get the primary [`CancellationToken`] associated with the lease.
65    /// This token will revoke the lease if canceled.
66    pub fn primary_token(&self) -> CancellationToken {
67        self.cancel_token.clone()
68    }
69
70    /// Get a child [`CancellationToken`] from the lease's [`CancellationToken`].
71    /// This child token will be triggered if the lease is revoked, but will not revoke the lease if canceled.
72    pub fn child_token(&self) -> CancellationToken {
73        self.cancel_token.child_token()
74    }
75
76    /// Revoke the lease triggering the [`CancellationToken`].
77    pub fn revoke(&self) {
78        self.cancel_token.cancel();
79    }
80
81    /// Check if the lease is still valid (not revoked)
82    pub async fn is_valid(&self) -> Result<bool> {
83        // A lease is valid if its cancellation token has not been triggered
84        // We can use try_cancelled which returns immediately with a boolean
85        Ok(!self.cancel_token.is_cancelled())
86    }
87}
88
impl Client {
    /// Builder for [`ClientOptions`] (generated by `derive_builder`).
    pub fn builder() -> ClientOptionsBuilder {
        ClientOptionsBuilder::default()
    }

    /// Create a new discovery client
    ///
    /// This will establish a connection to the etcd server, create a primary lease,
    /// and spawn a task to keep the lease alive and tie the lifetime of the [`Runtime`]
    /// to the lease.
    ///
    /// If the lease expires, the [`Runtime`] will be shutdown.
    /// If the [`Runtime`] is shutdown, the lease will be revoked.
    pub async fn new(config: ClientOptions, runtime: Runtime) -> Result<Self> {
        let token = runtime.primary_token();

        // Connect and create the primary lease inside a dedicated runtime;
        // the second argument is passed straight to `build_in_runtime`
        // (presumably a worker-thread count of 1 — TODO confirm).
        let ((client, lease_id), rt) = build_in_runtime(
            async move {
                let client = etcd_client::Client::connect(
                    config.etcd_url.clone(),
                    config.etcd_connect_options,
                )
                .await
                .with_context(|| {
                    format!(
                        "Unable to connect to etcd server at {}. Check etcd server status",
                        config.etcd_url.join(", ")
                    )
                })?;

                let lease_id = if config.attach_lease {
                    let lease_client = client.lease_client();

                    // 10 is the lease TTL (etcd lease TTLs are expressed in seconds).
                    let lease = create_lease(lease_client, 10, token)
                        .await
                        .with_context(|| {
                            format!(
                                "Unable to create lease. Check etcd server status at {}",
                                config.etcd_url.join(", ")
                            )
                        })?;

                    lease.id
                } else {
                    // Lease id 0 acts as the "no lease" sentinel.
                    0
                };

                Ok((client, lease_id))
            },
            1,
        )
        .await?;

        Ok(Client {
            client,
            primary_lease: lease_id,
            rt,
            runtime,
        })
    }

    /// Get a reference to the underlying [`etcd_client::Client`] instance.
    pub(crate) fn etcd_client(&self) -> &etcd_client::Client {
        &self.client
    }

    /// Get the primary lease ID.
    pub fn lease_id(&self) -> u64 {
        self.primary_lease
    }

    /// Primary [`Lease`]
    pub fn primary_lease(&self) -> Lease {
        Lease {
            id: self.primary_lease,
            cancel_token: self.runtime.primary_token(),
        }
    }

    /// Create a [`Lease`] with a given time-to-live (TTL).
    /// This [`Lease`] will be tied to the [`Runtime`], specifically a child [`CancellationToken`].
    pub async fn create_lease(&self, ttl: u64) -> Result<Lease> {
        let token = self.runtime.child_token();
        let lease_client = self.client.lease_client();
        // Spawn on the dedicated etcd runtime; outer `?` unwraps the JoinError,
        // the returned Result is the lease-creation outcome.
        self.rt
            .spawn(create_lease(lease_client, ttl, token))
            .await?
    }

    /// Revoke an etcd lease given its lease id.
    /// A wrapper over `etcd_client::LeaseClient::revoke`, run on the dedicated runtime.
    pub async fn revoke_lease(&self, lease_id: u64) -> Result<()> {
        let lease_client = self.client.lease_client();
        self.rt.spawn(revoke_lease(lease_client, lease_id)).await?
    }

    /// Atomically create `key` with `value`; fails if the key already exists.
    ///
    /// The key is attached to `lease_id` if given, otherwise to the primary lease.
    pub async fn kv_create(&self, key: &str, value: Vec<u8>, lease_id: Option<u64>) -> Result<()> {
        let id = lease_id.unwrap_or(self.lease_id());
        // etcd's gRPC API takes lease ids as i64; ids here are stored as u64.
        let put_options = PutOptions::new().with_lease(id as i64);

        // Build the transaction
        let txn = Txn::new()
            .when(vec![Compare::version(key, CompareOp::Equal, 0)]) // version 0 == key does not exist
            .and_then(vec![
                TxnOp::put(key, value, Some(put_options)), // Create the object
            ]);

        // Execute the transaction
        let result = self.client.kv_client().txn(txn).await?;

        if result.succeeded() {
            Ok(())
        } else {
            // Log each op response for diagnostics before reporting failure.
            for resp in result.op_responses() {
                tracing::warn!(response = ?resp, "kv_create etcd op response");
            }
            Err(error!("Unable to create key. Check etcd server status"))
        }
    }

    /// Atomically create a key if it does not exist, or validate the values are identical if the key exists.
    pub async fn kv_create_or_validate(
        &self,
        key: String,
        value: Vec<u8>,
        lease_id: Option<u64>,
    ) -> Result<()> {
        let id = lease_id.unwrap_or(self.lease_id());
        let put_options = PutOptions::new().with_lease(id as i64);

        // Build the transaction that either creates the key if it doesn't exist,
        // or validates the existing value matches what we expect
        let txn = Txn::new()
            .when(vec![Compare::version(key.as_str(), CompareOp::Equal, 0)]) // Key doesn't exist
            .and_then(vec![
                TxnOp::put(key.as_str(), value.clone(), Some(put_options)), // Create it
            ])
            .or_else(vec![
                // If key exists but values don't match, this will fail the transaction
                TxnOp::txn(Txn::new().when(vec![Compare::value(
                    key.as_str(),
                    CompareOp::Equal,
                    value.clone(),
                )])),
            ]);

        // Execute the transaction
        let result = self.client.kv_client().txn(txn).await?;

        // We have to enumerate the response paths to determine if the transaction succeeded
        if result.succeeded() {
            // Key did not exist and was created.
            Ok(())
        } else {
            // Key existed: inspect the nested txn to see if the values matched.
            match result.op_responses().first() {
                Some(response) => match response {
                    TxnOpResponse::Txn(response) => match response.succeeded() {
                        true => Ok(()),
                        false => Err(error!(
                            "Unable to create or validate key. Check etcd server status"
                        )),
                    },
                    _ => Err(error!(
                        "Unable to validate key operation. Check etcd server status"
                    )),
                },
                None => Err(error!(
                    "Unable to create or validate key. Check etcd server status"
                )),
            }
        }
    }

    /// Unconditionally put `key` = `value`, attached to `lease_id`
    /// (or to the primary lease when `None`).
    pub async fn kv_put(
        &self,
        key: impl AsRef<str>,
        value: impl AsRef<[u8]>,
        lease_id: Option<u64>,
    ) -> Result<()> {
        let id = lease_id.unwrap_or(self.lease_id());
        let put_options = PutOptions::new().with_lease(id as i64);
        let _ = self
            .client
            .kv_client()
            .put(key.as_ref(), value.as_ref(), Some(put_options))
            .await?;
        Ok(())
    }

    /// Put `key` = `value` with caller-supplied [`PutOptions`].
    ///
    /// NOTE(review): the primary lease is applied on top of `options`, so any
    /// lease the caller set via `options.with_lease(..)` is overwritten here —
    /// confirm this is intended.
    pub async fn kv_put_with_options(
        &self,
        key: impl AsRef<str>,
        value: impl AsRef<[u8]>,
        options: Option<PutOptions>,
    ) -> Result<PutResponse> {
        let options = options
            .unwrap_or_default()
            .with_lease(self.primary_lease().id() as i64);
        self.client
            .kv_client()
            .put(key.as_ref(), value.as_ref(), Some(options))
            .await
            .map_err(|err| err.into())
    }

    /// Get the key-value pairs matching `key` (with optional [`GetOptions`],
    /// e.g. prefix or range queries).
    pub async fn kv_get(
        &self,
        key: impl Into<Vec<u8>>,
        options: Option<GetOptions>,
    ) -> Result<Vec<KeyValue>> {
        let mut get_response = self.client.kv_client().get(key, options).await?;
        Ok(get_response.take_kvs())
    }

    /// Delete `key` (with optional [`DeleteOptions`]); returns the number of
    /// keys deleted.
    pub async fn kv_delete(
        &self,
        key: impl Into<Vec<u8>>,
        options: Option<DeleteOptions>,
    ) -> Result<u64> {
        self.client
            .kv_client()
            .delete(key, options)
            .await
            .map(|del_response| del_response.deleted() as u64)
            .map_err(|err| err.into())
    }

    /// Get all key-value pairs whose key starts with `prefix`.
    pub async fn kv_get_prefix(&self, prefix: impl AsRef<str>) -> Result<Vec<KeyValue>> {
        let mut get_response = self
            .client
            .kv_client()
            .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
            .await?;

        Ok(get_response.take_kvs())
    }

    /// Acquire a distributed lock using etcd's native lock mechanism
    /// Returns a LockResponse that can be used to unlock later
    pub async fn lock(
        &self,
        key: impl Into<Vec<u8>>,
        lease_id: Option<u64>,
    ) -> Result<LockResponse> {
        let mut lock_client = self.client.lock_client();
        // Lock ownership follows the lease: primary lease unless overridden.
        let id = lease_id.unwrap_or(self.lease_id());
        let options = LockOptions::new().with_lease(id as i64);
        lock_client
            .lock(key, Some(options))
            .await
            .map_err(|err| err.into())
    }

    /// Release a distributed lock using the key from the LockResponse
    pub async fn unlock(&self, lock_key: impl Into<Vec<u8>>) -> Result<()> {
        let mut lock_client = self.client.lock_client();
        lock_client
            .unlock(lock_key)
            .await
            .map_err(|err: etcd_client::Error| anyhow::anyhow!(err))?;
        Ok(())
    }

    /// Like kv_get_and_watch_prefix but only for new changes, does not include existing values.
    pub async fn kv_watch_prefix(
        &self,
        prefix: impl AsRef<str> + std::fmt::Display,
    ) -> Result<PrefixWatcher> {
        self.watch_internal(prefix, false).await
    }

    /// Watch `prefix` for changes; existing key-value pairs are first delivered
    /// as [`WatchEvent::Put`] events before live updates.
    pub async fn kv_get_and_watch_prefix(
        &self,
        prefix: impl AsRef<str> + std::fmt::Display,
    ) -> Result<PrefixWatcher> {
        self.watch_internal(prefix, true).await
    }

    /// Shared implementation for the prefix watchers: snapshot the current
    /// state, then watch from the revision just after the snapshot so no
    /// event is missed or duplicated. Events are forwarded to the returned
    /// [`PrefixWatcher`]'s channel by a task on the dedicated runtime.
    async fn watch_internal(
        &self,
        prefix: impl AsRef<str> + std::fmt::Display,
        include_existing: bool,
    ) -> Result<PrefixWatcher> {
        let mut kv_client = self.client.kv_client();
        let mut watch_client = self.client.watch_client();

        // Snapshot: current values plus the revision they were read at.
        let mut get_response = kv_client
            .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
            .await?;

        let start_revision = get_response
            .header()
            .ok_or(error!("missing header; unable to get revision"))?
            .revision();

        tracing::trace!("{prefix}: start_revision: {start_revision}");
        // Watch from the NEXT revision so the snapshot's own writes are not replayed.
        let start_revision = start_revision + 1;

        let (watcher, mut watch_stream) = watch_client
            .watch(
                prefix.as_ref(),
                Some(
                    WatchOptions::new()
                        .with_prefix()
                        .with_start_revision(start_revision)
                        .with_prev_key(),
                ),
            )
            .await?;

        let kvs = if include_existing {
            let kvs = get_response.take_kvs();
            tracing::trace!("initial kv count: {:?}", kvs.len());
            kvs
        } else {
            vec![]
        };

        let (tx, rx) = mpsc::channel(32);

        // Forwarding task: first replays the snapshot (if requested), then
        // relays watch events until the stream closes or all receivers drop.
        self.rt.spawn(async move {
            if include_existing {
                for kv in kvs {
                    if tx.send(WatchEvent::Put(kv)).await.is_err() {
                        // receiver is already closed
                        return;
                    }
                }
            }

            loop {
                tokio::select! {
                    maybe_resp = watch_stream.next() => {
                        // Early return for None or Err cases
                        let Some(Ok(response)) = maybe_resp else {
                            tracing::info!("kv watch stream closed");
                            return;
                        };

                        // Process events
                        for event in response.events() {
                            // Extract the KeyValue if it exists
                            let Some(kv) = event.kv() else {
                                continue; // Skip events with no KV
                            };

                            // Handle based on event type
                            match event.event_type() {
                                etcd_client::EventType::Put => {
                                    if let Err(err) = tx.send(WatchEvent::Put(kv.clone())).await {
                                        tracing::error!("kv watcher error forwarding WatchEvent::Put: {err}");
                                        return;
                                    }
                                }
                                etcd_client::EventType::Delete => {
                                    if tx.send(WatchEvent::Delete(kv.clone())).await.is_err() {
                                        return;
                                    }
                                }
                            }
                        }
                    }
                    _ = tx.closed() => {
                        tracing::debug!("no more receivers, stopping watcher");
                        return;
                    }
                }
            }
        });
        Ok(PrefixWatcher {
            prefix: prefix.as_ref().to_string(),
            watcher,
            rx,
        })
    }
}
463
/// Stream of [`WatchEvent`]s for all keys under a prefix.
///
/// `Dissolve` allows decomposing it into `(prefix, watcher, rx)`.
#[derive(Dissolve)]
pub struct PrefixWatcher {
    // The watched key prefix.
    prefix: String,
    // Underlying etcd watcher handle.
    watcher: Watcher,
    // Receives events forwarded by the background task spawned in `watch_internal`.
    rx: mpsc::Receiver<WatchEvent>,
}
470
/// A single change observed by a prefix watcher.
#[derive(Debug)]
pub enum WatchEvent {
    /// Key was created or updated; carries the new key-value pair.
    Put(KeyValue),
    /// Key was removed; the embedded [`KeyValue`] identifies the deleted key.
    Delete(KeyValue),
}
476
/// ETCD client configuration options
#[derive(Debug, Clone, Builder, Validate)]
pub struct ClientOptions {
    /// One or more etcd endpoint URLs; must not be empty.
    #[validate(length(min = 1))]
    pub etcd_url: Vec<String>,

    /// Optional connection options (auth / TLS). The [`Default`] impl
    /// derives these from `ETCD_AUTH_*` environment variables.
    #[builder(default)]
    pub etcd_connect_options: Option<ConnectOptions>,

    /// If true, the client will attach a lease to the primary [`CancellationToken`].
    #[builder(default = "true")]
    pub attach_lease: bool,
}
490
491impl Default for ClientOptions {
492    fn default() -> Self {
493        let mut connect_options = None;
494
495        if let (Ok(username), Ok(password)) = (
496            std::env::var("ETCD_AUTH_USERNAME"),
497            std::env::var("ETCD_AUTH_PASSWORD"),
498        ) {
499            // username and password are set
500            connect_options = Some(ConnectOptions::new().with_user(username, password));
501        } else if let (Ok(ca), Ok(cert), Ok(key)) = (
502            std::env::var("ETCD_AUTH_CA"),
503            std::env::var("ETCD_AUTH_CLIENT_CERT"),
504            std::env::var("ETCD_AUTH_CLIENT_KEY"),
505        ) {
506            // TLS is set
507            connect_options = Some(
508                ConnectOptions::new().with_tls(
509                    TlsOptions::new()
510                        .ca_certificate(Certificate::from_pem(ca))
511                        .identity(Identity::from_pem(cert, key)),
512                ),
513            );
514        }
515
516        ClientOptions {
517            etcd_url: default_servers(),
518            etcd_connect_options: connect_options,
519            attach_lease: true,
520        }
521    }
522}
523
/// Resolve the default etcd endpoint list.
///
/// Reads the comma-separated `ETCD_ENDPOINTS` environment variable and falls
/// back to `http://localhost:2379` when it is unset. Entries are trimmed and
/// empty segments (e.g. from trailing commas or `"a,,b"`) are discarded; an
/// empty or all-whitespace variable behaves like "unset".
fn default_servers() -> Vec<String> {
    match std::env::var("ETCD_ENDPOINTS") {
        Ok(possible_list_of_urls) => {
            let urls: Vec<String> = possible_list_of_urls
                .split(',')
                .map(str::trim)
                .filter(|s| !s.is_empty())
                .map(str::to_string)
                .collect();
            if urls.is_empty() {
                // Nothing usable in the variable: same fallback as "unset".
                vec!["http://localhost:2379".to_string()]
            } else {
                urls
            }
        }
        Err(_) => vec!["http://localhost:2379".to_string()],
    }
}
533
/// A cache for etcd key-value pairs that watches for changes
pub struct KvCache {
    // Client used for writes (`put`/`delete`) back to etcd.
    client: Client,
    // Key prefix this cache mirrors.
    pub prefix: String,
    // In-memory mirror of etcd state, keyed by the FULL key (prefix included).
    cache: Arc<RwLock<HashMap<String, Vec<u8>>>>,
    // Watcher handed off to the background task by `start_watcher`
    // (None once the task has been started).
    watcher: Option<PrefixWatcher>,
}
541
impl KvCache {
    /// Create a new KV cache for the given prefix
    ///
    /// Seeds the cache from etcd, writes any `initial_values` that are not
    /// already present (keys are `prefix + key`), then starts a background
    /// task that keeps the cache in sync via a prefix watcher.
    pub async fn new(
        client: Client,
        prefix: String,
        initial_values: HashMap<String, Vec<u8>>,
    ) -> Result<Self> {
        let mut cache = HashMap::new();

        // First get all existing keys with this prefix
        let existing_kvs = client.kv_get_prefix(&prefix).await?;
        for kv in existing_kvs {
            let key = String::from_utf8_lossy(kv.key()).to_string();
            cache.insert(key, kv.value().to_vec());
        }

        // For any keys in initial_values that don't exist in etcd, write them
        // TODO: proper lease handling, this requires the first process that write to a prefix atomically
        // create a lease and write the lease to etcd. Later processes will attach to the lease and
        // help refresh the lease.
        for (key, value) in initial_values.iter() {
            let full_key = format!("{}{}", prefix, key);
            if let std::collections::hash_map::Entry::Vacant(e) = cache.entry(full_key.clone()) {
                client.kv_put(&full_key, value.clone(), None).await?;
                e.insert(value.clone());
            }
        }

        // Start watching for changes
        // we won't miss events between the initial push and the watcher starting because
        // client.kv_get_and_watch_prefix() will get all kv pairs and put them back again
        let watcher = client.kv_get_and_watch_prefix(&prefix).await?;

        let cache = Arc::new(RwLock::new(cache));
        let mut result = Self {
            client,
            prefix,
            cache,
            watcher: Some(watcher),
        };

        // Start the background watcher task
        result.start_watcher().await?;

        Ok(result)
    }

    /// Start the background watcher task
    ///
    /// Takes the stored [`PrefixWatcher`] (no-op if already taken) and spawns
    /// a task that applies Put/Delete events to the shared cache until the
    /// event channel closes.
    async fn start_watcher(&mut self) -> Result<()> {
        if let Some(watcher) = self.watcher.take() {
            let cache = self.cache.clone();
            let prefix = self.prefix.clone();

            tokio::spawn(async move {
                let mut rx = watcher.rx;

                while let Some(event) = rx.recv().await {
                    match event {
                        WatchEvent::Put(kv) => {
                            let key = String::from_utf8_lossy(kv.key()).to_string();
                            let value = kv.value().to_vec();

                            tracing::trace!("KvCache update: {} = {:?}", key, value);
                            let mut cache_write = cache.write().await;
                            cache_write.insert(key, value);
                        }
                        WatchEvent::Delete(kv) => {
                            let key = String::from_utf8_lossy(kv.key()).to_string();

                            tracing::trace!("KvCache delete: {}", key);
                            let mut cache_write = cache.write().await;
                            cache_write.remove(&key);
                        }
                    }
                }

                tracing::debug!("KvCache watcher for prefix '{}' stopped", prefix);
            });
        }

        Ok(())
    }

    /// Get a value from the cache
    ///
    /// `key` is relative to the prefix; no etcd round-trip is performed.
    pub async fn get(&self, key: &str) -> Option<Vec<u8>> {
        let full_key = format!("{}{}", self.prefix, key);
        let cache_read = self.cache.read().await;
        cache_read.get(&full_key).cloned()
    }

    /// Get all key-value pairs in the cache
    ///
    /// Returned keys are FULL keys (prefix included).
    pub async fn get_all(&self) -> HashMap<String, Vec<u8>> {
        let cache_read = self.cache.read().await;
        cache_read.clone()
    }

    /// Update a value in both the cache and etcd
    ///
    /// `key` is relative to the prefix. The watcher will also deliver this
    /// write back as a Put event, re-inserting the same value (benign).
    pub async fn put(&self, key: &str, value: Vec<u8>, lease_id: Option<u64>) -> Result<()> {
        let full_key = format!("{}{}", self.prefix, key);

        // Update etcd first
        self.client
            .kv_put(&full_key, value.clone(), lease_id)
            .await?;

        // Then update local cache
        let mut cache_write = self.cache.write().await;
        cache_write.insert(full_key, value);

        Ok(())
    }

    /// Delete a key from both the cache and etcd
    ///
    /// `key` is relative to the prefix.
    pub async fn delete(&self, key: &str) -> Result<()> {
        let full_key = format!("{}{}", self.prefix, key);

        // Delete from etcd first
        self.client.kv_delete(full_key.clone(), None).await?;

        // Then remove from local cache
        let mut cache_write = self.cache.write().await;
        cache_write.remove(&full_key);

        Ok(())
    }
}
668
#[cfg(feature = "integration")]
#[cfg(test)]
mod tests {
    use crate::{DistributedRuntime, distributed::DistributedConfig};

    use super::*;

    /// Integration test: exercises `kv_create` / `kv_create_or_validate`
    /// against a live etcd (requires the `integration` feature).
    #[test]
    fn test_etcd_client() {
        let rt = Runtime::from_settings().unwrap();
        let rt_clone = rt.clone();
        let config = DistributedConfig::from_settings(false);

        rt_clone.primary().block_on(async move {
            let drt = DistributedRuntime::new(rt, config).await.unwrap();
            test_kv_create_or_validate(drt).await.unwrap();
        });
    }

    /// Create a key, verify duplicate creation fails, then check that
    /// create-or-validate accepts a matching value and rejects a different one.
    async fn test_kv_create_or_validate(drt: DistributedRuntime) -> Result<()> {
        let key = "__integration_test_key";
        let value = b"test_value";

        let client = drt.etcd_client().expect("etcd client should be available");
        let lease_id = drt
            .primary_lease()
            .expect("primary lease should be available")
            .id();

        // Create the key
        let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
        assert!(result.is_ok(), "creating a fresh key should succeed");

        // Try to create the key again - this should fail
        let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
        assert!(result.is_err(), "creating an existing key should fail");

        // Create or validate should succeed as the values match
        let result = client
            .kv_create_or_validate(key.to_string(), value.to_vec(), Some(lease_id))
            .await;
        assert!(result.is_ok(), "create_or_validate with matching value should succeed");

        // Try to create the key with a different value
        let different_value = b"different_value";
        let result = client
            .kv_create_or_validate(key.to_string(), different_value.to_vec(), Some(lease_id))
            .await;
        assert!(
            result.is_err(),
            "create_or_validate with a different value should fail"
        );

        Ok(())
    }

    /// Integration test: end-to-end `KvCache` behavior against a live etcd.
    #[test]
    fn test_kv_cache() {
        let rt = Runtime::from_settings().unwrap();
        let rt_clone = rt.clone();
        let config = DistributedConfig::from_settings(false);

        rt_clone.primary().block_on(async move {
            let drt = DistributedRuntime::new(rt, config).await.unwrap();
            test_kv_cache_operations(drt).await.unwrap();
        });
    }

    /// Exercise seed, get/get_all, put, update, and external-write propagation
    /// on a `KvCache`, then clean up the test prefix.
    async fn test_kv_cache_operations(drt: DistributedRuntime) -> Result<()> {
        // Get the client and unwrap it
        let client = drt.etcd_client().expect("etcd client should be available");

        // Create a unique test prefix to avoid conflicts with other tests
        let test_id = uuid::Uuid::new_v4().to_string();
        let prefix = format!("v1/test_kv_cache_{}/", test_id);

        // Initial values
        let mut initial_values = HashMap::new();
        initial_values.insert("key1".to_string(), b"value1".to_vec());
        initial_values.insert("key2".to_string(), b"value2".to_vec());

        // Create the KV cache
        let kv_cache = KvCache::new(client.clone(), prefix.clone(), initial_values).await?;

        // Test get
        let value1 = kv_cache.get("key1").await;
        assert_eq!(value1, Some(b"value1".to_vec()));

        let value2 = kv_cache.get("key2").await;
        assert_eq!(value2, Some(b"value2".to_vec()));

        // Test get_all
        let all_values = kv_cache.get_all().await;
        assert_eq!(all_values.len(), 2);
        assert_eq!(
            all_values.get(&format!("{}key1", prefix)),
            Some(&b"value1".to_vec())
        );
        assert_eq!(
            all_values.get(&format!("{}key2", prefix)),
            Some(&b"value2".to_vec())
        );

        // Test put - using None for lease_id
        kv_cache.put("key3", b"value3".to_vec(), None).await?;

        // Allow some time for the update to propagate
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Verify the new value
        let value3 = kv_cache.get("key3").await;
        assert_eq!(value3, Some(b"value3".to_vec()));

        // Test update
        kv_cache
            .put("key1", b"updated_value1".to_vec(), None)
            .await?;

        // Allow some time for the update to propagate
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Verify the updated value
        let updated_value1 = kv_cache.get("key1").await;
        assert_eq!(updated_value1, Some(b"updated_value1".to_vec()));

        // Test external update (simulating another client updating a value)
        client
            .kv_put(
                &format!("{}key2", prefix),
                b"external_update".to_vec(),
                None,
            )
            .await?;

        // Allow some time for the update to propagate
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Verify the cache was updated
        let external_update = kv_cache.get("key2").await;
        assert_eq!(external_update, Some(b"external_update".to_vec()));

        // Clean up - delete the test keys
        let etcd_client = client.etcd_client();
        let _ = etcd_client
            .kv_client()
            .delete(
                prefix,
                Some(etcd_client::DeleteOptions::new().with_prefix()),
            )
            .await?;

        Ok(())
    }
}