use std::fs::{self, File, OpenOptions};
use std::io::{self, BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use anyhow::{ensure, Context, Result};
use bincode::deserialize;
use filecoin_hashers::Hasher;
use fr32::{write_unpadded, Fr32Reader};
use log::{info, trace};
use memmap::MmapOptions;
use merkletree::store::{DiskStore, LevelCacheStore, StoreConfig};
use storage_proofs_core::{
cache_key::CacheKey,
measurements::{measure_op, Operation},
merkle::get_base_tree_count,
pieces::generate_piece_commitment_bytes_from_source,
sector::SectorId,
util::default_rows_to_discard,
};
use storage_proofs_porep::{
stacked::{generate_replica_id, PersistentAux, StackedDrg, TemporaryAux},
PoRep,
};
pub use storage_proofs_update::constants::TreeRHasher;
use typenum::Unsigned;
use crate::{
commitment_reader::CommitmentReader,
constants::{
DefaultBinaryTree, DefaultOctTree, DefaultPieceDomain, DefaultPieceHasher,
MINIMUM_RESERVED_BYTES_FOR_PIECE_IN_FULLY_ALIGNED_SECTOR as MINIMUM_PIECE_SIZE,
},
parameters::public_params,
pieces::{get_piece_alignment, sum_piece_bytes_with_alignment},
types::{
Commitment, MerkleTreeTrait, PaddedBytesAmount, PieceInfo, PoRepConfig,
PoRepProofPartitions, ProverId, SealPreCommitPhase1Output, Ticket, UnpaddedByteIndex,
UnpaddedBytesAmount,
},
};
mod fake_seal;
mod post_util;
mod seal;
mod update;
mod util;
mod window_post;
mod winning_post;
pub use fake_seal::*;
pub use post_util::*;
pub use seal::*;
pub use update::*;
pub use util::*;
pub use window_post::*;
pub use winning_post::*;
pub use storage_proofs_update::constants::{hs, partition_count};
#[allow(clippy::too_many_arguments)]
pub fn get_unsealed_range<T: Into<PathBuf> + AsRef<Path>, Tree: 'static + MerkleTreeTrait>(
porep_config: PoRepConfig,
cache_path: T,
sealed_path: T,
output_path: T,
prover_id: ProverId,
sector_id: SectorId,
comm_d: Commitment,
ticket: Ticket,
offset: UnpaddedByteIndex,
num_bytes: UnpaddedBytesAmount,
) -> Result<UnpaddedBytesAmount> {
info!("get_unsealed_range:start");
let f_out = File::create(&output_path)
.with_context(|| format!("could not create output_path={:?}", output_path.as_ref()))?;
let buf_f_out = BufWriter::new(f_out);
let result = unseal_range_mapped::<_, _, Tree>(
porep_config,
cache_path,
sealed_path.into(),
buf_f_out,
prover_id,
sector_id,
comm_d,
ticket,
offset,
num_bytes,
);
info!("get_unsealed_range:finish");
result
}
/// Unseals a byte range from a sealed sector supplied as a reader, writing
/// the unpadded bytes to `unsealed_output`.
///
/// The entire sealed sector is read into memory and decoded before the
/// requested range is extracted. Returns the number of unpadded bytes
/// written.
#[allow(clippy::too_many_arguments)]
pub fn unseal_range<P, R, W, Tree>(
    porep_config: PoRepConfig,
    cache_path: P,
    mut sealed_sector: R,
    unsealed_output: W,
    prover_id: ProverId,
    sector_id: SectorId,
    comm_d: Commitment,
    ticket: Ticket,
    offset: UnpaddedByteIndex,
    num_bytes: UnpaddedBytesAmount,
) -> Result<UnpaddedBytesAmount>
where
    P: Into<PathBuf> + AsRef<Path>,
    R: Read,
    W: Write,
    Tree: 'static + MerkleTreeTrait,
{
    info!("unseal_range:start");

    // A zeroed commitment can never be valid.
    ensure!(comm_d != [0; 32], "Invalid all zero commitment (comm_d)");
    let comm_d =
        as_safe_commitment::<<DefaultPieceHasher as Hasher>::Domain, _>(&comm_d, "comm_d")?;

    // Reconstruct the replica id from the same inputs used at seal time.
    let replica_id = generate_replica_id::<Tree::Hasher, _>(
        &prover_id,
        sector_id.into(),
        &ticket,
        comm_d,
        &porep_config.porep_id,
    );

    // Buffer the whole sealed sector in memory before decoding.
    let mut sector_bytes = Vec::new();
    sealed_sector.read_to_end(&mut sector_bytes)?;

    let unsealed_amount = unseal_range_inner::<_, _, Tree>(
        porep_config,
        cache_path,
        &mut sector_bytes,
        unsealed_output,
        replica_id,
        offset,
        num_bytes,
    )?;

    info!("unseal_range:finish");

    Ok(unsealed_amount)
}
/// Unseals a byte range from the sealed sector file at `sealed_path`,
/// writing the unpadded bytes to `unsealed_output`.
///
/// The sealed file is memory-mapped copy-on-write (`map_copy`), so decoding
/// mutates only the private mapping — the on-disk replica is left intact.
/// Returns the number of unpadded bytes written.
#[allow(clippy::too_many_arguments)]
pub fn unseal_range_mapped<P, W, Tree>(
    porep_config: PoRepConfig,
    cache_path: P,
    sealed_path: PathBuf,
    unsealed_output: W,
    prover_id: ProverId,
    sector_id: SectorId,
    comm_d: Commitment,
    ticket: Ticket,
    offset: UnpaddedByteIndex,
    num_bytes: UnpaddedBytesAmount,
) -> Result<UnpaddedBytesAmount>
where
    P: Into<PathBuf> + AsRef<Path>,
    W: Write,
    Tree: 'static + MerkleTreeTrait,
{
    info!("unseal_range_mapped:start");
    // A zeroed commitment can never be valid.
    ensure!(comm_d != [0; 32], "Invalid all zero commitment (comm_d)");
    let comm_d =
        as_safe_commitment::<<DefaultPieceHasher as Hasher>::Domain, _>(&comm_d, "comm_d")?;
    // Reconstruct the replica id from the same inputs used at seal time.
    let replica_id = generate_replica_id::<Tree::Hasher, _>(
        &prover_id,
        sector_id.into(),
        &ticket,
        comm_d,
        &porep_config.porep_id,
    );
    // NOTE(review): opened read+write although the mapping is copy-on-write;
    // presumably write access is required by the mmap implementation on some
    // platforms — confirm before removing `.write(true)`.
    let mapped_file = OpenOptions::new()
        .read(true)
        .write(true)
        .open(&sealed_path)?;
    // SAFETY-relevant: mapping a file is unsound if another process resizes
    // or truncates it concurrently; callers are assumed to have exclusive
    // access to the sealed replica while unsealing — TODO confirm.
    let mut data = unsafe { MmapOptions::new().map_copy(&mapped_file)? };
    let result = unseal_range_inner::<_, _, Tree>(
        porep_config,
        cache_path,
        &mut data,
        unsealed_output,
        replica_id,
        offset,
        num_bytes,
    );
    info!("unseal_range_mapped:finish");
    result
}
#[allow(clippy::too_many_arguments)]
fn unseal_range_inner<P, W, Tree>(
porep_config: PoRepConfig,
cache_path: P,
data: &mut [u8],
mut unsealed_output: W,
replica_id: <Tree::Hasher as Hasher>::Domain,
offset: UnpaddedByteIndex,
num_bytes: UnpaddedBytesAmount,
) -> Result<UnpaddedBytesAmount>
where
P: Into<PathBuf> + AsRef<Path>,
W: Write,
Tree: 'static + MerkleTreeTrait,
{
trace!("unseal_range_inner:start");
let base_tree_size = get_base_tree_size::<DefaultBinaryTree>(porep_config.sector_size)?;
let base_tree_leafs = get_base_tree_leafs::<DefaultBinaryTree>(base_tree_size)?;
let config = StoreConfig::new(
cache_path.as_ref(),
CacheKey::CommDTree.to_string(),
default_rows_to_discard(
base_tree_leafs,
<DefaultBinaryTree as MerkleTreeTrait>::Arity::to_usize(),
),
);
let pp = public_params(
PaddedBytesAmount::from(porep_config),
usize::from(PoRepProofPartitions::from(porep_config)),
porep_config.porep_id,
porep_config.api_version,
)?;
let offset_padded: PaddedBytesAmount = UnpaddedBytesAmount::from(offset).into();
let num_bytes_padded: PaddedBytesAmount = num_bytes.into();
StackedDrg::<Tree, DefaultPieceHasher>::extract_all(&pp, &replica_id, data, Some(config))?;
let start: usize = offset_padded.into();
let end = start + usize::from(num_bytes_padded);
let unsealed = &data[start..end];
let written = write_unpadded(unsealed, &mut unsealed_output, 0, num_bytes.into())
.context("write_unpadded failed")?;
let amount = UnpaddedBytesAmount(written as u64);
trace!("unseal_range_inner:finish");
Ok(amount)
}
/// Computes the piece commitment (`comm_p`) for `piece_size` unpadded bytes
/// read from `source`.
///
/// `piece_size` must be at least the minimum piece size and must bit-pad to
/// a power of two (see `ensure_piece_size`).
pub fn generate_piece_commitment<T: Read>(
    source: T,
    piece_size: UnpaddedBytesAmount,
) -> Result<PieceInfo> {
    trace!("generate_piece_commitment:start");

    let result = measure_op(Operation::GeneratePieceCommitment, || {
        ensure_piece_size(piece_size)?;

        // Fr32 preprocessing happens on the fly while the source is hashed.
        let mut preprocessed = Fr32Reader::new(BufReader::new(source));
        let padded_size = PaddedBytesAmount::from(piece_size);

        let commitment = generate_piece_commitment_bytes_from_source::<DefaultPieceHasher>(
            &mut preprocessed,
            padded_size.into(),
        )?;

        PieceInfo::new(commitment, piece_size)
    });

    trace!("generate_piece_commitment:finish");
    result
}
pub fn add_piece<R, W>(
source: R,
target: W,
piece_size: UnpaddedBytesAmount,
piece_lengths: &[UnpaddedBytesAmount],
) -> Result<(PieceInfo, UnpaddedBytesAmount)>
where
R: Read,
W: Write,
{
trace!("add_piece:start");
let result = measure_op(Operation::AddPiece, || {
ensure_piece_size(piece_size)?;
let source = BufReader::new(source);
let mut target = BufWriter::new(target);
let written_bytes = sum_piece_bytes_with_alignment(piece_lengths);
let piece_alignment = get_piece_alignment(written_bytes, piece_size);
let fr32_reader = Fr32Reader::new(source);
for _ in 0..usize::from(PaddedBytesAmount::from(piece_alignment.left_bytes)) {
target.write_all(&[0u8][..])?;
}
let mut commitment_reader = CommitmentReader::new(fr32_reader);
let n = io::copy(&mut commitment_reader, &mut target)
.context("failed to write and preprocess bytes")?;
ensure!(n != 0, "add_piece: read 0 bytes before EOF from source");
let n = PaddedBytesAmount(n as u64);
let n: UnpaddedBytesAmount = n.into();
ensure!(n == piece_size, "add_piece: invalid bytes amount written");
for _ in 0..usize::from(PaddedBytesAmount::from(piece_alignment.right_bytes)) {
target.write_all(&[0u8][..])?;
}
let commitment = commitment_reader.finish()?;
let mut comm = [0u8; 32];
comm.copy_from_slice(commitment.as_ref());
let written = piece_alignment.left_bytes + piece_alignment.right_bytes + piece_size;
Ok((PieceInfo::new(comm, n)?, written))
});
trace!("add_piece:finish");
result
}
/// Validates that `piece_size` meets the minimum size and, once bit-padded,
/// is an exact power of two.
fn ensure_piece_size(piece_size: UnpaddedBytesAmount) -> Result<()> {
    // Reject pieces smaller than the reserved minimum.
    ensure!(
        piece_size >= UnpaddedBytesAmount(MINIMUM_PIECE_SIZE),
        "Piece must be at least {} bytes",
        MINIMUM_PIECE_SIZE
    );

    // After Fr32 padding the piece must land on a power-of-two size.
    let padded: PaddedBytesAmount = piece_size.into();
    ensure!(
        u64::from(padded).is_power_of_two(),
        "Bit-padded piece size must be a power of 2 ({:?})",
        padded,
    );

    Ok(())
}
/// Convenience wrapper around [`add_piece`] for the first piece in a sector:
/// writes `piece_size` bytes from `source` into `target` with no preceding
/// pieces to align against.
pub fn write_and_preprocess<R, W>(
    source: R,
    target: W,
    piece_size: UnpaddedBytesAmount,
) -> Result<(PieceInfo, UnpaddedBytesAmount)>
where
    R: Read,
    W: Write,
{
    // An empty piece-length list means alignment starts from offset zero.
    add_piece(source, target, piece_size, &[])
}
/// Verifies that the on-disk store described by `config` exists and is
/// consistent for the given `arity`.
///
/// If the single store file is missing, the store may instead have been
/// written as `required_configs` split files named `<id>-<i>.dat`; in that
/// case every expected split part must exist and be consistent.
fn verify_store(config: &StoreConfig, arity: usize, required_configs: usize) -> Result<()> {
    let store_path = StoreConfig::data_path(&config.path, &config.id);
    if !Path::new(&store_path).exists() {
        let orig_path = store_path
            .clone()
            .into_os_string()
            .into_string()
            .expect("failed to convert store_path to string");

        // Loop-invariant candidate names, hoisted out of the loop (and a
        // const array instead of an allocating `vec!`).
        const TREE_NAMES: [&str; 3] = ["tree-d", "tree-c", "tree-r-last"];

        let mut configs: Vec<StoreConfig> = Vec::with_capacity(required_configs);
        for i in 0..required_configs {
            let cur_path = orig_path
                .clone()
                .replace(".dat", format!("-{}.dat", i).as_str());

            if Path::new(&cur_path).exists() {
                // Derive a per-part config keyed on the matching tree name.
                for name in TREE_NAMES.iter() {
                    if cur_path.contains(name) {
                        configs.push(StoreConfig::from_config(
                            config,
                            format!("{}-{}", name, i),
                            None,
                        ));
                        break;
                    }
                }
            }
        }

        ensure!(
            configs.len() == required_configs,
            "Missing store file (or associated split paths): {}",
            store_path.display()
        );

        let store_len = config.size.expect("disk store size not configured");
        for config in &configs {
            let data_path = StoreConfig::data_path(&config.path, &config.id);
            trace!(
                "verify_store: {:?} has length {} bytes",
                &data_path,
                std::fs::metadata(&data_path)?.len()
            );
            ensure!(
                DiskStore::<DefaultPieceDomain>::is_consistent(store_len, arity, config)?,
                "Store is inconsistent: {:?}",
                &data_path
            );
        }
    } else {
        trace!(
            "verify_store: {:?} has length {} bytes",
            &store_path,
            std::fs::metadata(&store_path)?.len()
        );
        ensure!(
            DiskStore::<DefaultPieceDomain>::is_consistent(
                config.size.expect("disk store size not configured"),
                arity,
                config,
            )?,
            "Store is inconsistent: {:?}",
            store_path
        );
    }

    Ok(())
}
/// Verifies that the level-cache store described by `config` exists and is
/// consistent for `Tree`'s base arity.
///
/// Like [`verify_store`], falls back to checking `get_base_tree_count`
/// split files (`<id>-<i>.dat`) when the single store file is missing.
fn verify_level_cache_store<Tree: MerkleTreeTrait>(config: &StoreConfig) -> Result<()> {
    let store_path = StoreConfig::data_path(&config.path, &config.id);
    if !Path::new(&store_path).exists() {
        let required_configs = get_base_tree_count::<Tree>();
        let orig_path = store_path
            .clone()
            .into_os_string()
            .into_string()
            .expect("failed to convert store_path to string");

        // Loop-invariant candidate names, hoisted out of the loop (and a
        // const array instead of an allocating `vec!`).
        const TREE_NAMES: [&str; 3] = ["tree-d", "tree-c", "tree-r-last"];

        let mut configs: Vec<StoreConfig> = Vec::with_capacity(required_configs);
        for i in 0..required_configs {
            let cur_path = orig_path
                .clone()
                .replace(".dat", format!("-{}.dat", i).as_str());

            if Path::new(&cur_path).exists() {
                // Derive a per-part config keyed on the matching tree name.
                for name in TREE_NAMES.iter() {
                    if cur_path.contains(name) {
                        configs.push(StoreConfig::from_config(
                            config,
                            format!("{}-{}", name, i),
                            None,
                        ));
                        break;
                    }
                }
            }
        }

        ensure!(
            configs.len() == required_configs,
            "Missing store file (or associated split paths): {}",
            store_path.display()
        );

        let store_len = config.size.expect("disk store size not configured");
        for config in &configs {
            let data_path = StoreConfig::data_path(&config.path, &config.id);
            // Log under this function's own name (was mislabeled
            // "verify_store", which made traces misleading).
            trace!(
                "verify_level_cache_store: {:?} has length {} bytes",
                &data_path,
                std::fs::metadata(&data_path)?.len()
            );
            ensure!(
                LevelCacheStore::<DefaultPieceDomain, File>::is_consistent(
                    store_len,
                    Tree::Arity::to_usize(),
                    config,
                )?,
                "Store is inconsistent: {:?}",
                &data_path
            );
        }
    } else {
        trace!(
            "verify_level_cache_store: {:?} has length {} bytes",
            &store_path,
            std::fs::metadata(&store_path)?.len()
        );
        ensure!(
            LevelCacheStore::<DefaultPieceDomain, File>::is_consistent(
                config.size.expect("disk store size not configured"),
                Tree::Arity::to_usize(),
                config,
            )?,
            "Store is inconsistent: {:?}",
            store_path
        );
    }

    Ok(())
}
/// Validates that the cache directory and replica contain everything
/// precommit phase 2 needs: the replica file, all layer-label stores from
/// the phase-1 output, and a consistent tree-d store.
pub fn validate_cache_for_precommit_phase2<R, T, Tree: MerkleTreeTrait>(
    cache_path: R,
    replica_path: T,
    seal_precommit_phase1_output: &SealPreCommitPhase1Output<Tree>,
) -> Result<()>
where
    R: AsRef<Path>,
    T: AsRef<Path>,
{
    info!("validate_cache_for_precommit_phase2:start");

    ensure!(
        replica_path.as_ref().exists(),
        "Missing replica: {}",
        replica_path.as_ref().to_path_buf().display()
    );

    // Rebase the recorded tree-d store config onto the supplied cache
    // directory before verification.
    let mut tree_d_config = StoreConfig::from_config(
        &seal_precommit_phase1_output.config,
        &seal_precommit_phase1_output.config.id,
        seal_precommit_phase1_output.config.size,
    );
    tree_d_config.path = cache_path.as_ref().into();

    // Verify every layer-label store recorded in the phase-1 output.
    let cache = cache_path.as_ref().to_path_buf();
    seal_precommit_phase1_output
        .labels
        .verify_stores(verify_store, &cache)?;

    let result = verify_store(
        &tree_d_config,
        <DefaultBinaryTree as MerkleTreeTrait>::Arity::to_usize(),
        get_base_tree_count::<Tree>(),
    );

    info!("validate_cache_for_precommit_phase2:finish");
    result
}
/// Validates that the replica file and every cached artifact the commit
/// phase depends on (p_aux, t_aux, label stores, tree-d/tree-c/tree-r-last)
/// exist and are consistent.
pub fn validate_cache_for_commit<R, T, Tree: MerkleTreeTrait>(
    cache_path: R,
    replica_path: T,
) -> Result<()>
where
    R: AsRef<Path>,
    T: AsRef<Path>,
{
    info!("validate_cache_for_commit:start");

    // The replica must exist and be non-empty.
    ensure!(
        replica_path.as_ref().exists(),
        "Missing replica: {}",
        replica_path.as_ref().to_path_buf().display()
    );
    let metadata = File::open(&replica_path)?.metadata()?;
    ensure!(
        metadata.len() > 0,
        "Replica {} exists, but is empty!",
        replica_path.as_ref().to_path_buf().display()
    );

    let cache_dir = cache_path.as_ref();

    // p_aux only needs to deserialize cleanly; its contents are unused here.
    let p_aux_path = cache_dir.join(CacheKey::PAux.to_string());
    let p_aux_bytes = fs::read(&p_aux_path)
        .with_context(|| format!("could not read file p_aux={:?}", p_aux_path))?;
    let _: PersistentAux<<Tree::Hasher as Hasher>::Domain> = deserialize(&p_aux_bytes)?;
    drop(p_aux_bytes);

    // t_aux is deserialized and re-rooted at the provided cache path.
    let t_aux = {
        let t_aux_path = cache_dir.join(CacheKey::TAux.to_string());
        let t_aux_bytes = fs::read(&t_aux_path)
            .with_context(|| format!("could not read file t_aux={:?}", t_aux_path))?;
        let mut aux: TemporaryAux<Tree, DefaultPieceHasher> = deserialize(&t_aux_bytes)?;
        aux.set_cache_path(&cache_path);
        aux
    };

    // Verify the label layers plus each tree store.
    let cache = cache_dir.to_path_buf();
    t_aux.labels.verify_stores(verify_store, &cache)?;
    verify_store(
        &t_aux.tree_d_config,
        <DefaultBinaryTree as MerkleTreeTrait>::Arity::to_usize(),
        get_base_tree_count::<Tree>(),
    )?;
    verify_store(
        &t_aux.tree_c_config,
        <DefaultOctTree as MerkleTreeTrait>::Arity::to_usize(),
        get_base_tree_count::<Tree>(),
    )?;
    verify_level_cache_store::<DefaultOctTree>(&t_aux.tree_r_last_config)?;

    info!("validate_cache_for_commit:finish");
    Ok(())
}