use std::{
future::Future,
mem,
pin::Pin,
task::{Context, Poll},
};
use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch_control::{Batch, BatchControl, RequestWeight};
use tower_fallback::Fallback;
use zebra_chain::primitives::reddsa::{batch, orchard, Error, Signature, VerificationKeyBytes};
use crate::BoxError;
use super::{spawn_fifo, spawn_fifo_and_convert};
#[cfg(test)]
mod tests;
type BatchVerifier = batch::Verifier<orchard::SpendAuth, orchard::Binding>;
type VerifyResult = Result<(), Error>;
type Sender = watch::Sender<Option<VerifyResult>>;
#[derive(Clone, Debug)]
pub struct Item(batch::Item<orchard::SpendAuth, orchard::Binding>);
impl RequestWeight for Item {}
impl From<Item> for batch::Item<orchard::SpendAuth, orchard::Binding> {
fn from(Item(item): Item) -> Self {
item
}
}
impl Item {
fn verify_single(self) -> VerifyResult {
self.0.verify_single()
}
pub fn from_spendauth(
vk_bytes: VerificationKeyBytes<orchard::SpendAuth>,
sig: Signature<orchard::SpendAuth>,
msg: &impl AsRef<[u8]>,
) -> Self {
Self(batch::Item::from_spendauth(vk_bytes, sig, msg))
}
pub fn from_binding(
vk_bytes: VerificationKeyBytes<orchard::Binding>,
sig: Signature<orchard::Binding>,
msg: &impl AsRef<[u8]>,
) -> Self {
Self(batch::Item::from_binding(vk_bytes, sig, msg))
}
}
pub static VERIFIER: Lazy<
Fallback<
Batch<Verifier, Item>,
ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
Verifier::default(),
super::MAX_BATCH_SIZE,
None,
super::MAX_BATCH_LATENCY,
),
tower::service_fn(
(|item: Item| Verifier::verify_single_spawning(item).boxed()) as fn(_) -> _,
),
)
});
pub struct Verifier {
batch: BatchVerifier,
tx: Sender,
}
impl Default for Verifier {
fn default() -> Self {
let batch = BatchVerifier::default();
let (tx, _) = watch::channel(None);
Self { batch, tx }
}
}
impl Verifier {
fn take(&mut self) -> (BatchVerifier, Sender) {
let batch = mem::take(&mut self.batch);
let (tx, _) = watch::channel(None);
let tx = mem::replace(&mut self.tx, tx);
(batch, tx)
}
fn verify(batch: BatchVerifier, tx: Sender) {
let result = batch.verify(thread_rng());
let _ = tx.send(Some(result));
}
fn flush_blocking(&mut self) {
let (batch, tx) = self.take();
tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
}
async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
let start = std::time::Instant::now();
let result = spawn_fifo(move || batch.verify(thread_rng())).await;
let duration = start.elapsed().as_secs_f64();
let result_label = match &result {
Ok(Ok(())) => "success",
_ => "failure",
};
metrics::histogram!(
"zebra.consensus.batch.duration_seconds",
"verifier" => "redpallas",
"result" => result_label
)
.record(duration);
let _ = tx.send(result.ok());
}
async fn verify_single_spawning(item: Item) -> Result<(), BoxError> {
spawn_fifo_and_convert(move || item.verify_single()).await
}
}
impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
match req {
BatchControl::Item(item) => {
tracing::trace!("got item");
self.batch.queue(item);
let mut rx = self.tx.subscribe();
Box::pin(async move {
match rx.changed().await {
Ok(()) => {
let result = rx.borrow()
.ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;
if result.is_ok() {
tracing::trace!(?result, "validated redpallas signature");
metrics::counter!("signatures.redpallas.validated").increment(1);
} else {
tracing::trace!(?result, "invalid redpallas signature");
metrics::counter!("signatures.redpallas.invalid").increment(1);
}
result.map_err(BoxError::from)
}
Err(_recv_error) => panic!("verifier was dropped without flushing"),
}
})
}
BatchControl::Flush => {
tracing::trace!("got redpallas flush command");
let (batch, tx) = self.take();
Box::pin(Self::flush_spawning(batch, tx).map(Ok))
}
}
}
}
impl Drop for Verifier {
fn drop(&mut self) {
self.flush_blocking();
}
}