1use crate::config::PeerConfig;
57use crate::error::{ReplicationError, Result};
58use crate::metrics;
59use crate::resilience::RetryConfig;
60use redis::aio::ConnectionManager;
61use redis::Client;
62use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
63use std::sync::Arc;
64use std::time::{Duration, Instant};
65use tokio::sync::RwLock;
66use tokio::time::timeout;
67use tracing::{error, info, warn};
68
/// Lifecycle state of a single peer connection.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PeerState {
    /// No connection is held. This is the initial state and the state set by
    /// `mark_disconnected`.
    Disconnected,
    /// A `connect` call has started and has not yet finished.
    Connecting,
    /// A connection manager was obtained and stored.
    Connected,
    /// `connect` gave up after exhausting its retry attempts.
    Backoff,
    /// NOTE(review): never assigned in this file; presumably marks a peer
    /// that has been administratively turned off — confirm against callers.
    Disabled,
}
85
/// State of the per-peer circuit breaker as reported by
/// `PeerConnection::circuit_state`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PeerCircuitState {
    /// Failures are below the configured threshold, or the reset timeout has
    /// elapsed since the breaker opened.
    Closed,
    /// The failure threshold has been reached and the reset timeout has not
    /// yet elapsed.
    Open,
}
97
/// A Merkle root fetched from the peer together with its expiry instant.
struct CachedMerkleRoot {
    /// The cached root hash; `None` records that the peer returned no value
    /// for the root key.
    root: Option<[u8; 32]>,
    /// Instant from which the cached value is considered stale and is no
    /// longer served by `get_merkle_root`.
    expires_at: Instant,
}
109
/// How long a fetched Merkle root is served from the in-memory cache before
/// `get_merkle_root` queries the peer again.
const MERKLE_CACHE_TTL: Duration = Duration::from_secs(5);
113
/// Connection to a single replication peer, including its connection state,
/// failure tracking for the circuit breaker, and a short-lived Merkle-root
/// cache.
pub struct PeerConnection {
    /// Static configuration for this peer (node id, Redis URL, circuit
    /// breaker settings).
    pub config: PeerConfig,
    /// The Redis connection manager; `Some` after a successful `connect`,
    /// reset to `None` by `mark_disconnected`.
    conn: RwLock<Option<ConnectionManager>>,
    /// Current lifecycle state of the connection.
    state: RwLock<PeerState>,
    /// Unix-epoch milliseconds of the last recorded success; `0` means no
    /// success has been recorded yet.
    last_success: AtomicU64,
    /// Failures counted since the last success; compared against
    /// `config.circuit_failure_threshold` by the circuit breaker.
    failure_count: AtomicU64,
    /// Set by `shutdown`; checked by the `connect` retry loop.
    shutdown: AtomicBool,
    /// When the circuit breaker opened, if it is currently considered opened.
    circuit_opened_at: RwLock<Option<Instant>>,
    /// Cached result of the last Merkle-root fetch.
    merkle_root_cache: RwLock<Option<CachedMerkleRoot>>,
}
139
140impl PeerConnection {
141 pub fn new(config: PeerConfig) -> Self {
143 let initial_state = PeerState::Disconnected;
145
146 Self {
147 config,
148 conn: RwLock::new(None),
149 state: RwLock::new(initial_state),
150 last_success: AtomicU64::new(0),
151 failure_count: AtomicU64::new(0),
152 shutdown: AtomicBool::new(false),
153 circuit_opened_at: RwLock::new(None),
154 merkle_root_cache: RwLock::new(None),
155 }
156 }
157
158 pub fn node_id(&self) -> &str {
160 &self.config.node_id
161 }
162
    /// Returns the CDC stream key for this peer, as computed by
    /// `PeerConfig::cdc_stream_key`.
    pub fn cdc_stream_key(&self) -> String {
        self.config.cdc_stream_key()
    }
167
168 pub async fn state(&self) -> PeerState {
170 *self.state.read().await
171 }
172
173 pub async fn is_connected(&self) -> bool {
175 self.state().await == PeerState::Connected
176 }
177
178 pub async fn circuit_state(&self) -> PeerCircuitState {
184 let failures = self.failure_count.load(Ordering::Relaxed);
185 let threshold = self.config.circuit_failure_threshold as u64;
186
187 if failures >= threshold {
188 if let Some(opened_at) = *self.circuit_opened_at.read().await {
190 let reset_timeout = Duration::from_secs(self.config.circuit_reset_timeout_sec);
191 if opened_at.elapsed() >= reset_timeout {
192 return PeerCircuitState::Closed;
194 }
195 }
196 PeerCircuitState::Open
197 } else {
198 PeerCircuitState::Closed
199 }
200 }
201
202 pub async fn is_circuit_open(&self) -> bool {
204 self.circuit_state().await == PeerCircuitState::Open
205 }
206
207 pub async fn record_success(&self) {
209 self.failure_count.store(0, Ordering::Relaxed);
210 self.last_success.store(
211 std::time::SystemTime::now()
212 .duration_since(std::time::UNIX_EPOCH)
213 .unwrap_or_default()
214 .as_millis() as u64,
215 Ordering::Relaxed,
216 );
217 *self.circuit_opened_at.write().await = None;
219 }
220
221 pub async fn record_failure(&self) {
223 let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
224 let threshold = self.config.circuit_failure_threshold as u64;
225
226 if failures >= threshold {
227 let mut opened_at = self.circuit_opened_at.write().await;
229 if opened_at.is_none() {
230 *opened_at = Some(Instant::now());
231 warn!(
232 peer_id = %self.config.node_id,
233 failures,
234 threshold,
235 reset_timeout_sec = self.config.circuit_reset_timeout_sec,
236 "Circuit breaker opened for peer"
237 );
238 metrics::record_peer_circuit_state(&self.config.node_id, "open");
239 }
240 }
241 }
242
243 pub async fn connect(&self, retry_config: &RetryConfig) -> Result<()> {
245 *self.state.write().await = PeerState::Connecting;
246 info!(peer_id = %self.config.node_id, url = %self.config.redis_url, "Connecting to peer");
247
248 let client = Client::open(self.config.redis_url.as_str()).map_err(|e| {
249 ReplicationError::PeerConnection {
250 peer_id: self.config.node_id.clone(),
251 message: format!("Invalid Redis URL: {}", e),
252 }
253 })?;
254
255 let mut attempt = 0;
256 let mut delay = retry_config.initial_delay;
257
258 loop {
259 if self.shutdown.load(Ordering::Acquire) {
260 return Err(ReplicationError::Shutdown);
261 }
262
263 attempt += 1;
264
265 let conn_result = timeout(
267 retry_config.connection_timeout,
268 client.get_connection_manager(),
269 )
270 .await;
271
272 match conn_result {
273 Ok(Ok(conn)) => {
274 *self.conn.write().await = Some(conn);
275 *self.state.write().await = PeerState::Connected;
276 self.failure_count.store(0, Ordering::Release);
277 self.last_success
278 .store(epoch_millis(), Ordering::Release);
279
280 metrics::record_peer_connection(&self.config.node_id, true);
281 metrics::record_peer_state(&self.config.node_id, "connected");
282
283 if attempt > 1 {
284 info!(
285 peer_id = %self.config.node_id,
286 attempt,
287 "Connected to peer after retry"
288 );
289 } else {
290 info!(peer_id = %self.config.node_id, "Connected to peer");
291 }
292 return Ok(());
293 }
294 Ok(Err(e)) => {
295 self.failure_count.fetch_add(1, Ordering::AcqRel);
297 let err_msg = format!("{}", e);
298
299 if attempt >= retry_config.max_attempts {
300 *self.state.write().await = PeerState::Backoff;
301 metrics::record_peer_connection(&self.config.node_id, false);
302 metrics::record_peer_state(&self.config.node_id, "backoff");
303 error!(
304 peer_id = %self.config.node_id,
305 attempt,
306 error = %e,
307 "Failed to connect after max retries"
308 );
309 return Err(ReplicationError::PeerConnection {
310 peer_id: self.config.node_id.clone(),
311 message: format!("Connection failed after {} attempts: {}", attempt, err_msg),
312 });
313 }
314
315 warn!(
316 peer_id = %self.config.node_id,
317 attempt,
318 delay_ms = delay.as_millis(),
319 error = %e,
320 "Connection attempt failed, retrying"
321 );
322
323 tokio::time::sleep(delay).await;
324 delay = std::cmp::min(
325 Duration::from_secs_f64(delay.as_secs_f64() * retry_config.backoff_factor),
326 retry_config.max_delay,
327 );
328 }
329 Err(_) => {
330 self.failure_count.fetch_add(1, Ordering::AcqRel);
332
333 if attempt >= retry_config.max_attempts {
334 *self.state.write().await = PeerState::Backoff;
335 error!(
336 peer_id = %self.config.node_id,
337 attempt,
338 timeout_ms = retry_config.connection_timeout.as_millis(),
339 "Connection timed out after max retries"
340 );
341 return Err(ReplicationError::PeerConnection {
342 peer_id: self.config.node_id.clone(),
343 message: format!(
344 "Connection timed out after {} attempts ({}ms timeout)",
345 attempt,
346 retry_config.connection_timeout.as_millis()
347 ),
348 });
349 }
350
351 warn!(
352 peer_id = %self.config.node_id,
353 attempt,
354 delay_ms = delay.as_millis(),
355 timeout_ms = retry_config.connection_timeout.as_millis(),
356 "Connection attempt timed out, retrying"
357 );
358
359 tokio::time::sleep(delay).await;
360 delay = std::cmp::min(
361 Duration::from_secs_f64(delay.as_secs_f64() * retry_config.backoff_factor),
362 retry_config.max_delay,
363 );
364 }
365 }
366 }
367 }
368
369 pub async fn connection(&self) -> Option<ConnectionManager> {
373 self.conn.read().await.clone()
374 }
375
376 pub async fn ensure_connected(&self) -> Result<ConnectionManager> {
384 if let Some(conn) = self.connection().await {
386 return Ok(conn);
387 }
388
389 self.connect(&RetryConfig::default()).await?;
391
392 self.connection().await.ok_or_else(|| {
393 ReplicationError::PeerConnection {
394 peer_id: self.config.node_id.clone(),
395 message: "Connection lost immediately after connect".to_string(),
396 }
397 })
398 }
399
    /// Returns the current value of the failure counter (reset to zero by
    /// `record_success` and by a successful `connect`).
    pub fn failure_count(&self) -> u64 {
        self.failure_count.load(Ordering::Acquire)
    }
404
405 pub fn millis_since_success(&self) -> u64 {
407 let last = self.last_success.load(Ordering::Acquire);
408 if last == 0 {
409 return u64::MAX;
410 }
411 epoch_millis().saturating_sub(last)
412 }
413
414 pub async fn mark_disconnected(&self) {
416 *self.conn.write().await = None;
417 *self.state.write().await = PeerState::Disconnected;
418 metrics::record_peer_state(&self.config.node_id, "disconnected");
419 warn!(peer_id = %self.config.node_id, "Connection marked as disconnected");
420 }
421
    /// Sets the shutdown flag for this peer.
    ///
    /// The `connect` retry loop checks the flag before each attempt and
    /// returns `ReplicationError::Shutdown` once it is set.
    pub fn shutdown(&self) {
        self.shutdown.store(true, Ordering::Release);
    }
426
427 pub async fn ping(&self) -> Result<Duration> {
436 let conn = self.connection().await.ok_or_else(|| {
437 ReplicationError::PeerConnection {
438 peer_id: self.config.node_id.clone(),
439 message: "Not connected".to_string(),
440 }
441 })?;
442
443 let mut conn = conn;
444 let start = std::time::Instant::now();
445
446 let result: String = redis::cmd("PING")
447 .query_async(&mut conn)
448 .await
449 .map_err(|e| ReplicationError::PeerConnection {
450 peer_id: self.config.node_id.clone(),
451 message: format!("PING failed: {}", e),
452 })?;
453
454 let latency = start.elapsed();
455
456 if result == "PONG" {
457 self.record_success().await;
458 metrics::record_peer_ping(&self.config.node_id, true);
459 metrics::record_peer_ping_latency(&self.config.node_id, latency);
460 Ok(latency)
461 } else {
462 self.record_failure().await;
463 metrics::record_peer_ping(&self.config.node_id, false);
464 Err(ReplicationError::PeerConnection {
465 peer_id: self.config.node_id.clone(),
466 message: format!("Unexpected PING response: {}", result),
467 })
468 }
469 }
470
471 pub async fn get_merkle_root(&self) -> Result<Option<[u8; 32]>> {
480 {
482 let cache = self.merkle_root_cache.read().await;
483 if let Some(ref cached) = *cache {
484 if Instant::now() < cached.expires_at {
485 return Ok(cached.root);
486 }
487 }
488 }
489
490 let start = Instant::now();
492 let conn = self.connection().await.ok_or_else(|| {
493 ReplicationError::PeerConnection {
494 peer_id: self.config.node_id.clone(),
495 message: "Not connected".to_string(),
496 }
497 })?;
498
499 let mut conn = conn;
500 let result: Option<String> = redis::cmd("GET")
501 .arg("merkle:hash:")
502 .query_async(&mut conn)
503 .await
504 .map_err(|e| ReplicationError::PeerConnection {
505 peer_id: self.config.node_id.clone(),
506 message: format!("Failed to get Merkle root: {}", e),
507 })?;
508
509 metrics::record_peer_operation_latency(&self.config.node_id, "get_merkle_root", start.elapsed());
510
511 let root = match result {
512 Some(hex_str) => {
513 let bytes = hex::decode(&hex_str).map_err(|e| {
514 ReplicationError::PeerConnection {
515 peer_id: self.config.node_id.clone(),
516 message: format!("Invalid Merkle root hex: {}", e),
517 }
518 })?;
519 let arr: [u8; 32] = bytes.try_into().map_err(|_| {
520 ReplicationError::PeerConnection {
521 peer_id: self.config.node_id.clone(),
522 message: "Merkle root is not 32 bytes".to_string(),
523 }
524 })?;
525 Some(arr)
526 }
527 None => None,
528 };
529
530 {
532 let mut cache = self.merkle_root_cache.write().await;
533 *cache = Some(CachedMerkleRoot {
534 root,
535 expires_at: Instant::now() + MERKLE_CACHE_TTL,
536 });
537 }
538
539 Ok(root)
540 }
541
542 pub async fn invalidate_merkle_cache(&self) {
544 let mut cache = self.merkle_root_cache.write().await;
545 *cache = None;
546 }
547
548 pub async fn get_merkle_children(&self, path: &str) -> Result<Vec<(String, [u8; 32])>> {
550 let start = Instant::now();
551 let conn = self.connection().await.ok_or_else(|| {
552 ReplicationError::PeerConnection {
553 peer_id: self.config.node_id.clone(),
554 message: "Not connected".to_string(),
555 }
556 })?;
557
558 let mut conn = conn;
559 let key = format!("merkle:children:{}", path);
560
561 let items: Vec<(String, f64)> = redis::cmd("ZRANGE")
563 .arg(&key)
564 .arg(0)
565 .arg(-1)
566 .arg("WITHSCORES")
567 .query_async(&mut conn)
568 .await
569 .map_err(|e| ReplicationError::PeerConnection {
570 peer_id: self.config.node_id.clone(),
571 message: format!("Failed to get Merkle children: {}", e),
572 })?;
573
574 let mut children = Vec::with_capacity(items.len());
575 for (child_name, _score) in items {
576 let child_path = if path.is_empty() {
578 child_name.clone()
579 } else {
580 format!("{}/{}", path, child_name)
581 };
582 let hash_key = format!("merkle:hash:{}", child_path);
583
584 let hex_hash: Option<String> = redis::cmd("GET")
585 .arg(&hash_key)
586 .query_async(&mut conn)
587 .await
588 .map_err(|e| ReplicationError::PeerConnection {
589 peer_id: self.config.node_id.clone(),
590 message: format!("Failed to get child hash: {}", e),
591 })?;
592
593 if let Some(hex_str) = hex_hash {
594 let bytes = hex::decode(&hex_str).map_err(|e| {
595 ReplicationError::PeerConnection {
596 peer_id: self.config.node_id.clone(),
597 message: format!("Invalid child hash hex: {}", e),
598 }
599 })?;
600 let arr: [u8; 32] = bytes.try_into().map_err(|_| {
601 ReplicationError::PeerConnection {
602 peer_id: self.config.node_id.clone(),
603 message: "Child hash is not 32 bytes".to_string(),
604 }
605 })?;
606 children.push((child_name, arr));
607 }
608 }
609
610 metrics::record_peer_operation_latency(&self.config.node_id, "get_merkle_children", start.elapsed());
611
612 Ok(children)
613 }
614
615 pub async fn get_item(&self, key: &str) -> Result<Option<Vec<u8>>> {
617 let start = Instant::now();
618 let conn = self.connection().await.ok_or_else(|| {
619 ReplicationError::PeerConnection {
620 peer_id: self.config.node_id.clone(),
621 message: "Not connected".to_string(),
622 }
623 })?;
624
625 let mut conn = conn;
626 let redis_key = format!("item:{}", key);
627
628 let data: Option<Vec<u8>> = redis::cmd("GET")
629 .arg(&redis_key)
630 .query_async(&mut conn)
631 .await
632 .map_err(|e| ReplicationError::PeerConnection {
633 peer_id: self.config.node_id.clone(),
634 message: format!("Failed to get item: {}", e),
635 })?;
636
637 metrics::record_peer_operation_latency(&self.config.node_id, "get_item", start.elapsed());
638
639 Ok(data)
640 }
641}
642
/// Registry of peer connections keyed by node id.
pub struct PeerManager {
    /// Registered peers, keyed by `PeerConfig::node_id`.
    peers: dashmap::DashMap<String, Arc<PeerConnection>>,
    /// Retry settings passed to `PeerConnection::connect` by `connect_all`.
    retry_config: RetryConfig,
}
650
651impl PeerManager {
652 pub fn new(retry_config: RetryConfig) -> Self {
654 Self {
655 peers: dashmap::DashMap::new(),
656 retry_config,
657 }
658 }
659
660 pub fn add_peer(&self, config: PeerConfig) {
662 let node_id = config.node_id.clone();
663 let conn = Arc::new(PeerConnection::new(config));
664 self.peers.insert(node_id, conn);
665 }
666
667 pub fn get(&self, node_id: &str) -> Option<Arc<PeerConnection>> {
669 self.peers.get(node_id).map(|r| r.value().clone())
670 }
671
672 pub fn all(&self) -> Vec<Arc<PeerConnection>> {
674 self.peers.iter().map(|r| r.value().clone()).collect()
675 }
676
677 pub async fn connect_all(&self) -> Vec<Result<()>> {
679 let mut results = Vec::new();
680 for peer in self.all() {
681 let result = peer.connect(&self.retry_config).await;
682 results.push(result);
683 }
684 results
685 }
686
687 pub fn remove_peer(&self, node_id: &str) {
689 if let Some((_, peer)) = self.peers.remove(node_id) {
690 peer.shutdown();
691 }
692 }
693
694 pub fn shutdown_all(&self) {
696 for peer in self.peers.iter() {
697 peer.shutdown();
698 }
699 }
700
701 pub async fn connected_count(&self) -> usize {
703 let mut count = 0;
704 for peer in self.all() {
705 if peer.is_connected().await {
706 count += 1;
707 }
708 }
709 count
710 }
711}
712
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn epoch_millis() -> u64 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
720
#[cfg(test)]
mod tests {
    //! Unit tests for `PeerConnection` and `PeerManager`.
    //!
    //! None of these tests needs a reachable Redis server: they only
    //! exercise local bookkeeping and the "not connected" error paths.

