1use parking_lot::RwLock;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21use tokio::sync::broadcast;
22use uuid::Uuid;
23
24#[derive(Debug, Clone)]
28pub struct TopologyNodeInfo {
29 pub node_id: Uuid,
31 pub client_addr: String,
33 pub is_healthy: bool,
35}
36
37#[derive(Debug, Clone)]
39pub enum TopologyEvent {
40 PrimaryChanged {
42 old_primary: Option<Uuid>,
43 new_primary: Uuid,
44 },
45 NodeLeft { node_id: Uuid },
47 HealthChanged { node_id: Uuid, is_healthy: bool },
49}
50
51pub trait TopologyProvider: Send + Sync + 'static {
56 fn subscribe(&self) -> broadcast::Receiver<TopologyEvent>;
58
59 fn get_primary(&self) -> Option<TopologyNodeInfo>;
61
62 fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo>;
64}
65
#[cfg(feature = "postgres-topology")]
/// Topology provider that discovers the primary by polling a fixed list of
/// PostgreSQL servers with `SELECT pg_is_in_recovery()`.
///
/// On each poll, the first node in `nodes` order that answers and reports it
/// is *not* in recovery becomes the primary. Per-node health is the outcome
/// of the most recent probe and is exposed through
/// [`TopologyProvider::get_node`].
pub struct PostgresTopologyProvider {
    /// Nodes to probe, in priority order.
    nodes: Vec<PostgresNode>,
    /// Last observed health of each node, index-aligned with `nodes`.
    /// Every node starts out healthy until a probe says otherwise.
    node_health: RwLock<Vec<bool>>,
    /// Sender side of the topology event channel.
    event_tx: broadcast::Sender<TopologyEvent>,
    /// Primary found by the most recent poll.
    current_primary: RwLock<Option<TopologyNodeInfo>>,
    /// Time between polls; also bounds the per-probe timeouts.
    poll_interval: Duration,
    /// TLS client configuration used for probe connections.
    tls_config: std::sync::Arc<rustls::ClientConfig>,
    /// TLS negotiation mode used for probe connections.
    tls_mode: crate::backend::TlsMode,
}

#[cfg(feature = "postgres-topology")]
/// Connection parameters for one PostgreSQL server polled by
/// [`PostgresTopologyProvider`].
#[derive(Clone)]
pub struct PostgresNode {
    /// Identifier reported in topology events for this server.
    pub node_id: Uuid,
    pub host: String,
    pub port: u16,
    pub user: String,
    /// Password for `user`, if the server requires one.
    pub password: Option<String>,
    pub database: String,
}

#[cfg(feature = "postgres-topology")]
impl std::fmt::Debug for PostgresNode {
    /// Same shape as a derived `Debug`, but the password is redacted so that
    /// logging a node never leaks credentials.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PostgresNode")
            .field("node_id", &self.node_id)
            .field("host", &self.host)
            .field("port", &self.port)
            .field("user", &self.user)
            .field("password", &self.password.as_ref().map(|_| "<redacted>"))
            .field("database", &self.database)
            .finish()
    }
}

#[cfg(feature = "postgres-topology")]
impl PostgresTopologyProvider {
    /// Creates a provider for `nodes` with a 2 s poll interval, the default
    /// TLS client config and `TlsMode::Prefer`.
    ///
    /// Nothing is probed until [`start`](Self::start) runs.
    pub fn new(nodes: Vec<PostgresNode>) -> Self {
        let (event_tx, _) = broadcast::channel(16);
        let node_health = RwLock::new(vec![true; nodes.len()]);
        Self {
            nodes,
            node_health,
            event_tx,
            current_primary: RwLock::new(None),
            poll_interval: Duration::from_secs(2),
            tls_config: crate::backend::tls::default_client_config(),
            tls_mode: crate::backend::TlsMode::Prefer,
        }
    }

    /// Sets the poll interval.
    ///
    /// The interval also caps the probe timeouts: connect uses
    /// `min(interval, 5 s)` and the query uses `interval`.
    pub fn with_poll_interval(mut self, interval: Duration) -> Self {
        self.poll_interval = interval;
        self
    }

    /// Sets the TLS mode used for probe connections.
    pub fn with_tls_mode(mut self, mode: crate::backend::TlsMode) -> Self {
        self.tls_mode = mode;
        self
    }

