Skip to main content

heliosdb_proxy/pool/
statement.rs

1//! Statement Mode Handler
2//!
3//! Implements statement pooling mode where connections are returned
4//! to the pool after each individual statement completes.
5
6use super::lease::{ClientId, ConnectionLease, LeaseAction};
7use super::mode::{PoolingMode, TransactionEvent};
8use crate::connection_pool::PooledConnection;
9
10/// Statement mode handler
11///
12/// In statement mode, connections are returned to the pool after every statement
13/// outside of an explicit transaction. This provides maximum connection sharing
14/// but with significant limitations.
15///
16/// Benefits:
17/// - Maximum connection sharing
18/// - Best for high-volume simple queries
19/// - Good for read-heavy workloads with many clients
20///
21/// Limitations:
22/// - CANNOT use server-side prepared statements (Parse/Bind/Execute)
23/// - CANNOT use LISTEN/NOTIFY
24/// - CANNOT rely on session variables
25/// - CANNOT use temp tables effectively
26/// - Must use simple query protocol
27///
28/// Use cases:
29/// - Connection poolers like PgBouncer in statement mode
30/// - High-throughput REST APIs with simple queries
31/// - Serverless environments with many concurrent clients
32pub struct StatementModeHandler {
33    /// Whether autocommit is enabled (single statements are transactions)
34    autocommit: bool,
35}
36
37impl Default for StatementModeHandler {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl StatementModeHandler {
44    /// Create a new statement mode handler
45    pub fn new() -> Self {
46        Self { autocommit: true }
47    }
48
49    /// Create with specific autocommit setting
50    pub fn with_autocommit(autocommit: bool) -> Self {
51        Self { autocommit }
52    }
53
54    /// Create a lease for this mode
55    pub fn create_lease(&self, connection: PooledConnection, client_id: ClientId) -> ConnectionLease {
56        ConnectionLease::new(connection, PoolingMode::Statement, client_id)
57    }
58
59    /// Process a statement and determine action
60    ///
61    /// Returns the connection after every statement outside of explicit transaction.
62    pub fn on_statement_complete(&self, lease: &mut ConnectionLease, sql: &str) -> LeaseAction {
63        let event = TransactionEvent::detect(sql);
64
65        // Update lease transaction state
66        let _action = lease.on_statement_complete(sql);
67
68        // Override for statement mode specifics
69        match event {
70            TransactionEvent::Begin => {
71                // Explicit transaction - hold
72                LeaseAction::Hold
73            }
74            TransactionEvent::Commit | TransactionEvent::Rollback => {
75                // Transaction ended - reset and release
76                LeaseAction::Reset
77            }
78            _ => {
79                // In explicit transaction, hold
80                if lease.in_transaction() {
81                    LeaseAction::Hold
82                } else {
83                    // Single statement, release immediately
84                    LeaseAction::Reset
85                }
86            }
87        }
88    }
89
90    /// Process transaction end signal from backend
91    pub fn on_transaction_end(&self, lease: &mut ConnectionLease) -> LeaseAction {
92        lease.on_transaction_end()
93    }
94
95    /// Check if connection should be released
96    pub fn should_release(&self, lease: &ConnectionLease) -> bool {
97        !lease.in_transaction()
98    }
99
100    /// Called when client disconnects
101    pub fn on_client_disconnect(&self, _lease: ConnectionLease) -> LeaseAction {
102        LeaseAction::Reset
103    }
104
105    /// Get the pooling mode
106    pub fn mode(&self) -> PoolingMode {
107        PoolingMode::Statement
108    }
109
110    /// Check if autocommit is enabled
111    pub fn autocommit(&self) -> bool {
112        self.autocommit
113    }
114
115    /// Prepared statements are not supported in statement mode
116    pub fn tracks_prepared_statements(&self) -> bool {
117        false
118    }
119
120    /// Check if a query is safe for statement mode
121    ///
122    /// Returns false for queries that require session state.
123    pub fn is_safe_query(&self, sql: &str) -> bool {
124        let upper = sql.trim().to_uppercase();
125
126        // These are not safe for statement mode
127        if upper.starts_with("LISTEN")
128            || upper.starts_with("UNLISTEN")
129            || upper.starts_with("PREPARE")
130            || upper.starts_with("EXECUTE")
131            || upper.starts_with("DEALLOCATE")
132            || upper.starts_with("DECLARE")
133            || upper.starts_with("FETCH")
134            || upper.starts_with("CLOSE")
135            || upper.starts_with("MOVE")
136            || upper.contains("CREATE TEMP")
137            || upper.contains("CREATE TEMPORARY")
138        {
139            return false;
140        }
141
142        // SET commands that affect session state
143        if upper.starts_with("SET ")
144            && !upper.starts_with("SET LOCAL")
145            && !upper.starts_with("SET TRANSACTION")
146        {
147            return false;
148        }
149
150        true
151    }
152
153    /// Get warning if query is unsafe for statement mode
154    pub fn get_query_warning(&self, sql: &str) -> Option<&'static str> {
155        let upper = sql.trim().to_uppercase();
156
157        if upper.starts_with("LISTEN") || upper.starts_with("UNLISTEN") {
158            return Some("LISTEN/UNLISTEN not supported in statement mode - notifications will be lost");
159        }
160
161        if upper.starts_with("PREPARE") || upper.starts_with("EXECUTE") || upper.starts_with("DEALLOCATE") {
162            return Some("Prepared statements not supported in statement mode");
163        }
164
165        if upper.starts_with("DECLARE") || upper.starts_with("FETCH") || upper.starts_with("CLOSE") {
166            return Some("Cursors not supported in statement mode outside explicit transactions");
167        }
168
169        if upper.contains("CREATE TEMP") || upper.contains("CREATE TEMPORARY") {
170            return Some("Temporary tables may not persist correctly in statement mode");
171        }
172
173        if upper.starts_with("SET ") && !upper.starts_with("SET LOCAL") && !upper.starts_with("SET TRANSACTION") {
174            return Some("Session variables may not persist in statement mode - use SET LOCAL within transaction");
175        }
176
177        None
178    }
179}
180
181#[cfg(test)]
182mod tests {
183    use super::*;
184    use crate::connection_pool::ConnectionState;
185    use crate::NodeId;
186    use uuid::Uuid;
187
188    fn create_test_connection() -> PooledConnection {
189        PooledConnection {
190            id: Uuid::new_v4(),
191            node_id: NodeId::new(),
192            created_at: chrono::Utc::now(),
193            last_used: chrono::Utc::now(),
194            state: ConnectionState::InUse,
195            use_count: 1,
196            permit: None,
197            client: None,
198        }
199    }
200
201    #[test]
202    fn test_statement_mode_releases_per_statement() {
203        let handler = StatementModeHandler::new();
204        let conn = create_test_connection();
205        let mut lease = handler.create_lease(conn, ClientId::new());
206
207        // Single statement should release
208        assert_eq!(
209            handler.on_statement_complete(&mut lease, "SELECT 1"),
210            LeaseAction::Reset
211        );
212    }
213
214    #[test]
215    fn test_statement_mode_holds_during_transaction() {
216        let handler = StatementModeHandler::new();
217        let conn = create_test_connection();
218        let mut lease = handler.create_lease(conn, ClientId::new());
219
220        // BEGIN should hold
221        assert_eq!(
222            handler.on_statement_complete(&mut lease, "BEGIN"),
223            LeaseAction::Hold
224        );
225
226        // Statements in transaction should hold
227        assert_eq!(
228            handler.on_statement_complete(&mut lease, "SELECT 1"),
229            LeaseAction::Hold
230        );
231        assert_eq!(
232            handler.on_statement_complete(&mut lease, "INSERT INTO t VALUES (1)"),
233            LeaseAction::Hold
234        );
235
236        // COMMIT should release
237        assert_eq!(
238            handler.on_statement_complete(&mut lease, "COMMIT"),
239            LeaseAction::Reset
240        );
241    }
242
243    #[test]
244    fn test_should_release() {
245        let handler = StatementModeHandler::new();
246        let conn = create_test_connection();
247        let lease = handler.create_lease(conn, ClientId::new());
248
249        // Not in transaction, should release
250        assert!(handler.should_release(&lease));
251    }
252
253    #[test]
254    fn test_safe_query_detection() {
255        let handler = StatementModeHandler::new();
256
257        // Safe queries
258        assert!(handler.is_safe_query("SELECT * FROM users"));
259        assert!(handler.is_safe_query("INSERT INTO users VALUES (1)"));
260        assert!(handler.is_safe_query("UPDATE users SET name = 'foo'"));
261        assert!(handler.is_safe_query("DELETE FROM users WHERE id = 1"));
262        assert!(handler.is_safe_query("SET LOCAL work_mem = '1GB'"));
263
264        // Unsafe queries
265        assert!(!handler.is_safe_query("LISTEN channel"));
266        assert!(!handler.is_safe_query("PREPARE stmt AS SELECT 1"));
267        assert!(!handler.is_safe_query("EXECUTE stmt"));
268        assert!(!handler.is_safe_query("DECLARE cursor CURSOR FOR SELECT 1"));
269        assert!(!handler.is_safe_query("CREATE TEMP TABLE t (id int)"));
270        assert!(!handler.is_safe_query("SET work_mem = '1GB'"));
271    }
272
273    #[test]
274    fn test_query_warnings() {
275        let handler = StatementModeHandler::new();
276
277        assert!(handler.get_query_warning("LISTEN channel").is_some());
278        assert!(handler.get_query_warning("PREPARE stmt AS SELECT 1").is_some());
279        assert!(handler.get_query_warning("CREATE TEMP TABLE t (id int)").is_some());
280        assert!(handler.get_query_warning("SET work_mem = '1GB'").is_some());
281
282        assert!(handler.get_query_warning("SELECT 1").is_none());
283        assert!(handler.get_query_warning("SET LOCAL work_mem = '1GB'").is_none());
284    }
285
286    #[test]
287    fn test_mode() {
288        let handler = StatementModeHandler::new();
289        assert_eq!(handler.mode(), PoolingMode::Statement);
290    }
291
292    #[test]
293    fn test_no_prepared_statement_support() {
294        let handler = StatementModeHandler::new();
295        assert!(!handler.tracks_prepared_statements());
296    }
297}