    use super::*;

    /// URL used by most fixtures; it is never dialled.
    const LOCAL_URL: &str = "redis://localhost:6379";

    /// Builds a disconnected peer named `test-peer`.
    fn test_peer() -> PeerConnection {
        PeerConnection::new(PeerConfig::for_testing("test-peer", LOCAL_URL))
    }

    /// Builds a manager with the testing retry configuration and two peers.
    fn two_peer_manager() -> PeerManager {
        let manager = PeerManager::new(RetryConfig::testing());
        manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
        manager.add_peer(PeerConfig::for_testing("peer-2", "redis://peer2:6379"));
        manager
    }

    /// Builds a manager with the testing retry configuration and one peer.
    fn one_peer_manager() -> PeerManager {
        let manager = PeerManager::new(RetryConfig::testing());
        manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
        manager
    }

    #[test]
    fn test_peer_connection_new() {
        let peer = test_peer();

        assert_eq!(peer.node_id(), "test-peer");
        assert_eq!(peer.cdc_stream_key(), "__local__:cdc");
    }

    #[test]
    fn test_peer_state_variants() {
        assert_eq!(PeerState::Disconnected, PeerState::Disconnected);
        assert_ne!(PeerState::Connected, PeerState::Disconnected);
        assert_ne!(PeerState::Connecting, PeerState::Backoff);
        assert_eq!(PeerState::Disabled, PeerState::Disabled);
    }

