use cid::Cid;
use crate::pb::{FlatUnixFs, PBLink, UnixFs, UnixFsType};
use quick_protobuf::{MessageWrite, Writer};
use std::borrow::Cow;
use std::fmt;
use sha2::{Digest, Sha256};
/// Streaming builder of a UnixFs file tree: bytes are `push`ed in, and completed
/// dag-pb blocks `(Cid, bytes)` come out as they become ready; `finish` flushes the
/// remainder and yields the root last.
#[derive(Default)]
pub struct FileAdder {
    // Strategy for splitting the incoming byte stream into leaf blocks.
    chunker: Chunker,
    // Strategy for combining produced links into higher levels of the tree.
    collector: Collector,
    // Bytes accepted by the chunker but not yet rendered as a leaf block.
    block_buffer: Vec<u8>,
    // Links to rendered blocks awaiting collection into parent nodes; `depth`
    // is the tree level (0 = leaf), see `Link`.
    unflushed_links: Vec<Link>,
}
impl fmt::Debug for FileAdder {
    /// Hand-written so the (possibly large) buffered bytes are reported as a
    /// fill/capacity pair and the pending links as a compact per-depth summary.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let buffered = self.block_buffer.len();
        let capacity = self.block_buffer.capacity();
        let links = LinkFormatter(self.unflushed_links.as_slice());
        write!(
            f,
            "FileAdder {{ chunker: {:?}, block_buffer: {}/{}, unflushed_links: {} }}",
            self.chunker, buffered, capacity, links,
        )
    }
}
/// Renders a slice of links as a run-length summary of their depths, e.g.
/// `[0: 3/1: 2]` meaning three links of depth 0 followed by two of depth 1.
/// Used by the `FileAdder` Debug impl and a `flush_links` debug assertion.
struct LinkFormatter<'a>(&'a [Link]);

impl fmt::Display for LinkFormatter<'_> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut iter = self.0.iter().peekable();

        write!(fmt, "[")?;

        // Depth of the run currently being counted; an empty slice prints "[]".
        let mut current = match iter.peek() {
            Some(Link { depth, .. }) => depth,
            None => return write!(fmt, "]"),
        };

        let mut count = 0;

        // The peeked element is also yielded by the loop, so the first run is
        // counted starting from zero.
        for Link {
            depth: next_depth, ..
        } in iter
        {
            if current == next_depth {
                count += 1;
            } else {
                // Depth changed: emit the finished run...
                write!(fmt, "{}: {}/", current, count)?;

                // ...and a "0/" placeholder for every depth skipped between the
                // two runs (steps_between is >= 1 here since depths differ).
                let steps_between = if current > next_depth {
                    current - next_depth
                } else {
                    next_depth - current
                };
                for _ in 0..steps_between - 1 {
                    write!(fmt, "0/")?;
                }
                count = 1;
                current = next_depth;
            }
        }

        // Close off the final (possibly only) run.
        write!(fmt, "{}: {}]", current, count)
    }
}
/// A link to a rendered block, waiting in `FileAdder::unflushed_links` to be
/// collected into a parent node.
struct Link {
    // Tree level: 0 for leaf (content) blocks, +1 for each parent level above.
    depth: usize,
    // Cid of the rendered block this link points to.
    target: Cid,
    // Encoded size of the target block plus all of its descendants; becomes the
    // PBLink `Tsize` when this link is partitioned into a parent (see partition_link).
    total_size: u64,
    // Number of file content bytes covered by this subtree; becomes an entry of
    // the parent's `blocksizes`.
    file_size: u64,
}
impl fmt::Debug for Link {
    /// Hand-written so `target` renders through the Cid's Display impl instead of
    /// its Debug representation; output is otherwise a normal debug struct.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Link");
        builder.field("depth", &self.depth);
        builder.field("target", &format_args!("{}", self.target));
        builder.field("total_size", &self.total_size);
        builder.field("file_size", &self.file_size);
        builder.finish()
    }
}
/// Builder for [`FileAdder`]; obtained through `FileAdder::builder()`.
#[derive(Default)]
pub struct FileAdderBuilder {
    // Chunking strategy for the adder being built; defaults per `Chunker::default`.
    chunker: Chunker,
    // Collection strategy for the adder being built; defaults per `Collector::default`.
    collector: Collector,
}
impl FileAdderBuilder {
    /// Sets the chunker used to split incoming bytes into leaf blocks.
    pub fn with_chunker(self, chunker: Chunker) -> Self {
        let mut next = self;
        next.chunker = chunker;
        next
    }

