replication_engine/
peer.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Peer connection management.
5//!
6//! Manages Redis connections to peer nodes with automatic reconnection,
7//! circuit breaker protection, and Merkle root caching.
8//!
9//! # Connection Lifecycle
10//!
11//! ```text
12//! Disconnected → Connecting → Connected
13//!      ↑             ↓             ↓
14//!      └─── Backoff ←┴─────────────┘
15//! ```
16//!
17//! Connections are **lazy**: they're only established when first needed
18//! (via [`PeerConnection::ensure_connected()`]). If a connection fails,
19//! the peer enters [`PeerState::Backoff`] state with exponential backoff.
20//!
21//! # Circuit Breaker
22//!
23//! Each peer has a circuit breaker to prevent cascade failures:
24//!
25//! - **Closed**: Normal operation, requests pass through
26//! - **Open**: Too many consecutive failures, requests rejected immediately
27//!
28//! The circuit opens after `circuit_failure_threshold` consecutive failures
29//! and resets after `circuit_reset_timeout_sec` seconds.
30//!
31//! # Merkle Root Caching
32//!
33//! To reduce load during cold path repair, Merkle roots are cached for
34//! 5 seconds (see `MERKLE_ROOT_CACHE_TTL`). This prevents hammering peers
35//! when checking consistency.
36//!
37//! # Example
38//!
39//! ```rust,no_run
40//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
41//! use replication_engine::peer::PeerConnection;
42//! use replication_engine::config::PeerConfig;
43//!
44//! let config = PeerConfig::for_testing("peer-1", "redis://localhost:6379");
45//! let peer = PeerConnection::new(config);
46//!
47//! // Connection is lazy - this triggers the actual connect
48//! peer.ensure_connected().await?;
49//!
50//! // Use the connection
51//! let conn = peer.connection().await;
52//! # Ok(())
53//! # }
54//! ```
55
56use crate::config::PeerConfig;
57use crate::error::{ReplicationError, Result};
58use crate::metrics;
59use crate::resilience::RetryConfig;
60use redis::aio::ConnectionManager;
61use redis::Client;
62use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
63use std::sync::Arc;
64use std::time::{Duration, Instant};
65use tokio::sync::RwLock;
66use tokio::time::timeout;
67use tracing::{error, info, warn};
68
69/// State of a peer connection.
70///
71/// See module docs for the state transition diagram.
72#[derive(Debug, Clone, Copy, PartialEq, Eq)]
73pub enum PeerState {
74    /// Not yet connected (initial state).
75    Disconnected,
76    /// Connection attempt in progress.
77    Connecting,
78    /// Connected and healthy.
79    Connected,
80    /// Connection failed, waiting before retry.
81    Backoff,
82    /// Peer is disabled in configuration.
83    Disabled,
84}
85
86/// Circuit breaker state for peer connections.
87///
88/// Prevents cascade failures by rejecting requests when a peer
89/// is experiencing repeated failures.
90#[derive(Debug, Clone, Copy, PartialEq, Eq)]
91pub enum PeerCircuitState {
92    /// Normal operation, requests pass through.
93    Closed,
94    /// Too many failures, requests rejected immediately.
95    Open,
96}
97
98/// Cached Merkle root with TTL.
99///
100/// Used to reduce load on peers during cold path repair cycles.
101/// Roots are cached for [`MERKLE_ROOT_CACHE_TTL`] to avoid
102/// repeated queries for the same data.
103struct CachedMerkleRoot {
104    /// The cached root hash (None if peer has no data).
105    root: Option<[u8; 32]>,
106    /// When this cache entry expires.
107    expires_at: Instant,
108}
109
110/// Default TTL for cached Merkle roots (5 seconds).
111/// Short enough to catch updates, long enough to avoid hammering.
112const MERKLE_CACHE_TTL: Duration = Duration::from_secs(5);
113
114/// A managed connection to a peer's Redis.
115///
116/// Uses `redis::aio::ConnectionManager` internally, which provides:
117/// - Automatic reconnection on connection loss
118/// - Multiplexed connection (single TCP socket, multiple in-flight requests)
119/// - Connection pooling semantics (cloning is cheap, shares underlying connection)
120pub struct PeerConnection {
121    /// Peer configuration
122    pub config: PeerConfig,
123    /// Redis connection for stream operations (XREAD can block).
124    /// ConnectionManager is Clone and multiplexed, so sharing is cheap.
125    conn: RwLock<Option<ConnectionManager>>,
126    /// Separate Redis connection for point queries (GET, HGET, etc).
127    /// This avoids cold path being blocked by hot path's XREAD BLOCK.
128    query_conn: RwLock<Option<ConnectionManager>>,
129    /// Current state
130    state: RwLock<PeerState>,
131    /// Last successful operation timestamp
132    last_success: AtomicU64,
133    /// Consecutive failure count
134    failure_count: AtomicU64,
135    /// Whether shutdown was requested
136    shutdown: AtomicBool,
137    /// When the circuit opened (for reset timeout)
138    circuit_opened_at: RwLock<Option<Instant>>,
139    /// Cached Merkle root (avoids repeated queries during repair)
140    merkle_root_cache: RwLock<Option<CachedMerkleRoot>>,
141}
142
143impl PeerConnection {
144    /// Create a new peer connection (not yet connected).
145    pub fn new(config: PeerConfig) -> Self {
146        // All peers in the config are enabled (daemon filters disabled peers)
147        let initial_state = PeerState::Disconnected;
148
149        Self {
150            config,
151            conn: RwLock::new(None),
152            query_conn: RwLock::new(None),
153            state: RwLock::new(initial_state),
154            last_success: AtomicU64::new(0),
155            failure_count: AtomicU64::new(0),
156            shutdown: AtomicBool::new(false),
157            circuit_opened_at: RwLock::new(None),
158            merkle_root_cache: RwLock::new(None),
159        }
160    }
161
162    /// Get the peer's node ID.
163    pub fn node_id(&self) -> &str {
164        &self.config.node_id
165    }
166
167    /// Get the CDC stream key for this peer.
168    pub fn cdc_stream_key(&self) -> String {
169        self.config.cdc_stream_key()
170    }
171
172    /// Get current connection state.
173    pub async fn state(&self) -> PeerState {
174        *self.state.read().await
175    }
176
177    /// Check if connected.
178    pub async fn is_connected(&self) -> bool {
179        self.state().await == PeerState::Connected
180    }
181
182    // =========================================================================
183    // Circuit Breaker
184    // =========================================================================
185
186    /// Get the current circuit breaker state.
187    pub async fn circuit_state(&self) -> PeerCircuitState {
188        let failures = self.failure_count.load(Ordering::Relaxed);
189        let threshold = self.config.circuit_failure_threshold as u64;
190
191        if failures >= threshold {
192            // Check if we should try again (half-open)
193            if let Some(opened_at) = *self.circuit_opened_at.read().await {
194                let reset_timeout = Duration::from_secs(self.config.circuit_reset_timeout_sec);
195                if opened_at.elapsed() >= reset_timeout {
196                    // Time to try again
197                    return PeerCircuitState::Closed;
198                }
199            }
200            PeerCircuitState::Open
201        } else {
202            PeerCircuitState::Closed
203        }
204    }
205
206    /// Check if the circuit is open (should reject requests).
207    pub async fn is_circuit_open(&self) -> bool {
208        self.circuit_state().await == PeerCircuitState::Open
209    }
210
211    /// Record a successful operation (resets failure count).
212    pub async fn record_success(&self) {
213        self.failure_count.store(0, Ordering::Relaxed);
214        self.last_success.store(
215            std::time::SystemTime::now()
216                .duration_since(std::time::UNIX_EPOCH)
217                .unwrap_or_default()
218                .as_millis() as u64,
219            Ordering::Relaxed,
220        );
221        // Close the circuit
222        *self.circuit_opened_at.write().await = None;
223    }
224
225    /// Record a failed operation (increments failure count, may open circuit).
226    pub async fn record_failure(&self) {
227        let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
228        let threshold = self.config.circuit_failure_threshold as u64;
229
230        if failures >= threshold {
231            // Open the circuit
232            let mut opened_at = self.circuit_opened_at.write().await;
233            if opened_at.is_none() {
234                *opened_at = Some(Instant::now());
235                warn!(
236                    peer_id = %self.config.node_id,
237                    failures,
238                    threshold,
239                    reset_timeout_sec = self.config.circuit_reset_timeout_sec,
240                    "Circuit breaker opened for peer"
241                );
242                metrics::record_peer_circuit_state(&self.config.node_id, "open");
243            }
244        }
245    }
246
247    /// Connect to the peer's Redis with retry logic.
248    pub async fn connect(&self, retry_config: &RetryConfig) -> Result<()> {
249        *self.state.write().await = PeerState::Connecting;
250        info!(peer_id = %self.config.node_id, url = %self.config.redis_url, "Connecting to peer");
251
252        let client = Client::open(self.config.redis_url.as_str()).map_err(|e| {
253            ReplicationError::PeerConnection {
254                peer_id: self.config.node_id.clone(),
255                message: format!("Invalid Redis URL: {}", e),
256            }
257        })?;
258
259        let mut attempt = 0;
260        let mut delay = retry_config.initial_delay;
261
262        loop {
263            if self.shutdown.load(Ordering::Acquire) {
264                return Err(ReplicationError::Shutdown);
265            }
266
267            attempt += 1;
268
269            // Wrap connection attempt in a timeout to avoid hanging on unreachable hosts
270            let conn_result = timeout(
271                retry_config.connection_timeout,
272                client.get_connection_manager(),
273            )
274            .await;
275
276            match conn_result {
277                Ok(Ok(conn)) => {
278                    // Create a second connection for point queries
279                    // This avoids cold path being blocked by hot path's XREAD BLOCK
280                    let query_conn = client.get_connection_manager().await.map_err(|e| {
281                        ReplicationError::PeerConnection {
282                            peer_id: self.config.node_id.clone(),
283                            message: format!("Failed to create query connection: {}", e),
284                        }
285                    })?;
286                    
287                    *self.conn.write().await = Some(conn);
288                    *self.query_conn.write().await = Some(query_conn);
289                    *self.state.write().await = PeerState::Connected;
290                    self.failure_count.store(0, Ordering::Release);
291                    self.last_success
292                        .store(epoch_millis(), Ordering::Release);
293
294                    metrics::record_peer_connection(&self.config.node_id, true);
295                    metrics::record_peer_state(&self.config.node_id, "connected");
296
297                    if attempt > 1 {
298                        info!(
299                            peer_id = %self.config.node_id,
300                            attempt,
301                            "Connected to peer after retry"
302                        );
303                    } else {
304                        info!(peer_id = %self.config.node_id, "Connected to peer");
305                    }
306                    return Ok(());
307                }
308                Ok(Err(e)) => {
309                    // Redis connection error
310                    self.failure_count.fetch_add(1, Ordering::AcqRel);
311                    let err_msg = format!("{}", e);
312
313                    if attempt >= retry_config.max_attempts {
314                        *self.state.write().await = PeerState::Backoff;
315                        metrics::record_peer_connection(&self.config.node_id, false);
316                        metrics::record_peer_state(&self.config.node_id, "backoff");
317                        error!(
318                            peer_id = %self.config.node_id,
319                            attempt,
320                            error = %e,
321                            "Failed to connect after max retries"
322                        );
323                        return Err(ReplicationError::PeerConnection {
324                            peer_id: self.config.node_id.clone(),
325                            message: format!("Connection failed after {} attempts: {}", attempt, err_msg),
326                        });
327                    }
328
329                    warn!(
330                        peer_id = %self.config.node_id,
331                        attempt,
332                        delay_ms = delay.as_millis(),
333                        error = %e,
334                        "Connection attempt failed, retrying"
335                    );
336
337                    tokio::time::sleep(delay).await;
338                    delay = std::cmp::min(
339                        Duration::from_secs_f64(delay.as_secs_f64() * retry_config.backoff_factor),
340                        retry_config.max_delay,
341                    );
342                }
343                Err(_) => {
344                    // Timeout elapsed
345                    self.failure_count.fetch_add(1, Ordering::AcqRel);
346
347                    if attempt >= retry_config.max_attempts {
348                        *self.state.write().await = PeerState::Backoff;
349                        error!(
350                            peer_id = %self.config.node_id,
351                            attempt,
352                            timeout_ms = retry_config.connection_timeout.as_millis(),
353                            "Connection timed out after max retries"
354                        );
355                        return Err(ReplicationError::PeerConnection {
356                            peer_id: self.config.node_id.clone(),
357                            message: format!(
358                                "Connection timed out after {} attempts ({}ms timeout)",
359                                attempt,
360                                retry_config.connection_timeout.as_millis()
361                            ),
362                        });
363                    }
364
365                    warn!(
366                        peer_id = %self.config.node_id,
367                        attempt,
368                        delay_ms = delay.as_millis(),
369                        timeout_ms = retry_config.connection_timeout.as_millis(),
370                        "Connection attempt timed out, retrying"
371                    );
372
373                    tokio::time::sleep(delay).await;
374                    delay = std::cmp::min(
375                        Duration::from_secs_f64(delay.as_secs_f64() * retry_config.backoff_factor),
376                        retry_config.max_delay,
377                    );
378                }
379            }
380        }
381    }
382
383    /// Get the stream connection (for XREAD and other potentially blocking ops).
384    ///
385    /// Returns None if not connected.
386    pub async fn connection(&self) -> Option<ConnectionManager> {
387        self.conn.read().await.clone()
388    }
389
390    /// Get the query connection (for GET, HGET, and other fast point queries).
391    /// 
392    /// This is a separate connection that won't be blocked by XREAD BLOCK
393    /// on the stream connection.
394    ///
395    /// Returns None if not connected.
396    pub async fn query_connection(&self) -> Option<ConnectionManager> {
397        self.query_conn.read().await.clone()
398    }
399
400    /// Ensure the peer is connected, connecting lazily if needed.
401    ///
402    /// This provides a convenient way to get a connection without
403    /// manually calling `connect()` first. Uses default retry config.
404    ///
405    /// # Returns
406    /// The connection manager, or an error if connection failed.
407    pub async fn ensure_connected(&self) -> Result<ConnectionManager> {
408        // Fast path: already connected
409        if let Some(conn) = self.connection().await {
410            return Ok(conn);
411        }
412
413        // Need to connect
414        self.connect(&RetryConfig::default()).await?;
415        
416        self.connection().await.ok_or_else(|| {
417            ReplicationError::PeerConnection {
418                peer_id: self.config.node_id.clone(),
419                message: "Connection lost immediately after connect".to_string(),
420            }
421        })
422    }
423
424    /// Get consecutive failure count.
425    pub fn failure_count(&self) -> u64 {
426        self.failure_count.load(Ordering::Acquire)
427    }
428
429    /// Get milliseconds since last success.
430    pub fn millis_since_success(&self) -> u64 {
431        let last = self.last_success.load(Ordering::Acquire);
432        if last == 0 {
433            return u64::MAX;
434        }
435        epoch_millis().saturating_sub(last)
436    }
437
438    /// Mark connection as failed (triggers reconnect).
439    pub async fn mark_disconnected(&self) {
440        *self.conn.write().await = None;
441        *self.state.write().await = PeerState::Disconnected;
442        metrics::record_peer_state(&self.config.node_id, "disconnected");
443        warn!(peer_id = %self.config.node_id, "Connection marked as disconnected");
444    }
445
446    /// Signal shutdown.
447    pub fn shutdown(&self) {
448        self.shutdown.store(true, Ordering::Release);
449    }
450
451    // =========================================================================
452    // Health Check
453    // =========================================================================
454
455    /// Ping the peer's Redis to check connection health.
456    ///
457    /// Returns the round-trip latency on success.
458    /// Updates `last_success` timestamp on success.
459    pub async fn ping(&self) -> Result<Duration> {
460        let conn = self.connection().await.ok_or_else(|| {
461            ReplicationError::PeerConnection {
462                peer_id: self.config.node_id.clone(),
463                message: "Not connected".to_string(),
464            }
465        })?;
466
467        let mut conn = conn;
468        let start = std::time::Instant::now();
469
470        let result: String = redis::cmd("PING")
471            .query_async(&mut conn)
472            .await
473            .map_err(|e| ReplicationError::PeerConnection {
474                peer_id: self.config.node_id.clone(),
475                message: format!("PING failed: {}", e),
476            })?;
477
478        let latency = start.elapsed();
479
480        if result == "PONG" {
481            self.record_success().await;
482            metrics::record_peer_ping(&self.config.node_id, true);
483            metrics::record_peer_ping_latency(&self.config.node_id, latency);
484            Ok(latency)
485        } else {
486            self.record_failure().await;
487            metrics::record_peer_ping(&self.config.node_id, false);
488            Err(ReplicationError::PeerConnection {
489                peer_id: self.config.node_id.clone(),
490                message: format!("Unexpected PING response: {}", result),
491            })
492        }
493    }
494
495    // =========================================================================
496    // Key Prefixing (for shared Redis instances)
497    // =========================================================================
498
499    /// Build a prefixed key for this peer's Redis.
500    ///
501    /// Uses the peer's configured `redis_prefix` (e.g., "node-b:").
502    #[inline]
503    fn prefixed_key(&self, suffix: &str) -> String {
504        match &self.config.redis_prefix {
505            Some(prefix) => format!("{}{}", prefix, suffix),
506            None => suffix.to_string(),
507        }
508    }
509
510    // =========================================================================
511    // Merkle Tree Queries (for cold path repair)
512    // =========================================================================
513
514    /// Get the peer's Merkle root hash (cached with TTL).
515    /// 
516    /// Uses a short-lived cache to avoid hammering the peer during repair cycles.
517    /// The cache is automatically invalidated after `MERKLE_CACHE_TTL`.
518    pub async fn get_merkle_root(&self) -> Result<Option<[u8; 32]>> {
519        // Check cache first
520        {
521            let cache = self.merkle_root_cache.read().await;
522            if let Some(ref cached) = *cache {
523                if Instant::now() < cached.expires_at {
524                    return Ok(cached.root);
525                }
526            }
527        }
528
529        // Cache miss or expired - fetch from peer using query connection
530        // (query_connection is separate from stream connection to avoid XREAD blocking)
531        let start = Instant::now();
532        let conn = self.query_connection().await.ok_or_else(|| {
533            ReplicationError::PeerConnection {
534                peer_id: self.config.node_id.clone(),
535                message: "Not connected".to_string(),
536            }
537        })?;
538
539        let mut conn = conn;
540        // Root hash is at merkle:hash: (empty path suffix)
541        let key = self.prefixed_key("merkle:hash:");
542        let result: Option<String> = redis::cmd("GET")
543            .arg(&key)
544            .query_async(&mut conn)
545            .await
546            .map_err(|e| ReplicationError::PeerConnection {
547                peer_id: self.config.node_id.clone(),
548                message: format!("Failed to get Merkle root: {}", e),
549            })?;
550        
551        metrics::record_peer_operation_latency(&self.config.node_id, "get_merkle_root", start.elapsed());
552
553        let root = match result {
554            Some(hex_str) => {
555                let bytes = hex::decode(&hex_str).map_err(|e| {
556                    ReplicationError::PeerConnection {
557                        peer_id: self.config.node_id.clone(),
558                        message: format!("Invalid Merkle root hex: {}", e),
559                    }
560                })?;
561                let arr: [u8; 32] = bytes.try_into().map_err(|_| {
562                    ReplicationError::PeerConnection {
563                        peer_id: self.config.node_id.clone(),
564                        message: "Merkle root is not 32 bytes".to_string(),
565                    }
566                })?;
567                Some(arr)
568            }
569            None => None,
570        };
571
572        // Update cache
573        {
574            let mut cache = self.merkle_root_cache.write().await;
575            *cache = Some(CachedMerkleRoot {
576                root,
577                expires_at: Instant::now() + MERKLE_CACHE_TTL,
578            });
579        }
580
581        Ok(root)
582    }
583
584    /// Invalidate the Merkle root cache (call after local writes).
585    pub async fn invalidate_merkle_cache(&self) {
586        let mut cache = self.merkle_root_cache.write().await;
587        *cache = None;
588    }
589
590    /// Get children of a Merkle path (sorted by score/position).
591    pub async fn get_merkle_children(&self, path: &str) -> Result<Vec<(String, [u8; 32])>> {
592        let start = Instant::now();
593        let conn = self.query_connection().await.ok_or_else(|| {
594            ReplicationError::PeerConnection {
595                peer_id: self.config.node_id.clone(),
596                message: "Not connected".to_string(),
597            }
598        })?;
599
600        let mut conn = conn;
601        let key = self.prefixed_key(&format!("merkle:children:{}", path));
602
603        // ZRANGE returns items as (member, score) pairs with WITHSCORES
604        let items: Vec<(String, f64)> = redis::cmd("ZRANGE")
605            .arg(&key)
606            .arg(0)
607            .arg(-1)
608            .arg("WITHSCORES")
609            .query_async(&mut conn)
610            .await
611            .map_err(|e| ReplicationError::PeerConnection {
612                peer_id: self.config.node_id.clone(),
613                message: format!("Failed to get Merkle children: {}", e),
614            })?;
615
616        let mut children = Vec::with_capacity(items.len());
617        for (child_name, _score) in items {
618            // Get the hash for this child
619            let child_path = if path.is_empty() {
620                child_name.clone()
621            } else {
622                format!("{}.{}", path, child_name)
623            };
624            let hash_key = self.prefixed_key(&format!("merkle:hash:{}", child_path));
625
626            let hex_hash: Option<String> = redis::cmd("GET")
627                .arg(&hash_key)
628                .query_async(&mut conn)
629                .await
630                .map_err(|e| ReplicationError::PeerConnection {
631                    peer_id: self.config.node_id.clone(),
632                    message: format!("Failed to get child hash: {}", e),
633                })?;
634
635            if let Some(hex_str) = hex_hash {
636                let bytes = hex::decode(&hex_str).map_err(|e| {
637                    ReplicationError::PeerConnection {
638                        peer_id: self.config.node_id.clone(),
639                        message: format!("Invalid child hash hex: {}", e),
640                    }
641                })?;
642                let arr: [u8; 32] = bytes.try_into().map_err(|_| {
643                    ReplicationError::PeerConnection {
644                        peer_id: self.config.node_id.clone(),
645                        message: "Child hash is not 32 bytes".to_string(),
646                    }
647                })?;
648                children.push((child_name, arr));
649            }
650        }
651        
652        metrics::record_peer_operation_latency(&self.config.node_id, "get_merkle_children", start.elapsed());
653
654        Ok(children)
655    }
656
657    /// Get an item's data by key.
658    /// 
659    /// Returns the raw item content from the peer's Redis.
660    /// Uses JSON.GET for JSON items, GET for binary items.
661    pub async fn get_item(&self, key: &str) -> Result<Option<Vec<u8>>> {
662        let start = Instant::now();
663        let conn = self.query_connection().await.ok_or_else(|| {
664            ReplicationError::PeerConnection {
665                peer_id: self.config.node_id.clone(),
666                message: "Not connected".to_string(),
667            }
668        })?;
669
670        let mut conn = conn;
671        // Items are stored at {prefix}{object_id} in sync-engine
672        let redis_key = self.prefixed_key(key);
673
674        // First try JSON.GET (most items are JSON)
675        let json_result: redis::RedisResult<Option<String>> = redis::cmd("JSON.GET")
676            .arg(&redis_key)
677            .arg("$.payload")
678            .query_async(&mut conn)
679            .await;
680        
681        let data = match json_result {
682            Ok(Some(json_str)) => {
683                // Parse the JSON array response from JSONPath and extract payload
684                // JSON.GET returns ["payload_content"] for $.payload
685                if let Ok(arr) = serde_json::from_str::<Vec<serde_json::Value>>(&json_str) {
686                    arr.into_iter().next().map(|payload| {
687                        serde_json::to_vec(&payload).unwrap_or_default()
688                    })
689                } else {
690                    // Might be direct payload string
691                    Some(json_str.into_bytes())
692                }
693            }
694            Ok(None) | Err(_) => {
695                // Fall back to regular GET for binary items
696                redis::cmd("GET")
697                    .arg(&redis_key)
698                    .query_async(&mut conn)
699                    .await
700                    .ok()
701                    .flatten()
702            }
703        };
704        
705        metrics::record_peer_operation_latency(&self.config.node_id, "get_item", start.elapsed());
706
707        Ok(data)
708    }
709}
710
711/// Manager for all peer connections.
712pub struct PeerManager {
713    /// All peer connections (keyed by node_id)
714    peers: dashmap::DashMap<String, Arc<PeerConnection>>,
715    /// Retry configuration
716    retry_config: RetryConfig,
717}
718
719impl PeerManager {
720    /// Create a new peer manager.
721    pub fn new(retry_config: RetryConfig) -> Self {
722        Self {
723            peers: dashmap::DashMap::new(),
724            retry_config,
725        }
726    }
727
728    /// Add a peer from configuration.
729    pub fn add_peer(&self, config: PeerConfig) {
730        let node_id = config.node_id.clone();
731        let conn = Arc::new(PeerConnection::new(config));
732        self.peers.insert(node_id, conn);
733    }
734
735    /// Get a peer connection by node ID.
736    pub fn get(&self, node_id: &str) -> Option<Arc<PeerConnection>> {
737        self.peers.get(node_id).map(|r| r.value().clone())
738    }
739
740    /// Get all peer connections.
741    pub fn all(&self) -> Vec<Arc<PeerConnection>> {
742        self.peers.iter().map(|r| r.value().clone()).collect()
743    }
744
745    /// Connect to all enabled peers.
746    pub async fn connect_all(&self) -> Vec<Result<()>> {
747        let mut results = Vec::new();
748        for peer in self.all() {
749            let result = peer.connect(&self.retry_config).await;
750            results.push(result);
751        }
752        results
753    }
754
755    /// Remove a peer.
756    pub fn remove_peer(&self, node_id: &str) {
757        if let Some((_, peer)) = self.peers.remove(node_id) {
758            peer.shutdown();
759        }
760    }
761
762    /// Shutdown all peers.
763    pub fn shutdown_all(&self) {
764        for peer in self.peers.iter() {
765            peer.shutdown();
766        }
767    }
768
769    /// Get count of connected peers.
770    pub async fn connected_count(&self) -> usize {
771        let mut count = 0;
772        for peer in self.all() {
773            if peer.is_connected().await {
774                count += 1;
775            }
776        }
777        count
778    }
779}
780
781/// Get current epoch milliseconds.
782fn epoch_millis() -> u64 {
783    std::time::SystemTime::now()
784        .duration_since(std::time::UNIX_EPOCH)
785        .unwrap_or_default()
786        .as_millis() as u64
787}
788
789#[cfg(test)]
790mod tests {
791    use super::*;
792
793    #[test]
794    fn test_peer_connection_new() {
795        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
796
797        let conn = PeerConnection::new(config);
798        assert_eq!(conn.node_id(), "test-peer");
799        assert_eq!(conn.cdc_stream_key(), "cdc");
800    }
801
802    #[test]
803    fn test_peer_state_variants() {
804        assert_eq!(PeerState::Disconnected, PeerState::Disconnected);
805        assert_ne!(PeerState::Connected, PeerState::Disconnected);
806        assert_ne!(PeerState::Connecting, PeerState::Backoff);
807        assert_eq!(PeerState::Disabled, PeerState::Disabled);
808    }
809
810    #[test]
811    fn test_peer_circuit_state_variants() {
812        assert_eq!(PeerCircuitState::Closed, PeerCircuitState::Closed);
813        assert_ne!(PeerCircuitState::Open, PeerCircuitState::Closed);
814    }
815
816    #[tokio::test]
817    async fn test_peer_initial_state() {
818        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
819        let conn = PeerConnection::new(config);
820        
821        assert_eq!(conn.state().await, PeerState::Disconnected);
822        assert!(!conn.is_connected().await);
823        assert_eq!(conn.failure_count(), 0);
824    }
825
826    #[tokio::test]
827    async fn test_peer_connection_not_connected() {
828        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
829        let conn = PeerConnection::new(config);
830        
831        // Should be None when not connected
832        assert!(conn.connection().await.is_none());
833    }
834
835    #[tokio::test]
836    async fn test_peer_circuit_breaker() {
837        let config = PeerConfig {
838            node_id: "test-peer".to_string(),
839            redis_url: "redis://localhost:6379".to_string(),
840            priority: 0,
841            circuit_failure_threshold: 3,
842            circuit_reset_timeout_sec: 1,
843            redis_prefix: None,
844        };
845
846        let conn = PeerConnection::new(config);
847
848        // Initially closed
849        assert_eq!(conn.circuit_state().await, PeerCircuitState::Closed);
850        assert!(!conn.is_circuit_open().await);
851
852        // Record failures up to threshold
853        conn.record_failure().await;
854        assert_eq!(conn.circuit_state().await, PeerCircuitState::Closed);
855        conn.record_failure().await;
856        assert_eq!(conn.circuit_state().await, PeerCircuitState::Closed);
857        conn.record_failure().await;
858        
859        // Now it should be open
860        assert_eq!(conn.circuit_state().await, PeerCircuitState::Open);
861        assert!(conn.is_circuit_open().await);
862
863        // Wait for reset timeout
864        tokio::time::sleep(Duration::from_secs(2)).await;
865        
866        // Should be closed again (half-open allows retry)
867        assert_eq!(conn.circuit_state().await, PeerCircuitState::Closed);
868
869        // Success resets everything
870        conn.record_success().await;
871        assert_eq!(conn.failure_count(), 0);
872        assert!(!conn.is_circuit_open().await);
873    }
874
875    #[tokio::test]
876    async fn test_peer_record_success_updates_last_success() {
877        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
878        let conn = PeerConnection::new(config);
879
880        // Initially no success recorded
881        assert_eq!(conn.last_success.load(Ordering::Acquire), 0);
882        
883        // Record some failures first
884        conn.record_failure().await;
885        conn.record_failure().await;
886        assert_eq!(conn.failure_count(), 2);
887
888        // Record success should reset failures and update timestamp
889        conn.record_success().await;
890        assert_eq!(conn.failure_count(), 0);
891        assert!(conn.last_success.load(Ordering::Acquire) > 0);
892    }
893
894    #[tokio::test]
895    async fn test_peer_millis_since_success_no_success() {
896        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
897        let conn = PeerConnection::new(config);
898
899        // No success recorded - should return MAX
900        assert_eq!(conn.millis_since_success(), u64::MAX);
901    }
902
903    #[tokio::test]
904    async fn test_peer_millis_since_success_after_success() {
905        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
906        let conn = PeerConnection::new(config);
907
908        conn.record_success().await;
909        tokio::time::sleep(Duration::from_millis(100)).await;
910        
911        let millis = conn.millis_since_success();
912        // Should be at least 100ms
913        assert!(millis >= 100);
914        // But not absurdly long (less than 1 second for this test)
915        assert!(millis < 1000);
916    }
917
918    #[tokio::test]
919    async fn test_peer_shutdown_flag() {
920        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
921        let conn = PeerConnection::new(config);
922
923        assert!(!conn.shutdown.load(Ordering::Acquire));
924        conn.shutdown();
925        assert!(conn.shutdown.load(Ordering::Acquire));
926    }
927
928    #[tokio::test]
929    async fn test_peer_mark_disconnected() {
930        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
931        let conn = PeerConnection::new(config);
932
933        // Start in Disconnected state
934        assert_eq!(conn.state().await, PeerState::Disconnected);
935        
936        // Mark disconnected (should stay disconnected)
937        conn.mark_disconnected().await;
938        assert_eq!(conn.state().await, PeerState::Disconnected);
939        assert!(conn.connection().await.is_none());
940    }
941
942    #[tokio::test]
943    async fn test_peer_invalidate_merkle_cache() {
944        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
945        let conn = PeerConnection::new(config);
946
947        // Initially no cache
948        assert!(conn.merkle_root_cache.read().await.is_none());
949
950        // Simulate a cached entry
951        {
952            let mut cache = conn.merkle_root_cache.write().await;
953            *cache = Some(CachedMerkleRoot {
954                root: Some([0u8; 32]),
955                expires_at: Instant::now() + Duration::from_secs(60),
956            });
957        }
958
959        // Verify cache is set
960        assert!(conn.merkle_root_cache.read().await.is_some());
961
962        // Invalidate
963        conn.invalidate_merkle_cache().await;
964
965        // Cache should be None
966        assert!(conn.merkle_root_cache.read().await.is_none());
967    }
968
969    #[test]
970    fn test_peer_manager_add_peer() {
971        let manager = PeerManager::new(RetryConfig::testing());
972        manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
973        manager.add_peer(PeerConfig::for_testing("peer-2", "redis://peer2:6379"));
974
975        let peers = manager.all();
976        assert_eq!(peers.len(), 2);
977    }
978
979    #[test]
980    fn test_peer_manager_get() {
981        let manager = PeerManager::new(RetryConfig::testing());
982        manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
983
984        assert!(manager.get("peer-1").is_some());
985        assert!(manager.get("nonexistent").is_none());
986    }
987
988    #[test]
989    fn test_peer_manager_remove_peer() {
990        let manager = PeerManager::new(RetryConfig::testing());
991        manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
992        
993        assert!(manager.get("peer-1").is_some());
994        
995        manager.remove_peer("peer-1");
996        
997        assert!(manager.get("peer-1").is_none());
998    }
999
1000    #[test]
1001    fn test_peer_manager_remove_nonexistent() {
1002        let manager = PeerManager::new(RetryConfig::testing());
1003        // Should not panic
1004        manager.remove_peer("nonexistent");
1005    }
1006
1007    #[test]
1008    fn test_peer_manager_shutdown_all() {
1009        let manager = PeerManager::new(RetryConfig::testing());
1010        manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
1011        manager.add_peer(PeerConfig::for_testing("peer-2", "redis://peer2:6379"));
1012
1013        manager.shutdown_all();
1014
1015        // Verify all peers have shutdown flag set
1016        for peer in manager.all() {
1017            assert!(peer.shutdown.load(Ordering::Acquire));
1018        }
1019    }
1020
1021    #[tokio::test]
1022    async fn test_peer_manager_connected_count_none() {
1023        let manager = PeerManager::new(RetryConfig::testing());
1024        manager.add_peer(PeerConfig::for_testing("peer-1", "redis://peer1:6379"));
1025        manager.add_peer(PeerConfig::for_testing("peer-2", "redis://peer2:6379"));
1026
1027        // No peers connected yet
1028        assert_eq!(manager.connected_count().await, 0);
1029    }
1030
1031    #[test]
1032    fn test_epoch_millis() {
1033        let millis = epoch_millis();
1034        // Should be a reasonable epoch time (after 2020, before 2100)
1035        assert!(millis > 1577836800000); // Jan 1, 2020
1036        assert!(millis < 4102444800000); // Jan 1, 2100
1037    }
1038
1039    #[test]
1040    fn test_merkle_cache_ttl() {
1041        // Verify constant is reasonable
1042        assert_eq!(MERKLE_CACHE_TTL, Duration::from_secs(5));
1043    }
1044
1045    #[tokio::test]
1046    async fn test_peer_ping_not_connected() {
1047        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
1048        let conn = PeerConnection::new(config);
1049
1050        // Ping should fail when not connected
1051        let result = conn.ping().await;
1052        assert!(result.is_err());
1053        
1054        if let Err(ReplicationError::PeerConnection { peer_id, message }) = result {
1055            assert_eq!(peer_id, "test-peer");
1056            assert!(message.contains("Not connected"));
1057        } else {
1058            panic!("Expected PeerConnection error");
1059        }
1060    }
1061
1062    #[tokio::test]
1063    async fn test_peer_get_merkle_root_not_connected() {
1064        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
1065        let conn = PeerConnection::new(config);
1066
1067        let result = conn.get_merkle_root().await;
1068        assert!(result.is_err());
1069    }
1070
1071    #[tokio::test]
1072    async fn test_peer_get_merkle_children_not_connected() {
1073        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
1074        let conn = PeerConnection::new(config);
1075
1076        let result = conn.get_merkle_children("some/path").await;
1077        assert!(result.is_err());
1078    }
1079
1080    #[tokio::test]
1081    async fn test_peer_get_item_not_connected() {
1082        let config = PeerConfig::for_testing("test-peer", "redis://localhost:6379");
1083        let conn = PeerConnection::new(config);
1084
1085        let result = conn.get_item("some-key").await;
1086        assert!(result.is_err());
1087    }
1088
1089    #[tokio::test]
1090    async fn test_peer_ensure_connected_fails() {
1091        // This test is slow (retries with timeouts) so we skip it in normal runs
1092        // Integration tests cover real connection scenarios
1093        // Just verify the method exists and handles errors
1094        let config = PeerConfig::for_testing("test-peer", "redis://localhost:1"); // Port 1 is typically closed
1095        let conn = PeerConnection::new(config);
1096
1097        // Should not be connected
1098        assert!(!conn.is_connected().await);
1099    }
1100
1101    #[tokio::test]
1102    async fn test_peer_circuit_opens_on_threshold() {
1103        let config = PeerConfig {
1104            node_id: "test-peer".to_string(),
1105            redis_url: "redis://localhost:6379".to_string(),
1106            priority: 0,
1107            circuit_failure_threshold: 2, // Low threshold for testing
1108            circuit_reset_timeout_sec: 30,
1109            redis_prefix: None,
1110        };
1111
1112        let conn = PeerConnection::new(config);
1113
1114        // One failure - still closed
1115        conn.record_failure().await;
1116        assert_eq!(conn.circuit_state().await, PeerCircuitState::Closed);
1117        assert_eq!(conn.failure_count(), 1);
1118
1119        // Second failure - opens
1120        conn.record_failure().await;
1121        assert_eq!(conn.circuit_state().await, PeerCircuitState::Open);
1122        assert_eq!(conn.failure_count(), 2);
1123    }
1124}