1use std::collections::HashMap;
17use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
18use std::time::{Duration, Instant};
19
20use serde::{Deserialize, Serialize};
21
22use crate::error::{QueryError, QueryResult};
23
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
30pub struct ReplicaConfig {
31 pub id: String,
33 pub url: String,
35 pub role: ReplicaRole,
37 pub priority: u32,
39 pub weight: u32,
41 pub region: Option<String>,
43 pub max_lag: Option<Duration>,
45}
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49pub enum ReplicaRole {
50 Primary,
52 Secondary,
54 Arbiter,
56 Hidden,
58}
59
60impl ReplicaConfig {
61 pub fn primary(id: impl Into<String>, url: impl Into<String>) -> Self {
63 Self {
64 id: id.into(),
65 url: url.into(),
66 role: ReplicaRole::Primary,
67 priority: 100,
68 weight: 100,
69 region: None,
70 max_lag: None,
71 }
72 }
73
74 pub fn secondary(id: impl Into<String>, url: impl Into<String>) -> Self {
76 Self {
77 id: id.into(),
78 url: url.into(),
79 role: ReplicaRole::Secondary,
80 priority: 50,
81 weight: 100,
82 region: None,
83 max_lag: Some(Duration::from_secs(10)),
84 }
85 }
86
87 pub fn with_region(mut self, region: impl Into<String>) -> Self {
89 self.region = Some(region.into());
90 self
91 }
92
93 pub fn with_weight(mut self, weight: u32) -> Self {
95 self.weight = weight;
96 self
97 }
98
99 pub fn with_priority(mut self, priority: u32) -> Self {
101 self.priority = priority;
102 self
103 }
104
105 pub fn with_max_lag(mut self, lag: Duration) -> Self {
107 self.max_lag = Some(lag);
108 self
109 }
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct ReplicaSetConfig {
119 pub name: String,
121 pub replicas: Vec<ReplicaConfig>,
123 pub default_read_preference: ReadPreference,
125 pub health_check_interval: Duration,
127 pub failover_timeout: Duration,
129}
130
131impl ReplicaSetConfig {
132 pub fn new(name: impl Into<String>) -> ReplicaSetBuilder {
134 ReplicaSetBuilder::new(name)
135 }
136
137 pub fn primary(&self) -> Option<&ReplicaConfig> {
139 self.replicas
140 .iter()
141 .find(|r| r.role == ReplicaRole::Primary)
142 }
143
144 pub fn secondaries(&self) -> impl Iterator<Item = &ReplicaConfig> {
146 self.replicas
147 .iter()
148 .filter(|r| r.role == ReplicaRole::Secondary)
149 }
150
151 pub fn in_region(&self, region: &str) -> impl Iterator<Item = &ReplicaConfig> {
153 self.replicas
154 .iter()
155 .filter(move |r| r.region.as_deref() == Some(region))
156 }
157}
158
159#[derive(Debug, Clone)]
161pub struct ReplicaSetBuilder {
162 name: String,
163 replicas: Vec<ReplicaConfig>,
164 default_read_preference: ReadPreference,
165 health_check_interval: Duration,
166 failover_timeout: Duration,
167}
168
169impl ReplicaSetBuilder {
170 pub fn new(name: impl Into<String>) -> Self {
172 Self {
173 name: name.into(),
174 replicas: Vec::new(),
175 default_read_preference: ReadPreference::Primary,
176 health_check_interval: Duration::from_secs(10),
177 failover_timeout: Duration::from_secs(30),
178 }
179 }
180
181 pub fn replica(mut self, config: ReplicaConfig) -> Self {
183 self.replicas.push(config);
184 self
185 }
186
187 pub fn primary(self, id: impl Into<String>, url: impl Into<String>) -> Self {
189 self.replica(ReplicaConfig::primary(id, url))
190 }
191
192 pub fn secondary(self, id: impl Into<String>, url: impl Into<String>) -> Self {
194 self.replica(ReplicaConfig::secondary(id, url))
195 }
196
197 pub fn read_preference(mut self, pref: ReadPreference) -> Self {
199 self.default_read_preference = pref;
200 self
201 }
202
203 pub fn health_check_interval(mut self, interval: Duration) -> Self {
205 self.health_check_interval = interval;
206 self
207 }
208
209 pub fn failover_timeout(mut self, timeout: Duration) -> Self {
211 self.failover_timeout = timeout;
212 self
213 }
214
215 pub fn build(self) -> ReplicaSetConfig {
217 ReplicaSetConfig {
218 name: self.name,
219 replicas: self.replicas,
220 default_read_preference: self.default_read_preference,
221 health_check_interval: self.health_check_interval,
222 failover_timeout: self.failover_timeout,
223 }
224 }
225}
226
227#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
233pub enum ReadPreference {
234 Primary,
236 PrimaryPreferred,
238 Secondary,
240 SecondaryPreferred,
242 Nearest,
244 Region(String),
246 TagSet(Vec<HashMap<String, String>>),
248}
249
250impl ReadPreference {
251 pub fn region(region: impl Into<String>) -> Self {
253 Self::Region(region.into())
254 }
255
256 pub fn tag_set(tags: Vec<HashMap<String, String>>) -> Self {
258 Self::TagSet(tags)
259 }
260
261 pub fn to_mongodb(&self) -> &'static str {
263 match self {
264 Self::Primary => "primary",
265 Self::PrimaryPreferred => "primaryPreferred",
266 Self::Secondary => "secondary",
267 Self::SecondaryPreferred => "secondaryPreferred",
268 Self::Nearest => "nearest",
269 Self::Region(_) | Self::TagSet(_) => "nearest",
270 }
271 }
272
273 pub fn allows_primary(&self) -> bool {
275 matches!(
276 self,
277 Self::Primary
278 | Self::PrimaryPreferred
279 | Self::Nearest
280 | Self::Region(_)
281 | Self::TagSet(_)
282 )
283 }
284
285 pub fn allows_secondary(&self) -> bool {
287 !matches!(self, Self::Primary)
288 }
289}
290
291impl Default for ReadPreference {
292 fn default() -> Self {
293 Self::Primary
294 }
295}
296
297#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
303pub enum HealthStatus {
304 Healthy,
306 Degraded,
308 Unhealthy,
310 Unknown,
312}
313
314#[derive(Debug, Clone)]
316pub struct ReplicaHealth {
317 pub id: String,
319 pub status: HealthStatus,
321 pub lag: Option<Duration>,
323 pub last_check: Option<Instant>,
325 pub latency: Option<Duration>,
327 pub consecutive_failures: u32,
329}
330
331impl ReplicaHealth {
332 pub fn new(id: impl Into<String>) -> Self {
334 Self {
335 id: id.into(),
336 status: HealthStatus::Unknown,
337 lag: None,
338 last_check: None,
339 latency: None,
340 consecutive_failures: 0,
341 }
342 }
343
344 pub fn mark_healthy(&mut self, latency: Duration, lag: Option<Duration>) {
346 self.status = HealthStatus::Healthy;
347 self.latency = Some(latency);
348 self.lag = lag;
349 self.last_check = Some(Instant::now());
350 self.consecutive_failures = 0;
351 }
352
353 pub fn mark_degraded(&mut self, reason: &str) {
355 self.status = HealthStatus::Degraded;
356 self.last_check = Some(Instant::now());
357 let _ = reason; }
359
360 pub fn mark_unhealthy(&mut self) {
362 self.status = HealthStatus::Unhealthy;
363 self.last_check = Some(Instant::now());
364 self.consecutive_failures += 1;
365 }
366
367 pub fn is_usable(&self) -> bool {
369 matches!(self.status, HealthStatus::Healthy | HealthStatus::Degraded)
370 }
371}
372
/// Kind of operation being routed.
///
/// The router sends `Write` and `Transaction` to the primary and applies the
/// read preference to `Read` only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryType {
    /// A read-only query.
    Read,
    /// A query that modifies data.
    Write,
    /// A transaction.
    Transaction,
}
387
388#[derive(Debug)]
390pub struct ConnectionRouter {
391 config: ReplicaSetConfig,
393 health: HashMap<String, ReplicaHealth>,
395 current_primary: Option<String>,
397 round_robin: AtomicUsize,
399 in_failover: AtomicBool,
401}
402
403impl ConnectionRouter {
404 pub fn new(config: ReplicaSetConfig) -> Self {
406 let mut health = HashMap::new();
407 let mut primary_id = None;
408
409 for replica in &config.replicas {
410 health.insert(replica.id.clone(), ReplicaHealth::new(&replica.id));
411 if replica.role == ReplicaRole::Primary {
412 primary_id = Some(replica.id.clone());
413 }
414 }
415
416 Self {
417 config,
418 health,
419 current_primary: primary_id,
420 round_robin: AtomicUsize::new(0),
421 in_failover: AtomicBool::new(false),
422 }
423 }
424
425 pub fn route(
427 &self,
428 query_type: QueryType,
429 preference: Option<&ReadPreference>,
430 ) -> QueryResult<&ReplicaConfig> {
431 let pref = preference.unwrap_or(&self.config.default_read_preference);
432
433 match query_type {
434 QueryType::Write | QueryType::Transaction => self.get_primary(),
435 QueryType::Read => self.route_read(pref),
436 }
437 }
438
439 pub fn get_primary(&self) -> QueryResult<&ReplicaConfig> {
441 let primary_id = self
442 .current_primary
443 .as_ref()
444 .ok_or_else(|| QueryError::connection("No primary replica available"))?;
445
446 self.config
447 .replicas
448 .iter()
449 .find(|r| &r.id == primary_id)
450 .ok_or_else(|| QueryError::connection("Primary replica not found"))
451 }
452
453 fn route_read(&self, preference: &ReadPreference) -> QueryResult<&ReplicaConfig> {
455 match preference {
456 ReadPreference::Primary => self.get_primary(),
457 ReadPreference::PrimaryPreferred => {
458 self.get_primary().or_else(|_| self.get_any_secondary())
459 }
460 ReadPreference::Secondary => self.get_any_secondary(),
461 ReadPreference::SecondaryPreferred => {
462 self.get_any_secondary().or_else(|_| self.get_primary())
463 }
464 ReadPreference::Nearest => self.get_nearest(),
465 ReadPreference::Region(region) => self.get_in_region(region),
466 ReadPreference::TagSet(_tags) => {
467 self.get_nearest()
469 }
470 }
471 }
472
473 fn get_any_secondary(&self) -> QueryResult<&ReplicaConfig> {
475 let secondaries: Vec<_> = self
476 .config
477 .secondaries()
478 .filter(|r| self.is_replica_healthy(&r.id))
479 .collect();
480
481 if secondaries.is_empty() {
482 return Err(QueryError::connection(
483 "No healthy secondary replicas available",
484 ));
485 }
486
487 let idx = self.round_robin.fetch_add(1, Ordering::Relaxed) % secondaries.len();
489 Ok(secondaries[idx])
490 }
491
492 fn get_nearest(&self) -> QueryResult<&ReplicaConfig> {
494 let mut best: Option<(&ReplicaConfig, Duration)> = None;
495
496 for replica in &self.config.replicas {
497 if !self.is_replica_healthy(&replica.id) {
498 continue;
499 }
500
501 if let Some(health) = self.health.get(&replica.id) {
502 if let Some(latency) = health.latency {
503 match &best {
504 None => best = Some((replica, latency)),
505 Some((_, best_latency)) if latency < *best_latency => {
506 best = Some((replica, latency));
507 }
508 _ => {}
509 }
510 }
511 }
512 }
513
514 best.map(|(r, _)| r)
515 .ok_or_else(|| QueryError::connection("No healthy replicas available"))
516 }
517
518 fn get_in_region(&self, region: &str) -> QueryResult<&ReplicaConfig> {
520 let replicas: Vec<_> = self
521 .config
522 .in_region(region)
523 .filter(|r| self.is_replica_healthy(&r.id))
524 .collect();
525
526 if replicas.is_empty() {
527 return self.get_nearest();
529 }
530
531 let idx = self.round_robin.fetch_add(1, Ordering::Relaxed) % replicas.len();
532 Ok(replicas[idx])
533 }
534
535 fn is_replica_healthy(&self, id: &str) -> bool {
537 self.health.get(id).map(|h| h.is_usable()).unwrap_or(false)
538 }
539
540 pub fn update_health(
542 &mut self,
543 id: &str,
544 status: HealthStatus,
545 latency: Option<Duration>,
546 lag: Option<Duration>,
547 ) {
548 if let Some(health) = self.health.get_mut(id) {
549 match status {
550 HealthStatus::Healthy => {
551 health.mark_healthy(latency.unwrap_or(Duration::ZERO), lag);
552 }
553 HealthStatus::Degraded => {
554 health.mark_degraded("degraded");
555 }
556 HealthStatus::Unhealthy => {
557 health.mark_unhealthy();
558 }
559 HealthStatus::Unknown => {}
560 }
561 }
562 }
563
564 pub fn check_lag(&self, replica_id: &str, max_lag: Duration) -> bool {
566 self.health
567 .get(replica_id)
568 .and_then(|h| h.lag)
569 .map(|lag| lag <= max_lag)
570 .unwrap_or(false)
571 }
572
573 pub fn initiate_failover(&mut self) -> QueryResult<String> {
575 self.in_failover.store(true, Ordering::SeqCst);
576
577 let candidate = self
579 .config
580 .replicas
581 .iter()
582 .filter(|r| r.role == ReplicaRole::Secondary)
583 .filter(|r| self.is_replica_healthy(&r.id))
584 .max_by_key(|r| r.priority);
585
586 match candidate {
587 Some(new_primary) => {
588 let new_primary_id = new_primary.id.clone();
589 self.current_primary = Some(new_primary_id.clone());
590 self.in_failover.store(false, Ordering::SeqCst);
591 Ok(new_primary_id)
592 }
593 None => {
594 self.in_failover.store(false, Ordering::SeqCst);
595 Err(QueryError::connection(
596 "No suitable failover candidate found",
597 ))
598 }
599 }
600 }
601
602 pub fn is_in_failover(&self) -> bool {
604 self.in_failover.load(Ordering::SeqCst)
605 }
606}
607
/// Keeps the latest replication-lag statistics per replica and compares them
/// against a fixed acceptance threshold.
#[derive(Debug)]
pub struct LagMonitor {
    // Statistics per replica id.
    measurements: HashMap<String, LagMeasurement>,
    // Lag above this value marks a replica as lagging.
    max_acceptable_lag: Duration,
}

