1use crate::auth::AuthService;
4use crate::error::Result;
5use crate::intercept::InterceptedChannel;
6use crate::rpc::pb::etcdserverpb::lease_client::LeaseClient as PbLeaseClient;
7use crate::rpc::pb::etcdserverpb::{
8 LeaseGrantRequest as PbLeaseGrantRequest, LeaseGrantResponse as PbLeaseGrantResponse,
9 LeaseKeepAliveRequest as PbLeaseKeepAliveRequest,
10 LeaseKeepAliveResponse as PbLeaseKeepAliveResponse, LeaseLeasesRequest as PbLeaseLeasesRequest,
11 LeaseLeasesResponse as PbLeaseLeasesResponse, LeaseRevokeRequest as PbLeaseRevokeRequest,
12 LeaseRevokeResponse as PbLeaseRevokeResponse, LeaseStatus as PbLeaseStatus,
13 LeaseTimeToLiveRequest as PbLeaseTimeToLiveRequest,
14 LeaseTimeToLiveResponse as PbLeaseTimeToLiveResponse,
15};
16use crate::rpc::ResponseHeader;
17use crate::vec::VecExt;
18use crate::Error;
19use http::HeaderValue;
20use std::pin::Pin;
21use std::sync::{Arc, RwLock};
22use std::task::{Context, Poll};
23use tokio::sync::mpsc::{channel, Sender};
24use tokio_stream::wrappers::ReceiverStream;
25use tokio_stream::Stream;
26use tonic::{IntoRequest, Request, Streaming};
27
28#[repr(transparent)]
30#[derive(Clone)]
31pub struct LeaseClient {
32 inner: PbLeaseClient<AuthService<InterceptedChannel>>,
33}
34
35impl LeaseClient {
36 #[inline]
38 pub(crate) fn new(
39 channel: InterceptedChannel,
40 auth_token: Arc<RwLock<Option<HeaderValue>>>,
41 ) -> Self {
42 let inner = PbLeaseClient::new(AuthService::new(channel, auth_token));
43 Self { inner }
44 }
45
46 #[inline]
50 pub async fn grant(
51 &mut self,
52 ttl: i64,
53 options: Option<LeaseGrantOptions>,
54 ) -> Result<LeaseGrantResponse> {
55 let resp = self
56 .inner
57 .lease_grant(options.unwrap_or_default().with_ttl(ttl))
58 .await?
59 .into_inner();
60 Ok(LeaseGrantResponse::new(resp))
61 }
62
63 #[inline]
65 pub async fn revoke(&mut self, id: i64) -> Result<LeaseRevokeResponse> {
66 let resp = self
67 .inner
68 .lease_revoke(LeaseRevokeOptions::new().with_id(id))
69 .await?
70 .into_inner();
71 Ok(LeaseRevokeResponse::new(resp))
72 }
73
74 #[inline]
77 pub async fn keep_alive(&mut self, id: i64) -> Result<(LeaseKeeper, LeaseKeepAliveStream)> {
78 let (sender, receiver) = channel::<PbLeaseKeepAliveRequest>(100);
79 sender
80 .send(LeaseKeepAliveOptions::new().with_id(id).into())
81 .await
82 .map_err(|e| Error::LeaseKeepAliveError(e.to_string()))?;
83
84 let receiver = ReceiverStream::new(receiver);
85
86 let mut stream = self.inner.lease_keep_alive(receiver).await?.into_inner();
87
88 let id = match stream.message().await? {
89 Some(resp) => {
90 if resp.ttl <= 0 {
91 return Err(Error::LeaseKeepAliveError("lease not found".to_string()));
92 }
93 resp.id
94 }
95 None => {
96 return Err(Error::WatchError(
97 "failed to create lease keeper".to_string(),
98 ));
99 }
100 };
101
102 Ok((
103 LeaseKeeper::new(id, sender),
104 LeaseKeepAliveStream::new(stream),
105 ))
106 }
107
108 #[inline]
110 pub async fn time_to_live(
111 &mut self,
112 id: i64,
113 options: Option<LeaseTimeToLiveOptions>,
114 ) -> Result<LeaseTimeToLiveResponse> {
115 let resp = self
116 .inner
117 .lease_time_to_live(options.unwrap_or_default().with_id(id))
118 .await?
119 .into_inner();
120 Ok(LeaseTimeToLiveResponse::new(resp))
121 }
122
123 #[inline]
125 pub async fn leases(&mut self) -> Result<LeaseLeasesResponse> {
126 let resp = self
127 .inner
128 .lease_leases(PbLeaseLeasesRequest {})
129 .await?
130 .into_inner();
131 Ok(LeaseLeasesResponse::new(resp))
132 }
133}
134
135#[derive(Debug, Default, Clone)]
137#[repr(transparent)]
138pub struct LeaseGrantOptions(PbLeaseGrantRequest);
139
140impl LeaseGrantOptions {
141 #[inline]
143 const fn with_ttl(mut self, ttl: i64) -> Self {
144 self.0.ttl = ttl;
145 self
146 }
147
148 #[inline]
150 pub const fn with_id(mut self, id: i64) -> Self {
151 self.0.id = id;
152 self
153 }
154
155 #[inline]
157 pub const fn new() -> Self {
158 Self(PbLeaseGrantRequest { ttl: 0, id: 0 })
159 }
160}
161
162impl From<LeaseGrantOptions> for PbLeaseGrantRequest {
163 #[inline]
164 fn from(options: LeaseGrantOptions) -> Self {
165 options.0
166 }
167}
168
169impl IntoRequest<PbLeaseGrantRequest> for LeaseGrantOptions {
170 #[inline]
171 fn into_request(self) -> Request<PbLeaseGrantRequest> {
172 Request::new(self.into())
173 }
174}
175
176#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
178#[derive(Debug, Clone)]
179#[repr(transparent)]
180pub struct LeaseGrantResponse(PbLeaseGrantResponse);
181
182impl LeaseGrantResponse {
183 #[inline]
185 const fn new(resp: PbLeaseGrantResponse) -> Self {
186 Self(resp)
187 }
188
189 #[inline]
191 pub fn header(&self) -> Option<&ResponseHeader> {
192 self.0.header.as_ref().map(From::from)
193 }
194
195 #[inline]
197 pub fn take_header(&mut self) -> Option<ResponseHeader> {
198 self.0.header.take().map(ResponseHeader::new)
199 }
200
201 #[inline]
203 pub const fn ttl(&self) -> i64 {
204 self.0.ttl
205 }
206
207 #[inline]
209 pub const fn id(&self) -> i64 {
210 self.0.id
211 }
212
213 #[inline]
215 pub fn error(&self) -> &str {
216 &self.0.error
217 }
218}
219
220#[derive(Debug, Default, Clone)]
222#[repr(transparent)]
223struct LeaseRevokeOptions(PbLeaseRevokeRequest);
224
225impl LeaseRevokeOptions {
226 #[inline]
228 fn with_id(mut self, id: i64) -> Self {
229 self.0.id = id;
230 self
231 }
232
233 #[inline]
235 pub const fn new() -> Self {
236 Self(PbLeaseRevokeRequest { id: 0 })
237 }
238}
239
240impl From<LeaseRevokeOptions> for PbLeaseRevokeRequest {
241 #[inline]
242 fn from(options: LeaseRevokeOptions) -> Self {
243 options.0
244 }
245}
246
247impl IntoRequest<PbLeaseRevokeRequest> for LeaseRevokeOptions {
248 #[inline]
249 fn into_request(self) -> Request<PbLeaseRevokeRequest> {
250 Request::new(self.into())
251 }
252}
253
254#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
256#[derive(Debug, Clone)]
257#[repr(transparent)]
258pub struct LeaseRevokeResponse(PbLeaseRevokeResponse);
259
260impl LeaseRevokeResponse {
261 #[inline]
263 const fn new(resp: PbLeaseRevokeResponse) -> Self {
264 Self(resp)
265 }
266
267 #[inline]
269 pub fn header(&self) -> Option<&ResponseHeader> {
270 self.0.header.as_ref().map(From::from)
271 }
272
273 #[inline]
275 pub fn take_header(&mut self) -> Option<ResponseHeader> {
276 self.0.header.take().map(ResponseHeader::new)
277 }
278}
279
280#[derive(Debug, Default, Clone)]
282#[repr(transparent)]
283struct LeaseKeepAliveOptions(PbLeaseKeepAliveRequest);
284
285impl LeaseKeepAliveOptions {
286 #[inline]
288 fn with_id(mut self, id: i64) -> Self {
289 self.0.id = id;
290 self
291 }
292
293 #[inline]
295 pub const fn new() -> Self {
296 Self(PbLeaseKeepAliveRequest { id: 0 })
297 }
298}
299
300impl From<LeaseKeepAliveOptions> for PbLeaseKeepAliveRequest {
301 #[inline]
302 fn from(options: LeaseKeepAliveOptions) -> Self {
303 options.0
304 }
305}
306
307impl IntoRequest<PbLeaseKeepAliveRequest> for LeaseKeepAliveOptions {
308 #[inline]
309 fn into_request(self) -> Request<PbLeaseKeepAliveRequest> {
310 Request::new(self.into())
311 }
312}
313
314#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
316#[derive(Debug, Clone)]
317#[repr(transparent)]
318pub struct LeaseKeepAliveResponse(PbLeaseKeepAliveResponse);
319
320impl LeaseKeepAliveResponse {
321 #[inline]
323 const fn new(resp: PbLeaseKeepAliveResponse) -> Self {
324 Self(resp)
325 }
326
327 #[inline]
329 pub fn header(&self) -> Option<&ResponseHeader> {
330 self.0.header.as_ref().map(From::from)
331 }
332
333 #[inline]
335 pub fn take_header(&mut self) -> Option<ResponseHeader> {
336 self.0.header.take().map(ResponseHeader::new)
337 }
338
339 #[inline]
341 pub const fn ttl(&self) -> i64 {
342 self.0.ttl
343 }
344
345 #[inline]
347 pub const fn id(&self) -> i64 {
348 self.0.id
349 }
350}
351
352#[derive(Debug, Default, Clone)]
354#[repr(transparent)]
355pub struct LeaseTimeToLiveOptions(PbLeaseTimeToLiveRequest);
356
357impl LeaseTimeToLiveOptions {
358 #[inline]
360 const fn with_id(mut self, id: i64) -> Self {
361 self.0.id = id;
362 self
363 }
364
365 #[inline]
367 pub const fn with_keys(mut self) -> Self {
368 self.0.keys = true;
369 self
370 }
371
372 #[inline]
374 pub const fn new() -> Self {
375 Self(PbLeaseTimeToLiveRequest { id: 0, keys: false })
376 }
377}
378
379impl From<LeaseTimeToLiveOptions> for PbLeaseTimeToLiveRequest {
380 #[inline]
381 fn from(options: LeaseTimeToLiveOptions) -> Self {
382 options.0
383 }
384}
385
386impl IntoRequest<PbLeaseTimeToLiveRequest> for LeaseTimeToLiveOptions {
387 #[inline]
388 fn into_request(self) -> Request<PbLeaseTimeToLiveRequest> {
389 Request::new(self.into())
390 }
391}
392
393#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
395#[derive(Debug, Clone)]
396#[repr(transparent)]
397pub struct LeaseTimeToLiveResponse(PbLeaseTimeToLiveResponse);
398
399impl LeaseTimeToLiveResponse {
400 #[inline]
402 const fn new(resp: PbLeaseTimeToLiveResponse) -> Self {
403 Self(resp)
404 }
405
406 #[inline]
408 pub fn header(&self) -> Option<&ResponseHeader> {
409 self.0.header.as_ref().map(From::from)
410 }
411
412 #[inline]
414 pub fn take_header(&mut self) -> Option<ResponseHeader> {
415 self.0.header.take().map(ResponseHeader::new)
416 }
417
418 #[inline]
420 pub const fn ttl(&self) -> i64 {
421 self.0.ttl
422 }
423
424 #[inline]
426 pub const fn id(&self) -> i64 {
427 self.0.id
428 }
429
430 #[inline]
432 pub const fn granted_ttl(&self) -> i64 {
433 self.0.granted_ttl
434 }
435
436 #[inline]
438 pub fn keys(&self) -> &[Vec<u8>] {
439 &self.0.keys
440 }
441
442 #[inline]
443 pub(crate) fn strip_keys_prefix(&mut self, prefix: &[u8]) {
444 self.0.keys.iter_mut().for_each(|key| {
445 key.strip_key_prefix(prefix);
446 });
447 }
448}
449
450#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
452#[derive(Debug, Clone)]
453#[repr(transparent)]
454pub struct LeaseLeasesResponse(PbLeaseLeasesResponse);
455
456impl LeaseLeasesResponse {
457 #[inline]
459 const fn new(resp: PbLeaseLeasesResponse) -> Self {
460 Self(resp)
461 }
462
463 #[inline]
465 pub fn header(&self) -> Option<&ResponseHeader> {
466 self.0.header.as_ref().map(From::from)
467 }
468
469 #[inline]
471 pub fn take_header(&mut self) -> Option<ResponseHeader> {
472 self.0.header.take().map(ResponseHeader::new)
473 }
474
475 #[inline]
477 pub fn leases(&self) -> &[LeaseStatus] {
478 unsafe { &*(self.0.leases.as_slice() as *const _ as *const [LeaseStatus]) }
479 }
480}
481
482#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
484#[derive(Debug, Clone, PartialEq)]
485#[repr(transparent)]
486pub struct LeaseStatus(PbLeaseStatus);
487
488impl LeaseStatus {
489 #[inline]
491 pub const fn id(&self) -> i64 {
492 self.0.id
493 }
494}
495
496impl From<&PbLeaseStatus> for &LeaseStatus {
497 #[inline]
498 fn from(src: &PbLeaseStatus) -> Self {
499 unsafe { &*(src as *const _ as *const LeaseStatus) }
500 }
501}
502
503#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
505#[derive(Debug)]
506pub struct LeaseKeeper {
507 id: i64,
508 sender: Sender<PbLeaseKeepAliveRequest>,
509}
510
511impl LeaseKeeper {
512 #[inline]
514 const fn new(id: i64, sender: Sender<PbLeaseKeepAliveRequest>) -> Self {
515 Self { id, sender }
516 }
517
518 #[inline]
520 pub const fn id(&self) -> i64 {
521 self.id
522 }
523
524 #[inline]
526 pub async fn keep_alive(&mut self) -> Result<()> {
527 self.sender
528 .send(LeaseKeepAliveOptions::new().with_id(self.id).into())
529 .await
530 .map_err(|e| Error::LeaseKeepAliveError(e.to_string()))
531 }
532}
533
534#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
536#[derive(Debug)]
537pub struct LeaseKeepAliveStream {
538 stream: Streaming<PbLeaseKeepAliveResponse>,
539}
540
541impl LeaseKeepAliveStream {
542 #[inline]
544 const fn new(stream: Streaming<PbLeaseKeepAliveResponse>) -> Self {
545 Self { stream }
546 }
547
548 #[inline]
550 pub async fn message(&mut self) -> Result<Option<LeaseKeepAliveResponse>> {
551 match self.stream.message().await? {
552 Some(resp) => Ok(Some(LeaseKeepAliveResponse::new(resp))),
553 None => Ok(None),
554 }
555 }
556}
557
558impl Stream for LeaseKeepAliveStream {
559 type Item = Result<LeaseKeepAliveResponse>;
560
561 #[inline]
562 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
563 Pin::new(&mut self.get_mut().stream)
564 .poll_next(cx)
565 .map(|t| match t {
566 Some(Ok(resp)) => Some(Ok(LeaseKeepAliveResponse::new(resp))),
567 Some(Err(e)) => Some(Err(From::from(e))),
568 None => None,
569 })
570 }
571}