1use super::mode::{PoolingMode, TransactionEvent};
6use crate::connection_pool::PooledConnection;
7use std::time::Instant;
8use uuid::Uuid;
9
10pub struct ConnectionLease {
12 connection: PooledConnection,
14 mode: PoolingMode,
16 in_transaction: bool,
18 leased_at: Instant,
20 statements_executed: u64,
22 client_id: ClientId,
24 transaction_depth: u32,
26}
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
30pub struct ClientId(pub Uuid);
31
32impl ClientId {
33 pub fn new() -> Self {
35 Self(Uuid::new_v4())
36 }
37}
38
39impl Default for ClientId {
40 fn default() -> Self {
41 Self::new()
42 }
43}
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47pub enum LeaseAction {
48 Hold,
50 Release,
52 Reset,
54 Close,
56}
57
58impl ConnectionLease {
59 pub fn new(connection: PooledConnection, mode: PoolingMode, client_id: ClientId) -> Self {
61 Self {
62 connection,
63 mode,
64 in_transaction: false,
65 leased_at: Instant::now(),
66 statements_executed: 0,
67 client_id,
68 transaction_depth: 0,
69 }
70 }
71
72 pub fn connection(&self) -> &PooledConnection {
74 &self.connection
75 }
76
77 pub fn connection_mut(&mut self) -> &mut PooledConnection {
79 &mut self.connection
80 }
81
82 pub fn into_connection(self) -> PooledConnection {
84 self.connection
85 }
86
87 pub fn mode(&self) -> PoolingMode {
89 self.mode
90 }
91
92 pub fn client_id(&self) -> ClientId {
94 self.client_id
95 }
96
97 pub fn in_transaction(&self) -> bool {
99 self.in_transaction
100 }
101
102 pub fn statements_executed(&self) -> u64 {
104 self.statements_executed
105 }
106
107 pub fn lease_duration(&self) -> std::time::Duration {
109 self.leased_at.elapsed()
110 }
111
112 pub fn on_statement_complete(&mut self, sql: &str) -> LeaseAction {
120 self.statements_executed += 1;
121
122 let event = TransactionEvent::detect(sql);
124
125 match event {
127 TransactionEvent::Begin => {
128 self.in_transaction = true;
129 self.transaction_depth = 1;
130 }
131 TransactionEvent::Savepoint => {
132 if self.in_transaction {
133 self.transaction_depth += 1;
134 }
135 }
136 TransactionEvent::ReleaseSavepoint | TransactionEvent::RollbackToSavepoint => {
137 if self.transaction_depth > 1 {
138 self.transaction_depth -= 1;
139 }
140 }
141 TransactionEvent::Commit | TransactionEvent::Rollback => {
142 self.in_transaction = false;
143 self.transaction_depth = 0;
144 }
145 TransactionEvent::Statement => {
146 }
148 }
149
150 self.determine_action(event)
152 }
153
154 pub fn on_transaction_end(&mut self) -> LeaseAction {
158 self.in_transaction = false;
159 self.transaction_depth = 0;
160
161 match self.mode {
162 PoolingMode::Session => LeaseAction::Hold,
163 PoolingMode::Transaction | PoolingMode::Statement => LeaseAction::Reset,
164 }
165 }
166
167 pub fn update_transaction_state(&mut self, in_transaction: bool) {
172 if !in_transaction && self.in_transaction {
173 self.in_transaction = false;
175 self.transaction_depth = 0;
176 } else if in_transaction && !self.in_transaction {
177 self.in_transaction = true;
179 self.transaction_depth = 1;
180 }
181 }
182
183 pub fn should_release(&self) -> bool {
185 match self.mode {
186 PoolingMode::Session => false,
187 PoolingMode::Transaction => !self.in_transaction,
188 PoolingMode::Statement => !self.in_transaction,
189 }
190 }
191
192 fn determine_action(&self, event: TransactionEvent) -> LeaseAction {
194 match self.mode {
195 PoolingMode::Session => {
196 LeaseAction::Hold
198 }
199 PoolingMode::Transaction => {
200 if event.is_transaction_end() && self.transaction_depth == 0 {
202 LeaseAction::Reset
203 } else {
204 LeaseAction::Hold
205 }
206 }
207 PoolingMode::Statement => {
208 if self.in_transaction {
210 LeaseAction::Hold
211 } else {
212 LeaseAction::Reset
213 }
214 }
215 }
216 }
217}
218
219impl std::fmt::Debug for ConnectionLease {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 f.debug_struct("ConnectionLease")
222 .field("connection_id", &self.connection.id)
223 .field("mode", &self.mode)
224 .field("in_transaction", &self.in_transaction)
225 .field("statements_executed", &self.statements_executed)
226 .field("client_id", &self.client_id)
227 .field("transaction_depth", &self.transaction_depth)
228 .finish()
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235 use crate::connection_pool::ConnectionState;
236 use crate::NodeId;
237
238 fn create_test_connection() -> PooledConnection {
239 PooledConnection {
240 id: Uuid::new_v4(),
241 node_id: NodeId::new(),
242 created_at: chrono::Utc::now(),
243 last_used: chrono::Utc::now(),
244 state: ConnectionState::InUse,
245 use_count: 1,
246 permit: None,
247 client: None,
248 }
249 }
250
251 #[test]
252 fn test_session_mode_never_releases() {
253 let conn = create_test_connection();
254 let mut lease = ConnectionLease::new(conn, PoolingMode::Session, ClientId::new());
255
256 assert_eq!(
258 lease.on_statement_complete("SELECT 1"),
259 LeaseAction::Hold
260 );
261
262 assert_eq!(lease.on_statement_complete("BEGIN"), LeaseAction::Hold);
264 assert_eq!(
265 lease.on_statement_complete("SELECT * FROM users"),
266 LeaseAction::Hold
267 );
268 assert_eq!(lease.on_statement_complete("COMMIT"), LeaseAction::Hold);
269 }
270
271 #[test]
272 fn test_transaction_mode_releases_on_commit() {
273 let conn = create_test_connection();
274 let mut lease = ConnectionLease::new(conn, PoolingMode::Transaction, ClientId::new());
275
276 assert_eq!(lease.on_statement_complete("BEGIN"), LeaseAction::Hold);
278 assert!(lease.in_transaction());
279
280 assert_eq!(
281 lease.on_statement_complete("INSERT INTO users VALUES (1)"),
282 LeaseAction::Hold
283 );
284
285 assert_eq!(lease.on_statement_complete("COMMIT"), LeaseAction::Reset);
287 assert!(!lease.in_transaction());
288 }
289
290 #[test]
291 fn test_transaction_mode_releases_on_rollback() {
292 let conn = create_test_connection();
293 let mut lease = ConnectionLease::new(conn, PoolingMode::Transaction, ClientId::new());
294
295 lease.on_statement_complete("BEGIN");
296 lease.on_statement_complete("INSERT INTO users VALUES (1)");
297
298 assert_eq!(lease.on_statement_complete("ROLLBACK"), LeaseAction::Reset);
300 assert!(!lease.in_transaction());
301 }
302
303 #[test]
304 fn test_statement_mode_releases_per_statement() {
305 let conn = create_test_connection();
306 let mut lease = ConnectionLease::new(conn, PoolingMode::Statement, ClientId::new());
307
308 assert_eq!(lease.on_statement_complete("SELECT 1"), LeaseAction::Reset);
310
311 let conn2 = create_test_connection();
313 let mut lease2 = ConnectionLease::new(conn2, PoolingMode::Statement, ClientId::new());
314 assert_eq!(lease2.on_statement_complete("BEGIN"), LeaseAction::Hold);
315 assert_eq!(
316 lease2.on_statement_complete("SELECT * FROM users"),
317 LeaseAction::Hold
318 );
319 assert_eq!(lease2.on_statement_complete("COMMIT"), LeaseAction::Reset);
320 }
321
322 #[test]
323 fn test_savepoint_depth() {
324 let conn = create_test_connection();
325 let mut lease = ConnectionLease::new(conn, PoolingMode::Transaction, ClientId::new());
326
327 lease.on_statement_complete("BEGIN");
328 assert_eq!(lease.transaction_depth, 1);
329
330 lease.on_statement_complete("SAVEPOINT sp1");
331 assert_eq!(lease.transaction_depth, 2);
332
333 lease.on_statement_complete("SAVEPOINT sp2");
334 assert_eq!(lease.transaction_depth, 3);
335
336 lease.on_statement_complete("RELEASE SAVEPOINT sp2");
337 assert_eq!(lease.transaction_depth, 2);
338
339 lease.on_statement_complete("COMMIT");
340 assert_eq!(lease.transaction_depth, 0);
341 assert!(!lease.in_transaction());
342 }
343
344 #[test]
345 fn test_should_release_session_mode() {
346 let conn = create_test_connection();
347 let lease = ConnectionLease::new(conn, PoolingMode::Session, ClientId::new());
349 assert!(!lease.should_release());
350 }
351
352 #[test]
353 fn test_should_release_transaction_mode() {
354 let conn = create_test_connection();
355 let lease = ConnectionLease::new(conn, PoolingMode::Transaction, ClientId::new());
357 assert!(lease.should_release());
358 }
359
360 #[test]
361 fn test_should_release_statement_mode() {
362 let conn = create_test_connection();
363 let lease = ConnectionLease::new(conn, PoolingMode::Statement, ClientId::new());
365 assert!(lease.should_release());
366 }
367
368 #[test]
369 fn test_statements_executed_counter() {
370 let conn = create_test_connection();
371 let mut lease = ConnectionLease::new(conn, PoolingMode::Session, ClientId::new());
372
373 assert_eq!(lease.statements_executed(), 0);
374
375 lease.on_statement_complete("SELECT 1");
376 assert_eq!(lease.statements_executed(), 1);
377
378 lease.on_statement_complete("SELECT 2");
379 assert_eq!(lease.statements_executed(), 2);
380 }
381}