/// Lag statistics accumulated for one replica.
#[derive(Debug, Clone)]
pub struct LagMeasurement {
    /// Most recently recorded lag.
    pub current: Duration,
    /// Exponential moving average of the recorded lags.
    pub average: Duration,
    /// Largest lag recorded so far.
    pub max: Duration,
    /// Moment of the most recent recording.
    pub timestamp: Instant,
    /// Number of recordings.
    pub samples: u64,
}

impl LagMonitor {
    // Weight of the newest sample in the exponential moving average.
    const EMA_ALPHA: f64 = 0.3;

    /// Creates a monitor that accepts lag up to and including
    /// `max_acceptable_lag`.
    pub fn new(max_acceptable_lag: Duration) -> Self {
        Self {
            measurements: HashMap::new(),
            max_acceptable_lag,
        }
    }

    /// Records a lag sample for `replica_id`.
    ///
    /// The first sample of a replica seeds `average` directly; starting the
    /// moving average from zero would under-report it (a single ten second
    /// sample would show up as three seconds). Later samples are blended in
    /// with a weight of `0.3`.
    pub fn record(&mut self, replica_id: &str, lag: Duration) {
        let now = Instant::now();

        if let Some(entry) = self.measurements.get_mut(replica_id) {
            let blended = entry.average.as_secs_f64() * (1.0 - Self::EMA_ALPHA)
                + lag.as_secs_f64() * Self::EMA_ALPHA;

            entry.current = lag;
            entry.max = entry.max.max(lag);
            entry.samples = entry.samples.saturating_add(1);
            entry.average = Duration::from_secs_f64(blended);
            entry.timestamp = now;
            return;
        }

        // The key is only allocated the first time a replica is seen.
        self.measurements.insert(
            replica_id.to_string(),
            LagMeasurement {
                current: lag,
                average: lag,
                max: lag,
                timestamp: now,
                samples: 1,
            },
        );
    }

