1use futures::StreamExt;
2use graph_error::io_error::{AsyncIoError, ThreadedIoError};
3use std::{
4 fs,
5 path::{Path, PathBuf},
6 sync::mpsc,
7 thread,
8};
9use tokio::io::AsyncWriteExt;
10
11pub fn create_dir<P: AsRef<Path>>(directory: P) -> Result<(), std::io::Error> {
12 if !directory.as_ref().exists() {
13 fs::create_dir_all(&directory)?;
14 }
15 Ok(())
16}
17
18pub async fn create_dir_async<P: AsRef<Path>>(directory: P) -> Result<(), std::io::Error> {
19 if !directory.as_ref().exists() {
20 tokio::fs::create_dir_all(directory).await?;
21 }
22 Ok(())
23}
24
25pub fn copy(
26 path: PathBuf,
27 mut response: reqwest::blocking::Response,
28) -> Result<PathBuf, ThreadedIoError> {
29 let (sender, receiver) = mpsc::channel();
30 let handle = thread::spawn::<_, Result<(), ThreadedIoError>>(move || {
31 let mut file_writer = fs::OpenOptions::new()
32 .create(true)
33 .write(true)
34 .read(true)
35 .open(&path)?;
36 std::io::copy(&mut response, &mut file_writer)?;
37 sender.send(Some(path))?;
38 Ok(())
39 });
40
41 handle.join().map_err(ThreadedIoError::Join)??;
42 receiver.recv()?.ok_or(ThreadedIoError::NoPath)
43}
44
45pub async fn copy_async(
46 path: PathBuf,
47 response: reqwest::Response,
48) -> Result<PathBuf, AsyncIoError> {
49 let mut file = tokio::fs::OpenOptions::new()
50 .create(true)
51 .write(true)
52 .read(true)
53 .open(&path)
54 .await?;
55 let mut stream = response.bytes_stream();
56 while let Some(item) = stream.next().await {
57 file.write_all(&item?).await?;
58 }
59 Ok(path)
60}