1use crate::runtime::Runtime;
5use anyhow::{Context, Result};
6
7use async_nats::jetstream::kv;
8use derive_builder::Builder;
9use derive_getters::Dissolve;
10use futures::StreamExt;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::{RwLock, mpsc};
14use validator::Validate;
15
16use etcd_client::{
17 Certificate, Compare, CompareOp, DeleteOptions, GetOptions, Identity, LockClient, LockOptions,
18 LockResponse, PutOptions, PutResponse, TlsOptions, Txn, TxnOp, TxnOpResponse, WatchOptions,
19 WatchStream, Watcher,
20};
21pub use etcd_client::{ConnectOptions, KeyValue, LeaseClient};
22use tokio::time::{Duration, interval};
23use tokio_util::sync::CancellationToken;
24
25mod connector;
26mod lease;
27mod lock;
28
29use connector::Connector;
30use lease::*;
31pub use lock::*;
32
33use super::utils::build_in_runtime;
34use crate::config::environment_names::etcd as env_etcd;
35
/// Handle to an etcd cluster.
///
/// Clones share the same `Connector` and tokio runtime (both held behind `Arc`), so cloning
/// does not open a new connection.
#[derive(Clone)]
pub struct Client {
    /// Shared connection manager; every request obtains its etcd client from here.
    connector: Arc<Connector>,
    /// Lease attached to writes when the caller passes no lease; `0` when the client was
    /// created with `attach_lease == false`.
    primary_lease: u64,
    runtime: Runtime,
    /// Runtime returned by `build_in_runtime` in `Client::new`; watch tasks are spawned on it.
    rt: Arc<tokio::runtime::Runtime>,
}
47
48impl std::fmt::Debug for Client {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 write!(f, "etcd::Client primary_lease={}", self.primary_lease)
51 }
52}
53
54impl Client {
55 pub fn builder() -> ClientOptionsBuilder {
56 ClientOptionsBuilder::default()
57 }
58
59 pub async fn new(config: ClientOptions, runtime: Runtime) -> Result<Self> {
68 let token = runtime.primary_token();
69
70 let ((connector, lease_id), rt) = build_in_runtime(
71 async move {
72 let etcd_urls = config.etcd_url.clone();
73 let connect_options = config.etcd_connect_options.clone();
74
75 let connector = Connector::new(etcd_urls, connect_options)
77 .await
78 .with_context(|| {
79 format!(
80 "Unable to connect to etcd server at {}. Check etcd server status",
81 config.etcd_url.join(", ")
82 )
83 })?;
84
85 let lease_id = if config.attach_lease {
86 create_lease(connector.clone(), 10, token)
87 .await
88 .with_context(|| {
89 format!(
90 "Unable to create lease. Check etcd server status at {}",
91 config.etcd_url.join(", ")
92 )
93 })?
94 } else {
95 0
96 };
97
98 Ok((connector, lease_id))
99 },
100 1,
101 )
102 .await?;
103
104 Ok(Client {
105 connector,
106 primary_lease: lease_id,
107 rt,
108 runtime,
109 })
110 }
111
112 fn etcd_client(&self) -> etcd_client::Client {
115 self.connector.get_client()
116 }
117
118 pub fn lease_id(&self) -> u64 {
120 self.primary_lease
121 }
122
123 pub async fn kv_create(
135 &self,
136 key: &str,
137 value: Vec<u8>,
138 lease_id: Option<u64>,
139 ) -> Result<Option<u64>> {
140 let id = lease_id.unwrap_or(self.lease_id());
141 let put_options = PutOptions::new().with_lease(id as i64);
142
143 let txn = Txn::new()
145 .when(vec![Compare::version(key, CompareOp::Equal, 0)]) .and_then(vec![
147 TxnOp::put(key, value, Some(put_options)), ])
149 .or_else(vec![
150 TxnOp::get(key, None), ]);
152
153 let result = self.connector.get_client().kv_client().txn(txn).await?;
155
156 if result.succeeded() {
158 return Ok(None);
159 }
160
161 if let Some(etcd_client::TxnOpResponse::Get(get_resp)) =
163 result.op_responses().into_iter().next()
164 && let Some(kv) = get_resp.kvs().first()
165 {
166 let version = kv.version() as u64;
167 return Ok(Some(version));
168 }
169
170 for resp in result.op_responses() {
172 tracing::warn!(response = ?resp, "kv_create etcd op response");
173 }
174 anyhow::bail!("Unable to create key. Check etcd server status")
175 }
176
177 pub async fn kv_create_or_validate(
179 &self,
180 key: String,
181 value: Vec<u8>,
182 lease_id: Option<u64>,
183 ) -> Result<()> {
184 let id = lease_id.unwrap_or(self.lease_id());
185 let put_options = PutOptions::new().with_lease(id as i64);
186
187 let txn = Txn::new()
190 .when(vec![Compare::version(key.as_str(), CompareOp::Equal, 0)]) .and_then(vec![
192 TxnOp::put(key.as_str(), value.clone(), Some(put_options)), ])
194 .or_else(vec![
195 TxnOp::txn(Txn::new().when(vec![Compare::value(
197 key.as_str(),
198 CompareOp::Equal,
199 value.clone(),
200 )])),
201 ]);
202
203 let result = self.connector.get_client().kv_client().txn(txn).await?;
205
206 if result.succeeded() {
208 Ok(())
209 } else {
210 match result.op_responses().first() {
211 Some(response) => match response {
212 TxnOpResponse::Txn(response) => match response.succeeded() {
213 true => Ok(()),
214 false => anyhow::bail!(
215 "Unable to create or validate key. Check etcd server status"
216 ),
217 },
218 _ => {
219 anyhow::bail!("Unable to validate key operation. Check etcd server status")
220 }
221 },
222 None => anyhow::bail!("Unable to create or validate key. Check etcd server status"),
223 }
224 }
225 }
226
227 pub async fn kv_put(
228 &self,
229 key: impl AsRef<str>,
230 value: impl AsRef<[u8]>,
231 lease_id: Option<u64>,
232 ) -> Result<()> {
233 let id = lease_id.unwrap_or(self.lease_id());
234 let put_options = PutOptions::new().with_lease(id as i64);
235 let _ = self
236 .connector
237 .get_client()
238 .kv_client()
239 .put(key.as_ref(), value.as_ref(), Some(put_options))
240 .await?;
241 Ok(())
242 }
243
244 pub async fn kv_put_with_options(
245 &self,
246 key: impl AsRef<str>,
247 value: impl AsRef<[u8]>,
248 options: Option<PutOptions>,
249 ) -> Result<PutResponse> {
250 let options = options
251 .unwrap_or_default()
252 .with_lease(self.lease_id() as i64);
253 self.connector
254 .get_client()
255 .kv_client()
256 .put(key.as_ref(), value.as_ref(), Some(options))
257 .await
258 .map_err(|err| err.into())
259 }
260
261 pub async fn kv_get(
262 &self,
263 key: impl Into<Vec<u8>>,
264 options: Option<GetOptions>,
265 ) -> Result<Vec<KeyValue>> {
266 let mut get_response = self
267 .connector
268 .get_client()
269 .kv_client()
270 .get(key, options)
271 .await?;
272 Ok(get_response.take_kvs())
273 }
274
275 pub async fn kv_delete(
276 &self,
277 key: impl Into<Vec<u8>>,
278 options: Option<DeleteOptions>,
279 ) -> Result<u64> {
280 self.connector
281 .get_client()
282 .kv_client()
283 .delete(key, options)
284 .await
285 .map(|del_response| del_response.deleted() as u64)
286 .map_err(|err| err.into())
287 }
288
289 pub async fn kv_get_prefix(&self, prefix: impl AsRef<str>) -> Result<Vec<KeyValue>> {
290 let mut get_response = self
291 .connector
292 .get_client()
293 .kv_client()
294 .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
295 .await?;
296
297 Ok(get_response.take_kvs())
298 }
299
300 pub async fn lock(
303 &self,
304 key: impl Into<Vec<u8>>,
305 lease_id: Option<u64>,
306 ) -> Result<LockResponse> {
307 let mut lock_client = self.connector.get_client().lock_client();
308 let id = lease_id.unwrap_or(self.lease_id());
309 let options = LockOptions::new().with_lease(id as i64);
310 lock_client
311 .lock(key, Some(options))
312 .await
313 .map_err(|err| err.into())
314 }
315
316 pub async fn unlock(&self, lock_key: impl Into<Vec<u8>>) -> Result<()> {
318 let mut lock_client = self.connector.get_client().lock_client();
319 lock_client
320 .unlock(lock_key)
321 .await
322 .map_err(|err: etcd_client::Error| anyhow::anyhow!(err))?;
323 Ok(())
324 }
325
326 pub async fn kv_watch_prefix(
328 &self,
329 prefix: impl AsRef<str> + std::fmt::Display,
330 ) -> Result<PrefixWatcher> {
331 self.watch_internal(prefix, false).await
332 }
333
334 pub async fn kv_get_and_watch_prefix(
335 &self,
336 prefix: impl AsRef<str> + std::fmt::Display,
337 ) -> Result<PrefixWatcher> {
338 self.watch_internal(prefix, true).await
339 }
340
341 async fn watch_internal(
347 &self,
348 prefix: impl AsRef<str> + std::fmt::Display,
349 include_existing: bool,
350 ) -> Result<PrefixWatcher> {
351 let (mut start_revision, existing_kvs) = self
352 .get_start_revision(prefix.as_ref(), include_existing)
353 .await?;
354
355 let existing_count = existing_kvs.as_ref().map_or(0, |kvs| kvs.len());
357 let (tx, rx) = mpsc::channel(existing_count + 32);
358
359 if let Some(kvs) = existing_kvs {
361 tracing::trace!("sending {} existing kvs", kvs.len());
362 for kv in kvs {
363 tx.send(WatchEvent::Put(kv)).await?;
364 }
365 }
366
367 let connector = self.connector.clone();
369 let prefix_str = prefix.as_ref().to_string();
370 self.rt.spawn(async move {
371 let mut reconnect = true;
372 while reconnect {
373 let watch_stream =
375 match Self::new_watch_stream(&connector, &prefix_str, start_revision).await {
376 Ok(stream) => stream,
377 Err(_) => return,
378 };
379
380 reconnect =
382 Self::monitor_watch_stream(watch_stream, &prefix_str, &mut start_revision, &tx)
383 .await;
384 }
385 });
386
387 Ok(PrefixWatcher {
388 prefix: prefix.as_ref().to_string(),
389 rx,
390 })
391 }
392
393 async fn get_start_revision(
395 &self,
396 prefix: impl AsRef<str> + std::fmt::Display,
397 include_existing: bool,
398 ) -> Result<(i64, Option<Vec<KeyValue>>)> {
399 let mut kv_client = self.connector.get_client().kv_client();
400 let mut get_response = kv_client
401 .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
402 .await?;
403
404 let mut start_revision = get_response
406 .header()
407 .ok_or(anyhow::anyhow!("missing header; unable to get revision"))?
408 .revision();
409 tracing::trace!("{prefix}: start_revision: {start_revision}");
410 start_revision += 1;
411
412 let existing_kvs = include_existing.then(|| {
414 let kvs = get_response.take_kvs();
415 tracing::trace!("initial kv count: {:?}", kvs.len());
416 kvs
417 });
418
419 Ok((start_revision, existing_kvs))
420 }
421
422 async fn new_watch_stream(
427 connector: &Arc<Connector>,
428 prefix: &String,
429 start_revision: i64,
430 ) -> Result<WatchStream> {
431 loop {
432 match connector
433 .get_client()
434 .watch_client()
435 .watch(
436 prefix.as_str(),
437 Some(
438 WatchOptions::new()
439 .with_prefix()
440 .with_start_revision(start_revision)
441 .with_prev_key(),
442 ),
443 )
444 .await
445 {
446 Ok((_, watch_stream)) => {
447 tracing::debug!("Watch stream established for prefix '{}'", prefix);
448 return Ok(watch_stream);
449 }
450 Err(err) => {
451 tracing::debug!(error = %err, "Failed to establish watch stream for prefix '{}'", prefix);
452 let deadline = std::time::Instant::now() + Duration::from_secs(10);
453 if let Err(err) = connector.reconnect(deadline).await {
454 tracing::error!(
455 "Failed to reconnect to ETCD within 10 secs for watching prefix '{}': {}",
456 prefix,
457 err
458 );
459 return Err(err);
460 }
461 }
463 }
464 }
465 }
466
467 async fn monitor_watch_stream(
473 mut watch_stream: WatchStream,
474 prefix: &String,
475 start_revision: &mut i64,
476 tx: &mpsc::Sender<WatchEvent>,
477 ) -> bool {
478 loop {
479 tokio::select! {
480 maybe_resp = watch_stream.next() => {
481 let response = match maybe_resp {
483 Some(Ok(res)) => res,
484 Some(Err(err)) => {
485 tracing::warn!(error = %err, "Error watching stream for prefix '{}'", prefix);
486 return true; }
488 None => {
489 tracing::warn!("Watch stream unexpectedly closed for prefix '{}'", prefix);
490 return true; }
492 };
493
494 *start_revision = match response.header() {
496 Some(header) => header.revision() + 1,
497 None => {
498 tracing::error!("Missing header in watch response for prefix '{}'", prefix);
499 return false;
500 }
501 };
502
503 if Self::process_watch_events(response.events(), tx).await.is_err() {
505 return false;
506 };
507 }
508 _ = tx.closed() => {
509 tracing::debug!("no more receivers, stopping watcher");
510 return false;
511 }
512 }
513 }
514 }
515
516 async fn process_watch_events(
521 events: &[etcd_client::Event],
522 tx: &mpsc::Sender<WatchEvent>,
523 ) -> Result<()> {
524 for event in events {
525 let Some(kv) = event.kv() else {
527 continue; };
529
530 match event.event_type() {
532 etcd_client::EventType::Put => {
533 if let Err(err) = tx.send(WatchEvent::Put(kv.clone())).await {
534 tracing::error!("kv watcher error forwarding WatchEvent::Put: {err}");
535 return Err(err.into());
536 }
537 }
538 etcd_client::EventType::Delete => {
539 if tx.send(WatchEvent::Delete(kv.clone())).await.is_err() {
540 return Err(anyhow::anyhow!("failed to send WatchEvent::Delete"));
541 }
542 }
543 }
544 }
545 Ok(())
546 }
547}
548
/// Receiving side of a watch started by `Client::kv_watch_prefix` or
/// `Client::kv_get_and_watch_prefix`.
///
/// `#[derive(Dissolve)]` provides `dissolve()` to split it into its fields. Dropping the
/// receiver lets the background watch task observe a closed channel and stop.
#[derive(Dissolve)]
pub struct PrefixWatcher {
    /// The watched key prefix.
    prefix: String,
    /// Watch events. For `kv_get_and_watch_prefix`, these begin with one `Put` per key that
    /// existed when the watch was set up.
    rx: mpsc::Receiver<WatchEvent>,
}
554
/// A single change observed under a watched prefix.
#[derive(Debug)]
pub enum WatchEvent {
    /// The key was created or updated; carries the event's `KeyValue`.
    Put(KeyValue),
    /// The key was deleted; carries the event's `KeyValue`, not the previous value.
    Delete(KeyValue),
}
560
/// Connection settings for `Client::new`; `Client::builder()` returns the derived builder.
#[derive(Debug, Clone, Builder, Validate)]
pub struct ClientOptions {
    /// etcd endpoints; validation requires at least one.
    #[validate(length(min = 1))]
    pub etcd_url: Vec<String>,

