1use super::mode::{PoolingMode, TransactionEvent};
6use crate::connection_pool::PooledConnection;
7use std::time::Instant;
8use uuid::Uuid;
9
/// A pooled connection leased to a single client, together with the
/// transaction bookkeeping used to decide what to do with the connection
/// after each statement.
pub struct ConnectionLease {
    /// The underlying pooled connection held by this lease.
    connection: PooledConnection,
    /// Pooling mode that governs which `LeaseAction` is reported.
    mode: PoolingMode,
    /// `true` between a detected BEGIN and the matching COMMIT/ROLLBACK
    /// (or as set explicitly through `update_transaction_state`).
    in_transaction: bool,
    /// Moment the lease was created; basis for `lease_duration`.
    leased_at: Instant,
    /// Number of statements reported through `on_statement_complete`.
    statements_executed: u64,
    /// Identifier of the client this lease was created for.
    client_id: ClientId,
    /// Nesting counter: 0 outside a transaction, 1 after BEGIN, plus one for
    /// every SAVEPOINT observed while inside the transaction.
    transaction_depth: u32,
}
27
/// Identifier for a client, implemented as a newtype around [`Uuid`].
///
/// The wrapped value is public, so an existing UUID can be wrapped directly
/// with `ClientId(uuid)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ClientId(pub Uuid);
31
32impl ClientId {
33 pub fn new() -> Self {
35 Self(Uuid::new_v4())
36 }
37}
38
39impl Default for ClientId {
40 fn default() -> Self {
41 Self::new()
42 }
43}
44
/// What the owner of a `ConnectionLease` should do with the connection after
/// a statement or transaction has finished.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseAction {
    /// Keep the connection attached to the current client.
    Hold,
    /// NOTE(review): never produced in this file; presumably "hand the
    /// connection back to the pool as-is" — confirm against the consumer.
    Release,
    /// Reported when a lease in transaction or statement mode is no longer
    /// inside a transaction. Presumably the connection is reset before being
    /// reused — confirm against the consumer.
    Reset,
    /// NOTE(review): never produced in this file; presumably "discard the
    /// connection" — confirm against the consumer.
    Close,
}
57
58impl ConnectionLease {
59 pub fn new(connection: PooledConnection, mode: PoolingMode, client_id: ClientId) -> Self {
61 Self {
62 connection,
63 mode,
64 in_transaction: false,
65 leased_at: Instant::now(),
66 statements_executed: 0,
67 client_id,
68 transaction_depth: 0,
69 }
70 }
71
72 pub fn connection(&self) -> &PooledConnection {
74 &self.connection
75 }
76
77 pub fn connection_mut(&mut self) -> &mut PooledConnection {
79 &mut self.connection
80 }
81
82 pub fn into_connection(self) -> PooledConnection {
84 self.connection
85 }
86
87 pub fn mode(&self) -> PoolingMode {
89 self.mode
90 }
91
92 pub fn client_id(&self) -> ClientId {
94 self.client_id
95 }
96
97 pub fn in_transaction(&self) -> bool {
99 self.in_transaction
100 }
101
102 pub fn statements_executed(&self) -> u64 {
104 self.statements_executed
105 }
106
107 pub fn lease_duration(&self) -> std::time::Duration {
109 self.leased_at.elapsed()
110 }
111
112 pub fn on_statement_complete(&mut self, sql: &str) -> LeaseAction {
120 self.statements_executed += 1;
121
122 let event = TransactionEvent::detect(sql);
124
125 match event {
127 TransactionEvent::Begin => {
128 self.in_transaction = true;
129 self.transaction_depth = 1;
130 }
131 TransactionEvent::Savepoint => {
132 if self.in_transaction {
133 self.transaction_depth += 1;
134 }
135 }
136 TransactionEvent::ReleaseSavepoint | TransactionEvent::RollbackToSavepoint => {
137 if self.transaction_depth > 1 {
138 self.transaction_depth -= 1;
139 }
140 }
141 TransactionEvent::Commit | TransactionEvent::Rollback => {
142 self.in_transaction = false;
143 self.transaction_depth = 0;
144 }
145 TransactionEvent::Statement => {
146 }
148 }
149
150 self.determine_action(event)
152 }
153
154 pub fn on_transaction_end(&mut self) -> LeaseAction {
158 self.in_transaction = false;
159 self.transaction_depth = 0;
160
161 match self.mode {
162 PoolingMode::Session => LeaseAction::Hold,
163 PoolingMode::Transaction | PoolingMode::Statement => LeaseAction::Reset,
164 }
165 }
166
167 pub fn update_transaction_state(&mut self, in_transaction: bool) {
172 if !in_transaction && self.in_transaction {
173 self.in_transaction = false;
175 self.transaction_depth = 0;
176 } else if in_transaction && !self.in_transaction {
177 self.in_transaction = true;
179 self.transaction_depth = 1;
180 }
181 }
182
183 pub fn should_release(&self) -> bool {
185 match self.mode {
186 PoolingMode::Session => false,
187 PoolingMode::Transaction => !self.in_transaction,
188 PoolingMode::Statement => !self.in_transaction,
189 }
190 }
191
192 fn determine_action(&self, event: TransactionEvent) -> LeaseAction {
194 match self.mode {
195 PoolingMode::Session => {
196 LeaseAction::Hold
198 }
199 PoolingMode::Transaction => {
200 if event.is_transaction_end() && self.transaction_depth == 0 {
202 LeaseAction::Reset
203 } else {
204 LeaseAction::Hold
205 }
206 }
207 PoolingMode::Statement => {
208 if self.in_transaction {
210 LeaseAction::Hold
211 } else {
212 LeaseAction::Reset
213 }
214 }
215 }
216 }
217}
218
219impl std::fmt::Debug for ConnectionLease {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 f.debug_struct("ConnectionLease")
222 .field("connection_id", &self.connection.id)
223 .field("mode", &self.mode)
224 .field("in_transaction", &self.in_transaction)
225 .field("statements_executed", &self.statements_executed)
226 .field("client_id", &self.client_id)
227 .field("transaction_depth", &self.transaction_depth)
228 .finish()
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connection_pool::ConnectionState;
    use crate::NodeId;

    /// Builds an in-use connection with no permit and no client attached.
    fn create_test_connection() -> PooledConnection {
        PooledConnection {
            id: Uuid::new_v4(),
            node_id: NodeId::new(),
            created_at: chrono::Utc::now(),
            last_used: chrono::Utc::now(),
            state: ConnectionState::InUse,
            use_count: 1,
            permit: None,
            client: None,
        }
    }

    /// Leases a fresh test connection to a fresh client in the given mode.
    fn lease_in(mode: PoolingMode) -> ConnectionLease {
        ConnectionLease::new(create_test_connection(), mode, ClientId::new())
    }

    /// Session mode holds the connection regardless of what is executed.
    #[test]
    fn test_session_mode_never_releases() {
        let mut lease = lease_in(PoolingMode::Session);

        for sql in ["SELECT 1", "BEGIN", "SELECT * FROM users", "COMMIT"] {
            assert_eq!(
                lease.on_statement_complete(sql),
                LeaseAction::Hold,
                "after {:?}",
                sql
            );
        }
    }

    /// Transaction mode holds through the transaction and resets on COMMIT.
    #[test]
    fn test_transaction_mode_releases_on_commit() {
        let mut lease = lease_in(PoolingMode::Transaction);

        let opened = lease.on_statement_complete("BEGIN");
        assert_eq!(opened, LeaseAction::Hold);
        assert!(lease.in_transaction());

        let during = lease.on_statement_complete("INSERT INTO users VALUES (1)");
        assert_eq!(during, LeaseAction::Hold);

        let closed = lease.on_statement_complete("COMMIT");
        assert_eq!(closed, LeaseAction::Reset);
        assert!(!lease.in_transaction());
    }

    /// Transaction mode resets on ROLLBACK just as it does on COMMIT.
    #[test]
    fn test_transaction_mode_releases_on_rollback() {
        let mut lease = lease_in(PoolingMode::Transaction);

        for sql in ["BEGIN", "INSERT INTO users VALUES (1)"] {
            lease.on_statement_complete(sql);
        }

        let closed = lease.on_statement_complete("ROLLBACK");
        assert_eq!(closed, LeaseAction::Reset);
        assert!(!lease.in_transaction());
    }

    /// Statement mode resets after a lone statement but holds while an
    /// explicit transaction is open.
    #[test]
    fn test_statement_mode_releases_per_statement() {
        let mut lone = lease_in(PoolingMode::Statement);
        assert_eq!(lone.on_statement_complete("SELECT 1"), LeaseAction::Reset);

        let mut transactional = lease_in(PoolingMode::Statement);
        let script = [
            ("BEGIN", LeaseAction::Hold),
            ("SELECT * FROM users", LeaseAction::Hold),
            ("COMMIT", LeaseAction::Reset),
        ];
        for (sql, expected) in script {
            assert_eq!(
                transactional.on_statement_complete(sql),
                expected,
                "after {:?}",
                sql
            );
        }
    }

    /// Depth follows BEGIN / SAVEPOINT / RELEASE and drops to zero on COMMIT.
    #[test]
    fn test_savepoint_depth() {
        let mut lease = lease_in(PoolingMode::Transaction);

        let script = [
            ("BEGIN", 1),
            ("SAVEPOINT sp1", 2),
            ("SAVEPOINT sp2", 3),
            ("RELEASE SAVEPOINT sp2", 2),
            ("COMMIT", 0),
        ];
        for (sql, depth) in script {
            lease.on_statement_complete(sql);
            assert_eq!(lease.transaction_depth, depth, "after {:?}", sql);
        }

        assert!(!lease.in_transaction());
    }

    /// A fresh session-mode lease is never eligible for release.
    #[test]
    fn test_should_release_session_mode() {
        let lease = lease_in(PoolingMode::Session);
        assert!(!lease.should_release());
    }

    /// A fresh transaction-mode lease (no open transaction) is releasable.
    #[test]
    fn test_should_release_transaction_mode() {
        let lease = lease_in(PoolingMode::Transaction);
        assert!(lease.should_release());
    }

    /// A fresh statement-mode lease (no open transaction) is releasable.
    #[test]
    fn test_should_release_statement_mode() {
        let lease = lease_in(PoolingMode::Statement);
        assert!(lease.should_release());
    }

    /// The statement counter starts at zero and grows by one per statement.
    #[test]
    fn test_statements_executed_counter() {
        let mut lease = lease_in(PoolingMode::Session);
        assert_eq!(lease.statements_executed(), 0);

        for (sql, expected) in [("SELECT 1", 1), ("SELECT 2", 2)] {
            lease.on_statement_complete(sql);
            assert_eq!(lease.statements_executed(), expected);
        }
    }
}