heliosdb_proxy/pool/
statement.rs1use super::lease::{ClientId, ConnectionLease, LeaseAction};
7use super::mode::{PoolingMode, TransactionEvent};
8use crate::connection_pool::PooledConnection;
9
10pub struct StatementModeHandler {
33 autocommit: bool,
35}
36
37impl Default for StatementModeHandler {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl StatementModeHandler {
44 pub fn new() -> Self {
46 Self { autocommit: true }
47 }
48
49 pub fn with_autocommit(autocommit: bool) -> Self {
51 Self { autocommit }
52 }
53
54 pub fn create_lease(&self, connection: PooledConnection, client_id: ClientId) -> ConnectionLease {
56 ConnectionLease::new(connection, PoolingMode::Statement, client_id)
57 }
58
59 pub fn on_statement_complete(&self, lease: &mut ConnectionLease, sql: &str) -> LeaseAction {
63 let event = TransactionEvent::detect(sql);
64
65 let _action = lease.on_statement_complete(sql);
67
68 match event {
70 TransactionEvent::Begin => {
71 LeaseAction::Hold
73 }
74 TransactionEvent::Commit | TransactionEvent::Rollback => {
75 LeaseAction::Reset
77 }
78 _ => {
79 if lease.in_transaction() {
81 LeaseAction::Hold
82 } else {
83 LeaseAction::Reset
85 }
86 }
87 }
88 }
89
90 pub fn on_transaction_end(&self, lease: &mut ConnectionLease) -> LeaseAction {
92 lease.on_transaction_end()
93 }
94
95 pub fn should_release(&self, lease: &ConnectionLease) -> bool {
97 !lease.in_transaction()
98 }
99
100 pub fn on_client_disconnect(&self, _lease: ConnectionLease) -> LeaseAction {
102 LeaseAction::Reset
103 }
104
105 pub fn mode(&self) -> PoolingMode {
107 PoolingMode::Statement
108 }
109
110 pub fn autocommit(&self) -> bool {
112 self.autocommit
113 }
114
115 pub fn tracks_prepared_statements(&self) -> bool {
117 false
118 }
119
120 pub fn is_safe_query(&self, sql: &str) -> bool {
124 let upper = sql.trim().to_uppercase();
125
126 if upper.starts_with("LISTEN")
128 || upper.starts_with("UNLISTEN")
129 || upper.starts_with("PREPARE")
130 || upper.starts_with("EXECUTE")
131 || upper.starts_with("DEALLOCATE")
132 || upper.starts_with("DECLARE")
133 || upper.starts_with("FETCH")
134 || upper.starts_with("CLOSE")
135 || upper.starts_with("MOVE")
136 || upper.contains("CREATE TEMP")
137 || upper.contains("CREATE TEMPORARY")
138 {
139 return false;
140 }
141
142 if upper.starts_with("SET ")
144 && !upper.starts_with("SET LOCAL")
145 && !upper.starts_with("SET TRANSACTION")
146 {
147 return false;
148 }
149
150 true
151 }
152
153 pub fn get_query_warning(&self, sql: &str) -> Option<&'static str> {
155 let upper = sql.trim().to_uppercase();
156
157 if upper.starts_with("LISTEN") || upper.starts_with("UNLISTEN") {
158 return Some("LISTEN/UNLISTEN not supported in statement mode - notifications will be lost");
159 }
160
161 if upper.starts_with("PREPARE") || upper.starts_with("EXECUTE") || upper.starts_with("DEALLOCATE") {
162 return Some("Prepared statements not supported in statement mode");
163 }
164
165 if upper.starts_with("DECLARE") || upper.starts_with("FETCH") || upper.starts_with("CLOSE") {
166 return Some("Cursors not supported in statement mode outside explicit transactions");
167 }
168
169 if upper.contains("CREATE TEMP") || upper.contains("CREATE TEMPORARY") {
170 return Some("Temporary tables may not persist correctly in statement mode");
171 }
172
173 if upper.starts_with("SET ") && !upper.starts_with("SET LOCAL") && !upper.starts_with("SET TRANSACTION") {
174 return Some("Session variables may not persist in statement mode - use SET LOCAL within transaction");
175 }
176
177 None
178 }
179}
180
181#[cfg(test)]
182mod tests {
183 use super::*;
184 use crate::connection_pool::ConnectionState;
185 use crate::NodeId;
186 use uuid::Uuid;
187
188 fn create_test_connection() -> PooledConnection {
189 PooledConnection {
190 id: Uuid::new_v4(),
191 node_id: NodeId::new(),
192 created_at: chrono::Utc::now(),
193 last_used: chrono::Utc::now(),
194 state: ConnectionState::InUse,
195 use_count: 1,
196 permit: None,
197 client: None,
198 }
199 }
200
201 #[test]
202 fn test_statement_mode_releases_per_statement() {
203 let handler = StatementModeHandler::new();
204 let conn = create_test_connection();
205 let mut lease = handler.create_lease(conn, ClientId::new());
206
207 assert_eq!(
209 handler.on_statement_complete(&mut lease, "SELECT 1"),
210 LeaseAction::Reset
211 );
212 }
213
214 #[test]
215 fn test_statement_mode_holds_during_transaction() {
216 let handler = StatementModeHandler::new();
217 let conn = create_test_connection();
218 let mut lease = handler.create_lease(conn, ClientId::new());
219
220 assert_eq!(
222 handler.on_statement_complete(&mut lease, "BEGIN"),
223 LeaseAction::Hold
224 );
225
226 assert_eq!(
228 handler.on_statement_complete(&mut lease, "SELECT 1"),
229 LeaseAction::Hold
230 );
231 assert_eq!(
232 handler.on_statement_complete(&mut lease, "INSERT INTO t VALUES (1)"),
233 LeaseAction::Hold
234 );
235
236 assert_eq!(
238 handler.on_statement_complete(&mut lease, "COMMIT"),
239 LeaseAction::Reset
240 );
241 }
242
243 #[test]
244 fn test_should_release() {
245 let handler = StatementModeHandler::new();
246 let conn = create_test_connection();
247 let lease = handler.create_lease(conn, ClientId::new());
248
249 assert!(handler.should_release(&lease));
251 }
252
253 #[test]
254 fn test_safe_query_detection() {
255 let handler = StatementModeHandler::new();
256
257 assert!(handler.is_safe_query("SELECT * FROM users"));
259 assert!(handler.is_safe_query("INSERT INTO users VALUES (1)"));
260 assert!(handler.is_safe_query("UPDATE users SET name = 'foo'"));
261 assert!(handler.is_safe_query("DELETE FROM users WHERE id = 1"));
262 assert!(handler.is_safe_query("SET LOCAL work_mem = '1GB'"));
263
264 assert!(!handler.is_safe_query("LISTEN channel"));
266 assert!(!handler.is_safe_query("PREPARE stmt AS SELECT 1"));
267 assert!(!handler.is_safe_query("EXECUTE stmt"));
268 assert!(!handler.is_safe_query("DECLARE cursor CURSOR FOR SELECT 1"));
269 assert!(!handler.is_safe_query("CREATE TEMP TABLE t (id int)"));
270 assert!(!handler.is_safe_query("SET work_mem = '1GB'"));
271 }
272
273 #[test]
274 fn test_query_warnings() {
275 let handler = StatementModeHandler::new();
276
277 assert!(handler.get_query_warning("LISTEN channel").is_some());
278 assert!(handler.get_query_warning("PREPARE stmt AS SELECT 1").is_some());
279 assert!(handler.get_query_warning("CREATE TEMP TABLE t (id int)").is_some());
280 assert!(handler.get_query_warning("SET work_mem = '1GB'").is_some());
281
282 assert!(handler.get_query_warning("SELECT 1").is_none());
283 assert!(handler.get_query_warning("SET LOCAL work_mem = '1GB'").is_none());
284 }
285
286 #[test]
287 fn test_mode() {
288 let handler = StatementModeHandler::new();
289 assert_eq!(handler.mode(), PoolingMode::Statement);
290 }
291
292 #[test]
293 fn test_no_prepared_statement_support() {
294 let handler = StatementModeHandler::new();
295 assert!(!handler.tracks_prepared_statements());
296 }
297}