    #[test]
    fn test_peer_circuit_state_variants() {
        assert_eq!(PeerCircuitState::Closed, PeerCircuitState::Closed);
        assert_ne!(PeerCircuitState::Open, PeerCircuitState::Closed);
    }

    #[tokio::test]
    async fn test_peer_initial_state() {
        let peer = test_peer();

        assert_eq!(peer.state().await, PeerState::Disconnected);
        assert!(!peer.is_connected().await);
        assert_eq!(peer.failure_count(), 0);
    }

    #[tokio::test]
    async fn test_peer_connection_not_connected() {
        let peer = test_peer();

        assert!(peer.connection().await.is_none());
    }

    #[tokio::test]
    async fn test_peer_circuit_breaker() {
        let peer = PeerConnection::new(PeerConfig {
            node_id: "test-peer".to_string(),
            redis_url: "redis://localhost:6379".to_string(),
            priority: 0,
            circuit_failure_threshold: 3,
            circuit_reset_timeout_sec: 1,
            redis_prefix: None,
        });

        // Starts closed.
        assert_eq!(peer.circuit_state().await, PeerCircuitState::Closed);
        assert!(!peer.is_circuit_open().await);

        // Stays closed below the threshold.
        peer.record_failure().await;
        assert_eq!(peer.circuit_state().await, PeerCircuitState::Closed);
        peer.record_failure().await;
        assert_eq!(peer.circuit_state().await, PeerCircuitState::Closed);

        // Opens on the third failure.
        peer.record_failure().await;
        assert_eq!(peer.circuit_state().await, PeerCircuitState::Open);
        assert!(peer.is_circuit_open().await);

        // Reported closed again once the one-second reset timeout has passed.
        tokio::time::sleep(Duration::from_secs(2)).await;
        assert_eq!(peer.circuit_state().await, PeerCircuitState::Closed);

        // A success fully resets the breaker.
        peer.record_success().await;
        assert_eq!(peer.failure_count(), 0);
        assert!(!peer.is_circuit_open().await);
    }

