use crate::network::discovery::NodeInfo;
use crate::network::transport::{NetworkError, StreamPool};
use runar_common::logging::Logger;
use runar_macros_common::{log_debug, log_error, log_info, log_warn};
use std::fmt;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::sync::{mpsc, Mutex};
/// Per-peer networking state: the (optional) QUIC connection to one remote
/// node, a pool of reusable send streams, and bookkeeping around them.
///
/// All mutable parts sit behind async locks, so every method takes `&self`.
pub struct PeerState {
/// Identifier of the remote peer; used in log messages and `Debug` output.
peer_node_id: String,
/// Address string supplied at construction; only surfaced via `Debug` in this file.
address: String,
/// Pool of idle send streams, created with the `max_idle_streams` passed to `new`.
stream_pool: StreamPool,
/// Current connection to the peer, or `None` when no connection is stored.
connection: Mutex<Option<quinn::Connection>>,
/// Timestamp refreshed by `set_connection` and `update_activity`.
last_activity: Mutex<std::time::Instant>,
/// Logger shared with the stream pool.
logger: Arc<Logger>,
/// Sending half of the bounded status channel created in `new`:
/// `true` is sent from `set_connection`, `false` from `close_connection`.
status_tx: mpsc::Sender<bool>,
// The receiving half is stored but never read in the visible code, hence the
// lint allowance; presumably it is kept so the channel is not closed.
// NOTE(review): the channel is bounded and nothing here drains it, so it
// fills up after the first few status updates - confirm whether a consumer
// is planned.
#[allow(dead_code)]
status_rx: Mutex<mpsc::Receiver<bool>>,
/// Node information for the peer; `None` until `set_node_info` is called.
node_info: RwLock<Option<NodeInfo>>,
}
impl PeerState {
    /// Capacity of the bounded connection-status channel.
    const STATUS_CHANNEL_CAPACITY: usize = 10;

    /// Creates the state for a peer that is not yet connected.
    ///
    /// * `peer_node_id` - identifier of the remote peer (used in logs).
    /// * `address` - address string of the peer.
    /// * `max_idle_streams` - forwarded to [`StreamPool::new`].
    /// * `logger` - logger shared between this state and its stream pool.
    pub fn new(
        peer_node_id: String,
        address: String,
        max_idle_streams: usize,
        logger: Arc<Logger>,
    ) -> Self {
        let (status_tx, status_rx) = mpsc::channel(Self::STATUS_CHANNEL_CAPACITY);
        Self {
            peer_node_id,
            address,
            stream_pool: StreamPool::new(max_idle_streams, logger.clone()),
            connection: Mutex::new(None),
            last_activity: Mutex::new(std::time::Instant::now()),
            logger,
            status_tx,
            status_rx: Mutex::new(status_rx),
            node_info: RwLock::new(None),
        }
    }

    /// Removes and returns the stored connection, leaving `None` behind.
    ///
    /// Unlike [`close_connection`](Self::close_connection) this neither closes
    /// the connection nor clears the stream pool.
    pub async fn take_connection(&self) -> Option<quinn::Connection> {
        self.connection.lock().await.take()
    }

    /// Returns `true` when a connection handle is stored.
    ///
    /// This does not inspect whether the connection has been closed; use
    /// [`is_connected`](Self::is_connected) for that.
    pub async fn has_connection(&self) -> bool {
        self.connection.lock().await.is_some()
    }

    /// Stores (or replaces) the node information known for this peer.
    pub async fn set_node_info(&self, node_info: NodeInfo) {
        *self.node_info.write().await = Some(node_info);
        log_info!(
            self.logger,
            "Node info set for peer {peer_id}",
            peer_id = self.peer_node_id
        );
    }

    /// Stores `connection` as the current connection, refreshes the activity
    /// timestamp and publishes a `true` status update.
    ///
    /// Any previously stored connection handle is dropped, not explicitly
    /// closed. NOTE(review): the stream pool is not cleared here, so idle
    /// streams of a replaced connection may remain pooled - confirm intent.
    pub async fn set_connection(&self, connection: quinn::Connection) {
        let started = std::time::Instant::now();
        log_info!(
            self.logger,
            "🔗 [PeerState] Setting connection for peer {} - Remote: {}",
            self.peer_node_id,
            connection.remote_address()
        );

        *self.connection.lock().await = Some(connection);
        self.update_activity().await;
        self.notify_status(true);

        // Report how long this call took; the previous code measured the
        // elapsed time of a freshly created `Instant`, which is always ~0.
        log_info!(
            self.logger,
            "✅ [PeerState] Connection established with peer {} in {} ms",
            self.peer_node_id,
            started.elapsed().as_millis()
        );
    }

