1use std::collections::HashMap;
17use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
18use std::time::{Duration, Instant};
19
20use serde::{Deserialize, Serialize};
21
22use crate::error::{QueryError, QueryResult};
23
/// Static description of a single member of a replica set.
///
/// Instances are normally created with [`ReplicaConfig::primary`] or
/// [`ReplicaConfig::secondary`] and then refined with the `with_*` methods.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReplicaConfig {
    /// Identifier of the replica; `ConnectionRouter` keys its health map and
    /// its notion of the current primary on this value.
    pub id: String,
    /// URL of the replica (the tests in this file use strings such as
    /// `postgres://primary:5432/db`).
    pub url: String,
    /// Role the replica plays inside the set.
    pub role: ReplicaRole,
    /// Failover ranking: `ConnectionRouter::initiate_failover` promotes the
    /// healthy secondary with the highest priority.
    pub priority: u32,
    /// Relative weight. No routing code in this file reads it —
    /// presumably intended for weighted load balancing (TODO confirm).
    pub weight: u32,
    /// Optional region label, matched by `ReplicaSetConfig::in_region`.
    pub region: Option<String>,
    /// Optional replication-lag bound for this replica. The constructors
    /// default it to `None` for primaries and 10 seconds for secondaries;
    /// nothing in this file enforces it automatically.
    pub max_lag: Option<Duration>,
}
46
/// Role of a replica within its replica set.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicaRole {
    /// The member that `ConnectionRouter` initially treats as the primary;
    /// writes and transactions are routed to the current primary.
    Primary,
    /// A member returned by `ReplicaSetConfig::secondaries` and eligible as a
    /// failover candidate.
    Secondary,
    /// Never returned by `ReplicaSetConfig::secondaries` and never a failover
    /// candidate. Presumably a vote-only member without data, as in MongoDB
    /// (TODO confirm).
    Arbiter,
    /// Never returned by `ReplicaSetConfig::secondaries` and never a failover
    /// candidate. Presumably a member hidden from client reads, as in MongoDB
    /// (TODO confirm).
    Hidden,
}
59
60impl ReplicaConfig {
61 pub fn primary(id: impl Into<String>, url: impl Into<String>) -> Self {
63 Self {
64 id: id.into(),
65 url: url.into(),
66 role: ReplicaRole::Primary,
67 priority: 100,
68 weight: 100,
69 region: None,
70 max_lag: None,
71 }
72 }
73
74 pub fn secondary(id: impl Into<String>, url: impl Into<String>) -> Self {
76 Self {
77 id: id.into(),
78 url: url.into(),
79 role: ReplicaRole::Secondary,
80 priority: 50,
81 weight: 100,
82 region: None,
83 max_lag: Some(Duration::from_secs(10)),
84 }
85 }
86
87 pub fn with_region(mut self, region: impl Into<String>) -> Self {
89 self.region = Some(region.into());
90 self
91 }
92
93 pub fn with_weight(mut self, weight: u32) -> Self {
95 self.weight = weight;
96 self
97 }
98
99 pub fn with_priority(mut self, priority: u32) -> Self {
101 self.priority = priority;
102 self
103 }
104
105 pub fn with_max_lag(mut self, lag: Duration) -> Self {
107 self.max_lag = Some(lag);
108 self
109 }
110}
111
/// Configuration of a whole replica set: its members plus routing defaults.
///
/// Built through [`ReplicaSetConfig::new`], which returns a
/// `ReplicaSetBuilder`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaSetConfig {
    /// Name of the replica set.
    pub name: String,
    /// All configured members, in the order they were added to the builder.
    pub replicas: Vec<ReplicaConfig>,
    /// Read preference `ConnectionRouter::route` uses when the caller passes
    /// `None`.
    pub default_read_preference: ReadPreference,
    /// Interval between health checks (builder default: 10 seconds). Not read
    /// by any code in this file — presumably consumed by an external health
    /// checker (TODO confirm).
    pub health_check_interval: Duration,
    /// Failover timeout (builder default: 30 seconds). Not read by any code in
    /// this file (TODO confirm where it is enforced).
    pub failover_timeout: Duration,
}
130
131impl ReplicaSetConfig {
132 pub fn new(name: impl Into<String>) -> ReplicaSetBuilder {
134 ReplicaSetBuilder::new(name)
135 }
136
137 pub fn primary(&self) -> Option<&ReplicaConfig> {
139 self.replicas
140 .iter()
141 .find(|r| r.role == ReplicaRole::Primary)
142 }
143
144 pub fn secondaries(&self) -> impl Iterator<Item = &ReplicaConfig> {
146 self.replicas
147 .iter()
148 .filter(|r| r.role == ReplicaRole::Secondary)
149 }
150
151 pub fn in_region(&self, region: &str) -> impl Iterator<Item = &ReplicaConfig> {
153 self.replicas
154 .iter()
155 .filter(move |r| r.region.as_deref() == Some(region))
156 }
157}
158
/// Builder that accumulates the fields of a `ReplicaSetConfig`.
///
/// The fields mirror `ReplicaSetConfig` one-to-one; `build()` moves them over
/// without any validation.
#[derive(Debug, Clone)]
pub struct ReplicaSetBuilder {
    // Name of the replica set being built.
    name: String,
    // Members added so far, in insertion order.
    replicas: Vec<ReplicaConfig>,
    // Read preference used when a query does not specify one.
    default_read_preference: ReadPreference,
    // Health-check interval copied into the final config.
    health_check_interval: Duration,
    // Failover timeout copied into the final config.
    failover_timeout: Duration,
}
168
169impl ReplicaSetBuilder {
170 pub fn new(name: impl Into<String>) -> Self {
172 Self {
173 name: name.into(),
174 replicas: Vec::new(),
175 default_read_preference: ReadPreference::Primary,
176 health_check_interval: Duration::from_secs(10),
177 failover_timeout: Duration::from_secs(30),
178 }
179 }
180
181 pub fn replica(mut self, config: ReplicaConfig) -> Self {
183 self.replicas.push(config);
184 self
185 }
186
187 pub fn primary(self, id: impl Into<String>, url: impl Into<String>) -> Self {
189 self.replica(ReplicaConfig::primary(id, url))
190 }
191
192 pub fn secondary(self, id: impl Into<String>, url: impl Into<String>) -> Self {
194 self.replica(ReplicaConfig::secondary(id, url))
195 }
196
197 pub fn read_preference(mut self, pref: ReadPreference) -> Self {
199 self.default_read_preference = pref;
200 self
201 }
202
203 pub fn health_check_interval(mut self, interval: Duration) -> Self {
205 self.health_check_interval = interval;
206 self
207 }
208
209 pub fn failover_timeout(mut self, timeout: Duration) -> Self {
211 self.failover_timeout = timeout;
212 self
213 }
214
215 pub fn build(self) -> ReplicaSetConfig {
217 ReplicaSetConfig {
218 name: self.name,
219 replicas: self.replicas,
220 default_read_preference: self.default_read_preference,
221 health_check_interval: self.health_check_interval,
222 failover_timeout: self.failover_timeout,
223 }
224 }
225}
226
/// Policy describing which replica a read query should be sent to.
///
/// `Primary` is the `Default`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum ReadPreference {
    /// Always read from the primary.
    #[default]
    Primary,
    /// Prefer the primary; the router falls back to a healthy secondary when
    /// it cannot return the primary.
    PrimaryPreferred,
    /// Read only from healthy secondaries.
    Secondary,
    /// Prefer a healthy secondary; the router falls back to the primary when
    /// none is available.
    SecondaryPreferred,
    /// Read from the healthy replica with the lowest recorded latency.
    Nearest,
    /// Read from a healthy replica in the named region; the router falls back
    /// to the lowest-latency replica when the region has none.
    Region(String),
    /// Tag sets to match. No tag matching is implemented in this file: the
    /// router handles this variant exactly like `Nearest`.
    TagSet(Vec<HashMap<String, String>>),
}
250
251impl ReadPreference {
252 pub fn region(region: impl Into<String>) -> Self {
254 Self::Region(region.into())
255 }
256
257 pub fn tag_set(tags: Vec<HashMap<String, String>>) -> Self {
259 Self::TagSet(tags)
260 }
261
262 pub fn to_mongodb(&self) -> &'static str {
264 match self {
265 Self::Primary => "primary",
266 Self::PrimaryPreferred => "primaryPreferred",
267 Self::Secondary => "secondary",
268 Self::SecondaryPreferred => "secondaryPreferred",
269 Self::Nearest => "nearest",
270 Self::Region(_) | Self::TagSet(_) => "nearest",
271 }
272 }
273
274 pub fn allows_primary(&self) -> bool {
276 matches!(
277 self,
278 Self::Primary
279 | Self::PrimaryPreferred
280 | Self::Nearest
281 | Self::Region(_)
282 | Self::TagSet(_)
283 )
284 }
285
286 pub fn allows_secondary(&self) -> bool {
288 !matches!(self, Self::Primary)
289 }
290}
291
/// Health classification of a replica as tracked by `ReplicaHealth`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum HealthStatus {
    /// The replica passed its last check; counted as usable.
    Healthy,
    /// The replica is impaired but still counted as usable by
    /// `ReplicaHealth::is_usable`.
    Degraded,
    /// The replica failed its last check; not usable.
    Unhealthy,
    /// No check has been recorded yet (the initial state); not usable.
    Unknown,
}
308
/// Mutable health record for one replica.
#[derive(Debug, Clone)]
pub struct ReplicaHealth {
    /// Identifier of the replica this record belongs to.
    pub id: String,
    /// Most recently recorded status; starts as `Unknown`.
    pub status: HealthStatus,
    /// Replication lag passed to the last `mark_healthy` call, if any.
    pub lag: Option<Duration>,
    /// Instant of the last `mark_*` call; `None` until the first one.
    pub last_check: Option<Instant>,
    /// Latency passed to the last `mark_healthy` call; `None` until then.
    pub latency: Option<Duration>,
    /// Number of `mark_unhealthy` calls since the last `mark_healthy`
    /// (`mark_degraded` leaves it unchanged).
    pub consecutive_failures: u32,
}
325
326impl ReplicaHealth {
327 pub fn new(id: impl Into<String>) -> Self {
329 Self {
330 id: id.into(),
331 status: HealthStatus::Unknown,
332 lag: None,
333 last_check: None,
334 latency: None,
335 consecutive_failures: 0,
336 }
337 }
338
339 pub fn mark_healthy(&mut self, latency: Duration, lag: Option<Duration>) {
341 self.status = HealthStatus::Healthy;
342 self.latency = Some(latency);
343 self.lag = lag;
344 self.last_check = Some(Instant::now());
345 self.consecutive_failures = 0;
346 }
347
348 pub fn mark_degraded(&mut self, reason: &str) {
350 self.status = HealthStatus::Degraded;
351 self.last_check = Some(Instant::now());
352 let _ = reason; }
354
355 pub fn mark_unhealthy(&mut self) {
357 self.status = HealthStatus::Unhealthy;
358 self.last_check = Some(Instant::now());
359 self.consecutive_failures += 1;
360 }
361
362 pub fn is_usable(&self) -> bool {
364 matches!(self.status, HealthStatus::Healthy | HealthStatus::Degraded)
365 }
366}
367
/// Kind of query being routed by `ConnectionRouter::route`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryType {
    /// A read; routed according to the effective `ReadPreference`.
    Read,
    /// A write; always routed to the current primary.
    Write,
    /// A transaction; always routed to the current primary.
    Transaction,
}
382
/// Chooses a replica for each query based on the query type, the read
/// preference and the health information reported via `update_health`.
#[derive(Debug)]
pub struct ConnectionRouter {
    // Static replica-set configuration the router was built from.
    config: ReplicaSetConfig,
    // Health record per replica id; one entry is created for every
    // configured replica in `new`.
    health: HashMap<String, ReplicaHealth>,
    // Id of the replica currently treated as primary; replaced by
    // `initiate_failover`.
    current_primary: Option<String>,
    // Monotonic counter used to rotate through candidates (round robin).
    round_robin: AtomicUsize,
    // Set while `initiate_failover` is running.
    in_failover: AtomicBool,
}
397
398impl ConnectionRouter {
399 pub fn new(config: ReplicaSetConfig) -> Self {
401 let mut health = HashMap::new();
402 let mut primary_id = None;
403
404 for replica in &config.replicas {
405 health.insert(replica.id.clone(), ReplicaHealth::new(&replica.id));
406 if replica.role == ReplicaRole::Primary {
407 primary_id = Some(replica.id.clone());
408 }
409 }
410
411 Self {
412 config,
413 health,
414 current_primary: primary_id,
415 round_robin: AtomicUsize::new(0),
416 in_failover: AtomicBool::new(false),
417 }
418 }
419
420 pub fn route(
422 &self,
423 query_type: QueryType,
424 preference: Option<&ReadPreference>,
425 ) -> QueryResult<&ReplicaConfig> {
426 let pref = preference.unwrap_or(&self.config.default_read_preference);
427
428 match query_type {
429 QueryType::Write | QueryType::Transaction => self.get_primary(),
430 QueryType::Read => self.route_read(pref),
431 }
432 }
433
434 pub fn get_primary(&self) -> QueryResult<&ReplicaConfig> {
436 let primary_id = self
437 .current_primary
438 .as_ref()
439 .ok_or_else(|| QueryError::connection("No primary replica available"))?;
440
441 self.config
442 .replicas
443 .iter()
444 .find(|r| &r.id == primary_id)
445 .ok_or_else(|| QueryError::connection("Primary replica not found"))
446 }
447
448 fn route_read(&self, preference: &ReadPreference) -> QueryResult<&ReplicaConfig> {
450 match preference {
451 ReadPreference::Primary => self.get_primary(),
452 ReadPreference::PrimaryPreferred => {
453 self.get_primary().or_else(|_| self.get_any_secondary())
454 }
455 ReadPreference::Secondary => self.get_any_secondary(),
456 ReadPreference::SecondaryPreferred => {
457 self.get_any_secondary().or_else(|_| self.get_primary())
458 }
459 ReadPreference::Nearest => self.get_nearest(),
460 ReadPreference::Region(region) => self.get_in_region(region),
461 ReadPreference::TagSet(_tags) => {
462 self.get_nearest()
464 }
465 }
466 }
467
468 fn get_any_secondary(&self) -> QueryResult<&ReplicaConfig> {
470 let secondaries: Vec<_> = self
471 .config
472 .secondaries()
473 .filter(|r| self.is_replica_healthy(&r.id))
474 .collect();
475
476 if secondaries.is_empty() {
477 return Err(QueryError::connection(
478 "No healthy secondary replicas available",
479 ));
480 }
481
482 let idx = self.round_robin.fetch_add(1, Ordering::Relaxed) % secondaries.len();
484 Ok(secondaries[idx])
485 }
486
487 fn get_nearest(&self) -> QueryResult<&ReplicaConfig> {
489 let mut best: Option<(&ReplicaConfig, Duration)> = None;
490
491 for replica in &self.config.replicas {
492 if !self.is_replica_healthy(&replica.id) {
493 continue;
494 }
495
496 if let Some(health) = self.health.get(&replica.id)
497 && let Some(latency) = health.latency
498 {
499 match &best {
500 None => best = Some((replica, latency)),
501 Some((_, best_latency)) if latency < *best_latency => {
502 best = Some((replica, latency));
503 }
504 _ => {}
505 }
506 }
507 }
508
509 best.map(|(r, _)| r)
510 .ok_or_else(|| QueryError::connection("No healthy replicas available"))
511 }
512
513 fn get_in_region(&self, region: &str) -> QueryResult<&ReplicaConfig> {
515 let replicas: Vec<_> = self
516 .config
517 .in_region(region)
518 .filter(|r| self.is_replica_healthy(&r.id))
519 .collect();
520
521 if replicas.is_empty() {
522 return self.get_nearest();
524 }
525
526 let idx = self.round_robin.fetch_add(1, Ordering::Relaxed) % replicas.len();
527 Ok(replicas[idx])
528 }
529
530 fn is_replica_healthy(&self, id: &str) -> bool {
532 self.health.get(id).map(|h| h.is_usable()).unwrap_or(false)
533 }
534
535 pub fn update_health(
537 &mut self,
538 id: &str,
539 status: HealthStatus,
540 latency: Option<Duration>,
541 lag: Option<Duration>,
542 ) {
543 if let Some(health) = self.health.get_mut(id) {
544 match status {
545 HealthStatus::Healthy => {
546 health.mark_healthy(latency.unwrap_or(Duration::ZERO), lag);
547 }
548 HealthStatus::Degraded => {
549 health.mark_degraded("degraded");
550 }
551 HealthStatus::Unhealthy => {
552 health.mark_unhealthy();
553 }
554 HealthStatus::Unknown => {}
555 }
556 }
557 }
558
559 pub fn check_lag(&self, replica_id: &str, max_lag: Duration) -> bool {
561 self.health
562 .get(replica_id)
563 .and_then(|h| h.lag)
564 .map(|lag| lag <= max_lag)
565 .unwrap_or(false)
566 }
567
568 pub fn initiate_failover(&mut self) -> QueryResult<String> {
570 self.in_failover.store(true, Ordering::SeqCst);
571
572 let candidate = self
574 .config
575 .replicas
576 .iter()
577 .filter(|r| r.role == ReplicaRole::Secondary)
578 .filter(|r| self.is_replica_healthy(&r.id))
579 .max_by_key(|r| r.priority);
580
581 match candidate {
582 Some(new_primary) => {
583 let new_primary_id = new_primary.id.clone();
584 self.current_primary = Some(new_primary_id.clone());
585 self.in_failover.store(false, Ordering::SeqCst);
586 Ok(new_primary_id)
587 }
588 None => {
589 self.in_failover.store(false, Ordering::SeqCst);
590 Err(QueryError::connection(
591 "No suitable failover candidate found",
592 ))
593 }
594 }
595 }
596
597 pub fn is_in_failover(&self) -> bool {
599 self.in_failover.load(Ordering::SeqCst)
600 }
601}
602
/// Tracks replication lag per replica and compares it against a fixed
/// acceptance threshold.
#[derive(Debug)]
pub struct LagMonitor {
    // Latest statistics per replica id.
    measurements: HashMap<String, LagMeasurement>,
    // Lag above this value marks a replica as lagging.
    max_acceptable_lag: Duration,
}