    /// Sets the collector used to combine links into the file tree.
    pub fn with_collector(self, collector: impl Into<Collector>) -> Self {
        let mut next = self;
        next.collector = collector.into();
        next
    }

    /// Consumes the builder, producing a [`FileAdder`] with empty buffers.
    pub fn build(self) -> FileAdder {
        FileAdder {
            chunker: self.chunker,
            collector: self.collector,
            ..Default::default()
        }
    }
}
impl FileAdder {
    /// Returns a [`FileAdderBuilder`] for customizing the chunker and collector.
    pub fn builder() -> FileAdderBuilder {
        FileAdderBuilder::default()
    }

    /// Preferred number of bytes per [`FileAdder::push`] call: the chunker's
    /// maximum chunk size.
    pub fn size_hint(&self) -> usize {
        self.chunker.size_hint()
    }

    /// Feeds `input` to the adder.
    ///
    /// Returns an iterator over the blocks that became ready (the leaf first,
    /// followed by any completed parent blocks) and the number of bytes consumed
    /// from `input`. Callers should loop until all bytes are consumed, then call
    /// [`FileAdder::finish`].
    pub fn push(&mut self, input: &[u8]) -> (impl Iterator<Item = (Cid, Vec<u8>)>, usize) {
        let (accepted, ready) = self.chunker.accept(input, &self.block_buffer);

        if self.block_buffer.is_empty() && ready {
            // Fast path: a full chunk arrived while nothing was buffered, so the
            // leaf can be rendered straight from `accepted` without copying it
            // into the buffer first. (The previous version also cleared the
            // already-empty buffer here; that no-op has been removed.)
            let leaf = Self::flush_buffered_leaf(accepted, &mut self.unflushed_links, false);
            assert!(leaf.is_some(), "chunk completed, must produce a new block");
            let links = self.flush_buffered_links(false);
            (leaf.into_iter().chain(links), accepted.len())
        } else {
            // Slow path: accumulate accepted bytes until a chunk boundary.
            if self.block_buffer.capacity() == 0 {
                // Delay the (possibly large) allocation until first use.
                self.block_buffer.reserve(self.size_hint());
            }
            self.block_buffer.extend_from_slice(accepted);
            let written = accepted.len();

            let (leaf, links) = if !ready {
                // No chunk boundary reached yet; nothing to emit.
                (None, Vec::new())
            } else {
                let leaf = Self::flush_buffered_leaf(
                    self.block_buffer.as_slice(),
                    &mut self.unflushed_links,
                    false,
                );
                assert!(leaf.is_some(), "chunk completed, must produce a new block");
                self.block_buffer.clear();
                let links = self.flush_buffered_links(false);
                (leaf, links)
            };
            (leaf.into_iter().chain(links), written)
        }
    }

    /// Flushes any buffered bytes and all partially-filled tree levels, returning
    /// the remaining blocks. The last block yielded is the root of the file.
    pub fn finish(mut self) -> impl Iterator<Item = (Cid, Vec<u8>)> {
        // `as_slice()` already yields `&[u8]`; the previous extra `&` relied on
        // deref coercion for no benefit.
        let last_leaf = Self::flush_buffered_leaf(
            self.block_buffer.as_slice(),
            &mut self.unflushed_links,
            true,
        );
        let root_links = self.flush_buffered_links(true);
        last_leaf.into_iter().chain(root_links)
    }

