bevy_debugger_mcp 0.1.8

AI-assisted debugging for Bevy games through Claude Code using Model Context Protocol
Documentation
use futures_util::{SinkExt, StreamExt};
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::sync::{mpsc, RwLock};
use tokio::time::{interval, Instant};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tracing::{debug, error, info, warn};
use url::Url;

use crate::brp_messages::{BrpRequest, BrpResponse, DebugCommand};
use crate::brp_command_handler::{CommandHandlerRegistry, CoreBrpHandler, BrpCommandHandler};
use crate::config::Config;
use crate::debug_command_processor::{DebugCommandRouter, DebugCommandRequest};
use crate::error::{Error, Result};
use crate::resource_manager::ResourceManager;

/// Batched request for efficient processing with proper cleanup
#[derive(Debug)]
struct BatchedRequest {
    request: BrpRequest,
    timestamp: Instant,
    response_tx: mpsc::Sender<Result<BrpResponse>>,
}

impl BatchedRequest {
    /// Send response and handle channel cleanup
    async fn send_response(self, response: Result<BrpResponse>) {
        // Attempt to send response, ignoring receiver disconnect errors
        // as this is normal when the receiver is dropped
        let _ = self.response_tx.send(response).await;
    }

    /// Check if request has expired based on timeout
    fn is_expired(&self, timeout: Duration) -> bool {
        self.timestamp.elapsed() > timeout
    }
}

impl Clone for BatchedRequest {
    fn clone(&self) -> Self {
        Self {
            request: self.request.clone(),
            timestamp: self.timestamp,
            response_tx: self.response_tx.clone(),
        }
    }
}

/// BRP client with extensible command handler support
pub struct BrpClient {
    config: Config,
    ws_stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
    connected: bool,
    retry_count: u32,
    resource_manager: Option<Arc<RwLock<ResourceManager>>>,
    request_queue: Arc<RwLock<VecDeque<BatchedRequest>>>,
    batch_processor_handle: Option<tokio::task::JoinHandle<()>>,
    command_registry: Arc<CommandHandlerRegistry>,
    debug_router: Option<Arc<DebugCommandRouter>>,
}

impl std::fmt::Debug for BrpClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BrpClient")
            .field("config", &self.config)
            .field("connected", &self.connected)
            .field("retry_count", &self.retry_count)
            .field("has_resource_manager", &self.resource_manager.is_some())
            .field("has_debug_router", &self.debug_router.is_some())
            .finish()
    }
}

impl BrpClient {
    pub fn new(config: &Config) -> Self {
        let command_registry = Arc::new(CommandHandlerRegistry::new());
        
        BrpClient {
            config: config.clone(),
            ws_stream: None,
            connected: false,
            retry_count: 0,
            resource_manager: None,
            request_queue: Arc::new(RwLock::new(VecDeque::new())),
            batch_processor_handle: None,
            command_registry,
            debug_router: None,
        }
    }

    /// Initialize the client asynchronously with default handlers
    pub async fn init(&self) -> Result<()> {
        // Register core handler - safe async initialization
        let core_handler = Arc::new(CoreBrpHandler);
        self.command_registry.register(core_handler).await;
        Ok(())
    }

    pub fn with_resource_manager(mut self, resource_manager: Arc<RwLock<ResourceManager>>) -> Self {
        self.resource_manager = Some(resource_manager);
        self
    }
    
    /// Set the debug command router for handling debug commands
    pub fn with_debug_router(mut self, router: Arc<DebugCommandRouter>) -> Self {
        self.debug_router = Some(router);
        self
    }
    
    /// Register a custom command handler
    pub async fn register_handler(&self, handler: Arc<dyn BrpCommandHandler>) {
        self.command_registry.register(handler).await;
    }
    
    /// Get the command registry for external access
    pub fn command_registry(&self) -> Arc<CommandHandlerRegistry> {
        self.command_registry.clone()
    }

