oma_refresh/
db.rs

1use std::{
2    borrow::Cow,
3    os::fd::OwnedFd,
4    path::{Path, PathBuf},
5};
6
7use ahash::{AHashMap, HashSet};
8use aho_corasick::BuildError;
9use apt_auth_config::AuthConfig;
10use bon::Builder;
11use chrono::Utc;
12use nix::{
13    errno::Errno,
14    fcntl::{
15        FcntlArg::{F_GETLK, F_SETFD, F_SETLK},
16        FdFlag, OFlag, fcntl, open,
17    },
18    libc::{F_WRLCK, SEEK_SET, flock},
19    sys::stat::Mode,
20    unistd::close,
21};
22#[cfg(feature = "apt")]
23use oma_apt::config::Config;
24use oma_apt_sources_lists::SourcesListError;
25use oma_fetch::{
26    CompressFile, DownloadEntry, DownloadManager, DownloadSource, DownloadSourceType,
27    checksum::{Checksum, ChecksumError},
28    download::{BuilderError, SuccessSummary},
29    reqwest::{
30        Client, Response,
31        header::{CONTENT_LENGTH, HeaderValue},
32    },
33};
34
35use oma_fetch::{SingleDownloadError, Summary};
36#[cfg(feature = "aosc")]
37use oma_topics::TopicManager;
38
39#[cfg(feature = "aosc")]
40use oma_fetch::reqwest::StatusCode;
41
42use oma_utils::is_termux;
43use sysinfo::{Pid, System};
44use tokio::{
45    fs::{self},
46    task::spawn_blocking,
47};
48use tracing::{debug, warn};
49
50use crate::sourceslist::{MirrorSource, MirrorSources, scan_sources_list_from_paths};
51use crate::{
52    config::{ChecksumDownloadEntry, IndexTargetConfig},
53    inrelease::{
54        ChecksumItem, InReleaseChecksum, InReleaseError, Release, file_is_compress,
55        split_ext_and_filename, verify_inrelease,
56    },
57    sourceslist::{OmaSourceEntry, OmaSourceEntryFrom, scan_sources_lists_paths},
58    util::DatabaseFilenameReplacer,
59};
60
61#[derive(Debug, thiserror::Error)]
62pub enum RefreshError {
63    #[cfg(feature = "blocking")]
64    #[error("Failed to create tokio runtime")]
65    CreateTokioRuntime(std::io::Error),
66    #[error("Invalid URL: {0}")]
67    InvalidUrl(String),
68    #[error("Scan sources.list failed: {0}")]
69    ScanSourceError(SourcesListError),
70    #[error("Unsupported Protocol: {0}")]
71    UnsupportedProtocol(String),
72    #[error("Failed to download some metadata")]
73    DownloadFailed(Option<SingleDownloadError>),
74    #[cfg(feature = "aosc")]
75    #[error(transparent)]
76    TopicsError(#[from] oma_topics::OmaTopicsError),
77    #[error("Failed to download InRelease from URL {0}: Remote file not found (HTTP 404).")]
78    NoInReleaseFile(String),
79    #[error(transparent)]
80    JoinError(#[from] tokio::task::JoinError),
81    #[error(transparent)]
82    ChecksumError(#[from] ChecksumError),
83    #[error("Failed to operate dir or file {0}: {1}")]
84    FailedToOperateDirOrFile(String, tokio::io::Error),
85    #[error("Failed to parse InRelease file: {0}")]
86    InReleaseParseError(PathBuf, InReleaseError),
87    #[error("Failed to read download dir: {0}")]
88    ReadDownloadDir(String, std::io::Error),
89    #[error(transparent)]
90    AhoCorasickBuilder(#[from] BuildError),
91    #[error("stream_replace_all failed")]
92    ReplaceAll(std::io::Error),
93    #[error("Set lock failed")]
94    SetLock(Errno),
95    #[error("Set lock failed: process {0} ({1}) is using.")]
96    SetLockWithProcess(String, i32),
97    #[error("duplicate components")]
98    DuplicateComponents(Box<str>, String),
99    #[error("sources.list is empty")]
100    SourceListsEmpty,
101    #[error("Failed to operate file: {0}")]
102    OperateFile(PathBuf, std::io::Error),
103    #[error("thread count is not illegal: {0}")]
104    WrongThreadCount(usize),
105    #[error("Failed to build download manager")]
106    DownloadManagerBuilderError(BuilderError),
107    #[error("No metadata file to download")]
108    NoMetadataToDownload,
109}
110
/// Result alias used throughout this module; the error type is always [`RefreshError`].
type Result<T> = std::result::Result<T, RefreshError>;
112
/// Configuration for one metadata refresh run; construct it through the
/// generated builder and run it with `start` (or `start_blocking`).
#[derive(Builder)]
pub struct OmaRefresh<'a> {
    // Root directory: used as apt's "Dir" option and, without the apt
    // feature, as the prefix for `etc/apt/sources.list{,.d}`.
    source: PathBuf,
    // Thread count handed to the downloader; `start` rejects 0 and values above 255.
    #[builder(default = 4)]
    threads: usize,
    // Architecture used when scanning sources and selecting index targets.
    arch: String,
    // Directory holding the lock file and the downloaded metadata files.
    download_dir: PathBuf,
    // HTTP client shared by every download.
    client: &'a Client,
    // When true, release files answering HTTP 404 are handed to the topic
    // refresh instead of failing immediately.
    #[cfg(feature = "aosc")]
    refresh_topics: bool,
    #[cfg(feature = "apt")]
    apt_config: &'a Config,
    // Passed to `IndexTargetConfig::new` when the apt feature is disabled.
    #[cfg(not(feature = "apt"))]
    manifest_config: Vec<std::collections::HashMap<String, String>>,
    // Message forwarded to `TopicManager::write_sources_list`.
    #[cfg(feature = "aosc")]
    topic_msg: &'a str,
    // Forwarded to `MirrorSources::from_sourcelist`.
    auth_config: Option<&'a AuthConfig>,
    // Explicit source list files; when set, the default locations are not scanned.
    sources_lists_paths: Option<Vec<PathBuf>>,
    // Extra apt options in `key=value` form; an entry without `=` sets an empty value.
    #[cfg(feature = "apt")]
    #[builder(default)]
    another_apt_options: Vec<String>,
}
135
136/// Create `apt update` file lock
137fn get_apt_update_lock(download_dir: &Path) -> Result<OwnedFd> {
138    let lock_path = download_dir.join("lock");
139
140    let fd = open(
141        &lock_path,
142        OFlag::O_RDWR | OFlag::O_CREAT | OFlag::O_NOFOLLOW,
143        Mode::from_bits_truncate(0o640),
144    )
145    .map_err(RefreshError::SetLock)?;
146
147    fcntl(&fd, F_SETFD(FdFlag::FD_CLOEXEC)).map_err(RefreshError::SetLock)?;
148
149    // From apt libapt-pkg/fileutil.cc:287
150    let mut fl = flock {
151        l_type: F_WRLCK as i16,
152        l_whence: SEEK_SET as i16,
153        l_start: 0,
154        l_len: 0,
155        l_pid: -1,
156    };
157
158    if let Err(e) = fcntl(&fd, F_SETLK(&fl)) {
159        debug!("{e}");
160
161        if e == Errno::EACCES || e == Errno::EAGAIN {
162            fl.l_type = F_WRLCK as i16;
163            fl.l_whence = SEEK_SET as i16;
164            fl.l_len = 0;
165            fl.l_start = 0;
166            fl.l_pid = -1;
167            fcntl(&fd, F_GETLK(&mut fl)).ok();
168        } else {
169            fl.l_pid = -1;
170        }
171
172        close(fd).map_err(RefreshError::SetLock)?;
173
174        if fl.l_pid != -1 {
175            let mut sys = System::new();
176            sys.refresh_processes(sysinfo::ProcessesToUpdate::All, true);
177            let Some(process) = sys.process(Pid::from(fl.l_pid as usize)) else {
178                return Err(RefreshError::SetLock(e));
179            };
180
181            return Err(RefreshError::SetLockWithProcess(
182                process.name().to_string_lossy().to_string(),
183                fl.l_pid,
184            ));
185        }
186
187        return Err(RefreshError::SetLock(e));
188    }
189
190    Ok(fd)
191}
192
/// Progress events delivered to the callback passed to `OmaRefresh::start`.
#[derive(Debug)]
pub enum Event {
    /// An event forwarded from the downloader.
    DownloadEvent(oma_fetch::Event),
    /// Emitted just before the topic manager is created and refreshed.
    ScanningTopic,
    /// Emitted for each suite handled by the topic refresh after a 404.
    ClosingTopic(String),
    /// Emitted from the callback handed to `TopicManager::write_sources_list`.
    TopicNotInMirror { topic: String, mirror: String },
    /// Emitted before the post-invoke commands run, only when metadata was written.
    RunInvokeScript,
    // NOTE(review): not emitted in this file; presumably raised while scanning
    // the source list files, confirm against `scan_sources_list_from_paths`.
    SourceListFileNotSupport { path: PathBuf },
    /// Emitted once at the end of a successful refresh.
    Done,
}
203
204impl<'a> OmaRefresh<'a> {
205    #[cfg(feature = "blocking")]
206    pub fn start_blocking(self, callback: impl AsyncFn(Event)) -> Result<Vec<SuccessSummary>> {
207        tokio::runtime::Builder::new_multi_thread()
208            .enable_all()
209            .build()
210            .map_err(RefreshError::CreateTokioRuntime)?
211            .block_on(self.start(callback))
212    }
213
214    pub async fn start(self, callback: impl AsyncFn(Event)) -> Result<Vec<SuccessSummary>> {
215        if self.threads == 0 || self.threads > 255 {
216            return Err(RefreshError::WrongThreadCount(self.threads));
217        }
218
219        #[cfg(feature = "apt")]
220        self.init_apt_options();
221
222        let paths = if let Some(ref paths) = self.sources_lists_paths {
223            paths.to_vec()
224        } else {
225            #[cfg(feature = "apt")]
226            let list_file = if is_termux() {
227                "/data/data/com.termux/files/usr/etc/apt/sources.list".to_string()
228            } else {
229                self.apt_config.file("Dir::Etc::sourcelist", "sources.list")
230            };
231
232            #[cfg(feature = "apt")]
233            let list_dir = if is_termux() {
234                "/data/data/com.termux/files/usr/etc/apt/sources.list.d".to_string()
235            } else {
236                self.apt_config
237                    .dir("Dir::Etc::sourceparts", "sources.list.d")
238            };
239
240            #[cfg(feature = "apt")]
241            {
242                debug!("sources.list is: {list_file}");
243                debug!("sources.list.d is: {list_dir}");
244            }
245
246            #[cfg(not(feature = "apt"))]
247            let list_file = if is_termux() {
248                "/data/data/com.termux/files/usr/etc/apt/sources.list".to_string()
249            } else {
250                self.source
251                    .join("etc/apt/sources.list")
252                    .to_string_lossy()
253                    .to_string()
254            };
255
256            #[cfg(not(feature = "apt"))]
257            let list_dir = if is_termux() {
258                "/data/data/com.termux/files/usr/etc/apt/sources.list.d".to_string()
259            } else {
260                self.source
261                    .join("etc/apt/sources.list.d")
262                    .to_string_lossy()
263                    .to_string()
264            };
265
266            scan_sources_lists_paths(list_file, list_dir)
267                .await
268                .map_err(RefreshError::ScanSourceError)?
269        };
270
271        #[cfg(feature = "apt")]
272        let ignores = crate::sourceslist::ignores(self.apt_config);
273
274        #[cfg(not(feature = "apt"))]
275        let ignores = vec![];
276
277        let sourcelist = scan_sources_list_from_paths(&paths, &self.arch, &ignores, &callback)
278            .await
279            .map_err(RefreshError::ScanSourceError)?;
280
281        if !self.download_dir.is_dir() {
282            fs::create_dir_all(&self.download_dir).await.map_err(|e| {
283                RefreshError::FailedToOperateDirOrFile(self.download_dir.display().to_string(), e)
284            })?;
285        }
286
287        let download_dir: Box<Path> = Box::from(self.download_dir.as_path());
288
289        let _fd = spawn_blocking(move || get_apt_update_lock(&download_dir))
290            .await
291            .unwrap()?;
292
293        detect_duplicate_repositories(&sourcelist)?;
294
295        let mut download_list = vec![];
296
297        let replacer = DatabaseFilenameReplacer::new()?;
298        let mirror_sources = self
299            .download_releases(&sourcelist, &replacer, &callback)
300            .await?;
301
302        download_list.extend(mirror_sources.0.iter().flat_map(|x| x.file_name()));
303
304        let (tasks, total, optional_index_files) = self
305            .collect_all_release_entry(&replacer, &mirror_sources)
306            .await?;
307
308        debug!("oma will download source metadata: {tasks:#?}");
309
310        if tasks.is_empty() {
311            return Err(RefreshError::NoMetadataToDownload);
312        }
313
314        for i in &tasks {
315            download_list.push(i.filename.as_str());
316        }
317
318        let (_, res) = tokio::join!(
319            remove_unused_db(&self.download_dir, download_list),
320            self.download_release_data(&callback, &tasks, total, optional_index_files)
321        );
322
323        // 有元数据更新才执行 success invoke
324        let res = res?;
325        let should_run_invoke = res.has_wrote();
326
327        if should_run_invoke {
328            callback(Event::RunInvokeScript).await;
329            #[cfg(feature = "apt")]
330            self.run_success_post_invoke().await;
331        }
332
333        callback(Event::Done).await;
334
335        Ok(res.success)
336    }
337
338    #[cfg(feature = "apt")]
339    fn init_apt_options(&self) {
340        if !is_termux() {
341            self.apt_config.set("Dir", &self.source.to_string_lossy());
342        }
343
344        for i in &self.another_apt_options {
345            let (k, v) = i.split_once('=').unwrap_or((i.as_str(), ""));
346            debug!("Setting apt opt: {k}={v}");
347            self.apt_config.set(k, v);
348        }
349
350        // default compression order
351        if self
352            .apt_config
353            .find_vector("Acquire::CompressionTypes::Order")
354            .is_empty()
355        {
356            self.apt_config.set_vector(
357                "Acquire::CompressionTypes::Order",
358                &vec!["zst", "xz", "bz2", "lzma", "gz", "lz4"],
359            );
360        }
361    }
362
363    async fn download_release_data(
364        &self,
365        callback: &impl AsyncFn(Event),
366        tasks: &[DownloadEntry],
367        total: u64,
368        optional_index_files: HashSet<String>,
369    ) -> Result<Summary> {
370        let dm = DownloadManager::builder()
371            .client(self.client)
372            .download_list(tasks)
373            .threads(self.threads)
374            .total_size(total)
375            .build();
376
377        let res = dm
378            .start_download(|event| async {
379                let mut optional = false;
380                if let oma_fetch::Event::Failed { file_name, .. } = &event
381                    && optional_index_files.contains(file_name)
382                {
383                    optional = true;
384                }
385
386                if !optional {
387                    callback(Event::DownloadEvent(event)).await;
388                }
389            })
390            .await
391            .map_err(RefreshError::DownloadManagerBuilderError)?;
392
393        let mut raise_err = false;
394
395        for fail in &res.failed {
396            if optional_index_files.contains(fail) {
397                debug!("Failed to download optional metadata file {fail}, ignoring.");
398            } else {
399                raise_err = true;
400            }
401        }
402
403        if raise_err {
404            return Err(RefreshError::DownloadFailed(None));
405        }
406
407        Ok(res)
408    }
409
410    #[cfg(feature = "apt")]
411    async fn run_success_post_invoke(&self) {
412        use tokio::process::Command;
413        use tracing::warn;
414
415        let cmds = self
416            .apt_config
417            .find_vector("APT::Update::Post-Invoke-Success");
418
419        for cmd in &cmds {
420            debug!("Running post-invoke script: {cmd}");
421            let output = Command::new("sh").arg("-c").arg(cmd).output().await;
422
423            match output {
424                Ok(output) => {
425                    if !output.status.success() {
426                        warn!(
427                            "Command {cmd} returned non-zero exit code: {}",
428                            output.status.code().unwrap_or(1)
429                        );
430                        continue;
431                    }
432                    debug!("Command {cmd} completed successfully.");
433                }
434                Err(e) => {
435                    warn!("Command {cmd} exited with error: {e}");
436                }
437            }
438        }
439    }
440
441    async fn download_releases(
442        &self,
443        sourcelist: &'a [OmaSourceEntry<'a>],
444        replacer: &DatabaseFilenameReplacer,
445        callback: &impl AsyncFn(Event),
446    ) -> Result<MirrorSources<'a>> {
447        #[cfg(feature = "aosc")]
448        let mut not_found = vec![];
449
450        #[cfg(not(feature = "aosc"))]
451        let not_found = vec![];
452
453        let mut mirror_sources =
454            MirrorSources::from_sourcelist(sourcelist, replacer, self.auth_config)?;
455
456        let results = mirror_sources
457            .fetch_all_release(
458                self.client,
459                replacer,
460                &self.download_dir,
461                self.threads,
462                callback,
463            )
464            .await;
465
466        debug!("download_releases returned: {:?}", results);
467
468        #[cfg(feature = "aosc")]
469        for result in results {
470            if let Err(e) = result {
471                match e {
472                    RefreshError::DownloadFailed(Some(SingleDownloadError::ReqwestError {
473                        source,
474                    })) if source
475                        .status()
476                        .map(|x| x == StatusCode::NOT_FOUND)
477                        .unwrap_or(false)
478                        && self.refresh_topics =>
479                    {
480                        let url = source.url().map(|x| x.to_owned());
481                        not_found.push(url.unwrap());
482                    }
483                    _ => return Err(e),
484                }
485            }
486        }
487
488        #[cfg(not(feature = "aosc"))]
489        results.into_iter().collect::<Result<Vec<_>>>()?;
490
491        self.refresh_topics(callback, not_found, &mut mirror_sources)
492            .await?;
493
494        Ok(mirror_sources)
495    }
496
497    #[cfg(feature = "aosc")]
498    async fn refresh_topics(
499        &self,
500        callback: &impl AsyncFn(Event),
501        not_found: Vec<url::Url>,
502        sources: &mut MirrorSources<'a>,
503    ) -> Result<()> {
504        if !self.refresh_topics || not_found.is_empty() {
505            return Ok(());
506        }
507
508        callback(Event::ScanningTopic).await;
509        let mut tm = TopicManager::new(self.client, &self.source, &self.arch, false).await?;
510        tm.refresh().await?;
511        let removed_suites = tm.remove_closed_topics()?;
512
513        debug!("Removed suites: {:?}", removed_suites);
514
515        for url in not_found {
516            let suite = url
517                .path_segments()
518                .and_then(|mut x| x.nth_back(1).map(|x| x.to_string()))
519                .ok_or_else(|| RefreshError::InvalidUrl(url.to_string()))?;
520
521            if !removed_suites.contains(&suite)
522                && !tm.enabled_topics().iter().any(|x| x.name == suite)
523            {
524                return Err(RefreshError::NoInReleaseFile(url.to_string()));
525            }
526
527            let pos = sources.0.iter().position(|x| x.suite() == suite).unwrap();
528            sources.0.remove(pos);
529
530            callback(Event::ClosingTopic(suite)).await;
531        }
532
533        tm.write_enabled(false).await?;
534        tm.write_sources_list(self.topic_msg, false, async move |topic, mirror| {
535            callback(Event::TopicNotInMirror { topic, mirror }).await
536        })
537        .await?;
538
539        callback(Event::DownloadEvent(oma_fetch::Event::ProgressDone(1))).await;
540
541        Ok(())
542    }
543
    /// No-op counterpart of the topic refresh used when the `aosc` feature is
    /// disabled: every argument is ignored and `Ok(())` is returned.
    #[cfg(not(feature = "aosc"))]
    async fn refresh_topics(
        &self,
        _callback: &impl AsyncFn(Event),
        _not_found: Vec<url::Url>,
        _sources: &mut MirrorSources<'a>,
    ) -> Result<()> {
        Ok(())
    }
553
554    async fn collect_all_release_entry(
555        &self,
556        replacer: &DatabaseFilenameReplacer,
557        mirror_sources: &MirrorSources<'a>,
558    ) -> Result<(Vec<DownloadEntry>, u64, HashSet<String>)> {
559        let mut total = 0;
560        let mut tasks = vec![];
561
562        #[cfg(feature = "apt")]
563        let index_target_config =
564            IndexTargetConfig::new_from_apt_config(self.apt_config, &self.arch);
565
566        #[cfg(not(feature = "apt"))]
567        let index_target_config =
568            IndexTargetConfig::new(self.manifest_config.clone(), vec![], &self.arch);
569
570        let archs_from_file = fs::read_to_string("/var/lib/dpkg/arch").await;
571
572        let archs_from_file = if let Ok(file) = archs_from_file {
573            let res = file.lines().map(|x| x.to_string()).collect::<Vec<_>>();
574
575            if res.is_empty() { None } else { Some(res) }
576        } else {
577            None
578        };
579
580        let mut flat_repo_no_release = vec![];
581
582        let mut optional_index_files = HashSet::with_hasher(ahash::RandomState::new());
583
584        for m in &mirror_sources.0 {
585            let file_name = match m.file_name() {
586                Some(name) => name,
587                None => {
588                    flat_repo_no_release.push(m);
589                    continue;
590                }
591            };
592
593            let inrelease_path = self.download_dir.join(file_name);
594
595            let mut handle = HashSet::with_hasher(ahash::RandomState::new());
596
597            let inrelease = fs::read_to_string(&inrelease_path).await.map_err(|e| {
598                RefreshError::FailedToOperateDirOrFile(inrelease_path.display().to_string(), e)
599            })?;
600
601            let inrelease = verify_inrelease(
602                &inrelease,
603                m.signed_by(),
604                &self.source,
605                &inrelease_path,
606                m.trusted(),
607            )
608            .map_err(|e| RefreshError::InReleaseParseError(inrelease_path.to_path_buf(), e))?;
609
610            let release: Release = inrelease
611                .parse()
612                .map_err(|e| RefreshError::InReleaseParseError(inrelease_path.to_path_buf(), e))?;
613
614            if !m.is_flat() {
615                let now = Utc::now();
616
617                release.check_date(&now).map_err(|e| {
618                    RefreshError::InReleaseParseError(inrelease_path.to_path_buf(), e)
619                })?;
620
621                release.check_valid_until(&now).map_err(|e| {
622                    RefreshError::InReleaseParseError(inrelease_path.to_path_buf(), e)
623                })?;
624            }
625
626            let checksums = &release
627                .get_or_try_init_checksum_type_and_list()
628                .map_err(|e| RefreshError::InReleaseParseError(inrelease_path.to_path_buf(), e))?
629                .1;
630
631            let arch_from_local_configure = if let Some(ref f) = archs_from_file {
632                f.iter().map(|x| x.as_str()).collect::<Vec<_>>()
633            } else {
634                vec![self.arch.as_str()]
635            };
636
637            debug!("Got source entries: {:#?}", m.sources);
638
639            for ose in &m.sources {
640                let archs = if let Some(archs) = ose.archs()
641                    && !archs.is_empty()
642                {
643                    let archs = archs.iter().map(|x| x.as_str()).collect::<Vec<_>>();
644                    if arch_from_local_configure.iter().all(|x| !archs.contains(x)) {
645                        warn!(
646                            "Mirror {} does not contain architectures enabled in local configuration ({} enabled, {} available from the mirror)",
647                            ose.url(),
648                            arch_from_local_configure
649                                .iter()
650                                .map(|x| format!("'{x}'"))
651                                .collect::<Vec<_>>()
652                                .join(" "),
653                            archs
654                                .iter()
655                                .map(|x| format!("'{x}'"))
656                                .collect::<Vec<_>>()
657                                .join(" ")
658                        );
659                    }
660
661                    archs
662                } else {
663                    arch_from_local_configure.clone()
664                };
665
666                debug!("archs: {:?}", archs);
667
668                let download_list = index_target_config.get_download_list(
669                    checksums,
670                    ose.is_source(),
671                    ose.is_flat(),
672                    archs,
673                    ose.components(),
674                )?;
675
676                get_all_need_db_from_config(download_list, &mut total, checksums, &mut handle);
677            }
678
679            for i in &flat_repo_no_release {
680                collect_flat_repo_no_release(i, &self.download_dir, &mut tasks, replacer)?;
681            }
682
683            for c in &handle {
684                collect_download_task(
685                    c,
686                    m,
687                    &self.download_dir,
688                    &mut tasks,
689                    &release,
690                    replacer,
691                    &mut optional_index_files,
692                )?;
693            }
694        }
695
696        Ok((tasks, total, optional_index_files))
697    }
698}
699
700pub fn content_length(resp: &Response) -> u64 {
701    let content_length = resp
702        .headers()
703        .get(CONTENT_LENGTH)
704        .map(Cow::Borrowed)
705        .unwrap_or(Cow::Owned(HeaderValue::from(0)));
706
707    content_length
708        .to_str()
709        .ok()
710        .and_then(|x| x.parse::<u64>().ok())
711        .unwrap_or_default()
712}
713
714fn detect_duplicate_repositories(sourcelist: &[OmaSourceEntry<'_>]) -> Result<()> {
715    let mut map = AHashMap::new();
716
717    for i in sourcelist {
718        if !map.contains_key(&(i.url(), i.suite())) {
719            map.insert((i.url(), i.suite()), vec![i]);
720        } else {
721            map.get_mut(&(i.url(), i.suite())).unwrap().push(i);
722        }
723    }
724
725    // 查看源配置中是否有重复的源
726    // 重复的源的定义:源地址相同 源类型相同 源 component 有重复项
727    // 比如:
728    // deb https://mirrors.bfsu.edu.cn/anthon/debs stable main
729    // deb https://mirrors.bfsu.edu.cn/anthon/debs stable main contrib
730    // 重复的项为:deb https://mirrors.bfsu.edu.cn/anthon/debs stable main
731    for ose_list in map.values() {
732        let mut no_dups_components = HashSet::with_hasher(ahash::RandomState::new());
733
734        for ose in ose_list {
735            for c in ose.components() {
736                if !no_dups_components.contains(&(c, ose.is_source())) {
737                    no_dups_components.insert((c, ose.is_source()));
738                } else {
739                    return Err(RefreshError::DuplicateComponents(
740                        ose.url().into(),
741                        c.to_string(),
742                    ));
743                }
744            }
745        }
746    }
747
748    Ok(())
749}
750
751fn get_all_need_db_from_config(
752    filter_checksums: Vec<ChecksumDownloadEntry>,
753    total: &mut u64,
754    checksums: &[ChecksumItem],
755    handle: &mut HashSet<ChecksumDownloadEntry>,
756) {
757    for i in filter_checksums {
758        if handle.contains(&i) {
759            continue;
760        }
761
762        if i.keep_compress {
763            *total += i.item.size;
764        } else {
765            let size = if file_is_compress(&i.item.name) {
766                let (_, name_without_compress) = split_ext_and_filename(&i.item.name);
767
768                checksums
769                    .iter()
770                    .find_map(|x| {
771                        if x.name == name_without_compress {
772                            Some(x.size)
773                        } else {
774                            None
775                        }
776                    })
777                    .unwrap_or(i.item.size)
778            } else {
779                i.item.size
780            };
781
782            *total += size;
783        }
784
785        handle.insert(i);
786    }
787}
788
789async fn remove_unused_db(download_dir: &Path, download_list: Vec<&str>) -> Result<()> {
790    let mut download_dir = fs::read_dir(&download_dir)
791        .await
792        .map_err(|e| RefreshError::ReadDownloadDir(download_dir.display().to_string(), e))?;
793
794    while let Ok(Some(x)) = download_dir.next_entry().await {
795        if x.path().is_file()
796            && !download_list.contains(&&*x.file_name().to_string_lossy())
797            && x.file_name() != "lock"
798        {
799            debug!("Removing {:?}", x.file_name());
800            if let Err(e) = fs::remove_file(x.path()).await {
801                debug!("Failed to remove file {:?}: {e}", x.file_name());
802            }
803        }
804    }
805
806    Ok(())
807}
808
809fn collect_flat_repo_no_release(
810    mirror_source: &MirrorSource,
811    download_dir: &Path,
812    tasks: &mut Vec<DownloadEntry>,
813    replacer: &DatabaseFilenameReplacer,
814) -> Result<()> {
815    let msg = mirror_source.get_human_download_message(Some("Packages"))?;
816
817    let dist_url = mirror_source.dist_path();
818
819    let from = match mirror_source.from()? {
820        OmaSourceEntryFrom::Http => DownloadSourceType::Http {
821            auth: mirror_source
822                .auth()
823                .as_ref()
824                .map(|auth| (auth.login.clone(), auth.password.clone())),
825        },
826        OmaSourceEntryFrom::Local => DownloadSourceType::Local(mirror_source.is_flat()),
827    };
828
829    let download_url = format!("{dist_url}/Packages");
830    let file_path = format!("{dist_url}Packages");
831
832    let sources = vec![DownloadSource {
833        url: download_url.clone(),
834        source_type: from,
835    }];
836
837    let task = DownloadEntry::builder()
838        .source(sources)
839        .filename(replacer.replace(&file_path)?)
840        .dir(download_dir.to_path_buf())
841        .allow_resume(false)
842        .msg(msg.into())
843        .file_type(CompressFile::Nothing)
844        .build();
845
846    tasks.push(task);
847
848    Ok(())
849}
850
851fn collect_download_task(
852    c: &ChecksumDownloadEntry,
853    mirror_source: &MirrorSource<'_>,
854    download_dir: &Path,
855    tasks: &mut Vec<DownloadEntry>,
856    release: &Release,
857    replacer: &DatabaseFilenameReplacer,
858    optional_set: &mut HashSet<String>,
859) -> Result<()> {
860    let file_type = &c.msg;
861
862    let msg = mirror_source.get_human_download_message(Some(file_type))?;
863
864    let from = match mirror_source.from()? {
865        OmaSourceEntryFrom::Http => DownloadSourceType::Http {
866            auth: mirror_source
867                .auth()
868                .as_ref()
869                .map(|auth| (auth.login.clone(), auth.password.clone())),
870        },
871        OmaSourceEntryFrom::Local => DownloadSourceType::Local(
872            mirror_source.is_flat()
873                && (!file_is_compress(&c.item.name)
874                    || (file_is_compress(&c.item.name) && c.keep_compress)),
875        ),
876    };
877
878    let not_compress_filename_before = if file_is_compress(&c.item.name) {
879        Cow::Owned(split_ext_and_filename(&c.item.name).1)
880    } else {
881        Cow::Borrowed(&c.item.name)
882    };
883
884    let checksum = if c.keep_compress {
885        Some(&c.item.checksum)
886    } else {
887        release
888            .checksum_type_and_list()
889            .1
890            .iter()
891            .find(|x| x.name == *not_compress_filename_before)
892            .as_ref()
893            .map(|c| &c.checksum)
894    };
895
896    let download_url = if release.acquire_by_hash() {
897        let path = Path::new(&c.item.name);
898        let parent = path.parent().unwrap_or(path);
899        let dir = match release.checksum_type_and_list().0 {
900            InReleaseChecksum::Sha256 => "SHA256",
901            InReleaseChecksum::Sha512 => "SHA512",
902            InReleaseChecksum::Md5 => "MD5Sum",
903        };
904
905        let path = parent.join("by-hash").join(dir).join(&c.item.checksum);
906
907        mirror_source.get_download_url(&path.display().to_string())
908    } else {
909        mirror_source.get_download_url(&c.item.name)
910    };
911
912    let sources = vec![DownloadSource {
913        url: download_url.to_string(),
914        source_type: from,
915    }];
916
917    let file_name = if c.keep_compress {
918        mirror_source.get_download_file_name(Some(&c.item.name), replacer)?
919    } else {
920        mirror_source.get_download_file_name(Some(&not_compress_filename_before), replacer)?
921    };
922
923    if c.optional {
924        optional_set.insert(file_name.clone());
925    }
926
927    let task = DownloadEntry::builder()
928        .source(sources)
929        .filename(file_name)
930        .dir(download_dir.to_path_buf())
931        .allow_resume(false)
932        .msg(msg.into())
933        .file_type({
934            if c.keep_compress {
935                CompressFile::Nothing
936            } else {
937                match Path::new(&c.item.name).extension().and_then(|x| x.to_str()) {
938                    Some("gz") => CompressFile::Gzip,
939                    Some("xz") => CompressFile::Xz,
940                    Some("bz2") => CompressFile::Bz2,
941                    Some("zst") => CompressFile::Zstd,
942                    Some("lzma") => CompressFile::Lzma,
943                    Some("lz4") => CompressFile::Lz4,
944                    _ => CompressFile::Nothing,
945                }
946            }
947        })
948        .maybe_hash(if let Some(checksum) = checksum {
949            match release.checksum_type_and_list().0 {
950                InReleaseChecksum::Sha256 => Some(Checksum::from_sha256_str(checksum)?),
951                InReleaseChecksum::Sha512 => Some(Checksum::from_sha512_str(checksum)?),
952                InReleaseChecksum::Md5 => Some(Checksum::from_md5_str(checksum)?),
953            }
954        } else {
955            None
956        })
957        .build();
958
959    tasks.push(task);
960
961    Ok(())
962}