Skip to main content

prax_query/
replication.rs

//! Replication and high availability support.
//!
//! This module provides types for managing database replication, read replicas,
//! connection routing, and failover handling.
//!
//! # Database Support
//!
//! | Feature             | PostgreSQL | MySQL | SQLite | MSSQL        | MongoDB        |
//! |---------------------|------------|-------|--------|--------------|----------------|
//! | Read replicas       | ✅         | ✅    | ❌     | ✅ Always On | ✅ Replica set |
//! | Logical replication | ✅         | ✅    | ❌     | ✅           | ✅             |
//! | Connection routing  | ✅         | ✅    | ❌     | ✅           | ✅             |
//! | Auto-failover       | ✅         | ✅    | ❌     | ✅           | ✅             |
//! | Read preference     | ❌         | ❌    | ❌     | ❌           | ✅             |
//!
//! The "Read preference" row refers to server-side read preference. The client-side
//! routing performed by `ConnectionRouter` with a `ReadPreference` does not depend on
//! the backend.
15
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
18use std::time::{Duration, Instant};
19
20use serde::{Deserialize, Serialize};
21
22use crate::error::{QueryError, QueryResult};
23
24// ============================================================================
25// Replica Configuration
26// ============================================================================
27
28/// Configuration for a database replica.
29#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
30pub struct ReplicaConfig {
31    /// Unique identifier for this replica.
32    pub id: String,
33    /// Connection URL.
34    pub url: String,
35    /// Role of this replica.
36    pub role: ReplicaRole,
37    /// Priority for failover (higher = preferred).
38    pub priority: u32,
39    /// Weight for load balancing (higher = more traffic).
40    pub weight: u32,
41    /// Region/datacenter for locality-aware routing.
42    pub region: Option<String>,
43    /// Maximum acceptable replication lag.
44    pub max_lag: Option<Duration>,
45}
46
47/// Role of a replica in the cluster.
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49pub enum ReplicaRole {
50    /// Primary/master - handles writes.
51    Primary,
52    /// Secondary/replica - handles reads.
53    Secondary,
54    /// Arbiter - for elections only (MongoDB).
55    Arbiter,
56    /// Hidden - for backups, not queryable.
57    Hidden,
58}
59
60impl ReplicaConfig {
61    /// Create a primary replica config.
62    pub fn primary(id: impl Into<String>, url: impl Into<String>) -> Self {
63        Self {
64            id: id.into(),
65            url: url.into(),
66            role: ReplicaRole::Primary,
67            priority: 100,
68            weight: 100,
69            region: None,
70            max_lag: None,
71        }
72    }
73
74    /// Create a secondary replica config.
75    pub fn secondary(id: impl Into<String>, url: impl Into<String>) -> Self {
76        Self {
77            id: id.into(),
78            url: url.into(),
79            role: ReplicaRole::Secondary,
80            priority: 50,
81            weight: 100,
82            region: None,
83            max_lag: Some(Duration::from_secs(10)),
84        }
85    }
86
87    /// Set region.
88    pub fn with_region(mut self, region: impl Into<String>) -> Self {
89        self.region = Some(region.into());
90        self
91    }
92
93    /// Set weight.
94    pub fn with_weight(mut self, weight: u32) -> Self {
95        self.weight = weight;
96        self
97    }
98
99    /// Set priority.
100    pub fn with_priority(mut self, priority: u32) -> Self {
101        self.priority = priority;
102        self
103    }
104
105    /// Set max lag.
106    pub fn with_max_lag(mut self, lag: Duration) -> Self {
107        self.max_lag = Some(lag);
108        self
109    }
110}
111
112// ============================================================================
113// Replica Set Configuration
114// ============================================================================
115
116/// Configuration for a replica set.
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct ReplicaSetConfig {
119    /// Name of the replica set.
120    pub name: String,
121    /// List of replicas.
122    pub replicas: Vec<ReplicaConfig>,
123    /// Default read preference.
124    pub default_read_preference: ReadPreference,
125    /// Health check interval.
126    pub health_check_interval: Duration,
127    /// Failover timeout.
128    pub failover_timeout: Duration,
129}
130
131impl ReplicaSetConfig {
132    /// Create a new replica set config.
133    pub fn new(name: impl Into<String>) -> ReplicaSetBuilder {
134        ReplicaSetBuilder::new(name)
135    }
136
137    /// Get the primary replica.
138    pub fn primary(&self) -> Option<&ReplicaConfig> {
139        self.replicas
140            .iter()
141            .find(|r| r.role == ReplicaRole::Primary)
142    }
143
144    /// Get all secondary replicas.
145    pub fn secondaries(&self) -> impl Iterator<Item = &ReplicaConfig> {
146        self.replicas
147            .iter()
148            .filter(|r| r.role == ReplicaRole::Secondary)
149    }
150
151    /// Get replicas in a specific region.
152    pub fn in_region(&self, region: &str) -> impl Iterator<Item = &ReplicaConfig> {
153        self.replicas
154            .iter()
155            .filter(move |r| r.region.as_deref() == Some(region))
156    }
157}
158
159/// Builder for replica set configuration.
160#[derive(Debug, Clone)]
161pub struct ReplicaSetBuilder {
162    name: String,
163    replicas: Vec<ReplicaConfig>,
164    default_read_preference: ReadPreference,
165    health_check_interval: Duration,
166    failover_timeout: Duration,
167}
168
169impl ReplicaSetBuilder {
170    /// Create a new builder.
171    pub fn new(name: impl Into<String>) -> Self {
172        Self {
173            name: name.into(),
174            replicas: Vec::new(),
175            default_read_preference: ReadPreference::Primary,
176            health_check_interval: Duration::from_secs(10),
177            failover_timeout: Duration::from_secs(30),
178        }
179    }
180
181    /// Add a replica.
182    pub fn replica(mut self, config: ReplicaConfig) -> Self {
183        self.replicas.push(config);
184        self
185    }
186
187    /// Add primary.
188    pub fn primary(self, id: impl Into<String>, url: impl Into<String>) -> Self {
189        self.replica(ReplicaConfig::primary(id, url))
190    }
191
192    /// Add secondary.
193    pub fn secondary(self, id: impl Into<String>, url: impl Into<String>) -> Self {
194        self.replica(ReplicaConfig::secondary(id, url))
195    }
196
197    /// Set default read preference.
198    pub fn read_preference(mut self, pref: ReadPreference) -> Self {
199        self.default_read_preference = pref;
200        self
201    }
202
203    /// Set health check interval.
204    pub fn health_check_interval(mut self, interval: Duration) -> Self {
205        self.health_check_interval = interval;
206        self
207    }
208
209    /// Set failover timeout.
210    pub fn failover_timeout(mut self, timeout: Duration) -> Self {
211        self.failover_timeout = timeout;
212        self
213    }
214
215    /// Build the config.
216    pub fn build(self) -> ReplicaSetConfig {
217        ReplicaSetConfig {
218            name: self.name,
219            replicas: self.replicas,
220            default_read_preference: self.default_read_preference,
221            health_check_interval: self.health_check_interval,
222            failover_timeout: self.failover_timeout,
223        }
224    }
225}
226
227// ============================================================================
228// Read Preference
229// ============================================================================
230
231/// Read preference for query routing.
232#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
233pub enum ReadPreference {
234    /// Always read from primary.
235    Primary,
236    /// Prefer primary, fallback to secondary.
237    PrimaryPreferred,
238    /// Always read from secondary.
239    Secondary,
240    /// Prefer secondary, fallback to primary.
241    SecondaryPreferred,
242    /// Read from nearest replica by latency.
243    Nearest,
244    /// Read from specific region.
245    Region(String),
246    /// Custom tag set (MongoDB).
247    TagSet(Vec<HashMap<String, String>>),
248}
249
250impl ReadPreference {
251    /// Create a region preference.
252    pub fn region(region: impl Into<String>) -> Self {
253        Self::Region(region.into())
254    }
255
256    /// Create a tag set preference.
257    pub fn tag_set(tags: Vec<HashMap<String, String>>) -> Self {
258        Self::TagSet(tags)
259    }
260
261    /// Convert to MongoDB read preference string.
262    pub fn to_mongodb(&self) -> &'static str {
263        match self {
264            Self::Primary => "primary",
265            Self::PrimaryPreferred => "primaryPreferred",
266            Self::Secondary => "secondary",
267            Self::SecondaryPreferred => "secondaryPreferred",
268            Self::Nearest => "nearest",
269            Self::Region(_) | Self::TagSet(_) => "nearest",
270        }
271    }
272
273    /// Check if this preference allows reading from primary.
274    pub fn allows_primary(&self) -> bool {
275        matches!(
276            self,
277            Self::Primary
278                | Self::PrimaryPreferred
279                | Self::Nearest
280                | Self::Region(_)
281                | Self::TagSet(_)
282        )
283    }
284
285    /// Check if this preference allows reading from secondary.
286    pub fn allows_secondary(&self) -> bool {
287        !matches!(self, Self::Primary)
288    }
289}
290
291impl Default for ReadPreference {
292    fn default() -> Self {
293        Self::Primary
294    }
295}
296
297// ============================================================================
298// Replica Health
299// ============================================================================
300
301/// Health status of a replica.
302#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
303pub enum HealthStatus {
304    /// Replica is healthy and accepting connections.
305    Healthy,
306    /// Replica is degraded (high lag, slow responses).
307    Degraded,
308    /// Replica is unhealthy (not responding).
309    Unhealthy,
310    /// Health status unknown (not yet checked).
311    Unknown,
312}
313
314/// Health information for a replica.
315#[derive(Debug, Clone)]
316pub struct ReplicaHealth {
317    /// Replica ID.
318    pub id: String,
319    /// Current health status.
320    pub status: HealthStatus,
321    /// Current replication lag (if known).
322    pub lag: Option<Duration>,
323    /// Last successful health check.
324    pub last_check: Option<Instant>,
325    /// Response latency.
326    pub latency: Option<Duration>,
327    /// Consecutive failures.
328    pub consecutive_failures: u32,
329}
330
331impl ReplicaHealth {
332    /// Create a new health record.
333    pub fn new(id: impl Into<String>) -> Self {
334        Self {
335            id: id.into(),
336            status: HealthStatus::Unknown,
337            lag: None,
338            last_check: None,
339            latency: None,
340            consecutive_failures: 0,
341        }
342    }
343
344    /// Mark as healthy.
345    pub fn mark_healthy(&mut self, latency: Duration, lag: Option<Duration>) {
346        self.status = HealthStatus::Healthy;
347        self.latency = Some(latency);
348        self.lag = lag;
349        self.last_check = Some(Instant::now());
350        self.consecutive_failures = 0;
351    }
352
353    /// Mark as degraded.
354    pub fn mark_degraded(&mut self, reason: &str) {
355        self.status = HealthStatus::Degraded;
356        self.last_check = Some(Instant::now());
357        let _ = reason; // Could log this
358    }
359
360    /// Mark as unhealthy.
361    pub fn mark_unhealthy(&mut self) {
362        self.status = HealthStatus::Unhealthy;
363        self.last_check = Some(Instant::now());
364        self.consecutive_failures += 1;
365    }
366
367    /// Check if replica is usable for queries.
368    pub fn is_usable(&self) -> bool {
369        matches!(self.status, HealthStatus::Healthy | HealthStatus::Degraded)
370    }
371}
372
373// ============================================================================
374// Connection Router
375// ============================================================================
376
/// Kind of statement being routed; decides whether the primary is mandatory.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryType {
    /// Read-only statement, routed according to a read preference.
    Read,
    /// Data-modifying statement; always routed to the primary.
    Write,
    /// Transaction; always routed to the primary.
    Transaction,
}
387
388/// Connection router for read/write splitting.
389#[derive(Debug)]
390pub struct ConnectionRouter {
391    /// Replica set configuration.
392    config: ReplicaSetConfig,
393    /// Health status of each replica.
394    health: HashMap<String, ReplicaHealth>,
395    /// Current primary ID.
396    current_primary: Option<String>,
397    /// Round-robin counter for load balancing.
398    round_robin: AtomicUsize,
399    /// Whether router is in failover mode.
400    in_failover: AtomicBool,
401}
402
403impl ConnectionRouter {
404    /// Create a new router.
405    pub fn new(config: ReplicaSetConfig) -> Self {
406        let mut health = HashMap::new();
407        let mut primary_id = None;
408
409        for replica in &config.replicas {
410            health.insert(replica.id.clone(), ReplicaHealth::new(&replica.id));
411            if replica.role == ReplicaRole::Primary {
412                primary_id = Some(replica.id.clone());
413            }
414        }
415
416        Self {
417            config,
418            health,
419            current_primary: primary_id,
420            round_robin: AtomicUsize::new(0),
421            in_failover: AtomicBool::new(false),
422        }
423    }
424
425    /// Get replica for a query based on query type and read preference.
426    pub fn route(
427        &self,
428        query_type: QueryType,
429        preference: Option<&ReadPreference>,
430    ) -> QueryResult<&ReplicaConfig> {
431        let pref = preference.unwrap_or(&self.config.default_read_preference);
432
433        match query_type {
434            QueryType::Write | QueryType::Transaction => self.get_primary(),
435            QueryType::Read => self.route_read(pref),
436        }
437    }
438
439    /// Get the primary replica.
440    pub fn get_primary(&self) -> QueryResult<&ReplicaConfig> {
441        let primary_id = self
442            .current_primary
443            .as_ref()
444            .ok_or_else(|| QueryError::connection("No primary replica available"))?;
445
446        self.config
447            .replicas
448            .iter()
449            .find(|r| &r.id == primary_id)
450            .ok_or_else(|| QueryError::connection("Primary replica not found"))
451    }
452
453    /// Route a read query based on preference.
454    fn route_read(&self, preference: &ReadPreference) -> QueryResult<&ReplicaConfig> {
455        match preference {
456            ReadPreference::Primary => self.get_primary(),
457            ReadPreference::PrimaryPreferred => {
458                self.get_primary().or_else(|_| self.get_any_secondary())
459            }
460            ReadPreference::Secondary => self.get_any_secondary(),
461            ReadPreference::SecondaryPreferred => {
462                self.get_any_secondary().or_else(|_| self.get_primary())
463            }
464            ReadPreference::Nearest => self.get_nearest(),
465            ReadPreference::Region(region) => self.get_in_region(region),
466            ReadPreference::TagSet(_tags) => {
467                // Simplified: just get nearest for now
468                self.get_nearest()
469            }
470        }
471    }
472
473    /// Get any healthy secondary.
474    fn get_any_secondary(&self) -> QueryResult<&ReplicaConfig> {
475        let secondaries: Vec<_> = self
476            .config
477            .secondaries()
478            .filter(|r| self.is_replica_healthy(&r.id))
479            .collect();
480
481        if secondaries.is_empty() {
482            return Err(QueryError::connection(
483                "No healthy secondary replicas available",
484            ));
485        }
486
487        // Round-robin selection
488        let idx = self.round_robin.fetch_add(1, Ordering::Relaxed) % secondaries.len();
489        Ok(secondaries[idx])
490    }
491
492    /// Get the nearest replica by latency.
493    fn get_nearest(&self) -> QueryResult<&ReplicaConfig> {
494        let mut best: Option<(&ReplicaConfig, Duration)> = None;
495
496        for replica in &self.config.replicas {
497            if !self.is_replica_healthy(&replica.id) {
498                continue;
499            }
500
501            if let Some(health) = self.health.get(&replica.id) {
502                if let Some(latency) = health.latency {
503                    match &best {
504                        None => best = Some((replica, latency)),
505                        Some((_, best_latency)) if latency < *best_latency => {
506                            best = Some((replica, latency));
507                        }
508                        _ => {}
509                    }
510                }
511            }
512        }
513
514        best.map(|(r, _)| r)
515            .ok_or_else(|| QueryError::connection("No healthy replicas available"))
516    }
517
518    /// Get replica in specific region.
519    fn get_in_region(&self, region: &str) -> QueryResult<&ReplicaConfig> {
520        let replicas: Vec<_> = self
521            .config
522            .in_region(region)
523            .filter(|r| self.is_replica_healthy(&r.id))
524            .collect();
525
526        if replicas.is_empty() {
527            // Fallback to nearest
528            return self.get_nearest();
529        }
530
531        let idx = self.round_robin.fetch_add(1, Ordering::Relaxed) % replicas.len();
532        Ok(replicas[idx])
533    }
534
535    /// Check if a replica is healthy.
536    fn is_replica_healthy(&self, id: &str) -> bool {
537        self.health.get(id).map(|h| h.is_usable()).unwrap_or(false)
538    }
539
540    /// Update health status of a replica.
541    pub fn update_health(
542        &mut self,
543        id: &str,
544        status: HealthStatus,
545        latency: Option<Duration>,
546        lag: Option<Duration>,
547    ) {
548        if let Some(health) = self.health.get_mut(id) {
549            match status {
550                HealthStatus::Healthy => {
551                    health.mark_healthy(latency.unwrap_or(Duration::ZERO), lag);
552                }
553                HealthStatus::Degraded => {
554                    health.mark_degraded("degraded");
555                }
556                HealthStatus::Unhealthy => {
557                    health.mark_unhealthy();
558                }
559                HealthStatus::Unknown => {}
560            }
561        }
562    }
563
564    /// Check if replication lag is acceptable.
565    pub fn check_lag(&self, replica_id: &str, max_lag: Duration) -> bool {
566        self.health
567            .get(replica_id)
568            .and_then(|h| h.lag)
569            .map(|lag| lag <= max_lag)
570            .unwrap_or(false)
571    }
572
573    /// Initiate failover to a new primary.
574    pub fn initiate_failover(&mut self) -> QueryResult<String> {
575        self.in_failover.store(true, Ordering::SeqCst);
576
577        // Find best candidate (highest priority secondary that is healthy)
578        let candidate = self
579            .config
580            .replicas
581            .iter()
582            .filter(|r| r.role == ReplicaRole::Secondary)
583            .filter(|r| self.is_replica_healthy(&r.id))
584            .max_by_key(|r| r.priority);
585
586        match candidate {
587            Some(new_primary) => {
588                let new_primary_id = new_primary.id.clone();
589                self.current_primary = Some(new_primary_id.clone());
590                self.in_failover.store(false, Ordering::SeqCst);
591                Ok(new_primary_id)
592            }
593            None => {
594                self.in_failover.store(false, Ordering::SeqCst);
595                Err(QueryError::connection(
596                    "No suitable failover candidate found",
597                ))
598            }
599        }
600    }
601
602    /// Check if router is currently in failover mode.
603    pub fn is_in_failover(&self) -> bool {
604        self.in_failover.load(Ordering::SeqCst)
605    }
606}
607
608// ============================================================================
609// Replication Lag Monitor
610// ============================================================================
611
/// Tracks replication lag per replica and flags replicas whose lag is too high.
#[derive(Debug)]
pub struct LagMonitor {
    /// Lag statistics, keyed by replica ID.
    measurements: HashMap<String, LagMeasurement>,
    /// Lag strictly above this value is considered unacceptable.
    max_acceptable_lag: Duration,
}