    #[tokio::test]
    async fn test_peer_record_success_updates_last_success() {
        let peer = test_peer();

        assert_eq!(peer.last_success.load(Ordering::Acquire), 0);

        for _ in 0..2 {
            peer.record_failure().await;
        }
        assert_eq!(peer.failure_count(), 2);

        peer.record_success().await;
        assert_eq!(peer.failure_count(), 0);
        assert!(peer.last_success.load(Ordering::Acquire) > 0);
    }

    #[tokio::test]
    async fn test_peer_millis_since_success_no_success() {
        let peer = test_peer();

        assert_eq!(peer.millis_since_success(), u64::MAX);
    }

    #[tokio::test]
    async fn test_peer_millis_since_success_after_success() {
        let peer = test_peer();

        peer.record_success().await;
        tokio::time::sleep(Duration::from_millis(100)).await;

        let elapsed_ms = peer.millis_since_success();
        assert!(elapsed_ms >= 100);
        assert!(elapsed_ms < 1000);
    }

    #[tokio::test]
    async fn test_peer_shutdown_flag() {
        let peer = test_peer();

        assert!(!peer.shutdown.load(Ordering::Acquire));
        peer.shutdown();
        assert!(peer.shutdown.load(Ordering::Acquire));
    }

    #[tokio::test]
    async fn test_peer_mark_disconnected() {
        let peer = test_peer();

        assert_eq!(peer.state().await, PeerState::Disconnected);

        peer.mark_disconnected().await;

        assert_eq!(peer.state().await, PeerState::Disconnected);
        assert!(peer.connection().await.is_none());
    }

