Skip to main content

heliosdb_proxy/
server.rs

1//! Proxy Server Implementation
2//!
3//! Main server that accepts client connections and routes them to backends.
4//! Implements PostgreSQL wire protocol forwarding with TWR (Transparent Write Routing).
5
6use crate::admin::{AdminServer, AdminState, ConfigSnapshot, NodeSnapshot};
7#[cfg(feature = "ha-tr")]
8use crate::backend::{tls::default_client_config, BackendConfig, TlsMode};
9use crate::client_tls::{build_tls_acceptor, ClientStream};
10use crate::config::{HbaAction, HbaRule, NodeConfig, NodeRole, ProxyConfig, TrMode};
11#[cfg(feature = "wasm-plugins")]
12use crate::protocol::QueryMessage;
13use crate::protocol::{
14    ErrorResponse, Message, MessageType, ProtocolCodec, StartupMessage, TransactionStatus,
15};
16use crate::{ProxyError, Result};
17use arc_swap::ArcSwap;
18use bytes::{BufMut, BytesMut};
19use dashmap::DashMap;
20use std::collections::{HashMap, HashSet};
21use std::net::SocketAddr;
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::io::{AsyncReadExt, AsyncWriteExt};
26use tokio::net::{TcpListener, TcpStream};
27use tokio::sync::{broadcast, RwLock};
28use uuid::Uuid;
29
30// Pool-modes feature imports
31#[cfg(feature = "pool-modes")]
32use crate::pool::lease::ClientId;
33#[cfg(feature = "pool-modes")]
34use crate::pool::{ConnectionPoolManager, PoolModeConfig, PoolingMode};
35#[cfg(feature = "pool-modes")]
36use crate::NodeEndpoint;
37
38// WASM plugin system imports
39#[cfg(feature = "wasm-plugins")]
40use crate::plugins::{
41    AuthRequest as PluginAuthRequest, AuthResult, HookContext, HookType, Identity, PluginManager,
42    PostQueryOutcome, PreQueryResult, QueryContext, RouteResult,
43};
44
45/// Proxy server
46pub struct ProxyServer {
47    config: ProxyConfig,
48    state: Arc<ServerState>,
49    shutdown_tx: broadcast::Sender<()>,
50    /// Path the config was loaded from, retained so `SIGHUP` can re-read it
51    /// for a zero-downtime reload (Batch H). `None` when the config was built
52    /// from CLI flags/defaults rather than a file.
53    config_path: Option<String>,
54}
55
/// Placeholder for the `SIGHUP` signal stream on targets without Unix
/// signals.
///
/// Its `recv()` future never completes, so a `select!` arm waiting on it can
/// never fire and the hang-up branch is effectively disabled there.
#[cfg(not(unix))]
struct HangupNever;