/// Aggregated lag statistics for one replica.
#[derive(Debug, Clone)]
pub struct LagMeasurement {
    /// Most recently recorded lag.
    pub current: Duration,
    /// Exponential moving average of recorded lag, seeded with the first sample.
    pub average: Duration,
    /// Largest lag recorded so far.
    pub max: Duration,
    /// When the most recent sample was recorded.
    pub timestamp: Instant,
    /// Number of samples recorded.
    pub samples: u64,
}

impl LagMonitor {
    /// Weight of the newest sample in the exponential moving average.
    const EMA_ALPHA: f64 = 0.3;

    /// Create a monitor that treats lag above `max_acceptable_lag` as unacceptable.
    pub fn new(max_acceptable_lag: Duration) -> Self {
        Self {
            measurements: HashMap::new(),
            max_acceptable_lag,
        }
    }

    /// Record a lag sample for `replica_id`.
    ///
    /// Updates the current value, the running maximum, the sample count and the
    /// moving average. The first sample seeds the average directly; blending it with a
    /// zero placeholder would under-report lag for many samples.
    pub fn record(&mut self, replica_id: &str, lag: Duration) {
        let now = Instant::now();
        let entry = self
            .measurements
            .entry(replica_id.to_string())
            .or_insert_with(|| LagMeasurement {
                current: Duration::ZERO,
                average: Duration::ZERO,
                max: Duration::ZERO,
                timestamp: now,
                samples: 0,
            });

        entry.average = if entry.samples == 0 {
            lag
        } else {
            Duration::from_secs_f64(
                entry.average.as_secs_f64() * (1.0 - Self::EMA_ALPHA)
                    + lag.as_secs_f64() * Self::EMA_ALPHA,
            )
        };
        entry.current = lag;
        entry.max = entry.max.max(lag);
        entry.samples += 1;
        entry.timestamp = now;
    }