    /// Returns `true` when a connection is stored and it reports no close
    /// reason; returns `false` otherwise.
    pub async fn is_connected(&self) -> bool {
        let conn_guard = self.connection.lock().await;
        let has_conn = conn_guard.is_some();
        log_debug!(
            self.logger,
            "🔍 [PeerState] Connection check for peer {} - Connected: {}",
            self.peer_node_id,
            has_conn
        );

        // A stored handle may belong to a connection that is already closed.
        let close_reason = conn_guard.as_ref().and_then(|conn| conn.close_reason());
        if close_reason.is_some() {
            log_warn!(
                self.logger,
                "⚠️ [PeerState] Connection to peer {} is closed - Reason: {:?}",
                self.peer_node_id,
                close_reason
            );
            return false;
        }
        has_conn
    }

    /// Returns a send stream to the peer, preferring an idle pooled stream and
    /// otherwise opening a new bidirectional stream on the stored connection.
    ///
    /// # Errors
    ///
    /// Returns [`NetworkError::ConnectionError`] when no connection is stored
    /// or when opening the stream fails.
    pub async fn get_send_stream(&self) -> Result<quinn::SendStream, NetworkError> {
        log_debug!(
            self.logger,
            "🔄 [PeerState] Checking for idle stream for peer {}",
            self.peer_node_id
        );
        if let Some(stream) = self.stream_pool.get_idle_stream().await {
            log_debug!(
                self.logger,
                "✅ [PeerState] Found idle stream for peer {}",
                self.peer_node_id
            );
            return Ok(stream);
        }

        log_debug!(
            self.logger,
            "🆕 [PeerState] No idle stream available - creating new stream for peer {}",
            self.peer_node_id
        );

        // Clone the handle in its own statement so the lock is released
        // before awaiting on the network.
        let conn_opt = self.connection.lock().await.clone();
        let conn = match conn_opt {
            Some(conn) => conn,
            None => {
                log_error!(
                    self.logger,
                    "❌ [PeerState] No connection available for peer {} - cannot create stream",
                    self.peer_node_id
                );
                return Err(NetworkError::ConnectionError(
                    "Not connected to peer".to_string(),
                ));
            }
        };

        log_debug!(
            self.logger,
            "✅ [PeerState] Connection available for peer {} - opening new stream",
            self.peer_node_id
        );
        match conn.open_bi().await {
            // Only the send half is handed out; the receive half is dropped.
            Ok((send_stream, _recv_stream)) => {
                log_info!(
                    self.logger,
                    "✅ [PeerState] Opened new bidirectional stream to peer {}",
                    self.peer_node_id
                );
                Ok(send_stream)
            }
            Err(e) => {
                log_error!(
                    self.logger,
                    "❌ [PeerState] Failed to open stream to peer {}: {}",
                    self.peer_node_id,
                    e
                );
                log_error!(
                    self.logger,
                    "🔍 [PeerState] Connection diagnostics for peer {} - Error details: {:?}",
                    self.peer_node_id,
                    e
                );
                Err(NetworkError::ConnectionError(format!(
                    "Failed to open stream: {e}"
                )))
            }
        }
    }

    /// Hands a send stream back to the pool.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the stream pool reports.
    pub async fn return_stream(&self, stream: quinn::SendStream) -> Result<(), NetworkError> {
        self.stream_pool.return_stream(stream).await
    }

    /// Returns a clone of the stored connection handle, if any.
    pub async fn get_connection(&self) -> Option<quinn::Connection> {
        self.connection.lock().await.clone()
    }

    /// Sets the last-activity timestamp to the current instant.
    pub async fn update_activity(&self) {
        *self.last_activity.lock().await = std::time::Instant::now();
    }

    /// Closes and forgets the stored connection (if any), publishes a `false`
    /// status update, and clears the stream pool.
    ///
    /// The pool is cleared even when no connection was stored. The result of
    /// clearing is ignored (best effort), so this currently always returns
    /// `Ok(())`.
    pub async fn close_connection(&self) -> Result<(), NetworkError> {
        // Take the handle in its own statement so the lock is not held while
        // closing and logging.
        let taken = self.connection.lock().await.take();
        if let Some(conn) = taken {
            conn.close(0u32.into(), b"Connection closed by peer");
            self.notify_status(false);
            log_info!(
                self.logger,
                "Connection closed with peer {}",
                self.peer_node_id
            );
        }
        let _ = self.stream_pool.clear().await;
        Ok(())
    }

    /// Publishes a connection-status update without ever waiting.
    ///
    /// The status channel is bounded and its receiver is not drained anywhere
    /// in this file, so an awaiting `send` would stall forever once the
    /// channel is full. `try_send` keeps the update best-effort: when the
    /// channel is full or closed the update is dropped and logged.
    fn notify_status(&self, connected: bool) {
        if let Err(err) = self.status_tx.try_send(connected) {
            log_debug!(
                self.logger,
                "[PeerState] Status update ({}) for peer {} not queued: {}",
                connected,
                self.peer_node_id,
                err
            );
        }
    }
}
/// Compact `Debug` output: only the peer identifier and address are shown;
/// the connection, stream pool, logger and channel halves are left out.
impl fmt::Debug for PeerState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("PeerState");
        builder.field("peer_id", &self.peer_node_id);
        builder.field("address", &self.address);
        builder.finish()
    }
}