somnytoo 2.0.0

Binary protocol server for secure communications
Documentation
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tracing::{info, error};

use crate::core::protocol::phantom_crypto::core::keys::PhantomSession;
use crate::core::protocol::server::session_manager_phantom::PhantomSessionManager;
use crate::core::protocol::batch_system::integration::IntegratedBatchSystem;

/// Entry point for a phantom client connection.
///
/// Logs the session id (hex-encoded) and the peer address under the
/// `"server"` target, then hands every argument over unchanged to
/// [`handle_connection_with_batch`] and returns its result.
pub async fn handle_phantom_client_connection(
    stream: TcpStream,
    peer: std::net::SocketAddr,
    session: Arc<PhantomSession>,
    phantom_session_manager: Arc<PhantomSessionManager>,
    connection_manager: Arc<PhantomConnectionManager>,
    batch_system: Arc<IntegratedBatchSystem>,
) -> anyhow::Result<()> {
    let id = session.session_id();
    info!(
        target: "server",
        "πŸ’“ Starting batch-integrated phantom connection for session: {} from {}",
        hex::encode(id),
        peer
    );

    // All real work happens in the batch-integrated handler.
    handle_connection_with_batch(
        stream,
        peer,
        session,
        phantom_session_manager,
        connection_manager,
        batch_system,
    )
    .await
}

pub async fn handle_connection_with_batch(
    stream: TcpStream,
    peer: std::net::SocketAddr,
    session: Arc<PhantomSession>,
    phantom_session_manager: Arc<PhantomSessionManager>,
    connection_manager: Arc<PhantomConnectionManager>,
    batch_system: Arc<IntegratedBatchSystem>,
) -> anyhow::Result<()> {
    let session_id = session.session_id().to_vec();

    // РаздСляСм ΠΏΠΎΡ‚ΠΎΠΊ Π½Π° Ρ‡Ρ‚Π΅Π½ΠΈΠ΅ ΠΈ запись
    let (read_half, write_half) = stream.into_split();

    // РСгистрируСм соСдинСниС Π² connection manager
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::channel::<()>(1);
    connection_manager.register_connection(session_id.clone(), shutdown_tx).await;

    // Π˜ΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅ΠΌ Π½ΠΎΠ²Ρ‹ΠΉ BatchSystem API
    if let Err(e) = batch_system.register_connection(
        peer,
        session_id.clone(),
        Box::new(read_half),
        Box::new(write_half),
    ).await {
        error!("Failed to register connection with batch system: {}", e);
        cleanup_connection(&session_id, &phantom_session_manager, &connection_manager).await;
        return Ok(());
    }

    // РСгистрируСм сСссию Π² session manager
    phantom_session_manager.register_session(session_id.clone(), session.clone(), peer).await;

    info!("βœ… Connection {} fully registered with batch system", peer);

    // ВмСсто пассивного оТидания, ΠΆΠ΄Π΅ΠΌ сигнала ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΠΈ ΠΎΡ‚ читатСля
    tokio::select! {
        _ = shutdown_rx.recv() => {
            info!("πŸ‘» Connection {} closed by manager", peer);
        }
        // ДобавляСм Ρ‚Π°ΠΉΠΌΠ°ΡƒΡ‚ для соСдинСния
        _ = tokio::time::sleep(Duration::from_secs(300)) => {
            info!("⏰ Connection {} timeout after 5 minutes", peer);
        }
    }

    // ΠžΡ‡ΠΈΡΡ‚ΠΊΠ° ΠΏΡ€ΠΈ Π·Π°ΠΊΡ€Ρ‹Ρ‚ΠΈΠΈ соСдинСния
    cleanup_connection(&session_id, &phantom_session_manager, &connection_manager).await;

    Ok(())
}

/// Tears down the per-connection registrations for `session_id`.
///
/// Calls `force_remove_session` on the session manager first, then
/// `unregister_connection` on the connection manager. Neither call returns a
/// value that is inspected here, so this function cannot report failure.
async fn cleanup_connection(
    session_id: &[u8],
    session_manager: &Arc<PhantomSessionManager>,
    connection_manager: &Arc<PhantomConnectionManager>,
) {
    // Session first, then the shutdown-channel entry.
    session_manager.force_remove_session(session_id).await;
    connection_manager.unregister_connection(session_id).await;
}

/// Registry of shutdown channels for active phantom connections.
///
/// The state lives behind an `Arc`, so clones produced by the derived
/// `Clone` impl share the same underlying map.
#[derive(Clone)]
pub struct PhantomConnectionManager {
    // Maps session-id bytes to the sender half of that connection's shutdown
    // channel, guarded by an async read-write lock.
    active_connections: Arc<tokio::sync::RwLock<std::collections::HashMap<Vec<u8>, tokio::sync::mpsc::Sender<()>>>>,
}

impl PhantomConnectionManager {
    /// Creates a manager with no registered connections.
    pub fn new() -> Self {
        Self {
            active_connections: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
        }
    }

    /// Stores `shutdown_tx` under `session_id`.
    ///
    /// If an entry already exists for `session_id`, it is replaced and the
    /// previous sender is dropped.
    ///
    /// NOTE(review): dropping the previous sender may wake the task that owns
    /// the matching receiver; confirm that duplicate session ids cannot be
    /// registered concurrently, or that the older task's cleanup is harmless.
    pub async fn register_connection(&self, session_id: Vec<u8>, shutdown_tx: tokio::sync::mpsc::Sender<()>) {
        // `session_id` is owned, so it is moved into the map without cloning.
        self.active_connections
            .write()
            .await
            .insert(session_id, shutdown_tx);
    }

    /// Removes the entry for `session_id`, if any, and logs the
    /// unregistration. The log line is emitted whether or not an entry
    /// existed.
    pub async fn unregister_connection(&self, session_id: &[u8]) {
        // The write guard is a temporary of this statement, so the lock is
        // released before the log line is formatted.
        self.active_connections.write().await.remove(session_id);
        info!("πŸ‘» Phantom connection unregistered for session: {}", hex::encode(session_id));
    }

    /// Removes the entry for `session_id` and, if one existed, sends a
    /// shutdown signal on its channel. Does nothing when no entry exists.
    ///
    /// A failed send (for example because the receiver is already gone) is
    /// deliberately ignored: the entry has been removed either way.
    pub async fn force_disconnect(&self, session_id: &[u8]) {
        // Take the sender out in its own statement so the write guard is
        // dropped here. Placing this expression directly in the `if let`
        // scrutinee would keep the lock held across the `send(..).await`.
        let removed = self.active_connections.write().await.remove(session_id);

        if let Some(shutdown_tx) = removed {
            let _ = shutdown_tx.send(()).await;
            info!("πŸ‘» Forced disconnect for phantom session: {}", hex::encode(session_id));
        }
    }
}