    /// Whether the latest lag of `replica_id` is within the acceptable limit.
    ///
    /// Replicas with no recorded samples are assumed acceptable.
    pub fn is_acceptable(&self, replica_id: &str) -> bool {
        self.measurements
            .get(replica_id)
            .map(|m| m.current <= self.max_acceptable_lag)
            .unwrap_or(true) // Unknown = assume OK
    }

    /// Most recently recorded lag for `replica_id`, if any.
    pub fn get_lag(&self, replica_id: &str) -> Option<Duration> {
        self.measurements.get(replica_id).map(|m| m.current)
    }

    /// IDs of all replicas whose latest lag exceeds the acceptable limit (unordered).
    pub fn get_lagging_replicas(&self) -> Vec<&str> {
        self.measurements
            .iter()
            .filter(|(_, m)| m.current > self.max_acceptable_lag)
            .map(|(id, _)| id.as_str())
            .collect()
    }
}
693
694// ============================================================================
695// SQL Helpers for Replication Lag
696// ============================================================================
697
698/// SQL queries for checking replication lag.
699pub mod lag_queries {
700    use crate::sql::DatabaseType;
701
702    /// Generate SQL to check replication lag.
703    pub fn check_lag_sql(db_type: DatabaseType) -> &'static str {
704        match db_type {
705            DatabaseType::PostgreSQL => {
706                // Returns lag in seconds
707                "SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::INT AS lag_seconds"
708            }
709            DatabaseType::MySQL => {
710                // Returns Seconds_Behind_Master
711                "SHOW SLAVE STATUS"
712            }
713            DatabaseType::MSSQL => {
714                // Check AG synchronization state
715                "SELECT datediff(s, last_commit_time, getdate()) AS lag_seconds \
716                 FROM sys.dm_hadr_database_replica_states \
717                 WHERE is_local = 1"
718            }
719            DatabaseType::SQLite => {
720                // SQLite doesn't have replication
721                "SELECT 0 AS lag_seconds"
722            }
723        }
724    }
725
726    /// Generate SQL to check if replica is primary.
727    pub fn is_primary_sql(db_type: DatabaseType) -> &'static str {
728        match db_type {
729            DatabaseType::PostgreSQL => "SELECT NOT pg_is_in_recovery() AS is_primary",
730            DatabaseType::MySQL => "SELECT @@read_only = 0 AS is_primary",
731            DatabaseType::MSSQL => {
732                "SELECT CASE WHEN role = 1 THEN 1 ELSE 0 END AS is_primary \
733                 FROM sys.dm_hadr_availability_replica_states \
734                 WHERE is_local = 1"
735            }
736            DatabaseType::SQLite => "SELECT 1 AS is_primary",
737        }
738    }
739
740    /// Generate SQL to get replica status.
741    pub fn replica_status_sql(db_type: DatabaseType) -> &'static str {
742        match db_type {
743            DatabaseType::PostgreSQL => {
744                "SELECT \
745                     pg_is_in_recovery() AS is_replica, \
746                     pg_last_wal_receive_lsn() AS receive_lsn, \
747                     pg_last_wal_replay_lsn() AS replay_lsn"
748            }
749            DatabaseType::MySQL => "SHOW REPLICA STATUS",
750            DatabaseType::MSSQL => {
751                "SELECT synchronization_state_desc, synchronization_health_desc \
752                 FROM sys.dm_hadr_database_replica_states \
753                 WHERE is_local = 1"
754            }
755            DatabaseType::SQLite => "SELECT 'primary' AS status",
756        }
757    }
758}
759
760// ============================================================================
761// MongoDB Specific
762// ============================================================================
763
764/// MongoDB-specific replication types.
765pub mod mongodb {
766    use serde::{Deserialize, Serialize};
767    use serde_json::Value as JsonValue;
768
769    use super::ReadPreference;
770
771    /// MongoDB read concern level.
772    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
773    pub enum ReadConcern {
774        /// Read from local data (fastest, may be stale).
775        Local,
776        /// Read data committed to majority of nodes.
777        Majority,
778        /// Linearizable reads (strongest, slowest).
779        Linearizable,
780        /// Read data available at query start.
781        Snapshot,
782        /// Read available data (may return orphaned docs).
783        Available,
784    }
785
786    impl ReadConcern {
787        /// Convert to MongoDB string.
788        pub fn as_str(&self) -> &'static str {
789            match self {
790                Self::Local => "local",
791                Self::Majority => "majority",
792                Self::Linearizable => "linearizable",
793                Self::Snapshot => "snapshot",
794                Self::Available => "available",
795            }
796        }
797    }
798
799    /// MongoDB write concern level.
800    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
801    pub enum WriteConcern {
802        /// Acknowledge after writing to primary.
803        W1,
804        /// Acknowledge after writing to majority.
805        Majority,
806        /// Acknowledge after writing to specific number of nodes.
807        W(u32),
808        /// Acknowledge after writing to specific tag set.
809        Tag(String),
810    }
811
812    impl WriteConcern {
813        /// Convert to MongoDB options.
814        pub fn to_options(&self) -> JsonValue {
815            match self {
816                Self::W1 => serde_json::json!({ "w": 1 }),
817                Self::Majority => serde_json::json!({ "w": "majority" }),
818                Self::W(n) => serde_json::json!({ "w": n }),
819                Self::Tag(tag) => serde_json::json!({ "w": tag }),
820            }
821        }
822    }
823
824    /// MongoDB read preference with options.
825    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
826    pub struct MongoReadPreference {
827        /// Read preference mode.
828        pub mode: ReadPreference,
829        /// Maximum staleness in seconds.
830        pub max_staleness_seconds: Option<u32>,
831        /// Tag sets for filtering replicas.
832        pub tag_sets: Vec<serde_json::Map<String, JsonValue>>,
833        /// Hedge reads for sharded clusters.
834        pub hedge: Option<bool>,
835    }
836
837    impl MongoReadPreference {
838        /// Create a new read preference.
839        pub fn new(mode: ReadPreference) -> Self {
840            Self {
841                mode,
842                max_staleness_seconds: None,
843                tag_sets: Vec::new(),
844                hedge: None,
845            }
846        }
847
848        /// Set max staleness.
849        pub fn max_staleness(mut self, seconds: u32) -> Self {
850            self.max_staleness_seconds = Some(seconds);
851            self
852        }
853
854        /// Add a tag set.
855        pub fn tag_set(mut self, tags: serde_json::Map<String, JsonValue>) -> Self {
856            self.tag_sets.push(tags);
857            self
858        }
859
860        /// Enable hedged reads.
861        pub fn hedged(mut self) -> Self {
862            self.hedge = Some(true);
863            self
864        }
865
866        /// Convert to MongoDB connection string options.
867        pub fn to_connection_options(&self) -> String {
868            let mut opts = vec![format!("readPreference={}", self.mode.to_mongodb())];
869
870            if let Some(staleness) = self.max_staleness_seconds {
871                opts.push(format!("maxStalenessSeconds={}", staleness));
872            }
873
874            opts.join("&")
875        }
876
877        /// Convert to MongoDB command options.
878        pub fn to_command_options(&self) -> JsonValue {
879            let mut opts = serde_json::Map::new();
880            opts.insert(
881                "mode".to_string(),
882                serde_json::json!(self.mode.to_mongodb()),
883            );
884
885            if let Some(staleness) = self.max_staleness_seconds {
886                opts.insert(
887                    "maxStalenessSeconds".to_string(),
888                    serde_json::json!(staleness),
889                );
890            }
891
892            if !self.tag_sets.is_empty() {
893                opts.insert("tagSets".to_string(), serde_json::json!(self.tag_sets));
894            }
895
896            if let Some(hedge) = self.hedge {
897                opts.insert("hedge".to_string(), serde_json::json!({ "enabled": hedge }));
898            }
899
900            serde_json::json!(opts)
901        }
902    }
903
904    /// MongoDB replica set status (from replSetGetStatus).
905    #[derive(Debug, Clone, Serialize, Deserialize)]
906    pub struct ReplicaSetStatus {
907        /// Replica set name.
908        pub set: String,
909        /// Members.
910        pub members: Vec<MemberStatus>,
911    }
912
913    /// Status of a replica set member.
914    #[derive(Debug, Clone, Serialize, Deserialize)]
915    pub struct MemberStatus {
916        /// Member ID.
917        pub id: u32,
918        /// Member name (host:port).
919        pub name: String,
920        /// State (PRIMARY, SECONDARY, etc.).
921        pub state_str: String,
922        /// Health (1 = healthy).
923        pub health: f64,
924        /// Replication lag in seconds.
925        #[serde(default)]
926        pub lag_seconds: Option<i64>,
927    }
928
929    impl MemberStatus {
930        /// Check if this member is primary.
931        pub fn is_primary(&self) -> bool {
932            self.state_str == "PRIMARY"
933        }
934
935        /// Check if this member is secondary.
936        pub fn is_secondary(&self) -> bool {
937            self.state_str == "SECONDARY"
938        }
939
940        /// Check if this member is healthy.
941        pub fn is_healthy(&self) -> bool {
942            self.health >= 1.0
943        }
944    }
945}
946
// Unit tests for replica configuration, the replica-set builder, connection
// routing, lag monitoring, failover, and the MongoDB-specific helpers.
#[cfg(test)]
mod tests {
    use super::*;

