dynamo_runtime/transports/
etcd.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{CancellationToken, ErrorContext, Result, Runtime, error};
5
6use async_nats::jetstream::kv;
7use derive_builder::Builder;
8use derive_getters::Dissolve;
9use futures::StreamExt;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::{RwLock, mpsc};
13use validator::Validate;
14
15use etcd_client::{
16    Certificate, Compare, CompareOp, DeleteOptions, GetOptions, Identity, LockClient, LockOptions,
17    LockResponse, PutOptions, PutResponse, TlsOptions, Txn, TxnOp, TxnOpResponse, WatchOptions,
18    Watcher,
19};
20pub use etcd_client::{ConnectOptions, KeyValue, LeaseClient};
21use tokio::time::{Duration, interval};
22
23mod lease;
24mod lock;
25mod path;
26
27use lease::*;
28pub use lock::*;
29pub use path::*;
30
31use super::utils::build_in_runtime;
32
/// ETCD Client
///
/// Wraps an [`etcd_client::Client`] together with the primary lease that ties
/// this process's registrations to the [`Runtime`] lifetime, plus a dedicated
/// Tokio runtime on which lease keep-alive and watch-forwarding tasks run.
#[derive(Clone)]
pub struct Client {
    // Underlying etcd client handle.
    client: etcd_client::Client,
    // ID of the primary lease; 0 when the client was built with `attach_lease == false`.
    primary_lease: i64,
    // Runtime whose primary cancellation token is coupled to the primary lease.
    runtime: Runtime,
    // Dedicated Tokio runtime used to spawn lease and watch background tasks.
    rt: Arc<tokio::runtime::Runtime>,
}
41
/// An etcd lease paired with the [`CancellationToken`] that controls (and
/// reflects) its lifetime: cancelling the token revokes the lease, and a
/// revoked/expired lease triggers the token.
#[derive(Debug, Clone)]
pub struct Lease {
    /// ETCD lease ID
    id: i64,

    /// [`CancellationToken`] associated with the lease
    cancel_token: CancellationToken,
}
50
51impl Lease {
52    /// Get the lease ID
53    pub fn id(&self) -> i64 {
54        self.id
55    }
56
57    /// Get the primary [`CancellationToken`] associated with the lease.
58    /// This token will revoke the lease if canceled.
59    pub fn primary_token(&self) -> CancellationToken {
60        self.cancel_token.clone()
61    }
62
63    /// Get a child [`CancellationToken`] from the lease's [`CancellationToken`].
64    /// This child token will be triggered if the lease is revoked, but will not revoke the lease if canceled.
65    pub fn child_token(&self) -> CancellationToken {
66        self.cancel_token.child_token()
67    }
68
69    /// Revoke the lease triggering the [`CancellationToken`].
70    pub fn revoke(&self) {
71        self.cancel_token.cancel();
72    }
73
74    /// Check if the lease is still valid (not revoked)
75    pub async fn is_valid(&self) -> Result<bool> {
76        // A lease is valid if its cancellation token has not been triggered
77        // We can use try_cancelled which returns immediately with a boolean
78        Ok(!self.cancel_token.is_cancelled())
79    }
80}
81
impl Client {
    /// Builder for [`ClientOptions`].
    pub fn builder() -> ClientOptionsBuilder {
        ClientOptionsBuilder::default()
    }

    /// Create a new discovery client
    ///
    /// This will establish a connection to the etcd server, create a primary lease,
    /// and spawn a task to keep the lease alive and tie the lifetime of the [`Runtime`]
    /// to the lease.
    ///
    /// If the lease expires, the [`Runtime`] will be shutdown.
    /// If the [`Runtime`] is shutdown, the lease will be revoked.
    pub async fn new(config: ClientOptions, runtime: Runtime) -> Result<Self> {
        let token = runtime.primary_token();

        // Connect and (optionally) create the primary lease on a dedicated
        // runtime (`build_in_runtime(..., 1)` => one worker thread), so the
        // lease keep-alive task is isolated from the caller's executor.
        let ((client, lease_id), rt) = build_in_runtime(
            async move {
                let client = etcd_client::Client::connect(
                    config.etcd_url.clone(),
                    config.etcd_connect_options,
                )
                .await
                .with_context(|| {
                    format!(
                        "Unable to connect to etcd server at {}. Check etcd server status",
                        config.etcd_url.join(", ")
                    )
                })?;

                let lease_id = if config.attach_lease {
                    let lease_client = client.lease_client();

                    // TTL of 10 (seconds, per etcd lease-grant semantics);
                    // `create_lease` couples the lease lifetime to `token`.
                    let lease = create_lease(lease_client, 10, token)
                        .await
                        .with_context(|| {
                            format!(
                                "Unable to create lease. Check etcd server status at {}",
                                config.etcd_url.join(", ")
                            )
                        })?;

                    lease.id
                } else {
                    // 0 is etcd's "no lease" sentinel.
                    0
                };

                Ok((client, lease_id))
            },
            1,
        )
        .await?;

        Ok(Client {
            client,
            primary_lease: lease_id,
            rt,
            runtime,
        })
    }

