firebird_wire/
transaction.rs1use crate::connection::Connection;
12use crate::error::Result;
13use crate::wire::consts::*;
14use crate::wire::response::read_response;
15use crate::wire::stream::op_packet;
16use crate::wire::xdr::ParameterBuffer;
17
/// Isolation level requested for a transaction.
///
/// [`TransactionBuilder::build`] translates each variant into one or two
/// `tpb::*` tags; the tag(s) used are listed on every variant.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum IsolationLevel {
    /// Default level. Encoded as `tpb::CONCURRENCY`.
    #[default]
    Snapshot,
    /// Encoded as `tpb::CONSISTENCY`.
    SnapshotTableStability,
    /// Encoded as `tpb::READ_COMMITTED` followed by `tpb::REC_VERSION`.
    ReadCommittedRecordVersion,
    /// Encoded as `tpb::READ_COMMITTED` followed by `tpb::NO_REC_VERSION`.
    ReadCommittedNoRecordVersion,
    /// Encoded as `tpb::READ_COMMITTED` followed by `tpb::READ_CONSISTENCY`.
    ReadCommittedReadConsistency,
}
33
/// Whether a transaction is opened for writing or for reading only.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum AccessMode {
    /// Default mode. Encoded by [`TransactionBuilder::build`] as `tpb::WRITE`.
    #[default]
    ReadWrite,
    /// Encoded by [`TransactionBuilder::build`] as `tpb::READ`.
    ReadOnly,
}
43
/// How a transaction reacts when it runs into a lock conflict.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum LockResolution {
    /// Default behaviour. Encoded by [`TransactionBuilder::build`] as `tpb::WAIT`.
    #[default]
    Wait,
    /// Encoded by [`TransactionBuilder::build`] as `tpb::NOWAIT`.
    NoWait,
}
53
54#[derive(Debug, Clone, Default)]
56pub struct TransactionBuilder {
57 pub isolation: IsolationLevel,
59 pub access: AccessMode,
61 pub lock_resolution: LockResolution,
63 pub lock_timeout: Option<i32>,
65 pub no_auto_undo: bool,
67 pub auto_commit: bool,
69}
70
71impl TransactionBuilder {
72 pub fn new() -> Self {
74 Self::default()
75 }
76 pub fn isolation(mut self, level: IsolationLevel) -> Self {
78 self.isolation = level;
79 self
80 }
81 pub fn read_only(mut self) -> Self {
83 self.access = AccessMode::ReadOnly;
84 self
85 }
86 pub fn read_write(mut self) -> Self {
88 self.access = AccessMode::ReadWrite;
89 self
90 }
91 pub fn no_wait(mut self) -> Self {
93 self.lock_resolution = LockResolution::NoWait;
94 self
95 }
96 pub fn lock_timeout(mut self, seconds: i32) -> Self {
98 self.lock_resolution = LockResolution::Wait;
99 self.lock_timeout = Some(seconds);
100 self
101 }
102
103 pub fn build(&self) -> Vec<u8> {
105 let mut pb = ParameterBuffer::new(TPB_VERSION3);
106
107 match self.access {
108 AccessMode::ReadWrite => pb.tag(tpb::WRITE),
109 AccessMode::ReadOnly => pb.tag(tpb::READ),
110 };
111
112 match self.isolation {
113 IsolationLevel::Snapshot => {
114 pb.tag(tpb::CONCURRENCY);
115 }
116 IsolationLevel::SnapshotTableStability => {
117 pb.tag(tpb::CONSISTENCY);
118 }
119 IsolationLevel::ReadCommittedRecordVersion => {
120 pb.tag(tpb::READ_COMMITTED);
121 pb.tag(tpb::REC_VERSION);
122 }
123 IsolationLevel::ReadCommittedNoRecordVersion => {
124 pb.tag(tpb::READ_COMMITTED);
125 pb.tag(tpb::NO_REC_VERSION);
126 }
127 IsolationLevel::ReadCommittedReadConsistency => {
128 pb.tag(tpb::READ_COMMITTED);
129 pb.tag(tpb::READ_CONSISTENCY);
130 }
131 }
132
133 match self.lock_resolution {
134 LockResolution::Wait => pb.tag(tpb::WAIT),
135 LockResolution::NoWait => pb.tag(tpb::NOWAIT),
136 };
137
138 if let Some(t) = self.lock_timeout {
139 pb.int(tpb::LOCK_TIMEOUT, t);
140 }
141 if self.no_auto_undo {
142 pb.tag(tpb::NO_AUTO_UNDO);
143 }
144 if self.auto_commit {
145 pb.tag(tpb::AUTOCOMMIT);
146 }
147
148 pb.into_vec()
149 }
150}
151
/// Handle to a transaction started on a [`Connection`].
///
/// Dropping a value whose `finished` flag is still `false` is reported
/// through `crate::warn_unclosed`.
#[derive(Debug)]
pub struct Transaction {
    /// Handle value taken from the response to the start-transaction request.
    handle: i32,
    /// Set to `true` once a commit or rollback has been acknowledged.
    finished: bool,
}
158
159impl Transaction {
160 pub(crate) fn new(handle: i32) -> Self {
161 Transaction {
162 handle,
163 finished: false,
164 }
165 }
166
167 pub fn handle(&self) -> i32 {
169 self.handle
170 }
171
172 pub fn commit(mut self, conn: &mut Connection) -> Result<()> {
174 self.finish(conn, op::COMMIT)
175 }
176
177 pub fn rollback(mut self, conn: &mut Connection) -> Result<()> {
179 self.finish(conn, op::ROLLBACK)
180 }
181
182 pub fn commit_retaining(&self, conn: &mut Connection) -> Result<()> {
184 self.retain(conn, op::COMMIT_RETAINING)
185 }
186
187 pub fn rollback_retaining(&self, conn: &mut Connection) -> Result<()> {
189 self.retain(conn, op::ROLLBACK_RETAINING)
190 }
191
192 fn finish(&mut self, conn: &mut Connection, opcode: i32) -> Result<()> {
193 let mut w = op_packet(opcode);
194 w.put_i32(self.handle);
195 conn.io().send(&w)?;
196 read_response(conn.io())?;
197 self.finished = true;
198 Ok(())
199 }
200
201 fn retain(&self, conn: &mut Connection, opcode: i32) -> Result<()> {
202 let mut w = op_packet(opcode);
203 w.put_i32(self.handle);
204 conn.io().send(&w)?;
205 read_response(conn.io())?;
206 Ok(())
207 }
208}
209
210impl Drop for Transaction {
211 fn drop(&mut self) {
212 if !self.finished {
213 crate::warn_unclosed("Transaction", self.handle);
214 }
215 }
216}
217
218impl Connection {
219 pub fn begin(&mut self) -> Result<Transaction> {
221 self.begin_with(&TransactionBuilder::default())
222 }
223
224 pub fn begin_with(&mut self, builder: &TransactionBuilder) -> Result<Transaction> {
226 let tpb = builder.build();
227 let mut w = op_packet(op::TRANSACTION);
228 w.put_i32(self.db_handle());
229 w.put_bytes(&tpb);
230 self.io().send(&w)?;
231 let resp = read_response(self.io())?;
232 Ok(Transaction::new(resp.handle))
233 }
234}
235
#[cfg(test)]
mod tests {
    use super::*;

