offline_intelligence/engine_management/
downloader.rs1use anyhow::Result;
7use tracing::error;
8use futures_util::StreamExt;
9use reqwest::Client;
10use std::path::PathBuf;
11use std::sync::Arc;
12use tokio::fs;
13use tokio::io::AsyncWriteExt;
14use tracing::{debug, info, warn};
15
16use super::registry::{EngineInfo, EngineStatus};
17use super::download_progress::{EngineDownloadProgressTracker, EngineDownloadProgress, EngineDownloadStatus};
18
19#[derive(Debug, Clone)]
21pub enum EngineSource {
22 OfficialGithub,
23 HuggingFace,
24 Custom(String),
25}
26
27pub struct EngineDownloader {
29 client: Client,
30 download_dir: PathBuf,
31 progress_tracker: Arc<EngineDownloadProgressTracker>,
32}
33
34impl EngineDownloader {
35 pub fn new() -> Self {
36 Self {
37 client: Client::new(),
38 download_dir: std::env::temp_dir().join("aud_io_engines"),
39 progress_tracker: Arc::new(EngineDownloadProgressTracker::new()),
40 }
41 }
42
43 pub async fn download_engine(&self, engine_info: &EngineInfo) -> Result<EngineInfo> {
45 info!("Starting download of engine: {}", engine_info.name);
46
47 let engine_id = engine_info.id.clone();
48
49 self.progress_tracker.start_download(
51 engine_id.clone(),
52 engine_info.name.clone(),
53 engine_info.file_size,
54 ).await;
55
56 fs::create_dir_all(&self.download_dir).await?;
58
59 let archive_path = self.download_archive_with_progress(engine_info).await?;
61
62 self.progress_tracker.update_status(&engine_id, EngineDownloadStatus::Extracting).await;
64
65 let install_path = self.extract_and_install(engine_info, &archive_path).await?;
67
68 self.progress_tracker.update_status(&engine_id, EngineDownloadStatus::Verifying).await;
70
71 self.verify_installation(engine_info, &install_path).await?;
73
74 let _ = fs::remove_file(&archive_path).await;
76
77 self.progress_tracker.update_progress(
79 &engine_id,
80 engine_info.file_size,
81 EngineDownloadStatus::Completed,
82 None
83 ).await;
84
85 let mut installed_engine = engine_info.clone();
87 installed_engine.status = EngineStatus::Installed;
88 installed_engine.install_path = Some(install_path);
89
90 info!("Successfully installed engine: {}", engine_info.name);
91 Ok(installed_engine)
92 }
93
94 async fn download_archive_with_progress(&self, engine_info: &EngineInfo) -> Result<PathBuf> {
96 let filename = self.get_archive_filename(engine_info);
97 let archive_path = self.download_dir.join(&filename);
98 let engine_id = engine_info.id.clone();
99
100 info!("Downloading {} from {} to {:?}", engine_info.name, engine_info.download_url, archive_path);
101
102 self.progress_tracker.update_status(&engine_id, EngineDownloadStatus::Downloading).await;
104
105 let response = self.make_download_request(engine_info).await?;
106 response.error_for_status_ref()?;
107
108 let total_size = response.content_length().unwrap_or(engine_info.file_size);
109 let mut downloaded: u64 = 0;
110 let mut file = fs::File::create(&archive_path).await?;
111
112 let start_time = std::time::Instant::now();
114
115 let mut stream = response.bytes_stream();
117 while let Some(chunk_result) = stream.next().await {
118 let chunk = chunk_result?;
119 file.write_all(&chunk).await?;
120 downloaded += chunk.len() as u64;
121
122 if downloaded % (100 * 1024) < chunk.len() as u64 || chunk.len() == 0 {
124 let elapsed_secs = start_time.elapsed().as_secs_f64();
125 let speed_bps = if elapsed_secs > 0.0 {
126 downloaded as f64 / elapsed_secs
127 } else {
128 0.0
129 };
130
131 if let Some(mut progress) = self.progress_tracker.get_progress(&engine_id).await {
133 progress.bytes_downloaded = downloaded;
134 progress.total_bytes = total_size;
135 progress.speed_bps = speed_bps;
136 if total_size > 0 {
137 progress.progress_percentage = (downloaded as f32 / total_size as f32) * 100.0;
138 }
139 self.progress_tracker.update_progress(
141 &engine_id,
142 downloaded,
143 EngineDownloadStatus::Downloading,
144 None
145 ).await;
146 }
147 }
148 }
149
150 file.flush().await?;
151 info!("Download completed: {} bytes", downloaded);
152
153 Ok(archive_path)
154 }
155
156 async fn extract_and_install(&self, engine_info: &EngineInfo, archive_path: &PathBuf) -> Result<PathBuf> {
158 let engine_storage_path = self.get_engine_storage_path()?;
159 let install_path = engine_storage_path.join(&engine_info.id);
160
161 fs::create_dir_all(&install_path).await?;
163
164 info!("Extracting to {:?}", install_path);
165
166 if archive_path.extension().map_or(false, |ext| ext == "zip") {
168 self.extract_zip(archive_path, &install_path).await?;
169 } else if archive_path.extension().map_or(false, |ext| ext == "tar" || ext == "gz") {
170 self.extract_tar_gz(archive_path, &install_path).await?;
171 } else {
172 return Err(anyhow::anyhow!("Unsupported archive format"));
173 }
174
175 #[cfg(unix)]
181 {
182 use std::os::unix::fs::PermissionsExt;
183 if let Ok(entries) = std::fs::read_dir(&install_path) {
184 for entry in entries.flatten() {
185 let entry_path = entry.path();
186 if entry_path.is_file() {
187 if let Ok(meta) = std::fs::metadata(&entry_path) {
188 let mut perms = meta.permissions();
189 perms.set_mode(0o755);
190 let _ = std::fs::set_permissions(&entry_path, perms);
191 }
192 }
193 }
194 }
195 }
196
197 #[cfg(target_os = "macos")]
206 self.remove_quarantine_attribute(&install_path).await;
207
208 self.save_engine_metadata(engine_info, &install_path).await?;
210
211 Ok(install_path)
212 }
213
214 async fn extract_zip(&self, archive_path: &PathBuf, destination: &PathBuf) -> Result<()> {
216 let archive_path_owned = archive_path.clone();
218 let destination_owned = destination.clone();
219
220 let result = tokio::task::spawn_blocking(move || {
221 std::fs::File::open(&archive_path_owned)
222 .map_err(|e| anyhow::anyhow!("Failed to open archive: {}", e))
223 .and_then(|file| {
224 zip::ZipArchive::new(file)
225 .map_err(|e| anyhow::anyhow!("Failed to read ZIP: {}", e))
226 })
227 .and_then(|mut archive| {
228 for i in 0..archive.len() {
229 let mut file = archive.by_index(i)
230 .map_err(|e| anyhow::anyhow!("Failed to read file from archive: {}", e))?;
231 let outpath = match file.enclosed_name() {
232 Some(path) => destination_owned.join(path),
233 None => continue,
234 };
235
236 if file.name().ends_with('/') {
237 std::fs::create_dir_all(&outpath)
238 .map_err(|e| anyhow::anyhow!("Failed to create directory: {}", e))?;
239 } else {
240 if let Some(p) = outpath.parent() {
241 if !p.exists() {
242 std::fs::create_dir_all(p)
243 .map_err(|e| anyhow::anyhow!("Failed to create parent directory: {}", e))?;
244 }
245 }
246 let mut outfile = std::fs::File::create(&outpath)
247 .map_err(|e| anyhow::anyhow!("Failed to create file: {}", e))?;
248 std::io::copy(&mut file, &mut outfile)
249 .map_err(|e| anyhow::anyhow!("Failed to copy file: {}", e))?;
250 }
251 }
252 Ok(())
253 })
254 }).await;
255
256 match result {
257 Ok(Ok(())) => Ok(()),
258 Ok(Err(e)) => Err(e),
259 Err(e) => Err(anyhow::anyhow!("Blocking task failed: {}", e))
260 }
261 }
262
263 async fn extract_tar_gz(&self, archive_path: &PathBuf, destination: &PathBuf) -> Result<()> {
273 let archive_path_owned = archive_path.clone();
274 let destination_owned = destination.clone();
275
276 let result = tokio::task::spawn_blocking(move || {
277 std::fs::File::open(&archive_path_owned)
278 .map_err(|e| anyhow::anyhow!("Failed to open archive: {}", e))
279 .and_then(|file| {
280 let gz_decoder = flate2::read::GzDecoder::new(file);
281 let mut archive = tar::Archive::new(gz_decoder);
282
283 for entry in archive.entries()? {
284 let mut entry = entry?;
285 let raw_path = entry.path()?.into_owned();
286
287 let mut components = raw_path.components();
291 let first = components.next();
292
293 let stripped = match first {
294 Some(std::path::Component::CurDir) => {
296 let rest: std::path::PathBuf = components.collect();
297 if rest.as_os_str().is_empty() {
298 continue; }
300 let mut inner = rest.components();
302 inner.next(); let final_path: std::path::PathBuf = inner.collect();
304 if final_path.as_os_str().is_empty() {
305 continue;
306 }
307 final_path
308 }
309 Some(_) => {
311 let rest: std::path::PathBuf = components.collect();
312 if rest.as_os_str().is_empty() {
313 continue; }
315 rest
316 }
317 None => continue,
318 };
319
320 if stripped.components().any(|c| c == std::path::Component::ParentDir) {
322 continue;
323 }
324
325 let dest_path = destination_owned.join(&stripped);
326
327 if let Some(parent) = dest_path.parent() {
328 std::fs::create_dir_all(parent)?;
329 }
330
331 if entry.header().entry_type().is_dir() {
332 std::fs::create_dir_all(&dest_path)?;
333 } else {
334 entry.unpack(&dest_path)?;
335 }
336 }
337
338 Ok(())
339 })
340 }).await;
341
342 match result {
343 Ok(Ok(())) => Ok(()),
344 Ok(Err(e)) => Err(e),
345 Err(e) => Err(anyhow::anyhow!("Blocking task failed: {}", e)),
346 }
347 }
348
349 async fn verify_installation(&self, engine_info: &EngineInfo, install_path: &PathBuf) -> Result<()> {
351 let binary_path = install_path.join(&engine_info.binary_name);
352
353 if !binary_path.exists() {
354 return Err(anyhow::anyhow!("Engine binary not found at {:?}", binary_path));
355 }
356
357 match tokio::process::Command::new(&binary_path)
363 .arg("--help")
364 .output()
365 .await
366 {
367 Ok(_) => {
368 info!("Engine binary verified: {:?}", binary_path);
369 Ok(())
370 }
371 Err(e) => {
372 let hint = if cfg!(target_os = "macos") {
373 format!(
374 "Failed to execute engine binary: {}.\n\
375 On macOS this is usually Gatekeeper quarantine — try:\n\
376 xattr -r -d com.apple.quarantine {:?}",
377 e, binary_path
378 )
379 } else {
380 format!("Failed to execute engine binary: {}", e)
381 };
382 Err(anyhow::anyhow!("{}", hint))
383 }
384 }
385 }
386
387 #[cfg(target_os = "macos")]
396 async fn remove_quarantine_attribute(&self, dir: &PathBuf) {
397 let dir_owned = dir.clone();
398 match tokio::process::Command::new("xattr")
399 .args(["-r", "-d", "com.apple.quarantine"])
400 .arg(&dir_owned)
401 .output()
402 .await
403 {
404 Ok(output) => {
405 if output.status.success() {
406 info!("Removed quarantine attribute from {:?}", dir_owned);
407 } else {
408 let stderr = String::from_utf8_lossy(&output.stderr);
410 if !stderr.contains("No such xattr") && !stderr.trim().is_empty() {
411 tracing::warn!("xattr removal warning for {:?}: {}", dir_owned, stderr.trim());
412 }
413 }
414 }
415 Err(e) => {
416 tracing::warn!("Could not run xattr to remove quarantine from {:?}: {}", dir_owned, e);
417 }
418 }
419 }
420
421 async fn save_engine_metadata(&self, engine_info: &EngineInfo, install_path: &PathBuf) -> Result<()> {
423 let metadata_path = install_path.join("metadata.json");
424 let metadata_json = serde_json::to_string_pretty(engine_info)?;
425 fs::write(&metadata_path, metadata_json).await?;
426 Ok(())
427 }
428
429 fn get_archive_filename(&self, engine_info: &EngineInfo) -> String {
431 let url_path = std::path::Path::new(&engine_info.download_url);
432 url_path
433 .file_name()
434 .and_then(|name| name.to_str())
435 .unwrap_or("engine_archive.zip")
436 .to_string()
437 }
438
439 async fn make_download_request(&self, engine_info: &EngineInfo) -> Result<reqwest::Response> {
441 let url = &engine_info.download_url;
442 info!("Attempting to download from: {}", url);
443
444 let response = self.client.get(url).send().await?;
445
446 match response.error_for_status() {
447 Ok(response) => {
448 info!("Successfully connected to download URL: {}", url);
449 Ok(response)
450 }
451 Err(e) => {
452 error!("Download failed with status error: {} - {}", url, e);
453 Err(anyhow::anyhow!("Download failed: {}", e))
454 }
455 }
456 }
457
458 fn get_engine_storage_path(&self) -> Result<PathBuf> {
460 let base_dir = if cfg!(target_os = "windows") {
461 dirs::data_dir()
462 .ok_or_else(|| anyhow::anyhow!("Failed to get APPDATA directory"))?
463 .join("Aud.io")
464 .join("engines")
465 } else if cfg!(target_os = "macos") {
466 dirs::data_dir()
467 .ok_or_else(|| anyhow::anyhow!("Failed to get Library directory"))?
468 .join("Aud.io")
469 .join("engines")
470 } else {
471 dirs::data_dir()
472 .ok_or_else(|| anyhow::anyhow!("Failed to get .local/share directory"))?
473 .join("aud.io")
474 .join("engines")
475 };
476
477 Ok(base_dir)
478 }
479
480 pub async fn is_engine_downloaded(&self, engine_id: &str) -> Result<bool> {
482 let storage_path = self.get_engine_storage_path()?;
483 let engine_path = storage_path.join(engine_id);
484 Ok(engine_path.exists())
485 }
486
487 pub async fn get_download_progress(&self, engine_id: &str) -> Result<Option<f64>> {
489 if let Some(progress) = self.progress_tracker.get_progress(engine_id).await {
490 return Ok(Some(progress.progress_percentage as f64));
491 }
492 Ok(None)
493 }
494
495 pub async fn get_all_download_progress(&self) -> Vec<EngineDownloadProgress> {
497 self.progress_tracker.get_all_downloads().await
498 }
499
500 pub fn progress_tracker(&self) -> &Arc<EngineDownloadProgressTracker> {
502 &self.progress_tracker
503 }
504
505 pub async fn cancel_download(&self, engine_id: &str) -> Result<()> {
507 if self.progress_tracker.cancel_download(engine_id).await {
508 info!("Cancelled download for engine: {}", engine_id);
509 } else {
510 info!("No active download found to cancel for engine: {}", engine_id);
511 }
512 Ok(())
513 }
514}
515