use std::sync::Arc;
use std::net::SocketAddr;
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::time::Instant;
use tracing::{info, error, warn, trace};
use crate::core::protocol::phantom_crypto::keys::PhantomSession;
use crate::core::protocol::phantom_crypto::packet::PhantomPacketProcessor;
use crate::core::protocol::packets::processor::pipeline::orchestrator::PipelineOrchestrator;
use crate::core::protocol::packets::processor::pipeline::stages::common::{PipelineContext};
use crate::core::protocol::packets::processor::pipeline::stages::decryption::PhantomDecryptionStage;
use crate::core::protocol::packets::processor::pipeline::stages::processing::PhantomProcessingStage;
use crate::core::protocol::packets::processor::pipeline::stages::encryption::PhantomEncryptionStage;
use crate::core::protocol::crypto::crypto_pool_phantom::PhantomCryptoPool;
use super::priority::Priority;
use super::packet_service::PhantomPacketService;
/// One queued packet together with everything a worker needs to process it
/// and hand the result back to the submitter.
pub struct Work {
    /// Session that is passed to the processing pipeline along with the payload.
    pub ctx: Arc<PhantomSession>,
    /// Raw bytes of the incoming packet. The worker reads the byte at index 4
    /// as the packet type to use for the response.
    pub raw_payload: Vec<u8>,
    /// Client socket address; used in log lines and handed to the processing stage.
    pub client_ip: SocketAddr,
    /// One-shot channel on which the worker delivers the response bytes.
    pub reply: oneshot::Sender<Vec<u8>>,
    /// Start point from which the worker measures overall processing latency.
    pub received_at: Instant,
    /// Priority of this item; the worker code in this file only logs it.
    pub priority: Priority,
    /// Not read anywhere in this file — presumably marks oversized payloads;
    /// confirm against the code that builds `Work`.
    pub is_large: bool,
}
/// Handle used to submit [`Work`] to the background worker tasks started by
/// `Dispatcher::spawn`.
pub struct Dispatcher {
    /// Sending half of the bounded work queue drained by the workers.
    tx: mpsc::Sender<Work>,
    /// Crypto pool handle kept from `spawn`. No method visible in this file
    /// reads it; the workers hold their own clones.
    phantom_crypto_pool: Arc<PhantomCryptoPool>,
}
impl Dispatcher {
    /// Creates the bounded work queue and starts `num_workers` background
    /// tasks that drain it.
    ///
    /// All tasks share one receiver behind an async mutex, and each gets its
    /// own clone of the crypto pool and packet service handles.
    ///
    /// This uses `tokio::spawn`, so it has to be called from within a Tokio
    /// runtime. If `num_workers` is `0`, nothing keeps the receiver alive and
    /// it is dropped when this function returns, so later `submit` calls are
    /// expected to fail rather than queue.
    pub fn spawn(
        num_workers: usize,
        phantom_crypto_pool: Arc<PhantomCryptoPool>,
        phantom_packet_service: Arc<PhantomPacketService>,
    ) -> Self {
        // Maximum number of work items that may wait in the queue.
        const QUEUE_CAPACITY: usize = 65536;

        let (tx, receiver) = mpsc::channel::<Work>(QUEUE_CAPACITY);
        let shared_receiver = Arc::new(Mutex::new(receiver));

        for _ in 0..num_workers {
            let mut worker = DispatcherWorker::new(
                Arc::clone(&shared_receiver),
                Arc::clone(&phantom_crypto_pool),
                Arc::clone(&phantom_packet_service),
            );
            // The join handle is dropped on purpose: the task runs detached
            // until the queue is closed.
            tokio::spawn(async move {
                worker.run().await;
            });
        }

        Dispatcher { tx, phantom_crypto_pool }
    }

    /// Produces an outgoing packet without going through the worker queue.
    ///
    /// Delegates to `PhantomPacketProcessor::create_outgoing` with the given
    /// session, packet type and payload. `_client_ip` is accepted but unused.
    ///
    /// # Errors
    ///
    /// Returns whatever error `create_outgoing` reports, boxed.
    pub async fn process_directly(
        &self,
        ctx: Arc<PhantomSession>,
        packet_type: u8,
        payload: Vec<u8>,
        _client_ip: SocketAddr,
    ) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        Ok(PhantomPacketProcessor::new().create_outgoing(&ctx, packet_type, &payload)?)
    }

    /// Places `work` on the queue for the background workers.
    ///
    /// # Errors
    ///
    /// Returns the channel's `SendError`, which carries the rejected `work`,
    /// when the item cannot be enqueued.
    pub async fn submit(&self, work: Work) -> Result<(), mpsc::error::SendError<Work>> {
        self.tx.send(work).await
    }
}
/// State owned by one background worker task.
struct DispatcherWorker {
    /// Receiving half of the work queue, shared between workers behind an async mutex.
    rx: Arc<Mutex<mpsc::Receiver<Work>>>,
    /// Crypto pool handed to the decryption and encryption pipeline stages.
    phantom_crypto_pool: Arc<PhantomCryptoPool>,
    /// Packet service handed to the processing pipeline stage.
    phantom_packet_service: Arc<PhantomPacketService>,
}
impl DispatcherWorker {
    /// Bundles the shared receiver and the service handles into a worker.
    fn new(
        rx: Arc<Mutex<mpsc::Receiver<Work>>>,
        phantom_crypto_pool: Arc<PhantomCryptoPool>,
        phantom_packet_service: Arc<PhantomPacketService>,
    ) -> Self {
        Self { rx, phantom_crypto_pool, phantom_packet_service }
    }

