// replication_engine/peer.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Peer connection management.
//!
//! Manages Redis connections to peer nodes with automatic reconnection,
//! circuit breaker protection, and Merkle root caching.
//!
//! # Connection Lifecycle
//!
//! ```text
//! Disconnected → Connecting → Connected
//!      ↑             ↓             ↓
//!      └─── Backoff ←┴─────────────┘
//! ```
//!
//! Connections are **lazy**: they're only established when first needed
//! (via [`PeerConnection::ensure_connected()`]). If a connection fails,
//! the peer enters [`PeerState::Backoff`] state with exponential backoff.
//!
//! # Circuit Breaker
//!
//! Each peer has a circuit breaker to prevent cascade failures:
//!
//! - **Closed**: Normal operation, requests pass through
//! - **Open**: Too many consecutive failures, requests rejected immediately
//!
//! The circuit opens after `circuit_failure_threshold` consecutive failures
//! and resets after `circuit_reset_timeout_sec` seconds.
//!
//! # Merkle Root Caching
//!
//! To reduce load during cold path repair, Merkle roots are cached for
//! 5 seconds (see `MERKLE_CACHE_TTL`). This prevents hammering peers
//! when checking consistency.
//!
//! # Example
//!
//! ```rust,no_run
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! use replication_engine::peer::PeerConnection;
//! use replication_engine::config::PeerConfig;
//!
//! let config = PeerConfig::for_testing("peer-1", "redis://localhost:6379");
//! let peer = PeerConnection::new(config);
//!
//! // Connection is lazy - this triggers the actual connect
//! peer.ensure_connected().await?;
//!
//! // Use the connection
//! let conn = peer.connection().await;
//! # Ok(())
//! # }
//! ```

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use redis::aio::ConnectionManager;
use redis::Client;
use tokio::sync::RwLock;
use tokio::time::timeout;
use tracing::{error, info, warn};