    /// Renders `input` as a UnixFs File leaf block and records a depth-0 link to it.
    ///
    /// Returns `None` when `input` is empty, except when `finishing` a file that
    /// produced no other blocks: an empty file is still represented by a single
    /// empty File node (with `Data: None`).
    fn flush_buffered_leaf(
        input: &[u8],
        unflushed_links: &mut Vec<Link>,
        finishing: bool,
    ) -> Option<(Cid, Vec<u8>)> {
        if input.is_empty() && (!finishing || !unflushed_links.is_empty()) {
            return None;
        }

        // Empty files carry `Data: None` rather than an empty byte string.
        let data = if input.is_empty() {
            None
        } else {
            Some(Cow::Borrowed(input))
        };
        let filesize = Some(input.len() as u64);

        let inner = FlatUnixFs {
            links: Vec::new(),
            data: UnixFs {
                Type: UnixFsType::File,
                Data: data,
                filesize,
                ..Default::default()
            },
        };

        let (cid, vec) = render_and_hash(&inner);

        // For a leaf, the cumulative total size is just the encoded block itself.
        let total_size = vec.len();
        unflushed_links.push(Link {
            depth: 0,
            target: cid.clone(),
            total_size: total_size as u64,
            file_size: input.len() as u64,
        });

        Some((cid, vec))
    }

    /// Delegates combining of pending links into parent blocks to the collector.
    fn flush_buffered_links(&mut self, finishing: bool) -> Vec<(Cid, Vec<u8>)> {
        self.collector
            .flush_links(&mut self.unflushed_links, finishing)
    }

    /// Test helper: pushes `all_content` in slices of at most `amt` bytes (`0`
    /// meaning everything at once) and returns every produced block in order.
    #[cfg(test)]
    fn collect_blocks(mut self, all_content: &[u8], mut amt: usize) -> Vec<(Cid, Vec<u8>)> {
        let mut written = 0;
        let mut blocks_received = Vec::new();

        if amt == 0 {
            amt = all_content.len();
        }

        while written < all_content.len() {
            let end = written + (all_content.len() - written).min(amt);
            let (blocks, pushed) = self.push(&all_content[written..end]);
            blocks_received.extend(blocks);
            written += pushed;
        }

        blocks_received.extend(self.finish());
        blocks_received
    }
}
/// Serializes `flat` as a protobuf message and derives its CIDv0 from the
/// sha2-256 multihash of the encoded bytes. Returns both the cid and the bytes.
fn render_and_hash(flat: &FlatUnixFs<'_>) -> (Cid, Vec<u8>) {
    // get_size gives the exact encoded length, so one allocation suffices.
    let mut rendered = Vec::with_capacity(flat.get_size());
    flat.write_message(&mut Writer::new(&mut rendered))
        .expect("unsure how this could fail");
    let digest = Sha256::digest(&rendered);
    let mh = multihash::wrap(multihash::Code::Sha2_256, &digest);
    let cid = Cid::new_v0(mh).expect("sha2_256 is the correct multihash for cidv0");
    (cid, rendered)
}
#[derive(Debug, Clone)]
pub enum Chunker {
Size(usize),
}
impl std::default::Default for Chunker {
fn default() -> Self {
Chunker::Size(256 * 1024)
}
}
impl Chunker {
fn accept<'a>(&mut self, input: &'a [u8], buffered: &[u8]) -> (&'a [u8], bool) {
use Chunker::*;
match self {
Size(max) => {
let l = input.len().min(*max - buffered.len());
let accepted = &input[..l];
let ready = buffered.len() + l >= *max;
(accepted, ready)
}
}
}
fn size_hint(&self) -> usize {
use Chunker::*;
match self {
Size(max) => *max,
}
}
}
/// Strategy for combining pending links into parent blocks of the file tree.
#[derive(Debug, Clone)]
pub enum Collector {
    /// Builds a balanced tree with a fixed branching factor.
    Balanced(BalancedCollector),
}

impl std::default::Default for Collector {
    fn default() -> Self {
        Collector::Balanced(Default::default())
    }
}

