Skip to main content

heliosdb_proxy/
lib.rs

1//! HeliosDB Proxy - Standalone Connection Router
2//!
3//! A standalone proxy for HeliosDB-Lite providing:
4//! - Connection pooling
5//! - Load balancing (read/write splitting)
6//! - Health monitoring
7//! - Transaction Replay (TR)
8//!
9//! # Deployment Options
10//!
11//! - **Standalone binary**: Run as a separate process
12//! - **Kubernetes sidecar**: Deploy alongside your application
13//! - **Embedded library**: Use as a library in your application
14//!
15//! # Quick Start
16//!
17//! ```bash
18//! # Start with config file
19//! heliosdb-proxy --config /etc/heliosdb/proxy.toml
20//!
21//! # Start with command line options
22//! heliosdb-proxy \
23//!   --listen 0.0.0.0:5432 \
24//!   --primary db-primary:5432 \
25//!   --standby db-standby-1:5432 \
26//!   --standby db-standby-2:5432
27//! ```
28//!
29//! # Configuration Example
30//!
31//! ```toml
32//! [proxy]
33//! listen_address = "0.0.0.0:5432"
34//! admin_address = "0.0.0.0:9090"
35//!
36//! [pool]
37//! min_connections = 5
38//! max_connections = 100
39//! idle_timeout_secs = 300
40//!
41//! [load_balancer]
42//! strategy = "round_robin"  # or "least_connections", "latency_based"
43//! read_write_split = true
44//!
45//! [health]
46//! check_interval_secs = 5
47//! failure_threshold = 3
48//!
49//! [[nodes]]
50//! host = "db-primary"
51//! port = 5432
52//! role = "primary"
53//!
54//! [[nodes]]
55//! host = "db-standby-1"
56//! port = 5432
57//! role = "standby"
58//! ```
59
// ── Core modules (always available) ──────────────────────────────────
//
// Compiled unconditionally. Every group further down is gated behind the
// Cargo feature named in its `#[cfg(feature = "...")]` attribute and is
// absent from the crate when that feature is disabled.
pub mod backend;
pub mod client_tls;
pub mod auth_scram;
pub mod agent_contract;
pub mod mcp;
pub mod http_gateway;
pub mod mirror;
pub mod branch;
pub mod plugin_registry;
pub mod config;
pub mod server;
pub mod protocol;
pub mod admin;
pub mod connection_pool;
pub mod load_balancer;
pub mod health_checker;
pub mod failover_controller;
pub mod switchover_buffer;
pub mod primary_tracker;
pub mod pipeline;
pub mod batch;
pub mod request;

// ── Connection pooling modes (Session/Transaction/Statement) ─────────
// Requires feature `pool-modes`.
#[cfg(feature = "pool-modes")]
pub mod pool;

// ── TR (Transaction Replay) modules ─────────────────────────────────
// All five modules below require feature `ha-tr`.
#[cfg(feature = "ha-tr")]
pub mod transaction_journal;
#[cfg(feature = "ha-tr")]
pub mod failover_replay;
#[cfg(feature = "ha-tr")]
pub mod cursor_restore;
#[cfg(feature = "ha-tr")]
pub mod session_migrate;
#[cfg(feature = "ha-tr")]
pub mod replay;

// ── Zero-downtime PG major-version upgrade orchestrator (T2.1) ─────
// Also gated on `ha-tr`.
#[cfg(feature = "ha-tr")]
pub mod upgrade_orchestrator;

// ── R&D: shadow execution (T3.4) ────────────────────────────────────
// Also gated on `ha-tr`.
#[cfg(feature = "ha-tr")]
pub mod shadow_execute;

// ── Query caching (L1/L2/L3 multi-tier cache) ──────────────────────
#[cfg(feature = "query-cache")]
pub mod cache;

// ── Query routing hints ─────────────────────────────────────────────
#[cfg(feature = "routing-hints")]
pub mod routing;

// ── Replica lag-aware routing ───────────────────────────────────────
#[cfg(feature = "lag-routing")]
pub mod lag;

// ── Rate limiting and query throttling ──────────────────────────────
#[cfg(feature = "rate-limiting")]
pub mod rate_limit;

// ── Circuit breaker pattern ─────────────────────────────────────────
#[cfg(feature = "circuit-breaker")]
pub mod circuit_breaker;

// ── Query analytics and slow query log ──────────────────────────────
#[cfg(feature = "query-analytics")]
pub mod analytics;