    /// Optional auth/TLS settings passed to the connector (builder default: `None`).
    #[builder(default)]
    pub etcd_connect_options: Option<ConnectOptions>,

    /// Whether `Client::new` creates a primary lease (builder default: `true`). When `false`,
    /// the primary lease id is `0`.
    #[builder(default = "true")]
    pub attach_lease: bool,
}
574
575impl Default for ClientOptions {
576 fn default() -> Self {
577 let mut connect_options = None;
578
579 if let (Ok(username), Ok(password)) = (
580 std::env::var(env_etcd::auth::ETCD_AUTH_USERNAME),
581 std::env::var(env_etcd::auth::ETCD_AUTH_PASSWORD),
582 ) {
583 connect_options = Some(ConnectOptions::new().with_user(username, password));
585 } else if let (Ok(ca), Ok(cert), Ok(key)) = (
586 std::env::var(env_etcd::auth::ETCD_AUTH_CA),
587 std::env::var(env_etcd::auth::ETCD_AUTH_CLIENT_CERT),
588 std::env::var(env_etcd::auth::ETCD_AUTH_CLIENT_KEY),
589 ) {
590 connect_options = Some(
592 ConnectOptions::new().with_tls(
593 TlsOptions::new()
594 .ca_certificate(Certificate::from_pem(ca))
595 .identity(Identity::from_pem(cert, key)),
596 ),
597 );
598 }
599
600 ClientOptions {
601 etcd_url: default_servers(),
602 etcd_connect_options: connect_options,
603 attach_lease: true,
604 }
605 }
606}
607
608fn default_servers() -> Vec<String> {
609 match std::env::var(env_etcd::ETCD_ENDPOINTS) {
610 Ok(possible_list_of_urls) => possible_list_of_urls
611 .split(',')
612 .map(|s| s.to_string())
613 .collect(),
614 Err(_) => vec!["http://localhost:2379".to_string()],
615 }
616}
617
618pub struct KvCache {
620 client: Client,
621 pub prefix: String,
622 cache: Arc<RwLock<HashMap<String, Vec<u8>>>>,
623 watcher: Option<PrefixWatcher>,
624}
625
626impl KvCache {
627 pub async fn new(
629 client: Client,
630 prefix: String,
631 initial_values: HashMap<String, Vec<u8>>,
632 ) -> Result<Self> {
633 let mut cache = HashMap::new();
634
635 let existing_kvs = client.kv_get_prefix(&prefix).await?;
637 for kv in existing_kvs {
638 let key = String::from_utf8_lossy(kv.key()).to_string();
639 cache.insert(key, kv.value().to_vec());
640 }
641
642 for (key, value) in initial_values.iter() {
647 let full_key = format!("{}{}", prefix, key);
648 if let std::collections::hash_map::Entry::Vacant(e) = cache.entry(full_key.clone()) {
649 client.kv_put(&full_key, value.clone(), None).await?;
650 e.insert(value.clone());
651 }
652 }
653
654 let watcher = client.kv_get_and_watch_prefix(&prefix).await?;
658
659 let cache = Arc::new(RwLock::new(cache));
660 let mut result = Self {
661 client,
662 prefix,
663 cache,
664 watcher: Some(watcher),
665 };
666
667 result.start_watcher().await?;
669
670 Ok(result)
671 }
672
673 async fn start_watcher(&mut self) -> Result<()> {
675 if let Some(watcher) = self.watcher.take() {
676 let cache = self.cache.clone();
677 let prefix = self.prefix.clone();
678
679 tokio::spawn(async move {
680 let mut rx = watcher.rx;
681
682 while let Some(event) = rx.recv().await {
683 match event {
684 WatchEvent::Put(kv) => {
685 let key = String::from_utf8_lossy(kv.key()).to_string();
686 let value = kv.value().to_vec();
687
688 tracing::trace!("KvCache update: {} = {:?}", key, value);
689 let mut cache_write = cache.write().await;
690 cache_write.insert(key, value);
691 }
692 WatchEvent::Delete(kv) => {
693 let key = String::from_utf8_lossy(kv.key()).to_string();
694
695 tracing::trace!("KvCache delete: {}", key);
696 let mut cache_write = cache.write().await;
697 cache_write.remove(&key);
698 }
699 }
700 }
701
702 tracing::debug!("KvCache watcher for prefix '{}' stopped", prefix);
703 });
704 }
705
706 Ok(())
707 }
708
709 pub async fn get(&self, key: &str) -> Option<Vec<u8>> {
711 let full_key = format!("{}{}", self.prefix, key);
712 let cache_read = self.cache.read().await;
713 cache_read.get(&full_key).cloned()
714 }
715
716 pub async fn get_all(&self) -> HashMap<String, Vec<u8>> {
718 let cache_read = self.cache.read().await;
719 cache_read.clone()
720 }
721
722 pub async fn put(&self, key: &str, value: Vec<u8>, lease_id: Option<u64>) -> Result<()> {
724 let full_key = format!("{}{}", self.prefix, key);
725
726 self.client
728 .kv_put(&full_key, value.clone(), lease_id)
729 .await?;
730
731 let mut cache_write = self.cache.write().await;
733 cache_write.insert(full_key, value);
734
735 Ok(())
736 }
737
738 pub async fn delete(&self, key: &str) -> Result<()> {
740 let full_key = format!("{}{}", self.prefix, key);
741
742 self.client.kv_delete(full_key.clone(), None).await?;
744
745 let mut cache_write = self.cache.write().await;
747 cache_write.remove(&full_key);
748
749 Ok(())
750 }
751}
752
#[cfg(feature = "integration")]
#[cfg(test)]
mod tests {
    use crate::{DistributedRuntime, distributed::DistributedConfig};

