tycho_network/network/
peer.rs1use std::sync::Arc;
2
3use anyhow::Result;
4use tokio_util::codec::{FramedRead, FramedWrite};
5use tycho_util::metrics::{GaugeGuard, HistogramGuard};
6
7use crate::network::config::NetworkConfig;
8use crate::network::connection::Connection;
9use crate::network::wire::{make_codec, recv_response, send_request};
10use crate::types::{PeerId, Request, Response};
11
12const METRIC_OUT_QUERIES_TIME: &str = "tycho_net_out_queries_time";
14const METRIC_OUT_MESSAGES_TIME: &str = "tycho_net_out_messages_time";
15
16const METRIC_OUT_QUERIES_TOTAL: &str = "tycho_net_out_queries_total";
18const METRIC_OUT_MESSAGES_TOTAL: &str = "tycho_net_out_messages_total";
19
20const METRIC_OUT_QUERIES: &str = "tycho_net_out_queries";
22const METRIC_OUT_MESSAGES: &str = "tycho_net_out_messages";
23
24#[derive(Clone)]
25pub struct Peer {
26 connection: Connection,
27 config: Arc<NetworkConfig>,
28}
29
30impl Peer {
31 pub(crate) fn new(connection: Connection, config: Arc<NetworkConfig>) -> Self {
32 Self { connection, config }
33 }
34
35 pub fn peer_id(&self) -> &PeerId {
36 self.connection.peer_id()
37 }
38
39 pub async fn rpc(&self, request: Request) -> Result<Response> {
40 metrics::counter!(METRIC_OUT_QUERIES_TOTAL).increment(1);
41 let _gauge = GaugeGuard::increment(METRIC_OUT_QUERIES, 1);
42 let _histogram = HistogramGuard::begin(METRIC_OUT_QUERIES_TIME);
43
44 let (send_stream, recv_stream) = self.connection.open_bi().await?;
45 let mut send_stream = FramedWrite::new(send_stream, make_codec(&self.config));
46 let mut recv_stream = FramedRead::new(recv_stream, make_codec(&self.config));
47
48 send_request(&mut send_stream, request).await?;
49 send_stream.get_mut().finish()?;
50
51 recv_response(&mut recv_stream).await.map_err(Into::into)
52 }
53
54 pub async fn send_message(&self, request: Request) -> Result<()> {
55 metrics::counter!(METRIC_OUT_MESSAGES_TOTAL).increment(1);
56 let _gauge = GaugeGuard::increment(METRIC_OUT_MESSAGES, 1);
57 let _histogram = HistogramGuard::begin(METRIC_OUT_MESSAGES_TIME);
58
59 let send_stream = self.connection.open_uni().await?;
60 let mut send_stream = FramedWrite::new(send_stream, make_codec(&self.config));
61
62 send_request(&mut send_stream, request).await?;
63 send_stream.get_mut().finish()?;
64 _ = send_stream.get_mut().stopped().await;
65
66 Ok(())
67 }
68}
69
70impl std::fmt::Debug for Peer {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_tuple("Peer")
73 .field(&self.connection.peer_id())
74 .finish()
75 }
76}