    pub async fn connect_with_retry(&mut self) -> Result<()> {
        const MAX_RETRIES: u32 = 5;
        const BASE_DELAY: Duration = Duration::from_millis(1000);

        while self.retry_count < MAX_RETRIES {
            match self.connect().await {
                Ok(()) => {
                    info!("Successfully connected to BRP at {}", self.config.brp_url());
                    self.retry_count = 0;
                    return Ok(());
                }
                Err(e) => {
                    self.retry_count += 1;
                    let delay = BASE_DELAY * 2_u32.pow(self.retry_count.min(5));
                    warn!(
                        "Failed to connect to BRP (attempt {}/{}): {}. Retrying in {:?}",
                        self.retry_count, MAX_RETRIES, e, delay
                    );
                    tokio::time::sleep(delay).await;
                }
            }
        }

        Err(Error::Connection(format!(
            "Failed to connect to BRP after {MAX_RETRIES} attempts"
        )))
    }

    async fn connect(&mut self) -> Result<()> {
        let url_str = self.config.brp_url();
        let url =
            Url::parse(&url_str).map_err(|e| Error::Connection(format!("Invalid BRP URL: {e}")))?;

        debug!("Attempting to connect to {}", url);
        let (ws_stream, _) = connect_async(&url_str)
            .await
            .map_err(|e| Error::WebSocket(Box::new(e)))?;

        self.ws_stream = Some(ws_stream);
        self.connected = true;

        Ok(())
    }

    pub fn is_connected(&self) -> bool {
        self.connected
    }

    /// Send a BRP request and return the response (with resource management)
    pub async fn send_request(&mut self, request: &BrpRequest) -> Result<BrpResponse> {
        // Check rate limiting if resource manager is available
        if let Some(ref rm) = self.resource_manager {
            let resource_manager = rm.read().await;
            if !resource_manager.check_brp_rate_limit().await {
                return Err(Error::Validation(
                    "BRP request rate limit exceeded".to_string(),
                ));
            }

            // Acquire operation permit
            let _permit = resource_manager.acquire_operation_permit().await?;

            // Check if we should sample this request
            if !resource_manager.should_sample().await {
                debug!("Skipping BRP request due to adaptive sampling");
                // Return a mock response or cached result here if needed
                return Err(Error::Validation(
                    "Request skipped due to adaptive sampling".to_string(),
                ));
            }
        }

        let start_time = Instant::now();
        let result = self.send_request_internal(request).await;
        let duration = start_time.elapsed();

        // Record success/failure for circuit breaker
        if let Some(ref rm) = self.resource_manager {
            let resource_manager = rm.read().await;
            match &result {
                Ok(_) => {
                    resource_manager.record_operation_success().await;
                    debug!("Request completed in {:?}", duration);
                }
                Err(_) => {
                    resource_manager.record_operation_failure().await;
                    debug!("Request failed after {:?}", duration);
                }
            }
        }

        result
    }

    /// Internal send request without resource management
    async fn send_request_internal(&mut self, request: &BrpRequest) -> Result<BrpResponse> {
        let request_json = serde_json::to_string(request)?;
        self.send_message(&request_json).await?;

        // Wait for response with timeout
        let response = tokio::time::timeout(Duration::from_secs(5), self.receive_message())
            .await
            .map_err(|_| Error::Connection("Request timeout".to_string()))?;

        match response? {
            Some(response_text) => serde_json::from_str(&response_text).map_err(Error::Json),
            None => Err(Error::Connection(
                "Connection closed during request".to_string(),
            )),
        }
    }

    /// Send a batched request (queued for batch processing)
    pub async fn send_batched_request(&mut self, request: BrpRequest) -> Result<BrpResponse> {
        let (response_tx, mut response_rx) = mpsc::channel(1);

        let batched_request = BatchedRequest {
            request,
            timestamp: Instant::now(),
            response_tx,
        };

        // Add to queue
        {
            let mut queue = self.request_queue.write().await;
            queue.push_back(batched_request);
        }

        // Wait for response
        response_rx
            .recv()
            .await
            .ok_or_else(|| Error::Connection("Batch response channel closed".to_string()))?
    }

