condow_core/
lib.rs

1//! # ConDow
2//!
3//! ## Overview
4//!
5//! ConDow is a CONcurrent DOWnloader which downloads BLOBs
6//! by splitting the download into parts and downloading them
7//! concurrently.
8//!
9//! Some services/technologies/backends can have their download
10//! speed improved, if BLOBs are downloaded concurrently by
11//! "opening multiple connections". An example for this is AWS S3.
12//!
13//! This crate provides the core functionality only. To actually
14//! use it, use one of the implementation crates:
15//!
//! * [condow_rusoto] for downloading from AWS S3 via rusoto
//! * [condow_fs] for using async file access via [tokio]
18//!
19//! All that is required to add more "services" is to implement
20//! the [CondowClient] trait.
21//!
22//! ## Usage
23//!
24//! To use condow a client to access remote data is required. In the examples below
25//! [InMemoryClient] is used. Usually this would be some client which really accesses
26//! remote BLOBs.
27//!
//! The [Condow] struct itself can be used to download BLOBs. It might not be
//! convenient to pass it around since it has 2 type parameters. Consider the
//! traits [Downloads] (which has only an associated type) or [DownloadsUntyped]
//! (which has no type parameters at all and takes locations as `String`s)
//! to pass around instances of [Condow]. Both traits can be used as trait objects.
32//!
33//! ```
34//! use condow_core::condow_client::InMemoryClient;
35//! use condow_core::{Condow, config::Config};
36//!
37//! # #[tokio::main]
38//! # async fn main() {
39//! // First we need a client...
40//! let client = InMemoryClient::<String>::new_static(b"a remote BLOB");
41//!
42//! // ... and a configuration for Condow
43//! let config = Config::default();
44//!
45//! let condow = Condow::new(client, config).unwrap();
46//!
47//! assert_eq!(condow.get_size("a location").await.unwrap(), 13);
48//!
49//! // Download the complete BLOB
50//! let blob = condow.blob().at("a location").download_into_vec().await.unwrap();
51//! assert_eq!(blob, b"a remote BLOB");
52//!
53//! // Download part of a BLOB. Any Rust range syntax will work.
54//! let blob = condow.blob().at("a location").range(2..=7).download_into_vec().await.unwrap();
55//! assert_eq!(blob, b"remote");
56//!
57//! let blob = condow.blob().at("a location").range(2..).download_into_vec().await.unwrap();
58//! assert_eq!(blob, b"remote BLOB");
59//!
60//! // get an `AsyncRead` implementation
61//!
62//! use futures::AsyncReadExt;
63//! let mut reader = condow.blob().at("a location").reader().await.unwrap();
64//! let mut buf = Vec::new();
65//! reader.read_to_end(&mut buf).await.unwrap();
66//! assert_eq!(buf, b"a remote BLOB");
67//!
68//! // get an `AsyncRead`+`AsyncSeek` implementation
69//! use futures::AsyncSeekExt;
70//! let mut reader = condow.blob()
71//!     .at("a location")
72//!     .trusted_blob_size(13)
73//!     .random_access_reader()
74//!     .finish().await.unwrap();
75//! let mut buf = Vec::new();
76//! reader.seek(std::io::SeekFrom::Start(2)).await.unwrap();
77//! reader.read_to_end(&mut buf).await.unwrap();
78//! assert_eq!(buf, b"remote BLOB");
79//! # }
80//! ```
81//!
82//! ## Retries
83//!
//! ConDow supports retries. These can be done on the downloads themselves
//! as well as on the byte streams returned from a client. If an error occurs
//! while streaming bytes ConDow will try to reconnect with retries and
//! resume streaming where the previous stream failed.
88//!
89//! Retries can also be attempted on size requests.
90//!
91//! Be aware that some clients might also do retries themselves based on
92//! their underlying implementation. In this case you should disable retries for either the
93//! client or ConDow itself.
94//!
95//! ## Behaviour
96//!
//! Downloads with a maximum concurrency of 3 or less are streamed on the same task the
//! download was initiated on. This means that the returned stream needs to be polled to drive
//! pulling chunks from the network. Executing the streaming on the polling task also means
//! that panics of underlying libraries will surface on the polling task.
//!
//! Downloads with a concurrency greater than or equal to 4 are executed on dedicated tasks.
//! Panics will be detected and the stream will abort with an error.
104//!
105//! With the [EnsureActivePull] config setting all downloads will be executed on dedicated tasks and
106//! panics will be detected.
107//!
108//! All downloads executed on dedicated tasks will pull bytes from the network eagerly
109//! and fill a queue.
110//!
111//! ## Instrumentation
112//!
113//! Instrumentation can be done for each individual download or centralized
114//! for global monitoring. For further information see the [probe] module.
115//!
116//! [condow_rusoto]:https://docs.rs/condow_rusoto
117//! [condow_fs]:https://docs.rs/condow_fs
118//! [InMemoryClient]:condow_client::InMemoryClient
119//! [EnsureActivePull]:config::EnsureActivePull
120use futures::future::BoxFuture;
121
122use errors::CondowError;
123use probe::Probe;
124use streams::{ChunkStream, OrderedChunkStream};
125
// Crate-internal helpers. `#[macro_use]` makes any macros defined in `helpers`
// available to the modules declared after it, so this declaration must stay first.
#[macro_use]
pub(crate) mod helpers;
// Private module; only `Condow` is re-exported from it (see the `pub use` below).
mod condow;
pub mod condow_client;
pub mod config;
// Private module; all of its public items are re-exported via the glob below.
mod download_range;
pub mod errors;
mod machinery;
pub mod probe;
pub mod reader;
// Private module; `Request` and `RequestNoLocation` are re-exported below.
mod request;
mod retry;
pub mod streams;

