Skip to main content

heliosdb_proxy/
primary_tracker.rs

1//! Primary Tracker - Tracks current primary node for query routing
2//!
3//! Monitors cluster topology and maintains the current primary node
4//! information. During switchover, updates are received from the
5//! switchover coordinator to ensure queries are routed correctly.
6//!
7//! # Topology Providers
8//!
9//! The primary tracker uses a `TopologyProvider` trait to abstract over
10//! different topology sources:
11//!
12//! - **HeliosDB**: Uses the internal `TopologyManager` from the replication
13//!   subsystem (feature-gated behind `heliosdb-topology`).
//! - **PostgreSQL**: Polls `pg_is_in_recovery()` on each configured node
//!   to detect primary changes (feature-gated behind `postgres-topology`).
16//! - **Manual/Standalone**: Programmatic set/clear via API calls.
17
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20use parking_lot::RwLock;
21use tokio::sync::broadcast;
22use uuid::Uuid;
23
24// ── Topology provider trait ─────────────────────────────────────────
25
26/// Information about a node in the cluster topology.
27#[derive(Debug, Clone)]
28pub struct TopologyNodeInfo {
29    /// Node UUID
30    pub node_id: Uuid,
31    /// Client-facing address (host:port)
32    pub client_addr: String,
33    /// Whether the node is currently healthy
34    pub is_healthy: bool,
35}
36
37/// Events emitted by a topology provider.
38#[derive(Debug, Clone)]
39pub enum TopologyEvent {
40    /// The primary node changed.
41    PrimaryChanged {
42        old_primary: Option<Uuid>,
43        new_primary: Uuid,
44    },
45    /// A node left the cluster.
46    NodeLeft { node_id: Uuid },
47    /// A node's health status changed.
48    HealthChanged { node_id: Uuid, is_healthy: bool },
49}
50
51/// Trait abstracting topology discovery.
52///
53/// Implement this for any database backend (HeliosDB, PostgreSQL, etc.)
54/// to enable automatic primary tracking.
55pub trait TopologyProvider: Send + Sync + 'static {
56    /// Subscribe to topology change events.
57    fn subscribe(&self) -> broadcast::Receiver<TopologyEvent>;
58
59    /// Get the current primary node, if one exists.
60    fn get_primary(&self) -> Option<TopologyNodeInfo>;
61
62    /// Look up a node by its UUID.
63    fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo>;
64}
65
66// ── PostgreSQL topology provider ────────────────────────────────────
67
/// PostgreSQL-based topology provider.
///
/// Discovers the primary by polling `pg_is_in_recovery()` on each
/// configured node. Detects primary changes by comparing results
/// across polling intervals. Probe outcomes are also recorded per node:
/// `HealthChanged` is broadcast only when a node's reachability flips,
/// and `get_node` reports the last observed reachability.
#[cfg(feature = "postgres-topology")]
pub struct PostgresTopologyProvider {
    /// Nodes to poll
    nodes: Vec<PostgresNode>,
    /// Event broadcaster
    event_tx: broadcast::Sender<TopologyEvent>,
    /// Current primary (cached)
    current_primary: RwLock<Option<TopologyNodeInfo>>,
    /// Last observed reachability per node. Nodes that have not been
    /// probed yet are absent and are reported as healthy.
    node_health: RwLock<std::collections::HashMap<Uuid, bool>>,
    /// Polling interval
    poll_interval: Duration,
    /// Shared rustls client config for TLS negotiation. Built once at
    /// construction time from the Mozilla root set.
    tls_config: std::sync::Arc<rustls::ClientConfig>,
    /// TLS policy applied to every probe connection.
    tls_mode: crate::backend::TlsMode,
}

/// Connection parameters for one PostgreSQL node polled by
/// [`PostgresTopologyProvider`].
#[cfg(feature = "postgres-topology")]
#[derive(Clone)]
pub struct PostgresNode {
    pub node_id: Uuid,
    pub host: String,
    pub port: u16,
    pub user: String,
    pub password: Option<String>,
    pub database: String,
}

