runar_node 0.1.0

Runar Node implementation
Documentation
//! PeerState - Manages the state of a connection to a remote peer
//!
//! INTENTION: Tracks state, manages stream pools, and handles connection health for a single peer.

use crate::network::discovery::NodeInfo;
use crate::network::transport::{NetworkError, StreamPool};
use runar_common::logging::Logger;
use runar_macros_common::{log_debug, log_error, log_info, log_warn};
use std::fmt;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::sync::{mpsc, Mutex};

/// PeerState - Manages the state of a connection to a remote peer
///
/// INTENTION: This component tracks the state of individual peer connections,
/// manages stream pools, and handles connection health.
///
/// ARCHITECTURAL BOUNDARIES:
/// - Only accessed by ConnectionPool and QuicTransportImpl
/// - Manages its own StreamPool instance
/// - Handles connection lifecycle for a single peer
///
/// PeerState - Manages the state of a connection to a remote peer
///
/// INTENTION: This component tracks the state of individual peer connections,
/// manages stream pools, and handles connection health. Only mutable fields are
/// protected by granular locks for reduced contention.
pub struct PeerState {
    peer_node_id: String,
    address: String,
    stream_pool: StreamPool,
    connection: Mutex<Option<quinn::Connection>>,
    last_activity: Mutex<std::time::Instant>,
    logger: Arc<Logger>,
    status_tx: mpsc::Sender<bool>,
    #[allow(dead_code)]
    status_rx: Mutex<mpsc::Receiver<bool>>,
    /// Optional node information received during handshake
    node_info: RwLock<Option<NodeInfo>>,
}

impl PeerState {
    pub async fn take_connection(&self) -> Option<quinn::Connection> {
        self.connection.lock().await.take()
    }

    pub async fn has_connection(&self) -> bool {
        self.connection.lock().await.is_some()
    }
    /// Create a new PeerState with the specified peer ID and address
    ///
    /// INTENTION: Initialize a new peer state with the given parameters.
    pub fn new(
        peer_node_id: String,
        address: String,
        max_idle_streams: usize,
        logger: Arc<Logger>,
    ) -> Self {
        let (status_tx, status_rx) = mpsc::channel(10);
        Self {
            peer_node_id,
            address,
            stream_pool: StreamPool::new(max_idle_streams, logger.clone()),
            connection: Mutex::new(None),
            last_activity: Mutex::new(std::time::Instant::now()),
            logger,
            status_tx,
            status_rx: Mutex::new(status_rx),
            node_info: RwLock::new(None),
        }
    }

    /// Set the node info for this peer
    ///
    /// INTENTION: Store the node information received during handshake.
    pub async fn set_node_info(&self, node_info: NodeInfo) {
        let mut info = self.node_info.write().await;
        *info = Some(node_info);
        log_info!(
            self.logger,
            "Node info set for peer {peer_id}",
            peer_id = self.peer_node_id
        );
    }
    /// Set the connection for this peer
    ///
    /// INTENTION: Establish a connection to the peer and update the state.
    pub async fn set_connection(&self, connection: quinn::Connection) {
        log_info!(
            self.logger,
            "🔗 [PeerState] Setting connection for peer {} - Remote: {}",
            self.peer_node_id,
            connection.remote_address()
        );

        let mut conn_guard = self.connection.lock().await;
        *conn_guard = Some(connection);
        let mut last = self.last_activity.lock().await;
        *last = std::time::Instant::now();
        let _ = self.status_tx.send(true).await;
        log_info!(
            self.logger,
            "✅ [PeerState] Connection established with peer {} at {}",
            self.peer_node_id,
            std::time::Instant::now().elapsed().as_millis()
        );
    }

