use std::{
fmt,
future::Future,
mem,
pin::Pin,
task::{Context, Poll},
};
use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use orchard::{bundle::BatchValidator, circuit::VerifyingKey};
use rand::thread_rng;
use zcash_protocol::value::ZatBalance;
use zebra_chain::transaction::SigHash;
use crate::BoxError;
use thiserror::Error;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch_control::{Batch, BatchControl, RequestWeight};
use tower_fallback::Fallback;
use super::spawn_fifo;
/// Maximum batch size handed to `Batch::new` when the global [`VERIFIER`] is built.
///
/// Currently the same value as the parent module's `MAX_BATCH_SIZE`.
///
/// NOTE(review): each [`Item`] reports its Orchard action count as its request
/// weight, so this limit is presumably counted in actions rather than in
/// items — confirm against `tower_batch_control`.
const HALO2_MAX_BATCH_SIZE: usize = super::MAX_BATCH_SIZE;
/// The outcome of verifying a batch: `true` when validation succeeded.
type VerifyResult = bool;
/// The sending half of the `watch` channel used to publish a batch result.
///
/// The channel starts out holding `None`; a `Some(_)` value carries the
/// verification outcome.
type Sender = watch::Sender<Option<VerifyResult>>;
/// The verifying key type used for batches.
///
/// This is currently just an alias of [`ItemVerifyingKey`].
pub type BatchVerifyingKey = ItemVerifyingKey;
/// The verifying key type used for individual items: the Orchard circuit
/// `VerifyingKey`.
pub type ItemVerifyingKey = VerifyingKey;
lazy_static::lazy_static! {
/// The verifying key used by [`VERIFIER`], produced by `ItemVerifyingKey::build()`
/// the first time this static is accessed.
pub static ref VERIFYING_KEY: ItemVerifyingKey = ItemVerifyingKey::build();
}
/// A Halo2 verification item, used as the request type of the verifier service.
///
/// It pairs an authorized Orchard bundle with the sighash that is passed
/// alongside it when the bundle is queued into a `BatchValidator`.
#[derive(Clone, Debug)]
pub struct Item {
// The authorized Orchard bundle to verify.
bundle: orchard::bundle::Bundle<orchard::bundle::Authorized, ZatBalance>,
// The sighash supplied together with the bundle when it is queued.
sighash: SigHash,
}
impl RequestWeight for Item {
    /// Returns the weight of this request, which is the number of actions in
    /// the item's Orchard bundle.
    fn request_weight(&self) -> usize {
        let actions = self.bundle.actions();
        actions.len()
    }
}
impl Item {
pub fn new(
bundle: orchard::bundle::Bundle<orchard::bundle::Authorized, ZatBalance>,
sighash: SigHash,
) -> Self {
Self { bundle, sighash }
}
pub fn verify_single(self, vk: &ItemVerifyingKey) -> bool {
let mut batch = BatchValidator::default();
batch.queue(self);
batch.validate(vk, thread_rng())
}
}
/// Extension trait for adding an [`Item`] to a pending verification batch.
trait QueueBatchVerify {
/// Queues `item` for verification, consuming it.
fn queue(&mut self, item: Item);
}
impl QueueBatchVerify for BatchValidator {
    /// Adds the item's bundle to this validator, together with the raw bytes
    /// of its sighash.
    fn queue(&mut self, item: Item) {
        let Item { bundle, sighash } = item;
        let sighash_bytes = sighash.0;
        self.add_bundle(&bundle, sighash_bytes);
    }
}
/// Errors produced by Halo2 proof verification.
///
/// This is a simplified view of `halo2::plonk::Error`: only the
/// constraint-system failure is kept as a distinct variant, and every other
/// upstream error is represented by [`Halo2Error::Other`].
#[derive(Clone, Debug, Error, Eq, PartialEq)]
// The variants are described by their `#[error]` messages.
#[allow(missing_docs)]
pub enum Halo2Error {
#[error("the constraint system is not satisfied")]
ConstraintSystemFailure,
#[error("unknown Halo2 error")]
Other,
}
impl From<halo2::plonk::Error> for Halo2Error {
fn from(err: halo2::plonk::Error) -> Halo2Error {
match err {
halo2::plonk::Error::ConstraintSystemFailure => Halo2Error::ConstraintSystemFailure,
_ => Halo2Error::Other,
}
}
}
/// Global batch verification service for Halo2 proofs.
///
/// Requests are sent to a batched [`Verifier`] first. The second service
/// handed to `Fallback::new` verifies a single [`Item`] on its own via
/// [`Verifier::verify_single_spawning`].
///
/// Both services use the shared [`VERIFYING_KEY`]. The service is constructed
/// lazily, on first access.
pub static VERIFIER: Lazy<
    Fallback<
        Batch<Verifier, Item>,
        ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
    >,
