1pub mod backend;
62pub mod config;
63pub mod server;
64pub mod protocol;
65pub mod admin;
66pub mod connection_pool;
67pub mod load_balancer;
68pub mod health_checker;
69pub mod failover_controller;
70pub mod switchover_buffer;
71pub mod primary_tracker;
72pub mod pipeline;
73pub mod batch;
74pub mod request;
75
76#[cfg(feature = "pool-modes")]
78pub mod pool;
79
80#[cfg(feature = "ha-tr")]
82pub mod transaction_journal;
83#[cfg(feature = "ha-tr")]
84pub mod failover_replay;
85#[cfg(feature = "ha-tr")]
86pub mod cursor_restore;
87#[cfg(feature = "ha-tr")]
88pub mod session_migrate;
89#[cfg(feature = "ha-tr")]
90pub mod replay;
91
92#[cfg(feature = "ha-tr")]
94pub mod upgrade_orchestrator;
95
96#[cfg(feature = "ha-tr")]
98pub mod shadow_execute;
99
100#[cfg(feature = "query-cache")]
102pub mod cache;
103
104#[cfg(feature = "routing-hints")]
106pub mod routing;
107
108#[cfg(feature = "lag-routing")]
110pub mod lag;
111
112#[cfg(feature = "rate-limiting")]
114pub mod rate_limit;
115
116#[cfg(feature = "circuit-breaker")]
118pub mod circuit_breaker;
119
120#[cfg(feature = "query-analytics")]
122pub mod analytics;
123
124#[cfg(feature = "anomaly-detection")]
127pub mod anomaly;
128
129#[cfg(feature = "edge-proxy")]
131pub mod edge;
132
133#[cfg(feature = "multi-tenancy")]
135pub mod multi_tenancy;
136
137#[cfg(feature = "auth-proxy")]
139pub mod auth;
140
141#[cfg(feature = "query-rewriting")]
143pub mod rewriter;
144
145#[cfg(feature = "wasm-plugins")]
147pub mod plugins;
148
149#[cfg(feature = "graphql-gateway")]
151pub mod graphql;
152
153#[cfg(feature = "schema-routing")]
155pub mod schema_routing;
156
157#[cfg(feature = "distribcache")]
159pub mod distribcache;
160
161use thiserror::Error;
162use uuid::Uuid;
163
164#[derive(Debug, Error)]
166pub enum ProxyError {
167 #[error("Configuration error: {0}")]
168 Config(String),
169
170 #[error("Network error: {0}")]
171 Network(String),
172
173 #[error("Connection error: {0}")]
174 Connection(String),
175
176 #[error("Protocol error: {0}")]
177 Protocol(String),
178
179 #[error("Pool error: {0}")]
180 Pool(String),
181
182 #[error("Health check error: {0}")]
183 HealthCheck(String),
184
185 #[error("Failover error: {0}")]
186 Failover(String),
187
188 #[error("Failover failed: {0}")]
189 FailoverFailed(String),
190
191 #[error("Transaction replay failed: {0}")]
192 ReplayFailed(String),
193
194 #[error("Session migration failed: {0}")]
195 SessionMigration(String),
196
197 #[error("Cursor restore failed: {0}")]
198 CursorRestore(String),
199
200 #[error("Routing error: {0}")]
201 Routing(String),
202
203 #[error("Authentication error: {0}")]
204 Auth(String),
205
206 #[error("Pool exhausted: {0}")]
207 PoolExhausted(String),
208
209 #[error("Timeout: {0}")]
210 Timeout(String),
211
212 #[error("Configuration error: {0}")]
213 Configuration(String),
214
215 #[error("No healthy nodes available")]
216 NoHealthyNodes,
217
218 #[error("IO error: {0}")]
219 Io(#[from] std::io::Error),
220
221 #[error("JSON error: {0}")]
222 Json(#[from] serde_json::Error),
223
224 #[error("Internal error: {0}")]
225 Internal(String),
226}
227
228pub type Result<T> = std::result::Result<T, ProxyError>;
229
230pub const VERSION: &str = env!("CARGO_PKG_VERSION");
232
/// Default port number for the proxy (5432, which is also PostgreSQL's
/// standard port).
///
/// NOTE(review): presumably the client-facing listener port; confirm against
/// the `config` module.
pub const DEFAULT_PORT: u16 = 5432;

/// Default port number for the admin interface (9090).
///
/// NOTE(review): presumably consumed by the `admin` module; confirm there.
pub const DEFAULT_ADMIN_PORT: u16 = 9090;
238
239#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
241pub struct NodeId(pub Uuid);
242
243impl NodeId {
244 pub fn new() -> Self {
245 Self(Uuid::new_v4())
246 }
247}
248
249impl Default for NodeId {
250 fn default() -> Self {
251 Self::new()
252 }
253}
254
/// Role a node plays in the cluster, as recorded on a `NodeEndpoint`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeRole {
    /// The node is the primary.
    Primary,
    /// The node is a standby.
    Standby,
    /// The node is a read replica.
    ReadReplica,
    /// The role has not been determined; this is what `NodeEndpoint::new`
    /// assigns until `with_role` is called.
    Unknown,
}
267
268#[derive(Debug, Clone)]
270pub struct NodeEndpoint {
271 pub id: NodeId,
273 pub host: String,
275 pub port: u16,
277 pub role: NodeRole,
279 pub weight: u32,
281 pub enabled: bool,
283}
284
285impl NodeEndpoint {
286 pub fn new(host: impl Into<String>, port: u16) -> Self {
287 Self {
288 id: NodeId::new(),
289 host: host.into(),
290 port,
291 role: NodeRole::Unknown,
292 weight: 100,
293 enabled: true,
294 }
295 }
296
297 pub fn with_role(mut self, role: NodeRole) -> Self {
298 self.role = role;
299 self
300 }
301
302 pub fn with_weight(mut self, weight: u32) -> Self {
303 self.weight = weight;
304 self
305 }
306
307 pub fn address(&self) -> String {
308 format!("{}:{}", self.host, self.port)
309 }
310}
311
#[cfg(test)]
mod postgresql_compat_tests {
    //! Smoke tests that drive the crate's public types with
    //! PostgreSQL-flavoured hostnames, ports, and SQL text.

    use super::*;

    /// Endpoints built with the builder methods keep the role, weight, and
    /// address they were given.
    #[test]
    fn test_pg_node_endpoints() {
        let primary_ep =
            NodeEndpoint::new("pg-primary.example.com", 5432).with_role(NodeRole::Primary);
        let standby_ep =
            NodeEndpoint::new("pg-standby-1.example.com", 5432).with_role(NodeRole::Standby);
        let replica_ep = NodeEndpoint::new("pg-replica-1.example.com", 5433)
            .with_role(NodeRole::ReadReplica)
            .with_weight(50);

        assert_eq!(primary_ep.role, NodeRole::Primary);
        assert_eq!(standby_ep.role, NodeRole::Standby);
        assert_eq!(replica_ep.weight, 50);
        assert_eq!(replica_ep.address(), "pg-replica-1.example.com:5433");
    }

    /// A read/write-split configuration keeps its fields and is accepted by
    /// `LoadBalancer::new`.
    #[test]
    fn test_pg_load_balancer_config() {
        use load_balancer::*;

        let lb_config = LoadBalancerConfig {
            read_write_split: true,
            read_strategy: RoutingStrategy::RoundRobin,
            write_strategy: RoutingStrategy::PrimaryOnly,
            ..Default::default()
        };

        assert!(lb_config.read_write_split);
        assert_eq!(lb_config.read_strategy, RoutingStrategy::RoundRobin);
        assert_eq!(lb_config.write_strategy, RoutingStrategy::PrimaryOnly);

        // Construction only; the balancer itself is not exercised here.
        let _balancer = LoadBalancer::new(lb_config);
    }

    /// Overridden health-check fields are stored as given.
    #[test]
    fn test_pg_health_config() {
        use health_checker::*;

        let health_cfg = HealthConfig {
            check_query: String::from("SELECT 1"),
            detailed_checks: true,
            ..Default::default()
        };

        assert_eq!(health_cfg.check_query, "SELECT 1");
        assert!(health_cfg.detailed_checks);
    }

    /// Setting a primary and registering two candidates leaves the
    /// controller in the `Normal` state with no failovers counted.
    #[tokio::test]
    async fn test_pg_failover() {
        use failover_controller::*;

        let failover_cfg = FailoverConfig {
            auto_failover: true,
            prefer_sync_standby: true,
            ..Default::default()
        };
        let controller = FailoverController::new(failover_cfg);

        let primary_id = NodeId::new();
        controller.set_primary(primary_id).await;
        assert_eq!(controller.get_primary().await, Some(primary_id));

        // Candidate flagged as synchronous, with zero lag.
        let sync_id = NodeId::new();
        let sync_candidate = FailoverCandidate {
            node_id: sync_id,
            endpoint: NodeEndpoint::new("pg-sync", 5432).with_role(NodeRole::Standby),
            is_sync: true,
            lag_bytes: 0,
            priority: 1,
            last_heartbeat: None,
        };
        controller.register_candidate(sync_candidate).await;

        // Candidate flagged as asynchronous, with 1024 bytes of lag.
        let async_id = NodeId::new();
        let async_candidate = FailoverCandidate {
            node_id: async_id,
            endpoint: NodeEndpoint::new("pg-async", 5432).with_role(NodeRole::Standby),
            is_sync: false,
            lag_bytes: 1024,
            priority: 2,
            last_heartbeat: None,
        };
        controller.register_candidate(async_candidate).await;

        assert_eq!(controller.state().await, FailoverState::Normal);
        assert_eq!(controller.failover_count(), 0);
    }

    /// Overridden pool limits are stored as given and the configuration is
    /// accepted by `ConnectionPool::new`.
    #[test]
    fn test_pg_connection_pool() {
        use connection_pool::*;

        let pool_cfg = PoolConfig {
            min_connections: 2,
            max_connections: 20,
            test_on_acquire: true,
            ..Default::default()
        };

        assert_eq!(pool_cfg.min_connections, 2);
        assert_eq!(pool_cfg.max_connections, 20);
        assert!(pool_cfg.test_on_acquire);

        // Construction only; no connections are acquired here.
        let _pool = ConnectionPool::new(pool_cfg);
    }

    /// A query buffered while buffering is active resolves to `Success`
    /// once buffering stops and the buffer is drained.
    #[tokio::test]
    async fn test_pg_switchover_buffer() {
        use switchover_buffer::*;

        let buffer_cfg = BufferConfig {
            buffer_timeout: std::time::Duration::from_secs(5),
            max_buffered_queries: 1000,
            ..Default::default()
        };
        let buffer = SwitchoverBuffer::new(buffer_cfg);

        // A new buffer starts out passing queries through.
        assert_eq!(buffer.state(), BufferState::Passthrough);
        assert!(!buffer.is_buffering());

        buffer.start_buffering();
        assert!(buffer.is_buffering());

        let pending = buffer
            .buffer_query(
                String::from("INSERT INTO orders VALUES (1)"),
                Vec::new(),
                1,
            )
            .unwrap();

        // Stop buffering, then drain with an executor that always succeeds.
        buffer.stop_buffering();
        buffer.drain(|_sql, _params| async { Ok(()) }).await;

        let outcome = pending.await.unwrap();
        assert!(matches!(outcome, BufferResult::Success));
    }

    /// A standalone tracker can set, confirm, clear, and then re-point its
    /// primary.
    #[test]
    fn test_pg_primary_tracker_standalone() {
        use primary_tracker::*;

        let tracker = PrimaryTracker::new_standalone();

        let first_primary = uuid::Uuid::new_v4();
        tracker.set_primary(first_primary, String::from("pg-primary.local:5432"));
        tracker.confirm_primary();

        assert!(tracker.has_primary());
        assert!(tracker.get_primary().unwrap().is_confirmed);

        tracker.clear_primary();
        assert!(!tracker.has_primary());

        // Point the tracker at a different node and check the reported address.
        let second_primary = uuid::Uuid::new_v4();
        tracker.set_primary(second_primary, String::from("pg-standby.local:5432"));
        tracker.confirm_primary();
        assert_eq!(
            tracker.get_primary_address(),
            Some(String::from("pg-standby.local:5432"))
        );
    }

    /// Statements logged for a transaction are journaled in order and
    /// classified by type; the journal is gone after commit.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_pg_transaction_replay() {
        use transaction_journal::*;

        let journal = TransactionJournal::new();
        let txn = uuid::Uuid::new_v4();
        let session = uuid::Uuid::new_v4();
        let origin_node = NodeId::new();

        journal
            .begin_transaction(txn, session, origin_node, 0)
            .await
            .unwrap();

        journal
            .log_statement(txn, String::from("BEGIN"), Vec::new(), None, None, 1)
            .await
            .unwrap();
        journal
            .log_statement(
                txn,
                String::from("INSERT INTO accounts (id, balance) VALUES ($1, $2)"),
                vec![JournalValue::Int64(1), JournalValue::Float64(100.0)],
                Some(12345),
                Some(1),
                5,
            )
            .await
            .unwrap();
        journal
            .log_statement(
                txn,
                String::from("UPDATE accounts SET balance = balance - $1 WHERE id = $2"),
                vec![JournalValue::Float64(25.0), JournalValue::Int64(1)],
                Some(67890),
                Some(1),
                3,
            )
            .await
            .unwrap();

        let recorded = journal.get_journal(&txn).await.unwrap();
        assert_eq!(recorded.entries.len(), 3);
        assert!(recorded.has_mutations);

        // Entries appear in the order they were logged.
        assert_eq!(recorded.entries[0].statement_type, StatementType::Transaction);
        assert_eq!(recorded.entries[1].statement_type, StatementType::Insert);
        assert_eq!(recorded.entries[2].statement_type, StatementType::Update);

        journal.commit_transaction(txn).await.unwrap();
        assert!(journal.get_journal(&txn).await.is_none());
    }

    /// A registered session's restore statements mention the parameters and
    /// the prepared statement that were recorded on it.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_pg_session_migration() {
        use session_migrate::*;

        let migrate = SessionMigrate::new();
        let session_id = uuid::Uuid::new_v4();
        let origin_node = NodeId::new();

        let mut state = SessionState::new(
            session_id,
            String::from("postgres"),
            String::from("mydb"),
            origin_node,
        );

        let session_params = [
            ("timezone", "America/New_York"),
            ("search_path", "public, app_schema"),
            ("statement_timeout", "30000"),
            ("work_mem", "256MB"),
        ];
        for &(name, value) in session_params.iter() {
            state.set_parameter(name.to_string(), value.to_string());
        }

        state.add_prepared_statement(PreparedStatementInfo {
            name: String::from("get_user"),
            query: String::from("SELECT * FROM users WHERE id = $1"),
            param_types: vec![String::from("integer")],
            created_at: chrono::Utc::now(),
        });

        migrate.register_session(state).await.unwrap();

        let stored = migrate.get_session(&session_id).await.unwrap();
        let restore_stmts = stored.generate_restore_statements();

        let expected_fragments = [
            "America/New_York",
            "search_path",
            "statement_timeout",
            "PREPARE get_user",
        ];
        for fragment in expected_fragments.iter() {
            assert!(restore_stmts.iter().any(|s| s.contains(*fragment)));
        }
    }

    /// Three submissions raise the pipeline depth to 3; completing each one
    /// brings it back to 0 and the first ticket reports success.
    #[tokio::test]
    async fn test_pg_pipelining() {
        use pipeline::*;

        let pipeline_cfg = PipelineConfig {
            max_depth: 16,
            enabled: true,
            ..Default::default()
        };
        let pipeline = RequestPipeline::new(pipeline_cfg);

        let conn_id = 1;

        let parse_ticket = pipeline
            .submit(conn_id, b"Parse: SELECT $1::int".to_vec())
            .unwrap();
        // The remaining tickets are never awaited; they are only kept alive.
        let _bind_ticket = pipeline.submit(conn_id, b"Bind: [42]".to_vec()).unwrap();
        let _execute_ticket = pipeline.submit(conn_id, b"Execute".to_vec()).unwrap();

        assert_eq!(pipeline.depth(conn_id), 3);

        pipeline.complete_next(conn_id, b"ParseComplete".to_vec(), true, None);
        pipeline.complete_next(conn_id, b"BindComplete".to_vec(), true, None);
        pipeline.complete_next(conn_id, b"DataRow: 42".to_vec(), true, None);

        assert_eq!(pipeline.depth(conn_id), 0);

        let parse_result = parse_ticket.wait().await.unwrap();
        assert!(parse_result.success);
    }

    /// Adding one single-row insert is reflected in the per-table batch size
    /// and in the batcher statistics.
    #[tokio::test]
    async fn test_pg_batch_insert() {
        use batch::*;

        let batch_cfg = BatchConfig {
            max_batch_size: 3,
            ..Default::default()
        };
        let batcher = InsertBatcher::new(batch_cfg);

        let columns = vec![String::from("id"), String::from("total")];
        let rows = vec![vec![String::from("1"), String::from("99.99")]];
        batcher
            .add(
                String::from("orders"),
                columns,
                rows,
                String::from("INSERT INTO orders (id, total) VALUES (1, 99.99)"),
            )
            .unwrap();

        assert_eq!(batcher.batch_size("orders"), 1);

        let stats = batcher.stats();
        assert_eq!(stats.inserts_received, 1);
        assert_eq!(stats.rows_received, 1);
    }
}