    /// Get a reference to the underlying [`etcd_client::Client`] instance.
    pub(crate) fn etcd_client(&self) -> &etcd_client::Client {
        &self.client
    }

    /// Get the primary lease ID.
    pub fn lease_id(&self) -> i64 {
        self.primary_lease
    }

    /// Primary [`Lease`]
    ///
    /// Cancelling its token shuts down the whole [`Runtime`], since the lease
    /// is bound to the runtime's primary cancellation token.
    pub fn primary_lease(&self) -> Lease {
        Lease {
            id: self.primary_lease,
            cancel_token: self.runtime.primary_token(),
        }
    }

    /// Create a [`Lease`] with a given time-to-live (TTL).
    /// This [`Lease`] will be tied to the [`Runtime`], specifically a child [`CancellationToken`].
    pub async fn create_lease(&self, ttl: i64) -> Result<Lease> {
        let token = self.runtime.child_token();
        let lease_client = self.client.lease_client();
        // Spawn on the dedicated runtime so the keep-alive loop is not tied to
        // the caller's task.
        self.rt
            .spawn(create_lease(lease_client, ttl, token))
            .await?
    }

    // Revoke an etcd lease given its lease id. A wrapper over etcd_client::LeaseClient::revoke
    pub async fn revoke_lease(&self, lease_id: i64) -> Result<()> {
        let lease_client = self.client.lease_client();
        self.rt.spawn(revoke_lease(lease_client, lease_id)).await?
    }

    /// Atomically create `key` with `value`; fails if the key already exists.
    ///
    /// The key is attached to `lease_id`, defaulting to the primary lease.
    pub async fn kv_create(&self, key: &str, value: Vec<u8>, lease_id: Option<i64>) -> Result<()> {
        let id = lease_id.unwrap_or(self.lease_id());
        let put_options = PutOptions::new().with_lease(id);

        // Build the transaction
        let txn = Txn::new()
            .when(vec![Compare::version(key, CompareOp::Equal, 0)]) // Version 0 => key does not exist yet
            .and_then(vec![
                TxnOp::put(key, value, Some(put_options)), // Create the object
            ]);

        // Execute the transaction
        let result = self.client.kv_client().txn(txn).await?;

        if result.succeeded() {
            Ok(())
        } else {
            // Transaction failed => the key already existed; log the raw op
            // responses to aid debugging before reporting the error.
            for resp in result.op_responses() {
                tracing::warn!(response = ?resp, "kv_create etcd op response");
            }
            Err(error!("Unable to create key. Check etcd server status"))
        }
    }

    /// Atomically create a key if it does not exist, or validate the values are identical if the key exists.
    pub async fn kv_create_or_validate(
        &self,
        key: String,
        value: Vec<u8>,
        lease_id: Option<i64>,
    ) -> Result<()> {
        let id = lease_id.unwrap_or(self.lease_id());
        let put_options = PutOptions::new().with_lease(id);

        // Build the transaction that either creates the key if it doesn't exist,
        // or validates the existing value matches what we expect
        let txn = Txn::new()
            .when(vec![Compare::version(key.as_str(), CompareOp::Equal, 0)]) // Key doesn't exist
            .and_then(vec![
                TxnOp::put(key.as_str(), value.clone(), Some(put_options)), // Create it
            ])
            .or_else(vec![
                // If key exists, run a nested txn whose `when` compares the
                // stored value against ours; its success flag is the validation.
                TxnOp::txn(Txn::new().when(vec![Compare::value(
                    key.as_str(),
                    CompareOp::Equal,
                    value.clone(),
                )])),
            ]);

        // Execute the transaction
        let result = self.client.kv_client().txn(txn).await?;

        // We have to enumerate the response paths to determine if the transaction succeeded
        if result.succeeded() {
            // Key did not exist and was created.
            Ok(())
        } else {
            // Key existed: inspect the nested txn response to see if the values matched.
            match result.op_responses().first() {
                Some(response) => match response {
                    TxnOpResponse::Txn(response) => match response.succeeded() {
                        true => Ok(()),
                        false => Err(error!(
                            "Unable to create or validate key. Check etcd server status"
                        )),
                    },
                    _ => Err(error!(
                        "Unable to validate key operation. Check etcd server status"
                    )),
                },
                None => Err(error!(
                    "Unable to create or validate key. Check etcd server status"
                )),
            }
        }
    }

