1pub mod admin;
62pub mod agent_contract;
63pub mod auth_scram;
64pub mod backend;
65pub mod batch;
66pub mod branch;
67pub mod client_tls;
68pub mod config;
69pub mod connection_pool;
70pub mod failover_controller;
71pub mod health_checker;
72pub mod http_gateway;
73pub mod load_balancer;
74pub mod mcp;
75pub mod mirror;
76pub mod pipeline;
77pub mod plugin_registry;
78pub mod primary_tracker;
79pub mod protocol;
80pub mod request;
81pub mod server;
82pub mod switchover_buffer;
83
84#[cfg(feature = "pool-modes")]
86pub mod pool;
87
88#[cfg(feature = "ha-tr")]
90pub mod cursor_restore;
91#[cfg(feature = "ha-tr")]
92pub mod failover_replay;
93#[cfg(feature = "ha-tr")]
94pub mod replay;
95#[cfg(feature = "ha-tr")]
96pub mod session_migrate;
97#[cfg(feature = "ha-tr")]
98pub mod transaction_journal;
99
100#[cfg(feature = "ha-tr")]
102pub mod upgrade_orchestrator;
103
104#[cfg(feature = "ha-tr")]
106pub mod shadow_execute;
107
108#[cfg(feature = "query-cache")]
110pub mod cache;
111
112#[cfg(feature = "routing-hints")]
114pub mod routing;
115
116#[cfg(feature = "lag-routing")]
118pub mod lag;
119
120#[cfg(feature = "rate-limiting")]
122pub mod rate_limit;
123
124#[cfg(feature = "circuit-breaker")]
126pub mod circuit_breaker;
127
128#[cfg(feature = "query-analytics")]
130pub mod analytics;
131
132#[cfg(feature = "anomaly-detection")]
135pub mod anomaly;
136
137#[cfg(feature = "edge-proxy")]
139pub mod edge;
140
141#[cfg(feature = "multi-tenancy")]
143pub mod multi_tenancy;
144
145#[cfg(feature = "auth-proxy")]
147pub mod auth;
148
149#[cfg(feature = "query-rewriting")]
151pub mod rewriter;
152
153#[cfg(feature = "wasm-plugins")]
155pub mod plugins;
156
157#[cfg(feature = "graphql-gateway")]
159pub mod graphql;
160#[cfg(feature = "graphql-gateway")]
161pub mod graphql_gateway;
162
163#[cfg(feature = "schema-routing")]
165pub mod schema_routing;
166
167#[cfg(feature = "distribcache")]
169pub mod distribcache;
170
171pub mod skills;
177
178use thiserror::Error;
179use uuid::Uuid;
180
181#[derive(Debug, Error)]
183pub enum ProxyError {
184 #[error("Configuration error: {0}")]
185 Config(String),
186
187 #[error("Network error: {0}")]
188 Network(String),
189
190 #[error("Connection error: {0}")]
191 Connection(String),
192
193 #[error("Protocol error: {0}")]
194 Protocol(String),
195
196 #[error("Pool error: {0}")]
197 Pool(String),
198
199 #[error("Health check error: {0}")]
200 HealthCheck(String),
201
202 #[error("Failover error: {0}")]
203 Failover(String),
204
205 #[error("Failover failed: {0}")]
206 FailoverFailed(String),
207
208 #[error("Transaction replay failed: {0}")]
209 ReplayFailed(String),
210
211 #[error("Session migration failed: {0}")]
212 SessionMigration(String),
213
214 #[error("Cursor restore failed: {0}")]
215 CursorRestore(String),
216
217 #[error("Routing error: {0}")]
218 Routing(String),
219
220 #[error("Authentication error: {0}")]
221 Auth(String),
222
223 #[error("Pool exhausted: {0}")]
224 PoolExhausted(String),
225
226 #[error("Timeout: {0}")]
227 Timeout(String),
228
229 #[error("Configuration error: {0}")]
230 Configuration(String),
231
232 #[error("No healthy nodes available")]
233 NoHealthyNodes,
234
235 #[error("IO error: {0}")]
236 Io(#[from] std::io::Error),
237
238 #[error("JSON error: {0}")]
239 Json(#[from] serde_json::Error),
240
241 #[error("Internal error: {0}")]
242 Internal(String),
243}
244
245pub type Result<T> = std::result::Result<T, ProxyError>;
246
247pub const VERSION: &str = env!("CARGO_PKG_VERSION");
249
/// Default port number: 5432, the conventional PostgreSQL port.
pub const DEFAULT_PORT: u16 = 5432;
252
/// Default admin port number: 9090.
pub const DEFAULT_ADMIN_PORT: u16 = 9090;
255
256#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
258pub struct NodeId(pub Uuid);
259
260impl NodeId {
261 pub fn new() -> Self {
262 Self(Uuid::new_v4())
263 }
264}
265
266impl Default for NodeId {
267 fn default() -> Self {
268 Self::new()
269 }
270}
271
/// Role recorded for a node.
///
/// Derives `Hash` in addition to the comparison traits so a role can be used
/// directly as a `HashMap`/`HashSet` key, matching [`NodeId`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NodeRole {
    /// The node is a primary.
    Primary,
    /// The node is a standby.
    Standby,
    /// The node is a read replica.
    ReadReplica,
    /// The role has not been determined; `NodeEndpoint::new` starts here.
    Unknown,
}
284
285#[derive(Debug, Clone)]
287pub struct NodeEndpoint {
288 pub id: NodeId,
290 pub host: String,
292 pub port: u16,
294 pub role: NodeRole,
296 pub weight: u32,
298 pub enabled: bool,
300}
301
302impl NodeEndpoint {
303 pub fn new(host: impl Into<String>, port: u16) -> Self {
304 Self {
305 id: NodeId::new(),
306 host: host.into(),
307 port,
308 role: NodeRole::Unknown,
309 weight: 100,
310 enabled: true,
311 }
312 }
313
314 pub fn with_role(mut self, role: NodeRole) -> Self {
315 self.role = role;
316 self
317 }
318
319 pub fn with_weight(mut self, weight: u32) -> Self {
320 self.weight = weight;
321 self
322 }
323
324 pub fn address(&self) -> String {
325 format!("{}:{}", self.host, self.port)
326 }
327}
328
/// Smoke tests that exercise the crate's building blocks with
/// PostgreSQL-flavoured host names, ports, and SQL text.
#[cfg(test)]
mod postgresql_compat_tests {
    use super::*;

    /// `NodeEndpoint` builders set role and weight, and `address` renders
    /// `host:port`.
    #[test]
    fn test_pg_node_endpoints() {
        let primary =
            NodeEndpoint::new("pg-primary.example.com", 5432).with_role(NodeRole::Primary);
        let standby =
            NodeEndpoint::new("pg-standby-1.example.com", 5432).with_role(NodeRole::Standby);
        let replica = NodeEndpoint::new("pg-replica-1.example.com", 5433)
            .with_role(NodeRole::ReadReplica)
            .with_weight(50);

        assert_eq!(primary.role, NodeRole::Primary);
        assert_eq!(standby.role, NodeRole::Standby);
        assert_eq!(replica.weight, 50);
        assert_eq!(replica.address(), "pg-replica-1.example.com:5433");
    }

    /// A read/write-split configuration keeps the values it was given and can
    /// be used to construct a `LoadBalancer`.
    #[test]
    fn test_pg_load_balancer_config() {
        use load_balancer::*;

        let config = LoadBalancerConfig {
            read_write_split: true,
            read_strategy: RoutingStrategy::RoundRobin,
            write_strategy: RoutingStrategy::PrimaryOnly,
            ..Default::default()
        };

        assert!(config.read_write_split);
        assert_eq!(config.read_strategy, RoutingStrategy::RoundRobin);
        assert_eq!(config.write_strategy, RoutingStrategy::PrimaryOnly);

        // Construction only; the balancer itself is not exercised here.
        let _lb = LoadBalancer::new(config);
    }

    /// `HealthConfig` accepts a custom check query and the detailed-checks flag.
    #[test]
    fn test_pg_health_config() {
        use health_checker::*;

        let config = HealthConfig {
            check_query: "SELECT 1".to_string(),
            detailed_checks: true,
            ..Default::default()
        };

        assert_eq!(config.check_query, "SELECT 1");
        assert!(config.detailed_checks);
    }

    /// Setting a primary and registering candidates leaves the controller in
    /// the `Normal` state with no failovers counted.
    #[tokio::test]
    async fn test_pg_failover() {
        use failover_controller::*;

        let controller = FailoverController::new(FailoverConfig {
            auto_failover: true,
            prefer_sync_standby: true,
            ..Default::default()
        });

        let primary = NodeId::new();
        controller.set_primary(primary).await;
        assert_eq!(controller.get_primary().await, Some(primary));

        // Candidate flagged as synchronous, with zero lag.
        let sync_standby = NodeId::new();
        controller
            .register_candidate(FailoverCandidate {
                node_id: sync_standby,
                endpoint: NodeEndpoint::new("pg-sync", 5432).with_role(NodeRole::Standby),
                is_sync: true,
                lag_bytes: 0,
                priority: 1,
                last_heartbeat: None,
            })
            .await;

        // Candidate flagged as asynchronous, 1024 bytes behind.
        let async_standby = NodeId::new();
        controller
            .register_candidate(FailoverCandidate {
                node_id: async_standby,
                endpoint: NodeEndpoint::new("pg-async", 5432).with_role(NodeRole::Standby),
                is_sync: false,
                lag_bytes: 1024,
                priority: 2,
                last_heartbeat: None,
            })
            .await;

        // Nothing has triggered a failover yet.
        assert_eq!(controller.state().await, FailoverState::Normal);
        assert_eq!(controller.failover_count(), 0);
    }

    /// `PoolConfig` keeps the supplied limits and can be used to construct a
    /// `ConnectionPool`.
    #[test]
    fn test_pg_connection_pool() {
        use connection_pool::*;

        let config = PoolConfig {
            min_connections: 2,
            max_connections: 20,
            test_on_acquire: true,
            ..Default::default()
        };

        assert_eq!(config.min_connections, 2);
        assert_eq!(config.max_connections, 20);
        assert!(config.test_on_acquire);

        // Construction only; no connections are acquired here.
        let _pool = ConnectionPool::new(config);
    }

    /// A query buffered while buffering is active resolves to
    /// `BufferResult::Success` once the buffer is drained with an executor
    /// that returns `Ok(())`.
    #[tokio::test]
    async fn test_pg_switchover_buffer() {
        use switchover_buffer::*;

        let buffer = SwitchoverBuffer::new(BufferConfig {
            buffer_timeout: std::time::Duration::from_secs(5),
            max_buffered_queries: 1000,
            ..Default::default()
        });

        // A new buffer starts in passthrough mode.
        assert_eq!(buffer.state(), BufferState::Passthrough);
        assert!(!buffer.is_buffering());

        buffer.start_buffering();
        assert!(buffer.is_buffering());

        let rx = buffer
            .buffer_query("INSERT INTO orders VALUES (1)".to_string(), vec![], 1)
            .unwrap();

        // Stop buffering, then drain with an executor that always succeeds.
        buffer.stop_buffering();
        buffer.drain(|_sql, _params| async { Ok(()) }).await;

        let result = rx.await.unwrap();
        assert!(matches!(result, BufferResult::Success));
    }

    /// A standalone tracker can set, confirm, clear, and re-set its primary.
    #[test]
    fn test_pg_primary_tracker_standalone() {
        use primary_tracker::*;

        let tracker = PrimaryTracker::new_standalone();

        // Set and confirm the first primary.
        let pg_primary = uuid::Uuid::new_v4();
        tracker.set_primary(pg_primary, "pg-primary.local:5432".to_string());
        tracker.confirm_primary();

        assert!(tracker.has_primary());
        assert!(tracker.get_primary().unwrap().is_confirmed);

        // After clearing, no primary is reported.
        tracker.clear_primary();
        assert!(!tracker.has_primary());

        // A different node takes over; its address is the one reported.
        let pg_new_primary = uuid::Uuid::new_v4();
        tracker.set_primary(pg_new_primary, "pg-standby.local:5432".to_string());
        tracker.confirm_primary();
        assert_eq!(
            tracker.get_primary_address(),
            Some("pg-standby.local:5432".to_string())
        );
    }

    /// Statements logged for a transaction appear in its journal with the
    /// expected statement types, and the journal is gone after commit.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_pg_transaction_replay() {
        use transaction_journal::*;

        let journal = TransactionJournal::new();
        let tx_id = uuid::Uuid::new_v4();
        let session_id = uuid::Uuid::new_v4();
        let node = NodeId::new();

        // Open the transaction, then log BEGIN, an INSERT, and an UPDATE.
        journal
            .begin_transaction(tx_id, session_id, node, 0)
            .await
            .unwrap();
        journal
            .log_statement(tx_id, "BEGIN".to_string(), vec![], None, None, 1)
            .await
            .unwrap();
        journal
            .log_statement(
                tx_id,
                "INSERT INTO accounts (id, balance) VALUES ($1, $2)".to_string(),
                vec![JournalValue::Int64(1), JournalValue::Float64(100.0)],
                Some(12345),
                Some(1),
                5,
            )
            .await
            .unwrap();
        journal
            .log_statement(
                tx_id,
                "UPDATE accounts SET balance = balance - $1 WHERE id = $2".to_string(),
                vec![JournalValue::Float64(25.0), JournalValue::Int64(1)],
                Some(67890),
                Some(1),
                3,
            )
            .await
            .unwrap();

        let j = journal.get_journal(&tx_id).await.unwrap();
        assert_eq!(j.entries.len(), 3);
        assert!(j.has_mutations);

        // Entries keep the order in which they were logged.
        assert_eq!(j.entries[0].statement_type, StatementType::Transaction);
        assert_eq!(j.entries[1].statement_type, StatementType::Insert);
        assert_eq!(j.entries[2].statement_type, StatementType::Update);

        // Committing removes the journal for this transaction.
        journal.commit_transaction(tx_id).await.unwrap();
        assert!(journal.get_journal(&tx_id).await.is_none());
    }

    /// Session parameters and prepared statements registered on a session
    /// show up in its generated restore statements.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_pg_session_migration() {
        use session_migrate::*;

        let migrate = SessionMigrate::new();
        let session_id = uuid::Uuid::new_v4();
        let node = NodeId::new();

        let mut state =
            SessionState::new(session_id, "postgres".to_string(), "mydb".to_string(), node);

        // Session-level settings to be carried over.
        state.set_parameter("timezone".to_string(), "America/New_York".to_string());
        state.set_parameter("search_path".to_string(), "public, app_schema".to_string());
        state.set_parameter("statement_timeout".to_string(), "30000".to_string());
        state.set_parameter("work_mem".to_string(), "256MB".to_string());

        // One named prepared statement.
        state.add_prepared_statement(PreparedStatementInfo {
            name: "get_user".to_string(),
            query: "SELECT * FROM users WHERE id = $1".to_string(),
            param_types: vec!["integer".to_string()],
            created_at: chrono::Utc::now(),
        });

        migrate.register_session(state).await.unwrap();

        // Read the session back and inspect what would be replayed.
        let session = migrate.get_session(&session_id).await.unwrap();
        let restore_stmts = session.generate_restore_statements();

        assert!(restore_stmts.iter().any(|s| s.contains("America/New_York")));
        assert!(restore_stmts.iter().any(|s| s.contains("search_path")));
        assert!(restore_stmts
            .iter()
            .any(|s| s.contains("statement_timeout")));
        assert!(restore_stmts.iter().any(|s| s.contains("PREPARE get_user")));
    }

    /// Three submitted requests raise the pipeline depth to 3; completing
    /// three responses brings it back to 0 and resolves the first ticket.
    #[tokio::test]
    async fn test_pg_pipelining() {
        use pipeline::*;

        let pipeline = RequestPipeline::new(PipelineConfig {
            max_depth: 16,
            enabled: true,
            ..Default::default()
        });

        let conn_id = 1;

        // Only the first ticket is awaited below. The other two are kept in
        // underscore-prefixed bindings so they stay alive until the end of
        // the test without triggering the unused-variable lint.
        let t1 = pipeline
            .submit(conn_id, b"Parse: SELECT $1::int".to_vec())
            .unwrap();
        let _t2 = pipeline.submit(conn_id, b"Bind: [42]".to_vec()).unwrap();
        let _t3 = pipeline.submit(conn_id, b"Execute".to_vec()).unwrap();

        assert_eq!(pipeline.depth(conn_id), 3);

        // One completion per submitted request, in submission order.
        pipeline.complete_next(conn_id, b"ParseComplete".to_vec(), true, None);
        pipeline.complete_next(conn_id, b"BindComplete".to_vec(), true, None);
        pipeline.complete_next(conn_id, b"DataRow: 42".to_vec(), true, None);

        assert_eq!(pipeline.depth(conn_id), 0);

        let r1 = t1.wait().await.unwrap();
        assert!(r1.success);
    }

    /// Adding one single-row insert below the batch limit leaves it pending
    /// and is reflected in the batcher statistics.
    #[tokio::test]
    async fn test_pg_batch_insert() {
        use batch::*;

        let config = BatchConfig {
            max_batch_size: 3,
            ..Default::default()
        };
        let batcher = std::sync::Arc::new(InsertBatcher::new(config));

        batcher
            .add(
                "orders".to_string(),
                vec!["id".to_string(), "total".to_string()],
                vec![vec!["1".to_string(), "99.99".to_string()]],
                "INSERT INTO orders (id, total) VALUES (1, 99.99)".to_string(),
            )
            .unwrap();

        assert_eq!(batcher.batch_size("orders"), 1);

        let stats = batcher.stats();
        assert_eq!(stats.inserts_received, 1);
        assert_eq!(stats.rows_received, 1);
    }
}