/// Lag statistics for a single replica.
#[derive(Debug, Clone)]
pub struct LagMeasurement {
    /// Most recently recorded lag.
    pub current: Duration,
    /// Exponential moving average of the recorded lag.
    pub average: Duration,
    /// Largest lag recorded so far.
    pub max: Duration,
    /// Instant of the most recent recording.
    pub timestamp: Instant,
    /// Number of samples recorded.
    pub samples: u64,
}

impl LagMonitor {
    /// Weight of the newest sample in the exponential moving average.
    const EMA_ALPHA: f64 = 0.3;

    /// Creates a monitor that considers lag above `max_acceptable_lag`
    /// unacceptable.
    pub fn new(max_acceptable_lag: Duration) -> Self {
        Self {
            measurements: HashMap::new(),
            max_acceptable_lag,
        }
    }

    /// Records a lag sample for `replica_id`.
    ///
    /// The first sample for a replica seeds `average` with the sample itself;
    /// seeding with zero would make the average start at only 30% of the
    /// real lag. Later samples are blended in with weight `EMA_ALPHA`.
    pub fn record(&mut self, replica_id: &str, lag: Duration) {
        let now = Instant::now();

        // Look up by `&str` first so the key is only allocated for new ids.
        if let Some(entry) = self.measurements.get_mut(replica_id) {
            entry.current = lag;
            entry.max = entry.max.max(lag);
            entry.samples += 1;
            entry.average = Duration::from_secs_f64(
                entry.average.as_secs_f64() * (1.0 - Self::EMA_ALPHA)
                    + lag.as_secs_f64() * Self::EMA_ALPHA,
            );
            entry.timestamp = now;
        } else {
            self.measurements.insert(
                replica_id.to_string(),
                LagMeasurement {
                    current: lag,
                    average: lag,
                    max: lag,
                    timestamp: now,
                    samples: 1,
                },
            );
        }
    }