    /// Processes queued work items one at a time until the queue yields `None`.
    async fn run(&mut self) {
        while let Some(work) = self.next_work().await {
            self.process_work(work).await;
        }
    }

    /// Waits for the next work item.
    ///
    /// The receiver lock is held only while waiting; it is released when this
    /// function returns, so it is never held while an item is being processed.
    async fn next_work(&self) -> Option<Work> {
        let mut receiver = self.rx.lock().await;
        receiver.recv().await
    }

    /// Runs one work item through the decrypt → process → encrypt pipeline and
    /// delivers the result on `work.reply`.
    ///
    /// The item is skipped if the reply channel is already closed. On pipeline
    /// failure the error text is sent back as the response bytes.
    async fn process_work(&self, work: Work) {
        // Index of the payload byte that is reused as the response packet type.
        const PACKET_TYPE_INDEX: usize = 4;
        // Response packet type used when the payload is too short to carry one.
        const DEFAULT_RESPONSE_PACKET_TYPE: u8 = 0x10;
        // End-to-end latency (in milliseconds) above which a warning is logged.
        const SLOW_PROCESSING_WARN_MS: u128 = 100;

        let work_start = Instant::now();

        // Nobody is waiting for the answer any more, so do not spend time on it.
        if work.reply.is_closed() {
            info!("Client disconnected, skipping processing");
            return;
        }

        let response_packet_type = match work.raw_payload.get(PACKET_TYPE_INDEX) {
            Some(&packet_type) => packet_type,
            None => {
                warn!("Packet too short, using default packet type for response");
                DEFAULT_RESPONSE_PACKET_TYPE
            }
        };

        info!(
            "Processing phantom work for {} (size: {} bytes, priority: {:?})",
            work.client_ip,
            work.raw_payload.len(),
            work.priority
        );

        // The pipeline is built per item because the processing and encryption
        // stages are parameterised with this item's client address and packet type.
        let pipeline_start = Instant::now();
        let pipeline = PipelineOrchestrator::new()
            .add_stage(PhantomDecryptionStage::new(self.phantom_crypto_pool.clone()))
            .add_stage(PhantomProcessingStage::new(
                self.phantom_packet_service.clone(),
                work.client_ip,
            ))
            .add_stage(PhantomEncryptionStage::new(
                response_packet_type,
                self.phantom_crypto_pool.clone(),
            ));
        let pipeline_init_time = pipeline_start.elapsed();

        let context = PipelineContext::new(work.ctx, work.raw_payload);
        let execute_start = Instant::now();

        match pipeline.execute(context).await {
            Ok(encrypted_response) => {
                let execute_time = execute_start.elapsed();
                let total_work_time = work_start.elapsed();
                info!(
                    "Phantom pipeline execution - init: {:?}, execute: {:?}, total: {:?}, response size: {} bytes",
                    pipeline_init_time,
                    execute_time,
                    total_work_time,
                    encrypted_response.len()
                );

                // Measured from `received_at`, so this includes time spent in the queue.
                let processing_time = Instant::now().duration_since(work.received_at);
                if processing_time.as_millis() > SLOW_PROCESSING_WARN_MS {
                    warn!(
                        "Slow phantom packet processing: {}ms for {}",
                        processing_time.as_millis(),
                        work.client_ip
                    );
                }

                let send_start = Instant::now();
                // A failed one-shot send hands back the unsent payload, not an
                // error value; log only its size instead of dumping every byte.
                if let Err(unsent) = work.reply.send(encrypted_response) {
                    info!(
                        "Failed to send phantom response: receiver dropped ({} bytes discarded)",
                        unsent.len()
                    );
                }
                let send_time = send_start.elapsed();
                trace!("Phantom response send time: {:?}", send_time);
            }
            Err(e) => {
                error!("Phantom pipeline processing failed: {}", e);
                let total_time = work_start.elapsed();
                warn!("Phantom pipeline failed after {:?}: {}", total_time, e);

                // NOTE(review): the internal error text is returned to the caller
                // as the response body — confirm this is intended to reach clients.
                let error_response = format!("Phantom processing error: {}", e).into_bytes();
                // Best effort: the receiver may already be gone.
                let _ = work.reply.send(error_response);
            }
        }
    }
}