Skip to main content

ckb_tx_pool/block_assembler/
mod.rs

1//! Generate a new block
2
3mod candidate_uncles;
4mod process;
5
6#[cfg(test)]
7mod tests;
8
9use crate::component::entry::TxEntry;
10use crate::error::BlockAssemblerError;
11pub use candidate_uncles::CandidateUncles;
12use ckb_app_config::BlockAssemblerConfig;
13use ckb_dao::DaoCalculator;
14use ckb_error::{AnyError, InternalErrorKind};
15use ckb_jsonrpc_types::{
16    BlockTemplate as JsonBlockTemplate, CellbaseTemplate, TransactionTemplate, UncleTemplate,
17};
18use ckb_logger::{debug, error, trace};
19use ckb_reward_calculator::RewardCalculator;
20use ckb_snapshot::Snapshot;
21use ckb_store::ChainStore;
22use ckb_systemtime::unix_time_as_millis;
23use ckb_types::{
24    bytes,
25    core::{
26        BlockNumber, Capacity, Cycle, EpochExt, EpochNumberWithFraction, ScriptHashType,
27        TransactionBuilder, TransactionView, UncleBlockView, Version,
28        cell::{OverlayCellChecker, TransactionsChecker},
29    },
30    packed::{
31        self, Byte32, Bytes, CellInput, CellOutput, CellbaseWitness, OutPoint, ProposalShortId,
32        Script, Transaction,
33    },
34    prelude::*,
35};
36use http_body_util::Full;
37use hyper::{Method, Request};
38use hyper_util::client::legacy::{Client, connect::HttpConnector};
39use std::collections::HashSet;
40use std::sync::{
41    Arc,
42    atomic::{AtomicU64, Ordering},
43};
44use std::time::Duration;
45use std::{cmp, iter};
46use tokio::process::Command;
47use tokio::sync::{Mutex, RwLock};
48use tokio::task::block_in_place;
49use tokio::time::timeout;
50
51use crate::TxPool;
52pub(crate) use process::process;
53
54type FailedTxs = (ProposalShortId, Option<OutPoint>);
55type CalcDaoResult = Result<(Byte32, Vec<TxEntry>, Vec<FailedTxs>), AnyError>;
56
/// Size bookkeeping for a block template.
///
/// `total` is tracked alongside the sizes of the three sections that can be
/// refreshed independently, so that replacing one section only needs the
/// difference between its old and new size.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub(crate) struct TemplateSize {
    /// Size currently accounted for the transactions section.
    pub(crate) txs: usize,
    /// Size currently accounted for the proposals section.
    pub(crate) proposals: usize,
    /// Size currently accounted for the uncles section.
    pub(crate) uncles: usize,
    /// Size currently accounted for the whole template.
    pub(crate) total: usize,
}

impl TemplateSize {
    /// Returns `total` after a section of size `old_part` is swapped for one of
    /// size `new_part`.
    ///
    /// At most one of `growth` / `shrink` is non-zero, so applying both is the
    /// same as branching on which size is larger. Both steps saturate.
    fn swap_section(total: usize, old_part: usize, new_part: usize) -> usize {
        let growth = new_part.saturating_sub(old_part);
        let shrink = old_part.saturating_sub(new_part);
        total.saturating_add(growth).saturating_sub(shrink)
    }

    /// Total size if the proposals section had size `new_proposals_size`.
    pub(crate) fn calc_total_by_proposals(&self, new_proposals_size: usize) -> usize {
        Self::swap_section(self.total, self.proposals, new_proposals_size)
    }

    /// Total size if the uncles section had size `new_uncles_size`.
    pub(crate) fn calc_total_by_uncles(&self, new_uncles_size: usize) -> usize {
        Self::swap_section(self.total, self.uncles, new_uncles_size)
    }

