bytes_radar/net/
stream.rs1use super::ProgressHook;
2use crate::core::{
3 analysis::{FileMetrics, ProjectAnalysis},
4 error::{AnalysisError, Result},
5 filter::{FilterStats, IntelligentFilter},
6 registry::LanguageRegistry,
7};
8use flate2::read::GzDecoder;
9use futures_util::StreamExt;
10use std::io::{Cursor, Read};
11use tar::Archive;
12use tokio::sync::mpsc;
13
14#[cfg(not(target_arch = "wasm32"))]
15use tokio::task;
16
/// Progress notification hook for streamed downloads.
///
/// Invoked as `callback(bytes_received_so_far, expected_total)`; the second
/// argument is `None` when the total size is unknown. The `Send + Sync` bounds
/// allow the callback to be moved into a background `tokio::spawn` task.
pub type ProgressCallback = Box<dyn Fn(u64, Option<u64>) + Sync + Send>;

19pub struct StreamReader {
20 receiver: mpsc::Receiver<std::io::Result<bytes::Bytes>>,
21 current_chunk: Option<Cursor<bytes::Bytes>>,
22 finished: bool,
23}
24
25impl StreamReader {
26 #[cfg(not(target_arch = "wasm32"))]
27 pub fn new(
28 stream: impl futures_util::Stream<Item = reqwest::Result<bytes::Bytes>> + Send + 'static,
29 progress_callback: ProgressCallback,
30 total_size: Option<u64>,
31 ) -> Self {
32 let (tx, rx) = mpsc::channel(32);
33
34 tokio::spawn(async move {
35 let mut downloaded = 0u64;
36 let mut stream = Box::pin(stream);
37
38 while let Some(chunk_result) = stream.next().await {
39 match chunk_result {
40 Ok(chunk) => {
41 downloaded += chunk.len() as u64;
42 progress_callback(downloaded, total_size);
43
44 if tx.send(Ok(chunk)).await.is_err() {
45 break;
46 }
47 }
48 Err(e) => {
49 let _ = tx
50 .send(Err(std::io::Error::new(
51 std::io::ErrorKind::Other,
52 format!("Stream error: {}", e),
53 )))
54 .await;
55 break;
56 }
57 }
58 }
59 });
60
61 Self {
62 receiver: rx,
63 current_chunk: None,
64 finished: false,
65 }
66 }
67
68 #[cfg(target_arch = "wasm32")]
69 pub fn new(
70 stream: impl futures_util::Stream<Item = reqwest::Result<bytes::Bytes>> + 'static,
71 progress_callback: ProgressCallback,
72 total_size: Option<u64>,
73 ) -> Self {
74 let (tx, rx) = mpsc::channel(32);
75
76 wasm_bindgen_futures::spawn_local(async move {
77 let mut downloaded = 0u64;
78 let mut stream = Box::pin(stream);
79
80 while let Some(chunk_result) = stream.next().await {
81 match chunk_result {
82 Ok(chunk) => {
83 downloaded += chunk.len() as u64;
84 progress_callback(downloaded, total_size);
85
86 if tx.send(Ok(chunk)).await.is_err() {
87 break;
88 }
89 }
90 Err(e) => {
91 let _ = tx
92 .send(Err(std::io::Error::new(
93 std::io::ErrorKind::Other,
94 format!("Stream error: {}", e),
95 )))
96 .await;
97 break;
98 }
99 }
100 }
101 });
102
103 Self {
104 receiver: rx,
105 current_chunk: None,
106 finished: false,
107 }
108 }
109}
110
111impl Read for StreamReader {
112 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
113 if let Some(ref mut cursor) = self.current_chunk {
114 let read = cursor.read(buf)?;
115 if read > 0 {
116 return Ok(read);
117 }
118 self.current_chunk = None;
119 }
120
121 if self.finished {
122 return Ok(0);
123 }
124
125 match self.receiver.try_recv() {
126 Ok(Ok(chunk)) => {
127 self.current_chunk = Some(Cursor::new(chunk));
128 if let Some(ref mut cursor) = self.current_chunk {
129 cursor.read(buf)
130 } else {
131 Ok(0)
132 }
133 }
134 Ok(Err(e)) => {
135 self.finished = true;
136 Err(e)
137 }
138 Err(mpsc::error::TryRecvError::Empty) => {
139 #[cfg(not(target_arch = "wasm32"))]
140 {
141 match self.receiver.blocking_recv() {
142 Some(Ok(chunk)) => {
143 self.current_chunk = Some(Cursor::new(chunk));
144 if let Some(ref mut cursor) = self.current_chunk {
145 cursor.read(buf)
146 } else {
147 Ok(0)
148 }
149 }
150 Some(Err(e)) => {
151 self.finished = true;
152 Err(e)
153 }
154 None => {
155 self.finished = true;
156 Ok(0)
157 }
158 }
159 }
160 #[cfg(target_arch = "wasm32")]
161 {
162 Err(std::io::Error::new(
163 std::io::ErrorKind::WouldBlock,
164 "Would block in WASM",
165 ))
166 }
167 }
168 Err(mpsc::error::TryRecvError::Disconnected) => {
169 self.finished = true;
170 Ok(0)
171 }
172 }
173 }
174}
175
176pub async fn process_tarball_stream(
177 stream_reader: StreamReader,
178 project_analysis: &mut ProjectAnalysis,
179 filter: &IntelligentFilter,
180 _progress_hook: &dyn ProgressHook,
181) -> Result<()> {
182 #[cfg(not(target_arch = "wasm32"))]
183 {
184 let filter = filter.clone();
185 let metrics_result = task::spawn_blocking(move || {
186 let decoder = GzDecoder::new(stream_reader);
187 let mut archive = Archive::new(decoder);
188
189 let entries = archive.entries().map_err(|e| {
190 AnalysisError::archive(format!("Failed to read tar entries: {}", e))
191 })?;
192
193 let mut collected_metrics = Vec::new();
194 let mut stats = FilterStats::new();
195
196 for entry in entries {
197 let entry = entry.map_err(|e| {
198 AnalysisError::archive(format!("Failed to read tar entry: {}", e))
199 })?;
200
201 if let Ok(metrics) = process_tar_entry_sync(entry, &filter, &mut stats) {
202 collected_metrics.push(metrics);
203 }
204 }
205
206 #[cfg(feature = "cli")]
207 log::info!(
208 "Filter stats: processed {}/{} files ({:.1}% filtered), saved {}",
209 stats.processed,
210 stats.total_entries,
211 stats.filter_ratio() * 100.0,
212 stats.format_bytes_saved()
213 );
214
215 Ok::<Vec<FileMetrics>, AnalysisError>(collected_metrics)
216 })
217 .await
218 .map_err(|e| AnalysisError::archive(format!("Task join error: {}", e)))??;
219
220 for metrics in metrics_result {
221 project_analysis.add_file_metrics(metrics)?;
222 }
223 }
224
225 #[cfg(target_arch = "wasm32")]
226 {
227 let decoder = GzDecoder::new(stream_reader);
228 let mut archive = Archive::new(decoder);
229
230 let entries = archive
231 .entries()
232 .map_err(|e| AnalysisError::archive(format!("Failed to read tar entries: {}", e)))?;
233
234 let mut stats = FilterStats::new();
235
236 for entry in entries {
237 let entry = entry
238 .map_err(|e| AnalysisError::archive(format!("Failed to read tar entry: {}", e)))?;
239
240 if let Ok(metrics) = process_tar_entry_sync(entry, filter, &mut stats) {
241 project_analysis.add_file_metrics(metrics)?;
242 }
243 }
244
245 web_sys::console::log_1(
246 &format!(
247 "Filter stats: processed {}/{} files ({:.1}% filtered), saved {}",
248 stats.processed,
249 stats.total_entries,
250 stats.filter_ratio() * 100.0,
251 stats.format_bytes_saved()
252 )
253 .into(),
254 );
255 }
256
257 Ok(())
258}
259
260fn process_tar_entry_sync<R: Read>(
261 mut entry: tar::Entry<'_, R>,
262 filter: &IntelligentFilter,
263 stats: &mut FilterStats,
264) -> Result<FileMetrics> {
265 let header = entry.header();
266 let path = header
267 .path()
268 .map_err(|e| AnalysisError::archive(format!("Invalid path in tar entry: {}", e)))?;
269
270 let file_path = path.to_string_lossy().to_string();
271
272 if !header.entry_type().is_file() || header.size().unwrap_or(0) == 0 {
273 return Err(AnalysisError::archive("Not a file or empty".to_string()));
274 }
275
276 let file_size = header.size().unwrap_or(0);
277
278 let should_process = filter.should_process_file(&file_path, file_size);
279 stats.record_entry(file_size, !should_process);
280
281 if !should_process {
282 return Err(AnalysisError::archive("File filtered out".to_string()));
283 }
284
285 let language = LanguageRegistry::detect_by_path(&file_path)
286 .map(|l| l.name.clone())
287 .unwrap_or_else(|| "Text".to_string());
288
289 let mut content = String::new();
290 if entry.read_to_string(&mut content).is_err() {
291 return Err(AnalysisError::archive(
292 "Failed to read file content".to_string(),
293 ));
294 }
295
296 analyze_file_content(&file_path, &content, &language, file_size)
297}
298
299fn analyze_file_content(
300 file_path: &str,
301 content: &str,
302 language: &str,
303 file_size: u64,
304) -> Result<FileMetrics> {
305 let lines: Vec<&str> = content.lines().collect();
306 let total_lines = lines.len();
307
308 let mut code_lines = 0;
309 let mut comment_lines = 0;
310 let mut blank_lines = 0;
311
312 let lang_def = LanguageRegistry::get_language(language);
313 let empty_line_comments = vec![];
314 let empty_multi_line_comments = vec![];
315 let line_comments = lang_def
316 .map(|l| &l.line_comments)
317 .unwrap_or(&empty_line_comments);
318 let multi_line_comments = lang_def
319 .map(|l| &l.multi_line_comments)
320 .unwrap_or(&empty_multi_line_comments);
321
322 let mut in_multi_line_comment = false;
323
324 for line in lines {
325 let trimmed = line.trim();
326
327 if trimmed.is_empty() {
328 blank_lines += 1;
329 continue;
330 }
331
332 let mut is_comment = false;
333
334 if !in_multi_line_comment {
335 for comment_start in line_comments {
336 if trimmed.starts_with(comment_start) {
337 is_comment = true;
338 break;
339 }
340 }
341
342 for (start, end) in multi_line_comments {
343 if trimmed.starts_with(start) {
344 is_comment = true;
345 if !trimmed.ends_with(end) {
346 in_multi_line_comment = true;
347 }
348 break;
349 }
350 }
351 } else {
352 is_comment = true;
353 for (_, end) in multi_line_comments {
354 if trimmed.ends_with(end) {
355 in_multi_line_comment = false;
356 break;
357 }
358 }
359 }
360
361 if is_comment {
362 comment_lines += 1;
363 } else {
364 code_lines += 1;
365 }
366 }
367
368 let metrics = FileMetrics::new(
369 file_path,
370 language.to_string(),
371 total_lines,
372 code_lines,
373 comment_lines,
374 blank_lines,
375 )?
376 .with_size_bytes(file_size);
377
378 Ok(metrics)
379}