heliosdb_proxy/pool/
transaction.rs1use super::lease::{ClientId, ConnectionLease, LeaseAction};
7use super::mode::{PoolingMode, TransactionEvent};
8use super::prepared::PreparedStatementTracker;
9use crate::connection_pool::PooledConnection;
10
11pub struct TransactionModeHandler {
26 track_prepared_statements: bool,
28 prepared_tracker: Option<PreparedStatementTracker>,
30}
31
32impl Default for TransactionModeHandler {
33 fn default() -> Self {
34 Self::new()
35 }
36}
37
38impl TransactionModeHandler {
39 pub fn new() -> Self {
41 Self {
42 track_prepared_statements: false,
43 prepared_tracker: None,
44 }
45 }
46
47 pub fn with_prepared_tracking() -> Self {
49 Self {
50 track_prepared_statements: true,
51 prepared_tracker: Some(PreparedStatementTracker::new()),
52 }
53 }
54
55 pub fn create_lease(&self, connection: PooledConnection, client_id: ClientId) -> ConnectionLease {
57 ConnectionLease::new(connection, PoolingMode::Transaction, client_id)
58 }
59
60 pub fn on_statement_complete(&mut self, lease: &mut ConnectionLease, sql: &str) -> LeaseAction {
64 let _event = TransactionEvent::detect(sql);
65
66 if self.track_prepared_statements {
68 self.track_prepared_statement(sql);
69 }
70
71 lease.on_statement_complete(sql)
73 }
74
75 pub fn on_transaction_end(&self, lease: &mut ConnectionLease) -> LeaseAction {
77 lease.on_transaction_end()
78 }
79
80 pub fn should_release(&self, lease: &ConnectionLease) -> bool {
82 !lease.in_transaction()
83 }
84
85 pub fn on_client_disconnect(&self, _lease: ConnectionLease) -> LeaseAction {
87 LeaseAction::Reset
88 }
89
90 pub fn mode(&self) -> PoolingMode {
92 PoolingMode::Transaction
93 }
94
95 pub fn tracks_prepared_statements(&self) -> bool {
97 self.track_prepared_statements
98 }
99
100 pub fn prepared_tracker(&self) -> Option<&PreparedStatementTracker> {
102 self.prepared_tracker.as_ref()
103 }
104
105 pub fn prepared_tracker_mut(&mut self) -> Option<&mut PreparedStatementTracker> {
107 self.prepared_tracker.as_mut()
108 }
109
110 fn track_prepared_statement(&mut self, sql: &str) {
112 let upper = sql.trim().to_uppercase();
113
114 if let Some(tracker) = &mut self.prepared_tracker {
115 if upper.starts_with("PREPARE ") {
116 if let Some((name, _types, query)) = super::prepared::parse_prepare_statement(sql) {
117 tracker.register(name, query, vec![]);
118 }
119 } else if upper.starts_with("DEALLOCATE ") {
120 if let Some(name_opt) = super::prepared::parse_deallocate_statement(sql) {
121 match name_opt {
122 Some(name) => {
123 tracker.unregister(&name);
124 }
125 None => {
126 tracker.clear();
127 }
128 }
129 }
130 } else if upper.starts_with("EXECUTE ") {
131 let parts: Vec<&str> = sql.split_whitespace().collect();
133 if parts.len() >= 2 {
134 let name = parts[1].trim_end_matches(|c| c == '(' || c == ';');
135 tracker.record_execution(name);
136 }
137 }
138 }
139 }
140
141 pub fn get_prepared_recreation_sql(&self) -> Vec<String> {
143 self.prepared_tracker
144 .as_ref()
145 .map(|t| t.generate_prepare_sql())
146 .unwrap_or_default()
147 }
148}
149
150#[cfg(test)]
151mod tests {
152 use super::*;
153 use crate::connection_pool::ConnectionState;
154 use crate::NodeId;
155 use uuid::Uuid;
156
157 fn create_test_connection() -> PooledConnection {
158 PooledConnection {
159 id: Uuid::new_v4(),
160 node_id: NodeId::new(),
161 created_at: chrono::Utc::now(),
162 last_used: chrono::Utc::now(),
163 state: ConnectionState::InUse,
164 use_count: 1,
165 permit: None,
166 client: None,
167 }
168 }
169
170 #[test]
171 fn test_transaction_mode_holds_during_transaction() {
172 let mut handler = TransactionModeHandler::new();
173 let conn = create_test_connection();
174 let mut lease = handler.create_lease(conn, ClientId::new());
175
176 assert_eq!(
178 handler.on_statement_complete(&mut lease, "BEGIN"),
179 LeaseAction::Hold
180 );
181 assert!(lease.in_transaction());
182
183 assert_eq!(
185 handler.on_statement_complete(&mut lease, "INSERT INTO users VALUES (1)"),
186 LeaseAction::Hold
187 );
188 }
189
190 #[test]
191 fn test_transaction_mode_releases_on_commit() {
192 let mut handler = TransactionModeHandler::new();
193 let conn = create_test_connection();
194 let mut lease = handler.create_lease(conn, ClientId::new());
195
196 handler.on_statement_complete(&mut lease, "BEGIN");
197 handler.on_statement_complete(&mut lease, "SELECT 1");
198
199 assert_eq!(
201 handler.on_statement_complete(&mut lease, "COMMIT"),
202 LeaseAction::Reset
203 );
204 assert!(!lease.in_transaction());
205 }
206
207 #[test]
208 fn test_transaction_mode_releases_on_rollback() {
209 let mut handler = TransactionModeHandler::new();
210 let conn = create_test_connection();
211 let mut lease = handler.create_lease(conn, ClientId::new());
212
213 handler.on_statement_complete(&mut lease, "BEGIN");
214 handler.on_statement_complete(&mut lease, "SELECT 1");
215
216 assert_eq!(
218 handler.on_statement_complete(&mut lease, "ROLLBACK"),
219 LeaseAction::Reset
220 );
221 assert!(!lease.in_transaction());
222 }
223
224 #[test]
225 fn test_transaction_mode_savepoint_handling() {
226 let mut handler = TransactionModeHandler::new();
227 let conn = create_test_connection();
228 let mut lease = handler.create_lease(conn, ClientId::new());
229
230 handler.on_statement_complete(&mut lease, "BEGIN");
231 handler.on_statement_complete(&mut lease, "SAVEPOINT sp1");
232
233 assert_eq!(
235 handler.on_statement_complete(&mut lease, "ROLLBACK TO SAVEPOINT sp1"),
236 LeaseAction::Hold
237 );
238 assert!(lease.in_transaction());
239
240 assert_eq!(
242 handler.on_statement_complete(&mut lease, "COMMIT"),
243 LeaseAction::Reset
244 );
245 }
246
247 #[test]
248 fn test_should_release() {
249 let handler = TransactionModeHandler::new();
250 let conn = create_test_connection();
251 let lease = handler.create_lease(conn, ClientId::new());
252
253 assert!(handler.should_release(&lease));
255 }
256
257 #[test]
258 fn test_prepared_tracking() {
259 let mut handler = TransactionModeHandler::with_prepared_tracking();
260 let conn = create_test_connection();
261 let mut lease = handler.create_lease(conn, ClientId::new());
262
263 handler.on_statement_complete(
264 &mut lease,
265 "PREPARE get_user AS SELECT * FROM users WHERE id = $1",
266 );
267
268 let tracker = handler.prepared_tracker().unwrap();
269 assert!(tracker.contains("get_user"));
270
271 handler.on_statement_complete(&mut lease, "DEALLOCATE get_user");
272 let tracker = handler.prepared_tracker().unwrap();
273 assert!(!tracker.contains("get_user"));
274 }
275
276 #[test]
277 fn test_mode() {
278 let handler = TransactionModeHandler::new();
279 assert_eq!(handler.mode(), PoolingMode::Transaction);
280 }
281}