    /// Returns `true` when the latest lag of `replica_id` does not exceed the
    /// threshold. Replicas without any recorded sample are treated as
    /// acceptable.
    pub fn is_acceptable(&self, replica_id: &str) -> bool {
        self.measurements
            .get(replica_id)
            .map(|m| m.current <= self.max_acceptable_lag)
            .unwrap_or(true)
    }

    /// Returns the latest recorded lag of `replica_id`, if any.
    pub fn get_lag(&self, replica_id: &str) -> Option<Duration> {
        self.measurements.get(replica_id).map(|m| m.current)
    }

    /// Returns the ids of all replicas whose latest lag exceeds the
    /// threshold, in unspecified (hash-map) order.
    pub fn get_lagging_replicas(&self) -> Vec<&str> {
        self.measurements
            .iter()
            .filter(|(_, m)| m.current > self.max_acceptable_lag)
            .map(|(id, _)| id.as_str())
            .collect()
    }
}
688
689pub mod lag_queries {
695 use crate::sql::DatabaseType;
696
697 pub fn check_lag_sql(db_type: DatabaseType) -> &'static str {
699 match db_type {
700 DatabaseType::PostgreSQL => {
701 "SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::INT AS lag_seconds"
703 }
704 DatabaseType::MySQL => {
705 "SHOW SLAVE STATUS"
707 }
708 DatabaseType::MSSQL => {
709 "SELECT datediff(s, last_commit_time, getdate()) AS lag_seconds \
711 FROM sys.dm_hadr_database_replica_states \
712 WHERE is_local = 1"
713 }
714 DatabaseType::SQLite => {
715 "SELECT 0 AS lag_seconds"
717 }
718 }
719 }
720
721 pub fn is_primary_sql(db_type: DatabaseType) -> &'static str {
723 match db_type {
724 DatabaseType::PostgreSQL => "SELECT NOT pg_is_in_recovery() AS is_primary",
725 DatabaseType::MySQL => "SELECT @@read_only = 0 AS is_primary",
726 DatabaseType::MSSQL => {
727 "SELECT CASE WHEN role = 1 THEN 1 ELSE 0 END AS is_primary \
728 FROM sys.dm_hadr_availability_replica_states \
729 WHERE is_local = 1"
730 }
731 DatabaseType::SQLite => "SELECT 1 AS is_primary",
732 }
733 }
734
735 pub fn replica_status_sql(db_type: DatabaseType) -> &'static str {
737 match db_type {
738 DatabaseType::PostgreSQL => {
739 "SELECT \
740 pg_is_in_recovery() AS is_replica, \
741 pg_last_wal_receive_lsn() AS receive_lsn, \
742 pg_last_wal_replay_lsn() AS replay_lsn"
743 }
744 DatabaseType::MySQL => "SHOW REPLICA STATUS",
745 DatabaseType::MSSQL => {
746 "SELECT synchronization_state_desc, synchronization_health_desc \
747 FROM sys.dm_hadr_database_replica_states \
748 WHERE is_local = 1"
749 }
750 DatabaseType::SQLite => "SELECT 'primary' AS status",
751 }
752 }
753}
754
755pub mod mongodb {
761 use serde::{Deserialize, Serialize};
762 use serde_json::Value as JsonValue;
763
764 use super::ReadPreference;
765
766 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
768 pub enum ReadConcern {
769 Local,
771 Majority,
773 Linearizable,
775 Snapshot,
777 Available,
779 }
780
781 impl ReadConcern {
782 pub fn as_str(&self) -> &'static str {
784 match self {
785 Self::Local => "local",
786 Self::Majority => "majority",
787 Self::Linearizable => "linearizable",
788 Self::Snapshot => "snapshot",
789 Self::Available => "available",
790 }
791 }
792 }
793
794 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
796 pub enum WriteConcern {
797 W1,
799 Majority,
801 W(u32),
803 Tag(String),
805 }
806
807 impl WriteConcern {
808 pub fn to_options(&self) -> JsonValue {
810 match self {
811 Self::W1 => serde_json::json!({ "w": 1 }),
812 Self::Majority => serde_json::json!({ "w": "majority" }),
813 Self::W(n) => serde_json::json!({ "w": n }),
814 Self::Tag(tag) => serde_json::json!({ "w": tag }),
815 }
816 }
817 }
818
819 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
821 pub struct MongoReadPreference {
822 pub mode: ReadPreference,
824 pub max_staleness_seconds: Option<u32>,
826 pub tag_sets: Vec<serde_json::Map<String, JsonValue>>,
828 pub hedge: Option<bool>,
830 }
831
832 impl MongoReadPreference {
833 pub fn new(mode: ReadPreference) -> Self {
835 Self {
836 mode,
837 max_staleness_seconds: None,
838 tag_sets: Vec::new(),
839 hedge: None,
840 }
841 }
842
843 pub fn max_staleness(mut self, seconds: u32) -> Self {
845 self.max_staleness_seconds = Some(seconds);
846 self
847 }
848
849 pub fn tag_set(mut self, tags: serde_json::Map<String, JsonValue>) -> Self {
851 self.tag_sets.push(tags);
852 self
853 }
854
855 pub fn hedged(mut self) -> Self {
857 self.hedge = Some(true);
858 self
859 }
860
861 pub fn to_connection_options(&self) -> String {
863 let mut opts = vec![format!("readPreference={}", self.mode.to_mongodb())];
864
865 if let Some(staleness) = self.max_staleness_seconds {
866 opts.push(format!("maxStalenessSeconds={}", staleness));
867 }
868
869 opts.join("&")
870 }
871
872 pub fn to_command_options(&self) -> JsonValue {
874 let mut opts = serde_json::Map::new();
875 opts.insert(
876 "mode".to_string(),
877 serde_json::json!(self.mode.to_mongodb()),
878 );
879
880 if let Some(staleness) = self.max_staleness_seconds {
881 opts.insert(
882 "maxStalenessSeconds".to_string(),
883 serde_json::json!(staleness),
884 );
885 }
886
887 if !self.tag_sets.is_empty() {
888 opts.insert("tagSets".to_string(), serde_json::json!(self.tag_sets));
889 }
890
891 if let Some(hedge) = self.hedge {
892 opts.insert("hedge".to_string(), serde_json::json!({ "enabled": hedge }));
893 }
894
895 serde_json::json!(opts)
896 }
897 }
898
899 #[derive(Debug, Clone, Serialize, Deserialize)]
901 pub struct ReplicaSetStatus {
902 pub set: String,
904 pub members: Vec<MemberStatus>,
906 }
907
908 #[derive(Debug, Clone, Serialize, Deserialize)]
910 pub struct MemberStatus {
911 pub id: u32,
913 pub name: String,
915 pub state_str: String,
917 pub health: f64,
919 #[serde(default)]
921 pub lag_seconds: Option<i64>,
922 }
923
924 impl MemberStatus {
925 pub fn is_primary(&self) -> bool {
927 self.state_str == "PRIMARY"
928 }
929
930 pub fn is_secondary(&self) -> bool {
932 self.state_str == "SECONDARY"
933 }
934
935 pub fn is_healthy(&self) -> bool {
937 self.health >= 1.0
938 }
939 }
940}
941
#[cfg(test)]
mod tests {
    use super::*;

