Skip to main content

asuran_chunker/
lib.rs

1//! API for describing types that can slice data into component slices in a repeatable manner
2#![warn(clippy::all)]
3#![warn(clippy::pedantic)]
4#![allow(clippy::must_use_candidate)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::similar_names)]
7#![allow(clippy::pub_enum_variant_names)]
8#![allow(clippy::missing_errors_doc)]
9
10pub mod buzhash;
11pub mod fastcdc;
12pub mod static_size;
13
14pub use self::buzhash::*;
15pub use self::fastcdc::*;
16pub use self::static_size::*;
17
18use thiserror::Error;
19
20use std::io;
21
22#[cfg(feature = "streams")]
23use futures::channel::mpsc;
24#[cfg(feature = "streams")]
25use futures::sink::SinkExt;
26#[cfg(feature = "streams")]
27use smol::block_on;
28#[cfg(feature = "streams")]
29use std::thread;
30
31#[derive(Error, Debug)]
32#[non_exhaustive]
33pub enum ChunkerError {
34    #[error("Provider IO error")]
35    IOError(#[from] io::Error),
36    #[error("Internal Chunker Error")]
37    InternalError(String),
38    #[error("Slicer incorrectly applied to empty data")]
39    Empty,
40}
41
42use std::io::{Cursor, Read};
43
44/// Describes something that can slice objects in a defined, repeatable manner
45///
46/// Chunkers must meet the following properties:
47/// 1.) Data must be split into one or more chunks
48/// 2.) Data must be identical to original after a simple reconstruction by concatenation
49/// 3.) The same data and settings must produce the same slices every time
50/// 4.) Chunkers (that have a max size) should not produce any chunks larger than their `max_size`
51/// 5.) Chunkers (that have a min size) should produce, at most, 1 slice smaller than its `min_size`,
52///  and should only do as such when there is not enough data left to produce a min size chunk
53///
54/// For the time being given the lack of existential types, Chunkers use Box<dyn Read + 'static>.
55///
56/// If/when existential types get stabilized in a way that helps, this will be switched to an
57/// existential type, to drop the dynamic dispatch.
58///
59/// Chunkers should, ideally, contain only a small number of settings for the chunking algorithm,
60/// and should there for be cloneable with minimal overhead. Ideally, they should implement copy,
61/// but that is not supplied as a bound to increase the flexibility in implementation
62///
63/// The Send bound on the Read is likely temporary, it is currently required to make the streams
64/// feature work properly.
65pub trait Chunker: Clone {
66    /// The return type of the functions in this trait is an iterator over the chunks of their
67    /// input.
68    ///
69    /// The returned iterator must be owned, hence the 'static bound.
70    type Chunks: Iterator<Item = Result<Vec<u8>, ChunkerError>> + 'static;
71    /// Core function, takes a boxed owned Read and produces an iterator of Vec<u8> over it
72    fn chunk_boxed(&self, read: Box<dyn Read + Send + 'static>) -> Self::Chunks;
73    /// Convenience function that boxes a bare Read for you, and passes it to `chunk_boxed`
74    ///
75    /// This will be the primary source of interaction wth the API for most use cases
76    fn chunk<R: Read + Send + 'static>(&self, read: R) -> Self::Chunks {
77        let boxed: Box<dyn Read + Send + 'static> = Box::new(read);
78        self.chunk_boxed(boxed)
79    }
80    /// Convenience function that boxes an AsRef<[u8]> wrapped in a cursor and passes it to
81    /// `chunk_boxed`. Implementations are encouraged to overwrite when sensible.
82    ///
83    /// This method is provided to ensure API compatibility when implementations are using memory
84    /// mapped io or the like. When chunkers can sensibly override this, they are encouraged to, as
85    /// it would otherwise result in a performance overhead for consumers using memmaped IO.
86    fn chunk_slice<R: AsRef<[u8]> + Send + 'static>(&self, slice: R) -> Self::Chunks {
87        let cursor = Cursor::new(slice);
88        let boxed: Box<dyn Read + Send + 'static> = Box::new(cursor);
89        self.chunk_boxed(boxed)
90    }
91}
92
/// Channel-based counterpart of `Chunker`
///
/// Only available when the `streams` feature is enabled.
///
/// Each method defers to the matching method on `Chunker` and delivers the resulting chunks
/// through an mpsc channel. The blanket implementation in this module does the iteration on a
/// spawned thread. `queue_depth` is passed to `mpsc::channel` as the channel's buffer size.
#[cfg(feature = "streams")]
pub trait AsyncChunker: Chunker + Send + Sync {
    /// Channel-based version of `Chunker::chunk_boxed`
    fn async_chunk_boxed(
        &self,
        read: Box<dyn Read + Send + 'static>,
        queue_depth: usize,
    ) -> mpsc::Receiver<Result<Vec<u8>, ChunkerError>>;