impl Collector {
    /// Dispatches to the concrete collector implementation.
    fn flush_links(&mut self, pending: &mut Vec<Link>, finishing: bool) -> Vec<(Cid, Vec<u8>)> {
        // Single-variant enum: the pattern is irrefutable.
        let Collector::Balanced(bc) = self;
        bc.flush_links(pending, finishing)
    }
}
/// Collector producing a balanced tree: each parent node links to at most
/// `branching_factor` children.
#[derive(Clone)]
pub struct BalancedCollector {
    // Maximum number of links per parent block; asserted > 0 at construction.
    branching_factor: usize,
    // Scratch vector reused across flush_links calls to avoid reallocating the
    // per-node PBLink list.
    reused_links: Vec<PBLink<'static>>,
    // Scratch vector reused across flush_links calls for the per-node blocksizes.
    reused_blocksizes: Vec<u64>,
}
impl fmt::Debug for BalancedCollector {
    /// Hand-written so only the configuration is shown; the reused scratch
    /// buffers are an implementation detail.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!(
            "BalancedCollector {{ branching_factor: {} }}",
            self.branching_factor
        ))
    }
}
impl std::default::Default for BalancedCollector {
fn default() -> Self {
Self::with_branching_factor(174)
}
}
impl From<BalancedCollector> for Collector {
    /// Allows passing a `BalancedCollector` directly to
    /// `FileAdderBuilder::with_collector`.
    fn from(bc: BalancedCollector) -> Self {
        Collector::Balanced(bc)
    }
}
impl BalancedCollector {
    /// Creates a collector linking at most `branching_factor` children per node.
    ///
    /// # Panics
    ///
    /// Panics if `branching_factor` is zero.
    pub fn with_branching_factor(branching_factor: usize) -> Self {
        assert!(branching_factor > 0);

        Self {
            branching_factor,
            reused_links: Vec::new(),
            reused_blocksizes: Vec::new(),
        }
    }