    /// Unconditionally put `key = value`, attached to `lease_id`
    /// (defaults to the primary lease).
    pub async fn kv_put(
        &self,
        key: impl AsRef<str>,
        value: impl AsRef<[u8]>,
        lease_id: Option<i64>,
    ) -> Result<()> {
        let id = lease_id.unwrap_or(self.lease_id());
        let put_options = PutOptions::new().with_lease(id);
        let _ = self
            .client
            .kv_client()
            .put(key.as_ref(), value.as_ref(), Some(put_options))
            .await?;
        Ok(())
    }

    /// Put `key = value` with caller-supplied [`PutOptions`].
    ///
    /// NOTE(review): the primary lease is applied via `with_lease` after the
    /// caller's options, so any lease the caller set on `options` appears to be
    /// overridden — confirm this is intended.
    pub async fn kv_put_with_options(
        &self,
        key: impl AsRef<str>,
        value: impl AsRef<[u8]>,
        options: Option<PutOptions>,
    ) -> Result<PutResponse> {
        let options = options
            .unwrap_or_default()
            .with_lease(self.primary_lease().id());
        self.client
            .kv_client()
            .put(key.as_ref(), value.as_ref(), Some(options))
            .await
            .map_err(|err| err.into())
    }

    /// Get the key-value pairs for `key`, honoring any [`GetOptions`]
    /// (e.g. prefix or range queries).
    pub async fn kv_get(
        &self,
        key: impl Into<Vec<u8>>,
        options: Option<GetOptions>,
    ) -> Result<Vec<KeyValue>> {
        let mut get_response = self.client.kv_client().get(key, options).await?;
        Ok(get_response.take_kvs())
    }

    /// Delete `key` (or a range/prefix, per `options`); returns the number of
    /// keys deleted.
    pub async fn kv_delete(
        &self,
        key: impl Into<Vec<u8>>,
        options: Option<DeleteOptions>,
    ) -> Result<i64> {
        self.client
            .kv_client()
            .delete(key, options)
            .await
            .map(|del_response| del_response.deleted())
            .map_err(|err| err.into())
    }

    /// Get all key-value pairs whose key starts with `prefix`.
    pub async fn kv_get_prefix(&self, prefix: impl AsRef<str>) -> Result<Vec<KeyValue>> {
        let mut get_response = self
            .client
            .kv_client()
            .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
            .await?;

        Ok(get_response.take_kvs())
    }

    /// Acquire a distributed lock using etcd's native lock mechanism
    /// Returns a LockResponse that can be used to unlock later
    pub async fn lock(
        &self,
        key: impl Into<Vec<u8>>,
        lease_id: Option<i64>,
    ) -> Result<LockResponse> {
        let mut lock_client = self.client.lock_client();
        let id = lease_id.unwrap_or(self.lease_id());
        let options = LockOptions::new().with_lease(id);
        lock_client
            .lock(key, Some(options))
            .await
            .map_err(|err| err.into())
    }

    /// Release a distributed lock using the key from the LockResponse
    pub async fn unlock(&self, lock_key: impl Into<Vec<u8>>) -> Result<()> {
        let mut lock_client = self.client.lock_client();
        lock_client
            .unlock(lock_key)
            .await
            .map_err(|err: etcd_client::Error| anyhow::anyhow!(err))?;
        Ok(())
    }

    /// Like kv_get_and_watch_prefix but only for new changes, does not include existing values.
    pub async fn kv_watch_prefix(
        &self,
        prefix: impl AsRef<str> + std::fmt::Display,
    ) -> Result<PrefixWatcher> {
        self.watch_internal(prefix, false).await
    }