    // `ReplicaConfig::primary` should produce the Primary role, and
    // `with_region` should record the region.
    #[test]
    fn test_replica_config() {
        let primary =
            ReplicaConfig::primary("pg1", "postgres://primary:5432/db").with_region("us-east-1");

        assert_eq!(primary.role, ReplicaRole::Primary);
        assert_eq!(primary.region.as_deref(), Some("us-east-1"));
    }

    // Builder with one primary and two secondaries: all three replicas are kept,
    // exactly one primary is found, and two secondaries are reported.
    #[test]
    fn test_replica_set_builder() {
        let config = ReplicaSetConfig::new("myapp")
            .primary("pg1", "postgres://primary:5432/db")
            .secondary("pg2", "postgres://secondary1:5432/db")
            .secondary("pg3", "postgres://secondary2:5432/db")
            .read_preference(ReadPreference::SecondaryPreferred)
            .build();

        assert_eq!(config.name, "myapp");
        assert_eq!(config.replicas.len(), 3);
        assert!(config.primary().is_some());
        assert_eq!(config.secondaries().count(), 2);
    }

    // `to_mongodb` must produce MongoDB's camelCase read-preference mode names.
    #[test]
    fn test_read_preference_mongodb() {
        assert_eq!(ReadPreference::Primary.to_mongodb(), "primary");
        assert_eq!(
            ReadPreference::SecondaryPreferred.to_mongodb(),
            "secondaryPreferred"
        );
        assert_eq!(ReadPreference::Nearest.to_mongodb(), "nearest");
    }

