bytes-radar 1.0.0

A tool for analyzing code statistics from remote repositories with hyper-fast performance
Documentation
use super::ProgressHook;
use crate::core::{
    analysis::{FileMetrics, ProjectAnalysis},
    error::{AnalysisError, Result},
    filter::{FilterStats, IntelligentFilter},
    registry::LanguageRegistry,
};
use flate2::read::GzDecoder;
use futures_util::StreamExt;
use std::io::{Cursor, Read};
use tar::Archive;
use tokio::sync::mpsc;

#[cfg(not(target_arch = "wasm32"))]
use tokio::task;

pub type ProgressCallback = Box<dyn Fn(u64, Option<u64>) + Send + Sync>;

pub struct StreamReader {
    receiver: mpsc::Receiver<std::io::Result<bytes::Bytes>>,
    current_chunk: Option<Cursor<bytes::Bytes>>,
    finished: bool,
}

impl StreamReader {
    #[cfg(not(target_arch = "wasm32"))]
    pub fn new(
        stream: impl futures_util::Stream<Item = reqwest::Result<bytes::Bytes>> + Send + 'static,
        progress_callback: ProgressCallback,
        total_size: Option<u64>,
    ) -> Self {
        let (tx, rx) = mpsc::channel(32);

        tokio::spawn(async move {
            let mut downloaded = 0u64;
            let mut stream = Box::pin(stream);

            while let Some(chunk_result) = stream.next().await {
                match chunk_result {
                    Ok(chunk) => {
                        downloaded += chunk.len() as u64;
                        progress_callback(downloaded, total_size);

                        if tx.send(Ok(chunk)).await.is_err() {
                            break;
                        }
                    }
                    Err(e) => {
                        let _ = tx
                            .send(Err(std::io::Error::new(
                                std::io::ErrorKind::Other,
                                format!("Stream error: {}", e),
                            )))
                            .await;
                        break;
                    }
                }
            }
        });

        Self {
            receiver: rx,
            current_chunk: None,
            finished: false,
        }
    }

    #[cfg(target_arch = "wasm32")]
    pub fn new(
        stream: impl futures_util::Stream<Item = reqwest::Result<bytes::Bytes>> + 'static,
        progress_callback: ProgressCallback,
        total_size: Option<u64>,
    ) -> Self {
        let (tx, rx) = mpsc::channel(32);

        wasm_bindgen_futures::spawn_local(async move {
            let mut downloaded = 0u64;
            let mut stream = Box::pin(stream);

            while let Some(chunk_result) = stream.next().await {
                match chunk_result {
                    Ok(chunk) => {
                        downloaded += chunk.len() as u64;
                        progress_callback(downloaded, total_size);

                        if tx.send(Ok(chunk)).await.is_err() {
                            break;
                        }
                    }
                    Err(e) => {
                        let _ = tx
                            .send(Err(std::io::Error::new(
                                std::io::ErrorKind::Other,
                                format!("Stream error: {}", e),
                            )))
                            .await;
                        break;
                    }
                }
            }
        });

        Self {
            receiver: rx,
            current_chunk: None,
            finished: false,
        }
    }
}

impl Read for StreamReader {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        if let Some(ref mut cursor) = self.current_chunk {
            let read = cursor.read(buf)?;
            if read > 0 {
                return Ok(read);
            }
            self.current_chunk = None;
        }

        if self.finished {
            return Ok(0);
        }

        match self.receiver.try_recv() {
            Ok(Ok(chunk)) => {
                self.current_chunk = Some(Cursor::new(chunk));
                if let Some(ref mut cursor) = self.current_chunk {
                    cursor.read(buf)
                } else {
                    Ok(0)
                }
            }
            Ok(Err(e)) => {
                self.finished = true;
                Err(e)
            }
            Err(mpsc::error::TryRecvError::Empty) => {
                #[cfg(not(target_arch = "wasm32"))]
                {
                    match self.receiver.blocking_recv() {
                        Some(Ok(chunk)) => {
                            self.current_chunk = Some(Cursor::new(chunk));
                            if let Some(ref mut cursor) = self.current_chunk {
                                cursor.read(buf)
                            } else {
                                Ok(0)
                            }
                        }
                        Some(Err(e)) => {
                            self.finished = true;
                            Err(e)
                        }
                        None => {
                            self.finished = true;
                            Ok(0)
                        }
                    }
                }
                #[cfg(target_arch = "wasm32")]
                {
                    Err(std::io::Error::new(
                        std::io::ErrorKind::WouldBlock,
                        "Would block in WASM",
                    ))
                }
            }
            Err(mpsc::error::TryRecvError::Disconnected) => {
                self.finished = true;
                Ok(0)
            }
        }
    }
}

