// dynamo_runtime/transports/etcd.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::runtime::Runtime;
5use anyhow::{Context, Result};
6
7use async_nats::jetstream::kv;
8use derive_builder::Builder;
9use derive_getters::Dissolve;
10use futures::StreamExt;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::{RwLock, mpsc};
14use validator::Validate;
15
16use etcd_client::{
17    Certificate, Compare, CompareOp, DeleteOptions, GetOptions, Identity, LockClient, LockOptions,
18    LockResponse, PutOptions, PutResponse, TlsOptions, Txn, TxnOp, TxnOpResponse, WatchOptions,
19    WatchStream, Watcher,
20};
21pub use etcd_client::{ConnectOptions, KeyValue, LeaseClient};
22use tokio::time::{Duration, interval};
23use tokio_util::sync::CancellationToken;
24
25mod connector;
26mod lease;
27mod lock;
28
29use connector::Connector;
30use lease::*;
31pub use lock::*;
32
33use super::utils::build_in_runtime;
34use crate::config::environment_names::etcd as env_etcd;
35
/// ETCD Client
///
/// Wrapper around [`etcd_client::Client`] (via an internal `Connector`) that
/// owns a primary lease and a dedicated Tokio runtime for lease keep-alive
/// and watch tasks.
#[derive(Clone)]
pub struct Client {
    // Connection manager; hands out the underlying etcd client handle and
    // supports reconnection (see the `connector` module).
    connector: Arc<Connector>,
    // Primary lease ID; 0 when `ClientOptions::attach_lease` is false.
    // NOTE(review): etcd lease IDs are i64 on the wire; this is stored as u64
    // and cast with `as i64` at call sites — confirm the sign convention.
    primary_lease: u64,
    // Main runtime handle whose lifetime is tied to the primary lease.
    runtime: Runtime,
    // Exclusive runtime for etcd lease keep-alive and watch tasks
    // Avoid those tasks from being starved when the main runtime is busy
    // WARNING: Do not await on main runtime from this runtime or deadlocks may occur
    rt: Arc<tokio::runtime::Runtime>,
}
47
48impl std::fmt::Debug for Client {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        write!(f, "etcd::Client primary_lease={}", self.primary_lease)
51    }
52}
53
54impl Client {
55    pub fn builder() -> ClientOptionsBuilder {
56        ClientOptionsBuilder::default()
57    }
58
    /// Create a new discovery client
    ///
    /// This will establish a connection to the etcd server, create a primary lease,
    /// and spawn a task to keep the lease alive and tie the lifetime of the [`Runtime`]
    /// to the lease.
    ///
    /// If the lease expires, the [`Runtime`] will be shutdown.
    /// If the [`Runtime`] is shutdown, the lease will be revoked.
    pub async fn new(config: ClientOptions, runtime: Runtime) -> Result<Self> {
        // Token used to tie lease keep-alive to runtime shutdown.
        let token = runtime.primary_token();

        // Connection + lease setup runs on a dedicated runtime that is also
        // returned and stored in `rt`. The trailing `1` is presumably the
        // worker-thread count — TODO confirm in `super::utils::build_in_runtime`.
        let ((connector, lease_id), rt) = build_in_runtime(
            async move {
                let etcd_urls = config.etcd_url.clone();
                let connect_options = config.etcd_connect_options.clone();

                // Create the connector
                let connector = Connector::new(etcd_urls, connect_options)
                    .await
                    .with_context(|| {
                        format!(
                            "Unable to connect to etcd server at {}. Check etcd server status",
                            config.etcd_url.join(", ")
                        )
                    })?;

                // Lease id 0 means "no lease attached" (see `lease_id()`).
                // TTL argument is 10 — presumably seconds; confirm in `lease::create_lease`.
                let lease_id = if config.attach_lease {
                    create_lease(connector.clone(), 10, token)
                        .await
                        .with_context(|| {
                            format!(
                                "Unable to create lease. Check etcd server status at {}",
                                config.etcd_url.join(", ")
                            )
                        })?
                } else {
                    0
                };

                Ok((connector, lease_id))
            },
            1,
        )
        .await?;

        Ok(Client {
            connector,
            primary_lease: lease_id,
            rt,
            runtime,
        })
    }
111
    /// Get a clone of the underlying [`etcd_client::Client`] instance.
    ///
    /// NOTE(review): the original comment said the client is "behind an RwLock";
    /// that synchronization lives inside `Connector`, which is not visible in
    /// this file — confirm against the `connector` module.
    fn etcd_client(&self) -> etcd_client::Client {
        self.connector.get_client()
    }
117
    /// Get the primary lease ID.
    ///
    /// Returns 0 when the client was created with `attach_lease: false`
    /// (see [`Client::new`]), i.e. no lease is attached.
    pub fn lease_id(&self) -> u64 {
        self.primary_lease
    }
122
    /// Atomically create a key-value pair if it doesn't already exist.
    ///
    /// Returns:
    /// - `Ok(None)` if the key was successfully created
    /// - `Ok(Some(version))` if the key already exists (returns the existing version)
    /// - `Err(...)` only on actual errors (connection failure, timeout, etc.)
    ///
    /// Both "created" and "already exists" are successful outcomes, so multiple
    /// processes can race to create the same key idempotently.
    pub async fn kv_create(
        &self,
        key: &str,
        value: Vec<u8>,
        lease_id: Option<u64>,
    ) -> Result<Option<u64>> {
        // Default to the client's primary lease when none is supplied.
        let id = lease_id.unwrap_or(self.lease_id());
        let put_options = PutOptions::new().with_lease(id as i64);

        // Build transaction that creates key only if it doesn't exist
        let txn = Txn::new()
            .when(vec![Compare::version(key, CompareOp::Equal, 0)]) // Ensure the lock does not exist
            .and_then(vec![
                TxnOp::put(key, value, Some(put_options)), // Create the object
            ])
            .or_else(vec![
                TxnOp::get(key, None), // Key exists, get its info
            ]);

        // Execute the transaction
        let result = self.connector.get_client().kv_client().txn(txn).await?;

        // Created
        if result.succeeded() {
            return Ok(None);
        }

        // Already exists: the or_else GET carries the current KV; report its version.
        if let Some(etcd_client::TxnOpResponse::Get(get_resp)) =
            result.op_responses().into_iter().next()
            && let Some(kv) = get_resp.kvs().first()
        {
            let version = kv.version() as u64;
            return Ok(Some(version));
        }

        // Unexpected response shape: dump the op responses for debugging, then fail.
        for resp in result.op_responses() {
            tracing::warn!(response = ?resp, "kv_create etcd op response");
        }
        anyhow::bail!("Unable to create key. Check etcd server status")
    }
176
177    /// Atomically create a key if it does not exist, or validate the values are identical if the key exists.
178    pub async fn kv_create_or_validate(
179        &self,
180        key: String,
181        value: Vec<u8>,
182        lease_id: Option<u64>,
183    ) -> Result<()> {
184        let id = lease_id.unwrap_or(self.lease_id());
185        let put_options = PutOptions::new().with_lease(id as i64);
186
187        // Build the transaction that either creates the key if it doesn't exist,
188        // or validates the existing value matches what we expect
189        let txn = Txn::new()
190            .when(vec![Compare::version(key.as_str(), CompareOp::Equal, 0)]) // Key doesn't exist
191            .and_then(vec![
192                TxnOp::put(key.as_str(), value.clone(), Some(put_options)), // Create it
193            ])
194            .or_else(vec![
195                // If key exists but values don't match, this will fail the transaction
196                TxnOp::txn(Txn::new().when(vec![Compare::value(
197                    key.as_str(),
198                    CompareOp::Equal,
199                    value.clone(),
200                )])),
201            ]);
202
203        // Execute the transaction
204        let result = self.connector.get_client().kv_client().txn(txn).await?;
205
206        // We have to enumerate the response paths to determine if the transaction succeeded
207        if result.succeeded() {
208            Ok(())
209        } else {
210            match result.op_responses().first() {
211                Some(response) => match response {
212                    TxnOpResponse::Txn(response) => match response.succeeded() {
213                        true => Ok(()),
214                        false => anyhow::bail!(
215                            "Unable to create or validate key. Check etcd server status"
216                        ),
217                    },
218                    _ => {
219                        anyhow::bail!("Unable to validate key operation. Check etcd server status")
220                    }
221                },
222                None => anyhow::bail!("Unable to create or validate key. Check etcd server status"),
223            }
224        }
225    }
226
227    pub async fn kv_put(
228        &self,
229        key: impl AsRef<str>,
230        value: impl AsRef<[u8]>,
231        lease_id: Option<u64>,
232    ) -> Result<()> {
233        let id = lease_id.unwrap_or(self.lease_id());
234        let put_options = PutOptions::new().with_lease(id as i64);
235        let _ = self
236            .connector
237            .get_client()
238            .kv_client()
239            .put(key.as_ref(), value.as_ref(), Some(put_options))
240            .await?;
241        Ok(())
242    }
243
    /// Put a key-value pair using caller-supplied [`PutOptions`].
    ///
    /// NOTE: the primary lease is unconditionally applied via `with_lease`,
    /// so any lease the caller already set on `options` is overwritten here.
    pub async fn kv_put_with_options(
        &self,
        key: impl AsRef<str>,
        value: impl AsRef<[u8]>,
        options: Option<PutOptions>,
    ) -> Result<PutResponse> {
        let options = options
            .unwrap_or_default()
            .with_lease(self.lease_id() as i64);
        self.connector
            .get_client()
            .kv_client()
            .put(key.as_ref(), value.as_ref(), Some(options))
            .await
            .map_err(|err| err.into())
    }
260
261    pub async fn kv_get(
262        &self,
263        key: impl Into<Vec<u8>>,
264        options: Option<GetOptions>,
265    ) -> Result<Vec<KeyValue>> {
266        let mut get_response = self
267            .connector
268            .get_client()
269            .kv_client()
270            .get(key, options)
271            .await?;
272        Ok(get_response.take_kvs())
273    }
274
275    pub async fn kv_delete(
276        &self,
277        key: impl Into<Vec<u8>>,
278        options: Option<DeleteOptions>,
279    ) -> Result<u64> {
280        self.connector
281            .get_client()
282            .kv_client()
283            .delete(key, options)
284            .await
285            .map(|del_response| del_response.deleted() as u64)
286            .map_err(|err| err.into())
287    }
288
289    pub async fn kv_get_prefix(&self, prefix: impl AsRef<str>) -> Result<Vec<KeyValue>> {
290        let mut get_response = self
291            .connector
292            .get_client()
293            .kv_client()
294            .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
295            .await?;
296
297        Ok(get_response.take_kvs())
298    }
299
300    /// Acquire a distributed lock using etcd's native lock mechanism
301    /// Returns a LockResponse that can be used to unlock later
302    pub async fn lock(
303        &self,
304        key: impl Into<Vec<u8>>,
305        lease_id: Option<u64>,
306    ) -> Result<LockResponse> {
307        let mut lock_client = self.connector.get_client().lock_client();
308        let id = lease_id.unwrap_or(self.lease_id());
309        let options = LockOptions::new().with_lease(id as i64);
310        lock_client
311            .lock(key, Some(options))
312            .await
313            .map_err(|err| err.into())
314    }
315
316    /// Release a distributed lock using the key from the LockResponse
317    pub async fn unlock(&self, lock_key: impl Into<Vec<u8>>) -> Result<()> {
318        let mut lock_client = self.connector.get_client().lock_client();
319        lock_client
320            .unlock(lock_key)
321            .await
322            .map_err(|err: etcd_client::Error| anyhow::anyhow!(err))?;
323        Ok(())
324    }
325
    /// Like kv_get_and_watch_prefix but only for new changes, does not include existing values.
    ///
    /// Returns a [`PrefixWatcher`] whose channel yields [`WatchEvent`]s for
    /// keys under `prefix` from this point onward.
    pub async fn kv_watch_prefix(
        &self,
        prefix: impl AsRef<str> + std::fmt::Display,
    ) -> Result<PrefixWatcher> {
        self.watch_internal(prefix, false).await
    }
333
    /// Watch `prefix`, first delivering every existing key-value under it as a
    /// [`WatchEvent::Put`], then streaming subsequent changes.
    pub async fn kv_get_and_watch_prefix(
        &self,
        prefix: impl AsRef<str> + std::fmt::Display,
    ) -> Result<PrefixWatcher> {
        self.watch_internal(prefix, true).await
    }
340
    /// Core watch implementation that sets up a resilient watcher for a key prefix.
    ///
    /// Creates a background task that maintains a watch stream with automatic reconnection
    /// on recoverable errors. If `include_existing` is true, existing keys are included
    /// in the initial watch events.
    async fn watch_internal(
        &self,
        prefix: impl AsRef<str> + std::fmt::Display,
        include_existing: bool,
    ) -> Result<PrefixWatcher> {
        // Snapshot the prefix and compute the revision to start watching from.
        let (mut start_revision, existing_kvs) = self
            .get_start_revision(prefix.as_ref(), include_existing)
            .await?;

        // Size channel to fit all existing KVs (avoids deadlock when sending before return)
        let existing_count = existing_kvs.as_ref().map_or(0, |kvs| kvs.len());
        let (tx, rx) = mpsc::channel(existing_count + 32);

        // Send existing KVs before returning so they're immediately available to consumers
        if let Some(kvs) = existing_kvs {
            tracing::trace!("sending {} existing kvs", kvs.len());
            for kv in kvs {
                tx.send(WatchEvent::Put(kv)).await?;
            }
        }

        // Watch for new events in background.
        // Spawned on the exclusive runtime (`self.rt`) so watches are not
        // starved by load on the main runtime (see field comment on `Client::rt`).
        let connector = self.connector.clone();
        let prefix_str = prefix.as_ref().to_string();
        self.rt.spawn(async move {
            let mut reconnect = true;
            while reconnect {
                // Start a new watch stream
                let watch_stream =
                    match Self::new_watch_stream(&connector, &prefix_str, start_revision).await {
                        Ok(stream) => stream,
                        // NOTE(review): reconnection gave up; the task exits
                        // silently and the channel closes — that closure is how
                        // consumers observe the watch ending.
                        Err(_) => return,
                    };

                // Watch the stream; `true` means a recoverable error occurred
                // and the stream should be re-established from `start_revision`.
                reconnect =
                    Self::monitor_watch_stream(watch_stream, &prefix_str, &mut start_revision, &tx)
                        .await;
            }
        });

        Ok(PrefixWatcher {
            prefix: prefix.as_ref().to_string(),
            rx,
        })
    }
392
393    /// Fetch the start revision and optionally return existing key-values.
394    async fn get_start_revision(
395        &self,
396        prefix: impl AsRef<str> + std::fmt::Display,
397        include_existing: bool,
398    ) -> Result<(i64, Option<Vec<KeyValue>>)> {
399        let mut kv_client = self.connector.get_client().kv_client();
400        let mut get_response = kv_client
401            .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
402            .await?;
403
404        // Get the start revision
405        let mut start_revision = get_response
406            .header()
407            .ok_or(anyhow::anyhow!("missing header; unable to get revision"))?
408            .revision();
409        tracing::trace!("{prefix}: start_revision: {start_revision}");
410        start_revision += 1;
411
412        // Return existing KVs if requested
413        let existing_kvs = include_existing.then(|| {
414            let kvs = get_response.take_kvs();
415            tracing::trace!("initial kv count: {:?}", kvs.len());
416            kvs
417        });
418
419        Ok((start_revision, existing_kvs))
420    }
421
422    /// Establish a new watch stream with automatic retry and reconnection.
423    ///
424    /// Attempts to create a watch stream, reconnecting to ETCD if necessary.
425    /// Uses a 10-second timeout for reconnection attempts before giving up.
426    async fn new_watch_stream(
427        connector: &Arc<Connector>,
428        prefix: &String,
429        start_revision: i64,
430    ) -> Result<WatchStream> {
431        loop {
432            match connector
433                .get_client()
434                .watch_client()
435                .watch(
436                    prefix.as_str(),
437                    Some(
438                        WatchOptions::new()
439                            .with_prefix()
440                            .with_start_revision(start_revision)
441                            .with_prev_key(),
442                    ),
443                )
444                .await
445            {
446                Ok((_, watch_stream)) => {
447                    tracing::debug!("Watch stream established for prefix '{}'", prefix);
448                    return Ok(watch_stream);
449                }
450                Err(err) => {
451                    tracing::debug!(error = %err, "Failed to establish watch stream for prefix '{}'", prefix);
452                    let deadline = std::time::Instant::now() + Duration::from_secs(10);
453                    if let Err(err) = connector.reconnect(deadline).await {
454                        tracing::error!(
455                            "Failed to reconnect to ETCD within 10 secs for watching prefix '{}': {}",
456                            prefix,
457                            err
458                        );
459                        return Err(err);
460                    }
461                    // continue - retry establishing the watch stream
462                }
463            }
464        }
465    }
466
467    /// Monitor a watch stream and forward events to receivers.
468    ///
469    /// Returns `true` for recoverable errors (network issues, stream closure) that warrant
470    /// reconnection attempts. Returns `false` for permanent failures (protocol violations,
471    /// channel errors, no receivers) where watching should stop.
472    async fn monitor_watch_stream(
473        mut watch_stream: WatchStream,
474        prefix: &String,
475        start_revision: &mut i64,
476        tx: &mpsc::Sender<WatchEvent>,
477    ) -> bool {
478        loop {
479            tokio::select! {
480                maybe_resp = watch_stream.next() => {
481                    // Handle the watch response
482                    let response = match maybe_resp {
483                        Some(Ok(res)) => res,
484                        Some(Err(err)) => {
485                            tracing::warn!(error = %err, "Error watching stream for prefix '{}'", prefix);
486                            return true; // Exit to reconnect
487                        }
488                        None => {
489                            tracing::warn!("Watch stream unexpectedly closed for prefix '{}'", prefix);
490                            return true; // Exit to reconnect
491                        }
492                    };
493
494                    // Update revision for reconnect
495                    *start_revision = match response.header() {
496                        Some(header) => header.revision() + 1,
497                        None => {
498                            tracing::error!("Missing header in watch response for prefix '{}'", prefix);
499                            return false;
500                        }
501                    };
502
503                    // Process events
504                    if Self::process_watch_events(response.events(), tx).await.is_err() {
505                        return false;
506                    };
507                }
508                _ = tx.closed() => {
509                    tracing::debug!("no more receivers, stopping watcher");
510                    return false;
511                }
512            }
513        }
514    }
515
516    /// Process etcd events and forward them as Put/Delete watch events.
517    ///
518    /// Filters out events without key-values and transforms etcd events into
519    /// appropriate WatchEvent types for channel transmission.
520    async fn process_watch_events(
521        events: &[etcd_client::Event],
522        tx: &mpsc::Sender<WatchEvent>,
523    ) -> Result<()> {
524        for event in events {
525            // Extract the KeyValue if it exists
526            let Some(kv) = event.kv() else {
527                continue; // Skip events with no KV
528            };
529
530            // Handle based on event type
531            match event.event_type() {
532                etcd_client::EventType::Put => {
533                    if let Err(err) = tx.send(WatchEvent::Put(kv.clone())).await {
534                        tracing::error!("kv watcher error forwarding WatchEvent::Put: {err}");
535                        return Err(err.into());
536                    }
537                }
538                etcd_client::EventType::Delete => {
539                    if tx.send(WatchEvent::Delete(kv.clone())).await.is_err() {
540                        return Err(anyhow::anyhow!("failed to send WatchEvent::Delete"));
541                    }
542                }
543            }
544        }
545        Ok(())
546    }
547}
548
/// Handle to an active prefix watch: the watched prefix plus the receiving end
/// of the event channel fed by the background watch task. `Dissolve` generates
/// a method to take ownership of both fields.
#[derive(Dissolve)]
pub struct PrefixWatcher {
    prefix: String,
    rx: mpsc::Receiver<WatchEvent>,
}
554
/// A single change observed on a watched prefix.
#[derive(Debug)]
pub enum WatchEvent {
    /// Key was created or updated (also used to replay pre-existing keys).
    Put(KeyValue),
    /// Key was removed.
    Delete(KeyValue),
}
560
/// ETCD client configuration options
#[derive(Debug, Clone, Builder, Validate)]
pub struct ClientOptions {
    /// Endpoint URLs; at least one is required (enforced by `Validate`).
    #[validate(length(min = 1))]
    pub etcd_url: Vec<String>,