#[cfg(feature = "postgres-topology")]
impl PostgresNode {
    /// Client-facing `host:port` address of this node.
    fn client_addr(&self) -> String {
        format!("{}:{}", self.host, self.port)
    }
}

/// Hand-written so the password never ends up in logs or panic messages
/// via `{:?}`; every other field is shown as a derive would show it.
#[cfg(feature = "postgres-topology")]
impl std::fmt::Debug for PostgresNode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PostgresNode")
            .field("node_id", &self.node_id)
            .field("host", &self.host)
            .field("port", &self.port)
            .field("user", &self.user)
            .field("password", &self.password.as_ref().map(|_| "<redacted>"))
            .field("database", &self.database)
            .finish()
    }
}

#[cfg(feature = "postgres-topology")]
impl PostgresTopologyProvider {
    /// Create a new PostgreSQL topology provider.
    ///
    /// Defaults: 2 s poll interval, `TlsMode::Prefer`, 16-slot event channel.
    pub fn new(nodes: Vec<PostgresNode>) -> Self {
        let (event_tx, _) = broadcast::channel(16);
        Self {
            nodes,
            event_tx,
            current_primary: RwLock::new(None),
            node_health: RwLock::new(std::collections::HashMap::new()),
            poll_interval: Duration::from_secs(2),
            tls_config: crate::backend::tls::default_client_config(),
            tls_mode: crate::backend::TlsMode::Prefer,
        }
    }

    /// Set polling interval.
    ///
    /// Also bounds each probe's connect timeout (capped at 5 s) and query timeout.
    pub fn with_poll_interval(mut self, interval: Duration) -> Self {
        self.poll_interval = interval;
        self
    }

    /// Set the TLS policy used when opening probe connections.
    pub fn with_tls_mode(mut self, mode: crate::backend::TlsMode) -> Self {
        self.tls_mode = mode;
        self
    }

    /// Start polling in the background. Never returns.
    pub async fn start(&self) {
        let mut interval = tokio::time::interval(self.poll_interval);
        // Nodes are probed one after another, so a round with unreachable
        // nodes can outlast the interval. Delay the next tick instead of
        // bursting to catch up, which would fire back-to-back rounds.
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        loop {
            interval.tick().await;
            self.poll_nodes().await;
        }
    }

    /// Poll all nodes, record their reachability and detect the primary.
    async fn poll_nodes(&self) {
        let mut next_primary: Option<TopologyNodeInfo> = None;

        for node in &self.nodes {
            let reachable = match self.probe_recovery(node).await {
                Ok(in_recovery) => {
                    // The node reporting `pg_is_in_recovery() = false` is
                    // the primary. In a healthy cluster there is exactly
                    // one; we take the first we encounter so split-brain
                    // (briefly possible during failover) still yields a
                    // deterministic choice.
                    if !in_recovery && next_primary.is_none() {
                        next_primary = Some(TopologyNodeInfo {
                            node_id: node.node_id,
                            client_addr: node.client_addr(),
                            is_healthy: true,
                        });
                    }
                    true
                }
                Err(e) => {
                    tracing::warn!(
                        node = %node.host,
                        port = node.port,
                        error = %e,
                        "topology probe failed"
                    );
                    false
                }
            };
            self.record_health(node.node_id, reachable);
        }

        let old_primary_id = self.current_primary.read().as_ref().map(|p| p.node_id);
        let new_primary_id = next_primary.as_ref().map(|p| p.node_id);
        if old_primary_id == new_primary_id {
            return;
        }

        *self.current_primary.write() = next_primary;
        match new_primary_id {
            Some(new_id) => {
                let _ = self.event_tx.send(TopologyEvent::PrimaryChanged {
                    old_primary: old_primary_id,
                    new_primary: new_id,
                });
            }
            None => {
                // TopologyEvent has no "primary lost" variant; make the
                // transition visible in logs at least.
                tracing::warn!(previous = ?old_primary_id, "topology poll found no primary");
            }
        }
    }

