Skip to main content

heliosdb_proxy/
primary_tracker.rs

1//! Primary Tracker - Tracks current primary node for query routing
2//!
3//! Monitors cluster topology and maintains the current primary node
4//! information. During switchover, updates are received from the
5//! switchover coordinator to ensure queries are routed correctly.
6//!
7//! # Topology Providers
8//!
9//! The primary tracker uses a `TopologyProvider` trait to abstract over
10//! different topology sources:
11//!
12//! - **HeliosDB**: Uses the internal `TopologyManager` from the replication
13//!   subsystem (feature-gated behind `heliosdb-topology`).
14//! - **PostgreSQL**: Polls `pg_stat_replication` / `pg_is_in_recovery()`
15//!   to detect primary changes (feature-gated behind `postgres-topology`).
16//! - **Manual/Standalone**: Programmatic set/clear via API calls.
17
18use parking_lot::RwLock;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21use tokio::sync::broadcast;
22use uuid::Uuid;
23
24// ── Topology provider trait ─────────────────────────────────────────
25
26/// Information about a node in the cluster topology.
27#[derive(Debug, Clone)]
28pub struct TopologyNodeInfo {
29    /// Node UUID
30    pub node_id: Uuid,
31    /// Client-facing address (host:port)
32    pub client_addr: String,
33    /// Whether the node is currently healthy
34    pub is_healthy: bool,
35}
36
37/// Events emitted by a topology provider.
38#[derive(Debug, Clone)]
39pub enum TopologyEvent {
40    /// The primary node changed.
41    PrimaryChanged {
42        old_primary: Option<Uuid>,
43        new_primary: Uuid,
44    },
45    /// A node left the cluster.
46    NodeLeft { node_id: Uuid },
47    /// A node's health status changed.
48    HealthChanged { node_id: Uuid, is_healthy: bool },
49}
50
51/// Trait abstracting topology discovery.
52///
53/// Implement this for any database backend (HeliosDB, PostgreSQL, etc.)
54/// to enable automatic primary tracking.
55pub trait TopologyProvider: Send + Sync + 'static {
56    /// Subscribe to topology change events.
57    fn subscribe(&self) -> broadcast::Receiver<TopologyEvent>;
58
59    /// Get the current primary node, if one exists.
60    fn get_primary(&self) -> Option<TopologyNodeInfo>;
61
62    /// Look up a node by its UUID.
63    fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo>;
64}
65
66// ── PostgreSQL topology provider ────────────────────────────────────
67
/// Topology provider for plain PostgreSQL clusters.
///
/// Each polling round connects to every configured node and runs
/// `SELECT pg_is_in_recovery()`; the node answering `false` is taken as the
/// primary. Comparing successive rounds yields `PrimaryChanged` events.
#[cfg(feature = "postgres-topology")]
pub struct PostgresTopologyProvider {
    /// Nodes probed on every round, in order.
    nodes: Vec<PostgresNode>,
    /// Sender for the events handed out by `subscribe()`.
    event_tx: broadcast::Sender<TopologyEvent>,
    /// Primary found by the most recent round (`None` if none was found).
    current_primary: RwLock<Option<TopologyNodeInfo>>,
    /// Delay between polling rounds; also bounds the probe timeouts.
    poll_interval: Duration,
    /// rustls client configuration built once in `new()` and shared with
    /// every probe connection through the `Arc`.
    tls_config: std::sync::Arc<rustls::ClientConfig>,
    /// TLS policy used for each probe connection.
    tls_mode: crate::backend::TlsMode,
}
89
/// Connection parameters for one PostgreSQL node probed by
/// `PostgresTopologyProvider`.
///
/// `Debug` is implemented by hand so that the password is never written to
/// logs or panic messages; it is shown as `Some("<redacted>")` when set.
#[cfg(feature = "postgres-topology")]
#[derive(Clone)]
pub struct PostgresNode {
    /// Identifier reported in topology events for this node.
    pub node_id: Uuid,
    /// Host name or IP address to connect to.
    pub host: String,
    /// TCP port of the PostgreSQL server.
    pub port: u16,
    /// Role used for the probe connection.
    pub user: String,
    /// Password for `user`, if authentication requires one.
    pub password: Option<String>,
    /// Database to connect to for the probe query.
    pub database: String,
}

