etcd_client/rpc/
election.rs1use crate::auth::AuthService;
4use crate::error::Result;
5use crate::intercept::InterceptedChannel;
6use crate::rpc::pb::v3electionpb::election_client::ElectionClient as PbElectionClient;
7use crate::rpc::pb::v3electionpb::{
8 CampaignRequest as PbCampaignRequest, CampaignResponse as PbCampaignResponse,
9 LeaderKey as PbLeaderKey, LeaderRequest as PbLeaderRequest, LeaderResponse as PbLeaderResponse,
10 ProclaimRequest as PbProclaimRequest, ProclaimResponse as PbProclaimResponse,
11 ResignRequest as PbResignRequest, ResignResponse as PbResignResponse,
12};
13use crate::rpc::{KeyValue, ResponseHeader};
14use http::HeaderValue;
15use std::sync::RwLock;
16use std::task::{Context, Poll};
17use std::{pin::Pin, sync::Arc};
18use tokio_stream::Stream;
19use tonic::{IntoRequest, Request, Streaming};
20
21#[repr(transparent)]
23#[derive(Clone)]
24pub struct ElectionClient {
25 inner: PbElectionClient<AuthService<InterceptedChannel>>,
26}
27
28#[derive(Debug, Default, Clone)]
30#[repr(transparent)]
31pub struct CampaignOptions(PbCampaignRequest);
32
33impl CampaignOptions {
34 #[inline]
35 pub const fn new() -> Self {
36 Self(PbCampaignRequest {
37 name: Vec::new(),
38 lease: 0,
39 value: Vec::new(),
40 })
41 }
42
43 #[inline]
45 fn with_name(mut self, name: impl Into<Vec<u8>>) -> Self {
46 self.0.name = name.into();
47 self
48 }
49
50 #[inline]
52 const fn with_lease(mut self, lease: i64) -> Self {
53 self.0.lease = lease;
54 self
55 }
56
57 #[inline]
59 fn with_value(mut self, value: impl Into<Vec<u8>>) -> Self {
60 self.0.value = value.into();
61 self
62 }
63}
64
65impl From<CampaignOptions> for PbCampaignRequest {
66 #[inline]
67 fn from(options: CampaignOptions) -> Self {
68 options.0
69 }
70}
71
72impl IntoRequest<PbCampaignRequest> for CampaignOptions {
73 #[inline]
74 fn into_request(self) -> Request<PbCampaignRequest> {
75 Request::new(self.into())
76 }
77}
78
79#[derive(Debug, Default, Clone)]
81#[repr(transparent)]
82pub struct ProclaimOptions(PbProclaimRequest);
83
84impl ProclaimOptions {
85 #[inline]
86 pub const fn new() -> Self {
87 Self(PbProclaimRequest {
88 leader: None,
89 value: Vec::new(),
90 })
91 }
92
93 #[inline]
95 fn with_value(mut self, value: impl Into<Vec<u8>>) -> Self {
96 self.0.value = value.into();
97 self
98 }
99
100 #[inline]
102 pub fn with_leader(mut self, leader: LeaderKey) -> Self {
103 self.0.leader = Some(leader.into());
104 self
105 }
106}
107
108impl From<ProclaimOptions> for PbProclaimRequest {
109 #[inline]
110 fn from(options: ProclaimOptions) -> Self {
111 options.0
112 }
113}
114
115impl IntoRequest<PbProclaimRequest> for ProclaimOptions {
116 #[inline]
117 fn into_request(self) -> Request<PbProclaimRequest> {
118 Request::new(self.into())
119 }
120}
121
122#[derive(Debug, Default, Clone)]
124#[repr(transparent)]
125pub struct LeaderOptions(PbLeaderRequest);
126
127impl LeaderOptions {
128 #[inline]
129 pub const fn new() -> Self {
130 Self(PbLeaderRequest { name: Vec::new() })
131 }
132
133 #[inline]
135 pub fn with_name(mut self, name: impl Into<Vec<u8>>) -> Self {
136 self.0.name = name.into();
137 self
138 }
139}
140
141impl From<LeaderOptions> for PbLeaderRequest {
142 #[inline]
143 fn from(options: LeaderOptions) -> Self {
144 options.0
145 }
146}
147
148impl IntoRequest<PbLeaderRequest> for LeaderOptions {
149 #[inline]
150 fn into_request(self) -> Request<PbLeaderRequest> {
151 Request::new(self.into())
152 }
153}
154
155#[derive(Debug, Default, Clone)]
157#[repr(transparent)]
158pub struct ResignOptions(PbResignRequest);
159
160impl ResignOptions {
161 #[inline]
162 pub const fn new() -> Self {
163 Self(PbResignRequest { leader: None })
164 }
165
166 #[inline]
168 pub fn with_leader(mut self, leader: LeaderKey) -> Self {
169 self.0.leader = Some(leader.into());
170 self
171 }
172}
173
174impl From<ResignOptions> for PbResignRequest {
175 #[inline]
176 fn from(options: ResignOptions) -> Self {
177 options.0
178 }
179}
180
181impl IntoRequest<PbResignRequest> for ResignOptions {
182 #[inline]
183 fn into_request(self) -> Request<PbResignRequest> {
184 Request::new(self.into())
185 }
186}
187
188#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
190#[derive(Debug, Clone)]
191#[repr(transparent)]
192pub struct CampaignResponse(PbCampaignResponse);
193
194impl CampaignResponse {
195 #[inline]
196 const fn new(resp: PbCampaignResponse) -> Self {
197 Self(resp)
198 }
199
200 #[inline]
202 pub fn header(&self) -> Option<&ResponseHeader> {
203 self.0.header.as_ref().map(From::from)
204 }
205
206 #[inline]
208 pub fn take_header(&mut self) -> Option<ResponseHeader> {
209 self.0.header.take().map(ResponseHeader::new)
210 }
211
212 #[inline]
214 pub fn leader(&self) -> Option<&LeaderKey> {
215 self.0.leader.as_ref().map(From::from)
216 }
217
218 #[inline]
220 pub fn take_leader(&mut self) -> Option<LeaderKey> {
221 self.0.leader.take().map(From::from)
222 }
223}
224
225#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
227#[derive(Debug, Clone)]
228#[repr(transparent)]
229pub struct ProclaimResponse(PbProclaimResponse);
230
231impl ProclaimResponse {
232 #[inline]
233 const fn new(resp: PbProclaimResponse) -> Self {
234 Self(resp)
235 }
236
237 #[inline]
239 pub fn header(&self) -> Option<&ResponseHeader> {
240 self.0.header.as_ref().map(From::from)
241 }
242
243 #[inline]
245 pub fn take_header(&mut self) -> Option<ResponseHeader> {
246 self.0.header.take().map(ResponseHeader::new)
247 }
248}
249
250#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
252#[derive(Debug, Clone)]
253#[repr(transparent)]
254pub struct LeaderResponse(PbLeaderResponse);
255
256impl LeaderResponse {
257 #[inline]
258 const fn new(resp: PbLeaderResponse) -> Self {
259 Self(resp)
260 }
261
262 #[inline]
264 pub fn header(&self) -> Option<&ResponseHeader> {
265 self.0.header.as_ref().map(From::from)
266 }
267
268 #[inline]
270 pub fn take_header(&mut self) -> Option<ResponseHeader> {
271 self.0.header.take().map(ResponseHeader::new)
272 }
273
274 #[inline]
276 pub fn kv(&self) -> Option<&KeyValue> {
277 self.0.kv.as_ref().map(From::from)
278 }
279
280 #[inline]
282 pub fn take_kv(&mut self) -> Option<KeyValue> {
283 self.0.kv.take().map(KeyValue::new)
284 }
285}
286
287#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
289#[derive(Debug)]
290pub struct ObserveStream {
291 stream: Streaming<PbLeaderResponse>,
292}
293
294impl ObserveStream {
295 #[inline]
296 const fn new(stream: Streaming<PbLeaderResponse>) -> Self {
297 Self { stream }
298 }
299
300 #[inline]
302 pub async fn message(&mut self) -> Result<Option<LeaderResponse>> {
303 match self.stream.message().await? {
304 Some(resp) => Ok(Some(LeaderResponse::new(resp))),
305 None => Ok(None),
306 }
307 }
308}
309
310impl Stream for ObserveStream {
311 type Item = Result<LeaderResponse>;
312
313 #[inline]
314 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
315 Pin::new(&mut self.get_mut().stream)
316 .poll_next(cx)
317 .map(|t| match t {
318 Some(Ok(resp)) => Some(Ok(LeaderResponse::new(resp))),
319 Some(Err(e)) => Some(Err(From::from(e))),
320 None => None,
321 })
322 }
323}
324
325#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
327#[derive(Debug, Clone)]
328#[repr(transparent)]
329pub struct ResignResponse(PbResignResponse);
330
331impl ResignResponse {
332 #[inline]
333 const fn new(resp: PbResignResponse) -> Self {
334 Self(resp)
335 }
336
337 #[inline]
339 pub fn header(&self) -> Option<&ResponseHeader> {
340 self.0.header.as_ref().map(From::from)
341 }
342
343 #[inline]
345 pub fn take_header(&mut self) -> Option<ResponseHeader> {
346 self.0.header.take().map(ResponseHeader::new)
347 }
348}
349
350#[derive(Debug, Clone)]
352#[repr(transparent)]
353pub struct LeaderKey(PbLeaderKey);
354
355impl LeaderKey {
356 #[inline]
358 pub const fn new() -> Self {
359 Self(PbLeaderKey {
360 name: Vec::new(),
361 key: Vec::new(),
362 rev: 0,
363 lease: 0,
364 })
365 }
366
367 #[inline]
369 pub fn with_name(mut self, name: impl Into<Vec<u8>>) -> Self {
370 self.0.name = name.into();
371 self
372 }
373
374 #[inline]
376 pub fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
377 self.0.key = key.into();
378 self
379 }
380
381 #[inline]
383 pub const fn with_rev(mut self, rev: i64) -> Self {
384 self.0.rev = rev;
385 self
386 }
387
388 #[inline]
390 pub const fn with_lease(mut self, lease: i64) -> Self {
391 self.0.lease = lease;
392 self
393 }
394
395 #[inline]
397 pub fn name(&self) -> &[u8] {
398 &self.0.name
399 }
400
401 #[inline]
403 pub fn name_str(&self) -> Result<&str> {
404 std::str::from_utf8(self.name()).map_err(From::from)
405 }
406
407 #[inline]
414 pub unsafe fn name_str_unchecked(&self) -> &str {
415 std::str::from_utf8_unchecked(self.name())
416 }
417
418 #[inline]
421 pub fn key(&self) -> &[u8] {
422 &self.0.key
423 }
424
425 #[inline]
428 pub fn key_str(&self) -> Result<&str> {
429 std::str::from_utf8(self.key()).map_err(From::from)
430 }
431
432 #[inline]
440 pub unsafe fn key_str_unchecked(&self) -> &str {
441 std::str::from_utf8_unchecked(self.key())
442 }
443
444 #[inline]
448 pub const fn rev(&self) -> i64 {
449 self.0.rev
450 }
451
452 #[inline]
454 pub const fn lease(&self) -> i64 {
455 self.0.lease
456 }
457}
458
459impl Default for LeaderKey {
460 #[inline]
461 fn default() -> Self {
462 Self::new()
463 }
464}
465
466impl From<LeaderKey> for PbLeaderKey {
467 #[inline]
468 fn from(leader_key: LeaderKey) -> Self {
469 leader_key.0
470 }
471}
472
473impl From<PbLeaderKey> for LeaderKey {
474 #[inline]
475 fn from(key: PbLeaderKey) -> Self {
476 Self(key)
477 }
478}
479
480impl From<&PbLeaderKey> for &LeaderKey {
481 #[inline]
482 fn from(src: &PbLeaderKey) -> Self {
483 unsafe { &*(src as *const _ as *const LeaderKey) }
484 }
485}
486
487impl ElectionClient {
488 #[inline]
490 pub(crate) fn new(
491 channel: InterceptedChannel,
492 auth_token: Arc<RwLock<Option<HeaderValue>>>,
493 ) -> Self {
494 let inner = PbElectionClient::new(AuthService::new(channel, auth_token));
495 Self { inner }
496 }
497
498 #[inline]
502 pub async fn campaign(
503 &mut self,
504 name: impl Into<Vec<u8>>,
505 value: impl Into<Vec<u8>>,
506 lease: i64,
507 ) -> Result<CampaignResponse> {
508 let resp = self
509 .inner
510 .campaign(
511 CampaignOptions::new()
512 .with_name(name)
513 .with_value(value)
514 .with_lease(lease),
515 )
516 .await?
517 .into_inner();
518 Ok(CampaignResponse::new(resp))
519 }
520
521 #[inline]
523 pub async fn proclaim(
524 &mut self,
525 value: impl Into<Vec<u8>>,
526 options: Option<ProclaimOptions>,
527 ) -> Result<ProclaimResponse> {
528 let resp = self
529 .inner
530 .proclaim(options.unwrap_or_default().with_value(value))
531 .await?
532 .into_inner();
533 Ok(ProclaimResponse::new(resp))
534 }
535
536 #[inline]
538 pub async fn leader(&mut self, name: impl Into<Vec<u8>>) -> Result<LeaderResponse> {
539 let resp = self
540 .inner
541 .leader(LeaderOptions::new().with_name(name))
542 .await?
543 .into_inner();
544 Ok(LeaderResponse::new(resp))
545 }
546
547 #[inline]
550 pub async fn observe(&mut self, name: impl Into<Vec<u8>>) -> Result<ObserveStream> {
551 let resp = self
552 .inner
553 .observe(LeaderOptions::new().with_name(name))
554 .await?
555 .into_inner();
556
557 Ok(ObserveStream::new(resp))
558 }
559
560 #[inline]
562 pub async fn resign(&mut self, option: Option<ResignOptions>) -> Result<ResignResponse> {
563 let resp = self
564 .inner
565 .resign(option.unwrap_or_default())
566 .await?
567 .into_inner();
568 Ok(ResignResponse::new(resp))
569 }
570}