use crate::config::PeerConfig;
use crate::error::{ReplicationError, Result};
use crate::metrics;
use crate::resilience::RetryConfig;
68
/// State of a peer connection.
///
/// See module docs for the state transition diagram. Transitions are driven
/// by `PeerConnection::connect()` (Connecting → Connected / Backoff) and
/// `PeerConnection::mark_disconnected()` (→ Disconnected).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerState {
    /// Not yet connected (initial state).
    Disconnected,
    /// Connection attempt in progress.
    Connecting,
    /// Connected and healthy.
    Connected,
    /// Connection failed after exhausting retries; waiting before retry.
    Backoff,
    /// Peer is disabled in configuration.
    Disabled,
}
85
/// Circuit breaker state for peer connections.
///
/// Prevents cascade failures by rejecting requests when a peer
/// is experiencing repeated failures.
///
/// Note: this state is derived on demand from the consecutive failure count
/// and the circuit-open timestamp (see `PeerConnection::circuit_state()`);
/// it is not stored explicitly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerCircuitState {
    /// Normal operation, requests pass through.
    Closed,
    /// Too many failures, requests rejected immediately.
    Open,
}
97
/// Cached Merkle root with TTL.
///
/// Used to reduce load on peers during cold path repair cycles.
/// Roots are cached for [`MERKLE_CACHE_TTL`] to avoid
/// repeated queries for the same data.
struct CachedMerkleRoot {
    /// The cached root hash (None if peer has no data).
    root: Option<[u8; 32]>,
    /// When this cache entry expires (compared against `Instant::now()`).
    expires_at: Instant,
}
109
/// Default TTL for cached Merkle roots (5 seconds); see [`CachedMerkleRoot`].
/// Short enough to catch updates, long enough to avoid hammering peers when
/// `get_merkle_root()` is called repeatedly during a repair cycle.
const MERKLE_CACHE_TTL: Duration = Duration::from_secs(5);
113
/// A managed connection to a peer's Redis.
///
/// Uses `redis::aio::ConnectionManager` internally, which provides:
/// - Automatic reconnection on connection loss
/// - Multiplexed connection (single TCP socket, multiple in-flight requests)
/// - Connection pooling semantics (cloning is cheap, shares underlying connection)
///
/// All mutable state lives behind async locks or atomics, so a value can be
/// shared across tasks via `Arc` without external synchronization.
pub struct PeerConnection {
    /// Peer configuration
    pub config: PeerConfig,
    /// Redis connection (None if disconnected).
    /// ConnectionManager is Clone and multiplexed, so sharing is cheap.
    conn: RwLock<Option<ConnectionManager>>,
    /// Current state (see [`PeerState`] for the lifecycle).
    state: RwLock<PeerState>,
    /// Last successful operation timestamp (epoch millis; 0 = never succeeded).
    last_success: AtomicU64,
    /// Consecutive failure count (reset to 0 by `record_success`).
    failure_count: AtomicU64,
    /// Whether shutdown was requested (checked inside the connect retry loop).
    shutdown: AtomicBool,
    /// When the circuit opened (None = circuit closed; used for reset timeout).
    circuit_opened_at: RwLock<Option<Instant>>,
    /// Cached Merkle root (avoids repeated queries during repair)
    merkle_root_cache: RwLock<Option<CachedMerkleRoot>>,
}
139
140impl PeerConnection {
141    /// Create a new peer connection (not yet connected).
142    pub fn new(config: PeerConfig) -> Self {
143        // All peers in the config are enabled (daemon filters disabled peers)
144        let initial_state = PeerState::Disconnected;
145
146        Self {
147            config,
148            conn: RwLock::new(None),
149            state: RwLock::new(initial_state),
150            last_success: AtomicU64::new(0),
151            failure_count: AtomicU64::new(0),
152            shutdown: AtomicBool::new(false),
153            circuit_opened_at: RwLock::new(None),
154            merkle_root_cache: RwLock::new(None),
155        }
156    }
157
158    /// Get the peer's node ID.
159    pub fn node_id(&self) -> &str {
160        &self.config.node_id
161    }
162
163    /// Get the CDC stream key for this peer.
164    pub fn cdc_stream_key(&self) -> String {
165        self.config.cdc_stream_key()
166    }
167
168    /// Get current connection state.
169    pub async fn state(&self) -> PeerState {
170        *self.state.read().await
171    }
172
173    /// Check if connected.
174    pub async fn is_connected(&self) -> bool {
175        self.state().await == PeerState::Connected
176    }
177
178    // =========================================================================
179    // Circuit Breaker
180    // =========================================================================
181
182    /// Get the current circuit breaker state.
183    pub async fn circuit_state(&self) -> PeerCircuitState {
184        let failures = self.failure_count.load(Ordering::Relaxed);
185        let threshold = self.config.circuit_failure_threshold as u64;
186
187        if failures >= threshold {
188            // Check if we should try again (half-open)
189            if let Some(opened_at) = *self.circuit_opened_at.read().await {
190                let reset_timeout = Duration::from_secs(self.config.circuit_reset_timeout_sec);
191                if opened_at.elapsed() >= reset_timeout {
192                    // Time to try again
193                    return PeerCircuitState::Closed;
194                }
195            }
196            PeerCircuitState::Open
197        } else {
198            PeerCircuitState::Closed
199        }
200    }
201
202    /// Check if the circuit is open (should reject requests).
203    pub async fn is_circuit_open(&self) -> bool {
204        self.circuit_state().await == PeerCircuitState::Open
205    }
206
207    /// Record a successful operation (resets failure count).
208    pub async fn record_success(&self) {
209        self.failure_count.store(0, Ordering::Relaxed);
210        self.last_success.store(
211            std::time::SystemTime::now()
212                .duration_since(std::time::UNIX_EPOCH)
213                .unwrap_or_default()
214                .as_millis() as u64,
215            Ordering::Relaxed,
216        );
217        // Close the circuit
218        *self.circuit_opened_at.write().await = None;
219    }
220
221    /// Record a failed operation (increments failure count, may open circuit).
222    pub async fn record_failure(&self) {
223        let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
224        let threshold = self.config.circuit_failure_threshold as u64;
225
226        if failures >= threshold {
227            // Open the circuit
228            let mut opened_at = self.circuit_opened_at.write().await;
229            if opened_at.is_none() {
230                *opened_at = Some(Instant::now());
231                warn!(
232                    peer_id = %self.config.node_id,
233                    failures,
234                    threshold,
235                    reset_timeout_sec = self.config.circuit_reset_timeout_sec,
236                    "Circuit breaker opened for peer"
237                );
238                metrics::record_peer_circuit_state(&self.config.node_id, "open");
239            }
240        }
241    }
242
243    /// Connect to the peer's Redis with retry logic.
244    pub async fn connect(&self, retry_config: &RetryConfig) -> Result<()> {
245        *self.state.write().await = PeerState::Connecting;
246        info!(peer_id = %self.config.node_id, url = %self.config.redis_url, "Connecting to peer");
247
248        let client = Client::open(self.config.redis_url.as_str()).map_err(|e| {
249            ReplicationError::PeerConnection {
250                peer_id: self.config.node_id.clone(),
251                message: format!("Invalid Redis URL: {}", e),
252            }
253        })?;
254
255        let mut attempt = 0;
256        let mut delay = retry_config.initial_delay;
257
258        loop {
259            if self.shutdown.load(Ordering::Acquire) {
260                return Err(ReplicationError::Shutdown);
261            }
262
263            attempt += 1;
264
265            // Wrap connection attempt in a timeout to avoid hanging on unreachable hosts
266            let conn_result = timeout(
267                retry_config.connection_timeout,
268                client.get_connection_manager(),
269            )
270            .await;
271
272            match conn_result {
273                Ok(Ok(conn)) => {
274                    *self.conn.write().await = Some(conn);
275                    *self.state.write().await = PeerState::Connected;
276                    self.failure_count.store(0, Ordering::Release);
277                    self.last_success
278                        .store(epoch_millis(), Ordering::Release);
279
280                    metrics::record_peer_connection(&self.config.node_id, true);
281                    metrics::record_peer_state(&self.config.node_id, "connected");
282
283                    if attempt > 1 {
284                        info!(
285                            peer_id = %self.config.node_id,
286                            attempt,
287                            "Connected to peer after retry"
288                        );
289                    } else {
290                        info!(peer_id = %self.config.node_id, "Connected to peer");
291                    }
292                    return Ok(());
293                }
294                Ok(Err(e)) => {
295                    // Redis connection error
296                    self.failure_count.fetch_add(1, Ordering::AcqRel);
297                    let err_msg = format!("{}", e);
298
299                    if attempt >= retry_config.max_attempts {
300                        *self.state.write().await = PeerState::Backoff;
301                        metrics::record_peer_connection(&self.config.node_id, false);
302                        metrics::record_peer_state(&self.config.node_id, "backoff");
303                        error!(
304                            peer_id = %self.config.node_id,
305                            attempt,
306                            error = %e,
307                            "Failed to connect after max retries"
308                        );
309                        return Err(ReplicationError::PeerConnection {
310                            peer_id: self.config.node_id.clone(),
311                            message: format!("Connection failed after {} attempts: {}", attempt, err_msg),
312                        });
313                    }
314
315                    warn!(
316                        peer_id = %self.config.node_id,
317                        attempt,
318                        delay_ms = delay.as_millis(),
319                        error = %e,
320                        "Connection attempt failed, retrying"
321                    );
322
323                    tokio::time::sleep(delay).await;
324                    delay = std::cmp::min(
325                        Duration::from_secs_f64(delay.as_secs_f64() * retry_config.backoff_factor),
326                        retry_config.max_delay,
327                    );
328                }
329                Err(_) => {
330                    // Timeout elapsed
331                    self.failure_count.fetch_add(1, Ordering::AcqRel);
332
333                    if attempt >= retry_config.max_attempts {
334                        *self.state.write().await = PeerState::Backoff;
335                        error!(
336                            peer_id = %self.config.node_id,
337                            attempt,
338                            timeout_ms = retry_config.connection_timeout.as_millis(),
339                            "Connection timed out after max retries"
340                        );
341                        return Err(ReplicationError::PeerConnection {
342                            peer_id: self.config.node_id.clone(),
343                            message: format!(
344                                "Connection timed out after {} attempts ({}ms timeout)",
345                                attempt,
346                                retry_config.connection_timeout.as_millis()
347                            ),
348                        });
349                    }
350
351                    warn!(
352                        peer_id = %self.config.node_id,
353                        attempt,
354                        delay_ms = delay.as_millis(),
355                        timeout_ms = retry_config.connection_timeout.as_millis(),
356                        "Connection attempt timed out, retrying"
357                    );
358
359                    tokio::time::sleep(delay).await;
360                    delay = std::cmp::min(
361                        Duration::from_secs_f64(delay.as_secs_f64() * retry_config.backoff_factor),
362                        retry_config.max_delay,
363                    );
364                }
365            }
366        }
367    }
368
369    /// Get a reference to the connection for operations.
370    ///
371    /// Returns None if not connected.
372    pub async fn connection(&self) -> Option<ConnectionManager> {
373        self.conn.read().await.clone()
374    }
375
376    /// Ensure the peer is connected, connecting lazily if needed.
377    ///
378    /// This provides a convenient way to get a connection without
379    /// manually calling `connect()` first. Uses default retry config.
380    ///
381    /// # Returns
382    /// The connection manager, or an error if connection failed.
383    pub async fn ensure_connected(&self) -> Result<ConnectionManager> {
384        // Fast path: already connected
385        if let Some(conn) = self.connection().await {
386            return Ok(conn);
387        }
388
389        // Need to connect
390        self.connect(&RetryConfig::default()).await?;
391        
392        self.connection().await.ok_or_else(|| {
393            ReplicationError::PeerConnection {
394                peer_id: self.config.node_id.clone(),
395                message: "Connection lost immediately after connect".to_string(),
396            }
397        })
398    }
399
400    /// Get consecutive failure count.
401    pub fn failure_count(&self) -> u64 {
402        self.failure_count.load(Ordering::Acquire)
403    }
404
405    /// Get milliseconds since last success.
406    pub fn millis_since_success(&self) -> u64 {
407        let last = self.last_success.load(Ordering::Acquire);
408        if last == 0 {
409            return u64::MAX;
410        }
411        epoch_millis().saturating_sub(last)
412    }
413
414    /// Mark connection as failed (triggers reconnect).
415    pub async fn mark_disconnected(&self) {
416        *self.conn.write().await = None;
417        *self.state.write().await = PeerState::Disconnected;
418        metrics::record_peer_state(&self.config.node_id, "disconnected");
419        warn!(peer_id = %self.config.node_id, "Connection marked as disconnected");
420    }
421
422    /// Signal shutdown.
423    pub fn shutdown(&self) {
424        self.shutdown.store(true, Ordering::Release);
425    }
426
427    // =========================================================================
428    // Health Check
429    // =========================================================================
430
431    /// Ping the peer's Redis to check connection health.
432    ///
433    /// Returns the round-trip latency on success.
434    /// Updates `last_success` timestamp on success.
435    pub async fn ping(&self) -> Result<Duration> {
436        let conn = self.connection().await.ok_or_else(|| {
437            ReplicationError::PeerConnection {
438                peer_id: self.config.node_id.clone(),
439                message: "Not connected".to_string(),
440            }
441        })?;
442
443        let mut conn = conn;
444        let start = std::time::Instant::now();
445
446        let result: String = redis::cmd("PING")
447            .query_async(&mut conn)
448            .await
449            .map_err(|e| ReplicationError::PeerConnection {
450                peer_id: self.config.node_id.clone(),
451                message: format!("PING failed: {}", e),
452            })?;
453
454        let latency = start.elapsed();
455
456        if result == "PONG" {
457            self.record_success().await;
458            metrics::record_peer_ping(&self.config.node_id, true);
459            metrics::record_peer_ping_latency(&self.config.node_id, latency);
460            Ok(latency)
461        } else {
462            self.record_failure().await;
463            metrics::record_peer_ping(&self.config.node_id, false);
464            Err(ReplicationError::PeerConnection {
465                peer_id: self.config.node_id.clone(),
466                message: format!("Unexpected PING response: {}", result),
467            })
468        }
469    }
470
471    // =========================================================================
472    // Merkle Tree Queries (for cold path repair)
473    // =========================================================================
474
475    /// Get the peer's Merkle root hash (cached with TTL).
476    /// 
477    /// Uses a short-lived cache to avoid hammering the peer during repair cycles.
478    /// The cache is automatically invalidated after `MERKLE_CACHE_TTL`.
479    pub async fn get_merkle_root(&self) -> Result<Option<[u8; 32]>> {
480        // Check cache first
481        {
482            let cache = self.merkle_root_cache.read().await;
483            if let Some(ref cached) = *cache {
484                if Instant::now() < cached.expires_at {
485                    return Ok(cached.root);
486                }
487            }
488        }
489
490        // Cache miss or expired - fetch from peer
491        let start = Instant::now();
492        let conn = self.connection().await.ok_or_else(|| {
493            ReplicationError::PeerConnection {
494                peer_id: self.config.node_id.clone(),
495                message: "Not connected".to_string(),
496            }
497        })?;
498
499        let mut conn = conn;
500        let result: Option<String> = redis::cmd("GET")
501            .arg("merkle:hash:")
502            .query_async(&mut conn)
503            .await
504            .map_err(|e| ReplicationError::PeerConnection {
505                peer_id: self.config.node_id.clone(),
506                message: format!("Failed to get Merkle root: {}", e),
507            })?;
508        
509        metrics::record_peer_operation_latency(&self.config.node_id, "get_merkle_root", start.elapsed());
510
511        let root = match result {
512            Some(hex_str) => {
513                let bytes = hex::decode(&hex_str).map_err(|e| {
514                    ReplicationError::PeerConnection {
515                        peer_id: self.config.node_id.clone(),
516                        message: format!("Invalid Merkle root hex: {}", e),
517                    }
518                })?;
519                let arr: [u8; 32] = bytes.try_into().map_err(|_| {
520                    ReplicationError::PeerConnection {
521                        peer_id: self.config.node_id.clone(),
522                        message: "Merkle root is not 32 bytes".to_string(),
523                    }
524                })?;
525                Some(arr)
526            }
527            None => None,
528        };
529
530        // Update cache
531        {
532            let mut cache = self.merkle_root_cache.write().await;
533            *cache = Some(CachedMerkleRoot {
534                root,
535                expires_at: Instant::now() + MERKLE_CACHE_TTL,
536            });
537        }
538
539        Ok(root)
540    }
541
542    /// Invalidate the Merkle root cache (call after local writes).
543    pub async fn invalidate_merkle_cache(&self) {
544        let mut cache = self.merkle_root_cache.write().await;
545        *cache = None;
546    }
547
548    /// Get children of a Merkle path (sorted by score/position).
549    pub async fn get_merkle_children(&self, path: &str) -> Result<Vec<(String, [u8; 32])>> {
550        let start = Instant::now();
551        let conn = self.connection().await.ok_or_else(|| {
552            ReplicationError::PeerConnection {
553                peer_id: self.config.node_id.clone(),
554                message: "Not connected".to_string(),
555            }
556        })?;
557
558        let mut conn = conn;
559        let key = format!("merkle:children:{}", path);
560
561        // ZRANGE returns items as (member, score) pairs with WITHSCORES
562        let items: Vec<(String, f64)> = redis::cmd("ZRANGE")
563            .arg(&key)
564            .arg(0)
565            .arg(-1)
566            .arg("WITHSCORES")
567            .query_async(&mut conn)
568            .await
569            .map_err(|e| ReplicationError::PeerConnection {
570                peer_id: self.config.node_id.clone(),
571                message: format!("Failed to get Merkle children: {}", e),
572            })?;
573
574        let mut children = Vec::with_capacity(items.len());
575        for (child_name, _score) in items {
576            // Get the hash for this child
577            let child_path = if path.is_empty() {
578                child_name.clone()
579            } else {
580                format!("{}/{}", path, child_name)
581            };
582            let hash_key = format!("merkle:hash:{}", child_path);
583
584            let hex_hash: Option<String> = redis::cmd("GET")
585                .arg(&hash_key)
586                .query_async(&mut conn)
587                .await
588                .map_err(|e| ReplicationError::PeerConnection {
589                    peer_id: self.config.node_id.clone(),
590                    message: format!("Failed to get child hash: {}", e),
591                })?;
592
593            if let Some(hex_str) = hex_hash {
594                let bytes = hex::decode(&hex_str).map_err(|e| {
595                    ReplicationError::PeerConnection {
596                        peer_id: self.config.node_id.clone(),
597                        message: format!("Invalid child hash hex: {}", e),
598                    }
599                })?;
600                let arr: [u8; 32] = bytes.try_into().map_err(|_| {
601                    ReplicationError::PeerConnection {
602                        peer_id: self.config.node_id.clone(),
603                        message: "Child hash is not 32 bytes".to_string(),
604                    }
605                })?;
606                children.push((child_name, arr));
607            }
608        }
609        
610        metrics::record_peer_operation_latency(&self.config.node_id, "get_merkle_children", start.elapsed());
611
612        Ok(children)
613    }
614
615    /// Get an item's data by key.
616    pub async fn get_item(&self, key: &str) -> Result<Option<Vec<u8>>> {
617        let start = Instant::now();
618        let conn = self.connection().await.ok_or_else(|| {
619            ReplicationError::PeerConnection {
620                peer_id: self.config.node_id.clone(),
621                message: "Not connected".to_string(),
622            }
623        })?;
624
625        let mut conn = conn;
626        let redis_key = format!("item:{}", key);
627
628        let data: Option<Vec<u8>> = redis::cmd("GET")
629            .arg(&redis_key)
630            .query_async(&mut conn)
631            .await
632            .map_err(|e| ReplicationError::PeerConnection {
633                peer_id: self.config.node_id.clone(),
634                message: format!("Failed to get item: {}", e),
635            })?;
636        
637        metrics::record_peer_operation_latency(&self.config.node_id, "get_item", start.elapsed());
638
639        Ok(data)
640    }
641}
642
/// Manager for all peer connections.
///
/// Thin registry over a concurrent map: peers are added/removed by node ID
/// and handed out as shared `Arc<PeerConnection>` handles.
pub struct PeerManager {
    /// All peer connections (keyed by node_id)
    peers: dashmap::DashMap<String, Arc<PeerConnection>>,
    /// Retry configuration (used by `connect_all()`).
    retry_config: RetryConfig,
}
650
651impl PeerManager {
652    /// Create a new peer manager.
653    pub fn new(retry_config: RetryConfig) -> Self {
654        Self {
655            peers: dashmap::DashMap::new(),
656            retry_config,
657        }
658    }
659
660    /// Add a peer from configuration.
661    pub fn add_peer(&self, config: PeerConfig) {
662        let node_id = config.node_id.clone();
663        let conn = Arc::new(PeerConnection::new(config));
664        self.peers.insert(node_id, conn);
665    }
666
667    /// Get a peer connection by node ID.
668    pub fn get(&self, node_id: &str) -> Option<Arc<PeerConnection>> {
669        self.peers.get(node_id).map(|r| r.value().clone())
670    }
671
672    /// Get all peer connections.
673    pub fn all(&self) -> Vec<Arc<PeerConnection>> {
674        self.peers.iter().map(|r| r.value().clone()).collect()
675    }
676
677    /// Connect to all enabled peers.
678    pub async fn connect_all(&self) -> Vec<Result<()>> {
679        let mut results = Vec::new();
680        for peer in self.all() {
681            let result = peer.connect(&self.retry_config).await;
682            results.push(result);
683        }
684        results
685    }
686
687    /// Remove a peer.
688    pub fn remove_peer(&self, node_id: &str) {
689        if let Some((_, peer)) = self.peers.remove(node_id) {
690            peer.shutdown();
691        }
692    }
693
694    /// Shutdown all peers.
695    pub fn shutdown_all(&self) {
696        for peer in self.peers.iter() {
697            peer.shutdown();
698        }
699    }
700
701    /// Get count of connected peers.
702    pub async fn connected_count(&self) -> usize {
703        let mut count = 0;
704        for peer in self.all() {
705            if peer.is_connected().await {
706                count += 1;
707            }
708        }
709        count
710    }
711}
712
/// Get current epoch milliseconds.
///
/// Yields 0 if the system clock reads earlier than the Unix epoch
/// (matches the original `unwrap_or_default()` behavior).
fn epoch_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
720
721#[cfg(test)]
722mod tests {
723    use super::*;
724
725    #[test]
726    fn test_peer_connection_new() {
727        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
728
729        let conn = PeerConnection::new(config);
730        assert_eq!(conn.node_id(), "test-peer");
731        assert_eq!(conn.cdc_stream_key(), "__local__:cdc");
732    }
733
734    #[test]
735    fn test_peer_state_variants() {
736        assert_eq!(PeerState::Disconnected, PeerState::Disconnected);
737        assert_ne!(PeerState::Connected, PeerState::Disconnected);
738        assert_ne!(PeerState::Connecting, PeerState::Backoff);
739        assert_eq!(PeerState::Disabled, PeerState::Disabled);
740    }
741
742    #[test]
743    fn test_peer_circuit_state_variants() {
744        assert_eq!(PeerCircuitState::Closed, PeerCircuitState::Closed);
745        assert_ne!(PeerCircuitState::Open, PeerCircuitState::Closed);
746    }
747
748    #[tokio::test]
749    async fn test_peer_initial_state() {
750        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
751        let conn = PeerConnection::new(config);
752        
753        assert_eq!(conn.state().await, PeerState::Disconnected);
754        assert!(!conn.is_connected().await);
755        assert_eq!(conn.failure_count(), 0);
756    }
757
758    #[tokio::test]
759    async fn test_peer_connection_not_connected() {
760        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
761        let conn = PeerConnection::new(config);
762        
763        // Should be None when not connected
764        assert!(conn.connection().await.is_none());
765    }
766
767    #[tokio::test]
768    async fn test_peer_circuit_breaker() {
769        let config = PeerConfig {
770            node_id: "test-peer".to_string(),
771            redis_url: "redis://localhost:6379".to_string(),
772            priority: 0,
773            circuit_failure_threshold: 3,
774            circuit_reset_timeout_sec: 1,
775            redis_prefix: None,
776        };
777
778        let conn = PeerConnection::new(config);
779
780        // Initially closed
781        assert_eq!(conn.circuit_state().await, PeerCircuitState::Closed);
782        assert!(!conn.is_circuit_open().await);
783
784        // Record failures up to threshold
785        conn.record_failure().await;
786        assert_eq!(conn.circuit_state().await, PeerCircuitState::Closed);
787        conn.record_failure().await;
788        assert_eq!(conn.circuit_state().await, PeerCircuitState::Closed);
789        conn.record_failure().await;
790        
791        // Now it should be open
792        assert_eq!(conn.circuit_state().await, PeerCircuitState::Open);
793        assert!(conn.is_circuit_open().await);
794
795        // Wait for reset timeout
796        tokio::time::sleep(Duration::from_secs(2)).await;
797        
798        // Should be closed again (half-open allows retry)
799        assert_eq!(conn.circuit_state().await, PeerCircuitState::Closed);
800
801        // Success resets everything
802        conn.record_success().await;
803        assert_eq!(conn.failure_count(), 0);
804        assert!(!conn.is_circuit_open().await);
805    }
806
807    #[tokio::test]
808    async fn test_peer_record_success_updates_last_success() {
809        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
810        let conn = PeerConnection::new(config);
811
812        // Initially no success recorded
813        assert_eq!(conn.last_success.load(Ordering::Acquire), 0);
814        
815        // Record some failures first
816        conn.record_failure().await;
817        conn.record_failure().await;
818        assert_eq!(conn.failure_count(), 2);
819
820        // Record success should reset failures and update timestamp
821        conn.record_success().await;
822        assert_eq!(conn.failure_count(), 0);
823        assert!(conn.last_success.load(Ordering::Acquire) > 0);
824    }
825
826    #[tokio::test]
827    async fn test_peer_millis_since_success_no_success() {
828        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
829        let conn = PeerConnection::new(config);
830
831        // No success recorded - should return MAX
832        assert_eq!(conn.millis_since_success(), u64::MAX);
833    }
834
835    #[tokio::test]
836    async fn test_peer_millis_since_success_after_success() {
837        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
838        let conn = PeerConnection::new(config);
839
840        conn.record_success().await;
841        tokio::time::sleep(Duration::from_millis(100)).await;
842        
843        let millis = conn.millis_since_success();
844        // Should be at least 100ms
845        assert!(millis >= 100);
846        // But not absurdly long (less than 1 second for this test)
847        assert!(millis < 1000);
848    }
849
850    #[tokio::test]
851    async fn test_peer_shutdown_flag() {
852        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
853        let conn = PeerConnection::new(config);
854
855        assert!(!conn.shutdown.load(Ordering::Acquire));
856        conn.shutdown();
857        assert!(conn.shutdown.load(Ordering::Acquire));
858    }
859
860    #[tokio::test]
861    async fn test_peer_mark_disconnected() {
862        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
863        let conn = PeerConnection::new(config);
864
865        // Start in Disconnected state
866        assert_eq!(conn.state().await, PeerState::Disconnected);
867        
868        // Mark disconnected (should stay disconnected)
869        conn.mark_disconnected().await;
870        assert_eq!(conn.state().await, PeerState::Disconnected);
871        assert!(conn.connection().await.is_none());
872    }
873
874    #[tokio::test]
875    async fn test_peer_invalidate_merkle_cache() {
876        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
877        let conn = PeerConnection::new(config);
878
879        // Initially no cache
880        assert!(conn.merkle_root_cache.read().await.is_none());
881
882        // Simulate a cached entry
883        {
884            let mut cache = conn.merkle_root_cache.write().await;
885            *cache = Some(CachedMerkleRoot {
886                root: Some([0u8; 32]),
887                expires_at: Instant::now() + Duration::from_secs(60),
888            });
889        }
890
891        // Verify cache is set
892        assert!(conn.merkle_root_cache.read().await.is_some());
893
894        // Invalidate
895        conn.invalidate_merkle_cache().await;
896
897        // Cache should be None
898        assert!(conn.merkle_root_cache.read().await.is_none());
899    }
900
901    #[test]
902    fn test_peer_manager_add_peer() {
903        let manager = PeerManager::new(RetryConfig::testing());
904        manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
905        manager.add_peer(PeerConfig::for_testing("peer-2", "redis://peer2:6379"));
906
907        let peers = manager.all();
908        assert_eq!(peers.len(), 2);
909    }
910
911    #[test]
912    fn test_peer_manager_get() {
913        let manager = PeerManager::new(RetryConfig::testing());
914        manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
915
916        assert!(manager.get("peer-1").is_some());
917        assert!(manager.get("nonexistent").is_none());
918    }
919
920    #[test]
921    fn test_peer_manager_remove_peer() {
922        let manager = PeerManager::new(RetryConfig::testing());
923        manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
924        
925        assert!(manager.get("peer-1").is_some());
926        
927        manager.remove_peer("peer-1");
928        
929        assert!(manager.get("peer-1").is_none());
930    }
931
932    #[test]
933    fn test_peer_manager_remove_nonexistent() {
934        let manager = PeerManager::new(RetryConfig::testing());
935        // Should not panic
936        manager.remove_peer("nonexistent");
937    }
938
939    #[test]
940    fn test_peer_manager_shutdown_all() {
941        let manager = PeerManager::new(RetryConfig::testing());
942        manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
943        manager.add_peer(PeerConfig::for_testing("peer-2", "redis://peer2:6379"));
944
945        manager.shutdown_all();
946
947        // Verify all peers have shutdown flag set
948        for peer in manager.all() {
949            assert!(peer.shutdown.load(Ordering::Acquire));
950        }
951    }
952
953    #[tokio::test]
954    async fn test_peer_manager_connected_count_none() {
955        let manager = PeerManager::new(RetryConfig::testing());
956        manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
957        manager.add_peer(PeerConfig::for_testing("peer-2", "redis://peer2:6379"));
958
959        // No peers connected yet
960        assert_eq!(manager.connected_count().await, 0);
961    }
962
963    #[test]
964    fn test_epoch_millis() {
965        let millis = epoch_millis();
966        // Should be a reasonable epoch time (after 2020, before 2100)
967        assert!(millis > 1577836800000); // Jan 1, 2020
968        assert!(millis < 4102444800000); // Jan 1, 2100
969    }
970
971    #[test]
972    fn test_merkle_cache_ttl() {
973        // Verify constant is reasonable
974        assert_eq!(MERKLE_CACHE_TTL, Duration::from_secs(5));
975    }
976
977    #[tokio::test]
978    async fn test_peer_ping_not_connected() {
979        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
980        let conn = PeerConnection::new(config);
981
982        // Ping should fail when not connected
983        let result = conn.ping().await;
984        assert!(result.is_err());
985        
986        if let Err(ReplicationError::PeerConnection { peer_id, message }) = result {
987            assert_eq!(peer_id, "test-peer");
988            assert!(message.contains("Not connected"));
989        } else {
990            panic!("Expected PeerConnection error");
991        }
992    }
993
994    #[tokio::test]
995    async fn test_peer_get_merkle_root_not_connected() {
996        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
997        let conn = PeerConnection::new(config);
998
999        let result = conn.get_merkle_root().await;
1000        assert!(result.is_err());
1001    }
1002
1003    #[tokio::test]
1004    async fn test_peer_get_merkle_children_not_connected() {
1005        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
1006        let conn = PeerConnection::new(config);
1007
1008        let result = conn.get_merkle_children("some/path").await;
1009        assert!(result.is_err());
1010    }
1011
1012    #[tokio::test]
1013    async fn test_peer_get_item_not_connected() {
1014        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
1015        let conn = PeerConnection::new(config);
1016
1017        let result = conn.get_item("some-key").await;
1018        assert!(result.is_err());
1019    }
1020
1021    #[tokio::test]
1022    async fn test_peer_ensure_connected_fails() {
1023        // This test is slow (retries with timeouts) so we skip it in normal runs
1024        // Integration tests cover real connection scenarios
1025        // Just verify the method exists and handles errors
1026        let config = PeerConfig::for_testing("test-peer", "redis://localhost:1"); // Port 1 is typically closed
1027        let conn = PeerConnection::new(config);
1028
1029        // Should not be connected
1030        assert!(!conn.is_connected().await);
1031    }
1032
1033    #[tokio::test]
1034    async fn test_peer_circuit_opens_on_threshold() {
1035        let config = PeerConfig {
1036            node_id: "test-peer".to_string(),
1037            redis_url: "redis://localhost:6379".to_string(),
1038            priority: 0,
1039            circuit_failure_threshold: 2, // Low threshold for testing
1040            circuit_reset_timeout_sec: 30,
1041            redis_prefix: None,
1042        };
1043
1044        let conn = PeerConnection::new(config);
1045
1046        // One failure - still closed
1047        conn.record_failure().await;
1048        assert_eq!(conn.circuit_state().await, PeerCircuitState::Closed);
1049        assert_eq!(conn.failure_count(), 1);
1050
1051        // Second failure - opens
1052        conn.record_failure().await;
1053        assert_eq!(conn.circuit_state().await, PeerCircuitState::Open);
1054        assert_eq!(conn.failure_count(), 2);
1055    }
1056}