#[cfg(feature = "postgres-topology")]
impl std::fmt::Debug for PostgresNode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PostgresNode")
            .field("node_id", &self.node_id)
            .field("host", &self.host)
            .field("port", &self.port)
            .field("user", &self.user)
            // Never expose the secret itself; only whether one is configured.
            .field("password", &self.password.as_ref().map(|_| "<redacted>"))
            .field("database", &self.database)
            .finish()
    }
}
100
#[cfg(feature = "postgres-topology")]
impl PostgresTopologyProvider {
    /// Lower bound applied by [`with_poll_interval`](Self::with_poll_interval).
    ///
    /// `tokio::time::interval` panics on a zero period, so storing a zero
    /// interval would make [`start`](Self::start) panic.
    const MIN_POLL_INTERVAL: Duration = Duration::from_millis(1);

    /// Create a new PostgreSQL topology provider for `nodes`.
    ///
    /// Defaults are a 2 s polling interval, `TlsMode::Prefer`, and the TLS
    /// configuration returned by `crate::backend::tls::default_client_config()`.
    pub fn new(nodes: Vec<PostgresNode>) -> Self {
        let (event_tx, _) = broadcast::channel(16);
        Self {
            nodes,
            event_tx,
            current_primary: RwLock::new(None),
            poll_interval: Duration::from_secs(2),
            tls_config: crate::backend::tls::default_client_config(),
            tls_mode: crate::backend::TlsMode::Prefer,
        }
    }

    /// Set the polling interval.
    ///
    /// The interval is also used as the per-probe query timeout and as the
    /// connect timeout (capped at 5 s). Values below 1 ms, including zero,
    /// are raised to 1 ms.
    pub fn with_poll_interval(mut self, interval: Duration) -> Self {
        self.poll_interval = interval.max(Self::MIN_POLL_INTERVAL);
        self
    }

    /// Set the TLS policy used when opening probe connections.
    pub fn with_tls_mode(mut self, mode: crate::backend::TlsMode) -> Self {
        self.tls_mode = mode;
        self
    }

    /// Poll all nodes repeatedly, once per polling interval.
    ///
    /// This future never completes; run it on its own task.
    pub async fn start(&self) {
        let mut interval = tokio::time::interval(self.poll_interval);
        // A round probes nodes sequentially and can outlast the period. With
        // the default `Burst` behaviour the missed ticks would fire
        // back-to-back and re-probe every node immediately; `Delay` restarts
        // the period after a late tick instead.
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        loop {
            interval.tick().await;
            self.poll_nodes().await;
        }
    }

    /// Probe every configured node once and publish the resulting events.
    ///
    /// - Each failed probe publishes `HealthChanged { is_healthy: false }`.
    /// - The first node reporting `pg_is_in_recovery() = false` becomes the
    ///   primary; further such nodes (split-brain) are logged and ignored.
    /// - If the primary moves to a different node, `PrimaryChanged` is sent.
    ///   If no node reports as primary any more, the cached primary is
    ///   cleared and a warning is logged (`TopologyEvent` has no
    ///   "primary lost" variant).
    async fn poll_nodes(&self) {
        let mut next_primary: Option<TopologyNodeInfo> = None;

        // Probes run one after another, so a round may take up to
        // `nodes.len()` connect/query timeouts.
        for node in &self.nodes {
            match self.probe_recovery(node).await {
                // In a healthy cluster exactly one node is writable; taking the
                // first keeps the choice deterministic during a brief
                // split-brain while failover is in progress.
                Ok(false) if next_primary.is_none() => {
                    next_primary = Some(TopologyNodeInfo {
                        node_id: node.node_id,
                        client_addr: format!("{}:{}", node.host, node.port),
                        is_healthy: true,
                    });
                }
                Ok(false) => {
                    tracing::warn!(
                        node = %node.host,
                        port = node.port,
                        "more than one node reports pg_is_in_recovery() = false; keeping the first"
                    );
                }
                Ok(true) => {}
                Err(e) => {
                    tracing::warn!(
                        node = %node.host,
                        port = node.port,
                        error = %e,
                        "topology probe failed"
                    );
                    let _ = self.event_tx.send(TopologyEvent::HealthChanged {
                        node_id: node.node_id,
                        is_healthy: false,
                    });
                }
            }
        }

        let old_primary_id = self.current_primary.read().as_ref().map(|p| p.node_id);
        let new_primary_id = next_primary.as_ref().map(|p| p.node_id);
        if old_primary_id == new_primary_id {
            return;
        }

        *self.current_primary.write() = next_primary;
        match new_primary_id {
            Some(new_id) => {
                let _ = self.event_tx.send(TopologyEvent::PrimaryChanged {
                    old_primary: old_primary_id,
                    new_primary: new_id,
                });
            }
            None => {
                // No event variant can express "primary lost"; at least make
                // the transition visible to operators.
                tracing::warn!(
                    old_primary = ?old_primary_id,
                    "topology poll found no primary; cached primary cleared"
                );
            }
        }
    }

