1#![cfg(all(feature = "production", feature = "rayon"))]
7
8use crate::activation::{ForkActivationTable, IsForkActive};
9use crate::error::{ConsensusError, Result};
10use crate::script::verify_script_with_context_full;
11use crate::types::{Block, ForkId, Natural, Network};
12use crate::witness::is_witness_empty;
13use crate::witness::Witness;
14use crossbeam_queue::SegQueue;
15
16use std::cell::RefCell;
17use std::sync::atomic::AtomicUsize;
18use std::sync::{Arc, Condvar, Mutex};
19use std::thread::{self, JoinHandle};
20
/// Batch size used by `ScriptCheckQueue::new` when the caller passes `None`
/// or a value outside `1..=1024`.
const DEFAULT_BATCH_SIZE: usize = 128;
23
thread_local! {
    /// Per-thread scratch vector of prevout values.
    ///
    /// `ScriptCheckQueue::run_check_with_refs` clears it and refills it from the
    /// session's prevout buffer whenever no prefetched prevout slice is
    /// supplied, so the allocation is reused across calls on the same thread.
    static PREVOUT_BUF: RefCell<Vec<i64>> = const { RefCell::new(Vec::new()) };
}
29
/// One unit of work for the script-check queue: a single transaction input
/// whose script has to be verified.
#[derive(Clone, Debug)]
pub struct ScriptCheck {
    /// Index into `BlockSessionContext::tx_contexts` of the owning transaction.
    pub tx_ctx_idx: usize,
    /// Index of the input inside that transaction's `inputs`.
    pub input_idx: usize,
    /// Byte offset of this input's scriptPubKey inside
    /// `BlockSessionContext::script_pubkey_buffer`.
    pub spk_offset: u32,
    /// Byte length of this input's scriptPubKey.
    pub spk_len: u32,
    /// NOTE(review): presumably the value of the spent output; nothing in the
    /// visible queue code reads this field — confirm against the producer.
    pub prevout_value: i64,
}
40
/// Per-transaction data shared by every `ScriptCheck` that belongs to the
/// same transaction.
#[derive(Clone, Debug)]
pub struct TxScriptContext {
    /// Index of the transaction inside the session block's `transactions`.
    pub tx_index: usize,
    /// `(base, count)` window into `BlockSessionContext::prevout_values_buffer`.
    pub prevout_values_range: (usize, usize),
    /// `(base, count)` window into
    /// `BlockSessionContext::script_pubkey_indices_buffer`.
    pub script_pubkey_indices_range: (usize, usize),
    /// Script verification flags forwarded to `verify_script_with_context_full`.
    pub flags: u32,
    /// Precomputed BIP143 hashes forwarded to the verifier when present.
    #[cfg(feature = "production")]
    pub bip143: Option<crate::transaction_hash::Bip143PrecomputedHashes>,
    /// NOTE(review): not read by the visible queue code — meaning to be
    /// confirmed against the code that builds this context.
    pub loop_idx: usize,
    /// NOTE(review): presumably the transaction fee; not read by the visible
    /// queue code — confirm.
    pub fee: i64,
    /// Added to `ScriptCheck::input_idx` to index the session's
    /// `precomputed_sighashes` and `precomputed_p2pkh_hashes`.
    pub ecdsa_index_base: usize,
    /// Sighash midstate cache forwarded to the verifier when present.
    #[cfg(feature = "production")]
    pub sighash_midstate_cache: Option<crate::transaction_hash::SighashMidstateCache>,
}
61
62pub struct BlockSessionContext {
67 pub block: Arc<Block>,
68 pub prevout_values_buffer: Arc<Vec<i64>>,
70 pub script_pubkey_indices_buffer: Arc<Vec<(usize, usize)>>,
72 pub script_pubkey_buffer: Arc<Vec<u8>>,
74 pub witness_buffer: Arc<Vec<Vec<Witness>>>,
76 pub tx_contexts: Arc<Vec<TxScriptContext>>,
77 #[cfg(feature = "production")]
78 pub ecdsa_sub_counters: Arc<Vec<AtomicUsize>>,
79 #[cfg(feature = "production")]
80 pub schnorr_collector: Option<Arc<crate::bip348::SchnorrSignatureCollector>>,
81 pub height: Natural,
82 pub median_time_past: Option<u64>,
83 pub network: Network,
84 pub activation: ForkActivationTable,
86 pub results: Arc<SegQueue<Vec<(usize, bool)>>>,
88 #[cfg(feature = "production")]
91 pub precomputed_sighashes: Arc<Vec<Option<[u8; 32]>>>,
92 #[cfg(feature = "production")]
94 pub precomputed_p2pkh_hashes: Arc<Vec<Option<[u8; 20]>>>,
95}
96
97struct QueueState {
98 checks: Vec<ScriptCheck>,
99 n_todo: usize,
100 n_total: usize,
101 n_idle: usize,
102 error_result: Option<ConsensusError>,
103 request_stop: bool,
104 session: Option<Arc<BlockSessionContext>>,
105}
106
107pub struct ScriptCheckQueue {
109 state: Arc<Mutex<QueueState>>,
110 worker_cv: Arc<Condvar>,
111 master_cv: Arc<Condvar>,
112 control_mutex: Mutex<()>,
113 workers: Vec<JoinHandle<()>>,
114 batch_size: usize,
115}
116
117impl ScriptCheckQueue {
118 pub fn new(worker_count: usize, batch_size: Option<usize>) -> Self {
121 let batch_size = batch_size
122 .filter(|&b| b > 0 && b <= 1024)
123 .unwrap_or(DEFAULT_BATCH_SIZE);
124 let state = Arc::new(Mutex::new(QueueState {
125 checks: Vec::new(),
126 n_todo: 0,
127 n_total: 0,
128 n_idle: 0,
129 error_result: None,
130 request_stop: false,
131 session: None,
132 }));
133 let worker_cv = Arc::new(Condvar::new());
134 let master_cv = Arc::new(Condvar::new());
135
136 let mut workers = Vec::with_capacity(worker_count);
137 for n in 0..worker_count {
138 let state_clone = Arc::clone(&state);
139 let cv_clone = Arc::clone(&worker_cv);
140 let master_clone = Arc::clone(&master_cv);
141 let bs = batch_size;
142 workers.push(
143 thread::Builder::new()
144 .name(format!("scriptch.{n}"))
145 .spawn(move || {
146 Self::worker_loop(state_clone, &cv_clone, &master_clone, bs);
147 })
148 .expect("scriptch thread spawn"),
149 );
150 }
151
152 Self {
153 state,
154 worker_cv,
155 master_cv,
156 control_mutex: Mutex::new(()),
157 workers,
158 batch_size,
159 }
160 }
161
162 pub fn run_check_with_refs(
166 check: &ScriptCheck,
167 session: &BlockSessionContext,
168 ctx: &TxScriptContext,
169 refs: &[&[u8]],
170 buffer: &[u8],
171 #[cfg(feature = "production")] p2pkh_hash: Option<[u8; 20]>,
172 script_pubkey_prefetched: Option<&[u8]>,
173 prevout_values_prefetched: Option<&[i64]>,
174 ) -> std::result::Result<bool, ConsensusError> {
175 let tx = &session.block.transactions[ctx.tx_index];
176 let script_pubkey: &[u8] = match script_pubkey_prefetched {
177 Some(s) => s,
178 None => {
179 let spi = session.script_pubkey_indices_buffer.as_slice();
180 let (base, count) = ctx.script_pubkey_indices_range;
181 let (start, len) = if check.input_idx < count {
182 spi[base + check.input_idx]
183 } else {
184 (0, 0)
185 };
186 &buffer[start..start + len]
187 }
188 };
189 let witness_for_script = if !session
190 .activation
191 .is_fork_active(ForkId::SegWit, session.height)
192 {
193 None
194 } else {
195 session
196 .witness_buffer
197 .get(ctx.tx_index)
198 .and_then(|w| w.get(check.input_idx))
199 .and_then(|w| if is_witness_empty(w) { None } else { Some(w) })
200 };
201 let ecdsa_global_idx = ctx.ecdsa_index_base + check.input_idx;
202
203 #[cfg(feature = "production")]
204 let sighash_cache = ctx.sighash_midstate_cache.as_ref();
205
206 #[cfg(feature = "production")]
207 let precomputed_sighash = session
208 .precomputed_sighashes
209 .get(ecdsa_global_idx)
210 .and_then(|s| *s);
211 #[cfg(feature = "production")]
212 let precomputed_p2pkh = match p2pkh_hash {
213 Some(h) => Some(h),
214 None => session
215 .precomputed_p2pkh_hashes
216 .get(ecdsa_global_idx)
217 .and_then(|h| *h),
218 };
219
220 let do_verify = |prevout_values: &[i64]| {
221 verify_script_with_context_full(
222 &tx.inputs[check.input_idx].script_sig,
223 script_pubkey,
224 witness_for_script,
225 ctx.flags,
226 tx,
227 check.input_idx,
228 prevout_values,
229 refs,
230 Some(session.height),
231 session.median_time_past,
232 session.network,
233 crate::script::SigVersion::Base,
234 #[cfg(feature = "production")]
235 session.schnorr_collector.as_deref(),
236 #[cfg(not(feature = "production"))]
237 None,
238 #[cfg(feature = "production")]
239 ctx.bip143.as_ref(),
240 #[cfg(not(feature = "production"))]
241 None,
242 #[cfg(feature = "production")]
243 precomputed_sighash,
244 #[cfg(feature = "production")]
245 sighash_cache,
246 #[cfg(feature = "production")]
247 precomputed_p2pkh,
248 )
249 .map_err(|e| {
250 ConsensusError::BlockValidation(
251 format!(
252 "Script verification failed at tx {} input {}: {}",
253 ctx.tx_index, check.input_idx, e
254 )
255 .into(),
256 )
257 })
258 };
259
260 match prevout_values_prefetched {
261 Some(p) => do_verify(p),
262 None => {
263 let pv = session.prevout_values_buffer.as_slice();
264 let (base, count) = ctx.prevout_values_range;
265 let slice = &pv[base..][..count];
266 PREVOUT_BUF.with(|cell| {
267 let mut v = cell.borrow_mut();
268 v.clear();
269 v.extend_from_slice(slice);
270 do_verify(&v)
271 })
272 }
273 }
274 }
275
276 fn run_check<'a>(
277 check: &ScriptCheck,
278 session: &'a BlockSessionContext,
279 refs_buf: &mut Vec<&'a [u8]>,
280 ) -> std::result::Result<bool, ConsensusError> {
281 let ctx = session
282 .tx_contexts
283 .get(check.tx_ctx_idx)
284 .ok_or_else(|| ConsensusError::BlockValidation("tx_ctx_idx out of range".into()))?;
285 let buffer = session.script_pubkey_buffer.as_slice();
286 let spi = session.script_pubkey_indices_buffer.as_slice();
287 let (base, count) = ctx.script_pubkey_indices_range;
288 refs_buf.clear();
289 refs_buf.extend((0..count).map(|j| {
290 let (s, l) = spi[base + j];
291 buffer[s..s + l].as_ref()
292 }));
293 Self::run_check_with_refs(
294 check, session, ctx, refs_buf, buffer, None, None, None, )
298 }
299
300 fn worker_loop(
301 state: Arc<Mutex<QueueState>>,
302 worker_cv: &Condvar,
303 master_cv: &Condvar,
304 batch_size: usize,
305 ) {
306 let mut n_now: usize = 0;
307 let mut local_error: Option<ConsensusError> = None;
308 let mut batch_buf: Vec<ScriptCheck> = Vec::with_capacity(batch_size);
309
310 loop {
311 let (session_opt, _batch_len) = {
312 let mut guard = state.lock().unwrap();
313 if n_now > 0 {
314 if let Some(ref err) = local_error {
315 if guard.error_result.is_none() {
316 guard.error_result = Some(err.clone());
317 }
318 }
319 guard.n_todo -= n_now;
320 if guard.n_todo == 0 {
321 master_cv.notify_one();
322 }
323 n_now = 0;
324 local_error = None;
325 } else {
326 guard.n_total += 1;
327 }
328
329 loop {
330 if guard.request_stop {
331 return;
332 }
333 if guard.checks.is_empty() {
334 guard.n_idle += 1;
335 guard = worker_cv.wait(guard).unwrap();
336 guard.n_idle -= 1;
337 continue;
338 }
339 break;
340 }
341
342 let n_total = guard.n_total;
343 let n_idle = guard.n_idle;
344 let divisor = (n_total + n_idle + 1).max(1);
345 n_now = (guard.checks.len() / divisor).clamp(1, batch_size);
346 let drain_len = n_now.min(guard.checks.len());
347 batch_buf.clear();
348 let drain_start = guard.checks.len() - drain_len;
349 batch_buf.extend(guard.checks.drain(drain_start..));
350 let session = guard.session.clone();
351 (session, ())
352 };
353
354 if batch_buf.is_empty() {
355 continue;
356 }
357
358 let session = match session_opt.as_ref() {
359 Some(s) => Arc::clone(s),
360 None => continue,
361 };
362
363 let mut batch_results = Vec::with_capacity(batch_buf.len());
364 #[cfg(all(feature = "production", feature = "profile"))]
365 let t_run_check = std::time::Instant::now();
366 {
367 batch_buf.sort_unstable_by_key(|c| c.tx_ctx_idx);
368 let buffer = session.script_pubkey_buffer.as_slice();
369 let spi = session.script_pubkey_indices_buffer.as_slice();
370 let pv = session.prevout_values_buffer.as_slice();
371 let mut refs_buf: Vec<&[u8]> = Vec::with_capacity(64);
372 let mut cached_ctx_idx: usize = usize::MAX;
373 for c in batch_buf.iter() {
374 let ctx = match session.tx_contexts.get(c.tx_ctx_idx) {
375 Some(ctx) => ctx,
376 None => {
377 local_error = Some(ConsensusError::BlockValidation(
378 "tx_ctx_idx out of range".into(),
379 ));
380 break;
381 }
382 };
383 let s = c.spk_offset as usize;
384 let l = c.spk_len as usize;
385 let script_pubkey = if s + l <= buffer.len() {
386 &buffer[s..s + l]
387 } else {
388 &[]
389 };
390 let (pv_base, pv_count) = ctx.prevout_values_range;
391 let prevout_slice = &pv[pv_base..][..pv_count];
392 if c.tx_ctx_idx != cached_ctx_idx {
393 refs_buf.clear();
394 let (spi_base, spi_count) = ctx.script_pubkey_indices_range;
395 for j in 0..spi_count {
396 let (start, len) = spi[spi_base + j];
397 refs_buf.push(if start + len <= buffer.len() {
398 &buffer[start..start + len]
399 } else {
400 &[]
401 });
402 }
403 cached_ctx_idx = c.tx_ctx_idx;
404 }
405 match Self::run_check_with_refs(
406 c,
407 session.as_ref(),
408 ctx,
409 &refs_buf,
410 buffer,
411 None,
412 Some(script_pubkey),
413 Some(prevout_slice),
414 ) {
415 Ok(valid) => batch_results.push((c.tx_ctx_idx, valid)),
416 Err(e) => {
417 local_error = Some(e);
418 break;
419 }
420 }
421 }
422 }
423 #[cfg(all(feature = "production", feature = "profile"))]
424 crate::script_profile::add_worker_run_check_loop_ns(
425 t_run_check.elapsed().as_nanos() as u64
426 );
427 if !batch_results.is_empty() {
428 #[cfg(all(feature = "production", feature = "profile"))]
429 let t_results = std::time::Instant::now();
430 session.results.push(batch_results);
431 #[cfg(all(feature = "production", feature = "profile"))]
432 crate::script_profile::add_worker_results_extend_ns(
433 t_results.elapsed().as_nanos() as u64
434 );
435 }
436 }
437 }
438
439 pub fn start_session(&self, session: BlockSessionContext) {
441 let mut guard = self.state.lock().unwrap();
442 guard.session = Some(Arc::new(session));
443 guard.checks.clear();
444 guard.n_todo = 0;
445 guard.error_result = None;
446 }
447
448 pub fn add(&self, checks: Vec<ScriptCheck>) {
450 let n = checks.len();
451 if n == 0 {
452 return;
453 }
454 {
455 let mut guard = self.state.lock().unwrap();
456 guard.checks.extend(checks);
457 guard.n_todo += n;
458 }
459 if n == 1 {
460 self.worker_cv.notify_one();
461 } else {
462 self.worker_cv.notify_all();
463 }
464 }
465
466 pub fn add_from_slice(&self, checks: &[ScriptCheck]) {
468 let n = checks.len();
469 if n == 0 {
470 return;
471 }
472 {
473 let mut guard = self.state.lock().unwrap();
474 guard.checks.extend(checks.iter().cloned());
475 guard.n_todo += n;
476 }
477 if n == 1 {
478 self.worker_cv.notify_one();
479 } else {
480 self.worker_cv.notify_all();
481 }
482 }
483
484 pub fn run_checks_sequential(
486 checks: &[ScriptCheck],
487 session: &BlockSessionContext,
488 ) -> Result<Vec<(usize, bool)>> {
489 let mut results = Vec::with_capacity(checks.len());
490 let mut refs_buf = Vec::with_capacity(256);
491 for c in checks {
492 let valid = Self::run_check(c, session, &mut refs_buf)?;
493 results.push((c.tx_ctx_idx, valid));
494 }
495 Ok(results)
496 }
497
498 pub fn complete(&self) -> Result<Vec<(usize, bool)>> {
500 let _control = self.control_mutex.lock().unwrap();
501 let state = Arc::clone(&self.state);
502 let worker_cv = Arc::clone(&self.worker_cv);
503 let master_cv = Arc::clone(&self.master_cv);
504 let batch_size = self.batch_size;
505
506 let mut n_now: usize = 0;
507 let mut local_error: Option<ConsensusError> = None;
508 let mut session_opt: Option<Arc<BlockSessionContext>> = None;
509 let mut batch_buf: Vec<ScriptCheck> = Vec::with_capacity(batch_size);
510
511 loop {
512 let done = {
513 let mut guard = state.lock().unwrap();
514 if n_now > 0 {
515 if let Some(ref err) = local_error {
516 if guard.error_result.is_none() {
517 guard.error_result = Some(err.clone());
518 }
519 }
520 guard.n_todo -= n_now;
521 n_now = 0;
522 local_error = None;
523 } else {
524 guard.n_total += 1;
525 }
526
527 loop {
528 if guard.n_todo == 0 {
529 guard.n_total -= 1;
530 let results = guard
531 .session
532 .as_ref()
533 .map(|s| {
534 let mut out = Vec::with_capacity(512);
535 while let Some(batch) = s.results.pop() {
536 out.extend(batch);
537 }
538 out
539 })
540 .unwrap_or_default();
541 guard.session = None;
542 if let Some(ref e) = guard.error_result {
543 return Err(e.clone());
544 }
545 return Ok(results);
546 }
547 if guard.checks.is_empty() {
548 guard.n_idle += 1;
549 guard = master_cv.wait(guard).unwrap();
550 guard.n_idle -= 1;
551 continue;
552 }
553 break;
554 }
555
556 let n_total = guard.n_total;
557 let n_idle = guard.n_idle;
558 let divisor = (n_total + n_idle + 1).max(1);
559 n_now = (guard.checks.len() / divisor).clamp(1, batch_size);
560 let drain_len = n_now.min(guard.checks.len());
561 batch_buf.clear();
562 let drain_start = guard.checks.len() - drain_len;
563 batch_buf.extend(guard.checks.drain(drain_start..));
564 session_opt = guard.session.clone();
565 false
566 };
567
568 if batch_buf.is_empty() {
569 continue;
570 }
571
572 let session = match session_opt.as_ref() {
573 Some(s) => Arc::clone(s),
574 None => continue,
575 };
576
577 let mut batch_results = Vec::with_capacity(batch_buf.len());
578 #[cfg(all(feature = "production", feature = "profile"))]
579 let t_run_check = std::time::Instant::now();
580 {
581 batch_buf.sort_unstable_by_key(|c| c.tx_ctx_idx);
582 let buffer = session.script_pubkey_buffer.as_slice();
583 let spi = session.script_pubkey_indices_buffer.as_slice();
584 let pv = session.prevout_values_buffer.as_slice();
585 let mut refs_buf: Vec<&[u8]> = Vec::with_capacity(64);
586 let mut cached_ctx_idx: usize = usize::MAX;
587 for c in batch_buf.iter() {
588 let ctx = match session.tx_contexts.get(c.tx_ctx_idx) {
589 Some(ctx) => ctx,
590 None => {
591 local_error = Some(ConsensusError::BlockValidation(
592 "tx_ctx_idx out of range".into(),
593 ));
594 break;
595 }
596 };
597 let s = c.spk_offset as usize;
598 let l = c.spk_len as usize;
599 let script_pubkey = if s + l <= buffer.len() {
600 &buffer[s..s + l]
601 } else {
602 &[]
603 };
604 let (pv_base, pv_count) = ctx.prevout_values_range;
605 let prevout_slice = &pv[pv_base..][..pv_count];
606 if c.tx_ctx_idx != cached_ctx_idx {
607 refs_buf.clear();
608 let (spi_base, spi_count) = ctx.script_pubkey_indices_range;
609 for j in 0..spi_count {
610 let (start, len) = spi[spi_base + j];
611 refs_buf.push(if start + len <= buffer.len() {
612 &buffer[start..start + len]
613 } else {
614 &[]
615 });
616 }
617 cached_ctx_idx = c.tx_ctx_idx;
618 }
619 match Self::run_check_with_refs(
620 c,
621 session.as_ref(),
622 ctx,
623 &refs_buf,
624 buffer,
625 None,
626 Some(script_pubkey),
627 Some(prevout_slice),
628 ) {
629 Ok(valid) => batch_results.push((c.tx_ctx_idx, valid)),
630 Err(e) => {
631 local_error = Some(e);
632 break;
633 }
634 }
635 }
636 }
637 #[cfg(all(feature = "production", feature = "profile"))]
638 crate::script_profile::add_worker_run_check_loop_ns(
639 t_run_check.elapsed().as_nanos() as u64
640 );
641 if !batch_results.is_empty() {
642 #[cfg(all(feature = "production", feature = "profile"))]
643 let t_results = std::time::Instant::now();
644 session.results.push(batch_results);
645 #[cfg(all(feature = "production", feature = "profile"))]
646 crate::script_profile::add_worker_results_extend_ns(
647 t_results.elapsed().as_nanos() as u64
648 );
649 }
650 }
651 }
652}