vc_processors/builtin/
processors.rs

1//! Built-in processors.
2//!
3
4use std::collections::BTreeMap;
5use std::fs;
6
7use anyhow::{Context, Result};
8use filecoin_proofs_api::StorageProofsError;
9use tracing::debug;
10
11use super::tasks::{
12    AddPieces, SnapEncode, SnapProve, Transfer, TransferRoute, TreeD, WindowPoSt, WindowPoStOutput, WinningPoSt, WinningPoStOutput, C2,
13    PC1, PC2,
14};
15use crate::core::{Processor, Task};
16use crate::fil_proofs::{
17    create_tree_d, generate_window_post, generate_winning_post, seal_commit_phase2, seal_pre_commit_phase1, seal_pre_commit_phase2,
18    snap_encode_into, snap_generate_sector_update_proof, to_prover_id, write_and_preprocess, PartitionProofBytes, PrivateReplicaInfo,
19};
20
21pub mod piece;
22mod transfer;
23
24#[derive(Copy, Clone, Default, Debug)]
25pub struct BuiltinProcessor;
26
27impl Processor<AddPieces> for BuiltinProcessor {
28    fn process(&self, task: AddPieces) -> Result<<AddPieces as Task>::Output> {
29        let staged_file = fs::OpenOptions::new()
30            .create(true)
31            .read(true)
32            .write(true)
33            // to make sure that we won't write into the staged file with any data exists
34            .truncate(true)
35            .open(&task.staged_filepath)
36            .with_context(|| format!("open staged file: {}", task.staged_filepath.display()))?;
37
38        let mut piece_infos = Vec::with_capacity(task.pieces.len().min(1));
39        for piece in task.pieces {
40            debug!(piece = ?piece, "trying to add piece");
41            let source = piece::fetcher::open(piece.piece_file, piece.payload_size, piece.piece_size.0).context("open piece file")?;
42            let (piece_info, _) =
43                write_and_preprocess(task.seal_proof_type, source, &staged_file, piece.piece_size).context("add piece")?;
44            piece_infos.push(piece_info);
45        }
46
47        if piece_infos.is_empty() {
48            let sector_size: u64 = task.seal_proof_type.sector_size().into();
49
50            let pi = piece::add_piece_for_cc_sector(&staged_file, sector_size).context("add piece for cc secrtor")?;
51            piece_infos.push(pi);
52        }
53
54        Ok(piece_infos)
55    }
56}
57
58impl Processor<TreeD> for BuiltinProcessor {
59    fn process(&self, task: TreeD) -> Result<<TreeD as Task>::Output> {
60        create_tree_d(task.registered_proof, Some(task.staged_file), task.cache_dir).map(|_| true)
61    }
62}
63
64impl Processor<PC1> for BuiltinProcessor {
65    fn process(&self, task: PC1) -> Result<<PC1 as Task>::Output> {
66        seal_pre_commit_phase1(
67            task.registered_proof,
68            task.cache_path,
69            task.in_path,
70            task.out_path,
71            task.prover_id,
72            task.sector_id,
73            task.ticket,
74            &task.piece_infos[..],
75        )
76    }
77}
78
79impl Processor<PC2> for BuiltinProcessor {
80    fn process(&self, task: PC2) -> Result<<PC2 as Task>::Output> {
81        seal_pre_commit_phase2(task.pc1out, task.cache_dir, task.sealed_file)
82    }
83}
84
85impl Processor<C2> for BuiltinProcessor {
86    fn process(&self, task: C2) -> Result<<C2 as Task>::Output> {
87        seal_commit_phase2(task.c1out, task.prover_id, task.sector_id)
88    }
89}
90
91impl Processor<SnapEncode> for BuiltinProcessor {
92    fn process(&self, task: SnapEncode) -> Result<<SnapEncode as Task>::Output> {
93        snap_encode_into(
94            task.registered_proof,
95            task.new_replica_path,
96            task.new_cache_path,
97            task.sector_path,
98            task.sector_cache_path,
99            task.staged_data_path,
100            &task.piece_infos[..],
101        )
102    }
103}
104
105impl Processor<SnapProve> for BuiltinProcessor {
106    fn process(&self, task: SnapProve) -> Result<<SnapProve as Task>::Output> {
107        snap_generate_sector_update_proof(
108            task.registered_proof,
109            task.vannilla_proofs.into_iter().map(PartitionProofBytes).collect(),
110            task.comm_r_old,
111            task.comm_r_new,
112            task.comm_d_new,
113        )
114    }
115}
116
117impl Processor<Transfer> for BuiltinProcessor {
118    fn process(&self, task: Transfer) -> Result<<Transfer as Task>::Output> {
119        task.routes.into_iter().try_for_each(|route| transfer::do_transfer(&route))?;
120
121        Ok(true)
122    }
123}
124
125impl Processor<WindowPoSt> for BuiltinProcessor {
126    fn process(&self, task: WindowPoSt) -> Result<<WindowPoSt as Task>::Output> {
127        let replicas = BTreeMap::from_iter(task.replicas.into_iter().map(|rep| {
128            (
129                rep.sector_id,
130                PrivateReplicaInfo::new(task.proof_type, rep.comm_r, rep.cache_dir, rep.sealed_file),
131            )
132        }));
133
134        generate_window_post(&task.seed, &replicas, to_prover_id(task.miner_id))
135            .map(|proofs| WindowPoStOutput {
136                proofs: proofs.into_iter().map(|r| r.1).collect(),
137                faults: vec![],
138            })
139            .or_else(|e| {
140                if let Some(StorageProofsError::FaultySectors(sectors)) = e.downcast_ref::<StorageProofsError>() {
141                    return Ok(WindowPoStOutput {
142                        proofs: vec![],
143                        faults: sectors.iter().map(|id| (*id).into()).collect(),
144                    });
145                }
146
147                Err(e)
148            })
149    }
150}
151
152impl Processor<WinningPoSt> for BuiltinProcessor {
153    fn process(&self, task: WinningPoSt) -> Result<<WinningPoSt as Task>::Output> {
154        let replicas = BTreeMap::from_iter(task.replicas.into_iter().map(|rep| {
155            (
156                rep.sector_id,
157                PrivateReplicaInfo::new(task.proof_type, rep.comm_r, rep.cache_dir, rep.sealed_file),
158            )
159        }));
160
161        generate_winning_post(&task.seed, &replicas, to_prover_id(task.miner_id)).map(|proofs| WinningPoStOutput {
162            proofs: proofs.into_iter().map(|r| r.1).collect(),
163        })
164    }
165}