use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::time::{timeout, Duration, Instant};
use tracing::{info, warn, error, debug};
use crate::core::protocol::phantom_crypto::core::handshake::{perform_phantom_handshake, HandshakeRole};
use crate::core::protocol::server::security::rate_limiter::instance::RATE_LIMITER;
use crate::config::PhantomConfig;
use crate::core::protocol::server::heartbeat::types::ConnectionHeartbeatManager;
use crate::core::protocol::packets::processor::packet_service::PhantomPacketService;
pub async fn handle_phantom_connection(
mut stream: TcpStream,
peer: SocketAddr,
phantom_config: PhantomConfig,
session_manager: Arc<crate::core::protocol::server::session_manager_phantom::PhantomSessionManager>,
connection_manager: Arc<crate::core::protocol::server::connection_manager_phantom::PhantomConnectionManager>,
crypto_pool: Arc<crate::core::protocol::crypto::crypto_pool_phantom::PhantomCryptoPool>,
heartbeat_manager: Arc<ConnectionHeartbeatManager>,
packet_service: Arc<PhantomPacketService>,
) -> anyhow::Result<()> {
let connection_start = Instant::now();
info!(target: "server", "👻 {} attempting phantom connection", peer);
if !RATE_LIMITER.check_packet(&peer.ip().to_string(), &[]) {
warn!(target: "server", "👻 Rate limit exceeded for {}, rejecting connection", peer);
return Ok(());
}
let handshake_start = Instant::now();
let handshake_result = match timeout(
Duration::from_secs(30),
perform_phantom_handshake(&mut stream, HandshakeRole::Server)
).await {
Ok(Ok(result)) => {
let handshake_time = handshake_start.elapsed();
info!(target: "server",
"👻 Phantom handshake successful for {} in {:?}, session: {}",
peer, handshake_time, hex::encode(&result.session.session_id()));
result
},
Ok(Err(e)) => {
let handshake_time = handshake_start.elapsed();
warn!(target: "server",
"👻 Phantom handshake failed for {} after {:?}: {}",
peer, handshake_time, e);
return Ok(());
}
Err(_) => {
error!(target: "server", "👻 Phantom handshake timeout for {}", peer);
return Ok(());
}
};
if let Err(e) = phantom_config.validate() {
error!("👻 Invalid phantom configuration: {}", e);
return Ok(());
}
if phantom_config.should_use_hardware_auth() {
info!("👻 Hardware authentication enabled for session: {}",
hex::encode(&handshake_result.session.session_id()));
}
let phantom_session = Arc::new(handshake_result.session);
let session_id = phantom_session.session_id();
let (heartbeat_tx, mut heartbeat_rx) = mpsc::unbounded_channel();
heartbeat_manager.start_heartbeat(
session_id.to_vec(),
phantom_session.clone(),
peer,
heartbeat_tx,
);
info!(target: "server", "💓 Heartbeat started for session: {} from {}",
hex::encode(&session_id), peer);
let connection_result = crate::core::protocol::server::connection_manager_phantom::handle_phantom_client_connection(
stream,
peer,
phantom_session.clone(),
crypto_pool,
session_manager.clone(),
connection_manager.clone(),
heartbeat_manager.clone(),
packet_service.clone(), ).await;
let heartbeat_manager_clone = heartbeat_manager.clone();
let session_id_clone = session_id.to_vec();
let heartbeat_task = tokio::spawn(async move {
debug!("💓 Starting heartbeat receiver for session: {}", hex::encode(&session_id_clone));
while let Some(heartbeat_data) = heartbeat_rx.recv().await {
debug!("💓 Heartbeat data ready for session {}: {:?}",
hex::encode(&session_id_clone), heartbeat_data);
heartbeat_manager_clone.heartbeat_received(session_id_clone.clone());
}
debug!("💓 Heartbeat receiver stopped for session: {}", hex::encode(&session_id_clone));
});
let connection_handler_result = match connection_result {
Ok(()) => {
debug!("💓 Connection handler completed successfully for session: {}",
hex::encode(&session_id));
Ok(())
}
Err(e) => {
error!("💓 Connection handler error for session {}: {}",
hex::encode(&session_id), e);
Err(e)
}
};
heartbeat_manager.stop_heartbeat(session_id.to_vec());
info!(target: "server", "💓 Heartbeat stopped for session: {}", hex::encode(&session_id));
heartbeat_task.abort();
let total_connection_time = connection_start.elapsed();
info!(target: "server",
"👻 {} phantom connection closed after {:?}, session: {}",
peer, total_connection_time, hex::encode(session_id));
connection_handler_result
}