// Source file: bytes_radar/net/stream.rs

1use super::ProgressHook;
2use crate::core::{
3    analysis::{FileMetrics, ProjectAnalysis},
4    error::{AnalysisError, Result},
5    filter::{FilterStats, IntelligentFilter},
6    registry::LanguageRegistry,
7};
8use flate2::read::GzDecoder;
9use futures_util::StreamExt;
10use std::io::{Cursor, Read};
11use tar::Archive;
12use tokio::sync::mpsc;
13
14#[cfg(not(target_arch = "wasm32"))]
15use tokio::task;
16
/// Boxed progress callback used by [`StreamReader::new`].
///
/// It is invoked after every successfully received chunk with the cumulative
/// number of bytes downloaded so far and the total size that was passed to the
/// constructor (`None` when the caller did not supply one).
pub type ProgressCallback = Box<dyn Fn(u64, Option<u64>) + Send + Sync>;
18
/// Synchronous [`Read`] adapter over an asynchronous stream of byte chunks.
///
/// A background task forwards chunks (or a single I/O error) through a bounded
/// channel; the `Read` implementation drains that channel one chunk at a time.
pub struct StreamReader {
    /// Receiving end of the channel fed by the background download task.
    receiver: mpsc::Receiver<std::io::Result<bytes::Bytes>>,
    /// Chunk currently being consumed, wrapped in a cursor to track the read position.
    current_chunk: Option<Cursor<bytes::Bytes>>,
    /// Set once an error was received or the channel was closed; later reads return `Ok(0)`.
    finished: bool,
}
24
25impl StreamReader {
26    #[cfg(not(target_arch = "wasm32"))]
27    pub fn new(
28        stream: impl futures_util::Stream<Item = reqwest::Result<bytes::Bytes>> + Send + 'static,
29        progress_callback: ProgressCallback,
30        total_size: Option<u64>,
31    ) -> Self {
32        let (tx, rx) = mpsc::channel(32);
33
34        tokio::spawn(async move {
35            let mut downloaded = 0u64;
36            let mut stream = Box::pin(stream);
37
38            while let Some(chunk_result) = stream.next().await {
39                match chunk_result {
40                    Ok(chunk) => {
41                        downloaded += chunk.len() as u64;
42                        progress_callback(downloaded, total_size);
43
44                        if tx.send(Ok(chunk)).await.is_err() {
45                            break;
46                        }
47                    }
48                    Err(e) => {
49                        let _ = tx
50                            .send(Err(std::io::Error::new(
51                                std::io::ErrorKind::Other,
52                                format!("Stream error: {}", e),
53                            )))
54                            .await;
55                        break;
56                    }
57                }
58            }
59        });
60
61        Self {
62            receiver: rx,
63            current_chunk: None,
64            finished: false,
65        }
66    }
67
68    #[cfg(target_arch = "wasm32")]
69    pub fn new(
70        stream: impl futures_util::Stream<Item = reqwest::Result<bytes::Bytes>> + 'static,
71        progress_callback: ProgressCallback,
72        total_size: Option<u64>,
73    ) -> Self {
74        let (tx, rx) = mpsc::channel(32);
75
76        wasm_bindgen_futures::spawn_local(async move {
77            let mut downloaded = 0u64;
78            let mut stream = Box::pin(stream);
79
80            while let Some(chunk_result) = stream.next().await {
81                match chunk_result {
82                    Ok(chunk) => {
83                        downloaded += chunk.len() as u64;
84                        progress_callback(downloaded, total_size);
85
86                        if tx.send(Ok(chunk)).await.is_err() {
87                            break;
88                        }
89                    }
90                    Err(e) => {
91                        let _ = tx
92                            .send(Err(std::io::Error::new(
93                                std::io::ErrorKind::Other,
94                                format!("Stream error: {}", e),
95                            )))
96                            .await;
97                        break;
98                    }
99                }
100            }
101        });
102
103        Self {
104            receiver: rx,
105            current_chunk: None,
106            finished: false,
107        }
108    }
109}
110
111impl Read for StreamReader {
112    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
113        if let Some(ref mut cursor) = self.current_chunk {
114            let read = cursor.read(buf)?;
115            if read > 0 {
116                return Ok(read);
117            }
118            self.current_chunk = None;
119        }
120
121        if self.finished {
122            return Ok(0);
123        }
124
125        match self.receiver.try_recv() {
126            Ok(Ok(chunk)) => {
127                self.current_chunk = Some(Cursor::new(chunk));
128                if let Some(ref mut cursor) = self.current_chunk {
129                    cursor.read(buf)
130                } else {
131                    Ok(0)
132                }
133            }
134            Ok(Err(e)) => {
135                self.finished = true;
136                Err(e)
137            }
138            Err(mpsc::error::TryRecvError::Empty) => {
139                #[cfg(not(target_arch = "wasm32"))]
140                {
141                    match self.receiver.blocking_recv() {
142                        Some(Ok(chunk)) => {
143                            self.current_chunk = Some(Cursor::new(chunk));
144                            if let Some(ref mut cursor) = self.current_chunk {
145                                cursor.read(buf)
146                            } else {
147                                Ok(0)
148                            }
149                        }
150                        Some(Err(e)) => {
151                            self.finished = true;
152                            Err(e)
153                        }
154                        None => {
155                            self.finished = true;
156                            Ok(0)
157                        }
158                    }
159                }
160                #[cfg(target_arch = "wasm32")]
161                {
162                    Err(std::io::Error::new(
163                        std::io::ErrorKind::WouldBlock,
164                        "Would block in WASM",
165                    ))
166                }
167            }
168            Err(mpsc::error::TryRecvError::Disconnected) => {
169                self.finished = true;
170                Ok(0)
171            }
172        }
173    }
174}
175
176pub async fn process_tarball(
177    bytes: bytes::Bytes,
178    project_analysis: &mut ProjectAnalysis,
179    filter: &IntelligentFilter,
180    _progress_hook: &dyn ProgressHook,
181) -> Result<()> {
182    let decoder = GzDecoder::new(Cursor::new(bytes));
183    let mut archive = Archive::new(decoder);
184
185    let entries = archive
186        .entries()
187        .map_err(|e| AnalysisError::archive(format!("Failed to read tar entries: {}", e)))?;
188
189    let mut stats = FilterStats::new();
190
191    for entry in entries {
192        let entry = entry
193            .map_err(|e| AnalysisError::archive(format!("Failed to read tar entry: {}", e)))?;
194
195        if let Ok(metrics) = process_tar_entry_sync(entry, filter, &mut stats) {
196            project_analysis.add_file_metrics(metrics)?;
197        }
198    }
199
200    #[cfg(feature = "cli")]
201    log::info!(
202        "Filter stats: processed {}/{} files ({:.1}% filtered), saved {}",
203        stats.processed,
204        stats.total_entries,
205        stats.filter_ratio() * 100.0,
206        stats.format_bytes_saved()
207    );
208
209    Ok(())
210}
211
212pub async fn process_tarball_stream(
213    stream_reader: StreamReader,
214    project_analysis: &mut ProjectAnalysis,
215    filter: &IntelligentFilter,
216    _progress_hook: &dyn ProgressHook,
217) -> Result<()> {
218    #[cfg(not(target_arch = "wasm32"))]
219    {
220        let filter = filter.clone();
221        let metrics_result = task::spawn_blocking(move || {
222            let decoder = GzDecoder::new(stream_reader);
223            let mut archive = Archive::new(decoder);
224
225            let entries = archive.entries().map_err(|e| {
226                AnalysisError::archive(format!("Failed to read tar entries: {}", e))
227            })?;
228
229            let mut collected_metrics = Vec::new();
230            let mut stats = FilterStats::new();
231
232            for entry in entries {
233                let entry = entry.map_err(|e| {
234                    AnalysisError::archive(format!("Failed to read tar entry: {}", e))
235                })?;
236
237                if let Ok(metrics) = process_tar_entry_sync(entry, &filter, &mut stats) {
238                    collected_metrics.push(metrics);
239                }
240            }
241
242            #[cfg(feature = "cli")]
243            log::info!(
244                "Filter stats: processed {}/{} files ({:.1}% filtered), saved {}",
245                stats.processed,
246                stats.total_entries,
247                stats.filter_ratio() * 100.0,
248                stats.format_bytes_saved()
249            );
250
251            Ok::<Vec<FileMetrics>, AnalysisError>(collected_metrics)
252        })
253        .await
254        .map_err(|e| AnalysisError::archive(format!("Task join error: {}", e)))??;
255
256        for metrics in metrics_result {
257            project_analysis.add_file_metrics(metrics)?;
258        }
259    }
260
261    #[cfg(target_arch = "wasm32")]
262    {
263        let decoder = GzDecoder::new(stream_reader);
264        let mut archive = Archive::new(decoder);
265
266        let entries = archive
267            .entries()
268            .map_err(|e| AnalysisError::archive(format!("Failed to read tar entries: {}", e)))?;
269
270        let mut stats = FilterStats::new();
271
272        for entry in entries {
273            let entry = entry
274                .map_err(|e| AnalysisError::archive(format!("Failed to read tar entry: {}", e)))?;
275
276            if let Ok(metrics) = process_tar_entry_sync(entry, filter, &mut stats) {
277                project_analysis.add_file_metrics(metrics)?;
278            }
279        }
280    }
281
282    Ok(())
283}
284
285fn process_tar_entry_sync<R: Read>(
286    mut entry: tar::Entry<'_, R>,
287    filter: &IntelligentFilter,
288    stats: &mut FilterStats,
289) -> Result<FileMetrics> {
290    let header = entry.header();
291    let path = header
292        .path()
293        .map_err(|e| AnalysisError::archive(format!("Invalid path in tar entry: {}", e)))?;
294
295    let file_path = path.to_string_lossy().to_string();
296
297    if !header.entry_type().is_file() || header.size().unwrap_or(0) == 0 {
298        return Err(AnalysisError::archive("Not a file or empty".to_string()));
299    }
300
301    let file_size = header.size().unwrap_or(0);
302
303    let should_process = filter.should_process_file(&file_path, file_size);
304    stats.record_entry(file_size, !should_process);
305
306    if !should_process {
307        return Err(AnalysisError::archive("File filtered out".to_string()));
308    }
309
310    let language = LanguageRegistry::detect_by_path(&file_path)
311        .map(|l| l.name.clone())
312        .unwrap_or_else(|| "Text".to_string());
313
314    let mut content = String::new();
315    if entry.read_to_string(&mut content).is_err() {
316        return Err(AnalysisError::archive(
317            "Failed to read file content".to_string(),
318        ));
319    }
320
321    analyze_file_content(&file_path, &content, &language, file_size)
322}
323
324fn analyze_file_content(
325    file_path: &str,
326    content: &str,
327    language: &str,
328    file_size: u64,
329) -> Result<FileMetrics> {
330    let lines: Vec<&str> = content.lines().collect();
331    let total_lines = lines.len();
332
333    let mut code_lines = 0;
334    let mut comment_lines = 0;
335    let mut blank_lines = 0;
336
337    let lang_def = LanguageRegistry::get_language(language);
338    let empty_line_comments = vec![];
339    let empty_multi_line_comments = vec![];
340    let line_comments = lang_def
341        .map(|l| &l.line_comments)
342        .unwrap_or(&empty_line_comments);
343    let multi_line_comments = lang_def
344        .map(|l| &l.multi_line_comments)
345        .unwrap_or(&empty_multi_line_comments);
346
347    let mut in_multi_line_comment = false;
348
349    for line in lines {
350        let trimmed = line.trim();
351
352        if trimmed.is_empty() {
353            blank_lines += 1;
354            continue;
355        }
356
357        let mut is_comment = false;
358
359        if !in_multi_line_comment {
360            for comment_start in line_comments {
361                if trimmed.starts_with(comment_start) {
362                    is_comment = true;
363                    break;
364                }
365            }
366
367            for (start, end) in multi_line_comments {
368                if trimmed.starts_with(start) {
369                    is_comment = true;
370                    if !trimmed.ends_with(end) {
371                        in_multi_line_comment = true;
372                    }
373                    break;
374                }
375            }
376        } else {
377            is_comment = true;
378            for (_, end) in multi_line_comments {
379                if trimmed.ends_with(end) {
380                    in_multi_line_comment = false;
381                    break;
382                }
383            }
384        }
385
386        if is_comment {
387            comment_lines += 1;
388        } else {
389            code_lines += 1;
390        }
391    }
392
393    let metrics = FileMetrics::new(
394        file_path,
395        language.to_string(),
396        total_lines,
397        code_lines,
398        comment_lines,
399        blank_lines,
400    )?
401    .with_size_bytes(file_size);
402
403    Ok(metrics)
404}