Skip to main content

heliosdb_proxy/pool/
transaction.rs

1//! Transaction Mode Handler
2//!
3//! Implements transaction pooling mode where connections are returned
4//! to the pool after each transaction completes.
5
6use super::lease::{ClientId, ConnectionLease, LeaseAction};
7use super::mode::{PoolingMode, TransactionEvent};
8use super::prepared::PreparedStatementTracker;
9use crate::connection_pool::PooledConnection;
10
11/// Transaction mode handler
12///
13/// In transaction mode, connections are held for the duration of a transaction
14/// and returned to the pool after COMMIT or ROLLBACK.
15///
16/// Benefits:
17/// - Good balance between connection sharing and compatibility
18/// - Works with most PostgreSQL features within a transaction
19/// - Supports prepared statements (with tracking/recreation)
20///
21/// Limitations:
22/// - LISTEN/NOTIFY listeners are lost between transactions
23/// - Session-level settings may need to be re-applied
24/// - Temp tables persist but may not be on same connection
25pub struct TransactionModeHandler {
26    /// Whether to track and recreate prepared statements
27    track_prepared_statements: bool,
28    /// Prepared statement tracker
29    prepared_tracker: Option<PreparedStatementTracker>,
30}
31
32impl Default for TransactionModeHandler {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37
38impl TransactionModeHandler {
39    /// Create a new transaction mode handler
40    pub fn new() -> Self {
41        Self {
42            track_prepared_statements: false,
43            prepared_tracker: None,
44        }
45    }
46
47    /// Create with prepared statement tracking enabled
48    pub fn with_prepared_tracking() -> Self {
49        Self {
50            track_prepared_statements: true,
51            prepared_tracker: Some(PreparedStatementTracker::new()),
52        }
53    }
54
55    /// Create a lease for this mode
56    pub fn create_lease(&self, connection: PooledConnection, client_id: ClientId) -> ConnectionLease {
57        ConnectionLease::new(connection, PoolingMode::Transaction, client_id)
58    }
59
60    /// Process a statement and determine action
61    ///
62    /// Returns the connection on COMMIT/ROLLBACK outside of savepoints.
63    pub fn on_statement_complete(&mut self, lease: &mut ConnectionLease, sql: &str) -> LeaseAction {
64        let _event = TransactionEvent::detect(sql);
65
66        // Track prepared statements if enabled
67        if self.track_prepared_statements {
68            self.track_prepared_statement(sql);
69        }
70
71        // Let the lease handle transaction state tracking
72        lease.on_statement_complete(sql)
73    }
74
75    /// Process transaction end signal from backend
76    pub fn on_transaction_end(&self, lease: &mut ConnectionLease) -> LeaseAction {
77        lease.on_transaction_end()
78    }
79
80    /// Check if connection should be released
81    pub fn should_release(&self, lease: &ConnectionLease) -> bool {
82        !lease.in_transaction()
83    }
84
85    /// Called when client disconnects
86    pub fn on_client_disconnect(&self, _lease: ConnectionLease) -> LeaseAction {
87        LeaseAction::Reset
88    }
89
90    /// Get the pooling mode
91    pub fn mode(&self) -> PoolingMode {
92        PoolingMode::Transaction
93    }
94
95    /// Check if prepared statement tracking is enabled
96    pub fn tracks_prepared_statements(&self) -> bool {
97        self.track_prepared_statements
98    }
99
100    /// Get prepared statement tracker
101    pub fn prepared_tracker(&self) -> Option<&PreparedStatementTracker> {
102        self.prepared_tracker.as_ref()
103    }
104
105    /// Get mutable prepared statement tracker
106    pub fn prepared_tracker_mut(&mut self) -> Option<&mut PreparedStatementTracker> {
107        self.prepared_tracker.as_mut()
108    }
109
110    /// Track a prepared statement from SQL
111    fn track_prepared_statement(&mut self, sql: &str) {
112        let upper = sql.trim().to_uppercase();
113
114        if let Some(tracker) = &mut self.prepared_tracker {
115            if upper.starts_with("PREPARE ") {
116                if let Some((name, _types, query)) = super::prepared::parse_prepare_statement(sql) {
117                    tracker.register(name, query, vec![]);
118                }
119            } else if upper.starts_with("DEALLOCATE ") {
120                if let Some(name_opt) = super::prepared::parse_deallocate_statement(sql) {
121                    match name_opt {
122                        Some(name) => {
123                            tracker.unregister(&name);
124                        }
125                        None => {
126                            tracker.clear();
127                        }
128                    }
129                }
130            } else if upper.starts_with("EXECUTE ") {
131                // Track execution count
132                let parts: Vec<&str> = sql.split_whitespace().collect();
133                if parts.len() >= 2 {
134                    let name = parts[1].trim_end_matches(|c| c == '(' || c == ';');
135                    tracker.record_execution(name);
136                }
137            }
138        }
139    }
140
141    /// Get SQL to recreate prepared statements on a new connection
142    pub fn get_prepared_recreation_sql(&self) -> Vec<String> {
143        self.prepared_tracker
144            .as_ref()
145            .map(|t| t.generate_prepare_sql())
146            .unwrap_or_default()
147    }
148}
149
150#[cfg(test)]
151mod tests {
152    use super::*;
153    use crate::connection_pool::ConnectionState;
154    use crate::NodeId;
155    use uuid::Uuid;
156
157    fn create_test_connection() -> PooledConnection {
158        PooledConnection {
159            id: Uuid::new_v4(),
160            node_id: NodeId::new(),
161            created_at: chrono::Utc::now(),
162            last_used: chrono::Utc::now(),
163            state: ConnectionState::InUse,
164            use_count: 1,
165            permit: None,
166            client: None,
167        }
168    }
169
170    #[test]
171    fn test_transaction_mode_holds_during_transaction() {
172        let mut handler = TransactionModeHandler::new();
173        let conn = create_test_connection();
174        let mut lease = handler.create_lease(conn, ClientId::new());
175
176        // BEGIN should hold
177        assert_eq!(
178            handler.on_statement_complete(&mut lease, "BEGIN"),
179            LeaseAction::Hold
180        );
181        assert!(lease.in_transaction());
182
183        // Statements in transaction should hold
184        assert_eq!(
185            handler.on_statement_complete(&mut lease, "INSERT INTO users VALUES (1)"),
186            LeaseAction::Hold
187        );
188    }
189
190    #[test]
191    fn test_transaction_mode_releases_on_commit() {
192        let mut handler = TransactionModeHandler::new();
193        let conn = create_test_connection();
194        let mut lease = handler.create_lease(conn, ClientId::new());
195
196        handler.on_statement_complete(&mut lease, "BEGIN");
197        handler.on_statement_complete(&mut lease, "SELECT 1");
198
199        // COMMIT should reset/release
200        assert_eq!(
201            handler.on_statement_complete(&mut lease, "COMMIT"),
202            LeaseAction::Reset
203        );
204        assert!(!lease.in_transaction());
205    }
206
207    #[test]
208    fn test_transaction_mode_releases_on_rollback() {
209        let mut handler = TransactionModeHandler::new();
210        let conn = create_test_connection();
211        let mut lease = handler.create_lease(conn, ClientId::new());
212
213        handler.on_statement_complete(&mut lease, "BEGIN");
214        handler.on_statement_complete(&mut lease, "SELECT 1");
215
216        // ROLLBACK should reset/release
217        assert_eq!(
218            handler.on_statement_complete(&mut lease, "ROLLBACK"),
219            LeaseAction::Reset
220        );
221        assert!(!lease.in_transaction());
222    }
223
224    #[test]
225    fn test_transaction_mode_savepoint_handling() {
226        let mut handler = TransactionModeHandler::new();
227        let conn = create_test_connection();
228        let mut lease = handler.create_lease(conn, ClientId::new());
229
230        handler.on_statement_complete(&mut lease, "BEGIN");
231        handler.on_statement_complete(&mut lease, "SAVEPOINT sp1");
232
233        // ROLLBACK TO SAVEPOINT should not release
234        assert_eq!(
235            handler.on_statement_complete(&mut lease, "ROLLBACK TO SAVEPOINT sp1"),
236            LeaseAction::Hold
237        );
238        assert!(lease.in_transaction());
239
240        // Final COMMIT should release
241        assert_eq!(
242            handler.on_statement_complete(&mut lease, "COMMIT"),
243            LeaseAction::Reset
244        );
245    }
246
247    #[test]
248    fn test_should_release() {
249        let handler = TransactionModeHandler::new();
250        let conn = create_test_connection();
251        let lease = handler.create_lease(conn, ClientId::new());
252
253        // Not in transaction, should release
254        assert!(handler.should_release(&lease));
255    }
256
257    #[test]
258    fn test_prepared_tracking() {
259        let mut handler = TransactionModeHandler::with_prepared_tracking();
260        let conn = create_test_connection();
261        let mut lease = handler.create_lease(conn, ClientId::new());
262
263        handler.on_statement_complete(
264            &mut lease,
265            "PREPARE get_user AS SELECT * FROM users WHERE id = $1",
266        );
267
268        let tracker = handler.prepared_tracker().unwrap();
269        assert!(tracker.contains("get_user"));
270
271        handler.on_statement_complete(&mut lease, "DEALLOCATE get_user");
272        let tracker = handler.prepared_tracker().unwrap();
273        assert!(!tracker.contains("get_user"));
274    }
275
276    #[test]
277    fn test_mode() {
278        let handler = TransactionModeHandler::new();
279        assert_eq!(handler.mode(), PoolingMode::Transaction);
280    }
281}