1use crate::runtime::Runtime;
5use anyhow::{Context, Result};
6
7use async_nats::jetstream::kv;
8use derive_builder::Builder;
9use derive_getters::Dissolve;
10use futures::StreamExt;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::{RwLock, mpsc};
14use validator::Validate;
15
16use etcd_client::{
17 Certificate, Compare, CompareOp, DeleteOptions, GetOptions, Identity, LockClient, LockOptions,
18 LockResponse, PutOptions, PutResponse, TlsOptions, Txn, TxnOp, TxnOpResponse, WatchOptions,
19 WatchStream, Watcher,
20};
21pub use etcd_client::{ConnectOptions, KeyValue, LeaseClient};
22use tokio::time::{Duration, interval};
23use tokio_util::sync::CancellationToken;
24
25mod connector;
26mod lease;
27mod lock;
28mod path;
29
30use connector::Connector;
31use lease::*;
32pub use lock::*;
33pub use path::*;
34
35use super::utils::build_in_runtime;
36
/// Handle to an etcd cluster.
///
/// Holds the shared [`Connector`], the primary lease created by [`Client::new`] (0 when
/// `attach_lease` was false), the owning [`Runtime`], and the tokio runtime returned by
/// `build_in_runtime`, on which watch tasks are spawned.
// NOTE: fields are dropped in declaration order and `rt` holds a tokio runtime; keep this
// order unless the resulting drop sequence has been checked.
#[derive(Clone)]
pub struct Client {
    connector: Arc<Connector>,
    /// Lease applied by default to puts, creates and locks; 0 when no lease was created.
    primary_lease: u64,
    runtime: Runtime,
    /// Runtime used by `watch_internal` to spawn the background watch tasks.
    rt: Arc<tokio::runtime::Runtime>,
}
48
49impl std::fmt::Debug for Client {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 write!(f, "etcd::Client primary_lease={}", self.primary_lease)
52 }
53}
54
55impl Client {
56 pub fn builder() -> ClientOptionsBuilder {
57 ClientOptionsBuilder::default()
58 }
59
60 pub async fn new(config: ClientOptions, runtime: Runtime) -> Result<Self> {
69 let token = runtime.primary_token();
70
71 let ((connector, lease_id), rt) = build_in_runtime(
72 async move {
73 let etcd_urls = config.etcd_url.clone();
74 let connect_options = config.etcd_connect_options.clone();
75
76 let connector = Connector::new(etcd_urls, connect_options)
78 .await
79 .with_context(|| {
80 format!(
81 "Unable to connect to etcd server at {}. Check etcd server status",
82 config.etcd_url.join(", ")
83 )
84 })?;
85
86 let lease_id = if config.attach_lease {
87 create_lease(connector.clone(), 10, token)
88 .await
89 .with_context(|| {
90 format!(
91 "Unable to create lease. Check etcd server status at {}",
92 config.etcd_url.join(", ")
93 )
94 })?
95 } else {
96 0
97 };
98
99 Ok((connector, lease_id))
100 },
101 1,
102 )
103 .await?;
104
105 Ok(Client {
106 connector,
107 primary_lease: lease_id,
108 rt,
109 runtime,
110 })
111 }
112
113 pub fn etcd_client(&self) -> etcd_client::Client {
116 self.connector.get_client()
117 }
118
119 pub fn lease_id(&self) -> u64 {
121 self.primary_lease
122 }
123
124 pub async fn kv_create(&self, key: &str, value: Vec<u8>, lease_id: Option<u64>) -> Result<()> {
125 let id = lease_id.unwrap_or(self.lease_id());
126 let put_options = PutOptions::new().with_lease(id as i64);
127
128 let txn = Txn::new()
130 .when(vec![Compare::version(key, CompareOp::Equal, 0)]) .and_then(vec![
132 TxnOp::put(key, value, Some(put_options)), ]);
134
135 let result = self.connector.get_client().kv_client().txn(txn).await?;
137
138 if result.succeeded() {
139 Ok(())
140 } else {
141 for resp in result.op_responses() {
142 tracing::warn!(response = ?resp, "kv_create etcd op response");
143 }
144 anyhow::bail!("Unable to create key. Check etcd server status")
145 }
146 }
147
148 pub async fn kv_create_or_validate(
150 &self,
151 key: String,
152 value: Vec<u8>,
153 lease_id: Option<u64>,
154 ) -> Result<()> {
155 let id = lease_id.unwrap_or(self.lease_id());
156 let put_options = PutOptions::new().with_lease(id as i64);
157
158 let txn = Txn::new()
161 .when(vec![Compare::version(key.as_str(), CompareOp::Equal, 0)]) .and_then(vec![
163 TxnOp::put(key.as_str(), value.clone(), Some(put_options)), ])
165 .or_else(vec![
166 TxnOp::txn(Txn::new().when(vec![Compare::value(
168 key.as_str(),
169 CompareOp::Equal,
170 value.clone(),
171 )])),
172 ]);
173
174 let result = self.connector.get_client().kv_client().txn(txn).await?;
176
177 if result.succeeded() {
179 Ok(())
180 } else {
181 match result.op_responses().first() {
182 Some(response) => match response {
183 TxnOpResponse::Txn(response) => match response.succeeded() {
184 true => Ok(()),
185 false => anyhow::bail!(
186 "Unable to create or validate key. Check etcd server status"
187 ),
188 },
189 _ => {
190 anyhow::bail!("Unable to validate key operation. Check etcd server status")
191 }
192 },
193 None => anyhow::bail!("Unable to create or validate key. Check etcd server status"),
194 }
195 }
196 }
197
198 pub async fn kv_put(
199 &self,
200 key: impl AsRef<str>,
201 value: impl AsRef<[u8]>,
202 lease_id: Option<u64>,
203 ) -> Result<()> {
204 let id = lease_id.unwrap_or(self.lease_id());
205 let put_options = PutOptions::new().with_lease(id as i64);
206 let _ = self
207 .connector
208 .get_client()
209 .kv_client()
210 .put(key.as_ref(), value.as_ref(), Some(put_options))
211 .await?;
212 Ok(())
213 }
214
215 pub async fn kv_put_with_options(
216 &self,
217 key: impl AsRef<str>,
218 value: impl AsRef<[u8]>,
219 options: Option<PutOptions>,
220 ) -> Result<PutResponse> {
221 let options = options
222 .unwrap_or_default()
223 .with_lease(self.lease_id() as i64);
224 self.connector
225 .get_client()
226 .kv_client()
227 .put(key.as_ref(), value.as_ref(), Some(options))
228 .await
229 .map_err(|err| err.into())
230 }
231
232 pub async fn kv_get(
233 &self,
234 key: impl Into<Vec<u8>>,
235 options: Option<GetOptions>,
236 ) -> Result<Vec<KeyValue>> {
237 let mut get_response = self
238 .connector
239 .get_client()
240 .kv_client()
241 .get(key, options)
242 .await?;
243 Ok(get_response.take_kvs())
244 }
245
246 pub async fn kv_delete(
247 &self,
248 key: impl Into<Vec<u8>>,
249 options: Option<DeleteOptions>,
250 ) -> Result<u64> {
251 self.connector
252 .get_client()
253 .kv_client()
254 .delete(key, options)
255 .await
256 .map(|del_response| del_response.deleted() as u64)
257 .map_err(|err| err.into())
258 }
259
260 pub async fn kv_get_prefix(&self, prefix: impl AsRef<str>) -> Result<Vec<KeyValue>> {
261 let mut get_response = self
262 .connector
263 .get_client()
264 .kv_client()
265 .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
266 .await?;
267
268 Ok(get_response.take_kvs())
269 }
270
271 pub async fn lock(
274 &self,
275 key: impl Into<Vec<u8>>,
276 lease_id: Option<u64>,
277 ) -> Result<LockResponse> {
278 let mut lock_client = self.connector.get_client().lock_client();
279 let id = lease_id.unwrap_or(self.lease_id());
280 let options = LockOptions::new().with_lease(id as i64);
281 lock_client
282 .lock(key, Some(options))
283 .await
284 .map_err(|err| err.into())
285 }
286
287 pub async fn unlock(&self, lock_key: impl Into<Vec<u8>>) -> Result<()> {
289 let mut lock_client = self.connector.get_client().lock_client();
290 lock_client
291 .unlock(lock_key)
292 .await
293 .map_err(|err: etcd_client::Error| anyhow::anyhow!(err))?;
294 Ok(())
295 }
296
297 pub async fn kv_watch_prefix(
299 &self,
300 prefix: impl AsRef<str> + std::fmt::Display,
301 ) -> Result<PrefixWatcher> {
302 self.watch_internal(prefix, false).await
303 }
304
305 pub async fn kv_get_and_watch_prefix(
306 &self,
307 prefix: impl AsRef<str> + std::fmt::Display,
308 ) -> Result<PrefixWatcher> {
309 self.watch_internal(prefix, true).await
310 }
311
312 async fn watch_internal(
318 &self,
319 prefix: impl AsRef<str> + std::fmt::Display,
320 include_existing: bool,
321 ) -> Result<PrefixWatcher> {
322 let (tx, rx) = mpsc::channel(32);
323
324 let mut start_revision = self
326 .get_start_revision(
327 prefix.as_ref(),
328 if include_existing { Some(&tx) } else { None },
329 )
330 .await?;
331
332 let connector = self.connector.clone();
334 let prefix_str = prefix.as_ref().to_string();
335 self.rt.spawn(async move {
336 let mut reconnect = true;
337 while reconnect {
338 let watch_stream =
340 match Self::new_watch_stream(&connector, &prefix_str, start_revision).await {
341 Ok(stream) => stream,
342 Err(_) => return,
343 };
344
345 reconnect =
347 Self::monitor_watch_stream(watch_stream, &prefix_str, &mut start_revision, &tx)
348 .await;
349 }
350 });
351
352 Ok(PrefixWatcher {
353 prefix: prefix.as_ref().to_string(),
354 rx,
355 })
356 }
357
358 async fn get_start_revision(
363 &self,
364 prefix: impl AsRef<str> + std::fmt::Display,
365 existing_kvs_tx: Option<&mpsc::Sender<WatchEvent>>,
366 ) -> Result<i64> {
367 let mut kv_client = self.connector.get_client().kv_client();
368 let mut get_response = kv_client
369 .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
370 .await?;
371
372 let mut start_revision = get_response
374 .header()
375 .ok_or(anyhow::anyhow!("missing header; unable to get revision"))?
376 .revision();
377 tracing::trace!("{prefix}: start_revision: {start_revision}");
378 start_revision += 1;
379
380 if let Some(tx) = existing_kvs_tx {
382 let kvs = get_response.take_kvs();
383 tracing::trace!("initial kv count: {:?}", kvs.len());
384 for kv in kvs.into_iter() {
385 tx.send(WatchEvent::Put(kv)).await?;
386 }
387 }
388
389 Ok(start_revision)
390 }
391
392 async fn new_watch_stream(
397 connector: &Arc<Connector>,
398 prefix: &String,
399 start_revision: i64,
400 ) -> Result<WatchStream> {
401 loop {
402 match connector
403 .get_client()
404 .watch_client()
405 .watch(
406 prefix.as_str(),
407 Some(
408 WatchOptions::new()
409 .with_prefix()
410 .with_start_revision(start_revision)
411 .with_prev_key(),
412 ),
413 )
414 .await
415 {
416 Ok((_, watch_stream)) => {
417 tracing::debug!("Watch stream established for prefix '{}'", prefix);
418 return Ok(watch_stream);
419 }
420 Err(err) => {
421 tracing::debug!(error = %err, "Failed to establish watch stream for prefix '{}'", prefix);
422 let deadline = std::time::Instant::now() + Duration::from_secs(10);
423 if let Err(err) = connector.reconnect(deadline).await {
424 tracing::error!(
425 "Failed to reconnect to ETCD within 10 secs for watching prefix '{}': {}",
426 prefix,
427 err
428 );
429 return Err(err);
430 }
431 }
433 }
434 }
435 }
436
437 async fn monitor_watch_stream(
443 mut watch_stream: WatchStream,
444 prefix: &String,
445 start_revision: &mut i64,
446 tx: &mpsc::Sender<WatchEvent>,
447 ) -> bool {
448 loop {
449 tokio::select! {
450 maybe_resp = watch_stream.next() => {
451 let response = match maybe_resp {
453 Some(Ok(res)) => res,
454 Some(Err(err)) => {
455 tracing::warn!(error = %err, "Error watching stream for prefix '{}'", prefix);
456 return true; }
458 None => {
459 tracing::warn!("Watch stream unexpectedly closed for prefix '{}'", prefix);
460 return true; }
462 };
463
464 *start_revision = match response.header() {
466 Some(header) => header.revision() + 1,
467 None => {
468 tracing::error!("Missing header in watch response for prefix '{}'", prefix);
469 return false;
470 }
471 };
472
473 if Self::process_watch_events(response.events(), tx).await.is_err() {
475 return false;
476 };
477 }
478 _ = tx.closed() => {
479 tracing::debug!("no more receivers, stopping watcher");
480 return false;
481 }
482 }
483 }
484 }
485
486 async fn process_watch_events(
491 events: &[etcd_client::Event],
492 tx: &mpsc::Sender<WatchEvent>,
493 ) -> Result<()> {
494 for event in events {
495 let Some(kv) = event.kv() else {
497 continue; };
499
500 match event.event_type() {
502 etcd_client::EventType::Put => {
503 if let Err(err) = tx.send(WatchEvent::Put(kv.clone())).await {
504 tracing::error!("kv watcher error forwarding WatchEvent::Put: {err}");
505 return Err(err.into());
506 }
507 }
508 etcd_client::EventType::Delete => {
509 if tx.send(WatchEvent::Delete(kv.clone())).await.is_err() {
510 return Err(anyhow::anyhow!("failed to send WatchEvent::Delete"));
511 }
512 }
513 }
514 }
515 Ok(())
516 }
517}
518
/// Handle returned by [`Client::kv_watch_prefix`] and [`Client::kv_get_and_watch_prefix`].
///
/// `rx` yields [`WatchEvent`]s for keys under `prefix`. The background watch task stops once
/// it observes that this receiver has been dropped.
// NOTE(review): `Dissolve` presumably yields fields in declaration order; check `dissolve()`
// callers before reordering.
#[derive(Dissolve)]
pub struct PrefixWatcher {
    /// The watched key prefix.
    prefix: String,
    /// Receiving side of the channel fed by the watch task.
    rx: mpsc::Receiver<WatchEvent>,
}
524
/// A single change delivered by a prefix watch.
#[derive(Debug)]
pub enum WatchEvent {
    /// An etcd put event. Also used to replay the initial snapshot for
    /// `kv_get_and_watch_prefix`.
    Put(KeyValue),
    /// An etcd delete event, carrying the key-value attached to that event.
    Delete(KeyValue),
}
530
/// Connection settings for [`Client::new`].
///
/// `ClientOptions::default()` fills these from environment variables.
#[derive(Debug, Clone, Builder, Validate)]
pub struct ClientOptions {
    /// etcd endpoint URLs; validation requires at least one entry.
    #[validate(length(min = 1))]
    pub etcd_url: Vec<String>,