    /// Combines full groups of `branching_factor` same-depth links from `pending`
    /// into new parent blocks, rewriting `pending` in place. When `finishing`,
    /// repeats level by level until only the single root link remains; otherwise
    /// stops as soon as a level has too few links to fill a whole node. Returns
    /// the rendered parent blocks in production order.
    fn flush_links(&mut self, pending: &mut Vec<Link>, finishing: bool) -> Vec<(Cid, Vec<u8>)> {
        let mut ret = Vec::new();

        // Borrow the scratch buffers for the duration of the call; they are
        // handed back (round-tripped through each FlatUnixFs) at the end.
        let mut reused_links = std::mem::take(&mut self.reused_links);
        let mut reused_blocksizes = std::mem::take(&mut self.reused_blocksizes);

        // Grow the scratch buffers to branching_factor capacity once, so the
        // per-node loop below never reallocates. checked_sub returns None (skip)
        // when the capacity is already sufficient.
        if let Some(need) = self.branching_factor.checked_sub(reused_links.capacity()) {
            reused_links.reserve(need);
        }

        if let Some(need) = self
            .branching_factor
            .checked_sub(reused_blocksizes.capacity())
        {
            reused_blocksizes.reserve(need);
        }

        'outer: for level in 0.. {
            // Done when finishing has reduced everything to a single root link,
            // or when a non-finishing flush cannot fill a node at any level.
            if pending.len() == 1 && finishing
                || pending.len() <= self.branching_factor && !finishing
            {
                break;
            }

            // Scan position within `pending` for the current level.
            let mut starting_point = 0;
            // Index of the last slot overwritten with a newly created parent
            // link; the consumed children between overwrites are compacted away.
            let mut last_overwrite = None;

            while let Some(mut first_at) = &pending[starting_point..]
                .iter()
                .position(|Link { depth, .. }| depth == &level)
            {
                // `position` was relative to the subslice; make it absolute.
                first_at += starting_point;

                if !finishing && pending[first_at..].len() <= self.branching_factor {
                    // Not enough links left at this depth to fill a node; compact
                    // what was already consumed and keep the tail for later.
                    if let Some(last_overwrite) = last_overwrite {
                        pending.drain((last_overwrite + 1)..first_at);
                    }
                    break 'outer;
                }

                reused_links.clear();
                reused_blocksizes.clear();

                // Accumulators for the new parent: total content bytes covered
                // and total encoded size of all descendants.
                let mut nested_size = 0;
                let mut nested_total_size = 0;

                // Consume up to branching_factor links starting at first_at.
                let last = (first_at + self.branching_factor).min(pending.len());

                for (index, link) in pending[first_at..last].iter().enumerate() {
                    assert_eq!(
                        link.depth,
                        level,
                        "unexpected link depth {} when searching at level {} index {}",
                        link.depth,
                        level,
                        index + first_at
                    );

                    Self::partition_link(
                        link,
                        &mut reused_links,
                        &mut reused_blocksizes,
                        &mut nested_size,
                        &mut nested_total_size,
                    );
                }

                debug_assert_eq!(reused_links.len(), reused_blocksizes.len());

                // The scratch vectors are moved into the node for rendering and
                // reclaimed right after render_and_hash.
                let inner = FlatUnixFs {
                    links: reused_links,
                    data: UnixFs {
                        Type: UnixFsType::File,
                        filesize: Some(nested_size),
                        blocksizes: reused_blocksizes,
                        ..Default::default()
                    },
                };

                let (cid, vec) = render_and_hash(&inner);

                // Overwrite the slot right after the previous overwrite (or the
                // first consumed slot) with the new parent link, one level up.
                let index = last_overwrite.map(|i| i + 1).unwrap_or(first_at);

                pending[index] = Link {
                    depth: level + 1,
                    target: cid.clone(),
                    total_size: nested_total_size + vec.len() as u64,
                    file_size: nested_size,
                };

                ret.push((cid, vec));

                // Reclaim the scratch vectors moved into `inner` above.
                reused_links = inner.links;
                reused_blocksizes = inner.data.blocksizes;

                starting_point = last;
                last_overwrite = Some(index);
            }

            if let Some(last_overwrite) = last_overwrite {
                // Drop the consumed child links following the last parent created.
                pending.truncate(last_overwrite + 1);
            }

            // After a full pass, no link of the current depth may remain.
            debug_assert_eq!(
                pending.iter().position(|l| l.depth == level),
                None,
                "should have no more of depth {}: {}",
                level,
                LinkFormatter(pending.as_slice())
            );
        }

        // Hand the scratch buffers back for the next call.
        self.reused_links = reused_links;
        self.reused_blocksizes = reused_blocksizes;

        ret
    }

    /// Appends `link` as a child of the in-progress parent node: pushes the
    /// PBLink and blocksize entries and accumulates the running size counters.
    fn partition_link(
        link: &Link,
        links: &mut Vec<PBLink<'static>>,
        blocksizes: &mut Vec<u64>,
        nested_size: &mut u64,
        nested_total_size: &mut u64,
    ) {
        links.push(PBLink {
            Hash: Some(link.target.to_bytes().into()),
            Name: Some("".into()),
            Tsize: Some(link.total_size),
        });
        blocksizes.push(link.file_size);
        *nested_size += link.file_size;
        *nested_total_size += link.total_size;
    }
}
#[cfg(test)]
mod tests {
    use super::{BalancedCollector, Chunker, FileAdder};
    use crate::test_support::FakeBlockstore;
    use cid::Cid;
    use hex_literal::hex;
    use std::convert::TryFrom;

    // Exercises Chunker::Size boundaries: (max, input_len, existing_len) =>
    // (accepted_len, ready).
    #[test]
    fn test_size_chunker() {
        assert_eq!(size_chunker_scenario(1, 4, 0), (1, true));
        assert_eq!(size_chunker_scenario(2, 4, 0), (2, true));
        assert_eq!(size_chunker_scenario(2, 1, 0), (1, false));
        assert_eq!(size_chunker_scenario(2, 1, 1), (1, true));
        // exactly at boundary
        assert_eq!(size_chunker_scenario(32, 3, 29), (3, true));
        // when going over the boundary, only the fitting prefix is accepted
        assert_eq!(size_chunker_scenario(32, 4, 29), (3, true));
    }

