etcd_client/rpc/
kv.rs

1//! Etcd KV Operations.
2
3pub use crate::rpc::pb::etcdserverpb::compare::CompareResult as CompareOp;
4pub use crate::rpc::pb::etcdserverpb::range_request::{SortOrder, SortTarget};
5
6use crate::auth::AuthService;
7use crate::error::Result;
8use crate::intercept::InterceptedChannel;
9use crate::rpc::pb::etcdserverpb::compare::{CompareTarget, TargetUnion};
10use crate::rpc::pb::etcdserverpb::kv_client::KvClient as PbKvClient;
11use crate::rpc::pb::etcdserverpb::request_op::Request as PbTxnOp;
12use crate::rpc::pb::etcdserverpb::response_op::Response as PbTxnOpResponse;
13use crate::rpc::pb::etcdserverpb::{
14    CompactionRequest as PbCompactionRequest, CompactionRequest,
15    CompactionResponse as PbCompactionResponse, Compare as PbCompare,
16    DeleteRangeRequest as PbDeleteRequest, DeleteRangeRequest,
17    DeleteRangeResponse as PbDeleteResponse, PutRequest as PbPutRequest,
18    PutResponse as PbPutResponse, RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse,
19    RequestOp as PbTxnRequestOp, TxnRequest as PbTxnRequest, TxnResponse as PbTxnResponse,
20};
21use crate::rpc::{get_prefix, KeyRange, KeyValue, ResponseHeader};
22use crate::vec::VecExt;
23use http::HeaderValue;
24use std::mem::ManuallyDrop;
25use std::sync::{Arc, RwLock};
26use tonic::{IntoRequest, Request};
27
28/// Client for KV operations.
29#[repr(transparent)]
30#[derive(Clone)]
31pub struct KvClient {
32    inner: PbKvClient<AuthService<InterceptedChannel>>,
33}
34
35impl KvClient {
36    /// Creates a kv client.
37    #[inline]
38    pub(crate) fn new(
39        channel: InterceptedChannel,
40        auth_token: Arc<RwLock<Option<HeaderValue>>>,
41    ) -> Self {
42        let inner = PbKvClient::new(AuthService::new(channel, auth_token));
43        Self { inner }
44    }
45
46    /// Limits the maximum size of a decoded message.
47    ///
48    /// Default: `4MB`
49    pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
50        self.inner = self.inner.max_decoding_message_size(limit);
51        self
52    }
53
54    /// Limits the maximum size of an encoded message.
55    ///
56    /// Default: `usize::MAX`
57    pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
58        self.inner = self.inner.max_encoding_message_size(limit);
59        self
60    }
61
62    /// Puts the given key into the key-value store.
63    /// A put request increments the revision of the key-value store
64    /// and generates one event in the event history.
65    #[inline]
66    pub async fn put(
67        &mut self,
68        key: impl Into<Vec<u8>>,
69        value: impl Into<Vec<u8>>,
70        options: Option<PutOptions>,
71    ) -> Result<PutResponse> {
72        let resp = self
73            .inner
74            .put(options.unwrap_or_default().with_kv(key, value))
75            .await?
76            .into_inner();
77        Ok(PutResponse::new(resp))
78    }
79
80    /// Gets the key or a range of keys from the store.
81    #[inline]
82    pub async fn get(
83        &mut self,
84        key: impl Into<Vec<u8>>,
85        options: Option<GetOptions>,
86    ) -> Result<GetResponse> {
87        let resp = self
88            .inner
89            .range(options.unwrap_or_default().with_key(key.into()))
90            .await?
91            .into_inner();
92        Ok(GetResponse::new(resp))
93    }
94
95    /// Deletes the given key or a range of keys from the key-value store.
96    #[inline]
97    pub async fn delete(
98        &mut self,
99        key: impl Into<Vec<u8>>,
100        options: Option<DeleteOptions>,
101    ) -> Result<DeleteResponse> {
102        let resp = self
103            .inner
104            .delete_range(options.unwrap_or_default().with_key(key.into()))
105            .await?
106            .into_inner();
107        Ok(DeleteResponse::new(resp))
108    }
109
110    /// Compacts the event history in the etcd key-value store. The key-value
111    /// store should be periodically compacted or the event history will continue to grow
112    /// indefinitely.
113    #[inline]
114    pub async fn compact(
115        &mut self,
116        revision: i64,
117        options: Option<CompactionOptions>,
118    ) -> Result<CompactionResponse> {
119        let resp = self
120            .inner
121            .compact(options.unwrap_or_default().with_revision(revision))
122            .await?
123            .into_inner();
124        Ok(CompactionResponse::new(resp))
125    }
126
127    /// Processes multiple operations in a single transaction.
128    /// A txn request increments the revision of the key-value store
129    /// and generates events with the same revision for every completed operation.
130    /// It is not allowed to modify the same key several times within one txn.
131    #[inline]
132    pub async fn txn(&mut self, txn: Txn) -> Result<TxnResponse> {
133        let resp = self.inner.txn(txn).await?.into_inner();
134        Ok(TxnResponse::new(resp))
135    }
136}
137
138/// Options for `Put` operation.
139#[derive(Debug, Default, Clone)]
140#[repr(transparent)]
141pub struct PutOptions(PbPutRequest);
142
143impl PutOptions {
144    /// Set key-value pair.
145    #[inline]
146    fn with_kv(mut self, key: impl Into<Vec<u8>>, value: impl Into<Vec<u8>>) -> Self {
147        self.0.key = key.into();
148        self.0.value = value.into();
149        self
150    }
151
152    /// Creates a `PutOptions`.
153    #[inline]
154    pub const fn new() -> Self {
155        Self(PbPutRequest {
156            key: Vec::new(),
157            value: Vec::new(),
158            lease: 0,
159            prev_kv: false,
160            ignore_value: false,
161            ignore_lease: false,
162        })
163    }
164
165    /// Lease is the lease ID to associate with the key in the key-value store. A lease
166    /// value of 0 indicates no lease.
167    #[inline]
168    pub const fn with_lease(mut self, lease: i64) -> Self {
169        self.0.lease = lease;
170        self
171    }
172
173    /// If prev_kv is set, etcd gets the previous key-value pair before changing it.
174    /// The previous key-value pair will be returned in the put response.
175    #[inline]
176    pub const fn with_prev_key(mut self) -> Self {
177        self.0.prev_kv = true;
178        self
179    }
180
181    /// If ignore_value is set, etcd updates the key using its current value.
182    /// Returns an error if the key does not exist.
183    #[inline]
184    pub const fn with_ignore_value(mut self) -> Self {
185        self.0.ignore_value = true;
186        self
187    }
188
189    /// If ignore_lease is set, etcd updates the key using its current lease.
190    /// Returns an error if the key does not exist.
191    #[inline]
192    pub const fn with_ignore_lease(mut self) -> Self {
193        self.0.ignore_lease = true;
194        self
195    }
196}
197
198impl From<PutOptions> for PbPutRequest {
199    #[inline]
200    fn from(options: PutOptions) -> Self {
201        options.0
202    }
203}
204
205impl IntoRequest<PbPutRequest> for PutOptions {
206    #[inline]
207    fn into_request(self) -> Request<PbPutRequest> {
208        Request::new(self.into())
209    }
210}
211
212/// Response for `Put` operation.
213#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
214#[derive(Debug, Clone)]
215#[repr(transparent)]
216pub struct PutResponse(PbPutResponse);
217
218impl PutResponse {
219    /// Create a new `PutResponse` from pb put response.
220    #[inline]
221    const fn new(resp: PbPutResponse) -> Self {
222        Self(resp)
223    }
224
225    /// Get response header.
226    #[inline]
227    pub fn header(&self) -> Option<&ResponseHeader> {
228        self.0.header.as_ref().map(From::from)
229    }
230
231    /// Takes the header out of the response, leaving a [`None`] in its place.
232    #[inline]
233    pub fn take_header(&mut self) -> Option<ResponseHeader> {
234        self.0.header.take().map(ResponseHeader::new)
235    }
236
237    /// If prev_kv is set in the request, the previous key-value pair will be returned.
238    #[inline]
239    pub fn prev_key(&self) -> Option<&KeyValue> {
240        self.0.prev_kv.as_ref().map(From::from)
241    }
242
243    /// Takes the prev_key out of the response, leaving a [`None`] in its place.
244    #[inline]
245    pub fn take_prev_key(&mut self) -> Option<KeyValue> {
246        self.0.prev_kv.take().map(KeyValue::new)
247    }
248
249    #[inline]
250    pub(crate) fn strip_prev_key_prefix(&mut self, prefix: &[u8]) {
251        if let Some(kv) = self.0.prev_kv.as_mut() {
252            kv.key.strip_key_prefix(prefix);
253        }
254    }
255}
256
257/// Options for `Get` operation.
258#[derive(Debug, Default, Clone)]
259pub struct GetOptions {
260    req: PbRangeRequest,
261    key_range: KeyRange,
262}
263
264impl GetOptions {
265    /// Sets key.
266    #[inline]
267    fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
268        self.key_range.with_key(key);
269        self
270    }
271
272    /// Creates a `GetOptions`.
273    #[inline]
274    pub const fn new() -> Self {
275        Self {
276            req: PbRangeRequest {
277                key: Vec::new(),
278                range_end: Vec::new(),
279                limit: 0,
280                revision: 0,
281                sort_order: 0,
282                sort_target: 0,
283                serializable: false,
284                keys_only: false,
285                count_only: false,
286                min_mod_revision: 0,
287                max_mod_revision: 0,
288                min_create_revision: 0,
289                max_create_revision: 0,
290            },
291            key_range: KeyRange::new(),
292        }
293    }
294
295    /// Specifies the range of 'Get'.
296    /// Returns the keys in the range [key, end_key).
297    /// `end_key` must be lexicographically greater than start key.
298    #[inline]
299    pub fn with_range(mut self, end_key: impl Into<Vec<u8>>) -> Self {
300        self.key_range.with_range(end_key);
301        self
302    }
303
304    /// Gets all keys >= key.
305    #[inline]
306    pub fn with_from_key(mut self) -> Self {
307        self.key_range.with_from_key();
308        self
309    }
310
311    /// Gets all keys prefixed with key.
312    #[inline]
313    pub fn with_prefix(mut self) -> Self {
314        self.key_range.with_prefix();
315        self
316    }
317
318    /// Gets all keys.
319    #[inline]
320    pub fn with_all_keys(mut self) -> Self {
321        self.key_range.with_all_keys();
322        self
323    }
324
325    /// Limits the number of keys returned for the request. When limit is set to 0,
326    /// it is treated as no limit.
327    #[inline]
328    pub const fn with_limit(mut self, limit: i64) -> Self {
329        self.req.limit = limit;
330        self
331    }
332
333    /// The point-in-time of the key-value store to use for the range.
334    /// If revision is less or equal to zero, the range is over the newest key-value store.
335    /// If the revision has been compacted, ErrCompacted is returned as a response.
336    #[inline]
337    pub const fn with_revision(mut self, revision: i64) -> Self {
338        self.req.revision = revision;
339        self
340    }
341
342    /// Sets the order for returned sorted results.
343    /// It requires 'with_range' and/or 'with_prefix' to be specified too.
344    #[inline]
345    pub fn with_sort(mut self, target: SortTarget, order: SortOrder) -> Self {
346        if target == SortTarget::Key && order == SortOrder::Ascend {
347            // If order != SortOrder::None, server fetches the entire key-space,
348            // and then applies the sort and limit, if provided.
349            // Since by default the server returns results sorted by keys
350            // in lexicographically ascending order, the client should ignore
351            // SortOrder if the target is SortTarget::Key.
352            self.req.sort_order = SortOrder::None as i32;
353        } else {
354            self.req.sort_order = order as i32;
355        }
356        self.req.sort_target = target as i32;
357        self
358    }
359
360    /// Sets the get request to use serializable member-local reads.
361    /// Get requests are linearizable by default; linearizable requests have higher
362    /// latency and lower throughput than serializable requests but reflect the current
363    /// consensus of the cluster. For better performance, in exchange for possible stale reads,
364    /// a serializable get request is served locally without needing to reach consensus
365    /// with other nodes in the cluster.
366    #[inline]
367    pub const fn with_serializable(mut self) -> Self {
368        self.req.serializable = true;
369        self
370    }
371
372    /// Returns only the keys and not the values.
373    #[inline]
374    pub const fn with_keys_only(mut self) -> Self {
375        self.req.keys_only = true;
376        self
377    }
378
379    /// Returns only the count of the keys in the range.
380    #[inline]
381    pub const fn with_count_only(mut self) -> Self {
382        self.req.count_only = true;
383        self
384    }
385
386    /// Sets the lower bound for returned key mod revisions; all keys with
387    /// lesser mod revisions will be filtered away.
388    #[inline]
389    pub const fn with_min_mod_revision(mut self, revision: i64) -> Self {
390        self.req.min_mod_revision = revision;
391        self
392    }
393
394    /// Sets the upper bound for returned key mod revisions; all keys with
395    /// greater mod revisions will be filtered away.
396    #[inline]
397    pub const fn with_max_mod_revision(mut self, revision: i64) -> Self {
398        self.req.max_mod_revision = revision;
399        self
400    }
401
402    /// Sets the lower bound for returned key create revisions; all keys with
403    /// lesser create revisions will be filtered away.
404    #[inline]
405    pub const fn with_min_create_revision(mut self, revision: i64) -> Self {
406        self.req.min_create_revision = revision;
407        self
408    }
409
410    /// `max_create_revision` is the upper bound for returned key create revisions; all keys with
411    /// greater create revisions will be filtered away.
412    #[inline]
413    pub const fn with_max_create_revision(mut self, revision: i64) -> Self {
414        self.req.max_create_revision = revision;
415        self
416    }
417
418    #[inline]
419    pub(crate) fn key_range_end_mut(&mut self) -> &mut Vec<u8> {
420        &mut self.key_range.range_end
421    }
422}
423
424impl From<GetOptions> for PbRangeRequest {
425    #[inline]
426    fn from(mut options: GetOptions) -> Self {
427        let (key, rang_end) = options.key_range.build();
428        options.req.key = key;
429        options.req.range_end = rang_end;
430        options.req
431    }
432}
433
434impl IntoRequest<PbRangeRequest> for GetOptions {
435    #[inline]
436    fn into_request(self) -> Request<PbRangeRequest> {
437        Request::new(self.into())
438    }
439}
440
441/// Response for `Get` operation.
442#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
443#[derive(Debug, Clone)]
444#[repr(transparent)]
445pub struct GetResponse(PbRangeResponse);
446
447impl GetResponse {
448    /// Create a new `GetResponse` from pb get response.
449    #[inline]
450    const fn new(resp: PbRangeResponse) -> Self {
451        Self(resp)
452    }
453
454    /// Get response header.
455    #[inline]
456    pub fn header(&self) -> Option<&ResponseHeader> {
457        self.0.header.as_ref().map(From::from)
458    }
459
460    /// Takes the header out of the response, leaving a [`None`] in its place.
461    #[inline]
462    pub fn take_header(&mut self) -> Option<ResponseHeader> {
463        self.0.header.take().map(ResponseHeader::new)
464    }
465
466    /// The list of key-value pairs matched by the `Get` request.
467    /// kvs is empty when count is requested.
468    #[inline]
469    pub fn kvs(&self) -> &[KeyValue] {
470        unsafe { &*(self.0.kvs.as_slice() as *const _ as *const [KeyValue]) }
471    }
472
473    /// If `kvs` is set in the request, take the key-value pairs, leaving an empty vector in its place.
474    #[inline]
475    pub fn take_kvs(&mut self) -> Vec<KeyValue> {
476        let kvs = ManuallyDrop::new(std::mem::take(&mut self.0.kvs));
477        unsafe { Vec::from_raw_parts(kvs.as_ptr() as *mut KeyValue, kvs.len(), kvs.capacity()) }
478    }
479
480    #[inline]
481    pub(crate) fn strip_kvs_prefix(&mut self, prefix: &[u8]) {
482        for kv in self.0.kvs.iter_mut() {
483            kv.key.strip_key_prefix(prefix);
484        }
485    }
486
487    /// Indicates if there are more keys to return in the requested range.
488    #[inline]
489    pub const fn more(&self) -> bool {
490        self.0.more
491    }
492
493    /// The number of keys within the range when requested.
494    #[inline]
495    pub const fn count(&self) -> i64 {
496        self.0.count
497    }
498}
499
500/// Options for `Delete` operation.
501#[derive(Debug, Default, Clone)]
502pub struct DeleteOptions {
503    req: PbDeleteRequest,
504    key_range: KeyRange,
505}
506
507impl DeleteOptions {
508    /// Sets key.
509    #[inline]
510    fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
511        self.key_range.with_key(key);
512        self
513    }
514
515    /// Creates a `DeleteOptions`.
516    #[inline]
517    pub const fn new() -> Self {
518        Self {
519            req: PbDeleteRequest {
520                key: Vec::new(),
521                range_end: Vec::new(),
522                prev_kv: false,
523            },
524            key_range: KeyRange::new(),
525        }
526    }
527
528    /// `end_key` is the key following the last key to delete for the range [key, end_key).
529    #[inline]
530    pub fn with_range(mut self, end_key: impl Into<Vec<u8>>) -> Self {
531        self.key_range.with_range(end_key);
532        self
533    }
534
535    /// Deletes all keys >= key.
536    #[inline]
537    pub fn with_from_key(mut self) -> Self {
538        self.key_range.with_from_key();
539        self
540    }
541
542    /// Deletes all keys prefixed with key.
543    #[inline]
544    pub fn with_prefix(mut self) -> Self {
545        self.key_range.with_prefix();
546        self
547    }
548
549    /// Deletes all keys.
550    #[inline]
551    pub fn with_all_keys(mut self) -> Self {
552        self.key_range.with_all_keys();
553        self
554    }
555
556    /// If `prev_kv` is set, etcd gets the previous key-value pairs before deleting it.
557    /// The previous key-value pairs will be returned in the delete response.
558    #[inline]
559    pub const fn with_prev_key(mut self) -> Self {
560        self.req.prev_kv = true;
561        self
562    }
563
564    #[inline]
565    pub(crate) fn key_range_end_mut(&mut self) -> &mut Vec<u8> {
566        &mut self.key_range.range_end
567    }
568}
569
570impl From<DeleteOptions> for PbDeleteRequest {
571    #[inline]
572    fn from(mut options: DeleteOptions) -> Self {
573        let (key, rang_end) = options.key_range.build();
574        options.req.key = key;
575        options.req.range_end = rang_end;
576        options.req
577    }
578}
579
580impl IntoRequest<PbDeleteRequest> for DeleteOptions {
581    #[inline]
582    fn into_request(self) -> Request<DeleteRangeRequest> {
583        Request::new(self.into())
584    }
585}
586
587/// Response for `Delete` operation.
588#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
589#[derive(Debug, Clone)]
590#[repr(transparent)]
591pub struct DeleteResponse(PbDeleteResponse);
592
593impl DeleteResponse {
594    /// Create a new `DeleteResponse` from pb delete response.
595    #[inline]
596    const fn new(resp: PbDeleteResponse) -> Self {
597        Self(resp)
598    }
599
600    /// Delete response header.
601    #[inline]
602    pub fn header(&self) -> Option<&ResponseHeader> {
603        self.0.header.as_ref().map(From::from)
604    }
605
606    /// Takes the header out of the response, leaving a [`None`] in its place.
607    #[inline]
608    pub fn take_header(&mut self) -> Option<ResponseHeader> {
609        self.0.header.take().map(ResponseHeader::new)
610    }
611
612    /// The number of keys deleted by the delete request.
613    #[inline]
614    pub const fn deleted(&self) -> i64 {
615        self.0.deleted
616    }
617
618    /// If `prev_kv` is set in the request, the previous key-value pairs will be returned.
619    #[inline]
620    pub fn prev_kvs(&self) -> &[KeyValue] {
621        unsafe { &*(self.0.prev_kvs.as_slice() as *const _ as *const [KeyValue]) }
622    }
623
624    /// If `prev_kvs` is set in the request, take the previous key-value pairs, leaving an empty vector in its place.
625    #[inline]
626    pub fn take_prev_kvs(&mut self) -> Vec<KeyValue> {
627        let kvs = ManuallyDrop::new(std::mem::take(&mut self.0.prev_kvs));
628        unsafe { Vec::from_raw_parts(kvs.as_ptr() as *mut KeyValue, kvs.len(), kvs.capacity()) }
629    }
630
631    #[inline]
632    pub(crate) fn strip_prev_kvs_prefix(&mut self, prefix: &[u8]) {
633        for kv in self.0.prev_kvs.iter_mut() {
634            kv.key.strip_key_prefix(prefix);
635        }
636    }
637}
638
639/// Options for `Compact` operation.
640#[derive(Debug, Default, Clone)]
641#[repr(transparent)]
642pub struct CompactionOptions(PbCompactionRequest);
643
644impl CompactionOptions {
645    /// Creates a `CompactionOptions`.
646    #[inline]
647    pub const fn new() -> Self {
648        Self(PbCompactionRequest {
649            revision: 0,
650            physical: false,
651        })
652    }
653
654    /// The key-value store revision for the compaction operation.
655    #[inline]
656    const fn with_revision(mut self, revision: i64) -> Self {
657        self.0.revision = revision;
658        self
659    }
660
661    /// Physical is set so the RPC will wait until the compaction is physically
662    /// applied to the local database such that compacted entries are totally
663    /// removed from the backend database.
664    #[inline]
665    pub const fn with_physical(mut self) -> Self {
666        self.0.physical = true;
667        self
668    }
669}
670
671impl IntoRequest<PbCompactionRequest> for CompactionOptions {
672    #[inline]
673    fn into_request(self) -> Request<CompactionRequest> {
674        Request::new(self.0)
675    }
676}
677
678/// Response for `Compact` operation.
679#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
680#[derive(Debug, Clone)]
681#[repr(transparent)]
682pub struct CompactionResponse(PbCompactionResponse);
683
684impl CompactionResponse {
685    /// Create a new `CompactionResponse` from pb compaction response.
686    #[inline]
687    const fn new(resp: PbCompactionResponse) -> Self {
688        Self(resp)
689    }
690
691    /// Compact response header.
692    #[inline]
693    pub fn header(&self) -> Option<&ResponseHeader> {
694        self.0.header.as_ref().map(From::from)
695    }
696
697    /// Takes the header out of the response, leaving a [`None`] in its place.
698    #[inline]
699    pub fn take_header(&mut self) -> Option<ResponseHeader> {
700        self.0.header.take().map(ResponseHeader::new)
701    }
702}
703
704/// Transaction comparison.
705#[derive(Debug, Clone)]
706#[repr(transparent)]
707pub struct Compare(PbCompare);
708
709impl Compare {
710    /// Creates a new `Compare`.
711    #[inline]
712    fn new(
713        key: impl Into<Vec<u8>>,
714        cmp: CompareOp,
715        target: CompareTarget,
716        target_union: TargetUnion,
717    ) -> Self {
718        Self(PbCompare {
719            result: cmp as i32,
720            target: target as i32,
721            key: key.into(),
722            range_end: Vec::new(),
723            target_union: Some(target_union),
724        })
725    }
726
727    /// Compares the version of the given key.
728    #[inline]
729    pub fn version(key: impl Into<Vec<u8>>, cmp: CompareOp, version: i64) -> Self {
730        Self::new(
731            key,
732            cmp,
733            CompareTarget::Version,
734            TargetUnion::Version(version),
735        )
736    }
737
738    /// Compares the creation revision of the given key.
739    #[inline]
740    pub fn create_revision(key: impl Into<Vec<u8>>, cmp: CompareOp, revision: i64) -> Self {
741        Self::new(
742            key,
743            cmp,
744            CompareTarget::Create,
745            TargetUnion::CreateRevision(revision),
746        )
747    }
748
749    /// Compares the last modified revision of the given key.
750    #[inline]
751    pub fn mod_revision(key: impl Into<Vec<u8>>, cmp: CompareOp, revision: i64) -> Self {
752        Self::new(
753            key,
754            cmp,
755            CompareTarget::Mod,
756            TargetUnion::ModRevision(revision),
757        )
758    }
759
760    /// Compares the value of the given key.
761    #[inline]
762    pub fn value(key: impl Into<Vec<u8>>, cmp: CompareOp, value: impl Into<Vec<u8>>) -> Self {
763        Self::new(
764            key,
765            cmp,
766            CompareTarget::Value,
767            TargetUnion::Value(value.into()),
768        )
769    }
770
771    /// Compares the lease id of the given key.
772    #[inline]
773    pub fn lease(key: impl Into<Vec<u8>>, cmp: CompareOp, lease: i64) -> Self {
774        Self::new(key, cmp, CompareTarget::Lease, TargetUnion::Lease(lease))
775    }
776
777    /// Sets the comparison to scan the range [key, end).
778    #[inline]
779    pub fn with_range(mut self, end: impl Into<Vec<u8>>) -> Self {
780        self.0.range_end = end.into();
781        self
782    }
783
784    /// Sets the comparison to scan all keys prefixed by the key.
785    #[inline]
786    pub fn with_prefix(mut self) -> Self {
787        self.0.range_end = get_prefix(&self.0.key);
788        self
789    }
790}
791
792/// Transaction operation.
793#[derive(Debug, Clone)]
794#[repr(transparent)]
795pub struct TxnOp(PbTxnOp);
796
797impl TxnOp {
798    /// `Put` operation.
799    #[inline]
800    pub fn put(
801        key: impl Into<Vec<u8>>,
802        value: impl Into<Vec<u8>>,
803        options: Option<PutOptions>,
804    ) -> Self {
805        TxnOp(PbTxnOp::RequestPut(
806            options.unwrap_or_default().with_kv(key, value).into(),
807        ))
808    }
809
810    /// `Get` operation.
811    #[inline]
812    pub fn get(key: impl Into<Vec<u8>>, options: Option<GetOptions>) -> Self {
813        TxnOp(PbTxnOp::RequestRange(
814            options.unwrap_or_default().with_key(key).into(),
815        ))
816    }
817
818    /// `Delete` operation.
819    #[inline]
820    pub fn delete(key: impl Into<Vec<u8>>, options: Option<DeleteOptions>) -> Self {
821        TxnOp(PbTxnOp::RequestDeleteRange(
822            options.unwrap_or_default().with_key(key).into(),
823        ))
824    }
825
826    /// `Txn` operation.
827    #[inline]
828    pub fn txn(txn: Txn) -> Self {
829        TxnOp(PbTxnOp::RequestTxn(txn.into()))
830    }
831}
832
833impl From<TxnOp> for PbTxnOp {
834    #[inline]
835    fn from(op: TxnOp) -> Self {
836        op.0
837    }
838}
839
840/// Transaction of multiple operations.
841#[derive(Debug, Default, Clone)]
842pub struct Txn {
843    req: PbTxnRequest,
844    c_when: bool,
845    c_then: bool,
846    c_else: bool,
847}
848
849impl Txn {
850    /// Creates a new transaction.
851    #[inline]
852    pub const fn new() -> Self {
853        Self {
854            req: PbTxnRequest {
855                compare: Vec::new(),
856                success: Vec::new(),
857                failure: Vec::new(),
858            },
859            c_when: false,
860            c_then: false,
861            c_else: false,
862        }
863    }
864
865    /// Takes a list of comparison. If all comparisons passed in succeed,
866    /// the operations passed into `and_then()` will be executed. Or the operations
867    /// passed into `or_else()` will be executed.
868    #[inline]
869    pub fn when(mut self, compares: impl Into<Vec<Compare>>) -> Self {
870        assert!(!self.c_when, "cannot call when twice");
871        assert!(!self.c_then, "cannot call when after and_then");
872        assert!(!self.c_else, "cannot call when after or_else");
873
874        self.c_when = true;
875
876        let compares = ManuallyDrop::new(compares.into());
877        self.req.compare = unsafe {
878            Vec::from_raw_parts(
879                compares.as_ptr() as *mut PbCompare,
880                compares.len(),
881                compares.capacity(),
882            )
883        };
884
885        self
886    }
887
888    /// Takes a list of operations. The operations list will be executed, if the
889    /// comparisons passed in `when()` succeed.
890    #[inline]
891    pub fn and_then(mut self, operations: impl Into<Vec<TxnOp>>) -> Self {
892        assert!(!self.c_then, "cannot call and_then twice");
893        assert!(!self.c_else, "cannot call and_then after or_else");
894
895        self.c_then = true;
896        self.req.success = operations
897            .into()
898            .into_iter()
899            .map(|op| PbTxnRequestOp {
900                request: Some(op.into()),
901            })
902            .collect();
903        self
904    }
905
906    /// Takes a list of operations. The operations list will be executed, if the
907    /// comparisons passed in `when()` fail.
908    #[inline]
909    pub fn or_else(mut self, operations: impl Into<Vec<TxnOp>>) -> Self {
910        assert!(!self.c_else, "cannot call or_else twice");
911
912        self.c_else = true;
913        self.req.failure = operations
914            .into()
915            .into_iter()
916            .map(|op| PbTxnRequestOp {
917                request: Some(op.into()),
918            })
919            .collect();
920        self
921    }
922
923    #[inline]
924    pub(crate) fn prefix_with(&mut self, prefix: &[u8]) {
925        self.req.prefix_with(prefix);
926    }
927}
928
929impl PbTxnRequest {
930    fn prefix_with(&mut self, prefix: &[u8]) {
931        let prefix_op = |op: &mut PbTxnRequestOp| {
932            if let Some(request) = &mut op.request {
933                match request {
934                    PbTxnOp::RequestRange(req) => {
935                        req.key.prefix_with(prefix);
936                        req.range_end.prefix_range_end_with(prefix);
937                    }
938                    PbTxnOp::RequestPut(req) => {
939                        req.key.prefix_with(prefix);
940                    }
941                    PbTxnOp::RequestDeleteRange(req) => {
942                        req.key.prefix_with(prefix);
943                        req.range_end.prefix_range_end_with(prefix);
944                    }
945                    PbTxnOp::RequestTxn(req) => {
946                        req.prefix_with(prefix);
947                    }
948                }
949            }
950        };
951
952        self.compare.iter_mut().for_each(|cmp| {
953            cmp.key.prefix_with(prefix);
954            cmp.range_end.prefix_range_end_with(prefix);
955        });
956        self.success.iter_mut().for_each(prefix_op);
957        self.failure.iter_mut().for_each(prefix_op);
958    }
959}
960
961impl From<Txn> for PbTxnRequest {
962    #[inline]
963    fn from(txn: Txn) -> Self {
964        txn.req
965    }
966}
967
968impl IntoRequest<PbTxnRequest> for Txn {
969    #[inline]
970    fn into_request(self) -> Request<PbTxnRequest> {
971        Request::new(self.into())
972    }
973}
974
975/// Transaction operation response.
976#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
977#[derive(Debug, Clone)]
978pub enum TxnOpResponse {
979    Put(PutResponse),
980    Get(GetResponse),
981    Delete(DeleteResponse),
982    Txn(TxnResponse),
983}
984
985/// Response for `Txn` operation.
986#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
987#[derive(Debug, Clone)]
988#[repr(transparent)]
989pub struct TxnResponse(PbTxnResponse);
990
991impl TxnResponse {
992    /// Creates a new `Txn` response.
993    #[inline]
994    const fn new(resp: PbTxnResponse) -> Self {
995        Self(resp)
996    }
997
998    /// Transaction response header.
999    #[inline]
1000    pub fn header(&self) -> Option<&ResponseHeader> {
1001        self.0.header.as_ref().map(From::from)
1002    }
1003
1004    /// Takes the header out of the response, leaving a [`None`] in its place.
1005    #[inline]
1006    pub fn take_header(&mut self) -> Option<ResponseHeader> {
1007        self.0.header.take().map(ResponseHeader::new)
1008    }
1009
1010    /// Returns `true` if the compare evaluated to true or `false` otherwise.
1011    #[inline]
1012    pub const fn succeeded(&self) -> bool {
1013        self.0.succeeded
1014    }
1015
1016    /// Returns responses of transaction operations.
1017    #[inline]
1018    pub fn op_responses(&self) -> Vec<TxnOpResponse> {
1019        self.0
1020            .responses
1021            .iter()
1022            .map(|resp| match resp.response.as_ref().unwrap() {
1023                PbTxnOpResponse::ResponsePut(put) => {
1024                    TxnOpResponse::Put(PutResponse::new(put.clone()))
1025                }
1026                PbTxnOpResponse::ResponseRange(get) => {
1027                    TxnOpResponse::Get(GetResponse::new(get.clone()))
1028                }
1029                PbTxnOpResponse::ResponseDeleteRange(delete) => {
1030                    TxnOpResponse::Delete(DeleteResponse::new(delete.clone()))
1031                }
1032                PbTxnOpResponse::ResponseTxn(txn) => {
1033                    TxnOpResponse::Txn(TxnResponse::new(txn.clone()))
1034                }
1035            })
1036            .collect()
1037    }
1038
1039    #[inline]
1040    pub(crate) fn strip_key_prefix(&mut self, prefix: &[u8]) {
1041        self.0.strip_key_prefix(prefix);
1042    }
1043}
1044
1045impl PbTxnResponse {
1046    fn strip_key_prefix(&mut self, prefix: &[u8]) {
1047        self.responses.iter_mut().for_each(|op| {
1048            if let Some(resp) = &mut op.response {
1049                match resp {
1050                    PbTxnOpResponse::ResponseRange(r) => {
1051                        for kv in r.kvs.iter_mut() {
1052                            kv.key.strip_key_prefix(prefix);
1053                        }
1054                    }
1055                    PbTxnOpResponse::ResponsePut(r) => {
1056                        if let Some(kv) = r.prev_kv.as_mut() {
1057                            kv.key.strip_key_prefix(prefix);
1058                        }
1059                    }
1060                    PbTxnOpResponse::ResponseDeleteRange(r) => {
1061                        for kv in r.prev_kvs.iter_mut() {
1062                            kv.key.strip_key_prefix(prefix);
1063                        }
1064                    }
1065                    PbTxnOpResponse::ResponseTxn(r) => {
1066                        r.strip_key_prefix(prefix);
1067                    }
1068                }
1069            }
1070        });
1071    }
1072}