    // With both nodes healthy, a write must be routed to the primary (pg1).
    #[test]
    fn test_connection_router_write() {
        let config = ReplicaSetConfig::new("test")
            .primary("pg1", "postgres://primary:5432/db")
            .secondary("pg2", "postgres://secondary:5432/db")
            .build();

        let mut router = ConnectionRouter::new(config);

        // Mark replicas as healthy
        // NOTE(review): the two trailing `Option<Duration>` arguments look like
        // (latency, replication lag); confirm against `update_health`.
        router.update_health(
            "pg1",
            HealthStatus::Healthy,
            Some(Duration::from_millis(5)),
            None,
        );
        router.update_health(
            "pg2",
            HealthStatus::Healthy,
            Some(Duration::from_millis(10)),
            Some(Duration::from_secs(1)),
        );

        // Write should go to primary
        let target = router.route(QueryType::Write, None).unwrap();
        assert_eq!(target.id, "pg1");
    }

    // With `ReadPreference::Secondary` and both nodes healthy, a read must be
    // routed to the secondary (pg2) rather than the primary.
    #[test]
    fn test_connection_router_read_secondary() {
        let config = ReplicaSetConfig::new("test")
            .primary("pg1", "postgres://primary:5432/db")
            .secondary("pg2", "postgres://secondary:5432/db")
            .read_preference(ReadPreference::Secondary)
            .build();

        let mut router = ConnectionRouter::new(config);
        router.update_health(
            "pg1",
            HealthStatus::Healthy,
            Some(Duration::from_millis(5)),
            None,
        );
        router.update_health(
            "pg2",
            HealthStatus::Healthy,
            Some(Duration::from_millis(10)),
            Some(Duration::from_secs(1)),
        );

        // Read with Secondary preference should go to secondary
        let target = router.route(QueryType::Read, None).unwrap();
        assert_eq!(target.id, "pg2");
    }

