condow_fs/
lib.rs

1//! # CONcurrent DOWnloads for local files
2//!
3//! Load parts of files concurrently.
4//!
5//! This is mostly for testing and experimenting.
6//! In most cases it is better to load sequentially from disks.
7//!
8//! ```rust, noexec
9//!
10//! use condow_fs::*;
11//! use condow_fs::config::Config;
12//!
13//! # async {
14//! let condow = FsClient::condow(Config::default()).unwrap();
15//!
16//! let stream = condow.blob().at(String::from("my_file")).range(23..46).download().await.unwrap();
17//! let downloaded_bytes: Vec<u8> = stream.into_vec().await.unwrap();
18//! # };
19//! # ()
20//! ```
21
22use std::io::SeekFrom;
23
24use anyhow::Error as AnyError;
25use bytes::Bytes;
26use condow_core::config::Config;
27use futures::future::BoxFuture;
28use tokio::fs;
29use tokio::io::{AsyncReadExt, AsyncSeekExt};
30
31use condow_core::{condow_client::CondowClient, errors::CondowError, streams::BytesStream};
32
33pub use condow_core::*;
34
/// Adapter that lets Condow read from files on the local file system.
///
/// The client carries no state. Locations are file paths given as `String`s
/// and are opened anew for every request.
#[derive(Clone, Debug, Default)]
pub struct FsClient;
37
38impl FsClient {
39    /// Create a concurrent downloader from this adapter and the given [Config]
40    pub fn condow(config: Config) -> Result<Condow<Self>, AnyError> {
41        Condow::new(FsClient, config)
42    }
43}
44
45impl CondowClient for FsClient {
46    type Location = String;
47
48    fn get_size(&self, location: Self::Location) -> BoxFuture<'static, Result<u64, CondowError>> {
49        let f = async move {
50            let file = fs::File::open(location.as_str()).await?;
51            let len = file.metadata().await?.len();
52
53            Ok(len)
54        };
55
56        Box::pin(f)
57    }
58
59    fn download(
60        &self,
61        location: Self::Location,
62        range: InclusiveRange,
63    ) -> BoxFuture<'static, Result<BytesStream, CondowError>> {
64        let f = async move {
65            let bytes = {
66                let mut file = fs::File::open(location.as_str()).await?;
67                file.seek(SeekFrom::Start(range.start())).await?;
68
69                let n_bytes_to_read = range.len();
70
71                if n_bytes_to_read > usize::MAX as u64 {
72                    return Err(CondowError::new_other(
73                        "usize overflow while casting from u64",
74                    ));
75                }
76
77                let mut buffer = vec![0; n_bytes_to_read as usize];
78
79                let n_bytes_read = file.read_exact(&mut buffer).await?;
80
81                if n_bytes_read as u64 != n_bytes_to_read {
82                    return Err(CondowError::new_io(format!(
83                        "not enough bytes read (expected {} got {})",
84                        n_bytes_to_read, n_bytes_read
85                    )));
86                }
87
88                buffer
89            };
90
91            let bytes = Bytes::from(bytes);
92
93            Ok(BytesStream::once_ok(bytes))
94        };
95
96        Box::pin(f)
97    }
98}