1pub mod backend;
62pub mod config;
63pub mod server;
64pub mod protocol;
65pub mod admin;
66pub mod connection_pool;
67pub mod load_balancer;
68pub mod health_checker;
69pub mod failover_controller;
70pub mod switchover_buffer;
71pub mod primary_tracker;
72pub mod pipeline;
73pub mod batch;
74pub mod request;
75
76#[cfg(feature = "pool-modes")]
78pub mod pool;
79
80#[cfg(feature = "ha-tr")]
82pub mod transaction_journal;
83#[cfg(feature = "ha-tr")]
84pub mod failover_replay;
85#[cfg(feature = "ha-tr")]
86pub mod cursor_restore;
87#[cfg(feature = "ha-tr")]
88pub mod session_migrate;
89#[cfg(feature = "ha-tr")]
90pub mod replay;
91
92#[cfg(feature = "ha-tr")]
94pub mod upgrade_orchestrator;
95
96#[cfg(feature = "ha-tr")]
98pub mod shadow_execute;
99
100#[cfg(feature = "query-cache")]
102pub mod cache;
103
104#[cfg(feature = "routing-hints")]
106pub mod routing;
107
108#[cfg(feature = "lag-routing")]
110pub mod lag;
111
112#[cfg(feature = "rate-limiting")]
114pub mod rate_limit;
115
116#[cfg(feature = "circuit-breaker")]
118pub mod circuit_breaker;
119
120#[cfg(feature = "query-analytics")]
122pub mod analytics;
123
124#[cfg(feature = "anomaly-detection")]
127pub mod anomaly;
128
129#[cfg(feature = "edge-proxy")]
131pub mod edge;
132
133#[cfg(feature = "multi-tenancy")]
135pub mod multi_tenancy;
136
137#[cfg(feature = "auth-proxy")]
139pub mod auth;
140
141#[cfg(feature = "query-rewriting")]
143pub mod rewriter;
144
145#[cfg(feature = "wasm-plugins")]
147pub mod plugins;
148
149#[cfg(feature = "graphql-gateway")]
151pub mod graphql;
152
153#[cfg(feature = "schema-routing")]
155pub mod schema_routing;
156
157#[cfg(feature = "distribcache")]
159pub mod distribcache;
160
161pub mod skills;
167
168use thiserror::Error;
169use uuid::Uuid;
170
171#[derive(Debug, Error)]
173pub enum ProxyError {
174 #[error("Configuration error: {0}")]
175 Config(String),
176
177 #[error("Network error: {0}")]
178 Network(String),
179
180 #[error("Connection error: {0}")]
181 Connection(String),
182
183 #[error("Protocol error: {0}")]
184 Protocol(String),
185
186 #[error("Pool error: {0}")]
187 Pool(String),
188
189 #[error("Health check error: {0}")]
190 HealthCheck(String),
191
192 #[error("Failover error: {0}")]
193 Failover(String),
194
195 #[error("Failover failed: {0}")]
196 FailoverFailed(String),
197
198 #[error("Transaction replay failed: {0}")]
199 ReplayFailed(String),
200
201 #[error("Session migration failed: {0}")]
202 SessionMigration(String),
203
204 #[error("Cursor restore failed: {0}")]
205 CursorRestore(String),
206
207 #[error("Routing error: {0}")]
208 Routing(String),
209
210 #[error("Authentication error: {0}")]
211 Auth(String),
212
213 #[error("Pool exhausted: {0}")]
214 PoolExhausted(String),
215
216 #[error("Timeout: {0}")]
217 Timeout(String),
218
219 #[error("Configuration error: {0}")]
220 Configuration(String),
221
222 #[error("No healthy nodes available")]
223 NoHealthyNodes,
224
225 #[error("IO error: {0}")]
226 Io(#[from] std::io::Error),
227
228 #[error("JSON error: {0}")]
229 Json(#[from] serde_json::Error),
230
231 #[error("Internal error: {0}")]
232 Internal(String),
233}
234
235pub type Result<T> = std::result::Result<T, ProxyError>;
236
237pub const VERSION: &str = env!("CARGO_PKG_VERSION");
239
/// Default port number, `5432`.
///
/// NOTE(review): presumably the client-facing listen port of the proxy;
/// confirm against the `config` module.
pub const DEFAULT_PORT: u16 = 5432;

/// Default admin port number, `9090`.
///
/// NOTE(review): presumably the port the `admin` interface binds to;
/// confirm against the `admin`/`config` modules.
pub const DEFAULT_ADMIN_PORT: u16 = 9090;
245
246#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
248pub struct NodeId(pub Uuid);
249
250impl NodeId {
251 pub fn new() -> Self {
252 Self(Uuid::new_v4())
253 }
254}
255
256impl Default for NodeId {
257 fn default() -> Self {
258 Self::new()
259 }
260}
261
/// Role tag attached to a node.
///
/// The enum is `Copy` and comparable with `==`; it carries no data.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NodeRole {
    /// The node is tagged as the primary.
    Primary,
    /// The node is tagged as a standby.
    Standby,
    /// The node is tagged as a read replica.
    ReadReplica,
    /// No role has been determined; `NodeEndpoint::new` starts with this value.
    Unknown,
}
274
275#[derive(Debug, Clone)]
277pub struct NodeEndpoint {
278 pub id: NodeId,
280 pub host: String,
282 pub port: u16,
284 pub role: NodeRole,
286 pub weight: u32,
288 pub enabled: bool,
290}
291
292impl NodeEndpoint {
293 pub fn new(host: impl Into<String>, port: u16) -> Self {
294 Self {
295 id: NodeId::new(),
296 host: host.into(),
297 port,
298 role: NodeRole::Unknown,
299 weight: 100,
300 enabled: true,
301 }
302 }
303
304 pub fn with_role(mut self, role: NodeRole) -> Self {
305 self.role = role;
306 self
307 }
308
309 pub fn with_weight(mut self, weight: u32) -> Self {
310 self.weight = weight;
311 self
312 }
313
314 pub fn address(&self) -> String {
315 format!("{}:{}", self.host, self.port)
316 }
317}
318
#[cfg(test)]
mod postgresql_compat_tests {
    //! Smoke tests that exercise the proxy's components with
    //! PostgreSQL-style host names, ports and SQL text.

    use super::*;

    /// Endpoint builders keep the role and weight they are given, and
    /// `address()` renders `host:port`.
    #[test]
    fn test_pg_node_endpoints() {
        let writer = NodeEndpoint::new("pg-primary.example.com", 5432).with_role(NodeRole::Primary);
        let hot_standby =
            NodeEndpoint::new("pg-standby-1.example.com", 5432).with_role(NodeRole::Standby);
        let reader = NodeEndpoint::new("pg-replica-1.example.com", 5433)
            .with_role(NodeRole::ReadReplica)
            .with_weight(50);

        assert_eq!(writer.role, NodeRole::Primary);
        assert_eq!(hot_standby.role, NodeRole::Standby);
        assert_eq!(reader.weight, 50);
        assert_eq!(reader.address(), "pg-replica-1.example.com:5433");
    }

    /// A read/write-split configuration keeps its strategies and can be used
    /// to construct a `LoadBalancer`.
    #[test]
    fn test_pg_load_balancer_config() {
        use load_balancer::*;

        let lb_config = LoadBalancerConfig {
            read_write_split: true,
            read_strategy: RoutingStrategy::RoundRobin,
            write_strategy: RoutingStrategy::PrimaryOnly,
            ..Default::default()
        };

        assert!(lb_config.read_write_split);
        assert_eq!(lb_config.read_strategy, RoutingStrategy::RoundRobin);
        assert_eq!(lb_config.write_strategy, RoutingStrategy::PrimaryOnly);

        // Construction is the whole check here: it must not panic.
        let _balancer = LoadBalancer::new(lb_config);
    }

    /// A health configuration keeps the check query and detailed-check flag
    /// it was built with.
    #[test]
    fn test_pg_health_config() {
        use health_checker::*;

        let health_cfg = HealthConfig {
            check_query: "SELECT 1".to_owned(),
            detailed_checks: true,
            ..Default::default()
        };

        assert_eq!(health_cfg.check_query, "SELECT 1");
        assert!(health_cfg.detailed_checks);
    }

    /// After setting a primary and registering a sync and an async standby,
    /// the controller is still in the normal state with zero failovers.
    #[tokio::test]
    async fn test_pg_failover() {
        use failover_controller::*;

        let failover_cfg = FailoverConfig {
            auto_failover: true,
            prefer_sync_standby: true,
            ..Default::default()
        };
        let controller = FailoverController::new(failover_cfg);

        let current_primary = NodeId::new();
        controller.set_primary(current_primary).await;
        assert_eq!(controller.get_primary().await, Some(current_primary));

        // Synchronous standby: no lag, priority 1.
        let sync_id = NodeId::new();
        let sync_candidate = FailoverCandidate {
            node_id: sync_id,
            endpoint: NodeEndpoint::new("pg-sync", 5432).with_role(NodeRole::Standby),
            is_sync: true,
            lag_bytes: 0,
            priority: 1,
            last_heartbeat: None,
        };
        controller.register_candidate(sync_candidate).await;

        // Asynchronous standby: 1024 bytes of lag, priority 2.
        let async_id = NodeId::new();
        let async_candidate = FailoverCandidate {
            node_id: async_id,
            endpoint: NodeEndpoint::new("pg-async", 5432).with_role(NodeRole::Standby),
            is_sync: false,
            lag_bytes: 1024,
            priority: 2,
            last_heartbeat: None,
        };
        controller.register_candidate(async_candidate).await;

        assert_eq!(controller.state().await, FailoverState::Normal);
        assert_eq!(controller.failover_count(), 0);
    }

    /// A pool configuration keeps its limits and can be used to construct a
    /// `ConnectionPool`.
    #[test]
    fn test_pg_connection_pool() {
        use connection_pool::*;

        let pool_cfg = PoolConfig {
            min_connections: 2,
            max_connections: 20,
            test_on_acquire: true,
            ..Default::default()
        };

        assert_eq!(pool_cfg.min_connections, 2);
        assert_eq!(pool_cfg.max_connections, 20);
        assert!(pool_cfg.test_on_acquire);

        // Construction is the whole check here: it must not panic.
        let _connections = ConnectionPool::new(pool_cfg);
    }

    /// A query buffered during a switchover is reported as successful once
    /// buffering stops and the buffer is drained with a succeeding executor.
    #[tokio::test]
    async fn test_pg_switchover_buffer() {
        use switchover_buffer::*;

        let buffer_cfg = BufferConfig {
            buffer_timeout: std::time::Duration::from_secs(5),
            max_buffered_queries: 1000,
            ..Default::default()
        };
        let switchover = SwitchoverBuffer::new(buffer_cfg);

        // A new buffer passes queries straight through.
        assert_eq!(switchover.state(), BufferState::Passthrough);
        assert!(!switchover.is_buffering());

        switchover.start_buffering();
        assert!(switchover.is_buffering());

        let pending = switchover
            .buffer_query("INSERT INTO orders VALUES (1)".to_string(), vec![], 1)
            .unwrap();

        switchover.stop_buffering();
        switchover.drain(|_sql, _params| async { Ok(()) }).await;

        let outcome = pending.await.unwrap();
        assert!(matches!(outcome, BufferResult::Success));
    }

    /// A standalone tracker can set, confirm, clear and then replace the
    /// primary, reporting the address of the most recent one.
    #[test]
    fn test_pg_primary_tracker_standalone() {
        use primary_tracker::*;

        let tracker = PrimaryTracker::new_standalone();

        let first_primary = uuid::Uuid::new_v4();
        tracker.set_primary(first_primary, "pg-primary.local:5432".to_string());
        tracker.confirm_primary();

        assert!(tracker.has_primary());
        assert!(tracker.get_primary().unwrap().is_confirmed);

        tracker.clear_primary();
        assert!(!tracker.has_primary());

        // A different node takes over as primary.
        let promoted = uuid::Uuid::new_v4();
        tracker.set_primary(promoted, "pg-standby.local:5432".to_string());
        tracker.confirm_primary();
        assert_eq!(
            tracker.get_primary_address(),
            Some("pg-standby.local:5432".to_string())
        );
    }

    /// Statements logged inside a transaction are journaled in order and
    /// classified by type, and the journal is gone after commit.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_pg_transaction_replay() {
        use transaction_journal::*;

        let tx_log = TransactionJournal::new();
        let tx_id = uuid::Uuid::new_v4();
        let session_id = uuid::Uuid::new_v4();
        let origin = NodeId::new();

        tx_log
            .begin_transaction(tx_id, session_id, origin, 0)
            .await
            .unwrap();

        tx_log
            .log_statement(tx_id, "BEGIN".to_string(), vec![], None, None, 1)
            .await
            .unwrap();
        tx_log
            .log_statement(
                tx_id,
                "INSERT INTO accounts (id, balance) VALUES ($1, $2)".to_string(),
                vec![JournalValue::Int64(1), JournalValue::Float64(100.0)],
                Some(12345),
                Some(1),
                5,
            )
            .await
            .unwrap();
        tx_log
            .log_statement(
                tx_id,
                "UPDATE accounts SET balance = balance - $1 WHERE id = $2".to_string(),
                vec![JournalValue::Float64(25.0), JournalValue::Int64(1)],
                Some(67890),
                Some(1),
                3,
            )
            .await
            .unwrap();

        let recorded = tx_log.get_journal(&tx_id).await.unwrap();
        assert_eq!(recorded.entries.len(), 3);
        assert!(recorded.has_mutations);

        // Entries come back in the order they were logged.
        assert_eq!(recorded.entries[0].statement_type, StatementType::Transaction);
        assert_eq!(recorded.entries[1].statement_type, StatementType::Insert);
        assert_eq!(recorded.entries[2].statement_type, StatementType::Update);

        tx_log.commit_transaction(tx_id).await.unwrap();
        assert!(tx_log.get_journal(&tx_id).await.is_none());
    }

    /// Session parameters and prepared statements registered for a session
    /// show up in the generated restore statements.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_pg_session_migration() {
        use session_migrate::*;

        let migrator = SessionMigrate::new();
        let session_id = uuid::Uuid::new_v4();
        let origin = NodeId::new();

        let mut session_state = SessionState::new(
            session_id,
            "postgres".to_string(),
            "mydb".to_string(),
            origin,
        );

        session_state.set_parameter("timezone".to_string(), "America/New_York".to_string());
        session_state.set_parameter("search_path".to_string(), "public, app_schema".to_string());
        session_state.set_parameter("statement_timeout".to_string(), "30000".to_string());
        session_state.set_parameter("work_mem".to_string(), "256MB".to_string());

        let get_user_stmt = PreparedStatementInfo {
            name: "get_user".to_string(),
            query: "SELECT * FROM users WHERE id = $1".to_string(),
            param_types: vec!["integer".to_string()],
            created_at: chrono::Utc::now(),
        };
        session_state.add_prepared_statement(get_user_stmt);

        migrator.register_session(session_state).await.unwrap();

        let stored = migrator.get_session(&session_id).await.unwrap();
        let restore_sql = stored.generate_restore_statements();

        assert!(restore_sql.iter().any(|s| s.contains("America/New_York")));
        assert!(restore_sql.iter().any(|s| s.contains("search_path")));
        assert!(restore_sql.iter().any(|s| s.contains("statement_timeout")));
        assert!(restore_sql.iter().any(|s| s.contains("PREPARE get_user")));
    }

    /// Three requests submitted on one connection raise the pipeline depth to
    /// three; completing each in turn empties it and resolves the first ticket.
    #[tokio::test]
    async fn test_pg_pipelining() {
        use pipeline::*;

        let pipeline_cfg = PipelineConfig {
            max_depth: 16,
            enabled: true,
            ..Default::default()
        };
        let requests = RequestPipeline::new(pipeline_cfg);

        let conn_id = 1;

        let parse_ticket = requests
            .submit(conn_id, b"Parse: SELECT $1::int".to_vec())
            .unwrap();
        // These two tickets are held until the end of the test but never awaited.
        let _bind_ticket = requests.submit(conn_id, b"Bind: [42]".to_vec()).unwrap();
        let _execute_ticket = requests.submit(conn_id, b"Execute".to_vec()).unwrap();

        assert_eq!(requests.depth(conn_id), 3);

        requests.complete_next(conn_id, b"ParseComplete".to_vec(), true, None);
        requests.complete_next(conn_id, b"BindComplete".to_vec(), true, None);
        requests.complete_next(conn_id, b"DataRow: 42".to_vec(), true, None);

        assert_eq!(requests.depth(conn_id), 0);

        let parse_reply = parse_ticket.wait().await.unwrap();
        assert!(parse_reply.success);
    }

    /// Adding a single-row insert to the batcher is reflected in the
    /// per-table batch size and in the received counters.
    #[tokio::test]
    async fn test_pg_batch_insert() {
        use batch::*;

        let batch_cfg = BatchConfig {
            max_batch_size: 3,
            ..Default::default()
        };
        let batcher = InsertBatcher::new(batch_cfg);

        batcher
            .add(
                "orders".to_string(),
                vec!["id".to_string(), "total".to_string()],
                vec![vec!["1".to_string(), "99.99".to_string()]],
                "INSERT INTO orders (id, total) VALUES (1, 99.99)".to_string(),
            )
            .unwrap();

        assert_eq!(batcher.batch_size("orders"), 1);

        let counters = batcher.stats();
        assert_eq!(counters.inserts_received, 1);
        assert_eq!(counters.rows_received, 1);
    }
}