etcd_client/rpc/
lease.rs

1//! Etcd Lease RPC.
2
3use crate::error::Result;
4use crate::intercept::InterceptedChannel;
5use crate::rpc::pb::etcdserverpb::lease_client::LeaseClient as PbLeaseClient;
6use crate::rpc::pb::etcdserverpb::{
7    LeaseGrantRequest as PbLeaseGrantRequest, LeaseGrantResponse as PbLeaseGrantResponse,
8    LeaseKeepAliveRequest as PbLeaseKeepAliveRequest,
9    LeaseKeepAliveResponse as PbLeaseKeepAliveResponse, LeaseLeasesRequest as PbLeaseLeasesRequest,
10    LeaseLeasesResponse as PbLeaseLeasesResponse, LeaseRevokeRequest as PbLeaseRevokeRequest,
11    LeaseRevokeResponse as PbLeaseRevokeResponse, LeaseStatus as PbLeaseStatus,
12    LeaseTimeToLiveRequest as PbLeaseTimeToLiveRequest,
13    LeaseTimeToLiveResponse as PbLeaseTimeToLiveResponse,
14};
15use crate::rpc::ResponseHeader;
16use crate::vec::VecExt;
17use crate::Error;
18use std::pin::Pin;
19use std::task::{Context, Poll};
20use tokio::sync::mpsc::{channel, Sender};
21use tokio_stream::wrappers::ReceiverStream;
22use tokio_stream::Stream;
23use tonic::{IntoRequest, Request, Streaming};
24
25/// Client for lease operations.
26#[repr(transparent)]
27#[derive(Clone)]
28pub struct LeaseClient {
29    inner: PbLeaseClient<InterceptedChannel>,
30}
31
32impl LeaseClient {
33    /// Creates a `LeaseClient`.
34    #[inline]
35    pub(crate) fn new(channel: InterceptedChannel) -> Self {
36        let inner = PbLeaseClient::new(channel);
37        Self { inner }
38    }
39
40    /// Creates a lease which expires if the server does not receive a keepAlive
41    /// within a given time to live period. All keys attached to the lease will be expired and
42    /// deleted if the lease expires. Each expired key generates a delete event in the event history.
43    #[inline]
44    pub async fn grant(
45        &mut self,
46        ttl: i64,
47        options: Option<LeaseGrantOptions>,
48    ) -> Result<LeaseGrantResponse> {
49        let resp = self
50            .inner
51            .lease_grant(options.unwrap_or_default().with_ttl(ttl))
52            .await?
53            .into_inner();
54        Ok(LeaseGrantResponse::new(resp))
55    }
56
57    /// Revokes a lease. All keys attached to the lease will expire and be deleted.
58    #[inline]
59    pub async fn revoke(&mut self, id: i64) -> Result<LeaseRevokeResponse> {
60        let resp = self
61            .inner
62            .lease_revoke(LeaseRevokeOptions::new().with_id(id))
63            .await?
64            .into_inner();
65        Ok(LeaseRevokeResponse::new(resp))
66    }
67
68    /// Keeps the lease alive by streaming keep alive requests from the client
69    /// to the server and streaming keep alive responses from the server to the client.
70    #[inline]
71    pub async fn keep_alive(&mut self, id: i64) -> Result<(LeaseKeeper, LeaseKeepAliveStream)> {
72        let (sender, receiver) = channel::<PbLeaseKeepAliveRequest>(100);
73        sender
74            .send(LeaseKeepAliveOptions::new().with_id(id).into())
75            .await
76            .map_err(|e| Error::LeaseKeepAliveError(e.to_string()))?;
77
78        let receiver = ReceiverStream::new(receiver);
79
80        let mut stream = self.inner.lease_keep_alive(receiver).await?.into_inner();
81
82        let id = match stream.message().await? {
83            Some(resp) => {
84                if resp.ttl <= 0 {
85                    return Err(Error::LeaseKeepAliveError("lease not found".to_string()));
86                }
87                resp.id
88            }
89            None => {
90                return Err(Error::WatchError(
91                    "failed to create lease keeper".to_string(),
92                ));
93            }
94        };
95
96        Ok((
97            LeaseKeeper::new(id, sender),
98            LeaseKeepAliveStream::new(stream),
99        ))
100    }
101
102    /// Retrieves lease information.
103    #[inline]
104    pub async fn time_to_live(
105        &mut self,
106        id: i64,
107        options: Option<LeaseTimeToLiveOptions>,
108    ) -> Result<LeaseTimeToLiveResponse> {
109        let resp = self
110            .inner
111            .lease_time_to_live(options.unwrap_or_default().with_id(id))
112            .await?
113            .into_inner();
114        Ok(LeaseTimeToLiveResponse::new(resp))
115    }
116
117    /// Lists all existing leases.
118    #[inline]
119    pub async fn leases(&mut self) -> Result<LeaseLeasesResponse> {
120        let resp = self
121            .inner
122            .lease_leases(PbLeaseLeasesRequest {})
123            .await?
124            .into_inner();
125        Ok(LeaseLeasesResponse::new(resp))
126    }
127}
128
129/// Options for `Grant` operation.
130#[derive(Debug, Default, Clone)]
131#[repr(transparent)]
132pub struct LeaseGrantOptions(PbLeaseGrantRequest);
133
134impl LeaseGrantOptions {
135    /// Set ttl
136    #[inline]
137    const fn with_ttl(mut self, ttl: i64) -> Self {
138        self.0.ttl = ttl;
139        self
140    }
141
142    /// Set id
143    #[inline]
144    pub const fn with_id(mut self, id: i64) -> Self {
145        self.0.id = id;
146        self
147    }
148
149    /// Creates a `LeaseGrantOptions`.
150    #[inline]
151    pub const fn new() -> Self {
152        Self(PbLeaseGrantRequest { ttl: 0, id: 0 })
153    }
154}
155
156impl From<LeaseGrantOptions> for PbLeaseGrantRequest {
157    #[inline]
158    fn from(options: LeaseGrantOptions) -> Self {
159        options.0
160    }
161}
162
163impl IntoRequest<PbLeaseGrantRequest> for LeaseGrantOptions {
164    #[inline]
165    fn into_request(self) -> Request<PbLeaseGrantRequest> {
166        Request::new(self.into())
167    }
168}
169
170/// Response for `Grant` operation.
171#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
172#[derive(Debug, Clone)]
173#[repr(transparent)]
174pub struct LeaseGrantResponse(PbLeaseGrantResponse);
175
176impl LeaseGrantResponse {
177    /// Creates a new `LeaseGrantResponse` from pb lease grant response.
178    #[inline]
179    const fn new(resp: PbLeaseGrantResponse) -> Self {
180        Self(resp)
181    }
182
183    /// Get response header.
184    #[inline]
185    pub fn header(&self) -> Option<&ResponseHeader> {
186        self.0.header.as_ref().map(From::from)
187    }
188
189    /// Takes the header out of the response, leaving a [`None`] in its place.
190    #[inline]
191    pub fn take_header(&mut self) -> Option<ResponseHeader> {
192        self.0.header.take().map(ResponseHeader::new)
193    }
194
195    /// TTL is the server chosen lease time-to-live in seconds
196    #[inline]
197    pub const fn ttl(&self) -> i64 {
198        self.0.ttl
199    }
200
201    /// ID is the lease ID for the granted lease.
202    #[inline]
203    pub const fn id(&self) -> i64 {
204        self.0.id
205    }
206
207    /// Error message if return error.
208    #[inline]
209    pub fn error(&self) -> &str {
210        &self.0.error
211    }
212}
213
214/// Options for `Revoke` operation.
215#[derive(Debug, Default, Clone)]
216#[repr(transparent)]
217struct LeaseRevokeOptions(PbLeaseRevokeRequest);
218
219impl LeaseRevokeOptions {
220    /// Set id
221    #[inline]
222    fn with_id(mut self, id: i64) -> Self {
223        self.0.id = id;
224        self
225    }
226
227    /// Creates a `LeaseRevokeOptions`.
228    #[inline]
229    pub const fn new() -> Self {
230        Self(PbLeaseRevokeRequest { id: 0 })
231    }
232}
233
234impl From<LeaseRevokeOptions> for PbLeaseRevokeRequest {
235    #[inline]
236    fn from(options: LeaseRevokeOptions) -> Self {
237        options.0
238    }
239}
240
241impl IntoRequest<PbLeaseRevokeRequest> for LeaseRevokeOptions {
242    #[inline]
243    fn into_request(self) -> Request<PbLeaseRevokeRequest> {
244        Request::new(self.into())
245    }
246}
247
248/// Response for `Revoke` operation.
249#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
250#[derive(Debug, Clone)]
251#[repr(transparent)]
252pub struct LeaseRevokeResponse(PbLeaseRevokeResponse);
253
254impl LeaseRevokeResponse {
255    /// Creates a new `LeaseRevokeResponse` from pb lease revoke response.
256    #[inline]
257    const fn new(resp: PbLeaseRevokeResponse) -> Self {
258        Self(resp)
259    }
260
261    /// Get response header.
262    #[inline]
263    pub fn header(&self) -> Option<&ResponseHeader> {
264        self.0.header.as_ref().map(From::from)
265    }
266
267    /// Takes the header out of the response, leaving a [`None`] in its place.
268    #[inline]
269    pub fn take_header(&mut self) -> Option<ResponseHeader> {
270        self.0.header.take().map(ResponseHeader::new)
271    }
272}
273
274/// Options for `KeepAlive` operation.
275#[derive(Debug, Default, Clone)]
276#[repr(transparent)]
277struct LeaseKeepAliveOptions(PbLeaseKeepAliveRequest);
278
279impl LeaseKeepAliveOptions {
280    /// Set id
281    #[inline]
282    fn with_id(mut self, id: i64) -> Self {
283        self.0.id = id;
284        self
285    }
286
287    /// Creates a `LeaseKeepAliveOptions`.
288    #[inline]
289    pub const fn new() -> Self {
290        Self(PbLeaseKeepAliveRequest { id: 0 })
291    }
292}
293
294impl From<LeaseKeepAliveOptions> for PbLeaseKeepAliveRequest {
295    #[inline]
296    fn from(options: LeaseKeepAliveOptions) -> Self {
297        options.0
298    }
299}
300
301impl IntoRequest<PbLeaseKeepAliveRequest> for LeaseKeepAliveOptions {
302    #[inline]
303    fn into_request(self) -> Request<PbLeaseKeepAliveRequest> {
304        Request::new(self.into())
305    }
306}
307
308/// Response for `KeepAlive` operation.
309#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
310#[derive(Debug, Clone)]
311#[repr(transparent)]
312pub struct LeaseKeepAliveResponse(PbLeaseKeepAliveResponse);
313
314impl LeaseKeepAliveResponse {
315    /// Creates a new `LeaseKeepAliveResponse` from pb lease KeepAlive response.
316    #[inline]
317    const fn new(resp: PbLeaseKeepAliveResponse) -> Self {
318        Self(resp)
319    }
320
321    /// Get response header.
322    #[inline]
323    pub fn header(&self) -> Option<&ResponseHeader> {
324        self.0.header.as_ref().map(From::from)
325    }
326
327    /// Takes the header out of the response, leaving a [`None`] in its place.
328    #[inline]
329    pub fn take_header(&mut self) -> Option<ResponseHeader> {
330        self.0.header.take().map(ResponseHeader::new)
331    }
332
333    /// TTL is the new time-to-live for the lease.
334    #[inline]
335    pub const fn ttl(&self) -> i64 {
336        self.0.ttl
337    }
338
339    /// ID is the lease ID for the keep alive request.
340    #[inline]
341    pub const fn id(&self) -> i64 {
342        self.0.id
343    }
344}
345
346/// Options for `TimeToLive` operation.
347#[derive(Debug, Default, Clone)]
348#[repr(transparent)]
349pub struct LeaseTimeToLiveOptions(PbLeaseTimeToLiveRequest);
350
351impl LeaseTimeToLiveOptions {
352    /// ID is the lease ID for the lease.
353    #[inline]
354    const fn with_id(mut self, id: i64) -> Self {
355        self.0.id = id;
356        self
357    }
358
359    /// Keys is true to query all the keys attached to this lease.
360    #[inline]
361    pub const fn with_keys(mut self) -> Self {
362        self.0.keys = true;
363        self
364    }
365
366    /// Creates a `LeaseTimeToLiveOptions`.
367    #[inline]
368    pub const fn new() -> Self {
369        Self(PbLeaseTimeToLiveRequest { id: 0, keys: false })
370    }
371}
372
373impl From<LeaseTimeToLiveOptions> for PbLeaseTimeToLiveRequest {
374    #[inline]
375    fn from(options: LeaseTimeToLiveOptions) -> Self {
376        options.0
377    }
378}
379
380impl IntoRequest<PbLeaseTimeToLiveRequest> for LeaseTimeToLiveOptions {
381    #[inline]
382    fn into_request(self) -> Request<PbLeaseTimeToLiveRequest> {
383        Request::new(self.into())
384    }
385}
386
387/// Response for `TimeToLive` operation.
388#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
389#[derive(Debug, Clone)]
390#[repr(transparent)]
391pub struct LeaseTimeToLiveResponse(PbLeaseTimeToLiveResponse);
392
393impl LeaseTimeToLiveResponse {
394    /// Creates a new `LeaseTimeToLiveResponse` from pb lease TimeToLive response.
395    #[inline]
396    const fn new(resp: PbLeaseTimeToLiveResponse) -> Self {
397        Self(resp)
398    }
399
400    /// Get response header.
401    #[inline]
402    pub fn header(&self) -> Option<&ResponseHeader> {
403        self.0.header.as_ref().map(From::from)
404    }
405
406    /// Takes the header out of the response, leaving a [`None`] in its place.
407    #[inline]
408    pub fn take_header(&mut self) -> Option<ResponseHeader> {
409        self.0.header.take().map(ResponseHeader::new)
410    }
411
412    /// TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds.
413    #[inline]
414    pub const fn ttl(&self) -> i64 {
415        self.0.ttl
416    }
417
418    /// ID is the lease ID from the keep alive request.
419    #[inline]
420    pub const fn id(&self) -> i64 {
421        self.0.id
422    }
423
424    /// GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
425    #[inline]
426    pub const fn granted_ttl(&self) -> i64 {
427        self.0.granted_ttl
428    }
429
430    /// Keys is the list of keys attached to this lease.
431    #[inline]
432    pub fn keys(&self) -> &[Vec<u8>] {
433        &self.0.keys
434    }
435
436    #[inline]
437    pub(crate) fn strip_keys_prefix(&mut self, prefix: &[u8]) {
438        self.0.keys.iter_mut().for_each(|key| {
439            key.strip_key_prefix(prefix);
440        });
441    }
442}
443
444/// Response for `Leases` operation.
445#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
446#[derive(Debug, Clone)]
447#[repr(transparent)]
448pub struct LeaseLeasesResponse(PbLeaseLeasesResponse);
449
450impl LeaseLeasesResponse {
451    /// Creates a new `LeaseLeasesResponse` from pb lease Leases response.
452    #[inline]
453    const fn new(resp: PbLeaseLeasesResponse) -> Self {
454        Self(resp)
455    }
456
457    /// Get response header.
458    #[inline]
459    pub fn header(&self) -> Option<&ResponseHeader> {
460        self.0.header.as_ref().map(From::from)
461    }
462
463    /// Takes the header out of the response, leaving a [`None`] in its place.
464    #[inline]
465    pub fn take_header(&mut self) -> Option<ResponseHeader> {
466        self.0.header.take().map(ResponseHeader::new)
467    }
468
469    /// Get leases status
470    #[inline]
471    pub fn leases(&self) -> &[LeaseStatus] {
472        unsafe { &*(self.0.leases.as_slice() as *const _ as *const [LeaseStatus]) }
473    }
474}
475
476/// Lease status.
477#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
478#[derive(Debug, Clone, PartialEq)]
479#[repr(transparent)]
480pub struct LeaseStatus(PbLeaseStatus);
481
482impl LeaseStatus {
483    /// Lease id.
484    #[inline]
485    pub const fn id(&self) -> i64 {
486        self.0.id
487    }
488}
489
490impl From<&PbLeaseStatus> for &LeaseStatus {
491    #[inline]
492    fn from(src: &PbLeaseStatus) -> Self {
493        unsafe { &*(src as *const _ as *const LeaseStatus) }
494    }
495}
496
497/// The lease keep alive handle.
498#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
499#[derive(Debug)]
500pub struct LeaseKeeper {
501    id: i64,
502    sender: Sender<PbLeaseKeepAliveRequest>,
503}
504
505impl LeaseKeeper {
506    /// Creates a new `LeaseKeeper`.
507    #[inline]
508    const fn new(id: i64, sender: Sender<PbLeaseKeepAliveRequest>) -> Self {
509        Self { id, sender }
510    }
511
512    /// The lease id which user want to keep alive.
513    #[inline]
514    pub const fn id(&self) -> i64 {
515        self.id
516    }
517
518    /// Sends a keep alive request and receive response
519    #[inline]
520    pub async fn keep_alive(&mut self) -> Result<()> {
521        self.sender
522            .send(LeaseKeepAliveOptions::new().with_id(self.id).into())
523            .await
524            .map_err(|e| Error::LeaseKeepAliveError(e.to_string()))
525    }
526}
527
528/// The lease keep alive response stream.
529#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
530#[derive(Debug)]
531pub struct LeaseKeepAliveStream {
532    stream: Streaming<PbLeaseKeepAliveResponse>,
533}
534
535impl LeaseKeepAliveStream {
536    /// Creates a new `LeaseKeepAliveStream`.
537    #[inline]
538    const fn new(stream: Streaming<PbLeaseKeepAliveResponse>) -> Self {
539        Self { stream }
540    }
541
542    /// Fetches the next message from this stream.
543    #[inline]
544    pub async fn message(&mut self) -> Result<Option<LeaseKeepAliveResponse>> {
545        match self.stream.message().await? {
546            Some(resp) => Ok(Some(LeaseKeepAliveResponse::new(resp))),
547            None => Ok(None),
548        }
549    }
550}
551
552impl Stream for LeaseKeepAliveStream {
553    type Item = Result<LeaseKeepAliveResponse>;
554
555    #[inline]
556    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
557        Pin::new(&mut self.get_mut().stream)
558            .poll_next(cx)
559            .map(|t| match t {
560                Some(Ok(resp)) => Some(Ok(LeaseKeepAliveResponse::new(resp))),
561                Some(Err(e)) => Some(Err(From::from(e))),
562                None => None,
563            })
564    }
565}