1use crate::{CancellationToken, ErrorContext, Result, Runtime, error};
5
6use async_nats::jetstream::kv;
7use derive_builder::Builder;
8use derive_getters::Dissolve;
9use futures::StreamExt;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::{RwLock, mpsc};
13use validator::Validate;
14
15use etcd_client::{
16 Certificate, Compare, CompareOp, DeleteOptions, GetOptions, Identity, LockClient, LockOptions,
17 LockResponse, PutOptions, PutResponse, TlsOptions, Txn, TxnOp, TxnOpResponse, WatchOptions,
18 Watcher,
19};
20pub use etcd_client::{ConnectOptions, KeyValue, LeaseClient};
21use tokio::time::{Duration, interval};
22
23mod lease;
24mod lock;
25mod path;
26
27use lease::*;
28pub use lock::*;
29pub use path::*;
30
31use super::utils::build_in_runtime;
32
/// Handle to an etcd cluster together with the runtime state this module needs.
///
/// Cloning the handle clones every field; `rt` is shared through its [`Arc`].
#[derive(Clone)]
pub struct Client {
    /// Underlying `etcd_client` connection used for every KV, lease, lock and watch call.
    client: etcd_client::Client,
    /// Id of the lease created in [`Client::new`]; `0` when `attach_lease` was disabled.
    primary_lease: i64,
    /// Crate runtime handle; source of the cancellation tokens handed to leases.
    runtime: Runtime,
    /// Tokio runtime returned by `build_in_runtime`; lease and watch tasks are spawned on it.
    rt: Arc<tokio::runtime::Runtime>,
}
41
/// An etcd lease id paired with the cancellation token associated with it in this process.
#[derive(Debug, Clone)]
pub struct Lease {
    /// Numeric lease id, exposed through [`Lease::id`].
    id: i64,