    /// Reports `id` as healthy with the given latency (in milliseconds) and
    /// optional replication lag.
    fn report_healthy(
        router: &mut ConnectionRouter,
        id: &str,
        latency_ms: u64,
        lag: Option<Duration>,
    ) {
        router.update_health(
            id,
            HealthStatus::Healthy,
            Some(Duration::from_millis(latency_ms)),
            lag,
        );
    }

    // The region builder must not disturb the role chosen by the constructor.
    #[test]
    fn test_replica_config() {
        let cfg = ReplicaConfig::primary("pg1", "postgres://primary:5432/db");
        let cfg = cfg.with_region("us-east-1");

        assert_eq!(cfg.role, ReplicaRole::Primary);
        assert_eq!(cfg.region.as_deref(), Some("us-east-1"));
    }

    // One primary plus two secondaries must all end up in the built config.
    #[test]
    fn test_replica_set_builder() {
        let builder = ReplicaSetConfig::new("myapp")
            .primary("pg1", "postgres://primary:5432/db")
            .secondary("pg2", "postgres://secondary1:5432/db")
            .secondary("pg3", "postgres://secondary2:5432/db")
            .read_preference(ReadPreference::SecondaryPreferred);
        let set = builder.build();

        assert_eq!(set.name, "myapp");
        assert_eq!(set.replicas.len(), 3);
        assert!(set.primary().is_some());
        assert_eq!(set.secondaries().count(), 2);
    }

