heliosdb_proxy/pool/
transaction.rs1use super::lease::{ClientId, ConnectionLease, LeaseAction};
7use super::mode::{PoolingMode, TransactionEvent};
8use super::prepared::PreparedStatementTracker;
9use crate::connection_pool::PooledConnection;
10
11pub struct TransactionModeHandler {
26 track_prepared_statements: bool,
28 prepared_tracker: Option<PreparedStatementTracker>,
30}
31
32impl Default for TransactionModeHandler {
33 fn default() -> Self {
34 Self::new()
35 }
36}
37
38impl TransactionModeHandler {
39 pub fn new() -> Self {
41 Self {
42 track_prepared_statements: false,
43 prepared_tracker: None,
44 }
45 }
46
47 pub fn with_prepared_tracking() -> Self {
49 Self {
50 track_prepared_statements: true,
51 prepared_tracker: Some(PreparedStatementTracker::new()),
52 }
53 }
54
55 pub fn create_lease(
57 &self,
58 connection: PooledConnection,
59 client_id: ClientId,
60 ) -> ConnectionLease {
61 ConnectionLease::new(connection, PoolingMode::Transaction, client_id)
62 }
63
64 pub fn on_statement_complete(&mut self, lease: &mut ConnectionLease, sql: &str) -> LeaseAction {
68 let _event = TransactionEvent::detect(sql);
69
70 if self.track_prepared_statements {
72 self.track_prepared_statement(sql);
73 }
74
75 lease.on_statement_complete(sql)
77 }
78
79 pub fn on_transaction_end(&self, lease: &mut ConnectionLease) -> LeaseAction {
81 lease.on_transaction_end()
82 }
83
84 pub fn should_release(&self, lease: &ConnectionLease) -> bool {
86 !lease.in_transaction()
87 }
88
89 pub fn on_client_disconnect(&self, _lease: ConnectionLease) -> LeaseAction {
91 LeaseAction::Reset
92 }
93
94 pub fn mode(&self) -> PoolingMode {
96 PoolingMode::Transaction
97 }
98
99 pub fn tracks_prepared_statements(&self) -> bool {
101 self.track_prepared_statements
102 }
103
104 pub fn prepared_tracker(&self) -> Option<&PreparedStatementTracker> {
106 self.prepared_tracker.as_ref()
107 }
108
109 pub fn prepared_tracker_mut(&mut self) -> Option<&mut PreparedStatementTracker> {
111 self.prepared_tracker.as_mut()
112 }
113
114 fn track_prepared_statement(&mut self, sql: &str) {
116 let upper = sql.trim().to_uppercase();
117
118 if let Some(tracker) = &mut self.prepared_tracker {
119 if upper.starts_with("PREPARE ") {
120 if let Some((name, _types, query)) = super::prepared::parse_prepare_statement(sql) {
121 tracker.register(name, query, vec![]);
122 }
123 } else if upper.starts_with("DEALLOCATE ") {
124 if let Some(name_opt) = super::prepared::parse_deallocate_statement(sql) {
125 match name_opt {
126 Some(name) => {
127 tracker.unregister(&name);
128 }
129 None => {
130 tracker.clear();
131 }
132 }
133 }
134 } else if upper.starts_with("EXECUTE ") {
135 let parts: Vec<&str> = sql.split_whitespace().collect();
137 if parts.len() >= 2 {
138 let name = parts[1].trim_end_matches(['(', ';']);
139 tracker.record_execution(name);
140 }
141 }
142 }
143 }
144
145 pub fn get_prepared_recreation_sql(&self) -> Vec<String> {
147 self.prepared_tracker
148 .as_ref()
149 .map(|t| t.generate_prepare_sql())
150 .unwrap_or_default()
151 }
152}
153
154#[cfg(test)]
155mod tests {
156 use super::*;
157 use crate::connection_pool::ConnectionState;
158 use crate::NodeId;
159 use uuid::Uuid;
160
161 fn create_test_connection() -> PooledConnection {
162 PooledConnection {
163 id: Uuid::new_v4(),
164 node_id: NodeId::new(),
165 created_at: chrono::Utc::now(),
166 last_used: chrono::Utc::now(),
167 state: ConnectionState::InUse,
168 use_count: 1,
169 permit: None,
170 client: None,
171 }
172 }
173
174 #[test]
175 fn test_transaction_mode_holds_during_transaction() {
176 let mut handler = TransactionModeHandler::new();
177 let conn = create_test_connection();
178 let mut lease = handler.create_lease(conn, ClientId::new());
179
180 assert_eq!(
182 handler.on_statement_complete(&mut lease, "BEGIN"),
183 LeaseAction::Hold
184 );
185 assert!(lease.in_transaction());
186
187 assert_eq!(
189 handler.on_statement_complete(&mut lease, "INSERT INTO users VALUES (1)"),
190 LeaseAction::Hold
191 );
192 }
193
194 #[test]
195 fn test_transaction_mode_releases_on_commit() {
196 let mut handler = TransactionModeHandler::new();
197 let conn = create_test_connection();
198 let mut lease = handler.create_lease(conn, ClientId::new());
199
200 handler.on_statement_complete(&mut lease, "BEGIN");
201 handler.on_statement_complete(&mut lease, "SELECT 1");
202
203 assert_eq!(
205 handler.on_statement_complete(&mut lease, "COMMIT"),
206 LeaseAction::Reset
207 );
208 assert!(!lease.in_transaction());
209 }
210
211 #[test]
212 fn test_transaction_mode_releases_on_rollback() {
213 let mut handler = TransactionModeHandler::new();
214 let conn = create_test_connection();
215 let mut lease = handler.create_lease(conn, ClientId::new());
216
217 handler.on_statement_complete(&mut lease, "BEGIN");
218 handler.on_statement_complete(&mut lease, "SELECT 1");
219
220 assert_eq!(
222 handler.on_statement_complete(&mut lease, "ROLLBACK"),
223 LeaseAction::Reset
224 );
225 assert!(!lease.in_transaction());
226 }
227
228 #[test]
229 fn test_transaction_mode_savepoint_handling() {
230 let mut handler = TransactionModeHandler::new();
231 let conn = create_test_connection();
232 let mut lease = handler.create_lease(conn, ClientId::new());
233
234 handler.on_statement_complete(&mut lease, "BEGIN");
235 handler.on_statement_complete(&mut lease, "SAVEPOINT sp1");
236
237 assert_eq!(
239 handler.on_statement_complete(&mut lease, "ROLLBACK TO SAVEPOINT sp1"),
240 LeaseAction::Hold
241 );
242 assert!(lease.in_transaction());
243
244 assert_eq!(
246 handler.on_statement_complete(&mut lease, "COMMIT"),
247 LeaseAction::Reset
248 );
249 }
250
251 #[test]
252 fn test_should_release() {
253 let handler = TransactionModeHandler::new();
254 let conn = create_test_connection();
255 let lease = handler.create_lease(conn, ClientId::new());
256
257 assert!(handler.should_release(&lease));
259 }
260
261 #[test]
262 fn test_prepared_tracking() {
263 let mut handler = TransactionModeHandler::with_prepared_tracking();
264 let conn = create_test_connection();
265 let mut lease = handler.create_lease(conn, ClientId::new());
266
267 handler.on_statement_complete(
268 &mut lease,
269 "PREPARE get_user AS SELECT * FROM users WHERE id = $1",
270 );
271
272 let tracker = handler.prepared_tracker().unwrap();
273 assert!(tracker.contains("get_user"));
274
275 handler.on_statement_complete(&mut lease, "DEALLOCATE get_user");
276 let tracker = handler.prepared_tracker().unwrap();
277 assert!(!tracker.contains("get_user"));
278 }
279
280 #[test]
281 fn test_mode() {
282 let handler = TransactionModeHandler::new();
283 assert_eq!(handler.mode(), PoolingMode::Transaction);
284 }
285}