    // Helper: runs accept() once with `input_len` fresh bytes on top of
    // `existing_len` already-buffered bytes.
    fn size_chunker_scenario(max: usize, input_len: usize, existing_len: usize) -> (usize, bool) {
        let input = vec![0; input_len];
        let existing = vec![0; existing_len];

        let (accepted, ready) = Chunker::Size(max).accept(&input, &existing);

        (accepted.len(), ready)
    }

    // A file smaller than a single chunk produces exactly one block, matching
    // the fixture produced by go-ipfs.
    #[test]
    fn favourite_single_block_file() {
        let blocks = FakeBlockstore::with_fixtures();
        // everyones favourite content
        let content = b"foobar\n";

        let mut adder = FileAdder::default();

        {
            let (mut ready_blocks, bytes) = adder.push(content);
            // content is below the default chunk size, so nothing is ready yet
            assert!(ready_blocks.next().is_none());
            assert_eq!(bytes, content.len());
        }

        // real impl would probably hash this ... except maybe hashing is faster
        // when done inline? or maybe not
        let (_, file_block) = adder
            .finish()
            .next()
            .expect("there must have been the root block");

        assert_eq!(
            blocks.get_by_str("QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL"),
            file_block.as_slice()
        );
    }

    // With a 2-byte chunker, "foobar\n" becomes four leaves plus a root; all
    // five blocks (and their order) must match the fixtures.
    #[test]
    fn favourite_multi_block_file() {
        let blocks = FakeBlockstore::with_fixtures();
        let content = b"foobar\n";
        let adder = FileAdder::builder().with_chunker(Chunker::Size(2)).build();

        let blocks_received = adder.collect_blocks(content, 0);

        // the leaf blocks in order, root last
        let expected = [
            "QmfVyMoStzTvdnUR7Uotzh82gmL427q9z3xW5Y8fUoszi4",
            "QmdPyW4CWE3QBkgjWfjM5f7Tjb3HukxVuBXZtkqAGwsMnm",
            "QmNhDQpphvMWhdCzP74taRzXDaEfPGq8vWfFRzD7mEgePM",
            "Qmc5m94Gu7z62RC8waSKkZUrCCBJPyHbkpmGzEePxy2oXJ",
            "QmRJHYTNvC3hmd9gJQARxLR1QMEincccBV53bBw524yyq6",
        ]
        .iter()
        .map(|key| {
            let cid = Cid::try_from(*key).unwrap();
            let block = blocks.get_by_str(key).to_vec();
            (cid, block)
        })
        .collect::<Vec<_>>();

        assert_eq!(blocks_received, expected);
    }

    // 1-byte chunks force more content links than the branching factor (174),
    // yielding a three-level tree; only the root cid is asserted.
    #[test]
    fn three_layers() {
        let content = b"Lorem ipsum dolor sit amet, sit enim montes aliquam. Cras non lorem, \
rhoncus condimentum, irure et ante. Pulvinar suscipit odio ante, et tellus a enim, \
wisi ipsum, vel rhoncus eget faucibus varius, luctus turpis nibh vel odio nulla pede.";

        // sanity check that the content actually spans 175..348 bytes
        assert!(content.len() > 174 && content.len() < 2 * 174);

        let adder = FileAdder::builder().with_chunker(Chunker::Size(1)).build();
        let blocks_received = adder.collect_blocks(content, 0);

        assert_eq!(blocks_received.len(), 240);

        assert_eq!(
            blocks_received.last().unwrap().0.to_string(),
            "QmRQ6NZNUs4JrCT2y7tmCC1wUhjqYuTssB8VXbbN3rMffg"
        );
    }

