tycho_collator/validator/impls/std_impl/
mod.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use indexmap::{self, IndexMap};
7use serde::{Deserialize, Serialize};
8use session::DebugLogValidatorSesssion;
9use tycho_crypto::ed25519::KeyPair;
10use tycho_types::models::*;
11use tycho_util::{FastHashMap, serde_helpers};
12
13use self::session::ValidatorSession;
14use crate::tracing_targets;
15use crate::validator::rpc::ExchangeSignaturesBackoff;
16use crate::validator::{
17 AddSession, ValidationSessionId, ValidationStatus, Validator, ValidatorNetworkContext,
18};
19
20mod session;
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct ValidatorStdImplConfig {
24 pub exchange_signatures_backoff: ExchangeSignaturesBackoff,
26
27 #[serde(with = "serde_helpers::humantime")]
31 pub exchange_signatures_timeout: Duration,
32
33 #[serde(with = "serde_helpers::humantime")]
37 pub failed_exchange_interval: Duration,
38
39 pub max_parallel_requests: usize,
43
44 pub signature_cache_slots: u32,
48
49 pub old_blocks_to_keep: u32,
53}
54
55impl Default for ValidatorStdImplConfig {
56 fn default() -> Self {
57 Self {
58 exchange_signatures_backoff: Default::default(),
59 exchange_signatures_timeout: Duration::from_secs(1),
60 failed_exchange_interval: Duration::from_secs(10),
61 max_parallel_requests: 10,
62 signature_cache_slots: 3,
63 old_blocks_to_keep: 10,
64 }
65 }
66}
67
68#[derive(Clone)]
69#[repr(transparent)]
70pub struct ValidatorStdImpl {
71 inner: Arc<Inner>,
72}
73
74impl ValidatorStdImpl {
75 pub fn new(
76 net_context: ValidatorNetworkContext,
77 keypair: Arc<KeyPair>,
78 config: ValidatorStdImplConfig,
79 ) -> Self {
80 Self {
81 inner: Arc::new(Inner {
82 net_context,
83 keypair,
84 sessions: Default::default(),
85 config,
86 }),
87 }
88 }
89}
90
91#[async_trait]
92impl Validator for ValidatorStdImpl {
93 fn add_session(&self, info: AddSession<'_>) -> Result<()> {
94 let session = ValidatorSession::new(
95 &self.inner.net_context,
96 self.inner.keypair.clone(),
97 &self.inner.config,
98 info,
99 )?;
100
101 let mut sessions = self.inner.sessions.lock();
102 let shard_sessions = sessions.entry(info.shard_ident).or_default();
103
104 match shard_sessions.entry(info.session_id) {
105 indexmap::map::Entry::Vacant(entry) => {
106 tracing::debug!(
107 target: tracing_targets::VALIDATOR,
108 session = ?DebugLogValidatorSesssion(&session),
109 "new validator session added",
110 );
111 entry.insert(session);
112 Ok(())
113 }
114 indexmap::map::Entry::Occupied(_) => {
115 anyhow::bail!(
116 "validator session already exists: ({}, {:?})",
117 info.shard_ident,
118 info.session_id
119 )
120 }
121 }
122 }
123
124 async fn validate(
125 &self,
126 session_id: ValidationSessionId,
127 block_id: &BlockId,
128 ) -> Result<ValidationStatus> {
129 let session = 'session: {
130 if let Some(shard_sessions) = self.inner.sessions.lock().get(&block_id.shard)
131 && let Some(session) = shard_sessions.get(&session_id)
132 {
133 break 'session session.clone();
134 }
135
136 anyhow::bail!(
137 "validator session not found: ({}, {:?})",
138 block_id.shard,
139 session_id,
140 );
141 };
142
143 session.validate_block(block_id).await
144 }
145
146 fn cancel_validation(
147 &self,
148 until: &BlockIdShort,
149 session_id: Option<ValidationSessionId>,
150 ) -> Result<()> {
151 let session = {
152 let mut sessions = self.inner.sessions.lock();
153 let Some(shard_sessions) = sessions.get_mut(&until.shard) else {
154 return Ok(());
155 };
156
157 let session = session_id.and_then(|id| {
160 shard_sessions.get(&id).or_else(|| {
161 tracing::warn!(
162 target: tracing_targets::VALIDATOR,
163 session_id = ?id,
164 shard = %until.shard,
165 "validation session not found for explicit session_id" );
166 None
167 })
168 });
169
170 let session = match session {
171 Some(session) => session.clone(),
173 None => {
175 if let Some(s) = shard_sessions.iter().rev().find_map(|(_, session)| {
176 (session.start_block_seqno() <= until.seqno).then(|| session.clone())
177 }) {
178 s
179 } else {
180 return Ok(());
182 }
183 }
184 };
185
186 let seqno_threshold = session
191 .start_block_seqno()
192 .saturating_add(self.inner.config.old_blocks_to_keep);
193
194 if until.seqno >= seqno_threshold {
195 while let Some(entry) = shard_sessions.first_entry() {
196 if *entry.key() < session.id() {
197 entry.get().cancel();
199 entry.shift_remove();
200 } else {
201 break;
202 }
203 }
204 }
205
206 session
207 };
208
209 session.cancel_until(until.seqno);
211 Ok(())
212 }
213}
214
215struct Inner {
216 net_context: ValidatorNetworkContext,
217 keypair: Arc<KeyPair>,
218 sessions: parking_lot::Mutex<Sessions>,
219 config: ValidatorStdImplConfig,
220}
221
222type Sessions = FastHashMap<ShardIdent, ShardSessions>;
223type ShardSessions = IndexMap<ValidationSessionId, ValidatorSession>;