    /// Snapshot all existing key-value pairs under `prefix` (delivered as
    /// [`WatchEvent::Put`]) and then stream subsequent changes.
    pub async fn kv_get_and_watch_prefix(
        &self,
        prefix: impl AsRef<str> + std::fmt::Display,
    ) -> Result<PrefixWatcher> {
        self.watch_internal(prefix, true).await
    }

    /// Shared implementation of the prefix watchers.
    ///
    /// Takes a snapshot GET of the prefix to learn the current revision (and,
    /// when `include_existing`, the current values), then starts a watch from
    /// `revision + 1` so no event is missed or duplicated between snapshot and
    /// watch. A background task forwards events into an mpsc channel.
    async fn watch_internal(
        &self,
        prefix: impl AsRef<str> + std::fmt::Display,
        include_existing: bool,
    ) -> Result<PrefixWatcher> {
        let mut kv_client = self.client.kv_client();
        let mut watch_client = self.client.watch_client();

        let mut get_response = kv_client
            .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
            .await?;

        // The response header carries the store revision at snapshot time.
        let start_revision = get_response
            .header()
            .ok_or(error!("missing header; unable to get revision"))?
            .revision();

        tracing::trace!("{prefix}: start_revision: {start_revision}");
        // Watch strictly after the snapshot revision.
        let start_revision = start_revision + 1;

        let (watcher, mut watch_stream) = watch_client
            .watch(
                prefix.as_ref(),
                Some(
                    WatchOptions::new()
                        .with_prefix()
                        .with_start_revision(start_revision)
                        .with_prev_key(),
                ),
            )
            .await?;

        let kvs = if include_existing {
            let kvs = get_response.take_kvs();
            tracing::trace!("initial kv count: {:?}", kvs.len());
            kvs
        } else {
            vec![]
        };

        let (tx, rx) = mpsc::channel(32);

        // Forwarding task: replays the snapshot (if requested), then relays
        // watch events until the stream closes or all receivers are dropped.
        self.rt.spawn(async move {
            if include_existing {
                for kv in kvs {
                    if tx.send(WatchEvent::Put(kv)).await.is_err() {
                        // receiver is already closed
                        return;
                    }
                }
            }

            loop {
                tokio::select! {
                    maybe_resp = watch_stream.next() => {
                        // Early return for None or Err cases
                        let Some(Ok(response)) = maybe_resp else {
                            tracing::info!("kv watch stream closed");
                            return;
                        };

                        // Process events
                        for event in response.events() {
                            // Extract the KeyValue if it exists
                            let Some(kv) = event.kv() else {
                                continue; // Skip events with no KV
                            };

                            // Handle based on event type
                            match event.event_type() {
                                etcd_client::EventType::Put => {
                                    if let Err(err) = tx.send(WatchEvent::Put(kv.clone())).await {
                                        tracing::error!("kv watcher error forwarding WatchEvent::Put: {err}");
                                        return;
                                    }
                                }
                                etcd_client::EventType::Delete => {
                                    // NOTE(review): unlike the Put arm, a failed
                                    // Delete send returns silently — confirm the
                                    // asymmetric logging is intentional.
                                    if tx.send(WatchEvent::Delete(kv.clone())).await.is_err() {
                                        return;
                                    }
                                }
                            }
                        }
                    }
                    _ = tx.closed() => {
                        tracing::debug!("no more receivers, stopping watcher");
                        return;
                    }
                }
            }
        });
        Ok(PrefixWatcher {
            prefix: prefix.as_ref().to_string(),
            watcher,
            rx,
        })
    }
}
456
/// Handle for an active prefix watch: the watched prefix, the underlying etcd
/// watch handle, and the channel on which translated [`WatchEvent`]s arrive.
#[derive(Dissolve)]
pub struct PrefixWatcher {
    // The prefix passed to kv_watch_prefix / kv_get_and_watch_prefix.
    prefix: String,
    // etcd watch handle kept alive for the duration of the watch.
    watcher: Watcher,
    // Receives the Put/Delete events forwarded by the background task.
    rx: mpsc::Receiver<WatchEvent>,
}
463
/// A single change observed under a watched prefix.
#[derive(Debug)]
pub enum WatchEvent {
    /// A key was created or updated; carries the new key-value pair.
    Put(KeyValue),
    /// A key was deleted; carries the key-value pair from the delete event.
    Delete(KeyValue),
}
469
470/// ETCD client configuration options
/// ETCD client configuration options
#[derive(Debug, Clone, Builder, Validate)]
pub struct ClientOptions {
    /// etcd server endpoint URLs; must contain at least one entry.
    #[validate(length(min = 1))]
    pub etcd_url: Vec<String>,