    /// Connect to a single node and run `SELECT pg_is_in_recovery()`.
    ///
    /// Returns `Ok(true)` for a standby and `Ok(false)` for a primary. A NULL
    /// result is reported as `Ok(true)` so that an ambiguous node is never
    /// selected as the write target. Connection and query failures propagate
    /// as `BackendError`; the connection is closed in both cases.
    async fn probe_recovery(&self, node: &PostgresNode) -> crate::backend::BackendResult<bool> {
        use crate::backend::{BackendClient, BackendConfig};

        let cfg = BackendConfig {
            host: node.host.clone(),
            port: node.port,
            user: node.user.clone(),
            password: node.password.clone(),
            database: Some(node.database.clone()),
            application_name: Some("helios-topology".into()),
            tls_mode: self.tls_mode,
            connect_timeout: self.poll_interval.min(Duration::from_secs(5)),
            query_timeout: self.poll_interval,
            tls_config: Arc::clone(&self.tls_config),
        };

        let mut client = BackendClient::connect(&cfg).await?;
        // Capture the query outcome first so the connection is closed even
        // when the query itself fails.
        let outcome = client.query_scalar("SELECT pg_is_in_recovery()").await;
        client.close().await;
        Ok(outcome?.as_bool("pg_is_in_recovery")?.unwrap_or(true))
    }
}
212
#[cfg(feature = "postgres-topology")]
impl TopologyProvider for PostgresTopologyProvider {
    /// Hand out a receiver for events produced by the polling loop.
    fn subscribe(&self) -> broadcast::Receiver<TopologyEvent> {
        self.event_tx.subscribe()
    }

    /// Primary found by the most recent polling round, if any.
    fn get_primary(&self) -> Option<TopologyNodeInfo> {
        let cached = self.current_primary.read();
        cached.clone()
    }

    /// Describe a configured node; `None` if `id` is not one of `nodes`.
    fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo> {
        let node = self.nodes.iter().find(|candidate| candidate.node_id == id)?;
        Some(TopologyNodeInfo {
            node_id: node.node_id,
            client_addr: format!("{}:{}", node.host, node.port),
            // NOTE(review): per-node health is not tracked yet, so every
            // configured node is reported healthy.
            is_healthy: true,
        })
    }
}
234
235// ── HeliosDB topology provider (bridges to internal TopologyManager) ─
236
#[cfg(feature = "heliosdb-topology")]
pub mod heliosdb_provider {
    //! Adapter between HeliosDB-Lite's internal replication topology and the
    //! generic `TopologyProvider` trait.
    //!
    //! Compiled only with the `heliosdb-topology` feature. The replication
    //! side implements `HeliosTopologyBridge`; wrapping it in
    //! `HeliosTopologyProvider` lets `PrimaryTracker` consume it without this
    //! crate depending on the replication crate directly.

    use super::*;