// Public API re-exported from the private modules above.
pub use condow::Condow;
pub use download_range::*;
pub use request::{Request, RequestNoLocation};

// Only compiled when `cfg(test)` is set.
#[cfg(test)]
pub mod test_utils;
146
147/// A common interface for downloading
148///
149/// This trait is not object safe. If you want to use dynamic dispatch you
150/// might consider the trait [DownloadsUntyped] which is object safe
151/// and accepts string slices as a location.
152///  ```
153/// # use std::sync::Arc;
154/// use condow_core::{Condow, Downloads, config::Config};
155/// use condow_core::condow_client::InMemoryClient;
156///
157/// # #[tokio::main]
158/// # async fn main() {
159/// // First we need a client... Let's make the type of the location simply `u32`s
160/// let client = InMemoryClient::<i32>::new_static(b"a remote BLOB");
161///
162/// // ... and a configuration for Condow
163/// let config = Config::default();
164///
165/// let condow = Condow::new(client, config).unwrap();
166///
167/// assert_eq!(Downloads::get_size(&condow, 42).await.unwrap(), 13);
168///
169/// let blob = Downloads::blob(&condow).at(42).download_into_vec().await.unwrap();
170/// assert_eq!(blob, b"a remote BLOB");
171///
172/// // The trait can also be used dynamically:
173///
174/// let downloads_dyn_dispatch: Arc<dyn Downloads<Location = i32>> = Arc::new(condow);
175///
176/// let blob = downloads_dyn_dispatch.blob().at(42).download_into_vec().await.unwrap();
177/// assert_eq!(blob, b"a remote BLOB");
178///
179/// # }
180/// ```
181pub trait Downloads: Send + Sync + 'static {
182    type Location: std::fmt::Debug + std::fmt::Display + Clone + Send + Sync + 'static;
183
184    /// Download a BLOB via the returned request object
185    fn blob(&self) -> RequestNoLocation<Self::Location>;
186
187    /// Get the size of a BLOB at the given location
188    fn get_size(&self, location: Self::Location) -> BoxFuture<'_, Result<u64, CondowError>>;
189}
190
191/// Downloads from a location specified by a &[str].
192///
193/// This trait is object safe
194///
195///  ```
196/// # use std::sync::Arc;
197/// use condow_core::{Condow, DownloadsUntyped, config::Config};
198/// use condow_core::condow_client::InMemoryClient;
199///
200/// # #[tokio::main]
201/// # async fn main() {
202/// // First we need a client... Let's make the type of the location simply `u32`s
203/// let client = InMemoryClient::<u32>::new_static(b"a remote BLOB");
204///
205/// // ... and a configuration for Condow
206/// let config = Config::default();
207///
208/// let condow = Condow::new(client, config).unwrap();
209///
210/// // The trait is object save:
211/// let downloader: Arc<dyn DownloadsUntyped> = Arc::new(condow);
212///
213/// // "42" parses as `u32`
214/// assert_eq!(downloader.get_size("42".to_string()).await.unwrap(), 13);
215///
216/// // "x" does not
217/// assert!(downloader.get_size("x".to_string()).await.is_err());
218///
219/// let blob = downloader.blob().at("42".to_string()).download_into_vec().await.unwrap();
220/// assert_eq!(blob, b"a remote BLOB");
221/// # }
222/// ```
223pub trait DownloadsUntyped: Send + Sync + 'static {
224    /// Download a BLOB via the returned request object
225    fn blob(&self) -> RequestNoLocation<String>;
226
227    /// Get the size of a BLOB at the given location
228    ///
229    /// A location which can not be parsed causes method to fail.
230    fn get_size(&self, location: String) -> BoxFuture<'_, Result<u64, CondowError>>;
231}
232
#[cfg(test)]
mod trait_tests {
    // Checks that `Downloads` and `DownloadsUntyped` can be used as trait
    // objects and that a `Condow` behind such a trait object actually works.