#[cfg(not(unix))]
impl HangupNever {
    /// Waits forever; never yields a value.
    async fn recv(&mut self) -> Option<()> {
        let never = std::future::pending::<Option<()>>();
        never.await
    }
}
66
/// Returns the `BackendConfig` template that the time-travel replay engine
/// uses for its target connection.
///
/// The replay handler replaces `host` / `port` with the request's
/// `target_host` / `target_port`. Everything else comes from this template:
/// auth, TLS policy (disabled), and timeouts (5 s connect, 30 s query).
///
/// Authentication is the bare `postgres` superuser with no password. That
/// suits local development against `trust` auth and must never be used in
/// production. Per-call credential overrides on ReplayRequestBody are
/// planned for FU-21.
///
/// `_config` is unused today. It stays in the signature so shared TLS and
/// timeout settings can later be read from the proxy config without
/// touching the call site.
#[cfg(feature = "ha-tr")]
fn build_replay_backend_template(_config: &ProxyConfig) -> BackendConfig {
    // Dummy target; overwritten by the replay handler on every request.
    let host = String::from("placeholder");
    let port = 0;
    let application_name = Some(String::from("heliosdb-proxy-replay"));

    BackendConfig {
        host,
        port,
        user: String::from("postgres"),
        password: None,
        database: None,
        application_name,
        tls_mode: TlsMode::Disable,
        connect_timeout: Duration::from_secs(5),
        query_timeout: Duration::from_secs(30),
        tls_config: default_client_config(),
    }
}
95
/// Cheap query-shape fingerprint for the anomaly detector.
///
/// The transformation:
/// - Replaces numeric and single-quoted string literals with `?`
///   placeholders.
/// - Lower-cases every other ASCII character, including identifiers and
///   quoted identifiers, not just keywords.
/// - Collapses runs of whitespace into one space and drops leading and
///   trailing whitespace.
///
/// Queries that differ only in literal values share a fingerprint.
/// `SELECT * FROM users WHERE id = 1` and `SELECT * FROM users WHERE id = 99`
/// both become `select * from users where id = ?`.
///
/// Digits that continue an identifier (`t1`, `col_2`) are kept, so
/// `FROM t1` and `FROM t2` stay distinct shapes.
///
/// A doubled quote inside a string literal (`'O''Brien'`) is SQL's escape
/// for an embedded quote and does not end the literal. Escaped and unescaped
/// strings therefore fingerprint the same.
///
/// A numeric literal swallows its trailing alphanumerics, `.` and `_`, so
/// `1.5`, `2.5e3`, `0x1F` and `1_000` each become a single `?`.
///
/// Not a parser: comments, dollar-quoted strings and backslash escapes are
/// not recognised. When query-analytics is on, the analytics module has the
/// canonical normaliser. This lightweight standalone version lets the
/// anomaly detector work even when analytics is off.
#[cfg(feature = "anomaly-detection")]
fn anomaly_fingerprint(sql: &str) -> String {
    let mut out = String::with_capacity(sql.len());
    let mut prev_space = false;
    // Last input character consumed. A digit that directly follows an
    // identifier character belongs to that identifier (`t1`), not to a
    // numeric literal.
    let mut prev_char: Option<char> = None;
    let mut chars = sql.chars().peekable();
    while let Some(c) = chars.next() {
        if c == '\'' {
            // Replace the whole string literal (open + body + close) with a
            // single `?`. An unterminated literal consumes the rest of input.
            out.push('?');
            while let Some(n) = chars.next() {
                if n == '\'' {
                    // `''` is an escaped quote inside the literal, not its end.
                    if chars.peek() == Some(&'\'') {
                        chars.next();
                        continue;
                    }
                    break;
                }
            }
            prev_space = false;
            prev_char = Some('\'');
            continue;
        }

        let continues_identifier =
            matches!(prev_char, Some(p) if p.is_alphanumeric() || p == '_');
        prev_char = Some(c);

        if c.is_ascii_digit() && !continues_identifier {
            if !out.ends_with('?') {
                out.push('?');
            }
            // Skip the rest of the number, including decimal points,
            // exponents, hex digits and `_` digit separators.
            while matches!(
                chars.peek(),
                Some(d) if d.is_ascii_alphanumeric() || *d == '.' || *d == '_'
            ) {
                chars.next();
            }
            prev_space = false;
            continue;
        }
        if c.is_ascii_whitespace() {
            if !prev_space && !out.is_empty() {
                out.push(' ');
                prev_space = true;
            }
            continue;
        }
        out.push(c.to_ascii_lowercase());
        prev_space = false;
    }
    out.trim_end().to_string()
}
152
153/// Server runtime state
154struct ServerState {
155    /// Active client sessions
156    sessions: RwLock<HashMap<Uuid, Arc<ClientSession>>>,
157    /// Node health status
158    // Read-mostly: only the periodic health checker writes (a full-map
159    // swap), every query reads. ArcSwap makes the per-query read a single
160    // lock-free atomic load with no await, no semaphore, no guard held
161    // across the routing awaits.
162    health: ArcSwap<HashMap<String, NodeHealth>>,
163    /// Write-serialization lock for `health`. Every reader stays lock-free on
164    /// the ArcSwap; every *writer* (periodic checker, in-band demotion, SIGHUP
165    /// reconcile) holds this across its load → clone → mutate → store so the
166    /// non-atomic read-modify-write cannot lose updates under concurrency.
167    health_write: parking_lot::Mutex<()>,
168    /// Live, reloadable proxy configuration (Batch H). The accept loop snapshots
169    /// this per new connection and the health checker reads it each tick, so a
170    /// SIGHUP that swaps it takes effect for new connections and node health
171    /// without dropping any in-flight session. The fields that can only be
172    /// applied at startup (listen/admin socket addresses) are ignored on reload
173    /// with a warning. Existing connections keep the snapshot they started with.
174    live_config: ArcSwap<ProxyConfig>,
175    /// Metrics
176    metrics: ServerMetrics,
177    /// Query-cancellation routing. Maps the BackendKeyData (pid, secret)
178    /// the backend handed to the client onto the backend address that
179    /// issued it, so a later out-of-band CancelRequest (which arrives on a
180    /// fresh connection) can be forwarded to the right backend instead of
181    /// being dropped. Bounded; best-effort.
182    cancel_map: Arc<DashMap<(u32, u32), String>>,
183    /// Insertion order of `cancel_map` keys, so an overflow evicts the OLDEST
184    /// entries (FIFO) instead of clearing the whole map — a busy proxy no
185    /// longer loses every in-flight cancel registration at once.
186    cancel_order: Arc<parking_lot::Mutex<std::collections::VecDeque<(u32, u32)>>>,
187    /// Client-facing TLS acceptor, built from `[tls]` config when enabled.
188    /// `None` => the proxy rejects SSLRequests with `N` (plaintext only).
189    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
190    /// Proxy-terminated SCRAM auth state. `Some` when `[auth] mode = "scram"`:
191    /// the proxy authenticates clients itself against this user list instead
192    /// of relaying their credentials to the backend.
193    auth_file: Option<Arc<crate::auth_scram::AuthFile>>,
194    /// Traffic-mirror handle. `Some` when `[mirror] enabled`: the data path
195    /// offers write statements to a background mirror worker.
196    mirror: Option<crate::mirror::MirrorHandle>,
197    /// Migration cutover switch. When `Some`, NEW client connections are
198    /// transparently redirected to the promoted target (the former mirror)
199    /// instead of the configured primary. Set via POST /api/migration/cutover.
200    cutover: Arc<ArcSwap<Option<Arc<crate::mirror::CutoverTarget>>>>,
201    /// Load balancer state
202    lb_state: LoadBalancerState,
203    /// SQL-comment routing-hint parser. `Some` when `[routing_hints] enabled`
204    /// and the `routing-hints` feature is compiled in; the parser's own
205    /// `strip_hints` flag records whether to rewrite the SQL before forwarding.
206    /// Applied per query, taking precedence over default verb routing.
207    #[cfg(feature = "routing-hints")]
208    hint_parser: Option<crate::routing::HintParser>,
209    /// Multi-dimensional rate limiter. `Some` when `[rate_limit] enabled`;
210    /// every query is checked against it before being forwarded to a backend.
211    #[cfg(feature = "rate-limiting")]
212    rate_limiter: Option<Arc<crate::rate_limit::RateLimiter>>,
213    /// Per-node circuit breaker manager. `Some` when `[circuit_breaker]
214    /// enabled`. Records per-node success/failure on the forward path, excludes
215    /// open nodes from read selection, and fast-fails queries to an open node.
216    #[cfg(feature = "circuit-breaker")]
217    circuit_breaker: Option<Arc<crate::circuit_breaker::CircuitBreakerManager>>,
218    /// Query analytics engine. `Some` when `[analytics] enabled`. Every
219    /// forwarded query is recorded (fingerprint, latency, slow-query log).
220    #[cfg(feature = "query-analytics")]
221    analytics: Option<Arc<crate::analytics::QueryAnalytics>>,
222    /// Query-result cache (L1 hot / L2 warm). `Some` when `[cache] enabled`.
223    /// Read SELECTs are served from it; writes invalidate referenced tables.
224    #[cfg(feature = "query-cache")]
225    query_cache: Option<Arc<crate::cache::QueryCache>>,
226    /// SQL query rewriter. `Some` when `[query_rewrite] enabled` with rules.
227    /// Rewrites the query SQL on the path before forwarding.
228    #[cfg(feature = "query-rewriting")]
229    rewriter: Option<Arc<crate::rewriter::QueryRewriter>>,
230    /// Multi-tenancy manager. `Some` when `[multi_tenancy] enabled`. Identifies
231    /// the tenant for a session and injects a row-level tenant filter.
232    #[cfg(feature = "multi-tenancy")]
233    tenant_manager: Option<Arc<crate::multi_tenancy::TenantManager>>,
234    /// Schema/workload query analyzer. `Some` when `[schema_routing] enabled`;
235    /// analytical (OLAP) queries are routed to the configured analytics node.
236    #[cfg(feature = "schema-routing")]
237    schema_analyzer: Option<Arc<crate::schema_routing::QueryAnalyzer>>,
238    /// Pool manager for Session/Transaction/Statement modes
239    #[cfg(feature = "pool-modes")]
240    pool_manager: Option<Arc<ConnectionPoolManager>>,
241    /// Data-path idle backend-connection pool. `Some` only when pooling is
242    /// active (mode is Transaction or Statement); `None` leaves the 1:1
243    /// session-pinned path completely unchanged. This is the raw-stream pool
244    /// the data path actually leases from, keyed by `(node, user, database)`.
245    #[cfg(feature = "pool-modes")]
246    backend_pool: Option<Arc<crate::pool::BackendIdlePool>>,
247    /// WASM plugin manager. `None` means no plugins loaded — the per-query
248    /// hook path becomes a fast no-op. When `Some`, `PreQuery` / `PostQuery`
249    /// hooks fire on every simple-query message.
250    #[cfg(feature = "wasm-plugins")]
251    plugin_manager: Option<Arc<PluginManager>>,
252    /// Shared transaction journal — single sink for per-session
253    /// statement journaling. The replay engine reads windows from
254    /// this directly. Always present when the `ha-tr` feature is on;
255    /// journaling self-disables internally when not configured.
256    #[cfg(feature = "ha-tr")]
257    transaction_journal: Arc<crate::transaction_journal::TransactionJournal>,
258    /// Anomaly detector (T3.1). Records every query and every
259    /// auth outcome; surfaces detections via /api/anomalies.
260    #[cfg(feature = "anomaly-detection")]
261    anomaly_detector: Arc<crate::anomaly::AnomalyDetector>,
262    /// Edge cache + home registry (T3.2). Both always-present even
263    /// in Home mode (the cache is a no-op there); avoids an extra
264    /// Option in the hot path.
265    #[cfg(feature = "edge-proxy")]
266    edge_cache: Arc<crate::edge::EdgeCache>,
267    #[cfg(feature = "edge-proxy")]
268    edge_registry: Arc<crate::edge::EdgeRegistry>,
269}
270
271/// Node health status
272#[derive(Debug, Clone)]
273pub struct NodeHealth {
274    /// Node address
275    pub address: String,
276    /// Whether node is healthy
277    pub healthy: bool,
278    /// Last check time
279    pub last_check: chrono::DateTime<chrono::Utc>,
280    /// Consecutive failures
281    pub failure_count: u32,
282    /// Last error message
283    pub last_error: Option<String>,
284    /// Average latency (ms)
285    pub latency_ms: f64,
286    /// Replication lag (if applicable)
287    pub replication_lag_bytes: Option<u64>,
288}
289
/// Server-wide counters. `Default` starts every counter at zero.
#[derive(Default)]
struct ServerMetrics {
    /// Client connections accepted, in total.
    connections_accepted: AtomicU64,
    /// Client connections closed, in total.
    connections_closed: AtomicU64,
    /// Queries processed, in total.
    queries_processed: AtomicU64,
    /// Bytes received from clients, in total.
    bytes_received: AtomicU64,
    /// Bytes sent to clients, in total.
    bytes_sent: AtomicU64,
    /// Number of failovers performed.
    failovers: AtomicU64,
}
306
/// Bookkeeping for the read load balancer.
struct LoadBalancerState {
    /// Current position of the round-robin rotation. It is an atomic, so the
    /// read-routing path can advance the rotation without taking a write
    /// lock.
    rr_counter: AtomicU64,
}
313
314/// Client session
315pub struct ClientSession {
316    /// Session ID
317    pub id: Uuid,
318    /// Client address
319    pub client_addr: SocketAddr,
320    /// Current backend node
321    pub current_node: RwLock<Option<String>>,
322    /// Transaction state
323    pub tx_state: RwLock<TransactionState>,
324    /// Session variables
325    pub variables: RwLock<HashMap<String, String>>,
326    /// Created at
327    pub created_at: chrono::DateTime<chrono::Utc>,
328    /// TR mode for this session
329    pub tr_mode: TrMode,
330    /// Wall-clock instant of this session's most recent write, for
331    /// read-your-writes routing: reads within the configured window after a
332    /// write are pinned to the primary so the client observes its own writes
333    /// despite replica lag.
334    #[cfg(feature = "lag-routing")]
335    pub last_write_at: RwLock<Option<std::time::Instant>>,
336    /// Client ID for pool-modes lease tracking
337    #[cfg(feature = "pool-modes")]
338    pub pool_client_id: ClientId,
339    /// Identity returned by an `Authenticate` plugin, if any. Downstream
340    /// plugins (masking, residency routing, cost governor) read this to
341    /// gate per-user policy. `None` when no plugin ran or every plugin
342    /// deferred to the default auth flow.
343    #[cfg(feature = "wasm-plugins")]
344    pub plugin_identity: RwLock<Option<Identity>>,
345}
346
347/// Transaction state
348#[derive(Debug, Clone, Default)]
349pub struct TransactionState {
350    /// Whether in a transaction
351    pub in_transaction: bool,
352    /// Transaction ID
353    pub tx_id: Option<Uuid>,
354    /// Statements executed in current transaction
355    pub statements: Vec<StatementLog>,
356    /// Read-only transaction
357    pub read_only: bool,
358    /// Savepoints
359    pub savepoints: Vec<String>,
360}
361
362/// Logged statement for TR replay
363#[derive(Debug, Clone)]
364pub struct StatementLog {
365    /// Statement SQL
366    pub sql: String,
367    /// Parameters
368    pub params: Vec<String>,
369    /// Result checksum
370    pub result_checksum: Option<u64>,
371    /// Execution time
372    pub executed_at: chrono::DateTime<chrono::Utc>,
373}
374
375/// A cached per-session backend connection plus the set of *named* prepared
376/// statements known to be live on **this** socket.
377///
378/// Tying the prepared-statement set to the socket (rather than to the node
379/// address) is what makes prepared statements survive a backend switch: when a
380/// connection is dropped and redialed, or when a session is routed to a
381/// different node, the fresh `BackendConn` starts with an empty set, so the
382/// proxy transparently re-issues the original `Parse` for any named statement
383/// the target connection is missing before forwarding a `Bind`/`Describe` that
384/// references it (Batch F.4). The session keeps the canonical `Parse` bytes in
385/// a separate registry; this set is just "what does *this* socket already
386/// know".
387struct BackendConn {
388    stream: TcpStream,
389    prepared: HashSet<String>,
390    /// Signature (query text + parameter-type OIDs) of the *unnamed* prepared
391    /// statement currently established on this socket, if any. When the client
392    /// re-sends an identical unnamed `Parse`, the proxy can skip forwarding it
393    /// (the backend's unnamed statement already holds that SQL) and synthesize
394    /// the `ParseComplete` locally — the unnamed-Parse promotion (Batch H).
395    unnamed_sig: Option<bytes::Bytes>,
396}
397
398impl BackendConn {
399    fn new(stream: TcpStream) -> Self {
400        Self {
401            stream,
402            prepared: HashSet::new(),
403            unnamed_sig: None,
404        }
405    }
406}
407
408/// Bind a TCP listener with `SO_REUSEADDR` + `SO_REUSEPORT` so a second process
409/// can bind the same address concurrently (the kernel then load-balances new
410/// connections across both). This is what lets a new binary take over new
411/// connections while the old one drains — used for both the client and admin
412/// listeners so a binary handoff can re-bind every address (Batch H).
413pub(crate) fn bind_reuseport(addr: &str) -> Result<TcpListener> {
414    use socket2::{Domain, Protocol, Socket, Type};
415    let sockaddr: SocketAddr = addr
416        .parse()
417        .map_err(|e| ProxyError::Config(format!("invalid listen address '{}': {}", addr, e)))?;
418    let domain = if sockaddr.is_ipv6() {
419        Domain::IPV6
420    } else {
421        Domain::IPV4
422    };
423    let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
424        .map_err(|e| ProxyError::Network(format!("socket(): {}", e)))?;
425    socket
426        .set_reuse_address(true)
427        .map_err(|e| ProxyError::Network(format!("SO_REUSEADDR: {}", e)))?;
428    #[cfg(all(unix, not(target_os = "solaris")))]
429    socket
430        .set_reuse_port(true)
431        .map_err(|e| ProxyError::Network(format!("SO_REUSEPORT: {}", e)))?;
432    socket
433        .set_nonblocking(true)
434        .map_err(|e| ProxyError::Network(format!("set_nonblocking: {}", e)))?;
435    socket
436        .bind(&sockaddr.into())
437        .map_err(|e| ProxyError::Network(format!("Failed to bind {}: {}", addr, e)))?;
438    socket
439        .listen(1024)
440        .map_err(|e| ProxyError::Network(format!("listen(): {}", e)))?;
441    let std_listener: std::net::TcpListener = socket.into();
442    TcpListener::from_std(std_listener)
443        .map_err(|e| ProxyError::Network(format!("from_std listener: {}", e)))
444}
445
/// Outcome of the pre-query plugin hook stage for one query.
///
/// Without the `wasm-plugins` feature the hook dispatch is compiled out and
/// only `Forward` is ever constructed. The remaining variants exist so that
/// pattern matches look the same across feature sets.
#[allow(dead_code)] // `Block` / `Cached` are only constructed under `wasm-plugins`
#[derive(Debug)]
enum PreQueryAction {
    /// Forward the message to the backend as usual.
    Forward,
    /// A plugin blocked the query, giving this reason. The caller sends an
    /// error followed by ReadyForQuery to the client and skips the backend
    /// entirely.
    Block(String),
    /// A plugin returned a cached response (raw bytes). Not wired up yet.
    ///
    /// Serving it requires synthesising a full protocol reply
    /// (RowDescription + DataRow(s) + CommandComplete + ReadyForQuery), the
    /// next step of T0-a. Until then the caller falls back to `Forward` and
    /// logs a warning.
    Cached(Vec<u8>),
}
466
/// Backend-selection override produced by the Route plugin hook and
/// consumed by `route_and_forward`.
///
/// As with `PreQueryAction`, only `None` is ever constructed when the
/// `wasm-plugins` feature is off.
#[allow(dead_code)] // Primary/Standby/Node/Block only constructed under wasm-plugins
#[derive(Debug)]
enum RouteOverride {
    /// No override: use the default SQL-verb-based routing.
    None,
    /// Take the write path (`select_primary_with_timeout`).
    Primary,
    /// Take the read path (`select_read_node`).
    Standby,
    /// Connect to exactly this node address, overriding the is_write
    /// heuristic. The node's health is still checked first, through the
    /// normal switch-vs-reuse flow.
    Node(String),
    /// Reject the query with the plugin-supplied reason.
    ///
    /// The proxy writes a PG ErrorResponse + ReadyForQuery to the client and
    /// skips the forward. This wins over every other outcome: the proxy
    /// short-circuits before any backend is selected.
    Block(String),
}
491
492impl ProxyServer {
493    /// Build a `PluginManager` from config and preload plugins from disk.
494    ///
495    /// Returns `None` when plugins are disabled in config, when the
496    /// runtime fails to initialise, or when the plugin directory is
497    /// missing. Individual per-file load failures are logged but do not
498    /// abort startup — the remaining plugins load normally and the
499    /// proxy stays up.
500    #[cfg(feature = "wasm-plugins")]
501    fn init_plugin_manager(
502        toml_cfg: &crate::config::PluginToml,
503    ) -> Option<Arc<crate::plugins::PluginManager>> {
504        if !toml_cfg.enabled {
505            return None;
506        }
507
508        let runtime_cfg = crate::plugins::PluginRuntimeConfig::from(toml_cfg);
509        let plugin_dir = runtime_cfg.plugin_dir.clone();
510
511        let pm = match crate::plugins::PluginManager::new(runtime_cfg) {
512            Ok(pm) => Arc::new(pm),
513            Err(e) => {
514                tracing::error!(error = %e, "Failed to create plugin manager; plugins disabled");
515                return None;
516            }
517        };
518
519        match std::fs::read_dir(&plugin_dir) {
520            Ok(entries) => {
521                let mut loaded = 0usize;
522                let mut failed = 0usize;
523                for entry in entries.flatten() {
524                    let path = entry.path();
525                    if path.extension().and_then(|s| s.to_str()) != Some("wasm") {
526                        continue;
527                    }
528                    match pm.load_plugin(&path) {
529                        Ok(()) => loaded += 1,
530                        Err(e) => {
531                            failed += 1;
532                            tracing::warn!(
533                                path = %path.display(),
534                                error = %e,
535                                "Failed to load plugin"
536                            );
537                        }
538                    }
539                }
540                tracing::info!(
541                    dir = %plugin_dir.display(),
542                    loaded = loaded,
543                    failed = failed,
544                    "Plugin loading complete"
545                );
546            }
547            Err(e) => {
548                tracing::warn!(
549                    dir = %plugin_dir.display(),
550                    error = %e,
551                    "Plugin directory not readable; no plugins loaded"
552                );
553            }
554        }
555
556        Some(pm)
557    }
558
559    /// Create a new proxy server
560    pub fn new(config: ProxyConfig) -> Result<Self> {
561        let (shutdown_tx, _) = broadcast::channel(1);
562
563        // Initialize health status
564        let mut health = HashMap::new();
565        for node in &config.nodes {
566            health.insert(
567                node.address(),
568                NodeHealth {
569                    address: node.address(),
570                    healthy: true, // Assume healthy until proven otherwise
571                    last_check: chrono::Utc::now(),
572                    failure_count: 0,
573                    last_error: None,
574                    latency_ms: 0.0,
575                    replication_lag_bytes: None,
576                },
577            );
578        }
579
580        // Initialize pool manager if pool-modes feature is enabled
581        #[cfg(feature = "pool-modes")]
582        let pool_manager = {
583            use crate::pool::PreparedStatementMode as PoolPreparedStatementMode;
584
585            let pool_config = PoolModeConfig {
586                default_mode: match config.pool_mode.mode {
587                    crate::config::PoolingMode::Session => PoolingMode::Session,
588                    crate::config::PoolingMode::Transaction => PoolingMode::Transaction,
589                    crate::config::PoolingMode::Statement => PoolingMode::Statement,
590                },
591                max_pool_size: config.pool_mode.max_pool_size,
592                min_idle: config.pool_mode.min_idle,
593                idle_timeout_secs: config.pool_mode.idle_timeout_secs,
594                max_lifetime_secs: config.pool_mode.max_lifetime_secs,
595                acquire_timeout_secs: config.pool_mode.acquire_timeout_secs,
596                reset_query: config.pool_mode.reset_query.clone(),
597                prepared_statement_mode: match config.pool_mode.prepared_statement_mode {
598                    crate::config::PreparedStatementMode::Disable => {
599                        PoolPreparedStatementMode::Disable
600                    }
601                    crate::config::PreparedStatementMode::Track => PoolPreparedStatementMode::Track,
602                    crate::config::PreparedStatementMode::Named => PoolPreparedStatementMode::Named,
603                },
604                test_on_acquire: config.pool.test_on_acquire,
605                validation_query: "SELECT 1".to_string(),
606                queue_timeout_secs: 30,
607                max_queue_size: 0,
608            };
609            Some(Arc::new(ConnectionPoolManager::new(pool_config)))
610        };
611
612        // The raw-stream data-path pool is only built when pooling is active
613        // (Transaction/Statement). Session mode leaves it `None` so the hot
614        // path is byte-for-byte unchanged.
615        #[cfg(feature = "pool-modes")]
616        let backend_pool = match config.pool_mode.mode {
617            crate::config::PoolingMode::Transaction | crate::config::PoolingMode::Statement => {
618                tracing::info!(
619                    mode = ?config.pool_mode.mode,
620                    max_idle_per_identity = config.pool_mode.max_pool_size,
621                    "pool-modes: data-path connection pooling enabled"
622                );
623                Some(Arc::new(crate::pool::BackendIdlePool::new(
624                    config.pool_mode.max_pool_size as usize,
625                    Self::MAX_TOTAL_IDLE_BACKEND_CONNS,
626                )))
627            }
628            crate::config::PoolingMode::Session => None,
629        };
630
631        // Initialize plugin manager if the wasm-plugins feature is enabled
632        // AND plugins are turned on in config. Scans plugin_dir for `.wasm`
633        // files and loads each; a missing directory is non-fatal and logs
634        // a warning so empty deployments don't fail startup.
635        #[cfg(feature = "wasm-plugins")]
636        let plugin_manager = Self::init_plugin_manager(&config.plugins);
637
638        // Build the client TLS acceptor if [tls] is configured + enabled.
639        // A bad cert/key is fatal at startup (fail fast, don't silently
640        // fall back to plaintext for a deployment that asked for TLS).
641        let tls_acceptor = match config.tls.as_ref() {
642            Some(tls) if tls.enabled => match build_tls_acceptor(tls) {
643                Ok(acc) => {
644                    tracing::info!(
645                        mtls = tls.require_client_cert,
646                        "client TLS termination enabled"
647                    );
648                    Some(acc)
649                }
650                Err(e) => {
651                    return Err(ProxyError::Config(format!("TLS init failed: {}", e)));
652                }
653            },
654            _ => None,
655        };
656
657        // Load the SCRAM auth_file when proxy-terminated auth is requested.
658        // Misconfiguration is fatal at startup (fail fast).
659        let auth_file = if config.auth.mode == crate::config::AuthMode::Scram {
660            let path = config.auth.auth_file.as_ref().ok_or_else(|| {
661                ProxyError::Config("auth mode 'scram' requires auth_file".to_string())
662            })?;
663            let af = crate::auth_scram::AuthFile::load(path)
664                .map_err(|e| ProxyError::Config(format!("auth_file: {}", e)))?;
665            tracing::info!(users = %(!af.is_empty()), "proxy SCRAM auth enabled");
666            Some(Arc::new(af))
667        } else {
668            None
669        };
670
671        // Spawn the traffic-mirror worker when enabled (we are inside the
672        // tokio runtime here — main is #[tokio::main]).
673        let mirror = if config.mirror.enabled {
674            tracing::info!(target = %format!("{}:{}", config.mirror.backend_host, config.mirror.backend_port),
675                writes_only = config.mirror.writes_only, "traffic mirroring enabled");
676            Some(crate::mirror::spawn(config.mirror.clone()))
677        } else {
678            None
679        };
680
681        // Build the rate limiter from the TOML config when enabled.
682        #[cfg(feature = "rate-limiting")]
683        let rate_limiter = if config.rate_limit.enabled {
684            let rl = &config.rate_limit;
685            tracing::info!(
686                qps = rl.default_qps,
687                burst = rl.default_burst,
688                key_by = ?rl.key_by,
689                "rate limiting enabled"
690            );
691            let rlc = crate::rate_limit::RateLimitConfig {
692                enabled: true,
693                default_qps: rl.default_qps,
694                default_burst: rl.default_burst,
695                default_concurrency: if rl.max_concurrent > 0 {
696                    rl.max_concurrent
697                } else {
698                    crate::rate_limit::RateLimitConfig::default().default_concurrency
699                },
700                ..Default::default()
701            };
702            Some(Arc::new(crate::rate_limit::RateLimiter::new(rlc)))
703        } else {
704            None
705        };
706
707        // Build the per-node circuit breaker manager when enabled.
708        #[cfg(feature = "circuit-breaker")]
709        let circuit_breaker = if config.circuit_breaker.enabled {
710            let cb = &config.circuit_breaker;
711            tracing::info!(
712                failure_threshold = cb.failure_threshold,
713                open_secs = cb.open_secs,
714                "circuit breaker enabled"
715            );
716            let cbc = crate::circuit_breaker::CircuitBreakerConfig {
717                failure_threshold: cb.failure_threshold,
718                cooldown: Duration::from_secs(cb.open_secs),
719                half_open_success_threshold: cb.success_threshold,
720                ..Default::default()
721            };
722            let mgr = crate::circuit_breaker::CircuitBreakerManager::new(
723                crate::circuit_breaker::ManagerConfig::new(cbc),
724            );
725            Some(Arc::new(mgr))
726        } else {
727            None
728        };
729
730        // Build the query-analytics engine when enabled.
731        #[cfg(feature = "query-analytics")]
732        let analytics = if config.analytics.enabled {
733            let a = &config.analytics;
734            tracing::info!(
735                slow_query_ms = a.slow_query_ms,
736                max_fingerprints = a.max_fingerprints,
737                "query analytics enabled"
738            );
739            let ac = crate::analytics::AnalyticsConfig {
740                enabled: true,
741                max_fingerprints: a.max_fingerprints as usize,
742                slow_query: crate::analytics::SlowQueryConfig {
743                    threshold: Duration::from_millis(a.slow_query_ms),
744                    ..Default::default()
745                },
746                ..Default::default()
747            };
748            Some(Arc::new(crate::analytics::QueryAnalytics::new(ac)))
749        } else {
750            None
751        };
752
753        // Build the query-result cache when enabled.
754        #[cfg(feature = "query-cache")]
755        let query_cache = if config.cache.enabled {
756            let c = &config.cache;
757            tracing::info!(
758                ttl_secs = c.ttl_secs,
759                max_result_bytes = c.max_result_bytes,
760                "query cache enabled (L1 hot + L2 warm)"
761            );
762            let ttl = Duration::from_secs(c.ttl_secs);
763            let cc = crate::cache::CacheConfig {
764                enabled: true,
765                default_ttl: ttl,
766                max_result_size: c.max_result_bytes,
767                l1: crate::cache::L1Config {
768                    ttl,
769                    ..Default::default()
770                },
771                l2: crate::cache::L2Config {
772                    ttl,
773                    ..Default::default()
774                },
775                ..Default::default()
776            };
777            Some(Arc::new(crate::cache::QueryCache::new(cc)))
778        } else {
779            None
780        };
781
782        // Build the SQL query rewriter from the configured rules.
783        #[cfg(feature = "query-rewriting")]
784        let rewriter = if config.query_rewrite.enabled && !config.query_rewrite.rules.is_empty() {
785            use crate::rewriter::{
786                QueryPattern, QueryRewriter, RewriteRule, RewriterConfig, Transformation,
787            };
788            let rw = QueryRewriter::new(RewriterConfig {
789                enabled: true,
790                ..Default::default()
791            });
792            let mut n = 0usize;
793            for (i, r) in config.query_rewrite.rules.iter().enumerate() {
794                let transformation =
795                    if let (Some(from), Some(to)) = (&r.match_table, &r.replace_table_with) {
796                        Transformation::ReplaceTable {
797                            from: from.clone(),
798                            to: to.clone(),
799                        }
800                    } else if let Some(w) = &r.append_where {
801                        Transformation::AppendWhereAnd(w.clone())
802                    } else if let Some(limit) = r.add_limit {
803                        Transformation::AddLimit(limit)
804                    } else {
805                        continue; // no transformation specified — skip
806                    };
807                let pattern = if let Some(t) = &r.match_table {
808                    QueryPattern::Table(t.clone())
809                } else if let Some(re) = &r.match_regex {
810                    QueryPattern::regex(re.clone())
811                } else {
812                    QueryPattern::All
813                };
814                rw.add_rule(
815                    RewriteRule::build(format!("rule-{i}"))
816                        .pattern(pattern)
817                        .transform(transformation)
818                        .build(),
819                );
820                n += 1;
821            }
822            tracing::info!(rules = n, "query rewriting enabled");
823            Some(Arc::new(rw))
824        } else {
825            None
826        };
827
828        // Build the multi-tenancy manager from the configured tenants.
829        #[cfg(feature = "multi-tenancy")]
830        let tenant_manager =
831            if config.multi_tenancy.enabled && !config.multi_tenancy.tenants.is_empty() {
832                use crate::multi_tenancy::{
833                    IdentificationMethod, IsolationStrategy, MultiTenancyConfig, TenantConfig,
834                    TenantId, TenantManagerBuilder, TenantQueryTransformer,
835                };
836                let mt = &config.multi_tenancy;
837                let identification = match mt.identify_by.as_str() {
838                    "database" => IdentificationMethod::DatabaseName,
839                    param => IdentificationMethod::Header {
840                        header_name: param.to_string(),
841                    },
842                };
843                let mtc = MultiTenancyConfig {
844                    enabled: true,
845                    identification,
846                    ..Default::default()
847                };
848                // Configure which tables are tenant-scoped + the filter column.
849                let table_refs: Vec<&str> = mt.tenant_tables.iter().map(|s| s.as_str()).collect();
850                let transformer = TenantQueryTransformer::new()
851                    .register_tables(&table_refs, mt.tenant_column.clone());
852                let tm = TenantManagerBuilder::new()
853                    .config(mtc)
854                    .query_transformer(transformer)
855                    .build();
856                for id in &mt.tenants {
857                    tm.register_tenant(TenantConfig::new(
858                        TenantId::new(id.clone()),
859                        IsolationStrategy::row("public", mt.tenant_column.clone()),
860                    ));
861                }
862                tracing::info!(
863                    tenants = mt.tenants.len(),
864                    identify_by = %mt.identify_by,
865                    "multi-tenancy enabled"
866                );
867                Some(Arc::new(tm))
868            } else {
869                None
870            };
871
872        // Build the schema/workload query analyzer when enabled.
873        #[cfg(feature = "schema-routing")]
874        let schema_analyzer =
875            if config.schema_routing.enabled && !config.schema_routing.analytics_node.is_empty() {
876                tracing::info!(
877                    analytics_node = %config.schema_routing.analytics_node,
878                    "schema/workload routing enabled (OLAP -> analytics node)"
879                );
880                let registry = Arc::new(crate::schema_routing::SchemaRegistry::new());
881                Some(Arc::new(crate::schema_routing::QueryAnalyzer::new(
882                    registry,
883                )))
884            } else {
885                None
886            };
887
888        let state = Arc::new(ServerState {
889            sessions: RwLock::new(HashMap::new()),
890            health: ArcSwap::from_pointee(health),
891            health_write: parking_lot::Mutex::new(()),
892            live_config: ArcSwap::from_pointee(config.clone()),
893            metrics: ServerMetrics::default(),
894            cancel_map: Arc::new(DashMap::new()),
895            cancel_order: Arc::new(parking_lot::Mutex::new(std::collections::VecDeque::new())),
896            tls_acceptor,
897            auth_file,
898            mirror,
899            cutover: Arc::new(ArcSwap::from_pointee(None)),
900            lb_state: LoadBalancerState {
901                rr_counter: AtomicU64::new(0),
902            },
903            #[cfg(feature = "routing-hints")]
904            hint_parser: if config.routing_hints.enabled {
905                tracing::info!(
906                    strip = config.routing_hints.strip_hints,
907                    "SQL-comment routing hints enabled"
908                );
909                Some(if config.routing_hints.strip_hints {
910                    crate::routing::HintParser::new()
911                } else {
912                    crate::routing::HintParser::without_stripping()
913                })
914            } else {
915                None
916            },
917            #[cfg(feature = "rate-limiting")]
918            rate_limiter,
919            #[cfg(feature = "circuit-breaker")]
920            circuit_breaker,
921            #[cfg(feature = "query-analytics")]
922            analytics,
923            #[cfg(feature = "query-cache")]
924            query_cache,
925            #[cfg(feature = "query-rewriting")]
926            rewriter,
927            #[cfg(feature = "multi-tenancy")]
928            tenant_manager,
929            #[cfg(feature = "schema-routing")]
930            schema_analyzer,
931            #[cfg(feature = "pool-modes")]
932            pool_manager,
933            #[cfg(feature = "pool-modes")]
934            backend_pool,
935            #[cfg(feature = "wasm-plugins")]
936            plugin_manager,
937            #[cfg(feature = "ha-tr")]
938            transaction_journal: Arc::new(crate::transaction_journal::TransactionJournal::new()),
939            #[cfg(feature = "anomaly-detection")]
940            anomaly_detector: Arc::new(crate::anomaly::AnomalyDetector::new(
941                crate::anomaly::AnomalyConfig::default(),
942            )),
943            #[cfg(feature = "edge-proxy")]
944            edge_cache: Arc::new(crate::edge::EdgeCache::new(10_000)),
945            #[cfg(feature = "edge-proxy")]
946            edge_registry: Arc::new(crate::edge::EdgeRegistry::new(
947                32,
948                std::time::Duration::from_secs(120),
949            )),
950        });
951
952        Ok(Self {
953            config,
954            state,
955            shutdown_tx,
956            config_path: None,
957        })
958    }
959
960    /// Record the config file path so `SIGHUP` can re-read it for a live
961    /// reload (Batch H). Without a path (config built from CLI flags/defaults)
962    /// a `SIGHUP` is logged and ignored — there is nothing to re-read.
963    pub fn with_config_path(mut self, path: Option<String>) -> Self {
964        self.config_path = path;
965        self
966    }
967
968    /// A stream that yields once per `SIGHUP`. On non-Unix platforms it never
969    /// yields (config reload is Unix-signal driven).
970    #[cfg(unix)]
971    fn hangup_stream() -> tokio::signal::unix::Signal {
972        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
973            .expect("failed to install SIGHUP handler")
974    }
975    #[cfg(not(unix))]
976    fn hangup_stream() -> HangupNever {
977        HangupNever
978    }
979
980    /// A stream that yields once per `SIGUSR2` — the graceful binary-handoff
981    /// drain trigger. Never yields on non-Unix platforms.
982    #[cfg(unix)]
983    fn usr2_stream() -> tokio::signal::unix::Signal {
984        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::user_defined2())
985            .expect("failed to install SIGUSR2 handler")
986    }
987    #[cfg(not(unix))]
988    fn usr2_stream() -> HangupNever {
989        HangupNever
990    }
991
992    /// Wait for in-flight client connections to finish, up to `timeout`. Used by
993    /// the graceful drain after the listener is closed — the session map is the
994    /// live active-connection gauge (one entry per accepted connection).
995    async fn drain_connections(state: &Arc<ServerState>, timeout: Duration) {
996        let deadline = tokio::time::Instant::now() + timeout;
997        loop {
998            let active = state.sessions.read().await.len();
999            if active == 0 {
1000                tracing::info!("drain complete — all in-flight connections finished");
1001                return;
1002            }
1003            if tokio::time::Instant::now() >= deadline {
1004                tracing::warn!(
1005                    active,
1006                    "drain timeout reached — exiting with connections still open"
1007                );
1008                return;
1009            }
1010            tokio::time::sleep(Duration::from_millis(200)).await;
1011        }
1012    }
1013
1014    /// Graceful-drain timeout: how long to keep serving in-flight connections
1015    /// after SIGUSR2 before exiting. Sourced from `shutdown_drain_timeout_secs`
1016    /// in the live config, with the `HELIOS_DRAIN_TIMEOUT_SECS` env var as a
1017    /// runtime override.
1018    fn drain_timeout(config_secs: u64) -> Duration {
1019        let secs = std::env::var("HELIOS_DRAIN_TIMEOUT_SECS")
1020            .ok()
1021            .and_then(|s| s.parse::<u64>().ok())
1022            .unwrap_or(config_secs);
1023        Duration::from_secs(secs)
1024    }
1025
1026    /// Re-read the config file and hot-swap the live config (Batch H).
1027    ///
1028    /// New connections immediately use the reloaded config; in-flight sessions
1029    /// keep the snapshot they began with, so nothing is dropped. A parse error
1030    /// keeps the running config untouched. Socket-bound fields (listen/admin
1031    /// address) cannot change on an already-bound listener and are reported but
1032    /// not applied. The node set is reconciled into the health map so routing
1033    /// sees additions/removals at once.
1034    async fn reload_config(&self) {
1035        let Some(path) = self.config_path.as_deref() else {
1036            tracing::warn!(
1037                "SIGHUP received but config was not loaded from a file — nothing to reload"
1038            );
1039            return;
1040        };
1041        tracing::info!(path, "SIGHUP: reloading configuration");
1042        let new_config = match ProxyConfig::from_file(path) {
1043            Ok(c) => c,
1044            Err(e) => {
1045                tracing::error!(path, error = %e, "SIGHUP reload failed to parse — keeping current config");
1046                return;
1047            }
1048        };
1049        let old = self.state.live_config.load_full();
1050        if new_config.listen_address != old.listen_address {
1051            tracing::warn!(old = %old.listen_address, new = %new_config.listen_address,
1052                "listen_address change needs a restart/handoff; the bound socket is kept");
1053        }
1054        if new_config.admin_address != old.admin_address {
1055            tracing::warn!(old = %old.admin_address, new = %new_config.admin_address,
1056                "admin_address change needs a restart; the bound socket is kept");
1057        }
1058        // Reconcile node health to the new node set before publishing the
1059        // config, so the first connection on the new config can route to it.
1060        Self::reconcile_health(&self.state, &new_config);
1061        let nodes = new_config.nodes.len();
1062        let hba_rules = new_config.hba.len();
1063        let pool_max = new_config.pool.max_connections;
1064        self.state.live_config.store(Arc::new(new_config));
1065        tracing::info!(
1066            nodes,
1067            hba_rules,
1068            pool_max,
1069            "SIGHUP: configuration reloaded — applies to new connections"
1070        );
1071    }
1072
1073    /// Rebuild the health map for `config`'s node set: surviving nodes keep
1074    /// their current health; new nodes are seeded healthy (immediately
1075    /// routable, the next check confirms); removed nodes are dropped.
1076    fn reconcile_health(state: &Arc<ServerState>, config: &ProxyConfig) {
1077        // Serialize against the periodic checker and in-band demotions so this
1078        // rebuild neither clobbers nor is clobbered by a concurrent write.
1079        let _writers = state.health_write.lock();
1080        let current = state.health.load_full();
1081        let mut next: HashMap<String, NodeHealth> = HashMap::new();
1082        for node in &config.nodes {
1083            let addr = node.address();
1084            match current.get(&addr) {
1085                Some(existing) => {
1086                    next.insert(addr, existing.clone());
1087                }
1088                None => {
1089                    tracing::info!(node = %addr, "SIGHUP: new node added — seeding healthy");
1090                    next.insert(
1091                        addr.clone(),
1092                        NodeHealth {
1093                            address: addr,
1094                            healthy: true,
1095                            last_check: chrono::Utc::now(),
1096                            failure_count: 0,
1097                            last_error: None,
1098                            latency_ms: 0.0,
1099                            replication_lag_bytes: None,
1100                        },
1101                    );
1102                }
1103            }
1104        }
1105        for gone in current.keys().filter(|k| !next.contains_key(*k)) {
1106            tracing::info!(node = %gone, "SIGHUP: node removed from config");
1107        }
1108        state.health.store(Arc::new(next));
1109    }
1110
1111    /// Run the proxy server
1112    pub async fn run(&self) -> Result<()> {
1113        // Bind with SO_REUSEPORT so a freshly-started binary can bind the SAME
1114        // listen address concurrently — the kernel load-balances new
1115        // connections across both processes. That is the mechanism behind the
1116        // zero-downtime binary handoff: start the new binary, then SIGUSR2 the
1117        // old one to close its listener and drain (Batch H, item 84).
1118        let listener = bind_reuseport(&self.config.listen_address)?;
1119
1120        tracing::info!(
1121            "Proxy listening on {} (SO_REUSEPORT)",
1122            self.config.listen_address
1123        );
1124
1125        // Start background tasks
1126        let health_task = self.spawn_health_checker();
1127        let pool_task = self.spawn_pool_manager();
1128
1129        // Start admin server
1130        let admin_task = self.spawn_admin_server();
1131
1132        // Start the MCP agent gateway when enabled.
1133        let mcp_task = if self.config.mcp.enabled {
1134            let mcp_cfg = self.config.mcp.clone();
1135            // Resolve the configured agent contract (scoped grants) by id.
1136            let contract = mcp_cfg.contract.as_ref().and_then(|id| {
1137                let found = self.config.agent_contracts.iter().find(|c| &c.id == id).cloned();
1138                if found.is_none() {
1139                    tracing::warn!(%id, "mcp.contract names an unknown agent_contract; gateway runs with only the read-only guardrail");
1140                }
1141                found
1142            });
1143            Some(tokio::spawn(async move {
1144                if let Err(e) = crate::mcp::McpServer::new(mcp_cfg, contract).run().await {
1145                    tracing::error!("MCP gateway error: {}", e);
1146                }
1147            }))
1148        } else {
1149            None
1150        };
1151
1152        // Start the HTTP SQL gateway (Neon-serverless compatible) when enabled.
1153        let http_gw_task = if self.config.http_gateway.enabled {
1154            let gw_cfg = self.config.http_gateway.clone();
1155            Some(tokio::spawn(async move {
1156                if let Err(e) = crate::http_gateway::HttpGateway::new(gw_cfg).run().await {
1157                    tracing::error!("HTTP gateway error: {}", e);
1158                }
1159            }))
1160        } else {
1161            None
1162        };
1163
1164        // Start the GraphQL-to-SQL gateway when enabled.
1165        #[cfg(feature = "graphql-gateway")]
1166        let _graphql_gw_task = if self.config.graphql_gateway.enabled {
1167            let gw_cfg = self.config.graphql_gateway.clone();
1168            Some(tokio::spawn(async move {
1169                if let Err(e) = crate::graphql_gateway::GraphqlGateway::new(gw_cfg)
1170                    .run()
1171                    .await
1172                {
1173                    tracing::error!("GraphQL gateway error: {}", e);
1174                }
1175            }))
1176        } else {
1177            None
1178        };
1179
1180        let mut shutdown_rx = self.shutdown_tx.subscribe();
1181
1182        // SIGHUP -> zero-downtime config reload; SIGUSR2 -> graceful drain for
1183        // binary handoff (Batch H). On platforms without Unix signals these are
1184        // simply never readable.
1185        let mut sighup = Self::hangup_stream();
1186        let mut sigusr2 = Self::usr2_stream();
1187        let mut graceful = false;
1188
1189        loop {
1190            tokio::select! {
1191                _ = sighup.recv() => {
1192                    self.reload_config().await;
1193                }
1194                _ = sigusr2.recv() => {
1195                    tracing::info!(
1196                        "SIGUSR2: graceful binary-handoff drain — closing the listener so new \
1197                         connections route to the sibling process; finishing in-flight connections"
1198                    );
1199                    graceful = true;
1200                    break;
1201                }
1202                accept_result = listener.accept() => {
1203                    match accept_result {
1204                        Ok((stream, addr)) => {
1205                            // PG wire traffic is small request/response
1206                            // frames; Nagle + delayed-ACK costs tens of
1207                            // ms per round-trip if left on.
1208                            let _ = stream.set_nodelay(true);
1209                            self.state.metrics.connections_accepted.fetch_add(1, Ordering::Relaxed);
1210                            let state = self.state.clone();
1211                            // Snapshot the *live* config so a SIGHUP reload
1212                            // applies to new connections; in-flight sessions
1213                            // keep the snapshot they began with (Batch H).
1214                            let config = (*self.state.live_config.load_full()).clone();
1215                            let shutdown_tx = self.shutdown_tx.clone();
1216
1217                            tokio::spawn(async move {
1218                                if let Err(e) = Self::handle_client(stream, addr, state, config, shutdown_tx).await {
1219                                    tracing::error!("Client handler error: {}", e);
1220                                }
1221                            });
1222                        }
1223                        Err(e) => {
1224                            tracing::error!("Accept error: {}", e);
1225                        }
1226                    }
1227                }
1228                _ = shutdown_rx.recv() => {
1229                    tracing::info!("Shutdown signal received");
1230                    break;
1231                }
1232            }
1233        }
1234
1235        // Close the listening socket so the kernel stops routing new connections
1236        // to this process's accept queue (with SO_REUSEPORT they would otherwise
1237        // sit unaccepted) — all new connections now go to the sibling listener.
1238        drop(listener);
1239
1240        // On a graceful handoff, keep serving in-flight connections until they
1241        // finish (or the drain deadline), so nothing in flight is dropped.
1242        if graceful {
1243            let timeout =
1244                Self::drain_timeout(self.state.live_config.load().shutdown_drain_timeout_secs);
1245            tracing::info!(
1246                timeout_secs = timeout.as_secs(),
1247                "draining in-flight connections"
1248            );
1249            Self::drain_connections(&self.state, timeout).await;
1250        }
1251
1252        // Wait for background tasks
1253        health_task.abort();
1254        pool_task.abort();
1255        admin_task.abort();
1256        if let Some(t) = mcp_task {
1257            t.abort();
1258        }
1259        if let Some(t) = http_gw_task {
1260            t.abort();
1261        }
1262
1263        Ok(())
1264    }
1265
1266    /// Spawn admin API server
1267    fn spawn_admin_server(&self) -> tokio::task::JoinHandle<()> {
1268        let config = self.config.clone();
1269        let state = self.state.clone();
1270        let mut shutdown_rx = self.shutdown_tx.subscribe();
1271
1272        tokio::spawn(async move {
1273            // Create admin state
1274            let admin_state = Arc::new(AdminState::new());
1275
1276            // Initialize config snapshot
1277            {
1278                let mut snapshot = admin_state.config_snapshot.write().await;
1279                *snapshot = ConfigSnapshot {
1280                    listen_address: config.listen_address.clone(),
1281                    admin_address: config.admin_address.clone(),
1282                    tr_enabled: config.tr_enabled,
1283                    tr_mode: format!("{:?}", config.tr_mode),
1284                    pool_min_connections: config.pool.min_connections,
1285                    pool_max_connections: config.pool.max_connections,
1286                    nodes: config
1287                        .nodes
1288                        .iter()
1289                        .map(|n| NodeSnapshot {
1290                            address: n.address(),
1291                            role: format!("{:?}", n.role),
1292                            weight: n.weight,
1293                            enabled: n.enabled,
1294                        })
1295                        .collect(),
1296                };
1297            }
1298
1299            // Set proxy config for SQL routing
1300            admin_state.set_proxy_config(config.clone()).await;
1301
1302            // Require a Bearer token on admin requests when configured.
1303            admin_state
1304                .with_auth_token(config.admin_token.clone())
1305                .await;
1306
1307            // Branch-database provisioning surface.
1308            if config.branch.enabled {
1309                admin_state.with_branch(config.branch.clone()).await;
1310            }
1311
1312            // Surface traffic-mirror / migration status when mirroring is on.
1313            if let Some(ref mirror) = state.mirror {
1314                admin_state
1315                    .with_migration(crate::admin::MigrationInfo {
1316                        target: mirror.target().to_string(),
1317                        writes_only: mirror.writes_only(),
1318                        metrics: mirror.metrics.clone(),
1319                        config: config.mirror.clone(),
1320                        cutover: state.cutover.clone(),
1321                        cutover_target: crate::mirror::CutoverTarget {
1322                            addr: format!(
1323                                "{}:{}",
1324                                config.mirror.backend_host, config.mirror.backend_port
1325                            ),
1326                            user: config.mirror.backend_user.clone(),
1327                            password: config.mirror.backend_password.clone(),
1328                            database: config.mirror.backend_database.clone(),
1329                        },
1330                    })
1331                    .await;
1332            }
1333
1334            // Attach the plugin manager so /plugins + the admin UI
1335            // surface real loaded modules. Cheap Arc-clone — no
1336            // duplicate state, both AdminState and ServerState hold
1337            // the same manager.
1338            #[cfg(feature = "wasm-plugins")]
1339            if let Some(ref pm) = state.plugin_manager {
1340                admin_state.with_plugin_manager(pm.clone()).await;
1341            }
1342
1343            // Attach the pool manager so /api/pools surfaces real per-node
1344            // pool statistics instead of an empty list.
1345            #[cfg(feature = "pool-modes")]
1346            if let Some(ref pm) = state.pool_manager {
1347                admin_state.with_pool_manager(pm.clone()).await;
1348            }
1349
1350            // Attach the circuit-breaker manager so /api/circuit reports live
1351            // per-node breaker state.
1352            #[cfg(feature = "circuit-breaker")]
1353            if let Some(ref cb) = state.circuit_breaker {
1354                admin_state.with_circuit_breaker(cb.clone()).await;
1355            }
1356
1357            // Attach the time-travel replay engine. The engine reads
1358            // windows from the shared TransactionJournal and replays
1359            // statements against a target backend supplied per-request.
1360            // Per-call credential overrides land via FU-21's
1361            // ReplayRequestBody.target_user / target_password /
1362            // target_database fields.
1363            #[cfg(feature = "ha-tr")]
1364            {
1365                let template = build_replay_backend_template(&config);
1366                let engine = Arc::new(crate::replay::ReplayEngine::new(
1367                    state.transaction_journal.clone(),
1368                    template,
1369                ));
1370                admin_state.with_replay_engine(engine).await;
1371            }
1372
1373            // Attach the anomaly detector — same Arc the server
1374            // populates from the query path. /api/anomalies polls
1375            // this for surfaced detections.
1376            #[cfg(feature = "anomaly-detection")]
1377            admin_state
1378                .with_anomaly_detector(state.anomaly_detector.clone())
1379                .await;
1380
1381            // Attach the query-analytics engine so /api/analytics can read it.
1382            #[cfg(feature = "query-analytics")]
1383            if let Some(a) = state.analytics.as_ref() {
1384                admin_state.with_analytics(a.clone()).await;
1385            }
1386
1387            // Attach the edge cache + registry. Both surfaced via
1388            // /api/edge/* admin routes.
1389            #[cfg(feature = "edge-proxy")]
1390            admin_state
1391                .with_edge(state.edge_cache.clone(), state.edge_registry.clone())
1392                .await;
1393
1394            // Create admin server
1395            let admin_server = AdminServer::new(config.admin_address.clone(), admin_state.clone());
1396
1397            // Spawn state sync task
1398            let admin_state_sync = admin_state.clone();
1399            let server_state = state.clone();
1400            let sync_task = tokio::spawn(async move {
1401                let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
1402                loop {
1403                    interval.tick().await;
1404
1405                    // Sync health status
1406                    {
1407                        let health = server_state.health.load_full();
1408                        let mut admin_health = admin_state_sync.node_health.write().await;
1409                        *admin_health = (*health).clone();
1410                    }
1411
1412                    // Sync metrics
1413                    {
1414                        let metrics = ServerMetricsSnapshot {
1415                            connections_accepted: server_state
1416                                .metrics
1417                                .connections_accepted
1418                                .load(Ordering::Relaxed),
1419                            connections_closed: server_state
1420                                .metrics
1421                                .connections_closed
1422                                .load(Ordering::Relaxed),
1423                            queries_processed: server_state
1424                                .metrics
1425                                .queries_processed
1426                                .load(Ordering::Relaxed),
1427                            bytes_received: server_state
1428                                .metrics
1429                                .bytes_received
1430                                .load(Ordering::Relaxed),
1431                            bytes_sent: server_state.metrics.bytes_sent.load(Ordering::Relaxed),
1432                            failovers: server_state.metrics.failovers.load(Ordering::Relaxed),
1433                        };
1434                        let mut admin_metrics = admin_state_sync.metrics.write().await;
1435                        *admin_metrics = metrics;
1436                    }
1437
1438                    // Sync session count
1439                    {
1440                        let sessions = server_state.sessions.read().await;
1441                        let mut admin_sessions = admin_state_sync.active_sessions.write().await;
1442                        *admin_sessions = sessions.len() as u64;
1443                    }
1444                }
1445            });
1446
1447            // Run admin server
1448            tokio::select! {
1449                result = admin_server.run() => {
1450                    if let Err(e) = result {
1451                        tracing::error!("Admin server error: {}", e);
1452                    }
1453                }
1454                _ = shutdown_rx.recv() => {
1455                    tracing::info!("Admin server shutting down");
1456                }
1457            }
1458
1459            sync_task.abort();
1460        })
1461    }
1462
1463    /// Handle a client connection
1464    async fn handle_client(
1465        stream: TcpStream,
1466        addr: SocketAddr,
1467        state: Arc<ServerState>,
1468        config: ProxyConfig,
1469        _shutdown_tx: broadcast::Sender<()>,
1470    ) -> Result<()> {
1471        tracing::debug!("New client connection from {}", addr);
1472
1473        // Create session
1474        let session = Arc::new(ClientSession {
1475            id: Uuid::new_v4(),
1476            client_addr: addr,
1477            current_node: RwLock::new(None),
1478            tx_state: RwLock::new(TransactionState::default()),
1479            variables: RwLock::new(HashMap::new()),
1480            created_at: chrono::Utc::now(),
1481            tr_mode: config.tr_mode,
1482            #[cfg(feature = "lag-routing")]
1483            last_write_at: RwLock::new(None),
1484            #[cfg(feature = "pool-modes")]
1485            pool_client_id: ClientId::new(),
1486            #[cfg(feature = "wasm-plugins")]
1487            plugin_identity: RwLock::new(None),
1488        });
1489
1490        // Register session
1491        {
1492            let mut sessions = state.sessions.write().await;
1493            sessions.insert(session.id, session.clone());
1494        }
1495
1496        // Negotiate client TLS (if the client sent SSLRequest). Produces a
1497        // ClientStream that is plaintext or TLS-wrapped; the rest of the
1498        // session is written against that single stream type. `pre` carries
1499        // a first startup/cancel message already read while peeking.
1500        let result = match Self::negotiate_client_tls(stream, &state).await {
1501            Ok((mut client_stream, pre)) => {
1502                Self::client_loop(&mut client_stream, pre, &session, &state, &config).await
1503            }
1504            Err(e) => Err(e),
1505        };
1506
1507        // Cleanup session
1508        {
1509            let mut sessions = state.sessions.write().await;
1510            sessions.remove(&session.id);
1511        }
1512
1513        // Release any active pool lease if pool-modes is enabled
1514        #[cfg(feature = "pool-modes")]
1515        if let Some(ref pool_manager) = state.pool_manager {
1516            // Check if there's an active lease for this client and release it
1517            if pool_manager.has_active_lease(&session.pool_client_id) {
1518                tracing::debug!(
1519                    "Releasing pool lease for disconnecting client {:?}",
1520                    session.pool_client_id
1521                );
1522                // Note: The lease is released implicitly when the connection closes
1523                // The pool manager will clean up any orphaned leases
1524            }
1525        }
1526
1527        state
1528            .metrics
1529            .connections_closed
1530            .fetch_add(1, Ordering::Relaxed);
1531
1532        result
1533    }
1534
1535    /// Main client processing loop with full PostgreSQL protocol handling
1536    async fn client_loop(
1537        stream: &mut ClientStream,
1538        pre: Option<StartupMessage>,
1539        session: &Arc<ClientSession>,
1540        state: &Arc<ServerState>,
1541        config: &ProxyConfig,
1542    ) -> Result<()> {
1543        let codec = ProtocolCodec::new();
1544        let mut buffer = BytesMut::with_capacity(8192);
1545
1546        // Handle startup phase. The session keeps a per-node cache of
1547        // authenticated backend connections (`conns`) instead of a single
1548        // stream: when read/write routing moves a session between primary
1549        // and standby it now reuses the already-authenticated connection to
1550        // each node rather than dropping the socket and paying a fresh TCP
1551        // connect + startup + SCRAM handshake on every switch (Batch C).
1552        // Connections are authenticated with the client's own credentials
1553        // (auth is pass-through). In Transaction/Statement pooling mode they
1554        // are returned to a shared, identity-keyed idle pool at each
1555        // transaction boundary (DISCARD ALL reset on release) and reused by the
1556        // next same-identity acquisition — see `release_to_pool_if_idle` /
1557        // `ensure_conn`. The first connection of a session is still
1558        // established through the authenticated startup path; drawing the
1559        // startup connection from the pool (to reduce *concurrent* backend
1560        // connections below the client count) additionally needs
1561        // proxy-terminated backend auth and is the documented next increment.
1562        let mut conns: HashMap<String, BackendConn> = HashMap::new();
1563        let mut current_node: Option<String> =
1564            match Self::handle_startup(stream, &mut buffer, &codec, pre, session, state, config)
1565                .await
1566            {
1567                Ok((Some(stream_conn), node_addr)) => {
1568                    conns.insert(node_addr.clone(), BackendConn::new(stream_conn));
1569                    Some(node_addr)
1570                }
1571                Ok((None, _)) => {
1572                    // SSL rejected or cancel request, connection should close
1573                    return Ok(());
1574                }
1575                Err(e) => {
1576                    tracing::error!("Startup failed: {}", e);
1577                    // Send error to client
1578                    let err_msg =
1579                        Self::create_error_response("08006", &format!("Startup failed: {}", e));
1580                    let _ = stream.write_all(&err_msg).await;
1581                    return Err(e);
1582                }
1583            };
1584
1585        // Main query loop.
1586        //
1587        // Two wire shapes are handled. Simple-query (`Query`) messages are
1588        // self-contained: route, forward, and stream the response back
1589        // frame-by-frame until ReadyForQuery. Extended-protocol messages
1590        // (`Parse`/`Bind`/`Describe`/`Execute`/`Close`) carry no response of
1591        // their own until the client sends `Sync` (or `Flush`), so they are
1592        // accumulated into `pending` and forwarded as one batch at that
1593        // boundary — this is what stops the per-message 30s backend-read
1594        // timeout that made every prepared-statement driver unusable. The
1595        // routing decision for an extended batch is taken from the SQL in its
1596        // first `Parse`; a batch with no `Parse` (a re-`Bind`/`Execute` of a
1597        // named prepared statement) stays on the connection the statement was
1598        // prepared on.
1599        let mut read_buf = vec![0u8; 16384];
1600        let mut pending = BytesMut::new();
1601        let mut pending_route_sql: Option<String> = None;
1602        // Prepared-statement tracking (Batch F.4). `stmt_registry` is the
1603        // session's canonical record of every *named* `Parse` the client has
1604        // issued (name -> full Parse message bytes) so the proxy can re-prepare
1605        // a statement on any backend connection that is missing it. `batch_*`
1606        // accumulate, for the in-flight extended batch, which named statements
1607        // it defines (Parse), references (Bind/Describe-S), and closes
1608        // (Close-S) — resolved at the Sync/Flush boundary.
1609        let mut stmt_registry: HashMap<String, bytes::Bytes> = HashMap::new();
1610        // Running sum of the bytes held in `stmt_registry`, kept in step with
1611        // it so the aggregate-size cap is O(1) per Parse (see MAX_PREPARED_BYTES).
1612        let mut stmt_registry_bytes: usize = 0;
1613        let mut batch_defines: Vec<String> = Vec::new();
1614        let mut batch_refs: Vec<String> = Vec::new();
1615        let mut batch_closes: Vec<String> = Vec::new();
1616        // Unnamed-`Parse` promotion (Batch H). `held_unnamed` parks an unnamed
1617        // Parse that is the FIRST message of a batch (so the batch stays the
1618        // clean Parse→Bind→…→Sync shape) — it is NOT appended to `pending`; the
1619        // decision to forward or skip it is taken at the batch boundary once the
1620        // target connection is known. Holds (full Parse message, signature).
1621        let promote_unnamed = config.optimize_unnamed_parse;
1622        let mut held_unnamed: Option<(bytes::Bytes, bytes::Bytes)> = None;
1623        loop {
1624            // Read from client
1625            let n = stream
1626                .read(&mut read_buf)
1627                .await
1628                .map_err(|e| ProxyError::Network(format!("Read error: {}", e)))?;
1629
1630            if n == 0 {
1631                // Client disconnected
1632                break;
1633            }
1634
1635            buffer.extend_from_slice(&read_buf[..n]);
1636            state
1637                .metrics
1638                .bytes_received
1639                .fetch_add(n as u64, Ordering::Relaxed);
1640
1641            // Bound a single in-flight message: refuse before the accumulation
1642            // buffer for one (possibly malicious) oversized frame can exhaust
1643            // memory. A legitimate client never needs a single >64 MiB message.
1644            if buffer.len() > Self::MAX_PENDING_BYTES {
1645                let emsg =
1646                    Self::create_error_response("53400", "message exceeds per-session size limit");
1647                let _ = stream.write_all(&emsg).await;
1648                let _ = stream.write_all(&Self::create_ready_for_query(b'I')).await;
1649                tracing::warn!(
1650                    client = %session.client_addr,
1651                    bytes = buffer.len(),
1652                    "inbound message exceeds size cap; closing connection"
1653                );
1654                return Ok(());
1655            }
1656
1657            // Process all complete messages in buffer
1658            while let Some(msg) = codec.decode_message(&mut buffer)? {
1659                match msg.msg_type {
1660                    MessageType::Terminate => return Ok(()),
1661
1662                    // ---- Simple query protocol ----
1663                    MessageType::Query => {
1664                        // Anomaly detector — record every Query message before
1665                        // the plugin hook so a detection lands in the audit
1666                        // trail even if a plugin later blocks.
1667                        #[cfg(feature = "anomaly-detection")]
1668                        Self::record_anomaly_observation(&msg, state, session);
1669
1670                        // Plugin pre-query hook — may rewrite the SQL, block,
1671                        // or return a cached response.
1672                        let (msg, action) = Self::apply_pre_query_hook(msg, state, session);
1673
1674                        if let PreQueryAction::Block(reason) = &action {
1675                            tracing::info!(reason = %reason, "pre-query plugin blocked query");
1676                            Self::send_block_response(stream, reason, state).await?;
1677                            state
1678                                .metrics
1679                                .queries_processed
1680                                .fetch_add(1, Ordering::Relaxed);
1681                            continue;
1682                        }
1683
1684                        #[cfg(feature = "wasm-plugins")]
1685                        if let PreQueryAction::Cached(bytes) = &action {
1686                            match Self::synthesise_cached_response(bytes) {
1687                                Ok(reply) => {
1688                                    stream.write_all(&reply).await.map_err(|e| {
1689                                        ProxyError::Network(format!("Write error: {}", e))
1690                                    })?;
1691                                    state
1692                                        .metrics
1693                                        .bytes_sent
1694                                        .fetch_add(reply.len() as u64, Ordering::Relaxed);
1695                                    state
1696                                        .metrics
1697                                        .queries_processed
1698                                        .fetch_add(1, Ordering::Relaxed);
1699                                    continue;
1700                                }
1701                                Err(e) => {
1702                                    tracing::warn!(error = %e, "failed to synthesise cached response; falling back to backend");
1703                                }
1704                            }
1705                        }
1706
1707                        // Traffic mirror: offer the (final, post-rewrite)
1708                        // statement to the secondary backend. Non-blocking —
1709                        // never delays the client path.
1710                        if let Some(ref mirror) = state.mirror {
1711                            if let Some(sql) = crate::protocol::query_text(&msg.payload) {
1712                                mirror.offer(sql, Self::is_write_query(sql));
1713                            }
1714                        }
1715
1716                        #[cfg(feature = "wasm-plugins")]
1717                        let forward_start = std::time::Instant::now();
1718                        let fr = Self::forward_simple_query(
1719                            stream,
1720                            &msg,
1721                            &mut conns,
1722                            current_node.as_deref(),
1723                            session,
1724                            state,
1725                            config,
1726                        )
1727                        .await;
1728                        #[cfg(feature = "wasm-plugins")]
1729                        Self::fire_post_query_hook(
1730                            &msg,
1731                            session,
1732                            state,
1733                            &fr,
1734                            forward_start.elapsed(),
1735                        );
1736                        let (used_node, sent) = fr?;
1737                        if let Some(n) = used_node {
1738                            current_node = Some(n);
1739                        }
1740                        // Transaction/Statement pooling: park the connection
1741                        // back to the shared pool once the session is idle.
1742                        #[cfg(feature = "pool-modes")]
1743                        Self::release_to_pool_if_idle(
1744                            &mut conns,
1745                            current_node.as_deref(),
1746                            session,
1747                            state,
1748                            config,
1749                        )
1750                        .await;
1751                        state.metrics.bytes_sent.fetch_add(sent, Ordering::Relaxed);
1752                        state
1753                            .metrics
1754                            .queries_processed
1755                            .fetch_add(1, Ordering::Relaxed);
1756                    }
1757
1758                    // ---- Extended query protocol: accumulate until Sync/Flush ----
1759                    MessageType::Parse
1760                    | MessageType::Bind
1761                    | MessageType::Describe
1762                    | MessageType::Execute
1763                    | MessageType::Close => {
1764                        // Whether this message is appended to `pending`. An
1765                        // unnamed Parse held aside for promotion is the lone
1766                        // exception (resolved at the batch boundary).
1767                        let mut add_to_pending = true;
1768                        match msg.msg_type {
1769                            MessageType::Parse => {
1770                                // Register named statements so they can be
1771                                // re-prepared on a different backend later, and
1772                                // borrow the query (2nd cstring) for routing.
1773                                let name = Self::parse_stmt_name(&msg.payload);
1774                                let unnamed = name.is_empty();
1775                                if !unnamed {
1776                                    let name = name.to_string();
1777                                    let existed = stmt_registry.contains_key(&name);
1778                                    // Cap distinct prepared statements per session so a
1779                                    // client issuing unbounded named `Parse`s can't grow
1780                                    // `stmt_registry` without limit.
1781                                    if !existed
1782                                        && stmt_registry.len() >= Self::MAX_PREPARED_STATEMENTS
1783                                    {
1784                                        let emsg = Self::create_error_response(
1785                                            "54000",
1786                                            "too many prepared statements for this session",
1787                                        );
1788                                        let _ = stream.write_all(&emsg).await;
1789                                        let _ = stream
1790                                            .write_all(&Self::create_ready_for_query(b'I'))
1791                                            .await;
1792                                        tracing::warn!(
1793                                            client = %session.client_addr,
1794                                            limit = Self::MAX_PREPARED_STATEMENTS,
1795                                            "prepared-statement cap exceeded; closing connection"
1796                                        );
1797                                        return Ok(());
1798                                    }
1799                                    let encoded = msg.encode().freeze();
1800                                    // Bound the AGGREGATE bytes retained, not just the
1801                                    // count: a (possibly re-Parsed) statement that would
1802                                    // push the session over the byte cap is refused.
1803                                    let old_len =
1804                                        stmt_registry.get(&name).map(|b| b.len()).unwrap_or(0);
1805                                    let projected =
1806                                        stmt_registry_bytes.saturating_sub(old_len) + encoded.len();
1807                                    if projected > Self::MAX_PREPARED_BYTES {
1808                                        let emsg = Self::create_error_response(
1809                                            "54000",
1810                                            "prepared-statement memory limit exceeded for this session",
1811                                        );
1812                                        let _ = stream.write_all(&emsg).await;
1813                                        let _ = stream
1814                                            .write_all(&Self::create_ready_for_query(b'I'))
1815                                            .await;
1816                                        tracing::warn!(
1817                                            client = %session.client_addr,
1818                                            limit = Self::MAX_PREPARED_BYTES,
1819                                            "prepared-statement byte cap exceeded; closing connection"
1820                                        );
1821                                        return Ok(());
1822                                    }
1823                                    stmt_registry.insert(name.clone(), encoded);
1824                                    stmt_registry_bytes = projected;
1825                                    batch_defines.push(name);
1826                                }
1827                                if pending_route_sql.is_none() {
1828                                    if let Some(end) = msg.payload.iter().position(|&b| b == 0) {
1829                                        if let Some(q) =
1830                                            crate::protocol::query_text(&msg.payload[end + 1..])
1831                                        {
1832                                            if !q.is_empty() {
1833                                                pending_route_sql = Some(q.to_string());
1834                                                #[cfg(feature = "anomaly-detection")]
1835                                                Self::record_anomaly_sql(q, state, session);
1836                                            }
1837                                        }
1838                                    }
1839                                }
1840                                // Promotion: park an unnamed Parse that opens a
1841                                // fresh batch. Its signature is the payload after
1842                                // the empty statement-name NUL (query + param
1843                                // types). Anything that breaks the clean shape
1844                                // (a second Parse, a non-empty `pending`) un-parks
1845                                // it back into `pending` to preserve wire order.
1846                                if promote_unnamed
1847                                    && unnamed
1848                                    && pending.is_empty()
1849                                    && held_unnamed.is_none()
1850                                {
1851                                    let sig = bytes::Bytes::copy_from_slice(&msg.payload[1..]);
1852                                    held_unnamed = Some((msg.encode().freeze(), sig));
1853                                    add_to_pending = false;
1854                                } else if let Some((held_msg, _)) = held_unnamed.take() {
1855                                    let mut combined =
1856                                        BytesMut::with_capacity(held_msg.len() + pending.len());
1857                                    combined.extend_from_slice(&held_msg);
1858                                    combined.extend_from_slice(&pending);
1859                                    pending = combined;
1860                                }
1861                            }
1862                            MessageType::Bind => {
1863                                if let Some(name) = Self::bind_stmt_ref(&msg.payload) {
1864                                    batch_refs.push(name.to_string());
1865                                }
1866                            }
1867                            MessageType::Describe => {
1868                                if let Some(name) = Self::stmt_kind_name(&msg.payload) {
1869                                    batch_refs.push(name.to_string());
1870                                }
1871                            }
1872                            MessageType::Close => {
1873                                if let Some(name) = Self::stmt_kind_name(&msg.payload) {
1874                                    batch_closes.push(name.to_string());
1875                                }
1876                            }
1877                            _ => {}
1878                        }
1879                        if add_to_pending {
1880                            pending.extend_from_slice(&msg.encode());
1881                        }
1882                    }
1883
1884                    // ---- Extended batch boundary ----
1885                    MessageType::Sync | MessageType::Flush => {
1886                        let wait_ready = msg.msg_type == MessageType::Sync;
1887                        pending.extend_from_slice(&msg.encode());
1888                        let batch = pending.split().freeze();
1889                        // Re-prepare any named statement this batch references
1890                        // but does not itself define, in case the target
1891                        // connection (after a switch/redial) is missing it.
1892                        let reprepare: Vec<String> = batch_refs
1893                            .iter()
1894                            .filter(|r| !batch_defines.contains(r))
1895                            .cloned()
1896                            .collect();
1897                        let (used_node, sent) = Self::forward_extended_batch(
1898                            stream,
1899                            &batch,
1900                            pending_route_sql.as_deref(),
1901                            wait_ready,
1902                            &mut conns,
1903                            current_node.as_deref(),
1904                            &stmt_registry,
1905                            &reprepare,
1906                            &batch_defines,
1907                            held_unnamed.take(),
1908                            session,
1909                            state,
1910                            config,
1911                        )
1912                        .await?;
1913                        if let Some(n) = used_node {
1914                            current_node = Some(n);
1915                        }
1916                        // A `Sync` is the extended-protocol transaction/statement
1917                        // boundary (it yields ReadyForQuery); a `Flush` is not, so
1918                        // only a Sync triggers a pool release.
1919                        #[cfg(feature = "pool-modes")]
1920                        if wait_ready {
1921                            Self::release_to_pool_if_idle(
1922                                &mut conns,
1923                                current_node.as_deref(),
1924                                session,
1925                                state,
1926                                config,
1927                            )
1928                            .await;
1929                        }
1930                        state.metrics.bytes_sent.fetch_add(sent, Ordering::Relaxed);
1931                        // Closed statements are deallocated everywhere — forget
1932                        // their canonical Parse so they are never re-prepared.
1933                        for name in batch_closes.drain(..) {
1934                            if let Some(removed) = stmt_registry.remove(&name) {
1935                                stmt_registry_bytes =
1936                                    stmt_registry_bytes.saturating_sub(removed.len());
1937                            }
1938                        }
1939                        if wait_ready {
1940                            // Sync ends the extended cycle; reset routing so the
1941                            // next Parse can re-route. Flush leaves it intact so
1942                            // the rest of the in-flight sequence stays put.
1943                            pending_route_sql = None;
1944                            batch_defines.clear();
1945                            batch_refs.clear();
1946                            state
1947                                .metrics
1948                                .queries_processed
1949                                .fetch_add(1, Ordering::Relaxed);
1950                        }
1951                    }
1952
1953                    // ---- COPY sub-protocol (client -> backend) ----
1954                    MessageType::CopyData | MessageType::CopyDone | MessageType::CopyFail => {
1955                        if let Some(node) = current_node.clone() {
1956                            if let Some(b) = conns.get_mut(&node) {
1957                                b.stream.write_all(&msg.encode()).await.map_err(|e| {
1958                                    ProxyError::Network(format!("Backend copy write error: {}", e))
1959                                })?;
1960                                if matches!(
1961                                    msg.msg_type,
1962                                    MessageType::CopyDone | MessageType::CopyFail
1963                                ) {
1964                                    let r = Self::stream_until_ready(
1965                                        stream,
1966                                        &mut b.stream,
1967                                        session,
1968                                        state,
1969                                    )
1970                                    .await;
1971                                    match r {
1972                                        Ok(sent) => {
1973                                            state
1974                                                .metrics
1975                                                .bytes_sent
1976                                                .fetch_add(sent, Ordering::Relaxed);
1977                                        }
1978                                        Err(e) => {
1979                                            conns.remove(&node);
1980                                            return Err(e);
1981                                        }
1982                                    }
1983                                }
1984                            }
1985                        }
1986                    }
1987
1988                    // ---- Anything else: forward to current backend best-effort ----
1989                    _ => {
1990                        if let Some(ref node) = current_node {
1991                            if let Some(b) = conns.get_mut(node) {
1992                                let _ = b.stream.write_all(&msg.encode()).await;
1993                            }
1994                        }
1995                    }
1996                }
1997            }
1998
1999            // Bound un-flushed extended-protocol accumulation: a client must
2000            // reach a Sync/Flush boundary before this many bytes pile up in
2001            // `pending` (otherwise a never-syncing client grows it unbounded).
2002            if pending.len() > Self::MAX_PENDING_BYTES {
2003                let emsg = Self::create_error_response(
2004                    "53400",
2005                    "un-flushed extended-protocol buffer exceeds per-session limit",
2006                );
2007                let _ = stream.write_all(&emsg).await;
2008                let _ = stream.write_all(&Self::create_ready_for_query(b'I')).await;
2009                tracing::warn!(
2010                    client = %session.client_addr,
2011                    pending = pending.len(),
2012                    "pending extended-protocol buffer cap exceeded; closing connection"
2013                );
2014                return Ok(());
2015            }
2016        }
2017
2018        // On disconnect, park this session's still-idle connections so a later
2019        // same-identity client can reuse them (cross-client pooling). Anything
2020        // mid-transaction is left to drop (closed → backend rolls back).
2021        #[cfg(feature = "pool-modes")]
2022        if state.backend_pool.is_some() {
2023            let nodes: Vec<String> = conns.keys().cloned().collect();
2024            for node in nodes {
2025                Self::release_to_pool_if_idle(
2026                    &mut conns,
2027                    Some(node.as_str()),
2028                    session,
2029                    state,
2030                    config,
2031                )
2032                .await;
2033            }
2034        }
2035
2036        Ok(())
2037    }
2038
2039    /// Peek the first startup-phase message and negotiate client TLS.
2040    ///
2041    /// On `SSLRequest` the proxy answers `S` and runs a rustls server
2042    /// handshake when a TLS acceptor is configured, otherwise `N`
2043    /// (plaintext). A `Startup`/`CancelRequest` arriving first (no
2044    /// SSLRequest) is returned in `pre` so the caller doesn't re-read it.
2045    async fn negotiate_client_tls(
2046        mut tcp: TcpStream,
2047        state: &Arc<ServerState>,
2048    ) -> Result<(ClientStream, Option<StartupMessage>)> {
2049        let codec = ProtocolCodec::new();
2050        let mut buffer = BytesMut::with_capacity(1024);
2051        let mut read_buf = vec![0u8; 1024];
2052
2053        let first = loop {
2054            if let Some(msg) = codec.decode_startup(&mut buffer)? {
2055                break msg;
2056            }
2057            let n = tcp
2058                .read(&mut read_buf)
2059                .await
2060                .map_err(|e| ProxyError::Network(format!("Startup read error: {}", e)))?;
2061            if n == 0 {
2062                return Err(ProxyError::Connection(
2063                    "client closed before startup".to_string(),
2064                ));
2065            }
2066            buffer.extend_from_slice(&read_buf[..n]);
2067        };
2068
2069        match first {
2070            StartupMessage::SSLRequest => match state.tls_acceptor.as_ref() {
2071                Some(acceptor) => {
2072                    tcp.write_all(b"S")
2073                        .await
2074                        .map_err(|e| ProxyError::Network(format!("SSL accept write: {}", e)))?;
2075                    let tls = acceptor
2076                        .accept(tcp)
2077                        .await
2078                        .map_err(|e| ProxyError::Network(format!("TLS handshake failed: {}", e)))?;
2079                    if tls.get_ref().1.peer_certificates().is_some() {
2080                        tracing::debug!("client presented a certificate (mTLS)");
2081                    }
2082                    Ok((ClientStream::Tls(Box::new(tls)), None))
2083                }
2084                None => {
2085                    tcp.write_all(b"N")
2086                        .await
2087                        .map_err(|e| ProxyError::Network(format!("SSL reject write: {}", e)))?;
2088                    Ok((ClientStream::Plain(tcp), None))
2089                }
2090            },
2091            other => Ok((ClientStream::Plain(tcp), Some(other))),
2092        }
2093    }
2094
2095    /// Handle PostgreSQL startup phase (authentication). TLS/SSLRequest is
2096    /// already handled upstream in `negotiate_client_tls`; `pre` carries the
2097    /// first startup/cancel message when it was read during negotiation.
2098    async fn handle_startup(
2099        client_stream: &mut ClientStream,
2100        buffer: &mut BytesMut,
2101        codec: &ProtocolCodec,
2102        pre: Option<StartupMessage>,
2103        session: &Arc<ClientSession>,
2104        state: &Arc<ServerState>,
2105        config: &ProxyConfig,
2106    ) -> Result<(Option<TcpStream>, String)> {
2107        // Use the message already read during TLS negotiation, or read one
2108        // now (the TLS case, where the real startup follows the handshake).
2109        let startup_msg = match pre {
2110            Some(msg) => Some(msg),
2111            None => {
2112                let mut read_buf = vec![0u8; 1024];
2113                loop {
2114                    if let Some(msg) = codec.decode_startup(buffer)? {
2115                        break Some(msg);
2116                    }
2117                    let n = client_stream
2118                        .read(&mut read_buf)
2119                        .await
2120                        .map_err(|e| ProxyError::Network(format!("Startup read error: {}", e)))?;
2121                    if n == 0 {
2122                        return Ok((None, String::new()));
2123                    }
2124                    buffer.extend_from_slice(&read_buf[..n]);
2125                }
2126            }
2127        };
2128
2129        match startup_msg {
2130            Some(StartupMessage::SSLRequest) => {
2131                // SSL is negotiated upstream; a second SSLRequest here is a
2132                // protocol error — reject defensively.
2133                client_stream
2134                    .write_all(b"N")
2135                    .await
2136                    .map_err(|e| ProxyError::Network(format!("SSL reject error: {}", e)))?;
2137                Err(ProxyError::Protocol(
2138                    "unexpected SSLRequest after startup".to_string(),
2139                ))
2140            }
2141            Some(StartupMessage::CancelRequest { pid, key }) => {
2142                // Forward the cancel to the backend that owns this key, then
2143                // close (the client opened this connection only to cancel).
2144                Self::forward_cancel_request(state, pid, key).await;
2145                Ok((None, String::new()))
2146            }
2147            Some(StartupMessage::Startup { params, .. }) => {
2148                Self::connect_and_authenticate(client_stream, &params, session, state, config).await
2149            }
2150            None => Err(ProxyError::Protocol(
2151                "Incomplete startup message".to_string(),
2152            )),
2153        }
2154    }
2155
2156    /// Evaluate pg_hba-style admission rules in order. The first rule whose
2157    /// user, database, and address all match decides; if none match, admit.
2158    fn hba_admits(rules: &[HbaRule], ip: std::net::IpAddr, user: &str, database: &str) -> bool {
2159        for r in rules {
2160            let user_ok = r.user == "all" || r.user == user;
2161            let db_ok = r.database == "all" || r.database == database;
2162            if user_ok && db_ok && Self::hba_addr_matches(&r.address, ip) {
2163                return r.action == HbaAction::Allow;
2164            }
2165        }
2166        true
2167    }
2168
2169    /// Match a client address against an hba `address` spec: "all", a bare
2170    /// IP, or a CIDR (`10.0.0.0/8`, `::1/128`).
2171    fn hba_addr_matches(spec: &str, ip: std::net::IpAddr) -> bool {
2172        use std::net::IpAddr;
2173        if spec == "all" {
2174            return true;
2175        }
2176        if let Some((net, bits)) = spec.split_once('/') {
2177            let bits: u32 = match bits.parse() {
2178                Ok(b) => b,
2179                Err(_) => return false,
2180            };
2181            match (net.parse::<IpAddr>(), ip) {
2182                (Ok(IpAddr::V4(n)), IpAddr::V4(i)) if bits <= 32 => {
2183                    let mask = if bits == 0 {
2184                        0
2185                    } else {
2186                        u32::MAX << (32 - bits)
2187                    };
2188                    (u32::from(n) & mask) == (u32::from(i) & mask)
2189                }
2190                (Ok(IpAddr::V6(n)), IpAddr::V6(i)) if bits <= 128 => {
2191                    let mask = if bits == 0 {
2192                        0
2193                    } else {
2194                        u128::MAX << (128 - bits)
2195                    };
2196                    (u128::from(n) & mask) == (u128::from(i) & mask)
2197                }
2198                _ => false,
2199            }
2200        } else {
2201            spec.parse::<IpAddr>().map(|s| s == ip).unwrap_or(false)
2202        }
2203    }
2204
2205    /// Run a proxy-terminated SCRAM-SHA-256 server exchange against the
2206    /// client, validating its password with the configured `auth_file`. On
2207    /// success the client is authenticated by the proxy (no AuthenticationOk
2208    /// is sent here — the backend's is forwarded later). On any failure
2209    /// returns Err; the caller emits an ErrorResponse and closes.
2210    async fn proxy_scram_auth(
2211        client: &mut ClientStream,
2212        user: &str,
2213        state: &Arc<ServerState>,
2214    ) -> std::result::Result<(), String> {
2215        use crate::auth_scram::ScramServer;
2216        let auth_file = state.auth_file.as_ref().ok_or("scram not configured")?;
2217
2218        // 1. AuthenticationSASL: advertise SCRAM-SHA-256.
2219        let mut sasl = BytesMut::new();
2220        sasl.put_i32(10); // SASL
2221        sasl.extend_from_slice(b"SCRAM-SHA-256\0");
2222        sasl.put_u8(0); // end of mechanism list
2223        Self::write_auth_frame(client, &sasl).await?;
2224
2225        // 2. Read SASLInitialResponse ('p'): mechanism cstring + i32 len + data.
2226        let init = Self::read_password_message(client).await?;
2227        let mech_end = init
2228            .iter()
2229            .position(|&b| b == 0)
2230            .ok_or("malformed SASLInitialResponse (no mechanism)")?;
2231        if init.len() < mech_end + 5 {
2232            return Err("short SASLInitialResponse".into());
2233        }
2234        let client_first =
2235            std::str::from_utf8(&init[mech_end + 5..]).map_err(|_| "client-first not UTF-8")?;
2236
2237        // 3. Look up the verifier (unknown user -> generic failure).
2238        let verifier = auth_file.get(user).ok_or("no such user")?.clone();
2239
2240        // 4. server-first.
2241        let server_nonce = Self::random_nonce();
2242        let (server, server_first) = ScramServer::start(verifier, client_first, &server_nonce)?;
2243
2244        // 5. AuthenticationSASLContinue.
2245        let mut cont = BytesMut::new();
2246        cont.put_i32(11);
2247        cont.extend_from_slice(server_first.as_bytes());
2248        Self::write_auth_frame(client, &cont).await?;
2249
2250        // 6. Read SASLResponse ('p'): payload = client-final.
2251        let client_final_raw = Self::read_password_message(client).await?;
2252        let client_final =
2253            std::str::from_utf8(&client_final_raw).map_err(|_| "client-final not UTF-8")?;
2254
2255        // 7. Verify -> server-final.
2256        let server_final = server.finish(client_final)?;
2257
2258        // 8. AuthenticationSASLFinal (no AuthenticationOk — backend's follows).
2259        let mut fin = BytesMut::new();
2260        fin.put_i32(12);
2261        fin.extend_from_slice(server_final.as_bytes());
2262        Self::write_auth_frame(client, &fin).await?;
2263        Ok(())
2264    }
2265
2266    /// Write an AuthenticationRequest ('R') frame with the given payload.
2267    async fn write_auth_frame(
2268        client: &mut ClientStream,
2269        payload: &[u8],
2270    ) -> std::result::Result<(), String> {
2271        let mut frame = BytesMut::with_capacity(payload.len() + 5);
2272        frame.put_u8(b'R');
2273        frame.put_u32((payload.len() + 4) as u32);
2274        frame.extend_from_slice(payload);
2275        client
2276            .write_all(&frame)
2277            .await
2278            .map_err(|e| format!("client write: {}", e))
2279    }
2280
2281    /// Read one Password/SASL ('p') message from the client, returning its
2282    /// payload. Errors on EOF or any non-'p' frame.
2283    async fn read_password_message(
2284        client: &mut ClientStream,
2285    ) -> std::result::Result<BytesMut, String> {
2286        let codec = ProtocolCodec::new();
2287        let mut buffer = BytesMut::with_capacity(1024);
2288        let mut read_buf = vec![0u8; 1024];
2289        loop {
2290            if let Some(msg) = codec
2291                .decode_message(&mut buffer)
2292                .map_err(|e| format!("decode: {}", e))?
2293            {
2294                if msg.msg_type == MessageType::Password {
2295                    return Ok(msg.payload);
2296                }
2297                return Err(format!("expected SASL response, got {:?}", msg.msg_type));
2298            }
2299            let n = client
2300                .read(&mut read_buf)
2301                .await
2302                .map_err(|e| format!("client read: {}", e))?;
2303            if n == 0 {
2304                return Err("client closed during SASL".into());
2305            }
2306            buffer.extend_from_slice(&read_buf[..n]);
2307        }
2308    }
2309
2310    /// A fresh random SCRAM server nonce (printable, no comma).
2311    fn random_nonce() -> String {
2312        use rand::Rng;
2313        const CHARS: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
2314        let mut rng = rand::thread_rng();
2315        (0..24)
2316            .map(|_| CHARS[rng.gen_range(0..CHARS.len())] as char)
2317            .collect()
2318    }
2319
2320    /// Connect to backend and handle authentication
2321    async fn connect_and_authenticate(
2322        client_stream: &mut ClientStream,
2323        params: &HashMap<String, String>,
2324        session: &Arc<ClientSession>,
2325        state: &Arc<ServerState>,
2326        config: &ProxyConfig,
2327    ) -> Result<(Option<TcpStream>, String)> {
2328        // pg_hba-style admission: reject disallowed (user, database, client
2329        // address) combinations before opening any backend connection.
2330        let user = params.get("user").map(String::as_str).unwrap_or("");
2331        let database = params.get("database").map(String::as_str).unwrap_or(user);
2332        if !Self::hba_admits(&config.hba, session.client_addr.ip(), user, database) {
2333            tracing::info!(%user, %database, client = %session.client_addr, "connection rejected by hba rule");
2334            let err = Self::create_error_response(
2335                "28000",
2336                "connection rejected by proxy admission rules",
2337            );
2338            let _ = client_stream.write_all(&err).await;
2339            return Ok((None, String::new()));
2340        }
2341
2342        // Proxy-terminated SCRAM-SHA-256: when an auth_file is configured the
2343        // proxy authenticates the client itself (becoming the auth boundary)
2344        // instead of relaying credentials to the backend. On success it falls
2345        // through to the normal backend connect, whose AuthenticationOk +
2346        // session messages are forwarded to the already-authenticated client.
2347        if state.auth_file.is_some() {
2348            if let Err(e) = Self::proxy_scram_auth(client_stream, user, state).await {
2349                tracing::info!(%user, error = %e, "proxy SCRAM auth failed");
2350                let err =
2351                    Self::create_error_response("28P01", &format!("authentication failed: {}", e));
2352                let _ = client_stream.write_all(&err).await;
2353                return Ok((None, String::new()));
2354            }
2355            tracing::debug!(%user, "client authenticated by proxy SCRAM");
2356        }
2357
2358        // Plugin Authenticate hook — may deny the connection outright or
2359        // attach a richer identity (roles, tenant_id, claims) onto the
2360        // session for downstream plugins to consume. Happens before any
2361        // backend connection is opened so denials cost nothing on the
2362        // backend side.
2363        Self::apply_authenticate_hook(params, session, state).await?;
2364
2365        // Migration cutover: when active, redirect this connection to the
2366        // promoted target, substituting the target's credentials/database for
2367        // the client's so the cutover is transparent to the application.
2368        let cutover = state.cutover.load_full();
2369        let (node_addr, effective_params) = if let Some(t) = cutover.as_ref() {
2370            let mut p = params.clone();
2371            p.insert("user".to_string(), t.user.clone());
2372            if let Some(ref db) = t.database {
2373                p.insert("database".to_string(), db.clone());
2374            } else {
2375                p.remove("database");
2376            }
2377            tracing::debug!(target = %t.addr, "routing connection to cutover target");
2378            (t.addr.clone(), p)
2379        } else {
2380            (
2381                Self::select_node(session, state, config).await?,
2382                params.clone(),
2383            )
2384        };
2385
2386        // Connect to backend. A failure here (the node is down at the moment a
2387        // new client connects) demotes the node in-band too — not just failures
2388        // on the forward path — so a dead backend is detected on the very next
2389        // connection instead of waiting for the periodic health checker.
2390        let mut backend = match tokio::time::timeout(
2391            config.pool.acquire_timeout(),
2392            TcpStream::connect(&node_addr),
2393        )
2394        .await
2395        {
2396            Ok(Ok(s)) => s,
2397            Ok(Err(e)) => {
2398                let msg = format!("Failed to connect to {}: {}", node_addr, e);
2399                Self::note_backend_failure(state, &node_addr, &msg);
2400                return Err(ProxyError::Connection(msg));
2401            }
2402            Err(_) => {
2403                let msg = format!("Connection timeout to {}", node_addr);
2404                Self::note_backend_failure(state, &node_addr, &msg);
2405                return Err(ProxyError::Connection(msg));
2406            }
2407        };
2408        let _ = backend.set_nodelay(true);
2409
2410        // Build and send startup message to backend
2411        let params = &effective_params;
2412        let startup_bytes = Self::build_startup_message(params);
2413        backend
2414            .write_all(&startup_bytes)
2415            .await
2416            .map_err(|e| ProxyError::Network(format!("Backend startup write error: {}", e)))?;
2417
2418        // Forward authentication messages between client and backend.
2419        // Registers the backend's BackendKeyData so a later CancelRequest
2420        // can be routed back to this node.
2421        Self::proxy_authentication(client_stream, &mut backend, state, &node_addr).await?;
2422
2423        // Store session variables
2424        {
2425            let mut vars = session.variables.write().await;
2426            for (k, v) in params {
2427                vars.insert(k.clone(), v.clone());
2428            }
2429        }
2430
2431        Ok((Some(backend), node_addr))
2432    }
2433
2434    /// Build PostgreSQL startup message
2435    fn build_startup_message(params: &HashMap<String, String>) -> Vec<u8> {
2436        let mut payload = BytesMut::new();
2437
2438        // Protocol version 3.0
2439        payload.put_u32(196608);
2440
2441        // Parameters
2442        for (key, value) in params {
2443            payload.extend_from_slice(key.as_bytes());
2444            payload.put_u8(0);
2445            payload.extend_from_slice(value.as_bytes());
2446            payload.put_u8(0);
2447        }
2448        payload.put_u8(0); // Terminator
2449
2450        // Build complete message with length prefix
2451        let mut msg = BytesMut::new();
2452        msg.put_u32((payload.len() + 4) as u32);
2453        msg.extend_from_slice(&payload);
2454
2455        msg.to_vec()
2456    }
2457
2458    /// Cap on the cancel-key map; the oldest entries are evicted on overflow
2459    /// (a dropped stale entry only means one best-effort cancel is not
2460    /// forwarded).
2461    const MAX_CANCEL_KEYS: usize = 100_000;
2462
2463    /// Timeout for a single backend write on the forward path — a blackholed or
2464    /// hung backend must never pin a client task indefinitely. Backend reads
2465    /// are already bounded (30s); this bounds writes symmetrically.
2466    const BACKEND_WRITE_TIMEOUT: Duration = Duration::from_secs(30);
2467    /// Timeout for a single client write — a wedged or very slow client must
2468    /// not pin a proxy task (and the backend connection it holds) forever.
2469    const CLIENT_WRITE_TIMEOUT: Duration = Duration::from_secs(60);
2470    /// Timeout for the out-of-band re-prepare exchange (write Parse+Flush, read
2471    /// ParseComplete) performed on a backend connection switch.
2472    const REPREPARE_TIMEOUT: Duration = Duration::from_secs(15);
2473    /// Per-session cap on distinct named prepared statements — bounds the
2474    /// `stmt_registry` against a client that issues unbounded `Parse`s.
2475    const MAX_PREPARED_STATEMENTS: usize = 8192;
2476    /// Per-session cap on the aggregate bytes retained in `stmt_registry`. The
2477    /// count cap alone does not bound memory: each entry holds the full encoded
2478    /// `Parse`, so 8192 large statements could still retain multiple GiB. This
2479    /// bounds the total held bytes (statements are tiny in practice; a session
2480    /// that approaches this is pathological).
2481    const MAX_PREPARED_BYTES: usize = 64 * 1024 * 1024;
2482    /// Per-session cap on the un-flushed extended-protocol `pending` buffer: a
2483    /// client must reach a Sync/Flush boundary before this many bytes pile up.
2484    const MAX_PENDING_BYTES: usize = 64 * 1024 * 1024;
2485    /// Global ceiling on idle connections parked in the data-path backend pool
2486    /// across ALL `(node,user,db)` identities — bounds total file descriptors
2487    /// regardless of how many distinct identities connect.
2488    #[cfg(feature = "pool-modes")]
2489    const MAX_TOTAL_IDLE_BACKEND_CONNS: usize = 8192;
2490    /// How often the idle-connection reaper runs.
2491    const POOL_REAP_INTERVAL: Duration = Duration::from_secs(30);
2492
2493    /// Record the backend that owns a BackendKeyData (pid, secret) pair.
2494    fn register_cancel_key(state: &Arc<ServerState>, pid: u32, key: u32, node_addr: &str) {
2495        // FIFO-evict the oldest registrations when at capacity, rather than
2496        // dropping all of them. Evict a small batch so we don't churn the lock
2497        // on every insert once full.
2498        {
2499            let mut order = state.cancel_order.lock();
2500            while state.cancel_map.len() >= Self::MAX_CANCEL_KEYS {
2501                match order.pop_front() {
2502                    Some(old) => {
2503                        state.cancel_map.remove(&old);
2504                    }
2505                    None => {
2506                        // Order queue empty but map full (shouldn't happen) —
2507                        // fall back to a clear to stay bounded.
2508                        state.cancel_map.clear();
2509                        break;
2510                    }
2511                }
2512            }
2513            order.push_back((pid, key));
2514        }
2515        state.cancel_map.insert((pid, key), node_addr.to_string());
2516    }
2517
2518    /// Forward a client CancelRequest to the backend that issued the
2519    /// matching BackendKeyData. Best-effort: unknown keys are ignored.
2520    async fn forward_cancel_request(state: &Arc<ServerState>, pid: u32, key: u32) {
2521        let Some(addr) = state.cancel_map.get(&(pid, key)).map(|e| e.clone()) else {
2522            tracing::debug!(pid, "cancel request for unknown key; ignoring");
2523            return;
2524        };
2525        // CancelRequest: int32 len(16) + int32 code(80877102) + pid + key.
2526        let mut msg = BytesMut::with_capacity(16);
2527        msg.put_u32(16);
2528        msg.put_u32(80877102);
2529        msg.put_u32(pid);
2530        msg.put_u32(key);
2531        match tokio::time::timeout(Duration::from_secs(5), TcpStream::connect(&addr)).await {
2532            Ok(Ok(mut conn)) => {
2533                let _ = conn.set_nodelay(true);
2534                if let Err(e) = conn.write_all(&msg).await {
2535                    tracing::warn!(node = %addr, error = %e, "failed to forward CancelRequest");
2536                }
2537                // PG closes the connection after handling a CancelRequest.
2538            }
2539            other => {
2540                tracing::warn!(node = %addr, ?other, "could not connect to forward CancelRequest")
2541            }
2542        }
2543    }
2544
2545    /// Proxy authentication messages between client and backend
2546    async fn proxy_authentication(
2547        client_stream: &mut ClientStream,
2548        backend_stream: &mut TcpStream,
2549        state: &Arc<ServerState>,
2550        node_addr: &str,
2551    ) -> Result<()> {
2552        let codec = ProtocolCodec::new();
2553        let mut backend_buffer = BytesMut::with_capacity(4096);
2554        let mut client_buffer = BytesMut::with_capacity(4096);
2555        let mut read_buf = vec![0u8; 4096];
2556
2557        loop {
2558            // Read from backend
2559            let n = backend_stream
2560                .read(&mut read_buf)
2561                .await
2562                .map_err(|e| ProxyError::Network(format!("Backend auth read error: {}", e)))?;
2563
2564            if n == 0 {
2565                return Err(ProxyError::Connection(
2566                    "Backend closed during auth".to_string(),
2567                ));
2568            }
2569
2570            backend_buffer.extend_from_slice(&read_buf[..n]);
2571
2572            // Forward all data to client
2573            client_stream
2574                .write_all(&read_buf[..n])
2575                .await
2576                .map_err(|e| ProxyError::Network(format!("Client auth write error: {}", e)))?;
2577
2578            // Check for authentication complete or error. Bytes were
2579            // already forwarded above, so frames are consumed (decoded
2580            // once) straight out of the buffer — no clone needed.
2581            while let Some(msg) = codec.decode_message(&mut backend_buffer)? {
2582                match msg.msg_type {
2583                    MessageType::BackendKeyData
2584                        // The backend told the client how to cancel its
2585                        // queries; remember which backend owns that key so
2586                        // an out-of-band CancelRequest can be forwarded.
2587                        if msg.payload.len() >= 8 => {
2588                            let pid = u32::from_be_bytes([
2589                                msg.payload[0], msg.payload[1], msg.payload[2], msg.payload[3],
2590                            ]);
2591                            let key = u32::from_be_bytes([
2592                                msg.payload[4], msg.payload[5], msg.payload[6], msg.payload[7],
2593                            ]);
2594                            Self::register_cancel_key(state, pid, key, node_addr);
2595                        }
2596                    MessageType::AuthRequest
2597                        // Check if auth OK
2598                        if msg.payload.len() >= 4 => {
2599                            let auth_type =
2600                                i32::from_be_bytes([msg.payload[0], msg.payload[1], msg.payload[2], msg.payload[3]]);
2601                            if auth_type == 0 {
2602                                // AuthenticationOk - continue to read ReadyForQuery
2603                            }
2604                        }
2605                    MessageType::ReadyForQuery => {
2606                        // Authentication complete
2607                        return Ok(());
2608                    }
2609                    MessageType::ErrorResponse => {
2610                        // Authentication failed - error already sent to client
2611                        return Err(ProxyError::Auth("Authentication failed".to_string()));
2612                    }
2613                    _ => {
2614                        // Continue forwarding
2615                    }
2616                }
2617            }
2618
2619            // If backend requires password, forward client's response
2620            // Read password from client if needed
2621            let n = tokio::time::timeout(
2622                Duration::from_millis(100),
2623                client_stream.read(&mut read_buf),
2624            )
2625            .await;
2626
2627            if let Ok(Ok(n)) = n {
2628                if n > 0 {
2629                    client_buffer.extend_from_slice(&read_buf[..n]);
2630                    backend_stream
2631                        .write_all(&read_buf[..n])
2632                        .await
2633                        .map_err(|e| {
2634                            ProxyError::Network(format!("Backend password write error: {}", e))
2635                        })?;
2636                }
2637            }
2638        }
2639    }
2640
2641    /// Decide which node a request should be routed to, without doing any
2642    /// I/O. Reuses `current_node` when it is healthy and role-compatible
2643    /// (sticky session), otherwise selects a fresh primary/read node. The
2644    /// returned address is the key into the per-session connection cache.
2645    async fn choose_target_node(
2646        is_write: bool,
2647        forced_target: Option<String>,
2648        current_node: Option<&str>,
2649        session: &Arc<ClientSession>,
2650        state: &Arc<ServerState>,
2651        config: &ProxyConfig,
2652    ) -> Result<String> {
2653        // After a migration cutover, every request stays on the promoted
2654        // target — never route back to the former primary.
2655        if let Some(t) = state.cutover.load_full().as_ref() {
2656            return Ok(t.addr.clone());
2657        }
2658
2659        // Read-your-writes: within the window after a write, a read is pinned to
2660        // the primary (overriding the reuse-of-a-standby path) so the client
2661        // observes its own writes despite replica lag.
2662        #[cfg(feature = "lag-routing")]
2663        if !is_write && forced_target.is_none() && config.lag_routing.enabled {
2664            let last_write = *session.last_write_at.read().await;
2665            if Self::ryw_pins_primary(last_write, config.lag_routing.ryw_window_ms) {
2666                tracing::debug!(target: "helios::routing", "read-your-writes: pinning read to primary");
2667                return Self::select_primary_with_timeout(session, state, config).await;
2668            }
2669        }
2670
2671        let need_switch = if let Some(ref forced) = forced_target {
2672            let health = state.health.load_full();
2673            let reuse = current_node
2674                .map(|c| c == forced && health.get(c).map(|h| h.healthy).unwrap_or(false))
2675                .unwrap_or(false);
2676            !reuse
2677        } else if let Some(current) = current_node {
2678            let health = state.health.load_full();
2679            let current_healthy = health.get(current).map(|h| h.healthy).unwrap_or(false);
2680            if !current_healthy {
2681                true
2682            } else if is_write {
2683                let is_primary = config
2684                    .nodes
2685                    .iter()
2686                    .find(|n| n.address() == *current)
2687                    .map(|n| n.role == NodeRole::Primary)
2688                    .unwrap_or(false);
2689                !is_primary
2690            } else {
2691                false
2692            }
2693        } else {
2694            true
2695        };
2696
2697        if let Some(forced) = forced_target {
2698            // Resolve a node *name* to its address; an address is passed
2699            // through unchanged. This lets `/*helios:node=pg-standby*/` (and a
2700            // plugin `Node("name")`) target a node by its configured name
2701            // rather than requiring the raw host:port.
2702            let resolved = config
2703                .nodes
2704                .iter()
2705                .find(|n| n.name.as_deref() == Some(forced.as_str()) || n.address() == forced)
2706                .map(|n| n.address())
2707                .unwrap_or(forced);
2708            Ok(resolved)
2709        } else if need_switch {
2710            if is_write {
2711                Self::select_primary_with_timeout(session, state, config).await
2712            } else {
2713                Self::select_read_node(session, state, config).await
2714            }
2715        } else {
2716            Ok(current_node.unwrap().to_string())
2717        }
2718    }
2719
2720    /// Ensure the per-session cache holds an authenticated backend connection
2721    /// to `target`, dialing + silently re-authenticating one (with the
2722    /// client's pass-through credentials) only if absent. The cached
2723    /// connection is then reused across read/write route switches.
2724    async fn ensure_conn(
2725        conns: &mut HashMap<String, BackendConn>,
2726        target: &str,
2727        session: &Arc<ClientSession>,
2728        config: &ProxyConfig,
2729        _state: &Arc<ServerState>,
2730    ) -> Result<()> {
2731        if conns.contains_key(target) {
2732            return Ok(());
2733        }
2734
2735        // Transaction/Statement pooling: lease a parked, identity-matched
2736        // connection before paying for a fresh TCP connect + startup + auth.
2737        // The parked connection was `DISCARD ALL`-reset on release, so it is
2738        // clean for this (same-identity) client.
2739        #[cfg(feature = "pool-modes")]
2740        if let Some(pool) = _state.backend_pool.as_ref() {
2741            let key = Self::pool_key_for(target, session).await;
2742            if let Some(stream) = pool.checkout(&key) {
2743                tracing::info!(
2744                    target: "helios::pool",
2745                    node = %target,
2746                    "reused pooled backend connection"
2747                );
2748                conns.insert(target.to_string(), BackendConn::new(stream));
2749                return Ok(());
2750            }
2751        }
2752
2753        let mut backend =
2754            tokio::time::timeout(config.pool.acquire_timeout(), TcpStream::connect(target))
2755                .await
2756                .map_err(|_| ProxyError::Connection(format!("Connection timeout to {}", target)))?
2757                .map_err(|e| {
2758                    ProxyError::Connection(format!("Failed to connect to {}: {}", target, e))
2759                })?;
2760        let _ = backend.set_nodelay(true);
2761
2762        let params = session.variables.read().await.clone();
2763        let startup = Self::build_startup_message(&params);
2764        backend
2765            .write_all(&startup)
2766            .await
2767            .map_err(|e| ProxyError::Network(format!("Backend startup error: {}", e)))?;
2768        Self::complete_backend_auth(&mut backend).await?;
2769        #[cfg(feature = "pool-modes")]
2770        if _state.backend_pool.is_some() {
2771            tracing::debug!(target: "helios::pool", node = %target, "dialed fresh backend connection (pool miss)");
2772        }
2773        tracing::debug!(node = %target, "opened backend connection");
2774        conns.insert(target.to_string(), BackendConn::new(backend));
2775        Ok(())
2776    }
2777
2778    /// Build the `(node, user, database)` pool key for the current session's
2779    /// connection identity. Connections are reused only within an identity, so
2780    /// a borrower always matches the principal the parked connection was
2781    /// authenticated as.
2782    #[cfg(feature = "pool-modes")]
2783    async fn pool_key_for(target: &str, session: &Arc<ClientSession>) -> String {
2784        let vars = session.variables.read().await;
2785        let user = vars.get("user").map(|s| s.as_str()).unwrap_or("");
2786        // PostgreSQL defaults the database to the role name when unset.
2787        let database = vars.get("database").map(|s| s.as_str()).unwrap_or(user);
2788        crate::pool::pool_key(target, user, database)
2789    }
2790
2791    /// Reset a backend connection to a clean session state before parking it
2792    /// for reuse: runs the configured reset query (default `DISCARD ALL`,
2793    /// which deallocates prepared statements, drops temp tables, resets GUCs
2794    /// and advisory locks) and drains its response to `ReadyForQuery`. Returns
2795    /// `Err` if the connection is unhealthy — the caller then drops (closes)
2796    /// it instead of parking a poisoned connection.
2797    #[cfg(feature = "pool-modes")]
2798    async fn reset_backend(stream: &mut TcpStream, reset_sql: &str) -> Result<()> {
2799        let msg = crate::protocol::QueryMessage {
2800            query: reset_sql.to_string(),
2801        }
2802        .encode();
2803        stream
2804            .write_all(&msg.encode())
2805            .await
2806            .map_err(|e| ProxyError::Network(format!("reset write error: {}", e)))?;
2807
2808        let codec = ProtocolCodec::new();
2809        let mut buffer = BytesMut::with_capacity(1024);
2810        let mut read_buf = vec![0u8; 1024];
2811        loop {
2812            while let Some(m) = codec.decode_message(&mut buffer)? {
2813                if m.msg_type == MessageType::ReadyForQuery {
2814                    return Ok(());
2815                }
2816            }
2817            let n = tokio::time::timeout(Duration::from_secs(5), stream.read(&mut read_buf))
2818                .await
2819                .map_err(|_| ProxyError::Network("reset drain timeout".to_string()))?
2820                .map_err(|e| ProxyError::Network(format!("reset drain read error: {}", e)))?;
2821            if n == 0 {
2822                return Err(ProxyError::Connection(
2823                    "backend closed during reset".to_string(),
2824                ));
2825            }
2826            buffer.extend_from_slice(&read_buf[..n]);
2827        }
2828    }
2829
2830    /// Transaction/Statement pooling release point: when the session is at an
2831    /// idle boundary (`ReadyForQuery` reported not-in-transaction), reset the
2832    /// just-used connection and park it for reuse by the next same-identity
2833    /// client. A no-op in Session mode or when the feature is off. Never
2834    /// releases mid-transaction.
2835    #[cfg(feature = "pool-modes")]
2836    async fn release_to_pool_if_idle(
2837        conns: &mut HashMap<String, BackendConn>,
2838        node: Option<&str>,
2839        session: &Arc<ClientSession>,
2840        state: &Arc<ServerState>,
2841        config: &ProxyConfig,
2842    ) {
2843        let Some(pool) = state.backend_pool.as_ref() else {
2844            return;
2845        };
2846        let Some(node) = node else {
2847            return;
2848        };
2849        // Only release at a clean transaction boundary.
2850        if session.tx_state.read().await.in_transaction {
2851            return;
2852        }
2853        let Some(mut bc) = conns.remove(node) else {
2854            return;
2855        };
2856        if Self::reset_backend(&mut bc.stream, &config.pool_mode.reset_query)
2857            .await
2858            .is_ok()
2859        {
2860            let key = Self::pool_key_for(node, session).await;
2861            if pool.checkin(&key, bc.stream) {
2862                tracing::debug!(target: "helios::pool", node = %node, "parked backend connection for reuse");
2863            }
2864        }
2865        // On reset failure the connection is dropped here (closed).
2866    }
2867
2868    /// Forward a simple-query (`Query`) message and stream its response back
2869    /// to the client frame-by-frame, ending at ReadyForQuery. Picks (and, if
2870    /// needed, opens) the target node's connection from the per-session
2871    /// cache. Returns `(Some(node_used), bytes)` — `None` node means the
2872    /// request was short-circuited (plugin block) without touching a backend.
2873    async fn forward_simple_query(
2874        client: &mut ClientStream,
2875        msg: &Message,
2876        conns: &mut HashMap<String, BackendConn>,
2877        current_node: Option<&str>,
2878        session: &Arc<ClientSession>,
2879        state: &Arc<ServerState>,
2880        config: &ProxyConfig,
2881    ) -> Result<(Option<String>, u64)> {
2882        // Rate-limit gate: deny before any backend selection.
2883        #[cfg(feature = "rate-limiting")]
2884        if let Some(mut resp) = Self::rate_limit_check(session, state, config).await {
2885            resp.extend_from_slice(&Self::create_ready_for_query(b'I'));
2886            client
2887                .write_all(&resp)
2888                .await
2889                .map_err(|e| ProxyError::Network(format!("Client write error: {}", e)))?;
2890            return Ok((None, resp.len() as u64));
2891        }
2892
2893        let default_is_write = Self::is_write_message(msg);
2894        let plugin_override = Self::apply_route_hook(msg, state, session);
2895
2896        // Block short-circuits before any backend selection.
2897        if let RouteOverride::Block(reason) = plugin_override {
2898            let mut response = Vec::with_capacity(64 + reason.len());
2899            response.extend_from_slice(&Self::create_error_response(
2900                "42000",
2901                &format!("Query blocked by route plugin: {}", reason),
2902            ));
2903            response.extend_from_slice(&Self::create_ready_for_query(b'I'));
2904            client
2905                .write_all(&response)
2906                .await
2907                .map_err(|e| ProxyError::Network(format!("Client write error: {}", e)))?;
2908            return Ok((None, response.len() as u64));
2909        }
2910
2911        // SQL-comment routing hints (feature + `[routing_hints] enabled`)
2912        // refine the override, recompute the write flag on the stripped SQL,
2913        // and may rewrite the message to drop the hint comment.
2914        #[cfg(feature = "routing-hints")]
2915        let (route_override, default_is_write, stripped_msg) =
2916            Self::resolve_simple_route(msg, plugin_override, default_is_write, state);
2917        #[cfg(not(feature = "routing-hints"))]
2918        let (route_override, stripped_msg): (RouteOverride, Option<Message>) =
2919            (plugin_override, None);
2920
2921        let (is_write, forced_target) = match route_override {
2922            RouteOverride::None => (default_is_write, None),
2923            RouteOverride::Primary => (true, None),
2924            RouteOverride::Standby => (false, None),
2925            RouteOverride::Node(name) => (default_is_write, Some(name)),
2926            RouteOverride::Block(_) => unreachable!("handled above"),
2927        };
2928
2929        // Read-your-writes: stamp the session on a write so subsequent reads
2930        // pin to the primary for the configured window.
2931        #[cfg(feature = "lag-routing")]
2932        if is_write && config.lag_routing.enabled {
2933            *session.last_write_at.write().await = Some(std::time::Instant::now());
2934        }
2935
2936        // Forward the stripped message when routing-hints rewrote it, else the
2937        // original (borrowed, no copy).
2938        let forward_msg = stripped_msg.as_ref().unwrap_or(msg);
2939
2940        // Query rewriting: apply rules to the SQL; if any rule fired, forward a
2941        // rebuilt Query carrying the rewritten SQL (so caching + the backend
2942        // both see the rewritten form).
2943        #[cfg(feature = "query-rewriting")]
2944        let rewritten_msg: Option<Message> = state.rewriter.as_ref().and_then(|rw| {
2945            let sql = crate::protocol::query_text(&forward_msg.payload)?;
2946            match rw.rewrite(sql) {
2947                Ok(res) if res.was_rewritten() => {
2948                    tracing::debug!(target: "helios::rewrite", rules = ?res.rules_applied, "query rewritten");
2949                    Some(crate::protocol::QueryMessage { query: res.query().to_string() }.encode())
2950                }
2951                _ => None,
2952            }
2953        });
2954        #[cfg(feature = "query-rewriting")]
2955        let forward_msg = rewritten_msg.as_ref().unwrap_or(forward_msg);
2956
2957        // Multi-tenancy: resolve the session's tenant and inject a row-level
2958        // tenant filter. Done BEFORE the cache lookup so each tenant's results
2959        // are cached under their own (filtered) SQL — no cross-tenant leakage.
2960        #[cfg(feature = "multi-tenancy")]
2961        let tenant_msg: Option<Message> = if let Some(tm) = state.tenant_manager.as_ref() {
2962            match crate::protocol::query_text(&forward_msg.payload) {
2963                Some(sql) => {
2964                    let ctx = Self::tenant_request_ctx(session).await;
2965                    match tm.identify_tenant(&ctx) {
2966                        Some(tenant) => {
2967                            let res = tm.transform_query(sql, &tenant);
2968                            if res.transformed {
2969                                tracing::debug!(target: "helios::tenant", tenant = %tenant.0, "tenant filter injected");
2970                                Some(crate::protocol::QueryMessage { query: res.query }.encode())
2971                            } else {
2972                                None
2973                            }
2974                        }
2975                        None => None,
2976                    }
2977                }
2978                None => None,
2979            }
2980        } else {
2981            None
2982        };
2983        #[cfg(feature = "multi-tenancy")]
2984        let forward_msg = tenant_msg.as_ref().unwrap_or(forward_msg);
2985
2986        // Query cache: on a cacheable read, a hit is served from cache with no
2987        // backend round-trip; on a miss we keep the context to store the result.
2988        #[cfg(feature = "query-cache")]
2989        let cache_ctx: Option<crate::cache::CacheContext> = if is_write {
2990            None
2991        } else if let Some(qc) = state.query_cache.as_ref() {
2992            let sql = crate::protocol::query_text(&forward_msg.payload).unwrap_or("");
2993            match Self::cacheable_read_ctx(session, sql).await {
2994                Some(ctx) => {
2995                    if let crate::cache::CacheLookup::Hit { result, level } =
2996                        qc.get(sql, &ctx).await
2997                    {
2998                        tracing::debug!(target: "helios::cache", level = %level, "cache hit");
2999                        client.write_all(&result.data).await.map_err(|e| {
3000                            ProxyError::Network(format!("Client write error: {}", e))
3001                        })?;
3002                        return Ok((None, result.data.len() as u64));
3003                    }
3004                    Some(ctx)
3005                }
3006                None => None,
3007            }
3008        } else {
3009            None
3010        };
3011
3012        // Schema/workload routing: pin an analytical (OLAP) read to the
3013        // configured analytics node, unless something already forced a target.
3014        #[cfg(feature = "schema-routing")]
3015        let forced_target = match state.schema_analyzer.as_ref() {
3016            Some(analyzer)
3017                if forced_target.is_none()
3018                    && !is_write
3019                    && !config.schema_routing.analytics_node.is_empty() =>
3020            {
3021                match crate::protocol::query_text(&forward_msg.payload) {
3022                    Some(sql) if analyzer.analyze(sql).is_analytics() => {
3023                        tracing::debug!(target: "helios::schema", "OLAP query routed to analytics node");
3024                        Some(config.schema_routing.analytics_node.clone())
3025                    }
3026                    _ => forced_target,
3027                }
3028            }
3029            _ => forced_target,
3030        };
3031
3032        // Analytics: capture the forwarded SQL + start the latency timer.
3033        #[cfg(feature = "query-analytics")]
3034        let analytics_sql =
3035            crate::protocol::query_text(&forward_msg.payload).map(|s| s.to_string());
3036        #[cfg(feature = "query-analytics")]
3037        let started = std::time::Instant::now();
3038
3039        let target = Self::choose_target_node(
3040            is_write,
3041            forced_target,
3042            current_node,
3043            session,
3044            state,
3045            config,
3046        )
3047        .await?;
3048        tracing::debug!(target: "helios::routing", node = %target, is_write, "routed simple query");
3049
3050        // Circuit breaker: fast-fail when the chosen node's circuit is open.
3051        #[cfg(feature = "circuit-breaker")]
3052        if let Some(mut resp) = Self::circuit_fast_fail(state, &target) {
3053            resp.extend_from_slice(&Self::create_ready_for_query(b'I'));
3054            client
3055                .write_all(&resp)
3056                .await
3057                .map_err(|e| ProxyError::Network(format!("Client write error: {}", e)))?;
3058            return Ok((None, resp.len() as u64));
3059        }
3060
3061        // A connect/auth failure trips the breaker (and is propagated as today).
3062        if let Err(e) = Self::ensure_conn(conns, &target, session, config, state).await {
3063            Self::record_backend_failure(state, &target, &e.to_string());
3064            return Err(e);
3065        }
3066        let backend = conns.get_mut(&target).expect("just ensured");
3067
3068        let backend_err = match tokio::time::timeout(
3069            Self::BACKEND_WRITE_TIMEOUT,
3070            backend.stream.write_all(&forward_msg.encode()),
3071        )
3072        .await
3073        {
3074            Ok(Ok(())) => None,
3075            Ok(Err(e)) => Some(format!("Backend write error: {}", e)),
3076            Err(_) => Some("Backend write timeout".to_string()),
3077        };
3078        if let Some(msg) = backend_err {
3079            let e = ProxyError::Network(msg);
3080            conns.remove(&target);
3081            Self::record_backend_failure(state, &target, &e.to_string());
3082            return Err(e);
3083        }
3084
3085        // Cacheable read miss: capture the response frames and store them so a
3086        // later identical read is served from cache without a backend hit.
3087        #[cfg(feature = "query-cache")]
3088        if let (Some(ctx), Some(qc)) = (cache_ctx.as_ref(), state.query_cache.as_ref()) {
3089            return match Self::stream_until_ready_capture(client, &mut backend.stream, session)
3090                .await
3091            {
3092                Ok((sent, captured, cacheable, rows)) => {
3093                    #[cfg(feature = "circuit-breaker")]
3094                    Self::circuit_record(state, &target, true, "");
3095                    if cacheable && !captured.is_empty() {
3096                        let sql = crate::protocol::query_text(&forward_msg.payload).unwrap_or("");
3097                        qc.put(
3098                            sql,
3099                            ctx,
3100                            bytes::Bytes::from(captured),
3101                            rows,
3102                            std::time::Duration::ZERO,
3103                        )
3104                        .await;
3105                    }
3106                    #[cfg(feature = "query-analytics")]
3107                    if let Some(sql) = analytics_sql.as_deref() {
3108                        Self::record_analytics(
3109                            state,
3110                            session,
3111                            sql,
3112                            &target,
3113                            started.elapsed(),
3114                            None,
3115                        )
3116                        .await;
3117                    }
3118                    Ok((Some(target), sent))
3119                }
3120                Err(e) => {
3121                    conns.remove(&target);
3122                    Self::record_backend_failure(state, &target, &e.to_string());
3123                    Err(e)
3124                }
3125            };
3126        }
3127
3128        match Self::stream_until_ready(client, &mut backend.stream, session, state).await {
3129            Ok(sent) => {
3130                #[cfg(feature = "circuit-breaker")]
3131                Self::circuit_record(state, &target, true, "");
3132                // Invalidate cached reads referencing tables this write touched.
3133                #[cfg(feature = "query-cache")]
3134                if is_write {
3135                    if let Some(qc) = state.query_cache.as_ref() {
3136                        let sql = crate::protocol::query_text(&forward_msg.payload).unwrap_or("");
3137                        qc.invalidate_query(sql).await;
3138                    }
3139                }
3140                // Transaction Replay: journal the write for failover/time-travel.
3141                #[cfg(feature = "ha-tr")]
3142                if is_write && config.tr_enabled {
3143                    if let Some(sql) = crate::protocol::query_text(&forward_msg.payload) {
3144                        Self::journal_write(state, session, sql).await;
3145                    }
3146                }
3147                #[cfg(feature = "query-analytics")]
3148                if let Some(sql) = analytics_sql.as_deref() {
3149                    Self::record_analytics(state, session, sql, &target, started.elapsed(), None)
3150                        .await;
3151                }
3152                Ok((Some(target), sent))
3153            }
3154            Err(e) => {
3155                // Drop the broken connection so the next use redials.
3156                conns.remove(&target);
3157                Self::record_backend_failure(state, &target, &e.to_string());
3158                #[cfg(feature = "query-analytics")]
3159                if let Some(sql) = analytics_sql.as_deref() {
3160                    Self::record_analytics(
3161                        state,
3162                        session,
3163                        sql,
3164                        &target,
3165                        started.elapsed(),
3166                        Some(e.to_string()),
3167                    )
3168                    .await;
3169                }
3170                Err(e)
3171            }
3172        }
3173    }
3174
3175    /// Forward an accumulated extended-protocol batch (Parse/Bind/Describe/
3176    /// Execute/Close terminated by Sync or Flush) and stream the response.
3177    /// Routing is taken from `route_sql` (the first Parse's SQL); when it is
3178    /// `None` (a re-Bind/Execute of a named prepared statement) the request
3179    /// stays on the connection the statement was prepared on — no switch.
3180    ///
3181    /// `reprepare` lists named statements this batch references but does not
3182    /// itself define; any that the chosen connection has not seen are
3183    /// re-prepared from `registry` (their original `Parse`) before the batch is
3184    /// sent, so a named statement survives a backend switch/redial (Batch F.4).
3185    /// `defines` are the named statements this batch's own `Parse`s create —
3186    /// recorded against the connection once it accepts the batch.
3187    #[allow(clippy::too_many_arguments)]
3188    async fn forward_extended_batch(
3189        client: &mut ClientStream,
3190        batch: &[u8],
3191        route_sql: Option<&str>,
3192        wait_ready: bool,
3193        conns: &mut HashMap<String, BackendConn>,
3194        current_node: Option<&str>,
3195        registry: &HashMap<String, bytes::Bytes>,
3196        reprepare: &[String],
3197        defines: &[String],
3198        unnamed: Option<(bytes::Bytes, bytes::Bytes)>,
3199        session: &Arc<ClientSession>,
3200        state: &Arc<ServerState>,
3201        config: &ProxyConfig,
3202    ) -> Result<(Option<String>, u64)> {
3203        // Rate-limit gate. The terminating ReadyForQuery is only appended when
3204        // the batch carried a Sync (`wait_ready`); a Flush-terminated batch
3205        // expects an ErrorResponse with no ReadyForQuery.
3206        #[cfg(feature = "rate-limiting")]
3207        if let Some(mut resp) = Self::rate_limit_check(session, state, config).await {
3208            if wait_ready {
3209                resp.extend_from_slice(&Self::create_ready_for_query(b'I'));
3210            }
3211            client
3212                .write_all(&resp)
3213                .await
3214                .map_err(|e| ProxyError::Network(format!("Client write error: {}", e)))?;
3215            return Ok((None, resp.len() as u64));
3216        }
3217
3218        // Analytics: the routable SQL (first Parse) + latency timer.
3219        #[cfg(feature = "query-analytics")]
3220        let analytics_sql = route_sql.map(|s| s.to_string());
3221        #[cfg(feature = "query-analytics")]
3222        let started = std::time::Instant::now();
3223
3224        let target = match route_sql {
3225            Some(sql) => {
3226                // Routing-hints, when active, can override the verb-based
3227                // target (and recompute the write flag on the stripped SQL).
3228                #[cfg(feature = "routing-hints")]
3229                let (is_write, forced) = Self::extended_hint_route(state, sql)
3230                    .unwrap_or_else(|| (Self::is_write_query(sql), None));
3231                #[cfg(not(feature = "routing-hints"))]
3232                let (is_write, forced): (bool, Option<String>) = (Self::is_write_query(sql), None);
3233                #[cfg(feature = "lag-routing")]
3234                if is_write && config.lag_routing.enabled {
3235                    *session.last_write_at.write().await = Some(std::time::Instant::now());
3236                }
3237                Self::choose_target_node(is_write, forced, current_node, session, state, config)
3238                    .await?
3239            }
3240            // No Parse in this batch: stay on the prepared-statement /
3241            // portal connection. Fall back to a read node only if the
3242            // session has no current connection yet.
3243            None => match current_node {
3244                Some(c) => c.to_string(),
3245                None => Self::select_read_node(session, state, config).await?,
3246            },
3247        };
3248
3249        // Circuit breaker: fast-fail when the chosen node's circuit is open.
3250        #[cfg(feature = "circuit-breaker")]
3251        if let Some(mut resp) = Self::circuit_fast_fail(state, &target) {
3252            if wait_ready {
3253                resp.extend_from_slice(&Self::create_ready_for_query(b'I'));
3254            }
3255            client
3256                .write_all(&resp)
3257                .await
3258                .map_err(|e| ProxyError::Network(format!("Client write error: {}", e)))?;
3259            return Ok((None, resp.len() as u64));
3260        }
3261
3262        if let Err(e) = Self::ensure_conn(conns, &target, session, config, state).await {
3263            Self::record_backend_failure(state, &target, &e.to_string());
3264            return Err(e);
3265        }
3266        let backend = conns.get_mut(&target).expect("just ensured");
3267
3268        // Transparently re-prepare any referenced named statement this socket
3269        // is missing. Each is sent as its original `Parse` + `Flush`; the
3270        // resulting `ParseComplete` is consumed here so the client never sees
3271        // the extra round trip. A re-prepare failure recycles the connection.
3272        for name in reprepare {
3273            if backend.prepared.contains(name) {
3274                continue;
3275            }
3276            let Some(parse_bytes) = registry.get(name) else {
3277                continue; // unknown statement — let the batch surface the error
3278            };
3279            match Self::reprepare_statement(&mut backend.stream, parse_bytes).await {
3280                Ok(()) => {
3281                    backend.prepared.insert(name.clone());
3282                }
3283                Err(e) => {
3284                    conns.remove(&target);
3285                    return Err(e);
3286                }
3287            }
3288        }
3289
3290        // Unnamed-`Parse` promotion: if the held unnamed Parse matches what this
3291        // connection's unnamed statement already holds, skip forwarding it and
3292        // synthesize its `ParseComplete` to the client; otherwise forward it
3293        // first (re-establishing the connection's unnamed statement) and record
3294        // its signature. A fresh/redialed connection has no signature, so the
3295        // Parse is always (re)forwarded there — correctness is preserved.
3296        let mut inject_parse_complete = false;
3297        let mut new_unnamed_sig: Option<bytes::Bytes> = None;
3298        if let Some((parse_msg, sig)) = unnamed.as_ref() {
3299            if backend.unnamed_sig.as_deref() == Some(&sig[..]) {
3300                inject_parse_complete = true;
3301            } else {
3302                if let Err(e) = backend
3303                    .stream
3304                    .write_all(parse_msg)
3305                    .await
3306                    .map_err(|e| ProxyError::Network(format!("Backend write error: {}", e)))
3307                {
3308                    conns.remove(&target);
3309                    return Err(e);
3310                }
3311                new_unnamed_sig = Some(sig.clone());
3312            }
3313        }
3314
3315        let batch_err = match tokio::time::timeout(
3316            Self::BACKEND_WRITE_TIMEOUT,
3317            backend.stream.write_all(batch),
3318        )
3319        .await
3320        {
3321            Ok(Ok(())) => None,
3322            Ok(Err(e)) => Some(format!("Backend write error: {}", e)),
3323            Err(_) => Some("Backend write timeout".to_string()),
3324        };
3325        if let Some(msg) = batch_err {
3326            let e = ProxyError::Network(msg);
3327            conns.remove(&target);
3328            Self::record_backend_failure(state, &target, &e.to_string());
3329            return Err(e);
3330        }
3331
3332        // The client expects `ParseComplete` first; the backend won't send one
3333        // for a skipped Parse, so emit it here before relaying the response.
3334        let mut injected: u64 = 0;
3335        if inject_parse_complete {
3336            if let Err(e) = client
3337                .write_all(&[b'1', 0, 0, 0, 4])
3338                .await
3339                .map_err(|e| ProxyError::Network(format!("Client write error: {}", e)))
3340            {
3341                conns.remove(&target);
3342                return Err(e);
3343            }
3344            injected = 5;
3345        }
3346
3347        let r = if wait_ready {
3348            Self::stream_until_ready(client, &mut backend.stream, session, state).await
3349        } else {
3350            Self::stream_flush(client, &mut backend.stream, session, state).await
3351        };
3352        match r {
3353            Ok(sent) => {
3354                #[cfg(feature = "circuit-breaker")]
3355                Self::circuit_record(state, &target, true, "");
3356                #[cfg(feature = "query-analytics")]
3357                if let Some(sql) = analytics_sql.as_deref() {
3358                    Self::record_analytics(state, session, sql, &target, started.elapsed(), None)
3359                        .await;
3360                }
3361                // The connection now holds these named statements.
3362                for name in defines {
3363                    backend.prepared.insert(name.clone());
3364                }
3365                // ...and the (re)forwarded unnamed statement.
3366                if let Some(sig) = new_unnamed_sig {
3367                    backend.unnamed_sig = Some(sig);
3368                }
3369                Ok((Some(target), sent + injected))
3370            }
3371            Err(e) => {
3372                conns.remove(&target);
3373                Self::record_backend_failure(state, &target, &e.to_string());
3374                #[cfg(feature = "query-analytics")]
3375                if let Some(sql) = analytics_sql.as_deref() {
3376                    Self::record_analytics(
3377                        state,
3378                        session,
3379                        sql,
3380                        &target,
3381                        started.elapsed(),
3382                        Some(e.to_string()),
3383                    )
3384                    .await;
3385                }
3386                Err(e)
3387            }
3388        }
3389    }
3390
3391    /// Re-issue one named `Parse` on a backend socket out-of-band: send the
3392    /// original `Parse` bytes followed by a `Flush`, then read and discard the
3393    /// single `ParseComplete` the backend emits. The statement persists on the
3394    /// connection (the implicit transaction is closed later by the real
3395    /// batch's `Sync`). An `ErrorResponse` means the re-prepare failed.
3396    async fn reprepare_statement<S: AsyncReadExt + AsyncWriteExt + Unpin>(
3397        backend: &mut S,
3398        parse_bytes: &[u8],
3399    ) -> Result<()> {
3400        tokio::time::timeout(Self::REPREPARE_TIMEOUT, backend.write_all(parse_bytes))
3401            .await
3402            .map_err(|_| ProxyError::Network("re-prepare write timeout".to_string()))?
3403            .map_err(|e| ProxyError::Network(format!("re-prepare write error: {}", e)))?;
3404        // Flush: 'H' + length 4.
3405        tokio::time::timeout(
3406            Self::REPREPARE_TIMEOUT,
3407            backend.write_all(&[b'H', 0, 0, 0, 4]),
3408        )
3409        .await
3410        .map_err(|_| ProxyError::Network("re-prepare flush timeout".to_string()))?
3411        .map_err(|e| ProxyError::Network(format!("re-prepare flush error: {}", e)))?;
3412        let mtype =
3413            tokio::time::timeout(Self::REPREPARE_TIMEOUT, Self::read_one_frame_type(backend))
3414                .await
3415                .map_err(|_| ProxyError::Network("re-prepare read timeout".to_string()))??;
3416        match mtype {
3417            b'1' => Ok(()), // ParseComplete
3418            b'E' => Err(ProxyError::Protocol(
3419                "re-prepare rejected by backend".to_string(),
3420            )),
3421            other => Err(ProxyError::Protocol(format!(
3422                "unexpected re-prepare reply: {}",
3423                other as char
3424            ))),
3425        }
3426    }
3427
3428    /// Read exactly one backend message frame (5-byte header + body) and return
3429    /// its type byte, discarding the body. Used to consume the `ParseComplete`
3430    /// produced by an out-of-band re-prepare.
3431    async fn read_one_frame_type<S: AsyncReadExt + Unpin>(backend: &mut S) -> Result<u8> {
3432        let mut header = [0u8; 5];
3433        backend
3434            .read_exact(&mut header)
3435            .await
3436            .map_err(|e| ProxyError::Network(format!("re-prepare read error: {}", e)))?;
3437        let len = u32::from_be_bytes([header[1], header[2], header[3], header[4]]) as usize;
3438        let body_len = len.saturating_sub(4);
3439        if body_len > 0 {
3440            let mut body = vec![0u8; body_len];
3441            backend
3442                .read_exact(&mut body)
3443                .await
3444                .map_err(|e| ProxyError::Network(format!("re-prepare body read error: {}", e)))?;
3445        }
3446        Ok(header[0])
3447    }
3448
3449    /// Name a `Parse` defines: its first cstring. `""` is the unnamed
3450    /// statement, which is per-protocol transient and never tracked.
3451    fn parse_stmt_name(payload: &[u8]) -> &str {
3452        let end = payload.iter().position(|&b| b == 0).unwrap_or(0);
3453        std::str::from_utf8(&payload[..end]).unwrap_or("")
3454    }
3455
3456    /// Prepared-statement name a `Bind` references: the *second* cstring
3457    /// (portal name first, then statement name). `None` for the unnamed
3458    /// statement.
3459    fn bind_stmt_ref(payload: &[u8]) -> Option<&str> {
3460        let portal_end = payload.iter().position(|&b| b == 0)?;
3461        let rest = &payload[portal_end + 1..];
3462        let stmt_end = rest.iter().position(|&b| b == 0)?;
3463        let name = std::str::from_utf8(&rest[..stmt_end]).ok()?;
3464        (!name.is_empty()).then_some(name)
3465    }
3466
3467    /// Statement name a `Describe`/`Close` targets — only when it is
3468    /// statement-kind (`'S'`, not portal `'P'`). `None` otherwise.
3469    fn stmt_kind_name(payload: &[u8]) -> Option<&str> {
3470        if payload.first() != Some(&b'S') {
3471            return None;
3472        }
3473        let rest = &payload[1..];
3474        let end = rest.iter().position(|&b| b == 0)?;
3475        let name = std::str::from_utf8(&rest[..end]).ok()?;
3476        (!name.is_empty()).then_some(name)
3477    }
3478
3479    /// Stream backend response frames to the client until ReadyForQuery (end
3480    /// of a Sync/simple-query response). Forwards bytes verbatim, coalescing
3481    /// all currently-complete frames into one write and keeping only a
3482    /// partial-frame tail buffered, so proxy memory stays O(frame) rather
3483    /// than O(result). Also yields on CopyInResponse/CopyBothResponse so the
3484    /// client can supply COPY data. Updates `tx_state` from the RFQ status.
3485    /// Returns bytes streamed to the client.
3486    async fn stream_until_ready(
3487        client: &mut ClientStream,
3488        backend: &mut TcpStream,
3489        session: &Arc<ClientSession>,
3490        state: &Arc<ServerState>,
3491    ) -> Result<u64> {
3492        let _ = state;
3493        let mut buf = BytesMut::with_capacity(16384);
3494        let mut read_buf = vec![0u8; 16384];
3495        let mut sent: u64 = 0;
3496
3497        loop {
3498            // Walk complete frames in `buf`, stopping at a boundary frame.
3499            let mut consumed = 0usize;
3500            let mut ready_status: Option<u8> = None;
3501            let mut yield_for_copy = false;
3502            loop {
3503                let rem = &buf[consumed..];
3504                if rem.len() < 5 {
3505                    break;
3506                }
3507                let len = u32::from_be_bytes([rem[1], rem[2], rem[3], rem[4]]) as usize;
3508                if len < 4 || rem.len() < len + 1 {
3509                    break; // incomplete or malformed length — need more bytes
3510                }
3511                let frame_total = len + 1;
3512                let mtype = rem[0];
3513                consumed += frame_total;
3514                if mtype == b'Z' {
3515                    // ReadyForQuery: payload is one status byte at rem[5].
3516                    ready_status = Some(if frame_total >= 6 { rem[5] } else { b'I' });
3517                    break;
3518                }
3519                if mtype == b'G' || mtype == b'W' {
3520                    // CopyInResponse / CopyBothResponse: the backend now wants
3521                    // CopyData from the client — forward up to here and yield.
3522                    yield_for_copy = true;
3523                    break;
3524                }
3525            }
3526
3527            if consumed > 0 {
3528                tokio::time::timeout(
3529                    Self::CLIENT_WRITE_TIMEOUT,
3530                    client.write_all(&buf[..consumed]),
3531                )
3532                .await
3533                .map_err(|_| ProxyError::Network("Client write timeout".to_string()))?
3534                .map_err(|e| ProxyError::Network(format!("Client write error: {}", e)))?;
3535                sent += consumed as u64;
3536                let _ = buf.split_to(consumed);
3537            }
3538
3539            if let Some(status) = ready_status {
3540                let st = TransactionStatus::from_byte(status);
3541                let mut tx = session.tx_state.write().await;
3542                tx.in_transaction = st != TransactionStatus::Idle;
3543                return Ok(sent);
3544            }
3545            if yield_for_copy {
3546                return Ok(sent);
3547            }
3548
3549            let n = tokio::time::timeout(Duration::from_secs(30), backend.read(&mut read_buf))
3550                .await
3551                .map_err(|_| ProxyError::Network("Backend read timeout".to_string()))?
3552                .map_err(|e| ProxyError::Network(format!("Backend read error: {}", e)))?;
3553            if n == 0 {
3554                return Err(ProxyError::Connection(
3555                    "Backend closed mid-response".to_string(),
3556                ));
3557            }
3558            buf.extend_from_slice(&read_buf[..n]);
3559        }
3560    }
3561
3562    /// Like `stream_until_ready` but also captures the full response bytes for
3563    /// caching. Returns `(bytes_sent, captured, cacheable, row_count)`.
3564    /// `cacheable` is false if the response carried an `ErrorResponse`, ended in
3565    /// a non-idle transaction status, or yielded for COPY — none of which may
3566    /// be cached.
3567    #[cfg(feature = "query-cache")]
3568    async fn stream_until_ready_capture(
3569        client: &mut ClientStream,
3570        backend: &mut TcpStream,
3571        session: &Arc<ClientSession>,
3572    ) -> Result<(u64, Vec<u8>, bool, usize)> {
3573        let mut buf = BytesMut::with_capacity(16384);
3574        let mut read_buf = vec![0u8; 16384];
3575        let mut sent: u64 = 0;
3576        let mut captured: Vec<u8> = Vec::with_capacity(4096);
3577        let mut had_error = false;
3578        let mut row_count: usize = 0;
3579
3580        loop {
3581            let mut consumed = 0usize;
3582            let mut ready_status: Option<u8> = None;
3583            let mut yield_for_copy = false;
3584            loop {
3585                let rem = &buf[consumed..];
3586                if rem.len() < 5 {
3587                    break;
3588                }
3589                let len = u32::from_be_bytes([rem[1], rem[2], rem[3], rem[4]]) as usize;
3590                if len < 4 || rem.len() < len + 1 {
3591                    break;
3592                }
3593                let frame_total = len + 1;
3594                let mtype = rem[0];
3595                if mtype == b'E' {
3596                    had_error = true;
3597                }
3598                if mtype == b'C' {
3599                    // CommandComplete tag, e.g. "SELECT 5" — take the row count.
3600                    if let Some(tag) = rem.get(5..frame_total) {
3601                        if let Some(end) = tag.iter().position(|&b| b == 0) {
3602                            if let Ok(s) = std::str::from_utf8(&tag[..end]) {
3603                                if let Some(n) =
3604                                    s.rsplit(' ').next().and_then(|x| x.parse::<usize>().ok())
3605                                {
3606                                    row_count = n;
3607                                }
3608                            }
3609                        }
3610                    }
3611                }
3612                consumed += frame_total;
3613                if mtype == b'Z' {
3614                    ready_status = Some(if frame_total >= 6 { rem[5] } else { b'I' });
3615                    break;
3616                }
3617                if mtype == b'G' || mtype == b'W' {
3618                    yield_for_copy = true;
3619                    break;
3620                }
3621            }
3622
3623            if consumed > 0 {
3624                tokio::time::timeout(
3625                    Self::CLIENT_WRITE_TIMEOUT,
3626                    client.write_all(&buf[..consumed]),
3627                )
3628                .await
3629                .map_err(|_| ProxyError::Network("Client write timeout".to_string()))?
3630                .map_err(|e| ProxyError::Network(format!("Client write error: {}", e)))?;
3631                captured.extend_from_slice(&buf[..consumed]);
3632                sent += consumed as u64;
3633                let _ = buf.split_to(consumed);
3634            }
3635
3636            if let Some(status) = ready_status {
3637                let st = TransactionStatus::from_byte(status);
3638                let mut tx = session.tx_state.write().await;
3639                tx.in_transaction = st != TransactionStatus::Idle;
3640                let cacheable = !had_error && status == b'I';
3641                return Ok((sent, captured, cacheable, row_count));
3642            }
3643            if yield_for_copy {
3644                return Ok((sent, captured, false, row_count));
3645            }
3646
3647            let n = tokio::time::timeout(Duration::from_secs(30), backend.read(&mut read_buf))
3648                .await
3649                .map_err(|_| ProxyError::Network("Backend read timeout".to_string()))?
3650                .map_err(|e| ProxyError::Network(format!("Backend read error: {}", e)))?;
3651            if n == 0 {
3652                return Err(ProxyError::Connection(
3653                    "Backend closed mid-response".to_string(),
3654                ));
3655            }
3656            buf.extend_from_slice(&read_buf[..n]);
3657        }
3658    }
3659
3660    /// Stream whatever the backend has produced in response to a `Flush`
3661    /// (which, unlike `Sync`, produces no ReadyForQuery). Relays available
3662    /// bytes and returns once the backend goes briefly idle, so the loop can
3663    /// read the client's next frames — deadlock-free. The eventual `Sync`
3664    /// drains the final ReadyForQuery via `stream_until_ready`.
3665    async fn stream_flush(
3666        client: &mut ClientStream,
3667        backend: &mut TcpStream,
3668        session: &Arc<ClientSession>,
3669        state: &Arc<ServerState>,
3670    ) -> Result<u64> {
3671        let _ = (session, state);
3672        let mut read_buf = vec![0u8; 16384];
3673        let mut sent: u64 = 0;
3674        loop {
3675            match tokio::time::timeout(Duration::from_millis(200), backend.read(&mut read_buf))
3676                .await
3677            {
3678                Ok(Ok(0)) => {
3679                    return Err(ProxyError::Connection(
3680                        "Backend closed mid-flush".to_string(),
3681                    ))
3682                }
3683                Ok(Ok(n)) => {
3684                    client
3685                        .write_all(&read_buf[..n])
3686                        .await
3687                        .map_err(|e| ProxyError::Network(format!("Client write error: {}", e)))?;
3688                    sent += n as u64;
3689                }
3690                Ok(Err(e)) => {
3691                    return Err(ProxyError::Network(format!("Backend read error: {}", e)))
3692                }
3693                Err(_) => return Ok(sent), // idle: backend has emitted all flush output
3694            }
3695        }
3696    }
3697
3698    /// Check if a message is a write operation
3699    fn is_write_message(msg: &Message) -> bool {
3700        match msg.msg_type {
3701            MessageType::Query => {
3702                // Borrow the SQL straight out of the payload — the
3703                // message is forwarded verbatim, so no copy is needed
3704                // just to inspect the leading keyword.
3705                crate::protocol::query_text(&msg.payload)
3706                    .map(Self::is_write_query)
3707                    .unwrap_or(false)
3708            }
3709            MessageType::Parse => {
3710                // Parse payload = statement-name cstring + query
3711                // cstring; skip the name and borrow the query.
3712                msg.payload
3713                    .iter()
3714                    .position(|&b| b == 0)
3715                    .and_then(|end| crate::protocol::query_text(&msg.payload[end + 1..]))
3716                    .map(Self::is_write_query)
3717                    .unwrap_or(false)
3718            }
3719            // Execute, Bind, etc. maintain the current connection
3720            _ => false,
3721        }
3722    }
3723
3724    /// Check if SQL query is a write operation
3725    fn is_write_query(sql: &str) -> bool {
3726        use crate::protocol::starts_with_ci;
3727        let trimmed = sql.trim();
3728
3729        // Write operations
3730        if starts_with_ci(trimmed, "INSERT")
3731            || starts_with_ci(trimmed, "UPDATE")
3732            || starts_with_ci(trimmed, "DELETE")
3733            || starts_with_ci(trimmed, "CREATE")
3734            || starts_with_ci(trimmed, "DROP")
3735            || starts_with_ci(trimmed, "ALTER")
3736            || starts_with_ci(trimmed, "TRUNCATE")
3737            || starts_with_ci(trimmed, "GRANT")
3738            || starts_with_ci(trimmed, "REVOKE")
3739            || starts_with_ci(trimmed, "VACUUM")
3740            || starts_with_ci(trimmed, "REINDEX")
3741            || starts_with_ci(trimmed, "CLUSTER")
3742        {
3743            return true;
3744        }
3745
3746        // Transaction control goes to current node
3747        if starts_with_ci(trimmed, "BEGIN")
3748            || starts_with_ci(trimmed, "START")
3749            || starts_with_ci(trimmed, "COMMIT")
3750            || starts_with_ci(trimmed, "ROLLBACK")
3751            || starts_with_ci(trimmed, "SAVEPOINT")
3752            || starts_with_ci(trimmed, "RELEASE")
3753        {
3754            return true;
3755        }
3756
3757        // SET commands go to primary to maintain session state
3758        if starts_with_ci(trimmed, "SET") && !starts_with_ci(trimmed, "SET TRANSACTION READ ONLY") {
3759            return true;
3760        }
3761
3762        false
3763    }
3764
3765    /// Derive the rate-limit bucket key for a session per the configured
3766    /// keying dimension.
3767    #[cfg(feature = "rate-limiting")]
3768    async fn rate_limit_key(
3769        session: &Arc<ClientSession>,
3770        config: &ProxyConfig,
3771    ) -> crate::rate_limit::LimiterKey {
3772        use crate::config::RateLimitKeyBy;
3773        use crate::rate_limit::LimiterKey;
3774        match config.rate_limit.key_by {
3775            RateLimitKeyBy::Global => LimiterKey::Global,
3776            RateLimitKeyBy::ClientIp => LimiterKey::ClientIp(session.client_addr.ip()),
3777            RateLimitKeyBy::Database => {
3778                let vars = session.variables.read().await;
3779                LimiterKey::Database(vars.get("database").cloned().unwrap_or_default())
3780            }
3781            RateLimitKeyBy::User => {
3782                let vars = session.variables.read().await;
3783                LimiterKey::User(vars.get("user").cloned().unwrap_or_default())
3784            }
3785        }
3786    }
3787
3788    /// Check rate limits before a query is forwarded. Returns `Some(bytes)` —
3789    /// a PG `ErrorResponse` WITHOUT a trailing `ReadyForQuery` (the caller
3790    /// appends one as the protocol requires) — when the query is denied; `None`
3791    /// when it may proceed. A throttle/queue verdict is honored by sleeping for
3792    /// the engine-supplied delay (real backpressure, capped) and then allowing.
3793    #[cfg(feature = "rate-limiting")]
3794    async fn rate_limit_check(
3795        session: &Arc<ClientSession>,
3796        state: &Arc<ServerState>,
3797        config: &ProxyConfig,
3798    ) -> Option<Vec<u8>> {
3799        use crate::rate_limit::RateLimitResult;
3800        let limiter = state.rate_limiter.as_ref()?;
3801        let key = Self::rate_limit_key(session, config).await;
3802        match limiter.check(&key, 1) {
3803            RateLimitResult::Allowed => None,
3804            RateLimitResult::Warned(msg) => {
3805                tracing::warn!(key = %key, reason = %msg, "rate limit warning");
3806                None
3807            }
3808            RateLimitResult::Throttled(d) | RateLimitResult::Queued(d) => {
3809                // Cap the backpressure sleep so a misconfiguration can't pin a
3810                // connection task indefinitely.
3811                tokio::time::sleep(d.min(Duration::from_secs(5))).await;
3812                None
3813            }
3814            RateLimitResult::Denied(exc) => {
3815                tracing::info!(key = %key, "rate limit exceeded");
3816                let msg = format!(
3817                    "rate limit exceeded: {} (retry after {}ms)",
3818                    exc.message,
3819                    exc.retry_after.as_millis()
3820                );
3821                Some(Self::create_error_response("53400", &msg))
3822            }
3823        }
3824    }
3825
3826    /// In-band failure feedback. When a query fails against a backend, demote
3827    /// that node's health *immediately* — a copy-on-write update of the shared
3828    /// health snapshot, the same structure the periodic health checker
3829    /// maintains — so routing stops sending work to a dead node within one
3830    /// query instead of waiting up to a full health-check interval (the
3831    /// ~`check_interval` blind window). The periodic checker restores the node
3832    /// on its next successful probe, so this only ever *accelerates* detection.
3833    ///
3834    /// True when `err` is evidence the backend itself is unhealthy — and so
3835    /// should demote it in-band (and trip its circuit breaker) — as opposed to a
3836    /// client-side problem or a merely slow but healthy query.
3837    ///
3838    /// Excluded (return `false`, no penalty):
3839    /// * `Client …` — a failed or timed-out client write is the client's fault.
3840    /// * `Backend read timeout` — a backend that emits no bytes within the
3841    ///   streaming read window is indistinguishable from a legitimately slow but
3842    ///   healthy query (large sort/aggregate, lock wait, bulk DML). Demoting the
3843    ///   whole node — cluster-wide, for every session, bypassing the configured
3844    ///   `failure_threshold` — over one slow query is a false positive; a
3845    ///   genuinely unresponsive-but-connected backend is still caught by the
3846    ///   periodic protocol-level health probe.
3847    ///
3848    /// Still faults (return `true`): a backend read/write *error* (reset, EOF,
3849    /// broken pipe), a backend *write* timeout (the backend is not draining its
3850    /// socket), and any connect-time failure.
3851    fn is_backend_fault(err: &str) -> bool {
3852        !err.contains("Client") && !err.contains("Backend read timeout")
3853    }
3854
3855    /// Errors that do not demote a backend are filtered via `is_backend_fault`:
3856    /// a client disconnecting mid-query, or one merely-slow query, must never
3857    /// take a healthy backend out of rotation for every session.
3858    fn note_backend_failure(state: &Arc<ServerState>, addr: &str, err: &str) {
3859        if !Self::is_backend_fault(err) {
3860            return;
3861        }
3862        // Serialize the read-modify-write of the shared health snapshot. ArcSwap
3863        // makes only the final pointer swap atomic; without this lock two
3864        // concurrent writers — in-band demotions for different nodes, or an
3865        // in-band demotion racing the periodic checker's full-map rebuild — can
3866        // each load the same snapshot and clobber the other's update (a lost
3867        // update that resurrects a demoted node, or evicts a recovered one,
3868        // until the next probe). The lock serializes writers only; every routing
3869        // read stays lock-free on the ArcSwap.
3870        let _writers = state.health_write.lock();
3871        let snapshot = state.health.load_full();
3872        // Only act (and pay the clone) when the node is currently marked
3873        // healthy — avoids churning the snapshot on an already-down node.
3874        if snapshot.get(addr).map(|h| h.healthy).unwrap_or(false) {
3875            let mut next = (*snapshot).clone();
3876            if let Some(nh) = next.get_mut(addr) {
3877                nh.healthy = false;
3878                nh.failure_count = nh.failure_count.saturating_add(1);
3879                nh.last_error = Some(format!("in-band failure: {}", err));
3880                tracing::warn!(
3881                    node = %addr,
3882                    error = %err,
3883                    "in-band failure — node marked unhealthy for fast failover"
3884                );
3885            }
3886            state.health.store(Arc::new(next));
3887        }
3888    }
3889
3890    /// Record a backend forward failure: demote the node's health in-band AND
3891    /// (when the feature is on) trip its circuit breaker — the single place the
3892    /// data path reports "this backend just failed". Both signals consult the
3893    /// same `is_backend_fault` classifier, so they can never drift apart: a
3894    /// client-side error or a slow-query read timeout penalizes neither.
3895    fn record_backend_failure(state: &Arc<ServerState>, node: &str, err: &str) {
3896        Self::note_backend_failure(state, node, err);
3897        #[cfg(feature = "circuit-breaker")]
3898        if Self::is_backend_fault(err) {
3899            Self::circuit_record(state, node, false, err);
3900        }
3901    }
3902
3903    /// True when `node`'s circuit is open (avoid it / fast-fail). A half-open
3904    /// circuit returns false so a probe query is admitted.
3905    #[cfg(feature = "circuit-breaker")]
3906    fn circuit_is_open(state: &Arc<ServerState>, node: &str) -> bool {
3907        state
3908            .circuit_breaker
3909            .as_ref()
3910            .map(|cb| {
3911                cb.get_breaker(node).get_state() == crate::circuit_breaker::CircuitState::Open
3912            })
3913            .unwrap_or(false)
3914    }
3915
3916    /// Record the outcome of a forward to `node` on its circuit breaker.
3917    #[cfg(feature = "circuit-breaker")]
3918    fn circuit_record(state: &Arc<ServerState>, node: &str, success: bool, err: &str) {
3919        if let Some(cb) = state.circuit_breaker.as_ref() {
3920            let breaker = cb.get_breaker(node);
3921            if success {
3922                breaker.record_success();
3923            } else {
3924                breaker.record_failure(err);
3925            }
3926        }
3927    }
3928
3929    /// If `node`'s circuit is open, build the fast-fail `ErrorResponse` (without
3930    /// a trailing `ReadyForQuery` — the caller appends one). `None` when the
3931    /// circuit is closed or half-open and the request may proceed.
3932    #[cfg(feature = "circuit-breaker")]
3933    fn circuit_fast_fail(state: &Arc<ServerState>, node: &str) -> Option<Vec<u8>> {
3934        if Self::circuit_is_open(state, node) {
3935            tracing::info!(node = %node, "circuit open — fast-failing");
3936            Some(Self::create_error_response(
3937                "08006",
3938                &format!("circuit open for node {node}: backend temporarily unavailable"),
3939            ))
3940        } else {
3941            None
3942        }
3943    }
3944
3945    /// Read-your-writes decision: should reads be pinned to the primary given
3946    /// the session's last write and the configured window? Pure for testing.
3947    #[cfg(feature = "lag-routing")]
3948    fn ryw_pins_primary(last_write: Option<std::time::Instant>, window_ms: u64) -> bool {
3949        window_ms > 0
3950            && last_write
3951                .map(|t| t.elapsed() < Duration::from_millis(window_ms))
3952                .unwrap_or(false)
3953    }
3954
3955    /// Lag-exclusion decision: should a standby be dropped from read routing
3956    /// given its measured lag and the configured ceiling? `max=0` disables
3957    /// exclusion; unknown lag (None) never excludes. Pure for testing.
3958    #[cfg(feature = "lag-routing")]
3959    fn lag_excludes_standby(lag_bytes: Option<u64>, max_lag_bytes: u64) -> bool {
3960        max_lag_bytes > 0 && lag_bytes.map(|l| l > max_lag_bytes).unwrap_or(false)
3961    }
3962
3963    /// Pure predicate: is `sql` a plain, deterministic SELECT safe to cache?
3964    /// (Not WITH/locking/volatile.) Transaction state is checked separately.
3965    #[cfg(feature = "query-cache")]
3966    fn is_cacheable_read_sql(sql: &str) -> bool {
3967        use crate::protocol::{contains_ci, starts_with_ci};
3968        let t = sql.trim_start();
3969        if !starts_with_ci(t, "SELECT") {
3970            return false;
3971        }
3972        if contains_ci(t, "FOR UPDATE") || contains_ci(t, "FOR SHARE") {
3973            return false;
3974        }
3975        // Non-deterministic reads must not be reused.
3976        const VOLATILE: [&str; 10] = [
3977            "now(",
3978            "current_timestamp",
3979            "current_date",
3980            "current_time",
3981            "clock_timestamp",
3982            "statement_timestamp",
3983            "random(",
3984            "nextval(",
3985            "uuid_generate",
3986            "gen_random_uuid",
3987        ];
3988        !VOLATILE.iter().any(|v| contains_ci(t, v))
3989    }
3990
3991    /// Decide whether a read query is safe to serve from / store in the cache,
3992    /// and build its `CacheContext`. Returns `None` for anything not a plain,
3993    /// deterministic, non-transactional SELECT.
3994    #[cfg(feature = "query-cache")]
3995    async fn cacheable_read_ctx(
3996        session: &Arc<ClientSession>,
3997        sql: &str,
3998    ) -> Option<crate::cache::CacheContext> {
3999        if !Self::is_cacheable_read_sql(sql) {
4000            return None;
4001        }
4002        // Never cache mid-transaction (visibility would be wrong).
4003        if session.tx_state.read().await.in_transaction {
4004            return None;
4005        }
4006        let (user, database) = {
4007            let vars = session.variables.read().await;
4008            (
4009                vars.get("user").cloned(),
4010                vars.get("database")
4011                    .cloned()
4012                    .unwrap_or_else(|| "default".to_string()),
4013            )
4014        };
4015        Some(crate::cache::CacheContext {
4016            database,
4017            user,
4018            branch: None,
4019            connection_id: Some(session.id.as_u64_pair().0),
4020        })
4021    }
4022
4023    /// Build a multi-tenancy `RequestContext` from the session's startup
4024    /// parameters (user, database, application_name, ...) so the configured
4025    /// identifier can resolve the tenant.
4026    #[cfg(feature = "multi-tenancy")]
4027    async fn tenant_request_ctx(
4028        session: &Arc<ClientSession>,
4029    ) -> crate::multi_tenancy::RequestContext {
4030        let vars = session.variables.read().await;
4031        crate::multi_tenancy::RequestContext {
4032            headers: vars.clone(),
4033            username: vars.get("user").cloned(),
4034            database: vars.get("database").cloned(),
4035            auth_token: None,
4036            sql_context: HashMap::new(),
4037            client_ip: Some(session.client_addr.ip().to_string()),
4038            connection_id: Some(session.id.as_u64_pair().0),
4039        }
4040    }
4041
4042    /// Journal a successful write statement (Transaction Replay). Each write is
4043    /// recorded as its own auto-commit transaction so the time-travel/failover
4044    /// replay engine can re-apply it onto a promoted primary or a staging
4045    /// target. Best-effort: journal errors never fail the client query.
4046    #[cfg(feature = "ha-tr")]
4047    async fn journal_write(state: &Arc<ServerState>, session: &Arc<ClientSession>, sql: &str) {
4048        let tx_id = uuid::Uuid::new_v4();
4049        let j = &state.transaction_journal;
4050        if j.begin_transaction(tx_id, session.id, crate::NodeId::new(), 0)
4051            .await
4052            .is_ok()
4053        {
4054            let _ = j
4055                .log_statement(tx_id, sql.to_string(), Vec::new(), None, None, 0)
4056                .await;
4057        }
4058    }
4059
4060    /// Record a forwarded query on the analytics engine (fingerprint, latency,
4061    /// slow-query log, pattern detection). No-op when analytics is disabled.
4062    #[cfg(feature = "query-analytics")]
4063    async fn record_analytics(
4064        state: &Arc<ServerState>,
4065        session: &Arc<ClientSession>,
4066        sql: &str,
4067        node: &str,
4068        duration: Duration,
4069        error: Option<String>,
4070    ) {
4071        let Some(analytics) = state.analytics.as_ref() else {
4072            return;
4073        };
4074        let (user, database) = {
4075            let vars = session.variables.read().await;
4076            (
4077                vars.get("user").cloned().unwrap_or_default(),
4078                vars.get("database").cloned().unwrap_or_default(),
4079            )
4080        };
4081        let mut exec = crate::analytics::QueryExecution::new(sql, duration);
4082        exec.user = user;
4083        exec.database = database;
4084        exec.client_ip = session.client_addr.ip().to_string();
4085        exec.node = node.to_string();
4086        exec.session_id = Some(session.id.to_string());
4087        exec.error = error;
4088        analytics.record(exec);
4089    }
4090
4091    /// Select primary node with write timeout during failover
4092    async fn select_primary_with_timeout(
4093        session: &Arc<ClientSession>,
4094        state: &Arc<ServerState>,
4095        config: &ProxyConfig,
4096    ) -> Result<String> {
4097        let timeout = config.write_timeout();
4098        let start = std::time::Instant::now();
4099        // Poll for the promoted primary fairly tightly so writes resume
4100        // quickly after a failover (was 500ms — a needless recovery floor).
4101        let check_interval = Duration::from_millis(100);
4102
4103        loop {
4104            // Try to find healthy primary
4105            let health = state.health.load_full();
4106            let primary = config
4107                .nodes
4108                .iter()
4109                .find(|n| n.role == NodeRole::Primary && n.enabled);
4110
4111            if let Some(primary_node) = primary {
4112                if let Some(node_health) = health.get(&primary_node.address()) {
4113                    if node_health.healthy {
4114                        // Update session's current node
4115                        let mut current = session.current_node.write().await;
4116                        *current = Some(primary_node.address());
4117                        return Ok(primary_node.address());
4118                    }
4119                }
4120            }
4121            drop(health);
4122
4123            // Check if timeout exceeded
4124            if start.elapsed() >= timeout {
4125                state.metrics.failovers.fetch_add(1, Ordering::Relaxed);
4126                return Err(ProxyError::NoHealthyNodes);
4127            }
4128
4129            tracing::warn!(
4130                "Primary unavailable, waiting for failover... ({:.1}s elapsed, {:.1}s timeout)",
4131                start.elapsed().as_secs_f64(),
4132                timeout.as_secs_f64()
4133            );
4134
4135            // Wait before retry
4136            tokio::time::sleep(check_interval).await;
4137        }
4138    }
4139
4140    /// Select node for read operations with load balancing
4141    async fn select_read_node(
4142        session: &Arc<ClientSession>,
4143        state: &Arc<ServerState>,
4144        config: &ProxyConfig,
4145    ) -> Result<String> {
4146        // If in transaction, stick to current node
4147        {
4148            let tx_state = session.tx_state.read().await;
4149            if tx_state.in_transaction {
4150                if let Some(node) = session.current_node.read().await.clone() {
4151                    return Ok(node);
4152                }
4153            }
4154        }
4155
4156        // Get healthy nodes (prefer standbys for reads)
4157        let health = state.health.load_full();
4158        let healthy_standbys: Vec<&NodeConfig> = config
4159            .nodes
4160            .iter()
4161            .filter(|n| {
4162                let base = n.enabled
4163                    && (n.role == NodeRole::Standby || n.role == NodeRole::ReadReplica)
4164                    && health.get(&n.address()).map(|h| h.healthy).unwrap_or(false);
4165                // Drop a standby whose circuit is open so reads avoid it.
4166                #[cfg(feature = "circuit-breaker")]
4167                let base = base && !Self::circuit_is_open(state, &n.address());
4168                // Drop a standby lagging beyond the configured byte threshold.
4169                #[cfg(feature = "lag-routing")]
4170                let base = base
4171                    && !Self::lag_excludes_standby(
4172                        health
4173                            .get(&n.address())
4174                            .and_then(|h| h.replication_lag_bytes),
4175                        config.lag_routing.max_lag_bytes,
4176                    );
4177                base
4178            })
4179            .collect();
4180
4181        if !healthy_standbys.is_empty() {
4182            // Round-robin across healthy standbys
4183            let ticket = state.lb_state.rr_counter.fetch_add(1, Ordering::Relaxed);
4184            let index = ticket as usize % healthy_standbys.len();
4185            let node_addr = healthy_standbys[index].address();
4186
4187            let mut current = session.current_node.write().await;
4188            *current = Some(node_addr.clone());
4189            return Ok(node_addr);
4190        }
4191
4192        // Fall back to primary if no healthy standbys
4193        Self::select_node(session, state, config).await
4194    }
4195
4196    /// Complete backend authentication by reading until ReadyForQuery
4197    /// This is used when switching backends - we don't forward auth to client
4198    async fn complete_backend_auth(backend: &mut TcpStream) -> Result<()> {
4199        let codec = ProtocolCodec::new();
4200        let mut buffer = BytesMut::with_capacity(4096);
4201        let mut read_buf = vec![0u8; 4096];
4202        let timeout = Duration::from_secs(10);
4203        let start = std::time::Instant::now();
4204
4205        loop {
4206            if start.elapsed() > timeout {
4207                return Err(ProxyError::Auth(
4208                    "Backend authentication timeout".to_string(),
4209                ));
4210            }
4211
4212            let n = tokio::time::timeout(Duration::from_secs(5), backend.read(&mut read_buf))
4213                .await
4214                .map_err(|_| ProxyError::Auth("Read timeout during backend auth".to_string()))?
4215                .map_err(|e| ProxyError::Network(format!("Backend auth read error: {}", e)))?;
4216
4217            if n == 0 {
4218                return Err(ProxyError::Connection(
4219                    "Backend closed during auth".to_string(),
4220                ));
4221            }
4222
4223            buffer.extend_from_slice(&read_buf[..n]);
4224
4225            // Decode (and consume) complete frames directly; returns
4226            // None when more data is needed.
4227            while let Some(msg) = codec.decode_message(&mut buffer)? {
4228                match msg.msg_type {
4229                    MessageType::ReadyForQuery => {
4230                        // Authentication complete
4231                        return Ok(());
4232                    }
4233                    MessageType::ErrorResponse => {
4234                        let err = ErrorResponse::parse(msg.payload)
4235                            .map(|e| e.message().unwrap_or("Unknown error").to_string())
4236                            .unwrap_or_else(|_| "Parse error".to_string());
4237                        return Err(ProxyError::Auth(err));
4238                    }
4239                    _ => {
4240                        // Continue reading (AuthRequest, ParameterStatus, BackendKeyData, etc.)
4241                    }
4242                }
4243            }
4244        }
4245    }
4246
4247    /// Create PostgreSQL error response message
4248    fn create_error_response(code: &str, message: &str) -> Vec<u8> {
4249        let mut fields = HashMap::new();
4250        fields.insert('S', "ERROR".to_string());
4251        fields.insert('V', "ERROR".to_string());
4252        fields.insert('C', code.to_string());
4253        fields.insert('M', message.to_string());
4254
4255        let err = ErrorResponse { fields };
4256        err.encode().encode().to_vec()
4257    }
4258
4259    /// Create a `ReadyForQuery` frame with the given transaction-status byte
4260    /// (`b'I'` = idle, `b'T'` = in transaction, `b'E'` = failed transaction).
4261    fn create_ready_for_query(status: u8) -> Vec<u8> {
4262        let mut payload = BytesMut::with_capacity(1);
4263        payload.put_u8(status);
4264        Message::new(MessageType::ReadyForQuery, payload)
4265            .encode()
4266            .to_vec()
4267    }
4268
4269    /// Synthesise a full PostgreSQL simple-query response from a cached
4270    /// payload produced by a plugin's `PreQueryResult::Cached`.
4271    ///
4272    /// # Payload format
4273    ///
4274    /// The plugin is expected to serialise a JSON document of the form:
4275    ///
4276    /// ```json
4277    /// {
4278    ///   "columns": [
4279    ///     {"name": "id",    "oid": 23},
4280    ///     {"name": "email", "oid": 25}
4281    ///   ],
4282    ///   "rows": [
4283    ///     ["1", "alice@example.com"],
4284    ///     ["2", null]
4285    ///   ]
4286    /// }
4287    /// ```
4288    ///
4289    /// `oid` is the PostgreSQL type OID (`23` = int4, `25` = text,
4290    /// `20` = int8, `16` = bool, `1184` = timestamptz, etc.). Row values
4291    /// are strings in text format; `null` encodes a SQL NULL. The type
4292    /// OID is advisory — pgwire clients accept `25` (text) universally
4293    /// and cast as needed.
4294    ///
4295    /// # Returned bytes
4296    ///
4297    /// One concatenated PostgreSQL wire response:
4298    ///
4299    /// ```text
4300    /// RowDescription (T) + DataRow (D) × N + CommandComplete (C: "SELECT N")
4301    ///                    + ReadyForQuery (Z: idle)
4302    /// ```
4303    ///
4304    /// Returns an error on malformed JSON; the caller falls back to
4305    /// backend forwarding.
4306    #[cfg(feature = "wasm-plugins")]
4307    fn synthesise_cached_response(bytes: &[u8]) -> Result<Vec<u8>> {
4308        use serde::Deserialize;
4309
4310        #[derive(Deserialize)]
4311        struct CachedPayload {
4312            columns: Vec<ColumnDef>,
4313            rows: Vec<Vec<Option<String>>>,
4314        }
4315
4316        #[derive(Deserialize)]
4317        struct ColumnDef {
4318            name: String,
4319            #[serde(default = "default_text_oid")]
4320            oid: u32,
4321        }
4322
4323        fn default_text_oid() -> u32 {
4324            25 // text
4325        }
4326
4327        let payload: CachedPayload = serde_json::from_slice(bytes)
4328            .map_err(|e| ProxyError::Protocol(format!("invalid cached payload JSON: {}", e)))?;
4329
4330        if payload.columns.is_empty() {
4331            return Err(ProxyError::Protocol(
4332                "cached payload must declare at least one column".to_string(),
4333            ));
4334        }
4335
4336        let mut reply = Vec::new();
4337
4338        // RowDescription (tag 'T')
4339        let mut rd = BytesMut::new();
4340        rd.put_u16(payload.columns.len() as u16);
4341        for col in &payload.columns {
4342            rd.extend_from_slice(col.name.as_bytes());
4343            rd.put_u8(0); // cstring terminator
4344            rd.put_i32(0); // tableOID (unknown)
4345            rd.put_i16(0); // columnNumber (unknown)
4346            rd.put_u32(col.oid);
4347            rd.put_i16(-1); // typeLen (unspecified)
4348            rd.put_i32(-1); // typeMod (unspecified)
4349            rd.put_i16(0); // format code: text
4350        }
4351        reply.extend_from_slice(&Message::new(MessageType::RowDescription, rd).encode());
4352
4353        // DataRow (tag 'D') per row
4354        let column_count = payload.columns.len();
4355        for row in &payload.rows {
4356            if row.len() != column_count {
4357                return Err(ProxyError::Protocol(format!(
4358                    "cached row has {} values but {} columns are declared",
4359                    row.len(),
4360                    column_count
4361                )));
4362            }
4363            let mut dr = BytesMut::new();
4364            dr.put_u16(row.len() as u16);
4365            for value in row {
4366                match value {
4367                    Some(s) => {
4368                        dr.put_i32(s.len() as i32);
4369                        dr.extend_from_slice(s.as_bytes());
4370                    }
4371                    None => {
4372                        dr.put_i32(-1); // NULL sentinel
4373                    }
4374                }
4375            }
4376            reply.extend_from_slice(&Message::new(MessageType::DataRow, dr).encode());
4377        }
4378
4379        // CommandComplete (tag 'C')
4380        let tag = format!("SELECT {}", payload.rows.len());
4381        let mut cc = BytesMut::new();
4382        cc.extend_from_slice(tag.as_bytes());
4383        cc.put_u8(0);
4384        reply.extend_from_slice(&Message::new(MessageType::CommandComplete, cc).encode());
4385
4386        // ReadyForQuery (tag 'Z', status 'I' idle)
4387        reply.extend_from_slice(&Self::create_ready_for_query(b'I'));
4388
4389        Ok(reply)
4390    }
4391
4392    /// Run the pre-query plugin hook on a client message.
4393    ///
4394    /// When the `wasm-plugins` feature is off, or the plugin manager has no
4395    /// loaded plugins, this is a zero-cost passthrough that returns the
4396    /// message untouched with `PreQueryAction::Forward`.
4397    ///
4398    /// Only simple-query (`MessageType::Query`) messages are inspected today.
4399    /// Extended-protocol messages (`Parse`/`Bind`/`Execute`) are passed
4400    /// through unchanged — a future task wires them in.
4401    fn apply_pre_query_hook(
4402        msg: Message,
4403        state: &Arc<ServerState>,
4404        session: &Arc<ClientSession>,
4405    ) -> (Message, PreQueryAction) {
4406        #[cfg(feature = "wasm-plugins")]
4407        {
4408            let pm = match state.plugin_manager.as_ref() {
4409                Some(pm) => pm,
4410                None => return (msg, PreQueryAction::Forward),
4411            };
4412
4413            if msg.msg_type != MessageType::Query {
4414                return (msg, PreQueryAction::Forward);
4415            }
4416
4417            // Zero plugins registered for this hook — skip the payload
4418            // clone, SQL parse, and context construction entirely.
4419            if !pm.has_hook(HookType::PreQuery) {
4420                return (msg, PreQueryAction::Forward);
4421            }
4422
4423            let query_msg = match QueryMessage::parse(msg.payload.clone()) {
4424                Ok(q) => q,
4425                Err(_) => return (msg, PreQueryAction::Forward),
4426            };
4427
4428            let ctx = Self::build_query_context(&query_msg.query, session);
4429
4430            match pm.execute_pre_query(&ctx) {
4431                PreQueryResult::Continue => (msg, PreQueryAction::Forward),
4432                PreQueryResult::Block(reason) => (msg, PreQueryAction::Block(reason)),
4433                PreQueryResult::Rewrite(new_sql) => {
4434                    let rewritten = QueryMessage { query: new_sql }.encode();
4435                    (rewritten, PreQueryAction::Forward)
4436                }
4437                PreQueryResult::Cached(bytes) => (msg, PreQueryAction::Cached(bytes)),
4438            }
4439        }
4440        #[cfg(not(feature = "wasm-plugins"))]
4441        {
4442            let _ = (state, session);
4443            (msg, PreQueryAction::Forward)
4444        }
4445    }
4446
4447    /// Feed the anomaly detector a per-query observation. Cheap —
4448    /// only the SQL-injection scan and the novel-fingerprint check
4449    /// are non-trivial, both well under a microsecond on
4450    /// representative queries. Returns nothing; detections land in
4451    /// the detector's ring buffer and are surfaced via /api/anomalies.
4452    #[cfg(feature = "anomaly-detection")]
4453    fn record_anomaly_observation(
4454        msg: &Message,
4455        state: &Arc<ServerState>,
4456        session: &Arc<ClientSession>,
4457    ) {
4458        if msg.msg_type != MessageType::Query {
4459            return;
4460        }
4461        // Borrow the SQL straight out of the payload — the message is
4462        // forwarded verbatim, so no deep copy of the frame is needed.
4463        if let Some(query) = crate::protocol::query_text(&msg.payload) {
4464            Self::record_anomaly_sql(query, state, session);
4465        }
4466    }
4467
4468    /// Feed one SQL statement to the anomaly detector. Shared by the
4469    /// simple-query path and the extended-protocol `Parse` path so
4470    /// prepared-statement traffic is observed too.
4471    #[cfg(feature = "anomaly-detection")]
4472    fn record_anomaly_sql(query: &str, state: &Arc<ServerState>, session: &Arc<ClientSession>) {
4473        // Tenant identifier is the most-specific known per-session
4474        // attribute the proxy can attribute traffic to. Multi-tenancy
4475        // sets `tenant_id` in `variables`; otherwise we fall back to
4476        // the client address. session.variables is a tokio RwLock but this
4477        // is a sync helper — try_read avoids an await; on contention we
4478        // fall back to the client IP, still a valid per-source identifier.
4479        let tenant = match session.variables.try_read() {
4480            Ok(vars) => vars
4481                .get("tenant_id")
4482                .or_else(|| vars.get("user"))
4483                .cloned()
4484                .unwrap_or_else(|| session.client_addr.ip().to_string()),
4485            Err(_) => session.client_addr.ip().to_string(),
4486        };
4487        let fingerprint = anomaly_fingerprint(query);
4488        let obs = crate::anomaly::QueryObservation {
4489            tenant,
4490            fingerprint,
4491            sql: query.to_string(),
4492            timestamp: std::time::Instant::now(),
4493        };
4494        for ev in state.anomaly_detector.record_query(&obs) {
4495            tracing::warn!(anomaly = ?ev, "anomaly detected");
4496        }
4497    }
4498
4499    /// Send the client a `Block`-outcome response: an error frame plus
4500    /// `ReadyForQuery` so the client's state machine returns to idle and
4501    /// the next query can be accepted.
4502    async fn send_block_response(
4503        stream: &mut ClientStream,
4504        reason: &str,
4505        state: &Arc<ServerState>,
4506    ) -> Result<()> {
4507        let err =
4508            Self::create_error_response("42000", &format!("Query blocked by plugin: {}", reason));
4509        stream
4510            .write_all(&err)
4511            .await
4512            .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
4513        let rfq = Self::create_ready_for_query(b'I');
4514        stream
4515            .write_all(&rfq)
4516            .await
4517            .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
4518        state
4519            .metrics
4520            .bytes_sent
4521            .fetch_add((err.len() + rfq.len()) as u64, Ordering::Relaxed);
4522        Ok(())
4523    }
4524
4525    /// Build a `QueryContext` for the plugin hook. Populated fields: `query`
4526    /// (verbatim), `is_read_only` (derived from SQL verb), and `hook_context`
4527    /// with the session id as `client_id`. `normalized` and `tables` are
4528    /// left as cheap stand-ins until the analytics normaliser is wired in
4529    /// (T0-d, unified context).
4530    #[cfg(feature = "wasm-plugins")]
4531    fn build_query_context(query: &str, session: &Arc<ClientSession>) -> QueryContext {
4532        let is_read_only = !Self::is_write_query(query);
4533        let hook_context = HookContext {
4534            client_id: Some(session.id.to_string()),
4535            ..HookContext::default()
4536        };
4537        QueryContext {
4538            query: query.to_string(),
4539            normalized: query.to_string(),
4540            tables: Vec::new(),
4541            is_read_only,
4542            hook_context,
4543        }
4544    }
4545
4546    /// Run the Authenticate plugin hook at startup. Called from
4547    /// `connect_and_authenticate` before any backend connection.
4548    ///
4549    /// Behaviour by `AuthResult`:
4550    /// * `Defer` — no plugin opinion; proceed with the default
4551    ///   PostgreSQL auth flow unchanged.
4552    /// * `Success(identity)` — store the identity on the session so
4553    ///   downstream plugins (masking, residency) can gate on roles /
4554    ///   tenant_id / claims. PostgreSQL backend auth still runs
4555    ///   normally afterwards (the plugin does not replace PG auth in
4556    ///   this iteration; that's a follow-up).
4557    /// * `Denied(reason)` — surfaces as `ProxyError::Auth`, which the
4558    ///   caller already handles by writing an ErrorResponse to the
4559    ///   client and closing the connection.
4560    ///
4561    /// The `AuthRequest` populated here carries username, database,
4562    /// and client IP from the PostgreSQL startup parameters. Password
4563    /// is deliberately `None` — PG protocol sends the password in
4564    /// response to the backend's challenge, not at startup, so
4565    /// password-aware plugin auth is a separate future task.
4566    async fn apply_authenticate_hook(
4567        _params: &HashMap<String, String>,
4568        _session: &Arc<ClientSession>,
4569        _state: &Arc<ServerState>,
4570    ) -> Result<()> {
4571        #[cfg(feature = "wasm-plugins")]
4572        {
4573            let pm = match _state.plugin_manager.as_ref() {
4574                Some(pm) => pm,
4575                None => return Ok(()),
4576            };
4577
4578            let request = PluginAuthRequest {
4579                headers: HashMap::new(),
4580                username: _params.get("user").cloned(),
4581                password: None,
4582                client_ip: _session.client_addr.ip().to_string(),
4583                database: _params.get("database").cloned(),
4584            };
4585
4586            match pm.execute_authenticate(&request) {
4587                AuthResult::Defer => Ok(()),
4588                AuthResult::Success(identity) => {
4589                    tracing::debug!(
4590                        user = %identity.username,
4591                        roles = ?identity.roles,
4592                        "plugin authenticated user"
4593                    );
4594                    *_session.plugin_identity.write().await = Some(identity);
4595                    Ok(())
4596                }
4597                AuthResult::Denied(reason) => {
4598                    tracing::info!(
4599                        reason = %reason,
4600                        client = %_session.client_addr,
4601                        user = ?_params.get("user"),
4602                        "plugin denied authentication"
4603                    );
4604                    Err(ProxyError::Auth(format!(
4605                        "authentication denied by plugin: {}",
4606                        reason
4607                    )))
4608                }
4609            }
4610        }
4611        #[cfg(not(feature = "wasm-plugins"))]
4612        {
4613            Ok(())
4614        }
4615    }
4616
4617    /// Run the Route plugin hook on a message. Only simple-query messages
4618    /// are inspected; other message types always return `None`.
4619    fn apply_route_hook(
4620        msg: &Message,
4621        state: &Arc<ServerState>,
4622        session: &Arc<ClientSession>,
4623    ) -> RouteOverride {
4624        #[cfg(feature = "wasm-plugins")]
4625        {
4626            let pm = match state.plugin_manager.as_ref() {
4627                Some(pm) => pm,
4628                None => return RouteOverride::None,
4629            };
4630            if msg.msg_type != MessageType::Query {
4631                return RouteOverride::None;
4632            }
4633            // Zero plugins registered for this hook — skip the payload
4634            // clone, SQL parse, and context construction entirely.
4635            if !pm.has_hook(HookType::Route) {
4636                return RouteOverride::None;
4637            }
4638            let query_msg = match QueryMessage::parse(msg.payload.clone()) {
4639                Ok(q) => q,
4640                Err(_) => return RouteOverride::None,
4641            };
4642            let ctx = Self::build_query_context(&query_msg.query, session);
4643            match pm.execute_route(&ctx) {
4644                RouteResult::Default => RouteOverride::None,
4645                RouteResult::Primary => RouteOverride::Primary,
4646                RouteResult::Standby => RouteOverride::Standby,
4647                RouteResult::Node(name) => RouteOverride::Node(name),
4648                RouteResult::Block(reason) => RouteOverride::Block(reason),
4649                RouteResult::Branch(name) => {
4650                    tracing::warn!(
4651                        branch = %name,
4652                        "Route hook returned Branch but branch routing is not yet wired — using default"
4653                    );
4654                    RouteOverride::None
4655                }
4656            }
4657        }
4658        #[cfg(not(feature = "wasm-plugins"))]
4659        {
4660            let _ = (msg, state, session);
4661            RouteOverride::None
4662        }
4663    }
4664
4665    /// Map parsed SQL-comment hints to a `RouteOverride`. Precedence:
4666    /// `node=` > `route=` > `consistency=strong`. Read-tier route targets
4667    /// (standby/sync/semisync/async/local) all map to the read path; `any`
4668    /// and `vector` impose no constraint. `lag=` / `consistency=bounded`
4669    /// freshness enforcement arrives with the lag-routing feature.
4670    #[cfg(feature = "routing-hints")]
4671    fn hint_to_override(hints: &crate::routing::ParsedHints) -> RouteOverride {
4672        use crate::routing::{ConsistencyLevel, RouteTarget};
4673        if let Some(node) = &hints.node {
4674            return RouteOverride::Node(node.clone());
4675        }
4676        if let Some(route) = hints.route {
4677            return match route {
4678                RouteTarget::Primary => RouteOverride::Primary,
4679                RouteTarget::Standby
4680                | RouteTarget::Sync
4681                | RouteTarget::SemiSync
4682                | RouteTarget::Async
4683                | RouteTarget::Local => RouteOverride::Standby,
4684                RouteTarget::Any | RouteTarget::Vector => RouteOverride::None,
4685            };
4686        }
4687        if hints.consistency == Some(ConsistencyLevel::Strong) {
4688            return RouteOverride::Primary;
4689        }
4690        RouteOverride::None
4691    }
4692
4693    /// Resolve the effective routing for a simple `Query` when the
4694    /// routing-hints feature is active. Returns `(override, is_write,
4695    /// forward_msg)`: the write flag is recomputed on the hint-stripped SQL so
4696    /// a leading hint comment never masks the verb, and `forward_msg` is a
4697    /// rebuilt `Query` (hint removed) when stripping is on. An explicit
4698    /// positional hint wins over a plugin route override; a plugin `Block` is
4699    /// handled by the caller before this runs.
4700    #[cfg(feature = "routing-hints")]
4701    fn resolve_simple_route(
4702        msg: &Message,
4703        plugin_override: RouteOverride,
4704        default_is_write: bool,
4705        state: &Arc<ServerState>,
4706    ) -> (RouteOverride, bool, Option<Message>) {
4707        let parser = match state.hint_parser.as_ref() {
4708            Some(p) => p,
4709            None => return (plugin_override, default_is_write, None),
4710        };
4711        let sql = match crate::protocol::query_text(&msg.payload) {
4712            Some(s) => s,
4713            None => return (plugin_override, default_is_write, None),
4714        };
4715        let hints = parser.parse(sql);
4716        if hints.is_empty() {
4717            return (plugin_override, default_is_write, None);
4718        }
4719        let stripped = parser.strip(sql);
4720        let is_write = Self::is_write_query(&stripped);
4721        let effective = match Self::hint_to_override(&hints) {
4722            RouteOverride::None => plugin_override,
4723            hint_override => hint_override,
4724        };
4725        let forward = if parser.strip_hints {
4726            Some(crate::protocol::QueryMessage { query: stripped }.encode())
4727        } else {
4728            None
4729        };
4730        (effective, is_write, forward)
4731    }
4732
4733    /// Resolve hint-driven routing for an extended-protocol batch from the
4734    /// first Parse's SQL. `Some((is_write, forced_node))` when hints are
4735    /// present (write flag computed on the stripped SQL), else `None` so the
4736    /// caller uses verb-based defaults. The hint comment is left in the
4737    /// forwarded `Parse` (a no-op SQL comment); rewriting the batch buffer is
4738    /// unnecessary for correctness.
4739    #[cfg(feature = "routing-hints")]
4740    fn extended_hint_route(state: &Arc<ServerState>, sql: &str) -> Option<(bool, Option<String>)> {
4741        let parser = state.hint_parser.as_ref()?;
4742        let hints = parser.parse(sql);
4743        if hints.is_empty() {
4744            return None;
4745        }
4746        let stripped = parser.strip(sql);
4747        let is_write = Self::is_write_query(&stripped);
4748        match Self::hint_to_override(&hints) {
4749            RouteOverride::Primary => Some((true, None)),
4750            RouteOverride::Standby => Some((false, None)),
4751            RouteOverride::Node(n) => Some((is_write, Some(n))),
4752            _ => Some((is_write, None)),
4753        }
4754    }
4755
4756    /// Fire post-query hooks after a message has been forwarded (or failed
4757    /// to forward). Best-effort; errors from individual plugins are logged
4758    /// by the plugin manager and never surface here.
4759    #[cfg(feature = "wasm-plugins")]
4760    fn fire_post_query_hook(
4761        msg: &Message,
4762        session: &Arc<ClientSession>,
4763        state: &Arc<ServerState>,
4764        result: &Result<(Option<String>, u64)>,
4765        elapsed: Duration,
4766    ) {
4767        let pm = match state.plugin_manager.as_ref() {
4768            Some(pm) => pm,
4769            None => return,
4770        };
4771        if msg.msg_type != MessageType::Query {
4772            return;
4773        }
4774        // Zero plugins registered for this hook — skip the payload
4775        // clone, SQL parse, and context construction entirely.
4776        if !pm.has_hook(HookType::PostQuery) {
4777            return;
4778        }
4779        let query_msg = match QueryMessage::parse(msg.payload.clone()) {
4780            Ok(q) => q,
4781            Err(_) => return,
4782        };
4783        let ctx = Self::build_query_context(&query_msg.query, session);
4784        let outcome = match result {
4785            Ok((node, bytes)) => PostQueryOutcome {
4786                success: true,
4787                target_node: node.clone(),
4788                elapsed_us: elapsed.as_micros() as u64,
4789                response_bytes: *bytes,
4790                error: None,
4791            },
4792            Err(e) => PostQueryOutcome {
4793                success: false,
4794                target_node: None,
4795                elapsed_us: elapsed.as_micros() as u64,
4796                response_bytes: 0,
4797                error: Some(e.to_string()),
4798            },
4799        };
4800        pm.execute_post_query(&ctx, &outcome);
4801    }
4802
4803    /// Select a backend node for the request
4804    /// Select a backend node for initial connection
4805    /// Prefers primary but falls back to standbys for read connections
4806    async fn select_node(
4807        session: &Arc<ClientSession>,
4808        state: &Arc<ServerState>,
4809        config: &ProxyConfig,
4810    ) -> Result<String> {
4811        // If in a transaction, stick to the current node
4812        {
4813            let tx_state = session.tx_state.read().await;
4814            if tx_state.in_transaction {
4815                if let Some(node) = session.current_node.read().await.clone() {
4816                    return Ok(node);
4817                }
4818            }
4819        }
4820
4821        // Get healthy nodes
4822        let health = state.health.load_full();
4823        let healthy_nodes: Vec<&NodeConfig> = config
4824            .nodes
4825            .iter()
4826            .filter(|n| n.enabled && health.get(&n.address()).map(|h| h.healthy).unwrap_or(false))
4827            .collect();
4828
4829        if healthy_nodes.is_empty() {
4830            return Err(ProxyError::NoHealthyNodes);
4831        }
4832
4833        // Try to find healthy primary first
4834        if let Some(primary) = healthy_nodes.iter().find(|n| n.role == NodeRole::Primary) {
4835            let node_addr = primary.address();
4836            let mut current = session.current_node.write().await;
4837            *current = Some(node_addr.clone());
4838            return Ok(node_addr);
4839        }
4840
4841        // Fall back to standby if primary is unavailable
4842        // (Initial connection will work, writes will use write timeout to wait for primary)
4843        if let Some(standby) = healthy_nodes.iter().find(|n| n.role == NodeRole::Standby) {
4844            tracing::warn!("Primary unavailable, connecting to standby for initial session");
4845            let node_addr = standby.address();
4846            let mut current = session.current_node.write().await;
4847            *current = Some(node_addr.clone());
4848            return Ok(node_addr);
4849        }
4850
4851        // No nodes available
4852        Err(ProxyError::NoHealthyNodes)
4853    }
4854
4855    /// Spawn health checker background task
4856    fn spawn_health_checker(&self) -> tokio::task::JoinHandle<()> {
4857        let state = self.state.clone();
4858        let mut shutdown_rx = self.shutdown_tx.subscribe();
4859
4860        tokio::spawn(async move {
4861            let mut interval = tokio::time::interval(std::time::Duration::from_secs(
4862                state.live_config.load().health.check_interval_secs,
4863            ));
4864
4865            loop {
4866                tokio::select! {
4867                    _ = interval.tick() => {
4868                        // Read the live config each tick so a SIGHUP that
4869                        // adds/removes nodes is checked on the next sweep.
4870                        let config = state.live_config.load_full();
4871                        Self::check_all_nodes(&state, &config).await;
4872                    }
4873                    _ = shutdown_rx.recv() => {
4874                        break;
4875                    }
4876                }
4877            }
4878        })
4879    }
4880
4881    /// Check health of all nodes.
4882    ///
4883    /// Probes run concurrently (one slow/unreachable node no longer delays
4884    /// detection on the others — lowers the failover-detection latency
4885    /// floor), then a single new health snapshot is published via ArcSwap so
4886    /// readers on the query path never block.
4887    async fn check_all_nodes(state: &Arc<ServerState>, config: &ProxyConfig) {
4888        // Probe every node in parallel (owned address + timeout so each
4889        // probe is 'static and runs on its own task).
4890        let timeout = Duration::from_secs(config.health.check_timeout_secs);
4891        let mut set = tokio::task::JoinSet::new();
4892        for node in &config.nodes {
4893            let addr = node.address();
4894            set.spawn(async move {
4895                let r = Self::check_node_addr(&addr, timeout).await;
4896                (addr, r)
4897            });
4898        }
4899        let mut results = Vec::with_capacity(config.nodes.len());
4900        while let Some(joined) = set.join_next().await {
4901            if let Ok(pair) = joined {
4902                results.push(pair);
4903            }
4904        }
4905
4906        // Clone-and-modify the current snapshot, then atomically swap it in.
4907        // Hold the write lock so a concurrent in-band demotion landing in this
4908        // load→store window (or a SIGHUP reconcile) cannot clobber, or be
4909        // clobbered by, this full-map rebuild. All node probing above already
4910        // completed; no await is held under the guard.
4911        let _writers = state.health_write.lock();
4912        let mut next = (*state.health.load_full()).clone();
4913        for (addr, result) in results {
4914            if let Some(node_health) = next.get_mut(&addr) {
4915                match result {
4916                    Ok(latency) => {
4917                        node_health.healthy = true;
4918                        node_health.failure_count = 0;
4919                        node_health.latency_ms = latency;
4920                        node_health.last_error = None;
4921                    }
4922                    Err(e) => {
4923                        node_health.failure_count += 1;
4924                        node_health.last_error = Some(e.to_string());
4925                        if node_health.failure_count >= config.health.failure_threshold {
4926                            node_health.healthy = false;
4927                            tracing::warn!(
4928                                "Node {} marked unhealthy after {} failures",
4929                                addr,
4930                                node_health.failure_count
4931                            );
4932                        }
4933                    }
4934                }
4935                node_health.last_check = chrono::Utc::now();
4936            }
4937        }
4938        state.health.store(Arc::new(next));
4939    }
4940
4941    /// Check health of a single node with a protocol-level liveness probe.
4942    ///
4943    /// A bare TCP connect is not enough: a wedged backend (postmaster stuck,
4944    /// out of backend slots, mid-crash-recovery) still *accepts* the socket but
4945    /// never processes the wire protocol, so a connect-only probe reports it
4946    /// healthy. Instead we connect, send a PostgreSQL `SSLRequest`, and require
4947    /// the postmaster to answer (`S`/`N`) within the timeout. The SSLRequest is
4948    /// auth-free and not logged, so it costs the backend essentially nothing,
4949    /// yet it proves the server is actually servicing the protocol. Returns the
4950    /// round-trip latency in milliseconds.
4951    async fn check_node_addr(addr: &str, timeout: Duration) -> Result<f64> {
4952        // length(8) + SSLRequest code 80877103 (0x04D2162F).
4953        const SSL_REQUEST: [u8; 8] = [0, 0, 0, 8, 0x04, 0xD2, 0x16, 0x2F];
4954        let start = std::time::Instant::now();
4955        let mut stream = tokio::time::timeout(timeout, TcpStream::connect(addr))
4956            .await
4957            .map_err(|_| ProxyError::HealthCheck(format!("Timeout connecting to {}", addr)))?
4958            .map_err(|e| {
4959                ProxyError::HealthCheck(format!("Failed to connect to {}: {}", addr, e))
4960            })?;
4961
4962        let probe = async {
4963            stream.write_all(&SSL_REQUEST).await?;
4964            let mut resp = [0u8; 1];
4965            stream.read_exact(&mut resp).await?;
4966            Ok::<u8, std::io::Error>(resp[0])
4967        };
4968        // Budget whatever time is left after the connect for the handshake.
4969        let remaining = timeout
4970            .saturating_sub(start.elapsed())
4971            .max(Duration::from_millis(1));
4972        let byte = tokio::time::timeout(remaining, probe)
4973            .await
4974            .map_err(|_| {
4975                ProxyError::HealthCheck(format!("{} did not answer protocol probe in time", addr))
4976            })?
4977            .map_err(|e| {
4978                ProxyError::HealthCheck(format!("{} protocol probe error: {}", addr, e))
4979            })?;
4980        // 'S' (TLS available) or 'N' (not) both prove the postmaster is live and
4981        // talking the protocol; anything else means a non-PostgreSQL listener.
4982        if byte != b'S' && byte != b'N' {
4983            return Err(ProxyError::HealthCheck(format!(
4984                "{} sent unexpected probe reply {:#x}",
4985                addr, byte
4986            )));
4987        }
4988        let latency = start.elapsed().as_secs_f64() * 1000.0;
4989        Ok(latency)
4990    }
4991
4992    /// Spawn pool manager background task
4993    fn spawn_pool_manager(&self) -> tokio::task::JoinHandle<()> {
4994        // Only referenced by the pool-modes eviction/cleanup arms below.
4995        #[cfg(feature = "pool-modes")]
4996        let state = self.state.clone();
4997        let mut shutdown_rx = self.shutdown_tx.subscribe();
4998
4999        tokio::spawn(async move {
5000            let mut interval = tokio::time::interval(Self::POOL_REAP_INTERVAL);
5001
5002            loop {
5003                tokio::select! {
5004                    _ = interval.tick() => {
5005                        // Evict idle connections from pool-modes manager
5006                        #[cfg(feature = "pool-modes")]
5007                        if let Some(ref pool_manager) = state.pool_manager {
5008                            pool_manager.evict_idle().await;
5009                            tracing::trace!("Pool-modes idle eviction completed");
5010                        }
5011                        // Reap data-path idle backend connections older than the
5012                        // configured idle timeout, so a connection the backend
5013                        // would close on its own idle timeout is never handed out
5014                        // stale and idle FDs are returned to the OS.
5015                        #[cfg(feature = "pool-modes")]
5016                        if let Some(ref backend_pool) = state.backend_pool {
5017                            let ttl = std::time::Duration::from_secs(
5018                                state.live_config.load().pool_mode.idle_timeout_secs,
5019                            );
5020                            // idle_timeout_secs = 0 means "no idle TTL" (the
5021                            // PgBouncer convention). Skip reaping entirely rather
5022                            // than reaping every parked connection each cycle
5023                            // (elapsed() < ZERO is always false → retain drops
5024                            // all), which would defeat connection reuse.
5025                            let n = if ttl.is_zero() {
5026                                0
5027                            } else {
5028                                backend_pool.reap_idle(ttl)
5029                            };
5030                            if n > 0 {
5031                                tracing::debug!(
5032                                    target: "helios::pool",
5033                                    reaped = n,
5034                                    idle_remaining = backend_pool.idle_count(),
5035                                    "reaped idle backend connections (TTL)"
5036                                );
5037                            }
5038                        }
5039                    }
5040                    _ = shutdown_rx.recv() => {
5041                        // Cleanup on shutdown
5042                        #[cfg(feature = "pool-modes")]
5043                        if let Some(ref pool_manager) = state.pool_manager {
5044                            pool_manager.close_all().await;
5045                            tracing::info!("Pool-modes manager closed all connections");
5046                        }
5047                        break;
5048                    }
5049                }
5050            }
5051        })
5052    }
5053
5054    /// Shutdown the server
5055    pub fn shutdown(&self) {
5056        let _ = self.shutdown_tx.send(());
5057    }
5058
5059    /// Get pool mode statistics (if pool-modes feature enabled)
5060    #[cfg(feature = "pool-modes")]
5061    pub async fn pool_mode_stats(&self) -> Option<PoolModeStatsSnapshot> {
5062        if let Some(ref pool_manager) = self.state.pool_manager {
5063            let stats = pool_manager.get_stats().await;
5064            let metrics = pool_manager.metrics().snapshot();
5065            let default_mode = pool_manager.default_mode();
5066
5067            // Calculate average lease duration across all modes
5068            let avg_lease_duration_ms = metrics
5069                .mode_stats
5070                .get(&default_mode)
5071                .map(|s| s.avg_lease_duration_ms as u64)
5072                .unwrap_or(0);
5073
5074            Some(PoolModeStatsSnapshot {
5075                mode: format!("{:?}", default_mode),
5076                total_connections: stats.total_connections,
5077                active_leases: stats.active_connections,
5078                idle_connections: stats.idle_connections,
5079                node_count: stats.node_count,
5080                acquires: metrics.acquires,
5081                releases: metrics.releases,
5082                acquire_failures: metrics.acquire_failures,
5083                acquire_timeouts: metrics.acquire_timeouts,
5084                transactions_completed: metrics.transactions_completed,
5085                statements_executed: metrics.statements_executed,
5086                avg_lease_duration_ms,
5087            })
5088        } else {
5089            None
5090        }
5091    }
5092
5093    /// Add a node to the pool manager (if pool-modes feature enabled)
5094    #[cfg(feature = "pool-modes")]
5095    pub async fn add_node_to_pool(&self, node: &NodeConfig) {
5096        if let Some(ref pool_manager) = self.state.pool_manager {
5097            let endpoint = NodeEndpoint::new(&node.host, node.port)
5098                .with_role(match node.role {
5099                    NodeRole::Primary => crate::NodeRole::Primary,
5100                    NodeRole::Standby => crate::NodeRole::Standby,
5101                    NodeRole::ReadReplica => crate::NodeRole::ReadReplica,
5102                })
5103                .with_weight(node.weight);
5104            pool_manager.add_node(&endpoint).await;
5105            tracing::info!("Added node {} to pool manager", node.address());
5106        }
5107    }
5108
5109    /// Get server metrics
5110    pub fn metrics(&self) -> ServerMetricsSnapshot {
5111        ServerMetricsSnapshot {
5112            connections_accepted: self
5113                .state
5114                .metrics
5115                .connections_accepted
5116                .load(Ordering::Relaxed),
5117            connections_closed: self
5118                .state
5119                .metrics
5120                .connections_closed
5121                .load(Ordering::Relaxed),
5122            queries_processed: self.state.metrics.queries_processed.load(Ordering::Relaxed),
5123            bytes_received: self.state.metrics.bytes_received.load(Ordering::Relaxed),
5124            bytes_sent: self.state.metrics.bytes_sent.load(Ordering::Relaxed),
5125            failovers: self.state.metrics.failovers.load(Ordering::Relaxed),
5126        }
5127    }
5128}
5129
/// Point-in-time copy of the proxy server's counters, for external
/// consumption (produced by `ProxyServer::metrics`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerMetricsSnapshot {
    /// Total connections accepted.
    pub connections_accepted: u64,
    /// Total connections closed.
    pub connections_closed: u64,
    /// Total queries processed.
    pub queries_processed: u64,
    /// Total bytes received.
    pub bytes_received: u64,
    /// Total bytes sent.
    pub bytes_sent: u64,
    /// Number of failovers counted.
    pub failovers: u64,
}
5140
/// Pool mode statistics snapshot (when pool-modes feature is enabled).
#[cfg(feature = "pool-modes")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PoolModeStatsSnapshot {
    /// The pool manager's default pooling mode, rendered with `Debug`
    /// formatting.
    pub mode: String,
    /// Total connections across all pools
    pub total_connections: usize,
    /// Active (leased) connections
    pub active_leases: usize,
    /// Idle connections
    pub idle_connections: usize,
    /// Number of nodes in the pool
    pub node_count: usize,
    /// Total connection acquires
    pub acquires: u64,
    /// Total connection releases
    pub releases: u64,
    /// Failed acquire attempts
    pub acquire_failures: u64,
    /// Acquire timeouts
    pub acquire_timeouts: u64,
    /// Completed transactions (Transaction mode)
    pub transactions_completed: u64,
    /// Total statements executed
    pub statements_executed: u64,
    /// Average lease duration in whole milliseconds for the *default* pooling
    /// mode only (not aggregated across modes); 0 when that mode has no
    /// recorded stats.
    pub avg_lease_duration_ms: u64,
}
5170
5171#[cfg(test)]
5172mod tests {
5173    use super::*;
5174    use crate::config::{HealthConfig, LoadBalancerConfig, PoolConfig};
5175    #[cfg(not(feature = "wasm-plugins"))]
5176    use crate::protocol::QueryMessage;
5177
5178    fn test_config() -> ProxyConfig {
5179        let mut config = ProxyConfig::default();
5180        config.listen_address = "127.0.0.1:0".to_string();
5181        config.add_node("127.0.0.1:5432", "primary").unwrap();
5182        config
5183    }
5184
5185    #[test]
5186    fn test_server_creation() {
5187        let config = test_config();
5188        let server = ProxyServer::new(config);
5189        assert!(server.is_ok());
5190    }
5191
5192    #[test]
5193    fn is_backend_fault_excludes_client_and_slow_query_errors() {
5194        // Real backend faults — these must demote the node in-band.
5195        assert!(ProxyServer::is_backend_fault(
5196            "Backend read error: connection reset"
5197        ));
5198        assert!(ProxyServer::is_backend_fault(
5199            "Backend write error: broken pipe"
5200        ));
5201        assert!(ProxyServer::is_backend_fault("Backend write timeout"));
5202        assert!(ProxyServer::is_backend_fault(
5203            "Failed to connect to 127.0.0.1:5432: Connection refused"
5204        ));
5205        // Not backend faults — a client-side problem, or a merely slow but
5206        // healthy query, must NEVER take a backend out of rotation cluster-wide.
5207        assert!(!ProxyServer::is_backend_fault("Backend read timeout"));
5208        assert!(!ProxyServer::is_backend_fault("Client write timeout"));
5209        assert!(!ProxyServer::is_backend_fault(
5210            "Client write error: broken pipe"
5211        ));
5212        // A backend READ timeout is exempt, but a backend read ERROR is a fault.
5213        assert!(!ProxyServer::is_backend_fault("Backend read timeout"));
5214        assert!(ProxyServer::is_backend_fault(
5215            "Backend read error: timed out"
5216        ));
5217    }
5218
5219    #[test]
5220    fn test_hba_addr_matches() {
5221        use std::net::IpAddr;
5222        let v4 = |s: &str| s.parse::<IpAddr>().unwrap();
5223        // "all" matches everything
5224        assert!(ProxyServer::hba_addr_matches("all", v4("203.0.113.7")));
5225        // CIDR membership
5226        assert!(ProxyServer::hba_addr_matches("10.0.0.0/8", v4("10.1.2.3")));
5227        assert!(!ProxyServer::hba_addr_matches("10.0.0.0/8", v4("11.1.2.3")));
5228        assert!(ProxyServer::hba_addr_matches(
5229            "127.0.0.1/32",
5230            v4("127.0.0.1")
5231        ));
5232        assert!(!ProxyServer::hba_addr_matches(
5233            "127.0.0.1/32",
5234            v4("127.0.0.2")
5235        ));
5236        // bare IP exact match
5237        assert!(ProxyServer::hba_addr_matches(
5238            "192.168.1.1",
5239            v4("192.168.1.1")
5240        ));
5241        assert!(!ProxyServer::hba_addr_matches(
5242            "192.168.1.1",
5243            v4("192.168.1.2")
5244        ));
5245        // IPv6 CIDR + /0 catch-all
5246        assert!(ProxyServer::hba_addr_matches("::1/128", v4("::1")));
5247        assert!(ProxyServer::hba_addr_matches("0.0.0.0/0", v4("8.8.8.8")));
5248    }
5249
5250    #[test]
5251    fn test_hba_admits() {
5252        use crate::config::{HbaAction, HbaRule};
5253        use std::net::IpAddr;
5254        let ip: IpAddr = "10.0.0.5".parse().unwrap();
5255        // No rules -> admit all
5256        assert!(ProxyServer::hba_admits(&[], ip, "bench", "benchdb"));
5257        // Reject a specific user, allow others (default admit)
5258        let rules = vec![HbaRule {
5259            action: HbaAction::Reject,
5260            user: "bench".into(),
5261            database: "all".into(),
5262            address: "all".into(),
5263        }];
5264        assert!(!ProxyServer::hba_admits(&rules, ip, "bench", "benchdb"));
5265        assert!(ProxyServer::hba_admits(&rules, ip, "alice", "benchdb"));
5266        // First match wins: allow bench from 10/8, reject everything else
5267        let rules = vec![
5268            HbaRule {
5269                action: HbaAction::Allow,
5270                user: "bench".into(),
5271                database: "all".into(),
5272                address: "10.0.0.0/8".into(),
5273            },
5274            HbaRule {
5275                action: HbaAction::Reject,
5276                user: "all".into(),
5277                database: "all".into(),
5278                address: "all".into(),
5279            },
5280        ];
5281        assert!(ProxyServer::hba_admits(&rules, ip, "bench", "benchdb"));
5282        assert!(!ProxyServer::hba_admits(
5283            &rules,
5284            "192.168.0.1".parse().unwrap(),
5285            "bench",
5286            "benchdb"
5287        ));
5288        assert!(!ProxyServer::hba_admits(&rules, ip, "alice", "benchdb"));
5289    }
5290
5291    #[test]
5292    fn test_initial_metrics() {
5293        let config = test_config();
5294        let server = ProxyServer::new(config).unwrap();
5295        let metrics = server.metrics();
5296        assert_eq!(metrics.connections_accepted, 0);
5297        assert_eq!(metrics.queries_processed, 0);
5298    }
5299
5300    #[tokio::test]
5301    async fn test_session_creation() {
5302        let config = test_config();
5303        let server = ProxyServer::new(config).unwrap();
5304
5305        let sessions = server.state.sessions.read().await;
5306        assert!(sessions.is_empty());
5307    }
5308
5309    #[tokio::test]
5310    async fn test_node_health_initialization() {
5311        let config = test_config();
5312        let server = ProxyServer::new(config).unwrap();
5313
5314        let health = server.state.health.load_full();
5315        assert!(!health.is_empty());
5316
5317        for node_health in health.values() {
5318            assert!(node_health.healthy);
5319            assert_eq!(node_health.failure_count, 0);
5320        }
5321    }
5322
5323    /// Build a minimal `ClientSession` for plugin-hook unit tests.
5324    fn make_test_session() -> Arc<ClientSession> {
5325        Arc::new(ClientSession {
5326            id: Uuid::new_v4(),
5327            client_addr: "127.0.0.1:0".parse().unwrap(),
5328            current_node: RwLock::new(None),
5329            tx_state: RwLock::new(TransactionState::default()),
5330            variables: RwLock::new(HashMap::new()),
5331            created_at: chrono::Utc::now(),
5332            tr_mode: crate::config::TrMode::default(),
5333            #[cfg(feature = "lag-routing")]
5334            last_write_at: RwLock::new(None),
5335            #[cfg(feature = "pool-modes")]
5336            pool_client_id: crate::pool::lease::ClientId::default(),
5337            #[cfg(feature = "wasm-plugins")]
5338            plugin_identity: RwLock::new(None),
5339        })
5340    }
5341
5342    /// With no plugin manager attached, `apply_route_hook` must be a
5343    /// zero-cost `None` return so the default SQL-verb routing applies.
5344    /// Verifies the feature-gated early-return path.
5345    #[tokio::test]
5346    async fn test_apply_route_hook_no_plugin_manager_returns_none() {
5347        let config = test_config();
5348        let server = ProxyServer::new(config).unwrap();
5349        let session = make_test_session();
5350
5351        let msg = QueryMessage {
5352            query: "SELECT * FROM users".to_string(),
5353        }
5354        .encode();
5355
5356        let decision = ProxyServer::apply_route_hook(&msg, &server.state, &session);
5357        assert!(matches!(decision, RouteOverride::None));
5358    }
5359
5360    /// Same invariant for the pre-query hook: without a plugin manager,
5361    /// `apply_pre_query_hook` must return the message unchanged with
5362    /// `PreQueryAction::Forward`.
5363    #[tokio::test]
5364    async fn test_apply_pre_query_hook_no_plugin_manager_forwards() {
5365        let config = test_config();
5366        let server = ProxyServer::new(config).unwrap();
5367        let session = make_test_session();
5368
5369        let original = QueryMessage {
5370            query: "SELECT 1".to_string(),
5371        }
5372        .encode();
5373        let original_bytes = original.encode().to_vec();
5374
5375        let (msg_out, action) =
5376            ProxyServer::apply_pre_query_hook(original, &server.state, &session);
5377
5378        assert!(matches!(action, PreQueryAction::Forward));
5379        // The message must survive the hook byte-for-byte when no plugins run.
5380        assert_eq!(msg_out.encode().to_vec(), original_bytes);
5381    }
5382
5383    /// Non-Query message types (e.g., extended-protocol Parse/Execute) must
5384    /// bypass the Route hook entirely regardless of plugin state, because
5385    /// we haven't wired SQL extraction for those variants yet.
5386    #[tokio::test]
5387    async fn test_apply_route_hook_skips_non_query_messages() {
5388        let config = test_config();
5389        let server = ProxyServer::new(config).unwrap();
5390        let session = make_test_session();
5391
5392        let sync_msg = Message::empty(MessageType::Sync);
5393        let decision = ProxyServer::apply_route_hook(&sync_msg, &server.state, &session);
5394        assert!(matches!(decision, RouteOverride::None));
5395    }
5396
5397    /// By default, `[plugins].enabled = false`, so `init_plugin_manager`
5398    /// short-circuits without touching the filesystem or wasmtime and
5399    /// returns `None`. The proxy starts normally whether or not a plugin
5400    /// directory exists on the host.
5401    #[cfg(feature = "wasm-plugins")]
5402    #[test]
5403    fn test_init_plugin_manager_disabled_by_default_returns_none() {
5404        let config = test_config();
5405        assert!(!config.plugins.enabled);
5406        let pm = ProxyServer::init_plugin_manager(&config.plugins);
5407        assert!(pm.is_none());
5408    }
5409
5410    /// Plugins enabled but pointing at a directory that doesn't exist
5411    /// must still initialise the manager (so new plugins can be hot-
5412    /// loaded later) and log a warning — it must NOT fail startup.
5413    #[cfg(feature = "wasm-plugins")]
5414    #[test]
5415    fn test_init_plugin_manager_missing_dir_logs_warning() {
5416        let mut config = test_config();
5417        config.plugins.enabled = true;
5418        config.plugins.plugin_dir = "/definitely/not/a/real/path".to_string();
5419
5420        // Manager is created; no panic; Some(pm) returned even with empty dir.
5421        let pm = ProxyServer::init_plugin_manager(&config.plugins);
5422        assert!(pm.is_some());
5423    }
5424
5425    /// With no plugin manager attached, `apply_authenticate_hook` is a
5426    /// zero-cost `Ok(())` that leaves session identity unset — the
5427    /// default PG auth flow applies.
5428    #[tokio::test]
5429    async fn test_apply_authenticate_hook_no_plugin_manager_defers() {
5430        let config = test_config();
5431        let server = ProxyServer::new(config).unwrap();
5432        let session = make_test_session();
5433
5434        let mut params = HashMap::new();
5435        params.insert("user".to_string(), "alice".to_string());
5436        params.insert("database".to_string(), "app".to_string());
5437
5438        let result = ProxyServer::apply_authenticate_hook(&params, &session, &server.state).await;
5439        assert!(result.is_ok());
5440
5441        // No plugin → no identity stored.
5442        #[cfg(feature = "wasm-plugins")]
5443        {
5444            let ident = session.plugin_identity.read().await;
5445            assert!(ident.is_none());
5446        }
5447    }
5448
5449    /// Cached-response synthesis round-trip: a well-formed plugin
5450    /// payload must produce concatenated wire frames in the order
5451    /// `T D D C Z`. We inspect the raw tag bytes directly because
5452    /// `MessageType::from_tag` conflates server→client DataRow (`'D'`)
5453    /// with client→server Describe (same byte) — a known quirk of the
5454    /// shared `MessageType` enum that the real proxy side-steps by
5455    /// knowing the direction at the call site.
5456    #[cfg(feature = "wasm-plugins")]
5457    #[test]
5458    fn test_synthesise_cached_response_roundtrip() {
5459        let payload = br#"{
5460            "columns": [
5461                {"name": "id",    "oid": 23},
5462                {"name": "email", "oid": 25}
5463            ],
5464            "rows": [
5465                ["1", "alice@example.com"],
5466                ["2", null]
5467            ]
5468        }"#;
5469        let reply = ProxyServer::synthesise_cached_response(payload).expect("synthesis");
5470
5471        // Walk the concatenation frame-by-frame via length prefixes.
5472        // Each PG message: tag(1) + length(4, big-endian, includes self) + payload.
5473        let mut tags = Vec::new();
5474        let mut i = 0;
5475        while i < reply.len() {
5476            let tag = reply[i];
5477            let len = u32::from_be_bytes([reply[i + 1], reply[i + 2], reply[i + 3], reply[i + 4]])
5478                as usize;
5479            tags.push(tag);
5480            i += 1 + len;
5481        }
5482        assert_eq!(i, reply.len(), "no trailing bytes");
5483        assert_eq!(tags, vec![b'T', b'D', b'D', b'C', b'Z'], "wire frame order");
5484
5485        // Spot-check the final ReadyForQuery payload is 'I' (idle).
5486        assert_eq!(*reply.last().unwrap(), b'I');
5487    }
5488
5489    /// Row width mismatch between columns and row data is rejected so
5490    /// the plugin author can't produce ambiguous wire frames.
5491    #[cfg(feature = "wasm-plugins")]
5492    #[test]
5493    fn test_synthesise_cached_response_rejects_row_width_mismatch() {
5494        let payload = br#"{
5495            "columns": [{"name": "id", "oid": 23}, {"name": "name", "oid": 25}],
5496            "rows": [["1", "alice", "extra"]]
5497        }"#;
5498        let result = ProxyServer::synthesise_cached_response(payload);
5499        assert!(matches!(result, Err(ProxyError::Protocol(_))));
5500    }
5501
5502    /// Empty payload (no columns) is rejected — a RowDescription with
5503    /// zero columns is technically valid PG but useless and likely a
5504    /// plugin bug.
5505    #[cfg(feature = "wasm-plugins")]
5506    #[test]
5507    fn test_synthesise_cached_response_rejects_empty_columns() {
5508        let payload = br#"{ "columns": [], "rows": [] }"#;
5509        let result = ProxyServer::synthesise_cached_response(payload);
5510        assert!(matches!(result, Err(ProxyError::Protocol(_))));
5511    }
5512
5513    /// Malformed JSON must return a Protocol error, not panic. The
5514    /// caller treats this as "fall back to backend."
5515    #[cfg(feature = "wasm-plugins")]
5516    #[test]
5517    fn test_synthesise_cached_response_rejects_bad_json() {
5518        let payload = b"not json at all";
5519        let result = ProxyServer::synthesise_cached_response(payload);
5520        assert!(matches!(result, Err(ProxyError::Protocol(_))));
5521    }
5522
    /// With a real `PluginManager` attached but zero plugins registered,
    /// `apply_authenticate_hook` must defer: it returns `Ok` and leaves the
    /// session's `plugin_identity` unset, just as when no manager exists.
    ///
    /// The denial path is not exercised here. In that path a plugin rejects
    /// the login; this is expected to surface as `ProxyError::Auth`, which
    /// `handle_client` turns into an ErrorResponse. Testing it needs an
    /// actual auth-plugin `.wasm`. Per the original notes, it is covered by
    /// the plugin unit tests in `plugins::tests`.
    #[cfg(feature = "wasm-plugins")]
    #[tokio::test]
    async fn test_apply_authenticate_hook_with_manager_no_plugins_defers() {
        use crate::plugins::{PluginManager, PluginRuntimeConfig};

        let config = test_config();
        let server = ProxyServer::new(config).unwrap();
        let session = make_test_session();

        // Hand-build a state with a real PluginManager but zero
        // registered plugins — every hook must defer. Optional subsystems
        // are `None` wherever the field allows it. The cfg-gated fields
        // below must be kept in sync with the `ServerState` definition.
        let pm = Arc::new(PluginManager::new(PluginRuntimeConfig::default()).unwrap());
        let augmented_state = Arc::new(ServerState {
            sessions: RwLock::new(HashMap::new()),
            health: ArcSwap::from_pointee(HashMap::new()),
            health_write: parking_lot::Mutex::new(()),
            live_config: ArcSwap::from_pointee(ProxyConfig::default()),
            metrics: ServerMetrics::default(),
            cancel_map: Arc::new(DashMap::new()),
            cancel_order: Arc::new(parking_lot::Mutex::new(std::collections::VecDeque::new())),
            tls_acceptor: None,
            auth_file: None,
            mirror: None,
            cutover: Arc::new(ArcSwap::from_pointee(None)),
            lb_state: LoadBalancerState {
                rr_counter: AtomicU64::new(0),
            },
            #[cfg(feature = "routing-hints")]
            hint_parser: None,
            #[cfg(feature = "rate-limiting")]
            rate_limiter: None,
            #[cfg(feature = "circuit-breaker")]
            circuit_breaker: None,
            #[cfg(feature = "query-analytics")]
            analytics: None,
            #[cfg(feature = "query-cache")]
            query_cache: None,
            #[cfg(feature = "query-rewriting")]
            rewriter: None,
            #[cfg(feature = "multi-tenancy")]
            tenant_manager: None,
            #[cfg(feature = "schema-routing")]
            schema_analyzer: None,
            #[cfg(feature = "pool-modes")]
            pool_manager: None,
            #[cfg(feature = "pool-modes")]
            backend_pool: None,
            // The only live subsystem: a manager with nothing loaded.
            plugin_manager: Some(pm),
            #[cfg(feature = "ha-tr")]
            transaction_journal: Arc::new(crate::transaction_journal::TransactionJournal::new()),
            #[cfg(feature = "anomaly-detection")]
            anomaly_detector: Arc::new(crate::anomaly::AnomalyDetector::new(
                crate::anomaly::AnomalyConfig::default(),
            )),
            #[cfg(feature = "edge-proxy")]
            edge_cache: Arc::new(crate::edge::EdgeCache::new(10_000)),
            #[cfg(feature = "edge-proxy")]
            edge_registry: Arc::new(crate::edge::EdgeRegistry::new(
                32,
                std::time::Duration::from_secs(120),
            )),
        });

        let mut params = HashMap::new();
        params.insert("user".to_string(), "alice".to_string());

        let result =
            ProxyServer::apply_authenticate_hook(&params, &session, &augmented_state).await;
        assert!(result.is_ok());
        // Deferring means no identity was recorded on the session.
        let ident = session.plugin_identity.read().await;
        assert!(ident.is_none());
        // The hook above runs against `augmented_state`, not `server.state`.
        // Building `server` still checks (via `unwrap`) that
        // `ProxyServer::new(test_config())` succeeds. The explicit binding
        // keeps the otherwise-unused variable from triggering a lint.
        let _ = server;
    }
