Skip to main content

heliosdb_proxy/
lib.rs

1//! HeliosDB Proxy - Standalone Connection Router
2//!
3//! A standalone proxy for HeliosDB-Lite providing:
4//! - Connection pooling
5//! - Load balancing (read/write splitting)
6//! - Health monitoring
7//! - Transaction Replay (TR)
8//!
9//! # Deployment Options
10//!
11//! - **Standalone binary**: Run as a separate process
12//! - **Kubernetes sidecar**: Deploy alongside your application
13//! - **Embedded library**: Use as a library in your application
14//!
15//! # Quick Start
16//!
17//! ```bash
18//! # Start with config file
19//! heliosdb-proxy --config /etc/heliosdb/proxy.toml
20//!
21//! # Start with command line options
22//! heliosdb-proxy \
23//!   --listen 0.0.0.0:5432 \
24//!   --primary db-primary:5432 \
25//!   --standby db-standby-1:5432 \
26//!   --standby db-standby-2:5432
27//! ```
28//!
29//! # Configuration Example
30//!
31//! ```toml
32//! [proxy]
33//! listen_address = "0.0.0.0:5432"
34//! admin_address = "0.0.0.0:9090"
35//!
36//! [pool]
37//! min_connections = 5
38//! max_connections = 100
39//! idle_timeout_secs = 300
40//!
41//! [load_balancer]
42//! strategy = "round_robin"  # or "least_connections", "latency_based"
43//! read_write_split = true
44//!
45//! [health]
46//! check_interval_secs = 5
47//! failure_threshold = 3
48//!
49//! [[nodes]]
50//! host = "db-primary"
51//! port = 5432
52//! role = "primary"
53//!
54//! [[nodes]]
55//! host = "db-standby-1"
56//! port = 5432
57//! role = "standby"
58//! ```
59
60// ── Core modules (always available) ──────────────────────────────────
61pub mod backend;
62pub mod config;
63pub mod server;
64pub mod protocol;
65pub mod admin;
66pub mod connection_pool;
67pub mod load_balancer;
68pub mod health_checker;
69pub mod failover_controller;
70pub mod switchover_buffer;
71pub mod primary_tracker;
72pub mod pipeline;
73pub mod batch;
74pub mod request;
75
76// ── Connection pooling modes (Session/Transaction/Statement) ─────────
77#[cfg(feature = "pool-modes")]
78pub mod pool;
79
80// ── TR (Transaction Replay) modules ─────────────────────────────────
81#[cfg(feature = "ha-tr")]
82pub mod transaction_journal;
83#[cfg(feature = "ha-tr")]
84pub mod failover_replay;
85#[cfg(feature = "ha-tr")]
86pub mod cursor_restore;
87#[cfg(feature = "ha-tr")]
88pub mod session_migrate;
89#[cfg(feature = "ha-tr")]
90pub mod replay;
91
92// ── Zero-downtime PG major-version upgrade orchestrator (T2.1) ─────
93#[cfg(feature = "ha-tr")]
94pub mod upgrade_orchestrator;
95
96// ── R&D: shadow execution (T3.4) ────────────────────────────────────
97#[cfg(feature = "ha-tr")]
98pub mod shadow_execute;
99
100// ── Query caching (L1/L2/L3 multi-tier cache) ──────────────────────
101#[cfg(feature = "query-cache")]
102pub mod cache;
103
104// ── Query routing hints ─────────────────────────────────────────────
105#[cfg(feature = "routing-hints")]
106pub mod routing;
107
108// ── Replica lag-aware routing ───────────────────────────────────────
109#[cfg(feature = "lag-routing")]
110pub mod lag;
111
112// ── Rate limiting and query throttling ──────────────────────────────
113#[cfg(feature = "rate-limiting")]
114pub mod rate_limit;
115
116// ── Circuit breaker pattern ─────────────────────────────────────────
117#[cfg(feature = "circuit-breaker")]
118pub mod circuit_breaker;
119
120// ── Query analytics and slow query log ──────────────────────────────
121#[cfg(feature = "query-analytics")]
122pub mod analytics;
123
124// ── Anomaly detection (T3.1) — rate spikes, credential stuffing,
125// SQL injection heuristics, novel query shapes ─────────────────────
126#[cfg(feature = "anomaly-detection")]
127pub mod anomaly;
128
129// ── Edge / geo proxy mode (T3.2) ───────────────────────────────────
130#[cfg(feature = "edge-proxy")]
131pub mod edge;
132
133// ── Multi-tenancy support ───────────────────────────────────────────
134#[cfg(feature = "multi-tenancy")]
135pub mod multi_tenancy;
136
137// ── Authentication proxy ────────────────────────────────────────────
138#[cfg(feature = "auth-proxy")]
139pub mod auth;
140
141// ── Query rewriting ─────────────────────────────────────────────────
142#[cfg(feature = "query-rewriting")]
143pub mod rewriter;
144
145// ── WASM plugin system ──────────────────────────────────────────────
146#[cfg(feature = "wasm-plugins")]
147pub mod plugins;
148
149// ── GraphQL-to-SQL gateway ──────────────────────────────────────────
150#[cfg(feature = "graphql-gateway")]
151pub mod graphql;
152
153// ── Schema-aware routing ────────────────────────────────────────────
154#[cfg(feature = "schema-routing")]
155pub mod schema_routing;
156
157// ── Distributed intelligent caching (DistribCache) ──────────────────
158#[cfg(feature = "distribcache")]
159pub mod distribcache;
160
161// ── Embedded skill-bundle deployer ──────────────────────────────────
162//
163// Always-on: the `heliosdb-proxy install skills` subcommand calls
164// into this. Adds ~80 KiB to the binary (the `.claude/skills/`
165// bundle, embedded by `include_dir!`).
166pub mod skills;
167
168use thiserror::Error;
169use uuid::Uuid;
170
/// Proxy error types
///
/// Every variant renders through the `#[error(...)]` format string attached
/// to it. The `String`-carrying variants embed their payload verbatim after
/// a fixed prefix; `Io` and `Json` wrap the underlying error and, because of
/// `#[from]`, can be produced from it with the `?` operator.
#[derive(Debug, Error)]
pub enum ProxyError {
    /// Configuration problem; displayed as `Configuration error: <message>`.
    #[error("Configuration error: {0}")]
    Config(String),