    /// Returns `true` when the latest lag of `replica_id` is within the
    /// threshold.
    ///
    /// A replica that has never been recorded is reported as acceptable.
    pub fn is_acceptable(&self, replica_id: &str) -> bool {
        match self.measurements.get(replica_id) {
            Some(measurement) => measurement.current <= self.max_acceptable_lag,
            None => true,
        }
    }

    /// Returns the latest recorded lag of `replica_id`, if any.
    pub fn get_lag(&self, replica_id: &str) -> Option<Duration> {
        self.measurements
            .get(replica_id)
            .map(|measurement| measurement.current)
    }

    /// Returns the ids of all replicas whose latest lag exceeds the threshold.
    ///
    /// The order of the returned ids is unspecified.
    pub fn get_lagging_replicas(&self) -> Vec<&str> {
        self.measurements
            .iter()
            .filter(|(_, measurement)| measurement.current > self.max_acceptable_lag)
            .map(|(id, _)| id.as_str())
            .collect()
    }
}
693
694pub mod lag_queries {
700 use crate::sql::DatabaseType;
701
702 pub fn check_lag_sql(db_type: DatabaseType) -> &'static str {
704 match db_type {
705 DatabaseType::PostgreSQL => {
706 "SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::INT AS lag_seconds"
708 }
709 DatabaseType::MySQL => {
710 "SHOW SLAVE STATUS"
712 }
713 DatabaseType::MSSQL => {
714 "SELECT datediff(s, last_commit_time, getdate()) AS lag_seconds \
716 FROM sys.dm_hadr_database_replica_states \
717 WHERE is_local = 1"
718 }
719 DatabaseType::SQLite => {
720 "SELECT 0 AS lag_seconds"
722 }
723 }
724 }
725
726 pub fn is_primary_sql(db_type: DatabaseType) -> &'static str {
728 match db_type {
729 DatabaseType::PostgreSQL => "SELECT NOT pg_is_in_recovery() AS is_primary",
730 DatabaseType::MySQL => "SELECT @@read_only = 0 AS is_primary",
731 DatabaseType::MSSQL => {
732 "SELECT CASE WHEN role = 1 THEN 1 ELSE 0 END AS is_primary \
733 FROM sys.dm_hadr_availability_replica_states \
734 WHERE is_local = 1"
735 }
736 DatabaseType::SQLite => "SELECT 1 AS is_primary",
737 }
738 }
739
740 pub fn replica_status_sql(db_type: DatabaseType) -> &'static str {
742 match db_type {
743 DatabaseType::PostgreSQL => {
744 "SELECT \
745 pg_is_in_recovery() AS is_replica, \
746 pg_last_wal_receive_lsn() AS receive_lsn, \
747 pg_last_wal_replay_lsn() AS replay_lsn"
748 }
749 DatabaseType::MySQL => "SHOW REPLICA STATUS",
750 DatabaseType::MSSQL => {
751 "SELECT synchronization_state_desc, synchronization_health_desc \
752 FROM sys.dm_hadr_database_replica_states \
753 WHERE is_local = 1"
754 }
755 DatabaseType::SQLite => "SELECT 'primary' AS status",
756 }
757 }
758}
759
760pub mod mongodb {
766 use serde::{Deserialize, Serialize};
767 use serde_json::Value as JsonValue;
768
769 use super::ReadPreference;
770
771 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
773 pub enum ReadConcern {
774 Local,
776 Majority,
778 Linearizable,
780 Snapshot,
782 Available,
784 }
785
786 impl ReadConcern {
787 pub fn as_str(&self) -> &'static str {
789 match self {
790 Self::Local => "local",
791 Self::Majority => "majority",
792 Self::Linearizable => "linearizable",
793 Self::Snapshot => "snapshot",
794 Self::Available => "available",
795 }
796 }
797 }
798
799 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
801 pub enum WriteConcern {
802 W1,
804 Majority,
806 W(u32),
808 Tag(String),
810 }
811
812 impl WriteConcern {
813 pub fn to_options(&self) -> JsonValue {
815 match self {
816 Self::W1 => serde_json::json!({ "w": 1 }),
817 Self::Majority => serde_json::json!({ "w": "majority" }),
818 Self::W(n) => serde_json::json!({ "w": n }),
819 Self::Tag(tag) => serde_json::json!({ "w": tag }),
820 }
821 }
822 }
823
824 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
826 pub struct MongoReadPreference {
827 pub mode: ReadPreference,
829 pub max_staleness_seconds: Option<u32>,
831 pub tag_sets: Vec<serde_json::Map<String, JsonValue>>,
833 pub hedge: Option<bool>,
835 }
836
837 impl MongoReadPreference {
838 pub fn new(mode: ReadPreference) -> Self {
840 Self {
841 mode,
842 max_staleness_seconds: None,
843 tag_sets: Vec::new(),
844 hedge: None,
845 }
846 }
847
848 pub fn max_staleness(mut self, seconds: u32) -> Self {
850 self.max_staleness_seconds = Some(seconds);
851 self
852 }
853
854 pub fn tag_set(mut self, tags: serde_json::Map<String, JsonValue>) -> Self {
856 self.tag_sets.push(tags);
857 self
858 }
859
860 pub fn hedged(mut self) -> Self {
862 self.hedge = Some(true);
863 self
864 }
865
866 pub fn to_connection_options(&self) -> String {
868 let mut opts = vec![format!("readPreference={}", self.mode.to_mongodb())];
869
870 if let Some(staleness) = self.max_staleness_seconds {
871 opts.push(format!("maxStalenessSeconds={}", staleness));
872 }
873
874 opts.join("&")
875 }
876
877 pub fn to_command_options(&self) -> JsonValue {
879 let mut opts = serde_json::Map::new();
880 opts.insert(
881 "mode".to_string(),
882 serde_json::json!(self.mode.to_mongodb()),
883 );
884
885 if let Some(staleness) = self.max_staleness_seconds {
886 opts.insert(
887 "maxStalenessSeconds".to_string(),
888 serde_json::json!(staleness),
889 );
890 }
891
892 if !self.tag_sets.is_empty() {
893 opts.insert("tagSets".to_string(), serde_json::json!(self.tag_sets));
894 }
895
896 if let Some(hedge) = self.hedge {
897 opts.insert("hedge".to_string(), serde_json::json!({ "enabled": hedge }));
898 }
899
900 serde_json::json!(opts)
901 }
902 }
903
904 #[derive(Debug, Clone, Serialize, Deserialize)]
906 pub struct ReplicaSetStatus {
907 pub set: String,
909 pub members: Vec<MemberStatus>,
911 }
912
913 #[derive(Debug, Clone, Serialize, Deserialize)]
915 pub struct MemberStatus {
916 pub id: u32,
918 pub name: String,
920 pub state_str: String,
922 pub health: f64,
924 #[serde(default)]
926 pub lag_seconds: Option<i64>,
927 }
928
929 impl MemberStatus {
930 pub fn is_primary(&self) -> bool {
932 self.state_str == "PRIMARY"
933 }
934
935 pub fn is_secondary(&self) -> bool {
937 self.state_str == "SECONDARY"
938 }
939
940 pub fn is_healthy(&self) -> bool {
942 self.health >= 1.0
943 }
944 }
945}
946
#[cfg(test)]
mod tests {
    use super::*;

