etcd_client/rpc/
election.rs1use crate::error::Result;
4use crate::intercept::InterceptedChannel;
5use crate::rpc::pb::v3electionpb::election_client::ElectionClient as PbElectionClient;
6use crate::rpc::pb::v3electionpb::{
7 CampaignRequest as PbCampaignRequest, CampaignResponse as PbCampaignResponse,
8 LeaderKey as PbLeaderKey, LeaderRequest as PbLeaderRequest, LeaderResponse as PbLeaderResponse,
9 ProclaimRequest as PbProclaimRequest, ProclaimResponse as PbProclaimResponse,
10 ResignRequest as PbResignRequest, ResignResponse as PbResignResponse,
11};
12use crate::rpc::{KeyValue, ResponseHeader};
13use std::pin::Pin;
14use std::task::{Context, Poll};
15use tokio_stream::Stream;
16use tonic::{IntoRequest, Request, Streaming};
17
18#[repr(transparent)]
20#[derive(Clone)]
21pub struct ElectionClient {
22 inner: PbElectionClient<InterceptedChannel>,
23}
24
25#[derive(Debug, Default, Clone)]
27#[repr(transparent)]
28pub struct CampaignOptions(PbCampaignRequest);
29
30impl CampaignOptions {
31 #[inline]
32 pub const fn new() -> Self {
33 Self(PbCampaignRequest {
34 name: Vec::new(),
35 lease: 0,
36 value: Vec::new(),
37 })
38 }
39
40 #[inline]
42 fn with_name(mut self, name: impl Into<Vec<u8>>) -> Self {
43 self.0.name = name.into();
44 self
45 }
46
47 #[inline]
49 const fn with_lease(mut self, lease: i64) -> Self {
50 self.0.lease = lease;
51 self
52 }
53
54 #[inline]
56 fn with_value(mut self, value: impl Into<Vec<u8>>) -> Self {
57 self.0.value = value.into();
58 self
59 }
60}
61
62impl From<CampaignOptions> for PbCampaignRequest {
63 #[inline]
64 fn from(options: CampaignOptions) -> Self {
65 options.0
66 }
67}
68
69impl IntoRequest<PbCampaignRequest> for CampaignOptions {
70 #[inline]
71 fn into_request(self) -> Request<PbCampaignRequest> {
72 Request::new(self.into())
73 }
74}
75
76#[derive(Debug, Default, Clone)]
78#[repr(transparent)]
79pub struct ProclaimOptions(PbProclaimRequest);
80
81impl ProclaimOptions {
82 #[inline]
83 pub const fn new() -> Self {
84 Self(PbProclaimRequest {
85 leader: None,
86 value: Vec::new(),
87 })
88 }
89
90 #[inline]
92 fn with_value(mut self, value: impl Into<Vec<u8>>) -> Self {
93 self.0.value = value.into();
94 self
95 }
96
97 #[inline]
99 pub fn with_leader(mut self, leader: LeaderKey) -> Self {
100 self.0.leader = Some(leader.into());
101 self
102 }
103}
104
105impl From<ProclaimOptions> for PbProclaimRequest {
106 #[inline]
107 fn from(options: ProclaimOptions) -> Self {
108 options.0
109 }
110}
111
112impl IntoRequest<PbProclaimRequest> for ProclaimOptions {
113 #[inline]
114 fn into_request(self) -> Request<PbProclaimRequest> {
115 Request::new(self.into())
116 }
117}
118
119#[derive(Debug, Default, Clone)]
121#[repr(transparent)]
122pub struct LeaderOptions(PbLeaderRequest);
123
124impl LeaderOptions {
125 #[inline]
126 pub const fn new() -> Self {
127 Self(PbLeaderRequest { name: Vec::new() })
128 }
129
130 #[inline]
132 pub fn with_name(mut self, name: impl Into<Vec<u8>>) -> Self {
133 self.0.name = name.into();
134 self
135 }
136}
137
138impl From<LeaderOptions> for PbLeaderRequest {
139 #[inline]
140 fn from(options: LeaderOptions) -> Self {
141 options.0
142 }
143}
144
145impl IntoRequest<PbLeaderRequest> for LeaderOptions {
146 #[inline]
147 fn into_request(self) -> Request<PbLeaderRequest> {
148 Request::new(self.into())
149 }
150}
151
152#[derive(Debug, Default, Clone)]
154#[repr(transparent)]
155pub struct ResignOptions(PbResignRequest);
156
157impl ResignOptions {
158 #[inline]
159 pub const fn new() -> Self {
160 Self(PbResignRequest { leader: None })
161 }
162
163 #[inline]
165 pub fn with_leader(mut self, leader: LeaderKey) -> Self {
166 self.0.leader = Some(leader.into());
167 self
168 }
169}
170
171impl From<ResignOptions> for PbResignRequest {
172 #[inline]
173 fn from(options: ResignOptions) -> Self {
174 options.0
175 }
176}
177
178impl IntoRequest<PbResignRequest> for ResignOptions {
179 #[inline]
180 fn into_request(self) -> Request<PbResignRequest> {
181 Request::new(self.into())
182 }
183}
184
185#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
187#[derive(Debug, Clone)]
188#[repr(transparent)]
189pub struct CampaignResponse(PbCampaignResponse);
190
191impl CampaignResponse {
192 #[inline]
193 const fn new(resp: PbCampaignResponse) -> Self {
194 Self(resp)
195 }
196
197 #[inline]
199 pub fn header(&self) -> Option<&ResponseHeader> {
200 self.0.header.as_ref().map(From::from)
201 }
202
203 #[inline]
205 pub fn take_header(&mut self) -> Option<ResponseHeader> {
206 self.0.header.take().map(ResponseHeader::new)
207 }
208
209 #[inline]
211 pub fn leader(&self) -> Option<&LeaderKey> {
212 self.0.leader.as_ref().map(From::from)
213 }
214
215 #[inline]
217 pub fn take_leader(&mut self) -> Option<LeaderKey> {
218 self.0.leader.take().map(From::from)
219 }
220}
221
222#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
224#[derive(Debug, Clone)]
225#[repr(transparent)]
226pub struct ProclaimResponse(PbProclaimResponse);
227
228impl ProclaimResponse {
229 #[inline]
230 const fn new(resp: PbProclaimResponse) -> Self {
231 Self(resp)
232 }
233
234 #[inline]
236 pub fn header(&self) -> Option<&ResponseHeader> {
237 self.0.header.as_ref().map(From::from)
238 }
239
240 #[inline]
242 pub fn take_header(&mut self) -> Option<ResponseHeader> {
243 self.0.header.take().map(ResponseHeader::new)
244 }
245}
246
247#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
249#[derive(Debug, Clone)]
250#[repr(transparent)]
251pub struct LeaderResponse(PbLeaderResponse);
252
253impl LeaderResponse {
254 #[inline]
255 const fn new(resp: PbLeaderResponse) -> Self {
256 Self(resp)
257 }
258
259 #[inline]
261 pub fn header(&self) -> Option<&ResponseHeader> {
262 self.0.header.as_ref().map(From::from)
263 }
264
265 #[inline]
267 pub fn take_header(&mut self) -> Option<ResponseHeader> {
268 self.0.header.take().map(ResponseHeader::new)
269 }
270
271 #[inline]
273 pub fn kv(&self) -> Option<&KeyValue> {
274 self.0.kv.as_ref().map(From::from)
275 }
276
277 #[inline]
279 pub fn take_kv(&mut self) -> Option<KeyValue> {
280 self.0.kv.take().map(KeyValue::new)
281 }
282}
283
284#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
286#[derive(Debug)]
287pub struct ObserveStream {
288 stream: Streaming<PbLeaderResponse>,
289}
290
291impl ObserveStream {
292 #[inline]
293 const fn new(stream: Streaming<PbLeaderResponse>) -> Self {
294 Self { stream }
295 }
296
297 #[inline]
299 pub async fn message(&mut self) -> Result<Option<LeaderResponse>> {
300 match self.stream.message().await? {
301 Some(resp) => Ok(Some(LeaderResponse::new(resp))),
302 None => Ok(None),
303 }
304 }
305}
306
307impl Stream for ObserveStream {
308 type Item = Result<LeaderResponse>;
309
310 #[inline]
311 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
312 Pin::new(&mut self.get_mut().stream)
313 .poll_next(cx)
314 .map(|t| match t {
315 Some(Ok(resp)) => Some(Ok(LeaderResponse::new(resp))),
316 Some(Err(e)) => Some(Err(From::from(e))),
317 None => None,
318 })
319 }
320}
321
322#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
324#[derive(Debug, Clone)]
325#[repr(transparent)]
326pub struct ResignResponse(PbResignResponse);
327
328impl ResignResponse {
329 #[inline]
330 const fn new(resp: PbResignResponse) -> Self {
331 Self(resp)
332 }
333
334 #[inline]
336 pub fn header(&self) -> Option<&ResponseHeader> {
337 self.0.header.as_ref().map(From::from)
338 }
339
340 #[inline]
342 pub fn take_header(&mut self) -> Option<ResponseHeader> {
343 self.0.header.take().map(ResponseHeader::new)
344 }
345}
346
347#[derive(Debug, Clone)]
349#[repr(transparent)]
350pub struct LeaderKey(PbLeaderKey);
351
352impl LeaderKey {
353 #[inline]
355 pub const fn new() -> Self {
356 Self(PbLeaderKey {
357 name: Vec::new(),
358 key: Vec::new(),
359 rev: 0,
360 lease: 0,
361 })
362 }
363
364 #[inline]
366 pub fn with_name(mut self, name: impl Into<Vec<u8>>) -> Self {
367 self.0.name = name.into();
368 self
369 }
370
371 #[inline]
373 pub fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
374 self.0.key = key.into();
375 self
376 }
377
378 #[inline]
380 pub const fn with_rev(mut self, rev: i64) -> Self {
381 self.0.rev = rev;
382 self
383 }
384
385 #[inline]
387 pub const fn with_lease(mut self, lease: i64) -> Self {
388 self.0.lease = lease;
389 self
390 }
391
392 #[inline]
394 pub fn name(&self) -> &[u8] {
395 &self.0.name
396 }
397
398 #[inline]
400 pub fn name_str(&self) -> Result<&str> {
401 std::str::from_utf8(self.name()).map_err(From::from)
402 }
403
404 #[inline]
411 pub unsafe fn name_str_unchecked(&self) -> &str {
412 std::str::from_utf8_unchecked(self.name())
413 }
414
415 #[inline]
418 pub fn key(&self) -> &[u8] {
419 &self.0.key
420 }
421
422 #[inline]
425 pub fn key_str(&self) -> Result<&str> {
426 std::str::from_utf8(self.key()).map_err(From::from)
427 }
428
429 #[inline]
437 pub unsafe fn key_str_unchecked(&self) -> &str {
438 std::str::from_utf8_unchecked(self.key())
439 }
440
441 #[inline]
445 pub const fn rev(&self) -> i64 {
446 self.0.rev
447 }
448
449 #[inline]
451 pub const fn lease(&self) -> i64 {
452 self.0.lease
453 }
454}
455
456impl Default for LeaderKey {
457 #[inline]
458 fn default() -> Self {
459 Self::new()
460 }
461}
462
463impl From<LeaderKey> for PbLeaderKey {
464 #[inline]
465 fn from(leader_key: LeaderKey) -> Self {
466 leader_key.0
467 }
468}
469
470impl From<PbLeaderKey> for LeaderKey {
471 #[inline]
472 fn from(key: PbLeaderKey) -> Self {
473 Self(key)
474 }
475}
476
477impl From<&PbLeaderKey> for &LeaderKey {
478 #[inline]
479 fn from(src: &PbLeaderKey) -> Self {
480 unsafe { &*(src as *const _ as *const LeaderKey) }
481 }
482}
483
484impl ElectionClient {
485 #[inline]
487 pub(crate) fn new(channel: InterceptedChannel) -> Self {
488 let inner = PbElectionClient::new(channel);
489 Self { inner }
490 }
491
492 #[inline]
496 pub async fn campaign(
497 &mut self,
498 name: impl Into<Vec<u8>>,
499 value: impl Into<Vec<u8>>,
500 lease: i64,
501 ) -> Result<CampaignResponse> {
502 let resp = self
503 .inner
504 .campaign(
505 CampaignOptions::new()
506 .with_name(name)
507 .with_value(value)
508 .with_lease(lease),
509 )
510 .await?
511 .into_inner();
512 Ok(CampaignResponse::new(resp))
513 }
514
515 #[inline]
517 pub async fn proclaim(
518 &mut self,
519 value: impl Into<Vec<u8>>,
520 options: Option<ProclaimOptions>,
521 ) -> Result<ProclaimResponse> {
522 let resp = self
523 .inner
524 .proclaim(options.unwrap_or_default().with_value(value))
525 .await?
526 .into_inner();
527 Ok(ProclaimResponse::new(resp))
528 }
529
530 #[inline]
532 pub async fn leader(&mut self, name: impl Into<Vec<u8>>) -> Result<LeaderResponse> {
533 let resp = self
534 .inner
535 .leader(LeaderOptions::new().with_name(name))
536 .await?
537 .into_inner();
538 Ok(LeaderResponse::new(resp))
539 }
540
541 #[inline]
544 pub async fn observe(&mut self, name: impl Into<Vec<u8>>) -> Result<ObserveStream> {
545 let resp = self
546 .inner
547 .observe(LeaderOptions::new().with_name(name))
548 .await?
549 .into_inner();
550
551 Ok(ObserveStream::new(resp))
552 }
553
554 #[inline]
556 pub async fn resign(&mut self, option: Option<ResignOptions>) -> Result<ResignResponse> {
557 let resp = self
558 .inner
559 .resign(option.unwrap_or_default())
560 .await?
561 .into_inner();
562 Ok(ResignResponse::new(resp))
563 }
564}