    /// Optional auth/TLS settings handed to the connector.
    #[builder(default)]
    pub etcd_connect_options: Option<ConnectOptions>,

    /// When true (the builder default), `Client::new` creates a primary lease. Otherwise the
    /// primary lease id is 0.
    #[builder(default = "true")]
    pub attach_lease: bool,
}
544
545impl Default for ClientOptions {
546 fn default() -> Self {
547 let mut connect_options = None;
548
549 if let (Ok(username), Ok(password)) = (
550 std::env::var("ETCD_AUTH_USERNAME"),
551 std::env::var("ETCD_AUTH_PASSWORD"),
552 ) {
553 connect_options = Some(ConnectOptions::new().with_user(username, password));
555 } else if let (Ok(ca), Ok(cert), Ok(key)) = (
556 std::env::var("ETCD_AUTH_CA"),
557 std::env::var("ETCD_AUTH_CLIENT_CERT"),
558 std::env::var("ETCD_AUTH_CLIENT_KEY"),
559 ) {
560 connect_options = Some(
562 ConnectOptions::new().with_tls(
563 TlsOptions::new()
564 .ca_certificate(Certificate::from_pem(ca))
565 .identity(Identity::from_pem(cert, key)),
566 ),
567 );
568 }
569
570 ClientOptions {
571 etcd_url: default_servers(),
572 etcd_connect_options: connect_options,
573 attach_lease: true,
574 }
575 }
576}
577
/// Returns the etcd endpoints listed in `ETCD_ENDPOINTS` (comma-separated).
///
/// Each entry is trimmed and empty entries are dropped, so `"a, b,"` yields `["a", "b"]`. Falls
/// back to `http://localhost:2379` when the variable is unset, not valid Unicode, or contains no
/// non-empty entry.
fn default_servers() -> Vec<String> {
    const FALLBACK: &str = "http://localhost:2379";

    let servers: Vec<String> = std::env::var("ETCD_ENDPOINTS")
        .map(|raw| {
            raw.split(',')
                .map(str::trim)
                .filter(|url| !url.is_empty())
                .map(str::to_string)
                .collect()
        })
        .unwrap_or_default();

    if servers.is_empty() {
        vec![FALLBACK.to_string()]
    } else {
        servers
    }
}
587
588pub struct KvCache {
590 client: Client,
591 pub prefix: String,
592 cache: Arc<RwLock<HashMap<String, Vec<u8>>>>,
593 watcher: Option<PrefixWatcher>,
594}
595
596impl KvCache {
597 pub async fn new(
599 client: Client,
600 prefix: String,
601 initial_values: HashMap<String, Vec<u8>>,
602 ) -> Result<Self> {
603 let mut cache = HashMap::new();
604
605 let existing_kvs = client.kv_get_prefix(&prefix).await?;
607 for kv in existing_kvs {
608 let key = String::from_utf8_lossy(kv.key()).to_string();
609 cache.insert(key, kv.value().to_vec());
610 }
611
612 for (key, value) in initial_values.iter() {
617 let full_key = format!("{}{}", prefix, key);
618 if let std::collections::hash_map::Entry::Vacant(e) = cache.entry(full_key.clone()) {
619 client.kv_put(&full_key, value.clone(), None).await?;
620 e.insert(value.clone());
621 }
622 }
623
624 let watcher = client.kv_get_and_watch_prefix(&prefix).await?;
628
629 let cache = Arc::new(RwLock::new(cache));
630 let mut result = Self {
631 client,
632 prefix,
633 cache,
634 watcher: Some(watcher),
635 };
636
637 result.start_watcher().await?;
639
640 Ok(result)
641 }
642
643 async fn start_watcher(&mut self) -> Result<()> {
645 if let Some(watcher) = self.watcher.take() {
646 let cache = self.cache.clone();
647 let prefix = self.prefix.clone();
648
649 tokio::spawn(async move {
650 let mut rx = watcher.rx;
651
652 while let Some(event) = rx.recv().await {
653 match event {
654 WatchEvent::Put(kv) => {
655 let key = String::from_utf8_lossy(kv.key()).to_string();
656 let value = kv.value().to_vec();
657
658 tracing::trace!("KvCache update: {} = {:?}", key, value);
659 let mut cache_write = cache.write().await;
660 cache_write.insert(key, value);
661 }
662 WatchEvent::Delete(kv) => {
663 let key = String::from_utf8_lossy(kv.key()).to_string();
664
665 tracing::trace!("KvCache delete: {}", key);
666 let mut cache_write = cache.write().await;
667 cache_write.remove(&key);
668 }
669 }
670 }
671
672 tracing::debug!("KvCache watcher for prefix '{}' stopped", prefix);
673 });
674 }
675
676 Ok(())
677 }
678
679 pub async fn get(&self, key: &str) -> Option<Vec<u8>> {
681 let full_key = format!("{}{}", self.prefix, key);
682 let cache_read = self.cache.read().await;
683 cache_read.get(&full_key).cloned()
684 }
685
686 pub async fn get_all(&self) -> HashMap<String, Vec<u8>> {
688 let cache_read = self.cache.read().await;
689 cache_read.clone()
690 }
691
692 pub async fn put(&self, key: &str, value: Vec<u8>, lease_id: Option<u64>) -> Result<()> {
694 let full_key = format!("{}{}", self.prefix, key);
695
696 self.client
698 .kv_put(&full_key, value.clone(), lease_id)
699 .await?;
700
701 let mut cache_write = self.cache.write().await;
703 cache_write.insert(full_key, value);
704
705 Ok(())
706 }
707
708 pub async fn delete(&self, key: &str) -> Result<()> {
710 let full_key = format!("{}{}", self.prefix, key);
711
712 self.client.kv_delete(full_key.clone(), None).await?;
714
715 let mut cache_write = self.cache.write().await;
717 cache_write.remove(&full_key);
718
719 Ok(())
720 }
721}
722
#[cfg(feature = "integration")]
#[cfg(test)]
mod tests {
    use crate::{DistributedRuntime, distributed::DistributedConfig};

