// tycho_network/network/peer.rs
use std::sync::Arc;
2
3use anyhow::Result;
4use bytes::Bytes;
5use tokio_util::codec::{FramedRead, FramedWrite};
6use tycho_util::metrics::{GaugeGuard, HistogramGuard};
7
8use crate::network::config::NetworkConfig;
9use crate::network::connection::Connection;
10use crate::network::wire::{make_codec, recv_response, send_request};
11use crate::types::{PeerId, Request, Response};
12
// Metric names for outgoing traffic. The `QUERIES` names are recorded by
// `Peer::rpc`, the `MESSAGES` names by `Peer::send_message`.

// Histograms: passed to `HistogramGuard::begin` at the start of a call
// (presumably records the elapsed time when the guard is dropped; confirm
// against `tycho_util::metrics`).
const METRIC_OUT_QUERIES_TIME: &str = "tycho_net_out_queries_time";
const METRIC_OUT_MESSAGES_TIME: &str = "tycho_net_out_messages_time";

// Counters: incremented by one at the start of every call.
const METRIC_OUT_QUERIES_TOTAL: &str = "tycho_net_out_queries_total";
const METRIC_OUT_MESSAGES_TOTAL: &str = "tycho_net_out_messages_total";

// Gauges: incremented by one through `GaugeGuard::increment` at the start of
// a call (presumably decremented again when the guard is dropped; confirm
// against `tycho_util::metrics`).
const METRIC_OUT_QUERIES: &str = "tycho_net_out_queries";
const METRIC_OUT_MESSAGES: &str = "tycho_net_out_messages";
24
25#[derive(Clone)]
26pub struct Peer {
27 connection: Connection,
28 config: Arc<NetworkConfig>,
29}
30
31impl Peer {
32 pub(crate) fn new(connection: Connection, config: Arc<NetworkConfig>) -> Self {
33 Self { connection, config }
34 }
35
36 pub fn peer_id(&self) -> &PeerId {
37 self.connection.peer_id()
38 }
39
40 pub async fn rpc(&self, request: Request) -> Result<Response> {
41 metrics::counter!(METRIC_OUT_QUERIES_TOTAL).increment(1);
42 let _gauge = GaugeGuard::increment(METRIC_OUT_QUERIES, 1);
43 let _histogram = HistogramGuard::begin(METRIC_OUT_QUERIES_TIME);
44
45 let (send_stream, recv_stream) = self.connection.open_bi().await?;
46 let mut send_stream = FramedWrite::new(send_stream, make_codec(&self.config));
47 let mut recv_stream = FramedRead::new(recv_stream, make_codec(&self.config));
48
49 send_request(&mut send_stream, request).await?;
50 send_stream.get_mut().finish()?;
51
52 recv_response(&mut recv_stream).await
53 }
54
55 pub async fn send_message(&self, request: Request) -> Result<()> {
56 metrics::counter!(METRIC_OUT_MESSAGES_TOTAL).increment(1);
57 let _gauge = GaugeGuard::increment(METRIC_OUT_MESSAGES, 1);
58 let _histogram = HistogramGuard::begin(METRIC_OUT_MESSAGES_TIME);
59
60 let send_stream = self.connection.open_uni().await?;
61 let mut send_stream = FramedWrite::new(send_stream, make_codec(&self.config));
62
63 send_request(&mut send_stream, request).await?;
64 send_stream.get_mut().finish()?;
65 _ = send_stream.get_mut().stopped().await;
66
67 Ok(())
68 }
69
70 pub fn send_datagram(&self, request: Bytes) -> Result<()> {
71 self.connection.send_datagram(request)?;
72 Ok(())
73 }
74}