// ── Anomaly detection (T3.1) — rate spikes, credential stuffing,
// SQL injection heuristics, novel query shapes ─────────────────────
#[cfg(feature = "anomaly-detection")]
pub mod anomaly;

// ── Edge / geo proxy mode (T3.2) ───────────────────────────────────
#[cfg(feature = "edge-proxy")]
pub mod edge;

// ── Multi-tenancy support ───────────────────────────────────────────
#[cfg(feature = "multi-tenancy")]
pub mod multi_tenancy;

// ── Authentication proxy ────────────────────────────────────────────
#[cfg(feature = "auth-proxy")]
pub mod auth;

// ── Query rewriting ─────────────────────────────────────────────────
#[cfg(feature = "query-rewriting")]
pub mod rewriter;

// ── WASM plugin system ──────────────────────────────────────────────
#[cfg(feature = "wasm-plugins")]
pub mod plugins;

// ── GraphQL-to-SQL gateway ──────────────────────────────────────────
#[cfg(feature = "graphql-gateway")]
pub mod graphql;

// ── Schema-aware routing ────────────────────────────────────────────
#[cfg(feature = "schema-routing")]
pub mod schema_routing;

// ── Distributed intelligent caching (DistribCache) ──────────────────
#[cfg(feature = "distribcache")]
pub mod distribcache;

