Skip to main content

prax_query/
replication.rs

//! Replication and high availability support.
//!
//! This module provides types for managing database replication, read replicas,
//! connection routing, and failover handling.
//!
//! # Database Support
//!
//! | Feature             | PostgreSQL | MySQL | SQLite | MSSQL     | MongoDB     |
//! |---------------------|------------|-------|--------|-----------|-------------|
//! | Read replicas       | ✅         | ✅    | ❌     | ✅ Always | ✅ Replica  |
//! | Logical replication | ✅         | ✅    | ❌     | ✅        | ✅          |
//! | Connection routing  | ✅         | ✅    | ❌     | ✅        | ✅          |
//! | Auto-failover       | ✅         | ✅    | ❌     | ✅        | ✅          |
//! | Read preference     | ❌         | ❌    | ❌     | ❌        | ✅          |
15
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
18use std::time::{Duration, Instant};
19
20use serde::{Deserialize, Serialize};
21
22use crate::error::{QueryError, QueryResult};
23
24// ============================================================================
25// Replica Configuration
26// ============================================================================
27
/// Configuration for a single member of a replica set.
///
/// Instances are normally created with [`ReplicaConfig::primary`] or
/// [`ReplicaConfig::secondary`] and then adjusted with the `with_*` methods.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReplicaConfig {
    /// Unique identifier for this replica.
    pub id: String,
    /// Connection URL.
    pub url: String,
    /// Role of this replica.
    pub role: ReplicaRole,
    /// Priority for failover (higher = preferred).
    pub priority: u32,
    /// Weight for load balancing (higher = more traffic).
    pub weight: u32,
    /// Region/datacenter for locality-aware routing; `None` when unset.
    pub region: Option<String>,
    /// Maximum acceptable replication lag; `None` when no limit is configured.
    pub max_lag: Option<Duration>,
}
46
/// Role of a replica in the cluster.
///
/// The type is `Copy`, so it can be compared and passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicaRole {
    /// Primary/master - handles writes.
    Primary,
    /// Secondary/replica - handles reads.
    Secondary,
    /// Arbiter - for elections only (MongoDB).
    Arbiter,
    /// Hidden - for backups, not queryable.
    Hidden,
}
59
60impl ReplicaConfig {
61    /// Create a primary replica config.
62    pub fn primary(id: impl Into<String>, url: impl Into<String>) -> Self {
63        Self {
64            id: id.into(),
65            url: url.into(),
66            role: ReplicaRole::Primary,
67            priority: 100,
68            weight: 100,
69            region: None,
70            max_lag: None,
71        }
72    }
73
74    /// Create a secondary replica config.
75    pub fn secondary(id: impl Into<String>, url: impl Into<String>) -> Self {
76        Self {
77            id: id.into(),
78            url: url.into(),
79            role: ReplicaRole::Secondary,
80            priority: 50,
81            weight: 100,
82            region: None,
83            max_lag: Some(Duration::from_secs(10)),
84        }
85    }
86
87    /// Set region.
88    pub fn with_region(mut self, region: impl Into<String>) -> Self {
89        self.region = Some(region.into());
90        self
91    }
92
93    /// Set weight.
94    pub fn with_weight(mut self, weight: u32) -> Self {
95        self.weight = weight;
96        self
97    }
98
99    /// Set priority.
100    pub fn with_priority(mut self, priority: u32) -> Self {
101        self.priority = priority;
102        self
103    }
104
105    /// Set max lag.
106    pub fn with_max_lag(mut self, lag: Duration) -> Self {
107        self.max_lag = Some(lag);
108        self
109    }
110}
111
112// ============================================================================
113// Replica Set Configuration
114// ============================================================================
115
/// Configuration for a replica set.
///
/// Built through [`ReplicaSetConfig::new`], which hands back a
/// [`ReplicaSetBuilder`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaSetConfig {
    /// Name of the replica set.
    pub name: String,
    /// List of replicas, in the order they were added.
    pub replicas: Vec<ReplicaConfig>,
    /// Default read preference, used when a query does not supply its own.
    pub default_read_preference: ReadPreference,
    /// Health check interval.
    pub health_check_interval: Duration,
    /// Failover timeout.
    pub failover_timeout: Duration,
}
130
131impl ReplicaSetConfig {
132    /// Create a new replica set config.
133    pub fn new(name: impl Into<String>) -> ReplicaSetBuilder {
134        ReplicaSetBuilder::new(name)
135    }
136
137    /// Get the primary replica.
138    pub fn primary(&self) -> Option<&ReplicaConfig> {
139        self.replicas
140            .iter()
141            .find(|r| r.role == ReplicaRole::Primary)
142    }
143
144    /// Get all secondary replicas.
145    pub fn secondaries(&self) -> impl Iterator<Item = &ReplicaConfig> {
146        self.replicas
147            .iter()
148            .filter(|r| r.role == ReplicaRole::Secondary)
149    }
150
151    /// Get replicas in a specific region.
152    pub fn in_region(&self, region: &str) -> impl Iterator<Item = &ReplicaConfig> {
153        self.replicas
154            .iter()
155            .filter(move |r| r.region.as_deref() == Some(region))
156    }
157}
158
/// Builder for replica set configuration.
///
/// Each setter consumes the builder and returns it, so calls can be chained;
/// [`ReplicaSetBuilder::build`] produces the final [`ReplicaSetConfig`].
#[derive(Debug, Clone)]
pub struct ReplicaSetBuilder {
    /// Name the finished replica set will carry.
    name: String,
    /// Replicas added so far, in insertion order.
    replicas: Vec<ReplicaConfig>,
    /// Read preference to store as the set's default.
    default_read_preference: ReadPreference,
    /// Health check interval to store in the config.
    health_check_interval: Duration,
    /// Failover timeout to store in the config.
    failover_timeout: Duration,
}
168
169impl ReplicaSetBuilder {
170    /// Create a new builder.
171    pub fn new(name: impl Into<String>) -> Self {
172        Self {
173            name: name.into(),
174            replicas: Vec::new(),
175            default_read_preference: ReadPreference::Primary,
176            health_check_interval: Duration::from_secs(10),
177            failover_timeout: Duration::from_secs(30),
178        }
179    }
180
181    /// Add a replica.
182    pub fn replica(mut self, config: ReplicaConfig) -> Self {
183        self.replicas.push(config);
184        self
185    }
186
187    /// Add primary.
188    pub fn primary(self, id: impl Into<String>, url: impl Into<String>) -> Self {
189        self.replica(ReplicaConfig::primary(id, url))
190    }
191
192    /// Add secondary.
193    pub fn secondary(self, id: impl Into<String>, url: impl Into<String>) -> Self {
194        self.replica(ReplicaConfig::secondary(id, url))
195    }
196
197    /// Set default read preference.
198    pub fn read_preference(mut self, pref: ReadPreference) -> Self {
199        self.default_read_preference = pref;
200        self
201    }
202
203    /// Set health check interval.
204    pub fn health_check_interval(mut self, interval: Duration) -> Self {
205        self.health_check_interval = interval;
206        self
207    }
208
209    /// Set failover timeout.
210    pub fn failover_timeout(mut self, timeout: Duration) -> Self {
211        self.failover_timeout = timeout;
212        self
213    }
214
215    /// Build the config.
216    pub fn build(self) -> ReplicaSetConfig {
217        ReplicaSetConfig {
218            name: self.name,
219            replicas: self.replicas,
220            default_read_preference: self.default_read_preference,
221            health_check_interval: self.health_check_interval,
222            failover_timeout: self.failover_timeout,
223        }
224    }
225}
226
227// ============================================================================
228// Read Preference
229// ============================================================================
230
/// Read preference for query routing.
///
/// The `Default` implementation yields [`ReadPreference::Primary`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum ReadPreference {
    /// Always read from primary.
    #[default]
    Primary,
    /// Prefer primary, fallback to secondary.
    PrimaryPreferred,
    /// Always read from secondary.
    Secondary,
    /// Prefer secondary, fallback to primary.
    SecondaryPreferred,
    /// Read from nearest replica by latency.
    Nearest,
    /// Read from specific region; the payload is the region name.
    Region(String),
    /// Custom tag set (MongoDB); each map is one set of tag name/value pairs.
    TagSet(Vec<HashMap<String, String>>),
}
250
251impl ReadPreference {
252    /// Create a region preference.
253    pub fn region(region: impl Into<String>) -> Self {
254        Self::Region(region.into())
255    }
256
257    /// Create a tag set preference.
258    pub fn tag_set(tags: Vec<HashMap<String, String>>) -> Self {
259        Self::TagSet(tags)
260    }
261
262    /// Convert to MongoDB read preference string.
263    pub fn to_mongodb(&self) -> &'static str {
264        match self {
265            Self::Primary => "primary",
266            Self::PrimaryPreferred => "primaryPreferred",
267            Self::Secondary => "secondary",
268            Self::SecondaryPreferred => "secondaryPreferred",
269            Self::Nearest => "nearest",
270            Self::Region(_) | Self::TagSet(_) => "nearest",
271        }
272    }
273
274    /// Check if this preference allows reading from primary.
275    pub fn allows_primary(&self) -> bool {
276        matches!(
277            self,
278            Self::Primary
279                | Self::PrimaryPreferred
280                | Self::Nearest
281                | Self::Region(_)
282                | Self::TagSet(_)
283        )
284    }
285
286    /// Check if this preference allows reading from secondary.
287    pub fn allows_secondary(&self) -> bool {
288        !matches!(self, Self::Primary)
289    }
290}
291
292// ============================================================================
293// Replica Health
294// ============================================================================
295
/// Health status of a replica.
///
/// [`ReplicaHealth::is_usable`] accepts `Healthy` and `Degraded`; `Unhealthy`
/// and `Unknown` replicas are not considered usable.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum HealthStatus {
    /// Replica is healthy and accepting connections.
    Healthy,
    /// Replica is degraded (high lag, slow responses).
    Degraded,
    /// Replica is unhealthy (not responding).
    Unhealthy,
    /// Health status unknown (not yet checked).
    Unknown,
}
308
/// Health information for a replica.
///
/// A fresh record (see [`ReplicaHealth::new`]) starts as
/// [`HealthStatus::Unknown`] with no measurements.
#[derive(Debug, Clone)]
pub struct ReplicaHealth {
    /// Replica ID.
    pub id: String,
    /// Current health status.
    pub status: HealthStatus,
    /// Current replication lag (if known).
    pub lag: Option<Duration>,
    /// Time of the most recent status update. Every `mark_*` method sets it,
    /// not only the successful ones; `None` until the first update.
    pub last_check: Option<Instant>,
    /// Response latency recorded by the last `mark_healthy` call, if any.
    pub latency: Option<Duration>,
    /// Consecutive failures; incremented by `mark_unhealthy` and reset to zero
    /// by `mark_healthy`.
    pub consecutive_failures: u32,
}
325
326impl ReplicaHealth {
327    /// Create a new health record.
328    pub fn new(id: impl Into<String>) -> Self {
329        Self {
330            id: id.into(),
331            status: HealthStatus::Unknown,
332            lag: None,
333            last_check: None,
334            latency: None,
335            consecutive_failures: 0,
336        }
337    }
338
339    /// Mark as healthy.
340    pub fn mark_healthy(&mut self, latency: Duration, lag: Option<Duration>) {
341        self.status = HealthStatus::Healthy;
342        self.latency = Some(latency);
343        self.lag = lag;
344        self.last_check = Some(Instant::now());
345        self.consecutive_failures = 0;
346    }
347
348    /// Mark as degraded.
349    pub fn mark_degraded(&mut self, reason: &str) {
350        self.status = HealthStatus::Degraded;
351        self.last_check = Some(Instant::now());
352        let _ = reason; // Could log this
353    }
354
355    /// Mark as unhealthy.
356    pub fn mark_unhealthy(&mut self) {
357        self.status = HealthStatus::Unhealthy;
358        self.last_check = Some(Instant::now());
359        self.consecutive_failures += 1;
360    }
361
362    /// Check if replica is usable for queries.
363    pub fn is_usable(&self) -> bool {
364        matches!(self.status, HealthStatus::Healthy | HealthStatus::Degraded)
365    }
366}
367
368// ============================================================================
369// Connection Router
370// ============================================================================
371
/// Query type for routing decisions.
///
/// [`ConnectionRouter::route`] sends `Write` and `Transaction` to the primary
/// and applies the read preference only to `Read`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryType {
    /// Read-only query.
    Read,
    /// Write query.
    Write,
    /// Transaction (requires primary).
    Transaction,
}
382
/// Connection router for read/write splitting.
///
/// The router only chooses which [`ReplicaConfig`] a query should use; it
/// holds no connections itself. Health information has to be fed in through
/// `update_health`.
#[derive(Debug)]
pub struct ConnectionRouter {
    /// Replica set configuration.
    config: ReplicaSetConfig,
    /// Health status of each replica, keyed by replica ID.
    health: HashMap<String, ReplicaHealth>,
    /// Current primary ID; `None` when the config contains no primary.
    current_primary: Option<String>,
    /// Round-robin counter for load balancing.
    round_robin: AtomicUsize,
    /// Whether router is in failover mode.
    in_failover: AtomicBool,
}
397
398impl ConnectionRouter {
399    /// Create a new router.
400    pub fn new(config: ReplicaSetConfig) -> Self {
401        let mut health = HashMap::new();
402        let mut primary_id = None;
403
404        for replica in &config.replicas {
405            health.insert(replica.id.clone(), ReplicaHealth::new(&replica.id));
406            if replica.role == ReplicaRole::Primary {
407                primary_id = Some(replica.id.clone());
408            }
409        }
410
411        Self {
412            config,
413            health,
414            current_primary: primary_id,
415            round_robin: AtomicUsize::new(0),
416            in_failover: AtomicBool::new(false),
417        }
418    }
419
420    /// Get replica for a query based on query type and read preference.
421    pub fn route(
422        &self,
423        query_type: QueryType,
424        preference: Option<&ReadPreference>,
425    ) -> QueryResult<&ReplicaConfig> {
426        let pref = preference.unwrap_or(&self.config.default_read_preference);
427
428        match query_type {
429            QueryType::Write | QueryType::Transaction => self.get_primary(),
430            QueryType::Read => self.route_read(pref),
431        }
432    }
433
434    /// Get the primary replica.
435    pub fn get_primary(&self) -> QueryResult<&ReplicaConfig> {
436        let primary_id = self
437            .current_primary
438            .as_ref()
439            .ok_or_else(|| QueryError::connection("No primary replica available"))?;
440
441        self.config
442            .replicas
443            .iter()
444            .find(|r| &r.id == primary_id)
445            .ok_or_else(|| QueryError::connection("Primary replica not found"))
446    }
447
448    /// Route a read query based on preference.
449    fn route_read(&self, preference: &ReadPreference) -> QueryResult<&ReplicaConfig> {
450        match preference {
451            ReadPreference::Primary => self.get_primary(),
452            ReadPreference::PrimaryPreferred => {
453                self.get_primary().or_else(|_| self.get_any_secondary())
454            }
455            ReadPreference::Secondary => self.get_any_secondary(),
456            ReadPreference::SecondaryPreferred => {
457                self.get_any_secondary().or_else(|_| self.get_primary())
458            }
459            ReadPreference::Nearest => self.get_nearest(),
460            ReadPreference::Region(region) => self.get_in_region(region),
461            ReadPreference::TagSet(_tags) => {
462                // Simplified: just get nearest for now
463                self.get_nearest()
464            }
465        }
466    }
467
468    /// Get any healthy secondary.
469    fn get_any_secondary(&self) -> QueryResult<&ReplicaConfig> {
470        let secondaries: Vec<_> = self
471            .config
472            .secondaries()
473            .filter(|r| self.is_replica_healthy(&r.id))
474            .collect();
475
476        if secondaries.is_empty() {
477            return Err(QueryError::connection(
478                "No healthy secondary replicas available",
479            ));
480        }
481
482        // Round-robin selection
483        let idx = self.round_robin.fetch_add(1, Ordering::Relaxed) % secondaries.len();
484        Ok(secondaries[idx])
485    }
486
487    /// Get the nearest replica by latency.
488    fn get_nearest(&self) -> QueryResult<&ReplicaConfig> {
489        let mut best: Option<(&ReplicaConfig, Duration)> = None;
490
491        for replica in &self.config.replicas {
492            if !self.is_replica_healthy(&replica.id) {
493                continue;
494            }
495
496            if let Some(health) = self.health.get(&replica.id)
497                && let Some(latency) = health.latency
498            {
499                match &best {
500                    None => best = Some((replica, latency)),
501                    Some((_, best_latency)) if latency < *best_latency => {
502                        best = Some((replica, latency));
503                    }
504                    _ => {}
505                }
506            }
507        }
508
509        best.map(|(r, _)| r)
510            .ok_or_else(|| QueryError::connection("No healthy replicas available"))
511    }
512
513    /// Get replica in specific region.
514    fn get_in_region(&self, region: &str) -> QueryResult<&ReplicaConfig> {
515        let replicas: Vec<_> = self
516            .config
517            .in_region(region)
518            .filter(|r| self.is_replica_healthy(&r.id))
519            .collect();
520
521        if replicas.is_empty() {
522            // Fallback to nearest
523            return self.get_nearest();
524        }
525
526        let idx = self.round_robin.fetch_add(1, Ordering::Relaxed) % replicas.len();
527        Ok(replicas[idx])
528    }
529
530    /// Check if a replica is healthy.
531    fn is_replica_healthy(&self, id: &str) -> bool {
532        self.health.get(id).map(|h| h.is_usable()).unwrap_or(false)
533    }
534
535    /// Update health status of a replica.
536    pub fn update_health(
537        &mut self,
538        id: &str,
539        status: HealthStatus,
540        latency: Option<Duration>,
541        lag: Option<Duration>,
542    ) {
543        if let Some(health) = self.health.get_mut(id) {
544            match status {
545                HealthStatus::Healthy => {
546                    health.mark_healthy(latency.unwrap_or(Duration::ZERO), lag);
547                }
548                HealthStatus::Degraded => {
549                    health.mark_degraded("degraded");
550                }
551                HealthStatus::Unhealthy => {
552                    health.mark_unhealthy();
553                }
554                HealthStatus::Unknown => {}
555            }
556        }
557    }
558
559    /// Check if replication lag is acceptable.
560    pub fn check_lag(&self, replica_id: &str, max_lag: Duration) -> bool {
561        self.health
562            .get(replica_id)
563            .and_then(|h| h.lag)
564            .map(|lag| lag <= max_lag)
565            .unwrap_or(false)
566    }
567
568    /// Initiate failover to a new primary.
569    pub fn initiate_failover(&mut self) -> QueryResult<String> {
570        self.in_failover.store(true, Ordering::SeqCst);
571
572        // Find best candidate (highest priority secondary that is healthy)
573        let candidate = self
574            .config
575            .replicas
576            .iter()
577            .filter(|r| r.role == ReplicaRole::Secondary)
578            .filter(|r| self.is_replica_healthy(&r.id))
579            .max_by_key(|r| r.priority);
580
581        match candidate {
582            Some(new_primary) => {
583                let new_primary_id = new_primary.id.clone();
584                self.current_primary = Some(new_primary_id.clone());
585                self.in_failover.store(false, Ordering::SeqCst);
586                Ok(new_primary_id)
587            }
588            None => {
589                self.in_failover.store(false, Ordering::SeqCst);
590                Err(QueryError::connection(
591                    "No suitable failover candidate found",
592                ))
593            }
594        }
595    }
596
597    /// Check if router is currently in failover mode.
598    pub fn is_in_failover(&self) -> bool {
599        self.in_failover.load(Ordering::SeqCst)
600    }
601}
602
603// ============================================================================
604// Replication Lag Monitor
605// ============================================================================
606
/// Monitor for tracking replication lag.
///
/// Samples are fed in with `record`; the monitor compares the latest sample
/// of each replica against a single threshold.
#[derive(Debug)]
pub struct LagMonitor {
    /// Lag measurements per replica, keyed by replica ID.
    measurements: HashMap<String, LagMeasurement>,
    /// Maximum acceptable lag.
    max_acceptable_lag: Duration,
}
615
/// Lag measurement for a replica.
///
/// Maintained by [`LagMonitor::record`].
#[derive(Debug, Clone)]
pub struct LagMeasurement {
    /// Current lag (the most recently recorded sample).
    pub current: Duration,
    /// Average lag, kept as an exponential moving average of the samples.
    pub average: Duration,
    /// Maximum observed lag.
    pub max: Duration,
    /// Timestamp of the most recent measurement.
    pub timestamp: Instant,
    /// Number of samples.
    pub samples: u64,
}
630
631impl LagMonitor {
632    /// Create a new lag monitor.
633    pub fn new(max_acceptable_lag: Duration) -> Self {
634        Self {
635            measurements: HashMap::new(),
636            max_acceptable_lag,
637        }
638    }
639
640    /// Record a lag measurement.
641    pub fn record(&mut self, replica_id: &str, lag: Duration) {
642        let entry = self
643            .measurements
644            .entry(replica_id.to_string())
645            .or_insert_with(|| LagMeasurement {
646                current: Duration::ZERO,
647                average: Duration::ZERO,
648                max: Duration::ZERO,
649                timestamp: Instant::now(),
650                samples: 0,
651            });
652
653        entry.current = lag;
654        entry.max = entry.max.max(lag);
655        entry.samples += 1;
656
657        // Exponential moving average
658        let alpha = 0.3;
659        let new_avg = Duration::from_secs_f64(
660            entry.average.as_secs_f64() * (1.0 - alpha) + lag.as_secs_f64() * alpha,
661        );
662        entry.average = new_avg;
663        entry.timestamp = Instant::now();
664    }
665
666    /// Check if a replica is within acceptable lag.
667    pub fn is_acceptable(&self, replica_id: &str) -> bool {
668        self.measurements
669            .get(replica_id)
670            .map(|m| m.current <= self.max_acceptable_lag)
671            .unwrap_or(true) // Unknown = assume OK
672    }
673
674    /// Get current lag for a replica.
675    pub fn get_lag(&self, replica_id: &str) -> Option<Duration> {
676        self.measurements.get(replica_id).map(|m| m.current)
677    }
678
679    /// Get all replicas that are lagging too much.
680    pub fn get_lagging_replicas(&self) -> Vec<&str> {
681        self.measurements
682            .iter()
683            .filter(|(_, m)| m.current > self.max_acceptable_lag)
684            .map(|(id, _)| id.as_str())
685            .collect()
686    }
687}
688
689// ============================================================================
690// SQL Helpers for Replication Lag
691// ============================================================================
692
693/// SQL queries for checking replication lag.
694pub mod lag_queries {
695    use crate::sql::DatabaseType;
696
697    /// Generate SQL to check replication lag.
698    pub fn check_lag_sql(db_type: DatabaseType) -> &'static str {
699        match db_type {
700            DatabaseType::PostgreSQL => {
701                // Returns lag in seconds
702                "SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::INT AS lag_seconds"
703            }
704            DatabaseType::MySQL => {
705                // Returns Seconds_Behind_Master
706                "SHOW SLAVE STATUS"
707            }
708            DatabaseType::MSSQL => {
709                // Check AG synchronization state
710                "SELECT datediff(s, last_commit_time, getdate()) AS lag_seconds \
711                 FROM sys.dm_hadr_database_replica_states \
712                 WHERE is_local = 1"
713            }
714            DatabaseType::SQLite => {
715                // SQLite doesn't have replication
716                "SELECT 0 AS lag_seconds"
717            }
718        }
719    }
720
721    /// Generate SQL to check if replica is primary.
722    pub fn is_primary_sql(db_type: DatabaseType) -> &'static str {
723        match db_type {
724            DatabaseType::PostgreSQL => "SELECT NOT pg_is_in_recovery() AS is_primary",
725            DatabaseType::MySQL => "SELECT @@read_only = 0 AS is_primary",
726            DatabaseType::MSSQL => {
727                "SELECT CASE WHEN role = 1 THEN 1 ELSE 0 END AS is_primary \
728                 FROM sys.dm_hadr_availability_replica_states \
729                 WHERE is_local = 1"
730            }
731            DatabaseType::SQLite => "SELECT 1 AS is_primary",
732        }
733    }
734
735    /// Generate SQL to get replica status.
736    pub fn replica_status_sql(db_type: DatabaseType) -> &'static str {
737        match db_type {
738            DatabaseType::PostgreSQL => {
739                "SELECT \
740                     pg_is_in_recovery() AS is_replica, \
741                     pg_last_wal_receive_lsn() AS receive_lsn, \
742                     pg_last_wal_replay_lsn() AS replay_lsn"
743            }
744            DatabaseType::MySQL => "SHOW REPLICA STATUS",
745            DatabaseType::MSSQL => {
746                "SELECT synchronization_state_desc, synchronization_health_desc \
747                 FROM sys.dm_hadr_database_replica_states \
748                 WHERE is_local = 1"
749            }
750            DatabaseType::SQLite => "SELECT 'primary' AS status",
751        }
752    }
753}
754
755// ============================================================================
756// MongoDB Specific
757// ============================================================================
758
759/// MongoDB-specific replication types.
760pub mod mongodb {
761    use serde::{Deserialize, Serialize};
762    use serde_json::Value as JsonValue;
763
764    use super::ReadPreference;
765
    /// MongoDB read concern level.
    ///
    /// `as_str` yields the lowercase level name for each variant.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    pub enum ReadConcern {
        /// Read from local data (fastest, may be stale).
        Local,
        /// Read data committed to majority of nodes.
        Majority,
        /// Linearizable reads (strongest, slowest).
        Linearizable,
        /// Read data available at query start.
        Snapshot,
        /// Read available data (may return orphaned docs).
        Available,
    }