    /// Token cancelled by [`Lease::revoke`] and inspected by [`Lease::is_valid`].
    cancel_token: CancellationToken,
}
50
51impl Lease {
52 pub fn id(&self) -> i64 {
54 self.id
55 }
56
57 pub fn primary_token(&self) -> CancellationToken {
60 self.cancel_token.clone()
61 }
62
63 pub fn child_token(&self) -> CancellationToken {
66 self.cancel_token.child_token()
67 }
68
69 pub fn revoke(&self) {
71 self.cancel_token.cancel();
72 }
73
74 pub async fn is_valid(&self) -> Result<bool> {
76 Ok(!self.cancel_token.is_cancelled())
79 }
80}
81
82impl Client {
    /// Returns a default-initialised [`ClientOptionsBuilder`] for assembling [`ClientOptions`].
    pub fn builder() -> ClientOptionsBuilder {
        ClientOptionsBuilder::default()
    }
86
87 pub async fn new(config: ClientOptions, runtime: Runtime) -> Result<Self> {
96 let token = runtime.primary_token();
97
98 let ((client, lease_id), rt) = build_in_runtime(
99 async move {
100 let client = etcd_client::Client::connect(
101 config.etcd_url.clone(),
102 config.etcd_connect_options,
103 )
104 .await
105 .with_context(|| {
106 format!(
107 "Unable to connect to etcd server at {}. Check etcd server status",
108 config.etcd_url.join(", ")
109 )
110 })?;
111
112 let lease_id = if config.attach_lease {
113 let lease_client = client.lease_client();
114
115 let lease = create_lease(lease_client, 10, token)
116 .await
117 .with_context(|| {
118 format!(
119 "Unable to create lease. Check etcd server status at {}",
120 config.etcd_url.join(", ")
121 )
122 })?;
123
124 lease.id
125 } else {
126 0
127 };
128
129 Ok((client, lease_id))
130 },
131 1,
132 )
133 .await?;
134
135 Ok(Client {
136 client,
137 primary_lease: lease_id,
138 rt,
139 runtime,
140 })
141 }
142
    /// Borrows the underlying `etcd_client::Client` for crate-internal callers.
    pub(crate) fn etcd_client(&self) -> &etcd_client::Client {
        &self.client
    }
147
    /// Returns the id of the primary lease (`0` when the client was built without one).
    pub fn lease_id(&self) -> i64 {
        self.primary_lease
    }
152
153 pub fn primary_lease(&self) -> Lease {
155 Lease {
156 id: self.primary_lease,
157 cancel_token: self.runtime.primary_token(),
158 }
159 }
160
161 pub async fn create_lease(&self, ttl: i64) -> Result<Lease> {
164 let token = self.runtime.child_token();
165 let lease_client = self.client.lease_client();
166 self.rt
167 .spawn(create_lease(lease_client, ttl, token))
168 .await?
169 }
170
171 pub async fn revoke_lease(&self, lease_id: i64) -> Result<()> {
173 let lease_client = self.client.lease_client();
174 self.rt.spawn(revoke_lease(lease_client, lease_id)).await?
175 }
176
177 pub async fn kv_create(&self, key: &str, value: Vec<u8>, lease_id: Option<i64>) -> Result<()> {
178 let id = lease_id.unwrap_or(self.lease_id());
179 let put_options = PutOptions::new().with_lease(id);
180
181 let txn = Txn::new()
183 .when(vec![Compare::version(key, CompareOp::Equal, 0)]) .and_then(vec![
185 TxnOp::put(key, value, Some(put_options)), ]);
187
188 let result = self.client.kv_client().txn(txn).await?;
190
191 if result.succeeded() {
192 Ok(())
193 } else {
194 for resp in result.op_responses() {
195 tracing::warn!(response = ?resp, "kv_create etcd op response");
196 }
197 Err(error!("Unable to create key. Check etcd server status"))
198 }
199 }
200
201 pub async fn kv_create_or_validate(
203 &self,
204 key: String,
205 value: Vec<u8>,
206 lease_id: Option<i64>,
207 ) -> Result<()> {
208 let id = lease_id.unwrap_or(self.lease_id());
209 let put_options = PutOptions::new().with_lease(id);
210
211 let txn = Txn::new()
214 .when(vec![Compare::version(key.as_str(), CompareOp::Equal, 0)]) .and_then(vec![
216 TxnOp::put(key.as_str(), value.clone(), Some(put_options)), ])
218 .or_else(vec![
219 TxnOp::txn(Txn::new().when(vec![Compare::value(
221 key.as_str(),
222 CompareOp::Equal,
223 value.clone(),
224 )])),
225 ]);
226
227 let result = self.client.kv_client().txn(txn).await?;
229
230 if result.succeeded() {
232 Ok(())
233 } else {
234 match result.op_responses().first() {
235 Some(response) => match response {
236 TxnOpResponse::Txn(response) => match response.succeeded() {
237 true => Ok(()),
238 false => Err(error!(
239 "Unable to create or validate key. Check etcd server status"
240 )),
241 },
242 _ => Err(error!(
243 "Unable to validate key operation. Check etcd server status"
244 )),
245 },
246 None => Err(error!(
247 "Unable to create or validate key. Check etcd server status"
248 )),
249 }
250 }
251 }
252
253 pub async fn kv_put(
254 &self,
255 key: impl AsRef<str>,
256 value: impl AsRef<[u8]>,
257 lease_id: Option<i64>,
258 ) -> Result<()> {
259 let id = lease_id.unwrap_or(self.lease_id());
260 let put_options = PutOptions::new().with_lease(id);
261 let _ = self
262 .client
263 .kv_client()
264 .put(key.as_ref(), value.as_ref(), Some(put_options))
265 .await?;
266 Ok(())
267 }
268
269 pub async fn kv_put_with_options(
270 &self,
271 key: impl AsRef<str>,
272 value: impl AsRef<[u8]>,
273 options: Option<PutOptions>,
274 ) -> Result<PutResponse> {
275 let options = options
276 .unwrap_or_default()
277 .with_lease(self.primary_lease().id());
278 self.client
279 .kv_client()
280 .put(key.as_ref(), value.as_ref(), Some(options))
281 .await
282 .map_err(|err| err.into())
283 }
284
285 pub async fn kv_get(
286 &self,
287 key: impl Into<Vec<u8>>,
288 options: Option<GetOptions>,
289 ) -> Result<Vec<KeyValue>> {
290 let mut get_response = self.client.kv_client().get(key, options).await?;
291 Ok(get_response.take_kvs())
292 }
293
294 pub async fn kv_delete(
295 &self,
296 key: impl Into<Vec<u8>>,
297 options: Option<DeleteOptions>,
298 ) -> Result<i64> {
299 self.client
300 .kv_client()
301 .delete(key, options)
302 .await
303 .map(|del_response| del_response.deleted())
304 .map_err(|err| err.into())
305 }
306
307 pub async fn kv_get_prefix(&self, prefix: impl AsRef<str>) -> Result<Vec<KeyValue>> {
308 let mut get_response = self
309 .client
310 .kv_client()
311 .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
312 .await?;
313
314 Ok(get_response.take_kvs())
315 }
316
317 pub async fn lock(
320 &self,
321 key: impl Into<Vec<u8>>,
322 lease_id: Option<i64>,
323 ) -> Result<LockResponse> {
324 let mut lock_client = self.client.lock_client();
325 let id = lease_id.unwrap_or(self.lease_id());
326 let options = LockOptions::new().with_lease(id);
327 lock_client
328 .lock(key, Some(options))
329 .await
330 .map_err(|err| err.into())
331 }
332
333 pub async fn unlock(&self, lock_key: impl Into<Vec<u8>>) -> Result<()> {
335 let mut lock_client = self.client.lock_client();
336 lock_client
337 .unlock(lock_key)
338 .await
339 .map_err(|err: etcd_client::Error| anyhow::anyhow!(err))?;
340 Ok(())
341 }
342
343 pub async fn kv_watch_prefix(
345 &self,
346 prefix: impl AsRef<str> + std::fmt::Display,
347 ) -> Result<PrefixWatcher> {
348 self.watch_internal(prefix, false).await
349 }
350
351 pub async fn kv_get_and_watch_prefix(
352 &self,
353 prefix: impl AsRef<str> + std::fmt::Display,
354 ) -> Result<PrefixWatcher> {
355 self.watch_internal(prefix, true).await
356 }
357
358 async fn watch_internal(
359 &self,
360 prefix: impl AsRef<str> + std::fmt::Display,
361 include_existing: bool,
362 ) -> Result<PrefixWatcher> {
363 let mut kv_client = self.client.kv_client();
364 let mut watch_client = self.client.watch_client();
365
366 let mut get_response = kv_client
367 .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
368 .await?;
369
370 let start_revision = get_response
371 .header()
372 .ok_or(error!("missing header; unable to get revision"))?
373 .revision();
374
375 tracing::trace!("{prefix}: start_revision: {start_revision}");
376 let start_revision = start_revision + 1;
377
378 let (watcher, mut watch_stream) = watch_client
379 .watch(
380 prefix.as_ref(),
381 Some(
382 WatchOptions::new()
383 .with_prefix()
384 .with_start_revision(start_revision)
385 .with_prev_key(),
386 ),
387 )
388 .await?;
389
390 let kvs = if include_existing {
391 let kvs = get_response.take_kvs();
392 tracing::trace!("initial kv count: {:?}", kvs.len());
393 kvs
394 } else {
395 vec![]
396 };
397
398 let (tx, rx) = mpsc::channel(32);
399
400 self.rt.spawn(async move {
401 if include_existing {
402 for kv in kvs {
403 if tx.send(WatchEvent::Put(kv)).await.is_err() {
404 return;
406 }
407 }
408 }
409
410 loop {
411 tokio::select! {
412 maybe_resp = watch_stream.next() => {
413 let Some(Ok(response)) = maybe_resp else {
415 tracing::info!("kv watch stream closed");
416 return;
417 };
418
419 for event in response.events() {
421 let Some(kv) = event.kv() else {
423 continue; };
425
426 match event.event_type() {
428 etcd_client::EventType::Put => {
429 if let Err(err) = tx.send(WatchEvent::Put(kv.clone())).await {
430 tracing::error!("kv watcher error forwarding WatchEvent::Put: {err}");
431 return;
432 }
433 }
434 etcd_client::EventType::Delete => {
435 if tx.send(WatchEvent::Delete(kv.clone())).await.is_err() {
436 return;
437 }
438 }
439 }
440 }
441 }
442 _ = tx.closed() => {
443 tracing::debug!("no more receivers, stopping watcher");
444 return;
445 }
446 }
447 }
448 });
449 Ok(PrefixWatcher {
450 prefix: prefix.as_ref().to_string(),
451 watcher,
452 rx,
453 })
454 }
455}
456
/// Result of starting a prefix watch: the watched prefix, the etcd watcher handle and the
/// receiving end of the event channel.
///
/// `Dissolve` derives a `dissolve()` method that consumes the struct and yields its fields.
#[derive(Dissolve)]
pub struct PrefixWatcher {
    /// The prefix that was passed when the watch was created.
    prefix: String,
    /// Handle returned by the etcd watch call.
    watcher: Watcher,
    /// Receiver fed by the background forwarding task.
    rx: mpsc::Receiver<WatchEvent>,
}
463
/// A change delivered by a prefix watch.
#[derive(Debug)]
pub enum WatchEvent {
    /// A key was written; also used to replay keys that existed before the watch started.
    Put(KeyValue),
    /// A key was deleted; carries the key-value reported by the delete event.
    Delete(KeyValue),
}
469
/// Connection settings consumed by [`Client::new`].
#[derive(Debug, Clone, Builder, Validate)]
pub struct ClientOptions {
    /// etcd endpoints to connect to; validation requires at least one entry.
    #[validate(length(min = 1))]
    pub etcd_url: Vec<String>,