    /// Interface the HeliosDB replication crate implements to feed topology
    /// into the proxy.
    ///
    /// It mirrors `TopologyProvider` method for method. The proxy therefore
    /// never imports `crate::replication::topology`, and the standalone build
    /// compiles without the replication crate.
    pub trait HeliosTopologyBridge: Send + Sync + 'static {
        fn subscribe(&self) -> broadcast::Receiver<TopologyEvent>;
        fn get_primary(&self) -> Option<TopologyNodeInfo>;
        fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo>;
    }

    /// `TopologyProvider` that forwards every call to a shared
    /// `HeliosTopologyBridge`.
    ///
    /// Pass it, inside an `Arc`, to `PrimaryTracker::with_provider()`.
    pub struct HeliosTopologyProvider<T: HeliosTopologyBridge> {
        inner: Arc<T>,
    }

    impl<T: HeliosTopologyBridge> HeliosTopologyProvider<T> {
        /// Wrap an existing bridge; the `Arc` is stored as given.
        pub fn new(inner: Arc<T>) -> Self {
            HeliosTopologyProvider { inner }
        }
    }

    impl<T: HeliosTopologyBridge> TopologyProvider for HeliosTopologyProvider<T> {
        fn subscribe(&self) -> broadcast::Receiver<TopologyEvent> {
            HeliosTopologyBridge::subscribe(&*self.inner)
        }

        fn get_primary(&self) -> Option<TopologyNodeInfo> {
            HeliosTopologyBridge::get_primary(&*self.inner)
        }

        fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo> {
            HeliosTopologyBridge::get_node(&*self.inner, id)
        }
    }
}
289
290// ── Primary info & events ───────────────────────────────────────────
291
292/// Primary node information
293#[derive(Debug, Clone)]
294pub struct PrimaryInfo {
295    /// Node ID
296    pub node_id: Uuid,
297    /// Client address (host:port)
298    pub address: String,
299    /// Time when this node became primary
300    pub became_primary_at: Instant,
301    /// Whether this is confirmed (vs pending switchover)
302    pub is_confirmed: bool,
303}
304
305/// Primary change event
306#[derive(Debug, Clone)]
307pub enum PrimaryChangeEvent {
308    /// Primary changed to new node
309    Changed {
310        old: Option<Uuid>,
311        new: Uuid,
312        address: String,
313    },
314    /// Primary lost (no healthy primary)
315    Lost { old: Uuid },
316    /// Primary confirmed (after switchover completes)
317    Confirmed { node_id: Uuid },
318}
319
320// ── Primary Tracker ─────────────────────────────────────────────────
321
322/// Primary Tracker
323///
324/// Can be used in three modes:
325/// 1. **With a TopologyProvider** – automatic tracking via `with_provider()`.
326/// 2. **Standalone** – manual `set_primary()` / `clear_primary()` calls.
327/// 3. **PostgreSQL** – pass a `PostgresTopologyProvider` (feature `postgres-topology`).
328pub struct PrimaryTracker {
329    /// Optional topology provider (Box<dyn> for either HeliosDB or PostgreSQL)
330    provider: Option<Arc<dyn TopologyProvider>>,
331    /// Current primary info
332    current_primary: RwLock<Option<PrimaryInfo>>,
333    /// Event broadcaster
334    event_tx: broadcast::Sender<PrimaryChangeEvent>,
335    /// Tracking interval
336    tracking_interval: Duration,
337}
338
339impl PrimaryTracker {
340    /// Create a standalone primary tracker (manual set/clear).
341    pub fn new_standalone() -> Self {
342        let (event_tx, _) = broadcast::channel(16);
343        Self {
344            provider: None,
345            current_primary: RwLock::new(None),
346            event_tx,
347            tracking_interval: Duration::from_millis(500),
348        }
349    }
350
351    /// Create a primary tracker backed by a topology provider.
352    pub fn with_provider(provider: Arc<dyn TopologyProvider>) -> Self {
353        let (event_tx, _) = broadcast::channel(16);
354        Self {
355            provider: Some(provider),
356            current_primary: RwLock::new(None),
357            event_tx,
358            tracking_interval: Duration::from_millis(500),
359        }
360    }
361
362    /// Set tracking interval.
363    pub fn with_tracking_interval(mut self, interval: Duration) -> Self {
364        self.tracking_interval = interval;
365        self
366    }
367
368    /// Subscribe to primary change events.
369    pub fn subscribe(&self) -> broadcast::Receiver<PrimaryChangeEvent> {
370        self.event_tx.subscribe()
371    }
372
373    /// Get current primary info.
374    pub fn get_primary(&self) -> Option<PrimaryInfo> {
375        self.current_primary.read().clone()
376    }
377
378    /// Get current primary node ID.
379    pub fn get_primary_id(&self) -> Option<Uuid> {
380        self.current_primary.read().as_ref().map(|p| p.node_id)
381    }
382
383    /// Get current primary address.
384    pub fn get_primary_address(&self) -> Option<String> {
385        self.current_primary
386            .read()
387            .as_ref()
388            .map(|p| p.address.clone())
389    }
390
391    /// Check if we have a healthy primary.
392    pub fn has_primary(&self) -> bool {
393        self.current_primary.read().is_some()
394    }
395
396    /// Set primary manually (or called during switchover).
397    pub fn set_primary(&self, node_id: Uuid, address: String) {
398        let old_primary = self.current_primary.read().as_ref().map(|p| p.node_id);
399
400        let new_info = PrimaryInfo {
401            node_id,
402            address: address.clone(),
403            became_primary_at: Instant::now(),
404            is_confirmed: false,
405        };
406
407        *self.current_primary.write() = Some(new_info);
408
409        let _ = self.event_tx.send(PrimaryChangeEvent::Changed {
410            old: old_primary,
411            new: node_id,
412            address,
413        });
414
415        tracing::info!(
416            "Primary tracker: set primary to {} (pending confirmation)",
417            node_id
418        );
419    }
420
421    /// Confirm the current primary (called after switchover completes).
422    pub fn confirm_primary(&self) {
423        let mut guard = self.current_primary.write();
424        if let Some(ref mut info) = *guard {
425            info.is_confirmed = true;
426            let node_id = info.node_id;
427            drop(guard);
428
429            let _ = self
430                .event_tx
431                .send(PrimaryChangeEvent::Confirmed { node_id });
432            tracing::info!("Primary tracker: confirmed primary {}", node_id);
433        }
434    }
435
436    /// Clear primary (called when primary is lost).
437    pub fn clear_primary(&self) {
438        let old_primary = self.current_primary.write().take();
439
440        if let Some(info) = old_primary {
441            let _ = self
442                .event_tx
443                .send(PrimaryChangeEvent::Lost { old: info.node_id });
444            tracing::warn!("Primary tracker: lost primary {}", info.node_id);
445        }
446    }
447
448    /// Run the primary tracker loop (requires a topology provider).
449    ///
450    /// If no provider is set, this returns immediately — use manual
451    /// `set_primary()` / `clear_primary()` instead.
452    pub async fn run(&self) {
453        let provider = match &self.provider {
454            Some(p) => Arc::clone(p),
455            None => {
456                tracing::info!("Primary tracker: no topology provider, running in standalone mode");
457                return;
458            }
459        };
460
461        let mut topology_rx = provider.subscribe();
462        let mut interval = tokio::time::interval(self.tracking_interval);
463
464        // Initial detection
465        self.detect_primary_from_provider(&*provider);
466
467        loop {
468            tokio::select! {
469                event = topology_rx.recv() => {
470                    match event {
471                        Ok(TopologyEvent::PrimaryChanged { old_primary, new_primary }) => {
472                            self.handle_primary_changed(&*provider, old_primary, new_primary);
473                        }
474                        Ok(TopologyEvent::NodeLeft { node_id }) => {
475                            self.handle_node_left(node_id);
476                        }
477                        Ok(TopologyEvent::HealthChanged { node_id, is_healthy }) => {
478                            self.handle_health_changed(node_id, is_healthy);
479                        }
480                        Err(broadcast::error::RecvError::Lagged(n)) => {
481                            tracing::warn!("Primary tracker lagged {} events", n);
482                        }
483                        Err(broadcast::error::RecvError::Closed) => {
484                            break;
485                        }
486                    }
487                }
488                _ = interval.tick() => {
489                    self.periodic_check(&*provider);
490                }
491            }
492        }
493    }
494
495    // ── Internal helpers ────────────────────────────────────────────
496
497    fn detect_primary_from_provider(&self, provider: &dyn TopologyProvider) {
498        if let Some(primary) = provider.get_primary() {
499            let info = PrimaryInfo {
500                node_id: primary.node_id,
501                address: primary.client_addr.clone(),
502                became_primary_at: Instant::now(),
503                is_confirmed: true,
504            };
505
506            *self.current_primary.write() = Some(info);
507            tracing::info!("Primary tracker: detected primary {}", primary.node_id);
508        }
509    }
510
511    fn handle_primary_changed(
512        &self,
513        provider: &dyn TopologyProvider,
514        old: Option<Uuid>,
515        new: Uuid,
516    ) {
517        let address = provider
518            .get_node(new)
519            .map(|n| n.client_addr)
520            .unwrap_or_else(|| format!("{}:5432", new));
521
522        let info = PrimaryInfo {
523            node_id: new,
524            address: address.clone(),
525            became_primary_at: Instant::now(),
526            is_confirmed: true,
527        };
528
529        *self.current_primary.write() = Some(info);
530
531        let _ = self
532            .event_tx
533            .send(PrimaryChangeEvent::Changed { old, new, address });
534
535        tracing::info!("Primary tracker: primary changed from {:?} to {}", old, new);
536    }
537
538    fn handle_node_left(&self, node_id: Uuid) {
539        let current = self.current_primary.read().as_ref().map(|p| p.node_id);
540        if current == Some(node_id) {
541            self.clear_primary();
542        }
543    }
544
545    fn handle_health_changed(&self, node_id: Uuid, is_healthy: bool) {
546        if !is_healthy {
547            let current = self.current_primary.read().as_ref().map(|p| p.node_id);
548            if current == Some(node_id) {
549                tracing::warn!("Primary {} became unhealthy", node_id);
550            }
551        }
552    }
553
554    fn periodic_check(&self, provider: &dyn TopologyProvider) {
555        let current_id = self.current_primary.read().as_ref().map(|p| p.node_id);
556
557        if let Some(id) = current_id {
558            if let Some(node) = provider.get_node(id) {
559                if !node.is_healthy {
560                    tracing::warn!("Primary {} is unhealthy in periodic check", id);
561                }
562            } else {
563                self.clear_primary();
564            }
565        } else {
566            self.detect_primary_from_provider(provider);
567        }
568    }
569}
570
#[cfg(test)]
mod tests {
    use super::*;

