use super::{
AUTHENTICATION_TAG_SIZE, BLOCK_SEGMENT_DSI, HIDING_SEGMENT_DSI, NONCE_SIZE,
PrivateFileContentSerializable, PrivateNode, PrivateNodeContentSerializable, PrivateNodeHeader,
PrivateRef, SnapshotKey, TemporalKey, encrypted::Encrypted, forest::traits::PrivateForest,
};
use crate::{
WNFS_VERSION, error::FsError, is_readable_wnfs_version, traits::Id, utils::OnceCellDebug,
};
use anyhow::{Result, bail};
use async_once_cell::OnceCell;
use async_stream::try_stream;
use chrono::{DateTime, Utc};
use futures::{AsyncRead, Stream, StreamExt, TryStreamExt, future};
use ipld_core::{
ipld::Ipld,
serde::{from_ipld, to_ipld},
};
use rand_core::CryptoRngCore;
use serde::{Deserialize, Serialize};
use std::{cmp::Ordering, collections::BTreeSet, iter};
use wnfs_common::{
BlockStore, CODEC_RAW, Cid, MAX_BLOCK_SIZE, Metadata,
utils::{self, Arc, BoxStream},
};
use wnfs_nameaccumulator::{Name, NameAccumulator, NameSegment};
/// Maximum number of plaintext bytes that fit into a single stored block:
/// a full block minus the space reserved for the cipher nonce and the
/// authentication tag.
pub const MAX_BLOCK_CONTENT_SIZE: usize = MAX_BLOCK_SIZE - NONCE_SIZE - AUTHENTICATION_TAG_SIZE;
/// A file in the private file system.
///
/// The `header` carries the node's name/ratchet state (used to derive keys),
/// while `content` carries the metadata and the actual file bytes.
#[derive(Debug, Clone, PartialEq)]
pub struct PrivateFile {
    /// Header holding the node's name and key-derivation state.
    pub header: PrivateNodeHeader,
    /// The revision's metadata, previous-revision links and file content.
    pub(crate) content: PrivateFileContent,
}
/// The revision-scoped payload of a [`PrivateFile`].
pub(crate) struct PrivateFileContent {
    /// Caches the CID this content was last serialized and stored under,
    /// so repeated `store` calls don't re-encrypt/re-upload.
    pub(crate) persisted_as: OnceCell<Cid>,
    /// Links to previous revisions as pairs of
    /// (ratchet revisions back, encrypted CID of that revision's content).
    pub(crate) previous: BTreeSet<(usize, Encrypted<Cid>)>,
    /// File metadata (e.g. timestamps).
    pub(crate) metadata: Metadata,
    /// The file bytes, either inline or externalized into the forest.
    pub(crate) content: FileContent,
}
/// Where a file's bytes live.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[allow(clippy::large_enum_variant)]
pub(crate) enum FileContent {
    /// Bytes stored directly inside the file node.
    Inline { data: Vec<u8> },
    /// Bytes chunked, encrypted and stored as separate blocks in the
    /// private forest; this variant only holds the pointers/key.
    External(PrivateForestContent),
}
/// A reference to encrypted, chunked content stored in a private forest.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PrivateForestContent {
    /// Key used to encrypt/decrypt every content block.
    pub(crate) key: SnapshotKey,
    /// Accumulated base name; each block's name is derived from it.
    pub(crate) base_name: NameAccumulator,
    /// Number of blocks the content was split into.
    pub(crate) block_count: u64,
    /// Plaintext capacity of each block (the last block may hold fewer bytes).
    pub(crate) block_content_size: u64,
}
/// Serialization wrapper that tags [`PrivateForestContent`] values when they
/// are embedded inside metadata, so they can be recognized on decode.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
enum MetadataContentCapsule<T> {
    PrivateForestContent(T),
}
impl PrivateFile {
    /// Creates an empty file under `parent_name`, with its metadata
    /// timestamps initialized to `time`.
    pub fn new(parent_name: &Name, time: DateTime<Utc>, rng: &mut impl CryptoRngCore) -> Self {
        Self {
            header: PrivateNodeHeader::new(parent_name, rng),
            content: PrivateFileContent {
                persisted_as: OnceCell::new(),
                metadata: Metadata::new(time),
                previous: BTreeSet::new(),
                content: FileContent::Inline { data: vec![] },
            },
        }
    }

    /// Like [`Self::new`], but returns the file wrapped in an [`Arc`].
    pub fn new_rc(
        parent_name: &Name,
        time: DateTime<Utc>,
        rng: &mut impl CryptoRngCore,
    ) -> Arc<Self> {
        Arc::new(Self::new(parent_name, time, rng))
    }