    /// Network-level problem; displayed as `Network error: <message>`.
    #[error("Network error: {0}")]
    Network(String),

    /// Connection-level problem; displayed as `Connection error: <message>`.
    #[error("Connection error: {0}")]
    Connection(String),

    /// Wire-protocol problem; displayed as `Protocol error: <message>`.
    #[error("Protocol error: {0}")]
    Protocol(String),

    /// Connection-pool problem; displayed as `Pool error: <message>`.
    #[error("Pool error: {0}")]
    Pool(String),

    /// Health-check problem; displayed as `Health check error: <message>`.
    #[error("Health check error: {0}")]
    HealthCheck(String),

    /// Failover-related problem; displayed as `Failover error: <message>`.
    #[error("Failover error: {0}")]
    Failover(String),

    /// Displayed as `Failover failed: <message>`.
    // NOTE(review): presumably a failover that was attempted and did not
    // complete, as opposed to the more general `Failover` — confirm at the
    // call sites.
    #[error("Failover failed: {0}")]
    FailoverFailed(String),

    /// Displayed as `Transaction replay failed: <message>`.
    #[error("Transaction replay failed: {0}")]
    ReplayFailed(String),

    /// Displayed as `Session migration failed: <message>`.
    #[error("Session migration failed: {0}")]
    SessionMigration(String),

    /// Displayed as `Cursor restore failed: <message>`.
    #[error("Cursor restore failed: {0}")]
    CursorRestore(String),

