1#![cfg(all(feature = "production", feature = "rayon"))]
7
8use crate::activation::{ForkActivationTable, IsForkActive};
9use crate::error::{ConsensusError, Result};
10use crate::script::verify_script_with_context_full;
11use crate::types::{Block, ForkId, Natural, Network};
12use crate::witness::is_witness_empty;
13use crate::witness::Witness;
14use crossbeam_queue::SegQueue;
15
16use std::cell::RefCell;
17use std::sync::atomic::AtomicUsize;
18use std::sync::{Arc, Condvar, Mutex};
19use std::thread::{self, JoinHandle};
20
/// Batch size used when the caller of `ScriptCheckQueue::new` supplies none,
/// or supplies one outside `1..=1024`.
///
/// The batch size is the upper bound on how many checks a thread removes from
/// the shared queue per lock acquisition.
const DEFAULT_BATCH_SIZE: usize = 128;
23
thread_local! {
    /// Per-thread scratch vector for prevout values.
    ///
    /// `ScriptCheckQueue::run_check_with_refs` clears it and copies one
    /// transaction's prevout values into it when the caller did not pass a
    /// prefetched slice, so the allocation is reused by later checks on the
    /// same thread.
    static PREVOUT_BUF: RefCell<Vec<i64>> = const { RefCell::new(Vec::new()) };
}
29
/// One queued unit of work: the script verification of a single transaction input.
#[derive(Debug, Clone)]
pub struct ScriptCheck {
    /// Index into `BlockSessionContext::tx_contexts` selecting the context of
    /// the transaction that owns the input.
    pub tx_ctx_idx: usize,
    /// Position of the input inside its transaction; also used to look up the
    /// input's witness and its precomputed hashes.
    pub input_idx: usize,
    /// Byte offset of the spent output's scriptPubKey inside
    /// `BlockSessionContext::script_pubkey_buffer`.
    pub spk_offset: u32,
    /// Byte length of the scriptPubKey that starts at `spk_offset`.
    pub spk_len: u32,
    /// NOTE(review): presumably the value of the spent output; nothing in this
    /// section of the file reads it — confirm against the producer.
    pub prevout_value: i64,
}
40
/// Per-transaction data shared by every script check of that transaction.
#[derive(Debug, Clone)]
pub struct TxScriptContext {
    /// Index of the transaction inside the block; also indexes
    /// `BlockSessionContext::witness_buffer`.
    pub tx_index: usize,
    /// `(base, count)` window into `BlockSessionContext::prevout_values_buffer`
    /// holding this transaction's prevout values.
    pub prevout_values_range: (usize, usize),
    /// `(base, count)` window into
    /// `BlockSessionContext::script_pubkey_indices_buffer` holding one
    /// `(start, len)` scriptPubKey span per input.
    pub script_pubkey_indices_range: (usize, usize),
    /// Script verification flags handed to the script verifier unchanged.
    pub flags: u32,
    /// Optional precomputed BIP143 hashes, forwarded to the script verifier.
    #[cfg(feature = "production")]
    pub bip143: Option<crate::transaction_hash::Bip143PrecomputedHashes>,
    /// NOTE(review): not read in this section of the file — meaning to be
    /// confirmed against the code that builds the contexts.
    pub loop_idx: usize,
    /// NOTE(review): presumably the transaction fee; not read in this section.
    pub fee: i64,
    /// Added to an input's index to obtain its slot in
    /// `BlockSessionContext::precomputed_sighashes` and
    /// `BlockSessionContext::precomputed_p2pkh_hashes`.
    pub ecdsa_index_base: usize,
    /// Optional sighash midstate cache, forwarded to the script verifier.
    #[cfg(feature = "production")]
    pub sighash_midstate_cache: Option<crate::transaction_hash::SighashMidstateCache>,
}
61
62pub struct BlockSessionContext {
67 pub block: Arc<Block>,
68 pub prevout_values_buffer: Arc<Vec<i64>>,
70 pub script_pubkey_indices_buffer: Arc<Vec<(usize, usize)>>,
72 pub script_pubkey_buffer: Arc<Vec<u8>>,
74 pub witness_buffer: Arc<Vec<Vec<Witness>>>,
76 pub tx_contexts: Arc<Vec<TxScriptContext>>,
77 #[cfg(feature = "production")]
78 pub ecdsa_sub_counters: Arc<Vec<AtomicUsize>>,
79 #[cfg(feature = "production")]
80 pub schnorr_collector: Option<Arc<crate::bip348::SchnorrSignatureCollector>>,
81 pub height: Natural,
82 pub median_time_past: Option<u64>,
83 pub network: Network,
84 pub activation: ForkActivationTable,
86 pub results: Arc<SegQueue<Vec<(usize, bool)>>>,
88 #[cfg(feature = "production")]
91 pub precomputed_sighashes: Arc<Vec<Option<[u8; 32]>>>,
92 #[cfg(feature = "production")]
94 pub precomputed_p2pkh_hashes: Arc<Vec<Option<[u8; 20]>>>,
95}
96
97struct QueueState {
98 checks: Vec<ScriptCheck>,
99 n_todo: usize,
100 n_total: usize,
101 n_idle: usize,
102 n_submitted: usize,
104 error_result: Option<ConsensusError>,
105 request_stop: bool,
106 session: Option<Arc<BlockSessionContext>>,
107}
108
109pub struct ScriptCheckQueue {
111 state: Arc<Mutex<QueueState>>,
112 worker_cv: Arc<Condvar>,
113 master_cv: Arc<Condvar>,
114 control_mutex: Mutex<()>,
115 workers: Vec<JoinHandle<()>>,
116 batch_size: usize,
117}
118
119impl ScriptCheckQueue {
120 pub fn new(worker_count: usize, batch_size: Option<usize>) -> Self {
123 let batch_size = batch_size
124 .filter(|&b| b > 0 && b <= 1024)
125 .unwrap_or(DEFAULT_BATCH_SIZE);
126 let state = Arc::new(Mutex::new(QueueState {
127 checks: Vec::new(),
128 n_todo: 0,
129 n_total: 0,
130 n_idle: 0,
131 n_submitted: 0,
132 error_result: None,
133 request_stop: false,
134 session: None,
135 }));
136 let worker_cv = Arc::new(Condvar::new());
137 let master_cv = Arc::new(Condvar::new());
138
139 let mut workers = Vec::with_capacity(worker_count);
140 for n in 0..worker_count {
141 let state_clone = Arc::clone(&state);
142 let cv_clone = Arc::clone(&worker_cv);
143 let master_clone = Arc::clone(&master_cv);
144 let bs = batch_size;
145 workers.push(
146 thread::Builder::new()
147 .name(format!("scriptch.{n}"))
148 .spawn(move || {
149 Self::worker_loop(state_clone, &cv_clone, &master_clone, bs);
150 })
151 .expect("scriptch thread spawn"),
152 );
153 }
154
155 Self {
156 state,
157 worker_cv,
158 master_cv,
159 control_mutex: Mutex::new(()),
160 workers,
161 batch_size,
162 }
163 }
164
165 pub fn run_check_with_refs(
169 check: &ScriptCheck,
170 session: &BlockSessionContext,
171 ctx: &TxScriptContext,
172 refs: &[&[u8]],
173 buffer: &[u8],
174 #[cfg(feature = "production")] p2pkh_hash: Option<[u8; 20]>,
175 script_pubkey_prefetched: Option<&[u8]>,
176 prevout_values_prefetched: Option<&[i64]>,
177 ) -> std::result::Result<bool, ConsensusError> {
178 let tx = &session.block.transactions[ctx.tx_index];
179 let script_pubkey: &[u8] = match script_pubkey_prefetched {
180 Some(s) => s,
181 None => {
182 let spi = session.script_pubkey_indices_buffer.as_slice();
183 let (base, count) = ctx.script_pubkey_indices_range;
184 let (start, len) = if check.input_idx < count {
185 spi[base + check.input_idx]
186 } else {
187 (0, 0)
188 };
189 &buffer[start..start + len]
190 }
191 };
192 let witness_for_script = if !session
193 .activation
194 .is_fork_active(ForkId::SegWit, session.height)
195 {
196 None
197 } else {
198 session
199 .witness_buffer
200 .get(ctx.tx_index)
201 .and_then(|w| w.get(check.input_idx))
202 .and_then(|w| if is_witness_empty(w) { None } else { Some(w) })
203 };
204 let ecdsa_global_idx = ctx.ecdsa_index_base + check.input_idx;
205
206 #[cfg(feature = "production")]
207 let sighash_cache = ctx.sighash_midstate_cache.as_ref();
208
209 #[cfg(feature = "production")]
210 let precomputed_sighash = session
211 .precomputed_sighashes
212 .get(ecdsa_global_idx)
213 .and_then(|s| *s);
214 #[cfg(feature = "production")]
215 let precomputed_p2pkh = match p2pkh_hash {
216 Some(h) => Some(h),
217 None => session
218 .precomputed_p2pkh_hashes
219 .get(ecdsa_global_idx)
220 .and_then(|h| *h),
221 };
222
223 let do_verify = |prevout_values: &[i64]| {
224 verify_script_with_context_full(
225 &tx.inputs[check.input_idx].script_sig,
226 script_pubkey,
227 witness_for_script,
228 ctx.flags,
229 tx,
230 check.input_idx,
231 prevout_values,
232 refs,
233 Some(session.height),
234 session.median_time_past,
235 session.network,
236 crate::script::SigVersion::Base,
237 #[cfg(feature = "production")]
238 session.schnorr_collector.as_deref(),
239 #[cfg(not(feature = "production"))]
240 None,
241 #[cfg(feature = "production")]
242 ctx.bip143.as_ref(),
243 #[cfg(not(feature = "production"))]
244 None,
245 #[cfg(feature = "production")]
246 precomputed_sighash,
247 #[cfg(feature = "production")]
248 sighash_cache,
249 #[cfg(feature = "production")]
250 precomputed_p2pkh,
251 )
252 .map_err(|e| {
253 ConsensusError::BlockValidation(
254 format!(
255 "Script verification failed at tx {} input {}: {}",
256 ctx.tx_index, check.input_idx, e
257 )
258 .into(),
259 )
260 })
261 };
262
263 match prevout_values_prefetched {
264 Some(p) => do_verify(p),
265 None => {
266 let pv = session.prevout_values_buffer.as_slice();
267 let (base, count) = ctx.prevout_values_range;
268 let slice = &pv[base..][..count];
269 PREVOUT_BUF.with(|cell| {
270 let mut v = cell.borrow_mut();
271 v.clear();
272 v.extend_from_slice(slice);
273 do_verify(&v)
274 })
275 }
276 }
277 }
278
279 fn run_check<'a>(
280 check: &ScriptCheck,
281 session: &'a BlockSessionContext,
282 refs_buf: &mut Vec<&'a [u8]>,
283 ) -> std::result::Result<bool, ConsensusError> {
284 let ctx = session
285 .tx_contexts
286 .get(check.tx_ctx_idx)
287 .ok_or_else(|| ConsensusError::BlockValidation("tx_ctx_idx out of range".into()))?;
288 let buffer = session.script_pubkey_buffer.as_slice();
289 let spi = session.script_pubkey_indices_buffer.as_slice();
290 let (base, count) = ctx.script_pubkey_indices_range;
291 refs_buf.clear();
292 refs_buf.extend((0..count).map(|j| {
293 let (s, l) = spi[base + j];
294 buffer[s..s + l].as_ref()
295 }));
296 Self::run_check_with_refs(
297 check, session, ctx, refs_buf, buffer, None, None, None, )
301 }
302
303 fn worker_loop(
304 state: Arc<Mutex<QueueState>>,
305 worker_cv: &Condvar,
306 master_cv: &Condvar,
307 batch_size: usize,
308 ) {
309 let mut n_now: usize = 0;
310 let mut local_error: Option<ConsensusError> = None;
311 let mut batch_buf: Vec<ScriptCheck> = Vec::with_capacity(batch_size);
312
313 loop {
314 let (session_opt, _batch_len) = {
315 let mut guard = state.lock().unwrap();
316 if n_now > 0 {
317 if let Some(ref err) = local_error {
318 if guard.error_result.is_none() {
319 guard.error_result = Some(err.clone());
320 }
321 }
322 guard.n_todo -= n_now;
323 if guard.n_todo == 0 {
324 master_cv.notify_one();
325 }
326 n_now = 0;
327 local_error = None;
328 } else {
329 guard.n_total += 1;
330 }
331
332 loop {
333 if guard.request_stop {
334 return;
335 }
336 if guard.checks.is_empty() {
337 guard.n_idle += 1;
338 guard = worker_cv.wait(guard).unwrap();
339 guard.n_idle -= 1;
340 continue;
341 }
342 break;
343 }
344
345 let n_total = guard.n_total;
346 let n_idle = guard.n_idle;
347 let divisor = (n_total + n_idle + 1).max(1);
348 n_now = (guard.checks.len() / divisor).clamp(1, batch_size);
349 let drain_len = n_now.min(guard.checks.len());
350 batch_buf.clear();
351 let drain_start = guard.checks.len() - drain_len;
352 batch_buf.extend(guard.checks.drain(drain_start..));
353 let session = guard.session.clone();
354 (session, ())
355 };
356
357 if batch_buf.is_empty() {
358 continue;
359 }
360
361 let session = match session_opt.as_ref() {
362 Some(s) => Arc::clone(s),
363 None => continue,
364 };
365
366 let mut batch_results = Vec::with_capacity(batch_buf.len());
367 #[cfg(all(feature = "production", feature = "profile"))]
368 let t_run_check = std::time::Instant::now();
369 {
370 batch_buf.sort_unstable_by_key(|c| c.tx_ctx_idx);
371 let buffer = session.script_pubkey_buffer.as_slice();
372 let spi = session.script_pubkey_indices_buffer.as_slice();
373 let pv = session.prevout_values_buffer.as_slice();
374 let mut refs_buf: Vec<&[u8]> = Vec::with_capacity(64);
375 let mut cached_ctx_idx: usize = usize::MAX;
376 for c in batch_buf.iter() {
377 let ctx = match session.tx_contexts.get(c.tx_ctx_idx) {
378 Some(ctx) => ctx,
379 None => {
380 local_error = Some(ConsensusError::BlockValidation(
381 "tx_ctx_idx out of range".into(),
382 ));
383 break;
384 }
385 };
386 let s = c.spk_offset as usize;
387 let l = c.spk_len as usize;
388 let script_pubkey = if s + l <= buffer.len() {
389 &buffer[s..s + l]
390 } else {
391 &[]
392 };
393 let (pv_base, pv_count) = ctx.prevout_values_range;
394 let prevout_slice = &pv[pv_base..][..pv_count];
395 if c.tx_ctx_idx != cached_ctx_idx {
396 refs_buf.clear();
397 let (spi_base, spi_count) = ctx.script_pubkey_indices_range;
398 for j in 0..spi_count {
399 let (start, len) = spi[spi_base + j];
400 refs_buf.push(if start + len <= buffer.len() {
401 &buffer[start..start + len]
402 } else {
403 &[]
404 });
405 }
406 cached_ctx_idx = c.tx_ctx_idx;
407 }
408 match Self::run_check_with_refs(
409 c,
410 session.as_ref(),
411 ctx,
412 &refs_buf,
413 buffer,
414 None,
415 Some(script_pubkey),
416 Some(prevout_slice),
417 ) {
418 Ok(valid) => batch_results.push((c.tx_ctx_idx, valid)),
419 Err(e) => {
420 local_error = Some(e);
421 break;
422 }
423 }
424 }
425 }
426 #[cfg(all(feature = "production", feature = "profile"))]
427 crate::script_profile::add_worker_run_check_loop_ns(
428 t_run_check.elapsed().as_nanos() as u64
429 );
430 if !batch_results.is_empty() {
431 #[cfg(all(feature = "production", feature = "profile"))]
432 let t_results = std::time::Instant::now();
433 session.results.push(batch_results);
434 #[cfg(all(feature = "production", feature = "profile"))]
435 crate::script_profile::add_worker_results_extend_ns(
436 t_results.elapsed().as_nanos() as u64
437 );
438 }
439 }
440 }
441
442 pub fn start_session(&self, session: BlockSessionContext) {
444 let mut guard = self.state.lock().unwrap();
445 guard.session = Some(Arc::new(session));
446 guard.checks.clear();
447 guard.n_todo = 0;
448 guard.n_submitted = 0;
449 guard.error_result = None;
450 }
451
452 pub fn add(&self, checks: Vec<ScriptCheck>) {
454 let n = checks.len();
455 if n == 0 {
456 return;
457 }
458 {
459 let mut guard = self.state.lock().unwrap();
460 guard.checks.extend(checks);
461 guard.n_todo += n;
462 guard.n_submitted += n;
463 }
464 if n == 1 {
465 self.worker_cv.notify_one();
466 } else {
467 self.worker_cv.notify_all();
468 }
469 }
470
471 pub fn add_from_slice(&self, checks: &[ScriptCheck]) {
473 let n = checks.len();
474 if n == 0 {
475 return;
476 }
477 {
478 let mut guard = self.state.lock().unwrap();
479 guard.checks.extend(checks.iter().cloned());
480 guard.n_todo += n;
481 guard.n_submitted += n;
482 }
483 if n == 1 {
484 self.worker_cv.notify_one();
485 } else {
486 self.worker_cv.notify_all();
487 }
488 }
489
490 pub fn run_checks_sequential(
492 checks: &[ScriptCheck],
493 session: &BlockSessionContext,
494 ) -> Result<Vec<(usize, bool)>> {
495 let mut results = Vec::with_capacity(checks.len());
496 let mut refs_buf = Vec::with_capacity(256);
497 for c in checks {
498 let valid = Self::run_check(c, session, &mut refs_buf)?;
499 results.push((c.tx_ctx_idx, valid));
500 }
501 Ok(results)
502 }
503
504 pub fn complete(&self) -> Result<Vec<(usize, bool)>> {
506 let _control = self.control_mutex.lock().unwrap();
507 let state = Arc::clone(&self.state);
508 let worker_cv = Arc::clone(&self.worker_cv);
509 let master_cv = Arc::clone(&self.master_cv);
510 let batch_size = self.batch_size;
511
512 let mut n_now: usize = 0;
513 let mut local_error: Option<ConsensusError> = None;
514 let mut session_opt: Option<Arc<BlockSessionContext>> = None;
515 let mut batch_buf: Vec<ScriptCheck> = Vec::with_capacity(batch_size);
516
517 loop {
518 let done = {
519 let mut guard = state.lock().unwrap();
520 if n_now > 0 {
521 if let Some(ref err) = local_error {
522 if guard.error_result.is_none() {
523 guard.error_result = Some(err.clone());
524 }
525 }
526 guard.n_todo -= n_now;
527 n_now = 0;
528 local_error = None;
529 } else {
530 guard.n_total += 1;
531 }
532
533 loop {
534 if guard.n_todo == 0 {
535 guard.n_total -= 1;
536 let n_expected = guard.n_submitted;
537 guard.n_submitted = 0;
538 let results = guard
539 .session
540 .as_ref()
541 .map(|s| {
542 let mut out = Vec::with_capacity(n_expected.max(64));
543 while let Some(batch) = s.results.pop() {
544 out.extend(batch);
545 }
546 out
547 })
548 .unwrap_or_default();
549 guard.session = None;
550 if let Some(ref e) = guard.error_result {
551 return Err(e.clone());
552 }
553 return Ok(results);
554 }
555 if guard.checks.is_empty() {
556 guard.n_idle += 1;
557 guard = master_cv.wait(guard).unwrap();
558 guard.n_idle -= 1;
559 continue;
560 }
561 break;
562 }
563
564 let n_total = guard.n_total;
565 let n_idle = guard.n_idle;
566 let divisor = (n_total + n_idle + 1).max(1);
567 n_now = (guard.checks.len() / divisor).clamp(1, batch_size);
568 let drain_len = n_now.min(guard.checks.len());
569 batch_buf.clear();
570 let drain_start = guard.checks.len() - drain_len;
571 batch_buf.extend(guard.checks.drain(drain_start..));
572 session_opt = guard.session.clone();
573 false
574 };
575
576 if batch_buf.is_empty() {
577 continue;
578 }
579
580 let session = match session_opt.as_ref() {
581 Some(s) => Arc::clone(s),
582 None => continue,
583 };
584
585 let mut batch_results = Vec::with_capacity(batch_buf.len());
586 #[cfg(all(feature = "production", feature = "profile"))]
587 let t_run_check = std::time::Instant::now();
588 {
589 batch_buf.sort_unstable_by_key(|c| c.tx_ctx_idx);
590 let buffer = session.script_pubkey_buffer.as_slice();
591 let spi = session.script_pubkey_indices_buffer.as_slice();
592 let pv = session.prevout_values_buffer.as_slice();
593 let mut refs_buf: Vec<&[u8]> = Vec::with_capacity(64);
594 let mut cached_ctx_idx: usize = usize::MAX;
595 for c in batch_buf.iter() {
596 let ctx = match session.tx_contexts.get(c.tx_ctx_idx) {
597 Some(ctx) => ctx,
598 None => {
599 local_error = Some(ConsensusError::BlockValidation(
600 "tx_ctx_idx out of range".into(),
601 ));
602 break;
603 }
604 };
605 let s = c.spk_offset as usize;
606 let l = c.spk_len as usize;
607 let script_pubkey = if s + l <= buffer.len() {
608 &buffer[s..s + l]
609 } else {
610 &[]
611 };
612 let (pv_base, pv_count) = ctx.prevout_values_range;
613 let prevout_slice = &pv[pv_base..][..pv_count];
614 if c.tx_ctx_idx != cached_ctx_idx {
615 refs_buf.clear();
616 let (spi_base, spi_count) = ctx.script_pubkey_indices_range;
617 for j in 0..spi_count {
618 let (start, len) = spi[spi_base + j];
619 refs_buf.push(if start + len <= buffer.len() {
620 &buffer[start..start + len]
621 } else {
622 &[]
623 });
624 }
625 cached_ctx_idx = c.tx_ctx_idx;
626 }
627 match Self::run_check_with_refs(
628 c,
629 session.as_ref(),
630 ctx,
631 &refs_buf,
632 buffer,
633 None,
634 Some(script_pubkey),
635 Some(prevout_slice),
636 ) {
637 Ok(valid) => batch_results.push((c.tx_ctx_idx, valid)),
638 Err(e) => {
639 local_error = Some(e);
640 break;
641 }
642 }
643 }
644 }
645 #[cfg(all(feature = "production", feature = "profile"))]
646 crate::script_profile::add_worker_run_check_loop_ns(
647 t_run_check.elapsed().as_nanos() as u64
648 );
649 if !batch_results.is_empty() {
650 #[cfg(all(feature = "production", feature = "profile"))]
651 let t_results = std::time::Instant::now();
652 session.results.push(batch_results);
653 #[cfg(all(feature = "production", feature = "profile"))]
654 crate::script_profile::add_worker_results_extend_ns(
655 t_results.elapsed().as_nanos() as u64
656 );
657 }
658 }
659 }
660}