    /// Store the probe outcome for `node_id` and broadcast `HealthChanged`
    /// only when it differs from the previous observation (unprobed nodes
    /// count as healthy, matching `get_node`).
    fn record_health(&self, node_id: Uuid, is_healthy: bool) {
        let previous = self.node_health.write().insert(node_id, is_healthy);
        if previous.unwrap_or(true) != is_healthy {
            let _ = self
                .event_tx
                .send(TopologyEvent::HealthChanged { node_id, is_healthy });
        }
    }

    /// Connect to a single node and run `SELECT pg_is_in_recovery()`.
    ///
    /// Returns `Ok(true)` if the node is a standby, `Ok(false)` for a
    /// primary. A NULL/absent result is treated as a standby: routing no
    /// writes to a node is safer than mistaking a standby for the primary.
    /// Errors propagate as `BackendError`.
    async fn probe_recovery(
        &self,
        node: &PostgresNode,
    ) -> crate::backend::BackendResult<bool> {
        use crate::backend::{BackendClient, BackendConfig};

        let cfg = BackendConfig {
            host: node.host.clone(),
            port: node.port,
            user: node.user.clone(),
            password: node.password.clone(),
            database: Some(node.database.clone()),
            application_name: Some("helios-topology".into()),
            tls_mode: self.tls_mode,
            connect_timeout: self.poll_interval.min(Duration::from_secs(5)),
            query_timeout: self.poll_interval,
            tls_config: self.tls_config.clone(),
        };

        let mut client = BackendClient::connect(&cfg).await?;
        let queried = client.query_scalar("SELECT pg_is_in_recovery()").await;
        // Close explicitly before inspecting the query result so the probe
        // session is shut down on the failure path too.
        client.close().await;
        let value = queried?;
        Ok(value.as_bool("pg_is_in_recovery")?.unwrap_or(true))
    }
}

#[cfg(feature = "postgres-topology")]
impl TopologyProvider for PostgresTopologyProvider {
    fn subscribe(&self) -> broadcast::Receiver<TopologyEvent> {
        self.event_tx.subscribe()
    }

    fn get_primary(&self) -> Option<TopologyNodeInfo> {
        self.current_primary.read().clone()
    }

    /// Look up a configured node; `is_healthy` reflects the most recent
    /// probe (healthy until the first failed probe).
    fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo> {
        let node = self.nodes.iter().find(|n| n.node_id == id)?;
        let is_healthy = self.node_health.read().get(&id).copied().unwrap_or(true);
        Some(TopologyNodeInfo {
            node_id: node.node_id,
            client_addr: node.client_addr(),
            is_healthy,
        })
    }
}
238
239// ── HeliosDB topology provider (bridges to internal TopologyManager) ─
240
#[cfg(feature = "heliosdb-topology")]
pub mod heliosdb_provider {
    //! Adapter from HeliosDB-Lite's internal `TopologyManager` to the
    //! proxy's generic `TopologyProvider` trait.
    //!
    //! Only compiled with the `heliosdb-topology` feature, i.e. when the
    //! proxy is built inside the HeliosDB-Lite workspace. Going through a
    //! small bridge trait keeps `PrimaryTracker` free of any hard
    //! dependency on the replication types.

    use super::*;