    /// Routing problem; displayed as `Routing error: <message>`.
    #[error("Routing error: {0}")]
    Routing(String),

    /// Authentication problem; displayed as `Authentication error: <message>`.
    #[error("Authentication error: {0}")]
    Auth(String),

    /// Displayed as `Pool exhausted: <message>`.
    #[error("Pool exhausted: {0}")]
    PoolExhausted(String),

    /// Displayed as `Timeout: <message>`.
    #[error("Timeout: {0}")]
    Timeout(String),

    /// Displayed as `Configuration error: <message>`.
    // NOTE(review): renders exactly the same text as `Config`, so the two
    // variants cannot be told apart from the message alone — confirm whether
    // both are still needed.
    #[error("Configuration error: {0}")]
    Configuration(String),

    /// No payload; displayed as `No healthy nodes available`.
    #[error("No healthy nodes available")]
    NoHealthyNodes,

    /// Wraps a `std::io::Error`; displayed as `IO error: <source>`.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Wraps a `serde_json::Error`; displayed as `JSON error: <source>`.
    #[error("JSON error: {0}")]
    Json(#[from] serde_json::Error),

    /// Catch-all variant; displayed as `Internal error: <message>`.
    #[error("Internal error: {0}")]
    Internal(String),
}
234
/// Crate-wide result alias whose error type is fixed to [`ProxyError`].
pub type Result<T> = std::result::Result<T, ProxyError>;

/// Proxy version, captured from `CARGO_PKG_VERSION` at compile time.
pub const VERSION: &str = env!("CARGO_PKG_VERSION");

/// Default listen port (the same port the crate-level configuration
/// example uses for `listen_address`).
pub const DEFAULT_PORT: u16 = 5432;

/// Default admin port (the same port the crate-level configuration
/// example uses for `admin_address`).
pub const DEFAULT_ADMIN_PORT: u16 = 9090;
245
246/// Node identifier
247#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
248pub struct NodeId(pub Uuid);
249
250impl NodeId {
251    pub fn new() -> Self {
252        Self(Uuid::new_v4())
253    }
254}
255
256impl Default for NodeId {
257    fn default() -> Self {
258        Self::new()
259    }
260}
261
/// Node role in the cluster
///
/// A plain `Copy` enum compared by equality; it carries no payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeRole {
    /// Primary node (accepts writes)
    Primary,
    /// Standby node (read-only, can be promoted)
    Standby,
    /// Read replica (read-only, cannot be promoted)
    ReadReplica,
    /// Unknown role (during discovery). This is the role `NodeEndpoint::new`
    /// assigns until `with_role` overrides it.
    Unknown,
}
274
275/// Node endpoint information
276#[derive(Debug, Clone)]
277pub struct NodeEndpoint {
278    /// Node identifier
279    pub id: NodeId,
280    /// Host address
281    pub host: String,
282    /// Port
283    pub port: u16,
284    /// Node role
285    pub role: NodeRole,
286    /// Weight for load balancing (higher = more traffic)
287    pub weight: u32,
288    /// Whether this node is enabled
289    pub enabled: bool,
290}
291
292impl NodeEndpoint {
293    pub fn new(host: impl Into<String>, port: u16) -> Self {
294        Self {
295            id: NodeId::new(),
296            host: host.into(),
297            port,
298            role: NodeRole::Unknown,
299            weight: 100,
300            enabled: true,
301        }
302    }
303
304    pub fn with_role(mut self, role: NodeRole) -> Self {
305        self.role = role;
306        self
307    }
308
309    pub fn with_weight(mut self, weight: u32) -> Self {
310        self.weight = weight;
311        self
312    }
313
314    pub fn address(&self) -> String {
315        format!("{}:{}", self.host, self.port)
316    }
317}
318
// ── PostgreSQL compatibility integration tests ──────────────────────
//
// Verifies that the core modules (and, with the `ha-tr` feature, the
// transaction-journal and session-migration modules) can be instantiated
// and used in a PostgreSQL-only context (no HeliosDB dependencies required).
323
#[cfg(test)]
mod postgresql_compat_tests {
    //! Smoke tests that drive the proxy's building blocks with
    //! PostgreSQL-style hosts, queries and session parameters.
    //!
    //! Tests gated on `ha-tr` only compile when that feature is enabled.

