// File: heliosdb_proxy/pool/statement.rs

//! Statement Mode Handler
//!
//! Implements statement pooling mode, where a connection is returned to the
//! pool after each individual statement completes or, for statements issued
//! inside an explicit transaction, once that transaction ends.

use super::lease::{ClientId, ConnectionLease, LeaseAction};
use super::mode::{PoolingMode, TransactionEvent};
use crate::connection_pool::PooledConnection;

/// Statement mode handler.
///
/// In statement mode a connection goes back to the pool after every statement
/// that runs outside of an explicit transaction. This gives the highest degree
/// of connection sharing, at the price of significant limitations.
///
/// Benefits:
/// - Maximum connection sharing
/// - Best for high-volume simple queries
/// - Good for read-heavy workloads with many clients
///
/// Limitations:
/// - CANNOT use server-side prepared statements (Parse/Bind/Execute)
/// - CANNOT use LISTEN/NOTIFY
/// - CANNOT rely on session variables
/// - CANNOT use temp tables effectively
/// - Must use simple query protocol
///
/// Use cases:
/// - Connection poolers like PgBouncer in statement mode
/// - High-throughput REST APIs with simple queries
/// - Serverless environments with many concurrent clients
#[derive(Debug, Clone)]
pub struct StatementModeHandler {
    /// Whether autocommit is enabled (single statements are transactions).
    ///
    /// The handler only stores this flag and reports it through its
    /// `autocommit()` accessor; the release decisions in this file do not
    /// consult it.
    autocommit: bool,
}