    /// Polls all nodes forever, once per poll interval. Never returns.
    ///
    /// A zero interval is raised to 1 ms, because `tokio::time::interval`
    /// panics on a zero period. If a poll round overruns the interval
    /// (probes are sequential and each may wait for its timeouts), the next
    /// round is delayed instead of fired back-to-back to catch up.
    pub async fn start(&self) {
        let period = self.poll_interval.max(Duration::from_millis(1));
        let mut interval = tokio::time::interval(period);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        loop {
            interval.tick().await;
            self.poll_nodes().await;
        }
    }

    /// Probes every node once, then publishes what changed.
    async fn poll_nodes(&self) {
        let mut next_primary: Option<TopologyNodeInfo> = None;
        let mut observed = Vec::with_capacity(self.nodes.len());

        for node in &self.nodes {
            let healthy = match self.probe_recovery(node).await {
                Ok(in_recovery) => {
                    // The first reachable node that is not in recovery wins.
                    if !in_recovery && next_primary.is_none() {
                        next_primary = Some(TopologyNodeInfo {
                            node_id: node.node_id,
                            client_addr: format!("{}:{}", node.host, node.port),
                            is_healthy: true,
                        });
                    }
                    true
                }
                Err(e) => {
                    tracing::warn!(
                        node = %node.host,
                        port = node.port,
                        error = %e,
                        "topology probe failed"
                    );
                    false
                }
            };
            observed.push(healthy);
        }

        self.publish_health(&observed);
        self.publish_primary(next_primary);
    }

    /// Records `observed` (index-aligned with `nodes`) as the latest health
    /// and emits `HealthChanged` only for nodes whose health flipped.
    ///
    /// Edge-triggering keeps a persistently-down node from refilling the
    /// 16-slot channel every poll, which would make subscribers lag and drop
    /// `PrimaryChanged` events.
    fn publish_health(&self, observed: &[bool]) {
        let flipped: Vec<(Uuid, bool)> = {
            let mut health = self.node_health.write();
            self.nodes
                .iter()
                .zip(health.iter_mut())
                .zip(observed)
                .filter_map(|((node, last), &now)| {
                    let before = std::mem::replace(last, now);
                    if before == now {
                        None
                    } else {
                        Some((node.node_id, now))
                    }
                })
                .collect()
        };

        // Send after releasing the write lock to keep it short.
        for (node_id, is_healthy) in flipped {
            let _ = self
                .event_tx
                .send(TopologyEvent::HealthChanged { node_id, is_healthy });
        }
    }

    /// Stores `next_primary` and emits `PrimaryChanged` when the primary's
    /// identity differs from the previously stored one.
    fn publish_primary(&self, next_primary: Option<TopologyNodeInfo>) {
        let new_primary_id = next_primary.as_ref().map(|p| p.node_id);

        // Compare and swap under one write lock.
        let old_primary_id = {
            let mut current = self.current_primary.write();
            let old = current.as_ref().map(|p| p.node_id);
            if old == new_primary_id {
                return;
            }
            *current = next_primary;
            old
        };

        match new_primary_id {
            Some(new_primary) => {
                let _ = self.event_tx.send(TopologyEvent::PrimaryChanged {
                    old_primary: old_primary_id,
                    new_primary,
                });
            }
            None => {
                // TopologyEvent has no "primary lost" variant; at least make
                // the loss visible in the logs.
                tracing::warn!(
                    previous_primary = ?old_primary_id,
                    "topology poll found no primary"
                );
            }
        }
    }

    /// Connects to `node` and returns the result of `pg_is_in_recovery()`
    /// (`true` on a standby). A NULL result is treated as `false`.
    ///
    /// # Errors
    /// Propagates connection, query and value-decoding errors from the
    /// backend client.
    async fn probe_recovery(&self, node: &PostgresNode) -> crate::backend::BackendResult<bool> {
        use crate::backend::{BackendClient, BackendConfig};

        let cfg = BackendConfig {
            host: node.host.clone(),
            port: node.port,
            user: node.user.clone(),
            password: node.password.clone(),
            database: Some(node.database.clone()),
            application_name: Some("helios-topology".into()),
            tls_mode: self.tls_mode,
            connect_timeout: self.poll_interval.min(Duration::from_secs(5)),
            query_timeout: self.poll_interval,
            tls_config: self.tls_config.clone(),
        };

        let mut client = BackendClient::connect(&cfg).await?;
        let result = client.query_scalar("SELECT pg_is_in_recovery()").await;
        // Close on both paths; previously a failed query skipped close().
        client.close().await;
        let value = result?;
        Ok(value.as_bool("pg_is_in_recovery")?.unwrap_or(false))
    }
}

