1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
use google::spanner::v1::mutation::{Mutation, Mutation_Delete};
use google::spanner::v1::transaction::{TransactionOptions, TransactionOptions_ReadWrite};
use google::spanner::v1::spanner::{CommitRequest, BeginTransactionRequest, RollbackRequest};
use helpers::{pb_timestamp_to_datetime, make_write_pb};
use keyset::KeySet;
use session::Session;
use chrono::{DateTime, Utc};
use protobuf::well_known_types::Value;
use protobuf::RepeatedField;
/// Read-write transaction tied to a single session.
///
/// Mutations are queued on the transaction and sent together when it is
/// committed.
pub struct Transaction<'a> {
    /// Session through which every API request of this transaction is issued.
    session: &'a Session<'a>,
    /// Mutations queued so far, in the order they were added.
    mutations: Vec<Mutation>,
    /// `None` until a commit has been attempted; afterwards the commit
    /// timestamp on success or an error message otherwise.
    commited: Option<Result<DateTime<Utc>, String>>,
    /// Marked `true` once a rollback request has been issued.
    rolled_back: bool,
    /// Identifier of the transaction; `None` until the transaction is begun.
    transaction_id: Option<Vec<u8>>,
}
impl<'a> Transaction<'a> {
    /// Initializes a new, not-yet-begun `Transaction` owned by a given `session`.
    ///
    /// # Arguments
    ///
    /// * `session` - The session used to perform the commit.
    ///
    /// # Return value
    ///
    /// A `Transaction` owned by session, with no transaction id, no queued
    /// mutations and no commit result.
    pub fn new(session: &'a Session<'a>) -> Transaction<'a> {
        Transaction {
            session: session,
            mutations: Vec::new(),
            commited: None,
            rolled_back: false,
            transaction_id: None,
        }
    }

    /// Getter of transaction id.
    ///
    /// # Return value
    ///
    /// A `Option<&Vec<u8>>` of the transaction id; `None` while the
    /// transaction has not been begun.
    pub fn transaction_id(&self) -> Option<&Vec<u8>> {
        self.transaction_id.as_ref()
    }

    /// Getter of commited.
    ///
    /// # Return value
    ///
    /// A `Option<&Result<DateTime<Utc>, String>>`: `None` if no commit has been
    /// attempted yet, `Some(Ok(_))` holding the commit timestamp, or
    /// `Some(Err(_))` holding an error message.
    pub fn commited(&self) -> Option<&Result<DateTime<Utc>, String>> {
        self.commited.as_ref()
    }

    /// Guard shared by [`begin`] and `check_state`: a transaction that was
    /// committed or rolled back must not issue further API requests.
    ///
    /// # Panics
    ///
    /// If the transaction is already committed or already rolled back.
    ///
    /// [`begin`]: #method.begin
    fn check_not_finished(&self) {
        if self.commited.is_some() {
            panic!("Transaction is already committed");
        }
        if self.rolled_back {
            panic!("Transaction is already rolled back");
        }
    }

    /// Helper for method [`commit`] et al
    ///
    /// # Panics
    ///
    /// If the object's state is invalid for making API requests, i.e. the
    /// transaction is not begun, is already committed, or is already rolled
    /// back (checked in that order).
    ///
    /// [`commit`]: #method.commit
    fn check_state(&self) {
        if self.transaction_id.is_none() {
            panic!("Transaction is not begun");
        }
        self.check_not_finished();
    }

    /// Begin a transaction on the database
    ///
    /// Sends a begin-transaction request with read-write options for the
    /// owning session and stores the id found in the response.
    ///
    /// # Return value
    ///
    /// This transaction, with its transaction id set from the response.
    ///
    /// # Panics
    ///
    /// If the transaction is already begun, committed, or rolled back, or if
    /// the result of the begin-transaction API call cannot be unwrapped.
    pub fn begin(mut self) -> Transaction<'a> {
        if self.transaction_id.is_some() {
            panic!("Transaction already begun");
        }
        self.check_not_finished();

        let api = self.session.database().spanner_api();

        let mut opts = TransactionOptions::new();
        opts.set_read_write(TransactionOptions_ReadWrite::new());

        let mut req = BeginTransactionRequest::new();
        req.set_session(self.session.name());
        req.set_options(opts);

        let res = api.begin_transaction(&req).unwrap();
        self.transaction_id = Some(res.get_id().to_vec());
        self
    }

    /// Roll back a transaction on the database
    ///
    /// Consumes the transaction: it cannot be used after the rollback request
    /// has been sent.
    ///
    /// # Panics
    ///
    /// If the transaction is not begun, is already committed, or is already
    /// rolled back, or if the result of the rollback API call cannot be
    /// unwrapped.
    pub fn rollback(mut self) {
        self.check_state();
        {
            let api = self.session.database().spanner_api();
            let mut req = RollbackRequest::new();
            req.set_session(self.session.name());
            // `check_state` has already ensured the id is present.
            if let Some(ref id) = self.transaction_id {
                req.set_transaction_id(id.to_vec());
            }
            api.rollback(&req).unwrap();
        }
        // `self` is dropped when this method returns; the flag is kept up to
        // date for anything that inspects the value before then.
        self.rolled_back = true;
    }

    /// Insert one or more new table rows.
    ///
    /// The mutation is only queued here; it is sent by [`commit`].
    ///
    /// # Arguments
    ///
    /// * `table` - Name of table to be modified.
    ///
    /// * `columns` - Name of table columns to be modified.
    ///
    /// * `values` - Values to be modified.
    ///
    /// [`commit`]: #method.commit
    pub fn insert(&mut self, table: String, columns: Vec<String>, values: Vec<Vec<Value>>) {
        let mut m = Mutation::new();
        m.set_insert(make_write_pb(table, columns, values));
        self.mutations.push(m);
    }

    /// Update one or more existing table rows.
    ///
    /// The mutation is only queued here; it is sent by [`commit`].
    ///
    /// # Arguments
    ///
    /// * `table` - Name of table to be modified.
    ///
    /// * `columns` - Name of table columns to be modified.
    ///
    /// * `values` - Values to be modified.
    ///
    /// [`commit`]: #method.commit
    pub fn update(&mut self, table: String, columns: Vec<String>, values: Vec<Vec<Value>>) {
        let mut m = Mutation::new();
        m.set_update(make_write_pb(table, columns, values));
        self.mutations.push(m);
    }

    /// Insert/update one or more table rows.
    ///
    /// The mutation is only queued here; it is sent by [`commit`].
    ///
    /// # Arguments
    ///
    /// * `table` - Name of table to be modified.
    ///
    /// * `columns` - Name of table columns to be modified.
    ///
    /// * `values` - Values to be modified.
    ///
    /// [`commit`]: #method.commit
    pub fn upsert(&mut self, table: String, columns: Vec<String>, values: Vec<Vec<Value>>) {
        let mut m = Mutation::new();
        m.set_insert_or_update(make_write_pb(table, columns, values));
        self.mutations.push(m);
    }

    /// Replace one or more table rows.
    ///
    /// The mutation is only queued here; it is sent by [`commit`].
    ///
    /// # Arguments
    ///
    /// * `table` - Name of table to be modified.
    ///
    /// * `columns` - Name of table columns to be modified.
    ///
    /// * `values` - Values to be modified.
    ///
    /// [`commit`]: #method.commit
    pub fn replace(&mut self, table: String, columns: Vec<String>, values: Vec<Vec<Value>>) {
        let mut m = Mutation::new();
        m.set_replace(make_write_pb(table, columns, values));
        self.mutations.push(m);
    }

    /// Delete one or more table rows.
    ///
    /// The mutation is only queued here; it is sent by [`commit`].
    ///
    /// # Arguments
    ///
    /// * `table` - Name of table to be modified.
    ///
    /// * `keyset` - Keys/ranges identifying rows to delete.
    ///
    /// [`commit`]: #method.commit
    pub fn delete(&mut self, table: String, keyset: KeySet) {
        let mut delete = Mutation_Delete::new();
        delete.set_table(table);
        delete.set_key_set(keyset.to_pb());

        let mut m = Mutation::new();
        m.set_delete(delete);
        self.mutations.push(m);
    }

    /// Commit mutations to the database.
    ///
    /// Sends every queued mutation in a single commit request and records the
    /// outcome, which can then be read through [`commited`]: the commit
    /// timestamp when the response carries one, an error message otherwise.
    ///
    /// # Panics
    ///
    /// If the transaction is not begun, is already committed, or is already
    /// rolled back; if there are no mutation to commit; or if the result of
    /// the commit API call cannot be unwrapped.
    ///
    /// [`commited`]: #method.commited
    pub fn commit(&mut self) {
        self.check_state();
        if self.mutations.is_empty() {
            panic!("No mutation to commit");
        }

        let api = self.session.database().spanner_api();
        let mut req = CommitRequest::new();
        req.set_session(self.session.name());
        // The queued mutations stay on the transaction; the request gets a copy.
        req.set_mutations(RepeatedField::from_vec(self.mutations.clone()));
        // `check_state` has already ensured the id is present.
        if let Some(ref id) = self.transaction_id {
            req.set_transaction_id(id.to_vec());
        }

        let mut response = api.commit(&req).unwrap();
        self.commited = Some(if response.has_commit_timestamp() {
            Ok(pb_timestamp_to_datetime(response.take_commit_timestamp()))
        } else {
            Err("Couldn't commit transaction.".to_string())
        });
    }
}