prax_query/
replication.rs

1//! Replication and high availability support.
2//!
3//! This module provides types for managing database replication, read replicas,
4//! connection routing, and failover handling.
5//!
6//! # Database Support
7//!
8//! | Feature            | PostgreSQL | MySQL | SQLite | MSSQL     | MongoDB     |
9//! |--------------------|------------|-------|--------|-----------|-------------|
10//! | Read replicas      | ✅         | ✅    | ❌     | ✅ Always | ✅ Replica  |
11//! | Logical replication| ✅         | ✅    | ❌     | ✅        | ✅          |
12//! | Connection routing | ✅         | ✅    | ❌     | ✅        | ✅          |
13//! | Auto-failover      | ✅         | ✅    | ❌     | ✅        | ✅          |
14//! | Read preference    | ❌         | ❌    | ❌     | ❌        | ✅          |
15
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
18use std::time::{Duration, Instant};
19
20use serde::{Deserialize, Serialize};
21
22use crate::error::{QueryError, QueryResult};
23
24// ============================================================================
25// Replica Configuration
26// ============================================================================
27
28/// Configuration for a database replica.
29#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
30pub struct ReplicaConfig {
31    /// Unique identifier for this replica.
32    pub id: String,
33    /// Connection URL.
34    pub url: String,
35    /// Role of this replica.
36    pub role: ReplicaRole,
37    /// Priority for failover (higher = preferred).
38    pub priority: u32,
39    /// Weight for load balancing (higher = more traffic).
40    pub weight: u32,
41    /// Region/datacenter for locality-aware routing.
42    pub region: Option<String>,
43    /// Maximum acceptable replication lag.
44    pub max_lag: Option<Duration>,
45}
46
47/// Role of a replica in the cluster.
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49pub enum ReplicaRole {
50    /// Primary/master - handles writes.
51    Primary,
52    /// Secondary/replica - handles reads.
53    Secondary,
54    /// Arbiter - for elections only (MongoDB).
55    Arbiter,
56    /// Hidden - for backups, not queryable.
57    Hidden,
58}
59
60impl ReplicaConfig {
61    /// Create a primary replica config.
62    pub fn primary(id: impl Into<String>, url: impl Into<String>) -> Self {
63        Self {
64            id: id.into(),
65            url: url.into(),
66            role: ReplicaRole::Primary,
67            priority: 100,
68            weight: 100,
69            region: None,
70            max_lag: None,
71        }
72    }
73
74    /// Create a secondary replica config.
75    pub fn secondary(id: impl Into<String>, url: impl Into<String>) -> Self {
76        Self {
77            id: id.into(),
78            url: url.into(),
79            role: ReplicaRole::Secondary,
80            priority: 50,
81            weight: 100,
82            region: None,
83            max_lag: Some(Duration::from_secs(10)),
84        }
85    }
86
87    /// Set region.
88    pub fn with_region(mut self, region: impl Into<String>) -> Self {
89        self.region = Some(region.into());
90        self
91    }
92
93    /// Set weight.
94    pub fn with_weight(mut self, weight: u32) -> Self {
95        self.weight = weight;
96        self
97    }
98
99    /// Set priority.
100    pub fn with_priority(mut self, priority: u32) -> Self {
101        self.priority = priority;
102        self
103    }
104
105    /// Set max lag.
106    pub fn with_max_lag(mut self, lag: Duration) -> Self {
107        self.max_lag = Some(lag);
108        self
109    }
110}
111
112// ============================================================================
113// Replica Set Configuration
114// ============================================================================
115
116/// Configuration for a replica set.
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct ReplicaSetConfig {
119    /// Name of the replica set.
120    pub name: String,
121    /// List of replicas.
122    pub replicas: Vec<ReplicaConfig>,
123    /// Default read preference.
124    pub default_read_preference: ReadPreference,
125    /// Health check interval.
126    pub health_check_interval: Duration,
127    /// Failover timeout.
128    pub failover_timeout: Duration,
129}
130
131impl ReplicaSetConfig {
132    /// Create a new replica set config.
133    pub fn new(name: impl Into<String>) -> ReplicaSetBuilder {
134        ReplicaSetBuilder::new(name)
135    }
136
137    /// Get the primary replica.
138    pub fn primary(&self) -> Option<&ReplicaConfig> {
139        self.replicas.iter().find(|r| r.role == ReplicaRole::Primary)
140    }
141
142    /// Get all secondary replicas.
143    pub fn secondaries(&self) -> impl Iterator<Item = &ReplicaConfig> {
144        self.replicas.iter().filter(|r| r.role == ReplicaRole::Secondary)
145    }
146
147    /// Get replicas in a specific region.
148    pub fn in_region(&self, region: &str) -> impl Iterator<Item = &ReplicaConfig> {
149        self.replicas
150            .iter()
151            .filter(move |r| r.region.as_deref() == Some(region))
152    }
153}
154
155/// Builder for replica set configuration.
156#[derive(Debug, Clone)]
157pub struct ReplicaSetBuilder {
158    name: String,
159    replicas: Vec<ReplicaConfig>,
160    default_read_preference: ReadPreference,
161    health_check_interval: Duration,
162    failover_timeout: Duration,
163}
164
165impl ReplicaSetBuilder {
166    /// Create a new builder.
167    pub fn new(name: impl Into<String>) -> Self {
168        Self {
169            name: name.into(),
170            replicas: Vec::new(),
171            default_read_preference: ReadPreference::Primary,
172            health_check_interval: Duration::from_secs(10),
173            failover_timeout: Duration::from_secs(30),
174        }
175    }
176
177    /// Add a replica.
178    pub fn replica(mut self, config: ReplicaConfig) -> Self {
179        self.replicas.push(config);
180        self
181    }
182
183    /// Add primary.
184    pub fn primary(self, id: impl Into<String>, url: impl Into<String>) -> Self {
185        self.replica(ReplicaConfig::primary(id, url))
186    }
187
188    /// Add secondary.
189    pub fn secondary(self, id: impl Into<String>, url: impl Into<String>) -> Self {
190        self.replica(ReplicaConfig::secondary(id, url))
191    }
192
193    /// Set default read preference.
194    pub fn read_preference(mut self, pref: ReadPreference) -> Self {
195        self.default_read_preference = pref;
196        self
197    }
198
199    /// Set health check interval.
200    pub fn health_check_interval(mut self, interval: Duration) -> Self {
201        self.health_check_interval = interval;
202        self
203    }
204
205    /// Set failover timeout.
206    pub fn failover_timeout(mut self, timeout: Duration) -> Self {
207        self.failover_timeout = timeout;
208        self
209    }
210
211    /// Build the config.
212    pub fn build(self) -> ReplicaSetConfig {
213        ReplicaSetConfig {
214            name: self.name,
215            replicas: self.replicas,
216            default_read_preference: self.default_read_preference,
217            health_check_interval: self.health_check_interval,
218            failover_timeout: self.failover_timeout,
219        }
220    }
221}
222
223// ============================================================================
224// Read Preference
225// ============================================================================
226
227/// Read preference for query routing.
228#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
229pub enum ReadPreference {
230    /// Always read from primary.
231    Primary,
232    /// Prefer primary, fallback to secondary.
233    PrimaryPreferred,
234    /// Always read from secondary.
235    Secondary,
236    /// Prefer secondary, fallback to primary.
237    SecondaryPreferred,
238    /// Read from nearest replica by latency.
239    Nearest,
240    /// Read from specific region.
241    Region(String),
242    /// Custom tag set (MongoDB).
243    TagSet(Vec<HashMap<String, String>>),
244}
245
246impl ReadPreference {
247    /// Create a region preference.
248    pub fn region(region: impl Into<String>) -> Self {
249        Self::Region(region.into())
250    }
251
252    /// Create a tag set preference.
253    pub fn tag_set(tags: Vec<HashMap<String, String>>) -> Self {
254        Self::TagSet(tags)
255    }
256
257    /// Convert to MongoDB read preference string.
258    pub fn to_mongodb(&self) -> &'static str {
259        match self {
260            Self::Primary => "primary",
261            Self::PrimaryPreferred => "primaryPreferred",
262            Self::Secondary => "secondary",
263            Self::SecondaryPreferred => "secondaryPreferred",
264            Self::Nearest => "nearest",
265            Self::Region(_) | Self::TagSet(_) => "nearest",
266        }
267    }
268
269    /// Check if this preference allows reading from primary.
270    pub fn allows_primary(&self) -> bool {
271        matches!(
272            self,
273            Self::Primary | Self::PrimaryPreferred | Self::Nearest | Self::Region(_) | Self::TagSet(_)
274        )
275    }
276
277    /// Check if this preference allows reading from secondary.
278    pub fn allows_secondary(&self) -> bool {
279        !matches!(self, Self::Primary)
280    }
281}
282
283impl Default for ReadPreference {
284    fn default() -> Self {
285        Self::Primary
286    }
287}
288
289// ============================================================================
290// Replica Health
291// ============================================================================
292
293/// Health status of a replica.
294#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
295pub enum HealthStatus {
296    /// Replica is healthy and accepting connections.
297    Healthy,
298    /// Replica is degraded (high lag, slow responses).
299    Degraded,
300    /// Replica is unhealthy (not responding).
301    Unhealthy,
302    /// Health status unknown (not yet checked).
303    Unknown,
304}
305
306/// Health information for a replica.
307#[derive(Debug, Clone)]
308pub struct ReplicaHealth {
309    /// Replica ID.
310    pub id: String,
311    /// Current health status.
312    pub status: HealthStatus,
313    /// Current replication lag (if known).
314    pub lag: Option<Duration>,
315    /// Last successful health check.
316    pub last_check: Option<Instant>,
317    /// Response latency.
318    pub latency: Option<Duration>,
319    /// Consecutive failures.
320    pub consecutive_failures: u32,
321}
322
323impl ReplicaHealth {
324    /// Create a new health record.
325    pub fn new(id: impl Into<String>) -> Self {
326        Self {
327            id: id.into(),
328            status: HealthStatus::Unknown,
329            lag: None,
330            last_check: None,
331            latency: None,
332            consecutive_failures: 0,
333        }
334    }
335
336    /// Mark as healthy.
337    pub fn mark_healthy(&mut self, latency: Duration, lag: Option<Duration>) {
338        self.status = HealthStatus::Healthy;
339        self.latency = Some(latency);
340        self.lag = lag;
341        self.last_check = Some(Instant::now());
342        self.consecutive_failures = 0;
343    }
344
345    /// Mark as degraded.
346    pub fn mark_degraded(&mut self, reason: &str) {
347        self.status = HealthStatus::Degraded;
348        self.last_check = Some(Instant::now());
349        let _ = reason; // Could log this
350    }
351
352    /// Mark as unhealthy.
353    pub fn mark_unhealthy(&mut self) {
354        self.status = HealthStatus::Unhealthy;
355        self.last_check = Some(Instant::now());
356        self.consecutive_failures += 1;
357    }
358
359    /// Check if replica is usable for queries.
360    pub fn is_usable(&self) -> bool {
361        matches!(self.status, HealthStatus::Healthy | HealthStatus::Degraded)
362    }
363}
364
365// ============================================================================
366// Connection Router
367// ============================================================================
368
369/// Query type for routing decisions.
370#[derive(Debug, Clone, Copy, PartialEq, Eq)]
371pub enum QueryType {
372    /// Read-only query.
373    Read,
374    /// Write query.
375    Write,
376    /// Transaction (requires primary).
377    Transaction,
378}
379
380/// Connection router for read/write splitting.
381#[derive(Debug)]
382pub struct ConnectionRouter {
383    /// Replica set configuration.
384    config: ReplicaSetConfig,
385    /// Health status of each replica.
386    health: HashMap<String, ReplicaHealth>,
387    /// Current primary ID.
388    current_primary: Option<String>,
389    /// Round-robin counter for load balancing.
390    round_robin: AtomicUsize,
391    /// Whether router is in failover mode.
392    in_failover: AtomicBool,
393}
394
395impl ConnectionRouter {
396    /// Create a new router.
397    pub fn new(config: ReplicaSetConfig) -> Self {
398        let mut health = HashMap::new();
399        let mut primary_id = None;
400
401        for replica in &config.replicas {
402            health.insert(replica.id.clone(), ReplicaHealth::new(&replica.id));
403            if replica.role == ReplicaRole::Primary {
404                primary_id = Some(replica.id.clone());
405            }
406        }
407
408        Self {
409            config,
410            health,
411            current_primary: primary_id,
412            round_robin: AtomicUsize::new(0),
413            in_failover: AtomicBool::new(false),
414        }
415    }
416
417    /// Get replica for a query based on query type and read preference.
418    pub fn route(&self, query_type: QueryType, preference: Option<&ReadPreference>) -> QueryResult<&ReplicaConfig> {
419        let pref = preference.unwrap_or(&self.config.default_read_preference);
420
421        match query_type {
422            QueryType::Write | QueryType::Transaction => self.get_primary(),
423            QueryType::Read => self.route_read(pref),
424        }
425    }
426
427    /// Get the primary replica.
428    pub fn get_primary(&self) -> QueryResult<&ReplicaConfig> {
429        let primary_id = self.current_primary.as_ref().ok_or_else(|| {
430            QueryError::connection("No primary replica available")
431        })?;
432
433        self.config
434            .replicas
435            .iter()
436            .find(|r| &r.id == primary_id)
437            .ok_or_else(|| QueryError::connection("Primary replica not found"))
438    }
439
440    /// Route a read query based on preference.
441    fn route_read(&self, preference: &ReadPreference) -> QueryResult<&ReplicaConfig> {
442        match preference {
443            ReadPreference::Primary => self.get_primary(),
444            ReadPreference::PrimaryPreferred => {
445                self.get_primary().or_else(|_| self.get_any_secondary())
446            }
447            ReadPreference::Secondary => self.get_any_secondary(),
448            ReadPreference::SecondaryPreferred => {
449                self.get_any_secondary().or_else(|_| self.get_primary())
450            }
451            ReadPreference::Nearest => self.get_nearest(),
452            ReadPreference::Region(region) => self.get_in_region(region),
453            ReadPreference::TagSet(_tags) => {
454                // Simplified: just get nearest for now
455                self.get_nearest()
456            }
457        }
458    }
459
460    /// Get any healthy secondary.
461    fn get_any_secondary(&self) -> QueryResult<&ReplicaConfig> {
462        let secondaries: Vec<_> = self
463            .config
464            .secondaries()
465            .filter(|r| self.is_replica_healthy(&r.id))
466            .collect();
467
468        if secondaries.is_empty() {
469            return Err(QueryError::connection("No healthy secondary replicas available"));
470        }
471
472        // Round-robin selection
473        let idx = self.round_robin.fetch_add(1, Ordering::Relaxed) % secondaries.len();
474        Ok(secondaries[idx])
475    }
476
477    /// Get the nearest replica by latency.
478    fn get_nearest(&self) -> QueryResult<&ReplicaConfig> {
479        let mut best: Option<(&ReplicaConfig, Duration)> = None;
480
481        for replica in &self.config.replicas {
482            if !self.is_replica_healthy(&replica.id) {
483                continue;
484            }
485
486            if let Some(health) = self.health.get(&replica.id) {
487                if let Some(latency) = health.latency {
488                    match &best {
489                        None => best = Some((replica, latency)),
490                        Some((_, best_latency)) if latency < *best_latency => {
491                            best = Some((replica, latency));
492                        }
493                        _ => {}
494                    }
495                }
496            }
497        }
498
499        best.map(|(r, _)| r)
500            .ok_or_else(|| QueryError::connection("No healthy replicas available"))
501    }
502
503    /// Get replica in specific region.
504    fn get_in_region(&self, region: &str) -> QueryResult<&ReplicaConfig> {
505        let replicas: Vec<_> = self
506            .config
507            .in_region(region)
508            .filter(|r| self.is_replica_healthy(&r.id))
509            .collect();
510
511        if replicas.is_empty() {
512            // Fallback to nearest
513            return self.get_nearest();
514        }
515
516        let idx = self.round_robin.fetch_add(1, Ordering::Relaxed) % replicas.len();
517        Ok(replicas[idx])
518    }
519
520    /// Check if a replica is healthy.
521    fn is_replica_healthy(&self, id: &str) -> bool {
522        self.health
523            .get(id)
524            .map(|h| h.is_usable())
525            .unwrap_or(false)
526    }
527
528    /// Update health status of a replica.
529    pub fn update_health(&mut self, id: &str, status: HealthStatus, latency: Option<Duration>, lag: Option<Duration>) {
530        if let Some(health) = self.health.get_mut(id) {
531            match status {
532                HealthStatus::Healthy => {
533                    health.mark_healthy(latency.unwrap_or(Duration::ZERO), lag);
534                }
535                HealthStatus::Degraded => {
536                    health.mark_degraded("degraded");
537                }
538                HealthStatus::Unhealthy => {
539                    health.mark_unhealthy();
540                }
541                HealthStatus::Unknown => {}
542            }
543        }
544    }
545
546    /// Check if replication lag is acceptable.
547    pub fn check_lag(&self, replica_id: &str, max_lag: Duration) -> bool {
548        self.health
549            .get(replica_id)
550            .and_then(|h| h.lag)
551            .map(|lag| lag <= max_lag)
552            .unwrap_or(false)
553    }
554
555    /// Initiate failover to a new primary.
556    pub fn initiate_failover(&mut self) -> QueryResult<String> {
557        self.in_failover.store(true, Ordering::SeqCst);
558
559        // Find best candidate (highest priority secondary that is healthy)
560        let candidate = self
561            .config
562            .replicas
563            .iter()
564            .filter(|r| r.role == ReplicaRole::Secondary)
565            .filter(|r| self.is_replica_healthy(&r.id))
566            .max_by_key(|r| r.priority);
567
568        match candidate {
569            Some(new_primary) => {
570                let new_primary_id = new_primary.id.clone();
571                self.current_primary = Some(new_primary_id.clone());
572                self.in_failover.store(false, Ordering::SeqCst);
573                Ok(new_primary_id)
574            }
575            None => {
576                self.in_failover.store(false, Ordering::SeqCst);
577                Err(QueryError::connection("No suitable failover candidate found"))
578            }
579        }
580    }
581
582    /// Check if router is currently in failover mode.
583    pub fn is_in_failover(&self) -> bool {
584        self.in_failover.load(Ordering::SeqCst)
585    }
586}
587
588// ============================================================================
589// Replication Lag Monitor
590// ============================================================================
591
592/// Monitor for tracking replication lag.
593#[derive(Debug)]
594pub struct LagMonitor {
595    /// Lag measurements per replica.
596    measurements: HashMap<String, LagMeasurement>,
597    /// Maximum acceptable lag.
598    max_acceptable_lag: Duration,
599}
600
601/// Lag measurement for a replica.
602#[derive(Debug, Clone)]
603pub struct LagMeasurement {
604    /// Current lag.
605    pub current: Duration,
606    /// Average lag.
607    pub average: Duration,
608    /// Maximum observed lag.
609    pub max: Duration,
610    /// Timestamp of measurement.
611    pub timestamp: Instant,
612    /// Number of samples.
613    pub samples: u64,
614}
615
616impl LagMonitor {
617    /// Create a new lag monitor.
618    pub fn new(max_acceptable_lag: Duration) -> Self {
619        Self {
620            measurements: HashMap::new(),
621            max_acceptable_lag,
622        }
623    }
624
625    /// Record a lag measurement.
626    pub fn record(&mut self, replica_id: &str, lag: Duration) {
627        let entry = self.measurements.entry(replica_id.to_string()).or_insert_with(|| {
628            LagMeasurement {
629                current: Duration::ZERO,
630                average: Duration::ZERO,
631                max: Duration::ZERO,
632                timestamp: Instant::now(),
633                samples: 0,
634            }
635        });
636
637        entry.current = lag;
638        entry.max = entry.max.max(lag);
639        entry.samples += 1;
640
641        // Exponential moving average
642        let alpha = 0.3;
643        let new_avg = Duration::from_secs_f64(
644            entry.average.as_secs_f64() * (1.0 - alpha) + lag.as_secs_f64() * alpha,
645        );
646        entry.average = new_avg;
647        entry.timestamp = Instant::now();
648    }
649
650    /// Check if a replica is within acceptable lag.
651    pub fn is_acceptable(&self, replica_id: &str) -> bool {
652        self.measurements
653            .get(replica_id)
654            .map(|m| m.current <= self.max_acceptable_lag)
655            .unwrap_or(true) // Unknown = assume OK
656    }
657
658    /// Get current lag for a replica.
659    pub fn get_lag(&self, replica_id: &str) -> Option<Duration> {
660        self.measurements.get(replica_id).map(|m| m.current)
661    }
662
663    /// Get all replicas that are lagging too much.
664    pub fn get_lagging_replicas(&self) -> Vec<&str> {
665        self.measurements
666            .iter()
667            .filter(|(_, m)| m.current > self.max_acceptable_lag)
668            .map(|(id, _)| id.as_str())
669            .collect()
670    }
671}
672
673// ============================================================================
674// SQL Helpers for Replication Lag
675// ============================================================================
676
677/// SQL queries for checking replication lag.
678pub mod lag_queries {
679    use crate::sql::DatabaseType;
680
681    /// Generate SQL to check replication lag.
682    pub fn check_lag_sql(db_type: DatabaseType) -> &'static str {
683        match db_type {
684            DatabaseType::PostgreSQL => {
685                // Returns lag in seconds
686                "SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::INT AS lag_seconds"
687            }
688            DatabaseType::MySQL => {
689                // Returns Seconds_Behind_Master
690                "SHOW SLAVE STATUS"
691            }
692            DatabaseType::MSSQL => {
693                // Check AG synchronization state
694                "SELECT datediff(s, last_commit_time, getdate()) AS lag_seconds \
695                 FROM sys.dm_hadr_database_replica_states \
696                 WHERE is_local = 1"
697            }
698            DatabaseType::SQLite => {
699                // SQLite doesn't have replication
700                "SELECT 0 AS lag_seconds"
701            }
702        }
703    }
704
705    /// Generate SQL to check if replica is primary.
706    pub fn is_primary_sql(db_type: DatabaseType) -> &'static str {
707        match db_type {
708            DatabaseType::PostgreSQL => "SELECT NOT pg_is_in_recovery() AS is_primary",
709            DatabaseType::MySQL => "SELECT @@read_only = 0 AS is_primary",
710            DatabaseType::MSSQL => {
711                "SELECT CASE WHEN role = 1 THEN 1 ELSE 0 END AS is_primary \
712                 FROM sys.dm_hadr_availability_replica_states \
713                 WHERE is_local = 1"
714            }
715            DatabaseType::SQLite => "SELECT 1 AS is_primary",
716        }
717    }
718
719    /// Generate SQL to get replica status.
720    pub fn replica_status_sql(db_type: DatabaseType) -> &'static str {
721        match db_type {
722            DatabaseType::PostgreSQL => {
723                "SELECT \
724                     pg_is_in_recovery() AS is_replica, \
725                     pg_last_wal_receive_lsn() AS receive_lsn, \
726                     pg_last_wal_replay_lsn() AS replay_lsn"
727            }
728            DatabaseType::MySQL => "SHOW REPLICA STATUS",
729            DatabaseType::MSSQL => {
730                "SELECT synchronization_state_desc, synchronization_health_desc \
731                 FROM sys.dm_hadr_database_replica_states \
732                 WHERE is_local = 1"
733            }
734            DatabaseType::SQLite => "SELECT 'primary' AS status",
735        }
736    }
737}
738
739// ============================================================================
740// MongoDB Specific
741// ============================================================================
742
743/// MongoDB-specific replication types.
744pub mod mongodb {
745    use serde::{Deserialize, Serialize};
746    use serde_json::Value as JsonValue;
747
748    use super::ReadPreference;
749
750    /// MongoDB read concern level.
751    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
752    pub enum ReadConcern {
753        /// Read from local data (fastest, may be stale).
754        Local,
755        /// Read data committed to majority of nodes.
756        Majority,
757        /// Linearizable reads (strongest, slowest).
758        Linearizable,
759        /// Read data available at query start.
760        Snapshot,
761        /// Read available data (may return orphaned docs).
762        Available,
763    }
764
765    impl ReadConcern {
766        /// Convert to MongoDB string.
767        pub fn as_str(&self) -> &'static str {
768            match self {
769                Self::Local => "local",
770                Self::Majority => "majority",
771                Self::Linearizable => "linearizable",
772                Self::Snapshot => "snapshot",
773                Self::Available => "available",
774            }
775        }
776    }
777
778    /// MongoDB write concern level.
779    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
780    pub enum WriteConcern {
781        /// Acknowledge after writing to primary.
782        W1,
783        /// Acknowledge after writing to majority.
784        Majority,
785        /// Acknowledge after writing to specific number of nodes.
786        W(u32),
787        /// Acknowledge after writing to specific tag set.
788        Tag(String),
789    }
790
791    impl WriteConcern {
792        /// Convert to MongoDB options.
793        pub fn to_options(&self) -> JsonValue {
794            match self {
795                Self::W1 => serde_json::json!({ "w": 1 }),
796                Self::Majority => serde_json::json!({ "w": "majority" }),
797                Self::W(n) => serde_json::json!({ "w": n }),
798                Self::Tag(tag) => serde_json::json!({ "w": tag }),
799            }
800        }
801    }
802
803    /// MongoDB read preference with options.
804    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
805    pub struct MongoReadPreference {
806        /// Read preference mode.
807        pub mode: ReadPreference,
808        /// Maximum staleness in seconds.
809        pub max_staleness_seconds: Option<u32>,
810        /// Tag sets for filtering replicas.
811        pub tag_sets: Vec<serde_json::Map<String, JsonValue>>,
812        /// Hedge reads for sharded clusters.
813        pub hedge: Option<bool>,
814    }
815
816    impl MongoReadPreference {
817        /// Create a new read preference.
818        pub fn new(mode: ReadPreference) -> Self {
819            Self {
820                mode,
821                max_staleness_seconds: None,
822                tag_sets: Vec::new(),
823                hedge: None,
824            }
825        }
826
827        /// Set max staleness.
828        pub fn max_staleness(mut self, seconds: u32) -> Self {
829            self.max_staleness_seconds = Some(seconds);
830            self
831        }
832
833        /// Add a tag set.
834        pub fn tag_set(mut self, tags: serde_json::Map<String, JsonValue>) -> Self {
835            self.tag_sets.push(tags);
836            self
837        }
838
839        /// Enable hedged reads.
840        pub fn hedged(mut self) -> Self {
841            self.hedge = Some(true);
842            self
843        }
844
845        /// Convert to MongoDB connection string options.
846        pub fn to_connection_options(&self) -> String {
847            let mut opts = vec![format!("readPreference={}", self.mode.to_mongodb())];
848
849            if let Some(staleness) = self.max_staleness_seconds {
850                opts.push(format!("maxStalenessSeconds={}", staleness));
851            }
852
853            opts.join("&")
854        }
855
856        /// Convert to MongoDB command options.
857        pub fn to_command_options(&self) -> JsonValue {
858            let mut opts = serde_json::Map::new();
859            opts.insert("mode".to_string(), serde_json::json!(self.mode.to_mongodb()));
860
861            if let Some(staleness) = self.max_staleness_seconds {
862                opts.insert("maxStalenessSeconds".to_string(), serde_json::json!(staleness));
863            }
864
865            if !self.tag_sets.is_empty() {
866                opts.insert("tagSets".to_string(), serde_json::json!(self.tag_sets));
867            }
868
869            if let Some(hedge) = self.hedge {
870                opts.insert("hedge".to_string(), serde_json::json!({ "enabled": hedge }));
871            }
872
873            serde_json::json!(opts)
874        }
875    }
876
877    /// MongoDB replica set status (from replSetGetStatus).
878    #[derive(Debug, Clone, Serialize, Deserialize)]
879    pub struct ReplicaSetStatus {
880        /// Replica set name.
881        pub set: String,
882        /// Members.
883        pub members: Vec<MemberStatus>,
884    }
885
886    /// Status of a replica set member.
887    #[derive(Debug, Clone, Serialize, Deserialize)]
888    pub struct MemberStatus {
889        /// Member ID.
890        pub id: u32,
891        /// Member name (host:port).
892        pub name: String,
893        /// State (PRIMARY, SECONDARY, etc.).
894        pub state_str: String,
895        /// Health (1 = healthy).
896        pub health: f64,
897        /// Replication lag in seconds.
898        #[serde(default)]
899        pub lag_seconds: Option<i64>,
900    }
901
902    impl MemberStatus {
903        /// Check if this member is primary.
904        pub fn is_primary(&self) -> bool {
905            self.state_str == "PRIMARY"
906        }
907
908        /// Check if this member is secondary.
909        pub fn is_secondary(&self) -> bool {
910            self.state_str == "SECONDARY"
911        }
912
913        /// Check if this member is healthy.
914        pub fn is_healthy(&self) -> bool {
915            self.health >= 1.0
916        }
917    }
918}
919
920#[cfg(test)]
921mod tests {
922    use super::*;
923
924    #[test]
925    fn test_replica_config() {
926        let primary = ReplicaConfig::primary("pg1", "postgres://primary:5432/db")
927            .with_region("us-east-1");
928
929        assert_eq!(primary.role, ReplicaRole::Primary);
930        assert_eq!(primary.region.as_deref(), Some("us-east-1"));
931    }
932
933    #[test]
934    fn test_replica_set_builder() {
935        let config = ReplicaSetConfig::new("myapp")
936            .primary("pg1", "postgres://primary:5432/db")
937            .secondary("pg2", "postgres://secondary1:5432/db")
938            .secondary("pg3", "postgres://secondary2:5432/db")
939            .read_preference(ReadPreference::SecondaryPreferred)
940            .build();
941
942        assert_eq!(config.name, "myapp");
943        assert_eq!(config.replicas.len(), 3);
944        assert!(config.primary().is_some());
945        assert_eq!(config.secondaries().count(), 2);
946    }
947
948    #[test]
949    fn test_read_preference_mongodb() {
950        assert_eq!(ReadPreference::Primary.to_mongodb(), "primary");
951        assert_eq!(ReadPreference::SecondaryPreferred.to_mongodb(), "secondaryPreferred");
952        assert_eq!(ReadPreference::Nearest.to_mongodb(), "nearest");
953    }
954
955    #[test]
956    fn test_connection_router_write() {
957        let config = ReplicaSetConfig::new("test")
958            .primary("pg1", "postgres://primary:5432/db")
959            .secondary("pg2", "postgres://secondary:5432/db")
960            .build();
961
962        let mut router = ConnectionRouter::new(config);
963
964        // Mark replicas as healthy
965        router.update_health("pg1", HealthStatus::Healthy, Some(Duration::from_millis(5)), None);
966        router.update_health("pg2", HealthStatus::Healthy, Some(Duration::from_millis(10)), Some(Duration::from_secs(1)));
967
968        // Write should go to primary
969        let target = router.route(QueryType::Write, None).unwrap();
970        assert_eq!(target.id, "pg1");
971    }
972
973    #[test]
974    fn test_connection_router_read_secondary() {
975        let config = ReplicaSetConfig::new("test")
976            .primary("pg1", "postgres://primary:5432/db")
977            .secondary("pg2", "postgres://secondary:5432/db")
978            .read_preference(ReadPreference::Secondary)
979            .build();
980
981        let mut router = ConnectionRouter::new(config);
982        router.update_health("pg1", HealthStatus::Healthy, Some(Duration::from_millis(5)), None);
983        router.update_health("pg2", HealthStatus::Healthy, Some(Duration::from_millis(10)), Some(Duration::from_secs(1)));
984
985        // Read with Secondary preference should go to secondary
986        let target = router.route(QueryType::Read, None).unwrap();
987        assert_eq!(target.id, "pg2");
988    }
989
990    #[test]
991    fn test_lag_monitor() {
992        let mut monitor = LagMonitor::new(Duration::from_secs(10));
993
994        monitor.record("pg2", Duration::from_secs(5));
995        assert!(monitor.is_acceptable("pg2"));
996
997        monitor.record("pg3", Duration::from_secs(15));
998        assert!(!monitor.is_acceptable("pg3"));
999
1000        let lagging = monitor.get_lagging_replicas();
1001        assert_eq!(lagging, vec!["pg3"]);
1002    }
1003
1004    #[test]
1005    fn test_failover() {
1006        let config = ReplicaSetConfig::new("test")
1007            .primary("pg1", "postgres://primary:5432/db")
1008            .replica(ReplicaConfig::secondary("pg2", "postgres://secondary1:5432/db").with_priority(80))
1009            .replica(ReplicaConfig::secondary("pg3", "postgres://secondary2:5432/db").with_priority(60))
1010            .build();
1011
1012        let mut router = ConnectionRouter::new(config);
1013        router.update_health("pg1", HealthStatus::Unhealthy, None, None);
1014        router.update_health("pg2", HealthStatus::Healthy, Some(Duration::from_millis(10)), None);
1015        router.update_health("pg3", HealthStatus::Healthy, Some(Duration::from_millis(15)), None);
1016
1017        let new_primary = router.initiate_failover().unwrap();
1018        assert_eq!(new_primary, "pg2"); // Higher priority
1019    }
1020
1021    mod mongodb_tests {
1022        use super::super::mongodb::*;
1023        use super::*;
1024
1025        #[test]
1026        fn test_read_concern() {
1027            assert_eq!(ReadConcern::Majority.as_str(), "majority");
1028            assert_eq!(ReadConcern::Local.as_str(), "local");
1029        }
1030
1031        #[test]
1032        fn test_write_concern() {
1033            let w = WriteConcern::Majority;
1034            let opts = w.to_options();
1035            assert_eq!(opts["w"], "majority");
1036
1037            let w2 = WriteConcern::W(3);
1038            let opts2 = w2.to_options();
1039            assert_eq!(opts2["w"], 3);
1040        }
1041
1042        #[test]
1043        fn test_mongo_read_preference() {
1044            let pref = MongoReadPreference::new(ReadPreference::SecondaryPreferred)
1045                .max_staleness(90)
1046                .hedged();
1047
1048            let conn_opts = pref.to_connection_options();
1049            assert!(conn_opts.contains("readPreference=secondaryPreferred"));
1050            assert!(conn_opts.contains("maxStalenessSeconds=90"));
1051
1052            let cmd_opts = pref.to_command_options();
1053            assert_eq!(cmd_opts["mode"], "secondaryPreferred");
1054            assert_eq!(cmd_opts["maxStalenessSeconds"], 90);
1055        }
1056    }
1057}
1058