1use crate::error::Result;
4use crate::intercept::InterceptedChannel;
5use crate::rpc::pb::etcdserverpb::lease_client::LeaseClient as PbLeaseClient;
6use crate::rpc::pb::etcdserverpb::{
7 LeaseGrantRequest as PbLeaseGrantRequest, LeaseGrantResponse as PbLeaseGrantResponse,
8 LeaseKeepAliveRequest as PbLeaseKeepAliveRequest,
9 LeaseKeepAliveResponse as PbLeaseKeepAliveResponse, LeaseLeasesRequest as PbLeaseLeasesRequest,
10 LeaseLeasesResponse as PbLeaseLeasesResponse, LeaseRevokeRequest as PbLeaseRevokeRequest,
11 LeaseRevokeResponse as PbLeaseRevokeResponse, LeaseStatus as PbLeaseStatus,
12 LeaseTimeToLiveRequest as PbLeaseTimeToLiveRequest,
13 LeaseTimeToLiveResponse as PbLeaseTimeToLiveResponse,
14};
15use crate::rpc::ResponseHeader;
16use crate::vec::VecExt;
17use crate::Error;
18use std::pin::Pin;
19use std::task::{Context, Poll};
20use tokio::sync::mpsc::{channel, Sender};
21use tokio_stream::wrappers::ReceiverStream;
22use tokio_stream::Stream;
23use tonic::{IntoRequest, Request, Streaming};
24
25#[repr(transparent)]
27#[derive(Clone)]
28pub struct LeaseClient {
29 inner: PbLeaseClient<InterceptedChannel>,
30}
31
32impl LeaseClient {
33 #[inline]
35 pub(crate) fn new(channel: InterceptedChannel) -> Self {
36 let inner = PbLeaseClient::new(channel);
37 Self { inner }
38 }
39
40 #[inline]
44 pub async fn grant(
45 &mut self,
46 ttl: i64,
47 options: Option<LeaseGrantOptions>,
48 ) -> Result<LeaseGrantResponse> {
49 let resp = self
50 .inner
51 .lease_grant(options.unwrap_or_default().with_ttl(ttl))
52 .await?
53 .into_inner();
54 Ok(LeaseGrantResponse::new(resp))
55 }
56
57 #[inline]
59 pub async fn revoke(&mut self, id: i64) -> Result<LeaseRevokeResponse> {
60 let resp = self
61 .inner
62 .lease_revoke(LeaseRevokeOptions::new().with_id(id))
63 .await?
64 .into_inner();
65 Ok(LeaseRevokeResponse::new(resp))
66 }
67
68 #[inline]
71 pub async fn keep_alive(&mut self, id: i64) -> Result<(LeaseKeeper, LeaseKeepAliveStream)> {
72 let (sender, receiver) = channel::<PbLeaseKeepAliveRequest>(100);
73 sender
74 .send(LeaseKeepAliveOptions::new().with_id(id).into())
75 .await
76 .map_err(|e| Error::LeaseKeepAliveError(e.to_string()))?;
77
78 let receiver = ReceiverStream::new(receiver);
79
80 let mut stream = self.inner.lease_keep_alive(receiver).await?.into_inner();
81
82 let id = match stream.message().await? {
83 Some(resp) => {
84 if resp.ttl <= 0 {
85 return Err(Error::LeaseKeepAliveError("lease not found".to_string()));
86 }
87 resp.id
88 }
89 None => {
90 return Err(Error::WatchError(
91 "failed to create lease keeper".to_string(),
92 ));
93 }
94 };
95
96 Ok((
97 LeaseKeeper::new(id, sender),
98 LeaseKeepAliveStream::new(stream),
99 ))
100 }
101
102 #[inline]
104 pub async fn time_to_live(
105 &mut self,
106 id: i64,
107 options: Option<LeaseTimeToLiveOptions>,
108 ) -> Result<LeaseTimeToLiveResponse> {
109 let resp = self
110 .inner
111 .lease_time_to_live(options.unwrap_or_default().with_id(id))
112 .await?
113 .into_inner();
114 Ok(LeaseTimeToLiveResponse::new(resp))
115 }
116
117 #[inline]
119 pub async fn leases(&mut self) -> Result<LeaseLeasesResponse> {
120 let resp = self
121 .inner
122 .lease_leases(PbLeaseLeasesRequest {})
123 .await?
124 .into_inner();
125 Ok(LeaseLeasesResponse::new(resp))
126 }
127}
128
129#[derive(Debug, Default, Clone)]
131#[repr(transparent)]
132pub struct LeaseGrantOptions(PbLeaseGrantRequest);
133
134impl LeaseGrantOptions {
135 #[inline]
137 const fn with_ttl(mut self, ttl: i64) -> Self {
138 self.0.ttl = ttl;
139 self
140 }
141
142 #[inline]
144 pub const fn with_id(mut self, id: i64) -> Self {
145 self.0.id = id;
146 self
147 }
148
149 #[inline]
151 pub const fn new() -> Self {
152 Self(PbLeaseGrantRequest { ttl: 0, id: 0 })
153 }
154}
155
156impl From<LeaseGrantOptions> for PbLeaseGrantRequest {
157 #[inline]
158 fn from(options: LeaseGrantOptions) -> Self {
159 options.0
160 }
161}
162
163impl IntoRequest<PbLeaseGrantRequest> for LeaseGrantOptions {
164 #[inline]
165 fn into_request(self) -> Request<PbLeaseGrantRequest> {
166 Request::new(self.into())
167 }
168}
169
170#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
172#[derive(Debug, Clone)]
173#[repr(transparent)]
174pub struct LeaseGrantResponse(PbLeaseGrantResponse);
175
176impl LeaseGrantResponse {
177 #[inline]
179 const fn new(resp: PbLeaseGrantResponse) -> Self {
180 Self(resp)
181 }
182
183 #[inline]
185 pub fn header(&self) -> Option<&ResponseHeader> {
186 self.0.header.as_ref().map(From::from)
187 }
188
189 #[inline]
191 pub fn take_header(&mut self) -> Option<ResponseHeader> {
192 self.0.header.take().map(ResponseHeader::new)
193 }
194
195 #[inline]
197 pub const fn ttl(&self) -> i64 {
198 self.0.ttl
199 }
200
201 #[inline]
203 pub const fn id(&self) -> i64 {
204 self.0.id
205 }
206
207 #[inline]
209 pub fn error(&self) -> &str {
210 &self.0.error
211 }
212}
213
214#[derive(Debug, Default, Clone)]
216#[repr(transparent)]
217struct LeaseRevokeOptions(PbLeaseRevokeRequest);
218
219impl LeaseRevokeOptions {
220 #[inline]
222 fn with_id(mut self, id: i64) -> Self {
223 self.0.id = id;
224 self
225 }
226
227 #[inline]
229 pub const fn new() -> Self {
230 Self(PbLeaseRevokeRequest { id: 0 })
231 }
232}
233
234impl From<LeaseRevokeOptions> for PbLeaseRevokeRequest {
235 #[inline]
236 fn from(options: LeaseRevokeOptions) -> Self {
237 options.0
238 }
239}
240
241impl IntoRequest<PbLeaseRevokeRequest> for LeaseRevokeOptions {
242 #[inline]
243 fn into_request(self) -> Request<PbLeaseRevokeRequest> {
244 Request::new(self.into())
245 }
246}
247
248#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
250#[derive(Debug, Clone)]
251#[repr(transparent)]
252pub struct LeaseRevokeResponse(PbLeaseRevokeResponse);
253
254impl LeaseRevokeResponse {
255 #[inline]
257 const fn new(resp: PbLeaseRevokeResponse) -> Self {
258 Self(resp)
259 }
260
261 #[inline]
263 pub fn header(&self) -> Option<&ResponseHeader> {
264 self.0.header.as_ref().map(From::from)
265 }
266
267 #[inline]
269 pub fn take_header(&mut self) -> Option<ResponseHeader> {
270 self.0.header.take().map(ResponseHeader::new)
271 }
272}
273
274#[derive(Debug, Default, Clone)]
276#[repr(transparent)]
277struct LeaseKeepAliveOptions(PbLeaseKeepAliveRequest);
278
279impl LeaseKeepAliveOptions {
280 #[inline]
282 fn with_id(mut self, id: i64) -> Self {
283 self.0.id = id;
284 self
285 }
286
287 #[inline]
289 pub const fn new() -> Self {
290 Self(PbLeaseKeepAliveRequest { id: 0 })
291 }
292}
293
294impl From<LeaseKeepAliveOptions> for PbLeaseKeepAliveRequest {
295 #[inline]
296 fn from(options: LeaseKeepAliveOptions) -> Self {
297 options.0
298 }
299}
300
301impl IntoRequest<PbLeaseKeepAliveRequest> for LeaseKeepAliveOptions {
302 #[inline]
303 fn into_request(self) -> Request<PbLeaseKeepAliveRequest> {
304 Request::new(self.into())
305 }
306}
307
308#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
310#[derive(Debug, Clone)]
311#[repr(transparent)]
312pub struct LeaseKeepAliveResponse(PbLeaseKeepAliveResponse);
313
314impl LeaseKeepAliveResponse {
315 #[inline]
317 const fn new(resp: PbLeaseKeepAliveResponse) -> Self {
318 Self(resp)
319 }
320
321 #[inline]
323 pub fn header(&self) -> Option<&ResponseHeader> {
324 self.0.header.as_ref().map(From::from)
325 }
326
327 #[inline]
329 pub fn take_header(&mut self) -> Option<ResponseHeader> {
330 self.0.header.take().map(ResponseHeader::new)
331 }
332
333 #[inline]
335 pub const fn ttl(&self) -> i64 {
336 self.0.ttl
337 }
338
339 #[inline]
341 pub const fn id(&self) -> i64 {
342 self.0.id
343 }
344}
345
346#[derive(Debug, Default, Clone)]
348#[repr(transparent)]
349pub struct LeaseTimeToLiveOptions(PbLeaseTimeToLiveRequest);
350
351impl LeaseTimeToLiveOptions {
352 #[inline]
354 const fn with_id(mut self, id: i64) -> Self {
355 self.0.id = id;
356 self
357 }
358
359 #[inline]
361 pub const fn with_keys(mut self) -> Self {
362 self.0.keys = true;
363 self
364 }
365
366 #[inline]
368 pub const fn new() -> Self {
369 Self(PbLeaseTimeToLiveRequest { id: 0, keys: false })
370 }
371}
372
373impl From<LeaseTimeToLiveOptions> for PbLeaseTimeToLiveRequest {
374 #[inline]
375 fn from(options: LeaseTimeToLiveOptions) -> Self {
376 options.0
377 }
378}
379
380impl IntoRequest<PbLeaseTimeToLiveRequest> for LeaseTimeToLiveOptions {
381 #[inline]
382 fn into_request(self) -> Request<PbLeaseTimeToLiveRequest> {
383 Request::new(self.into())
384 }
385}
386
387#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
389#[derive(Debug, Clone)]
390#[repr(transparent)]
391pub struct LeaseTimeToLiveResponse(PbLeaseTimeToLiveResponse);
392
393impl LeaseTimeToLiveResponse {
394 #[inline]
396 const fn new(resp: PbLeaseTimeToLiveResponse) -> Self {
397 Self(resp)
398 }
399
400 #[inline]
402 pub fn header(&self) -> Option<&ResponseHeader> {
403 self.0.header.as_ref().map(From::from)
404 }
405
406 #[inline]
408 pub fn take_header(&mut self) -> Option<ResponseHeader> {
409 self.0.header.take().map(ResponseHeader::new)
410 }
411
412 #[inline]
414 pub const fn ttl(&self) -> i64 {
415 self.0.ttl
416 }
417
418 #[inline]
420 pub const fn id(&self) -> i64 {
421 self.0.id
422 }
423
424 #[inline]
426 pub const fn granted_ttl(&self) -> i64 {
427 self.0.granted_ttl
428 }
429
430 #[inline]
432 pub fn keys(&self) -> &[Vec<u8>] {
433 &self.0.keys
434 }
435
436 #[inline]
437 pub(crate) fn strip_keys_prefix(&mut self, prefix: &[u8]) {
438 self.0.keys.iter_mut().for_each(|key| {
439 key.strip_key_prefix(prefix);
440 });
441 }
442}
443
444#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
446#[derive(Debug, Clone)]
447#[repr(transparent)]
448pub struct LeaseLeasesResponse(PbLeaseLeasesResponse);
449
450impl LeaseLeasesResponse {
451 #[inline]
453 const fn new(resp: PbLeaseLeasesResponse) -> Self {
454 Self(resp)
455 }
456
457 #[inline]
459 pub fn header(&self) -> Option<&ResponseHeader> {
460 self.0.header.as_ref().map(From::from)
461 }
462
463 #[inline]
465 pub fn take_header(&mut self) -> Option<ResponseHeader> {
466 self.0.header.take().map(ResponseHeader::new)
467 }
468
469 #[inline]
471 pub fn leases(&self) -> &[LeaseStatus] {
472 unsafe { &*(self.0.leases.as_slice() as *const _ as *const [LeaseStatus]) }
473 }
474}
475
476#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
478#[derive(Debug, Clone, PartialEq)]
479#[repr(transparent)]
480pub struct LeaseStatus(PbLeaseStatus);
481
482impl LeaseStatus {
483 #[inline]
485 pub const fn id(&self) -> i64 {
486 self.0.id
487 }
488}
489
490impl From<&PbLeaseStatus> for &LeaseStatus {
491 #[inline]
492 fn from(src: &PbLeaseStatus) -> Self {
493 unsafe { &*(src as *const _ as *const LeaseStatus) }
494 }
495}
496
497#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
499#[derive(Debug)]
500pub struct LeaseKeeper {
501 id: i64,
502 sender: Sender<PbLeaseKeepAliveRequest>,
503}
504
505impl LeaseKeeper {
506 #[inline]
508 const fn new(id: i64, sender: Sender<PbLeaseKeepAliveRequest>) -> Self {
509 Self { id, sender }
510 }
511
512 #[inline]
514 pub const fn id(&self) -> i64 {
515 self.id
516 }
517
518 #[inline]
520 pub async fn keep_alive(&mut self) -> Result<()> {
521 self.sender
522 .send(LeaseKeepAliveOptions::new().with_id(self.id).into())
523 .await
524 .map_err(|e| Error::LeaseKeepAliveError(e.to_string()))
525 }
526}
527
528#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
530#[derive(Debug)]
531pub struct LeaseKeepAliveStream {
532 stream: Streaming<PbLeaseKeepAliveResponse>,
533}
534
535impl LeaseKeepAliveStream {
536 #[inline]
538 const fn new(stream: Streaming<PbLeaseKeepAliveResponse>) -> Self {
539 Self { stream }
540 }
541
542 #[inline]
544 pub async fn message(&mut self) -> Result<Option<LeaseKeepAliveResponse>> {
545 match self.stream.message().await? {
546 Some(resp) => Ok(Some(LeaseKeepAliveResponse::new(resp))),
547 None => Ok(None),
548 }
549 }
550}
551
552impl Stream for LeaseKeepAliveStream {
553 type Item = Result<LeaseKeepAliveResponse>;
554
555 #[inline]
556 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
557 Pin::new(&mut self.get_mut().stream)
558 .poll_next(cx)
559 .map(|t| match t {
560 Some(Ok(resp)) => Some(Ok(LeaseKeepAliveResponse::new(resp))),
561 Some(Err(e)) => Some(Err(From::from(e))),
562 None => None,
563 })
564 }
565}