    /// Optional connection options (auth, TLS, timeouts).
    #[builder(default)]
    pub etcd_connect_options: Option<ConnectOptions>,

    /// If true, the client will attach a lease to the primary [`CancellationToken`].
    #[builder(default = "true")]
    pub attach_lease: bool,
}
483
484impl Default for ClientOptions {
485    fn default() -> Self {
486        let mut connect_options = None;
487
488        if let (Ok(username), Ok(password)) = (
489            std::env::var("ETCD_AUTH_USERNAME"),
490            std::env::var("ETCD_AUTH_PASSWORD"),
491        ) {
492            // username and password are set
493            connect_options = Some(ConnectOptions::new().with_user(username, password));
494        } else if let (Ok(ca), Ok(cert), Ok(key)) = (
495            std::env::var("ETCD_AUTH_CA"),
496            std::env::var("ETCD_AUTH_CLIENT_CERT"),
497            std::env::var("ETCD_AUTH_CLIENT_KEY"),
498        ) {
499            // TLS is set
500            connect_options = Some(
501                ConnectOptions::new().with_tls(
502                    TlsOptions::new()
503                        .ca_certificate(Certificate::from_pem(ca))
504                        .identity(Identity::from_pem(cert, key)),
505                ),
506            );
507        }
508
509        ClientOptions {
510            etcd_url: default_servers(),
511            etcd_connect_options: connect_options,
512            attach_lease: true,
513        }
514    }
515}
516
/// Resolve the default etcd endpoint list.
///
/// Reads `ETCD_ENDPOINTS` as a comma-separated list, trimming surrounding
/// whitespace from each entry and dropping empty entries (e.g. from trailing
/// commas). Falls back to `http://localhost:2379` when the variable is unset
/// or yields no usable entries, so callers never receive an empty or
/// blank-string endpoint.
fn default_servers() -> Vec<String> {
    const FALLBACK: &str = "http://localhost:2379";

    let servers: Vec<String> = std::env::var("ETCD_ENDPOINTS")
        .map(|urls| {
            urls.split(',')
                .map(str::trim)
                .filter(|s| !s.is_empty())
                .map(str::to_string)
                .collect()
        })
        .unwrap_or_default();

    if servers.is_empty() {
        // Unset, empty, or whitespace/comma-only value: use the local default.
        vec![FALLBACK.to_string()]
    } else {
        servers
    }
}
526
/// A cache for etcd key-value pairs that watches for changes
///
/// Keeps an in-memory mirror of all keys under `prefix`, kept up to date by a
/// background task consuming a [`PrefixWatcher`].
pub struct KvCache {
    // Client used for reads/writes that go through the cache.
    client: Client,
    // Key prefix this cache mirrors; full keys are `{prefix}{key}`.
    pub prefix: String,
    // Shared mirror of etcd state, updated by the watcher task.
    cache: Arc<RwLock<HashMap<String, Vec<u8>>>>,
    // Held only between construction and `start_watcher`, which takes it.
    watcher: Option<PrefixWatcher>,
}
534
impl KvCache {
    /// Create a new KV cache for the given prefix
    ///
    /// Seeds the cache from etcd, writes any `initial_values` whose keys are
    /// not yet present, then starts a background task that mirrors subsequent
    /// changes. `initial_values` keys are relative to `prefix`.
    pub async fn new(
        client: Client,
        prefix: String,
        initial_values: HashMap<String, Vec<u8>>,
    ) -> Result<Self> {
        let mut cache = HashMap::new();

        // First get all existing keys with this prefix
        let existing_kvs = client.kv_get_prefix(&prefix).await?;
        for kv in existing_kvs {
            let key = String::from_utf8_lossy(kv.key()).to_string();
            cache.insert(key, kv.value().to_vec());
        }

        // For any keys in initial_values that don't exist in etcd, write them
        // TODO: proper lease handling, this requires the first process that write to a prefix atomically
        // create a lease and write the lease to etcd. Later processes will attach to the lease and
        // help refresh the lease.
        for (key, value) in initial_values.iter() {
            let full_key = format!("{}{}", prefix, key);
            if let std::collections::hash_map::Entry::Vacant(e) = cache.entry(full_key.clone()) {
                client.kv_put(&full_key, value.clone(), None).await?;
                e.insert(value.clone());
            }
        }

        // Start watching for changes
        // we won't miss events between the initial push and the watcher starting because
        // client.kv_get_and_watch_prefix() will get all kv pairs and put them back again
        let watcher = client.kv_get_and_watch_prefix(&prefix).await?;

        let cache = Arc::new(RwLock::new(cache));
        let mut result = Self {
            client,
            prefix,
            cache,
            watcher: Some(watcher),
        };

        // Start the background watcher task
        result.start_watcher().await?;

        Ok(result)
    }

