Skip to main content

blvm_consensus/
checkqueue.rs

1//! Script verification queue: dedicated N-1 worker threads plus master as Nth.
2//!
3//! Producer adds checks per-tx; workers pull batches and verify. Master joins until
4//! queue empty. Batch size from ibd_tuning (default 128, env/config override).
5
6#![cfg(all(feature = "production", feature = "rayon"))]
7
8use crate::activation::{ForkActivationTable, IsForkActive};
9use crate::error::{ConsensusError, Result};
10use crate::script::verify_script_with_context_full;
11use crate::types::{Block, ForkId, Natural, Network};
12use crate::witness::is_witness_empty;
13use crate::witness::Witness;
14use crossbeam_queue::SegQueue;
15
16use std::cell::RefCell;
17use std::sync::atomic::AtomicUsize;
18use std::sync::{Arc, Condvar, Mutex};
19use std::thread::{self, JoinHandle};
20
/// Default number of checks a thread claims per batch when no override is
/// supplied to `ScriptCheckQueue::new` (overrides are accepted in 1..=1024).
const DEFAULT_BATCH_SIZE: usize = 128;
23
/// Thread-local scratch `Vec<i64>`, reused across checks so the fallback
/// (non-prefetched) prevout-values path does not allocate per check.
/// NOTE(review): the session buffers are immutable `Arc`s for the whole block
/// session, so this copy looks avoidable — verify whether the slice can be
/// passed through directly instead.
thread_local! {
    static PREVOUT_BUF: RefCell<Vec<i64>> = const { RefCell::new(Vec::new()) };
}
29
/// Script check with embedded per-input data. Workers use these directly without
/// HashMap grouping or shared-buffer indirection (Core-style self-contained checks).
#[derive(Clone, Debug)]
pub struct ScriptCheck {
    /// Index into `BlockSessionContext.tx_contexts` for this input's transaction.
    pub tx_ctx_idx: usize,
    /// Input index within that transaction.
    pub input_idx: usize,
    /// Byte offset of the spent scriptPubKey in `BlockSessionContext.script_pubkey_buffer`.
    pub spk_offset: u32,
    /// Byte length of the spent scriptPubKey.
    pub spk_len: u32,
    /// Value of the spent output (presumably satoshis — confirm with producer).
    pub prevout_value: i64,
}
40
/// Per-tx context shared by all inputs of that tx.
/// prevout_script_pubkeys and prevout_values in block-level buffers;
/// context holds (start, count) ranges to avoid per-tx Vec allocations.
#[derive(Clone, Debug)]
pub struct TxScriptContext {
    /// Index of this transaction within the block.
    pub tx_index: usize,
    /// (start, count) range into BlockSessionContext.prevout_values_buffer
    pub prevout_values_range: (usize, usize),
    /// (start, count) range into BlockSessionContext.script_pubkey_indices_buffer
    pub script_pubkey_indices_range: (usize, usize),
    /// Script verification flags passed through to `verify_script_with_context_full`.
    pub flags: u32,
    /// BIP143 precomputed hashes for this tx, when available.
    #[cfg(feature = "production")]
    pub bip143: Option<crate::transaction_hash::Bip143PrecomputedHashes>,
    /// Not read in this file — presumably a caller-side validation-pass ordinal.
    pub loop_idx: usize,
    /// Not read in this file — transaction fee carried for callers.
    pub fee: i64,
    /// Base index into the block-wide per-input arrays
    /// (`precomputed_sighashes`, `precomputed_p2pkh_hashes`); an input's slot
    /// is `ecdsa_index_base + input_idx`.
    pub ecdsa_index_base: usize,
    /// Roadmap: Core-style (scriptCode, nHashType) -> hash cache. Helps multisig.
    #[cfg(feature = "production")]
    pub sighash_midstate_cache: Option<crate::transaction_hash::SighashMidstateCache>,
}
61
/// Session context for one block; set at start_session, used by workers until complete.
/// When schnorr_collector is None, verify in-place (no batch collection).
/// TxScriptContext stored in Arc to avoid cloning full context per check (workers get Arc::clone).
/// Block-level buffers are immutable (Arc<Vec<...>>) — no locks during worker execution.
pub struct BlockSessionContext {
    /// The block whose transaction scripts are being verified.
    pub block: Arc<Block>,
    /// Block-level buffer for all prevout values; TxScriptContext.prevout_values_range indexes into this.
    pub prevout_values_buffer: Arc<Vec<i64>>,
    /// Block-level buffer for (start, len) pairs into script_pubkey_buffer; TxScriptContext.script_pubkey_indices_range indexes into this.
    pub script_pubkey_indices_buffer: Arc<Vec<(usize, usize)>>,
    /// Block-level buffer for all prevout script_pubkeys; (start, len) from script_pubkey_indices_buffer index into this.
    pub script_pubkey_buffer: Arc<Vec<u8>>,
    /// Block-level witness data; workers index via session.witness_buffer[ctx.tx_index][input_idx].
    pub witness_buffer: Arc<Vec<Vec<Witness>>>,
    /// One context per transaction; ScriptCheck.tx_ctx_idx indexes into this.
    pub tx_contexts: Arc<Vec<TxScriptContext>>,
    /// Per-input atomic counters — not read in this file; presumably maintained
    /// by the verification layer (confirm against callers).
    #[cfg(feature = "production")]
    pub ecdsa_sub_counters: Arc<Vec<AtomicUsize>>,
    /// Optional collector for batched Schnorr verification (crate::bip348).
    #[cfg(feature = "production")]
    pub schnorr_collector: Option<Arc<crate::bip348::SchnorrSignatureCollector>>,
    /// Height of the block being validated; also used for fork-activation checks.
    pub height: Natural,
    /// Median time past, when known; forwarded to script verification.
    pub median_time_past: Option<u64>,
    /// Network the block belongs to.
    pub network: Network,
    /// Fork heights for this validation session (same table as `BlockValidationContext::activation`).
    pub activation: ForkActivationTable,
    /// Lock-free: workers push batch_results; master drains at complete(). Reduces Mutex contention.
    pub results: Arc<SegQueue<Vec<(usize, bool)>>>,
    /// Precomputed sighashes for P2PKH inputs, indexed by ecdsa_index_base + input_idx.
    /// None = not precomputed (worker computes on demand).
    #[cfg(feature = "production")]
    pub precomputed_sighashes: Arc<Vec<Option<[u8; 32]>>>,
    /// Precomputed HASH160(pubkey) for P2PKH inputs, indexed same as precomputed_sighashes.
    #[cfg(feature = "production")]
    pub precomputed_p2pkh_hashes: Arc<Vec<Option<[u8; 20]>>>,
}
96
/// Shared mutable queue state, guarded by `ScriptCheckQueue::state`'s Mutex.
struct QueueState {
    /// Pending checks; threads drain batches from the tail.
    checks: Vec<ScriptCheck>,
    /// Checks submitted but not yet accounted as finished by a thread.
    n_todo: usize,
    /// Threads currently participating (workers, plus master while it joins).
    n_total: usize,
    /// Threads currently blocked waiting on a condvar.
    n_idle: usize,
    /// Total checks submitted in the current block session; used to pre-size the result Vec in `complete()`.
    n_submitted: usize,
    /// First verification error reported by any thread in this session.
    error_result: Option<ConsensusError>,
    /// Shutdown flag; workers return when they observe it.
    request_stop: bool,
    /// Current block session; `None` between sessions.
    session: Option<Arc<BlockSessionContext>>,
}
108
/// Script verification queue: N-1 dedicated workers + master joins.
pub struct ScriptCheckQueue {
    /// Shared queue state (checks, counters, session, error).
    state: Arc<Mutex<QueueState>>,
    /// Signalled by `add`/`add_from_slice`; workers wait on this for work.
    worker_cv: Arc<Condvar>,
    /// Signalled by workers when `n_todo` hits zero; master waits on this in `complete()`.
    master_cv: Arc<Condvar>,
    /// Serializes masters: only one thread may run `complete()` at a time.
    control_mutex: Mutex<()>,
    /// Join handles of the dedicated worker threads.
    workers: Vec<JoinHandle<()>>,
    /// Maximum checks a thread claims per batch.
    batch_size: usize,
}
118
119impl ScriptCheckQueue {
120    /// Create queue with `worker_count` dedicated threads (par-1; master joins for total par).
121    /// `batch_size`: from ibd_tuning (default 128). Use `None` for default.
122    pub fn new(worker_count: usize, batch_size: Option<usize>) -> Self {
123        let batch_size = batch_size
124            .filter(|&b| b > 0 && b <= 1024)
125            .unwrap_or(DEFAULT_BATCH_SIZE);
126        let state = Arc::new(Mutex::new(QueueState {
127            checks: Vec::new(),
128            n_todo: 0,
129            n_total: 0,
130            n_idle: 0,
131            n_submitted: 0,
132            error_result: None,
133            request_stop: false,
134            session: None,
135        }));
136        let worker_cv = Arc::new(Condvar::new());
137        let master_cv = Arc::new(Condvar::new());
138
139        let mut workers = Vec::with_capacity(worker_count);
140        for n in 0..worker_count {
141            let state_clone = Arc::clone(&state);
142            let cv_clone = Arc::clone(&worker_cv);
143            let master_clone = Arc::clone(&master_cv);
144            let bs = batch_size;
145            workers.push(
146                thread::Builder::new()
147                    .name(format!("scriptch.{n}"))
148                    .spawn(move || {
149                        Self::worker_loop(state_clone, &cv_clone, &master_clone, bs);
150                    })
151                    .expect("scriptch thread spawn"),
152            );
153        }
154
155        Self {
156            state,
157            worker_cv,
158            master_cv,
159            control_mutex: Mutex::new(()),
160            workers,
161            batch_size,
162        }
163    }
164
165    /// Run a single check with pre-built refs (used when refs are cached per tx_ctx).
166    /// p2pkh_hash: when Some, P2PKH fast path skips HASH160 (batch path).
167    /// When script_pubkey and prevout_values are Some, skips per-check lock acquisitions (batch path).
168    pub fn run_check_with_refs(
169        check: &ScriptCheck,
170        session: &BlockSessionContext,
171        ctx: &TxScriptContext,
172        refs: &[&[u8]],
173        buffer: &[u8],
174        #[cfg(feature = "production")] p2pkh_hash: Option<[u8; 20]>,
175        script_pubkey_prefetched: Option<&[u8]>,
176        prevout_values_prefetched: Option<&[i64]>,
177    ) -> std::result::Result<bool, ConsensusError> {
178        let tx = &session.block.transactions[ctx.tx_index];
179        let script_pubkey: &[u8] = match script_pubkey_prefetched {
180            Some(s) => s,
181            None => {
182                let spi = session.script_pubkey_indices_buffer.as_slice();
183                let (base, count) = ctx.script_pubkey_indices_range;
184                let (start, len) = if check.input_idx < count {
185                    spi[base + check.input_idx]
186                } else {
187                    (0, 0)
188                };
189                &buffer[start..start + len]
190            }
191        };
192        let witness_for_script = if !session
193            .activation
194            .is_fork_active(ForkId::SegWit, session.height)
195        {
196            None
197        } else {
198            session
199                .witness_buffer
200                .get(ctx.tx_index)
201                .and_then(|w| w.get(check.input_idx))
202                .and_then(|w| if is_witness_empty(w) { None } else { Some(w) })
203        };
204        let ecdsa_global_idx = ctx.ecdsa_index_base + check.input_idx;
205
206        #[cfg(feature = "production")]
207        let sighash_cache = ctx.sighash_midstate_cache.as_ref();
208
209        #[cfg(feature = "production")]
210        let precomputed_sighash = session
211            .precomputed_sighashes
212            .get(ecdsa_global_idx)
213            .and_then(|s| *s);
214        #[cfg(feature = "production")]
215        let precomputed_p2pkh = match p2pkh_hash {
216            Some(h) => Some(h),
217            None => session
218                .precomputed_p2pkh_hashes
219                .get(ecdsa_global_idx)
220                .and_then(|h| *h),
221        };
222
223        let do_verify = |prevout_values: &[i64]| {
224            verify_script_with_context_full(
225                &tx.inputs[check.input_idx].script_sig,
226                script_pubkey,
227                witness_for_script,
228                ctx.flags,
229                tx,
230                check.input_idx,
231                prevout_values,
232                refs,
233                Some(session.height),
234                session.median_time_past,
235                session.network,
236                crate::script::SigVersion::Base,
237                #[cfg(feature = "production")]
238                session.schnorr_collector.as_deref(),
239                #[cfg(not(feature = "production"))]
240                None,
241                #[cfg(feature = "production")]
242                ctx.bip143.as_ref(),
243                #[cfg(not(feature = "production"))]
244                None,
245                #[cfg(feature = "production")]
246                precomputed_sighash,
247                #[cfg(feature = "production")]
248                sighash_cache,
249                #[cfg(feature = "production")]
250                precomputed_p2pkh,
251            )
252            .map_err(|e| {
253                ConsensusError::BlockValidation(
254                    format!(
255                        "Script verification failed at tx {} input {}: {}",
256                        ctx.tx_index, check.input_idx, e
257                    )
258                    .into(),
259                )
260            })
261        };
262
263        match prevout_values_prefetched {
264            Some(p) => do_verify(p),
265            None => {
266                let pv = session.prevout_values_buffer.as_slice();
267                let (base, count) = ctx.prevout_values_range;
268                let slice = &pv[base..][..count];
269                PREVOUT_BUF.with(|cell| {
270                    let mut v = cell.borrow_mut();
271                    v.clear();
272                    v.extend_from_slice(slice);
273                    do_verify(&v)
274                })
275            }
276        }
277    }
278
279    fn run_check<'a>(
280        check: &ScriptCheck,
281        session: &'a BlockSessionContext,
282        refs_buf: &mut Vec<&'a [u8]>,
283    ) -> std::result::Result<bool, ConsensusError> {
284        let ctx = session
285            .tx_contexts
286            .get(check.tx_ctx_idx)
287            .ok_or_else(|| ConsensusError::BlockValidation("tx_ctx_idx out of range".into()))?;
288        let buffer = session.script_pubkey_buffer.as_slice();
289        let spi = session.script_pubkey_indices_buffer.as_slice();
290        let (base, count) = ctx.script_pubkey_indices_range;
291        refs_buf.clear();
292        refs_buf.extend((0..count).map(|j| {
293            let (s, l) = spi[base + j];
294            buffer[s..s + l].as_ref()
295        }));
296        Self::run_check_with_refs(
297            check, session, ctx, refs_buf, buffer, None, // run_check has no batch context
298            None, // script_pubkey_prefetched
299            None, // prevout_values_prefetched
300        )
301    }
302
    /// Dedicated worker thread body: repeatedly claim a batch of checks,
    /// verify them outside the lock, and report completion.
    ///
    /// Accounting protocol (Core-checkqueue style): `n_now` holds the number
    /// of checks this thread claimed in the previous iteration; it is
    /// subtracted from `n_todo` on the next lock acquisition, so the mutex is
    /// taken once per batch rather than once per check. Returns when
    /// `request_stop` is observed.
    fn worker_loop(
        state: Arc<Mutex<QueueState>>,
        worker_cv: &Condvar,
        master_cv: &Condvar,
        batch_size: usize,
    ) {
        // Checks claimed in the previous iteration, not yet accounted.
        let mut n_now: usize = 0;
        // First error hit in the previous batch; published under the lock.
        let mut local_error: Option<ConsensusError> = None;
        // Reused batch buffer to avoid a per-batch allocation.
        let mut batch_buf: Vec<ScriptCheck> = Vec::with_capacity(batch_size);

        loop {
            let (session_opt, _batch_len) = {
                let mut guard = state.lock().unwrap();
                if n_now > 0 {
                    // Account the previous batch: publish the first error (if
                    // any) and wake the master when everything is done.
                    if let Some(ref err) = local_error {
                        if guard.error_result.is_none() {
                            guard.error_result = Some(err.clone());
                        }
                    }
                    guard.n_todo -= n_now;
                    if guard.n_todo == 0 {
                        master_cv.notify_one();
                    }
                    n_now = 0;
                    local_error = None;
                } else {
                    // First pass through the loop: register this thread.
                    guard.n_total += 1;
                }

                // Wait until there is work or shutdown is requested.
                loop {
                    if guard.request_stop {
                        return;
                    }
                    if guard.checks.is_empty() {
                        guard.n_idle += 1;
                        guard = worker_cv.wait(guard).unwrap();
                        guard.n_idle -= 1;
                        continue;
                    }
                    break;
                }

                // Claim a share of the remaining checks, split over the
                // threads that could pick up work (the +1 biases toward
                // smaller batches so late joiners still find work), clamped
                // to [1, batch_size].
                let n_total = guard.n_total;
                let n_idle = guard.n_idle;
                let divisor = (n_total + n_idle + 1).max(1);
                n_now = (guard.checks.len() / divisor).clamp(1, batch_size);
                let drain_len = n_now.min(guard.checks.len());
                batch_buf.clear();
                // Drain from the tail: no element shifting.
                let drain_start = guard.checks.len() - drain_len;
                batch_buf.extend(guard.checks.drain(drain_start..));
                let session = guard.session.clone();
                (session, ())
            };

            if batch_buf.is_empty() {
                continue;
            }

            // Without a session there is nothing to verify against; the
            // claimed count is still accounted on the next iteration.
            let session = match session_opt.as_ref() {
                Some(s) => Arc::clone(s),
                None => continue,
            };

            let mut batch_results = Vec::with_capacity(batch_buf.len());
            #[cfg(all(feature = "production", feature = "profile"))]
            let t_run_check = std::time::Instant::now();
            {
                // Sort by tx so the per-tx refs built below are reused across
                // consecutive checks of the same transaction.
                batch_buf.sort_unstable_by_key(|c| c.tx_ctx_idx);
                let buffer = session.script_pubkey_buffer.as_slice();
                let spi = session.script_pubkey_indices_buffer.as_slice();
                let pv = session.prevout_values_buffer.as_slice();
                let mut refs_buf: Vec<&[u8]> = Vec::with_capacity(64);
                let mut cached_ctx_idx: usize = usize::MAX;
                for c in batch_buf.iter() {
                    let ctx = match session.tx_contexts.get(c.tx_ctx_idx) {
                        Some(ctx) => ctx,
                        None => {
                            local_error = Some(ConsensusError::BlockValidation(
                                "tx_ctx_idx out of range".into(),
                            ));
                            break;
                        }
                    };
                    // Bounds-checked scriptPubKey lookup; corrupt offsets map
                    // to the empty script rather than panicking.
                    let s = c.spk_offset as usize;
                    let l = c.spk_len as usize;
                    let script_pubkey = if s + l <= buffer.len() {
                        &buffer[s..s + l]
                    } else {
                        &[]
                    };
                    let (pv_base, pv_count) = ctx.prevout_values_range;
                    let prevout_slice = &pv[pv_base..][..pv_count];
                    // Rebuild the per-tx refs only when the tx changes.
                    if c.tx_ctx_idx != cached_ctx_idx {
                        refs_buf.clear();
                        let (spi_base, spi_count) = ctx.script_pubkey_indices_range;
                        for j in 0..spi_count {
                            let (start, len) = spi[spi_base + j];
                            refs_buf.push(if start + len <= buffer.len() {
                                &buffer[start..start + len]
                            } else {
                                &[]
                            });
                        }
                        cached_ctx_idx = c.tx_ctx_idx;
                    }
                    match Self::run_check_with_refs(
                        c,
                        session.as_ref(),
                        ctx,
                        &refs_buf,
                        buffer,
                        None,
                        Some(script_pubkey),
                        Some(prevout_slice),
                    ) {
                        Ok(valid) => batch_results.push((c.tx_ctx_idx, valid)),
                        Err(e) => {
                            // Abort the batch on first error; it is published
                            // on the next lock acquisition.
                            local_error = Some(e);
                            break;
                        }
                    }
                }
            }
            #[cfg(all(feature = "production", feature = "profile"))]
            crate::script_profile::add_worker_run_check_loop_ns(
                t_run_check.elapsed().as_nanos() as u64
            );
            if !batch_results.is_empty() {
                #[cfg(all(feature = "production", feature = "profile"))]
                let t_results = std::time::Instant::now();
                // Lock-free result hand-off; the master drains in complete().
                session.results.push(batch_results);
                #[cfg(all(feature = "production", feature = "profile"))]
                crate::script_profile::add_worker_results_extend_ns(
                    t_results.elapsed().as_nanos() as u64
                );
            }
        }
    }
