use std::{
future::Future,
mem,
pin::Pin,
task::{Context, Poll},
};
use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch_control::{Batch, BatchControl, RequestWeight};
use tower_fallback::Fallback;
use zebra_chain::primitives::ed25519::*;
use crate::BoxError;
use super::{spawn_fifo, spawn_fifo_and_convert};
#[cfg(test)]
mod tests;
/// The batch verifier type accumulated by [`Verifier`].
///
/// `batch` is presumably brought into scope by the glob import of the ed25519
/// primitives at the top of this file — confirm against that module.
type BatchVerifier = batch::Verifier;
/// The outcome of verifying a batch or a single item: `Ok(())` or a
/// verification `Error`.
type VerifyResult = Result<(), Error>;
/// The sending half of the watch channel used to publish a batch result.
///
/// Channels in this file are created holding `None`; a `Some(result)` is sent
/// once a batch has been verified.
type Sender = watch::Sender<Option<VerifyResult>>;
/// A single ed25519 verification request, wrapping a `batch::Item`.
///
/// Values are built through the `From` impl on
/// `(VerificationKeyBytes, Signature, &M)` tuples defined below.
#[derive(Clone, Debug)]
pub struct Item(batch::Item);
// The impl body is empty, so `Item` relies entirely on whatever defaults the
// `RequestWeight` trait provides.
impl RequestWeight for Item {}
impl<'msg, M: AsRef<[u8]> + ?Sized> From<(VerificationKeyBytes, Signature, &'msg M)> for Item {
fn from(tup: (VerificationKeyBytes, Signature, &'msg M)) -> Self {
Self(batch::Item::from(tup))
}
}
/// Unwraps an [`Item`] back into the `batch::Item` it contains.
impl From<Item> for batch::Item {
    fn from(wrapper: Item) -> Self {
        wrapper.0
    }
}
impl Item {
fn verify_single(self) -> VerifyResult {
self.0.verify_single()
}
}
/// Lazily-initialised global ed25519 verification service.
///
/// It pairs two services through [`Fallback::new`]:
///
/// 1. a [`Batch`] around a default [`Verifier`], configured with
///    `super::MAX_BATCH_SIZE` and `super::MAX_BATCH_LATENCY` (the third
///    `Batch::new` argument is left as `None`), and
/// 2. a plain function service that checks one [`Item`] at a time via
///    [`Verifier::verify_single_spawning`].
///
/// NOTE(review): presumably the second service is only consulted when the
/// batch service reports an error — confirm against `tower_fallback`.
pub static VERIFIER: Lazy<
    Fallback<
        Batch<Verifier, Item>,
        ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
    >,
> = Lazy::new(|| {
    /// Verifies a single item and boxes the resulting future, so the service
    /// can be named with a plain function-pointer type.
    fn verify_one(item: Item) -> BoxFuture<'static, Result<(), BoxError>> {
        Verifier::verify_single_spawning(item).boxed()
    }

    let batched = Batch::new(
        Verifier::default(),
        super::MAX_BATCH_SIZE,
        None,
        super::MAX_BATCH_LATENCY,
    );
    // The cast turns the fn item into the fn pointer named in the static's type.
    let single = tower::service_fn(
        verify_one as fn(Item) -> BoxFuture<'static, Result<(), BoxError>>,
    );

    Fallback::new(batched, single)
});
/// An ed25519 batch verification service.
///
/// Items are queued into `batch`; callers waiting on an item subscribe to
/// `tx` and receive the result of the batch once it has been verified.
pub struct Verifier {
// The batch of items queued since the last flush.
batch: BatchVerifier,
// Sender used to publish the result of the current batch to its subscribers.
// It is swapped for a fresh channel every time the batch is taken.
tx: Sender,
}
impl Default for Verifier {
    /// Creates a verifier with an empty default batch and a fresh watch
    /// channel whose initial value is `None`.
    fn default() -> Self {
        // The receiving half is discarded; subscribers are created on demand
        // from the sender.
        let (tx, _) = watch::channel(None);

        Self {
            batch: BatchVerifier::default(),
            tx,
        }
    }
}
impl Verifier {
    /// Removes and returns the current batch together with its result sender.
    ///
    /// The verifier is left holding a default batch and the sender of a
    /// brand-new watch channel (initial value `None`).
    fn take(&mut self) -> (BatchVerifier, Sender) {
        let (fresh_tx, _) = watch::channel(None);

        let pending_batch = mem::take(&mut self.batch);
        let pending_tx = mem::replace(&mut self.tx, fresh_tx);

        (pending_batch, pending_tx)
    }

