tycho_collator/validator/rpc/
service.rs1use std::sync::{Arc, Weak};
2
3use tracing::Instrument;
4use tycho_network::{PeerId, Response, Service, ServiceRequest, try_handle_prefix};
5use tycho_types::models::ShardIdent;
6use tycho_util::futures::BoxFutureOrNoop;
7
8use crate::tracing_targets;
9use crate::validator::ValidationSessionId;
10use crate::validator::proto::rpc;
11use crate::validator::rpc::ExchangeSignatures;
12
13pub struct ValidatorService<E> {
14 pub shard_ident: ShardIdent,
15 pub session_id: ValidationSessionId,
16 pub exchanger: Weak<E>,
17}
18
19impl<E: ExchangeSignatures> ValidatorService<E> {
20 fn handle_exchange_signatures(
21 &self,
22 peer_id: &PeerId,
23 block_seqno: u32,
24 signature: Arc<[u8; 64]>,
25 ) -> BoxFutureOrNoop<Option<Response>> {
26 let Some(exchanger) = self.exchanger.upgrade() else {
27 return BoxFutureOrNoop::Noop;
28 };
29
30 let peer_id = *peer_id;
31 BoxFutureOrNoop::future(
32 async move {
33 match exchanger
34 .exchange_signatures(&peer_id, block_seqno, signature)
35 .await
36 {
37 Ok(res) => Some(Response::from_tl(res)),
38 Err(e) => {
39 tracing::debug!(
41 target: tracing_targets::VALIDATOR,
42 %peer_id,
43 block_seqno,
44 "failed to exchange signatures: {e:?}",
45 );
46 None
47 }
48 }
49 }
50 .instrument(tracing::Span::current()),
51 )
52 }
53}
54
55impl<E> Clone for ValidatorService<E> {
56 #[inline]
57 fn clone(&self) -> Self {
58 Self {
59 shard_ident: self.shard_ident,
60 session_id: self.session_id,
61 exchanger: self.exchanger.clone(),
62 }
63 }
64}
65
66impl<E> Service<ServiceRequest> for ValidatorService<E>
67where
68 E: ExchangeSignatures,
69{
70 type QueryResponse = Response;
71 type OnQueryFuture = BoxFutureOrNoop<Option<Self::QueryResponse>>;
72 type OnMessageFuture = futures_util::future::Ready<()>;
73
74 #[tracing::instrument(
75 name = "on_validator_query",
76 skip_all,
77 fields(
78 shard_ident = %self.shard_ident,
79 session_id = ?self.session_id,
80 )
81 )]
82 fn on_query(&self, req: ServiceRequest) -> Self::OnQueryFuture {
83 let (constructor, body) = match try_handle_prefix(&req) {
84 Ok(rest) => rest,
85 Err(e) => {
86 tracing::debug!(
87 target: tracing_targets::VALIDATOR,
88 "failed to deserializer query: {e}",
89 );
90 return BoxFutureOrNoop::Noop;
91 }
92 };
93
94 tycho_network::match_tl_request!(body, tag = constructor, {
95 rpc::ExchangeSignaturesOwned as r => {
96 tracing::trace!(
97 target: tracing_targets::VALIDATOR,
98 block_seqno = r.block_seqno,
99 "exchangeSignatures",
100 );
101 self.handle_exchange_signatures(&req.metadata.peer_id, r.block_seqno, r.signature)
102 }
103 }, e => {
104 tracing::debug!(
105 target: tracing_targets::VALIDATOR,
106 "failed to deserializer query: {e}",
107 );
108 BoxFutureOrNoop::Noop
109 })
110 }
111
112 #[inline]
113 fn on_message(&self, _: ServiceRequest) -> Self::OnMessageFuture {
114 futures_util::future::ready(())
115 }
116}