    /// Marks `id` healthy with the given latency in milliseconds and optional lag.
    fn mark_healthy(
        router: &mut ConnectionRouter,
        id: &str,
        latency_ms: u64,
        lag: Option<Duration>,
    ) {
        router.update_health(
            id,
            HealthStatus::Healthy,
            Some(Duration::from_millis(latency_ms)),
            lag,
        );
    }

    /// Marks `pg1` (5 ms, no lag) and `pg2` (10 ms, 1 s lag) healthy.
    fn mark_pair_healthy(router: &mut ConnectionRouter) {
        mark_healthy(router, "pg1", 5, None);
        mark_healthy(router, "pg2", 10, Some(Duration::from_secs(1)));
    }

    #[test]
    fn test_replica_config() {
        let primary =
            ReplicaConfig::primary("pg1", "postgres://primary:5432/db").with_region("us-east-1");

        assert_eq!(primary.role, ReplicaRole::Primary);
        assert_eq!(primary.region.as_deref(), Some("us-east-1"));
    }

    #[test]
    fn test_replica_set_builder() {
        let builder = ReplicaSetConfig::new("myapp")
            .primary("pg1", "postgres://primary:5432/db")
            .secondary("pg2", "postgres://secondary1:5432/db")
            .secondary("pg3", "postgres://secondary2:5432/db")
            .read_preference(ReadPreference::SecondaryPreferred);
        let config = builder.build();

        assert_eq!(config.name, "myapp");
        assert_eq!(config.replicas.len(), 3);
        assert!(config.primary().is_some());
        assert_eq!(config.secondaries().count(), 2);
    }