    // Spot-check the MongoDB mode strings.
    #[test]
    fn test_read_preference_mongodb() {
        let expectations = [
            (ReadPreference::Primary, "primary"),
            (ReadPreference::SecondaryPreferred, "secondaryPreferred"),
            (ReadPreference::Nearest, "nearest"),
        ];

        for (pref, expected) in expectations {
            assert_eq!(pref.to_mongodb(), expected);
        }
    }

    // Writes must always be routed to the primary.
    #[test]
    fn test_connection_router_write() {
        let set = ReplicaSetConfig::new("test")
            .primary("pg1", "postgres://primary:5432/db")
            .secondary("pg2", "postgres://secondary:5432/db")
            .build();
        let mut router = ConnectionRouter::new(set);

        report_healthy(&mut router, "pg1", 5, None);
        report_healthy(&mut router, "pg2", 10, Some(Duration::from_secs(1)));

        let chosen = router.route(QueryType::Write, None).unwrap();
        assert_eq!(chosen.id, "pg1");
    }

    // With a `Secondary` default preference, reads go to the secondary.
    #[test]
    fn test_connection_router_read_secondary() {
        let set = ReplicaSetConfig::new("test")
            .primary("pg1", "postgres://primary:5432/db")
            .secondary("pg2", "postgres://secondary:5432/db")
            .read_preference(ReadPreference::Secondary)
            .build();
        let mut router = ConnectionRouter::new(set);

        report_healthy(&mut router, "pg1", 5, None);
        report_healthy(&mut router, "pg2", 10, Some(Duration::from_secs(1)));

        let chosen = router.route(QueryType::Read, None).unwrap();
        assert_eq!(chosen.id, "pg2");
    }

