1use crate::auth::AuthService;
4use crate::channel::Channel;
5use crate::error::Result;
6use crate::rpc::pb::etcdserverpb::lease_client::LeaseClient as PbLeaseClient;
7use crate::rpc::pb::etcdserverpb::{
8 LeaseGrantRequest as PbLeaseGrantRequest, LeaseGrantResponse as PbLeaseGrantResponse,
9 LeaseKeepAliveRequest as PbLeaseKeepAliveRequest,
10 LeaseKeepAliveResponse as PbLeaseKeepAliveResponse, LeaseLeasesRequest as PbLeaseLeasesRequest,
11 LeaseLeasesResponse as PbLeaseLeasesResponse, LeaseRevokeRequest as PbLeaseRevokeRequest,
12 LeaseRevokeResponse as PbLeaseRevokeResponse, LeaseStatus as PbLeaseStatus,
13 LeaseTimeToLiveRequest as PbLeaseTimeToLiveRequest,
14 LeaseTimeToLiveResponse as PbLeaseTimeToLiveResponse,
15};
16use crate::rpc::ResponseHeader;
17use crate::vec::VecExt;
18use crate::Error;
19use http::HeaderValue;
20use std::pin::Pin;
21use std::sync::{Arc, RwLock};
22use std::task::{Context, Poll};
23use tokio::sync::mpsc::{channel, Sender};
24use tokio_stream::wrappers::ReceiverStream;
25use tokio_stream::Stream;
26use tonic::{IntoRequest, Request, Streaming};
27
28#[repr(transparent)]
30#[derive(Clone)]
31pub struct LeaseClient {
32 inner: PbLeaseClient<AuthService<Channel>>,
33}
34
35impl LeaseClient {
36 #[inline]
38 pub(crate) fn new(channel: Channel, auth_token: Arc<RwLock<Option<HeaderValue>>>) -> Self {
39 let inner = PbLeaseClient::new(AuthService::new(channel, auth_token));
40 Self { inner }
41 }
42
43 #[inline]
47 pub async fn grant(
48 &mut self,
49 ttl: i64,
50 options: Option<LeaseGrantOptions>,
51 ) -> Result<LeaseGrantResponse> {
52 let resp = self
53 .inner
54 .lease_grant(options.unwrap_or_default().with_ttl(ttl))
55 .await?
56 .into_inner();
57 Ok(LeaseGrantResponse::new(resp))
58 }
59
60 #[inline]
62 pub async fn revoke(&mut self, id: i64) -> Result<LeaseRevokeResponse> {
63 let resp = self
64 .inner
65 .lease_revoke(LeaseRevokeOptions::new().with_id(id))
66 .await?
67 .into_inner();
68 Ok(LeaseRevokeResponse::new(resp))
69 }
70
71 #[inline]
74 pub async fn keep_alive(&mut self, id: i64) -> Result<(LeaseKeeper, LeaseKeepAliveStream)> {
75 let (sender, receiver) = channel::<PbLeaseKeepAliveRequest>(100);
76 sender
77 .send(LeaseKeepAliveOptions::new().with_id(id).into())
78 .await
79 .map_err(|e| Error::LeaseKeepAliveError(e.to_string()))?;
80
81 let receiver = ReceiverStream::new(receiver);
82
83 let mut stream = self.inner.lease_keep_alive(receiver).await?.into_inner();
84
85 let id = match stream.message().await? {
86 Some(resp) => {
87 if resp.ttl <= 0 {
88 return Err(Error::LeaseKeepAliveError("lease not found".to_string()));
89 }
90 resp.id
91 }
92 None => {
93 return Err(Error::WatchError(
94 "failed to create lease keeper".to_string(),
95 ));
96 }
97 };
98
99 Ok((
100 LeaseKeeper::new(id, sender),
101 LeaseKeepAliveStream::new(stream),
102 ))
103 }
104
105 #[inline]
107 pub async fn time_to_live(
108 &mut self,
109 id: i64,
110 options: Option<LeaseTimeToLiveOptions>,
111 ) -> Result<LeaseTimeToLiveResponse> {
112 let resp = self
113 .inner
114 .lease_time_to_live(options.unwrap_or_default().with_id(id))
115 .await?
116 .into_inner();
117 Ok(LeaseTimeToLiveResponse::new(resp))
118 }
119
120 #[inline]
122 pub async fn leases(&mut self) -> Result<LeaseLeasesResponse> {
123 let resp = self
124 .inner
125 .lease_leases(PbLeaseLeasesRequest {})
126 .await?
127 .into_inner();
128 Ok(LeaseLeasesResponse::new(resp))
129 }
130}
131
132#[derive(Debug, Default, Clone)]
134#[repr(transparent)]
135pub struct LeaseGrantOptions(PbLeaseGrantRequest);
136
137impl LeaseGrantOptions {
138 #[inline]
140 const fn with_ttl(mut self, ttl: i64) -> Self {
141 self.0.ttl = ttl;
142 self
143 }
144
145 #[inline]
147 pub const fn with_id(mut self, id: i64) -> Self {
148 self.0.id = id;
149 self
150 }
151
152 #[inline]
154 pub const fn new() -> Self {
155 Self(PbLeaseGrantRequest { ttl: 0, id: 0 })
156 }
157}
158
159impl From<LeaseGrantOptions> for PbLeaseGrantRequest {
160 #[inline]
161 fn from(options: LeaseGrantOptions) -> Self {
162 options.0
163 }
164}
165
166impl IntoRequest<PbLeaseGrantRequest> for LeaseGrantOptions {
167 #[inline]
168 fn into_request(self) -> Request<PbLeaseGrantRequest> {
169 Request::new(self.into())
170 }
171}
172
173#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
175#[derive(Debug, Clone)]
176#[repr(transparent)]
177pub struct LeaseGrantResponse(PbLeaseGrantResponse);
178
179impl LeaseGrantResponse {
180 #[inline]
182 const fn new(resp: PbLeaseGrantResponse) -> Self {
183 Self(resp)
184 }
185
186 #[inline]
188 pub fn header(&self) -> Option<&ResponseHeader> {
189 self.0.header.as_ref().map(From::from)
190 }
191
192 #[inline]
194 pub fn take_header(&mut self) -> Option<ResponseHeader> {
195 self.0.header.take().map(ResponseHeader::new)
196 }
197
198 #[inline]
200 pub const fn ttl(&self) -> i64 {
201 self.0.ttl
202 }
203
204 #[inline]
206 pub const fn id(&self) -> i64 {
207 self.0.id
208 }
209
210 #[inline]
212 pub fn error(&self) -> &str {
213 &self.0.error
214 }
215}
216
217#[derive(Debug, Default, Clone)]
219#[repr(transparent)]
220struct LeaseRevokeOptions(PbLeaseRevokeRequest);
221
222impl LeaseRevokeOptions {
223 #[inline]
225 fn with_id(mut self, id: i64) -> Self {
226 self.0.id = id;
227 self
228 }
229
230 #[inline]
232 pub const fn new() -> Self {
233 Self(PbLeaseRevokeRequest { id: 0 })
234 }
235}
236
237impl From<LeaseRevokeOptions> for PbLeaseRevokeRequest {
238 #[inline]
239 fn from(options: LeaseRevokeOptions) -> Self {
240 options.0
241 }
242}
243
244impl IntoRequest<PbLeaseRevokeRequest> for LeaseRevokeOptions {
245 #[inline]
246 fn into_request(self) -> Request<PbLeaseRevokeRequest> {
247 Request::new(self.into())
248 }
249}
250
251#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
253#[derive(Debug, Clone)]
254#[repr(transparent)]
255pub struct LeaseRevokeResponse(PbLeaseRevokeResponse);
256
257impl LeaseRevokeResponse {
258 #[inline]
260 const fn new(resp: PbLeaseRevokeResponse) -> Self {
261 Self(resp)
262 }
263
264 #[inline]
266 pub fn header(&self) -> Option<&ResponseHeader> {
267 self.0.header.as_ref().map(From::from)
268 }
269
270 #[inline]
272 pub fn take_header(&mut self) -> Option<ResponseHeader> {
273 self.0.header.take().map(ResponseHeader::new)
274 }
275}
276
277#[derive(Debug, Default, Clone)]
279#[repr(transparent)]
280struct LeaseKeepAliveOptions(PbLeaseKeepAliveRequest);
281
282impl LeaseKeepAliveOptions {
283 #[inline]
285 fn with_id(mut self, id: i64) -> Self {
286 self.0.id = id;
287 self
288 }
289
290 #[inline]
292 pub const fn new() -> Self {
293 Self(PbLeaseKeepAliveRequest { id: 0 })
294 }
295}
296
297impl From<LeaseKeepAliveOptions> for PbLeaseKeepAliveRequest {
298 #[inline]
299 fn from(options: LeaseKeepAliveOptions) -> Self {
300 options.0
301 }
302}
303
304impl IntoRequest<PbLeaseKeepAliveRequest> for LeaseKeepAliveOptions {
305 #[inline]
306 fn into_request(self) -> Request<PbLeaseKeepAliveRequest> {
307 Request::new(self.into())
308 }
309}
310
311#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
313#[derive(Debug, Clone)]
314#[repr(transparent)]
315pub struct LeaseKeepAliveResponse(PbLeaseKeepAliveResponse);
316
317impl LeaseKeepAliveResponse {
318 #[inline]
320 const fn new(resp: PbLeaseKeepAliveResponse) -> Self {
321 Self(resp)
322 }
323
324 #[inline]
326 pub fn header(&self) -> Option<&ResponseHeader> {
327 self.0.header.as_ref().map(From::from)
328 }
329
330 #[inline]
332 pub fn take_header(&mut self) -> Option<ResponseHeader> {
333 self.0.header.take().map(ResponseHeader::new)
334 }
335
336 #[inline]
338 pub const fn ttl(&self) -> i64 {
339 self.0.ttl
340 }
341
342 #[inline]
344 pub const fn id(&self) -> i64 {
345 self.0.id
346 }
347}
348
349#[derive(Debug, Default, Clone)]
351#[repr(transparent)]
352pub struct LeaseTimeToLiveOptions(PbLeaseTimeToLiveRequest);
353
354impl LeaseTimeToLiveOptions {
355 #[inline]
357 const fn with_id(mut self, id: i64) -> Self {
358 self.0.id = id;
359 self
360 }
361
362 #[inline]
364 pub const fn with_keys(mut self) -> Self {
365 self.0.keys = true;
366 self
367 }
368
369 #[inline]
371 pub const fn new() -> Self {
372 Self(PbLeaseTimeToLiveRequest { id: 0, keys: false })
373 }
374}
375
376impl From<LeaseTimeToLiveOptions> for PbLeaseTimeToLiveRequest {
377 #[inline]
378 fn from(options: LeaseTimeToLiveOptions) -> Self {
379 options.0
380 }
381}
382
383impl IntoRequest<PbLeaseTimeToLiveRequest> for LeaseTimeToLiveOptions {
384 #[inline]
385 fn into_request(self) -> Request<PbLeaseTimeToLiveRequest> {
386 Request::new(self.into())
387 }
388}
389
390#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
392#[derive(Debug, Clone)]
393#[repr(transparent)]
394pub struct LeaseTimeToLiveResponse(PbLeaseTimeToLiveResponse);
395
396impl LeaseTimeToLiveResponse {
397 #[inline]
399 const fn new(resp: PbLeaseTimeToLiveResponse) -> Self {
400 Self(resp)
401 }
402
403 #[inline]
405 pub fn header(&self) -> Option<&ResponseHeader> {
406 self.0.header.as_ref().map(From::from)
407 }
408
409 #[inline]
411 pub fn take_header(&mut self) -> Option<ResponseHeader> {
412 self.0.header.take().map(ResponseHeader::new)
413 }
414
415 #[inline]
417 pub const fn ttl(&self) -> i64 {
418 self.0.ttl
419 }
420
421 #[inline]
423 pub const fn id(&self) -> i64 {
424 self.0.id
425 }
426
427 #[inline]
429 pub const fn granted_ttl(&self) -> i64 {
430 self.0.granted_ttl
431 }
432
433 #[inline]
435 pub fn keys(&self) -> &[Vec<u8>] {
436 &self.0.keys
437 }
438
439 #[inline]
440 pub(crate) fn strip_keys_prefix(&mut self, prefix: &[u8]) {
441 self.0.keys.iter_mut().for_each(|key| {
442 key.strip_key_prefix(prefix);
443 });
444 }
445}
446
447#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
449#[derive(Debug, Clone)]
450#[repr(transparent)]
451pub struct LeaseLeasesResponse(PbLeaseLeasesResponse);
452
453impl LeaseLeasesResponse {
454 #[inline]
456 const fn new(resp: PbLeaseLeasesResponse) -> Self {
457 Self(resp)
458 }
459
460 #[inline]
462 pub fn header(&self) -> Option<&ResponseHeader> {
463 self.0.header.as_ref().map(From::from)
464 }
465
466 #[inline]
468 pub fn take_header(&mut self) -> Option<ResponseHeader> {
469 self.0.header.take().map(ResponseHeader::new)
470 }
471
472 #[inline]
474 pub fn leases(&self) -> &[LeaseStatus] {
475 unsafe { &*(self.0.leases.as_slice() as *const _ as *const [LeaseStatus]) }
476 }
477}
478
479#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
481#[derive(Debug, Clone, PartialEq)]
482#[repr(transparent)]
483pub struct LeaseStatus(PbLeaseStatus);
484
485impl LeaseStatus {
486 #[inline]
488 pub const fn id(&self) -> i64 {
489 self.0.id
490 }
491}
492
493impl From<&PbLeaseStatus> for &LeaseStatus {
494 #[inline]
495 fn from(src: &PbLeaseStatus) -> Self {
496 unsafe { &*(src as *const _ as *const LeaseStatus) }
497 }
498}
499
500#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
502#[derive(Debug)]
503pub struct LeaseKeeper {
504 id: i64,
505 sender: Sender<PbLeaseKeepAliveRequest>,
506}
507
508impl LeaseKeeper {
509 #[inline]
511 const fn new(id: i64, sender: Sender<PbLeaseKeepAliveRequest>) -> Self {
512 Self { id, sender }
513 }
514
515 #[inline]
517 pub const fn id(&self) -> i64 {
518 self.id
519 }
520
521 #[inline]
523 pub async fn keep_alive(&mut self) -> Result<()> {
524 self.sender
525 .send(LeaseKeepAliveOptions::new().with_id(self.id).into())
526 .await
527 .map_err(|e| Error::LeaseKeepAliveError(e.to_string()))
528 }
529}
530
531#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
533#[derive(Debug)]
534pub struct LeaseKeepAliveStream {
535 stream: Streaming<PbLeaseKeepAliveResponse>,
536}
537
538impl LeaseKeepAliveStream {
539 #[inline]
541 const fn new(stream: Streaming<PbLeaseKeepAliveResponse>) -> Self {
542 Self { stream }
543 }
544
545 #[inline]
547 pub async fn message(&mut self) -> Result<Option<LeaseKeepAliveResponse>> {
548 match self.stream.message().await? {
549 Some(resp) => Ok(Some(LeaseKeepAliveResponse::new(resp))),
550 None => Ok(None),
551 }
552 }
553}
554
555impl Stream for LeaseKeepAliveStream {
556 type Item = Result<LeaseKeepAliveResponse>;
557
558 #[inline]
559 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
560 Pin::new(&mut self.get_mut().stream)
561 .poll_next(cx)
562 .map(|t| match t {
563 Some(Ok(resp)) => Some(Ok(LeaseKeepAliveResponse::new(resp))),
564 Some(Err(e)) => Some(Err(From::from(e))),
565 None => None,
566 })
567 }
568}