    // The monitor is built with a 10s value (presumably the maximum acceptable
    // lag). A 5s lag is accepted, a 15s lag is rejected, and only the rejected
    // replica is reported as lagging.
    #[test]
    fn test_lag_monitor() {
        let mut monitor = LagMonitor::new(Duration::from_secs(10));

        monitor.record("pg2", Duration::from_secs(5));
        assert!(monitor.is_acceptable("pg2"));

        monitor.record("pg3", Duration::from_secs(15));
        assert!(!monitor.is_acceptable("pg3"));

        let lagging = monitor.get_lagging_replicas();
        assert_eq!(lagging, vec!["pg3"]);
    }

    // The primary is marked unhealthy and both secondaries are healthy. Failover
    // must pick pg2 because its priority (80) is higher than pg3's (60); failover
    // priority is documented as "higher = preferred".
    #[test]
    fn test_failover() {
        let config = ReplicaSetConfig::new("test")
            .primary("pg1", "postgres://primary:5432/db")
            .replica(
                ReplicaConfig::secondary("pg2", "postgres://secondary1:5432/db").with_priority(80),
            )
            .replica(
                ReplicaConfig::secondary("pg3", "postgres://secondary2:5432/db").with_priority(60),
            )
            .build();

        let mut router = ConnectionRouter::new(config);
        router.update_health("pg1", HealthStatus::Unhealthy, None, None);
        router.update_health(
            "pg2",
            HealthStatus::Healthy,
            Some(Duration::from_millis(10)),
            None,
        );
        router.update_health(
            "pg3",
            HealthStatus::Healthy,
            Some(Duration::from_millis(15)),
            None,
        );

        let new_primary = router.initiate_failover().unwrap();
        assert_eq!(new_primary, "pg2"); // Higher priority
    }