    use super::*;

    /// All core types work for PostgreSQL endpoints.
    #[test]
    fn test_pg_node_endpoints() {
        let primary = NodeEndpoint::new("pg-primary.example.com", 5432)
            .with_role(NodeRole::Primary);
        let standby = NodeEndpoint::new("pg-standby-1.example.com", 5432)
            .with_role(NodeRole::Standby);
        let replica = NodeEndpoint::new("pg-replica-1.example.com", 5433)
            .with_role(NodeRole::ReadReplica)
            .with_weight(50);

        assert_eq!(primary.role, NodeRole::Primary);
        assert_eq!(standby.role, NodeRole::Standby);
        assert_eq!(replica.role, NodeRole::ReadReplica);
        assert_eq!(replica.weight, 50);
        assert_eq!(replica.address(), "pg-replica-1.example.com:5433");

        // Defaults set by `NodeEndpoint::new` survive the builder calls.
        assert_eq!(primary.weight, 100);
        assert!(primary.enabled);
        assert_eq!(primary.address(), "pg-primary.example.com:5432");
    }

    /// Load balancer config works for PostgreSQL cluster with read/write splitting.
    #[test]
    fn test_pg_load_balancer_config() {
        use load_balancer::*;

        let config = LoadBalancerConfig {
            read_write_split: true,
            read_strategy: RoutingStrategy::RoundRobin,
            write_strategy: RoutingStrategy::PrimaryOnly,
            ..Default::default()
        };

        assert!(config.read_write_split);
        assert_eq!(config.read_strategy, RoutingStrategy::RoundRobin);
        assert_eq!(config.write_strategy, RoutingStrategy::PrimaryOnly);

        // Verify the LB can be constructed
        let _lb = LoadBalancer::new(config);
    }

    /// Health checker works with standard PostgreSQL `SELECT 1` checks.
    #[test]
    fn test_pg_health_config() {
        use health_checker::*;

        let config = HealthConfig {
            check_query: "SELECT 1".to_string(),
            detailed_checks: true,
            ..Default::default()
        };

        assert_eq!(config.check_query, "SELECT 1");
        assert!(config.detailed_checks);
    }

    /// Failover controller works for PostgreSQL streaming replication.
    #[tokio::test]
    async fn test_pg_failover() {
        use failover_controller::*;

        let controller = FailoverController::new(FailoverConfig {
            auto_failover: true,
            prefer_sync_standby: true,
            ..Default::default()
        });

        let primary = NodeId::new();
        controller.set_primary(primary).await;
        assert_eq!(controller.get_primary().await, Some(primary));

        // Register candidates
        let sync_standby = NodeId::new();
        controller.register_candidate(FailoverCandidate {
            node_id: sync_standby,
            endpoint: NodeEndpoint::new("pg-sync", 5432).with_role(NodeRole::Standby),
            is_sync: true,
            lag_bytes: 0,
            priority: 1,
            last_heartbeat: None,
        }).await;

        let async_standby = NodeId::new();
        controller.register_candidate(FailoverCandidate {
            node_id: async_standby,
            endpoint: NodeEndpoint::new("pg-async", 5432).with_role(NodeRole::Standby),
            is_sync: false,
            lag_bytes: 1024,
            priority: 2,
            last_heartbeat: None,
        }).await;

        // Verify state: registering candidates must not trigger a failover.
        assert_eq!(controller.state().await, FailoverState::Normal);
        assert_eq!(controller.failover_count(), 0);
    }