#[cfg(feature = "postgres-topology")]
impl TopologyProvider for PostgresTopologyProvider {
    fn subscribe(&self) -> broadcast::Receiver<TopologyEvent> {
        self.event_tx.subscribe()
    }

    fn get_primary(&self) -> Option<TopologyNodeInfo> {
        self.current_primary.read().clone()
    }

    /// Returns the configured node, with health taken from the most recent
    /// probe (healthy until the first probe completes).
    fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo> {
        let index = self.nodes.iter().position(|n| n.node_id == id)?;
        let node = &self.nodes[index];
        let is_healthy = self
            .node_health
            .read()
            .get(index)
            .copied()
            .unwrap_or(true);
        Some(TopologyNodeInfo {
            node_id: node.node_id,
            client_addr: format!("{}:{}", node.host, node.port),
            is_healthy,
        })
    }
}
234
#[cfg(feature = "heliosdb-topology")]
pub mod heliosdb_provider {
    use super::*;

    /// [`TopologyProvider`] that forwards every call, unchanged, to a shared
    /// [`HeliosTopologyBridge`] implementation.
    ///
    /// The bridge bound lives on the `impl` blocks rather than on the struct
    /// definition, so merely naming the type does not require restating it.
    pub struct HeliosTopologyProvider<T> {
        inner: Arc<T>,
    }

