1use super::ProgressHook;
2use crate::core::{
3 analysis::{FileMetrics, ProjectAnalysis},
4 error::{AnalysisError, Result},
5 filter::{FilterStats, IntelligentFilter},
6 registry::LanguageRegistry,
7};
8use flate2::read::GzDecoder;
9use futures_util::StreamExt;
10use std::io::{Cursor, Read};
11use tar::Archive;
12use tokio::sync::mpsc;
13
14#[cfg(not(target_arch = "wasm32"))]
15use tokio::task;
16
17pub type ProgressCallback = Box<dyn Fn(u64, Option<u64>) + Send + Sync>;
18
19pub struct StreamReader {
20 receiver: mpsc::Receiver<std::io::Result<bytes::Bytes>>,
21 current_chunk: Option<Cursor<bytes::Bytes>>,
22 finished: bool,
23}
24
25impl StreamReader {
26 #[cfg(not(target_arch = "wasm32"))]
27 pub fn new(
28 stream: impl futures_util::Stream<Item = reqwest::Result<bytes::Bytes>> + Send + 'static,
29 progress_callback: ProgressCallback,
30 total_size: Option<u64>,
31 ) -> Self {
32 let (tx, rx) = mpsc::channel(32);
33
34 tokio::spawn(async move {
35 let mut downloaded = 0u64;
36 let mut stream = Box::pin(stream);
37
38 while let Some(chunk_result) = stream.next().await {
39 match chunk_result {
40 Ok(chunk) => {
41 downloaded += chunk.len() as u64;
42 progress_callback(downloaded, total_size);
43
44 if tx.send(Ok(chunk)).await.is_err() {
45 break;
46 }
47 }
48 Err(e) => {
49 let _ = tx
50 .send(Err(std::io::Error::new(
51 std::io::ErrorKind::Other,
52 format!("Stream error: {}", e),
53 )))
54 .await;
55 break;
56 }
57 }
58 }
59 });
60
61 Self {
62 receiver: rx,
63 current_chunk: None,
64 finished: false,
65 }
66 }
67
68 #[cfg(target_arch = "wasm32")]
69 pub fn new(
70 stream: impl futures_util::Stream<Item = reqwest::Result<bytes::Bytes>> + 'static,
71 progress_callback: ProgressCallback,
72 total_size: Option<u64>,
73 ) -> Self {
74 let (tx, rx) = mpsc::channel(32);
75
76 wasm_bindgen_futures::spawn_local(async move {
77 let mut downloaded = 0u64;
78 let mut stream = Box::pin(stream);
79
80 while let Some(chunk_result) = stream.next().await {
81 match chunk_result {
82 Ok(chunk) => {
83 downloaded += chunk.len() as u64;
84 progress_callback(downloaded, total_size);
85
86 if tx.send(Ok(chunk)).await.is_err() {
87 break;
88 }
89 }
90 Err(e) => {
91 let _ = tx
92 .send(Err(std::io::Error::new(
93 std::io::ErrorKind::Other,
94 format!("Stream error: {}", e),
95 )))
96 .await;
97 break;
98 }
99 }
100 }
101 });
102
103 Self {
104 receiver: rx,
105 current_chunk: None,
106 finished: false,
107 }
108 }
109}
110
111impl Read for StreamReader {
112 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
113 if let Some(ref mut cursor) = self.current_chunk {
114 let read = cursor.read(buf)?;
115 if read > 0 {
116 return Ok(read);
117 }
118 self.current_chunk = None;
119 }
120
121 if self.finished {
122 return Ok(0);
123 }
124
125 match self.receiver.try_recv() {
126 Ok(Ok(chunk)) => {
127 self.current_chunk = Some(Cursor::new(chunk));
128 if let Some(ref mut cursor) = self.current_chunk {
129 cursor.read(buf)
130 } else {
131 Ok(0)
132 }
133 }
134 Ok(Err(e)) => {
135 self.finished = true;
136 Err(e)
137 }
138 Err(mpsc::error::TryRecvError::Empty) => {
139 #[cfg(not(target_arch = "wasm32"))]
140 {
141 match self.receiver.blocking_recv() {
142 Some(Ok(chunk)) => {
143 self.current_chunk = Some(Cursor::new(chunk));
144 if let Some(ref mut cursor) = self.current_chunk {
145 cursor.read(buf)
146 } else {
147 Ok(0)
148 }
149 }
150 Some(Err(e)) => {
151 self.finished = true;
152 Err(e)
153 }
154 None => {
155 self.finished = true;
156 Ok(0)
157 }
158 }
159 }
160 #[cfg(target_arch = "wasm32")]
161 {
162 Err(std::io::Error::new(
163 std::io::ErrorKind::WouldBlock,
164 "Would block in WASM",
165 ))
166 }
167 }
168 Err(mpsc::error::TryRecvError::Disconnected) => {
169 self.finished = true;
170 Ok(0)
171 }
172 }
173 }
174}
175
176pub async fn process_tarball(
177 bytes: bytes::Bytes,
178 project_analysis: &mut ProjectAnalysis,
179 filter: &IntelligentFilter,
180 _progress_hook: &dyn ProgressHook,
181) -> Result<()> {
182 let decoder = GzDecoder::new(Cursor::new(bytes));
183 let mut archive = Archive::new(decoder);
184
185 let entries = archive
186 .entries()
187 .map_err(|e| AnalysisError::archive(format!("Failed to read tar entries: {}", e)))?;
188
189 let mut stats = FilterStats::new();
190
191 for entry in entries {
192 let entry = entry
193 .map_err(|e| AnalysisError::archive(format!("Failed to read tar entry: {}", e)))?;
194
195 if let Ok(metrics) = process_tar_entry_sync(entry, filter, &mut stats) {
196 project_analysis.add_file_metrics(metrics)?;
197 }
198 }
199
200 #[cfg(feature = "cli")]
201 log::info!(
202 "Filter stats: processed {}/{} files ({:.1}% filtered), saved {}",
203 stats.processed,
204 stats.total_entries,
205 stats.filter_ratio() * 100.0,
206 stats.format_bytes_saved()
207 );
208
209 Ok(())
210}
211
212pub async fn process_tarball_stream(
213 stream_reader: StreamReader,
214 project_analysis: &mut ProjectAnalysis,
215 filter: &IntelligentFilter,
216 _progress_hook: &dyn ProgressHook,
217) -> Result<()> {
218 #[cfg(not(target_arch = "wasm32"))]
219 {
220 let filter = filter.clone();
221 let metrics_result = task::spawn_blocking(move || {
222 let decoder = GzDecoder::new(stream_reader);
223 let mut archive = Archive::new(decoder);
224
225 let entries = archive.entries().map_err(|e| {
226 AnalysisError::archive(format!("Failed to read tar entries: {}", e))
227 })?;
228
229 let mut collected_metrics = Vec::new();
230 let mut stats = FilterStats::new();
231
232 for entry in entries {
233 let entry = entry.map_err(|e| {
234 AnalysisError::archive(format!("Failed to read tar entry: {}", e))
235 })?;
236
237 if let Ok(metrics) = process_tar_entry_sync(entry, &filter, &mut stats) {
238 collected_metrics.push(metrics);
239 }
240 }
241
242 #[cfg(feature = "cli")]
243 log::info!(
244 "Filter stats: processed {}/{} files ({:.1}% filtered), saved {}",
245 stats.processed,
246 stats.total_entries,
247 stats.filter_ratio() * 100.0,
248 stats.format_bytes_saved()
249 );
250
251 Ok::<Vec<FileMetrics>, AnalysisError>(collected_metrics)
252 })
253 .await
254 .map_err(|e| AnalysisError::archive(format!("Task join error: {}", e)))??;
255
256 for metrics in metrics_result {
257 project_analysis.add_file_metrics(metrics)?;
258 }
259 }
260
261 #[cfg(target_arch = "wasm32")]
262 {
263 let decoder = GzDecoder::new(stream_reader);
264 let mut archive = Archive::new(decoder);
265
266 let entries = archive
267 .entries()
268 .map_err(|e| AnalysisError::archive(format!("Failed to read tar entries: {}", e)))?;
269
270 let mut stats = FilterStats::new();
271
272 for entry in entries {
273 let entry = entry
274 .map_err(|e| AnalysisError::archive(format!("Failed to read tar entry: {}", e)))?;
275
276 if let Ok(metrics) = process_tar_entry_sync(entry, filter, &mut stats) {
277 project_analysis.add_file_metrics(metrics)?;
278 }
279 }
280 }
281
282 Ok(())
283}
284
285fn process_tar_entry_sync<R: Read>(
286 mut entry: tar::Entry<'_, R>,
287 filter: &IntelligentFilter,
288 stats: &mut FilterStats,
289) -> Result<FileMetrics> {
290 let header = entry.header();
291 let path = header
292 .path()
293 .map_err(|e| AnalysisError::archive(format!("Invalid path in tar entry: {}", e)))?;
294
295 let file_path = path.to_string_lossy().to_string();
296
297 if !header.entry_type().is_file() || header.size().unwrap_or(0) == 0 {
298 return Err(AnalysisError::archive("Not a file or empty".to_string()));
299 }
300
301 let file_size = header.size().unwrap_or(0);
302
303 let should_process = filter.should_process_file(&file_path, file_size);
304 stats.record_entry(file_size, !should_process);
305
306 if !should_process {
307 return Err(AnalysisError::archive("File filtered out".to_string()));
308 }
309
310 let language = LanguageRegistry::detect_by_path(&file_path)
311 .map(|l| l.name.clone())
312 .unwrap_or_else(|| "Text".to_string());
313
314 let mut content = String::new();
315 if entry.read_to_string(&mut content).is_err() {
316 return Err(AnalysisError::archive(
317 "Failed to read file content".to_string(),
318 ));
319 }
320
321 analyze_file_content(&file_path, &content, &language, file_size)
322}
323
324fn analyze_file_content(
325 file_path: &str,
326 content: &str,
327 language: &str,
328 file_size: u64,
329) -> Result<FileMetrics> {
330 let lines: Vec<&str> = content.lines().collect();
331 let total_lines = lines.len();
332
333 let mut code_lines = 0;
334 let mut comment_lines = 0;
335 let mut blank_lines = 0;
336
337 let lang_def = LanguageRegistry::get_language(language);
338 let empty_line_comments = vec![];
339 let empty_multi_line_comments = vec![];
340 let line_comments = lang_def
341 .map(|l| &l.line_comments)
342 .unwrap_or(&empty_line_comments);
343 let multi_line_comments = lang_def
344 .map(|l| &l.multi_line_comments)
345 .unwrap_or(&empty_multi_line_comments);
346
347 let mut in_multi_line_comment = false;
348
349 for line in lines {
350 let trimmed = line.trim();
351
352 if trimmed.is_empty() {
353 blank_lines += 1;
354 continue;
355 }
356
357 let mut is_comment = false;
358
359 if !in_multi_line_comment {
360 for comment_start in line_comments {
361 if trimmed.starts_with(comment_start) {
362 is_comment = true;
363 break;
364 }
365 }
366
367 for (start, end) in multi_line_comments {
368 if trimmed.starts_with(start) {
369 is_comment = true;
370 if !trimmed.ends_with(end) {
371 in_multi_line_comment = true;
372 }
373 break;
374 }
375 }
376 } else {
377 is_comment = true;
378 for (_, end) in multi_line_comments {
379 if trimmed.ends_with(end) {
380 in_multi_line_comment = false;
381 break;
382 }
383 }
384 }
385
386 if is_comment {
387 comment_lines += 1;
388 } else {
389 code_lines += 1;
390 }
391 }
392
393 let metrics = FileMetrics::new(
394 file_path,
395 language.to_string(),
396 total_lines,
397 code_lines,
398 comment_lines,
399 blank_lines,
400 )?
401 .with_size_bytes(file_size);
402
403 Ok(metrics)
404}