Skip to main content

tycho_collator/validator/impls/std_impl/
mod.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use indexmap::{self, IndexMap};
7use serde::{Deserialize, Serialize};
8use session::DebugLogValidatorSesssion;
9use tycho_crypto::ed25519::KeyPair;
10use tycho_types::models::*;
11use tycho_util::{FastHashMap, serde_helpers};
12
13use self::session::ValidatorSession;
14use crate::tracing_targets;
15use crate::validator::rpc::ExchangeSignaturesBackoff;
16use crate::validator::{
17    AddSession, ValidationSessionId, ValidationStatus, Validator, ValidatorNetworkContext,
18};
19
20mod session;
21
/// Configuration of [`ValidatorStdImpl`].
///
/// The `Default` implementation yields the values listed as "Default" on each field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidatorStdImplConfig {
    /// Backoff configuration for exchanging signatures.
    ///
    /// Default: the `Default` value of [`ExchangeSignaturesBackoff`].
    pub exchange_signatures_backoff: ExchangeSignaturesBackoff,

    /// Timeout for a single "exchange signatures" request.
    ///
    /// (De)serialized through `serde_helpers::humantime`.
    ///
    /// Default: 1 second.
    #[serde(with = "serde_helpers::humantime")]
    pub exchange_signatures_timeout: Duration,

    /// Interval between retries of failed exchanges.
    ///
    /// (De)serialized through `serde_helpers::humantime`.
    ///
    /// Default: 10 seconds.
    #[serde(with = "serde_helpers::humantime")]
    pub failed_exchange_interval: Duration,

    /// Maximum number of parallel requests for exchanging signatures.
    ///
    /// Default: 10.
    pub max_parallel_requests: usize,

    /// Number of slots for future signatures.
    ///
    /// Default: 3.
    pub signature_cache_slots: u32,

    /// Number of blocks to keep even after their validation.
    ///
    /// `cancel_validation` adds this value to the start seqno of the session
    /// being cancelled and removes older sessions only once the cancellation
    /// point has reached that threshold.
    ///
    /// Default: 10.
    pub old_blocks_to_keep: u32,
}
54
55impl Default for ValidatorStdImplConfig {
56    fn default() -> Self {
57        Self {
58            exchange_signatures_backoff: Default::default(),
59            exchange_signatures_timeout: Duration::from_secs(1),
60            failed_exchange_interval: Duration::from_secs(10),
61            max_parallel_requests: 10,
62            signature_cache_slots: 3,
63            old_blocks_to_keep: 10,
64        }
65    }
66}
67
/// Standard implementation of the [`Validator`] trait.
///
/// This is a thin handle around shared state: cloning it only clones the
/// inner [`Arc`], so every clone operates on the same set of sessions.
#[derive(Clone)]
#[repr(transparent)]
pub struct ValidatorStdImpl {
    /// Shared state: network context, keypair, sessions and config.
    inner: Arc<Inner>,
}
73
74impl ValidatorStdImpl {
75    pub fn new(
76        net_context: ValidatorNetworkContext,
77        keypair: Arc<KeyPair>,
78        config: ValidatorStdImplConfig,
79    ) -> Self {
80        Self {
81            inner: Arc::new(Inner {
82                net_context,
83                keypair,
84                sessions: Default::default(),
85                config,
86            }),
87        }
88    }
89}
90
91#[async_trait]
92impl Validator for ValidatorStdImpl {
93    fn add_session(&self, info: AddSession<'_>) -> Result<()> {
94        let session = ValidatorSession::new(
95            &self.inner.net_context,
96            self.inner.keypair.clone(),
97            &self.inner.config,
98            info,
99        )?;
100
101        let mut sessions = self.inner.sessions.lock();
102        let shard_sessions = sessions.entry(info.shard_ident).or_default();
103
104        match shard_sessions.entry(info.session_id) {
105            indexmap::map::Entry::Vacant(entry) => {
106                tracing::debug!(
107                    target: tracing_targets::VALIDATOR,
108                    session = ?DebugLogValidatorSesssion(&session),
109                    "new validator session added",
110                );
111                entry.insert(session);
112                Ok(())
113            }
114            indexmap::map::Entry::Occupied(_) => {
115                anyhow::bail!(
116                    "validator session already exists: ({}, {:?})",
117                    info.shard_ident,
118                    info.session_id
119                )
120            }
121        }
122    }
123
124    async fn validate(
125        &self,
126        session_id: ValidationSessionId,
127        block_id: &BlockId,
128    ) -> Result<ValidationStatus> {
129        let session = 'session: {
130            if let Some(shard_sessions) = self.inner.sessions.lock().get(&block_id.shard)
131                && let Some(session) = shard_sessions.get(&session_id)
132            {
133                break 'session session.clone();
134            }
135
136            anyhow::bail!(
137                "validator session not found: ({}, {:?})",
138                block_id.shard,
139                session_id,
140            );
141        };
142
143        session.validate_block(block_id).await
144    }
145
146    fn cancel_validation(
147        &self,
148        until: &BlockIdShort,
149        session_id: Option<ValidationSessionId>,
150    ) -> Result<()> {
151        let session = {
152            let mut sessions = self.inner.sessions.lock();
153            let Some(shard_sessions) = sessions.get_mut(&until.shard) else {
154                return Ok(());
155            };
156
157            // try to find the session by the provided `session_id` first
158            // if not provided or not found, find the latest session that started before `until`
159            let session = session_id.and_then(|id| {
160                shard_sessions.get(&id).or_else(|| {
161                    tracing::warn!(
162                        target: tracing_targets::VALIDATOR,
163                        session_id = ?id,
164                        shard = %until.shard,
165                        "validation session not found for explicit session_id" );
166                    None
167                })
168            });
169
170            let session = match session {
171                // Directly use the found session.
172                Some(session) => session.clone(),
173                // Otherwise, find the latest session that started before `until`.
174                None => {
175                    if let Some(s) = shard_sessions.iter().rev().find_map(|(_, session)| {
176                        (session.start_block_seqno() <= until.seqno).then(|| session.clone())
177                    }) {
178                        s
179                    } else {
180                        // No session found, nothing to cancel.
181                        return Ok(());
182                    }
183                }
184            };
185
186            // Remove all sessions before the found one.
187
188            // NOTE: We need to wait for some blocks in the new session before removing the old ones,
189            // so that lagging validators have enough time to receive our signatures.
190            let seqno_threshold = session
191                .start_block_seqno()
192                .saturating_add(self.inner.config.old_blocks_to_keep);
193
194            if until.seqno >= seqno_threshold {
195                while let Some(entry) = shard_sessions.first_entry() {
196                    if *entry.key() < session.id() {
197                        // Fully cancel the session before removing it.
198                        entry.get().cancel();
199                        entry.shift_remove();
200                    } else {
201                        break;
202                    }
203                }
204            }
205
206            session
207        };
208
209        // Partially cancel the found session.
210        session.cancel_until(until.seqno);
211        Ok(())
212    }
213}
214
/// State shared by all clones of [`ValidatorStdImpl`].
struct Inner {
    /// Network context handed to every new `ValidatorSession`.
    net_context: ValidatorNetworkContext,
    /// Keypair shared (via `Arc` clone) with every new `ValidatorSession`.
    keypair: Arc<KeyPair>,
    /// Registered sessions grouped by shard, guarded by a mutex.
    sessions: parking_lot::Mutex<Sessions>,
    /// Configuration passed to sessions and used when pruning old ones.
    config: ValidatorStdImplConfig,
}
221
/// All registered sessions, grouped by shard.
type Sessions = FastHashMap<ShardIdent, ShardSessions>;
/// Sessions of a single shard, keyed by session id.
///
/// We use `IndexMap` because the "subset short hash" component of a session id
/// is not sequential; `cancel_validation` walks this map by position
/// (`first_entry`, reverse iteration) rather than by key order.
type ShardSessions = IndexMap<ValidationSessionId, ValidatorSession>;