780
781    impl ReadConcern {
782        /// Convert to MongoDB string.
783        pub fn as_str(&self) -> &'static str {
784            match self {
785                Self::Local => "local",
786                Self::Majority => "majority",
787                Self::Linearizable => "linearizable",
788                Self::Snapshot => "snapshot",
789                Self::Available => "available",
790            }
791        }
792    }
793
    /// MongoDB write concern level.
    ///
    /// `to_options` turns each variant into a JSON object with a single `w`
    /// key.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub enum WriteConcern {
        /// Acknowledge after writing to primary.
        W1,
        /// Acknowledge after writing to majority.
        Majority,
        /// Acknowledge after writing to specific number of nodes.
        W(u32),
        /// Acknowledge after writing to specific tag set; the payload is the
        /// tag set name.
        Tag(String),
    }
806
807    impl WriteConcern {
808        /// Convert to MongoDB options.
809        pub fn to_options(&self) -> JsonValue {
810            match self {
811                Self::W1 => serde_json::json!({ "w": 1 }),
812                Self::Majority => serde_json::json!({ "w": "majority" }),
813                Self::W(n) => serde_json::json!({ "w": n }),
814                Self::Tag(tag) => serde_json::json!({ "w": tag }),
815            }
816        }
817    }
818
    /// MongoDB read preference with options.
    ///
    /// Created with `MongoReadPreference::new` and refined with the chained
    /// setters; all options start out unset.
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    pub struct MongoReadPreference {
        /// Read preference mode.
        pub mode: ReadPreference,
        /// Maximum staleness in seconds; `None` when unset.
        pub max_staleness_seconds: Option<u32>,
        /// Tag sets for filtering replicas; each entry is one JSON object.
        pub tag_sets: Vec<serde_json::Map<String, JsonValue>>,
        /// Hedge reads for sharded clusters; `None` when unset.
        pub hedge: Option<bool>,
    }