    /// Check if peer is connected
    ///
    /// INTENTION: Determine if there's an active connection to the peer.
    pub async fn is_connected(&self) -> bool {
        let conn_guard = self.connection.lock().await;
        let connected = conn_guard.is_some();

        log_debug!(
            self.logger,
            "🔍 [PeerState] Connection check for peer {} - Connected: {}",
            self.peer_node_id,
            connected
        );

        // If we have a connection, also check if it's still alive
        if connected {
            if let Some(conn) = conn_guard.as_ref() {
                let close_reason = conn.close_reason();
                if close_reason.is_some() {
                    log_warn!(
                        self.logger,
                        "⚠️ [PeerState] Connection to peer {} is closed - Reason: {:?}",
                        self.peer_node_id,
                        close_reason
                    );
                    return false;
                }
            }
        }

        connected
    }

    /// Get a stream for sending messages to this peer
    ///
    /// INTENTION: Obtain a QUIC stream for sending data to this peer.
    pub async fn get_send_stream(&self) -> Result<quinn::SendStream, NetworkError> {
        log_debug!(
            self.logger,
            "🔄 [PeerState] Checking for idle stream for peer {}",
            self.peer_node_id
        );

        if let Some(stream) = self.stream_pool.get_idle_stream().await {
            log_debug!(
                self.logger,
                "✅ [PeerState] Found idle stream for peer {}",
                self.peer_node_id
            );
            return Ok(stream);
        }

        log_debug!(
            self.logger,
            "🆕 [PeerState] No idle stream available - creating new stream for peer {}",
            self.peer_node_id
        );

        let conn_opt = { self.connection.lock().await.clone() };
        if let Some(conn) = conn_opt {
            log_debug!(
                self.logger,
                "✅ [PeerState] Connection available for peer {} - opening new stream",
                self.peer_node_id
            );

            match conn.open_bi().await {
                Ok((send_stream, _recv_stream)) => {
                    log_info!(
                        self.logger,
                        "✅ [PeerState] Opened new bidirectional stream to peer {}",
                        self.peer_node_id
                    );
                    Ok(send_stream)
                }
                Err(e) => {
                    log_error!(
                        self.logger,
                        "❌ [PeerState] Failed to open stream to peer {}: {}",
                        self.peer_node_id,
                        e
                    );

                    // Log additional connection state information
                    log_error!(
                        self.logger,
                        "🔍 [PeerState] Connection diagnostics for peer {} - Error details: {:?}",
                        self.peer_node_id,
                        e
                    );

                    Err(NetworkError::ConnectionError(format!(
                        "Failed to open stream: {e}"
                    )))
                }
            }
        } else {
            log_error!(
                self.logger,
                "❌ [PeerState] No connection available for peer {} - cannot create stream",
                self.peer_node_id
            );
            Err(NetworkError::ConnectionError(
                "Not connected to peer".to_string(),
            ))
        }
    }

    /// Return a stream to the pool for reuse
    ///
    /// INTENTION: Recycle streams to avoid the overhead of creating new ones.
    pub async fn return_stream(&self, stream: quinn::SendStream) -> Result<(), NetworkError> {
        self.stream_pool.return_stream(stream).await
    }

    /// Get a clone of the connection for direct use
    ///
    /// INTENTION: Provide access to the connection for operations that need it directly
    pub async fn get_connection(&self) -> Option<quinn::Connection> {
        let conn_guard = self.connection.lock().await;
        conn_guard.clone()
    }

    /// Update the last activity timestamp
    ///
    /// INTENTION: Track when the peer was last active for connection management.
    pub async fn update_activity(&self) {
        let mut last = self.last_activity.lock().await;
        *last = std::time::Instant::now();
    }

    /// Close the connection to this peer
    ///
    /// INTENTION: Properly clean up resources when disconnecting from a peer.
    pub async fn close_connection(&self) -> Result<(), NetworkError> {
        let conn_opt = { self.connection.lock().await.take() };
        if let Some(conn) = conn_opt {
            conn.close(0u32.into(), b"Connection closed by peer");
            let _ = self.status_tx.send(false).await;
            log_info!(
                self.logger,
                "Connection closed with peer {}",
                self.peer_node_id
            );
        }
        let _ = self.stream_pool.clear().await;
        Ok(())
    }
}

impl fmt::Debug for PeerState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PeerState")
            .field("peer_id", &self.peer_node_id)
            .field("address", &self.address)
            .finish()
    }
}