    use std::sync::Arc;

    use futures::future::BoxFuture;

    use crate::{
        condow_client::InMemoryClient, config::Config, errors::CondowError, Condow, Downloads,
        DownloadsUntyped, RequestNoLocation,
    };

    /// Compile-time check: the test passes if `Box<dyn DownloadsUntyped>` can be built.
    /// The `todo!()` bodies are never executed.
    #[test]
    fn downloads_untyped_is_object_safe_must_compile() {
        struct Foo;

        impl DownloadsUntyped for Foo {
            fn blob(&self) -> RequestNoLocation<String> {
                todo!()
            }

            fn get_size(&self, _location: String) -> BoxFuture<'_, Result<u64, CondowError>> {
                todo!()
            }
        }

        let _downloads_untyped: Box<dyn DownloadsUntyped> = Box::new(Foo);
    }

    /// Compile-time check: `Downloads` can be used as a trait object once the
    /// associated `Location` type is named. The `todo!()` bodies are never executed.
    #[test]
    fn downloads_typed_is_object_safe_must_compile() {
        struct Foo;

        impl Downloads for Foo {
            type Location = i32;

            fn blob(&self) -> RequestNoLocation<i32> {
                todo!()
            }

            fn get_size(&self, _location: i32) -> BoxFuture<'_, Result<u64, CondowError>> {
                todo!()
            }
        }

        let _downloads_typed: Box<dyn Downloads<Location = i32>> = Box::new(Foo);
    }

    /// A `Condow` behind `dyn Downloads` reports the size and downloads the whole BLOB.
    #[tokio::test]
    async fn typed_downloader_is_usable_for_download() {
        let client = InMemoryClient::<u32>::new_static(b"a remote BLOB");
        let config = Config::default();
        let condow = Condow::new(client, config).unwrap();
        let downloader: Arc<dyn Downloads<Location = u32>> = Arc::new(condow);
        assert_eq!(downloader.get_size(42).await.unwrap(), 13);
        let blob = downloader
            .blob()
            .at(42u32)
            .download_into_vec()
            .await
            .unwrap();
        assert_eq!(blob, b"a remote BLOB");
    }

    /// A `Condow` behind `dyn Downloads` yields a reader which returns the BLOB's bytes.
    #[tokio::test]
    async fn typed_downloader_is_usable_for_reader() {
        use futures::AsyncReadExt;

        let client = InMemoryClient::<u32>::new_static(b"a remote BLOB");
        let config = Config::default();
        let condow = Condow::new(client, config).unwrap();
        let downloader: Arc<dyn Downloads<Location = u32>> = Arc::new(condow);
        assert_eq!(downloader.get_size(42).await.unwrap(), 13);
        let mut reader = downloader
            .blob()
            .at(42u32)
            .random_access_reader()
            .finish()
            .await
            .unwrap();

        // Creating the reader is not enough to show it is usable: read through it.
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await.unwrap();
        assert_eq!(buf, b"a remote BLOB");
    }

    /// A `Condow` behind `dyn DownloadsUntyped` parses `String` locations,
    /// rejects unparsable ones and downloads the whole BLOB.
    #[tokio::test]
    async fn untyped_downloader_is_usable_for_download() {
        let client = InMemoryClient::<u32>::new_static(b"a remote BLOB");
        let config = Config::default();
        let condow = Condow::new(client, config).unwrap();
        let downloader: Arc<dyn DownloadsUntyped> = Arc::new(condow);

        // "42" parses as `u32`, "x" does not.
        assert_eq!(downloader.get_size("42".to_string()).await.unwrap(), 13);
        assert!(downloader.get_size("x".to_string()).await.is_err());

        let blob = downloader
            .blob()
            .at("42".to_string())
            .download_into_vec()
            .await
            .unwrap();
        assert_eq!(blob, b"a remote BLOB");
    }
}