Skip to main content

ckb_tx_pool/block_assembler/
mod.rs

1//! Generate a new block
2
3mod candidate_uncles;
4mod process;
5
6#[cfg(test)]
7mod tests;
8
9use crate::component::entry::TxEntry;
10use crate::error::BlockAssemblerError;
11pub use candidate_uncles::CandidateUncles;
12use ckb_app_config::BlockAssemblerConfig;
13use ckb_dao::DaoCalculator;
14use ckb_error::{AnyError, InternalErrorKind};
15use ckb_jsonrpc_types::{
16    BlockTemplate as JsonBlockTemplate, CellbaseTemplate, TransactionTemplate, UncleTemplate,
17};
18use ckb_logger::{debug, error, trace};
19use ckb_reward_calculator::RewardCalculator;
20use ckb_snapshot::Snapshot;
21use ckb_store::ChainStore;
22use ckb_systemtime::unix_time_as_millis;
23use ckb_types::{
24    bytes,
25    core::{
26        BlockNumber, Capacity, Cycle, EpochExt, EpochNumberWithFraction, ScriptHashType,
27        TransactionBuilder, TransactionView, UncleBlockView, Version,
28        cell::{OverlayCellChecker, TransactionsChecker},
29    },
30    packed::{
31        self, Byte32, Bytes, CellInput, CellOutput, CellbaseWitness, OutPoint, ProposalShortId,
32        Script, Transaction,
33    },
34    prelude::*,
35};
36use http_body_util::Full;
37use hyper::{Method, Request};
38use hyper_util::client::legacy::{Client, connect::HttpConnector};
39use std::collections::HashSet;
40use std::sync::{
41    Arc,
42    atomic::{AtomicU64, Ordering},
43};
44use std::time::Duration;
45use std::{cmp, iter};
46use tokio::process::Command;
47use tokio::sync::{Mutex, RwLock};
48use tokio::task::block_in_place;
49use tokio::time::timeout;
50
51use crate::TxPool;
52pub(crate) use process::process;
53
54type FailedTxs = (ProposalShortId, Option<OutPoint>);
55type CalcDaoResult = Result<(Byte32, Vec<TxEntry>, Vec<FailedTxs>), AnyError>;
56
57#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
58pub(crate) struct TemplateSize {
59    pub(crate) txs: usize,
60    pub(crate) proposals: usize,
61    pub(crate) uncles: usize,
62    pub(crate) total: usize,
63}
64
65impl TemplateSize {
66    pub(crate) fn calc_total_by_proposals(&self, new_proposals_size: usize) -> usize {
67        if new_proposals_size > self.proposals {
68            self.total
69                .saturating_add(new_proposals_size - self.proposals)
70        } else {
71            self.total
72                .saturating_sub(self.proposals - new_proposals_size)
73        }
74    }
75
76    pub(crate) fn calc_total_by_uncles(&self, new_uncles_size: usize) -> usize {
77        if new_uncles_size > self.uncles {
78            self.total.saturating_add(new_uncles_size - self.uncles)
79        } else {
80            self.total.saturating_sub(self.uncles - new_uncles_size)
81        }
82    }
83
84    pub(crate) fn calc_total_by_txs(&self, new_txs_size: usize) -> usize {
85        if new_txs_size > self.txs {
86            self.total.saturating_add(new_txs_size - self.txs)
87        } else {
88            self.total.saturating_sub(self.txs - new_txs_size)
89        }
90    }
91}
92
93#[derive(Clone)]
94pub(crate) struct CurrentTemplate {
95    pub(crate) template: BlockTemplate,
96    pub(crate) size: TemplateSize,
97    pub(crate) snapshot: Arc<Snapshot>,
98    pub(crate) epoch: EpochExt,
99}
100
101/// Block generator
102#[derive(Clone)]
103pub struct BlockAssembler {
104    pub(crate) config: Arc<BlockAssemblerConfig>,
105    pub(crate) work_id: Arc<AtomicU64>,
106    pub(crate) candidate_uncles: Arc<Mutex<CandidateUncles>>,
107    pub(crate) current: Arc<Mutex<CurrentTemplate>>,
108    pub(crate) poster: Arc<Client<HttpConnector, Full<bytes::Bytes>>>,
109}
110
111impl BlockAssembler {
112    /// Construct new block generator
113    pub fn new(config: BlockAssemblerConfig, snapshot: Arc<Snapshot>) -> Self {
114        let consensus = snapshot.consensus();
115        let tip_header = snapshot.tip_header();
116        let current_epoch = consensus
117            .next_epoch_ext(tip_header, &snapshot.borrow_as_data_loader())
118            .expect("tip header's epoch should be stored")
119            .epoch();
120        let mut builder = BlockTemplateBuilder::new(&snapshot, &current_epoch);
121
122        let cellbase = Self::build_cellbase(&config, &snapshot)
123            .expect("build cellbase for BlockAssembler initial");
124
125        let extension =
126            Self::build_extension(&snapshot).expect("build extension for BlockAssembler initial");
127        let basic_block_size =
128            Self::basic_block_size(cellbase.data(), &[], iter::empty(), extension.clone());
129
130        let (dao, _checked_txs, _failed_txs) =
131            Self::calc_dao(&snapshot, &current_epoch, cellbase.clone(), vec![])
132                .expect("calc_dao for BlockAssembler initial");
133
134        let work_id = AtomicU64::new(0);
135
136        builder
137            .transactions(vec![])
138            .proposals(vec![])
139            .cellbase(cellbase)
140            .work_id(work_id.fetch_add(1, Ordering::SeqCst))
141            .current_time(cmp::max(unix_time_as_millis(), tip_header.timestamp() + 1))
142            .dao(dao);
143        if let Some(data) = extension {
144            builder.extension(data);
145        }
146        let template = builder.build();
147
148        let size = TemplateSize {
149            txs: 0,
150            proposals: 0,
151            uncles: 0,
152            total: basic_block_size,
153        };
154        let current = CurrentTemplate {
155            template,
156            size,
157            snapshot,
158            epoch: current_epoch,
159        };
160
161        Self {
162            config: Arc::new(config),
163            work_id: Arc::new(work_id),
164            candidate_uncles: Arc::new(Mutex::new(CandidateUncles::new())),
165            current: Arc::new(Mutex::new(current)),
166            poster: Arc::new(
167                Client::builder(hyper_util::rt::TokioExecutor::new())
168                    .build::<_, Full<bytes::Bytes>>(HttpConnector::new()),
169            ),
170        }
171    }
172
173    pub(crate) async fn update_full(&self, tx_pool: &RwLock<TxPool>) -> Result<(), AnyError> {
174        let mut current = self.current.lock().await;
175        let consensus = current.snapshot.consensus();
176        let max_block_bytes = consensus.max_block_bytes() as usize;
177
178        let current_template = &current.template;
179        let uncles = &current_template.uncles;
180
181        let (proposals, txs, basic_size) = {
182            let tx_pool_reader = tx_pool.read().await;
183            if current.snapshot.tip_hash() != tx_pool_reader.snapshot().tip_hash() {
184                return Ok(());
185            }
186
187            let proposals =
188                tx_pool_reader.package_proposals(consensus.max_block_proposals_limit(), uncles);
189
190            let basic_size = Self::basic_block_size(
191                current_template.cellbase.data(),
192                uncles,
193                proposals.iter(),
194                current_template.extension.clone(),
195            );
196
197            let txs_size_limit = max_block_bytes
198                .checked_sub(basic_size)
199                .ok_or(BlockAssemblerError::Overflow)?;
200
201            let max_block_cycles = consensus.max_block_cycles();
202            let (txs, _txs_size, _cycles) =
203                tx_pool_reader.package_txs(max_block_cycles, txs_size_limit);
204            (proposals, txs, basic_size)
205        };
206
207        let proposals_size = proposals.len() * ProposalShortId::serialized_size();
208        let (dao, checked_txs, failed_txs) = Self::calc_dao(
209            &current.snapshot,
210            &current.epoch,
211            current_template.cellbase.clone(),
212            txs,
213        )?;
214        if !failed_txs.is_empty() {
215            for (id, out_point) in failed_txs {
216                //"The main reason why a proposed transaction here
217                // cannot pass the resolve check is very likely that
218                // its ancestor has not been proposed.
219                // Therefore, we don't handle it actively—instead,
220                // we wait for the ancestor to be re-proposed or
221                // to be removed on timeout.
222                debug!(
223                    "Committing tx {} resolving check failed, out_point {:?}",
224                    id, out_point
225                );
226            }
227        }
228
229        let txs_size = checked_txs.iter().map(|tx| tx.size).sum();
230        let total_size = basic_size + txs_size;
231
232        let mut builder = BlockTemplateBuilder::from_template(&current.template);
233        builder
234            .set_proposals(Vec::from_iter(proposals))
235            .set_transactions(checked_txs)
236            .work_id(self.work_id.fetch_add(1, Ordering::SeqCst))
237            .current_time(cmp::max(
238                unix_time_as_millis(),
239                current.template.current_time,
240            ))
241            .dao(dao);
242
243        current.template = builder.build();
244        current.size.txs = txs_size;
245        current.size.total = total_size;
246        current.size.proposals = proposals_size;
247
248        trace!(
249            "[BlockAssembler] update_full {} uncles-{} proposals-{} txs-{}",
250            current.template.number,
251            current.template.uncles.len(),
252            current.template.proposals.len(),
253            current.template.transactions.len(),
254        );
255
256        Ok(())
257    }
258
259    pub(crate) async fn update_blank(&self, snapshot: Arc<Snapshot>) -> Result<(), AnyError> {
260        let consensus = snapshot.consensus();
261        let tip_header = snapshot.tip_header();
262        let current_epoch = consensus
263            .next_epoch_ext(tip_header, &snapshot.borrow_as_data_loader())
264            .expect("tip header's epoch should be stored")
265            .epoch();
266        let mut builder = BlockTemplateBuilder::new(&snapshot, &current_epoch);
267
268        let cellbase = Self::build_cellbase(&self.config, &snapshot)?;
269        let uncles = self.prepare_uncles(&snapshot, &current_epoch).await;
270        let uncles_size = uncles.len() * UncleBlockView::serialized_size_in_block();
271
272        let extension = Self::build_extension(&snapshot)?;
273        let basic_block_size =
274            Self::basic_block_size(cellbase.data(), &uncles, iter::empty(), extension.clone());
275
276        let (dao, _checked_txs, _failed_txs) =
277            Self::calc_dao(&snapshot, &current_epoch, cellbase.clone(), vec![])?;
278
279        builder
280            .transactions(vec![])
281            .proposals(vec![])
282            .cellbase(cellbase)
283            .uncles(uncles)
284            .work_id(self.work_id.fetch_add(1, Ordering::SeqCst))
285            .current_time(cmp::max(unix_time_as_millis(), tip_header.timestamp() + 1))
286            .dao(dao);
287        if let Some(data) = extension {
288            builder.extension(data);
289        }
290        let template = builder.build();
291
292        trace!(
293            "[BlockAssembler] update_blank {} uncles-{} proposals-{} txs-{}",
294            template.number,
295            template.uncles.len(),
296            template.proposals.len(),
297            template.transactions.len(),
298        );
299
300        let size = TemplateSize {
301            txs: 0,
302            proposals: 0,
303            uncles: uncles_size,
304            total: basic_block_size,
305        };
306
307        let new_blank = CurrentTemplate {
308            template,
309            size,
310            snapshot,
311            epoch: current_epoch,
312        };
313
314        *self.current.lock().await = new_blank;
315        Ok(())
316    }
317
318    pub(crate) async fn update_uncles(&self) {
319        let mut current = self.current.lock().await;
320        let consensus = current.snapshot.consensus();
321        let max_block_bytes = consensus.max_block_bytes() as usize;
322        let max_uncles_num = consensus.max_uncles_num();
323        let current_uncles_num = current.template.uncles.len();
324        if current_uncles_num < max_uncles_num {
325            let remain_size = max_block_bytes.saturating_sub(current.size.total);
326
327            if remain_size > UncleBlockView::serialized_size_in_block() {
328                let uncles = self.prepare_uncles(&current.snapshot, &current.epoch).await;
329
330                let new_uncle_size = uncles.len() * UncleBlockView::serialized_size_in_block();
331                let new_total_size = current.size.calc_total_by_uncles(new_uncle_size);
332
333                if new_total_size < max_block_bytes {
334                    let mut builder = BlockTemplateBuilder::from_template(&current.template);
335                    builder
336                        .set_uncles(uncles)
337                        .work_id(self.work_id.fetch_add(1, Ordering::SeqCst))
338                        .current_time(cmp::max(
339                            unix_time_as_millis(),
340                            current.template.current_time,
341                        ));
342                    current.template = builder.build();
343                    current.size.uncles = new_uncle_size;
344                    current.size.total = new_total_size;
345
346                    trace!(
347                        "[BlockAssembler] update_uncles-{} epoch-{} uncles-{} proposals-{} txs-{}",
348                        current.template.number,
349                        current.template.epoch.number(),
350                        current.template.uncles.len(),
351                        current.template.proposals.len(),
352                        current.template.transactions.len(),
353                    );
354                }
355            }
356        }
357    }
358
359    pub(crate) async fn update_proposals(&self, tx_pool: &RwLock<TxPool>) {
360        let mut current = self.current.lock().await;
361        let consensus = current.snapshot.consensus();
362        let uncles = &current.template.uncles;
363        let proposals = {
364            let tx_pool_reader = tx_pool.read().await;
365            if current.snapshot.tip_hash() != tx_pool_reader.snapshot().tip_hash() {
366                return;
367            }
368            tx_pool_reader.package_proposals(consensus.max_block_proposals_limit(), uncles)
369        };
370
371        let new_proposals_size = proposals.len() * ProposalShortId::serialized_size();
372        let new_total_size = current.size.calc_total_by_proposals(new_proposals_size);
373        let max_block_bytes = consensus.max_block_bytes() as usize;
374        if new_total_size < max_block_bytes {
375            let mut builder = BlockTemplateBuilder::from_template(&current.template);
376            builder
377                .set_proposals(Vec::from_iter(proposals))
378                .work_id(self.work_id.fetch_add(1, Ordering::SeqCst))
379                .current_time(cmp::max(
380                    unix_time_as_millis(),
381                    current.template.current_time,
382                ));
383            current.template = builder.build();
384            current.size.proposals = new_proposals_size;
385            current.size.total = new_total_size;
386
387            trace!(
388                "[BlockAssembler] update_proposals-{} epoch-{} uncles-{} proposals-{} txs-{}",
389                current.template.number,
390                current.template.epoch.number(),
391                current.template.uncles.len(),
392                current.template.proposals.len(),
393                current.template.transactions.len(),
394            );
395        }
396    }
397
398    pub(crate) async fn update_transactions(
399        &self,
400        tx_pool: &RwLock<TxPool>,
401    ) -> Result<(), AnyError> {
402        let mut current = self.current.lock().await;
403        let consensus = current.snapshot.consensus();
404        let current_template = &current.template;
405        let max_block_bytes = consensus.max_block_bytes() as usize;
406        let extension = Self::build_extension(&current.snapshot)?;
407        let txs = {
408            let tx_pool_reader = tx_pool.read().await;
409            if current.snapshot.tip_hash() != tx_pool_reader.snapshot().tip_hash() {
410                return Ok(());
411            }
412
413            let basic_block_size = Self::basic_block_size(
414                current_template.cellbase.data(),
415                &current_template.uncles,
416                current_template.proposals.iter(),
417                extension.clone(),
418            );
419
420            let txs_size_limit = max_block_bytes.checked_sub(basic_block_size);
421
422            if txs_size_limit.is_none() {
423                return Ok(());
424            }
425
426            let max_block_cycles = consensus.max_block_cycles();
427            let (txs, _txs_size, _cycles) = tx_pool_reader
428                .package_txs(max_block_cycles, txs_size_limit.expect("overflow checked"));
429            txs
430        };
431
432        if let Ok((dao, checked_txs, _failed_txs)) = Self::calc_dao(
433            &current.snapshot,
434            &current.epoch,
435            current_template.cellbase.clone(),
436            txs,
437        ) {
438            let new_txs_size = checked_txs.iter().map(|tx| tx.size).sum();
439            let new_total_size = current.size.calc_total_by_txs(new_txs_size);
440            let mut builder = BlockTemplateBuilder::from_template(&current.template);
441            builder
442                .set_transactions(checked_txs)
443                .work_id(self.work_id.fetch_add(1, Ordering::SeqCst))
444                .current_time(cmp::max(
445                    unix_time_as_millis(),
446                    current.template.current_time,
447                ))
448                .dao(dao);
449            if let Some(data) = extension {
450                builder.extension(data);
451            }
452            current.template = builder.build();
453            current.size.txs = new_txs_size;
454            current.size.total = new_total_size;
455
456            trace!(
457                "[BlockAssembler] update_transactions-{} epoch-{} uncles-{} proposals-{} txs-{}",
458                current.template.number,
459                current.template.epoch.number(),
460                current.template.uncles.len(),
461                current.template.proposals.len(),
462                current.template.transactions.len(),
463            );
464        }
465        Ok(())
466    }
467
468    pub(crate) async fn get_current(&self) -> JsonBlockTemplate {
469        let current = self.current.lock().await;
470        (&current.template).into()
471    }
472
473    pub(crate) fn build_cellbase_witness(
474        config: &BlockAssemblerConfig,
475        snapshot: &Snapshot,
476    ) -> CellbaseWitness {
477        let hash_type: ScriptHashType = config.hash_type.into();
478        let cellbase_lock = Script::new_builder()
479            .args(config.args.as_bytes())
480            .code_hash(&config.code_hash)
481            .hash_type(hash_type)
482            .build();
483        let tip = snapshot.tip_header();
484
485        let mut message = vec![];
486        if let Some(version) = snapshot.compute_versionbits(tip) {
487            message.extend_from_slice(&version.to_le_bytes());
488            message.extend_from_slice(b" ");
489        }
490        if config.use_binary_version_as_message_prefix {
491            message.extend_from_slice(config.binary_version.as_bytes());
492        }
493        if !config.message.is_empty() {
494            message.extend_from_slice(b" ");
495            message.extend_from_slice(config.message.as_bytes());
496        }
497
498        CellbaseWitness::new_builder()
499            .lock(cellbase_lock)
500            .message(message)
501            .build()
502    }
503
504    /// Miner mined block H(c), the block reward will be finalized at H(c + w_far + 1).
505    /// Miner specify own lock in cellbase witness.
506    /// The cellbase have only one output,
507    /// miner should collect the block reward for finalize target H(max(0, c - w_far - 1))
508    pub(crate) fn build_cellbase(
509        config: &BlockAssemblerConfig,
510        snapshot: &Snapshot,
511    ) -> Result<TransactionView, AnyError> {
512        let tip = snapshot.tip_header();
513        let candidate_number = tip.number() + 1;
514        let cellbase_witness = Self::build_cellbase_witness(config, snapshot);
515
516        let tx = {
517            let (target_lock, block_reward) = block_in_place(|| {
518                RewardCalculator::new(snapshot.consensus(), snapshot).block_reward_to_finalize(tip)
519            })?;
520            let input = CellInput::new_cellbase_input(candidate_number);
521            let output = CellOutput::new_builder()
522                .capacity(block_reward.total)
523                .lock(target_lock)
524                .build();
525
526            let witness = cellbase_witness.as_bytes();
527            let no_finalization_target =
528                candidate_number <= snapshot.consensus().finalization_delay_length();
529            let tx_builder = TransactionBuilder::default().input(input).witness(witness);
530            let insufficient_reward_to_create_cell = output.is_lack_of_capacity(Capacity::zero())?;
531            if no_finalization_target || insufficient_reward_to_create_cell {
532                tx_builder.build()
533            } else {
534                tx_builder
535                    .output(output)
536                    .output_data(Bytes::default())
537                    .build()
538            }
539        };
540
541        Ok(tx)
542    }
543
544    pub(crate) fn build_extension(snapshot: &Snapshot) -> Result<Option<packed::Bytes>, AnyError> {
545        let tip_header = snapshot.tip_header();
546        // The use of the epoch number of the tip here leads to an off-by-one bug,
547        // so be careful, it needs to be preserved for consistency reasons and not fixed directly.
548        let mmr_activate = snapshot
549            .consensus()
550            .rfc0044_active(tip_header.epoch().number());
551        if mmr_activate {
552            let chain_root = snapshot
553                .chain_root_mmr(tip_header.number())
554                .get_root()
555                .map_err(|e| InternalErrorKind::MMR.other(e))?;
556            let bytes = chain_root.calc_mmr_hash().as_bytes().into();
557            Ok(Some(bytes))
558        } else {
559            Ok(None)
560        }
561    }
562
563    pub(crate) async fn prepare_uncles(
564        &self,
565        snapshot: &Snapshot,
566        current_epoch: &EpochExt,
567    ) -> Vec<UncleBlockView> {
568        let mut guard = self.candidate_uncles.lock().await;
569        guard.prepare_uncles(snapshot, current_epoch)
570    }
571
572    pub(crate) fn basic_block_size<'a>(
573        cellbase: Transaction,
574        uncles: &[UncleBlockView],
575        proposals: impl Iterator<Item = &'a ProposalShortId>,
576        extension_opt: Option<packed::Bytes>,
577    ) -> usize {
578        let empty_dao = packed::Byte32::default();
579        let raw_header = packed::RawHeader::new_builder().dao(empty_dao).build();
580        let header = packed::Header::new_builder().raw(raw_header).build();
581        let block = if let Some(extension) = extension_opt {
582            packed::BlockV1::new_builder()
583                .header(header)
584                .transactions(vec![cellbase])
585                .uncles(uncles.iter().map(|u| u.data()).collect::<Vec<_>>())
586                .proposals(proposals.cloned().collect::<Vec<_>>())
587                .extension(extension)
588                .build()
589                .as_v0()
590        } else {
591            packed::Block::new_builder()
592                .header(header)
593                .transactions(vec![cellbase])
594                .uncles(uncles.iter().map(|u| u.data()).collect::<Vec<_>>())
595                .proposals(proposals.cloned().collect::<Vec<_>>())
596                .build()
597        };
598        block.serialized_size_without_uncle_proposals()
599    }
600
601    fn calc_dao(
602        snapshot: &Snapshot,
603        current_epoch: &EpochExt,
604        cellbase: TransactionView,
605        entries: Vec<TxEntry>,
606    ) -> CalcDaoResult {
607        let tip_header = snapshot.tip_header();
608        let consensus = snapshot.consensus();
609        let mut seen_inputs = HashSet::new();
610        let mut transactions_checker = TransactionsChecker::new(iter::once(&cellbase));
611
612        let mut checked_failed_txs = vec![];
613        let checked_entries: Vec<_> = block_in_place(|| {
614            entries
615                .into_iter()
616                .filter_map(|entry| {
617                    let overlay_cell_checker =
618                        OverlayCellChecker::new(&transactions_checker, snapshot);
619                    if let Err(err) =
620                        entry
621                            .rtx
622                            .check(&mut seen_inputs, &overlay_cell_checker, snapshot)
623                    {
624                        error!(
625                            "Resolving transactions while building block template, \
626                             tip_number: {}, tip_hash: {}, tx_hash: {}, error: {:?}",
627                            tip_header.number(),
628                            tip_header.hash(),
629                            entry.transaction().hash(),
630                            err
631                        );
632                        // Returning the out_point makes debugging easier and provides better logs.
633                        checked_failed_txs
634                            .push((entry.proposal_short_id(), err.out_point().cloned()));
635                        None
636                    } else {
637                        transactions_checker.insert(entry.transaction());
638                        Some(entry)
639                    }
640                })
641                .collect()
642        });
643
644        let dummy_cellbase_entry = TxEntry::dummy_resolve(cellbase, 0, Capacity::zero(), 0);
645        let entries_iter = iter::once(&dummy_cellbase_entry)
646            .chain(checked_entries.iter())
647            .map(|entry| entry.rtx.as_ref());
648
649        // Generate DAO fields here
650        let dao = DaoCalculator::new(consensus, &snapshot.borrow_as_data_loader())
651            .dao_field_with_current_epoch(entries_iter, tip_header, current_epoch)?;
652
653        Ok((dao, checked_entries, checked_failed_txs))
654    }
655
656    pub(crate) async fn notify(&self) {
657        if !self.need_to_notify() {
658            return;
659        }
660        let template = self.get_current().await;
661        if let Ok(template_json) = serde_json::to_string(&template) {
662            let notify_timeout = Duration::from_millis(self.config.notify_timeout_millis);
663            for url in &self.config.notify {
664                if let Ok(req) = Request::builder()
665                    .method(Method::POST)
666                    .uri(url.as_ref())
667                    .header("content-type", "application/json")
668                    .body(Full::new(template_json.to_owned().into()))
669                {
670                    let client = Arc::clone(&self.poster);
671                    let url = url.to_owned();
672                    tokio::spawn(async move {
673                        let _resp =
674                            timeout(notify_timeout, client.request(req))
675                                .await
676                                .map_err(|_| {
677                                    ckb_logger::warn!(
678                                        "block assembler notifying {} timed out",
679                                        url
680                                    );
681                                });
682                    });
683                }
684            }
685
686            for script in &self.config.notify_scripts {
687                let script = script.to_owned();
688                let template_json = template_json.to_owned();
689                tokio::spawn(async move {
690                    // Errors
691                    // This future will return an error if the child process cannot be spawned
692                    // or if there is an error while awaiting its status.
693
694                    // On Unix platforms this method will fail with std::io::ErrorKind::WouldBlock
695                    // if the system process limit is reached
696                    // (which includes other applications running on the system).
697                    match timeout(
698                        notify_timeout,
699                        Command::new(&script).arg(template_json).status(),
700                    )
701                    .await
702                    {
703                        Ok(ret) => match ret {
704                            Ok(status) => debug!("the command exited with: {}", status),
705                            Err(e) => error!("the script {} failed to spawn {}", script, e),
706                        },
707                        Err(_) => {
708                            ckb_logger::warn!("block assembler notifying {} timed out", script)
709                        }
710                    }
711                });
712            }
713        }
714    }
715
716    fn need_to_notify(&self) -> bool {
717        !self.config.notify.is_empty() || !self.config.notify_scripts.is_empty()
718    }
719}
720
721#[derive(Clone)]
722pub(crate) struct BlockTemplate {
723    pub(crate) version: Version,
724    pub(crate) compact_target: u32,
725    pub(crate) number: BlockNumber,
726    pub(crate) epoch: EpochNumberWithFraction,
727    pub(crate) parent_hash: Byte32,
728    pub(crate) cycles_limit: Cycle,
729    pub(crate) bytes_limit: u64,
730    pub(crate) uncles_count_limit: u8,
731
732    // option
733    pub(crate) uncles: Vec<UncleBlockView>,
734    pub(crate) transactions: Vec<TxEntry>,
735    pub(crate) proposals: Vec<ProposalShortId>,
736    pub(crate) cellbase: TransactionView,
737    pub(crate) work_id: u64,
738    pub(crate) dao: Byte32,
739    pub(crate) current_time: u64,
740    pub(crate) extension: Option<Bytes>,
741}
742
743impl<'a> From<&'a BlockTemplate> for JsonBlockTemplate {
744    fn from(template: &'a BlockTemplate) -> JsonBlockTemplate {
745        JsonBlockTemplate {
746            version: template.version.into(),
747            compact_target: template.compact_target.into(),
748            number: template.number.into(),
749            epoch: template.epoch.into(),
750            parent_hash: (&template.parent_hash).into(),
751            cycles_limit: template.cycles_limit.into(),
752            bytes_limit: template.bytes_limit.into(),
753            uncles_count_limit: u64::from(template.uncles_count_limit).into(),
754            uncles: template.uncles.iter().map(uncle_to_template).collect(),
755            transactions: template
756                .transactions
757                .iter()
758                .map(tx_entry_to_template)
759                .collect(),
760            proposals: template.proposals.iter().map(Into::into).collect(),
761            cellbase: cellbase_to_template(&template.cellbase),
762            work_id: template.work_id.into(),
763            dao: template.dao.clone().into(),
764            current_time: template.current_time.into(),
765            extension: template.extension.as_ref().map(Into::into),
766        }
767    }
768}
769
770#[derive(Clone)]
771pub(crate) struct BlockTemplateBuilder {
772    pub(crate) version: Version,
773    pub(crate) compact_target: u32,
774    pub(crate) number: BlockNumber,
775    pub(crate) epoch: EpochNumberWithFraction,
776    pub(crate) parent_hash: Byte32,
777    pub(crate) cycles_limit: Cycle,
778    pub(crate) bytes_limit: u64,
779    pub(crate) uncles_count_limit: u8,
780
781    // option
782    pub(crate) uncles: Vec<UncleBlockView>,
783    pub(crate) transactions: Vec<TxEntry>,
784    pub(crate) proposals: Vec<ProposalShortId>,
785    pub(crate) cellbase: Option<TransactionView>,
786    pub(crate) work_id: Option<u64>,
787    pub(crate) dao: Option<Byte32>,
788    pub(crate) current_time: Option<u64>,
789    pub(crate) extension: Option<Bytes>,
790}
791
792impl BlockTemplateBuilder {
793    pub(crate) fn new(snapshot: &Snapshot, current_epoch: &EpochExt) -> Self {
794        let consensus = snapshot.consensus();
795        let tip_header = snapshot.tip_header();
796        let tip_hash = tip_header.hash();
797        let candidate_number = tip_header.number() + 1;
798
799        let version = consensus.block_version();
800        let max_block_bytes = consensus.max_block_bytes();
801        let cycles_limit = consensus.max_block_cycles();
802        let uncles_count_limit = consensus.max_uncles_num() as u8;
803
804        Self {
805            version,
806            compact_target: current_epoch.compact_target(),
807
808            number: candidate_number,
809            epoch: current_epoch.number_with_fraction(candidate_number),
810            parent_hash: tip_hash,
811            cycles_limit,
812            bytes_limit: max_block_bytes,
813            uncles_count_limit,
814            // option
815            uncles: vec![],
816            transactions: vec![],
817            proposals: vec![],
818            cellbase: None,
819            work_id: None,
820            dao: None,
821            current_time: None,
822            extension: None,
823        }
824    }
825
826    pub(crate) fn from_template(template: &BlockTemplate) -> Self {
827        Self {
828            version: template.version,
829            compact_target: template.compact_target,
830            number: template.number,
831            epoch: template.epoch,
832            parent_hash: template.parent_hash.clone(),
833            cycles_limit: template.cycles_limit,
834            bytes_limit: template.bytes_limit,
835            uncles_count_limit: template.uncles_count_limit,
836            extension: template.extension.clone(),
837            // option
838            uncles: template.uncles.clone(),
839            transactions: template.transactions.clone(),
840            proposals: template.proposals.clone(),
841            cellbase: Some(template.cellbase.clone()),
842            work_id: None,
843            dao: Some(template.dao.clone()),
844            current_time: None,
845        }
846    }
847
848    pub(crate) fn uncles(&mut self, uncles: impl IntoIterator<Item = UncleBlockView>) -> &mut Self {
849        self.uncles.extend(uncles);
850        self
851    }
852
853    pub(crate) fn set_uncles(&mut self, uncles: Vec<UncleBlockView>) -> &mut Self {
854        self.uncles = uncles;
855        self
856    }
857
858    pub(crate) fn transactions(
859        &mut self,
860        transactions: impl IntoIterator<Item = TxEntry>,
861    ) -> &mut Self {
862        self.transactions.extend(transactions);
863        self
864    }
865
866    pub(crate) fn set_transactions(&mut self, transactions: Vec<TxEntry>) -> &mut Self {
867        self.transactions = transactions;
868        self
869    }
870
871    pub(crate) fn proposals(
872        &mut self,
873        proposals: impl IntoIterator<Item = ProposalShortId>,
874    ) -> &mut Self {
875        self.proposals.extend(proposals);
876        self
877    }
878
879    pub(crate) fn set_proposals(&mut self, proposals: Vec<ProposalShortId>) -> &mut Self {
880        self.proposals = proposals;
881        self
882    }
883
884    pub(crate) fn cellbase(&mut self, cellbase: TransactionView) -> &mut Self {
885        self.cellbase = Some(cellbase);
886        self
887    }
888
889    pub(crate) fn work_id(&mut self, work_id: u64) -> &mut Self {
890        self.work_id = Some(work_id);
891        self
892    }
893
894    pub(crate) fn dao(&mut self, dao: Byte32) -> &mut Self {
895        self.dao = Some(dao);
896        self
897    }
898
899    pub(crate) fn current_time(&mut self, current_time: u64) -> &mut Self {
900        self.current_time = Some(current_time);
901        self
902    }
903
904    #[allow(dead_code)]
905    pub(crate) fn extension(&mut self, extension: Bytes) -> &mut Self {
906        self.extension = Some(extension);
907        self
908    }
909
910    pub(crate) fn build(self) -> BlockTemplate {
911        assert!(self.cellbase.is_some(), "cellbase must be set");
912        assert!(self.work_id.is_some(), "work_id must be set");
913        assert!(self.current_time.is_some(), "current_time must be set");
914        assert!(self.dao.is_some(), "dao must be set");
915
916        BlockTemplate {
917            version: self.version,
918            compact_target: self.compact_target,
919
920            number: self.number,
921            epoch: self.epoch,
922            parent_hash: self.parent_hash,
923            cycles_limit: self.cycles_limit,
924            bytes_limit: self.bytes_limit,
925            uncles_count_limit: self.uncles_count_limit,
926            uncles: self.uncles,
927            transactions: self.transactions,
928            proposals: self.proposals,
929            cellbase: self.cellbase.expect("cellbase assert checked"),
930            work_id: self.work_id.expect("work_id assert checked"),
931            dao: self.dao.expect("dao assert checked"),
932            current_time: self.current_time.expect("current_time assert checked"),
933            extension: self.extension,
934        }
935    }
936}
937
938pub(crate) fn uncle_to_template(uncle: &UncleBlockView) -> UncleTemplate {
939    UncleTemplate {
940        hash: uncle.hash().into(),
941        required: false,
942        proposals: uncle
943            .data()
944            .proposals()
945            .into_iter()
946            .map(Into::into)
947            .collect(),
948        header: uncle.data().header().into(),
949    }
950}
951
952pub(crate) fn tx_entry_to_template(entry: &TxEntry) -> TransactionTemplate {
953    TransactionTemplate {
954        hash: entry.transaction().hash().into(),
955        required: false, // unimplemented
956        cycles: Some(entry.cycles.into()),
957        depends: None, // unimplemented
958        data: entry.transaction().data().into(),
959    }
960}
961
962pub(crate) fn cellbase_to_template(tx: &TransactionView) -> CellbaseTemplate {
963    CellbaseTemplate {
964        hash: tx.hash().into(),
965        cycles: None,
966        data: tx.data().into(),
967    }
968}