1use std::time::Duration;
2
3use async_trait::async_trait;
4use tokio::sync::mpsc::channel;
5use tokio_stream::wrappers::ReceiverStream;
6use tonic::{
7 codegen::InterceptedService,
8 metadata::{Ascii, MetadataValue},
9 service::Interceptor,
10 transport::Channel,
11 Request, Status,
12};
13
14use crate::auth::{AuthOp, AuthenticateRequest, AuthenticateResponse};
15use crate::cluster::{
16 ClusterOp, MemberAddRequest, MemberAddResponse, MemberListRequest, MemberListResponse,
17 MemberRemoveRequest, MemberRemoveResponse, MemberUpdateRequest, MemberUpdateResponse,
18};
19use crate::kv::{
20 CompactRequest, CompactResponse, DeleteRequest, DeleteResponse, KeyRange, KeyValueOp,
21 PutRequest, PutResponse, RangeRequest, RangeResponse, TxnRequest, TxnResponse,
22};
23use crate::lease::{
24 LeaseGrantRequest, LeaseGrantResponse, LeaseId, LeaseKeepAlive, LeaseOp, LeaseRevokeRequest,
25 LeaseRevokeResponse, LeaseTimeToLiveRequest, LeaseTimeToLiveResponse,
26};
27use crate::proto::etcdserverpb;
28use crate::proto::etcdserverpb::cluster_client::ClusterClient;
29use crate::proto::etcdserverpb::maintenance_client::MaintenanceClient;
30use crate::proto::etcdserverpb::LeaseKeepAliveRequest;
31use crate::proto::etcdserverpb::{
32 auth_client::AuthClient, kv_client::KvClient, lease_client::LeaseClient,
33 watch_client::WatchClient,
34};
35use crate::watch::{WatchCanceler, WatchCreateRequest, WatchOp, WatchStream};
36use crate::{Error, Result};
37
/// Interceptor state holding an optional `authorization` metadata value.
///
/// When `token` is `Some`, the `Interceptor` implementation for this type
/// inserts it into every outgoing request; when it is `None`, requests are
/// forwarded unchanged.
#[derive(Clone)]
pub struct TokenInterceptor {
    /// Token already converted to an ASCII metadata value, or `None` when no
    /// token was supplied.
    token: Option<MetadataValue<Ascii>>,
}
42
43impl TokenInterceptor {
44 fn new(token: Option<String>) -> Self {
45 Self {
46 token: token.map(|token: String| MetadataValue::try_from(&token).unwrap()),
47 }
48 }
49}
50
51impl Interceptor for TokenInterceptor {
52 fn call(&mut self, mut req: tonic::Request<()>) -> std::result::Result<Request<()>, Status> {
53 match &self.token {
54 Some(token) => {
55 req.metadata_mut().insert("authorization", token.clone());
56 Ok(req)
57 }
58 None => Ok(req),
59 }
60 }
61}
62
/// TLS setting stored on an `Endpoint` (variant set used when the `tls`
/// feature is enabled).
#[cfg(feature = "tls")]
#[derive(Debug, Clone)]
enum TlsOption {
    /// No TLS configuration is applied to the endpoint.
    None,
    /// TLS configuration that is passed to the channel builder's `tls_config`
    /// when the client connects.
    WithConfig(tonic::transport::ClientTlsConfig),
}
69
/// TLS setting stored on an `Endpoint` (variant set used when the `tls`
/// feature is disabled; only the "no TLS" state exists).
#[cfg(not(feature = "tls"))]
#[derive(Debug, Clone)]
enum TlsOption {
    /// No TLS configuration is applied to the endpoint.
    None,
}
75
/// A single server address together with its TLS setting.
#[derive(Debug, Clone)]
pub struct Endpoint {
    /// Address string handed to `Channel::from_shared` when connecting.
    url: String,

