graph_http/
io_tools.rs

1use futures::StreamExt;
2use graph_error::io_error::{AsyncIoError, ThreadedIoError};
3use std::{
4    fs,
5    path::{Path, PathBuf},
6    sync::mpsc,
7    thread,
8};
9use tokio::io::AsyncWriteExt;
10
/// Creates `directory` together with any missing parent directories.
///
/// Calling this on a directory that already exists is a no-op, so the call is
/// safe to repeat.
///
/// # Errors
///
/// Returns the [`std::io::Error`] produced by [`fs::create_dir_all`], for
/// example when the path already exists but is not a directory.
pub fn create_dir<P: AsRef<Path>>(directory: P) -> Result<(), std::io::Error> {
    // `create_dir_all` already succeeds for an existing directory, so a
    // separate `exists()` probe adds nothing. The probe also made this
    // function report success when the path was a regular file, leaving the
    // caller without the directory it asked for.
    fs::create_dir_all(directory)
}
17
18pub async fn create_dir_async<P: AsRef<Path>>(directory: P) -> Result<(), std::io::Error> {
19    if !directory.as_ref().exists() {
20        tokio::fs::create_dir_all(directory).await?;
21    }
22    Ok(())
23}
24
25pub fn copy(
26    path: PathBuf,
27    mut response: reqwest::blocking::Response,
28) -> Result<PathBuf, ThreadedIoError> {
29    let (sender, receiver) = mpsc::channel();
30    let handle = thread::spawn::<_, Result<(), ThreadedIoError>>(move || {
31        let mut file_writer = fs::OpenOptions::new()
32            .create(true)
33            .write(true)
34            .read(true)
35            .open(&path)?;
36        std::io::copy(&mut response, &mut file_writer)?;
37        sender.send(Some(path))?;
38        Ok(())
39    });
40
41    handle.join().map_err(ThreadedIoError::Join)??;
42    receiver.recv()?.ok_or(ThreadedIoError::NoPath)
43}
44
45pub async fn copy_async(
46    path: PathBuf,
47    response: reqwest::Response,
48) -> Result<PathBuf, AsyncIoError> {
49    let mut file = tokio::fs::OpenOptions::new()
50        .create(true)
51        .write(true)
52        .read(true)
53        .open(&path)
54        .await?;
55    let mut stream = response.bytes_stream();
56    while let Some(item) = stream.next().await {
57        file.write_all(&item?).await?;
58    }
59    Ok(path)
60}