    #[test]
    fn test_read_preference_mongodb() {
        assert_eq!(ReadPreference::Primary.to_mongodb(), "primary");
        assert_eq!(
            ReadPreference::SecondaryPreferred.to_mongodb(),
            "secondaryPreferred"
        );
        assert_eq!(ReadPreference::Nearest.to_mongodb(), "nearest");
    }

    #[test]
    fn test_connection_router_write() {
        let config = ReplicaSetConfig::new("test")
            .primary("pg1", "postgres://primary:5432/db")
            .secondary("pg2", "postgres://secondary:5432/db")
            .build();

        let mut router = ConnectionRouter::new(config);
        mark_pair_healthy(&mut router);

        // Writes go to the primary regardless of read preference.
        let target = router.route(QueryType::Write, None).unwrap();
        assert_eq!(target.id, "pg1");
    }

    #[test]
    fn test_connection_router_read_secondary() {
        let config = ReplicaSetConfig::new("test")
            .primary("pg1", "postgres://primary:5432/db")
            .secondary("pg2", "postgres://secondary:5432/db")
            .read_preference(ReadPreference::Secondary)
            .build();

        let mut router = ConnectionRouter::new(config);
        mark_pair_healthy(&mut router);

        // The default preference of the set sends reads to the secondary.
        let target = router.route(QueryType::Read, None).unwrap();
        assert_eq!(target.id, "pg2");
    }

