1use std::collections::HashMap;
17use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
18use std::time::{Duration, Instant};
19
20use serde::{Deserialize, Serialize};
21
22use crate::error::{QueryError, QueryResult};
23
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
30pub struct ReplicaConfig {
31 pub id: String,
33 pub url: String,
35 pub role: ReplicaRole,
37 pub priority: u32,
39 pub weight: u32,
41 pub region: Option<String>,
43 pub max_lag: Option<Duration>,
45}
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49pub enum ReplicaRole {
50 Primary,
52 Secondary,
54 Arbiter,
56 Hidden,
58}
59
60impl ReplicaConfig {
61 pub fn primary(id: impl Into<String>, url: impl Into<String>) -> Self {
63 Self {
64 id: id.into(),
65 url: url.into(),
66 role: ReplicaRole::Primary,
67 priority: 100,
68 weight: 100,
69 region: None,
70 max_lag: None,
71 }
72 }
73
74 pub fn secondary(id: impl Into<String>, url: impl Into<String>) -> Self {
76 Self {
77 id: id.into(),
78 url: url.into(),
79 role: ReplicaRole::Secondary,
80 priority: 50,
81 weight: 100,
82 region: None,
83 max_lag: Some(Duration::from_secs(10)),
84 }
85 }
86
87 pub fn with_region(mut self, region: impl Into<String>) -> Self {
89 self.region = Some(region.into());
90 self
91 }
92
93 pub fn with_weight(mut self, weight: u32) -> Self {
95 self.weight = weight;
96 self
97 }
98
99 pub fn with_priority(mut self, priority: u32) -> Self {
101 self.priority = priority;
102 self
103 }
104
105 pub fn with_max_lag(mut self, lag: Duration) -> Self {
107 self.max_lag = Some(lag);
108 self
109 }
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct ReplicaSetConfig {
119 pub name: String,
121 pub replicas: Vec<ReplicaConfig>,
123 pub default_read_preference: ReadPreference,
125 pub health_check_interval: Duration,
127 pub failover_timeout: Duration,
129}
130
131impl ReplicaSetConfig {
132 pub fn new(name: impl Into<String>) -> ReplicaSetBuilder {
134 ReplicaSetBuilder::new(name)
135 }
136
137 pub fn primary(&self) -> Option<&ReplicaConfig> {
139 self.replicas.iter().find(|r| r.role == ReplicaRole::Primary)
140 }
141
142 pub fn secondaries(&self) -> impl Iterator<Item = &ReplicaConfig> {
144 self.replicas.iter().filter(|r| r.role == ReplicaRole::Secondary)
145 }
146
147 pub fn in_region(&self, region: &str) -> impl Iterator<Item = &ReplicaConfig> {
149 self.replicas
150 .iter()
151 .filter(move |r| r.region.as_deref() == Some(region))
152 }
153}
154
155#[derive(Debug, Clone)]
157pub struct ReplicaSetBuilder {
158 name: String,
159 replicas: Vec<ReplicaConfig>,
160 default_read_preference: ReadPreference,
161 health_check_interval: Duration,
162 failover_timeout: Duration,
163}
164
165impl ReplicaSetBuilder {
166 pub fn new(name: impl Into<String>) -> Self {
168 Self {
169 name: name.into(),
170 replicas: Vec::new(),
171 default_read_preference: ReadPreference::Primary,
172 health_check_interval: Duration::from_secs(10),
173 failover_timeout: Duration::from_secs(30),
174 }
175 }
176
177 pub fn replica(mut self, config: ReplicaConfig) -> Self {
179 self.replicas.push(config);
180 self
181 }
182
183 pub fn primary(self, id: impl Into<String>, url: impl Into<String>) -> Self {
185 self.replica(ReplicaConfig::primary(id, url))
186 }
187
188 pub fn secondary(self, id: impl Into<String>, url: impl Into<String>) -> Self {
190 self.replica(ReplicaConfig::secondary(id, url))
191 }
192
193 pub fn read_preference(mut self, pref: ReadPreference) -> Self {
195 self.default_read_preference = pref;
196 self
197 }
198
199 pub fn health_check_interval(mut self, interval: Duration) -> Self {
201 self.health_check_interval = interval;
202 self
203 }
204
205 pub fn failover_timeout(mut self, timeout: Duration) -> Self {
207 self.failover_timeout = timeout;
208 self
209 }
210
211 pub fn build(self) -> ReplicaSetConfig {
213 ReplicaSetConfig {
214 name: self.name,
215 replicas: self.replicas,
216 default_read_preference: self.default_read_preference,
217 health_check_interval: self.health_check_interval,
218 failover_timeout: self.failover_timeout,
219 }
220 }
221}
222
223#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
229pub enum ReadPreference {
230 Primary,
232 PrimaryPreferred,
234 Secondary,
236 SecondaryPreferred,
238 Nearest,
240 Region(String),
242 TagSet(Vec<HashMap<String, String>>),
244}
245
246impl ReadPreference {
247 pub fn region(region: impl Into<String>) -> Self {
249 Self::Region(region.into())
250 }
251
252 pub fn tag_set(tags: Vec<HashMap<String, String>>) -> Self {
254 Self::TagSet(tags)
255 }
256
257 pub fn to_mongodb(&self) -> &'static str {
259 match self {
260 Self::Primary => "primary",
261 Self::PrimaryPreferred => "primaryPreferred",
262 Self::Secondary => "secondary",
263 Self::SecondaryPreferred => "secondaryPreferred",
264 Self::Nearest => "nearest",
265 Self::Region(_) | Self::TagSet(_) => "nearest",
266 }
267 }
268
269 pub fn allows_primary(&self) -> bool {
271 matches!(
272 self,
273 Self::Primary | Self::PrimaryPreferred | Self::Nearest | Self::Region(_) | Self::TagSet(_)
274 )
275 }
276
277 pub fn allows_secondary(&self) -> bool {
279 !matches!(self, Self::Primary)
280 }
281}
282
283impl Default for ReadPreference {
284 fn default() -> Self {
285 Self::Primary
286 }
287}
288
289#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
295pub enum HealthStatus {
296 Healthy,
298 Degraded,
300 Unhealthy,
302 Unknown,
304}
305
306#[derive(Debug, Clone)]
308pub struct ReplicaHealth {
309 pub id: String,
311 pub status: HealthStatus,
313 pub lag: Option<Duration>,
315 pub last_check: Option<Instant>,
317 pub latency: Option<Duration>,
319 pub consecutive_failures: u32,
321}
322
323impl ReplicaHealth {
324 pub fn new(id: impl Into<String>) -> Self {
326 Self {
327 id: id.into(),
328 status: HealthStatus::Unknown,
329 lag: None,
330 last_check: None,
331 latency: None,
332 consecutive_failures: 0,
333 }
334 }
335
336 pub fn mark_healthy(&mut self, latency: Duration, lag: Option<Duration>) {
338 self.status = HealthStatus::Healthy;
339 self.latency = Some(latency);
340 self.lag = lag;
341 self.last_check = Some(Instant::now());
342 self.consecutive_failures = 0;
343 }
344
345 pub fn mark_degraded(&mut self, reason: &str) {
347 self.status = HealthStatus::Degraded;
348 self.last_check = Some(Instant::now());
349 let _ = reason; }
351
352 pub fn mark_unhealthy(&mut self) {
354 self.status = HealthStatus::Unhealthy;
355 self.last_check = Some(Instant::now());
356 self.consecutive_failures += 1;
357 }
358
359 pub fn is_usable(&self) -> bool {
361 matches!(self.status, HealthStatus::Healthy | HealthStatus::Degraded)
362 }
363}
364
365#[derive(Debug, Clone, Copy, PartialEq, Eq)]
371pub enum QueryType {
372 Read,
374 Write,
376 Transaction,
378}
379
380#[derive(Debug)]
382pub struct ConnectionRouter {
383 config: ReplicaSetConfig,
385 health: HashMap<String, ReplicaHealth>,
387 current_primary: Option<String>,
389 round_robin: AtomicUsize,
391 in_failover: AtomicBool,
393}
394
395impl ConnectionRouter {
396 pub fn new(config: ReplicaSetConfig) -> Self {
398 let mut health = HashMap::new();
399 let mut primary_id = None;
400
401 for replica in &config.replicas {
402 health.insert(replica.id.clone(), ReplicaHealth::new(&replica.id));
403 if replica.role == ReplicaRole::Primary {
404 primary_id = Some(replica.id.clone());
405 }
406 }
407
408 Self {
409 config,
410 health,
411 current_primary: primary_id,
412 round_robin: AtomicUsize::new(0),
413 in_failover: AtomicBool::new(false),
414 }
415 }
416
417 pub fn route(&self, query_type: QueryType, preference: Option<&ReadPreference>) -> QueryResult<&ReplicaConfig> {
419 let pref = preference.unwrap_or(&self.config.default_read_preference);
420
421 match query_type {
422 QueryType::Write | QueryType::Transaction => self.get_primary(),
423 QueryType::Read => self.route_read(pref),
424 }
425 }
426
427 pub fn get_primary(&self) -> QueryResult<&ReplicaConfig> {
429 let primary_id = self.current_primary.as_ref().ok_or_else(|| {
430 QueryError::connection("No primary replica available")
431 })?;
432
433 self.config
434 .replicas
435 .iter()
436 .find(|r| &r.id == primary_id)
437 .ok_or_else(|| QueryError::connection("Primary replica not found"))
438 }
439
440 fn route_read(&self, preference: &ReadPreference) -> QueryResult<&ReplicaConfig> {
442 match preference {
443 ReadPreference::Primary => self.get_primary(),
444 ReadPreference::PrimaryPreferred => {
445 self.get_primary().or_else(|_| self.get_any_secondary())
446 }
447 ReadPreference::Secondary => self.get_any_secondary(),
448 ReadPreference::SecondaryPreferred => {
449 self.get_any_secondary().or_else(|_| self.get_primary())
450 }
451 ReadPreference::Nearest => self.get_nearest(),
452 ReadPreference::Region(region) => self.get_in_region(region),
453 ReadPreference::TagSet(_tags) => {
454 self.get_nearest()
456 }
457 }
458 }
459
460 fn get_any_secondary(&self) -> QueryResult<&ReplicaConfig> {
462 let secondaries: Vec<_> = self
463 .config
464 .secondaries()
465 .filter(|r| self.is_replica_healthy(&r.id))
466 .collect();
467
468 if secondaries.is_empty() {
469 return Err(QueryError::connection("No healthy secondary replicas available"));
470 }
471
472 let idx = self.round_robin.fetch_add(1, Ordering::Relaxed) % secondaries.len();
474 Ok(secondaries[idx])
475 }
476
477 fn get_nearest(&self) -> QueryResult<&ReplicaConfig> {
479 let mut best: Option<(&ReplicaConfig, Duration)> = None;
480
481 for replica in &self.config.replicas {
482 if !self.is_replica_healthy(&replica.id) {
483 continue;
484 }
485
486 if let Some(health) = self.health.get(&replica.id) {
487 if let Some(latency) = health.latency {
488 match &best {
489 None => best = Some((replica, latency)),
490 Some((_, best_latency)) if latency < *best_latency => {
491 best = Some((replica, latency));
492 }
493 _ => {}
494 }
495 }
496 }
497 }
498
499 best.map(|(r, _)| r)
500 .ok_or_else(|| QueryError::connection("No healthy replicas available"))
501 }
502
503 fn get_in_region(&self, region: &str) -> QueryResult<&ReplicaConfig> {
505 let replicas: Vec<_> = self
506 .config
507 .in_region(region)
508 .filter(|r| self.is_replica_healthy(&r.id))
509 .collect();
510
511 if replicas.is_empty() {
512 return self.get_nearest();
514 }
515
516 let idx = self.round_robin.fetch_add(1, Ordering::Relaxed) % replicas.len();
517 Ok(replicas[idx])
518 }
519
520 fn is_replica_healthy(&self, id: &str) -> bool {
522 self.health
523 .get(id)
524 .map(|h| h.is_usable())
525 .unwrap_or(false)
526 }
527
528 pub fn update_health(&mut self, id: &str, status: HealthStatus, latency: Option<Duration>, lag: Option<Duration>) {
530 if let Some(health) = self.health.get_mut(id) {
531 match status {
532 HealthStatus::Healthy => {
533 health.mark_healthy(latency.unwrap_or(Duration::ZERO), lag);
534 }
535 HealthStatus::Degraded => {
536 health.mark_degraded("degraded");
537 }
538 HealthStatus::Unhealthy => {
539 health.mark_unhealthy();
540 }
541 HealthStatus::Unknown => {}
542 }
543 }
544 }
545
546 pub fn check_lag(&self, replica_id: &str, max_lag: Duration) -> bool {
548 self.health
549 .get(replica_id)
550 .and_then(|h| h.lag)
551 .map(|lag| lag <= max_lag)
552 .unwrap_or(false)
553 }
554
555 pub fn initiate_failover(&mut self) -> QueryResult<String> {
557 self.in_failover.store(true, Ordering::SeqCst);
558
559 let candidate = self
561 .config
562 .replicas
563 .iter()
564 .filter(|r| r.role == ReplicaRole::Secondary)
565 .filter(|r| self.is_replica_healthy(&r.id))
566 .max_by_key(|r| r.priority);
567
568 match candidate {
569 Some(new_primary) => {
570 let new_primary_id = new_primary.id.clone();
571 self.current_primary = Some(new_primary_id.clone());
572 self.in_failover.store(false, Ordering::SeqCst);
573 Ok(new_primary_id)
574 }
575 None => {
576 self.in_failover.store(false, Ordering::SeqCst);
577 Err(QueryError::connection("No suitable failover candidate found"))
578 }
579 }
580 }
581
582 pub fn is_in_failover(&self) -> bool {
584 self.in_failover.load(Ordering::SeqCst)
585 }
586}
587
588#[derive(Debug)]
594pub struct LagMonitor {
595 measurements: HashMap<String, LagMeasurement>,
597 max_acceptable_lag: Duration,
599}
600
601#[derive(Debug, Clone)]
603pub struct LagMeasurement {
604 pub current: Duration,
606 pub average: Duration,
608 pub max: Duration,
610 pub timestamp: Instant,
612 pub samples: u64,
614}
615
616impl LagMonitor {
617 pub fn new(max_acceptable_lag: Duration) -> Self {
619 Self {
620 measurements: HashMap::new(),
621 max_acceptable_lag,
622 }
623 }
624
625 pub fn record(&mut self, replica_id: &str, lag: Duration) {
627 let entry = self.measurements.entry(replica_id.to_string()).or_insert_with(|| {
628 LagMeasurement {
629 current: Duration::ZERO,
630 average: Duration::ZERO,
631 max: Duration::ZERO,
632 timestamp: Instant::now(),
633 samples: 0,
634 }
635 });
636
637 entry.current = lag;
638 entry.max = entry.max.max(lag);
639 entry.samples += 1;
640
641 let alpha = 0.3;
643 let new_avg = Duration::from_secs_f64(
644 entry.average.as_secs_f64() * (1.0 - alpha) + lag.as_secs_f64() * alpha,
645 );
646 entry.average = new_avg;
647 entry.timestamp = Instant::now();
648 }
649
650 pub fn is_acceptable(&self, replica_id: &str) -> bool {
652 self.measurements
653 .get(replica_id)
654 .map(|m| m.current <= self.max_acceptable_lag)
655 .unwrap_or(true) }
657
658 pub fn get_lag(&self, replica_id: &str) -> Option<Duration> {
660 self.measurements.get(replica_id).map(|m| m.current)
661 }
662
663 pub fn get_lagging_replicas(&self) -> Vec<&str> {
665 self.measurements
666 .iter()
667 .filter(|(_, m)| m.current > self.max_acceptable_lag)
668 .map(|(id, _)| id.as_str())
669 .collect()
670 }
671}
672
673pub mod lag_queries {
679 use crate::sql::DatabaseType;
680
681 pub fn check_lag_sql(db_type: DatabaseType) -> &'static str {
683 match db_type {
684 DatabaseType::PostgreSQL => {
685 "SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::INT AS lag_seconds"
687 }
688 DatabaseType::MySQL => {
689 "SHOW SLAVE STATUS"
691 }
692 DatabaseType::MSSQL => {
693 "SELECT datediff(s, last_commit_time, getdate()) AS lag_seconds \
695 FROM sys.dm_hadr_database_replica_states \
696 WHERE is_local = 1"
697 }
698 DatabaseType::SQLite => {
699 "SELECT 0 AS lag_seconds"
701 }
702 }
703 }
704
705 pub fn is_primary_sql(db_type: DatabaseType) -> &'static str {
707 match db_type {
708 DatabaseType::PostgreSQL => "SELECT NOT pg_is_in_recovery() AS is_primary",
709 DatabaseType::MySQL => "SELECT @@read_only = 0 AS is_primary",
710 DatabaseType::MSSQL => {
711 "SELECT CASE WHEN role = 1 THEN 1 ELSE 0 END AS is_primary \
712 FROM sys.dm_hadr_availability_replica_states \
713 WHERE is_local = 1"
714 }
715 DatabaseType::SQLite => "SELECT 1 AS is_primary",
716 }
717 }
718
719 pub fn replica_status_sql(db_type: DatabaseType) -> &'static str {
721 match db_type {
722 DatabaseType::PostgreSQL => {
723 "SELECT \
724 pg_is_in_recovery() AS is_replica, \
725 pg_last_wal_receive_lsn() AS receive_lsn, \
726 pg_last_wal_replay_lsn() AS replay_lsn"
727 }
728 DatabaseType::MySQL => "SHOW REPLICA STATUS",
729 DatabaseType::MSSQL => {
730 "SELECT synchronization_state_desc, synchronization_health_desc \
731 FROM sys.dm_hadr_database_replica_states \
732 WHERE is_local = 1"
733 }
734 DatabaseType::SQLite => "SELECT 'primary' AS status",
735 }
736 }
737}
738
739pub mod mongodb {
745 use serde::{Deserialize, Serialize};
746 use serde_json::Value as JsonValue;
747
748 use super::ReadPreference;
749
750 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
752 pub enum ReadConcern {
753 Local,
755 Majority,
757 Linearizable,
759 Snapshot,
761 Available,
763 }
764
765 impl ReadConcern {
766 pub fn as_str(&self) -> &'static str {
768 match self {
769 Self::Local => "local",
770 Self::Majority => "majority",
771 Self::Linearizable => "linearizable",
772 Self::Snapshot => "snapshot",
773 Self::Available => "available",
774 }
775 }
776 }
777
778 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
780 pub enum WriteConcern {
781 W1,
783 Majority,
785 W(u32),
787 Tag(String),
789 }
790
791 impl WriteConcern {
792 pub fn to_options(&self) -> JsonValue {
794 match self {
795 Self::W1 => serde_json::json!({ "w": 1 }),
796 Self::Majority => serde_json::json!({ "w": "majority" }),
797 Self::W(n) => serde_json::json!({ "w": n }),
798 Self::Tag(tag) => serde_json::json!({ "w": tag }),
799 }
800 }
801 }
802
803 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
805 pub struct MongoReadPreference {
806 pub mode: ReadPreference,
808 pub max_staleness_seconds: Option<u32>,
810 pub tag_sets: Vec<serde_json::Map<String, JsonValue>>,
812 pub hedge: Option<bool>,
814 }
815
816 impl MongoReadPreference {
817 pub fn new(mode: ReadPreference) -> Self {
819 Self {
820 mode,
821 max_staleness_seconds: None,
822 tag_sets: Vec::new(),
823 hedge: None,
824 }
825 }
826
827 pub fn max_staleness(mut self, seconds: u32) -> Self {
829 self.max_staleness_seconds = Some(seconds);
830 self
831 }
832
833 pub fn tag_set(mut self, tags: serde_json::Map<String, JsonValue>) -> Self {
835 self.tag_sets.push(tags);
836 self
837 }
838
839 pub fn hedged(mut self) -> Self {
841 self.hedge = Some(true);
842 self
843 }
844
845 pub fn to_connection_options(&self) -> String {
847 let mut opts = vec![format!("readPreference={}", self.mode.to_mongodb())];
848
849 if let Some(staleness) = self.max_staleness_seconds {
850 opts.push(format!("maxStalenessSeconds={}", staleness));
851 }
852
853 opts.join("&")
854 }
855
856 pub fn to_command_options(&self) -> JsonValue {
858 let mut opts = serde_json::Map::new();
859 opts.insert("mode".to_string(), serde_json::json!(self.mode.to_mongodb()));
860
861 if let Some(staleness) = self.max_staleness_seconds {
862 opts.insert("maxStalenessSeconds".to_string(), serde_json::json!(staleness));
863 }
864
865 if !self.tag_sets.is_empty() {
866 opts.insert("tagSets".to_string(), serde_json::json!(self.tag_sets));
867 }
868
869 if let Some(hedge) = self.hedge {
870 opts.insert("hedge".to_string(), serde_json::json!({ "enabled": hedge }));
871 }
872
873 serde_json::json!(opts)
874 }
875 }
876
877 #[derive(Debug, Clone, Serialize, Deserialize)]
879 pub struct ReplicaSetStatus {
880 pub set: String,
882 pub members: Vec<MemberStatus>,
884 }
885
886 #[derive(Debug, Clone, Serialize, Deserialize)]
888 pub struct MemberStatus {
889 pub id: u32,
891 pub name: String,
893 pub state_str: String,
895 pub health: f64,
897 #[serde(default)]
899 pub lag_seconds: Option<i64>,
900 }
901
902 impl MemberStatus {
903 pub fn is_primary(&self) -> bool {
905 self.state_str == "PRIMARY"
906 }
907
908 pub fn is_secondary(&self) -> bool {
910 self.state_str == "SECONDARY"
911 }
912
913 pub fn is_healthy(&self) -> bool {
915 self.health >= 1.0
916 }
917 }
918}
919
920#[cfg(test)]
921mod tests {
922 use super::*;
923
924 #[test]
925 fn test_replica_config() {
926 let primary = ReplicaConfig::primary("pg1", "postgres://primary:5432/db")
927 .with_region("us-east-1");
928
929 assert_eq!(primary.role, ReplicaRole::Primary);
930 assert_eq!(primary.region.as_deref(), Some("us-east-1"));
931 }
932
933 #[test]
934 fn test_replica_set_builder() {
935 let config = ReplicaSetConfig::new("myapp")
936 .primary("pg1", "postgres://primary:5432/db")
937 .secondary("pg2", "postgres://secondary1:5432/db")
938 .secondary("pg3", "postgres://secondary2:5432/db")
939 .read_preference(ReadPreference::SecondaryPreferred)
940 .build();
941
942 assert_eq!(config.name, "myapp");
943 assert_eq!(config.replicas.len(), 3);
944 assert!(config.primary().is_some());
945 assert_eq!(config.secondaries().count(), 2);
946 }
947
948 #[test]
949 fn test_read_preference_mongodb() {
950 assert_eq!(ReadPreference::Primary.to_mongodb(), "primary");
951 assert_eq!(ReadPreference::SecondaryPreferred.to_mongodb(), "secondaryPreferred");
952 assert_eq!(ReadPreference::Nearest.to_mongodb(), "nearest");
953 }
954
955 #[test]
956 fn test_connection_router_write() {
957 let config = ReplicaSetConfig::new("test")
958 .primary("pg1", "postgres://primary:5432/db")
959 .secondary("pg2", "postgres://secondary:5432/db")
960 .build();
961
962 let mut router = ConnectionRouter::new(config);
963
964 router.update_health("pg1", HealthStatus::Healthy, Some(Duration::from_millis(5)), None);
966 router.update_health("pg2", HealthStatus::Healthy, Some(Duration::from_millis(10)), Some(Duration::from_secs(1)));
967
968 let target = router.route(QueryType::Write, None).unwrap();
970 assert_eq!(target.id, "pg1");
971 }
972
973 #[test]
974 fn test_connection_router_read_secondary() {
975 let config = ReplicaSetConfig::new("test")
976 .primary("pg1", "postgres://primary:5432/db")
977 .secondary("pg2", "postgres://secondary:5432/db")
978 .read_preference(ReadPreference::Secondary)
979 .build();
980
981 let mut router = ConnectionRouter::new(config);
982 router.update_health("pg1", HealthStatus::Healthy, Some(Duration::from_millis(5)), None);
983 router.update_health("pg2", HealthStatus::Healthy, Some(Duration::from_millis(10)), Some(Duration::from_secs(1)));
984
985 let target = router.route(QueryType::Read, None).unwrap();
987 assert_eq!(target.id, "pg2");
988 }
989
990 #[test]
991 fn test_lag_monitor() {
992 let mut monitor = LagMonitor::new(Duration::from_secs(10));
993
994 monitor.record("pg2", Duration::from_secs(5));
995 assert!(monitor.is_acceptable("pg2"));
996
997 monitor.record("pg3", Duration::from_secs(15));
998 assert!(!monitor.is_acceptable("pg3"));
999
1000 let lagging = monitor.get_lagging_replicas();
1001 assert_eq!(lagging, vec!["pg3"]);
1002 }
1003
1004 #[test]
1005 fn test_failover() {
1006 let config = ReplicaSetConfig::new("test")
1007 .primary("pg1", "postgres://primary:5432/db")
1008 .replica(ReplicaConfig::secondary("pg2", "postgres://secondary1:5432/db").with_priority(80))
1009 .replica(ReplicaConfig::secondary("pg3", "postgres://secondary2:5432/db").with_priority(60))
1010 .build();
1011
1012 let mut router = ConnectionRouter::new(config);
1013 router.update_health("pg1", HealthStatus::Unhealthy, None, None);
1014 router.update_health("pg2", HealthStatus::Healthy, Some(Duration::from_millis(10)), None);
1015 router.update_health("pg3", HealthStatus::Healthy, Some(Duration::from_millis(15)), None);
1016
1017 let new_primary = router.initiate_failover().unwrap();
1018 assert_eq!(new_primary, "pg2"); }
1020
1021 mod mongodb_tests {
1022 use super::super::mongodb::*;
1023 use super::*;
1024
1025 #[test]
1026 fn test_read_concern() {
1027 assert_eq!(ReadConcern::Majority.as_str(), "majority");
1028 assert_eq!(ReadConcern::Local.as_str(), "local");
1029 }
1030
1031 #[test]
1032 fn test_write_concern() {
1033 let w = WriteConcern::Majority;
1034 let opts = w.to_options();
1035 assert_eq!(opts["w"], "majority");
1036
1037 let w2 = WriteConcern::W(3);
1038 let opts2 = w2.to_options();
1039 assert_eq!(opts2["w"], 3);
1040 }
1041
1042 #[test]
1043 fn test_mongo_read_preference() {
1044 let pref = MongoReadPreference::new(ReadPreference::SecondaryPreferred)
1045 .max_staleness(90)
1046 .hedged();
1047
1048 let conn_opts = pref.to_connection_options();
1049 assert!(conn_opts.contains("readPreference=secondaryPreferred"));
1050 assert!(conn_opts.contains("maxStalenessSeconds=90"));
1051
1052 let cmd_opts = pref.to_command_options();
1053 assert_eq!(cmd_opts["mode"], "secondaryPreferred");
1054 assert_eq!(cmd_opts["maxStalenessSeconds"], 90);
1055 }
1056 }
1057}
1058