1use std::{
2 sync::{Arc, Mutex, Weak},
3 time::Duration,
4};
5use tonic::Status;
6
7mod ids;
8pub mod lease;
9mod utils;
10pub mod watcher;
11
12pub use ids::{LeaseId, WatchId};
13use lease::{LeaseHandle, LeaseWorkerHandle, LeaseWorkerMessage};
14use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, WeakUnboundedSender};
15pub use tokio_etcd_grpc_client::ClientEndpointConfig;
16use tokio_etcd_grpc_client::EtcdGrpcClient;
17
18pub struct Client {
19 grpc_client: EtcdGrpcClient,
20 watcher_singleton: WeakSingleton<watcher::WatcherHandle>,
21 lease_worker_singleton: WeakUnboundedSenderSingleton<LeaseWorkerMessage>,
22}
23
24impl Client {
25 pub fn new(
26 peers: impl IntoIterator<Item = impl AsRef<str>> + ExactSizeIterator,
27 endpoint_config: ClientEndpointConfig,
28 ) -> Self {
29 let grpc_client = tokio_etcd_grpc_client::client(peers, endpoint_config).unwrap();
30 Self {
31 grpc_client,
32 watcher_singleton: WeakSingleton::new(),
33 lease_worker_singleton: WeakUnboundedSenderSingleton::new(),
34 }
35 }
36
37 pub fn set_auth_token(&self, token: http::HeaderValue) {
42 self.grpc_client.set_auth_token(token);
43 }
44
45 pub fn clear_auth_token(&self) {
49 self.grpc_client.clear_auth_token();
50 }
51
52 pub fn watcher(&self) -> Arc<watcher::WatcherHandle> {
61 self.watcher_singleton.get_or_init(|| {
62 watcher::WatcherHandle::new(self.grpc_client.watch(), self.grpc_client.kv())
63 })
64 }
65
66 pub async fn grant_lease(&self, ttl: Duration) -> Result<LeaseHandle, Status> {
73 let sender = self.lease_worker_singleton.get_or_init(|| {
74 let handle = LeaseWorkerHandle::spawn(self.grpc_client.lease());
75 handle.into_inner()
76 });
77
78 LeaseHandle::grant(
79 self.grpc_client.lease(),
80 LeaseWorkerHandle::from_sender(sender),
81 ttl,
82 )
83 .await
84 }
85}
86
87struct WeakSingleton<T> {
90 inner: Mutex<Weak<T>>,
92}
93
94impl<T> WeakSingleton<T> {
95 fn new() -> Self {
96 Self {
97 inner: Mutex::new(Weak::new()),
98 }
99 }
100
101 fn get_or_init(&self, init: impl FnOnce() -> T) -> Arc<T> {
102 let mut lock = self.inner.lock().unwrap();
103 if let Some(inner) = lock.upgrade() {
104 inner
105 } else {
106 let arc = Arc::new(init());
107 *lock = Arc::downgrade(&arc);
108 arc
109 }
110 }
111}
112
113struct WeakUnboundedSenderSingleton<T> {
114 inner: Mutex<WeakUnboundedSender<T>>,
115}
116
117impl<T> WeakUnboundedSenderSingleton<T> {
118 fn new() -> Self {
119 let (tx, rx) = unbounded_channel();
121 let weak_tx = tx.downgrade();
122 drop(rx);
123
124 Self {
125 inner: Mutex::new(weak_tx),
126 }
127 }
128
129 fn get_or_init(&self, init: impl FnOnce() -> UnboundedSender<T>) -> UnboundedSender<T> {
130 let mut lock = self.inner.lock().unwrap();
131 if let Some(inner) = lock.upgrade() {
132 inner
133 } else {
134 let tx = init();
135 let weak_tx = tx.downgrade();
136
137 *lock = weak_tx;
138 tx
139 }
140 }
141}