tycho_network/network/
peer.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use tokio_util::codec::{FramedRead, FramedWrite};
5use tycho_util::metrics::{GaugeGuard, HistogramGuard};
6
7use crate::network::config::NetworkConfig;
8use crate::network::connection::Connection;
9use crate::network::wire::{make_codec, recv_response, send_request};
10use crate::types::{PeerId, Request, Response};
11
// Metrics recorded for outgoing queries (see `Peer::rpc`).

/// Counter: bumped once at the start of every outgoing query.
const METRIC_OUT_QUERIES_TOTAL: &str = "tycho_net_out_queries_total";
/// Gauge: held incremented (via a guard) while an outgoing query is running.
const METRIC_OUT_QUERIES: &str = "tycho_net_out_queries";
/// Histogram: timing guard started at the beginning of an outgoing query.
const METRIC_OUT_QUERIES_TIME: &str = "tycho_net_out_queries_time";

// Metrics recorded for outgoing one-way messages (see `Peer::send_message`).

/// Counter: bumped once at the start of every outgoing message.
const METRIC_OUT_MESSAGES_TOTAL: &str = "tycho_net_out_messages_total";
/// Gauge: held incremented (via a guard) while an outgoing message is being sent.
const METRIC_OUT_MESSAGES: &str = "tycho_net_out_messages";
/// Histogram: timing guard started at the beginning of an outgoing message.
const METRIC_OUT_MESSAGES_TIME: &str = "tycho_net_out_messages_time";
23
/// Handle to a remote peer: a [`Connection`] paired with the shared
/// [`NetworkConfig`].
///
/// Cloning a `Peer` clones both fields (the connection handle and the `Arc`).
#[derive(Clone)]
pub struct Peer {
    /// Connection to the remote peer; used to open streams and to report the peer id.
    connection: Connection,
    /// Shared network configuration; handed to `make_codec` when framing streams.
    config: Arc<NetworkConfig>,
}
29
30impl Peer {
31    pub(crate) fn new(connection: Connection, config: Arc<NetworkConfig>) -> Self {
32        Self { connection, config }
33    }
34
35    pub fn peer_id(&self) -> &PeerId {
36        self.connection.peer_id()
37    }
38
39    pub async fn rpc(&self, request: Request) -> Result<Response> {
40        metrics::counter!(METRIC_OUT_QUERIES_TOTAL).increment(1);
41        let _gauge = GaugeGuard::increment(METRIC_OUT_QUERIES, 1);
42        let _histogram = HistogramGuard::begin(METRIC_OUT_QUERIES_TIME);
43
44        let (send_stream, recv_stream) = self.connection.open_bi().await?;
45        let mut send_stream = FramedWrite::new(send_stream, make_codec(&self.config));
46        let mut recv_stream = FramedRead::new(recv_stream, make_codec(&self.config));
47
48        send_request(&mut send_stream, request).await?;
49        send_stream.get_mut().finish()?;
50
51        recv_response(&mut recv_stream).await.map_err(Into::into)
52    }
53
54    pub async fn send_message(&self, request: Request) -> Result<()> {
55        metrics::counter!(METRIC_OUT_MESSAGES_TOTAL).increment(1);
56        let _gauge = GaugeGuard::increment(METRIC_OUT_MESSAGES, 1);
57        let _histogram = HistogramGuard::begin(METRIC_OUT_MESSAGES_TIME);
58
59        let send_stream = self.connection.open_uni().await?;
60        let mut send_stream = FramedWrite::new(send_stream, make_codec(&self.config));
61
62        send_request(&mut send_stream, request).await?;
63        send_stream.get_mut().finish()?;
64        _ = send_stream.get_mut().stopped().await;
65
66        Ok(())
67    }
68}
69
70impl std::fmt::Debug for Peer {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_tuple("Peer")
73            .field(&self.connection.peer_id())
74            .finish()
75    }
76}