Skip to main content

heliosdb_proxy/
lib.rs

//! HeliosDB Proxy - Standalone Connection Router
//!
//! A standalone proxy for HeliosDB-Lite providing:
//! - Connection pooling
//! - Load balancing (read/write splitting)
//! - Health monitoring
//! - Transaction Replay (TR)
//!
//! # Deployment Options
//!
//! - **Standalone binary**: Run as a separate process
//! - **Kubernetes sidecar**: Deploy alongside your application
//! - **Embedded library**: Use as a library in your application
//!
//! # Quick Start
//!
//! ```bash
//! # Start with config file
//! heliosdb-proxy --config /etc/heliosdb/proxy.toml
//!
//! # Start with command line options
//! heliosdb-proxy \
//!   --listen 0.0.0.0:5432 \
//!   --primary db-primary:5432 \
//!   --standby db-standby-1:5432 \
//!   --standby db-standby-2:5432
//! ```
//!
//! # Configuration Example
//!
//! ```toml
//! [proxy]
//! listen_address = "0.0.0.0:5432"
//! admin_address = "0.0.0.0:9090"
//!
//! [pool]
//! min_connections = 5
//! max_connections = 100
//! idle_timeout_secs = 300
//!
//! [load_balancer]
//! strategy = "round_robin"  # or "least_connections", "latency_based"
//! read_write_split = true
//!
//! [health]
//! check_interval_secs = 5
//! failure_threshold = 3
//!
//! [[nodes]]
//! host = "db-primary"
//! port = 5432
//! role = "primary"
//!
//! [[nodes]]
//! host = "db-standby-1"
//! port = 5432
//! role = "standby"
//! ```
59
60// ── Core modules (always available) ──────────────────────────────────
61pub mod admin;
62pub mod agent_contract;
63pub mod auth_scram;
64pub mod backend;
65pub mod batch;
66pub mod branch;
67pub mod client_tls;
68pub mod config;
69pub mod connection_pool;
70pub mod failover_controller;
71pub mod health_checker;
72pub mod http_gateway;
73pub mod load_balancer;
74pub mod mcp;
75pub mod mirror;
76pub mod pipeline;
77pub mod plugin_registry;
78pub mod primary_tracker;
79pub mod protocol;
80pub mod request;
81pub mod server;
82pub mod switchover_buffer;
83
84// ── Connection pooling modes (Session/Transaction/Statement) ─────────
85#[cfg(feature = "pool-modes")]
86pub mod pool;
87
88// ── TR (Transaction Replay) modules ─────────────────────────────────
89#[cfg(feature = "ha-tr")]
90pub mod cursor_restore;
91#[cfg(feature = "ha-tr")]
92pub mod failover_replay;
93#[cfg(feature = "ha-tr")]
94pub mod replay;
95#[cfg(feature = "ha-tr")]
96pub mod session_migrate;
97#[cfg(feature = "ha-tr")]
98pub mod transaction_journal;
99
100// ── Zero-downtime PG major-version upgrade orchestrator (T2.1) ─────
101#[cfg(feature = "ha-tr")]
102pub mod upgrade_orchestrator;
103
104// ── R&D: shadow execution (T3.4) ────────────────────────────────────
105#[cfg(feature = "ha-tr")]
106pub mod shadow_execute;
107
108// ── Query caching (L1/L2/L3 multi-tier cache) ──────────────────────
109#[cfg(feature = "query-cache")]
110pub mod cache;
111
112// ── Query routing hints ─────────────────────────────────────────────
113#[cfg(feature = "routing-hints")]
114pub mod routing;
115
116// ── Replica lag-aware routing ───────────────────────────────────────
117#[cfg(feature = "lag-routing")]
118pub mod lag;
119
120// ── Rate limiting and query throttling ──────────────────────────────
121#[cfg(feature = "rate-limiting")]
122pub mod rate_limit;
123
124// ── Circuit breaker pattern ─────────────────────────────────────────
125#[cfg(feature = "circuit-breaker")]
126pub mod circuit_breaker;
127
128// ── Query analytics and slow query log ──────────────────────────────
129#[cfg(feature = "query-analytics")]
130pub mod analytics;
131
132// ── Anomaly detection (T3.1) — rate spikes, credential stuffing,
133// SQL injection heuristics, novel query shapes ─────────────────────
134#[cfg(feature = "anomaly-detection")]
135pub mod anomaly;
136
137// ── Edge / geo proxy mode (T3.2) ───────────────────────────────────
138#[cfg(feature = "edge-proxy")]
139pub mod edge;
140
141// ── Multi-tenancy support ───────────────────────────────────────────
142#[cfg(feature = "multi-tenancy")]
143pub mod multi_tenancy;
144
145// ── Authentication proxy ────────────────────────────────────────────
146#[cfg(feature = "auth-proxy")]
147pub mod auth;
148
149// ── Query rewriting ─────────────────────────────────────────────────
150#[cfg(feature = "query-rewriting")]
151pub mod rewriter;
152
153// ── WASM plugin system ──────────────────────────────────────────────
154#[cfg(feature = "wasm-plugins")]
155pub mod plugins;
156
157// ── GraphQL-to-SQL gateway ──────────────────────────────────────────
158#[cfg(feature = "graphql-gateway")]
159pub mod graphql;
160#[cfg(feature = "graphql-gateway")]
161pub mod graphql_gateway;
162
163// ── Schema-aware routing ────────────────────────────────────────────
164#[cfg(feature = "schema-routing")]
165pub mod schema_routing;
166
167// ── Distributed intelligent caching (DistribCache) ──────────────────
168#[cfg(feature = "distribcache")]
169pub mod distribcache;
170
171// ── Embedded skill-bundle deployer ──────────────────────────────────
172//
173// Always-on: the `heliosdb-proxy install skills` subcommand calls
174// into this. Adds ~80 KiB to the binary (the `.claude/skills/`
175// bundle, embedded by `include_dir!`).
176pub mod skills;
177
178use thiserror::Error;
179use uuid::Uuid;
180
181/// Proxy error types
182#[derive(Debug, Error)]
183pub enum ProxyError {
184    #[error("Configuration error: {0}")]
185    Config(String),
186
187    #[error("Network error: {0}")]
188    Network(String),
189
190    #[error("Connection error: {0}")]
191    Connection(String),
192
193    #[error("Protocol error: {0}")]
194    Protocol(String),
195
196    #[error("Pool error: {0}")]
197    Pool(String),
198
199    #[error("Health check error: {0}")]
200    HealthCheck(String),
201
202    #[error("Failover error: {0}")]
203    Failover(String),
204
205    #[error("Failover failed: {0}")]
206    FailoverFailed(String),
207
208    #[error("Transaction replay failed: {0}")]
209    ReplayFailed(String),
210
211    #[error("Session migration failed: {0}")]
212    SessionMigration(String),
213
214    #[error("Cursor restore failed: {0}")]
215    CursorRestore(String),
216
217    #[error("Routing error: {0}")]
218    Routing(String),
219
220    #[error("Authentication error: {0}")]
221    Auth(String),
222
223    #[error("Pool exhausted: {0}")]
224    PoolExhausted(String),
225
226    #[error("Timeout: {0}")]
227    Timeout(String),
228
229    #[error("Configuration error: {0}")]
230    Configuration(String),
231
232    #[error("No healthy nodes available")]
233    NoHealthyNodes,
234
235    #[error("IO error: {0}")]
236    Io(#[from] std::io::Error),
237
238    #[error("JSON error: {0}")]
239    Json(#[from] serde_json::Error),
240
241    #[error("Internal error: {0}")]
242    Internal(String),
243}
244
245pub type Result<T> = std::result::Result<T, ProxyError>;
246
247/// Proxy version
248pub const VERSION: &str = env!("CARGO_PKG_VERSION");
249
/// Default listen port
pub const DEFAULT_PORT: u16 = 5432;

/// Default admin port
pub const DEFAULT_ADMIN_PORT: u16 = 9090;
255
256/// Node identifier
257#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
258pub struct NodeId(pub Uuid);
259
260impl NodeId {
261    pub fn new() -> Self {
262        Self(Uuid::new_v4())
263    }
264}
265
266impl Default for NodeId {
267    fn default() -> Self {
268        Self::new()
269    }
270}
271
/// Node role in the cluster
///
/// A plain `Copy` enum compared by variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeRole {
    /// Primary node (accepts writes)
    Primary,
    /// Standby node (read-only, can be promoted)
    Standby,
    /// Read replica (read-only, cannot be promoted)
    ReadReplica,
    /// Unknown role (during discovery)
    Unknown,
}
284
285/// Node endpoint information
286#[derive(Debug, Clone)]
287pub struct NodeEndpoint {
288    /// Node identifier
289    pub id: NodeId,
290    /// Host address
291    pub host: String,
292    /// Port
293    pub port: u16,
294    /// Node role
295    pub role: NodeRole,
296    /// Weight for load balancing (higher = more traffic)
297    pub weight: u32,
298    /// Whether this node is enabled
299    pub enabled: bool,
300}
301
302impl NodeEndpoint {
303    pub fn new(host: impl Into<String>, port: u16) -> Self {
304        Self {
305            id: NodeId::new(),
306            host: host.into(),
307            port,
308            role: NodeRole::Unknown,
309            weight: 100,
310            enabled: true,
311        }
312    }
313
314    pub fn with_role(mut self, role: NodeRole) -> Self {
315        self.role = role;
316        self
317    }
318
319    pub fn with_weight(mut self, weight: u32) -> Self {
320        self.weight = weight;
321        self
322    }
323
324    pub fn address(&self) -> String {
325        format!("{}:{}", self.host, self.port)
326    }
327}
328
// ── PostgreSQL compatibility integration test ───────────────────────
//
// Verifies that all feature modules can be instantiated and used in a
// PostgreSQL-only context (no HeliosDB dependencies required).

#[cfg(test)]
mod postgresql_compat_tests {
    use super::*;

    /// All core types work for PostgreSQL endpoints.
    #[test]
    fn test_pg_node_endpoints() {
        let primary =
            NodeEndpoint::new("pg-primary.example.com", 5432).with_role(NodeRole::Primary);
        let standby =
            NodeEndpoint::new("pg-standby-1.example.com", 5432).with_role(NodeRole::Standby);
        let replica = NodeEndpoint::new("pg-replica-1.example.com", 5433)
            .with_role(NodeRole::ReadReplica)
            .with_weight(50);

        assert_eq!(primary.role, NodeRole::Primary);
        assert_eq!(standby.role, NodeRole::Standby);
        assert_eq!(replica.role, NodeRole::ReadReplica);
        assert_eq!(replica.weight, 50);
        assert_eq!(replica.address(), "pg-replica-1.example.com:5433");

        // Defaults assigned by `NodeEndpoint::new` must survive the builders.
        assert_eq!(primary.weight, 100);
        assert!(primary.enabled);
        assert_eq!(primary.address(), "pg-primary.example.com:5432");
        assert_eq!(NodeEndpoint::new("pg-new", 5432).role, NodeRole::Unknown);
    }

    /// Load balancer config works for PostgreSQL cluster with read/write splitting.
    #[test]
    fn test_pg_load_balancer_config() {
        use load_balancer::*;

        let config = LoadBalancerConfig {
            read_write_split: true,
            read_strategy: RoutingStrategy::RoundRobin,
            write_strategy: RoutingStrategy::PrimaryOnly,
            ..Default::default()
        };

        assert!(config.read_write_split);
        assert_eq!(config.read_strategy, RoutingStrategy::RoundRobin);
        assert_eq!(config.write_strategy, RoutingStrategy::PrimaryOnly);

        // Verify the LB can be constructed
        let _lb = LoadBalancer::new(config);
    }

    /// Health checker works with standard PostgreSQL `SELECT 1` checks.
    #[test]
    fn test_pg_health_config() {
        use health_checker::*;

        let config = HealthConfig {
            check_query: "SELECT 1".to_string(),
            detailed_checks: true,
            ..Default::default()
        };

        assert_eq!(config.check_query, "SELECT 1");
        assert!(config.detailed_checks);
    }

    /// Failover controller works for PostgreSQL streaming replication.
    #[tokio::test]
    async fn test_pg_failover() {
        use failover_controller::*;

        let controller = FailoverController::new(FailoverConfig {
            auto_failover: true,
            prefer_sync_standby: true,
            ..Default::default()
        });

        let primary = NodeId::new();
        controller.set_primary(primary).await;
        assert_eq!(controller.get_primary().await, Some(primary));

        // Register candidates
        let sync_standby = NodeId::new();
        controller
            .register_candidate(FailoverCandidate {
                node_id: sync_standby,
                endpoint: NodeEndpoint::new("pg-sync", 5432).with_role(NodeRole::Standby),
                is_sync: true,
                lag_bytes: 0,
                priority: 1,
                last_heartbeat: None,
            })
            .await;

        let async_standby = NodeId::new();
        controller
            .register_candidate(FailoverCandidate {
                node_id: async_standby,
                endpoint: NodeEndpoint::new("pg-async", 5432).with_role(NodeRole::Standby),
                is_sync: false,
                lag_bytes: 1024,
                priority: 2,
                last_heartbeat: None,
            })
            .await;

        // Verify state
        assert_eq!(controller.state().await, FailoverState::Normal);
        assert_eq!(controller.failover_count(), 0);
    }

    /// Connection pool config works for PostgreSQL connections.
    #[test]
    fn test_pg_connection_pool() {
        use connection_pool::*;

        let config = PoolConfig {
            min_connections: 2,
            max_connections: 20,
            test_on_acquire: true,
            ..Default::default()
        };

        assert_eq!(config.min_connections, 2);
        assert_eq!(config.max_connections, 20);
        assert!(config.test_on_acquire);

        let _pool = ConnectionPool::new(config);
    }

    /// Switchover buffer works for PostgreSQL planned switchover.
    #[tokio::test]
    async fn test_pg_switchover_buffer() {
        use switchover_buffer::*;

        let buffer = SwitchoverBuffer::new(BufferConfig {
            buffer_timeout: std::time::Duration::from_secs(5),
            max_buffered_queries: 1000,
            ..Default::default()
        });

        assert_eq!(buffer.state(), BufferState::Passthrough);
        assert!(!buffer.is_buffering());

        // Simulate pg_ctl promote workflow
        buffer.start_buffering();
        assert!(buffer.is_buffering());

        let rx = buffer
            .buffer_query("INSERT INTO orders VALUES (1)".to_string(), vec![], 1)
            .unwrap();

        // Simulate promotion complete
        buffer.stop_buffering();
        buffer.drain(|_sql, _params| async { Ok(()) }).await;

        let result = rx.await.unwrap();
        assert!(matches!(result, BufferResult::Success));
    }

    /// Primary tracker works with the standalone mode for PostgreSQL.
    #[test]
    fn test_pg_primary_tracker_standalone() {
        use primary_tracker::*;

        let tracker = PrimaryTracker::new_standalone();

        // Simulate discovering primary via pg_is_in_recovery()
        let pg_primary = uuid::Uuid::new_v4();
        tracker.set_primary(pg_primary, "pg-primary.local:5432".to_string());
        tracker.confirm_primary();

        assert!(tracker.has_primary());
        assert!(tracker.get_primary().unwrap().is_confirmed);

        // Simulate failover detected
        tracker.clear_primary();
        assert!(!tracker.has_primary());

        // New primary
        let pg_new_primary = uuid::Uuid::new_v4();
        tracker.set_primary(pg_new_primary, "pg-standby.local:5432".to_string());
        tracker.confirm_primary();
        assert_eq!(
            tracker.get_primary_address(),
            Some("pg-standby.local:5432".to_string())
        );
    }

    /// Transaction journal works for PostgreSQL transaction replay.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_pg_transaction_replay() {
        use transaction_journal::*;

        let journal = TransactionJournal::new();
        let tx_id = uuid::Uuid::new_v4();
        let session_id = uuid::Uuid::new_v4();
        let node = NodeId::new();

        // Journal a PostgreSQL transaction
        journal
            .begin_transaction(tx_id, session_id, node, 0)
            .await
            .unwrap();
        journal
            .log_statement(tx_id, "BEGIN".to_string(), vec![], None, None, 1)
            .await
            .unwrap();
        journal
            .log_statement(
                tx_id,
                "INSERT INTO accounts (id, balance) VALUES ($1, $2)".to_string(),
                vec![JournalValue::Int64(1), JournalValue::Float64(100.0)],
                Some(12345),
                Some(1),
                5,
            )
            .await
            .unwrap();
        journal
            .log_statement(
                tx_id,
                "UPDATE accounts SET balance = balance - $1 WHERE id = $2".to_string(),
                vec![JournalValue::Float64(25.0), JournalValue::Int64(1)],
                Some(67890),
                Some(1),
                3,
            )
            .await
            .unwrap();

        let j = journal.get_journal(&tx_id).await.unwrap();
        assert_eq!(j.entries.len(), 3);
        assert!(j.has_mutations);

        // Verify statement types
        assert_eq!(j.entries[0].statement_type, StatementType::Transaction);
        assert_eq!(j.entries[1].statement_type, StatementType::Insert);
        assert_eq!(j.entries[2].statement_type, StatementType::Update);

        // Commit clears journal
        journal.commit_transaction(tx_id).await.unwrap();
        assert!(journal.get_journal(&tx_id).await.is_none());
    }

    /// Session migration works for PostgreSQL session parameters.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_pg_session_migration() {
        use session_migrate::*;

        let migrate = SessionMigrate::new();
        let session_id = uuid::Uuid::new_v4();
        let node = NodeId::new();

        let mut state =
            SessionState::new(session_id, "postgres".to_string(), "mydb".to_string(), node);

        // Set PostgreSQL-specific session parameters
        state.set_parameter("timezone".to_string(), "America/New_York".to_string());
        state.set_parameter("search_path".to_string(), "public, app_schema".to_string());
        state.set_parameter("statement_timeout".to_string(), "30000".to_string());
        state.set_parameter("work_mem".to_string(), "256MB".to_string());

        // Add a prepared statement
        state.add_prepared_statement(PreparedStatementInfo {
            name: "get_user".to_string(),
            query: "SELECT * FROM users WHERE id = $1".to_string(),
            param_types: vec!["integer".to_string()],
            created_at: chrono::Utc::now(),
        });

        migrate.register_session(state).await.unwrap();

        // Generate SET statements for replay on new primary
        let session = migrate.get_session(&session_id).await.unwrap();
        let restore_stmts = session.generate_restore_statements();

        assert!(restore_stmts.iter().any(|s| s.contains("America/New_York")));
        assert!(restore_stmts.iter().any(|s| s.contains("search_path")));
        assert!(restore_stmts
            .iter()
            .any(|s| s.contains("statement_timeout")));
        assert!(restore_stmts.iter().any(|s| s.contains("PREPARE get_user")));
    }

    /// Pipeline supports PostgreSQL extended query protocol pipelining.
    #[tokio::test]
    async fn test_pg_pipelining() {
        use pipeline::*;

        let pipeline = RequestPipeline::new(PipelineConfig {
            max_depth: 16,
            enabled: true,
            ..Default::default()
        });

        let conn_id = 1;

        // Simulate pipelined Parse/Bind/Execute sequence
        let t1 = pipeline
            .submit(conn_id, b"Parse: SELECT $1::int".to_vec())
            .unwrap();
        let t2 = pipeline.submit(conn_id, b"Bind: [42]".to_vec()).unwrap();
        let t3 = pipeline.submit(conn_id, b"Execute".to_vec()).unwrap();

        assert_eq!(pipeline.depth(conn_id), 3);

        // Complete in order (FIFO — matches PG protocol)
        pipeline.complete_next(conn_id, b"ParseComplete".to_vec(), true, None);
        pipeline.complete_next(conn_id, b"BindComplete".to_vec(), true, None);
        pipeline.complete_next(conn_id, b"DataRow: 42".to_vec(), true, None);

        assert_eq!(pipeline.depth(conn_id), 0);

        // Every ticket was completed above, so each one must resolve
        // successfully — not only the first.
        let r1 = t1.wait().await.unwrap();
        assert!(r1.success);
        let r2 = t2.wait().await.unwrap();
        assert!(r2.success);
        let r3 = t3.wait().await.unwrap();
        assert!(r3.success);
    }

    /// Batch INSERT works for PostgreSQL bulk inserts.
    #[tokio::test]
    async fn test_pg_batch_insert() {
        use batch::*;

        let config = BatchConfig {
            max_batch_size: 3,
            ..Default::default()
        };
        let batcher = std::sync::Arc::new(InsertBatcher::new(config));

        batcher
            .add(
                "orders".to_string(),
                vec!["id".to_string(), "total".to_string()],
                vec![vec!["1".to_string(), "99.99".to_string()]],
                "INSERT INTO orders (id, total) VALUES (1, 99.99)".to_string(),
            )
            .unwrap();

        assert_eq!(batcher.batch_size("orders"), 1);

        let stats = batcher.stats();
        assert_eq!(stats.inserts_received, 1);
        assert_eq!(stats.rows_received, 1);
    }
}