    /// Optional auth/TLS connect options; `None` means a plain connection.
    #[builder(default)]
    pub etcd_connect_options: Option<ConnectOptions>,

    /// If true, the client will attach a lease to the primary [`CancellationToken`].
    #[builder(default = "true")]
    pub attach_lease: bool,
}
574
575impl Default for ClientOptions {
576    fn default() -> Self {
577        let mut connect_options = None;
578
579        if let (Ok(username), Ok(password)) = (
580            std::env::var(env_etcd::auth::ETCD_AUTH_USERNAME),
581            std::env::var(env_etcd::auth::ETCD_AUTH_PASSWORD),
582        ) {
583            // username and password are set
584            connect_options = Some(ConnectOptions::new().with_user(username, password));
585        } else if let (Ok(ca), Ok(cert), Ok(key)) = (
586            std::env::var(env_etcd::auth::ETCD_AUTH_CA),
587            std::env::var(env_etcd::auth::ETCD_AUTH_CLIENT_CERT),
588            std::env::var(env_etcd::auth::ETCD_AUTH_CLIENT_KEY),
589        ) {
590            // TLS is set
591            connect_options = Some(
592                ConnectOptions::new().with_tls(
593                    TlsOptions::new()
594                        .ca_certificate(Certificate::from_pem(ca))
595                        .identity(Identity::from_pem(cert, key)),
596                ),
597            );
598        }
599
600        ClientOptions {
601            etcd_url: default_servers(),
602            etcd_connect_options: connect_options,
603            attach_lease: true,
604        }
605    }
606}
607
608fn default_servers() -> Vec<String> {
609    match std::env::var(env_etcd::ETCD_ENDPOINTS) {
610        Ok(possible_list_of_urls) => possible_list_of_urls
611            .split(',')
612            .map(|s| s.to_string())
613            .collect(),
614        Err(_) => vec!["http://localhost:2379".to_string()],
615    }
616}
617
/// A cache for etcd key-value pairs that watches for changes
///
/// Holds a local snapshot of every key under `prefix`, kept up to date by a
/// background task consuming a prefix watch (see `start_watcher`).
pub struct KvCache {
    client: Client,
    /// The key prefix this cache mirrors; prepended to keys on get/put/delete.
    pub prefix: String,
    // Shared with the background watcher task.
    cache: Arc<RwLock<HashMap<String, Vec<u8>>>>,
    // Consumed (taken) by `start_watcher` when the background task starts.
    watcher: Option<PrefixWatcher>,
}
625
626impl KvCache {
    /// Create a new KV cache for the given prefix
    ///
    /// Seeds the cache with everything currently stored under `prefix`, writes
    /// any `initial_values` not yet present in etcd, then starts a background
    /// task keeping the cache in sync via a prefix watch.
    pub async fn new(
        client: Client,
        prefix: String,
        initial_values: HashMap<String, Vec<u8>>,
    ) -> Result<Self> {
        let mut cache = HashMap::new();

        // First get all existing keys with this prefix
        let existing_kvs = client.kv_get_prefix(&prefix).await?;
        for kv in existing_kvs {
            // Lossy UTF-8 decode: non-UTF-8 key bytes become U+FFFD.
            let key = String::from_utf8_lossy(kv.key()).to_string();
            cache.insert(key, kv.value().to_vec());
        }

        // For any keys in initial_values that don't exist in etcd, write them
        // TODO: proper lease handling, this requires the first process that write to a prefix atomically
        // create a lease and write the lease to etcd. Later processes will attach to the lease and
        // help refresh the lease.
        for (key, value) in initial_values.iter() {
            let full_key = format!("{}{}", prefix, key);
            if let std::collections::hash_map::Entry::Vacant(e) = cache.entry(full_key.clone()) {
                client.kv_put(&full_key, value.clone(), None).await?;
                e.insert(value.clone());
            }
        }

        // Start watching for changes
        // we won't miss events between the initial push and the watcher starting because
        // client.kv_get_and_watch_prefix() will get all kv pairs and put them back again
        let watcher = client.kv_get_and_watch_prefix(&prefix).await?;

        let cache = Arc::new(RwLock::new(cache));
        let mut result = Self {
            client,
            prefix,
            cache,
            watcher: Some(watcher),
        };

        // Start the background watcher task
        result.start_watcher().await?;

        Ok(result)
    }
672
    /// Start the background watcher task
    ///
    /// Takes the stored `PrefixWatcher` (no-op if already taken) and spawns a
    /// task that applies Put/Delete events to the shared cache map. Note this
    /// uses `tokio::spawn` (the current runtime), unlike `Client`'s watch
    /// tasks which run on the client's exclusive runtime.
    async fn start_watcher(&mut self) -> Result<()> {
        if let Some(watcher) = self.watcher.take() {
            let cache = self.cache.clone();
            let prefix = self.prefix.clone();

            tokio::spawn(async move {
                let mut rx = watcher.rx;

                // Loop ends when the sending side (the client's watch task) goes away.
                while let Some(event) = rx.recv().await {
                    match event {
                        WatchEvent::Put(kv) => {
                            let key = String::from_utf8_lossy(kv.key()).to_string();
                            let value = kv.value().to_vec();

                            tracing::trace!("KvCache update: {} = {:?}", key, value);
                            let mut cache_write = cache.write().await;
                            cache_write.insert(key, value);
                        }
                        WatchEvent::Delete(kv) => {
                            let key = String::from_utf8_lossy(kv.key()).to_string();

                            tracing::trace!("KvCache delete: {}", key);
                            let mut cache_write = cache.write().await;
                            cache_write.remove(&key);
                        }
                    }
                }

                tracing::debug!("KvCache watcher for prefix '{}' stopped", prefix);
            });
        }

        Ok(())
    }
708
709    /// Get a value from the cache
710    pub async fn get(&self, key: &str) -> Option<Vec<u8>> {
711        let full_key = format!("{}{}", self.prefix, key);
712        let cache_read = self.cache.read().await;
713        cache_read.get(&full_key).cloned()
714    }
715
716    /// Get all key-value pairs in the cache
717    pub async fn get_all(&self) -> HashMap<String, Vec<u8>> {
718        let cache_read = self.cache.read().await;
719        cache_read.clone()
720    }
721
722    /// Update a value in both the cache and etcd
723    pub async fn put(&self, key: &str, value: Vec<u8>, lease_id: Option<u64>) -> Result<()> {
724        let full_key = format!("{}{}", self.prefix, key);
725
726        // Update etcd first
727        self.client
728            .kv_put(&full_key, value.clone(), lease_id)
729            .await?;
730
731        // Then update local cache
732        let mut cache_write = self.cache.write().await;
733        cache_write.insert(full_key, value);
734
735        Ok(())
736    }
737
738    /// Delete a key from both the cache and etcd
739    pub async fn delete(&self, key: &str) -> Result<()> {
740        let full_key = format!("{}{}", self.prefix, key);
741
742        // Delete from etcd first
743        self.client.kv_delete(full_key.clone(), None).await?;
744
745        // Then remove from local cache
746        let mut cache_write = self.cache.write().await;
747        cache_write.remove(&full_key);
748
749        Ok(())
750    }
751}
752
#[cfg(feature = "integration")]
#[cfg(test)]
mod tests {
    use crate::{DistributedRuntime, distributed::DistributedConfig};

