1use crate::core::{
2 analysis::{FileMetrics, ProjectAnalysis},
3 error::{AnalysisError, Result},
4 registry::LanguageRegistry,
5};
6use flate2::read::GzDecoder;
7use futures_util::StreamExt;
8use reqwest::Client;
9use std::io::{Cursor, Read};
10use tar::Archive;
11use tokio::task;
12
/// Value of the `User-Agent` header configured on the HTTP client.
const USER_AGENT: &str = "bytes-radar/1.0.0";
14
15#[cfg(feature = "cli")]
16use indicatif::ProgressBar;
17
18pub struct RemoteAnalyzer {
19 client: Client,
20 github_token: Option<String>,
21 timeout: u64,
22 allow_insecure: bool,
23 #[cfg(feature = "cli")]
24 progress_bar: Option<ProgressBar>,
25}
26
27impl RemoteAnalyzer {
28 pub fn new() -> Self {
29 let client = Client::builder()
30 .user_agent(USER_AGENT)
31 .timeout(std::time::Duration::from_secs(300))
32 .build()
33 .expect("Failed to create HTTP client");
34
35 Self {
36 client,
37 github_token: None,
38 timeout: 300,
39 allow_insecure: false,
40 #[cfg(feature = "cli")]
41 progress_bar: None,
42 }
43 }
44
45 #[cfg(feature = "cli")]
46 pub fn set_progress_bar(&mut self, progress_bar: Option<ProgressBar>) {
47 self.progress_bar = progress_bar;
48 }
49
50 pub fn set_github_token(&mut self, token: &str) {
51 self.github_token = Some(token.to_string());
52 self.rebuild_client();
53 }
54
55 pub fn set_timeout(&mut self, timeout: u64) {
56 self.timeout = timeout;
57 self.rebuild_client();
58 }
59
60 pub fn set_allow_insecure(&mut self, allow_insecure: bool) {
61 self.allow_insecure = allow_insecure;
62 self.rebuild_client();
63 }
64
65 fn rebuild_client(&mut self) {
66 let mut builder = Client::builder()
67 .user_agent(USER_AGENT)
68 .timeout(std::time::Duration::from_secs(self.timeout));
69
70 if self.allow_insecure {
71 builder = builder.danger_accept_invalid_certs(true);
72 }
73
74 if let Some(token) = &self.github_token {
75 let mut headers = reqwest::header::HeaderMap::new();
76 let auth_value = format!("token {}", token);
77 headers.insert(
78 reqwest::header::AUTHORIZATION,
79 auth_value.parse().expect("Invalid token format"),
80 );
81 builder = builder.default_headers(headers);
82 }
83
84 self.client = builder.build().expect("Failed to create HTTP client");
85 }
86
87 pub async fn analyze_url(&self, url: &str) -> Result<ProjectAnalysis> {
88 let download_url = self.resolve_git_url(url).await?;
89 let project_analysis = self.analyze_tarball_with_name(&download_url, url).await?;
90 Ok(project_analysis)
91 }
92
93 async fn resolve_git_url(&self, url: &str) -> Result<String> {
94 if url.ends_with(".tar.gz") || url.ends_with(".tgz") {
95 return Ok(url.to_string());
96 }
97
98 if url.starts_with("http://") || url.starts_with("https://") {
99 if !url.contains("github.com")
100 && !url.contains("gitlab.com")
101 && !url.contains("gitlab.")
102 && !url.contains("bitbucket.org")
103 && !url.contains("codeberg.org")
104 {
105 if url.ends_with(".tar.gz") || url.ends_with(".tgz") {
106 return Ok(url.to_string());
107 } else {
108 return Ok(url.to_string());
109 }
110 }
111 }
112
113 let branches = ["main", "master", "develop", "dev"];
114
115 if let Some(github_url) = self.parse_github_url_with_branch(url) {
116 return Ok(github_url);
117 }
118
119 if let Some(gitlab_url) = self.parse_gitlab_url_with_branch(url) {
120 return Ok(gitlab_url);
121 }
122
123 if let Some(bitbucket_url) = self.parse_bitbucket_url_with_branch(url) {
124 return Ok(bitbucket_url);
125 }
126
127 if let Some(codeberg_url) = self.parse_codeberg_url_with_branch(url) {
128 return Ok(codeberg_url);
129 }
130
131 for branch in &branches {
132 if let Some(github_url) = self.parse_github_url(url, branch) {
133 if self.check_url_exists(&github_url).await {
134 return Ok(github_url);
135 }
136 }
137
138 if let Some(gitlab_url) = self.parse_gitlab_url(url, branch) {
139 if self.check_url_exists(&gitlab_url).await {
140 return Ok(gitlab_url);
141 }
142 }
143
144 if let Some(bitbucket_url) = self.parse_bitbucket_url(url, branch) {
145 if self.check_url_exists(&bitbucket_url).await {
146 return Ok(bitbucket_url);
147 }
148 }
149
150 if let Some(codeberg_url) = self.parse_codeberg_url(url, branch) {
151 if self.check_url_exists(&codeberg_url).await {
152 return Ok(codeberg_url);
153 }
154 }
155 }
156
157 Err(AnalysisError::url_parsing(format!(
158 "Unsupported URL format or no accessible branch found: {}. Please provide a direct tar.gz URL or a supported repository URL.",
159 url
160 )))
161 }
162
163 fn parse_github_url_with_branch(&self, url: &str) -> Option<String> {
164 if url.contains("github.com") {
165 if url.contains("/tree/") {
166 let parts: Vec<&str> = url.split('/').collect();
167 if let Some(tree_pos) = parts.iter().position(|&x| x == "tree") {
168 if tree_pos + 1 < parts.len() && tree_pos >= 2 {
169 let owner = parts[tree_pos - 2];
170 let repo = parts[tree_pos - 1];
171 let branch = parts[tree_pos + 1];
172 return Some(format!(
173 "https://github.com/{}/{}/archive/refs/heads/{}.tar.gz",
174 owner, repo, branch
175 ));
176 }
177 }
178 }
179
180 if url.contains("/commit/") {
181 return self.extract_github_commit_url(url);
182 }
183 }
184 None
185 }
186
187 fn parse_gitlab_url_with_branch(&self, url: &str) -> Option<String> {
188 if url.contains("gitlab.com") || url.contains("gitlab.") {
189 if url.contains("/-/tree/") {
190 let parts: Vec<&str> = url.split('/').collect();
191 if let Some(tree_pos) = parts.iter().position(|&x| x == "tree") {
192 if tree_pos + 1 < parts.len() && tree_pos >= 3 {
193 let gitlab_pos = parts.iter().position(|&x| x.contains("gitlab")).unwrap();
194 let host = parts[gitlab_pos];
195 let owner = parts[gitlab_pos + 1];
196 let repo = parts[gitlab_pos + 2];
197 let branch = parts[tree_pos + 1];
198 return Some(format!(
199 "https://{}/{}{}/-/archive/{}/{}-{}.tar.gz",
200 host,
201 owner,
202 if parts.len() > gitlab_pos + 3 && parts[gitlab_pos + 3] != "-" {
203 format!("/{}", parts[gitlab_pos + 3..tree_pos - 1].join("/"))
204 } else {
205 String::new()
206 },
207 branch,
208 repo,
209 branch
210 ));
211 }
212 }
213 }
214 }
215 None
216 }
217
218 fn parse_bitbucket_url_with_branch(&self, url: &str) -> Option<String> {
219 if url.contains("bitbucket.org") {
220 if url.contains("/commits/") {
221 let parts: Vec<&str> = url.split('/').collect();
222 if let Some(commits_pos) = parts.iter().position(|&x| x == "commits") {
223 if commits_pos + 1 < parts.len() && commits_pos >= 2 {
224 let owner = parts[commits_pos - 2];
225 let repo = parts[commits_pos - 1];
226 let commit = parts[commits_pos + 1];
227 return Some(format!(
228 "https://bitbucket.org/{}/{}/get/{}.tar.gz",
229 owner, repo, commit
230 ));
231 }
232 }
233 }
234
235 if url.contains("/branch/") {
236 let parts: Vec<&str> = url.split('/').collect();
237 if let Some(branch_pos) = parts.iter().position(|&x| x == "branch") {
238 if branch_pos + 1 < parts.len() && branch_pos >= 2 {
239 let owner = parts[branch_pos - 2];
240 let repo = parts[branch_pos - 1];
241 let branch = parts[branch_pos + 1];
242 return Some(format!(
243 "https://bitbucket.org/{}/{}/get/{}.tar.gz",
244 owner, repo, branch
245 ));
246 }
247 }
248 }
249 }
250 None
251 }
252
253 fn parse_codeberg_url_with_branch(&self, url: &str) -> Option<String> {
254 if url.contains("codeberg.org") {
255 if url.contains("/commit/") {
256 let parts: Vec<&str> = url.split('/').collect();
257 if let Some(commit_pos) = parts.iter().position(|&x| x == "commit") {
258 if commit_pos + 1 < parts.len() && commit_pos >= 2 {
259 let owner = parts[commit_pos - 2];
260 let repo = parts[commit_pos - 1];
261 let commit = parts[commit_pos + 1];
262 return Some(format!(
263 "https://codeberg.org/{}/{}/archive/{}.tar.gz",
264 owner, repo, commit
265 ));
266 }
267 }
268 }
269
270 if url.contains("/src/branch/") {
271 let parts: Vec<&str> = url.split('/').collect();
272 if let Some(branch_pos) = parts.iter().position(|&x| x == "branch") {
273 if branch_pos + 1 < parts.len() && branch_pos >= 3 {
274 let owner = parts[branch_pos - 3];
275 let repo = parts[branch_pos - 2];
276 let branch = parts[branch_pos + 1];
277 return Some(format!(
278 "https://codeberg.org/{}/{}/archive/{}.tar.gz",
279 owner, repo, branch
280 ));
281 }
282 }
283 }
284 }
285 None
286 }
287
288 fn parse_bitbucket_url(&self, url: &str, branch: &str) -> Option<String> {
289 if url.contains("bitbucket.org") {
290 let parts: Vec<&str> = url.split('/').collect();
291 if let Some(bitbucket_pos) = parts.iter().position(|&x| x == "bitbucket.org") {
292 if bitbucket_pos + 2 < parts.len() {
293 let owner = parts[bitbucket_pos + 1];
294 let repo = parts[bitbucket_pos + 2];
295 return Some(format!(
296 "https://bitbucket.org/{}/{}/get/{}.tar.gz",
297 owner, repo, branch
298 ));
299 }
300 }
301 }
302 None
303 }
304
305 fn parse_codeberg_url(&self, url: &str, branch: &str) -> Option<String> {
306 if url.contains("codeberg.org") {
307 let parts: Vec<&str> = url.split('/').collect();
308 if let Some(codeberg_pos) = parts.iter().position(|&x| x == "codeberg.org") {
309 if codeberg_pos + 2 < parts.len() {
310 let owner = parts[codeberg_pos + 1];
311 let repo = parts[codeberg_pos + 2];
312 return Some(format!(
313 "https://codeberg.org/{}/{}/archive/{}.tar.gz",
314 owner, repo, branch
315 ));
316 }
317 }
318 }
319 None
320 }
321
322 async fn check_url_exists(&self, url: &str) -> bool {
323 if let Ok(response) = self.client.head(url).send().await {
324 response.status().is_success()
325 } else {
326 false
327 }
328 }
329
330 fn parse_github_url(&self, url: &str, branch: &str) -> Option<String> {
331 let url = url.trim_end_matches('/');
332
333 if url.contains("github.com") {
334 if let Some(commit_url) = self.extract_github_commit_url(url) {
335 return Some(commit_url);
336 }
337
338 if let Some(repo_url) = self.extract_github_repo_url(url, branch) {
339 return Some(repo_url);
340 }
341 }
342
343 None
344 }
345
346 fn extract_github_commit_url(&self, url: &str) -> Option<String> {
347 if url.contains("/commit/") {
348 let parts: Vec<&str> = url.split('/').collect();
349 if let Some(commit_pos) = parts.iter().position(|&x| x == "commit") {
350 if commit_pos + 1 < parts.len() {
351 let owner = parts.get(parts.len() - 4)?;
352 let repo = parts.get(parts.len() - 3)?;
353 let commit = parts.get(commit_pos + 1)?;
354 return Some(format!(
355 "https://github.com/{}/{}/archive/{}.tar.gz",
356 owner, repo, commit
357 ));
358 }
359 }
360 }
361 None
362 }
363
364 fn extract_github_repo_url(&self, url: &str, branch: &str) -> Option<String> {
365 let parts: Vec<&str> = url.split('/').collect();
366 if parts.len() >= 2 && parts.contains(&"github.com") {
367 if let Some(github_pos) = parts.iter().position(|&x| x == "github.com") {
368 if github_pos + 2 < parts.len() {
369 let owner = parts[github_pos + 1];
370 let repo = parts[github_pos + 2];
371 return Some(format!(
372 "https://github.com/{}/{}/archive/refs/heads/{}.tar.gz",
373 owner, repo, branch
374 ));
375 }
376 }
377 }
378 None
379 }
380
381 fn parse_gitlab_url(&self, url: &str, branch: &str) -> Option<String> {
382 let url = url.trim_end_matches('/');
383
384 if url.contains("gitlab.com") || url.contains("gitlab.") {
385 let parts: Vec<&str> = url.split('/').collect();
386 if let Some(gitlab_pos) = parts.iter().position(|&x| x.contains("gitlab")) {
387 if gitlab_pos + 2 < parts.len() {
388 let host = parts[gitlab_pos];
389 let owner = parts[gitlab_pos + 1];
390 let repo = parts[gitlab_pos + 2];
391 return Some(format!(
392 "https://{}/{}{}/-/archive/{}/{}-{}.tar.gz",
393 host,
394 owner,
395 if parts.len() > gitlab_pos + 3 {
396 format!("/{}", parts[gitlab_pos + 3..].join("/"))
397 } else {
398 String::new()
399 },
400 branch,
401 repo,
402 branch
403 ));
404 }
405 }
406 }
407
408 None
409 }
410
411 async fn analyze_tarball_with_name(
412 &self,
413 download_url: &str,
414 original_url: &str,
415 ) -> Result<ProjectAnalysis> {
416 let project_name = self.extract_project_name_from_original(original_url);
417 let mut project_analysis = ProjectAnalysis::new(project_name);
418
419 let response = self
420 .client
421 .get(download_url)
422 .send()
423 .await
424 .map_err(|e| AnalysisError::network(format!("Failed to fetch URL: {}", e)))?;
425
426 if !response.status().is_success() {
427 return Err(AnalysisError::network(format!(
428 "HTTP request failed with status: {}",
429 response.status()
430 )));
431 }
432
433 let total_size = response.content_length();
434
435 #[cfg(feature = "cli")]
436 if let Some(pb) = &self.progress_bar {
437 if let Some(size) = total_size {
438 use indicatif::ProgressStyle;
439 pb.set_style(
440 ProgressStyle::default_bar()
441 .template("[{elapsed_precise}] [{wide_bar:.cyan/blue}] {decimal_bytes_per_sec} {binary_bytes}/{binary_total_bytes} ({eta}) {msg}")
442 .unwrap_or_else(|_| ProgressStyle::default_bar())
443 .progress_chars("#>-"),
444 );
445 pb.set_length(size);
446 pb.set_message("Downloading and processing...");
447 } else {
448 pb.set_message("Downloading and processing...");
449 pb.enable_steady_tick(std::time::Duration::from_millis(120));
450 }
451 }
452
453 let stream = response.bytes_stream();
454 let stream_reader = StreamReader::new(
455 stream,
456 #[cfg(feature = "cli")]
457 self.progress_bar.clone(),
458 total_size,
459 );
460
461 #[cfg(feature = "cli")]
462 if let Some(pb) = &self.progress_bar {
463 pb.set_message("Processing archive...");
464 }
465
466 self.process_tarball_stream(stream_reader, &mut project_analysis)
467 .await?;
468 Ok(project_analysis)
469 }
470
471 async fn process_tarball_stream(
472 &self,
473 stream_reader: StreamReader,
474 project_analysis: &mut ProjectAnalysis,
475 ) -> Result<()> {
476 let metrics_result = task::spawn_blocking(move || {
477 let decoder = GzDecoder::new(stream_reader);
478 let mut archive = Archive::new(decoder);
479
480 let entries = archive.entries().map_err(|e| {
481 AnalysisError::archive(format!("Failed to read tar entries: {}", e))
482 })?;
483
484 let mut collected_metrics = Vec::new();
485
486 for entry in entries {
487 let entry = entry.map_err(|e| {
488 AnalysisError::archive(format!("Failed to read tar entry: {}", e))
489 })?;
490
491 if let Ok(metrics) = Self::process_tar_entry_sync(entry) {
492 collected_metrics.push(metrics);
493 }
494 }
495
496 Ok::<Vec<FileMetrics>, AnalysisError>(collected_metrics)
497 })
498 .await
499 .map_err(|e| AnalysisError::archive(format!("Task join error: {}", e)))??;
500
501 for metrics in metrics_result {
502 project_analysis.add_file_metrics(metrics)?;
503 }
504
505 Ok(())
506 }
507
508 fn process_tar_entry_sync<R: Read>(mut entry: tar::Entry<'_, R>) -> Result<FileMetrics> {
509 let header = entry.header();
510 let path = header
511 .path()
512 .map_err(|e| AnalysisError::archive(format!("Invalid path in tar entry: {}", e)))?;
513
514 let file_path = path.to_string_lossy().to_string();
515
516 if !header.entry_type().is_file() || header.size().unwrap_or(0) == 0 {
517 return Err(AnalysisError::archive("Not a file or empty".to_string()));
518 }
519
520 let file_size = header.size().unwrap_or(0);
521 let language = LanguageRegistry::detect_by_path(&file_path)
522 .map(|l| l.name.clone())
523 .unwrap_or_else(|| "Text".to_string());
524
525 let mut content = String::new();
526 if entry.read_to_string(&mut content).is_err() {
527 return Err(AnalysisError::archive(
528 "Failed to read file content".to_string(),
529 ));
530 }
531
532 analyze_file_content(&file_path, &content, &language, file_size)
533 }
534
535 fn extract_project_name_from_original(&self, url: &str) -> String {
536 if url.starts_with("http://") || url.starts_with("https://") {
537 let url = url.trim_end_matches('/');
538
539 if url.contains("/tree/") {
540 let parts: Vec<&str> = url.split('/').collect();
541 if let Some(tree_pos) = parts.iter().position(|&x| x == "tree") {
542 if tree_pos > 1 {
543 let repo = parts[tree_pos - 1];
544 let branch = parts.get(tree_pos + 1).unwrap_or(&"unknown");
545 return format!("{}@{}", repo, branch);
546 }
547 }
548 }
549
550 if url.contains("/commit/") {
551 let parts: Vec<&str> = url.split('/').collect();
552 if let Some(commit_pos) = parts.iter().position(|&x| x == "commit") {
553 if commit_pos > 1 {
554 let repo = parts[commit_pos - 1];
555 let commit = parts.get(commit_pos + 1).unwrap_or(&"unknown");
556 return format!("{}@{}", repo, &commit[..7.min(commit.len())]);
557 }
558 }
559 }
560
561 let parts: Vec<&str> = url.split('/').collect();
562 if parts.len() >= 2 {
563 let repo = parts[parts.len() - 1];
564 return format!("{}@main", repo);
565 }
566 } else if url.contains('/') && !url.contains('.') {
567 let parts: Vec<&str> = url.split('@').collect();
568 let repo_part = parts[0];
569 let branch = parts.get(1).unwrap_or(&"main");
570
571 if let Some(repo_name) = repo_part.split('/').last() {
572 return format!("{}@{}", repo_name, branch);
573 }
574 }
575
576 "remote-project".to_string()
577 }
578
579 #[allow(dead_code)]
580 fn extract_project_name(&self, url: &str) -> String {
581 let url_path = url.trim_end_matches('/');
582
583 if let Some(filename) = url_path.split('/').last() {
584 if filename.ends_with(".tar.gz") {
585 return filename.trim_end_matches(".tar.gz").to_string();
586 }
587 if filename.ends_with(".tgz") {
588 return filename.trim_end_matches(".tgz").to_string();
589 }
590 return filename.to_string();
591 }
592
593 "remote-project".to_string()
594 }
595
596 fn format_bytes_simple(bytes: u64) -> String {
597 const UNITS: &[&str] = &["B", "KiB", "MiB", "GiB", "TiB"];
598 const THRESHOLD: f64 = 1024.0;
599
600 if bytes == 0 {
601 return "0 B".to_string();
602 }
603
604 let mut size = bytes as f64;
605 let mut unit_index = 0;
606
607 while size >= THRESHOLD && unit_index < UNITS.len() - 1 {
608 size /= THRESHOLD;
609 unit_index += 1;
610 }
611
612 if unit_index == 0 {
613 format!("{} {}", bytes, UNITS[unit_index])
614 } else {
615 format!("{:.1} {}", size, UNITS[unit_index])
616 }
617 }
618}
619
620impl Default for RemoteAnalyzer {
621 fn default() -> Self {
622 Self::new()
623 }
624}
625
626fn analyze_file_content(
627 file_path: &str,
628 content: &str,
629 language: &str,
630 file_size: u64,
631) -> Result<FileMetrics> {
632 let lines: Vec<&str> = content.lines().collect();
633 let total_lines = lines.len();
634
635 let mut code_lines = 0;
636 let mut comment_lines = 0;
637 let mut blank_lines = 0;
638
639 let lang_def = LanguageRegistry::get_language(language);
640 let empty_line_comments = vec![];
641 let empty_multi_line_comments = vec![];
642 let line_comments = lang_def
643 .map(|l| &l.line_comments)
644 .unwrap_or(&empty_line_comments);
645 let multi_line_comments = lang_def
646 .map(|l| &l.multi_line_comments)
647 .unwrap_or(&empty_multi_line_comments);
648
649 let mut in_multi_line_comment = false;
650
651 for line in lines {
652 let trimmed = line.trim();
653
654 if trimmed.is_empty() {
655 blank_lines += 1;
656 continue;
657 }
658
659 let mut is_comment = false;
660
661 if !in_multi_line_comment {
662 for comment_start in line_comments {
663 if trimmed.starts_with(comment_start) {
664 is_comment = true;
665 break;
666 }
667 }
668
669 for (start, end) in multi_line_comments {
670 if trimmed.starts_with(start) {
671 is_comment = true;
672 if !trimmed.ends_with(end) {
673 in_multi_line_comment = true;
674 }
675 break;
676 }
677 }
678 } else {
679 is_comment = true;
680 for (_, end) in multi_line_comments {
681 if trimmed.ends_with(end) {
682 in_multi_line_comment = false;
683 break;
684 }
685 }
686 }
687
688 if is_comment {
689 comment_lines += 1;
690 } else {
691 code_lines += 1;
692 }
693 }
694
695 let metrics = FileMetrics::new(
696 file_path,
697 language.to_string(),
698 total_lines,
699 code_lines,
700 comment_lines,
701 blank_lines,
702 )?
703 .with_size_bytes(file_size);
704
705 Ok(metrics)
706}
707
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_github_url_parsing() {
        let analyzer = RemoteAnalyzer::new();
        let cases = [
            (
                "https://github.com/user/repo",
                "https://github.com/user/repo/archive/refs/heads/main.tar.gz",
            ),
            (
                "https://github.com/user/repo/commit/abc123",
                "https://github.com/user/repo/archive/abc123.tar.gz",
            ),
        ];

        for &(input, expected) in &cases {
            assert_eq!(
                analyzer.parse_github_url(input, "main").as_deref(),
                Some(expected),
                "input: {}",
                input
            );
        }
    }

    #[test]
    fn test_bitbucket_url_parsing() {
        let resolved = RemoteAnalyzer::new().parse_bitbucket_url("https://bitbucket.org/user/repo", "main");
        assert_eq!(
            resolved.as_deref(),
            Some("https://bitbucket.org/user/repo/get/main.tar.gz")
        );
    }

    #[test]
    fn test_codeberg_url_parsing() {
        let resolved = RemoteAnalyzer::new().parse_codeberg_url("https://codeberg.org/user/repo", "main");
        assert_eq!(
            resolved.as_deref(),
            Some("https://codeberg.org/user/repo/archive/main.tar.gz")
        );
    }

    #[test]
    fn test_extract_project_name() {
        let analyzer = RemoteAnalyzer::new();
        let cases = [
            ("https://example.com/project.tar.gz", "project"),
            ("https://github.com/user/repo/archive/main.tar.gz", "main"),
        ];

        for &(url, expected) in &cases {
            assert_eq!(analyzer.extract_project_name(url), expected, "url: {}", url);
        }
    }
}
762
763use tokio::sync::mpsc;
764
765struct StreamReader {
766 receiver: mpsc::Receiver<std::io::Result<bytes::Bytes>>,
767 current_chunk: Option<Cursor<bytes::Bytes>>,
768 finished: bool,
769}
770
771impl StreamReader {
772 fn new(
773 stream: impl futures_util::Stream<Item = reqwest::Result<bytes::Bytes>> + Send + 'static,
774 #[cfg(feature = "cli")] progress_bar: Option<ProgressBar>,
775 total_size: Option<u64>,
776 ) -> Self {
777 let (tx, rx) = mpsc::channel(32);
778
779 tokio::spawn(async move {
780 let mut downloaded = 0u64;
781 let mut stream = Box::pin(stream);
782
783 while let Some(chunk_result) = stream.next().await {
784 match chunk_result {
785 Ok(chunk) => {
786 downloaded += chunk.len() as u64;
787
788 #[cfg(feature = "cli")]
789 if let Some(pb) = &progress_bar {
790 if let Some(_total) = total_size {
791 pb.set_position(downloaded);
792 } else {
793 let formatted = RemoteAnalyzer::format_bytes_simple(downloaded);
794 pb.set_message(format!("Downloaded {}...", formatted));
795 }
796 }
797
798 if tx.send(Ok(chunk)).await.is_err() {
799 break;
800 }
801 }
802 Err(e) => {
803 let _ = tx
804 .send(Err(std::io::Error::new(
805 std::io::ErrorKind::Other,
806 format!("Stream error: {}", e),
807 )))
808 .await;
809 break;
810 }
811 }
812 }
813 });
814
815 Self {
816 receiver: rx,
817 current_chunk: None,
818 finished: false,
819 }
820 }
821}
822
823impl Read for StreamReader {
824 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
825 if let Some(ref mut cursor) = self.current_chunk {
826 let read = cursor.read(buf)?;
827 if read > 0 {
828 return Ok(read);
829 }
830 self.current_chunk = None;
831 }
832
833 if self.finished {
834 return Ok(0);
835 }
836
837 match self.receiver.try_recv() {
838 Ok(Ok(chunk)) => {
839 self.current_chunk = Some(Cursor::new(chunk));
840 if let Some(ref mut cursor) = self.current_chunk {
841 cursor.read(buf)
842 } else {
843 Ok(0)
844 }
845 }
846 Ok(Err(e)) => {
847 self.finished = true;
848 Err(e)
849 }
850 Err(mpsc::error::TryRecvError::Empty) => match self.receiver.blocking_recv() {
851 Some(Ok(chunk)) => {
852 self.current_chunk = Some(Cursor::new(chunk));
853 if let Some(ref mut cursor) = self.current_chunk {
854 cursor.read(buf)
855 } else {
856 Ok(0)
857 }
858 }
859 Some(Err(e)) => {
860 self.finished = true;
861 Err(e)
862 }
863 None => {
864 self.finished = true;
865 Ok(0)
866 }
867 },
868 Err(mpsc::error::TryRecvError::Disconnected) => {
869 self.finished = true;
870 Ok(0)
871 }
872 }
873 }
874}