    /// Connection pool config works for PostgreSQL connections.
    #[test]
    fn test_pg_connection_pool() {
        use connection_pool::*;

        let config = PoolConfig {
            min_connections: 2,
            max_connections: 20,
            test_on_acquire: true,
            ..Default::default()
        };

        assert_eq!(config.min_connections, 2);
        assert_eq!(config.max_connections, 20);
        assert!(config.test_on_acquire);

        let _pool = ConnectionPool::new(config);
    }

    /// Switchover buffer works for PostgreSQL planned switchover.
    #[tokio::test]
    async fn test_pg_switchover_buffer() {
        use switchover_buffer::*;

        let buffer = SwitchoverBuffer::new(BufferConfig {
            buffer_timeout: std::time::Duration::from_secs(5),
            max_buffered_queries: 1000,
            ..Default::default()
        });

        assert_eq!(buffer.state(), BufferState::Passthrough);
        assert!(!buffer.is_buffering());

        // Simulate pg_ctl promote workflow
        buffer.start_buffering();
        assert!(buffer.is_buffering());

        let rx = buffer.buffer_query(
            "INSERT INTO orders VALUES (1)".to_string(),
            vec![],
            1,
        ).unwrap();

        // Simulate promotion complete
        buffer.stop_buffering();
        buffer.drain(|_sql, _params| async { Ok(()) }).await;

        let result = rx.await.unwrap();
        assert!(matches!(result, BufferResult::Success));
    }

    /// Primary tracker works with the standalone mode for PostgreSQL.
    #[test]
    fn test_pg_primary_tracker_standalone() {
        use primary_tracker::*;

        let tracker = PrimaryTracker::new_standalone();

        // Simulate discovering primary via pg_is_in_recovery()
        let pg_primary = uuid::Uuid::new_v4();
        tracker.set_primary(pg_primary, "pg-primary.local:5432".to_string());
        tracker.confirm_primary();

        assert!(tracker.has_primary());
        assert!(tracker.get_primary().unwrap().is_confirmed);

        // Simulate failover detected
        tracker.clear_primary();
        assert!(!tracker.has_primary());

        // New primary
        let pg_new_primary = uuid::Uuid::new_v4();
        tracker.set_primary(pg_new_primary, "pg-standby.local:5432".to_string());
        tracker.confirm_primary();
        assert_eq!(
            tracker.get_primary_address(),
            Some("pg-standby.local:5432".to_string())
        );
    }

    /// Transaction journal works for PostgreSQL transaction replay.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_pg_transaction_replay() {
        use transaction_journal::*;

        let journal = TransactionJournal::new();
        let tx_id = uuid::Uuid::new_v4();
        let session_id = uuid::Uuid::new_v4();
        let node = NodeId::new();

        // Journal a PostgreSQL transaction
        journal.begin_transaction(tx_id, session_id, node, 0).await.unwrap();
        journal.log_statement(
            tx_id,
            "BEGIN".to_string(),
            vec![],
            None,
            None,
            1,
        ).await.unwrap();
        journal.log_statement(
            tx_id,
            "INSERT INTO accounts (id, balance) VALUES ($1, $2)".to_string(),
            vec![JournalValue::Int64(1), JournalValue::Float64(100.0)],
            Some(12345),
            Some(1),
            5,
        ).await.unwrap();
        journal.log_statement(
            tx_id,
            "UPDATE accounts SET balance = balance - $1 WHERE id = $2".to_string(),
            vec![JournalValue::Float64(25.0), JournalValue::Int64(1)],
            Some(67890),
            Some(1),
            3,
        ).await.unwrap();

        let j = journal.get_journal(&tx_id).await.unwrap();
        assert_eq!(j.entries.len(), 3);
        assert!(j.has_mutations);

        // Verify statement types
        assert_eq!(j.entries[0].statement_type, StatementType::Transaction);
        assert_eq!(j.entries[1].statement_type, StatementType::Insert);
        assert_eq!(j.entries[2].statement_type, StatementType::Update);