    /// Creates a file from `content`, externalizing the bytes into the
    /// private forest as encrypted blocks.
    ///
    /// # Errors
    /// Fails if encrypting or storing the content blocks fails.
    pub async fn with_content(
        parent_name: &Name,
        time: DateTime<Utc>,
        content: Vec<u8>,
        forest: &mut impl PrivateForest,
        store: &impl BlockStore,
        rng: &mut impl CryptoRngCore,
    ) -> Result<Self> {
        let header = PrivateNodeHeader::new(parent_name, rng);
        let content = Self::prepare_content(header.get_name(), content, forest, store, rng).await?;
        Ok(Self {
            header,
            content: PrivateFileContent {
                persisted_as: OnceCell::new(),
                metadata: Metadata::new(time),
                previous: BTreeSet::new(),
                content,
            },
        })
    }

    /// Like [`Self::with_content`], but returns the file wrapped in an [`Arc`].
    pub async fn with_content_rc(
        parent_name: &Name,
        time: DateTime<Utc>,
        content: Vec<u8>,
        forest: &mut impl PrivateForest,
        store: &impl BlockStore,
        rng: &mut impl CryptoRngCore,
    ) -> Result<Arc<Self>> {
        Ok(Arc::new(
            Self::with_content(parent_name, time, content, forest, store, rng).await?,
        ))
    }

    /// Creates a file by streaming `content` into encrypted blocks, so the
    /// whole payload never has to be buffered in memory at once.
    ///
    /// # Errors
    /// Fails if reading the stream or storing the content blocks fails.
    pub async fn with_content_streaming(
        parent_name: &Name,
        time: DateTime<Utc>,
        content: impl AsyncRead + Unpin,
        forest: &mut impl PrivateForest,
        store: &impl BlockStore,
        rng: &mut impl CryptoRngCore,
    ) -> Result<Self> {
        let header = PrivateNodeHeader::new(parent_name, rng);
        let content =
            Self::prepare_content_streaming(header.get_name(), content, forest, store, rng).await?;
        Ok(Self {
            header,
            content: PrivateFileContent {
                persisted_as: OnceCell::new(),
                metadata: Metadata::new(time),
                previous: BTreeSet::new(),
                content,
            },
        })
    }

    /// Like [`Self::with_content_streaming`], but returns the file wrapped in
    /// an [`Arc`].
    pub async fn with_content_streaming_rc(
        parent_name: &Name,
        time: DateTime<Utc>,
        content: impl AsyncRead + Unpin,
        forest: &mut impl PrivateForest,
        store: &impl BlockStore,
        rng: &mut impl CryptoRngCore,
    ) -> Result<Arc<Self>> {
        Ok(Arc::new(
            Self::with_content_streaming(parent_name, time, content, forest, store, rng).await?,
        ))
    }

    /// Copies the content reference (not the revision history) from `other`
    /// into this file, and bumps this file's modification time to `time`.
    pub fn copy_content_from(&mut self, other: &Self, time: DateTime<Utc>) {
        self.content.metadata.upsert_mtime(time);
        self.content.content = other.content.content.clone();
    }