    /// Optional `etcd_client` connect options (for example credentials or TLS settings).
    #[builder(default)]
    pub etcd_connect_options: Option<ConnectOptions>,

    /// When `true` (the builder default), [`Client::new`] creates a primary lease.
    #[builder(default = "true")]
    pub attach_lease: bool,
}
483
484impl Default for ClientOptions {
485 fn default() -> Self {
486 let mut connect_options = None;
487
488 if let (Ok(username), Ok(password)) = (
489 std::env::var("ETCD_AUTH_USERNAME"),
490 std::env::var("ETCD_AUTH_PASSWORD"),
491 ) {
492 connect_options = Some(ConnectOptions::new().with_user(username, password));
494 } else if let (Ok(ca), Ok(cert), Ok(key)) = (
495 std::env::var("ETCD_AUTH_CA"),
496 std::env::var("ETCD_AUTH_CLIENT_CERT"),
497 std::env::var("ETCD_AUTH_CLIENT_KEY"),
498 ) {
499 connect_options = Some(
501 ConnectOptions::new().with_tls(
502 TlsOptions::new()
503 .ca_certificate(Certificate::from_pem(ca))
504 .identity(Identity::from_pem(cert, key)),
505 ),
506 );
507 }
508
509 ClientOptions {
510 etcd_url: default_servers(),
511 etcd_connect_options: connect_options,
512 attach_lease: true,
513 }
514 }
515}
516
/// Returns the etcd endpoints to use when none are configured explicitly.
///
/// Reads the comma-separated `ETCD_ENDPOINTS` environment variable. Each entry is trimmed
/// and empty entries are dropped, so `"a, b,"` yields `["a", "b"]`. When the variable is
/// unset, not valid Unicode, or contains no usable entry, the single endpoint
/// `http://localhost:2379` is returned.
fn default_servers() -> Vec<String> {
    const FALLBACK_ENDPOINT: &str = "http://localhost:2379";

    let configured = std::env::var("ETCD_ENDPOINTS")
        .map(|raw| {
            raw.split(',')
                .map(str::trim)
                .filter(|endpoint| !endpoint.is_empty())
                .map(str::to_string)
                .collect::<Vec<String>>()
        })
        .unwrap_or_default();

    if configured.is_empty() {
        vec![FALLBACK_ENDPOINT.to_string()]
    } else {
        configured
    }
}
526
/// In-memory mirror of all etcd keys under one prefix, updated by a background watch task.
pub struct KvCache {
    /// Client used for the writes and deletes issued through the cache.
    client: Client,
    /// Prefix prepended to every key passed to `get`, `put` and `delete`.
    pub prefix: String,
    /// Cached values keyed by the full etcd key (prefix included).
    cache: Arc<RwLock<HashMap<String, Vec<u8>>>>,
    /// Watch handle stored by `new`; `start_watcher` takes it, leaving `None`.
    watcher: Option<PrefixWatcher>,
}
534
535impl KvCache {
536 pub async fn new(
538 client: Client,
539 prefix: String,
540 initial_values: HashMap<String, Vec<u8>>,
541 ) -> Result<Self> {
542 let mut cache = HashMap::new();
543
544 let existing_kvs = client.kv_get_prefix(&prefix).await?;
546 for kv in existing_kvs {
547 let key = String::from_utf8_lossy(kv.key()).to_string();
548 cache.insert(key, kv.value().to_vec());
549 }
550
551 for (key, value) in initial_values.iter() {
556 let full_key = format!("{}{}", prefix, key);
557 if let std::collections::hash_map::Entry::Vacant(e) = cache.entry(full_key.clone()) {
558 client.kv_put(&full_key, value.clone(), None).await?;
559 e.insert(value.clone());
560 }
561 }
562
563 let watcher = client.kv_get_and_watch_prefix(&prefix).await?;
567
568 let cache = Arc::new(RwLock::new(cache));
569 let mut result = Self {
570 client,
571 prefix,
572 cache,
573 watcher: Some(watcher),
574 };
575
576 result.start_watcher().await?;
578
579 Ok(result)
580 }
581
    /// Spawns the task that applies watch events to the shared cache.
    ///
    /// Takes the stored [`PrefixWatcher`] out of `self.watcher`; if it was already taken the
    /// call does nothing. Always returns `Ok(())`.
    async fn start_watcher(&mut self) -> Result<()> {
        if let Some(watcher) = self.watcher.take() {
            let cache = self.cache.clone();
            let prefix = self.prefix.clone();

            // The join handle is dropped, so the task runs detached until the channel closes.
            tokio::spawn(async move {
                let mut rx = watcher.rx;

                // `recv` yields `None` once the forwarding side of the channel is gone.
                while let Some(event) = rx.recv().await {
                    match event {
                        WatchEvent::Put(kv) => {
                            // Keys are decoded lossily; invalid UTF-8 is replaced, not rejected.
                            let key = String::from_utf8_lossy(kv.key()).to_string();
                            let value = kv.value().to_vec();

                            tracing::trace!("KvCache update: {} = {:?}", key, value);
                            let mut cache_write = cache.write().await;
                            cache_write.insert(key, value);
                        }
                        WatchEvent::Delete(kv) => {
                            let key = String::from_utf8_lossy(kv.key()).to_string();

                            tracing::trace!("KvCache delete: {}", key);
                            let mut cache_write = cache.write().await;
                            cache_write.remove(&key);
                        }
                    }
                }

                tracing::debug!("KvCache watcher for prefix '{}' stopped", prefix);
            });
        }

        Ok(())
    }
617
618 pub async fn get(&self, key: &str) -> Option<Vec<u8>> {
620 let full_key = format!("{}{}", self.prefix, key);
621 let cache_read = self.cache.read().await;
622 cache_read.get(&full_key).cloned()
623 }
624
625 pub async fn get_all(&self) -> HashMap<String, Vec<u8>> {
627 let cache_read = self.cache.read().await;
628 cache_read.clone()
629 }
630
631 pub async fn put(&self, key: &str, value: Vec<u8>, lease_id: Option<i64>) -> Result<()> {
633 let full_key = format!("{}{}", self.prefix, key);
634
635 self.client
637 .kv_put(&full_key, value.clone(), lease_id)
638 .await?;
639
640 let mut cache_write = self.cache.write().await;
642 cache_write.insert(full_key, value);
643
644 Ok(())
645 }
646
647 pub async fn delete(&self, key: &str) -> Result<()> {
649 let full_key = format!("{}{}", self.prefix, key);
650
651 self.client.kv_delete(full_key.clone(), None).await?;
653
654 let mut cache_write = self.cache.write().await;
656 cache_write.remove(&full_key);
657
658 Ok(())
659 }
660}
661
662#[cfg(feature = "integration")]
663#[cfg(test)]
664mod tests {
665 use crate::{DistributedRuntime, distributed::DistributedConfig};
666
667 use super::*;
668
669 #[test]
670 fn test_ectd_client() {
671 let rt = Runtime::from_settings().unwrap();
672 let rt_clone = rt.clone();
673 let config = DistributedConfig::from_settings(false);
674
675 rt_clone.primary().block_on(async move {
676 let drt = DistributedRuntime::new(rt, config).await.unwrap();
677 test_kv_create_or_validate(drt).await.unwrap();
678 });
679 }
680
681 async fn test_kv_create_or_validate(drt: DistributedRuntime) -> Result<()> {
682 let key = "__integration_test_key";
683 let value = b"test_value";
684
685 let client = drt.etcd_client().expect("etcd client should be available");
686 let lease_id = drt
687 .primary_lease()
688 .expect("primary lease should be available")
689 .id();
690
691 let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
693 assert!(result.is_ok(), "");
694
695 let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
697 assert!(result.is_err());
698
699 let result = client
701 .kv_create_or_validate(key.to_string(), value.to_vec(), Some(lease_id))
702 .await;
703 assert!(result.is_ok());
704
705 let different_value = b"different_value";
707 let result = client
708 .kv_create_or_validate(key.to_string(), different_value.to_vec(), Some(lease_id))
709 .await;
710 assert!(result.is_err(), "");
711
712 Ok(())
713 }
714
715 #[test]
716 fn test_kv_cache() {
717 let rt = Runtime::from_settings().unwrap();
718 let rt_clone = rt.clone();
719 let config = DistributedConfig::from_settings(false);
720
721 rt_clone.primary().block_on(async move {
722 let drt = DistributedRuntime::new(rt, config).await.unwrap();
723 test_kv_cache_operations(drt).await.unwrap();
724 });
725 }
726
727 async fn test_kv_cache_operations(drt: DistributedRuntime) -> Result<()> {
728 let client = drt.etcd_client().expect("etcd client should be available");
730
731 let test_id = uuid::Uuid::new_v4().to_string();
733 let prefix = format!("v1/test_kv_cache_{}/", test_id);
734
735 let mut initial_values = HashMap::new();
737 initial_values.insert("key1".to_string(), b"value1".to_vec());
738 initial_values.insert("key2".to_string(), b"value2".to_vec());
739
740 let kv_cache = KvCache::new(client.clone(), prefix.clone(), initial_values).await?;
742
743 let value1 = kv_cache.get("key1").await;
745 assert_eq!(value1, Some(b"value1".to_vec()));
746
747 let value2 = kv_cache.get("key2").await;
748 assert_eq!(value2, Some(b"value2".to_vec()));
749
750 let all_values = kv_cache.get_all().await;
752 assert_eq!(all_values.len(), 2);
753 assert_eq!(
754 all_values.get(&format!("{}key1", prefix)),
755 Some(&b"value1".to_vec())
756 );
757 assert_eq!(
758 all_values.get(&format!("{}key2", prefix)),
759 Some(&b"value2".to_vec())
760 );
761
762 kv_cache.put("key3", b"value3".to_vec(), None).await?;
764
765 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
767
768 let value3 = kv_cache.get("key3").await;
770 assert_eq!(value3, Some(b"value3".to_vec()));
771
772 kv_cache
774 .put("key1", b"updated_value1".to_vec(), None)
775 .await?;
776
777 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
779
780 let updated_value1 = kv_cache.get("key1").await;
782 assert_eq!(updated_value1, Some(b"updated_value1".to_vec()));
783
784 client
786 .kv_put(
787 &format!("{}key2", prefix),
788 b"external_update".to_vec(),
789 None,
790 )
791 .await?;
792
793 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
795
796 let external_update = kv_cache.get("key2").await;
798 assert_eq!(external_update, Some(b"external_update".to_vec()));
799
800 let etcd_client = client.etcd_client();
802 let _ = etcd_client
803 .kv_client()
804 .delete(
805 prefix,
806 Some(etcd_client::DeleteOptions::new().with_prefix()),
807 )
808 .await?;
809
810 Ok(())
811 }
812}