// bytes_radar/net/stream.rs

1use super::ProgressHook;
2use crate::core::{
3    analysis::{FileMetrics, ProjectAnalysis},
4    error::{AnalysisError, Result},
5    filter::{FilterStats, IntelligentFilter},
6    registry::LanguageRegistry,
7};
8use flate2::read::GzDecoder;
9use futures_util::StreamExt;
10use std::io::{Cursor, Read};
11use tar::Archive;
12use tokio::sync::mpsc;
13
14#[cfg(not(target_arch = "wasm32"))]
15use tokio::task;
16
17pub type ProgressCallback = Box<dyn Fn(u64, Option<u64>) + Send + Sync>;
18
19pub struct StreamReader {
20    receiver: mpsc::Receiver<std::io::Result<bytes::Bytes>>,
21    current_chunk: Option<Cursor<bytes::Bytes>>,
22    finished: bool,
23}
24
25impl StreamReader {
26    #[cfg(not(target_arch = "wasm32"))]
27    pub fn new(
28        stream: impl futures_util::Stream<Item = reqwest::Result<bytes::Bytes>> + Send + 'static,
29        progress_callback: ProgressCallback,
30        total_size: Option<u64>,
31    ) -> Self {
32        let (tx, rx) = mpsc::channel(32);
33
34        tokio::spawn(async move {
35            let mut downloaded = 0u64;
36            let mut stream = Box::pin(stream);
37
38            while let Some(chunk_result) = stream.next().await {
39                match chunk_result {
40                    Ok(chunk) => {
41                        downloaded += chunk.len() as u64;
42                        progress_callback(downloaded, total_size);
43
44                        if tx.send(Ok(chunk)).await.is_err() {
45                            break;
46                        }
47                    }
48                    Err(e) => {
49                        let _ = tx
50                            .send(Err(std::io::Error::new(
51                                std::io::ErrorKind::Other,
52                                format!("Stream error: {}", e),
53                            )))
54                            .await;
55                        break;
56                    }
57                }
58            }
59        });
60
61        Self {
62            receiver: rx,
63            current_chunk: None,
64            finished: false,
65        }
66    }
67
68    #[cfg(target_arch = "wasm32")]
69    pub fn new(
70        stream: impl futures_util::Stream<Item = reqwest::Result<bytes::Bytes>> + 'static,
71        progress_callback: ProgressCallback,
72        total_size: Option<u64>,
73    ) -> Self {
74        let (tx, rx) = mpsc::channel(32);
75
76        wasm_bindgen_futures::spawn_local(async move {
77            let mut downloaded = 0u64;
78            let mut stream = Box::pin(stream);
79
80            while let Some(chunk_result) = stream.next().await {
81                match chunk_result {
82                    Ok(chunk) => {
83                        downloaded += chunk.len() as u64;
84                        progress_callback(downloaded, total_size);
85
86                        if tx.send(Ok(chunk)).await.is_err() {
87                            break;
88                        }
89                    }
90                    Err(e) => {
91                        let _ = tx
92                            .send(Err(std::io::Error::new(
93                                std::io::ErrorKind::Other,
94                                format!("Stream error: {}", e),
95                            )))
96                            .await;
97                        break;
98                    }
99                }
100            }
101        });
102
103        Self {
104            receiver: rx,
105            current_chunk: None,
106            finished: false,
107        }
108    }
109}
110
111impl Read for StreamReader {
112    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
113        if let Some(ref mut cursor) = self.current_chunk {
114            let read = cursor.read(buf)?;
115            if read > 0 {
116                return Ok(read);
117            }
118            self.current_chunk = None;
119        }
120
121        if self.finished {
122            return Ok(0);
123        }
124
125        match self.receiver.try_recv() {
126            Ok(Ok(chunk)) => {
127                self.current_chunk = Some(Cursor::new(chunk));
128                if let Some(ref mut cursor) = self.current_chunk {
129                    cursor.read(buf)
130                } else {
131                    Ok(0)
132                }
133            }
134            Ok(Err(e)) => {
135                self.finished = true;
136                Err(e)
137            }
138            Err(mpsc::error::TryRecvError::Empty) => {
139                #[cfg(not(target_arch = "wasm32"))]
140                {
141                    match self.receiver.blocking_recv() {
142                        Some(Ok(chunk)) => {
143                            self.current_chunk = Some(Cursor::new(chunk));
144                            if let Some(ref mut cursor) = self.current_chunk {
145                                cursor.read(buf)
146                            } else {
147                                Ok(0)
148                            }
149                        }
150                        Some(Err(e)) => {
151                            self.finished = true;
152                            Err(e)
153                        }
154                        None => {
155                            self.finished = true;
156                            Ok(0)
157                        }
158                    }
159                }
160                #[cfg(target_arch = "wasm32")]
161                {
162                    Err(std::io::Error::new(
163                        std::io::ErrorKind::WouldBlock,
164                        "Would block in WASM",
165                    ))
166                }
167            }
168            Err(mpsc::error::TryRecvError::Disconnected) => {
169                self.finished = true;
170                Ok(0)
171            }
172        }
173    }
174}
175
176pub async fn process_tarball_stream(
177    stream_reader: StreamReader,
178    project_analysis: &mut ProjectAnalysis,
179    filter: &IntelligentFilter,
180    _progress_hook: &dyn ProgressHook,
181) -> Result<()> {
182    #[cfg(not(target_arch = "wasm32"))]
183    {
184        let filter = filter.clone();
185        let metrics_result = task::spawn_blocking(move || {
186            let decoder = GzDecoder::new(stream_reader);
187            let mut archive = Archive::new(decoder);
188
189            let entries = archive.entries().map_err(|e| {
190                AnalysisError::archive(format!("Failed to read tar entries: {}", e))
191            })?;
192
193            let mut collected_metrics = Vec::new();
194            let mut stats = FilterStats::new();
195
196            for entry in entries {
197                let entry = entry.map_err(|e| {
198                    AnalysisError::archive(format!("Failed to read tar entry: {}", e))
199                })?;
200
201                if let Ok(metrics) = process_tar_entry_sync(entry, &filter, &mut stats) {
202                    collected_metrics.push(metrics);
203                }
204            }
205
206            #[cfg(feature = "cli")]
207            log::info!(
208                "Filter stats: processed {}/{} files ({:.1}% filtered), saved {}",
209                stats.processed,
210                stats.total_entries,
211                stats.filter_ratio() * 100.0,
212                stats.format_bytes_saved()
213            );
214
215            Ok::<Vec<FileMetrics>, AnalysisError>(collected_metrics)
216        })
217        .await
218        .map_err(|e| AnalysisError::archive(format!("Task join error: {}", e)))??;
219
220        for metrics in metrics_result {
221            project_analysis.add_file_metrics(metrics)?;
222        }
223    }
224
225    #[cfg(target_arch = "wasm32")]
226    {
227        let decoder = GzDecoder::new(stream_reader);
228        let mut archive = Archive::new(decoder);
229
230        let entries = archive
231            .entries()
232            .map_err(|e| AnalysisError::archive(format!("Failed to read tar entries: {}", e)))?;
233
234        let mut stats = FilterStats::new();
235
236        for entry in entries {
237            let entry = entry
238                .map_err(|e| AnalysisError::archive(format!("Failed to read tar entry: {}", e)))?;
239
240            if let Ok(metrics) = process_tar_entry_sync(entry, filter, &mut stats) {
241                project_analysis.add_file_metrics(metrics)?;
242            }
243        }
244
245        web_sys::console::log_1(
246            &format!(
247                "Filter stats: processed {}/{} files ({:.1}% filtered), saved {}",
248                stats.processed,
249                stats.total_entries,
250                stats.filter_ratio() * 100.0,
251                stats.format_bytes_saved()
252            )
253            .into(),
254        );
255    }
256
257    Ok(())
258}
259
260fn process_tar_entry_sync<R: Read>(
261    mut entry: tar::Entry<'_, R>,
262    filter: &IntelligentFilter,
263    stats: &mut FilterStats,
264) -> Result<FileMetrics> {
265    let header = entry.header();
266    let path = header
267        .path()
268        .map_err(|e| AnalysisError::archive(format!("Invalid path in tar entry: {}", e)))?;
269
270    let file_path = path.to_string_lossy().to_string();
271
272    if !header.entry_type().is_file() || header.size().unwrap_or(0) == 0 {
273        return Err(AnalysisError::archive("Not a file or empty".to_string()));
274    }
275
276    let file_size = header.size().unwrap_or(0);
277
278    let should_process = filter.should_process_file(&file_path, file_size);
279    stats.record_entry(file_size, !should_process);
280
281    if !should_process {
282        return Err(AnalysisError::archive("File filtered out".to_string()));
283    }
284
285    let language = LanguageRegistry::detect_by_path(&file_path)
286        .map(|l| l.name.clone())
287        .unwrap_or_else(|| "Text".to_string());
288
289    let mut content = String::new();
290    if entry.read_to_string(&mut content).is_err() {
291        return Err(AnalysisError::archive(
292            "Failed to read file content".to_string(),
293        ));
294    }
295
296    analyze_file_content(&file_path, &content, &language, file_size)
297}
298
299fn analyze_file_content(
300    file_path: &str,
301    content: &str,
302    language: &str,
303    file_size: u64,
304) -> Result<FileMetrics> {
305    let lines: Vec<&str> = content.lines().collect();
306    let total_lines = lines.len();
307
308    let mut code_lines = 0;
309    let mut comment_lines = 0;
310    let mut blank_lines = 0;
311
312    let lang_def = LanguageRegistry::get_language(language);
313    let empty_line_comments = vec![];
314    let empty_multi_line_comments = vec![];
315    let line_comments = lang_def
316        .map(|l| &l.line_comments)
317        .unwrap_or(&empty_line_comments);
318    let multi_line_comments = lang_def
319        .map(|l| &l.multi_line_comments)
320        .unwrap_or(&empty_multi_line_comments);
321
322    let mut in_multi_line_comment = false;
323
324    for line in lines {
325        let trimmed = line.trim();
326
327        if trimmed.is_empty() {
328            blank_lines += 1;
329            continue;
330        }
331
332        let mut is_comment = false;
333
334        if !in_multi_line_comment {
335            for comment_start in line_comments {
336                if trimmed.starts_with(comment_start) {
337                    is_comment = true;
338                    break;
339                }
340            }
341
342            for (start, end) in multi_line_comments {
343                if trimmed.starts_with(start) {
344                    is_comment = true;
345                    if !trimmed.ends_with(end) {
346                        in_multi_line_comment = true;
347                    }
348                    break;
349                }
350            }
351        } else {
352            is_comment = true;
353            for (_, end) in multi_line_comments {
354                if trimmed.ends_with(end) {
355                    in_multi_line_comment = false;
356                    break;
357                }
358            }
359        }
360
361        if is_comment {
362            comment_lines += 1;
363        } else {
364            code_lines += 1;
365        }
366    }
367
368    let metrics = FileMetrics::new(
369        file_path,
370        language.to_string(),
371        total_lines,
372        code_lines,
373        comment_lines,
374        blank_lines,
375    )?
376    .with_size_bytes(file_size);
377
378    Ok(metrics)
379}