1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
//! API for describing types that can slice data into component slices in a repeatable manner
#![warn(clippy::all)]
#![warn(clippy::pedantic)]
#![allow(clippy::must_use_candidate)]
#![allow(clippy::module_name_repetitions)]
#![allow(clippy::similar_names)]
#![allow(clippy::pub_enum_variant_names)]
#![allow(clippy::missing_errors_doc)]

pub mod buzhash;
pub mod fastcdc;
pub mod static_size;
pub use self::buzhash::*;
pub use self::fastcdc::*;
pub use self::static_size::*;

use std::io;
use thiserror::Error;

#[cfg(feature = "streams")]
use futures::channel::mpsc;
#[cfg(feature = "streams")]
use futures::sink::SinkExt;
#[cfg(feature = "streams")]
use tokio::task;

#[derive(Error, Debug)]
pub enum ChunkerError {
    #[error("Provider IO error")]
    IOError(#[from] io::Error),
    #[error("Internal Chunker Error")]
    InternalError(String),
    #[error("Slicer incorrectly applied to empty data")]
    Empty,
}

use std::io::{Cursor, Read};

/// Describes something that can slice objects in a defined, repeatable manner
///
/// Every implementation is expected to uphold the following properties:
/// 1. Data is split into one or more chunks
/// 2. Concatenating the chunks, in order, yields data identical to the original
/// 3. The same data and settings produce the same slices every time
/// 4. A chunker that has a max size should never produce a chunk larger than its `max_size`
/// 5. A chunker that has a min size should produce, at most, one slice smaller than its
///    `min_size`, and only when there is not enough data left to produce a min size chunk
///
/// For the time being, given the lack of existential types, chunkers take their input as a
/// `Box<dyn Read + Send + 'static>`.
///
/// If/when existential types get stabilized in a way that helps, this will be switched to an
/// existential type, to drop the dynamic dispatch.
///
/// Chunkers should, ideally, contain only a small number of settings for the chunking algorithm,
/// and should therefore be cloneable with minimal overhead. Ideally they would implement `Copy`,
/// but that is not required as a bound, to leave implementations more flexibility.
///
/// The `Send` bound on the reader is likely temporary; it is currently required to make the
/// streams feature work properly.
pub trait Chunker: Clone {
    /// Iterator over the chunks of an input, as returned by every method of this trait.
    ///
    /// The iterator must own everything it needs, hence the `'static` bound.
    type Chunks: Iterator<Item = Result<Vec<u8>, ChunkerError>> + 'static;

    /// Core method: consumes a boxed, owned reader and returns an iterator over its chunks.
    fn chunk_boxed(&self, read: Box<dyn Read + Send + 'static>) -> Self::Chunks;

    /// Convenience method that boxes a bare reader and hands it to `chunk_boxed`.
    ///
    /// This will be the primary point of interaction with the API for most use cases.
    fn chunk<R>(&self, read: R) -> Self::Chunks
    where
        R: Read + Send + 'static,
    {
        // The box is coerced to the trait object type at the call site.
        self.chunk_boxed(Box::new(read))
    }

    /// Convenience method that wraps an `AsRef<[u8]>` in a `Cursor`, boxes it, and hands it to
    /// `chunk_boxed`.
    ///
    /// This method exists to keep the API usable when callers are working with memory-mapped IO
    /// or similar in-memory data. Implementations are encouraged to override it when they
    /// sensibly can, since going through the boxed reader would otherwise impose a performance
    /// overhead on such callers.
    fn chunk_slice<R>(&self, slice: R) -> Self::Chunks
    where
        R: AsRef<[u8]> + Send + 'static,
    {
        self.chunk_boxed(Box::new(Cursor::new(slice)))
    }
}

/// Asynchronous counterpart of `Chunker`
///
/// Only available if the streams feature is enabled.
///
/// Works by performing the chunking in an async task, falling through to the implementation in
/// `Chunker`, and passing the results over an mpsc channel. Each method returns the receiving
/// half of that channel.
#[cfg(feature = "streams")]
pub trait AsyncChunker: Chunker + Send + Sync {
    /// Async counterpart of `Chunker::chunk_boxed`
    fn async_chunk_boxed(
        &self,
        read: Box<dyn Read + Send + 'static>,
    ) -> mpsc::Receiver<Result<Vec<u8>, ChunkerError>>;

    /// Async counterpart of `Chunker::chunk`
    fn async_chunk<R>(&self, read: R) -> mpsc::Receiver<Result<Vec<u8>, ChunkerError>>
    where
        R: Read + Send + 'static;

    /// Async counterpart of `Chunker::chunk_slice`
    fn async_chunk_slice<R>(&self, slice: R) -> mpsc::Receiver<Result<Vec<u8>, ChunkerError>>
    where
        R: AsRef<[u8]> + Send + 'static;
}

/// Drives `chunks` on a spawned task and forwards every item over a bounded mpsc channel,
/// returning the receiving half.
///
/// Each call to `next` on the iterator is wrapped in `task::block_in_place`, since producing a
/// chunk may perform blocking reads.
///
/// If the receiver is dropped before the iterator is exhausted, the task stops forwarding and
/// exits quietly rather than panicking.
///
/// NOTE(review): per Tokio's documentation, `task::spawn` must be called from within a runtime
/// and `task::block_in_place` panics on a current-thread runtime — confirm callers always use a
/// multi-threaded runtime.
#[cfg(feature = "streams")]
fn forward_chunks<I>(mut chunks: I) -> mpsc::Receiver<Result<Vec<u8>, ChunkerError>>
where
    I: Iterator<Item = Result<Vec<u8>, ChunkerError>> + Send + 'static,
{
    /// Number of chunks that may be buffered in the channel before the producer has to wait
    const CHANNEL_CAPACITY: usize = 100;

    let (mut input, output) = mpsc::channel(CHANNEL_CAPACITY);
    task::spawn(async move {
        while let Some(chunk) = task::block_in_place(|| chunks.next()) {
            // A send error means the receiver is gone; nobody is left to consume further
            // chunks, so stop producing them.
            if input.send(chunk).await.is_err() {
                break;
            }
        }
    });
    output
}

/// Blanket implementation of `AsyncChunker` for every `Chunker` that is `Send + Sync` and whose
/// chunk iterator is `Send`.
///
/// Each method builds the synchronous iterator with the matching `Chunker` method (so any
/// overrides of `chunk` or `chunk_slice` are honored) and then forwards its items over a channel
/// from a spawned task.
#[cfg(feature = "streams")]
impl<T> AsyncChunker for T
where
    T: Chunker + Send + Sync,
    <T as Chunker>::Chunks: Send,
{
    fn async_chunk_boxed(
        &self,
        read: Box<dyn Read + Send + 'static>,
    ) -> mpsc::Receiver<Result<Vec<u8>, ChunkerError>> {
        forward_chunks(self.chunk_boxed(read))
    }
    fn async_chunk<R: Read + Send + 'static>(
        &self,
        read: R,
    ) -> mpsc::Receiver<Result<Vec<u8>, ChunkerError>> {
        forward_chunks(self.chunk(read))
    }
    fn async_chunk_slice<R: AsRef<[u8]> + Send + 'static>(
        &self,
        slice: R,
    ) -> mpsc::Receiver<Result<Vec<u8>, ChunkerError>> {
        forward_chunks(self.chunk_slice(slice))
    }
}