441
442    /// Start a block session. Must be called before any Add. Session holds shared context.
443    pub fn start_session(&self, session: BlockSessionContext) {
444        let mut guard = self.state.lock().unwrap();
445        guard.session = Some(Arc::new(session));
446        guard.checks.clear();
447        guard.n_todo = 0;
448        guard.n_submitted = 0;
449        guard.error_result = None;
450    }
451
452    /// Add checks to the queue; workers wake and process.
453    pub fn add(&self, checks: Vec<ScriptCheck>) {
454        let n = checks.len();
455        if n == 0 {
456            return;
457        }
458        {
459            let mut guard = self.state.lock().unwrap();
460            guard.checks.extend(checks);
461            guard.n_todo += n;
462            guard.n_submitted += n;
463        }
464        if n == 1 {
465            self.worker_cv.notify_one();
466        } else {
467            self.worker_cv.notify_all();
468        }
469    }
470
471    /// Add checks from a slice without consuming. Used with block-level pre-allocated Vec (Q).
472    pub fn add_from_slice(&self, checks: &[ScriptCheck]) {
473        let n = checks.len();
474        if n == 0 {
475            return;
476        }
477        {
478            let mut guard = self.state.lock().unwrap();
479            guard.checks.extend(checks.iter().cloned());
480            guard.n_todo += n;
481            guard.n_submitted += n;
482        }
483        if n == 1 {
484            self.worker_cv.notify_one();
485        } else {
486            self.worker_cv.notify_all();
487        }
488    }
489
490    /// Run checks sequentially on the current thread (fallback when parallel retry fails).
491    pub fn run_checks_sequential(
492        checks: &[ScriptCheck],
493        session: &BlockSessionContext,
494    ) -> Result<Vec<(usize, bool)>> {
495        let mut results = Vec::with_capacity(checks.len());
496        let mut refs_buf = Vec::with_capacity(256);
497        for c in checks {
498            let valid = Self::run_check(c, session, &mut refs_buf)?;
499            results.push((c.tx_ctx_idx, valid));
500        }
501        Ok(results)
502    }
503
504    /// Master joins until queue empty; returns collected (tx_ctx_idx, valid) results.
505    pub fn complete(&self) -> Result<Vec<(usize, bool)>> {
506        let _control = self.control_mutex.lock().unwrap();
507        let state = Arc::clone(&self.state);
508        let worker_cv = Arc::clone(&self.worker_cv);
509        let master_cv = Arc::clone(&self.master_cv);
510        let batch_size = self.batch_size;
511
512        let mut n_now: usize = 0;
513        let mut local_error: Option<ConsensusError> = None;
514        let mut session_opt: Option<Arc<BlockSessionContext>> = None;
515        let mut batch_buf: Vec<ScriptCheck> = Vec::with_capacity(batch_size);
516
517        loop {
518            let done = {
519                let mut guard = state.lock().unwrap();
520                if n_now > 0 {
521                    if let Some(ref err) = local_error {
522                        if guard.error_result.is_none() {
523                            guard.error_result = Some(err.clone());
524                        }
525                    }
526                    guard.n_todo -= n_now;
527                    n_now = 0;
528                    local_error = None;
529                } else {
530                    guard.n_total += 1;
531                }
532
533                loop {
534                    if guard.n_todo == 0 {
535                        guard.n_total -= 1;
536                        let n_expected = guard.n_submitted;
537                        guard.n_submitted = 0;
538                        let results = guard
539                            .session
540                            .as_ref()
541                            .map(|s| {
542                                let mut out = Vec::with_capacity(n_expected.max(64));
543                                while let Some(batch) = s.results.pop() {
544                                    out.extend(batch);
545                                }
546                                out
547                            })
548                            .unwrap_or_default();
549                        guard.session = None;
550                        if let Some(ref e) = guard.error_result {
551                            return Err(e.clone());
552                        }
553                        return Ok(results);
554                    }
555                    if guard.checks.is_empty() {
556                        guard.n_idle += 1;
557                        guard = master_cv.wait(guard).unwrap();
558                        guard.n_idle -= 1;
559                        continue;
560                    }
561                    break;
562                }
563
564                let n_total = guard.n_total;
565                let n_idle = guard.n_idle;
566                let divisor = (n_total + n_idle + 1).max(1);
567                n_now = (guard.checks.len() / divisor).clamp(1, batch_size);
568                let drain_len = n_now.min(guard.checks.len());
569                batch_buf.clear();
570                let drain_start = guard.checks.len() - drain_len;
571                batch_buf.extend(guard.checks.drain(drain_start..));
572                session_opt = guard.session.clone();
573                false
574            };
575
576            if batch_buf.is_empty() {
577                continue;
578            }
579
580            let session = match session_opt.as_ref() {
581                Some(s) => Arc::clone(s),
582                None => continue,
583            };
584
585            let mut batch_results = Vec::with_capacity(batch_buf.len());
586            #[cfg(all(feature = "production", feature = "profile"))]
587            let t_run_check = std::time::Instant::now();
588            {
589                batch_buf.sort_unstable_by_key(|c| c.tx_ctx_idx);
590                let buffer = session.script_pubkey_buffer.as_slice();
591                let spi = session.script_pubkey_indices_buffer.as_slice();
592                let pv = session.prevout_values_buffer.as_slice();
593                let mut refs_buf: Vec<&[u8]> = Vec::with_capacity(64);
594                let mut cached_ctx_idx: usize = usize::MAX;
595                for c in batch_buf.iter() {
596                    let ctx = match session.tx_contexts.get(c.tx_ctx_idx) {
597                        Some(ctx) => ctx,
598                        None => {
599                            local_error = Some(ConsensusError::BlockValidation(
600                                "tx_ctx_idx out of range".into(),
601                            ));
602                            break;
603                        }
604                    };
605                    let s = c.spk_offset as usize;
606                    let l = c.spk_len as usize;
607                    let script_pubkey = if s + l <= buffer.len() {
608                        &buffer[s..s + l]
609                    } else {
610                        &[]
611                    };
612                    let (pv_base, pv_count) = ctx.prevout_values_range;
613                    let prevout_slice = &pv[pv_base..][..pv_count];
614                    if c.tx_ctx_idx != cached_ctx_idx {
615                        refs_buf.clear();
616                        let (spi_base, spi_count) = ctx.script_pubkey_indices_range;
617                        for j in 0..spi_count {
618                            let (start, len) = spi[spi_base + j];
619                            refs_buf.push(if start + len <= buffer.len() {
620                                &buffer[start..start + len]
621                            } else {
622                                &[]
623                            });
624                        }
625                        cached_ctx_idx = c.tx_ctx_idx;
626                    }
627                    match Self::run_check_with_refs(
628                        c,
629                        session.as_ref(),
630                        ctx,
631                        &refs_buf,
632                        buffer,
633                        None,
634                        Some(script_pubkey),
635                        Some(prevout_slice),
636                    ) {
637                        Ok(valid) => batch_results.push((c.tx_ctx_idx, valid)),
638                        Err(e) => {
639                            local_error = Some(e);
640                            break;
641                        }
642                    }
643                }
644            }
645            #[cfg(all(feature = "production", feature = "profile"))]
646            crate::script_profile::add_worker_run_check_loop_ns(
647                t_run_check.elapsed().as_nanos() as u64
648            );
649            if !batch_results.is_empty() {
650                #[cfg(all(feature = "production", feature = "profile"))]
651                let t_results = std::time::Instant::now();
652                session.results.push(batch_results);
653                #[cfg(all(feature = "production", feature = "profile"))]
654                crate::script_profile::add_worker_results_extend_ns(
655                    t_results.elapsed().as_nanos() as u64
656                );
657            }
658        }
659    }
660}