1use std::sync::Arc;
19use std::time::{Duration, Instant};
20use parking_lot::RwLock;
21use tokio::sync::broadcast;
22use uuid::Uuid;
23
24#[derive(Debug, Clone)]
28pub struct TopologyNodeInfo {
29 pub node_id: Uuid,
31 pub client_addr: String,
33 pub is_healthy: bool,
35}
36
37#[derive(Debug, Clone)]
39pub enum TopologyEvent {
40 PrimaryChanged {
42 old_primary: Option<Uuid>,
43 new_primary: Uuid,
44 },
45 NodeLeft { node_id: Uuid },
47 HealthChanged { node_id: Uuid, is_healthy: bool },
49}
50
/// Source of cluster topology information.
///
/// Implementations must be shareable across threads (`Send + Sync + 'static`);
/// `PrimaryTracker` stores them as `Arc<dyn TopologyProvider>`.
pub trait TopologyProvider: Send + Sync + 'static {
    /// Returns a new receiver for [`TopologyEvent`]s emitted by this provider.
    fn subscribe(&self) -> broadcast::Receiver<TopologyEvent>;

    /// Returns the node the provider currently considers primary, if any.
    fn get_primary(&self) -> Option<TopologyNodeInfo>;

    /// Looks up a node by id; `None` means the provider does not know it.
    fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo>;
}
65
/// [`TopologyProvider`] that finds the primary by polling a fixed list of
/// PostgreSQL nodes with `SELECT pg_is_in_recovery()`.
#[cfg(feature = "postgres-topology")]
pub struct PostgresTopologyProvider {
    /// Nodes to probe, in order; the first one not in recovery is taken as primary.
    nodes: Vec<PostgresNode>,
    /// Sender side of the topology event channel handed out by `subscribe()`.
    event_tx: broadcast::Sender<TopologyEvent>,
    /// Primary found by the most recent poll, if any.
    current_primary: RwLock<Option<TopologyNodeInfo>>,
    /// Delay between polls; also used to derive the probe timeouts.
    poll_interval: Duration,
    /// TLS client configuration shared by all probe connections.
    tls_config: Arc<rustls::ClientConfig>,
    /// TLS negotiation mode used for probe connections.
    tls_mode: crate::backend::TlsMode,
}
89
/// Connection parameters for one PostgreSQL node that takes part in polling.
///
/// `Debug` is implemented by hand so that the password is never written to
/// logs or panic messages when a node (or a collection of nodes) is formatted.
#[cfg(feature = "postgres-topology")]
#[derive(Clone)]
pub struct PostgresNode {
    /// Identifier used for this node in topology events.
    pub node_id: Uuid,
    /// Host name or IP address to connect to.
    pub host: String,
    /// TCP port to connect to.
    pub port: u16,
    /// User name used for the probe connection.
    pub user: String,
    /// Optional password for `user`; redacted in `Debug` output.
    pub password: Option<String>,
    /// Database the probe connects to.
    pub database: String,
}

#[cfg(feature = "postgres-topology")]
impl std::fmt::Debug for PostgresNode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only reveal whether a password is present, never its value.
        let password = self.password.as_ref().map(|_| "<redacted>");
        f.debug_struct("PostgresNode")
            .field("node_id", &self.node_id)
            .field("host", &self.host)
            .field("port", &self.port)
            .field("user", &self.user)
            .field("password", &password)
            .field("database", &self.database)
            .finish()
    }
}
100
#[cfg(feature = "postgres-topology")]
impl PostgresTopologyProvider {
    /// Creates a provider for `nodes` with the defaults: a 2 second poll
    /// interval, the crate's default TLS client configuration and
    /// `TlsMode::Prefer`.
    pub fn new(nodes: Vec<PostgresNode>) -> Self {
        // The initial receiver is dropped; subscribers attach via `subscribe()`.
        let (event_tx, _) = broadcast::channel(16);
        Self {
            nodes,
            event_tx,
            current_primary: RwLock::new(None),
            poll_interval: Duration::from_secs(2),
            tls_config: crate::backend::tls::default_client_config(),
            tls_mode: crate::backend::TlsMode::Prefer,
        }
    }

    /// Overrides the delay between polls (builder style).
    ///
    /// The interval is also used as the per-probe query timeout and, capped at
    /// five seconds, as the connect timeout.
    pub fn with_poll_interval(mut self, interval: Duration) -> Self {
        self.poll_interval = interval;
        self
    }

