1use std::{
2 borrow::Cow,
3 os::fd::OwnedFd,
4 path::{Path, PathBuf},
5};
6
7use ahash::{AHashMap, HashSet};
8use aho_corasick::BuildError;
9use apt_auth_config::AuthConfig;
10use bon::Builder;
11use chrono::Utc;
12use nix::{
13 errno::Errno,
14 fcntl::{
15 FcntlArg::{F_GETLK, F_SETFD, F_SETLK},
16 FdFlag, OFlag, fcntl, open,
17 },
18 libc::{F_WRLCK, SEEK_SET, flock},
19 sys::stat::Mode,
20 unistd::close,
21};
22#[cfg(feature = "apt")]
23use oma_apt::config::Config;
24use oma_apt_sources_lists::SourcesListError;
25use oma_fetch::{
26 CompressFile, DownloadEntry, DownloadManager, DownloadSource, DownloadSourceType,
27 checksum::{Checksum, ChecksumError},
28 download::{BuilderError, SuccessSummary},
29 reqwest::{
30 Client, Response,
31 header::{CONTENT_LENGTH, HeaderValue},
32 },
33};
34
35use oma_fetch::{SingleDownloadError, Summary};
36#[cfg(feature = "aosc")]
37use oma_topics::TopicManager;
38
39#[cfg(feature = "aosc")]
40use oma_fetch::reqwest::StatusCode;
41
42use oma_utils::is_termux;
43use sysinfo::{Pid, System};
44use tokio::{
45 fs::{self},
46 task::spawn_blocking,
47};
48use tracing::{debug, warn};
49
50use crate::sourceslist::{MirrorSource, MirrorSources, scan_sources_list_from_paths};
51use crate::{
52 config::{ChecksumDownloadEntry, IndexTargetConfig},
53 inrelease::{
54 ChecksumItem, InReleaseChecksum, InReleaseError, Release, file_is_compress,
55 split_ext_and_filename, verify_inrelease,
56 },
57 sourceslist::{OmaSourceEntry, OmaSourceEntryFrom, scan_sources_lists_paths},
58 util::DatabaseFilenameReplacer,
59};
60
61#[derive(Debug, thiserror::Error)]
62pub enum RefreshError {
63 #[cfg(feature = "blocking")]
64 #[error("Failed to create tokio runtime")]
65 CreateTokioRuntime(std::io::Error),
66 #[error("Invalid URL: {0}")]
67 InvalidUrl(String),
68 #[error("Scan sources.list failed: {0}")]
69 ScanSourceError(SourcesListError),
70 #[error("Unsupported Protocol: {0}")]
71 UnsupportedProtocol(String),
72 #[error("Failed to download some metadata")]
73 DownloadFailed(Option<SingleDownloadError>),
74 #[cfg(feature = "aosc")]
75 #[error(transparent)]
76 TopicsError(#[from] oma_topics::OmaTopicsError),
77 #[error("Failed to download InRelease from URL {0}: Remote file not found (HTTP 404).")]
78 NoInReleaseFile(String),
79 #[error(transparent)]
80 JoinError(#[from] tokio::task::JoinError),
81 #[error(transparent)]
82 ChecksumError(#[from] ChecksumError),
83 #[error("Failed to operate dir or file {0}: {1}")]
84 FailedToOperateDirOrFile(String, tokio::io::Error),
85 #[error("Failed to parse InRelease file: {0}")]
86 InReleaseParseError(PathBuf, InReleaseError),
87 #[error("Failed to read download dir: {0}")]
88 ReadDownloadDir(String, std::io::Error),
89 #[error(transparent)]
90 AhoCorasickBuilder(#[from] BuildError),
91 #[error("stream_replace_all failed")]
92 ReplaceAll(std::io::Error),
93 #[error("Set lock failed")]
94 SetLock(Errno),
95 #[error("Set lock failed: process {0} ({1}) is using.")]
96 SetLockWithProcess(String, i32),
97 #[error("duplicate components")]
98 DuplicateComponents(Box<str>, String),
99 #[error("sources.list is empty")]
100 SourceListsEmpty,
101 #[error("Failed to operate file: {0}")]
102 OperateFile(PathBuf, std::io::Error),
103 #[error("thread count is not illegal: {0}")]
104 WrongThreadCount(usize),
105 #[error("Failed to build download manager")]
106 DownloadManagerBuilderError(BuilderError),
107 #[error("No metadata file to download")]
108 NoMetadataToDownload,
109}
110
/// Shorthand for results whose error type is [`RefreshError`].
type Result<T> = std::result::Result<T, RefreshError>;
112
/// Configuration for one metadata refresh run; construct it through the
/// generated builder and run it with `start` (or `start_blocking`).
#[derive(Builder)]
pub struct OmaRefresh<'a> {
    /// Root directory of the target system. It is set as apt's `Dir` option
    /// (outside Termux) and, without the `apt` feature, is the base that
    /// `etc/apt/sources.list` is resolved against.
    source: PathBuf,
    /// Number of download threads. `start` rejects 0 and values above 255.
    #[builder(default = 4)]
    threads: usize,
    /// Architecture used when scanning sources and selecting index targets;
    /// also the fallback when `/var/lib/dpkg/arch` is unreadable or empty.
    arch: String,
    /// Directory the metadata is downloaded into. It is created when missing
    /// and also holds the `lock` file.
    download_dir: PathBuf,
    /// HTTP client used for all downloads.
    client: &'a Client,
    /// Whether a missing (HTTP 404) release file should trigger a topic
    /// refresh instead of failing immediately.
    #[cfg(feature = "aosc")]
    refresh_topics: bool,
    /// apt configuration that options are read from and written to.
    #[cfg(feature = "apt")]
    apt_config: &'a Config,
    /// Index target configuration used in place of the apt configuration.
    #[cfg(not(feature = "apt"))]
    manifest_config: Vec<std::collections::HashMap<String, String>>,
    /// Message handed to the topic manager when it rewrites its sources list.
    #[cfg(feature = "aosc")]
    topic_msg: &'a str,
    /// Optional credentials used when building the mirror sources.
    auth_config: Option<&'a AuthConfig>,
    /// Explicit list of sources list files. When `None`, the files are
    /// discovered from the apt configuration or the default locations.
    sources_lists_paths: Option<Vec<PathBuf>>,
    /// Extra apt options in `key=value` form; an entry without `=` sets the
    /// key to an empty value.
    #[cfg(feature = "apt")]
    #[builder(default)]
    another_apt_options: Vec<String>,
}
135
136fn get_apt_update_lock(download_dir: &Path) -> Result<OwnedFd> {
138 let lock_path = download_dir.join("lock");
139
140 let fd = open(
141 &lock_path,
142 OFlag::O_RDWR | OFlag::O_CREAT | OFlag::O_NOFOLLOW,
143 Mode::from_bits_truncate(0o640),
144 )
145 .map_err(RefreshError::SetLock)?;
146
147 fcntl(&fd, F_SETFD(FdFlag::FD_CLOEXEC)).map_err(RefreshError::SetLock)?;
148
149 let mut fl = flock {
151 l_type: F_WRLCK as i16,
152 l_whence: SEEK_SET as i16,
153 l_start: 0,
154 l_len: 0,
155 l_pid: -1,
156 };
157
158 if let Err(e) = fcntl(&fd, F_SETLK(&fl)) {
159 debug!("{e}");
160
161 if e == Errno::EACCES || e == Errno::EAGAIN {
162 fl.l_type = F_WRLCK as i16;
163 fl.l_whence = SEEK_SET as i16;
164 fl.l_len = 0;
165 fl.l_start = 0;
166 fl.l_pid = -1;
167 fcntl(&fd, F_GETLK(&mut fl)).ok();
168 } else {
169 fl.l_pid = -1;
170 }
171
172 close(fd).map_err(RefreshError::SetLock)?;
173
174 if fl.l_pid != -1 {
175 let mut sys = System::new();
176 sys.refresh_processes(sysinfo::ProcessesToUpdate::All, true);
177 let Some(process) = sys.process(Pid::from(fl.l_pid as usize)) else {
178 return Err(RefreshError::SetLock(e));
179 };
180
181 return Err(RefreshError::SetLockWithProcess(
182 process.name().to_string_lossy().to_string(),
183 fl.l_pid,
184 ));
185 }
186
187 return Err(RefreshError::SetLock(e));
188 }
189
190 Ok(fd)
191}
192
/// Progress notifications passed to the callback given to `OmaRefresh::start`.
#[derive(Debug)]
pub enum Event {
    /// An event forwarded from the downloader.
    DownloadEvent(oma_fetch::Event),
    /// Emitted right before the topic manager is created and refreshed.
    ScanningTopic,
    /// The named topic suite was dropped from the mirror sources.
    ClosingTopic(String),
    /// Reported by the topic manager while it writes its sources list.
    TopicNotInMirror { topic: String, mirror: String },
    /// Emitted before the post-invoke step, when the download wrote data.
    RunInvokeScript,
    /// NOTE(review): not emitted in this file; presumably raised by the
    /// sources list scanner for a file it cannot handle — confirm.
    SourceListFileNotSupport { path: PathBuf },
    /// The refresh finished successfully.
    Done,
}
203
204impl<'a> OmaRefresh<'a> {
205 #[cfg(feature = "blocking")]
206 pub fn start_blocking(self, callback: impl AsyncFn(Event)) -> Result<Vec<SuccessSummary>> {
207 tokio::runtime::Builder::new_multi_thread()
208 .enable_all()
209 .build()
210 .map_err(RefreshError::CreateTokioRuntime)?
211 .block_on(self.start(callback))
212 }
213
214 pub async fn start(self, callback: impl AsyncFn(Event)) -> Result<Vec<SuccessSummary>> {
215 if self.threads == 0 || self.threads > 255 {
216 return Err(RefreshError::WrongThreadCount(self.threads));
217 }
218
219 #[cfg(feature = "apt")]
220 self.init_apt_options();
221
222 let paths = if let Some(ref paths) = self.sources_lists_paths {
223 paths.to_vec()
224 } else {
225 #[cfg(feature = "apt")]
226 let list_file = if is_termux() {
227 "/data/data/com.termux/files/usr/etc/apt/sources.list".to_string()
228 } else {
229 self.apt_config.file("Dir::Etc::sourcelist", "sources.list")
230 };
231
232 #[cfg(feature = "apt")]
233 let list_dir = if is_termux() {
234 "/data/data/com.termux/files/usr/etc/apt/sources.list.d".to_string()
235 } else {
236 self.apt_config
237 .dir("Dir::Etc::sourceparts", "sources.list.d")
238 };
239
240 #[cfg(feature = "apt")]
241 {
242 debug!("sources.list is: {list_file}");
243 debug!("sources.list.d is: {list_dir}");
244 }
245
246 #[cfg(not(feature = "apt"))]
247 let list_file = if is_termux() {
248 "/data/data/com.termux/files/usr/etc/apt/sources.list".to_string()
249 } else {
250 self.source
251 .join("etc/apt/sources.list")
252 .to_string_lossy()
253 .to_string()
254 };
255
256 #[cfg(not(feature = "apt"))]
257 let list_dir = if is_termux() {
258 "/data/data/com.termux/files/usr/etc/apt/sources.list.d".to_string()
259 } else {
260 self.source
261 .join("etc/apt/sources.list.d")
262 .to_string_lossy()
263 .to_string()
264 };
265
266 scan_sources_lists_paths(list_file, list_dir)
267 .await
268 .map_err(RefreshError::ScanSourceError)?
269 };
270
271 #[cfg(feature = "apt")]
272 let ignores = crate::sourceslist::ignores(self.apt_config);
273
274 #[cfg(not(feature = "apt"))]
275 let ignores = vec![];
276
277 let sourcelist = scan_sources_list_from_paths(&paths, &self.arch, &ignores, &callback)
278 .await
279 .map_err(RefreshError::ScanSourceError)?;
280
281 if !self.download_dir.is_dir() {
282 fs::create_dir_all(&self.download_dir).await.map_err(|e| {
283 RefreshError::FailedToOperateDirOrFile(self.download_dir.display().to_string(), e)
284 })?;
285 }
286
287 let download_dir: Box<Path> = Box::from(self.download_dir.as_path());
288
289 let _fd = spawn_blocking(move || get_apt_update_lock(&download_dir))
290 .await
291 .unwrap()?;
292
293 detect_duplicate_repositories(&sourcelist)?;
294
295 let mut download_list = vec![];
296
297 let replacer = DatabaseFilenameReplacer::new()?;
298 let mirror_sources = self
299 .download_releases(&sourcelist, &replacer, &callback)
300 .await?;
301
302 download_list.extend(mirror_sources.0.iter().flat_map(|x| x.file_name()));
303
304 let (tasks, total, optional_index_files) = self
305 .collect_all_release_entry(&replacer, &mirror_sources)
306 .await?;
307
308 debug!("oma will download source metadata: {tasks:#?}");
309
310 if tasks.is_empty() {
311 return Err(RefreshError::NoMetadataToDownload);
312 }
313
314 for i in &tasks {
315 download_list.push(i.filename.as_str());
316 }
317
318 let (_, res) = tokio::join!(
319 remove_unused_db(&self.download_dir, download_list),
320 self.download_release_data(&callback, &tasks, total, optional_index_files)
321 );
322
323 let res = res?;
325 let should_run_invoke = res.has_wrote();
326
327 if should_run_invoke {
328 callback(Event::RunInvokeScript).await;
329 #[cfg(feature = "apt")]
330 self.run_success_post_invoke().await;
331 }
332
333 callback(Event::Done).await;
334
335 Ok(res.success)
336 }
337
338 #[cfg(feature = "apt")]
339 fn init_apt_options(&self) {
340 if !is_termux() {
341 self.apt_config.set("Dir", &self.source.to_string_lossy());
342 }
343
344 for i in &self.another_apt_options {
345 let (k, v) = i.split_once('=').unwrap_or((i.as_str(), ""));
346 debug!("Setting apt opt: {k}={v}");
347 self.apt_config.set(k, v);
348 }
349
350 if self
352 .apt_config
353 .find_vector("Acquire::CompressionTypes::Order")
354 .is_empty()
355 {
356 self.apt_config.set_vector(
357 "Acquire::CompressionTypes::Order",
358 &vec!["zst", "xz", "bz2", "lzma", "gz", "lz4"],
359 );
360 }
361 }
362
363 async fn download_release_data(
364 &self,
365 callback: &impl AsyncFn(Event),
366 tasks: &[DownloadEntry],
367 total: u64,
368 optional_index_files: HashSet<String>,
369 ) -> Result<Summary> {
370 let dm = DownloadManager::builder()
371 .client(self.client)
372 .download_list(tasks)
373 .threads(self.threads)
374 .total_size(total)
375 .build();
376
377 let res = dm
378 .start_download(|event| async {
379 let mut optional = false;
380 if let oma_fetch::Event::Failed { file_name, .. } = &event
381 && optional_index_files.contains(file_name)
382 {
383 optional = true;
384 }
385
386 if !optional {
387 callback(Event::DownloadEvent(event)).await;
388 }
389 })
390 .await
391 .map_err(RefreshError::DownloadManagerBuilderError)?;
392
393 let mut raise_err = false;
394
395 for fail in &res.failed {
396 if optional_index_files.contains(fail) {
397 debug!("Failed to download optional metadata file {fail}, ignoring.");
398 } else {
399 raise_err = true;
400 }
401 }
402
403 if raise_err {
404 return Err(RefreshError::DownloadFailed(None));
405 }
406
407 Ok(res)
408 }
409
410 #[cfg(feature = "apt")]
411 async fn run_success_post_invoke(&self) {
412 use tokio::process::Command;
413 use tracing::warn;
414
415 let cmds = self
416 .apt_config
417 .find_vector("APT::Update::Post-Invoke-Success");
418
419 for cmd in &cmds {
420 debug!("Running post-invoke script: {cmd}");
421 let output = Command::new("sh").arg("-c").arg(cmd).output().await;
422
423 match output {
424 Ok(output) => {
425 if !output.status.success() {
426 warn!(
427 "Command {cmd} returned non-zero exit code: {}",
428 output.status.code().unwrap_or(1)
429 );
430 continue;
431 }
432 debug!("Command {cmd} completed successfully.");
433 }
434 Err(e) => {
435 warn!("Command {cmd} exited with error: {e}");
436 }
437 }
438 }
439 }
440
441 async fn download_releases(
442 &self,
443 sourcelist: &'a [OmaSourceEntry<'a>],
444 replacer: &DatabaseFilenameReplacer,
445 callback: &impl AsyncFn(Event),
446 ) -> Result<MirrorSources<'a>> {
447 #[cfg(feature = "aosc")]
448 let mut not_found = vec![];
449
450 #[cfg(not(feature = "aosc"))]
451 let not_found = vec![];
452
453 let mut mirror_sources =
454 MirrorSources::from_sourcelist(sourcelist, replacer, self.auth_config)?;
455
456 let results = mirror_sources
457 .fetch_all_release(
458 self.client,
459 replacer,
460 &self.download_dir,
461 self.threads,
462 callback,
463 )
464 .await;
465
466 debug!("download_releases returned: {:?}", results);
467
468 #[cfg(feature = "aosc")]
469 for result in results {
470 if let Err(e) = result {
471 match e {
472 RefreshError::DownloadFailed(Some(SingleDownloadError::ReqwestError {
473 source,
474 })) if source
475 .status()
476 .map(|x| x == StatusCode::NOT_FOUND)
477 .unwrap_or(false)
478 && self.refresh_topics =>
479 {
480 let url = source.url().map(|x| x.to_owned());
481 not_found.push(url.unwrap());
482 }
483 _ => return Err(e),
484 }
485 }
486 }
487
488 #[cfg(not(feature = "aosc"))]
489 results.into_iter().collect::<Result<Vec<_>>>()?;
490
491 self.refresh_topics(callback, not_found, &mut mirror_sources)
492 .await?;
493
494 Ok(mirror_sources)
495 }
496
497 #[cfg(feature = "aosc")]
498 async fn refresh_topics(
499 &self,
500 callback: &impl AsyncFn(Event),
501 not_found: Vec<url::Url>,
502 sources: &mut MirrorSources<'a>,
503 ) -> Result<()> {
504 if !self.refresh_topics || not_found.is_empty() {
505 return Ok(());
506 }
507
508 callback(Event::ScanningTopic).await;
509 let mut tm = TopicManager::new(self.client, &self.source, &self.arch, false).await?;
510 tm.refresh().await?;
511 let removed_suites = tm.remove_closed_topics()?;
512
513 debug!("Removed suites: {:?}", removed_suites);
514
515 for url in not_found {
516 let suite = url
517 .path_segments()
518 .and_then(|mut x| x.nth_back(1).map(|x| x.to_string()))
519 .ok_or_else(|| RefreshError::InvalidUrl(url.to_string()))?;
520
521 if !removed_suites.contains(&suite)
522 && !tm.enabled_topics().iter().any(|x| x.name == suite)
523 {
524 return Err(RefreshError::NoInReleaseFile(url.to_string()));
525 }
526
527 let pos = sources.0.iter().position(|x| x.suite() == suite).unwrap();
528 sources.0.remove(pos);
529
530 callback(Event::ClosingTopic(suite)).await;
531 }
532
533 tm.write_enabled(false).await?;
534 tm.write_sources_list(self.topic_msg, false, async move |topic, mirror| {
535 callback(Event::TopicNotInMirror { topic, mirror }).await
536 })
537 .await?;
538
539 callback(Event::DownloadEvent(oma_fetch::Event::ProgressDone(1))).await;
540
541 Ok(())
542 }
543
    /// No-op counterpart used when the `aosc` feature is disabled: every
    /// argument is ignored and `Ok(())` is returned.
    #[cfg(not(feature = "aosc"))]
    async fn refresh_topics(
        &self,
        _callback: &impl AsyncFn(Event),
        _not_found: Vec<url::Url>,
        _sources: &mut MirrorSources<'a>,
    ) -> Result<()> {
        Ok(())
    }
553
554 async fn collect_all_release_entry(
555 &self,
556 replacer: &DatabaseFilenameReplacer,
557 mirror_sources: &MirrorSources<'a>,
558 ) -> Result<(Vec<DownloadEntry>, u64, HashSet<String>)> {
559 let mut total = 0;
560 let mut tasks = vec![];
561
562 #[cfg(feature = "apt")]
563 let index_target_config =
564 IndexTargetConfig::new_from_apt_config(self.apt_config, &self.arch);
565
566 #[cfg(not(feature = "apt"))]
567 let index_target_config =
568 IndexTargetConfig::new(self.manifest_config.clone(), vec![], &self.arch);
569
570 let archs_from_file = fs::read_to_string("/var/lib/dpkg/arch").await;
571
572 let archs_from_file = if let Ok(file) = archs_from_file {
573 let res = file.lines().map(|x| x.to_string()).collect::<Vec<_>>();
574
575 if res.is_empty() { None } else { Some(res) }
576 } else {
577 None
578 };
579
580 let mut flat_repo_no_release = vec![];
581
582 let mut optional_index_files = HashSet::with_hasher(ahash::RandomState::new());
583
584 for m in &mirror_sources.0 {
585 let file_name = match m.file_name() {
586 Some(name) => name,
587 None => {
588 flat_repo_no_release.push(m);
589 continue;
590 }
591 };
592
593 let inrelease_path = self.download_dir.join(file_name);
594
595 let mut handle = HashSet::with_hasher(ahash::RandomState::new());
596
597 let inrelease = fs::read_to_string(&inrelease_path).await.map_err(|e| {
598 RefreshError::FailedToOperateDirOrFile(inrelease_path.display().to_string(), e)
599 })?;
600
601 let inrelease = verify_inrelease(
602 &inrelease,
603 m.signed_by(),
604 &self.source,
605 &inrelease_path,
606 m.trusted(),
607 )
608 .map_err(|e| RefreshError::InReleaseParseError(inrelease_path.to_path_buf(), e))?;
609
610 let release: Release = inrelease
611 .parse()
612 .map_err(|e| RefreshError::InReleaseParseError(inrelease_path.to_path_buf(), e))?;
613
614 if !m.is_flat() {
615 let now = Utc::now();
616
617 release.check_date(&now).map_err(|e| {
618 RefreshError::InReleaseParseError(inrelease_path.to_path_buf(), e)
619 })?;
620
621 release.check_valid_until(&now).map_err(|e| {
622 RefreshError::InReleaseParseError(inrelease_path.to_path_buf(), e)
623 })?;
624 }
625
626 let checksums = &release
627 .get_or_try_init_checksum_type_and_list()
628 .map_err(|e| RefreshError::InReleaseParseError(inrelease_path.to_path_buf(), e))?
629 .1;
630
631 let arch_from_local_configure = if let Some(ref f) = archs_from_file {
632 f.iter().map(|x| x.as_str()).collect::<Vec<_>>()
633 } else {
634 vec![self.arch.as_str()]
635 };
636
637 debug!("Got source entries: {:#?}", m.sources);
638
639 for ose in &m.sources {
640 let archs = if let Some(archs) = ose.archs()
641 && !archs.is_empty()
642 {
643 let archs = archs.iter().map(|x| x.as_str()).collect::<Vec<_>>();
644 if arch_from_local_configure.iter().all(|x| !archs.contains(x)) {
645 warn!(
646 "Mirror {} does not contain architectures enabled in local configuration ({} enabled, {} available from the mirror)",
647 ose.url(),
648 arch_from_local_configure
649 .iter()
650 .map(|x| format!("'{x}'"))
651 .collect::<Vec<_>>()
652 .join(" "),
653 archs
654 .iter()
655 .map(|x| format!("'{x}'"))
656 .collect::<Vec<_>>()
657 .join(" ")
658 );
659 }
660
661 archs
662 } else {
663 arch_from_local_configure.clone()
664 };
665
666 debug!("archs: {:?}", archs);
667
668 let download_list = index_target_config.get_download_list(
669 checksums,
670 ose.is_source(),
671 ose.is_flat(),
672 archs,
673 ose.components(),
674 )?;
675
676 get_all_need_db_from_config(download_list, &mut total, checksums, &mut handle);
677 }
678
679 for i in &flat_repo_no_release {
680 collect_flat_repo_no_release(i, &self.download_dir, &mut tasks, replacer)?;
681 }
682
683 for c in &handle {
684 collect_download_task(
685 c,
686 m,
687 &self.download_dir,
688 &mut tasks,
689 &release,
690 replacer,
691 &mut optional_index_files,
692 )?;
693 }
694 }
695
696 Ok((tasks, total, optional_index_files))
697 }
698}
699
700pub fn content_length(resp: &Response) -> u64 {
701 let content_length = resp
702 .headers()
703 .get(CONTENT_LENGTH)
704 .map(Cow::Borrowed)
705 .unwrap_or(Cow::Owned(HeaderValue::from(0)));
706
707 content_length
708 .to_str()
709 .ok()
710 .and_then(|x| x.parse::<u64>().ok())
711 .unwrap_or_default()
712}
713
714fn detect_duplicate_repositories(sourcelist: &[OmaSourceEntry<'_>]) -> Result<()> {
715 let mut map = AHashMap::new();
716
717 for i in sourcelist {
718 if !map.contains_key(&(i.url(), i.suite())) {
719 map.insert((i.url(), i.suite()), vec![i]);
720 } else {
721 map.get_mut(&(i.url(), i.suite())).unwrap().push(i);
722 }
723 }
724
725 for ose_list in map.values() {
732 let mut no_dups_components = HashSet::with_hasher(ahash::RandomState::new());
733
734 for ose in ose_list {
735 for c in ose.components() {
736 if !no_dups_components.contains(&(c, ose.is_source())) {
737 no_dups_components.insert((c, ose.is_source()));
738 } else {
739 return Err(RefreshError::DuplicateComponents(
740 ose.url().into(),
741 c.to_string(),
742 ));
743 }
744 }
745 }
746 }
747
748 Ok(())
749}
750
751fn get_all_need_db_from_config(
752 filter_checksums: Vec<ChecksumDownloadEntry>,
753 total: &mut u64,
754 checksums: &[ChecksumItem],
755 handle: &mut HashSet<ChecksumDownloadEntry>,
756) {
757 for i in filter_checksums {
758 if handle.contains(&i) {
759 continue;
760 }
761
762 if i.keep_compress {
763 *total += i.item.size;
764 } else {
765 let size = if file_is_compress(&i.item.name) {
766 let (_, name_without_compress) = split_ext_and_filename(&i.item.name);
767
768 checksums
769 .iter()
770 .find_map(|x| {
771 if x.name == name_without_compress {
772 Some(x.size)
773 } else {
774 None
775 }
776 })
777 .unwrap_or(i.item.size)
778 } else {
779 i.item.size
780 };
781
782 *total += size;
783 }
784
785 handle.insert(i);
786 }
787}
788
789async fn remove_unused_db(download_dir: &Path, download_list: Vec<&str>) -> Result<()> {
790 let mut download_dir = fs::read_dir(&download_dir)
791 .await
792 .map_err(|e| RefreshError::ReadDownloadDir(download_dir.display().to_string(), e))?;
793
794 while let Ok(Some(x)) = download_dir.next_entry().await {
795 if x.path().is_file()
796 && !download_list.contains(&&*x.file_name().to_string_lossy())
797 && x.file_name() != "lock"
798 {
799 debug!("Removing {:?}", x.file_name());
800 if let Err(e) = fs::remove_file(x.path()).await {
801 debug!("Failed to remove file {:?}: {e}", x.file_name());
802 }
803 }
804 }
805
806 Ok(())
807}
808
809fn collect_flat_repo_no_release(
810 mirror_source: &MirrorSource,
811 download_dir: &Path,
812 tasks: &mut Vec<DownloadEntry>,
813 replacer: &DatabaseFilenameReplacer,
814) -> Result<()> {
815 let msg = mirror_source.get_human_download_message(Some("Packages"))?;
816
817 let dist_url = mirror_source.dist_path();
818
819 let from = match mirror_source.from()? {
820 OmaSourceEntryFrom::Http => DownloadSourceType::Http {
821 auth: mirror_source
822 .auth()
823 .as_ref()
824 .map(|auth| (auth.login.clone(), auth.password.clone())),
825 },
826 OmaSourceEntryFrom::Local => DownloadSourceType::Local(mirror_source.is_flat()),
827 };
828
829 let download_url = format!("{dist_url}/Packages");
830 let file_path = format!("{dist_url}Packages");
831
832 let sources = vec![DownloadSource {
833 url: download_url.clone(),
834 source_type: from,
835 }];
836
837 let task = DownloadEntry::builder()
838 .source(sources)
839 .filename(replacer.replace(&file_path)?)
840 .dir(download_dir.to_path_buf())
841 .allow_resume(false)
842 .msg(msg.into())
843 .file_type(CompressFile::Nothing)
844 .build();
845
846 tasks.push(task);
847
848 Ok(())
849}
850
851fn collect_download_task(
852 c: &ChecksumDownloadEntry,
853 mirror_source: &MirrorSource<'_>,
854 download_dir: &Path,
855 tasks: &mut Vec<DownloadEntry>,
856 release: &Release,
857 replacer: &DatabaseFilenameReplacer,
858 optional_set: &mut HashSet<String>,
859) -> Result<()> {
860 let file_type = &c.msg;
861
862 let msg = mirror_source.get_human_download_message(Some(file_type))?;
863
864 let from = match mirror_source.from()? {
865 OmaSourceEntryFrom::Http => DownloadSourceType::Http {
866 auth: mirror_source
867 .auth()
868 .as_ref()
869 .map(|auth| (auth.login.clone(), auth.password.clone())),
870 },
871 OmaSourceEntryFrom::Local => DownloadSourceType::Local(
872 mirror_source.is_flat()
873 && (!file_is_compress(&c.item.name)
874 || (file_is_compress(&c.item.name) && c.keep_compress)),
875 ),
876 };
877
878 let not_compress_filename_before = if file_is_compress(&c.item.name) {
879 Cow::Owned(split_ext_and_filename(&c.item.name).1)
880 } else {
881 Cow::Borrowed(&c.item.name)
882 };
883
884 let checksum = if c.keep_compress {
885 Some(&c.item.checksum)
886 } else {
887 release
888 .checksum_type_and_list()
889 .1
890 .iter()
891 .find(|x| x.name == *not_compress_filename_before)
892 .as_ref()
893 .map(|c| &c.checksum)
894 };
895
896 let download_url = if release.acquire_by_hash() {
897 let path = Path::new(&c.item.name);
898 let parent = path.parent().unwrap_or(path);
899 let dir = match release.checksum_type_and_list().0 {
900 InReleaseChecksum::Sha256 => "SHA256",
901 InReleaseChecksum::Sha512 => "SHA512",
902 InReleaseChecksum::Md5 => "MD5Sum",
903 };
904
905 let path = parent.join("by-hash").join(dir).join(&c.item.checksum);
906
907 mirror_source.get_download_url(&path.display().to_string())
908 } else {
909 mirror_source.get_download_url(&c.item.name)
910 };
911
912 let sources = vec![DownloadSource {
913 url: download_url.to_string(),
914 source_type: from,
915 }];
916
917 let file_name = if c.keep_compress {
918 mirror_source.get_download_file_name(Some(&c.item.name), replacer)?
919 } else {
920 mirror_source.get_download_file_name(Some(¬_compress_filename_before), replacer)?
921 };
922
923 if c.optional {
924 optional_set.insert(file_name.clone());
925 }
926
927 let task = DownloadEntry::builder()
928 .source(sources)
929 .filename(file_name)
930 .dir(download_dir.to_path_buf())
931 .allow_resume(false)
932 .msg(msg.into())
933 .file_type({
934 if c.keep_compress {
935 CompressFile::Nothing
936 } else {
937 match Path::new(&c.item.name).extension().and_then(|x| x.to_str()) {
938 Some("gz") => CompressFile::Gzip,
939 Some("xz") => CompressFile::Xz,
940 Some("bz2") => CompressFile::Bz2,
941 Some("zst") => CompressFile::Zstd,
942 Some("lzma") => CompressFile::Lzma,
943 Some("lz4") => CompressFile::Lz4,
944 _ => CompressFile::Nothing,
945 }
946 }
947 })
948 .maybe_hash(if let Some(checksum) = checksum {
949 match release.checksum_type_and_list().0 {
950 InReleaseChecksum::Sha256 => Some(Checksum::from_sha256_str(checksum)?),
951 InReleaseChecksum::Sha512 => Some(Checksum::from_sha512_str(checksum)?),
952 InReleaseChecksum::Md5 => Some(Checksum::from_md5_str(checksum)?),
953 }
954 } else {
955 None
956 })
957 .build();
958
959 tasks.push(task);
960
961 Ok(())
962}