    /// Contract the HeliosDB replication crate implements to feed the
    /// proxy's topology system.
    ///
    /// Exists so the proxy never imports `crate::replication::topology`
    /// directly, which lets the standalone proxy build without the
    /// replication crate.
    pub trait HeliosTopologyBridge: Send + Sync + 'static {
        /// Subscribe to topology change events.
        fn subscribe(&self) -> broadcast::Receiver<TopologyEvent>;
        /// Current primary node, if any.
        fn get_primary(&self) -> Option<TopologyNodeInfo>;
        /// Look up a node by UUID.
        fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo>;
    }

    /// Adapts a shared [`HeliosTopologyBridge`] to [`TopologyProvider`].
    ///
    /// Consumers pass this struct (behind an `Arc`) to
    /// `PrimaryTracker::with_provider()`.
    pub struct HeliosTopologyProvider<T: HeliosTopologyBridge> {
        bridge: Arc<T>,
    }

    impl<T: HeliosTopologyBridge> HeliosTopologyProvider<T> {
        /// Wrap a shared bridge handle.
        pub fn new(bridge: Arc<T>) -> Self {
            Self { bridge }
        }
    }

    /// Pure delegation: every call is forwarded to the bridge unchanged.
    impl<T: HeliosTopologyBridge> TopologyProvider for HeliosTopologyProvider<T> {
        fn subscribe(&self) -> broadcast::Receiver<TopologyEvent> {
            HeliosTopologyBridge::subscribe(&*self.bridge)
        }

        fn get_primary(&self) -> Option<TopologyNodeInfo> {
            HeliosTopologyBridge::get_primary(&*self.bridge)
        }

        fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo> {
            HeliosTopologyBridge::get_node(&*self.bridge, id)
        }
    }
}
293
294// ── Primary info & events ───────────────────────────────────────────
295
296/// Primary node information
297#[derive(Debug, Clone)]
298pub struct PrimaryInfo {
299    /// Node ID
300    pub node_id: Uuid,
301    /// Client address (host:port)
302    pub address: String,
303    /// Time when this node became primary
304    pub became_primary_at: Instant,
305    /// Whether this is confirmed (vs pending switchover)
306    pub is_confirmed: bool,
307}
308
309/// Primary change event
310#[derive(Debug, Clone)]
311pub enum PrimaryChangeEvent {
312    /// Primary changed to new node
313    Changed {
314        old: Option<Uuid>,
315        new: Uuid,
316        address: String,
317    },
318    /// Primary lost (no healthy primary)
319    Lost { old: Uuid },
320    /// Primary confirmed (after switchover completes)
321    Confirmed { node_id: Uuid },
322}
323
324// ── Primary Tracker ─────────────────────────────────────────────────
325
326/// Primary Tracker
327///
328/// Can be used in three modes:
329/// 1. **With a TopologyProvider** – automatic tracking via `with_provider()`.
330/// 2. **Standalone** – manual `set_primary()` / `clear_primary()` calls.
331/// 3. **PostgreSQL** – pass a `PostgresTopologyProvider` (feature `postgres-topology`).
332pub struct PrimaryTracker {
333    /// Optional topology provider (Box<dyn> for either HeliosDB or PostgreSQL)
334    provider: Option<Arc<dyn TopologyProvider>>,
335    /// Current primary info
336    current_primary: RwLock<Option<PrimaryInfo>>,
337    /// Event broadcaster
338    event_tx: broadcast::Sender<PrimaryChangeEvent>,
339    /// Tracking interval
340    tracking_interval: Duration,
341}
342
343impl PrimaryTracker {
344    /// Create a standalone primary tracker (manual set/clear).
345    pub fn new_standalone() -> Self {
346        let (event_tx, _) = broadcast::channel(16);
347        Self {
348            provider: None,
349            current_primary: RwLock::new(None),
350            event_tx,
351            tracking_interval: Duration::from_millis(500),
352        }
353    }
354
355    /// Create a primary tracker backed by a topology provider.
356    pub fn with_provider(provider: Arc<dyn TopologyProvider>) -> Self {
357        let (event_tx, _) = broadcast::channel(16);
358        Self {
359            provider: Some(provider),
360            current_primary: RwLock::new(None),
361            event_tx,
362            tracking_interval: Duration::from_millis(500),
363        }
364    }
365
366    /// Set tracking interval.
367    pub fn with_tracking_interval(mut self, interval: Duration) -> Self {
368        self.tracking_interval = interval;
369        self
370    }
371
372    /// Subscribe to primary change events.
373    pub fn subscribe(&self) -> broadcast::Receiver<PrimaryChangeEvent> {
374        self.event_tx.subscribe()
375    }
376
377    /// Get current primary info.
378    pub fn get_primary(&self) -> Option<PrimaryInfo> {
379        self.current_primary.read().clone()
380    }
381
382    /// Get current primary node ID.
383    pub fn get_primary_id(&self) -> Option<Uuid> {
384        self.current_primary.read().as_ref().map(|p| p.node_id)
385    }
386
387    /// Get current primary address.
388    pub fn get_primary_address(&self) -> Option<String> {
389        self.current_primary.read().as_ref().map(|p| p.address.clone())
390    }
391
392    /// Check if we have a healthy primary.
393    pub fn has_primary(&self) -> bool {
394        self.current_primary.read().is_some()
395    }
396
397    /// Set primary manually (or called during switchover).
398    pub fn set_primary(&self, node_id: Uuid, address: String) {
399        let old_primary = self.current_primary.read().as_ref().map(|p| p.node_id);
400
401        let new_info = PrimaryInfo {
402            node_id,
403            address: address.clone(),
404            became_primary_at: Instant::now(),
405            is_confirmed: false,
406        };
407
408        *self.current_primary.write() = Some(new_info);
409
410        let _ = self.event_tx.send(PrimaryChangeEvent::Changed {
411            old: old_primary,
412            new: node_id,
413            address,
414        });
415
416        tracing::info!("Primary tracker: set primary to {} (pending confirmation)", node_id);
417    }
418
419    /// Confirm the current primary (called after switchover completes).
420    pub fn confirm_primary(&self) {
421        let mut guard = self.current_primary.write();
422        if let Some(ref mut info) = *guard {
423            info.is_confirmed = true;
424            let node_id = info.node_id;
425            drop(guard);
426
427            let _ = self.event_tx.send(PrimaryChangeEvent::Confirmed { node_id });
428            tracing::info!("Primary tracker: confirmed primary {}", node_id);
429        }
430    }
431
432    /// Clear primary (called when primary is lost).
433    pub fn clear_primary(&self) {
434        let old_primary = self.current_primary.write().take();
435
436        if let Some(info) = old_primary {
437            let _ = self.event_tx.send(PrimaryChangeEvent::Lost { old: info.node_id });
438            tracing::warn!("Primary tracker: lost primary {}", info.node_id);
439        }
440    }
441
442    /// Run the primary tracker loop (requires a topology provider).
443    ///
444    /// If no provider is set, this returns immediately — use manual
445    /// `set_primary()` / `clear_primary()` instead.
446    pub async fn run(&self) {
447        let provider = match &self.provider {
448            Some(p) => Arc::clone(p),
449            None => {
450                tracing::info!("Primary tracker: no topology provider, running in standalone mode");
451                return;
452            }
453        };
454
455        let mut topology_rx = provider.subscribe();
456        let mut interval = tokio::time::interval(self.tracking_interval);
457
458        // Initial detection
459        self.detect_primary_from_provider(&*provider);
460
461        loop {
462            tokio::select! {
463                event = topology_rx.recv() => {
464                    match event {
465                        Ok(TopologyEvent::PrimaryChanged { old_primary, new_primary }) => {
466                            self.handle_primary_changed(&*provider, old_primary, new_primary);
467                        }
468                        Ok(TopologyEvent::NodeLeft { node_id }) => {
469                            self.handle_node_left(node_id);
470                        }
471                        Ok(TopologyEvent::HealthChanged { node_id, is_healthy }) => {
472                            self.handle_health_changed(node_id, is_healthy);
473                        }
474                        Err(broadcast::error::RecvError::Lagged(n)) => {
475                            tracing::warn!("Primary tracker lagged {} events", n);
476                        }
477                        Err(broadcast::error::RecvError::Closed) => {
478                            break;
479                        }
480                    }
481                }
482                _ = interval.tick() => {
483                    self.periodic_check(&*provider);
484                }
485            }
486        }
487    }
488
489    // ── Internal helpers ────────────────────────────────────────────
490
491    fn detect_primary_from_provider(&self, provider: &dyn TopologyProvider) {
492        if let Some(primary) = provider.get_primary() {
493            let info = PrimaryInfo {
494                node_id: primary.node_id,
495                address: primary.client_addr.clone(),
496                became_primary_at: Instant::now(),
497                is_confirmed: true,
498            };
499
500            *self.current_primary.write() = Some(info);
501            tracing::info!("Primary tracker: detected primary {}", primary.node_id);
502        }
503    }
504
505    fn handle_primary_changed(
506        &self,
507        provider: &dyn TopologyProvider,
508        old: Option<Uuid>,
509        new: Uuid,
510    ) {
511        let address = provider
512            .get_node(new)
513            .map(|n| n.client_addr)
514            .unwrap_or_else(|| format!("{}:5432", new));
515
516        let info = PrimaryInfo {
517            node_id: new,
518            address: address.clone(),
519            became_primary_at: Instant::now(),
520            is_confirmed: true,
521        };
522
523        *self.current_primary.write() = Some(info);
524
525        let _ = self.event_tx.send(PrimaryChangeEvent::Changed {
526            old,
527            new,
528            address,
529        });
530
531        tracing::info!("Primary tracker: primary changed from {:?} to {}", old, new);
532    }
533
534    fn handle_node_left(&self, node_id: Uuid) {
535        let current = self.current_primary.read().as_ref().map(|p| p.node_id);
536        if current == Some(node_id) {
537            self.clear_primary();
538        }
539    }
540
541    fn handle_health_changed(&self, node_id: Uuid, is_healthy: bool) {
542        if !is_healthy {
543            let current = self.current_primary.read().as_ref().map(|p| p.node_id);
544            if current == Some(node_id) {
545                tracing::warn!("Primary {} became unhealthy", node_id);
546            }
547        }
548    }
549
550    fn periodic_check(&self, provider: &dyn TopologyProvider) {
551        let current_id = self.current_primary.read().as_ref().map(|p| p.node_id);
552
553        if let Some(id) = current_id {
554            if let Some(node) = provider.get_node(id) {
555                if !node.is_healthy {
556                    tracing::warn!("Primary {} is unhealthy in periodic check", id);
557                }
558            } else {
559                self.clear_primary();
560            }
561        } else {
562            self.detect_primary_from_provider(provider);
563        }
564    }
565}
566
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_standalone_primary_tracker() {
        let tracker = PrimaryTracker::new_standalone();

        assert!(!tracker.has_primary());

        let node_id = Uuid::new_v4();
        tracker.set_primary(node_id, "localhost:5432".to_string());

        assert!(tracker.has_primary());
        assert_eq!(tracker.get_primary_id(), Some(node_id));
        assert_eq!(tracker.get_primary_address(), Some("localhost:5432".to_string()));

        // Not confirmed yet
        let info = tracker.get_primary().unwrap();
        assert!(!info.is_confirmed);

        // Confirm
        tracker.confirm_primary();
        let info = tracker.get_primary().unwrap();
        assert!(info.is_confirmed);

        // Clear
        tracker.clear_primary();
        assert!(!tracker.has_primary());
    }

    /// Confirming or clearing with no primary must be silent no-ops.
    #[test]
    fn test_noop_operations_without_primary_emit_nothing() {
        let tracker = PrimaryTracker::new_standalone();
        let mut rx = tracker.subscribe();

        tracker.confirm_primary();
        tracker.clear_primary();

        assert!(!tracker.has_primary());
        assert!(rx.try_recv().is_err());
    }

    /// Each `set_primary` reports the primary it replaced as `old`.
    #[test]
    fn test_set_primary_reports_replaced_primary() {
        let tracker = PrimaryTracker::new_standalone();
        let mut rx = tracker.subscribe();
        let first = Uuid::new_v4();
        let second = Uuid::new_v4();

        tracker.set_primary(first, "first:5432".to_string());
        tracker.set_primary(second, "second:5432".to_string());

        let event = rx.try_recv().unwrap();
        assert!(matches!(event, PrimaryChangeEvent::Changed { old: None, new, .. } if new == first));
        let event = rx.try_recv().unwrap();
        assert!(matches!(
            event,
            PrimaryChangeEvent::Changed { old: Some(old), new, .. } if old == first && new == second
        ));
        assert!(rx.try_recv().is_err());
        assert_eq!(tracker.get_primary_address(), Some("second:5432".to_string()));
    }

    /// Minimal mock topology provider for testing.
    struct MockTopology {
        event_tx: broadcast::Sender<TopologyEvent>,
        primary: RwLock<Option<TopologyNodeInfo>>,
    }

    impl MockTopology {
        fn new() -> Self {
            let (event_tx, _) = broadcast::channel(16);
            Self {
                event_tx,
                primary: RwLock::new(None),
            }
        }

        fn set_primary(&self, node_id: Uuid, addr: &str) {
            *self.primary.write() = Some(TopologyNodeInfo {
                node_id,
                client_addr: addr.to_string(),
                is_healthy: true,
            });
        }
    }

    impl TopologyProvider for MockTopology {
        fn subscribe(&self) -> broadcast::Receiver<TopologyEvent> {
            self.event_tx.subscribe()
        }

        fn get_primary(&self) -> Option<TopologyNodeInfo> {
            self.primary.read().clone()
        }

        fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo> {
            let p = self.primary.read();
            p.as_ref().filter(|n| n.node_id == id).cloned()
        }
    }

    #[test]
    fn test_provider_backed_tracker() {
        let topo = Arc::new(MockTopology::new());
        let node_id = Uuid::new_v4();
        topo.set_primary(node_id, "primary:5432");

        let tracker = PrimaryTracker::with_provider(topo.clone());
        tracker.detect_primary_from_provider(topo.as_ref());

        assert!(tracker.has_primary());
        assert_eq!(tracker.get_primary_id(), Some(node_id));
    }

    /// `NodeLeft` for a non-primary node must not touch the primary.
    #[test]
    fn test_node_left_only_clears_matching_primary() {
        let topo = Arc::new(MockTopology::new());
        let primary = Uuid::new_v4();
        topo.set_primary(primary, "primary:5432");

        let tracker = PrimaryTracker::with_provider(topo.clone());
        tracker.detect_primary_from_provider(topo.as_ref());

        tracker.handle_node_left(Uuid::new_v4());
        assert_eq!(tracker.get_primary_id(), Some(primary));

        tracker.handle_node_left(primary);
        assert!(!tracker.has_primary());
    }

    /// A primary the provider no longer knows about is dropped by the
    /// periodic check, and subscribers see `Lost`.
    #[test]
    fn test_periodic_check_clears_primary_unknown_to_provider() {
        let topo = Arc::new(MockTopology::new()); // knows no nodes
        let tracker = PrimaryTracker::with_provider(topo.clone());
        tracker.set_primary(Uuid::new_v4(), "orphan:5432".to_string());
        let mut rx = tracker.subscribe();

        tracker.periodic_check(topo.as_ref());

        assert!(!tracker.has_primary());
        assert!(matches!(rx.try_recv().unwrap(), PrimaryChangeEvent::Lost { .. }));
    }

    /// Simulate a PostgreSQL 3-node cluster (primary + sync + async standby)
    /// where the primary fails and a standby is promoted.
    #[test]
    fn test_postgresql_failover_scenario() {
        let topo = Arc::new(MockTopology::new());

        // Initial state: pg-primary is the primary
        let pg_primary = Uuid::new_v4();
        let pg_sync = Uuid::new_v4();
        let _pg_async = Uuid::new_v4();

        topo.set_primary(pg_primary, "pg-primary:5432");

        let tracker = PrimaryTracker::with_provider(topo.clone());
        tracker.detect_primary_from_provider(topo.as_ref());

        assert!(tracker.has_primary());
        assert_eq!(tracker.get_primary_address(), Some("pg-primary:5432".to_string()));

        // Subscribe to events
        let mut rx = tracker.subscribe();

        // Simulate failover: primary goes down, sync standby promoted
        tracker.clear_primary();
        assert!(!tracker.has_primary());

        // Check Lost event was emitted
        let event = rx.try_recv().unwrap();
        assert!(matches!(event, PrimaryChangeEvent::Lost { old } if old == pg_primary));

        // New primary detected (sync standby promoted)
        tracker.set_primary(pg_sync, "pg-sync:5432".to_string());
        assert!(tracker.has_primary());
        assert_eq!(tracker.get_primary_address(), Some("pg-sync:5432".to_string()));
        assert!(!tracker.get_primary().unwrap().is_confirmed);

        // Confirm once the promoted standby has caught up
        tracker.confirm_primary();
        assert!(tracker.get_primary().unwrap().is_confirmed);

        // Changed, then Confirmed, and nothing else.
        let event = rx.try_recv().unwrap();
        assert!(matches!(event, PrimaryChangeEvent::Changed { new, .. } if new == pg_sync));
        let event = rx.try_recv().unwrap();
        assert!(matches!(event, PrimaryChangeEvent::Confirmed { node_id } if node_id == pg_sync));
        assert!(rx.try_recv().is_err());
    }

    /// Verify the topology provider trait can be used with custom
    /// implementations (e.g. Patroni, pg_auto_failover, Stolon).
    #[test]
    fn test_custom_topology_provider() {
        struct PatroniProvider {
            leader: RwLock<Option<TopologyNodeInfo>>,
            event_tx: broadcast::Sender<TopologyEvent>,
        }

        impl PatroniProvider {
            fn new() -> Self {
                let (tx, _) = broadcast::channel(16);
                Self { leader: RwLock::new(None), event_tx: tx }
            }
            fn set_leader(&self, id: Uuid, addr: &str) {
                *self.leader.write() = Some(TopologyNodeInfo {
                    node_id: id,
                    client_addr: addr.to_string(),
                    is_healthy: true,
                });
            }
        }

        impl TopologyProvider for PatroniProvider {
            fn subscribe(&self) -> broadcast::Receiver<TopologyEvent> {
                self.event_tx.subscribe()
            }
            fn get_primary(&self) -> Option<TopologyNodeInfo> {
                self.leader.read().clone()
            }
            fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo> {
                self.leader.read().as_ref().filter(|n| n.node_id == id).cloned()
            }
        }

        let patroni = Arc::new(PatroniProvider::new());
        let leader_id = Uuid::new_v4();
        patroni.set_leader(leader_id, "patroni-leader.svc:5432");

        let tracker = PrimaryTracker::with_provider(patroni.clone());
        tracker.detect_primary_from_provider(patroni.as_ref());

        assert!(tracker.has_primary());
        assert_eq!(
            tracker.get_primary_address(),
            Some("patroni-leader.svc:5432".to_string())
        );
    }

    /// Probing unreachable nodes must not crash the poller; it must
    /// leave `current_primary` as `None` and emit `HealthChanged`
    /// events for the failed probes. Exercises the real `probe_recovery`
    /// path without a live PG.
    #[cfg(feature = "postgres-topology")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_poll_nodes_all_unreachable_sets_no_primary() {
        let nodes = vec![
            PostgresNode {
                node_id: Uuid::new_v4(),
                host: "127.0.0.1".into(),
                port: 1, // no daemon
                user: "postgres".into(),
                password: None,
                database: "postgres".into(),
            },
            PostgresNode {
                node_id: Uuid::new_v4(),
                host: "127.0.0.1".into(),
                port: 2,
                user: "postgres".into(),
                password: None,
                database: "postgres".into(),
            },
        ];

        let provider = PostgresTopologyProvider::new(nodes)
            .with_poll_interval(Duration::from_millis(200));
        let mut rx = provider.event_tx.subscribe();

        // Run exactly one poll round.
        provider.poll_nodes().await;

        // No primary detected.
        assert!(provider.get_primary().is_none());

        // Drain already-queued events with non-blocking try_recv so the
        // test is deterministic.
        let mut health_events = 0;
        for _ in 0..10 {
            match rx.try_recv() {
                Ok(TopologyEvent::HealthChanged { is_healthy: false, .. }) => {
                    health_events += 1;
                }
                Ok(_) => {}
                Err(_) => break,
            }
        }
        assert!(
            health_events >= 1,
            "expected at least one HealthChanged event"
        );
    }
}