// ── Embedded skill-bundle deployer ──────────────────────────────────
//
// Always-on: the `heliosdb-proxy install skills` subcommand calls
// into this. Adds ~80 KiB to the binary (the `.claude/skills/`
// bundle, embedded by `include_dir!`).
pub mod skills;
175
176use thiserror::Error;
177use uuid::Uuid;
178
179/// Proxy error types
180#[derive(Debug, Error)]
181pub enum ProxyError {
182    #[error("Configuration error: {0}")]
183    Config(String),
184
185    #[error("Network error: {0}")]
186    Network(String),
187
188    #[error("Connection error: {0}")]
189    Connection(String),
190
191    #[error("Protocol error: {0}")]
192    Protocol(String),
193
194    #[error("Pool error: {0}")]
195    Pool(String),
196
197    #[error("Health check error: {0}")]
198    HealthCheck(String),
199
200    #[error("Failover error: {0}")]
201    Failover(String),
202
203    #[error("Failover failed: {0}")]
204    FailoverFailed(String),
205
206    #[error("Transaction replay failed: {0}")]
207    ReplayFailed(String),
208
209    #[error("Session migration failed: {0}")]
210    SessionMigration(String),
211
212    #[error("Cursor restore failed: {0}")]
213    CursorRestore(String),
214
215    #[error("Routing error: {0}")]
216    Routing(String),
217
218    #[error("Authentication error: {0}")]
219    Auth(String),
220
221    #[error("Pool exhausted: {0}")]
222    PoolExhausted(String),
223
224    #[error("Timeout: {0}")]
225    Timeout(String),
226
227    #[error("Configuration error: {0}")]
228    Configuration(String),
229
230    #[error("No healthy nodes available")]
231    NoHealthyNodes,
232
233    #[error("IO error: {0}")]
234    Io(#[from] std::io::Error),
235
236    #[error("JSON error: {0}")]
237    Json(#[from] serde_json::Error),
238
239    #[error("Internal error: {0}")]
240    Internal(String),
241}
242
243pub type Result<T> = std::result::Result<T, ProxyError>;
244
/// Proxy version string.
///
/// Captured at compile time from Cargo's `CARGO_PKG_VERSION` environment
/// variable, i.e. the `version` field of this package's `Cargo.toml`.
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
247
/// Port the proxy listens on for client connections when none is configured.
///
/// Matches the conventional PostgreSQL port, so clients can point at the
/// proxy exactly as they would at a database server.
pub const DEFAULT_PORT: u16 = 5_432;

/// Port for the admin endpoint when none is configured.
pub const DEFAULT_ADMIN_PORT: u16 = 9_090;
253
254/// Node identifier
255#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
256pub struct NodeId(pub Uuid);
257
258impl NodeId {
259    pub fn new() -> Self {
260        Self(Uuid::new_v4())
261    }
262}
263
264impl Default for NodeId {
265    fn default() -> Self {
266        Self::new()
267    }
268}
269
/// Role a node plays in the cluster.
///
/// Of the known roles, only [`NodeRole::Primary`] accepts writes;
/// [`NodeRole::Standby`] and [`NodeRole::ReadReplica`] are read-only and
/// differ in whether they may be promoted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeRole {
    /// The writable node.
    Primary,
    /// Read-only node that is eligible for promotion to primary.
    Standby,
    /// Read-only node that is never promoted.
    ReadReplica,
    /// Role not yet determined, e.g. while the node is still being discovered.
    Unknown,
}
282
283/// Node endpoint information
284#[derive(Debug, Clone)]
285pub struct NodeEndpoint {
286    /// Node identifier
287    pub id: NodeId,
288    /// Host address
289    pub host: String,
290    /// Port
291    pub port: u16,
292    /// Node role
293    pub role: NodeRole,
294    /// Weight for load balancing (higher = more traffic)
295    pub weight: u32,
296    /// Whether this node is enabled
297    pub enabled: bool,
298}
299
300impl NodeEndpoint {
301    pub fn new(host: impl Into<String>, port: u16) -> Self {
302        Self {
303            id: NodeId::new(),
304            host: host.into(),
305            port,
306            role: NodeRole::Unknown,
307            weight: 100,
308            enabled: true,
309        }
310    }
311
312    pub fn with_role(mut self, role: NodeRole) -> Self {
313        self.role = role;
314        self
315    }
316
317    pub fn with_weight(mut self, weight: u32) -> Self {
318        self.weight = weight;
319        self
320    }
321
322    pub fn address(&self) -> String {
323        format!("{}:{}", self.host, self.port)
324    }
325}
326
// ── PostgreSQL compatibility integration test ───────────────────────
//
// Verifies that the core modules (plus the `ha-tr` modules when that
// feature is enabled) can be instantiated and used in a PostgreSQL-only
// context (no HeliosDB dependencies required).

#[cfg(test)]
mod postgresql_compat_tests {
    use super::*;

    /// All core types work for PostgreSQL endpoints.
    #[test]
    fn test_pg_node_endpoints() {
        let primary = NodeEndpoint::new("pg-primary.example.com", 5432)
            .with_role(NodeRole::Primary);
        let standby = NodeEndpoint::new("pg-standby-1.example.com", 5432)
            .with_role(NodeRole::Standby);
        let replica = NodeEndpoint::new("pg-replica-1.example.com", 5433)
            .with_role(NodeRole::ReadReplica)
            .with_weight(50);

        assert_eq!(primary.role, NodeRole::Primary);
        assert_eq!(standby.role, NodeRole::Standby);
        assert_eq!(replica.weight, 50);
        assert_eq!(replica.address(), "pg-replica-1.example.com:5433");
    }

    /// Load balancer config works for PostgreSQL cluster with read/write splitting.
    #[test]
    fn test_pg_load_balancer_config() {
        use load_balancer::*;

        let config = LoadBalancerConfig {
            read_write_split: true,
            read_strategy: RoutingStrategy::RoundRobin,
            write_strategy: RoutingStrategy::PrimaryOnly,
            ..Default::default()
        };

        assert!(config.read_write_split);
        assert_eq!(config.read_strategy, RoutingStrategy::RoundRobin);
        assert_eq!(config.write_strategy, RoutingStrategy::PrimaryOnly);

        // Verify the LB can be constructed
        let _lb = LoadBalancer::new(config);
    }

    /// Health checker works with standard PostgreSQL `SELECT 1` checks.
    #[test]
    fn test_pg_health_config() {
        use health_checker::*;

        let config = HealthConfig {
            check_query: "SELECT 1".to_string(),
            detailed_checks: true,
            ..Default::default()
        };

        assert_eq!(config.check_query, "SELECT 1");
        assert!(config.detailed_checks);
    }

    /// Failover controller works for PostgreSQL streaming replication.
    #[tokio::test]
    async fn test_pg_failover() {
        use failover_controller::*;

        let controller = FailoverController::new(FailoverConfig {
            auto_failover: true,
            prefer_sync_standby: true,
            ..Default::default()
        });

        let primary = NodeId::new();
        controller.set_primary(primary).await;
        assert_eq!(controller.get_primary().await, Some(primary));

        // Register candidates
        let sync_standby = NodeId::new();
        controller.register_candidate(FailoverCandidate {
            node_id: sync_standby,
            endpoint: NodeEndpoint::new("pg-sync", 5432).with_role(NodeRole::Standby),
            is_sync: true,
            lag_bytes: 0,
            priority: 1,
            last_heartbeat: None,
        }).await;

        let async_standby = NodeId::new();
        controller.register_candidate(FailoverCandidate {
            node_id: async_standby,
            endpoint: NodeEndpoint::new("pg-async", 5432).with_role(NodeRole::Standby),
            is_sync: false,
            lag_bytes: 1024,
            priority: 2,
            last_heartbeat: None,
        }).await;

        // Registering candidates alone must not trigger a failover.
        assert_eq!(controller.state().await, FailoverState::Normal);
        assert_eq!(controller.failover_count(), 0);
    }

    /// Connection pool config works for PostgreSQL connections.
    #[test]
    fn test_pg_connection_pool() {
        use connection_pool::*;

        let config = PoolConfig {
            min_connections: 2,
            max_connections: 20,
            test_on_acquire: true,
            ..Default::default()
        };

        assert_eq!(config.min_connections, 2);
        assert_eq!(config.max_connections, 20);
        assert!(config.test_on_acquire);

        let _pool = ConnectionPool::new(config);
    }

    /// Switchover buffer works for PostgreSQL planned switchover.
    #[tokio::test]
    async fn test_pg_switchover_buffer() {
        use switchover_buffer::*;

        let buffer = SwitchoverBuffer::new(BufferConfig {
            buffer_timeout: std::time::Duration::from_secs(5),
            max_buffered_queries: 1000,
            ..Default::default()
        });

        assert_eq!(buffer.state(), BufferState::Passthrough);
        assert!(!buffer.is_buffering());

        // Simulate pg_ctl promote workflow
        buffer.start_buffering();
        assert!(buffer.is_buffering());

        let rx = buffer.buffer_query(
            "INSERT INTO orders VALUES (1)".to_string(),
            vec![],
            1,
        ).unwrap();

        // Simulate promotion complete
        buffer.stop_buffering();
        buffer.drain(|_sql, _params| async { Ok(()) }).await;

        let result = rx.await.unwrap();
        assert!(matches!(result, BufferResult::Success));
    }

    /// Primary tracker works with the standalone mode for PostgreSQL.
    #[test]
    fn test_pg_primary_tracker_standalone() {
        use primary_tracker::*;

        let tracker = PrimaryTracker::new_standalone();

        // Simulate discovering primary via pg_is_in_recovery()
        let pg_primary = uuid::Uuid::new_v4();
        tracker.set_primary(pg_primary, "pg-primary.local:5432".to_string());
        tracker.confirm_primary();

        assert!(tracker.has_primary());
        assert!(tracker.get_primary().unwrap().is_confirmed);

        // Simulate failover detected
        tracker.clear_primary();
        assert!(!tracker.has_primary());

        // New primary
        let pg_new_primary = uuid::Uuid::new_v4();
        tracker.set_primary(pg_new_primary, "pg-standby.local:5432".to_string());
        tracker.confirm_primary();
        assert_eq!(
            tracker.get_primary_address(),
            Some("pg-standby.local:5432".to_string())
        );
    }

    /// Transaction journal works for PostgreSQL transaction replay.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_pg_transaction_replay() {
        use transaction_journal::*;

        let journal = TransactionJournal::new();
        let tx_id = uuid::Uuid::new_v4();
        let session_id = uuid::Uuid::new_v4();
        let node = NodeId::new();

        // Journal a PostgreSQL transaction
        journal.begin_transaction(tx_id, session_id, node, 0).await.unwrap();
        journal.log_statement(
            tx_id,
            "BEGIN".to_string(),
            vec![],
            None,
            None,
            1,
        ).await.unwrap();
        journal.log_statement(
            tx_id,
            "INSERT INTO accounts (id, balance) VALUES ($1, $2)".to_string(),
            vec![JournalValue::Int64(1), JournalValue::Float64(100.0)],
            Some(12345),
            Some(1),
            5,
        ).await.unwrap();
        journal.log_statement(
            tx_id,
            "UPDATE accounts SET balance = balance - $1 WHERE id = $2".to_string(),
            vec![JournalValue::Float64(25.0), JournalValue::Int64(1)],
            Some(67890),
            Some(1),
            3,
        ).await.unwrap();

        let j = journal.get_journal(&tx_id).await.unwrap();
        assert_eq!(j.entries.len(), 3);
        assert!(j.has_mutations);

        // Verify statement types
        assert_eq!(j.entries[0].statement_type, StatementType::Transaction);
        assert_eq!(j.entries[1].statement_type, StatementType::Insert);
        assert_eq!(j.entries[2].statement_type, StatementType::Update);

        // Commit clears journal
        journal.commit_transaction(tx_id).await.unwrap();
        assert!(journal.get_journal(&tx_id).await.is_none());
    }

    /// Session migration works for PostgreSQL session parameters.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_pg_session_migration() {
        use session_migrate::*;

        let migrate = SessionMigrate::new();
        let session_id = uuid::Uuid::new_v4();
        let node = NodeId::new();

        let mut state = SessionState::new(
            session_id,
            "postgres".to_string(),
            "mydb".to_string(),
            node,
        );

        // Set PostgreSQL-specific session parameters
        state.set_parameter("timezone".to_string(), "America/New_York".to_string());
        state.set_parameter("search_path".to_string(), "public, app_schema".to_string());
        state.set_parameter("statement_timeout".to_string(), "30000".to_string());
        state.set_parameter("work_mem".to_string(), "256MB".to_string());

        // Add a prepared statement
        state.add_prepared_statement(PreparedStatementInfo {
            name: "get_user".to_string(),
            query: "SELECT * FROM users WHERE id = $1".to_string(),
            param_types: vec!["integer".to_string()],
            created_at: chrono::Utc::now(),
        });

        migrate.register_session(state).await.unwrap();

        // Generate SET statements for replay on new primary
        let session = migrate.get_session(&session_id).await.unwrap();
        let restore_stmts = session.generate_restore_statements();

        assert!(restore_stmts.iter().any(|s| s.contains("America/New_York")));
        assert!(restore_stmts.iter().any(|s| s.contains("search_path")));
        assert!(restore_stmts.iter().any(|s| s.contains("statement_timeout")));
        assert!(restore_stmts.iter().any(|s| s.contains("PREPARE get_user")));
    }

    /// Pipeline supports PostgreSQL extended query protocol pipelining.
    #[tokio::test]
    async fn test_pg_pipelining() {
        use pipeline::*;

        let pipeline = RequestPipeline::new(PipelineConfig {
            max_depth: 16,
            enabled: true,
            ..Default::default()
        });

        let conn_id = 1;

        // Simulate pipelined Parse/Bind/Execute sequence
        let t1 = pipeline.submit(conn_id, b"Parse: SELECT $1::int".to_vec()).unwrap();
        let t2 = pipeline.submit(conn_id, b"Bind: [42]".to_vec()).unwrap();
        let t3 = pipeline.submit(conn_id, b"Execute".to_vec()).unwrap();

        assert_eq!(pipeline.depth(conn_id), 3);

        // Complete in order (FIFO — matches PG protocol)
        pipeline.complete_next(conn_id, b"ParseComplete".to_vec(), true, None);
        pipeline.complete_next(conn_id, b"BindComplete".to_vec(), true, None);
        pipeline.complete_next(conn_id, b"DataRow: 42".to_vec(), true, None);

        assert_eq!(pipeline.depth(conn_id), 0);

        // Every submitted request must be resolved. Awaiting all three
        // tickets (not just the first) checks that each FIFO completion
        // reached its own waiter, and avoids unused `t2`/`t3` bindings.
        let r1 = t1.wait().await.unwrap();
        let r2 = t2.wait().await.unwrap();
        let r3 = t3.wait().await.unwrap();
        assert!(r1.success);
        assert!(r2.success);
        assert!(r3.success);
    }

    /// Batch INSERT works for PostgreSQL bulk inserts.
    #[tokio::test]
    async fn test_pg_batch_insert() {
        use batch::*;

        let config = BatchConfig {
            max_batch_size: 3,
            ..Default::default()
        };
        let batcher = InsertBatcher::new(config);

        batcher.add(
            "orders".to_string(),
            vec!["id".to_string(), "total".to_string()],
            vec![vec!["1".to_string(), "99.99".to_string()]],
            "INSERT INTO orders (id, total) VALUES (1, 99.99)".to_string(),
        ).unwrap();

        // One row is below `max_batch_size`, so it stays buffered.
        assert_eq!(batcher.batch_size("orders"), 1);

        let stats = batcher.stats();
        assert_eq!(stats.inserts_received, 1);
        assert_eq!(stats.rows_received, 1);
    }
}