> = Lazy::new(|| {
    // Batched path: items are accumulated by the `Batch` wrapper, configured
    // with this module's size limit and the parent module's latency limit.
    let batched = Batch::new(
        Verifier::new(&VERIFYING_KEY),
        HALO2_MAX_BATCH_SIZE,
        None,
        super::MAX_BATCH_LATENCY,
    );

    // Single-item path. The closure is cast to a function pointer so that the
    // service type can be written out in the static's type annotation.
    let single = tower::service_fn(
        (|item: Item| Verifier::verify_single_spawning(item, &VERIFYING_KEY).boxed())
            as fn(_) -> _,
    );

    Fallback::new(batched, single)
});
/// Halo2 proof verifier implementation, used as the inner service of a `Batch`.
///
/// Items are accumulated in `batch` until a flush is requested; the result of
/// each batch is published on a `watch` channel that every queued item's
/// future subscribes to.
pub struct Verifier {
// The pending batch that queued items are added to.
batch: BatchValidator,
// The verifying key used to validate batches.
vk: &'static ItemVerifyingKey,
// Sender for the current batch's result; swapped for a fresh channel
// whenever the pending batch is taken.
tx: Sender,
}
impl Verifier {
    /// Creates a verifier with an empty batch and a fresh result channel,
    /// using `vk` for all verification.
    fn new(vk: &'static ItemVerifyingKey) -> Self {
        let (tx, _) = watch::channel(None);
        Self {
            batch: BatchValidator::default(),
            vk,
            tx,
        }
    }

    /// Takes the pending batch and its result sender out of `self`, leaving
    /// behind an empty batch and a brand-new channel.
    ///
    /// Returns the taken batch, the verifying key, and the sender that the
    /// taken batch's items are subscribed to.
    fn take(&mut self) -> (BatchValidator, &'static BatchVerifyingKey, Sender) {
        // A new channel per batch keeps each batch's result separate.
        let (fresh_tx, _) = watch::channel(None);
        let pending_batch = mem::take(&mut self.batch);
        let pending_tx = mem::replace(&mut self.tx, fresh_tx);
        (pending_batch, self.vk, pending_tx)
    }

    /// Validates `batch` against `vk` on the current thread and publishes the
    /// outcome on `tx`.
    fn verify(batch: BatchValidator, vk: &'static BatchVerifyingKey, tx: Sender) {
        let is_valid = batch.validate(vk, thread_rng());
        // A send error is deliberately ignored.
        let _ = tx.send(Some(is_valid));
    }

    /// Takes the pending batch and hands its verification to the rayon thread
    /// pool from inside `tokio::task::block_in_place`.
    ///
    /// NOTE(review): `rayon::spawn_fifo` only enqueues the job, so this
    /// presumably returns before verification has finished — confirm.
    fn flush_blocking(&mut self) {
        let (batch, vk, tx) = self.take();
        let job = move || Self::verify(batch, vk, tx);
        tokio::task::block_in_place(|| rayon::spawn_fifo(job));
    }

