1pub mod backend;
62pub mod client_tls;
63pub mod auth_scram;
64pub mod agent_contract;
65pub mod mcp;
66pub mod http_gateway;
67pub mod mirror;
68pub mod branch;
69pub mod plugin_registry;
70pub mod config;
71pub mod server;
72pub mod protocol;
73pub mod admin;
74pub mod connection_pool;
75pub mod load_balancer;
76pub mod health_checker;
77pub mod failover_controller;
78pub mod switchover_buffer;
79pub mod primary_tracker;
80pub mod pipeline;
81pub mod batch;
82pub mod request;
83
84#[cfg(feature = "pool-modes")]
86pub mod pool;
87
88#[cfg(feature = "ha-tr")]
90pub mod transaction_journal;
91#[cfg(feature = "ha-tr")]
92pub mod failover_replay;
93#[cfg(feature = "ha-tr")]
94pub mod cursor_restore;
95#[cfg(feature = "ha-tr")]
96pub mod session_migrate;
97#[cfg(feature = "ha-tr")]
98pub mod replay;
99
100#[cfg(feature = "ha-tr")]
102pub mod upgrade_orchestrator;
103
104#[cfg(feature = "ha-tr")]
106pub mod shadow_execute;
107
108#[cfg(feature = "query-cache")]
110pub mod cache;
111
112#[cfg(feature = "routing-hints")]
114pub mod routing;
115
116#[cfg(feature = "lag-routing")]
118pub mod lag;
119
120#[cfg(feature = "rate-limiting")]
122pub mod rate_limit;
123
124#[cfg(feature = "circuit-breaker")]
126pub mod circuit_breaker;
127
128#[cfg(feature = "query-analytics")]
130pub mod analytics;
131
132#[cfg(feature = "anomaly-detection")]
135pub mod anomaly;
136
137#[cfg(feature = "edge-proxy")]
139pub mod edge;
140
141#[cfg(feature = "multi-tenancy")]
143pub mod multi_tenancy;
144
145#[cfg(feature = "auth-proxy")]
147pub mod auth;
148
149#[cfg(feature = "query-rewriting")]
151pub mod rewriter;
152
153#[cfg(feature = "wasm-plugins")]
155pub mod plugins;
156
157#[cfg(feature = "graphql-gateway")]
159pub mod graphql;
160
161#[cfg(feature = "schema-routing")]
163pub mod schema_routing;
164
165#[cfg(feature = "distribcache")]
167pub mod distribcache;
168
169pub mod skills;
175
176use thiserror::Error;
177use uuid::Uuid;
178
179#[derive(Debug, Error)]
181pub enum ProxyError {
182 #[error("Configuration error: {0}")]
183 Config(String),
184
185 #[error("Network error: {0}")]
186 Network(String),
187
188 #[error("Connection error: {0}")]
189 Connection(String),
190
191 #[error("Protocol error: {0}")]
192 Protocol(String),
193
194 #[error("Pool error: {0}")]
195 Pool(String),
196
197 #[error("Health check error: {0}")]
198 HealthCheck(String),
199
200 #[error("Failover error: {0}")]
201 Failover(String),
202
203 #[error("Failover failed: {0}")]
204 FailoverFailed(String),
205
206 #[error("Transaction replay failed: {0}")]
207 ReplayFailed(String),
208
209 #[error("Session migration failed: {0}")]
210 SessionMigration(String),
211
212 #[error("Cursor restore failed: {0}")]
213 CursorRestore(String),
214
215 #[error("Routing error: {0}")]
216 Routing(String),
217
218 #[error("Authentication error: {0}")]
219 Auth(String),
220
221 #[error("Pool exhausted: {0}")]
222 PoolExhausted(String),
223
224 #[error("Timeout: {0}")]
225 Timeout(String),
226
227 #[error("Configuration error: {0}")]
228 Configuration(String),
229
230 #[error("No healthy nodes available")]
231 NoHealthyNodes,
232
233 #[error("IO error: {0}")]
234 Io(#[from] std::io::Error),
235
236 #[error("JSON error: {0}")]
237 Json(#[from] serde_json::Error),
238
239 #[error("Internal error: {0}")]
240 Internal(String),
241}
242
243pub type Result<T> = std::result::Result<T, ProxyError>;
244
245pub const VERSION: &str = env!("CARGO_PKG_VERSION");
247
248pub const DEFAULT_PORT: u16 = 5432;
250
251pub const DEFAULT_ADMIN_PORT: u16 = 9090;
253
254#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
256pub struct NodeId(pub Uuid);
257
258impl NodeId {
259 pub fn new() -> Self {
260 Self(Uuid::new_v4())
261 }
262}
263
264impl Default for NodeId {
265 fn default() -> Self {
266 Self::new()
267 }
268}
269
270#[derive(Debug, Clone, Copy, PartialEq, Eq)]
272pub enum NodeRole {
273 Primary,
275 Standby,
277 ReadReplica,
279 Unknown,
281}
282
283#[derive(Debug, Clone)]
285pub struct NodeEndpoint {
286 pub id: NodeId,
288 pub host: String,
290 pub port: u16,
292 pub role: NodeRole,
294 pub weight: u32,
296 pub enabled: bool,
298}
299
300impl NodeEndpoint {
301 pub fn new(host: impl Into<String>, port: u16) -> Self {
302 Self {
303 id: NodeId::new(),
304 host: host.into(),
305 port,
306 role: NodeRole::Unknown,
307 weight: 100,
308 enabled: true,
309 }
310 }
311
312 pub fn with_role(mut self, role: NodeRole) -> Self {
313 self.role = role;
314 self
315 }
316
317 pub fn with_weight(mut self, weight: u32) -> Self {
318 self.weight = weight;
319 self
320 }
321
322 pub fn address(&self) -> String {
323 format!("{}:{}", self.host, self.port)
324 }
325}
326
327#[cfg(test)]
333mod postgresql_compat_tests {
334 use super::*;
335
336 #[test]
338 fn test_pg_node_endpoints() {
339 let primary = NodeEndpoint::new("pg-primary.example.com", 5432)
340 .with_role(NodeRole::Primary);
341 let standby = NodeEndpoint::new("pg-standby-1.example.com", 5432)
342 .with_role(NodeRole::Standby);
343 let replica = NodeEndpoint::new("pg-replica-1.example.com", 5433)
344 .with_role(NodeRole::ReadReplica)
345 .with_weight(50);
346
347 assert_eq!(primary.role, NodeRole::Primary);
348 assert_eq!(standby.role, NodeRole::Standby);
349 assert_eq!(replica.weight, 50);
350 assert_eq!(replica.address(), "pg-replica-1.example.com:5433");
351 }
352
353 #[test]
355 fn test_pg_load_balancer_config() {
356 use load_balancer::*;
357
358 let config = LoadBalancerConfig {
359 read_write_split: true,
360 read_strategy: RoutingStrategy::RoundRobin,
361 write_strategy: RoutingStrategy::PrimaryOnly,
362 ..Default::default()
363 };
364
365 assert!(config.read_write_split);
366 assert_eq!(config.read_strategy, RoutingStrategy::RoundRobin);
367 assert_eq!(config.write_strategy, RoutingStrategy::PrimaryOnly);
368
369 let _lb = LoadBalancer::new(config);
371 }
372
373 #[test]
375 fn test_pg_health_config() {
376 use health_checker::*;
377
378 let config = HealthConfig {
379 check_query: "SELECT 1".to_string(),
380 detailed_checks: true,
381 ..Default::default()
382 };
383
384 assert_eq!(config.check_query, "SELECT 1");
385 assert!(config.detailed_checks);
386 }
387
388 #[tokio::test]
390 async fn test_pg_failover() {
391 use failover_controller::*;
392
393 let controller = FailoverController::new(FailoverConfig {
394 auto_failover: true,
395 prefer_sync_standby: true,
396 ..Default::default()
397 });
398
399 let primary = NodeId::new();
400 controller.set_primary(primary).await;
401 assert_eq!(controller.get_primary().await, Some(primary));
402
403 let sync_standby = NodeId::new();
405 controller.register_candidate(FailoverCandidate {
406 node_id: sync_standby,
407 endpoint: NodeEndpoint::new("pg-sync", 5432).with_role(NodeRole::Standby),
408 is_sync: true,
409 lag_bytes: 0,
410 priority: 1,
411 last_heartbeat: None,
412 }).await;
413
414 let async_standby = NodeId::new();
415 controller.register_candidate(FailoverCandidate {
416 node_id: async_standby,
417 endpoint: NodeEndpoint::new("pg-async", 5432).with_role(NodeRole::Standby),
418 is_sync: false,
419 lag_bytes: 1024,
420 priority: 2,
421 last_heartbeat: None,
422 }).await;
423
424 assert_eq!(controller.state().await, FailoverState::Normal);
426 assert_eq!(controller.failover_count(), 0);
427 }
428
429 #[test]
431 fn test_pg_connection_pool() {
432 use connection_pool::*;
433
434 let config = PoolConfig {
435 min_connections: 2,
436 max_connections: 20,
437 test_on_acquire: true,
438 ..Default::default()
439 };
440
441 assert_eq!(config.min_connections, 2);
442 assert_eq!(config.max_connections, 20);
443 assert!(config.test_on_acquire);
444
445 let _pool = ConnectionPool::new(config);
446 }
447
448 #[tokio::test]
450 async fn test_pg_switchover_buffer() {
451 use switchover_buffer::*;
452
453 let buffer = SwitchoverBuffer::new(BufferConfig {
454 buffer_timeout: std::time::Duration::from_secs(5),
455 max_buffered_queries: 1000,
456 ..Default::default()
457 });
458
459 assert_eq!(buffer.state(), BufferState::Passthrough);
460 assert!(!buffer.is_buffering());
461
462 buffer.start_buffering();
464 assert!(buffer.is_buffering());
465
466 let rx = buffer.buffer_query(
467 "INSERT INTO orders VALUES (1)".to_string(),
468 vec![],
469 1,
470 ).unwrap();
471
472 buffer.stop_buffering();
474 buffer.drain(|_sql, _params| async { Ok(()) }).await;
475
476 let result = rx.await.unwrap();
477 assert!(matches!(result, BufferResult::Success));
478 }
479
480 #[test]
482 fn test_pg_primary_tracker_standalone() {
483 use primary_tracker::*;
484
485 let tracker = PrimaryTracker::new_standalone();
486
487 let pg_primary = uuid::Uuid::new_v4();
489 tracker.set_primary(pg_primary, "pg-primary.local:5432".to_string());
490 tracker.confirm_primary();
491
492 assert!(tracker.has_primary());
493 assert!(tracker.get_primary().unwrap().is_confirmed);
494
495 tracker.clear_primary();
497 assert!(!tracker.has_primary());
498
499 let pg_new_primary = uuid::Uuid::new_v4();
501 tracker.set_primary(pg_new_primary, "pg-standby.local:5432".to_string());
502 tracker.confirm_primary();
503 assert_eq!(
504 tracker.get_primary_address(),
505 Some("pg-standby.local:5432".to_string())
506 );
507 }
508
509 #[cfg(feature = "ha-tr")]
511 #[tokio::test]
512 async fn test_pg_transaction_replay() {
513 use transaction_journal::*;
514
515 let journal = TransactionJournal::new();
516 let tx_id = uuid::Uuid::new_v4();
517 let session_id = uuid::Uuid::new_v4();
518 let node = NodeId::new();
519
520 journal.begin_transaction(tx_id, session_id, node, 0).await.unwrap();
522 journal.log_statement(
523 tx_id,
524 "BEGIN".to_string(),
525 vec![],
526 None,
527 None,
528 1,
529 ).await.unwrap();
530 journal.log_statement(
531 tx_id,
532 "INSERT INTO accounts (id, balance) VALUES ($1, $2)".to_string(),
533 vec![JournalValue::Int64(1), JournalValue::Float64(100.0)],
534 Some(12345),
535 Some(1),
536 5,
537 ).await.unwrap();
538 journal.log_statement(
539 tx_id,
540 "UPDATE accounts SET balance = balance - $1 WHERE id = $2".to_string(),
541 vec![JournalValue::Float64(25.0), JournalValue::Int64(1)],
542 Some(67890),
543 Some(1),
544 3,
545 ).await.unwrap();
546
547 let j = journal.get_journal(&tx_id).await.unwrap();
548 assert_eq!(j.entries.len(), 3);
549 assert!(j.has_mutations);
550
551 assert_eq!(j.entries[0].statement_type, StatementType::Transaction);
553 assert_eq!(j.entries[1].statement_type, StatementType::Insert);
554 assert_eq!(j.entries[2].statement_type, StatementType::Update);
555
556 journal.commit_transaction(tx_id).await.unwrap();
558 assert!(journal.get_journal(&tx_id).await.is_none());
559 }
560
561 #[cfg(feature = "ha-tr")]
563 #[tokio::test]
564 async fn test_pg_session_migration() {
565 use session_migrate::*;
566
567 let migrate = SessionMigrate::new();
568 let session_id = uuid::Uuid::new_v4();
569 let node = NodeId::new();
570
571 let mut state = SessionState::new(
572 session_id,
573 "postgres".to_string(),
574 "mydb".to_string(),
575 node,
576 );
577
578 state.set_parameter("timezone".to_string(), "America/New_York".to_string());
580 state.set_parameter("search_path".to_string(), "public, app_schema".to_string());
581 state.set_parameter("statement_timeout".to_string(), "30000".to_string());
582 state.set_parameter("work_mem".to_string(), "256MB".to_string());
583
584 state.add_prepared_statement(PreparedStatementInfo {
586 name: "get_user".to_string(),
587 query: "SELECT * FROM users WHERE id = $1".to_string(),
588 param_types: vec!["integer".to_string()],
589 created_at: chrono::Utc::now(),
590 });
591
592 migrate.register_session(state).await.unwrap();
593
594 let session = migrate.get_session(&session_id).await.unwrap();
596 let restore_stmts = session.generate_restore_statements();
597
598 assert!(restore_stmts.iter().any(|s| s.contains("America/New_York")));
599 assert!(restore_stmts.iter().any(|s| s.contains("search_path")));
600 assert!(restore_stmts.iter().any(|s| s.contains("statement_timeout")));
601 assert!(restore_stmts.iter().any(|s| s.contains("PREPARE get_user")));
602 }
603
604 #[tokio::test]
606 async fn test_pg_pipelining() {
607 use pipeline::*;
608
609 let pipeline = RequestPipeline::new(PipelineConfig {
610 max_depth: 16,
611 enabled: true,
612 ..Default::default()
613 });
614
615 let conn_id = 1;
616
617 let t1 = pipeline.submit(conn_id, b"Parse: SELECT $1::int".to_vec()).unwrap();
619 let t2 = pipeline.submit(conn_id, b"Bind: [42]".to_vec()).unwrap();
620 let t3 = pipeline.submit(conn_id, b"Execute".to_vec()).unwrap();
621
622 assert_eq!(pipeline.depth(conn_id), 3);
623
624 pipeline.complete_next(conn_id, b"ParseComplete".to_vec(), true, None);
626 pipeline.complete_next(conn_id, b"BindComplete".to_vec(), true, None);
627 pipeline.complete_next(conn_id, b"DataRow: 42".to_vec(), true, None);
628
629 assert_eq!(pipeline.depth(conn_id), 0);
630
631 let r1 = t1.wait().await.unwrap();
632 assert!(r1.success);
633 }
634
635 #[tokio::test]
637 async fn test_pg_batch_insert() {
638 use batch::*;
639
640 let config = BatchConfig {
641 max_batch_size: 3,
642 ..Default::default()
643 };
644 let batcher = InsertBatcher::new(config);
645
646 batcher.add(
647 "orders".to_string(),
648 vec!["id".to_string(), "total".to_string()],
649 vec![vec!["1".to_string(), "99.99".to_string()]],
650 "INSERT INTO orders (id, total) VALUES (1, 99.99)".to_string(),
651 ).unwrap();
652
653 assert_eq!(batcher.batch_size("orders"), 1);
654
655 let stats = batcher.stats();
656 assert_eq!(stats.inserts_received, 1);
657 assert_eq!(stats.rows_received, 1);
658 }
659}