    /// Start the background watcher task
    ///
    /// Consumes `self.watcher` (no-op if already taken) and spawns a task that
    /// applies Put/Delete events to the shared cache until the event channel
    /// closes.
    async fn start_watcher(&mut self) -> Result<()> {
        if let Some(watcher) = self.watcher.take() {
            let cache = self.cache.clone();
            let prefix = self.prefix.clone();

            tokio::spawn(async move {
                let mut rx = watcher.rx;

                while let Some(event) = rx.recv().await {
                    match event {
                        WatchEvent::Put(kv) => {
                            let key = String::from_utf8_lossy(kv.key()).to_string();
                            let value = kv.value().to_vec();

                            tracing::trace!("KvCache update: {} = {:?}", key, value);
                            let mut cache_write = cache.write().await;
                            cache_write.insert(key, value);
                        }
                        WatchEvent::Delete(kv) => {
                            let key = String::from_utf8_lossy(kv.key()).to_string();

                            tracing::trace!("KvCache delete: {}", key);
                            let mut cache_write = cache.write().await;
                            cache_write.remove(&key);
                        }
                    }
                }

                tracing::debug!("KvCache watcher for prefix '{}' stopped", prefix);
            });
        }

        Ok(())
    }

    /// Get a value from the cache
    ///
    /// `key` is relative to the prefix. Returns a clone of the cached value,
    /// or `None` if absent; does not consult etcd directly.
    pub async fn get(&self, key: &str) -> Option<Vec<u8>> {
        let full_key = format!("{}{}", self.prefix, key);
        let cache_read = self.cache.read().await;
        cache_read.get(&full_key).cloned()
    }

    /// Get all key-value pairs in the cache
    ///
    /// Keys in the returned map are full keys (prefix included).
    pub async fn get_all(&self) -> HashMap<String, Vec<u8>> {
        let cache_read = self.cache.read().await;
        cache_read.clone()
    }

    /// Update a value in both the cache and etcd
    ///
    /// NOTE(review): etcd is written first, then the local cache; the watcher
    /// task may also apply the corresponding Put event, so the same value can
    /// be inserted twice — confirm last-write-wins is acceptable here.
    pub async fn put(&self, key: &str, value: Vec<u8>, lease_id: Option<i64>) -> Result<()> {
        let full_key = format!("{}{}", self.prefix, key);

        // Update etcd first
        self.client
            .kv_put(&full_key, value.clone(), lease_id)
            .await?;

        // Then update local cache
        let mut cache_write = self.cache.write().await;
        cache_write.insert(full_key, value);

        Ok(())
    }

    /// Delete a key from both the cache and etcd
    pub async fn delete(&self, key: &str) -> Result<()> {
        let full_key = format!("{}{}", self.prefix, key);

        // Delete from etcd first
        self.client.kv_delete(full_key.clone(), None).await?;

        // Then remove from local cache
        let mut cache_write = self.cache.write().await;
        cache_write.remove(&full_key);

        Ok(())
    }
}
661
#[cfg(feature = "integration")]
#[cfg(test)]
mod tests {
    use crate::{DistributedRuntime, distributed::DistributedConfig};

    use super::*;

    /// End-to-end check of kv_create / kv_create_or_validate against a live etcd.
    #[test]
    fn test_etcd_client() {
        let rt = Runtime::from_settings().unwrap();
        let rt_clone = rt.clone();
        let config = DistributedConfig::from_settings(false);

        rt_clone.primary().block_on(async move {
            let drt = DistributedRuntime::new(rt, config).await.unwrap();
            test_kv_create_or_validate(drt).await.unwrap();
        });
    }