    /// Streams the file's content chunk by chunk, starting at `block_index`.
    ///
    /// For inline content only block index 0 exists; any other index yields a
    /// [`FsError::FileShardNotFound`] error item.
    pub fn stream_content<'a>(
        &'a self,
        block_index: u64,
        forest: &'a impl PrivateForest,
        store: &'a impl BlockStore,
    ) -> BoxStream<'a, Result<Vec<u8>>> {
        match &self.content.content {
            FileContent::Inline { data } => Box::pin(try_stream! {
                if block_index != 0 {
                    Err(FsError::FileShardNotFound)?
                }
                yield data.clone()
            }),
            FileContent::External(content) => Box::pin(content.stream(block_index, forest, store)),
        }
    }

    /// Reads up to `len_limit` bytes starting at `byte_offset`.
    ///
    /// Returns fewer bytes (possibly none) when the requested range extends
    /// past the end of the file.
    pub async fn read_at<'a>(
        &'a self,
        byte_offset: u64,
        len_limit: Option<usize>,
        forest: &'a impl PrivateForest,
        store: &'a impl BlockStore,
    ) -> Result<Vec<u8>> {
        match &self.content.content {
            FileContent::Inline { data } => {
                // Clamp the requested range to the available bytes so an
                // out-of-range offset or length produces a short read instead
                // of a slice-index panic, matching the external-content
                // branch below (which also never reads past the end).
                let offset = (byte_offset as usize).min(data.len());
                let end = match len_limit {
                    Some(len) => data.len().min(offset.saturating_add(len)),
                    None => data.len(),
                };
                Ok(data[offset..end].to_vec())
            }
            FileContent::External(external) => {
                external
                    .read_at(byte_offset, len_limit, forest, store)
                    .await
            }
        }
    }

    /// Returns the file's metadata.
    pub fn get_metadata(&self) -> &Metadata {
        &self.content.metadata
    }

    /// Returns a mutable reference to the file's metadata.
    pub fn get_metadata_mut(&mut self) -> &mut Metadata {
        &mut self.content.metadata
    }

    /// Returns a mutable reference to the metadata, first advancing the file
    /// to its next revision if the current one was already persisted.
    pub fn get_metadata_mut_rc(self: &mut Arc<Self>) -> Result<&mut Metadata> {
        Ok(self.prepare_next_revision()?.get_metadata_mut())
    }

    /// Returns the size of the file content in bytes.
    pub async fn size(&self, forest: &impl PrivateForest, store: &impl BlockStore) -> Result<u64> {
        match &self.content.content {
            FileContent::Inline { data } => Ok(data.len() as u64),
            FileContent::External(forest_content) => forest_content.size(forest, store).await,
        }
    }

    /// Reads the entire file content into memory.
    pub async fn get_content(
        &self,
        forest: &impl PrivateForest,
        store: &impl BlockStore,
    ) -> Result<Vec<u8>> {
        self.read_at(0, None, forest, store).await
    }

    /// Replaces the file's content by streaming `content` into new encrypted
    /// blocks, and bumps the modification time to `time`.
    pub async fn set_content(
        &mut self,
        content: impl AsyncRead + Unpin,
        time: DateTime<Utc>,
        forest: &mut impl PrivateForest,
        store: &impl BlockStore,
        rng: &mut impl CryptoRngCore,
    ) -> Result<()> {
        self.content.metadata.upsert_mtime(time);
        self.content.content =
            Self::prepare_content_streaming(self.header.get_name(), content, forest, store, rng)
                .await?;
        Ok(())
    }

    /// Externalizes in-memory `content` into the forest, returning the
    /// resulting [`FileContent::External`] reference.
    pub(super) async fn prepare_content(
        file_name: &Name,
        content: Vec<u8>,
        forest: &mut impl PrivateForest,
        store: &impl BlockStore,
        rng: &mut impl CryptoRngCore,
    ) -> Result<FileContent> {
        Ok(FileContent::External(
            PrivateForestContent::new(file_name, content, forest, store, rng).await?,
        ))
    }

    /// Externalizes streamed `content` into the forest, returning the
    /// resulting [`FileContent::External`] reference.
    pub(super) async fn prepare_content_streaming(
        file_name: &Name,
        content: impl AsyncRead + Unpin,
        forest: &mut impl PrivateForest,
        store: &impl BlockStore,
        rng: &mut impl CryptoRngCore,
    ) -> Result<FileContent> {
        Ok(FileContent::External(
            PrivateForestContent::new_streaming(file_name, content, forest, store, rng).await?,
        ))
    }

    /// Prepares this file for in-place mutation.
    ///
    /// If the current revision was never persisted, it can be mutated
    /// directly. Otherwise the ratchet is advanced and a `previous` link
    /// (one revision back) to the persisted CID is recorded.
    pub(crate) fn prepare_next_revision(self: &mut Arc<Self>) -> Result<&mut Self> {
        let previous_cid = match self.content.persisted_as.get() {
            Some(cid) => *cid,
            None => {
                // Not persisted yet: the current revision is still mutable.
                return Ok(Arc::make_mut(self));
            }
        };
        let temporal_key = self.header.derive_temporal_key();
        let previous_link = (1, Encrypted::from_value(previous_cid, &temporal_key)?);
        let cloned = Arc::make_mut(self);
        // Clear the persistence cache — this is a brand-new revision.
        cloned.content.persisted_as = OnceCell::new();
        cloned.content.previous = [previous_link].into_iter().collect();
        cloned.header.advance_ratchet();
        Ok(cloned)
    }

    /// Prepares this file for a merge onto `target_header`'s revision.
    ///
    /// If this node is already a merge node (more than one `previous` link),
    /// the links are re-based by `ratchet_diff`; otherwise a single link to
    /// `current_cid` is recorded.
    pub(crate) fn prepare_next_merge(
        self: &mut Arc<Self>,
        current_cid: Cid,
        target_header: PrivateNodeHeader,
    ) -> Result<&mut Self> {
        let ratchet_diff = target_header.ratchet_diff_for_merge(&self.header)?;
        if self.content.previous.len() > 1 {
            // This is a merge node: shift all existing links by the ratchet
            // difference instead of adding a new one.
            let cloned = Arc::make_mut(self);
            cloned.content.persisted_as = OnceCell::new();
            cloned.header = target_header;
            cloned.content.previous = std::mem::take(&mut cloned.content.previous)
                .into_iter()
                .map(|(ratchet_steps, link)| (ratchet_steps + ratchet_diff, link))
                .collect();
            return Ok(cloned);
        }
        let temporal_key = self.header.derive_temporal_key();
        let previous_link = (
            ratchet_diff,
            Encrypted::from_value(current_cid, &temporal_key)?,
        );
        let cloned = Arc::make_mut(self);
        cloned.content.persisted_as = OnceCell::new();
        cloned.header = target_header;
        cloned.content.previous = [previous_link].into_iter().collect();
        Ok(cloned)
    }

    /// Rotates this file's keys: fresh inumber, name under `parent_name`,
    /// and a reset ratchet. Clears the persistence cache accordingly.
    pub(crate) async fn prepare_key_rotation(
        &mut self,
        parent_name: &Name,
        rng: &mut impl CryptoRngCore,
    ) -> Result<()> {
        self.header.inumber = NameSegment::new(rng);
        self.header.update_name(parent_name);
        self.header.reset_ratchet(rng);
        self.content.persisted_as = OnceCell::new();
        Ok(())
    }

    /// Stores this file's header and (encrypted) content in the given store,
    /// registers both under the revision name in the forest, and returns a
    /// [`PrivateRef`] pointing at the stored content.
    pub(crate) async fn store(
        &self,
        forest: &mut impl PrivateForest,
        store: &impl BlockStore,
        rng: &mut impl CryptoRngCore,
    ) -> Result<PrivateRef> {
        let header_cid = self.header.store(store, forest).await?;
        let temporal_key = self.header.derive_temporal_key();
        let snapshot_key = temporal_key.derive_snapshot_key();
        let name_with_revision = self.header.get_revision_name();
        let content_cid = self
            .content
            .store(header_cid, &snapshot_key, store, rng)
            .await?;
        forest
            .put_encrypted(&name_with_revision, [header_cid, content_cid], store)
            .await?;
        Ok(self
            .header
            .derive_revision_ref(forest)
            .into_private_ref(content_cid))
    }

    /// Reconstructs a file from its deserialized content, loading the header
    /// from `serializable.header_cid`.
    ///
    /// # Errors
    /// Fails with [`FsError::UnexpectedVersion`] if the serialized version is
    /// not readable by this implementation.
    pub(crate) async fn from_serializable(
        serializable: PrivateFileContentSerializable,
        temporal_key: &TemporalKey,
        cid: Cid,
        forest: &impl PrivateForest,
        store: &impl BlockStore,
        parent_name: Option<Name>,
    ) -> Result<Self> {
        if !is_readable_wnfs_version(&serializable.version) {
            bail!(FsError::UnexpectedVersion(serializable.version));
        }
        let content = PrivateFileContent {
            // Remember the CID we were loaded from so `store` is a no-op
            // until the file is mutated.
            persisted_as: OnceCell::new_with(cid),
            previous: serializable.previous.into_iter().collect(),
            metadata: serializable.metadata,
            content: serializable.content,
        };
        let header = PrivateNodeHeader::load(
            &serializable.header_cid,
            temporal_key,
            forest,
            store,
            parent_name,
        )
        .await?;
        Ok(Self { header, content })
    }

    /// Wraps this file as a [`PrivateNode`].
    pub fn as_node(self: &Arc<Self>) -> PrivateNode {
        PrivateNode::File(Arc::clone(self))
    }

    /// Merges `other` into this file, deterministically picking a winner for
    /// the content via a hash tie-breaker and unioning revision history.
    pub(crate) fn merge(
        self: &mut Arc<Self>,
        target_header: PrivateNodeHeader,
        our_cid: Cid,
        other: &Arc<Self>,
        other_cid: Cid,
    ) -> Result<()> {
        if our_cid == other_cid {
            // Identical revisions — nothing to merge.
            return Ok(());
        }
        let other_ratchet_diff = target_header.ratchet_diff_for_merge(&other.header)?;
        let our = self.prepare_next_merge(our_cid, target_header)?;
        if our.content.previous.len() > 1 {
            // We're merging into an existing merge node: adopt the other
            // side's links, re-based by its ratchet difference.
            our.content.previous.extend(
                other
                    .content
                    .previous
                    .iter()
                    .cloned()
                    .map(|(rev_back, link)| (rev_back + other_ratchet_diff, link)),
            );
        } else {
            let temporal_key = &other.header.derive_temporal_key();
            our.content.previous.insert((
                other_ratchet_diff,
                Encrypted::from_value(other_cid, temporal_key)?,
            ));
        }
        let our_hash = our.content.content.crdt_tiebreaker()?;
        let other_hash = other.content.content.crdt_tiebreaker()?;
        match our_hash.cmp(&other_hash) {
            Ordering::Greater => {
                // The other side wins: take its content and metadata.
                our.content.content.clone_from(&other.content.content);
                our.content.metadata.clone_from(&other.content.metadata);
            }
            Ordering::Equal => {
                // Same content; still deterministically merge the metadata.
                our.content
                    .metadata
                    .tie_break_with(&other.content.metadata)?;
            }
            Ordering::Less => {
                // Our content wins: keep everything as-is.
            }
        }
        Ok(())
    }
}
impl PrivateFileContent {
    /// Serializes this content (tagged as a file node, pointing back at
    /// `header_cid`) to DAG-CBOR bytes.
    pub(crate) fn to_dag_cbor(&self, header_cid: Cid) -> Result<Vec<u8>> {
        let serializable = PrivateNodeContentSerializable::File(PrivateFileContentSerializable {
            version: WNFS_VERSION,
            previous: self.previous.iter().cloned().collect(),
            header_cid,
            metadata: self.metadata.clone(),
            content: self.content.clone(),
        });
        Ok(serde_ipld_dagcbor::to_vec(&serializable)?)
    }

