1use crate::{CancellationToken, ErrorContext, Result, Runtime, error};
5
6use async_nats::jetstream::kv;
7use derive_builder::Builder;
8use derive_getters::Dissolve;
9use futures::StreamExt;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::{RwLock, mpsc};
13use validator::Validate;
14
15use etcd_client::{
16 Certificate, Compare, CompareOp, DeleteOptions, GetOptions, Identity, LockClient, LockOptions,
17 LockResponse, PutOptions, PutResponse, TlsOptions, Txn, TxnOp, TxnOpResponse, WatchOptions,
18 Watcher,
19};
20pub use etcd_client::{ConnectOptions, KeyValue, LeaseClient};
21use tokio::time::{Duration, interval};
22
23mod lease;
24mod lock;
25mod path;
26
27use lease::*;
28pub use lock::*;
29pub use path::*;
30
31use super::utils::build_in_runtime;
32
33#[derive(Clone)]
35pub struct Client {
36 client: etcd_client::Client,
37 primary_lease: u64,
38 runtime: Runtime,
39 rt: Arc<tokio::runtime::Runtime>,
40}
41
42impl std::fmt::Debug for Client {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 write!(f, "etcd::Client primary_lease={}", self.primary_lease)
45 }
46}
47
48#[derive(Debug, Clone)]
49pub struct Lease {
50 id: u64,
53
54 cancel_token: CancellationToken,
56}
57
58impl Lease {
59 pub fn id(&self) -> u64 {
61 self.id
62 }
63
64 pub fn primary_token(&self) -> CancellationToken {
67 self.cancel_token.clone()
68 }
69
70 pub fn child_token(&self) -> CancellationToken {
73 self.cancel_token.child_token()
74 }
75
76 pub fn revoke(&self) {
78 self.cancel_token.cancel();
79 }
80
81 pub async fn is_valid(&self) -> Result<bool> {
83 Ok(!self.cancel_token.is_cancelled())
86 }
87}
88
89impl Client {
90 pub fn builder() -> ClientOptionsBuilder {
91 ClientOptionsBuilder::default()
92 }
93
94 pub async fn new(config: ClientOptions, runtime: Runtime) -> Result<Self> {
103 let token = runtime.primary_token();
104
105 let ((client, lease_id), rt) = build_in_runtime(
106 async move {
107 let client = etcd_client::Client::connect(
108 config.etcd_url.clone(),
109 config.etcd_connect_options,
110 )
111 .await
112 .with_context(|| {
113 format!(
114 "Unable to connect to etcd server at {}. Check etcd server status",
115 config.etcd_url.join(", ")
116 )
117 })?;
118
119 let lease_id = if config.attach_lease {
120 let lease_client = client.lease_client();
121
122 let lease = create_lease(lease_client, 10, token)
123 .await
124 .with_context(|| {
125 format!(
126 "Unable to create lease. Check etcd server status at {}",
127 config.etcd_url.join(", ")
128 )
129 })?;
130
131 lease.id
132 } else {
133 0
134 };
135
136 Ok((client, lease_id))
137 },
138 1,
139 )
140 .await?;
141
142 Ok(Client {
143 client,
144 primary_lease: lease_id,
145 rt,
146 runtime,
147 })
148 }
149
150 pub(crate) fn etcd_client(&self) -> &etcd_client::Client {
152 &self.client
153 }
154
155 pub fn lease_id(&self) -> u64 {
157 self.primary_lease
158 }
159
160 pub fn primary_lease(&self) -> Lease {
162 Lease {
163 id: self.primary_lease,
164 cancel_token: self.runtime.primary_token(),
165 }
166 }
167
168 pub async fn create_lease(&self, ttl: u64) -> Result<Lease> {
171 let token = self.runtime.child_token();
172 let lease_client = self.client.lease_client();
173 self.rt
174 .spawn(create_lease(lease_client, ttl, token))
175 .await?
176 }
177
178 pub async fn revoke_lease(&self, lease_id: u64) -> Result<()> {
180 let lease_client = self.client.lease_client();
181 self.rt.spawn(revoke_lease(lease_client, lease_id)).await?
182 }
183
184 pub async fn kv_create(&self, key: &str, value: Vec<u8>, lease_id: Option<u64>) -> Result<()> {
185 let id = lease_id.unwrap_or(self.lease_id());
186 let put_options = PutOptions::new().with_lease(id as i64);
187
188 let txn = Txn::new()
190 .when(vec![Compare::version(key, CompareOp::Equal, 0)]) .and_then(vec![
192 TxnOp::put(key, value, Some(put_options)), ]);
194
195 let result = self.client.kv_client().txn(txn).await?;
197
198 if result.succeeded() {
199 Ok(())
200 } else {
201 for resp in result.op_responses() {
202 tracing::warn!(response = ?resp, "kv_create etcd op response");
203 }
204 Err(error!("Unable to create key. Check etcd server status"))
205 }
206 }
207
208 pub async fn kv_create_or_validate(
210 &self,
211 key: String,
212 value: Vec<u8>,
213 lease_id: Option<u64>,
214 ) -> Result<()> {
215 let id = lease_id.unwrap_or(self.lease_id());
216 let put_options = PutOptions::new().with_lease(id as i64);
217
218 let txn = Txn::new()
221 .when(vec![Compare::version(key.as_str(), CompareOp::Equal, 0)]) .and_then(vec![
223 TxnOp::put(key.as_str(), value.clone(), Some(put_options)), ])
225 .or_else(vec![
226 TxnOp::txn(Txn::new().when(vec![Compare::value(
228 key.as_str(),
229 CompareOp::Equal,
230 value.clone(),
231 )])),
232 ]);
233
234 let result = self.client.kv_client().txn(txn).await?;
236
237 if result.succeeded() {
239 Ok(())
240 } else {
241 match result.op_responses().first() {
242 Some(response) => match response {
243 TxnOpResponse::Txn(response) => match response.succeeded() {
244 true => Ok(()),
245 false => Err(error!(
246 "Unable to create or validate key. Check etcd server status"
247 )),
248 },
249 _ => Err(error!(
250 "Unable to validate key operation. Check etcd server status"
251 )),
252 },
253 None => Err(error!(
254 "Unable to create or validate key. Check etcd server status"
255 )),
256 }
257 }
258 }
259
260 pub async fn kv_put(
261 &self,
262 key: impl AsRef<str>,
263 value: impl AsRef<[u8]>,
264 lease_id: Option<u64>,
265 ) -> Result<()> {
266 let id = lease_id.unwrap_or(self.lease_id());
267 let put_options = PutOptions::new().with_lease(id as i64);
268 let _ = self
269 .client
270 .kv_client()
271 .put(key.as_ref(), value.as_ref(), Some(put_options))
272 .await?;
273 Ok(())
274 }
275
276 pub async fn kv_put_with_options(
277 &self,
278 key: impl AsRef<str>,
279 value: impl AsRef<[u8]>,
280 options: Option<PutOptions>,
281 ) -> Result<PutResponse> {
282 let options = options
283 .unwrap_or_default()
284 .with_lease(self.primary_lease().id() as i64);
285 self.client
286 .kv_client()
287 .put(key.as_ref(), value.as_ref(), Some(options))
288 .await
289 .map_err(|err| err.into())
290 }
291
292 pub async fn kv_get(
293 &self,
294 key: impl Into<Vec<u8>>,
295 options: Option<GetOptions>,
296 ) -> Result<Vec<KeyValue>> {
297 let mut get_response = self.client.kv_client().get(key, options).await?;
298 Ok(get_response.take_kvs())
299 }
300
301 pub async fn kv_delete(
302 &self,
303 key: impl Into<Vec<u8>>,
304 options: Option<DeleteOptions>,
305 ) -> Result<u64> {
306 self.client
307 .kv_client()
308 .delete(key, options)
309 .await
310 .map(|del_response| del_response.deleted() as u64)
311 .map_err(|err| err.into())
312 }
313
314 pub async fn kv_get_prefix(&self, prefix: impl AsRef<str>) -> Result<Vec<KeyValue>> {
315 let mut get_response = self
316 .client
317 .kv_client()
318 .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
319 .await?;
320
321 Ok(get_response.take_kvs())
322 }
323
324 pub async fn lock(
327 &self,
328 key: impl Into<Vec<u8>>,
329 lease_id: Option<u64>,
330 ) -> Result<LockResponse> {
331 let mut lock_client = self.client.lock_client();
332 let id = lease_id.unwrap_or(self.lease_id());
333 let options = LockOptions::new().with_lease(id as i64);
334 lock_client
335 .lock(key, Some(options))
336 .await
337 .map_err(|err| err.into())
338 }
339
340 pub async fn unlock(&self, lock_key: impl Into<Vec<u8>>) -> Result<()> {
342 let mut lock_client = self.client.lock_client();
343 lock_client
344 .unlock(lock_key)
345 .await
346 .map_err(|err: etcd_client::Error| anyhow::anyhow!(err))?;
347 Ok(())
348 }
349
350 pub async fn kv_watch_prefix(
352 &self,
353 prefix: impl AsRef<str> + std::fmt::Display,
354 ) -> Result<PrefixWatcher> {
355 self.watch_internal(prefix, false).await
356 }
357
358 pub async fn kv_get_and_watch_prefix(
359 &self,
360 prefix: impl AsRef<str> + std::fmt::Display,
361 ) -> Result<PrefixWatcher> {
362 self.watch_internal(prefix, true).await
363 }
364
365 async fn watch_internal(
366 &self,
367 prefix: impl AsRef<str> + std::fmt::Display,
368 include_existing: bool,
369 ) -> Result<PrefixWatcher> {
370 let mut kv_client = self.client.kv_client();
371 let mut watch_client = self.client.watch_client();
372
373 let mut get_response = kv_client
374 .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
375 .await?;
376
377 let start_revision = get_response
378 .header()
379 .ok_or(error!("missing header; unable to get revision"))?
380 .revision();
381
382 tracing::trace!("{prefix}: start_revision: {start_revision}");
383 let start_revision = start_revision + 1;
384
385 let (watcher, mut watch_stream) = watch_client
386 .watch(
387 prefix.as_ref(),
388 Some(
389 WatchOptions::new()
390 .with_prefix()
391 .with_start_revision(start_revision)
392 .with_prev_key(),
393 ),
394 )
395 .await?;
396
397 let kvs = if include_existing {
398 let kvs = get_response.take_kvs();
399 tracing::trace!("initial kv count: {:?}", kvs.len());
400 kvs
401 } else {
402 vec![]
403 };
404
405 let (tx, rx) = mpsc::channel(32);
406
407 self.rt.spawn(async move {
408 if include_existing {
409 for kv in kvs {
410 if tx.send(WatchEvent::Put(kv)).await.is_err() {
411 return;
413 }
414 }
415 }
416
417 loop {
418 tokio::select! {
419 maybe_resp = watch_stream.next() => {
420 let Some(Ok(response)) = maybe_resp else {
422 tracing::info!("kv watch stream closed");
423 return;
424 };
425
426 for event in response.events() {
428 let Some(kv) = event.kv() else {
430 continue; };
432
433 match event.event_type() {
435 etcd_client::EventType::Put => {
436 if let Err(err) = tx.send(WatchEvent::Put(kv.clone())).await {
437 tracing::error!("kv watcher error forwarding WatchEvent::Put: {err}");
438 return;
439 }
440 }
441 etcd_client::EventType::Delete => {
442 if tx.send(WatchEvent::Delete(kv.clone())).await.is_err() {
443 return;
444 }
445 }
446 }
447 }
448 }
449 _ = tx.closed() => {
450 tracing::debug!("no more receivers, stopping watcher");
451 return;
452 }
453 }
454 }
455 });
456 Ok(PrefixWatcher {
457 prefix: prefix.as_ref().to_string(),
458 watcher,
459 rx,
460 })
461 }
462}
463
464#[derive(Dissolve)]
465pub struct PrefixWatcher {
466 prefix: String,
467 watcher: Watcher,
468 rx: mpsc::Receiver<WatchEvent>,
469}
470
471#[derive(Debug)]
472pub enum WatchEvent {
473 Put(KeyValue),
474 Delete(KeyValue),
475}
476
477#[derive(Debug, Clone, Builder, Validate)]
479pub struct ClientOptions {
480 #[validate(length(min = 1))]
481 pub etcd_url: Vec<String>,
482
483 #[builder(default)]
484 pub etcd_connect_options: Option<ConnectOptions>,
485
486 #[builder(default = "true")]
488 pub attach_lease: bool,
489}
490
491impl Default for ClientOptions {
492 fn default() -> Self {
493 let mut connect_options = None;
494
495 if let (Ok(username), Ok(password)) = (
496 std::env::var("ETCD_AUTH_USERNAME"),
497 std::env::var("ETCD_AUTH_PASSWORD"),
498 ) {
499 connect_options = Some(ConnectOptions::new().with_user(username, password));
501 } else if let (Ok(ca), Ok(cert), Ok(key)) = (
502 std::env::var("ETCD_AUTH_CA"),
503 std::env::var("ETCD_AUTH_CLIENT_CERT"),
504 std::env::var("ETCD_AUTH_CLIENT_KEY"),
505 ) {
506 connect_options = Some(
508 ConnectOptions::new().with_tls(
509 TlsOptions::new()
510 .ca_certificate(Certificate::from_pem(ca))
511 .identity(Identity::from_pem(cert, key)),
512 ),
513 );
514 }
515
516 ClientOptions {
517 etcd_url: default_servers(),
518 etcd_connect_options: connect_options,
519 attach_lease: true,
520 }
521 }
522}
523
524fn default_servers() -> Vec<String> {
525 match std::env::var("ETCD_ENDPOINTS") {
526 Ok(possible_list_of_urls) => possible_list_of_urls
527 .split(',')
528 .map(|s| s.to_string())
529 .collect(),
530 Err(_) => vec!["http://localhost:2379".to_string()],
531 }
532}
533
534pub struct KvCache {
536 client: Client,
537 pub prefix: String,
538 cache: Arc<RwLock<HashMap<String, Vec<u8>>>>,
539 watcher: Option<PrefixWatcher>,
540}
541
542impl KvCache {
543 pub async fn new(
545 client: Client,
546 prefix: String,
547 initial_values: HashMap<String, Vec<u8>>,
548 ) -> Result<Self> {
549 let mut cache = HashMap::new();
550
551 let existing_kvs = client.kv_get_prefix(&prefix).await?;
553 for kv in existing_kvs {
554 let key = String::from_utf8_lossy(kv.key()).to_string();
555 cache.insert(key, kv.value().to_vec());
556 }
557
558 for (key, value) in initial_values.iter() {
563 let full_key = format!("{}{}", prefix, key);
564 if let std::collections::hash_map::Entry::Vacant(e) = cache.entry(full_key.clone()) {
565 client.kv_put(&full_key, value.clone(), None).await?;
566 e.insert(value.clone());
567 }
568 }
569
570 let watcher = client.kv_get_and_watch_prefix(&prefix).await?;
574
575 let cache = Arc::new(RwLock::new(cache));
576 let mut result = Self {
577 client,
578 prefix,
579 cache,
580 watcher: Some(watcher),
581 };
582
583 result.start_watcher().await?;
585
586 Ok(result)
587 }
588
589 async fn start_watcher(&mut self) -> Result<()> {
591 if let Some(watcher) = self.watcher.take() {
592 let cache = self.cache.clone();
593 let prefix = self.prefix.clone();
594
595 tokio::spawn(async move {
596 let mut rx = watcher.rx;
597
598 while let Some(event) = rx.recv().await {
599 match event {
600 WatchEvent::Put(kv) => {
601 let key = String::from_utf8_lossy(kv.key()).to_string();
602 let value = kv.value().to_vec();
603
604 tracing::trace!("KvCache update: {} = {:?}", key, value);
605 let mut cache_write = cache.write().await;
606 cache_write.insert(key, value);
607 }
608 WatchEvent::Delete(kv) => {
609 let key = String::from_utf8_lossy(kv.key()).to_string();
610
611 tracing::trace!("KvCache delete: {}", key);
612 let mut cache_write = cache.write().await;
613 cache_write.remove(&key);
614 }
615 }
616 }
617
618 tracing::debug!("KvCache watcher for prefix '{}' stopped", prefix);
619 });
620 }
621
622 Ok(())
623 }
624
625 pub async fn get(&self, key: &str) -> Option<Vec<u8>> {
627 let full_key = format!("{}{}", self.prefix, key);
628 let cache_read = self.cache.read().await;
629 cache_read.get(&full_key).cloned()
630 }
631
632 pub async fn get_all(&self) -> HashMap<String, Vec<u8>> {
634 let cache_read = self.cache.read().await;
635 cache_read.clone()
636 }
637
638 pub async fn put(&self, key: &str, value: Vec<u8>, lease_id: Option<u64>) -> Result<()> {
640 let full_key = format!("{}{}", self.prefix, key);
641
642 self.client
644 .kv_put(&full_key, value.clone(), lease_id)
645 .await?;
646
647 let mut cache_write = self.cache.write().await;
649 cache_write.insert(full_key, value);
650
651 Ok(())
652 }
653
654 pub async fn delete(&self, key: &str) -> Result<()> {
656 let full_key = format!("{}{}", self.prefix, key);
657
658 self.client.kv_delete(full_key.clone(), None).await?;
660
661 let mut cache_write = self.cache.write().await;
663 cache_write.remove(&full_key);
664
665 Ok(())
666 }
667}
668
669#[cfg(feature = "integration")]
670#[cfg(test)]
671mod tests {
672 use crate::{DistributedRuntime, distributed::DistributedConfig};
673
674 use super::*;
675
676 #[test]
677 fn test_ectd_client() {
678 let rt = Runtime::from_settings().unwrap();
679 let rt_clone = rt.clone();
680 let config = DistributedConfig::from_settings(false);
681
682 rt_clone.primary().block_on(async move {
683 let drt = DistributedRuntime::new(rt, config).await.unwrap();
684 test_kv_create_or_validate(drt).await.unwrap();
685 });
686 }
687
688 async fn test_kv_create_or_validate(drt: DistributedRuntime) -> Result<()> {
689 let key = "__integration_test_key";
690 let value = b"test_value";
691
692 let client = drt.etcd_client().expect("etcd client should be available");
693 let lease_id = drt
694 .primary_lease()
695 .expect("primary lease should be available")
696 .id();
697
698 let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
700 assert!(result.is_ok(), "");
701
702 let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
704 assert!(result.is_err());
705
706 let result = client
708 .kv_create_or_validate(key.to_string(), value.to_vec(), Some(lease_id))
709 .await;
710 assert!(result.is_ok());
711
712 let different_value = b"different_value";
714 let result = client
715 .kv_create_or_validate(key.to_string(), different_value.to_vec(), Some(lease_id))
716 .await;
717 assert!(result.is_err(), "");
718
719 Ok(())
720 }
721
722 #[test]
723 fn test_kv_cache() {
724 let rt = Runtime::from_settings().unwrap();
725 let rt_clone = rt.clone();
726 let config = DistributedConfig::from_settings(false);
727
728 rt_clone.primary().block_on(async move {
729 let drt = DistributedRuntime::new(rt, config).await.unwrap();
730 test_kv_cache_operations(drt).await.unwrap();
731 });
732 }
733
734 async fn test_kv_cache_operations(drt: DistributedRuntime) -> Result<()> {
735 let client = drt.etcd_client().expect("etcd client should be available");
737
738 let test_id = uuid::Uuid::new_v4().to_string();
740 let prefix = format!("v1/test_kv_cache_{}/", test_id);
741
742 let mut initial_values = HashMap::new();
744 initial_values.insert("key1".to_string(), b"value1".to_vec());
745 initial_values.insert("key2".to_string(), b"value2".to_vec());
746
747 let kv_cache = KvCache::new(client.clone(), prefix.clone(), initial_values).await?;
749
750 let value1 = kv_cache.get("key1").await;
752 assert_eq!(value1, Some(b"value1".to_vec()));
753
754 let value2 = kv_cache.get("key2").await;
755 assert_eq!(value2, Some(b"value2".to_vec()));
756
757 let all_values = kv_cache.get_all().await;
759 assert_eq!(all_values.len(), 2);
760 assert_eq!(
761 all_values.get(&format!("{}key1", prefix)),
762 Some(&b"value1".to_vec())
763 );
764 assert_eq!(
765 all_values.get(&format!("{}key2", prefix)),
766 Some(&b"value2".to_vec())
767 );
768
769 kv_cache.put("key3", b"value3".to_vec(), None).await?;
771
772 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
774
775 let value3 = kv_cache.get("key3").await;
777 assert_eq!(value3, Some(b"value3".to_vec()));
778
779 kv_cache
781 .put("key1", b"updated_value1".to_vec(), None)
782 .await?;
783
784 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
786
787 let updated_value1 = kv_cache.get("key1").await;
789 assert_eq!(updated_value1, Some(b"updated_value1".to_vec()));
790
791 client
793 .kv_put(
794 &format!("{}key2", prefix),
795 b"external_update".to_vec(),
796 None,
797 )
798 .await?;
799
800 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
802
803 let external_update = kv_cache.get("key2").await;
805 assert_eq!(external_update, Some(b"external_update".to_vec()));
806
807 let etcd_client = client.etcd_client();
809 let _ = etcd_client
810 .kv_client()
811 .delete(
812 prefix,
813 Some(etcd_client::DeleteOptions::new().with_prefix()),
814 )
815 .await?;
816
817 Ok(())
818 }
819}