831
832    impl MongoReadPreference {
833        /// Create a new read preference.
834        pub fn new(mode: ReadPreference) -> Self {
835            Self {
836                mode,
837                max_staleness_seconds: None,
838                tag_sets: Vec::new(),
839                hedge: None,
840            }
841        }
842
843        /// Set max staleness.
844        pub fn max_staleness(mut self, seconds: u32) -> Self {
845            self.max_staleness_seconds = Some(seconds);
846            self
847        }
848
849        /// Add a tag set.
850        pub fn tag_set(mut self, tags: serde_json::Map<String, JsonValue>) -> Self {
851            self.tag_sets.push(tags);
852            self
853        }
854
855        /// Enable hedged reads.
856        pub fn hedged(mut self) -> Self {
857            self.hedge = Some(true);
858            self
859        }
860
861        /// Convert to MongoDB connection string options.
862        pub fn to_connection_options(&self) -> String {
863            let mut opts = vec![format!("readPreference={}", self.mode.to_mongodb())];
864
865            if let Some(staleness) = self.max_staleness_seconds {
866                opts.push(format!("maxStalenessSeconds={}", staleness));
867            }
868
869            opts.join("&")
870        }
871
872        /// Convert to MongoDB command options.
873        pub fn to_command_options(&self) -> JsonValue {
874            let mut opts = serde_json::Map::new();
875            opts.insert(
876                "mode".to_string(),
877                serde_json::json!(self.mode.to_mongodb()),
878            );
879
880            if let Some(staleness) = self.max_staleness_seconds {
881                opts.insert(
882                    "maxStalenessSeconds".to_string(),
883                    serde_json::json!(staleness),
884                );
885            }
886
887            if !self.tag_sets.is_empty() {
888                opts.insert("tagSets".to_string(), serde_json::json!(self.tag_sets));
889            }
890
891            if let Some(hedge) = self.hedge {
892                opts.insert("hedge".to_string(), serde_json::json!({ "enabled": hedge }));
893            }
894
895            serde_json::json!(opts)
896        }
897    }
898
    /// MongoDB replica set status (from replSetGetStatus).
    ///
    /// Only the set name and the member list are modelled here.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct ReplicaSetStatus {
        /// Replica set name.
        pub set: String,
        /// Members.
        pub members: Vec<MemberStatus>,
    }
