1use std::collections::BTreeMap;
5use std::fs;
6
7use anyhow::{Context, Result};
8use filecoin_proofs_api::StorageProofsError;
9use tracing::debug;
10
11use super::tasks::{
12 AddPieces, SnapEncode, SnapProve, Transfer, TransferRoute, TreeD, WindowPoSt, WindowPoStOutput, WinningPoSt, WinningPoStOutput, C2,
13 PC1, PC2,
14};
15use crate::core::{Processor, Task};
16use crate::fil_proofs::{
17 create_tree_d, generate_window_post, generate_winning_post, seal_commit_phase2, seal_pre_commit_phase1, seal_pre_commit_phase2,
18 snap_encode_into, snap_generate_sector_update_proof, to_prover_id, write_and_preprocess, PartitionProofBytes, PrivateReplicaInfo,
19};
20
21pub mod piece;
22mod transfer;
23
/// Stateless unit type on which the `Processor` implementations for the built-in tasks are defined.
///
/// It holds no data, so it can be freely copied and created through `Default`.
#[derive(Debug, Default, Clone, Copy)]
pub struct BuiltinProcessor;
26
27impl Processor<AddPieces> for BuiltinProcessor {
28 fn process(&self, task: AddPieces) -> Result<<AddPieces as Task>::Output> {
29 let staged_file = fs::OpenOptions::new()
30 .create(true)
31 .read(true)
32 .write(true)
33 .truncate(true)
35 .open(&task.staged_filepath)
36 .with_context(|| format!("open staged file: {}", task.staged_filepath.display()))?;
37
38 let mut piece_infos = Vec::with_capacity(task.pieces.len().min(1));
39 for piece in task.pieces {
40 debug!(piece = ?piece, "trying to add piece");
41 let source = piece::fetcher::open(piece.piece_file, piece.payload_size, piece.piece_size.0).context("open piece file")?;
42 let (piece_info, _) =
43 write_and_preprocess(task.seal_proof_type, source, &staged_file, piece.piece_size).context("add piece")?;
44 piece_infos.push(piece_info);
45 }
46
47 if piece_infos.is_empty() {
48 let sector_size: u64 = task.seal_proof_type.sector_size().into();
49
50 let pi = piece::add_piece_for_cc_sector(&staged_file, sector_size).context("add piece for cc secrtor")?;
51 piece_infos.push(pi);
52 }
53
54 Ok(piece_infos)
55 }
56}
57
58impl Processor<TreeD> for BuiltinProcessor {
59 fn process(&self, task: TreeD) -> Result<<TreeD as Task>::Output> {
60 create_tree_d(task.registered_proof, Some(task.staged_file), task.cache_dir).map(|_| true)
61 }
62}
63
64impl Processor<PC1> for BuiltinProcessor {
65 fn process(&self, task: PC1) -> Result<<PC1 as Task>::Output> {
66 seal_pre_commit_phase1(
67 task.registered_proof,
68 task.cache_path,
69 task.in_path,
70 task.out_path,
71 task.prover_id,
72 task.sector_id,
73 task.ticket,
74 &task.piece_infos[..],
75 )
76 }
77}
78
79impl Processor<PC2> for BuiltinProcessor {
80 fn process(&self, task: PC2) -> Result<<PC2 as Task>::Output> {
81 seal_pre_commit_phase2(task.pc1out, task.cache_dir, task.sealed_file)
82 }
83}
84
85impl Processor<C2> for BuiltinProcessor {
86 fn process(&self, task: C2) -> Result<<C2 as Task>::Output> {
87 seal_commit_phase2(task.c1out, task.prover_id, task.sector_id)
88 }
89}
90
91impl Processor<SnapEncode> for BuiltinProcessor {
92 fn process(&self, task: SnapEncode) -> Result<<SnapEncode as Task>::Output> {
93 snap_encode_into(
94 task.registered_proof,
95 task.new_replica_path,
96 task.new_cache_path,
97 task.sector_path,
98 task.sector_cache_path,
99 task.staged_data_path,
100 &task.piece_infos[..],
101 )
102 }
103}
104
105impl Processor<SnapProve> for BuiltinProcessor {
106 fn process(&self, task: SnapProve) -> Result<<SnapProve as Task>::Output> {
107 snap_generate_sector_update_proof(
108 task.registered_proof,
109 task.vannilla_proofs.into_iter().map(PartitionProofBytes).collect(),
110 task.comm_r_old,
111 task.comm_r_new,
112 task.comm_d_new,
113 )
114 }
115}
116
117impl Processor<Transfer> for BuiltinProcessor {
118 fn process(&self, task: Transfer) -> Result<<Transfer as Task>::Output> {
119 task.routes.into_iter().try_for_each(|route| transfer::do_transfer(&route))?;
120
121 Ok(true)
122 }
123}
124
125impl Processor<WindowPoSt> for BuiltinProcessor {
126 fn process(&self, task: WindowPoSt) -> Result<<WindowPoSt as Task>::Output> {
127 let replicas = BTreeMap::from_iter(task.replicas.into_iter().map(|rep| {
128 (
129 rep.sector_id,
130 PrivateReplicaInfo::new(task.proof_type, rep.comm_r, rep.cache_dir, rep.sealed_file),
131 )
132 }));
133
134 generate_window_post(&task.seed, &replicas, to_prover_id(task.miner_id))
135 .map(|proofs| WindowPoStOutput {
136 proofs: proofs.into_iter().map(|r| r.1).collect(),
137 faults: vec![],
138 })
139 .or_else(|e| {
140 if let Some(StorageProofsError::FaultySectors(sectors)) = e.downcast_ref::<StorageProofsError>() {
141 return Ok(WindowPoStOutput {
142 proofs: vec![],
143 faults: sectors.iter().map(|id| (*id).into()).collect(),
144 });
145 }
146
147 Err(e)
148 })
149 }
150}
151
152impl Processor<WinningPoSt> for BuiltinProcessor {
153 fn process(&self, task: WinningPoSt) -> Result<<WinningPoSt as Task>::Output> {
154 let replicas = BTreeMap::from_iter(task.replicas.into_iter().map(|rep| {
155 (
156 rep.sector_id,
157 PrivateReplicaInfo::new(task.proof_type, rep.comm_r, rep.cache_dir, rep.sealed_file),
158 )
159 }));
160
161 generate_winning_post(&task.seed, &replicas, to_prover_id(task.miner_id)).map(|proofs| WinningPoStOutput {
162 proofs: proofs.into_iter().map(|r| r.1).collect(),
163 })
164 }
165}