    async fn test_kv_create_or_validate(drt: DistributedRuntime) -> Result<()> {
        let key = "__integration_test_key";
        let value = b"test_value";

        let client = drt.etcd_client().expect("etcd client should be available");
        let lease_id = drt
            .primary_lease()
            .expect("primary lease should be available")
            .id();

        // Create the key
        let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
        assert!(
            result.is_ok(),
            "kv_create of a new key should succeed: {:?}",
            result
        );

        // Try to create the key again - this should fail
        let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
        assert!(result.is_err(), "kv_create of an existing key should fail");

        // Create or validate should succeed as the values match
        let result = client
            .kv_create_or_validate(key.to_string(), value.to_vec(), Some(lease_id))
            .await;
        assert!(
            result.is_ok(),
            "kv_create_or_validate with a matching value should succeed: {:?}",
            result
        );

        // Try to create the key with a different value
        let different_value = b"different_value";
        let result = client
            .kv_create_or_validate(key.to_string(), different_value.to_vec(), Some(lease_id))
            .await;
        assert!(
            result.is_err(),
            "kv_create_or_validate with a different value should fail"
        );

        Ok(())
    }

    /// End-to-end check of KvCache seeding, watching, and mutation.
    #[test]
    fn test_kv_cache() {
        let rt = Runtime::from_settings().unwrap();
        let rt_clone = rt.clone();
        let config = DistributedConfig::from_settings(false);

        rt_clone.primary().block_on(async move {
            let drt = DistributedRuntime::new(rt, config).await.unwrap();
            test_kv_cache_operations(drt).await.unwrap();
        });
    }

    async fn test_kv_cache_operations(drt: DistributedRuntime) -> Result<()> {
        // Get the client and unwrap it
        let client = drt.etcd_client().expect("etcd client should be available");

        // Create a unique test prefix to avoid conflicts with other tests
        let test_id = uuid::Uuid::new_v4().to_string();
        let prefix = format!("v1/test_kv_cache_{}/", test_id);

        // Initial values
        let mut initial_values = HashMap::new();
        initial_values.insert("key1".to_string(), b"value1".to_vec());
        initial_values.insert("key2".to_string(), b"value2".to_vec());

        // Create the KV cache
        let kv_cache = KvCache::new(client.clone(), prefix.clone(), initial_values).await?;

        // Test get
        let value1 = kv_cache.get("key1").await;
        assert_eq!(value1, Some(b"value1".to_vec()));

        let value2 = kv_cache.get("key2").await;
        assert_eq!(value2, Some(b"value2".to_vec()));

        // Test get_all
        let all_values = kv_cache.get_all().await;
        assert_eq!(all_values.len(), 2, "cache should hold exactly the two seeded keys");
        assert_eq!(
            all_values.get(&format!("{}key1", prefix)),
            Some(&b"value1".to_vec())
        );
        assert_eq!(
            all_values.get(&format!("{}key2", prefix)),
            Some(&b"value2".to_vec())
        );

        // Test put - using None for lease_id
        kv_cache.put("key3", b"value3".to_vec(), None).await?;

        // Allow some time for the update to propagate
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Verify the new value
        let value3 = kv_cache.get("key3").await;
        assert_eq!(value3, Some(b"value3".to_vec()));

        // Test update
        kv_cache
            .put("key1", b"updated_value1".to_vec(), None)
            .await?;

        // Allow some time for the update to propagate
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Verify the updated value
        let updated_value1 = kv_cache.get("key1").await;
        assert_eq!(updated_value1, Some(b"updated_value1".to_vec()));

        // Test external update (simulating another client updating a value)
        client
            .kv_put(
                &format!("{}key2", prefix),
                b"external_update".to_vec(),
                None,
            )
            .await?;

        // Allow some time for the update to propagate
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Verify the cache was updated
        let external_update = kv_cache.get("key2").await;
        assert_eq!(external_update, Some(b"external_update".to_vec()));

        // Clean up - delete the test keys
        let etcd_client = client.etcd_client();
        let _ = etcd_client
            .kv_client()
            .delete(
                prefix,
                Some(etcd_client::DeleteOptions::new().with_prefix()),
            )
            .await?;

        Ok(())
    }
}