etcd_rs/kv/
txn.rs

1use super::{
2    DeleteRequest, DeleteResponse, KeyRange, PutRequest, PutResponse, RangeRequest, RangeResponse,
3};
4use crate::proto::etcdserverpb;
5use crate::ResponseHeader;
6use etcdserverpb::compare::{CompareResult, CompareTarget, TargetUnion};
7use etcdserverpb::Compare;
8
9#[derive(Debug)]
10pub struct TxnRequest {
11    proto: etcdserverpb::TxnRequest,
12}
13
14impl TxnRequest {
15    /// Creates a new TxnRequest.
16    pub fn new() -> Self {
17        Self {
18            proto: etcdserverpb::TxnRequest {
19                compare: vec![],
20                success: vec![],
21                failure: vec![],
22            },
23        }
24    }
25
26    /// Adds a version compare.
27    pub fn when_version(mut self, key_range: KeyRange, cmp: TxnCmp, version: usize) -> Self {
28        let result: CompareResult = cmp.into();
29        self.proto.compare.push(Compare {
30            result: result as i32,
31            target: CompareTarget::Version as i32,
32            key: key_range.key,
33            range_end: key_range.range_end,
34            target_union: Some(TargetUnion::Version(version as i64)),
35        });
36        self
37    }
38
39    /// Adds a create revision compare.
40    pub fn when_create_revision(
41        mut self,
42        key_range: KeyRange,
43        cmp: TxnCmp,
44        revision: usize,
45    ) -> Self {
46        let result: CompareResult = cmp.into();
47        self.proto.compare.push(Compare {
48            result: result as i32,
49            target: CompareTarget::Create as i32,
50            key: key_range.key,
51            range_end: key_range.range_end,
52            target_union: Some(TargetUnion::CreateRevision(revision as i64)),
53        });
54        self
55    }
56
57    /// Adds a mod revision compare.
58    pub fn when_mod_revision(mut self, key_range: KeyRange, cmp: TxnCmp, revision: usize) -> Self {
59        let result: CompareResult = cmp.into();
60        self.proto.compare.push(Compare {
61            result: result as i32,
62            target: CompareTarget::Mod as i32,
63            key: key_range.key,
64            range_end: key_range.range_end,
65            target_union: Some(TargetUnion::ModRevision(revision as i64)),
66        });
67        self
68    }
69
70    /// Adds a value compare.
71    pub fn when_value<V>(mut self, key_range: KeyRange, cmp: TxnCmp, value: V) -> Self
72    where
73        V: Into<Vec<u8>>,
74    {
75        let result: CompareResult = cmp.into();
76        self.proto.compare.push(Compare {
77            result: result as i32,
78            target: CompareTarget::Value as i32,
79            key: key_range.key,
80            range_end: key_range.range_end,
81            target_union: Some(TargetUnion::Value(value.into())),
82        });
83        self
84    }
85
86    /// If compare success, then execute the specified operations.
87    pub fn and_then<O>(mut self, op: O) -> Self
88    where
89        O: Into<TxnOp>,
90    {
91        self.proto.success.push(op.into().into());
92        self
93    }
94
95    /// If compare fail, then execute the specified operations.
96    pub fn or_else<O>(mut self, op: O) -> Self
97    where
98        O: Into<TxnOp>,
99    {
100        self.proto.failure.push(op.into().into());
101        self
102    }
103}
104
105impl Default for TxnRequest {
106    fn default() -> Self {
107        Self::new()
108    }
109}
110
111impl From<TxnRequest> for crate::proto::etcdserverpb::TxnRequest {
112    fn from(x: TxnRequest) -> Self {
113        x.proto
114    }
115}
116
117/// Transaction Operation.
118pub enum TxnOp {
119    Range(RangeRequest),
120    Put(PutRequest),
121    Delete(DeleteRequest),
122    Txn(TxnRequest),
123}
124
125impl From<TxnOp> for etcdserverpb::RequestOp {
126    fn from(x: TxnOp) -> etcdserverpb::RequestOp {
127        use etcdserverpb::request_op::Request;
128
129        let req = match x {
130            TxnOp::Range(req) => Request::RequestRange(req.into()),
131            TxnOp::Put(req) => Request::RequestPut(req.into()),
132            TxnOp::Delete(req) => Request::RequestDeleteRange(req.into()),
133            TxnOp::Txn(req) => Request::RequestTxn(req.into()),
134        };
135
136        etcdserverpb::RequestOp { request: Some(req) }
137    }
138}
139
140impl From<RangeRequest> for TxnOp {
141    fn from(req: RangeRequest) -> Self {
142        Self::Range(req)
143    }
144}
145
146impl From<PutRequest> for TxnOp {
147    fn from(req: PutRequest) -> Self {
148        Self::Put(req)
149    }
150}
151
152impl From<DeleteRequest> for TxnOp {
153    fn from(req: DeleteRequest) -> Self {
154        Self::Delete(req)
155    }
156}
157
158impl From<TxnRequest> for TxnOp {
159    fn from(req: TxnRequest) -> Self {
160        Self::Txn(req)
161    }
162}
163
164/// Transaction Comparation.
165pub enum TxnCmp {
166    Equal,
167    NotEqual,
168    Greater,
169    Less,
170}
171
172impl From<TxnCmp> for CompareResult {
173    fn from(x: TxnCmp) -> CompareResult {
174        match x {
175            TxnCmp::Equal => CompareResult::Equal,
176            TxnCmp::NotEqual => CompareResult::NotEqual,
177            TxnCmp::Greater => CompareResult::Greater,
178            TxnCmp::Less => CompareResult::Less,
179        }
180    }
181}
182
183/// Response transaction operation.
184#[derive(Debug, Clone)]
185pub enum TxnOpResponse {
186    Range(RangeResponse),
187    Put(PutResponse),
188    Delete(DeleteResponse),
189    Txn(TxnResponse),
190}
191
192impl From<etcdserverpb::ResponseOp> for TxnOpResponse {
193    fn from(mut resp: etcdserverpb::ResponseOp) -> Self {
194        use etcdserverpb::response_op::Response;
195        match resp.response.take().unwrap() {
196            Response::ResponseRange(r) => Self::Range(From::from(r)),
197            Response::ResponsePut(r) => Self::Put(From::from(r)),
198            Response::ResponseTxn(r) => Self::Txn(From::from(r)),
199            Response::ResponseDeleteRange(r) => Self::Delete(From::from(r)),
200        }
201    }
202}
203
204#[derive(Debug, Clone)]
205pub struct TxnResponse {
206    pub header: ResponseHeader,
207    pub succeeded: bool,
208    pub responses: Vec<TxnOpResponse>,
209}
210
211impl From<etcdserverpb::TxnResponse> for TxnResponse {
212    fn from(proto: etcdserverpb::TxnResponse) -> Self {
213        Self {
214            header: From::from(proto.header.expect("must fetch header")),
215            succeeded: proto.succeeded,
216            responses: proto.responses.into_iter().map(From::from).collect(),
217        }
218    }
219}