Skip to main content

heliosdb_proxy/pool/
transaction.rs

1//! Transaction Mode Handler
2//!
3//! Implements transaction pooling mode where connections are returned
4//! to the pool after each transaction completes.
5
6use super::lease::{ClientId, ConnectionLease, LeaseAction};
7use super::mode::{PoolingMode, TransactionEvent};
8use super::prepared::PreparedStatementTracker;
9use crate::connection_pool::PooledConnection;
10
11/// Transaction mode handler
12///
13/// In transaction mode, connections are held for the duration of a transaction
14/// and returned to the pool after COMMIT or ROLLBACK.
15///
16/// Benefits:
17/// - Good balance between connection sharing and compatibility
18/// - Works with most PostgreSQL features within a transaction
19/// - Supports prepared statements (with tracking/recreation)
20///
21/// Limitations:
22/// - LISTEN/NOTIFY listeners are lost between transactions
23/// - Session-level settings may need to be re-applied
24/// - Temp tables persist but may not be on same connection
25pub struct TransactionModeHandler {
26    /// Whether to track and recreate prepared statements
27    track_prepared_statements: bool,
28    /// Prepared statement tracker
29    prepared_tracker: Option<PreparedStatementTracker>,
30}
31
32impl Default for TransactionModeHandler {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37
38impl TransactionModeHandler {
39    /// Create a new transaction mode handler
40    pub fn new() -> Self {
41        Self {
42            track_prepared_statements: false,
43            prepared_tracker: None,
44        }
45    }
46
47    /// Create with prepared statement tracking enabled
48    pub fn with_prepared_tracking() -> Self {
49        Self {
50            track_prepared_statements: true,
51            prepared_tracker: Some(PreparedStatementTracker::new()),
52        }
53    }
54
55    /// Create a lease for this mode
56    pub fn create_lease(
57        &self,
58        connection: PooledConnection,
59        client_id: ClientId,
60    ) -> ConnectionLease {
61        ConnectionLease::new(connection, PoolingMode::Transaction, client_id)
62    }
63
64    /// Process a statement and determine action
65    ///
66    /// Returns the connection on COMMIT/ROLLBACK outside of savepoints.
67    pub fn on_statement_complete(&mut self, lease: &mut ConnectionLease, sql: &str) -> LeaseAction {
68        let _event = TransactionEvent::detect(sql);
69
70        // Track prepared statements if enabled
71        if self.track_prepared_statements {
72            self.track_prepared_statement(sql);
73        }
74
75        // Let the lease handle transaction state tracking
76        lease.on_statement_complete(sql)
77    }
78
79    /// Process transaction end signal from backend
80    pub fn on_transaction_end(&self, lease: &mut ConnectionLease) -> LeaseAction {
81        lease.on_transaction_end()
82    }
83
84    /// Check if connection should be released
85    pub fn should_release(&self, lease: &ConnectionLease) -> bool {
86        !lease.in_transaction()
87    }
88
89    /// Called when client disconnects
90    pub fn on_client_disconnect(&self, _lease: ConnectionLease) -> LeaseAction {
91        LeaseAction::Reset
92    }
93
94    /// Get the pooling mode
95    pub fn mode(&self) -> PoolingMode {
96        PoolingMode::Transaction
97    }
98
99    /// Check if prepared statement tracking is enabled
100    pub fn tracks_prepared_statements(&self) -> bool {
101        self.track_prepared_statements
102    }
103
104    /// Get prepared statement tracker
105    pub fn prepared_tracker(&self) -> Option<&PreparedStatementTracker> {
106        self.prepared_tracker.as_ref()
107    }
108
109    /// Get mutable prepared statement tracker
110    pub fn prepared_tracker_mut(&mut self) -> Option<&mut PreparedStatementTracker> {
111        self.prepared_tracker.as_mut()
112    }
113
114    /// Track a prepared statement from SQL
115    fn track_prepared_statement(&mut self, sql: &str) {
116        let upper = sql.trim().to_uppercase();
117
118        if let Some(tracker) = &mut self.prepared_tracker {
119            if upper.starts_with("PREPARE ") {
120                if let Some((name, _types, query)) = super::prepared::parse_prepare_statement(sql) {
121                    tracker.register(name, query, vec![]);
122                }
123            } else if upper.starts_with("DEALLOCATE ") {
124                if let Some(name_opt) = super::prepared::parse_deallocate_statement(sql) {
125                    match name_opt {
126                        Some(name) => {
127                            tracker.unregister(&name);
128                        }
129                        None => {
130                            tracker.clear();
131                        }
132                    }
133                }
134            } else if upper.starts_with("EXECUTE ") {
135                // Track execution count
136                let parts: Vec<&str> = sql.split_whitespace().collect();
137                if parts.len() >= 2 {
138                    let name = parts[1].trim_end_matches(['(', ';']);
139                    tracker.record_execution(name);
140                }
141            }
142        }
143    }
144
145    /// Get SQL to recreate prepared statements on a new connection
146    pub fn get_prepared_recreation_sql(&self) -> Vec<String> {
147        self.prepared_tracker
148            .as_ref()
149            .map(|t| t.generate_prepare_sql())
150            .unwrap_or_default()
151    }
152}
153
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connection_pool::ConnectionState;
    use crate::NodeId;
    use uuid::Uuid;

    /// Build an in-use pooled connection with no backing client or permit.
    fn create_test_connection() -> PooledConnection {
        PooledConnection {
            id: Uuid::new_v4(),
            node_id: NodeId::new(),
            created_at: chrono::Utc::now(),
            last_used: chrono::Utc::now(),
            state: ConnectionState::InUse,
            use_count: 1,
            permit: None,
            client: None,
        }
    }

    /// Lease a fresh test connection from `handler` for a brand-new client.
    fn lease_from(handler: &TransactionModeHandler) -> ConnectionLease {
        handler.create_lease(create_test_connection(), ClientId::new())
    }

    #[test]
    fn test_transaction_mode_holds_during_transaction() {
        let mut handler = TransactionModeHandler::new();
        let mut lease = lease_from(&handler);

        // Opening a transaction keeps the connection.
        let after_begin = handler.on_statement_complete(&mut lease, "BEGIN");
        assert_eq!(after_begin, LeaseAction::Hold);
        assert!(lease.in_transaction());

        // So does any statement executed while it is open.
        let after_insert =
            handler.on_statement_complete(&mut lease, "INSERT INTO users VALUES (1)");
        assert_eq!(after_insert, LeaseAction::Hold);
    }

    #[test]
    fn test_transaction_mode_releases_on_commit() {
        let mut handler = TransactionModeHandler::new();
        let mut lease = lease_from(&handler);

        handler.on_statement_complete(&mut lease, "BEGIN");
        handler.on_statement_complete(&mut lease, "SELECT 1");

        // Committing ends the transaction and resets/releases the connection.
        let after_commit = handler.on_statement_complete(&mut lease, "COMMIT");
        assert_eq!(after_commit, LeaseAction::Reset);
        assert!(!lease.in_transaction());
    }

    #[test]
    fn test_transaction_mode_releases_on_rollback() {
        let mut handler = TransactionModeHandler::new();
        let mut lease = lease_from(&handler);

        handler.on_statement_complete(&mut lease, "BEGIN");
        handler.on_statement_complete(&mut lease, "SELECT 1");

        // Rolling back ends the transaction and resets/releases the connection.
        let after_rollback = handler.on_statement_complete(&mut lease, "ROLLBACK");
        assert_eq!(after_rollback, LeaseAction::Reset);
        assert!(!lease.in_transaction());
    }

    #[test]
    fn test_transaction_mode_savepoint_handling() {
        let mut handler = TransactionModeHandler::new();
        let mut lease = lease_from(&handler);

        handler.on_statement_complete(&mut lease, "BEGIN");
        handler.on_statement_complete(&mut lease, "SAVEPOINT sp1");

        // Rolling back to a savepoint stays inside the transaction.
        let after_partial_rollback =
            handler.on_statement_complete(&mut lease, "ROLLBACK TO SAVEPOINT sp1");
        assert_eq!(after_partial_rollback, LeaseAction::Hold);
        assert!(lease.in_transaction());

        // The closing COMMIT is what releases the connection.
        let after_commit = handler.on_statement_complete(&mut lease, "COMMIT");
        assert_eq!(after_commit, LeaseAction::Reset);
    }

    #[test]
    fn test_should_release() {
        let handler = TransactionModeHandler::new();
        let lease = lease_from(&handler);

        // A fresh lease has no open transaction, so it is releasable.
        assert!(handler.should_release(&lease));
    }

    #[test]
    fn test_prepared_tracking() {
        let mut handler = TransactionModeHandler::with_prepared_tracking();
        let mut lease = lease_from(&handler);

        // PREPARE registers the statement with the tracker.
        handler.on_statement_complete(
            &mut lease,
            "PREPARE get_user AS SELECT * FROM users WHERE id = $1",
        );
        assert!(handler.prepared_tracker().unwrap().contains("get_user"));

        // DEALLOCATE removes it again.
        handler.on_statement_complete(&mut lease, "DEALLOCATE get_user");
        assert!(!handler.prepared_tracker().unwrap().contains("get_user"));
    }

    #[test]
    fn test_mode() {
        let handler = TransactionModeHandler::new();
        assert_eq!(handler.mode(), PoolingMode::Transaction);
    }
}
285}