    #[tokio::test]
    async fn test_peer_invalidate_merkle_cache() {
        let peer = test_peer();

        assert!(peer.merkle_root_cache.read().await.is_none());

        // Seed the cache directly; the write guard is released at the end of
        // the statement.
        *peer.merkle_root_cache.write().await = Some(CachedMerkleRoot {
            root: Some([0u8; 32]),
            expires_at: Instant::now() + Duration::from_secs(60),
        });

        assert!(peer.merkle_root_cache.read().await.is_some());

        peer.invalidate_merkle_cache().await;

        assert!(peer.merkle_root_cache.read().await.is_none());
    }

    #[test]
    fn test_peer_manager_add_peer() {
        let manager = two_peer_manager();

        assert_eq!(manager.all().len(), 2);
    }

    #[test]
    fn test_peer_manager_get() {
        let manager = one_peer_manager();

        assert!(manager.get("peer-1").is_some());
        assert!(manager.get("nonexistent").is_none());
    }

    #[test]
    fn test_peer_manager_remove_peer() {
        let manager = one_peer_manager();

        assert!(manager.get("peer-1").is_some());

        manager.remove_peer("peer-1");

        assert!(manager.get("peer-1").is_none());
    }

    #[test]
    fn test_peer_manager_remove_nonexistent() {
        // Removing an unknown peer must not panic.
        let manager = PeerManager::new(RetryConfig::testing());
        manager.remove_peer("nonexistent");
    }