37impl Default for StatementModeHandler {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl StatementModeHandler {
44    /// Create a new statement mode handler
45    pub fn new() -> Self {
46        Self { autocommit: true }
47    }
48
49    /// Create with specific autocommit setting
50    pub fn with_autocommit(autocommit: bool) -> Self {
51        Self { autocommit }
52    }
53
54    /// Create a lease for this mode
55    pub fn create_lease(
56        &self,
57        connection: PooledConnection,
58        client_id: ClientId,
59    ) -> ConnectionLease {
60        ConnectionLease::new(connection, PoolingMode::Statement, client_id)
61    }
62
63    /// Process a statement and determine action
64    ///
65    /// Returns the connection after every statement outside of explicit transaction.
66    pub fn on_statement_complete(&self, lease: &mut ConnectionLease, sql: &str) -> LeaseAction {
67        let event = TransactionEvent::detect(sql);
68
69        // Update lease transaction state
70        let _action = lease.on_statement_complete(sql);
71
72        // Override for statement mode specifics
73        match event {
74            TransactionEvent::Begin => {
75                // Explicit transaction - hold
76                LeaseAction::Hold
77            }
78            TransactionEvent::Commit | TransactionEvent::Rollback => {
79                // Transaction ended - reset and release
80                LeaseAction::Reset
81            }
82            _ => {
83                // In explicit transaction, hold
84                if lease.in_transaction() {
85                    LeaseAction::Hold
86                } else {
87                    // Single statement, release immediately
88                    LeaseAction::Reset
89                }
90            }
91        }
92    }
93
94    /// Process transaction end signal from backend
95    pub fn on_transaction_end(&self, lease: &mut ConnectionLease) -> LeaseAction {
96        lease.on_transaction_end()
97    }
98
99    /// Check if connection should be released
100    pub fn should_release(&self, lease: &ConnectionLease) -> bool {
101        !lease.in_transaction()
102    }
103
104    /// Called when client disconnects
105    pub fn on_client_disconnect(&self, _lease: ConnectionLease) -> LeaseAction {
106        LeaseAction::Reset
107    }
108
109    /// Get the pooling mode
110    pub fn mode(&self) -> PoolingMode {
111        PoolingMode::Statement
112    }
113
114    /// Check if autocommit is enabled
115    pub fn autocommit(&self) -> bool {
116        self.autocommit
117    }
118
119    /// Prepared statements are not supported in statement mode
120    pub fn tracks_prepared_statements(&self) -> bool {
121        false
122    }
123
124    /// Check if a query is safe for statement mode
125    ///
126    /// Returns false for queries that require session state.
127    pub fn is_safe_query(&self, sql: &str) -> bool {
128        let upper = sql.trim().to_uppercase();
129
130        // These are not safe for statement mode
131        if upper.starts_with("LISTEN")
132            || upper.starts_with("UNLISTEN")
133            || upper.starts_with("PREPARE")
134            || upper.starts_with("EXECUTE")
135            || upper.starts_with("DEALLOCATE")
136            || upper.starts_with("DECLARE")
137            || upper.starts_with("FETCH")
138            || upper.starts_with("CLOSE")
139            || upper.starts_with("MOVE")
140            || upper.contains("CREATE TEMP")
141            || upper.contains("CREATE TEMPORARY")
142        {
143            return false;
144        }
145
146        // SET commands that affect session state
147        if upper.starts_with("SET ")
148            && !upper.starts_with("SET LOCAL")
149            && !upper.starts_with("SET TRANSACTION")
150        {
151            return false;
152        }
153
154        true
155    }
156
157    /// Get warning if query is unsafe for statement mode
158    pub fn get_query_warning(&self, sql: &str) -> Option<&'static str> {
159        let upper = sql.trim().to_uppercase();
160
161        if upper.starts_with("LISTEN") || upper.starts_with("UNLISTEN") {
162            return Some(
163                "LISTEN/UNLISTEN not supported in statement mode - notifications will be lost",
164            );
165        }
166
167        if upper.starts_with("PREPARE")
168            || upper.starts_with("EXECUTE")
169            || upper.starts_with("DEALLOCATE")
170        {
171            return Some("Prepared statements not supported in statement mode");
172        }
173
174        if upper.starts_with("DECLARE") || upper.starts_with("FETCH") || upper.starts_with("CLOSE")
175        {
176            return Some("Cursors not supported in statement mode outside explicit transactions");
177        }
178
179        if upper.contains("CREATE TEMP") || upper.contains("CREATE TEMPORARY") {
180            return Some("Temporary tables may not persist correctly in statement mode");
181        }
182
183        if upper.starts_with("SET ")
184            && !upper.starts_with("SET LOCAL")
185            && !upper.starts_with("SET TRANSACTION")
186        {
187            return Some("Session variables may not persist in statement mode - use SET LOCAL within transaction");
188        }
189
190        None
191    }
192}
193
/// Unit tests for [`StatementModeHandler`]: lease decisions, query safety
/// classification, warnings and the simple accessors.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connection_pool::ConnectionState;
    use crate::NodeId;
    use uuid::Uuid;

    /// Build a standalone in-use connection with no permit or client attached.
    fn create_test_connection() -> PooledConnection {
        PooledConnection {
            id: Uuid::new_v4(),
            node_id: NodeId::new(),
            created_at: chrono::Utc::now(),
            last_used: chrono::Utc::now(),
            state: ConnectionState::InUse,
            use_count: 1,
            permit: None,
            client: None,
        }
    }

    #[test]
    fn test_statement_mode_releases_per_statement() {
        let handler = StatementModeHandler::new();
        let conn = create_test_connection();
        let mut lease = handler.create_lease(conn, ClientId::new());

        // Single statement should release
        assert_eq!(
            handler.on_statement_complete(&mut lease, "SELECT 1"),
            LeaseAction::Reset
        );
    }

    #[test]
    fn test_statement_mode_holds_during_transaction() {
        let handler = StatementModeHandler::new();
        let conn = create_test_connection();
        let mut lease = handler.create_lease(conn, ClientId::new());

        // BEGIN should hold
        assert_eq!(
            handler.on_statement_complete(&mut lease, "BEGIN"),
            LeaseAction::Hold
        );

        // Statements in transaction should hold
        assert_eq!(
            handler.on_statement_complete(&mut lease, "SELECT 1"),
            LeaseAction::Hold
        );
        assert_eq!(
            handler.on_statement_complete(&mut lease, "INSERT INTO t VALUES (1)"),
            LeaseAction::Hold
        );

        // COMMIT should release
        assert_eq!(
            handler.on_statement_complete(&mut lease, "COMMIT"),
            LeaseAction::Reset
        );
    }

    #[test]
    fn test_should_release() {
        let handler = StatementModeHandler::new();
        let conn = create_test_connection();
        let lease = handler.create_lease(conn, ClientId::new());

        // Not in transaction, should release
        assert!(handler.should_release(&lease));
    }

    #[test]
    fn test_safe_query_detection() {
        let handler = StatementModeHandler::new();

        // Safe queries
        assert!(handler.is_safe_query("SELECT * FROM users"));
        assert!(handler.is_safe_query("INSERT INTO users VALUES (1)"));
        assert!(handler.is_safe_query("UPDATE users SET name = 'foo'"));
        assert!(handler.is_safe_query("DELETE FROM users WHERE id = 1"));
        assert!(handler.is_safe_query("SET LOCAL work_mem = '1GB'"));
        assert!(handler.is_safe_query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"));

        // Unsafe queries
        assert!(!handler.is_safe_query("LISTEN channel"));
        assert!(!handler.is_safe_query("UNLISTEN *"));
        assert!(!handler.is_safe_query("PREPARE stmt AS SELECT 1"));
        assert!(!handler.is_safe_query("EXECUTE stmt"));
        assert!(!handler.is_safe_query("DEALLOCATE stmt"));
        assert!(!handler.is_safe_query("DECLARE cursor CURSOR FOR SELECT 1"));
        assert!(!handler.is_safe_query("FETCH NEXT FROM cursor"));
        assert!(!handler.is_safe_query("CLOSE cursor"));
        assert!(!handler.is_safe_query("MOVE NEXT FROM cursor"));
        assert!(!handler.is_safe_query("CREATE TEMP TABLE t (id int)"));
        assert!(!handler.is_safe_query("CREATE TEMPORARY TABLE t (id int)"));
        assert!(!handler.is_safe_query("SET work_mem = '1GB'"));

        // Keyword matching ignores case and surrounding whitespace
        assert!(!handler.is_safe_query("  listen channel  "));
    }

    #[test]
    fn test_query_warnings() {
        let handler = StatementModeHandler::new();

        assert!(handler.get_query_warning("LISTEN channel").is_some());
        assert!(handler
            .get_query_warning("PREPARE stmt AS SELECT 1")
            .is_some());
        assert!(handler
            .get_query_warning("CREATE TEMP TABLE t (id int)")
            .is_some());
        assert!(handler.get_query_warning("SET work_mem = '1GB'").is_some());

        assert!(handler.get_query_warning("SELECT 1").is_none());
        assert!(handler
            .get_query_warning("SET LOCAL work_mem = '1GB'")
            .is_none());
    }

    #[test]
    fn test_mode() {
        let handler = StatementModeHandler::new();
        assert_eq!(handler.mode(), PoolingMode::Statement);
    }

    #[test]
    fn test_no_prepared_statement_support() {
        let handler = StatementModeHandler::new();
        assert!(!handler.tracks_prepared_statements());
    }

    #[test]
    fn test_autocommit_setting() {
        assert!(StatementModeHandler::new().autocommit());
        assert!(StatementModeHandler::default().autocommit());
        assert!(StatementModeHandler::with_autocommit(true).autocommit());
        assert!(!StatementModeHandler::with_autocommit(false).autocommit());
    }
}
316}