1use crate::config::PeerConfig;
57use crate::error::{ReplicationError, Result};
58use crate::metrics;
59use crate::resilience::RetryConfig;
60use redis::aio::ConnectionManager;
61use redis::Client;
62use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
63use std::sync::Arc;
64use std::time::{Duration, Instant};
65use tokio::sync::RwLock;
66use tokio::time::timeout;
67use tracing::{error, info, warn};
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
73pub enum PeerState {
74 Disconnected,
76 Connecting,
78 Connected,
80 Backoff,
82 Disabled,
84}
85
86#[derive(Debug, Clone, Copy, PartialEq, Eq)]
91pub enum PeerCircuitState {
92 Closed,
94 Open,
96}
97
98struct CachedMerkleRoot {
104 root: Option<[u8; 32]>,
106 expires_at: Instant,
108}
109
110const MERKLE_CACHE_TTL: Duration = Duration::from_secs(5);
113
114pub struct PeerConnection {
121 pub config: PeerConfig,
123 conn: RwLock<Option<ConnectionManager>>,
126 query_conn: RwLock<Option<ConnectionManager>>,
129 state: RwLock<PeerState>,
131 last_success: AtomicU64,
133 failure_count: AtomicU64,
135 shutdown: AtomicBool,
137 circuit_opened_at: RwLock<Option<Instant>>,
139 merkle_root_cache: RwLock<Option<CachedMerkleRoot>>,
141}
142
143impl PeerConnection {
144 pub fn new(config: PeerConfig) -> Self {
146 let initial_state = PeerState::Disconnected;
148
149 Self {
150 config,
151 conn: RwLock::new(None),
152 query_conn: RwLock::new(None),
153 state: RwLock::new(initial_state),
154 last_success: AtomicU64::new(0),
155 failure_count: AtomicU64::new(0),
156 shutdown: AtomicBool::new(false),
157 circuit_opened_at: RwLock::new(None),
158 merkle_root_cache: RwLock::new(None),
159 }
160 }
161
162 pub fn node_id(&self) -> &str {
164 &self.config.node_id
165 }
166
167 pub fn cdc_stream_key(&self) -> String {
169 self.config.cdc_stream_key()
170 }
171
172 pub async fn state(&self) -> PeerState {
174 *self.state.read().await
175 }
176
177 pub async fn is_connected(&self) -> bool {
179 self.state().await == PeerState::Connected
180 }
181
182 pub async fn circuit_state(&self) -> PeerCircuitState {
188 let failures = self.failure_count.load(Ordering::Relaxed);
189 let threshold = self.config.circuit_failure_threshold as u64;
190
191 if failures >= threshold {
192 if let Some(opened_at) = *self.circuit_opened_at.read().await {
194 let reset_timeout = Duration::from_secs(self.config.circuit_reset_timeout_sec);
195 if opened_at.elapsed() >= reset_timeout {
196 return PeerCircuitState::Closed;
198 }
199 }
200 PeerCircuitState::Open
201 } else {
202 PeerCircuitState::Closed
203 }
204 }
205
206 pub async fn is_circuit_open(&self) -> bool {
208 self.circuit_state().await == PeerCircuitState::Open
209 }
210
211 pub async fn record_success(&self) {
213 self.failure_count.store(0, Ordering::Relaxed);
214 self.last_success.store(
215 std::time::SystemTime::now()
216 .duration_since(std::time::UNIX_EPOCH)
217 .unwrap_or_default()
218 .as_millis() as u64,
219 Ordering::Relaxed,
220 );
221 *self.circuit_opened_at.write().await = None;
223 }
224
225 pub async fn record_failure(&self) {
227 let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
228 let threshold = self.config.circuit_failure_threshold as u64;
229
230 if failures >= threshold {
231 let mut opened_at = self.circuit_opened_at.write().await;
233 if opened_at.is_none() {
234 *opened_at = Some(Instant::now());
235 warn!(
236 peer_id = %self.config.node_id,
237 failures,
238 threshold,
239 reset_timeout_sec = self.config.circuit_reset_timeout_sec,
240 "Circuit breaker opened for peer"
241 );
242 metrics::record_peer_circuit_state(&self.config.node_id, "open");
243 }
244 }
245 }
246
247 pub async fn connect(&self, retry_config: &RetryConfig) -> Result<()> {
249 *self.state.write().await = PeerState::Connecting;
250 info!(peer_id = %self.config.node_id, url = %self.config.redis_url, "Connecting to peer");
251
252 let client = Client::open(self.config.redis_url.as_str()).map_err(|e| {
253 ReplicationError::PeerConnection {
254 peer_id: self.config.node_id.clone(),
255 message: format!("Invalid Redis URL: {}", e),
256 }
257 })?;
258
259 let mut attempt = 0;
260 let mut delay = retry_config.initial_delay;
261
262 loop {
263 if self.shutdown.load(Ordering::Acquire) {
264 return Err(ReplicationError::Shutdown);
265 }
266
267 attempt += 1;
268
269 let conn_result = timeout(
271 retry_config.connection_timeout,
272 client.get_connection_manager(),
273 )
274 .await;
275
276 match conn_result {
277 Ok(Ok(conn)) => {
278 let query_conn = client.get_connection_manager().await.map_err(|e| {
281 ReplicationError::PeerConnection {
282 peer_id: self.config.node_id.clone(),
283 message: format!("Failed to create query connection: {}", e),
284 }
285 })?;
286
287 *self.conn.write().await = Some(conn);
288 *self.query_conn.write().await = Some(query_conn);
289 *self.state.write().await = PeerState::Connected;
290 self.failure_count.store(0, Ordering::Release);
291 self.last_success
292 .store(epoch_millis(), Ordering::Release);
293
294 metrics::record_peer_connection(&self.config.node_id, true);
295 metrics::record_peer_state(&self.config.node_id, "connected");
296
297 if attempt > 1 {
298 info!(
299 peer_id = %self.config.node_id,
300 attempt,
301 "Connected to peer after retry"
302 );
303 } else {
304 info!(peer_id = %self.config.node_id, "Connected to peer");
305 }
306 return Ok(());
307 }
308 Ok(Err(e)) => {
309 self.failure_count.fetch_add(1, Ordering::AcqRel);
311 let err_msg = format!("{}", e);
312
313 if attempt >= retry_config.max_attempts {
314 *self.state.write().await = PeerState::Backoff;
315 metrics::record_peer_connection(&self.config.node_id, false);
316 metrics::record_peer_state(&self.config.node_id, "backoff");
317 error!(
318 peer_id = %self.config.node_id,
319 attempt,
320 error = %e,
321 "Failed to connect after max retries"
322 );
323 return Err(ReplicationError::PeerConnection {
324 peer_id: self.config.node_id.clone(),
325 message: format!("Connection failed after {} attempts: {}", attempt, err_msg),
326 });
327 }
328
329 warn!(
330 peer_id = %self.config.node_id,
331 attempt,
332 delay_ms = delay.as_millis(),
333 error = %e,
334 "Connection attempt failed, retrying"
335 );
336
337 tokio::time::sleep(delay).await;
338 delay = std::cmp::min(
339 Duration::from_secs_f64(delay.as_secs_f64() * retry_config.backoff_factor),
340 retry_config.max_delay,
341 );
342 }
343 Err(_) => {
344 self.failure_count.fetch_add(1, Ordering::AcqRel);
346
347 if attempt >= retry_config.max_attempts {
348 *self.state.write().await = PeerState::Backoff;
349 error!(
350 peer_id = %self.config.node_id,
351 attempt,
352 timeout_ms = retry_config.connection_timeout.as_millis(),
353 "Connection timed out after max retries"
354 );
355 return Err(ReplicationError::PeerConnection {
356 peer_id: self.config.node_id.clone(),
357 message: format!(
358 "Connection timed out after {} attempts ({}ms timeout)",
359 attempt,
360 retry_config.connection_timeout.as_millis()
361 ),
362 });
363 }
364
365 warn!(
366 peer_id = %self.config.node_id,
367 attempt,
368 delay_ms = delay.as_millis(),
369 timeout_ms = retry_config.connection_timeout.as_millis(),
370 "Connection attempt timed out, retrying"
371 );
372
373 tokio::time::sleep(delay).await;
374 delay = std::cmp::min(
375 Duration::from_secs_f64(delay.as_secs_f64() * retry_config.backoff_factor),
376 retry_config.max_delay,
377 );
378 }
379 }
380 }
381 }
382
383 pub async fn connection(&self) -> Option<ConnectionManager> {
387 self.conn.read().await.clone()
388 }
389
390 pub async fn query_connection(&self) -> Option<ConnectionManager> {
397 self.query_conn.read().await.clone()
398 }
399
400 pub async fn ensure_connected(&self) -> Result<ConnectionManager> {
408 if let Some(conn) = self.connection().await {
410 return Ok(conn);
411 }
412
413 self.connect(&RetryConfig::default()).await?;
415
416 self.connection().await.ok_or_else(|| {
417 ReplicationError::PeerConnection {
418 peer_id: self.config.node_id.clone(),
419 message: "Connection lost immediately after connect".to_string(),
420 }
421 })
422 }
423
424 pub fn failure_count(&self) -> u64 {
426 self.failure_count.load(Ordering::Acquire)
427 }
428
429 pub fn millis_since_success(&self) -> u64 {
431 let last = self.last_success.load(Ordering::Acquire);
432 if last == 0 {
433 return u64::MAX;
434 }
435 epoch_millis().saturating_sub(last)
436 }
437
438 pub async fn mark_disconnected(&self) {
440 *self.conn.write().await = None;
441 *self.state.write().await = PeerState::Disconnected;
442 metrics::record_peer_state(&self.config.node_id, "disconnected");
443 warn!(peer_id = %self.config.node_id, "Connection marked as disconnected");
444 }
445
446 pub fn shutdown(&self) {
448 self.shutdown.store(true, Ordering::Release);
449 }
450
451 pub async fn ping(&self) -> Result<Duration> {
460 let conn = self.connection().await.ok_or_else(|| {
461 ReplicationError::PeerConnection {
462 peer_id: self.config.node_id.clone(),
463 message: "Not connected".to_string(),
464 }
465 })?;
466
467 let mut conn = conn;
468 let start = std::time::Instant::now();
469
470 let result: String = redis::cmd("PING")
471 .query_async(&mut conn)
472 .await
473 .map_err(|e| ReplicationError::PeerConnection {
474 peer_id: self.config.node_id.clone(),
475 message: format!("PING failed: {}", e),
476 })?;
477
478 let latency = start.elapsed();
479
480 if result == "PONG" {
481 self.record_success().await;
482 metrics::record_peer_ping(&self.config.node_id, true);
483 metrics::record_peer_ping_latency(&self.config.node_id, latency);
484 Ok(latency)
485 } else {
486 self.record_failure().await;
487 metrics::record_peer_ping(&self.config.node_id, false);
488 Err(ReplicationError::PeerConnection {
489 peer_id: self.config.node_id.clone(),
490 message: format!("Unexpected PING response: {}", result),
491 })
492 }
493 }
494
495 #[inline]
503 fn prefixed_key(&self, suffix: &str) -> String {
504 match &self.config.redis_prefix {
505 Some(prefix) => format!("{}{}", prefix, suffix),
506 None => suffix.to_string(),
507 }
508 }
509
510 pub async fn get_merkle_root(&self) -> Result<Option<[u8; 32]>> {
519 {
521 let cache = self.merkle_root_cache.read().await;
522 if let Some(ref cached) = *cache {
523 if Instant::now() < cached.expires_at {
524 return Ok(cached.root);
525 }
526 }
527 }
528
529 let start = Instant::now();
532 let conn = self.query_connection().await.ok_or_else(|| {
533 ReplicationError::PeerConnection {
534 peer_id: self.config.node_id.clone(),
535 message: "Not connected".to_string(),
536 }
537 })?;
538
539 let mut conn = conn;
540 let key = self.prefixed_key("merkle:hash:");
542 let result: Option<String> = redis::cmd("GET")
543 .arg(&key)
544 .query_async(&mut conn)
545 .await
546 .map_err(|e| ReplicationError::PeerConnection {
547 peer_id: self.config.node_id.clone(),
548 message: format!("Failed to get Merkle root: {}", e),
549 })?;
550
551 metrics::record_peer_operation_latency(&self.config.node_id, "get_merkle_root", start.elapsed());
552
553 let root = match result {
554 Some(hex_str) => {
555 let bytes = hex::decode(&hex_str).map_err(|e| {
556 ReplicationError::PeerConnection {
557 peer_id: self.config.node_id.clone(),
558 message: format!("Invalid Merkle root hex: {}", e),
559 }
560 })?;
561 let arr: [u8; 32] = bytes.try_into().map_err(|_| {
562 ReplicationError::PeerConnection {
563 peer_id: self.config.node_id.clone(),
564 message: "Merkle root is not 32 bytes".to_string(),
565 }
566 })?;
567 Some(arr)
568 }
569 None => None,
570 };
571
572 {
574 let mut cache = self.merkle_root_cache.write().await;
575 *cache = Some(CachedMerkleRoot {
576 root,
577 expires_at: Instant::now() + MERKLE_CACHE_TTL,
578 });
579 }
580
581 Ok(root)
582 }
583
584 pub async fn invalidate_merkle_cache(&self) {
586 let mut cache = self.merkle_root_cache.write().await;
587 *cache = None;
588 }
589
590 pub async fn get_merkle_children(&self, path: &str) -> Result<Vec<(String, [u8; 32])>> {
592 let start = Instant::now();
593 let conn = self.query_connection().await.ok_or_else(|| {
594 ReplicationError::PeerConnection {
595 peer_id: self.config.node_id.clone(),
596 message: "Not connected".to_string(),
597 }
598 })?;
599
600 let mut conn = conn;
601 let key = self.prefixed_key(&format!("merkle:children:{}", path));
602
603 let items: Vec<(String, f64)> = redis::cmd("ZRANGE")
605 .arg(&key)
606 .arg(0)
607 .arg(-1)
608 .arg("WITHSCORES")
609 .query_async(&mut conn)
610 .await
611 .map_err(|e| ReplicationError::PeerConnection {
612 peer_id: self.config.node_id.clone(),
613 message: format!("Failed to get Merkle children: {}", e),
614 })?;
615
616 let mut children = Vec::with_capacity(items.len());
617 for (child_name, _score) in items {
618 let child_path = if path.is_empty() {
620 child_name.clone()
621 } else {
622 format!("{}.{}", path, child_name)
623 };
624 let hash_key = self.prefixed_key(&format!("merkle:hash:{}", child_path));
625
626 let hex_hash: Option<String> = redis::cmd("GET")
627 .arg(&hash_key)
628 .query_async(&mut conn)
629 .await
630 .map_err(|e| ReplicationError::PeerConnection {
631 peer_id: self.config.node_id.clone(),
632 message: format!("Failed to get child hash: {}", e),
633 })?;
634
635 if let Some(hex_str) = hex_hash {
636 let bytes = hex::decode(&hex_str).map_err(|e| {
637 ReplicationError::PeerConnection {
638 peer_id: self.config.node_id.clone(),
639 message: format!("Invalid child hash hex: {}", e),
640 }
641 })?;
642 let arr: [u8; 32] = bytes.try_into().map_err(|_| {
643 ReplicationError::PeerConnection {
644 peer_id: self.config.node_id.clone(),
645 message: "Child hash is not 32 bytes".to_string(),
646 }
647 })?;
648 children.push((child_name, arr));
649 }
650 }
651
652 metrics::record_peer_operation_latency(&self.config.node_id, "get_merkle_children", start.elapsed());
653
654 Ok(children)
655 }
656
657 pub async fn get_item(&self, key: &str) -> Result<Option<Vec<u8>>> {
662 let start = Instant::now();
663 let conn = self.query_connection().await.ok_or_else(|| {
664 ReplicationError::PeerConnection {
665 peer_id: self.config.node_id.clone(),
666 message: "Not connected".to_string(),
667 }
668 })?;
669
670 let mut conn = conn;
671 let redis_key = self.prefixed_key(key);
673
674 let json_result: redis::RedisResult<Option<String>> = redis::cmd("JSON.GET")
676 .arg(&redis_key)
677 .arg("$.payload")
678 .query_async(&mut conn)
679 .await;
680
681 let data = match json_result {
682 Ok(Some(json_str)) => {
683 if let Ok(arr) = serde_json::from_str::<Vec<serde_json::Value>>(&json_str) {
686 arr.into_iter().next().map(|payload| {
687 serde_json::to_vec(&payload).unwrap_or_default()
688 })
689 } else {
690 Some(json_str.into_bytes())
692 }
693 }
694 Ok(None) | Err(_) => {
695 redis::cmd("GET")
697 .arg(&redis_key)
698 .query_async(&mut conn)
699 .await
700 .ok()
701 .flatten()
702 }
703 };
704
705 metrics::record_peer_operation_latency(&self.config.node_id, "get_item", start.elapsed());
706
707 Ok(data)
708 }
709}
710
711pub struct PeerManager {
713 peers: dashmap::DashMap<String, Arc<PeerConnection>>,
715 retry_config: RetryConfig,
717}
718
719impl PeerManager {
720 pub fn new(retry_config: RetryConfig) -> Self {
722 Self {
723 peers: dashmap::DashMap::new(),
724 retry_config,
725 }
726 }
727
728 pub fn add_peer(&self, config: PeerConfig) {
730 let node_id = config.node_id.clone();
731 let conn = Arc::new(PeerConnection::new(config));
732 self.peers.insert(node_id, conn);
733 }
734
735 pub fn get(&self, node_id: &str) -> Option<Arc<PeerConnection>> {
737 self.peers.get(node_id).map(|r| r.value().clone())
738 }
739
740 pub fn all(&self) -> Vec<Arc<PeerConnection>> {
742 self.peers.iter().map(|r| r.value().clone()).collect()
743 }
744
745 pub async fn connect_all(&self) -> Vec<Result<()>> {
747 let mut results = Vec::new();
748 for peer in self.all() {
749 let result = peer.connect(&self.retry_config).await;
750 results.push(result);
751 }
752 results
753 }
754
755 pub fn remove_peer(&self, node_id: &str) {
757 if let Some((_, peer)) = self.peers.remove(node_id) {
758 peer.shutdown();
759 }
760 }
761
762 pub fn shutdown_all(&self) {
764 for peer in self.peers.iter() {
765 peer.shutdown();
766 }
767 }
768
769 pub async fn connected_count(&self) -> usize {
771 let mut count = 0;
772 for peer in self.all() {
773 if peer.is_connected().await {
774 count += 1;
775 }
776 }
777 count
778 }
779}
780
781fn epoch_millis() -> u64 {
783 std::time::SystemTime::now()
784 .duration_since(std::time::UNIX_EPOCH)
785 .unwrap_or_default()
786 .as_millis() as u64
787}
788
789#[cfg(test)]
790mod tests {
791 use super::*;
792
793 #[test]
794 fn test_peer_connection_new() {
795 let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
796
797 let conn = PeerConnection::new(config);
798 assert_eq!(conn.node_id(), "test-peer");
799 assert_eq!(conn.cdc_stream_key(), "cdc");
800 }
801
802 #[test]
803 fn test_peer_state_variants() {
804 assert_eq!(PeerState::Disconnected, PeerState::Disconnected);
805 assert_ne!(PeerState::Connected, PeerState::Disconnected);
806 assert_ne!(PeerState::Connecting, PeerState::Backoff);
807 assert_eq!(PeerState::Disabled, PeerState::Disabled);
808 }
809
810 #[test]
811 fn test_peer_circuit_state_variants() {
812 assert_eq!(PeerCircuitState::Closed, PeerCircuitState::Closed);
813 assert_ne!(PeerCircuitState::Open, PeerCircuitState::Closed);
814 }
815
816 #[tokio::test]
817 async fn test_peer_initial_state() {
818 let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
819 let conn = PeerConnection::new(config);
820
821 assert_eq!(conn.state().await, PeerState::Disconnected);
822 assert!(!conn.is_connected().await);
823 assert_eq!(conn.failure_count(), 0);
824 }
825
826 #[tokio::test]
827 async fn test_peer_connection_not_connected() {
828 let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
829 let conn = PeerConnection::new(config);
830
831 assert!(conn.connection().await.is_none());
833 }
834
835 #[tokio::test]
836 async fn test_peer_circuit_breaker() {
837 let config = PeerConfig {
838 node_id: "test-peer".to_string(),
839 redis_url: "redis://localhost:6379".to_string(),
840 priority: 0,
841 circuit_failure_threshold: 3,
842 circuit_reset_timeout_sec: 1,
843 redis_prefix: None,
844 };
845
846 let conn = PeerConnection::new(config);
847
848 assert_eq!(conn.circuit_state().await, PeerCircuitState::Closed);
850 assert!(!conn.is_circuit_open().await);
851
852 conn.record_failure().await;
854 assert_eq!(conn.circuit_state().await, PeerCircuitState::Closed);
855 conn.record_failure().await;
856 assert_eq!(conn.circuit_state().await, PeerCircuitState::Closed);
857 conn.record_failure().await;
858
859 assert_eq!(conn.circuit_state().await, PeerCircuitState::Open);
861 assert!(conn.is_circuit_open().await);
862
863 tokio::time::sleep(Duration::from_secs(2)).await;
865
866 assert_eq!(conn.circuit_state().await, PeerCircuitState::Closed);
868
869 conn.record_success().await;
871 assert_eq!(conn.failure_count(), 0);
872 assert!(!conn.is_circuit_open().await);
873 }
874
875 #[tokio::test]
876 async fn test_peer_record_success_updates_last_success() {
877 let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
878 let conn = PeerConnection::new(config);
879
880 assert_eq!(conn.last_success.load(Ordering::Acquire), 0);
882
883 conn.record_failure().await;
885 conn.record_failure().await;
886 assert_eq!(conn.failure_count(), 2);
887
888 conn.record_success().await;
890 assert_eq!(conn.failure_count(), 0);
891 assert!(conn.last_success.load(Ordering::Acquire) > 0);
892 }
893
894 #[tokio::test]
895 async fn test_peer_millis_since_success_no_success() {
896 let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
897 let conn = PeerConnection::new(config);
898
899 assert_eq!(conn.millis_since_success(), u64::MAX);
901 }
902
903 #[tokio::test]
904 async fn test_peer_millis_since_success_after_success() {
905 let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
906 let conn = PeerConnection::new(config);
907
908 conn.record_success().await;
909 tokio::time::sleep(Duration::from_millis(100)).await;
910
911 let millis = conn.millis_since_success();
912 assert!(millis >= 100);
914 assert!(millis < 1000);
916 }
917
918 #[tokio::test]
919 async fn test_peer_shutdown_flag() {
920 let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
921 let conn = PeerConnection::new(config);
922
923 assert!(!conn.shutdown.load(Ordering::Acquire));
924 conn.shutdown();
925 assert!(conn.shutdown.load(Ordering::Acquire));
926 }
927
928 #[tokio::test]
929 async fn test_peer_mark_disconnected() {
930 let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
931 let conn = PeerConnection::new(config);
932
933 assert_eq!(conn.state().await, PeerState::Disconnected);
935
936 conn.mark_disconnected().await;
938 assert_eq!(conn.state().await, PeerState::Disconnected);
939 assert!(conn.connection().await.is_none());
940 }
941
942 #[tokio::test]
943 async fn test_peer_invalidate_merkle_cache() {
944 let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
945 let conn = PeerConnection::new(config);
946
947 assert!(conn.merkle_root_cache.read().await.is_none());
949
950 {
952 let mut cache = conn.merkle_root_cache.write().await;
953 *cache = Some(CachedMerkleRoot {
954 root: Some([0u8; 32]),
955 expires_at: Instant::now() + Duration::from_secs(60),
956 });
957 }
958
959 assert!(conn.merkle_root_cache.read().await.is_some());
961
962 conn.invalidate_merkle_cache().await;
964
965 assert!(conn.merkle_root_cache.read().await.is_none());
967 }
968
969 #[test]
970 fn test_peer_manager_add_peer() {
971 let manager = PeerManager::new(RetryConfig::testing());
972 manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
973 manager.add_peer(PeerConfig::for_testing("peer-2", "redis://peer2:6379"));
974
975 let peers = manager.all();
976 assert_eq!(peers.len(), 2);
977 }
978
979 #[test]
980 fn test_peer_manager_get() {
981 let manager = PeerManager::new(RetryConfig::testing());
982 manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
983
984 assert!(manager.get("peer-1").is_some());
985 assert!(manager.get("nonexistent").is_none());
986 }
987
988 #[test]
989 fn test_peer_manager_remove_peer() {
990 let manager = PeerManager::new(RetryConfig::testing());
991 manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
992
993 assert!(manager.get("peer-1").is_some());
994
995 manager.remove_peer("peer-1");
996
997 assert!(manager.get("peer-1").is_none());
998 }
999
1000 #[test]
1001 fn test_peer_manager_remove_nonexistent() {
1002 let manager = PeerManager::new(RetryConfig::testing());
1003 manager.remove_peer("nonexistent");
1005 }
1006
1007 #[test]
1008 fn test_peer_manager_shutdown_all() {
1009 let manager = PeerManager::new(RetryConfig::testing());
1010 manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
1011 manager.add_peer(PeerConfig::for_testing("peer-2", "redis://peer2:6379"));
1012
1013 manager.shutdown_all();
1014
1015 for peer in manager.all() {
1017 assert!(peer.shutdown.load(Ordering::Acquire));
1018 }
1019 }
1020
1021 #[tokio::test]
1022 async fn test_peer_manager_connected_count_none() {
1023 let manager = PeerManager::new(RetryConfig::testing());
1024 manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
1025 manager.add_peer(PeerConfig::for_testing("peer-2", "redis://peer2:6379"));
1026
1027 assert_eq!(manager.connected_count().await, 0);
1029 }
1030
1031 #[test]
1032 fn test_epoch_millis() {
1033 let millis = epoch_millis();
1034 assert!(millis > 1577836800000); assert!(millis < 4102444800000); }
1038
1039 #[test]
1040 fn test_merkle_cache_ttl() {
1041 assert_eq!(MERKLE_CACHE_TTL, Duration::from_secs(5));
1043 }
1044
1045 #[tokio::test]
1046 async fn test_peer_ping_not_connected() {
1047 let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
1048 let conn = PeerConnection::new(config);
1049
1050 let result = conn.ping().await;
1052 assert!(result.is_err());
1053
1054 if let Err(ReplicationError::PeerConnection { peer_id, message }) = result {
1055 assert_eq!(peer_id, "test-peer");
1056 assert!(message.contains("Not connected"));
1057 } else {
1058 panic!("Expected PeerConnection error");
1059 }
1060 }
1061
1062 #[tokio::test]
1063 async fn test_peer_get_merkle_root_not_connected() {
1064 let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
1065 let conn = PeerConnection::new(config);
1066
1067 let result = conn.get_merkle_root().await;
1068 assert!(result.is_err());
1069 }
1070
1071 #[tokio::test]
1072 async fn test_peer_get_merkle_children_not_connected() {
1073 let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
1074 let conn = PeerConnection::new(config);
1075
1076 let result = conn.get_merkle_children("some/path").await;
1077 assert!(result.is_err());
1078 }
1079
1080 #[tokio::test]
1081 async fn test_peer_get_item_not_connected() {
1082 let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
1083 let conn = PeerConnection::new(config);
1084
1085 let result = conn.get_item("some-key").await;
1086 assert!(result.is_err());
1087 }
1088
1089 #[tokio::test]
1090 async fn test_peer_ensure_connected_fails() {
1091 let config = PeerConfig::for_testing("test-peer", "redis://localhost:1"); let conn = PeerConnection::new(config);
1096
1097 assert!(!conn.is_connected().await);
1099 }
1100
1101 #[tokio::test]
1102 async fn test_peer_circuit_opens_on_threshold() {
1103 let config = PeerConfig {
1104 node_id: "test-peer".to_string(),
1105 redis_url: "redis://localhost:6379".to_string(),
1106 priority: 0,
1107 circuit_failure_threshold: 2, circuit_reset_timeout_sec: 30,
1109 redis_prefix: None,
1110 };
1111
1112 let conn = PeerConnection::new(config);
1113
1114 conn.record_failure().await;
1116 assert_eq!(conn.circuit_state().await, PeerCircuitState::Closed);
1117 assert_eq!(conn.failure_count(), 1);
1118
1119 conn.record_failure().await;
1121 assert_eq!(conn.circuit_state().await, PeerCircuitState::Open);
1122 assert_eq!(conn.failure_count(), 2);
1123 }
1124}