    use super::*;

    #[test]
    fn test_ectd_client() {
        let rt = Runtime::from_settings().unwrap();
        let rt_clone = rt.clone();
        let config = DistributedConfig::from_settings(false);

        rt_clone.primary().block_on(async move {
            let drt = DistributedRuntime::new(rt, config).await.unwrap();
            test_kv_create_or_validate(drt).await.unwrap();
        });
    }

    /// Exercises create, duplicate create, validate-same and validate-different on one key.
    async fn test_kv_create_or_validate(drt: DistributedRuntime) -> Result<()> {
        let key = "__integration_test_key";
        let value = b"test_value";

        let client = drt.etcd_client().expect("etcd client should be available");
        let lease_id = drt.connection_id();

        // Clear leftovers from an earlier (possibly aborted) run so the first create can succeed.
        client.kv_delete(key, None).await?;

        let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
        assert!(result.is_ok(), "first kv_create should succeed: {result:?}");

        let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
        assert!(result.is_err(), "kv_create on an existing key should fail");

        let result = client
            .kv_create_or_validate(key.to_string(), value.to_vec(), Some(lease_id))
            .await;
        assert!(
            result.is_ok(),
            "validating an identical value should succeed: {result:?}"
        );

        let different_value = b"different_value";
        let result = client
            .kv_create_or_validate(key.to_string(), different_value.to_vec(), Some(lease_id))
            .await;
        assert!(
            result.is_err(),
            "validating a different value should fail"
        );

        client.kv_delete(key, None).await?;

        Ok(())
    }

