async_fetcher/
concatenator.rs

1// Copyright 2021-2022 System76 <info@system76.com>
2// SPDX-License-Identifier: MPL-2.0
3
4use crate::Error;
5
6use async_shutdown::ShutdownManager;
7use futures::{Stream, StreamExt};
8use std::fs::{self, File};
9use std::io::copy;
10use std::{path::Path, sync::Arc};
11
12/// Accepts a stream of future file `parts` and concatenates them into the `dest` file.
13pub async fn concatenator<P: 'static>(
14    mut dest: File,
15    mut parts: P,
16    _path: Arc<Path>,
17    shutdown: ShutdownManager<()>,
18) -> Result<(), Error>
19where
20    P: Stream<Item = Result<(Arc<Path>, File), Error>> + Send + Unpin,
21{
22    let main = async move {
23        let _token = match shutdown.delay_shutdown_token() {
24            Ok(token) => token,
25            Err(_) => return Err(Error::Canceled),
26        };
27
28        let task = async {
29            while let Some(task_result) = parts.next().await {
30                crate::utils::shutdown_check(&shutdown)?;
31
32                let (source, mut source_file) = task_result?;
33                concatenate(&mut dest, source, &mut source_file)?;
34            }
35
36            Ok(())
37        };
38
39        let result = task.await;
40
41        result
42    };
43
44    tokio::task::spawn_blocking(|| futures::executor::block_on(main))
45        .await
46        .unwrap()
47}
48
49/// Concatenates a part into a file.
50fn concatenate(
51    concatenated_file: &mut File,
52    part_path: Arc<Path>,
53    part_file: &mut File,
54) -> Result<(), Error> {
55    copy(part_file, concatenated_file).map_err(Error::Concatenate)?;
56
57    if let Err(why) = fs::remove_file(&*part_path) {
58        error!("failed to remove part file ({:?}): {}", part_path, why);
59    }
60
61    Ok(())
62}