    #[test]
    fn test_lag_monitor() {
        let mut monitor = LagMonitor::new(Duration::from_secs(10));

        monitor.record("pg2", Duration::from_secs(5));
        assert!(monitor.is_acceptable("pg2"));

        monitor.record("pg3", Duration::from_secs(15));
        assert!(!monitor.is_acceptable("pg3"));

        let lagging = monitor.get_lagging_replicas();
        assert_eq!(lagging, vec!["pg3"]);
    }

    #[test]
    fn test_failover() {
        let pg2 =
            ReplicaConfig::secondary("pg2", "postgres://secondary1:5432/db").with_priority(80);
        let pg3 =
            ReplicaConfig::secondary("pg3", "postgres://secondary2:5432/db").with_priority(60);
        let config = ReplicaSetConfig::new("test")
            .primary("pg1", "postgres://primary:5432/db")
            .replica(pg2)
            .replica(pg3)
            .build();

        let mut router = ConnectionRouter::new(config);
        router.update_health("pg1", HealthStatus::Unhealthy, None, None);
        mark_healthy(&mut router, "pg2", 10, None);
        mark_healthy(&mut router, "pg3", 15, None);

        // pg2 has the higher priority of the two healthy secondaries.
        let new_primary = router.initiate_failover().unwrap();
        assert_eq!(new_primary, "pg2");
    }

    mod mongodb_tests {
        use super::super::mongodb::*;
        use super::*;

        #[test]
        fn test_read_concern() {
            assert_eq!(ReadConcern::Majority.as_str(), "majority");
            assert_eq!(ReadConcern::Local.as_str(), "local");
        }

        #[test]
        fn test_write_concern() {
            let majority = WriteConcern::Majority.to_options();
            assert_eq!(majority["w"], "majority");

            let three = WriteConcern::W(3).to_options();
            assert_eq!(three["w"], 3);
        }

        #[test]
        fn test_mongo_read_preference() {
            let pref = MongoReadPreference::new(ReadPreference::SecondaryPreferred)
                .max_staleness(90)
                .hedged();

            let conn_opts = pref.to_connection_options();
            assert!(conn_opts.contains("readPreference=secondaryPreferred"));
            assert!(conn_opts.contains("maxStalenessSeconds=90"));

            let cmd_opts = pref.to_command_options();
            assert_eq!(cmd_opts["mode"], "secondaryPreferred");
            assert_eq!(cmd_opts["maxStalenessSeconds"], 90);
        }
    }
}