    /// A default builder yields version, WRITE, CONCURRENCY, WAIT.
    #[test]
    fn default_tpb_is_write_concurrency_wait() {
        let bytes = TransactionBuilder::default().build();
        let expected = [TPB_VERSION3, tpb::WRITE, tpb::CONCURRENCY, tpb::WAIT];
        assert_eq!(bytes, expected);
    }

    /// Read-only READ COMMITTED READ CONSISTENCY emits both isolation tags.
    #[test]
    fn read_committed_read_consistency_tpb() {
        let builder = TransactionBuilder::new()
            .isolation(IsolationLevel::ReadCommittedReadConsistency)
            .read_only();
        let bytes = builder.build();
        let expected = [
            TPB_VERSION3,
            tpb::READ,
            tpb::READ_COMMITTED,
            tpb::READ_CONSISTENCY,
            tpb::WAIT,
        ];
        assert_eq!(bytes, expected);
    }

    /// The timeout item appears somewhere in the buffer as tag, length, value.
    #[test]
    fn lock_timeout_tpb() {
        let bytes = TransactionBuilder::new().lock_timeout(10).build();
        let item = [tpb::LOCK_TIMEOUT, 1, 10];
        let found = bytes.windows(item.len()).any(|window| window == item);
        assert!(found);
    }
}