    use super::*;

    // Integration tests: require a reachable etcd server (gated behind the
    // `integration` feature).

    #[test]
    fn test_etcd_client() {
        // (fixed typo: was `test_ectd_client`)
        let rt = Runtime::from_settings().unwrap();
        let rt_clone = rt.clone();
        let config = DistributedConfig::from_settings();

        rt_clone.primary().block_on(async move {
            let drt = DistributedRuntime::new(rt, config).await.unwrap();
            test_kv_create_or_validate(drt).await.unwrap();
        });
    }

    async fn test_kv_create_or_validate(drt: DistributedRuntime) -> Result<()> {
        let key = "__integration_test_key";
        let value = b"test_value";

        let client = Client::new(ClientOptions::default(), drt.runtime().clone())
            .await
            .expect("etcd client should be available");
        let lease_id = drt.connection_id();

        // First creation should succeed and report Ok(None).
        let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
        assert!(result.is_ok(), "initial kv_create should succeed: {result:?}");

        // Creating again is idempotent: Ok(Some(version)) reports the existing key
        // rather than an error.
        let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
        assert!(
            result.is_ok() && result.unwrap().is_some(),
            "Expected Ok(Some(version)) when key already exists"
        );

        // Create-or-validate succeeds because the values match.
        let result = client
            .kv_create_or_validate(key.to_string(), value.to_vec(), Some(lease_id))
            .await;
        assert!(result.is_ok(), "matching value should validate: {result:?}");

        // A different value must fail validation.
        let different_value = b"different_value";
        let result = client
            .kv_create_or_validate(key.to_string(), different_value.to_vec(), Some(lease_id))
            .await;
        assert!(result.is_err(), "mismatched value should be rejected");

        Ok(())
    }

