use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use async_trait::async_trait;
use indexmap::{self, IndexMap};
use serde::{Deserialize, Serialize};
use session::DebugLogValidatorSesssion;
use tycho_crypto::ed25519::KeyPair;
use tycho_types::models::*;
use tycho_util::{FastHashMap, serde_helpers};
use self::session::ValidatorSession;
use crate::tracing_targets;
use crate::validator::rpc::ExchangeSignaturesBackoff;
use crate::validator::{
AddSession, ValidationSessionId, ValidationStatus, Validator, ValidatorNetworkContext,
};
mod session;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidatorStdImplConfig {
pub exchange_signatures_backoff: ExchangeSignaturesBackoff,
#[serde(with = "serde_helpers::humantime")]
pub exchange_signatures_timeout: Duration,
#[serde(with = "serde_helpers::humantime")]
pub failed_exchange_interval: Duration,
pub max_parallel_requests: usize,
pub signature_cache_slots: u32,
pub old_blocks_to_keep: u32,
}
impl Default for ValidatorStdImplConfig {
fn default() -> Self {
Self {
exchange_signatures_backoff: Default::default(),
exchange_signatures_timeout: Duration::from_secs(1),
failed_exchange_interval: Duration::from_secs(10),
max_parallel_requests: 10,
signature_cache_slots: 3,
old_blocks_to_keep: 10,
}
}
}
#[derive(Clone)]
#[repr(transparent)]
pub struct ValidatorStdImpl {
inner: Arc<Inner>,
}
impl ValidatorStdImpl {
pub fn new(
net_context: ValidatorNetworkContext,
keypair: Arc<KeyPair>,
config: ValidatorStdImplConfig,
) -> Self {
Self {
inner: Arc::new(Inner {
net_context,
keypair,
sessions: Default::default(),
config,
}),
}
}
}
#[async_trait]
impl Validator for ValidatorStdImpl {
fn add_session(&self, info: AddSession<'_>) -> Result<()> {
let session = ValidatorSession::new(
&self.inner.net_context,
self.inner.keypair.clone(),
&self.inner.config,
info,
)?;
let mut sessions = self.inner.sessions.lock();
let shard_sessions = sessions.entry(info.shard_ident).or_default();
match shard_sessions.entry(info.session_id) {
indexmap::map::Entry::Vacant(entry) => {
tracing::debug!(
target: tracing_targets::VALIDATOR,
session = ?DebugLogValidatorSesssion(&session),
"new validator session added",
);
entry.insert(session);
Ok(())
}
indexmap::map::Entry::Occupied(_) => {
anyhow::bail!(
"validator session already exists: ({}, {:?})",
info.shard_ident,
info.session_id
)
}
}
}
async fn validate(
&self,
session_id: ValidationSessionId,
block_id: &BlockId,
) -> Result<ValidationStatus> {
let session = 'session: {
if let Some(shard_sessions) = self.inner.sessions.lock().get(&block_id.shard)
&& let Some(session) = shard_sessions.get(&session_id)
{
break 'session session.clone();
}
anyhow::bail!(
"validator session not found: ({}, {:?})",
block_id.shard,
session_id,
);
};
session.validate_block(block_id).await
}
fn cancel_validation(
&self,
until: &BlockIdShort,
session_id: Option<ValidationSessionId>,
) -> Result<()> {
let session = {
let mut sessions = self.inner.sessions.lock();
let Some(shard_sessions) = sessions.get_mut(&until.shard) else {
return Ok(());
};
let session = session_id.and_then(|id| {
shard_sessions.get(&id).or_else(|| {
tracing::warn!(
target: tracing_targets::VALIDATOR,
session_id = ?id,
shard = %until.shard,
"validation session not found for explicit session_id" );
None
})
});
let session = match session {
Some(session) => session.clone(),
None => {
if let Some(s) = shard_sessions.iter().rev().find_map(|(_, session)| {
(session.start_block_seqno() <= until.seqno).then(|| session.clone())
}) {
s
} else {
return Ok(());
}
}
};
let seqno_threshold = session
.start_block_seqno()
.saturating_add(self.inner.config.old_blocks_to_keep);
if until.seqno >= seqno_threshold {
while let Some(entry) = shard_sessions.first_entry() {
if *entry.key() < session.id() {
entry.get().cancel();
entry.shift_remove();
} else {
break;
}
}
}
session
};
session.cancel_until(until.seqno);
Ok(())
}
}
struct Inner {
net_context: ValidatorNetworkContext,
keypair: Arc<KeyPair>,
sessions: parking_lot::Mutex<Sessions>,
config: ValidatorStdImplConfig,
}
type Sessions = FastHashMap<ShardIdent, ShardSessions>;
type ShardSessions = IndexMap<ValidationSessionId, ValidatorSession>;