    /// TLS setting for this endpoint; `TlsOption::None` unless one of the
    /// `tls` builder methods was used.
    tls_opt: TlsOption,
}
82
83impl Endpoint {
84 pub fn new(url: impl Into<String>) -> Self {
85 Self {
86 url: url.into(),
87 tls_opt: TlsOption::None,
88 }
89 }
90
91 #[cfg(feature = "tls")]
92 pub fn tls_raw(
93 mut self,
94 domain_name: impl Into<String>,
95 ca_cert: impl AsRef<[u8]>,
96 client_cert: impl AsRef<[u8]>,
97 client_key: impl AsRef<[u8]>,
98 ) -> Self {
99 use tonic::transport::{Certificate, ClientTlsConfig, Identity};
100
101 let certificate = Certificate::from_pem(ca_cert);
102 let identity = Identity::from_pem(client_cert, client_key);
103
104 self.tls_opt = TlsOption::WithConfig(
105 ClientTlsConfig::new()
106 .domain_name(domain_name)
107 .ca_certificate(certificate)
108 .identity(identity),
109 );
110
111 self
112 }
113
114 #[cfg(feature = "tls")]
115 pub async fn tls(
116 self,
117 domain_name: impl Into<String>,
118 ca_cert_path: impl AsRef<std::path::Path>,
119 client_cert_path: impl AsRef<std::path::Path>,
120 client_key_path: impl AsRef<std::path::Path>,
121 ) -> Result<Self> {
122 use tokio::fs::read;
123
124 let ca_cert = read(ca_cert_path).await?;
125
126 let client_cert = read(client_cert_path).await?;
127 let client_key = read(client_key_path).await?;
128
129 Ok(self.tls_raw(domain_name, ca_cert, client_cert, client_key))
130 }
131}
132
133impl<T> From<T> for Endpoint
134where
135 T: Into<String>,
136{
137 fn from(url: T) -> Self {
138 Self {
139 url: url.into(),
140 tls_opt: TlsOption::None,
141 }
142 }
143}
144
/// Settings used to build a `Client`.
#[derive(Clone, Debug)]
pub struct ClientConfig {
    /// Endpoints the client's channel is balanced over.
    pub endpoints: Vec<Endpoint>,
    /// Optional `(name, password)` pair used to authenticate on connect.
    pub auth: Option<(String, String)>,
    /// Connect timeout applied to every endpoint.
    pub connect_timeout: Duration,
    /// HTTP/2 keep-alive interval applied to every endpoint.
    pub http2_keep_alive_interval: Duration,
}
153
154impl ClientConfig {
155 pub fn new(endpoints: impl Into<Vec<Endpoint>>) -> Self {
156 Self {
157 endpoints: endpoints.into(),
158 auth: None,
159 connect_timeout: Duration::from_secs(30),
160 http2_keep_alive_interval: Duration::from_secs(5),
161 }
162 }
163
164 pub fn auth(mut self, name: impl Into<String>, password: impl Into<String>) -> Self {
165 self.auth = Some((name.into(), password.into()));
166 self
167 }
168
169 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
170 self.connect_timeout = timeout;
171 self
172 }
173
174 pub fn http2_keep_alive_interval(mut self, interval: Duration) -> Self {
175 self.http2_keep_alive_interval = interval;
176 self
177 }
178}
179
/// Client bundling one gRPC service client per service.
///
/// Every service client wraps the same kind of transport: a `Channel` passed
/// through a `TokenInterceptor`.
#[derive(Clone)]
pub struct Client {
    /// Client for the auth service.
    auth_client: AuthClient<InterceptedService<Channel, TokenInterceptor>>,
    /// Client for the key-value service.
    kv_client: KvClient<InterceptedService<Channel, TokenInterceptor>>,
    /// Client for the watch service.
    watch_client: WatchClient<InterceptedService<Channel, TokenInterceptor>>,
    /// Client for the cluster service.
    cluster_client: ClusterClient<InterceptedService<Channel, TokenInterceptor>>,
    /// Client for the maintenance service.
    maintenance_client: MaintenanceClient<InterceptedService<Channel, TokenInterceptor>>,
    /// Client for the lease service.
    lease_client: LeaseClient<InterceptedService<Channel, TokenInterceptor>>,
}
190
191impl Client {
192 pub fn with_channel(channel: Channel, token: Option<String>) -> Self {
196 let auth_interceptor = TokenInterceptor::new(token);
197
198 let auth_client = AuthClient::with_interceptor(channel.clone(), auth_interceptor.clone());
199 let kv_client = KvClient::with_interceptor(channel.clone(), auth_interceptor.clone());
200 let watch_client = WatchClient::with_interceptor(channel.clone(), auth_interceptor.clone());
201 let cluster_client =
202 ClusterClient::with_interceptor(channel.clone(), auth_interceptor.clone());
203 let maintenance_client =
204 MaintenanceClient::with_interceptor(channel.clone(), auth_interceptor.clone());
205 let lease_client = LeaseClient::with_interceptor(channel, auth_interceptor);
206
207 Self {
208 auth_client,
209 kv_client,
210 watch_client,
211 cluster_client,
212 maintenance_client,
213 lease_client,
214 }
215 }
216
217 pub async fn connect_with_token(cfg: &ClientConfig, token: Option<String>) -> Result<Self> {
218 let channel = {
219 let mut endpoints = Vec::with_capacity(cfg.endpoints.len());
220 for e in cfg.endpoints.iter() {
221 let mut c = Channel::from_shared(e.url.clone())?
222 .connect_timeout(cfg.connect_timeout)
223 .http2_keep_alive_interval(cfg.http2_keep_alive_interval);
224
225 #[cfg(feature = "tls")]
226 {
227 if let TlsOption::WithConfig(tls) = e.tls_opt.clone() {
228 c = c.tls_config(tls)?;
229 }
230 }
231
232 endpoints.push(c);
233 }
234
235 Channel::balance_list(endpoints.into_iter())
236 };
237
238 Ok(Self::with_channel(channel, token))
239 }
240
241 pub async fn connect(mut cfg: ClientConfig) -> Result<Self> {
246 let cli = Self::connect_with_token(&cfg, None).await?;
247
248 match cfg.auth.take() {
249 Some((name, password)) => {
250 let token = cli.authenticate((name, password)).await?.token;
251
252 Self::connect_with_token(&cfg, Some(token)).await
253 }
254 None => Ok(cli),
255 }
256 }
257}
258
259#[async_trait]
260impl AuthOp for Client {
261 async fn authenticate<R>(&self, req: R) -> Result<AuthenticateResponse>
262 where
263 R: Into<AuthenticateRequest> + Send,
264 {
265 let req = tonic::Request::new(req.into().into());
266 let resp = self.auth_client.clone().authenticate(req).await?;
267
268 Ok(resp.into_inner().into())
269 }
270}
271
272#[async_trait]
273impl KeyValueOp for Client {
274 async fn put<R>(&self, req: R) -> Result<PutResponse>
275 where
276 R: Into<PutRequest> + Send,
277 {
278 let req = tonic::Request::new(req.into().into());
279 let resp = self.kv_client.clone().put(req).await?;
280
281 Ok(resp.into_inner().into())
282 }
283
284 async fn get<R>(&self, req: R) -> Result<RangeResponse>
285 where
286 R: Into<RangeRequest> + Send,
287 {
288 let req = tonic::Request::new(req.into().into());
289 let resp = self.kv_client.clone().range(req).await?;
290
291 Ok(resp.into_inner().into())
292 }
293
294 async fn get_all(&self) -> Result<RangeResponse> {
295 self.get(KeyRange::all()).await
296 }
297
298 async fn get_by_prefix<K>(&self, p: K) -> Result<RangeResponse>
299 where
300 K: Into<Vec<u8>> + Send,
301 {
302 self.get(KeyRange::prefix(p)).await
303 }
304
305 async fn get_range<F, E>(&self, from: F, end: E) -> Result<RangeResponse>
306 where
307 F: Into<Vec<u8>> + Send,
308 E: Into<Vec<u8>> + Send,
309 {
310 self.get(KeyRange::range(from, end)).await
311 }
312
313 async fn delete<R>(&self, req: R) -> Result<DeleteResponse>
314 where
315 R: Into<DeleteRequest> + Send,
316 {
317 let req = tonic::Request::new(req.into().into());
318 let resp = self.kv_client.clone().delete_range(req).await?;
319
320 Ok(resp.into_inner().into())
321 }
322
323 async fn delete_all(&self) -> Result<DeleteResponse> {
324 self.delete(KeyRange::all()).await
325 }
326
327 async fn delete_by_prefix<K>(&self, p: K) -> Result<DeleteResponse>
328 where
329 K: Into<Vec<u8>> + Send,
330 {
331 self.delete(KeyRange::prefix(p)).await
332 }
333
334 async fn delete_range<F, E>(&self, from: F, end: E) -> Result<DeleteResponse>
335 where
336 F: Into<Vec<u8>> + Send,
337 E: Into<Vec<u8>> + Send,
338 {
339 self.delete(KeyRange::range(from, end)).await
340 }
341
342 async fn txn<R>(&self, req: R) -> Result<TxnResponse>
343 where
344 R: Into<TxnRequest> + Send,
345 {
346 let req = tonic::Request::new(req.into().into());
347 let resp = self.kv_client.clone().txn(req).await?;
348
349 Ok(resp.into_inner().into())
350 }
351
352 async fn compact<R>(&self, req: R) -> Result<CompactResponse>
353 where
354 R: Into<CompactRequest> + Send,
355 {
356 let req = tonic::Request::new(req.into().into());
357 let resp = self.kv_client.clone().compact(req).await?;
358
359 Ok(resp.into_inner().into())
360 }
361}
362
363#[async_trait]
364impl WatchOp for Client {
365 async fn watch<R>(&self, req: R) -> Result<(WatchStream, WatchCanceler)>
366 where
367 R: Into<WatchCreateRequest> + Send,
368 {
369 let (tx, rx) = channel::<etcdserverpb::WatchRequest>(128);
370
371 tx.send(req.into().into()).await?;
372
373 let mut req = tonic::Request::new(ReceiverStream::new(rx));
374
375 req.metadata_mut()
376 .insert("hasleader", "true".try_into().unwrap());
377
378 let resp = self.watch_client.clone().watch(req).await?;
379
380 let mut inbound = resp.into_inner();
381
382 let watch_id = match inbound.message().await? {
383 Some(resp) => {
384 if !resp.created {
385 return Err(Error::WatchEvent(
386 "should receive created event at first".to_owned(),
387 ));
388 }
389 assert!(resp.events.is_empty(), "received created event {:?}", resp);
390 resp.watch_id
391 }
392
393 None => return Err(Error::CreateWatch),
394 };
395
396 Ok((WatchStream::new(inbound), WatchCanceler::new(watch_id, tx)))
397 }
398}
399
400#[async_trait]
401impl LeaseOp for Client {
402 async fn grant_lease<R>(&self, req: R) -> Result<LeaseGrantResponse>
403 where
404 R: Into<LeaseGrantRequest> + Send,
405 {
406 let req = tonic::Request::new(req.into().into());
407 let resp = self.lease_client.clone().lease_grant(req).await?;
408 Ok(resp.into_inner().into())
409 }
410
411 async fn revoke<R>(&self, req: R) -> Result<LeaseRevokeResponse>
412 where
413 R: Into<LeaseRevokeRequest> + Send,
414 {
415 let req = tonic::Request::new(req.into().into());
416 let resp = self.lease_client.clone().lease_revoke(req).await?;
417 Ok(resp.into_inner().into())
418 }
419
420 async fn keep_alive_for(&self, lease_id: LeaseId) -> Result<LeaseKeepAlive> {
421 let (req_tx, req_rx) = channel(1024);
422
423 let req_rx = ReceiverStream::new(req_rx);
424
425 let initial_req = LeaseKeepAliveRequest { id: lease_id };
426
427 req_tx
428 .send(initial_req)
429 .await
430 .map_err(|_| Error::ChannelClosed)?;
431
432 let mut resp_rx = self
433 .lease_client
434 .clone()
435 .lease_keep_alive(req_rx)
436 .await?
437 .into_inner();
438
439 let lease_id = match resp_rx.message().await? {
440 Some(resp) => resp.id,
441 None => {
442 return Err(Error::CreateWatch);
443 }
444 };
445
446 Ok(LeaseKeepAlive::new(lease_id, req_tx, resp_rx))
447 }
448
449 async fn time_to_live<R>(&self, req: R) -> Result<LeaseTimeToLiveResponse>
450 where
451 R: Into<LeaseTimeToLiveRequest> + Send,
452 {
453 let req = tonic::Request::new(req.into().into());
454 let resp = self.lease_client.clone().lease_time_to_live(req).await?;
455 Ok(resp.into_inner().into())
456 }
457}
458
459#[async_trait]
460impl ClusterOp for Client {
461 async fn member_add<R>(&self, req: R) -> Result<MemberAddResponse>
462 where
463 R: Into<MemberAddRequest> + Send,
464 {
465 let req = tonic::Request::new(req.into().into());
466 let resp = self.cluster_client.clone().member_add(req).await?;
467
468 Ok(resp.into_inner().into())
469 }
470
471 async fn member_remove<R>(&self, req: R) -> Result<MemberRemoveResponse>
472 where
473 R: Into<MemberRemoveRequest> + Send,
474 {
475 let req = tonic::Request::new(req.into().into());
476 let resp = self.cluster_client.clone().member_remove(req).await?;
477
478 Ok(resp.into_inner().into())
479 }
480
481 async fn member_update<R>(&self, req: R) -> Result<MemberUpdateResponse>
482 where
483 R: Into<MemberUpdateRequest> + Send,
484 {
485 let req = tonic::Request::new(req.into().into());
486 let resp = self.cluster_client.clone().member_update(req).await?;
487
488 Ok(resp.into_inner().into())
489 }
490
491 async fn member_list(&self) -> Result<MemberListResponse> {
492 let req = tonic::Request::new(MemberListRequest::new().into());
493 let resp = self.cluster_client.clone().member_list(req).await?;
494
495 Ok(resp.into_inner().into())
496 }
497}