907
    /// Status of a replica set member.
    ///
    /// NOTE(review): fields (de)serialize under their Rust names (`id`,
    /// `state_str`, ...) because no `rename` attributes are present. Raw
    /// `replSetGetStatus` output presumably uses `_id` and `stateStr`; confirm
    /// whether the document is normalised before it is deserialized into this
    /// type.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct MemberStatus {
        /// Member ID.
        pub id: u32,
        /// Member name (host:port).
        pub name: String,
        /// State (PRIMARY, SECONDARY, etc.).
        pub state_str: String,
        /// Health (1 = healthy).
        pub health: f64,
        /// Replication lag in seconds; defaults to `None` when the field is
        /// missing from the input.
        #[serde(default)]
        pub lag_seconds: Option<i64>,
    }
923
924    impl MemberStatus {
925        /// Check if this member is primary.
926        pub fn is_primary(&self) -> bool {
927            self.state_str == "PRIMARY"
928        }
929
930        /// Check if this member is secondary.
931        pub fn is_secondary(&self) -> bool {
932            self.state_str == "SECONDARY"
933        }
934
935        /// Check if this member is healthy.
936        pub fn is_healthy(&self) -> bool {
937            self.health >= 1.0
938        }
939    }
940}
941
942#[cfg(test)]
943mod tests {
944    use super::*;
945
/// A config built with `ReplicaConfig::primary` reports the primary role and
/// keeps the region supplied through `with_region`.
#[test]
fn test_replica_config() {
    let node = ReplicaConfig::primary("pg1", "postgres://primary:5432/db")
        .with_region("us-east-1");

    let role = node.role;
    let region = node.region.as_deref();

    assert_eq!(role, ReplicaRole::Primary);
    assert_eq!(region, Some("us-east-1"));
}
954
/// The builder keeps the set name and every registered replica, and exposes
/// them through the `primary()` / `secondaries()` accessors.
#[test]
fn test_replica_set_builder() {
    let replica_set = ReplicaSetConfig::new("myapp")
        .primary("pg1", "postgres://primary:5432/db")
        .secondary("pg2", "postgres://secondary1:5432/db")
        .secondary("pg3", "postgres://secondary2:5432/db")
        .read_preference(ReadPreference::SecondaryPreferred)
        .build();

    // One primary plus two secondaries were registered above.
    assert_eq!(replica_set.name, "myapp");
    assert_eq!(replica_set.replicas.len(), 3);

    let has_primary = replica_set.primary().is_some();
    assert!(has_primary);

    let secondary_count = replica_set.secondaries().count();
    assert_eq!(secondary_count, 2);
}
969
/// Read preferences map to the camelCase mode names MongoDB uses.
#[test]
fn test_read_preference_mongodb() {
    let expected_names = [
        (ReadPreference::Primary, "primary"),
        (ReadPreference::SecondaryPreferred, "secondaryPreferred"),
        (ReadPreference::Nearest, "nearest"),
    ];

    for (preference, mongo_name) in expected_names {
        assert_eq!(preference.to_mongodb(), mongo_name);
    }
}
979
/// Write queries are routed to the primary even when a healthy secondary exists.
#[test]
fn test_connection_router_write() {
    let mut router = ConnectionRouter::new(
        ReplicaSetConfig::new("test")
            .primary("pg1", "postgres://primary:5432/db")
            .secondary("pg2", "postgres://secondary:5432/db")
            .build(),
    );

    // Report both nodes as healthy: (id, latency in ms, replication lag).
    let health_reports = [
        ("pg1", 5, None),
        ("pg2", 10, Some(Duration::from_secs(1))),
    ];
    for (replica_id, latency_ms, lag) in health_reports {
        router.update_health(
            replica_id,
            HealthStatus::Healthy,
            Some(Duration::from_millis(latency_ms)),
            lag,
        );
    }

    // The write must land on the primary.
    let chosen = router.route(QueryType::Write, None).unwrap();
    assert_eq!(chosen.id, "pg1");
}
1007
/// With a `Secondary` read preference, reads are routed to the secondary.
#[test]
fn test_connection_router_read_secondary() {
    let replica_set = ReplicaSetConfig::new("test")
        .primary("pg1", "postgres://primary:5432/db")
        .secondary("pg2", "postgres://secondary:5432/db")
        .read_preference(ReadPreference::Secondary)
        .build();
    let mut router = ConnectionRouter::new(replica_set);

    // Both nodes are healthy; only the secondary reports replication lag.
    let primary_latency = Some(Duration::from_millis(5));
    router.update_health("pg1", HealthStatus::Healthy, primary_latency, None);

    let secondary_latency = Some(Duration::from_millis(10));
    let secondary_lag = Some(Duration::from_secs(1));
    router.update_health(
        "pg2",
        HealthStatus::Healthy,
        secondary_latency,
        secondary_lag,
    );

    let chosen = router.route(QueryType::Read, None).unwrap();
    assert_eq!(chosen.id, "pg2");
}
1034
/// Replicas below the monitor's threshold are acceptable; those above it are
/// rejected and reported as lagging.
#[test]
fn test_lag_monitor() {
    let threshold = Duration::from_secs(10);
    let mut lag_monitor = LagMonitor::new(threshold);

    // 5s is under the 10s threshold.
    lag_monitor.record("pg2", Duration::from_secs(5));
    let pg2_ok = lag_monitor.is_acceptable("pg2");
    assert!(pg2_ok);

    // 15s exceeds the 10s threshold.
    lag_monitor.record("pg3", Duration::from_secs(15));
    let pg3_ok = lag_monitor.is_acceptable("pg3");
    assert!(!pg3_ok);

    assert_eq!(lag_monitor.get_lagging_replicas(), vec!["pg3"]);
}
1048
/// When the primary is unhealthy, failover promotes the healthy secondary
/// with the highest priority.
#[test]
fn test_failover() {
    let preferred =
        ReplicaConfig::secondary("pg2", "postgres://secondary1:5432/db").with_priority(80);
    let fallback =
        ReplicaConfig::secondary("pg3", "postgres://secondary2:5432/db").with_priority(60);

    let replica_set = ReplicaSetConfig::new("test")
        .primary("pg1", "postgres://primary:5432/db")
        .replica(preferred)
        .replica(fallback)
        .build();
    let mut router = ConnectionRouter::new(replica_set);

    // The primary is down; both secondaries are up.
    router.update_health("pg1", HealthStatus::Unhealthy, None, None);

    let pg2_latency = Some(Duration::from_millis(10));
    router.update_health("pg2", HealthStatus::Healthy, pg2_latency, None);

    let pg3_latency = Some(Duration::from_millis(15));
    router.update_health("pg3", HealthStatus::Healthy, pg3_latency, None);

    // pg2 has priority 80 versus pg3's 60, so it should be promoted.
    let promoted = router.initiate_failover().unwrap();
    assert_eq!(promoted, "pg2");
}
1079
1080    mod mongodb_tests {
1081        use super::super::mongodb::*;
1082        use super::*;
1083
1084        #[test]
1085        fn test_read_concern() {
1086            assert_eq!(ReadConcern::Majority.as_str(), "majority");
1087            assert_eq!(ReadConcern::Local.as_str(), "local");
1088        }
1089
1090        #[test]
1091        fn test_write_concern() {
1092            let w = WriteConcern::Majority;
1093            let opts = w.to_options();
1094            assert_eq!(opts["w"], "majority");
1095
1096            let w2 = WriteConcern::W(3);
1097            let opts2 = w2.to_options();
1098            assert_eq!(opts2["w"], 3);
1099        }
1100
1101        #[test]
1102        fn test_mongo_read_preference() {
1103            let pref = MongoReadPreference::new(ReadPreference::SecondaryPreferred)
1104                .max_staleness(90)
1105                .hedged();
1106
1107            let conn_opts = pref.to_connection_options();
1108            assert!(conn_opts.contains("readPreference=secondaryPreferred"));
1109            assert!(conn_opts.contains("maxStalenessSeconds=90"));
1110
1111            let cmd_opts = pref.to_command_options();
1112            assert_eq!(cmd_opts["mode"], "secondaryPreferred");
1113            assert_eq!(cmd_opts["maxStalenessSeconds"], 90);
1114        }
1115    }
1116}