    /// Channel-based version of `Chunker::chunk`
    fn async_chunk<R>(
        &self,
        read: R,
        queue_depth: usize,
    ) -> mpsc::Receiver<Result<Vec<u8>, ChunkerError>>
    where
        R: Read + Send + 'static;

    /// Channel-based version of `Chunker::chunk_slice`
    fn async_chunk_slice<R>(
        &self,
        slice: R,
        queue_depth: usize,
    ) -> mpsc::Receiver<Result<Vec<u8>, ChunkerError>>
    where
        R: AsRef<[u8]> + Send + 'static;
}
120
/// Runs `chunks` to completion on a newly spawned thread, sending every item over a channel
/// created with `mpsc::channel(queue_depth)`, and returns the receiving half of that channel.
///
/// Shared by all three `AsyncChunker` methods so the forwarding logic lives in one place.
#[cfg(feature = "streams")]
fn spawn_chunk_forwarder<I>(
    chunks: I,
    queue_depth: usize,
) -> mpsc::Receiver<Result<Vec<u8>, ChunkerError>>
where
    I: Iterator<Item = Result<Vec<u8>, ChunkerError>> + Send + 'static,
{
    let (mut input, output) = mpsc::channel(queue_depth);
    thread::spawn(move || {
        for chunk in chunks {
            // A failed send means the receiving side is gone (dropped or closed), so nobody can
            // observe further chunks. A consumer that stops reading early is a normal situation,
            // so wind the worker down quietly instead of panicking.
            if block_on(input.send(chunk)).is_err() {
                break;
            }
        }
    });
    output
}

/// Blanket implementation: any `Chunker` that is `Send + Sync` and whose chunk iterator is `Send`
/// gets the channel-based API for free.
#[cfg(feature = "streams")]
impl<T> AsyncChunker for T
where
    T: Chunker + Send + Sync,
    <T as Chunker>::Chunks: Send,
{
    fn async_chunk_boxed(
        &self,
        read: Box<dyn Read + Send + 'static>,
        queue_depth: usize,
    ) -> mpsc::Receiver<Result<Vec<u8>, ChunkerError>> {
        spawn_chunk_forwarder(self.chunk_boxed(read), queue_depth)
    }

    fn async_chunk<R: Read + Send + 'static>(
        &self,
        read: R,
        queue_depth: usize,
    ) -> mpsc::Receiver<Result<Vec<u8>, ChunkerError>> {
        // Goes through `Chunker::chunk` so an implementation's override is respected.
        spawn_chunk_forwarder(self.chunk(read), queue_depth)
    }

    fn async_chunk_slice<R: AsRef<[u8]> + Send + 'static>(
        &self,
        slice: R,
        queue_depth: usize,
    ) -> mpsc::Receiver<Result<Vec<u8>, ChunkerError>> {
        // Goes through `Chunker::chunk_slice` so an implementation's override is respected.
        spawn_chunk_forwarder(self.chunk_slice(slice), queue_depth)
    }
}