    /// Overrides the TLS mode used for probe connections (builder style).
    pub fn with_tls_mode(mut self, mode: crate::backend::TlsMode) -> Self {
        self.tls_mode = mode;
        self
    }

    /// Polls all nodes forever, once per poll interval.
    ///
    /// This future never completes; run it on its own task.
    ///
    /// # Panics
    ///
    /// `tokio::time::interval` panics if the poll interval is zero.
    pub async fn start(&self) {
        let mut interval = tokio::time::interval(self.poll_interval);
        // Probes run sequentially and each may take up to its timeout, so one
        // poll can exceed the interval. With the default burst behaviour the
        // missed ticks would fire back-to-back and hammer the nodes; `Delay`
        // keeps a full interval between consecutive polls instead.
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        loop {
            interval.tick().await;
            self.poll_nodes().await;
        }
    }

    /// Probes every node once and updates the current primary.
    ///
    /// The first node (in configuration order) that reports it is not in
    /// recovery becomes the primary. A `HealthChanged { is_healthy: false }`
    /// event is sent for every node whose probe fails, and a `PrimaryChanged`
    /// event is sent when a different node becomes primary. When no primary is
    /// found the stored primary is cleared; `TopologyEvent` has no variant for
    /// that case, so it is only logged.
    async fn poll_nodes(&self) {
        let mut next_primary: Option<TopologyNodeInfo> = None;

        for node in &self.nodes {
            match self.probe_recovery(node).await {
                // Replica: nothing to record.
                Ok(true) => {}
                Ok(false) => {
                    if let Some(chosen) = &next_primary {
                        // Two writable nodes at once usually means split-brain
                        // or a misconfigured node list; keep the first but make
                        // it visible.
                        tracing::warn!(
                            node = %node.host,
                            port = node.port,
                            chosen_primary = %chosen.client_addr,
                            "multiple nodes report they are not in recovery; keeping the first"
                        );
                    } else {
                        next_primary = Some(TopologyNodeInfo {
                            node_id: node.node_id,
                            client_addr: format!("{}:{}", node.host, node.port),
                            is_healthy: true,
                        });
                    }
                }
                Err(e) => {
                    tracing::warn!(
                        node = %node.host,
                        port = node.port,
                        error = %e,
                        "topology probe failed"
                    );
                    // A send error only means nobody is subscribed right now.
                    let _ = self.event_tx.send(TopologyEvent::HealthChanged {
                        node_id: node.node_id,
                        is_healthy: false,
                    });
                }
            }
        }

        let new_primary_id = next_primary.as_ref().map(|p| p.node_id);

        // Compare and swap under one write lock so concurrent polls cannot
        // interleave between the comparison and the store. The guard is
        // released before any event is sent.
        let old_primary_id = {
            let mut current = self.current_primary.write();
            let old = current.as_ref().map(|p| p.node_id);
            if old == new_primary_id {
                return;
            }
            *current = next_primary;
            old
        };

        match new_primary_id {
            Some(new_id) => {
                let _ = self.event_tx.send(TopologyEvent::PrimaryChanged {
                    old_primary: old_primary_id,
                    new_primary: new_id,
                });
            }
            None => {
                tracing::warn!(
                    old_primary = ?old_primary_id,
                    "topology poll found no primary; cleared the previous primary"
                );
            }
        }
    }

    /// Connects to `node` and returns the result of `pg_is_in_recovery()`.
    ///
    /// `Ok(true)` means the node is a replica, `Ok(false)` that it accepts
    /// writes. A NULL result is treated as `false`, as before.
    ///
    /// # Errors
    ///
    /// Returns the backend error if connecting, querying or decoding fails.
    async fn probe_recovery(
        &self,
        node: &PostgresNode,
    ) -> crate::backend::BackendResult<bool> {
        use crate::backend::{BackendClient, BackendConfig};

        let cfg = BackendConfig {
            host: node.host.clone(),
            port: node.port,
            user: node.user.clone(),
            password: node.password.clone(),
            database: Some(node.database.clone()),
            application_name: Some("helios-topology".into()),
            tls_mode: self.tls_mode,
            // Never wait longer than five seconds for a connection.
            connect_timeout: self.poll_interval.min(Duration::from_secs(5)),
            query_timeout: self.poll_interval,
            tls_config: Arc::clone(&self.tls_config),
        };

        let mut client = BackendClient::connect(&cfg).await?;
        // Hold on to the query result and close the connection first, so a
        // failed query does not skip the close via an early `?` return.
        let outcome = client.query_scalar("SELECT pg_is_in_recovery()").await;
        client.close().await;

        let in_recovery = outcome?.as_bool("pg_is_in_recovery")?;
        Ok(in_recovery.unwrap_or(false))
    }
}
219
#[cfg(feature = "postgres-topology")]
impl TopologyProvider for PostgresTopologyProvider {
    /// Returns a fresh receiver on the provider's event channel.
    fn subscribe(&self) -> broadcast::Receiver<TopologyEvent> {
        self.event_tx.subscribe()
    }