    #[test]
    fn test_peer_manager_shutdown_all() {
        let manager = two_peer_manager();

        manager.shutdown_all();

        for peer in manager.all() {
            assert!(peer.shutdown.load(Ordering::Acquire));
        }
    }

    #[tokio::test]
    async fn test_peer_manager_connected_count_none() {
        let manager = two_peer_manager();

        assert_eq!(manager.connected_count().await, 0);
    }

    #[test]
    fn test_epoch_millis() {
        let millis = epoch_millis();
        // Between 2020-01-01 and 2100-01-01.
        assert!(millis > 1577836800000);
        assert!(millis < 4102444800000);
    }

    #[test]
    fn test_merkle_cache_ttl() {
        assert_eq!(MERKLE_CACHE_TTL, Duration::from_secs(5));
    }

    #[tokio::test]
    async fn test_peer_ping_not_connected() {
        let peer = test_peer();

        let result = peer.ping().await;
        assert!(result.is_err());

        match result {
            Err(ReplicationError::PeerConnection { peer_id, message }) => {
                assert_eq!(peer_id, "test-peer");
                assert!(message.contains("Not connected"));
            }
            _ => panic!("Expected PeerConnection error"),
        }
    }

    #[tokio::test]
    async fn test_peer_get_merkle_root_not_connected() {
        let peer = test_peer();

        assert!(peer.get_merkle_root().await.is_err());
    }