    /// Validates `batch` via the `spawn_fifo` helper, records how long that
    /// took in the batch duration histogram, and publishes the outcome on `tx`.
    ///
    /// If the spawned job does not yield a result, `None` is published instead.
    async fn flush_spawning(batch: BatchValidator, vk: &'static BatchVerifyingKey, tx: Sender) {
        let timer = std::time::Instant::now();
        let outcome = spawn_fifo(move || batch.validate(vk, thread_rng())).await;
        let elapsed_secs = timer.elapsed().as_secs_f64();

        // Anything other than a successfully computed `true` counts as a failure.
        let result_label = if matches!(&outcome, Ok(true)) {
            "success"
        } else {
            "failure"
        };

        metrics::histogram!(
            "zebra.consensus.batch.duration_seconds",
            "verifier" => "halo2",
            "result" => result_label
        )
        .record(elapsed_secs);

        // A send error is deliberately ignored.
        let _ = tx.send(outcome.ok());
    }

    /// Verifies a single item via the `spawn_fifo` helper.
    ///
    /// # Errors
    ///
    /// Returns an error if the spawned job fails to yield a result, or if the
    /// item does not verify.
    async fn verify_single_spawning(
        item: Item,
        pvk: &'static ItemVerifyingKey,
    ) -> Result<(), BoxError> {
        let is_valid = spawn_fifo(move || item.verify_single(pvk)).await?;
        if !is_valid {
            return Err("could not validate orchard proof".into());
        }
        Ok(())
    }
}
impl fmt::Debug for Verifier {
    /// Formats the verifier, showing the batch and verifying key only as the
    /// `".."` placeholder and the result sender in full.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Verifier");
        out.field("batch", &"..");
        out.field("vk", &"..");
        out.field("tx", &self.tx);
        out.finish()
    }
}
impl Service<BatchControl<Item>> for Verifier {
    type Response = ();
    type Error = BoxError;
    type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;

    /// The verifier is always ready to accept a request.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Handles a batch control request.
    ///
    /// * `BatchControl::Item` queues the item into the pending batch and
    ///   returns a future that resolves once that batch's result is published.
    /// * `BatchControl::Flush` takes the pending batch and returns a future
    ///   that verifies it and publishes the result.
    ///
    /// # Panics
    ///
    /// The future returned for an item panics if waiting on the result channel
    /// fails, i.e. the batch's sender went away without a result being observed.
    fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
        match req {
            BatchControl::Item(item) => {
                tracing::trace!("got item");
                self.batch.queue(item);

                // Subscribe to the channel belonging to the batch the item was
                // just added to.
                let mut rx = self.tx.subscribe();

                Box::pin(async move {
                    if rx.changed().await.is_err() {
                        panic!("verifier was dropped without flushing");
                    }

                    // `None` is published when the flush could not obtain a
                    // result from the spawned job.
                    let is_valid = *rx
                        .borrow()
                        .as_ref()
                        .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;

                    if !is_valid {
                        tracing::trace!(?is_valid, "invalid halo2 proof");
                        metrics::counter!("proofs.halo2.invalid").increment(1);
                        return Err("could not validate halo2 proofs".into());
                    }

                    tracing::trace!(?is_valid, "verified halo2 proof");
                    metrics::counter!("proofs.halo2.verified").increment(1);
                    Ok(())
                })
            }
            BatchControl::Flush => {
                tracing::trace!("got halo2 flush command");
                let (batch, vk, tx) = self.take();
                Box::pin(Self::flush_spawning(batch, vk, tx).map(Ok))
            }
        }
    }
}
impl Drop for Verifier {
    /// Flushes whatever is still pending when the verifier is dropped, so that
    /// a result is published for items that were queued but never flushed.
    fn drop(&mut self) {
        Self::flush_blocking(self);
    }
}