5605
5606    // ---- Batch F.4: prepared-statement tracking across backend switches ----
5607
5608    fn cstr(s: &str) -> Vec<u8> {
5609        let mut v = s.as_bytes().to_vec();
5610        v.push(0);
5611        v
5612    }
5613
5614    #[test]
5615    fn parse_stmt_name_extracts_named_and_unnamed() {
5616        // Parse payload = stmt-name cstring + query cstring + int16 nparams.
5617        let mut named = cstr("ps1");
5618        named.extend_from_slice(&cstr("SELECT 1"));
5619        named.extend_from_slice(&[0, 0]);
5620        assert_eq!(ProxyServer::parse_stmt_name(&named), "ps1");
5621
5622        let mut unnamed = cstr("");
5623        unnamed.extend_from_slice(&cstr("SELECT 1"));
5624        unnamed.extend_from_slice(&[0, 0]);
5625        assert_eq!(ProxyServer::parse_stmt_name(&unnamed), "");
5626    }
5627
5628    #[test]
5629    fn bind_stmt_ref_reads_second_cstring() {
5630        // Bind payload = portal cstring + statement cstring + ...
5631        let mut named = cstr("portal_a");
5632        named.extend_from_slice(&cstr("ps1"));
5633        named.extend_from_slice(&[0, 0]); // 0 param-format codes, 0 params
5634        assert_eq!(ProxyServer::bind_stmt_ref(&named), Some("ps1"));
5635
5636        // Unnamed statement (empty second cstring) is not tracked.
5637        let mut unnamed = cstr("");
5638        unnamed.extend_from_slice(&cstr(""));
5639        assert_eq!(ProxyServer::bind_stmt_ref(&unnamed), None);
5640    }
5641
5642    #[test]
5643    fn stmt_kind_name_only_matches_statement_kind() {
5644        // Describe/Close 'S' (statement) carries a trackable name.
5645        let mut stmt = vec![b'S'];
5646        stmt.extend_from_slice(&cstr("ps1"));
5647        assert_eq!(ProxyServer::stmt_kind_name(&stmt), Some("ps1"));
5648
5649        // 'P' (portal) is not a statement reference.
5650        let mut portal = vec![b'P'];
5651        portal.extend_from_slice(&cstr("portal_a"));
5652        assert_eq!(ProxyServer::stmt_kind_name(&portal), None);
5653
5654        // Statement-kind but unnamed -> nothing to track.
5655        let mut empty = vec![b'S'];
5656        empty.extend_from_slice(&cstr(""));
5657        assert_eq!(ProxyServer::stmt_kind_name(&empty), None);
5658    }
5659
5660    #[tokio::test]
5661    async fn read_one_frame_type_consumes_full_frame() {
5662        // ParseComplete '1' with empty body, followed by a second frame to
5663        // prove only the first frame is consumed.
5664        let (mut a, mut b) = tokio::io::duplex(64);
5665        // frame 1: '1' + len(4) + no body; frame 2: 'Z' + len(5) + 'I'.
5666        let bytes = [b'1', 0, 0, 0, 4, b'Z', 0, 0, 0, 5, b'I'];
5667        b.write_all(&bytes).await.unwrap();
5668        let t = ProxyServer::read_one_frame_type(&mut a).await.unwrap();
5669        assert_eq!(t, b'1');
5670        // The next frame's type byte is still readable -> we stopped cleanly.
5671        let t2 = ProxyServer::read_one_frame_type(&mut a).await.unwrap();
5672        assert_eq!(t2, b'Z');
5673    }
5674
5675    #[tokio::test]
5676    async fn reprepare_statement_accepts_parse_complete_and_rejects_error() {
5677        // Backend answers ParseComplete -> Ok.
5678        let (mut client, mut backend) = tokio::io::duplex(64);
5679        backend.write_all(&[b'1', 0, 0, 0, 4]).await.unwrap();
5680        let parse = {
5681            let mut p = vec![b'P', 0, 0, 0, 0];
5682            p.extend_from_slice(&cstr("ps1"));
5683            p.extend_from_slice(&cstr("SELECT 1"));
5684            p.extend_from_slice(&[0, 0]);
5685            p
5686        };
5687        assert!(ProxyServer::reprepare_statement(&mut client, &parse)
5688            .await
5689            .is_ok());
5690
5691        // Backend answers ErrorResponse -> Err.
5692        let (mut client2, mut backend2) = tokio::io::duplex(64);
5693        backend2.write_all(&[b'E', 0, 0, 0, 4]).await.unwrap();
5694        assert!(ProxyServer::reprepare_statement(&mut client2, &parse)
5695            .await
5696            .is_err());
5697    }
5698
5699    // ---- routing-hints: SQL-comment hint → RouteOverride mapping ----
5700
5701    #[cfg(feature = "routing-hints")]
5702    mod routing_hints {
5703        use super::*;
5704        use crate::routing::HintParser;
5705
5706        fn over(sql: &str) -> RouteOverride {
5707            let hints = HintParser::new().parse(sql);
5708            ProxyServer::hint_to_override(&hints)
5709        }
5710
5711        #[test]
5712        fn route_primary_maps_to_primary() {
5713            assert!(matches!(
5714                over("/*helios:route=primary*/ SELECT 1"),
5715                RouteOverride::Primary
5716            ));
5717        }
5718
5719        #[test]
5720        fn read_tier_targets_map_to_standby() {
5721            for t in ["standby", "sync", "semisync", "async", "local"] {
5722                assert!(
5723                    matches!(
5724                        over(&format!("/*helios:route={t}*/ SELECT 1")),
5725                        RouteOverride::Standby
5726                    ),
5727                    "route={t} should map to Standby"
5728                );
5729            }
5730        }
5731
5732        #[test]
5733        fn any_and_vector_impose_no_constraint() {
5734            assert!(matches!(
5735                over("/*helios:route=any*/ SELECT 1"),
5736                RouteOverride::None
5737            ));
5738            assert!(matches!(
5739                over("/*helios:route=vector*/ SELECT 1"),
5740                RouteOverride::None
5741            ));
5742        }
5743
5744        #[test]
5745        fn node_hint_maps_to_node_and_wins_over_route() {
5746            // node= beats route= (precedence).
5747            match over("/*helios:node=pg-standby,route=primary*/ SELECT 1") {
5748                RouteOverride::Node(n) => assert_eq!(n, "pg-standby"),
5749                other => panic!("expected Node, got {other:?}"),
5750            }
5751        }
5752
5753        #[test]
5754        fn consistency_strong_forces_primary() {
5755            assert!(matches!(
5756                over("/*helios:consistency=strong*/ SELECT 1"),
5757                RouteOverride::Primary
5758            ));
5759        }
5760
5761        #[test]
5762        fn no_hint_yields_none() {
5763            assert!(matches!(over("SELECT 1"), RouteOverride::None));
5764        }
5765
5766        // The core correctness fix: a leading hint comment must NOT hide the
5767        // verb from write-detection. Raw classification misfires; classifying
5768        // on the stripped SQL is correct.
5769        #[test]
5770        fn write_verb_classified_after_strip() {
5771            let parser = HintParser::new();
5772            let raw = "/*helios:route=primary*/ INSERT INTO t VALUES (1)";
5773            // Raw (unstripped) wrongly looks like a read because it starts
5774            // with the comment.
5775            assert!(!ProxyServer::is_write_query(raw));
5776            // Stripped is correctly a write.
5777            assert!(ProxyServer::is_write_query(&parser.strip(raw)));
5778        }
5779
5780        #[test]
5781        fn strip_removes_hint_comment() {
5782            let parser = HintParser::new();
5783            assert_eq!(
5784                parser.strip("/*helios:route=standby*/ SELECT 42"),
5785                "SELECT 42"
5786            );
5787        }
5788    }
5789
5790    // ---- rate-limiting: the burst-then-deny contract the gate relies on ----
5791
5792    #[cfg(feature = "rate-limiting")]
5793    mod rate_limiting {
5794        use crate::rate_limit::{LimiterKey, RateLimitConfig, RateLimitResult, RateLimiter};
5795
5796        #[test]
5797        fn burst_allows_then_denies() {
5798            // Mirror the wiring's config conversion: tiny bucket, reject on
5799            // exceed (the engine default).
5800            let cfg = RateLimitConfig {
5801                enabled: true,
5802                default_qps: 1,
5803                default_burst: 2,
5804                ..Default::default()
5805            };
5806            let limiter = RateLimiter::new(cfg);
5807            let key = LimiterKey::User("u".to_string());
5808
5809            // The first `burst` checks are admitted.
5810            assert!(matches!(limiter.check(&key, 1), RateLimitResult::Allowed));
5811            assert!(matches!(limiter.check(&key, 1), RateLimitResult::Allowed));
5812
5813            // Rapid over-burst checks must produce at least one hard denial.
5814            let mut denied = false;
5815            for _ in 0..5 {
5816                if matches!(limiter.check(&key, 1), RateLimitResult::Denied(_)) {
5817                    denied = true;
5818                }
5819            }
5820            assert!(denied, "over-burst checks must yield a Denied verdict");
5821        }
5822
5823        #[test]
5824        fn distinct_keys_have_independent_buckets() {
5825            let cfg = RateLimitConfig {
5826                enabled: true,
5827                default_qps: 1,
5828                default_burst: 1,
5829                ..Default::default()
5830            };
5831            let limiter = RateLimiter::new(cfg);
5832            // Each user gets its own bucket: both first checks are admitted.
5833            assert!(matches!(
5834                limiter.check(&LimiterKey::User("a".to_string()), 1),
5835                RateLimitResult::Allowed
5836            ));
5837            assert!(matches!(
5838                limiter.check(&LimiterKey::User("b".to_string()), 1),
5839                RateLimitResult::Allowed
5840            ));
5841        }
5842    }
5843
5844    // ---- circuit-breaker: open-after-threshold contract the gate relies on ----
5845
5846    #[cfg(feature = "circuit-breaker")]
5847    mod circuit_breaker {
5848        use crate::circuit_breaker::{
5849            CircuitBreakerConfig, CircuitBreakerManager, CircuitState, ManagerConfig,
5850        };
5851        use std::time::Duration;
5852
5853        fn mgr(threshold: u32) -> CircuitBreakerManager {
5854            let cfg = CircuitBreakerConfig {
5855                failure_threshold: threshold,
5856                cooldown: Duration::from_secs(10),
5857                ..Default::default()
5858            };
5859            CircuitBreakerManager::new(ManagerConfig::new(cfg))
5860        }
5861
5862        #[test]
5863        fn opens_after_threshold_failures() {
5864            let m = mgr(3);
5865            let b = m.get_breaker("n1");
5866            assert_eq!(b.get_state(), CircuitState::Closed);
5867            b.record_failure("boom");
5868            b.record_failure("boom");
5869            // Under threshold: still serving.
5870            assert_eq!(b.get_state(), CircuitState::Closed);
5871            // Threshold reached: tripped open.
5872            b.record_failure("boom");
5873            assert_eq!(b.get_state(), CircuitState::Open);
5874        }
5875
5876        #[test]
5877        fn healthy_node_stays_closed() {
5878            let m = mgr(3);
5879            let b = m.get_breaker("n2");
5880            b.record_success();
5881            b.record_success();
5882            assert_eq!(b.get_state(), CircuitState::Closed);
5883        }
5884    }
5885
5886    // ---- query-analytics: record + literal-collapsing normalizer ----
5887
5888    #[cfg(feature = "query-analytics")]
5889    mod query_analytics {
5890        use crate::analytics::{AnalyticsConfig, OrderBy, QueryAnalytics, QueryExecution};
5891        use std::time::Duration;
5892
5893        #[test]
5894        fn records_and_collapses_literals() {
5895            let a = QueryAnalytics::new(AnalyticsConfig::default());
5896            for n in [1, 2, 3] {
5897                a.record(QueryExecution::new(
5898                    format!("select {n}"),
5899                    Duration::from_millis(1),
5900                ));
5901            }
5902            let top = a.top_queries(OrderBy::Calls, 10);
5903            assert!(!top.is_empty(), "no fingerprints recorded");
5904            // The three literal variants collapse to one fingerprint (3 calls).
5905            assert!(
5906                top.iter().any(|s| s.calls >= 3),
5907                "literals did not collapse: {:?}",
5908                top.iter()
5909                    .map(|s| (s.normalized.clone(), s.calls))
5910                    .collect::<Vec<_>>()
5911            );
5912        }
5913    }
5914
5915    // ---- lag-routing: read-your-writes window + lag-exclusion decisions ----
5916
5917    #[cfg(feature = "lag-routing")]
5918    mod lag_routing {
5919        use super::ProxyServer;
5920
5921        #[test]
5922        fn ryw_pins_recent_write() {
5923            // A write "now" falls inside a 1s window -> pin to primary.
5924            assert!(ProxyServer::ryw_pins_primary(
5925                Some(std::time::Instant::now()),
5926                1000
5927            ));
5928        }
5929
5930        #[test]
5931        fn ryw_releases_old_write() {
5932            let old = std::time::Instant::now()
5933                .checked_sub(std::time::Duration::from_secs(10))
5934                .unwrap();
5935            assert!(!ProxyServer::ryw_pins_primary(Some(old), 1000));
5936        }
5937
5938        #[test]
5939        fn ryw_no_write_or_disabled() {
5940            assert!(!ProxyServer::ryw_pins_primary(None, 1000));
5941            // window=0 disables read-your-writes entirely.
5942            assert!(!ProxyServer::ryw_pins_primary(
5943                Some(std::time::Instant::now()),
5944                0
5945            ));
5946        }
5947
5948        #[test]
5949        fn lag_exclusion_thresholds() {
5950            // max=0 disables exclusion.
5951            assert!(!ProxyServer::lag_excludes_standby(Some(999_999), 0));
5952            // unknown lag never excludes.
5953            assert!(!ProxyServer::lag_excludes_standby(None, 1000));
5954            // within ceiling stays in rotation.
5955            assert!(!ProxyServer::lag_excludes_standby(Some(500), 1000));
5956            // beyond ceiling is dropped.
5957            assert!(ProxyServer::lag_excludes_standby(Some(2000), 1000));
5958        }
5959    }
5960
5961    // ---- query-cache: which read SQL is safe to cache ----
5962
5963    #[cfg(feature = "query-cache")]
5964    mod query_cache {
5965        use super::ProxyServer;
5966
5967        #[test]
5968        fn plain_selects_are_cacheable() {
5969            assert!(ProxyServer::is_cacheable_read_sql("select v from t"));
5970            assert!(ProxyServer::is_cacheable_read_sql(
5971                "  SELECT a, b FROM users WHERE id = 5"
5972            ));
5973        }
5974
5975        #[test]
5976        fn writes_and_non_selects_are_not_cacheable() {
5977            assert!(!ProxyServer::is_cacheable_read_sql(
5978                "insert into t values (1)"
5979            ));
5980            assert!(!ProxyServer::is_cacheable_read_sql("update t set v = 1"));
5981            assert!(!ProxyServer::is_cacheable_read_sql("show search_path"));
5982        }
5983
5984        #[test]
5985        fn locking_and_volatile_selects_are_not_cacheable() {
5986            assert!(!ProxyServer::is_cacheable_read_sql(
5987                "select * from t for update"
5988            ));
5989            assert!(!ProxyServer::is_cacheable_read_sql("select now()"));
5990            assert!(!ProxyServer::is_cacheable_read_sql("select random()"));
5991            assert!(!ProxyServer::is_cacheable_read_sql("select nextval('s')"));
5992        }
5993    }
5994
5995    // ---- query-rewriting: the rules-engine rewrite contract ----
5996
5997    #[cfg(feature = "query-rewriting")]
5998    mod query_rewriting {
5999        use crate::rewriter::{
6000            QueryPattern, QueryRewriter, RewriteRule, RewriterConfig, Transformation,
6001        };
6002
6003        fn rw_with_table_replace() -> QueryRewriter {
6004            let rw = QueryRewriter::new(RewriterConfig {
6005                enabled: true,
6006                ..Default::default()
6007            });
6008            rw.add_rule(
6009                RewriteRule::build("t")
6010                    .pattern(QueryPattern::Table("a".to_string()))
6011                    .transform(Transformation::ReplaceTable {
6012                        from: "a".to_string(),
6013                        to: "b".to_string(),
6014                    })
6015                    .build(),
6016            );
6017            rw
6018        }
6019
6020        #[test]
6021        fn matching_query_is_rewritten() {
6022            let res = rw_with_table_replace().rewrite("select * from a").unwrap();
6023            assert!(res.was_rewritten(), "rule did not fire");
6024            assert!(res.query().contains('b'), "rewritten: {}", res.query());
6025            assert!(
6026                !res.query().contains("from a"),
6027                "still references a: {}",
6028                res.query()
6029            );
6030        }
6031
6032        #[test]
6033        fn unmatched_query_is_unchanged() {
6034            let res = rw_with_table_replace()
6035                .rewrite("select * from other")
6036                .unwrap();
6037            assert!(!res.was_rewritten());
6038            assert_eq!(res.query(), "select * from other");
6039        }
6040    }
6041
6042    // ---- multi-tenancy: row-filter injection per tenant ----
6043
6044    #[cfg(feature = "multi-tenancy")]
6045    mod multi_tenancy {
6046        use crate::multi_tenancy::{
6047            IdentificationMethod, IsolationStrategy, MultiTenancyConfig, TenantConfig, TenantId,
6048            TenantManager, TenantManagerBuilder, TenantQueryTransformer,
6049        };
6050
6051        fn manager() -> TenantManager {
6052            let transformer = TenantQueryTransformer::new().register_tables(&["t"], "tid");
6053            let tm = TenantManagerBuilder::new()
6054                .config(MultiTenancyConfig {
6055                    enabled: true,
6056                    identification: IdentificationMethod::Header {
6057                        header_name: "application_name".to_string(),
6058                    },
6059                    ..Default::default()
6060                })
6061                .query_transformer(transformer)
6062                .build();
6063            tm.register_tenant(TenantConfig::new(
6064                TenantId::new("acme"),
6065                IsolationStrategy::row("public", "tid"),
6066            ));
6067            tm
6068        }
6069
6070        #[test]
6071        fn tenant_table_gets_filter() {
6072            let res = manager().transform_query("select * from t", &TenantId::new("acme"));
6073            assert!(res.transformed, "expected a tenant filter to be injected");
6074            let q = res.query.to_lowercase();
6075            assert!(
6076                q.contains("tid") && q.contains("acme"),
6077                "filter missing: {}",
6078                res.query
6079            );
6080        }
6081
6082        #[test]
6083        fn non_tenant_table_passes_through() {
6084            let res = manager().transform_query("select * from other", &TenantId::new("acme"));
6085            assert!(!res.transformed);
6086        }
6087    }
6088
6089    // ---- ha-tr: the journal records statements the replay engine reads ----
6090
6091    #[cfg(feature = "ha-tr")]
6092    mod ha_tr {
6093        use crate::transaction_journal::TransactionJournal;
6094        use crate::NodeId;
6095
6096        #[tokio::test]
6097        async fn journal_records_and_windows_a_statement() {
6098            let j = TransactionJournal::new();
6099            let from = chrono::Utc::now() - chrono::Duration::seconds(60);
6100            let tx = uuid::Uuid::new_v4();
6101            j.begin_transaction(tx, uuid::Uuid::new_v4(), NodeId::new(), 0)
6102                .await
6103                .unwrap();
6104            j.log_statement(
6105                tx,
6106                "insert into t values (1)".to_string(),
6107                Vec::new(),
6108                None,
6109                None,
6110                0,
6111            )
6112            .await
6113            .unwrap();
6114            let to = chrono::Utc::now() + chrono::Duration::seconds(60);
6115            let entries = j.entries_in_window(from, to).await;
6116            assert_eq!(entries.len(), 1, "journaled statement should be in window");
6117            assert!(entries[0].1.statement.contains("insert"));
6118        }
6119    }
6120
6121    // ---- schema-routing: OLAP vs OLTP workload classification ----
6122
6123    #[cfg(feature = "schema-routing")]
6124    mod schema_routing {
6125        use crate::schema_routing::{QueryAnalyzer, SchemaRegistry};
6126        use std::sync::Arc;
6127
6128        fn analyzer() -> QueryAnalyzer {
6129            QueryAnalyzer::new(Arc::new(SchemaRegistry::new()))
6130        }
6131
6132        #[test]
6133        fn aggregation_group_by_is_analytics() {
6134            let a = analyzer();
6135            assert!(a
6136                .analyze("select count(*) from orders group by region")
6137                .is_analytics());
6138        }
6139
6140        #[test]
6141        fn simple_point_query_is_not_analytics() {
6142            let a = analyzer();
6143            assert!(!a
6144                .analyze("select * from orders where id = 1")
6145                .is_analytics());
6146        }
6147    }
6148}