    #[tokio::test]
    async fn test_peer_get_merkle_children_not_connected() {
        let peer = test_peer();

        assert!(peer.get_merkle_children("some/path").await.is_err());
    }

    #[tokio::test]
    async fn test_peer_get_item_not_connected() {
        let peer = test_peer();

        assert!(peer.get_item("some-key").await.is_err());
    }

    #[tokio::test]
    async fn test_peer_ensure_connected_fails() {
        // NOTE(review): despite its name this test never calls
        // `ensure_connected`; it only checks that a freshly built peer is
        // not connected.
        let peer = PeerConnection::new(PeerConfig::for_testing(
            "test-peer",
            "redis://localhost:1",
        ));

        assert!(!peer.is_connected().await);
    }

    #[tokio::test]
    async fn test_peer_circuit_opens_on_threshold() {
        let peer = PeerConnection::new(PeerConfig {
            node_id: "test-peer".to_string(),
            redis_url: "redis://localhost:6379".to_string(),
            priority: 0,
            circuit_failure_threshold: 2,
            circuit_reset_timeout_sec: 30,
            redis_prefix: None,
        });

        peer.record_failure().await;
        assert_eq!(peer.circuit_state().await, PeerCircuitState::Closed);
        assert_eq!(peer.failure_count(), 1);

        peer.record_failure().await;
        assert_eq!(peer.circuit_state().await, PeerCircuitState::Open);
        assert_eq!(peer.failure_count(), 2);
    }
}