    /// Healthy `TopologyNodeInfo` fixture shared by the fake providers below.
    fn healthy_node(node_id: Uuid, addr: &str) -> TopologyNodeInfo {
        TopologyNodeInfo {
            node_id,
            client_addr: addr.to_string(),
            is_healthy: true,
        }
    }

    #[test]
    fn test_standalone_primary_tracker() {
        let tracker = PrimaryTracker::new_standalone();
        assert!(!tracker.has_primary());

        let node_id = Uuid::new_v4();
        tracker.set_primary(node_id, "localhost:5432".to_string());

        assert!(tracker.has_primary());
        assert_eq!(tracker.get_primary_id(), Some(node_id));
        assert_eq!(
            tracker.get_primary_address().as_deref(),
            Some("localhost:5432")
        );

        // A manually set primary stays pending until confirmed.
        assert!(!tracker.get_primary().unwrap().is_confirmed);
        tracker.confirm_primary();
        assert!(tracker.get_primary().unwrap().is_confirmed);

        tracker.clear_primary();
        assert!(!tracker.has_primary());
    }

    /// In-memory provider that only knows about the node it reports as primary.
    struct MockTopology {
        event_tx: broadcast::Sender<TopologyEvent>,
        primary: RwLock<Option<TopologyNodeInfo>>,
    }