    // Lag below the threshold is acceptable; lag above it is reported.
    #[test]
    fn test_lag_monitor() {
        let threshold = Duration::from_secs(10);
        let mut monitor = LagMonitor::new(threshold);

        monitor.record("pg2", Duration::from_secs(5));
        assert!(monitor.is_acceptable("pg2"));

        monitor.record("pg3", Duration::from_secs(15));
        assert!(!monitor.is_acceptable("pg3"));

        assert_eq!(monitor.get_lagging_replicas(), vec!["pg3"]);
    }

    // Failover must promote the healthy secondary with the highest priority.
    #[test]
    fn test_failover() {
        let high = ReplicaConfig::secondary("pg2", "postgres://secondary1:5432/db")
            .with_priority(80);
        let low = ReplicaConfig::secondary("pg3", "postgres://secondary2:5432/db")
            .with_priority(60);
        let set = ReplicaSetConfig::new("test")
            .primary("pg1", "postgres://primary:5432/db")
            .replica(high)
            .replica(low)
            .build();

        let mut router = ConnectionRouter::new(set);
        router.update_health("pg1", HealthStatus::Unhealthy, None, None);
        report_healthy(&mut router, "pg2", 10, None);
        report_healthy(&mut router, "pg3", 15, None);

        let promoted = router.initiate_failover().unwrap();
        assert_eq!(promoted, "pg2");
    }

    mod mongodb_tests {
        use super::super::mongodb::*;
        use super::*;

        #[test]
        fn test_read_concern() {
            assert_eq!(ReadConcern::Majority.as_str(), "majority");
            assert_eq!(ReadConcern::Local.as_str(), "local");
        }

        #[test]
        fn test_write_concern() {
            let majority = WriteConcern::Majority.to_options();
            assert_eq!(majority["w"], "majority");

            let three = WriteConcern::W(3).to_options();
            assert_eq!(three["w"], 3);
        }

        #[test]
        fn test_mongo_read_preference() {
            let pref = MongoReadPreference::new(ReadPreference::SecondaryPreferred)
                .max_staleness(90)
                .hedged();

            let uri_opts = pref.to_connection_options();
            assert!(uri_opts.contains("readPreference=secondaryPreferred"));
            assert!(uri_opts.contains("maxStalenessSeconds=90"));

            let command = pref.to_command_options();
            assert_eq!(command["mode"], "secondaryPreferred");
            assert_eq!(command["maxStalenessSeconds"], 90);
        }
    }
}