etcd_client/rpc/
lease.rs

//! Etcd Lease RPC: the lease client together with its request options and response wrappers.
2
3use crate::auth::AuthService;
4use crate::error::Result;
5use crate::intercept::InterceptedChannel;
6use crate::rpc::pb::etcdserverpb::lease_client::LeaseClient as PbLeaseClient;
7use crate::rpc::pb::etcdserverpb::{
8    LeaseGrantRequest as PbLeaseGrantRequest, LeaseGrantResponse as PbLeaseGrantResponse,
9    LeaseKeepAliveRequest as PbLeaseKeepAliveRequest,
10    LeaseKeepAliveResponse as PbLeaseKeepAliveResponse, LeaseLeasesRequest as PbLeaseLeasesRequest,
11    LeaseLeasesResponse as PbLeaseLeasesResponse, LeaseRevokeRequest as PbLeaseRevokeRequest,
12    LeaseRevokeResponse as PbLeaseRevokeResponse, LeaseStatus as PbLeaseStatus,
13    LeaseTimeToLiveRequest as PbLeaseTimeToLiveRequest,
14    LeaseTimeToLiveResponse as PbLeaseTimeToLiveResponse,
15};
16use crate::rpc::ResponseHeader;
17use crate::vec::VecExt;
18use crate::Error;
19use http::HeaderValue;
20use std::pin::Pin;
21use std::sync::{Arc, RwLock};
22use std::task::{Context, Poll};
23use tokio::sync::mpsc::{channel, Sender};
24use tokio_stream::wrappers::ReceiverStream;
25use tokio_stream::Stream;
26use tonic::{IntoRequest, Request, Streaming};
27
28/// Client for lease operations.
29#[repr(transparent)]
30#[derive(Clone)]
31pub struct LeaseClient {
32    inner: PbLeaseClient<AuthService<InterceptedChannel>>,
33}
34
35impl LeaseClient {
36    /// Creates a `LeaseClient`.
37    #[inline]
38    pub(crate) fn new(
39        channel: InterceptedChannel,
40        auth_token: Arc<RwLock<Option<HeaderValue>>>,
41    ) -> Self {
42        let inner = PbLeaseClient::new(AuthService::new(channel, auth_token));
43        Self { inner }
44    }
45
46    /// Creates a lease which expires if the server does not receive a keepAlive
47    /// within a given time to live period. All keys attached to the lease will be expired and
48    /// deleted if the lease expires. Each expired key generates a delete event in the event history.
49    #[inline]
50    pub async fn grant(
51        &mut self,
52        ttl: i64,
53        options: Option<LeaseGrantOptions>,
54    ) -> Result<LeaseGrantResponse> {
55        let resp = self
56            .inner
57            .lease_grant(options.unwrap_or_default().with_ttl(ttl))
58            .await?
59            .into_inner();
60        Ok(LeaseGrantResponse::new(resp))
61    }
62
63    /// Revokes a lease. All keys attached to the lease will expire and be deleted.
64    #[inline]
65    pub async fn revoke(&mut self, id: i64) -> Result<LeaseRevokeResponse> {
66        let resp = self
67            .inner
68            .lease_revoke(LeaseRevokeOptions::new().with_id(id))
69            .await?
70            .into_inner();
71        Ok(LeaseRevokeResponse::new(resp))
72    }
73
74    /// Keeps the lease alive by streaming keep alive requests from the client
75    /// to the server and streaming keep alive responses from the server to the client.
76    #[inline]
77    pub async fn keep_alive(&mut self, id: i64) -> Result<(LeaseKeeper, LeaseKeepAliveStream)> {
78        let (sender, receiver) = channel::<PbLeaseKeepAliveRequest>(100);
79        sender
80            .send(LeaseKeepAliveOptions::new().with_id(id).into())
81            .await
82            .map_err(|e| Error::LeaseKeepAliveError(e.to_string()))?;
83
84        let receiver = ReceiverStream::new(receiver);
85
86        let mut stream = self.inner.lease_keep_alive(receiver).await?.into_inner();
87
88        let id = match stream.message().await? {
89            Some(resp) => {
90                if resp.ttl <= 0 {
91                    return Err(Error::LeaseKeepAliveError("lease not found".to_string()));
92                }
93                resp.id
94            }
95            None => {
96                return Err(Error::WatchError(
97                    "failed to create lease keeper".to_string(),
98                ));
99            }
100        };
101
102        Ok((
103            LeaseKeeper::new(id, sender),
104            LeaseKeepAliveStream::new(stream),
105        ))
106    }
107
108    /// Retrieves lease information.
109    #[inline]
110    pub async fn time_to_live(
111        &mut self,
112        id: i64,
113        options: Option<LeaseTimeToLiveOptions>,
114    ) -> Result<LeaseTimeToLiveResponse> {
115        let resp = self
116            .inner
117            .lease_time_to_live(options.unwrap_or_default().with_id(id))
118            .await?
119            .into_inner();
120        Ok(LeaseTimeToLiveResponse::new(resp))
121    }
122
123    /// Lists all existing leases.
124    #[inline]
125    pub async fn leases(&mut self) -> Result<LeaseLeasesResponse> {
126        let resp = self
127            .inner
128            .lease_leases(PbLeaseLeasesRequest {})
129            .await?
130            .into_inner();
131        Ok(LeaseLeasesResponse::new(resp))
132    }
133}
134
135/// Options for `Grant` operation.
136#[derive(Debug, Default, Clone)]
137#[repr(transparent)]
138pub struct LeaseGrantOptions(PbLeaseGrantRequest);
139
140impl LeaseGrantOptions {
141    /// Set ttl
142    #[inline]
143    const fn with_ttl(mut self, ttl: i64) -> Self {
144        self.0.ttl = ttl;
145        self
146    }
147
148    /// Set id
149    #[inline]
150    pub const fn with_id(mut self, id: i64) -> Self {
151        self.0.id = id;
152        self
153    }
154
155    /// Creates a `LeaseGrantOptions`.
156    #[inline]
157    pub const fn new() -> Self {
158        Self(PbLeaseGrantRequest { ttl: 0, id: 0 })
159    }
160}
161
162impl From<LeaseGrantOptions> for PbLeaseGrantRequest {
163    #[inline]
164    fn from(options: LeaseGrantOptions) -> Self {
165        options.0
166    }
167}
168
169impl IntoRequest<PbLeaseGrantRequest> for LeaseGrantOptions {
170    #[inline]
171    fn into_request(self) -> Request<PbLeaseGrantRequest> {
172        Request::new(self.into())
173    }
174}
175
176/// Response for `Grant` operation.
177#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
178#[derive(Debug, Clone)]
179#[repr(transparent)]
180pub struct LeaseGrantResponse(PbLeaseGrantResponse);
181
182impl LeaseGrantResponse {
183    /// Creates a new `LeaseGrantResponse` from pb lease grant response.
184    #[inline]
185    const fn new(resp: PbLeaseGrantResponse) -> Self {
186        Self(resp)
187    }
188
189    /// Get response header.
190    #[inline]
191    pub fn header(&self) -> Option<&ResponseHeader> {
192        self.0.header.as_ref().map(From::from)
193    }
194
195    /// Takes the header out of the response, leaving a [`None`] in its place.
196    #[inline]
197    pub fn take_header(&mut self) -> Option<ResponseHeader> {
198        self.0.header.take().map(ResponseHeader::new)
199    }
200
201    /// TTL is the server chosen lease time-to-live in seconds
202    #[inline]
203    pub const fn ttl(&self) -> i64 {
204        self.0.ttl
205    }
206
207    /// ID is the lease ID for the granted lease.
208    #[inline]
209    pub const fn id(&self) -> i64 {
210        self.0.id
211    }
212
213    /// Error message if return error.
214    #[inline]
215    pub fn error(&self) -> &str {
216        &self.0.error
217    }
218}
219
220/// Options for `Revoke` operation.
221#[derive(Debug, Default, Clone)]
222#[repr(transparent)]
223struct LeaseRevokeOptions(PbLeaseRevokeRequest);
224
225impl LeaseRevokeOptions {
226    /// Set id
227    #[inline]
228    fn with_id(mut self, id: i64) -> Self {
229        self.0.id = id;
230        self
231    }
232
233    /// Creates a `LeaseRevokeOptions`.
234    #[inline]
235    pub const fn new() -> Self {
236        Self(PbLeaseRevokeRequest { id: 0 })
237    }
238}
239
240impl From<LeaseRevokeOptions> for PbLeaseRevokeRequest {
241    #[inline]
242    fn from(options: LeaseRevokeOptions) -> Self {
243        options.0
244    }
245}
246
247impl IntoRequest<PbLeaseRevokeRequest> for LeaseRevokeOptions {
248    #[inline]
249    fn into_request(self) -> Request<PbLeaseRevokeRequest> {
250        Request::new(self.into())
251    }
252}
253
254/// Response for `Revoke` operation.
255#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
256#[derive(Debug, Clone)]
257#[repr(transparent)]
258pub struct LeaseRevokeResponse(PbLeaseRevokeResponse);
259
260impl LeaseRevokeResponse {
261    /// Creates a new `LeaseRevokeResponse` from pb lease revoke response.
262    #[inline]
263    const fn new(resp: PbLeaseRevokeResponse) -> Self {
264        Self(resp)
265    }
266
267    /// Get response header.
268    #[inline]
269    pub fn header(&self) -> Option<&ResponseHeader> {
270        self.0.header.as_ref().map(From::from)
271    }
272
273    /// Takes the header out of the response, leaving a [`None`] in its place.
274    #[inline]
275    pub fn take_header(&mut self) -> Option<ResponseHeader> {
276        self.0.header.take().map(ResponseHeader::new)
277    }
278}
279
280/// Options for `KeepAlive` operation.
281#[derive(Debug, Default, Clone)]
282#[repr(transparent)]
283struct LeaseKeepAliveOptions(PbLeaseKeepAliveRequest);
284
285impl LeaseKeepAliveOptions {
286    /// Set id
287    #[inline]
288    fn with_id(mut self, id: i64) -> Self {
289        self.0.id = id;
290        self
291    }
292
293    /// Creates a `LeaseKeepAliveOptions`.
294    #[inline]
295    pub const fn new() -> Self {
296        Self(PbLeaseKeepAliveRequest { id: 0 })
297    }
298}
299
300impl From<LeaseKeepAliveOptions> for PbLeaseKeepAliveRequest {
301    #[inline]
302    fn from(options: LeaseKeepAliveOptions) -> Self {
303        options.0
304    }
305}
306
307impl IntoRequest<PbLeaseKeepAliveRequest> for LeaseKeepAliveOptions {
308    #[inline]
309    fn into_request(self) -> Request<PbLeaseKeepAliveRequest> {
310        Request::new(self.into())
311    }
312}
313
314/// Response for `KeepAlive` operation.
315#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
316#[derive(Debug, Clone)]
317#[repr(transparent)]
318pub struct LeaseKeepAliveResponse(PbLeaseKeepAliveResponse);
319
320impl LeaseKeepAliveResponse {
321    /// Creates a new `LeaseKeepAliveResponse` from pb lease KeepAlive response.
322    #[inline]
323    const fn new(resp: PbLeaseKeepAliveResponse) -> Self {
324        Self(resp)
325    }
326
327    /// Get response header.
328    #[inline]
329    pub fn header(&self) -> Option<&ResponseHeader> {
330        self.0.header.as_ref().map(From::from)
331    }
332
333    /// Takes the header out of the response, leaving a [`None`] in its place.
334    #[inline]
335    pub fn take_header(&mut self) -> Option<ResponseHeader> {
336        self.0.header.take().map(ResponseHeader::new)
337    }
338
339    /// TTL is the new time-to-live for the lease.
340    #[inline]
341    pub const fn ttl(&self) -> i64 {
342        self.0.ttl
343    }
344
345    /// ID is the lease ID for the keep alive request.
346    #[inline]
347    pub const fn id(&self) -> i64 {
348        self.0.id
349    }
350}
351
352/// Options for `TimeToLive` operation.
353#[derive(Debug, Default, Clone)]
354#[repr(transparent)]
355pub struct LeaseTimeToLiveOptions(PbLeaseTimeToLiveRequest);
356
357impl LeaseTimeToLiveOptions {
358    /// ID is the lease ID for the lease.
359    #[inline]
360    const fn with_id(mut self, id: i64) -> Self {
361        self.0.id = id;
362        self
363    }
364
365    /// Keys is true to query all the keys attached to this lease.
366    #[inline]
367    pub const fn with_keys(mut self) -> Self {
368        self.0.keys = true;
369        self
370    }
371
372    /// Creates a `LeaseTimeToLiveOptions`.
373    #[inline]
374    pub const fn new() -> Self {
375        Self(PbLeaseTimeToLiveRequest { id: 0, keys: false })
376    }
377}
378
379impl From<LeaseTimeToLiveOptions> for PbLeaseTimeToLiveRequest {
380    #[inline]
381    fn from(options: LeaseTimeToLiveOptions) -> Self {
382        options.0
383    }
384}
385
386impl IntoRequest<PbLeaseTimeToLiveRequest> for LeaseTimeToLiveOptions {
387    #[inline]
388    fn into_request(self) -> Request<PbLeaseTimeToLiveRequest> {
389        Request::new(self.into())
390    }
391}
392
393/// Response for `TimeToLive` operation.
394#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
395#[derive(Debug, Clone)]
396#[repr(transparent)]
397pub struct LeaseTimeToLiveResponse(PbLeaseTimeToLiveResponse);
398
399impl LeaseTimeToLiveResponse {
400    /// Creates a new `LeaseTimeToLiveResponse` from pb lease TimeToLive response.
401    #[inline]
402    const fn new(resp: PbLeaseTimeToLiveResponse) -> Self {
403        Self(resp)
404    }
405
406    /// Get response header.
407    #[inline]
408    pub fn header(&self) -> Option<&ResponseHeader> {
409        self.0.header.as_ref().map(From::from)
410    }
411
412    /// Takes the header out of the response, leaving a [`None`] in its place.
413    #[inline]
414    pub fn take_header(&mut self) -> Option<ResponseHeader> {
415        self.0.header.take().map(ResponseHeader::new)
416    }
417
418    /// TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds.
419    #[inline]
420    pub const fn ttl(&self) -> i64 {
421        self.0.ttl
422    }
423
424    /// ID is the lease ID from the keep alive request.
425    #[inline]
426    pub const fn id(&self) -> i64 {
427        self.0.id
428    }
429
430    /// GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
431    #[inline]
432    pub const fn granted_ttl(&self) -> i64 {
433        self.0.granted_ttl
434    }
435
436    /// Keys is the list of keys attached to this lease.
437    #[inline]
438    pub fn keys(&self) -> &[Vec<u8>] {
439        &self.0.keys
440    }
441
442    #[inline]
443    pub(crate) fn strip_keys_prefix(&mut self, prefix: &[u8]) {
444        self.0.keys.iter_mut().for_each(|key| {
445            key.strip_key_prefix(prefix);
446        });
447    }
448}
449
450/// Response for `Leases` operation.
451#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
452#[derive(Debug, Clone)]
453#[repr(transparent)]
454pub struct LeaseLeasesResponse(PbLeaseLeasesResponse);
455
456impl LeaseLeasesResponse {
457    /// Creates a new `LeaseLeasesResponse` from pb lease Leases response.
458    #[inline]
459    const fn new(resp: PbLeaseLeasesResponse) -> Self {
460        Self(resp)
461    }
462
463    /// Get response header.
464    #[inline]
465    pub fn header(&self) -> Option<&ResponseHeader> {
466        self.0.header.as_ref().map(From::from)
467    }
468
469    /// Takes the header out of the response, leaving a [`None`] in its place.
470    #[inline]
471    pub fn take_header(&mut self) -> Option<ResponseHeader> {
472        self.0.header.take().map(ResponseHeader::new)
473    }
474
475    /// Get leases status
476    #[inline]
477    pub fn leases(&self) -> &[LeaseStatus] {
478        unsafe { &*(self.0.leases.as_slice() as *const _ as *const [LeaseStatus]) }
479    }
480}
481
482/// Lease status.
483#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
484#[derive(Debug, Clone, PartialEq)]
485#[repr(transparent)]
486pub struct LeaseStatus(PbLeaseStatus);
487
488impl LeaseStatus {
489    /// Lease id.
490    #[inline]
491    pub const fn id(&self) -> i64 {
492        self.0.id
493    }
494}
495
496impl From<&PbLeaseStatus> for &LeaseStatus {
497    #[inline]
498    fn from(src: &PbLeaseStatus) -> Self {
499        unsafe { &*(src as *const _ as *const LeaseStatus) }
500    }
501}
502
503/// The lease keep alive handle.
504#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
505#[derive(Debug)]
506pub struct LeaseKeeper {
507    id: i64,
508    sender: Sender<PbLeaseKeepAliveRequest>,
509}
510
511impl LeaseKeeper {
512    /// Creates a new `LeaseKeeper`.
513    #[inline]
514    const fn new(id: i64, sender: Sender<PbLeaseKeepAliveRequest>) -> Self {
515        Self { id, sender }
516    }
517
518    /// The lease id which user want to keep alive.
519    #[inline]
520    pub const fn id(&self) -> i64 {
521        self.id
522    }
523
524    /// Sends a keep alive request and receive response
525    #[inline]
526    pub async fn keep_alive(&mut self) -> Result<()> {
527        self.sender
528            .send(LeaseKeepAliveOptions::new().with_id(self.id).into())
529            .await
530            .map_err(|e| Error::LeaseKeepAliveError(e.to_string()))
531    }
532}
533
534/// The lease keep alive response stream.
535#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
536#[derive(Debug)]
537pub struct LeaseKeepAliveStream {
538    stream: Streaming<PbLeaseKeepAliveResponse>,
539}
540
541impl LeaseKeepAliveStream {
542    /// Creates a new `LeaseKeepAliveStream`.
543    #[inline]
544    const fn new(stream: Streaming<PbLeaseKeepAliveResponse>) -> Self {
545        Self { stream }
546    }
547
548    /// Fetches the next message from this stream.
549    #[inline]
550    pub async fn message(&mut self) -> Result<Option<LeaseKeepAliveResponse>> {
551        match self.stream.message().await? {
552            Some(resp) => Ok(Some(LeaseKeepAliveResponse::new(resp))),
553            None => Ok(None),
554        }
555    }
556}
557
558impl Stream for LeaseKeepAliveStream {
559    type Item = Result<LeaseKeepAliveResponse>;
560
561    #[inline]
562    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
563        Pin::new(&mut self.get_mut().stream)
564            .poll_next(cx)
565            .map(|t| match t {
566                Some(Ok(resp)) => Some(Ok(LeaseKeepAliveResponse::new(resp))),
567                Some(Err(e)) => Some(Err(From::from(e))),
568                None => None,
569            })
570    }
571}