heliosdb_proxy/pool/
statement.rs1use super::lease::{ClientId, ConnectionLease, LeaseAction};
7use super::mode::{PoolingMode, TransactionEvent};
8use crate::connection_pool::PooledConnection;
9
/// Handler that applies statement-level pooling rules to connection leases.
///
/// The handler itself holds no connection; it only carries configuration
/// and decides, per statement, what should happen to a lease.
#[derive(Debug, Clone)]
pub struct StatementModeHandler {
    /// Autocommit setting supplied at construction time and exposed through
    /// the `autocommit()` getter.
    autocommit: bool,
}
36
37impl Default for StatementModeHandler {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl StatementModeHandler {
44 pub fn new() -> Self {
46 Self { autocommit: true }
47 }
48
49 pub fn with_autocommit(autocommit: bool) -> Self {
51 Self { autocommit }
52 }
53
54 pub fn create_lease(
56 &self,
57 connection: PooledConnection,
58 client_id: ClientId,
59 ) -> ConnectionLease {
60 ConnectionLease::new(connection, PoolingMode::Statement, client_id)
61 }
62
63 pub fn on_statement_complete(&self, lease: &mut ConnectionLease, sql: &str) -> LeaseAction {
67 let event = TransactionEvent::detect(sql);
68
69 let _action = lease.on_statement_complete(sql);
71
72 match event {
74 TransactionEvent::Begin => {
75 LeaseAction::Hold
77 }
78 TransactionEvent::Commit | TransactionEvent::Rollback => {
79 LeaseAction::Reset
81 }
82 _ => {
83 if lease.in_transaction() {
85 LeaseAction::Hold
86 } else {
87 LeaseAction::Reset
89 }
90 }
91 }
92 }
93
94 pub fn on_transaction_end(&self, lease: &mut ConnectionLease) -> LeaseAction {
96 lease.on_transaction_end()
97 }
98
99 pub fn should_release(&self, lease: &ConnectionLease) -> bool {
101 !lease.in_transaction()
102 }
103
104 pub fn on_client_disconnect(&self, _lease: ConnectionLease) -> LeaseAction {
106 LeaseAction::Reset
107 }
108
109 pub fn mode(&self) -> PoolingMode {
111 PoolingMode::Statement
112 }
113
114 pub fn autocommit(&self) -> bool {
116 self.autocommit
117 }
118
119 pub fn tracks_prepared_statements(&self) -> bool {
121 false
122 }
123
124 pub fn is_safe_query(&self, sql: &str) -> bool {
128 let upper = sql.trim().to_uppercase();
129
130 if upper.starts_with("LISTEN")
132 || upper.starts_with("UNLISTEN")
133 || upper.starts_with("PREPARE")
134 || upper.starts_with("EXECUTE")
135 || upper.starts_with("DEALLOCATE")
136 || upper.starts_with("DECLARE")
137 || upper.starts_with("FETCH")
138 || upper.starts_with("CLOSE")
139 || upper.starts_with("MOVE")
140 || upper.contains("CREATE TEMP")
141 || upper.contains("CREATE TEMPORARY")
142 {
143 return false;
144 }
145
146 if upper.starts_with("SET ")
148 && !upper.starts_with("SET LOCAL")
149 && !upper.starts_with("SET TRANSACTION")
150 {
151 return false;
152 }
153
154 true
155 }
156
157 pub fn get_query_warning(&self, sql: &str) -> Option<&'static str> {
159 let upper = sql.trim().to_uppercase();
160
161 if upper.starts_with("LISTEN") || upper.starts_with("UNLISTEN") {
162 return Some(
163 "LISTEN/UNLISTEN not supported in statement mode - notifications will be lost",
164 );
165 }
166
167 if upper.starts_with("PREPARE")
168 || upper.starts_with("EXECUTE")
169 || upper.starts_with("DEALLOCATE")
170 {
171 return Some("Prepared statements not supported in statement mode");
172 }
173
174 if upper.starts_with("DECLARE") || upper.starts_with("FETCH") || upper.starts_with("CLOSE")
175 {
176 return Some("Cursors not supported in statement mode outside explicit transactions");
177 }
178
179 if upper.contains("CREATE TEMP") || upper.contains("CREATE TEMPORARY") {
180 return Some("Temporary tables may not persist correctly in statement mode");
181 }
182
183 if upper.starts_with("SET ")
184 && !upper.starts_with("SET LOCAL")
185 && !upper.starts_with("SET TRANSACTION")
186 {
187 return Some("Session variables may not persist in statement mode - use SET LOCAL within transaction");
188 }
189
190 None
191 }
192}
193
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connection_pool::ConnectionState;
    use crate::NodeId;
    use uuid::Uuid;

    /// Builds an in-use pooled connection with no permit and no client,
    /// which is all the lease tests below need.
    fn create_test_connection() -> PooledConnection {
        let id = Uuid::new_v4();
        let node_id = NodeId::new();
        let created_at = chrono::Utc::now();
        let last_used = chrono::Utc::now();

        PooledConnection {
            id,
            node_id,
            created_at,
            last_used,
            state: ConnectionState::InUse,
            use_count: 1,
            permit: None,
            client: None,
        }
    }

    /// Leases a fresh test connection to a brand-new client via `handler`.
    fn fresh_lease(handler: &StatementModeHandler) -> ConnectionLease {
        handler.create_lease(create_test_connection(), ClientId::new())
    }

    #[test]
    fn test_statement_mode_releases_per_statement() {
        let handler = StatementModeHandler::new();
        let mut lease = fresh_lease(&handler);

        // Outside a transaction every statement hands the connection back.
        let action = handler.on_statement_complete(&mut lease, "SELECT 1");
        assert_eq!(action, LeaseAction::Reset);
    }

    #[test]
    fn test_statement_mode_holds_during_transaction() {
        let handler = StatementModeHandler::new();
        let mut lease = fresh_lease(&handler);

        // Opening the transaction and everything inside it keeps the lease;
        // the final COMMIT releases it.
        let script = [
            ("BEGIN", LeaseAction::Hold),
            ("SELECT 1", LeaseAction::Hold),
            ("INSERT INTO t VALUES (1)", LeaseAction::Hold),
            ("COMMIT", LeaseAction::Reset),
        ];

        for (sql, expected) in script {
            assert_eq!(handler.on_statement_complete(&mut lease, sql), expected);
        }
    }

    #[test]
    fn test_should_release() {
        let handler = StatementModeHandler::new();
        let lease = fresh_lease(&handler);

        // A brand-new lease has no open transaction.
        assert!(handler.should_release(&lease));
    }

    #[test]
    fn test_safe_query_detection() {
        let handler = StatementModeHandler::new();

        let accepted = [
            "SELECT * FROM users",
            "INSERT INTO users VALUES (1)",
            "UPDATE users SET name = 'foo'",
            "DELETE FROM users WHERE id = 1",
            "SET LOCAL work_mem = '1GB'",
        ];
        for sql in accepted.iter() {
            assert!(handler.is_safe_query(sql));
        }

        let rejected = [
            "LISTEN channel",
            "PREPARE stmt AS SELECT 1",
            "EXECUTE stmt",
            "DECLARE cursor CURSOR FOR SELECT 1",
            "CREATE TEMP TABLE t (id int)",
            "SET work_mem = '1GB'",
        ];
        for sql in rejected.iter() {
            assert!(!handler.is_safe_query(sql));
        }
    }

    #[test]
    fn test_query_warnings() {
        let handler = StatementModeHandler::new();

        let warned = [
            "LISTEN channel",
            "PREPARE stmt AS SELECT 1",
            "CREATE TEMP TABLE t (id int)",
            "SET work_mem = '1GB'",
        ];
        for sql in warned.iter() {
            assert!(handler.get_query_warning(sql).is_some());
        }

        let silent = ["SELECT 1", "SET LOCAL work_mem = '1GB'"];
        for sql in silent.iter() {
            assert!(handler.get_query_warning(sql).is_none());
        }
    }

    #[test]
    fn test_mode() {
        let reported = StatementModeHandler::new().mode();
        assert_eq!(reported, PoolingMode::Statement);
    }

    #[test]
    fn test_no_prepared_statement_support() {
        let handler = StatementModeHandler::new();
        let tracks = handler.tracks_prepared_statements();
        assert!(!tracks);
    }
}