    /// Verifies `batch` on the calling thread and publishes `Some(result)`
    /// through `tx`.
    ///
    /// A failed send is deliberately ignored.
    fn verify(batch: BatchVerifier, tx: Sender) {
        let outcome = Some(batch.verify(thread_rng()));
        let _ = tx.send(outcome);
    }

    /// Takes the current batch and hands its verification to rayon's FIFO
    /// queue, from inside `tokio::task::block_in_place`.
    ///
    /// The result is delivered through the batch's sender by [`Self::verify`].
    fn flush_blocking(&mut self) {
        let (batch, tx) = self.take();
        let job = move || Self::verify(batch, tx);

        tokio::task::block_in_place(move || rayon::spawn_fifo(job));
    }

    /// Verifies `batch` via `spawn_fifo`, records how long that took, and
    /// publishes the result through `tx`.
    ///
    /// The elapsed time is recorded in the
    /// `zebra.consensus.batch.duration_seconds` histogram, labelled
    /// `"success"` only when both the spawn and the verification succeeded.
    /// If `spawn_fifo` itself returns an error, `None` is sent instead of a
    /// verification result. A failed send is deliberately ignored.
    async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
        let timer = std::time::Instant::now();
        let outcome = spawn_fifo(move || batch.verify(thread_rng())).await;
        let duration = timer.elapsed().as_secs_f64();

        let result_label = if matches!(&outcome, Ok(Ok(()))) {
            "success"
        } else {
            "failure"
        };

        metrics::histogram!(
            "zebra.consensus.batch.duration_seconds",
            "verifier" => "ed25519",
            "result" => result_label
        )
        .record(duration);

        let _ = tx.send(outcome.ok());
    }

    /// Verifies a single `item` via `spawn_fifo_and_convert`, returning its
    /// converted result.
    async fn verify_single_spawning(item: Item) -> Result<(), BoxError> {
        let job = move || item.verify_single();
        spawn_fifo_and_convert(job).await
    }
}
impl Service<BatchControl<Item>> for Verifier {
    type Response = ();
    type Error = BoxError;
    type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;

    /// This service is always ready to accept a request.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Handles one batch control request.
    ///
    /// * `BatchControl::Flush` takes the current batch and returns a future
    ///   that verifies it and publishes the result to its subscribers.
    /// * `BatchControl::Item` queues the item and returns a future that waits
    ///   for the current batch's result, bumps the matching
    ///   `signatures.ed25519.*` counter, and resolves to that result.
    ///
    /// # Errors
    ///
    /// The item future resolves to an error when the batch result is a
    /// verification error, or when the published value is `None`.
    ///
    /// # Panics
    ///
    /// The item future panics if waiting on the watch channel fails, which the
    /// panic message attributes to the verifier being dropped without
    /// flushing.
    fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
        let item = match req {
            BatchControl::Flush => {
                tracing::trace!("got ed25519 flush command");
                let (batch, tx) = self.take();
                return Box::pin(Self::flush_spawning(batch, tx).map(Ok));
            }
            BatchControl::Item(item) => item,
        };

        tracing::trace!("got ed25519 item");
        self.batch.queue(item);

        // Subscribe to the sender that belongs to the batch the item was just
        // queued into; `take` replaces it with a fresh one on every flush.
        let mut rx = self.tx.subscribe();

        Box::pin(async move {
            if let Err(_recv_error) = rx.changed().await {
                panic!("ed25519 verifier was dropped without flushing");
            }

            // The borrow guard is a temporary of this statement, so it is
            // released before any logging or metrics work.
            let result = rx
                .borrow()
                .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;

            match result {
                Ok(()) => {
                    tracing::trace!(?result, "validated ed25519 signature");
                    metrics::counter!("signatures.ed25519.validated").increment(1);
                }
                Err(_) => {
                    tracing::trace!(?result, "invalid ed25519 signature");
                    metrics::counter!("signatures.ed25519.invalid").increment(1);
                }
            }

            result.map_err(BoxError::from)
        })
    }
}
impl Drop for Verifier {
    /// Flushes whatever batch is still held when the verifier is dropped, so
    /// that it is handed off for verification and its result is published.
    fn drop(&mut self) {
        Self::flush_blocking(self);
    }
}