    /// Encrypts and stores this content, returning its CID.
    ///
    /// The result is cached in `persisted_as`, so calling this again on an
    /// unchanged revision returns the same CID without re-encrypting.
    #[allow(clippy::suspicious)]
    pub(crate) async fn store(
        &self,
        header_cid: Cid,
        snapshot_key: &SnapshotKey,
        store: &impl BlockStore,
        rng: &mut impl CryptoRngCore,
    ) -> Result<Cid> {
        let cid = self
            .persisted_as
            .get_or_try_init::<anyhow::Error>(async {
                let plaintext = self.to_dag_cbor(header_cid)?;
                let ciphertext = snapshot_key.encrypt(&plaintext, rng)?;
                Ok(store.put_block(ciphertext, CODEC_RAW).await?)
            })
            .await?;
        Ok(*cid)
    }
}
impl FileContent {
    /// Deterministic 32-byte digest of this content's DAG-CBOR encoding,
    /// used as the tie-breaker when merging concurrent revisions.
    pub(crate) fn crdt_tiebreaker(&self) -> Result<[u8; 32]> {
        let encoded = serde_ipld_dagcbor::to_vec(self)?;
        let digest = blake3::hash(&encoded);
        Ok(digest.into())
    }
}
impl PrivateForestContent {
pub async fn new(
file_name: &Name,
content: Vec<u8>,
forest: &mut impl PrivateForest,
store: &impl BlockStore,
rng: &mut impl CryptoRngCore,
) -> Result<Self> {
let (key, base_name) = Self::prepare_key_and_base_name(file_name, rng);
let block_count = (content.len() as f64 / MAX_BLOCK_CONTENT_SIZE as f64).ceil() as u64;
for (name, index) in Self::generate_shard_labels(&key, 0, block_count, &base_name).zip(0..)
{
let start = index * MAX_BLOCK_CONTENT_SIZE;
let end = content.len().min((index + 1) * MAX_BLOCK_CONTENT_SIZE);
let slice = &content[start..end];
let enc_bytes = key.encrypt(slice, rng)?;
let content_cid = store.put_block(enc_bytes, CODEC_RAW).await?;
forest
.put_encrypted(&name, Some(content_cid), store)
.await?;
}
Ok(PrivateForestContent {
key,
base_name: forest.get_accumulated_name(&base_name),
block_count,
block_content_size: MAX_BLOCK_CONTENT_SIZE as u64,
})
}
pub async fn new_streaming(
file_name: &Name,
mut content: impl AsyncRead + Unpin,
forest: &mut impl PrivateForest,
store: &impl BlockStore,
rng: &mut impl CryptoRngCore,
) -> Result<Self> {
let (key, base_name) = Self::prepare_key_and_base_name(file_name, rng);
let mut block_index = 0;
loop {
let mut current_block = vec![0u8; MAX_BLOCK_SIZE];
let nonce = SnapshotKey::generate_nonce(rng);
current_block[..NONCE_SIZE].copy_from_slice(nonce.as_ref());
let content_end = NONCE_SIZE + MAX_BLOCK_CONTENT_SIZE;
let (bytes_written, done) =
utils::read_fully(&mut content, &mut current_block[NONCE_SIZE..content_end])
.await?;
current_block.truncate(bytes_written + NONCE_SIZE);
let tag = key.encrypt_in_place(&nonce, &mut current_block[NONCE_SIZE..])?;
current_block.extend_from_slice(tag.as_ref());
let content_cid = store.put_block(current_block, CODEC_RAW).await?;
let name = Self::create_block_name(&key, block_index, &base_name);
forest
.put_encrypted(&name, Some(content_cid), store)
.await?;
block_index += 1;
if done {
break;
}
}
Ok(PrivateForestContent {
key,
base_name: forest.get_accumulated_name(&base_name),
block_count: block_index,
block_content_size: MAX_BLOCK_CONTENT_SIZE as u64,
})
}
pub fn from_metadata_value(value: &Ipld) -> Result<Self> {
let wrapped: MetadataContentCapsule<Self> = from_ipld(value.clone())?;
Ok(match wrapped {
MetadataContentCapsule::PrivateForestContent(content) => content,
})
}
pub fn as_metadata_value(&self) -> Result<Ipld> {
Ok(to_ipld(MetadataContentCapsule::PrivateForestContent(
&self,
))?)
}
pub fn stream<'a>(
&'a self,
block_index: u64,
forest: &'a impl PrivateForest,
store: &'a impl BlockStore,
) -> impl Stream<Item = Result<Vec<u8>>> + 'a {
try_stream! {
for name in Self::generate_shard_labels(
&self.key,
block_index,
self.block_count,
&Name::new(self.base_name.clone(), []),
) {
let bytes = Self::decrypt_block(&self.key, &name, forest, store).await?;
yield bytes
}
}
}
pub async fn read_at<'a>(
&'a self,
byte_offset: u64,
len_limit: Option<usize>,
forest: &'a impl PrivateForest,
store: &'a impl BlockStore,
) -> Result<Vec<u8>> {
let block_content_size = MAX_BLOCK_CONTENT_SIZE as u64;
let mut chunk_size_upper_bound = (self.get_size_upper_bound() - byte_offset) as usize;
if let Some(len_limit) = len_limit {
chunk_size_upper_bound = chunk_size_upper_bound.min(len_limit);
}
if chunk_size_upper_bound == 0 {
return Ok(vec![]);
}
let first_block = byte_offset / block_content_size;
let last_block = len_limit.map(|len| (byte_offset + len as u64) / block_content_size);
let mut bytes = Vec::with_capacity(chunk_size_upper_bound);
let mut content_stream = Box::pin(self.stream(first_block, forest, store)).enumerate();
while let Some((i, chunk)) = content_stream.next().await {
let chunk = chunk?;
let index = first_block + i as u64;
let from = if index == first_block {
(byte_offset - index * block_content_size).min(chunk.len() as u64)
} else {
0
};
let to = if Some(index) == last_block {
(byte_offset + len_limit.unwrap_or_default() as u64 - index * block_content_size)
.min(chunk.len() as u64)
} else {
chunk.len() as u64
};
bytes.extend_from_slice(&chunk[(from as usize)..(to as usize)]);
if Some(index) == last_block {
break;
}
}
Ok(bytes)
}
pub async fn get_content(
&self,
forest: &impl PrivateForest,
store: &impl BlockStore,
) -> Result<Vec<u8>> {
let mut content = Vec::with_capacity(Self::get_size_upper_bound(self) as usize);
self.stream(0, forest, store)
.try_for_each(|chunk| {
content.extend_from_slice(&chunk);
future::ready(Ok(()))
})
.await?;
Ok(content)
}
pub fn get_size_upper_bound(&self) -> u64 {
self.block_count * self.block_content_size
}
pub async fn size(&self, forest: &impl PrivateForest, store: &impl BlockStore) -> Result<u64> {
let size_without_last_block =
std::cmp::max(0, self.block_count - 1) * self.block_content_size;
let size_last_block = self
.read_at(size_without_last_block, None, forest, store)
.await?
.len() as u64;
Ok(size_without_last_block + size_last_block)
}
pub(crate) fn generate_shard_labels<'a>(
key: &'a SnapshotKey,
mut block_index: u64,
block_count: u64,
base_name: &'a Name,
) -> impl Iterator<Item = Name> + 'a {
iter::from_fn(move || {
if block_index >= block_count {
return None;
}
let label = Self::create_block_name(key, block_index, base_name);
block_index += 1;
Some(label)
})
}
async fn decrypt_block(
key: &SnapshotKey,
name: &Name,
forest: &impl PrivateForest,
store: &impl BlockStore,
) -> Result<Vec<u8>> {
let cid = forest
.get_encrypted(name, store)
.await?
.ok_or(FsError::FileShardNotFound)?
.iter()
.next()
.expect("Expected set with at least a one cid");
let enc_bytes = store.get_block(cid).await?;
let bytes = key.decrypt(&enc_bytes)?;
Ok(bytes)
}
fn create_block_name(key: &SnapshotKey, index: u64, base_name: &Name) -> Name {
let mut vec = Vec::with_capacity(40);
vec.extend(key.0); vec.extend(index.to_le_bytes()); let block_segment = NameSegment::new_hashed(BLOCK_SEGMENT_DSI, vec);
base_name.with_segments_added(Some(block_segment))
}
fn prepare_key_and_base_name(
file_name: &Name,
rng: &mut impl CryptoRngCore,
) -> (SnapshotKey, Name) {
let key = SnapshotKey::new(rng);
let hiding_segment = NameSegment::new_hashed(HIDING_SEGMENT_DSI, key.as_bytes());
let base_name = file_name.with_segments_added(Some(hiding_segment));
(key, base_name)
}
}
impl PartialEq for PrivateFileContent {
    /// Equality deliberately ignores `persisted_as`: two contents are equal
    /// when their logical fields match, whether or not either was persisted.
    fn eq(&self, other: &Self) -> bool {
        let same_previous = self.previous == other.previous;
        let same_metadata = self.metadata == other.metadata;
        let same_content = self.content == other.content;
        same_previous && same_metadata && same_content
    }
}
impl Clone for PrivateFileContent {
    fn clone(&self) -> Self {
        // `OnceCell` isn't `Clone`: carry the cached CID over into a
        // pre-initialized cell when present, otherwise start empty.
        let persisted_as = self
            .persisted_as
            .get()
            .map(|cid| OnceCell::new_with(*cid))
            .unwrap_or_default();
        Self {
            persisted_as,
            previous: self.previous.clone(),
            metadata: self.metadata.clone(),
            content: self.content.clone(),
        }
    }
}
impl Id for PrivateFile {
fn get_id(&self) -> String {
format!("{:p}", &self.header)
}
}
impl std::fmt::Debug for PrivateFileContent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Render the cached CID (if any) as a string via the OnceCell helper.
        let persisted = OnceCellDebug(self.persisted_as.get().map(|cid| format!("{cid}")));
        f.debug_struct("PrivateFileContent")
            .field("persisted_as", &persisted)
            .field("previous", &self.previous)
            .field("metadata", &self.metadata)
            .field("content", &self.content)
            .finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::private::forest::hamt::HamtForest;
    use async_std::fs::File;
    use rand::Rng;
    use rand_chacha::ChaCha12Rng;
    use rand_core::SeedableRng;
    use wnfs_common::MemoryBlockStore;

    /// A freshly created file should read back as empty content.
    #[async_std::test]
    async fn can_create_empty_file() {
        let store = &MemoryBlockStore::new();
        let rng = &mut ChaCha12Rng::seed_from_u64(0);
        let forest = &HamtForest::new_rsa_2048_rc(rng);
        let file = PrivateFile::new(&forest.empty_name(), Utc::now(), rng);
        let file_content = file.get_content(forest, store).await.unwrap();
        assert!(file_content.is_empty());
    }

    /// Streaming from a mid-file block index with a caller-imposed block
    /// limit should yield exactly the corresponding content slice.
    #[async_std::test]
    async fn can_stream_limited_content_from_file() {
        let mut content = vec![0u8; MAX_BLOCK_CONTENT_SIZE * 5];
        rand::thread_rng().fill(&mut content[..]);
        let store = &MemoryBlockStore::new();
        let rng = &mut ChaCha12Rng::seed_from_u64(0);
        let forest = &mut HamtForest::new_rsa_2048_rc(rng);
        let file = PrivateFile::with_content(
            &forest.empty_name(),
            Utc::now(),
            content.clone(),
            forest,
            store,
            rng,
        )
        .await
        .unwrap();
        let mut collected_content = Vec::new();
        let mut block_limit = 2;
        // Start at block 2 and collect at most 2 blocks' worth of content.
        file.stream_content(2, forest, store)
            .for_each(|chunk| {
                if block_limit == 0 {
                    return future::ready(());
                }
                collected_content.extend_from_slice(&chunk.unwrap());
                block_limit -= 1;
                future::ready(())
            })
            .await;
        assert_eq!(
            collected_content,
            content[2 * MAX_BLOCK_CONTENT_SIZE..4 * MAX_BLOCK_CONTENT_SIZE]
        );
    }

    /// Building a file from an async stream (a fixture on disk) should
    /// produce externalized content with at least one block.
    #[async_std::test]
    async fn can_construct_file_from_stream() {
        let disk_file = File::open("./test/fixtures/Clara Schumann, Scherzo no. 2, Op. 14.mp3")
            .await
            .unwrap();
        let store = &MemoryBlockStore::new();
        let rng = &mut ChaCha12Rng::seed_from_u64(0);
        let forest = &mut HamtForest::new_rsa_2048_rc(rng);
        let file = PrivateFile::with_content_streaming(
            &forest.empty_name(),
            Utc::now(),
            disk_file,
            forest,
            store,
            rng,
        )
        .await
        .unwrap();
        assert!(
            matches!(file.content.content, FileContent::External(PrivateForestContent { block_count, .. }) if block_count > 0)
        );
    }
}
#[cfg(test)]
mod proptests {
    use super::MAX_BLOCK_CONTENT_SIZE;
    use crate::private::{
        PrivateFile,
        forest::{hamt::HamtForest, traits::PrivateForest},
    };
    use async_std::io::Cursor;
    use chrono::Utc;
    use futures::{StreamExt, future};
    use proptest::{prop_assert, prop_assert_eq};
    use rand_chacha::ChaCha12Rng;
    use rand_core::SeedableRng;
    use test_strategy::proptest;
    use wnfs_common::{BlockStoreError, MemoryBlockStore};