    // Tests for the MongoDB-specific helpers in the `mongodb` submodule.
    mod mongodb_tests {
        use super::super::mongodb::*;
        use super::*;

        // Read concern levels map to their lowercase MongoDB names.
        #[test]
        fn test_read_concern() {
            assert_eq!(ReadConcern::Majority.as_str(), "majority");
            assert_eq!(ReadConcern::Local.as_str(), "local");
        }

        // The `w` option is the string "majority" for `Majority` and the node
        // count for `W(n)`.
        #[test]
        fn test_write_concern() {
            let w = WriteConcern::Majority;
            let opts = w.to_options();
            assert_eq!(opts["w"], "majority");

            let w2 = WriteConcern::W(3);
            let opts2 = w2.to_options();
            assert_eq!(opts2["w"], 3);
        }

        // A secondaryPreferred preference with a 90s staleness bound must show
        // both settings in the connection-string form and in the command
        // document. Hedging is enabled here but not asserted.
        #[test]
        fn test_mongo_read_preference() {
            let pref = MongoReadPreference::new(ReadPreference::SecondaryPreferred)
                .max_staleness(90)
                .hedged();

            let conn_opts = pref.to_connection_options();
            assert!(conn_opts.contains("readPreference=secondaryPreferred"));
            assert!(conn_opts.contains("maxStalenessSeconds=90"));

            let cmd_opts = pref.to_command_options();
            assert_eq!(cmd_opts["mode"], "secondaryPreferred");
            assert_eq!(cmd_opts["maxStalenessSeconds"], 90);
        }
    }
}