    // The root cid must not depend on how the input is sliced across push calls.
    #[test]
    fn three_layers_all_subchunks() {
        let content = b"Lorem ipsum dolor sit amet, sit enim montes aliquam. Cras non lorem, \
rhoncus condimentum, irure et ante. Pulvinar suscipit odio ante, et tellus a enim, \
wisi ipsum, vel rhoncus eget faucibus varius, luctus turpis nibh vel odio nulla pede.";

        for amt in 1..32 {
            let adder = FileAdder::builder().with_chunker(Chunker::Size(32)).build();
            let blocks_received = adder.collect_blocks(content, amt);
            assert_eq!(
                blocks_received.last().unwrap().0.to_string(),
                "QmYSLcVQqxKygiq7x9w1XGYxU29EShB8ZemiaQ8GAAw17h",
                "amt: {}",
                amt
            );
        }
    }

    // An empty file still produces a single (empty) UnixFs File block.
    #[test]
    fn empty_file() {
        let blocks = FileAdder::default().collect_blocks(b"", 0);
        assert_eq!(blocks.len(), 1);
        // 0a == field dag-pb body (unixfs)
        // 04 == dag-pb body len, varint, 4 bytes
        // 08 == field type tag, varint, last 3 bits == field number == 1
        // 02 == field type (File)
        // 18 == field filesize tag, varint
        // 00 == filesize, varint, 0
        assert_eq!(blocks[0].1.as_slice(), &hex!("0a 04 08 02 18 00"));
        assert_eq!(
            blocks[0].0.to_string(),
            "QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH"
        );
    }

    // One more byte than a full link block: the overflow leaf forces an extra
    // level with two inner nodes under a new root.
    #[test]
    fn full_link_block_and_a_byte() {
        let buf = vec![0u8; 2];

        // this should produce a root with two links
        //         +----------^---+
        //         |              |
        //  |----------------| |-|
        //  |        174 x 2 | | 1
        //  |----------------| |-|

        let branching_factor = 174;

        let mut adder = FileAdder::builder()
            .with_chunker(Chunker::Size(2))
            .with_collector(BalancedCollector::with_branching_factor(branching_factor))
            .build();
        let mut blocks_count = 0;

        for _ in 0..branching_factor {
            let (blocks, written) = adder.push(buf.as_slice());
            assert_eq!(written, buf.len());
            blocks_count += blocks.count();
        }

        let (blocks, written) = adder.push(&buf[0..1]);
        assert_eq!(written, 1);
        blocks_count += blocks.count();

        let last_blocks = adder.finish().collect::<Vec<_>>();
        blocks_count += last_blocks.len();

        // chunks == 174
        // one inner block
        // one final chunk
        // one inner block
        // one root block
        assert_eq!(blocks_count, branching_factor + 1 + 1 + 1 + 1);

        assert_eq!(
            last_blocks.last().unwrap().0.to_string(),
            "QmcHNWF1d56uCDSfJPA7t9fadZRV9we5HGSTGSmwuqmMP9"
        );
    }

    // Exactly a full link block: finish() must emit only the single inner/root
    // block, nothing more.
    #[test]
    fn full_link_block() {
        let buf = vec![0u8; 1];

        let branching_factor = 174;

        let mut adder = FileAdder::builder()
            .with_chunker(Chunker::Size(1))
            .with_collector(BalancedCollector::with_branching_factor(branching_factor))
            .build();
        let mut blocks_count = 0;

        for _ in 0..branching_factor {
            let (blocks, written) = adder.push(buf.as_slice());
            assert_eq!(written, buf.len());
            blocks_count += blocks.count();
        }

        let mut last_blocks = adder.finish();

        // full link block produces exactly one block on finish
        let last_block = last_blocks.next().expect("must not have flushed yet");
        blocks_count += 1;
        assert_eq!(last_blocks.next(), None);

        assert_eq!(
            last_block.0.to_string(),
            "QmdgQac8c6Bo3MP5bHAg2yQ25KebFUsmkZFvyByYzf8UCB"
        );

        // 174 leaves + the root
        assert_eq!(blocks_count, 175);
    }
}