    // Byte size of the on-disk audio fixture used by the read-section test.
    const FIXTURE_SCHERZO_SIZE: usize = 4028150;

    /// Round-trip: content written via `with_content` must read back
    /// byte-identical via `get_content`, for lengths spanning 0..2 blocks.
    #[proptest(cases = 100)]
    fn can_include_and_get_content_from_file(
        #[strategy(0..(MAX_BLOCK_CONTENT_SIZE * 2))] length: usize,
    ) {
        async_std::task::block_on(async {
            let content = vec![0u8; length];
            let store = &MemoryBlockStore::new();
            let rng = &mut ChaCha12Rng::seed_from_u64(0);
            let forest = &mut HamtForest::new_rsa_2048_rc(rng);
            let file = PrivateFile::with_content(
                &forest.empty_name(),
                Utc::now(),
                content.clone(),
                forest,
                store,
                rng,
            )
            .await
            .unwrap();
            let collected_content = file.get_content(forest, store).await.unwrap();
            prop_assert_eq!(collected_content, content);
            Ok(())
        })?;
    }

    /// Round-trip via the streaming read path: concatenating all streamed
    /// chunks must reproduce the original content.
    #[proptest(cases = 10)]
    fn can_include_and_stream_content_from_file(
        #[strategy(0..(MAX_BLOCK_CONTENT_SIZE * 2))] length: usize,
    ) {
        async_std::task::block_on(async {
            let content = vec![0u8; length];
            let store = &MemoryBlockStore::new();
            let rng = &mut ChaCha12Rng::seed_from_u64(0);
            let forest = &mut HamtForest::new_rsa_2048_rc(rng);
            let file = PrivateFile::with_content(
                &forest.empty_name(),
                Utc::now(),
                content.clone(),
                forest,
                store,
                rng,
            )
            .await
            .unwrap();
            let mut collected_content = Vec::new();
            file.stream_content(0, forest, store)
                .for_each(|chunk| {
                    collected_content.extend_from_slice(&chunk.unwrap());
                    future::ready(())
                })
                .await;
            prop_assert_eq!(collected_content, content);
            Ok(())
        })?;
    }