    /// Returns a copy of the primary found by the most recent poll, if any.
    fn get_primary(&self) -> Option<TopologyNodeInfo> {
        self.current_primary.read().as_ref().cloned()
    }

    /// Looks up a configured node by id.
    ///
    /// The returned `is_healthy` is always `true`: this provider does not keep
    /// per-node probe results, so the flag says nothing about reachability.
    fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo> {
        let node = self.nodes.iter().find(|candidate| candidate.node_id == id)?;
        Some(TopologyNodeInfo {
            node_id: node.node_id,
            client_addr: format!("{}:{}", node.host, node.port),
            is_healthy: true,
        })
    }
}
238
/// Adapter that exposes a HeliosDB topology source as a [`TopologyProvider`].
#[cfg(feature = "heliosdb-topology")]
pub mod heliosdb_provider {
    use super::*;

    /// Operations a HeliosDB topology source must offer to be wrapped by
    /// [`HeliosTopologyProvider`]; they mirror [`TopologyProvider`] one to one.
    pub trait HeliosTopologyBridge: Send + Sync + 'static {
        /// Returns a new receiver for topology events.
        fn subscribe(&self) -> broadcast::Receiver<TopologyEvent>;
        /// Returns the current primary, if any.
        fn get_primary(&self) -> Option<TopologyNodeInfo>;
        /// Looks up a node by id.
        fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo>;
    }

    /// [`TopologyProvider`] that forwards every call to a shared bridge.
    pub struct HeliosTopologyProvider<T: HeliosTopologyBridge> {
        /// The wrapped bridge.
        inner: Arc<T>,
    }

    impl<T: HeliosTopologyBridge> HeliosTopologyProvider<T> {
        /// Wraps `inner` without any further setup.
        pub fn new(inner: Arc<T>) -> Self {
            Self { inner }
        }
    }

    impl<T: HeliosTopologyBridge> TopologyProvider for HeliosTopologyProvider<T> {
        fn subscribe(&self) -> broadcast::Receiver<TopologyEvent> {
            // Name the bridge method explicitly; both traits share method names.
            T::subscribe(&self.inner)
        }

        fn get_primary(&self) -> Option<TopologyNodeInfo> {
            T::get_primary(&self.inner)
        }

        fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo> {
            T::get_node(&self.inner, id)
        }
    }
}
293
294#[derive(Debug, Clone)]
298pub struct PrimaryInfo {
299 pub node_id: Uuid,
301 pub address: String,
303 pub became_primary_at: Instant,
305 pub is_confirmed: bool,
307}
308
309#[derive(Debug, Clone)]
311pub enum PrimaryChangeEvent {
312 Changed {
314 old: Option<Uuid>,
315 new: Uuid,
316 address: String,
317 },
318 Lost { old: Uuid },
320 Confirmed { node_id: Uuid },
322}
323
/// Tracks which node is currently the primary and broadcasts changes.
///
/// The tracker either runs standalone, where callers drive it through
/// `set_primary` / `confirm_primary` / `clear_primary`, or follows a
/// [`TopologyProvider`] from its `run` loop.
pub struct PrimaryTracker {
    /// Topology source to follow; `None` in standalone mode.
    provider: Option<Arc<dyn TopologyProvider>>,
    /// Currently tracked primary, if any.
    current_primary: RwLock<Option<PrimaryInfo>>,
    /// Sender side of the channel handed out by `subscribe()`.
    event_tx: broadcast::Sender<PrimaryChangeEvent>,
    /// Period of the periodic check performed by `run` (500 ms by default).
    tracking_interval: Duration,
}
342
343impl PrimaryTracker {
344 pub fn new_standalone() -> Self {
346 let (event_tx, _) = broadcast::channel(16);
347 Self {
348 provider: None,
349 current_primary: RwLock::new(None),
350 event_tx,
351 tracking_interval: Duration::from_millis(500),
352 }
353 }
354
355 pub fn with_provider(provider: Arc<dyn TopologyProvider>) -> Self {
357 let (event_tx, _) = broadcast::channel(16);
358 Self {
359 provider: Some(provider),
360 current_primary: RwLock::new(None),
361 event_tx,
362 tracking_interval: Duration::from_millis(500),
363 }
364 }
365
366 pub fn with_tracking_interval(mut self, interval: Duration) -> Self {
368 self.tracking_interval = interval;
369 self
370 }
371
372 pub fn subscribe(&self) -> broadcast::Receiver<PrimaryChangeEvent> {
374 self.event_tx.subscribe()
375 }
376
377 pub fn get_primary(&self) -> Option<PrimaryInfo> {
379 self.current_primary.read().clone()
380 }
381
382 pub fn get_primary_id(&self) -> Option<Uuid> {
384 self.current_primary.read().as_ref().map(|p| p.node_id)
385 }
386
387 pub fn get_primary_address(&self) -> Option<String> {
389 self.current_primary.read().as_ref().map(|p| p.address.clone())
390 }
391
392 pub fn has_primary(&self) -> bool {
394 self.current_primary.read().is_some()
395 }
396
397 pub fn set_primary(&self, node_id: Uuid, address: String) {
399 let old_primary = self.current_primary.read().as_ref().map(|p| p.node_id);
400
401 let new_info = PrimaryInfo {
402 node_id,
403 address: address.clone(),
404 became_primary_at: Instant::now(),
405 is_confirmed: false,
406 };
407
408 *self.current_primary.write() = Some(new_info);
409
410 let _ = self.event_tx.send(PrimaryChangeEvent::Changed {
411 old: old_primary,
412 new: node_id,
413 address,
414 });
415
416 tracing::info!("Primary tracker: set primary to {} (pending confirmation)", node_id);
417 }
418
419 pub fn confirm_primary(&self) {
421 let mut guard = self.current_primary.write();
422 if let Some(ref mut info) = *guard {
423 info.is_confirmed = true;
424 let node_id = info.node_id;
425 drop(guard);
426
427 let _ = self.event_tx.send(PrimaryChangeEvent::Confirmed { node_id });
428 tracing::info!("Primary tracker: confirmed primary {}", node_id);
429 }
430 }
431
432 pub fn clear_primary(&self) {
434 let old_primary = self.current_primary.write().take();
435
436 if let Some(info) = old_primary {
437 let _ = self.event_tx.send(PrimaryChangeEvent::Lost { old: info.node_id });
438 tracing::warn!("Primary tracker: lost primary {}", info.node_id);
439 }
440 }
441
442 pub async fn run(&self) {
447 let provider = match &self.provider {
448 Some(p) => Arc::clone(p),
449 None => {
450 tracing::info!("Primary tracker: no topology provider, running in standalone mode");
451 return;
452 }
453 };
454
455 let mut topology_rx = provider.subscribe();
456 let mut interval = tokio::time::interval(self.tracking_interval);
457
458 self.detect_primary_from_provider(&*provider);
460
461 loop {
462 tokio::select! {
463 event = topology_rx.recv() => {
464 match event {
465 Ok(TopologyEvent::PrimaryChanged { old_primary, new_primary }) => {
466 self.handle_primary_changed(&*provider, old_primary, new_primary);
467 }
468 Ok(TopologyEvent::NodeLeft { node_id }) => {
469 self.handle_node_left(node_id);
470 }
471 Ok(TopologyEvent::HealthChanged { node_id, is_healthy }) => {
472 self.handle_health_changed(node_id, is_healthy);
473 }
474 Err(broadcast::error::RecvError::Lagged(n)) => {
475 tracing::warn!("Primary tracker lagged {} events", n);
476 }
477 Err(broadcast::error::RecvError::Closed) => {
478 break;
479 }
480 }
481 }
482 _ = interval.tick() => {
483 self.periodic_check(&*provider);
484 }
485 }
486 }
487 }
488
489 fn detect_primary_from_provider(&self, provider: &dyn TopologyProvider) {
492 if let Some(primary) = provider.get_primary() {
493 let info = PrimaryInfo {
494 node_id: primary.node_id,
495 address: primary.client_addr.clone(),
496 became_primary_at: Instant::now(),
497 is_confirmed: true,
498 };
499
500 *self.current_primary.write() = Some(info);
501 tracing::info!("Primary tracker: detected primary {}", primary.node_id);
502 }
503 }
504
505 fn handle_primary_changed(
506 &self,
507 provider: &dyn TopologyProvider,
508 old: Option<Uuid>,
509 new: Uuid,
510 ) {
511 let address = provider
512 .get_node(new)
513 .map(|n| n.client_addr)
514 .unwrap_or_else(|| format!("{}:5432", new));
515
516 let info = PrimaryInfo {
517 node_id: new,
518 address: address.clone(),
519 became_primary_at: Instant::now(),
520 is_confirmed: true,
521 };
522
523 *self.current_primary.write() = Some(info);
524
525 let _ = self.event_tx.send(PrimaryChangeEvent::Changed {
526 old,
527 new,
528 address,
529 });
530
531 tracing::info!("Primary tracker: primary changed from {:?} to {}", old, new);
532 }
533
534 fn handle_node_left(&self, node_id: Uuid) {
535 let current = self.current_primary.read().as_ref().map(|p| p.node_id);
536 if current == Some(node_id) {
537 self.clear_primary();
538 }
539 }
540
541 fn handle_health_changed(&self, node_id: Uuid, is_healthy: bool) {
542 if !is_healthy {
543 let current = self.current_primary.read().as_ref().map(|p| p.node_id);
544 if current == Some(node_id) {
545 tracing::warn!("Primary {} became unhealthy", node_id);
546 }
547 }
548 }
549
550 fn periodic_check(&self, provider: &dyn TopologyProvider) {
551 let current_id = self.current_primary.read().as_ref().map(|p| p.node_id);
552
553 if let Some(id) = current_id {
554 if let Some(node) = provider.get_node(id) {
555 if !node.is_healthy {
556 tracing::warn!("Primary {} is unhealthy in periodic check", id);
557 }
558 } else {
559 self.clear_primary();
560 }
561 } else {
562 self.detect_primary_from_provider(provider);
563 }
564 }
565}
566
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal in-memory provider that only knows about its current primary.
    struct MockTopology {
        event_tx: broadcast::Sender<TopologyEvent>,
        primary: RwLock<Option<TopologyNodeInfo>>,
    }

    impl MockTopology {
        fn new() -> Self {
            let (event_tx, _) = broadcast::channel(16);
            Self {
                event_tx,
                primary: RwLock::new(None),
            }
        }

        /// Makes `node_id` at `addr` the (healthy) primary.
        fn set_primary(&self, node_id: Uuid, addr: &str) {
            let info = TopologyNodeInfo {
                node_id,
                client_addr: addr.to_string(),
                is_healthy: true,
            };
            *self.primary.write() = Some(info);
        }
    }

    impl TopologyProvider for MockTopology {
        fn subscribe(&self) -> broadcast::Receiver<TopologyEvent> {
            self.event_tx.subscribe()
        }

        fn get_primary(&self) -> Option<TopologyNodeInfo> {
            self.primary.read().clone()
        }

        fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo> {
            // Only the primary is known to this mock.
            let guard = self.primary.read();
            guard.as_ref().filter(|n| n.node_id == id).cloned()
        }
    }

    /// Standalone life cycle: set -> confirm -> clear.
    #[test]
    fn test_standalone_primary_tracker() {
        let tracker = PrimaryTracker::new_standalone();
        assert!(!tracker.has_primary());

        let node_id = Uuid::new_v4();
        tracker.set_primary(node_id, "localhost:5432".to_string());

        assert!(tracker.has_primary());
        assert_eq!(tracker.get_primary_id(), Some(node_id));
        assert_eq!(tracker.get_primary_address(), Some("localhost:5432".to_string()));

        // A manually set primary starts out unconfirmed.
        let before_confirm = tracker.get_primary().unwrap();
        assert!(!before_confirm.is_confirmed);

        tracker.confirm_primary();
        let after_confirm = tracker.get_primary().unwrap();
        assert!(after_confirm.is_confirmed);

        tracker.clear_primary();
        assert!(!tracker.has_primary());
    }

    /// The tracker adopts the primary reported by its provider.
    #[test]
    fn test_provider_backed_tracker() {
        let topo = Arc::new(MockTopology::new());
        let node_id = Uuid::new_v4();
        topo.set_primary(node_id, "primary:5432");

        let tracker = PrimaryTracker::with_provider(topo.clone());
        tracker.detect_primary_from_provider(topo.as_ref());

        assert!(tracker.has_primary());
        assert_eq!(tracker.get_primary_id(), Some(node_id));
    }

    /// Failover: the primary is lost, then a standby is promoted and confirmed.
    #[test]
    fn test_postgresql_failover_scenario() {
        let topo = Arc::new(MockTopology::new());

        let pg_primary = Uuid::new_v4();
        let pg_sync = Uuid::new_v4();
        let _pg_async = Uuid::new_v4();

        topo.set_primary(pg_primary, "pg-primary:5432");

        let tracker = PrimaryTracker::with_provider(topo.clone());
        tracker.detect_primary_from_provider(topo.as_ref());

        assert!(tracker.has_primary());
        assert_eq!(tracker.get_primary_address(), Some("pg-primary:5432".to_string()));

        // Subscribe only now, so the receiver sees just the failover events.
        let mut rx = tracker.subscribe();

        tracker.clear_primary();
        assert!(!tracker.has_primary());

        let lost = rx.try_recv().unwrap();
        assert!(matches!(lost, PrimaryChangeEvent::Lost { old } if old == pg_primary));

        tracker.set_primary(pg_sync, "pg-sync:5432".to_string());
        assert!(tracker.has_primary());
        assert_eq!(tracker.get_primary_address(), Some("pg-sync:5432".to_string()));
        assert!(!tracker.get_primary().unwrap().is_confirmed);

        tracker.confirm_primary();
        assert!(tracker.get_primary().unwrap().is_confirmed);

        // Next queued event is the `Changed` emitted by `set_primary`.
        let changed = rx.try_recv().unwrap();
        assert!(matches!(changed, PrimaryChangeEvent::Changed { new, .. } if new == pg_sync));
    }

    /// A third-party provider (Patroni-like) works through the trait alone.
    #[test]
    fn test_custom_topology_provider() {
        struct PatroniProvider {
            leader: RwLock<Option<TopologyNodeInfo>>,
            event_tx: broadcast::Sender<TopologyEvent>,
        }

        impl PatroniProvider {
            fn new() -> Self {
                let (tx, _) = broadcast::channel(16);
                Self {
                    leader: RwLock::new(None),
                    event_tx: tx,
                }
            }

            fn set_leader(&self, id: Uuid, addr: &str) {
                let info = TopologyNodeInfo {
                    node_id: id,
                    client_addr: addr.to_string(),
                    is_healthy: true,
                };
                *self.leader.write() = Some(info);
            }
        }

        impl TopologyProvider for PatroniProvider {
            fn subscribe(&self) -> broadcast::Receiver<TopologyEvent> {
                self.event_tx.subscribe()
            }

            fn get_primary(&self) -> Option<TopologyNodeInfo> {
                self.leader.read().clone()
            }

            fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo> {
                self.leader.read().as_ref().filter(|n| n.node_id == id).cloned()
            }
        }

        let patroni = Arc::new(PatroniProvider::new());
        let leader_id = Uuid::new_v4();
        patroni.set_leader(leader_id, "patroni-leader.svc:5432");

        let tracker = PrimaryTracker::with_provider(patroni.clone());
        tracker.detect_primary_from_provider(patroni.as_ref());

        assert!(tracker.has_primary());
        assert_eq!(
            tracker.get_primary_address(),
            Some("patroni-leader.svc:5432".to_string())
        );
    }

    /// With every node unreachable, a poll yields no primary and reports the
    /// failed probes as `HealthChanged { is_healthy: false }`.
    #[cfg(feature = "postgres-topology")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_poll_nodes_all_unreachable_sets_no_primary() {
        // Ports 1 and 2 on localhost are expected to have nothing listening.
        let unreachable_node = |port: u16| PostgresNode {
            node_id: Uuid::new_v4(),
            host: "127.0.0.1".into(),
            port,
            user: "postgres".into(),
            password: None,
            database: "postgres".into(),
        };
        let nodes = vec![unreachable_node(1), unreachable_node(2)];

        let provider = PostgresTopologyProvider::new(nodes)
            .with_poll_interval(Duration::from_millis(200));
        let mut rx = provider.event_tx.subscribe();

        provider.poll_nodes().await;

        assert!(provider.get_primary().is_none());

        // Drain at most ten queued events, counting the unhealthy reports.
        let mut health_events = 0;
        for _ in 0..10 {
            let event = match rx.try_recv() {
                Ok(event) => event,
                Err(_) => break,
            };
            if matches!(event, TopologyEvent::HealthChanged { is_healthy: false, .. }) {
                health_events += 1;
            }
        }
        assert!(
            health_events >= 1,
            "expected at least one HealthChanged event"
        );
    }
}