    /// Total size if the transactions section had size `new_txs_size`.
    pub(crate) fn calc_total_by_txs(&self, new_txs_size: usize) -> usize {
        Self::swap_section(self.total, self.txs, new_txs_size)
    }
}
92
93#[derive(Clone)]
94pub(crate) struct CurrentTemplate {
95    pub(crate) template: BlockTemplate,
96    pub(crate) size: TemplateSize,
97    pub(crate) snapshot: Arc<Snapshot>,
98    pub(crate) epoch: EpochExt,
99}
100
101/// Block generator
102#[derive(Clone)]
103pub struct BlockAssembler {
104    pub(crate) config: Arc<BlockAssemblerConfig>,
105    pub(crate) work_id: Arc<AtomicU64>,
106    pub(crate) candidate_uncles: Arc<Mutex<CandidateUncles>>,
107    pub(crate) current: Arc<Mutex<CurrentTemplate>>,
108    pub(crate) poster: Arc<Client<HttpConnector, Full<bytes::Bytes>>>,
109}
110
111impl BlockAssembler {
112    /// Construct new block generator
113    pub fn new(
114        config: BlockAssemblerConfig,
115        snapshot: Arc<Snapshot>,
116    ) -> Result<Self, BlockAssemblerError> {
117        let consensus = snapshot.consensus();
118        let tip_header = snapshot.tip_header();
119        let current_epoch = consensus
120            .next_epoch_ext(tip_header, &snapshot.borrow_as_data_loader())
121            .expect("tip header's epoch should be stored")
122            .epoch();
123        let mut builder = BlockTemplateBuilder::new(&snapshot, &current_epoch)?;
124
125        let cellbase = Self::build_cellbase(&config, &snapshot)
126            .expect("build cellbase for BlockAssembler initial");
127
128        let extension =
129            Self::build_extension(&snapshot).expect("build extension for BlockAssembler initial");
130        let basic_block_size =
131            Self::basic_block_size(cellbase.data(), &[], iter::empty(), extension.clone());
132
133        let (dao, _checked_txs, _failed_txs) =
134            Self::calc_dao(&snapshot, &current_epoch, cellbase.clone(), vec![])
135                .expect("calc_dao for BlockAssembler initial");
136
137        let work_id = AtomicU64::new(0);
138
139        builder
140            .transactions(vec![])
141            .proposals(vec![])
142            .cellbase(cellbase)
143            .work_id(work_id.fetch_add(1, Ordering::SeqCst))
144            .current_time(cmp::max(
145                unix_time_as_millis(),
146                tip_header
147                    .timestamp()
148                    .checked_add(1)
149                    .ok_or(BlockAssemblerError::Overflow)?,
150            ))
151            .dao(dao);
152        if let Some(data) = extension {
153            builder.extension(data);
154        }
155        let template = builder.build();
156
157        let size = TemplateSize {
158            txs: 0,
159            proposals: 0,
160            uncles: 0,
161            total: basic_block_size,
162        };
163        let current = CurrentTemplate {
164            template,
165            size,
166            snapshot,
167            epoch: current_epoch,
168        };
169
170        Ok(Self {
171            config: Arc::new(config),
172            work_id: Arc::new(work_id),
173            candidate_uncles: Arc::new(Mutex::new(CandidateUncles::new())),
174            current: Arc::new(Mutex::new(current)),
175            poster: Arc::new(
176                Client::builder(hyper_util::rt::TokioExecutor::new())
177                    .build::<_, Full<bytes::Bytes>>(HttpConnector::new()),
178            ),
179        })
180    }
181
182    pub(crate) async fn update_full(&self, tx_pool: &RwLock<TxPool>) -> Result<(), AnyError> {
183        let mut current = self.current.lock().await;
184        let consensus = current.snapshot.consensus();
185        let max_block_bytes = consensus.max_block_bytes() as usize;
186
187        let current_template = &current.template;
188        let uncles = &current_template.uncles;
189
190        let (proposals, txs, basic_size) = {
191            let tx_pool_reader = tx_pool.read().await;
192            if current.snapshot.tip_hash() != tx_pool_reader.snapshot().tip_hash() {
193                return Ok(());
194            }
195
196            let proposals =
197                tx_pool_reader.package_proposals(consensus.max_block_proposals_limit(), uncles);
198
199            let basic_size = Self::basic_block_size(
200                current_template.cellbase.data(),
201                uncles,
202                proposals.iter(),
203                current_template.extension.clone(),
204            );
205
206            let txs_size_limit = max_block_bytes
207                .checked_sub(basic_size)
208                .ok_or(BlockAssemblerError::Overflow)?;
209
210            let max_block_cycles = consensus.max_block_cycles();
211            let (txs, _txs_size, _cycles) =
212                tx_pool_reader.package_txs(max_block_cycles, txs_size_limit);
213            (proposals, txs, basic_size)
214        };
215
216        let proposals_size = proposals.len() * ProposalShortId::serialized_size();
217        let (dao, checked_txs, failed_txs) = Self::calc_dao(
218            &current.snapshot,
219            &current.epoch,
220            current_template.cellbase.clone(),
221            txs,
222        )?;
223        if !failed_txs.is_empty() {
224            for (id, out_point) in failed_txs {
225                //"The main reason why a proposed transaction here
226                // cannot pass the resolve check is very likely that
227                // its ancestor has not been proposed.
228                // Therefore, we don't handle it actively—instead,
229                // we wait for the ancestor to be re-proposed or
230                // to be removed on timeout.
231                debug!(
232                    "Committing tx {} resolving check failed, out_point {:?}",
233                    id, out_point
234                );
235            }
236        }
237
238        let txs_size = Self::checked_entries_size(&checked_txs)?;
239        let total_size = basic_size
240            .checked_add(txs_size)
241            .ok_or(BlockAssemblerError::Overflow)?;
242
243        let mut builder = BlockTemplateBuilder::from_template(&current.template);
244        builder
245            .set_proposals(Vec::from_iter(proposals))
246            .set_transactions(checked_txs)
247            .work_id(self.work_id.fetch_add(1, Ordering::SeqCst))
248            .current_time(cmp::max(
249                unix_time_as_millis(),
250                current.template.current_time,
251            ))
252            .dao(dao);
253
254        current.template = builder.build();
255        current.size.txs = txs_size;
256        current.size.total = total_size;
257        current.size.proposals = proposals_size;
258
259        trace!(
260            "[BlockAssembler] update_full {} uncles-{} proposals-{} txs-{}",
261            current.template.number,
262            current.template.uncles.len(),
263            current.template.proposals.len(),
264            current.template.transactions.len(),
265        );
266
267        Ok(())
268    }
269
270    pub(crate) async fn update_blank(&self, snapshot: Arc<Snapshot>) -> Result<(), AnyError> {
271        let consensus = snapshot.consensus();
272        let tip_header = snapshot.tip_header();
273        let current_epoch = consensus
274            .next_epoch_ext(tip_header, &snapshot.borrow_as_data_loader())
275            .expect("tip header's epoch should be stored")
276            .epoch();
277        let mut builder = BlockTemplateBuilder::new(&snapshot, &current_epoch)?;
278
279        let cellbase = Self::build_cellbase(&self.config, &snapshot)?;
280        let uncles = self.prepare_uncles(&snapshot, &current_epoch).await;
281        let uncles_size = uncles.len() * UncleBlockView::serialized_size_in_block();
282
283        let extension = Self::build_extension(&snapshot)?;
284        let basic_block_size =
285            Self::basic_block_size(cellbase.data(), &uncles, iter::empty(), extension.clone());
286
287        let (dao, _checked_txs, _failed_txs) =
288            Self::calc_dao(&snapshot, &current_epoch, cellbase.clone(), vec![])?;
289
290        builder
291            .transactions(vec![])
292            .proposals(vec![])
293            .cellbase(cellbase)
294            .uncles(uncles)
295            .work_id(self.work_id.fetch_add(1, Ordering::SeqCst))
296            .current_time(cmp::max(
297                unix_time_as_millis(),
298                tip_header
299                    .timestamp()
300                    .checked_add(1)
301                    .ok_or(BlockAssemblerError::Overflow)?,
302            ))
303            .dao(dao);
304        if let Some(data) = extension {
305            builder.extension(data);
306        }
307        let template = builder.build();
308
309        trace!(
310            "[BlockAssembler] update_blank {} uncles-{} proposals-{} txs-{}",
311            template.number,
312            template.uncles.len(),
313            template.proposals.len(),
314            template.transactions.len(),
315        );
316
317        let size = TemplateSize {
318            txs: 0,
319            proposals: 0,
320            uncles: uncles_size,
321            total: basic_block_size,
322        };
323
324        let new_blank = CurrentTemplate {
325            template,
326            size,
327            snapshot,
328            epoch: current_epoch,
329        };
330
331        *self.current.lock().await = new_blank;
332        Ok(())
333    }
334
335    pub(crate) async fn update_uncles(&self) {
336        let mut current = self.current.lock().await;
337        let consensus = current.snapshot.consensus();
338        let max_block_bytes = consensus.max_block_bytes() as usize;
339        let max_uncles_num = consensus.max_uncles_num();
340        let current_uncles_num = current.template.uncles.len();
341        if current_uncles_num < max_uncles_num {
342            let remain_size = max_block_bytes.saturating_sub(current.size.total);
343
344            if remain_size > UncleBlockView::serialized_size_in_block() {
345                let uncles = self.prepare_uncles(&current.snapshot, &current.epoch).await;
346
347                let new_uncle_size = uncles.len() * UncleBlockView::serialized_size_in_block();
348                let new_total_size = current.size.calc_total_by_uncles(new_uncle_size);
349
350                if new_total_size < max_block_bytes {
351                    let mut builder = BlockTemplateBuilder::from_template(&current.template);
352                    builder
353                        .set_uncles(uncles)
354                        .work_id(self.work_id.fetch_add(1, Ordering::SeqCst))
355                        .current_time(cmp::max(
356                            unix_time_as_millis(),
357                            current.template.current_time,
358                        ));
359                    current.template = builder.build();
360                    current.size.uncles = new_uncle_size;
361                    current.size.total = new_total_size;
362
363                    trace!(
364                        "[BlockAssembler] update_uncles-{} epoch-{} uncles-{} proposals-{} txs-{}",
365                        current.template.number,
366                        current.template.epoch.number(),
367                        current.template.uncles.len(),
368                        current.template.proposals.len(),
369                        current.template.transactions.len(),
370                    );
371                }
372            }
373        }
374    }
375
376    pub(crate) async fn update_proposals(&self, tx_pool: &RwLock<TxPool>) {
377        let mut current = self.current.lock().await;
378        let consensus = current.snapshot.consensus();
379        let uncles = &current.template.uncles;
380        let proposals = {
381            let tx_pool_reader = tx_pool.read().await;
382            if current.snapshot.tip_hash() != tx_pool_reader.snapshot().tip_hash() {
383                return;
384            }
385            tx_pool_reader.package_proposals(consensus.max_block_proposals_limit(), uncles)
386        };
387
388        let new_proposals_size = proposals.len() * ProposalShortId::serialized_size();
389        let new_total_size = current.size.calc_total_by_proposals(new_proposals_size);
390        let max_block_bytes = consensus.max_block_bytes() as usize;
391        if new_total_size < max_block_bytes {
392            let mut builder = BlockTemplateBuilder::from_template(&current.template);
393            builder
394                .set_proposals(Vec::from_iter(proposals))
395                .work_id(self.work_id.fetch_add(1, Ordering::SeqCst))
396                .current_time(cmp::max(
397                    unix_time_as_millis(),
398                    current.template.current_time,
399                ));
400            current.template = builder.build();
401            current.size.proposals = new_proposals_size;
402            current.size.total = new_total_size;
403
404            trace!(
405                "[BlockAssembler] update_proposals-{} epoch-{} uncles-{} proposals-{} txs-{}",
406                current.template.number,
407                current.template.epoch.number(),
408                current.template.uncles.len(),
409                current.template.proposals.len(),
410                current.template.transactions.len(),
411            );
412        }
413    }
414
415    pub(crate) async fn update_transactions(
416        &self,
417        tx_pool: &RwLock<TxPool>,
418    ) -> Result<(), AnyError> {
419        let mut current = self.current.lock().await;
420        let consensus = current.snapshot.consensus();
421        let current_template = &current.template;
422        let max_block_bytes = consensus.max_block_bytes() as usize;
423        let extension = Self::build_extension(&current.snapshot)?;
424        let txs = {
425            let tx_pool_reader = tx_pool.read().await;
426            if current.snapshot.tip_hash() != tx_pool_reader.snapshot().tip_hash() {
427                return Ok(());
428            }
429
430            let basic_block_size = Self::basic_block_size(
431                current_template.cellbase.data(),
432                &current_template.uncles,
433                current_template.proposals.iter(),
434                extension.clone(),
435            );
436
437            let txs_size_limit = max_block_bytes.checked_sub(basic_block_size);
438
439            if txs_size_limit.is_none() {
440                return Ok(());
441            }
442
443            let max_block_cycles = consensus.max_block_cycles();
444            let (txs, _txs_size, _cycles) = tx_pool_reader
445                .package_txs(max_block_cycles, txs_size_limit.expect("overflow checked"));
446            txs
447        };
448
449        if let Ok((dao, checked_txs, _failed_txs)) = Self::calc_dao(
450            &current.snapshot,
451            &current.epoch,
452            current_template.cellbase.clone(),
453            txs,
454        ) {
455            let new_txs_size = Self::checked_entries_size(&checked_txs)?;
456            let new_total_size = current.size.calc_total_by_txs(new_txs_size);
457            let mut builder = BlockTemplateBuilder::from_template(&current.template);
458            builder
459                .set_transactions(checked_txs)
460                .work_id(self.work_id.fetch_add(1, Ordering::SeqCst))
461                .current_time(cmp::max(
462                    unix_time_as_millis(),
463                    current.template.current_time,
464                ))
465                .dao(dao);
466            if let Some(data) = extension {
467                builder.extension(data);
468            }
469            current.template = builder.build();
470            current.size.txs = new_txs_size;
471            current.size.total = new_total_size;
472
473            trace!(
474                "[BlockAssembler] update_transactions-{} epoch-{} uncles-{} proposals-{} txs-{}",
475                current.template.number,
476                current.template.epoch.number(),
477                current.template.uncles.len(),
478                current.template.proposals.len(),
479                current.template.transactions.len(),
480            );
481        }
482        Ok(())
483    }
484
485    pub(crate) async fn get_current(&self) -> JsonBlockTemplate {
486        let current = self.current.lock().await;
487        (&current.template).into()
488    }
489
490    pub(crate) fn build_cellbase_witness(
491        config: &BlockAssemblerConfig,
492        snapshot: &Snapshot,
493    ) -> CellbaseWitness {
494        let hash_type: ScriptHashType = config.hash_type.into();
495        let cellbase_lock = Script::new_builder()
496            .args(config.args.as_bytes())
497            .code_hash(&config.code_hash)
498            .hash_type(hash_type)
499            .build();
500        let tip = snapshot.tip_header();
501
502        let mut message = vec![];
503        if let Some(version) = snapshot.compute_versionbits(tip) {
504            message.extend_from_slice(&version.to_le_bytes());
505            message.extend_from_slice(b" ");
506        }
507        if config.use_binary_version_as_message_prefix {
508            message.extend_from_slice(config.binary_version.as_bytes());
509        }
510        if !config.message.is_empty() {
511            message.extend_from_slice(b" ");
512            message.extend_from_slice(config.message.as_bytes());
513        }
514
515        CellbaseWitness::new_builder()
516            .lock(cellbase_lock)
517            .message(message)
518            .build()
519    }
520
521    /// Miner mined block H(c), the block reward will be finalized at H(c + w_far + 1).
522    /// Miner specify own lock in cellbase witness.
523    /// The cellbase have only one output,
524    /// miner should collect the block reward for finalize target H(max(0, c - w_far - 1))
525    pub(crate) fn build_cellbase(
526        config: &BlockAssemblerConfig,
527        snapshot: &Snapshot,
528    ) -> Result<TransactionView, AnyError> {
529        let tip = snapshot.tip_header();
530        let candidate_number = tip
531            .number()
532            .checked_add(1)
533            .ok_or(BlockAssemblerError::Overflow)?;
534        let cellbase_witness = Self::build_cellbase_witness(config, snapshot);
535
536        let tx = {
537            let (target_lock, block_reward) = block_in_place(|| {
538                RewardCalculator::new(snapshot.consensus(), snapshot).block_reward_to_finalize(tip)
539            })?;
540            let input = CellInput::new_cellbase_input(candidate_number);
541            let output = CellOutput::new_builder()
542                .capacity(block_reward.total)
543                .lock(target_lock)
544                .build();
545
546            let witness = cellbase_witness.as_bytes();
547            let no_finalization_target =
548                candidate_number <= snapshot.consensus().finalization_delay_length();
549            let tx_builder = TransactionBuilder::default().input(input).witness(witness);
550            let insufficient_reward_to_create_cell = output.is_lack_of_capacity(Capacity::zero())?;
551            if no_finalization_target || insufficient_reward_to_create_cell {
552                tx_builder.build()
553            } else {
554                tx_builder
555                    .output(output)
556                    .output_data(Bytes::default())
557                    .build()
558            }
559        };
560
561        Ok(tx)
562    }
563
564    pub(crate) fn build_extension(snapshot: &Snapshot) -> Result<Option<packed::Bytes>, AnyError> {
565        let tip_header = snapshot.tip_header();
566        // The use of the epoch number of the tip here leads to an off-by-one bug,
567        // so be careful, it needs to be preserved for consistency reasons and not fixed directly.
568        let mmr_activate = snapshot
569            .consensus()
570            .rfc0044_active(tip_header.epoch().number());
571        if mmr_activate {
572            let chain_root = snapshot
573                .chain_root_mmr(tip_header.number())
574                .get_root()
575                .map_err(|e| InternalErrorKind::MMR.other(e))?;
576            let bytes = chain_root.calc_mmr_hash().as_bytes().into();
577            Ok(Some(bytes))
578        } else {
579            Ok(None)
580        }
581    }
582
583    pub(crate) async fn prepare_uncles(
584        &self,
585        snapshot: &Snapshot,
586        current_epoch: &EpochExt,
587    ) -> Vec<UncleBlockView> {
588        let mut guard = self.candidate_uncles.lock().await;
589        guard.prepare_uncles(snapshot, current_epoch)
590    }
591
592    pub(crate) fn basic_block_size<'a>(
593        cellbase: Transaction,
594        uncles: &[UncleBlockView],
595        proposals: impl Iterator<Item = &'a ProposalShortId>,
596        extension_opt: Option<packed::Bytes>,
597    ) -> usize {
598        let empty_dao = packed::Byte32::default();
599        let raw_header = packed::RawHeader::new_builder().dao(empty_dao).build();
600        let header = packed::Header::new_builder().raw(raw_header).build();
601        let block = if let Some(extension) = extension_opt {
602            packed::BlockV1::new_builder()
603                .header(header)
604                .transactions(vec![cellbase])
605                .uncles(uncles.iter().map(|u| u.data()).collect::<Vec<_>>())
606                .proposals(proposals.cloned().collect::<Vec<_>>())
607                .extension(extension)
608                .build()
609                .as_v0()
610        } else {
611            packed::Block::new_builder()
612                .header(header)
613                .transactions(vec![cellbase])
614                .uncles(uncles.iter().map(|u| u.data()).collect::<Vec<_>>())
615                .proposals(proposals.cloned().collect::<Vec<_>>())
616                .build()
617        };
618        block.serialized_size_without_uncle_proposals()
619    }
620
621    fn checked_entries_size(entries: &[TxEntry]) -> Result<usize, BlockAssemblerError> {
622        entries.iter().try_fold(0usize, |sum, tx| {
623            sum.checked_add(tx.size)
624                .ok_or(BlockAssemblerError::Overflow)
625        })
626    }
627
628    fn calc_dao(
629        snapshot: &Snapshot,
630        current_epoch: &EpochExt,
631        cellbase: TransactionView,
632        entries: Vec<TxEntry>,
633    ) -> CalcDaoResult {
634        let tip_header = snapshot.tip_header();
635        let consensus = snapshot.consensus();
636        let mut seen_inputs = HashSet::new();
637        let mut transactions_checker = TransactionsChecker::new(iter::once(&cellbase));
638
639        let mut checked_failed_txs = vec![];
640        let checked_entries: Vec<_> = block_in_place(|| {
641            entries
642                .into_iter()
643                .filter_map(|entry| {
644                    let overlay_cell_checker =
645                        OverlayCellChecker::new(&transactions_checker, snapshot);
646                    if let Err(err) =
647                        entry
648                            .rtx
649                            .check(&mut seen_inputs, &overlay_cell_checker, snapshot)
650                    {
651                        error!(
652                            "Resolving transactions while building block template, \
653                             tip_number: {}, tip_hash: {}, tx_hash: {}, error: {:?}",
654                            tip_header.number(),
655                            tip_header.hash(),
656                            entry.transaction().hash(),
657                            err
658                        );
659                        // Returning the out_point makes debugging easier and provides better logs.
660                        checked_failed_txs
661                            .push((entry.proposal_short_id(), err.out_point().cloned()));
662                        None
663                    } else {
664                        transactions_checker.insert(entry.transaction());
665                        Some(entry)
666                    }
667                })
668                .collect()
669        });
670
671        let dummy_cellbase_entry = TxEntry::dummy_resolve(cellbase, 0, Capacity::zero(), 0);
672        let entries_iter = iter::once(&dummy_cellbase_entry)
673            .chain(checked_entries.iter())
674            .map(|entry| entry.rtx.as_ref());
675
676        // Generate DAO fields here
677        let dao = DaoCalculator::new(consensus, &snapshot.borrow_as_data_loader())
678            .dao_field_with_current_epoch(entries_iter, tip_header, current_epoch)?;
679
680        Ok((dao, checked_entries, checked_failed_txs))
681    }
682
683    pub(crate) async fn notify(&self) {
684        if !self.need_to_notify() {
685            return;
686        }
687        let template = self.get_current().await;
688        if let Ok(template_json) = serde_json::to_string(&template) {
689            let notify_timeout = Duration::from_millis(self.config.notify_timeout_millis);
690            for url in &self.config.notify {
691                if let Ok(req) = Request::builder()
692                    .method(Method::POST)
693                    .uri(url.as_ref())
694                    .header("content-type", "application/json")
695                    .body(Full::new(template_json.to_owned().into()))
696                {
697                    let client = Arc::clone(&self.poster);
698                    let url = url.to_owned();
699                    tokio::spawn(async move {
700                        let _resp =
701                            timeout(notify_timeout, client.request(req))
702                                .await
703                                .map_err(|_| {
704                                    ckb_logger::warn!(
705                                        "block assembler notifying {} timed out",
706                                        url
707                                    );
708                                });
709                    });
710                }
711            }
712
713            for script in &self.config.notify_scripts {
714                let script = script.to_owned();
715                let template_json = template_json.to_owned();
716                tokio::spawn(async move {
717                    // Errors
718                    // This future will return an error if the child process cannot be spawned
719                    // or if there is an error while awaiting its status.
720
721                    // On Unix platforms this method will fail with std::io::ErrorKind::WouldBlock
722                    // if the system process limit is reached
723                    // (which includes other applications running on the system).
724                    match timeout(
725                        notify_timeout,
726                        Command::new(&script).arg(template_json).status(),
727                    )
728                    .await
729                    {
730                        Ok(ret) => match ret {
731                            Ok(status) => debug!("the command exited with: {}", status),
732                            Err(e) => error!("the script {} failed to spawn {}", script, e),
733                        },
734                        Err(_) => {
735                            ckb_logger::warn!("block assembler notifying {} timed out", script)
736                        }
737                    }
738                });
739            }
740        }
741    }
742
743    fn need_to_notify(&self) -> bool {
744        !self.config.notify.is_empty() || !self.config.notify_scripts.is_empty()
745    }
746}
747
748#[derive(Clone)]
749pub(crate) struct BlockTemplate {
750    pub(crate) version: Version,
751    pub(crate) compact_target: u32,
752    pub(crate) number: BlockNumber,
753    pub(crate) epoch: EpochNumberWithFraction,
754    pub(crate) parent_hash: Byte32,
755    pub(crate) cycles_limit: Cycle,
756    pub(crate) bytes_limit: u64,
757    pub(crate) uncles_count_limit: u8,
758
759    // option
760    pub(crate) uncles: Vec<UncleBlockView>,
761    pub(crate) transactions: Vec<TxEntry>,
762    pub(crate) proposals: Vec<ProposalShortId>,
763    pub(crate) cellbase: TransactionView,
764    pub(crate) work_id: u64,
765    pub(crate) dao: Byte32,
766    pub(crate) current_time: u64,
767    pub(crate) extension: Option<Bytes>,
768}
769
770impl<'a> From<&'a BlockTemplate> for JsonBlockTemplate {
771    fn from(template: &'a BlockTemplate) -> JsonBlockTemplate {
772        JsonBlockTemplate {
773            version: template.version.into(),
774            compact_target: template.compact_target.into(),
775            number: template.number.into(),
776            epoch: template.epoch.into(),
777            parent_hash: (&template.parent_hash).into(),
778            cycles_limit: template.cycles_limit.into(),
779            bytes_limit: template.bytes_limit.into(),
780            uncles_count_limit: u64::from(template.uncles_count_limit).into(),
781            uncles: template.uncles.iter().map(uncle_to_template).collect(),
782            transactions: template
783                .transactions
784                .iter()
785                .map(tx_entry_to_template)
786                .collect(),
787            proposals: template.proposals.iter().map(Into::into).collect(),
788            cellbase: cellbase_to_template(&template.cellbase),
789            work_id: template.work_id.into(),
790            dao: template.dao.clone().into(),
791            current_time: template.current_time.into(),
792            extension: template.extension.as_ref().map(Into::into),
793        }
794    }
795}
796
797#[derive(Clone)]
798pub(crate) struct BlockTemplateBuilder {
799    pub(crate) version: Version,
800    pub(crate) compact_target: u32,
801    pub(crate) number: BlockNumber,
802    pub(crate) epoch: EpochNumberWithFraction,
803    pub(crate) parent_hash: Byte32,
804    pub(crate) cycles_limit: Cycle,
805    pub(crate) bytes_limit: u64,
806    pub(crate) uncles_count_limit: u8,
807
808    // option
809    pub(crate) uncles: Vec<UncleBlockView>,
810    pub(crate) transactions: Vec<TxEntry>,
811    pub(crate) proposals: Vec<ProposalShortId>,
812    pub(crate) cellbase: Option<TransactionView>,
813    pub(crate) work_id: Option<u64>,
814    pub(crate) dao: Option<Byte32>,
815    pub(crate) current_time: Option<u64>,
816    pub(crate) extension: Option<Bytes>,
817}
818
819impl BlockTemplateBuilder {
820    pub(crate) fn new(
821        snapshot: &Snapshot,
822        current_epoch: &EpochExt,
823    ) -> Result<Self, BlockAssemblerError> {
824        let consensus = snapshot.consensus();
825        let tip_header = snapshot.tip_header();
826        let tip_hash = tip_header.hash();
827        let candidate_number = tip_header
828            .number()
829            .checked_add(1)
830            .ok_or(BlockAssemblerError::Overflow)?;
831
832        let version = consensus.block_version();
833        let max_block_bytes = consensus.max_block_bytes();
834        let cycles_limit = consensus.max_block_cycles();
835        let uncles_count_limit = consensus.max_uncles_num() as u8;
836
837        Ok(Self {
838            version,
839            compact_target: current_epoch.compact_target(),
840
841            number: candidate_number,
842            epoch: current_epoch.number_with_fraction(candidate_number),
843            parent_hash: tip_hash,
844            cycles_limit,
845            bytes_limit: max_block_bytes,
846            uncles_count_limit,
847            // option
848            uncles: vec![],
849            transactions: vec![],
850            proposals: vec![],
851            cellbase: None,
852            work_id: None,
853            dao: None,
854            current_time: None,
855            extension: None,
856        })
857    }
858
859    pub(crate) fn from_template(template: &BlockTemplate) -> Self {
860        Self {
861            version: template.version,
862            compact_target: template.compact_target,
863            number: template.number,
864            epoch: template.epoch,
865            parent_hash: template.parent_hash.clone(),
866            cycles_limit: template.cycles_limit,
867            bytes_limit: template.bytes_limit,
868            uncles_count_limit: template.uncles_count_limit,
869            extension: template.extension.clone(),
870            // option
871            uncles: template.uncles.clone(),
872            transactions: template.transactions.clone(),
873            proposals: template.proposals.clone(),
874            cellbase: Some(template.cellbase.clone()),
875            work_id: None,
876            dao: Some(template.dao.clone()),
877            current_time: None,
878        }
879    }
880
881    pub(crate) fn uncles(&mut self, uncles: impl IntoIterator<Item = UncleBlockView>) -> &mut Self {
882        self.uncles.extend(uncles);
883        self
884    }
885
886    pub(crate) fn set_uncles(&mut self, uncles: Vec<UncleBlockView>) -> &mut Self {
887        self.uncles = uncles;
888        self
889    }
890
891    pub(crate) fn transactions(
892        &mut self,
893        transactions: impl IntoIterator<Item = TxEntry>,
894    ) -> &mut Self {
895        self.transactions.extend(transactions);
896        self
897    }
898
899    pub(crate) fn set_transactions(&mut self, transactions: Vec<TxEntry>) -> &mut Self {
900        self.transactions = transactions;
901        self
902    }
903
904    pub(crate) fn proposals(
905        &mut self,
906        proposals: impl IntoIterator<Item = ProposalShortId>,
907    ) -> &mut Self {
908        self.proposals.extend(proposals);
909        self
910    }
911
912    pub(crate) fn set_proposals(&mut self, proposals: Vec<ProposalShortId>) -> &mut Self {
913        self.proposals = proposals;
914        self
915    }
916
917    pub(crate) fn cellbase(&mut self, cellbase: TransactionView) -> &mut Self {
918        self.cellbase = Some(cellbase);
919        self
920    }
921
922    pub(crate) fn work_id(&mut self, work_id: u64) -> &mut Self {
923        self.work_id = Some(work_id);
924        self
925    }
926
927    pub(crate) fn dao(&mut self, dao: Byte32) -> &mut Self {
928        self.dao = Some(dao);
929        self
930    }
931
932    pub(crate) fn current_time(&mut self, current_time: u64) -> &mut Self {
933        self.current_time = Some(current_time);
934        self
935    }
936
937    #[allow(dead_code)]
938    pub(crate) fn extension(&mut self, extension: Bytes) -> &mut Self {
939        self.extension = Some(extension);
940        self
941    }
942
943    pub(crate) fn build(self) -> BlockTemplate {
944        assert!(self.cellbase.is_some(), "cellbase must be set");
945        assert!(self.work_id.is_some(), "work_id must be set");
946        assert!(self.current_time.is_some(), "current_time must be set");
947        assert!(self.dao.is_some(), "dao must be set");
948
949        BlockTemplate {
950            version: self.version,
951            compact_target: self.compact_target,
952
953            number: self.number,
954            epoch: self.epoch,
955            parent_hash: self.parent_hash,
956            cycles_limit: self.cycles_limit,
957            bytes_limit: self.bytes_limit,
958            uncles_count_limit: self.uncles_count_limit,
959            uncles: self.uncles,
960            transactions: self.transactions,
961            proposals: self.proposals,
962            cellbase: self.cellbase.expect("cellbase assert checked"),
963            work_id: self.work_id.expect("work_id assert checked"),
964            dao: self.dao.expect("dao assert checked"),
965            current_time: self.current_time.expect("current_time assert checked"),
966            extension: self.extension,
967        }
968    }
969}
970
971pub(crate) fn uncle_to_template(uncle: &UncleBlockView) -> UncleTemplate {
972    UncleTemplate {
973        hash: uncle.hash().into(),
974        required: false,
975        proposals: uncle
976            .data()
977            .proposals()
978            .into_iter()
979            .map(Into::into)
980            .collect(),
981        header: uncle.data().header().into(),
982    }
983}
984
985pub(crate) fn tx_entry_to_template(entry: &TxEntry) -> TransactionTemplate {
986    TransactionTemplate {
987        hash: entry.transaction().hash().into(),
988        required: false, // unimplemented
989        cycles: Some(entry.cycles.into()),
990        depends: None, // unimplemented
991        data: entry.transaction().data().into(),
992    }
993}
994
995pub(crate) fn cellbase_to_template(tx: &TransactionView) -> CellbaseTemplate {
996    CellbaseTemplate {
997        hash: tx.hash().into(),
998        cycles: None,
999        data: tx.data().into(),
1000    }
1001}