    /// Writing blocks into one store but reading from another (empty) store
    /// must surface a `CIDNotFound` error rather than succeeding.
    #[proptest(cases = 100)]
    fn can_propagate_missing_chunk_error(
        #[strategy(0..(MAX_BLOCK_CONTENT_SIZE * 2))] length: usize,
    ) {
        async_std::task::block_on(async {
            let store = &MemoryBlockStore::new();
            let rng = &mut ChaCha12Rng::seed_from_u64(0);
            let forest = &mut HamtForest::new_rsa_2048_rc(rng);
            let mut file = PrivateFile::new(&forest.empty_name(), Utc::now(), rng);
            // Content blocks go into a throwaway store, so reads from `store`
            // below are guaranteed to miss.
            file.set_content(
                &mut Cursor::new(vec![5u8; length]),
                Utc::now(),
                forest,
                &MemoryBlockStore::default(),
                rng,
            )
            .await
            .unwrap();
            let error = file
                .get_content(forest, store)
                .await
                .expect_err("Expected error");
            let error = error.downcast_ref::<BlockStoreError>().unwrap();
            prop_assert!(matches!(error, BlockStoreError::CIDNotFound(_)));
            Ok(())
        })?;
    }

    /// `read_at` over an arbitrary (offset, size) window must match reading
    /// the same window directly from the fixture file on disk.
    #[proptest(cases = 10)]
    fn can_read_section_of_file(
        #[strategy(0..FIXTURE_SCHERZO_SIZE)] size: usize,
        #[strategy(0..FIXTURE_SCHERZO_SIZE as u64)] offset: u64,
    ) {
        use async_std::{io::SeekFrom, prelude::*};
        async_std::task::block_on(async {
            // Clamp so offset + size never runs past the end of the fixture.
            let size = size.min(FIXTURE_SCHERZO_SIZE - offset as usize);
            let mut disk_file = async_std::fs::File::open(
                "./test/fixtures/Clara Schumann, Scherzo no. 2, Op. 14.mp3",
            )
            .await
            .unwrap();
            let rng = &mut ChaCha12Rng::seed_from_u64(0);
            let forest = &mut HamtForest::new_rsa_2048_rc(rng);
            let store = &MemoryBlockStore::new();
            let file = PrivateFile::with_content_streaming(
                &forest.empty_name(),
                Utc::now(),
                disk_file.clone(),
                forest,
                store,
                rng,
            )
            .await
            .unwrap();
            let mut source_content = vec![0u8; size];
            disk_file.seek(SeekFrom::Start(offset)).await.unwrap();
            disk_file.read_exact(&mut source_content).await.unwrap();
            let wnfs_content = file
                .read_at(offset, Some(size), forest, store)
                .await
                .unwrap();
            prop_assert_eq!(source_content, wnfs_content);
            Ok(())
        })?;
    }
}