Skip to main content

tycho_collator/validator/rpc/
service.rs

1use std::sync::{Arc, Weak};
2
3use tracing::Instrument;
4use tycho_network::{PeerId, Response, Service, ServiceRequest, try_handle_prefix};
5use tycho_types::models::ShardIdent;
6use tycho_util::futures::BoxFutureOrNoop;
7
8use crate::tracing_targets;
9use crate::validator::ValidationSessionId;
10use crate::validator::proto::rpc;
11use crate::validator::rpc::ExchangeSignatures;
12
13pub struct ValidatorService<E> {
14    pub shard_ident: ShardIdent,
15    pub session_id: ValidationSessionId,
16    pub exchanger: Weak<E>,
17}
18
19impl<E: ExchangeSignatures> ValidatorService<E> {
20    fn handle_exchange_signatures(
21        &self,
22        peer_id: &PeerId,
23        block_seqno: u32,
24        signature: Arc<[u8; 64]>,
25    ) -> BoxFutureOrNoop<Option<Response>> {
26        let Some(exchanger) = self.exchanger.upgrade() else {
27            return BoxFutureOrNoop::Noop;
28        };
29
30        let peer_id = *peer_id;
31        BoxFutureOrNoop::future(
32            async move {
33                match exchanger
34                    .exchange_signatures(&peer_id, block_seqno, signature)
35                    .await
36                {
37                    Ok(res) => Some(Response::from_tl(res)),
38                    Err(e) => {
39                        // and the log will be full of these warnings.
40                        tracing::debug!(
41                            target: tracing_targets::VALIDATOR,
42                            %peer_id,
43                            block_seqno,
44                            "failed to exchange signatures: {e:?}",
45                        );
46                        None
47                    }
48                }
49            }
50            .instrument(tracing::Span::current()),
51        )
52    }
53}
54
55impl<E> Clone for ValidatorService<E> {
56    #[inline]
57    fn clone(&self) -> Self {
58        Self {
59            shard_ident: self.shard_ident,
60            session_id: self.session_id,
61            exchanger: self.exchanger.clone(),
62        }
63    }
64}
65
66impl<E> Service<ServiceRequest> for ValidatorService<E>
67where
68    E: ExchangeSignatures,
69{
70    type QueryResponse = Response;
71    type OnQueryFuture = BoxFutureOrNoop<Option<Self::QueryResponse>>;
72    type OnMessageFuture = futures_util::future::Ready<()>;
73
74    #[tracing::instrument(
75        name = "on_validator_query",
76        skip_all,
77        fields(
78            shard_ident = %self.shard_ident,
79            session_id = ?self.session_id,
80        )
81    )]
82    fn on_query(&self, req: ServiceRequest) -> Self::OnQueryFuture {
83        let (constructor, body) = match try_handle_prefix(&req) {
84            Ok(rest) => rest,
85            Err(e) => {
86                tracing::debug!(
87                    target: tracing_targets::VALIDATOR,
88                    "failed to deserializer query: {e}",
89                );
90                return BoxFutureOrNoop::Noop;
91            }
92        };
93
94        tycho_network::match_tl_request!(body, tag = constructor, {
95            rpc::ExchangeSignaturesOwned as r => {
96                tracing::trace!(
97                    target: tracing_targets::VALIDATOR,
98                    block_seqno = r.block_seqno,
99                    "exchangeSignatures",
100                );
101                self.handle_exchange_signatures(&req.metadata.peer_id, r.block_seqno, r.signature)
102            }
103        }, e => {
104            tracing::debug!(
105                target: tracing_targets::VALIDATOR,
106                "failed to deserializer query: {e}",
107            );
108            BoxFutureOrNoop::Noop
109        })
110    }
111
112    #[inline]
113    fn on_message(&self, _: ServiceRequest) -> Self::OnMessageFuture {
114        futures_util::future::ready(())
115    }
116}