    use super::*;

    // Integration test: needs a reachable etcd (endpoints from `ClientOptions::default()`).
    // NOTE(review): "ectd" is a typo; the name is kept so existing test filters still match.
    #[test]
    fn test_ectd_client() {
        let rt = Runtime::from_settings().unwrap();
        let rt_clone = rt.clone();
        let config = DistributedConfig::from_settings();

        rt_clone.primary().block_on(async move {
            let drt = DistributedRuntime::new(rt, config).await.unwrap();
            test_kv_create_or_validate(drt).await.unwrap();
        });
    }

    /// Exercises `kv_create` (create vs. already-exists) and `kv_create_or_validate`
    /// (matching vs. conflicting value) on a fixed key bound to `drt.connection_id()`.
    async fn test_kv_create_or_validate(drt: DistributedRuntime) -> Result<()> {
        let key = "__integration_test_key";
        let value = b"test_value";

        let client = Client::new(ClientOptions::default(), drt.runtime().clone())
            .await
            .expect("etcd client should be available");
        let lease_id = drt.connection_id();

        // Only success is checked here: the key may still exist from an earlier run, in which
        // case this returns `Ok(Some(_))` instead of `Ok(None)`.
        let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
        assert!(result.is_ok(), "initial kv_create failed: {result:?}");

        let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
        assert!(
            matches!(result, Ok(Some(_))),
            "Expected Ok(Some(version)) when key already exists, got {result:?}"
        );

        let result = client
            .kv_create_or_validate(key.to_string(), value.to_vec(), Some(lease_id))
            .await;
        assert!(
            result.is_ok(),
            "kv_create_or_validate with the stored value failed: {result:?}"
        );

        let different_value = b"different_value";
        let result = client
            .kv_create_or_validate(key.to_string(), different_value.to_vec(), Some(lease_id))
            .await;
        assert!(
            result.is_err(),
            "kv_create_or_validate must reject a value that differs from the stored one"
        );

        Ok(())
    }