pub async fn process_tarball_stream(
    stream_reader: StreamReader,
    project_analysis: &mut ProjectAnalysis,
    filter: &IntelligentFilter,
    _progress_hook: &dyn ProgressHook,
) -> Result<()> {
    #[cfg(not(target_arch = "wasm32"))]
    {
        let filter = filter.clone();
        let metrics_result = task::spawn_blocking(move || {
            let decoder = GzDecoder::new(stream_reader);
            let mut archive = Archive::new(decoder);

            let entries = archive.entries().map_err(|e| {
                AnalysisError::archive(format!("Failed to read tar entries: {}", e))
            })?;

            let mut collected_metrics = Vec::new();
            let mut stats = FilterStats::new();

            for entry in entries {
                let entry = entry.map_err(|e| {
                    AnalysisError::archive(format!("Failed to read tar entry: {}", e))
                })?;

                if let Ok(metrics) = process_tar_entry_sync(entry, &filter, &mut stats) {
                    collected_metrics.push(metrics);
                }
            }

            #[cfg(feature = "cli")]
            log::info!(
                "Filter stats: processed {}/{} files ({:.1}% filtered), saved {}",
                stats.processed,
                stats.total_entries,
                stats.filter_ratio() * 100.0,
                stats.format_bytes_saved()
            );

            Ok::<Vec<FileMetrics>, AnalysisError>(collected_metrics)
        })
        .await
        .map_err(|e| AnalysisError::archive(format!("Task join error: {}", e)))??;

        for metrics in metrics_result {
            project_analysis.add_file_metrics(metrics)?;
        }
    }

    #[cfg(target_arch = "wasm32")]
    {
        let decoder = GzDecoder::new(stream_reader);
        let mut archive = Archive::new(decoder);

        let entries = archive
            .entries()
            .map_err(|e| AnalysisError::archive(format!("Failed to read tar entries: {}", e)))?;

        let mut stats = FilterStats::new();

        for entry in entries {
            let entry = entry
                .map_err(|e| AnalysisError::archive(format!("Failed to read tar entry: {}", e)))?;

            if let Ok(metrics) = process_tar_entry_sync(entry, filter, &mut stats) {
                project_analysis.add_file_metrics(metrics)?;
            }
        }

        web_sys::console::log_1(
            &format!(
                "Filter stats: processed {}/{} files ({:.1}% filtered), saved {}",
                stats.processed,
                stats.total_entries,
                stats.filter_ratio() * 100.0,
                stats.format_bytes_saved()
            )
            .into(),
        );
    }

    Ok(())
}

fn process_tar_entry_sync<R: Read>(
    mut entry: tar::Entry<'_, R>,
    filter: &IntelligentFilter,
    stats: &mut FilterStats,
) -> Result<FileMetrics> {
    let header = entry.header();
    let path = header
        .path()
        .map_err(|e| AnalysisError::archive(format!("Invalid path in tar entry: {}", e)))?;

    let file_path = path.to_string_lossy().to_string();

    if !header.entry_type().is_file() || header.size().unwrap_or(0) == 0 {
        return Err(AnalysisError::archive("Not a file or empty".to_string()));
    }

    let file_size = header.size().unwrap_or(0);

    let should_process = filter.should_process_file(&file_path, file_size);
    stats.record_entry(file_size, !should_process);

    if !should_process {
        return Err(AnalysisError::archive("File filtered out".to_string()));
    }

    let language = LanguageRegistry::detect_by_path(&file_path)
        .map(|l| l.name.clone())
        .unwrap_or_else(|| "Text".to_string());

    let mut content = String::new();
    if entry.read_to_string(&mut content).is_err() {
        return Err(AnalysisError::archive(
            "Failed to read file content".to_string(),
        ));
    }

    analyze_file_content(&file_path, &content, &language, file_size)
}

fn analyze_file_content(
    file_path: &str,
    content: &str,
    language: &str,
    file_size: u64,
) -> Result<FileMetrics> {
    let lines: Vec<&str> = content.lines().collect();
    let total_lines = lines.len();

    let mut code_lines = 0;
    let mut comment_lines = 0;
    let mut blank_lines = 0;

    let lang_def = LanguageRegistry::get_language(language);
    let empty_line_comments = vec![];
    let empty_multi_line_comments = vec![];
    let line_comments = lang_def
        .map(|l| &l.line_comments)
        .unwrap_or(&empty_line_comments);
    let multi_line_comments = lang_def
        .map(|l| &l.multi_line_comments)
        .unwrap_or(&empty_multi_line_comments);

    let mut in_multi_line_comment = false;

    for line in lines {
        let trimmed = line.trim();

        if trimmed.is_empty() {
            blank_lines += 1;
            continue;
        }

        let mut is_comment = false;

        if !in_multi_line_comment {
            for comment_start in line_comments {
                if trimmed.starts_with(comment_start) {
                    is_comment = true;
                    break;
                }
            }

            for (start, end) in multi_line_comments {
                if trimmed.starts_with(start) {
                    is_comment = true;
                    if !trimmed.ends_with(end) {
                        in_multi_line_comment = true;
                    }
                    break;
                }
            }
        } else {
            is_comment = true;
            for (_, end) in multi_line_comments {
                if trimmed.ends_with(end) {
                    in_multi_line_comment = false;
                    break;
                }
            }
        }

        if is_comment {
            comment_lines += 1;
        } else {
            code_lines += 1;
        }
    }

    let metrics = FileMetrics::new(
        file_path,
        language.to_string(),
        total_lines,
        code_lines,
        comment_lines,
        blank_lines,
    )?
    .with_size_bytes(file_size);

    Ok(metrics)
}