    impl MockTopology {
        fn new() -> Self {
            Self {
                event_tx: broadcast::channel(16).0,
                primary: RwLock::new(None),
            }
        }

        fn set_primary(&self, node_id: Uuid, addr: &str) {
            *self.primary.write() = Some(healthy_node(node_id, addr));
        }
    }

    impl TopologyProvider for MockTopology {
        fn subscribe(&self) -> broadcast::Receiver<TopologyEvent> {
            self.event_tx.subscribe()
        }

        fn get_primary(&self) -> Option<TopologyNodeInfo> {
            self.primary.read().clone()
        }

        fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo> {
            self.primary
                .read()
                .as_ref()
                .filter(|node| node.node_id == id)
                .cloned()
        }
    }

    #[test]
    fn test_provider_backed_tracker() {
        let node_id = Uuid::new_v4();
        let topo = Arc::new(MockTopology::new());
        topo.set_primary(node_id, "primary:5432");

        let tracker = PrimaryTracker::with_provider(topo.clone());
        tracker.detect_primary_from_provider(topo.as_ref());

        assert!(tracker.has_primary());
        assert_eq!(tracker.get_primary_id(), Some(node_id));
    }

    /// Simulate a PostgreSQL 3-node cluster (primary + sync + async standby)
    /// in which the primary fails and the sync standby is promoted.
    #[test]
    fn test_postgresql_failover_scenario() {
        let topo = Arc::new(MockTopology::new());

        let pg_primary = Uuid::new_v4();
        let pg_sync = Uuid::new_v4();
        let _pg_async = Uuid::new_v4();

        // Initial state: pg-primary is the primary.
        topo.set_primary(pg_primary, "pg-primary:5432");
        let tracker = PrimaryTracker::with_provider(topo.clone());
        tracker.detect_primary_from_provider(topo.as_ref());

        assert!(tracker.has_primary());
        assert_eq!(
            tracker.get_primary_address().as_deref(),
            Some("pg-primary:5432")
        );

        let mut rx = tracker.subscribe();

        // Primary goes down.
        tracker.clear_primary();
        assert!(!tracker.has_primary());
        let lost = rx.try_recv().unwrap();
        assert!(matches!(lost, PrimaryChangeEvent::Lost { old } if old == pg_primary));

        // Sync standby is promoted; it is pending until confirmed.
        tracker.set_primary(pg_sync, "pg-sync:5432".to_string());
        assert!(tracker.has_primary());
        assert_eq!(
            tracker.get_primary_address().as_deref(),
            Some("pg-sync:5432")
        );
        assert!(!tracker.get_primary().unwrap().is_confirmed);

        // Confirm after pg_basebackup / replication catch-up.
        tracker.confirm_primary();
        assert!(tracker.get_primary().unwrap().is_confirmed);

        let changed = rx.try_recv().unwrap();
        assert!(matches!(changed, PrimaryChangeEvent::Changed { new, .. } if new == pg_sync));
    }

