Skip to main content

heliosdb_proxy/pool/
lease.rs

1//! Connection Lease Management
2//!
3//! Tracks connection state and determines when to release connections based on pooling mode.
4
5use super::mode::{PoolingMode, TransactionEvent};
6use crate::connection_pool::PooledConnection;
7use std::time::Instant;
8use uuid::Uuid;
9
10/// A leased connection with mode-aware lifecycle management
11pub struct ConnectionLease {
12    /// The underlying pooled connection
13    connection: PooledConnection,
14    /// Pooling mode for this lease
15    mode: PoolingMode,
16    /// Whether currently in a transaction
17    in_transaction: bool,
18    /// When the lease was acquired
19    leased_at: Instant,
20    /// Number of statements executed on this lease
21    statements_executed: u64,
22    /// Client identifier
23    client_id: ClientId,
24    /// Current transaction nesting level (for savepoints)
25    transaction_depth: u32,
26}
27
28/// Client identifier for tracking leases
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
30pub struct ClientId(pub Uuid);
31
32impl ClientId {
33    /// Create a new random client ID
34    pub fn new() -> Self {
35        Self(Uuid::new_v4())
36    }
37}
38
39impl Default for ClientId {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
/// What the caller should do with a leased connection once a statement (or
/// transaction) has finished.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LeaseAction {
    /// Leave the connection with its current client.
    Hold,
    /// Hand the connection back to the pool as-is; no reset is required.
    Release,
    /// Reset the connection's state first, then hand it back to the pool.
    Reset,
    /// The connection is no longer usable and must be closed.
    Close,
}
57
58impl ConnectionLease {
59    /// Create a new connection lease
60    pub fn new(connection: PooledConnection, mode: PoolingMode, client_id: ClientId) -> Self {
61        Self {
62            connection,
63            mode,
64            in_transaction: false,
65            leased_at: Instant::now(),
66            statements_executed: 0,
67            client_id,
68            transaction_depth: 0,
69        }
70    }
71
72    /// Get the underlying connection (immutable)
73    pub fn connection(&self) -> &PooledConnection {
74        &self.connection
75    }
76
77    /// Get the underlying connection (mutable)
78    pub fn connection_mut(&mut self) -> &mut PooledConnection {
79        &mut self.connection
80    }
81
82    /// Take ownership of the underlying connection
83    pub fn into_connection(self) -> PooledConnection {
84        self.connection
85    }
86
87    /// Get the pooling mode
88    pub fn mode(&self) -> PoolingMode {
89        self.mode
90    }
91
92    /// Get the client ID
93    pub fn client_id(&self) -> ClientId {
94        self.client_id
95    }
96
97    /// Check if currently in a transaction
98    pub fn in_transaction(&self) -> bool {
99        self.in_transaction
100    }
101
102    /// Get the number of statements executed
103    pub fn statements_executed(&self) -> u64 {
104        self.statements_executed
105    }
106
107    /// Get how long this lease has been held
108    pub fn lease_duration(&self) -> std::time::Duration {
109        self.leased_at.elapsed()
110    }
111
112    /// Process a statement and determine if connection should be released
113    ///
114    /// # Arguments
115    /// * `sql` - The SQL statement that was executed
116    ///
117    /// # Returns
118    /// The action to take with this connection
119    pub fn on_statement_complete(&mut self, sql: &str) -> LeaseAction {
120        self.statements_executed += 1;
121
122        // Detect transaction boundaries
123        let event = TransactionEvent::detect(sql);
124
125        // Update transaction state
126        match event {
127            TransactionEvent::Begin => {
128                self.in_transaction = true;
129                self.transaction_depth = 1;
130            }
131            TransactionEvent::Savepoint => {
132                if self.in_transaction {
133                    self.transaction_depth += 1;
134                }
135            }
136            TransactionEvent::ReleaseSavepoint | TransactionEvent::RollbackToSavepoint => {
137                if self.transaction_depth > 1 {
138                    self.transaction_depth -= 1;
139                }
140            }
141            TransactionEvent::Commit | TransactionEvent::Rollback => {
142                self.in_transaction = false;
143                self.transaction_depth = 0;
144            }
145            TransactionEvent::Statement => {
146                // No transaction state change
147            }
148        }
149
150        // Determine action based on mode
151        self.determine_action(event)
152    }
153
154    /// Called when transaction ends (from backend ReadyForQuery status)
155    ///
156    /// This is a more reliable way to detect transaction end than parsing SQL.
157    pub fn on_transaction_end(&mut self) -> LeaseAction {
158        self.in_transaction = false;
159        self.transaction_depth = 0;
160
161        match self.mode {
162            PoolingMode::Session => LeaseAction::Hold,
163            PoolingMode::Transaction | PoolingMode::Statement => LeaseAction::Reset,
164        }
165    }
166
167    /// Update transaction state from backend ReadyForQuery status
168    ///
169    /// # Arguments
170    /// * `in_transaction` - Whether backend reports being in a transaction
171    pub fn update_transaction_state(&mut self, in_transaction: bool) {
172        if !in_transaction && self.in_transaction {
173            // Transaction ended
174            self.in_transaction = false;
175            self.transaction_depth = 0;
176        } else if in_transaction && !self.in_transaction {
177            // Transaction started (implicit)
178            self.in_transaction = true;
179            self.transaction_depth = 1;
180        }
181    }
182
183    /// Check if connection should be released based on current state
184    pub fn should_release(&self) -> bool {
185        match self.mode {
186            PoolingMode::Session => false,
187            PoolingMode::Transaction => !self.in_transaction,
188            PoolingMode::Statement => !self.in_transaction,
189        }
190    }
191
192    /// Determine the lease action based on mode and transaction event
193    fn determine_action(&self, event: TransactionEvent) -> LeaseAction {
194        match self.mode {
195            PoolingMode::Session => {
196                // Session mode never releases until client disconnects
197                LeaseAction::Hold
198            }
199            PoolingMode::Transaction => {
200                // Transaction mode releases after transaction ends
201                if event.is_transaction_end() && self.transaction_depth == 0 {
202                    LeaseAction::Reset
203                } else {
204                    LeaseAction::Hold
205                }
206            }
207            PoolingMode::Statement => {
208                // Statement mode releases after every statement outside transaction
209                if self.in_transaction {
210                    LeaseAction::Hold
211                } else {
212                    LeaseAction::Reset
213                }
214            }
215        }
216    }
217}
218
219impl std::fmt::Debug for ConnectionLease {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        f.debug_struct("ConnectionLease")
222            .field("connection_id", &self.connection.id)
223            .field("mode", &self.mode)
224            .field("in_transaction", &self.in_transaction)
225            .field("statements_executed", &self.statements_executed)
226            .field("client_id", &self.client_id)
227            .field("transaction_depth", &self.transaction_depth)
228            .finish()
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connection_pool::ConnectionState;
    use crate::NodeId;

    /// Builds an in-use connection with no permit and no client attached,
    /// which is all the lease logic under test needs.
    fn create_test_connection() -> PooledConnection {
        PooledConnection {
            id: Uuid::new_v4(),
            node_id: NodeId::new(),
            created_at: chrono::Utc::now(),
            last_used: chrono::Utc::now(),
            state: ConnectionState::InUse,
            use_count: 1,
            permit: None,
            client: None,
        }
    }

    #[test]
    fn test_session_mode_never_releases() {
        let conn = create_test_connection();
        let mut lease = ConnectionLease::new(conn, PoolingMode::Session, ClientId::new());

        // Statement outside transaction
        assert_eq!(
            lease.on_statement_complete("SELECT 1"),
            LeaseAction::Hold
        );

        // Transaction
        assert_eq!(lease.on_statement_complete("BEGIN"), LeaseAction::Hold);
        assert_eq!(
            lease.on_statement_complete("SELECT * FROM users"),
            LeaseAction::Hold
        );
        assert_eq!(lease.on_statement_complete("COMMIT"), LeaseAction::Hold);
    }

    #[test]
    fn test_transaction_mode_releases_on_commit() {
        let conn = create_test_connection();
        let mut lease = ConnectionLease::new(conn, PoolingMode::Transaction, ClientId::new());

        // Transaction
        assert_eq!(lease.on_statement_complete("BEGIN"), LeaseAction::Hold);
        assert!(lease.in_transaction());

        assert_eq!(
            lease.on_statement_complete("INSERT INTO users VALUES (1)"),
            LeaseAction::Hold
        );

        // COMMIT should release
        assert_eq!(lease.on_statement_complete("COMMIT"), LeaseAction::Reset);
        assert!(!lease.in_transaction());
    }

    #[test]
    fn test_transaction_mode_releases_on_rollback() {
        let conn = create_test_connection();
        let mut lease = ConnectionLease::new(conn, PoolingMode::Transaction, ClientId::new());

        lease.on_statement_complete("BEGIN");
        lease.on_statement_complete("INSERT INTO users VALUES (1)");

        // ROLLBACK should release
        assert_eq!(lease.on_statement_complete("ROLLBACK"), LeaseAction::Reset);
        assert!(!lease.in_transaction());
    }

    #[test]
    fn test_statement_mode_releases_per_statement() {
        let conn = create_test_connection();
        let mut lease = ConnectionLease::new(conn, PoolingMode::Statement, ClientId::new());

        // Statement outside transaction should release
        assert_eq!(lease.on_statement_complete("SELECT 1"), LeaseAction::Reset);

        // But inside transaction, hold
        let conn2 = create_test_connection();
        let mut lease2 = ConnectionLease::new(conn2, PoolingMode::Statement, ClientId::new());
        assert_eq!(lease2.on_statement_complete("BEGIN"), LeaseAction::Hold);
        assert_eq!(
            lease2.on_statement_complete("SELECT * FROM users"),
            LeaseAction::Hold
        );
        assert_eq!(lease2.on_statement_complete("COMMIT"), LeaseAction::Reset);
    }

    #[test]
    fn test_savepoint_depth() {
        let conn = create_test_connection();
        let mut lease = ConnectionLease::new(conn, PoolingMode::Transaction, ClientId::new());

        lease.on_statement_complete("BEGIN");
        assert_eq!(lease.transaction_depth, 1);

        lease.on_statement_complete("SAVEPOINT sp1");
        assert_eq!(lease.transaction_depth, 2);

        lease.on_statement_complete("SAVEPOINT sp2");
        assert_eq!(lease.transaction_depth, 3);

        lease.on_statement_complete("RELEASE SAVEPOINT sp2");
        assert_eq!(lease.transaction_depth, 2);

        lease.on_statement_complete("COMMIT");
        assert_eq!(lease.transaction_depth, 0);
        assert!(!lease.in_transaction());
    }

    /// A savepoint statement seen while no transaction is open must not
    /// start tracking a transaction or change the depth.
    #[test]
    fn test_savepoint_outside_transaction_is_ignored() {
        let conn = create_test_connection();
        let mut lease = ConnectionLease::new(conn, PoolingMode::Transaction, ClientId::new());

        lease.on_statement_complete("SAVEPOINT sp1");
        assert_eq!(lease.transaction_depth, 0);
        assert!(!lease.in_transaction());
    }

    /// Releasing a savepoint never takes the depth below 1 while the
    /// enclosing transaction is still open.
    #[test]
    fn test_release_savepoint_keeps_transaction_open() {
        let conn = create_test_connection();
        let mut lease = ConnectionLease::new(conn, PoolingMode::Transaction, ClientId::new());

        lease.on_statement_complete("BEGIN");
        lease.on_statement_complete("RELEASE SAVEPOINT sp1");
        assert_eq!(lease.transaction_depth, 1);
        assert!(lease.in_transaction());
    }

    #[test]
    fn test_should_release_session_mode() {
        let conn = create_test_connection();
        // Session mode never releases
        let lease = ConnectionLease::new(conn, PoolingMode::Session, ClientId::new());
        assert!(!lease.should_release());
    }

    #[test]
    fn test_should_release_transaction_mode() {
        let conn = create_test_connection();
        // Transaction mode releases when not in transaction
        let lease = ConnectionLease::new(conn, PoolingMode::Transaction, ClientId::new());
        assert!(lease.should_release());
    }

    #[test]
    fn test_should_release_statement_mode() {
        let conn = create_test_connection();
        // Statement mode releases when not in transaction
        let lease = ConnectionLease::new(conn, PoolingMode::Statement, ClientId::new());
        assert!(lease.should_release());
    }

    /// While a transaction is open, neither transaction nor statement mode
    /// may report the connection as releasable.
    #[test]
    fn test_should_not_release_inside_transaction() {
        let mut tx_lease = ConnectionLease::new(
            create_test_connection(),
            PoolingMode::Transaction,
            ClientId::new(),
        );
        tx_lease.on_statement_complete("BEGIN");
        assert!(!tx_lease.should_release());

        let mut stmt_lease = ConnectionLease::new(
            create_test_connection(),
            PoolingMode::Statement,
            ClientId::new(),
        );
        stmt_lease.on_statement_complete("BEGIN");
        assert!(!stmt_lease.should_release());
    }

    /// `on_transaction_end` clears the transaction state in every mode and
    /// only session mode keeps holding the connection afterwards.
    #[test]
    fn test_on_transaction_end_action_per_mode() {
        let mut session = ConnectionLease::new(
            create_test_connection(),
            PoolingMode::Session,
            ClientId::new(),
        );
        session.on_statement_complete("BEGIN");
        assert_eq!(session.on_transaction_end(), LeaseAction::Hold);
        assert!(!session.in_transaction());
        assert_eq!(session.transaction_depth, 0);

        let mut transaction = ConnectionLease::new(
            create_test_connection(),
            PoolingMode::Transaction,
            ClientId::new(),
        );
        transaction.on_statement_complete("BEGIN");
        assert_eq!(transaction.on_transaction_end(), LeaseAction::Reset);
        assert!(!transaction.in_transaction());
        assert_eq!(transaction.transaction_depth, 0);

        let mut statement = ConnectionLease::new(
            create_test_connection(),
            PoolingMode::Statement,
            ClientId::new(),
        );
        statement.on_statement_complete("BEGIN");
        assert_eq!(statement.on_transaction_end(), LeaseAction::Reset);
        assert!(!statement.in_transaction());
        assert_eq!(statement.transaction_depth, 0);
    }

    /// The backend-reported status starts and ends the tracked transaction.
    #[test]
    fn test_update_transaction_state_follows_backend() {
        let conn = create_test_connection();
        let mut lease = ConnectionLease::new(conn, PoolingMode::Transaction, ClientId::new());

        // Backend reports a transaction the SQL tracking has not seen.
        lease.update_transaction_state(true);
        assert!(lease.in_transaction());
        assert_eq!(lease.transaction_depth, 1);
        assert!(!lease.should_release());

        // Backend reports the transaction is over.
        lease.update_transaction_state(false);
        assert!(!lease.in_transaction());
        assert_eq!(lease.transaction_depth, 0);
        assert!(lease.should_release());
    }

    #[test]
    fn test_statements_executed_counter() {
        let conn = create_test_connection();
        let mut lease = ConnectionLease::new(conn, PoolingMode::Session, ClientId::new());

        assert_eq!(lease.statements_executed(), 0);

        lease.on_statement_complete("SELECT 1");
        assert_eq!(lease.statements_executed(), 1);

        lease.on_statement_complete("SELECT 2");
        assert_eq!(lease.statements_executed(), 2);
    }
}