    #[test]
    fn test_kv_cache() {
        let rt = Runtime::from_settings().unwrap();
        let rt_clone = rt.clone();
        let config = DistributedConfig::from_settings();

        rt_clone.primary().block_on(async move {
            let drt = DistributedRuntime::new(rt, config).await.unwrap();
            test_kv_cache_operations(drt).await.unwrap();
        });
    }

    async fn test_kv_cache_operations(drt: DistributedRuntime) -> Result<()> {
        // Make the client and unwrap it
        let client = Client::new(ClientOptions::default(), drt.runtime().clone())
            .await
            .expect("etcd client should be available");

        // Create a unique test prefix to avoid conflicts with other tests
        let test_id = uuid::Uuid::new_v4().to_string();
        let prefix = format!("v1/test_kv_cache_{}/", test_id);

        // Initial values
        let mut initial_values = HashMap::new();
        initial_values.insert("key1".to_string(), b"value1".to_vec());
        initial_values.insert("key2".to_string(), b"value2".to_vec());

        // Create the KV cache
        let kv_cache = KvCache::new(client.clone(), prefix.clone(), initial_values).await?;

        // Test get
        let value1 = kv_cache.get("key1").await;
        assert_eq!(value1, Some(b"value1".to_vec()));

        let value2 = kv_cache.get("key2").await;
        assert_eq!(value2, Some(b"value2".to_vec()));

        // Test get_all
        let all_values = kv_cache.get_all().await;
        assert_eq!(all_values.len(), 2);
        assert_eq!(
            all_values.get(&format!("{}key1", prefix)),
            Some(&b"value1".to_vec())
        );
        assert_eq!(
            all_values.get(&format!("{}key2", prefix)),
            Some(&b"value2".to_vec())
        );

        // Test put - using None for lease_id
        kv_cache.put("key3", b"value3".to_vec(), None).await?;

        // Allow some time for the update to propagate
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Verify the new value
        let value3 = kv_cache.get("key3").await;
        assert_eq!(value3, Some(b"value3".to_vec()));

        // Test update
        kv_cache
            .put("key1", b"updated_value1".to_vec(), None)
            .await?;

        // Allow some time for the update to propagate
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Verify the updated value
        let updated_value1 = kv_cache.get("key1").await;
        assert_eq!(updated_value1, Some(b"updated_value1".to_vec()));

        // Test external update (simulating another client updating a value)
        client
            .kv_put(
                &format!("{}key2", prefix),
                b"external_update".to_vec(),
                None,
            )
            .await?;

        // Allow some time for the update to propagate
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Verify the cache was updated
        let external_update = kv_cache.get("key2").await;
        assert_eq!(external_update, Some(b"external_update".to_vec()));

        // Clean up - delete the test keys
        let etcd_client = client.etcd_client();
        let _ = etcd_client
            .kv_client()
            .delete(
                prefix,
                Some(etcd_client::DeleteOptions::new().with_prefix()),
            )
            .await?;

        Ok(())
    }
}