    /// The topology provider trait must accept custom implementations
    /// (e.g. Patroni, pg_auto_failover, Stolon).
    #[test]
    fn test_custom_topology_provider() {
        struct PatroniProvider {
            leader: RwLock<Option<TopologyNodeInfo>>,
            event_tx: broadcast::Sender<TopologyEvent>,
        }

        impl PatroniProvider {
            fn new() -> Self {
                Self {
                    leader: RwLock::new(None),
                    event_tx: broadcast::channel(16).0,
                }
            }

            fn set_leader(&self, id: Uuid, addr: &str) {
                *self.leader.write() = Some(healthy_node(id, addr));
            }
        }

        impl TopologyProvider for PatroniProvider {
            fn subscribe(&self) -> broadcast::Receiver<TopologyEvent> {
                self.event_tx.subscribe()
            }

            fn get_primary(&self) -> Option<TopologyNodeInfo> {
                self.leader.read().clone()
            }

            fn get_node(&self, id: Uuid) -> Option<TopologyNodeInfo> {
                let leader = self.leader.read();
                leader.as_ref().filter(|node| node.node_id == id).cloned()
            }
        }

        let leader_id = Uuid::new_v4();
        let patroni = Arc::new(PatroniProvider::new());
        patroni.set_leader(leader_id, "patroni-leader.svc:5432");

        let tracker = PrimaryTracker::with_provider(patroni.clone());
        tracker.detect_primary_from_provider(patroni.as_ref());

        assert!(tracker.has_primary());
        assert_eq!(
            tracker.get_primary_address().as_deref(),
            Some("patroni-leader.svc:5432")
        );
    }

    /// Probing unreachable nodes must not crash the poller. It must leave
    /// `current_primary` as `None` and emit `HealthChanged` events for the
    /// failed probes, exercising the real `probe_recovery` path without a
    /// live PostgreSQL.
    #[cfg(feature = "postgres-topology")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_poll_nodes_all_unreachable_sets_no_primary() {
        // Loopback ports 1 and 2 are expected to have no daemon listening.
        let unreachable = |port: u16| PostgresNode {
            node_id: Uuid::new_v4(),
            host: "127.0.0.1".into(),
            port,
            user: "postgres".into(),
            password: None,
            database: "postgres".into(),
        };
        let nodes = vec![unreachable(1), unreachable(2)];

        let provider =
            PostgresTopologyProvider::new(nodes).with_poll_interval(Duration::from_millis(200));
        let mut rx = provider.event_tx.subscribe();

        // Exactly one polling round.
        provider.poll_nodes().await;

        assert!(provider.get_primary().is_none());

        // Drain at most 10 already-queued events without blocking, so the
        // test stays deterministic.
        let health_events = std::iter::from_fn(|| rx.try_recv().ok())
            .take(10)
            .filter(|event| {
                matches!(
                    event,
                    TopologyEvent::HealthChanged {
                        is_healthy: false,
                        ..
                    }
                )
            })
            .count();
        assert!(
            health_events >= 1,
            "expected at least one HealthChanged event"
        );
    }
}