    /// Interface a HeliosDB integration implements to supply topology data.
    ///
    /// Its methods mirror [`TopologyProvider`] one-to-one.
    pub trait HeliosTopologyBridge: Send + Sync + 'static {
        fn subscribe(&self) -> broadcast::Receiver<TopologyEvent>;
        fn get_primary(&self) -> Option<TopologyNodeInfo>;
        fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo>;
    }

    impl<T: HeliosTopologyBridge> HeliosTopologyProvider<T> {
        /// Wraps `inner` as a topology provider.
        pub fn new(inner: Arc<T>) -> Self {
            Self { inner }
        }
    }

    impl<T: HeliosTopologyBridge> TopologyProvider for HeliosTopologyProvider<T> {
        fn subscribe(&self) -> broadcast::Receiver<TopologyEvent> {
            self.inner.subscribe()
        }

        fn get_primary(&self) -> Option<TopologyNodeInfo> {
            self.inner.get_primary()
        }

        fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo> {
            self.inner.get_node(id)
        }
    }
}
289
290#[derive(Debug, Clone)]
294pub struct PrimaryInfo {
295 pub node_id: Uuid,
297 pub address: String,
299 pub became_primary_at: Instant,
301 pub is_confirmed: bool,
303}
304
305#[derive(Debug, Clone)]
307pub enum PrimaryChangeEvent {
308 Changed {
310 old: Option<Uuid>,
311 new: Uuid,
312 address: String,
313 },
314 Lost { old: Uuid },
316 Confirmed { node_id: Uuid },
318}
319
320pub struct PrimaryTracker {
329 provider: Option<Arc<dyn TopologyProvider>>,
331 current_primary: RwLock<Option<PrimaryInfo>>,
333 event_tx: broadcast::Sender<PrimaryChangeEvent>,
335 tracking_interval: Duration,
337}
338
339impl PrimaryTracker {
340 pub fn new_standalone() -> Self {
342 let (event_tx, _) = broadcast::channel(16);
343 Self {
344 provider: None,
345 current_primary: RwLock::new(None),
346 event_tx,
347 tracking_interval: Duration::from_millis(500),
348 }
349 }
350
351 pub fn with_provider(provider: Arc<dyn TopologyProvider>) -> Self {
353 let (event_tx, _) = broadcast::channel(16);
354 Self {
355 provider: Some(provider),
356 current_primary: RwLock::new(None),
357 event_tx,
358 tracking_interval: Duration::from_millis(500),
359 }
360 }
361
362 pub fn with_tracking_interval(mut self, interval: Duration) -> Self {
364 self.tracking_interval = interval;
365 self
366 }
367
368 pub fn subscribe(&self) -> broadcast::Receiver<PrimaryChangeEvent> {
370 self.event_tx.subscribe()
371 }
372
373 pub fn get_primary(&self) -> Option<PrimaryInfo> {
375 self.current_primary.read().clone()
376 }
377
378 pub fn get_primary_id(&self) -> Option<Uuid> {
380 self.current_primary.read().as_ref().map(|p| p.node_id)
381 }
382
383 pub fn get_primary_address(&self) -> Option<String> {
385 self.current_primary
386 .read()
387 .as_ref()
388 .map(|p| p.address.clone())
389 }
390
391 pub fn has_primary(&self) -> bool {
393 self.current_primary.read().is_some()
394 }
395
396 pub fn set_primary(&self, node_id: Uuid, address: String) {
398 let old_primary = self.current_primary.read().as_ref().map(|p| p.node_id);
399
400 let new_info = PrimaryInfo {
401 node_id,
402 address: address.clone(),
403 became_primary_at: Instant::now(),
404 is_confirmed: false,
405 };
406
407 *self.current_primary.write() = Some(new_info);
408
409 let _ = self.event_tx.send(PrimaryChangeEvent::Changed {
410 old: old_primary,
411 new: node_id,
412 address,
413 });
414
415 tracing::info!(
416 "Primary tracker: set primary to {} (pending confirmation)",
417 node_id
418 );
419 }
420
421 pub fn confirm_primary(&self) {
423 let mut guard = self.current_primary.write();
424 if let Some(ref mut info) = *guard {
425 info.is_confirmed = true;
426 let node_id = info.node_id;
427 drop(guard);
428
429 let _ = self
430 .event_tx
431 .send(PrimaryChangeEvent::Confirmed { node_id });
432 tracing::info!("Primary tracker: confirmed primary {}", node_id);
433 }
434 }
435
436 pub fn clear_primary(&self) {
438 let old_primary = self.current_primary.write().take();
439
440 if let Some(info) = old_primary {
441 let _ = self
442 .event_tx
443 .send(PrimaryChangeEvent::Lost { old: info.node_id });
444 tracing::warn!("Primary tracker: lost primary {}", info.node_id);
445 }
446 }
447
448 pub async fn run(&self) {
453 let provider = match &self.provider {
454 Some(p) => Arc::clone(p),
455 None => {
456 tracing::info!("Primary tracker: no topology provider, running in standalone mode");
457 return;
458 }
459 };
460
461 let mut topology_rx = provider.subscribe();
462 let mut interval = tokio::time::interval(self.tracking_interval);
463
464 self.detect_primary_from_provider(&*provider);
466
467 loop {
468 tokio::select! {
469 event = topology_rx.recv() => {
470 match event {
471 Ok(TopologyEvent::PrimaryChanged { old_primary, new_primary }) => {
472 self.handle_primary_changed(&*provider, old_primary, new_primary);
473 }
474 Ok(TopologyEvent::NodeLeft { node_id }) => {
475 self.handle_node_left(node_id);
476 }
477 Ok(TopologyEvent::HealthChanged { node_id, is_healthy }) => {
478 self.handle_health_changed(node_id, is_healthy);
479 }
480 Err(broadcast::error::RecvError::Lagged(n)) => {
481 tracing::warn!("Primary tracker lagged {} events", n);
482 }
483 Err(broadcast::error::RecvError::Closed) => {
484 break;
485 }
486 }
487 }
488 _ = interval.tick() => {
489 self.periodic_check(&*provider);
490 }
491 }
492 }
493 }
494
495 fn detect_primary_from_provider(&self, provider: &dyn TopologyProvider) {
498 if let Some(primary) = provider.get_primary() {
499 let info = PrimaryInfo {
500 node_id: primary.node_id,
501 address: primary.client_addr.clone(),
502 became_primary_at: Instant::now(),
503 is_confirmed: true,
504 };
505
506 *self.current_primary.write() = Some(info);
507 tracing::info!("Primary tracker: detected primary {}", primary.node_id);
508 }
509 }
510
511 fn handle_primary_changed(
512 &self,
513 provider: &dyn TopologyProvider,
514 old: Option<Uuid>,
515 new: Uuid,
516 ) {
517 let address = provider
518 .get_node(new)
519 .map(|n| n.client_addr)
520 .unwrap_or_else(|| format!("{}:5432", new));
521
522 let info = PrimaryInfo {
523 node_id: new,
524 address: address.clone(),
525 became_primary_at: Instant::now(),
526 is_confirmed: true,
527 };
528
529 *self.current_primary.write() = Some(info);
530
531 let _ = self
532 .event_tx
533 .send(PrimaryChangeEvent::Changed { old, new, address });
534
535 tracing::info!("Primary tracker: primary changed from {:?} to {}", old, new);
536 }
537
538 fn handle_node_left(&self, node_id: Uuid) {
539 let current = self.current_primary.read().as_ref().map(|p| p.node_id);
540 if current == Some(node_id) {
541 self.clear_primary();
542 }
543 }
544
545 fn handle_health_changed(&self, node_id: Uuid, is_healthy: bool) {
546 if !is_healthy {
547 let current = self.current_primary.read().as_ref().map(|p| p.node_id);
548 if current == Some(node_id) {
549 tracing::warn!("Primary {} became unhealthy", node_id);
550 }
551 }
552 }
553
554 fn periodic_check(&self, provider: &dyn TopologyProvider) {
555 let current_id = self.current_primary.read().as_ref().map(|p| p.node_id);
556
557 if let Some(id) = current_id {
558 if let Some(node) = provider.get_node(id) {
559 if !node.is_healthy {
560 tracing::warn!("Primary {} is unhealthy in periodic check", id);
561 }
562 } else {
563 self.clear_primary();
564 }
565 } else {
566 self.detect_primary_from_provider(provider);
567 }
568 }
569}
570
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a healthy node snapshot for the in-test providers.
    fn healthy_node(node_id: Uuid, addr: &str) -> TopologyNodeInfo {
        TopologyNodeInfo {
            node_id,
            client_addr: addr.to_string(),
            is_healthy: true,
        }
    }

    #[test]
    fn test_standalone_primary_tracker() {
        let tracker = PrimaryTracker::new_standalone();
        assert!(!tracker.has_primary());

        let id = Uuid::new_v4();
        tracker.set_primary(id, "localhost:5432".to_string());

        assert!(tracker.has_primary());
        assert_eq!(tracker.get_primary_id(), Some(id));
        assert_eq!(
            tracker.get_primary_address().as_deref(),
            Some("localhost:5432")
        );

        // A manually set primary starts unconfirmed...
        assert!(!tracker.get_primary().expect("primary was just set").is_confirmed);

        // ...until it is explicitly confirmed.
        tracker.confirm_primary();
        assert!(tracker.get_primary().expect("primary still set").is_confirmed);

        tracker.clear_primary();
        assert!(!tracker.has_primary());
    }

    /// In-memory provider whose `get_node` only knows the current primary.
    struct MockTopology {
        event_tx: broadcast::Sender<TopologyEvent>,
        primary: RwLock<Option<TopologyNodeInfo>>,
    }

    impl MockTopology {
        fn new() -> Self {
            let (event_tx, _) = broadcast::channel(16);
            Self {
                event_tx,
                primary: RwLock::new(None),
            }
        }

        fn set_primary(&self, node_id: Uuid, addr: &str) {
            *self.primary.write() = Some(healthy_node(node_id, addr));
        }
    }

    impl TopologyProvider for MockTopology {
        fn subscribe(&self) -> broadcast::Receiver<TopologyEvent> {
            self.event_tx.subscribe()
        }

        fn get_primary(&self) -> Option<TopologyNodeInfo> {
            self.primary.read().clone()
        }

        fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo> {
            self.get_primary().filter(|node| node.node_id == id)
        }
    }

    #[test]
    fn test_provider_backed_tracker() {
        let node_id = Uuid::new_v4();
        let topo = Arc::new(MockTopology::new());
        topo.set_primary(node_id, "primary:5432");

        let tracker = PrimaryTracker::with_provider(topo.clone());
        tracker.detect_primary_from_provider(topo.as_ref());

        assert!(tracker.has_primary());
        assert_eq!(tracker.get_primary_id(), Some(node_id));
    }

    #[test]
    fn test_postgresql_failover_scenario() {
        // Three-node cluster: primary, synchronous standby, asynchronous standby.
        let pg_primary = Uuid::new_v4();
        let pg_sync = Uuid::new_v4();
        let _pg_async = Uuid::new_v4();

        let topo = Arc::new(MockTopology::new());
        topo.set_primary(pg_primary, "pg-primary:5432");

        let tracker = PrimaryTracker::with_provider(topo.clone());
        tracker.detect_primary_from_provider(topo.as_ref());

        assert!(tracker.has_primary());
        assert_eq!(
            tracker.get_primary_address().as_deref(),
            Some("pg-primary:5432")
        );

        // Subscribe only now, so that only the failover events are observed.
        let mut rx = tracker.subscribe();

        // The primary fails.
        tracker.clear_primary();
        assert!(!tracker.has_primary());

        let lost = rx.try_recv().expect("Lost event should be queued");
        assert!(matches!(lost, PrimaryChangeEvent::Lost { old } if old == pg_primary));

        // The synchronous standby is promoted.
        tracker.set_primary(pg_sync, "pg-sync:5432".to_string());
        assert!(tracker.has_primary());
        assert_eq!(
            tracker.get_primary_address().as_deref(),
            Some("pg-sync:5432")
        );
        assert!(!tracker.get_primary().expect("promoted primary").is_confirmed);

        tracker.confirm_primary();
        assert!(tracker.get_primary().expect("promoted primary").is_confirmed);

        let changed = rx.try_recv().expect("Changed event should be queued");
        assert!(matches!(changed, PrimaryChangeEvent::Changed { new, .. } if new == pg_sync));
    }

    #[test]
    fn test_custom_topology_provider() {
        /// Patroni-style provider that only knows the current leader.
        struct PatroniProvider {
            leader: RwLock<Option<TopologyNodeInfo>>,
            event_tx: broadcast::Sender<TopologyEvent>,
        }

        impl PatroniProvider {
            fn new() -> Self {
                let (event_tx, _) = broadcast::channel(16);
                Self {
                    leader: RwLock::new(None),
                    event_tx,
                }
            }

            fn set_leader(&self, id: Uuid, addr: &str) {
                *self.leader.write() = Some(healthy_node(id, addr));
            }
        }

        impl TopologyProvider for PatroniProvider {
            fn subscribe(&self) -> broadcast::Receiver<TopologyEvent> {
                self.event_tx.subscribe()
            }

            fn get_primary(&self) -> Option<TopologyNodeInfo> {
                self.leader.read().clone()
            }

            fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo> {
                self.get_primary().filter(|node| node.node_id == id)
            }
        }

        let leader_id = Uuid::new_v4();
        let patroni = Arc::new(PatroniProvider::new());
        patroni.set_leader(leader_id, "patroni-leader.svc:5432");

        let tracker = PrimaryTracker::with_provider(patroni.clone());
        tracker.detect_primary_from_provider(patroni.as_ref());

        assert!(tracker.has_primary());
        assert_eq!(
            tracker.get_primary_address().as_deref(),
            Some("patroni-leader.svc:5432")
        );
    }

    #[cfg(feature = "postgres-topology")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_poll_nodes_all_unreachable_sets_no_primary() {
        // Nothing should be listening on loopback ports 1 and 2, so every probe fails.
        let unreachable = |port: u16| PostgresNode {
            node_id: Uuid::new_v4(),
            host: "127.0.0.1".into(),
            port,
            user: "postgres".into(),
            password: None,
            database: "postgres".into(),
        };

        let provider = PostgresTopologyProvider::new(vec![unreachable(1), unreachable(2)])
            .with_poll_interval(Duration::from_millis(200));
        let mut rx = provider.event_tx.subscribe();

        provider.poll_nodes().await;

        assert!(provider.get_primary().is_none());

        // Drain up to 10 queued events, stopping at the first receive error.
        let unhealthy_events = std::iter::from_fn(|| rx.try_recv().ok())
            .take(10)
            .filter(|event| {
                matches!(
                    event,
                    TopologyEvent::HealthChanged {
                        is_healthy: false,
                        ..
                    }
                )
            })
            .count();

        assert!(
            unhealthy_events >= 1,
            "expected at least one HealthChanged event"
        );
    }
}