async_fetcher/
concatenator.rs1use crate::Error;
5
6use async_shutdown::ShutdownManager;
7use futures::{Stream, StreamExt};
8use std::fs::{self, File};
9use std::io::copy;
10use std::{path::Path, sync::Arc};
11
12pub async fn concatenator<P: 'static>(
14 mut dest: File,
15 mut parts: P,
16 _path: Arc<Path>,
17 shutdown: ShutdownManager<()>,
18) -> Result<(), Error>
19where
20 P: Stream<Item = Result<(Arc<Path>, File), Error>> + Send + Unpin,
21{
22 let main = async move {
23 let _token = match shutdown.delay_shutdown_token() {
24 Ok(token) => token,
25 Err(_) => return Err(Error::Canceled),
26 };
27
28 let task = async {
29 while let Some(task_result) = parts.next().await {
30 crate::utils::shutdown_check(&shutdown)?;
31
32 let (source, mut source_file) = task_result?;
33 concatenate(&mut dest, source, &mut source_file)?;
34 }
35
36 Ok(())
37 };
38
39 let result = task.await;
40
41 result
42 };
43
44 tokio::task::spawn_blocking(|| futures::executor::block_on(main))
45 .await
46 .unwrap()
47}
48
49fn concatenate(
51 concatenated_file: &mut File,
52 part_path: Arc<Path>,
53 part_file: &mut File,
54) -> Result<(), Error> {
55 copy(part_file, concatenated_file).map_err(Error::Concatenate)?;
56
57 if let Err(why) = fs::remove_file(&*part_path) {
58 error!("failed to remove part file ({:?}): {}", part_path, why);
59 }
60
61 Ok(())
62}