    #[test]
    fn test_kv_cache() {
        let rt = Runtime::from_settings().unwrap();
        let rt_clone = rt.clone();
        let config = DistributedConfig::from_settings(false);

        rt_clone.primary().block_on(async move {
            let drt = DistributedRuntime::new(rt, config).await.unwrap();
            test_kv_cache_operations(drt).await.unwrap();
        });
    }

    /// Exercises seeding, local puts, and propagation of an external write through the watch.
    async fn test_kv_cache_operations(drt: DistributedRuntime) -> Result<()> {
        let client = drt.etcd_client().expect("etcd client should be available");

        // Unique prefix so concurrent or repeated runs do not collide.
        let test_id = uuid::Uuid::new_v4().to_string();
        let prefix = format!("v1/test_kv_cache_{}/", test_id);

        let mut initial_values = HashMap::new();
        initial_values.insert("key1".to_string(), b"value1".to_vec());
        initial_values.insert("key2".to_string(), b"value2".to_vec());

        let kv_cache = KvCache::new(client.clone(), prefix.clone(), initial_values).await?;

        let value1 = kv_cache.get("key1").await;
        assert_eq!(value1, Some(b"value1".to_vec()));

        let value2 = kv_cache.get("key2").await;
        assert_eq!(value2, Some(b"value2".to_vec()));

        let all_values = kv_cache.get_all().await;
        assert_eq!(all_values.len(), 2);
        assert_eq!(
            all_values.get(&format!("{}key1", prefix)),
            Some(&b"value1".to_vec())
        );
        assert_eq!(
            all_values.get(&format!("{}key2", prefix)),
            Some(&b"value2".to_vec())
        );

        kv_cache.put("key3", b"value3".to_vec(), None).await?;

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let value3 = kv_cache.get("key3").await;
        assert_eq!(value3, Some(b"value3".to_vec()));

        kv_cache
            .put("key1", b"updated_value1".to_vec(), None)
            .await?;

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let updated_value1 = kv_cache.get("key1").await;
        assert_eq!(updated_value1, Some(b"updated_value1".to_vec()));

        // A write that bypasses the cache must arrive through the watch.
        client
            .kv_put(
                &format!("{}key2", prefix),
                b"external_update".to_vec(),
                None,
            )
            .await?;

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let external_update = kv_cache.get("key2").await;
        assert_eq!(external_update, Some(b"external_update".to_vec()));

        let etcd_client = client.etcd_client();
        let _ = etcd_client
            .kv_client()
            .delete(
                prefix,
                Some(etcd_client::DeleteOptions::new().with_prefix()),
            )
            .await?;

        Ok(())
    }
}