tokio_etcd/
lib.rs

1use std::{
2    sync::{Arc, Mutex, Weak},
3    time::Duration,
4};
5use tonic::Status;
6
7mod ids;
8pub mod lease;
9mod utils;
10pub mod watcher;
11
12pub use ids::{LeaseId, WatchId};
13use lease::{LeaseHandle, LeaseWorkerHandle, LeaseWorkerMessage};
14use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, WeakUnboundedSender};
15pub use tokio_etcd_grpc_client::ClientEndpointConfig;
16use tokio_etcd_grpc_client::EtcdGrpcClient;
17
18pub struct Client {
19    grpc_client: EtcdGrpcClient,
20    watcher_singleton: WeakSingleton<watcher::WatcherHandle>,
21    lease_worker_singleton: WeakUnboundedSenderSingleton<LeaseWorkerMessage>,
22}
23
24impl Client {
25    pub fn new(
26        peers: impl IntoIterator<Item = impl AsRef<str>> + ExactSizeIterator,
27        endpoint_config: ClientEndpointConfig,
28    ) -> Self {
29        let grpc_client = tokio_etcd_grpc_client::client(peers, endpoint_config).unwrap();
30        Self {
31            grpc_client,
32            watcher_singleton: WeakSingleton::new(),
33            lease_worker_singleton: WeakUnboundedSenderSingleton::new(),
34        }
35    }
36
37    /// Updates the authentication token used by the client.
38    ///
39    /// This token will be used for all future requests made by the client. This method can be used to do live
40    /// token rotation, without having to create a new client.
41    pub fn set_auth_token(&self, token: http::HeaderValue) {
42        self.grpc_client.set_auth_token(token);
43    }
44
45    /// Clears the authentication token used by the client.
46    ///
47    /// This will cause the client to make requests without an authentication token.
48    pub fn clear_auth_token(&self) {
49        self.grpc_client.clear_auth_token();
50    }
51
52    /// Creates, or returns an existing [`Watcher`] that can be used to watch keys in etcd.
53    ///
54    /// If there is already an existing watcher, this method will return a clone of it, otherwise
55    /// it will create a new watcher. When all references to the watcher are dropped, the watcher will
56    /// be dropped as well.
57    ///
58    /// The watcher coalesces watch requests, so that multiple requests watching the same key will only
59    /// result in a single watch request being made to etcd.
60    pub fn watcher(&self) -> Arc<watcher::WatcherHandle> {
61        self.watcher_singleton.get_or_init(|| {
62            watcher::WatcherHandle::new(self.grpc_client.watch(), self.grpc_client.kv())
63        })
64    }
65
66    /// Acquires a lease with the given TTL, spawning a background worker to continue to keep-alive the lease
67    /// as long as the returned [`lease::LeaseHandle`] is alive.
68    ///
69    /// The lease handle provides methods for checking if the lease is still valid.
70    ///
71    /// `ttl` must be above 10 seconds.
72    pub async fn grant_lease(&self, ttl: Duration) -> Result<LeaseHandle, Status> {
73        let sender = self.lease_worker_singleton.get_or_init(|| {
74            let handle = LeaseWorkerHandle::spawn(self.grpc_client.lease());
75            handle.into_inner()
76        });
77
78        LeaseHandle::grant(
79            self.grpc_client.lease(),
80            LeaseWorkerHandle::from_sender(sender),
81            ttl,
82        )
83        .await
84    }
85}
86
87/// A weak singleton will only create a new instance of the inner type if there are no other strong
88/// references to it. If there are, it will return a strong reference to the existing instance.
89struct WeakSingleton<T> {
90    // optimization, fixme: we could make this an RwLock.
91    inner: Mutex<Weak<T>>,
92}
93
94impl<T> WeakSingleton<T> {
95    fn new() -> Self {
96        Self {
97            inner: Mutex::new(Weak::new()),
98        }
99    }
100
101    fn get_or_init(&self, init: impl FnOnce() -> T) -> Arc<T> {
102        let mut lock = self.inner.lock().unwrap();
103        if let Some(inner) = lock.upgrade() {
104            inner
105        } else {
106            let arc = Arc::new(init());
107            *lock = Arc::downgrade(&arc);
108            arc
109        }
110    }
111}
112
113struct WeakUnboundedSenderSingleton<T> {
114    inner: Mutex<WeakUnboundedSender<T>>,
115}
116
117impl<T> WeakUnboundedSenderSingleton<T> {
118    fn new() -> Self {
119        // This is kinda awful, but tokio provides no way to create a "weak unbounded sender" without first creating a tx/rx pair.
120        let (tx, rx) = unbounded_channel();
121        let weak_tx = tx.downgrade();
122        drop(rx);
123
124        Self {
125            inner: Mutex::new(weak_tx),
126        }
127    }
128
129    fn get_or_init(&self, init: impl FnOnce() -> UnboundedSender<T>) -> UnboundedSender<T> {
130        let mut lock = self.inner.lock().unwrap();
131        if let Some(inner) = lock.upgrade() {
132            inner
133        } else {
134            let tx = init();
135            let weak_tx = tx.downgrade();
136
137            *lock = weak_tx;
138            tx
139        }
140    }
141}