    // Integration test for `KvCache`: needs a reachable etcd.
    #[test]
    fn test_kv_cache() {
        let rt = Runtime::from_settings().unwrap();
        let rt_clone = rt.clone();
        let config = DistributedConfig::from_settings();

        rt_clone.primary().block_on(async move {
            let drt = DistributedRuntime::new(rt, config).await.unwrap();
            test_kv_cache_operations(drt).await.unwrap();
        });
    }

    /// Checks seeding, local puts, and propagation of external writes through the watcher, on a
    /// unique prefix that is deleted at the end.
    async fn test_kv_cache_operations(drt: DistributedRuntime) -> Result<()> {
        let client = Client::new(ClientOptions::default(), drt.runtime().clone())
            .await
            .expect("etcd client should be available");

        // Unique prefix so concurrent or repeated runs do not interfere.
        let test_id = uuid::Uuid::new_v4().to_string();
        let prefix = format!("v1/test_kv_cache_{}/", test_id);

        let mut initial_values = HashMap::new();
        initial_values.insert("key1".to_string(), b"value1".to_vec());
        initial_values.insert("key2".to_string(), b"value2".to_vec());

        let kv_cache = KvCache::new(client.clone(), prefix.clone(), initial_values).await?;

        let value1 = kv_cache.get("key1").await;
        assert_eq!(value1, Some(b"value1".to_vec()));

        let value2 = kv_cache.get("key2").await;
        assert_eq!(value2, Some(b"value2".to_vec()));

        // `get_all` is keyed by the full, prefixed key.
        let all_values = kv_cache.get_all().await;
        assert_eq!(all_values.len(), 2);
        assert_eq!(
            all_values.get(&format!("{}key1", prefix)),
            Some(&b"value1".to_vec())
        );
        assert_eq!(
            all_values.get(&format!("{}key2", prefix)),
            Some(&b"value2".to_vec())
        );

        kv_cache.put("key3", b"value3".to_vec(), None).await?;

        // Give the watcher time to deliver the corresponding event.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let value3 = kv_cache.get("key3").await;
        assert_eq!(value3, Some(b"value3".to_vec()));

        kv_cache
            .put("key1", b"updated_value1".to_vec(), None)
            .await?;

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let updated_value1 = kv_cache.get("key1").await;
        assert_eq!(updated_value1, Some(b"updated_value1".to_vec()));

        // A write made directly through the client must reach the cache via the watcher.
        client
            .kv_put(
                &format!("{}key2", prefix),
                b"external_update".to_vec(),
                None,
            )
            .await?;

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let external_update = kv_cache.get("key2").await;
        assert_eq!(external_update, Some(b"external_update".to_vec()));

        // Clean up everything under the test prefix.
        let etcd_client = client.etcd_client();
        let _ = etcd_client
            .kv_client()
            .delete(
                prefix,
                Some(etcd_client::DeleteOptions::new().with_prefix()),
            )
            .await?;

        Ok(())
    }
}