        // Commit clears journal
        journal.commit_transaction(tx_id).await.unwrap();
        assert!(journal.get_journal(&tx_id).await.is_none());
    }

    /// Session migration works for PostgreSQL session parameters.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_pg_session_migration() {
        use session_migrate::*;

        let migrate = SessionMigrate::new();
        let session_id = uuid::Uuid::new_v4();
        let node = NodeId::new();

        let mut state = SessionState::new(
            session_id,
            "postgres".to_string(),
            "mydb".to_string(),
            node,
        );

        // Set PostgreSQL-specific session parameters
        state.set_parameter("timezone".to_string(), "America/New_York".to_string());
        state.set_parameter("search_path".to_string(), "public, app_schema".to_string());
        state.set_parameter("statement_timeout".to_string(), "30000".to_string());
        state.set_parameter("work_mem".to_string(), "256MB".to_string());

        // Add a prepared statement
        state.add_prepared_statement(PreparedStatementInfo {
            name: "get_user".to_string(),
            query: "SELECT * FROM users WHERE id = $1".to_string(),
            param_types: vec!["integer".to_string()],
            created_at: chrono::Utc::now(),
        });

        migrate.register_session(state).await.unwrap();

        // Generate SET statements for replay on new primary
        let session = migrate.get_session(&session_id).await.unwrap();
        let restore_stmts = session.generate_restore_statements();

        assert!(restore_stmts.iter().any(|s| s.contains("America/New_York")));
        assert!(restore_stmts.iter().any(|s| s.contains("search_path")));
        assert!(restore_stmts.iter().any(|s| s.contains("statement_timeout")));
        assert!(restore_stmts.iter().any(|s| s.contains("PREPARE get_user")));
    }

    /// Pipeline supports PostgreSQL extended query protocol pipelining.
    #[tokio::test]
    async fn test_pg_pipelining() {
        use pipeline::*;

        let pipeline = RequestPipeline::new(PipelineConfig {
            max_depth: 16,
            enabled: true,
            ..Default::default()
        });

        let conn_id = 1;

        // Simulate pipelined Parse/Bind/Execute sequence
        let t1 = pipeline.submit(conn_id, b"Parse: SELECT $1::int".to_vec()).unwrap();
        let t2 = pipeline.submit(conn_id, b"Bind: [42]".to_vec()).unwrap();
        let t3 = pipeline.submit(conn_id, b"Execute".to_vec()).unwrap();

        assert_eq!(pipeline.depth(conn_id), 3);

        // Complete in order (FIFO — matches PG protocol)
        pipeline.complete_next(conn_id, b"ParseComplete".to_vec(), true, None);
        pipeline.complete_next(conn_id, b"BindComplete".to_vec(), true, None);
        pipeline.complete_next(conn_id, b"DataRow: 42".to_vec(), true, None);

        assert_eq!(pipeline.depth(conn_id), 0);

        // Every submitted request must resolve, not only the first one;
        // all three were completed with `success = true` above.
        let r1 = t1.wait().await.unwrap();
        let r2 = t2.wait().await.unwrap();
        let r3 = t3.wait().await.unwrap();
        assert!(r1.success);
        assert!(r2.success);
        assert!(r3.success);
    }

    /// Batch INSERT works for PostgreSQL bulk inserts.
    #[tokio::test]
    async fn test_pg_batch_insert() {
        use batch::*;

        let config = BatchConfig {
            max_batch_size: 3,
            ..Default::default()
        };
        let batcher = InsertBatcher::new(config);

        batcher.add(
            "orders".to_string(),
            vec!["id".to_string(), "total".to_string()],
            vec![vec!["1".to_string(), "99.99".to_string()]],
            "INSERT INTO orders (id, total) VALUES (1, 99.99)".to_string(),
        ).unwrap();

        // One row is below `max_batch_size`, so it should still be pending.
        assert_eq!(batcher.batch_size("orders"), 1);

        let stats = batcher.stats();
        assert_eq!(stats.inserts_received, 1);
        assert_eq!(stats.rows_received, 1);
    }
}