Skip to main content

blvm_consensus/
checkqueue.rs

1//! Script verification queue: dedicated N-1 worker threads plus master as Nth.
2//!
3//! Producer adds checks per-tx; workers pull batches and verify. Master joins until
4//! queue empty. Batch size from ibd_tuning (default 128, env/config override).
5
6#![cfg(all(feature = "production", feature = "rayon"))]
7
8use crate::activation::{ForkActivationTable, IsForkActive};
9use crate::error::{ConsensusError, Result};
10use crate::script::verify_script_with_context_full;
11use crate::types::{Block, ForkId, Natural, Network};
12use crate::witness::is_witness_empty;
13use crate::witness::Witness;
14use crossbeam_queue::SegQueue;
15
16use std::cell::RefCell;
17use std::sync::atomic::AtomicUsize;
18use std::sync::{Arc, Condvar, Mutex};
19use std::thread::{self, JoinHandle};
20
/// Checks taken per batch when `ScriptCheckQueue::new` gets `None` (or a size it rejects).
const DEFAULT_BATCH_SIZE: usize = 128;
23
thread_local! {
    /// Per-thread scratch buffer for prevout values.
    ///
    /// Reused (cleared, then refilled) by the non-prefetched path of
    /// `ScriptCheckQueue::run_check_with_refs` so that path does not allocate a fresh
    /// `Vec` per check. The doc lives on the static itself: a doc comment on the
    /// `thread_local!` invocation is ignored and triggers `unused_doc_comments`.
    static PREVOUT_BUF: RefCell<Vec<i64>> = const { RefCell::new(Vec::new()) };
}
29
/// One queued script check: a single transaction input to verify.
///
/// A check is small, plain data. It does not own any script bytes; instead it points into
/// the active `BlockSessionContext`:
/// - `tx_ctx_idx` selects the per-tx `TxScriptContext`;
/// - `spk_offset`/`spk_len` locate the spent output's scriptPubKey inside
///   `BlockSessionContext::script_pubkey_buffer`.
///
/// Workers therefore need no per-check allocation or hash-map grouping, but they do index
/// the session's shared (read-only) buffers.
#[derive(Clone, Debug)]
pub struct ScriptCheck {
    /// Index into `BlockSessionContext::tx_contexts`.
    pub tx_ctx_idx: usize,
    /// Index of the input being checked within its transaction.
    pub input_idx: usize,
    /// Byte offset of this input's prevout scriptPubKey in
    /// `BlockSessionContext::script_pubkey_buffer`.
    pub spk_offset: u32,
    /// Byte length of that scriptPubKey.
    pub spk_len: u32,
    /// Value of the spent output.
    ///
    /// NOTE(review): not read by the verification path in this file, which takes prevout
    /// values from the tx's `prevout_values_range` instead.
    pub prevout_value: i64,
}
40
/// Data shared by every input of one transaction during script verification.
///
/// Prevout values and prevout scriptPubKeys are not stored here. They live in block-level
/// buffers on `BlockSessionContext`, and this context only records `(start, count)` ranges
/// into them. That avoids allocating a `Vec` per transaction.
#[derive(Debug, Clone)]
pub struct TxScriptContext {
    /// Index of the transaction in `BlockSessionContext::block.transactions` (also the
    /// first index into `BlockSessionContext::witness_buffer`).
    pub tx_index: usize,
    /// `(start, count)` slice of `BlockSessionContext::prevout_values_buffer` holding this
    /// tx's prevout values, one per input.
    pub prevout_values_range: (usize, usize),
    /// `(start, count)` slice of `BlockSessionContext::script_pubkey_indices_buffer`
    /// holding the `(start, len)` location of each input's prevout scriptPubKey.
    pub script_pubkey_indices_range: (usize, usize),
    /// Script verification flags passed through to `verify_script_with_context_full`.
    pub flags: u32,
    /// Optional precomputed BIP143 hashes for this tx, passed through to verification.
    #[cfg(feature = "production")]
    pub bip143: Option<crate::transaction_hash::Bip143PrecomputedHashes>,
    /// NOTE(review): not read by the queue; its meaning is defined by the producer.
    pub loop_idx: usize,
    /// NOTE(review): not read by the queue; presumably the tx fee — confirm with producer.
    pub fee: i64,
    /// Base added to an input index to address `precomputed_sighashes` and
    /// `precomputed_p2pkh_hashes` on the session.
    pub ecdsa_index_base: usize,
    /// Optional sighash cache handed to script verification. It is intended as a
    /// Core-style `(scriptCode, nHashType) -> hash` cache, which helps multisig.
    #[cfg(feature = "production")]
    pub sighash_midstate_cache: Option<crate::transaction_hash::SighashMidstateCache>,
}
61
62/// Session context for one block; set at start_session, used by workers until complete.
63/// When schnorr_collector is None, verify in-place (no batch collection).
64/// TxScriptContext stored in Arc to avoid cloning full context per check (workers get Arc::clone).
65/// Block-level buffers are immutable (Arc<Vec<...>>) — no locks during worker execution.
66pub struct BlockSessionContext {
67    pub block: Arc<Block>,
68    /// Block-level buffer for all prevout values; TxScriptContext.prevout_values_range indexes into this.
69    pub prevout_values_buffer: Arc<Vec<i64>>,
70    /// Block-level buffer for (start, len) pairs into script_pubkey_buffer; TxScriptContext.script_pubkey_indices_range indexes into this.
71    pub script_pubkey_indices_buffer: Arc<Vec<(usize, usize)>>,
72    /// Block-level buffer for all prevout script_pubkeys; (start, len) from script_pubkey_indices_buffer index into this.
73    pub script_pubkey_buffer: Arc<Vec<u8>>,
74    /// Block-level witness data; workers index via session.witness_buffer[ctx.tx_index][input_idx].
75    pub witness_buffer: Arc<Vec<Vec<Witness>>>,
76    pub tx_contexts: Arc<Vec<TxScriptContext>>,
77    #[cfg(feature = "production")]
78    pub ecdsa_sub_counters: Arc<Vec<AtomicUsize>>,
79    #[cfg(feature = "production")]
80    pub schnorr_collector: Option<Arc<crate::bip348::SchnorrSignatureCollector>>,
81    pub height: Natural,
82    pub median_time_past: Option<u64>,
83    pub network: Network,
84    /// Fork heights for this validation session (same table as `BlockValidationContext::activation`).
85    pub activation: ForkActivationTable,
86    /// Lock-free: workers push batch_results; master drains at complete(). Reduces Mutex contention.
87    pub results: Arc<SegQueue<Vec<(usize, bool)>>>,
88    /// Precomputed sighashes for P2PKH inputs, indexed by ecdsa_index_base + input_idx.
89    /// None = not precomputed (worker computes on demand).
90    #[cfg(feature = "production")]
91    pub precomputed_sighashes: Arc<Vec<Option<[u8; 32]>>>,
92    /// Precomputed HASH160(pubkey) for P2PKH inputs, indexed same as precomputed_sighashes.
93    #[cfg(feature = "production")]
94    pub precomputed_p2pkh_hashes: Arc<Vec<Option<[u8; 20]>>>,
95}
96
97struct QueueState {
98    checks: Vec<ScriptCheck>,
99    n_todo: usize,
100    n_total: usize,
101    n_idle: usize,
102    error_result: Option<ConsensusError>,
103    request_stop: bool,
104    session: Option<Arc<BlockSessionContext>>,
105}
106
107/// Script verification queue: N-1 dedicated workers + master joins.
108pub struct ScriptCheckQueue {
109    state: Arc<Mutex<QueueState>>,
110    worker_cv: Arc<Condvar>,
111    master_cv: Arc<Condvar>,
112    control_mutex: Mutex<()>,
113    workers: Vec<JoinHandle<()>>,
114    batch_size: usize,
115}
116
117impl ScriptCheckQueue {
118    /// Create queue with `worker_count` dedicated threads (par-1; master joins for total par).
119    /// `batch_size`: from ibd_tuning (default 128). Use `None` for default.
120    pub fn new(worker_count: usize, batch_size: Option<usize>) -> Self {
121        let batch_size = batch_size
122            .filter(|&b| b > 0 && b <= 1024)
123            .unwrap_or(DEFAULT_BATCH_SIZE);
124        let state = Arc::new(Mutex::new(QueueState {
125            checks: Vec::new(),
126            n_todo: 0,
127            n_total: 0,
128            n_idle: 0,
129            error_result: None,
130            request_stop: false,
131            session: None,
132        }));
133        let worker_cv = Arc::new(Condvar::new());
134        let master_cv = Arc::new(Condvar::new());
135
136        let mut workers = Vec::with_capacity(worker_count);
137        for n in 0..worker_count {
138            let state_clone = Arc::clone(&state);
139            let cv_clone = Arc::clone(&worker_cv);
140            let master_clone = Arc::clone(&master_cv);
141            let bs = batch_size;
142            workers.push(
143                thread::Builder::new()
144                    .name(format!("scriptch.{n}"))
145                    .spawn(move || {
146                        Self::worker_loop(state_clone, &cv_clone, &master_clone, bs);
147                    })
148                    .expect("scriptch thread spawn"),
149            );
150        }
151
152        Self {
153            state,
154            worker_cv,
155            master_cv,
156            control_mutex: Mutex::new(()),
157            workers,
158            batch_size,
159        }
160    }
161
162    /// Run a single check with pre-built refs (used when refs are cached per tx_ctx).
163    /// p2pkh_hash: when Some, P2PKH fast path skips HASH160 (batch path).
164    /// When script_pubkey and prevout_values are Some, skips per-check lock acquisitions (batch path).
165    pub fn run_check_with_refs(
166        check: &ScriptCheck,
167        session: &BlockSessionContext,
168        ctx: &TxScriptContext,
169        refs: &[&[u8]],
170        buffer: &[u8],
171        #[cfg(feature = "production")] p2pkh_hash: Option<[u8; 20]>,
172        script_pubkey_prefetched: Option<&[u8]>,
173        prevout_values_prefetched: Option<&[i64]>,
174    ) -> std::result::Result<bool, ConsensusError> {
175        let tx = &session.block.transactions[ctx.tx_index];
176        let script_pubkey: &[u8] = match script_pubkey_prefetched {
177            Some(s) => s,
178            None => {
179                let spi = session.script_pubkey_indices_buffer.as_slice();
180                let (base, count) = ctx.script_pubkey_indices_range;
181                let (start, len) = if check.input_idx < count {
182                    spi[base + check.input_idx]
183                } else {
184                    (0, 0)
185                };
186                &buffer[start..start + len]
187            }
188        };
189        let witness_for_script = if !session
190            .activation
191            .is_fork_active(ForkId::SegWit, session.height)
192        {
193            None
194        } else {
195            session
196                .witness_buffer
197                .get(ctx.tx_index)
198                .and_then(|w| w.get(check.input_idx))
199                .and_then(|w| if is_witness_empty(w) { None } else { Some(w) })
200        };
201        let ecdsa_global_idx = ctx.ecdsa_index_base + check.input_idx;
202
203        #[cfg(feature = "production")]
204        let sighash_cache = ctx.sighash_midstate_cache.as_ref();
205
206        #[cfg(feature = "production")]
207        let precomputed_sighash = session
208            .precomputed_sighashes
209            .get(ecdsa_global_idx)
210            .and_then(|s| *s);
211        #[cfg(feature = "production")]
212        let precomputed_p2pkh = match p2pkh_hash {
213            Some(h) => Some(h),
214            None => session
215                .precomputed_p2pkh_hashes
216                .get(ecdsa_global_idx)
217                .and_then(|h| *h),
218        };
219
220        let do_verify = |prevout_values: &[i64]| {
221            verify_script_with_context_full(
222                &tx.inputs[check.input_idx].script_sig,
223                script_pubkey,
224                witness_for_script,
225                ctx.flags,
226                tx,
227                check.input_idx,
228                prevout_values,
229                refs,
230                Some(session.height),
231                session.median_time_past,
232                session.network,
233                crate::script::SigVersion::Base,
234                #[cfg(feature = "production")]
235                session.schnorr_collector.as_deref(),
236                #[cfg(not(feature = "production"))]
237                None,
238                #[cfg(feature = "production")]
239                ctx.bip143.as_ref(),
240                #[cfg(not(feature = "production"))]
241                None,
242                #[cfg(feature = "production")]
243                precomputed_sighash,
244                #[cfg(feature = "production")]
245                sighash_cache,
246                #[cfg(feature = "production")]
247                precomputed_p2pkh,
248            )
249            .map_err(|e| {
250                ConsensusError::BlockValidation(
251                    format!(
252                        "Script verification failed at tx {} input {}: {}",
253                        ctx.tx_index, check.input_idx, e
254                    )
255                    .into(),
256                )
257            })
258        };
259
260        match prevout_values_prefetched {
261            Some(p) => do_verify(p),
262            None => {
263                let pv = session.prevout_values_buffer.as_slice();
264                let (base, count) = ctx.prevout_values_range;
265                let slice = &pv[base..][..count];
266                PREVOUT_BUF.with(|cell| {
267                    let mut v = cell.borrow_mut();
268                    v.clear();
269                    v.extend_from_slice(slice);
270                    do_verify(&v)
271                })
272            }
273        }
274    }
275
276    fn run_check<'a>(
277        check: &ScriptCheck,
278        session: &'a BlockSessionContext,
279        refs_buf: &mut Vec<&'a [u8]>,
280    ) -> std::result::Result<bool, ConsensusError> {
281        let ctx = session
282            .tx_contexts
283            .get(check.tx_ctx_idx)
284            .ok_or_else(|| ConsensusError::BlockValidation("tx_ctx_idx out of range".into()))?;
285        let buffer = session.script_pubkey_buffer.as_slice();
286        let spi = session.script_pubkey_indices_buffer.as_slice();
287        let (base, count) = ctx.script_pubkey_indices_range;
288        refs_buf.clear();
289        refs_buf.extend((0..count).map(|j| {
290            let (s, l) = spi[base + j];
291            buffer[s..s + l].as_ref()
292        }));
293        Self::run_check_with_refs(
294            check, session, ctx, refs_buf, buffer, None, // run_check has no batch context
295            None, // script_pubkey_prefetched
296            None, // prevout_values_prefetched
297        )
298    }
299
300    fn worker_loop(
301        state: Arc<Mutex<QueueState>>,
302        worker_cv: &Condvar,
303        master_cv: &Condvar,
304        batch_size: usize,
305    ) {
306        let mut n_now: usize = 0;
307        let mut local_error: Option<ConsensusError> = None;
308        let mut batch_buf: Vec<ScriptCheck> = Vec::with_capacity(batch_size);
309
310        loop {
311            let (session_opt, _batch_len) = {
312                let mut guard = state.lock().unwrap();
313                if n_now > 0 {
314                    if let Some(ref err) = local_error {
315                        if guard.error_result.is_none() {
316                            guard.error_result = Some(err.clone());
317                        }
318                    }
319                    guard.n_todo -= n_now;
320                    if guard.n_todo == 0 {
321                        master_cv.notify_one();
322                    }
323                    n_now = 0;
324                    local_error = None;
325                } else {
326                    guard.n_total += 1;
327                }
328
329                loop {
330                    if guard.request_stop {
331                        return;
332                    }
333                    if guard.checks.is_empty() {
334                        guard.n_idle += 1;
335                        guard = worker_cv.wait(guard).unwrap();
336                        guard.n_idle -= 1;
337                        continue;
338                    }
339                    break;
340                }
341
342                let n_total = guard.n_total;
343                let n_idle = guard.n_idle;
344                let divisor = (n_total + n_idle + 1).max(1);
345                n_now = (guard.checks.len() / divisor).clamp(1, batch_size);
346                let drain_len = n_now.min(guard.checks.len());
347                batch_buf.clear();
348                let drain_start = guard.checks.len() - drain_len;
349                batch_buf.extend(guard.checks.drain(drain_start..));
350                let session = guard.session.clone();
351                (session, ())
352            };
353
354            if batch_buf.is_empty() {
355                continue;
356            }
357
358            let session = match session_opt.as_ref() {
359                Some(s) => Arc::clone(s),
360                None => continue,
361            };
362
363            let mut batch_results = Vec::with_capacity(batch_buf.len());
364            #[cfg(all(feature = "production", feature = "profile"))]
365            let t_run_check = std::time::Instant::now();
366            {
367                batch_buf.sort_unstable_by_key(|c| c.tx_ctx_idx);
368                let buffer = session.script_pubkey_buffer.as_slice();
369                let spi = session.script_pubkey_indices_buffer.as_slice();
370                let pv = session.prevout_values_buffer.as_slice();
371                let mut refs_buf: Vec<&[u8]> = Vec::with_capacity(64);
372                let mut cached_ctx_idx: usize = usize::MAX;
373                for c in batch_buf.iter() {
374                    let ctx = match session.tx_contexts.get(c.tx_ctx_idx) {
375                        Some(ctx) => ctx,
376                        None => {
377                            local_error = Some(ConsensusError::BlockValidation(
378                                "tx_ctx_idx out of range".into(),
379                            ));
380                            break;
381                        }
382                    };
383                    let s = c.spk_offset as usize;
384                    let l = c.spk_len as usize;
385                    let script_pubkey = if s + l <= buffer.len() {
386                        &buffer[s..s + l]
387                    } else {
388                        &[]
389                    };
390                    let (pv_base, pv_count) = ctx.prevout_values_range;
391                    let prevout_slice = &pv[pv_base..][..pv_count];
392                    if c.tx_ctx_idx != cached_ctx_idx {
393                        refs_buf.clear();
394                        let (spi_base, spi_count) = ctx.script_pubkey_indices_range;
395                        for j in 0..spi_count {
396                            let (start, len) = spi[spi_base + j];
397                            refs_buf.push(if start + len <= buffer.len() {
398                                &buffer[start..start + len]
399                            } else {
400                                &[]
401                            });
402                        }
403                        cached_ctx_idx = c.tx_ctx_idx;
404                    }
405                    match Self::run_check_with_refs(
406                        c,
407                        session.as_ref(),
408                        ctx,
409                        &refs_buf,
410                        buffer,
411                        None,
412                        Some(script_pubkey),
413                        Some(prevout_slice),
414                    ) {
415                        Ok(valid) => batch_results.push((c.tx_ctx_idx, valid)),
416                        Err(e) => {
417                            local_error = Some(e);
418                            break;
419                        }
420                    }
421                }
422            }
423            #[cfg(all(feature = "production", feature = "profile"))]
424            crate::script_profile::add_worker_run_check_loop_ns(
425                t_run_check.elapsed().as_nanos() as u64
426            );
427            if !batch_results.is_empty() {
428                #[cfg(all(feature = "production", feature = "profile"))]
429                let t_results = std::time::Instant::now();
430                session.results.push(batch_results);
431                #[cfg(all(feature = "production", feature = "profile"))]
432                crate::script_profile::add_worker_results_extend_ns(
433                    t_results.elapsed().as_nanos() as u64
434                );
435            }
436        }
437    }
438
439    /// Start a block session. Must be called before any Add. Session holds shared context.
440    pub fn start_session(&self, session: BlockSessionContext) {
441        let mut guard = self.state.lock().unwrap();
442        guard.session = Some(Arc::new(session));
443        guard.checks.clear();
444        guard.n_todo = 0;
445        guard.error_result = None;
446    }
447
448    /// Add checks to the queue; workers wake and process.
449    pub fn add(&self, checks: Vec<ScriptCheck>) {
450        let n = checks.len();
451        if n == 0 {
452            return;
453        }
454        {
455            let mut guard = self.state.lock().unwrap();
456            guard.checks.extend(checks);
457            guard.n_todo += n;
458        }
459        if n == 1 {
460            self.worker_cv.notify_one();
461        } else {
462            self.worker_cv.notify_all();
463        }
464    }
465
466    /// Add checks from a slice without consuming. Used with block-level pre-allocated Vec (Q).
467    pub fn add_from_slice(&self, checks: &[ScriptCheck]) {
468        let n = checks.len();
469        if n == 0 {
470            return;
471        }
472        {
473            let mut guard = self.state.lock().unwrap();
474            guard.checks.extend(checks.iter().cloned());
475            guard.n_todo += n;
476        }
477        if n == 1 {
478            self.worker_cv.notify_one();
479        } else {
480            self.worker_cv.notify_all();
481        }
482    }
483
484    /// Run checks sequentially on the current thread (fallback when parallel retry fails).
485    pub fn run_checks_sequential(
486        checks: &[ScriptCheck],
487        session: &BlockSessionContext,
488    ) -> Result<Vec<(usize, bool)>> {
489        let mut results = Vec::with_capacity(checks.len());
490        let mut refs_buf = Vec::with_capacity(256);
491        for c in checks {
492            let valid = Self::run_check(c, session, &mut refs_buf)?;
493            results.push((c.tx_ctx_idx, valid));
494        }
495        Ok(results)
496    }
497
498    /// Master joins until queue empty; returns collected (tx_ctx_idx, valid) results.
499    pub fn complete(&self) -> Result<Vec<(usize, bool)>> {
500        let _control = self.control_mutex.lock().unwrap();
501        let state = Arc::clone(&self.state);
502        let worker_cv = Arc::clone(&self.worker_cv);
503        let master_cv = Arc::clone(&self.master_cv);
504        let batch_size = self.batch_size;
505
506        let mut n_now: usize = 0;
507        let mut local_error: Option<ConsensusError> = None;
508        let mut session_opt: Option<Arc<BlockSessionContext>> = None;
509        let mut batch_buf: Vec<ScriptCheck> = Vec::with_capacity(batch_size);
510
511        loop {
512            let done = {
513                let mut guard = state.lock().unwrap();
514                if n_now > 0 {
515                    if let Some(ref err) = local_error {
516                        if guard.error_result.is_none() {
517                            guard.error_result = Some(err.clone());
518                        }
519                    }
520                    guard.n_todo -= n_now;
521                    n_now = 0;
522                    local_error = None;
523                } else {
524                    guard.n_total += 1;
525                }
526
527                loop {
528                    if guard.n_todo == 0 {
529                        guard.n_total -= 1;
530                        let results = guard
531                            .session
532                            .as_ref()
533                            .map(|s| {
534                                let mut out = Vec::with_capacity(512);
535                                while let Some(batch) = s.results.pop() {
536                                    out.extend(batch);
537                                }
538                                out
539                            })
540                            .unwrap_or_default();
541                        guard.session = None;
542                        if let Some(ref e) = guard.error_result {
543                            return Err(e.clone());
544                        }
545                        return Ok(results);
546                    }
547                    if guard.checks.is_empty() {
548                        guard.n_idle += 1;
549                        guard = master_cv.wait(guard).unwrap();
550                        guard.n_idle -= 1;
551                        continue;
552                    }
553                    break;
554                }
555
556                let n_total = guard.n_total;
557                let n_idle = guard.n_idle;
558                let divisor = (n_total + n_idle + 1).max(1);
559                n_now = (guard.checks.len() / divisor).clamp(1, batch_size);
560                let drain_len = n_now.min(guard.checks.len());
561                batch_buf.clear();
562                let drain_start = guard.checks.len() - drain_len;
563                batch_buf.extend(guard.checks.drain(drain_start..));
564                session_opt = guard.session.clone();
565                false
566            };
567
568            if batch_buf.is_empty() {
569                continue;
570            }
571
572            let session = match session_opt.as_ref() {
573                Some(s) => Arc::clone(s),
574                None => continue,
575            };
576
577            let mut batch_results = Vec::with_capacity(batch_buf.len());
578            #[cfg(all(feature = "production", feature = "profile"))]
579            let t_run_check = std::time::Instant::now();
580            {
581                batch_buf.sort_unstable_by_key(|c| c.tx_ctx_idx);
582                let buffer = session.script_pubkey_buffer.as_slice();
583                let spi = session.script_pubkey_indices_buffer.as_slice();
584                let pv = session.prevout_values_buffer.as_slice();
585                let mut refs_buf: Vec<&[u8]> = Vec::with_capacity(64);
586                let mut cached_ctx_idx: usize = usize::MAX;
587                for c in batch_buf.iter() {
588                    let ctx = match session.tx_contexts.get(c.tx_ctx_idx) {
589                        Some(ctx) => ctx,
590                        None => {
591                            local_error = Some(ConsensusError::BlockValidation(
592                                "tx_ctx_idx out of range".into(),
593                            ));
594                            break;
595                        }
596                    };
597                    let s = c.spk_offset as usize;
598                    let l = c.spk_len as usize;
599                    let script_pubkey = if s + l <= buffer.len() {
600                        &buffer[s..s + l]
601                    } else {
602                        &[]
603                    };
604                    let (pv_base, pv_count) = ctx.prevout_values_range;
605                    let prevout_slice = &pv[pv_base..][..pv_count];
606                    if c.tx_ctx_idx != cached_ctx_idx {
607                        refs_buf.clear();
608                        let (spi_base, spi_count) = ctx.script_pubkey_indices_range;
609                        for j in 0..spi_count {
610                            let (start, len) = spi[spi_base + j];
611                            refs_buf.push(if start + len <= buffer.len() {
612                                &buffer[start..start + len]
613                            } else {
614                                &[]
615                            });
616                        }
617                        cached_ctx_idx = c.tx_ctx_idx;
618                    }
619                    match Self::run_check_with_refs(
620                        c,
621                        session.as_ref(),
622                        ctx,
623                        &refs_buf,
624                        buffer,
625                        None,
626                        Some(script_pubkey),
627                        Some(prevout_slice),
628                    ) {
629                        Ok(valid) => batch_results.push((c.tx_ctx_idx, valid)),
630                        Err(e) => {
631                            local_error = Some(e);
632                            break;
633                        }
634                    }
635                }
636            }
637            #[cfg(all(feature = "production", feature = "profile"))]
638            crate::script_profile::add_worker_run_check_loop_ns(
639                t_run_check.elapsed().as_nanos() as u64
640            );
641            if !batch_results.is_empty() {
642                #[cfg(all(feature = "production", feature = "profile"))]
643                let t_results = std::time::Instant::now();
644                session.results.push(batch_results);
645                #[cfg(all(feature = "production", feature = "profile"))]
646                crate::script_profile::add_worker_results_extend_ns(
647                    t_results.elapsed().as_nanos() as u64
648                );
649            }
650        }
651    }
652}