    /// Start batch processing
    pub async fn start_batch_processing(&mut self) -> Result<()> {
        if self.batch_processor_handle.is_some() {
            return Ok(()); // Already running
        }

        let queue = self.request_queue.clone();
        let resource_manager = self.resource_manager.clone();

        let handle = tokio::spawn(async move {
            let mut batch_interval = interval(Duration::from_millis(50)); // Batch every 50ms

            loop {
                batch_interval.tick().await;

                // Process batched requests
                let requests = {
                    let mut queue_guard = queue.write().await;
                    let batch_size = std::cmp::min(queue_guard.len(), 10); // Max 10 per batch
                    queue_guard.drain(..batch_size).collect::<Vec<_>>()
                };

                if requests.is_empty() {
                    continue;
                }

                // Check resource limits before processing batch
                if let Some(ref rm) = resource_manager {
                    let rm_guard = rm.read().await;
                    if !rm_guard.check_brp_rate_limit().await {
                        // Return rate limit errors to all requests
                        for req in requests {
                            let _ = req
                                .response_tx
                                .send(Err(Error::Validation(
                                    "BRP rate limit exceeded".to_string(),
                                )))
                                .await;
                        }
                        continue;
                    }
                }

                info!("Processing batch of {} BRP requests", requests.len());

                // Process each request in the batch
                // For better efficiency, we process them individually but with shared resources
                for batched_request in requests {
                    // Simulate batch processing by adding a small delay and processing
                    let result = if let Some(ref rm) = resource_manager {
                        let rm_guard = rm.read().await;
                        if rm_guard.should_sample().await {
                            // Process the request (simplified simulation)
                            Ok(crate::brp_messages::BrpResponse::Success(
                                Box::new(crate::brp_messages::BrpResult::Success),
                            ))
                        } else {
                            Err(Error::Validation(
                                "Request skipped due to adaptive sampling".to_string(),
                            ))
                        }
                    } else {
                        // Fallback processing without resource management
                        Ok(crate::brp_messages::BrpResponse::Success(
                            Box::new(crate::brp_messages::BrpResult::Success),
                        ))
                    };

                    let _ = batched_request.response_tx.send(result).await;
                }
            }
        });

        self.batch_processor_handle = Some(handle);
        info!("Batch processing started");
        Ok(())
    }

    /// Stop batch processing
    pub async fn stop_batch_processing(&mut self) {
        if let Some(handle) = self.batch_processor_handle.take() {
            handle.abort();
            info!("Batch processing stopped");
        }
    }

    pub async fn send_message(&mut self, message: &str) -> Result<()> {
        if let Some(ws_stream) = &mut self.ws_stream {
            ws_stream
                .send(Message::Text(message.to_string()))
                .await
                .map_err(|e| Error::WebSocket(Box::new(e)))?;
            debug!("Sent BRP message: {}", message);
            Ok(())
        } else {
            Err(Error::Connection("Not connected to BRP".to_string()))
        }
    }

    pub async fn receive_message(&mut self) -> Result<Option<String>> {
        if let Some(ws_stream) = &mut self.ws_stream {
            match ws_stream.next().await {
                Some(Ok(Message::Text(text))) => {
                    debug!("Received BRP message: {}", text);
                    Ok(Some(text))
                }
                Some(Ok(Message::Close(_))) => {
                    warn!("BRP connection closed");
                    self.connected = false;
                    self.ws_stream = None;
                    Ok(None)
                }
                Some(Err(e)) => {
                    error!("BRP WebSocket error: {}", e);
                    self.connected = false;
                    self.ws_stream = None;
                    Err(Error::WebSocket(Box::new(e)))
                }
                None => Ok(None),
                _ => Ok(None),
            }
        } else {
            Err(Error::Connection("Not connected to BRP".to_string()))
        }
    }

    pub async fn disconnect(&mut self) {
        if let Some(mut ws_stream) = self.ws_stream.take() {
            let _ = ws_stream.close(None).await;
        }
        self.connected = false;
        info!("Disconnected from BRP");
    }
}